Skip to content

Commit c544be8

Browse files
authored
Auto finalize context and span when dropped. (#28)
1 parent 0c43b32 commit c544be8

File tree

18 files changed

+922
-620
lines changed

18 files changed

+922
-620
lines changed

Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,14 @@ license = "Apache-2.0"
3232
homepage = "https://skywalking.apache.org/"
3333
repository = "https://github.com/apache/skywalking-rust"
3434

35+
[features]
36+
mock = [] # For internal integration testing only, do not use.
37+
3538
[dependencies]
3639
async-stream = "0.3.3"
3740
base64 = "0.13.0"
3841
bytes = "1.1.0"
42+
cfg-if = "1.0.0"
3943
futures-core = "0.3.21"
4044
futures-util = "0.3.21"
4145
prost = "0.10.4"
@@ -52,6 +56,10 @@ tonic-build = "0.7.2"
5256
[dev-dependencies]
5357
tokio-stream = { version = "0.1.8", features = ["net"] }
5458

59+
[[test]]
60+
name = "trace_context"
61+
required-features = ["mock"]
62+
5563
[[example]]
5664
name = "simple_trace_report"
5765
path = "examples/simple_trace_report.rs"

README.md

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,38 +37,38 @@ context after the span finished.
3737
use skywalking::context::tracer::Tracer;
3838
use skywalking::reporter::grpc::GrpcReporter;
3939
use std::error::Error;
40-
use std::sync::Arc;
4140
use tokio::signal;
4241
43-
async fn handle_request(tracer: Arc<Tracer<GrpcReporter>>) {
42+
async fn handle_request(tracer: Tracer) {
4443
let mut ctx = tracer.create_trace_context();
4544
4645
{
47-
// Generate an Entry Span when a request
48-
// is received. An Entry Span is generated only once per context.
49-
let span = ctx.create_entry_span("op1").unwrap();
46+
// Generate an Entry Span when a request is received.
47+
// An Entry Span is generated only once per context.
48+
// Assign a variable name to guard the span not to be dropped immediately.
49+
let _span = ctx.create_entry_span("op1");
5050
5151
// Something...
5252
5353
{
5454
// Generates an Exit Span when executing an RPC.
55-
let span2 = ctx.create_exit_span("op2", "remote_peer").unwrap();
55+
let _span2 = ctx.create_exit_span("op2", "remote_peer");
5656
5757
// Something...
5858
59-
ctx.finalize_span(span2);
59+
// Auto close span2 when dropped.
6060
}
6161
62-
ctx.finalize_span(span);
62+
// Auto close span when dropped.
6363
}
6464
65-
tracer.finalize_context(ctx);
65+
// Auto report ctx when dropped.
6666
}
6767
6868
#[tokio::main]
6969
async fn main() -> Result<(), Box<dyn Error>> {
7070
let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
71-
let tracer = Arc::new(Tracer::new("service", "instance", reporter));
71+
let tracer = Tracer::new("service", "instance", reporter);
7272
7373
tokio::spawn(handle_request(tracer.clone()));
7474

e2e/src/main.rs

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,38 +21,25 @@ use hyper::{Body, Client, Method, Request, Response, Server, StatusCode};
2121
use skywalking::context::propagation::context::SKYWALKING_HTTP_CONTEXT_HEADER_KEY;
2222
use skywalking::context::propagation::decoder::decode_propagation;
2323
use skywalking::context::propagation::encoder::encode_propagation;
24-
use skywalking::context::tracer::Tracer;
24+
use skywalking::context::tracer::{self, Tracer};
2525
use skywalking::reporter::grpc::GrpcReporter;
2626
use std::convert::Infallible;
2727
use std::error::Error;
2828
use std::future::pending;
2929
use std::net::SocketAddr;
3030
use structopt::StructOpt;
31-
use tokio::sync::OnceCell;
3231

3332
static NOT_FOUND_MSG: &str = "not found";
3433
static SUCCESS_MSG: &str = "Success";
3534

36-
static GLOBAL_TRACER: OnceCell<Tracer<GrpcReporter>> = OnceCell::const_new();
37-
38-
fn set_global_tracer(tracer: Tracer<GrpcReporter>) {
39-
if GLOBAL_TRACER.set(tracer).is_err() {
40-
panic!("TRACER has setted")
41-
}
42-
}
43-
44-
fn get_global_tracer() -> &'static Tracer<GrpcReporter> {
45-
GLOBAL_TRACER.get().expect("TRACER haven't setted")
46-
}
47-
4835
async fn handle_ping(
4936
_req: Request<Body>,
5037
client: Client<HttpConnector>,
5138
) -> Result<Response<Body>, Infallible> {
52-
let mut context = get_global_tracer().create_trace_context();
53-
let span = context.create_entry_span("/ping").unwrap();
39+
let mut context = tracer::create_trace_context();
40+
let _span = context.create_entry_span("/ping");
5441
{
55-
let span2 = context.create_exit_span("/pong", "consumer:8082").unwrap();
42+
let _span2 = context.create_exit_span("/pong", "consumer:8082");
5643
let header = encode_propagation(&context, "/pong", "consumer:8082");
5744
let req = Request::builder()
5845
.method(Method::GET)
@@ -62,10 +49,7 @@ async fn handle_ping(
6249
.unwrap();
6350

6451
client.request(req).await.unwrap();
65-
context.finalize_span(span2);
6652
}
67-
context.finalize_span(span);
68-
get_global_tracer().finalize_context(context);
6953
Ok(Response::new(Body::from("hoge")))
7054
}
7155

@@ -112,10 +96,8 @@ async fn handle_pong(_req: Request<Body>) -> Result<Response<Body>, Infallible>
11296
.unwrap(),
11397
)
11498
.unwrap();
115-
let mut context = get_global_tracer().create_trace_context_from_propagation(ctx);
116-
let span = context.create_entry_span("/pong").unwrap();
117-
context.finalize_span(span);
118-
get_global_tracer().finalize_context(context);
99+
let mut context = tracer::create_trace_context_from_propagation(ctx);
100+
let _span = context.create_entry_span("/pong");
119101
Ok(Response::new(Body::from("hoge")))
120102
}
121103

@@ -158,13 +140,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
158140
let reporter = GrpcReporter::connect("http://collector:19876").await?;
159141

160142
let handle = if opt.mode == "consumer" {
161-
set_global_tracer(Tracer::new("consumer", "node_0", reporter));
162-
let handle = get_global_tracer().reporting(pending());
143+
tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter));
144+
let handle = tracer::reporting(pending());
163145
run_consumer_service([0, 0, 0, 0]).await;
164146
handle
165147
} else if opt.mode == "producer" {
166-
set_global_tracer(Tracer::new("producer", "node_0", reporter));
167-
let handle = get_global_tracer().reporting(pending());
148+
tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter));
149+
let handle = tracer::reporting(pending());
168150
run_producer_service([0, 0, 0, 0]).await;
169151
handle
170152
} else {

examples/simple_trace_report.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,38 +18,38 @@
1818
use skywalking::context::tracer::Tracer;
1919
use skywalking::reporter::grpc::GrpcReporter;
2020
use std::error::Error;
21-
use std::sync::Arc;
2221
use tokio::signal;
2322

24-
async fn handle_request(tracer: Arc<Tracer<GrpcReporter>>) {
23+
async fn handle_request(tracer: Tracer) {
2524
let mut ctx = tracer.create_trace_context();
2625

2726
{
28-
// Generate an Entry Span when a request
29-
// is received. An Entry Span is generated only once per context.
30-
let span = ctx.create_entry_span("op1").unwrap();
27+
// Generate an Entry Span when a request is received.
28+
// An Entry Span is generated only once per context.
29+
// You should assign a variable name to guard the span not be dropped immediately.
30+
let _span = ctx.create_entry_span("op1");
3131

3232
// Something...
3333

3434
{
3535
// Generates an Exit Span when executing an RPC.
36-
let span2 = ctx.create_exit_span("op2", "remote_peer").unwrap();
36+
let _span2 = ctx.create_exit_span("op2", "remote_peer");
3737

3838
// Something...
3939

40-
ctx.finalize_span(span2);
40+
// Auto close span2 when dropped.
4141
}
4242

43-
ctx.finalize_span(span);
43+
// Auto close span when dropped.
4444
}
4545

46-
tracer.finalize_context(ctx);
46+
// Auto report ctx when dropped.
4747
}
4848

4949
#[tokio::main]
5050
async fn main() -> Result<(), Box<dyn Error>> {
5151
let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
52-
let tracer = Arc::new(Tracer::new("service", "instance", reporter));
52+
let tracer = Tracer::new("service", "instance", reporter);
5353

5454
tokio::spawn(handle_request(tracer.clone()));
5555

src/common/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,3 @@
1515
//
1616

1717
pub mod random_generator;
18-
pub mod time;

src/context/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
//
1616

1717
pub mod propagation;
18-
pub mod system_time;
18+
pub mod span;
19+
pub(crate) mod system_time;
1920
pub mod trace_context;
2021
pub mod tracer;

src/context/propagation/encoder.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@ use base64::encode;
2020
/// Encode TracingContext to carry current trace info to the destination of RPC call.
2121
/// In general, the output of this function will be packed in `sw8` header in HTTP call.
2222
pub fn encode_propagation(context: &TracingContext, endpoint: &str, address: &str) -> String {
23-
let mut res = String::new();
24-
25-
res += "1-";
26-
res += format!("{}-", encode(context.trace_id.to_string())).as_str();
27-
res += format!("{}-", encode(context.trace_segment_id.to_string())).as_str();
28-
res += format!("{}-", context.peek_active_span_id().unwrap_or(0)).as_str();
29-
res += format!("{}-", encode(context.service.as_str())).as_str();
30-
res += format!("{}-", encode(context.service_instance.as_str())).as_str();
31-
res += format!("{}-", encode(endpoint)).as_str();
32-
res += &encode(address);
33-
res
23+
format!(
24+
"1-{}-{}-{}-{}-{}-{}-{}",
25+
encode(context.trace_id()),
26+
encode(context.trace_segment_id()),
27+
context.peek_active_span_id().unwrap_or(0),
28+
encode(context.service()),
29+
encode(context.service_instance()),
30+
encode(endpoint),
31+
encode(address)
32+
)
3433
}

0 commit comments

Comments
 (0)