Skip to content

Commit 3b17092

Browse files
committed
feat: remove gc, and improve bye handling
bye handling is not treated as a special message to avoid loopback from resending bye on close. GC is now removed with assuming no messages are likely to be in-flight. If there is, it should show as another new session from the same peer.
1 parent 1e77c63 commit 3b17092

File tree

3 files changed

+55
-76
lines changed

3 files changed

+55
-76
lines changed

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,8 @@ test:
1919
npm run build -w peer
2020
npx playwright test --project=local -j 1
2121

22+
test-local:
23+
PULSEBEAM_BASE_URL=http://localhost:3000/grpc npx playwright test --project=local -j 2
24+
2225
test-flaky:
23-
for i in `seq 100`; do echo $i; PULSEBEAM_BASE_URL=http://localhost:3000/grpc npx playwright test --project=local -j2 || break; done
26+
for i in `seq 100`; do echo $i; make test-local || break; done

peer/session.ts

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import {
22
type ICECandidate,
3-
type MessagePayload,
43
PeerInfo,
54
SdpKind,
65
type Signal,
@@ -210,7 +209,7 @@ export class Session {
210209
100,
211210
(c) => this.sendLocalIceCandidates(c),
212211
);
213-
stream.onpayload = (msg) => this.handleMessage(msg);
212+
stream.onsignal = (msg) => this.handleSignal(msg);
214213
stream.onclosed = (reason) => this.close(reason);
215214

216215
this.pc.oniceconnectionstatechange = async () => {
@@ -399,24 +398,6 @@ export class Session {
399398
}, true);
400399
};
401400

