Skip to content

Commit aafe535

Browse files
authored
Merge pull request #9 from reugn/develop
v0.6.0
2 parents a772b30 + 93b7fd1 commit aafe535

File tree

11 files changed

+78
-78
lines changed

11 files changed

+78
-78
lines changed

.github/workflows/build.yml

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020

2121
steps:
2222
- name: Checkout
23-
uses: actions/checkout@v2
23+
uses: actions/checkout@v3
2424

2525
- name: Setup Java and Scala
2626
uses: olafurpg/setup-scala@v12
@@ -30,17 +30,13 @@ jobs:
3030
- name: Set up Aerospike Database
3131
uses: reugn/github-action-aerospike@v1
3232

33-
- name: Cache sbt
34-
uses: actions/cache@v2
33+
- name: Cache SBT
34+
uses: actions/cache@v3
3535
with:
3636
path: |
37-
~/.sbt
3837
~/.ivy2/cache
39-
~/.coursier/cache/v1
40-
~/.cache/coursier/v1
41-
~/AppData/Local/Coursier/Cache/v1
42-
~/Library/Caches/Coursier/v1
43-
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
38+
~/.sbt
39+
key: ${{ runner.os }}-sbt-${{ hashFiles('**/build.sbt') }}
4440

4541
- name: Build and test
4642
run: sbt ++${{ matrix.scala }} test

aerospike-core/src/main/scala/io/github/reugn/aerospike/scala/AerospikeClientBuilder.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package io.github.reugn.aerospike.scala
22

3+
import com.aerospike.client.metrics.MetricsPolicy
34
import com.aerospike.client.policy.{AuthMode, ClientPolicy}
45
import com.aerospike.client.{AerospikeClient, Host, IAerospikeClient}
56
import com.typesafe.config.Config
67
import io.github.reugn.aerospike.scala.AerospikeClientBuilder._
78
import io.github.reugn.aerospike.scala.Policies.ClientPolicyImplicits._
89

