From 9959e18aabacf854c746613930fa1a32c96f8db1 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sat, 15 Mar 2025 19:44:52 +0000 Subject: [PATCH 1/5] sorted set and map types --- packages/d2ts/src/utils.ts | 282 +++++++++++++++ packages/d2ts/tests/utils.test.ts | 549 +++++++++++++++++++++++++++++- tsconfig.json | 2 +- 3 files changed, 831 insertions(+), 2 deletions(-) diff --git a/packages/d2ts/src/utils.ts b/packages/d2ts/src/utils.ts index 33b345e..9523121 100644 --- a/packages/d2ts/src/utils.ts +++ b/packages/d2ts/src/utils.ts @@ -52,6 +52,288 @@ export class DefaultMap extends Map { } } +/** + * A sorted set that maintains a sorted list of values. + * + * This implementation maintains a cache of the sorted values, which is invalidated + * when the set is modified. + * There will be more performant implementations, particularly if you can drop down + * to a lower level language, but in javascript this is probably enough for most + * use cases. + */ +export class SortedSet extends Set { + private compare?: (a: T, b: T) => number + private sortedValuesCache?: T[] + + constructor(values?: Iterable, compare?: (a: T, b: T) => number) { + super(values) + this.compare = compare + this.sortedValuesCache = undefined + } + + private invalidateCache() { + this.sortedValuesCache = undefined + } + + private get sortedValues() { + if (!this.sortedValuesCache) { + // Use super.values() to avoid calling our overridden values() method + // which would cause an infinite recursion + this.sortedValuesCache = Array.from(super.values()).sort(this.compare) + } + return this.sortedValuesCache + } + + add(value: T): this { + if (!this.has(value)) { + super.add(value) + this.invalidateCache() + } + return this + } + + asArray(): T[] { + return [...this.sortedValues] + } + + clear(): void { + super.clear() + this.invalidateCache() + } + + delete(value: T): boolean { + if (super.delete(value)) { + this.invalidateCache() + return true + } + return false + } + + entries(): IterableIterator<[T, T]> { + const sortedValues = this.sortedValues + function* entries() { + for (const value of sortedValues!.values()) { + yield [value, value] as [T, T] + } + } + return entries() + } + + findIndex(predicate: (value: T) => boolean): number { + return this.sortedValues.findIndex(predicate) + } + + findLastIndex(predicate: (value: T) => boolean): number { + return this.sortedValues.findLastIndex(predicate) + } + + forEach( + callbackfn: (value: T, value2: T, set: SortedSet) => void, + thisArg?: any, + ): void { + const sortedValues = this.sortedValues + if (thisArg) { + sortedValues!.forEach((value, _) => + callbackfn.call(thisArg, value, value, this), + ) + } else { + sortedValues!.forEach((value, _) => callbackfn(value, value, this)) + } + } + + indexOf(value: T): number { + return this.sortedValues.indexOf(value) + } + + keys(): IterableIterator { + return this.values() + } + + lastIndexOf(value: T): number { + return this.sortedValues.lastIndexOf(value) + } + + pop(): T | undefined { + const value = this.sortedValues.pop() + if (value) { + this.delete(value) + } + return value + } + + shift(): T | undefined { + const value = this.sortedValues.shift() + if (value) { + this.delete(value) + } + return value + } + + slice(start: number, end: number): SortedSet { + return new SortedSet(this.sortedValues.slice(start, end)) + } + + values(): IterableIterator { + return this.sortedValues.values() + } + + valueAt(index: number): T { + return this.sortedValues[index] + } + + [Symbol.iterator](): IterableIterator { + return this.values() + } +} + +/** + * A sorted map that maintains a sorted list of keys. + * + * Uses a `SortedSet` to store the keys, and a `Map` to store the values. + */ +export class SortedMap extends Map { + private sortedKeys: SortedSet + + constructor(entries?: Iterable<[K, V]>, compare?: (a: K, b: K) => number) { + super(entries) + this.sortedKeys = new SortedSet( + (entries ? [...entries] : []).map(([key]) => key), + compare, + ) + } + + asMap(): Map { + return new Map(this) + } + + clear(): void { + super.clear() + this.sortedKeys!.clear() + } + + delete(key: K): boolean { + if (super.delete(key)) { + this.sortedKeys!.delete(key) + return true + } + return false + } + + entries(): IterableIterator<[K, V]> { + const sortedKeys = this.sortedKeys + const map = this + function* entries() { + for (const key of sortedKeys.values()) { + yield [key, map.get(key)!] as [K, V] + } + } + return entries() + } + + findIndex(predicate: (key: K) => boolean): number { + return this.sortedKeys.findIndex(predicate) + } + + findIndexValue(predicate: (value: V) => boolean): number { + return this.sortedKeys.findIndex((key) => predicate(this.get(key)!)) + } + + findLastIndex(predicate: (key: K) => boolean): number { + return this.sortedKeys.findLastIndex(predicate) + } + + findLastIndexValue(predicate: (value: V) => boolean): number { + return this.sortedKeys.findLastIndex((key) => predicate(this.get(key)!)) + } + + forEach( + callbackfn: (value: V, key: K, map: SortedMap) => void, + thisArg?: any, + ): void { + const sortedKeys = this.sortedKeys + const map = this + if (thisArg) { + sortedKeys.forEach((key, _) => + callbackfn.call(thisArg, map.get(key)!, key, map), + ) + } else { + sortedKeys.forEach((key, _) => callbackfn(map.get(key)!, key, map)) + } + } + + indexOf(key: K): number { + return this.sortedKeys.indexOf(key) + } + + indexOfValue(value: V): number { + return this.sortedKeys.findIndex((key) => this.get(key)! === value) + } + + keys(): IterableIterator { + return this.sortedKeys.values() + } + + lastIndexOf(key: K): number { + return this.sortedKeys.lastIndexOf(key) + } + + lastIndexOfValue(value: V): number { + return this.sortedKeys.findLastIndex((key) => this.get(key)! === value) + } + + pop(): [K, V] { + const key = this.sortedKeys.pop() + if (key) { + const value = this.get(key)! + this.delete(key) + return [key, value] + } + throw new Error('SortedMap is empty') + } + + set(key: K, value: V): this { + super.set(key, value) + // We need to check if the sortedKeys is already initialized as `add` will be + // called using the constructor, which will call `super.set` again + this.sortedKeys?.add(key) + return this + } + + shift(): [K, V] { + const key = this.sortedKeys.shift() + if (key) { + const value = this.get(key)! + this.delete(key) + return [key, value] + } + throw new Error('SortedMap is empty') + } + + slice(start: number, end: number): SortedMap { + return new SortedMap( + this.sortedKeys + .slice(start, end) + .asArray() + .map((key) => [key, this.get(key)!]), + ) + } + + values(): IterableIterator { + const sortedKeys = this.sortedKeys + const map = this + function* values() { + for (const key of sortedKeys.values()) { + yield map.get(key)! + } + } + return values() + } + + [Symbol.iterator](): IterableIterator<[K, V]> { + return this.entries() + } +} + // JS engines have various limits on how many args can be passed to a function // with a spread operator, so we need to split the operation into chunks // 32767 is the max for Chrome 14, all others are higher diff --git a/packages/d2ts/tests/utils.test.ts b/packages/d2ts/tests/utils.test.ts index ac95d1c..d15c407 100644 --- a/packages/d2ts/tests/utils.test.ts +++ b/packages/d2ts/tests/utils.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from 'vitest' -import { DefaultMap, WeakRefMap } from '../src/utils.js' +import { DefaultMap, WeakRefMap, SortedSet, SortedMap } from '../src/utils.js' describe('DefaultMap', () => { it('should return default value for missing keys', () => { @@ -76,3 +76,550 @@ describe('WeakRefMap', () => { // } // }) }) + +describe('SortedSet', () => { + it('should maintain sorted order of values', () => { + const set = new SortedSet([3, 1, 4, 2], (a, b) => a - b) + expect(Array.from(set)).toEqual([1, 2, 3, 4]) + }) + + it('should work with default comparison', () => { + const set = new SortedSet([3, 1, 4, 2]) + // Default sort converts to strings, so 1, 2, 3, 4 is expected + expect(Array.from(set)).toEqual([1, 2, 3, 4]) + }) + + it('should maintain sorted order when adding values', () => { + const set = new SortedSet([], (a, b) => a - b) + set.add(3) + set.add(1) + set.add(4) + set.add(2) + expect(Array.from(set)).toEqual([1, 2, 3, 4]) + }) + + it('should not add duplicate values', () => { + const set = new SortedSet([1, 2, 3], (a, b) => a - b) + set.add(2) + expect(Array.from(set)).toEqual([1, 2, 3]) + expect(set.size).toBe(3) + }) + + it('should return values as array with asArray()', () => { + const set = new SortedSet([3, 1, 4, 2], (a, b) => a - b) + expect(set.asArray()).toEqual([1, 2, 3, 4]) + }) + + it('should clear all values', () => { + const set = new SortedSet([1, 2, 3], (a, b) => a - b) + set.clear() + expect(set.size).toBe(0) + expect(Array.from(set)).toEqual([]) + }) + + it('should delete values', () => { + const set = new SortedSet([1, 2, 3, 4], (a, b) => a - b) + expect(set.delete(2)).toBe(true) + expect(Array.from(set)).toEqual([1, 3, 4]) + expect(set.delete(5)).toBe(false) + }) + + it('should iterate entries in sorted order', () => { + const set = new SortedSet([3, 1, 4, 2], (a, b) => a - b) + const entries = Array.from(set.entries()) + expect(entries).toEqual([ + [1, 1], + [2, 2], + [3, 3], + [4, 4], + ]) + }) + + it('should find index of value with predicate', () => { + const set = new SortedSet([1, 2, 3, 4, 5], (a, b) => a - b) + expect(set.findIndex((v) => v > 3)).toBe(3) + expect(set.findIndex((v) => v === 2)).toBe(1) + expect(set.findIndex((v) => v > 10)).toBe(-1) + }) + + it('should find last index of value with predicate', () => { + const set = new SortedSet([1, 2, 3, 4, 5], (a, b) => a - b) + expect(set.findLastIndex((v) => v < 4)).toBe(2) + expect(set.findLastIndex((v) => v === 5)).toBe(4) + expect(set.findLastIndex((v) => v < 0)).toBe(-1) + }) + + it('should execute forEach in sorted order', () => { + const set = new SortedSet([3, 1, 4, 2], (a, b) => a - b) + const result: number[] = [] + set.forEach((value) => { + result.push(value) + }) + expect(result).toEqual([1, 2, 3, 4]) + }) + + it('should find index of value', () => { + const set = new SortedSet([1, 2, 3, 4, 5], (a, b) => a - b) + expect(set.indexOf(3)).toBe(2) + expect(set.indexOf(6)).toBe(-1) + }) + + it('should iterate keys in sorted order', () => { + const set = new SortedSet([3, 1, 4, 2], (a, b) => a - b) + const keys = Array.from(set.keys()) + expect(keys).toEqual([1, 2, 3, 4]) + }) + + it('should find last index of value', () => { + const set = new SortedSet([1, 2, 3, 3, 4, 5], (a, b) => a - b) + expect(set.lastIndexOf(3)).toBe(2) // Only one 3 should be in the set + expect(set.lastIndexOf(6)).toBe(-1) + }) + + it('should pop the last value', () => { + const set = new SortedSet([1, 2, 3, 4], (a, b) => a - b) + expect(set.pop()).toBe(4) + expect(Array.from(set)).toEqual([1, 2, 3]) + expect(set.size).toBe(3) + }) + + it('should shift the first value', () => { + const set = new SortedSet([1, 2, 3, 4], (a, b) => a - b) + expect(set.shift()).toBe(1) + expect(Array.from(set)).toEqual([2, 3, 4]) + expect(set.size).toBe(3) + }) + + it('should slice a range of values', () => { + const set = new SortedSet([1, 2, 3, 4, 5], (a, b) => a - b) + const sliced = set.slice(1, 4) + expect(Array.from(sliced)).toEqual([2, 3, 4]) + expect(sliced.size).toBe(3) + }) + + it('should iterate values in sorted order', () => { + const set = new SortedSet([3, 1, 4, 2], (a, b) => a - b) + const values = Array.from(set.values()) + expect(values).toEqual([1, 2, 3, 4]) + }) + + it('should get value at index', () => { + const set = new SortedSet([1, 2, 3, 4, 5], (a, b) => a - b) + expect(set.valueAt(2)).toBe(3) + }) + + it('should work with custom objects', () => { + type Person = { name: string; age: number } + const people: Person[] = [ + { name: 'Alice', age: 30 }, + { name: 'Bob', age: 25 }, + { name: 'Charlie', age: 35 }, + ] + + const set = new SortedSet(people, (a, b) => a.age - b.age) + const sorted = set.asArray() + + expect(sorted[0].name).toBe('Bob') + expect(sorted[1].name).toBe('Alice') + expect(sorted[2].name).toBe('Charlie') + }) + + it('should work with Symbol.iterator', () => { + const set = new SortedSet([3, 1, 4, 2], (a, b) => a - b) + const result = [...set] + expect(result).toEqual([1, 2, 3, 4]) + }) +}) + +describe('SortedMap', () => { + it('should maintain sorted order of keys', () => { + const map = new SortedMap( + [ + [3, 'three'], + [1, 'one'], + [4, 'four'], + [2, 'two'], + ], + (a, b) => a - b, + ) + expect(Array.from(map.keys())).toEqual([1, 2, 3, 4]) + }) + + it('should work with default comparison', () => { + const map = new SortedMap([ + [3, 'three'], + [1, 'one'], + [4, 'four'], + [2, 'two'], + ]) + // Default sort converts to strings, so 1, 2, 3, 4 is expected + expect(Array.from(map.keys())).toEqual([1, 2, 3, 4]) + }) + + it('should maintain sorted order when adding values', () => { + const map = new SortedMap([], (a, b) => a - b) + map.set(3, 'three') + map.set(1, 'one') + map.set(4, 'four') + map.set(2, 'two') + expect(Array.from(map.keys())).toEqual([1, 2, 3, 4]) + }) + + it('should update values for existing keys', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + ], + (a, b) => a - b, + ) + map.set(1, 'ONE') + expect(map.get(1)).toBe('ONE') + expect(Array.from(map.keys())).toEqual([1, 2]) + }) + + it('should convert to a regular Map with asMap()', () => { + const sortedMap = new SortedMap( + [ + [3, 'three'], + [1, 'one'], + [2, 'two'], + ], + (a, b) => a - b, + ) + const regularMap = sortedMap.asMap() + + expect(regularMap).toBeInstanceOf(Map) + expect(regularMap).not.toBeInstanceOf(SortedMap) + expect(regularMap.get(1)).toBe('one') + expect(regularMap.get(2)).toBe('two') + expect(regularMap.get(3)).toBe('three') + }) + + it('should clear all entries', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + ], + (a, b) => a - b, + ) + map.clear() + expect(map.size).toBe(0) + expect(Array.from(map.entries())).toEqual([]) + }) + + it('should delete entries', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + ], + (a, b) => a - b, + ) + expect(map.delete(2)).toBe(true) + expect(Array.from(map.keys())).toEqual([1, 3, 4]) + expect(map.delete(5)).toBe(false) + }) + + it('should iterate entries in sorted order', () => { + const map = new SortedMap( + [ + [3, 'three'], + [1, 'one'], + [4, 'four'], + [2, 'two'], + ], + (a, b) => a - b, + ) + const entries = Array.from(map.entries()) + expect(entries).toEqual([ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + ]) + }) + + it('should find index of key with predicate', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + [5, 'five'], + ], + (a, b) => a - b, + ) + expect(map.findIndex((k) => k > 3)).toBe(3) + expect(map.findIndex((k) => k === 2)).toBe(1) + expect(map.findIndex((k) => k > 10)).toBe(-1) + }) + + it('should find index of value with predicate', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + [5, 'five'], + ], + (a, b) => a - b, + ) + expect(map.findIndexValue((v) => v.startsWith('f'))).toBe(3) // 'four' is at index 3 + expect(map.findIndexValue((v) => v === 'two')).toBe(1) + expect(map.findIndexValue((v) => v === 'six')).toBe(-1) + }) + + it('should find last index of key with predicate', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + [5, 'five'], + ], + (a, b) => a - b, + ) + expect(map.findLastIndex((k) => k < 4)).toBe(2) + expect(map.findLastIndex((k) => k === 5)).toBe(4) + expect(map.findLastIndex((k) => k < 0)).toBe(-1) + }) + + it('should find last index of value with predicate', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + [5, 'five'], + ], + (a, b) => a - b, + ) + expect(map.findLastIndexValue((v) => v.includes('e'))).toBe(4) // 'five' is the last with 'e' + expect(map.findLastIndexValue((v) => v === 'one')).toBe(0) + expect(map.findLastIndexValue((v) => v === 'six')).toBe(-1) + }) + + it('should execute forEach in sorted order', () => { + const map = new SortedMap( + [ + [3, 'three'], + [1, 'one'], + [4, 'four'], + [2, 'two'], + ], + (a, b) => a - b, + ) + const result: Array<[number, string]> = [] + map.forEach((value, key) => { + result.push([key, value]) + }) + expect(result).toEqual([ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + ]) + }) + + it('should find index of key', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + [5, 'five'], + ], + (a, b) => a - b, + ) + expect(map.indexOf(3)).toBe(2) + expect(map.indexOf(6)).toBe(-1) + }) + + it('should find index of value', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + [5, 'five'], + ], + (a, b) => a - b, + ) + expect(map.indexOfValue('three')).toBe(2) + expect(map.indexOfValue('six')).toBe(-1) + }) + + it('should iterate keys in sorted order', () => { + const map = new SortedMap( + [ + [3, 'three'], + [1, 'one'], + [4, 'four'], + [2, 'two'], + ], + (a, b) => a - b, + ) + const keys = Array.from(map.keys()) + expect(keys).toEqual([1, 2, 3, 4]) + }) + + it('should find last index of key', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + [5, 'five'], + ], + (a, b) => a - b, + ) + expect(map.lastIndexOf(3)).toBe(2) + expect(map.lastIndexOf(6)).toBe(-1) + }) + + it('should find last index of value', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + [5, 'five'], + ], + (a, b) => a - b, + ) + // Add a duplicate value to test lastIndexOfValue + map.set(6, 'three') + expect(map.lastIndexOfValue('three')).toBe(5) // Index of key 6 + expect(map.lastIndexOfValue('six')).toBe(-1) + }) + + it('should pop the last entry', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + ], + (a, b) => a - b, + ) + expect(map.pop()).toEqual([4, 'four']) + expect(map.size).toBe(3) + expect(Array.from(map.keys())).toEqual([1, 2, 3]) + }) + + it('should throw when popping from an empty map', () => { + const map = new SortedMap([], (a, b) => a - b) + expect(() => map.pop()).toThrow('SortedMap is empty') + }) + + it('should shift the first entry', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + ], + (a, b) => a - b, + ) + expect(map.shift()).toEqual([1, 'one']) + expect(map.size).toBe(3) + expect(Array.from(map.keys())).toEqual([2, 3, 4]) + }) + + it('should throw when shifting from an empty map', () => { + const map = new SortedMap([], (a, b) => a - b) + expect(() => map.shift()).toThrow('SortedMap is empty') + }) + + it('should slice a range of entries', () => { + const map = new SortedMap( + [ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + [5, 'five'], + ], + (a, b) => a - b, + ) + const sliced = map.slice(1, 4) + expect(sliced.size).toBe(3) + expect(Array.from(sliced.entries())).toEqual([ + [2, 'two'], + [3, 'three'], + [4, 'four'], + ]) + }) + + it('should iterate values in sorted key order', () => { + const map = new SortedMap( + [ + [3, 'three'], + [1, 'one'], + [4, 'four'], + [2, 'two'], + ], + (a, b) => a - b, + ) + const values = Array.from(map.values()) + expect(values).toEqual(['one', 'two', 'three', 'four']) + }) + + it('should work with Symbol.iterator', () => { + const map = new SortedMap( + [ + [3, 'three'], + [1, 'one'], + [4, 'four'], + [2, 'two'], + ], + (a, b) => a - b, + ) + const result = [...map] + expect(result).toEqual([ + [1, 'one'], + [2, 'two'], + [3, 'three'], + [4, 'four'], + ]) + }) + + it('should work with custom objects as keys', () => { + type Person = { name: string; age: number } + const alice = { name: 'Alice', age: 30 } + const bob = { name: 'Bob', age: 25 } + const charlie = { name: 'Charlie', age: 35 } + + const map = new SortedMap( + [ + [alice, 'developer'], + [bob, 'designer'], + [charlie, 'manager'], + ], + (a, b) => a.age - b.age, + ) + + const keys = Array.from(map.keys()) + expect(keys[0].name).toBe('Bob') + expect(keys[1].name).toBe('Alice') + expect(keys[2].name).toBe('Charlie') + + const values = Array.from(map.values()) + expect(values).toEqual(['designer', 'developer', 'manager']) + }) +}) diff --git a/tsconfig.json b/tsconfig.json index ad64df8..db73f9f 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -11,7 +11,7 @@ "esModuleInterop": true, "forceConsistentCasingInFileNames": true, "isolatedModules": true, - "lib": ["ES2021", "DOM", "DOM.Iterable"], + "lib": ["ES2023", "DOM", "DOM.Iterable"], "module": "ESNext", "moduleResolution": "Bundler", "noFallthroughCasesInSwitch": true, From 958caffce497f46cb746b7e33191158eca59d047 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sun, 16 Mar 2025 11:45:21 +0000 Subject: [PATCH 2/5] WIP Cache --- packages/d2ts/src/cache.ts | 114 +++++++++++++++++++++++++++++ packages/d2ts/src/d2.ts | 27 +++++++ packages/d2ts/src/graph.ts | 1 + packages/d2ts/src/types.ts | 3 + packages/d2ts/src/version-index.ts | 4 + packages/d2ts/tests/cache.test.ts | 110 ++++++++++++++++++++++++++++ 6 files changed, 259 insertions(+) create mode 100644 packages/d2ts/src/cache.ts create mode 100644 packages/d2ts/tests/cache.test.ts diff --git a/packages/d2ts/src/cache.ts b/packages/d2ts/src/cache.ts new file mode 100644 index 0000000..11b7661 --- /dev/null +++ b/packages/d2ts/src/cache.ts @@ -0,0 +1,114 @@ +import { + DataMessage, + ID2, + IStreamBuilder, + Message, + MessageType, + KeyValue, +} from './types.js' +import { Index } from './version-index.js' +import { output } from './operators/output.js' +import { Antichain, Version } from './order.js' +import { MultiSet } from './multiset.js' +import { DefaultMap } from './utils.js' + +export interface CacheOptions { + indexedBy?: { + [key: string]: (item: T) => unknown + } +} + +export interface PipeIntoOptions { + whereKey?: string + where?: Record +} + +export class Cache { + #index = new Index() + #options: CacheOptions> + #stream: IStreamBuilder> + #subscribers = new Set>>() + + constructor( + stream: IStreamBuilder>, + options?: CacheOptions>, + ) { + this.#stream = stream + this.#options = options ?? {} + this.#stream.pipe( + output((message) => { + this.#handleInputMessage(message) + }), + ) + } + + #handleInputMessage(message: Message>): void { + if (message.type === MessageType.DATA) { + const { version, collection } = message.data as DataMessage< + KeyValue + > + for (const [item, multiplicity] of collection.getInner()) { + const [key, value] = item + this.#index.addValue(key, version, [value, multiplicity]) + } + } else if (message.type === MessageType.FRONTIER) { + const frontier = message.data as Antichain + this.#index.compact(frontier) + } + this.#broadcast(message) + } + + #broadcast(message: Message>): void { + if (message.type === MessageType.DATA) { + const { version, collection } = message.data + for (const subscriber of this.#subscribers) { + subscriber.writer.sendData(version, collection) + } + } else if (message.type === MessageType.FRONTIER) { + const frontier = message.data as Antichain + this.#index.compact(frontier) + for (const subscriber of this.#subscribers) { + subscriber.writer.sendFrontier(frontier) + } + } + } + + #sendHistory(input: IStreamBuilder>): void { + const versionedData = new DefaultMap(() => []) + for (const key of this.#index.keys()) { + for (const [version, values] of this.#index.get(key)) { + for (const [value, multiplicity] of values) { + versionedData.get(version).push([[key, value], multiplicity]) + } + } + } + const sortedVersions = Array.from(versionedData.keys()).sort((a, b) => + a.lessThan(b) ? -1 : 1, + ) + for (const version of sortedVersions) { + input.writer.sendData(version, new MultiSet(versionedData.get(version))) + } + const frontier = this.#index.frontier + if (frontier) { + input.writer.sendFrontier(frontier) + } + } + + pipeInto( + graph: ID2, + options: PipeIntoOptions = {}, + ): IStreamBuilder> { + const input = graph.newInput>() + this.#subscribers.add(input) + + graph.addStartupSubscriber(() => { + this.#sendHistory(input) + }) + + graph.addTeardownSubscriber(() => { + this.#subscribers.delete(input) + }) + + return input + } +} diff --git a/packages/d2ts/src/d2.ts b/packages/d2ts/src/d2.ts index 0118c5f..40ded7f 100644 --- a/packages/d2ts/src/d2.ts +++ b/packages/d2ts/src/d2.ts @@ -18,6 +18,9 @@ export class D2 implements ID2 { #frontierStack: Antichain[] = [] #nextOperatorId = 0 #finalized = false + #closed = false + #startupSubscribers: (() => void)[] = [] + #teardownSubscribers: (() => void)[] = [] constructor({ initialFrontier }: D2Options) { this.#frontierStack = [Antichain.create(initialFrontier)] @@ -68,12 +71,18 @@ export class D2 implements ID2 { finalize() { this.#checkNotFinalized() this.#finalized = true + for (const subscriber of this.#startupSubscribers) { + subscriber() + } } step(): void { if (!this.#finalized) { throw new Error('Graph not finalized') } + if (this.#closed) { + throw new Error('Graph closed') + } for (const op of this.#operators) { op.run() } @@ -88,6 +97,24 @@ export class D2 implements ID2 { this.step() } } + + close(): void { + if (this.#closed) { + throw new Error('Graph already closed') + } + this.#closed = true + for (const subscriber of this.#teardownSubscribers) { + subscriber() + } + } + + addStartupSubscriber(subscriber: () => void): void { + this.#startupSubscribers.push(subscriber) + } + + addTeardownSubscriber(subscriber: () => void): void { + this.#teardownSubscribers.push(subscriber) + } } export class StreamBuilder implements IStreamBuilder { diff --git a/packages/d2ts/src/graph.ts b/packages/d2ts/src/graph.ts index 4e2690c..2c384d2 100644 --- a/packages/d2ts/src/graph.ts +++ b/packages/d2ts/src/graph.ts @@ -82,6 +82,7 @@ export class DifferenceStreamWriter implements IDifferenceStreamWriter { data: dataMessage, }) } + // console.log('sendData', JSON.stringify(this.#queues, null, 2)) } sendFrontier(frontier: Antichain | Version | number | number[]): void { diff --git a/packages/d2ts/src/types.ts b/packages/d2ts/src/types.ts index e53075b..7d6125d 100644 --- a/packages/d2ts/src/types.ts +++ b/packages/d2ts/src/types.ts @@ -59,6 +59,9 @@ export interface ID2 { popFrontier(): void finalize(): void step(): void + close(): void + addStartupSubscriber(subscriber: () => void): void + addTeardownSubscriber(subscriber: () => void): void } export interface IStreamBuilder { diff --git a/packages/d2ts/src/version-index.ts b/packages/d2ts/src/version-index.ts index 87ae1de..c6a64c9 100644 --- a/packages/d2ts/src/version-index.ts +++ b/packages/d2ts/src/version-index.ts @@ -44,6 +44,10 @@ export class Index implements IndexType { this.#modifiedKeys = new Set() } + get frontier(): Antichain | null { + return this.#compactionFrontier + } + toString(indent = false): string { return `Index(${JSON.stringify( [...this.#inner].map(([k, v]) => [k, [...v.entries()]]), diff --git a/packages/d2ts/tests/cache.test.ts b/packages/d2ts/tests/cache.test.ts new file mode 100644 index 0000000..11f7bc4 --- /dev/null +++ b/packages/d2ts/tests/cache.test.ts @@ -0,0 +1,110 @@ +import { describe, expect, test } from 'vitest' +import { + Message, + MessageType, + DataMessage, + FrontierMessage, +} from '../src/types.js' +import { MultiSet } from '../src/multiset.js' +import { D2 } from '../src/d2.js' +import { Cache } from '../src/cache.js' +import { map } from '../src/operators/map.js' +import { output } from '../src/operators/output.js' +import { Antichain } from '../src/order.js' + +describe('Cache', () => { + test('sends initial data to subscribers', () => { + const baseGraph = new D2({ initialFrontier: 0 }) + const baseInput = baseGraph.newInput<[string, number]>() + const mapped = baseInput.pipe( + map(([key, value]) => [key, value * 2] as [string, number]), + ) + const cache = new Cache(mapped) + baseGraph.finalize() + + baseInput.sendData( + 0, + new MultiSet<[string, number]>([ + [['a', 1], 1], + [['b', 2], 1], + ]), + ) + baseInput.sendFrontier(1) + + baseGraph.run() + + const newGraph = new D2({ initialFrontier: 0 }) + const messages: Message<[string, number]>[] = [] + + cache.pipeInto(newGraph).pipe(output((message) => messages.push(message))) + newGraph.finalize() + newGraph.run() + + expect(messages.length).toEqual(2) + + { + const msg1 = messages[0] + expect(msg1.type).toEqual(MessageType.DATA) + const data = msg1.data as DataMessage<[string, number]> + expect(data.version.getInner()).toEqual([1]) + expect(data.collection.getInner()).toEqual([ + [['a', 2], 1], + [['b', 4], 1], + ]) + } + { + const msg2 = messages[1] + expect(msg2.type).toEqual(MessageType.FRONTIER) + } + }) + + test('sends incrimental data to subscribers', () => { + const baseGraph = new D2({ initialFrontier: 0 }) + const baseInput = baseGraph.newInput<[string, number]>() + const mapped = baseInput.pipe( + map(([key, value]) => [key, value * 2] as [string, number]), + ) + const cache = new Cache(mapped) + baseGraph.finalize() + + baseGraph.run() + + const newGraph = new D2({ initialFrontier: 0 }) + const messages: Message<[string, number]>[] = [] + + cache.pipeInto(newGraph).pipe(output((message) => messages.push(message))) + newGraph.finalize() + newGraph.run() + + expect(messages.length).toEqual(0) + + baseInput.sendData( + 1, + new MultiSet<[string, number]>([ + [['a', 1], 1], + [['b', 2], 1], + ]), + ) + baseInput.sendFrontier(2) + + baseGraph.run() + newGraph.run() + + expect(messages.length).toEqual(2) + + { + const msg1 = messages[0] + expect(msg1.type).toEqual(MessageType.DATA) + const data = msg1.data as DataMessage<[string, number]> + expect(data.version.getInner()).toEqual([1]) + expect(data.collection.getInner()).toEqual([ + [['a', 2], 1], + [['b', 4], 1], + ]) + } + { + const msg2 = messages[1] + expect(msg2.type).toEqual(MessageType.FRONTIER) + } + }) +}) From f51ec5179a114d447c8644fa69cf108af13ab31c Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sun, 16 Mar 2025 14:39:53 +0000 Subject: [PATCH 3/5] pipeInto with whereKey --- packages/d2ts/src/cache.ts | 53 +++- packages/d2ts/src/index-operators.ts | 48 ++++ packages/d2ts/src/utils.ts | 41 ++- packages/d2ts/src/version-index.ts | 28 +- packages/d2ts/tests/cache.test.ts | 372 ++++++++++++++++++++++++++- 5 files changed, 529 insertions(+), 13 deletions(-) create mode 100644 packages/d2ts/src/index-operators.ts diff --git a/packages/d2ts/src/cache.ts b/packages/d2ts/src/cache.ts index 11b7661..e979cf9 100644 --- a/packages/d2ts/src/cache.ts +++ b/packages/d2ts/src/cache.ts @@ -11,16 +11,18 @@ import { output } from './operators/output.js' import { Antichain, Version } from './order.js' import { MultiSet } from './multiset.js' import { DefaultMap } from './utils.js' +import { filter } from './operators/filter.js' +import { eq, IndexOperator } from './index-operators.js' export interface CacheOptions { indexedBy?: { - [key: string]: (item: T) => unknown + [key: string]: (item: T) => unknown | string } } -export interface PipeIntoOptions { - whereKey?: string - where?: Record +export interface PipeIntoOptions { + whereKey?: K | IndexOperator + where?: Record> } export class Cache { @@ -73,9 +75,29 @@ export class Cache { } } - #sendHistory(input: IStreamBuilder>): void { + #sendHistory( + input: IStreamBuilder>, + options: PipeIntoOptions = {}, + ): void { const versionedData = new DefaultMap(() => []) - for (const key of this.#index.keys()) { + + let keysToSend: K[] + if (options.whereKey) { + if (typeof options.whereKey === 'function') { + keysToSend = this.#index.matchKeys(options.whereKey as IndexOperator) + } else { + keysToSend = [options.whereKey] + } + } else { + keysToSend = this.#index.keys() + } + + if (options.where) { + // TODO: implement where + throw new Error('where is not supported yet') + } + + for (const key of keysToSend) { for (const [version, values] of this.#index.get(key)) { for (const [value, multiplicity] of values) { versionedData.get(version).push([[key, value], multiplicity]) @@ -96,15 +118,30 @@ export class Cache { pipeInto( graph: ID2, - options: PipeIntoOptions = {}, + options: PipeIntoOptions = {}, ): IStreamBuilder> { const input = graph.newInput>() this.#subscribers.add(input) graph.addStartupSubscriber(() => { - this.#sendHistory(input) + this.#sendHistory(input, options) }) + let pipeline = input + + if (options.whereKey) { + const operator = + typeof options.whereKey === 'function' + ? (options.whereKey as IndexOperator) + : (eq(options.whereKey) as IndexOperator) + pipeline = pipeline.pipe(filter(([key]) => operator(key))) + } + + if (options.where) { + // TODO: implement where + throw new Error('where is not supported yet') + } + graph.addTeardownSubscriber(() => { this.#subscribers.delete(input) }) diff --git a/packages/d2ts/src/index-operators.ts b/packages/d2ts/src/index-operators.ts new file mode 100644 index 0000000..6284efd --- /dev/null +++ b/packages/d2ts/src/index-operators.ts @@ -0,0 +1,48 @@ +export type IndexOperator = (key: K) => boolean + +export const eq = (matchKey: K): IndexOperator => { + return (key) => key === matchKey +} + +export function neq(matchKey: K): IndexOperator { + return (key) => key !== matchKey +} + +export function gt(matchKey: K): IndexOperator { + return (key) => key > matchKey +} + +export function gte(matchKey: K): IndexOperator { + return (key) => key >= matchKey +} + +export function lt(matchKey: K): IndexOperator { + return (key) => key < matchKey +} + +export function lte(matchKey: K): IndexOperator { + return (key) => key <= matchKey +} + +export function isIn(matchKeys: K[] | Set): IndexOperator { + const matchKeysSet = matchKeys instanceof Set ? matchKeys : new Set(matchKeys) + return (key) => { + return matchKeysSet.has(key) + } +} + +export function between(start: K, end: K): IndexOperator { + return (key) => key >= start && key <= end +} + +export function and(...operators: IndexOperator[]): IndexOperator { + return (key) => operators.every((op) => op(key)) +} + +export function or(...operators: IndexOperator[]): IndexOperator { + return (key) => operators.some((op) => op(key)) +} + +export function not(operator: IndexOperator): IndexOperator { + return (key) => !operator(key) +} diff --git a/packages/d2ts/src/utils.ts b/packages/d2ts/src/utils.ts index 9523121..6767f69 100644 --- a/packages/d2ts/src/utils.ts +++ b/packages/d2ts/src/utils.ts @@ -67,7 +67,15 @@ export class SortedSet extends Set { constructor(values?: Iterable, compare?: (a: T, b: T) => number) { super(values) - this.compare = compare + this.compare = + compare ?? + ((a, b) => { + if (typeof a === 'number' && typeof b === 'number') { + return a - b + } else { + return a > b ? 1 : -1 + } + }) this.sortedValuesCache = undefined } @@ -169,7 +177,7 @@ export class SortedSet extends Set { return value } - slice(start: number, end: number): SortedSet { + slice(start: number, end?: number): SortedSet { return new SortedSet(this.sortedValues.slice(start, end)) } @@ -334,6 +342,35 @@ export class SortedMap extends Map { } } +/** + * A sorted map that returns a default value for keys that are not present. + */ +export class SortedDefaultMap extends SortedMap { + constructor( + private defaultValue: () => V, + entries?: Iterable<[K, V]>, + ) { + super(entries) + } + + get(key: K): V { + if (!this.has(key)) { + this.set(key, this.defaultValue()) + } + return super.get(key)! + } + + /** + * Update the value for a key using a function. + */ + update(key: K, updater: (value: V) => V): V { + const value = this.get(key) + const newValue = updater(value) + this.set(key, newValue) + return newValue + } +} + // JS engines have various limits on how many args can be passed to a function // with a spread operator, so we need to split the operation into chunks // 32767 is the max for Chrome 14, all others are higher diff --git a/packages/d2ts/src/version-index.ts b/packages/d2ts/src/version-index.ts index c6a64c9..ec3bece 100644 --- a/packages/d2ts/src/version-index.ts +++ b/packages/d2ts/src/version-index.ts @@ -1,6 +1,7 @@ import { Version, Antichain } from './order.js' import { MultiSet } from './multiset.js' -import { DefaultMap, chunkedArrayPush } from './utils.js' +import { DefaultMap, chunkedArrayPush, SortedSet } from './utils.js' +import { IndexOperator } from './index-operators.js' type VersionMap = DefaultMap type IndexMap = DefaultMap> @@ -29,6 +30,7 @@ export class Index implements IndexType { #inner: IndexMap #compactionFrontier: Antichain | null #modifiedKeys: Set + #sortedKeys?: SortedSet constructor() { this.#inner = new DefaultMap>( @@ -106,6 +108,10 @@ export class Index implements IndexType { return this.keys().map((key) => [key, this.get(key)]) } + sortedEntries(): [K, VersionMap<[V, number]>][] { + return this.sortedKeys().map((key) => [key, this.get(key)]) + } + versions(key: K): Version[] { const result = Array.from(this.get(key).keys()) return result @@ -119,6 +125,9 @@ export class Index implements IndexType { return values }) this.#modifiedKeys.add(key) + if (this.#sortedKeys) { + this.#sortedKeys.add(key) + } } append(other: Index): void { @@ -131,6 +140,9 @@ export class Index implements IndexType { }) } this.#modifiedKeys.add(key) + if (this.#sortedKeys) { + this.#sortedKeys.add(key) + } } } @@ -258,12 +270,26 @@ export class Index implements IndexType { } this.#compactionFrontier = compactionFrontier + this.#sortedKeys = undefined // invalidate cache } keys(): K[] { return Array.from(this.#inner.keys()) } + sortedKeysSet(): SortedSet { + this.#sortedKeys ??= new SortedSet(this.keys()) + return this.#sortedKeys + } + + sortedKeys(): K[] { + return this.sortedKeysSet().asArray() + } + + matchKeys(operator: IndexOperator): K[] { + return this.keys().filter(operator) + } + has(key: K): boolean { return this.#inner.has(key) } diff --git a/packages/d2ts/tests/cache.test.ts b/packages/d2ts/tests/cache.test.ts index 11f7bc4..a32f71f 100644 --- a/packages/d2ts/tests/cache.test.ts +++ b/packages/d2ts/tests/cache.test.ts @@ -11,9 +11,10 @@ import { Cache } from '../src/cache.js' import { map } from '../src/operators/map.js' import { output } from '../src/operators/output.js' import { Antichain } from '../src/order.js' +import { gt, lt, eq, or, and, between, isIn } from '../src/index-operators.js' describe('Cache', () => { - test('sends initial data to subscribers', () => { + test('sends all initial data to subscribers', () => { const baseGraph = new D2({ initialFrontier: 0 }) const baseInput = baseGraph.newInput<[string, number]>() const mapped = baseInput.pipe( @@ -58,7 +59,7 @@ describe('Cache', () => { } }) - test('sends incrimental data to subscribers', () => { + test('sends all incremental data to subscribers', () => { const baseGraph = new D2({ initialFrontier: 0 }) const baseInput = baseGraph.newInput<[string, number]>() const mapped = baseInput.pipe( @@ -107,4 +108,371 @@ describe('Cache', () => { expect(msg2.type).toEqual(MessageType.FRONTIER) } }) + + test('sends initial data that matched whereKey to subscribers', () => { + const baseGraph = new D2({ initialFrontier: 0 }) + const baseInput = baseGraph.newInput<[string, number]>() + const mapped = baseInput.pipe( + map(([key, value]) => [key, value * 2] as [string, number]), + ) + const cache = new Cache(mapped) + baseGraph.finalize() + + baseInput.sendData( + 0, + new MultiSet<[string, number]>([ + [['a', 1], 1], + [['b', 2], 1], + ]), + ) + baseInput.sendFrontier(1) + + baseGraph.run() + + const newGraph = new D2({ initialFrontier: 0 }) + const messages: Message<[string, number]>[] = [] + + cache + .pipeInto(newGraph, { whereKey: 'a' }) + .pipe(output((message) => messages.push(message))) + newGraph.finalize() + newGraph.run() + + expect(messages.length).toEqual(2) + + { + const msg1 = messages[0] + expect(msg1.type).toEqual(MessageType.DATA) + const data = msg1.data as DataMessage<[string, number]> + expect(data.version.getInner()).toEqual([1]) + expect(data.collection.getInner()).toEqual([[['a', 2], 1]]) + } + { + const msg2 = messages[1] + expect(msg2.type).toEqual(MessageType.FRONTIER) + } + }) + + test('sends all incremental that matched whereKey data to subscribers', () => { + const baseGraph = new D2({ initialFrontier: 0 }) + const baseInput = baseGraph.newInput<[string, number]>() + const mapped = baseInput.pipe( + map(([key, value]) => [key, value * 2] as [string, number]), + ) + const cache = new Cache(mapped) + baseGraph.finalize() + + baseGraph.run() + + const newGraph = new D2({ initialFrontier: 0 }) + const messages: Message<[string, number]>[] = [] + + cache + .pipeInto(newGraph, { whereKey: 'a' }) + .pipe(output((message) => messages.push(message))) + newGraph.finalize() + newGraph.run() + + expect(messages.length).toEqual(0) + + baseInput.sendData(1, new MultiSet<[string, number]>([[['a', 1], 1]])) + baseInput.sendFrontier(2) + + baseGraph.run() + newGraph.run() + + expect(messages.length).toEqual(2) + + { + const msg1 = messages[0] + expect(msg1.type).toEqual(MessageType.DATA) + const data = msg1.data as DataMessage<[string, number]> + expect(data.version.getInner()).toEqual([1]) + expect(data.collection.getInner()).toEqual([[['a', 2], 1]]) + } + { + const msg2 = messages[1] + expect(msg2.type).toEqual(MessageType.FRONTIER) + } + }) + + test('supports gt operator for whereKey', () => { + const baseGraph = new D2({ initialFrontier: 0 }) + const baseInput = baseGraph.newInput<[string, number]>() + const mapped = baseInput.pipe( + map(([key, value]) => [key, value * 2] as [string, number]), + ) + const cache = new Cache(mapped) + baseGraph.finalize() + + baseInput.sendData( + 0, + new MultiSet<[string, number]>([ + [['a', 1], 1], + [['c', 3], 1], + ]), + ) + baseInput.sendFrontier(1) + + baseGraph.run() + + const newGraph = new D2({ initialFrontier: 0 }) + const messages: Message<[string, number]>[] = [] + + cache + .pipeInto(newGraph, { whereKey: gt('b') }) + .pipe(output((message) => messages.push(message))) + newGraph.finalize() + newGraph.run() + + expect(messages.length).toEqual(2) + + { + const msg1 = messages[0] + expect(msg1.type).toEqual(MessageType.DATA) + const data = msg1.data as DataMessage<[string, number]> + expect(data.version.getInner()).toEqual([1]) + expect(data.collection.getInner()).toEqual([[['c', 6], 1]]) + } + + // Test incremental updates + messages.length = 0 + + // Send data that matches the operator (> 'a') + baseInput.sendData(2, new MultiSet<[string, number]>([[['d', 4], 1]])) + // Send data that doesn't match the operator (<= 'a') + baseInput.sendData(2, new MultiSet<[string, number]>([[['b', 5], 1]])) + baseInput.sendFrontier(3) + + baseGraph.run() + newGraph.run() + + expect(messages.length).toEqual(3) + { + const msg1 = messages[0] + expect(msg1.type).toEqual(MessageType.DATA) + const data = msg1.data as DataMessage<[string, number]> + expect(data.version.getInner()).toEqual([2]) + // Should only include 'd' as it's > 'a', not 'a' itself + expect(data.collection.getInner()).toEqual([[['d', 8], 1]]) + } + }) + + test('supports between operator for whereKey', () => { + const baseGraph = new D2({ initialFrontier: 0 }) + const baseInput = baseGraph.newInput<[string, number]>() + const mapped = baseInput.pipe( + map(([key, value]) => [key, value * 2] as [string, number]), + ) + const cache = new Cache(mapped) + baseGraph.finalize() + + baseInput.sendData( + 0, + new MultiSet<[string, number]>([ + [['a', 1], 1], + [['b', 2], 1], + [['d', 4], 1], + ]), + ) + baseInput.sendFrontier(1) + + baseGraph.run() + + const newGraph = new D2({ initialFrontier: 0 }) + const messages: Message<[string, number]>[] = [] + + cache + .pipeInto(newGraph, { whereKey: between('b', 'd') }) + .pipe(output((message) => messages.push(message))) + newGraph.finalize() + newGraph.run() + + expect(messages.length).toEqual(2) + + { + const msg1 = messages[0] + expect(msg1.type).toEqual(MessageType.DATA) + const data = msg1.data as DataMessage<[string, number]> + expect(data.version.getInner()).toEqual([1]) + expect(data.collection.getInner()).toEqual([ + [['b', 4], 1], + [['d', 8], 1], + ]) + } + + // Test incremental updates + messages.length = 0 + + // Send data that matches the between operator + baseInput.sendData(2, new MultiSet<[string, number]>([[['c', 5], 1]])) + // Send data that doesn't match the between operator + baseInput.sendData(2, new MultiSet<[string, number]>([[['e', 7], 1]])) + baseInput.sendFrontier(3) + + baseGraph.run() + newGraph.run() + + expect(messages.length).toEqual(3) + { + const msg1 = messages[0] + expect(msg1.type).toEqual(MessageType.DATA) + const data = msg1.data as DataMessage<[string, number]> + expect(data.version.getInner()).toEqual([2]) + // Should only include 'b' as it's between 'b' and 'c' + expect(data.collection.getInner()).toEqual([[['c', 10], 1]]) + } + }) + + test('supports isIn operator for whereKey', () => { + const baseGraph = new D2({ initialFrontier: 0 }) + const baseInput = baseGraph.newInput<[string, number]>() + const mapped = baseInput.pipe( + map(([key, value]) => [key, value * 2] as [string, number]), + ) + const cache = new Cache(mapped) + baseGraph.finalize() + + baseInput.sendData( + 0, + new MultiSet<[string, number]>([ + [['a', 1], 1], + [['b', 2], 1], + [['c', 3], 1], + [['d', 4], 1], + ]), + ) + baseInput.sendFrontier(1) + + baseGraph.run() + + const newGraph = new D2({ initialFrontier: 0 }) + const messages: Message<[string, number]>[] = [] + + cache + .pipeInto(newGraph, { whereKey: isIn(['a', 'c', 'e', 'f']) }) + .pipe(output((message) => messages.push(message))) + newGraph.finalize() + newGraph.run() + + expect(messages.length).toEqual(2) + + { + const msg1 = messages[0] + expect(msg1.type).toEqual(MessageType.DATA) + const data = msg1.data as DataMessage<[string, number]> + expect(data.version.getInner()).toEqual([1]) + expect(data.collection.getInner()).toEqual([ + [['a', 2], 1], + [['c', 6], 1], + ]) + } + + // Test incremental updates + messages.length = 0 + + // Send data that matches the isIn operator + baseInput.sendData( + 2, + new MultiSet<[string, number]>([ + [['e', 5], 1], + [['f', 6], 1], + ]), + ) + // Send data that doesn't match the isIn operator + baseInput.sendData( + 2, + new MultiSet<[string, number]>([ + [['g', 7], 1], + [['h', 8], 1], + ]), + ) + baseInput.sendFrontier(3) + + baseGraph.run() + newGraph.run() + + expect(messages.length).toEqual(3) + { + const msg1 = messages[0] + expect(msg1.type).toEqual(MessageType.DATA) + const data = msg1.data as DataMessage<[string, number]> + expect(data.version.getInner()).toEqual([2]) + // Should only include 'a' and 'c' as they're in the set + expect(data.collection.getInner()).toEqual([ + [['e', 10], 1], + [['f', 12], 1], + ]) + } + }) + + test('supports complex operators (and, or, not) for whereKey', () => { + const baseGraph = new D2({ initialFrontier: 0 }) + const baseInput = baseGraph.newInput<[string, number]>() + const mapped = baseInput.pipe( + map(([key, value]) => [key, value * 2] as [string, number]), + ) + const cache = new Cache(mapped) + baseGraph.finalize() + + baseInput.sendData( + 0, + new MultiSet<[string, number]>([ + [['a', 1], 1], + [['b', 2], 1], + [['d', 4], 1], + [['e', 5], 1], + ]), + ) + baseInput.sendFrontier(1) + + baseGraph.run() + + const newGraph = new D2({ initialFrontier: 0 }) + const messages: Message<[string, number]>[] = [] + + // Complex condition: (key > 'b' AND key < 'e') OR key = 'a' + const complexOperator = or(and(gt('b'), lt('e')), eq('a')) + + cache + .pipeInto(newGraph, { whereKey: complexOperator }) + .pipe(output((message) => messages.push(message))) + newGraph.finalize() + newGraph.run() + + expect(messages.length).toEqual(2) + + { + const msg1 = messages[0] + expect(msg1.type).toEqual(MessageType.DATA) + const data = msg1.data as DataMessage<[string, number]> + expect(data.version.getInner()).toEqual([1]) + expect(data.collection.getInner()).toEqual([ + [['a', 2], 1], + [['d', 8], 1], + ]) + } + + // Test incremental updates + messages.length = 0 + + // Send data that matches the complex operator + baseInput.sendData(2, new MultiSet<[string, number]>([[['c', 6], 1]])) + // Send data that doesn't match the complex operator + baseInput.sendData(2, new MultiSet<[string, number]>([[['g', 7], 1]])) + baseInput.sendFrontier(3) + + baseGraph.run() + newGraph.run() + + expect(messages.length).toEqual(3) + { + const msg1 = messages[0] + expect(msg1.type).toEqual(MessageType.DATA) + const data = msg1.data as DataMessage<[string, number]> + expect(data.version.getInner()).toEqual([2]) + // Should only include 'a' and 'c' as they match the complex condition + expect(data.collection.getInner()).toEqual([[['c', 12], 1]]) + } + }) }) From 55f98654e14f514d1278017fc09f8fb9036c566d Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sun, 23 Mar 2025 10:09:09 +0000 Subject: [PATCH 4/5] Tidy up --- packages/d2ts/package.json | 4 ++++ packages/d2ts/src/cache.ts | 22 +--------------------- packages/d2ts/tests/cache.test.ts | 2 -- 3 files changed, 5 insertions(+), 23 deletions(-) diff --git a/packages/d2ts/package.json b/packages/d2ts/package.json index 750fc6d..ffb09e1 100644 --- a/packages/d2ts/package.json +++ b/packages/d2ts/package.json @@ -40,6 +40,10 @@ "./d2ql": { "types": "./dist/d2ql/index.d.ts", "default": "./dist/d2ql/index.js" + }, + "./cache": { + "types": "./dist/cache.d.ts", + "default": "./dist/cache.js" } }, "scripts": { diff --git a/packages/d2ts/src/cache.ts b/packages/d2ts/src/cache.ts index e979cf9..4c25c7e 100644 --- a/packages/d2ts/src/cache.ts +++ b/packages/d2ts/src/cache.ts @@ -14,29 +14,19 @@ import { DefaultMap } from './utils.js' import { filter } from './operators/filter.js' import { eq, IndexOperator } from './index-operators.js' -export interface CacheOptions { - indexedBy?: { - [key: string]: (item: T) => unknown | string - } -} - -export interface PipeIntoOptions { +export interface PipeIntoOptions { whereKey?: K | IndexOperator - where?: Record> } export class Cache { #index = new Index() - #options: CacheOptions> #stream: IStreamBuilder> #subscribers = new Set>>() constructor( stream: IStreamBuilder>, - options?: CacheOptions>, ) { this.#stream = stream - this.#options = options ?? {} this.#stream.pipe( output((message) => { this.#handleInputMessage(message) @@ -92,11 +82,6 @@ export class Cache { keysToSend = this.#index.keys() } - if (options.where) { - // TODO: implement where - throw new Error('where is not supported yet') - } - for (const key of keysToSend) { for (const [version, values] of this.#index.get(key)) { for (const [value, multiplicity] of values) { @@ -137,11 +122,6 @@ export class Cache { pipeline = pipeline.pipe(filter(([key]) => operator(key))) } - if (options.where) { - // TODO: implement where - throw new Error('where is not supported yet') - } - graph.addTeardownSubscriber(() => { this.#subscribers.delete(input) }) diff --git a/packages/d2ts/tests/cache.test.ts b/packages/d2ts/tests/cache.test.ts index a32f71f..9d278e9 100644 --- a/packages/d2ts/tests/cache.test.ts +++ b/packages/d2ts/tests/cache.test.ts @@ -3,14 +3,12 @@ import { Message, MessageType, DataMessage, - FrontierMessage, } from '../src/types.js' import { MultiSet } from '../src/multiset.js' import { D2 } from '../src/d2.js' import { Cache } from '../src/cache.js' import { map } from '../src/operators/map.js' import { output } from '../src/operators/output.js' -import { Antichain } from '../src/order.js' import { gt, lt, eq, or, and, between, isIn } from '../src/index-operators.js' describe('Cache', () => { From f1428db714f379390841a3d4ee24180641b41883 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sun, 23 Mar 2025 10:36:31 +0000 Subject: [PATCH 5/5] Fox tests, and implimentation --- packages/d2ts/src/cache.ts | 2 +- packages/d2ts/tests/cache.test.ts | 88 +++++++++++++++++++------------ 2 files changed, 55 insertions(+), 35 deletions(-) diff --git a/packages/d2ts/src/cache.ts b/packages/d2ts/src/cache.ts index 4c25c7e..fb23cf4 100644 --- a/packages/d2ts/src/cache.ts +++ b/packages/d2ts/src/cache.ts @@ -126,6 +126,6 @@ export class Cache { this.#subscribers.delete(input) }) - return input + return pipeline } } diff --git a/packages/d2ts/tests/cache.test.ts b/packages/d2ts/tests/cache.test.ts index 9d278e9..03f136d 100644 --- a/packages/d2ts/tests/cache.test.ts +++ b/packages/d2ts/tests/cache.test.ts @@ -1,14 +1,11 @@ import { describe, expect, test } from 'vitest' -import { - Message, - MessageType, - DataMessage, -} from '../src/types.js' +import { Message, MessageType, DataMessage } from '../src/types.js' import { MultiSet } from '../src/multiset.js' import { D2 } from '../src/d2.js' import { Cache } from '../src/cache.js' import { map } from '../src/operators/map.js' import { output } from '../src/operators/output.js' +import { debug } from '../src/operators/debug.js' import { gt, lt, eq, or, and, between, isIn } from '../src/index-operators.js' describe('Cache', () => { @@ -232,20 +229,29 @@ describe('Cache', () => { expect(data.version.getInner()).toEqual([1]) expect(data.collection.getInner()).toEqual([[['c', 6], 1]]) } + { + const msg2 = messages[1] + expect(msg2.type).toEqual(MessageType.FRONTIER) + } // Test incremental updates messages.length = 0 - // Send data that matches the operator (> 'a') - baseInput.sendData(2, new MultiSet<[string, number]>([[['d', 4], 1]])) - // Send data that doesn't match the operator (<= 'a') - baseInput.sendData(2, new MultiSet<[string, number]>([[['b', 5], 1]])) + baseInput.sendData( + 2, + new MultiSet<[string, number]>([ + // Send data that matches the operator (> 'b') + [['d', 4], 1], + // Send data that doesn't match the operator (<= 'b') + [['0', 5], 1], + ]), + ) baseInput.sendFrontier(3) baseGraph.run() newGraph.run() - expect(messages.length).toEqual(3) + expect(messages.length).toEqual(2) { const msg1 = messages[0] expect(msg1.type).toEqual(MessageType.DATA) @@ -254,6 +260,10 @@ describe('Cache', () => { // Should only include 'd' as it's > 'a', not 'a' itself expect(data.collection.getInner()).toEqual([[['d', 8], 1]]) } + { + const msg2 = messages[1] + expect(msg2.type).toEqual(MessageType.FRONTIER) + } }) test('supports between operator for whereKey', () => { @@ -302,16 +312,21 @@ describe('Cache', () => { // Test incremental updates messages.length = 0 - // Send data that matches the between operator - baseInput.sendData(2, new MultiSet<[string, number]>([[['c', 5], 1]])) - // Send data that doesn't match the between operator - baseInput.sendData(2, new MultiSet<[string, number]>([[['e', 7], 1]])) + baseInput.sendData( + 2, + new MultiSet<[string, number]>([ + // Send data that matches the between operator + [['c', 5], 1], + // Send data that doesn't match the between operator + [['e', 7], 1], + ]), + ) baseInput.sendFrontier(3) baseGraph.run() newGraph.run() - expect(messages.length).toEqual(3) + expect(messages.length).toEqual(2) { const msg1 = messages[0] expect(msg1.type).toEqual(MessageType.DATA) @@ -369,18 +384,13 @@ describe('Cache', () => { // Test incremental updates messages.length = 0 - // Send data that matches the isIn operator baseInput.sendData( 2, new MultiSet<[string, number]>([ + // Send data that matches the isIn operator [['e', 5], 1], [['f', 6], 1], - ]), - ) - // Send data that doesn't match the isIn operator - baseInput.sendData( - 2, - new MultiSet<[string, number]>([ + // Send data that doesn't match the isIn operator [['g', 7], 1], [['h', 8], 1], ]), @@ -390,7 +400,7 @@ describe('Cache', () => { baseGraph.run() newGraph.run() - expect(messages.length).toEqual(3) + expect(messages.length).toEqual(2) { const msg1 = messages[0] expect(msg1.type).toEqual(MessageType.DATA) @@ -430,11 +440,12 @@ describe('Cache', () => { const messages: Message<[string, number]>[] = [] // Complex condition: (key > 'b' AND key < 'e') OR key = 'a' - const complexOperator = or(and(gt('b'), lt('e')), eq('a')) + const complexOperator = or(and(gt('b'), lt('e')), eq('a'), eq('h')) - cache - .pipeInto(newGraph, { whereKey: complexOperator }) - .pipe(output((message) => messages.push(message))) + cache.pipeInto(newGraph, { whereKey: complexOperator }).pipe( + debug('cache'), + output((message) => messages.push(message)), + ) newGraph.finalize() newGraph.run() @@ -454,23 +465,32 @@ describe('Cache', () => { // Test incremental updates messages.length = 0 - // Send data that matches the complex operator - baseInput.sendData(2, new MultiSet<[string, number]>([[['c', 6], 1]])) - // Send data that doesn't match the complex operator - baseInput.sendData(2, new MultiSet<[string, number]>([[['g', 7], 1]])) + baseInput.sendData( + 2, + new MultiSet<[string, number]>([ + // Send data that matches the complex operator + [['c', 6], 1], + [['h', 8], 1], + // Send data that doesn't match the complex operator + [['g', 7], 1], + ]), + ) baseInput.sendFrontier(3) baseGraph.run() newGraph.run() - expect(messages.length).toEqual(3) + expect(messages.length).toEqual(2) { const msg1 = messages[0] expect(msg1.type).toEqual(MessageType.DATA) const data = msg1.data as DataMessage<[string, number]> expect(data.version.getInner()).toEqual([2]) - // Should only include 'a' and 'c' as they match the complex condition - expect(data.collection.getInner()).toEqual([[['c', 12], 1]]) + // Should only include 'a' and 'h' as they match the complex condition + expect(data.collection.getInner()).toEqual([ + [['c', 12], 1], + [['h', 16], 1], + ]) } }) })