Skip to content

Commit 4ee0fd1

Browse files
Add Hybrid Cardinality collector to prioritize Ordinals Collector
The current cardinality aggregator logic selects DirectCollector over OrdinalsCollector when the relative memory overhead of OrdinalsCollector (compared to DirectCollector) is higher. Because of this relative memory-consumption logic, DirectCollector is selected for high-cardinality aggregation queries. DirectCollector is slower than OrdinalsCollector. This default selection leads to higher search latency even when the OpenSearch process has memory available to use the ordinals collector for faster query performance. There is no way to determine the memory requirement for a nested aggregation because buckets are created dynamically as we traverse all the matching document IDs. To overcome this limitation, this change creates a hybrid collector that first uses the Ordinals Collector and switches to the Direct Collector if memory usage of the Ordinals Collector increases beyond a certain threshold. When the Hybrid collector switches from the Ordinals Collector to the Direct Collector, it reuses the already-computed aggregation data from the Ordinals Collector so that we do not have to rebuild the aggregation result using the Direct Collector. Signed-off-by: Anand Pravinbhai Patel <[email protected]>
1 parent 0c3a313 commit 4ee0fd1

File tree

7 files changed

+277
-11
lines changed

7 files changed

+277
-11
lines changed

rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,8 @@ setup:
268268
---
269269
"profiler string":
270270
- skip:
271-
version: " - 7.99.99"
272-
reason: new info added in 8.0.0 to be backported to 7.10.0
271+
version: " - 3.2.99"
272+
reason: hybrid collector added in 3.3.0
273273
- do:
274274
search:
275275
body:
@@ -287,6 +287,7 @@ setup:
287287
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
288288
- match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 }
289289
- match: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 }
290-
- gt: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
290+
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
291291
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_overhead_too_high: 0 }
292292
- match: { profile.shards.0.aggregations.0.debug.string_hashing_collectors_used: 0 }
293+
- gt: { profile.shards.0.aggregations.0.debug.hybrid_collectors_used: 0 }

rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/170_cardinality_metric_unsigned.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,9 @@ setup:
265265

