Skip to content

Commit a3493d0

Browse files
Scott Youngfacebook-github-bot
Scott Young
authored andcommitted
add bufferPool for nimble parallel writer (facebookincubator#103)
Summary: change the buffer class to a bufferPool to handle multithreaded buffers without mutexes. Differential Revision: D64774959
1 parent 5c87b8d commit a3493d0

File tree

7 files changed

+323
-33
lines changed

7 files changed

+323
-33
lines changed

dwio/nimble/common/Buffer.h

Lines changed: 101 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
#pragma once
1717

18+
#include "dwio/nimble/common/Exceptions.h"
19+
#include "folly/concurrency/DynamicBoundedQueue.h"
1820
#include "velox/buffer/Buffer.h"
1921
#include "velox/common/memory/Memory.h"
2022

@@ -33,12 +35,11 @@
3335
// and so on
3436

3537
namespace facebook::nimble {
38+
using MemoryPool = facebook::velox::memory::MemoryPool;
3639

3740
// Internally manages memory in chunks. Releases memory only upon destruction.
3841
// Buffer is NOT threadsafe: external locking is required.
3942
class Buffer {
40-
using MemoryPool = facebook::velox::memory::MemoryPool;
41-
4243
public:
4344
explicit Buffer(
4445
MemoryPool& memoryPool,
@@ -52,7 +53,6 @@ class Buffer {
5253
// to, and guarantees for the lifetime of *this that that region will remain
5354
// valid. Does NOT guarantee that the region is initially 0'd.
5455
char* reserve(uint64_t bytes) {
55-
std::scoped_lock<std::mutex> l(mutex_);
5656
if (reserveEnd_ + bytes <= chunkEnd_) {
5757
pos_ = reserveEnd_;
5858
reserveEnd_ += bytes;
@@ -98,11 +98,104 @@ class Buffer {
9898
char* reserveEnd_;
9999
std::vector<velox::BufferPtr> chunks_;
100100
MemoryPool& memoryPool_;
101-
// NOTE: this is temporary fix, to quickly enable parallel access to the
102-
// buffer class. In the near future, we are going to templetize this class to
103-
// produce a concurrent and a non-concurrent variants, and change the call
104-
// sites to use each variant when needed.
105-
std::mutex mutex_;
101+
};
102+
103+
using BufferPtr = std::unique_ptr<Buffer>;
104+
105+
// Manages a pool of buffers. Buffers are returned to the pool when released.
106+
// maxPoolSize should be set to at least 90% of capacity for performance
107+
class BufferPool {
108+
public:
109+
explicit BufferPool(
110+
MemoryPool& memoryPool,
111+
std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10),
112+
size_t maxPoolSize = std::thread::hardware_concurrency(),
113+
size_t initialBufferCount = 1,
114+
uint64_t initialChunkSize = kMinChunkSize)
115+
: defaultInitialChunkSize_{initialChunkSize},
116+
timeout_{timeout},
117+
bufferQueue_{maxPoolSize},
118+
memoryPool_{memoryPool} {
119+
NIMBLE_CHECK(maxPoolSize > 0, "max pool size must be > 0")
120+
NIMBLE_CHECK(timeout_.count() > 0, "timeout must be > 0")
121+
NIMBLE_CHECK(0 < initialBufferCount, "initial pool size must be > 0");
122+
NIMBLE_CHECK(
123+
initialBufferCount <= maxPoolSize,
124+
"initial pool size must be <= max pool size")
125+
126+
for (size_t i = 0; i < initialBufferCount; ++i) {
127+
addBuffer();
128+
}
129+
}
130+
~BufferPool() {
131+
deleteBufferPool();
132+
}
133+
134+
MemoryPool& getMemoryPool() {
135+
return memoryPool_;
136+
}
137+
138+
// Releases the buffer back to the pool.
139+
void addBuffer(BufferPtr buffer) {
140+
auto status = bufferQueue_.try_enqueue_for(std::move(buffer), timeout_);
141+
if (!status) {
142+
NIMBLE_UNKNOWN(
143+
"Timed out enqueuing for buffer. Timeout set to " +
144+
std::to_string(timeout_.count()) + " ms");
145+
}
146+
}
147+
148+
// Reserves a buffer from the pool. Adds a new buffer to the pool
149+
// while there are buffers available
150+
BufferPtr reserveBuffer() {
151+
if (bufferQueue_.empty()) {
152+
return newBuffer();
153+
}
154+
155+
BufferPtr buffer;
156+
auto status = bufferQueue_.try_dequeue_for(buffer, timeout_);
157+
if (!status) {
158+
NIMBLE_UNREACHABLE(
159+
"Timed out dequeuing for buffer. Timeout set to " +
160+
std::to_string(timeout_.count()) + " ms");
161+
}
162+
return buffer;
163+
}
164+
165+
// Returns estimated number of buffers in the pool
166+
size_t size() {
167+
return bufferQueue_.size();
168+
}
169+
170+
private:
171+
static const uint64_t kMinChunkSize = 1LL << 20;
172+
const uint64_t defaultInitialChunkSize_ = kMinChunkSize;
173+
const std::chrono::milliseconds timeout_ = std::chrono::milliseconds(1000);
174+
175+
folly::DMPMCQueue<BufferPtr, true> bufferQueue_;
176+
MemoryPool& memoryPool_;
177+
178+
BufferPtr newBuffer() {
179+
return std::make_unique<Buffer>(memoryPool_, defaultInitialChunkSize_);
180+
}
181+
182+
void addBuffer() {
183+
auto status = bufferQueue_.try_enqueue_for(newBuffer(), timeout_);
184+
if (!status) {
185+
NIMBLE_UNKNOWN(
186+
"Timed out enqueuing for buffer. Timeout set to " +
187+
std::to_string(timeout_.count()) + " ms");
188+
}
189+
}
190+
191+
// clears all buffers in the pool
192+
void deleteBufferPool() {
193+
while (!bufferQueue_.empty()) {
194+
BufferPtr buffer;
195+
bufferQueue_.dequeue(buffer);
196+
buffer.reset();
197+
}
198+
}
106199
};
107200

108201
} // namespace facebook::nimble
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include <gtest/gtest.h>
18+
#include "dwio/nimble/common/Buffer.h"
19+
#include "dwio/nimble/common/Exceptions.h"
20+
#include "folly/executors/CPUThreadPoolExecutor.h"
21+
#include "velox/common/memory/Memory.h"
22+
#include "velox/dwio/common/ExecutorBarrier.h"
23+
24+
namespace facebook::nimble::test {
25+
using MemoryPool = velox::memory::MemoryPool;
26+
using ExecutorBarrier = velox::dwio::common::ExecutorBarrier;
27+
28+
class BufferPoolTest : public ::testing::Test {
29+
protected:
30+
static void SetUpTestCase() {}
31+
32+
void SetUp() override {
33+
memPool_ = velox::memory::deprecatedAddDefaultLeafMemoryPool();
34+
}
35+
36+
std::shared_ptr<velox::memory::MemoryPool> memPool_;
37+
};
38+
39+
// Constructing a pool with only the required argument must not throw. The
// construction itself has to sit inside EXPECT_NO_THROW for the assertion to
// observe it.
TEST_F(BufferPoolTest, CreateBufferPool) {
  EXPECT_NO_THROW(BufferPool{*memPool_});
}
43+
44+
// A pool asked to pre-create N buffers reports a size of N.
TEST_F(BufferPoolTest, CreateBufferPoolWithInitialSize) {
  BufferPool pool{*memPool_, std::chrono::milliseconds{10}, 10, 10};
  const auto pooledBuffers = pool.size();
  EXPECT_EQ(pooledBuffers, 10);
}
49+
50+
// A zero timeout is rejected with a NimbleUserError carrying the expected
// message.
TEST_F(BufferPoolTest, CreateBufferPoolBadTimeout) {
  const auto construct = [this]() {
    BufferPool pool{*memPool_, std::chrono::milliseconds{0}};
  };
  EXPECT_THROW(construct(), NimbleUserError);

  // Second attempt inspects the message of the thrown error.
  try {
    construct();
  } catch (const NimbleUserError& error) {
    EXPECT_EQ(error.errorMessage(), "timeout must be > 0");
  }
}
62+
63+
// A maximum pool size of zero is rejected with a NimbleUserError carrying the
// expected message.
TEST_F(BufferPoolTest, CreateBufferPoolBadMaxPool) {
  const auto construct = [this]() {
    BufferPool pool{*memPool_, std::chrono::milliseconds{1}, 0};
  };
  EXPECT_THROW(construct(), NimbleUserError);

  // Second attempt inspects the message of the thrown error.
  try {
    construct();
  } catch (const NimbleUserError& error) {
    EXPECT_EQ(error.errorMessage(), "max pool size must be > 0");
  }
}
75+
76+
// An initial buffer count of zero is rejected with a NimbleUserError carrying
// the expected message.
TEST_F(BufferPoolTest, CreateBufferPoolBadInitialSize) {
  const auto construct = [this]() {
    BufferPool pool{*memPool_, std::chrono::milliseconds{1}, 10, 0};
  };
  EXPECT_THROW(construct(), NimbleUserError);

  // Second attempt inspects the message of the thrown error.
  try {
    construct();
  } catch (const NimbleUserError& error) {
    EXPECT_EQ(error.errorMessage(), "initial pool size must be > 0");
  }
}
88+
89+
// An initial buffer count larger than the maximum pool size is rejected with a
// NimbleUserError carrying the expected message.
TEST_F(BufferPoolTest, CreateBufferPoolBadInitialSizeMaxSize) {
  const auto construct = [this]() {
    BufferPool pool{*memPool_, std::chrono::milliseconds{1}, 1, 2};
  };
  EXPECT_THROW(construct(), NimbleUserError);

  // Second attempt inspects the message of the thrown error.
  try {
    construct();
  } catch (const NimbleUserError& error) {
    EXPECT_EQ(
        error.errorMessage(), "initial pool size must be <= max pool size");
  }
}
101+
102+
// Reserving from a default-constructed pool yields a non-null buffer.
TEST_F(BufferPoolTest, ReserveBuffer) {
  BufferPool pool{*memPool_};
  auto reserved = pool.reserveBuffer();
  EXPECT_NE(reserved, nullptr);
}
107+
108+
// Reserving the only pooled buffer empties the pool; returning it brings the
// reported size back to one.
TEST_F(BufferPoolTest, AddBuffer) {
  BufferPool pool{*memPool_};

  auto reserved = pool.reserveBuffer();
  EXPECT_NE(reserved, nullptr);
  EXPECT_EQ(pool.size(), 0);

  pool.addBuffer(std::move(reserved));
  EXPECT_EQ(pool.size(), 1);
}
116+
117+
// Destroying a pool must not throw. The reset() call that runs the destructor
// sits inside EXPECT_NO_THROW so the assertion actually covers it.
TEST_F(BufferPoolTest, DestroyBufferPool) {
  auto bufferPoolPtr = std::make_unique<BufferPool>(*memPool_);
  EXPECT_NO_THROW(bufferPoolPtr.reset());
  EXPECT_EQ(bufferPoolPtr, nullptr);
}
122+
123+
// A pool created with one buffer reports size one, and size zero once that
// buffer has been reserved.
TEST_F(BufferPoolTest, emptyBufferPool) {
  BufferPool pool{*memPool_, std::chrono::milliseconds{10}, 1};
  EXPECT_EQ(pool.size(), 1);

  auto onlyBuffer = pool.reserveBuffer();
  EXPECT_EQ(pool.size(), 0);
}
130+
131+
// Adding more buffers than the pool can hold eventually raises a
// NimbleInternalError.
TEST_F(BufferPoolTest, OverfillBufferPool) {
  // Not guaranteed to fail at a size of 2 due to DMPMCQueue, so several extra
  // buffers are pushed.
  BufferPool pool{*memPool_, std::chrono::milliseconds{10}, 1};
  const auto overfill = [&]() {
    int added = 0;
    while (added < 5) {
      pool.addBuffer(std::make_unique<Buffer>(*memPool_));
      ++added;
    }
  };

  EXPECT_THROW(overfill(), NimbleInternalError);
}
143+
144+
// Drains the pool, returns every buffer, then drains it again, checking the
// reported pool size after each phase.
TEST_F(BufferPoolTest, FillEmptyFillBufferPool) {
  const size_t iterations = 10;
  std::vector<BufferPtr> buffers;

  auto bufferPool =
      BufferPool{*memPool_, std::chrono::milliseconds{10}, iterations};

  // Reserves `iterations` buffers and keeps them alive in `buffers`. The index
  // is size_t so it compares against `iterations` without a sign mismatch.
  const auto reserveAll = [&]() {
    for (size_t i = 0; i < iterations; ++i) {
      buffers.push_back(bufferPool.reserveBuffer());
    }
  };

  reserveAll();
  EXPECT_EQ(bufferPool.size(), 0);
  EXPECT_EQ(buffers.size(), iterations);

  for (auto& buffer : buffers) {
    bufferPool.addBuffer(std::move(buffer));
  }
  EXPECT_EQ(bufferPool.size(), iterations);
  // Only moved-from (null) pointers remain; drop them before refilling.
  buffers.clear();

  reserveAll();
  EXPECT_EQ(bufferPool.size(), 0);
  EXPECT_EQ(buffers.size(), iterations);
}
169+
170+
// Runs ten tasks on a ten-thread executor; each task repeatedly reserves a
// buffer, holds it for a second, and returns it. None of this may throw.
TEST_F(BufferPoolTest, ParallelFillPool) {
  folly::CPUThreadPoolExecutor executor{10};
  ExecutorBarrier barrier{executor};
  BufferPool pool{*memPool_, std::chrono::milliseconds{1000 * 10}, 100};

  const auto reserveAndReturn = [&]() {
    for (int round = 0; round < 10; ++round) {
      EXPECT_NO_THROW(
          auto held = pool.reserveBuffer();
          std::this_thread::sleep_for(std::chrono::milliseconds{1000});
          pool.addBuffer(std::move(held)););
    }
  };

  for (int task = 0; task < 10; ++task) {
    barrier.add(reserveAndReturn);
  }

  barrier.waitAll();
}
190+
191+
} // namespace facebook::nimble::test

dwio/nimble/velox/FieldWriter.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,6 @@ class SimpleFieldWriter : public FieldWriter {
285285
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
286286
override {
287287
auto size = ranges.size();
288-
auto& buffer = context_.stringBuffer();
289288
auto& data = valuesStream_.mutableData();
290289

291290
if (auto flat = vector->asFlatVector<SourceType>()) {
@@ -327,8 +326,11 @@ class SimpleFieldWriter : public FieldWriter {
327326
valuesStream_.mutableNonNulls(),
328327
Flat<SourceType>{vector},
329328
[&](SourceType value) {
329+
auto bufferPtr = context_.bufferPool().reserveBuffer();
330+
auto& buffer = *bufferPtr;
330331
data.push_back(
331332
C::convert(value, buffer, valuesStream_.extraMemory()));
333+
context_.bufferPool().addBuffer(std::move(bufferPtr));
332334
});
333335
}
334336
} else {
@@ -340,8 +342,11 @@ class SimpleFieldWriter : public FieldWriter {
340342
valuesStream_.mutableNonNulls(),
341343
Decoded<SourceType>{decoded},
342344
[&](SourceType value) {
345+
auto bufferPtr = context_.bufferPool().reserveBuffer();
346+
auto& buffer = *bufferPtr;
343347
data.push_back(
344348
C::convert(value, buffer, valuesStream_.extraMemory()));
349+
context_.bufferPool().addBuffer(std::move(bufferPtr));
345350
});
346351
context_.decodingPairPool().addPair(std::move(pair));
347352
}

0 commit comments

Comments
 (0)