Skip to content

Commit 244a389

Browse files
Merge pull request #119 from code0-tech/118-shutdown-on-ctrl_c
Shutdown on Ctrl_c
2 parents 159880f + 981220b commit 244a389

File tree

7 files changed

+145
-62
lines changed

7 files changed

+145
-62
lines changed

Cargo.lock

Lines changed: 43 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@ version = "0.0.0"
77
edition = "2024"
88

99
[workspace.dependencies]
10-
code0-flow = { version = "0.0.18" }
11-
tucana = { version = "0.0.39", features = ["aquila"] }
10+
code0-flow = { version = "0.0.19" }
11+
tucana = { version = "0.0.42", features = ["aquila"] }
1212
serde_json = { version = "1.0.138" }
1313
log = "0.4.27"
1414
env_logger = "0.11.8"
1515
regex = "1.11.1"
1616
tokio = { version = "1.44.1", features = ["rt-multi-thread"] }
1717
uuid = { version = "1.16.0", features = ["v4"] }
1818
tonic = "0.14.0"
19-
async-nats = "0.44.2"
19+
async-nats = "0.45.0"
2020
async-trait = "0.1.88"
2121
anyhow = "1.0.98"
2222
prost = "0.14.0"

adapter/rest/src/main.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,14 @@ use tucana::shared::{Struct, ValidationFlow, Value};
1616
#[tokio::main]
1717
async fn main() {
1818
let server = HttpServer { http_server: None };
19-
let runner = ServerRunner::new(server).await.unwrap();
20-
runner.serve().await.unwrap();
19+
let runner = match ServerRunner::new(server).await {
20+
Ok(runner) => runner,
21+
Err(err) => panic!("Failed to create server runner: {:?}", err),
22+
};
23+
match runner.serve().await {
24+
Ok(_) => (),
25+
Err(err) => panic!("Failed to start server runner: {:?}", err),
26+
};
2127
}
2228

