diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 3a330ec70..08656dd52 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -721,6 +721,9 @@ importers:
       jose:
         specifier: ^4.15.1
         version: 4.15.9
+      undici:
+        specifier: ^7.15.0
+        version: 7.15.0
       ws:
         specifier: ^8.18.0
         version: 8.18.0
@@ -3890,6 +3893,10 @@ packages:
   undici-types@6.21.0:
     resolution: {integrity: sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==}

+  undici@7.15.0:
+    resolution: {integrity: sha512-7oZJCPvvMvTd0OlqWsIxTuItTpJBpU1tcbVl24FMn3xt3+VSunwUasmfPJRE57oNO1KsZ4PgA1xTdAX4hq8NyQ==}
+    engines: {node: '>=20.18.1'}
+
   unique-filename@2.0.1:
     resolution: {integrity: sha512-ODWHtkkdx3IAR+veKxFV+VBkUMcN+FaqzUUd7IZzt+0zhDZFPFxhlqwPF3YQvMHx1TD0tdgYl+kuPnJ8E6ql7A==}
     engines: {node: ^12.13.0 || ^14.15.0 || >=16.0.0}
@@ -7482,6 +7489,8 @@ snapshots:

   undici-types@6.21.0: {}

+  undici@7.15.0: {}
+
   unique-filename@2.0.1:
     dependencies:
       unique-slug: 3.0.0
diff --git a/test-client/README.md b/test-client/README.md
index aedad978f..b07e25e56 100644
--- a/test-client/README.md
+++ b/test-client/README.md
@@ -4,17 +4,34 @@

 This is a minimal client demonstrating direct usage of the HTTP stream sync API. For a full implementation, see our client SDKs.

-## Usage
+## Setup

-```sh
-# In project root
+1. Install dependencies in the project root:
+
+```shell
+# In the project root directory
 pnpm install
 pnpm build:packages
-
-# In this folder
+```
+
+2. Build the test-client in the `test-client` directory:
+
+```shell
+# In the test-client directory
 pnpm build
-node dist/bin.js fetch-operations --token --endpoint http://localhost:8080
+```

-# More examples:
+## Usage
+
+### fetch-operations
+
+The `fetch-operations` command downloads data for a single checkpoint, and outputs a normalized form: one CLEAR operation, followed by the latest PUT operation for each row. This normalized form is still split per bucket. The output is not affected by compacting, but can be affected by replication order.
+
+To avoid normalizing the data, use the `--raw` option. The raw output may include additional CLEAR, MOVE, REMOVE and duplicate PUT operations.
+
+```sh
+# If the endpoint is not available in the token aud field, add the --endpoint argument
+node dist/bin.js fetch-operations --token --endpoint http://localhost:8080

 # If the endpoint is present in token aud field, it can be omitted from args:
 node dist/bin.js fetch-operations --token
@@ -29,12 +46,35 @@ node dist/bin.js fetch-operations --config path/to/powersync.yaml
 node dist/bin.js fetch-operations --config path/to/powersync.yaml --sub test-user
 ```

-The `fetch-operations` command downloads data for a single checkpoint, and outputs a normalized form: one CLEAR operation, followed by the latest PUT operation for each row. This normalized form is still split per bucket. The output is not affected by compacting, but can be affected by replication order.
+### generate-token

-To avoid normalizing the data, use the `--raw` option. This may include additional CLEAR, MOVE, REMOVE and duplicate PUT operations.
-
-To generate a token without downloading data, use the `generate-token` command:
+Generates a JWT based on your current PowerSync YAML config, without downloading any data.

 ```sh
 node dist/bin.js generate-token --config path/to/powersync.yaml --sub test-user
 ```
+
+### concurrent-connections
+
+Use this command to simulate concurrent connections to a PowerSync instance, for performance benchmarking
+and other load-testing use cases. There are two modes available: `websocket` and `http`. 
By default, the command uses the
+`http` mode.
+
+```shell
+# Start two concurrent connections that download sync operations, using -n to specify the number of connections
+node ./dist/bin.js concurrent-connections -n 2 -t
+
+# Do the same over websockets, using -m to specify the mode
+node ./dist/bin.js concurrent-connections -n 2 -t -m websocket
+```
+
+Once the sync has completed for a connection, the command prints the `op_id`, `ops`, `bytes` and `duration` for
+that connection.
+
+To see what rows are being synced, specify `--print=id`, or another relevant field. This will be included
+in the output as a `data:` array with each checkpoint.
+
+```shell
+# As an example, start two concurrent connections and print the name field
+node ./dist/bin.js concurrent-connections -n 2 -t -p name
+```
diff --git a/test-client/package.json b/test-client/package.json
index ac3b5ae86..2a077708d 100644
--- a/test-client/package.json
+++ b/test-client/package.json
@@ -18,6 +18,7 @@
     "@powersync/service-core": "workspace:*",
     "commander": "^12.0.0",
     "jose": "^4.15.1",
+    "undici": "^7.15.0",
     "ws": "^8.18.0",
     "yaml": "^2.5.0"
   },
@@ -26,4 +27,4 @@
     "@types/ws": "~8.2.0",
     "typescript": "^5.7.3"
   }
-}
+}
\ No newline at end of file
diff --git a/test-client/src/bin.ts b/test-client/src/bin.ts
index 1b53d18dc..6734a965e 100644
--- a/test-client/src/bin.ts
+++ b/test-client/src/bin.ts
@@ -40,10 +40,20 @@ program
   .option('-u, --sub [sub]', 'sub field for auto-generated token')
   .option('-n, --num-clients [num-clients]', 'number of clients to connect')
   .option('-m, --mode [mode]', 'http or websocket')
+  .option('-p, --print [print]', 'print a field from the data being downloaded')
+  .option('--once', 'stop after the first checkpoint')
   .action(async (options) => {
     const credentials = await getCredentials(options);

-    await concurrentConnections(credentials, options['numClients'] ?? 10, options.mode ?? 'http');
+    await concurrentConnections(
+      {
+        ...credentials,
+        once: options.once ?? false,
+        mode: options.mode
+      },
+      options['numClients'] ?? 10,
+      options.print
+    );
   });

 await program.parseAsync();
diff --git a/test-client/src/client.ts b/test-client/src/client.ts
index 5e2fc8641..55d82ced0 100644
--- a/test-client/src/client.ts
+++ b/test-client/src/client.ts
@@ -29,7 +29,7 @@ export async function getCheckpointData(options: GetCheckpointOptions) {
   let data: types.StreamingSyncData[] = [];
   let checkpoint: types.StreamingSyncCheckpoint;

-  for await (let chunk of ndjsonStream(response.body!)) {
+  for await (let { chunk } of ndjsonStream(response.body!)) {
     if (isStreamingSyncData(chunk)) {
       // Collect data
       data.push(chunk);
diff --git a/test-client/src/httpStream.ts b/test-client/src/httpStream.ts
new file mode 100644
index 000000000..7112d92c5
--- /dev/null
+++ b/test-client/src/httpStream.ts
@@ -0,0 +1,37 @@
+import type { StreamingSyncLine, StreamingSyncRequest } from '@powersync/service-core';
+import { Readable } from 'node:stream';
+import { request } from 'undici';
+import { ndjsonStream } from './ndjson.js';
+import { StreamEvent, SyncOptions } from './stream.js';
+
+export async function* openHttpStream(options: SyncOptions): AsyncGenerator<StreamEvent> {
+  const streamRequest: StreamingSyncRequest = {
+    raw_data: true,
+    client_id: options.clientId,
+    buckets: [...(options.bucketPositions ?? 
new Map()).entries()].map(([bucket, after]) => ({
+      name: bucket,
+      after: after
+    }))
+  };
+  const response = await request(options.endpoint + '/sync/stream', {
+    method: 'POST',
+    headers: {
+      'Content-Type': 'application/json',
+      Authorization: `Token ${options.token}`
+    },
+    body: JSON.stringify(streamRequest),
+    signal: options.signal
+  });
+
+  if (response.statusCode != 200) {
+    throw new Error(`Request failed with code: ${response.statusCode}\n${await response.body.text()}`);
+  }
+
+  const stream = Readable.toWeb(response.body) as ReadableStream;
+
+  for await (let { chunk, size } of ndjsonStream<StreamingSyncLine>(stream)) {
+    yield { stats: { decodedBytes: size } };
+    yield chunk;
+  }
+  // If we reach this, the connection was closed without error by the server
+}
diff --git a/test-client/src/load-testing/http-worker.ts b/test-client/src/load-testing/http-worker.ts
deleted file mode 100644
index 895289157..000000000
--- a/test-client/src/load-testing/http-worker.ts
+++ /dev/null
@@ -1,59 +0,0 @@
-import { ndjsonStream } from '../ndjson.js';
-
-import { parentPort, workerData } from 'worker_threads';
-
-if (parentPort == null) {
-  throw new Error(`Can only run this script in a worker_thread`);
-}
-
-const { i, url, token } = workerData;
-
-const response = await fetch(url + '/sync/stream', {
-  method: 'POST',
-  headers: {
-    'Content-Type': 'application/json',
-    Authorization: `Token ${token}`
-  },
-  body: JSON.stringify({
-    raw_data: true,
-    include_checksums: true
-  })
-});
-if (!response.ok || response.body == null) {
-  throw new Error(response.statusText + '\n' + (await response.text()));
-}
-
-let size = 0;
-let numOperations = 0;
-let lastCheckpointStart = 0;
-
-for await (let chunk of ndjsonStream(response.body)) {
-  size += JSON.stringify(chunk).length;
-  if (chunk?.checkpoint_complete) {
-    const duration = performance.now() - lastCheckpointStart;
-    console.log(
-      new Date().toISOString(),
-      i,
-      `checkpoint_complete op_id: ${chunk.checkpoint_complete.last_op_id}, ops: ${numOperations}, bytes: ${size}, duration: ${duration.toFixed(0)}ms`
-    );
-  } else if (chunk?.data) {
-    numOperations += chunk.data.data.length;
-  } else if (chunk?.checkpoint) {
-    lastCheckpointStart = performance.now();
-    console.log(new Date().toISOString(), i, `checkpoint buckets: ${chunk.checkpoint.buckets.length}`);
-  } else if (chunk?.checkpoint_diff) {
-    lastCheckpointStart = performance.now();
-    console.log(
-      new Date().toISOString(),
-      i,
-      `checkpoint_diff removed_buckets: ${chunk.checkpoint_diff.removed_buckets.length} updated_buckets: ${chunk.checkpoint_diff.updated_buckets.length}`
-    );
-  } else {
-    const key = Object.keys(chunk)[0];
-    if (key != 'token_expires_in' && key != 'data') {
-      console.log(new Date().toISOString(), i, key);
-    }
-  }
-}
-
-parentPort.postMessage({ done: true });
diff --git a/test-client/src/load-testing/load-test-worker.ts b/test-client/src/load-testing/load-test-worker.ts
new file mode 100644
index 000000000..a8b95e65d
--- /dev/null
+++ b/test-client/src/load-testing/load-test-worker.ts
@@ -0,0 +1,65 @@
+import { parentPort, workerData } from 'worker_threads';
+import { openStream, SyncOptions } from '../stream.js';
+
+if (parentPort == null) {
+  throw new Error(`Can only run this script in a worker_thread`);
+}
+
+const { i, print } = workerData;
+const request: SyncOptions = workerData.request;
+
+let size = 0;
+let numOperations = 0;
+let lastCheckpointStart = 0;
+let printData: string[] = [];
+
+const parseChunk = (chunk: any) => {
+  if (print == null) {
+    return;
+  }
+  
chunk.data.forEach((data: any) => { + if (data.op == 'PUT') { + const payload = JSON.parse(data.data); + printData.push(payload[print]); + } + }); +}; + +for await (let chunk of openStream(request)) { + if ('error' in chunk) { + // Retried automatically + console.error(new Date().toISOString(), i, `Error in stream: ${chunk.error}`); + } else if ('checkpoint_complete' in chunk) { + const duration = performance.now() - lastCheckpointStart; + let message = `checkpoint_complete op_id: ${chunk.checkpoint_complete.last_op_id}, ops: ${numOperations}, bytes: ${size}, duration: ${duration.toFixed(0)}ms`; + if (print) { + message += `, data: [${printData}]`; + } + printData = []; + console.log(new Date().toISOString(), i, message); + + if (request.once) { + break; + } + } else if ('data' in chunk) { + parseChunk(chunk.data); + numOperations += chunk.data.data.length; + } else if ('checkpoint' in chunk) { + lastCheckpointStart = performance.now(); + console.log(new Date().toISOString(), i, `checkpoint buckets: ${chunk.checkpoint.buckets.length}`); + } else if ('checkpoint_diff' in chunk) { + lastCheckpointStart = performance.now(); + console.log( + new Date().toISOString(), + i, + `checkpoint_diff removed_buckets: ${chunk.checkpoint_diff.removed_buckets.length} updated_buckets: ${chunk.checkpoint_diff.updated_buckets.length}` + ); + } else if ('stats' in chunk) { + size += chunk.stats.decodedBytes; + } else { + const key = Object.keys(chunk)[0]; + if (key != 'token_expires_in' && key != 'data') { + console.log(new Date().toISOString(), i, key); + } + } +} diff --git a/test-client/src/load-testing/load-test.ts b/test-client/src/load-testing/load-test.ts index 1898b1a15..2690c6634 100644 --- a/test-client/src/load-testing/load-test.ts +++ b/test-client/src/load-testing/load-test.ts @@ -1,38 +1,24 @@ import { Worker } from 'worker_threads'; -import { Credentials } from '../auth.js'; +import { SyncOptions } from '../stream.js'; export type Mode = 'http' | 'websocket'; -export async function stream(i: number, credentials: Credentials, mode: Mode) { - const worker = - mode == 'websocket' - ? 
new Worker(new URL('./rsocket-worker.js', import.meta.url), { - workerData: { i, token: credentials.token, url: credentials.endpoint.replace(/^http/, 'ws') } - }) - : new Worker(new URL('./http-worker.js', import.meta.url), { - workerData: { i, token: credentials.token, url: credentials.endpoint } - }); +export async function stream(i: number, request: SyncOptions, print: string | undefined) { + const worker = new Worker(new URL('./load-test-worker.js', import.meta.url), { + workerData: { i, request, print } + }); await new Promise((resolve, reject) => { worker.on('message', (event) => resolve(event)); worker.on('error', (err) => reject(err)); + worker.on('exit', (__code) => { + resolve(null); + }); }); worker.terminate(); } -export async function streamForever(i: number, credentials: Credentials, mode: Mode) { - while (true) { - try { - await stream(i, credentials, mode); - console.log(new Date().toISOString(), i, 'Stream ended'); - } catch (e) { - console.error(new Date().toISOString(), i, e.message); - await new Promise((resolve) => setTimeout(resolve, 1000 + Math.random())); - } - } -} - -export async function concurrentConnections(credentials: Credentials, numClients: number, mode: Mode) { +export async function concurrentConnections(options: SyncOptions, numClients: number, print: string | undefined) { for (let i = 0; i < numClients; i++) { - streamForever(i, credentials, mode); + stream(i, { ...options, clientId: options.clientId ?? `load-test-${i}` }, print); } } diff --git a/test-client/src/load-testing/rsocket-worker.ts b/test-client/src/load-testing/rsocket-worker.ts deleted file mode 100644 index defdf272e..000000000 --- a/test-client/src/load-testing/rsocket-worker.ts +++ /dev/null @@ -1,86 +0,0 @@ -import { RSocketConnector } from 'rsocket-core'; -import { serialize, deserialize } from 'bson'; -import WebSocket from 'ws'; -import { WebsocketClientTransport } from 'rsocket-websocket-client'; - -import { parentPort, workerData } from 'worker_threads'; - -if (parentPort == null) { - throw new Error(`Can only run this script in a worker_thread`); -} - -const { i, url, token } = workerData; - -const client = new RSocketConnector({ - transport: new WebsocketClientTransport({ - url, - wsCreator: (url) => { - return new WebSocket(url) as any; - } - }), - setup: { - dataMimeType: 'application/bson', - metadataMimeType: 'application/bson', - keepAlive: 15_000, - lifetime: 120_000, - payload: { - data: null, - metadata: Buffer.from( - serialize({ - token: `Token ${token}` - }) - ) - } - } -}); - -const rsocket = await client.connect(); - -const SYNC_QUEUE_REQUEST_N = 2; - -let pendingEventsCount = SYNC_QUEUE_REQUEST_N; -let size = 0; - -const stream = rsocket.requestStream( - { - data: Buffer.from(serialize({})), - metadata: Buffer.from( - serialize({ - path: '/sync/stream' - }) - ) - }, - SYNC_QUEUE_REQUEST_N, // The initial N amount - { - onError: (e) => { - console.error(new Date().toISOString(), i, e); - }, - onNext: (payload) => { - const { data } = payload; - // Less events are now pending - pendingEventsCount--; - if (!data) { - return; - } - - size += data.byteLength; - - const chunk = deserialize(data); - if (chunk?.checkpoint_complete) { - console.log(new Date().toISOString(), i, 'checkpoint', chunk.checkpoint_complete.last_op_id, size); - } else { - console.log(new Date().toISOString(), i, Object.keys(chunk)[0]); - } - - const required = SYNC_QUEUE_REQUEST_N - pendingEventsCount; - if (required > 0) { - stream.request(SYNC_QUEUE_REQUEST_N - pendingEventsCount); - 
pendingEventsCount = SYNC_QUEUE_REQUEST_N;
-      }
-    },
-    onComplete: () => {
-      stream.cancel();
-    },
-    onExtension: () => {}
-  }
-);
diff --git a/test-client/src/ndjson.ts b/test-client/src/ndjson.ts
index 65af0c45a..200843943 100644
--- a/test-client/src/ndjson.ts
+++ b/test-client/src/ndjson.ts
@@ -1,7 +1,9 @@
-export function ndjsonStream<T>(response: ReadableStream): ReadableStream<T> & AsyncIterable<T> {
+export function ndjsonStream<T>(
+  response: ReadableStream
+): ReadableStream<{ chunk: T; size: number }> & AsyncIterable<{ chunk: T; size: number }> {
   var is_reader: any,
     cancellationRequest = false;
-  return new ReadableStream({
+  return new ReadableStream<{ chunk: T; size: number }>({
     start: function (controller) {
       var reader = response.getReader();
       is_reader = reader;
@@ -19,7 +21,7 @@
           if (data_buf.length !== 0) {
             try {
               var data_l = JSON.parse(data_buf);
-              controller.enqueue(data_l);
+              controller.enqueue({ chunk: data_l, size: data_buf.length });
             } catch (e) {
               controller.error(e);
               return;
@@ -37,7 +39,7 @@
           if (l.length > 0) {
             try {
               var data_line = JSON.parse(l);
-              controller.enqueue(data_line);
+              controller.enqueue({ chunk: data_line, size: l.length });
             } catch (e) {
               controller.error(e);
               cancellationRequest = true;
diff --git a/test-client/src/rsocketStream.ts b/test-client/src/rsocketStream.ts
new file mode 100644
index 000000000..b992e631b
--- /dev/null
+++ b/test-client/src/rsocketStream.ts
@@ -0,0 +1,130 @@
+import type { StreamingSyncLine, StreamingSyncRequest } from '@powersync/service-core';
+import { deserialize, serialize } from 'bson';
+import { RSocketConnector } from 'rsocket-core';
+import { WebsocketClientTransport } from 'rsocket-websocket-client';
+import type { StreamEvent, SyncOptions } from './stream.js';
+
+export async function* openWebSocketStream(options: SyncOptions): AsyncGenerator<StreamEvent> {
+  const client = new RSocketConnector({
+    transport: new WebsocketClientTransport({
+      url: options.endpoint.replace(/^http/, 'ws'),
+      wsCreator: (url: string) => {
+        return new WebSocket(url) as any;
+      }
+    }),
+    setup: {
+      dataMimeType: 'application/bson',
+      metadataMimeType: 'application/bson',
+      keepAlive: 15_000,
+      lifetime: 120_000,
+      payload: {
+        data: null,
+        metadata: Buffer.from(
+          serialize({
+            token: `Token ${options.token}`
+          })
+        )
+      }
+    }
+  });
+
+  const rsocket = await client.connect();
+
+  const SYNC_QUEUE_REQUEST_N = 10;
+
+  let pendingEventsCount = SYNC_QUEUE_REQUEST_N;
+
+  const q: StreamEvent[] = [];
+  let resolveWaiter: (() => void) | null = null;
+  let done = false;
+  let error = null;
+
+  const wake = () => {
+    if (resolveWaiter) {
+      resolveWaiter();
+      resolveWaiter = null;
+    }
+  };
+  const streamRequest: StreamingSyncRequest = {
+    client_id: options.clientId,
+    buckets: [...(options.bucketPositions ?? new Map()).entries()].map(([bucket, after]) => ({
+      name: bucket,
+      after: after
+    }))
+  };
+
+  const stream = rsocket.requestStream(
+    {
+      data: Buffer.from(serialize(streamRequest)),
+      metadata: Buffer.from(
+        serialize({
+          path: '/sync/stream'
+        })
+      )
+    },
+    SYNC_QUEUE_REQUEST_N, // The initial N amount
+    {
+      onError: (e) => {
+        error = e ?? new Error('stream error');
+        wake();
+      },
+      onNext: (payload) => {
+        const { data } = payload;
+
+        // Fewer events are now pending
+        pendingEventsCount--;
+        if (!data) {
+          return;
+        }
+
+        const chunk = deserialize(data);
+        // Note: We don't currently apply any backpressure.
+        // Since we process data fairly synchronously in the test-client, it should not make a big difference.
+        q.push({ stats: { decodedBytes: data.byteLength } });
+        q.push(chunk as StreamingSyncLine);
+        wake();
+
+        const required = SYNC_QUEUE_REQUEST_N - pendingEventsCount;
+        if (required > 0) {
+          stream.request(required);
+          pendingEventsCount += required;
+        }
+      },
+      onComplete: () => {
+        stream.cancel();
+        done = true;
+        wake();
+      },
+      onExtension: () => {}
+    }
+  );
+
+  if (options.signal?.aborted) {
+    stream.cancel();
+  }
+  options.signal?.addEventListener(
+    'abort',
+    () => {
+      stream.cancel();
+    },
+    { once: true }
+  );
+
+  try {
+    while (true) {
+      if (error) {
+        throw error;
+      }
+      if (q.length > 0) {
+        yield q.shift()!;
+        continue;
+      }
+      if (done) {
+        break;
+      }
+      await new Promise<void>((r) => (resolveWaiter = r));
+    }
+  } finally {
+    stream.cancel();
+  }
+}
diff --git a/test-client/src/stream.ts b/test-client/src/stream.ts
new file mode 100644
index 000000000..5e7f8e3e2
--- /dev/null
+++ b/test-client/src/stream.ts
@@ -0,0 +1,46 @@
+import { StreamingSyncLine } from '@powersync/service-core';
+import type { openHttpStream } from './httpStream.js';
+import * as timers from 'node:timers/promises';
+
+export type SyncOptions = {
+  endpoint: string;
+  token: string;
+  clientId?: string | undefined;
+  signal?: AbortSignal | undefined;
+  once?: boolean | undefined;
+  bucketPositions?: Map<string, string> | undefined;
+  mode?: 'http' | 'websocket';
+};
+
+export type StreamEvent = StreamingSyncLine | { error: any } | { stats: { decodedBytes: number } };
+
+export async function* openStream(options: SyncOptions) {
+  let bucketPositions = new Map(options.bucketPositions);
+  while (!options.signal?.aborted) {
+    try {
+      const stream = openStreamIteration({ ...options, bucketPositions });
+
+      for await (let chunk of stream) {
+        if ('data' in chunk) {
+          bucketPositions.set(chunk.data.bucket, chunk.data.next_after);
+        }
+        yield chunk;
+      }
+      // Connection ended, reconnecting
+    } catch (e) {
+      yield { error: e };
+      // Connection error, reconnecting
+      await timers.setTimeout(1000 + Math.random() * 1000);
+    }
+  }
+}
+
+export async function* openStreamIteration(options: SyncOptions): AsyncGenerator<StreamEvent> {
+  let impl: typeof openHttpStream;
+  if (options.mode === 'websocket') {
+    impl = (await import('./rsocketStream.js')).openWebSocketStream;
+  } else {
+    impl = (await import('./httpStream.js')).openHttpStream;
+  }
+  yield* impl(options);
+}
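+
+// Usage sketch (illustrative only; the endpoint and token values below are placeholders):
+//
+//   for await (const event of openStream({ endpoint: 'http://localhost:8080', token: '<jwt>' })) {
+//     if ('checkpoint_complete' in event) {
+//       break; // exiting the loop closes the generator, which should also close the underlying connection
+//     }
+//   }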