diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index 2a886704cd..afef8a59d4 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -447,6 +447,9 @@ public TimeoutContext withComputedServerSelectionTimeoutContext() { } public Timeout startWaitQueueTimeout(final StartTime checkoutStart) { + if (hasTimeoutMS()) { + return assertNotNull(timeout); + } final long ms = getTimeoutSettings().getMaxWaitTimeMS(); return checkoutStart.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java index 0ef94d559c..664f886075 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java @@ -42,6 +42,7 @@ import com.mongodb.event.ConnectionPoolListener; import com.mongodb.event.ConnectionPoolReadyEvent; import com.mongodb.event.ConnectionReadyEvent; +import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.VisibleForTesting; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.connection.SdamServerDescriptionManager.SdamIssue; @@ -98,6 +99,7 @@ import static com.mongodb.event.ConnectionClosedEvent.Reason.ERROR; import static com.mongodb.internal.Locks.lockInterruptibly; import static com.mongodb.internal.Locks.withLock; +import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException; import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; import static com.mongodb.internal.connection.ConcurrentPool.INFINITE_SIZE; @@ -193,7 +195,7 @@ public InternalConnection get(final OperationContext operationContext) { Timeout waitQueueTimeout = operationContext.getTimeoutContext().startWaitQueueTimeout(checkoutStart); try { stateAndGeneration.throwIfClosedOrPaused(); - PooledConnection connection = getPooledConnection(waitQueueTimeout, checkoutStart); + PooledConnection connection = getPooledConnection(waitQueueTimeout, checkoutStart, operationContext); if (!connection.opened()) { connection = openConcurrencyLimiter.openOrGetAvailable(operationContext, connection, waitQueueTimeout, checkoutStart); } @@ -208,7 +210,7 @@ public InternalConnection get(final OperationContext operationContext) { @Override public void getAsync(final OperationContext operationContext, final SingleResultCallback callback) { StartTime checkoutStart = connectionCheckoutStarted(operationContext); - Timeout maxWaitTimeout = checkoutStart.timeoutAfterOrInfiniteIfNegative(settings.getMaxWaitTime(NANOSECONDS), NANOSECONDS); + Timeout maxWaitTimeout = operationContext.getTimeoutContext().startWaitQueueTimeout(checkoutStart); SingleResultCallback eventSendingCallback = (connection, failure) -> { SingleResultCallback errHandlingCallback = errorHandlingCallback(callback, LOGGER); if (failure == null) { @@ -225,13 +227,13 @@ public void getAsync(final OperationContext operationContext, final SingleResult eventSendingCallback.onResult(null, e); return; } - asyncWorkManager.enqueue(new Task(maxWaitTimeout, checkoutStart, t -> { + asyncWorkManager.enqueue(new Task(maxWaitTimeout, checkoutStart, operationContext, t -> { if (t != null) { eventSendingCallback.onResult(null, t); } else { PooledConnection connection; try { - connection = getPooledConnection(maxWaitTimeout, checkoutStart); + connection = getPooledConnection(maxWaitTimeout, checkoutStart, operationContext); } catch (Exception e) { eventSendingCallback.onResult(null, e); return; @@ -330,7 +332,9 @@ public int getGeneration() { return stateAndGeneration.generation(); } - private PooledConnection getPooledConnection(final Timeout waitQueueTimeout, final StartTime startTime) throws MongoTimeoutException { + private PooledConnection getPooledConnection(final Timeout waitQueueTimeout, + final StartTime startTime, + final OperationContext operationContext) throws MongoTimeoutException { try { UsageTrackingInternalConnection internalConnection = waitQueueTimeout.call(NANOSECONDS, () -> pool.get(-1L, NANOSECONDS), @@ -345,7 +349,7 @@ private PooledConnection getPooledConnection(final Timeout waitQueueTimeout, fin } return new PooledConnection(internalConnection); } catch (MongoTimeoutException e) { - throw createTimeoutException(startTime); + throw createTimeoutException(startTime, operationContext.getTimeoutContext()); } } @@ -359,13 +363,14 @@ private PooledConnection getPooledConnectionImmediate() { return internalConnection == null ? null : new PooledConnection(internalConnection); } - private MongoTimeoutException createTimeoutException(final StartTime startTime) { + private MongoTimeoutException createTimeoutException(final StartTime startTime, final TimeoutContext timeoutContext) { long elapsedMs = startTime.elapsed().toMillis(); int numPinnedToCursor = pinnedStatsManager.getNumPinnedToCursor(); int numPinnedToTransaction = pinnedStatsManager.getNumPinnedToTransaction(); if (numPinnedToCursor == 0 && numPinnedToTransaction == 0) { - return new MongoTimeoutException(format("Timed out after %d ms while waiting for a connection to server %s.", - elapsedMs, serverId.getAddress())); + String errorMessage = format("Timed out after %d ms while waiting for a connection to server %s.", + elapsedMs, serverId.getAddress()); + return timeoutContext.hasTimeoutMS() ? createMongoTimeoutException(errorMessage) : new MongoTimeoutException(errorMessage); } else { int maxSize = pool.getMaxSize(); int numInUse = pool.getInUseCount(); @@ -394,12 +399,13 @@ private MongoTimeoutException createTimeoutException(final StartTime startTime) int numOtherInUse = numInUse - numPinnedToCursor - numPinnedToTransaction; assertTrue(numOtherInUse >= 0); assertTrue(numPinnedToCursor + numPinnedToTransaction + numOtherInUse <= maxSize); - return new MongoTimeoutException(format("Timed out after %d ms while waiting for a connection to server %s. Details: " + String errorMessage = format("Timed out after %d ms while waiting for a connection to server %s. Details: " + "maxPoolSize: %s, connections in use by cursors: %d, connections in use by transactions: %d, " + "connections in use by other operations: %d", elapsedMs, serverId.getAddress(), sizeToString(maxSize), numPinnedToCursor, numPinnedToTransaction, - numOtherInUse)); + numOtherInUse); + return timeoutContext.hasTimeoutMS() ? createMongoTimeoutException(errorMessage) : new MongoTimeoutException(errorMessage); } } @@ -965,7 +971,7 @@ private PooledConnection openWithConcurrencyLimit(final OperationContext operati PooledConnection availableConnection; try {//phase one availableConnection = acquirePermitOrGetAvailableOpenedConnection( - mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, waitQueueTimeout, startTime); + mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, waitQueueTimeout, startTime, operationContext); } catch (Exception e) { connection.closeSilently(); throw e; @@ -1007,7 +1013,7 @@ void openWithConcurrencyLimitAsync( final SingleResultCallback callback) { PooledConnection availableConnection; try {//phase one - availableConnection = acquirePermitOrGetAvailableOpenedConnection(true, maxWaitTimeout, startTime); + availableConnection = acquirePermitOrGetAvailableOpenedConnection(true, maxWaitTimeout, startTime, operationContext); } catch (Exception e) { connection.closeSilently(); callback.onResult(null, e); @@ -1038,7 +1044,8 @@ void openWithConcurrencyLimitAsync( */ @Nullable private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boolean tryGetAvailable, - final Timeout waitQueueTimeout, final StartTime startTime) + final Timeout waitQueueTimeout, final StartTime startTime, + final OperationContext operationContext) throws MongoTimeoutException, MongoInterruptedException { PooledConnection availableConnection = null; boolean expressedDesireToGetAvailableConnection = false; @@ -1067,7 +1074,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole & (availableConnection = tryGetAvailable ? tryGetAvailableConnection() : null) == null) { Timeout.onExistsAndExpired(waitQueueTimeout, () -> { - throw createTimeoutException(startTime); + throw createTimeoutException(startTime, operationContext.getTimeoutContext()); }); waitQueueTimeout.awaitOn(permitAvailableOrHandedOverOrClosedOrPausedCondition, () -> "acquiring permit or getting available opened connection"); @@ -1389,10 +1396,15 @@ final class Task { private final Timeout timeout; private final StartTime startTime; private final Consumer action; + private final OperationContext operationContext; private boolean completed; - Task(final Timeout timeout, final StartTime startTime, final Consumer action) { + Task(final Timeout timeout, + final StartTime startTime, + final OperationContext operationContext, + final Consumer action) { this.timeout = timeout; + this.operationContext = operationContext; this.startTime = startTime; this.action = action; } @@ -1406,7 +1418,7 @@ void failAsClosed() { } void failAsTimedOut() { - doComplete(() -> createTimeoutException(startTime)); + doComplete(() -> createTimeoutException(startTime, operationContext.getTimeoutContext())); } private void doComplete(final Supplier failureSupplier) { diff --git a/driver-core/src/main/com/mongodb/internal/time/Timeout.java b/driver-core/src/main/com/mongodb/internal/time/Timeout.java index 3dba42e580..c497f08945 100644 --- a/driver-core/src/main/com/mongodb/internal/time/Timeout.java +++ b/driver-core/src/main/com/mongodb/internal/time/Timeout.java @@ -21,8 +21,8 @@ import com.mongodb.internal.function.CheckedFunction; import com.mongodb.internal.function.CheckedRunnable; import com.mongodb.internal.function.CheckedSupplier; -import com.mongodb.lang.Nullable; import com.mongodb.lang.NonNull; +import com.mongodb.lang.Nullable; import java.util.Arrays; import java.util.Collections; @@ -40,6 +40,8 @@ /** * A Timeout is a "deadline", point in time by which something must happen. * + * Implementations of this interface must be immutable. + * * @see TimePoint */ public interface Timeout { diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java index 56122ec64a..c6e049ba0f 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java @@ -66,11 +66,13 @@ import static com.mongodb.ClusterFixture.createOperationContext; import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_EXPIRED; import static java.lang.Long.MAX_VALUE; +import static java.lang.Thread.sleep; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -133,6 +135,33 @@ public void shouldThrowOnTimeout() throws InterruptedException { assertTrue(connectionGetter.isGotTimeout()); } + @Test + public void shouldNotUseMaxAwaitTimeMSOnWhenTimeoutMsIsSet() throws InterruptedException { + // given + provider = new DefaultConnectionPool(SERVER_ID, connectionFactory, + ConnectionPoolSettings.builder() + .maxSize(1) + .build(), + mockSdamProvider(), OPERATION_CONTEXT_FACTORY); + provider.ready(); + TimeoutSettings timeoutSettings = TIMEOUT_SETTINGS + .withTimeout(100L, MILLISECONDS) + .withMaxWaitTimeMS(50); + + InternalConnection internalConnection = provider.get(createOperationContext(timeoutSettings)); + + // when + TimeoutTrackingConnectionGetter connectionGetter = new TimeoutTrackingConnectionGetter(provider, timeoutSettings); + new Thread(connectionGetter).start(); + + sleep(70); // wait for more than maxWaitTimeMS but less than timeoutMs. + internalConnection.close(); + connectionGetter.getLatch().await(); + + // then + assertFalse(connectionGetter.isGotTimeout()); + } + @Test public void shouldThrowOnPoolClosed() { provider = new DefaultConnectionPool(SERVER_ID, connectionFactory, @@ -166,7 +195,7 @@ public void shouldExpireConnectionAfterMaxLifeTime() throws InterruptedException // when provider.get(OPERATION_CONTEXT).close(); - Thread.sleep(100); + sleep(100); provider.doMaintenance(); provider.get(OPERATION_CONTEXT); @@ -187,7 +216,7 @@ public void shouldExpireConnectionAfterLifeTimeOnClose() throws InterruptedExcep // when InternalConnection connection = provider.get(OPERATION_CONTEXT); - Thread.sleep(50); + sleep(50); connection.close(); // then @@ -208,7 +237,7 @@ public void shouldExpireConnectionAfterMaxIdleTime() throws InterruptedException // when provider.get(OPERATION_CONTEXT).close(); - Thread.sleep(100); + sleep(100); provider.doMaintenance(); provider.get(OPERATION_CONTEXT); @@ -230,7 +259,7 @@ public void shouldCloseConnectionAfterExpiration() throws InterruptedException { // when provider.get(OPERATION_CONTEXT).close(); - Thread.sleep(50); + sleep(50); provider.doMaintenance(); provider.get(OPERATION_CONTEXT); @@ -252,7 +281,7 @@ public void shouldCreateNewConnectionAfterExpiration() throws InterruptedExcepti // when provider.get(OPERATION_CONTEXT).close(); - Thread.sleep(50); + sleep(50); provider.doMaintenance(); InternalConnection secondConnection = provider.get(OPERATION_CONTEXT); @@ -277,7 +306,7 @@ public void shouldPruneAfterMaintenanceTaskRuns() throws InterruptedException { // when - Thread.sleep(10); + sleep(10); provider.doMaintenance(); // then @@ -594,7 +623,7 @@ private static void useConcurrently(final DefaultConnectionPool pool, final int */ private static void sleepMillis(final long millis) { try { - Thread.sleep(millis); + sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java index 8eb47aa0a6..2db1239177 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java @@ -72,6 +72,8 @@ import java.time.Instant; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -781,6 +783,118 @@ public void shouldIgnoreWtimeoutMsOfWriteConcernToInitialAndSubsequentCommitTran }}); } + /** + * Not a prose spec test. However, it is additional test case for better coverage. + */ + @Test + @DisplayName("Should ignore waitQueueTimeoutMS when timeoutMS is set") + public void shouldIgnoreWaitQueueTimeoutMSWhenTimeoutMsIsSet() { + assumeTrue(serverVersionAtLeast(4, 4)); + + //given + try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() + .timeout(500, TimeUnit.MILLISECONDS) + .applyToConnectionPoolSettings(builder -> builder + .maxWaitTime(1, TimeUnit.MILLISECONDS) + .maxSize(1) + ))) { + MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 1}," + + " data: {" + + " failCommands: [\"find\" ]," + + " blockConnection: true," + + " blockTimeMS: " + 300 + + " }" + + "}"); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + + executor.submit(() -> collection.find().first()); + sleep(100); + + //when && then + assertDoesNotThrow(() -> collection.find().first()); + } + } + + /** + * Not a prose spec test. However, it is additional test case for better coverage. + */ + @Test + @DisplayName("Should throw MongoOperationTimeoutException when connection is not available and timeoutMS is set") + public void shouldThrowOperationTimeoutExceptionWhenConnectionIsNotAvailableAndTimeoutMSIsSet() { + assumeTrue(serverVersionAtLeast(4, 4)); + + //given + try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() + .timeout(100, TimeUnit.MILLISECONDS) + .applyToConnectionPoolSettings(builder -> builder + .maxWaitTime(1, TimeUnit.MILLISECONDS) + .maxSize(1) + ))) { + MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 1}," + + " data: {" + + " failCommands: [\"find\" ]," + + " blockConnection: true," + + " blockTimeMS: " + 500 + + " }" + + "}"); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + + executor.submit(() -> collection.withTimeout(0, TimeUnit.MILLISECONDS).find().first()); + sleep(100); + + //when && then + assertThrows(MongoOperationTimeoutException.class, () -> collection.find().first()); + } + } + + /** + * Not a prose spec test. However, it is additional test case for better coverage. + */ + @Test + @DisplayName("Should ignore waitQueueTimeoutMS when timeoutMS is set") + public void shouldUseWaitQueueTimeoutMSWhenTimeoutIsNotSet() { + assumeTrue(serverVersionAtLeast(4, 4)); + + //given + try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() + .applyToConnectionPoolSettings(builder -> builder + .maxWaitTime(100, TimeUnit.MILLISECONDS) + .maxSize(1) + ))) { + MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 1}," + + " data: {" + + " failCommands: [\"find\" ]," + + " blockConnection: true," + + " blockTimeMS: " + 300 + + " }" + + "}"); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + + executor.submit(() -> collection.find().first()); + sleep(100); + + //when & then + assertThrows(MongoTimeoutException.class, () -> collection.find().first()); + } + } /** * Not a prose spec test. However, it is additional test case for better coverage.