Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/impl/cluster/kmeans_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,10 @@ KMeansCluster::find_nearest_one_with_hgraph(const float* query,
param.allocator_ = std::make_shared<SafeAllocator>(this->allocator_);
param.thread_pool_ = this->thread_pool_;
param.metric_ = MetricType::METRIC_TYPE_L2SQR;
auto max_degree = std::max(32, dim_ / 8);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Check for potential integer division issues with dim_ / 8.

If dim_ is less than 8, max_degree will default to 32. Please verify this is correct for small values of dim_.


auto hgraph = InnerIndexInterface::FastCreateIndex("hgraph|32|fp32", param);
auto hgraph =
InnerIndexInterface::FastCreateIndex(fmt::format("hgraph|{}|fp32", max_degree), param);
auto base = Dataset::Make();
Vector<int64_t> ids(k, allocator_);
std::iota(ids.begin(), ids.end(), 0);
Expand All @@ -263,6 +265,7 @@ KMeansCluster::find_nearest_one_with_hgraph(const float* query,
->Ids(ids.data())
->Owner(false);
hgraph->Build(base);
hgraph->SetImmutable();
FilterPtr filter = nullptr;
constexpr const char* search_param = R"({"hgraph":{"ef_search":10}})";
auto func = [&](const uint64_t begin, const uint64_t end) -> void {
Expand Down
87 changes: 39 additions & 48 deletions src/utils/resource_object_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,89 +33,80 @@ template <typename T,
class ResourceObjectPool {
public:
using ConstructFuncType = std::function<std::shared_ptr<T>()>;
static constexpr uint64_t kSubPoolCount = 16;

public:
template <typename... Args>
explicit ResourceObjectPool(uint64_t init_size, Allocator* allocator, Args... args)
: allocator_(allocator), pool_size_(init_size) {
: allocator_(allocator), init_size_(init_size) {
this->constructor_ = [=]() -> std::shared_ptr<T> { return std::make_shared<T>(args...); };
if (allocator_ == nullptr) {
this->owned_allocator_ = SafeAllocator::FactoryDefaultAllocator();
this->allocator_ = owned_allocator_.get();
}
this->pool_ = std::make_unique<Deque<std::shared_ptr<T>>>(this->allocator_);
this->resize(pool_size_);
for (int i = 0; i < kSubPoolCount; ++i) {
pool_[i] = std::make_unique<Deque<std::shared_ptr<T>>>(this->allocator_);
}
this->fill(init_size_);
}

~ResourceObjectPool() {
if (owned_allocator_ != nullptr) {
this->pool_.reset();
}
}

void
SetConstructor(ConstructFuncType func) {
this->constructor_ = func;
{
std::lock_guard<std::mutex> lock(mutex_);
while (not pool_->empty()) {
pool_->pop_front();
for (int i = 0; i < kSubPoolCount; ++i) {
pool_[i].reset();
}
}
this->resize(pool_size_);
}

std::shared_ptr<T>
TakeOne() {
std::unique_lock<std::mutex> lock(mutex_);
if (pool_->empty()) {
lock.unlock();
return this->constructor_();
while (true) {
for (int i = 0; i < kSubPoolCount; ++i) {
if (sub_pool_mutexes_[i].try_lock()) {
if (pool_[i]->empty()) {
sub_pool_mutexes_[i].unlock();
return this->constructor_();
}
std::shared_ptr<T> obj = pool_[i]->front();
pool_[i]->pop_front();
sub_pool_mutexes_[i].unlock();
obj->Reset();
return obj;
}
}
}
std::shared_ptr<T> obj = pool_->front();
pool_->pop_front();
pool_size_--;
lock.unlock();
obj->Reset();
return obj;
}

void
ReturnOne(std::shared_ptr<T>& obj) {
std::lock_guard<std::mutex> lock(mutex_);
pool_->emplace_back(obj);
pool_size_++;
}

[[nodiscard]] inline uint64_t
GetSize() const {
return this->pool_size_;
while (true) {
for (int i = 0; i < kSubPoolCount; ++i) {
if (sub_pool_mutexes_[i].try_lock()) {
pool_[i]->emplace_back(obj);
sub_pool_mutexes_[i].unlock();
return;
}
}
}
}

private:
inline void
resize(uint64_t size) {
std::lock_guard<std::mutex> lock(mutex_);
int count = size - pool_->size();
while (count > 0) {
pool_->emplace_back(this->constructor_());
--count;
}
while (count < 0) {
pool_->pop_front();
++count;
fill(uint64_t size) {
for (uint64_t i = 0; i < size; ++i) {
auto sub_pool_idx = i % kSubPoolCount;
pool_[sub_pool_idx]->emplace_back(this->constructor_());
}
}

std::unique_ptr<Deque<std::shared_ptr<T>>> pool_{nullptr};
std::atomic<uint64_t> pool_size_;
private:
std::unique_ptr<Deque<std::shared_ptr<T>>> pool_[kSubPoolCount];
std::mutex sub_pool_mutexes_[kSubPoolCount];
uint64_t init_size_{0};

ConstructFuncType constructor_{nullptr};
std::mutex mutex_;
Allocator* allocator_{nullptr};

private:
std::shared_ptr<Allocator> owned_allocator_{nullptr};
};

} // namespace vsag
18 changes: 0 additions & 18 deletions src/utils/visited_list_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,6 @@ TEST_CASE("VisitedListPool Basic Test", "[ut][VisitedListPool]") {
}
};

SECTION("test basic") {
std::vector<std::shared_ptr<VisitedList>> lists;
REQUIRE(pool->GetSize() == init_size);
lists.reserve(init_size * 2);
for (auto i = 0; i < init_size * 2; ++i) {
lists.emplace_back(pool->TakeOne());
}
REQUIRE(pool->GetSize() == 0);
for (auto& ptr : lists) {
pool->ReturnOne(ptr);
}
REQUIRE(pool->GetSize() == init_size * 2);

auto ptr = pool->TakeOne();
REQUIRE(pool->GetSize() == init_size * 2 - 1);
TestVL(ptr);
}

SECTION("test concurrency") {
auto func = [&]() {
int count = 10;
Expand Down