9-
class AerospikeClientBuilder(config: Config) {
10+
class AerospikeClientBuilder(config: Config, metricsPolicy: Option[MetricsPolicy]) {
1011

1112
private def buildClientPolicy(): ClientPolicy = {
1213
val policy = new ClientPolicy();
@@ -33,20 +34,22 @@ class AerospikeClientBuilder(config: Config) {
3334
}
3435

3536
def build(): IAerospikeClient = {
36-
Option(config.getString("aerospike.hostList")) map {
37+
val client = Option(config.getString("aerospike.hostList")) map {
3738
hostList =>
3839
new AerospikeClient(buildClientPolicy(), Host.parseHosts(hostList, defaultPort): _*)
3940
} getOrElse {
4041
val hostname = Option(config.getString("aerospike.hostname")).getOrElse(defaultHostName)
4142
val port = Option(config.getInt("aerospike.port")).getOrElse(defaultPort)
4243
new AerospikeClient(buildClientPolicy(), hostname, port)
4344
}
45+
metricsPolicy.foreach(policy => client.enableMetrics(policy))
46+
client
4447
}
4548
}
4649

4750
object AerospikeClientBuilder {
4851
private[aerospike] val defaultHostName = "localhost"
4952
private[aerospike] val defaultPort = 3000
5053

51-
def apply(config: Config): AerospikeClientBuilder = new AerospikeClientBuilder(config)
54+
def apply(config: Config): AerospikeClientBuilder = new AerospikeClientBuilder(config, None)
5255
}

aerospike-core/src/main/scala/io/github/reugn/aerospike/scala/EventLoopProvider.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package io.github.reugn.aerospike.scala
22

33
import com.aerospike.client.async.{EventLoop, EventLoops, EventPolicy, NettyEventLoops}
44
import io.github.reugn.aerospike.scala.util.{Linux, Mac, OperatingSystem}
5-
import io.netty.channel.epoll.EpollEventLoopGroup
6-
import io.netty.channel.kqueue.KQueueEventLoopGroup
5+
import io.netty.channel.epoll.{Epoll, EpollEventLoopGroup}
6+
import io.netty.channel.kqueue.{KQueue, KQueueEventLoopGroup}
77
import io.netty.channel.nio.NioEventLoopGroup
88

99
object EventLoopProvider {
@@ -12,9 +12,9 @@ object EventLoopProvider {
1212

1313
private[scala] lazy val eventLoops: EventLoops = {
1414
val eventLoopGroup = OperatingSystem() match {
15-
case Linux =>
15+
case Linux if Epoll.isAvailable =>
1616
new EpollEventLoopGroup(nThreads)
17-
case Mac =>
17+
case Mac if KQueue.isAvailable =>
1818
new KQueueEventLoopGroup(nThreads)
1919
case _ =>
2020
new NioEventLoopGroup(nThreads)

aerospike-core/src/main/scala/io/github/reugn/aerospike/scala/model/QueryStatement.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,21 @@ case class QueryStatement(
77
namespace: String,
88
setName: Option[String] = None,
99
binNames: Option[Seq[String]] = None,
10-
secondaryIndexName: Option[String] = None,
1110
secondaryIndexFilter: Option[Filter] = None,
1211
partitionFilter: Option[PartitionFilter] = None,
1312
operations: Option[Seq[Operation]] = None,
1413
maxRecords: Option[Long] = None,
1514
recordsPerSecond: Option[Int] = None
1615
) {
17-
1816
lazy val statement: Statement = {
1917
val statement: Statement = new Statement
2018
statement.setNamespace(namespace)
2119
setName.foreach(statement.setSetName)
2220
binNames.foreach(bins => statement.setBinNames(bins: _*))
23-
secondaryIndexName.foreach(statement.setIndexName)
2421
secondaryIndexFilter.foreach(statement.setFilter)
2522
operations.foreach(ops => statement.setOperations(ops.toArray))
2623
maxRecords.foreach(statement.setMaxRecords)
2724
recordsPerSecond.foreach(statement.setRecordsPerSecond)
2825
statement
2926
}
30-
31-
lazy val isScan: Boolean = secondaryIndexName.isEmpty || secondaryIndexFilter.isEmpty
3227
}

aerospike-core/src/test/scala/io/github/reugn/aerospike/scala/AerospikeHandlerTest.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,6 @@ class AerospikeHandlerTest extends AsyncFlatSpec with TestCommon with Matchers w
197197
val queryStatement = QueryStatement(
198198
namespace,
199199
setName = Some(set),
200-
secondaryIndexName = Some("idx1"),
201200
secondaryIndexFilter = Some(Filter.equal("bin1", 1))
202201
)
203202
assertThrows[AerospikeException] {

aerospike-zio/src/main/scala/io/github/reugn/aerospike/scala/zioeffect/ZioAerospikeHandler.scala

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import com.aerospike.client.task.ExecuteTask
88
import com.typesafe.config.Config
99
import io.github.reugn.aerospike.scala._
1010
import io.github.reugn.aerospike.scala.model.QueryStatement
11-
import zio.Task
1211
import zio.stream.ZStream
12+
import zio.{Task, ZIO}
1313

1414
import java.util.Calendar
1515
import scala.collection.JavaConverters.seqAsJavaListConverter
@@ -19,49 +19,49 @@ class ZioAerospikeHandler(protected val client: IAerospikeClient)
1919
with StreamHandler3[ZStream] {
2020

2121
override def put(key: Key, bins: Bin*)(implicit policy: WritePolicy): Task[Key] = {
22-
Task(client.put(policy, key, bins: _*)).map(_ => key)
22+
ZIO.attemptBlocking(client.put(policy, key, bins: _*)).map(_ => key)
2323
}
2424

2525
override def append(key: Key, bins: Bin*)(implicit policy: WritePolicy): Task[Key] = {
26-
Task(client.append(policy, key, bins: _*)).map(_ => key)
26+
ZIO.attemptBlocking(client.append(policy, key, bins: _*)).map(_ => key)
2727
}
2828

2929
override def prepend(key: Key, bins: Bin*)(implicit policy: WritePolicy): Task[Key] = {
30-
Task(client.prepend(policy, key, bins: _*)).map(_ => key)
30+
ZIO.attemptBlocking(client.prepend(policy, key, bins: _*)).map(_ => key)
3131
}
3232

3333
override def add(key: Key, bins: Bin*)(implicit policy: WritePolicy): Task[Key] = {
34-
Task(client.add(policy, key, bins: _*)).map(_ => key)
34+
ZIO.attemptBlocking(client.add(policy, key, bins: _*)).map(_ => key)
3535
}
3636

3737
override def delete(key: Key)(implicit policy: WritePolicy): Task[Boolean] = {
38-
Task(client.delete(policy, key))
38+
ZIO.attemptBlocking(client.delete(policy, key))
3939
}
4040

4141
override def deleteBatch(keys: Seq[Key])
4242
(implicit policy: BatchPolicy, batchDeletePolicy: BatchDeletePolicy): Task[BatchResults] = {
43-
Task(client.delete(policy, batchDeletePolicy, keys.toArray))
43+
ZIO.attemptBlocking(client.delete(policy, batchDeletePolicy, keys.toArray))
4444
}
4545

4646
override def truncate(ns: String, set: String, beforeLastUpdate: Option[Calendar] = None)
4747
(implicit policy: InfoPolicy): Task[Unit] = {
48-
Task(client.truncate(policy, ns, set, beforeLastUpdate.orNull))
48+
ZIO.attemptBlocking(client.truncate(policy, ns, set, beforeLastUpdate.orNull))
4949
}
5050

5151
override def touch(key: Key)(implicit policy: WritePolicy): Task[Key] = {
52-
Task(client.touch(policy, key)).map(_ => key)
52+
ZIO.attemptBlocking(client.touch(policy, key)).map(_ => key)
5353
}
5454

5555
override def exists(key: Key)(implicit policy: Policy): Task[Boolean] = {
56-
Task(client.exists(policy, key))
56+
ZIO.attemptBlocking(client.exists(policy, key))
5757
}
5858

5959
override def existsBatch(keys: Seq[Key])(implicit policy: BatchPolicy): Task[Seq[Boolean]] = {
60-
Task(client.exists(policy, keys.toArray)).map(_.toIndexedSeq)
60+
ZIO.attemptBlocking(client.exists(policy, keys.toArray)).map(_.toIndexedSeq)
6161
}
6262

6363
override def get(key: Key, binNames: String*)(implicit policy: Policy): Task[Record] = {
64-
Task {
64+
ZIO.attemptBlocking {
6565
if (binNames.toArray.length > 0)
6666
client.get(policy, key, binNames: _*)
6767
else
@@ -70,7 +70,7 @@ class ZioAerospikeHandler(protected val client: IAerospikeClient)
7070
}
7171

7272
override def getBatch(keys: Seq[Key], binNames: String*)(implicit policy: BatchPolicy): Task[Seq[Record]] = {
73-
Task {
73+
ZIO.attemptBlocking {
7474
if (binNames.toArray.length > 0)
7575
client.get(policy, keys.toArray, binNames: _*)
7676
else
@@ -81,33 +81,33 @@ class ZioAerospikeHandler(protected val client: IAerospikeClient)
8181
}
8282

8383
override def getBatchOp(keys: Seq[Key], operations: Operation*)(implicit policy: BatchPolicy): Task[Seq[Record]] = {
84-
Task(client.get(policy, keys.toArray, operations: _*))
84+
ZIO.attemptBlocking(client.get(policy, keys.toArray, operations: _*))
8585
}
8686

8787
override def getHeader(key: Key)(implicit policy: Policy): Task[Record] = {
88-
Task(client.getHeader(policy, key))
88+
ZIO.attemptBlocking(client.getHeader(policy, key))
8989
}
9090

9191
override def getHeaderBatch(keys: Seq[Key])(implicit policy: BatchPolicy): Task[Seq[Record]] = {
92-
Task(client.getHeader(policy, keys.toArray)).map(_.toIndexedSeq)
92+
ZIO.attemptBlocking(client.getHeader(policy, keys.toArray)).map(_.toIndexedSeq)
9393
}
9494

9595
override def operate(key: Key, operations: Operation*)(implicit policy: WritePolicy): Task[Record] = {
96-
Task(client.operate(policy, key, operations: _*))
96+
ZIO.attemptBlocking(client.operate(policy, key, operations: _*))
9797
}
9898

9999
override def operateBatch(keys: Seq[Key], operations: Operation*)
100100
(implicit policy: BatchPolicy, batchWritePolicy: BatchWritePolicy): Task[BatchResults] = {
101-
Task(client.operate(policy, batchWritePolicy, keys.toArray, operations: _*))
101+
ZIO.attemptBlocking(client.operate(policy, batchWritePolicy, keys.toArray, operations: _*))
102102
}
103103

104104
override def operateBatchRecord(records: Seq[BatchRecord])(implicit policy: BatchPolicy): Task[Boolean] = {
105-
Task(client.operate(policy, records.asJava))
105+
ZIO.attemptBlocking(client.operate(policy, records.asJava))
106106
}
107107

108108
override def scanNodeName(nodeName: String, ns: String, set: String, binNames: String*)
109109
(implicit policy: ScanPolicy): Task[List[KeyRecord]] = {
110-
Task {
110+
ZIO.attemptBlocking {
111111
val callback = RecordScanCallback()
112112
client.scanNode(policy, nodeName, ns, set, callback, binNames: _*)
113113
callback.getRecordSet
@@ -116,7 +116,7 @@ class ZioAerospikeHandler(protected val client: IAerospikeClient)
116116

117117
override def scanNode(node: Node, ns: String, set: String, binNames: String*)
118118
(implicit policy: ScanPolicy): Task[List[KeyRecord]] = {
119-
Task {
119+
ZIO.attemptBlocking {
120120
val callback = RecordScanCallback()
121121
client.scanNode(policy, node, ns, set, callback, binNames: _*)
122122
callback.getRecordSet
@@ -125,11 +125,11 @@ class ZioAerospikeHandler(protected val client: IAerospikeClient)
125125

126126
override def execute(statement: Statement, operations: Operation*)
127127
(implicit policy: WritePolicy): Task[ExecuteTask] = {
128-
Task(client.execute(policy, statement, operations: _*))
128+
ZIO.attemptBlocking(client.execute(policy, statement, operations: _*))
129129
}
130130

131131
override def info(node: Node, name: String): Task[String] = {
132-
Task(Info.request(node, name))
132+
ZIO.attemptBlocking(Info.request(node, name))
133133
}
134134

135135
override def query(statement: QueryStatement)

0 commit comments

Comments
 (0)