diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index 4e6cdbc02..7acfdff8b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -94,7 +94,7 @@ abstract class MonitorRunner { } } - protected suspend fun getConfigAndSendNotification( + suspend fun getConfigAndSendNotification( action: Action, monitorCtx: MonitorRunnerExecutionContext, subject: String?, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index f8703aec2..a1737c913 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -33,6 +33,7 @@ import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.remote.monitors.RemoteDocumentLevelMonitorRunner import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry import org.opensearch.alerting.script.TriggerExecutionContext +import org.opensearch.alerting.script.TriggerV2ExecutionContext import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS @@ -59,10 +60,17 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.lifecycle.AbstractLifecycleComponent import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.ExecuteMonitorV2Request +import org.opensearch.commons.alerting.action.ExecuteMonitorV2Response import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.MonitorRunResult +import org.opensearch.commons.alerting.model.MonitorV2 +import org.opensearch.commons.alerting.model.MonitorV2RunResult +import org.opensearch.commons.alerting.model.PPLMonitor +import org.opensearch.commons.alerting.model.PPLMonitor.Companion.PPL_MONITOR_TYPE import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.TriggerRunResult import org.opensearch.commons.alerting.model.Workflow @@ -316,11 +324,16 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon logger.error("Failed to move active alerts for monitor [${job.id}].", e) } } + } else if (job is MonitorV2) { + return } else { throw IllegalArgumentException("Invalid job type") } } + // TODO: if MonitorV2 was deleted, skip trying to move alerts + // cluster throws failed to move alerts exception whenever a MonitorV2 is deleted + // because Alerting V2's stateless alerts don't need to be moved override fun postDelete(jobId: String) { launch { try { @@ -408,6 +421,44 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon } } } + is MonitorV2 -> { + if (job !is PPLMonitor) { + throw IllegalStateException("Unexpected invalid MonitorV2 type: ${job.javaClass.name}") + } + + launch { + var monitorLock: LockModel? 
= null + try { + monitorLock = monitorCtx.client!!.suspendUntil { + monitorCtx.lockService!!.acquireLock(job, it) + } ?: return@launch + logger.debug("lock ${monitorLock!!.lockId} acquired") + logger.debug( + "PERF_DEBUG: executing $PPL_MONITOR_TYPE ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id + ) + val executeMonitorV2Request = ExecuteMonitorV2Request( + false, + job.id, // only need to pass in MonitorV2 ID + null, // no need to pass in MonitorV2 object itself + TimeValue(periodStart.toEpochMilli()), + TimeValue(periodEnd.toEpochMilli()) + ) + monitorCtx.client!!.suspendUntil { + monitorCtx.client!!.execute( + AlertingActions.EXECUTE_MONITOR_V2_ACTION_TYPE, + executeMonitorV2Request, + it + ) + } + } catch (e: Exception) { + logger.error("MonitorV2 run failed for monitor with id ${job.id}", e) + } finally { + monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(monitorLock, it) } + logger.debug("lock ${monitorLock?.lockId} released") + } + } + } else -> { throw IllegalArgumentException("Invalid job type") } @@ -433,20 +484,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon ): MonitorRunResult<*> { // Updating the scheduled job index at the start of monitor execution runs for when there is an upgrade the the schema mapping // has not been updated. - if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) { - IndexUtils.updateIndexMapping( - ScheduledJob.SCHEDULED_JOBS_INDEX, - ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(), - object : ActionListener { - override fun onResponse(response: AcknowledgedResponse) { - } - - override fun onFailure(t: Exception) { - logger.error("Failed to update config index schema", t) - } - } - ) - } + updateAlertingConfigIndexSchema() if (job is Workflow) { logger.info("Executing scheduled workflow - id: ${job.id}, periodStart: $periodStart, periodEnd: $periodEnd, dryrun: $dryrun") @@ -539,6 +577,44 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon } } + // after the above JobRunner interface override runJob calls ExecuteMonitorV2 API, + // the ExecuteMonitorV2 transport action calls this function to call the PPLMonitorRunner, + // where the core PPL Monitor execution logic resides + suspend fun runJobV2( + monitorV2: MonitorV2, + periodStart: Instant, + periodEnd: Instant, + dryrun: Boolean, + transportService: TransportService, + ): MonitorV2RunResult<*> { + updateAlertingConfigIndexSchema() + + val executionId = "${monitorV2.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}" + val monitorV2Type = when (monitorV2) { + is PPLMonitor -> PPL_MONITOR_TYPE + else -> throw IllegalStateException("Unexpected MonitorV2 type: ${monitorV2.javaClass.name}") + } + + logger.info( + "Executing scheduled monitor - id: ${monitorV2.id}, type: $monitorV2Type, periodStart: $periodStart, " + + "periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId" + ) + + // for now, always call PPLMonitorRunner since only PPL Monitors are initially supported + // to introduce new MonitorV2 type, create its MonitorRunner, and if/else branch + // to the corresponding MonitorRunners based on type. 
For now, default to PPLMonitorRunner + val runResult = PPLMonitorRunner.runMonitorV2( + monitorV2, + monitorCtx, + periodStart, + periodEnd, + dryrun, + executionId = executionId, + transportService = transportService, + ) + return runResult + } + // TODO: See if we can move below methods (or few of these) to a common utils internal fun getRolesForMonitor(monitor: Monitor): List { /* @@ -582,4 +658,27 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon .newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg())) .execute() } + + internal fun compileTemplateV2(template: Script, ctx: TriggerV2ExecutionContext): String { + return monitorCtx.scriptService!!.compile(template, TemplateScript.CONTEXT) + .newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg())) + .execute() + } + + private fun updateAlertingConfigIndexSchema() { + if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) { + IndexUtils.updateIndexMapping( + ScheduledJob.SCHEDULED_JOBS_INDEX, + ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(), + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + } + + override fun onFailure(t: Exception) { + logger.error("Failed to update config index schema", t) + } + } + ) + } + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorV2Runner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorV2Runner.kt new file mode 100644 index 000000000..6474b3b8f --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorV2Runner.kt @@ -0,0 +1,18 @@ +package org.opensearch.alerting + +import org.opensearch.commons.alerting.model.MonitorV2 +import org.opensearch.commons.alerting.model.MonitorV2RunResult +import org.opensearch.transport.TransportService +import java.time.Instant + +interface MonitorV2Runner { + suspend fun runMonitorV2( + monitorV2: MonitorV2, + monitorCtx: MonitorRunnerExecutionContext, // MonitorV2 reads from same context as Monitor + periodStart: Instant, + periodEnd: Instant, + dryRun: Boolean, + executionId: String, + transportService: TransportService + ): MonitorV2RunResult<*> +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/PPLMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/PPLMonitorRunner.kt new file mode 100644 index 000000000..720168e94 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/PPLMonitorRunner.kt @@ -0,0 +1,640 @@ +package org.opensearch.alerting + +import org.apache.logging.log4j.LogManager +import org.json.JSONArray +import org.json.JSONObject +import org.opensearch.ExceptionsHelper +import org.opensearch.action.DocWriteRequest +import org.opensearch.action.bulk.BackoffPolicy +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.QueryLevelMonitorRunner.getConfigAndSendNotification +import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.opensearchapi.retry +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.script.PPLTriggerExecutionContext +import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.commons.alerting.alerts.AlertError +import 
org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.AlertV2 +import org.opensearch.commons.alerting.model.MonitorV2 +import org.opensearch.commons.alerting.model.MonitorV2RunResult +import org.opensearch.commons.alerting.model.PPLMonitor +import org.opensearch.commons.alerting.model.PPLMonitorRunResult +import org.opensearch.commons.alerting.model.PPLTrigger +import org.opensearch.commons.alerting.model.PPLTrigger.ConditionType +import org.opensearch.commons.alerting.model.PPLTrigger.NumResultsCondition +import org.opensearch.commons.alerting.model.PPLTrigger.TriggerMode +import org.opensearch.commons.alerting.model.PPLTriggerRunResult +import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX +import org.opensearch.commons.alerting.model.TriggerV2 +import org.opensearch.commons.alerting.model.action.Action +import org.opensearch.commons.alerting.model.userErrorMessage +import org.opensearch.commons.ppl.PPLPluginInterface +import org.opensearch.core.common.Strings +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.sql.plugin.transport.TransportPPLQueryRequest +import org.opensearch.transport.TransportService +import org.opensearch.transport.client.node.NodeClient +import java.time.Instant +import java.time.ZoneOffset.UTC +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit + +object PPLMonitorRunner : MonitorV2Runner { + private val logger = LogManager.getLogger(javaClass) + + private const val PPL_SQL_QUERY_FIELD = "query" // name of PPL query field when passing into PPL/SQL Execute API call + + override suspend fun runMonitorV2( + monitorV2: MonitorV2, + monitorCtx: MonitorRunnerExecutionContext, // MonitorV2 reads from same context as Monitor + periodStart: Instant, + periodEnd: Instant, + dryRun: Boolean, + executionId: String, + transportService: TransportService, + ): MonitorV2RunResult<*> { + if (monitorV2 !is PPLMonitor) { + throw IllegalStateException("Unexpected monitor type: ${monitorV2.javaClass.name}") + } + + if (monitorV2.id == MonitorV2.NO_ID) { + throw IllegalStateException("Received PPL Monitor to execute that unexpectedly has no ID") + } + + if (periodStart == periodEnd) { + logger.warn("Start and end time are the same: $periodStart. This PPL Monitor will probably only run once.") + } + + logger.debug("Running PPL Monitor: ${monitorV2.name}. Thread: ${Thread.currentThread().name}") + + val pplMonitor = monitorV2 + val nodeClient = monitorCtx.client as NodeClient + + // create some objects that will be used later + val triggerResults = mutableMapOf() + val pplQueryResults = mutableMapOf>() + + // set the current execution time + // use threadpool time for cross node consistency + val timeOfCurrentExecution = Instant.ofEpochMilli(MonitorRunnerService.monitorCtx.threadPool!!.absoluteTimeInMillis()) + + // TODO: should alerting v1 and v2 alerts index be separate? 
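+        // Note: the alert indices are bootstrapped up front so that every AlertV2 written later in this
+        // run has a destination index; if index creation fails, the run is reported as a monitor-level
+        // error (the catch below returns a PPLMonitorRunResult carrying the exception and no trigger results).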
+ try { + // TODO: write generated V2 alerts to existing alerts v1 index for now, revisit this decision + monitorCtx.alertIndices!!.createOrUpdateAlertIndex() + monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex() + } catch (e: Exception) { + val id = if (pplMonitor.id.trim().isEmpty()) "_na_" else pplMonitor.id + logger.error("Error loading alerts for monitorV2: $id", e) + return PPLMonitorRunResult(pplMonitor.name, e, periodStart, periodEnd, mapOf(), mapOf()) + } + + // only query data between now and the last PPL Monitor execution + // unless a look back window is specified, in which case use that instead, + // then inject a time filter where statement into PPL Monitor query. + // if the given monitor query already has any time check whatsoever, this + // simply returns the original query itself + val timeFilteredQuery = addTimeFilter(pplMonitor.query, periodStart, periodEnd, pplMonitor.lookBackWindow) + logger.info("time filtered query: $timeFilteredQuery") + + // run each trigger + for (pplTrigger in pplMonitor.triggers) { + try { + // check for suppression and skip execution + // before even running the trigger itself + val suppressed = checkForSuppress(pplTrigger, timeOfCurrentExecution) + if (suppressed) { + logger.info("suppressing trigger ${pplTrigger.name} from monitor ${pplMonitor.name}") + + // automatically set this trigger to untriggered + triggerResults[pplTrigger.id] = PPLTriggerRunResult(pplTrigger.name, false, null) + + continue + } + logger.info("suppression check passed, executing trigger ${pplTrigger.name} from monitor ${pplMonitor.name}") + +// internal fun isActionActionable(action: Action, alert: Alert?): Boolean { +// if (alert != null && alert.state == Alert.State.AUDIT) +// return false +// if (alert == null || action.throttle == null) { +// return true +// } +// if (action.throttleEnabled) { +// val result = alert.actionExecutionResults.firstOrNull { r -> r.actionId == action.id } +// val lastExecutionTime: Instant? = result?.lastExecutionTime +// val throttledTimeBound = currentTime().minus(action.throttle!!.value.toLong(), action.throttle!!.unit) +// return (lastExecutionTime == null || lastExecutionTime.isBefore(throttledTimeBound)) +// } +// return true +// } + + // if trigger uses custom condition, append the custom condition to query, otherwise simply proceed + val queryToExecute = if (pplTrigger.conditionType == ConditionType.NUMBER_OF_RESULTS) { // number of results trigger + timeFilteredQuery + } else { // custom condition trigger + appendCustomCondition(timeFilteredQuery, pplTrigger.customCondition!!) + } + + // TODO: does this handle pagination? does it need to? + // execute the PPL query + val queryResponseJson = executePplQuery(queryToExecute, nodeClient) + logger.info("query execution results for trigger ${pplTrigger.name}: $queryResponseJson") + + // retrieve only the relevant query response rows. 
+ // for num_results triggers, that's the entire response + // for custom triggers, that's only rows that evaluated to true + val relevantQueryResultRows = if (pplTrigger.conditionType == ConditionType.NUMBER_OF_RESULTS) { + // number of results trigger + getQueryResponseWithoutSize(queryResponseJson) + } else { + // custom condition trigger + evaluateCustomConditionTrigger(queryResponseJson, pplTrigger) + } + + // retrieve the number of results + // for number of results triggers, this is simply the number of PPL query results + // for custom triggers, this is the number of rows in the query response's eval result column that evaluated to true + val numResults = relevantQueryResultRows.getLong("total") + logger.info("number of results: $numResults") + + // determine if the trigger condition has been met + val triggered = if (pplTrigger.conditionType == ConditionType.NUMBER_OF_RESULTS) { // number of results trigger + evaluateNumResultsTrigger(numResults, pplTrigger.numResultsCondition!!, pplTrigger.numResultsValue!!) + } else { // custom condition trigger + numResults > 0 // if any of the query results satisfied the custom condition, the trigger counts as triggered + } + + logger.info("PPLTrigger ${pplTrigger.name} triggered: $triggered") + + // store the trigger execution and ppl query results for + // trigger execution response and notification message context + triggerResults[pplTrigger.id] = PPLTriggerRunResult(pplTrigger.name, triggered, null) + pplQueryResults[pplTrigger.id] = queryResponseJson.toMap() + + if (triggered) { + // if trigger is on result set mode, this list will have exactly 1 element + // if trigger is on per result mode, this list will have as many elements as the query results had rows + val preparedQueryResults = prepareQueryResults(relevantQueryResultRows, pplTrigger.mode) + + // generate alerts based on trigger mode + // if this trigger is on result_set mode, this list contains exactly 1 alert + // if this trigger is on per_result mode, this list has any alerts as there are relevant query results + val thisTriggersGeneratedAlerts = generateAlerts( + pplTrigger, + pplMonitor, + preparedQueryResults, + executionId, + timeOfCurrentExecution + ) + + // collect the generated alerts to be written to alerts index + // if the trigger is on result_set mode +// generatedAlerts.addAll(thisTriggersGeneratedAlerts) + + // update the trigger's last execution time for future suppression checks + pplTrigger.lastTriggeredTime = timeOfCurrentExecution + + // send alert notifications +// val actionExecutionResults = mutableListOf() + for (action in pplTrigger.actions) { + for (alert in thisTriggersGeneratedAlerts) { + val pplTriggerExecutionContext = PPLTriggerExecutionContext( + monitorV2, + periodStart, + periodEnd, + null, + pplTrigger, + alert.queryResults + ) + + runAction( + action, + pplTriggerExecutionContext, + monitorCtx, + pplMonitor, + dryRun + ) + } + } + + // write the alerts to the alerts index + monitorCtx.retryPolicy?.let { + saveAlertsV2(thisTriggersGeneratedAlerts, pplMonitor, it, nodeClient) + } + } + } catch (e: Exception) { + logger.error("failed to run PPL Trigger ${pplTrigger.name} for PPL Monitor ${pplMonitor.name}", e) + + // generate an alert with an error message + monitorCtx.retryPolicy?.let { + saveAlertsV2( + generateErrorAlert(pplTrigger, pplMonitor, e, executionId, timeOfCurrentExecution), + pplMonitor, + it, + nodeClient + ) + } + + continue + } + } + + // for suppression checking purposes, reindex the PPL Monitor into the alerting-config index + 
// with updated last triggered times for each of its triggers + if (triggerResults.any { it.value.triggered }) { + updateMonitorWithLastTriggeredTimes(pplMonitor, nodeClient) + } + + return PPLMonitorRunResult( + pplMonitor.name, + null, + periodStart, + periodEnd, + triggerResults, + pplQueryResults + ) + } + + private fun checkForSuppress(pplTrigger: PPLTrigger, timeOfCurrentExecution: Instant): Boolean { + // the interval between throttledTimeBound and now is the suppression window + // i.e. any PPLTrigger whose last trigger time is in this window must be suppressed + val suppressTimeBound = pplTrigger.suppressDuration?.let { + timeOfCurrentExecution.minus(pplTrigger.suppressDuration!!.millis, ChronoUnit.MILLIS) + } + + // the trigger must be suppressed if... + return pplTrigger.suppressDuration != null && // suppression is enabled on the PPLTrigger + pplTrigger.lastTriggeredTime != null && // and it has triggered before at least once + pplTrigger.lastTriggeredTime!!.isAfter(suppressTimeBound!!) // and it's not yet out of the suppression window + } + + // adds monitor schedule-based time filter + // query: the raw PPL Monitor query + // periodStart: the lower bound of the initially computed query interval based on monitor schedule + // periodEnd: the upper bound of the initially computed query interval based on monitor schedule + // lookBackWindow: customer's desired query look back window, overrides [periodStart, periodEnd] if not null + private fun addTimeFilter(query: String, periodStart: Instant, periodEnd: Instant, lookBackWindow: TimeValue?): String { + // inject time filter into PPL query to only query for data within the (periodStart, periodEnd) interval + // TODO: if query contains "_time", "span", "earliest", "latest", skip adding filter + // pending https://github.com/opensearch-project/sql/issues/3969 + // for now assume "_time" field is always present in customer data + + // if the raw query contained any time check whatsoever, skip adding a time filter internally + // and return query as is, customer's in-query time checks instantly and automatically overrides + if (query.contains("_time")) { // TODO: replace with PPL time keyword checks after that's GA + return query + } + + // if customer passed in a look back window, override the precomputed interval with it + val updatedPeriodStart = lookBackWindow?.let { window -> + periodEnd.minus(window.millis, ChronoUnit.MILLIS) + } ?: periodStart + + // PPL plugin only accepts timestamp strings in this format + val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(UTC) + + val periodStartPplTimestamp = formatter.format(updatedPeriodStart) + val periodEndPplTimeStamp = formatter.format(periodEnd) + + val timeFilterReplace = "| where _time > TIMESTAMP('$periodStartPplTimestamp') and _time < TIMESTAMP('$periodEndPplTimeStamp') |" + val timeFilterAppend = "| where _time > TIMESTAMP('$periodStartPplTimestamp') and _time < TIMESTAMP('$periodEndPplTimeStamp')" + + val timeFilteredQuery: String = if (query.contains("|")) { + // if Monitor query contains piped statements, inject the time filter + // as the first piped statement (i.e. 
before more complex statements + // like aggregations can take effect later in the query) + query.replaceFirst("|", timeFilterReplace) + } else { + // otherwise the query contains no piped statements and is simply a + // `search source=` statement, simply append time filter at the end + query + timeFilterAppend + } + + return timeFilteredQuery + } + + // appends user-defined custom trigger condition to PPL query, only for custom condition Triggers + private fun appendCustomCondition(query: String, customCondition: String): String { + return "$query | $customCondition" + } + + // returns PPL query response as parsable JSONObject + private suspend fun executePplQuery(query: String, client: NodeClient): JSONObject { + // call PPL plugin to execute time filtered query + val transportPplQueryRequest = TransportPPLQueryRequest( + query, + JSONObject(mapOf(PPL_SQL_QUERY_FIELD to query)), + null // null path falls back to a default path internal to SQL/PPL Plugin + ) + + val transportPplQueryResponse = PPLPluginInterface.suspendUntil { + this.executeQuery( + client, + transportPplQueryRequest, + it + ) + } + + val queryResponseJson = JSONObject(transportPplQueryResponse.result) + + return queryResponseJson + } + + private fun evaluateNumResultsTrigger(numResults: Long, numResultsCondition: NumResultsCondition, numResultsValue: Long): Boolean { + return when (numResultsCondition) { + NumResultsCondition.GREATER_THAN -> numResults > numResultsValue + NumResultsCondition.GREATER_THAN_EQUAL -> numResults >= numResultsValue + NumResultsCondition.LESS_THAN -> numResults < numResultsValue + NumResultsCondition.LESS_THAN_EQUAL -> numResults <= numResultsValue + NumResultsCondition.EQUAL -> numResults == numResultsValue + NumResultsCondition.NOT_EQUAL -> numResults != numResultsValue + } + } + + private fun getQueryResponseWithoutSize(queryResponseJson: JSONObject): JSONObject { + // this will eventually store a deep copy of just the rows that triggered the custom condition + val queryResponseDeepCopy = JSONObject() + + // first add a deep copy of the schema + queryResponseDeepCopy.put("schema", JSONArray(queryResponseJson.getJSONArray("schema").toList())) + + // append empty datarows list, to be populated later + queryResponseDeepCopy.put("datarows", JSONArray()) + + val dataRowList = queryResponseJson.getJSONArray("datarows") + for (i in 0 until dataRowList.length()) { + val dataRow = dataRowList.getJSONArray(i) + queryResponseDeepCopy.getJSONArray("datarows").put(JSONArray(dataRow.toList())) + } + + // include the total but not the size field of the PPL Query response + queryResponseDeepCopy.put("total", queryResponseJson.getLong("total")) + + return queryResponseDeepCopy + } + + private fun evaluateCustomConditionTrigger(customConditionQueryResponse: JSONObject, pplTrigger: PPLTrigger): JSONObject { + // a PPL query with custom condition returning 0 results should imply a valid but not useful query. + // do not trigger alert, but warn that query likely is not functioning as user intended + if (customConditionQueryResponse.getLong("total") == 0L) { + logger.warn( + "During execution of PPL Trigger ${pplTrigger.name}, PPL query with custom " + + "condition returned no results. Proceeding without generating alert." 
+ ) + return customConditionQueryResponse + } + + // this will eventually store a deep copy of just the rows that triggered the custom condition + val relevantQueryResultRows = JSONObject() + + // first add a deep copy of the schema + relevantQueryResultRows.put("schema", JSONArray(customConditionQueryResponse.getJSONArray("schema").toList())) + + // append empty datarows list, to be populated later + relevantQueryResultRows.put("datarows", JSONArray()) + + // find the name of the eval result variable defined in custom condition + val evalResultVarName = findEvalResultVar(pplTrigger.customCondition!!) + + // find the eval statement result variable in the PPL query response schema + val schemaList = customConditionQueryResponse.getJSONArray("schema") + var evalResultVarIdx = -1 + for (i in 0 until schemaList.length()) { + val schemaObj = schemaList.getJSONObject(i) + val columnName = schemaObj.getString("name") + + if (columnName == evalResultVarName) { + if (schemaObj.getString("type") != "boolean") { + throw IllegalStateException( + "parsing results of PPL query with custom condition failed," + + "eval statement variable was not type boolean, but instead type: ${schemaObj.getString("type")}" + ) + } + + evalResultVarIdx = i + break + } + } + + // eval statement result variable should always be found + if (evalResultVarIdx == -1) { + throw IllegalStateException( + "expected to find eval statement results variable \"$evalResultVarName\" in results " + + "of PPL query with custom condition, but did not." + ) + } + + val dataRowList = customConditionQueryResponse.getJSONArray("datarows") + for (i in 0 until dataRowList.length()) { + val dataRow = dataRowList.getJSONArray(i) + val evalResult = dataRow.getBoolean(evalResultVarIdx) + if (evalResult) { + // if the row triggered the custom condition + // add it to the relevant results deep copy + relevantQueryResultRows.getJSONArray("datarows").put(JSONArray(dataRow.toList())) + } + } + + // include the total but not the size field of the PPL Query response + relevantQueryResultRows.put("total", relevantQueryResultRows.getJSONArray("datarows").length()) + + // return only the rows that triggered the custom condition + return relevantQueryResultRows + } + + // TODO: is there maybe some PPL plugin util function we can use to replace this? 
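+    // For example (hypothetical condition, not part of this change): given a custom condition such as
+    // "eval breached = cpu_usage > 90", the helper below returns "breached", i.e. the identifier found
+    // between the "eval " keyword and the first space or '=' that follows it.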
+ // searches a given custom condition eval statement for the name of the result + // variable and returns it + private fun findEvalResultVar(customCondition: String): String { + // the PPL keyword "eval", followed by a whitespace must be present, otherwise a syntax error from PPL plugin would've + // been thrown when executing the query (without the whitespace, the query would've had something like "evalresult", + // which is invalid PPL + val startOfEvalStatement = "eval " + + val startIdx = customCondition.indexOf(startOfEvalStatement) + startOfEvalStatement.length + val endIdx = startIdx + customCondition.substring(startIdx).indexOfFirst { it == ' ' || it == '=' } + return customCondition.substring(startIdx, endIdx) + } + + // prepares the query results to be passed into alerts and notifications based on trigger mode + // if result set, alert and notification simply stores all of the query results + // if per result, each alert and notification stores a single row of the query results + private fun prepareQueryResults(relevantQueryResultRows: JSONObject, triggerMode: TriggerMode): List { + // case: result set + if (triggerMode == TriggerMode.RESULT_SET) { + return listOf(relevantQueryResultRows) + } + + // case: per result + val individualRows = mutableListOf() + val numAlertsToGenerate = relevantQueryResultRows.getInt("total") + for (i in 0 until numAlertsToGenerate) { + val individualRow = JSONObject() + individualRow.put("schema", JSONArray(relevantQueryResultRows.getJSONArray("schema").toList())) + individualRow.put("datarows", JSONArray(relevantQueryResultRows.getJSONArray("datarows").getJSONArray(i).toList())) + individualRows.add(individualRow) + } + + return individualRows + } + + private fun generateAlerts( + pplTrigger: PPLTrigger, + pplMonitor: PPLMonitor, + preparedQueryResults: List, + executionId: String, + timeOfCurrentExecution: Instant + ): List { + val expirationTime = pplTrigger.expireDuration?.millis?.let { timeOfCurrentExecution.plus(it, ChronoUnit.MILLIS) } + + val alertV2s = mutableListOf() + for (queryResult in preparedQueryResults) { + val alertV2 = AlertV2( + monitorId = pplMonitor.id, + monitorName = pplMonitor.name, + monitorVersion = pplMonitor.version, + triggerId = pplTrigger.id, + triggerName = pplTrigger.name, + queryResults = queryResult.toMap(), + triggeredTime = timeOfCurrentExecution, + expirationTime = expirationTime, + severity = pplTrigger.severity.value, + executionId = executionId + ) + alertV2s.add(alertV2) + } + + return alertV2s.toList() // return as immutable list + } + + private fun generateErrorAlert( + pplTrigger: PPLTrigger, + pplMonitor: PPLMonitor, + exception: Exception, + executionId: String, + timeOfCurrentExecution: Instant + ): List { + val expirationTime = pplTrigger.expireDuration?.millis?.let { timeOfCurrentExecution.plus(it, ChronoUnit.MILLIS) } + + val errorMessage = "Failed to run PPL Trigger ${pplTrigger.name} from PPL Monitor ${pplMonitor.name}: " + + exception.userErrorMessage() + val obfuscatedErrorMessage = AlertError.obfuscateIPAddresses(errorMessage) + + val alertV2 = AlertV2( + monitorId = pplMonitor.id, + monitorName = pplMonitor.name, + monitorVersion = pplMonitor.version, + triggerId = pplTrigger.id, + triggerName = pplTrigger.name, + queryResults = mapOf(), + triggeredTime = timeOfCurrentExecution, + expirationTime = expirationTime, + errorMessage = obfuscatedErrorMessage, + severity = TriggerV2.Severity.ERROR.value, + executionId = executionId + ) + + return listOf(alertV2) + } + + private suspend fun 
saveAlertsV2( + alerts: List, + pplMonitor: PPLMonitor, + retryPolicy: BackoffPolicy, + client: NodeClient + ) { + logger.info("received alerts: $alerts") + + var requestsToRetry = alerts.flatMap { alert -> + listOf>( + IndexRequest(AlertIndices.ALERT_INDEX) + .routing(pplMonitor.id) // set routing ID to PPL Monitor ID + .source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .id(if (alert.id != Alert.NO_ID) alert.id else null) + ) + } + + if (requestsToRetry.isEmpty()) return + // Retry Bulk requests if there was any 429 response + retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { + val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) } + val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed } + failedResponses.forEach { + logger.info("write alerts failed responses: ${it.failureMessage}") + } + requestsToRetry = failedResponses.filter { it.status() == RestStatus.TOO_MANY_REQUESTS } + .map { bulkRequest.requests()[it.itemId] as IndexRequest } + + if (requestsToRetry.isNotEmpty()) { + val retryCause = failedResponses.first { it.status() == RestStatus.TOO_MANY_REQUESTS }.failure.cause + throw ExceptionsHelper.convertToOpenSearchException(retryCause) + } + } + } + + private suspend fun updateMonitorWithLastTriggeredTimes(pplMonitor: PPLMonitor, client: NodeClient) { + val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) + .id(pplMonitor.id) + .source(pplMonitor.toXContentWithType(XContentFactory.jsonBuilder())) + .routing(pplMonitor.id) + + val indexResponse = client.suspendUntil { index(indexRequest, it) } + + logger.info("PPLMonitor update with last execution times index response: ${indexResponse.result}") + } + + suspend fun runAction( + action: Action, + triggerCtx: PPLTriggerExecutionContext, + monitorCtx: MonitorRunnerExecutionContext, + pplMonitor: PPLMonitor, + dryrun: Boolean + ) { + // this function can throw an exception, which is caught by the try + // catch in runMonitor() to generate an error alert + val actionOutput = mutableMapOf() + actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null) + MonitorRunnerService.compileTemplateV2(action.subjectTemplate!!, triggerCtx) + else "" + actionOutput[Action.MESSAGE] = MonitorRunnerService.compileTemplateV2(action.messageTemplate, triggerCtx) + if (Strings.isNullOrEmpty(actionOutput[Action.MESSAGE])) { + throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}") + } + + if (!dryrun) { +// val client = monitorCtx.client + actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification( + action, + monitorCtx, + actionOutput[Action.SUBJECT], + actionOutput[Action.MESSAGE]!! + ) + // TODO: use this block when security plugin is enabled +// client!!.threadPool().threadContext.stashContext().use { +// withClosableContext( +// InjectorContextElement( +// pplMonitor.id, +// monitorCtx.settings!!, +// monitorCtx.threadPool!!.threadContext, +// pplMonitor.user?.roles, +// pplMonitor.user +// ) +// ) { +// actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification( +// action, +// monitorCtx, +// actionOutput[Action.SUBJECT], +// actionOutput[Action.MESSAGE]!! 
+// ) +// } +// } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteMonitorV2Action.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteMonitorV2Action.kt new file mode 100644 index 000000000..d05126c1a --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteMonitorV2Action.kt @@ -0,0 +1,70 @@ +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.ExecuteMonitorV2Request +import org.opensearch.commons.alerting.model.MonitorV2 +import org.opensearch.commons.alerting.util.AlertingException +import org.opensearch.core.xcontent.XContentParser.Token.START_OBJECT +import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.RestHandler.Route +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestRequest.Method.POST +import org.opensearch.rest.action.RestToXContentListener +import org.opensearch.transport.client.node.NodeClient +import java.time.Instant + +private val log = LogManager.getLogger(RestExecuteMonitorV2Action::class.java) + +class RestExecuteMonitorV2Action : BaseRestHandler() { + + override fun getName(): String = "execute_monitor_v2_action" + + override fun routes(): List { + return listOf( + Route( + POST, + "${AlertingPlugin.MONITOR_V2_BASE_URI}/{monitorV2Id}/_execute" + ), + Route( + POST, + "${AlertingPlugin.MONITOR_V2_BASE_URI}/_execute" + ) + ) + } + + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.MONITOR_V2_BASE_URI}/_execute") + + return RestChannelConsumer { channel -> + val dryrun = request.paramAsBoolean("dryrun", false) + val requestEnd = request.paramAsTime("period_end", TimeValue(Instant.now().toEpochMilli())) + + if (request.hasParam("monitorV2Id")) { + val monitorV2Id = request.param("monitorV2Id") + val execMonitorV2Request = ExecuteMonitorV2Request(dryrun, monitorV2Id, null, null, requestEnd) + client.execute(AlertingActions.EXECUTE_MONITOR_V2_ACTION_TYPE, execMonitorV2Request, RestToXContentListener(channel)) + } else { + val xcp = request.contentParser() + ensureExpectedToken(START_OBJECT, xcp.nextToken(), xcp) + + val monitorV2: MonitorV2 + try { + monitorV2 = MonitorV2.parse(xcp) + } catch (e: Exception) { + throw AlertingException.wrap(e) + } + + val execMonitorV2Request = ExecuteMonitorV2Request(dryrun, null, monitorV2, null, requestEnd) + client.execute(AlertingActions.EXECUTE_MONITOR_V2_ACTION_TYPE, execMonitorV2Request, RestToXContentListener(channel)) + } + } + } + + override fun responseParams(): Set { + return setOf("dryrun", "period_end", "monitorV2Id") + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/PPLTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/PPLTriggerExecutionContext.kt new file mode 100644 index 000000000..a8b236673 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/PPLTriggerExecutionContext.kt @@ -0,0 +1,24 @@ +package org.opensearch.alerting.script + +import org.opensearch.commons.alerting.model.MonitorV2 +import org.opensearch.commons.alerting.model.PPLMonitorRunResult.Companion.PPL_QUERY_RESULTS_FIELD +import 
org.opensearch.commons.alerting.model.PPLTrigger +import org.opensearch.commons.alerting.model.PPLTrigger.Companion.PPL_TRIGGER_FIELD +import java.time.Instant + +data class PPLTriggerExecutionContext( + override val monitorV2: MonitorV2, + override val periodStart: Instant, + override val periodEnd: Instant, + override val error: Exception? = null, + val pplTrigger: PPLTrigger, + val pplQueryResults: Map // keys are PPL query result fields, not trigger ID +) : TriggerV2ExecutionContext(monitorV2, periodStart, periodEnd, error) { + + override fun asTemplateArg(): Map { + val templateArg = super.asTemplateArg().toMutableMap() + templateArg[PPL_TRIGGER_FIELD] = pplTrigger.asTemplateArg() + templateArg[PPL_QUERY_RESULTS_FIELD] = pplQueryResults + return templateArg.toMap() + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerV2ExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerV2ExecutionContext.kt new file mode 100644 index 000000000..8e1cd8b9f --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/TriggerV2ExecutionContext.kt @@ -0,0 +1,31 @@ +package org.opensearch.alerting.script + +import org.opensearch.commons.alerting.model.MonitorV2 +import org.opensearch.commons.alerting.model.MonitorV2RunResult +import org.opensearch.commons.alerting.model.TriggerV2 +import java.time.Instant + +abstract class TriggerV2ExecutionContext( + open val monitorV2: MonitorV2, + open val periodStart: Instant, + open val periodEnd: Instant, + open val error: Exception? = null +) { + + constructor(monitorV2: MonitorV2, triggerV2: TriggerV2, monitorV2RunResult: MonitorV2RunResult<*>) : + this( + monitorV2, + monitorV2RunResult.periodStart, + monitorV2RunResult.periodEnd, + monitorV2RunResult.triggerResults[triggerV2.id]?.error + ) + + open fun asTemplateArg(): Map { + return mapOf( + "monitorV2" to monitorV2.asTemplateArg(), + "periodStart" to periodStart, + "periodEnd" to periodEnd, + "error" to error + ) + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorV2Action.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorV2Action.kt new file mode 100644 index 000000000..984fa0c96 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorV2Action.kt @@ -0,0 +1,170 @@ +package org.opensearch.alerting.transport + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.MonitorRunnerService +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.ExecuteMonitorV2Request +import org.opensearch.commons.alerting.action.ExecuteMonitorV2Response +import org.opensearch.commons.alerting.model.MonitorV2 +import 
org.opensearch.commons.alerting.model.PPLMonitor +import org.opensearch.commons.alerting.model.PPLMonitor.Companion.PPL_MONITOR_TYPE +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.util.AlertingException +import org.opensearch.core.action.ActionListener +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService +import org.opensearch.transport.client.Client +import java.time.Instant + +private val log = LogManager.getLogger(TransportExecuteMonitorV2Action::class.java) + +class TransportExecuteMonitorV2Action @Inject constructor( + private val transportService: TransportService, + private val client: Client, + private val clusterService: ClusterService, + private val runner: MonitorRunnerService, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry, + private val settings: Settings +) : HandledTransportAction( + AlertingActions.EXECUTE_MONITOR_V2_ACTION_NAME, transportService, actionFilters, ::ExecuteMonitorV2Request +) { + @Volatile private var indexTimeout = AlertingSettings.INDEX_TIMEOUT.get(settings) + + override fun doExecute( + task: Task, + execMonitorV2Request: ExecuteMonitorV2Request, + actionListener: ActionListener + ) { +// client.threadPool().threadContext.stashContext().use { // TODO: include this when security plugin enabled + /* first define a function that will be used later to run MonitorV2s */ + val executeMonitorV2 = fun (monitorV2: MonitorV2) { + runner.launch { + // get execution time interval + val (periodStart, periodEnd) = if (execMonitorV2Request.requestStart != null) { + Pair( + Instant.ofEpochMilli(execMonitorV2Request.requestStart!!.millis), + Instant.ofEpochMilli(execMonitorV2Request.requestEnd.millis) + ) + } else { + monitorV2.schedule.getPeriodEndingAt(Instant.ofEpochMilli(execMonitorV2Request.requestEnd.millis)) + } + + // call the MonitorRunnerService to execute the MonitorV2 + try { + val monitorV2Type = when (monitorV2) { + is PPLMonitor -> PPL_MONITOR_TYPE + else -> throw IllegalStateException("Unexpected MonitorV2 type: ${monitorV2.javaClass.name}") + } + log.info( + "Executing MonitorV2 from API - id: ${monitorV2.id}, type: $monitorV2Type, " + + "periodStart: $periodStart, periodEnd: $periodEnd, dryrun: ${execMonitorV2Request.dryrun}" + ) + val monitorV2RunResult = runner.runJobV2( + monitorV2, + periodStart, + periodEnd, + execMonitorV2Request.dryrun, + transportService + ) + withContext(Dispatchers.IO) { + actionListener.onResponse(ExecuteMonitorV2Response(monitorV2RunResult)) + } + } catch (e: Exception) { + log.error("Unexpected error running monitor", e) + withContext(Dispatchers.IO) { + actionListener.onFailure(AlertingException.wrap(e)) + } + } + } + } + + /* now execute the MonitorV2 */ + + // if both monitor_v2 id and object were passed in, ignore object and proceed with id + if (execMonitorV2Request.monitorId != null && execMonitorV2Request.monitorV2 != null) { + log.info( + "Both a monitor_v2 id and monitor_v2 object were passed in to ExecuteMonitorV2" + + "request. Proceeding to execute by monitor_v2 ID and ignoring monitor_v2 object." 
+ ) + } + + if (execMonitorV2Request.monitorId != null) { // execute with monitor ID case + // search the alerting-config index for the MonitorV2 with this ID + val getMonitorV2Request = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(execMonitorV2Request.monitorId) + client.get( + getMonitorV2Request, + object : ActionListener { + override fun onResponse(getMonitorV2Response: GetResponse) { + if (!getMonitorV2Response.isExists) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Can't find monitorV2 with id: ${getMonitorV2Response.id}", + RestStatus.NOT_FOUND + ) + ) + ) + return + } + if (!getMonitorV2Response.isSourceEmpty) { + XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + getMonitorV2Response.sourceAsBytesRef, XContentType.JSON + ).use { xcp -> + try { + val monitorV2 = ScheduledJob.parse( + xcp, + getMonitorV2Response.id, + getMonitorV2Response.version + ) as MonitorV2 + executeMonitorV2(monitorV2) + } catch (e: ClassCastException) { + actionListener.onFailure( + AlertingException.wrap( + IllegalArgumentException( + "Passed in Monitor ID is a legacy Alerting Monitor, please pass in an " + + "Alerting V2 Monitor" + ) + ) + ) + } catch (e: Exception) { + actionListener.onFailure(AlertingException.wrap(e)) + } + } + } + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + ) + } else { // execute with monitor object case + try { + val monitorV2 = execMonitorV2Request.monitorV2 as MonitorV2 + executeMonitorV2(monitorV2) + } catch (e: Exception) { + actionListener.onFailure(AlertingException.wrap(e)) + } + } +// } + } +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt b/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt index 582d13fbe..077bfc6ef 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt @@ -21,6 +21,7 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.InjectSecurity import org.opensearch.commons.authuser.User import org.opensearch.commons.notifications.NotificationsPluginInterface +import org.opensearch.commons.ppl.PPLPluginInterface import org.opensearch.core.action.ActionListener import org.opensearch.core.rest.RestStatus import org.opensearch.core.rest.RestStatus.BAD_GATEWAY @@ -170,6 +171,20 @@ suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPl }) } +/** + * Converts [PPLPluginInterface] methods that take a callback into a kotlin suspending function. + * + * @param block - a block of code that is passed an [ActionListener] that should be passed to the PPLPluginInterface API. + */ +suspend fun PPLPluginInterface.suspendUntil(block: PPLPluginInterface.(ActionListener) -> Unit): T = + suspendCoroutine { cont -> + block(object : ActionListener { + override fun onResponse(response: T) = cont.resume(response) + + override fun onFailure(e: Exception) = cont.resumeWithException(e) + }) + } + class InjectorContextElement( id: String, settings: Settings,