Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.opensearch.alerting.core.modelv2

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.ToXContent
import java.time.Instant

interface MonitorV2RunResult<out TriggerV2Result : TriggerV2RunResult> : Writeable, ToXContent {
val monitorName: String
val error: Exception?
val periodStart: Instant
val periodEnd: Instant
val triggerResults: Map<String, TriggerV2Result>

enum class MonitorV2RunResultType() {
PPL_MONITOR_RUN_RESULT;
}

companion object {
const val MONITOR_V2_NAME_FIELD = "monitor_v2_name"
const val ERROR_FIELD = "error"
const val PERIOD_START_FIELD = "period_start"
const val PERIOD_END_FIELD = "period_end"
const val TRIGGER_RESULTS_FIELD = "trigger_results"

fun readFrom(sin: StreamInput): MonitorV2RunResult<TriggerV2RunResult> {
val monitorRunResultType = sin.readEnum(MonitorV2RunResultType::class.java)
return when (monitorRunResultType) {
MonitorV2RunResultType.PPL_MONITOR_RUN_RESULT -> PPLMonitorRunResult(sin)
else -> throw IllegalStateException("Unexpected input [$monitorRunResultType] when reading MonitorV2RunResult")
}
}

fun writeTo(out: StreamOutput, monitorV2RunResult: MonitorV2RunResult<TriggerV2RunResult>) {
when (monitorV2RunResult) {
is PPLMonitorRunResult -> {
out.writeEnum(MonitorV2RunResultType.PPL_MONITOR_RUN_RESULT)
monitorV2RunResult.writeTo(out)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.opensearch.alerting.core.modelv2

import org.opensearch.alerting.core.modelv2.MonitorV2RunResult.Companion.ERROR_FIELD
import org.opensearch.alerting.core.modelv2.MonitorV2RunResult.Companion.MONITOR_V2_NAME_FIELD
import org.opensearch.alerting.core.modelv2.MonitorV2RunResult.Companion.PERIOD_END_FIELD
import org.opensearch.alerting.core.modelv2.MonitorV2RunResult.Companion.PERIOD_START_FIELD
import org.opensearch.alerting.core.modelv2.MonitorV2RunResult.Companion.TRIGGER_RESULTS_FIELD
import org.opensearch.alerting.core.util.nonOptionalTimeField
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException
import java.time.Instant

data class PPLMonitorRunResult(
override val monitorName: String,
override val error: Exception?,
override val periodStart: Instant,
override val periodEnd: Instant,
override val triggerResults: Map<String, PPLTriggerRunResult>,
val pplQueryResults: Map<String, Map<String, Any?>> // key: trigger id, value: query results
) : MonitorV2RunResult<PPLTriggerRunResult> {

@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
constructor(sin: StreamInput) : this(
sin.readString(), // monitorName
sin.readException(), // error
sin.readInstant(), // periodStart
sin.readInstant(), // periodEnd
sin.readMap() as Map<String, PPLTriggerRunResult>, // triggerResults
sin.readMap() as Map<String, Map<String, Any?>> // pplQueryResults
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
builder.field(MONITOR_V2_NAME_FIELD, monitorName)
builder.nonOptionalTimeField(PERIOD_START_FIELD, periodStart)
builder.nonOptionalTimeField(PERIOD_END_FIELD, periodEnd)
builder.field(ERROR_FIELD, error?.message)
builder.field(TRIGGER_RESULTS_FIELD, triggerResults)
builder.field(PPL_QUERY_RESULTS_FIELD, pplQueryResults)
builder.endObject()
return builder
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(monitorName)
out.writeException(error)
out.writeInstant(periodStart)
out.writeInstant(periodEnd)
out.writeMap(triggerResults)
out.writeMap(pplQueryResults)
}

companion object {
const val PPL_QUERY_RESULTS_FIELD = "ppl_query_results"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.opensearch.alerting.core.modelv2

import org.opensearch.alerting.core.modelv2.TriggerV2RunResult.Companion.ERROR_FIELD
import org.opensearch.alerting.core.modelv2.TriggerV2RunResult.Companion.NAME_FIELD
import org.opensearch.alerting.core.modelv2.TriggerV2RunResult.Companion.TRIGGERED_FIELD
import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.commons.alerting.model.TriggerRunResult
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

data class PPLTriggerRunResult(
override var triggerName: String,
override var triggered: Boolean,
override var error: Exception?,
) : TriggerV2RunResult {

@Throws(IOException::class)
@Suppress("UNCHECKED_CAST")
constructor(sin: StreamInput) : this(
triggerName = sin.readString(),
triggered = sin.readBoolean(),
error = sin.readException()
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
builder.field(NAME_FIELD, triggerName)
builder.field(TRIGGERED_FIELD, triggered)
builder.field(ERROR_FIELD, error?.message)
builder.endObject()
return builder
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(triggerName)
out.writeBoolean(triggered)
out.writeException(error)
}

companion object {
@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): TriggerRunResult {
return QueryLevelTriggerRunResult(sin)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.opensearch.alerting.core.modelv2

import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.ToXContent

interface TriggerV2RunResult : Writeable, ToXContent {

val triggerName: String
val triggered: Boolean
val error: Exception?

companion object {
const val NAME_FIELD = "name"
const val TRIGGERED_FIELD = "triggered"
const val ERROR_FIELD = "error"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.opensearch.alerting.core.ppl

import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.sql.plugin.transport.PPLQueryAction
import org.opensearch.sql.plugin.transport.TransportPPLQueryRequest
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse
import org.opensearch.transport.client.node.NodeClient

/**
* Various transport action plugin interfaces for the SQL/PPL plugin
*/
object PPLPluginInterface {
fun executeQuery(
client: NodeClient,
request: TransportPPLQueryRequest,
listener: ActionListener<TransportPPLQueryResponse>
) {
client.execute(
PPLQueryAction.INSTANCE,
request,
wrapActionListener(listener) { response -> recreateObject(response) { TransportPPLQueryResponse(it) } }
)
}

/**
* Wrap action listener on concrete response class by a new created one on ActionResponse.
* This is required because the response may be loaded by different classloader across plugins.
* The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate
* the response object.
*/
@Suppress("UNCHECKED_CAST")
private fun <Response : ActionResponse> wrapActionListener(
listener: ActionListener<Response>,
recreate: (Writeable) -> Response
): ActionListener<Response> {
return object : ActionListener<ActionResponse> {
override fun onResponse(response: ActionResponse) {
val recreated = recreate(response)
listener.onResponse(recreated)
}

override fun onFailure(exception: java.lang.Exception) {
listener.onFailure(exception)
}
} as ActionListener<Response>
}
}
Loading