402-
private handleMessage = async (payload: MessagePayload) => {
403-
if (this.abort.signal.aborted) {
404-
this.logger.warn("session is closed, ignoring message");
405-
return;
406-
}
407-
switch (payload.payloadType.oneofKind) {
408-
case "signal":
409-
await this.handleSignal(payload.payloadType.signal);
410-
break;
411-
case "bye":
412-
this.close();
413-
break;
414-
case "join":
415-
// nothing to do here. SDK consumer needs to manually trigger the start
416-
break;
417-
}
418-
};
419-
420401
private handleSignal = async (signal: Signal) => {
421402
if (signal.generationCounter < this.generationCounter) {
422403
this.logger.warn("detected staled generationCounter signals, ignoring");

peer/transport.ts

Lines changed: 50 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import type {
2-
Ack,
32
Message,
43
MessageHeader,
54
MessagePayload,
65
PeerInfo,
6+
Signal,
77
} from "./signaling.ts";
88
import { ISignalingClient } from "./signaling.client.ts";
99
import type { Logger } from "./logger.ts";
@@ -13,9 +13,6 @@ import { RpcOptions } from "@protobuf-ts/runtime-rpc";
1313
const POLL_TIMEOUT_MS = 900000;
1414
const POLL_RETRY_BASE_DELAY_MS = 50;
1515
const POLL_RETRY_MAX_DELAY_MS = 1000;
16-
const MAX_RELIABLE_RETRY_COUNT = 5;
17-
const STREAM_GC_DELAY_MS = 10_000; // just enough to avoid collision and quick enough to reuse connection
18-
const STREAM_GC_INTERVAL_MS = 1_000;
1916

2017
export enum ReservedConnId {
2118
Discovery = 0,
@@ -46,7 +43,7 @@ class Queue {
4643
private unreliable: Message[];
4744
private processing: boolean;
4845
private readonly logger: Logger;
49-
public onmsg = async (_: Message) => {};
46+
public onmsg = async (_: Message) => { };
5047

5148
constructor(logger: Logger) {
5249
this.logger = logger.sub("queue");
@@ -118,8 +115,8 @@ export class Transport {
118115
public readonly asleep: typeof defaultAsleep;
119116
private readonly randUint32: typeof defaultRandUint32;
120117
private readonly isRecoverable: typeof defaultIsRecoverable;
121-
public onstream = (_: Stream) => {};
122-
public onclosed = (_reason: string) => {};
118+
public onstream = (_: Stream) => { };
119+
public onclosed = (_reason: string) => { };
123120

124121
constructor(
125122
private readonly client: ISignalingClient,
@@ -142,23 +139,6 @@ export class Transport {
142139
}
143140

144141
async listen() {
145-
await Promise.all([
146-
this.pollLoop(),
147-
this.gcLoop(),
148-
]);
149-
}
150-
151-
async gcLoop() {
152-
while (!this.abort.signal.aborted) {
153-
// use cooldown period to fully close. Otherwise, there's a chance that the other peer is
154-
// still sending some messages. In which case, we need to still ignore for some time until completely quiet.
155-
this.streams = this.streams.filter((s) => !s.isClosed());
156-
await asleep(STREAM_GC_INTERVAL_MS, this.abort.signal);
157-
}
158-
this.logger.debug("gc loop is closed");
159-
}
160-
161-
async pollLoop() {
162142
const rpcOpt: RpcOptions = {
163143
abort: this.abort.signal,
164144
timeout: POLL_TIMEOUT_MS,
@@ -249,15 +229,22 @@ export class Transport {
249229
}
250230

251231
if (!stream) {
252-
this.logger.debug(
253-
`session not found, creating one for ${src.peerId}:${src.connId}`,
254-
);
255-
232+
// if (msg.payload?.payloadType.oneofKind !== "join") {
233+
// this.logger.warn(
234+
// `session not found, but non-join from ${src.peerId}:${src.connId}, dropping as this is likely staled.`,
235+
// );
236+
// return;
237+
// }
238+
//
256239
if (src.peerId == this.info.peerId) {
257240
this.logger.warn("loopback detected, ignoring messages");
258241
return;
259242
}
260243

244+
this.logger.debug(
245+
`session not found, creating one for ${src.peerId}:${src.connId}`,
246+
);
247+
261248
stream = new Stream(
262249
this,
263250
this.info,
@@ -271,6 +258,10 @@ export class Transport {
271258
stream.enqueue(msg);
272259
};
273260

261+
removeStream(stream: Stream) {
262+
this.streams = this.streams.filter((s) => s !== stream);
263+
}
264+
274265
async connect(
275266
otherGroupId: string,
276267
otherPeerId: string,
@@ -300,7 +291,7 @@ export class Transport {
300291
header,
301292
payload,
302293
});
303-
await this.asleep(POLL_RETRY_MAX_DELAY_MS, joinedSignal).catch(() => {});
294+
await this.asleep(POLL_RETRY_MAX_DELAY_MS, joinedSignal).catch(() => { });
304295

305296
found = !!this.streams.find((s) =>
306297
s.other.groupId === otherGroupId && s.other.peerId === otherPeerId
@@ -348,10 +339,9 @@ export class Stream {
348339
public readonly logger: Logger;
349340
private abort: AbortController;
350341
public recvq: Queue;
351-
private closedAt: number;
352342
private lastSeqnum: number;
353-
public onpayload = async (_: MessagePayload) => {};
354-
public onclosed = (_reason: string) => {};
343+
public onsignal = async (_: Signal) => { };
344+
public onclosed = (_reason: string) => { };
355345

356346
constructor(
357347
private readonly transport: Transport,
@@ -366,23 +356,12 @@ export class Stream {
366356
this.recvq = new Queue(this.logger);
367357
this.recvq.onmsg = (msg) => this.handleMessage(msg);
368358
this.lastSeqnum = 0;
369-
this.closedAt = 0;
370359
}
371360

372361
createSignal(...signals: AbortSignal[]): AbortSignal {
373362
return joinSignals(this.abort.signal, ...signals);
374363
}
375364

376-
isClosed(): boolean {
377-
const closed = this.abort.signal.aborted &&
378-
(performance.now() - this.closedAt) > STREAM_GC_DELAY_MS;
379-
380-
if (closed) {
381-
this.logger.debug("stream is ready for GC");
382-
}
383-
return closed;
384-
}
385-
386365
enqueue(msg: Message) {
387366
if (this.abort.signal.aborted) {
388367
this.logger.warn(
@@ -419,23 +398,39 @@ export class Stream {
419398
});
420399
return;
421400
}
422-
this.onpayload(msg.payload);
401+
402+
switch (msg.payload.payloadType.oneofKind) {
403+
case "bye":
404+
this.close("received bye", true);
405+
return;
406+
case "signal":
407+
this.onsignal(msg.payload.payloadType.signal);
408+
return;
409+
case "join":
410+
// nothing to do here, this just creates the session
411+
return;
412+
default:
413+
this.logger.warn("unhandled payload type", { msg });
414+
return;
415+
}
423416
}
424417

425-
async close(reason?: string) {
418+
async close(reason?: string, skipBye?: boolean) {
426419
if (this.abort.signal.aborted) return;
427420
reason = reason || "session is closed";
428-
// make sure to give a chance to send a message
429-
await this.send({
430-
payloadType: {
431-
oneofKind: "bye",
432-
bye: {},
433-
},
434-
}, false).catch((err) =>
435-
this.logger.warn("failed to send bye", { e: err })
436-
);
421+
if (!skipBye) {
422+
// make sure to give a chance to send a message
423+
await this.send({
424+
payloadType: {
425+
oneofKind: "bye",
426+
bye: {},
427+
},
428+
}, false).catch((err) =>
429+
this.logger.warn("failed to send bye", { e: err })
430+
);
431+
}
437432
this.abort.abort(reason);
438-
this.closedAt = performance.now();
433+
this.transport.removeStream(this);
439434
this.onclosed(reason);
440435
this.logger.debug("sent bye to the other peer", { reason });
441436
}

0 commit comments

Comments
 (0)