Commit 28a4f7b

Refactor Kafka client configuration to use a new ClientConfig struct (#70)

1 parent 73c60ea · commit 28a4f7b

File tree

3 files changed: +97 −11 lines

README.md

Lines changed: 2 additions & 2 deletions

```diff
@@ -226,14 +226,14 @@ Make sure the feature `kafka-reporter` is enabled.
 #[cfg(feature = "kafka-reporter")]
 mod example {
     use skywalking::reporter::Report;
-    use skywalking::reporter::kafka::{KafkaReportBuilder, KafkaReporter, RDKafkaClientConfig};
+    use skywalking::reporter::kafka::{KafkaReportBuilder, KafkaReporter, ClientConfig};
 
     async fn do_something(reporter: &impl Report) {
         // ....
     }
 
     async fn foo() {
-        let mut client_config = RDKafkaClientConfig::new();
+        let mut client_config = ClientConfig::new();
         client_config
             .set("bootstrap.servers", "broker:9092")
             .set("message.timeout.ms", "6000");
```

e2e/src/main.rs

Lines changed: 2 additions & 2 deletions

```diff
@@ -35,7 +35,7 @@ use skywalking::{
     reporter::{
         CollectItem, Report,
         grpc::GrpcReporter,
-        kafka::{KafkaReportBuilder, KafkaReporter, RDKafkaClientConfig},
+        kafka::{ClientConfig, KafkaReportBuilder, KafkaReporter},
     },
     trace::{
         propagation::{
@@ -252,7 +252,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
     let reporter1 = GrpcReporter::connect("http://127.0.0.1:19876").await?;
     let handle1 = reporter1.reporting().await.spawn();
 
-    let mut client_config = RDKafkaClientConfig::new();
+    let mut client_config = ClientConfig::new();
     client_config
         .set("bootstrap.servers", "127.0.0.1:9092")
         .set("message.timeout.ms", "6000")
```

src/reporter/kafka.rs

Lines changed: 93 additions & 7 deletions

```diff
@@ -18,9 +18,12 @@
 
 use super::{CollectItemConsume, CollectItemProduce};
 use crate::reporter::{CollectItem, Report};
-pub use rdkafka::config::{ClientConfig as RDKafkaClientConfig, RDKafkaLogLevel};
-use rdkafka::producer::{FutureProducer, FutureRecord};
+use rdkafka::{
+    config::ClientConfig as RDKafkaClientConfig,
+    producer::{FutureProducer, FutureRecord},
+};
 use std::{
+    collections::HashMap,
     error,
     future::{Future, pending},
     pin::Pin,
@@ -48,6 +51,89 @@ pub enum Error {
     },
 }
 
+/// Log level for Kafka client.
+#[derive(Debug, Clone, Copy)]
+pub enum LogLevel {
+    /// Critical level.
+    Critical,
+    /// Error level.
+    Error,
+    /// Warning level.
+    Warning,
+    /// Notice level.
+    Notice,
+    /// Info level.
+    Info,
+    /// Debug level.
+    Debug,
+}
+
+impl From<LogLevel> for rdkafka::config::RDKafkaLogLevel {
+    fn from(level: LogLevel) -> Self {
+        match level {
+            LogLevel::Critical => rdkafka::config::RDKafkaLogLevel::Critical,
+            LogLevel::Error => rdkafka::config::RDKafkaLogLevel::Error,
+            LogLevel::Warning => rdkafka::config::RDKafkaLogLevel::Warning,
+            LogLevel::Notice => rdkafka::config::RDKafkaLogLevel::Notice,
+            LogLevel::Info => rdkafka::config::RDKafkaLogLevel::Info,
+            LogLevel::Debug => rdkafka::config::RDKafkaLogLevel::Debug,
+        }
+    }
+}
+
+/// Configuration for Kafka client.
+#[derive(Debug, Clone)]
+pub struct ClientConfig {
+    /// Configuration parameters as key-value pairs.
+    params: HashMap<String, String>,
+    /// Log level for the client.
+    log_level: Option<LogLevel>,
+}
+
+impl ClientConfig {
+    /// Create a new empty configuration.
+    pub fn new() -> Self {
+        Self {
+            params: HashMap::new(),
+            log_level: None,
+        }
+    }
+
+    /// Set a configuration parameter.
+    pub fn set<K, V>(&mut self, key: K, value: V) -> &mut Self
+    where
+        K: Into<String>,
+        V: Into<String>,
+    {
+        self.params.insert(key.into(), value.into());
+        self
+    }
+
+    /// Set log level.
+    pub fn set_log_level(&mut self, level: LogLevel) -> &mut Self {
+        self.log_level = Some(level);
+        self
+    }
+
+    /// Convert to rdkafka ClientConfig.
+    fn to_rdkafka_config(&self) -> RDKafkaClientConfig {
+        let mut config = RDKafkaClientConfig::new();
+        for (key, value) in &self.params {
+            config.set(key, value);
+        }
+        if let Some(log_level) = self.log_level {
+            config.set_log_level(log_level.into());
+        }
+        config
+    }
+}
+
+impl Default for ClientConfig {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 type DynErrHandler = dyn Fn(&str, &dyn error::Error) + Send + Sync + 'static;
 
 fn default_err_handle(message: &str, err: &dyn error::Error) {
@@ -71,14 +157,14 @@ pub struct KafkaReportBuilder<P, C> {
     state: Arc<State>,
     producer: Arc<P>,
     consumer: C,
-    client_config: RDKafkaClientConfig,
+    client_config: ClientConfig,
     namespace: Option<String>,
     err_handle: Arc<DynErrHandler>,
 }
 
 impl KafkaReportBuilder<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
-    /// Create builder, with rdkafka client configuration.
-    pub fn new(client_config: RDKafkaClientConfig) -> Self {
+    /// Create builder, with client configuration.
+    pub fn new(client_config: ClientConfig) -> Self {
         let (producer, consumer) = mpsc::unbounded_channel();
         Self::new_with_pc(client_config, producer, consumer)
     }
@@ -87,7 +173,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> KafkaReportBuilder<P, C> {
 impl<P: CollectItemProduce, C: CollectItemConsume> KafkaReportBuilder<P, C> {
     /// Special purpose, used for user-defined produce and consume operations,
     /// usually you can use [KafkaReportBuilder::new].
-    pub fn new_with_pc(client_config: RDKafkaClientConfig, producer: P, consumer: C) -> Self {
+    pub fn new_with_pc(client_config: ClientConfig, producer: P, consumer: C) -> Self {
         Self {
             state: Default::default(),
             producer: Arc::new(producer),
@@ -118,7 +204,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> KafkaReportBuilder<P, C> {
     /// handle to push data to kafka in the background.
    pub async fn build(self) -> Result<(KafkaReporter<P>, KafkaReporting<C>), Error> {
         let kafka_producer = KafkaProducer::new(
-            self.client_config.create()?,
+            self.client_config.to_rdkafka_config().create()?,
             self.err_handle.clone(),
             self.namespace,
         )
```
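
Beyond hiding rdkafka from the public API, the wrapper derives `Debug` and `Clone`, so a single template config can be cloned and specialized per reporter. A small standalone sketch of that surface, assuming only what the diff above shows:

```rust
use skywalking::reporter::kafka::{ClientConfig, LogLevel};

fn main() {
    // `Default` is equivalent to `new` per the impl above.
    let mut base = ClientConfig::default();
    base.set("bootstrap.servers", "broker:9092")
        .set_log_level(LogLevel::Warning);

    // `Clone` lets one template config fan out to several reporters.
    let for_metrics = base.clone();
    let mut for_traces = base.clone();
    for_traces.set("message.timeout.ms", "6000");

    // `Debug` makes configs loggable.
    println!("{for_traces:?} {for_metrics:?}");
}
```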
