From 2217e5e17dc855ccdff75adb2ffb00d89920eaa9 Mon Sep 17 00:00:00 2001 From: Rajesh Chejerla Date: Sat, 30 Jul 2016 01:27:11 +0530 Subject: [PATCH 1/2] Shutting down workflow gracefully. --- .../java/com/nirmata/workflow/WorkflowManager.java | 11 +++++++++++ .../workflow/details/WorkflowManagerImpl.java | 10 ++++++++++ .../com/nirmata/workflow/queue/QueueConsumer.java | 3 +++ .../workflow/queue/zookeeper/SimpleQueue.java | 14 ++++++++++++++ 4 files changed, 38 insertions(+) diff --git a/src/main/java/com/nirmata/workflow/WorkflowManager.java b/src/main/java/com/nirmata/workflow/WorkflowManager.java index 2dde3e56..59f5645e 100644 --- a/src/main/java/com/nirmata/workflow/WorkflowManager.java +++ b/src/main/java/com/nirmata/workflow/WorkflowManager.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework; import java.io.Closeable; import java.util.Optional; +import java.util.concurrent.TimeUnit; /** * Main API - create via {@link WorkflowManagerBuilder} @@ -140,4 +141,14 @@ public interface WorkflowManager extends Closeable * @return new WorkflowListenerManager */ WorkflowListenerManager newWorkflowListenerManager(); + + /** + * Attempt to stop executing new tasks. + * Already running tasks will continue till timeout occurs. + * If tasks running are supposed to take long time, its better to give bigger timeOut value. + * + * @param timeOut + * @param timeUnit the timeunit of timeout (e.g SECONDS, MINUTES) + */ + void gracefulShutdown(long timeOut, TimeUnit timeUnit); } diff --git a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java index 2176fd55..d768fa7c 100644 --- a/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java +++ b/src/main/java/com/nirmata/workflow/details/WorkflowManagerImpl.java @@ -57,6 +57,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -594,4 +595,13 @@ private List makeTaskConsumers(QueueFactory queueFactory, List consumer.closeGraceFully(timeOut, timeUnit)); + } + } } diff --git a/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java b/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java index 807f8376..fb6e7091 100644 --- a/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java +++ b/src/main/java/com/nirmata/workflow/queue/QueueConsumer.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.nirmata.workflow.admin.WorkflowManagerState; import java.io.Closeable; +import java.util.concurrent.TimeUnit; public interface QueueConsumer extends Closeable { @@ -30,4 +31,6 @@ public interface QueueConsumer extends Closeable @VisibleForTesting void debugValidateClosed(); + + public void closeGraceFully(long timeOut, TimeUnit timeUnit); } diff --git a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java index f9e2bf80..00ad58fb 100644 --- a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java +++ b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java @@ -331,4 +331,18 @@ private static long getEpoch(String itemNode) } return 0; } + + @Override + public void closeGraceFully(long timeOut, TimeUnit timeUnit) { + if ( started.compareAndSet(true, false) ) + { + executorService.shutdown(); + try { + executorService.awaitTermination(timeOut, timeUnit); + } catch (InterruptedException e) { + log.error("Exception occurred while waiting for running tasks to complete", e); + } + } + + } } From 0bfc39d88c5f9868f397c91c05ce14199eed4908 Mon Sep 17 00:00:00 2001 From: Rajesh Chejerla Date: Sat, 30 Jul 2016 09:09:13 +0530 Subject: [PATCH 2/2] Adding call to Thread.currentThread().interrupt() in closeGraceFully method of SimpleQueue. --- .../java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java index 00ad58fb..f2955be6 100644 --- a/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java +++ b/src/main/java/com/nirmata/workflow/queue/zookeeper/SimpleQueue.java @@ -341,6 +341,7 @@ public void closeGraceFully(long timeOut, TimeUnit timeUnit) { executorService.awaitTermination(timeOut, timeUnit); } catch (InterruptedException e) { log.error("Exception occurred while waiting for running tasks to complete", e); + Thread.currentThread().interrupt(); } }