Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/soft-memes-drive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents-plugin-silero': patch
---

Fix race condition causing "Writer is not bound to a WritableStream" error in Silero VAD
18 changes: 18 additions & 0 deletions agents/src/vad.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,24 @@ export abstract class VADStream implements AsyncIterableIterator<VADEvent> {
}
}

/**
* Safely send a VAD event to the output stream, handling writer release errors during shutdown.
* @returns true if the event was sent, false if the stream is closing
* @throws Error if an unexpected error occurs
*/
protected sendVADEvent(event: VADEvent): boolean {
if (this.closed) {
return false;
}

try {
this.outputWriter.write(event);
return true;
} catch (e) {
throw e;
}
}

updateInputStream(audioStream: ReadableStream<AudioFrame>) {
this.deferredInputStream.setSource(audioStream);
}
Expand Down
104 changes: 58 additions & 46 deletions plugins/silero/src/vad.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,26 +260,30 @@ export class VADStream extends baseStream {
pubSilenceDuration += windowDuration;
}

this.outputWriter.write({
type: VADEventType.INFERENCE_DONE,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
silenceDuration: pubSilenceDuration,
speechDuration: pubSpeechDuration,
probability: p,
inferenceDuration,
frames: [
new AudioFrame(
inputFrame.data.subarray(0, toCopyInt),
this.#inputSampleRate,
1,
toCopyInt,
),
],
speaking: pubSpeaking,
rawAccumulatedSilence: silenceThresholdDuration,
rawAccumulatedSpeech: speechThresholdDuration,
});
if (
!this.sendVADEvent({
type: VADEventType.INFERENCE_DONE,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
silenceDuration: pubSilenceDuration,
speechDuration: pubSpeechDuration,
probability: p,
inferenceDuration,
frames: [
new AudioFrame(
inputFrame.data.subarray(0, toCopyInt),
this.#inputSampleRate,
1,
toCopyInt,
),
],
speaking: pubSpeaking,
rawAccumulatedSilence: silenceThresholdDuration,
rawAccumulatedSpeech: speechThresholdDuration,
})
) {
continue;
}

const resetWriteCursor = () => {
if (!this.#speechBuffer) throw new Error('speechBuffer is empty');
Expand Down Expand Up @@ -314,19 +318,23 @@ export class VADStream extends baseStream {
pubSilenceDuration = 0;
pubSpeechDuration = speechThresholdDuration;

this.outputWriter.write({
type: VADEventType.START_OF_SPEECH,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
silenceDuration: pubSilenceDuration,
speechDuration: pubSpeechDuration,
probability: p,
inferenceDuration,
frames: [copySpeechBuffer()],
speaking: pubSpeaking,
rawAccumulatedSilence: 0,
rawAccumulatedSpeech: 0,
});
if (
!this.sendVADEvent({
Comment on lines +321 to +322
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (!this.sendVADEvent) {

    continue
}

Feels redundant? We can just do this.sendVADEvent? We don't need a return type?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this.sendVADEvent return false, that means the output writer is closed and we need to jump out of the loop, so I think this is not redundant?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it's fine for now

type: VADEventType.START_OF_SPEECH,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
silenceDuration: pubSilenceDuration,
speechDuration: pubSpeechDuration,
probability: p,
inferenceDuration,
frames: [copySpeechBuffer()],
speaking: pubSpeaking,
rawAccumulatedSilence: 0,
rawAccumulatedSpeech: 0,
})
) {
continue;
}
}
} else {
silenceThresholdDuration += windowDuration;
Expand All @@ -341,19 +349,23 @@ export class VADStream extends baseStream {
pubSpeechDuration = 0;
pubSilenceDuration = silenceThresholdDuration;

this.outputWriter.write({
type: VADEventType.END_OF_SPEECH,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
silenceDuration: pubSilenceDuration,
speechDuration: pubSpeechDuration,
probability: p,
inferenceDuration,
frames: [copySpeechBuffer()],
speaking: pubSpeaking,
rawAccumulatedSilence: 0,
rawAccumulatedSpeech: 0,
});
if (
!this.sendVADEvent({
type: VADEventType.END_OF_SPEECH,
samplesIndex: pubCurrentSample,
timestamp: pubTimestamp,
silenceDuration: pubSilenceDuration,
speechDuration: pubSpeechDuration,
probability: p,
inferenceDuration,
frames: [copySpeechBuffer()],
speaking: pubSpeaking,
rawAccumulatedSilence: 0,
rawAccumulatedSpeech: 0,
})
) {
continue;
}

resetWriteCursor();
}
Expand Down