diff --git a/core/src/main/java/dev/failsafe/internal/BulkheadExecutor.java b/core/src/main/java/dev/failsafe/internal/BulkheadExecutor.java index b7c42c5f..419fe2f9 100644 --- a/core/src/main/java/dev/failsafe/internal/BulkheadExecutor.java +++ b/core/src/main/java/dev/failsafe/internal/BulkheadExecutor.java @@ -24,6 +24,7 @@ import dev.failsafe.spi.Scheduler; import java.time.Duration; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -62,22 +63,33 @@ protected CompletableFuture> preExecuteAsync(Scheduler schedu CompletableFuture> promise = new CompletableFuture<>(); CompletableFuture acquireFuture = bulkhead.acquirePermitAsync(); acquireFuture.whenComplete((result, error) -> { - // Signal for execution to proceed - promise.complete(ExecutionResult.none()); + if (error instanceof CancellationException) { + // Cancellation of acquireFuture future means either cancellation in the scheduler (in which case we probably + // do not care about the result too much), or cancellation because we reached maxWaitTime (see below) - in which case + // we want to inform the user with BulkheadFullException. + promise.complete(ExecutionResult.exception(new BulkheadFullException(bulkhead))); + } else { + // Signal for execution to proceed + promise.complete(ExecutionResult.none()); + } }); if (!promise.isDone()) { try { // Schedule bulkhead permit timeout Future timeoutFuture = scheduler.schedule(() -> { - promise.complete(ExecutionResult.exception(new BulkheadFullException(bulkhead))); + // Note: we cannot call `promise.complete` here directly. Doing so would result in a following race condition: + // * `promise` would be considered failed (i.e. caller would think thar no permit was acquired) + // * but some other thread may release permit before we call `acquireFuture.cancel` - resulting in `acquireFuture` being completed + // successfully (and permit acquired). But since `promise` is already completed that fact is ignored. + // This discrepancy would lead to permits 'leaking'. So instead we make `acquireFuture.whenComplete` be the only way + // to complete `promise` and here we just signal to that code that `promise` should return BulkheadFullException. acquireFuture.cancel(true); return null; }, maxWaitTime.toNanos(), TimeUnit.NANOSECONDS); // Propagate outer cancellations to the promise, bulkhead acquire future, and timeout future future.setCancelFn(this, (mayInterrupt, cancelResult) -> { - promise.complete(cancelResult); acquireFuture.cancel(mayInterrupt); timeoutFuture.cancel(mayInterrupt); }); diff --git a/core/src/main/java/dev/failsafe/internal/BulkheadImpl.java b/core/src/main/java/dev/failsafe/internal/BulkheadImpl.java index 477532ef..33d6cc02 100644 --- a/core/src/main/java/dev/failsafe/internal/BulkheadImpl.java +++ b/core/src/main/java/dev/failsafe/internal/BulkheadImpl.java @@ -96,14 +96,30 @@ synchronized CompletableFuture acquirePermitAsync() { @Override public synchronized void releasePermit() { - if (permits < maxPermits) { - permits += 1; - CompletableFuture future = futures.pollFirst(); - if (future != null){ - permits -= 1; - future.complete(null); + if (permits < maxPermits) { + permits += 1; + /* + * It is possible to get future from the list that already had been completed. This + * happens because setting future to 'completed' state happens before (and not + * atomically with) removing future from the list. Handle this by pulling futures from + * the list until we find one we can complete (or reach the end of the list). Not doing + * this may result in 'dandling' messages in the list that are never completed. For some + * details see FutureLinkedList.add - how it returns a future that weill remove entry + * from the list when it is completed. And also see BulkheadExecutor.preExecuteAsync + * that calls acquirePermitAsync and gets that future in response. + */ + while (true) { + CompletableFuture future = futures.pollFirst(); + if (future == null) { + break; + } + permits -= 1; + if (future.complete(null)) { + break; + } + permits += 1; + } } - } } @Override diff --git a/core/src/main/java/dev/failsafe/internal/util/FutureLinkedList.java b/core/src/main/java/dev/failsafe/internal/util/FutureLinkedList.java index 2794ed14..addcb0b7 100644 --- a/core/src/main/java/dev/failsafe/internal/util/FutureLinkedList.java +++ b/core/src/main/java/dev/failsafe/internal/util/FutureLinkedList.java @@ -67,6 +67,18 @@ public synchronized CompletableFuture pollFirst() { return previousHead == null ? null : previousHead.future; } + /* + * This looks dodgy: we are 'leaking' reference to the node object via future.whenComplete, so + * this can end up being called for a node that has already been removed. This could have caused + * problems. But in reality it currently would not because the only way to remove a node is via + * pollFirst - i.e. by polling from the head of the list. This means that if node passed to this + * function has already been removed it would imply that it's 'previous' field is always null + * (it was in the head of the list before removal). And it's 'next' points to current head of + * the list, so when we replace node.next.previous with node.previous we always replace null + * with null. This whole assumption would break it this list allowed to add from the head of the + * list, or remove from the tail or middle. So this is somewhat fragile, but currently seems to + * be working fine. + */ private synchronized void remove(Node node) { if (node.previous != null) node.previous.next = node.next; diff --git a/core/src/test/java/dev/failsafe/functional/BulkheadTest.java b/core/src/test/java/dev/failsafe/functional/BulkheadTest.java index 79be225a..9bc8c2d8 100644 --- a/core/src/test/java/dev/failsafe/functional/BulkheadTest.java +++ b/core/src/test/java/dev/failsafe/functional/BulkheadTest.java @@ -25,8 +25,14 @@ import org.testng.annotations.Test; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.testng.Assert.assertTrue; /** * Tests various Bulkhead scenarios. @@ -96,4 +102,46 @@ public void testMaxWaitTimeExceeded() { testRunFailure(Failsafe.with(bulkhead), ctx -> { }, BulkheadFullException.class); } + + @Test + public void testPermitsLeak() throws InterruptedException { + // We verify against leak of permits because of a race condition that only happens when maxWaitTime is not zero. + Bulkhead bulkhead = Bulkhead.builder(1).withMaxWaitTime(Duration.ofMillis(1)).build(); + FailsafeExecutor failsafe = Failsafe.with(bulkhead); + + AtomicInteger errors = new AtomicInteger(); + List threads = new ArrayList<>(); + for (int i = 0; i < 30; i++) { + threads.add(new Thread(() -> { + for (int j = 0; j < 30; j++) { + try { + failsafe.getStageAsync(() -> { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + log(getClass(), "Interrupted sleep", e); + } + return null; + }).join(); // Submit work to the bulkhead + } catch (CompletionException e) { + errors.incrementAndGet(); + } + } + })); + } + + // Start and join the threads + threads.forEach(Thread::start); + for (Thread t : threads) { + t.join(); + } + + // Wait for the executor to finish all work. 250ms is plenty of time to finish the work submitted above. + Thread.sleep(250); + + // Make sure this run doesn't fail + failsafe.getStageAsync(() -> null).join(); + + assertTrue(errors.get() > 0, "Should have some errors because maxWaitTime is very small"); + } }