From e580eddd1047ecf2990551371f49e3f10c41baaf Mon Sep 17 00:00:00 2001 From: avinash Date: Fri, 15 Aug 2025 21:32:53 +0530 Subject: [PATCH 1/2] Added debug log for tracking event store delete operations --- .../framework/punctuators/AbstractThrottledPunctuator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java index 640ab79..51993d5 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java @@ -134,6 +134,7 @@ public final void punctuate(long timestamp) { eventStore.put(windowMs, rescheduledTasks.get(windowMs)); } else { // can directly delete key from store + log.debug("Deleting empty window for ts: {}", windowMs); eventStore.delete(windowMs); } } else { From c22bd7115e0e8a08dfa236d776417441ef2d9b23 Mon Sep 17 00:00:00 2001 From: avinash Date: Fri, 15 Aug 2025 21:34:15 +0530 Subject: [PATCH 2/2] minor fix --- .../framework/punctuators/AbstractThrottledPunctuator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java index 51993d5..c3429f9 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java @@ -134,8 +134,8 @@ public final void punctuate(long timestamp) { eventStore.put(windowMs, rescheduledTasks.get(windowMs)); } else { // can directly delete key from store - log.debug("Deleting empty window for ts: {}", windowMs); eventStore.delete(windowMs); + log.debug("Deleted empty window for ts: {}", windowMs); } } else { ArrayList windowTasks = new ArrayList<>(events.subList(i, events.size()));