Skip to content

Ignore maxWaitTime when CSOT is enabled. #1744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,9 @@ public TimeoutContext withComputedServerSelectionTimeoutContext() {
}

public Timeout startWaitQueueTimeout(final StartTime checkoutStart) {
if (hasTimeoutMS()) {
return assertNotNull(timeout);
}
final long ms = getTimeoutSettings().getMaxWaitTimeMS();
return checkoutStart.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.event.ConnectionPoolReadyEvent;
import com.mongodb.event.ConnectionReadyEvent;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.SdamServerDescriptionManager.SdamIssue;
Expand Down Expand Up @@ -98,6 +99,7 @@
import static com.mongodb.event.ConnectionClosedEvent.Reason.ERROR;
import static com.mongodb.internal.Locks.lockInterruptibly;
import static com.mongodb.internal.Locks.withLock;
import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.connection.ConcurrentPool.INFINITE_SIZE;
Expand Down Expand Up @@ -193,7 +195,7 @@ public InternalConnection get(final OperationContext operationContext) {
Timeout waitQueueTimeout = operationContext.getTimeoutContext().startWaitQueueTimeout(checkoutStart);
try {
stateAndGeneration.throwIfClosedOrPaused();
PooledConnection connection = getPooledConnection(waitQueueTimeout, checkoutStart);
PooledConnection connection = getPooledConnection(waitQueueTimeout, checkoutStart, operationContext);
if (!connection.opened()) {
connection = openConcurrencyLimiter.openOrGetAvailable(operationContext, connection, waitQueueTimeout, checkoutStart);
}
Expand All @@ -208,7 +210,7 @@ public InternalConnection get(final OperationContext operationContext) {
@Override
public void getAsync(final OperationContext operationContext, final SingleResultCallback<InternalConnection> callback) {
StartTime checkoutStart = connectionCheckoutStarted(operationContext);
Timeout maxWaitTimeout = checkoutStart.timeoutAfterOrInfiniteIfNegative(settings.getMaxWaitTime(NANOSECONDS), NANOSECONDS);
Timeout maxWaitTimeout = operationContext.getTimeoutContext().startWaitQueueTimeout(checkoutStart);
SingleResultCallback<PooledConnection> eventSendingCallback = (connection, failure) -> {
SingleResultCallback<InternalConnection> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
if (failure == null) {
Expand All @@ -225,13 +227,13 @@ public void getAsync(final OperationContext operationContext, final SingleResult
eventSendingCallback.onResult(null, e);
return;
}
asyncWorkManager.enqueue(new Task(maxWaitTimeout, checkoutStart, t -> {
asyncWorkManager.enqueue(new Task(maxWaitTimeout, checkoutStart, operationContext, t -> {
if (t != null) {
eventSendingCallback.onResult(null, t);
} else {
PooledConnection connection;
try {
connection = getPooledConnection(maxWaitTimeout, checkoutStart);
connection = getPooledConnection(maxWaitTimeout, checkoutStart, operationContext);
} catch (Exception e) {
eventSendingCallback.onResult(null, e);
return;
Expand Down Expand Up @@ -330,7 +332,9 @@ public int getGeneration() {
return stateAndGeneration.generation();
}

private PooledConnection getPooledConnection(final Timeout waitQueueTimeout, final StartTime startTime) throws MongoTimeoutException {
private PooledConnection getPooledConnection(final Timeout waitQueueTimeout,
final StartTime startTime,
final OperationContext operationContext) throws MongoTimeoutException {
try {
UsageTrackingInternalConnection internalConnection = waitQueueTimeout.call(NANOSECONDS,
() -> pool.get(-1L, NANOSECONDS),
Expand All @@ -345,7 +349,7 @@ private PooledConnection getPooledConnection(final Timeout waitQueueTimeout, fin
}
return new PooledConnection(internalConnection);
} catch (MongoTimeoutException e) {
throw createTimeoutException(startTime);
throw createTimeoutException(startTime, operationContext.getTimeoutContext());
}
}

Expand All @@ -359,13 +363,14 @@ private PooledConnection getPooledConnectionImmediate() {
return internalConnection == null ? null : new PooledConnection(internalConnection);
}

private MongoTimeoutException createTimeoutException(final StartTime startTime) {
private MongoTimeoutException createTimeoutException(final StartTime startTime, final TimeoutContext timeoutContext) {
long elapsedMs = startTime.elapsed().toMillis();
int numPinnedToCursor = pinnedStatsManager.getNumPinnedToCursor();
int numPinnedToTransaction = pinnedStatsManager.getNumPinnedToTransaction();
if (numPinnedToCursor == 0 && numPinnedToTransaction == 0) {
return new MongoTimeoutException(format("Timed out after %d ms while waiting for a connection to server %s.",
elapsedMs, serverId.getAddress()));
String errorMessage = format("Timed out after %d ms while waiting for a connection to server %s.",
elapsedMs, serverId.getAddress());
return timeoutContext.hasTimeoutMS() ? createMongoTimeoutException(errorMessage) : new MongoTimeoutException(errorMessage);
} else {
int maxSize = pool.getMaxSize();
int numInUse = pool.getInUseCount();
Expand Down Expand Up @@ -394,12 +399,13 @@ private MongoTimeoutException createTimeoutException(final StartTime startTime)
int numOtherInUse = numInUse - numPinnedToCursor - numPinnedToTransaction;
assertTrue(numOtherInUse >= 0);
assertTrue(numPinnedToCursor + numPinnedToTransaction + numOtherInUse <= maxSize);
return new MongoTimeoutException(format("Timed out after %d ms while waiting for a connection to server %s. Details: "
String errorMessage = format("Timed out after %d ms while waiting for a connection to server %s. Details: "
+ "maxPoolSize: %s, connections in use by cursors: %d, connections in use by transactions: %d, "
+ "connections in use by other operations: %d",
elapsedMs, serverId.getAddress(),
sizeToString(maxSize), numPinnedToCursor, numPinnedToTransaction,
numOtherInUse));
numOtherInUse);
return timeoutContext.hasTimeoutMS() ? createMongoTimeoutException(errorMessage) : new MongoTimeoutException(errorMessage);
}
}

Expand Down Expand Up @@ -965,7 +971,7 @@ private PooledConnection openWithConcurrencyLimit(final OperationContext operati
PooledConnection availableConnection;
try {//phase one
availableConnection = acquirePermitOrGetAvailableOpenedConnection(
mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, waitQueueTimeout, startTime);
mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, waitQueueTimeout, startTime, operationContext);
} catch (Exception e) {
connection.closeSilently();
throw e;
Expand Down Expand Up @@ -1007,7 +1013,7 @@ void openWithConcurrencyLimitAsync(
final SingleResultCallback<PooledConnection> callback) {
PooledConnection availableConnection;
try {//phase one
availableConnection = acquirePermitOrGetAvailableOpenedConnection(true, maxWaitTimeout, startTime);
availableConnection = acquirePermitOrGetAvailableOpenedConnection(true, maxWaitTimeout, startTime, operationContext);
} catch (Exception e) {
connection.closeSilently();
callback.onResult(null, e);
Expand Down Expand Up @@ -1038,7 +1044,8 @@ void openWithConcurrencyLimitAsync(
*/
@Nullable
private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boolean tryGetAvailable,
final Timeout waitQueueTimeout, final StartTime startTime)
final Timeout waitQueueTimeout, final StartTime startTime,
final OperationContext operationContext)
throws MongoTimeoutException, MongoInterruptedException {
PooledConnection availableConnection = null;
boolean expressedDesireToGetAvailableConnection = false;
Expand Down Expand Up @@ -1067,7 +1074,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
& (availableConnection = tryGetAvailable ? tryGetAvailableConnection() : null) == null) {

Timeout.onExistsAndExpired(waitQueueTimeout, () -> {
throw createTimeoutException(startTime);
throw createTimeoutException(startTime, operationContext.getTimeoutContext());
});
waitQueueTimeout.awaitOn(permitAvailableOrHandedOverOrClosedOrPausedCondition,
() -> "acquiring permit or getting available opened connection");
Expand Down Expand Up @@ -1389,10 +1396,15 @@ final class Task {
private final Timeout timeout;
private final StartTime startTime;
private final Consumer<RuntimeException> action;
private final OperationContext operationContext;
private boolean completed;

Task(final Timeout timeout, final StartTime startTime, final Consumer<RuntimeException> action) {
Task(final Timeout timeout,
final StartTime startTime,
final OperationContext operationContext,
final Consumer<RuntimeException> action) {
this.timeout = timeout;
this.operationContext = operationContext;
this.startTime = startTime;
this.action = action;
}
Expand All @@ -1406,7 +1418,7 @@ void failAsClosed() {
}

void failAsTimedOut() {
doComplete(() -> createTimeoutException(startTime));
doComplete(() -> createTimeoutException(startTime, operationContext.getTimeoutContext()));
}

private void doComplete(final Supplier<RuntimeException> failureSupplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import com.mongodb.internal.function.CheckedFunction;
import com.mongodb.internal.function.CheckedRunnable;
import com.mongodb.internal.function.CheckedSupplier;
import com.mongodb.lang.Nullable;
import com.mongodb.lang.NonNull;
import com.mongodb.lang.Nullable;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -40,6 +40,8 @@
/**
* A Timeout is a "deadline", point in time by which something must happen.
*
* Implementations of this interface must be immutable.
*
* @see TimePoint
*/
public interface Timeout {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@
import static com.mongodb.ClusterFixture.createOperationContext;
import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_EXPIRED;
import static java.lang.Long.MAX_VALUE;
import static java.lang.Thread.sleep;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -133,6 +135,33 @@ public void shouldThrowOnTimeout() throws InterruptedException {
assertTrue(connectionGetter.isGotTimeout());
}

@Test
public void shouldNotUseMaxAwaitTimeMSOnWhenTimeoutMsIsSet() throws InterruptedException {
// given
provider = new DefaultConnectionPool(SERVER_ID, connectionFactory,
ConnectionPoolSettings.builder()
.maxSize(1)
.build(),
mockSdamProvider(), OPERATION_CONTEXT_FACTORY);
provider.ready();
TimeoutSettings timeoutSettings = TIMEOUT_SETTINGS
.withTimeout(100L, MILLISECONDS)
.withMaxWaitTimeMS(50);

InternalConnection internalConnection = provider.get(createOperationContext(timeoutSettings));

// when
TimeoutTrackingConnectionGetter connectionGetter = new TimeoutTrackingConnectionGetter(provider, timeoutSettings);
new Thread(connectionGetter).start();

sleep(70); // wait for more than maxWaitTimeMS but less than timeoutMs.
internalConnection.close();
connectionGetter.getLatch().await();

// then
assertFalse(connectionGetter.isGotTimeout());
}

@Test
public void shouldThrowOnPoolClosed() {
provider = new DefaultConnectionPool(SERVER_ID, connectionFactory,
Expand Down Expand Up @@ -166,7 +195,7 @@ public void shouldExpireConnectionAfterMaxLifeTime() throws InterruptedException

// when
provider.get(OPERATION_CONTEXT).close();
Thread.sleep(100);
sleep(100);
provider.doMaintenance();
provider.get(OPERATION_CONTEXT);

Expand All @@ -187,7 +216,7 @@ public void shouldExpireConnectionAfterLifeTimeOnClose() throws InterruptedExcep

// when
InternalConnection connection = provider.get(OPERATION_CONTEXT);
Thread.sleep(50);
sleep(50);
connection.close();

// then
Expand All @@ -208,7 +237,7 @@ public void shouldExpireConnectionAfterMaxIdleTime() throws InterruptedException

// when
provider.get(OPERATION_CONTEXT).close();
Thread.sleep(100);
sleep(100);
provider.doMaintenance();
provider.get(OPERATION_CONTEXT);

Expand All @@ -230,7 +259,7 @@ public void shouldCloseConnectionAfterExpiration() throws InterruptedException {

// when
provider.get(OPERATION_CONTEXT).close();
Thread.sleep(50);
sleep(50);
provider.doMaintenance();
provider.get(OPERATION_CONTEXT);

Expand All @@ -252,7 +281,7 @@ public void shouldCreateNewConnectionAfterExpiration() throws InterruptedExcepti

// when
provider.get(OPERATION_CONTEXT).close();
Thread.sleep(50);
sleep(50);
provider.doMaintenance();
InternalConnection secondConnection = provider.get(OPERATION_CONTEXT);

Expand All @@ -277,7 +306,7 @@ public void shouldPruneAfterMaintenanceTaskRuns() throws InterruptedException {


// when
Thread.sleep(10);
sleep(10);
provider.doMaintenance();

// then
Expand Down Expand Up @@ -594,7 +623,7 @@ private static void useConcurrently(final DefaultConnectionPool pool, final int
*/
private static void sleepMillis(final long millis) {
try {
Thread.sleep(millis);
sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down
Loading