2329
struct HttpServer {

crates/base/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ uuid = { workspace = true }
1616
prost = { workspace = true }
1717
futures-lite = { workspace = true }
1818
log = { workspace = true }
19+
env_logger = {workspace = true}

crates/base/src/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ pub struct AdapterConfig {
5151
///
5252
/// If true the Adapter will expose a grpc health service server.
5353
pub with_health_service: bool,
54+
55+
/// Variant
56+
///
57+
/// The Variant of Draco. E.g. Http, Cron...
58+
pub draco_variant: String,
5459
}
5560

5661
impl AdapterConfig {
@@ -79,6 +84,8 @@ impl AdapterConfig {
7984
let with_health_service =
8085
code0_flow::flow_config::env_with_default("WITH_HEALTH_SERVICE", false);
8186

87+
let draco_variant =
88+
code0_flow::flow_config::env_with_default("DRACO_VARIANT", String::from("None"));
8289
Self {
8390
environment,
8491
nats_bucket,
@@ -89,6 +96,7 @@ impl AdapterConfig {
8996
aquila_url,
9097
definition_path,
9198
with_health_service,
99+
draco_variant,
92100
}
93101
}
94102

crates/base/src/runner.rs

Lines changed: 73 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use crate::{
33
store::AdapterStore,
44
traits::{LoadConfig, Server as AdapterServer},
55
};
6-
use code0_flow::flow_definition::FlowUpdateService;
6+
use code0_flow::flow_service::FlowUpdateService;
77
use std::sync::Arc;
8-
use tokio::sync::broadcast;
8+
use tokio::signal;
99
use tonic::transport::Server;
1010
use tonic_health::pb::health_server::HealthServer;
1111

@@ -20,11 +20,14 @@ pub struct ServerContext<C: LoadConfig> {
2020
pub struct ServerRunner<C: LoadConfig> {
2121
context: ServerContext<C>,
2222
server: Box<dyn AdapterServer<C>>,
23-
shutdown_sender: broadcast::Sender<()>,
2423
}
2524

2625
impl<C: LoadConfig> ServerRunner<C> {
2726
pub async fn new<S: AdapterServer<C>>(server: S) -> anyhow::Result<Self> {
27+
env_logger::Builder::from_default_env()
28+
.filter_level(log::LevelFilter::Debug)
29+
.init();
30+
2831
code0_flow::flow_config::load_env_file();
2932

3033
let adapter_config = AdapterConfig::from_env();
@@ -41,17 +44,15 @@ impl<C: LoadConfig> ServerRunner<C> {
4144
server_config: Arc::new(server_config),
4245
};
4346

44-
let (shutdown_tx, _) = broadcast::channel(1);
45-
4647
Ok(Self {
4748
context,
4849
server: Box::new(server),
49-
shutdown_sender: shutdown_tx,
5050
})
5151
}
5252

53-
pub async fn serve(mut self) -> anyhow::Result<()> {
53+
pub async fn serve(self) -> anyhow::Result<()> {
5454
let config = self.context.adapter_config.clone();
55+
log::info!("Starting Draco Variant: {}", config.draco_variant);
5556

5657
if !config.is_static() {
5758
let definition_service = FlowUpdateService::from_url(
@@ -61,42 +62,84 @@ impl<C: LoadConfig> ServerRunner<C> {
6162
definition_service.send().await;
6263
}
6364

64-
if config.with_health_service {
65+
let health_task = if config.with_health_service {
6566
let health_service =
6667
code0_flow::flow_health::HealthService::new(config.nats_url.clone());
6768
let address = format!("{}:{}", config.grpc_host, config.grpc_port).parse()?;
6869

69-
tokio::spawn(async move {
70-
let _ = Server::builder()
71-
.add_service(HealthServer::new(health_service))
72-
.serve(address)
73-
.await;
74-
});
75-
7670
log::info!(
77-
"Health server started at {}:{}",
71+
"Health server starting at {}:{}",
7872
config.grpc_host,
7973
config.grpc_port
8074
);
81-
}
82-
83-
self.server.init(&self.context).await?;
8475

85-
let mut rx = self.shutdown_sender.subscribe();
86-
let context = self.context;
87-
let mut server = self.server;
76+
Some(tokio::spawn(async move {
77+
if let Err(err) = Server::builder()
78+
.add_service(HealthServer::new(health_service))
79+
.serve(address)
80+
.await
81+
{
82+
log::error!("Health server error: {:?}", err);
83+
} else {
84+
log::info!("Health server stopped gracefully");
85+
}
86+
}))
87+
} else {
88+
None
89+
};
8890

89-
let handle = tokio::spawn(async move {
90-
tokio::select! {
91-
result = server.run(&context) => result,
92-
_ = rx.recv() => server.shutdown(&context).await,
91+
let ServerRunner {
92+
mut server,
93+
context,
94+
} = self;
95+
96+
// Init the adapter server (e.g. create underlying HTTP server)
97+
server.init(&context).await?;
98+
log::info!("Draco successfully initialized.");
99+
100+
match health_task {
101+
Some(mut ht) => {
102+
tokio::select! {
103+
// Main adapter server loop finished on its own
104+
res = server.run(&context) => {
105+
log::warn!("Adapter server finished, shutting down");
106+
ht.abort();
107+
res?;
108+
}
109+
110+
// Health server ended first
111+
_ = &mut ht => {
112+
log::warn!("Health server task finished, shutting down adapter");
113+
server.shutdown(&context).await?;
114+
}
115+
116+
// Ctrl+C / SIGINT
117+
_ = signal::ctrl_c() => {
118+
log::info!("Ctrl+C/Exit signal received, shutting down adapter");
119+
server.shutdown(&context).await?;
120+
ht.abort();
121+
}
122+
}
93123
}
94-
});
95124

96-
tokio::signal::ctrl_c().await?;
97-
let _ = self.shutdown_sender.send(());
98-
handle.await??;
125+
None => {
126+
tokio::select! {
127+
// Adapter server loop ends on its own
128+
res = server.run(&context) => {
129+
log::warn!("Adapter server finished");
130+
res?;
131+
}
132+
133+
// Ctrl+C / SIGINT
134+
_ = signal::ctrl_c() => {
135+
log::info!("Ctrl+C/Exit signal received, shutting down adapter");
136+
server.shutdown(&context).await?;
137+
}
138+
}
139+
}
140+
}
99141

142+
log::info!("Draco shutdown complete");
100143
Ok(())
101144
}
102145
}

0 commit comments

Comments
 (0)