Skip to content

Commit 2f4e9ee

Browse files
authored
chore(redis): use the core API in redis instrumentations (#14341)
This change updates the widely-used redis utility functions to ue the core API, facilitating clearer separation of concerns between instrumentation and tracing. To this end, it also moves utility functions to the appropriate subpackage: `_trace` for those related to tracing and `contrib.internal` for those related to instrumentation. ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
1 parent 4e81dd5 commit 2f4e9ee

File tree

10 files changed

+115
-135
lines changed

10 files changed

+115
-135
lines changed

ddtrace/_trace/trace_handlers.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from ddtrace.ext import azure_servicebus as azure_servicebusx
3030
from ddtrace.ext import db
3131
from ddtrace.ext import http
32+
from ddtrace.ext import redis as redisx
3233
from ddtrace.internal import core
3334
from ddtrace.internal.compat import maybe_stringify
3435
from ddtrace.internal.constants import COMPONENT
@@ -137,7 +138,7 @@ def _start_span(ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -
137138
span = (tracer.trace if call_trace else tracer.start_span)(ctx["span_name"], **span_kwargs)
138139

139140
for tk, tv in ctx.get_item("tags", dict()).items():
140-
span.set_tag_str(tk, tv)
141+
span.set_tag(tk, tv)
141142
if ctx.get_item("measured"):
142143
# PERF: avoid setting via Span.set_tag
143144
span.set_metric(_SPAN_MEASURED_KEY, 1)
@@ -780,6 +781,18 @@ def _on_redis_command_post(ctx: core.ExecutionContext, rowcount):
780781
ctx.span.set_metric(db.ROWCOUNT, rowcount)
781782

782783

784+
def _on_redis_execute_pipeline(ctx: core.ExecutionContext, pin, config_integration, args, instance, query):
785+
span = ctx.span
786+
if args is not None:
787+
# PERF: avoid extra overhead from checks in Span.set_metric
788+
span._metrics[redisx.ARGS_LEN] = len(args)
789+
else:
790+
for attr in ("command_stack", "_command_stack"):
791+
if hasattr(instance, attr):
792+
# PERF: avoid extra overhead from checks in Span.set_metric
793+
span._metrics[redisx.PIPELINE_LEN] = len(getattr(instance, attr))
794+
795+
783796
def _on_valkey_command_post(ctx: core.ExecutionContext, rowcount):
784797
if rowcount is not None:
785798
ctx.span.set_metric(db.ROWCOUNT, rowcount)
@@ -921,6 +934,7 @@ def listen():
921934
core.on("botocore.kinesis.GetRecords.post", _on_botocore_kinesis_getrecords_post)
922935
core.on("redis.async_command.post", _on_redis_command_post)
923936
core.on("redis.command.post", _on_redis_command_post)
937+
core.on("redis.execute_pipeline", _on_redis_execute_pipeline)
924938
core.on("valkey.async_command.post", _on_valkey_command_post)
925939
core.on("valkey.command.post", _on_valkey_command_post)
926940
core.on("azure.functions.request_call_modifier", _on_azure_functions_request_span_modifier)
@@ -984,6 +998,7 @@ def listen():
984998
"botocore.patched_stepfunctions_api_call",
985999
"botocore.patched_bedrock_api_call",
9861000
"redis.command",
1001+
"redis.execute_pipeline",
9871002
"valkey.command",
9881003
"rq.queue.enqueue_job",
9891004
"rq.traced_queue_fetch_job",
@@ -1007,6 +1022,8 @@ def listen():
10071022
"django.middleware.process_template_response",
10081023
"django.middleware.process_view",
10091024
"django.template.render",
1025+
"redis.execute_pipeline",
1026+
"redis.command",
10101027
"psycopg.patched_connect",
10111028
):
10121029
core.on(f"context.ended.{name}", _finish_span)

ddtrace/_trace/utils_redis.py

Lines changed: 0 additions & 94 deletions
This file was deleted.

ddtrace/contrib/internal/aioredis/patch.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77
from wrapt import wrap_function_wrapper as _w
88

99
from ddtrace import config
10-
from ddtrace._trace.utils_redis import _instrument_redis_cmd
11-
from ddtrace._trace.utils_redis import _instrument_redis_execute_pipeline
1210
from ddtrace.constants import _SPAN_MEASURED_KEY
1311
from ddtrace.constants import SPAN_KIND
1412
from ddtrace.contrib import trace_utils
1513
from ddtrace.contrib.internal.redis_utils import ROW_RETURNING_COMMANDS
14+
from ddtrace.contrib.internal.redis_utils import _instrument_redis_cmd
15+
from ddtrace.contrib.internal.redis_utils import _instrument_redis_execute_pipeline
1616
from ddtrace.contrib.internal.redis_utils import _run_redis_command_async
1717
from ddtrace.contrib.internal.redis_utils import determine_row_count
1818
from ddtrace.ext import SpanKind
@@ -50,8 +50,7 @@
5050
V2 = parse_version("2.0")
5151

5252

53-
def get_version():
54-
# type: () -> str
53+
def get_version() -> str:
5554
return aioredis_version_str
5655

5756

ddtrace/contrib/internal/aredis/patch.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
import wrapt
66

77
from ddtrace import config
8-
from ddtrace._trace.utils_redis import _instrument_redis_cmd
9-
from ddtrace._trace.utils_redis import _instrument_redis_execute_pipeline
8+
from ddtrace.contrib.internal.redis_utils import _instrument_redis_cmd
9+
from ddtrace.contrib.internal.redis_utils import _instrument_redis_execute_pipeline
1010
from ddtrace.contrib.internal.redis_utils import _run_redis_command_async
1111
from ddtrace.internal.schema import schematize_service_name
1212
from ddtrace.internal.utils.formats import CMD_MAX_LEN
@@ -26,8 +26,7 @@
2626
)
2727

