Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ abstract class MonitorRunner {
}
}

protected suspend fun getConfigAndSendNotification(
suspend fun getConfigAndSendNotification(
action: Action,
monitorCtx: MonitorRunnerExecutionContext,
subject: String?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.remote.monitors.RemoteDocumentLevelMonitorRunner
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.script.TriggerV2ExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
Expand All @@ -59,10 +60,17 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.lifecycle.AbstractLifecycleComponent
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.ExecuteMonitorV2Request
import org.opensearch.commons.alerting.action.ExecuteMonitorV2Response
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorRunResult
import org.opensearch.commons.alerting.model.MonitorV2
import org.opensearch.commons.alerting.model.MonitorV2RunResult
import org.opensearch.commons.alerting.model.PPLMonitor
import org.opensearch.commons.alerting.model.PPLMonitor.Companion.PPL_MONITOR_TYPE
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.TriggerRunResult
import org.opensearch.commons.alerting.model.Workflow
Expand Down Expand Up @@ -316,11 +324,16 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
logger.error("Failed to move active alerts for monitor [${job.id}].", e)
}
}
} else if (job is MonitorV2) {
return
} else {
throw IllegalArgumentException("Invalid job type")
}
}

// TODO: if the deleted job was a MonitorV2, skip trying to move alerts.
// Currently the cluster throws a "failed to move alerts" exception whenever a MonitorV2 is deleted,
// even though Alerting V2's stateless alerts don't need to be moved.
override fun postDelete(jobId: String) {
launch {
try {
Expand Down Expand Up @@ -408,6 +421,44 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
}
}
}
// Scheduled execution path for MonitorV2 jobs: acquire the job lock, then delegate the actual
// run to the ExecuteMonitorV2 transport action, and finally release the lock.
is MonitorV2 -> {
// PPLMonitor is the only MonitorV2 subtype this branch accepts; anything else is rejected up front.
if (job !is PPLMonitor) {
throw IllegalStateException("Unexpected invalid MonitorV2 type: ${job.javaClass.name}")
}

launch {
// Kept outside the try so the finally block can see whatever lock (if any) was acquired.
var monitorLock: LockModel? = null
try {
// A null result from acquireLock ends this run via return@launch (the finally block below still executes).
monitorLock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${monitorLock!!.lockId} acquired")
logger.debug(
"PERF_DEBUG: executing $PPL_MONITOR_TYPE ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
val executeMonitorV2Request = ExecuteMonitorV2Request(
// NOTE(review): presumably the dry-run flag — confirm against ExecuteMonitorV2Request's constructor
false,
job.id, // only need to pass in MonitorV2 ID
null, // no need to pass in MonitorV2 object itself
// The period bounds are passed as epoch milliseconds wrapped in TimeValue.
TimeValue(periodStart.toEpochMilli()),
TimeValue(periodEnd.toEpochMilli())
)
// Suspends until the execute action responds; the response itself is not used here.
monitorCtx.client!!.suspendUntil<Client, ExecuteMonitorV2Response> {
monitorCtx.client!!.execute(
AlertingActions.EXECUTE_MONITOR_V2_ACTION_TYPE,
executeMonitorV2Request,
it
)
}
} catch (e: Exception) {
// Failures are logged and not rethrown.
// NOTE(review): catching Exception also catches CancellationException — confirm that is intended.
logger.error("MonitorV2 run failed for monitor with id ${job.id}", e)
} finally {
// NOTE(review): release is also invoked with a null lock when acquisition returned null or threw —
// confirm LockService.release tolerates a null lock.
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(monitorLock, it) }
logger.debug("lock ${monitorLock?.lockId} released")
}
}
}
else -> {
throw IllegalArgumentException("Invalid job type")
}
Expand All @@ -433,20 +484,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
): MonitorRunResult<*> {
// Update the scheduled job index mapping at the start of monitor execution, for the case where there has been
// an upgrade and the schema mapping has not been updated.
if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) {
IndexUtils.updateIndexMapping(
ScheduledJob.SCHEDULED_JOBS_INDEX,
ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(),
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(response: AcknowledgedResponse) {
}

override fun onFailure(t: Exception) {
logger.error("Failed to update config index schema", t)
}
}
)
}
updateAlertingConfigIndexSchema()

if (job is Workflow) {
logger.info("Executing scheduled workflow - id: ${job.id}, periodStart: $periodStart, periodEnd: $periodEnd, dryrun: $dryrun")
Expand Down Expand Up @@ -539,6 +577,44 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
}
}

/**
 * Entry point for executing a [MonitorV2].
 *
 * The JobRunner `runJob` override above calls the ExecuteMonitorV2 API, and the ExecuteMonitorV2
 * transport action in turn calls this function, which hands off to [PPLMonitorRunner], where the
 * core PPL Monitor execution logic resides.
 *
 * @param monitorV2 the monitor to execute; only [PPLMonitor] is accepted
 * @param periodStart start of the execution period
 * @param periodEnd end of the execution period
 * @param dryrun forwarded unchanged to the monitor runner
 * @param transportService forwarded unchanged to the monitor runner
 * @return the run result produced by the monitor runner
 * @throws IllegalStateException if [monitorV2] is not a [PPLMonitor]
 */
