Skip to content

Commit 74f7dc9

Browse files
committed
Issues calling reclaimer / arbitrator APIs in single-thread execution (5790)
1 parent c4ceb71 commit 74f7dc9

17 files changed

+145
-70
lines changed

velox/common/memory/Memory.cpp

+27-13
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
6666
allocator_->capacity(),
6767
capacity_);
6868
VELOX_USER_CHECK_GE(capacity_, 0);
69+
if (arbitrator_ != nullptr) {
70+
VELOX_CHECK_EQ(arbitrator_->capacity(), capacity_);
71+
}
6972
MemoryAllocator::alignmentCheck(0, alignment_);
7073
defaultRoot_->grow(defaultRoot_->maxCapacity());
7174
const size_t numSharedPools =
@@ -129,19 +132,22 @@ std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
129132
options.checkUsageLeak = checkUsageLeak_;
130133
options.debugEnabled = debugEnabled_;
131134

132-
folly::SharedMutex::WriteHolder guard{mutex_};
133-
if (pools_.find(poolName) != pools_.end()) {
134-
VELOX_FAIL("Duplicate root pool name found: {}", poolName);
135+
std::shared_ptr<MemoryPool> pool;
136+
{
137+
folly::SharedMutex::WriteHolder guard{mutex_};
138+
if (pools_.find(poolName) != pools_.end()) {
139+
VELOX_FAIL("Duplicate root pool name found: {}", poolName);
140+
}
141+
pool = std::make_shared<MemoryPoolImpl>(
142+
this,
143+
poolName,
144+
MemoryPool::Kind::kAggregate,
145+
nullptr,
146+
std::move(reclaimer),
147+
poolDestructionCb_,
148+
options);
149+
pools_.emplace(poolName, pool);
135150
}
136-
auto pool = std::make_shared<MemoryPoolImpl>(
137-
this,
138-
poolName,
139-
MemoryPool::Kind::kAggregate,
140-
nullptr,
141-
std::move(reclaimer),
142-
poolDestructionCb_,
143-
options);
144-
pools_.emplace(poolName, pool);
145151
VELOX_CHECK_EQ(pool->capacity(), 0);
146152
arbitrator_->reserveMemory(pool.get(), capacity);
147153
return pool;
@@ -158,6 +164,14 @@ std::shared_ptr<MemoryPool> MemoryManager::addLeafPool(
158164
return defaultRoot_->addLeafChild(poolName, threadSafe, nullptr);
159165
}
160166

167+
uint64_t MemoryManager::shrinkPool(MemoryPool* pool, uint64_t decrementBytes) {
168+
VELOX_CHECK_NOT_NULL(pool);
169+
if (arbitrator_ == nullptr) {
170+
return pool->shrink(decrementBytes);
171+
}
172+
return arbitrator_->releaseMemory(pool, decrementBytes);
173+
}
174+
161175
bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) {
162176
VELOX_CHECK_NOT_NULL(pool);
163177
VELOX_CHECK_NE(pool->capacity(), kMaxMemory);
@@ -176,7 +190,7 @@ void MemoryManager::dropPool(MemoryPool* pool) {
176190
VELOX_FAIL("The dropped memory pool {} not found", pool->name());
177191
}
178192
pools_.erase(it);
179-
arbitrator_->releaseMemory(pool);
193+
arbitrator_->releaseMemory(pool, 0);
180194
}
181195

