diff --git a/build.sbt b/build.sbt index a67ca9b..fce883e 100644 --- a/build.sbt +++ b/build.sbt @@ -48,6 +48,10 @@ lazy val elasticsearch = project.in(file("instrumentation/elasticsearch")) .settings(CommonProject.settings) .settings( name := "logging-testkit-elasticsearch", - libraryDependencies += aws.logs, + libraryDependencies ++= Seq( + elastic4s.aws, + elastic4s.core, + elastic4s.http + ), coverageMinimum := 100 ) diff --git a/instrumentation/elasticsearch/src/main/scala/net/cakesolutions/testkit/logging/elasticsearch/ElasticSearchLogClient.scala b/instrumentation/elasticsearch/src/main/scala/net/cakesolutions/testkit/logging/elasticsearch/ElasticSearchLogClient.scala new file mode 100644 index 0000000..af86afe --- /dev/null +++ b/instrumentation/elasticsearch/src/main/scala/net/cakesolutions/testkit/logging/elasticsearch/ElasticSearchLogClient.scala @@ -0,0 +1,62 @@ +// Copyright 2018 Cake Solutions Limited + +package net.cakesolutions.testkit.logging.elasticsearch + +import scala.concurrent.Future +import com.sksamuel.elastic4s.aws.{Aws4ElasticClient, Aws4ElasticConfig} +import com.sksamuel.elastic4s.http.RequestFailure +import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.searches.SearchDefinition +import monix.execution.Scheduler +import monix.reactive.Observable + +/** + * Client for querying logs from ElasticSearch. + * + * @param config AWS elasticsearch configuration + */ +class ElasticSearchLogClient(config: Aws4ElasticConfig) { + import ElasticSearchLogClient._ + + private val elasticSearchClient = Aws4ElasticClient(config) + + /** + * Query ElasticSearch and return the results as an Observable. + * + * @param searchDef search query + * @param scheduler monix scheduler + * @return query results as observable stream + */ + def search(searchDef: SearchDefinition)( + implicit scheduler: Scheduler + ): Observable[String] = { + Observable + .fromFuture( + elasticSearchClient.execute(searchDef).flatMap { + case Left(failure) => + Future.failed(ElasticSearchRequestFailureException(failure)) + case Right(result) => + Future.successful(result) + } + ) + .flatMap { result => + Observable.fromIterable( + result.result.hits.hits.map(_.sourceAsString) + ) + } + } +} + +object ElasticSearchLogClient { + + /** + * Execution of an ElasticSearch request failed. + * + * @param requestFailure failure response + */ + case class ElasticSearchRequestFailureException( + requestFailure: RequestFailure + ) extends Exception( + s"ElasticSearch request failed: ${requestFailure.error.reason}" + ) +} diff --git a/instrumentation/elasticsearch/src/main/scala/net/cakesolutions/testkit/logging/elasticsearch/ElasticSearchLogLineSource.scala b/instrumentation/elasticsearch/src/main/scala/net/cakesolutions/testkit/logging/elasticsearch/ElasticSearchLogLineSource.scala deleted file mode 100644 index d736a03..0000000 --- a/instrumentation/elasticsearch/src/main/scala/net/cakesolutions/testkit/logging/elasticsearch/ElasticSearchLogLineSource.scala +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2018 Cake Solutions Limited - -package net.cakesolutions.testkit.logging.elasticsearch - -import scala.concurrent.Promise - -import io.circe.Json -import monix.execution.Scheduler -import monix.reactive.observers.Subscriber -import net.cakesolutions.testkit.logging.LineLoggingSource - -// $COVERAGE-OFF$ disabled until this is implemented - -object ElasticSearchLogLineSource extends LineLoggingSource[Json] { - - /** @see net.cakesolutions.testkit.logging.LoggingSource */ - override protected def subscriberPolling( - subscriber: Subscriber[Json], - cancelP: Promise[Unit] - )(implicit - scheduler: Scheduler - ): Unit = { - ??? // TODO: needs implementing - } -} - -// $COVERAGE-ON$ diff --git a/project/CommonProject.scala b/project/CommonProject.scala index 5d29c7d..74cf0a8 100644 --- a/project/CommonProject.scala +++ b/project/CommonProject.scala @@ -31,6 +31,8 @@ object CommonProject { "-Ypartial-unification", "-Xfatal-warnings" ), + // Disable unused import warnings in Scala console. + scalacOptions in (Compile, console) -= "-Ywarn-unused-import", scalacOptions in (Compile, doc) ++= { val nm = (name in(Compile, doc)).value val ver = (version in(Compile, doc)).value diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0ca037b..9953f47 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -30,6 +30,14 @@ object Dependencies { val reactive: ModuleID = "io.monix" %% "monix-reactive" % version } + object elastic4s { + private val version = "6.1.4" + + val aws: ModuleID = "com.sksamuel.elastic4s" %% "elastic4s-aws" % version + val core: ModuleID = "com.sksamuel.elastic4s" %% "elastic4s-core" % version + val http: ModuleID = "com.sksamuel.elastic4s" %% "elastic4s-http" % version + } + val scalacheck: ModuleID = "org.scalacheck" %% "scalacheck" % "1.13.5" val scalatest: ModuleID = "org.scalatest" %% "scalatest" % "3.0.5" }