Skip to content
This repository was archived by the owner on Nov 13, 2018. It is now read-only.

code upgrade to Akka2.5 #10

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions .classpath
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"/>
<classpathentry kind="output" path="target/classes"/>
</classpath>
23 changes: 23 additions & 0 deletions .project
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>activator-akka-distributed-workers-java</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>
13 changes: 13 additions & 0 deletions .settings/org.eclipse.jdt.core.prefs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.source=1.8
4 changes: 4 additions & 0 deletions .settings/org.eclipse.m2e.core.prefs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1
28 changes: 24 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,43 @@
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.3.4</version>
<version>2.5.8</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.11</artifactId>
<version>2.3.4</version>
<version>2.5.8</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-contrib_2.11</artifactId>
<version>2.3.4</version>
<version>2.5.8</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.11</artifactId>
<version>2.3.4</version>
<version>2.5.8</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.11</artifactId>
<version>2.5.8</version>
</dependency>
<dependency>
<groupId>org.iq80.leveldb</groupId>
<artifactId>leveldb</artifactId>
<version>0.7</version>
</dependency>
<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>org.apache.directory.studio</groupId>
<artifactId>org.apache.commons.io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/worker/Frontend.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package worker;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.actor.UntypedAbstractActor;
import akka.cluster.singleton.ClusterSingletonProxy;
import akka.cluster.singleton.ClusterSingletonProxySettings;
import akka.dispatch.Mapper;
Expand All @@ -16,13 +16,13 @@
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;