2828

29-
def get_version():
30-
# type: () -> str
29+
def get_version() -> str:
3130
return getattr(aredis, "__version__", "")
3231

3332

ddtrace/contrib/internal/flask_cache/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# project
2-
from ddtrace._trace.utils_redis import _extract_conn_tags as extract_redis_tags
32
from ddtrace.contrib.internal.pylibmc.addrs import parse_addresses
3+
from ddtrace.contrib.internal.redis_utils import _extract_conn_tags as extract_redis_tags
44
from ddtrace.ext import net
55

66

ddtrace/contrib/internal/redis/asyncio_patch.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from ddtrace import config
2-
from ddtrace._trace.utils_redis import _instrument_redis_cmd
3-
from ddtrace._trace.utils_redis import _instrument_redis_execute_async_cluster_pipeline
4-
from ddtrace._trace.utils_redis import _instrument_redis_execute_pipeline
2+
from ddtrace.contrib.internal.redis_utils import _instrument_redis_cmd
3+
from ddtrace.contrib.internal.redis_utils import _instrument_redis_execute_pipeline
54
from ddtrace.contrib.internal.redis_utils import _run_redis_command_async
65
from ddtrace.internal.utils.formats import stringify_cache_args
76
from ddtrace.trace import Pin
@@ -32,5 +31,5 @@ async def instrumented_async_execute_cluster_pipeline(func, instance, args, kwar
3231
return await func(*args, **kwargs)
3332

3433
cmds = [stringify_cache_args(c.args, cmd_max_len=config.redis.cmd_max_length) for c in instance._command_stack]
35-
with _instrument_redis_execute_async_cluster_pipeline(pin, config.redis, cmds, instance):
34+
with _instrument_redis_execute_pipeline(pin, config.redis, cmds, instance):
3635
return await func(*args, **kwargs)

ddtrace/contrib/internal/redis/patch.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import wrapt
66

77
from ddtrace import config
8-
from ddtrace._trace.utils_redis import _instrument_redis_cmd
9-
from ddtrace._trace.utils_redis import _instrument_redis_execute_pipeline
108
from ddtrace.contrib.internal.redis_utils import ROW_RETURNING_COMMANDS
9+
from ddtrace.contrib.internal.redis_utils import _instrument_redis_cmd
10+
from ddtrace.contrib.internal.redis_utils import _instrument_redis_execute_pipeline
1111
from ddtrace.contrib.internal.redis_utils import determine_row_count
1212
from ddtrace.contrib.internal.trace_utils import unwrap
1313
from ddtrace.internal import core
@@ -28,8 +28,7 @@
2828
)
2929

