Skip to content

Commit db5fa3c

Browse files
committed
Refactoring: remove usage of MTL
1 parent 46426f7 commit db5fa3c

File tree

4 files changed

+55
-83
lines changed

4 files changed

+55
-83
lines changed

core/src/main/scala/com/evolutiongaming/kafka/flow/KeyContext.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package com.evolutiongaming.kafka.flow
22

33
import cats.effect.{Ref, Resource}
4-
import cats.mtl.Stateful
54
import cats.syntax.all._
65
import cats.{Applicative, Monad}
76
import com.evolutiongaming.catshelper.Log
8-
import com.evolutiongaming.kafka.flow.effect.CatsEffectMtlInstances._
97
import com.evolutiongaming.skafka.Offset
108

119
/** Key specific metainformation inside of parititon.
@@ -32,11 +30,11 @@ object KeyContext {
3230

3331
def of[F[_]: Ref.Make: Monad: Log](removeFromCache: F[Unit]): F[KeyContext[F]] =
3432
Ref.of[F, Option[Offset]](None) map { storage =>
35-
KeyContext(storage.stateInstance, removeFromCache)
33+
KeyContext(storage, removeFromCache)
3634
}
3735

3836
def apply[F[_]: Monad: Log](
39-
storage: Stateful[F, Option[Offset]],
37+
storage: Ref[F, Option[Offset]],
4038
removeFromCache: F[Unit]
4139
): KeyContext[F] = new KeyContext[F] {
4240
def holding = storage.get
Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package com.evolutiongaming.kafka.flow.key
22

33
import cats.effect.{Ref, Sync}
4-
import cats.mtl.Stateful
54
import cats.syntax.all._
65
import cats.{Applicative, Monad}
76
import com.evolutiongaming.catshelper.LogOf
8-
import com.evolutiongaming.kafka.flow.effect.CatsEffectMtlInstances._
97
import com.evolutiongaming.skafka.TopicPartition
108
import com.evolutiongaming.sstream.Stream
119

@@ -27,31 +25,31 @@ object KeyDatabase {
2725

2826
/** Creates in-memory database implementation */
2927
def memory[F[_]: Sync, K]: F[KeyDatabase[F, K]] =
30-
Ref.of[F, Set[K]](Set.empty[K]) map { storage =>
31-
memory(storage.stateInstance)
32-
}
28+
Ref.of[F, Set[K]](Set.empty).map(s => memory(s))
3329

3430
/** Creates in-memory database implementation */
35-
def memory[F[_]: Monad, K](storage: Stateful[F, Set[K]]): KeyDatabase[F, K] =
36-
new KeyDatabase[F, K] {
31+
def memory[F[_]: Monad, K](storage: Ref[F, Set[K]]): KeyDatabase[F, K] = new FromMemory(storage)
3732

38-
def persist(key: K) =
39-
storage modify (_ + key)
33+
def empty[F[_]: Applicative, K]: KeyDatabase[F, K] = new Empty
4034

41-
def delete(key: K) =
42-
storage modify (_ - key)
35+
private final class FromMemory[F[_]: Monad, K](storage: Ref[F, Set[K]]) extends KeyDatabase[F, K] {
4336

44-
def all(applicationId: String, groupId: String, topicPartition: TopicPartition) =
45-
Stream.lift(storage.get) flatMap { keys =>
46-
Stream.from(keys.toList)
47-
}
37+
def persist(key: K) = storage.update(_ + key)
4838

49-
}
39+
def delete(key: K) = storage.update(_ - key)
5040

51-
def empty[F[_]: Applicative, K]: KeyDatabase[F, K] =
52-
new KeyDatabase[F, K] {
53-
def persist(key: K) = ().pure
54-
def delete(key: K) = ().pure
55-
def all(applicationId: String, groupId: String, topicPartition: TopicPartition) = Stream.empty
56-
}
41+
def all(applicationId: String, groupId: String, topicPartition: TopicPartition) =
42+
Stream.lift(storage.get) flatMap { keys =>
43+
Stream.from(keys.toList)
44+
}
45+
}
46+
47+
private final class Empty[F[_]: Applicative, K] extends KeyDatabase[F, K] {
48+
49+
def persist(key: K) = ().pure
50+
51+
def delete(key: K) = ().pure
52+
53+
def all(applicationId: String, groupId: String, topicPartition: TopicPartition) = Stream.empty
54+
}
5755
}
Lines changed: 25 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,77 +1,47 @@
11
package com.evolutiongaming.kafka.flow.key
22

