From 7c720bc71a5f5cf16b4ed8633ed283a254ef05b3 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 26 Aug 2025 10:41:00 +0200 Subject: [PATCH 1/2] Enable permessage-deflate for websockets. --- .../src/router/ReactiveSocketRouter.ts | 37 +++++++++++++++++-- .../transport/WebsocketDuplexConnection.ts | 37 ++++++++++++++++++- packages/rsocket-router/src/router/types.ts | 5 +++ .../src/routes/endpoints/socket-route.ts | 17 ++++++++- 4 files changed, 90 insertions(+), 6 deletions(-) diff --git a/packages/rsocket-router/src/router/ReactiveSocketRouter.ts b/packages/rsocket-router/src/router/ReactiveSocketRouter.ts index ac6f8a13b..338631249 100644 --- a/packages/rsocket-router/src/router/ReactiveSocketRouter.ts +++ b/packages/rsocket-router/src/router/ReactiveSocketRouter.ts @@ -8,13 +8,14 @@ import * as http from 'http'; import { Payload, RSocketServer } from 'rsocket-core'; import * as ws from 'ws'; import { SocketRouterObserver } from './SocketRouterListener.js'; +import { WebsocketDuplexConnection } from './transport/WebsocketDuplexConnection.js'; import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js'; import { CommonParams, IReactiveStream, IReactiveStreamInput, - RS_ENDPOINT_TYPE, ReactiveSocketRouterOptions, + RS_ENDPOINT_TYPE, SocketResponder } from './types.js'; @@ -24,6 +25,7 @@ export interface ReactiveStreamRequest { dataMimeType: string; initialN: number; responder: SocketResponder; + connection: WebsocketDuplexConnection; } export interface SocketBaseContext { @@ -51,7 +53,30 @@ export class ReactiveSocketRouter { * This follows a similar pattern to the Journey Micro * web sockets router. */ - const wss = new ws.WebSocketServer({ noServer: true }); + const wss = new ws.WebSocketServer({ + noServer: true, + perMessageDeflate: { + zlibDeflateOptions: { + chunkSize: 128 * 1024, // default is 16kb - increased for better efficiency + memLevel: 7, // default is 8 + level: 3 + }, + zlibInflateOptions: { + // for decompressing messages from the client + chunkSize: 32 * 1024 + }, + // don't keep client context between messages + clientNoContextTakeover: true, + // keep context between messages from the server + serverNoContextTakeover: false, + // bigger window uses more memory and potentially more cpu. 10-15 is a good range. + serverMaxWindowBits: 12, + // Limit concurrent compression threads + concurrencyLimit: 8, + // Size (in bytes) below which messages should not be compressed _if context takeover is disabled_. + threshold: 1024 + } + }); server.on('upgrade', (request, socket, head) => { wss.handleUpgrade(request, socket as any, head, (ws) => { wss.emit('connection', ws, request); @@ -66,7 +91,9 @@ export class ReactiveSocketRouter { const rSocketServer = new RSocketServer({ transport, acceptor: { - accept: async (payload) => { + accept: async (payload, rsocket) => { + const connection = (rsocket as any).connection as WebsocketDuplexConnection; + const { max_concurrent_connections } = this.options ?? {}; logger.info(`Currently have ${wss.clients.size} active WebSocket connection(s)`); // wss.clients.size includes this connection, so we check for greater than @@ -104,7 +131,7 @@ export class ReactiveSocketRouter { // TODO: Consider limiting the number of active streams per connection to prevent abuse handleReactiveStream( context, - { payload, initialN, responder, dataMimeType, metadataMimeType }, + { payload, initialN, responder, dataMimeType, metadataMimeType, connection }, observer, abortController, params @@ -191,6 +218,7 @@ export async function handleReactiveStream( context, observer, signal: abortController.signal, + connection: request.connection, responder }); if (!isAuthorized.authorized) { @@ -207,6 +235,7 @@ export async function handleReactiveStream( observer, signal: abortController.signal, responder, + connection: request.connection, initialN }); } catch (ex) { diff --git a/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts b/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts index a5c3d9517..9e59af03c 100644 --- a/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts +++ b/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts @@ -16,6 +16,7 @@ */ import { logger } from '@powersync/lib-services-framework'; +import { Socket } from 'net'; import { Closeable, Deferred, @@ -33,6 +34,7 @@ import WebSocket from 'ws'; export class WebsocketDuplexConnection extends Deferred implements DuplexConnection, Outbound { readonly multiplexerDemultiplexer: Multiplexer & Demultiplexer & FrameHandler; + readonly tracker: WebSocketTracker; constructor( private websocketDuplex: Duplex, @@ -50,6 +52,7 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect websocketDuplex.on('data', this.handleMessage); this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory(frame, this); + this.tracker = new WebSocketTracker(this.rawSocket); } get availability(): number { @@ -97,7 +100,9 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect }; private handleError = (e: WebSocket.ErrorEvent): void => { - logger.error(`Error in WebSocket duplex connection: ${e}`); + // Example: + // Error: The socket was closed while data was being compressed + logger.warn(`Error in WebSocket duplex connection: ${e}`); if (!this.done) { this.close(e.error); } @@ -149,3 +154,33 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect }); } } + +/** + * Tracks encoding and bytes written on a websocket connection, catering for compressed data. + */ +export class WebSocketTracker { + private lastBytesWritten: number; + private socket: Socket; + readonly encoding: 'permessage-deflate' | undefined; + + constructor(ws: WebSocket) { + this.socket = (ws as any)._socket; + this.lastBytesWritten = this.socket.bytesWritten; + + // Crude check, but this is the only extension that would actually be used + if (ws.extensions.includes('permessage-deflate')) { + this.encoding = 'permessage-deflate'; + } else { + this.encoding = undefined; + } + } + + /** + * Consumes and returns the number of bytes sent. + */ + getBytesWritten(): number { + const written = this.socket.bytesWritten - this.lastBytesWritten; + this.lastBytesWritten = this.socket.bytesWritten; + return written; + } +} diff --git a/packages/rsocket-router/src/router/types.ts b/packages/rsocket-router/src/router/types.ts index 6f764148e..29d4272b4 100644 --- a/packages/rsocket-router/src/router/types.ts +++ b/packages/rsocket-router/src/router/types.ts @@ -4,6 +4,7 @@ import { router } from '@powersync/lib-services-framework'; import { OnExtensionSubscriber, OnNextSubscriber, OnTerminalSubscriber } from 'rsocket-core'; import { SocketRouterObserver } from './SocketRouterListener.js'; +import { WebsocketDuplexConnection } from './transport/WebsocketDuplexConnection.js'; export enum RS_ENDPOINT_TYPE { // Other methods are supported by RSocket, but are not yet mapped here @@ -26,6 +27,10 @@ export type CommonStreamPayload = { observer: SocketRouterObserver; responder: SocketResponder; signal: AbortSignal; + /** + * The underlying websocket connection. Should not be used directly apart from tracking metadata. + */ + connection: WebsocketDuplexConnection; }; export type ReactiveStreamPayload = CommonStreamPayload & { diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 21b4064dc..663ba7844 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -11,7 +11,7 @@ import { APIMetric } from '@powersync/service-types'; export const syncStreamReactive: SocketRouteGenerator = (router) => router.reactiveStream(SyncRoutes.STREAM, { validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }), - handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal }) => { + handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal, connection }) => { const { service_context, logger } = context; const { routerEngine, metricsEngine, syncContext } = service_context; @@ -84,6 +84,10 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1); const tracker = new sync.RequestTracker(metricsEngine); + if (connection.tracker.encoding) { + // Must be set before we start the stream + tracker.setCompressed(connection.tracker.encoding); + } try { for await (const data of sync.streamResponse({ syncContext: syncContext, @@ -150,6 +154,17 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => responder.onComplete(); removeStopHandler(); disposer(); + if (connection.tracker.encoding) { + // Technically, this may not be unique to this specific stream, since there could be multiple + // rsocket streams on the same websocket connection. We don't have a way to track compressed bytes + // on individual streams, and we generally expect 1 stream per connection, so this is a reasonable + // approximation. + // If there are multiple streams, bytes written would be split arbitrarily across them, but the + // total should be correct. + // For non-compressed cases, this is tracked by the stream itself. + const socketBytes = connection.tracker.getBytesWritten(); + tracker.addCompressedDataSent(socketBytes); + } logger.info(`Sync stream complete`, { ...tracker.getLogMeta(), stream_ms: Date.now() - streamStart, From 0cde51bc1d408e839f27fd7cc0a8cf5771864437 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 26 Aug 2025 10:41:52 +0200 Subject: [PATCH 2/2] Changeset. --- .changeset/rich-fans-care.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changeset/rich-fans-care.md diff --git a/.changeset/rich-fans-care.md b/.changeset/rich-fans-care.md new file mode 100644 index 000000000..f732f1dfa --- /dev/null +++ b/.changeset/rich-fans-care.md @@ -0,0 +1,8 @@ +--- +'@powersync/service-rsocket-router': minor +'@powersync/service-core': minor +'@powersync/service-image': minor +'@powersync/service-types': patch +--- + +Enable permessage-deflate for websockets.