Commit bee16d4

openai: Fix missing or double spans when completion stream is used with context manager (#80)
* openai: Add conditional span closure logic depending on whether streamed completion uses context manager or not
* openai: Add conditional span closure logic for async streaming completion
* openai: Guard span ending based on the flag rather than the context manager
1 parent 4971007 commit bee16d4

File tree

3 files changed: +249 -0 lines changed


instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py

+18
@@ -67,8 +67,13 @@ def __init__(
         self.choices = []
         self.usage = None
         self.service_tier = None
+        self.ended = False

     def end(self, exc=None):
+        if self.ended:
+            return
+
+        self.ended = True
         if exc is not None:
             self.span.set_status(StatusCode.ERROR, str(exc))
             self.span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__)

@@ -111,6 +116,12 @@ def process_chunk(self, chunk):
         if hasattr(chunk, "service_tier"):
             self.service_tier = chunk.service_tier

+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.end(exc_value)
+
     def __iter__(self):
         stream = self.__wrapped__
         try:

@@ -124,6 +135,13 @@ def __iter__(self):
             raise
         self.end()

+    async def __aenter__(self):
+        # No difference in behavior between sync and async context manager
+        return self.__enter__()
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        self.__exit__(exc_type, exc_value, traceback)
+
     async def __aiter__(self):
         stream = self.__wrapped__
         try:
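Taken together, these hunks make end() idempotent via the ended flag and let the wrapper itself serve as a sync and async context manager. Below is a minimal, runnable sketch of the same closure pattern; the StreamOfChunks class and the print call standing in for span closure are illustrative stand-ins, not the instrumentation's actual classes:

# Minimal sketch of the double-closure scenario this commit guards against.
class StreamOfChunks:
    def __init__(self, chunks):
        self._chunks = chunks
        self.ended = False

    def end(self, exc=None):
        if self.ended:  # the guard added in this commit: end() is idempotent
            return
        self.ended = True
        print("span ended", "with error" if exc else "cleanly")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.end(exc_value)

    def __iter__(self):
        yield from self._chunks
        self.end()  # closure when iteration is exhausted


# Consuming the stream inside `with` reaches end() twice: once when the
# iterator is exhausted and once from __exit__. The flag makes the second
# call a no-op, so the span is ended exactly once.
with StreamOfChunks(["South", " Atlantic", " Ocean", "."]) as stream:
    print("".join(stream))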
New file: VCR cassette for the context-manager streaming tests

+114

@@ -0,0 +1,114 @@
+interactions:
+- request:
+    body: |-
+      {
+        "messages": [
+          {
+            "role": "user",
+            "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?"
+          }
+        ],
+        "model": "gpt-4o-mini",
+        "stream": true
+      }
+    headers:
+      accept:
+      - application/json
+      accept-encoding:
+      - gzip, deflate
+      authorization:
+      - Bearer test_openai_api_key
+      connection:
+      - keep-alive
+      content-length:
+      - '147'
+      content-type:
+      - application/json
+      host:
+      - api.openai.com
+      user-agent:
+      - OpenAI/Python 1.66.5
+      x-stainless-arch:
+      - arm64
+      x-stainless-async:
+      - 'false'
+      x-stainless-lang:
+      - python
+      x-stainless-os:
+      - MacOS
+      x-stainless-package-version:
+      - 1.66.5
+      x-stainless-read-timeout:
+      - '600'
+      x-stainless-retry-count:
+      - '0'
+      x-stainless-runtime:
+      - CPython
+      x-stainless-runtime-version:
+      - 3.12.6
+    method: POST
+    uri: https://api.openai.com/v1/chat/completions
+  response:
+    body:
+      string: |+
+        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"content":"South"},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"content":" Atlantic"},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"content":" Ocean"},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{"content":"."},"logprobs":null,"finish_reason":null}]}
+
+        data: {"id":"chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL","object":"chat.completion.chunk","created":1745234787,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_dbaca60df0","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}
+
+        data: [DONE]
+
+    headers:
+      CF-RAY:
+      - 933c86cb9ae5773e-LHR
+      Connection:
+      - keep-alive
+      Content-Type:
+      - text/event-stream; charset=utf-8
+      Date:
+      - Mon, 21 Apr 2025 11:26:28 GMT
+      Server:
+      - cloudflare
+      Set-Cookie: test_set_cookie
+      Transfer-Encoding:
+      - chunked
+      X-Content-Type-Options:
+      - nosniff
+      access-control-expose-headers:
+      - X-Request-ID
+      alt-svc:
+      - h3=":443"; ma=86400
+      cf-cache-status:
+      - DYNAMIC
+      openai-organization: test_openai_org_id
+      openai-processing-ms:
+      - '460'
+      openai-version:
+      - '2020-10-01'
+      strict-transport-security:
+      - max-age=31536000; includeSubDomains; preload
+      x-ratelimit-limit-requests:
+      - '200'
+      x-ratelimit-limit-tokens:
+      - '100000'
+      x-ratelimit-remaining-requests:
+      - '199'
+      x-ratelimit-remaining-tokens:
+      - '88447'
+      x-ratelimit-reset-requests:
+      - 7m12s
+      x-ratelimit-reset-tokens:
+      - 83h10m39.057s
+      x-request-id:
+      - req_a39b652ec0ccb45a968e10112e569bf4
+    status:
+      code: 200
+      message: OK
+version: 1
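As a cross-check, the delta payloads recorded above concatenate to exactly the string the new tests assert on. A small illustrative snippet (the events list copies the cassette's deltas, trimmed to the relevant fields; it is not part of the commit):

import json

# Re-assemble the assistant message from the recorded SSE deltas, the way
# the tests' list comprehensions do.
events = [
    '{"choices":[{"index":0,"delta":{"role":"assistant","content":""}}]}',
    '{"choices":[{"index":0,"delta":{"content":"South"}}]}',
    '{"choices":[{"index":0,"delta":{"content":" Atlantic"}}]}',
    '{"choices":[{"index":0,"delta":{"content":" Ocean"}}]}',
    '{"choices":[{"index":0,"delta":{"content":"."}}]}',
    '{"choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}',
]

content = "".join(
    json.loads(e)["choices"][0]["delta"].get("content") or "" for e in events
)
assert content == "South Atlantic Ocean."  # the string asserted in both tests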

instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py

+117
@@ -1103,6 +1103,62 @@ def test_chat_stream(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
 )


+@pytest.mark.vcr()
+def test_chat_stream_with_context_manager(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
+    client = openai.OpenAI()
+
+    messages = [
+        {
+            "role": "user",
+            "content": TEST_CHAT_INPUT,
+        }
+    ]
+
+    # Use a context manager for the streaming response
+    with client.chat.completions.create(model=TEST_CHAT_MODEL, messages=messages, stream=True) as chat_completion:
+        chunks = [chunk.choices[0].delta.content or "" for chunk in chat_completion if chunk.choices]
+        assert "".join(chunks) == "South Atlantic Ocean."
+
+    spans = trace_exporter.get_finished_spans()
+    assert len(spans) == 1
+
+    span = spans[0]
+    assert span.name == f"chat {TEST_CHAT_MODEL}"
+    assert span.kind == SpanKind.CLIENT
+    assert span.status.status_code == StatusCode.UNSET
+
+    address, port = address_and_port(client)
+    assert dict(span.attributes) == {
+        GEN_AI_OPERATION_NAME: "chat",
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_SYSTEM: "openai",
+        GEN_AI_RESPONSE_ID: "chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL",
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
+        SERVER_ADDRESS: address,
+        SERVER_PORT: port,
+        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
+    }
+
+    logs = logs_exporter.get_finished_logs()
+    assert len(logs) == 2
+    log_records = logrecords_from_logs(logs)
+    user_message, choice = log_records
+    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
+    assert dict(user_message.body) == {}
+
+    assert_stop_log_record(choice)
+
+    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
+    attributes = {
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+    }
+    assert_operation_duration_metric(
+        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
+    )
+
+
 @pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available")
 @pytest.mark.vcr()
 def test_chat_stream_with_raw_response(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
@@ -2096,6 +2152,67 @@ async def test_chat_async_stream(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
 )


+@pytest.mark.vcr()
+@pytest.mark.asyncio
+async def test_chat_async_stream_with_context_manager(
+    default_openai_env, trace_exporter, metrics_reader, logs_exporter
+):
+    client = openai.AsyncOpenAI()
+
+    messages = [
+        {
+            "role": "user",
+            "content": TEST_CHAT_INPUT,
+        }
+    ]
+
+    # Use a context manager for the asynchronous streaming response
+    async with await client.chat.completions.create(
+        model=TEST_CHAT_MODEL, messages=messages, stream=True
+    ) as chat_completion:
+        chunks = [chunk.choices[0].delta.content or "" async for chunk in chat_completion if chunk.choices]
+        assert "".join(chunks) == "South Atlantic Ocean."
+
+    spans = trace_exporter.get_finished_spans()
+    assert len(spans) == 1
+
+    span = spans[0]
+    assert span.name == f"chat {TEST_CHAT_MODEL}"
+    assert span.kind == SpanKind.CLIENT
+    assert span.status.status_code == StatusCode.UNSET
+
+    address, port = address_and_port(client)
+    assert dict(span.attributes) == {
+        GEN_AI_OPERATION_NAME: "chat",
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_SYSTEM: "openai",
+        GEN_AI_RESPONSE_ID: "chatcmpl-BOja7e365tj5upRjLFinadEB8ZoDL",
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+        GEN_AI_RESPONSE_FINISH_REASONS: ("stop",),
+        SERVER_ADDRESS: address,
+        SERVER_PORT: port,
+        GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default",
+    }
+
+    logs = logs_exporter.get_finished_logs()
+    assert len(logs) == 2
+    log_records = logrecords_from_logs(logs)
+    user_message, choice = log_records
+    assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"}
+    assert dict(user_message.body) == {}
+
+    assert_stop_log_record(choice)
+
+    (operation_duration_metric,) = get_sorted_metrics(metrics_reader)
+    attributes = {
+        GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL,
+        GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL,
+    }
+    assert_operation_duration_metric(
+        client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319
+    )
+
+
 @pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available")
 @pytest.mark.vcr()
 @pytest.mark.asyncio