182196
MemoryPool& MemoryManager::deprecatedSharedLeafPool() {

velox/common/memory/Memory.h

+4
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ class MemoryManager {
161161
const std::string& name = "",
162162
bool threadSafe = true);
163163

164+
/// Invoked to shrink a memory pool's free capacity with up to
165+
/// 'decrementBytes'.
166+
uint64_t shrinkPool(MemoryPool* pool, uint64_t decrementBytes);
167+
164168
/// Invoked to grows a memory pool's free capacity with at least
165169
/// 'incrementBytes'. The function returns true on success, otherwise false.
166170
bool growPool(MemoryPool* pool, uint64_t incrementBytes);

velox/common/memory/MemoryArbitrator.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,9 @@ class NoopArbitrator : public MemoryArbitrator {
9696

9797
// Noop arbitrator has no memory capacity limit so no operation needed for
9898
// memory pool capacity release.
99-
void releaseMemory(MemoryPool* /*unused*/) override {
99+
uint64_t releaseMemory(MemoryPool* /*unused*/, uint64_t /*unused*/) override {
100100
// No-op
101+
return 0ULL;
101102
}
102103

103104
// Noop arbitrator has no memory capacity limit so no operation needed for
@@ -161,6 +162,10 @@ void MemoryArbitrator::unregisterAllFactories() {
161162
SharedArbitrator::unregisterFactory();
162163
}
163164

165+
uint64_t MemoryArbitrator::capacity() {
166+
return capacity_;
167+
}
168+
164169
std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create() {
165170
return std::unique_ptr<MemoryReclaimer>(new MemoryReclaimer());
166171
}

velox/common/memory/MemoryArbitrator.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,10 @@ class MemoryArbitrator {
120120
/// the memory arbitration on demand when actual memory allocation happens.
121121
virtual void reserveMemory(MemoryPool* pool, uint64_t bytes) = 0;
122122

123-
/// Invoked by the memory manager to return back all the reserved memory
124-
/// capacity of a destroying memory pool.
125-
virtual void releaseMemory(MemoryPool* pool) = 0;
123+
/// Invoked by the memory manager to return back the specified amount of
124+
/// reserved memory capacity of a destroying memory pool. If 0 is specified,
125+
/// release all reserve memory. Returns the actually released amount of bytes.
126+
virtual uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) = 0;
126127

127128
/// Invoked by the memory manager to grow a memory pool's capacity.
128129
/// 'pool' is the memory pool to request to grow. 'candidates' is a list
@@ -148,6 +149,7 @@ class MemoryArbitrator {
148149
const std::vector<std::shared_ptr<MemoryPool>>& pools,
149150
uint64_t targetBytes) = 0;
150151

152+
uint64_t capacity();
151153
/// The internal execution stats of the memory arbitrator.
152154
struct Stats {
153155
/// The number of arbitration requests.

velox/common/memory/MemoryPool.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,16 @@ bool MemoryPoolImpl::incrementReservationThreadSafe(
756756
treeMemoryUsage()));
757757
}
758758

759+
uint64_t MemoryPoolImpl::shrinkManaged(
760+
MemoryPool* requestor,
761+
uint64_t targetBytes) {
762+
if (parent_ != nullptr) {
763+
return parent_->shrinkManaged(requestor, targetBytes);
764+
}
765+
VELOX_CHECK_NULL(parent_);
766+
return manager_->shrinkPool(requestor, targetBytes);
767+
};
768+
759769
bool MemoryPoolImpl::maybeIncrementReservation(uint64_t size) {
760770
std::lock_guard<std::mutex> l(mutex_);
761771
return maybeIncrementReservationLocked(size);

velox/common/memory/MemoryPool.h

+8
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,12 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
363363
/// without actually freeing the used memory.
364364
virtual uint64_t freeBytes() const = 0;
365365

366+
/// Try shrinking up to the specified amount of free memory via memory
367+
/// manager.
368+
virtual uint64_t shrinkManaged(
369+
MemoryPool* requestor,
370+
uint64_t targetBytes = 0) = 0;
371+
366372
/// Invoked to free up to the specified amount of free memory by reducing
367373
/// this memory pool's capacity without actually freeing any used memory. The
368374
/// function returns the actually freed memory capacity in bytes. If
@@ -623,6 +629,8 @@ class MemoryPoolImpl : public MemoryPool {
623629

624630
uint64_t reclaim(uint64_t targetBytes) override;
625631

632+
uint64_t shrinkManaged(MemoryPool* requestor, uint64_t targetBytes) override;
633+
626634
uint64_t shrink(uint64_t targetBytes = 0) override;
627635

628636
uint64_t grow(uint64_t bytes) noexcept override;

velox/common/memory/SharedArbitrator.cpp

+37-30
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,11 @@ void SharedArbitrator::reserveMemory(MemoryPool* pool, uint64_t /*unused*/) {
163163
pool->grow(reserveBytes);
164164
}
165165

166-
void SharedArbitrator::releaseMemory(MemoryPool* pool) {
166+
uint64_t SharedArbitrator::releaseMemory(MemoryPool* pool, uint64_t bytes) {
167167
std::lock_guard<std::mutex> l(mutex_);
168-
const uint64_t freedBytes = pool->shrink(0);
168+
const uint64_t freedBytes = pool->shrink(bytes);
169169
incrementFreeCapacityLocked(freedBytes);
170+
return freedBytes;
170171
}
171172

172173
std::vector<SharedArbitrator::Candidate> SharedArbitrator::getCandidateStats(
@@ -245,10 +246,7 @@ bool SharedArbitrator::ensureCapacity(
245246
if (checkCapacityGrowth(*requestor, targetBytes)) {
246247
return true;
247248
}
248-
const uint64_t reclaimedBytes = reclaim(requestor, targetBytes);
249-
// NOTE: return the reclaimed bytes back to the arbitrator and let the memory
250-
// arbitration process to grow the requestor's memory capacity accordingly.
251-
incrementFreeCapacity(reclaimedBytes);
249+
reclaim(requestor, targetBytes);
252250
// Check if the requestor has been aborted in reclaim operation above.
253251
if (requestor->aborted()) {
254252
++numFailures_;
@@ -293,51 +291,57 @@ bool SharedArbitrator::arbitrateMemory(
293291
const uint64_t growTarget = std::min(
294292
maxGrowBytes(*requestor),
295293
std::max(memoryPoolTransferCapacity_, targetBytes));
296-
uint64_t freedBytes = decrementFreeCapacity(growTarget);
297-
if (freedBytes >= targetBytes) {
298-
requestor->grow(freedBytes);
299-
return true;
300-
}
301-
VELOX_CHECK_LT(freedBytes, growTarget);
294+
uint64_t unusedFreedBytes = decrementFreeCapacity(growTarget);
302295

303296
auto freeGuard = folly::makeGuard([&]() {
304297
// Returns the unused freed memory capacity back to the arbitrator.
305-
if (freedBytes > 0) {
306-
incrementFreeCapacity(freedBytes);
298+
if (unusedFreedBytes > 0) {
299+
incrementFreeCapacity(unusedFreedBytes);
307300
}
308301
});
309302

310-
freedBytes +=
311-
reclaimFreeMemoryFromCandidates(candidates, growTarget - freedBytes);
312-
if (freedBytes >= targetBytes) {
313-
const uint64_t bytesToGrow = std::min(growTarget, freedBytes);
314-
requestor->grow(bytesToGrow);
315-
freedBytes -= bytesToGrow;
303+
if (unusedFreedBytes >= targetBytes) {
304+
requestor->grow(unusedFreedBytes);
305+
unusedFreedBytes = 0;
306+
return true;
307+
}
308+
VELOX_CHECK_LT(unusedFreedBytes, growTarget);
309+
310+
reclaimFreeMemoryFromCandidates(candidates, growTarget - unusedFreedBytes);
311+
unusedFreedBytes += decrementFreeCapacity(growTarget - unusedFreedBytes);
312+
if (unusedFreedBytes >= targetBytes) {
313+
requestor->grow(unusedFreedBytes);
314+
unusedFreedBytes = 0;
316315
return true;
317316
}
318317

319-
VELOX_CHECK_LT(freedBytes, growTarget);
320-
freedBytes += reclaimUsedMemoryFromCandidates(
321-
requestor, candidates, growTarget - freedBytes);
318+
VELOX_CHECK_LT(unusedFreedBytes, growTarget);
319+
reclaimUsedMemoryFromCandidates(
320+
requestor, candidates, growTarget - unusedFreedBytes);
321+
unusedFreedBytes += decrementFreeCapacity(growTarget - unusedFreedBytes);
322322
if (requestor->aborted()) {
323323
++numFailures_;
324324
VELOX_MEM_POOL_ABORTED("The requestor pool has been aborted.");
325325
}
326326

327327
VELOX_CHECK(!requestor->aborted());
328328

329-
if (freedBytes < targetBytes) {
329+
if (unusedFreedBytes < targetBytes) {
330330
VELOX_MEM_LOG(WARNING)
331331
<< "Failed to arbitrate sufficient memory for memory pool "
332332
<< requestor->name() << ", request " << succinctBytes(targetBytes)
333-
<< ", only " << succinctBytes(freedBytes)
333+
<< ", only " << succinctBytes(unusedFreedBytes)
334334
<< " has been freed, Arbitrator state: " << toString();
335335
return false;
336336
}
337337

338-
const uint64_t bytesToGrow = std::min(freedBytes, growTarget);
339-
requestor->grow(bytesToGrow);
340-
freedBytes -= bytesToGrow;
338+
if (unusedFreedBytes > growTarget) {
339+
requestor->grow(growTarget);
340+
unusedFreedBytes -= growTarget;
341+
return true;
342+
}
343+
requestor->grow(unusedFreedBytes);
344+
unusedFreedBytes = 0;
341345
return true;
342346
}
343347

@@ -358,7 +362,9 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
358362
if (bytesToShrink <= 0) {
359363
break;
360364
}
361-
freedBytes += candidate.pool->shrink(bytesToShrink);
365+
uint64_t shrunk = candidate.pool->shrink(bytesToShrink);
366+
incrementFreeCapacity(shrunk);
367+
freedBytes += shrunk;
362368
if (freedBytes >= targetBytes) {
363369
break;
364370
}
@@ -398,6 +404,7 @@ uint64_t SharedArbitrator::reclaim(
398404
uint64_t freedBytes{0};
399405
try {
400406
freedBytes = pool->shrink(targetBytes);
407+
incrementFreeCapacity(freedBytes);
401408
if (freedBytes < targetBytes) {
402409
pool->reclaim(targetBytes - freedBytes);
403410
}
@@ -407,7 +414,7 @@ uint64_t SharedArbitrator::reclaim(
407414
abort(pool, std::current_exception());
408415
// Free up all the free capacity from the aborted pool as the associated
409416
// query has failed at this point.
410-
pool->shrink();
417+
incrementFreeCapacity(pool->shrink());
411418
}
412419
const uint64_t newCapacity = pool->capacity();
413420
VELOX_CHECK_GE(oldCapacity, newCapacity);

velox/common/memory/SharedArbitrator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class SharedArbitrator : public MemoryArbitrator {
4343

4444
void reserveMemory(MemoryPool* pool, uint64_t /*unused*/) final;
4545

46-
void releaseMemory(MemoryPool* pool) final;
46+
uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) final;
4747

4848
bool growMemory(
4949
MemoryPool* pool,

velox/common/memory/tests/MemoryArbitratorTest.cpp

+2-6
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,9 @@ class FakeTestArbitrator : public MemoryArbitrator {
157157
.memoryPoolTransferCapacity = config.memoryPoolTransferCapacity,
158158
.retryArbitrationFailure = config.retryArbitrationFailure}) {}
159159

160-
void reserveMemory(MemoryPool* pool, uint64_t bytes) override {
161-
VELOX_NYI();
162-
}
160+
void reserveMemory(MemoryPool* pool, uint64_t bytes) override{VELOX_NYI()}
163161

164-
void releaseMemory(MemoryPool* pool) override {
165-
VELOX_NYI();
166-
}
162+
uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) override{VELOX_NYI()}
167163

168164
std::string kind() const override {
169165
return "USER";

velox/common/memory/tests/MemoryManagerTest.cpp

+1-4
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class FakeTestArbitrator : public MemoryArbitrator {
126126
VELOX_NYI();
127127
}
128128

129-
void releaseMemory(MemoryPool* pool) override {
129+
uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) override {
130130
VELOX_NYI();
131131
}
132132

@@ -210,9 +210,6 @@ TEST_F(MemoryManagerTest, addPoolWithArbitrator) {
210210
options.allocator = allocator.get();
211211
options.capacity = kCapacity;
212212
options.arbitratorKind = arbitratorKind_;
213-
// The arbitrator capacity will be overridden by the memory manager's
214-
// capacity.
215-
options.capacity = options.capacity;
216213
const uint64_t initialPoolCapacity = options.capacity / 32;
217214
options.memoryPoolInitCapacity = initialPoolCapacity;
218215
MemoryManager manager{options};

velox/common/memory/tests/MockSharedArbitratorTest.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ class MockMemoryOperator {
307307
for (const auto& allocation : allocationsToFree) {
308308
pool_->free(allocation.buffer, allocation.size);
309309
}
310-
return pool_->shrink(targetBytes);
310+
return pool_->shrinkManaged(pool, targetBytes);
311311
}
312312

313313
void abort(MemoryPool* pool) {

velox/core/QueryCtx.h

+15-5
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,13 @@ class QueryCtx {
6767
return cache_;
6868
}
6969

70-
folly::Executor* executor() const {
71-
if (executor_ != nullptr) {
72-
return executor_;
73-
}
74-
auto executor = executorKeepalive_.get();
70+
bool isExecutorSupplied() const {
71+
auto executor = executor0();
72+
return executor != nullptr;
73+
}
74+
75+
folly::Executor* FOLLY_NONNULL executor() const {
76+
auto executor = executor0();
7577
VELOX_CHECK(executor, "Executor was not supplied.");
7678
return executor;
7779
}
@@ -130,6 +132,14 @@ class QueryCtx {
130132
}
131133
}
132134

135+
folly::Executor* executor0() const {
136+
if (executor_ != nullptr) {
137+
return executor_;
138+
}
139+
auto executor = executorKeepalive_.get();
140+
return executor;
141+
}
142+
133143
const std::string queryId_;
134144

135145
std::unordered_map<std::string, std::shared_ptr<Config>> connectorConfigs_;

velox/dwio/dwrf/test/WriterFlushTest.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,10 @@ class MockMemoryPool : public velox::memory::MemoryPool {
208208
VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__);
209209
}
210210

211+
uint64_t shrinkManaged(MemoryPool* /*unused*/, uint64_t /*unused*/) override {
212+
VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__);
213+
}
214+
211215
uint64_t grow(uint64_t /*unused*/) noexcept override {
212216
VELOX_UNSUPPORTED("{} unsupported", __FUNCTION__);
213217
}

0 commit comments

Comments
 (0)