3030

31-
def get_version():
32-
# type: () -> str
31+
def get_version() -> str:
3332
return getattr(redis, "__version__", "")
3433

3534

@@ -183,7 +182,7 @@ def _instrumented_execute_pipeline(func, instance, args, kwargs):
183182
stringify_cache_args(c, cmd_max_len=integration_config.cmd_max_length)
184183
for c, _ in instance.command_stack
185184
]
186-
with _instrument_redis_execute_pipeline(pin, integration_config, cmds, instance, is_cluster):
185+
with _instrument_redis_execute_pipeline(pin, integration_config, cmds, instance):
187186
return func(*args, **kwargs)
188187

189188
return _instrumented_execute_pipeline

ddtrace/contrib/internal/redis_utils.py

Lines changed: 80 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
1+
from contextlib import contextmanager
12
from typing import Dict
23
from typing import List
34
from typing import Optional
45
from typing import Union
56

7+
from ddtrace.constants import SPAN_KIND
8+
from ddtrace.contrib import trace_utils
9+
from ddtrace.ext import SpanKind
10+
from ddtrace.ext import SpanTypes
11+
from ddtrace.ext import db
612
from ddtrace.ext import net
713
from ddtrace.ext import redis as redisx
814
from ddtrace.internal import core
15+
from ddtrace.internal.constants import COMPONENT
16+
from ddtrace.internal.schema import schematize_cache_operation
917
from ddtrace.internal.utils.formats import stringify_cache_args
1018

1119

@@ -30,23 +38,6 @@
3038
ROW_RETURNING_COMMANDS = SINGLE_KEY_COMMANDS + MULTI_KEY_COMMANDS
3139

3240

