Skip to content

Commit d14ea45

Browse files
committed
Add metric to check the current number of VT's
1 parent 508d4df commit d14ea45

File tree

2 files changed

+63
-12
lines changed

2 files changed

+63
-12
lines changed

Diff for: micrometer-java21/src/main/java/io/micrometer/java21/instrument/binder/jdk/VirtualThreadMetrics.java

+50-12
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,14 @@
1515
*/
1616
package io.micrometer.java21.instrument.binder.jdk;
1717

18-
import io.micrometer.core.instrument.Counter;
19-
import io.micrometer.core.instrument.MeterRegistry;
20-
import io.micrometer.core.instrument.Tag;
21-
import io.micrometer.core.instrument.Timer;
18+
import io.micrometer.core.instrument.*;
2219
import io.micrometer.core.instrument.binder.MeterBinder;
2320
import jdk.jfr.consumer.RecordingStream;
2421

2522
import java.io.Closeable;
2623
import java.time.Duration;
2724
import java.util.Objects;
25+
import java.util.concurrent.atomic.LongAdder;
2826

2927
import static java.util.Collections.emptyList;
3028

@@ -41,37 +39,69 @@ public class VirtualThreadMetrics implements MeterBinder, Closeable {
4139

4240
private static final String SUBMIT_FAILED_EVENT = "jdk.VirtualThreadSubmitFailed";
4341

44-
private final RecordingStream recordingStream;
42+
private static final String START_EVENT = "jdk.VirtualThreadStart";
4543

46-
private final Iterable<Tag> tags;
44+
private static final String END_EVENT = "jdk.VirtualThreadEnd";
45+
46+
private static final String SUBMIT_FAILED_METRIC_NAME = "jvm.threads.virtual.submit.failed";
47+
48+
private static final String VT_PINNED_METRIC_NAME = "jvm.threads.virtual.pinned";
4749

50+
private static final String VT_ACTIVE_METRIC_NAME = "jvm.threads.virtual.active";
51+
52+
private final RecordingConfig recordingCfg;
53+
54+
private RecordingStream recordingStream;
55+
56+
private final Iterable<Tag> tags;
57+
58+
private boolean activeMetricEnabled;
59+
4860
public VirtualThreadMetrics() {
4961
this(new RecordingConfig(), emptyList());
5062
}
5163

64+
public VirtualThreadMetrics(RecordingConfig config) {
65+
this(config, emptyList());
66+
}
67+
5268
public VirtualThreadMetrics(Iterable<Tag> tags) {
5369
this(new RecordingConfig(), tags);
5470
}
5571

56-
private VirtualThreadMetrics(RecordingConfig config, Iterable<Tag> tags) {
57-
this.recordingStream = createRecordingStream(config);
72+
public VirtualThreadMetrics(RecordingConfig config, Iterable<Tag> tags) {
73+
this.recordingCfg = config;
5874
this.tags = tags;
5975
}
6076

6177
@Override
6278
public void bindTo(MeterRegistry registry) {
63-
Timer pinnedTimer = Timer.builder("jvm.threads.virtual.pinned")
79+
if(this.recordingStream == null) {
80+
this.recordingStream = createRecordingStream(this.recordingCfg);
81+
}
82+
Timer pinnedTimer = Timer.builder(VT_PINNED_METRIC_NAME)
6483
.description("The duration while the virtual thread was pinned without releasing its platform thread")
6584
.tags(tags)
6685
.register(registry);
6786

68-
Counter submitFailedCounter = Counter.builder("jvm.threads.virtual.submit.failed")
87+
Counter submitFailedCounter = Counter.builder(SUBMIT_FAILED_METRIC_NAME)
6988
.description("The number of events when starting or unparking a virtual thread failed")
7089
.tags(tags)
7190
.register(registry);
7291

7392
recordingStream.onEvent(PINNED_EVENT, event -> pinnedTimer.record(event.getDuration()));
7493
recordingStream.onEvent(SUBMIT_FAILED_EVENT, event -> submitFailedCounter.increment());
94+
95+
if(activeMetricEnabled) {
96+
final LongAdder activeCounter = new LongAdder();
97+
this.recordingStream.onEvent(START_EVENT, event -> activeCounter.increment());
98+
this.recordingStream.onEvent(END_EVENT, event -> activeCounter.decrement());
99+
100+
Gauge.builder(VT_ACTIVE_METRIC_NAME, activeCounter::doubleValue)
101+
.description("The number of active virtual threads")
102+
.tags(tags)
103+
.register(registry);
104+
}
75105
}
76106

77107
private RecordingStream createRecordingStream(RecordingConfig config) {
@@ -80,6 +110,10 @@ private RecordingStream createRecordingStream(RecordingConfig config) {
80110
recordingStream.enable(SUBMIT_FAILED_EVENT);
81111
recordingStream.setMaxAge(config.maxAge);
82112
recordingStream.setMaxSize(config.maxSizeBytes);
113+
if(activeMetricEnabled) {
114+
recordingStream.enable(START_EVENT);
115+
recordingStream.enable(END_EVENT);
116+
}
83117
recordingStream.startAsync();
84118

85119
return recordingStream;
@@ -90,12 +124,16 @@ public void close() {
90124
recordingStream.close();
91125
}
92126

93-
private record RecordingConfig(Duration maxAge, long maxSizeBytes, Duration pinnedThreshold) {
127+
public void setActiveMetricEnabled(boolean activeMetricEnabled) {
128+
this.activeMetricEnabled = activeMetricEnabled;
129+
}
130+
131+
public record RecordingConfig(Duration maxAge, long maxSizeBytes, Duration pinnedThreshold) {
94132
private RecordingConfig() {
95133
this(Duration.ofSeconds(5), 10L * 1024 * 1024, Duration.ofMillis(20));
96134
}
97135

98-
private RecordingConfig {
136+
public RecordingConfig {
99137
Objects.requireNonNull(maxAge, "maxAge parameter must not be null");
100138
Objects.requireNonNull(pinnedThreshold, "pinnedThreshold must not be null");
101139
if (maxSizeBytes < 0) {

Diff for: micrometer-java21/src/test/java/io/micrometer/java21/instrument/binder/jdk/VirtualThreadMetricsTests.java

+13
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.micrometer.java21.instrument.binder.jdk;
1717

18+
import io.micrometer.core.instrument.Gauge;
1819
import io.micrometer.core.instrument.Tags;
1920
import io.micrometer.core.instrument.Timer;
2021
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
@@ -49,6 +50,7 @@ class VirtualThreadMetricsTests {
4950
void setUp() {
5051
registry = new SimpleMeterRegistry();
5152
virtualThreadMetrics = new VirtualThreadMetrics(TAGS);
53+
virtualThreadMetrics.setActiveMetricEnabled(true);
5254
virtualThreadMetrics.bindTo(registry);
5355
}
5456

@@ -77,6 +79,17 @@ void pinnedEventsShouldBeRecorded() {
7779
assertThat(timer.totalTime(MILLISECONDS)).isBetween(130d, 170d); // ~150ms
7880
}
7981
}
82+
83+
@Test
84+
void startEndEventsShouldBeRecorded() {
85+
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
86+
for (int i = 0; i < 3; i++) {
87+
executor.submit(() -> sleep(Duration.ofSeconds(1)));
88+
}
89+
Gauge gauge = registry.get("jvm.threads.virtual.active").tags(TAGS).gauge();
90+
await().atMost(Duration.ofSeconds(2)).until(() -> gauge.value() == 3);
91+
}
92+
}
8093

8194
private void pinCurrentThreadAndAwait(CountDownLatch latch) {
8295
synchronized (new Object()) { // assumes that synchronized pins the thread

0 commit comments

Comments
 (0)