diff --git a/packages/d2ts/src/operators/index.ts b/packages/d2ts/src/operators/index.ts index f19a228..2adec33 100644 --- a/packages/d2ts/src/operators/index.ts +++ b/packages/d2ts/src/operators/index.ts @@ -15,6 +15,7 @@ export * from './iterate.js' export * from './keying.js' export * from './topK.js' export * from './topKWithFractionalIndex.js' +export * from './topKWithPreviousRef.js' export * from './orderBy.js' export * from './filterBy.js' export { groupBy, groupByOperators } from './groupBy.js' diff --git a/packages/d2ts/src/operators/topKWithPreviousRef.ts b/packages/d2ts/src/operators/topKWithPreviousRef.ts new file mode 100644 index 0000000..61b4f39 --- /dev/null +++ b/packages/d2ts/src/operators/topKWithPreviousRef.ts @@ -0,0 +1,268 @@ +import { + IStreamBuilder, + DataMessage, + MessageType, + KeyValue, + PipedOperator, +} from '../types.js' +import { + DifferenceStreamReader, + DifferenceStreamWriter, + UnaryOperator, +} from '../graph.js' +import { StreamBuilder } from '../d2.js' +import { MultiSet } from '../multiset.js' +import { Antichain, Version } from '../order.js' +import { Index } from '../version-index.js' + +interface topKWithPreviousRefOptions { + limit?: number + offset?: number + previousRef: (value: V1) => Ref + nullValue?: Ref // Optional null value for the first element +} + +/** + * Limits the number of results based on a comparator, with optional offset. + * This works on a keyed stream, where the key is the first element of the tuple. + * The ordering is within a key group, i.e. elements are sorted within a key group + * and the limit + offset is applied to that sorted group. + * To order the entire stream, key by the same value for all elements such as null. + * + * Each output message is a `[key, [value, previousRef]]` tuple where `previousRef` is + * a reference to the previous element extracted with the `previousRef` function. + * The `previousRef` function is provided as an option to the operator. + * This creates a linked list structure for fine-grained insert/update/delete operations. + * + * @param comparator - A function that compares two elements + * @param options - An optional object containing limit, offset and previousRef properties + * @returns A piped operator that orders the elements and limits the number of results + */ +export class TopKWithPreviousRefOperator extends UnaryOperator< + [K, V1 | [V1, Ref]] +> { + #index = new Index() + #indexOut = new Index() + #keysTodo = new Map>() + #comparator: (a: V1, b: V1) => number + #limit: number + #offset: number + #previousRef: (value: V1) => Ref + #nullValue: Ref | undefined + + constructor( + id: number, + inputA: DifferenceStreamReader<[K, V1]>, + output: DifferenceStreamWriter<[K, [V1, Ref]]>, + comparator: (a: V1, b: V1) => number, + options: topKWithPreviousRefOptions, + initialFrontier: Antichain, + ) { + super(id, inputA, output, initialFrontier) + this.#comparator = comparator + this.#limit = options.limit ?? Infinity + this.#offset = options.offset ?? 0 + this.#previousRef = options.previousRef + this.#nullValue = options.nullValue + } + + run(): void { + for (const message of this.inputMessages()) { + if (message.type === MessageType.DATA) { + const { version, collection } = message.data as DataMessage<[K, V1]> + for (const [item, multiplicity] of collection.getInner()) { + const [key, value] = item + this.#index.addValue(key, version, [value, multiplicity]) + + let todoSet = this.#keysTodo.get(version) + if (!todoSet) { + todoSet = new Set() + this.#keysTodo.set(version, todoSet) + } + todoSet.add(key) + + // Add key to all join versions + for (const v2 of this.#index.versions(key)) { + const joinVersion = version.join(v2) + let joinTodoSet = this.#keysTodo.get(joinVersion) + if (!joinTodoSet) { + joinTodoSet = new Set() + this.#keysTodo.set(joinVersion, joinTodoSet) + } + joinTodoSet.add(key) + } + } + } else if (message.type === MessageType.FRONTIER) { + const frontier = message.data as Antichain + if (!this.inputFrontier().lessEqual(frontier)) { + throw new Error('Invalid frontier update') + } + this.setInputFrontier(frontier) + } + } + + // Find versions that are complete + const finishedVersions = Array.from(this.#keysTodo.entries()) + .filter(([version]) => !this.inputFrontier().lessEqualVersion(version)) + .sort(([a], [b]) => { + return a.lessEqual(b) ? -1 : 1 + }) + + for (const [version, keys] of finishedVersions) { + const result: [[K, [V1, Ref]], number][] = [] + + for (const key of keys) { + const curr = this.#index.reconstructAt(key, version) + const currOut = this.#indexOut.reconstructAt(key, version) + + // Sort the current values + const consolidated = new MultiSet(curr).consolidate() + const sortedValues = consolidated + .getInner() + .sort((a, b) => this.#comparator(a[0] as V1, b[0] as V1)) + .slice(this.#offset, this.#offset + this.#limit) + + // Create maps for quick lookup + const currValueMap = new Map() + const prevOutputMap = new Map() + const valueToKey = new Map() + + // Process current values + for (const [value, multiplicity] of sortedValues) { + if (multiplicity > 0) { + let valueKey = valueToKey.get(value) + if (!valueKey) { + valueKey = JSON.stringify(value) + valueToKey.set(value, valueKey) + } + currValueMap.set(valueKey, value) + } + } + + // Process previous output values + for (const [[value, previousRef], multiplicity] of currOut) { + if (multiplicity > 0) { + let valueKey = valueToKey.get(value) + if (!valueKey) { + valueKey = JSON.stringify(value) + valueToKey.set(value, valueKey) + } + prevOutputMap.set(valueKey, [value, previousRef]) + } + } + + // Find values that are no longer in the result + for (const [valueKey, [value, previousRef]] of prevOutputMap.entries()) { + if (!currValueMap.has(valueKey)) { + // Value is no longer in the result, remove it + result.push([[key, [value, previousRef]], -1]) + this.#indexOut.addValue(key, version, [[value, previousRef], -1]) + } + } + + // Create the linked list structure + // Each element gets a reference to the previous element in the sorted order + let previousValue: V1 | null = null + + for (let i = 0; i < sortedValues.length; i++) { + const [value, multiplicity] = sortedValues[i] + if (multiplicity <= 0) continue + + const valueKey = valueToKey.get(value) as string + + // Calculate the previous reference + // The previousRef function extracts a reference from a value + // For the linked list, we want the reference to the previous element + let previousRef: Ref + if (previousValue === null) { + // First element - use the provided nullValue or throw error if not provided + if (this.#nullValue !== undefined) { + previousRef = this.#nullValue + } else { + throw new Error('First element in topKWithPreviousRef requires a nullValue to be specified in options') + } + } else { + // Extract reference from the previous element + previousRef = this.#previousRef(previousValue) + } + + // Check if this is a new value or if the previousRef has changed + const existingEntry = prevOutputMap.get(valueKey) + + if (!existingEntry) { + // New value + result.push([[key, [value, previousRef]], 1]) + this.#indexOut.addValue(key, version, [[value, previousRef], 1]) + } else if (JSON.stringify(existingEntry[1]) !== JSON.stringify(previousRef)) { + // Previous reference has changed, remove old entry and add new one + result.push([[key, existingEntry], -1]) + result.push([[key, [value, previousRef]], 1]) + this.#indexOut.addValue(key, version, [existingEntry, -1]) + this.#indexOut.addValue(key, version, [[value, previousRef], 1]) + } + // If the value exists and the previousRef hasn't changed, do nothing + + // Update previousValue for the next iteration + previousValue = value + } + } + + if (result.length > 0) { + this.output.sendData(version, new MultiSet(result)) + } + this.#keysTodo.delete(version) + } + + if (!this.outputFrontier.lessEqual(this.inputFrontier())) { + throw new Error('Invalid frontier state') + } + if (this.outputFrontier.lessThan(this.inputFrontier())) { + this.outputFrontier = this.inputFrontier() + this.output.sendFrontier(this.outputFrontier) + this.#index.compact(this.outputFrontier) + this.#indexOut.compact(this.outputFrontier) + } + } +} + +/** + * Limits the number of results based on a comparator, with optional offset. + * This works on a keyed stream, where the key is the first element of the tuple. + * The ordering is within a key group, i.e. elements are sorted within a key group + * and the limit + offset is applied to that sorted group. + * To order the entire stream, key by the same value for all elements such as null. + * + * Creates a linked list structure where each element contains a reference to the + * previous element in the sorted order, enabling fine-grained insert/update/delete operations. + * + * @param comparator - A function that compares two elements + * @param options - An object containing limit, offset and previousRef properties + * @returns A piped operator that orders the elements and limits the number of results + */ +export function topKWithPreviousRef< + K extends T extends KeyValue ? K : never, + V1 extends T extends KeyValue ? V : never, + T, + Ref, +>( + comparator: (a: V1, b: V1) => number, + options: topKWithPreviousRefOptions, +): PipedOperator> { + return (stream: IStreamBuilder): IStreamBuilder> => { + const output = new StreamBuilder>( + stream.graph, + new DifferenceStreamWriter>(), + ) + const operator = new TopKWithPreviousRefOperator( + stream.graph.getNextOperatorId(), + stream.connectReader() as DifferenceStreamReader>, + output.writer, + comparator, + options, + stream.graph.frontier(), + ) + stream.graph.addOperator(operator) + stream.graph.addStream(output.connectReader()) + return output + } +} \ No newline at end of file diff --git a/packages/d2ts/tests/operators/topKWithPreviousRef.test.ts b/packages/d2ts/tests/operators/topKWithPreviousRef.test.ts new file mode 100644 index 0000000..7816fdd --- /dev/null +++ b/packages/d2ts/tests/operators/topKWithPreviousRef.test.ts @@ -0,0 +1,551 @@ +import { describe, it, expect } from 'vitest' +import { D2 } from '../../src/d2.js' +import { MultiSet } from '../../src/multiset.js' +import { MessageType } from '../../src/types.js' +import { topKWithPreviousRef } from '../../src/operators/topKWithPreviousRef.js' +import { output } from '../../src/operators/index.js' + +// Helper function to verify the linked list structure +function verifyLinkedListOrder(results: any[], expectedOrder: string[], nullValue: any = null) { + // Extract values and their previous references + const valuesWithPrevRefs = results.map(([[_, [value, prevRef]]]) => ({ + value: value.value, + id: value.id, + prevRef, + })) + + // Build a map from reference to value for quick lookup + const refToValue = new Map() + for (const item of valuesWithPrevRefs) { + refToValue.set(item.id, item.value) + } + + // Follow the linked list to verify order + // First, find the element with the specified null value as previous reference + const firstElement = valuesWithPrevRefs.find(item => item.prevRef === nullValue) + expect(firstElement).toBeDefined() + expect(firstElement!.value).toBe(expectedOrder[0]) + + // Then follow the chain + let current = firstElement + let actualOrder = [current!.value] + + for (let i = 1; i < expectedOrder.length; i++) { + // Find the element that has current element as previous reference + const next = valuesWithPrevRefs.find(item => item.prevRef === current!.id) + expect(next).toBeDefined() + actualOrder.push(next!.value) + current = next + } + + expect(actualOrder).toEqual(expectedOrder) +} + +describe('Operators', () => { + describe('TopKWithPreviousRef operation', () => { + it('should create a linked list structure with previous references', () => { + const graph = new D2({ initialFrontier: 0 }) + const input = graph.newInput<[null, { id: number; value: string }]>() + const allMessages: any[] = [] + + input.pipe( + topKWithPreviousRef( + (a, b) => a.value.localeCompare(b.value), + { + previousRef: (value) => value.id, + nullValue: null, + } + ), + output((message) => { + if (message.type === MessageType.DATA) { + allMessages.push(message.data) + } + }), + ) + + graph.finalize() + + // Initial data - c, a, b, d + input.sendData( + 0, + new MultiSet([ + [[null, { id: 3, value: 'c' }], 1], + [[null, { id: 1, value: 'a' }], 1], + [[null, { id: 2, value: 'b' }], 1], + [[null, { id: 4, value: 'd' }], 1], + ]), + ) + input.sendFrontier(1) + graph.run() + + // Initial result should have all elements with linked list structure + const initialResult = allMessages[0].collection.getInner() + expect(initialResult.length).toBe(4) + + // Verify the linked list structure follows alphabetical order: a -> b -> c -> d + verifyLinkedListOrder(initialResult, ['a', 'b', 'c', 'd'], null) + + // Check specific previous references + const valueToEntry = new Map() + for (const [[_, [value, prevRef]]] of initialResult) { + valueToEntry.set(value.value, { value, prevRef }) + } + + // 'a' should have null as previous reference (first element) + expect(valueToEntry.get('a').prevRef).toBe(null) + // 'b' should have 'a' as previous reference (id: 1) + expect(valueToEntry.get('b').prevRef).toBe(1) + // 'c' should have 'b' as previous reference (id: 2) + expect(valueToEntry.get('c').prevRef).toBe(2) + // 'd' should have 'c' as previous reference (id: 3) + expect(valueToEntry.get('d').prevRef).toBe(3) + }) + + it('should handle element insertion correctly', () => { + const graph = new D2({ initialFrontier: 0 }) + const input = graph.newInput<[null, { id: number; value: string }]>() + const allMessages: any[] = [] + + input.pipe( + topKWithPreviousRef( + (a, b) => a.value.localeCompare(b.value), + { + previousRef: (value) => value.id, + nullValue: null, + } + ), + output((message) => { + if (message.type === MessageType.DATA) { + allMessages.push(message.data) + } + }), + ) + + graph.finalize() + + // Initial data - a, c, e + input.sendData( + 0, + new MultiSet([ + [[null, { id: 1, value: 'a' }], 1], + [[null, { id: 3, value: 'c' }], 1], + [[null, { id: 5, value: 'e' }], 1], + ]), + ) + input.sendFrontier(1) + graph.run() + + // Initial result: a -> c -> e + const initialResult = allMessages[0].collection.getInner() + expect(initialResult.length).toBe(3) + verifyLinkedListOrder(initialResult, ['a', 'c', 'e'], null) + + // Insert 'b' and 'd' in the middle + input.sendData( + 1, + new MultiSet([ + [[null, { id: 2, value: 'b' }], 1], + [[null, { id: 4, value: 'd' }], 1], + ]), + ) + input.sendFrontier(2) + graph.run() + + // Check the changes + const changes = allMessages[1].collection.getInner() + expect(changes.length).toBeGreaterThan(0) + + // Reconstruct the current state by applying the changes + const currentState = new Map() + for (const [[_, [value, prevRef]]] of initialResult) { + currentState.set(JSON.stringify(value), [value, prevRef]) + } + + // Apply the changes + for (const [[_, [value, prevRef]], multiplicity] of changes) { + if (multiplicity < 0) { + currentState.delete(JSON.stringify(value)) + } else { + currentState.set(JSON.stringify(value), [value, prevRef]) + } + } + + // Convert to array for verification + const currentStateArray = Array.from(currentState.values()).map( + ([value, prevRef]) => [[null, [value, prevRef]], 1], + ) + + // Verify the final linked list structure: a -> b -> c -> d -> e + verifyLinkedListOrder(currentStateArray, ['a', 'b', 'c', 'd', 'e'], null) + }) + + it('should handle element removal correctly', () => { + const graph = new D2({ initialFrontier: 0 }) + const input = graph.newInput<[null, { id: number; value: string }]>() + const allMessages: any[] = [] + + input.pipe( + topKWithPreviousRef( + (a, b) => a.value.localeCompare(b.value), + { + previousRef: (value) => value.id, + nullValue: null, + } + ), + output((message) => { + if (message.type === MessageType.DATA) { + allMessages.push(message.data) + } + }), + ) + + graph.finalize() + + // Initial data - a, b, c, d, e + input.sendData( + 0, + new MultiSet([ + [[null, { id: 1, value: 'a' }], 1], + [[null, { id: 2, value: 'b' }], 1], + [[null, { id: 3, value: 'c' }], 1], + [[null, { id: 4, value: 'd' }], 1], + [[null, { id: 5, value: 'e' }], 1], + ]), + ) + input.sendFrontier(1) + graph.run() + + // Initial result: a -> b -> c -> d -> e + const initialResult = allMessages[0].collection.getInner() + expect(initialResult.length).toBe(5) + verifyLinkedListOrder(initialResult, ['a', 'b', 'c', 'd', 'e'], null) + + // Remove 'b' and 'd' + input.sendData( + 1, + new MultiSet([ + [[null, { id: 2, value: 'b' }], -1], + [[null, { id: 4, value: 'd' }], -1], + ]), + ) + input.sendFrontier(2) + graph.run() + + // Check the changes + const changes = allMessages[1].collection.getInner() + expect(changes.length).toBeGreaterThan(0) + + // Reconstruct the current state + const currentState = new Map() + for (const [[_, [value, prevRef]]] of initialResult) { + currentState.set(JSON.stringify(value), [value, prevRef]) + } + + // Apply the changes + for (const [[_, [value, prevRef]], multiplicity] of changes) { + if (multiplicity < 0) { + currentState.delete(JSON.stringify(value)) + } else { + currentState.set(JSON.stringify(value), [value, prevRef]) + } + } + + // Convert to array for verification + const currentStateArray = Array.from(currentState.values()).map( + ([value, prevRef]) => [[null, [value, prevRef]], 1], + ) + + // Verify the final linked list structure: a -> c -> e + verifyLinkedListOrder(currentStateArray, ['a', 'c', 'e'], null) + }) + + it('should handle element movement correctly', () => { + const graph = new D2({ initialFrontier: 0 }) + const input = graph.newInput<[null, { id: number; value: string }]>() + const allMessages: any[] = [] + + input.pipe( + topKWithPreviousRef( + (a, b) => a.value.localeCompare(b.value), + { + previousRef: (value) => value.id, + nullValue: null, + } + ), + output((message) => { + if (message.type === MessageType.DATA) { + allMessages.push(message.data) + } + }), + ) + + graph.finalize() + + // Initial data - a, b, c, d + input.sendData( + 0, + new MultiSet([ + [[null, { id: 1, value: 'a' }], 1], + [[null, { id: 2, value: 'b' }], 1], + [[null, { id: 3, value: 'c' }], 1], + [[null, { id: 4, value: 'd' }], 1], + ]), + ) + input.sendFrontier(1) + graph.run() + + // Initial result: a -> b -> c -> d + const initialResult = allMessages[0].collection.getInner() + expect(initialResult.length).toBe(4) + verifyLinkedListOrder(initialResult, ['a', 'b', 'c', 'd'], null) + + // Move 'b' to the end by changing its value to 'z' + input.sendData( + 1, + new MultiSet([ + [[null, { id: 2, value: 'z' }], 1], + [[null, { id: 2, value: 'b' }], -1], + ]), + ) + input.sendFrontier(2) + graph.run() + + // Check the changes + const changes = allMessages[1].collection.getInner() + expect(changes.length).toBeGreaterThan(0) + + // Reconstruct the current state + const currentState = new Map() + for (const [[_, [value, prevRef]]] of initialResult) { + currentState.set(JSON.stringify(value), [value, prevRef]) + } + + // Apply the changes + for (const [[_, [value, prevRef]], multiplicity] of changes) { + if (multiplicity < 0) { + currentState.delete(JSON.stringify(value)) + } else { + currentState.set(JSON.stringify(value), [value, prevRef]) + } + } + + // Convert to array for verification + const currentStateArray = Array.from(currentState.values()).map( + ([value, prevRef]) => [[null, [value, prevRef]], 1], + ) + + // Verify the final linked list structure: a -> c -> d -> z + verifyLinkedListOrder(currentStateArray, ['a', 'c', 'd', 'z'], null) + }) + + it('should handle limit correctly', () => { + const graph = new D2({ initialFrontier: 0 }) + const input = graph.newInput<[null, { id: number; value: string }]>() + const allMessages: any[] = [] + + input.pipe( + topKWithPreviousRef( + (a, b) => a.value.localeCompare(b.value), + { + previousRef: (value) => value.id, + nullValue: null, + limit: 3, + } + ), + output((message) => { + if (message.type === MessageType.DATA) { + allMessages.push(message.data) + } + }), + ) + + graph.finalize() + + // Initial data - a, b, c, d, e + input.sendData( + 0, + new MultiSet([ + [[null, { id: 1, value: 'a' }], 1], + [[null, { id: 2, value: 'b' }], 1], + [[null, { id: 3, value: 'c' }], 1], + [[null, { id: 4, value: 'd' }], 1], + [[null, { id: 5, value: 'e' }], 1], + ]), + ) + input.sendFrontier(1) + graph.run() + + // Should only have first 3 elements: a -> b -> c + const initialResult = allMessages[0].collection.getInner() + expect(initialResult.length).toBe(3) + verifyLinkedListOrder(initialResult, ['a', 'b', 'c'], null) + + // Check that we have the right elements + const values = initialResult.map(([[_, [value, __]]]) => value.value).sort() + expect(values).toEqual(['a', 'b', 'c']) + }) + + it('should handle offset correctly', () => { + const graph = new D2({ initialFrontier: 0 }) + const input = graph.newInput<[null, { id: number; value: string }]>() + const allMessages: any[] = [] + + input.pipe( + topKWithPreviousRef( + (a, b) => a.value.localeCompare(b.value), + { + previousRef: (value) => value.id, + nullValue: null, + offset: 2, + } + ), + output((message) => { + if (message.type === MessageType.DATA) { + allMessages.push(message.data) + } + }), + ) + + graph.finalize() + + // Initial data - a, b, c, d, e + input.sendData( + 0, + new MultiSet([ + [[null, { id: 1, value: 'a' }], 1], + [[null, { id: 2, value: 'b' }], 1], + [[null, { id: 3, value: 'c' }], 1], + [[null, { id: 4, value: 'd' }], 1], + [[null, { id: 5, value: 'e' }], 1], + ]), + ) + input.sendFrontier(1) + graph.run() + + // Should have elements starting from offset 2: c -> d -> e + const initialResult = allMessages[0].collection.getInner() + expect(initialResult.length).toBe(3) + verifyLinkedListOrder(initialResult, ['c', 'd', 'e'], null) + + // Check that we have the right elements + const values = initialResult.map(([[_, [value, __]]]) => value.value).sort() + expect(values).toEqual(['c', 'd', 'e']) + + // Verify that 'c' has null as previous reference since it's the first in the result + const valueToEntry = new Map() + for (const [[_, [value, prevRef]]] of initialResult) { + valueToEntry.set(value.value, prevRef) + } + expect(valueToEntry.get('c')).toBe(null) + }) + + it('should handle limit and offset together', () => { + const graph = new D2({ initialFrontier: 0 }) + const input = graph.newInput<[null, { id: number; value: string }]>() + const allMessages: any[] = [] + + input.pipe( + topKWithPreviousRef( + (a, b) => a.value.localeCompare(b.value), + { + previousRef: (value) => value.id, + nullValue: null, + offset: 1, + limit: 2, + } + ), + output((message) => { + if (message.type === MessageType.DATA) { + allMessages.push(message.data) + } + }), + ) + + graph.finalize() + + // Initial data - a, b, c, d, e + input.sendData( + 0, + new MultiSet([ + [[null, { id: 1, value: 'a' }], 1], + [[null, { id: 2, value: 'b' }], 1], + [[null, { id: 3, value: 'c' }], 1], + [[null, { id: 4, value: 'd' }], 1], + [[null, { id: 5, value: 'e' }], 1], + ]), + ) + input.sendFrontier(1) + graph.run() + + // Should have elements from offset 1 with limit 2: b -> c + const initialResult = allMessages[0].collection.getInner() + expect(initialResult.length).toBe(2) + verifyLinkedListOrder(initialResult, ['b', 'c'], null) + + // Check that we have the right elements + const values = initialResult.map(([[_, [value, __]]]) => value.value).sort() + expect(values).toEqual(['b', 'c']) + + // Verify that 'b' has null as previous reference since it's the first in the result + const valueToEntry = new Map() + for (const [[_, [value, prevRef]]] of initialResult) { + valueToEntry.set(value.value, prevRef) + } + expect(valueToEntry.get('b')).toBe(null) + expect(valueToEntry.get('c')).toBe(2) // Previous ref to 'b' (id: 2) + }) + + it('should handle different reference types', () => { + const graph = new D2({ initialFrontier: 0 }) + const input = graph.newInput<[null, { id: string; value: string }]>() + const allMessages: any[] = [] + + input.pipe( + topKWithPreviousRef( + (a, b) => a.value.localeCompare(b.value), + { + previousRef: (value) => value.id, + nullValue: 'START', + } + ), + output((message) => { + if (message.type === MessageType.DATA) { + allMessages.push(message.data) + } + }), + ) + + graph.finalize() + + // Initial data with string IDs + input.sendData( + 0, + new MultiSet([ + [[null, { id: 'item_3', value: 'c' }], 1], + [[null, { id: 'item_1', value: 'a' }], 1], + [[null, { id: 'item_2', value: 'b' }], 1], + ]), + ) + input.sendFrontier(1) + graph.run() + + // Initial result: a -> b -> c + const initialResult = allMessages[0].collection.getInner() + expect(initialResult.length).toBe(3) + verifyLinkedListOrder(initialResult, ['a', 'b', 'c'], 'START') + + // Check specific previous references with string IDs + const valueToEntry = new Map() + for (const [[_, [value, prevRef]]] of initialResult) { + valueToEntry.set(value.value, { value, prevRef }) + } + + // 'a' should have 'START' as previous reference (first element) + expect(valueToEntry.get('a').prevRef).toBe('START') + // 'b' should have 'item_1' as previous reference + expect(valueToEntry.get('b').prevRef).toBe('item_1') + // 'c' should have 'item_2' as previous reference + expect(valueToEntry.get('c').prevRef).toBe('item_2') + }) + }) +}) \ No newline at end of file