Skip to content

Commit 124baac

Browse files
authored
Support active thread metric for Executors.newVirtualThreadPerTaskExecutor() (#6008)
Support active thread metrics for Executors.newVirtualThreadPerTaskExecutor(), if reflection is allowed via the `--add-opens` option. Closes gh-5488 Signed-off-by: Johnny Lim <[email protected]>
1 parent cede2bf commit 124baac

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

Diff for: micrometer-core/src/main/java/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java

+44
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828
import io.micrometer.core.instrument.internal.TimedExecutorService;
2929
import io.micrometer.core.instrument.internal.TimedScheduledExecutorService;
3030

31+
import java.lang.invoke.MethodHandle;
32+
import java.lang.invoke.MethodHandles;
3133
import java.lang.reflect.Field;
34+
import java.lang.reflect.Method;
3235
import java.util.List;
3336
import java.util.Set;
3437
import java.util.concurrent.*;
@@ -56,6 +59,11 @@
5659
@NonNullFields
5760
public class ExecutorServiceMetrics implements MeterBinder {
5861

62+
private static final String CLASS_NAME_THREAD_PER_TASK_EXECUTOR = "java.util.concurrent.ThreadPerTaskExecutor";
63+
64+
@Nullable
65+
private static final MethodHandle METHOD_HANDLE_THREAD_COUNT_FROM_THREAD_PER_TASK_EXECUTOR = getMethodHandleForThreadCountFromThreadPerTaskExecutor();
66+
5967
private static boolean allowIllegalReflectiveAccess = true;
6068

6169
private static final InternalLogger log = InternalLoggerFactory.getInstance(ExecutorServiceMetrics.class);
@@ -315,6 +323,9 @@ else if (className.equals("java.util.concurrent.Executors$FinalizableDelegatedEx
315323
monitor(registry,
316324
unwrapThreadPoolExecutor(executorService, executorService.getClass().getSuperclass()));
317325
}
326+
else if (className.equals(CLASS_NAME_THREAD_PER_TASK_EXECUTOR)) {
327+
monitorThreadPerTaskExecutor(registry, executorService);
328+
}
318329
else {
319330
log.warn("Failed to bind as {} is unsupported.", className);
320331
}
@@ -439,6 +450,39 @@ private void monitor(MeterRegistry registry, ForkJoinPool fj) {
439450
registeredMeterIds.addAll(meters.stream().map(Meter::getId).collect(toSet()));
440451
}
441452

453+
private void monitorThreadPerTaskExecutor(MeterRegistry registry, ExecutorService executorService) {
454+
List<Meter> meters = asList(Gauge
455+
.builder(metricPrefix + "executor.active", executorService,
456+
ExecutorServiceMetrics::getThreadCountFromThreadPerTaskExecutor)
457+
.tags(tags)
458+
.description("The approximate number of threads that are actively executing tasks")
459+
.baseUnit(BaseUnits.THREADS)
460+
.register(registry));
461+
registeredMeterIds.addAll(meters.stream().map(Meter::getId).collect(toSet()));
462+
}
463+
464+
private static long getThreadCountFromThreadPerTaskExecutor(ExecutorService executorService) {
465+
try {
466+
return (long) METHOD_HANDLE_THREAD_COUNT_FROM_THREAD_PER_TASK_EXECUTOR.invoke(executorService);
467+
}
468+
catch (Throwable e) {
469+
throw new RuntimeException(e);
470+
}
471+
}
472+
473+
@Nullable
474+
private static MethodHandle getMethodHandleForThreadCountFromThreadPerTaskExecutor() {
475+
try {
476+
Class<?> clazz = Class.forName(CLASS_NAME_THREAD_PER_TASK_EXECUTOR);
477+
Method method = clazz.getMethod("threadCount");
478+
method.setAccessible(true);
479+
return MethodHandles.lookup().unreflect(method);
480+
}
481+
catch (Throwable e) {
482+
return null;
483+
}
484+
}
485+
442486
/**
443487
* Disable illegal reflective accesses.
444488
*

Diff for: micrometer-java21/src/test/java/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetricsReflectiveTests.java

+24
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@
1919
import org.junit.jupiter.api.Tag;
2020
import org.junit.jupiter.api.Test;
2121

22+
import java.time.Duration;
2223
import java.util.concurrent.*;
2324

2425
import static org.assertj.core.api.Assertions.assertThat;
26+
import static org.awaitility.Awaitility.await;
2527

2628
/**
2729
* Tests for {@link ExecutorServiceMetrics} with reflection enabled.
2830
*
2931
* @author Tommy Ludwig
32+
* @author Johnny Lim
3033
*/
3134
@Tag("reflective")
3235
class ExecutorServiceMetricsReflectiveTests {
@@ -45,4 +48,25 @@ void threadPoolMetricsWith_AutoShutdownDelegatedExecutorService() throws Interru
4548
assertThat(registry.get("executor.completed").tag("name", "test").functionCounter().count()).isEqualTo(1L);
4649
}
4750

51+
@Test
52+
void monitorWithExecutorsNewVirtualThreadPerTaskExecutor() {
53+
CountDownLatch latch = new CountDownLatch(1);
54+
ExecutorService unmonitored = Executors.newVirtualThreadPerTaskExecutor();
55+
assertThat(unmonitored.getClass().getName()).isEqualTo("java.util.concurrent.ThreadPerTaskExecutor");
56+
ExecutorService monitored = ExecutorServiceMetrics.monitor(registry, unmonitored, "test");
57+
monitored.execute(() -> {
58+
try {
59+
latch.await(1, TimeUnit.SECONDS);
60+
}
61+
catch (InterruptedException e) {
62+
throw new RuntimeException(e);
63+
}
64+
});
65+
await().atMost(Duration.ofSeconds(1))
66+
.untilAsserted(() -> assertThat(registry.get("executor.active").gauge().value()).isEqualTo(1));
67+
latch.countDown();
68+
await().atMost(Duration.ofSeconds(1))
69+
.untilAsserted(() -> assertThat(registry.get("executor.active").gauge().value()).isEqualTo(0));
70+
}
71+
4872
}

0 commit comments

Comments
 (0)