Skip to content

Commit e6bf519

Browse files
committed
Support different waterdrop configurations per topic
1 parent c2138e8 commit e6bf519

File tree

5 files changed

+51
-11
lines changed

5 files changed

+51
-11
lines changed

lib/datadog/tracing/contrib/karafka/framework.rb

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,23 @@ module Karafka
99
# - instrument parts of the framework when needed
1010
module Framework
1111
def self.setup
12+
karafka_configurations = Datadog.configuration.tracing.fetch_integration(:karafka).configurations
13+
1214
Datadog.configure do |datadog_config|
13-
karafka_config = datadog_config.tracing[:karafka]
14-
activate_waterdrop!(datadog_config, karafka_config)
15+
karafka_configurations.each do |config_name, karafka_config|
16+
activate_waterdrop!(datadog_config, config_name, karafka_config)
17+
end
1518
end
1619
end
1720

1821
# Apply relevant configuration from Karafka to WaterDrop
19-
def self.activate_waterdrop!(datadog_config, karafka_config)
22+
def self.activate_waterdrop!(datadog_config, config_name, karafka_config)
2023
datadog_config.tracing.instrument(
2124
:waterdrop,
25+
enabled: karafka_config[:enabled],
2226
service_name: karafka_config[:service_name],
2327
distributed_tracing: karafka_config[:distributed_tracing],
28+
describes: config_name
2429
)
2530
end
2631
end

lib/datadog/tracing/contrib/waterdrop/integration.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ def new_configuration
3737
def patcher
3838
Patcher
3939
end
40+
41+
def resolver
42+
@resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new
43+
end
4044
end
4145
end
4246
end

lib/datadog/tracing/contrib/waterdrop/monitor.rb

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,6 @@ module Monitor
1818
message.produced_sync
1919
].freeze
2020

21-
def configuration
22-
Datadog.configuration.tracing[:waterdrop]
23-
end
24-
2521
def instrument(event_id, payload = {}, &block)
2622
return super unless TRACEABLE_EVENTS.include?(event_id)
2723

@@ -40,15 +36,15 @@ def instrument(event_id, payload = {}, &block)
4036

4137
span.set_tag(Contrib::Karafka::Ext::TAG_MESSAGE_COUNT, payload[:messages].size)
4238

43-
payload[:messages].each { |message| inject(trace_digest, message) } if configuration[:distributed_tracing]
39+
payload[:messages].each { |message| inject(trace_digest, message) }
4440
else
4541
action = event_id.sub('message.produced', 'produce')
4642

4743
span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, payload[:message][:topic])
4844
span.set_tag(Contrib::Karafka::Ext::TAG_PARTITION, payload[:message][:partition])
4945
span.set_tag(Contrib::Karafka::Ext::TAG_MESSAGE_COUNT, 1)
5046

51-
inject(trace_digest, payload[:message]) if configuration[:distributed_tracing]
47+
inject(trace_digest, payload[:message])
5248
end
5349

5450
span.resource = "waterdrop.#{action}"
@@ -63,9 +59,17 @@ def instrument(event_id, payload = {}, &block)
6359
private
6460

6561
def inject(trace_digest, message)
62+
return unless datadog_configuration(message[:topic])[:distributed_tracing]
63+
6664
message[:headers] ||= {}
6765
WaterDrop.inject(trace_digest, message[:headers])
6866
end
67+
68+
# cache the configuration resolution per topic to avoid repeated lookups in message batches
69+
def datadog_configuration(topic)
70+
@datadog_configuration ||= {}
71+
@datadog_configuration[topic] ||= Datadog.configuration.tracing[:waterdrop, topic]
72+
end
6973
end
7074
end
7175
end

spec/datadog/tracing/contrib/karafka/patcher_spec.rb

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,4 +199,33 @@
199199
expect(span.resource).to eq 'ABC#consume'
200200
end
201201
end
202+
203+
describe "framework auto-instrumentation" do
204+
around do |example|
205+
# Reset before and after each example; don't allow global state to linger.
206+
Datadog.registry[:waterdrop].reset_configuration!
207+
example.run
208+
Datadog.registry[:waterdrop].reset_configuration!
209+
210+
# reset Karafka internal state as well
211+
Karafka::App.config.internal.status.reset!
212+
Karafka.refresh!
213+
end
214+
215+
it "automatically enables waterdrop instrumentation" do
216+
Karafka::App.setup do |c|
217+
c.kafka = { 'bootstrap.servers': '127.0.0.1:9092' }
218+
end
219+
220+
expect(Datadog.configuration.tracing[:karafka][:enabled]).to be true
221+
expect(Datadog.configuration.tracing[:karafka][:distributed_tracing]).to be true
222+
expect(Datadog.configuration.tracing[:karafka, "special_topic"][:enabled]).to be true
223+
expect(Datadog.configuration.tracing[:karafka, "special_topic"][:distributed_tracing]).to be false
224+
225+
expect(Datadog.configuration.tracing[:waterdrop][:enabled]).to be true
226+
expect(Datadog.configuration.tracing[:waterdrop][:distributed_tracing]).to be true
227+
expect(Datadog.configuration.tracing[:waterdrop, "special_topic"][:enabled]).to be true
228+
expect(Datadog.configuration.tracing[:waterdrop, "special_topic"][:distributed_tracing]).to be false
229+
end
230+
end
202231
end

spec/datadog/tracing/contrib/waterdrop/monitor_spec.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
end
99
require 'datadog'
1010

11-
puts "waterdrop version: #{WaterDrop::VERSION}"
12-
1311
RSpec.describe 'Waterdrop monitor' do
1412
before do
1513
Datadog.configure do |c|

0 commit comments

Comments
 (0)