Skip to content

Commit 706c46f

Browse files
authored
Implement Event-Source Send in CopilotStudioClient to handle stream responses (#614)
* Use eventSource to handle streaming responses * Apply feedback * Update API docs * Fix lint issue * Remove resolved and integrity from package-lock
1 parent af4b41f commit 706c46f

File tree

7 files changed

+174
-144
lines changed

7 files changed

+174
-144
lines changed

compat/baseline/agents-copilotstudio-client.api.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ export class ConnectionSettings extends ConnectionOptions {
2525
// @public
2626
export class CopilotStudioClient {
2727
constructor(settings: ConnectionSettings, token: string);
28-
askQuestionAsync(question: string, conversationId?: string): Promise<Activity[]>;
28+
askQuestionAsync(question: string, conversationId?: string): AsyncGenerator<Activity>;
2929
static scopeFromSettings: (settings: ConnectionSettings) => string;
30-
sendActivity(activity: Activity, conversationId?: string): Promise<Activity[]>;
31-
startConversationAsync(emitStartConversationEvent?: boolean): Promise<Activity>;
30+
sendActivity(activity: Activity, conversationId?: string): AsyncGenerator<Activity>;
31+
startConversationAsync(emitStartConversationEvent?: boolean): AsyncGenerator<Activity>;
3232
}
3333

3434
// @public

package-lock.json

Lines changed: 18 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/agents-copilotstudio-client/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
},
2929
"dependencies": {
3030
"@microsoft/agents-activity": "file:../agents-activity",
31-
"axios": "^1.12.2",
31+
"eventsource-client": "^1.2.0",
3232
"rxjs": "7.8.2",
3333
"uuid": "^11.1.0"
3434
},

packages/agents-copilotstudio-client/src/copilotStudioClient.ts

Lines changed: 92 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
* Licensed under the MIT License.
44
*/
55

6+
import { createEventSource, EventSourceClient } from 'eventsource-client'
67
import { ConnectionSettings } from './connectionSettings'
7-
import axios, { AxiosInstance, AxiosRequestConfig } from 'axios'
88
import { getCopilotStudioConnectionUrl, getTokenAudience } from './powerPlatformEnvironment'
99
import { Activity, ActivityTypes, ConversationAccount } from '@microsoft/agents-activity'
1010
import { ExecuteTurnRequest } from './executeTurnRequest'
@@ -14,11 +14,6 @@ import os from 'os'
1414

1515
const logger = debug('copilot-studio:client')
1616

17-
interface streamRead {
18-
done: boolean,
19-
value: string
20-
}
21-
2217
/**
2318
* Client for interacting with Microsoft Copilot Studio services.
2419
* Provides functionality to start conversations and send messages to Copilot Studio bots.
@@ -33,8 +28,8 @@ export class CopilotStudioClient {
3328
private conversationId: string = ''
3429
/** The connection settings for the client. */
3530
private readonly settings: ConnectionSettings
36-
/** The Axios instance used for HTTP requests. */
37-
private readonly client: AxiosInstance
31+
/** The authenticaton token. */
32+
private readonly token: string
3833

3934
/**
4035
* Returns the scope URL needed to connect to Copilot Studio from the connection settings.
@@ -51,79 +46,70 @@ export class CopilotStudioClient {
5146
*/
5247
constructor (settings: ConnectionSettings, token: string) {
5348
this.settings = settings
54-
this.client = axios.create()
55-
this.client.defaults.headers.common.Authorization = `Bearer ${token}`
56-
this.client.defaults.headers.common['User-Agent'] = CopilotStudioClient.getProductInfo()
49+
this.token = token
5750
}
5851

59-
private async postRequestAsync (axiosConfig: AxiosRequestConfig): Promise<Activity[]> {
60-
const activities: Activity[] = []
61-
62-
logger.debug(`>>> SEND TO ${axiosConfig.url}`)
63-
64-
const response = await this.client(axiosConfig)
65-
66-
if (this.settings.useExperimentalEndpoint && !this.settings.directConnectUrl?.trim()) {
67-
const islandExperimentalUrl = response.headers?.[CopilotStudioClient.islandExperimentalUrlHeaderKey]
68-
if (islandExperimentalUrl) {
69-
this.settings.directConnectUrl = islandExperimentalUrl
70-
logger.debug(`Island Experimental URL: ${islandExperimentalUrl}`)
71-
}
72-
}
73-
74-
this.conversationId = response.headers?.[CopilotStudioClient.conversationIdHeaderKey] ?? ''
75-
if (this.conversationId) {
76-
logger.debug(`Conversation ID: ${this.conversationId}`)
77-
}
78-
79-
const sanitizedHeaders = { ...response.headers }
80-
delete sanitizedHeaders['Authorization']
81-
delete sanitizedHeaders[CopilotStudioClient.conversationIdHeaderKey]
82-
logger.debug('Headers received:', sanitizedHeaders)
52+
/**
53+
* Streams activities from the Copilot Studio service using eventsource-client.
54+
* @param url The connection URL for Copilot Studio.
55+
* @param body Optional. The request body (for POST).
56+
* @param method Optional. The HTTP method (default: POST).
57+
* @returns An async generator yielding the Agent's Activities.
58+
*/
59+
private async * postRequestAsync (url: string, body?: any, method: string = 'POST'): AsyncGenerator<Activity> {
60+
logger.debug(`>>> SEND TO ${url}`)
8361

84-
const stream = response.data
85-
const reader = stream.pipeThrough(new TextDecoderStream()).getReader()
86-
let result: string = ''
87-
const results: string[] = []
88-
89-
const processEvents = async ({ done, value }: streamRead): Promise<string[]> => {
90-
if (done) {
91-
logger.debug('Stream complete')
92-
result += value
93-
results.push(result)
94-
return results
62+
const eventSource: EventSourceClient = createEventSource({
63+
url,
64+
headers: {
65+
Authorization: `Bearer ${this.token}`,
66+
'User-Agent': CopilotStudioClient.getProductInfo(),
67+
'Content-Type': 'application/json',
68+
Accept: 'text/event-stream'
69+
},
70+
body: body ? JSON.stringify(body) : undefined,
71+
method,
72+
fetch: async (url, init) => {
73+
const response = await fetch(url, init)
74+
this.processResponseHeaders(response.headers)
75+
return response
9576
}
96-
logger.info('Agent is typing ...')
97-
result += value
98-
99-
return await processEvents(await reader.read())
100-
}
77+
})
10178

102-
const events: string[] = await reader.read().then(processEvents)
103-
104-
events.forEach(event => {
105-
const values: string[] = event.toString().split('\n')
106-
const validEvents = values.filter(e => e.substring(0, 4) === 'data' && e !== 'data: end\r')
107-
validEvents.forEach(ve => {
108-
try {
109-
const act = Activity.fromJson(ve.substring(5, ve.length))
110-
if (act.type === ActivityTypes.Message) {
111-
activities.push(act)
112-
if (!this.conversationId.trim()) {
113-
// Did not get it from the header.
114-
this.conversationId = act.conversation?.id ?? ''
115-
logger.debug(`Conversation ID: ${this.conversationId}`)
79+
try {
80+
for await (const { data, event } of eventSource) {
81+
if (data && event === 'activity') {
82+
try {
83+
const activity = Activity.fromJson(data)
84+
switch (activity.type) {
85+
case ActivityTypes.Message:
86+
if (!this.conversationId.trim()) { // Did not get it from the header.
87+
this.conversationId = activity.conversation?.id ?? ''
88+
logger.debug(`Conversation ID: ${this.conversationId}`)
89+
}
90+
yield activity
91+
break
92+
default:
93+
logger.debug(`Activity type: ${activity.type}`)
94+
yield activity
95+
break
11696
}
117-
} else {
118-
logger.debug(`Activity type: ${act.type}`)
97+
} catch (error) {
98+
logger.error('Failed to parse activity:', error)
11999
}
120-
} catch (e) {
121-
logger.error('Error: ', e)
122-
throw e
100+
} else if (event === 'end') {
101+
logger.debug('Stream complete')
102+
break
123103
}
124-
})
125-
})
126-
return activities
104+
105+
if (eventSource.readyState === 'closed') {
106+
logger.debug('Connection closed')
107+
break
108+
}
109+
}
110+
} finally {
111+
eventSource.close()
112+
}
127113
}
128114

129115
/**
@@ -146,41 +132,50 @@ export class CopilotStudioClient {
146132
return userAgent
147133
}
148134

135+
private processResponseHeaders (responseHeaders: Headers): void {
136+
if (this.settings.useExperimentalEndpoint && !this.settings.directConnectUrl?.trim()) {
137+
const islandExperimentalUrl = responseHeaders?.get(CopilotStudioClient.islandExperimentalUrlHeaderKey)
138+
if (islandExperimentalUrl) {
139+
this.settings.directConnectUrl = islandExperimentalUrl
140+
logger.debug(`Island Experimental URL: ${islandExperimentalUrl}`)
141+
}
142+
}
143+
144+
this.conversationId = responseHeaders?.get(CopilotStudioClient.conversationIdHeaderKey) ?? ''
145+
if (this.conversationId) {
146+
logger.debug(`Conversation ID: ${this.conversationId}`)
147+
}
148+
149+
const sanitizedHeaders = new Headers()
150+
responseHeaders.forEach((value, key) => {
151+
if (key.toLowerCase() !== 'authorization' && key.toLowerCase() !== CopilotStudioClient.conversationIdHeaderKey.toLowerCase()) {
152+
sanitizedHeaders.set(key, value)
153+
}
154+
})
155+
logger.debug('Headers received:', sanitizedHeaders)
156+
}
157+
149158
/**
150159
* Starts a new conversation with the Copilot Studio service.
151160
* @param emitStartConversationEvent Whether to emit a start conversation event. Defaults to true.
152-
* @returns A promise that resolves to the initial activity of the conversation.
161+
* @returns An async generator yielding the Agent's Activities.
153162
*/
154-
public async startConversationAsync (emitStartConversationEvent: boolean = true): Promise<Activity> {
163+
public async * startConversationAsync (emitStartConversationEvent: boolean = true): AsyncGenerator<Activity> {
155164
const uriStart: string = getCopilotStudioConnectionUrl(this.settings)
156165
const body = { emitStartConversationEvent }
157166

158-
const config: AxiosRequestConfig = {
159-
method: 'post',
160-
url: uriStart,
161-
headers: {
162-
Accept: 'text/event-stream',
163-
'Content-Type': 'application/json',
164-
},
165-
data: body,
166-
responseType: 'stream',
167-
adapter: 'fetch'
168-
}
169-
170167
logger.info('Starting conversation ...')
171-
const values = await this.postRequestAsync(config)
172-
const act = values[0]
173-
logger.info(`Conversation '${act.conversation?.id}' started. Received ${values.length} activities.`, values)
174-
return act
168+
169+
yield * this.postRequestAsync(uriStart, body, 'POST')
175170
}
176171

177172
/**
178173
* Sends a question to the Copilot Studio service and retrieves the response activities.
179174
* @param question The question to ask.
180175
* @param conversationId The ID of the conversation. Defaults to the current conversation ID.
181-
* @returns A promise that resolves to an array of activities containing the responses.
176+
* @returns An async generator yielding the Agent's Activities.
182177
*/
183-
public async askQuestionAsync (question: string, conversationId: string = this.conversationId) {
178+
public async * askQuestionAsync (question: string, conversationId: string = this.conversationId) : AsyncGenerator<Activity> {
184179
const conversationAccount: ConversationAccount = {
185180
id: conversationId
186181
}
@@ -191,34 +186,21 @@ export class CopilotStudioClient {
191186
}
192187
const activity = Activity.fromObject(activityObj)
193188

194-
return this.sendActivity(activity)
189+
yield * this.sendActivity(activity)
195190
}
196191

197192
/**
198193
* Sends an activity to the Copilot Studio service and retrieves the response activities.
199194
* @param activity The activity to send.
200195
* @param conversationId The ID of the conversation. Defaults to the current conversation ID.
201-
* @returns A promise that resolves to an array of activities containing the responses.
196+
* @returns An async generator yielding the Agent's Activities.
202197
*/
203-
public async sendActivity (activity: Activity, conversationId: string = this.conversationId) {
198+
public async * sendActivity (activity: Activity, conversationId: string = this.conversationId) : AsyncGenerator<Activity> {
204199
const localConversationId = activity.conversation?.id ?? conversationId
205200
const uriExecute = getCopilotStudioConnectionUrl(this.settings, localConversationId)
206201
const qbody: ExecuteTurnRequest = new ExecuteTurnRequest(activity)
207202

208-
const config: AxiosRequestConfig = {
209-
method: 'post',
210-
url: uriExecute,
211-
headers: {
212-
Accept: 'text/event-stream',
213-
'Content-Type': 'application/json',
214-
},
215-
data: qbody,
216-
responseType: 'stream',
217-
adapter: 'fetch'
218-
}
219203
logger.info('Sending activity...', activity)
220-
const values = await this.postRequestAsync(config)
221-
logger.info(`Received ${values.length} activities.`, values)
222-
return values
204+
yield * this.postRequestAsync(uriExecute, qbody, 'POST')
223205
}
224206
}

packages/agents-copilotstudio-client/src/copilotStudioWebChat.ts

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -218,12 +218,12 @@ export class CopilotStudioWebChat {
218218

219219
logger.debug('--> Connection established.')
220220
notifyTyping()
221-
const activity = await client.startConversationAsync()
222-
// Remove replyToId to avoid timeout issues with WebChat on first activity.
223-
delete activity.replyToId
224-
conversation = activity.conversation
225-
sequence = 0
226-
notifyActivity(activity)
221+
222+
for await (const activity of client.startConversationAsync()) {
223+
delete activity.replyToId
224+
conversation = activity.conversation
225+
notifyActivity(activity)
226+
}
227227
})
228228

229229
const notifyActivity = (activity: Partial<Activity>) => {
@@ -232,9 +232,10 @@ export class CopilotStudioWebChat {
232232
timestamp: new Date().toISOString(),
233233
channelData: {
234234
...activity.channelData,
235-
'webchat:sequence-id': sequence++,
235+
'webchat:sequence-id': sequence,
236236
},
237237
}
238+
sequence++
238239
logger.debug(`Notify '${newActivity.type}' activity to WebChat:`, newActivity)
239240
activitySubscriber?.next(newActivity)
240241
}
@@ -276,15 +277,16 @@ export class CopilotStudioWebChat {
276277
notifyActivity(newActivity)
277278
notifyTyping()
278279

279-
const activities = await client.sendActivity(newActivity)
280+
// Notify WebChat immediately that the message was sent
281+
subscriber.next(newActivity.id!)
280282

281-
for (const responseActivity of activities) {
283+
// Stream the agent's response, but don't block the UI
284+
for await (const responseActivity of client.sendActivity(newActivity)) {
282285
notifyActivity(responseActivity)
286+
logger.info('<-- Activity received correctly from Copilot Studio.')
283287
}
284288

285-
subscriber.next(newActivity.id!)
286289
subscriber.complete()
287-
logger.info('<-- Activity received correctly from Copilot Studio.')
288290
} catch (error) {
289291
logger.error('Error sending Activity to Copilot Studio:', error)
290292
subscriber.error(error)

0 commit comments

Comments
 (0)