Skip to content

Commit b437c4e

Browse files
rui-moJkSelf
authored and committed
Support timestamp reader for Parquet file format (4680)
1 parent 9d96360 commit b437c4e

14 files changed

+301
-14
lines changed

velox/connectors/hive/HiveDataSource.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -419,11 +419,14 @@ HiveDataSource::HiveDataSource(
419419
for (auto& [k, v] : hiveTableHandle_->subfieldFilters()) {
420420
filters.emplace(k.clone(), v->clone());
421421
}
422-
auto remainingFilter = extractFiltersFromRemainingFilter(
423-
hiveTableHandle_->remainingFilter(),
424-
expressionEvaluator_,
425-
false,
426-
filters);
422+
auto remainingFilter = hiveTableHandle_->remainingFilter();
423+
if (hiveTableHandle_->isFilterPushdownEnabled()) {
424+
remainingFilter = extractFiltersFromRemainingFilter(
425+
hiveTableHandle_->remainingFilter(),
426+
expressionEvaluator_,
427+
false,
428+
filters);
429+
}
427430

428431
std::vector<common::Subfield> remainingFilterSubfields;
429432
if (remainingFilter) {

velox/dwio/common/SelectiveColumnReader.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,9 @@ void SelectiveColumnReader::getIntValues(
214214
VELOX_FAIL("Unsupported value size: {}", valueSize_);
215215
}
216216
break;
217+
case TypeKind::TIMESTAMP:
218+
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType);
219+
break;
217220
default:
218221
VELOX_FAIL(
219222
"Not a valid type for integer reader: {}", requestedType->toString());

velox/dwio/parquet/reader/PageReader.cpp

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,51 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
401401
}
402402
break;
403403
}
404+
case thrift::Type::INT96: {
405+
auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp);
406+
dictionary_.values = AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
407+
auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp);
408+
if (pageData_) {
409+
memcpy(dictionary_.values->asMutable<char>(), pageData_, numBytes);
410+
} else {
411+
dwio::common::readBytes(
412+
numBytes,
413+
inputStream_.get(),
414+
dictionary_.values->asMutable<char>(),
415+
bufferStart_,
416+
bufferEnd_);
417+
}
418+
// Expand the Parquet type length values to Velox type length.
419+
// We start from the end to allow in-place expansion.
420+
auto values = dictionary_.values->asMutable<Timestamp>();
421+
auto parquetValues = dictionary_.values->asMutable<char>();
422+
static constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
423+
static constexpr int64_t kSecondsPerDay = 86400LL;
424+
static constexpr int64_t kNanosPerSecond =
425+
Timestamp::kNanosecondsInMillisecond *
426+
Timestamp::kMillisecondsInSecond;
427+
for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
428+
// Convert the timestamp into seconds and nanos since the Unix epoch,
429+
// 00:00:00.000000 on 1 January 1970.
430+
uint64_t nanos;
431+
memcpy(
432+
&nanos,
433+
parquetValues + i * sizeof(Int96Timestamp),
434+
sizeof(uint64_t));
435+
int32_t days;
436+
memcpy(
437+
&days,
438+
parquetValues + i * sizeof(Int96Timestamp) + sizeof(uint64_t),
439+
sizeof(int32_t));
440+
int64_t seconds = (days - kJulianToUnixEpochDays) * kSecondsPerDay;
441+
if (nanos > Timestamp::kMaxNanos) {
442+
seconds += nanos / kNanosPerSecond;
443+
nanos -= (nanos / kNanosPerSecond) * kNanosPerSecond;
444+
}
445+
values[i] = Timestamp(seconds, nanos);
446+
}
447+
break;
448+
}
404449
case thrift::Type::BYTE_ARRAY: {
405450
dictionary_.values =
406451
AlignedBuffer::allocate<StringView>(dictionary_.numValues, &pool_);
@@ -491,7 +536,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
491536
VELOX_UNSUPPORTED(
492537
"Parquet type {} not supported for dictionary", parquetType);
493538
}
494-
case thrift::Type::INT96:
495539
default:
496540
VELOX_UNSUPPORTED(
497541
"Parquet type {} not supported for dictionary", parquetType);
@@ -518,6 +562,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) {
518562
case thrift::Type::INT64:
519563
case thrift::Type::DOUBLE:
520564
return 8;
565+
case thrift::Type::INT96:
566+
return 12;
521567
default:
522568
VELOX_FAIL("Type does not have a byte width {}", type);
523569
}

