Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
- Adds capability to automatically switch to old access-control if model-group is excluded from protected resources setting ([#1569](https://github.com/opensearch-project/anomaly-detection/pull/1569))
- Adding suggest and validate transport actions to node client ([#1605](https://github.com/opensearch-project/anomaly-detection/pull/1605))
- Adding auto create as an optional field on detectors ([#1602](https://github.com/opensearch-project/anomaly-detection/pull/1602))
- Adding create and start to AD node client ([#1611](https://github.com/opensearch-project/anomaly-detection/pull/1611))

### Bug Fixes

Expand Down
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ dependencies {
testImplementation 'org.reflections:reflections:0.10.2'

testImplementation "org.opensearch.test:framework:${opensearch_version}"

zipArchive("org.opensearch.plugin:opensearch-ml-plugin:${opensearch_build}")

}

apply plugin: 'java'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.ad.transport.GetAnomalyDetectorResponse;
import org.opensearch.ad.transport.IndexAnomalyDetectorRequest;
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.core.action.ActionListener;
import org.opensearch.timeseries.transport.GetConfigRequest;
import org.opensearch.timeseries.transport.JobRequest;
import org.opensearch.timeseries.transport.JobResponse;
import org.opensearch.timeseries.transport.SuggestConfigParamRequest;
import org.opensearch.timeseries.transport.SuggestConfigParamResponse;
import org.opensearch.timeseries.transport.ValidateConfigRequest;
Expand Down Expand Up @@ -110,4 +114,40 @@ default ActionFuture<SuggestConfigParamResponse> suggestAnomalyDetector(SuggestC
* @param listener a listener to be notified of the result
*/
void suggestAnomalyDetector(SuggestConfigParamRequest suggestRequest, ActionListener<SuggestConfigParamResponse> listener);

/**
* Create anomaly detector - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#create-detector
* @param createRequest request to create the detector
* @return ActionFuture of IndexAnomalyDetectorResponse
*/
default ActionFuture<IndexAnomalyDetectorResponse> createAnomalyDetector(IndexAnomalyDetectorRequest createRequest) {
PlainActionFuture<IndexAnomalyDetectorResponse> actionFuture = PlainActionFuture.newFuture();
createAnomalyDetector(createRequest, actionFuture);
return actionFuture;
}

/**
* Create anomaly detector - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#create-detector
* @param createRequest request to create the detector
* @param listener a listener to be notified of the result
*/
void createAnomalyDetector(IndexAnomalyDetectorRequest createRequest, ActionListener<IndexAnomalyDetectorResponse> listener);

/**
* Start anomaly detector - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#start-detector
* @param startRequest request to start the detector
* @return ActionFuture of JobResponse
*/
default ActionFuture<JobResponse> startAnomalyDetector(JobRequest startRequest) {
PlainActionFuture<JobResponse> actionFuture = PlainActionFuture.newFuture();
startAnomalyDetector(startRequest, actionFuture);
return actionFuture;
}

/**
* Start anomaly detector - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#start-detector
* @param startRequest request to start the detector
* @param listener a listener to be notified of the result
*/
void startAnomalyDetector(JobRequest startRequest, ActionListener<JobResponse> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.transport.AnomalyDetectorJobAction;
import org.opensearch.ad.transport.GetAnomalyDetectorAction;
import org.opensearch.ad.transport.GetAnomalyDetectorResponse;
import org.opensearch.ad.transport.IndexAnomalyDetectorAction;
import org.opensearch.ad.transport.IndexAnomalyDetectorRequest;
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
import org.opensearch.ad.transport.SearchAnomalyDetectorAction;
import org.opensearch.ad.transport.SearchAnomalyResultAction;
import org.opensearch.ad.transport.SuggestAnomalyDetectorParamAction;
Expand All @@ -19,6 +23,8 @@
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.timeseries.transport.GetConfigRequest;
import org.opensearch.timeseries.transport.JobRequest;
import org.opensearch.timeseries.transport.JobResponse;
import org.opensearch.timeseries.transport.SuggestConfigParamRequest;
import org.opensearch.timeseries.transport.SuggestConfigParamResponse;
import org.opensearch.timeseries.transport.ValidateConfigRequest;
Expand Down Expand Up @@ -63,6 +69,16 @@ public void suggestAnomalyDetector(SuggestConfigParamRequest suggestRequest, Act
this.client.execute(SuggestAnomalyDetectorParamAction.INSTANCE, suggestRequest, suggestConfigResponseActionListener(listener));
}

@Override
public void createAnomalyDetector(IndexAnomalyDetectorRequest createRequest, ActionListener<IndexAnomalyDetectorResponse> listener) {
this.client.execute(IndexAnomalyDetectorAction.INSTANCE, createRequest, indexAnomalyDetectorResponseActionListener(listener));
}

@Override
public void startAnomalyDetector(JobRequest startRequest, ActionListener<JobResponse> listener) {
this.client.execute(AnomalyDetectorJobAction.INSTANCE, startRequest, jobResponseActionListener(listener));
}

// We need to wrap AD-specific response type listeners around an internal listener, and re-generate the response from a generic
// ActionResponse. This is needed to prevent classloader issues and ClassCastExceptions when executed by other plugins.
// Additionally, we need to inject the configured NamedWriteableRegistry so NamedWriteables (present in sub-fields of
Expand Down Expand Up @@ -107,6 +123,30 @@ private ActionListener<SuggestConfigParamResponse> suggestConfigResponseActionLi
return actionListener;
}

private ActionListener<IndexAnomalyDetectorResponse> indexAnomalyDetectorResponseActionListener(
ActionListener<IndexAnomalyDetectorResponse> listener
) {
ActionListener<IndexAnomalyDetectorResponse> internalListener = ActionListener.wrap(indexAnomalyDetectorResponse -> {
listener.onResponse(indexAnomalyDetectorResponse);
}, listener::onFailure);
ActionListener<IndexAnomalyDetectorResponse> actionListener = wrapActionListener(internalListener, actionResponse -> {
IndexAnomalyDetectorResponse response = IndexAnomalyDetectorResponse
.fromActionResponse(actionResponse, this.namedWriteableRegistry);
return response;
});
return actionListener;
}

private ActionListener<JobResponse> jobResponseActionListener(ActionListener<JobResponse> listener) {
ActionListener<JobResponse> internalListener = ActionListener
.wrap(jobResponse -> { listener.onResponse(jobResponse); }, listener::onFailure);
ActionListener<JobResponse> actionListener = wrapActionListener(internalListener, actionResponse -> {
JobResponse response = JobResponse.fromActionResponse(actionResponse, this.namedWriteableRegistry);
return response;
});
return actionListener;
}

private <T extends ActionResponse> ActionListener<T> wrapActionListener(
final ActionListener<T> listener,
final Function<ActionResponse, T> recreate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.timeseries.transport.BaseJobTransportAction;
import org.opensearch.transport.TransportService;
Expand All @@ -47,7 +48,8 @@ public AnomalyDetectorJobTransportAction(
ClusterService clusterService,
Settings settings,
NamedXContentRegistry xContentRegistry,
ADIndexJobActionHandler adIndexJobActionHandler
ADIndexJobActionHandler adIndexJobActionHandler,
NamedWriteableRegistry namedWriteableRegistry
) {
super(
transportService,
Expand All @@ -64,7 +66,8 @@ public AnomalyDetectorJobTransportAction(
AnomalyDetector.class,
adIndexJobActionHandler,
Clock.systemUTC(), // inject cannot find clock due to OS limitation
AnomalyDetector.class
AnomalyDetector.class,
namedWriteableRegistry
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.rest.RestRequest;

public class IndexAnomalyDetectorRequest extends ActionRequest implements DocRequest {
Expand Down Expand Up @@ -50,10 +51,10 @@ public IndexAnomalyDetectorRequest(StreamInput in) throws IOException {
detector = new AnomalyDetector(in);
method = in.readEnum(RestRequest.Method.class);
requestTimeout = in.readTimeValue();
maxSingleEntityAnomalyDetectors = in.readInt();
maxMultiEntityAnomalyDetectors = in.readInt();
maxAnomalyFeatures = in.readInt();
maxCategoricalFields = in.readInt();
maxSingleEntityAnomalyDetectors = in.readOptionalInt();
maxMultiEntityAnomalyDetectors = in.readOptionalInt();
maxAnomalyFeatures = in.readOptionalInt();
maxCategoricalFields = in.readOptionalInt();
}

public IndexAnomalyDetectorRequest(
Expand Down Expand Up @@ -83,6 +84,22 @@ public IndexAnomalyDetectorRequest(
this.maxCategoricalFields = maxCategoricalFields;
}

public IndexAnomalyDetectorRequest(String detectorID, AnomalyDetector detector, RestRequest.Method method) {
this(
detectorID,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
WriteRequest.RefreshPolicy.IMMEDIATE,
detector,
method,
TimeValue.timeValueSeconds(60),
null,
null,
null,
null
);
}

public String getDetectorID() {
return detectorID;
}
Expand Down Expand Up @@ -137,10 +154,10 @@ public void writeTo(StreamOutput out) throws IOException {
detector.writeTo(out);
out.writeEnum(method);
out.writeTimeValue(requestTimeout);
out.writeInt(maxSingleEntityAnomalyDetectors);
out.writeInt(maxMultiEntityAnomalyDetectors);
out.writeInt(maxAnomalyFeatures);
out.writeInt(maxCategoricalFields);
out.writeOptionalInt(maxSingleEntityAnomalyDetectors);
out.writeOptionalInt(maxMultiEntityAnomalyDetectors);
out.writeOptionalInt(maxAnomalyFeatures);
out.writeOptionalInt(maxCategoricalFields);
}

@Override
Expand All @@ -162,4 +179,32 @@ public String index() {
public String id() {
return detectorID;
}

public static IndexAnomalyDetectorRequest fromActionRequest(
final ActionRequest actionRequest,
org.opensearch.core.common.io.stream.NamedWriteableRegistry namedWriteableRegistry
) {
if (actionRequest instanceof IndexAnomalyDetectorRequest) {
return (IndexAnomalyDetectorRequest) actionRequest;
}

try (
java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
org.opensearch.core.common.io.stream.OutputStreamStreamOutput osso =
new org.opensearch.core.common.io.stream.OutputStreamStreamOutput(baos)
) {
actionRequest.writeTo(osso);
try (
org.opensearch.core.common.io.stream.StreamInput input = new org.opensearch.core.common.io.stream.InputStreamStreamInput(
new java.io.ByteArrayInputStream(baos.toByteArray())
);
org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput namedInput =
new org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput(input, namedWriteableRegistry)
) {
return new IndexAnomalyDetectorRequest(namedInput);
}
} catch (java.io.IOException e) {
throw new IllegalArgumentException("failed to parse ActionRequest into IndexAnomalyDetectorRequest", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,17 @@

package org.opensearch.ad.transport;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.rest.RestStatus;
Expand Down Expand Up @@ -81,4 +88,25 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(RestHandlerUtils._PRIMARY_TERM, primaryTerm)
.endObject();
}

public static IndexAnomalyDetectorResponse fromActionResponse(
ActionResponse actionResponse,
NamedWriteableRegistry namedWriteableRegistry
) {
if (actionResponse instanceof IndexAnomalyDetectorResponse) {
return (IndexAnomalyDetectorResponse) actionResponse;
}

try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) {
actionResponse.writeTo(osso);
try (
StreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()));
NamedWriteableAwareStreamInput namedWriteableAwareInput = new NamedWriteableAwareStreamInput(input, namedWriteableRegistry)
) {
return new IndexAnomalyDetectorResponse(namedWriteableAwareInput);
}
} catch (IOException e) {
throw new UncheckedIOException("failed to parse ActionResponse into IndexAnomalyDetectorResponse", e);
}
}
}
Loading
Loading