diff --git a/bench/python/dlslime_custom_sendrecv_bench.py b/bench/python/dlslime_custom_sendrecv_bench.py
index 1ed5181..1d01cb4 100644
--- a/bench/python/dlslime_custom_sendrecv_bench.py
+++ b/bench/python/dlslime_custom_sendrecv_bench.py
@@ -33,7 +33,7 @@ def setup_rdma_connection(args):
     """Establish RDMA connection between sender and receiver"""
     print(f'Initializing RDMA endpoint for {args.mode}...')
-    num_qp = 4
+    num_qp = 1
     end_point = _slime_c.rdma_endpoint(args.device, args.rdmaport, args.type, num_qp)
     zmq_ctx = zmq.Context()
diff --git a/bench/python/dlslime_torch_dist_sendrecv_bench.py b/bench/python/dlslime_torch_dist_sendrecv_bench.py
index 31ae117..7b121a0 100644
--- a/bench/python/dlslime_torch_dist_sendrecv_bench.py
+++ b/bench/python/dlslime_torch_dist_sendrecv_bench.py
@@ -17,6 +17,7 @@
     print(e, "please install dlslime backend first")
     exit()
 
+
 def benchmark_send_recv(args):
     # Initialize process group
     print("Initialize process group")
@@ -38,11 +39,11 @@ def benchmark_send_recv(args):
     if args.sizes:
         sizes = [int(s) for s in args.sizes]
     else:
-        sizes = [2**n for n in range(11, 26)]  # 256B to 256MB
+        sizes = [2**n for n in range(11, 12)]  # 2 KB only
     print("Prepare data sizes: ", sizes)
     benchmark_data = []
-    num = 2
+    num = 1
     print("Start to test the bench")
     for size in sizes:
         num_elements = max(1, size // 4)
@@ -50,6 +51,7 @@
             torch.ones(num_elements, device=device, dtype=torch.float32)
             for _ in range(num)
         ]
+        # print(send_batch[0])
         recv_batch = [
             torch.zeros(num_elements, device=device, dtype=torch.float32)
             for _ in range(num)
@@ -57,8 +59,8 @@
         ]
         if args.use_gpu:
             torch.cuda.synchronize()
-
-        for _ in range(25):
+
+        for _ in range(args.iterations):
             all_work = []
             reqs = []
             for i in range(num):
@@ -76,7 +78,6 @@
         if args.use_gpu:
             torch.cuda.synchronize()
         start_time = time.time()
-
         for _ in range(args.iterations):
             all_work = []
             for i in range(num):
@@ -84,7 +85,7 @@
                 send_op = dist.isend(send_batch[i], dst=1, group=slime_group)
                 all_work.extend([send_op])
             else:
-                recv_op = dist.irecv(recv_batch[i], src=0 ,group=slime_group)
+                recv_op = dist.irecv(recv_batch[i], src=0, group=slime_group)
                 all_work.extend([recv_op])
             [w.wait() for w in all_work]
