Skip to content

Commit d14a0bd

Browse files
authored
Add context capture and continued methods. (#29)
1 parent c544be8 commit d14a0bd

File tree

8 files changed

+169
-26
lines changed

8 files changed

+169
-26
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ futures-util = "0.3.21"
4545
prost = "0.10.4"
4646
prost-derive = "0.10.1"
4747
thiserror = "1.0.31"
48-
tokio = { version = "1.18.2", features = ["full"] }
48+
tokio = { version = "1.18.2", features = ["parking_lot"] }
4949
tonic = { version = "0.7.2", features = ["codegen"] }
5050
tracing = "0.1.35"
5151
uuid = { version = "1.1.0", features = ["serde", "v4"] }
@@ -54,6 +54,7 @@ uuid = { version = "1.1.0", features = ["serde", "v4"] }
5454
tonic-build = "0.7.2"
5555

5656
[dev-dependencies]
57+
tokio = { version = "1.18.2", features = ["rt-multi-thread"] }
5758
tokio-stream = { version = "0.1.8", features = ["net"] }
5859

5960
[[test]]

e2e/data/expected_context.yaml

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,30 @@
1616
# under the License.
1717
#
1818
segmentItems:
19-
- segmentSize: 1
19+
- segmentSize: gt 1
2020
segments:
21+
- segmentId: not null
22+
spans:
23+
- componentId: 11000
24+
endTime: gt 0
25+
isError: false
26+
operationName: async-callback
27+
parentSpanId: -1
28+
peer: ''
29+
refs:
30+
- networkAddress: ''
31+
parentEndpoint: async-job
32+
parentService: producer
33+
parentServiceInstance: node_0
34+
parentSpanId: 2
35+
parentTraceSegmentId: not null
36+
refType: CrossThread
37+
traceId: not null
38+
skipAnalysis: false
39+
spanId: 0
40+
spanLayer: Http
41+
spanType: Entry
42+
startTime: gt 0
2143
- segmentId: not null
2244
spans:
2345
- componentId: 11000
@@ -31,6 +53,17 @@ segmentItems:
3153
spanLayer: Http
3254
spanType: Exit
3355
startTime: gt 0
56+
- componentId: 11000
57+
endTime: gt 0
58+
isError: false
59+
operationName: async-job
60+
parentSpanId: 0
61+
peer: ''
62+
skipAnalysis: false
63+
spanId: 2
64+
spanLayer: Unknown
65+
spanType: Local
66+
startTime: gt 0
3467
- componentId: 11000
3568
endTime: gt 0
3669
isError: false

e2e/src/main.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,18 @@ async fn handle_ping(
5050

5151
client.request(req).await.unwrap();
5252
}
53+
{
54+
let _span3 = context.create_local_span("async-job");
55+
let snapshot = context.capture();
56+
57+
tokio::spawn(async move {
58+
let mut context2 = tracer::create_trace_context();
59+
let _span3 = context2.create_entry_span("async-callback");
60+
context2.continued(snapshot);
61+
})
62+
.await
63+
.unwrap();
64+
}
5365
Ok(Response::new(Body::from("hoge")))
5466
}
5567

