diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala index b98b29c31..2caf8cc5b 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala @@ -1,8 +1,9 @@ package zio.kafka.bench.comparison import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.ByteArrayDeserializer -import zio.kafka.admin.AdminClient.{ NewTopic, TopicPartition } +import zio.kafka.admin.AdminClient.NewTopic import zio.kafka.bench.ConsumerZioBenchmark import zio.kafka.bench.ZioBenchmark.randomThing import zio.kafka.bench.comparison.ComparisonBenchmark._ @@ -15,7 +16,7 @@ import scala.jdk.CollectionConverters._ trait ComparisonBenchmark extends ConsumerZioBenchmark[Env] { protected final val topicPartitions: List[TopicPartition] = - (0 until partitionCount).map(TopicPartition(topic1, _)).toList + (0 until partitionCount).map(new TopicPartition(topic1, _)).toList private val javaKafkaConsumer: ZLayer[ConsumerSettings, Throwable, LowLevelKafka] = ZLayer.scoped { diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala index 031f0be8b..ae4274eeb 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala @@ -43,7 +43,7 @@ class KafkaClientBenchmarks extends ComparisonBenchmark { ZIO .serviceWithZIO[LowLevelKafka] { consumer => ZIO.attemptBlocking { - consumer.assign(topicPartitions.map(_.asJava).asJava) + consumer.assign(topicPartitions.asJava) var count = 0L while (count < recordCount) { diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala index 6af09af72..8ef3cfc9a 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala @@ -31,7 +31,7 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark { ZIO.serviceWithZIO[Consumer] { consumer => consumer .plainStream( - Subscription.manual(topicPartitions.map(tp => tp.name -> tp.partition): _*), + Subscription.manual(topicPartitions.map(tp => tp.topic() -> tp.partition()): _*), Serde.byteArray, Serde.byteArray ) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/admin/AdminSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/admin/AdminSpec.scala index 51f9217f6..de2f965f2 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/admin/AdminSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/admin/AdminSpec.scala @@ -3,6 +3,7 @@ package zio.kafka.admin import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource import org.apache.kafka.clients.admin.{ ConfigEntry, RecordsToDelete } import org.apache.kafka.clients.consumer.ConsumerRecord +import zio.kafka.admin.internal.JavaConverters._ import org.apache.kafka.common.{ Node => JNode } import zio._ import zio.kafka.ZIOSpecDefaultSlf4j @@ -440,28 +441,33 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { ) }, test("should correctly handle no node (null) when converting JNode to Node") { - assert(AdminClient.Node.fromJava(null))(isNone) + val result = JNodeAsScala(null).asScala + assertTrue(result == None) }, test("should correctly handle noNode when converting JNode to Node") { - assert(AdminClient.Node.fromJava(JNode.noNode()))(isNone) + val result = JNodeAsScala(JNode.noNode()).asScala + assertTrue(result == None) }, test("should correctly keep all information when converting a valid jNode to Node") { val posIntGen = Gen.int(0, Int.MaxValue) check(posIntGen, Gen.string1(Gen.char), posIntGen, Gen.option(Gen.string)) { (id, host, port, rack) => - val jNode = new JNode(id, host, port, rack.orNull) - assertTrue(AdminClient.Node.fromJava(jNode).map(_.asJava) == Some(jNode)) + val jNode = new JNode(id, host, port, rack.orNull) + val result = JNodeAsScala(jNode).asScala.map(_.asJava) + assertTrue(result == Some(jNode)) } }, test("will replace invalid port by None") { val posIntGen = Gen.int(0, Int.MaxValue) check(posIntGen, Gen.string1(Gen.char), Gen.int, Gen.option(Gen.string)) { (id, host, port, rack) => - val jNode = new JNode(id, host, port, rack.orNull) - assertTrue(AdminClient.Node.fromJava(jNode).map(_.port.isEmpty) == Some(port < 0)) + val jNode = new JNode(id, host, port, rack.orNull) + val result = JNodeAsScala(jNode).asScala + assertTrue(result.map(_.port.isEmpty) == Some(port < 0)) } }, test("will replace empty host by None") { - val jNode = new JNode(0, "", 9092, null) - assertTrue(AdminClient.Node.fromJava(jNode).map(_.host.isEmpty) == Some(true)) + val jNode = new JNode(0, "", 9092, null) + val result = JNodeAsScala(jNode).asScala + assertTrue(result.map(_.host.isEmpty) == Some(true)) }, test("incremental alter configs") { for { diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala index f7f4fb919..0a4579167 100644 --- a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala +++ b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala @@ -1,64 +1,24 @@ package zio.kafka.admin -import org.apache.kafka.clients.admin.ListOffsetsResult.{ ListOffsetsResultInfo => JListOffsetsResultInfo } import org.apache.kafka.clients.admin.{ Admin => JAdmin, - AlterConfigOp => JAlterConfigOp, - AlterConfigsOptions => JAlterConfigsOptions, - AlterConsumerGroupOffsetsOptions => JAlterConsumerGroupOffsetsOptions, - Config => JConfig, - ConsumerGroupDescription => JConsumerGroupDescription, - CreatePartitionsOptions => JCreatePartitionsOptions, - CreateTopicsOptions => JCreateTopicsOptions, + ConfigEntry, DeleteAclsOptions => _, - DeleteConsumerGroupsOptions => JDeleteConsumerGroupsOptions, - DeleteRecordsOptions => JDeleteRecordsOptions, - DeleteTopicsOptions => JDeleteTopicsOptions, - DescribeClusterOptions => JDescribeClusterOptions, - DescribeConfigsOptions => JDescribeConfigsOptions, - DescribeConsumerGroupsOptions => JDescribeConsumerGroupsOptions, - DescribeTopicsOptions => JDescribeTopicsOptions, - GroupListing => JGroupListing, - ListConsumerGroupOffsetsOptions => JListConsumerGroupOffsetsOptions, - ListConsumerGroupOffsetsSpec => JListConsumerGroupOffsetsSpec, + DescribeClusterResult, ListGroupsOptions => JListGroupsOptions, - ListOffsetsOptions => JListOffsetsOptions, - ListTopicsOptions => JListTopicsOptions, - LogDirDescription => JLogDirDescription, - MemberDescription => JMemberDescription, - NewPartitions => JNewPartitions, - NewTopic => JNewTopic, - OffsetSpec => JOffsetSpec, - ReplicaInfo => JReplicaInfo, - TopicDescription => JTopicDescription, - TopicListing => JTopicListing, - _ + MemberToRemove, + RecordsToDelete, + RemoveMembersFromConsumerGroupOptions } -import org.apache.kafka.clients.consumer.{ OffsetAndMetadata => JOffsetAndMetadata } -import org.apache.kafka.common.config.{ ConfigResource => JConfigResource } import org.apache.kafka.common.errors.ApiException -import org.apache.kafka.common.{ - GroupState => JGroupState, - GroupType => JGroupType, - IsolationLevel => JIsolationLevel, - KafkaFuture, - Metric => JMetric, - MetricName => JMetricName, - Node => JNode, - TopicPartition => JTopicPartition, - TopicPartitionInfo => JTopicPartitionInfo, - Uuid -} +import org.apache.kafka.common.{ KafkaFuture, Uuid } import zio._ import zio.kafka.admin.acl._ +import zio.kafka.admin.internal.JavaConverters._ import zio.kafka.utils.SslHelper -import java.util.Optional -import scala.annotation.tailrec -import scala.collection.mutable -import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ -import scala.util.{ Failure, Success, Try } +import scala.util.{ Failure, Success } /** * Admin client, can be used to create, list, delete topics, consumer groups, etc. @@ -85,7 +45,7 @@ trait AdminClient { */ def deleteConsumerGroups( groupIds: Iterable[String], - options: Option[DeleteConsumerGroupOptions] = None + options: Option[DeleteConsumerGroupsOptions] = None ): Task[Unit] /** @@ -368,7 +328,7 @@ object AdminClient { */ override def deleteConsumerGroups( groupIds: Iterable[String], - options: Option[DeleteConsumerGroupOptions] + options: Option[DeleteConsumerGroupsOptions] ): Task[Unit] = { val asJava = groupIds.asJavaCollection fromKafkaFutureVoid { @@ -432,7 +392,7 @@ object AdminClient { .fold(adminClient.listTopics())(opts => adminClient.listTopics(opts.asJava)) .namesToListings() ) - }.map(_.asScala.map { case (k, v) => k -> TopicListing.fromJava(v) }.toMap) + }.map(_.asScala.map { case (k, v) => k -> v.asScala }.toMap) /** * Describe the specified topics. @@ -451,7 +411,7 @@ object AdminClient { }.flatMap { jTopicDescriptions => ZIO.fromTry { jTopicDescriptions.asScala.toList.forEach { case (k, v) => - AdminClient.TopicDescription.fromJava(v).map(k -> _) + JTopicDescriptionAsScala(v).asScala.map(k -> _) } .map(_.toMap) } @@ -474,7 +434,7 @@ object AdminClient { ) }.map { _.asScala.view.map { case (configResource, config) => - (ConfigResource.fromJava(configResource), KafkaConfig.fromJava(config)) + (configResource.asScala, config.asScala) }.toMap } } @@ -496,10 +456,10 @@ object AdminClient { .map { _.asScala.view.map { case (configResource, configFuture) => ( - ConfigResource.fromJava(configResource), + configResource.asScala, ZIO .fromCompletionStage(configFuture.toCompletionStage) - .map(config => KafkaConfig.fromJava(config)) + .map(config => config.asScala) ) }.toMap @@ -520,7 +480,7 @@ object AdminClient { ).flatMap { nodes => ZIO.fromTry { nodes.asScala.toList.forEach { jNode => - Node.fromJava(jNode) match { + jNode.asScala match { case Some(node) => Success(node) case None => Failure(new RuntimeException("NoNode not expected when listing cluster nodes")) } @@ -532,9 +492,7 @@ object AdminClient { * Get the cluster controller. */ override def describeClusterController(options: Option[DescribeClusterOptions] = None): Task[Option[Node]] = - fromKafkaFuture( - describeCluster(options).map(_.controller()) - ).map(Node.fromJava) + fromKafkaFuture(describeCluster(options).map(_.controller())).map(_.asScala) /** * Get the cluster id. @@ -554,7 +512,7 @@ object AdminClient { res <- describeCluster(options) opt <- fromKafkaFuture(ZIO.attempt(res.authorizedOperations())).map(Option(_)) lst <- ZIO.fromOption(opt.map(_.asScala.toSet)).orElseSucceed(Set.empty) - aclOperations = lst.map(AclOperation.apply) + aclOperations = lst.map(_.asScala) } yield aclOperations /** @@ -589,7 +547,7 @@ object AdminClient { .all() ) } - }.map(_.asScala.bimap(TopicPartition.fromJava, ListOffsetsResultInfo.fromJava).toMap) + }.map(_.asScala.bimap(_.asScala, _.asScala).toMap) /** * List offset for the specified partitions. @@ -609,8 +567,8 @@ object AdminClient { }.map { _.view.map { case (topicPartition, listOffsetResultInfoFuture) => ( - TopicPartition.fromJava(topicPartition), - ZIO.fromCompletionStage(listOffsetResultInfoFuture.toCompletionStage).map(ListOffsetsResultInfo.fromJava) + topicPartition.asScala, + ZIO.fromCompletionStage(listOffsetResultInfoFuture.toCompletionStage).map(_.asScala) ) }.toMap } @@ -632,7 +590,7 @@ object AdminClient { ) }.map { _.asScala.filter { case (_, om) => om ne null } - .bimap(TopicPartition.fromJava, OffsetAndMetadata.fromJava) + .bimap(_.asScala, _.asScala) .toMap } @@ -654,7 +612,7 @@ object AdminClient { _.asScala.map { case (groupId, offsets) => groupId -> offsets.asScala.filter { case (_, om) => om ne null } - .bimap(TopicPartition.fromJava, OffsetAndMetadata.fromJava) + .bimap(_.asScala, _.asScala) .toMap }.toMap } @@ -676,7 +634,7 @@ object AdminClient { _.asScala.map { case (groupId, offsets) => groupId -> offsets.asScala.filter { case (_, om) => om ne null } - .bimap(TopicPartition.fromJava, OffsetAndMetadata.fromJava) + .bimap(_.asScala, _.asScala) .toMap }.toMap } @@ -707,7 +665,7 @@ object AdminClient { override def metrics: Task[Map[MetricName, Metric]] = ZIO.attempt( adminClient.metrics().asScala.toMap.map { case (metricName, metric) => - (MetricName.fromJava(metricName), Metric.fromJava(metric)) + (metricName.asScala, metric.asScala) } ) @@ -727,7 +685,7 @@ object AdminClient { } result.all() } - }.map(_.asScala.map(ConsumerGroupListing.fromJava).toList) + }.map(_.asScala.map(_.asConsumerGroupListing).toList) /** * List the groups in the cluster. @@ -743,7 +701,7 @@ object AdminClient { } result.all() } - }.map(_.asScala.map(GroupListing.fromJava).toList) + }.map(_.asScala.map(_.asScala).toList) /** * Describe the specified consumer groups. @@ -766,7 +724,7 @@ object AdminClient { ) .all ) - ).map(_.asScala.map { case (k, v) => k -> ConsumerGroupDescription.fromJava(v) }.toMap) + ).map(_.asScala.map { case (k, v) => k -> v.asScala }.toMap) /** * Remove the specified members from a consumer group. @@ -802,7 +760,7 @@ object AdminClient { adminClient.describeLogDirs(brokersId.map(Int.box).asJavaCollection).allDescriptions() ) ).map { - _.asScala.bimap(_.intValue, _.asScala.bimap(identity, LogDirDescription.fromJava).toMap).toMap + _.asScala.bimap(_.intValue, _.asScala.bimap(identity, _.asScala).toMap).toMap } /** @@ -821,7 +779,7 @@ object AdminClient { brokerId.intValue(), ZIO .fromCompletionStage(descriptionsFuture.toCompletionStage) - .map(_.asScala.toMap.map { case (k, v) => (k, LogDirDescription.fromJava(v)) }) + .map(_.asScala.toMap.map { case (k, v) => (k, v.asScala) }) ) }.toMap } @@ -860,7 +818,7 @@ object AdminClient { .values() ) .map(_.asScala.map { case (configResource, kf) => - (ConfigResource.fromJava(configResource), ZIO.fromCompletionStage(kf.toCompletionStage).unit) + (configResource.asScala, ZIO.fromCompletionStage(kf.toCompletionStage).unit) }.toMap) override def describeAcls( @@ -958,139 +916,38 @@ object AdminClient { def fromKafkaFutureVoid[R](kfv: RIO[R, KafkaFuture[Void]]): RIO[R, Unit] = fromKafkaFuture(kfv).unit - final case class ConfigResource(`type`: ConfigResourceType, name: String) { - lazy val asJava: JConfigResource = new JConfigResource(`type`.asJava, name) - } - - object ConfigResource { - def fromJava(jcr: JConfigResource): ConfigResource = - ConfigResource(`type` = ConfigResourceType.fromJava(jcrt = jcr.`type`()), name = jcr.name()) - } - - trait ConfigResourceType { - def asJava: JConfigResource.Type - } + final case class ConfigResource(`type`: ConfigResourceType, name: String) + sealed trait ConfigResourceType object ConfigResourceType { - case object BrokerLogger extends ConfigResourceType { - override def asJava = JConfigResource.Type.BROKER_LOGGER - } - - case object Broker extends ConfigResourceType { - override def asJava = JConfigResource.Type.BROKER - } - - case object Topic extends ConfigResourceType { - override def asJava = JConfigResource.Type.TOPIC - } - - case object Unknown extends ConfigResourceType { - override def asJava = JConfigResource.Type.UNKNOWN - } - - case object ClientMetrics extends ConfigResourceType { - override def asJava = JConfigResource.Type.CLIENT_METRICS - } - - case object Group extends ConfigResourceType { - override def asJava = JConfigResource.Type.GROUP - } - - def fromJava(jcrt: JConfigResource.Type): ConfigResourceType = - jcrt match { - case JConfigResource.Type.BROKER_LOGGER => BrokerLogger - case JConfigResource.Type.BROKER => Broker - case JConfigResource.Type.TOPIC => Topic - case JConfigResource.Type.UNKNOWN => Unknown - case JConfigResource.Type.CLIENT_METRICS => ClientMetrics - case JConfigResource.Type.GROUP => Group - } - } - - sealed trait GroupState { - def asJava: JGroupState + case object BrokerLogger extends ConfigResourceType + case object Broker extends ConfigResourceType + case object Topic extends ConfigResourceType + case object Unknown extends ConfigResourceType + case object ClientMetrics extends ConfigResourceType + case object Group extends ConfigResourceType } + sealed trait GroupState object GroupState { - case object Unknown extends GroupState { - override def asJava: JGroupState = JGroupState.UNKNOWN - } - - case object PreparingRebalance extends GroupState { - override def asJava: JGroupState = JGroupState.PREPARING_REBALANCE - } - - case object CompletingRebalance extends GroupState { - override def asJava: JGroupState = JGroupState.COMPLETING_REBALANCE - } - - case object Stable extends GroupState { - override def asJava: JGroupState = JGroupState.STABLE - } - - case object Dead extends GroupState { - override def asJava: JGroupState = JGroupState.DEAD - } - - case object Empty extends GroupState { - override def asJava: JGroupState = JGroupState.EMPTY - } - - case object Assigning extends GroupState { - override def asJava: JGroupState = JGroupState.ASSIGNING - } - - case object Reconciling extends GroupState { - override def asJava: JGroupState = JGroupState.RECONCILING - } - - case object NotReady extends GroupState { - override def asJava: JGroupState = JGroupState.NOT_READY - } - - def fromJava(state: JGroupState): GroupState = - state match { - case JGroupState.UNKNOWN => GroupState.Unknown - case JGroupState.PREPARING_REBALANCE => GroupState.PreparingRebalance - case JGroupState.COMPLETING_REBALANCE => GroupState.CompletingRebalance - case JGroupState.STABLE => GroupState.Stable - case JGroupState.DEAD => GroupState.Dead - case JGroupState.EMPTY => GroupState.Empty - case JGroupState.ASSIGNING => GroupState.Assigning - case JGroupState.RECONCILING => GroupState.Reconciling - case JGroupState.NOT_READY => GroupState.NotReady - } - } - - sealed trait GroupType { - def asJava: JGroupType - } - + case object Unknown extends GroupState + case object PreparingRebalance extends GroupState + case object CompletingRebalance extends GroupState + case object Stable extends GroupState + case object Dead extends GroupState + case object Empty extends GroupState + case object Assigning extends GroupState + case object Reconciling extends GroupState + case object NotReady extends GroupState + } + + sealed trait GroupType object GroupType { - case object Unknown extends GroupType { - override def asJava = JGroupType.UNKNOWN - } - case object Consumer extends GroupType { - override def asJava = JGroupType.CONSUMER - } - case object Classic extends GroupType { - override def asJava = JGroupType.CLASSIC - } - case object Share extends GroupType { - override def asJava = JGroupType.SHARE - } - case object Streams extends GroupType { - override def asJava = JGroupType.STREAMS - } - - def fromJava(groupType: JGroupType): GroupType = - groupType match { - case JGroupType.UNKNOWN => GroupType.Unknown - case JGroupType.CONSUMER => GroupType.Consumer - case JGroupType.CLASSIC => GroupType.Classic - case JGroupType.SHARE => GroupType.Share - case JGroupType.STREAMS => GroupType.Streams - } + case object Unknown extends GroupType + case object Consumer extends GroupType + case object Classic extends GroupType + case object Share extends GroupType + case object Streams extends GroupType } final case class MemberDescription( @@ -1101,17 +958,6 @@ object AdminClient { assignment: Set[TopicPartition] ) - object MemberDescription { - def fromJava(desc: JMemberDescription): MemberDescription = - MemberDescription( - consumerId = desc.consumerId, - groupInstanceId = desc.groupInstanceId.toScala, - clientId = desc.clientId(), - host = desc.host(), - assignment = desc.assignment.topicPartitions().asScala.map(TopicPartition.fromJava).toSet - ) - } - final case class ConsumerGroupDescription( groupId: String, isSimpleConsumerGroup: Boolean, @@ -1122,175 +968,49 @@ object AdminClient { authorizedOperations: Set[AclOperation] ) - object ConsumerGroupDescription { - - def fromJava(description: JConsumerGroupDescription): ConsumerGroupDescription = - ConsumerGroupDescription( - groupId = description.groupId, - isSimpleConsumerGroup = description.isSimpleConsumerGroup, - members = description.members.asScala.map(MemberDescription.fromJava).toList, - partitionAssignor = description.partitionAssignor, - state = GroupState.fromJava(description.groupState), - coordinator = Node.fromJava(description.coordinator()), - authorizedOperations = Option(description.authorizedOperations()) - .fold(Set.empty[AclOperation])(_.asScala.map(AclOperation.apply).toSet) - ) - } - final case class CreatePartitionsOptions( validateOnly: Boolean = false, retryOnQuotaViolation: Boolean = true, timeout: Option[Duration] - ) { - def asJava: JCreatePartitionsOptions = { - val opts = new JCreatePartitionsOptions() - .validateOnly(validateOnly) - .retryOnQuotaViolation(retryOnQuotaViolation) - - timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) - } - } - - final case class CreateTopicsOptions(validateOnly: Boolean, timeout: Option[Duration]) { - def asJava: JCreateTopicsOptions = { - val opts = new JCreateTopicsOptions().validateOnly(validateOnly) - timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) - } - } - - final case class DeleteConsumerGroupOptions(timeout: Option[Duration]) { - def asJava: JDeleteConsumerGroupsOptions = { - val opts = new JDeleteConsumerGroupsOptions() - timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) - } - } - - final case class DeleteTopicsOptions(retryOnQuotaViolation: Boolean = true, timeout: Option[Duration]) { - def asJava: JDeleteTopicsOptions = { - val opts = new JDeleteTopicsOptions().retryOnQuotaViolation(retryOnQuotaViolation) - timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) - } - } - - final case class ListTopicsOptions(listInternal: Boolean = false, timeout: Option[Duration]) { - def asJava: JListTopicsOptions = { - val opts = new JListTopicsOptions().listInternal(listInternal) - timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) - } - } - - final case class DescribeTopicsOptions(includeAuthorizedOperations: Boolean, timeout: Option[Duration]) { - def asJava: JDescribeTopicsOptions = { - val opts = new JDescribeTopicsOptions().includeAuthorizedOperations(includeAuthorizedOperations) - timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) - } - } + ) + final case class CreateTopicsOptions(validateOnly: Boolean, timeout: Option[Duration]) + @deprecated(since = "3.1.0", message = "Use DeleteConsumerGroupsOptions") + type DeleteConsumerGroupOptions = DeleteConsumerGroupsOptions + final case class DeleteConsumerGroupsOptions(timeout: Option[Duration]) + final case class DeleteTopicsOptions(retryOnQuotaViolation: Boolean = true, timeout: Option[Duration]) + final case class ListTopicsOptions(listInternal: Boolean = false, timeout: Option[Duration]) + final case class DescribeTopicsOptions(includeAuthorizedOperations: Boolean, timeout: Option[Duration]) final case class DescribeConfigsOptions( includeSynonyms: Boolean = false, includeDocumentation: Boolean = false, timeout: Option[Duration] - ) { - def asJava: JDescribeConfigsOptions = { - val opts = new JDescribeConfigsOptions() - .includeSynonyms(includeSynonyms) - .includeDocumentation(includeDocumentation) - - timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) - } - } - - final case class DescribeClusterOptions(includeAuthorizedOperations: Boolean, timeout: Option[Duration]) { - lazy val asJava: JDescribeClusterOptions = { - val opts = new JDescribeClusterOptions().includeAuthorizedOperations(includeAuthorizedOperations) - timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) - } - } - - final case class DescribeConsumerGroupsOptions(includeAuthorizedOperations: Boolean, timeout: Option[Duration]) { - lazy val asJava: JDescribeConsumerGroupsOptions = { - val jOpts = new JDescribeConsumerGroupsOptions() - .includeAuthorizedOperations(includeAuthorizedOperations) - timeout.fold(jOpts)(timeout => jOpts.timeoutMs(timeout.toMillis.toInt)) - } - } - - final case class AlterConfigsOptions(validateOnly: Boolean = false, timeout: Option[Duration] = None) { - lazy val asJava: JAlterConfigsOptions = { - val jOpts = new JAlterConfigsOptions().validateOnly(validateOnly) - timeout.fold(jOpts)(timeout => jOpts.timeoutMs(timeout.toMillis.toInt)) - } - } - - final case class AlterConfigOp(configEntry: ConfigEntry, opType: AlterConfigOpType) { - lazy val asJava: JAlterConfigOp = new JAlterConfigOp(configEntry, opType.asJava) - } - - sealed trait AlterConfigOpType { - def asJava: JAlterConfigOp.OpType - } + ) + final case class DescribeClusterOptions(includeAuthorizedOperations: Boolean, timeout: Option[Duration]) + final case class DescribeConsumerGroupsOptions(includeAuthorizedOperations: Boolean, timeout: Option[Duration]) + final case class AlterConfigsOptions(validateOnly: Boolean = false, timeout: Option[Duration] = None) + final case class AlterConfigOp(configEntry: ConfigEntry, opType: AlterConfigOpType) + sealed trait AlterConfigOpType object AlterConfigOpType { - case object Set extends AlterConfigOpType { - override def asJava = JAlterConfigOp.OpType.SET - } - - case object Delete extends AlterConfigOpType { - override def asJava = JAlterConfigOp.OpType.DELETE - } - - case object Append extends AlterConfigOpType { - override def asJava = JAlterConfigOp.OpType.APPEND - } - - case object Substract extends AlterConfigOpType { - override def asJava = JAlterConfigOp.OpType.SUBTRACT - } + case object Set extends AlterConfigOpType + case object Delete extends AlterConfigOpType + case object Append extends AlterConfigOpType + case object Subtract extends AlterConfigOpType + // noinspection SpellCheckingInspection + @deprecated(since = "3.1.0", message = "Use Subtract") + val Substract: Subtract.type = Subtract } final case class MetricName(name: String, group: String, description: String, tags: Map[String, String]) - object MetricName { - def fromJava(jmn: JMetricName): MetricName = - MetricName( - name = jmn.name(), - group = jmn.group(), - description = jmn.description(), - tags = jmn.tags().asScala.toMap - ) - } - final case class Metric(name: MetricName, metricValue: AnyRef) - object Metric { - def fromJava(jm: JMetric): Metric = - Metric(name = MetricName.fromJava(jmn = jm.metricName()), metricValue = jm.metricValue()) - } - final case class NewTopic( name: String, numPartitions: Int, replicationFactor: Short, configs: Map[String, String] = Map.empty - ) { - def asJava: JNewTopic = { - val jn = new JNewTopic(name, numPartitions, replicationFactor) - - if (configs.nonEmpty) { - val _ = jn.configs(configs.asJava) - } - - jn - } - } - - final case class NewPartitions( - totalCount: Int, - newAssignments: List[List[Int]] = List.empty - ) { - def asJava: JNewPartitions = - if (newAssignments.nonEmpty) - JNewPartitions.increaseTo(totalCount, newAssignments.map(_.map(Int.box).asJava).asJava) - else JNewPartitions.increaseTo(totalCount) - } + ) + final case class NewPartitions(totalCount: Int, newAssignments: List[List[Int]] = List.empty) /** * @param id @@ -1300,22 +1020,7 @@ object AdminClient { * @param port * can't be negative if present */ - final case class Node(id: Int, host: Option[String], port: Option[Int], rack: Option[String] = None) { - lazy val asJava: JNode = new JNode(id, host.getOrElse(""), port.getOrElse(-1), rack.orNull) - } - object Node { - def fromJava(jNode: JNode): Option[Node] = - Option(jNode) - .filter(_.id() >= 0) - .map { jNode => - Node( - id = jNode.id(), - host = Option(jNode.host()).filterNot(_.isEmpty), - port = Option(jNode.port()).filter(_ >= 0), - rack = Option(jNode.rack()) - ) - } - } + final case class Node(id: Int, host: Option[String], port: Option[Int], rack: Option[String] = None) final case class TopicDescription( name: String, @@ -1324,275 +1029,66 @@ object AdminClient { authorizedOperations: Option[Set[AclOperation]] ) - object TopicDescription { - def fromJava(jt: JTopicDescription): Try[TopicDescription] = { - val authorizedOperations = Option(jt.authorizedOperations).map(_.asScala.toSet).map(_.map(AclOperation.apply)) - - jt.partitions.asScala.toList.forEach(TopicPartitionInfo.fromJava).map { partitions => - TopicDescription( - name = jt.name, - internal = jt.isInternal, - partitions = partitions, - authorizedOperations = authorizedOperations - ) - } - } - } - - final case class TopicPartitionInfo(partition: Int, leader: Option[Node], replicas: List[Node], isr: List[Node]) { - lazy val asJava: JTopicPartitionInfo = - new JTopicPartitionInfo( - partition, - leader.map(_.asJava).getOrElse(JNode.noNode()), - replicas.map(_.asJava).asJava, - isr.map(_.asJava).asJava - ) - } - - object TopicPartitionInfo { - def fromJava(jtpi: JTopicPartitionInfo): Try[TopicPartitionInfo] = { - val replicas: Try[List[Node]] = - jtpi - .replicas() - .asScala - .toList - .forEach { jNode => - Node.fromJava(jNode) match { - case Some(node) => Success(node) - case None => Failure(new RuntimeException("NoNode node not expected among topic replicas")) - } - } - - val inSyncReplicas: Try[List[Node]] = - jtpi - .isr() - .asScala - .toList - .forEach { jNode => - Node.fromJava(jNode) match { - case Some(node) => Success(node) - case None => Failure(new RuntimeException("NoNode node not expected among topic in sync replicas")) - } - } - - for { - replicas <- replicas - inSyncReplicas <- inSyncReplicas - } yield TopicPartitionInfo( - partition = jtpi.partition(), - leader = Node.fromJava(jtpi.leader()), - replicas = replicas, - isr = inSyncReplicas - ) - } - } - - final case class TopicListing(name: String, topicId: Uuid, isInternal: Boolean) { - def asJava: JTopicListing = new JTopicListing(name, topicId, isInternal) - } - - object TopicListing { - def fromJava(jtl: JTopicListing): TopicListing = TopicListing(jtl.name(), jtl.topicId(), jtl.isInternal) - } - - final case class TopicPartition( - name: String, - partition: Int - ) { - def asJava: JTopicPartition = new JTopicPartition(name, partition) - } - - object TopicPartition { - def fromJava(tp: JTopicPartition): TopicPartition = - new TopicPartition(name = tp.topic(), partition = tp.partition()) - } - - sealed abstract class OffsetSpec { - def asJava: JOffsetSpec - } + final case class TopicPartitionInfo(partition: Int, leader: Option[Node], replicas: List[Node], isr: List[Node]) + final case class TopicListing(name: String, topicId: Uuid, isInternal: Boolean) + final case class TopicPartition(name: String, partition: Int) + sealed abstract class OffsetSpec object OffsetSpec { - case object EarliestSpec extends OffsetSpec { - override def asJava: JOffsetSpec = JOffsetSpec.earliest() - } - - case object LatestSpec extends OffsetSpec { - override def asJava: JOffsetSpec = JOffsetSpec.latest() - } - - final case class TimestampSpec(timestamp: Long) extends OffsetSpec { - override def asJava: JOffsetSpec = JOffsetSpec.forTimestamp(timestamp) - } - } - - sealed abstract class IsolationLevel { - def asJava: JIsolationLevel + case object EarliestSpec extends OffsetSpec + case object LatestSpec extends OffsetSpec + final case class TimestampSpec(timestamp: Long) extends OffsetSpec } + sealed abstract class IsolationLevel object IsolationLevel { - case object ReadUncommitted extends IsolationLevel { - override def asJava: JIsolationLevel = JIsolationLevel.READ_UNCOMMITTED - } - - case object ReadCommitted extends IsolationLevel { - override def asJava: JIsolationLevel = JIsolationLevel.READ_COMMITTED - } + case object ReadUncommitted extends IsolationLevel + case object ReadCommitted extends IsolationLevel } - final case class DeleteRecordsOptions(timeout: Option[Duration]) { - def asJava: JDeleteRecordsOptions = { - val deleteRecordsOpt = new JDeleteRecordsOptions() - timeout.fold(deleteRecordsOpt)(timeout => deleteRecordsOpt.timeoutMs(timeout.toMillis.toInt)) - } - } + final case class DeleteRecordsOptions(timeout: Option[Duration]) final case class ListOffsetsOptions( isolationLevel: IsolationLevel = IsolationLevel.ReadUncommitted, timeout: Option[Duration] - ) { - def asJava: JListOffsetsOptions = { - val offsetOpt = new JListOffsetsOptions(isolationLevel.asJava) - timeout.fold(offsetOpt)(timeout => offsetOpt.timeoutMs(timeout.toMillis.toInt)) - } - } - + ) final case class ListOffsetsResultInfo( offset: Long, timestamp: Long, leaderEpoch: Option[Int] ) - object ListOffsetsResultInfo { - def fromJava(lo: JListOffsetsResultInfo): ListOffsetsResultInfo = - ListOffsetsResultInfo(lo.offset(), lo.timestamp(), lo.leaderEpoch().toScala.map(_.toInt)) - } - - final case class ListConsumerGroupOffsetsOptions(requireStable: Boolean) { - def asJava: JListConsumerGroupOffsetsOptions = { - val opts = new JListConsumerGroupOffsetsOptions() - opts.requireStable(requireStable) - } - } - - object ListConsumerGroupOffsetsOptions { - def apply(requireStable: Boolean): ListConsumerGroupOffsetsOptions = - new ListConsumerGroupOffsetsOptions(requireStable) - } - - final case class ListConsumerGroupOffsetsSpec(partitions: Chunk[TopicPartition]) { - def asJava: JListConsumerGroupOffsetsSpec = { - val opts = new JListConsumerGroupOffsetsSpec - opts.topicPartitions(partitions.map(_.asJava).asJava) - opts - } - } + final case class ListConsumerGroupOffsetsOptions(requireStable: Boolean) + final case class ListConsumerGroupOffsetsSpec(partitions: Chunk[TopicPartition]) final case class OffsetAndMetadata( offset: Long, leaderEpoch: Option[Int] = None, metadata: Option[String] = None - ) { - def asJava: JOffsetAndMetadata = new JOffsetAndMetadata(offset, leaderEpoch.map(Int.box).toJava, metadata.orNull) - } - object OffsetAndMetadata { - def fromJava(om: JOffsetAndMetadata): OffsetAndMetadata = - OffsetAndMetadata( - offset = om.offset(), - leaderEpoch = om.leaderEpoch().toScala.map(_.toInt), - metadata = Some(om.metadata()) - ) - } - - final case class AlterConsumerGroupOffsetsOptions(timeout: Option[Duration]) { - def asJava: JAlterConsumerGroupOffsetsOptions = { - val options = new JAlterConsumerGroupOffsetsOptions() - timeout.fold(options)(timeout => options.timeoutMs(timeout.toMillis.toInt)) - } - } + ) - final case class ListConsumerGroupsOptions(states: Set[GroupState]) { - def asJListGroupsOptions: JListGroupsOptions = - JListGroupsOptions.forConsumerGroups().inGroupStates(states.map(_.asJava).asJava) - } + final case class AlterConsumerGroupOffsetsOptions(timeout: Option[Duration]) + final case class ListConsumerGroupsOptions(states: Set[GroupState]) final case class ConsumerGroupListing(groupId: String, isSimple: Boolean, state: Option[GroupState]) - object ConsumerGroupListing { - def fromJava(cg: JGroupListing): ConsumerGroupListing = - ConsumerGroupListing( - groupId = cg.groupId(), - isSimple = cg.isSimpleConsumerGroup, - state = cg.groupState().toScala.map(GroupState.fromJava) - ) - } - final case class GroupListing( groupId: String, groupType: Option[GroupType], protocol: String, groupState: Option[GroupState] - ) { - def asJava: JGroupListing = - new JGroupListing( - groupId, - groupType.map(_.asJava).toJava, - protocol, - groupState.map(_.asJava).toJava - ) - } - - object GroupListing { - def fromJava(jgl: JGroupListing): GroupListing = - GroupListing( - jgl.groupId(), - jgl.`type`().toScala.map(GroupType.fromJava), - jgl.protocol(), - jgl.groupState().toScala.map(GroupState.fromJava) - ) - } + ) final case class ListGroupsOptions( groupStates: Set[GroupState] = Set.empty, groupTypes: Set[GroupType] = Set.empty, protocolTypes: Set[String] = Set.empty - ) { - def asJava: JListGroupsOptions = - new JListGroupsOptions() - .inGroupStates(groupStates.map(_.asJava).asJava) - .withTypes(groupTypes.map(_.asJava).asJava) - .withProtocolTypes(protocolTypes.asJava) - } - - object ListGroupsOptions { - def fromJava(lgo: JListGroupsOptions): ListGroupsOptions = - ListGroupsOptions( - lgo.groupStates().asScala.map(GroupState.fromJava).toSet, - lgo.types().asScala.map(GroupType.fromJava).toSet, - lgo.protocolTypes().asScala.toSet - ) - } + ) - final case class KafkaConfig(entries: Map[String, ConfigEntry]) { - def asJava: JConfig = new JConfig(entries.values.asJavaCollection) - } - object KafkaConfig { - def fromJava(jConfig: JConfig): KafkaConfig = - KafkaConfig(entries = jConfig.entries().asScala.map(e => e.name() -> e).toMap) - } + final case class KafkaConfig(entries: Map[String, ConfigEntry]) final case class LogDirDescription(error: ApiException, replicaInfos: Map[TopicPartition, ReplicaInfo]) - object LogDirDescription { - def fromJava(ld: JLogDirDescription): LogDirDescription = - LogDirDescription( - error = ld.error(), - replicaInfos = ld.replicaInfos().asScala.bimap(TopicPartition.fromJava, ReplicaInfo.fromJava).toMap - ) - } final case class ReplicaInfo(size: Long, offsetLag: Long, isFuture: Boolean) - object ReplicaInfo { - def fromJava(ri: JReplicaInfo): ReplicaInfo = - ReplicaInfo(size = ri.size(), offsetLag = ri.offsetLag(), isFuture = ri.isFuture) - } def make(settings: AdminClientSettings): ZIO[Scope, Throwable, AdminClient] = fromScopedJavaClient(javaClientFromSettings(settings)) @@ -1615,36 +1111,4 @@ object AdminClient { endpointCheck *> ZIO.attempt(JAdmin.create(settings.driverSettings.asJava)) }(client => ZIO.attempt(client.close(settings.closeTimeout)).orDie) - implicit final class MapOps[K1, V1](private val v: Map[K1, V1]) extends AnyVal { - def bimap[K2, V2](fk: K1 => K2, fv: V1 => V2): Map[K2, V2] = v.map { case (k, v) => fk(k) -> fv(v) } - } - - implicit final class MutableMapOps[K1, V1](private val v: mutable.Map[K1, V1]) extends AnyVal { - def bimap[K2, V2](fk: K1 => K2, fv: V1 => V2): mutable.Map[K2, V2] = v.map { case (k, v) => fk(k) -> fv(v) } - } - - implicit final class OptionalOps[T](private val v: Optional[T]) extends AnyVal { - def toScala: Option[T] = if (v.isPresent) Some(v.get()) else None - } - - implicit final class OptionOps[T](private val v: Option[T]) extends AnyVal { - def toJava: Optional[T] = v.fold(Optional.empty[T])(Optional.of) - } - - implicit final class ListOps[A](private val list: List[A]) extends AnyVal { - def forEach[B](f: A => Try[B]): Try[List[B]] = { - @tailrec - def loop(acc: ListBuffer[B], rest: List[A]): Try[List[B]] = - rest match { - case Nil => Success(acc.toList) - case h :: t => - f(h) match { - case Success(b) => loop(acc += b, t) - case fail @ Failure(_) => fail.asInstanceOf[Try[List[B]]] - } - } - - loop(ListBuffer.empty, list) - } - } } diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntry.scala b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntry.scala index 2459c3545..87ab99e62 100644 --- a/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntry.scala +++ b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntry.scala @@ -1,5 +1,6 @@ package zio.kafka.admin.acl +import zio.kafka.admin.internal.JavaConverters._ import org.apache.kafka.common.acl.{ AccessControlEntry => JAccessControlEntry } final case class AccessControlEntry( @@ -15,7 +16,7 @@ object AccessControlEntry { def apply(jAccessControlEntry: JAccessControlEntry): AccessControlEntry = AccessControlEntry( principal = jAccessControlEntry.principal(), host = jAccessControlEntry.host(), - operation = AclOperation(jAccessControlEntry.operation()), + operation = jAccessControlEntry.operation().asScala, permissionType = AclPermissionType(jAccessControlEntry.permissionType()) ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntryFilter.scala b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntryFilter.scala index f7995afc5..d9d7ad7fe 100644 --- a/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntryFilter.scala +++ b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AccessControlEntryFilter.scala @@ -1,5 +1,6 @@ package zio.kafka.admin.acl +import zio.kafka.admin.internal.JavaConverters._ import org.apache.kafka.common.acl.{ AccessControlEntryFilter => JAccessControlEntryFilter } final case class AccessControlEntryFilter( @@ -18,7 +19,7 @@ object AccessControlEntryFilter { def apply(jAccessControlEntryFilter: JAccessControlEntryFilter): AccessControlEntryFilter = AccessControlEntryFilter( principal = jAccessControlEntryFilter.principal(), host = jAccessControlEntryFilter.host(), - operation = AclOperation(jAccessControlEntryFilter.operation()), + operation = jAccessControlEntryFilter.operation().asScala, permissionType = AclPermissionType(jAccessControlEntryFilter.permissionType()) ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclOperation.scala b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclOperation.scala index eb4ff68c4..26c161c2f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclOperation.scala +++ b/zio-kafka/src/main/scala/zio/kafka/admin/acl/AclOperation.scala @@ -1,78 +1,22 @@ package zio.kafka.admin.acl -import org.apache.kafka.common.acl.{ AclOperation => JAclOperation } - -sealed trait AclOperation { - def asJava: JAclOperation -} +sealed trait AclOperation object AclOperation { - case object Unknown extends AclOperation { - def asJava: JAclOperation = JAclOperation.UNKNOWN - } - case object Any extends AclOperation { - def asJava: JAclOperation = JAclOperation.ANY - } - case object All extends AclOperation { - def asJava: JAclOperation = JAclOperation.ALL - } - case object Read extends AclOperation { - def asJava: JAclOperation = JAclOperation.READ - } - case object Write extends AclOperation { - def asJava: JAclOperation = JAclOperation.WRITE - } - case object Create extends AclOperation { - def asJava: JAclOperation = JAclOperation.CREATE - } - case object Delete extends AclOperation { - def asJava: JAclOperation = JAclOperation.DELETE - } - case object Alter extends AclOperation { - def asJava: JAclOperation = JAclOperation.ALTER - } - case object Describe extends AclOperation { - def asJava: JAclOperation = JAclOperation.DESCRIBE - } - case object ClusterAction extends AclOperation { - def asJava: JAclOperation = JAclOperation.CLUSTER_ACTION - } - case object DescribeConfigs extends AclOperation { - def asJava: JAclOperation = JAclOperation.DESCRIBE_CONFIGS - } - case object AlterConfigs extends AclOperation { - def asJava: JAclOperation = JAclOperation.ALTER_CONFIGS - } - case object IdempotentWrite extends AclOperation { - def asJava: JAclOperation = JAclOperation.IDEMPOTENT_WRITE - } - case object CreateTokens extends AclOperation { - def asJava: JAclOperation = JAclOperation.CREATE_TOKENS - } - case object DescribeTokens extends AclOperation { - def asJava: JAclOperation = JAclOperation.DESCRIBE_TOKENS - } - case object TwoPhaseCommit extends AclOperation { - def asJava: JAclOperation = JAclOperation.TWO_PHASE_COMMIT - } - - def apply(jAclOperation: JAclOperation): AclOperation = - jAclOperation match { - case JAclOperation.UNKNOWN => Unknown - case JAclOperation.ANY => Any - case JAclOperation.ALL => All - case JAclOperation.READ => Read - case JAclOperation.WRITE => Write - case JAclOperation.CREATE => Create - case JAclOperation.DELETE => Delete - case JAclOperation.ALTER => Alter - case JAclOperation.DESCRIBE => Describe - case JAclOperation.CLUSTER_ACTION => ClusterAction - case JAclOperation.DESCRIBE_CONFIGS => DescribeConfigs - case JAclOperation.ALTER_CONFIGS => AlterConfigs - case JAclOperation.IDEMPOTENT_WRITE => IdempotentWrite - case JAclOperation.CREATE_TOKENS => CreateTokens - case JAclOperation.DESCRIBE_TOKENS => DescribeTokens - case JAclOperation.TWO_PHASE_COMMIT => TwoPhaseCommit - } + case object Unknown extends AclOperation + case object Any extends AclOperation + case object All extends AclOperation + case object Read extends AclOperation + case object Write extends AclOperation + case object Create extends AclOperation + case object Delete extends AclOperation + case object Alter extends AclOperation + case object Describe extends AclOperation + case object ClusterAction extends AclOperation + case object DescribeConfigs extends AclOperation + case object AlterConfigs extends AclOperation + case object IdempotentWrite extends AclOperation + case object CreateTokens extends AclOperation + case object DescribeTokens extends AclOperation + case object TwoPhaseCommit extends AclOperation } diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/internal/JavaConverters.scala b/zio-kafka/src/main/scala/zio/kafka/admin/internal/JavaConverters.scala new file mode 100644 index 000000000..4e0cb1f3f --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/admin/internal/JavaConverters.scala @@ -0,0 +1,599 @@ +package zio.kafka.admin.internal + +import zio.kafka.admin.AdminClient.{ ConsumerGroupDescription, _ } +import org.apache.kafka.clients.admin.ListOffsetsResult.{ ListOffsetsResultInfo => JListOffsetsResultInfo } +import org.apache.kafka.clients.admin.{ + AlterConfigOp => JAlterConfigOp, + AlterConfigsOptions => JAlterConfigsOptions, + AlterConsumerGroupOffsetsOptions => JAlterConsumerGroupOffsetsOptions, + Config => JConfig, + ConsumerGroupDescription => JConsumerGroupDescription, + ConsumerGroupListing => _, + CreatePartitionsOptions => JCreatePartitionsOptions, + CreateTopicsOptions => JCreateTopicsOptions, + DeleteAclsOptions => _, + DeleteConsumerGroupsOptions => JDeleteConsumerGroupsOptions, + DeleteRecordsOptions => JDeleteRecordsOptions, + DeleteTopicsOptions => JDeleteTopicsOptions, + DescribeClusterOptions => JDescribeClusterOptions, + DescribeConfigsOptions => JDescribeConfigsOptions, + DescribeConsumerGroupsOptions => JDescribeConsumerGroupsOptions, + DescribeTopicsOptions => JDescribeTopicsOptions, + GroupListing => JGroupListing, + ListConsumerGroupOffsetsOptions => JListConsumerGroupOffsetsOptions, + ListConsumerGroupOffsetsSpec => JListConsumerGroupOffsetsSpec, + ListConsumerGroupsOptions => _, + ListGroupsOptions => JListGroupsOptions, + ListOffsetsOptions => JListOffsetsOptions, + ListTopicsOptions => JListTopicsOptions, + LogDirDescription => JLogDirDescription, + MemberDescription => JMemberDescription, + NewPartitions => JNewPartitions, + NewTopic => JNewTopic, + OffsetSpec => JOffsetSpec, + ReplicaInfo => JReplicaInfo, + TopicDescription => JTopicDescription, + TopicListing => JTopicListing +} +import org.apache.kafka.clients.consumer.{ OffsetAndMetadata => JOffsetAndMetadata } +import org.apache.kafka.common.config.{ ConfigResource => JConfigResource } +import org.apache.kafka.common.{ + GroupState => JGroupState, + GroupType => JGroupType, + IsolationLevel => JIsolationLevel, + Metric => JMetric, + MetricName => JMetricName, + Node => JNode, + TopicPartition => JTopicPartition, + TopicPartitionInfo => JTopicPartitionInfo +} +import zio.kafka.admin.acl._ +import org.apache.kafka.common.acl.{ AclOperation => JAclOperation } + +import java.util.Optional +import scala.annotation.tailrec +import scala.collection.mutable +import scala.collection.mutable.ListBuffer +import scala.jdk.CollectionConverters._ +import scala.util.{ Failure, Success, Try } + +object JavaConverters { + + implicit final class MapOps[K1, V1](private val v: Map[K1, V1]) extends AnyVal { + def bimap[K2, V2](fk: K1 => K2, fv: V1 => V2): Map[K2, V2] = v.map { case (k, v) => fk(k) -> fv(v) } + } + + implicit final class MutableMapOps[K1, V1](private val v: mutable.Map[K1, V1]) extends AnyVal { + def bimap[K2, V2](fk: K1 => K2, fv: V1 => V2): mutable.Map[K2, V2] = v.map { case (k, v) => fk(k) -> fv(v) } + } + + implicit final class OptionalOps[T](private val v: Optional[T]) extends AnyVal { + def toScala: Option[T] = if (v.isPresent) Some(v.get()) else None + } + + implicit final class OptionOps[T](private val v: Option[T]) extends AnyVal { + def toJava: Optional[T] = v.fold(Optional.empty[T])(Optional.of) + } + + implicit final class ListOps[A](private val list: List[A]) extends AnyVal { + def forEach[B](f: A => Try[B]): Try[List[B]] = { + @tailrec + def loop(acc: ListBuffer[B], rest: List[A]): Try[List[B]] = + rest match { + case Nil => Success(acc.toList) + case h :: t => + f(h) match { + case Success(b) => loop(acc += b, t) + case fail @ Failure(_) => fail.asInstanceOf[Try[List[B]]] + } + } + + loop(ListBuffer.empty, list) + } + } + + implicit class ConfigResourceAsJava(private val cr: ConfigResource) extends AnyVal { + def asJava: JConfigResource = new JConfigResource(cr.`type`.asJava, cr.name) + } + implicit class JConfigResourceAsScala(private val jcr: JConfigResource) extends AnyVal { + def asScala: ConfigResource = + ConfigResource(`type` = jcr.`type`().asScala, name = jcr.name()) + } + + implicit class ConfigResourceTypeAsJava(private val crt: ConfigResourceType) extends AnyVal { + def asJava: JConfigResource.Type = + crt match { + case ConfigResourceType.BrokerLogger => JConfigResource.Type.BROKER_LOGGER + case ConfigResourceType.Broker => JConfigResource.Type.BROKER + case ConfigResourceType.Topic => JConfigResource.Type.TOPIC + case ConfigResourceType.Unknown => JConfigResource.Type.UNKNOWN + case ConfigResourceType.ClientMetrics => JConfigResource.Type.CLIENT_METRICS + case ConfigResourceType.Group => JConfigResource.Type.GROUP + } + } + implicit class JConfigResourceTypeAsScala(private val jcrt: JConfigResource.Type) extends AnyVal { + def asScala: ConfigResourceType = + jcrt match { + case JConfigResource.Type.BROKER_LOGGER => ConfigResourceType.BrokerLogger + case JConfigResource.Type.BROKER => ConfigResourceType.Broker + case JConfigResource.Type.TOPIC => ConfigResourceType.Topic + case JConfigResource.Type.UNKNOWN => ConfigResourceType.Unknown + case JConfigResource.Type.CLIENT_METRICS => ConfigResourceType.ClientMetrics + case JConfigResource.Type.GROUP => ConfigResourceType.Group + } + } + + implicit class GroupStateAsJava(private val gs: GroupState) extends AnyVal { + def asJava: JGroupState = + gs match { + case GroupState.Unknown => JGroupState.UNKNOWN + case GroupState.PreparingRebalance => JGroupState.PREPARING_REBALANCE + case GroupState.CompletingRebalance => JGroupState.COMPLETING_REBALANCE + case GroupState.Stable => JGroupState.STABLE + case GroupState.Dead => JGroupState.DEAD + case GroupState.Empty => JGroupState.EMPTY + case GroupState.Assigning => JGroupState.ASSIGNING + case GroupState.Reconciling => JGroupState.RECONCILING + case GroupState.NotReady => JGroupState.NOT_READY + } + } + implicit class JGroupStateAsScala(private val jgs: JGroupState) extends AnyVal { + def asScala: GroupState = + jgs match { + case JGroupState.UNKNOWN => GroupState.Unknown + case JGroupState.PREPARING_REBALANCE => GroupState.PreparingRebalance + case JGroupState.COMPLETING_REBALANCE => GroupState.CompletingRebalance + case JGroupState.STABLE => GroupState.Stable + case JGroupState.DEAD => GroupState.Dead + case JGroupState.EMPTY => GroupState.Empty + case JGroupState.ASSIGNING => GroupState.Assigning + case JGroupState.RECONCILING => GroupState.Reconciling + case JGroupState.NOT_READY => GroupState.NotReady + } + } + + implicit class GroupTypeAsJava(private val gt: GroupType) extends AnyVal { + def asJava: JGroupType = + gt match { + case GroupType.Unknown => JGroupType.UNKNOWN + case GroupType.Consumer => JGroupType.CONSUMER + case GroupType.Classic => JGroupType.CLASSIC + case GroupType.Share => JGroupType.SHARE + case GroupType.Streams => JGroupType.STREAMS + } + } + implicit class JGroupTypeAsScale(private val jgt: JGroupType) extends AnyVal { + def asScala: GroupType = + jgt match { + case JGroupType.UNKNOWN => GroupType.Unknown + case JGroupType.CONSUMER => GroupType.Consumer + case JGroupType.CLASSIC => GroupType.Classic + case JGroupType.SHARE => GroupType.Share + case JGroupType.STREAMS => GroupType.Streams + } + } + + implicit class JMemberDescriptionAsScala(private val jmd: JMemberDescription) extends AnyVal { + def asScala: MemberDescription = + MemberDescription( + consumerId = jmd.consumerId, + groupInstanceId = jmd.groupInstanceId.toScala, + clientId = jmd.clientId(), + host = jmd.host(), + assignment = jmd.assignment.topicPartitions().asScala.map(_.asScala).toSet + ) + } + + implicit class JConsumerGroupDescriptionAsScala(private val description: JConsumerGroupDescription) extends AnyVal { + def asScala: ConsumerGroupDescription = + ConsumerGroupDescription( + groupId = description.groupId, + isSimpleConsumerGroup = description.isSimpleConsumerGroup, + members = description.members.asScala.map(_.asScala).toList, + partitionAssignor = description.partitionAssignor, + state = description.groupState.asScala, + coordinator = description.coordinator().asScala, + authorizedOperations = Option(description.authorizedOperations()) + .fold(Set.empty[AclOperation])(_.asScala.map(_.asScala).toSet) + ) + } + + implicit class CreatePartitionsOptionsAsJava(private val cpo: CreatePartitionsOptions) extends AnyVal { + def asJava: JCreatePartitionsOptions = { + val opts = new JCreatePartitionsOptions() + .validateOnly(cpo.validateOnly) + .retryOnQuotaViolation(cpo.retryOnQuotaViolation) + cpo.timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class CreateTopicsOptionsAsJava(private val cto: CreateTopicsOptions) extends AnyVal { + def asJava: JCreateTopicsOptions = { + val opts = new JCreateTopicsOptions().validateOnly(cto.validateOnly) + cto.timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class DeleteConsumerGroupsOptionsAsJava(private val cto: DeleteConsumerGroupsOptions) extends AnyVal { + def asJava: JDeleteConsumerGroupsOptions = { + val opts = new JDeleteConsumerGroupsOptions() + cto.timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class DeleteTopicsOptionsAsJava(private val dto: DeleteTopicsOptions) extends AnyVal { + def asJava: JDeleteTopicsOptions = { + val opts = new JDeleteTopicsOptions().retryOnQuotaViolation(dto.retryOnQuotaViolation) + dto.timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class ListTopicsOptionsAsJava(private val lto: ListTopicsOptions) extends AnyVal { + def asJava: JListTopicsOptions = { + val opts = new JListTopicsOptions().listInternal(lto.listInternal) + lto.timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class DescribeTopicsOptionsAsJava(private val dto: DescribeTopicsOptions) extends AnyVal { + def asJava: JDescribeTopicsOptions = { + val opts = new JDescribeTopicsOptions().includeAuthorizedOperations(dto.includeAuthorizedOperations) + dto.timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class DescribeConfigsOptionsAsJava(private val dco: DescribeConfigsOptions) extends AnyVal { + def asJava: JDescribeConfigsOptions = { + val opts = new JDescribeConfigsOptions() + .includeSynonyms(dco.includeSynonyms) + .includeDocumentation(dco.includeDocumentation) + dco.timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class DescribeClusterOptionsAsJava(private val dco: DescribeClusterOptions) extends AnyVal { + def asJava: JDescribeClusterOptions = { + val opts = new JDescribeClusterOptions().includeAuthorizedOperations(dco.includeAuthorizedOperations) + dco.timeout.fold(opts)(timeout => opts.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class DescribeConsumerGroupsOptionsAsJava(private val dcgo: DescribeConsumerGroupsOptions) extends AnyVal { + def asJava: JDescribeConsumerGroupsOptions = { + val jOpts = new JDescribeConsumerGroupsOptions() + .includeAuthorizedOperations(dcgo.includeAuthorizedOperations) + dcgo.timeout.fold(jOpts)(timeout => jOpts.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class AlterConfigsOptionsAsJava(private val aco: AlterConfigsOptions) extends AnyVal { + def asJava: JAlterConfigsOptions = { + val jOpts = new JAlterConfigsOptions().validateOnly(aco.validateOnly) + aco.timeout.fold(jOpts)(timeout => jOpts.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class AlterConfigOpAsJava(private val aco: AlterConfigOp) extends AnyVal { + def asJava: JAlterConfigOp = new JAlterConfigOp(aco.configEntry, aco.opType.asJava) + } + + implicit class AlterConfigOpTypeAsJava(private val acot: AlterConfigOpType) extends AnyVal { + def asJava: JAlterConfigOp.OpType = + acot match { + case AlterConfigOpType.Set => JAlterConfigOp.OpType.SET + case AlterConfigOpType.Delete => JAlterConfigOp.OpType.DELETE + case AlterConfigOpType.Append => JAlterConfigOp.OpType.APPEND + case AlterConfigOpType.Subtract => JAlterConfigOp.OpType.SUBTRACT + } + } + + implicit class JMetricNameAsScala(private val jmn: JMetricName) extends AnyVal { + def asScala: MetricName = + MetricName( + name = jmn.name(), + group = jmn.group(), + description = jmn.description(), + tags = jmn.tags().asScala.toMap + ) + } + + implicit class JMetricAsScala(private val jm: JMetric) extends AnyVal { + def asScala: Metric = + Metric(name = jm.metricName().asScala, metricValue = jm.metricValue()) + } + + implicit class NewTopicAsJava(private val nt: NewTopic) extends AnyVal { + def asJava: JNewTopic = { + val jn = new JNewTopic(nt.name, nt.numPartitions, nt.replicationFactor) + + if (nt.configs.nonEmpty) { + val _ = jn.configs(nt.configs.asJava) + } + + jn + } + } + + implicit class NewPartitionsAsJava(private val np: NewPartitions) extends AnyVal { + def asJava: JNewPartitions = + if (np.newAssignments.isEmpty) JNewPartitions.increaseTo(np.totalCount) + else JNewPartitions.increaseTo(np.totalCount, np.newAssignments.map(_.map(Int.box).asJava).asJava) + } + + implicit class NodeAsJava(private val n: Node) extends AnyVal { + def asJava: JNode = new JNode(n.id, n.host.getOrElse(""), n.port.getOrElse(-1), n.rack.orNull) + } + + /** + * @param jn + * may be null + */ + implicit class JNodeAsScala(private val jn: JNode) extends AnyVal { + def asScala: Option[Node] = + Option(jn) + .filter(_.id() >= 0) + .map { jNode => + Node( + id = jNode.id(), + host = Option(jNode.host()).filterNot(_.isEmpty), + port = Option(jNode.port()).filter(_ >= 0), + rack = Option(jNode.rack()) + ) + } + } + + implicit class JTopicDescriptionAsScala(private val td: JTopicDescription) extends AnyVal { + def asScala: Try[TopicDescription] = { + val authorizedOperations = Option(td.authorizedOperations).map(_.asScala.toSet).map(_.map(_.asScala)) + + td.partitions.asScala.toList.forEach(_.asScala).map { partitions => + TopicDescription( + name = td.name, + internal = td.isInternal, + partitions = partitions, + authorizedOperations = authorizedOperations + ) + } + } + } + + implicit class TopicPartitionInfoAsJava(private val tpi: TopicPartitionInfo) extends AnyVal { + def asJava: JTopicPartitionInfo = + new JTopicPartitionInfo( + tpi.partition, + tpi.leader.map(_.asJava).getOrElse(JNode.noNode()), + tpi.replicas.map(_.asJava).asJava, + tpi.isr.map(_.asJava).asJava + ) + } + implicit class JTopicPartitionInfoAsScala(private val jtpi: JTopicPartitionInfo) extends AnyVal { + def asScala: Try[TopicPartitionInfo] = { + val replicas: Try[List[Node]] = + jtpi + .replicas() + .asScala + .toList + .forEach { jNode => + jNode.asScala match { + case Some(node) => Success(node) + case None => Failure(new RuntimeException("NoNode node not expected among topic replicas")) + } + } + + val inSyncReplicas: Try[List[Node]] = + jtpi + .isr() + .asScala + .toList + .forEach { jNode => + jNode.asScala match { + case Some(node) => Success(node) + case None => Failure(new RuntimeException("NoNode node not expected among topic in sync replicas")) + } + } + + for { + replicas <- replicas + inSyncReplicas <- inSyncReplicas + } yield TopicPartitionInfo( + partition = jtpi.partition(), + leader = jtpi.leader().asScala, + replicas = replicas, + isr = inSyncReplicas + ) + } + } + + implicit class TopicListingAsJava(private val tl: TopicListing) extends AnyVal { + def asJava: JTopicListing = new JTopicListing(tl.name, tl.topicId, tl.isInternal) + } + implicit class JTopicListingAsScala(private val jtl: JTopicListing) extends AnyVal { + def asScala: TopicListing = TopicListing(jtl.name(), jtl.topicId(), jtl.isInternal) + } + + implicit class TopicPartitionAsJava(private val tp: TopicPartition) extends AnyVal { + def asJava: JTopicPartition = new JTopicPartition(tp.name, tp.partition) + } + + implicit class JTopicPartitionAsScala(private val jtp: JTopicPartition) extends AnyVal { + def asScala: TopicPartition = + TopicPartition(name = jtp.topic(), partition = jtp.partition()) + } + + implicit class OffsetSpecAsJava(private val os: OffsetSpec) extends AnyVal { + def asJava: JOffsetSpec = + os match { + case OffsetSpec.EarliestSpec => JOffsetSpec.earliest() + case OffsetSpec.LatestSpec => JOffsetSpec.latest() + case OffsetSpec.TimestampSpec(timestamp) => JOffsetSpec.forTimestamp(timestamp) + } + } + + implicit class IsolationLevelAsJava(private val il: IsolationLevel) extends AnyVal { + def asJava: JIsolationLevel = + il match { + case IsolationLevel.ReadUncommitted => JIsolationLevel.READ_UNCOMMITTED + case IsolationLevel.ReadCommitted => JIsolationLevel.READ_COMMITTED + } + } + + implicit class DeleteRecordsOptionsAsJava(private val dro: DeleteRecordsOptions) extends AnyVal { + def asJava: JDeleteRecordsOptions = { + val deleteRecordsOpt = new JDeleteRecordsOptions() + dro.timeout.fold(deleteRecordsOpt)(timeout => deleteRecordsOpt.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class ListOffsetsOptionsAsJava(private val loo: ListOffsetsOptions) extends AnyVal { + def asJava: JListOffsetsOptions = { + val offsetOpt = new JListOffsetsOptions(loo.isolationLevel.asJava) + loo.timeout.fold(offsetOpt)(timeout => offsetOpt.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class JListOffsetsResultInfoAsScala(private val jlori: JListOffsetsResultInfo) extends AnyVal { + def asScala: ListOffsetsResultInfo = + ListOffsetsResultInfo(jlori.offset(), jlori.timestamp(), jlori.leaderEpoch().toScala.map(_.toInt)) + } + + implicit class JListConsumerGroupOffsetsOptionsAsJava(private val lcgoo: ListConsumerGroupOffsetsOptions) + extends AnyVal { + def asJava: JListConsumerGroupOffsetsOptions = { + val opts = new JListConsumerGroupOffsetsOptions() + opts.requireStable(lcgoo.requireStable) + } + } + + implicit class ListConsumerGroupOffsetsSpecAsScala(private val lcgos: ListConsumerGroupOffsetsSpec) extends AnyVal { + def asJava: JListConsumerGroupOffsetsSpec = { + val opts = new JListConsumerGroupOffsetsSpec + opts.topicPartitions(lcgos.partitions.map(_.asJava).asJava) + opts + } + } + + implicit class OffsetAndMetadataAsJava(private val oam: OffsetAndMetadata) extends AnyVal { + def asJava: JOffsetAndMetadata = + new JOffsetAndMetadata(oam.offset, oam.leaderEpoch.map(Int.box).toJava, oam.metadata.orNull) + } + implicit class JOffsetAndMetadataAsScala(private val joam: JOffsetAndMetadata) extends AnyVal { + def asScala: OffsetAndMetadata = + OffsetAndMetadata( + offset = joam.offset(), + leaderEpoch = joam.leaderEpoch().toScala.map(_.toInt), + metadata = Some(joam.metadata()) + ) + } + + implicit class AlterConsumerGroupOffsetsOptionsAsJava(private val acgoo: AlterConsumerGroupOffsetsOptions) + extends AnyVal { + def asJava: JAlterConsumerGroupOffsetsOptions = { + val options = new JAlterConsumerGroupOffsetsOptions() + acgoo.timeout.fold(options)(timeout => options.timeoutMs(timeout.toMillis.toInt)) + } + } + + implicit class ListConsumerGroupsOptionsAsJava(private val lcgo: ListConsumerGroupsOptions) extends AnyVal { + def asJListGroupsOptions: JListGroupsOptions = + JListGroupsOptions.forConsumerGroups().inGroupStates(lcgo.states.map(_.asJava).asJava) + } + + implicit class JGroupListingAsScala(private val jgl: JGroupListing) extends AnyVal { + def asConsumerGroupListing: ConsumerGroupListing = + ConsumerGroupListing( + groupId = jgl.groupId(), + isSimple = jgl.isSimpleConsumerGroup, + state = jgl.groupState().toScala.map(_.asScala) + ) + + def asScala: GroupListing = + GroupListing( + jgl.groupId(), + jgl.`type`().toScala.map(_.asScala), + jgl.protocol(), + jgl.groupState().toScala.map(_.asScala) + ) + } + + implicit class ListGroupsOptionsAsJava(private val lgo: ListGroupsOptions) extends AnyVal { + def asJava: JListGroupsOptions = + new JListGroupsOptions() + .inGroupStates(lgo.groupStates.map(_.asJava).asJava) + .withTypes(lgo.groupTypes.map(_.asJava).asJava) + .withProtocolTypes(lgo.protocolTypes.asJava) + } + + implicit class JListGroupsOptionsAsScala(private val jlgo: JListGroupsOptions) extends AnyVal { + def asScala: ListGroupsOptions = + ListGroupsOptions( + jlgo.groupStates().asScala.map(_.asScala).toSet, + jlgo.types().asScala.map(_.asScala).toSet, + jlgo.protocolTypes().asScala.toSet + ) + } + + implicit class KafkaConfigAsJava(private val kc: KafkaConfig) extends AnyVal { + def asJava: JConfig = new JConfig(kc.entries.values.asJavaCollection) + } + implicit class JConfigAsScala(private val jc: JConfig) extends AnyVal { + def asScala: KafkaConfig = + KafkaConfig(entries = jc.entries().asScala.map(e => e.name() -> e).toMap) + } + + implicit class JLogDirDescriptionAsScala(private val jldd: JLogDirDescription) extends AnyVal { + def asScala: LogDirDescription = + LogDirDescription( + error = jldd.error(), + replicaInfos = jldd.replicaInfos().asScala.bimap(_.asScala, _.asScala).toMap + ) + } + + implicit class JReplicaInfoAsScala(private val jri: JReplicaInfo) extends AnyVal { + def asScala: ReplicaInfo = + ReplicaInfo(size = jri.size(), offsetLag = jri.offsetLag(), isFuture = jri.isFuture) + } + + implicit class AclOperationAsScala(private val ao: AclOperation) extends AnyVal { + def asJava: JAclOperation = + ao match { + case AclOperation.Unknown => JAclOperation.UNKNOWN + case AclOperation.Any => JAclOperation.ANY + case AclOperation.All => JAclOperation.ALL + case AclOperation.Read => JAclOperation.READ + case AclOperation.Write => JAclOperation.WRITE + case AclOperation.Create => JAclOperation.CREATE + case AclOperation.Delete => JAclOperation.DELETE + case AclOperation.Alter => JAclOperation.ALTER + case AclOperation.Describe => JAclOperation.DESCRIBE + case AclOperation.ClusterAction => JAclOperation.CLUSTER_ACTION + case AclOperation.DescribeConfigs => JAclOperation.DESCRIBE_CONFIGS + case AclOperation.AlterConfigs => JAclOperation.ALTER_CONFIGS + case AclOperation.IdempotentWrite => JAclOperation.IDEMPOTENT_WRITE + case AclOperation.CreateTokens => JAclOperation.CREATE_TOKENS + case AclOperation.DescribeTokens => JAclOperation.DESCRIBE_TOKENS + case AclOperation.TwoPhaseCommit => JAclOperation.TWO_PHASE_COMMIT + } + } + implicit class JAclOperationAsJava(private val jao: JAclOperation) extends AnyVal { + def asScala: AclOperation = + jao match { + case JAclOperation.UNKNOWN => AclOperation.Unknown + case JAclOperation.ANY => AclOperation.Any + case JAclOperation.ALL => AclOperation.All + case JAclOperation.READ => AclOperation.Read + case JAclOperation.WRITE => AclOperation.Write + case JAclOperation.CREATE => AclOperation.Create + case JAclOperation.DELETE => AclOperation.Delete + case JAclOperation.ALTER => AclOperation.Alter + case JAclOperation.DESCRIBE => AclOperation.Describe + case JAclOperation.CLUSTER_ACTION => AclOperation.ClusterAction + case JAclOperation.DESCRIBE_CONFIGS => AclOperation.DescribeConfigs + case JAclOperation.ALTER_CONFIGS => AclOperation.AlterConfigs + case JAclOperation.IDEMPOTENT_WRITE => AclOperation.IdempotentWrite + case JAclOperation.CREATE_TOKENS => AclOperation.CreateTokens + case JAclOperation.DESCRIBE_TOKENS => AclOperation.DescribeTokens + case JAclOperation.TWO_PHASE_COMMIT => AclOperation.TwoPhaseCommit + } + } + +}