diff --git a/src/main/java/com/nirmata/workflow/WorkflowManager.java b/src/main/java/com/nirmata/workflow/WorkflowManager.java index 2dde3e56..59f5645e 100644 --- a/src/main/java/com/nirmata/workflow/WorkflowManager.java +++ b/src/main/java/com/nirmata/workflow/WorkflowManager.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework; import java.io.Closeable; import java.util.Optional; +import java.util.concurrent.TimeUnit; /** * Main API - create via {@link WorkflowManagerBuilder} @@ -140,4 +141,14 @@ public interface WorkflowManager extends Closeable * @return new WorkflowListenerManager */ WorkflowListenerManager newWorkflowListenerManager(); + + /** + * Attempt to stop executing new tasks. + * Already running tasks will continue till timeout occurs. + * If tasks running are supposed to take long time, its better to give bigger timeOut value. + * + * @param timeOut + * @param timeUnit the timeunit of timeout (e.g SECONDS, MINUTES) + */ + void gracefulShutdown(long timeOut, TimeUnit timeUnit); } diff --git a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java index 2176fd55..d768fa7c 100644 --- a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java +++ b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java @@ -57,6 +57,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -594,4 +595,13 @@ private List makeTaskConsumers(QueueFactory queueFactory, List consumer.closeGraceFully(timeOut, timeUnit)); + } + } } diff --git a/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java b/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java index 807f8376..fb6e7091 100644 --- a/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java +++ b/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.nirmata.workflow.admin.WorkflowManagerState; import java.io.Closeable; +import java.util.concurrent.TimeUnit; public interface QueueConsumer extends Closeable { @@ -30,4 +31,6 @@ public interface QueueConsumer extends Closeable @VisibleForTesting void debugValidateClosed(); + + public void closeGraceFully(long timeOut, TimeUnit timeUnit); } diff --git a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java index f9e2bf80..f2955be6 100644 --- a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java +++ b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java @@ -331,4 +331,19 @@ private static long getEpoch(String itemNode) } return 0; } + + @Override + public void closeGraceFully(long timeOut, TimeUnit timeUnit) { + if ( started.compareAndSet(true, false) ) + { + executorService.shutdown(); + try { + executorService.awaitTermination(timeOut, timeUnit); + } catch (InterruptedException e) { + log.error("Exception occurred while waiting for running tasks to complete", e); + Thread.currentThread().interrupt(); + } + } + + } }