examples/simple_trace_report.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ async fn handle_request(tracer: Tracer) {
2626
{
2727
// Generate an Entry Span when a request is received.
2828
// An Entry Span is generated only once per context.
29-
// You should assign a variable name to guard the span not be dropped immediately.
29+
// Assign a variable name to guard the span not to be dropped immediately.
3030
let _span = ctx.create_entry_span("op1");
3131

3232
// Something...

src/context/span.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use super::{
3737
/// {
3838
/// // Generate an Entry Span when a request is received.
3939
/// // An Entry Span is generated only once per context.
40-
/// // You should assign a variable name to guard the span not be dropped immediately.
40+
/// // Assign a variable name to guard the span not to be dropped immediately.
4141
/// let _span = ctx.create_entry_span("op1");
4242
///
4343
/// // Something...
@@ -57,7 +57,7 @@ use super::{
5757
/// // Auto report ctx when dropped.
5858
/// }
5959
/// ```
60-
#[derive(Clone)]
60+
#[must_use = "assign a variable name to guard the span not be dropped immediately."]
6161
pub struct Span {
6262
index: usize,
6363
context: WeakTracingContext,
@@ -118,6 +118,8 @@ impl Span {
118118
self.context.upgrade().expect("Context has dropped")
119119
}
120120

121+
// Notice: Perhaps in the future, `RwLock` can be used instead of `Mutex`, so `with_*` can be nested.
122+
// (Although I can't find the meaning of such use at present.)
121123
pub fn with_span_object<T>(&self, f: impl FnOnce(&SpanObject) -> T) -> T {
122124
self.upgrade_context()
123125
.with_active_span_stack(|stack| f(&stack[self.index]))

src/context/trace_context.rs

Lines changed: 59 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ use std::{
3333
mem::take,
3434
sync::{
3535
atomic::{AtomicI32, Ordering},
36-
Arc, Mutex, Weak,
36+
Arc, Mutex, RwLock, Weak,
3737
},
3838
};
3939

4040
struct Inner {
41-
trace_id: String,
41+
trace_id: RwLock<String>,
4242
trace_segment_id: String,
4343
service: String,
4444
service_instance: String,
@@ -49,8 +49,7 @@ struct Inner {
4949
primary_endpoint_name: Mutex<String>,
5050
}
5151

52-
#[derive(Clone)]
53-
#[must_use = "You should call `create_entry_span` after `TracingContext` created."]
52+
#[must_use = "call `create_entry_span` after `TracingContext` created."]
5453
pub struct TracingContext {
5554
inner: Arc<Inner>,
5655
tracer: WeakTracer,
@@ -79,7 +78,7 @@ impl TracingContext {
7978
) -> Self {
8079
TracingContext {
8180
inner: Arc::new(Inner {
82-
trace_id: RandomGenerator::generate(),
81+
trace_id: RwLock::new(RandomGenerator::generate()),
8382
trace_segment_id: RandomGenerator::generate(),
8483
service: service_name.to_string(),
8584
service_instance: instance_name.to_string(),
@@ -104,7 +103,7 @@ impl TracingContext {
104103
) -> Self {
105104
TracingContext {
106105
inner: Arc::new(Inner {
107-
trace_id: context.parent_trace_id.clone(),
106+
trace_id: RwLock::new(context.parent_trace_id.clone()),
108107
trace_segment_id: RandomGenerator::generate(),
109108
service: service_name.to_string(),
110109
service_instance: instance_name.to_string(),
@@ -119,8 +118,16 @@ impl TracingContext {
119118
}
120119

121120
#[inline]
122-
pub fn trace_id(&self) -> &str {
123-
&self.inner.trace_id
121+
pub fn trace_id(&self) -> String {
122+
self.with_trace_id(ToString::to_string)
123+
}
124+
125+
fn with_trace_id<T>(&self, f: impl FnOnce(&String) -> T) -> T {
126+
f(&*self.inner.trace_id.try_read().expect(LOCK_MSG))
127+
}
128+
129+
fn with_trace_id_mut<T>(&mut self, f: impl FnOnce(&mut String) -> T) -> T {
130+
f(&mut *self.inner.trace_id.try_write().expect(LOCK_MSG))
124131
}
125132

126133
#[inline]
@@ -182,8 +189,13 @@ impl TracingContext {
182189
self.with_active_span_stack(|stack| stack.last().map(f))
183190
}
184191

185-
// TODO Using for capture and continued.
186-
#[allow(dead_code)]
192+
pub(crate) fn with_active_span_mut<T>(
193+
&mut self,
194+
f: impl FnOnce(&mut SpanObject) -> T,
195+
) -> Option<T> {
196+
self.with_active_span_stack_mut(|stack| stack.last_mut().map(f))
197+
}
198+
187199
fn with_primary_endpoint_name<T>(&self, f: impl FnOnce(&String) -> T) -> T {
188200
f(&*self.inner.primary_endpoint_name.try_lock().expect(LOCK_MSG))
189201
}
@@ -209,7 +221,7 @@ impl TracingContext {
209221
if let Some(segment_link) = &self.inner.segment_link {
210222
span.refs.push(SegmentReference {
211223
ref_type: RefType::CrossProcess as i32,
212-
trace_id: self.inner.trace_id.clone(),
224+
trace_id: self.trace_id(),
213225
parent_trace_segment_id: segment_link.parent_trace_segment_id.clone(),
214226
parent_span_id: segment_link.parent_span_id,
215227
parent_service: segment_link.parent_service.clone(),
@@ -273,6 +285,40 @@ impl TracingContext {
273285
Span::new(index, self.downgrade())
274286
}
275287

288+
/// Capture a snapshot for cross-thread propagation.
289+
pub fn capture(&self) -> ContextSnapshot {
290+
ContextSnapshot {
291+
trace_id: self.trace_id(),
292+
trace_segment_id: self.trace_segment_id().to_owned(),
293+
span_id: self.peek_active_span_id().unwrap_or(-1),
294+
parent_endpoint: self.with_primary_endpoint_name(Clone::clone),
295+
}
296+
}
297+
298+
/// Build the reference between this segment and a cross-thread segment.
299+
pub fn continued(&mut self, snapshot: ContextSnapshot) {
300+
if snapshot.is_valid() {
301+
self.with_trace_id_mut(|trace_id| *trace_id = snapshot.trace_id.clone());
302+
303+
let tracer = self.upgrade_tracer();
304+
305+
let segment_ref = SegmentReference {
306+
ref_type: RefType::CrossThread as i32,
307+
trace_id: snapshot.trace_id,
308+
parent_trace_segment_id: snapshot.trace_segment_id,
309+
parent_span_id: snapshot.span_id,
310+
parent_service: tracer.service_name().to_owned(),
311+
parent_service_instance: tracer.instance_name().to_owned(),
312+
parent_endpoint: snapshot.parent_endpoint,
313+
network_address_used_at_peer: Default::default(),
314+
};
315+
316+
self.with_active_span_mut(|span| {
317+
span.refs.push(segment_ref);
318+
});
319+
}
320+
}
321+
276322
/// Close span. We can't use closed span after finalize called.
277323
pub(crate) fn finalize_span(&mut self, index: usize) -> Result<(), ()> {
278324
let span = self.pop_active_span(index);
@@ -290,7 +336,7 @@ impl TracingContext {
290336
///
291337
/// Notice: The spans will taked, so this method shouldn't be called twice.
292338
pub(crate) fn convert_segment_object(&mut self) -> SegmentObject {
293-
let trace_id = self.trace_id().to_owned();
339+
let trace_id = self.trace_id();
294340
let trace_segment_id = self.trace_segment_id().to_owned();
295341
let service = self.service().to_owned();
296342
let service_instance = self.service_instance().to_owned();
@@ -368,12 +414,11 @@ impl WeakTracingContext {
368414
}
369415
}
370416

417+
#[derive(Debug)]
371418
pub struct ContextSnapshot {
372419
trace_id: String,
373420
trace_segment_id: String,
374421
span_id: i32,
375-
// TODO Using for capture and continued.
376-
#[allow(dead_code)]
377422
parent_endpoint: String,
378423
}
379424

src/reporter/log.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
use super::Reporter;
1818
use crate::skywalking_proto::v3::SegmentObject;
19-
2019
use std::collections::LinkedList;
2120
use tonic::async_trait;
2221

tests/trace_context.rs

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ use skywalking::skywalking_proto::v3::{
2727
SpanType,
2828
};
2929
use std::collections::LinkedList;
30-
use std::future;
3130
use std::sync::Mutex;
3231
use std::{cell::Ref, sync::Arc};
32+
use std::{future, thread};
33+
use tokio::runtime::Handle;
3334

3435
/// Serialize from A should equal Serialize from B
3536
#[allow(dead_code)]
@@ -236,9 +237,6 @@ fn crossprocess_test() {
236237
drop(span3);
237238

238239
context2.with_spans(|spans| {
239-
let span3 = spans.last().unwrap();
240-
return;
241-
242240
let span3 = spans.last().unwrap();
243241
assert_eq!(span3.span_id, 0);
244242
assert_eq!(span3.parent_span_id, -1);
@@ -261,22 +259,75 @@ fn crossprocess_test() {
261259
}
262260
}
263261

262+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
263+
async fn cross_threads_test() {
264+
MockReporter::with_many(
265+
|reporter| {
266+
let tracer = Tracer::new("service", "instance", reporter);
267+
let mut ctx1 = tracer.create_trace_context();
268+
let _span1 = ctx1.create_entry_span("op1");
269+
let _span2 = ctx1.create_local_span("op2");
270+
let snapshot = ctx1.capture();
271+
272+
let tracer_ = tracer.clone();
273+
thread::spawn(move || {
274+
let mut ctx2 = tracer_.create_trace_context();
275+
let _span3 = ctx2.create_entry_span("op3");
276+
ctx2.continued(snapshot);
277+
})
278+
.join()
279+
.unwrap();
280+
281+
tracer
282+
},
283+
|segments| {
284+
let iter = segments.iter();
285+
let first = iter.nth(0).unwrap();
286+
let second = iter.nth(1).unwrap();
287+
288+
assert_eq!(first.trace_id, second.trace_id);
289+
assert_eq!(first.spans.refs.len(), 1);
290+
assert_eq!(
291+
first.spans.refs[0],
292+
SegmentReference {
293+
ref_type: RefType::CrossThread as i32,
294+
trace_id: second.trace_id.clone(),
295+
parent_trace_segment_id: second.trace_segment_id.clone(),
296+
parent_span_id: 1,
297+
parent_service: "service".to_owned(),
298+
parent_service_instance: "instance".to_owned(),
299+
parent_endpoint: "op2".to_owned(),
300+
..Default::default()
301+
}
302+
);
303+
assert_eq!(second.spans.len(), 2);
304+
},
305+
)
306+
.await;
307+
}
308+
264309
#[derive(Default, Clone)]
265310
struct MockReporter {
266311
segments: Arc<Mutex<LinkedList<SegmentObject>>>,
267312
}
268313

269314
impl MockReporter {
270315
async fn with(f1: impl FnOnce(MockReporter) -> Tracer, f2: impl FnOnce(&SegmentObject)) {
316+
Self::with_many(f1, |segments| f2(&segments.front().unwrap())).await;
317+
}
318+
319+
async fn with_many(
320+
f1: impl FnOnce(MockReporter) -> Tracer,
321+
f2: impl FnOnce(&LinkedList<SegmentObject>),
322+
) {
271323
let reporter = MockReporter::default();
272324

273325
let tracer = f1(reporter.clone());
274326

275327
tracer.reporting(future::ready(())).await.unwrap();
276328

277329
let segments = reporter.segments.try_lock().unwrap();
278-
let segment = segments.front().unwrap();
279-
f2(segment);
330+
f2(&*segments);
280331
}
281332
}
282333

0 commit comments

Comments
 (0)