Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/main/java/com/nirmata/workflow/WorkflowManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.curator.framework.CuratorFramework;
import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/**
* Main API - create via {@link WorkflowManagerBuilder}
Expand Down Expand Up @@ -140,4 +141,14 @@ public interface WorkflowManager extends Closeable
* @return new WorkflowListenerManager
*/
WorkflowListenerManager newWorkflowListenerManager();

/**
* Attempt to stop executing new tasks.
* Already running tasks will continue till timeout occurs.
* If tasks running are supposed to take long time, its better to give bigger timeOut value.
*
* @param timeOut
* @param timeUnit the timeunit of timeout (e.g SECONDS, MINUTES)
*/
void gracefulShutdown(long timeOut, TimeUnit timeUnit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -594,4 +595,13 @@ private List<QueueConsumer> makeTaskConsumers(QueueFactory queueFactory, List<Ta

return builder.build();
}

@Override
public void gracefulShutdown(long timeOut, TimeUnit timeUnit) {
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
CloseableUtils.closeQuietly(schedulerSelector);
consumers.forEach(consumer -> consumer.closeGraceFully(timeOut, timeUnit));
}
}
}
3 changes: 3 additions & 0 deletions src/main/java/com/nirmata/workflow/queue/QueueConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.nirmata.workflow.admin.WorkflowManagerState;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;

public interface QueueConsumer extends Closeable
{
Expand All @@ -30,4 +31,6 @@ public interface QueueConsumer extends Closeable

@VisibleForTesting
void debugValidateClosed();

public void closeGraceFully(long timeOut, TimeUnit timeUnit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,4 +331,19 @@ private static long getEpoch(String itemNode)
}
return 0;
}

@Override
public void closeGraceFully(long timeOut, TimeUnit timeUnit) {
if ( started.compareAndSet(true, false) )
{
executorService.shutdown();
try {
executorService.awaitTermination(timeOut, timeUnit);
} catch (InterruptedException e) {
log.error("Exception occurred while waiting for running tasks to complete", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a call to Thread.currentThread().interrupt() (see: http://www.javapractices.com/topic/TopicAction.do?Id=251)

Thread.currentThread().interrupt();
}
}

}
}