Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import org.opensearch.core.common.breaker.CircuitBreakingException
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.node.NodeClosedException
import org.opensearch.transport.ActionNotFoundTransportException
import org.opensearch.transport.ConnectTransportException
Expand Down Expand Up @@ -188,12 +187,12 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
)
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently)
}

val shardCount: Int = getShardsCount(monitorCtx.clusterService!!, concreteIndexName)
// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = initializeNewLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName,
shardCount
) as MutableMap<String, Any>
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
Expand All @@ -217,8 +216,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
// update lastRunContext if its a temp monitor as we only want to view the last bit of data then
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
indexLastRunContext[shard] =
max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
indexLastRunContext[shard] = if (indexLastRunContext.containsKey(shard)) {
if (indexLastRunContext[shard] is Long) {
max(-1L, indexUpdatedRunContext[shard] as Long - 10L)
} else if (indexLastRunContext[shard] is Int) {
max(-1L, (indexUpdatedRunContext[shard] as Int).toLong() - 10L)
} else -1L
} else {
-1L
}
}
}
val indexExecutionContext = IndexExecutionContext(
Expand Down Expand Up @@ -421,7 +427,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
return monitorResult.copy(triggerResults = triggerResults, inputResults = inputRunResults)
} catch (e: Exception) {
val errorMessage = ExceptionsHelper.detailedMessage(e)
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext)
if (false == dryrun) {
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext)
}
logger.error("Failed running Document-level-monitor ${monitor.name}", e)
val alertingException = AlertingException(
errorMessage,
Expand Down Expand Up @@ -455,7 +463,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
if (fanOutResponse.lastRunContexts.contains("index") && fanOutResponse.lastRunContexts["index"] == indexName) {
fanOutResponse.lastRunContexts.keys.forEach {

val seq_no = fanOutResponse.lastRunContexts[it].toString().toIntOrNull()
val seq_no = fanOutResponse.lastRunContexts[it].toString().toLongOrNull()
if (
it != "shards_count" &&
it != "index" &&
Expand Down Expand Up @@ -560,20 +568,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
return InputRunResults(listOf(inputRunResults), if (!errors.isEmpty()) AlertingException.merge(*errors.toTypedArray()) else null)
}

private fun initializeNewLastRunContext(
    lastRunContext: Map<String, Any>,
    monitorCtx: MonitorRunnerExecutionContext,
    index: String,
): Map<String, Any> {
    // Build a copy of the previous run context in which every shard slot is reset
    // to the unassigned-seq-no sentinel, stored as a String.
    val shardTotal: Int = getShardsCount(monitorCtx.clusterService!!, index)
    val refreshedContext = lastRunContext.toMutableMap()
    (0 until shardTotal)
        .map(Int::toString)
        .forEach { shardId ->
            refreshedContext[shardId] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString()
        }
    return refreshedContext
}

private fun validate(monitor: Monitor) {
if (monitor.inputs.size > 1) {
throw IOException("Only one input is supported with document-level-monitor.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.opensearch.core.index.Index
import org.opensearch.core.index.shard.ShardId
import org.opensearch.index.seqno.SequenceNumbers

private val logger: Logger = LogManager.getLogger("FanOutEligibility")

Expand Down Expand Up @@ -42,3 +43,37 @@ fun distributeShards(

return nodeShardAssignments
}

/**
 * Initializes the last run context for a given index.
 *
 * This method prepares the context structure to be updated later by fan-out operations.
 * It preserves existing per-shard sequence numbers and initializes entries only for
 * shards that are not yet present, using [SequenceNumbers.UNASSIGNED_SEQ_NO].
 *
 * @param lastRunContext The previous run context from monitor metadata
 * @param index The name of the index for which the context is being initialized
 * @param shardCount The number of shards of the index; one context entry is kept per shard id ("0".."shardCount-1")
 * @return A map containing the initialized last run context, including the
 *         "shards_count" and "index" metadata entries
 */
fun initializeNewLastRunContext(
    lastRunContext: Map<String, Any>,
    index: String,
    shardCount: Int,
): Map<String, Any> {
    val updatedLastRunContext = lastRunContext.toMutableMap()

    // Only initialize shards that don't already have a sequence number
    for (i: Int in 0 until shardCount) {
        val shard = i.toString()
        // Preserve existing sequence numbers instead of resetting to UNASSIGNED
        if (!updatedLastRunContext.containsKey(shard)) {
            updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO
        }
    }

    // Metadata fields
    updatedLastRunContext["shards_count"] = shardCount
    updatedLastRunContext["index"] = index

    return updatedLastRunContext
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.Version
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.MonitorRunner
import org.opensearch.alerting.MonitorRunnerExecutionContext
import org.opensearch.alerting.initializeNewLastRunContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNode
Expand All @@ -27,7 +28,6 @@ import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonit
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.transport.TransportService
import java.io.IOException
import java.time.Instant
Expand Down Expand Up @@ -121,11 +121,11 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
)
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently)
}

val shardCount: Int = getShardsCount(monitorCtx.clusterService!!, concreteIndexName)
val indexUpdatedRunContext = initializeNewLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName
concreteIndexName,
shardCount
) as MutableMap<String, Any>
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
Expand Down Expand Up @@ -269,12 +269,12 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
if (fanOutResponse.lastRunContexts.contains(indexName)) {
(fanOutResponse.lastRunContexts[indexName] as Map<String, Any>).forEach {

val seq_no = it.value.toString().toIntOrNull()
val seq_no = it.value.toString().toLongOrNull()
if (
it.key != "shards_count" &&
it.key != "index" &&
seq_no != null &&
seq_no >= 0
seq_no >= 0L
) {
indexLastRunContext[it.key] = seq_no
}
Expand Down Expand Up @@ -370,18 +370,4 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L
return indexCreationDate > lastExecutionTime.toEpochMilli()
}

private fun initializeNewLastRunContext(
    lastRunContext: Map<String, Any>,
    monitorCtx: MonitorRunnerExecutionContext,
    index: String,
): Map<String, Any> {
    // Copy the previous context, then overwrite each shard slot with the
    // unassigned-seq-no sentinel (kept as the raw value, not a String).
    val totalShards: Int = getShardsCount(monitorCtx.clusterService!!, index)
    val freshContext = lastRunContext.toMutableMap()
    var shardIndex = 0
    while (shardIndex < totalShards) {
        freshContext[shardIndex.toString()] = SequenceNumbers.UNASSIGNED_SEQ_NO
        shardIndex++
    }
    return freshContext
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class TransportDocLevelMonitorFanOutAction
val alertService: AlertService,
val scriptService: ScriptService,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry
val xContentRegistry: NamedXContentRegistry,
) : HandledTransportAction<DocLevelMonitorFanOutRequest, DocLevelMonitorFanOutResponse>(
DocLevelMonitorFanOutAction.NAME, transportService, actionFilters, ::DocLevelMonitorFanOutRequest
),
Expand Down Expand Up @@ -772,7 +772,7 @@ class TransportDocLevelMonitorFanOutAction
fieldsToBeQueried: List<String>,
shardList: List<Int>,
transformedDocs: MutableList<Pair<String, TransformedDocDto>>,
updateLastRunContext: (String, String) -> Unit
updateLastRunContext: (String, Long) -> Unit
) {
for (shardId in shardList) {
val shard = shardId.toString()
Expand All @@ -798,7 +798,7 @@ class TransportDocLevelMonitorFanOutAction

if (maxSeqNo == null || maxSeqNo <= from) {
// No new documents to process
updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString())
updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED))
continue
}
// Process documents in chunks between prevSeqNo and maxSeqNo
Expand Down Expand Up @@ -872,7 +872,7 @@ class TransportDocLevelMonitorFanOutAction
// Move to next chunk - use the last document's sequence number
currentSeqNo = hits.hits.last().seqNo
// update last seen sequence number after every set of seen docs
updateLastRunContext(shard, currentSeqNo.toString())
updateLastRunContext(shard, currentSeqNo)
}
} catch (e: Exception) {
log.error(
Expand Down
50 changes: 18 additions & 32 deletions alerting/src/test/kotlin/org/opensearch/alerting/DocLeveFanOutIT.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.opensearch.alerting

import org.junit.Assert
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
Expand All @@ -11,14 +11,12 @@ import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit.MILLIS

@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
class DocLeveFanOutIT : AlertingRestTestCase() {

fun `test execution reaches endtime before completing execution`() {
var updateSettings =
adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_EXECUTION_MAX_DURATION.key, TimeValue.timeValueNanos(1))
val updateSettings1 = adminClient().updateSettings(AlertingSettings.FINDING_HISTORY_ENABLED.key, false)
logger.info(updateSettings1)
logger.info(updateSettings)
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
Expand All @@ -41,45 +39,33 @@ class DocLeveFanOutIT : AlertingRestTestCase() {
}

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
val trigger3 = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
val trigger4 = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
val trigger5 = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
val trigger6 = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)

val monitor = createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger, trigger1, trigger2, trigger3, trigger4, trigger5, trigger6)
triggers = listOf(trigger)
)
)
assertNotNull(monitor.id)

