Skip to content

HADOOP-19569. S3A: stream write/close fails badly once FS is closed #7700

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,21 +130,20 @@ public static BlockingThreadPoolExecutorService newInstance(
slower than enqueueing. */
final BlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(waitingTasks + activeTasks);
final InnerExecutorRejection rejection = new InnerExecutorRejection();
Copy link
Preview

Copilot AI May 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The InnerExecutorRejection handler now shuts down the service upon rejection. Consider enhancing the error handling logic or adding more detailed documentation to explain the shutdown behavior in case of task rejection.

Copilot uses AI. Check for mistakes.

ThreadPoolExecutor eventProcessingExecutor =
new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
workQueue, newDaemonThreadFactory(prefixName),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r,
ThreadPoolExecutor executor) {
// This is not expected to happen.
LOG.error("Could not submit task to executor {}",
executor.toString());
}
});
rejection);
eventProcessingExecutor.allowCoreThreadTimeOut(true);
return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
eventProcessingExecutor);
final BlockingThreadPoolExecutorService service =
new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
eventProcessingExecutor);
rejection.setDelegate((r, executor) -> {
service.shutdown();
});

return service;
}

/**
Expand All @@ -164,4 +163,28 @@ public String toString() {
.append('}');
return sb.toString();
}

private static class InnerExecutorRejection implements RejectedExecutionHandler {

private RejectedExecutionHandler delegate;

private RejectedExecutionHandler getDelegate() {
return delegate;
}

private void setDelegate(final RejectedExecutionHandler delegate) {
this.delegate = delegate;
}

@Override
public void rejectedExecution(Runnable r,
ThreadPoolExecutor executor) {
// This is not expected to happen.
LOG.error("Could not submit task to executor {}",
executor.toString());
if (getDelegate() != null) {
delegate.rejectedExecution(r, executor);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -127,6 +128,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,

@Override
public <T> Future<T> submit(Callable<T> task) {
rejectWhenShutdown();
try (DurationTracker ignored =
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
queueingPermits.acquire();
Expand All @@ -139,6 +141,7 @@ public <T> Future<T> submit(Callable<T> task) {

@Override
public <T> Future<T> submit(Runnable task, T result) {
rejectWhenShutdown();
try (DurationTracker ignored =
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
queueingPermits.acquire();
Expand All @@ -151,6 +154,7 @@ public <T> Future<T> submit(Runnable task, T result) {

@Override
public Future<?> submit(Runnable task) {
rejectWhenShutdown();
try (DurationTracker ignored =
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
queueingPermits.acquire();
Expand All @@ -163,6 +167,7 @@ public Future<?> submit(Runnable task) {

@Override
public void execute(Runnable command) {
rejectWhenShutdown();
try (DurationTracker ignored =
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
queueingPermits.acquire();
Expand Down Expand Up @@ -208,6 +213,16 @@ public String toString() {
return sb.toString();
}

/**
* Raise an exception if invoked when the executor is shut down.
* @throws RejectedExecutionException if the executor is shut down.
*/
private void rejectWhenShutdown() throws RejectedExecutionException{
if (isShutdown()) {
throw new RejectedExecutionException("ExecutorService is shutdown");
}
}

