diff --git a/project/build.properties b/project/build.properties
index 64cf32f7f..23050492c 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=1.1.4
+sbt.version=1.1.4
\ No newline at end of file
diff --git a/src/main/scala/com/lightbend/kafka/scala/streams/KafkaStreamsS.scala b/src/main/scala/com/lightbend/kafka/scala/streams/KafkaStreamsS.scala
new file mode 100644
index 000000000..f77f50bb0
--- /dev/null
+++ b/src/main/scala/com/lightbend/kafka/scala/streams/KafkaStreamsS.scala
@@ -0,0 +1,81 @@
+package com.lightbend.kafka.scala.streams
+
+import java.util.Properties
+
+import org.apache.kafka.common.{Metric, MetricName}
+import org.apache.kafka.common.serialization.Serializer
+import org.apache.kafka.streams.processor.{StateRestoreListener, StreamPartitioner, ThreadMetadata}
+import org.apache.kafka.streams.state.{QueryableStoreType, StreamsMetadata}
+import org.apache.kafka.streams.{KafkaClientSupplier, KafkaStreams, StreamsConfig, Topology}
+
+import scala.collection.JavaConverters._
+
+class KafkaStreamsS(inner: KafkaStreams) {
+
+  def allMetadata(): Iterable[StreamsMetadata] =
+    inner.allMetadata().asScala
+
+  def allMetadataForStore(storeName: String): Iterable[StreamsMetadata] =
+    inner.allMetadataForStore(storeName).asScala
+
+  def cleanUp() = {
+    inner.cleanUp()
+    this
+  }
+
+  def close() =
+    inner.close()
+
+  def close(timeout: Long, timeUnit: java.util.concurrent.TimeUnit) =
+    inner.close(timeout, timeUnit)
+
+  def localThreadsMetadata(): Set[ThreadMetadata] =
+    inner.localThreadsMetadata.asScala.toSet
+
+  def metadataForKey[K](storeName: String, key: K, keySerializer: Serializer[K]): StreamsMetadata =
+    inner.metadataForKey(storeName, key, keySerializer)
+
+  def metadataForKey[K](storeName: String, key: K, partitioner: StreamPartitioner[_ >: K, _]): StreamsMetadata =
+    inner.metadataForKey(storeName, key, partitioner)
+
+  def metrics(): Map[MetricName, _ <: Metric] =
+    inner.metrics().asScala.toMap
+
+  def withGlobalStateRestoreListener(globalStateRestoreListener: StateRestoreListener) = {
+    inner.setGlobalStateRestoreListener(globalStateRestoreListener)
+    this
+  }
+
+  def withStateListener(listener: KafkaStreams.StateListener) = {
+    inner.setStateListener(listener)
+    this
+  }
+
+  def withUncaughtExceptionHandler(eh: java.lang.Thread.UncaughtExceptionHandler) = {
+    inner.setUncaughtExceptionHandler(eh)
+    this
+  }
+
+  def start(): KafkaStreamsS = {
+    inner.start()
+    this
+  }
+
+  def state(): KafkaStreams.State =
+    inner.state()
+
+  def store[T](storeName: String, queryableStoreType: QueryableStoreType[T]) =
+    inner.store(storeName, queryableStoreType)
+}
+
+object KafkaStreamsS {
+  def apply(s: StreamsBuilderS, p: Properties): KafkaStreamsS = new KafkaStreamsS(new KafkaStreams(s.build(), p))
+
+  def apply(topology: Topology, p: Properties): KafkaStreamsS = new KafkaStreamsS(new KafkaStreams(topology, p))
+
+  def apply(topology: Topology, config: StreamsConfig) = new KafkaStreamsS(new KafkaStreams(topology, config))
+
+  def apply(topology: Topology, config: StreamsConfig, clientSupplier: KafkaClientSupplier) =
+    new KafkaStreamsS(new KafkaStreams(topology, config, clientSupplier))
+
+}
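Note: every mutating call on `KafkaStreamsS` above returns `this`, so construction, listener wiring, and startup chain into one expression. A minimal usage sketch under that reading (the application id, broker address, and placeholder topology below are illustrative, not part of this change):

```scala
import java.util.Properties

import com.lightbend.kafka.scala.streams.KafkaStreamsS
import org.apache.kafka.streams.{StreamsConfig, Topology}

object FluentStartupSketch extends App {
  // Illustrative settings; any valid application id / broker address works.
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-s-sketch")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  // Placeholder topology with a single source node.
  val topology = new Topology()
  topology.addSource("source", "input-topic")

  // Setters return `this`, so handler registration and start() chain.
  val streams: KafkaStreamsS =
    KafkaStreamsS(topology, props)
      .withUncaughtExceptionHandler((_: Thread, e: Throwable) => e.printStackTrace())
      .start()

  sys.addShutdownHook(streams.close())
}
```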
diff --git a/src/test/scala/com/lightbend/kafka/scala/server/KafkaLocalServer.scala b/src/test/scala/com/lightbend/kafka/scala/server/KafkaLocalServer.scala
index d446a6bfd..01552abe5 100644
--- a/src/test/scala/com/lightbend/kafka/scala/server/KafkaLocalServer.scala
+++ b/src/test/scala/com/lightbend/kafka/scala/server/KafkaLocalServer.scala
@@ -31,10 +31,11 @@ class KafkaLocalServer private (kafkaProperties: Properties, zooKeeperServer: Zo
     DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
     false)
 
-  def start(): Unit = {
+  def start(): KafkaLocalServer = {
     broker = KafkaServerStartable.fromProps(kafkaProperties)
     broker.startup()
+    this
   }
 
   //scalastyle:off null
diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala b/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala
index 4c1a23277..bff177453 100644
--- a/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala
+++ b/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala
@@ -10,17 +10,14 @@ import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, Mess
 import minitest.TestSuite
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization._
-import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
+import org.apache.kafka.streams.{KeyValue, StreamsConfig}
 import ImplicitConversions._
 import com.typesafe.scalalogging.LazyLogging
 
 object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData with LazyLogging {
 
-  override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
-  }
+  override def setup(): KafkaLocalServer =
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()
 
   override def tearDown(server: KafkaLocalServer): Unit = server.stop()
 
@@ -55,8 +52,7 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestDa
 
     wordCounts.toStream.to(outputTopic)
 
-    val streams = new KafkaStreams(builder.build(), streamsConfiguration)
-    streams.start()
+    val streams = KafkaStreamsS(builder, streamsConfiguration).start()
 
     //
     // Step 2: Produce some input data to the input topic.
diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala b/src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala
index 6441d1b78..66a8e2d12 100644
--- a/src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala
+++ b/src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.streams.kstream.Transformer
 import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
+import org.apache.kafka.streams.{KeyValue, StreamsConfig}
 import ImplicitConversions._
 import com.typesafe.scalalogging.LazyLogging
 
@@ -69,11 +69,8 @@ object ProbabilisticCountingScalaIntegrationTest
     extends TestSuite[KafkaLocalServer]
     with ProbabilisticCountingScalaIntegrationTestData {
 
-  override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
-  }
+  override def setup(): KafkaLocalServer =
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()
 
   override def tearDown(server: KafkaLocalServer): Unit = server.stop()
 
@@ -149,8 +146,7 @@ object ProbabilisticCountingScalaIntegrationTest
       .transform(() => new ProbabilisticCounter, cmsStoreName)
       .to(outputTopic)
 
-    val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
-    streams.start()
+    val streams: KafkaStreamsS = KafkaStreamsS(builder, streamsConfiguration).start()
 
     // Step 2: Publish some input text lines.
     val sender =
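All of the `setup()` bodies in these tests collapse the same way because `KafkaLocalServer.start()` now returns the server itself. The pattern in isolation, with a made-up `Service` class purely for illustration:

```scala
// Returning `this` from a lifecycle method turns create-start-return
// boilerplate into a single chained expression.
class Service {
  private var running = false

  def start(): Service = {
    running = true
    this
  }

  def stop(): Unit = running = false
  def isRunning: Boolean = running
}

object ServiceDemo extends App {
  // Before: val s = new Service; s.start(); s
  // After: one expression, same effect.
  val service = new Service().start()
  println(service.isRunning) // prints: true
  service.stop()
}
```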
diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/PunctuateTest.scala b/src/test/scala/com/lightbend/kafka/scala/streams/PunctuateTest.scala
index d0510c536..0d2da9438 100644
--- a/src/test/scala/com/lightbend/kafka/scala/streams/PunctuateTest.scala
+++ b/src/test/scala/com/lightbend/kafka/scala/streams/PunctuateTest.scala
@@ -10,7 +10,7 @@ import com.typesafe.scalalogging.LazyLogging
 import minitest.TestSuite
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, PunctuationType}
-import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}
+import org.apache.kafka.streams.{StreamsConfig, Topology}
 
 /**
  * This sample demonstrates the usage of punctuate, which significantly changed in version 1.0 and
@@ -20,11 +20,8 @@ import org.apache.kafka.streams.{StreamsConfig, Topology}
  */
 object PunctuateTest extends TestSuite[KafkaLocalServer] with PunctuateTestData with LazyLogging {
 
-  override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
-  }
+  override def setup(): KafkaLocalServer =
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()
 
   override def tearDown(server: KafkaLocalServer): Unit = server.stop()
 
@@ -49,8 +46,7 @@ object PunctuateTest extends TestSuite[KafkaLocalServer] with PunctuateTestData
     topology.addSource("data", inputTopic)
     // Processors
    topology.addProcessor("data processor", () => new SampleProcessor(5000), "data")
-    val streams = new KafkaStreams(topology, streamsConfiguration)
-    streams.start()
+    val streams = KafkaStreamsS(topology, streamsConfiguration).start()
     // Allow time for the streams to start up
     Thread.sleep(5000L)
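`PunctuateTest` exercises the post-1.0 punctuation API (`schedule(interval, PunctuationType, Punctuator)`). `SampleProcessor`'s body sits outside this diff, so the following stand-in is an assumption-laden sketch of what such a processor typically looks like, not the actual test code:

```scala
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, PunctuationType}

// Hypothetical stand-in for SampleProcessor: fires a wall-clock punctuation
// every `intervalMs`, independent of record traffic.
class WallClockTicker(intervalMs: Long) extends AbstractProcessor[String, String] {

  override def init(context: ProcessorContext): Unit = {
    super.init(context)
    // Kafka 1.0 style scheduling; Punctuator is a SAM interface, so a Scala
    // lambda over the timestamp converts to it.
    context.schedule(intervalMs, PunctuationType.WALL_CLOCK_TIME, (timestamp: Long) => {
      println(s"punctuate fired at $timestamp")
    })
  }

  override def process(key: String, value: String): Unit =
    println(s"processed $key -> $value")
}
```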
Shutting down app", e) - e.printStackTrace() - val closed: Unit = streams.close() - logger.info(s"Exiting application after streams close ($closed)") - } catch { - case x: Exception => x.printStackTrace() - } finally { - logger.debug("Exiting application ..") - System.exit(-1) - } - ) - - streams.start() + val streams: KafkaStreamsS = KafkaStreamsS(builder, streamsConfiguration) + + streams + .withUncaughtExceptionHandler( + (_: Thread, e: Throwable) => + try { + logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e) + e.printStackTrace() + val closed = streams.close() + logger.info(s"Exiting application after streams close ($closed)") + } catch { + case x: Exception => x.printStackTrace() + } finally { + logger.debug("Exiting application ..") + System.exit(-1) + } + ) + .start() // // Step 2: Publish user-region information. diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala b/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala index e0279c994..34a5a4fb7 100644 --- a/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala +++ b/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala @@ -29,10 +29,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization._ import org.apache.kafka.streams._ import ImplicitConversions._ +import com.typesafe.scalalogging.LazyLogging object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro extends TestSuite[KafkaLocalServer] - with StreamToTableJoinTestData { + with StreamToTableJoinTestData + with LazyLogging { case class UserClicks(clicks: Long) @@ -78,11 +80,8 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro * must be `static` and `public`) to a workaround combination of `@Rule * def` and a `private val`. */ - override def setup(): KafkaLocalServer = { - val s = KafkaLocalServer(true, Some(localStateDir)) - s.start() - s - } + override def setup(): KafkaLocalServer = + KafkaLocalServer(true, Some(localStateDir)).start() override def tearDown(server: KafkaLocalServer): Unit = server.stop() @@ -138,27 +137,24 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro // Write the (continuously updating) results to the output topic. clicksPerRegion.toStream.to(outputTopic) - val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration) - + val streams: KafkaStreamsS = KafkaStreamsS(builder.build(), streamsConfiguration) streams - .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + .withUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { override def uncaughtException(t: Thread, e: Throwable): Unit = try { - println(s"Stream terminated because of uncaught exception .. Shutting " + - s"down app", - e) - e.printStackTrace + logger.error(s"Stream terminated because of uncaught exception .. Shutting " + + s"down app", + e) val closed = streams.close() - println(s"Exiting application after streams close ($closed)") + logger.debug(s"Exiting application after streams close ($closed)") } catch { case x: Exception => x.printStackTrace } finally { - println("Exiting application ..") + logger.debug("Exiting application ..") System.exit(-1) } }) - - streams.start() + .start() // // Step 2: Publish user-region information.