266266
---
267267
"profiler string":
268+
- skip:
269+
version: " - 3.2.99"
270+
reason: hybrid collector added in 3.3.0
268271
- do:
269272
search:
270273
body:
@@ -282,6 +285,7 @@ setup:
282285
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
283286
- match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 }
284287
- match: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 }
285-
- gt: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
288+
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
286289
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_overhead_too_high: 0 }
287290
- match: { profile.shards.0.aggregations.0.debug.string_hashing_collectors_used: 0 }
291+
- gt: { profile.shards.0.aggregations.0.debug.hybrid_collectors_used: 0 }

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@
159159
import org.opensearch.script.ScriptService;
160160
import org.opensearch.search.SearchService;
161161
import org.opensearch.search.aggregations.MultiBucketConsumerService;
162+
import org.opensearch.search.aggregations.metrics.CardinalityAggregator;
162163
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
163164
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
164165
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
@@ -574,6 +575,8 @@ public void apply(Settings value, Settings current, Settings previous) {
574575
SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING,
575576
SearchService.SEARCH_MAX_QUERY_STRING_LENGTH,
576577
SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD,
578+
CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED,
579+
CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD,
577580
SearchService.KEYWORD_INDEX_OR_DOC_VALUES_ENABLED,
578581
CreatePitController.PIT_INIT_KEEP_ALIVE,
579582
Node.WRITE_PORTS_FILE_SETTING,

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.opensearch.common.settings.Settings;
5656
import org.opensearch.common.unit.TimeValue;
5757
import org.opensearch.common.util.BigArrays;
58+
import org.opensearch.core.common.unit.ByteSizeValue;
5859
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
5960
import org.opensearch.index.IndexService;
6061
import org.opensearch.index.IndexSettings;
@@ -75,6 +76,7 @@
7576
import org.opensearch.search.aggregations.BucketCollectorProcessor;
7677
import org.opensearch.search.aggregations.InternalAggregation;
7778
import org.opensearch.search.aggregations.SearchContextAggregations;
79+
import org.opensearch.search.aggregations.metrics.CardinalityAggregator;
7880
import org.opensearch.search.builder.SearchSourceBuilder;
7981
import org.opensearch.search.collapse.CollapseContext;
8082
import org.opensearch.search.deciders.ConcurrentSearchDecision;
@@ -220,6 +222,8 @@ final class DefaultSearchContext extends SearchContext {
220222
private final int maxAggRewriteFilters;
221223
private final int filterRewriteSegmentThreshold;
222224
private final int cardinalityAggregationPruningThreshold;
225+
private final boolean cardinalityAggregationHybridCollectorEnabled;
226+
private final long cardinalityAggregationHybridCollectorMemoryThreshold;
223227
private final int bucketSelectionStrategyFactor;
224228
private final boolean keywordIndexOrDocValuesEnabled;
225229

@@ -287,6 +291,8 @@ final class DefaultSearchContext extends SearchContext {
287291
this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
288292
this.filterRewriteSegmentThreshold = evaluateAggRewriteFilterSegThreshold();
289293
this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
294+
this.cardinalityAggregationHybridCollectorEnabled = evaluateCardinalityAggregationHybridCollectorEnabled();
295+
this.cardinalityAggregationHybridCollectorMemoryThreshold = evaluateCardinalityAggregationHybridCollectorMemoryThreshold();
290296
this.bucketSelectionStrategyFactor = evaluateBucketSelectionStrategyFactor();
291297
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
292298
this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled();
@@ -1238,6 +1244,16 @@ public int cardinalityAggregationPruningThreshold() {
12381244
return cardinalityAggregationPruningThreshold;
12391245
}
12401246

1247+
@Override
1248+
public boolean cardinalityAggregationHybridCollectorEnabled() {
1249+
return cardinalityAggregationHybridCollectorEnabled;
1250+
}
1251+
1252+
@Override
1253+
public long cardinalityAggregationHybridCollectorMemoryThreshold() {
1254+
return cardinalityAggregationHybridCollectorMemoryThreshold;
1255+
}
1256+
12411257
@Override
12421258
public int bucketSelectionStrategyFactor() {
12431259
return bucketSelectionStrategyFactor;
@@ -1255,6 +1271,22 @@ private int evaluateCardinalityAggregationPruningThreshold() {
12551271
return 0;
12561272
}
12571273

1274+
private boolean evaluateCardinalityAggregationHybridCollectorEnabled() {
1275+
if (clusterService != null) {
1276+
return clusterService.getClusterSettings().get(CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED);
1277+
}
1278+
return false;
1279+
}
1280+
1281+
private long evaluateCardinalityAggregationHybridCollectorMemoryThreshold() {
1282+
if (clusterService != null) {
1283+
ByteSizeValue threshold = clusterService.getClusterSettings()
1284+
.get(CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD);
1285+
return threshold.getBytes();
1286+
}
1287+
return Runtime.getRuntime().maxMemory() / 100; // 1% default
1288+
}
1289+
12581290
private int evaluateBucketSelectionStrategyFactor() {
12591291
if (clusterService != null) {
12601292
return clusterService.getClusterSettings().get(BUCKET_SELECTION_STRATEGY_FACTOR_SETTING);

server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java

Lines changed: 116 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,13 @@
5757
import org.opensearch.common.hash.MurmurHash3;
5858
import org.opensearch.common.lease.Releasable;
5959
import org.opensearch.common.lease.Releasables;
60+
import org.opensearch.common.settings.Setting;
6061
import org.opensearch.common.util.BigArrays;
6162
import org.opensearch.common.util.BitArray;
6263
import org.opensearch.common.util.BitMixer;
6364
import org.opensearch.common.util.LongArray;
6465
import org.opensearch.common.util.ObjectArray;
66+
import org.opensearch.core.common.unit.ByteSizeValue;
6567
import org.opensearch.core.rest.RestStatus;
6668
import org.opensearch.index.fielddata.SortedBinaryDocValues;
6769
import org.opensearch.index.fielddata.SortedNumericDoubleValues;
@@ -88,6 +90,27 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
8890

8991
private static final Logger logger = LogManager.getLogger(CardinalityAggregator.class);
9092

93+
/**
94+
* Setting to enable/disable hybrid collector for cardinality aggregation.
95+
*/
96+
public static final Setting<Boolean> CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED = Setting.boolSetting(
97+
"search.aggregations.cardinality.hybrid_collector.enabled",
98+
true,
99+
Setting.Property.NodeScope,
100+
Setting.Property.Dynamic
101+
);
102+
103+
/**
104+
* Setting for hybrid collector memory threshold. Supports both percentage (e.g., "1%")
105+
* and absolute values (e.g., "10mb", "1gb").
106+
*/
107+
public static final Setting<ByteSizeValue> CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD = Setting.memorySizeSetting(
108+
"search.aggregations.cardinality.hybrid_collector.memory_threshold",
109+
"1%",
110+
Setting.Property.NodeScope,
111+
Setting.Property.Dynamic
112+
);
113+
91114
final CardinalityAggregatorFactory.ExecutionMode executionMode;
92115
final int precision;
93116
final ValuesSource valuesSource;
@@ -103,6 +126,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
103126
int emptyCollectorsUsed;
104127
int numericCollectorsUsed;
105128
int ordinalsCollectorsUsed;
129+
int hybridCollectorsUsed;
106130
int ordinalsCollectorsOverheadTooHigh;
107131
int stringHashingCollectorsUsed;
108132
int dynamicPrunedSegments;
@@ -157,14 +181,24 @@ private Collector pickCollector(LeafReaderContext ctx) throws IOException {
157181
collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays());
158182
} else if (executionMode == null) {
159183
// no hint provided, fall back to heuristics
160-
final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd);
161-
final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision);
162-
// only use ordinals if they don't increase memory usage by more than 25%
163-
if (ordinalsMemoryUsage < countsMemoryUsage / 4) {
164-
ordinalsCollectorsUsed++;
165-
collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays());
184+
// Check if hybrid collector is enabled
185+
boolean hybridCollectorEnabled = context.cardinalityAggregationHybridCollectorEnabled();
186+
187+
if (hybridCollectorEnabled) {
188+
// Use HybridCollector with configurable memory threshold
189+
long memoryThreshold = context.cardinalityAggregationHybridCollectorMemoryThreshold();
190+
MurmurHash3Values hashValues = MurmurHash3Values.hash(source.bytesValues(ctx));
191+
hybridCollectorsUsed++;
192+
collector = new HybridCollector(counts, ordinalValues, hashValues, context.bigArrays(), memoryThreshold);
166193
} else {
167-
ordinalsCollectorsOverheadTooHigh++;
194+
final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd);
195+
final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision);
196+
if (ordinalsMemoryUsage < countsMemoryUsage / 4) {
197+
ordinalsCollectorsUsed++;
198+
collector = new OrdinalsCollector(counts, ordinalValues, context.bigArrays());
199+
} else {
200+
ordinalsCollectorsOverheadTooHigh++;
201+
}
168202
}
169203
}
170204
}
@@ -339,6 +373,7 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {
339373
add.accept("empty_collectors_used", emptyCollectorsUsed);
340374
add.accept("numeric_collectors_used", numericCollectorsUsed);
341375
add.accept("ordinals_collectors_used", ordinalsCollectorsUsed);
376+
add.accept("hybrid_collectors_used", hybridCollectorsUsed);
342377
add.accept("ordinals_collectors_overhead_too_high", ordinalsCollectorsOverheadTooHigh);
343378
add.accept("string_hashing_collectors_used", stringHashingCollectorsUsed);
344379
add.accept("dynamic_pruned_segments", dynamicPrunedSegments);
@@ -564,6 +599,7 @@ public static long memoryOverhead(long maxOrd) {
564599
private final int maxOrd;
565600
private final HyperLogLogPlusPlus counts;
566601
private ObjectArray<BitArray> visitedOrds;
602+
private long currentMemoryUsage = 0;
567603

568604
OrdinalsCollector(HyperLogLogPlusPlus counts, SortedSetDocValues values, BigArrays bigArrays) {
569605
if (values.getValueCount() > Integer.MAX_VALUE) {
@@ -583,6 +619,8 @@ public void collect(int doc, long bucketOrd) throws IOException {
583619
if (bits == null) {
584620
bits = new BitArray(maxOrd, bigArrays);
585621
visitedOrds.set(bucketOrd, bits);
622+
// Update memory usage when new BitArray is created
623+
currentMemoryUsage += memoryOverhead(maxOrd);
586624
}
587625
if (values.advanceExact(doc)) {
588626
int count = values.docValueCount();
@@ -627,6 +665,10 @@ public void postCollect() throws IOException {
627665
}
628666
}
629667

668+
public long getCurrentMemoryUsage() {
669+
return currentMemoryUsage;
670+
}
671+
630672
@Override
631673
public void close() {
632674
for (int i = 0; i < visitedOrds.size(); i++) {
@@ -761,4 +803,71 @@ public long nextValue() throws IOException {
761803
}
762804
}
763805
}
806+
807+
/**
808+
* Hybrid Collector that starts with OrdinalsCollector and switches to DirectCollector
809+
* when memory consumption exceeds a threshold.
810+
*/
811+
static class HybridCollector extends Collector {
812+
private final HyperLogLogPlusPlus counts;
813+
private final MurmurHash3Values hashValues;
814+
private final long memoryThreshold;
815+
816+
private Collector activeCollector;
817+
private final OrdinalsCollector ordinalsCollector;
818+
private boolean switchedToDirectCollector = false;
819+
820+
HybridCollector(
821+
HyperLogLogPlusPlus counts,
822+
SortedSetDocValues ordinalValues,
823+
MurmurHash3Values hashValues,
824+
BigArrays bigArrays,
825+
long memoryThreshold
826+
) {
827+
this.counts = counts;
828+
this.hashValues = hashValues;
829+
this.memoryThreshold = memoryThreshold;
830+
831+
// Start with OrdinalsCollector
832+
this.ordinalsCollector = new OrdinalsCollector(counts, ordinalValues, bigArrays);
833+
this.activeCollector = ordinalsCollector;
834+
}
835+
836+
@Override
837+
public void collect(int doc, long bucketOrd) throws IOException {
838+
activeCollector.collect(doc, bucketOrd);
839+
840+
// Check memory usage after collection
841+
if (!switchedToDirectCollector && ordinalsCollector.getCurrentMemoryUsage() > memoryThreshold) {
842+
switchToDirectCollector();
843+
}
844+
}
845+
846+
private void switchToDirectCollector() throws IOException {
847+
// Switching to Direct Collector because memory consumption from Ordinals Collector is high.
848+
// Post collect all the already computed data.
849+
ordinalsCollector.postCollect();
850+
ordinalsCollector.close();
851+
logger.debug("switching to direct collector");
852+
853+
// Pass computed data to Direct Collector.
854+
activeCollector = new DirectCollector(counts, hashValues);
855+
switchedToDirectCollector = true;
856+
}
857+
858+
@Override
859+
public void postCollect() throws IOException {
860+
activeCollector.postCollect();
861+
}
862+
863+
@Override
864+
public void close() {
865+
Releasables.close(activeCollector);
866+
}
867+
868+
// Visible for testing
869+
Collector getActiveCollector() {
870+
return activeCollector;
871+
}
872+
}
764873
}

server/src/main/java/org/opensearch/search/internal/SearchContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,14 @@ public int cardinalityAggregationPruningThreshold() {
555555
return 0;
556556
}
557557

558+
public boolean cardinalityAggregationHybridCollectorEnabled() {
559+
return false;
560+
}
561+
562+
public long cardinalityAggregationHybridCollectorMemoryThreshold() {
563+
return Runtime.getRuntime().maxMemory() / 100; // 1% of available memory default
564+
}
565+
558566
public int bucketSelectionStrategyFactor() {
559567
return SearchService.DEFAULT_BUCKET_SELECTION_STRATEGY_FACTOR;
560568
}

0 commit comments

Comments
 (0)