Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package zio.kafka.bench.comparison

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import zio.kafka.admin.AdminClient.{ NewTopic, TopicPartition }
import zio.kafka.admin.AdminClient.NewTopic
import zio.kafka.bench.ConsumerZioBenchmark
import zio.kafka.bench.ZioBenchmark.randomThing
import zio.kafka.bench.comparison.ComparisonBenchmark._
Expand All @@ -15,7 +16,7 @@ import scala.jdk.CollectionConverters._
trait ComparisonBenchmark extends ConsumerZioBenchmark[Env] {

protected final val topicPartitions: List[TopicPartition] =
(0 until partitionCount).map(TopicPartition(topic1, _)).toList
(0 until partitionCount).map(new TopicPartition(topic1, _)).toList

private val javaKafkaConsumer: ZLayer[ConsumerSettings, Throwable, LowLevelKafka] =
ZLayer.scoped {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class KafkaClientBenchmarks extends ComparisonBenchmark {
ZIO
.serviceWithZIO[LowLevelKafka] { consumer =>
ZIO.attemptBlocking {
consumer.assign(topicPartitions.map(_.asJava).asJava)
consumer.assign(topicPartitions.asJava)

var count = 0L
while (count < recordCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark {
ZIO.serviceWithZIO[Consumer] { consumer =>
consumer
.plainStream(
Subscription.manual(topicPartitions.map(tp => tp.name -> tp.partition): _*),
Subscription.manual(topicPartitions.map(tp => tp.topic() -> tp.partition()): _*),
Serde.byteArray,
Serde.byteArray
)
Expand Down
22 changes: 14 additions & 8 deletions zio-kafka-test/src/test/scala/zio/kafka/admin/AdminSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package zio.kafka.admin
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin.{ ConfigEntry, RecordsToDelete }
import org.apache.kafka.clients.consumer.ConsumerRecord
import zio.kafka.admin.internal.JavaConverters._
import org.apache.kafka.common.{ Node => JNode }
import zio._
import zio.kafka.ZIOSpecDefaultSlf4j
Expand Down Expand Up @@ -440,28 +441,33 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
)
},
test("should correctly handle no node (null) when converting JNode to Node") {
assert(AdminClient.Node.fromJava(null))(isNone)
val result = JNodeAsScala(null).asScala
assertTrue(result == None)
},
test("should correctly handle noNode when converting JNode to Node") {
assert(AdminClient.Node.fromJava(JNode.noNode()))(isNone)
val result = JNodeAsScala(JNode.noNode()).asScala
assertTrue(result == None)
},
test("should correctly keep all information when converting a valid jNode to Node") {
val posIntGen = Gen.int(0, Int.MaxValue)
check(posIntGen, Gen.string1(Gen.char), posIntGen, Gen.option(Gen.string)) { (id, host, port, rack) =>
val jNode = new JNode(id, host, port, rack.orNull)
assertTrue(AdminClient.Node.fromJava(jNode).map(_.asJava) == Some(jNode))
val jNode = new JNode(id, host, port, rack.orNull)
val result = JNodeAsScala(jNode).asScala.map(_.asJava)
assertTrue(result == Some(jNode))
}
},
test("will replace invalid port by None") {
val posIntGen = Gen.int(0, Int.MaxValue)
check(posIntGen, Gen.string1(Gen.char), Gen.int, Gen.option(Gen.string)) { (id, host, port, rack) =>
val jNode = new JNode(id, host, port, rack.orNull)
assertTrue(AdminClient.Node.fromJava(jNode).map(_.port.isEmpty) == Some(port < 0))
val jNode = new JNode(id, host, port, rack.orNull)
val result = JNodeAsScala(jNode).asScala
assertTrue(result.map(_.port.isEmpty) == Some(port < 0))
}
},
test("will replace empty host by None") {
val jNode = new JNode(0, "", 9092, null)
assertTrue(AdminClient.Node.fromJava(jNode).map(_.host.isEmpty) == Some(true))
val jNode = new JNode(0, "", 9092, null)
val result = JNodeAsScala(jNode).asScala
assertTrue(result.map(_.host.isEmpty) == Some(true))
},
test("incremental alter configs") {
for {
Expand Down
Loading