diff --git a/.changeset/rich-shrimps-push.md b/.changeset/rich-shrimps-push.md new file mode 100644 index 000000000..4951cc869 --- /dev/null +++ b/.changeset/rich-shrimps-push.md @@ -0,0 +1,6 @@ +--- +'@powersync/service-sync-rules': minor +'@powersync/service-image': minor +--- + +Add the `versioned_bucket_ids` option in the `config:` block for sync rules. When enabled, generated bucket ids include the version of sync rules. This allows clients to sync more efficiently after updating sync rules. diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 6952092c8..6d824c3b4 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -461,7 +461,8 @@ export class MongoBucketBatch if (sourceTable.syncData) { const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({ record: after, - sourceTable + sourceTable, + bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer(`${this.group_id}`) }); for (let error of syncErrors) { diff --git a/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap b/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap index 1dad61dc7..e3449a7c7 100644 --- a/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap +++ b/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap @@ -104,6 +104,116 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 2`] = ` ] `; +exports[`sync - mongodb > encodes sync rules id in buckes for streams 1`] = ` +[ + { + "checkpoint": { + "buckets": [ + { + "bucket": "1#test|0[]", + "checksum": 920318466, + "count": 1, + "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], + }, + ], + "last_op_id": "1", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "test", + }, + ], + "write_checkpoint": undefined, + }, + }, + { + "data": { + "after": "0", + "bucket": "1#test|0[]", + "data": [ + { + "checksum": 920318466, + "data": "{"id":"t1","description":"Test 1"}", + "object_id": "t1", + "object_type": "test", + "op": "PUT", + "op_id": "1", + "subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a", + }, + ], + "has_more": false, + "next_after": "1", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "1", + }, + }, +] +`; + +exports[`sync - mongodb > encodes sync rules id in buckes for streams 2`] = ` +[ + { + "checkpoint": { + "buckets": [ + { + "bucket": "2#test|0[]", + "checksum": 920318466, + "count": 1, + "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], + }, + ], + "last_op_id": "2", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "test", + }, + ], + "write_checkpoint": undefined, + }, + }, + { + "data": { + "after": "0", + "bucket": "2#test|0[]", + "data": [ + { + "checksum": 920318466, + "data": "{"id":"t1","description":"Test 1"}", + "object_id": "t1", + "object_type": "test", + "op": "PUT", + "op_id": "2", + "subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a", + }, + ], + "has_more": false, + "next_after": "2", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "2", + }, + }, +] +`; + exports[`sync - mongodb > expired token 1`] = ` [ { diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index 0764496c0..62e7f118e 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -825,7 +825,8 @@ export class PostgresBucketBatch if (sourceTable.syncData) { const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({ record: after, - sourceTable + sourceTable, + bucketIdTransformer: sync_rules.SqlSyncRules.versionedBucketIdTransformer(`${this.group_id}`) }); for (const error of syncErrors) { diff --git a/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap b/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap index b1631416f..08a73c8c1 100644 --- a/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap +++ b/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap @@ -104,6 +104,116 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 2`] = ` ] `; +exports[`sync - postgres > encodes sync rules id in buckes for streams 1`] = ` +[ + { + "checkpoint": { + "buckets": [ + { + "bucket": "1#test|0[]", + "checksum": 920318466, + "count": 1, + "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], + }, + ], + "last_op_id": "1", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "test", + }, + ], + "write_checkpoint": undefined, + }, + }, + { + "data": { + "after": "0", + "bucket": "1#test|0[]", + "data": [ + { + "checksum": 920318466, + "data": "{"id":"t1","description":"Test 1"}", + "object_id": "t1", + "object_type": "test", + "op": "PUT", + "op_id": "1", + "subkey": "02d285ac-4f96-5124-8fba-c6d1df992dd1", + }, + ], + "has_more": false, + "next_after": "1", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "1", + }, + }, +] +`; + +exports[`sync - postgres > encodes sync rules id in buckes for streams 2`] = ` +[ + { + "checkpoint": { + "buckets": [ + { + "bucket": "2#test|0[]", + "checksum": 920318466, + "count": 1, + "priority": 3, + "subscriptions": [ + { + "default": 0, + }, + ], + }, + ], + "last_op_id": "2", + "streams": [ + { + "errors": [], + "is_default": true, + "name": "test", + }, + ], + "write_checkpoint": undefined, + }, + }, + { + "data": { + "after": "0", + "bucket": "2#test|0[]", + "data": [ + { + "checksum": 920318466, + "data": "{"id":"t1","description":"Test 1"}", + "object_id": "t1", + "object_type": "test", + "op": "PUT", + "op_id": "2", + "subkey": "02d285ac-4f96-5124-8fba-c6d1df992dd1", + }, + ], + "has_more": false, + "next_after": "2", + }, + }, + { + "checkpoint_complete": { + "last_op_id": "2", + }, + }, +] +`; + exports[`sync - postgres > expired token 1`] = ` [ { diff --git a/packages/service-core-tests/src/test-utils/general-utils.ts b/packages/service-core-tests/src/test-utils/general-utils.ts index 524a00116..6f93fffb9 100644 --- a/packages/service-core-tests/src/test-utils/general-utils.ts +++ b/packages/service-core-tests/src/test-utils/general-utils.ts @@ -107,6 +107,7 @@ export function querierOptions(globalParameters: RequestParameters): GetQuerierO return { globalParameters, hasDefaultStreams: true, - streams: {} + streams: {}, + bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer('1') }; } diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index 48fba5bff..df7d9d904 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -7,7 +7,7 @@ import { utils } from '@powersync/service-core'; import { JSONBig } from '@powersync/service-jsonbig'; -import { RequestParameters } from '@powersync/service-sync-rules'; +import { BucketSourceType, RequestParameters } from '@powersync/service-sync-rules'; import path from 'path'; import * as timers from 'timers/promises'; import { fileURLToPath } from 'url'; @@ -82,7 +82,10 @@ export function registerSyncTests(factory: storage.TestStorageFactory) { const stream = sync.streamResponse({ syncContext, bucketStorage: bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -143,7 +146,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -206,7 +212,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -316,7 +325,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -457,7 +469,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -573,7 +588,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -639,7 +657,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -668,7 +689,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -699,7 +723,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -773,7 +800,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -849,7 +879,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -916,7 +949,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -984,7 +1020,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -1047,7 +1086,10 @@ bucket_definitions: const stream = sync.streamResponse({ syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -1173,7 +1215,10 @@ bucket_definitions: const params: sync.SyncStreamParameters = { syncContext, bucketStorage, - syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, params: { buckets: [], include_checksum: true, @@ -1211,6 +1256,59 @@ bucket_definitions: }) }); }); + + test('encodes sync rules id in buckes for streams', async () => { + await using f = await factory(); + const rules = ` +streams: + test: + auto_subscribe: true + query: SELECT * FROM test; + +config: + edition: 2 +`; + + for (let i = 0; i < 2; i++) { + const syncRules = await f.updateSyncRules({ + content: rules + }); + const bucketStorage = f.getInstance(syncRules); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't1', + description: 'Test 1' + }, + afterReplicaId: 't1' + }); + await batch.commit('0/1'); + }); + + const stream = sync.streamResponse({ + syncContext, + bucketStorage: bucketStorage, + syncRules: { + syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS), + version: bucketStorage.group_id + }, + params: { + buckets: [], + include_checksum: true, + raw_data: true + }, + tracker, + token: { sub: '', exp: Date.now() / 1000 + 10 } as any, + isEncodingAsBson: false + }); + + const lines = await consumeCheckpointLines(stream); + expect(lines).toMatchSnapshot(); + } + }); } /** diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 1c3f8d8b1..7e215907c 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -88,7 +88,10 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => for await (const data of sync.streamResponse({ syncContext: syncContext, bucketStorage: bucketStorage, - syncRules: syncRules, + syncRules: { + syncRules, + version: bucketStorage.group_id + }, params: { ...params }, diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index 15b16bd33..c77119386 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -71,7 +71,10 @@ export const syncStreamed = routeDefinition({ const syncLines = sync.streamResponse({ syncContext: syncContext, bucketStorage, - syncRules: syncRules, + syncRules: { + syncRules, + version: bucketStorage.group_id + }, params: payload.params, token: payload.context.token_payload!, tracker, diff --git a/packages/service-core/src/sync/BucketChecksumState.ts b/packages/service-core/src/sync/BucketChecksumState.ts index 2679e21f7..4ba687500 100644 --- a/packages/service-core/src/sync/BucketChecksumState.ts +++ b/packages/service-core/src/sync/BucketChecksumState.ts @@ -24,10 +24,15 @@ import { BucketParameterQuerier, QuerierError } from '@powersync/service-sync-ru import { SyncContext } from './SyncContext.js'; import { getIntersection, hasIntersection } from './util.js'; +export interface VersionedSyncRules { + syncRules: SqlSyncRules; + version: number; +} + export interface BucketChecksumStateOptions { syncContext: SyncContext; bucketStorage: BucketChecksumStateStorage; - syncRules: SqlSyncRules; + syncRules: VersionedSyncRules; tokenPayload: RequestJwtPayload; syncRequest: util.StreamingSyncRequest; logger?: Logger; @@ -248,7 +253,7 @@ export class BucketChecksumState { const streamNameToIndex = new Map(); this.streamNameToIndex = streamNameToIndex; - for (const source of this.parameterState.syncRules.bucketSources) { + for (const source of this.parameterState.syncRules.syncRules.bucketSources) { if (this.parameterState.isSubscribedToStream(source)) { streamNameToIndex.set(source.name, subscriptions.length); @@ -376,7 +381,7 @@ export interface CheckpointUpdate { export class BucketParameterState { private readonly context: SyncContext; public readonly bucketStorage: BucketChecksumStateStorage; - public readonly syncRules: SqlSyncRules; + public readonly syncRules: VersionedSyncRules; public readonly syncParams: RequestParameters; private readonly querier: BucketParameterQuerier; /** @@ -399,7 +404,7 @@ export class BucketParameterState { constructor( context: SyncContext, bucketStorage: BucketChecksumStateStorage, - syncRules: SqlSyncRules, + syncRules: VersionedSyncRules, tokenPayload: RequestJwtPayload, request: util.StreamingSyncRequest, logger: Logger @@ -431,10 +436,11 @@ export class BucketParameterState { this.includeDefaultStreams = subscriptions?.include_defaults ?? true; this.explicitStreamSubscriptions = explicitStreamSubscriptions; - const { querier, errors } = syncRules.getBucketParameterQuerier({ + const { querier, errors } = syncRules.syncRules.getBucketParameterQuerier({ globalParameters: this.syncParams, hasDefaultStreams: this.includeDefaultStreams, - streams: streamsByName + streams: streamsByName, + bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer(`${syncRules.version}`) }); this.querier = querier; this.streamErrors = Object.groupBy(errors, (e) => e.descriptor) as Record; diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 177036972..32211f044 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -14,7 +14,7 @@ import * as storage from '../storage/storage-index.js'; import * as util from '../util/util-index.js'; import { Logger, logger as defaultLogger } from '@powersync/lib-services-framework'; -import { BucketChecksumState, CheckpointLine } from './BucketChecksumState.js'; +import { BucketChecksumState, CheckpointLine, VersionedSyncRules } from './BucketChecksumState.js'; import { mergeAsyncIterables } from '../streams/streams-index.js'; import { acquireSemaphoreAbortable, settledPromise, tokenStream, TokenStreamOptions } from './util.js'; import { SyncContext } from './SyncContext.js'; @@ -23,7 +23,7 @@ import { OperationsSentStats, RequestTracker, statsForBatch } from './RequestTra export interface SyncStreamParameters { syncContext: SyncContext; bucketStorage: storage.SyncRulesBucketStorage; - syncRules: SqlSyncRules; + syncRules: VersionedSyncRules; params: util.StreamingSyncRequest; token: auth.JwtPayload; logger?: Logger; @@ -100,7 +100,7 @@ export async function* streamResponse( async function* streamResponseInner( syncContext: SyncContext, bucketStorage: storage.SyncRulesBucketStorage, - syncRules: SqlSyncRules, + syncRules: VersionedSyncRules, params: util.StreamingSyncRequest, tokenPayload: RequestJwtPayload, tracker: RequestTracker, diff --git a/packages/service-core/test/src/sync/BucketChecksumState.test.ts b/packages/service-core/test/src/sync/BucketChecksumState.test.ts index ff35cbcb9..a27bf9a37 100644 --- a/packages/service-core/test/src/sync/BucketChecksumState.test.ts +++ b/packages/service-core/test/src/sync/BucketChecksumState.test.ts @@ -17,9 +17,7 @@ import { ParameterLookup, SqlSyncRules, RequestJwtPayload, - BucketSource, - BucketSourceType, - BucketParameterQuerier + BucketSource } from '@powersync/service-sync-rules'; import { describe, expect, test, beforeEach } from 'vitest'; @@ -77,7 +75,10 @@ bucket_definitions: syncContext, syncRequest, tokenPayload, - syncRules: SYNC_RULES_GLOBAL, + syncRules: { + syncRules: SYNC_RULES_GLOBAL, + version: 1 + }, bucketStorage: storage }); @@ -147,7 +148,10 @@ bucket_definitions: tokenPayload, // Client sets the initial state here syncRequest: { buckets: [{ name: 'global[]', after: '1' }] }, - syncRules: SYNC_RULES_GLOBAL, + syncRules: { + syncRules: SYNC_RULES_GLOBAL, + version: 1 + }, bucketStorage: storage }); @@ -185,7 +189,10 @@ bucket_definitions: syncContext, tokenPayload, syncRequest, - syncRules: SYNC_RULES_GLOBAL_TWO, + syncRules: { + syncRules: SYNC_RULES_GLOBAL_TWO, + version: 2 + }, bucketStorage: storage }); @@ -253,7 +260,10 @@ bucket_definitions: tokenPayload, // Client sets the initial state here syncRequest: { buckets: [{ name: 'something_here[]', after: '1' }] }, - syncRules: SYNC_RULES_GLOBAL, + syncRules: { + syncRules: SYNC_RULES_GLOBAL, + version: 1 + }, bucketStorage: storage }); @@ -294,7 +304,10 @@ bucket_definitions: syncContext, tokenPayload, syncRequest, - syncRules: SYNC_RULES_GLOBAL_TWO, + syncRules: { + syncRules: SYNC_RULES_GLOBAL_TWO, + version: 1 + }, bucketStorage: storage }); @@ -347,7 +360,10 @@ bucket_definitions: syncContext, tokenPayload, syncRequest, - syncRules: SYNC_RULES_GLOBAL_TWO, + syncRules: { + syncRules: SYNC_RULES_GLOBAL_TWO, + version: 2 + }, bucketStorage: storage }); @@ -402,7 +418,10 @@ bucket_definitions: syncContext, tokenPayload, syncRequest, - syncRules: SYNC_RULES_GLOBAL_TWO, + syncRules: { + syncRules: SYNC_RULES_GLOBAL_TWO, + version: 2 + }, bucketStorage: storage }); @@ -506,7 +525,10 @@ bucket_definitions: syncContext, tokenPayload: { sub: 'u1' }, syncRequest, - syncRules: SYNC_RULES_DYNAMIC, + syncRules: { + syncRules: SYNC_RULES_DYNAMIC, + version: 1 + }, bucketStorage: storage }); @@ -629,7 +651,7 @@ config: syncContext, syncRequest, tokenPayload, - syncRules: rules, + syncRules: { syncRules: rules, version: 1 }, bucketStorage: storage, ...options }); @@ -637,9 +659,9 @@ config: beforeEach(() => { storage = new MockBucketChecksumStateStorage(); - storage.updateTestChecksum({ bucket: 'stream|0["default"]', checksum: 1, count: 1 }); - storage.updateTestChecksum({ bucket: 'stream|0["a"]', checksum: 1, count: 1 }); - storage.updateTestChecksum({ bucket: 'stream|0["b"]', checksum: 1, count: 1 }); + storage.updateTestChecksum({ bucket: '1#stream|0["default"]', checksum: 1, count: 1 }); + storage.updateTestChecksum({ bucket: '1#stream|0["a"]', checksum: 1, count: 1 }); + storage.updateTestChecksum({ bucket: '1#stream|0["b"]', checksum: 1, count: 1 }); }); test('includes defaults', async () => { @@ -653,7 +675,7 @@ config: expect(line?.checkpointLine).toEqual({ checkpoint: { buckets: [ - { bucket: 'stream|0["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] } + { bucket: '1#stream|0["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] } ], last_op_id: '1', write_checkpoint: undefined, @@ -702,9 +724,9 @@ config: expect(line?.checkpointLine).toEqual({ checkpoint: { buckets: [ - { bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] }, - { bucket: 'stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 1 }] }, - { bucket: 'stream|0["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] } + { bucket: '1#stream|0["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] }, + { bucket: '1#stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 1 }] }, + { bucket: '1#stream|0["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] } ], last_op_id: '1', write_checkpoint: undefined, @@ -734,8 +756,8 @@ config: expect(line?.checkpointLine).toEqual({ checkpoint: { buckets: [ - { bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] }, - { bucket: 'stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }, { sub: 1 }] } + { bucket: '1#stream|0["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] }, + { bucket: '1#stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }, { sub: 1 }] } ], last_op_id: '1', write_checkpoint: undefined, @@ -762,9 +784,9 @@ config: expect(line?.checkpointLine).toEqual({ checkpoint: { buckets: [ - { bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, + { bucket: '1#stream|0["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, { - bucket: 'stream|0["default"]', + bucket: '1#stream|0["default"]', checksum: 1, count: 1, priority: 1, @@ -799,10 +821,10 @@ config: expect(line?.checkpointLine).toEqual({ checkpoint: { buckets: [ - { bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, - { bucket: 'stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, + { bucket: '1#stream|0["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, + { bucket: '1#stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, { - bucket: 'stream|0["default"]', + bucket: '1#stream|0["default"]', checksum: 1, count: 1, priority: 3, diff --git a/packages/sync-rules/src/BaseSqlDataQuery.ts b/packages/sync-rules/src/BaseSqlDataQuery.ts index 650494dd5..342330e18 100644 --- a/packages/sync-rules/src/BaseSqlDataQuery.ts +++ b/packages/sync-rules/src/BaseSqlDataQuery.ts @@ -5,6 +5,7 @@ import { SourceTableInterface } from './SourceTableInterface.js'; import { SqlTools } from './sql_filters.js'; import { TablePattern } from './TablePattern.js'; import { + BucketIdTransformer, EvaluationResult, QueryParameters, QuerySchema, @@ -25,6 +26,7 @@ export interface EvaluateRowOptions { table: SourceTableInterface; row: SqliteRow; bucketIds: (params: QueryParameters) => string[]; + bucketIdTransformer: BucketIdTransformer | null; } export interface BaseSqlDataQueryOptions { @@ -175,7 +177,7 @@ export class BaseSqlDataQuery { } } - evaluateRowWithOptions(options: EvaluateRowOptions): EvaluationResult[] { + evaluateRowWithOptions(options: Omit): EvaluationResult[] { try { const { table, row, bucketIds } = options; diff --git a/packages/sync-rules/src/SqlBucketDescriptor.ts b/packages/sync-rules/src/SqlBucketDescriptor.ts index beee77a80..46d9670f7 100644 --- a/packages/sync-rules/src/SqlBucketDescriptor.ts +++ b/packages/sync-rules/src/SqlBucketDescriptor.ts @@ -13,6 +13,7 @@ import { TableValuedFunctionSqlParameterQuery } from './TableValuedFunctionSqlPa import { SqlRuleError } from './errors.js'; import { CompatibilityContext } from './compatibility.js'; import { + BucketIdTransformer, EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, @@ -104,7 +105,13 @@ export class SqlBucketDescriptor implements BucketSource { continue; } - results.push(...query.evaluateRow(options.sourceTable, applyRowContext(options.record, this.compatibility))); + results.push( + ...query.evaluateRow( + options.sourceTable, + applyRowContext(options.record, this.compatibility), + options.bucketIdTransformer + ) + ); } return results; } @@ -131,7 +138,11 @@ export class SqlBucketDescriptor implements BucketSource { pushBucketParameterQueriers(result: PendingQueriers, options: GetQuerierOptions) { const reasons = [this.bucketInclusionReason()]; - const staticBuckets = this.getStaticBucketDescriptions(options.globalParameters, reasons); + const staticBuckets = this.getStaticBucketDescriptions( + options.globalParameters, + reasons, + options.bucketIdTransformer + ); const staticQuerier = { staticBuckets, hasDynamicBuckets: false, @@ -145,15 +156,19 @@ export class SqlBucketDescriptor implements BucketSource { } const dynamicQueriers = this.parameterQueries.map((query) => - query.getBucketParameterQuerier(options.globalParameters, reasons) + query.getBucketParameterQuerier(options.globalParameters, reasons, options.bucketIdTransformer) ); result.queriers.push(...dynamicQueriers); } - getStaticBucketDescriptions(parameters: RequestParameters, reasons: BucketInclusionReason[]): ResolvedBucket[] { + getStaticBucketDescriptions( + parameters: RequestParameters, + reasons: BucketInclusionReason[], + transformer: BucketIdTransformer + ): ResolvedBucket[] { let results: ResolvedBucket[] = []; for (let query of this.globalParameterQueries) { - for (const desc of query.getStaticBucketDescriptions(parameters)) { + for (const desc of query.getStaticBucketDescriptions(parameters, transformer)) { results.push({ ...desc, definition: this.name, diff --git a/packages/sync-rules/src/SqlDataQuery.ts b/packages/sync-rules/src/SqlDataQuery.ts index 8c4ef627f..1c575d6e5 100644 --- a/packages/sync-rules/src/SqlDataQuery.ts +++ b/packages/sync-rules/src/SqlDataQuery.ts @@ -10,7 +10,7 @@ import { checkUnsupportedFeatures, isClauseError } from './sql_support.js'; import { SyncRulesOptions } from './SqlSyncRules.js'; import { TablePattern } from './TablePattern.js'; import { TableQuerySchema } from './TableQuerySchema.js'; -import { EvaluationResult, ParameterMatchClause, QuerySchema, SqliteRow } from './types.js'; +import { BucketIdTransformer, EvaluationResult, ParameterMatchClause, QuerySchema, SqliteRow } from './types.js'; import { getBucketId, isSelectStatement } from './utils.js'; export interface SqlDataQueryOptions extends BaseSqlDataQueryOptions { @@ -185,13 +185,19 @@ export class SqlDataQuery extends BaseSqlDataQuery { this.filter = options.filter; } - evaluateRow(table: SourceTableInterface, row: SqliteRow): EvaluationResult[] { + evaluateRow( + table: SourceTableInterface, + row: SqliteRow, + bucketIdTransformer: BucketIdTransformer + ): EvaluationResult[] { return this.evaluateRowWithOptions({ table, row, bucketIds: (tables) => { const bucketParameters = this.filter.filterRow(tables); - return bucketParameters.map((params) => getBucketId(this.descriptorName, this.bucketParameters, params)); + return bucketParameters.map((params) => + getBucketId(this.descriptorName, this.bucketParameters, params, bucketIdTransformer) + ); } }); } diff --git a/packages/sync-rules/src/SqlParameterQuery.ts b/packages/sync-rules/src/SqlParameterQuery.ts index 37bf7e11a..44fca5504 100644 --- a/packages/sync-rules/src/SqlParameterQuery.ts +++ b/packages/sync-rules/src/SqlParameterQuery.ts @@ -15,6 +15,7 @@ import { TablePattern } from './TablePattern.js'; import { TableQuerySchema } from './TableQuerySchema.js'; import { TableValuedFunctionSqlParameterQuery } from './TableValuedFunctionSqlParameterQuery.js'; import { + BucketIdTransformer, EvaluatedParameters, EvaluatedParametersResult, InputParameter, @@ -348,7 +349,11 @@ export class SqlParameterQuery { * * Internal function, but exposed for tests. */ - resolveBucketDescriptions(bucketParameters: SqliteJsonRow[], parameters: RequestParameters): BucketDescription[] { + resolveBucketDescriptions( + bucketParameters: SqliteJsonRow[], + parameters: RequestParameters, + transformer: BucketIdTransformer + ): BucketDescription[] { // Filters have already been applied and gotten us the set of bucketParameters - don't attempt to filter again. // We _do_ need to evaluate the output columns here, using a combination of precomputed bucketParameters, // and values from token parameters. @@ -372,7 +377,7 @@ export class SqlParameterQuery { } return { - bucket: getBucketId(this.descriptorName, this.bucketParameters, result), + bucket: getBucketId(this.descriptorName, this.bucketParameters, result, transformer), priority: this.priority }; }) @@ -457,7 +462,8 @@ export class SqlParameterQuery { getBucketParameterQuerier( requestParameters: RequestParameters, - reasons: BucketInclusionReason[] + reasons: BucketInclusionReason[], + transformer: BucketIdTransformer ): BucketParameterQuerier { const lookups = this.getLookups(requestParameters); if (lookups.length == 0) { @@ -477,7 +483,7 @@ export class SqlParameterQuery { parameterQueryLookups: lookups, queryDynamicBucketDescriptions: async (source: ParameterLookupSource) => { const bucketParameters = await source.getParameterSets(lookups); - return this.resolveBucketDescriptions(bucketParameters, requestParameters).map((bucket) => ({ + return this.resolveBucketDescriptions(bucketParameters, requestParameters, transformer).map((bucket) => ({ ...bucket, definition: this.descriptorName, inclusion_reasons: reasons diff --git a/packages/sync-rules/src/SqlSyncRules.ts b/packages/sync-rules/src/SqlSyncRules.ts index bca6330ca..66f5e5985 100644 --- a/packages/sync-rules/src/SqlSyncRules.ts +++ b/packages/sync-rules/src/SqlSyncRules.ts @@ -8,6 +8,7 @@ import { SourceTableInterface } from './SourceTableInterface.js'; import { QueryParseResult, SqlBucketDescriptor } from './SqlBucketDescriptor.js'; import { TablePattern } from './TablePattern.js'; import { + BucketIdTransformer, EvaluatedParameters, EvaluatedParametersResult, EvaluatedRow, @@ -59,6 +60,13 @@ export interface RequestedStream { } export interface GetQuerierOptions { + /** + * A bucket id transformer, compatible to the one used when evaluating rows. + * + * Typically, this transformer only depends on the sync rule id (which is known to both the bucket storage + * implementation responsible for evaluating rows and the sync endpoint). + */ + bucketIdTransformer: BucketIdTransformer; globalParameters: RequestParameters; /** * Whether the client is subscribing to default query streams. @@ -85,6 +93,7 @@ export interface GetBucketParameterQuerierResult { export class SqlSyncRules implements SyncRules { bucketSources: BucketSource[] = []; eventDescriptors: SqlEventDescriptor[] = []; + compatibility: CompatibilityContext = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY; content: string; @@ -157,6 +166,7 @@ export class SqlSyncRules implements SyncRules { } compatibility = new CompatibilityContext(edition, options); + rules.compatibility = compatibility; } // Bucket definitions using explicit parameter and data queries. @@ -384,9 +394,17 @@ export class SqlSyncRules implements SyncRules { } evaluateRowWithErrors(options: EvaluateRowOptions): { results: EvaluatedRow[]; errors: EvaluationError[] } { + const resolvedOptions = this.compatibility.isEnabled(CompatibilityOption.versionedBucketIds) + ? options + : { + ...options, + // Disable bucket id transformer when the option is unused. + bucketIdTransformer: (id: string) => id + }; + let rawResults: EvaluationResult[] = []; for (let source of this.bucketSources) { - rawResults.push(...source.evaluateRow(options)); + rawResults.push(...source.evaluateRow(resolvedOptions)); } const results = rawResults.filter(isEvaluatedRow) as EvaluatedRow[]; @@ -421,13 +439,24 @@ export class SqlSyncRules implements SyncRules { } getBucketParameterQuerier(options: GetQuerierOptions): GetBucketParameterQuerierResult { + const resolvedOptions = this.compatibility.isEnabled(CompatibilityOption.versionedBucketIds) + ? options + : { + ...options, + // Disable bucket id transformer when the option is unused. + bucketIdTransformer: (id: string) => id + }; + const queriers: BucketParameterQuerier[] = []; const errors: QuerierError[] = []; const pending = { queriers, errors }; for (const source of this.bucketSources) { - if ((source.subscribedToByDefault && options.hasDefaultStreams) || source.name in options.streams) { - source.pushBucketParameterQueriers(pending, options); + if ( + (source.subscribedToByDefault && resolvedOptions.hasDefaultStreams) || + source.name in resolvedOptions.streams + ) { + source.pushBucketParameterQueriers(pending, resolvedOptions); } } @@ -502,4 +531,8 @@ export class SqlSyncRules implements SyncRules { } } } + + static versionedBucketIdTransformer(version: string) { + return (bucketId: string) => `${version}#${bucketId}`; + } } diff --git a/packages/sync-rules/src/StaticSqlParameterQuery.ts b/packages/sync-rules/src/StaticSqlParameterQuery.ts index 414ee2bdd..57bbc6dfd 100644 --- a/packages/sync-rules/src/StaticSqlParameterQuery.ts +++ b/packages/sync-rules/src/StaticSqlParameterQuery.ts @@ -3,7 +3,13 @@ import { BucketDescription, BucketPriority, DEFAULT_BUCKET_PRIORITY } from './Bu import { SqlRuleError } from './errors.js'; import { SqlTools } from './sql_filters.js'; import { checkUnsupportedFeatures, isClauseError, isParameterValueClause, sqliteBool } from './sql_support.js'; -import { ParameterValueClause, QueryParseOptions, RequestParameters, SqliteJsonValue } from './types.js'; +import { + BucketIdTransformer, + ParameterValueClause, + QueryParseOptions, + RequestParameters, + SqliteJsonValue +} from './types.js'; import { getBucketId, isJsonValue } from './utils.js'; export interface StaticSqlParameterQueryOptions { @@ -153,7 +159,7 @@ export class StaticSqlParameterQuery { this.errors = options.errors ?? []; } - getStaticBucketDescriptions(parameters: RequestParameters): BucketDescription[] { + getStaticBucketDescriptions(parameters: RequestParameters, transformer: BucketIdTransformer): BucketDescription[] { if (this.filter == null) { // Error in filter clause return []; @@ -177,7 +183,7 @@ export class StaticSqlParameterQuery { return [ { - bucket: getBucketId(this.descriptorName, this.bucketParameters, result), + bucket: getBucketId(this.descriptorName, this.bucketParameters, result, transformer), priority: this.priority } ]; diff --git a/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts b/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts index a19930af3..25ca791f1 100644 --- a/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts +++ b/packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts @@ -4,6 +4,7 @@ import { SqlTools } from './sql_filters.js'; import { checkUnsupportedFeatures, isClauseError, isParameterValueClause, sqliteBool } from './sql_support.js'; import { TABLE_VALUED_FUNCTIONS, TableValuedFunction } from './TableValuedFunctions.js'; import { + BucketIdTransformer, ParameterValueClause, ParameterValueSet, QueryParseOptions, @@ -202,7 +203,7 @@ export class TableValuedFunctionSqlParameterQuery { this.errors = options.errors; } - getStaticBucketDescriptions(parameters: RequestParameters): BucketDescription[] { + getStaticBucketDescriptions(parameters: RequestParameters, transformer: BucketIdTransformer): BucketDescription[] { if (this.filter == null || this.callClause == null) { // Error in filter clause return []; @@ -212,7 +213,7 @@ export class TableValuedFunctionSqlParameterQuery { const rows = this.function.call([valueString]); let total: BucketDescription[] = []; for (let row of rows) { - const description = this.getIndividualBucketDescription(row, parameters); + const description = this.getIndividualBucketDescription(row, parameters, transformer); if (description !== null) { total.push(description); } @@ -220,7 +221,11 @@ export class TableValuedFunctionSqlParameterQuery { return total; } - private getIndividualBucketDescription(row: SqliteRow, parameters: RequestParameters): BucketDescription | null { + private getIndividualBucketDescription( + row: SqliteRow, + parameters: RequestParameters, + transformer: BucketIdTransformer + ): BucketDescription | null { const mergedParams: ParameterValueSet = { ...parameters, lookup: (table, column) => { @@ -247,7 +252,7 @@ export class TableValuedFunctionSqlParameterQuery { } return { - bucket: getBucketId(this.descriptorName, this.bucketParameters, result), + bucket: getBucketId(this.descriptorName, this.bucketParameters, result, transformer), priority: this.priority }; } diff --git a/packages/sync-rules/src/compatibility.ts b/packages/sync-rules/src/compatibility.ts index 89459f05b..e50aa06fc 100644 --- a/packages/sync-rules/src/compatibility.ts +++ b/packages/sync-rules/src/compatibility.ts @@ -22,8 +22,15 @@ export class CompatibilityOption { CompatibilityEdition.SYNC_STREAMS ); + static versionedBucketIds = new CompatibilityOption( + 'versioned_bucket_ids', + 'Encode the version of sync rules in generated bucket ids, which avoids clients downloading data twice and improves client-side progress estimates.', + CompatibilityEdition.SYNC_STREAMS + ); + static byName: Record = Object.freeze({ - timestamps_iso8601: this.timestampsIso8601 + timestamps_iso8601: this.timestampsIso8601, + versioned_bucket_ids: this.versionedBucketIds }); } diff --git a/packages/sync-rules/src/streams/stream.ts b/packages/sync-rules/src/streams/stream.ts index 359fe63db..ca13d9369 100644 --- a/packages/sync-rules/src/streams/stream.ts +++ b/packages/sync-rules/src/streams/stream.ts @@ -8,6 +8,7 @@ import { SourceTableInterface } from '../SourceTableInterface.js'; import { GetQuerierOptions, RequestedStream } from '../SqlSyncRules.js'; import { TablePattern } from '../TablePattern.js'; import { + BucketIdTransformer, EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, @@ -59,27 +60,28 @@ export class SyncStream implements BucketSource { hasExplicitDefaultSubscription = true; } - this.queriersForSubscription(result, subscription, subscriptionParams); + this.queriersForSubscription(result, subscription, subscriptionParams, options.bucketIdTransformer); } // If the stream is subscribed to by default and there is no explicit subscription that would match the default // subscription, also include the default querier. if (this.subscribedToByDefault && !hasExplicitDefaultSubscription) { - this.queriersForSubscription(result, null, options.globalParameters); + this.queriersForSubscription(result, null, options.globalParameters, options.bucketIdTransformer); } } private queriersForSubscription( result: PendingQueriers, subscription: RequestedStream | null, - params: RequestParameters + params: RequestParameters, + bucketIdTransformer: BucketIdTransformer ) { const reason: BucketInclusionReason = subscription != null ? { subscription: subscription.opaque_id } : 'default'; const queriers: BucketParameterQuerier[] = []; try { for (const variant of this.variants) { - const querier = variant.querier(this, reason, params); + const querier = variant.querier(this, reason, params, bucketIdTransformer); if (querier) { queriers.push(querier); } @@ -143,8 +145,6 @@ export class SyncStream implements BucketSource { columns: this.data.columnOutputNames() } }; - - throw new Error('Method not implemented.'); } debugWriteOutputTables(result: Record): void { @@ -184,7 +184,7 @@ export class SyncStream implements BucketSource { bucketIds() { const bucketIds: string[] = []; for (const variant of stream.variants) { - bucketIds.push(...variant.bucketIdsForRow(stream.name, row)); + bucketIds.push(...variant.bucketIdsForRow(stream.name, row, options.bucketIdTransformer)); } return bucketIds; diff --git a/packages/sync-rules/src/streams/variant.ts b/packages/sync-rules/src/streams/variant.ts index 6b509aa1e..825d3a306 100644 --- a/packages/sync-rules/src/streams/variant.ts +++ b/packages/sync-rules/src/streams/variant.ts @@ -2,6 +2,7 @@ import { BucketInclusionReason, ResolvedBucket } from '../BucketDescription.js'; import { BucketParameterQuerier, ParameterLookup } from '../BucketParameterQuerier.js'; import { SourceTableInterface } from '../SourceTableInterface.js'; import { + BucketIdTransformer, EvaluatedParametersResult, EvaluateRowOptions, RequestParameters, @@ -67,8 +68,8 @@ export class StreamVariant { /** * Given a row in the table this stream selects from, returns all ids of buckets to which that row belongs to. */ - bucketIdsForRow(streamName: string, options: TableRow): string[] { - return this.instantiationsForRow(options).map((values) => this.buildBucketId(streamName, values)); + bucketIdsForRow(streamName: string, options: TableRow, transformer: BucketIdTransformer): string[] { + return this.instantiationsForRow(options).map((values) => this.buildBucketId(streamName, values, transformer)); } /** @@ -117,7 +118,12 @@ export class StreamVariant { return this.requestFilters.some((f) => f.type == 'dynamic'); } - querier(stream: SyncStream, reason: BucketInclusionReason, params: RequestParameters): BucketParameterQuerier | null { + querier( + stream: SyncStream, + reason: BucketInclusionReason, + params: RequestParameters, + bucketIdTransformer: BucketIdTransformer + ): BucketParameterQuerier | null { const instantiation = this.partiallyEvaluateParameters(params); if (instantiation == null) { return null; @@ -153,7 +159,7 @@ export class StreamVariant { // When we have no dynamic parameters, the partial evaluation is a full instantiation. const instantiations = this.cartesianProductOfParameterInstantiations(instantiation as SqliteJsonValue[][]); for (const instantiation of instantiations) { - staticBuckets.push(this.resolveBucket(stream, instantiation, reason)); + staticBuckets.push(this.resolveBucket(stream, instantiation, reason, bucketIdTransformer)); } } @@ -198,7 +204,7 @@ export class StreamVariant { perParameterInstantiation as SqliteJsonValue[][] ); - return Promise.resolve(product.map((e) => variant.resolveBucket(stream, e, reason))); + return Promise.resolve(product.map((e) => variant.resolveBucket(stream, e, reason, bucketIdTransformer))); } }; } @@ -299,25 +305,27 @@ export class StreamVariant { * * @param streamName The name of the stream, included in the bucket id * @param instantiation An instantiation for all parameters in this variant. + * @param transformer A transformer adding version information to the inner id. * @returns The generated bucket id */ - private buildBucketId(streamName: string, instantiation: SqliteJsonValue[]) { + private buildBucketId(streamName: string, instantiation: SqliteJsonValue[], transformer: BucketIdTransformer) { if (instantiation.length != this.parameters.length) { throw Error('Internal error, instantiation length mismatch'); } - return `${streamName}|${this.id}${JSONBucketNameSerialize.stringify(instantiation)}`; + return transformer(`${streamName}|${this.id}${JSONBucketNameSerialize.stringify(instantiation)}`); } private resolveBucket( stream: SyncStream, instantiation: SqliteJsonValue[], - reason: BucketInclusionReason + reason: BucketInclusionReason, + bucketIdTransformer: BucketIdTransformer ): ResolvedBucket { return { definition: stream.name, inclusion_reasons: [reason], - bucket: this.buildBucketId(stream.name, instantiation), + bucket: this.buildBucketId(stream.name, instantiation, bucketIdTransformer), priority: stream.priority }; } diff --git a/packages/sync-rules/src/types.ts b/packages/sync-rules/src/types.ts index fc50df7b6..accb0338f 100644 --- a/packages/sync-rules/src/types.ts +++ b/packages/sync-rules/src/types.ts @@ -283,7 +283,23 @@ export interface InputParameter { parametersToLookupValue(parameters: ParameterValueSet): SqliteValue; } -export interface EvaluateRowOptions extends TableRow {} +/** + * Transforms bucket ids generated when evaluating the row by e.g. encoding version information. + * + * Because buckets are recreated on a sync rule redeploy, it makes sense to use different bucket ids (otherwise, clients + * may run into checksum errors causing a sync to take longer than necessary or breaking progress). + * + * So, this transformer receives the original bucket id as generated by defined sync rules, and can prepend a version + * identifier. + * + * Note that this transformation has not been present in older versions of the sync service. To preserve backwards + * compatibility, sync rules will not use this function without an opt-in. + */ +export type BucketIdTransformer = (regularId: string) => string; + +export interface EvaluateRowOptions extends TableRow { + bucketIdTransformer: BucketIdTransformer; +} /** * A row associated with the table it's coming from. diff --git a/packages/sync-rules/src/utils.ts b/packages/sync-rules/src/utils.ts index e58f33b04..234874858 100644 --- a/packages/sync-rules/src/utils.ts +++ b/packages/sync-rules/src/utils.ts @@ -9,7 +9,8 @@ import { SqliteJsonRow, SqliteJsonValue, SqliteRow, - SqliteValue + SqliteValue, + BucketIdTransformer } from './types.js'; import { SyncRuleProcessingError as SyncRulesProcessingError } from './errors.js'; import { CustomArray, CustomObject, CustomSqliteValue } from './types/custom_sqlite_value.js'; @@ -22,11 +23,12 @@ export function isSelectStatement(q: Statement): q is SelectFromStatement { export function getBucketId( descriptor_id: string, bucket_parameters: string[], - params: Record + params: Record, + transformer: BucketIdTransformer ): string { // Important: REAL and INTEGER values matching the same number needs the same representation in the bucket name. const paramArray = bucket_parameters.map((name) => params[`bucket.${name}`]); - return `${descriptor_id}${JSONBucketNameSerialize.stringify(paramArray)}`; + return transformer(`${descriptor_id}${JSONBucketNameSerialize.stringify(paramArray)}`); } const DEPTH_LIMIT = 10; diff --git a/packages/sync-rules/test/src/compatibility.test.ts b/packages/sync-rules/test/src/compatibility.test.ts index 0d15360c2..0b07b47dc 100644 --- a/packages/sync-rules/test/src/compatibility.test.ts +++ b/packages/sync-rules/test/src/compatibility.test.ts @@ -1,7 +1,7 @@ import { describe, expect, test } from 'vitest'; -import { CustomSqliteValue, SqlSyncRules, DateTimeValue, toSyncRulesValue } from '../../src/index.js'; +import { SqlSyncRules, DateTimeValue, toSyncRulesValue } from '../../src/index.js'; -import { ASSETS, PARSE_OPTIONS } from './util.js'; +import { ASSETS, normalizeQuerierOptions, PARSE_OPTIONS } from './util.js'; describe('compatibility options', () => { describe('timestamps', () => { @@ -21,6 +21,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer(''), record: { id: 'id', description: value @@ -48,6 +49,7 @@ config: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer(''), record: { id: 'id', description: value @@ -64,6 +66,7 @@ config: streams: stream: query: SELECT id, description FROM assets + auto_subscribe: true config: edition: 2 @@ -73,6 +76,7 @@ config: expect( rules.evaluateRow({ + bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer('1'), sourceTable: ASSETS, record: { id: 'id', @@ -80,7 +84,20 @@ config: } }) ).toStrictEqual([ - { bucket: 'stream|0[]', data: { description: '2025-08-19T09:21:00Z', id: 'id' }, id: 'id', table: 'assets' } + { bucket: '1#stream|0[]', data: { description: '2025-08-19T09:21:00Z', id: 'id' }, id: 'id', table: 'assets' } + ]); + + expect( + rules.getBucketParameterQuerier( + normalizeQuerierOptions({}, {}, {}, SqlSyncRules.versionedBucketIdTransformer('1')) + ).querier.staticBuckets + ).toStrictEqual([ + { + bucket: '1#stream|0[]', + definition: 'stream', + inclusion_reasons: ['default'], + priority: 3 + } ]); }); @@ -90,16 +107,19 @@ config: streams: stream: query: SELECT id, description FROM assets + auto_subscribe: true config: edition: 2 timestamps_iso8601: false + versioned_bucket_ids: false `, PARSE_OPTIONS ); expect( rules.evaluateRow({ + bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer('1'), sourceTable: ASSETS, record: { id: 'id', @@ -109,9 +129,75 @@ config: ).toStrictEqual([ { bucket: 'stream|0[]', data: { description: '2025-08-19 09:21:00Z', id: 'id' }, id: 'id', table: 'assets' } ]); + expect( + rules.getBucketParameterQuerier( + normalizeQuerierOptions({}, {}, {}, SqlSyncRules.versionedBucketIdTransformer('1')) + ).querier.staticBuckets + ).toStrictEqual([ + { + bucket: 'stream|0[]', + definition: 'stream', + inclusion_reasons: ['default'], + priority: 3 + } + ]); }); }); + test('can use versioned bucket ids', () => { + const rules = SqlSyncRules.fromYaml( + ` +bucket_definitions: + mybucket: + data: + - SELECT id, description FROM assets + +config: + edition: 1 + versioned_bucket_ids: true + `, + PARSE_OPTIONS + ); + + expect( + rules.evaluateRow({ + sourceTable: ASSETS, + bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer('1'), + record: { + id: 'id', + description: 'desc' + } + }) + ).toStrictEqual([{ bucket: '1#mybucket[]', data: { description: 'desc', id: 'id' }, id: 'id', table: 'assets' }]); + }); + + test('streams use new options by default', () => { + const rules = SqlSyncRules.fromYaml( + ` +streams: + stream: + query: SELECT id, description FROM assets + +config: + edition: 2 + `, + PARSE_OPTIONS + ); + + expect( + rules.evaluateRow({ + sourceTable: ASSETS, + bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer('1'), + record: { + id: 'id', + description: new DateTimeValue('2025-08-19T09:21:00Z') + } + }) + ).toStrictEqual([ + { bucket: '1#stream|0[]', data: { description: '2025-08-19T09:21:00Z', id: 'id' }, id: 'id', table: 'assets' } + ]); + }); + test('warning for unknown option', () => { expect(() => { SqlSyncRules.fromYaml( @@ -151,6 +237,7 @@ config: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer('1'), record: { id: 'id', description: data @@ -158,7 +245,7 @@ config: }) ).toStrictEqual([ { - bucket: 'mybucket[]', + bucket: withFixedQuirk ? '1#mybucket[]' : 'mybucket[]', data: { description: withFixedQuirk ? '["static value","2025-08-19T09:21:00Z"]' @@ -169,6 +256,19 @@ config: table: 'assets' } ]); + + expect( + rules.getBucketParameterQuerier( + normalizeQuerierOptions({}, {}, {}, SqlSyncRules.versionedBucketIdTransformer('1')) + ).querier.staticBuckets + ).toStrictEqual([ + { + bucket: withFixedQuirk ? '1#mybucket[]' : 'mybucket[]', + definition: 'mybucket', + inclusion_reasons: ['default'], + priority: 3 + } + ]); } }); }); diff --git a/packages/sync-rules/test/src/data_queries.test.ts b/packages/sync-rules/test/src/data_queries.test.ts index 848245334..219cbce04 100644 --- a/packages/sync-rules/test/src/data_queries.test.ts +++ b/packages/sync-rules/test/src/data_queries.test.ts @@ -1,14 +1,31 @@ import { describe, expect, test } from 'vitest'; -import { ExpressionType, SqlDataQuery } from '../../src/index.js'; -import { ASSETS, BASIC_SCHEMA, PARSE_OPTIONS } from './util.js'; +import { ExpressionType, SqlDataQuery, SqlSyncRules } from '../../src/index.js'; +import { ASSETS, BASIC_SCHEMA, identityBucketTransformer, PARSE_OPTIONS } from './util.js'; describe('data queries', () => { + test('uses bucket id transformer', function () { + const sql = 'SELECT * FROM assets WHERE assets.org_id = bucket.org_id'; + const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql, PARSE_OPTIONS); + expect(query.errors).toEqual([]); + + expect( + query.evaluateRow(ASSETS, { id: 'asset1', org_id: 'org1' }, SqlSyncRules.versionedBucketIdTransformer('1')) + ).toEqual([ + { + bucket: '1#mybucket["org1"]', + table: 'assets', + id: 'asset1', + data: { id: 'asset1', org_id: 'org1' } + } + ]); + }); + test('bucket parameters = query', function () { const sql = 'SELECT * FROM assets WHERE assets.org_id = bucket.org_id'; const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql, PARSE_OPTIONS); expect(query.errors).toEqual([]); - expect(query.evaluateRow(ASSETS, { id: 'asset1', org_id: 'org1' })).toEqual([ + expect(query.evaluateRow(ASSETS, { id: 'asset1', org_id: 'org1' }, identityBucketTransformer)).toEqual([ { bucket: 'mybucket["org1"]', table: 'assets', @@ -17,7 +34,7 @@ describe('data queries', () => { } ]); - expect(query.evaluateRow(ASSETS, { id: 'asset1', org_id: null })).toEqual([]); + expect(query.evaluateRow(ASSETS, { id: 'asset1', org_id: null }, identityBucketTransformer)).toEqual([]); }); test('bucket parameters IN query', function () { @@ -25,7 +42,13 @@ describe('data queries', () => { const query = SqlDataQuery.fromSql('mybucket', ['category'], sql, PARSE_OPTIONS); expect(query.errors).toEqual([]); - expect(query.evaluateRow(ASSETS, { id: 'asset1', categories: JSON.stringify(['red', 'green']) })).toMatchObject([ + expect( + query.evaluateRow( + ASSETS, + { id: 'asset1', categories: JSON.stringify(['red', 'green']) }, + identityBucketTransformer + ) + ).toMatchObject([ { bucket: 'mybucket["red"]', table: 'assets', @@ -38,7 +61,7 @@ describe('data queries', () => { } ]); - expect(query.evaluateRow(ASSETS, { id: 'asset1', org_id: null })).toEqual([]); + expect(query.evaluateRow(ASSETS, { id: 'asset1', org_id: null }, identityBucketTransformer)).toEqual([]); }); test('static IN data query', function () { @@ -46,7 +69,13 @@ describe('data queries', () => { const query = SqlDataQuery.fromSql('mybucket', [], sql, PARSE_OPTIONS); expect(query.errors).toEqual([]); - expect(query.evaluateRow(ASSETS, { id: 'asset1', categories: JSON.stringify(['red', 'green']) })).toMatchObject([ + expect( + query.evaluateRow( + ASSETS, + { id: 'asset1', categories: JSON.stringify(['red', 'green']) }, + identityBucketTransformer + ) + ).toMatchObject([ { bucket: 'mybucket[]', table: 'assets', @@ -54,7 +83,13 @@ describe('data queries', () => { } ]); - expect(query.evaluateRow(ASSETS, { id: 'asset1', categories: JSON.stringify(['red', 'blue']) })).toEqual([]); + expect( + query.evaluateRow( + ASSETS, + { id: 'asset1', categories: JSON.stringify(['red', 'blue']) }, + identityBucketTransformer + ) + ).toEqual([]); }); test('data IN static query', function () { @@ -62,7 +97,7 @@ describe('data queries', () => { const query = SqlDataQuery.fromSql('mybucket', [], sql, PARSE_OPTIONS); expect(query.errors).toEqual([]); - expect(query.evaluateRow(ASSETS, { id: 'asset1', condition: 'good' })).toMatchObject([ + expect(query.evaluateRow(ASSETS, { id: 'asset1', condition: 'good' }, identityBucketTransformer)).toMatchObject([ { bucket: 'mybucket[]', table: 'assets', @@ -70,7 +105,7 @@ describe('data queries', () => { } ]); - expect(query.evaluateRow(ASSETS, { id: 'asset1', condition: 'bad' })).toEqual([]); + expect(query.evaluateRow(ASSETS, { id: 'asset1', condition: 'bad' }, identityBucketTransformer)).toEqual([]); }); test('table alias', function () { @@ -78,7 +113,7 @@ describe('data queries', () => { const query = SqlDataQuery.fromSql('mybucket', ['org_id'], sql, PARSE_OPTIONS); expect(query.errors).toEqual([]); - expect(query.evaluateRow(ASSETS, { id: 'asset1', org_id: 'org1' })).toEqual([ + expect(query.evaluateRow(ASSETS, { id: 'asset1', org_id: 'org1' }, identityBucketTransformer)).toEqual([ { bucket: 'mybucket["org1"]', table: 'others', diff --git a/packages/sync-rules/test/src/parameter_queries.test.ts b/packages/sync-rules/test/src/parameter_queries.test.ts index 12835dbec..acabec993 100644 --- a/packages/sync-rules/test/src/parameter_queries.test.ts +++ b/packages/sync-rules/test/src/parameter_queries.test.ts @@ -1,6 +1,6 @@ import { describe, expect, test } from 'vitest'; import { ParameterLookup, SqlParameterQuery } from '../../src/index.js'; -import { BASIC_SCHEMA, normalizeTokenParameters, PARSE_OPTIONS } from './util.js'; +import { BASIC_SCHEMA, identityBucketTransformer, normalizeTokenParameters, PARSE_OPTIONS } from './util.js'; import { StaticSqlParameterQuery } from '../../src/StaticSqlParameterQuery.js'; describe('parameter queries', () => { @@ -83,11 +83,19 @@ describe('parameter queries', () => { // We _do_ need to care about the bucket string representation. expect( - query.resolveBucketDescriptions([{ int1: 314, float1: 3.14, float2: 314 }], normalizeTokenParameters({})) + query.resolveBucketDescriptions( + [{ int1: 314, float1: 3.14, float2: 314 }], + normalizeTokenParameters({}), + identityBucketTransformer + ) ).toEqual([{ bucket: 'mybucket[314,3.14,314]', priority: 3 }]); expect( - query.resolveBucketDescriptions([{ int1: 314n, float1: 3.14, float2: 314 }], normalizeTokenParameters({})) + query.resolveBucketDescriptions( + [{ int1: 314n, float1: 3.14, float2: 314 }], + normalizeTokenParameters({}), + identityBucketTransformer + ) ).toEqual([{ bucket: 'mybucket[314,3.14,314]', priority: 3 }]); }); @@ -363,7 +371,8 @@ describe('parameter queries', () => { expect( query.resolveBucketDescriptions( [{ user_id: 'user1' }], - normalizeTokenParameters({ user_id: 'user1', is_admin: true }) + normalizeTokenParameters({ user_id: 'user1', is_admin: true }), + identityBucketTransformer ) ).toEqual([{ bucket: 'mybucket["user1",1]', priority: 3 }]); }); diff --git a/packages/sync-rules/test/src/static_parameter_queries.test.ts b/packages/sync-rules/test/src/static_parameter_queries.test.ts index 611afd7b1..3e2861307 100644 --- a/packages/sync-rules/test/src/static_parameter_queries.test.ts +++ b/packages/sync-rules/test/src/static_parameter_queries.test.ts @@ -1,7 +1,7 @@ import { describe, expect, test } from 'vitest'; -import { RequestParameters, SqlParameterQuery } from '../../src/index.js'; +import { RequestParameters, SqlParameterQuery, SqlSyncRules } from '../../src/index.js'; import { StaticSqlParameterQuery } from '../../src/StaticSqlParameterQuery.js'; -import { normalizeTokenParameters, PARSE_OPTIONS } from './util.js'; +import { identityBucketTransformer, normalizeTokenParameters, PARSE_OPTIONS } from './util.js'; describe('static parameter queries', () => { test('basic query', function () { @@ -9,9 +9,22 @@ describe('static parameter queries', () => { const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); expect(query.bucketParameters!).toEqual(['user_id']); - expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ - { bucket: 'mybucket["user1"]', priority: 3 } - ]); + expect( + query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }), identityBucketTransformer) + ).toEqual([{ bucket: 'mybucket["user1"]', priority: 3 }]); + }); + + test('uses bucket id transformer', function () { + const sql = 'SELECT token_parameters.user_id'; + const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; + expect(query.errors).toEqual([]); + expect(query.bucketParameters!).toEqual(['user_id']); + expect( + query.getStaticBucketDescriptions( + normalizeTokenParameters({ user_id: 'user1' }), + SqlSyncRules.versionedBucketIdTransformer('1') + ) + ).toEqual([{ bucket: '1#mybucket["user1"]', priority: 3 }]); }); test('global query', function () { @@ -19,30 +32,36 @@ describe('static parameter queries', () => { const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); expect(query.bucketParameters!).toEqual([]); - expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ - { bucket: 'mybucket[]', priority: 3 } - ]); + expect( + query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }), identityBucketTransformer) + ).toEqual([{ bucket: 'mybucket[]', priority: 3 }]); }); test('query with filter', function () { const sql = 'SELECT token_parameters.user_id WHERE token_parameters.is_admin'; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1', is_admin: true }))).toEqual([ - { bucket: 'mybucket["user1"]', priority: 3 } - ]); - expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1', is_admin: false }))).toEqual( - [] - ); + expect( + query.getStaticBucketDescriptions( + normalizeTokenParameters({ user_id: 'user1', is_admin: true }), + identityBucketTransformer + ) + ).toEqual([{ bucket: 'mybucket["user1"]', priority: 3 }]); + expect( + query.getStaticBucketDescriptions( + normalizeTokenParameters({ user_id: 'user1', is_admin: false }), + identityBucketTransformer + ) + ).toEqual([]); }); test('function in select clause', function () { const sql = 'SELECT upper(token_parameters.user_id) as upper_id'; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ - { bucket: 'mybucket["USER1"]', priority: 3 } - ]); + expect( + query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }), identityBucketTransformer) + ).toEqual([{ bucket: 'mybucket["USER1"]', priority: 3 }]); expect(query.bucketParameters!).toEqual(['upper_id']); }); @@ -50,20 +69,24 @@ describe('static parameter queries', () => { const sql = "SELECT WHERE upper(token_parameters.role) = 'ADMIN'"; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ role: 'admin' }))).toEqual([ - { bucket: 'mybucket[]', priority: 3 } - ]); - expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ role: 'user' }))).toEqual([]); + expect( + query.getStaticBucketDescriptions(normalizeTokenParameters({ role: 'admin' }), identityBucketTransformer) + ).toEqual([{ bucket: 'mybucket[]', priority: 3 }]); + expect( + query.getStaticBucketDescriptions(normalizeTokenParameters({ role: 'user' }), identityBucketTransformer) + ).toEqual([]); }); test('comparison in filter clause', function () { const sql = 'SELECT WHERE token_parameters.id1 = token_parameters.id2'; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ id1: 't1', id2: 't1' }))).toEqual([ - { bucket: 'mybucket[]', priority: 3 } - ]); - expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ id1: 't1', id2: 't2' }))).toEqual([]); + expect( + query.getStaticBucketDescriptions(normalizeTokenParameters({ id1: 't1', id2: 't1' }), identityBucketTransformer) + ).toEqual([{ bucket: 'mybucket[]', priority: 3 }]); + expect( + query.getStaticBucketDescriptions(normalizeTokenParameters({ id1: 't1', id2: 't2' }), identityBucketTransformer) + ).toEqual([]); }); test('request.parameters()', function () { @@ -79,9 +102,9 @@ describe('static parameter queries', () => { ) as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketDescriptions(normalizeTokenParameters({}, { org_id: 'test' }))).toEqual([ - { bucket: 'mybucket["test"]', priority: 3 } - ]); + expect( + query.getStaticBucketDescriptions(normalizeTokenParameters({}, { org_id: 'test' }), identityBucketTransformer) + ).toEqual([{ bucket: 'mybucket["test"]', priority: 3 }]); }); test('request.jwt()', function () { @@ -90,9 +113,9 @@ describe('static parameter queries', () => { expect(query.errors).toEqual([]); expect(query.bucketParameters).toEqual(['user_id']); - expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ - { bucket: 'mybucket["user1"]', priority: 3 } - ]); + expect( + query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }), identityBucketTransformer) + ).toEqual([{ bucket: 'mybucket["user1"]', priority: 3 }]); }); test('request.user_id()', function () { @@ -101,43 +124,45 @@ describe('static parameter queries', () => { expect(query.errors).toEqual([]); expect(query.bucketParameters).toEqual(['user_id']); - expect(query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }))).toEqual([ - { bucket: 'mybucket["user1"]', priority: 3 } - ]); + expect( + query.getStaticBucketDescriptions(normalizeTokenParameters({ user_id: 'user1' }), identityBucketTransformer) + ).toEqual([{ bucket: 'mybucket["user1"]', priority: 3 }]); }); test('static value', function () { const sql = `SELECT WHERE 1`; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([ - { bucket: 'mybucket[]', priority: 3 } - ]); + expect( + query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}), identityBucketTransformer) + ).toEqual([{ bucket: 'mybucket[]', priority: 3 }]); }); test('static expression (1)', function () { const sql = `SELECT WHERE 1 = 1`; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([ - { bucket: 'mybucket[]', priority: 3 } - ]); + expect( + query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}), identityBucketTransformer) + ).toEqual([{ bucket: 'mybucket[]', priority: 3 }]); }); test('static expression (2)', function () { const sql = `SELECT WHERE 1 != 1`; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([]); + expect( + query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}), identityBucketTransformer) + ).toEqual([]); }); test('static IN expression', function () { const sql = `SELECT WHERE 'admin' IN '["admin", "superuser"]'`; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([ - { bucket: 'mybucket[]', priority: 3 } - ]); + expect( + query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}), identityBucketTransformer) + ).toEqual([{ bucket: 'mybucket[]', priority: 3 }]); }); test('IN for permissions in request.jwt() (1)', function () { @@ -146,10 +171,16 @@ describe('static parameter queries', () => { const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); expect( - query.getStaticBucketDescriptions(new RequestParameters({ sub: '', permissions: ['write', 'read:users'] }, {})) + query.getStaticBucketDescriptions( + new RequestParameters({ sub: '', permissions: ['write', 'read:users'] }, {}), + identityBucketTransformer + ) ).toEqual([{ bucket: 'mybucket[1]', priority: 3 }]); expect( - query.getStaticBucketDescriptions(new RequestParameters({ sub: '', permissions: ['write', 'write:users'] }, {})) + query.getStaticBucketDescriptions( + new RequestParameters({ sub: '', permissions: ['write', 'write:users'] }, {}), + identityBucketTransformer + ) ).toEqual([{ bucket: 'mybucket[0]', priority: 3 }]); }); @@ -159,10 +190,16 @@ describe('static parameter queries', () => { const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); expect( - query.getStaticBucketDescriptions(new RequestParameters({ sub: '', permissions: ['write', 'read:users'] }, {})) + query.getStaticBucketDescriptions( + new RequestParameters({ sub: '', permissions: ['write', 'read:users'] }, {}), + identityBucketTransformer + ) ).toEqual([{ bucket: 'mybucket[]', priority: 3 }]); expect( - query.getStaticBucketDescriptions(new RequestParameters({ sub: '', permissions: ['write', 'write:users'] }, {})) + query.getStaticBucketDescriptions( + new RequestParameters({ sub: '', permissions: ['write', 'write:users'] }, {}), + identityBucketTransformer + ) ).toEqual([]); }); @@ -170,10 +207,18 @@ describe('static parameter queries', () => { const sql = `SELECT WHERE request.jwt() ->> 'role' IN '["admin", "superuser"]'`; const query = SqlParameterQuery.fromSql('mybucket', sql, PARSE_OPTIONS, '1') as StaticSqlParameterQuery; expect(query.errors).toEqual([]); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '', role: 'superuser' }, {}))).toEqual([ - { bucket: 'mybucket[]', priority: 3 } - ]); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '', role: 'superadmin' }, {}))).toEqual([]); + expect( + query.getStaticBucketDescriptions( + new RequestParameters({ sub: '', role: 'superuser' }, {}), + identityBucketTransformer + ) + ).toEqual([{ bucket: 'mybucket[]', priority: 3 }]); + expect( + query.getStaticBucketDescriptions( + new RequestParameters({ sub: '', role: 'superadmin' }, {}), + identityBucketTransformer + ) + ).toEqual([]); }); test('case-sensitive queries (1)', () => { diff --git a/packages/sync-rules/test/src/streams.test.ts b/packages/sync-rules/test/src/streams.test.ts index 9352f16cb..cad559439 100644 --- a/packages/sync-rules/test/src/streams.test.ts +++ b/packages/sync-rules/test/src/streams.test.ts @@ -12,6 +12,7 @@ import { SourceTableInterface, SqliteJsonRow, SqliteRow, + SqlSyncRules, StaticSchema, StreamParseOptions, SyncStream, @@ -33,19 +34,19 @@ describe('streams', () => { const desc = parseStream('SELECT * FROM comments'); expect(desc.variants).toHaveLength(1); - expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo' })).toStrictEqual(['stream|0[]']); - expect(desc.evaluateRow({ sourceTable: USERS, record: { id: 'foo' } })).toHaveLength(0); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo' })).toStrictEqual(['1#stream|0[]']); + expect(desc.evaluateRow({ sourceTable: USERS, bucketIdTransformer, record: { id: 'foo' } })).toHaveLength(0); }); test('row condition', () => { const desc = parseStream('SELECT * FROM comments WHERE length(content) > 5'); expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'a' })).toStrictEqual([]); - expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'aaaaaa' })).toStrictEqual(['stream|0[]']); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'aaaaaa' })).toStrictEqual(['1#stream|0[]']); }); test('stream parameter', () => { const desc = parseStream("SELECT * FROM comments WHERE issue_id = subscription.parameter('id')"); - expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', issue_id: 'a' })).toStrictEqual(['stream|0["a"]']); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', issue_id: 'a' })).toStrictEqual(['1#stream|0["a"]']); }); test('row filter and stream parameter', async () => { @@ -54,11 +55,11 @@ describe('streams', () => { ); expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'a', issue_id: 'a' })).toStrictEqual([]); expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'aaaaaa', issue_id: 'i' })).toStrictEqual([ - 'stream|0["i"]' + '1#stream|0["i"]' ]); expect(await queryBucketIds(desc, { parameters: { id: 'subscribed_issue' } })).toStrictEqual([ - 'stream|0["subscribed_issue"]' + '1#stream|0["subscribed_issue"]' ]); }); @@ -67,8 +68,8 @@ describe('streams', () => { const desc = parseStream("SELECT * FROM issues WHERE owner_id = auth.user_id() OR auth.parameter('is_admin')"); expect(evaluateBucketIds(desc, ISSUES, { id: 'foo', owner_id: 'u1' })).toStrictEqual([ - 'stream|0["u1"]', - 'stream|1[]' + '1#stream|0["u1"]', + '1#stream|1[]' ]); expect( @@ -78,7 +79,7 @@ describe('streams', () => { is_admin: false } }) - ).toStrictEqual(['stream|0["u1"]']); + ).toStrictEqual(['1#stream|0["u1"]']); expect( await queryBucketIds(desc, { @@ -87,15 +88,17 @@ describe('streams', () => { is_admin: true } }) - ).toStrictEqual(['stream|0["u1"]', 'stream|1[]']); + ).toStrictEqual(['1#stream|0["u1"]', '1#stream|1[]']); }); test('parameter match or row condition', async () => { const desc = parseStream('SELECT * FROM issues WHERE owner_id = auth.user_id() OR LENGTH(name) = 3'); - expect(evaluateBucketIds(desc, ISSUES, { id: 'foo', owner_id: 'a', name: '' })).toStrictEqual(['stream|0["a"]']); + expect(evaluateBucketIds(desc, ISSUES, { id: 'foo', owner_id: 'a', name: '' })).toStrictEqual([ + '1#stream|0["a"]' + ]); expect(evaluateBucketIds(desc, ISSUES, { id: 'foo', owner_id: 'a', name: 'aaa' })).toStrictEqual([ - 'stream|0["a"]', - 'stream|1[]' + '1#stream|0["a"]', + '1#stream|1[]' ]); expect( @@ -104,16 +107,16 @@ describe('streams', () => { user_id: 'u1' } }) - ).toStrictEqual(['stream|0["u1"]', 'stream|1[]']); + ).toStrictEqual(['1#stream|0["u1"]', '1#stream|1[]']); }); test('row condition or parameter condition', async () => { const desc = parseStream("SELECT * FROM comments WHERE LENGTH(content) > 5 OR auth.parameter('is_admin')"); - expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'short' })).toStrictEqual(['stream|1[]']); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'short' })).toStrictEqual(['1#stream|1[]']); expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'longer' })).toStrictEqual([ - 'stream|0[]', - 'stream|1[]' + '1#stream|0[]', + '1#stream|1[]' ]); expect( @@ -122,14 +125,14 @@ describe('streams', () => { is_admin: false } }) - ).toStrictEqual(['stream|0[]']); + ).toStrictEqual(['1#stream|0[]']); expect( await queryBucketIds(desc, { token_parameters: { is_admin: true } }) - ).toStrictEqual(['stream|0[]', 'stream|1[]']); + ).toStrictEqual(['1#stream|0[]', '1#stream|1[]']); }); test('row condition or row condition', () => { @@ -141,14 +144,14 @@ describe('streams', () => { expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'short', tagged_users: '[]' })).toStrictEqual([]); expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'longer', tagged_users: '[]' })).toStrictEqual([ - 'stream|0[]' + '1#stream|0[]' ]); expect( evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'longer', tagged_users: '["a","b"]' }) - ).toStrictEqual(['stream|0[]']); + ).toStrictEqual(['1#stream|0[]']); expect( evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'short', tagged_users: '["a","b"]' }) - ).toStrictEqual(['stream|0[]']); + ).toStrictEqual(['1#stream|0[]']); }); test('request condition or request condition', async () => { @@ -156,7 +159,7 @@ describe('streams', () => { // Complex conditions that only operate on request data don't need variants. expect(desc.variants).toHaveLength(1); - expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'whatever]' })).toStrictEqual(['stream|0[]']); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo', content: 'whatever]' })).toStrictEqual(['1#stream|0[]']); expect( await queryBucketIds(desc, { token_parameters: { @@ -172,7 +175,7 @@ describe('streams', () => { b: false } }) - ).toStrictEqual(['stream|0[]']); + ).toStrictEqual(['1#stream|0[]']); }); test('subquery or token parameter', async () => { @@ -181,8 +184,8 @@ describe('streams', () => { ); expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'i1' })).toStrictEqual([ - 'stream|0["i1"]', - 'stream|1[]' + '1#stream|0["i1"]', + '1#stream|1[]' ]); expect(desc.evaluateParameterRow(ISSUES, { id: 'i1', owner_id: 'u1' })).toStrictEqual([ @@ -203,10 +206,10 @@ describe('streams', () => { } expect( await queryBucketIds(desc, { token_parameters: { user_id: 'u1', is_admin: false }, getParameterSets }) - ).toStrictEqual(['stream|0["i1"]']); + ).toStrictEqual(['1#stream|0["i1"]']); expect( await queryBucketIds(desc, { token_parameters: { user_id: 'u1', is_admin: true }, getParameterSets }) - ).toStrictEqual(['stream|1[]', 'stream|0["i1"]']); + ).toStrictEqual(['1#stream|1[]', '1#stream|0["i1"]']); }); }); @@ -228,7 +231,7 @@ describe('streams', () => { } ]); expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id' })).toStrictEqual([ - 'stream|0["issue_id"]' + '1#stream|0["issue_id"]' ]); expect( @@ -240,7 +243,7 @@ describe('streams', () => { return [{ result: 'issue_id' }]; } }) - ).toStrictEqual(['stream|0["issue_id"]']); + ).toStrictEqual(['1#stream|0["issue_id"]']); }); test('parameter value in subquery', async () => { @@ -270,7 +273,7 @@ describe('streams', () => { return [{ result: 'u' }]; } }) - ).toStrictEqual(['stream|0[]']); + ).toStrictEqual(['1#stream|0[]']); // And not for others expect( @@ -290,7 +293,10 @@ describe('streams', () => { id IN (SELECT user_b FROM friends WHERE user_a = auth.user_id()) `); - expect(evaluateBucketIds(desc, USERS, { id: 'a', name: 'a' })).toStrictEqual(['stream|0["a"]', 'stream|1["a"]']); + expect(evaluateBucketIds(desc, USERS, { id: 'a', name: 'a' })).toStrictEqual([ + '1#stream|0["a"]', + '1#stream|1["a"]' + ]); expect(desc.evaluateParameterRow(FRIENDS, { user_a: 'a', user_b: 'b' })).toStrictEqual([ { @@ -328,19 +334,19 @@ describe('streams', () => { token_parameters: { user_id: 'a' }, getParameterSets }) - ).toStrictEqual(['stream|1["b"]']); + ).toStrictEqual(['1#stream|1["b"]']); }); test('on parameter data', async () => { const desc = parseStream("SELECT * FROM comments WHERE issue_id IN (subscription.parameters() -> 'issue_id')"); - expect(evaluateBucketIds(desc, COMMENTS, { id: 'a', issue_id: 'i' })).toStrictEqual(['stream|0["i"]']); + expect(evaluateBucketIds(desc, COMMENTS, { id: 'a', issue_id: 'i' })).toStrictEqual(['1#stream|0["i"]']); expect( await queryBucketIds(desc, { token_parameters: { user_id: 'a' }, parameters: { issue_id: ['i1', 'i2'] } }) - ).toStrictEqual(['stream|0["i1"]', 'stream|0["i2"]']); + ).toStrictEqual(['1#stream|0["i1"]', '1#stream|0["i2"]']); }); test('on parameter data and table', async () => { @@ -349,7 +355,7 @@ describe('streams', () => { ); expect(evaluateBucketIds(desc, COMMENTS, { id: 'a', issue_id: 'i', label: 'l' })).toStrictEqual([ - 'stream|0["i","l"]' + '1#stream|0["i","l"]' ]); expect( await queryBucketIds(desc, { @@ -362,7 +368,12 @@ describe('streams', () => { return [{ result: 'i1' }, { result: 'i2' }]; } }) - ).toStrictEqual(['stream|0["i1","l1"]', 'stream|0["i1","l2"]', 'stream|0["i2","l1"]', 'stream|0["i2","l2"]']); + ).toStrictEqual([ + '1#stream|0["i1","l1"]', + '1#stream|0["i1","l2"]', + '1#stream|0["i2","l1"]', + '1#stream|0["i2","l2"]' + ]); }); }); @@ -384,8 +395,8 @@ describe('streams', () => { } ]); expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', tagged_users: '["a", "b"]' })).toStrictEqual([ - 'stream|0["a"]', - 'stream|0["b"]' + '1#stream|0["a"]', + '1#stream|0["b"]' ]); expect( @@ -397,7 +408,7 @@ describe('streams', () => { return [{ result: 'issue_id' }]; } }) - ).toStrictEqual(['stream|0["issue_id"]']); + ).toStrictEqual(['1#stream|0["issue_id"]']); }); }); @@ -488,7 +499,7 @@ describe('streams', () => { } ]); expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id' })).toStrictEqual([ - 'stream|0["issue_id"]' + '1#stream|0["issue_id"]' ]); expect( @@ -500,7 +511,7 @@ describe('streams', () => { return [{ result: 'issue_id' }]; } }) - ).toStrictEqual(['stream|0["issue_id"]']); + ).toStrictEqual(['1#stream|0["issue_id"]']); }); test('negated or', () => { @@ -509,7 +520,7 @@ describe('streams', () => { ); expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id', content: 'foo' })).toStrictEqual([ - 'stream|0["issue_id"]' + '1#stream|0["issue_id"]' ]); expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id', content: 'foooo' })).toStrictEqual([]); }); @@ -521,11 +532,11 @@ describe('streams', () => { expect(desc.variants).toHaveLength(2); expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id', content: 'foo' })).toStrictEqual([ - 'stream|0[]', - 'stream|1["issue_id"]' + '1#stream|0[]', + '1#stream|1["issue_id"]' ]); expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id', content: 'foooo' })).toStrictEqual([ - 'stream|1["issue_id"]' + '1#stream|1["issue_id"]' ]); }); @@ -542,8 +553,8 @@ describe('streams', () => { expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id', content: 'a' })).toStrictEqual([]); expect(evaluateBucketIds(desc, COMMENTS, { id: 'c', issue_id: 'issue_id', content: 'aaa' })).toStrictEqual([ - 'stream|0["issue_id"]', - 'stream|1[]' + '1#stream|0["issue_id"]', + '1#stream|1[]' ]); expect( @@ -555,7 +566,7 @@ describe('streams', () => { return [{ result: 'issue_id' }]; } }) - ).toStrictEqual(['stream|0["issue_id"]']); + ).toStrictEqual(['1#stream|0["issue_id"]']); expect( await queryBucketIds(desc, { token_parameters: { user_id: 'user1', is_admin: true }, @@ -565,7 +576,7 @@ describe('streams', () => { return [{ result: 'issue_id' }]; } }) - ).toStrictEqual(['stream|1[]', 'stream|0["issue_id"]']); + ).toStrictEqual(['1#stream|1[]', '1#stream|0["issue_id"]']); }); }); }); @@ -628,8 +639,10 @@ const options: StreamParseOptions = { compatibility: new CompatibilityContext(CompatibilityEdition.SYNC_STREAMS) }; +const bucketIdTransformer = SqlSyncRules.versionedBucketIdTransformer('1'); + function evaluateBucketIds(stream: SyncStream, sourceTable: SourceTableInterface, record: SqliteRow) { - return stream.evaluateRow({ sourceTable, record }).map((r) => { + return stream.evaluateRow({ sourceTable, record, bucketIdTransformer }).map((r) => { if ('error' in r) { throw new Error(`Unexpected error evaluating row: ${r.error}`); } @@ -655,7 +668,8 @@ async function createQueriers( normalizeQuerierOptions( options?.token_parameters ?? {}, {}, - { stream: [{ opaque_id: 0, parameters: options?.parameters ?? null }] } + { stream: [{ opaque_id: 0, parameters: options?.parameters ?? null }] }, + bucketIdTransformer ) ); diff --git a/packages/sync-rules/test/src/sync_rules.test.ts b/packages/sync-rules/test/src/sync_rules.test.ts index eb92f85a8..9332418d5 100644 --- a/packages/sync-rules/test/src/sync_rules.test.ts +++ b/packages/sync-rules/test/src/sync_rules.test.ts @@ -13,6 +13,8 @@ import { import { SqlBucketDescriptor } from '../../src/SqlBucketDescriptor.js'; describe('sync rules', () => { + const bucketIdTransformer = SqlSyncRules.versionedBucketIdTransformer(''); + test('parse empty sync rules', () => { const rules = SqlSyncRules.fromYaml('bucket_definitions: {}', PARSE_OPTIONS); expect(rules.bucketSources).toEqual([]); @@ -34,7 +36,13 @@ bucket_definitions: const dataQuery = bucket.dataQueries[0]; expect(dataQuery.bucketParameters).toEqual([]); expect(dataQuery.columnOutputNames()).toEqual(['id', 'description']); - expect(rules.evaluateRow({ sourceTable: ASSETS, record: { id: 'asset1', description: 'test' } })).toEqual([ + expect( + rules.evaluateRow({ + sourceTable: ASSETS, + bucketIdTransformer, + record: { id: 'asset1', description: 'test' } + }) + ).toEqual([ { table: 'assets', id: 'asset1', @@ -134,6 +142,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer, record: { id: 'asset1', description: 'test', user_id: 'user1', device_id: 'device1' } }) ).toEqual([ @@ -150,6 +159,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer, record: { id: 'asset1', description: 'test', user_id: 'user1', archived: 1, device_id: 'device1' } }) ).toEqual([]); @@ -179,6 +189,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer, record: { id: 'asset1', description: 'test', user_id: 'user1' } }) ).toEqual([ @@ -195,6 +206,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer, record: { id: 'asset1', description: 'test', owner_id: 'user1' } }) ).toEqual([ @@ -320,6 +332,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer, record: { id: 'asset1', description: 'test', user_id: 'user1' } }) ).toEqual([ @@ -357,6 +370,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer, record: { id: 'asset1', description: 'test', user_id: 'user1' } }) ).toEqual([ @@ -385,6 +399,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer, record: { id: 'asset1', data: JSON.stringify({ count: 5, bool: true }) } }) ).toEqual([ @@ -419,6 +434,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer, record: { id: 'asset1', description: 'test', @@ -461,7 +477,11 @@ bucket_definitions: ); expect( - rules.evaluateRow({ sourceTable: ASSETS, record: { id: 'asset1', description: 'test', role: 'admin' } }) + rules.evaluateRow({ + sourceTable: ASSETS, + bucketIdTransformer, + record: { id: 'asset1', description: 'test', role: 'admin' } + }) ).toEqual([ { bucket: 'mybucket[1]', @@ -477,7 +497,11 @@ bucket_definitions: ]); expect( - rules.evaluateRow({ sourceTable: ASSETS, record: { id: 'asset2', description: 'test', role: 'normal' } }) + rules.evaluateRow({ + sourceTable: ASSETS, + bucketIdTransformer, + record: { id: 'asset2', description: 'test', role: 'normal' } + }) ).toEqual([ { bucket: 'mybucket[1]', @@ -530,7 +554,7 @@ bucket_definitions: PARSE_OPTIONS ); - expect(rules.evaluateRow({ sourceTable: ASSETS, record: { id: 'asset1' } })).toEqual([ + expect(rules.evaluateRow({ sourceTable: ASSETS, bucketIdTransformer, record: { id: 'asset1' } })).toEqual([ { bucket: 'mybucket[]', id: 'asset1', @@ -561,7 +585,11 @@ bucket_definitions: ).toMatchObject({ staticBuckets: [{ bucket: 'mybucket[314,3.14,314]', priority: 3 }] }); expect( - rules.evaluateRow({ sourceTable: ASSETS, record: { id: 'asset1', int1: 314n, float1: 3.14, float2: 314 } }) + rules.evaluateRow({ + sourceTable: ASSETS, + bucketIdTransformer, + record: { id: 'asset1', int1: 314n, float1: 3.14, float2: 314 } + }) ).toEqual([ { bucket: 'mybucket[314,3.14,314]', @@ -606,6 +634,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: new TestSourceTable('assets_123'), + bucketIdTransformer, record: { client_id: 'asset1', description: 'test', archived: 0n, other_id: 'other1' } }) ).toEqual([ @@ -646,6 +675,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: new TestSourceTable('assets_123'), + bucketIdTransformer, record: { client_id: 'asset1', description: 'test', archived: 0n, other_id: 'other1' } }) ).toEqual([ @@ -679,6 +709,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer, record: { id: 'asset1', description: 'test', archived: 0n } }) ).toEqual([ @@ -714,6 +745,7 @@ bucket_definitions: expect( rules.evaluateRow({ sourceTable: ASSETS, + bucketIdTransformer, record: { id: 'asset1' } }) ).toEqual([ diff --git a/packages/sync-rules/test/src/table_valued_function_queries.test.ts b/packages/sync-rules/test/src/table_valued_function_queries.test.ts index 596ade82b..4edd00e3c 100644 --- a/packages/sync-rules/test/src/table_valued_function_queries.test.ts +++ b/packages/sync-rules/test/src/table_valued_function_queries.test.ts @@ -1,7 +1,7 @@ import { describe, expect, test } from 'vitest'; import { RequestParameters, SqlParameterQuery } from '../../src/index.js'; import { StaticSqlParameterQuery } from '../../src/StaticSqlParameterQuery.js'; -import { PARSE_OPTIONS } from './util.js'; +import { identityBucketTransformer, PARSE_OPTIONS } from './util.js'; describe('table-valued function queries', () => { test('json_each(array param)', function () { @@ -18,7 +18,12 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucketParameters).toEqual(['v']); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([ + expect( + query.getStaticBucketDescriptions( + new RequestParameters({ sub: '' }, { array: [1, 2, 3] }), + identityBucketTransformer + ) + ).toEqual([ { bucket: 'mybucket[1]', priority: 3 }, { bucket: 'mybucket[2]', priority: 3 }, { bucket: 'mybucket[3]', priority: 3 } @@ -31,7 +36,9 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucketParameters).toEqual(['v']); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([ + expect( + query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}), identityBucketTransformer) + ).toEqual([ { bucket: 'mybucket[1]', priority: 3 }, { bucket: 'mybucket[2]', priority: 3 }, { bucket: 'mybucket[3]', priority: 3 } @@ -44,7 +51,9 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucketParameters).toEqual(['v']); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([]); + expect( + query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}), identityBucketTransformer) + ).toEqual([]); }); test('json_each(array param not present)', function () { @@ -61,7 +70,9 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucketParameters).toEqual(['v']); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([]); + expect( + query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}), identityBucketTransformer) + ).toEqual([]); }); test('json_each(array param not present, ifnull)', function () { @@ -78,7 +89,9 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucketParameters).toEqual(['v']); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([]); + expect( + query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}), identityBucketTransformer) + ).toEqual([]); }); test('json_each on json_keys', function () { @@ -87,7 +100,9 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucketParameters).toEqual(['value']); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}))).toEqual([ + expect( + query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, {}), identityBucketTransformer) + ).toEqual([ { bucket: 'mybucket["a"]', priority: 3 }, { bucket: 'mybucket["b"]', priority: 3 }, { bucket: 'mybucket["c"]', priority: 3 } @@ -108,7 +123,12 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucketParameters).toEqual(['value']); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([ + expect( + query.getStaticBucketDescriptions( + new RequestParameters({ sub: '' }, { array: [1, 2, 3] }), + identityBucketTransformer + ) + ).toEqual([ { bucket: 'mybucket[1]', priority: 3 }, { bucket: 'mybucket[2]', priority: 3 }, { bucket: 'mybucket[3]', priority: 3 } @@ -129,7 +149,12 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucketParameters).toEqual(['value']); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([ + expect( + query.getStaticBucketDescriptions( + new RequestParameters({ sub: '' }, { array: [1, 2, 3] }), + identityBucketTransformer + ) + ).toEqual([ { bucket: 'mybucket[1]', priority: 3 }, { bucket: 'mybucket[2]', priority: 3 }, { bucket: 'mybucket[3]', priority: 3 } @@ -150,7 +175,12 @@ describe('table-valued function queries', () => { expect(query.errors).toEqual([]); expect(query.bucketParameters).toEqual(['v']); - expect(query.getStaticBucketDescriptions(new RequestParameters({ sub: '' }, { array: [1, 2, 3] }))).toEqual([ + expect( + query.getStaticBucketDescriptions( + new RequestParameters({ sub: '' }, { array: [1, 2, 3] }), + identityBucketTransformer + ) + ).toEqual([ { bucket: 'mybucket[2]', priority: 3 }, { bucket: 'mybucket[3]', priority: 3 } ]); @@ -182,7 +212,8 @@ describe('table-valued function queries', () => { ] }, {} - ) + ), + identityBucketTransformer ) ).toEqual([{ bucket: 'mybucket[1]', priority: 3 }]); }); diff --git a/packages/sync-rules/test/src/util.ts b/packages/sync-rules/test/src/util.ts index d5f2f6a7f..c86667e70 100644 --- a/packages/sync-rules/test/src/util.ts +++ b/packages/sync-rules/test/src/util.ts @@ -1,5 +1,6 @@ import { CompatibilityContext, + BucketIdTransformer, DEFAULT_TAG, GetQuerierOptions, RequestedStream, @@ -68,12 +69,18 @@ export function normalizeTokenParameters( export function normalizeQuerierOptions( token_parameters: Record, user_parameters?: Record, - streams?: Record + streams?: Record, + bucketIdTransformer?: BucketIdTransformer ): GetQuerierOptions { const globalParameters = normalizeTokenParameters(token_parameters, user_parameters); return { globalParameters, hasDefaultStreams: true, - streams: streams ?? {} + streams: streams ?? {}, + bucketIdTransformer: bucketIdTransformer ?? identityBucketTransformer }; } + +export function identityBucketTransformer(id: string) { + return id; +}