Skip to content

Commit 2c46d0b

Browse files
authored
Polish KafkaMetrics (#6076)
1 parent b7a344c commit 2c46d0b

File tree

1 file changed

+64
-61
lines changed
  • micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka

1 file changed

+64
-61
lines changed

Diff for: micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaMetrics.java

+64-61
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private Iterable<Tag> getCommonTags(MeterRegistry registry) {
137137
// FIXME hack until we have proper API to retrieve common tags
138138
Meter.Id dummyId = Meter.builder("delete.this", OTHER, Collections.emptyList()).register(registry).getId();
139139
registry.remove(dummyId);
140-
return dummyId.getTags();
140+
return dummyId.getTagsAsIterable();
141141
}
142142

143143
/**
@@ -178,74 +178,77 @@ void checkAndBindMetrics(MeterRegistry registry) {
178178
Map<MetricName, ? extends Metric> currentMetrics = this.metricsSupplier.get();
179179
this.metrics.set(currentMetrics);
180180

181-
if (!currentMeters.equals(currentMetrics.keySet())) {
182-
Set<MetricName> metricsToRemove = currentMeters.stream()
183-
.filter(metricName -> !currentMetrics.containsKey(metricName))
184-
.collect(Collectors.toSet());
181+
if (currentMeters.equals(currentMetrics.keySet())) {
182+
return;
183+
}
185184

186-
for (MetricName metricName : metricsToRemove) {
187-
Meter.Id id = meterIdForComparison(metricName);
188-
registry.remove(id);
189-
registeredMeterIds.remove(id);
190-
}
185+
Set<MetricName> metricsToRemove = currentMeters.stream()
186+
.filter(metricName -> !currentMetrics.containsKey(metricName))
187+
.collect(Collectors.toSet());
188+
189+
for (MetricName metricName : metricsToRemove) {
190+
Meter.Id id = meterIdForComparison(metricName);
191+
registry.remove(id);
192+
registeredMeterIds.remove(id);
193+
}
191194

192-
currentMeters = new HashSet<>(currentMetrics.keySet());
195+
currentMeters = new HashSet<>(currentMetrics.keySet());
193196

194-
Map<String, List<Meter>> registryMetersByNames = registry.getMeters()
195-
.stream()
196-
.collect(Collectors.groupingBy(meter -> meter.getId().getName()));
197+
Map<String, List<Meter>> registryMetersByNames = registry.getMeters()
198+
.stream()
199+
.collect(Collectors.groupingBy(meter -> meter.getId().getName()));
197200

198-
currentMetrics.forEach((name, metric) -> {
199-
// Filter out non-numeric values
200-
// Filter out metrics from groups that include metadata
201-
if (!(metric.metricValue() instanceof Number) || METRIC_GROUP_APP_INFO.equals(name.group())
202-
|| METRIC_GROUP_METRICS_COUNT.equals(name.group())) {
203-
return;
204-
}
201+
currentMetrics.forEach((name, metric) -> {
202+
// Filter out non-numeric values
203+
// Filter out metrics from groups that include metadata
204+
if (!(metric.metricValue() instanceof Number) || METRIC_GROUP_APP_INFO.equals(name.group())
205+
|| METRIC_GROUP_METRICS_COUNT.equals(name.group())) {
206+
return;
207+
}
205208

206-
String meterName = meterName(name);
207-
208-
// Kafka has metrics with lower number of tags (e.g. with/without
209-
// topic or partition tag)
210-
// Remove meters with lower number of tags
211-
boolean hasLessTags = false;
212-
for (Meter other : registryMetersByNames.getOrDefault(meterName, emptyList())) {
213-
Meter.Id otherId = other.getId();
214-
List<Tag> tags = otherId.getTags();
215-
List<Tag> meterTagsWithCommonTags = meterTags(name, true);
216-
if (tags.size() < meterTagsWithCommonTags.size()) {
217-
registry.remove(otherId);
218-
registeredMeterIds.remove(otherId);
219-
}
220-
// Check if already exists
221-
else if (tags.size() == meterTagsWithCommonTags.size()) {
222-
if (tags.containsAll(meterTagsWithCommonTags))
223-
return;
224-
}
225-
else
226-
hasLessTags = true;
209+
String meterName = meterName(name);
210+
211+
// Kafka has metrics with lower number of tags (e.g. with/without
212+
// topic or partition tag)
213+
// Remove meters with lower number of tags
214+
boolean hasLessTags = false;
215+
for (Meter other : registryMetersByNames.getOrDefault(meterName, emptyList())) {
216+
Meter.Id otherId = other.getId();
217+
List<Tag> otherTags = otherId.getTags();
218+
List<Tag> meterTagsWithCommonTags = meterTags(name, true);
219+
if (otherTags.size() < meterTagsWithCommonTags.size()) {
220+
registry.remove(otherId);
221+
registeredMeterIds.remove(otherId);
227222
}
228-
if (hasLessTags)
229-
return;
230-
231-
List<Tag> tags = meterTags(name);
232-
try {
233-
Meter meter = bindMeter(registry, metric, meterName, tags);
234-
List<Meter> meters = registryMetersByNames.computeIfAbsent(meterName, k -> new ArrayList<>());
235-
meters.add(meter);
223+
// Check if already exists
224+
else if (otherTags.size() == meterTagsWithCommonTags.size()) {
225+
// https://www.jetbrains.com/help/inspectopedia/SlowListContainsAll.html
226+
if (new HashSet<>(otherTags).containsAll(meterTagsWithCommonTags))
227+
return;
236228
}
237-
catch (Exception ex) {
238-
String message = ex.getMessage();
239-
if (message != null && message.contains("Prometheus requires")) {
240-
warnThenDebugLogger.log(() -> "Failed to bind meter: " + meterName + " " + tags
241-
+ ". However, this could happen and might be restored in the next refresh.");
242-
}
243-
else {
244-
log.warn("Failed to bind meter: " + meterName + " " + tags + ".", ex);
245-
}
229+
else
230+
hasLessTags = true;
231+
}
232+
if (hasLessTags)
233+
return;
234+
235+
List<Tag> tags = meterTags(name);
236+
try {
237+
Meter meter = bindMeter(registry, metric, meterName, tags);
238+
List<Meter> meters = registryMetersByNames.computeIfAbsent(meterName, k -> new ArrayList<>());
239+
meters.add(meter);
240+
}
241+
catch (Exception ex) {
242+
String message = ex.getMessage();
243+
if (message != null && message.contains("Prometheus requires")) {
244+
warnThenDebugLogger.log(() -> "Failed to bind meter: " + meterName + " " + tags
245+
+ ". However, this could happen and might be restored in the next refresh.");
246246
}
247-
});
248-
}
247+
else {
248+
log.warn("Failed to bind meter: " + meterName + " " + tags + ".", ex);
249+
}
250+
}
251+
});
249252
}
250253
catch (Exception e) {
251254
log.warn("Failed to bind KafkaMetric", e);

0 commit comments

Comments
 (0)