/**
* Releases a permit after the task is executed.
*/
Expand All @@ -222,6 +237,7 @@ class RunnableWithPermitRelease implements Runnable {
@Override
public void run() {
try {
rejectWhenShutdown();
delegatee.run();
} finally {
queueingPermits.release();
Expand All @@ -244,6 +260,7 @@ class CallableWithPermitRelease<T> implements Callable<T> {
@Override
public T call() throws Exception {
try {
rejectWhenShutdown();
return delegatee.call();
} finally {
queueingPermits.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,71 +16,77 @@
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.StopWatch;

import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
package org.apache.hadoop.util;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.test.AbstractHadoopTestBase;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
* Basic test for S3A's blocking executor service.
* Test for the blocking executor service.
*/
public class ITestBlockingThreadPoolExecutorService {
public class TestBlockingThreadPoolExecutorService extends AbstractHadoopTestBase {

private static final Logger LOG = LoggerFactory.getLogger(
ITestBlockingThreadPoolExecutorService.class);
TestBlockingThreadPoolExecutorService.class);

private static final int NUM_ACTIVE_TASKS = 4;

private static final int NUM_WAITING_TASKS = 2;

private static final int TASK_SLEEP_MSEC = 100;

private static final int SHUTDOWN_WAIT_MSEC = 200;

private static final int SHUTDOWN_WAIT_TRIES = 5;

private static final int BLOCKING_THRESHOLD_MSEC = 50;

private static final Integer SOME_VALUE = 1337;

private static BlockingThreadPoolExecutorService tpe;
private BlockingThreadPoolExecutorService tpe;
Copy link
Preview

Copilot AI May 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Since the thread pool executor is now an instance variable with setup/teardown methods, ensure that each test properly initializes and destroys the executor to avoid interference between tests.

Copilot uses AI. Check for mistakes.


@Rule
public Timeout testTimeout = new Timeout(60, TimeUnit.SECONDS);

@AfterClass
public static void afterClass() throws Exception {
@Before
public void setup() throws Exception {
ensureCreated();
}

@After
public void teardown() throws Exception {
ensureDestroyed();
}


/**
* Basic test of running one trivial task.
*/
@Test
public void testSubmitCallable() throws Exception {
ensureCreated();
Future<Integer> f = tpe.submit(callableSleeper);
Integer v = f.get();
assertEquals(SOME_VALUE, v);
Assertions.assertThat(v).isEqualTo(SOME_VALUE);
}

/**
* More involved test, including detecting blocking when at capacity.
*/
@Test
public void testSubmitRunnable() throws Exception {
ensureCreated();
verifyQueueSize(tpe, NUM_ACTIVE_TASKS + NUM_WAITING_TASKS);
}

Expand All @@ -102,27 +108,30 @@ protected void verifyQueueSize(ExecutorService executorService,
assertDidBlock(stopWatch);
}

@Test
public void testShutdown() throws Exception {
// Cover create / destroy, regardless of when this test case runs
ensureCreated();
ensureDestroyed();

// Cover create, execute, destroy, regardless of when test case runs
ensureCreated();
testSubmitRunnable();
ensureDestroyed();
}

@Test
public void testChainedQueue() throws Throwable {
ensureCreated();
int size = 2;
ExecutorService wrapper = new SemaphoredDelegatingExecutor(tpe,
size, true);
verifyQueueSize(wrapper, size);
}

@Test
public void testShutdownQueueRejectsOperations() throws Throwable {
tpe.shutdown();
Assertions.assertThat(tpe.isShutdown())
.describedAs("%s should be shutdown", tpe)
.isTrue();
// runnable
intercept(RejectedExecutionException.class, () ->
tpe.submit(failToRun));
// callable
intercept(RejectedExecutionException.class, () ->
tpe.submit(() -> 0));
intercept(RejectedExecutionException.class, () ->
tpe.execute(failToRun));
}

// Helper functions, etc.

private void assertDidBlock(StopWatch sw) {
Expand All @@ -135,28 +144,27 @@ private void assertDidBlock(StopWatch sw) {
}
}

private Runnable sleeper = new Runnable() {
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
Thread.sleep(TASK_SLEEP_MSEC);
} catch (InterruptedException e) {
LOG.info("Thread {} interrupted.", name);
Thread.currentThread().interrupt();
}
}
private Runnable failToRun = () -> {
throw new RuntimeException("This runnable raises and exception");
};

private Callable<Integer> callableSleeper = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
sleeper.run();
return SOME_VALUE;
private Runnable sleeper = () -> {
String name = Thread.currentThread().getName();
try {
Thread.sleep(TASK_SLEEP_MSEC);
} catch (InterruptedException e) {
LOG.info("Thread {} interrupted.", name);
Thread.currentThread().interrupt();
}
};

private Callable<Integer> callableSleeper = () -> {
sleeper.run();
return SOME_VALUE;
};

private class LatchedSleeper implements Runnable {

private final CountDownLatch latch;

LatchedSleeper(CountDownLatch latch) {
Expand All @@ -178,7 +186,7 @@ public void run() {
/**
* Helper function to create thread pool under test.
*/
private static void ensureCreated() throws Exception {
private void ensureCreated() throws Exception {
if (tpe == null) {
LOG.debug("Creating thread pool");
tpe = BlockingThreadPoolExecutorService.newInstance(
Expand All @@ -191,7 +199,7 @@ private static void ensureCreated() throws Exception {
* Helper function to terminate thread pool under test, asserting that
* shutdown -> terminate works as expected.
*/
private static void ensureDestroyed() throws Exception {
private void ensureDestroyed() throws Exception {
if (tpe == null) {
return;
}
Expand Down
Loading