suspend fun runJobV2(
    monitorV2: MonitorV2,
    periodStart: Instant,
    periodEnd: Instant,
    dryrun: Boolean,
    transportService: TransportService,
): MonitorV2RunResult<*> {
    updateAlertingConfigIndexSchema()

    // Execution id format: <monitor id>_<current UTC date-time>_<random UUID>
    val startedAtUtc = LocalDateTime.now(ZoneOffset.UTC)
    val uniqueSuffix = UUID.randomUUID()
    val executionId = "${monitorV2.id}_${startedAtUtc}_$uniqueSuffix"

    val monitorV2Type = if (monitorV2 is PPLMonitor) {
        PPL_MONITOR_TYPE
    } else {
        throw IllegalStateException("Unexpected MonitorV2 type: ${monitorV2.javaClass.name}")
    }

    val startMessage = "Executing scheduled monitor - id: ${monitorV2.id}, type: $monitorV2Type, periodStart: $periodStart, " +
        "periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId"
    logger.info(startMessage)

    // Only PPL Monitors are supported initially, so PPLMonitorRunner is always used.
    // To introduce a new MonitorV2 type, create its MonitorRunner and branch here
    // to the matching runner based on the monitor's type.
    return PPLMonitorRunner.runMonitorV2(
        monitorV2,
        monitorCtx,
        periodStart,
        periodEnd,
        dryrun,
        executionId = executionId,
        transportService = transportService,
    )
}

// TODO: See if we can move below methods (or few of these) to a common utils
internal fun getRolesForMonitor(monitor: Monitor): List<String> {
/*
Expand Down Expand Up @@ -582,4 +658,27 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
.newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg()))
.execute()
}

/**
 * Compiles [template] as a template script and renders it to a string.
 *
 * The script is instantiated with the template's own params plus a `"ctx"` entry holding
 * [ctx] converted via `asTemplateArg()`; a param already named `"ctx"` is replaced by that entry.
 *
 * @param template the script to compile and execute
 * @param ctx the trigger execution context exposed to the template as `ctx`
 * @return the output of executing the compiled template
 * @throws NullPointerException if the monitor context has no script service
 */
internal fun compileTemplateV2(template: Script, ctx: TriggerV2ExecutionContext): String {
    val templateFactory = monitorCtx.scriptService!!.compile(template, TemplateScript.CONTEXT)
    val templateParams = template.params + mapOf("ctx" to ctx.asTemplateArg())
    val renderer = templateFactory.newInstance(templateParams)
    return renderer.execute()
}

/**
 * Requests a mapping update for the scheduled jobs index.
 *
 * Does nothing when [IndexUtils.scheduledJobIndexUpdated] is already set, or when the monitor
 * context has no cluster service or client. A failed update is only logged.
 */
private fun updateAlertingConfigIndexSchema() {
    if (IndexUtils.scheduledJobIndexUpdated) return
    if (monitorCtx.clusterService == null || monitorCtx.client == null) return

    val schemaUpdateListener = object : ActionListener<AcknowledgedResponse> {
        override fun onResponse(response: AcknowledgedResponse) {
            // nothing to do on success
        }

        override fun onFailure(t: Exception) {
            logger.error("Failed to update config index schema", t)
        }
    }

    IndexUtils.updateIndexMapping(
        ScheduledJob.SCHEDULED_JOBS_INDEX,
        ScheduledJobIndices.scheduledJobMappings(),
        monitorCtx.clusterService!!.state(),
        monitorCtx.client!!.admin().indices(),
        schemaUpdateListener,
    )
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.opensearch.alerting

import org.opensearch.commons.alerting.model.MonitorV2
import org.opensearch.commons.alerting.model.MonitorV2RunResult
import org.opensearch.transport.TransportService
import java.time.Instant

/**
 * Contract for a runner that executes a [MonitorV2].
 */
interface MonitorV2Runner {
/**
 * Executes [monitorV2] and returns the result of the run.
 *
 * @param monitorV2 the monitor to execute
 * @param monitorCtx the shared runner execution context
 * @param periodStart start of the execution period
 * @param periodEnd end of the execution period
 * @param dryRun NOTE(review): presumably suppresses persistent side effects of the run — confirm against implementations
 * @param executionId identifier for this particular execution
 * @param transportService transport service made available to the runner
 * @return the run result for this monitor execution
 */
suspend fun runMonitorV2(
monitorV2: MonitorV2,
monitorCtx: MonitorRunnerExecutionContext, // MonitorV2 reads from same context as Monitor
periodStart: Instant,
periodEnd: Instant,
dryRun: Boolean,
executionId: String,
transportService: TransportService
): MonitorV2RunResult<*>
}
Loading
Loading