velox/dwio/parquet/reader/ParquetColumnReader.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "velox/dwio/parquet/reader/StructColumnReader.h"
2929

3030
#include "velox/dwio/parquet/reader/Statistics.h"
31+
#include "velox/dwio/parquet/reader/TimestampColumnReader.h"
3132
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"
3233

3334
namespace facebook::velox::parquet {
@@ -77,6 +78,10 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
7778
return std::make_unique<BooleanColumnReader>(
7879
requestedType, fileType, params, scanSpec);
7980

81+
case TypeKind::TIMESTAMP:
82+
return std::make_unique<TimestampColumnReader>(
83+
requestedType, fileType, params, scanSpec);
84+
8085
default:
8186
VELOX_FAIL(
8287
"buildReader unhandled type: " +

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ TypePtr ReaderBase::convertType(
531531
case thrift::Type::type::INT64:
532532
return BIGINT();
533533
case thrift::Type::type::INT96:
534-
return DOUBLE(); // TODO: Lose precision
534+
return TIMESTAMP();
535535
case thrift::Type::type::FLOAT:
536536
return REAL();
537537
case thrift::Type::type::DOUBLE:
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include "velox/dwio/parquet/reader/IntegerColumnReader.h"
20+
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"
21+
22+
namespace facebook::velox::parquet {
23+
24+
class TimestampColumnReader : public IntegerColumnReader {
25+
public:
26+
TimestampColumnReader(
27+
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
28+
std::shared_ptr<const dwio::common::TypeWithId> fileType,
29+
ParquetParams& params,
30+
common::ScanSpec& scanSpec)
31+
: IntegerColumnReader(requestedType, fileType, params, scanSpec) {}
32+
33+
bool hasBulkPath() const override {
34+
return false;
35+
}
36+
37+
void read(
38+
vector_size_t offset,
39+
RowSet rows,
40+
const uint64_t* /*incomingNulls*/) override {
41+
auto& data = formatData_->as<ParquetData>();
42+
// Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length.
43+
prepareRead<int128_t>(offset, rows, nullptr);
44+
readCommon<IntegerColumnReader>(rows);
45+
readOffset_ += rows.back() + 1;
46+
}
47+
};
48+
49+
} // namespace facebook::velox::parquet
Binary file not shown.

velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,34 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
7272
assertQuery(plan, splits_, sql);
7373
}
7474

75+
void assertSelectWithFilter(
76+
std::vector<std::string>&& outputColumnNames,
77+
const std::vector<std::string>& subfieldFilters,
78+
const std::string& remainingFilter,
79+
const std::string& sql,
80+
bool isFilterPushdownEnabled) {
81+
auto rowType = getRowType(std::move(outputColumnNames));
82+
parse::ParseOptions options;
83+
options.parseDecimalAsDouble = false;
84+
85+
auto plan = PlanBuilder(pool_.get())
86+
.setParseOptions(options)
87+
// Function extractFiltersFromRemainingFilter will extract
88+
// filters to subfield filters, but for some types, filter
89+
// pushdown is not supported.
90+
.tableScan(
91+
"hive_table",
92+
rowType,
93+
{},
94+
subfieldFilters,
95+
remainingFilter,
96+
nullptr,
97+
isFilterPushdownEnabled)
98+
.planNode();
99+
100+
assertQuery(plan, splits_, sql);
101+
}
102+
75103
void assertSelectWithAgg(
76104
std::vector<std::string>&& outputColumnNames,
77105
const std::vector<std::string>& aggregates,
@@ -518,6 +546,83 @@ TEST_F(ParquetTableScanTest, structSelection) {
518546
assertSelectWithFilter({"name"}, {}, "", "SELECT t from tmp");
519547
}
520548

549+
TEST_F(ParquetTableScanTest, timestampFilter) {
  // timestamp_int96.parquet holds one column (t: TIMESTAMP) and
  // 10 rows in one row group. Data is in SNAPPY compressed format.
  // The values are:
  // |t                  |
  // +-------------------+
  // |2015-06-01 19:34:56|
  // |2015-06-02 19:34:56|
  // |2001-02-03 03:34:06|
  // |1998-03-01 08:01:06|
  // |2022-12-23 03:56:01|
  // |1980-01-24 00:23:07|
  // |1999-12-08 13:39:26|
  // |2023-04-21 09:09:34|
  // |2000-09-12 22:36:29|
  // |2007-12-12 04:27:56|
  // +-------------------+
  //
  // Each expected value is written as (seconds at midnight of the day,
  // nanoseconds elapsed within that day).
  auto expected = makeFlatVector<Timestamp>({
      Timestamp(1433116800, 70496000000000),
      Timestamp(1433203200, 70496000000000),
      Timestamp(981158400, 12846000000000),
      Timestamp(888710400, 28866000000000),
      Timestamp(1671753600, 14161000000000),
      Timestamp(317520000, 1387000000000),
      Timestamp(944611200, 49166000000000),
      Timestamp(1682035200, 32974000000000),
      Timestamp(968716800, 81389000000000),
      Timestamp(1197417600, 16076000000000),
  });

  loadData(
      getExampleFilePath("timestamp_int96.parquet"),
      ROW({"t"}, {TIMESTAMP()}),
      makeRowVector({"t"}, {expected}));

  // Plain scan without any filter.
  assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp", false);

  // Every comparison below is supplied as the remaining filter with filter
  // pushdown disabled; the reference query applies the same predicate.
  const std::string queryPrefix = "SELECT t from tmp where ";
  for (const char* predicate :
       {"t < TIMESTAMP '2000-09-12 22:36:29'",
        "t <= TIMESTAMP '2000-09-12 22:36:29'",
        "t > TIMESTAMP '1980-01-24 00:23:07'",
        "t >= TIMESTAMP '1980-01-24 00:23:07'",
        "t == TIMESTAMP '2022-12-23 03:56:01'"}) {
    assertSelectWithFilter(
        {"t"}, {}, predicate, queryPrefix + predicate, false);
  }

  // Supplying the comparison as a subfield filter, through the overload that
  // takes no pushdown flag, is expected to throw.
  VELOX_ASSERT_THROW(
      assertSelectWithFilter(
          {"t"},
          {"t < TIMESTAMP '2000-09-12 22:36:29'"},
          "",
          "SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"),
      "testInt128() is not supported");
}
625+
521626
int main(int argc, char** argv) {
522627
testing::InitGoogleTest(&argc, argv);
523628
folly::init(&argc, &argv, false);

velox/exec/tests/utils/PlanBuilder.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ PlanBuilder& PlanBuilder::tableScan(
110110
const std::unordered_map<std::string, std::string>& columnAliases,
111111
const std::vector<std::string>& subfieldFilters,
112112
const std::string& remainingFilter,
113-
const RowTypePtr& dataColumns) {
113+
const RowTypePtr& dataColumns,
114+
bool isFilterPushdownEnabled) {
114115
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
115116
assignments;
116117
std::unordered_map<std::string, core::TypedExprPtr> typedMapping;
@@ -170,7 +171,7 @@ PlanBuilder& PlanBuilder::tableScan(
170171
auto tableHandle = std::make_shared<HiveTableHandle>(
171172
kHiveConnectorId,
172173
tableName,
173-
true,
174+
isFilterPushdownEnabled,
174175
std::move(filters),
175176
remainingFilterExpr,
176177
dataColumns);

velox/exec/tests/utils/PlanBuilder.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ class PlanBuilder {
144144
const std::unordered_map<std::string, std::string>& columnAliases = {},
145145
const std::vector<std::string>& subfieldFilters = {},
146146
const std::string& remainingFilter = "",
147-
const RowTypePtr& dataColumns = nullptr);
147+
const RowTypePtr& dataColumns = nullptr,
148+
bool isFilterPushdownEnabled = true);
148149

149150
/// Add a TableScanNode using a connector-specific table handle and
150151
/// assignments. Supports any connector, not just Hive connector.

velox/type/Type.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ namespace facebook::velox {
4545

4646
using int128_t = __int128_t;
4747

48+
/// In-memory image of a 12-byte Parquet INT96 timestamp value. The struct is
/// packed so that its size is exactly 12 bytes with no padding.
///
/// Field order follows the byte layout that the Parquet dictionary decoder
/// reads: the 8-byte nanosecond count comes first (offset 0), followed by the
/// 4-byte Julian day number (offset 8).
struct __attribute__((__packed__)) Int96Timestamp {
  uint64_t nanos; // Nanoseconds within the day.
  int32_t days; // Julian day number.
};

// Size arithmetic on encoded INT96 values relies on the 12-byte width.
static_assert(
    sizeof(Int96Timestamp) == 12,
    "Int96Timestamp must be exactly 12 bytes");
52+
4853
/// Velox type system supports a small set of SQL-compatible composeable types:
4954
/// BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, HUGEINT, REAL, DOUBLE, VARCHAR,
5055
/// VARBINARY, TIMESTAMP, ARRAY, MAP, ROW

0 commit comments

Comments
 (0)