diff --git a/compile_commands.json b/compile_commands.json
new file mode 120000
index 0000000..25eb4b2
--- /dev/null
+++ b/compile_commands.json
@@ -0,0 +1 @@
+build/compile_commands.json
\ No newline at end of file
diff --git a/csrc/engine/rdma/memory_pool.cpp b/csrc/engine/rdma/memory_pool.cpp
index f78d494..7a9f347 100644
--- a/csrc/engine/rdma/memory_pool.cpp
+++ b/csrc/engine/rdma/memory_pool.cpp
@@ -20,7 +20,6 @@ int RDMAMemoryPool::register_memory_region(const std::string& mr_key, uintptr_t
     /* MemoryRegion Access Right = 777 */
     const static int access_rights = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
     ibv_mr* mr = ibv_reg_mr(pd_, (void*)data_ptr, length, access_rights);
-    SLIME_ASSERT(mr, " Failed to register memory " << data_ptr);
 
     SLIME_LOG_DEBUG("Memory region: " << mr_key << ", " << (void*)data_ptr << " -- " << (void*)(data_ptr + length)
diff --git a/csrc/engine/rdma/memory_pool.h b/csrc/engine/rdma/memory_pool.h
index 1419038..d6f3461 100644
--- a/csrc/engine/rdma/memory_pool.h
+++ b/csrc/engine/rdma/memory_pool.h
@@ -23,8 +23,8 @@ typedef struct remote_mr {
     remote_mr(uintptr_t addr, size_t length, uint32_t rkey): addr(addr), length(length), rkey(rkey) {}
     uintptr_t addr{(uintptr_t) nullptr};
-    size_t length{};
-    uint32_t rkey{};
+    size_t length{0};
+    uint32_t rkey{0};
 } remote_mr_t;
 
 class RDMAMemoryPool {
@@ -34,9 +34,13 @@ class RDMAMemoryPool {
     ~RDMAMemoryPool()
     {
+        std::cout << mrs_.size() << std::endl;
         for (auto& mr : mrs_) {
+            std::cout << "mr: " << mr.first << std::endl;
             if (mr.second)
                 ibv_dereg_mr(mr.second);
+
+            std::cout << "!!!!! " << std::endl;
         }
         mrs_.clear();
     }
@@ -51,9 +55,10 @@ class RDMAMemoryPool {
     inline struct ibv_mr* get_mr(const std::string& mr_key)
     {
         std::unique_lock<std::mutex> lock(mrs_mutex_);
-        if (mrs_.find(mr_key) != mrs_.end())
+        if (mrs_.find(mr_key) != mrs_.end()) {
+            SLIME_LOG_DEBUG("mr_key: ", mr_key, " is found in mrs_");
             return mrs_[mr_key];
-
+        }
         SLIME_LOG_WARN("mr_key: ", mr_key, "not found in mrs_");
         return nullptr;
     }
@@ -61,9 +66,10 @@ class RDMAMemoryPool {
     inline remote_mr_t get_remote_mr(const std::string& mr_key)
     {
         std::unique_lock<std::mutex> lock(remote_mrs_mutex_);
-        if (remote_mrs_.find(mr_key) != remote_mrs_.end())
+        if (remote_mrs_.find(mr_key) != remote_mrs_.end()) {
+            SLIME_LOG_DEBUG("mr_key: ", mr_key, " is found in remote_mrs_");
             return remote_mrs_[mr_key];
-
+        }
         SLIME_LOG_WARN("mr_key: ", mr_key, " not found in remote_mrs_");
         return remote_mr_t();
     }
diff --git a/csrc/engine/rdma/rdma_assignment.h b/csrc/engine/rdma/rdma_assignment.h
index e443ac0..0ec943c 100644
--- a/csrc/engine/rdma/rdma_assignment.h
+++ b/csrc/engine/rdma/rdma_assignment.h
@@ -43,8 +43,9 @@ typedef struct callback_info {
     callback_info() = default;
     callback_info(OpCode opcode, size_t batch_size, callback_fn_t callback): opcode_(opcode), batch_size_(batch_size)
     {
-        if (callback)
+        if (callback) {
             callback_ = std::move(callback);
+        }
     }
 
     callback_fn_t callback_{[this](int code, int imm_data) {
diff --git a/csrc/engine/rdma/rdma_buffer.cpp b/csrc/engine/rdma/rdma_buffer.cpp
index ae0b532..7e4065f 100644
--- a/csrc/engine/rdma/rdma_buffer.cpp
+++ b/csrc/engine/rdma/rdma_buffer.cpp
@@ -1,26 +1,27 @@
 #include "engine/rdma/rdma_buffer.h"
+#include "engine/assignment.h"
 
 namespace slime {
 
 void RDMABuffer::send()
 {
-    endpoint_->addSendTask(shared_from_this());
+    endpoint_->addRDMABuffer(OpCode::SEND, shared_from_this());
 }
 
 void RDMABuffer::recv()
 {
-    endpoint_->addRecvTask(shared_from_this());
+    endpoint_->addRDMABuffer(OpCode::RECV, shared_from_this());
 }
 
-void RDMABuffer::send_done_callback()
+void RDMABuffer::sendDoneCallback()
 {
     std::unique_lock<std::mutex> lock(send_mutex_);
     ++send_completed_;
     send_cv_.notify_all();
 }
 
-void RDMABuffer::recv_done_callback()
+void RDMABuffer::recvDoneCallback()
 {
     std::unique_lock<std::mutex> lock(recv_mutex_);
     ++recv_completed_;
@@ -31,25 +32,26 @@ bool RDMABuffer::waitSend()
 {
     std::unique_lock<std::mutex> lock(send_mutex_);
 
-    if (send_completed_)
-        return send_completed_;
-
+    if (send_completed_) {
+        return true;
+    }
     send_cv_.wait(lock, [this]() { return send_completed_ > 0; });
     send_pending_ = false;
     SLIME_LOG_INFO("complete to send the data.");
-    return send_completed_;
+    return true;
 }
 
 bool RDMABuffer::waitRecv()
 {
     std::unique_lock<std::mutex> lock(recv_mutex_);
 
-    if (recv_completed_)
-        return recv_completed_;
+    if (recv_completed_) {
+        return true;
+    }
     recv_cv_.wait(lock, [this]() { return recv_completed_ > 0; });
     recv_pending_ = false;
     SLIME_LOG_INFO("complete to send the data.");
-    return recv_completed_;
+    return true;
 }
 
 } // namespace slime
diff --git a/csrc/engine/rdma/rdma_buffer.h b/csrc/engine/rdma/rdma_buffer.h
index fede1b4..a67c4ba 100644
--- a/csrc/engine/rdma/rdma_buffer.h
+++ b/csrc/engine/rdma/rdma_buffer.h
@@ -4,6 +4,7 @@
 #include
"engine/rdma/rdma_endpoint.h" #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #include +#include "logging.h" #include "rdma_common.h" namespace slime { @@ -23,36 +25,43 @@ class RDMABuffer: public std::enable_shared_from_this { friend class RDMAEndpoint; public: - RDMABuffer(std::shared_ptr endpoint, storage_view_batch_t& batch): - endpoint_(endpoint), storage_view_batch_(std::move(batch)) + RDMABuffer(std::shared_ptr endpoint, uintptr_t ptr, size_t offset, size_t data_size): + endpoint_(endpoint), ptr_(ptr), offset_(offset), data_size_(data_size) { + SLIME_LOG_DEBUG("New RDMABuffer and the Index of Current Buffer is: ", buffer_counter_); + buffer_counter_ += 1; } - RDMABuffer(std::shared_ptr endpoint, - std::vector ptrs, - std::vector offset, - std::vector data_size) + RDMABuffer(std::shared_ptr endpoint, storage_view_batch_t& batch): + endpoint_(endpoint), storage_view_batch_(std::move(batch)) { - - batch_size_ = ptrs.size(); - ptrs_ = ptrs; - offset_ = offset; - data_size_ = data_size; - for (uint32_t i = 0; i < batch_size_; ++i) { - storage_view_t view{.data_ptr = ptrs[i], .storage_offset = offset[i], .length = data_size[i]}; - storage_view_batch_.push_back(view); - } - endpoint_ = endpoint; } + // RDMABuffer(std::shared_ptr endpoint, + // std::vector ptrs, + // std::vector offset, + // std::vector data_size) + // { + + // batch_size_ = ptrs.size(); + // ptrs_ = ptrs; + // offset_ = offset; + // data_size_ = data_size; + // for (uint32_t i = 0; i < batch_size_; ++i) { + // storage_view_t view{.data_ptr = ptrs[i], .storage_offset = offset[i], .length = data_size[i]}; + // storage_view_batch_.push_back(view); + // } + // endpoint_ = endpoint; + // } + ~RDMABuffer() = default; - const size_t batchSize() + const size_t batch_size() { return storage_view_batch_.size(); } - const storage_view_batch_t& storageViewBatch() + const storage_view_batch_t& view_batch() { return storage_view_batch_; } @@ -63,20 +72,20 @@ class RDMABuffer: public std::enable_shared_from_this { bool waitSend(); bool waitRecv(); - void send_done_callback(); - void recv_done_callback(); - - void get_time(); + void sendDoneCallback(); + void recvDoneCallback(); private: - storage_view_batch_t storage_view_batch_; - std::shared_ptr endpoint_; - std::vector ptrs_; - std::vector offset_; - std::vector data_size_; + uintptr_t ptr_; + size_t offset_; + size_t data_size_; - size_t batch_size_; + std::vector ptrs_batch_; + std::vector offset_batch_; + std::vector data_size_batch_; + + storage_view_batch_t storage_view_batch_; std::atomic send_pending_{0}; std::atomic recv_pending_{0}; @@ -89,6 +98,8 @@ class RDMABuffer: public std::enable_shared_from_this { std::mutex send_mutex_; std::mutex recv_mutex_; + + static inline size_t buffer_counter_{0}; }; } // namespace slime diff --git a/csrc/engine/rdma/rdma_context.cpp b/csrc/engine/rdma/rdma_context.cpp index f60357e..4c80f30 100644 --- a/csrc/engine/rdma/rdma_context.cpp +++ b/csrc/engine/rdma/rdma_context.cpp @@ -389,7 +389,6 @@ void split_assign_by_max_length(OpCode opcode, AssignmentBatch& batch_split_after_max_length, size_t max_length) { - // split assignment by length for (size_t i = 0; i < batch.size(); ++i) { if (batch[i].length < max_length) { batch_split_after_max_length.push_back(std::move(batch[i])); @@ -434,7 +433,6 @@ RDMAContext::submit(OpCode opcode, AssignmentBatch& batch, callback_fn_t callbac size_t length = SLIME_MAX_LENGTH_PER_ASSIGNMENT; AssignmentBatch batch_split; split_assign_by_max_length(opcode, batch, batch_split, length); - 
AssignmentBatch batch_after_agg_qp; while (batch_split.size() < SLIME_AGG_QP_NUM) { length = length / 2; diff --git a/csrc/engine/rdma/rdma_context.h b/csrc/engine/rdma/rdma_context.h index ae92abe..f8fd888 100644 --- a/csrc/engine/rdma/rdma_context.h +++ b/csrc/engine/rdma/rdma_context.h @@ -92,7 +92,7 @@ class RDMAContext { if (pd_) ibv_dealloc_pd(pd_); - + if (ib_ctx_) ibv_close_device(ib_ctx_); diff --git a/csrc/engine/rdma/rdma_endpoint.cpp b/csrc/engine/rdma/rdma_endpoint.cpp index a12443e..d82039f 100644 --- a/csrc/engine/rdma/rdma_endpoint.cpp +++ b/csrc/engine/rdma/rdma_endpoint.cpp @@ -4,298 +4,595 @@ #include "engine/rdma/rdma_buffer.h" #include "engine/rdma/rdma_common.h" #include "engine/rdma/rdma_context.h" + #include "logging.h" #include +#include +#include +#include +#include #include #include #include namespace slime { -RDMATask::RDMATask(std::shared_ptr endpoint, - uint32_t task_id, - OpCode opcode, - std::shared_ptr buffer): - endpoint_(endpoint), slot_id_(task_id), opcode_(opcode), buffer_(buffer) +RDMAEndpoint::RDMAEndpoint(const std::string& dev_name, size_t ib_port, const std::string& link_type, size_t qp_nums) { - registerDataMemoryRegion(); - fillBuffer(); + + // RDMAContext: submit the RDMA assignment + SLIME_LOG_INFO("Init the Contexts and RDMA Devices..."); + data_ctx_ = std::make_shared(qp_nums, 0); + meta_ctx_ = std::make_shared(1, 0); + + data_ctx_->init(dev_name, ib_port, link_type); + meta_ctx_->init(dev_name, ib_port, link_type); + + data_ctx_qp_num_ = data_ctx_->qp_list_len_; + meta_ctx_qp_num_ = meta_ctx_->qp_list_len_; + + SLIME_LOG_INFO("The QP number of data plane is: ", data_ctx_qp_num_); + SLIME_LOG_INFO("The QP number of control plane is: ", meta_ctx_qp_num_); + SLIME_LOG_INFO("RDMA Endpoint Init Success..."); + + SLIME_LOG_INFO("Allocate MR Buffer and Dummy Buffer"); + meta_buffer_.resize(SLIME_META_BUFFER_SIZE * 2); + memset(meta_buffer_.data(), 0, meta_buffer_.size() * sizeof(meta_data_t)); + meta_ctx_->register_memory_region( + "meta_buffer", reinterpret_cast(meta_buffer_.data()), sizeof(meta_data_t) * meta_buffer_.size()); + + dum_meta_buffer_.resize(SLIME_DUMMY_BUFFER_SIZE); + memset(dum_meta_buffer_.data(), 0, dum_meta_buffer_.size() * sizeof(uint32_t)); + meta_ctx_->register_memory_region("dum_meta_buffer", + reinterpret_cast(dum_meta_buffer_.data()), + sizeof(uint32_t) * dum_meta_buffer_.size()); + + dum_data_buffer_.resize(SLIME_DUMMY_BUFFER_SIZE); + memset(dum_data_buffer_.data(), 0, dum_data_buffer_.size() * sizeof(uint32_t)); + data_ctx_->register_memory_region("dum_data_buffer", + reinterpret_cast(dum_data_buffer_.data()), + sizeof(uint32_t) * dum_data_buffer_.size()); + + SLIME_LOG_INFO("The MR Buffer and Dummy Buffer Allocate Success..."); + + SLIME_LOG_INFO("Construct the pre-submit queue"); + meta_slots_manager_ = std::make_unique(SLIME_STATUS_SLOT_SIZE); + data_slots_manager_ = std::make_unique(SLIME_STATUS_SLOT_SIZE); } -RDMATask::~RDMATask() {} +void RDMAEndpoint::connect(const json& data_ctx_info, const json& meta_ctx_info) +{ -std::string RDMATask::getDataKey(int32_t idx) + SLIME_LOG_INFO("Lauch the RDMAConstex for DATA and META") + data_ctx_->connect(data_ctx_info); + meta_ctx_->connect(meta_ctx_info); + + data_ctx_->launch_future(); + meta_ctx_->launch_future(); + + for (size_t i = 0; i < SLIME_STATUS_SLOT_SIZE; i += 1) { + addPreQueueElement(OpCode::SEND); + addPreQueueElement(OpCode::RECV); + } + proxyInit(); +} + +void RDMAEndpoint::proxyInit() { - std::string data_prefix = opcode_ == OpCode::SEND ? 
"DATA_SEND" : "DATA_RECV"; - storage_view_batch_t storage_view_batch = buffer_->storageViewBatch(); - return data_prefix + "@" + std::to_string(storage_view_batch[idx].data_ptr) + "_" - + std::to_string(storage_view_batch[idx].length) + "_" + std::to_string(idx); + SLIME_LOG_INFO("Starting all RDMA endpoint threads..."); + + stop_meta_recv_queue_thread_.store(false, std::memory_order_release); + stop_data_recv_queue_thread_.store(false, std::memory_order_release); + stop_send_buffer_queue_thread_.store(false, std::memory_order_release); + stop_recv_buffer_queue_thread_.store(false, std::memory_order_release); + stop_wait_send_finish_queue_thread_.store(false, std::memory_order_release); + // stop_wait_recv_finish_queue_thread_.store(false, std::memory_order_release); + + meta_recv_thread_ = std::thread([this]() { this->metaRecvQueueThread(std::chrono::milliseconds(0)); }); + data_recv_thread_ = std::thread([this]() { this->dataRecvQueueThread(std::chrono::milliseconds(0)); }); + send_buffer_thread_ = std::thread([this]() { this->SendBufferQueueThread(std::chrono::milliseconds(0)); }); + recv_buffer_thread_ = std::thread([this]() { this->RecvBufferQueueThread(std::chrono::milliseconds(0)); }); + send_finish_thread_ = std::thread([this]() { this->SendFinishQueueThread(std::chrono::milliseconds(0)); }); + // recv_finish_thread_ = std::thread([this]() { this->RecvFinishQueueThread(std::chrono::milliseconds(0)); }); + + SLIME_LOG_INFO("All Proxy Threads Started Successfully"); } -AssignmentBatch RDMATask::getMetaAssignmentBatch() +void RDMAEndpoint::proxyDestroy() { - size_t meta_buffer_idx = slot_id_ % MAX_META_BUFFER_SIZE; - switch (opcode_) { - case OpCode::SEND: { - return AssignmentBatch{Assignment("meta_buffer", - meta_buffer_idx * sizeof(meta_data_t), - meta_buffer_idx * sizeof(meta_data_t), - sizeof(meta_data_t))}; - } - case OpCode::RECV: { - return AssignmentBatch{ - Assignment("meta_buffer", - meta_buffer_idx * sizeof(meta_data_t), - MAX_META_BUFFER_SIZE * sizeof(meta_data_t) + meta_buffer_idx * sizeof(meta_data_t), - sizeof(meta_data_t))}; - } - default: - SLIME_ABORT("Unknown Opcode"); + SLIME_LOG_INFO("Stopping all RDMA endpoint threads..."); + + stop_meta_recv_queue_thread_.store(true, std::memory_order_release); + stop_data_recv_queue_thread_.store(true, std::memory_order_release); + stop_send_buffer_queue_thread_.store(true, std::memory_order_release); + stop_recv_buffer_queue_thread_.store(true, std::memory_order_release); + stop_wait_send_finish_queue_thread_.store(true, std::memory_order_release); + // stop_wait_recv_finish_queue_thread_.store(true, std::memory_order_release); + + if (meta_recv_thread_.joinable()) { + meta_recv_thread_.join(); + SLIME_LOG_INFO("Meta recv thread stopped"); + } + if (data_recv_thread_.joinable()) { + data_recv_thread_.join(); + SLIME_LOG_INFO("Data recv thread stopped"); } + + if (send_buffer_thread_.joinable()) { + send_buffer_thread_.join(); + SLIME_LOG_INFO("Send buffer thread stopped"); + } + + if (recv_buffer_thread_.joinable()) { + recv_buffer_thread_.join(); + SLIME_LOG_INFO("Recv buffer thread stopped"); + } + + if (send_finish_thread_.joinable()) { + send_finish_thread_.join(); + SLIME_LOG_INFO("Send finish thread stopped"); + } + + // if (recv_finish_thread_.joinable()) { + // recv_finish_thread_.join(); + // SLIME_LOG_INFO("Recv finish thread stopped"); + // } + + SLIME_LOG_INFO("All Proxy Threads Stopped Successfully"); } -AssignmentBatch RDMATask::getDataAssignmentBatch() +RDMAEndpoint::~RDMAEndpoint() { - AssignmentBatch batch; - 
storage_view_batch_t storage_view_batch = buffer_->storageViewBatch(); - for (int i = 0; i < buffer_->batchSize(); ++i) { - batch.push_back(Assignment(getDataKey(i), 0, 0, storage_view_batch[i].length)); + try { + proxyDestroy(); + data_ctx_->stop_future(); + meta_ctx_->stop_future(); + SLIME_LOG_INFO("RDMAEndpoint destroyed successfully"); + } + catch (const std::exception& e) { + SLIME_LOG_ERROR("Exception in RDMAEndpoint destructor: ", e.what()); } - return batch; } -int RDMATask::registerDataMemoryRegion() +void RDMAEndpoint::addPreQueueElement(OpCode rdma_opcode) { - auto mr_is_exist = endpoint_->dataCtx()->get_mr(getDataKey(0)); + if (rdma_opcode == OpCode::SEND) { + RDMAPrePostQueueElement meta_recv_queue_element = + RDMAPrePostQueueElement(unique_meta_recv_id_.load(std::memory_order_relaxed), OpCode::SEND); + uint32_t idx = meta_recv_queue_element.unique_id_; + auto is_finish_ptr = meta_recv_queue_element.is_finished_ptr_; - if (mr_is_exist != nullptr) { + auto meta_recv_callback = [idx, is_finish_ptr](int status, int slot_id) mutable { + is_finish_ptr->store(true, std::memory_order_release); + }; - return 0; + AssignmentBatch assignment_batch_ = AssignmentBatch{Assignment("dum_meta_buffer", 0, 0, 16 * sizeof(uint32_t))}; + meta_ctx_->submit(OpCode::RECV, assignment_batch_, meta_recv_callback, RDMAContext::UNDEFINED_QPI, idx); + + meta_recv_queue_.enqueue(meta_recv_queue_element); + unique_meta_recv_id_.fetch_add(1, std::memory_order_relaxed); + } + else if (rdma_opcode == OpCode::RECV) { + + RDMAPrePostQueueElement data_recv_queue_element = + RDMAPrePostQueueElement(unique_data_recv_id_.load(std::memory_order_relaxed), OpCode::RECV); + + uint32_t idx = data_recv_queue_element.unique_id_; + auto is_finish_ptr = data_recv_queue_element.is_finished_ptr_; + auto data_recv_callback = [idx, is_finish_ptr](int status, int slot_id) mutable { + is_finish_ptr->store(true, std::memory_order_release); + }; + AssignmentBatch assignment_batch_ = AssignmentBatch{Assignment("dum_data_buffer", 0, 0, 16 * sizeof(uint32_t))}; + data_ctx_->submit(OpCode::RECV, assignment_batch_, data_recv_callback, RDMAContext::UNDEFINED_QPI, idx); + + data_recv_queue_.enqueue(data_recv_queue_element); + data_slots_manager_->acquireSlot(idx); + unique_data_recv_id_.fetch_add(1, std::memory_order_relaxed); } else { - storage_view_batch_t storage_view_batch = buffer_->storageViewBatch(); - for (int i = 0; i < buffer_->batchSize(); ++i) { - endpoint_->dataCtx()->register_memory_region( - getDataKey(i), storage_view_batch[i].data_ptr, storage_view_batch[i].length); - } - return 0; + SLIME_LOG_ERROR("Undefined OPCODE IN RDMAEndpoint::addPreQueueElement"); } } -void RDMATask::fillBuffer() +void RDMAEndpoint::addRDMABuffer(OpCode rdma_opcode, std::shared_ptr rdma_buffer) { - if (opcode_ == OpCode::SEND) { - std::vector& meta_buf = endpoint_->getMetaBuffer(); + + if (rdma_opcode == OpCode::SEND) { + uint32_t cur_idx = unique_SEND_SLOT_ID_.load(std::memory_order_relaxed); + RDMABufferQueueElement buffer_element = RDMABufferQueueElement(cur_idx, OpCode::SEND, rdma_buffer); + send_buffer_queue_.enqueue(buffer_element); + unique_SEND_SLOT_ID_.fetch_add(1, std::memory_order_relaxed); } - else if (opcode_ == OpCode::RECV) { - std::vector& meta_buf = endpoint_->getMetaBuffer(); - for (size_t i = 0; i < buffer_->batchSize(); ++i) { - auto mr = endpoint_->dataCtx()->get_mr(getDataKey(i)); - meta_buf[MAX_META_BUFFER_SIZE + slot_id_ % MAX_META_BUFFER_SIZE].mr_addr[i] = - reinterpret_cast(mr->addr); - meta_buf[MAX_META_BUFFER_SIZE + 
slot_id_ % MAX_META_BUFFER_SIZE].mr_rkey[i] = mr->rkey; - meta_buf[MAX_META_BUFFER_SIZE + slot_id_ % MAX_META_BUFFER_SIZE].mr_size[i] = mr->length; - } - meta_buf[MAX_META_BUFFER_SIZE + slot_id_ % MAX_META_BUFFER_SIZE].mr_slot = slot_id_; - meta_buf[MAX_META_BUFFER_SIZE + slot_id_ % MAX_META_BUFFER_SIZE].mr_qpidx = - slot_id_ % endpoint_->dataCtxQPNum(); + else if (rdma_opcode == OpCode::RECV) { + uint32_t cur_idx = unique_RECV_SLOT_ID_.load(std::memory_order_relaxed); + RDMABufferQueueElement buffer_element = RDMABufferQueueElement(cur_idx, OpCode::RECV, rdma_buffer); + recv_buffer_queue_.enqueue(buffer_element); + unique_RECV_SLOT_ID_.fetch_add(1, std::memory_order_relaxed); } else { - SLIME_LOG_ERROR("Unsupported opcode in RDMATask::fillBuffer()"); + SLIME_LOG_ERROR("Undefined OPCODE IN RDMAEndpoint::addRDMABuffer"); } } -int RDMATask::registerRemoteDataMemoryRegion() +void RDMAEndpoint::postMetaWrite(uint32_t idx, std::shared_ptr rdma_buffer) { - auto mr_is_exist = endpoint_->dataCtx()->get_remote_mr(getDataKey(0)); - if (mr_is_exist.addr == 0) { - std::vector& meta_buf = endpoint_->getMetaBuffer(); - for (size_t i = 0; i < buffer_->batchSize(); ++i) { - uint64_t addr = meta_buf[slot_id_ % MAX_META_BUFFER_SIZE].mr_addr[i]; - uint32_t length = meta_buf[slot_id_ % MAX_META_BUFFER_SIZE].mr_size[i]; - uint32_t rkey = meta_buf[slot_id_ % MAX_META_BUFFER_SIZE].mr_rkey[i]; - endpoint_->dataCtx()->register_remote_memory_region(getDataKey(i), addr, length, rkey); - } - return 0; + + std::string prefix = "DATA_RECV_"; + std::string MR_KEY = prefix + std::to_string(idx); + auto mr_is_exist = data_ctx_->get_mr(MR_KEY); + + if (mr_is_exist != nullptr) { + SLIME_LOG_DEBUG("The RECV DATA MR has been REGISTERED! The SLOT_ID is: ", idx); } else { - return 0; + data_ctx_->register_memory_region(MR_KEY, rdma_buffer->ptr_, rdma_buffer->data_size_); } -} -RDMAEndpoint::RDMAEndpoint(const std::string& dev_name, uint8_t ib_port, const std::string& link_type, size_t qp_num) -{ - SLIME_LOG_INFO("Init the Contexts and RDMA Devices..."); - - data_ctx_ = std::make_shared(qp_num, 0); - meta_ctx_ = std::make_shared(1, 0); + auto mr = data_ctx_->get_mr(MR_KEY); - data_ctx_->init(dev_name, ib_port, link_type); - meta_ctx_->init(dev_name, ib_port, link_type); + meta_buffer_[SLIME_META_BUFFER_SIZE + idx % SLIME_META_BUFFER_SIZE].mr_addr = reinterpret_cast(mr->addr); + meta_buffer_[SLIME_META_BUFFER_SIZE + idx % SLIME_META_BUFFER_SIZE].mr_rkey = mr->rkey; + meta_buffer_[SLIME_META_BUFFER_SIZE + idx % SLIME_META_BUFFER_SIZE].mr_size = mr->length; + meta_buffer_[SLIME_META_BUFFER_SIZE + idx % SLIME_META_BUFFER_SIZE].mr_slot = idx; - data_ctx_qp_num_ = data_ctx_->qp_list_len_; - meta_ctx_qp_num_ = meta_ctx_->qp_list_len_; - SLIME_LOG_INFO("The QP number of data plane is: ", data_ctx_qp_num_); - SLIME_LOG_INFO("The QP number of control plane is: ", meta_ctx_qp_num_); - SLIME_LOG_INFO("RDMA Endpoint Init Success and Launch the RDMA Endpoint Task Threads..."); + size_t meta_buffer_idx = idx % SLIME_META_BUFFER_SIZE; + AssignmentBatch meta_assignment_batch_ = + AssignmentBatch{Assignment("meta_buffer", + meta_buffer_idx * sizeof(meta_data_t), + SLIME_META_BUFFER_SIZE * sizeof(meta_data_t) + meta_buffer_idx * sizeof(meta_data_t), + sizeof(meta_data_t))}; - const size_t max_meta_buffer_size = MAX_META_BUFFER_SIZE * 2; - meta_buffer_.reserve(max_meta_buffer_size); - memset(meta_buffer_.data(), 0, meta_buffer_.size() * sizeof(meta_data_t)); - meta_ctx_->register_memory_region( - "meta_buffer", 
reinterpret_cast(meta_buffer_.data()), sizeof(meta_data_t) * max_meta_buffer_size); + auto meta_callback = [idx](int status, int slot_id) { + SLIME_LOG_DEBUG("The META RECV IS SUCCESS FOR SLOT: ", idx); + }; + meta_ctx_->submit(OpCode::WRITE_WITH_IMM, meta_assignment_batch_, meta_callback, RDMAContext::UNDEFINED_QPI, idx); } -RDMAEndpoint::RDMAEndpoint(const std::string& data_dev_name, - const std::string& meta_dev_name, - uint8_t ib_port, - const std::string& link_type, - size_t qp_num) +void RDMAEndpoint::postDataWrite(RDMABufferQueueElement& element, std::shared_ptr rdma_buffer) { - SLIME_LOG_INFO("Init the Contexts and RDMA Devices..."); - data_ctx_ = std::make_shared(qp_num, 0); - meta_ctx_ = std::make_shared(1, 0); - - data_ctx_->init(data_dev_name, ib_port, link_type); - meta_ctx_->init(meta_dev_name, ib_port, link_type); - - data_ctx_qp_num_ = data_ctx_->qp_list_len_; - meta_ctx_qp_num_ = meta_ctx_->qp_list_len_; - SLIME_LOG_INFO("The QP number of data plane is: ", data_ctx_qp_num_); - SLIME_LOG_INFO("The QP number of control plane is: ", meta_ctx_qp_num_); - SLIME_LOG_INFO("RDMA Endpoint Init Success and Launch the RDMA Endpoint Task Threads..."); + uint64_t addr; + uint32_t size; + uint32_t rkey; + uint32_t idx = element.unique_id_; + std::string prefix = "DATA_SEND_"; + std::string MR_KEY = prefix + std::to_string(idx); + auto mr_is_exist = data_ctx_->get_mr(MR_KEY); + if (mr_is_exist != nullptr) { + SLIME_LOG_DEBUG("The SEND DATA MR has been REGISTERED! The SLOT_ID is: ", unique_SEND_SLOT_ID_); + } + else { + data_ctx_->register_memory_region(MR_KEY, rdma_buffer->ptr_, rdma_buffer->data_size_); + } - const size_t max_meta_buffer_size = MAX_META_BUFFER_SIZE * 2; - meta_buffer_.reserve(max_meta_buffer_size); - memset(meta_buffer_.data(), 0, meta_buffer_.size() * sizeof(meta_data_t)); - meta_ctx_->register_memory_region( - "meta_buffer", reinterpret_cast(meta_buffer_.data()), sizeof(meta_data_t) * max_meta_buffer_size); -} + auto mr_remote = data_ctx_->get_remote_mr(MR_KEY); + if (mr_remote.rkey == 0) { -RDMAEndpoint::~RDMAEndpoint() -{ - { - std::unique_lock lock(rdma_tasks_mutex_); - RDMA_tasks_threads_running_ = false; + addr = meta_buffer_[idx % SLIME_META_BUFFER_SIZE].mr_addr; + size = meta_buffer_[idx % SLIME_META_BUFFER_SIZE].mr_size; + rkey = meta_buffer_[idx % SLIME_META_BUFFER_SIZE].mr_rkey; + data_ctx_->register_remote_memory_region(MR_KEY, addr, size, rkey); } - rdma_tasks_cv_.notify_all(); + else { + SLIME_LOG_DEBUG("The REMOTE DATA MR has been REGISTERED! 
The SLOT_ID is: ", unique_SEND_SLOT_ID_); + } - if (rdma_tasks_threads_.joinable()) - rdma_tasks_threads_.join(); -} + AssignmentBatch batch = AssignmentBatch{Assignment(MR_KEY, 0, 0, size)}; -void RDMAEndpoint::connect(const json& data_ctx_info, const json& meta_ctx_info) -{ - data_ctx_->connect(data_ctx_info); - meta_ctx_->connect(meta_ctx_info); + // std::cout << "POST DATA WRITE" << std::endl; + // std::cout << "addr: " << addr << std::endl; + // std::cout << "size: " << size << std::endl; + // std::cout << "rkey: " << rkey << std::endl; - data_ctx_->launch_future(); - meta_ctx_->launch_future(); + auto is_finish_ptr = element.is_finished_ptr_; + is_finish_ptr->store(false, std::memory_order_release); + auto data_write_callback = [idx, is_finish_ptr](int status, int slot_id) mutable { + is_finish_ptr->store(true, std::memory_order_release); + }; - RDMA_tasks_threads_running_ = true; - rdma_tasks_threads_ = std::thread([this] { this->waitandPopTask(std::chrono::milliseconds(100)); }); + data_ctx_->submit(OpCode::WRITE_WITH_IMM, batch, data_write_callback, RDMAContext::UNDEFINED_QPI, idx); } -void RDMAEndpoint::addSendTask(std::shared_ptr buffer) +void RDMAEndpoint::metaRecvQueueThread(std::chrono::milliseconds timeout) { - std::unique_lock lock(rdma_tasks_mutex_); - ++send_slot_id_; - auto task = std::make_shared(shared_from_this(), send_slot_id_, OpCode::SEND, buffer); - send_batch_slot_[send_slot_id_] = task; - rdma_tasks_queue_.push(task); - rdma_tasks_cv_.notify_one(); + SLIME_LOG_INFO("metaRecvQueueThread started (timeout={}ms)", timeout.count()); + + while (!stop_meta_recv_queue_thread_.load(std::memory_order_acquire)) { + uint32_t idx; + if (meta_recv_queue_.getFrontTaskId(idx)) { + if (meta_slots_manager_->checkSlotAvailable(idx)) { + bool found = meta_recv_queue_.peekQueue( + idx, [](const auto& e) { return e.is_finished_ptr_->load(std::memory_order_acquire); }); + if (found) { + if (meta_slots_manager_->acquireSlot(idx)) { + if (meta_recv_queue_.popQueue()) { + SLIME_LOG_DEBUG("SUCCESS to set META RECV RING SLOT status of task id ", + idx, + " and the slot id ", + idx % SLIME_STATUS_SLOT_SIZE); + } + else { + SLIME_LOG_ERROR("The META Queue Has no element"); + } + } + else { + SLIME_LOG_ERROR("FAIL to set META RECV RING SLOT status of task id ", + idx, + " and the slot id ", + idx % SLIME_STATUS_SLOT_SIZE); + } + } + } + } + else { + if (timeout.count() > 0) { + std::this_thread::sleep_for(timeout); + } + else { + std::this_thread::yield(); + } + } + } + SLIME_LOG_INFO("metaRecvQueueThread stopped"); } -void RDMAEndpoint::addRecvTask(std::shared_ptr buffer) +void RDMAEndpoint::dataRecvQueueThread(std::chrono::milliseconds timeout) { - std::unique_lock lock(rdma_tasks_mutex_); - ++recv_slot_id_; - auto task = std::make_shared(shared_from_this(), recv_slot_id_, OpCode::RECV, buffer); - recv_batch_slot_[recv_slot_id_] = task; - rdma_tasks_queue_.push(task); - rdma_tasks_cv_.notify_one(); + SLIME_LOG_INFO("RecvDataQueueThread started (timeout={}ms)", timeout.count()); + + while (!stop_data_recv_queue_thread_.load(std::memory_order_acquire)) { + uint32_t idx; + if (data_recv_queue_.getFrontTaskId(idx)) { + // std::cout << "dataRecv idx" << idx << std::endl; + bool found = data_recv_queue_.peekQueue( + idx, [](const auto& e) { return e.is_finished_ptr_->load(std::memory_order_acquire); }); + // std::cout << "dataRecv found" << found << std::endl; + if (found) { + RDMABufferQueueElement element = recv_buffer_mapping_[idx]; + element.rdma_buffer_->recvDoneCallback(); + + if 
(data_recv_queue_.popQueue()) { + recv_buffer_mapping_.erase(idx); + addPreQueueElement(OpCode::RECV); + SLIME_LOG_DEBUG("SUCCESS to set DATA RECV RING SLOT status of task id ", + idx, + " and the slot id ", + idx % SLIME_STATUS_SLOT_SIZE); + } + else { + SLIME_LOG_ERROR("The DATA RECV Queue Has no element"); + } + } + } + else { + if (timeout.count() > 0) { + std::this_thread::sleep_for(timeout); + } + else { + std::this_thread::yield(); + } + } + } + SLIME_LOG_INFO("RecvDataQueueThread stopped"); } -void RDMAEndpoint::waitandPopTask(std::chrono::milliseconds timeout) +void RDMAEndpoint::SendBufferQueueThread(std::chrono::milliseconds timeout) { - while (RDMA_tasks_threads_running_) { - std::shared_ptr task; - - { - std::unique_lock lock(rdma_tasks_mutex_); - - bool has_task = rdma_tasks_cv_.wait_for( - lock, timeout, [this] { return !rdma_tasks_queue_.empty() || !RDMA_tasks_threads_running_; }); - - if (!RDMA_tasks_threads_running_) - break; - if (!has_task) - continue; - - task = std::move(rdma_tasks_queue_.front()); - rdma_tasks_queue_.pop(); + SLIME_LOG_INFO("SendBufferQueueThread started (timeout={}ms)", timeout.count()); + + while (!stop_send_buffer_queue_thread_.load(std::memory_order_acquire)) { + + uint32_t idx; + if (send_buffer_queue_.getFrontTaskId(idx)) { + if (meta_slots_manager_->checkSlotReady(idx)) { + RDMABufferQueueElement element; + if (send_buffer_queue_.fetchQueue(element)) { + postDataWrite(element, element.rdma_buffer_); + send_finish_queue_.enqueue(element); + if (meta_slots_manager_->releaseSlot(idx)) { + addPreQueueElement(OpCode::SEND); + // std::cout << "!!!!!!!!!!!!!!!!!" << std::endl; + } + else { + SLIME_LOG_ERROR("FAIL to release meta_slots_manager_"); + } + } + else { + SLIME_LOG_ERROR("FAIL to fetch the element in send_buffer_queue_"); + } + } } - - if (task) { - switch (task->opcode_) { - case OpCode::SEND: - asyncSendData(task); - break; - case OpCode::RECV: - asyncRecvData(task); - break; - default: - SLIME_LOG_ERROR("Unknown OpCode in WaitandPopTask"); - break; + else { + if (timeout.count() > 0) { + std::this_thread::sleep_for(timeout); + } + else { + std::this_thread::yield(); } } } + SLIME_LOG_INFO("SendBufferQueueThread stopped"); } -void RDMAEndpoint::asyncSendData(std::shared_ptr task) +void RDMAEndpoint::RecvBufferQueueThread(std::chrono::milliseconds timeout) { - size_t batch_size = task->buffer_->batchSize(); - uint32_t slot_id = task->slot_id_; - - auto data_callback = [this, task](int status, int _) mutable { - std::unique_lock lock(rdma_tasks_mutex_); - this->send_batch_slot_[task->slot_id_]->buffer_->send_done_callback(); - }; - - auto meta_callback = [this, task, data_callback](int status, int slot_id) mutable { - std::unique_lock lock(this->rdma_tasks_mutex_); - task->registerRemoteDataMemoryRegion(); - AssignmentBatch data_assign_batch = task->getDataAssignmentBatch(); - auto data_atx = this->data_ctx_->submit( - OpCode::WRITE_WITH_IMM, data_assign_batch, data_callback, RDMAContext::UNDEFINED_QPI, slot_id); - }; - - { - AssignmentBatch meta_data = task->getMetaAssignmentBatch(); - meta_ctx_->submit(OpCode::RECV, meta_data, meta_callback); + SLIME_LOG_INFO("RecvBufferQueueThread started (timeout={}ms)", timeout.count()); + + while (!stop_recv_buffer_queue_thread_.load(std::memory_order_acquire)) { + uint32_t idx; + if (recv_buffer_queue_.getFrontTaskId(idx)) { + // std::cout << "data_slots_manager_" << std::endl; + // data_slots_manager_->printSlots(); + if (data_slots_manager_->checkSlotReady(idx)) { + RDMABufferQueueElement element; 
+ if (recv_buffer_queue_.fetchQueue(element)) { + postMetaWrite(element.unique_id_, element.rdma_buffer_); + recv_buffer_mapping_.emplace(idx, element); + data_slots_manager_->releaseSlot(idx); + // data_slots_manager_->printSlots(); + } + else { + SLIME_LOG_ERROR("FAIL to fetchQueue recv_buffer_queue_"); + } + } + } + else { + if (timeout.count() > 0) { + std::this_thread::sleep_for(timeout); + } + else { + std::this_thread::yield(); + } + } } + SLIME_LOG_INFO("RecvBufferQueueThread stopped"); } -void RDMAEndpoint::asyncRecvData(std::shared_ptr task) +void RDMAEndpoint::SendFinishQueueThread(std::chrono::milliseconds timeout) { - auto data_callback = [this, task](int status, int slot_id) mutable { - std::unique_lock lock(rdma_tasks_mutex_); - recv_batch_slot_[slot_id]->buffer_->recv_done_callback(); - }; - - auto meta_callback = [this, task, data_callback](int status, int _) mutable { - std::unique_lock lock(this->rdma_tasks_mutex_); - AssignmentBatch assign_batch = task->getDataAssignmentBatch(); - auto data_atx = this->data_ctx_->submit(OpCode::RECV, assign_batch, data_callback, RDMAContext::UNDEFINED_QPI); - }; - - { - AssignmentBatch meta_data = task->getMetaAssignmentBatch(); - meta_ctx_->submit(OpCode::WRITE_WITH_IMM, meta_data, meta_callback, RDMAContext::UNDEFINED_QPI, task->slot_id_); + SLIME_LOG_INFO("SendFinishQueueThread started (timeout={}ms)", timeout.count()); + + while (!stop_wait_send_finish_queue_thread_.load(std::memory_order_acquire)) { + uint32_t idx; + // std::cout << "send_finish_queue_ size: " << send_finish_queue_.size() << std::endl; + if (send_finish_queue_.getFrontTaskId(idx)) { + // std::cout << "send_finish_queue_ idx: " << idx << std::endl; + bool found = send_finish_queue_.peekQueue( + idx, [](const auto& e) { return e.is_finished_ptr_->load(std::memory_order_acquire); }); + if (found) { + // std::cout << "send found" << std::endl; + RDMABufferQueueElement element; + if (send_finish_queue_.fetchQueue(element)) { + element.rdma_buffer_->sendDoneCallback(); + // std::cout << "sendDoneCallback" << std::endl; + SLIME_LOG_DEBUG( + "SUCCESS to send_finish_queue_ ", idx, " and the slot id ", idx % SLIME_STATUS_SLOT_SIZE); + } + else { + SLIME_LOG_ERROR("The Msend_finish_queue_ no element"); + } + } + } + else { + if (timeout.count() > 0) { + std::this_thread::sleep_for(timeout); + } + else { + std::this_thread::yield(); + } + } } + SLIME_LOG_INFO("SendFinishQueueThread stopped"); } +// void RDMAEndpoint::RecvFinishQueueThread(std::chrono::milliseconds timeout) +// { +// SLIME_LOG_INFO("SendBufferQueueThread started (timeout={}ms)", timeout.count()); + +// while (!stop_wait_recv_finish_queue_thread_.load(std::memory_order_acquire)) { +// uint32_t idx; +// if (send_finish_queue_.getFrontTaskId(idx)) { + +// if (temp_slots_manager_->checkSlotReady(idx)) { +// RDMABufferQueueElement element; +// if (recv_finish_queue_.fetchQueue(element)) { +// element.rdma_buffer_->recvDoneCallback(); +// if (temp_slots_manager_->releaseSlot(idx)) { +// addPreQueueElement(OpCode::RECV); +// } +// else { +// SLIME_LOG_ERROR("FAIL to set META RECV RING SLOT status of task id ", +// idx, +// " and the slot id ", +// idx % SLIME_STATUS_SLOT_SIZE); +// } +// } +// else { +// SLIME_LOG_ERROR("FAIL to set META RECV RING SLOT status of task id ", +// idx, +// " and the slot id ", +// idx % SLIME_STATUS_SLOT_SIZE); +// } +// } +// } +// else { +// if (timeout.count() > 0) { +// std::this_thread::sleep_for(timeout); +// } +// else { +// std::this_thread::yield(); +// } +// } +// } +// 
SLIME_LOG_INFO("SendBufferQueueThread stopped"); +// } } // namespace slime + +// int RDMASendRecvTask::makeRemoteDataMR() +// { +// if (!rdma_buffer_ || !rdma_endpoint_) { +// SLIME_LOG_ERROR("Null pointer detected: rdma_buffer_=" << rdma_buffer_ +// << ", rdma_endpoint_=" << rdma_endpoint_); +// return -1; +// } + +// std::string prefix = (rdma_operation_ == OpCode::SEND) ? "DATA_SEND_" : "DATA_RECV_"; +// std::string MR_KEY = prefix + std::to_string(unique_slot_id_) + "_"; +// auto mr = rdma_endpoint_->dataCtx()->get_remote_mr(MR_KEY + std::to_string(0)); + +// if (mr.addr == 0) { +// auto& meta_buffer = rdma_endpoint_->metaBuffer(); +// for (size_t i = 0; i < rdma_buffer_->batch_size(); ++i) { +// uint64_t addr = meta_buffer[unique_slot_id_ % MAX_META_BUFFER_SIZE].mr_addr[i]; +// uint32_t size = meta_buffer[unique_slot_id_ % MAX_META_BUFFER_SIZE].mr_size[i]; +// uint32_t rkey = meta_buffer[unique_slot_id_ % MAX_META_BUFFER_SIZE].mr_rkey[i]; +// rdma_endpoint_->dataCtx()->register_remote_memory_region(MR_KEY + std::to_string(i), addr, size, +// rkey); +// } +// } + +// else { +// SLIME_LOG_DEBUG("The REMOTE DATA MR has been REGISTERED! The SLOT_ID is: ", unique_slot_id_); +// } + +// return 0; +// } + +// void RDMAEndpoint::connect(const json& data_ctx_info, const json& meta_ctx_info) +// { +// data_ctx_->connect(data_ctx_info); +// meta_ctx_->connect(meta_ctx_info); +// data_ctx_->launch_future(); +// meta_ctx_->launch_future(); +// RDMA_tasks_threads_running_ = true; +// rdma_tasks_threads_ = std::thread([this] { this->mainQueueThread(std::chrono::milliseconds(100)); }); +// rdma_task_pool_ = std::make_shared(shared_from_this()); +// } + +// // 生成meta Assignment 和 data Assignment +// int RDMASendRecvTask::makeAssignmentBatch() +// { +// size_t meta_buffer_idx = unique_slot_id_ % MAX_META_BUFFER_SIZE; +// if (rdma_operation_ == OpCode::SEND) { +// AssignmentBatch batch; +// std::string prefix = "DATA_SEND_"; +// std::cout << "rdma_buffer_->batch_size()" << rdma_buffer_->batch_size() << std::endl; +// for (size_t i = 0; i < rdma_buffer_->batch_size(); ++i) { +// batch.push_back(Assignment(prefix + std::to_string(unique_slot_id_) + "_" + std::to_string(i), +// 0, +// 0, +// rdma_buffer_->view_batch()[i].length)); +// } +// data_assignment_batch_ = batch; +// } +// if (rdma_operation_ == OpCode::RECV) { +// meta_assignment_batch_ = AssignmentBatch{ +// Assignment("meta_buffer", +// meta_buffer_idx * sizeof(meta_data_t), +// MAX_META_BUFFER_SIZE * sizeof(meta_data_t) + meta_buffer_idx * sizeof(meta_data_t), +// sizeof(meta_data_t))}; +// } +// return 0; +// } diff --git a/csrc/engine/rdma/rdma_endpoint.h b/csrc/engine/rdma/rdma_endpoint.h index e772f32..03e3b92 100644 --- a/csrc/engine/rdma/rdma_endpoint.h +++ b/csrc/engine/rdma/rdma_endpoint.h @@ -3,22 +3,25 @@ #include "engine/rdma/rdma_context.h" #include +#include #include +#include +#include +#include #include -#include #include #include #include +#include #include #include #include +#include #include +#include "logging.h" #include "rdma_common.h" -#define MAX_META_BATCH_SIZE 64 -#define MAX_META_BUFFER_SIZE 64 - namespace slime { class RDMABuffer; @@ -26,126 +29,368 @@ class RDMAEndpoint; typedef struct MetaData { - uint64_t mr_addr[MAX_META_BATCH_SIZE]; - uint32_t mr_rkey[MAX_META_BATCH_SIZE]; - uint32_t mr_size[MAX_META_BATCH_SIZE]; + uint64_t mr_addr; + uint32_t mr_rkey; + uint32_t mr_size; uint32_t mr_slot; uint32_t mr_qpidx; -} __attribute__((packed)) meta_data_t; +} meta_data_t; -typedef struct RDMATask { - explicit 
RDMATask(std::shared_ptr endpoint, - uint32_t task_id, - OpCode opcode, - std::shared_ptr buffer); - ~RDMATask(); +struct RDMABufferQueueElement { - std::string getMetaKey(); - std::string getDataKey(int32_t idx); + RDMABufferQueueElement() = default; - AssignmentBatch getMetaAssignmentBatch(); - AssignmentBatch getDataAssignmentBatch(); + RDMABufferQueueElement(uint32_t unique_id, OpCode rdma_opcode, std::shared_ptr rdma_buffer = nullptr): + unique_id_(unique_id), rdma_opcode_(rdma_opcode), rdma_buffer_(rdma_buffer) + { + is_finished_ptr_ = std::make_shared>(false); + } - int registerMetaMemoryRegion(); - int registerDataMemoryRegion(); - int registerRemoteDataMemoryRegion(); + RDMABufferQueueElement(const RDMABufferQueueElement& other) = default; - void fillBuffer(); + RDMABufferQueueElement(RDMABufferQueueElement&& other) noexcept = default; + RDMABufferQueueElement& operator=(RDMABufferQueueElement&& other) noexcept + { + if (this != &other) { + unique_id_ = other.unique_id_; + rdma_opcode_ = other.rdma_opcode_; + rdma_buffer_ = std::move(other.rdma_buffer_); + is_finished_ptr_ = std::move(other.is_finished_ptr_); + other.rdma_buffer_ = nullptr; + other.is_finished_ptr_ = nullptr; + } + return *this; + } + uint32_t unique_id_{0}; + OpCode rdma_opcode_{OpCode::SEND}; + std::shared_ptr rdma_buffer_{nullptr}; + std::shared_ptr> is_finished_ptr_{nullptr}; +}; - int targetQPI(); +struct RDMAPrePostQueueElement { - uint32_t slot_id_; - OpCode opcode_; - std::shared_ptr buffer_; + RDMAPrePostQueueElement() = default; + RDMAPrePostQueueElement(uint32_t unique_id, OpCode rdma_opcode): unique_id_(unique_id), rdma_opcode_(rdma_opcode) + { + is_finished_ptr_ = std::make_shared>(false); + } - std::shared_ptr endpoint_; -} rdma_task_t; + RDMAPrePostQueueElement(const RDMAPrePostQueueElement& other) = default; + RDMAPrePostQueueElement(RDMAPrePostQueueElement&& other) noexcept = default; -class RDMAEndpoint: public std::enable_shared_from_this { - friend class RDMABuffer; + RDMAPrePostQueueElement& operator=(RDMAPrePostQueueElement&& other) noexcept + { + if (this != &other) { + unique_id_ = other.unique_id_; + rdma_opcode_ = other.rdma_opcode_; + is_finished_ptr_ = std::move(other.is_finished_ptr_); + other.is_finished_ptr_ = nullptr; + } + return *this; + } + uint32_t unique_id_{0}; + OpCode rdma_opcode_{OpCode::SEND}; + std::shared_ptr> is_finished_ptr_{nullptr}; +}; + +class RingSlotsManager { + +private: + const size_t buffer_size_; + + struct Slot { + std::atomic status{false}; + std::atomic layers{0}; + }; + + std::vector slots_; public: - explicit RDMAEndpoint(const std::string& dev_name, uint8_t ib_port, const std::string& link_type, size_t qp_num); + explicit RingSlotsManager(size_t size): buffer_size_(size), slots_(size) + { - explicit RDMAEndpoint(const std::string& data_dev_name, - const std::string& meta_dev_name, - uint8_t ib_port, - const std::string& link_type, - size_t qp_num); + for (size_t i = 0; i < buffer_size_; ++i) { + slots_[i].status.store(false, std::memory_order_relaxed); + slots_[i].layers.store(0, std::memory_order_relaxed); + } + } - ~RDMAEndpoint(); + RingSlotsManager(const RingSlotsManager&) = delete; + RingSlotsManager& operator=(const RingSlotsManager&) = delete; + RingSlotsManager(RingSlotsManager&&) = delete; + RingSlotsManager& operator=(RingSlotsManager&&) = delete; - void connect(const json& data_ctx_info, const json& meta_ctx_info); + bool releaseSlot(uint32_t idx, int max_wait_attempts = 5) + { + size_t index = idx % buffer_size_; + uint32_t target_layer = 
idx / buffer_size_; + + for (int attempt = 0; attempt < max_wait_attempts; ++attempt) { + bool cur_status = slots_[index].status.load(std::memory_order_acquire); + uint32_t cur_layers = slots_[index].layers.load(std::memory_order_acquire); + + if (!cur_status || cur_layers != target_layer) { + return false; + } + + bool expected_status = true; + if (slots_[index].status.compare_exchange_strong( + expected_status, false, std::memory_order_release, std::memory_order_relaxed)) { + uint32_t next_layer = cur_layers + 1; + if (slots_[index].layers.compare_exchange_strong( + cur_layers, next_layer, std::memory_order_release, std::memory_order_relaxed)) { + return true; + } + slots_[index].status.store(true, std::memory_order_release); + return false; + } + std::this_thread::yield(); + } + return false; + } - std::shared_ptr createRDMABuffer(storage_view_batch_t batch); + bool acquireSlot(uint32_t idx, int max_wait_attempts = 5) + { + size_t index = idx % buffer_size_; + uint32_t target_layers = idx / buffer_size_; + + for (int attempt = 0; attempt < max_wait_attempts; ++attempt) { + bool cur_status = slots_[index].status.load(std::memory_order_acquire); + uint32_t cur_layers = slots_[index].layers.load(std::memory_order_acquire); + + if (cur_status || cur_layers != target_layers) { + std::this_thread::yield(); + continue; + } - void addRecvTask(std::shared_ptr buffer); - void addSendTask(std::shared_ptr buffer); + bool expected = false; + if (slots_[index].status.compare_exchange_strong( + expected, true, std::memory_order_release, std::memory_order_relaxed)) { + return true; + } + + std::this_thread::yield(); + } + + return false; + } - json getDataContextInfo() const + bool checkSlotAvailable(uint32_t idx) const { - return data_ctx_->endpoint_info(); + size_t index = idx % buffer_size_; + uint32_t target_layers = idx / buffer_size_; + + bool status = slots_[index].status.load(std::memory_order_acquire); + uint32_t layers = slots_[index].layers.load(std::memory_order_acquire); + return !status && (layers == target_layers); } - json getMetaContextInfo() const + bool checkSlotReady(uint32_t idx) const { - return meta_ctx_->endpoint_info(); + size_t index = idx % buffer_size_; + uint32_t target_layers = idx / buffer_size_; + + bool status = slots_[index].status.load(std::memory_order_acquire); + uint32_t layers = slots_[index].layers.load(std::memory_order_acquire); + + return status && (layers == target_layers); + } + + void printSlots() const + { + std::cout << "=== RingSlotsManager Status (size=" << buffer_size_ << ") ===" << std::endl; + for (size_t i = 0; i < buffer_size_; ++i) { + bool status = slots_[i].status.load(std::memory_order_relaxed); + uint32_t layers = slots_[i].layers.load(std::memory_order_relaxed); + + std::string state = status ? 
"occupied" : "free"; + + std::cout << " [" << std::setw(3) << i << "] layer=" << std::setw(2) << layers << " : " << state + << std::endl; + } + std::cout << "========================================" << std::endl; + } +}; + +template +class ProxyQueue { +private: + std::queue queue_; + mutable std::mutex mutex_; + +public: + void enqueue(T element) + { + std::lock_guard lock(mutex_); + queue_.push(std::move(element)); } - std::shared_ptr dataCtx() + bool fetchQueue(T& element) { - return data_ctx_; + std::lock_guard lock(mutex_); + if (queue_.empty()) + return false; + + element = std::move(queue_.front()); + queue_.pop(); + return true; + } + + bool checkQueue(const T*& element) + { + std::lock_guard lock(mutex_); + if (queue_.empty()) + return false; + + element = &queue_.front(); + return true; + } + bool popQueue() + { + std::lock_guard lock(mutex_); + if (queue_.empty()) + return false; + + queue_.pop(); + return true; + } + + template + bool peekQueue(uint32_t& task_id, M&& matcher) + { + std::lock_guard lock(mutex_); + if (!queue_.empty() && std::forward(matcher)(queue_.front())) { + task_id = queue_.front().unique_id_; + // queue_.pop(); + return true; + } + return false; } - std::shared_ptr metaCtx() + template + bool peekQueue(T& element, M&& matcher) { - return meta_ctx_; + std::lock_guard lock(mutex_); + if (!queue_.empty() && std::forward(matcher)(queue_.front())) { + element = std::move(queue_.front()); + queue_.pop(); + return true; + } + return false; } - int metaCtxQPNum() + + bool getFrontTaskId(uint32_t& task_id) { - return meta_ctx_qp_num_; + std::lock_guard lock(mutex_); + if (!queue_.empty()) { + task_id = queue_.front().unique_id_; + return true; + } + return false; } - int dataCtxQPNum() + + size_t size() const { - return data_ctx_qp_num_; + std::lock_guard lock(mutex_); + return queue_.size(); } - std::vector& getMetaBuffer() + bool empty() const { - return meta_buffer_; + std::lock_guard lock(mutex_); + return queue_.empty(); } +}; + +class RDMAEndpoint: public std::enable_shared_from_this { + + friend class RDMABuffer; + +public: + explicit RDMAEndpoint(const std::string& dev_name, size_t ib_port, const std::string& link_type, size_t qp_nums); + + // TODO: 设计聚合多网卡传输的Send Recv + + json dataCtxInfo() const + { + return data_ctx_->endpoint_info(); + } + + json metaCtxInfo() const + { + return meta_ctx_->endpoint_info(); + } + + void connect(const json& data_ctx_info, const json& meta_ctx_info); + + ~RDMAEndpoint(); + + void proxyInit(); + void proxyDestroy(); + + void addRDMABuffer(std::shared_ptr rdma_buffer); private: - void waitandPopTask(std::chrono::milliseconds timeout); + void metaRecvQueueThread(std::chrono::milliseconds timeout); + void dataRecvQueueThread(std::chrono::milliseconds timeout); + + void SendBufferQueueThread(std::chrono::milliseconds timeout); + void RecvBufferQueueThread(std::chrono::milliseconds timeout); - void asyncRecvData(std::shared_ptr task); - void asyncSendData(std::shared_ptr task); + void SendFinishQueueThread(std::chrono::milliseconds timeout); + void RecvFinishQueueThread(std::chrono::milliseconds timeout); + + void addPreQueueElement(OpCode rdma_opcode); + void addRDMABuffer(OpCode rdma_opcode, std::shared_ptr rdma_buffer); + void postMetaWrite(uint32_t idx, std::shared_ptr rdma_buffer); + void postDataWrite(RDMABufferQueueElement& element, std::shared_ptr rdma_buffer); + void postRDMAAssignment(OpCode rdma_opcode); + + ProxyQueue meta_recv_queue_; + ProxyQueue data_recv_queue_; + + ProxyQueue send_buffer_queue_; + ProxyQueue 
recv_buffer_queue_; + + ProxyQueue send_finish_queue_; + ProxyQueue recv_finish_queue_; + + std::thread meta_recv_thread_; + std::thread data_recv_thread_; + + std::thread send_buffer_thread_; + std::thread recv_buffer_thread_; + + std::thread send_finish_thread_; + std::thread recv_finish_thread_; size_t data_ctx_qp_num_; size_t meta_ctx_qp_num_; - std::atomic send_slot_id_{RDMAContext::UNDEFINED_IMM_DATA}; - std::atomic recv_slot_id_{RDMAContext::UNDEFINED_IMM_DATA}; - - std::vector meta_buffer_; + std::atomic unique_SEND_SLOT_ID_{0}; + std::atomic unique_RECV_SLOT_ID_{0}; - uint32_t batch_size_; + std::vector dum_meta_buffer_; + std::vector dum_data_buffer_; - std::map> send_batch_slot_; - std::map> recv_batch_slot_; + std::vector meta_buffer_; std::shared_ptr data_ctx_; std::shared_ptr meta_ctx_; - std::queue> rdma_tasks_queue_; - std::thread rdma_tasks_threads_; + std::atomic unique_meta_recv_id_{0}; + std::atomic unique_data_recv_id_{0}; - std::condition_variable rdma_tasks_cv_; - std::mutex rdma_tasks_mutex_; + std::atomic stop_meta_recv_queue_thread_{false}; + std::atomic stop_data_recv_queue_thread_{false}; + std::atomic stop_send_buffer_queue_thread_{false}; + std::atomic stop_recv_buffer_queue_thread_{false}; + std::atomic stop_wait_send_finish_queue_thread_{false}; + std::atomic stop_wait_recv_finish_queue_thread_{false}; + std::unique_ptr meta_slots_manager_; + std::unique_ptr data_slots_manager_; - std::map> imm_data_callback_; - bool RDMA_tasks_threads_running_; + std::unordered_map recv_buffer_mapping_; }; - -} // namespace slime +} // namespace slime \ No newline at end of file diff --git a/csrc/engine/rdma/rdma_env.h b/csrc/engine/rdma/rdma_env.h index cd3ff3c..2aae0d5 100644 --- a/csrc/engine/rdma/rdma_env.h +++ b/csrc/engine/rdma/rdma_env.h @@ -17,9 +17,11 @@ inline const int SLIME_MAX_RD_ATOMIC = get_env("SLIME_MAX_RD_AT inline const int SLIME_MAX_DEST_RD_ATOMIC = get_env("SLIME_MAX_DEST_RD_ATOMIC", 16); inline const int SLIME_SERVICE_LEVEL = get_env("SLIME_SERVICE_LEVEL", 0); inline const int SLIME_GID_INDEX = get_env("SLIME_GID_INDEX", -1); -inline const int SLIME_QP_NUM = get_env("SLIME_QP_NUM", 8); +inline const int SLIME_QP_NUM = get_env("SLIME_QP_NUM", 1); inline const int SLIME_CQ_NUM = get_env("SLIME_CQ_NUM", 1); inline const int SLIME_MAX_CQ_DEPTH = get_env("SLIME_MAX_CQ_DEPTH", 8192); inline const int SLIME_AGG_QP_NUM = get_env("SLIME_AGG_QP_NUM", 1); - +inline const int SLIME_DUMMY_BUFFER_SIZE = get_env("SLIME_DUMMY_BUFFER_SIZE", 16); +inline const int SLIME_META_BUFFER_SIZE = get_env("SLIME_META_BUFFER_SIZE", 4); +inline const int SLIME_STATUS_SLOT_SIZE = get_env("SLIME_STATUS_SLOT_SIZE", 4); } // namespace slime diff --git a/csrc/python/bind.cpp b/csrc/python/bind.cpp index 1fbfe87..270a82b 100644 --- a/csrc/python/bind.cpp +++ b/csrc/python/bind.cpp @@ -120,7 +120,7 @@ py::object alloc_dlpack_tensor(slime::NVShmemContext& self, size_t size, size_t #define BUILD_INTER_OPS_ENABLED false #endif -#define EXPOSE_BUILD_FLAG(m, flag) m.attr("_"#flag) = flag##_ENABLED +#define EXPOSE_BUILD_FLAG(m, flag) m.attr("_" #flag) = flag##_ENABLED PYBIND11_MODULE(_slime_c, m) { @@ -187,14 +187,11 @@ PYBIND11_MODULE(_slime_c, m) py::class_>(m, "rdma_endpoint") .def(py::init()) .def("context_connect", &slime::RDMAEndpoint::connect) - .def("get_data_context_info", &slime::RDMAEndpoint::getDataContextInfo) - .def("get_meta_context_info", &slime::RDMAEndpoint::getMetaContextInfo); + .def("get_data_context_info", &slime::RDMAEndpoint::dataCtxInfo) + .def("get_meta_context_info", 
&slime::RDMAEndpoint::metaCtxInfo); py::class_>(m, "rdma_buffer") - .def(py::init, - std::vector, - std::vector, - std::vector>()) + .def(py::init, uintptr_t, size_t, size_t>()) .def("send", &slime::RDMABuffer::send) .def("recv", &slime::RDMABuffer::recv) .def("wait_send", &slime::RDMABuffer::waitSend) diff --git a/csrc/torch/slime_backend.cpp b/csrc/torch/slime_backend.cpp index 9fe7562..fc2c9bf 100644 --- a/csrc/torch/slime_backend.cpp +++ b/csrc/torch/slime_backend.cpp @@ -100,9 +100,10 @@ c10::intrusive_ptr<::c10d::Work> slimeBackend::send(std::vector& ten offset.push_back(0); data_size.push_back(static_cast(tensors[i].numel() * tensors[i].itemsize())); } - - auto buf = - std::make_shared(end_point_set_[mod_positive(dstRank - rank_, size_ - 1)], ptrs, offset, data_size); + // std::cout << "batch_size:" << batch_size << std::endl; + // std::cout << "Endpoint iDX:" << mod_positive(dstRank - rank_, size_ - 1) << std::endl; + auto buf = std::make_shared( + end_point_set_[mod_positive(dstRank - rank_, size_ - 1)], ptrs[0], offset[0], data_size[0]); buf->send(); ++seq_; @@ -126,9 +127,10 @@ c10::intrusive_ptr<::c10d::Work> slimeBackend::recv(std::vector& ten offset.push_back(0); data_size.push_back(static_cast(tensors[i].numel() * tensors[i].itemsize())); } - - auto buf = - std::make_shared(end_point_set_[mod_positive(srcRank - rank_, size_ - 1)], ptrs, offset, data_size); + // std::cout << "batch_size:" << batch_size << std::endl; + // std::cout << "Endpoint iDX:" << mod_positive(srcRank - rank_, size_ - 1) << std::endl; + auto buf = std::make_shared( + end_point_set_[mod_positive(srcRank - rank_, size_ - 1)], ptrs[0], offset[0], data_size[0]); buf->recv(); ++seq_; @@ -160,8 +162,8 @@ slimeBackend::slimeBackend(const c10::intrusive_ptr<::c10d::Store>& store, int r end_point_set_.push_back(std::make_shared(dev_name, ib_port, link_type, qp_num)); json channel_info; - channel_info["data_channel"] = end_point_set_[i]->getDataContextInfo(); - channel_info["meta_channel"] = end_point_set_[i]->getMetaContextInfo(); + channel_info["data_channel"] = end_point_set_[i]->dataCtxInfo(); + channel_info["meta_channel"] = end_point_set_[i]->metaCtxInfo(); local_channel_info_.push_back(channel_info); } diff --git a/tests/cpp/CMakeLists.txt b/tests/cpp/CMakeLists.txt index cca5acf..33347d8 100644 --- a/tests/cpp/CMakeLists.txt +++ b/tests/cpp/CMakeLists.txt @@ -11,6 +11,36 @@ target_link_libraries( _slime_engine _slime_rdma gflags zmq pthread ) + +add_executable( + send_test + send_test.cpp +) + +target_include_directories(send_test PUBLIC ${ZeroMQ_INCLUDE_DIRS}) + +target_link_libraries( + send_test + PUBLIC + _slime_engine _slime_rdma gflags zmq pthread +) + + +add_executable( + recv_test + recv_test.cpp +) + +target_include_directories(recv_test PUBLIC ${ZeroMQ_INCLUDE_DIRS}) + +target_link_libraries( + recv_test + PUBLIC + _slime_engine _slime_rdma gflags zmq pthread +) + + + if (BUILD_ASCEND_DIRECT) add_subdirectory(ascend_direct) endif() diff --git a/tests/cpp/recv_test.cpp b/tests/cpp/recv_test.cpp new file mode 100644 index 0000000..38cb171 --- /dev/null +++ b/tests/cpp/recv_test.cpp @@ -0,0 +1,184 @@ +#include "engine/rdma/rdma_buffer.h" +#include "engine/rdma/rdma_endpoint.cpp" + +#include +#include +#include +#include +#include + +using json = nlohmann::json; +using namespace slime; + +DEFINE_string(DEVICE_NAME, "rxe_0", "RDMA device name"); +DEFINE_string(LINK_TYPE, "RoCE", "IB or RoCE"); +DEFINE_int32(IB_PORT, 1, "RDMA port number"); +DEFINE_int32(PORT_DATA, 5557, "ZMQ DATA port"); 
+DEFINE_int32(PORT_META, 5558, "ZMQ META port");
+DEFINE_string(PEER_ADDR, "127.0.0.1", "peer IP address");
+
+int main(int argc, char** argv)
+{
+
+    std::cout << "Init the RDMA ENDPOINT OF RECV... " << std::endl;
+    // Construct the end_point
+    auto end_point = std::make_shared<RDMAEndpoint>(FLAGS_DEVICE_NAME, FLAGS_IB_PORT, FLAGS_LINK_TYPE, 1);
+
+    std::cout << "RDMA QP INFO VIA TCP... " << std::endl;
+    // RDMA control plane via TCP
+    zmq::context_t zmq_ctx_data(2);
+    zmq::context_t zmq_ctx_meta(2);
+
+    zmq::socket_t sock_data(zmq_ctx_data, ZMQ_REQ);
+    zmq::socket_t sock_meta(zmq_ctx_meta, ZMQ_REQ);
+
+    sock_data.connect("tcp://" + FLAGS_PEER_ADDR + ":" + std::to_string(FLAGS_PORT_DATA));
+    sock_meta.connect("tcp://" + FLAGS_PEER_ADDR + ":" + std::to_string(FLAGS_PORT_META));
+
+    zmq::message_t local_data_channel_info(end_point->dataCtxInfo().dump());
+    zmq::message_t local_meta_channel_info(end_point->metaCtxInfo().dump());
+
+    sock_data.send(local_data_channel_info, zmq::send_flags::none);
+    sock_meta.send(local_meta_channel_info, zmq::send_flags::none);
+
+    std::cout << "Send the RDMA Info to other side..." << std::endl;
+
+    zmq::message_t data_channel_info;
+    zmq::message_t meta_channel_info;
+
+    auto data_recv_result = sock_data.recv(data_channel_info);
+    auto meta_recv_result = sock_meta.recv(meta_channel_info);
+
+    end_point->connect(json::parse(data_channel_info.to_string()), json::parse(meta_channel_info.to_string()));
+    std::cout << "Connect Success..." << std::endl;
+    std::cout << "Finish the connection of QP, start to RECV buf_0 .. buf_8... " << std::endl;
+
+    const uint32_t batch_size_buf_0 = 1;
+    std::vector<char> data_buf_0(8192, 'A');
+    uintptr_t ptrs_buf_0 = reinterpret_cast<uintptr_t>(data_buf_0.data());
+    size_t data_sizes_buf_0 = data_buf_0.size();
+    size_t offset_buf_0 = 0;
+
+    const uint32_t batch_size_buf_1 = 1;
+    std::vector<char> data_buf_1(8192, 'B');
+    uintptr_t ptrs_buf_1 = reinterpret_cast<uintptr_t>(data_buf_1.data());
+    size_t data_sizes_buf_1 = data_buf_1.size();
+    size_t offset_buf_1 = 0;
+
+    const uint32_t batch_size_buf_2 = 1;
+    std::vector<char> data_buf_2(8192, 'C');
+    uintptr_t ptrs_buf_2 = reinterpret_cast<uintptr_t>(data_buf_2.data());
+    size_t data_sizes_buf_2 = data_buf_2.size();
+    size_t offset_buf_2 = 0;
+
+    const uint32_t batch_size_buf_3 = 1;
+    std::vector<char> data_buf_3(8192, 'D');
+    uintptr_t ptrs_buf_3 = reinterpret_cast<uintptr_t>(data_buf_3.data());
+    size_t data_sizes_buf_3 = data_buf_3.size();
+    size_t offset_buf_3 = 0;
+
+    const uint32_t batch_size_buf_4 = 1;
+    std::vector<char> data_buf_4(8192, 'E');
+    uintptr_t ptrs_buf_4 = reinterpret_cast<uintptr_t>(data_buf_4.data());
+    size_t data_sizes_buf_4 = data_buf_4.size();
+    size_t offset_buf_4 = 0;
+
+    const uint32_t batch_size_buf_5 = 1;
+    std::vector<char> data_buf_5(8192, 'F');
+    uintptr_t ptrs_buf_5 = reinterpret_cast<uintptr_t>(data_buf_5.data());
+    size_t data_sizes_buf_5 = data_buf_5.size();
+    size_t offset_buf_5 = 0;
+
+    std::vector<char> data_buf_6(8192, 'F');
+    uintptr_t ptrs_buf_6 = reinterpret_cast<uintptr_t>(data_buf_6.data());
+    size_t data_sizes_buf_6 = data_buf_6.size();
+    size_t offset_buf_6 = 0;
+
+    std::vector<char> data_buf_7(8192, 'F');
+    uintptr_t ptrs_buf_7 = reinterpret_cast<uintptr_t>(data_buf_7.data());
+    size_t data_sizes_buf_7 = data_buf_7.size();
+    size_t offset_buf_7 = 0;
+
+    std::vector<char> data_buf_8(8192, 'F');
+    uintptr_t ptrs_buf_8 = reinterpret_cast<uintptr_t>(data_buf_8.data());
+    size_t data_sizes_buf_8 = data_buf_8.size();
+    size_t offset_buf_8 = 0;
+
+    auto buf_0 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_0, offset_buf_0, data_sizes_buf_0);
+    auto buf_1 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_1, offset_buf_1, data_sizes_buf_1);
+    auto buf_2 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_2, offset_buf_2, data_sizes_buf_2);
+    auto buf_3 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_3, offset_buf_3, data_sizes_buf_3);
+    auto buf_4 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_4, offset_buf_4, data_sizes_buf_4);
+    auto buf_5 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_5, offset_buf_5, data_sizes_buf_5);
+    auto buf_6 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_6, offset_buf_6, data_sizes_buf_6);
+    auto buf_7 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_7, offset_buf_7, data_sizes_buf_7);
+    auto buf_8 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_8, offset_buf_8, data_sizes_buf_8);
+
+    std::cout << "Launch ENDPOINT ..." << std::endl;
+
+    buf_0->recv();
+    buf_1->recv();
+    buf_2->recv();
+    buf_3->recv();
+    buf_4->recv();
+    buf_5->recv();
+    buf_6->recv();
+    buf_7->recv();
+    buf_8->recv();
+    std::cout << "Main thread working Test..." << std::endl;
+    std::cout << "Main thread working Test..." << std::endl;
+    std::cout << "Main thread working Test..." << std::endl;
+    std::cout << "Main thread working Test..." << std::endl;
+    std::cout << "Main thread working Test..." << std::endl;
+    std::cout << "Wait RECV Complete..." << std::endl;
+    buf_0->waitRecv();
+    buf_1->waitRecv();
+    buf_2->waitRecv();
+    buf_3->waitRecv();
+    buf_4->waitRecv();
+    buf_5->waitRecv();
+    buf_6->waitRecv();
+    buf_7->waitRecv();
+    buf_8->waitRecv();
+    // buf_1->waitRecv();
+
+    std::cout << data_buf_0[0] << std::endl;
+    std::cout << data_buf_1[0] << std::endl;
+    std::cout << data_buf_2[0] << std::endl;
+    std::cout << data_buf_3[0] << std::endl;
+    std::cout << data_buf_4[0] << std::endl;
+    std::cout << data_buf_5[0] << std::endl;
+    std::cout << data_buf_6[0] << std::endl;
+    std::cout << data_buf_7[0] << std::endl;
+    std::cout << data_buf_8[0] << std::endl;
+    bool data_buf_0_correct = std::all_of(data_buf_0.begin(), data_buf_0.end(), [](char c) { return c == '0'; });
+    assert(data_buf_0_correct && "Data_0 should contain '0'");
+
+    bool data_buf_1_correct = std::all_of(data_buf_1.begin(), data_buf_1.end(), [](char c) { return c == '1'; });
+    assert(data_buf_1_correct && "Data_1 should contain '1'");
+
+    bool data_buf_2_correct = std::all_of(data_buf_2.begin(), data_buf_2.end(), [](char c) { return c == '2'; });
+    assert(data_buf_2_correct && "Data_2 should contain '2'");
+
+    bool data_buf_3_correct = std::all_of(data_buf_3.begin(), data_buf_3.end(), [](char c) { return c == '3'; });
+    assert(data_buf_3_correct && "Data_3 should contain '3'");
+
+    bool data_buf_4_correct = std::all_of(data_buf_4.begin(), data_buf_4.end(), [](char c) { return c == '4'; });
+    assert(data_buf_4_correct && "Data_4 should contain '4'");
+
+    bool data_buf_5_correct = std::all_of(data_buf_5.begin(), data_buf_5.end(), [](char c) { return c == '5'; });
+    assert(data_buf_5_correct && "Data_5 should contain '5'");
+
+    bool data_buf_6_correct = std::all_of(data_buf_6.begin(), data_buf_6.end(), [](char c) { return c == '6'; });
+    assert(data_buf_6_correct && "Data_6 should contain '6'");
+
+    bool data_buf_7_correct = std::all_of(data_buf_7.begin(), data_buf_7.end(), [](char c) { return c == '7'; });
+    assert(data_buf_7_correct && "Data_7 should contain '7'");
+
+    bool data_buf_8_correct = std::all_of(data_buf_8.begin(), data_buf_8.end(), [](char c) { return c == '8'; });
+    assert(data_buf_8_correct && "Data_8 should contain '8'");
+
+    std::cout << "The RECV test completed and data verified." << std::endl;
+
+    return 0;
+}
diff --git a/tests/cpp/send_test.cpp b/tests/cpp/send_test.cpp
new file mode 100644
index 0000000..f81cf81
--- /dev/null
+++ b/tests/cpp/send_test.cpp
@@ -0,0 +1,168 @@
+#include "engine/rdma/rdma_buffer.h"
+#include "engine/rdma/rdma_endpoint.h"
+#include
+#include
+#include
+#include
+#include
+#include
+
+using json = nlohmann::json;
+using namespace slime;
+
+DEFINE_string(DEVICE_NAME, "rxe_0", "RDMA device name");
+DEFINE_string(LINK_TYPE, "RoCE", "IB or RoCE");
+DEFINE_int32(IB_PORT, 1, "RDMA port number");
+DEFINE_int32(PORT_DATA, 5557, "ZMQ DATA port");
+DEFINE_int32(PORT_META, 5558, "ZMQ META port");
+
+int main(int argc, char** argv)
+{
+
+    std::cout << "Init the RDMA ENDPOINT OF SEND... " << std::endl;
+    // Construct the end_point
+    auto end_point = std::make_shared<RDMAEndpoint>(FLAGS_DEVICE_NAME, FLAGS_IB_PORT, FLAGS_LINK_TYPE, 1);
+
+    std::cout << "RDMA QP INFO VIA TCP... " << std::endl;
+    // RDMA control plane via TCP
+    zmq::context_t zmq_ctx_data(2);
+    zmq::context_t zmq_ctx_meta(2);
+
+    zmq::socket_t sock_data(zmq_ctx_data, ZMQ_REP);
+    zmq::socket_t sock_meta(zmq_ctx_meta, ZMQ_REP);
+
+    sock_data.bind("tcp://*:" + std::to_string(FLAGS_PORT_DATA));
+    sock_meta.bind("tcp://*:" + std::to_string(FLAGS_PORT_META));
+
+    zmq::message_t data_channel_info;
+    zmq::message_t meta_channel_info;
+    auto data_channel_info_res = sock_data.recv(data_channel_info);
+    auto meta_channel_info_res = sock_meta.recv(meta_channel_info);
+
+    std::cout << "Send the RDMA Info to other side..." << std::endl;
+    zmq::message_t local_data_channel_info(end_point->dataCtxInfo().dump());
+    zmq::message_t local_meta_channel_info(end_point->metaCtxInfo().dump());
+
+    sock_data.send(local_data_channel_info, zmq::send_flags::none);
+    sock_meta.send(local_meta_channel_info, zmq::send_flags::none);
+
+    end_point->connect(json::parse(data_channel_info.to_string()), json::parse(meta_channel_info.to_string()));
+
+    std::cout << "Connect Success..." << std::endl;
+    std::cout << "Finish the connection of QP, start to SEND buf_0 .. buf_8..." << std::endl;
+
+    const uint32_t batch_size_buf_0 = 1;
+    std::vector<char> data_buf_0(8192, '0');
+
+    uintptr_t ptrs_buf_0 = reinterpret_cast<uintptr_t>(data_buf_0.data());
+    size_t data_sizes_buf_0 = data_buf_0.size();
+    size_t offset_buf_0 = 0;
+
+    const uint32_t batch_size_buf_1 = 1;
+    std::vector<char> data_buf_1(8192, '1');
+
+    uintptr_t ptrs_buf_1 = reinterpret_cast<uintptr_t>(data_buf_1.data());
+    size_t data_sizes_buf_1 = data_buf_1.size();
+    size_t offset_buf_1 = 0;
+
+    const uint32_t batch_size_buf_2 = 1;
+    std::vector<char> data_buf_2(8192, '2');
+
+    uintptr_t ptrs_buf_2 = reinterpret_cast<uintptr_t>(data_buf_2.data());
+    size_t data_sizes_buf_2 = data_buf_2.size();
+    size_t offset_buf_2 = 0;
+
+    const uint32_t batch_size_buf_3 = 3;
+    std::vector<char> data_buf_3(8192, '3');
+
+    uintptr_t ptrs_buf_3 = reinterpret_cast<uintptr_t>(data_buf_3.data());
+    size_t data_sizes_buf_3 = data_buf_3.size();
+    size_t offset_buf_3 = 0;
+
+    const uint32_t batch_size_buf_4 = 4;
+    std::vector<char> data_buf_4(8192, '4');
+
+    uintptr_t ptrs_buf_4 = reinterpret_cast<uintptr_t>(data_buf_4.data());
+    size_t data_sizes_buf_4 = data_buf_4.size();
+    size_t offset_buf_4 = 0;
+
+    const uint32_t batch_size_buf_5 = 5;
+    std::vector<char> data_buf_5(8192, '5');
+
+    uintptr_t ptrs_buf_5 = reinterpret_cast<uintptr_t>(data_buf_5.data());
+    size_t data_sizes_buf_5 = data_buf_5.size();
+    size_t offset_buf_5 = 0;
+
+    const uint32_t batch_size_buf_6 = 5;
+    std::vector<char> data_buf_6(8192, '6');
+
+    uintptr_t ptrs_buf_6 = reinterpret_cast<uintptr_t>(data_buf_6.data());
+    size_t data_sizes_buf_6 = data_buf_6.size();
+    size_t offset_buf_6 = 0;
+
+    const uint32_t batch_size_buf_7 = 5;
+    std::vector<char> data_buf_7(8192, '7');
+
+    uintptr_t ptrs_buf_7 = reinterpret_cast<uintptr_t>(data_buf_7.data());
+    size_t data_sizes_buf_7 = data_buf_7.size();
+    size_t offset_buf_7 = 0;
+
+    const uint32_t batch_size_buf_8 = 5;
+    std::vector<char> data_buf_8(8192, '8');
+
+    uintptr_t ptrs_buf_8 = reinterpret_cast<uintptr_t>(data_buf_8.data());
+    size_t data_sizes_buf_8 = data_buf_8.size();
+    size_t offset_buf_8 = 0;
+
+    // const uint32_t batch_size_buf_1 = 2;
+    // std::vector<char> data_buf_1_0(1024, '1');
+    // std::vector<char> data_buf_1_1(2048, '2');
+
+    // std::vector<uintptr_t> ptrs_buf_1 = {reinterpret_cast<uintptr_t>(data_buf_1_0.data()),
+    //                                      reinterpret_cast<uintptr_t>(data_buf_1_1.data())};
+    // std::vector<size_t> data_sizes_buf_1 = {data_buf_1_0.size(), data_buf_1_1.size()};
+    // std::vector<size_t> offset_buf_1 = {0, 0};
+
+    auto buf_0 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_0, offset_buf_0, data_sizes_buf_0);
+    auto buf_1 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_1, offset_buf_1, data_sizes_buf_1);
+    auto buf_2 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_2, offset_buf_2, data_sizes_buf_2);
+    auto buf_3 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_3, offset_buf_3, data_sizes_buf_3);
+    auto buf_4 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_4, offset_buf_4, data_sizes_buf_4);
+    auto buf_5 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_5, offset_buf_5, data_sizes_buf_5);
+    auto buf_6 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_6, offset_buf_6, data_sizes_buf_6);
+    auto buf_7 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_7, offset_buf_7, data_sizes_buf_7);
+    auto buf_8 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_8, offset_buf_8, data_sizes_buf_8);
+    // auto buf_1 = std::make_shared<RDMABuffer>(end_point, ptrs_buf_1, offset_buf_1, data_sizes_buf_1);
+
+    std::cout << "Launch ENDPOINT ..." << std::endl;
+
+    // buf_1->send();
+    buf_0->send();
+    buf_1->send();
+    buf_2->send();
+    buf_3->send();
+    buf_4->send();
+    buf_5->send();
+    buf_6->send();
+    buf_7->send();
+    buf_8->send();
+    std::cout << "Main thread working Test..." << std::endl;
+    std::cout << "Main thread working Test..."
<< std::endl; + std::cout << "Main thread working Test..." << std::endl; + std::cout << "Main thread working Test..." << std::endl; + std::cout << "Main thread working Test..." << std::endl; + std::cout << "Wait SEND Complete..." << std::endl; + buf_0->waitSend(); + buf_1->waitSend(); + buf_2->waitSend(); + buf_3->waitSend(); + buf_4->waitSend(); + buf_5->waitSend(); + buf_6->waitSend(); + buf_7->waitSend(); + buf_8->waitSend(); + + std::cout << "The SEND test completed." << std::endl; + + return 0; +} diff --git a/tests/cpp/sendrecv.cpp b/tests/cpp/sendrecv.cpp index 170ef71..7a17e59 100644 --- a/tests/cpp/sendrecv.cpp +++ b/tests/cpp/sendrecv.cpp @@ -10,333 +10,339 @@ #include #include -using json = nlohmann::json; -using namespace slime; -using namespace std::chrono; - -DEFINE_string(device, "rxe_0", "RDMA device name"); -DEFINE_string(LINK_TYPE, "RoCE", "IB or RoCE"); -DEFINE_int32(IB_PORT, 1, "RDMA port number"); -DEFINE_string(PEER_ADDR, "127.0.0.1", "peer IP address"); -DEFINE_int32(PORT_DATA, 5557, "ZMQ control port"); -DEFINE_int32(PORT_META, 5558, "ZMQ control port"); -DEFINE_string(OUTPUT_FILE, "rdma_test_results.csv", "output file for performance results"); - -DEFINE_bool(send, false, "Run in send mode"); -DEFINE_bool(recv, false, "Run in recv mode"); - -DEFINE_int32(num_qp, 1, "Number of QPs"); -DEFINE_int32(num_packets, 100, "Number of packets"); -DEFINE_int32(min_packet_size, 11, "Minimum size of packet size (2^(min_packet_size) bytes)"); -DEFINE_int32(max_packet_size, 11, "Maximum size of packet size (2^(max_packet_size) bytes)"); - -typedef struct Result { - size_t packet_size; - size_t total_bytes; - size_t packet_num; - double min_latency_ms; - double max_latency_ms; - double avg_latency_ms; - double p50_latency_ms; - double p99_latency_ms; - - double min_bandwidth_MBs; - double max_bandwidth_MBs; - double avg_bandwidth_MBs; - - double p50_bandwidth_MBs; - double p99_bandwidth_MBs; - -} Result_t; - -double calculatePercentile(const std::vector& data, double percentile) + +int main() { - if (data.empty()) - return 0.0; - - std::vector sorted_data = data; - std::sort(sorted_data.begin(), sorted_data.end()); - - double position = percentile * (sorted_data.size() - 1); - size_t index = static_cast(position); - double fraction = position - index; - - if (index + 1 < sorted_data.size()) { - return sorted_data[index] + fraction * (sorted_data[index + 1] - sorted_data[index]); - } - else { - return sorted_data[index]; - } + return 0; } -void initConnection(std::shared_ptr& end_point) -{ - if (FLAGS_send) { - std::cout << "Initializing RDMA endpoint in SEND mode..." 
<< std::endl; +// using json = nlohmann::json; +// using namespace slime; +// using namespace std::chrono; + +// DEFINE_string(device, "rxe_0", "RDMA device name"); +// DEFINE_string(LINK_TYPE, "RoCE", "IB or RoCE"); +// DEFINE_int32(IB_PORT, 1, "RDMA port number"); +// DEFINE_string(PEER_ADDR, "127.0.0.1", "peer IP address"); +// DEFINE_int32(PORT_DATA, 5557, "ZMQ control port"); +// DEFINE_int32(PORT_META, 5558, "ZMQ control port"); +// DEFINE_string(OUTPUT_FILE, "rdma_test_results.csv", "output file for performance results"); + +// DEFINE_bool(send, false, "Run in send mode"); +// DEFINE_bool(recv, false, "Run in recv mode"); + +// DEFINE_int32(num_qp, 1, "Number of QPs"); +// DEFINE_int32(num_packets, 100, "Number of packets"); +// DEFINE_int32(min_packet_size, 11, "Minimum size of packet size (2^(min_packet_size) bytes)"); +// DEFINE_int32(max_packet_size, 11, "Maximum size of packet size (2^(max_packet_size) bytes)"); + +// typedef struct Result { +// size_t packet_size; +// size_t total_bytes; +// size_t packet_num; +// double min_latency_ms; +// double max_latency_ms; +// double avg_latency_ms; +// double p50_latency_ms; +// double p99_latency_ms; + +// double min_bandwidth_MBs; +// double max_bandwidth_MBs; +// double avg_bandwidth_MBs; + +// double p50_bandwidth_MBs; +// double p99_bandwidth_MBs; + +// } Result_t; + +// double calculatePercentile(const std::vector& data, double percentile) +// { +// if (data.empty()) +// return 0.0; - zmq::context_t zmq_ctx_data(2); - zmq::context_t zmq_ctx_meta(2); +// std::vector sorted_data = data; +// std::sort(sorted_data.begin(), sorted_data.end()); - zmq::socket_t sock_data(zmq_ctx_data, ZMQ_REP); - zmq::socket_t sock_meta(zmq_ctx_meta, ZMQ_REP); +// double position = percentile * (sorted_data.size() - 1); +// size_t index = static_cast(position); +// double fraction = position - index; - sock_data.bind("tcp://*:" + std::to_string(FLAGS_PORT_DATA)); - sock_meta.bind("tcp://*:" + std::to_string(FLAGS_PORT_META)); +// if (index + 1 < sorted_data.size()) { +// return sorted_data[index] + fraction * (sorted_data[index + 1] - sorted_data[index]); +// } +// else { +// return sorted_data[index]; +// } +// } + +// void initConnection(std::shared_ptr& end_point) +// { +// if (FLAGS_send) { +// std::cout << "Initializing RDMA endpoint in SEND mode..." 
<< std::endl; - zmq::message_t peer_data_info; - zmq::message_t peer_meta_info; - auto data_res = sock_data.recv(peer_data_info); - auto meta_res = sock_meta.recv(peer_meta_info); +// zmq::context_t zmq_ctx_data(2); +// zmq::context_t zmq_ctx_meta(2); - zmq::message_t local_data_info(end_point->getDataContextInfo().dump()); - zmq::message_t local_meta_info(end_point->getMetaContextInfo().dump()); +// zmq::socket_t sock_data(zmq_ctx_data, ZMQ_REP); +// zmq::socket_t sock_meta(zmq_ctx_meta, ZMQ_REP); - sock_data.send(local_data_info, zmq::send_flags::none); - sock_meta.send(local_meta_info, zmq::send_flags::none); +// sock_data.bind("tcp://*:" + std::to_string(FLAGS_PORT_DATA)); +// sock_meta.bind("tcp://*:" + std::to_string(FLAGS_PORT_META)); - end_point->connect( - json::parse(std::string(static_cast(peer_data_info.data()), peer_data_info.size())), - json::parse(std::string(static_cast(peer_meta_info.data()), peer_meta_info.size())) - ); - } +// zmq::message_t peer_data_info; +// zmq::message_t peer_meta_info; +// auto data_res = sock_data.recv(peer_data_info); +// auto meta_res = sock_meta.recv(peer_meta_info); + +// zmq::message_t local_data_info(end_point->getDataContextInfo().dump()); +// zmq::message_t local_meta_info(end_point->getMetaContextInfo().dump()); + +// sock_data.send(local_data_info, zmq::send_flags::none); +// sock_meta.send(local_meta_info, zmq::send_flags::none); + +// end_point->connect( +// json::parse(std::string(static_cast(peer_data_info.data()), peer_data_info.size())), +// json::parse(std::string(static_cast(peer_meta_info.data()), peer_meta_info.size())) +// ); +// } - else if (FLAGS_recv) { - std::cout << "Initializing RDMA endpoint in RECV mode..." << std::endl; - - zmq::context_t zmq_ctx_data(2); - zmq::context_t zmq_ctx_meta(2); - std::cout << "Initializing RDMA endpoint in RECV mode..." << std::endl; - zmq::socket_t sock_data(zmq_ctx_data, ZMQ_REQ); - zmq::socket_t sock_meta(zmq_ctx_meta, ZMQ_REQ); - - sock_data.connect("tcp://" + FLAGS_PEER_ADDR + ":" + std::to_string(FLAGS_PORT_DATA)); - sock_meta.connect("tcp://" + FLAGS_PEER_ADDR + ":" + std::to_string(FLAGS_PORT_META)); - std::cout << "Initializing RDMA endpoint in RECV mode..." << std::endl; - zmq::message_t local_data_info(end_point->getDataContextInfo().dump()); - zmq::message_t local_meta_info(end_point->getMetaContextInfo().dump()); - - sock_data.send(local_data_info, zmq::send_flags::none); - sock_meta.send(local_meta_info, zmq::send_flags::none); - std::cout << "Initializing RDMA endpoint in RECV mode..." << std::endl; - zmq::message_t peer_data_info; - zmq::message_t peer_meta_info; - auto data_res = sock_data.recv(peer_data_info); - auto meta_res = sock_meta.recv(peer_meta_info); - std::cout << "Initializing RDMA endpoint in RECV mode..." << std::endl; - end_point->connect( - json::parse(std::string(static_cast(peer_data_info.data()), peer_data_info.size())), - json::parse(std::string(static_cast(peer_meta_info.data()), peer_meta_info.size())) - ); - } - - std::cout << "RDMA Endpoint connection has been successfully established." << std::endl; -} +// else if (FLAGS_recv) { +// std::cout << "Initializing RDMA endpoint in RECV mode..." << std::endl; + +// zmq::context_t zmq_ctx_data(2); +// zmq::context_t zmq_ctx_meta(2); +// std::cout << "Initializing RDMA endpoint in RECV mode..." 
<< std::endl; +// zmq::socket_t sock_data(zmq_ctx_data, ZMQ_REQ); +// zmq::socket_t sock_meta(zmq_ctx_meta, ZMQ_REQ); + +// sock_data.connect("tcp://" + FLAGS_PEER_ADDR + ":" + std::to_string(FLAGS_PORT_DATA)); +// sock_meta.connect("tcp://" + FLAGS_PEER_ADDR + ":" + std::to_string(FLAGS_PORT_META)); +// std::cout << "Initializing RDMA endpoint in RECV mode..." << std::endl; +// zmq::message_t local_data_info(end_point->getDataContextInfo().dump()); +// zmq::message_t local_meta_info(end_point->getMetaContextInfo().dump()); + +// sock_data.send(local_data_info, zmq::send_flags::none); +// sock_meta.send(local_meta_info, zmq::send_flags::none); +// std::cout << "Initializing RDMA endpoint in RECV mode..." << std::endl; +// zmq::message_t peer_data_info; +// zmq::message_t peer_meta_info; +// auto data_res = sock_data.recv(peer_data_info); +// auto meta_res = sock_meta.recv(peer_meta_info); +// std::cout << "Initializing RDMA endpoint in RECV mode..." << std::endl; +// end_point->connect( +// json::parse(std::string(static_cast(peer_data_info.data()), peer_data_info.size())), +// json::parse(std::string(static_cast(peer_meta_info.data()), peer_meta_info.size())) +// ); +// } -int singleTest(std::shared_ptr end_point, - std::shared_ptr buf, - size_t iterations, - size_t packet_size, - double& latency, - double& bandwidth) -{ +// std::cout << "RDMA Endpoint connection has been successfully established." << std::endl; +// } - std::vector> futures; - std::atomic completed(0); - // warm up - if (FLAGS_send) { - buf->send(); - buf->waitSend(); - } - else if (FLAGS_recv) { - buf->recv(); - buf->waitRecv(); - } - auto start = std::chrono::high_resolution_clock::now(); - for (size_t i = 0; i < iterations; i++) { - // //auto buffer = std::make_shared(buf->endpoint_, buf->ptrs_, buf->offset_, buf->data_size_); - // if (FLAGS_send) { - // buffer->send(); - // futures.emplace_back(std::async(std::launch::async, [&buffer, &completed]() { - // buffer->waitSend(); - // completed++; - // })); - // } - // else if (FLAGS_recv) { - // buffer->recv(); - // futures.emplace_back(std::async(std::launch::async, [&buffer, &completed]() { - // buffer->waitRecv(); - // completed++; - // })); - // } - } - for (auto& fut : futures) { - fut.wait(); - } - auto end = std::chrono::high_resolution_clock::now(); - auto duration_ns = std::chrono::duration_cast(end - start); - - double total_bytes = 2 * packet_size * iterations; - latency = (duration_ns.count() / 1000000.0); - bandwidth = (total_bytes / (duration_ns.count() / 1000000000.0)) / (1024.0 * 1024.0); - return completed; -} +// int singleTest(std::shared_ptr end_point, +// std::shared_ptr buf, +// size_t iterations, +// size_t packet_size, +// double& latency, +// double& bandwidth) +// { -void runTest(std::shared_ptr end_point, size_t packet_size, size_t total_bytes, Result_t& result) -{ - const size_t num_packets = total_bytes / packet_size; - std::vector data_buffer_0(packet_size, FLAGS_send ? 'A' : '0'); - std::vector data_buffer_1(packet_size, FLAGS_send ? 
'A' : '0'); - std::vector ptrs = {reinterpret_cast(data_buffer_0.data()),reinterpret_cast(data_buffer_1.data())}; - std::vector offsets = {0,0}; - std::vector sizes = {data_buffer_0.size(), data_buffer_1.size()}; - auto buf = std::make_shared(end_point, ptrs, offsets, sizes); - - std::vector latencies; - std::vector bandwidths; - std::vector success_rates; - - size_t num_tests = FLAGS_num_packets; - size_t iterations = FLAGS_num_packets / num_tests; - for (size_t i = 0; i < num_tests; ++i) { - auto buf = std::make_shared(end_point, ptrs, offsets, sizes); - double latency; - double bandwidth; - int success_count = singleTest(end_point, buf, iterations, packet_size, latency, bandwidth); - latencies.push_back(latency); - bandwidths.push_back(bandwidth); - } - // statistic - - for (auto lat : latencies) - std::cout << "Latency: " << lat << " ms\n"; - - auto [min_lat, max_lat] = std::minmax_element(latencies.begin(), latencies.end()); - double sum_lat = std::accumulate(latencies.begin(), latencies.end(), 0.0); - double mean_lat = sum_lat / FLAGS_num_packets; - - auto [min_bw, max_bw] = std::minmax_element(bandwidths.begin(), bandwidths.end()); - double sum_bw = std::accumulate(bandwidths.begin(), bandwidths.end(), 0.0); - double mean_bw = sum_bw / num_tests; - - // Store results - result.packet_size = packet_size; - result.total_bytes = total_bytes; - result.packet_num = FLAGS_num_packets; - result.min_latency_ms = *min_lat; - result.max_latency_ms = *max_lat; - result.avg_latency_ms = mean_lat; - result.p50_latency_ms = calculatePercentile(latencies, 0.50); - result.p99_latency_ms = calculatePercentile(latencies, 0.99); - - result.min_bandwidth_MBs = *min_bw; - result.max_bandwidth_MBs = *max_bw; - result.avg_bandwidth_MBs = mean_bw; - result.p50_bandwidth_MBs = calculatePercentile(bandwidths, 0.50); - result.p99_bandwidth_MBs = calculatePercentile(bandwidths, 0.99); -} +// std::vector> futures; +// std::atomic completed(0); +// // warm up +// if (FLAGS_send) { +// buf->send(); +// buf->waitSend(); +// } +// else if (FLAGS_recv) { +// buf->recv(); +// buf->waitRecv(); +// } +// auto start = std::chrono::high_resolution_clock::now(); +// for (size_t i = 0; i < iterations; i++) { +// // //auto buffer = std::make_shared(buf->endpoint_, buf->ptrs_, buf->offset_, buf->data_size_); +// // if (FLAGS_send) { +// // buffer->send(); +// // futures.emplace_back(std::async(std::launch::async, [&buffer, &completed]() { +// // buffer->waitSend(); +// // completed++; +// // })); +// // } +// // else if (FLAGS_recv) { +// // buffer->recv(); +// // futures.emplace_back(std::async(std::launch::async, [&buffer, &completed]() { +// // buffer->waitRecv(); +// // completed++; +// // })); +// // } +// } +// for (auto& fut : futures) { +// fut.wait(); +// } +// auto end = std::chrono::high_resolution_clock::now(); +// auto duration_ns = std::chrono::duration_cast(end - start); -void print(const std::vector& results) -{ - std::cout << "\nPerformance Results:\n"; - - // 打印延迟统计 - std::cout << "\nLatency Statistics (ms):\n"; - std::cout << "================================================================================\n"; - std::cout << std::left << std::setw(12) << "Size(B)" << std::setw(12) << "Packets" << std::setw(12) << "Min" - << std::setw(12) << "Max" << std::setw(12) << "Avg" << std::setw(12) << "P50" << std::setw(12) << "P99" - << std::endl; - std::cout << "================================================================================\n"; - - for (const auto& res : results) { - std::cout << std::left << 
std::setw(12) << res.packet_size << std::setw(12) << res.packet_num << std::setw(12) - << std::setprecision(4) << res.min_latency_ms << std::setw(12) << std::setprecision(4) - << res.max_latency_ms << std::setw(12) << std::setprecision(4) << res.avg_latency_ms << std::setw(12) - << std::setprecision(4) << res.p50_latency_ms << std::setw(12) << std::setprecision(4) - << res.p99_latency_ms << std::endl; - } - std::cout << "================================================================================\n"; - - // 打印带宽统计 - std::cout << "\nBandwidth Statistics (MB/s):\n"; - std::cout << "================================================================================\n"; - std::cout << std::left << std::setw(12) << "Size(B)" << std::setw(12) << "Total(B)" << std::setw(12) << "Min" - << std::setw(12) << "Max" << std::setw(12) << "Avg" << std::setw(12) << "P50" << std::setw(12) << "P99" - << std::endl; - std::cout << "================================================================================\n"; - - for (const auto& res : results) { - std::cout << std::left << std::setw(12) << res.packet_size << std::setw(12) << res.total_bytes << std::setw(12) - << std::setprecision(4) << res.min_bandwidth_MBs << std::setw(12) << std::setprecision(4) - << res.max_bandwidth_MBs << std::setw(12) << std::setprecision(4) << res.avg_bandwidth_MBs - << std::setw(12) << std::setprecision(4) << res.p50_bandwidth_MBs << std::setw(12) - << std::setprecision(4) << res.p99_bandwidth_MBs << std::endl; - } - std::cout << "================================================================================\n"; -} +// double total_bytes = 2 * packet_size * iterations; +// latency = (duration_ns.count() / 1000000.0); +// bandwidth = (total_bytes / (duration_ns.count() / 1000000000.0)) / (1024.0 * 1024.0); +// return completed; +// } -// void save(const std::vector& results, const std::string& filename) +// void runTest(std::shared_ptr end_point, size_t packet_size, size_t total_bytes, Result_t& result) // { -// std::ofstream outfile(filename); -// if (!outfile.is_open()) { -// std::cerr << "Failed to open output file: " << filename << std::endl; -// return; +// const size_t num_packets = total_bytes / packet_size; +// std::vector data_buffer_0(packet_size, FLAGS_send ? 'A' : '0'); +// std::vector data_buffer_1(packet_size, FLAGS_send ? 
'A' : '0'); +// std::vector ptrs = {reinterpret_cast(data_buffer_0.data()),reinterpret_cast(data_buffer_1.data())}; +// std::vector offsets = {0,0}; +// std::vector sizes = {data_buffer_0.size(), data_buffer_1.size()}; +// auto buf = std::make_shared(end_point, ptrs, offsets, sizes); + +// std::vector latencies; +// std::vector bandwidths; +// std::vector success_rates; + +// size_t num_tests = FLAGS_num_packets; +// size_t iterations = FLAGS_num_packets / num_tests; +// for (size_t i = 0; i < num_tests; ++i) { +// auto buf = std::make_shared(end_point, ptrs, offsets, sizes); +// double latency; +// double bandwidth; +// int success_count = singleTest(end_point, buf, iterations, packet_size, latency, bandwidth); +// latencies.push_back(latency); +// bandwidths.push_back(bandwidth); // } +// // statistic + +// for (auto lat : latencies) +// std::cout << "Latency: " << lat << " ms\n"; + +// auto [min_lat, max_lat] = std::minmax_element(latencies.begin(), latencies.end()); +// double sum_lat = std::accumulate(latencies.begin(), latencies.end(), 0.0); +// double mean_lat = sum_lat / FLAGS_num_packets; + +// auto [min_bw, max_bw] = std::minmax_element(bandwidths.begin(), bandwidths.end()); +// double sum_bw = std::accumulate(bandwidths.begin(), bandwidths.end(), 0.0); +// double mean_bw = sum_bw / num_tests; + +// // Store results +// result.packet_size = packet_size; +// result.total_bytes = total_bytes; +// result.packet_num = FLAGS_num_packets; +// result.min_latency_ms = *min_lat; +// result.max_latency_ms = *max_lat; +// result.avg_latency_ms = mean_lat; +// result.p50_latency_ms = calculatePercentile(latencies, 0.50); +// result.p99_latency_ms = calculatePercentile(latencies, 0.99); + +// result.min_bandwidth_MBs = *min_bw; +// result.max_bandwidth_MBs = *max_bw; +// result.avg_bandwidth_MBs = mean_bw; +// result.p50_bandwidth_MBs = calculatePercentile(bandwidths, 0.50); +// result.p99_bandwidth_MBs = calculatePercentile(bandwidths, 0.99); +// } + +// void print(const std::vector& results) +// { +// std::cout << "\nPerformance Results:\n"; -// outfile << "Packet Size (Bytes),Total Bytes (Bytes),Amount of Packet" -// << "Min Latency (ms),Max Latency (ms),Avg Latency (ms), Latency StdDev (ms)," -// << "Min Bandwidth (MB/s),Max Bandwidth (MB/s),Avg Bandwidth (MB/s),Bandwidth StdDev (MB/s)," -// << "Success Rate (%)\n"; +// // 打印延迟统计 +// std::cout << "\nLatency Statistics (ms):\n"; +// std::cout << "================================================================================\n"; +// std::cout << std::left << std::setw(12) << "Size(B)" << std::setw(12) << "Packets" << std::setw(12) << "Min" +// << std::setw(12) << "Max" << std::setw(12) << "Avg" << std::setw(12) << "P50" << std::setw(12) << "P99" +// << std::endl; +// std::cout << "================================================================================\n"; // for (const auto& res : results) { -// outfile << res.packet_size << "," << res.total_bytes << "," << res.packet_num << "," << std::setprecision(9) -// << res.min_latency_ms << "," << res.max_latency_ms << "," << res.avg_latency_ms << "," -// << res.stddev_latency << "," << res.min_bandwidth_MBs << "," << res.max_bandwidth_MBs << "," -// << res.avg_bandwidth_MBs << "," << res.stddev_bandwidth << "," << res.success_rate << "\n"; +// std::cout << std::left << std::setw(12) << res.packet_size << std::setw(12) << res.packet_num << std::setw(12) +// << std::setprecision(4) << res.min_latency_ms << std::setw(12) << std::setprecision(4) +// << res.max_latency_ms << std::setw(12) << 
std::setprecision(4) << res.avg_latency_ms << std::setw(12) +// << std::setprecision(4) << res.p50_latency_ms << std::setw(12) << std::setprecision(4) +// << res.p99_latency_ms << std::endl; // } +// std::cout << "================================================================================\n"; -// outfile.close(); -// std::cout << "Results saved to " << filename << std::endl; +// // 打印带宽统计 +// std::cout << "\nBandwidth Statistics (MB/s):\n"; +// std::cout << "================================================================================\n"; +// std::cout << std::left << std::setw(12) << "Size(B)" << std::setw(12) << "Total(B)" << std::setw(12) << "Min" +// << std::setw(12) << "Max" << std::setw(12) << "Avg" << std::setw(12) << "P50" << std::setw(12) << "P99" +// << std::endl; +// std::cout << "================================================================================\n"; + +// for (const auto& res : results) { +// std::cout << std::left << std::setw(12) << res.packet_size << std::setw(12) << res.total_bytes << std::setw(12) +// << std::setprecision(4) << res.min_bandwidth_MBs << std::setw(12) << std::setprecision(4) +// << res.max_bandwidth_MBs << std::setw(12) << std::setprecision(4) << res.avg_bandwidth_MBs +// << std::setw(12) << std::setprecision(4) << res.p50_bandwidth_MBs << std::setw(12) +// << std::setprecision(4) << res.p99_bandwidth_MBs << std::endl; +// } +// std::cout << "================================================================================\n"; // } -int main(int argc, char** argv) -{ - gflags::ParseCommandLineFlags(&argc, &argv, true); +// // void save(const std::vector& results, const std::string& filename) +// // { +// // std::ofstream outfile(filename); +// // if (!outfile.is_open()) { +// // std::cerr << "Failed to open output file: " << filename << std::endl; +// // return; +// // } + +// // outfile << "Packet Size (Bytes),Total Bytes (Bytes),Amount of Packet" +// // << "Min Latency (ms),Max Latency (ms),Avg Latency (ms), Latency StdDev (ms)," +// // << "Min Bandwidth (MB/s),Max Bandwidth (MB/s),Avg Bandwidth (MB/s),Bandwidth StdDev (MB/s)," +// // << "Success Rate (%)\n"; + +// // for (const auto& res : results) { +// // outfile << res.packet_size << "," << res.total_bytes << "," << res.packet_num << "," << std::setprecision(9) +// // << res.min_latency_ms << "," << res.max_latency_ms << "," << res.avg_latency_ms << "," +// // << res.stddev_latency << "," << res.min_bandwidth_MBs << "," << res.max_bandwidth_MBs << "," +// // << res.avg_bandwidth_MBs << "," << res.stddev_bandwidth << "," << res.success_rate << "\n"; +// // } + +// // outfile.close(); +// // std::cout << "Results saved to " << filename << std::endl; +// // } + +// int main(int argc, char** argv) +// { +// gflags::ParseCommandLineFlags(&argc, &argv, true); - if (!FLAGS_send && !FLAGS_recv) { - std::cerr << "Please specify mode: --send or --recv" << std::endl; - return 1; - } +// if (!FLAGS_send && !FLAGS_recv) { +// std::cerr << "Please specify mode: --send or --recv" << std::endl; +// return 1; +// } - if (FLAGS_send && FLAGS_recv) { - std::cerr << "Cannot specify both --send and --recv" << std::endl; - return 1; - } +// if (FLAGS_send && FLAGS_recv) { +// std::cerr << "Cannot specify both --send and --recv" << std::endl; +// return 1; +// } - auto end_point = std::make_shared(FLAGS_device, FLAGS_IB_PORT, FLAGS_LINK_TYPE, FLAGS_num_qp); - initConnection(end_point); +// auto end_point = std::make_shared(FLAGS_device, FLAGS_IB_PORT, FLAGS_LINK_TYPE, FLAGS_num_qp); +// 
initConnection(end_point); - std::vector packet_sizes; - for (int i = FLAGS_min_packet_size; i <= FLAGS_max_packet_size; i++) { - packet_sizes.push_back(1 << i); - } +// std::vector packet_sizes; +// for (int i = FLAGS_min_packet_size; i <= FLAGS_max_packet_size; i++) { +// packet_sizes.push_back(1 << i); +// } - std::vector results; - for (size_t size : packet_sizes) { +// std::vector results; +// for (size_t size : packet_sizes) { - size_t total_bytes = size * FLAGS_num_packets; - std::cout << "\nTesting with packet size: " << size << " bytes (" << (size >> 10) - << " KB), total: " << (double)total_bytes / (1024 * 1024) - << " MB, number of packets: " << FLAGS_num_packets << std::endl; +// size_t total_bytes = size * FLAGS_num_packets; +// std::cout << "\nTesting with packet size: " << size << " bytes (" << (size >> 10) +// << " KB), total: " << (double)total_bytes / (1024 * 1024) +// << " MB, number of packets: " << FLAGS_num_packets << std::endl; - Result_t result; - runTest(end_point, size, total_bytes, result); - results.push_back(result); - } +// Result_t result; +// runTest(end_point, size, total_bytes, result); +// results.push_back(result); +// } - print(results); - // save(results, FLAGS_OUTPUT_FILE); +// print(results); +// // save(results, FLAGS_OUTPUT_FILE); - return 0; -} \ No newline at end of file +// return 0; +// } \ No newline at end of file