Skip to content

Commit be5fa22

Browse files
committed
improvement: speed up kmeans on large dataset
- vt pool waste too much time on fast search in HGraph Index Signed-off-by: LHT129 <[email protected]>
1 parent fe957af commit be5fa22

File tree

3 files changed

+43
-67
lines changed

3 files changed

+43
-67
lines changed

src/impl/cluster/kmeans_cluster.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,10 @@ KMeansCluster::find_nearest_one_with_hgraph(const float* query,
252252
param.allocator_ = std::make_shared<SafeAllocator>(this->allocator_);
253253
param.thread_pool_ = this->thread_pool_;
254254
param.metric_ = MetricType::METRIC_TYPE_L2SQR;
255+
auto max_degree = std::max(32, dim_ / 8);
255256

256-
auto hgraph = InnerIndexInterface::FastCreateIndex("hgraph|32|fp32", param);
257+
auto hgraph =
258+
InnerIndexInterface::FastCreateIndex(fmt::format("hgraph|{}|fp32", max_degree), param);
257259
auto base = Dataset::Make();
258260
Vector<int64_t> ids(k, allocator_);
259261
std::iota(ids.begin(), ids.end(), 0);
@@ -263,6 +265,7 @@ KMeansCluster::find_nearest_one_with_hgraph(const float* query,
263265
->Ids(ids.data())
264266
->Owner(false);
265267
hgraph->Build(base);
268+
hgraph->SetImmutable();
266269
FilterPtr filter = nullptr;
267270
constexpr const char* search_param = R"({"hgraph":{"ef_search":10}})";
268271
auto func = [&](const uint64_t begin, const uint64_t end) -> void {

src/utils/resource_object_pool.h

Lines changed: 39 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -33,89 +33,80 @@ template <typename T,
3333
class ResourceObjectPool {
3434
public:
3535
using ConstructFuncType = std::function<std::shared_ptr<T>()>;
36+
static constexpr uint64_t kSubPoolCount = 16;
3637

3738
public:
3839
template <typename... Args>
3940
explicit ResourceObjectPool(uint64_t init_size, Allocator* allocator, Args... args)
40-
: allocator_(allocator), pool_size_(init_size) {
41+
: allocator_(allocator), init_size_(init_size) {
4142
this->constructor_ = [=]() -> std::shared_ptr<T> { return std::make_shared<T>(args...); };
4243
if (allocator_ == nullptr) {
4344
this->owned_allocator_ = SafeAllocator::FactoryDefaultAllocator();
4445
this->allocator_ = owned_allocator_.get();
4546
}
46-
this->pool_ = std::make_unique<Deque<std::shared_ptr<T>>>(this->allocator_);
47-
this->resize(pool_size_);
47+
for (int i = 0; i < kSubPoolCount; ++i) {
48+
pool_[i] = std::make_unique<Deque<std::shared_ptr<T>>>(this->allocator_);
49+
}
50+
this->fill(init_size_);
4851
}
4952

5053
~ResourceObjectPool() {
5154
if (owned_allocator_ != nullptr) {
52-
this->pool_.reset();
53-
}
54-
}
55-
56-
void
57-
SetConstructor(ConstructFuncType func) {
58-
this->constructor_ = func;
59-
{
60-
std::lock_guard<std::mutex> lock(mutex_);
61-
while (not pool_->empty()) {
62-
pool_->pop_front();
55+
for (int i = 0; i < kSubPoolCount; ++i) {
56+
pool_[i].reset();
6357
}
6458
}
65-
this->resize(pool_size_);
6659
}
6760

6861
std::shared_ptr<T>
6962
TakeOne() {
70-
std::unique_lock<std::mutex> lock(mutex_);
71-
if (pool_->empty()) {
72-
lock.unlock();
73-
return this->constructor_();
63+
while (true) {
64+
for (int i = 0; i < kSubPoolCount; ++i) {
65+
if (sub_pool_mutexes_[i].try_lock()) {
66+
if (pool_[i]->empty()) {
67+
sub_pool_mutexes_[i].unlock();
68+
return this->constructor_();
69+
}
70+
std::shared_ptr<T> obj = pool_[i]->front();
71+
pool_[i]->pop_front();
72+
sub_pool_mutexes_[i].unlock();
73+
obj->Reset();
74+
return obj;
75+
}
76+
}
7477
}
75-
std::shared_ptr<T> obj = pool_->front();
76-
pool_->pop_front();
77-
pool_size_--;
78-
lock.unlock();
79-
obj->Reset();
80-
return obj;
8178
}
8279

8380
void
8481
ReturnOne(std::shared_ptr<T>& obj) {
85-
std::lock_guard<std::mutex> lock(mutex_);
86-
pool_->emplace_back(obj);
87-
pool_size_++;
88-
}
89-
90-
[[nodiscard]] inline uint64_t
91-
GetSize() const {
92-
return this->pool_size_;
82+
while (true) {
83+
for (int i = 0; i < kSubPoolCount; ++i) {
84+
if (sub_pool_mutexes_[i].try_lock()) {
85+
pool_[i]->emplace_back(obj);
86+
sub_pool_mutexes_[i].unlock();
87+
return;
88+
}
89+
}
90+
}
9391
}
9492

9593
private:
9694
inline void
97-
resize(uint64_t size) {
98-
std::lock_guard<std::mutex> lock(mutex_);
99-
int count = size - pool_->size();
100-
while (count > 0) {
101-
pool_->emplace_back(this->constructor_());
102-
--count;
103-
}
104-
while (count < 0) {
105-
pool_->pop_front();
106-
++count;
95+
fill(uint64_t size) {
96+
for (uint64_t i = 0; i < size; ++i) {
97+
auto sub_pool_idx = i % kSubPoolCount;
98+
pool_[sub_pool_idx]->emplace_back(this->constructor_());
10799
}
108100
}
109101

110-
std::unique_ptr<Deque<std::shared_ptr<T>>> pool_{nullptr};
111-
std::atomic<uint64_t> pool_size_;
102+
private:
103+
std::unique_ptr<Deque<std::shared_ptr<T>>> pool_[kSubPoolCount];
104+
std::mutex sub_pool_mutexes_[kSubPoolCount];
105+
uint64_t init_size_{0};
112106

113107
ConstructFuncType constructor_{nullptr};
114-
std::mutex mutex_;
115108
Allocator* allocator_{nullptr};
116109

117-
private:
118110
std::shared_ptr<Allocator> owned_allocator_{nullptr};
119111
};
120-
121112
} // namespace vsag

src/utils/visited_list_test.cpp

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -88,24 +88,6 @@ TEST_CASE("VisitedListPool Basic Test", "[ut][VisitedListPool]") {
8888
}
8989
};
9090

91-
SECTION("test basic") {
92-
std::vector<std::shared_ptr<VisitedList>> lists;
93-
REQUIRE(pool->GetSize() == init_size);
94-
lists.reserve(init_size * 2);
95-
for (auto i = 0; i < init_size * 2; ++i) {
96-
lists.emplace_back(pool->TakeOne());
97-
}
98-
REQUIRE(pool->GetSize() == 0);
99-
for (auto& ptr : lists) {
100-
pool->ReturnOne(ptr);
101-
}
102-
REQUIRE(pool->GetSize() == init_size * 2);
103-
104-
auto ptr = pool->TakeOne();
105-
REQUIRE(pool->GetSize() == init_size * 2 - 1);
106-
TestVL(ptr);
107-
}
108-
10991
SECTION("test concurrency") {
11092
auto func = [&]() {
11193
int count = 10;

0 commit comments

Comments
 (0)