Skip to content
This repository was archived by the owner on Apr 1, 2025. It is now read-only.

Commit ef64799

Browse files
author
Brent Johnson
committed
Initial Commit
1 parent cb34be0 commit ef64799

File tree

6 files changed

+259
-0
lines changed

6 files changed

+259
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
*.class
22
*.log
3+
.idea/**

README.md

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# SP-Cron
2+
3+
A small library to define functions that run on a cron schedule
4+
using [fs2-cron](https://github.com/fthomas/fs2-cron). Using
5+
[redis](redis.io) to create a lock per job preventing the job
6+
from running on multiple instances at the same time.
7+
8+
## Usage
9+
10+
Running cron jobs is as simple as creating a Runner and passing
11+
it JobDefinitions. The following example defines a single job
12+
which prints the time every minute. The example assumes that
13+
`redis` defines a connection to [redis](redis.io). See the
14+
LockingWorker section for implementing a LockingWorker.
15+
16+
```
17+
val lock = new RedisLockingWorker(redis)
18+
val runner = new Runner(lock)
19+
val jobs = Seq(
20+
JobDefinition(
21+
"0 * * ? * * ",
22+
"test2",
23+
IO({ println(LocalDateTime.now()) }),
24+
45
25+
)
26+
)
27+
runner.start(jobs)
28+
```
29+
30+
Likely, a job will have side effects involving futures. Because
31+
the JobDescription expects an `IO[Unit]` the future will either
32+
need to be resolved, through an Await for example, or by being
33+
converted to an IO. A future can be converted to an IO with
34+
Catbird with, `rerunnableToIO(Rerunnable.fromFuture(F))`, where
35+
F is the Twitter Future.
36+
37+
### LockingWorker
38+
39+
The LockingWorker trait must be implemented and provided to the
40+
Runner. LockingWorker expects implementations of 4 redis functions:
41+
42+
* setNx: to set a key only if it doesn't exist. A UUID is
43+
generated for each instantiation of the runner which is set
44+
as the key value.
45+
* expire: to expire the key based on the job's ttl
46+
* get: to get the value of the key
47+
* eval: which is used by threads that do not secure the log
48+
to verify a ttl is set on the key. It does this by calling
49+
ttl on the key and calling expire if no ttl is set.
50+
51+
An example implementation using
52+
[Finatra](https://twitter.github.io/finatra/) and
53+
[Catbird](https://github.com/travisbrown/catbird) is provided
54+
below.
55+
56+
```
57+
class RedisLockingWorker(redis: RedisClient) extends LockingWorker {
58+
val keyPrefix = "cron:namespace:"
59+
60+
override def setNx(key: String, value: String): IO[Boolean] =
61+
redis.setNx(keyPrefix + key, value).map(b => Boolean.unbox(b))
62+
63+
override def expire(key: String, ttl: Long): IO[Boolean] =
64+
redis.expire(keyPrefix + key, ttl).map(b => Boolean.unbox(b))
65+
66+
override def get(key: String): IO[String] =
67+
redis.get(keyPrefix + key).map(buf => optBufToStr(buf))
68+
69+
override def eval(script: String, key: String, ttl: Long): IO[Boolean] =
70+
redis.eval(script, Seq(keyPrefix + key), Seq(ttl.toString)).map(_ => true)
71+
72+
def optBufToStr(b: Option[Buf]): String = Utf8.unapply(b.get).get
73+
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
74+
implicit def futureToIO[A](f: Future[A]): IO[A] = rerunnableToIO(Rerunnable.fromFuture(f))
75+
implicit def strToBuf(s: String): Buf = Utf8.apply(s)
76+
implicit def seqStrToSeqBuf(strings: Seq[String]): Seq[Buf] = strings.map(strToBuf)
77+
}
78+
```

build.sbt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
name := "sp-cron"
2+
organization := "com.signalpath"
3+
version := "1.0"
4+
scalaVersion := "2.12.11"
5+
6+
libraryDependencies ++= Seq(
7+
"co.fs2" %% "fs2-core" % "2.2.1",
8+
"com.github.alonsodomin.cron4s" %% "cron4s-core" % "0.6.0",
9+
"eu.timepit" %% "fs2-cron-core" % "0.2.2",
10+
"org.typelevel" %% "cats-core" % "2.0.0",
11+
"org.scalatest" %% "scalatest" % "3.1.1" % Test,
12+
"org.scalamock" %% "scalamock" % "4.4.0" % Test,
13+
"org.scalatest" %% "scalatest" % "3.1.0" % Test
14+
)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.signalpath.spcron
2+
3+
import cats.effect.IO
4+
5+
trait LockingWorker {
6+
7+
def setNx(key: String, value: String): IO[Boolean]
8+
def expire(key: String, ttl: Long): IO[Boolean]
9+
def get(key: String): IO[String]
10+
def eval(script: String, key: String, ttl: Long): IO[Boolean]
11+
12+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.signalpath.spcron
2+
3+
import cats.effect.{Async, IO, Timer}
4+
import cron4s.Cron
5+
import eu.timepit.fs2cron.awakeEveryCron
6+
import fs2.{Stream => fs2Stream}
7+
8+
import scala.concurrent.ExecutionContext
9+
10+
case class JobDefinition(
11+
cronExpression: String,
12+
name: String,
13+
job: IO[Unit],
14+
ttl: Long) {
15+
16+
def cron: cron4s.CronExpr = Cron.unsafeParse(cronExpression)
17+
}
18+
19+
class Runner(lock: LockingWorker) {
20+
21+
val runnerId = java.util.UUID.randomUUID().toString
22+
23+
def start(jobs: Seq[JobDefinition]): Unit = {
24+
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
25+
val streams = jobs.map { job =>
26+
awakeEveryCron[IO](job.cron) >> worker(job.job, job.name, job.ttl)
27+
}
28+
streams
29+
//.dropRight(1)
30+
.map(j => j.compile.drain.unsafeRunAsync((cb: Either[Throwable, Unit]) => Unit))
31+
()
32+
}
33+
34+
def worker(task: IO[Unit], name: String, ttl: Long): fs2.Stream[IO, fs2.INothing] = {
35+
val async: IO[Unit] = Async[IO].async { cb =>
36+
getLock(name, ttl).flatMap { isOwner =>
37+
if(isOwner) task
38+
else IO(())
39+
}.unsafeRunSync()
40+
}
41+
fs2Stream.eval_(IO(async.unsafeRunAsync{
42+
case Left(error) =>
43+
throw error
44+
()
45+
case Right(unit) => unit
46+
}))
47+
}
48+
49+
def getLock(name: String, ttl: Long): IO[Boolean] = {
50+
lock.setNx(name, runnerId).flatMap { setnxResult =>
51+
if (setnxResult) {
52+
lock.expire(name, ttl).map(_ => true)
53+
} else {
54+
lock.get(name).flatMap { owner =>
55+
val ownerCheck = runnerId.equalsIgnoreCase(owner)
56+
lock.eval(
57+
"local ttl = redis.call('ttl', KEYS[1]) if ttl <= 0 then redis.call('expire', KEYS[1], ARGV[1]) end return ttl",
58+
name,
59+
ttl).map(_ => ownerCheck)
60+
}
61+
}
62+
}
63+
}
64+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package com.signalpath.spcron
2+
3+
import cats.effect.{IO, Timer}
4+
import eu.timepit.fs2cron.awakeEveryCron
5+
import org.scalamock.scalatest.MockFactory
6+
import org.scalatest.funspec.AnyFunSpec
7+
import org.scalatest.matchers.should.Matchers
8+
9+
import scala.concurrent.ExecutionContext
10+
import scala.concurrent.duration.Duration
11+
12+
class RunnerTest extends AnyFunSpec with MockFactory with Matchers {
13+
14+
describe("getLock") {
15+
describe("when the key does not exist") {
16+
it("should set a ttl on key and return true") {
17+
val mockLock = mock[LockingWorker]
18+
(mockLock.setNx _).expects("test", *).returning(IO(true)).once()
19+
(mockLock.expire _).expects("test", 42).returning(IO(true)).once()
20+
val cron = new Runner(mockLock)
21+
val actual = cron.getLock("test", 42).unsafeRunSync()
22+
actual should equal(true)
23+
}
24+
}
25+
26+
describe("when the key already exists") {
27+
it("should return true when key is owned by runner") {
28+
val mockLock = mock[LockingWorker]
29+
val cron = new Runner(mockLock)
30+
(mockLock.setNx _).expects("test", *).returning(IO(false)).once()
31+
(mockLock.get _).expects("test").returning(IO(cron.runnerId)).once()
32+
(mockLock.eval _).expects(*, "test", 42).returning(IO(true)).once()
33+
val actual = cron.getLock("test", 42).unsafeRunSync()
34+
actual should equal(true)
35+
}
36+
37+
it("should return false when key is owned by a different runner") {
38+
val mockLock = mock[LockingWorker]
39+
val cron = new Runner(mockLock)
40+
(mockLock.setNx _).expects("test", *).returning(IO(false)).once()
41+
(mockLock.get _).expects("test").returning(IO("nope")).once()
42+
(mockLock.eval _).expects(*, "test", 42).returning(IO(true)).once()
43+
val actual = cron.getLock("test", 42).unsafeRunSync()
44+
actual should equal(false)
45+
}
46+
}
47+
}
48+
49+
describe("worker") {
50+
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
51+
describe("when it secures the lock") {
52+
it("should run the job") {
53+
var wasRun = false
54+
val j = IO({
55+
wasRun = true
56+
()
57+
})
58+
val job = JobDefinition("* * * ? * *", "test", j, 1)
59+
val mockLock = mock[LockingWorker]
60+
val cron = new Runner(mockLock)
61+
(mockLock.setNx _).expects(job.name, *).returning(IO(true)).atLeastOnce()
62+
(mockLock.expire _).expects(job.name, job.ttl).returning(IO(true)).atLeastOnce()
63+
val runner = new Runner(mockLock)
64+
val worker = runner.worker(job.job, job.name, job.ttl)
65+
(awakeEveryCron[IO](job.cron) >> worker).compile.drain.unsafeRunTimed(Duration(1, "second"))
66+
wasRun should equal(true)
67+
}
68+
}
69+
describe("when it does not secure the lock") {
70+
it("should not run the job") {
71+
var wasRun = false
72+
val j = IO({
73+
wasRun = true
74+
()
75+
})
76+
val job = JobDefinition("* * * ? * *", "test", j, 1)
77+
val mockLock = mock[LockingWorker]
78+
val cron = new Runner(mockLock)
79+
(mockLock.setNx _).expects(job.name, *).returning(IO(false)).atLeastOnce()
80+
(mockLock.get _).expects(job.name).returning(IO("nope")).atLeastOnce()
81+
(mockLock.eval _).expects(*, job.name, job.ttl).returning(IO(true)).atLeastOnce()
82+
val runner = new Runner(mockLock)
83+
val worker = runner.worker(job.job, job.name, job.ttl)
84+
(awakeEveryCron[IO](job.cron) >> worker).compile.drain.unsafeRunTimed(Duration(1, "second"))
85+
wasRun should equal(false)
86+
}
87+
}
88+
}
89+
90+
}

0 commit comments

Comments
 (0)