public class Frontend extends UntypedActor {
public class Frontend extends UntypedAbstractActor {


ActorRef masterProxy = getContext().actorOf(
ClusterSingletonProxy.props(
"/user/master",
ClusterSingletonProxySettings.create(getContext().system()).withRole("backend")),
ClusterSingletonProxySettings.create(getContext().system()).withRole("backend").withSingletonName("singleton")),
"masterProxy");

public void onReceive(Object message) {
Expand Down
16 changes: 12 additions & 4 deletions src/main/java/worker/Main.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package worker;

import akka.actor.*;
import akka.actor.AbstractActor.Receive;
import akka.cluster.client.ClusterClient;
import akka.cluster.client.ClusterClientSettings;
import akka.cluster.singleton.ClusterSingletonManager;
import akka.cluster.singleton.ClusterSingletonManagerSettings;
import akka.dispatch.OnFailure;
import akka.dispatch.OnSuccess;
import akka.pattern.Patterns;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.SnapshotOffer;
import akka.persistence.journal.leveldb.SharedLeveldbJournal;
import akka.persistence.journal.leveldb.SharedLeveldbStore;
import akka.util.Timeout;
Expand All @@ -19,6 +22,10 @@
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import static java.util.Arrays.asList;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -54,7 +61,7 @@ public static void startBackend(int port, String role) {
ActorSystem system = ActorSystem.create("ClusterSystem", conf);

startupSharedJournal(system, (port == 2551),
ActorPaths.fromString("akka.tcp://[email protected]:2551/user/store"));
ActorPaths.fromString("akka.tcp://[email protected]:2551/user/store"),role);

system.actorOf(
ClusterSingletonManager.props(
Expand Down Expand Up @@ -88,7 +95,7 @@ public static void startFrontend(int port) {
}


public static void startupSharedJournal(final ActorSystem system, boolean startStore, final ActorPath path) {
public static void startupSharedJournal(final ActorSystem system, boolean startStore, final ActorPath path,String role) {
// Start the shared journal one one node (don't crash this SPOF)
// This will not be needed with a distributed journal
if (startStore) {
Expand All @@ -105,8 +112,9 @@ public static void startupSharedJournal(final ActorSystem system, boolean startS

@Override
public void onSuccess(Object arg0) throws Throwable {
if (arg0 instanceof ActorIdentity && ((ActorIdentity) arg0).getRef() != null) {
SharedLeveldbJournal.setStore(((ActorIdentity) arg0).getRef(), system);
if (arg0 instanceof ActorIdentity && ((ActorIdentity) arg0).getActorRef() != null) {
SharedLeveldbJournal.setStore(((ActorIdentity) arg0).getRef(), system);

} else {
system.log().error("Lookup of shared journal at {} timed out", path);
System.exit(-1);
Expand Down
81 changes: 49 additions & 32 deletions src/main/java/worker/Master.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package worker;

import akka.actor.*;
import akka.actor.AbstractActor.Receive;
import akka.cluster.Cluster;
import akka.cluster.client.ClusterClientReceptionist;
import akka.cluster.client.ClusterReceptionist;
Expand All @@ -9,6 +10,7 @@
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.UntypedPersistentActor;
import akka.japi.Procedure;

Expand All @@ -27,7 +29,7 @@
import worker.WorkState.WorkerTimedOut;
import static worker.MasterWorkerProtocol.*;

public class Master extends UntypedPersistentActor {
public class Master extends AbstractPersistentActor {

public static String ResultsTopic = "results";

Expand Down Expand Up @@ -232,14 +234,17 @@ public String toString() {
}

@Override
public void onReceiveRecover(Object arg0) throws Exception {
if (arg0 instanceof WorkDomainEvent) {
workState = workState.updated((WorkDomainEvent) arg0);
log.info("Replayed {}", arg0.getClass().getSimpleName());
}
public Receive createReceiveRecover() {
// TODO Auto-generated method stub
return receiveBuilder().
match (WorkDomainEvent.class, cmd -> {
workState = workState.updated(cmd);
log.info("Replayed {}", cmd.getClass().getSimpleName());
}).build();
}

@Override


//@Override
public String persistenceId() {
for (String role : JavaConversions.asJavaIterable((Cluster.get(getContext().system()).selfRoles()))) {
if (role.startsWith("backend-")) {
Expand All @@ -251,30 +256,42 @@ public String persistenceId() {
}

@Override
public void onReceiveCommand(Object cmd) throws Exception {
if (cmd instanceof RegisterWorker) {
String workerId = ((RegisterWorker) cmd).workerId;
if (workers.containsKey(workerId)) {
workers.put(workerId, workers.get(workerId).copyWithRef(getSender()));
} else {
log.info("Worker registered: {}", workerId);
workers.put(workerId, new WorkerState(getSender(), Idle.instance));
if (workState.hasWork()) {
getSender().tell(WorkIsReady.getInstance(), getSelf());
}
}
} else if (cmd instanceof WorkerRequestsWork) {
if (workState.hasWork()) {
final String workerId = ((WorkerRequestsWork) cmd).workerId;
final WorkerState state = workers.get(workerId);
if (state != null && state.status.isIdle()) {
final Work work = workState.nextWork();
persist(new WorkState.WorkStarted(work.workId), new Procedure<WorkState.WorkStarted>() {
public void apply(WorkStarted event) throws Exception {
workState = workState.updated(event);
log.info("Giving worker {} some work {}", workerId, event.workId);
workers.put(workerId, state.copyWithStatus(new Busy(event.workId, workTimeout.fromNow())));
getSender().tell(work, getSelf());
public Receive createReceive(){

return receiveBuilder().
match(Object.class, cmd -> updateState(cmd)).build();

}

void updateState(Object cmd) {

if (cmd instanceof RegisterWorker) {
String workerId = ((RegisterWorker) cmd).workerId;
if (workers.containsKey(workerId)) {
workers.put(workerId, workers.get(workerId).copyWithRef(getSender()));
} else {
log.info("Worker registered: {}", workerId);
workers.put(workerId, new WorkerState(getSender(), Idle.instance));
if (workState.hasWork()) {
getSender().tell(WorkIsReady.getInstance(), getSelf());
}
}
}



else if (cmd instanceof WorkerRequestsWork) {
if (workState.hasWork()) {
final String workerId = ((WorkerRequestsWork) cmd).workerId;
final WorkerState state = workers.get(workerId);
if (state != null && state.status.isIdle()) {
final Work work = workState.nextWork();
persist(new WorkState.WorkStarted(work.workId), new Procedure<WorkState.WorkStarted>() {
public void apply(WorkStarted event) throws Exception {
workState = workState.updated(event);
log.info("Giving worker {} some work {}", workerId, event.workId);
workers.put(workerId, state.copyWithStatus(new Busy(event.workId, workTimeout.fromNow())));
getSender().tell(work, getSelf());

}
});
Expand Down