Skip to content

Commit 0b61650

Browse files
Scott Youngfacebook-github-bot
Scott Young
authored andcommitted
add bufferPool for nimble parallel writer (facebookincubator#103)
Summary: change the buffer class to a bufferPool to handle multihreaded buffers without mutexes Differential Revision: D64774959
1 parent cc6724a commit 0b61650

File tree

8 files changed

+232
-37
lines changed

8 files changed

+232
-37
lines changed

dwio/nimble/common/Buffer.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
namespace facebook::nimble {
3636

37-
// Internally manages memory in chunks. Releases memory only upon destruction.
37+
// Internally manages memory in chunks. releases memory when destroyed
3838
// Buffer is NOT threadsafe: external locking is required.
3939
class Buffer {
4040
using MemoryPool = facebook::velox::memory::MemoryPool;
@@ -52,7 +52,6 @@ class Buffer {
5252
// to, and guarantees for the lifetime of *this that that region will remain
5353
// valid. Does NOT guarantee that the region is initially 0'd.
5454
char* reserve(uint64_t bytes) {
55-
std::scoped_lock<std::mutex> l(mutex_);
5655
if (reserveEnd_ + bytes <= chunkEnd_) {
5756
pos_ = reserveEnd_;
5857
reserveEnd_ += bytes;
@@ -98,11 +97,6 @@ class Buffer {
9897
char* reserveEnd_;
9998
std::vector<velox::BufferPtr> chunks_;
10099
MemoryPool& memoryPool_;
101-
// NOTE: this is temporary fix, to quickly enable parallel access to the
102-
// buffer class. In the near future, we are going to templetize this class to
103-
// produce a concurrent and a non-concurrent variants, and change the call
104-
// sites to use each variant when needed.
105-
std::mutex mutex_;
106100
};
107101

108102
} // namespace facebook::nimble
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include <gtest/gtest.h>
18+
#include "dwio/nimble/common/Exceptions.h"
19+
#include "dwio/nimble/velox/FieldWriter.h"
20+
#include "folly/executors/CPUThreadPoolExecutor.h"
21+
#include "velox/common/memory/Memory.h"
22+
#include "velox/common/memory/MemoryArbitrator.h"
23+
#include "velox/common/memory/SharedArbitrator.h"
24+
#include "velox/dwio/common/ExecutorBarrier.h"
25+
26+
namespace facebook::nimble::test {
27+
using MemoryPool = velox::memory::MemoryPool;
28+
using ExecutorBarrier = velox::dwio::common::ExecutorBarrier;
29+
30+
class BufferPoolTest : public ::testing::Test {
31+
protected:
32+
static void SetUpTestCase() {
33+
velox::memory::SharedArbitrator::registerFactory();
34+
velox::memory::MemoryManager::testingSetInstance(
35+
{.arbitratorKind = "SHARED"});
36+
}
37+
38+
void SetUp() override {
39+
rootPool_ = velox::memory::memoryManager()->addRootPool("default_root");
40+
leafPool_ = rootPool_->addLeafChild("default_leaf");
41+
}
42+
43+
std::shared_ptr<velox::memory::MemoryPool> rootPool_;
44+
std::shared_ptr<velox::memory::MemoryPool> leafPool_;
45+
};
46+
47+
TEST_F(BufferPoolTest, CreateBufferPoolBadMaxPool) {
48+
try {
49+
auto bufferPool = BufferPool{*leafPool_, /* maxPoolSize */ 0};
50+
FAIL();
51+
} catch (const NimbleUserError& e) {
52+
EXPECT_EQ(e.errorMessage(), "max pool size must be > 0");
53+
}
54+
}
55+
56+
TEST_F(BufferPoolTest, ReserveBuffer) {
57+
auto bufferPool = BufferPool{*leafPool_, /* maxPoolSize */ 10};
58+
EXPECT_EQ(bufferPool.size(), 10);
59+
{
60+
auto buffer = bufferPool.reserveBuffer();
61+
EXPECT_EQ(bufferPool.size(), 9);
62+
}
63+
EXPECT_EQ(bufferPool.size(), 10);
64+
}
65+
66+
TEST_F(BufferPoolTest, EmptyFillBufferPool) {
67+
size_t iterations = 10;
68+
auto bufferPool = BufferPool{*leafPool_, /* maxPoolSize */ iterations};
69+
70+
for (auto i = 0; i < iterations; ++i) {
71+
{
72+
auto buffer1 = bufferPool.reserveBuffer();
73+
auto buffer2 = bufferPool.reserveBuffer();
74+
auto buffer3 = bufferPool.reserveBuffer();
75+
76+
EXPECT_EQ(bufferPool.size(), iterations - 3);
77+
}
78+
EXPECT_EQ(bufferPool.size(), iterations);
79+
}
80+
}
81+
82+
TEST_F(BufferPoolTest, ParallelFillPool) {
83+
auto parallelismFactor = std::thread::hardware_concurrency();
84+
auto executor =
85+
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
86+
ExecutorBarrier barrier{executor};
87+
auto bufferPool = BufferPool{*leafPool_};
88+
EXPECT_EQ(bufferPool.size(), parallelismFactor);
89+
90+
for (auto i = 0; i < parallelismFactor; ++i) {
91+
barrier.add([&]() {
92+
for (auto j = 0; j < 100000; ++j) {
93+
auto buffer = bufferPool.reserveBuffer();
94+
}
95+
});
96+
}
97+
98+
barrier.waitAll();
99+
EXPECT_LE(bufferPool.size(), parallelismFactor);
100+
}
101+
} // namespace facebook::nimble::test

dwio/nimble/velox/FieldWriter.cpp

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,9 +289,8 @@ class SimpleFieldWriter : public FieldWriter {
289289
const OrderedRanges& ranges,
290290
folly::Executor*) override {
291291
auto size = ranges.size();
292-
auto& buffer = context_.stringBuffer();
293292
auto& data = valuesStream_.mutableData();
294-
293+
auto bufferObject = context_.bufferPool().reserveBuffer();
295294
if (auto flat = vector->asFlatVector<SourceType>()) {
296295
valuesStream_.ensureNullsCapacity(flat->mayHaveNulls(), size);
297296
bool rangeCopied = false;
@@ -331,6 +330,7 @@ class SimpleFieldWriter : public FieldWriter {
331330
valuesStream_.mutableNonNulls(),
332331
Flat<SourceType>{vector},
333332
[&](SourceType value) {
333+
auto& buffer = bufferObject.get();
334334
data.push_back(
335335
C::convert(value, buffer, valuesStream_.extraMemory()));
336336
});
@@ -344,6 +344,7 @@ class SimpleFieldWriter : public FieldWriter {
344344
valuesStream_.mutableNonNulls(),
345345
Decoded<SourceType>{decoded},
346346
[&](SourceType value) {
347+
auto& buffer = bufferObject.get();
347348
data.push_back(
348349
C::convert(value, buffer, valuesStream_.extraMemory()));
349350
});
@@ -1584,6 +1585,67 @@ size_t DecodingContextPool::size() const {
15841585
return pool_.size();
15851586
}
15861587

1588+
BufferPool::BufferObject::BufferObject(
1589+
BufferPool& pool,
1590+
std::unique_ptr<Buffer> buffer)
1591+
: pool_{pool}, buffer_{std::move(buffer)} {}
1592+
1593+
BufferPool::BufferObject::~BufferObject() {
1594+
pool_.addBuffer(std::move(buffer_));
1595+
}
1596+
1597+
Buffer& BufferPool::BufferObject::get() {
1598+
return *buffer_;
1599+
}
1600+
1601+
BufferPool::BufferPool(
1602+
facebook::velox::memory::MemoryPool& memoryPool,
1603+
size_t maxPoolSize,
1604+
uint64_t initialChunkSize)
1605+
: defaultInitialChunkSize_{initialChunkSize},
1606+
maxPoolSize{maxPoolSize},
1607+
semaphore_{0},
1608+
memoryPool_{memoryPool} {
1609+
NIMBLE_CHECK(maxPoolSize > 0, "max pool size must be > 0")
1610+
pool_.reserve(maxPoolSize);
1611+
for (size_t i = 0; i < maxPoolSize; ++i) {
1612+
pool_.emplace_back(newBuffer());
1613+
semaphore_.release();
1614+
}
1615+
}
1616+
1617+
facebook::velox::memory::MemoryPool& BufferPool::getMemoryPool() {
1618+
return memoryPool_;
1619+
}
1620+
1621+
// buffer back to the pool.
1622+
void BufferPool::addBuffer(std::unique_ptr<Buffer> buffer) {
1623+
std::scoped_lock<std::mutex> lock(mutex_);
1624+
pool_.push_back(std::move(buffer));
1625+
semaphore_.release();
1626+
}
1627+
1628+
// Reserves a buffer from the pool. Adds a new buffer to the pool
1629+
// while there are buffers available
1630+
BufferPool::BufferObject BufferPool::reserveBuffer() {
1631+
semaphore_.acquire();
1632+
1633+
std::scoped_lock<std::mutex> lock(mutex_);
1634+
auto buffer = std::move(pool_.back());
1635+
pool_.pop_back();
1636+
1637+
return BufferPool::BufferObject{*this, std::move(buffer)};
1638+
}
1639+
1640+
// Returns estimated number of buffers in the pool
1641+
size_t BufferPool::size() {
1642+
return pool_.size();
1643+
}
1644+
1645+
std::unique_ptr<Buffer> BufferPool::newBuffer() {
1646+
return std::make_unique<Buffer>(memoryPool_, defaultInitialChunkSize_);
1647+
}
1648+
15871649
std::unique_ptr<FieldWriter> FieldWriter::create(
15881650
FieldWriterContext& context,
15891651
const std::shared_ptr<const velox::dwio::common::TypeWithId>& type,

dwio/nimble/velox/FieldWriter.h

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -76,20 +76,59 @@ class DecodingContextPool {
7676
std::unique_ptr<velox::SelectivityVector> selectivityVector);
7777
};
7878

79+
// Manages a pool of buffers. Buffers are returned to the pool when released.
80+
// maxPoolSize should be set to at least 90% of capacity for performance
81+
class BufferPool {
82+
public:
83+
class BufferObject {
84+
public:
85+
explicit BufferObject(BufferPool& pool, std::unique_ptr<Buffer> buffer);
86+
87+
~BufferObject();
88+
Buffer& get();
89+
90+
private:
91+
BufferPool& pool_;
92+
std::unique_ptr<Buffer> buffer_;
93+
};
94+
95+
explicit BufferPool(
96+
facebook::velox::memory::MemoryPool& memoryPool,
97+
size_t maxPoolSize = std::thread::hardware_concurrency(),
98+
uint64_t initialChunkSize = kMinChunkSize);
99+
100+
facebook::velox::memory::MemoryPool& getMemoryPool();
101+
BufferObject reserveBuffer();
102+
size_t size();
103+
104+
private:
105+
static const uint64_t kMinChunkSize = 1LL << 20;
106+
const uint64_t defaultInitialChunkSize_;
107+
108+
std::mutex mutex_;
109+
std::counting_semaphore<> semaphore_;
110+
std::vector<std::unique_ptr<Buffer>> pool_;
111+
facebook::velox::memory::MemoryPool& memoryPool_;
112+
113+
void addBuffer(std::unique_ptr<Buffer> buffer);
114+
std::unique_ptr<Buffer> newBuffer();
115+
};
116+
79117
struct FieldWriterContext {
80118
explicit FieldWriterContext(
81119
velox::memory::MemoryPool& memoryPool,
82120
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
83-
std::function<void(void)> vectorDecoderVisitor = []() {})
121+
std::function<void(void)> vectorDecoderVisitor = []() {},
122+
size_t maxPoolSize = std::thread::hardware_concurrency())
84123
: bufferMemoryPool{memoryPool.addLeafChild(
85124
"field_writer_buffer",
86125
true,
87126
std::move(reclaimer))},
88127
inputBufferGrowthPolicy{
89128
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
90-
decodingContextPool_{std::move(vectorDecoderVisitor)} {
91-
resetStringBuffer();
92-
}
129+
bufferPool_{
130+
std::make_unique<BufferPool>(*bufferMemoryPool, maxPoolSize)},
131+
decodingContextPool_{std::move(vectorDecoderVisitor)} {}
93132

94133
std::shared_ptr<velox::memory::MemoryPool> bufferMemoryPool;
95134
std::mutex flatMapSchemaMutex;
@@ -112,13 +151,8 @@ struct FieldWriterContext {
112151
return decodingContextPool_.reserveContext();
113152
}
114153

115-
Buffer& stringBuffer() {
116-
return *buffer_;
117-
}
118-
119-
// Reset writer context for use by next stripe.
120-
void resetStringBuffer() {
121-
buffer_ = std::make_unique<Buffer>(*bufferMemoryPool);
154+
BufferPool& bufferPool() {
155+
return *bufferPool_;
122156
}
123157

124158
const std::vector<std::unique_ptr<StreamData>>& streams() {
@@ -148,7 +182,7 @@ struct FieldWriterContext {
148182
}
149183

150184
private:
151-
std::unique_ptr<Buffer> buffer_;
185+
std::unique_ptr<BufferPool> bufferPool_;
152186
DecodingContextPool decodingContextPool_;
153187
std::vector<std::unique_ptr<StreamData>> streams_;
154188
};

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,12 @@ class WriterContext : public FieldWriterContext {
6464
WriterContext(
6565
velox::memory::MemoryPool& memoryPool,
6666
VeloxWriterOptions options)
67-
: FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
67+
: FieldWriterContext{
68+
memoryPool,
69+
options.reclaimerFactory(),
70+
options.vectorDecoderVisitor,
71+
options.maxPoolSize
72+
},
6873
options{std::move(options)},
6974
logger{this->options.metricsLogger} {
7075
flushPolicy = this->options.flushPolicyFactory();
@@ -622,9 +627,6 @@ void VeloxWriter::writeChunk(bool lastChunk) {
622627
LoggingScope scope{*context_->logger};
623628
velox::CpuWallTimer veloxTimer{context_->stripeFlushTiming};
624629

625-
if (!encodingBuffer_) {
626-
encodingBuffer_ = std::make_unique<Buffer>(*encodingMemoryPool_);
627-
}
628630
streams_.resize(context_->schemaBuilder.nodeCount());
629631

630632
// When writing null streams, we write the nulls as data, and the stream
@@ -668,9 +670,11 @@ void VeloxWriter::writeChunk(bool lastChunk) {
668670

669671
auto encode = [&](StreamData& streamData) {
670672
const auto offset = streamData.descriptor().offset();
671-
auto encoded = encodeStream(*context_, *encodingBuffer_, streamData);
673+
auto bufferObject = context_->bufferPool().reserveBuffer();
674+
auto& buffer = bufferObject.get();
675+
auto encoded = encodeStream(*context_, buffer, streamData);
672676
if (!encoded.empty()) {
673-
ChunkedStreamWriter chunkWriter{*encodingBuffer_};
677+
ChunkedStreamWriter chunkWriter{buffer};
674678
NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range.");
675679
auto& stream = streams_[offset];
676680
for (auto& buffer : chunkWriter.encode(encoded)) {
@@ -782,10 +786,6 @@ uint32_t VeloxWriter::writeStripe() {
782786
uint64_t startSize = writer_.size();
783787
writer_.writeStripe(context_->rowsInStripe, std::move(streams_));
784788
stripeSize = writer_.size() - startSize;
785-
encodingBuffer_.reset();
786-
// TODO: once chunked string fields are supported, move string buffer
787-
// reset to writeChunk()
788-
context_->resetStringBuffer();
789789
}
790790

791791
NIMBLE_ASSERT(

dwio/nimble/velox/VeloxWriter.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ class VeloxWriter {
7777
TabletWriter writer_;
7878
std::unique_ptr<FieldWriter> root_;
7979

80-
std::unique_ptr<Buffer> encodingBuffer_;
8180
std::vector<Stream> streams_;
8281
std::exception_ptr lastException_;
8382
const velox::common::SpillConfig* const spillConfig_;

dwio/nimble/velox/VeloxWriterOptions.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ struct VeloxWriterOptions {
126126

127127
const velox::common::SpillConfig* spillConfig{nullptr};
128128

129+
size_t maxPoolSize = std::thread::hardware_concurrency();
129130
// If provided, internal encoding operations will happen in parallel using
130131
// this executor.
131132
std::shared_ptr<folly::Executor> encodingExecutor;

0 commit comments

Comments
 (0)