Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Semantic Field] Support the sparse two phase processor for the semantic field.
- [Stats] Add stats for agentic query and agentic query translator processor.
- [Agentic Search] Adds validations and logging for agentic query
- [Agentic Search] Convert agentic query translator processor to system-generated processor

### Bug Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
import org.opensearch.search.pipeline.SystemGeneratedProcessor;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
Expand Down Expand Up @@ -326,7 +327,15 @@ public Map<String, org.opensearch.search.pipeline.Processor.Factory<SearchReques
NeuralQueryEnricherProcessor.TYPE,
new NeuralQueryEnricherProcessor.Factory(),
NeuralSparseTwoPhaseProcessor.TYPE,
new NeuralSparseTwoPhaseProcessor.Factory(),
new NeuralSparseTwoPhaseProcessor.Factory()
);
}

@Override
public Map<String, SystemGeneratedProcessor.SystemGeneratedFactory<SearchRequestProcessor>> getSystemGeneratedRequestProcessors(
Parameters parameters
) {
return Map.of(
AgenticQueryTranslatorProcessor.TYPE,
new AgenticQueryTranslatorProcessor.Factory(clientAccessor, xContentRegistry, settingsAccessor)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package org.opensearch.neuralsearch.processor;

import com.google.gson.Gson;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.common.xcontent.XContentType;
Expand All @@ -20,7 +21,8 @@
import org.opensearch.neuralsearch.stats.events.EventStatsManager;
import org.opensearch.neuralsearch.util.NeuralSearchClusterUtil;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.SystemGeneratedProcessor;
import org.opensearch.search.pipeline.ProcessorGenerationContext;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.PipelineProcessingContext;
Expand All @@ -31,29 +33,28 @@
import java.util.Locale;
import java.util.Map;

import static org.opensearch.ingest.ConfigurationUtils.readStringProperty;

@Log4j2
public class AgenticQueryTranslatorProcessor extends AbstractProcessor implements SearchRequestProcessor {
public class AgenticQueryTranslatorProcessor implements SearchRequestProcessor, SystemGeneratedProcessor {

public static final String TYPE = "agentic_query_translator";
private static final int MAX_AGENT_RESPONSE_SIZE = 10_000;
private final MLCommonsClientAccessor mlClient;
private final String agentId;
private final NamedXContentRegistry xContentRegistry;
private static final Gson gson = new Gson();;
private final String tag;
private final boolean ignoreFailure;
private static final String DESCRIPTION =
"This is a system generated search request processor which will be executed before agentic search request to execute an agent";
private static final Gson gson = new Gson();

AgenticQueryTranslatorProcessor(
String tag,
String description,
boolean ignoreFailure,
MLCommonsClientAccessor mlClient,
String agentId,
NamedXContentRegistry xContentRegistry
) {
super(tag, description, ignoreFailure);
this.tag = tag;
this.ignoreFailure = ignoreFailure;
this.mlClient = mlClient;
this.agentId = agentId;
this.xContentRegistry = xContentRegistry;
}

Expand All @@ -80,12 +81,8 @@ public void processRequestAsync(

// Validate that agentic query is used alone without other search features
if (hasOtherSearchFeatures(sourceBuilder)) {
String errorMessage = String.format(
Locale.ROOT,
"Agentic search blocked - Invalid usage with other search features - Agent ID: [%s], Query: [%s]",
agentId,
agenticQuery.getQueryText()
);
String errorMessage =
"Agentic search blocked - Invalid usage with other search features like aggregation, sort, filters, collapse";
requestListener.onFailure(new IllegalArgumentException(errorMessage));
return;
}
Expand All @@ -109,6 +106,7 @@ private void executeAgentAsync(
ActionListener<SearchRequest> requestListener
) {
Map<String, String> parameters = new HashMap<>();
String agentId = agenticQuery.getAgentId();
parameters.put("query_text", agenticQuery.getQueryText());

// Get index mapping from the search request
Expand All @@ -131,22 +129,14 @@ private void executeAgentAsync(

// Validate response size to prevent memory exhaustion
if (agentResponse == null) {
String errorMessage = String.format(
Locale.ROOT,
"Agentic search failed - Null response from agent - Agent ID: [%s], Query: [%s]",
agentId,
agenticQuery.getQueryText()
);
throw new IllegalArgumentException(errorMessage);
throw new IllegalArgumentException("Agentic search failed - Null response from agent");
}

if (agentResponse.length() > MAX_AGENT_RESPONSE_SIZE) {
String errorMessage = String.format(
Locale.ROOT,
"Agentic search blocked - Response size exceeded limit - Agent ID: [%s], Size: [%d], Query: [%s]. Maximum allowed size is %d characters.",
agentId,
"Agentic search blocked - Response size exceeded limit. Size: [%d], Maximum allowed size is %d characters.",
agentResponse.length(),
agenticQuery.getQueryText(),
MAX_AGENT_RESPONSE_SIZE
);
throw new IllegalArgumentException(errorMessage);
Expand All @@ -161,22 +151,11 @@ private void executeAgentAsync(

requestListener.onResponse(request);
} catch (IOException e) {
String errorMessage = String.format(
Locale.ROOT,
"Agentic search failed - Parse error - Agent ID: [%s], Error: [%s]",
agentId,
e.getMessage()
);
String errorMessage = String.format(Locale.ROOT, "Agentic search failed - Parse error: [%s]", e.getMessage());
requestListener.onFailure(new IOException(errorMessage, e));
}
}, e -> {
String errorMessage = String.format(
Locale.ROOT,
"Agentic search failed - Agent execution error - Agent ID: [%s], Query: [%s], Error: [%s]",
agentId,
agenticQuery.getQueryText(),
e.getMessage()
);
String errorMessage = String.format(Locale.ROOT, "Agentic search failed - Agent execution error: [%s]", e.getMessage());
requestListener.onFailure(new RuntimeException(errorMessage, e));
}));
}
Expand All @@ -191,19 +170,44 @@ public String getType() {
return TYPE;
}

public static class Factory implements Processor.Factory<SearchRequestProcessor> {
/**
 * Returns the tag that was supplied to this processor when it was constructed.
 *
 * @return the processor tag
 */
@Override
public String getTag() {
    return tag;
}

/**
 * Returns the fixed description shared by every instance of this system-generated processor.
 *
 * @return the static description text
 */
@Override
public String getDescription() {
return DESCRIPTION;
}

/**
 * Returns the ignore-failure flag that was supplied to this processor when it was constructed.
 *
 * @return the value of the ignore-failure flag
 */
@Override
public boolean isIgnoreFailure() {
    return ignoreFailure;
}

/**
 * Reports the pipeline stage at which this processor runs.
 *
 * @return {@code ExecutionStage.PRE_USER_DEFINED}
 */
@Override
public ExecutionStage getExecutionStage() {
// Execute before user-defined processors as agentic query would be replaced by the new DSL
return ExecutionStage.PRE_USER_DEFINED;
}

@AllArgsConstructor
public static class Factory implements SystemGeneratedProcessor.SystemGeneratedFactory<SearchRequestProcessor> {
private final MLCommonsClientAccessor mlClient;
private final NamedXContentRegistry xContentRegistry;
private final NeuralSearchSettingsAccessor settingsAccessor;

public Factory(
MLCommonsClientAccessor mlClient,
NamedXContentRegistry xContentRegistry,
NeuralSearchSettingsAccessor settingsAccessor
) {
this.mlClient = mlClient;
this.xContentRegistry = xContentRegistry;
this.settingsAccessor = settingsAccessor;
/**
 * Decides whether the agentic query translator processor should be generated for a search request.
 *
 * @param context generation context that supplies the search request being evaluated
 * @return {@code true} only when the request has a source whose top-level query is an
 *         {@code AgenticSearchQueryBuilder}; {@code false} when the request, its source,
 *         or its query is absent, or when the query is of any other type
 */
@Override
public boolean shouldGenerate(ProcessorGenerationContext context) {
    SearchRequest searchRequest = context.searchRequest();
    if (searchRequest == null || searchRequest.source() == null) {
        return false;
    }

    // The source may carry no query at all; read it once and null-guard the
    // debug log so that calling getClass() cannot throw a NullPointerException.
    Object query = searchRequest.source().query();
    boolean hasAgenticQuery = query instanceof AgenticSearchQueryBuilder;
    log.debug("Query type: {}, hasAgenticQuery: {}", query == null ? null : query.getClass().getSimpleName(), hasAgenticQuery);

    return hasAgenticQuery;
}

@Override
Expand All @@ -221,11 +225,7 @@ public AgenticQueryTranslatorProcessor create(
"Agentic search is currently disabled. Enable it using the 'plugins.neural_search.agentic_search_enabled' setting."
);
}
String agentId = readStringProperty(TYPE, tag, config, "agent_id");
if (agentId == null || agentId.trim().isEmpty()) {
throw new IllegalArgumentException("agent_id is required for agentic_query_translator processor");
}
return new AgenticQueryTranslatorProcessor(tag, description, ignoreFailure, mlClient, agentId, xContentRegistry);
return new AgenticQueryTranslatorProcessor(tag, ignoreFailure, mlClient, xContentRegistry);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public final class AgenticSearchQueryBuilder extends AbstractQueryBuilder<Agenti
public static final String NAME = "agentic";
public static final ParseField QUERY_TEXT_FIELD = new ParseField("query_text");
public static final ParseField QUERY_FIELDS = new ParseField("query_fields");
public static final ParseField AGENT_ID_FIELD = new ParseField("agent_id");

// Regex patterns for sanitizing query text
private static final String SYSTEM_INSTRUCTION_PATTERN = "(?i)\\b(system|instruction|prompt)\\s*:";
Expand All @@ -57,6 +58,7 @@ public final class AgenticSearchQueryBuilder extends AbstractQueryBuilder<Agenti
private static final int MAX_QUERY_LENGTH = 1000;
public String queryText;
public List<String> queryFields;
public String agentId;

// setting accessor to retrieve agentic search feature flag
private static NeuralSearchSettingsAccessor SETTINGS_ACCESSOR;
Expand All @@ -69,6 +71,7 @@ public AgenticSearchQueryBuilder(StreamInput in) throws IOException {
super(in);
this.queryText = in.readString();
this.queryFields = in.readOptionalStringList();
this.agentId = in.readOptionalString();
}

public String getQueryText() {
Expand All @@ -79,6 +82,10 @@ public List<String> getQueryFields() {
return queryFields;
}

/**
 * Returns the agent id carried by this query.
 *
 * @return the agent id; may be {@code null}, since the field is read from the stream as an optional string
 */
public String getAgentId() {
    return this.agentId;
}

@Override
protected void doWriteTo(StreamOutput out) throws IOException {
// feature flag check
Expand All @@ -89,6 +96,7 @@ protected void doWriteTo(StreamOutput out) throws IOException {
}
out.writeString(this.queryText);
out.writeOptionalStringCollection(this.queryFields);
out.writeOptionalString(this.agentId);
}

@Override
Expand All @@ -106,6 +114,9 @@ protected void doXContent(XContentBuilder xContentBuilder, Params params) throws
if (Objects.nonNull(queryFields) && !queryFields.isEmpty()) {
xContentBuilder.field(QUERY_FIELDS.getPreferredName(), queryFields);
}
if (Objects.nonNull(agentId)) {
xContentBuilder.field(AGENT_ID_FIELD.getPreferredName(), agentId);
}
xContentBuilder.endObject();
}

Expand All @@ -115,6 +126,7 @@ protected void doXContent(XContentBuilder xContentBuilder, Params params) throws
* {
* "agentic": {
* "query_text": "string",
* "agent_id": "string",
* "query_fields": ["string", "string"..]
* }
* }
Expand All @@ -133,6 +145,8 @@ public static AgenticSearchQueryBuilder fromXContent(XContentParser parser) thro
} else if (token.isValue()) {
if (QUERY_TEXT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
agenticSearchQueryBuilder.queryText = parser.text();
} else if (AGENT_ID_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
agenticSearchQueryBuilder.agentId = parser.text();
} else {
throw new ParsingException(parser.getTokenLocation(), "Unknown field [" + currentFieldName + "]");
}
Expand All @@ -157,6 +171,9 @@ public static AgenticSearchQueryBuilder fromXContent(XContentParser parser) thro
throw new ParsingException(parser.getTokenLocation(), "[" + QUERY_TEXT_FIELD.getPreferredName() + "] is required");
}

if (agenticSearchQueryBuilder.agentId == null || agenticSearchQueryBuilder.agentId.trim().isEmpty()) {
throw new ParsingException(parser.getTokenLocation(), "[" + AGENT_ID_FIELD.getPreferredName() + "] is required");
}
// Sanitize query text to prevent prompt injection
agenticSearchQueryBuilder.queryText = sanitizeQueryText(agenticSearchQueryBuilder.queryText);

Expand All @@ -171,8 +188,21 @@ protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws

@Override
protected Query doToQuery(QueryShardContext context) throws IOException {
// This should not be reached if the system-generated processor is working correctly
if (agentId == null || agentId.trim().isEmpty()) {
throw new IllegalStateException("Agentic search query requires an agent_id. Provide agent_id in the query.");
}
// Check if the system-generated processor is enabled
if (!SETTINGS_ACCESSOR.isSystemGenerateProcessorEnabled("agentic_query_translator")) {
throw new IllegalStateException(
"Agentic search requires the agentic_query_translator system processor to be enabled. "
+ "Add 'agentic_query_translator' to the 'cluster.search.enabled_system_generated_factories' setting."
);
}

throw new IllegalStateException(
"Agentic search query must be used as top-level query, not nested inside other queries. Should be used with agentic_query_translator search processor"
"Agentic search query must be processed by the agentic_query_translator system processor before query execution. "
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can add a validation in the AgenticSearchQueryBuilder to ensure the processor is enabled in the cluster setting cluster.search.enabled_system_generated_factories.

Copy link
Member Author

@owaiskazi19 owaiskazi19 Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a new generic method to NeuralSearchSettingsAccessor that all system-generated processors can use to verify whether they are enabled in the factories setting.

+ "Ensure the neural search plugin is properly installed and the agentic search feature is enabled."
);
}

Expand All @@ -183,12 +213,13 @@ protected boolean doEquals(AgenticSearchQueryBuilder obj) {
EqualsBuilder equalsBuilder = new EqualsBuilder();
equalsBuilder.append(queryText, obj.queryText);
equalsBuilder.append(queryFields, obj.queryFields);
equalsBuilder.append(agentId, obj.agentId);
return equalsBuilder.isEquals();
}

@Override
protected int doHashCode() {
return new HashCodeBuilder().append(queryText).append(queryFields).toHashCode();
return new HashCodeBuilder().append(queryText).append(queryFields).append(agentId).toHashCode();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,17 @@ public class NeuralSearchSettingsAccessor {
@Getter
private volatile boolean isAgenticSearchEnabled;

private static final String SYSTEM_GENERATED_PIPELINE_SETTINGS = "cluster.search.enabled_system_generated_factories";

private final ClusterService clusterService;

/**
* Constructor, registers callbacks to update settings
* @param clusterService
* @param settings
*/
public NeuralSearchSettingsAccessor(ClusterService clusterService, Settings settings) {
this.clusterService = clusterService;
isStatsEnabled = NeuralSearchSettings.NEURAL_STATS_ENABLED.get(settings);
isAgenticSearchEnabled = NeuralSearchSettings.AGENTIC_SEARCH_ENABLED.get(settings);
registerSettingsCallbacks(clusterService, settings);
Expand Down Expand Up @@ -59,4 +64,13 @@ private void registerSettingsCallbacks(ClusterService clusterService, Settings s
ClusterTrainingExecutor.updateThreadPoolSize(maxThreadQty, setting);
});
}

/**
* Checks if the given system-generated processor is enabled
* @param processor name of the system-generated processor to look up
* @return true if the processor is enabled in cluster settings
*/
public boolean isSystemGenerateProcessorEnabled(String processor) {
String enabledFactories = String.valueOf(clusterService.getClusterSettings().get(SYSTEM_GENERATED_PIPELINE_SETTINGS));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ENABLED_SYSTEM_GENERATED_FACTORIES_SETTING is a list of strings, so I think it's better to read the value through ENABLED_SYSTEM_GENERATED_FACTORIES_SETTING.get(clusterService.getSettings()), which returns a list of strings.

We also need to check whether it contains "*", which enables all the system processors.

return enabledFactories != null && enabledFactories.contains(processor);
}
}
Loading
Loading