diff --git a/src/app/config.rs b/src/app/config.rs index edd80b1..3e25426 100644 --- a/src/app/config.rs +++ b/src/app/config.rs @@ -46,6 +46,32 @@ pub struct MetricsConfig { pub hostname_tag: Option, } +#[serde_as] +#[derive(PartialEq, Debug, Serialize, Deserialize)] +pub struct KafkaConfig { + /// Kafka security protocol to use. The value must be one of "plaintext, "ssl", "sasl_plaintext", "sasl_ssl". + /// If not specified, defaults to "plaintext". + pub kafka_security_protocol: Option, + + /// TLS CA certificate location for Kafka. + pub kafka_ssl_ca_location: Option, + + /// TLS certificate location for Kafka. + pub kafka_ssl_cert_location: Option, + + /// TLS private key location for Kafka. + pub kafka_ssl_key_location: Option, + + /// SASL mechanism to use for Kafka. The value must be one of "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512". + pub kafka_sasl_mechanism: Option, + + /// SASL username for Kafka. + pub kafka_sasl_username: Option, + + /// SASL password for Kafka. + pub kafka_sasl_password: Option, +} + #[serde_as] #[derive(PartialEq, Debug, Serialize, Deserialize)] pub struct Config { @@ -86,6 +112,10 @@ pub struct Config { /// The topic to produce uptime checks into. pub results_kafka_topic: String, + /// Kafka extended configuration + #[serde(flatten)] + pub kafka_config: KafkaConfig, + /// Which config provider to use to load configs into memory pub config_provider_mode: ConfigProviderMode, @@ -172,6 +202,15 @@ impl Default for Config { }, results_kafka_cluster: vec!["127.0.0.1:9092".to_owned()], results_kafka_topic: "uptime-results".to_owned(), + kafka_config: KafkaConfig { + kafka_security_protocol: None, + kafka_ssl_ca_location: None, + kafka_ssl_cert_location: None, + kafka_ssl_key_location: None, + kafka_sasl_mechanism: None, + kafka_sasl_username: None, + kafka_sasl_password: None, + }, config_provider_mode: ConfigProviderMode::Redis, checker_mode: CheckerMode::Reqwest, vector_batch_size: 10, @@ -261,7 +300,9 @@ mod tests { use crate::{app::cli, logging}; - use super::{CheckerMode, Config, ConfigProviderMode, MetricsConfig, ProducerMode}; + use super::{ + CheckerMode, Config, ConfigProviderMode, KafkaConfig, MetricsConfig, ProducerMode, + }; fn test_with_config(yaml: &str, env_vars: &[(&str, &str)], test_fn: F) where @@ -326,6 +367,15 @@ mod tests { "10.0.0.2:9000".to_owned() ], results_kafka_topic: "uptime-results".to_owned(), + kafka_config: KafkaConfig { + kafka_security_protocol: None, + kafka_ssl_ca_location: None, + kafka_ssl_cert_location: None, + kafka_ssl_key_location: None, + kafka_sasl_mechanism: None, + kafka_sasl_username: None, + kafka_sasl_password: None, + }, config_provider_mode: ConfigProviderMode::Redis, checker_mode: CheckerMode::Reqwest, config_provider_redis_update_ms: 1000, @@ -371,6 +421,16 @@ mod tests { "UPTIME_CHECKER_CONFIGS_KAFKA_CLUSTER", "10.0.0.1,10.0.0.2:7000", ), + ("UPTIME_CHECKER_KAFKA_SECURITY_PROTOCOL", "plaintext"), + ("UPTIME_CHECKER_KAFKA_SSL_CA_LOCATION", "/path/to/ca.crt"), + ( + "UPTIME_CHECKER_KAFKA_SSL_CERT_LOCATION", + "/path/to/cert.crt", + ), + ("UPTIME_CHECKER_KAFKA_SSL_KEY_LOCATION", "/path/to/key.key"), + ("UPTIME_CHECKER_KAFKA_SASL_MECHANISM", "scram-sha-256"), + ("UPTIME_CHECKER_KAFKA_SASL_USERNAME", "my_user"), + ("UPTIME_CHECKER_KAFKA_SASL_PASSWORD", "my_password"), ("UPTIME_CHECKER_CONFIG_PROVIDER_MODE", "redis"), ("UPTIME_CHECKER_CONFIG_PROVIDER_REDIS_UPDATE_MS", "2000"), ( @@ -415,6 +475,15 @@ mod tests { "10.0.0.2:7000".to_owned() ], results_kafka_topic: "uptime-results".to_owned(), + kafka_config: KafkaConfig { + kafka_security_protocol: Some("plaintext".to_owned()), + kafka_ssl_ca_location: Some("/path/to/ca.crt".to_owned()), + kafka_ssl_cert_location: Some("/path/to/cert.crt".to_owned()), + kafka_ssl_key_location: Some("/path/to/key.key".to_owned()), + kafka_sasl_mechanism: Some("scram-sha-256".to_owned()), + kafka_sasl_username: Some("my_user".to_owned()), + kafka_sasl_password: Some("my_password".to_owned()), + }, config_provider_mode: ConfigProviderMode::Redis, checker_mode: CheckerMode::Reqwest, config_provider_redis_update_ms: 2000, diff --git a/src/manager.rs b/src/manager.rs index fe16b1f..3cc2cc6 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -186,8 +186,56 @@ impl Manager { (sender, handle, results_worker) } ProducerMode::Kafka => { - let kafka_overrides = + let mut kafka_overrides = HashMap::from([("compression.type".to_string(), "lz4".to_string())]); + + if let Some(kafka_ssl_ca_location) = &config.kafka_config.kafka_ssl_ca_location { + kafka_overrides.insert( + "ssl.ca.location".to_string(), + kafka_ssl_ca_location.to_owned(), + ); + } + + if let Some(kafka_ssl_cert_location) = &config.kafka_config.kafka_ssl_cert_location + { + kafka_overrides.insert( + "ssl.certificate.location".to_string(), + kafka_ssl_cert_location.to_owned(), + ); + } + + if let Some(kafka_ssl_key_location) = &config.kafka_config.kafka_ssl_key_location { + kafka_overrides.insert( + "ssl.key.location".to_string(), + kafka_ssl_key_location.to_owned(), + ); + } + + if let Some(kafka_security_protocol) = &config.kafka_config.kafka_security_protocol + { + kafka_overrides.insert( + "security.protocol".to_string(), + kafka_security_protocol.to_owned(), + ); + } + + if let Some(kafka_sasl_mechanism) = &config.kafka_config.kafka_sasl_mechanism { + kafka_overrides.insert( + "sasl.mechanism".to_string(), + kafka_sasl_mechanism.to_owned(), + ); + } + + if let Some(kafka_sasl_username) = &config.kafka_config.kafka_sasl_username { + kafka_overrides + .insert("sasl.username".to_string(), kafka_sasl_username.to_owned()); + } + + if let Some(kafka_sasl_password) = &config.kafka_config.kafka_sasl_password { + kafka_overrides + .insert("sasl.password".to_string(), kafka_sasl_password.to_owned()); + } + let kafka_config = KafkaConfig::new_config( config.results_kafka_cluster.to_owned(), Some(kafka_overrides),