Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/rich-fans-care.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/service-rsocket-router': minor
'@powersync/service-core': minor
'@powersync/service-image': minor
'@powersync/service-types': patch
---

Enable permessage-deflate for websockets.
37 changes: 33 additions & 4 deletions packages/rsocket-router/src/router/ReactiveSocketRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import * as http from 'http';
import { Payload, RSocketServer } from 'rsocket-core';
import * as ws from 'ws';
import { SocketRouterObserver } from './SocketRouterListener.js';
import { WebsocketDuplexConnection } from './transport/WebsocketDuplexConnection.js';
import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js';
import {
CommonParams,
IReactiveStream,
IReactiveStreamInput,
RS_ENDPOINT_TYPE,
ReactiveSocketRouterOptions,
RS_ENDPOINT_TYPE,
SocketResponder
} from './types.js';

Expand All @@ -24,6 +25,7 @@ export interface ReactiveStreamRequest {
dataMimeType: string;
initialN: number;
responder: SocketResponder;
connection: WebsocketDuplexConnection;
}

export interface SocketBaseContext {
Expand Down Expand Up @@ -51,7 +53,30 @@ export class ReactiveSocketRouter<C extends SocketBaseContext> {
* This follows a similar pattern to the Journey Micro
* web sockets router.
*/
const wss = new ws.WebSocketServer({ noServer: true });
const wss = new ws.WebSocketServer({
noServer: true,
perMessageDeflate: {
zlibDeflateOptions: {
chunkSize: 128 * 1024, // default is 16kb - increased for better efficiency
memLevel: 7, // default is 8
level: 3
},
zlibInflateOptions: {
// for decompressing messages from the client
chunkSize: 32 * 1024
},
// don't keep client context between messages
clientNoContextTakeover: true,
// keep context between messages from the server
serverNoContextTakeover: false,
// bigger window uses more memory and potentially more cpu. 10-15 is a good range.
serverMaxWindowBits: 12,
// Limit concurrent compression threads
concurrencyLimit: 8,
// Size (in bytes) below which messages should not be compressed _if context takeover is disabled_.
threshold: 1024
}
});
server.on('upgrade', (request, socket, head) => {
wss.handleUpgrade(request, socket as any, head, (ws) => {
wss.emit('connection', ws, request);
Expand All @@ -66,7 +91,9 @@ export class ReactiveSocketRouter<C extends SocketBaseContext> {
const rSocketServer = new RSocketServer({
transport,
acceptor: {
accept: async (payload) => {
accept: async (payload, rsocket) => {
const connection = (rsocket as any).connection as WebsocketDuplexConnection;

const { max_concurrent_connections } = this.options ?? {};
logger.info(`Currently have ${wss.clients.size} active WebSocket connection(s)`);
// wss.clients.size includes this connection, so we check for greater than
Expand Down Expand Up @@ -104,7 +131,7 @@ export class ReactiveSocketRouter<C extends SocketBaseContext> {
// TODO: Consider limiting the number of active streams per connection to prevent abuse
handleReactiveStream(
context,
{ payload, initialN, responder, dataMimeType, metadataMimeType },
{ payload, initialN, responder, dataMimeType, metadataMimeType, connection },
observer,
abortController,
params
Expand Down Expand Up @@ -191,6 +218,7 @@ export async function handleReactiveStream<Context extends SocketBaseContext>(
context,
observer,
signal: abortController.signal,
connection: request.connection,
responder
});
if (!isAuthorized.authorized) {
Expand All @@ -207,6 +235,7 @@ export async function handleReactiveStream<Context extends SocketBaseContext>(
observer,
signal: abortController.signal,
responder,
connection: request.connection,
initialN
});
} catch (ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

import { logger } from '@powersync/lib-services-framework';
import { Socket } from 'net';
import {
Closeable,
Deferred,
Expand All @@ -33,6 +34,7 @@ import WebSocket from 'ws';

export class WebsocketDuplexConnection extends Deferred implements DuplexConnection, Outbound {
readonly multiplexerDemultiplexer: Multiplexer & Demultiplexer & FrameHandler;
readonly tracker: WebSocketTracker;

constructor(
private websocketDuplex: Duplex,
Expand All @@ -50,6 +52,7 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect
websocketDuplex.on('data', this.handleMessage);

this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory(frame, this);
this.tracker = new WebSocketTracker(this.rawSocket);
}

get availability(): number {
Expand Down Expand Up @@ -97,7 +100,9 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect
};

private handleError = (e: WebSocket.ErrorEvent): void => {
logger.error(`Error in WebSocket duplex connection: ${e}`);
// Example:
// Error: The socket was closed while data was being compressed
logger.warn(`Error in WebSocket duplex connection: ${e}`);
if (!this.done) {
this.close(e.error);
}
Expand Down Expand Up @@ -149,3 +154,33 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect
});
}
}

/**
* Tracks encoding and bytes written on a websocket connection, catering for compressed data.
*/
export class WebSocketTracker {
private lastBytesWritten: number;
private socket: Socket;
readonly encoding: 'permessage-deflate' | undefined;

constructor(ws: WebSocket) {
this.socket = (ws as any)._socket;
this.lastBytesWritten = this.socket.bytesWritten;

// Crude check, but this is the only extension that would actually be used
if (ws.extensions.includes('permessage-deflate')) {
this.encoding = 'permessage-deflate';
} else {
this.encoding = undefined;
}
}

/**
* Consumes and returns the number of bytes sent.
*/
getBytesWritten(): number {
const written = this.socket.bytesWritten - this.lastBytesWritten;
this.lastBytesWritten = this.socket.bytesWritten;
return written;
}
}
5 changes: 5 additions & 0 deletions packages/rsocket-router/src/router/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { router } from '@powersync/lib-services-framework';
import { OnExtensionSubscriber, OnNextSubscriber, OnTerminalSubscriber } from 'rsocket-core';

import { SocketRouterObserver } from './SocketRouterListener.js';
import { WebsocketDuplexConnection } from './transport/WebsocketDuplexConnection.js';

export enum RS_ENDPOINT_TYPE {
// Other methods are supported by RSocket, but are not yet mapped here
Expand All @@ -26,6 +27,10 @@ export type CommonStreamPayload = {
observer: SocketRouterObserver;
responder: SocketResponder;
signal: AbortSignal;
/**
* The underlying websocket connection. Should not be used directly apart from tracking metadata.
*/
connection: WebsocketDuplexConnection;
};

export type ReactiveStreamPayload<O> = CommonStreamPayload & {
Expand Down
17 changes: 16 additions & 1 deletion packages/service-core/src/routes/endpoints/socket-route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { APIMetric } from '@powersync/service-types';
export const syncStreamReactive: SocketRouteGenerator = (router) =>
router.reactiveStream<util.StreamingSyncRequest, any>(SyncRoutes.STREAM, {
validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }),
handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal }) => {
handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal, connection }) => {
const { service_context, logger } = context;
const { routerEngine, metricsEngine, syncContext } = service_context;

Expand Down Expand Up @@ -84,6 +84,10 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>

metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1);
const tracker = new sync.RequestTracker(metricsEngine);
if (connection.tracker.encoding) {
// Must be set before we start the stream
tracker.setCompressed(connection.tracker.encoding);
}
try {
for await (const data of sync.streamResponse({
syncContext: syncContext,
Expand Down Expand Up @@ -153,6 +157,17 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
responder.onComplete();
removeStopHandler();
disposer();
if (connection.tracker.encoding) {
// Technically, this may not be unique to this specific stream, since there could be multiple
// rsocket streams on the same websocket connection. We don't have a way to track compressed bytes
// on individual streams, and we generally expect 1 stream per connection, so this is a reasonable
// approximation.
// If there are multiple streams, bytes written would be split arbitrarily across them, but the
// total should be correct.
// For non-compressed cases, this is tracked by the stream itself.
const socketBytes = connection.tracker.getBytesWritten();
tracker.addCompressedDataSent(socketBytes);
}
logger.info(`Sync stream complete`, {
...tracker.getLogMeta(),
stream_ms: Date.now() - streamStart,
Expand Down