33-
def _extract_conn_tags(conn_kwargs):
34-
"""Transform redis conn info into dogtrace metas"""
35-
try:
36-
conn_tags = {
37-
net.TARGET_HOST: conn_kwargs["host"],
38-
net.TARGET_PORT: conn_kwargs["port"],
39-
net.SERVER_ADDRESS: conn_kwargs["host"],
40-
redisx.DB: conn_kwargs.get("db") or 0,
41-
}
42-
client_name = conn_kwargs.get("client_name")
43-
if client_name:
44-
conn_tags[redisx.CLIENT_NAME] = client_name
45-
return conn_tags
46-
except Exception:
47-
return {}
48-
49-
5041
def determine_row_count(redis_command: str, result: Optional[Union[List, Dict, str]]) -> int:
5142
empty_results = [b"", [], {}, None]
5243
# result can be an empty list / dict / string
@@ -82,3 +73,75 @@ async def _run_redis_command_async(ctx: core.ExecutionContext, func, args, kwarg
8273
if redis_command not in ROW_RETURNING_COMMANDS:
8374
rowcount = None
8475
core.dispatch("redis.async_command.post", [ctx, rowcount])
76+
77+
78+
def _extract_conn_tags(conn_kwargs) -> Dict[str, str]:
79+
try:
80+
conn_tags = {
81+
net.TARGET_HOST: conn_kwargs["host"],
82+
net.TARGET_PORT: conn_kwargs["port"],
83+
net.SERVER_ADDRESS: conn_kwargs["host"],
84+
redisx.DB: conn_kwargs.get("db") or 0,
85+
}
86+
client_name = conn_kwargs.get("client_name")
87+
if client_name:
88+
conn_tags[redisx.CLIENT_NAME] = client_name
89+
return conn_tags
90+
except Exception:
91+
return {}
92+
93+
94+
def _build_tags(query, pin, instance, integration_name):
95+
ret = dict()
96+
ret[SPAN_KIND] = SpanKind.CLIENT
97+
ret[COMPONENT] = integration_name
98+
ret[db.SYSTEM] = redisx.APP
99+
if query is not None:
100+
span_name = schematize_cache_operation(redisx.RAWCMD, cache_provider=redisx.APP) # type: ignore[operator]
101+
ret[span_name] = query
102+
if pin.tags:
103+
# PERF: avoid Span.set_tag to avoid unnecessary checks
104+
for key, value in pin.tags.items():
105+
ret[key] = value
106+
# some redis clients do not have a connection_pool attribute (ex. aioredis v1.3)
107+
if hasattr(instance, "connection_pool"):
108+
for key, value in _extract_conn_tags(instance.connection_pool.connection_kwargs).items():
109+
ret[key] = value
110+
return ret
111+
112+
113+
@contextmanager
114+
def _instrument_redis_execute_pipeline(pin, config_integration, cmds, instance):
115+
cmd_string = resource = "\n".join(cmds)
116+
if config_integration.resource_only_command:
117+
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])
118+
119+
with core.context_with_data(
120+
"redis.execute_pipeline",
121+
span_name=schematize_cache_operation(redisx.CMD, cache_provider=redisx.APP),
122+
resource=resource,
123+
service=trace_utils.ext_service(pin, config_integration),
124+
span_type=SpanTypes.REDIS,
125+
pin=pin,
126+
measured=True,
127+
tags=_build_tags(cmd_string, pin, instance, config_integration.integration_name),
128+
) as ctx:
129+
core.dispatch("redis.execute_pipeline", [ctx, pin, config_integration, None, instance, cmd_string])
130+
yield ctx.span
131+
132+
133+
@contextmanager
134+
def _instrument_redis_cmd(pin, config_integration, instance, args):
135+
query = stringify_cache_args(args, cmd_max_len=config_integration.cmd_max_length)
136+
with core.context_with_data(
137+
"redis.command",
138+
span_name=schematize_cache_operation(redisx.CMD, cache_provider=redisx.APP),
139+
pin=pin,
140+
service=trace_utils.ext_service(pin, config_integration),
141+
span_type=SpanTypes.REDIS,
142+
resource=query.split(" ")[0] if config_integration.resource_only_command else query,
143+
measured=True,
144+
tags=_build_tags(query, pin, instance, config_integration.integration_name),
145+
) as ctx:
146+
core.dispatch("redis.execute_pipeline", [ctx, pin, config_integration, args, instance, query])
147+
yield ctx

ddtrace/contrib/internal/yaaredis/patch.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
import yaaredis
66

77
from ddtrace import config
8-
from ddtrace._trace.utils_redis import _instrument_redis_cmd
9-
from ddtrace._trace.utils_redis import _instrument_redis_execute_pipeline
8+
from ddtrace.contrib.internal.redis_utils import _instrument_redis_cmd
9+
from ddtrace.contrib.internal.redis_utils import _instrument_redis_execute_pipeline
1010
from ddtrace.contrib.internal.redis_utils import _run_redis_command_async
1111
from ddtrace.internal.schema import schematize_service_name
1212
from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning
@@ -28,8 +28,7 @@
2828
)
2929

3030

31-
def get_version():
32-
# type: () -> str
31+
def get_version() -> str:
3332
return getattr(yaaredis, "__version__", "")
3433

3534

0 commit comments

Comments
 (0)