Skip to content

Commit 84ce245

Browse files
committed
feat(store): add batch exist support for master
1 parent 8d2e596 commit 84ce245

File tree

12 files changed

+431
-89
lines changed

12 files changed

+431
-89
lines changed

mooncake-integration/store/store_py.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,48 @@ int DistributedObjectStore::isExist(const std::string &key) {
494494
return toInt(err); // Error
495495
}
496496

497+
std::vector<int> DistributedObjectStore::batchIsExist(
498+
const std::vector<std::string> &keys) {
499+
std::vector<int> results;
500+
501+
if (!client_) {
502+
LOG(ERROR) << "Client is not initialized";
503+
results.resize(keys.size(), -1); // Fill with error codes
504+
return results;
505+
}
506+
507+
if (keys.empty()) {
508+
LOG(WARNING) << "Empty keys vector provided to batchIsExist";
509+
return results; // Return empty vector
510+
}
511+
512+
std::vector<ErrorCode> exist_results;
513+
ErrorCode batch_err = client_->BatchIsExist(keys, exist_results);
514+
515+
results.resize(keys.size());
516+
517+
if (batch_err != ErrorCode::OK) {
518+
LOG(ERROR) << "BatchIsExist operation failed with error: "
519+
<< toString(batch_err);
520+
// Fill all results with error code
521+
std::fill(results.begin(), results.end(), toInt(batch_err));
522+
return results;
523+
}
524+
525+
// Convert ErrorCode results to int results
526+
for (size_t i = 0; i < keys.size(); ++i) {
527+
if (exist_results[i] == ErrorCode::OK) {
528+
results[i] = 1; // Exists
529+
} else if (exist_results[i] == ErrorCode::OBJECT_NOT_FOUND) {
530+
results[i] = 0; // Does not exist
531+
} else {
532+
results[i] = toInt(exist_results[i]); // Error
533+
}
534+
}
535+
536+
return results;
537+
}
538+
497539
int64_t DistributedObjectStore::getSize(const std::string &key) {
498540
if (!client_) {
499541
LOG(ERROR) << "Client is not initialized";
@@ -775,6 +817,10 @@ PYBIND11_MODULE(store, m) {
775817
py::call_guard<py::gil_scoped_release>())
776818
.def("is_exist", &DistributedObjectStore::isExist,
777819
py::call_guard<py::gil_scoped_release>())
820+
.def("batch_is_exist", &DistributedObjectStore::batchIsExist,
821+
py::call_guard<py::gil_scoped_release>(), py::arg("keys"),
822+
"Check if multiple objects exist. Returns list of results: 1 if "
823+
"exists, 0 if not exists, -1 if error")
778824
.def("close", &DistributedObjectStore::tearDownAll)
779825
.def("get_size", &DistributedObjectStore::getSize,
780826
py::call_guard<py::gil_scoped_release>())

mooncake-integration/store/store_py.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,14 @@ class DistributedObjectStore {
164164
*/
165165
int isExist(const std::string &key);
166166

167+
/**
168+
* @brief Check if multiple objects exist
169+
* @param keys Vector of keys to check
170+
* @return Vector of existence results: 1 if exists, 0 if not exists, -1 if
171+
* error
172+
*/
173+
std::vector<int> batchIsExist(const std::vector<std::string> &keys);
174+
167175
/**
168176
* @brief Get the size of an object
169177
* @param key Key of the object

mooncake-store/include/client.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
#pragma once
22

3+
#include <boost/functional/hash.hpp>
34
#include <memory>
45
#include <mutex>
56
#include <optional>
67
#include <string>
78
#include <vector>
8-
#include <boost/functional/hash.hpp>
99

10+
#include "ha_helper.h"
1011
#include "master_client.h"
1112
#include "rpc_service.h"
1213
#include "transfer_engine.h"
1314
#include "transfer_task.h"
1415
#include "types.h"
15-
#include "ha_helper.h"
1616

1717
namespace mooncake {
1818

@@ -190,6 +190,15 @@ class Client {
190190
*/
191191
ErrorCode IsExist(const std::string& key);
192192

193+
/**
194+
* @brief Checks if multiple objects exist
195+
* @param keys Vector of keys to check
196+
* @param exist_results Output vector of existence results for each key
197+
* @return ErrorCode indicating success/failure of the batch operation
198+
*/
199+
ErrorCode BatchIsExist(const std::vector<std::string>& keys,
200+
std::vector<ErrorCode>& exist_results);
201+
193202
private:
194203
/**
195204
* @brief Private constructor to enforce creation through Create() method

mooncake-store/include/master_client.h

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,15 @@ class MasterClient {
3838
* @param object_key Key to query
3939
* @return ErrorCode indicating exist or not
4040
*/
41-
[[nodiscard]] ExistKeyResponse ExistKey(
42-
const std::string& object_key);
41+
[[nodiscard]] ExistKeyResponse ExistKey(const std::string& object_key);
42+
43+
/**
44+
* @brief Checks if multiple objects exist
45+
* @param object_keys Vector of keys to query
46+
* @return BatchExistReponse containing existence status for each key
47+
*/
48+
[[nodiscard]] BatchExistReponse BatchExistKey(
49+
const std::vector<std::string>& object_keys);
4350

4451
/**
4552
* @brief Gets object metadata without transferring data
@@ -136,8 +143,8 @@ class MasterClient {
136143
* @param client_id The uuid of the client
137144
* @return ErrorCode indicating success/failure
138145
*/
139-
[[nodiscard]] MountSegmentResponse MountSegment(
140-
const Segment& segment, const UUID& client_id);
146+
[[nodiscard]] MountSegmentResponse MountSegment(const Segment& segment,
147+
const UUID& client_id);
141148

142149
/**
143150
* @brief Re-mount segments, invoked when the client is the first time to
@@ -157,8 +164,8 @@ class MasterClient {
157164
* @param client_id The uuid of the client
158165
* @return ErrorCode indicating success/failure
159166
*/
160-
[[nodiscard]] UnmountSegmentResponse UnmountSegment(
161-
const UUID& segment_id, const UUID& client_id);
167+
[[nodiscard]] UnmountSegmentResponse UnmountSegment(const UUID& segment_id,
168+
const UUID& client_id);
162169

163170
/**
164171
* @brief Pings master to check its availability

mooncake-store/include/master_service.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ class MasterService {
105105
*/
106106
ErrorCode ExistKey(const std::string& key);
107107

108+
std::vector<ErrorCode> BatchExistKey(const std::vector<std::string>& keys);
109+
108110
/**
109111
* @brief Fetch all keys
110112
* @return ErrorCode::OK if exists

mooncake-store/include/rpc_service.h

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ struct PingResponse {
9797
};
9898
YLT_REFL(PingResponse, view_version, error_code)
9999

100+
struct BatchExistReponse {
101+
std::vector<ErrorCode> exist_responses;
102+
};
103+
YLT_REFL(BatchExistReponse, exist_responses)
104+
100105
constexpr uint64_t kMetricReportIntervalSeconds = 10;
101106

102107
class WrappedMasterService {
@@ -274,6 +279,15 @@ class WrappedMasterService {
274279
return response;
275280
}
276281

282+
BatchExistReponse BatchExistKey(const std::vector<std::string>& keys) {
283+
ScopedVLogTimer timer(1, "BatchExistKey");
284+
timer.LogRequest("keys_count=", keys.size());
285+
286+
BatchExistReponse response{master_service_.BatchExistKey(keys)};
287+
timer.LogResponseJson(response);
288+
return response;
289+
}
290+
277291
GetReplicaListResponse GetReplicaList(const std::string& key) {
278292
ScopedVLogTimer timer(1, "GetReplicaList");
279293
timer.LogRequest("key=", key);
@@ -462,7 +476,8 @@ class WrappedMasterService {
462476
return response;
463477
}
464478

465-
MountSegmentResponse MountSegment(const Segment& segment, const UUID& client_id) {
479+
MountSegmentResponse MountSegment(const Segment& segment,
480+
const UUID& client_id) {
466481
ScopedVLogTimer timer(1, "MountSegment");
467482
timer.LogRequest("base=", segment.base, ", size=", segment.size,
468483
", segment_name=", segment.name, ", id=", segment.id);
@@ -504,15 +519,17 @@ class WrappedMasterService {
504519
return response;
505520
}
506521

507-
UnmountSegmentResponse UnmountSegment(const UUID& segment_id, const UUID& client_id) {
522+
UnmountSegmentResponse UnmountSegment(const UUID& segment_id,
523+
const UUID& client_id) {
508524
ScopedVLogTimer timer(1, "UnmountSegment");
509525
timer.LogRequest("segment_id=", segment_id);
510526

511527
// Increment request metric
512528
MasterMetricManager::instance().inc_unmount_segment_requests();
513529

514530
UnmountSegmentResponse response;
515-
response.error_code = master_service_.UnmountSegment(segment_id, client_id);
531+
response.error_code =
532+
master_service_.UnmountSegment(segment_id, client_id);
516533

517534
// Track failures if needed
518535
if (response.error_code != ErrorCode::OK) {
@@ -581,6 +598,8 @@ inline void RegisterRpcService(
581598
&wrapped_master_service);
582599
server.register_handler<&mooncake::WrappedMasterService::Ping>(
583600
&wrapped_master_service);
601+
server.register_handler<&mooncake::WrappedMasterService::BatchExistKey>(
602+
&wrapped_master_service);
584603
}
585604

586605
} // namespace mooncake

mooncake-store/src/client.cpp

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ Client::~Client() {
4343
}
4444

4545
for (auto& segment : segments_to_unmount) {
46-
auto err_code = UnmountSegment(reinterpret_cast<void*>(segment.base),
47-
segment.size);
46+
auto err_code =
47+
UnmountSegment(reinterpret_cast<void*>(segment.base), segment.size);
4848
if (err_code != ErrorCode::OK) {
4949
LOG(ERROR) << "Failed to unmount segment: " << toString(err_code);
5050
}
@@ -148,8 +148,7 @@ ErrorCode Client::ConnectToMaster(const std::string& master_server_entry) {
148148
// Start Ping thread to monitor master view changes and remount segments
149149
// if needed
150150
ping_running_ = true;
151-
ping_thread_ =
152-
std::thread(&Client::PingThreadFunc, this);
151+
ping_thread_ = std::thread(&Client::PingThreadFunc, this);
153152

154153
return ErrorCode::OK;
155154
} else {
@@ -212,7 +211,7 @@ std::optional<std::shared_ptr<Client>> Client::Create(
212211

213212
// Initialize transfer engine
214213
err = client->InitTransferEngine(local_hostname, metadata_connstring,
215-
protocol, protocol_args);
214+
protocol, protocol_args);
216215
if (err != ErrorCode::OK) {
217216
LOG(ERROR) << "Failed to initialize transfer engine";
218217
return std::nullopt;
@@ -581,8 +580,7 @@ ErrorCode Client::MountSegment(const void* buffer, size_t size) {
581580
Segment segment(generate_uuid(), local_hostname_,
582581
reinterpret_cast<uintptr_t>(buffer), size);
583582

584-
ErrorCode err =
585-
master_client_.MountSegment(segment, client_id_).error_code;
583+
ErrorCode err = master_client_.MountSegment(segment, client_id_).error_code;
586584
if (err != ErrorCode::OK) {
587585
LOG(ERROR) << "mount_segment_to_master_failed base=" << buffer
588586
<< " size=" << size << ", error=" << err;
@@ -660,6 +658,42 @@ ErrorCode Client::IsExist(const std::string& key) {
660658
return response.error_code;
661659
}
662660

661+
ErrorCode Client::BatchIsExist(const std::vector<std::string>& keys,
662+
std::vector<ErrorCode>& exist_results) {
663+
// Check for duplicate keys
664+
std::unordered_set<std::string> seen;
665+
for (const auto& key : keys) {
666+
if (!seen.insert(key).second) {
667+
LOG(ERROR) << "Duplicate key not supported for Batch API, key: "
668+
<< key;
669+
return ErrorCode::INVALID_PARAMS;
670+
}
671+
}
672+
673+
auto response = master_client_.BatchExistKey(keys);
674+
675+
// Resize the output vector to match the number of keys
676+
exist_results.resize(keys.size());
677+
678+
// Check if we got the expected number of responses
679+
if (response.exist_responses.size() != keys.size()) {
680+
LOG(ERROR) << "BatchExistKey response size mismatch. Expected: "
681+
<< keys.size()
682+
<< ", Got: " << response.exist_responses.size();
683+
// Fill with RPC_FAIL error codes
684+
std::fill(exist_results.begin(), exist_results.end(),
685+
ErrorCode::RPC_FAIL);
686+
return ErrorCode::RPC_FAIL;
687+
}
688+
689+
// Copy the error codes from the response
690+
for (size_t i = 0; i < keys.size(); ++i) {
691+
exist_results[i] = response.exist_responses[i];
692+
}
693+
694+
return ErrorCode::OK;
695+
}
696+
663697
ErrorCode Client::TransferData(
664698
const std::vector<AllocatedBuffer::Descriptor>& handles,
665699
std::vector<Slice>& slices, TransferRequest::OpCode op_code) {

mooncake-store/src/master_client.cpp

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ ExistKeyResponse MasterClient::ExistKey(const std::string& object_key) {
3939

4040
auto request_result =
4141
client_.send_request<&WrappedMasterService::ExistKey>(object_key);
42-
std::optional<ExistKeyResponse> result = coro::syncAwait(
43-
[&]() -> coro::Lazy<std::optional<ExistKeyResponse>> {
42+
std::optional<ExistKeyResponse> result =
43+
coro::syncAwait([&]() -> coro::Lazy<std::optional<ExistKeyResponse>> {
4444
auto result = co_await co_await request_result;
4545
if (!result) {
4646
LOG(ERROR) << "Failed to check key existence: "
@@ -60,6 +60,38 @@ ExistKeyResponse MasterClient::ExistKey(const std::string& object_key) {
6060
return result.value();
6161
}
6262

63+
BatchExistReponse MasterClient::BatchExistKey(
64+
const std::vector<std::string>& object_keys) {
65+
ScopedVLogTimer timer(1, "MasterClient::BatchExistKey");
66+
timer.LogRequest("keys_count=", object_keys.size());
67+
68+
auto request_result =
69+
client_.send_request<&WrappedMasterService::BatchExistKey>(object_keys);
70+
std::optional<BatchExistReponse> result =
71+
coro::syncAwait([&]() -> coro::Lazy<std::optional<BatchExistReponse>> {
72+
auto result = co_await co_await request_result;
73+
if (!result) {
74+
LOG(ERROR) << "Failed to check batch key existence: "
75+
<< result.error().msg;
76+
co_return std::nullopt;
77+
}
78+
co_return result->result();
79+
}());
80+
81+
if (!result) {
82+
BatchExistReponse response;
83+
response.exist_responses.resize(object_keys.size());
84+
for (auto& exist_response : response.exist_responses) {
85+
exist_response = ErrorCode::RPC_FAIL;
86+
}
87+
timer.LogResponseJson(response);
88+
return response;
89+
}
90+
91+
timer.LogResponseJson(result.value());
92+
return result.value();
93+
}
94+
6395
GetReplicaListResponse MasterClient::GetReplicaList(
6496
const std::string& object_key) {
6597
ScopedVLogTimer timer(1, "MasterClient::GetReplicaList");
@@ -315,7 +347,7 @@ RemoveAllResponse MasterClient::RemoveAll() {
315347
auto result = co_await co_await request_result;
316348
if (!result) {
317349
LOG(ERROR) << "Failed to remove all objects: "
318-
<< result.error().msg;
350+
<< result.error().msg;
319351
co_return std::nullopt;
320352
}
321353
co_return result->result();
@@ -363,7 +395,8 @@ MountSegmentResponse MasterClient::MountSegment(const Segment& segment,
363395
ReMountSegmentResponse MasterClient::ReMountSegment(
364396
const std::vector<Segment>& segments, const UUID& client_id) {
365397
ScopedVLogTimer timer(1, "MasterClient::ReMountSegment");
366-
timer.LogRequest("segments_num=", segments.size(), ", client_id=", client_id);
398+
timer.LogRequest("segments_num=", segments.size(),
399+
", client_id=", client_id);
367400

368401
std::optional<ReMountSegmentResponse> result =
369402
syncAwait([&]() -> coro::Lazy<std::optional<ReMountSegmentResponse>> {
@@ -431,7 +464,8 @@ PingResponse MasterClient::Ping(const UUID& client_id) {
431464
}());
432465

433466
if (!result) {
434-
auto response = PingResponse{0, ClientStatus::UNDEFINED, ErrorCode::RPC_FAIL};
467+
auto response =
468+
PingResponse{0, ClientStatus::UNDEFINED, ErrorCode::RPC_FAIL};
435469
timer.LogResponseJson(response);
436470
return response;
437471
}

0 commit comments

Comments
 (0)