diff --git a/kafka-streams-framework/build.gradle.kts b/kafka-streams-framework/build.gradle.kts index c7220eb..18199fd 100644 --- a/kafka-streams-framework/build.gradle.kts +++ b/kafka-streams-framework/build.gradle.kts @@ -21,8 +21,8 @@ dependencies { implementation("com.google.guava:guava:31.1-jre") implementation("org.apache.avro:avro:1.11.1") implementation("org.apache.kafka:kafka-clients:7.2.1-ccs") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.39") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.39") + implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.47") + implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.47") implementation("org.apache.commons:commons-lang3:3.12.0") testImplementation("org.apache.kafka:kafka-streams-test-utils:7.2.1-ccs") diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java index a66bd42..0f40f83 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java @@ -6,6 +6,7 @@ import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG; @@ -20,6 +21,8 @@ import com.google.common.collect.Streams; import com.typesafe.config.Config; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics; import java.time.Duration; import java.util.ArrayList; @@ -90,7 +93,11 @@ protected void doInit() { app = new KafkaStreams(topology, streamsConfigProps); // export kafka streams metrics - metrics = new KafkaStreamsMetrics(app); + metrics = + new KafkaStreamsMetrics( + app, + Tags.of( + Tag.of("kstreams.app", streamsConfigProps.getProperty(APPLICATION_ID_CONFIG)))); metrics.bindTo(PlatformMetricsRegistry.getMeterRegistry()); // useful for resetting local state - during testing or any other scenarios where