Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/d2ts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
"./d2ql": {
"types": "./dist/d2ql/index.d.ts",
"default": "./dist/d2ql/index.js"
},
"./cache": {
"types": "./dist/cache.d.ts",
"default": "./dist/cache.js"
}
},
"scripts": {
Expand Down
131 changes: 131 additions & 0 deletions packages/d2ts/src/cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import {
DataMessage,
ID2,
IStreamBuilder,
Message,
MessageType,
KeyValue,
} from './types.js'
import { Index } from './version-index.js'
import { output } from './operators/output.js'
import { Antichain, Version } from './order.js'
import { MultiSet } from './multiset.js'
import { DefaultMap } from './utils.js'
import { filter } from './operators/filter.js'
import { eq, IndexOperator } from './index-operators.js'

export interface PipeIntoOptions<K, _V> {
whereKey?: K | IndexOperator<K>
}

export class Cache<K, V> {
#index = new Index<K, V>()
#stream: IStreamBuilder<KeyValue<K, V>>
#subscribers = new Set<IStreamBuilder<KeyValue<K, V>>>()

constructor(
stream: IStreamBuilder<KeyValue<K, V>>,
) {
this.#stream = stream
this.#stream.pipe(
output((message) => {
this.#handleInputMessage(message)
}),
)
}

#handleInputMessage(message: Message<KeyValue<K, V>>): void {
if (message.type === MessageType.DATA) {
const { version, collection } = message.data as DataMessage<
KeyValue<K, V>
>
for (const [item, multiplicity] of collection.getInner()) {
const [key, value] = item
this.#index.addValue(key, version, [value, multiplicity])
}
} else if (message.type === MessageType.FRONTIER) {
const frontier = message.data as Antichain
this.#index.compact(frontier)
}
this.#broadcast(message)
}

#broadcast(message: Message<KeyValue<K, V>>): void {
if (message.type === MessageType.DATA) {
const { version, collection } = message.data
for (const subscriber of this.#subscribers) {
subscriber.writer.sendData(version, collection)
}
} else if (message.type === MessageType.FRONTIER) {
const frontier = message.data as Antichain
this.#index.compact(frontier)
for (const subscriber of this.#subscribers) {
subscriber.writer.sendFrontier(frontier)
}
}
}

#sendHistory(
input: IStreamBuilder<KeyValue<K, V>>,
options: PipeIntoOptions<K, V> = {},
): void {
const versionedData = new DefaultMap<Version, [[K, V], number][]>(() => [])

let keysToSend: K[]
if (options.whereKey) {
if (typeof options.whereKey === 'function') {
keysToSend = this.#index.matchKeys(options.whereKey as IndexOperator<K>)
} else {
keysToSend = [options.whereKey]
}
} else {
keysToSend = this.#index.keys()
}

for (const key of keysToSend) {
for (const [version, values] of this.#index.get(key)) {
for (const [value, multiplicity] of values) {
versionedData.get(version).push([[key, value], multiplicity])
}
}
}
const sortedVersions = Array.from(versionedData.keys()).sort((a, b) =>
a.lessThan(b) ? -1 : 1,
)
for (const version of sortedVersions) {
input.writer.sendData(version, new MultiSet(versionedData.get(version)))
}
const frontier = this.#index.frontier
if (frontier) {
input.writer.sendFrontier(frontier)
}
}

pipeInto(
graph: ID2,
options: PipeIntoOptions<K, V> = {},
): IStreamBuilder<KeyValue<K, V>> {
const input = graph.newInput<KeyValue<K, V>>()
this.#subscribers.add(input)

graph.addStartupSubscriber(() => {
this.#sendHistory(input, options)
})

let pipeline = input

if (options.whereKey) {
const operator =
typeof options.whereKey === 'function'
? (options.whereKey as IndexOperator<K>)
: (eq(options.whereKey) as IndexOperator<K>)
pipeline = pipeline.pipe(filter(([key]) => operator(key)))
}

graph.addTeardownSubscriber(() => {
this.#subscribers.delete(input)
})

return pipeline
}
}
27 changes: 27 additions & 0 deletions packages/d2ts/src/d2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ export class D2 implements ID2 {
#frontierStack: Antichain[] = []
#nextOperatorId = 0
#finalized = false
#closed = false
#startupSubscribers: (() => void)[] = []
#teardownSubscribers: (() => void)[] = []

constructor({ initialFrontier }: D2Options) {
this.#frontierStack = [Antichain.create(initialFrontier)]
Expand Down Expand Up @@ -68,12 +71,18 @@ export class D2 implements ID2 {
finalize() {
this.#checkNotFinalized()
this.#finalized = true
for (const subscriber of this.#startupSubscribers) {
subscriber()
}
}

step(): void {
if (!this.#finalized) {
throw new Error('Graph not finalized')
}
if (this.#closed) {
throw new Error('Graph closed')
}
for (const op of this.#operators) {
op.run()
}
Expand All @@ -88,6 +97,24 @@ export class D2 implements ID2 {
this.step()
}
}

close(): void {
if (this.#closed) {
throw new Error('Graph already closed')
}
this.#closed = true
for (const subscriber of this.#teardownSubscribers) {
subscriber()
}
}

addStartupSubscriber(subscriber: () => void): void {
this.#startupSubscribers.push(subscriber)
}

addTeardownSubscriber(subscriber: () => void): void {
this.#teardownSubscribers.push(subscriber)
}
}

export class StreamBuilder<T> implements IStreamBuilder<T> {
Expand Down
1 change: 1 addition & 0 deletions packages/d2ts/src/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export class DifferenceStreamWriter<T> implements IDifferenceStreamWriter<T> {
data: dataMessage,
})
}
// console.log('sendData', JSON.stringify(this.#queues, null, 2))
}

sendFrontier(frontier: Antichain | Version | number | number[]): void {
Expand Down
48 changes: 48 additions & 0 deletions packages/d2ts/src/index-operators.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
export type IndexOperator<K> = (key: K) => boolean

export const eq = <K>(matchKey: K): IndexOperator<K> => {
return (key) => key === matchKey
}

export function neq<K>(matchKey: K): IndexOperator<K> {
return (key) => key !== matchKey
}

export function gt<K>(matchKey: K): IndexOperator<K> {
return (key) => key > matchKey
}

export function gte<K>(matchKey: K): IndexOperator<K> {
return (key) => key >= matchKey
}

export function lt<K>(matchKey: K): IndexOperator<K> {
return (key) => key < matchKey
}

export function lte<K>(matchKey: K): IndexOperator<K> {
return (key) => key <= matchKey
}

export function isIn<K>(matchKeys: K[] | Set<K>): IndexOperator<K> {
const matchKeysSet = matchKeys instanceof Set ? matchKeys : new Set(matchKeys)
return (key) => {
return matchKeysSet.has(key)
}
}

export function between<K>(start: K, end: K): IndexOperator<K> {
return (key) => key >= start && key <= end
}

export function and<K>(...operators: IndexOperator<K>[]): IndexOperator<K> {
return (key) => operators.every((op) => op(key))
}

export function or<K>(...operators: IndexOperator<K>[]): IndexOperator<K> {
return (key) => operators.some((op) => op(key))
}

export function not<K>(operator: IndexOperator<K>): IndexOperator<K> {
return (key) => !operator(key)
}
3 changes: 3 additions & 0 deletions packages/d2ts/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ export interface ID2 {
popFrontier(): void
finalize(): void
step(): void
close(): void
addStartupSubscriber(subscriber: () => void): void
addTeardownSubscriber(subscriber: () => void): void
}

export interface IStreamBuilder<T> {
Expand Down
Loading