Skip to content

Add metric to check the current number of VT's #6009

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -15,16 +15,14 @@
*/
package io.micrometer.java21.instrument.binder.jdk;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.binder.MeterBinder;
import jdk.jfr.consumer.RecordingStream;

import java.io.Closeable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;

import static java.util.Collections.emptyList;

@@ -41,43 +39,103 @@ public class VirtualThreadMetrics implements MeterBinder, Closeable {

private static final String SUBMIT_FAILED_EVENT = "jdk.VirtualThreadSubmitFailed";

private final RecordingStream recordingStream;
private static final String START_EVENT = "jdk.VirtualThreadStart";

private static final String END_EVENT = "jdk.VirtualThreadEnd";

private static final String SUBMIT_FAILED_METRIC_NAME = "jvm.threads.virtual.submit.failed";

private static final String VT_PINNED_METRIC_NAME = "jvm.threads.virtual.pinned";

private static final String VT_ACTIVE_METRIC_NAME = "jvm.threads.virtual.active";

private final RecordingConfig recordingCfg;

private RecordingStream recordingStream;

private final Iterable<Tag> tags;

/**
* Creates virtual thread metrics with a default {@link RecordingConfig} and no
* tags.
*/
public VirtualThreadMetrics() {
this(new RecordingConfig(), emptyList());
}

/**
* Creates virtual thread metrics with the given recording configuration and no
* tags.
* @param config the recording configuration, handed unchanged to the
* {@code (RecordingConfig, Iterable<Tag>)} constructor
*/
public VirtualThreadMetrics(RecordingConfig config) {
this(config, emptyList());
}

/**
* Creates virtual thread metrics with a default {@link RecordingConfig} and the
* given tags.
* @param tags the tags, handed unchanged to the
* {@code (RecordingConfig, Iterable<Tag>)} constructor
*/
public VirtualThreadMetrics(Iterable<Tag> tags) {
this(new RecordingConfig(), tags);
}

private VirtualThreadMetrics(RecordingConfig config, Iterable<Tag> tags) {
this.recordingStream = createRecordingStream(config);
/**
* Instantiates a new Virtual thread metrics.
* @param config the config
* @param tags the tags
*/
public VirtualThreadMetrics(RecordingConfig config, Iterable<Tag> tags) {
this.recordingCfg = config;
this.tags = tags;
}

@Override
public void bindTo(MeterRegistry registry) {
Timer pinnedTimer = Timer.builder("jvm.threads.virtual.pinned")
.description("The duration while the virtual thread was pinned without releasing its platform thread")
.tags(tags)
.register(registry);

Counter submitFailedCounter = Counter.builder("jvm.threads.virtual.submit.failed")
.description("The number of events when starting or unparking a virtual thread failed")
.tags(tags)
.register(registry);

recordingStream.onEvent(PINNED_EVENT, event -> pinnedTimer.record(event.getDuration()));
recordingStream.onEvent(SUBMIT_FAILED_EVENT, event -> submitFailedCounter.increment());
if (this.recordingStream == null) {
this.recordingStream = createRecordingStream(this.recordingCfg);
}

if (recordingCfg.pinnedMetricEnabled) {
Timer pinnedTimer = Timer.builder(VT_PINNED_METRIC_NAME)
.description("The duration while the virtual thread was pinned without releasing its platform thread")
.tags(tags)
.register(registry);

recordingStream.onEvent(PINNED_EVENT, event -> pinnedTimer.record(event.getDuration()));
}

if (recordingCfg.submitFailedMetricEnabled) {
Counter submitFailedCounter = Counter.builder(SUBMIT_FAILED_METRIC_NAME)
.description("The number of events when starting or unparking a virtual thread failed")
.tags(tags)
.register(registry);

recordingStream.onEvent(SUBMIT_FAILED_EVENT, event -> submitFailedCounter.increment());
}

if (recordingCfg.activeMetricEnabled) {
final LongAdder activeCounter = new LongAdder();
this.recordingStream.onEvent(START_EVENT, event -> activeCounter.increment());
this.recordingStream.onEvent(END_EVENT, event -> activeCounter.decrement());

Gauge.builder(VT_ACTIVE_METRIC_NAME, activeCounter::doubleValue)
.description("The number of active virtual threads")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than a gauge, I wonder if it might be more insightful to have two counters, one for started, one for ended. The active could be derived by taking the difference. And this would allow tracking the rate of virtual threads being started/stopped. Thoughts? Maybe in some use cases we would have to worry about overflow (assuming cumulative counters)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to have both counters. However, these types of events (start/end) are numerous, so they would increase rapidly, and overflow could become a common scenario.

Copy link

@Indresh2410 Indresh2410 Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ceremo @shakuzen IMO, if we can track the rate of virtual threads being started along with the active thread count, we would have the option to autoscale JVM apps based on that rate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for autoscaling, the important metric is whether the active (virtual) thread count is continually increasing. If you just look at the rate of threads starting without looking at the rate of threads finishing, I'm not sure it tells much that is important for autoscaling because if you have a high rate of starting threads but finishing is keeping up, there is no problem and no autoscaling needed. Given that, maybe the rate of starting or stopping on its own isn't that interesting and the gauge of active virtual threads is most useful.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes @shakuzen, the above makes sense. But let's say we have the active virtual thread count. Based on the traffic, we'll have to fine-tune our thresholds for active virtual threads accordingly.

Whereas if we calculate the percentage of virtual threads in use as (active threads / total threads created) * 100, we can do a one-time setup: if the value exceeds a threshold (60%, for example), autoscaling should kick in, which generally means the number of active threads has increased (the rate of finishing threads has slowed down).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

> Based on the traffic, we'll have to finetune our thresholds accordingly for active virtual threads

I don't think so. You might want to check for an absolute threshold of active virtual threads that is cause for concern, and you may want to check over multiple step intervals that the active virtual thread count is increasing significantly.

> Whereas if we calculate % of virtual threads being used like (active threads/total threads created)*100, we can do 1 time setup where if threshold is greater than 60%

Say normally you have between 10-100 active threads, but when there is an issue it climbs to 300, 500, 2000, etc. active threads. If your application just started and there is some issue, checking the ratio of active threads to total created may catch the issue. But if the issue happens long after the application started, the ratio no longer catches it because the total threads created is monotonic; it increases over the lifetime of the application. For this reason, using such a ratio is not a good way to catch irregularities.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @shakuzen. One final question, and apologies for dragging out the conversation.

> But if the issue happens long after the application started, the ratio no longer catches it because the total threads created is monotonic; it increases over the lifetime of the application. For this reason, using such a ratio is not a good way to catch irregularities.

Since the number of virtual threads created will be a counter, wouldn't the expression below work for us?

sum(active_thread_count)/sum(rate(virtual_threads_created[1m]))

Because taking the rate of a counter will give the number of threads created over an interval (1m), right? Even if the application runs for days/months? (Assuming active_thread_count is a gauge.)

.tags(tags)
.register(registry);
}
}

private RecordingStream createRecordingStream(RecordingConfig config) {
RecordingStream recordingStream = new RecordingStream();
recordingStream.enable(PINNED_EVENT).withThreshold(config.pinnedThreshold);
recordingStream.enable(SUBMIT_FAILED_EVENT);
if (config.pinnedMetricEnabled) {
recordingStream.enable(PINNED_EVENT).withThreshold(config.pinnedThreshold);
}
if (config.submitFailedMetricEnabled) {
recordingStream.enable(SUBMIT_FAILED_EVENT);
}
if (config.activeMetricEnabled) {
recordingStream.enable(START_EVENT);
recordingStream.enable(END_EVENT);
}
recordingStream.setMaxAge(config.maxAge);
recordingStream.setMaxSize(config.maxSizeBytes);
recordingStream.startAsync();
@@ -90,12 +148,33 @@ public void close() {
recordingStream.close();
}

private record RecordingConfig(Duration maxAge, long maxSizeBytes, Duration pinnedThreshold) {
private RecordingConfig() {
this(Duration.ofSeconds(5), 10L * 1024 * 1024, Duration.ofMillis(20));
/**
* The RecordingConfig type allows you to configure the recording features and the
* enabled events to listen to.
*/
public record RecordingConfig(Duration maxAge, long maxSizeBytes, Duration pinnedThreshold,
boolean pinnedMetricEnabled, boolean submitFailedMetricEnabled, boolean activeMetricEnabled) {

public RecordingConfig() {
this(true, true, false);
}

public RecordingConfig(boolean pinnedMetricEnabled, boolean submitFailedMetricEnabled,
boolean activeMetricEnabled) {
this(Duration.ofSeconds(5), 10L * 1024 * 1024, Duration.ofMillis(20), pinnedMetricEnabled,
submitFailedMetricEnabled, activeMetricEnabled);
}

private RecordingConfig {
/**
* Instantiates a new Recording config.
* @param maxAge the max age
* @param maxSizeBytes the max size bytes
* @param pinnedThreshold the pinned threshold
* @param pinnedMetricEnabled the pinned metric enabled
* @param submitFailedMetricEnabled the submit failed metric enabled
* @param activeMetricEnabled the active metric enabled
*/
public RecordingConfig {
Objects.requireNonNull(maxAge, "maxAge parameter must not be null");
Objects.requireNonNull(pinnedThreshold, "pinnedThreshold must not be null");
if (maxSizeBytes < 0) {
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@
import java.util.concurrent.locks.LockSupport;

import static java.lang.Thread.State.WAITING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

@@ -56,8 +57,6 @@ class VirtualThreadMetricsReflectiveTests {
@BeforeEach
void setUp() {
registry = new SimpleMeterRegistry();
virtualThreadMetrics = new VirtualThreadMetrics(TAGS);
virtualThreadMetrics.bindTo(registry);
}

@AfterEach
@@ -72,6 +71,11 @@ void tearDown() {
*/
@Test
void submitFailedEventsShouldBeRecorded() {
VirtualThreadMetrics.RecordingConfig recordingConfig = new VirtualThreadMetrics.RecordingConfig(false, true,
false);
virtualThreadMetrics = new VirtualThreadMetrics(recordingConfig, TAGS);
virtualThreadMetrics.bindTo(registry);

try (ExecutorService cachedPool = Executors.newCachedThreadPool()) {
ThreadFactory factory = virtualThreadFactoryFor(cachedPool);
Thread thread = factory.newThread(LockSupport::park);
@@ -90,6 +94,8 @@ void submitFailedEventsShouldBeRecorded() {
assertThatThrownBy(() -> factory.newThread(LockSupport::park).start())
.isInstanceOf(RejectedExecutionException.class);
await().atMost(Duration.ofSeconds(2)).until(() -> counter.count() == 2);

assertThat(registry.getMeters()).containsExactly(counter);
}
}

Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
*/
package io.micrometer.java21.instrument.binder.jdk;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
@@ -48,8 +49,6 @@ class VirtualThreadMetricsTests {
@BeforeEach
void setUp() {
registry = new SimpleMeterRegistry();
virtualThreadMetrics = new VirtualThreadMetrics(TAGS);
virtualThreadMetrics.bindTo(registry);
}

@AfterEach
@@ -59,6 +58,11 @@ void tearDown() {

@Test
void pinnedEventsShouldBeRecorded() {
VirtualThreadMetrics.RecordingConfig recordingConfig = new VirtualThreadMetrics.RecordingConfig(true, false,
false);
virtualThreadMetrics = new VirtualThreadMetrics(recordingConfig, TAGS);
virtualThreadMetrics.bindTo(registry);

try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
CountDownLatch latch = new CountDownLatch(1);
List<Future<?>> futures = new ArrayList<>();
@@ -75,6 +79,25 @@ void pinnedEventsShouldBeRecorded() {
await().atMost(Duration.ofSeconds(2)).until(() -> timer.count() == 3);
assertThat(timer.max(MILLISECONDS)).isBetween(40d, 60d); // ~50ms
assertThat(timer.totalTime(MILLISECONDS)).isBetween(130d, 170d); // ~150ms
assertThat(registry.getMeters()).containsExactly(timer);
}
}

@Test
void startEndEventsShouldBeRecorded() {
    // Only the third flag (activeMetricEnabled) is on, so the active gauge is
    // expected to be the sole meter in the registry.
    virtualThreadMetrics = new VirtualThreadMetrics(new VirtualThreadMetrics.RecordingConfig(false, false, true),
            TAGS);
    virtualThreadMetrics.bindTo(registry);

    final int expectedActive = 3;
    try (ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()) {
        // Start one virtual thread per task; each sleeps for a second so the
        // gauge can observe them while they are running.
        int submitted = 0;
        while (submitted < expectedActive) {
            pool.submit(() -> sleep(Duration.ofSeconds(1)));
            submitted++;
        }

        Gauge activeGauge = registry.get("jvm.threads.virtual.active").tags(TAGS).gauge();
        await().atMost(Duration.ofSeconds(2)).until(() -> activeGauge.value() == expectedActive);
        assertThat(registry.getMeters()).containsExactly(activeGauge);
    }
}