Skip to content

Commit c5a8ffd

Browse files
fix: Queue messages on lost connection and reconnect also on hibernation (#176)
* fix: Queue messages on lost connection and reconnect also on hibernation * revert hibernated ignore as it would be breaking
1 parent 73e3e5c commit c5a8ffd

File tree

1 file changed

+46
-5
lines changed

1 file changed

+46
-5
lines changed

src/AgentClient/AgentConnection.ts

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ export class AgentConnection {
5757

5858
private nextMessageId = 0;
5959
private pendingMessages = new Map<number, PendingPitcherMessage<any, any>>();
60+
private messageQueue: PendingPitcherMessage<any, any>[] = [];
6061
private notificationListeners: Record<
6162
string,
6263
SliceList<(params: any) => void>
@@ -174,18 +175,29 @@ export class AgentConnection {
174175
options: IRequestOptions = {}
175176
) {
176177
if (this._isDisposed) {
177-
throw new Error("Cannot perform operation: SandboxClient has been disposed");
178+
throw new Error(
179+
"Cannot perform operation: SandboxClient has been disposed"
180+
);
178181
}
179-
182+
180183
const { timeoutMs } = options;
181184
const request = this.createRequest(pitcherRequest, timeoutMs);
182185

186+
// If not connected, queue the message for later
187+
if (this.state !== "CONNECTED") {
188+
this.messageQueue.push(request);
189+
return request.unwrap();
190+
}
191+
183192
try {
184193
// This will throw if we are not in the right connection state
185194
this.connection.send(request.message);
186195

187196
return request.unwrap();
188197
} catch (error) {
198+
// If send fails, queue the message for retry on reconnect
199+
this.messageQueue.push(request);
200+
189201
this.errorEmitter.fire({
190202
message: (error as Error).message,
191203
extras: {
@@ -195,9 +207,8 @@ export class AgentConnection {
195207
},
196208
});
197209

198-
// We always want to return a promise from the method so it does not matter if the error is related to disconnect
199-
// or Pitcher giving an error. It all ends up in the `catch` of the unwrapped promise
200-
return Promise.reject(error);
210+
// Return the queued message's promise instead of rejecting
211+
return request.unwrap();
201212
}
202213
}
203214

@@ -267,6 +278,28 @@ export class AgentConnection {
267278
});
268279

269280
this.state = "CONNECTED";
281+
282+
// Flush the message queue after successful reconnection
283+
await this.flushMessageQueue();
284+
}
285+
286+
private async flushMessageQueue() {
287+
const queuedMessages = [...this.messageQueue];
288+
this.messageQueue = [];
289+
290+
for (const message of queuedMessages) {
291+
if (message.isDisposed) {
292+
// Skip messages that have already been disposed (timed out)
293+
continue;
294+
}
295+
296+
try {
297+
this.connection.send(message.message);
298+
} catch (error) {
299+
// If send still fails, reject the message
300+
message.reject(error as Error);
301+
}
302+
}
270303
}
271304

272305
dispose(): void {
@@ -275,7 +308,15 @@ export class AgentConnection {
275308
this.messageEmitter.dispose();
276309
this.connection.dispose();
277310
this.disposePendingMessages();
311+
this.disposeQueuedMessages();
278312
this.pendingMessages.clear();
313+
this.messageQueue = [];
279314
this.notificationListeners = {};
280315
}
316+
317+
private disposeQueuedMessages() {
318+
this.messageQueue.forEach((queuedMessage) => {
319+
queuedMessage.dispose("Client disposed");
320+
});
321+
}
281322
}

0 commit comments

Comments
 (0)