Skip to content

Commit e7eb492

Browse files
Scott Young, facebook-github-bot
Scott Young
authored and committed
add bufferPool for nimble parallel writer (facebookincubator#103)
Summary: Change the buffer class to a BufferPool to handle multithreaded buffers without mutexes. Differential Revision: D64774959
1 parent cc6724a commit e7eb492

File tree

8 files changed

+207
-37
lines changed

8 files changed

+207
-37
lines changed

dwio/nimble/common/Buffer.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
#pragma once
1717

18+
#include "dwio/nimble/common/Exceptions.h"
1819
#include "velox/buffer/Buffer.h"
1920
#include "velox/common/memory/Memory.h"
2021

@@ -34,7 +35,7 @@
3435

3536
namespace facebook::nimble {
3637

37-
// Internally manages memory in chunks. Releases memory only upon destruction.
38+
// Internally manages memory in chunks. Releases memory only when destroyed.
3839
// Buffer is NOT threadsafe: external locking is required.
3940
class Buffer {
4041
using MemoryPool = facebook::velox::memory::MemoryPool;
@@ -52,7 +53,6 @@ class Buffer {
5253
// to, and guarantees for the lifetime of *this that that region will remain
5354
// valid. Does NOT guarantee that the region is initially 0'd.
5455
char* reserve(uint64_t bytes) {
55-
std::scoped_lock<std::mutex> l(mutex_);
5656
if (reserveEnd_ + bytes <= chunkEnd_) {
5757
pos_ = reserveEnd_;
5858
reserveEnd_ += bytes;
@@ -98,11 +98,6 @@ class Buffer {
9898
char* reserveEnd_;
9999
std::vector<velox::BufferPtr> chunks_;
100100
MemoryPool& memoryPool_;
101-
// NOTE: this is temporary fix, to quickly enable parallel access to the
102-
// buffer class. In the near future, we are going to templetize this class to
103-
// produce a concurrent and a non-concurrent variants, and change the call
104-
// sites to use each variant when needed.
105-
std::mutex mutex_;
106101
};
107102

108103
} // namespace facebook::nimble
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include <gtest/gtest.h>
18+
#include "dwio/nimble/common/Buffer.h"
19+
#include "dwio/nimble/common/Exceptions.h"
20+
#include "dwio/nimble/velox/FieldWriter.h"
21+
#include "folly/executors/CPUThreadPoolExecutor.h"
22+
#include "velox/common/memory/Memory.h"
23+
#include "velox/dwio/common/ExecutorBarrier.h"
24+
25+
namespace facebook::nimble::test {
26+
using MemoryPool = velox::memory::MemoryPool;
27+
using ExecutorBarrier = velox::dwio::common::ExecutorBarrier;
28+
29+
class BufferPoolTest : public ::testing::Test {
30+
protected:
31+
static void SetUpTestCase() {}
32+
33+
void SetUp() override {
34+
memPool_ = velox::memory::deprecatedAddDefaultLeafMemoryPool();
35+
}
36+
37+
std::shared_ptr<velox::memory::MemoryPool> memPool_;
38+
};
39+
40+
TEST_F(BufferPoolTest, CreateBufferPoolBadMaxPool) {
41+
try {
42+
auto bufferPool = BufferPool{*memPool_, /* maxPoolSize */ 0};
43+
FAIL();
44+
} catch (const NimbleUserError& e) {
45+
EXPECT_EQ(e.errorMessage(), "max pool size must be > 0");
46+
}
47+
}
48+
49+
TEST_F(BufferPoolTest, ReserveAddBuffer) {
50+
auto bufferPool = BufferPool{*memPool_, /* maxPoolSize */ 10};
51+
auto buffer = bufferPool.reserveBuffer();
52+
EXPECT_EQ(bufferPool.size(), 9);
53+
bufferPool.addBuffer(std::move(buffer));
54+
EXPECT_EQ(bufferPool.size(), 10);
55+
}
56+
57+
TEST_F(BufferPoolTest, EmptyFillBufferPool) {
58+
size_t iterations = 10;
59+
std::vector<std::unique_ptr<Buffer>> buffers;
60+
auto bufferPool = BufferPool{*memPool_, /* maxPoolSize */ iterations};
61+
62+
for (auto i = 0; i < iterations; ++i) {
63+
auto buffer = bufferPool.reserveBuffer();
64+
buffers.push_back(std::move(buffer));
65+
EXPECT_EQ(bufferPool.size(), iterations - i - 1);
66+
}
67+
EXPECT_EQ(bufferPool.size(), 0);
68+
69+
for (auto i = 0; i < iterations; ++i) {
70+
bufferPool.addBuffer(std::move(buffers.back()));
71+
buffers.pop_back();
72+
EXPECT_EQ(bufferPool.size(), i + 1);
73+
}
74+
}
75+
76+
TEST_F(BufferPoolTest, ParallelFillPool) {
77+
auto parallelismFactor = std::thread::hardware_concurrency();
78+
auto executor =
79+
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
80+
ExecutorBarrier barrier{executor};
81+
auto bufferPool = BufferPool{*memPool_};
82+
EXPECT_EQ(bufferPool.size(), parallelismFactor);
83+
84+
for (auto i = 0; i < parallelismFactor; ++i) {
85+
barrier.add([&]() {
86+
for (auto j = 0; j < 100000; ++j) {
87+
auto buffer = bufferPool.reserveBuffer();
88+
bufferPool.addBuffer(std::move(buffer));
89+
}
90+
});
91+
}
92+
93+
barrier.waitAll();
94+
EXPECT_LE(bufferPool.size(), parallelismFactor);
95+
}
96+
} // namespace facebook::nimble::test

dwio/nimble/velox/FieldWriter.cpp

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,9 +289,10 @@ class SimpleFieldWriter : public FieldWriter {
289289
const OrderedRanges& ranges,
290290
folly::Executor*) override {
291291
auto size = ranges.size();
292-
auto& buffer = context_.stringBuffer();
293292
auto& data = valuesStream_.mutableData();
294-
293+
auto bufferPtr = context_.bufferPool().reserveBuffer();
294+
auto bufferGuard = folly::makeGuard(
295+
[&]() { context_.bufferPool().addBuffer(std::move(bufferPtr)); });
295296
if (auto flat = vector->asFlatVector<SourceType>()) {
296297
valuesStream_.ensureNullsCapacity(flat->mayHaveNulls(), size);
297298
bool rangeCopied = false;
@@ -331,6 +332,7 @@ class SimpleFieldWriter : public FieldWriter {
331332
valuesStream_.mutableNonNulls(),
332333
Flat<SourceType>{vector},
333334
[&](SourceType value) {
335+
auto& buffer = *bufferPtr;
334336
data.push_back(
335337
C::convert(value, buffer, valuesStream_.extraMemory()));
336338
});
@@ -344,6 +346,7 @@ class SimpleFieldWriter : public FieldWriter {
344346
valuesStream_.mutableNonNulls(),
345347
Decoded<SourceType>{decoded},
346348
[&](SourceType value) {
349+
auto& buffer = *bufferPtr;
347350
data.push_back(
348351
C::convert(value, buffer, valuesStream_.extraMemory()));
349352
});
@@ -1584,6 +1587,53 @@ size_t DecodingContextPool::size() const {
15841587
return pool_.size();
15851588
}
15861589

1590+
BufferPool::BufferPool(
1591+
facebook::velox::memory::MemoryPool& memoryPool,
1592+
size_t maxPoolSize,
1593+
uint64_t initialChunkSize)
1594+
: defaultInitialChunkSize_{initialChunkSize},
1595+
maxPoolSize{maxPoolSize},
1596+
semaphore_{0},
1597+
memoryPool_{memoryPool} {
1598+
NIMBLE_CHECK(maxPoolSize > 0, "max pool size must be > 0")
1599+
pool_.reserve(maxPoolSize);
1600+
for (size_t i = 0; i < maxPoolSize; ++i) {
1601+
pool_.emplace_back(newBuffer());
1602+
semaphore_.release();
1603+
}
1604+
}
1605+
1606+
facebook::velox::memory::MemoryPool& BufferPool::getMemoryPool() {
1607+
return memoryPool_;
1608+
}
1609+
1610+
// Returns a buffer to the pool and releases the semaphore so that a waiting reserveBuffer() call can proceed.
1611+
void BufferPool::addBuffer(std::unique_ptr<Buffer> buffer) {
1612+
std::scoped_lock<std::mutex> lock(mutex_);
1613+
pool_.push_back(std::move(buffer));
1614+
semaphore_.release();
1615+
}
1616+
1617+
// Reserves a buffer from the pool. Blocks on the semaphore until a buffer
1618+
// is available; no new buffer is created here.
1619+
std::unique_ptr<Buffer> BufferPool::reserveBuffer() {
1620+
semaphore_.acquire();
1621+
1622+
std::scoped_lock<std::mutex> lock(mutex_);
1623+
auto buffer = std::move(pool_.back());
1624+
pool_.pop_back();
1625+
return buffer;
1626+
}
1627+
1628+
// Returns an estimate of the number of buffers in the pool; pool_ is read without holding mutex_, so the value may be stale under concurrent use.
1629+
size_t BufferPool::size() {
1630+
return pool_.size();
1631+
}
1632+
1633+
std::unique_ptr<Buffer> BufferPool::newBuffer() {
1634+
return std::make_unique<Buffer>(memoryPool_, defaultInitialChunkSize_);
1635+
}
1636+
15871637
std::unique_ptr<FieldWriter> FieldWriter::create(
15881638
FieldWriterContext& context,
15891639
const std::shared_ptr<const velox::dwio::common::TypeWithId>& type,

dwio/nimble/velox/FieldWriter.h

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -76,20 +76,48 @@ class DecodingContextPool {
7676
std::unique_ptr<velox::SelectivityVector> selectivityVector);
7777
};
7878

79+
// Manages a pool of pre-allocated buffers. Callers take one with reserveBuffer() and must hand it back with addBuffer().
80+
// maxPoolSize should be set to at least 90% of capacity for performance
81+
class BufferPool {
82+
public:
83+
explicit BufferPool(
84+
facebook::velox::memory::MemoryPool& memoryPool,
85+
size_t maxPoolSize = std::thread::hardware_concurrency(),
86+
uint64_t initialChunkSize = kMinChunkSize);
87+
88+
facebook::velox::memory::MemoryPool& getMemoryPool();
89+
void addBuffer(std::unique_ptr<Buffer> buffer);
90+
std::unique_ptr<Buffer> reserveBuffer();
91+
size_t size();
92+
93+
private:
94+
static const uint64_t kMinChunkSize = 1LL << 20;
95+
const uint64_t defaultInitialChunkSize_;
96+
const size_t maxPoolSize;
97+
98+
std::mutex mutex_;
99+
std::counting_semaphore<> semaphore_;
100+
std::vector<std::unique_ptr<Buffer>> pool_;
101+
facebook::velox::memory::MemoryPool& memoryPool_;
102+
103+
std::unique_ptr<Buffer> newBuffer();
104+
};
105+
79106
struct FieldWriterContext {
80107
explicit FieldWriterContext(
81108
velox::memory::MemoryPool& memoryPool,
82109
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
83-
std::function<void(void)> vectorDecoderVisitor = []() {})
110+
std::function<void(void)> vectorDecoderVisitor = []() {},
111+
size_t maxPoolSize = std::thread::hardware_concurrency())
84112
: bufferMemoryPool{memoryPool.addLeafChild(
85113
"field_writer_buffer",
86114
true,
87115
std::move(reclaimer))},
88116
inputBufferGrowthPolicy{
89117
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
90-
decodingContextPool_{std::move(vectorDecoderVisitor)} {
91-
resetStringBuffer();
92-
}
118+
bufferPool_{
119+
std::make_unique<BufferPool>(*bufferMemoryPool, maxPoolSize)},
120+
decodingContextPool_{std::move(vectorDecoderVisitor)} {}
93121

94122
std::shared_ptr<velox::memory::MemoryPool> bufferMemoryPool;
95123
std::mutex flatMapSchemaMutex;
@@ -112,13 +140,8 @@ struct FieldWriterContext {
112140
return decodingContextPool_.reserveContext();
113141
}
114142

115-
Buffer& stringBuffer() {
116-
return *buffer_;
117-
}
118-
119-
// Reset writer context for use by next stripe.
120-
void resetStringBuffer() {
121-
buffer_ = std::make_unique<Buffer>(*bufferMemoryPool);
143+
BufferPool& bufferPool() {
144+
return *bufferPool_;
122145
}
123146

124147
const std::vector<std::unique_ptr<StreamData>>& streams() {
@@ -148,7 +171,7 @@ struct FieldWriterContext {
148171
}
149172

150173
private:
151-
std::unique_ptr<Buffer> buffer_;
174+
std::unique_ptr<BufferPool> bufferPool_;
152175
DecodingContextPool decodingContextPool_;
153176
std::vector<std::unique_ptr<StreamData>> streams_;
154177
};

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,12 @@ class WriterContext : public FieldWriterContext {
6464
WriterContext(
6565
velox::memory::MemoryPool& memoryPool,
6666
VeloxWriterOptions options)
67-
: FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
67+
: FieldWriterContext{
68+
memoryPool,
69+
options.reclaimerFactory(),
70+
options.vectorDecoderVisitor,
71+
options.maxPoolSize
72+
},
6873
options{std::move(options)},
6974
logger{this->options.metricsLogger} {
7075
flushPolicy = this->options.flushPolicyFactory();
@@ -622,9 +627,6 @@ void VeloxWriter::writeChunk(bool lastChunk) {
622627
LoggingScope scope{*context_->logger};
623628
velox::CpuWallTimer veloxTimer{context_->stripeFlushTiming};
624629

625-
if (!encodingBuffer_) {
626-
encodingBuffer_ = std::make_unique<Buffer>(*encodingMemoryPool_);
627-
}
628630
streams_.resize(context_->schemaBuilder.nodeCount());
629631

630632
// When writing null streams, we write the nulls as data, and the stream
@@ -668,9 +670,13 @@ void VeloxWriter::writeChunk(bool lastChunk) {
668670

669671
auto encode = [&](StreamData& streamData) {
670672
const auto offset = streamData.descriptor().offset();
671-
auto encoded = encodeStream(*context_, *encodingBuffer_, streamData);
673+
auto bufferPtr = context_->bufferPool().reserveBuffer();
674+
auto bufferGuard = folly::makeGuard(
675+
[&]() { context_->bufferPool().addBuffer(std::move(bufferPtr)); });
676+
auto& buffer = *bufferPtr;
677+
auto encoded = encodeStream(*context_, buffer, streamData);
672678
if (!encoded.empty()) {
673-
ChunkedStreamWriter chunkWriter{*encodingBuffer_};
679+
ChunkedStreamWriter chunkWriter{buffer};
674680
NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range.");
675681
auto& stream = streams_[offset];
676682
for (auto& buffer : chunkWriter.encode(encoded)) {
@@ -782,10 +788,6 @@ uint32_t VeloxWriter::writeStripe() {
782788
uint64_t startSize = writer_.size();
783789
writer_.writeStripe(context_->rowsInStripe, std::move(streams_));
784790
stripeSize = writer_.size() - startSize;
785-
encodingBuffer_.reset();
786-
// TODO: once chunked string fields are supported, move string buffer
787-
// reset to writeChunk()
788-
context_->resetStringBuffer();
789791
}
790792

791793
NIMBLE_ASSERT(

dwio/nimble/velox/VeloxWriter.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ class VeloxWriter {
7777
TabletWriter writer_;
7878
std::unique_ptr<FieldWriter> root_;
7979

80-
std::unique_ptr<Buffer> encodingBuffer_;
8180
std::vector<Stream> streams_;
8281
std::exception_ptr lastException_;
8382
const velox::common::SpillConfig* const spillConfig_;

dwio/nimble/velox/VeloxWriterOptions.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ struct VeloxWriterOptions {
126126

127127
const velox::common::SpillConfig* spillConfig{nullptr};
128128

129+
size_t maxPoolSize = std::thread::hardware_concurrency();
129130
// If provided, internal encoding operations will happen in parallel using
130131
// this executor.
131132
std::shared_ptr<folly::Executor> encodingExecutor;

dwio/nimble/velox/tests/VeloxWriterTests.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -319,11 +319,15 @@ TEST_F(VeloxWriterTests, MemoryReclaimPath) {
319319
std::string file;
320320
auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
321321
std::atomic_bool reclaimEntered = false;
322-
nimble::VeloxWriterOptions writerOptions{.reclaimerFactory = [&]() {
323-
auto reclaimer = std::make_unique<MockReclaimer>();
324-
reclaimer->setEnterArbitrationFunc([&]() { reclaimEntered = true; });
325-
return reclaimer;
326-
}};
322+
nimble::VeloxWriterOptions writerOptions{
323+
.reclaimerFactory =
324+
[&]() {
325+
auto reclaimer = std::make_unique<MockReclaimer>();
326+
reclaimer->setEnterArbitrationFunc(
327+
[&]() { reclaimEntered = true; });
328+
return reclaimer;
329+
},
330+
.maxPoolSize = 2};
327331
nimble::VeloxWriter writer(
328332
*writerPool, type, std::move(writeFile), std::move(writerOptions));
329333
auto batches = generateBatches(type, 100, 4000, 20221110, *leafPool_);

0 commit comments

Comments
 (0)