Commit 75a6fc6

prov/efa: Fix wait_send procedure
This adds a specialized procedure for processing the outstanding completions of an endpoint that is being closed. Packet entries are simply released, except when resource management is enabled (FI_RM_ENABLED), the completion failed with RNR, and the packet type is RECEIPT or EOR; in that case the packet is queued to be reposted. This avoids much of the overhead of the standard completion polling procedure.

Signed-off-by: Darryl Abbate <[email protected]>
1 parent 863e5ef, commit 75a6fc6
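In rough terms, the per-completion decision made on the new closing-endpoint path is the one sketched below. This is only an illustrative condensation of the efa_rdm_cq_process_wc_closing_ep() logic added in the diff that follows; it relies on the same provider-internal types and helpers shown there, and it omits tracing, the unsolicited-receive check, and the unhandled-opcode assertion.

/* Illustrative sketch (not the verbatim implementation): the decision made
 * for each work completion when draining the CQ of a closing endpoint. */
static enum ibv_wc_status
process_wc_closing_ep_sketch(struct efa_ibv_cq *cq, struct efa_rdm_ep *ep)
{
	struct efa_rdm_pke *pkt_entry = (struct efa_rdm_pke *) cq->ibv_cq_ex->wr_id;
	enum ibv_wc_status status = cq->ibv_cq_ex->status;
	enum ibv_wc_opcode opcode = ibv_wc_read_opcode(cq->ibv_cq_ex);
	struct efa_rdm_peer *peer = pkt_entry ? efa_rdm_ep_get_peer(ep, pkt_entry->addr) : NULL;

	if (status != IBV_WC_SUCCESS &&
	    efa_rdm_cq_get_prov_errno(cq->ibv_cq_ex, peer) == EFA_IO_COMP_STATUS_REMOTE_ERROR_RNR &&
	    ep->handle_resource_management == FI_RM_ENABLED) {
		switch (efa_rdm_pkt_type_of(pkt_entry)) {
		case EFA_RDM_RECEIPT_PKT:
		case EFA_RDM_EOR_PKT:
			/* These control packets must still reach the peer:
			 * queue them to be reposted after the RNR backoff. */
			efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, peer);
			efa_rdm_ep_queue_rnr_pkt(ep, pkt_entry);
			return status;
		default:
			break;
		}
	}

	/* Everything else is simply released, skipping the bookkeeping of the
	 * normal completion path, since the endpoint is going away anyway. */
	if (opcode == IBV_WC_RECV || opcode == IBV_WC_RECV_RDMA_WITH_IMM) {
		efa_rdm_pke_release_rx(pkt_entry);
	} else {
		efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, peer);
		efa_rdm_pke_release_tx(pkt_entry);
	}
	return status;
}

efa_rdm_ep_wait_send() then drains the TX and RX CQs through this path via efa_rdm_cq_poll_ibv_cq_closing_ep() and calls progress_queues_closing_ep(), instead of the generic efa_rdm_cq_poll_ibv_cq() and efa_domain_progress_rdm_peers_and_queues() pair.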

File tree

3 files changed: +220 -56 lines


prov/efa/src/rdm/efa_rdm_cq.c

Lines changed: 173 additions & 53 deletions
@@ -410,73 +410,134 @@ static int efa_rdm_cq_match_ep(struct dlist_entry *item, const void *ep)
 	return (container_of(item, struct efa_rdm_ep, entry) == ep);
 }
 
+static inline struct efa_rdm_ep *efa_rdm_cq_get_rdm_ep(struct efa_ibv_cq *cq, struct efa_domain *efa_domain)
+{
+	struct efa_base_ep *base_ep = efa_domain->qp_table[ibv_wc_read_qp_num(cq->ibv_cq_ex) & efa_domain->qp_table_sz_m1]->base_ep;
+	return container_of(base_ep, struct efa_rdm_ep, base_ep);
+}
+
 /**
- * @brief poll rdma-core cq and process the cq entry
+ * @brief Process work completions for a closing endpoint
  *
- * @param[in] ep_poll the RDM endpoint that polls ibv cq. Note this polling endpoint can be different
- * from the endpoint that the completed packet entry was posted from (pkt_entry->ep).
- * @param[in] cqe_to_process Max number of cq entry to poll and process. A negative number means to poll until cq empty
+ * This is a lighter-weight counterpart to #efa_rdm_cq_process_wc(); avoiding
+ * unnecessary overhead for processing completions for an endpoint that's
+ * closing anyway by simply releasing the packet entries. Exceptions include
+ * RECEIPT and EOR packets when a completion fails due to RNR and resource
+ * management is enabled by the user (FI_RM_ENABLED). In this case, packets are
+ * queued to be reposted.
+ *
+ * @param[in] cq IBV CQ
+ * @param[in] ep EFA RDM endpoint (to be closed)
+ * @return Status code for the work completion
  */
-int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
+static inline
+enum ibv_wc_status efa_rdm_cq_process_wc_closing_ep(struct efa_ibv_cq *cq, struct efa_rdm_ep *ep)
 {
-	struct efa_rdm_pke *pkt_entry;
-	int err;
-	int opcode;
-	size_t i = 0;
+	uint64_t wr_id = cq->ibv_cq_ex->wr_id;
+	enum ibv_wc_status status = cq->ibv_cq_ex->status;
+	enum ibv_wc_opcode opcode = ibv_wc_read_opcode(cq->ibv_cq_ex);
+	struct efa_rdm_pke *pkt_entry = (struct efa_rdm_pke *) wr_id;
 	int prov_errno;
-	struct efa_rdm_ep *ep = NULL;
-	struct efa_cq *efa_cq;
-	struct efa_domain *efa_domain;
-	struct efa_qp *qp;
 	struct efa_rdm_peer *peer = NULL;
-	struct dlist_entry rx_progressed_ep_list, *tmp;
 
-	efa_cq = container_of(ibv_cq, struct efa_cq, ibv_cq);
-	efa_domain = container_of(efa_cq->util_cq.domain, struct efa_domain, util_domain);
-	dlist_init(&rx_progressed_ep_list);
-
-	/* Call ibv_start_poll only once */
-	efa_cq_start_poll(ibv_cq);
-
-	while (efa_cq_wc_available(ibv_cq)) {
-		pkt_entry = (void *)(uintptr_t)ibv_cq->ibv_cq_ex->wr_id;
-		qp = efa_domain->qp_table[ibv_wc_read_qp_num(ibv_cq->ibv_cq_ex) & efa_domain->qp_table_sz_m1];
-		ep = container_of(qp->base_ep, struct efa_rdm_ep, base_ep);
 #if HAVE_LTTNG
-		efa_rdm_tracepoint(poll_cq, (size_t) ibv_cq->ibv_cq_ex->wr_id);
+	efa_rdm_tracepoint(poll_cq, (size_t) wr_id);
 	if (pkt_entry && pkt_entry->ope)
 		efa_rdm_tracepoint(poll_cq_ope, pkt_entry->ope->msg_id,
 				   (size_t) pkt_entry->ope->cq_entry.op_context,
 				   pkt_entry->ope->total_len, pkt_entry->ope->cq_entry.tag,
 				   pkt_entry->ope->addr);
 #endif
-		opcode = ibv_wc_read_opcode(ibv_cq->ibv_cq_ex);
-		if (ibv_cq->ibv_cq_ex->status) {
-			if (pkt_entry)
-				peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
-			prov_errno = efa_rdm_cq_get_prov_errno(ibv_cq->ibv_cq_ex, peer);
-			switch (opcode) {
-			case IBV_WC_SEND: /* fall through */
-			case IBV_WC_RDMA_WRITE: /* fall through */
-			case IBV_WC_RDMA_READ:
-				efa_rdm_pke_handle_tx_error(pkt_entry, prov_errno, peer);
-				break;
-			case IBV_WC_RECV: /* fall through */
-			case IBV_WC_RECV_RDMA_WITH_IMM:
-				if (efa_cq_wc_is_unsolicited(ibv_cq->ibv_cq_ex)) {
-					EFA_WARN(FI_LOG_CQ, "Receive error %s (%d) for unsolicited write recv",
-						 efa_strerror(prov_errno), prov_errno);
-					efa_base_ep_write_eq_error(&ep->base_ep, to_fi_errno(prov_errno), prov_errno);
+	if (!efa_cq_wc_is_unsolicited(cq->ibv_cq_ex)) {
+		if (pkt_entry)
+			peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
+		if (OFI_UNLIKELY(status != IBV_WC_SUCCESS)) {
+			prov_errno = efa_rdm_cq_get_prov_errno(cq->ibv_cq_ex, peer);
+			if (prov_errno == EFA_IO_COMP_STATUS_REMOTE_ERROR_RNR &&
+			    ep->handle_resource_management == FI_RM_ENABLED) {
+				switch (efa_rdm_pkt_type_of(pkt_entry)) {
+				case EFA_RDM_RECEIPT_PKT:
+				case EFA_RDM_EOR_PKT:
+					efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, peer);
+					efa_rdm_ep_queue_rnr_pkt(ep, pkt_entry);
+					return status;
+				default:
 					break;
 				}
-				efa_rdm_pke_handle_rx_error(pkt_entry, prov_errno);
+			}
+		}
+		switch (opcode) {
+		case IBV_WC_SEND: /* fall through */
+		case IBV_WC_RDMA_WRITE: /* fall through */
+		case IBV_WC_RDMA_READ:
+			efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, peer);
+			efa_rdm_pke_release_tx(pkt_entry);
+			break;
+		case IBV_WC_RECV: /* fall through */
+		case IBV_WC_RECV_RDMA_WITH_IMM:
+			efa_rdm_pke_release_rx(pkt_entry);
+			break;
+		default:
+			EFA_WARN(FI_LOG_EP_CTRL, "Unhandled opcode: %d\n", opcode);
+			assert(0 && "Unhandled opcode");
+		}
+	}
+	return status;
+}
+
+/**
+ * @brief Process work completions
+ *
+ * @param[in] cq IBV CQ
+ * @param[in] ep EFA RDM endpoint
+ * @return Status code for the work completion
+ */
+static inline
+enum ibv_wc_status efa_rdm_cq_process_wc(struct efa_ibv_cq *cq, struct efa_rdm_ep *ep)
+{
+	uint64_t wr_id = cq->ibv_cq_ex->wr_id;
+	enum ibv_wc_status status = cq->ibv_cq_ex->status;
+	enum ibv_wc_opcode opcode = ibv_wc_read_opcode(cq->ibv_cq_ex);
+	struct efa_rdm_pke *pkt_entry = (struct efa_rdm_pke *) wr_id;
+	int prov_errno;
+	struct efa_rdm_peer *peer = NULL;
+
+#if HAVE_LTTNG
+	efa_rdm_tracepoint(poll_cq, (size_t) wr_id);
+	if (pkt_entry && pkt_entry->ope)
+		efa_rdm_tracepoint(poll_cq_ope, pkt_entry->ope->msg_id,
+				   (size_t) pkt_entry->ope->cq_entry.op_context,
+				   pkt_entry->ope->total_len, pkt_entry->ope->cq_entry.tag,
+				   pkt_entry->ope->addr);
+#endif
+
+	if (OFI_UNLIKELY(status != IBV_WC_SUCCESS)) {
+		if (pkt_entry)
+			peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
+		prov_errno = efa_rdm_cq_get_prov_errno(cq->ibv_cq_ex, peer);
+		switch (opcode) {
+		case IBV_WC_SEND: /* fall through */
+		case IBV_WC_RDMA_WRITE: /* fall through */
+		case IBV_WC_RDMA_READ:
+			assert(pkt_entry);
+			efa_rdm_pke_handle_tx_error(pkt_entry, prov_errno, peer);
+			break;
+		case IBV_WC_RECV: /* fall through */
+		case IBV_WC_RECV_RDMA_WITH_IMM:
+			if (efa_cq_wc_is_unsolicited(cq->ibv_cq_ex)) {
+				EFA_WARN(FI_LOG_CQ, "Receive error %s (%d) for unsolicited write recv",
+					 efa_strerror(prov_errno), prov_errno);
+				efa_base_ep_write_eq_error(&ep->base_ep, to_fi_errno(prov_errno), prov_errno);
 				break;
-			default:
-				EFA_WARN(FI_LOG_EP_CTRL, "Unhandled op code %d\n", opcode);
-				assert(0 && "Unhandled op code");
 			}
+			assert(pkt_entry);
+			efa_rdm_pke_handle_rx_error(pkt_entry, prov_errno);
 			break;
+		default:
+			EFA_WARN(FI_LOG_EP_CTRL, "Unhandled opcode: %d\n", opcode);
+			assert(0 && "Unhandled opcode");
 		}
+	} else {
 		switch (opcode) {
 		case IBV_WC_SEND:
 #if ENABLE_DEBUG
@@ -488,7 +549,7 @@ int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
 		case IBV_WC_RECV:
 			/* efa_rdm_cq_handle_recv_completion does additional work to determine the source
 			 * address and the peer struct. So do not try to identify the peer here. */
-			efa_rdm_cq_handle_recv_completion(ibv_cq, pkt_entry, ep);
+			efa_rdm_cq_handle_recv_completion(cq, pkt_entry, ep);
 #if ENABLE_DEBUG
 			ep->recv_comps++;
 #endif
@@ -500,7 +561,7 @@ int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
 			break;
 		case IBV_WC_RECV_RDMA_WITH_IMM:
 			efa_rdm_cq_proc_ibv_recv_rdma_with_imm_completion(
-				ibv_cq->ibv_cq_ex,
+				cq->ibv_cq_ex,
 				FI_REMOTE_CQ_DATA | FI_RMA | FI_REMOTE_WRITE,
 				ep, pkt_entry);
 			break;
@@ -509,13 +570,73 @@ int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
 				 "Unhandled cq type\n");
 			assert(0 && "Unhandled cq type");
 		}
+	}
+	return status;
+}
 
+void efa_rdm_cq_poll_ibv_cq_closing_ep(struct efa_ibv_cq *ibv_cq, struct efa_rdm_ep *closing_ep)
+{
+
+	struct efa_rdm_ep *ep = NULL;
+	struct efa_cq *efa_cq = container_of(ibv_cq, struct efa_cq, ibv_cq);
+	struct efa_domain *efa_domain = container_of(efa_cq->util_cq.domain, struct efa_domain, util_domain);
+	struct dlist_entry rx_progressed_ep_list, *tmp;
+
+	dlist_init(&rx_progressed_ep_list);
+
+	efa_cq_start_poll(ibv_cq);
+	while (efa_cq_wc_available(ibv_cq)) {
+		ep = efa_rdm_cq_get_rdm_ep(ibv_cq, efa_domain);
+		if (ep == closing_ep) {
+			if (OFI_UNLIKELY(efa_rdm_cq_process_wc_closing_ep(ibv_cq, ep) != IBV_WC_SUCCESS))
+				break;
+		} else {
+			if (OFI_UNLIKELY(efa_rdm_cq_process_wc(ibv_cq, ep) != IBV_WC_SUCCESS))
+				break;
+			if (ep->efa_rx_pkts_to_post > 0 && !dlist_find_first_match(&rx_progressed_ep_list, &efa_rdm_cq_match_ep, ep))
+				dlist_insert_tail(&ep->entry, &rx_progressed_ep_list);
+		}
+		efa_cq_next_poll(ibv_cq);
+	}
+	efa_cq_end_poll(ibv_cq);
+	dlist_foreach_container_safe(
+		&rx_progressed_ep_list, struct efa_rdm_ep, ep, entry, tmp) {
+		efa_rdm_ep_post_internal_rx_pkts(ep);
+		dlist_remove(&ep->entry);
+	}
+	assert(dlist_empty(&rx_progressed_ep_list));
+}
+
+/**
+ * @brief poll rdma-core cq and process the cq entry
+ *
+ * @param[in] ep_poll the RDM endpoint that polls ibv cq. Note this polling endpoint can be different
+ * from the endpoint that the completed packet entry was posted from (pkt_entry->ep).
+ * @param[in] cqe_to_process Max number of cq entry to poll and process. A negative number means to poll until cq empty
+ */
+int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
+{
+	int err;
+	size_t i = 0;
+	struct efa_rdm_ep *ep = NULL;
+	struct efa_cq *efa_cq = container_of(ibv_cq, struct efa_cq, ibv_cq);
+	struct efa_domain *efa_domain = container_of(efa_cq->util_cq.domain, struct efa_domain, util_domain);
+
+	struct dlist_entry rx_progressed_ep_list, *tmp;
+
+	dlist_init(&rx_progressed_ep_list);
+
+	/* Call ibv_start_poll only once */
+	efa_cq_start_poll(ibv_cq);
+
+	while (efa_cq_wc_available(ibv_cq)) {
+		ep = efa_rdm_cq_get_rdm_ep(ibv_cq, efa_domain);
+		if (OFI_UNLIKELY(efa_rdm_cq_process_wc(ibv_cq, ep) != IBV_WC_SUCCESS))
+			break;
 		if (ep->efa_rx_pkts_to_post > 0 && !dlist_find_first_match(&rx_progressed_ep_list, &efa_rdm_cq_match_ep, ep))
 			dlist_insert_tail(&ep->entry, &rx_progressed_ep_list);
-		i++;
-		if (i == cqe_to_process) {
+		if (++i >= cqe_to_process)
 			break;
-		}
 
 		/*
 		 * ibv_next_poll MUST be call after the current WC is fully processed,
@@ -526,7 +647,6 @@ int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
 
 	err = ibv_cq->poll_err;
 	efa_cq_end_poll(ibv_cq);
-
 	dlist_foreach_container_safe(
 		&rx_progressed_ep_list, struct efa_rdm_ep, ep, entry, tmp) {
 		efa_rdm_ep_post_internal_rx_pkts(ep);

prov/efa/src/rdm/efa_rdm_cq.h

Lines changed: 1 addition & 0 deletions
@@ -17,6 +17,7 @@ struct efa_rdm_cq {
 int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
 		    struct fid_cq **cq_fid, void *context);
 
+void efa_rdm_cq_poll_ibv_cq_closing_ep(struct efa_ibv_cq *ibv_cq, struct efa_rdm_ep *closing_ep);
 int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq);
 
 void efa_rdm_cq_progress_peers_and_queues(struct efa_rdm_cq *efa_rdm_cq);

prov/efa/src/rdm/efa_rdm_ep_fiops.c

Lines changed: 46 additions & 3 deletions
@@ -12,6 +12,7 @@
 #include "efa_rdm_rxe_map.h"
 #include "efa_rdm_pkt_type.h"
 #include "efa_rdm_pke_req.h"
+#include "efa_rdm_pke_utils.h"
 #include "efa_cntr.h"
 
 
@@ -824,6 +825,48 @@ bool efa_rdm_ep_has_unfinished_send(struct efa_rdm_ep *efa_rdm_ep)
 	return false;
 }
 
+static inline void progress_queues_closing_ep(struct efa_rdm_ep *ep)
+{
+	struct efa_rdm_peer *peer;
+	struct dlist_entry *tmp;
+	struct efa_rdm_ope *ope;
+	struct efa_domain *domain = efa_rdm_ep_domain(ep);
+
+	assert(domain->info->ep_attr->type == FI_EP_RDM);
+
+	/* Update timers for peers that are in backoff list*/
+	dlist_foreach_container_safe(&domain->peer_backoff_list,
+				     struct efa_rdm_peer, peer, rnr_backoff_entry, tmp) {
+		if (ofi_gettime_us() >= peer->rnr_backoff_begin_ts +
+					peer->rnr_backoff_wait_time) {
+			peer->flags &= ~EFA_RDM_PEER_IN_BACKOFF;
+			dlist_remove(&peer->rnr_backoff_entry);
+		}
+	}
+
+	dlist_foreach_container_safe(&domain->ope_queued_list,
+				     struct efa_rdm_ope, ope, queued_entry, tmp) {
+		if (ope->ep == ep) {
+			switch (efa_rdm_pkt_type_of(ope)) {
+			case EFA_RDM_RECEIPT_PKT:
+			case EFA_RDM_EOR_PKT:
+				if (efa_rdm_ope_process_queued_ope(ope, EFA_RDM_OPE_QUEUED_RNR))
+					continue;
+				if (efa_rdm_ope_process_queued_ope(ope, EFA_RDM_OPE_QUEUED_CTRL))
+					continue;
+				/* fall-thru */
+			default:
+				/* Release all other queued OPEs */
+				if (ope->type == EFA_RDM_TXE)
+					efa_rdm_txe_release(ope);
+				else
+					efa_rdm_rxe_release(ope);
+				break;
+			}
+		}
+	}
+}
+
 /*
  * @brief wait for send to finish
  *
@@ -846,10 +889,10 @@ void efa_rdm_ep_wait_send(struct efa_rdm_ep *efa_rdm_ep)
 	while (efa_rdm_ep_has_unfinished_send(efa_rdm_ep)) {
 		/* poll cq until empty */
 		if (tx_cq)
-			(void) efa_rdm_cq_poll_ibv_cq(-1, &tx_cq->ibv_cq);
+			efa_rdm_cq_poll_ibv_cq_closing_ep(&tx_cq->ibv_cq, efa_rdm_ep);
 		if (rx_cq)
-			(void) efa_rdm_cq_poll_ibv_cq(-1, &rx_cq->ibv_cq);
-		efa_domain_progress_rdm_peers_and_queues(efa_rdm_ep_domain(efa_rdm_ep));
+			efa_rdm_cq_poll_ibv_cq_closing_ep(&rx_cq->ibv_cq, efa_rdm_ep);
+		progress_queues_closing_ep(efa_rdm_ep);
 	}
 
 	ofi_genlock_unlock(&efa_rdm_ep_domain(efa_rdm_ep)->srx_lock);
