Skip to content
This repository was archived by the owner on Apr 8, 2021. It is now read-only.

Commit eb52139

Browse files
author
Alex Horsman
committed
Implemented ElasticSearchLogSource.
1 parent cedeb98 commit eb52139

File tree

2 files changed

+66
-27
lines changed

2 files changed

+66
-27
lines changed

instrumentation/elasticsearch/src/main/scala/net/cakesolutions/testkit/logging/elasticsearch/ElasticSearchLogLineSource.scala

Lines changed: 0 additions & 27 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright 2018 Cake Solutions Limited
2+
3+
package net.cakesolutions.testkit.logging.elasticsearch
4+
5+
import scala.concurrent.Future
6+
import com.sksamuel.elastic4s.aws.{Aws4ElasticClient, Aws4ElasticConfig}
7+
import com.sksamuel.elastic4s.http.RequestFailure
8+
import com.sksamuel.elastic4s.http.ElasticDsl._
9+
import com.sksamuel.elastic4s.searches.SearchDefinition
10+
import com.typesafe.scalalogging.Logger
11+
import monix.execution.{Cancelable, Scheduler}
12+
import monix.reactive.{Observable, OverflowStrategy}
13+
import net.cakesolutions.testkit.config.Configuration.Logging
14+
15+
import scala.util.control.NonFatal
16+
17+
final class ElasticSearchLogSource(
18+
search: SearchDefinition,
19+
config: Aws4ElasticConfig
20+
) {
21+
import ElasticSearchLogSource._
22+
23+
private val logger = Logger(Logging.name)
24+
25+
def source()(implicit scheduler: Scheduler): Observable[String] =
26+
Observable.create[String](
27+
OverflowStrategy.Unbounded
28+
) { subscriber =>
29+
try {
30+
scheduler.execute {
31+
new Runnable {
32+
override def run(): Unit = {
33+
val awsElasticClient = Aws4ElasticClient.apply(config)
34+
Observable
35+
.fromFuture(awsElasticClient.execute(search).flatMap {
36+
case Left(failure) =>
37+
Future.failed(
38+
ElasticSearchRequestFailureException(failure)
39+
)
40+
case Right(result) =>
41+
Future.successful(result)
42+
})
43+
.flatMap { result =>
44+
Observable.fromIterable(
45+
result.result.hits.hits.map(_.sourceAsString)
46+
)
47+
}
48+
}
49+
}
50+
}
51+
} catch {
52+
case NonFatal(error) =>
53+
logger.error("Log parsing error", error)
54+
subscriber.onError(error)
55+
}
56+
Cancelable(() => subscriber.onComplete())
57+
}
58+
}
59+
60+
object ElasticSearchLogSource {
61+
case class ElasticSearchRequestFailureException(
62+
requestFailure: RequestFailure
63+
) extends Exception(
64+
s"ElasticSearch request failed: ${requestFailure.error.reason}"
65+
)
66+
}

0 commit comments

Comments
 (0)