Skip to content

Commit 4b6b761

Browse files
Virtual threads metrics (#5067)
Creates a new module micrometer-java21 for code that needs to be baselined on Java 21, including the added virtual threads metrics. The instrumentation is JFR-based, although that is an implementation detail. There are some configuration options for the instrumentation but they are not exposed as public API until there is a clear need for that from users. Fixes gh-3956 Co-authored-by: Jonatan Ivanov <[email protected]>
1 parent 4bbc13f commit 4b6b761

File tree

7 files changed

+407
-1
lines changed

7 files changed

+407
-1
lines changed

Diff for: build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ subprojects {
343343

344344
check.dependsOn("testModules")
345345

346-
if (!(project.name in ['micrometer-test-aspectj-ltw', 'micrometer-test-aspectj-ctw'])) { // add projects here that do not exist in the previous minor so should be excluded from japicmp
346+
if (!(project.name in ['micrometer-test-aspectj-ltw', 'micrometer-test-aspectj-ctw', 'micrometer-java21'])) { // add projects here that do not exist in the previous minor so should be excluded from japicmp
347347
apply plugin: 'me.champeau.gradle.japicmp'
348348
apply plugin: 'de.undercouch.download'
349349

Diff for: micrometer-java21/build.gradle

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
description 'Micrometer core classes that require Java 21'
2+
3+
// skip this module when building with jdk <21
4+
if (!javaLanguageVersion.canCompileOrRun(21)) {
5+
project.tasks.configureEach { task -> task.enabled = false }
6+
}
7+
8+
dependencies {
9+
api project(':micrometer-core')
10+
11+
testImplementation 'org.junit.jupiter:junit-jupiter'
12+
testImplementation 'org.assertj:assertj-core'
13+
testImplementation 'org.awaitility:awaitility'
14+
}
15+
16+
java {
17+
targetCompatibility = 21
18+
}
19+
20+
tasks.withType(JavaCompile).configureEach {
21+
sourceCompatibility = JavaVersion.VERSION_21
22+
targetCompatibility = JavaVersion.VERSION_21
23+
options.release = 21
24+
}
25+
26+
task reflectiveTests(type: Test) {
27+
useJUnitPlatform {
28+
includeTags 'reflective'
29+
}
30+
31+
// This hack is needed since VirtualThreadMetricsReflectiveTests utilizes reflection against java.lang, see its javadoc
32+
jvmArgs += ['--add-opens', 'java.base/java.lang=ALL-UNNAMED']
33+
}
34+
35+
test {
36+
dependsOn reflectiveTests
37+
useJUnitPlatform {
38+
excludeTags 'reflective'
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2024 VMware, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micrometer.java21.instrument.binder.jdk;
17+
18+
import io.micrometer.core.instrument.Counter;
19+
import io.micrometer.core.instrument.MeterRegistry;
20+
import io.micrometer.core.instrument.Tag;
21+
import io.micrometer.core.instrument.Timer;
22+
import io.micrometer.core.instrument.binder.MeterBinder;
23+
import jdk.jfr.consumer.RecordingStream;
24+
25+
import java.io.Closeable;
26+
import java.time.Duration;
27+
import java.util.Objects;
28+
29+
import static java.util.Collections.emptyList;
30+
31+
/**
32+
* Instrumentation support for Virtual Threads, see:
33+
* https://openjdk.org/jeps/425#JDK-Flight-Recorder-JFR
34+
*
35+
* @author Artyom Gabeev
36+
* @since 1.14.0
37+
*/
38+
public class VirtualThreadMetrics implements MeterBinder, Closeable {
39+
40+
private static final String PINNED_EVENT = "jdk.VirtualThreadPinned";
41+
42+
private static final String SUBMIT_FAILED_EVENT = "jdk.VirtualThreadSubmitFailed";
43+
44+
private final RecordingStream recordingStream;
45+
46+
private final Iterable<Tag> tags;
47+
48+
public VirtualThreadMetrics() {
49+
this(new RecordingConfig(), emptyList());
50+
}
51+
52+
public VirtualThreadMetrics(Iterable<Tag> tags) {
53+
this(new RecordingConfig(), tags);
54+
}
55+
56+
private VirtualThreadMetrics(RecordingConfig config, Iterable<Tag> tags) {
57+
this.recordingStream = createRecordingStream(config);
58+
this.tags = tags;
59+
}
60+
61+
@Override
62+
public void bindTo(MeterRegistry registry) {
63+
Timer pinnedTimer = Timer.builder("jvm.threads.virtual.pinned")
64+
.description("The duration while the virtual thread was pinned without releasing its platform thread")
65+
.tags(tags)
66+
.register(registry);
67+
68+
Counter submitFailedCounter = Counter.builder("jvm.threads.virtual.submit.failed")
69+
.description("The number of events when starting or unparking a virtual thread failed")
70+
.tags(tags)
71+
.register(registry);
72+
73+
recordingStream.onEvent(PINNED_EVENT, event -> pinnedTimer.record(event.getDuration()));
74+
recordingStream.onEvent(SUBMIT_FAILED_EVENT, event -> submitFailedCounter.increment());
75+
}
76+
77+
private RecordingStream createRecordingStream(RecordingConfig config) {
78+
RecordingStream recordingStream = new RecordingStream();
79+
recordingStream.enable(PINNED_EVENT).withThreshold(config.pinnedThreshold);
80+
recordingStream.enable(SUBMIT_FAILED_EVENT);
81+
recordingStream.setMaxAge(config.maxAge);
82+
recordingStream.setMaxSize(config.maxSizeBytes);
83+
recordingStream.startAsync();
84+
85+
return recordingStream;
86+
}
87+
88+
@Override
89+
public void close() {
90+
recordingStream.close();
91+
}
92+
93+
private record RecordingConfig(Duration maxAge, long maxSizeBytes, Duration pinnedThreshold) {
94+
private RecordingConfig() {
95+
this(Duration.ofSeconds(5), 10L * 1024 * 1024, Duration.ofMillis(20));
96+
}
97+
98+
private RecordingConfig {
99+
Objects.requireNonNull(maxAge, "maxAge parameter must not be null");
100+
Objects.requireNonNull(pinnedThreshold, "pinnedThreshold must not be null");
101+
if (maxSizeBytes < 0) {
102+
throw new IllegalArgumentException("maxSizeBytes must be positive");
103+
}
104+
}
105+
}
106+
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2024 VMware, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* Instrumentation of JDK classes.
19+
*/
20+
@NonNullApi
21+
@NonNullFields
22+
package io.micrometer.java21.instrument.binder.jdk;
23+
24+
import io.micrometer.common.lang.NonNullApi;
25+
import io.micrometer.common.lang.NonNullFields;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright 2024 VMware, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micrometer.java21.instrument.binder.jdk;
17+
18+
import io.micrometer.core.instrument.Counter;
19+
import io.micrometer.core.instrument.Tags;
20+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
21+
import org.junit.jupiter.api.AfterEach;
22+
import org.junit.jupiter.api.BeforeEach;
23+
import org.junit.jupiter.api.Tag;
24+
import org.junit.jupiter.api.Test;
25+
26+
import java.lang.reflect.Constructor;
27+
import java.time.Duration;
28+
import java.util.concurrent.*;
29+
import java.util.concurrent.locks.LockSupport;
30+
31+
import static java.lang.Thread.State.WAITING;
32+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
33+
import static org.awaitility.Awaitility.await;
34+
35+
/**
36+
* Tests for {@link VirtualThreadMetrics}. If you run these tests from your IDE,
37+
* {@link #submitFailedEventsShouldBeRecorded()} might fail depending on your setup. This
38+
* is because the test (through {@link #virtualThreadFactoryFor(Executor)}) utilizes
39+
* reflection against the {@code java.lang} package which needs to be explicitly enabled.
40+
* If you run into such an issue you can either change your setup and let your IDE run the
41+
* tests utilizing the build system (Gradle) or add the following JVM arg to your test
42+
* config: {@code --add-opens java.base/java.lang=ALL-UNNAMED}
43+
*
44+
* @author Artyom Gabeev
45+
* @author Jonatan Ivanov
46+
*/
47+
@Tag("reflective")
48+
class VirtualThreadMetricsReflectiveTests {
49+
50+
private static final Tags TAGS = Tags.of("k", "v");
51+
52+
private SimpleMeterRegistry registry;
53+
54+
private VirtualThreadMetrics virtualThreadMetrics;
55+
56+
@BeforeEach
57+
void setUp() {
58+
registry = new SimpleMeterRegistry();
59+
virtualThreadMetrics = new VirtualThreadMetrics(TAGS);
60+
virtualThreadMetrics.bindTo(registry);
61+
}
62+
63+
@AfterEach
64+
void tearDown() {
65+
virtualThreadMetrics.close();
66+
}
67+
68+
/**
69+
* Uses a similar approach as the JDK tests to make starting or unparking a virtual
70+
* thread fail, see {@link #virtualThreadFactoryFor(Executor)} and <a href=
71+
* "https://github.com/openjdk/jdk/blob/fdfe503d016086cf78b5a8c27dbe45f0261c68ab/test/jdk/java/lang/Thread/virtual/JfrEvents.java#L143-L187">JfrEvents.java</a>
72+
*/
73+
@Test
74+
void submitFailedEventsShouldBeRecorded() {
75+
try (ExecutorService cachedPool = Executors.newCachedThreadPool()) {
76+
ThreadFactory factory = virtualThreadFactoryFor(cachedPool);
77+
Thread thread = factory.newThread(LockSupport::park);
78+
thread.start();
79+
80+
await().atMost(Duration.ofSeconds(2)).until(() -> thread.getState() == WAITING);
81+
cachedPool.shutdown();
82+
83+
// unpark, the pool was shut down, this should fail
84+
assertThatThrownBy(() -> LockSupport.unpark(thread)).isInstanceOf(RejectedExecutionException.class);
85+
86+
Counter counter = registry.get("jvm.threads.virtual.submit.failed").tags(TAGS).counter();
87+
await().atMost(Duration.ofSeconds(2)).until(() -> counter.count() == 1);
88+
89+
// park, the pool was shut down, this should fail
90+
assertThatThrownBy(() -> factory.newThread(LockSupport::park).start())
91+
.isInstanceOf(RejectedExecutionException.class);
92+
await().atMost(Duration.ofSeconds(2)).until(() -> counter.count() == 2);
93+
}
94+
}
95+
96+
/**
97+
* Creates a {@link ThreadFactory} for virtual threads. The created virtual threads
98+
* will be bound to the provided platform thread pool instead of a default
99+
* ForkJoinPool. At its current form, this is a hack, it utilizes reflection to supply
100+
* the platform thread pool. It seems though there is no other way of doing this, the
101+
* JDK tests are also utilizing reflection to do the same, see: <a href=
102+
* "https://github.com/openjdk/jdk/blob/fdfe503d016086cf78b5a8c27dbe45f0261c68ab/test/lib/jdk/test/lib/thread/VThreadScheduler.java#L71-L90">VThreadScheduler.java</a>
103+
* @param pool platform pool
104+
* @return virtual thread factory bound to the provided platform pool
105+
*/
106+
private static ThreadFactory virtualThreadFactoryFor(Executor pool) {
107+
try {
108+
Class<?> clazz = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder");
109+
Constructor<?> constructor = clazz.getDeclaredConstructor(Executor.class);
110+
constructor.setAccessible(true);
111+
return ((Thread.Builder.OfVirtual) constructor.newInstance(pool)).factory();
112+
}
113+
catch (Exception e) {
114+
throw new RuntimeException(e);
115+
}
116+
}
117+
118+
}

0 commit comments

Comments
 (0)