From 25c2eab6e59e6b43752d5ae400c284056c01be3b Mon Sep 17 00:00:00 2001 From: LHT129 Date: Fri, 10 Oct 2025 14:56:12 +0800 Subject: [PATCH] improvement: speed up kmeans on large dataset - vt pool waste too much time on fast search in HGraph Index Signed-off-by: LHT129 --- src/impl/cluster/kmeans_cluster.cpp | 5 +- src/utils/resource_object_pool.h | 87 +++++++++++++---------------- src/utils/visited_list_test.cpp | 18 ------ 3 files changed, 43 insertions(+), 67 deletions(-) diff --git a/src/impl/cluster/kmeans_cluster.cpp b/src/impl/cluster/kmeans_cluster.cpp index 534148d30..71adab101 100644 --- a/src/impl/cluster/kmeans_cluster.cpp +++ b/src/impl/cluster/kmeans_cluster.cpp @@ -252,8 +252,10 @@ KMeansCluster::find_nearest_one_with_hgraph(const float* query, param.allocator_ = std::make_shared(this->allocator_); param.thread_pool_ = this->thread_pool_; param.metric_ = MetricType::METRIC_TYPE_L2SQR; + auto max_degree = std::max(32, dim_ / 8); - auto hgraph = InnerIndexInterface::FastCreateIndex("hgraph|32|fp32", param); + auto hgraph = + InnerIndexInterface::FastCreateIndex(fmt::format("hgraph|{}|fp32", max_degree), param); auto base = Dataset::Make(); Vector ids(k, allocator_); std::iota(ids.begin(), ids.end(), 0); @@ -263,6 +265,7 @@ KMeansCluster::find_nearest_one_with_hgraph(const float* query, ->Ids(ids.data()) ->Owner(false); hgraph->Build(base); + hgraph->SetImmutable(); FilterPtr filter = nullptr; constexpr const char* search_param = R"({"hgraph":{"ef_search":10}})"; auto func = [&](const uint64_t begin, const uint64_t end) -> void { diff --git a/src/utils/resource_object_pool.h b/src/utils/resource_object_pool.h index 910fec39e..6f1a85607 100644 --- a/src/utils/resource_object_pool.h +++ b/src/utils/resource_object_pool.h @@ -33,89 +33,80 @@ template ()>; + static constexpr uint64_t kSubPoolCount = 16; public: template explicit ResourceObjectPool(uint64_t init_size, Allocator* allocator, Args... args) - : allocator_(allocator), pool_size_(init_size) { + : allocator_(allocator), init_size_(init_size) { this->constructor_ = [=]() -> std::shared_ptr { return std::make_shared(args...); }; if (allocator_ == nullptr) { this->owned_allocator_ = SafeAllocator::FactoryDefaultAllocator(); this->allocator_ = owned_allocator_.get(); } - this->pool_ = std::make_unique>>(this->allocator_); - this->resize(pool_size_); + for (int i = 0; i < kSubPoolCount; ++i) { + pool_[i] = std::make_unique>>(this->allocator_); + } + this->fill(init_size_); } ~ResourceObjectPool() { if (owned_allocator_ != nullptr) { - this->pool_.reset(); - } - } - - void - SetConstructor(ConstructFuncType func) { - this->constructor_ = func; - { - std::lock_guard lock(mutex_); - while (not pool_->empty()) { - pool_->pop_front(); + for (int i = 0; i < kSubPoolCount; ++i) { + pool_[i].reset(); } } - this->resize(pool_size_); } std::shared_ptr TakeOne() { - std::unique_lock lock(mutex_); - if (pool_->empty()) { - lock.unlock(); - return this->constructor_(); + while (true) { + for (int i = 0; i < kSubPoolCount; ++i) { + if (sub_pool_mutexes_[i].try_lock()) { + if (pool_[i]->empty()) { + sub_pool_mutexes_[i].unlock(); + return this->constructor_(); + } + std::shared_ptr obj = pool_[i]->front(); + pool_[i]->pop_front(); + sub_pool_mutexes_[i].unlock(); + obj->Reset(); + return obj; + } + } } - std::shared_ptr obj = pool_->front(); - pool_->pop_front(); - pool_size_--; - lock.unlock(); - obj->Reset(); - return obj; } void ReturnOne(std::shared_ptr& obj) { - std::lock_guard lock(mutex_); - pool_->emplace_back(obj); - pool_size_++; - } - - [[nodiscard]] inline uint64_t - GetSize() const { - return this->pool_size_; + while (true) { + for (int i = 0; i < kSubPoolCount; ++i) { + if (sub_pool_mutexes_[i].try_lock()) { + pool_[i]->emplace_back(obj); + sub_pool_mutexes_[i].unlock(); + return; + } + } + } } private: inline void - resize(uint64_t size) { - std::lock_guard lock(mutex_); - int count = size - pool_->size(); - while (count > 0) { - pool_->emplace_back(this->constructor_()); - --count; - } - while (count < 0) { - pool_->pop_front(); - ++count; + fill(uint64_t size) { + for (uint64_t i = 0; i < size; ++i) { + auto sub_pool_idx = i % kSubPoolCount; + pool_[sub_pool_idx]->emplace_back(this->constructor_()); } } - std::unique_ptr>> pool_{nullptr}; - std::atomic pool_size_; +private: + std::unique_ptr>> pool_[kSubPoolCount]; + std::mutex sub_pool_mutexes_[kSubPoolCount]; + uint64_t init_size_{0}; ConstructFuncType constructor_{nullptr}; - std::mutex mutex_; Allocator* allocator_{nullptr}; -private: std::shared_ptr owned_allocator_{nullptr}; }; - } // namespace vsag diff --git a/src/utils/visited_list_test.cpp b/src/utils/visited_list_test.cpp index f8a63dd54..457827a92 100644 --- a/src/utils/visited_list_test.cpp +++ b/src/utils/visited_list_test.cpp @@ -88,24 +88,6 @@ TEST_CASE("VisitedListPool Basic Test", "[ut][VisitedListPool]") { } }; - SECTION("test basic") { - std::vector> lists; - REQUIRE(pool->GetSize() == init_size); - lists.reserve(init_size * 2); - for (auto i = 0; i < init_size * 2; ++i) { - lists.emplace_back(pool->TakeOne()); - } - REQUIRE(pool->GetSize() == 0); - for (auto& ptr : lists) { - pool->ReturnOne(ptr); - } - REQUIRE(pool->GetSize() == init_size * 2); - - auto ptr = pool->TakeOne(); - REQUIRE(pool->GetSize() == init_size * 2 - 1); - TestVL(ptr); - } - SECTION("test concurrency") { auto func = [&]() { int count = 10;