From 14a9a6ed7747bf0281ade2bfc9be2f5e15a48c7e Mon Sep 17 00:00:00 2001 From: ShubhamRwt Date: Tue, 22 Apr 2025 17:58:51 +0530 Subject: [PATCH 1/3] Added new metrics Signed-off-by: ShubhamRwt --- .../detector/AnomalyDetectorManager.java | 6 ++--- .../detector/AnomalyDetectorState.java | 24 ++++++++++++++++++- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java index 8d86931fe0..68e3d9f220 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java @@ -559,18 +559,18 @@ private void fixAnomalyInProgress(AnomalyType anomalyType) throws Exception { skipReportingIfNotUpdated = anomalyType == KafkaAnomalyType.BROKER_FAILURE; throw ofe; } finally { - handlePostFixAnomaly(isReadyToFix, fixStarted, anomalyId, skipReportingIfNotUpdated); + handlePostFixAnomaly(isReadyToFix, fixStarted, anomalyId, skipReportingIfNotUpdated, anomalyType); } } } } - private void handlePostFixAnomaly(boolean isReadyToFix, boolean fixStarted, String anomalyId, boolean skipReportingIfNotUpdated) { + private void handlePostFixAnomaly(boolean isReadyToFix, boolean fixStarted, String anomalyId, boolean skipReportingIfNotUpdated, AnomalyType anomalyType) { if (isReadyToFix) { _anomalyDetectorState.onAnomalyHandle(_anomalyInProgress, fixStarted ? AnomalyState.Status.FIX_STARTED : AnomalyState.Status.FIX_FAILED_TO_START); if (fixStarted) { - _anomalyDetectorState.incrementNumSelfHealingStarted(); + _anomalyDetectorState.incrementNumSelfHealingStarted(anomalyType); LOG.info("[{}] Self-healing started successfully.", anomalyId); } else { _anomalyDetectorState.incrementNumSelfHealingFailedToStart(); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorState.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorState.java index d79f2a83df..8992cc3ba3 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorState.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorState.java @@ -4,6 +4,7 @@ package com.linkedin.kafka.cruisecontrol.detector; +import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; @@ -81,6 +82,8 @@ public class AnomalyDetectorState { private double _balancednessScore; private boolean _hasUnfixableGoals; private final Map _anomalyDetectToFixCompleteTimer; + private final Map _numSelfHealingStartedPerAnomaly; + private final Map _numSelfHealingEndedPerAnomaly; public AnomalyDetectorState(Time time, AnomalyNotifier anomalyNotifier, @@ -105,6 +108,8 @@ protected boolean removeEldestEntry(Map.Entry eldest) { _ongoingAnomalyDurationSumForAverageMs = 0; _numSelfHealingStarted = new AtomicLong(0L); _numSelfHealingFailedToStart = new AtomicLong(0L); + _numSelfHealingStartedPerAnomaly = new HashMap<>(); + _numSelfHealingEndedPerAnomaly = new HashMap<>(); Map meanTimeBetweenAnomaliesMs = new HashMap<>(); for (AnomalyType anomalyType : KafkaAnomalyType.cachedValues()) { @@ -141,6 +146,14 @@ protected boolean removeEldestEntry(Map.Entry eldest) { Timer timer = dropwizardMetricRegistry.timer( MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, String.format("%s-detect-to-fix-complete-timer", anomalyType.toString().toLowerCase()))); _anomalyDetectToFixCompleteTimer.put(anomalyType, timer); + + Counter counter = dropwizardMetricRegistry.counter( + MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, String.format("num-of-self-healing-started-for-%s", anomalyType.toString().toLowerCase()))); + _numSelfHealingStartedPerAnomaly.put(anomalyType, counter); + + Counter counter1 = dropwizardMetricRegistry.counter( + MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, String.format("num-of-self-healing-ended-for-%s", anomalyType.toString().toLowerCase()))); + _numSelfHealingEndedPerAnomaly.put(anomalyType, counter1); } } else { _anomalyRateByType = new HashMap<>(); @@ -260,8 +273,16 @@ long numSelfHealingStarted() { /** * Increment the number of self healing actions started successfully. */ - void incrementNumSelfHealingStarted() { + void incrementNumSelfHealingStarted(AnomalyType anomalyType) { _numSelfHealingStarted.incrementAndGet(); + _numSelfHealingStartedPerAnomaly.get(anomalyType).inc(); + } + + /** + * Increment the number of self healing actions ended successfully per anomaly. + */ + void incrementNumSelfHealingEndedPerAnomaly(AnomalyType anomalyType) { + _numSelfHealingEndedPerAnomaly.get(anomalyType).inc(); } /** @@ -309,6 +330,7 @@ public synchronized void markSelfHealingFinished(String anomalyId, boolean compl // Time the duration if the self-healing is completed successfully. // completeWithError is true if the proposal execution is interrupted (receive stop signal) or encountered exception. // execution-stopped metrics track the number of stopped execution. + incrementNumSelfHealingEndedPerAnomaly(_ongoingSelfHealingAnomaly.anomalyType()); _anomalyDetectToFixCompleteTimer.get(_ongoingSelfHealingAnomaly.anomalyType()).update(totalTime, TimeUnit.MILLISECONDS); } _ongoingSelfHealingAnomaly = null; From db2909ab86defaa4e7025ab917aa25d83c654450 Mon Sep 17 00:00:00 2001 From: ShubhamRwt Date: Tue, 22 Apr 2025 18:18:23 +0530 Subject: [PATCH 2/3] Added checkstyle fixes Signed-off-by: ShubhamRwt --- .../kafka/cruisecontrol/detector/AnomalyDetectorManager.java | 3 ++- .../kafka/cruisecontrol/detector/AnomalyDetectorState.java | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java index 68e3d9f220..d102e78994 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorManager.java @@ -565,7 +565,8 @@ private void fixAnomalyInProgress(AnomalyType anomalyType) throws Exception { } } - private void handlePostFixAnomaly(boolean isReadyToFix, boolean fixStarted, String anomalyId, boolean skipReportingIfNotUpdated, AnomalyType anomalyType) { + private void handlePostFixAnomaly(boolean isReadyToFix, boolean fixStarted, String anomalyId, boolean skipReportingIfNotUpdated, + AnomalyType anomalyType) { if (isReadyToFix) { _anomalyDetectorState.onAnomalyHandle(_anomalyInProgress, fixStarted ? AnomalyState.Status.FIX_STARTED : AnomalyState.Status.FIX_FAILED_TO_START); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorState.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorState.java index 8992cc3ba3..5b359119f9 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorState.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorState.java @@ -272,6 +272,8 @@ long numSelfHealingStarted() { /** * Increment the number of self healing actions started successfully. + * + * @param anomalyType Type of anomaly */ void incrementNumSelfHealingStarted(AnomalyType anomalyType) { _numSelfHealingStarted.incrementAndGet(); @@ -280,6 +282,8 @@ void incrementNumSelfHealingStarted(AnomalyType anomalyType) { /** * Increment the number of self healing actions ended successfully per anomaly. + * + * @param anomalyType Type of anomaly */ void incrementNumSelfHealingEndedPerAnomaly(AnomalyType anomalyType) { _numSelfHealingEndedPerAnomaly.get(anomalyType).inc(); From d1a2a264852f2df3c88c5749aa5f214cf4cf8f4e Mon Sep 17 00:00:00 2001 From: ShubhamRwt Date: Wed, 23 Apr 2025 18:04:37 +0530 Subject: [PATCH 3/3] Added suggestions by Tom and fixing the failed test Signed-off-by: ShubhamRwt --- .../detector/AnomalyDetectorState.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorState.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorState.java index 5b359119f9..d7fa5d6231 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorState.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorState.java @@ -108,8 +108,6 @@ protected boolean removeEldestEntry(Map.Entry eldest) { _ongoingAnomalyDurationSumForAverageMs = 0; _numSelfHealingStarted = new AtomicLong(0L); _numSelfHealingFailedToStart = new AtomicLong(0L); - _numSelfHealingStartedPerAnomaly = new HashMap<>(); - _numSelfHealingEndedPerAnomaly = new HashMap<>(); Map meanTimeBetweenAnomaliesMs = new HashMap<>(); for (AnomalyType anomalyType : KafkaAnomalyType.cachedValues()) { @@ -130,6 +128,8 @@ protected boolean removeEldestEntry(Map.Entry eldest) { (Gauge) () -> hasUnfixableGoals() ? 1 : 0); _anomalyRateByType = new HashMap<>(); + _numSelfHealingStartedPerAnomaly = new HashMap<>(); + _numSelfHealingEndedPerAnomaly = new HashMap<>(); _anomalyRateByType.put(BROKER_FAILURE, dropwizardMetricRegistry.meter(MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, "broker-failure-rate"))); _anomalyRateByType.put(GOAL_VIOLATION, @@ -143,20 +143,24 @@ protected boolean removeEldestEntry(Map.Entry eldest) { _anomalyRateByType.put(MAINTENANCE_EVENT, dropwizardMetricRegistry.meter(MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, "maintenance-event-rate"))); for (KafkaAnomalyType anomalyType : KafkaAnomalyType.cachedValues()) { - Timer timer = dropwizardMetricRegistry.timer( - MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, String.format("%s-detect-to-fix-complete-timer", anomalyType.toString().toLowerCase()))); - _anomalyDetectToFixCompleteTimer.put(anomalyType, timer); + _anomalyDetectToFixCompleteTimer.put(anomalyType, + dropwizardMetricRegistry.timer(MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, + String.format("%s-detect-to-fix-complete-timer", anomalyType.toString().toLowerCase())))); - Counter counter = dropwizardMetricRegistry.counter( - MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, String.format("num-of-self-healing-started-for-%s", anomalyType.toString().toLowerCase()))); - _numSelfHealingStartedPerAnomaly.put(anomalyType, counter); + _numSelfHealingStartedPerAnomaly.put(anomalyType, + dropwizardMetricRegistry.counter(MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, + String.format("num-of-self-healing-started-for-%s", anomalyType.toString().toLowerCase())))); - Counter counter1 = dropwizardMetricRegistry.counter( - MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, String.format("num-of-self-healing-ended-for-%s", anomalyType.toString().toLowerCase()))); - _numSelfHealingEndedPerAnomaly.put(anomalyType, counter1); + _numSelfHealingEndedPerAnomaly.put(anomalyType, + dropwizardMetricRegistry.counter(MetricRegistry.name(ANOMALY_DETECTOR_SENSOR, + String.format("num-of-self-healing-ended-for-%s", anomalyType.toString().toLowerCase())))); } } else { _anomalyRateByType = new HashMap<>(); + _numSelfHealingStartedPerAnomaly = new HashMap<>(); + _numSelfHealingEndedPerAnomaly = new HashMap<>(); + KafkaAnomalyType.cachedValues().forEach(anomalyType -> _numSelfHealingStartedPerAnomaly.put(anomalyType, new Counter())); + KafkaAnomalyType.cachedValues().forEach(anomalyType -> _numSelfHealingEndedPerAnomaly.put(anomalyType, new Counter())); KafkaAnomalyType.cachedValues().forEach(anomalyType -> _anomalyRateByType.put(anomalyType, new Meter())); } _balancednessScore = INITIAL_BALANCEDNESS_SCORE;