From f8e9065f536e66d19ddd295bce1b9b0f8d340d5a Mon Sep 17 00:00:00 2001 From: Anant Pandey Date: Mon, 7 May 2018 09:04:26 +0100 Subject: [PATCH 1/3] code upgrade to Akka2.5 --- .classpath | 32 +++++++++++ .project | 23 ++++++++ .settings/org.eclipse.jdt.core.prefs | 13 +++++ .settings/org.eclipse.m2e.core.prefs | 4 ++ pom.xml | 28 ++++++++-- src/main/java/worker/Frontend.java | 6 +-- src/main/java/worker/Main.java | 16 ++++-- src/main/java/worker/Master.java | 81 +++++++++++++++++----------- 8 files changed, 160 insertions(+), 43 deletions(-) create mode 100644 .classpath create mode 100644 .project create mode 100644 .settings/org.eclipse.jdt.core.prefs create mode 100644 .settings/org.eclipse.m2e.core.prefs diff --git a/.classpath b/.classpath new file mode 100644 index 0000000..e07fea7 --- /dev/null +++ b/.classpath @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.project b/.project new file mode 100644 index 0000000..1ba7f71 --- /dev/null +++ b/.project @@ -0,0 +1,23 @@ + + + activator-akka-distributed-workers-java + + + + + + org.eclipse.jdt.core.javabuilder + + + + + org.eclipse.m2e.core.maven2Builder + + + + + + org.eclipse.jdt.core.javanature + org.eclipse.m2e.core.maven2Nature + + diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000..13b3428 --- /dev/null +++ b/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,13 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled +org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8 +org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve +org.eclipse.jdt.core.compiler.compliance=1.8 +org.eclipse.jdt.core.compiler.debug.lineNumber=generate +org.eclipse.jdt.core.compiler.debug.localVariable=generate +org.eclipse.jdt.core.compiler.debug.sourceFile=generate +org.eclipse.jdt.core.compiler.problem.assertIdentifier=error +org.eclipse.jdt.core.compiler.problem.enumIdentifier=error +org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning +org.eclipse.jdt.core.compiler.source=1.8 diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs new file mode 100644 index 0000000..f897a7f --- /dev/null +++ b/.settings/org.eclipse.m2e.core.prefs @@ -0,0 +1,4 @@ +activeProfiles= +eclipse.preferences.version=1 +resolveWorkspaceProjects=true +version=1 diff --git a/pom.xml b/pom.xml index d8f5ce4..df2d198 100644 --- a/pom.xml +++ b/pom.xml @@ -7,23 +7,43 @@ com.typesafe.akka akka-actor_2.11 - 2.3.4 + 2.5.8 com.typesafe.akka akka-cluster_2.11 - 2.3.4 + 2.5.8 com.typesafe.akka akka-contrib_2.11 - 2.3.4 + 2.5.8 com.typesafe.akka akka-testkit_2.11 - 2.3.4 + 2.5.8 + + com.typesafe.akka + akka-cluster-tools_2.11 + 2.5.8 + + + org.iq80.leveldb + leveldb + 0.7 + + + org.fusesource.leveldbjni + leveldbjni-all + 1.8 + + + org.apache.directory.studio + org.apache.commons.io + 2.4 + junit junit diff --git a/src/main/java/worker/Frontend.java b/src/main/java/worker/Frontend.java index 77dd2f2..a3dee0f 100644 --- a/src/main/java/worker/Frontend.java +++ b/src/main/java/worker/Frontend.java @@ -1,7 +1,7 @@ package worker; import akka.actor.ActorRef; -import akka.actor.UntypedActor; +import akka.actor.UntypedAbstractActor; import akka.cluster.singleton.ClusterSingletonProxy; import akka.cluster.singleton.ClusterSingletonProxySettings; import akka.dispatch.Mapper; @@ -16,13 +16,13 @@ import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.pipe; -public class Frontend extends UntypedActor { +public class Frontend extends UntypedAbstractActor { ActorRef masterProxy = getContext().actorOf( ClusterSingletonProxy.props( "/user/master", - ClusterSingletonProxySettings.create(getContext().system()).withRole("backend")), + ClusterSingletonProxySettings.create(getContext().system()).withRole("backend").withSingletonName("singleton")), "masterProxy"); public void onReceive(Object message) { diff --git a/src/main/java/worker/Main.java b/src/main/java/worker/Main.java index ed7e570..198cf1f 100644 --- a/src/main/java/worker/Main.java +++ b/src/main/java/worker/Main.java @@ -1,6 +1,7 @@ package worker; import akka.actor.*; +import akka.actor.AbstractActor.Receive; import akka.cluster.client.ClusterClient; import akka.cluster.client.ClusterClientSettings; import akka.cluster.singleton.ClusterSingletonManager; @@ -8,6 +9,8 @@ import akka.dispatch.OnFailure; import akka.dispatch.OnSuccess; import akka.pattern.Patterns; +import akka.persistence.AbstractPersistentActor; +import akka.persistence.SnapshotOffer; import akka.persistence.journal.leveldb.SharedLeveldbJournal; import akka.persistence.journal.leveldb.SharedLeveldbStore; import akka.util.Timeout; @@ -19,6 +22,10 @@ import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; +import static java.util.Arrays.asList; + +import java.io.Serializable; +import java.util.ArrayList; import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -54,7 +61,7 @@ public static void startBackend(int port, String role) { ActorSystem system = ActorSystem.create("ClusterSystem", conf); startupSharedJournal(system, (port == 2551), - ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/user/store")); + ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/user/store"),role); system.actorOf( ClusterSingletonManager.props( @@ -88,7 +95,7 @@ public static void startFrontend(int port) { } - public static void startupSharedJournal(final ActorSystem system, boolean startStore, final ActorPath path) { + public static void startupSharedJournal(final ActorSystem system, boolean startStore, final ActorPath path,String role) { // Start the shared journal one one node (don't crash this SPOF) // This will not be needed with a distributed journal if (startStore) { @@ -105,8 +112,9 @@ public static void startupSharedJournal(final ActorSystem system, boolean startS @Override public void onSuccess(Object arg0) throws Throwable { - if (arg0 instanceof ActorIdentity && ((ActorIdentity) arg0).getRef() != null) { - SharedLeveldbJournal.setStore(((ActorIdentity) arg0).getRef(), system); + if (arg0 instanceof ActorIdentity && ((ActorIdentity) arg0).getActorRef() != null) { + SharedLeveldbJournal.setStore(((ActorIdentity) arg0).getRef(), system); + } else { system.log().error("Lookup of shared journal at {} timed out", path); System.exit(-1); diff --git a/src/main/java/worker/Master.java b/src/main/java/worker/Master.java index 3d8f878..bded3fc 100644 --- a/src/main/java/worker/Master.java +++ b/src/main/java/worker/Master.java @@ -1,6 +1,7 @@ package worker; import akka.actor.*; +import akka.actor.AbstractActor.Receive; import akka.cluster.Cluster; import akka.cluster.client.ClusterClientReceptionist; import akka.cluster.client.ClusterReceptionist; @@ -9,6 +10,7 @@ import akka.cluster.pubsub.DistributedPubSubMediator; import akka.event.Logging; import akka.event.LoggingAdapter; +import akka.persistence.AbstractPersistentActor; import akka.persistence.UntypedPersistentActor; import akka.japi.Procedure; @@ -27,7 +29,7 @@ import worker.WorkState.WorkerTimedOut; import static worker.MasterWorkerProtocol.*; -public class Master extends UntypedPersistentActor { +public class Master extends AbstractPersistentActor { public static String ResultsTopic = "results"; @@ -232,14 +234,17 @@ public String toString() { } @Override - public void onReceiveRecover(Object arg0) throws Exception { - if (arg0 instanceof WorkDomainEvent) { - workState = workState.updated((WorkDomainEvent) arg0); - log.info("Replayed {}", arg0.getClass().getSimpleName()); - } + public Receive createReceiveRecover() { + // TODO Auto-generated method stub + return receiveBuilder(). + match (WorkDomainEvent.class, cmd -> { + workState = workState.updated(cmd); + log.info("Replayed {}", cmd.getClass().getSimpleName()); + }).build(); } - - @Override + + + //@Override public String persistenceId() { for (String role : JavaConversions.asJavaIterable((Cluster.get(getContext().system()).selfRoles()))) { if (role.startsWith("backend-")) { @@ -251,30 +256,42 @@ public String persistenceId() { } @Override - public void onReceiveCommand(Object cmd) throws Exception { - if (cmd instanceof RegisterWorker) { - String workerId = ((RegisterWorker) cmd).workerId; - if (workers.containsKey(workerId)) { - workers.put(workerId, workers.get(workerId).copyWithRef(getSender())); - } else { - log.info("Worker registered: {}", workerId); - workers.put(workerId, new WorkerState(getSender(), Idle.instance)); - if (workState.hasWork()) { - getSender().tell(WorkIsReady.getInstance(), getSelf()); - } - } - } else if (cmd instanceof WorkerRequestsWork) { - if (workState.hasWork()) { - final String workerId = ((WorkerRequestsWork) cmd).workerId; - final WorkerState state = workers.get(workerId); - if (state != null && state.status.isIdle()) { - final Work work = workState.nextWork(); - persist(new WorkState.WorkStarted(work.workId), new Procedure() { - public void apply(WorkStarted event) throws Exception { - workState = workState.updated(event); - log.info("Giving worker {} some work {}", workerId, event.workId); - workers.put(workerId, state.copyWithStatus(new Busy(event.workId, workTimeout.fromNow()))); - getSender().tell(work, getSelf()); + public Receive createReceive(){ + + return receiveBuilder(). + match(Object.class, cmd -> updateState(cmd)).build(); + + } + + void updateState(Object cmd) { + + if (cmd instanceof RegisterWorker) { + String workerId = ((RegisterWorker) cmd).workerId; + if (workers.containsKey(workerId)) { + workers.put(workerId, workers.get(workerId).copyWithRef(getSender())); + } else { + log.info("Worker registered: {}", workerId); + workers.put(workerId, new WorkerState(getSender(), Idle.instance)); + if (workState.hasWork()) { + getSender().tell(WorkIsReady.getInstance(), getSelf()); + } + } + } + + + + else if (cmd instanceof WorkerRequestsWork) { + if (workState.hasWork()) { + final String workerId = ((WorkerRequestsWork) cmd).workerId; + final WorkerState state = workers.get(workerId); + if (state != null && state.status.isIdle()) { + final Work work = workState.nextWork(); + persist(new WorkState.WorkStarted(work.workId), new Procedure() { + public void apply(WorkStarted event) throws Exception { + workState = workState.updated(event); + log.info("Giving worker {} some work {}", workerId, event.workId); + workers.put(workerId, state.copyWithStatus(new Busy(event.workId, workTimeout.fromNow()))); + getSender().tell(work, getSelf()); } }); From bade588146ec6fff521c8c93d52a44665ff41f28 Mon Sep 17 00:00:00 2001 From: Anant <33789263+anant1525@users.noreply.github.com> Date: Sun, 10 Dec 2023 18:40:23 +0530 Subject: [PATCH 2/3] Created using Colaboratory --- kittieAl.ipynb | 145 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 kittieAl.ipynb diff --git a/kittieAl.ipynb b/kittieAl.ipynb new file mode 100644 index 0000000..4314959 --- /dev/null +++ b/kittieAl.ipynb @@ -0,0 +1,145 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "provenance": [], + "authorship_tag": "ABX9TyMEIs50eRRlXt22vx24TEbf", + "include_colab_link": true + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3" + }, + "language_info": { + "name": "python" + } + }, + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "view-in-github", + "colab_type": "text" + }, + "source": [ + "\"Open" + ] + }, + { + "cell_type": "code", + "execution_count": 145, + "metadata": { + "id": "4oZjqFeGhlJT" + }, + "outputs": [], + "source": [ + "# importing time module\n", + "import time" + ] + }, + { + "cell_type": "code", + "source": [ + "# forming the intial disctionary\n", + "\n", + "Dict = {0: 'Anant', 1: 'Govind', 2: 'Amit', 3: 'Anuj', 4: 'Yash', 5: 'Karan'}\n", + "#print(Dict)" + ], + "metadata": { + "id": "UDO3HbtljtBU" + }, + "execution_count": 146, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "# Get the time in nanoseconds\n", + "# since the epoch\n", + "# using time.time_ns() method\n", + "time_nanosec = time.time_ns()\n", + "\n", + "# Print the time\n", + "# in nanoseconds since the epoch\n", + "print(\"Time in nanoseconds since the epoch:\", time_nanosec)" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "-C4pIY-9igsE", + "outputId": "dc0a8a4d-951a-4b20-cfd3-d3440c349880" + }, + "execution_count": 147, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "Time in nanoseconds since the epoch: 1702192665356612285\n" + ] + } + ] + }, + { + "cell_type": "code", + "source": [ + "# This will do the modulo\n", + "modulo = time_nanosec % 6" + ], + "metadata": { + "id": "7gjOwfsOihEi" + }, + "execution_count": 148, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "# optional part of printing the modulo\n", + "print(\"modulo 6 is:\", modulo)" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "KfgKBVa4ihaK", + "outputId": "5a31edc6-2060-4456-8a45-f47269cb50f6" + }, + "execution_count": 149, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "modulo 6 is: 5\n" + ] + } + ] + }, + { + "cell_type": "code", + "source": [ + "print (\"This cycle kitty goes to:\", Dict[int(modulo)])" + ], + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "tqay4umckby6", + "outputId": "5443d25c-e3ef-4b29-b9cb-acf61306a255" + }, + "execution_count": 150, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "This cycle kitty goes to: Karan\n" + ] + } + ] + } + ] +} \ No newline at end of file From 5e701cfa8ebee5c1e33de1c4ed100c2a0cd43c56 Mon Sep 17 00:00:00 2001 From: Anant <33789263+anant1525@users.noreply.github.com> Date: Sun, 10 Dec 2023 18:43:51 +0530 Subject: [PATCH 3/3] Delete kittieAl.ipynb --- kittieAl.ipynb | 145 ------------------------------------------------- 1 file changed, 145 deletions(-) delete mode 100644 kittieAl.ipynb diff --git a/kittieAl.ipynb b/kittieAl.ipynb deleted file mode 100644 index 4314959..0000000 --- a/kittieAl.ipynb +++ /dev/null @@ -1,145 +0,0 @@ -{ - "nbformat": 4, - "nbformat_minor": 0, - "metadata": { - "colab": { - "provenance": [], - "authorship_tag": "ABX9TyMEIs50eRRlXt22vx24TEbf", - "include_colab_link": true - }, - "kernelspec": { - "name": "python3", - "display_name": "Python 3" - }, - "language_info": { - "name": "python" - } - }, - "cells": [ - { - "cell_type": "markdown", - "metadata": { - "id": "view-in-github", - "colab_type": "text" - }, - "source": [ - "\"Open" - ] - }, - { - "cell_type": "code", - "execution_count": 145, - "metadata": { - "id": "4oZjqFeGhlJT" - }, - "outputs": [], - "source": [ - "# importing time module\n", - "import time" - ] - }, - { - "cell_type": "code", - "source": [ - "# forming the intial disctionary\n", - "\n", - "Dict = {0: 'Anant', 1: 'Govind', 2: 'Amit', 3: 'Anuj', 4: 'Yash', 5: 'Karan'}\n", - "#print(Dict)" - ], - "metadata": { - "id": "UDO3HbtljtBU" - }, - "execution_count": 146, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# Get the time in nanoseconds\n", - "# since the epoch\n", - "# using time.time_ns() method\n", - "time_nanosec = time.time_ns()\n", - "\n", - "# Print the time\n", - "# in nanoseconds since the epoch\n", - "print(\"Time in nanoseconds since the epoch:\", time_nanosec)" - ], - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "id": "-C4pIY-9igsE", - "outputId": "dc0a8a4d-951a-4b20-cfd3-d3440c349880" - }, - "execution_count": 147, - "outputs": [ - { - "output_type": "stream", - "name": "stdout", - "text": [ - "Time in nanoseconds since the epoch: 1702192665356612285\n" - ] - } - ] - }, - { - "cell_type": "code", - "source": [ - "# This will do the modulo\n", - "modulo = time_nanosec % 6" - ], - "metadata": { - "id": "7gjOwfsOihEi" - }, - "execution_count": 148, - "outputs": [] - }, - { - "cell_type": "code", - "source": [ - "# optional part of printing the modulo\n", - "print(\"modulo 6 is:\", modulo)" - ], - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "id": "KfgKBVa4ihaK", - "outputId": "5a31edc6-2060-4456-8a45-f47269cb50f6" - }, - "execution_count": 149, - "outputs": [ - { - "output_type": "stream", - "name": "stdout", - "text": [ - "modulo 6 is: 5\n" - ] - } - ] - }, - { - "cell_type": "code", - "source": [ - "print (\"This cycle kitty goes to:\", Dict[int(modulo)])" - ], - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "id": "tqay4umckby6", - "outputId": "5443d25c-e3ef-4b29-b9cb-acf61306a255" - }, - "execution_count": 150, - "outputs": [ - { - "output_type": "stream", - "name": "stdout", - "text": [ - "This cycle kitty goes to: Karan\n" - ] - } - ] - } - ] -} \ No newline at end of file