diff --git a/.changeset/yummy-parents-jam.md b/.changeset/yummy-parents-jam.md
new file mode 100644
index 000000000..9f92e637f
--- /dev/null
+++ b/.changeset/yummy-parents-jam.md
@@ -0,0 +1,7 @@
+---
+'@livekit/agents-plugin-google': patch
+'@livekit/agents-plugin-openai': patch
+'@livekit/agents': patch
+---
+
+Usage with separate TTS
diff --git a/agents/src/llm/realtime.ts b/agents/src/llm/realtime.ts
index 12cafb7b6..03d00dbe4 100644
--- a/agents/src/llm/realtime.ts
+++ b/agents/src/llm/realtime.ts
@@ -19,6 +19,7 @@ export interface MessageGeneration {
   messageId: string;
   textStream: ReadableStream;
   audioStream: ReadableStream;
+  modalities?: ['text'] | ['text', 'audio'];
 }
 
 export interface GenerationCreatedEvent {
@@ -40,6 +41,7 @@ export interface RealtimeCapabilities {
   turnDetection: boolean;
   userTranscription: boolean;
   autoToolReplyGeneration: boolean;
+  audioOutput: boolean;
 }
 
 export interface InputTranscriptionCompleted {
diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts
index 06a226162..6eaa31e7a 100644
--- a/agents/src/voice/agent_activity.ts
+++ b/agents/src/voice/agent_activity.ts
@@ -1520,10 +1520,29 @@ export class AgentActivity implements RecognitionHooks {
           break;
         }
         const trNodeResult = await this.agent.transcriptionNode(msg.textStream, modelSettings);
+
+        // Determine if we need to tee the text stream for both text output and TTS
+        const needsTextOutput = !!textOutput && !!trNodeResult;
+        const needsTTSSynthesis =
+          audioOutput &&
+          this.llm instanceof RealtimeModel &&
+          !this.llm.capabilities.audioOutput &&
+          this.tts;
+        const needsBothTextAndTTS = needsTextOutput && needsTTSSynthesis;
+
+        // Tee the stream if we need it for both purposes
+        let textStreamForOutput = trNodeResult;
+        let textStreamForTTS = trNodeResult;
+        if (needsBothTextAndTTS && trNodeResult) {
+          const [stream1, stream2] = trNodeResult.tee();
+          textStreamForOutput = stream1;
+          textStreamForTTS = stream2;
+        }
+
         let textOut: _TextOut | null = null;
-        if (trNodeResult) {
+        if (textStreamForOutput) {
           const [textForwardTask, _textOut] = performTextForwarding(
-            trNodeResult,
+            textStreamForOutput,
             abortController,
             textOutput,
           );
@@ -1532,23 +1551,49 @@
         }
         let audioOut: _AudioOut | null = null;
         if (audioOutput) {
-          const realtimeAudio = await this.agent.realtimeAudioOutputNode(
-            msg.audioStream,
-            modelSettings,
-          );
-          if (realtimeAudio) {
-            const [forwardTask, _audioOut] = performAudioForwarding(
-              realtimeAudio,
-              audioOutput,
-              abortController,
+          // Check if realtime model has audio output capability
+          if (this.llm instanceof RealtimeModel && this.llm.capabilities.audioOutput) {
+            // Use realtime audio output
+            const realtimeAudio = await this.agent.realtimeAudioOutputNode(
+              msg.audioStream,
+              modelSettings,
             );
-            forwardTasks.push(forwardTask);
-            audioOut = _audioOut;
-            audioOut.firstFrameFut.await.finally(onFirstFrame);
+            if (realtimeAudio) {
+              const [forwardTask, _audioOut] = performAudioForwarding(
+                realtimeAudio,
+                audioOutput,
+                abortController,
+              );
+              forwardTasks.push(forwardTask);
+              audioOut = _audioOut;
+              audioOut.firstFrameFut.await.finally(onFirstFrame);
+            } else {
+              this.logger.warn(
+                'audio output is enabled but neither tts nor realtime audio is available',
+              );
+            }
           } else {
-            this.logger.warn(
-              'audio output is enabled but neither tts nor realtime audio is available',
-            );
+            // Text-only mode - synthesize audio using TTS
+            if (this.tts && textStreamForTTS) {
+              const [ttsTask, ttsStream] = performTTSInference(
+                (...args) => this.agent.ttsNode(...args),
+                textStreamForTTS,
+                modelSettings,
+                abortController,
+              );
+              forwardTasks.push(ttsTask);
+
+              const [forwardTask, _audioOut] = performAudioForwarding(
+                ttsStream,
+                audioOutput,
+                abortController,
+              );
+              forwardTasks.push(forwardTask);
+              audioOut = _audioOut;
+              audioOut.firstFrameFut.await.finally(onFirstFrame);
+            } else if (!this.tts) {
+              this.logger.warn('realtime model in text-only mode but no TTS is configured');
+            }
           }
         } else if (textOut) {
           textOut.firstTextFut.await.finally(onFirstFrame);
diff --git a/plugins/google/src/beta/realtime/realtime_api.ts b/plugins/google/src/beta/realtime/realtime_api.ts
index e1857c0da..3dc1309d6 100644
--- a/plugins/google/src/beta/realtime/realtime_api.ts
+++ b/plugins/google/src/beta/realtime/realtime_api.ts
@@ -290,6 +290,7 @@ export class RealtimeModel extends llm.RealtimeModel {
       turnDetection: serverTurnDetection,
       userTranscription: inputAudioTranscription !== null,
       autoToolReplyGeneration: true,
+      audioOutput: (options.modalities || [Modality.AUDIO]).includes(Modality.AUDIO),
     });
 
     // Environment variable fallbacks
diff --git a/plugins/openai/src/realtime/api_proto.ts b/plugins/openai/src/realtime/api_proto.ts
index 75e66c3e6..239586b3d 100644
--- a/plugins/openai/src/realtime/api_proto.ts
+++ b/plugins/openai/src/realtime/api_proto.ts
@@ -190,7 +190,7 @@ export interface SessionResource {
   id: string;
   object: 'realtime.session';
   model: string;
-  modalities: ['text', 'audio'] | ['text']; // default: ["text", "audio"]
+  output_modalities: ['text'] | ['audio'];
   instructions: string;
   voice: Voice; // default: "alloy"
   input_audio_format: AudioFormat; // default: "pcm16"
@@ -267,7 +267,7 @@ export interface SessionUpdateEvent extends BaseClientEvent {
   type: 'session.update';
   session: Partial<{
     model: Model;
-    modalities: ['text', 'audio'] | ['text'];
+    modalities: ['text'] | ['text', 'audio'];
     instructions: string;
     voice: Voice;
     input_audio_format: AudioFormat;
@@ -350,7 +350,7 @@ export interface ConversationItemDeleteEvent extends BaseClientEvent {
 export interface ResponseCreateEvent extends BaseClientEvent {
   type: 'response.create';
   response?: Partial<{
-    modalities: ['text', 'audio'] | ['text'];
+    output_modalities: ['text'] | ['audio'];
     instructions: string;
     voice: Voice;
     output_audio_format: AudioFormat;
@@ -511,6 +511,7 @@ export interface ResponseContentPartDoneEvent extends BaseServerEvent {
 export interface ResponseTextDeltaEvent extends BaseServerEvent {
   type: 'response.text.delta';
   response_id: string;
+  item_id: string;
   output_index: number;
   content_index: number;
   delta: string;
@@ -519,6 +520,7 @@ export interface ResponseTextDoneEvent extends BaseServerEvent {
 export interface ResponseTextDoneEvent extends BaseServerEvent {
   type: 'response.text.done';
   response_id: string;
+  item_id: string;
   output_index: number;
   content_index: number;
   text: string;
diff --git a/plugins/openai/src/realtime/realtime_model.ts b/plugins/openai/src/realtime/realtime_model.ts
index ea51205b2..04cb2e20d 100644
--- a/plugins/openai/src/realtime/realtime_model.ts
+++ b/plugins/openai/src/realtime/realtime_model.ts
@@ -38,6 +38,7 @@ interface RealtimeOptions {
   model: api_proto.Model;
   voice: api_proto.Voice;
   temperature: number;
+  modalities: ['text'] | ['text', 'audio'];
   toolChoice?: llm.ToolChoice;
   inputAudioTranscription?: api_proto.InputAudioTranscription | null;
   // TODO(shubhra): add inputAudioNoiseReduction
@@ -61,6 +62,7 @@ interface MessageGeneration {
   textChannel: stream.StreamChannel;
   audioChannel: stream.StreamChannel;
   audioTranscript: string;
+  modalities?: ['text'] | ['text', 'audio'];
 }
 
 interface ResponseGeneration {
@@ -121,6 +123,7 @@ const DEFAULT_REALTIME_MODEL_OPTIONS = {
   model: 'gpt-realtime',
   voice: 'marin',
   temperature: DEFAULT_TEMPERATURE,
+  modalities: ['text', 'audio'] as ['text', 'audio'],
   inputAudioTranscription: DEFAULT_INPUT_AUDIO_TRANSCRIPTION,
   turnDetection: DEFAULT_TURN_DETECTION,
   toolChoice: DEFAULT_TOOL_CHOICE,
@@ -142,6 +145,7 @@ export class RealtimeModel extends llm.RealtimeModel {
       model?: string;
       voice?: string;
       temperature?: number;
+      modalities?: ['text'] | ['text', 'audio'];
       toolChoice?: llm.ToolChoice;
       baseURL?: string;
       inputAudioTranscription?: api_proto.InputAudioTranscription | null;
@@ -162,6 +166,7 @@ export class RealtimeModel extends llm.RealtimeModel {
       turnDetection: options.turnDetection !== null,
       userTranscription: options.inputAudioTranscription !== null,
       autoToolReplyGeneration: false,
+      audioOutput: options.modalities ? (options.modalities as string[]).includes('audio') : true,
     });
 
     const isAzure = !!(options.apiVersion || options.entraToken || options.azureDeployment);
@@ -197,6 +202,7 @@ export class RealtimeModel extends llm.RealtimeModel {
       apiKey,
       isAzure,
       model: options.model || DEFAULT_REALTIME_MODEL_OPTIONS.model,
+      modalities: options.modalities || DEFAULT_REALTIME_MODEL_OPTIONS.modalities,
     };
   }
 
@@ -229,6 +235,7 @@ export class RealtimeModel extends llm.RealtimeModel {
     entraToken,
     baseURL,
     voice = 'alloy',
+    modalities,
     inputAudioTranscription = AZURE_DEFAULT_INPUT_AUDIO_TRANSCRIPTION,
     turnDetection = AZURE_DEFAULT_TURN_DETECTION,
     temperature = 0.8,
@@ -241,6 +248,7 @@ export class RealtimeModel extends llm.RealtimeModel {
     entraToken?: string;
     baseURL?: string;
     voice?: string;
+    modalities?: ['text'] | ['text', 'audio'];
     inputAudioTranscription?: api_proto.InputAudioTranscription;
     // TODO(shubhra): add inputAudioNoiseReduction
     turnDetection?: api_proto.TurnDetectionType;
@@ -273,6 +281,7 @@ export class RealtimeModel extends llm.RealtimeModel {
 
     return new RealtimeModel({
       voice,
+      modalities,
       inputAudioTranscription,
       turnDetection,
       temperature,
@@ -398,7 +407,7 @@ export class RealtimeSession extends llm.RealtimeSession {
         voice: this.oaiRealtimeModel._options.voice,
         input_audio_format: 'pcm16',
         output_audio_format: 'pcm16',
-        modalities: ['text', 'audio'],
+        modalities: this.oaiRealtimeModel._options.modalities, // Supported combinations are: ['text'] and ['audio', 'text'].",
        turn_detection: this.oaiRealtimeModel._options.turnDetection,
         input_audio_transcription: this.oaiRealtimeModel._options.inputAudioTranscription,
         // TODO(shubhra): add inputAudioNoiseReduction
@@ -909,6 +918,12 @@
       case 'response.content_part.done':
         this.handleResponseContentPartDone(event);
         break;
+      case 'response.text.delta':
+        this.handleResponseTextDelta(event);
+        break;
+      case 'response.text.done':
+        this.handleResponseTextDone(event);
+        break;
       case 'response.audio_transcript.delta':
         this.handleResponseAudioTranscriptDelta(event);
         break;
@@ -1129,35 +1144,39 @@
     const itemType = event.part.type;
     const responseId = event.response_id;
 
-    if (itemType === 'audio') {
-      this.resolveGeneration(responseId);
-      if (this.textModeRecoveryRetries > 0) {
-        this.#logger.info(
-          { retries: this.textModeRecoveryRetries },
-          'recovered from text-only response',
-        );
-        this.textModeRecoveryRetries = 0;
-      }
+    this.resolveGeneration(responseId);
+    if (this.textModeRecoveryRetries > 0) {
+      this.#logger.info(
+        { retries: this.textModeRecoveryRetries },
+        'recovered from text-only response',
+      );
+      this.textModeRecoveryRetries = 0;
+    }
 
-      const itemGeneration: MessageGeneration = {
-        messageId: itemId,
-        textChannel: stream.createStreamChannel(),
-        audioChannel: stream.createStreamChannel(),
-        audioTranscript: '',
-      };
-
-      this.currentGeneration.messageChannel.write({
-        messageId: itemId,
-        textStream: itemGeneration.textChannel.stream(),
-        audioStream: itemGeneration.audioChannel.stream(),
-      });
+    const itemGeneration: MessageGeneration = {
+      messageId: itemId,
+      textChannel: stream.createStreamChannel(),
+      audioChannel: stream.createStreamChannel(),
+      audioTranscript: '',
+    };
 
-      this.currentGeneration.messages.set(itemId, itemGeneration);
-      this.currentGeneration._firstTokenTimestamp = Date.now();
-      return;
-    } else {
-      this.interrupt();
-      if (this.textModeRecoveryRetries === 0) {
+    if (!this.oaiRealtimeModel.capabilities.audioOutput) {
+      itemGeneration.audioChannel.close();
+      itemGeneration.modalities = ['text'];
+    }
+
+    this.currentGeneration.messageChannel.write({
+      messageId: itemId,
+      textStream: itemGeneration.textChannel.stream(),
+      audioStream: itemGeneration.audioChannel.stream(),
+      modalities: itemGeneration.modalities || ['text', 'audio'],
+    });
+
+    this.currentGeneration.messages.set(itemId, itemGeneration);
+
+    if (itemType === 'text') {
+      // Only warn if we expected audio but received text
+      if (this.textModeRecoveryRetries === 0 && this.oaiRealtimeModel.capabilities.audioOutput) {
         this.#logger.warn({ responseId }, 'received text-only response from OpenAI Realtime API');
       }
     }
@@ -1175,6 +1194,32 @@
     // TODO(shubhra): handle text mode recovery
   }
 
+  private handleResponseTextDelta(event: api_proto.ResponseTextDeltaEvent): void {
+    if (!this.currentGeneration) {
+      throw new Error('currentGeneration is not set');
+    }
+
+    const itemGeneration = this.currentGeneration.messages.get(event.item_id);
+    if (!itemGeneration) {
+      throw new Error('itemGeneration is not set');
+    }
+
+    // Set first token timestamp if in text-only mode
+    if (itemGeneration.modalities?.[0] === 'text' && !this.currentGeneration._firstTokenTimestamp) {
+      this.currentGeneration._firstTokenTimestamp = Date.now();
+    }
+
+    itemGeneration.textChannel.write(event.delta);
+    itemGeneration.audioTranscript += event.delta;
+  }
+
+  private handleResponseTextDone(_event: api_proto.ResponseTextDoneEvent): void {
+    if (!this.currentGeneration) {
+      throw new Error('currentGeneration is not set');
+    }
+    // No additional processing needed - just assert generation exists
+  }
+
   private handleResponseAudioTranscriptDelta(
     event: api_proto.ResponseAudioTranscriptDeltaEvent,
   ): void {
@@ -1206,6 +1251,13 @@
       throw new Error('itemGeneration is not set');
     }
 
+    if (!this.currentGeneration._firstTokenTimestamp) {
+      this.currentGeneration._firstTokenTimestamp = Date.now();
+    }
+    if (!itemGeneration.modalities) {
+      itemGeneration.modalities = ['text', 'audio'];
+    }
+
     const binaryString = atob(event.delta);
     const len = binaryString.length;
     const bytes = new Uint8Array(len);
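A minimal usage sketch of what this change enables (text-only realtime output synthesized by a separate TTS). The `modalities` option comes from this diff; the `AgentSession` wiring and the `openai.TTS()` instance are assumed to follow the usual `@livekit/agents` setup and are illustrative only:

```ts
import { voice } from '@livekit/agents';
import * as openai from '@livekit/agents-plugin-openai';

// Realtime model configured for text-only output: capabilities.audioOutput
// is reported as false, so AgentActivity synthesizes speech from the
// realtime text stream instead of forwarding realtime audio.
const llm = new openai.realtime.RealtimeModel({ modalities: ['text'] });

// Assumed session wiring: pair the text-only realtime model with a separate TTS.
const session = new voice.AgentSession({
  llm,
  tts: new openai.TTS(),
});
```

When both a text output and TTS synthesis are active, `AgentActivity` tees the transcription stream so the same tokens feed the transcript forwarding and the TTS node.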