Skip to content

Commit ff5ae3f

Browse files
authored
Add read timeout support to BoltConnection (#1612)
1 parent aa30d32 commit ff5ae3f

File tree

7 files changed

+86
-5
lines changed

7 files changed

+86
-5
lines changed

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/BoltConnectionImpl.java

+10
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,11 @@ public CompletionStage<Void> close() {
487487
return close.exceptionally(ignored -> null);
488488
}
489489

490+
@Override
491+
public CompletionStage<Void> setReadTimeout(Duration duration) {
492+
return executeInEventLoop(() -> connection.setReadTimeout(duration));
493+
}
494+
490495
@Override
491496
public BoltConnectionState state() {
492497
var state = stateRef.get();
@@ -528,6 +533,11 @@ public boolean serverSideRoutingEnabled() {
528533
return serverSideRouting;
529534
}
530535

536+
@Override
537+
public Optional<Duration> defaultReadTimeout() {
538+
return connection.defaultReadTimeoutMillis();
539+
}
540+
531541
private CompletionStage<Void> executeInEventLoop(Runnable runnable) {
532542
var executeFuture = new CompletableFuture<Void>();
533543
Runnable stageCompletingRunnable = () -> {

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/async/NetworkConnection.java

+30-5
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import io.netty.channel.ChannelFutureListener;
2424
import io.netty.channel.ChannelHandler;
2525
import io.netty.channel.EventLoop;
26+
import java.time.Duration;
2627
import java.util.Collections;
28+
import java.util.Optional;
2729
import java.util.concurrent.CompletableFuture;
2830
import java.util.concurrent.CompletionStage;
2931
import java.util.concurrent.TimeUnit;
@@ -56,7 +58,8 @@ public class NetworkConnection implements Connection {
5658
private final boolean ssrEnabled;
5759
private final BoltProtocol protocol;
5860

59-
private final Long connectionReadTimeout;
61+
private final Duration defaultReadTimeout;
62+
private Duration readTimeout;
6063

6164
private ChannelHandler connectionReadTimeoutHandler;
6265

@@ -70,8 +73,10 @@ public NetworkConnection(Channel channel, LoggingProvider logging) {
7073
this.telemetryEnabled = ChannelAttributes.telemetryEnabled(channel);
7174
this.ssrEnabled = ChannelAttributes.ssrEnabled(channel);
7275
this.protocol = BoltProtocol.forChannel(channel);
73-
this.connectionReadTimeout =
74-
ChannelAttributes.connectionReadTimeout(channel).orElse(null);
76+
this.defaultReadTimeout = ChannelAttributes.connectionReadTimeout(channel)
77+
.map(Duration::ofSeconds)
78+
.orElse(null);
79+
this.readTimeout = defaultReadTimeout;
7580
}
7681

7782
@Override
@@ -179,6 +184,25 @@ public EventLoop eventLoop() {
179184
return channel.eventLoop();
180185
}
181186

187+
@Override
188+
public Optional<Duration> defaultReadTimeoutMillis() {
189+
return Optional.ofNullable(defaultReadTimeout);
190+
}
191+
192+
@Override
193+
public void setReadTimeout(Duration duration) {
194+
if (!channel.eventLoop().inEventLoop()) {
195+
throw new IllegalStateException("This method may only be called in the EventLoop");
196+
}
197+
198+
if (duration != null && duration.toMillis() > 0) {
199+
// only values greater than zero milliseconds are supported
200+
this.readTimeout = duration;
201+
} else {
202+
this.readTimeout = this.defaultReadTimeout;
203+
}
204+
}
205+
182206
private CompletionStage<Void> writeMessageInEventLoop(Message message, ResponseHandler handler) {
183207
var future = new CompletableFuture<Void>();
184208
Runnable runnable = () -> {
@@ -215,8 +239,9 @@ private void registerConnectionReadTimeout(Channel channel) {
215239
throw new IllegalStateException("This method may only be called in the EventLoop");
216240
}
217241

218-
if (connectionReadTimeout != null && connectionReadTimeoutHandler == null) {
219-
connectionReadTimeoutHandler = new ConnectionReadTimeoutHandler(connectionReadTimeout, TimeUnit.SECONDS);
242+
if (this.readTimeout != null && connectionReadTimeoutHandler == null) {
243+
connectionReadTimeoutHandler =
244+
new ConnectionReadTimeoutHandler(readTimeout.toMillis(), TimeUnit.MILLISECONDS);
220245
channel.pipeline().addFirst(connectionReadTimeoutHandler);
221246
log.log(System.Logger.Level.DEBUG, "Added ConnectionReadTimeoutHandler");
222247
messageDispatcher.setBeforeLastHandlerHook(() -> {

bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/impl/spi/Connection.java

+6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.neo4j.driver.internal.bolt.basicimpl.impl.spi;
1818

1919
import io.netty.channel.EventLoop;
20+
import java.time.Duration;
21+
import java.util.Optional;
2022
import java.util.concurrent.CompletionStage;
2123
import org.neo4j.driver.internal.bolt.api.BoltServerAddress;
2224
import org.neo4j.driver.internal.bolt.basicimpl.impl.messaging.BoltProtocol;
@@ -48,4 +50,8 @@ public interface Connection {
4850
CompletionStage<Void> close();
4951

5052
EventLoop eventLoop();
53+
54+
Optional<Duration> defaultReadTimeoutMillis();
55+
56+
void setReadTimeout(Duration duration);
5157
}

bolt-api-pooled/src/main/java/org/neo4j/driver/internal/bolt/pooledimpl/impl/PooledBoltConnection.java

+11
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Duration;
2020
import java.util.Map;
2121
import java.util.Objects;
22+
import java.util.Optional;
2223
import java.util.Set;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.CompletionStage;
@@ -316,6 +317,11 @@ public CompletionStage<Void> close() {
316317
return closeFuture;
317318
}
318319

320+
@Override
321+
public CompletionStage<Void> setReadTimeout(Duration duration) {
322+
return delegate.setReadTimeout(duration);
323+
}
324+
319325
@Override
320326
public BoltConnectionState state() {
321327
return delegate.state();
@@ -351,6 +357,11 @@ public boolean serverSideRoutingEnabled() {
351357
return delegate.serverSideRoutingEnabled();
352358
}
353359

360+
@Override
361+
public Optional<Duration> defaultReadTimeout() {
362+
return delegate.defaultReadTimeout();
363+
}
364+
354365
// internal use only
355366
public BoltConnection delegate() {
356367
return delegate;

bolt-api-routed/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/impl/RoutedBoltConnection.java

+11
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.time.Duration;
2222
import java.util.Map;
2323
import java.util.Objects;
24+
import java.util.Optional;
2425
import java.util.Set;
2526
import java.util.concurrent.CompletionStage;
2627
import org.neo4j.driver.internal.bolt.api.AccessMode;
@@ -273,6 +274,11 @@ public CompletionStage<Void> close() {
273274
return delegate.close();
274275
}
275276

277+
@Override
278+
public CompletionStage<Void> setReadTimeout(Duration duration) {
279+
return delegate.setReadTimeout(duration);
280+
}
281+
276282
@Override
277283
public BoltConnectionState state() {
278284
return delegate.state();
@@ -308,6 +314,11 @@ public boolean serverSideRoutingEnabled() {
308314
return delegate.serverSideRoutingEnabled();
309315
}
310316

317+
@Override
318+
public Optional<Duration> defaultReadTimeout() {
319+
return delegate.defaultReadTimeout();
320+
}
321+
311322
private Throwable handledError(Throwable receivedError, boolean notifyHandler) {
312323
var error = FutureUtil.completionExceptionCause(receivedError);
313324

bolt-api/src/main/java/org/neo4j/driver/internal/bolt/api/BoltConnection.java

+7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.time.Duration;
2020
import java.util.Map;
21+
import java.util.Optional;
2122
import java.util.Set;
2223
import java.util.concurrent.CompletionStage;
2324
import org.neo4j.driver.internal.bolt.api.values.Value;
@@ -75,6 +76,10 @@ CompletionStage<BoltConnection> runInAutoCommitTransaction(
7576

7677
CompletionStage<Void> close();
7778

79+
// ----- STATE UPDATES -----
80+
81+
CompletionStage<Void> setReadTimeout(Duration duration);
82+
7883
// ----- MUTABLE DATA -----
7984

8085
BoltConnectionState state();
@@ -92,4 +97,6 @@ CompletionStage<BoltConnection> runInAutoCommitTransaction(
9297
boolean telemetrySupported();
9398

9499
boolean serverSideRoutingEnabled();
100+
101+
Optional<Duration> defaultReadTimeout();
95102
}

driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnection.java

+11
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Duration;
2020
import java.util.Map;
2121
import java.util.Objects;
22+
import java.util.Optional;
2223
import java.util.Set;
2324
import java.util.concurrent.CompletionStage;
2425
import org.neo4j.driver.internal.bolt.api.AccessMode;
@@ -168,6 +169,11 @@ public CompletionStage<Void> close() {
168169
return delegate.close().whenComplete((ignored, throwable) -> boltConnectionListener.onClose(this));
169170
}
170171

172+
@Override
173+
public CompletionStage<Void> setReadTimeout(Duration duration) {
174+
return delegate.setReadTimeout(duration);
175+
}
176+
171177
@Override
172178
public BoltConnectionState state() {
173179
return delegate.state();
@@ -202,4 +208,9 @@ public boolean telemetrySupported() {
202208
public boolean serverSideRoutingEnabled() {
203209
return delegate.serverSideRoutingEnabled();
204210
}
211+
212+
@Override
213+
public Optional<Duration> defaultReadTimeout() {
214+
return delegate.defaultReadTimeout();
215+
}
205216
}

0 commit comments

Comments
 (0)