3-
import cats.data.State
4-
import cats.mtl.Stateful
3+
import cats.effect._
4+
import cats.effect.kernel.Ref
55
import com.evolutiongaming.catshelper.Log
6-
import com.evolutiongaming.kafka.flow.key.KeysSpec._
76
import munit.FunSuite
87

98
class KeysSpec extends FunSuite {
109

11-
test("Keys add key to a database on flush") {
12-
13-
val f = new ConstFixture
14-
15-
// Given("empty database")
16-
val database = KeyDatabase.memory(f.database)
17-
val keys = Keys("key1", database)
10+
implicit val log: Log[IO] = Log.empty[IO]
1811

19-
// When("Keys is flushed")
20-
val program = keys.flush
21-
22-
val result = program.runS(Set.empty).value
12+
test("Keys add key to a database on flush") {
13+
for {
14+
ref <- Ref.of[IO, Set[String]](Set.empty)
15+
db = KeyDatabase.memory(ref)
2316

24-
// Then("state gets into database")
25-
assert(result == Set("key1"))
17+
keys = Keys("key1", db)
18+
_ <- keys.flush
2619

20+
state <- ref.get
21+
} yield assertEquals(state, Set("key1"))
2722
}
2823

2924
test("Keys delete a key from a database when requested") {
25+
for {
26+
ref <- Ref.of[IO, Set[String]](Set("key1"))
27+
db = KeyDatabase.memory(ref)
28+
snapshots = Keys("key1", db)
3029

31-
val f = new ConstFixture
32-
33-
// Given("database with contents")
34-
val database = KeyDatabase.memory(f.database)
35-
val snapshots = Keys("key1", database)
36-
val context = Set("key1")
37-
38-
// When("delete is requested")
39-
val program = snapshots.delete(true)
40-
val result = program.runS(context).value
41-
42-
// Then("key is deleted")
43-
assert(result.isEmpty)
30+
_ <- snapshots.delete(true)
4431

32+
state <- ref.get
33+
} yield assert(state.isEmpty)
4534
}
4635

4736
test("Keys do not delete a key from a database when not requested") {
37+
for {
38+
ref <- Ref.of[IO, Set[String]](Set("key1"))
39+
db = KeyDatabase.memory(ref)
40+
snapshots = Keys("key1", db)
4841

49-
val f = new ConstFixture
50-
51-
// Given("database with contents")
52-
val database = KeyDatabase.memory(f.database)
53-
val snapshots = Keys("key1", database)
54-
val context = Set("key1")
55-
56-
// When("delete is requested")
57-
val program = snapshots.delete(false)
58-
val result = program.runS(context).value
59-
60-
// Then("key is not deleted")
61-
assert(result.nonEmpty)
42+
_ <- snapshots.delete(false)
6243

44+
state <- ref.get
45+
} yield assertEquals(state, Set("key1"))
6346
}
64-
65-
}
66-
67-
object KeysSpec {
68-
69-
type F[T] = State[Set[String], T]
70-
71-
class ConstFixture {
72-
val database = Stateful[F, Set[String]]
73-
}
74-
75-
implicit val log: Log[F] = Log.empty[F]
76-
7747
}

core/src/test/scala/com/evolutiongaming/kafka/flow/timer/TimerFlowOfSpec.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -527,12 +527,13 @@ class TimerFlowOfSpec extends FunSuite {
527527
_ <- f.contextRef.set(context)
528528
_ <- program
529529
result <- f.contextRef.get
530+
contextHolding <- f.contextHoldingRef.get
530531
} yield {
531532
// Then("flush and remove never happen")
532533
assertEquals(result.flushed, 0)
533534
assertEquals(result.removed, 0)
534535
// And("the offset of the last successful persist will be held")
535-
assertEquals(result.holding, Some(Offset.unsafe(100)))
536+
assertEquals(contextHolding, Some(Offset.unsafe(100)))
536537
}
537538

538539
testIO.unsafeRunSync()
@@ -567,9 +568,14 @@ object TimerFlowSpec {
567568
val contextRef: Ref[IO, Context] =
568569
Ref.unsafe[IO, Context](Context(timestamps = TimestampState(current = timestamp)))
569570

571+
val contextHoldingRef = Ref.lens(contextRef)(
572+
_.holding,
573+
(context: Context) => (offset: Option[Offset]) => context.copy(holding = offset)
574+
)
575+
570576
implicit val keyContext: KeyContext[IO] =
571577
KeyContext(
572-
storage = contextRef.stateInstance.focus(Context.lens(_.holding)),
578+
storage = contextHoldingRef,
573579
removeFromCache = contextRef.update(ctx => ctx.copy(removed = ctx.removed + 1))
574580
)
575581

0 commit comments

Comments
 (0)