Skip to content

Commit 8c4c0bf

Browse files
committed
Make the file schema of written by parquet writer can be configurable (6074)
1 parent 6942d8c commit 8c4c0bf

File tree

6 files changed

+42
-7
lines changed

6 files changed

+42
-7
lines changed

velox/dwio/common/tests/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ target_link_libraries(
6464
velox_parse_parser
6565
velox_vector_test_lib
6666
velox_link_libs
67+
parquet
68+
arrow
6769
Folly::folly
6870
fmt::fmt
6971
lz4::lz4

velox/dwio/common/tests/E2EFilterTestBase.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#pragma once
1818

19+
#include <arrow/type.h>
1920
#include "velox/common/time/Timer.h"
2021
#include "velox/dwio/common/BufferedInput.h"
2122
#include "velox/dwio/common/FileSink.h"
@@ -170,7 +171,8 @@ class E2EFilterTestBase : public testing::Test {
170171
virtual void writeToMemory(
171172
const TypePtr& type,
172173
const std::vector<RowVectorPtr>& batches,
173-
bool forRowGroupSkip) = 0;
174+
bool forRowGroupSkip,
175+
std::shared_ptr<arrow::Schema> schema = nullptr) = 0;
174176

175177
virtual std::unique_ptr<dwio::common::Reader> makeReader(
176178
const dwio::common::ReaderOptions& opts,

velox/dwio/dwrf/test/E2EFilterTest.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ class E2EFilterTest : public E2EFilterTestBase {
6262
void writeToMemory(
6363
const TypePtr& type,
6464
const std::vector<RowVectorPtr>& batches,
65-
bool forRowGroupSkip = false) override {
65+
bool forRowGroupSkip = false,
66+
std::shared_ptr<arrow::Schema> schema = nullptr) override {
6667
auto options = createWriterOptions(type);
6768
int32_t flushCounter = 0;
6869
// If we test row group skip, we have all the data in one stripe. For

velox/dwio/parquet/tests/reader/E2EFilterTest.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17+
#include <arrow/type.h>
1718
#include "velox/dwio/common/tests/E2EFilterTestBase.h"
1819
#include "velox/dwio/parquet/reader/ParquetReader.h"
1920
#include "velox/dwio/parquet/writer/Writer.h"
@@ -69,6 +70,7 @@ class E2EFilterTest : public E2EFilterTestBase {
6970
: (++flushCounter % flushEveryNBatches_ == 0);
7071
});
7172
};
73+
options_.schema = schema;
7274

7375
writer_ = std::make_unique<facebook::velox::parquet::Writer>(
7476
std::move(sink), options_);
@@ -615,6 +617,27 @@ TEST_F(E2EFilterTest, combineRowGroup) {
615617
EXPECT_EQ(parquetReader.numberOfRows(), 5);
616618
}
617619

620+
TEST_F(E2EFilterTest, configurableWriteSchema) {
621+
rowType_ = ROW({"c0"}, {INTEGER()});
622+
std::vector<RowVectorPtr> batches;
623+
for (int i = 0; i < 5; i++) {
624+
batches.push_back(std::static_pointer_cast<RowVector>(
625+
test::BatchMaker::createBatch(rowType_, 1, *leafPool_, nullptr, 0)));
626+
}
627+
628+
auto int32 = arrow::field("int32", arrow::int32());
629+
auto rbSchema = arrow::schema({int32});
630+
writeToMemory(rowType_, batches, false, rbSchema);
631+
std::string_view data(sinkPtr_->getData(), sinkPtr_->size());
632+
dwio::common::ReaderOptions readerOpts{leafPool_.get()};
633+
auto input = std::make_unique<BufferedInput>(
634+
std::make_shared<InMemoryReadFile>(data), readerOpts.getMemoryPool());
635+
auto reader = makeReader(readerOpts, std::move(input));
636+
auto parquetReader = dynamic_cast<ParquetReader&>(*reader.get());
637+
EXPECT_EQ(parquetReader.rowType()->containsChild("int32"), true);
638+
EXPECT_EQ(parquetReader.rowType()->containsChild("c0"), false);
639+
}
640+
618641
// Define main so that gflags get processed.
619642
int main(int argc, char** argv) {
620643
testing::InitGoogleTest(&argc, argv);

velox/dwio/parquet/writer/Writer.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,11 @@ Writer::Writer(
141141
} else {
142142
flushPolicy_ = std::make_unique<DefaultFlushPolicy>();
143143
}
144+
145+
if (options.schema) {
146+
arrowContext_->schema = options.schema;
147+
}
148+
144149
arrowContext_->properties =
145150
getArrowParquetWriterOptions(options, flushPolicy_);
146151
}
@@ -218,11 +223,11 @@ void Writer::write(const VectorPtr& data) {
218223
auto recordBatch, arrow::ImportRecordBatch(&array, &schema));
219224
if (!arrowContext_->schema) {
220225
arrowContext_->schema = recordBatch->schema();
221-
for (int colIdx = 0; colIdx < arrowContext_->schema->num_fields();
222-
colIdx++) {
223-
arrowContext_->stagingChunks.push_back(
224-
std::vector<std::shared_ptr<arrow::Array>>());
225-
}
226+
}
227+
228+
for (int colIdx = 0; colIdx < arrowContext_->schema->num_fields(); colIdx++) {
229+
arrowContext_->stagingChunks.push_back(
230+
std::vector<std::shared_ptr<arrow::Array>>());
226231
}
227232

228233
auto bytes = data->estimateFlatSize();

velox/dwio/parquet/writer/Writer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#pragma once
1818

19+
#include <arrow/type.h>
1920
#include "velox/common/compression/Compression.h"
2021
#include "velox/dwio/common/DataBuffer.h"
2122
#include "velox/dwio/common/FileSink.h"
@@ -94,6 +95,7 @@ struct WriterOptions {
9495
// The default factory allows the writer to construct the default flush
9596
// policy with the configs in its ctor.
9697
std::function<std::unique_ptr<DefaultFlushPolicy>()> flushPolicyFactory;
98+
std::shared_ptr<arrow::Schema> schema;
9799
};
98100

99101
// Writes Velox vectors into a DataSink using Arrow Parquet writer.

0 commit comments

Comments
 (0)