diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2d0f3c1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea/vcs.xml +.idea/various.iml diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml new file mode 100644 index 0000000..41fe27e --- /dev/null +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git a/.idea/compiler.xml b/.idea/compiler.xml new file mode 100644 index 0000000..7d285f9 --- /dev/null +++ b/.idea/compiler.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/jarRepositories.xml b/.idea/jarRepositories.xml new file mode 100644 index 0000000..271d107 --- /dev/null +++ b/.idea/jarRepositories.xml @@ -0,0 +1,40 @@ + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..b33a53f --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,14 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..9f3a195 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..a5c30bb --- /dev/null +++ b/pom.xml @@ -0,0 +1,19 @@ + + 4.0.0 + com.infiniteautomation + various + 1.0.1-SNAPSHOT + Sero Various Libraries + + https://github.com/infiniteautomation/various + + + MPL 2.0 + https://www.mozilla.org/en-US/MPL/2.0/. + + + + Radix IoT + https://www.radixiot.com/ + + diff --git a/sero-warp/pom.xml b/sero-warp/pom.xml new file mode 100644 index 0000000..3630d12 --- /dev/null +++ b/sero-warp/pom.xml @@ -0,0 +1,56 @@ + + 4.0.0 + lohbihler + sero-warp + 1.0.1-SNAPSHOT + + 17 + + + + MPL 2.0 + https://www.mozilla.org/en-US/MPL/2.0/. + + + + Radix IoT + https://www.radixiot.com/ + + + + lohbihler + sero-scheduler + 1.1.0 + + + junit + junit + 4.13.1 + test + + + + + + false + + + true + + ias-snapshots + https://maven.mangoautomation.net/repository/ias-snapshot/ + + + + true + + + false + + ias-releases + https://maven.mangoautomation.net/repository/ias-release/ + + + diff --git a/sero-warp/src/main/java/lohbihler/warp/OrderedExecutorService.java b/sero-warp/src/main/java/lohbihler/warp/OrderedExecutorService.java new file mode 100644 index 0000000..79a94c6 --- /dev/null +++ b/sero-warp/src/main/java/lohbihler/warp/OrderedExecutorService.java @@ -0,0 +1,83 @@ +package lohbihler.warp; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * @author Terry Packer + */ +public class OrderedExecutorService extends ThreadPoolExecutor { + + /** + * Lock held on access to workers set and related bookkeeping. While we could use a concurrent + * set of some sort, it turns out to be generally preferable to use a lock. Among the reasons is + * that this serializes interruptIdleWorkers, which avoids unnecessary interrupt storms, + * especially during shutdown. Otherwise exiting threads would concurrently interrupt those that + * have not yet interrupted. It also simplifies some of the associated statistics bookkeeping of + * largestPoolSize etc. We also hold mainLock on shutdown and shutdownNow, for the sake of + * ensuring workers set is stable while separately checking permission to interrupt and actually + * interrupting. + */ + private final ReentrantReadWriteLock mainLock = new ReentrantReadWriteLock(); + + private OrderedRunnable first; + + public OrderedExecutorService(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + @Override + public void execute(Runnable command) { + ReentrantReadWriteLock lock = mainLock; + lock.writeLock().lock(); + try { + if (first == null) { + first = new OrderedRunnable(null, command); + } else { + first = new OrderedRunnable(first, command); + } + } finally { + lock.writeLock().unlock(); + } + super.execute(first); + } + + private class OrderedRunnable implements Runnable { + + private final OrderedRunnable previous; + private volatile CountDownLatch latch; + private final Runnable command; + private volatile boolean success = false; + private volatile boolean done = false; + + public OrderedRunnable(OrderedRunnable previous, Runnable command) { + this.previous = previous; + this.command = command; + this.latch = new CountDownLatch(1); + } + + public void await() throws InterruptedException { + latch.await(); + } + + @Override + public void run() { + try { + command.run(); + success = true; + if (previous != null) { + previous.await(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + done = true; + latch.countDown(); + } + } + } +} diff --git a/sero-warp/src/main/java/lohbihler/warp/TestingWarpScheduledExecutorService.java b/sero-warp/src/main/java/lohbihler/warp/TestingWarpScheduledExecutorService.java new file mode 100644 index 0000000..8fa5e3e --- /dev/null +++ b/sero-warp/src/main/java/lohbihler/warp/TestingWarpScheduledExecutorService.java @@ -0,0 +1,175 @@ +package lohbihler.warp; + +import java.time.Clock; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * @author Terry Packer + */ +public class TestingWarpScheduledExecutorService extends WarpScheduledExecutorService { + + private final Set runnables; + private final Set> callables; + + public TestingWarpScheduledExecutorService(final Clock clock) { + super(clock); + this.runnables = ConcurrentHashMap.newKeySet(); + this.callables = ConcurrentHashMap.newKeySet(); + } + + public TestingWarpScheduledExecutorService(final Clock clock, + final ExecutorService executorService) { + super(clock, executorService); + this.runnables = ConcurrentHashMap.newKeySet(); + this.callables = ConcurrentHashMap.newKeySet(); + } + + @Override + protected void executeInExecutor(Runnable command) { + super.executeInExecutor(wrapRunnable(command)); + } + + @Override + protected Future submitToExecutor(Runnable runnable) { + return super.submitToExecutor(wrapRunnable(runnable)); + } + + @Override + protected Future submitToExecutor(Runnable task, T result) { + return super.submitToExecutor(wrapRunnable(task), result); + } + + @Override + protected Future submitToExecutor(Callable callable) { + return super.submitToExecutor(wrapCallable(callable)); + } + + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException { + return super.invokeAll(tasks.stream().map(this::wrapCallable).toList()); + } + + @Override + public List> invokeAll(Collection> tasks, + long timeout, TimeUnit unit) throws InterruptedException { + return super.invokeAll(tasks.stream().map(this::wrapCallable).toList(), timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return super.invokeAny(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException { + return super.invokeAny(tasks); + } + + private TrackedCallable wrapCallable(Callable callable) { + return new TrackedCallable<>(callable); + } + + private TrackedRunnable wrapRunnable(Runnable runnable) { + return new TrackedRunnable(runnable); + } + + /** + * Wait for all tasks to complete, waiting for a period of time, count times. + * + * @param period + * @param timeUnit + * @param periodsToWait + * @param waitForCallables + * @param waitForRunnables + * @param waitForScheduledFutures + * @return true if all tasks exited, false if not + * @throws InterruptedException + */ + public boolean waitForExecutorTasks(int period, TimeUnit timeUnit, int periodsToWait, + boolean waitForCallables, boolean waitForRunnables, boolean waitForScheduledFutures) + throws InterruptedException { + int waits = 0; + boolean callablesDone = !waitForCallables; + boolean runnablesDone = !waitForRunnables; + boolean scheduledFuturesDone = !waitForScheduledFutures; + + while (waits < periodsToWait) { + if (waitForCallables) { + synchronized (callables) { + if (callables.isEmpty()) { + callablesDone = true; + } + } + } + if (waitForRunnables) { + synchronized (runnables) { + if (runnables.isEmpty()) { + runnablesDone = true; + } + } + } + if (waitForScheduledFutures) { + if (tasks.isEmpty()) { + scheduledFuturesDone = true; + } + } + + if (!callablesDone || !runnablesDone || !scheduledFuturesDone) { + timeUnit.sleep(period); + } else { + return true; + } + } + return false; + } + + class TrackedCallable implements Callable { + + private final Callable callable; + + public TrackedCallable(Callable callable) { + this.callable = callable; + callables.add(callable); + } + + @Override + public V call() throws Exception { + try { + return this.callable.call(); + } finally { + callables.remove(callable); + } + } + } + + class TrackedRunnable implements Runnable { + + private final Runnable runnable; + + public TrackedRunnable(Runnable runnable) { + this.runnable = runnable; + runnables.add(runnable); + } + + @Override + public void run() { + try { + this.runnable.run(); + } finally { + runnables.remove(runnable); + } + } + } +} diff --git a/sero-warp/src/main/java/lohbihler/warp/WarpClock.java b/sero-warp/src/main/java/lohbihler/warp/WarpClock.java index caf2cc3..459d51b 100644 --- a/sero-warp/src/main/java/lohbihler/warp/WarpClock.java +++ b/sero-warp/src/main/java/lohbihler/warp/WarpClock.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; public class WarpClock extends Clock { + private final ZoneId zoneId; private LocalDateTime dateTime; private final List listeners = new CopyOnWriteArrayList<>(); @@ -35,21 +36,28 @@ public WarpClock(final ZoneId zoneId) { this(zoneId, LocalDateTime.now(Clock.system(zoneId))); } + /** + * @param zoneId + * @param dateTime + */ public WarpClock(final ZoneId zoneId, final LocalDateTime dateTime) { Objects.requireNonNull(zoneId, "zoneId"); Objects.requireNonNull(dateTime, "dateTime"); this.zoneId = zoneId; this.dateTime = dateTime; + } - public TimeoutFuture setTimeout(final Runnable command, final long timeout, final TimeUnit timeUnit) { + public TimeoutFuture setTimeout(final Runnable command, final long timeout, + final TimeUnit timeUnit) { return setTimeout(() -> { command.run(); return null; }, timeout, timeUnit); } - public TimeoutFuture setTimeout(final Callable callable, final long timeout, final TimeUnit timeUnit) { + public TimeoutFuture setTimeout(final Callable callable, final long timeout, + final TimeUnit timeUnit) { final LocalDateTime deadline = dateTime.plusNanos(timeUnit.toNanos(timeout)); final TimeoutFutureImpl future = new TimeoutFutureImpl<>(); final ClockListener listener = new ClockListener() { @@ -72,6 +80,7 @@ public void clockUpdate(final LocalDateTime dateTime) { } class TimeoutFutureImpl implements TimeoutFuture { + private boolean success; private boolean cancelled; private Exception ex; @@ -80,21 +89,26 @@ class TimeoutFutureImpl implements TimeoutFuture { @Override public V get() throws CancellationException, InterruptedException, Exception { - if (success) + if (success) { return result; - if (ex != null) + } + if (ex != null) { throw ex; - if (cancelled) + } + if (cancelled) { throw new CancellationException(); + } synchronized (this) { wait(); } - if (success) + if (success) { return result; - if (ex != null) + } + if (ex != null) { throw ex; + } throw new CancellationException(); } @@ -137,33 +151,41 @@ public void removeListener(final ClockListener listener) { listeners.remove(listener); } - public LocalDateTime set(final int year, final Month month, final int dayOfMonth, final int hour, - final int minute) { + public LocalDateTime set(final int year, final Month month, final int dayOfMonth, + final int hour, + final int minute) { return fireUpdate(LocalDateTime.of(year, month, dayOfMonth, hour, minute)); } - public LocalDateTime set(final int year, final Month month, final int dayOfMonth, final int hour, final int minute, - final int second) { + public LocalDateTime set(final int year, final Month month, final int dayOfMonth, + final int hour, final int minute, + final int second) { return fireUpdate(LocalDateTime.of(year, month, dayOfMonth, hour, minute, second)); } - public LocalDateTime set(final int year, final Month month, final int dayOfMonth, final int hour, final int minute, - final int second, final int nanoOfSecond) { - return fireUpdate(LocalDateTime.of(year, month, dayOfMonth, hour, minute, second, nanoOfSecond)); + public LocalDateTime set(final int year, final Month month, final int dayOfMonth, + final int hour, final int minute, + final int second, final int nanoOfSecond) { + return fireUpdate( + LocalDateTime.of(year, month, dayOfMonth, hour, minute, second, nanoOfSecond)); } - public LocalDateTime set(final int year, final int month, final int dayOfMonth, final int hour, final int minute) { + public LocalDateTime set(final int year, final int month, final int dayOfMonth, final int hour, + final int minute) { return fireUpdate(LocalDateTime.of(year, month, dayOfMonth, hour, minute)); } - public LocalDateTime set(final int year, final int month, final int dayOfMonth, final int hour, final int minute, - final int second) { + public LocalDateTime set(final int year, final int month, final int dayOfMonth, final int hour, + final int minute, + final int second) { return fireUpdate(LocalDateTime.of(year, month, dayOfMonth, hour, minute, second)); } - public LocalDateTime set(final int year, final int month, final int dayOfMonth, final int hour, final int minute, - final int second, final int nanoOfSecond) { - return fireUpdate(LocalDateTime.of(year, month, dayOfMonth, hour, minute, second, nanoOfSecond)); + public LocalDateTime set(final int year, final int month, final int dayOfMonth, final int hour, + final int minute, + final int second, final int nanoOfSecond) { + return fireUpdate( + LocalDateTime.of(year, month, dayOfMonth, hour, minute, second, nanoOfSecond)); } public LocalDateTime plus(final TemporalAmount amountToAdd) { @@ -214,10 +236,12 @@ public LocalDateTime plus(final int amount, final TimeUnit unit, final long endS return plus(amount, unit, 0, null, 0, endSleep); } - public LocalDateTime plus(final int amount, final TimeUnit unit, final int byAmount, final TimeUnit byUnit, - final long eachSleep, final long endSleep) { + public LocalDateTime plus(final int amount, final TimeUnit unit, final int byAmount, + final TimeUnit byUnit, + final long eachSleep, final long endSleep) { long remainder = unit.toNanos(amount); - final long each = (byUnit == null ? unit : byUnit).toNanos(byAmount == 0 ? amount : byAmount); + final long each = (byUnit == null ? unit : byUnit).toNanos( + byAmount == 0 ? amount : byAmount); LocalDateTime result = null; try { @@ -227,8 +251,9 @@ public LocalDateTime plus(final int amount, final TimeUnit unit, final int byAmo } else { while (remainder > 0) { long nanos = each; - if (each > remainder) + if (each > remainder) { nanos = remainder; + } result = plusNanos(nanos); remainder -= nanos; Thread.sleep(eachSleep); diff --git a/sero-warp/src/main/java/lohbihler/warp/WarpScheduledExecutorService.java b/sero-warp/src/main/java/lohbihler/warp/WarpScheduledExecutorService.java index acdface..dc1eb28 100644 --- a/sero-warp/src/main/java/lohbihler/warp/WarpScheduledExecutorService.java +++ b/sero-warp/src/main/java/lohbihler/warp/WarpScheduledExecutorService.java @@ -24,7 +24,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - import lohbihler.scheduler.ScheduledExecutorServiceVariablePool; /** @@ -34,23 +33,28 @@ * @author Matthew */ public class WarpScheduledExecutorService implements ScheduledExecutorService, ClockListener { - private final WarpClock clock; - private final ExecutorService executorService; - private final ScheduledExecutorServiceVariablePool delegate; - private final List> tasks = new ArrayList<>(); - private boolean shutdown; + protected final WarpClock clock; + protected final ExecutorService executorService; + protected final ScheduledExecutorServiceVariablePool delegate; + + protected final List> tasks = new ArrayList<>(); + protected boolean shutdown; public WarpScheduledExecutorService(final Clock clock) { + this(clock, Executors.newCachedThreadPool()); + } + + public WarpScheduledExecutorService(final Clock clock, final ExecutorService executorService) { if (clock instanceof WarpClock) { this.clock = (WarpClock) clock; this.clock.addListener(this); - executorService = Executors.newCachedThreadPool(); - delegate = null; + this.executorService = executorService; + this.delegate = null; } else { this.clock = null; - executorService = null; - delegate = new ScheduledExecutorServiceVariablePool(clock); + this.executorService = null; + this.delegate = new ScheduledExecutorServiceVariablePool(clock); } } @@ -60,12 +64,14 @@ public void clockUpdate(final LocalDateTime dateTime) { // Poll for a task. final ScheduleFutureImpl task; synchronized (tasks) { - if (tasks.isEmpty()) + if (tasks.isEmpty()) { break; + } task = tasks.get(0); final long waitTime = task.getDelay(TimeUnit.MILLISECONDS); - if (waitTime > 0) + if (waitTime > 0) { break; + } // Remove the task tasks.remove(0); } @@ -120,7 +126,8 @@ public boolean isTerminated() { } @Override - public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { + public boolean awaitTermination(final long timeout, final TimeUnit unit) + throws InterruptedException { if (delegate == null) { return executorService.awaitTermination(timeout, unit); } @@ -130,7 +137,7 @@ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws @Override public Future submit(final Callable task) { if (delegate == null) { - return executorService.submit(task); + return submitToExecutor(task); } return delegate.submit(task); } @@ -138,7 +145,7 @@ public Future submit(final Callable task) { @Override public Future submit(final Runnable task, final T result) { if (delegate == null) { - return executorService.submit(task, result); + return submitToExecutor(task, result); } return delegate.submit(task, result); } @@ -146,13 +153,14 @@ public Future submit(final Runnable task, final T result) { @Override public Future submit(final Runnable task) { if (delegate == null) { - return executorService.submit(task); + return submitToExecutor(task); } return delegate.submit(task); } @Override - public List> invokeAll(final Collection> tasks) throws InterruptedException { + public List> invokeAll(final Collection> tasks) + throws InterruptedException { if (delegate == null) { return executorService.invokeAll(tasks); } @@ -160,8 +168,9 @@ public List> invokeAll(final Collection> tas } @Override - public List> invokeAll(final Collection> tasks, final long timeout, - final TimeUnit unit) throws InterruptedException { + public List> invokeAll(final Collection> tasks, + final long timeout, + final TimeUnit unit) throws InterruptedException { if (delegate == null) { return executorService.invokeAll(tasks, timeout, unit); } @@ -170,7 +179,7 @@ public List> invokeAll(final Collection> tas @Override public T invokeAny(final Collection> tasks) - throws InterruptedException, ExecutionException { + throws InterruptedException, ExecutionException { if (delegate == null) { return executorService.invokeAny(tasks); } @@ -178,8 +187,9 @@ public T invokeAny(final Collection> tasks) } @Override - public T invokeAny(final Collection> tasks, final long timeout, final TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public T invokeAny(final Collection> tasks, final long timeout, + final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { if (delegate == null) { return executorService.invokeAny(tasks, timeout, unit); } @@ -189,14 +199,15 @@ public T invokeAny(final Collection> tasks, final long @Override public void execute(final Runnable command) { if (delegate == null) { - executorService.execute(command); + executeInExecutor(command); } else { delegate.execute(command); } } @Override - public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { + public ScheduledFuture schedule(final Runnable command, final long delay, + final TimeUnit unit) { if (delegate == null) { return addTask(new OneTime(command, delay, unit)); } @@ -204,7 +215,8 @@ public ScheduledFuture schedule(final Runnable command, final long delay, fin } @Override - public ScheduledFuture schedule(final Callable callable, final long delay, final TimeUnit unit) { + public ScheduledFuture schedule(final Callable callable, final long delay, + final TimeUnit unit) { if (delegate == null) { return addTask(new OneTimeCallable<>(callable, delay, unit)); } @@ -212,8 +224,9 @@ public ScheduledFuture schedule(final Callable callable, final long de } @Override - public ScheduledFuture scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, - final TimeUnit unit) { + public ScheduledFuture scheduleAtFixedRate(final Runnable command, final long initialDelay, + final long period, + final TimeUnit unit) { if (delegate == null) { return addTask(new FixedRate(command, initialDelay, period, unit)); } @@ -221,38 +234,58 @@ public ScheduledFuture scheduleAtFixedRate(final Runnable command, final long } @Override - public ScheduledFuture scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, - final TimeUnit unit) { + public ScheduledFuture scheduleWithFixedDelay(final Runnable command, + final long initialDelay, final long delay, + final TimeUnit unit) { if (delegate == null) { return addTask(new FixedDelay(command, initialDelay, delay, unit)); } return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit); } - private ScheduleFutureImpl addTask(final ScheduleFutureImpl task) { + + protected ScheduleFutureImpl addTask(final ScheduleFutureImpl task) { synchronized (tasks) { if (task.getDelay(TimeUnit.MILLISECONDS) <= 0) { // Run now - executorService.submit(task.getRunnable()); + submitToExecutor(task.getRunnable()); } else { int index = Collections.binarySearch(tasks, task); - if (index < 0) + if (index < 0) { index = -index - 1; + } tasks.add(index, task); } return task; } } + protected void executeInExecutor(final Runnable command) { + executorService.execute(command); + } + + protected Future submitToExecutor(final Runnable runnable) { + return executorService.submit(runnable); + } + + protected Future submitToExecutor(final Callable runnable) { + return executorService.submit(runnable); + } + + protected Future submitToExecutor(final Runnable task, final T result) { + return executorService.submit(task, result); + } + abstract class ScheduleFutureImpl implements ScheduledFuture { + private volatile boolean success; private volatile V result; private volatile Exception exception; private volatile boolean cancelled; private volatile boolean done; - void execute() { - executorService.submit(() -> executeImpl()); + public void execute() { + submitToExecutor(() -> executeImpl()); } abstract void executeImpl(); @@ -261,7 +294,8 @@ void execute() { @Override public int compareTo(final Delayed that) { - return Long.compare(getDelay(TimeUnit.MILLISECONDS), that.getDelay(TimeUnit.MILLISECONDS)); + return Long.compare(getDelay(TimeUnit.MILLISECONDS), + that.getDelay(TimeUnit.MILLISECONDS)); } @Override @@ -296,27 +330,31 @@ public V get() throws InterruptedException, ExecutionException { @Override public V get(final long timeout, final TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + throws InterruptedException, ExecutionException, TimeoutException { return await(true, unit.toMillis(timeout)); } private V await(final boolean timed, final long millis) - throws InterruptedException, ExecutionException, TimeoutException { + throws InterruptedException, ExecutionException, TimeoutException { final long expiry = clock.millis() + millis; while (true) { synchronized (this) { final long remaining = expiry - clock.millis(); - if (success) + if (success) { return result; - if (exception != null) + } + if (exception != null) { throw new ExecutionException(exception); - if (isCancelled()) + } + if (isCancelled()) { throw new CancellationException(); + } if (timed) { - if (remaining <= 0) + if (remaining <= 0) { throw new TimeoutException(); + } WarpUtils.wait(clock, this, remaining, TimeUnit.MILLISECONDS); } else { wait(); @@ -353,6 +391,7 @@ protected void exception(final Exception exception) { } class OneTime extends ScheduleFutureImpl { + private final Runnable command; private final long runtime; @@ -380,6 +419,7 @@ public long getDelay(final TimeUnit unit) { } abstract class Repeating extends ScheduleFutureImpl { + private final Runnable command; protected final TimeUnit unit; @@ -423,9 +463,11 @@ public boolean isDone() { } class FixedRate extends Repeating { + private final long period; - public FixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { + public FixedRate(final Runnable command, final long initialDelay, final long period, + final TimeUnit unit) { super(command, initialDelay, unit); this.period = period; } @@ -437,9 +479,11 @@ void updateNextRuntime() { } class FixedDelay extends Repeating { + private final long delay; - public FixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) { + public FixedDelay(final Runnable command, final long initialDelay, final long delay, + final TimeUnit unit) { super(command, initialDelay, unit); this.delay = delay; } @@ -451,6 +495,7 @@ void updateNextRuntime() { } class OneTimeCallable extends ScheduleFutureImpl { + private final Callable command; private final long runtime; diff --git a/sero-warp/src/test/java/lohbihler/sim/OrderedExecutorServiceTest.java b/sero-warp/src/test/java/lohbihler/sim/OrderedExecutorServiceTest.java new file mode 100644 index 0000000..ace770c --- /dev/null +++ b/sero-warp/src/test/java/lohbihler/sim/OrderedExecutorServiceTest.java @@ -0,0 +1,66 @@ +package lohbihler.sim; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.time.Clock; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lohbihler.warp.OrderedExecutorService; +import lohbihler.warp.TestingWarpScheduledExecutorService; +import lohbihler.warp.WarpClock; +import org.junit.Before; +import org.junit.Test; + +/** + * @author Terry Packer + */ +public class OrderedExecutorServiceTest { + + private WarpClock clock; + private Instant start; + private TestingWarpScheduledExecutorService scheduler; + + @Before + public void before() { + ZoneId zone = ZoneId.systemDefault(); + clock = new WarpClock(zone, LocalDateTime.now(Clock.system(zone))); + start = clock.instant(); + scheduler = new TestingWarpScheduledExecutorService(clock, + new OrderedExecutorService(1, Integer.MAX_VALUE, 60L, + TimeUnit.SECONDS, new LinkedBlockingQueue<>())); + } + + @Test + public void testOrderOfImmediateExecution() { + AtomicInteger counter = new AtomicInteger(); + List runOrder = new CopyOnWriteArrayList<>(); + for (int i = 0; i < 1000; i++) { + int order = counter.getAndIncrement(); + scheduler.execute(() -> { + runOrder.add(order); + }); + } + + //Wait for completion + try { + assertEquals( + scheduler.waitForExecutorTasks(50, TimeUnit.MILLISECONDS, 10, true, true, true), + true); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + + assertEquals(1000, runOrder.size()); + for (int i = 0; i < runOrder.size(); i++) { + assertEquals(i, (int) runOrder.get(i)); + } + + } +} diff --git a/sero-warp/src/test/java/lohbihler/sim/WarpScheduledExecutorServiceTest.java b/sero-warp/src/test/java/lohbihler/sim/WarpScheduledExecutorServiceTest.java index 6f59475..0832008 100644 --- a/sero-warp/src/test/java/lohbihler/sim/WarpScheduledExecutorServiceTest.java +++ b/sero-warp/src/test/java/lohbihler/sim/WarpScheduledExecutorServiceTest.java @@ -14,6 +14,7 @@ import java.time.Duration; import java.time.Instant; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CancellationException; @@ -23,20 +24,20 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Before; -import org.junit.Test; - import lohbihler.warp.WarpClock; import lohbihler.warp.WarpScheduledExecutorService; +import org.junit.Before; +import org.junit.Test; public class WarpScheduledExecutorServiceTest { + private WarpClock clock; private Instant start; private WarpScheduledExecutorService scheduler; @Before public void before() { + ZoneId zone = ZoneId.systemDefault(); clock = new WarpClock(); start = clock.instant(); scheduler = new WarpScheduledExecutorService(clock); @@ -44,12 +45,15 @@ public void before() { @Test public void callables() throws InterruptedException { - final ScheduledFuture success = scheduler.schedule(() -> true, 100, TimeUnit.MINUTES); - final ScheduledFuture timeout = scheduler.schedule(() -> true, 100, TimeUnit.MINUTES); + final ScheduledFuture success = scheduler.schedule(() -> true, 100, + TimeUnit.MINUTES); + final ScheduledFuture timeout = scheduler.schedule(() -> true, 100, + TimeUnit.MINUTES); final ScheduledFuture exception = scheduler.schedule(() -> { throw new Exception("test ex"); }, 100, TimeUnit.MINUTES); - final ScheduledFuture cancel = scheduler.schedule(() -> true, 100, TimeUnit.MINUTES); + final ScheduledFuture cancel = scheduler.schedule(() -> true, 100, + TimeUnit.MINUTES); // Create threads to get future results. final AtomicBoolean successResult = new AtomicBoolean(false); @@ -128,11 +132,13 @@ public void callables() throws InterruptedException { @Test public void fixedRate() throws InterruptedException { final List instants1 = new ArrayList<>(); - final ScheduledFuture future1 = scheduler.scheduleAtFixedRate(() -> instants1.add(clock.instant()), 3, 2, - TimeUnit.MINUTES); + final ScheduledFuture future1 = scheduler.scheduleAtFixedRate( + () -> instants1.add(clock.instant()), 3, 2, + TimeUnit.MINUTES); final List instants2 = new ArrayList<>(); - final ScheduledFuture future2 = scheduler.scheduleAtFixedRate(() -> instants2.add(clock.instant()), 4, 2, - TimeUnit.MINUTES); + final ScheduledFuture future2 = scheduler.scheduleAtFixedRate( + () -> instants2.add(clock.instant()), 4, 2, + TimeUnit.MINUTES); // Run the minutes individually to ensure the runtimes. clock.plus(7, TimeUnit.MINUTES, 1, TimeUnit.MINUTES, 20, 0); @@ -182,11 +188,13 @@ public void fixedRate() throws InterruptedException { @Test public void fixedDelay() throws InterruptedException { final List instants1 = new ArrayList<>(); - final ScheduledFuture future1 = scheduler.scheduleWithFixedDelay(() -> instants1.add(clock.instant()), 3, 2, - TimeUnit.MINUTES); + final ScheduledFuture future1 = scheduler.scheduleWithFixedDelay( + () -> instants1.add(clock.instant()), 3, 2, + TimeUnit.MINUTES); final List instants2 = new ArrayList<>(); - final ScheduledFuture future2 = scheduler.scheduleWithFixedDelay(() -> instants2.add(clock.instant()), 4, 2, - TimeUnit.MINUTES); + final ScheduledFuture future2 = scheduler.scheduleWithFixedDelay( + () -> instants2.add(clock.instant()), 4, 2, + TimeUnit.MINUTES); // Run the minutes individually to ensure the runtimes. clock.plus(7, TimeUnit.MINUTES, 1, TimeUnit.MINUTES, 20, 20); @@ -236,7 +244,8 @@ public void fixedDelay() throws InterruptedException { public void scheduled() { final List instants = new ArrayList<>(); - final ScheduledFuture future = scheduler.schedule(() -> instants.add(clock.instant()), 10, TimeUnit.MINUTES); + final ScheduledFuture future = scheduler.schedule(() -> instants.add(clock.instant()), + 10, TimeUnit.MINUTES); scheduler.schedule((Runnable) () -> instants.add(clock.instant()), 11, TimeUnit.MINUTES); scheduler.schedule((Runnable) () -> instants.add(clock.instant()), 12, TimeUnit.MINUTES); scheduler.schedule((Runnable) () -> instants.add(clock.instant()), 15, TimeUnit.MINUTES);