diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 6db32af04..a8ebc0263 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -575,6 +575,11 @@ class TransportDocLevelMonitorFanOutAction // Before the "|" is the doc id and after the "|" is the index val docIndex = it.key.split("|") + val additionalFields = this.fetchDocForFinding( + docIndex[1], + docIndex[0], + monitor.metadataForFindings!! + ) val finding = Finding( id = UUID.randomUUID().toString(), @@ -585,7 +590,8 @@ class TransportDocLevelMonitorFanOutAction index = docIndex[1], docLevelQueries = triggeredQueries, timestamp = Instant.now(), - executionId = workflowExecutionId + executionId = workflowExecutionId, + additionalFields = additionalFields ) findingDocPairs.add(Pair(finding.id, it.key)) findings.add(finding) @@ -1107,6 +1113,41 @@ class TransportDocLevelMonitorFanOutAction return response.hits } + private suspend fun fetchDocForFinding( + index: String, + docId: String, + fields: List + ): Map> { + val qb = QueryBuilders.matchQuery("_id", docId) + + val request: SearchRequest = SearchRequest() + .indices(index) + .source( + SearchSourceBuilder() + .version(true) + .sort("_seq_no", SortOrder.ASC) + .seqNoAndPrimaryTerm(true) + .query(qb) + .fetchSource(false) + ) + + if (fields.isNotEmpty()) { + for (field in fields) { + request.source().fetchField(field) + } + } + val response: SearchResponse = client.suspendUntil { client.search(request, it) } + if (response.status() !== RestStatus.OK) { + throw IOException("Failed to search in index [$index]. Response status is ${response.status()}") + } + + val additionalFields: MutableMap> = mutableMapOf() + for (field in response.hits.hits[0].fields) { + additionalFields[field.key] = field.value.values + } + return additionalFields + } + /** Transform field names and index names in all the search hits to format required to run percolate search against them. * Hits are transformed using method transformDocumentFieldNames() */ private fun transformSearchHitsAndReconstructDocs( diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json index 1bfea4ebc..c606e42e5 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json @@ -69,6 +69,10 @@ }, "execution_id": { "type": "keyword" + }, + "additional_fields": { + "type": "object", + "enabled": false } } } \ No newline at end of file diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 850055fad..daf8f19c2 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -3027,4 +3027,40 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { ) assertEquals(found.get(), false) } + + fun `test execute monitor with dryrun with finding metadata enabled`() { + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))), + metadataForFindings = listOf("test_field", "message") + ) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + val findings = searchFindings(monitor) + assertEquals(2, findings[0].additionalFields!!.size) + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 2330974f4..cc1238248 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -196,12 +196,13 @@ fun randomDocumentLevelMonitor( triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), - withMetadata: Boolean = false + withMetadata: Boolean = false, + metadataForFindings: List? = null ): Monitor { return Monitor( name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, - uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), metadataForFindings = metadataForFindings ) }