This repository was archived by the owner on Apr 1, 2025. It is now read-only.
File tree Expand file tree Collapse file tree 5 files changed +69
-6
lines changed
src/main/scala/com/signalpath/spcron Expand file tree Collapse file tree 5 files changed +69
-6
lines changed Original file line number Diff line number Diff line change @@ -7,6 +7,12 @@ from running on multiple instances at the same time.
77
88## Usage
99
10+ Add ` https://dl.bintray.com/signalpath/scala ` to your list of
11+ maven resolvers and then add the following to your build.sbt.
12+ ```
13+ libraryDependencies += "com.signalpath" %% "sp-cron" % "1.0"
14+ ```
15+
1016Running cron jobs is as simple as creating a Runner and passing
1117it JobDefinitions. The following example defines a single job
1218which prints the time every minute. The example assumes that
@@ -18,14 +24,14 @@ val lock = new RedisLockingWorker(redis)
1824val runner = new Runner(lock)
1925val jobs = Seq(
2026 JobDefinition(
21- "0 * * ? * * ",
22- "test2 ",
23- IO({ println(LocalDateTime.now()) }),
24- 45
27+ cronExpression = "0 * * ? * * ",
28+ name = "clock ",
29+ job = IO({ println(LocalDateTime.now()) }),
30+ ttl = 45
2531 )
2632)
2733runner.start(jobs)
28- ```
34+ ```
2935
3036Likely, a job will have side effects involving futures. Because
3137the JobDescription expects an ` IO[Unit] ` the future will either
Original file line number Diff line number Diff line change @@ -12,3 +12,7 @@ libraryDependencies ++= Seq(
1212 " org.scalamock" %% " scalamock" % " 4.4.0" % Test ,
1313 " org.scalatest" %% " scalatest" % " 3.1.0" % Test
1414)
15+
16+ bintrayOrganization := Some (" signalpath" )
17+ bintrayRepository := " scala"
18+ licenses += (" MIT" , url(" http://opensource.org/licenses/MIT" ))
Original file line number Diff line number Diff line change 1+ addSbtPlugin(" org.foundweekends" % " sbt-bintray" % " 0.5.6" )
Original file line number Diff line number Diff line change @@ -2,11 +2,46 @@ package com.signalpath.spcron
22
33import cats .effect .IO
44
5+ /**
6+ * Defines the redis functions needed by this library. Abstracted to all implementation of your redis
7+ * library of choice.
8+ */
59trait LockingWorker {
610
11+ /**
12+ * Redis setNx command.
13+ *
14+ * @param key Redis key name.
15+ * @param value Value to set in the key. Will be the runner id.
16+ * @return True if key was set.
17+ */
718 def setNx (key : String , value : String ): IO [Boolean ]
19+
20+ /**
21+ * Redis expire command.
22+ *
23+ * @param key Redis key name.
24+ * @param ttl Duration of time, in seconds, the key should exist for.
25+ * @return True if successfully set.
26+ */
827 def expire (key : String , ttl : Long ): IO [Boolean ]
28+
29+ /**
30+ * Redis get command.
31+ *
32+ * @param key Redis key name.
33+ * @return String value stored in the key.
34+ */
935 def get (key : String ): IO [String ]
36+
37+ /**
38+ * Redis eval command.
39+ *
40+ * @param script "String lua script to execute. Will execute a ttl and optionally an expire command.
41+ * @param key Redis key name.
42+ * @param ttl Duration of time, in seconds, the key should exist for.
43+ * @return True if successful.
44+ */
1045 def eval (script : String , key : String , ttl : Long ): IO [Boolean ]
1146
1247}
Original file line number Diff line number Diff line change @@ -7,6 +7,14 @@ import fs2.{Stream => fs2Stream}
77
88import scala .concurrent .ExecutionContext
99
10+ /**
11+ * The definition of a job to be run on a schedule.
12+ *
13+ * @param cronExpression A cron4s compatible cron expression.
14+ * @param name A unique name for the job. This will be part of the lock name.
15+ * @param job The work to be done.
16+ * @param ttl How long the lock should be held for the job execution.
17+ */
1018case class JobDefinition (
1119 cronExpression : String ,
1220 name : String ,
@@ -16,17 +24,26 @@ case class JobDefinition(
1624 def cron : cron4s.CronExpr = Cron .unsafeParse(cronExpression)
1725}
1826
27+ /**
28+ * The job runner. Create this and call the start method to start jobs executing.
29+ *
30+ * @param lock An implemented LockingWorker.
31+ */
1932class Runner (lock : LockingWorker ) {
2033
2134 val runnerId = java.util.UUID .randomUUID().toString
2235
36+ /**
37+ * Starts a collection of jobs which are executed on their cron schedules.
38+ *
39+ * @param jobs Jobs to be executed.
40+ */
2341 def start (jobs : Seq [JobDefinition ]): Unit = {
2442 implicit val timer : Timer [IO ] = IO .timer(ExecutionContext .global)
2543 val streams = jobs.map { job =>
2644 awakeEveryCron[IO ](job.cron) >> worker(job.job, job.name, job.ttl)
2745 }
2846 streams
29- // .dropRight(1)
3047 .map(j => j.compile.drain.unsafeRunAsync((cb : Either [Throwable , Unit ]) => Unit ))
3148 ()
3249 }
You can’t perform that action at this time.
0 commit comments