diff --git a/agents/src/llm/chat_context.ts b/agents/src/llm/chat_context.ts
index fa40a0b7..e5972d6c 100644
--- a/agents/src/llm/chat_context.ts
+++ b/agents/src/llm/chat_context.ts
@@ -499,6 +499,113 @@ export class ChatContext {
     return await toChatCtx(format, this, injectDummyUserMessage);
   }
 
+  /**
+   * Compare this ChatContext with another for logical equivalence.
+   * Unlike strict equality, this method:
+   * - Ignores timestamps (createdAt fields)
+   * - Ignores other volatile metadata
+   * - Focuses on content: compares IDs, types, and payload
+   *
+   * This is useful for detecting if the conversation content has changed,
+   * for example when validating preemptive generation results.
+   *
+   * @param other - The ChatContext to compare with
+   * @returns true if both contexts contain the same sequence of items with matching essential fields
+   */
+  isEquivalent(other: ChatContext): boolean {
+    // Same object reference
+    if (this === other) {
+      return true;
+    }
+
+    // Different lengths
+    if (this._items.length !== other._items.length) {
+      return false;
+    }
+
+    // Compare each item pair
+    for (let i = 0; i < this._items.length; i++) {
+      const a = this._items[i]!;
+      const b = other._items[i]!;
+
+      // IDs and types must match
+      if (a.id !== b.id || a.type !== b.type) {
+        return false;
+      }
+
+      // Type-specific field comparison
+      if (a.type === 'message' && b.type === 'message') {
+        // Compare role, content, and interrupted status (not timestamp)
+        if (a.role !== b.role || a.interrupted !== b.interrupted) {
+          return false;
+        }
+
+        // Compare content arrays
+        if (a.content.length !== b.content.length) {
+          return false;
+        }
+
+        for (let j = 0; j < a.content.length; j++) {
+          const ca = a.content[j]!;
+          const cb = b.content[j]!;
+
+          // Both are strings
+          if (typeof ca === 'string' && typeof cb === 'string') {
+            if (ca !== cb) {
+              return false;
+            }
+          }
+          // Both are objects
+          else if (typeof ca === 'object' && typeof cb === 'object') {
+            if (ca.type !== cb.type) {
+              return false;
+            }
+
+            if (ca.type === 'image_content' && cb.type === 'image_content') {
+              // Compare essential image fields (not cache)
+              if (
+                ca.id !== cb.id ||
+                ca.image !== cb.image ||
+                ca.inferenceDetail !== cb.inferenceDetail ||
+                ca.inferenceWidth !== cb.inferenceWidth ||
+                ca.inferenceHeight !== cb.inferenceHeight ||
+                ca.mimeType !== cb.mimeType
+              ) {
+                return false;
+              }
+            } else if (ca.type === 'audio_content' && cb.type === 'audio_content') {
+              // Compare audio transcript (frames comparison would be too expensive)
+              if (ca.transcript !== cb.transcript) {
+                return false;
+              }
+            }
+          }
+          // Mismatched types
+          else {
+            return false;
+          }
+        }
+      } else if (a.type === 'function_call' && b.type === 'function_call') {
+        // Compare name, callId, and args (not timestamp)
+        if (a.name !== b.name || a.callId !== b.callId || a.args !== b.args) {
+          return false;
+        }
+      } else if (a.type === 'function_call_output' && b.type === 'function_call_output') {
+        // Compare name, callId, output, and isError (not timestamp)
+        if (
+          a.name !== b.name ||
+          a.callId !== b.callId ||
+          a.output !== b.output ||
+          a.isError !== b.isError
+        ) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
   /**
    * Internal helper used by `truncate` & `addMessage` to find the correct
    * insertion index for a timestamp so the list remains sorted.
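For reviewers, a minimal sketch of the contract this method is meant to satisfy. The `ChatContext.empty()` helper, `addMessage` signature, and public `items` accessor are assumptions for illustration, not part of this patch:

```ts
const a = ChatContext.empty();
a.addMessage({ role: 'user', content: 'turn on the lights' });

const b = a.copy();
b.items[0]!.createdAt += 1000; // timestamps are volatile metadata

console.log(a.isEquivalent(b)); // true: ids, roles, and content all match

b.addMessage({ role: 'assistant', content: 'Done.' });
console.log(a.isEquivalent(b)); // false: item counts differ
```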
diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts
index 3cab864c..b6778750 100644
--- a/agents/src/stt/stt.ts
+++ b/agents/src/stt/stt.ts
@@ -38,6 +38,13 @@ export enum SpeechEventType {
   END_OF_SPEECH = 3,
   /** Usage event, emitted periodically to indicate usage metrics. */
   RECOGNITION_USAGE = 4,
+  /**
+   * Preflight transcript, emitted when the STT has a confident interim result
+   * before the final transcript is ready. This is useful for preemptive generation
+   * to reduce latency. Contains all pre-committed transcripts, including final
+   * transcripts from previous STT runs.
+   */
+  PREFLIGHT_TRANSCRIPT = 5,
 }
 
 /** SpeechData contains metadata about this {@link SpeechEvent}. */
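Downstream, the new event type slots in next to the existing interim/final cases. A hypothetical consumer loop, assuming the speech stream is async-iterable over `SpeechEvent`s:

```ts
for await (const ev of sttStream) {
  switch (ev.type) {
    case SpeechEventType.PREFLIGHT_TRANSCRIPT:
      // Confident but not yet committed: safe to start speculative work.
      console.log('preflight:', ev.alternatives?.[0]?.text);
      break;
    case SpeechEventType.FINAL_TRANSCRIPT:
      console.log('final:', ev.alternatives?.[0]?.text);
      break;
  }
}
```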
diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts
index 06a22616..740ef369 100644
--- a/agents/src/voice/agent_activity.ts
+++ b/agents/src/voice/agent_activity.ts
@@ -43,6 +43,7 @@ import { type AgentSession, type TurnDetectionMode } from './agent_session.js';
 import {
   AudioRecognition,
   type EndOfTurnInfo,
+  type PreemptiveGenerationInfo,
   type RecognitionHooks,
   type _TurnDetector,
 } from './audio_recognition.js';
@@ -71,6 +72,15 @@ import { SpeechHandle } from './speech_handle.js';
 // equivalent to Python's contextvars
 const speechHandleStorage = new AsyncLocalStorage();
 
+interface PreemptiveGeneration {
+  speechHandle: SpeechHandle;
+  info: PreemptiveGenerationInfo;
+  chatCtx: ChatContext;
+  tools: ToolContext;
+  toolChoice: ToolChoice | null;
+  createdAt: number;
+}
+
 export class AgentActivity implements RecognitionHooks {
   private static readonly REPLY_TASK_CANCEL_TIMEOUT = 5000;
   private started = false;
@@ -87,6 +97,7 @@ export class AgentActivity implements RecognitionHooks {
   private audioStream = new DeferredReadableStream();
   // default to null as None, which maps to the default provider tool choice value
   private toolChoice: ToolChoice | null = null;
+  private preemptiveGeneration?: PreemptiveGeneration;
 
   agent: Agent;
   agentSession: AgentSession;
@@ -664,6 +675,64 @@ export class AgentActivity implements RecognitionHooks {
     );
   }
 
+  onPreemptiveGeneration(info: PreemptiveGenerationInfo): void {
+    if (!this.agentSession.options.preemptiveGeneration) {
+      return;
+    }
+
+    if (this.draining) {
+      this.logger.debug('skipping preemptive generation, agent is draining');
+      return;
+    }
+
+    if (this._currentSpeech && !this._currentSpeech.interrupted) {
+      this.logger.debug('skipping preemptive generation, current speech is not interrupted');
+      return;
+    }
+
+    if (!(this.llm instanceof LLM)) {
+      this.logger.debug('skipping preemptive generation, LLM is not a standard LLM instance');
+      return;
+    }
+
+    // Cancel any existing preemptive generation
+    this.cancelPreemptiveGeneration();
+
+    const chatCtx = this.agent.chatCtx.copy();
+    const userMessage = ChatMessage.create({
+      role: 'user',
+      content: info.newTranscript,
+    });
+
+    this.logger.info(
+      { transcript: info.newTranscript, confidence: info.transcriptConfidence },
+      'starting preemptive generation',
+    );
+
+    const speechHandle = this.generateReply({
+      userMessage,
+      chatCtx,
+      scheduleSpeech: false, // Don't schedule yet!
+    });
+
+    this.preemptiveGeneration = {
+      speechHandle,
+      info,
+      chatCtx,
+      tools: this.agent.toolCtx,
+      toolChoice: this.toolChoice,
+      createdAt: Date.now(),
+    };
+  }
+
+  private cancelPreemptiveGeneration(): void {
+    if (this.preemptiveGeneration) {
+      this.logger.debug('cancelling existing preemptive generation');
+      this.preemptiveGeneration.speechHandle._cancel();
+      this.preemptiveGeneration = undefined;
+    }
+  }
+
   private createSpeechTask(options: {
     task: Task;
     ownedSpeechHandle?: SpeechHandle;
@@ -775,6 +844,7 @@ export class AgentActivity implements RecognitionHooks {
     instructions?: string;
     toolChoice?: ToolChoice | null;
     allowInterruptions?: boolean;
+    scheduleSpeech?: boolean;
   }): SpeechHandle {
     const {
       userMessage,
@@ -782,6 +852,7 @@ export class AgentActivity implements RecognitionHooks {
       instructions: defaultInstructions,
       toolChoice: defaultToolChoice,
       allowInterruptions: defaultAllowInterruptions,
+      scheduleSpeech = true,
     } = options;
 
     let instructions = defaultInstructions;
@@ -871,7 +942,9 @@ export class AgentActivity implements RecognitionHooks {
       task.finally(() => this.onPipelineReplyDone());
     }
 
-    this.scheduleSpeech(handle, SpeechHandle.SPEECH_PRIORITY_NORMAL);
+    if (scheduleSpeech) {
+      this.scheduleSpeech(handle, SpeechHandle.SPEECH_PRIORITY_NORMAL);
+    }
 
     return handle;
   }
@@ -977,6 +1050,70 @@ export class AgentActivity implements RecognitionHooks {
       return;
     }
 
+    // Check if we can use preemptive generation
+    const preemptive = this.preemptiveGeneration;
+    if (preemptive) {
+      // Add the user message to the chat context for comparison
+      const validationChatCtx = this.agent.chatCtx.copy();
+      if (userMessage) {
+        validationChatCtx.insert(userMessage);
+      }
+
+      // Validate: transcript matches, context equivalent, tools unchanged, toolChoice unchanged
+      const transcriptMatches = preemptive.info.newTranscript === info.newTranscript;
+      const contextEquivalent = preemptive.chatCtx.isEquivalent(validationChatCtx);
+      const toolsUnchanged = preemptive.tools === this.agent.toolCtx;
+      const toolChoiceUnchanged = preemptive.toolChoice === this.toolChoice;
+
+      if (transcriptMatches && contextEquivalent && toolsUnchanged && toolChoiceUnchanged) {
+        // Use preemptive generation!
+        const speechHandle = preemptive.speechHandle;
+        this.preemptiveGeneration = undefined;
+
+        const leadTime = Date.now() - preemptive.createdAt;
+        this.logger.info(
+          {
+            transcript: info.newTranscript,
+            leadTimeMs: leadTime,
+            confidence: preemptive.info.transcriptConfidence,
+          },
+          'using preemptive generation',
+        );
+
+        // Schedule the preemptive speech
+        this.scheduleSpeech(speechHandle, SpeechHandle.SPEECH_PRIORITY_NORMAL);
+
+        // Emit metrics
+        const eouMetrics: EOUMetrics = {
+          type: 'eou_metrics',
+          timestamp: Date.now(),
+          endOfUtteranceDelayMs: info.endOfUtteranceDelay,
+          transcriptionDelayMs: info.transcriptionDelay,
+          onUserTurnCompletedDelayMs: callbackDuration,
+          speechId: speechHandle.id,
+        };
+
+        this.agentSession.emit(
+          AgentSessionEventTypes.MetricsCollected,
+          createMetricsCollectedEvent({ metrics: eouMetrics }),
+        );
+
+        return;
+      } else {
+        // Context changed, discard and regenerate
+        this.logger.warn(
+          {
+            transcriptMatches,
+            contextEquivalent,
+            toolsUnchanged,
+            toolChoiceUnchanged,
+          },
+          'preemptive generation invalidated, regenerating',
+        );
+        this.cancelPreemptiveGeneration();
+      }
+    }
+
     // Ensure the new message is passed to generateReply
     // This preserves the original message id, making it easier for users to track responses
     const speechHandle = this.generateReply({ userMessage, chatCtx });
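The reuse decision in the `@@ -977` hunk above boils down to four checks. Restated as a standalone predicate using the types declared in this diff (not part of the patch itself):

```ts
// The preemptive reply is reused only if all four checks hold.
function canReusePreemptive(
  pre: PreemptiveGeneration,
  info: EndOfTurnInfo,
  currentCtx: ChatContext,
  tools: ToolContext,
  toolChoice: ToolChoice | null,
): boolean {
  return (
    pre.info.newTranscript === info.newTranscript && // STT did not revise the text
    pre.chatCtx.isEquivalent(currentCtx) &&          // no items changed mid-turn
    pre.tools === tools &&                           // same ToolContext reference
    pre.toolChoice === toolChoice                    // same toolChoice value
  );
}
```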
diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts
index 55c25e9a..4d2db8ef 100644
--- a/agents/src/voice/agent_session.ts
+++ b/agents/src/voice/agent_session.ts
@@ -57,6 +57,14 @@ export interface VoiceOptions {
   minEndpointingDelay: number;
   maxEndpointingDelay: number;
   maxToolSteps: number;
+  /**
+   * Enable preemptive generation to reduce latency.
+   * When enabled, the agent starts generating a response as soon as a confident
+   * interim transcript (preflight) is available, before the final transcript is ready.
+   * The preemptive generation is validated and potentially discarded if the chat context
+   * or tool definitions change during the turn.
+   */
+  preemptiveGeneration: boolean;
 }
 
 const defaultVoiceOptions: VoiceOptions = {
@@ -67,6 +75,7 @@ const defaultVoiceOptions: VoiceOptions = {
   minEndpointingDelay: 500,
   maxEndpointingDelay: 6000,
   maxToolSteps: 3,
+  preemptiveGeneration: false,
 } as const;
 
 export type TurnDetectionMode = 'stt' | 'vad' | 'realtime_llm' | 'manual' | _TurnDetector;
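Opting in stays a one-line change for applications. A sketch, assuming `AgentSession` accepts `VoiceOptions` fields in its constructor options as the defaults above suggest:

```ts
const session = new AgentSession({
  stt,
  llm,
  tts,
  turnDetection: 'stt',
  preemptiveGeneration: true, // off by default; see defaultVoiceOptions
});
```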
diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts
index 08b7d028..b4f0287e 100644
--- a/agents/src/voice/audio_recognition.ts
+++ b/agents/src/voice/audio_recognition.ts
@@ -21,6 +21,11 @@ export interface EndOfTurnInfo {
   endOfUtteranceDelay: number;
 }
 
+export interface PreemptiveGenerationInfo {
+  newTranscript: string;
+  transcriptConfidence: number;
+}
+
 export interface RecognitionHooks {
   onStartOfSpeech: (ev: VADEvent) => void;
   onVADInferenceDone: (ev: VADEvent) => void;
@@ -28,6 +33,7 @@ export interface RecognitionHooks {
   onInterimTranscript: (ev: SpeechEvent) => void;
   onFinalTranscript: (ev: SpeechEvent) => void;
   onEndOfTurn: (info: EndOfTurnInfo) => Promise<void>;
+  onPreemptiveGeneration?: (info: PreemptiveGenerationInfo) => void;
 
   retrieveChatCtx: () => ChatContext;
 }
@@ -63,10 +69,13 @@ export class AudioRecognition {
   private lastFinalTranscriptTime = 0;
   private audioTranscript = '';
   private audioInterimTranscript = '';
+  private audioPreflightTranscript = '';
   private lastSpeakingTime = 0;
   private userTurnCommitted = false;
   private speaking = false;
   private sampleRate?: number;
+  private finalTranscriptConfidences: number[] = [];
+  private preflightTranscriptConfidence = 0;
 
   private vadInputStream: ReadableStream;
   private sttInputStream: ReadableStream;
@@ -145,6 +154,7 @@ export class AudioRecognition {
         this.hooks.onFinalTranscript(ev);
         const transcript = ev.alternatives?.[0]?.text;
         this.lastLanguage = ev.alternatives?.[0]?.language;
+        const confidence = ev.alternatives?.[0]?.confidence ?? 0;
 
         if (!transcript) {
           // stt final transcript received but no transcript
@@ -155,15 +165,39 @@ export class AudioRecognition {
           {
             user_transcript: transcript,
            language: this.lastLanguage,
+            confidence,
           },
           'received user transcript',
         );
 
         this.lastFinalTranscriptTime = Date.now();
-        this.audioTranscript += ` ${transcript}`;
-        this.audioTranscript = this.audioTranscript.trimStart();
+
+        // Track confidence for preemptive generation
+        this.finalTranscriptConfidences.push(confidence);
+
+        const newAudioTranscript = `${this.audioTranscript} ${transcript}`.trim();
+        const transcriptChanged = newAudioTranscript !== this.audioPreflightTranscript;
+
+        this.audioTranscript = newAudioTranscript;
         this.audioInterimTranscript = '';
 
+        // Trigger preemptive generation if transcript changed from preflight
+        if (
+          transcriptChanged &&
+          this.hooks.onPreemptiveGeneration &&
+          (this.vadBaseTurnDetection || this.userTurnCommitted)
+        ) {
+          const avgConfidence = this.calculateAverageConfidence();
+          this.logger.debug(
+            { transcript: this.audioTranscript, confidence: avgConfidence },
+            'triggering preemptive generation on final transcript',
+          );
+          this.hooks.onPreemptiveGeneration({
+            newTranscript: this.audioTranscript,
+            transcriptConfidence: avgConfidence,
+          });
+        }
+
         if (!this.speaking) {
           if (!this.vad) {
             // Copied from python agents:
@@ -182,6 +216,54 @@ export class AudioRecognition {
           }
         }
         break;
+      case SpeechEventType.PREFLIGHT_TRANSCRIPT:
+        {
+          const preflightTranscript = ev.alternatives?.[0]?.text;
+          const preflightConfidence = ev.alternatives?.[0]?.confidence ?? 0;
+
+          if (!preflightTranscript) {
+            return;
+          }
+
+          this.logger.debug(
+            {
+              preflight_transcript: preflightTranscript,
+              confidence: preflightConfidence,
+            },
+            'received preflight transcript',
+          );
+
+          // Update preflight transcript and confidence
+          this.audioPreflightTranscript = `${this.audioTranscript} ${preflightTranscript}`.trim();
+          this.preflightTranscriptConfidence = preflightConfidence;
+
+          // Trigger preemptive generation if conditions are met
+          if (
+            this.hooks.onPreemptiveGeneration &&
+            (this.turnDetectionMode !== 'manual' || this.userTurnCommitted)
+          ) {
+            // Calculate confidence including all final transcripts plus the current preflight
+            const allConfidences = [...this.finalTranscriptConfidences, preflightConfidence];
+            const avgConfidence =
+              allConfidences.length > 0
+                ? allConfidences.reduce((a, b) => a + b, 0) / allConfidences.length
+                : 0;
+
+            this.logger.debug(
+              { transcript: this.audioPreflightTranscript, confidence: avgConfidence },
+              'triggering preemptive generation on preflight transcript',
+            );
+
+            this.hooks.onPreemptiveGeneration({
+              newTranscript: this.audioPreflightTranscript,
+              transcriptConfidence: avgConfidence,
+            });
+          }
+
+          // Still need to increment confidence tracking for turn detection
+          this.finalTranscriptConfidences.push(preflightConfidence);
+        }
+        break;
       case SpeechEventType.INTERIM_TRANSCRIPT:
         this.logger.debug({ transcript: ev.alternatives?.[0]?.text }, 'interim transcript');
         this.hooks.onInterimTranscript(ev);
@@ -412,7 +494,10 @@ export class AudioRecognition {
   clearUserTurn() {
     this.audioTranscript = '';
     this.audioInterimTranscript = '';
+    this.audioPreflightTranscript = '';
     this.userTurnCommitted = false;
+    this.finalTranscriptConfidences = [];
+    this.preflightTranscriptConfidence = 0;
 
     this.sttTask?.cancelAndWait().finally(() => {
       this.sttTask = Task.from(({ signal }) => this.createSttTask(this.stt, signal));
@@ -464,6 +549,14 @@ export class AudioRecognition {
     });
   }
 
+  private calculateAverageConfidence(): number {
+    if (this.finalTranscriptConfidences.length === 0) {
+      return 0;
+    }
+    const sum = this.finalTranscriptConfidences.reduce((a, b) => a + b, 0);
+    return sum / this.finalTranscriptConfidences.length;
+  }
+
   async close() {
     this.detachInputAudioStream();
     await this.commitUserTurnTask?.cancelAndWait();
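To make the transcript/confidence bookkeeping concrete, a worked example with illustrative values (two committed finals plus one preflight chunk):

```ts
// audioTranscript accumulates committed finals:
const audioTranscript = 'Hello there. What is'; // after two FINAL_TRANSCRIPTs
// a preflight chunk is appended, but only into the preflight copy:
const audioPreflightTranscript = `${audioTranscript} the weather today`.trim();
// the confidence reported upstream averages the finals plus the current preflight:
const confidences = [0.92, 0.88, 0.95];
const avg = confidences.reduce((a, b) => a + b, 0) / confidences.length; // ≈ 0.917
```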
diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts
index bc4b1d4a..612bed17 100644
--- a/plugins/deepgram/src/stt.ts
+++ b/plugins/deepgram/src/stt.ts
@@ -35,6 +35,13 @@ export interface STTOptions {
   dictation: boolean;
   diarize: boolean;
   numerals: boolean;
+  /**
+   * Enable eager end-of-turn detection for preemptive generation.
+   * When set to a value between 0.3 and 0.9, Deepgram will emit EagerEndOfTurn events
+   * when it detects a pause in speech, allowing the agent to start generating responses
+   * preemptively.
+   */
+  eagerEotThreshold?: number;
 }
 
 const defaultSTTOptions: STTOptions = {
@@ -161,6 +168,7 @@ export class SpeechStream extends stt.SpeechStream {
       keyterm: this.#opts.keyterm,
       profanity_filter: this.#opts.profanityFilter,
       language: this.#opts.language,
+      eager_eot_threshold: this.#opts.eagerEotThreshold,
     };
     Object.entries(params).forEach(([k, v]) => {
       if (v !== undefined) {
@@ -326,6 +334,29 @@ export class SpeechStream extends stt.SpeechStream {
 
           break;
         }
+        case 'EagerEndOfTurn': {
+          // Deepgram has detected a pause in speech, but the user is technically
+          // still speaking. Send a preflight event to enable preemptive generation.
+          const metadata = json['metadata'];
+          const requestId = metadata['request_id'];
+          this.#requestId = requestId;
+
+          const alternatives = liveTranscriptionToSpeechData(this.#opts.language!, json);
+
+          if (alternatives[0] && alternatives[0].text) {
+            this.#logger.debug(
+              { transcript: alternatives[0].text, confidence: alternatives[0].confidence },
+              'received eager end-of-turn event',
+            );
+
+            this.queue.put({
+              type: stt.SpeechEventType.PREFLIGHT_TRANSCRIPT,
+              alternatives: [alternatives[0], ...alternatives.slice(1)],
+            });
+          }
+
+          break;
+        }
         case 'Metadata': {
           break;
         }
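Finally, a sketch of enabling the feature from the application side. The model name is an assumption (EagerEndOfTurn comes from Deepgram's Flux models), and the constructor shape follows `STTOptions` above:

```ts
import * as deepgram from '@livekit/agents-plugin-deepgram';

const sttInstance = new deepgram.STT({
  model: 'flux-general-en', // assumption: an EagerEndOfTurn-capable model
  eagerEotThreshold: 0.7,   // valid range 0.3–0.9 per the option docs above
});
// The option is forwarded as the `eager_eot_threshold` query parameter, and
// EagerEndOfTurn messages surface as PREFLIGHT_TRANSCRIPT speech events.
```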