@@ -575,6 +575,11 @@ class TransportDocLevelMonitorFanOutAction

// Before the "|" is the doc id and after the "|" is the index
val docIndex = it.key.split("|")
val additionalFields = this.fetchDocForFinding(
Collaborator:
Making a search call per doc is not going to scale.

We have the document in memory as part of the monitor execution. Can we add these additional fields when we fetch that document and then carry them forward from memory?
Member:
@engechas findings are sparse. IMO we should do a second search in bulk for all the finding-generating docs.
docIndex[1],
docIndex[0],
monitor.metadataForFindings!!
Member:
Do we need a version check here for serde? What happens in upgrade or blue/green scenarios?
)

val finding = Finding(
id = UUID.randomUUID().toString(),
@@ -585,7 +590,8 @@ class TransportDocLevelMonitorFanOutAction
index = docIndex[1],
docLevelQueries = triggeredQueries,
timestamp = Instant.now(),
executionId = workflowExecutionId
executionId = workflowExecutionId,
additionalFields = additionalFields
)
findingDocPairs.add(Pair(finding.id, it.key))
findings.add(finding)
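On the version-check question above: a Finding produced during fan-out is serialized between nodes, so a newly added field is normally gated on the stream version. Below is a minimal, hypothetical sketch of that pattern; the helper names, the cutoff version, and the import paths are assumptions, and the actual Finding serde (not part of this diff) may differ.

import org.opensearch.Version
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput

// Hypothetical sketch only: the usual pattern for adding a field to a Writeable without
// breaking mixed-version (rolling upgrade / blue-green) clusters is to gate it on the
// stream version. Version.V_2_13_0 is a placeholder cutoff, and the import paths can
// differ between OpenSearch versions.
fun writeAdditionalFields(out: StreamOutput, additionalFields: Map<String, Any>?) {
    if (out.version.onOrAfter(Version.V_2_13_0)) {
        // Older nodes never receive the new field, so they keep deserializing Finding as before.
        out.writeMap(additionalFields)
    }
}

fun readAdditionalFields(sin: StreamInput): Map<String, Any>? =
    if (sin.version.onOrAfter(Version.V_2_13_0)) sin.readMap() else null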
@@ -1107,6 +1113,41 @@ class TransportDocLevelMonitorFanOutAction
return response.hits
}

private suspend fun fetchDocForFinding(
Member:
Please add debug and info logs, including the time taken for the search request.

Member:
Fetch all the docs for findings in a single call.
index: String,
docId: String,
fields: List<String>
): Map<String, List<Any>> {
val qb = QueryBuilders.matchQuery("_id", docId)

val request: SearchRequest = SearchRequest()
.indices(index)
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.ASC)
.seqNoAndPrimaryTerm(true)
.query(qb)
.fetchSource(false)
)

if (fields.isNotEmpty()) {
Member:
This check should be done up front: if the fields list is empty, the search is useless.
for (field in fields) {
request.source().fetchField(field)
}
}
val response: SearchResponse = client.suspendUntil { client.search(request, it) }
if (response.status() !== RestStatus.OK) {
Member:
Add an error log here.
throw IOException("Failed to search in index [$index]. Response status is ${response.status()}")
}

val additionalFields: MutableMap<String, List<Any>> = mutableMapOf()
for (field in response.hits.hits[0].fields) {
Member:
Check for empty hits; this is a walking ArrayIndexOutOfBoundsException.
additionalFields[field.key] = field.value.values
}
return additionalFields
}
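A minimal sketch of how the review feedback on this function could be folded together: one search for all finding-generating docs (terms query on _id), an early return when there is nothing to fetch, timing and error logs, and an empty-hits guard. The name fetchDocsForFindings and the log field are assumptions, not code from this PR; client and suspendUntil are taken from the surrounding class, and the sketch assumes the imports already present in this file.

private suspend fun fetchDocsForFindings(
    index: String,
    docIds: List<String>,
    fields: List<String>
): Map<String, Map<String, List<Any>>> {
    // Preliminary check: skip the search entirely when it cannot return anything useful.
    if (docIds.isEmpty() || fields.isEmpty()) return emptyMap()

    val source = SearchSourceBuilder()
        .query(QueryBuilders.termsQuery("_id", docIds))
        .size(docIds.size)
        .fetchSource(false)
    fields.forEach { source.fetchField(it) }

    val request = SearchRequest().indices(index).source(source)

    val startTime = System.currentTimeMillis()
    val response: SearchResponse = client.suspendUntil { client.search(request, it) }
    log.debug(
        "Fetched {} docs for findings from index [{}] in {} ms",
        response.hits.hits.size, index, System.currentTimeMillis() - startTime
    )

    if (response.status() != RestStatus.OK) {
        log.error("Failed to fetch finding docs from index [$index]: ${response.status()}")
        throw IOException("Failed to search in index [$index]. Response status is ${response.status()}")
    }

    // An empty hit set is a legitimate outcome (e.g. the doc was deleted after matching),
    // so callers simply get no additional fields instead of an out-of-bounds exception.
    return response.hits.hits.associate { hit ->
        hit.id to hit.fields.mapValues { (_, docField) -> docField.values }
    }
}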

/** Transform field names and index names in all the search hits to format required to run percolate search against them.
* Hits are transformed using method transformDocumentFieldNames() */
private fun transformSearchHitsAndReconstructDocs(
@@ -69,6 +69,10 @@
},
"execution_id": {
"type": "keyword"
},
"additional_fields": {
"type": "object",
"enabled": false
}
}
}
@@ -3027,4 +3027,40 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
)
assertEquals(found.get(), false)
}

fun `test execute monitor with dryrun with finding metadata enabled`() {
Member:
Why dry run? Please add more test cases.

Member:
Add a test where the additional fields are invalid, and one where the additional fields list is empty.

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))),
metadataForFindings = listOf("test_field", "message")
)
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor.id)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

val findings = searchFindings(monitor)
assertEquals(2, findings[0].additionalFields!!.size)
}
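A hedged sketch of one of the extra cases requested above (metadataForFindings set to an empty list), built only from helpers already used in this class; the test name and the final assertion encode an assumed behaviour, not one verified against this change.

    fun `test execute monitor with empty finding metadata list`() {
        val index = createTestIndex()

        val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
        val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

        val monitor = createMonitor(
            randomDocumentLevelMonitor(
                inputs = listOf(docLevelInput),
                triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf())),
                metadataForFindings = listOf()
            )
        )

        indexDoc(index, "1", """{ "test_field" : "us-west-2" }""")

        val response = executeMonitor(monitor.id)
        assertEquals(monitor.name, entityAsMap(response)["monitor_name"])

        val findings = searchFindings(monitor)
        assertEquals(1, findings.size)
        // Assumed behaviour: with no metadata fields configured, the finding carries no additional fields.
        assertTrue(findings[0].additionalFields.isNullOrEmpty())
    }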
}
@@ -196,12 +196,13 @@ fun randomDocumentLevelMonitor(
triggers: List<Trigger> = (1..randomInt(10)).map { randomQueryLevelTrigger() },
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false
withMetadata: Boolean = false,
metadataForFindings: List<String>? = null
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), metadataForFindings = metadataForFindings
)
}
