From b3caf74182d3fb663b7188a3b52ed9c07559edd8 Mon Sep 17 00:00:00 2001 From: Darryl Abbate Date: Thu, 12 Jun 2025 16:41:33 -0700 Subject: [PATCH 1/5] [v2.2.x]prov/efa: Create abstraction for IBV CQ polling sequence This abstracts away and centralizes some of the necessary logic for correctly polling the IBV CQ. This also encapsulates the polling state in the efa_ibv_cq struct, rather than using local variables during the polling procedures. Signed-off-by: Darryl Abbate --- prov/efa/src/efa_cq.c | 37 ++++------------------- prov/efa/src/efa_cq.h | 56 ++++++++++++++++++++++++++++++++++- prov/efa/src/rdm/efa_rdm_cq.c | 30 ++++--------------- 3 files changed, 65 insertions(+), 58 deletions(-) diff --git a/prov/efa/src/efa_cq.c b/prov/efa/src/efa_cq.c index 4b5947eac5c..12898165bf7 100644 --- a/prov/efa/src/efa_cq.c +++ b/prov/efa/src/efa_cq.c @@ -248,29 +248,21 @@ efa_cq_proc_ibv_recv_rdma_with_imm_completion(struct efa_base_ep *base_ep, */ int efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq) { - bool should_end_poll = false; struct efa_base_ep *base_ep; struct efa_cq *cq; struct efa_domain *efa_domain; struct fi_cq_tagged_entry cq_entry = {0}; - struct fi_cq_err_entry err_entry; int err = 0; size_t num_cqe = 0; /* Count of read entries */ int prov_errno, opcode; - /* Initialize an empty ibv_poll_cq_attr struct for ibv_start_poll. - * EFA expects .comp_mask = 0, or otherwise returns EINVAL. - */ - struct ibv_poll_cq_attr poll_cq_attr = {.comp_mask = 0}; - cq = container_of(ibv_cq, struct efa_cq, ibv_cq); efa_domain = container_of(cq->util_cq.domain, struct efa_domain, util_domain); /* Call ibv_start_poll only once */ - err = ibv_start_poll(cq->ibv_cq.ibv_cq_ex, &poll_cq_attr); - should_end_poll = !err; + efa_cq_start_poll(ibv_cq); - while (!err) { + while (efa_cq_wc_available(ibv_cq)) { base_ep = efa_domain->qp_table[ibv_wc_read_qp_num(cq->ibv_cq.ibv_cq_ex) & efa_domain->qp_table_sz_m1]->base_ep; opcode = ibv_wc_read_opcode(cq->ibv_cq.ibv_cq_ex); if (cq->ibv_cq.ibv_cq_ex->status) { @@ -328,29 +320,10 @@ int efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq) break; } - err = ibv_next_poll(cq->ibv_cq.ibv_cq_ex); + efa_cq_next_poll(ibv_cq); } - - if (err && err != ENOENT) { - err = err > 0 ? 
err : -err;
-		prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq.ibv_cq_ex);
-		EFA_WARN(FI_LOG_CQ,
-			 "Unexpected error when polling ibv cq, err: %s (%d) "
-			 "prov_errno: %s (%d)\n",
-			 fi_strerror(err), err, efa_strerror(prov_errno),
-			 prov_errno);
-		efa_show_help(prov_errno);
-		err_entry = (struct fi_cq_err_entry) {
-			.err = err,
-			.prov_errno = prov_errno,
-			.op_context = NULL,
-		};
-		ofi_cq_write_error(&cq->util_cq, &err_entry);
-	}
-
-	if (should_end_poll)
-		ibv_end_poll(cq->ibv_cq.ibv_cq_ex);
-
+	err = ibv_cq->poll_err;
+	efa_cq_end_poll(ibv_cq);
 	return err;
 }
 
diff --git a/prov/efa/src/efa_cq.h b/prov/efa/src/efa_cq.h
index 57e14ae476c..3088d720ec0 100644
--- a/prov/efa/src/efa_cq.h
+++ b/prov/efa/src/efa_cq.h
@@ -15,6 +15,8 @@ struct efa_ibv_cq {
 	struct ibv_cq_ex *ibv_cq_ex;
 	enum ibv_cq_ex_type ibv_cq_ex_type;
 	struct ibv_comp_channel *channel;
+	bool poll_active;
+	int poll_err;
 };
 
 struct efa_ibv_cq_poll_list_entry {
@@ -372,10 +374,62 @@ static inline int efa_write_error_msg(struct efa_base_ep *ep, fi_addr_t addr,
 	return 0;
 }
 
+static inline bool efa_cq_wc_available(struct efa_ibv_cq *cq)
+{
+	return cq->poll_active && !cq->poll_err;
+}
+
+static inline void efa_cq_report_poll_err(struct efa_ibv_cq *cq)
+{
+	int err = cq->poll_err;
+	int prov_errno;
+
+	if (err && err != ENOENT) {
+		err = err > 0 ? err : -err;
+		prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq_ex);
+		EFA_WARN(FI_LOG_CQ, "Encountered error during CQ polling. err: %s (%d), prov_errno: %s (%d)\n",
+			 fi_strerror(err), err, efa_strerror(prov_errno), prov_errno);
+		efa_show_help(prov_errno);
+		ofi_cq_write_error(&container_of(cq, struct efa_cq, ibv_cq)->util_cq, &(struct fi_cq_err_entry) {
+			.err = err,
+			.prov_errno = prov_errno,
+		});
+	}
+}
+
+static inline void efa_cq_start_poll(struct efa_ibv_cq *cq)
+{
+	/* Pass an empty ibv_poll_cq_attr struct (zero-initialized) to
+	 * ibv_start_poll. EFA expects .comp_mask = 0; otherwise it returns EINVAL.
+	 */
+	assert(!cq->poll_active);
+	cq->poll_err = ibv_start_poll(cq->ibv_cq_ex, &(struct ibv_poll_cq_attr){0});
+	if (OFI_LIKELY(!cq->poll_err))
+		cq->poll_active = true;
+	else
+		efa_cq_report_poll_err(cq);
+}
+
+static inline void efa_cq_next_poll(struct efa_ibv_cq *cq)
+{
+	assert(cq->poll_active);
+	cq->poll_err = ibv_next_poll(cq->ibv_cq_ex);
+	if (OFI_UNLIKELY(cq->poll_err))
+		efa_cq_report_poll_err(cq);
+}
+
+static inline void efa_cq_end_poll(struct efa_ibv_cq *cq)
+{
+	if (OFI_LIKELY(cq->poll_active))
+		ibv_end_poll(cq->ibv_cq_ex);
+	cq->poll_active = false;
+	cq->poll_err = 0;
+}
+
 int efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq);
 int efa_cq_trywait(struct efa_cq *cq);
 int efa_cq_signal(struct fid_cq *cq_fid);
 int efa_poll_events(struct efa_cq *cq, int timeout);
 int efa_cq_control(struct fid *cq, int command, void *arg);
-#endif /* end of _EFA_CQ_H*/
\ No newline at end of file
+#endif /* end of _EFA_CQ_H */
diff --git a/prov/efa/src/rdm/efa_rdm_cq.c b/prov/efa/src/rdm/efa_rdm_cq.c
index 54650b31257..d88fa7897a5 100644
--- a/prov/efa/src/rdm/efa_rdm_cq.c
+++ b/prov/efa/src/rdm/efa_rdm_cq.c
@@ -419,18 +419,12 @@ static int efa_rdm_cq_match_ep(struct dlist_entry *item, const void *ep)
  */
 int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
 {
-	bool should_end_poll = false;
-	/* Initialize an empty ibv_poll_cq_attr struct for ibv_start_poll.
-	 * EFA expects .comp_mask = 0, or otherwise returns EINVAL.
-	 */
-	struct ibv_poll_cq_attr poll_cq_attr = {.comp_mask = 0};
 	struct efa_rdm_pke *pkt_entry;
 	int err;
 	int opcode;
 	size_t i = 0;
 	int prov_errno;
 	struct efa_rdm_ep *ep = NULL;
-	struct fi_cq_err_entry err_entry;
 	struct efa_cq *efa_cq;
 	struct efa_domain *efa_domain;
 	struct efa_qp *qp;
@@ -442,10 +436,9 @@ int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
 	dlist_init(&rx_progressed_ep_list);
 
 	/* Call ibv_start_poll only once */
-	err = ibv_start_poll(ibv_cq->ibv_cq_ex, &poll_cq_attr);
-	should_end_poll = !err;
+	efa_cq_start_poll(ibv_cq);
 
-	while (!err) {
+	while (efa_cq_wc_available(ibv_cq)) {
 		pkt_entry = (void *)(uintptr_t)ibv_cq->ibv_cq_ex->wr_id;
 		qp = efa_domain->qp_table[ibv_wc_read_qp_num(ibv_cq->ibv_cq_ex) & efa_domain->qp_table_sz_m1];
 		ep = container_of(qp->base_ep, struct efa_rdm_ep, base_ep);
@@ -528,24 +521,11 @@ int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
 	 * ibv_next_poll MUST be call after the current WC is fully processed,
 	 * which prevents later calls on ibv_cq_ex from reading the wrong WC.
 	 */
-		err = ibv_next_poll(ibv_cq->ibv_cq_ex);
-	}
-
-	if (err && err != ENOENT) {
-		err = err > 0 ? err : -err;
-		prov_errno = ibv_wc_read_vendor_err(ibv_cq->ibv_cq_ex);
-		EFA_WARN(FI_LOG_CQ, "Unexpected error when polling ibv cq, err: %s (%d) prov_errno: %s (%d)\n", fi_strerror(err), err, efa_strerror(prov_errno), prov_errno);
-		efa_show_help(prov_errno);
-		err_entry = (struct fi_cq_err_entry) {
-			.err = err,
-			.prov_errno = prov_errno,
-			.op_context = NULL
-		};
-		ofi_cq_write_error(&efa_cq->util_cq, &err_entry);
+		efa_cq_next_poll(ibv_cq);
 	}
 
-	if (should_end_poll)
-		ibv_end_poll(ibv_cq->ibv_cq_ex);
+	err = ibv_cq->poll_err;
+	efa_cq_end_poll(ibv_cq);
 
 	dlist_foreach_container_safe(
 		&rx_progressed_ep_list, struct efa_rdm_ep, ep, entry, tmp) {

From 7ae4a5f87a0bd21da438b410d7129c59a3c09b8d Mon Sep 17 00:00:00 2001
From: Darryl Abbate
Date: Mon, 23 Jun 2025 12:57:08 -0700
Subject: [PATCH 2/5] [v2.2.x]prov/efa: Add generic utility for fetching RDM packet type

This also adds a flag to efa_rdm_pke to indicate headerless packets,
along with a sentinel packet type value for them.

Signed-off-by: Darryl Abbate
---
 prov/efa/src/rdm/efa_rdm_ep_utils.c  | 17 ++++++---------
 prov/efa/src/rdm/efa_rdm_pke.c       |  9 ++++++++
 prov/efa/src/rdm/efa_rdm_pke.h       | 11 +++++++---
 prov/efa/src/rdm/efa_rdm_pke_cmd.c   |  8 +++----
 prov/efa/src/rdm/efa_rdm_pke_rtm.c   |  2 +-
 prov/efa/src/rdm/efa_rdm_pke_utils.h | 32 ++++++++++++++++++++++++++++
 prov/efa/src/rdm/efa_rdm_protocol.h  |  2 ++
 7 files changed, 63 insertions(+), 18 deletions(-)

diff --git a/prov/efa/src/rdm/efa_rdm_ep_utils.c b/prov/efa/src/rdm/efa_rdm_ep_utils.c
index e7a0063da79..5c0291801c0 100644
--- a/prov/efa/src/rdm/efa_rdm_ep_utils.c
+++ b/prov/efa/src/rdm/efa_rdm_ep_utils.c
@@ -687,7 +687,6 @@ ssize_t efa_rdm_ep_post_queued_pkts(struct efa_rdm_ep *ep,
 	struct dlist_entry *tmp;
 	struct efa_rdm_peer *peer;
 	struct efa_rdm_pke *pkt_entry;
-	struct efa_rdm_base_hdr *base_hdr;
 	ssize_t ret;
 
 	dlist_foreach_container_safe(pkts, struct efa_rdm_pke,
@@ -699,16 +698,14 @@ ssize_t efa_rdm_ep_post_queued_pkts(struct efa_rdm_ep *ep,
 		 */
 		dlist_remove(&pkt_entry->entry);
 
-		if (pkt_entry->flags & EFA_RDM_PKE_SEND_TO_USER_RECV_QP) {
+		switch (efa_rdm_pkt_type_of(pkt_entry)) {
+		case EFA_RDM_RMA_CONTEXT_PKT:
+			assert(((struct efa_rdm_rma_context_pkt *)pkt_entry->wiredata)->context_type == EFA_RDM_RDMA_WRITE_CONTEXT);
+			ret = efa_rdm_pke_write(pkt_entry);
+			break;
+		default:
 			ret = efa_rdm_pke_sendv(&pkt_entry, 1, 0);
- } else { - base_hdr = efa_rdm_pke_get_base_hdr(pkt_entry); - if (base_hdr->type == EFA_RDM_RMA_CONTEXT_PKT) { - assert(((struct efa_rdm_rma_context_pkt *)pkt_entry->wiredata)->context_type == EFA_RDM_RDMA_WRITE_CONTEXT); - ret = efa_rdm_pke_write(pkt_entry); - } else { - ret = efa_rdm_pke_sendv(&pkt_entry, 1, 0); - } + break; } if (ret) { diff --git a/prov/efa/src/rdm/efa_rdm_pke.c b/prov/efa/src/rdm/efa_rdm_pke.c index cf3387b80c6..d65cd490299 100644 --- a/prov/efa/src/rdm/efa_rdm_pke.c +++ b/prov/efa/src/rdm/efa_rdm_pke.c @@ -71,6 +71,15 @@ struct efa_rdm_pke *efa_rdm_pke_alloc(struct efa_rdm_ep *ep, pkt_entry->payload = NULL; pkt_entry->payload_size = 0; pkt_entry->payload_mr = NULL; + + switch (alloc_type) { + case EFA_RDM_PKE_FROM_USER_RX_POOL: + case EFA_RDM_PKE_FROM_READ_COPY_POOL: + pkt_entry->flags |= EFA_RDM_PKE_HAS_NO_BASE_HDR; + break; + default: + break; + } return pkt_entry; } diff --git a/prov/efa/src/rdm/efa_rdm_pke.h b/prov/efa/src/rdm/efa_rdm_pke.h index 97c591f2cc6..2e179a0db85 100644 --- a/prov/efa/src/rdm/efa_rdm_pke.h +++ b/prov/efa/src/rdm/efa_rdm_pke.h @@ -15,6 +15,7 @@ #define EFA_RDM_PKE_DC_LONGCTS_DATA BIT_ULL(3) /**< this DATA packet entry is used by a delivery complete LONGCTS send/write protocol*/ #define EFA_RDM_PKE_LOCAL_WRITE BIT_ULL(4) /**< this packet entry is used as context of an RDMA Write to self */ #define EFA_RDM_PKE_SEND_TO_USER_RECV_QP BIT_ULL(5) /**< this packet entry is used for posting send to a dedicated QP that doesn't expect any pkt hdrs */ +#define EFA_RDM_PKE_HAS_NO_BASE_HDR BIT_ULL(6) /**< This packet entry's wiredata contains no base header */ #define EFA_RDM_PKE_ALIGNMENT 128 @@ -133,9 +134,13 @@ struct efa_rdm_pke { /** * @brief flags indicating the status of the packet entry * - * @details - * Possible flags include #EFA_RDM_PKE_IN_USE #EFA_RDM_PKE_RNR_RETRANSMIT, - * #EFA_RDM_PKE_LOCAL_READ, and #EFA_RDM_PKE_DC_LONGCTS_DATA + * @see #EFA_RDM_PKE_IN_USE + * @see #EFA_RDM_PKE_RNR_RETRANSMIT + * @see #EFA_RDM_PKE_LOCAL_READ + * @see #EFA_RDM_PKE_DC_LONGCTS_DATA + * @see #EFA_RDM_PKE_LOCAL_WRITE + * @see #EFA_RDM_PKE_SEND_TO_USER_RECV_QP + * @see #EFA_RDM_PKE_HAS_NO_BASE_HDR */ uint32_t flags; diff --git a/prov/efa/src/rdm/efa_rdm_pke_cmd.c b/prov/efa/src/rdm/efa_rdm_pke_cmd.c index 39218d4b63d..1766a6f123f 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_cmd.c +++ b/prov/efa/src/rdm/efa_rdm_pke_cmd.c @@ -55,7 +55,7 @@ int efa_rdm_pke_fill_data(struct efa_rdm_pke *pkt_entry, if (efa_both_support_zero_hdr_data_transfer(pkt_entry->ep, ope->peer)) { /* zero hdr transfer only happens for eager msg (non-tagged) pkt */ assert(pkt_type == EFA_RDM_EAGER_MSGRTM_PKT); - pkt_entry->flags |= EFA_RDM_PKE_SEND_TO_USER_RECV_QP; + pkt_entry->flags |= EFA_RDM_PKE_SEND_TO_USER_RECV_QP | EFA_RDM_PKE_HAS_NO_BASE_HDR; } /* Only 3 categories of packets has data_size and data_offset: @@ -427,7 +427,7 @@ void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno, switch (pkt_entry->ope->type) { case EFA_RDM_TXE: txe = pkt_entry->ope; - if (!(pkt_entry->flags & EFA_RDM_PKE_SEND_TO_USER_RECV_QP) && efa_rdm_pke_get_base_hdr(pkt_entry)->type == EFA_RDM_HANDSHAKE_PKT) { + if (efa_rdm_pkt_type_of(pkt_entry) == EFA_RDM_HANDSHAKE_PKT) { switch (prov_errno) { case EFA_IO_COMP_STATUS_REMOTE_ERROR_RNR: /* @@ -584,7 +584,7 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry, struct ef } /* Start handling pkts with hdrs */ - switch (efa_rdm_pke_get_base_hdr(pkt_entry)->type) { + switch (efa_rdm_pkt_type_of(pkt_entry)) { case 
EFA_RDM_HANDSHAKE_PKT: efa_rdm_txe_release(pkt_entry->ope); break; @@ -796,7 +796,7 @@ void efa_rdm_pke_proc_received_no_hdr(struct efa_rdm_pke *pkt_entry, bool has_im { struct efa_rdm_ope *rxe = pkt_entry->ope; - assert(pkt_entry->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL); + assert(pkt_entry->flags & EFA_RDM_PKE_HAS_NO_BASE_HDR); assert(rxe); if (has_imm_data) { diff --git a/prov/efa/src/rdm/efa_rdm_pke_rtm.c b/prov/efa/src/rdm/efa_rdm_pke_rtm.c index 02f99260d52..14586233e36 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_rtm.c +++ b/prov/efa/src/rdm/efa_rdm_pke_rtm.c @@ -558,7 +558,7 @@ ssize_t efa_rdm_pke_init_eager_msgrtm(struct efa_rdm_pke *pkt_entry, { int ret; - if (pkt_entry->flags & EFA_RDM_PKE_SEND_TO_USER_RECV_QP) + if (pkt_entry->flags & EFA_RDM_PKE_HAS_NO_BASE_HDR) ret = efa_rdm_pke_init_eager_msgrtm_zero_hdr(pkt_entry, txe); else ret = efa_rdm_pke_init_rtm_with_payload(pkt_entry, diff --git a/prov/efa/src/rdm/efa_rdm_pke_utils.h b/prov/efa/src/rdm/efa_rdm_pke_utils.h index 89de4931e6f..3cd7ae41015 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_utils.h +++ b/prov/efa/src/rdm/efa_rdm_pke_utils.h @@ -11,6 +11,12 @@ #include "efa_rdm_pke_rtm.h" #include "efa_mr.h" +static inline +bool efa_rdm_pke_has_base_hdr(struct efa_rdm_pke *pke) +{ + return !(pke->flags & EFA_RDM_PKE_HAS_NO_BASE_HDR); +} + /** * @brief get the base header of an pke * @@ -23,6 +29,32 @@ struct efa_rdm_base_hdr *efa_rdm_pke_get_base_hdr(struct efa_rdm_pke *pke) return (struct efa_rdm_base_hdr *)pke->wiredata; } +#define efa_rdm_pkt_type_of(obj) _Generic((obj), \ + struct efa_rdm_pke *: efa_rdm_pkt_type_of_pke, \ + struct efa_rdm_ope *: efa_rdm_pkt_type_of_ope, \ + default: efa_rdm_pkt_type_of_base_hdr)(obj) + +static inline +int efa_rdm_pkt_type_of_base_hdr(struct efa_rdm_base_hdr *base_hdr) +{ + return base_hdr->type; +} + +static inline +int efa_rdm_pkt_type_of_pke(struct efa_rdm_pke *pke) +{ + if (efa_rdm_pke_has_base_hdr(pke)) { + return efa_rdm_pkt_type_of_base_hdr(efa_rdm_pke_get_base_hdr(pke)); + } + return EFA_RDM_HEADERLESS_PKT; +} + +static inline +int efa_rdm_pkt_type_of_ope(struct efa_rdm_ope *ope) +{ + return efa_rdm_pkt_type_of_pke(container_of(ope, struct efa_rdm_pke, ope)); +} + /** * @brief return the segment offset of user data in packet entry * diff --git a/prov/efa/src/rdm/efa_rdm_protocol.h b/prov/efa/src/rdm/efa_rdm_protocol.h index 975cbd44e94..845c1e185bd 100644 --- a/prov/efa/src/rdm/efa_rdm_protocol.h +++ b/prov/efa/src/rdm/efa_rdm_protocol.h @@ -38,6 +38,8 @@ */ #define EFA_RDM_MAX_NUM_EXINFO (4) +#define EFA_RDM_HEADERLESS_PKT 0 /**< Sentinel value for headerless packets */ + /* * Packet type ID of each packet type (section 1.3) * From 33f5839e50fc66133c8ae8c1d2fc7511acf24ac4 Mon Sep 17 00:00:00 2001 From: Darryl Abbate Date: Tue, 17 Jun 2025 11:46:34 -0700 Subject: [PATCH 3/5] [v2.2.x]prov/efa: Deduce queued packet list from op entry Signed-off-by: Darryl Abbate Co-authored-by: Shi Jin --- prov/efa/src/rdm/efa_rdm_ep.h | 4 +--- prov/efa/src/rdm/efa_rdm_ep_utils.c | 15 +++++++++++---- prov/efa/src/rdm/efa_rdm_pke_cmd.c | 16 ++-------------- prov/efa/test/efa_unit_test_rnr.c | 3 ++- 4 files changed, 16 insertions(+), 22 deletions(-) diff --git a/prov/efa/src/rdm/efa_rdm_ep.h b/prov/efa/src/rdm/efa_rdm_ep.h index c32350f7473..d4aea381bc9 100644 --- a/prov/efa/src/rdm/efa_rdm_ep.h +++ b/prov/efa/src/rdm/efa_rdm_ep.h @@ -248,9 +248,7 @@ int efa_rdm_ep_post_user_recv_buf(struct efa_rdm_ep *ep, struct efa_rdm_ope *rxe struct efa_rdm_peer; -void efa_rdm_ep_queue_rnr_pkt(struct efa_rdm_ep *ep, 
struct dlist_entry *list, - struct efa_rdm_pke *pkt_entry, - struct efa_rdm_peer *peer); +void efa_rdm_ep_queue_rnr_pkt(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry); ssize_t efa_rdm_ep_post_queued_pkts(struct efa_rdm_ep *ep, struct dlist_entry *pkts); diff --git a/prov/efa/src/rdm/efa_rdm_ep_utils.c b/prov/efa/src/rdm/efa_rdm_ep_utils.c index 5c0291801c0..5d0c4618766 100644 --- a/prov/efa/src/rdm/efa_rdm_ep_utils.c +++ b/prov/efa/src/rdm/efa_rdm_ep_utils.c @@ -452,19 +452,26 @@ void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke * @param[in] pkt_entry packet entry that encounter RNR * @param[in] peer efa_rdm_peer struct of the receiver */ -void efa_rdm_ep_queue_rnr_pkt(struct efa_rdm_ep *ep, struct dlist_entry *list, - struct efa_rdm_pke *pkt_entry, - struct efa_rdm_peer *peer) +void efa_rdm_ep_queue_rnr_pkt(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry) { static const int random_min_timeout = 40; static const int random_max_timeout = 120; + struct efa_rdm_peer *peer; + struct efa_rdm_ope *ope = pkt_entry->ope; #if ENABLE_DEBUG dlist_remove(&pkt_entry->dbg_entry); #endif - dlist_insert_tail(&pkt_entry->entry, list); + peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr); + + assert(ope); + dlist_insert_tail(&pkt_entry->entry, &ope->queued_pkts); ep->efa_rnr_queued_pkt_cnt += 1; assert(peer); + if (!(ope->internal_flags & EFA_RDM_OPE_QUEUED_RNR)) { + ope->internal_flags |= EFA_RDM_OPE_QUEUED_RNR; + dlist_insert_tail(&ope->queued_entry, &efa_rdm_ep_domain(ep)->ope_queued_list); + } if (!(pkt_entry->flags & EFA_RDM_PKE_RNR_RETRANSMIT)) { /* This is the first time this packet encountered RNR, * we are NOT going to put the peer in backoff mode just yet. diff --git a/prov/efa/src/rdm/efa_rdm_pke_cmd.c b/prov/efa/src/rdm/efa_rdm_pke_cmd.c index 1766a6f123f..0edbf64b469 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_cmd.c +++ b/prov/efa/src/rdm/efa_rdm_pke_cmd.c @@ -401,7 +401,6 @@ void efa_rdm_pke_handle_data_copied(struct efa_rdm_pke *pkt_entry) void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno, struct efa_rdm_peer *peer) { struct efa_rdm_ope *txe; - struct efa_rdm_ope *rxe; struct efa_rdm_ep *ep; int err = to_fi_errno(prov_errno); @@ -502,12 +501,7 @@ void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno, * packets include all REQ, DATA) thus shoud be queued for RNR * only if application wants EFA to manager resource. */ - efa_rdm_ep_queue_rnr_pkt(ep, &txe->queued_pkts, pkt_entry, peer); - if (!(txe->internal_flags & EFA_RDM_OPE_QUEUED_RNR)) { - txe->internal_flags |= EFA_RDM_OPE_QUEUED_RNR; - dlist_insert_tail(&txe->queued_entry, - &efa_rdm_ep_domain(ep)->ope_queued_list); - } + efa_rdm_ep_queue_rnr_pkt(ep, pkt_entry); } } else { efa_rdm_txe_handle_error(pkt_entry->ope, err, prov_errno); @@ -515,7 +509,6 @@ void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno, } break; case EFA_RDM_RXE: - rxe = pkt_entry->ope; if (prov_errno == EFA_IO_COMP_STATUS_REMOTE_ERROR_RNR) { /* * This packet is associated with a recv operation, (such packets @@ -523,12 +516,7 @@ void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno, * is regardless value of ep->handle_resource_management, because * resource management is only applied to send operation. 
*/ - efa_rdm_ep_queue_rnr_pkt(ep, &rxe->queued_pkts, pkt_entry, peer); - if (!(rxe->internal_flags & EFA_RDM_OPE_QUEUED_RNR)) { - rxe->internal_flags |= EFA_RDM_OPE_QUEUED_RNR; - dlist_insert_tail(&rxe->queued_entry, - &efa_rdm_ep_domain(ep)->ope_queued_list); - } + efa_rdm_ep_queue_rnr_pkt(ep, pkt_entry); } else { efa_rdm_rxe_handle_error(pkt_entry->ope, err, prov_errno); efa_rdm_pke_release_tx(pkt_entry); diff --git a/prov/efa/test/efa_unit_test_rnr.c b/prov/efa/test/efa_unit_test_rnr.c index ac3640987c4..8d150c01192 100644 --- a/prov/efa/test/efa_unit_test_rnr.c +++ b/prov/efa/test/efa_unit_test_rnr.c @@ -48,10 +48,11 @@ void test_efa_rnr_queue_and_resend_impl(struct efa_resource **state, uint32_t op txe = container_of(efa_rdm_ep->txe_list.next, struct efa_rdm_ope, ep_entry); pkt_entry = (struct efa_rdm_pke *)g_ibv_submitted_wr_id_vec[0]; peer = efa_rdm_ep_get_peer(efa_rdm_ep, pkt_entry->addr); + pkt_entry->ope = txe; efa_rdm_ep_record_tx_op_completed(efa_rdm_ep, pkt_entry, peer); - efa_rdm_ep_queue_rnr_pkt(efa_rdm_ep, &txe->queued_pkts, pkt_entry, peer); + efa_rdm_ep_queue_rnr_pkt(efa_rdm_ep, pkt_entry); assert_int_equal(pkt_entry->flags & EFA_RDM_PKE_RNR_RETRANSMIT, EFA_RDM_PKE_RNR_RETRANSMIT); assert_int_equal(efa_rdm_ep->efa_rnr_queued_pkt_cnt, 1); assert_int_equal(efa_rdm_ep_get_peer(efa_rdm_ep, peer_addr)->rnr_queued_pkt_cnt, 1); From 85f57240f29f4970850b3a7be86c08cb50568e2b Mon Sep 17 00:00:00 2001 From: Darryl Abbate Date: Tue, 22 Jul 2025 14:10:10 -0700 Subject: [PATCH 4/5] [v2.2.x]prov/efa: Add generic function to process queued op entries Signed-off-by: Darryl Abbate --- prov/efa/src/efa_domain.c | 90 +++------------------------------- prov/efa/src/rdm/efa_rdm_ope.c | 45 +++++++++++++++++ prov/efa/src/rdm/efa_rdm_ope.h | 3 ++ 3 files changed, 56 insertions(+), 82 deletions(-) diff --git a/prov/efa/src/efa_domain.c b/prov/efa/src/efa_domain.c index 3f359f98860..e1d406b0d90 100644 --- a/prov/efa/src/efa_domain.c +++ b/prov/efa/src/efa_domain.c @@ -819,88 +819,14 @@ void efa_domain_progress_rdm_peers_and_queues(struct efa_domain *domain) if (peer && (peer->flags & EFA_RDM_PEER_IN_BACKOFF)) continue; - if (ope->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE) { - ret = efa_rdm_ope_repost_ope_queued_before_handshake(ope); - if (ret == -FI_EAGAIN) - continue; - - if (OFI_UNLIKELY(ret)) { - assert(ope->type == EFA_RDM_TXE); - /* efa_rdm_txe_handle_error will remove ope from the queued_list */ - ope->ep->ope_queued_before_handshake_cnt--; - efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST); - continue; - } - - dlist_remove(&ope->queued_entry); - ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE; - ope->ep->ope_queued_before_handshake_cnt--; - } - - if (ope->internal_flags & EFA_RDM_OPE_QUEUED_RNR) { - assert(!dlist_empty(&ope->queued_pkts)); - ret = efa_rdm_ep_post_queued_pkts(ope->ep, &ope->queued_pkts); - - if (ret == -FI_EAGAIN) - continue; - - if (OFI_UNLIKELY(ret)) { - assert(ope->type == EFA_RDM_RXE || ope->type == EFA_RDM_TXE); - if (ope->type == EFA_RDM_RXE) - efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_PKT_SEND); - else - efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_SEND); - continue; - } - - dlist_remove(&ope->queued_entry); - ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_RNR; - } - - if (ope->internal_flags & EFA_RDM_OPE_QUEUED_CTRL) { - ret = efa_rdm_ope_post_send(ope, ope->queued_ctrl_type); - if (ret == -FI_EAGAIN) - continue; - - if (OFI_UNLIKELY(ret)) { - assert(ope->type == EFA_RDM_TXE || ope->type == EFA_RDM_RXE); - if (ope->type == 
EFA_RDM_TXE)
-				efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
-			else
-				efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
-			continue;
-		}
-
-		/* it can happen that efa_rdm_ope_post_send() released ope
-		 * (if the ope is rxe and packet type is EOR and inject is used). In
-		 * that case rxe's state has been set to EFA_RDM_OPE_FREE and
-		 * it has been removed from ep->op_queued_entry_list, so nothing
-		 * is left to do.
-		 */
-		if (ope->state == EFA_RDM_OPE_FREE)
-			continue;
-
-		ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_CTRL;
-		dlist_remove(&ope->queued_entry);
-	}
-
-	if (ope->internal_flags & EFA_RDM_OPE_QUEUED_READ) {
-		ret = efa_rdm_ope_post_read(ope);
-		if (ret == -FI_EAGAIN)
-			continue;
-
-		if (OFI_UNLIKELY(ret)) {
-			assert(ope->type == EFA_RDM_TXE || ope->type == EFA_RDM_RXE);
-			if (ope->type == EFA_RDM_TXE)
-				efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_READ_POST);
-			else
-				efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_READ_POST);
-			continue;
-		}
-
-		ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_READ;
-		dlist_remove(&ope->queued_entry);
-	}
+		if (efa_rdm_ope_process_queued_ope(ope, EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE))
+			continue;
+		if (efa_rdm_ope_process_queued_ope(ope, EFA_RDM_OPE_QUEUED_RNR))
+			continue;
+		if (efa_rdm_ope_process_queued_ope(ope, EFA_RDM_OPE_QUEUED_CTRL))
+			continue;
+		if (efa_rdm_ope_process_queued_ope(ope, EFA_RDM_OPE_QUEUED_READ))
+			continue;
 	}
 	/*
 	 * Send data packets until window or data queue is exhausted.
diff --git a/prov/efa/src/rdm/efa_rdm_ope.c b/prov/efa/src/rdm/efa_rdm_ope.c
index 733381ee804..81ee15f663b 100644
--- a/prov/efa/src/rdm/efa_rdm_ope.c
+++ b/prov/efa/src/rdm/efa_rdm_ope.c
@@ -1913,3 +1913,48 @@ ssize_t efa_rdm_ope_repost_ope_queued_before_handshake(struct efa_rdm_ope *ope)
 		return -FI_EINVAL;
 	}
 }
+
+int efa_rdm_ope_process_queued_ope(struct efa_rdm_ope *ope, uint16_t flag)
+{
+	int ret = 0;
+
+	assert(flag & EFA_RDM_OPE_QUEUED_FLAGS);
+
+	if (!(ope->internal_flags & flag))
+		return 0;
+
+	switch (flag) {
+	case EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE:
+		ret = efa_rdm_ope_repost_ope_queued_before_handshake(ope);
+		if (ret != -FI_EAGAIN) --ope->ep->ope_queued_before_handshake_cnt;
+		break;
+	case EFA_RDM_OPE_QUEUED_RNR:
+		assert(!dlist_empty(&ope->queued_pkts));
+		ret = efa_rdm_ep_post_queued_pkts(ope->ep, &ope->queued_pkts);
+		break;
+	case EFA_RDM_OPE_QUEUED_CTRL:
+		ret = efa_rdm_ope_post_send(ope, ope->queued_ctrl_type);
+		break;
+	case EFA_RDM_OPE_QUEUED_READ:
+		ret = efa_rdm_ope_post_read(ope);
+		break;
+	default:
+		break;
+	}
+
+	if (OFI_UNLIKELY(ret)) {
+		if (ret == -FI_EAGAIN)
+			return ret;
+
+		assert(ope->type == EFA_RDM_TXE || ope->type == EFA_RDM_RXE);
+		if (ope->type == EFA_RDM_TXE)
+			efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
+		else
+			efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
+		return ret;
+	}
+
+	ope->internal_flags &= ~flag;
+	dlist_remove(&ope->queued_entry);
+	return ret;
+}
diff --git a/prov/efa/src/rdm/efa_rdm_ope.h b/prov/efa/src/rdm/efa_rdm_ope.h
index cd8a03c0a71..ede88f54f31 100644
--- a/prov/efa/src/rdm/efa_rdm_ope.h
+++ b/prov/efa/src/rdm/efa_rdm_ope.h
@@ -340,4 +340,7 @@ ssize_t efa_rdm_ope_post_send_or_queue(struct efa_rdm_ope *ope, int pkt_type);
 ssize_t efa_rdm_ope_repost_ope_queued_before_handshake(struct efa_rdm_ope *ope);
 
 ssize_t efa_rdm_txe_prepare_local_read_pkt_entry(struct efa_rdm_ope *txe);
+
+int efa_rdm_ope_process_queued_ope(struct efa_rdm_ope *ope, uint16_t flag);
+
 #endif

From 2ade2e7511d523265abab5a36dff6dfb0074cca6 Mon Sep 17 00:00:00 2001
From: Darryl Abbate
Date: Fri, 13 Jun 2025 
21:57:53 -0700
Subject: [PATCH 5/5] [v2.2.x]prov/efa: Fix wait_send procedure

This introduces a specialized procedure for processing the outstanding
completions of an endpoint that is being closed. Packet entries are
simply released, unless resource management is enabled (FI_RM_ENABLED)
and the packet type is RECEIPT or EOR, in which case packets that
encountered RNR are queued for reposting.

This avoids much of the overhead of the standard completion-polling
procedure.

Signed-off-by: Darryl Abbate
---
 prov/efa/src/rdm/efa_rdm_cq.c       | 224 +++++++++++++++++++++-------
 prov/efa/src/rdm/efa_rdm_cq.h       |   1 +
 prov/efa/src/rdm/efa_rdm_ep_fiops.c |  49 +++++-
 3 files changed, 220 insertions(+), 54 deletions(-)

diff --git a/prov/efa/src/rdm/efa_rdm_cq.c b/prov/efa/src/rdm/efa_rdm_cq.c
index d88fa7897a5..da66e3197c8 100644
--- a/prov/efa/src/rdm/efa_rdm_cq.c
+++ b/prov/efa/src/rdm/efa_rdm_cq.c
@@ -410,73 +410,136 @@ static int efa_rdm_cq_match_ep(struct dlist_entry *item, const void *ep)
 	return (container_of(item, struct efa_rdm_ep, entry) == ep) ;
 }
 
+static inline struct efa_rdm_ep *efa_rdm_cq_get_rdm_ep(struct efa_ibv_cq *cq, struct efa_domain *efa_domain)
+{
+	struct efa_base_ep *base_ep = efa_domain->qp_table[ibv_wc_read_qp_num(cq->ibv_cq_ex) & efa_domain->qp_table_sz_m1]->base_ep;
+	return container_of(base_ep, struct efa_rdm_ep, base_ep);
+}
+
 /**
- * @brief poll rdma-core cq and process the cq entry
+ * @brief Process work completions for a closing endpoint
  *
- * @param[in] ep_poll the RDM endpoint that polls ibv cq. Note this polling endpoint can be different
- *		from the endpoint that the completed packet entry was posted from (pkt_entry->ep).
- * @param[in] cqe_to_process Max number of cq entry to poll and process. A negative number means to poll until cq empty
+ * This is a lighter-weight counterpart to #efa_rdm_cq_process_wc(). It avoids
+ * unnecessary overhead when processing completions for an endpoint that is
+ * closing anyway by simply releasing the packet entries. The exceptions are
+ * RECEIPT and EOR packets whose completions fail due to RNR while resource
+ * management is enabled by the user (FI_RM_ENABLED); such packets are
+ * queued to be reposted.
+ * + * @param[in] cq IBV CQ + * @param[in] ep EFA RDM endpoint (to be closed) + * @return Status code for the work completion */ -int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq) +static inline +enum ibv_wc_status efa_rdm_cq_process_wc_closing_ep(struct efa_ibv_cq *cq, struct efa_rdm_ep *ep) { - struct efa_rdm_pke *pkt_entry; - int err; - int opcode; - size_t i = 0; + uint64_t wr_id = cq->ibv_cq_ex->wr_id; + enum ibv_wc_status status = cq->ibv_cq_ex->status; + enum ibv_wc_opcode opcode = ibv_wc_read_opcode(cq->ibv_cq_ex); + struct efa_rdm_pke *pkt_entry = (struct efa_rdm_pke *) wr_id; int prov_errno; - struct efa_rdm_ep *ep = NULL; - struct efa_cq *efa_cq; - struct efa_domain *efa_domain; - struct efa_qp *qp; struct efa_rdm_peer *peer = NULL; - struct dlist_entry rx_progressed_ep_list, *tmp; - efa_cq = container_of(ibv_cq, struct efa_cq, ibv_cq); - efa_domain = container_of(efa_cq->util_cq.domain, struct efa_domain, util_domain); - dlist_init(&rx_progressed_ep_list); - - /* Call ibv_start_poll only once */ - efa_cq_start_poll(ibv_cq); - - while (efa_cq_wc_available(ibv_cq)) { - pkt_entry = (void *)(uintptr_t)ibv_cq->ibv_cq_ex->wr_id; - qp = efa_domain->qp_table[ibv_wc_read_qp_num(ibv_cq->ibv_cq_ex) & efa_domain->qp_table_sz_m1]; - ep = container_of(qp->base_ep, struct efa_rdm_ep, base_ep); #if HAVE_LTTNG - efa_rdm_tracepoint(poll_cq, (size_t) ibv_cq->ibv_cq_ex->wr_id); + efa_rdm_tracepoint(poll_cq, (size_t) wr_id); if (pkt_entry && pkt_entry->ope) efa_rdm_tracepoint(poll_cq_ope, pkt_entry->ope->msg_id, (size_t) pkt_entry->ope->cq_entry.op_context, pkt_entry->ope->total_len, pkt_entry->ope->cq_entry.tag, pkt_entry->ope->addr); #endif - opcode = ibv_wc_read_opcode(ibv_cq->ibv_cq_ex); - if (ibv_cq->ibv_cq_ex->status) { + if (!efa_cq_wc_is_unsolicited(cq->ibv_cq_ex)) { + if (OFI_UNLIKELY(status != IBV_WC_SUCCESS)) { if (pkt_entry) peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr); - prov_errno = efa_rdm_cq_get_prov_errno(ibv_cq->ibv_cq_ex, peer); - switch (opcode) { - case IBV_WC_SEND: /* fall through */ - case IBV_WC_RDMA_WRITE: /* fall through */ - case IBV_WC_RDMA_READ: - efa_rdm_pke_handle_tx_error(pkt_entry, prov_errno, peer); - break; - case IBV_WC_RECV: /* fall through */ - case IBV_WC_RECV_RDMA_WITH_IMM: - if (efa_cq_wc_is_unsolicited(ibv_cq->ibv_cq_ex)) { - EFA_WARN(FI_LOG_CQ, "Receive error %s (%d) for unsolicited write recv", - efa_strerror(prov_errno), prov_errno); - efa_base_ep_write_eq_error(&ep->base_ep, to_fi_errno(prov_errno), prov_errno); + prov_errno = efa_rdm_cq_get_prov_errno(cq->ibv_cq_ex, peer); + if (prov_errno == EFA_IO_COMP_STATUS_REMOTE_ERROR_RNR && + ep->handle_resource_management == FI_RM_ENABLED) { + switch(efa_rdm_pkt_type_of(pkt_entry)) { + case EFA_RDM_RECEIPT_PKT: + case EFA_RDM_EOR_PKT: + efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, peer); + efa_rdm_ep_queue_rnr_pkt(ep, pkt_entry); + return status; + default: break; } - efa_rdm_pke_handle_rx_error(pkt_entry, prov_errno); + } + } + switch (opcode) { + case IBV_WC_SEND: /* fall through */ + case IBV_WC_RDMA_WRITE: /* fall through */ + case IBV_WC_RDMA_READ: + if (pkt_entry) + peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr); + efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, peer); + efa_rdm_pke_release_tx(pkt_entry); + break; + case IBV_WC_RECV: /* fall through */ + case IBV_WC_RECV_RDMA_WITH_IMM: + efa_rdm_pke_release_rx(pkt_entry); + break; + default: + EFA_WARN(FI_LOG_EP_CTRL, "Unhandled opcode: %d\n", opcode); + assert(0 && "Unhandled opcode"); + } + } + return status; +} + 
+/** + * @brief Process work completions + * + * @param[in] cq IBV CQ + * @param[in] ep EFA RDM endpoint + * @return Status code for the work completion + */ +static inline +enum ibv_wc_status efa_rdm_cq_process_wc(struct efa_ibv_cq *cq, struct efa_rdm_ep *ep) +{ + uint64_t wr_id = cq->ibv_cq_ex->wr_id; + enum ibv_wc_status status = cq->ibv_cq_ex->status; + enum ibv_wc_opcode opcode = ibv_wc_read_opcode(cq->ibv_cq_ex); + struct efa_rdm_pke *pkt_entry = (struct efa_rdm_pke *) wr_id; + int prov_errno; + struct efa_rdm_peer *peer = NULL; + +#if HAVE_LTTNG + efa_rdm_tracepoint(poll_cq, (size_t) wr_id); + if (pkt_entry && pkt_entry->ope) + efa_rdm_tracepoint(poll_cq_ope, pkt_entry->ope->msg_id, + (size_t) pkt_entry->ope->cq_entry.op_context, + pkt_entry->ope->total_len, pkt_entry->ope->cq_entry.tag, + pkt_entry->ope->addr); +#endif + + if (OFI_UNLIKELY(status != IBV_WC_SUCCESS)) { + if (pkt_entry) + peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr); + prov_errno = efa_rdm_cq_get_prov_errno(cq->ibv_cq_ex, peer); + switch (opcode) { + case IBV_WC_SEND: /* fall through */ + case IBV_WC_RDMA_WRITE: /* fall through */ + case IBV_WC_RDMA_READ: + assert(pkt_entry); + efa_rdm_pke_handle_tx_error(pkt_entry, prov_errno, peer); + break; + case IBV_WC_RECV: /* fall through */ + case IBV_WC_RECV_RDMA_WITH_IMM: + if (efa_cq_wc_is_unsolicited(cq->ibv_cq_ex)) { + EFA_WARN(FI_LOG_CQ, "Receive error %s (%d) for unsolicited write recv", + efa_strerror(prov_errno), prov_errno); + efa_base_ep_write_eq_error(&ep->base_ep, to_fi_errno(prov_errno), prov_errno); break; - default: - EFA_WARN(FI_LOG_EP_CTRL, "Unhandled op code %d\n", opcode); - assert(0 && "Unhandled op code"); } + assert(pkt_entry); + efa_rdm_pke_handle_rx_error(pkt_entry, prov_errno); break; + default: + EFA_WARN(FI_LOG_EP_CTRL, "Unhandled opcode: %d\n", opcode); + assert(0 && "Unhandled opcode"); } + } else { switch (opcode) { case IBV_WC_SEND: #if ENABLE_DEBUG @@ -488,7 +551,7 @@ int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq) case IBV_WC_RECV: /* efa_rdm_cq_handle_recv_completion does additional work to determine the source * address and the peer struct. So do not try to identify the peer here. 
 		 */
-		efa_rdm_cq_handle_recv_completion(ibv_cq, pkt_entry, ep);
+		efa_rdm_cq_handle_recv_completion(cq, pkt_entry, ep);
 #if ENABLE_DEBUG
 		ep->recv_comps++;
 #endif
 		break;
 	case IBV_WC_RECV_RDMA_WITH_IMM:
 		efa_rdm_cq_proc_ibv_recv_rdma_with_imm_completion(
-			ibv_cq->ibv_cq_ex,
+			cq->ibv_cq_ex,
 			FI_REMOTE_CQ_DATA | FI_RMA | FI_REMOTE_WRITE, ep,
 			pkt_entry);
 		break;
 	default:
 		EFA_WARN(FI_LOG_EP_CTRL, "Unhandled cq type\n");
 		assert(0 && "Unhandled cq type");
 	}
+	}
+	return status;
+}
+
+void efa_rdm_cq_poll_ibv_cq_closing_ep(struct efa_ibv_cq *ibv_cq, struct efa_rdm_ep *closing_ep)
+{
+
+	struct efa_rdm_ep *ep = NULL;
+	struct efa_cq *efa_cq = container_of(ibv_cq, struct efa_cq, ibv_cq);
+	struct efa_domain *efa_domain = container_of(efa_cq->util_cq.domain, struct efa_domain, util_domain);
+	struct dlist_entry rx_progressed_ep_list, *tmp;
+
+	dlist_init(&rx_progressed_ep_list);
+
+	efa_cq_start_poll(ibv_cq);
+	while (efa_cq_wc_available(ibv_cq)) {
+		ep = efa_rdm_cq_get_rdm_ep(ibv_cq, efa_domain);
+		if (ep == closing_ep) {
+			if (OFI_UNLIKELY(efa_rdm_cq_process_wc_closing_ep(ibv_cq, ep) != IBV_WC_SUCCESS))
+				break;
+		} else {
+			if (OFI_UNLIKELY(efa_rdm_cq_process_wc(ibv_cq, ep) != IBV_WC_SUCCESS))
+				break;
+			if (ep->efa_rx_pkts_to_post > 0 && !dlist_find_first_match(&rx_progressed_ep_list, &efa_rdm_cq_match_ep, ep))
+				dlist_insert_tail(&ep->entry, &rx_progressed_ep_list);
+		}
+		efa_cq_next_poll(ibv_cq);
+	}
+	efa_cq_end_poll(ibv_cq);
+	dlist_foreach_container_safe(
+		&rx_progressed_ep_list, struct efa_rdm_ep, ep, entry, tmp) {
+		efa_rdm_ep_post_internal_rx_pkts(ep);
+		dlist_remove(&ep->entry);
+	}
+	assert(dlist_empty(&rx_progressed_ep_list));
+}
+
+/**
+ * @brief poll rdma-core cq and process the cq entries
+ *
+ * Note the polling endpoint can be different from the endpoint that the
+ * completed packet entry was posted from (pkt_entry->ep).
+ * @param[in] cqe_to_process Max number of cq entries to poll and process. 
A negative number means to poll until cq empty + */ +int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq) +{ + int err; + size_t i = 0; + struct efa_rdm_ep *ep = NULL; + struct efa_cq *efa_cq = container_of(ibv_cq, struct efa_cq, ibv_cq); + struct efa_domain *efa_domain = container_of(efa_cq->util_cq.domain, struct efa_domain, util_domain); + + struct dlist_entry rx_progressed_ep_list, *tmp; + dlist_init(&rx_progressed_ep_list); + + /* Call ibv_start_poll only once */ + efa_cq_start_poll(ibv_cq); + + while (efa_cq_wc_available(ibv_cq)) { + ep = efa_rdm_cq_get_rdm_ep(ibv_cq, efa_domain); + if (OFI_UNLIKELY(efa_rdm_cq_process_wc(ibv_cq, ep) != IBV_WC_SUCCESS)) + break; if (ep->efa_rx_pkts_to_post > 0 && !dlist_find_first_match(&rx_progressed_ep_list, &efa_rdm_cq_match_ep, ep)) dlist_insert_tail(&ep->entry, &rx_progressed_ep_list); - i++; - if (i == cqe_to_process) { + if (++i >= cqe_to_process) break; - } /* * ibv_next_poll MUST be call after the current WC is fully processed, @@ -526,7 +649,6 @@ int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq) err = ibv_cq->poll_err; efa_cq_end_poll(ibv_cq); - dlist_foreach_container_safe( &rx_progressed_ep_list, struct efa_rdm_ep, ep, entry, tmp) { efa_rdm_ep_post_internal_rx_pkts(ep); diff --git a/prov/efa/src/rdm/efa_rdm_cq.h b/prov/efa/src/rdm/efa_rdm_cq.h index f46bbb2d9b9..d8c3e89a48b 100644 --- a/prov/efa/src/rdm/efa_rdm_cq.h +++ b/prov/efa/src/rdm/efa_rdm_cq.h @@ -17,6 +17,7 @@ struct efa_rdm_cq { int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, struct fid_cq **cq_fid, void *context); +void efa_rdm_cq_poll_ibv_cq_closing_ep(struct efa_ibv_cq *ibv_cq, struct efa_rdm_ep *closing_ep); int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq); void efa_rdm_cq_progress_peers_and_queues(struct efa_rdm_cq *efa_rdm_cq); diff --git a/prov/efa/src/rdm/efa_rdm_ep_fiops.c b/prov/efa/src/rdm/efa_rdm_ep_fiops.c index bc46e9ac601..6b1765fa3be 100644 --- a/prov/efa/src/rdm/efa_rdm_ep_fiops.c +++ b/prov/efa/src/rdm/efa_rdm_ep_fiops.c @@ -12,6 +12,7 @@ #include "efa_rdm_rxe_map.h" #include "efa_rdm_pkt_type.h" #include "efa_rdm_pke_req.h" +#include "efa_rdm_pke_utils.h" #include "efa_cntr.h" @@ -824,6 +825,48 @@ bool efa_rdm_ep_has_unfinished_send(struct efa_rdm_ep *efa_rdm_ep) return false; } +static inline void progress_queues_closing_ep(struct efa_rdm_ep *ep) +{ + struct efa_rdm_peer *peer; + struct dlist_entry *tmp; + struct efa_rdm_ope *ope; + struct efa_domain *domain = efa_rdm_ep_domain(ep); + + assert(domain->info->ep_attr->type == FI_EP_RDM); + + /* Update timers for peers that are in backoff list*/ + dlist_foreach_container_safe(&domain->peer_backoff_list, + struct efa_rdm_peer, peer, rnr_backoff_entry, tmp) { + if (ofi_gettime_us() >= peer->rnr_backoff_begin_ts + + peer->rnr_backoff_wait_time) { + peer->flags &= ~EFA_RDM_PEER_IN_BACKOFF; + dlist_remove(&peer->rnr_backoff_entry); + } + } + + dlist_foreach_container_safe(&domain->ope_queued_list, + struct efa_rdm_ope, ope, queued_entry, tmp) { + if (ope->ep == ep) { + switch (efa_rdm_pkt_type_of(ope)) { + case EFA_RDM_RECEIPT_PKT: + case EFA_RDM_EOR_PKT: + if (efa_rdm_ope_process_queued_ope(ope, EFA_RDM_OPE_QUEUED_RNR)) + continue; + if (efa_rdm_ope_process_queued_ope(ope, EFA_RDM_OPE_QUEUED_CTRL)) + continue; + /* fall-thru */ + default: + /* Release all other queued OPEs */ + if (ope->type == EFA_RDM_TXE) + efa_rdm_txe_release(ope); + else + efa_rdm_rxe_release(ope); + break; + } + } + } +} + /* * @brief wait 
for send to finish * @@ -846,10 +889,10 @@ void efa_rdm_ep_wait_send(struct efa_rdm_ep *efa_rdm_ep) while (efa_rdm_ep_has_unfinished_send(efa_rdm_ep)) { /* poll cq until empty */ if (tx_cq) - (void) efa_rdm_cq_poll_ibv_cq(-1, &tx_cq->ibv_cq); + efa_rdm_cq_poll_ibv_cq_closing_ep(&tx_cq->ibv_cq, efa_rdm_ep); if (rx_cq) - (void) efa_rdm_cq_poll_ibv_cq(-1, &rx_cq->ibv_cq); - efa_domain_progress_rdm_peers_and_queues(efa_rdm_ep_domain(efa_rdm_ep)); + efa_rdm_cq_poll_ibv_cq_closing_ep(&rx_cq->ibv_cq, efa_rdm_ep); + progress_queues_closing_ep(efa_rdm_ep); } ofi_genlock_unlock(&efa_rdm_ep_domain(efa_rdm_ep)->srx_lock);
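
Series note: below is a minimal, illustrative sketch of the IBV CQ polling
sequence that the patch 1 helpers encapsulate. drain_cq() and
process_one_wc() are hypothetical placeholders, not part of this series;
the real consumers are efa_cq_poll_ibv_cq(), efa_rdm_cq_poll_ibv_cq(), and
efa_rdm_cq_poll_ibv_cq_closing_ep() above.

	static int drain_cq(struct efa_ibv_cq *ibv_cq)
	{
		int err;

		/* Calls ibv_start_poll() exactly once; any failure other than
		 * ENOENT is logged and written to the owning util CQ. */
		efa_cq_start_poll(ibv_cq);

		/* True only while a WC is held (poll_active && !poll_err) */
		while (efa_cq_wc_available(ibv_cq)) {
			process_one_wc(ibv_cq->ibv_cq_ex); /* read status/opcode/wr_id */
			efa_cq_next_poll(ibv_cq);          /* fetch the next WC */
		}

		/* Read poll_err before efa_cq_end_poll() resets it; ENOENT
		 * here simply means the CQ was drained. */
		err = ibv_cq->poll_err;
		efa_cq_end_poll(ibv_cq); /* calls ibv_end_poll() only if a poll is active */
		return err;
	}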