diff --git a/.classpath b/.classpath
new file mode 100644
index 0000000..e07fea7
--- /dev/null
+++ b/.classpath
@@ -0,0 +1,32 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/.project b/.project
new file mode 100644
index 0000000..1ba7f71
--- /dev/null
+++ b/.project
@@ -0,0 +1,23 @@
+
+
+ activator-akka-distributed-workers-java
+
+
+
+
+
+ org.eclipse.jdt.core.javabuilder
+
+
+
+
+ org.eclipse.m2e.core.maven2Builder
+
+
+
+
+
+ org.eclipse.jdt.core.javanature
+ org.eclipse.m2e.core.maven2Nature
+
+
diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..13b3428
--- /dev/null
+++ b/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,13 @@
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
+org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
+org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
+org.eclipse.jdt.core.compiler.compliance=1.8
+org.eclipse.jdt.core.compiler.debug.lineNumber=generate
+org.eclipse.jdt.core.compiler.debug.localVariable=generate
+org.eclipse.jdt.core.compiler.debug.sourceFile=generate
+org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
+org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.8
diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs
new file mode 100644
index 0000000..f897a7f
--- /dev/null
+++ b/.settings/org.eclipse.m2e.core.prefs
@@ -0,0 +1,4 @@
+activeProfiles=
+eclipse.preferences.version=1
+resolveWorkspaceProjects=true
+version=1
diff --git a/pom.xml b/pom.xml
index d8f5ce4..df2d198 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,23 +7,43 @@
com.typesafe.akka
akka-actor_2.11
- 2.3.4
+ 2.5.8
com.typesafe.akka
akka-cluster_2.11
- 2.3.4
+ 2.5.8
com.typesafe.akka
akka-contrib_2.11
- 2.3.4
+ 2.5.8
com.typesafe.akka
akka-testkit_2.11
- 2.3.4
+ 2.5.8
+
+ com.typesafe.akka
+ akka-cluster-tools_2.11
+ 2.5.8
+
+
+ org.iq80.leveldb
+ leveldb
+ 0.7
+
+
+ org.fusesource.leveldbjni
+ leveldbjni-all
+ 1.8
+
+
+ org.apache.directory.studio
+ org.apache.commons.io
+ 2.4
+
junit
junit
diff --git a/src/main/java/worker/Frontend.java b/src/main/java/worker/Frontend.java
index 77dd2f2..a3dee0f 100644
--- a/src/main/java/worker/Frontend.java
+++ b/src/main/java/worker/Frontend.java
@@ -1,7 +1,7 @@
package worker;
import akka.actor.ActorRef;
-import akka.actor.UntypedActor;
+import akka.actor.UntypedAbstractActor;
import akka.cluster.singleton.ClusterSingletonProxy;
import akka.cluster.singleton.ClusterSingletonProxySettings;
import akka.dispatch.Mapper;
@@ -16,13 +16,13 @@
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
-public class Frontend extends UntypedActor {
+public class Frontend extends UntypedAbstractActor {
ActorRef masterProxy = getContext().actorOf(
ClusterSingletonProxy.props(
"/user/master",
- ClusterSingletonProxySettings.create(getContext().system()).withRole("backend")),
+ ClusterSingletonProxySettings.create(getContext().system()).withRole("backend").withSingletonName("singleton")),
"masterProxy");
public void onReceive(Object message) {
diff --git a/src/main/java/worker/Main.java b/src/main/java/worker/Main.java
index ed7e570..198cf1f 100644
--- a/src/main/java/worker/Main.java
+++ b/src/main/java/worker/Main.java
@@ -1,6 +1,7 @@
package worker;
import akka.actor.*;
+import akka.actor.AbstractActor.Receive;
import akka.cluster.client.ClusterClient;
import akka.cluster.client.ClusterClientSettings;
import akka.cluster.singleton.ClusterSingletonManager;
@@ -8,6 +9,8 @@
import akka.dispatch.OnFailure;
import akka.dispatch.OnSuccess;
import akka.pattern.Patterns;
+import akka.persistence.AbstractPersistentActor;
+import akka.persistence.SnapshotOffer;
import akka.persistence.journal.leveldb.SharedLeveldbJournal;
import akka.persistence.journal.leveldb.SharedLeveldbStore;
import akka.util.Timeout;
@@ -19,6 +22,10 @@
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
+import static java.util.Arrays.asList;
+
+import java.io.Serializable;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -54,7 +61,7 @@ public static void startBackend(int port, String role) {
ActorSystem system = ActorSystem.create("ClusterSystem", conf);
startupSharedJournal(system, (port == 2551),
- ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/user/store"));
+ ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/user/store"),role);
system.actorOf(
ClusterSingletonManager.props(
@@ -88,7 +95,7 @@ public static void startFrontend(int port) {
}
- public static void startupSharedJournal(final ActorSystem system, boolean startStore, final ActorPath path) {
+ public static void startupSharedJournal(final ActorSystem system, boolean startStore, final ActorPath path,String role) {
// Start the shared journal one one node (don't crash this SPOF)
// This will not be needed with a distributed journal
if (startStore) {
@@ -105,8 +112,9 @@ public static void startupSharedJournal(final ActorSystem system, boolean startS
@Override
public void onSuccess(Object arg0) throws Throwable {
- if (arg0 instanceof ActorIdentity && ((ActorIdentity) arg0).getRef() != null) {
- SharedLeveldbJournal.setStore(((ActorIdentity) arg0).getRef(), system);
+ if (arg0 instanceof ActorIdentity && ((ActorIdentity) arg0).getActorRef() != null) {
+ SharedLeveldbJournal.setStore(((ActorIdentity) arg0).getRef(), system);
+
} else {
system.log().error("Lookup of shared journal at {} timed out", path);
System.exit(-1);
diff --git a/src/main/java/worker/Master.java b/src/main/java/worker/Master.java
index 3d8f878..bded3fc 100644
--- a/src/main/java/worker/Master.java
+++ b/src/main/java/worker/Master.java
@@ -1,6 +1,7 @@
package worker;
import akka.actor.*;
+import akka.actor.AbstractActor.Receive;
import akka.cluster.Cluster;
import akka.cluster.client.ClusterClientReceptionist;
import akka.cluster.client.ClusterReceptionist;
@@ -9,6 +10,7 @@
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import akka.persistence.AbstractPersistentActor;
import akka.persistence.UntypedPersistentActor;
import akka.japi.Procedure;
@@ -27,7 +29,7 @@
import worker.WorkState.WorkerTimedOut;
import static worker.MasterWorkerProtocol.*;
-public class Master extends UntypedPersistentActor {
+public class Master extends AbstractPersistentActor {
public static String ResultsTopic = "results";
@@ -232,14 +234,17 @@ public String toString() {
}
@Override
- public void onReceiveRecover(Object arg0) throws Exception {
- if (arg0 instanceof WorkDomainEvent) {
- workState = workState.updated((WorkDomainEvent) arg0);
- log.info("Replayed {}", arg0.getClass().getSimpleName());
- }
+ public Receive createReceiveRecover() {
+ // TODO Auto-generated method stub
+ return receiveBuilder().
+ match (WorkDomainEvent.class, cmd -> {
+ workState = workState.updated(cmd);
+ log.info("Replayed {}", cmd.getClass().getSimpleName());
+ }).build();
}
-
- @Override
+
+
+ //@Override
public String persistenceId() {
for (String role : JavaConversions.asJavaIterable((Cluster.get(getContext().system()).selfRoles()))) {
if (role.startsWith("backend-")) {
@@ -251,30 +256,42 @@ public String persistenceId() {
}
@Override
- public void onReceiveCommand(Object cmd) throws Exception {
- if (cmd instanceof RegisterWorker) {
- String workerId = ((RegisterWorker) cmd).workerId;
- if (workers.containsKey(workerId)) {
- workers.put(workerId, workers.get(workerId).copyWithRef(getSender()));
- } else {
- log.info("Worker registered: {}", workerId);
- workers.put(workerId, new WorkerState(getSender(), Idle.instance));
- if (workState.hasWork()) {
- getSender().tell(WorkIsReady.getInstance(), getSelf());
- }
- }
- } else if (cmd instanceof WorkerRequestsWork) {
- if (workState.hasWork()) {
- final String workerId = ((WorkerRequestsWork) cmd).workerId;
- final WorkerState state = workers.get(workerId);
- if (state != null && state.status.isIdle()) {
- final Work work = workState.nextWork();
- persist(new WorkState.WorkStarted(work.workId), new Procedure() {
- public void apply(WorkStarted event) throws Exception {
- workState = workState.updated(event);
- log.info("Giving worker {} some work {}", workerId, event.workId);
- workers.put(workerId, state.copyWithStatus(new Busy(event.workId, workTimeout.fromNow())));
- getSender().tell(work, getSelf());
+ public Receive createReceive(){
+
+ return receiveBuilder().
+ match(Object.class, cmd -> updateState(cmd)).build();
+
+ }
+
+ void updateState(Object cmd) {
+
+ if (cmd instanceof RegisterWorker) {
+ String workerId = ((RegisterWorker) cmd).workerId;
+ if (workers.containsKey(workerId)) {
+ workers.put(workerId, workers.get(workerId).copyWithRef(getSender()));
+ } else {
+ log.info("Worker registered: {}", workerId);
+ workers.put(workerId, new WorkerState(getSender(), Idle.instance));
+ if (workState.hasWork()) {
+ getSender().tell(WorkIsReady.getInstance(), getSelf());
+ }
+ }
+ }
+
+
+
+ else if (cmd instanceof WorkerRequestsWork) {
+ if (workState.hasWork()) {
+ final String workerId = ((WorkerRequestsWork) cmd).workerId;
+ final WorkerState state = workers.get(workerId);
+ if (state != null && state.status.isIdle()) {
+ final Work work = workState.nextWork();
+ persist(new WorkState.WorkStarted(work.workId), new Procedure() {
+ public void apply(WorkStarted event) throws Exception {
+ workState = workState.updated(event);
+ log.info("Giving worker {} some work {}", workerId, event.workId);
+ workers.put(workerId, state.copyWithStatus(new Busy(event.workId, workTimeout.fromNow())));
+ getSender().tell(work, getSelf());
}
});