executeMonitor(monitor.id)
indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
indexDoc(testIndex, "2", testDoc)

var response = executeMonitor(monitor.id)

var output = entityAsMap(response)

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
var inputResults = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
Assert.assertTrue(inputResults.isEmpty())

updateSettings =
adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_EXECUTION_MAX_DURATION.key, TimeValue.timeValueMinutes(4))
logger.info(updateSettings)

response = executeMonitor(monitor.id)
output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])
inputResults = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val matchingDocsToQuery = inputResults[docQuery.id] as List<String>
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex"))
val findings1 = searchFindings(monitor)
val findingsSize1 = findings1.size
assertEquals(findingsSize1, 2)
adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_EXECUTION_MAX_DURATION.key, TimeValue.timeValueNanos(1))
executeMonitor(monitor.id)
Thread.sleep(1000)
adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_EXECUTION_MAX_DURATION.key, TimeValue.timeValueMinutes(4))
indexDoc(testIndex, "3", testDoc)
indexDoc(testIndex, "4", testDoc)
executeMonitor(monitor.id)
val findings = searchFindings(monitor)
val findingsSize = findings.size
assertEquals(findingsSize, 4)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,15 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
val monitor = createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(
randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))
)
)
)
assertNotNull(monitor.id)

indexDoc(index, "1", testDoc)
indexDoc(index, "2", testDoc)
Expand All @@ -206,7 +211,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
indexDoc(index, "51", testDoc)

deleteDoc(index, "51")
val response = executeMonitor(monitor, params = mapOf("dryrun" to "false"))
val response = executeMonitor(monitor.id)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])
Expand Down
Loading