57
57
import org .opensearch .common .hash .MurmurHash3 ;
58
58
import org .opensearch .common .lease .Releasable ;
59
59
import org .opensearch .common .lease .Releasables ;
60
+ import org .opensearch .common .settings .Setting ;
60
61
import org .opensearch .common .util .BigArrays ;
61
62
import org .opensearch .common .util .BitArray ;
62
63
import org .opensearch .common .util .BitMixer ;
63
64
import org .opensearch .common .util .LongArray ;
64
65
import org .opensearch .common .util .ObjectArray ;
66
+ import org .opensearch .core .common .unit .ByteSizeValue ;
65
67
import org .opensearch .core .rest .RestStatus ;
66
68
import org .opensearch .index .fielddata .SortedBinaryDocValues ;
67
69
import org .opensearch .index .fielddata .SortedNumericDoubleValues ;
@@ -88,6 +90,27 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
88
90
89
91
private static final Logger logger = LogManager .getLogger (CardinalityAggregator .class );
90
92
93
+ /**
94
+ * Setting to enable/disable the hybrid collector for cardinality aggregation.
+ * Registered under the key {@code search.aggregations.cardinality.hybrid_collector.enabled}
+ * with a default of {@code true}, and declared with the NodeScope and Dynamic properties.
+ * NOTE(review): presumably this is the value surfaced by
+ * context.cardinalityAggregationHybridCollectorEnabled(), which pickCollector consults when
+ * no execution-mode hint is provided -- confirm against the search context wiring.
95
+ */
96
+ public static final Setting <Boolean > CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED = Setting .boolSetting (
97
+ "search.aggregations.cardinality.hybrid_collector.enabled" ,
98
+ true ,
99
+ Setting .Property .NodeScope ,
100
+ Setting .Property .Dynamic
101
+ );
102
+
103
+ /**
104
+ * Setting for hybrid collector memory threshold. Supports both percentage (e.g., "1%")
105
+ * and absolute values (e.g., "10mb", "1gb").
+ * Registered under the key {@code search.aggregations.cardinality.hybrid_collector.memory_threshold}
+ * with a default of {@code "1%"}, and declared with the NodeScope and Dynamic properties.
+ * NOTE(review): the base a percentage is taken from is decided by Setting.memorySizeSetting
+ * and is not visible here (presumably the JVM heap) -- confirm.
+ * NOTE(review): presumably this is the value surfaced by
+ * context.cardinalityAggregationHybridCollectorMemoryThreshold() and handed to HybridCollector
+ * as its memoryThreshold -- confirm against the search context wiring.
106
+ */
107
+ public static final Setting <ByteSizeValue > CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD = Setting .memorySizeSetting (
108
+ "search.aggregations.cardinality.hybrid_collector.memory_threshold" ,
109
+ "1%" ,
110
+ Setting .Property .NodeScope ,
111
+ Setting .Property .Dynamic
112
+ );
113
+
91
114
final CardinalityAggregatorFactory .ExecutionMode executionMode ;
92
115
final int precision ;
93
116
final ValuesSource valuesSource ;
@@ -103,6 +126,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
103
126
int emptyCollectorsUsed ;
104
127
int numericCollectorsUsed ;
105
128
int ordinalsCollectorsUsed ;
129
+ int hybridCollectorsUsed ;
106
130
int ordinalsCollectorsOverheadTooHigh ;
107
131
int stringHashingCollectorsUsed ;
108
132
int dynamicPrunedSegments ;
@@ -157,14 +181,24 @@ private Collector pickCollector(LeafReaderContext ctx) throws IOException {
157
181
collector = new OrdinalsCollector (counts , ordinalValues , context .bigArrays ());
158
182
} else if (executionMode == null ) {
159
183
// no hint provided, fall back to heuristics
160
- final long ordinalsMemoryUsage = OrdinalsCollector .memoryOverhead (maxOrd );
161
- final long countsMemoryUsage = HyperLogLogPlusPlus .memoryUsage (precision );
162
- // only use ordinals if they don't increase memory usage by more than 25%
163
- if (ordinalsMemoryUsage < countsMemoryUsage / 4 ) {
164
- ordinalsCollectorsUsed ++;
165
- collector = new OrdinalsCollector (counts , ordinalValues , context .bigArrays ());
184
+ // Check if hybrid collector is enabled
185
+ boolean hybridCollectorEnabled = context .cardinalityAggregationHybridCollectorEnabled ();
186
+
187
+ if (hybridCollectorEnabled ) {
188
+ // Use HybridCollector with configurable memory threshold
189
+ long memoryThreshold = context .cardinalityAggregationHybridCollectorMemoryThreshold ();
190
+ MurmurHash3Values hashValues = MurmurHash3Values .hash (source .bytesValues (ctx ));
191
+ hybridCollectorsUsed ++;
192
+ collector = new HybridCollector (counts , ordinalValues , hashValues , context .bigArrays (), memoryThreshold );
166
193
} else {
167
- ordinalsCollectorsOverheadTooHigh ++;
194
+ final long ordinalsMemoryUsage = OrdinalsCollector .memoryOverhead (maxOrd );
195
+ final long countsMemoryUsage = HyperLogLogPlusPlus .memoryUsage (precision );
196
+ if (ordinalsMemoryUsage < countsMemoryUsage / 4 ) {
197
+ ordinalsCollectorsUsed ++;
198
+ collector = new OrdinalsCollector (counts , ordinalValues , context .bigArrays ());
199
+ } else {
200
+ ordinalsCollectorsOverheadTooHigh ++;
201
+ }
168
202
}
169
203
}
170
204
}
@@ -339,6 +373,7 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {
339
373
add .accept ("empty_collectors_used" , emptyCollectorsUsed );
340
374
add .accept ("numeric_collectors_used" , numericCollectorsUsed );
341
375
add .accept ("ordinals_collectors_used" , ordinalsCollectorsUsed );
376
+ add .accept ("hybrid_collectors_used" , hybridCollectorsUsed );
342
377
add .accept ("ordinals_collectors_overhead_too_high" , ordinalsCollectorsOverheadTooHigh );
343
378
add .accept ("string_hashing_collectors_used" , stringHashingCollectorsUsed );
344
379
add .accept ("dynamic_pruned_segments" , dynamicPrunedSegments );
@@ -564,6 +599,7 @@ public static long memoryOverhead(long maxOrd) {
564
599
private final int maxOrd ;
565
600
private final HyperLogLogPlusPlus counts ;
566
601
private ObjectArray <BitArray > visitedOrds ;
602
+ private long currentMemoryUsage = 0 ;
567
603
568
604
OrdinalsCollector (HyperLogLogPlusPlus counts , SortedSetDocValues values , BigArrays bigArrays ) {
569
605
if (values .getValueCount () > Integer .MAX_VALUE ) {
@@ -583,6 +619,8 @@ public void collect(int doc, long bucketOrd) throws IOException {
583
619
if (bits == null ) {
584
620
bits = new BitArray (maxOrd , bigArrays );
585
621
visitedOrds .set (bucketOrd , bits );
622
+ // Update memory usage when new BitArray is created
623
+ currentMemoryUsage += memoryOverhead (maxOrd );
586
624
}
587
625
if (values .advanceExact (doc )) {
588
626
int count = values .docValueCount ();
@@ -627,6 +665,10 @@ public void postCollect() throws IOException {
627
665
}
628
666
}
629
667
668
+ public long getCurrentMemoryUsage () {
669
+ return currentMemoryUsage ;
670
+ }
671
+
630
672
@ Override
631
673
public void close () {
632
674
for (int i = 0 ; i < visitedOrds .size (); i ++) {
@@ -761,4 +803,71 @@ public long nextValue() throws IOException {
761
803
}
762
804
}
763
805
}
806
+
807
+ /**
808
+ * Hybrid Collector that starts with OrdinalsCollector and switches to DirectCollector
809
+ * when memory consumption exceeds a threshold.
810
+ */
811
+ static class HybridCollector extends Collector {
812
+ private final HyperLogLogPlusPlus counts ;
813
+ private final MurmurHash3Values hashValues ;
814
+ private final long memoryThreshold ;
815
+
816
+ private Collector activeCollector ;
817
+ private final OrdinalsCollector ordinalsCollector ;
818
+ private boolean switchedToDirectCollector = false ;
819
+
820
+ HybridCollector (
821
+ HyperLogLogPlusPlus counts ,
822
+ SortedSetDocValues ordinalValues ,
823
+ MurmurHash3Values hashValues ,
824
+ BigArrays bigArrays ,
825
+ long memoryThreshold
826
+ ) {
827
+ this .counts = counts ;
828
+ this .hashValues = hashValues ;
829
+ this .memoryThreshold = memoryThreshold ;
830
+
831
+ // Start with OrdinalsCollector
832
+ this .ordinalsCollector = new OrdinalsCollector (counts , ordinalValues , bigArrays );
833
+ this .activeCollector = ordinalsCollector ;
834
+ }
835
+
836
+ @ Override
837
+ public void collect (int doc , long bucketOrd ) throws IOException {
838
+ activeCollector .collect (doc , bucketOrd );
839
+
840
+ // Check memory usage after collection
841
+ if (!switchedToDirectCollector && ordinalsCollector .getCurrentMemoryUsage () > memoryThreshold ) {
842
+ switchToDirectCollector ();
843
+ }
844
+ }
845
+
846
+ private void switchToDirectCollector () throws IOException {
847
+ // Switching to Direct Collector because memory consumption from Ordinals Collector is high.
848
+ // Post collect all the already computed data.
849
+ ordinalsCollector .postCollect ();
850
+ ordinalsCollector .close ();
851
+ logger .debug ("switching to direct collector" );
852
+
853
+ // Pass computed data to Direct Collector.
854
+ activeCollector = new DirectCollector (counts , hashValues );
855
+ switchedToDirectCollector = true ;
856
+ }
857
+
858
+ @ Override
859
+ public void postCollect () throws IOException {
860
+ activeCollector .postCollect ();
861
+ }
862
+
863
+ @ Override
864
+ public void close () {
865
+ Releasables .close (activeCollector );
866
+ }
867
+
868
+ // Visible for testing
869
+ Collector getActiveCollector () {
870
+ return activeCollector ;
871
+ }
872
+ }
764
873
}
0 commit comments