Skip to content

Commit a2d72dc

Browse files
Scott Youngfacebook-github-bot
Scott Young
authored andcommitted
add bufferPool for nimble parallel writer (#103)
Summary: change the buffer class to a bufferPool to handle multithreaded buffers without mutexes Differential Revision: D64774959
1 parent 6d79aa4 commit a2d72dc

File tree

8 files changed

+378
-39
lines changed

8 files changed

+378
-39
lines changed

dwio/nimble/common/Buffer.h

+125-9
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
*/
1616
#pragma once
1717

18+
#include "dwio/nimble/common/Exceptions.h"
19+
#include "folly/concurrency/DynamicBoundedQueue.h"
20+
#include "folly/coro/AsyncScope.h"
21+
#include "folly/coro/BlockingWait.h"
1822
#include "velox/buffer/Buffer.h"
1923
#include "velox/common/memory/Memory.h"
2024

@@ -33,12 +37,13 @@
3337
// and so on
3438

3539
namespace facebook::nimble {
40+
using MemoryPool = facebook::velox::memory::MemoryPool;
41+
using AsyncScope = folly::coro::AsyncScope;
3642

37-
// Internally manages memory in chunks. Releases memory only upon destruction.
43+
// Internally manages memory in chunks. Releases memory to the pool when
44+
// cleared is called
3845
// Buffer is NOT threadsafe: external locking is required.
3946
class Buffer {
40-
using MemoryPool = facebook::velox::memory::MemoryPool;
41-
4247
public:
4348
explicit Buffer(
4449
MemoryPool& memoryPool,
@@ -52,7 +57,6 @@ class Buffer {
5257
// to, and guarantees for the lifetime of *this that that region will remain
5358
// valid. Does NOT guarantee that the region is initially 0'd.
5459
char* reserve(uint64_t bytes) {
55-
std::scoped_lock<std::mutex> l(mutex_);
5660
if (reserveEnd_ + bytes <= chunkEnd_) {
5761
pos_ = reserveEnd_;
5862
reserveEnd_ += bytes;
@@ -98,11 +102,123 @@ class Buffer {
98102
char* reserveEnd_;
99103
std::vector<velox::BufferPtr> chunks_;
100104
MemoryPool& memoryPool_;
101-
// NOTE: this is temporary fix, to quickly enable parallel access to the
102-
// buffer class. In the near future, we are going to templetize this class to
103-
// produce a concurrent and a non-concurrent variants, and change the call
104-
// sites to use each variant when needed.
105-
std::mutex mutex_;
105+
};
106+
107+
using BufferPtr = std::unique_ptr<Buffer>;
108+
109+
// Manages a pool of buffers. Buffers are returned to the pool when released.
110+
// maxPoolSize should be set to at least 90% of capacity for performance
111+
class BufferPool {
112+
public:
113+
explicit BufferPool(
114+
MemoryPool& memoryPool,
115+
std::shared_ptr<folly::Executor> executor,
116+
std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10),
117+
size_t maxPoolSize = std::thread::hardware_concurrency(),
118+
size_t initialBufferCount = 1,
119+
uint64_t initialChunkSize = kMinChunkSize)
120+
: defaultInitialChunkSize_{initialChunkSize},
121+
timeout_{timeout},
122+
executor_{std::move(executor)},
123+
bufferQueue_{maxPoolSize},
124+
backgroundScope_{std::make_unique<AsyncScope>()},
125+
memoryPool_{memoryPool} {
126+
NIMBLE_CHECK(maxPoolSize > 0, "max pool size must be > 0")
127+
NIMBLE_CHECK(timeout_.count() > 0, "timeout must be > 0")
128+
NIMBLE_CHECK(0 < initialBufferCount, "initial pool size must be > 0");
129+
NIMBLE_CHECK(
130+
initialBufferCount <= maxPoolSize,
131+
"initial pool size must be <= max pool size")
132+
133+
for (size_t i = 0; i < initialBufferCount; ++i) {
134+
addBuffer();
135+
}
136+
}
137+
~BufferPool() {
138+
folly::coro::blockingWait(backgroundScope_->joinAsync());
139+
deleteBufferPool();
140+
backgroundScope_.reset();
141+
}
142+
143+
MemoryPool& getMemoryPool() {
144+
return memoryPool_;
145+
}
146+
147+
void co_addBuffer(BufferPtr buffer) {
148+
backgroundScope_->add(folly::coro::co_invoke(
149+
[this, bufferRef = std::move(buffer)]() mutable
150+
-> folly::coro::Task<void> {
151+
addBuffer(std::move(bufferRef));
152+
co_return;
153+
})
154+
.scheduleOn(executor_.get()));
155+
}
156+
157+
// Releases the buffer back to the pool.
158+
void addBuffer(BufferPtr buffer) {
159+
const auto status =
160+
bufferQueue_.try_enqueue_for(std::move(buffer), timeout_);
161+
if (!status) {
162+
NIMBLE_UNKNOWN(
163+
"Timed out enqueuing for buffer. Timeout set to " +
164+
std::to_string(timeout_.count()) + " ms");
165+
}
166+
}
167+
168+
// Reserves a buffer from the pool. Adds a new buffer to the pool
169+
// while there are buffers available
170+
BufferPtr reserveBuffer() {
171+
if (bufferQueue_.empty()) {
172+
return newBuffer();
173+
}
174+
175+
BufferPtr buffer;
176+
const auto status = bufferQueue_.try_dequeue_for(buffer, timeout_);
177+
if (!status) {
178+
NIMBLE_UNREACHABLE(
179+
"Timed out dequeuing for buffer. Timeout set to " +
180+
std::to_string(timeout_.count()) + " ms");
181+
}
182+
return buffer;
183+
}
184+
185+
// Returns estimated number of buffers in the pool
186+
size_t size() {
187+
return bufferQueue_.size();
188+
}
189+
190+
private:
191+
static const uint64_t kMinChunkSize = 1LL << 20;
192+
const uint64_t defaultInitialChunkSize_ = kMinChunkSize;
193+
const std::chrono::milliseconds timeout_ = std::chrono::milliseconds(1000);
194+
195+
std::shared_ptr<folly::Executor> executor_;
196+
folly::DMPMCQueue<BufferPtr, false> bufferQueue_;
197+
std::unique_ptr<AsyncScope> backgroundScope_;
198+
199+
MemoryPool& memoryPool_;
200+
201+
BufferPtr newBuffer() {
202+
return std::make_unique<Buffer>(memoryPool_, defaultInitialChunkSize_);
203+
}
204+
205+
void addBuffer() {
206+
auto status = bufferQueue_.try_enqueue_for(newBuffer(), timeout_);
207+
if (!status) {
208+
NIMBLE_UNKNOWN(
209+
"Timed out enqueuing for buffer. Timeout set to " +
210+
std::to_string(timeout_.count()) + " ms");
211+
}
212+
}
213+
214+
// clears all buffers in the pool
215+
void deleteBufferPool() {
216+
while (!bufferQueue_.empty()) {
217+
BufferPtr buffer;
218+
bufferQueue_.dequeue(buffer);
219+
buffer.reset();
220+
}
221+
}
106222
};
107223

108224
} // namespace facebook::nimble
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include <gtest/gtest.h>
18+
#include "dwio/nimble/common/Buffer.h"
19+
#include "dwio/nimble/common/Exceptions.h"
20+
#include "folly/executors/CPUThreadPoolExecutor.h"
21+
#include "velox/common/memory/Memory.h"
22+
#include "velox/dwio/common/ExecutorBarrier.h"
23+
24+
namespace facebook::nimble::test {
25+
using MemoryPool = velox::memory::MemoryPool;
26+
using ExecutorBarrier = velox::dwio::common::ExecutorBarrier;
27+
28+
class BufferPoolTest : public ::testing::Test {
29+
protected:
30+
static void SetUpTestCase() {}
31+
32+
void SetUp() override {
33+
memPool_ = velox::memory::deprecatedAddDefaultLeafMemoryPool();
34+
executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(1);
35+
}
36+
37+
std::shared_ptr<velox::memory::MemoryPool> memPool_;
38+
std::unique_ptr<folly::CPUThreadPoolExecutor> executor_;
39+
};
40+
41+
// Constructing a pool with default settings must not throw.
TEST_F(BufferPoolTest, CreateBufferPool) {
  // The construction itself sits inside the assertion; the extra parentheses
  // keep the comma in the braced initializer from splitting the macro
  // argument.
  EXPECT_NO_THROW((BufferPool{*memPool_, std::move(executor_)}));
}
45+
46+
// A pool asked for 10 initial buffers (max 10) reports a size of 10.
TEST_F(BufferPoolTest, CreateBufferPoolWithInitialSize) {
  BufferPool pool{
      *memPool_, std::move(executor_), std::chrono::milliseconds{10}, 10, 10};
  EXPECT_EQ(pool.size(), 10);
}
51+
52+
// A zero timeout must be rejected with a NimbleUserError carrying the
// matching message.
TEST_F(BufferPoolTest, CreateBufferPoolBadTimeout) {
  // Construct exactly once: executor_ is moved-from after the first attempt.
  // FAIL() makes the test fail when no exception is raised; an exception of
  // any other type escapes the test body and is reported by gtest.
  try {
    auto bufferPool = BufferPool{
        *memPool_, std::move(executor_), std::chrono::milliseconds{0}};
    FAIL() << "expected NimbleUserError";
  } catch (const NimbleUserError& e) {
    EXPECT_EQ(e.errorMessage(), "timeout must be > 0");
  }
}
65+
66+
// A max pool size of zero must be rejected with a NimbleUserError carrying
// the matching message.
TEST_F(BufferPoolTest, CreateBufferPoolBadMaxPool) {
  // Construct exactly once: executor_ is moved-from after the first attempt.
  // FAIL() makes the test fail when no exception is raised; an exception of
  // any other type escapes the test body and is reported by gtest.
  try {
    auto bufferPool = BufferPool{
        *memPool_, std::move(executor_), std::chrono::milliseconds{1}, 0};
    FAIL() << "expected NimbleUserError";
  } catch (const NimbleUserError& e) {
    EXPECT_EQ(e.errorMessage(), "max pool size must be > 0");
  }
}
79+
80+
// An initial buffer count of zero must be rejected with a NimbleUserError
// carrying the matching message.
TEST_F(BufferPoolTest, CreateBufferPoolBadInitialSize) {
  // Construct exactly once: executor_ is moved-from after the first attempt.
  // FAIL() makes the test fail when no exception is raised; an exception of
  // any other type escapes the test body and is reported by gtest.
  try {
    auto bufferPool = BufferPool{
        *memPool_, std::move(executor_), std::chrono::milliseconds{1}, 10, 0};
    FAIL() << "expected NimbleUserError";
  } catch (const NimbleUserError& e) {
    EXPECT_EQ(e.errorMessage(), "initial pool size must be > 0");
  }
}
92+
93+
// An initial buffer count larger than the max pool size must be rejected
// with a NimbleUserError carrying the matching message.
TEST_F(BufferPoolTest, CreateBufferPoolBadInitialSizeMaxSize) {
  // Construct exactly once: executor_ is moved-from after the first attempt.
  // FAIL() makes the test fail when no exception is raised; an exception of
  // any other type escapes the test body and is reported by gtest.
  try {
    auto bufferPool = BufferPool{
        *memPool_, std::move(executor_), std::chrono::milliseconds{1}, 1, 2};
    FAIL() << "expected NimbleUserError";
  } catch (const NimbleUserError& e) {
    EXPECT_EQ(e.errorMessage(), "initial pool size must be <= max pool size");
  }
}
106+
107+
// Reserving from a default-constructed pool yields a non-null buffer.
TEST_F(BufferPoolTest, ReserveBuffer) {
  BufferPool pool{*memPool_, std::move(executor_)};
  BufferPtr reserved = pool.reserveBuffer();
  EXPECT_NE(reserved, nullptr);
}
112+
113+
// Reserving the single initial buffer empties the pool; handing it back
// brings the size to one again.
TEST_F(BufferPoolTest, AddBuffer) {
  BufferPool pool{*memPool_, std::move(executor_)};

  BufferPtr reserved = pool.reserveBuffer();
  EXPECT_NE(reserved, nullptr);
  EXPECT_EQ(pool.size(), 0);

  pool.addBuffer(std::move(reserved));
  EXPECT_EQ(pool.size(), 1);
}
121+
122+
// Destroying a freshly created pool must not throw.
TEST_F(BufferPoolTest, DestroyBufferPool) {
  auto bufferPoolPtr =
      std::make_unique<BufferPool>(*memPool_, std::move(executor_));
  // The destruction itself sits inside the assertion so it is what gets
  // checked.
  EXPECT_NO_THROW(bufferPoolPtr.reset());
  EXPECT_EQ(bufferPoolPtr, nullptr);
}
128+
129+
// A pool capped at one buffer starts with size 1 and drops to 0 once that
// buffer has been reserved.
TEST_F(BufferPoolTest, emptyBufferPool) {
  BufferPool pool{
      *memPool_, std::move(executor_), std::chrono::milliseconds{10}, 1};
  EXPECT_EQ(pool.size(), 1);

  BufferPtr onlyBuffer = pool.reserveBuffer();
  EXPECT_EQ(pool.size(), 0);
}
137+
138+
// Adding more buffers than a pool capped at one can hold must raise a
// NimbleInternalError.
TEST_F(BufferPoolTest, OverfillBufferPool) {
  // Not guaranteed to fail at a size of 2 due to DMPMCQueue; the loop below
  // attempts five adds.
  BufferPool pool{
      *memPool_, std::move(executor_), std::chrono::milliseconds{10}, 1};

  const auto overfill = [&]() {
    int remaining = 5;
    while (remaining-- > 0) {
      BufferPtr extra = std::make_unique<Buffer>(*memPool_);
      pool.addBuffer(std::move(extra));
    }
  };

  EXPECT_THROW(overfill(), NimbleInternalError);
}
151+
152+
// Drains the pool, refills it with the same buffers, then drains it again,
// checking the pool size and the number of held buffers at every step.
TEST_F(BufferPoolTest, FillEmptyFillBufferPool) {
  constexpr size_t kIterations = 10;
  std::vector<BufferPtr> buffers;
  buffers.reserve(kIterations);

  auto bufferPool = BufferPool{
      *memPool_,
      std::move(executor_),
      std::chrono::milliseconds{10},
      kIterations};

  // Reserves kIterations buffers into `buffers`. The index is size_t so it
  // is compared against kIterations without a signed/unsigned mismatch.
  const auto reserveAll = [&]() {
    for (size_t i = 0; i < kIterations; ++i) {
      buffers.push_back(bufferPool.reserveBuffer());
    }
  };

  reserveAll();
  EXPECT_EQ(bufferPool.size(), 0U);
  EXPECT_EQ(buffers.size(), kIterations);

  for (auto& buffer : buffers) {
    bufferPool.addBuffer(std::move(buffer));
  }
  EXPECT_EQ(bufferPool.size(), kIterations);
  // Only moved-from (null) pointers remain in the vector at this point.
  buffers.clear();

  reserveAll();
  EXPECT_EQ(bufferPool.size(), 0U);
  EXPECT_EQ(buffers.size(), kIterations);
}
180+
181+
// Runs ten tasks on a ten-thread executor; each task repeatedly reserves a
// buffer, holds it briefly, and returns it. None of these steps may throw.
TEST_F(BufferPoolTest, ParallelFillPool) {
  folly::CPUThreadPoolExecutor executor{10};
  ExecutorBarrier barrier{executor};
  auto bufferPool = BufferPool{
      *memPool_,
      std::move(executor_),
      std::chrono::milliseconds{1000 * 10},
      100};

  auto fillPool = [&]() {
    for (auto i = 0; i < 10; i++) {
      // The braces keep the three statements together as one macro argument.
      EXPECT_NO_THROW({
        auto buffer = bufferPool.reserveBuffer();
        // Hold the buffer for a short while so the tasks overlap. A 10 ms
        // hold keeps the whole test around a tenth of a second rather than
        // the ten-plus seconds a 1 s hold per iteration costs.
        std::this_thread::sleep_for(std::chrono::milliseconds{10});
        bufferPool.addBuffer(std::move(buffer));
      });
    }
  };

  for (auto i = 0; i < 10; i++) {
    barrier.add(fillPool);
  }

  barrier.waitAll();
}
204+
205+
} // namespace facebook::nimble::test

0 commit comments

Comments
 (0)