Skip to content

Commit 24b19c7

Browse files
authored
UDS datagram support in StatsD (#2722)
Utilizes the support in Netty and Reactor Netty for Unix domain socket datagram protocol via the `UdpClient`. The StatsdConfig `host` should be the path to the socket when the `protocol` is configured to `UDS_DATAGRAM`. Resolves gh-792
1 parent 944032a commit 24b19c7

File tree

7 files changed

+80
-25
lines changed

7 files changed

+80
-25
lines changed

Diff for: implementations/micrometer-registry-statsd/build.gradle

+11
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,27 @@ plugins {
22
id 'com.github.johnrengelman.shadow' version '7.0.0'
33
}
44

5+
repositories {
6+
// TODO remove when reactor-netty 1.0.10 is released
7+
maven { url 'https://repo.spring.io/snapshot' }
8+
}
9+
510
dependencies {
611
api project(':micrometer-core')
712

813
implementation 'io.projectreactor:reactor-core'
914
implementation 'io.projectreactor.netty:reactor-netty-core'
15+
constraints {
16+
// TODO remove when reactor-netty 1.0.10 is released
17+
implementation 'io.projectreactor.netty:reactor-netty-core:1.0.10-SNAPSHOT'
18+
}
1019

1120
testImplementation project(':micrometer-test')
1221
testImplementation 'io.projectreactor:reactor-test'
1322
testImplementation 'ch.qos.logback:logback-classic'
1423
testImplementation 'org.awaitility:awaitility'
24+
// for running tests with UDS on OSX
25+
testImplementation 'io.netty:netty-transport-native-kqueue:4.1.66.Final:osx-x86_64'
1526
}
1627

1728
shadowJar {

Diff for: implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ default boolean enabled() {
5959
}
6060

6161
/**
62-
* @return The host name of the StatsD agent.
62+
* @return Host (or socket in case of Unix domain socket protocol) to receive StatsD metrics.
6363
*/
6464
default String host() {
6565
return getString(this, "host").orElse("localhost");

Diff for: implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java

+9-8
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.micrometer.core.lang.Nullable;
2727
import io.micrometer.core.util.internal.logging.WarnThenDebugLogger;
2828
import io.micrometer.statsd.internal.*;
29+
import io.netty.channel.unix.DomainSocketAddress;
2930
import io.netty.util.AttributeKey;
3031
import org.reactivestreams.Publisher;
3132
import org.reactivestreams.Subscriber;
@@ -40,18 +41,17 @@
4041
import reactor.netty.udp.UdpClient;
4142
import reactor.util.retry.Retry;
4243

44+
import java.net.InetSocketAddress;
4345
import java.net.PortUnreachableException;
46+
import java.net.SocketAddress;
4447
import java.time.Duration;
4548
import java.util.Arrays;
4649
import java.util.Map;
4750
import java.util.concurrent.ConcurrentHashMap;
4851
import java.util.concurrent.TimeUnit;
4952
import java.util.concurrent.atomic.AtomicBoolean;
5053
import java.util.concurrent.atomic.AtomicReference;
51-
import java.util.function.Consumer;
52-
import java.util.function.Function;
53-
import java.util.function.ToDoubleFunction;
54-
import java.util.function.ToLongFunction;
54+
import java.util.function.*;
5555
import java.util.stream.DoubleStream;
5656

5757
/**
@@ -219,19 +219,20 @@ public void onComplete() {
219219
publisher = this.sink.asFlux();
220220
}
221221
if (statsdConfig.protocol() == StatsdProtocol.UDP) {
222-
prepareUdpClient(publisher);
222+
prepareUdpClient(publisher, () -> InetSocketAddress.createUnresolved(statsdConfig.host(), statsdConfig.port()));
223+
} else if (statsdConfig.protocol() == StatsdProtocol.UDS_DATAGRAM) {
224+
prepareUdpClient(publisher, () -> new DomainSocketAddress(statsdConfig.host()));
223225
} else if (statsdConfig.protocol() == StatsdProtocol.TCP) {
224226
prepareTcpClient(publisher);
225227
}
226228
}
227229
}
228230
}
229231

230-
private void prepareUdpClient(Publisher<String> publisher) {
232+
private void prepareUdpClient(Publisher<String> publisher, Supplier<SocketAddress> remoteAddress) {
231233
AtomicReference<UdpClient> udpClientReference = new AtomicReference<>();
232234
UdpClient udpClient = UdpClient.create()
233-
.host(statsdConfig.host())
234-
.port(statsdConfig.port())
235+
.remoteAddress(remoteAddress)
235236
.handle((in, out) -> out
236237
.sendString(publisher)
237238
.neverComplete()

Diff for: implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdProtocol.java

+2
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,7 @@
2323
*/
2424
public enum StatsdProtocol {
2525
UDP,
26+
/** Unix domain socket datagram */
27+
UDS_DATAGRAM,
2628
TCP
2729
}

Diff for: implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdConfigTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ void invalid() {
3737
assertThat(config.validate().failures().stream().map(Validated.Invalid::getMessage))
3838
.containsOnly(
3939
"should be one of 'ETSY', 'DATADOG', 'TELEGRAF', 'SYSDIG'",
40-
"should be one of 'UDP', 'TCP'",
40+
"should be one of 'UDP', 'UDS_DATAGRAM', 'TCP'",
4141
"must contain a valid time unit"
4242
)
4343
.hasSize(4);

Diff for: implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java

+51-15
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import io.netty.channel.ChannelHandlerContext;
2222
import io.netty.channel.ChannelOutboundHandlerAdapter;
2323
import io.netty.channel.ChannelPromise;
24+
import io.netty.channel.unix.DomainSocketAddress;
2425
import io.netty.handler.logging.LogLevel;
2526
import io.netty.handler.logging.LoggingHandler;
2627
import org.junit.jupiter.api.AfterEach;
2728
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.condition.OS;
2830
import org.junit.jupiter.params.ParameterizedTest;
2931
import org.junit.jupiter.params.provider.EnumSource;
3032
import reactor.core.Disposable;
@@ -34,6 +36,7 @@
3436
import reactor.netty.tcp.TcpServer;
3537
import reactor.netty.udp.UdpServer;
3638

39+
import java.io.File;
3740
import java.net.InetSocketAddress;
3841
import java.time.Duration;
3942
import java.util.concurrent.CountDownLatch;
@@ -44,6 +47,7 @@
4447

4548
import static org.assertj.core.api.Assertions.assertThat;
4649
import static org.awaitility.Awaitility.await;
50+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
4751

4852
/**
4953
* Tests {@link StatsdMeterRegistry} metrics publishing functionality.
@@ -52,6 +56,7 @@
5256
* @author Johnny Lim
5357
*/
5458
class StatsdMeterRegistryPublishTest {
59+
public static final String UDS_DATAGRAM_SOCKET_PATH = "/tmp/test-server.sock";
5560

5661
StatsdMeterRegistry meterRegistry;
5762
DisposableChannel server;
@@ -62,7 +67,9 @@ class StatsdMeterRegistryPublishTest {
6267

6368
@AfterEach
6469
void cleanUp() {
65-
meterRegistry.close();
70+
if (meterRegistry != null) {
71+
meterRegistry.close();
72+
}
6673
if (server != null) {
6774
server.disposeNow();
6875
}
@@ -71,10 +78,11 @@ void cleanUp() {
7178
@ParameterizedTest
7279
@EnumSource(StatsdProtocol.class)
7380
void receiveMetricsSuccessfully(StatsdProtocol protocol) throws InterruptedException {
81+
skipUdsTestOnWindows(protocol);
7482
serverLatch = new CountDownLatch(3);
7583
server = startServer(protocol, 0);
7684

77-
final int port = getPort();
85+
final int port = getPort(protocol);
7886

7987
meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
8088
startRegistryAndWaitForClient();
@@ -88,11 +96,12 @@ void receiveMetricsSuccessfully(StatsdProtocol protocol) throws InterruptedExcep
8896
@ParameterizedTest
8997
@EnumSource(StatsdProtocol.class)
9098
void resumeSendingMetrics_whenServerIntermittentlyFails(StatsdProtocol protocol) throws InterruptedException {
99+
skipUdsTestOnWindows(protocol);
91100
serverLatch = new CountDownLatch(1);
92101
AtomicInteger writeCount = new AtomicInteger();
93102
server = startServer(protocol, 0);
94103

95-
final int port = getPort();
104+
final int port = getPort(protocol);
96105

97106
meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
98107
startRegistryAndWaitForClient();
@@ -134,10 +143,11 @@ void resumeSendingMetrics_whenServerIntermittentlyFails(StatsdProtocol protocol)
134143
@EnumSource(StatsdProtocol.class)
135144
@Issue("#1676")
136145
void stopAndStartMeterRegistrySendsMetrics(StatsdProtocol protocol) throws InterruptedException {
146+
skipUdsTestOnWindows(protocol);
137147
serverLatch = new CountDownLatch(3);
138148
server = startServer(protocol, 0);
139149

140-
final int port = getPort();
150+
final int port = getPort(protocol);
141151

142152
meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
143153
startRegistryAndWaitForClient();
@@ -175,11 +185,12 @@ void stopAndStartMeterRegistryWithLineSink() throws InterruptedException {
175185
@ParameterizedTest
176186
@EnumSource(StatsdProtocol.class)
177187
void whenBackendInitiallyDown_metricsSentAfterBackendStarts(StatsdProtocol protocol) throws InterruptedException {
188+
skipUdsTestOnWindows(protocol);
178189
AtomicInteger writeCount = new AtomicInteger();
179190
serverLatch = new CountDownLatch(3);
180191
// start server to secure an open port
181192
server = startServer(protocol, 0);
182-
final int port = getPort();
193+
final int port = getPort(protocol);
183194
server.disposeNow();
184195
meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
185196
meterRegistry.start();
@@ -190,10 +201,10 @@ void whenBackendInitiallyDown_metricsSentAfterBackendStarts(StatsdProtocol proto
190201
await().until(() -> writeCount.get() == 3);
191202
}
192203
server = startServer(protocol, port);
193-
if (protocol == StatsdProtocol.TCP) {
194-
// client is null until TcpClient first connects
204+
if (protocol == StatsdProtocol.TCP || protocol == StatsdProtocol.UDS_DATAGRAM) {
205+
// client is null until connection established
195206
await().until(() -> meterRegistry.statsdConnection.get() != null);
196-
// TcpClient may take some time to reconnect to the server
207+
// client may take some time to reconnect to the server
197208
await().until(() -> !clientIsDisposed());
198209
}
199210
assertThat(serverLatch.getCount()).isEqualTo(3);
@@ -213,10 +224,11 @@ void whenBackendInitiallyDown_metricsSentAfterBackendStarts(StatsdProtocol proto
213224
@ParameterizedTest
214225
@EnumSource(StatsdProtocol.class)
215226
void whenRegistryStopped_doNotConnectToBackend(StatsdProtocol protocol) throws InterruptedException {
227+
skipUdsTestOnWindows(protocol);
216228
serverLatch = new CountDownLatch(3);
217229
// start server to secure an open port
218230
server = startServer(protocol, 0);
219-
final int port = getPort();
231+
final int port = getPort(protocol);
220232
meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
221233
startRegistryAndWaitForClient();
222234
server.disposeNow();
@@ -232,9 +244,10 @@ void whenRegistryStopped_doNotConnectToBackend(StatsdProtocol protocol) throws I
232244
@EnumSource(StatsdProtocol.class)
233245
@Issue("#2177")
234246
void whenSendError_reconnectsAndWritesNewMetrics(StatsdProtocol protocol) throws InterruptedException {
247+
skipUdsTestOnWindows(protocol);
235248
serverLatch = new CountDownLatch(3);
236249
server = startServer(protocol, 0);
237-
final int port = getPort();
250+
final int port = getPort(protocol);
238251
meterRegistry = new StatsdMeterRegistry(getUnbufferedConfig(protocol, port), Clock.SYSTEM);
239252
startRegistryAndWaitForClient();
240253
((Connection) meterRegistry.statsdConnection.get()).addHandler("writeFailure", new ChannelOutboundHandlerAdapter() {
@@ -255,15 +268,21 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
255268
await().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3)).until(() -> serverMetricReadCount.get() == 3);
256269
}
257270

258-
private int getPort() {
271+
private void skipUdsTestOnWindows(StatsdProtocol protocol) {
272+
if (protocol == StatsdProtocol.UDS_DATAGRAM)
273+
assumeTrue(!OS.WINDOWS.isCurrentOs());
274+
}
275+
276+
private int getPort(StatsdProtocol protocol) {
277+
if (protocol == StatsdProtocol.UDS_DATAGRAM) return 0;
259278
return ((InetSocketAddress) server.address()).getPort();
260279
}
261280

262281
private void trackWritesForUdpClient(StatsdProtocol protocol, AtomicInteger writeCount) {
263282
if (protocol == StatsdProtocol.UDP) {
264283
await().until(() -> meterRegistry.statsdConnection.get() != null);
265284
((Connection) meterRegistry.statsdConnection.get())
266-
.addHandler(new LoggingHandler("udpclient", LogLevel.INFO))
285+
.addHandler(new LoggingHandler("testudpclient", LogLevel.INFO))
267286
.addHandler(new ChannelOutboundHandlerAdapter() {
268287
@Override
269288
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
@@ -284,10 +303,10 @@ private boolean clientIsDisposed() {
284303
}
285304

286305
private DisposableChannel startServer(StatsdProtocol protocol, int port) {
287-
if (protocol == StatsdProtocol.UDP) {
306+
if (protocol == StatsdProtocol.UDP || protocol == StatsdProtocol.UDS_DATAGRAM) {
288307
return UdpServer.create()
289-
.host("localhost")
290-
.port(port)
308+
.bindAddress(() -> protocol == StatsdProtocol.UDP ? InetSocketAddress.createUnresolved("localhost", port)
309+
: newDomainSocketAddress())
291310
.handle((in, out) ->
292311
in.receive().asString()
293312
.flatMap(packet -> {
@@ -328,13 +347,30 @@ private DisposableChannel startServer(StatsdProtocol protocol, int port) {
328347
}
329348
}
330349

350+
private static DomainSocketAddress newDomainSocketAddress() {
351+
try {
352+
File tempFile = new File(UDS_DATAGRAM_SOCKET_PATH);
353+
tempFile.delete();
354+
tempFile.deleteOnExit();
355+
return new DomainSocketAddress(tempFile);
356+
}
357+
catch (Exception e) {
358+
throw new RuntimeException("Error creating a temporary file", e);
359+
}
360+
}
361+
331362
private StatsdConfig getUnbufferedConfig(StatsdProtocol protocol, int port) {
332363
return new StatsdConfig() {
333364
@Override
334365
public String get(String key) {
335366
return null;
336367
}
337368

369+
@Override
370+
public String host() {
371+
return protocol == StatsdProtocol.UDS_DATAGRAM ? UDS_DATAGRAM_SOCKET_PATH : "localhost";
372+
}
373+
338374
@Override
339375
public int port() {
340376
return port;

Diff for: samples/micrometer-samples-core/build.gradle

+5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@ plugins {
22
id 'java'
33
}
44

5+
repositories {
6+
// TODO remove when reactor-netty 1.0.10 is released
7+
maven { url 'https://repo.spring.io/snapshot' }
8+
}
9+
510
dependencies {
611
implementation platform('io.projectreactor:reactor-bom:2020.0.+')
712

0 commit comments

Comments
 (0)