@@ -75,16 +75,17 @@ void SelectiveDecimalColumnReader<DataT>::seekToRowGroup(int64_t index) {
75
75
76
76
template <typename DataT>
77
77
template <bool kDense >
78
- void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
79
- vector_size_t numRows = rows.back () + 1 ;
78
+ void SelectiveDecimalColumnReader<DataT>::readHelper(
79
+ common::Filter* filter,
80
+ RowSet rows) {
80
81
ExtractToReader extractValues (this );
81
- common::AlwaysTrue filter ;
82
+ common::AlwaysTrue alwaysTrue ;
82
83
DirectRleColumnVisitor<
83
84
int64_t ,
84
85
common::AlwaysTrue,
85
86
decltype (extractValues),
86
87
kDense >
87
- visitor (filter , this , rows, extractValues);
88
+ visitor (alwaysTrue , this , rows, extractValues);
88
89
89
90
// decode scale stream
90
91
if (version_ == velox::dwrf::RleVersion_1) {
@@ -104,46 +105,201 @@ void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
104
105
// reset numValues_ before reading values
105
106
numValues_ = 0 ;
106
107
valueSize_ = sizeof (DataT);
108
+ vector_size_t numRows = rows.back () + 1 ;
107
109
ensureValuesCapacity<DataT>(numRows);
108
110
109
111
// decode value stream
110
112
facebook::velox::dwio::common::
111
113
ColumnVisitor<DataT, common::AlwaysTrue, decltype (extractValues), kDense >
112
- valueVisitor (filter , this , rows, extractValues);
114
+ valueVisitor (alwaysTrue , this , rows, extractValues);
113
115
decodeWithVisitor<DirectDecoder<true >>(valueDecoder_.get (), valueVisitor);
114
116
readOffset_ += numRows;
117
+
118
+ // Fill decimals before applying filter.
119
+ fillDecimals ();
120
+
121
+ const auto rawNulls = nullsInReadRange_
122
+ ? (kDense ? nullsInReadRange_->as <uint64_t >() : rawResultNulls_)
123
+ : nullptr ;
124
+ // Process filter.
125
+ process (filter, rows, rawNulls);
126
+ }
127
+
128
+ template <typename DataT>
129
+ void SelectiveDecimalColumnReader<DataT>::processNulls(
130
+ bool isNull,
131
+ const RowSet& rows,
132
+ const uint64_t * rawNulls) {
133
+ if (!rawNulls) {
134
+ return ;
135
+ }
136
+ returnReaderNulls_ = false ;
137
+ anyNulls_ = !isNull;
138
+ allNull_ = isNull;
139
+
140
+ auto rawDecimal = values_->asMutable <DataT>();
141
+ auto rawScale = scaleBuffer_->asMutable <int64_t >();
142
+
143
+ vector_size_t idx = 0 ;
144
+ if (isNull) {
145
+ for (vector_size_t i = 0 ; i < numValues_; i++) {
146
+ if (bits::isBitNull (rawNulls, i)) {
147
+ bits::setNull (rawResultNulls_, idx);
148
+ addOutputRow (rows[i]);
149
+ idx++;
150
+ }
151
+ }
152
+ } else {
153
+ for (vector_size_t i = 0 ; i < numValues_; i++) {
154
+ if (!bits::isBitNull (rawNulls, i)) {
155
+ bits::setNull (rawResultNulls_, idx, false );
156
+ rawDecimal[idx] = rawDecimal[i];
157
+ rawScale[idx] = rawScale[i];
158
+ addOutputRow (rows[i]);
159
+ idx++;
160
+ }
161
+ }
162
+ }
163
+ }
164
+
165
+ template <typename DataT>
166
+ void SelectiveDecimalColumnReader<DataT>::processFilter(
167
+ const common::Filter* filter,
168
+ const RowSet& rows,
169
+ const uint64_t * rawNulls) {
170
+ VELOX_CHECK_NOT_NULL (filter, " Filter must not be null." );
171
+ returnReaderNulls_ = false ;
172
+ anyNulls_ = false ;
173
+ allNull_ = true ;
174
+
175
+ vector_size_t idx = 0 ;
176
+ auto rawDecimal = values_->asMutable <DataT>();
177
+ for (vector_size_t i = 0 ; i < numValues_; i++) {
178
+ if (rawNulls && bits::isBitNull (rawNulls, i)) {
179
+ if (filter->testNull ()) {
180
+ bits::setNull (rawResultNulls_, idx);
181
+ addOutputRow (rows[i]);
182
+ anyNulls_ = true ;
183
+ idx++;
184
+ }
185
+ } else {
186
+ bool tested;
187
+ if constexpr (std::is_same_v<DataT, int64_t >) {
188
+ tested = filter->testInt64 (rawDecimal[i]);
189
+ } else {
190
+ tested = filter->testInt128 (rawDecimal[i]);
191
+ }
192
+
193
+ if (tested) {
194
+ if (rawNulls) {
195
+ bits::setNull (rawResultNulls_, idx, false );
196
+ }
197
+ rawDecimal[idx] = rawDecimal[i];
198
+ addOutputRow (rows[i]);
199
+ allNull_ = false ;
200
+ idx++;
201
+ }
202
+ }
203
+ }
204
+ }
205
+
206
+ template <typename DataT>
207
+ void SelectiveDecimalColumnReader<DataT>::process(
208
+ const common::Filter* filter,
209
+ const RowSet& rows,
210
+ const uint64_t * rawNulls) {
211
+ // Treat the filter as kAlwaysTrue if any of the following conditions are met:
212
+ // 1) No filter found;
213
+ // 2) Filter is kIsNotNull but rawNulls == NULL (no elements is null).
214
+ auto filterKind =
215
+ !filter || (filter->kind () == common::FilterKind::kIsNotNull && !rawNulls)
216
+ ? common::FilterKind::kAlwaysTrue
217
+ : filter->kind ();
218
+ switch (filterKind) {
219
+ case common::FilterKind::kAlwaysTrue :
220
+ // Simply add all rows to output.
221
+ for (vector_size_t i = 0 ; i < numValues_; i++) {
222
+ addOutputRow (rows[i]);
223
+ }
224
+ break ;
225
+ case common::FilterKind::kIsNull :
226
+ processNulls (true , rows, rawNulls);
227
+ break ;
228
+ case common::FilterKind::kIsNotNull :
229
+ processNulls (false , rows, rawNulls);
230
+ break ;
231
+ case common::FilterKind::kBigintRange :
232
+ case common::FilterKind::kBigintValuesUsingHashTable :
233
+ case common::FilterKind::kBigintValuesUsingBitmask :
234
+ case common::FilterKind::kNegatedBigintRange :
235
+ case common::FilterKind::kNegatedBigintValuesUsingHashTable :
236
+ case common::FilterKind::kNegatedBigintValuesUsingBitmask :
237
+ case common::FilterKind::kBigintMultiRange : {
238
+ if constexpr (std::is_same_v<DataT, int64_t >) {
239
+ processFilter (filter, rows, rawNulls);
240
+ } else {
241
+ const auto actualType = CppToType<DataT>::create ();
242
+ VELOX_NYI (
243
+ " Expected type BIGINT, but found file type {}." ,
244
+ actualType->toString ());
245
+ }
246
+ break ;
247
+ }
248
+ case common::FilterKind::kHugeintValuesUsingHashTable :
249
+ case common::FilterKind::kHugeintRange : {
250
+ if constexpr (std::is_same_v<DataT, int128_t >) {
251
+ processFilter (filter, rows, rawNulls);
252
+ } else {
253
+ const auto actualType = CppToType<DataT>::create ();
254
+ VELOX_NYI (
255
+ " Expected type HUGEINT, but found file type {}." ,
256
+ actualType->toString ());
257
+ }
258
+ break ;
259
+ }
260
+ default :
261
+ VELOX_NYI (" Unsupported filter: {}." , static_cast <int >(filterKind));
262
+ }
115
263
}
116
264
117
265
template <typename DataT>
118
266
void SelectiveDecimalColumnReader<DataT>::read(
119
267
int64_t offset,
120
268
const RowSet& rows,
121
269
const uint64_t * incomingNulls) {
122
- VELOX_CHECK (!scanSpec_->filter ());
123
270
VELOX_CHECK (!scanSpec_->valueHook ());
124
271
prepareRead<int64_t >(offset, rows, incomingNulls);
272
+ if (!resultNulls_ || !resultNulls_->unique () ||
273
+ resultNulls_->capacity () * 8 < rows.size ()) {
274
+ // Make sure a dedicated resultNulls_ is allocated with enough capacity as
275
+ // RleDecoder always assumes it is available.
276
+ resultNulls_ = AlignedBuffer::allocate<bool >(rows.size (), memoryPool_);
277
+ rawResultNulls_ = resultNulls_->asMutable <uint64_t >();
278
+ }
125
279
bool isDense = rows.back () == rows.size () - 1 ;
126
280
if (isDense) {
127
- readHelper<true >(rows);
281
+ readHelper<true >(scanSpec_-> filter (), rows);
128
282
} else {
129
- readHelper<false >(rows);
283
+ readHelper<false >(scanSpec_-> filter (), rows);
130
284
}
131
285
}
132
286
133
287
template <typename DataT>
134
288
void SelectiveDecimalColumnReader<DataT>::getValues(
135
289
const RowSet& rows,
136
290
VectorPtr* result) {
291
+ rawValues_ = values_->asMutable <char >();
292
+ getIntValues (rows, requestedType_, result);
293
+ }
294
+
295
+ template <typename DataT>
296
+ void SelectiveDecimalColumnReader<DataT>::fillDecimals() {
137
297
auto nullsPtr =
138
298
resultNulls () ? resultNulls ()->template as <uint64_t >() : nullptr ;
139
299
auto scales = scaleBuffer_->as <int64_t >();
140
300
auto values = values_->asMutable <DataT>();
141
-
142
301
DecimalUtil::fillDecimals<DataT>(
143
302
values, nullsPtr, values, scales, numValues_, scale_);
144
-
145
- rawValues_ = values_->asMutable <char >();
146
- getIntValues (rows, requestedType_, result);
147
303
}
148
304
149
305
template class SelectiveDecimalColumnReader <int64_t >;
0 commit comments