Skip to content

Commit d9a8f65

Browse files
committed
Support streams
1 parent 8a26625 commit d9a8f65

File tree

8 files changed

+68
-16
lines changed

8 files changed

+68
-16
lines changed

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,8 @@ export class SqlSyncRules implements SyncRules {
241241
...options,
242242
accept_potentially_dangerous_queries,
243243
priority: rules.parsePriority(value),
244-
auto_subscribe: value.get('auto_subscribe', true)?.value == true
244+
auto_subscribe: value.get('auto_subscribe', true)?.value == true,
245+
fixedQuirks
245246
};
246247

247248
const data = value.get('query', true) as unknown;

packages/sync-rules/src/streams/from_sql.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import {
4141
Statement
4242
} from 'pgsql-ast-parser';
4343
import { STREAM_FUNCTIONS } from './functions.js';
44+
import { CompatibilityContext, CompatibilityLevel } from '../quirks.js';
4445

4546
export function syncStreamFromSql(
4647
descriptorName: string,
@@ -91,7 +92,8 @@ class SyncStreamCompiler {
9192

9293
const stream = new SyncStream(
9394
this.descriptorName,
94-
new BaseSqlDataQuery(this.compileDataQuery(tools, query, alias, sourceTable))
95+
new BaseSqlDataQuery(this.compileDataQuery(tools, query, alias, sourceTable)),
96+
new CompatibilityContext(CompatibilityLevel.SYNC_STREAMS, this.options.fixedQuirks)
9597
);
9698
stream.subscribedToByDefault = this.options.auto_subscribe ?? false;
9799
if (filter.isValid(tools)) {

packages/sync-rules/src/streams/parameter.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import {
77
RequestParameters,
88
SqliteJsonValue,
99
SqliteRow,
10-
SqliteValue
10+
SqliteValue,
11+
TableRow
1112
} from '../types.js';
1213

1314
/**
@@ -31,7 +32,7 @@ export interface BucketParameter {
3132
* When a user connects, {@link StaticLookup.fromRequest} would return the user ID from the token. A matching bucket would
3233
* then contain the oplog data for assets with the matching `owner` column.
3334
*/
34-
filterRow(options: EvaluateRowOptions): SqliteJsonValue[];
35+
filterRow(row: TableRow): SqliteJsonValue[];
3536
}
3637

3738
export interface SubqueryEvaluator {

packages/sync-rules/src/streams/stream.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { BucketInclusionReason, BucketPriority, DEFAULT_BUCKET_PRIORITY } from '
33
import { BucketParameterQuerier, PendingQueriers } from '../BucketParameterQuerier.js';
44
import { BucketSource, BucketSourceType, ResultSetDescription } from '../BucketSource.js';
55
import { ColumnDefinition } from '../ExpressionType.js';
6+
import { CompatibilityContext } from '../quirks.js';
67
import { SourceTableInterface } from '../SourceTableInterface.js';
78
import { GetQuerierOptions, RequestedStream } from '../SqlSyncRules.js';
89
import { TablePattern } from '../TablePattern.js';
@@ -12,8 +13,10 @@ import {
1213
EvaluationResult,
1314
RequestParameters,
1415
SourceSchema,
15-
SqliteRow
16+
SqliteRow,
17+
TableRow
1618
} from '../types.js';
19+
import { applyRowContext } from '../utils.js';
1720
import { StreamVariant } from './variant.js';
1821

1922
export class SyncStream implements BucketSource {
@@ -23,7 +26,11 @@ export class SyncStream implements BucketSource {
2326
variants: StreamVariant[];
2427
data: BaseSqlDataQuery;
2528

26-
constructor(name: string, data: BaseSqlDataQuery) {
29+
constructor(
30+
name: string,
31+
data: BaseSqlDataQuery,
32+
private readonly compatibility: CompatibilityContext
33+
) {
2734
this.name = name;
2835
this.subscribedToByDefault = false;
2936
this.priority = DEFAULT_BUCKET_PRIORITY;
@@ -165,13 +172,19 @@ export class SyncStream implements BucketSource {
165172
}
166173

167174
const stream = this;
175+
const mappedRow = applyRowContext(options.record, this.compatibility);
176+
const row: TableRow = {
177+
sourceTable: options.sourceTable,
178+
record: mappedRow
179+
};
180+
168181
return this.data.evaluateRowWithOptions({
169182
table: options.sourceTable,
170-
row: options.record,
183+
row: applyRowContext(options.record, this.compatibility),
171184
bucketIds() {
172185
const bucketIds: string[] = [];
173186
for (const variant of stream.variants) {
174-
bucketIds.push(...variant.bucketIdsForRow(stream.name, options));
187+
bucketIds.push(...variant.bucketIdsForRow(stream.name, row));
175188
}
176189

177190
return bucketIds;

packages/sync-rules/src/streams/variant.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import {
66
EvaluateRowOptions,
77
RequestParameters,
88
SqliteJsonValue,
9-
SqliteRow
9+
SqliteRow,
10+
TableRow
1011
} from '../types.js';
1112
import { isJsonValue, JSONBucketNameSerialize, normalizeParameterValue } from '../utils.js';
1213
import { BucketParameter, SubqueryEvaluator } from './parameter.js';
@@ -45,7 +46,7 @@ export class StreamVariant {
4546
*
4647
* This is introduced for streams like `SELECT * FROM assets WHERE LENGTH(assets.name < 10)`.
4748
*/
48-
additionalRowFilters: ((options: EvaluateRowOptions) => boolean)[];
49+
additionalRowFilters: ((row: TableRow) => boolean)[];
4950

5051
/**
5152
* Additional filters that are evaluated against the request of the stream subscription.
@@ -66,7 +67,7 @@ export class StreamVariant {
6667
/**
6768
* Given a row in the table this stream selects from, returns all ids of buckets to which that row belongs to.
6869
*/
69-
bucketIdsForRow(streamName: string, options: EvaluateRowOptions): string[] {
70+
bucketIdsForRow(streamName: string, options: TableRow): string[] {
7071
return this.instantiationsForRow(options).map((values) => this.buildBucketId(streamName, values));
7172
}
7273

@@ -75,7 +76,7 @@ export class StreamVariant {
7576
*
7677
* The inner arrays will have a length equal to the amount of parameters in this variant.
7778
*/
78-
instantiationsForRow(options: EvaluateRowOptions): SqliteJsonValue[][] {
79+
instantiationsForRow(options: TableRow): SqliteJsonValue[][] {
7980
for (const additional of this.additionalRowFilters) {
8081
if (!additional(options)) {
8182
return [];

packages/sync-rules/src/types.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { BucketPriority } from './BucketDescription.js';
88
import { ParameterLookup } from './BucketParameterQuerier.js';
99
import { DateTimeValue } from './types/time.js';
1010
import { CustomSqliteValue } from './types/custom_sqlite_value.js';
11-
import { CompatibilityContext } from './quirks.js';
11+
import { CompatibilityContext, Quirk } from './quirks.js';
1212

1313
export interface SyncRules {
1414
evaluateRow(options: EvaluateRowOptions): EvaluationResult[];
@@ -23,6 +23,7 @@ export interface QueryParseOptions extends SyncRulesOptions {
2323

2424
export interface StreamParseOptions extends QueryParseOptions {
2525
auto_subscribe?: boolean;
26+
fixedQuirks: Quirk[];
2627
}
2728

2829
export interface EvaluatedParameters {
@@ -283,9 +284,14 @@ export interface InputParameter {
283284
parametersToLookupValue(parameters: ParameterValueSet): SqliteValue;
284285
}
285286

286-
export interface EvaluateRowOptions {
287+
export interface EvaluateRowOptions extends TableRow<SqliteInputRow> {}
288+
289+
/**
290+
* A row associated with the table it's coming from.
291+
*/
292+
export interface TableRow<R = SqliteRow> {
287293
sourceTable: SourceTableInterface;
288-
record: SqliteInputRow;
294+
record: R;
289295
}
290296

291297
/**

packages/sync-rules/test/src/quirks.test.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,29 @@ fixed_quirks:
5757
{ bucket: 'mybucket[]', data: { description: '2025-08-19T09:21:00Z', id: 'id' }, id: 'id', table: 'assets' }
5858
]);
5959
});
60+
61+
test('streams use new format by default', () => {
62+
const rules = SqlSyncRules.fromYaml(
63+
`
64+
streams:
65+
stream:
66+
query: SELECT id, description FROM assets
67+
`,
68+
PARSE_OPTIONS
69+
);
70+
71+
expect(
72+
rules.evaluateRow({
73+
sourceTable: ASSETS,
74+
record: {
75+
id: 'id',
76+
description: value
77+
}
78+
})
79+
).toStrictEqual([
80+
{ bucket: 'stream|0[]', data: { description: '2025-08-19T09:21:00Z', id: 'id' }, id: 'id', table: 'assets' }
81+
]);
82+
});
6083
});
6184

6285
test('warning for unknown quirk', () => {

packages/sync-rules/test/src/streams.test.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
SqliteJsonRow,
1212
SqliteRow,
1313
StaticSchema,
14+
StreamParseOptions,
1415
SyncStream,
1516
syncStreamFromSql
1617
} from '../../src/index.js';
@@ -610,7 +611,11 @@ const schema = new StaticSchema([
610611
}
611612
]);
612613

613-
const options = { schema: schema, ...PARSE_OPTIONS };
614+
const options: StreamParseOptions = {
615+
schema: schema,
616+
...PARSE_OPTIONS,
617+
fixedQuirks: []
618+
};
614619

615620
function evaluateBucketIds(stream: SyncStream, sourceTable: SourceTableInterface, record: SqliteRow) {
616621
return stream.evaluateRow({ sourceTable, record }).map((r) => {

0 commit comments

Comments
 (0)