Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 5 additions & 32 deletions prov/efa/src/efa_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -248,29 +248,21 @@ efa_cq_proc_ibv_recv_rdma_with_imm_completion(struct efa_base_ep *base_ep,
*/
int efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
{
bool should_end_poll = false;
struct efa_base_ep *base_ep;
struct efa_cq *cq;
struct efa_domain *efa_domain;
struct fi_cq_tagged_entry cq_entry = {0};
struct fi_cq_err_entry err_entry;
int err = 0;
size_t num_cqe = 0; /* Count of read entries */
int prov_errno, opcode;

/* Initialize an empty ibv_poll_cq_attr struct for ibv_start_poll.
* EFA expects .comp_mask = 0, or otherwise returns EINVAL.
*/
struct ibv_poll_cq_attr poll_cq_attr = {.comp_mask = 0};

cq = container_of(ibv_cq, struct efa_cq, ibv_cq);
efa_domain = container_of(cq->util_cq.domain, struct efa_domain, util_domain);

/* Call ibv_start_poll only once */
err = ibv_start_poll(cq->ibv_cq.ibv_cq_ex, &poll_cq_attr);
should_end_poll = !err;
efa_cq_start_poll(ibv_cq);

while (!err) {
while (efa_cq_wc_available(ibv_cq)) {
base_ep = efa_domain->qp_table[ibv_wc_read_qp_num(cq->ibv_cq.ibv_cq_ex) & efa_domain->qp_table_sz_m1]->base_ep;
opcode = ibv_wc_read_opcode(cq->ibv_cq.ibv_cq_ex);
if (cq->ibv_cq.ibv_cq_ex->status) {
Expand Down Expand Up @@ -328,29 +320,10 @@ int efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
break;
}

err = ibv_next_poll(cq->ibv_cq.ibv_cq_ex);
efa_cq_next_poll(ibv_cq);
}

if (err && err != ENOENT) {
err = err > 0 ? err : -err;
prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq.ibv_cq_ex);
EFA_WARN(FI_LOG_CQ,
"Unexpected error when polling ibv cq, err: %s (%d) "
"prov_errno: %s (%d)\n",
fi_strerror(err), err, efa_strerror(prov_errno),
prov_errno);
efa_show_help(prov_errno);
err_entry = (struct fi_cq_err_entry) {
.err = err,
.prov_errno = prov_errno,
.op_context = NULL,
};
ofi_cq_write_error(&cq->util_cq, &err_entry);
}

if (should_end_poll)
ibv_end_poll(cq->ibv_cq.ibv_cq_ex);

err = ibv_cq->poll_err;
efa_cq_end_poll(ibv_cq);
return err;
}

Expand Down
56 changes: 55 additions & 1 deletion prov/efa/src/efa_cq.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct efa_ibv_cq {
struct ibv_cq_ex *ibv_cq_ex;
enum ibv_cq_ex_type ibv_cq_ex_type;
struct ibv_comp_channel *channel;
bool poll_active;
int poll_err;
};

struct efa_ibv_cq_poll_list_entry {
Expand Down Expand Up @@ -372,10 +374,62 @@ static inline int efa_write_error_msg(struct efa_base_ep *ep, fi_addr_t addr,
return 0;
}

/**
 * @brief Tell whether the current poll session has a work completion to read.
 *
 * @param[in] cq	EFA ibv CQ wrapper holding the poll state
 * @return true only when a poll session is active and the most recent
 *         start/next poll call recorded no error in cq->poll_err.
 */
static inline bool efa_cq_wc_available(struct efa_ibv_cq *cq)
{
	/* Without an active poll session there is nothing to read. */
	if (!cq->poll_active)
		return false;

	return cq->poll_err == 0;
}

static inline void efa_cq_report_poll_err(struct efa_ibv_cq *cq)
{
int err = cq->poll_err;
int prov_errno;

if (err && err != ENOENT) {
err = err > 0 ? err : -err;
prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq_ex);
EFA_WARN(FI_LOG_CQ, "Encountered error during CQ polling. err: %s (%d), prov_errno: %s (%d)\n",
fi_strerror(err), err, efa_strerror(prov_errno), prov_errno);
efa_show_help(prov_errno);
ofi_cq_write_error(&container_of(cq, struct efa_cq, ibv_cq)->util_cq , &(struct fi_cq_err_entry) {
.err = err,
.prov_errno = prov_errno,
});
}
}

static inline void efa_cq_start_poll(struct efa_ibv_cq *cq)
{
/* Pass an empty ibv_poll_cq_attr struct (zero-initialized) for
* ibv_start_poll. EFA expects .comp_mask = 0, or otherwise returns EINVAL.
*/
assert(!cq->poll_active);
cq->poll_err = ibv_start_poll(cq->ibv_cq_ex, &(struct ibv_poll_cq_attr){0});
if (OFI_LIKELY(!cq->poll_err))
cq->poll_active = true;
else
efa_cq_report_poll_err(cq);
}

/**
 * @brief Advance the active poll session to the next completion.
 *
 * Stores the return value of ibv_next_poll() in cq->poll_err and, when it
 * is non-zero, passes it to efa_cq_report_poll_err().
 *
 * Requires an active poll session.
 *
 * @param[in,out] cq	EFA ibv CQ wrapper being polled
 */
static inline void efa_cq_next_poll(struct efa_ibv_cq *cq)
{
	int rc;

	assert(cq->poll_active);

	rc = ibv_next_poll(cq->ibv_cq_ex);
	cq->poll_err = rc;
	if (OFI_LIKELY(!rc))
		return;

	efa_cq_report_poll_err(cq);
}

/**
 * @brief Close the poll session (if one is active) and reset the poll state.
 *
 * ibv_end_poll() is only called when a session was actually started.
 * On return cq->poll_active is false and cq->poll_err is 0, so callers
 * that need the last poll error must read it before calling this.
 *
 * @param[in,out] cq	EFA ibv CQ wrapper to finish polling on
 */
static inline void efa_cq_end_poll(struct efa_ibv_cq *cq)
{
	if (OFI_LIKELY(cq->poll_active)) {
		ibv_end_poll(cq->ibv_cq_ex);
		cq->poll_active = false;
	}

	cq->poll_err = 0;
}

int efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq);
int efa_cq_trywait(struct efa_cq *cq);
int efa_cq_signal(struct fid_cq *cq_fid);
int efa_poll_events(struct efa_cq *cq, int timeout);
int efa_cq_control(struct fid *cq, int command, void *arg);

#endif /* end of _EFA_CQ_H*/
#endif /* end of _EFA_CQ_H*/
90 changes: 8 additions & 82 deletions prov/efa/src/efa_domain.c
Original file line number Diff line number Diff line change
Expand Up @@ -819,88 +819,14 @@ void efa_domain_progress_rdm_peers_and_queues(struct efa_domain *domain)
if (peer && (peer->flags & EFA_RDM_PEER_IN_BACKOFF))
continue;

if (ope->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE) {
ret = efa_rdm_ope_repost_ope_queued_before_handshake(ope);
if (ret == -FI_EAGAIN)
continue;

if (OFI_UNLIKELY(ret)) {
assert(ope->type == EFA_RDM_TXE);
/* efa_rdm_txe_handle_error will remove ope from the queued_list */
ope->ep->ope_queued_before_handshake_cnt--;
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
continue;
}

dlist_remove(&ope->queued_entry);
ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE;
ope->ep->ope_queued_before_handshake_cnt--;
}

if (ope->internal_flags & EFA_RDM_OPE_QUEUED_RNR) {
assert(!dlist_empty(&ope->queued_pkts));
ret = efa_rdm_ep_post_queued_pkts(ope->ep, &ope->queued_pkts);

if (ret == -FI_EAGAIN)
continue;

if (OFI_UNLIKELY(ret)) {
assert(ope->type == EFA_RDM_RXE || ope->type == EFA_RDM_TXE);
if (ope->type == EFA_RDM_RXE)
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_PKT_SEND);
else
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_SEND);
continue;
}

dlist_remove(&ope->queued_entry);
ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_RNR;
}

if (ope->internal_flags & EFA_RDM_OPE_QUEUED_CTRL) {
ret = efa_rdm_ope_post_send(ope, ope->queued_ctrl_type);
if (ret == -FI_EAGAIN)
continue;

if (OFI_UNLIKELY(ret)) {
assert(ope->type == EFA_RDM_TXE || ope->type == EFA_RDM_RXE);
if (ope->type == EFA_RDM_TXE)
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
else
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
continue;
}

/* it can happen that efa_rdm_ope_post_send() released ope
* (if the ope is rxe and packet type is EOR and inject is used). In
* that case rxe's state has been set to EFA_RDM_OPE_FREE and
* it has been removed from ep->op_queued_entry_list, so nothing
* is left to do.
*/
if (ope->state == EFA_RDM_OPE_FREE)
continue;

ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_CTRL;
dlist_remove(&ope->queued_entry);
}

if (ope->internal_flags & EFA_RDM_OPE_QUEUED_READ) {
ret = efa_rdm_ope_post_read(ope);
if (ret == -FI_EAGAIN)
continue;

if (OFI_UNLIKELY(ret)) {
assert(ope->type == EFA_RDM_TXE || ope->type == EFA_RDM_RXE);
if (ope->type == EFA_RDM_TXE)
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_READ_POST);
else
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_READ_POST);
continue;
}

ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_READ;
dlist_remove(&ope->queued_entry);
}
if (efa_rdm_ope_process_queued_ope(ope, EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE))
continue;
if (efa_rdm_ope_process_queued_ope(ope, EFA_RDM_OPE_QUEUED_RNR))
continue;
if (efa_rdm_ope_process_queued_ope(ope, EFA_RDM_OPE_QUEUED_CTRL))
continue;
if (efa_rdm_ope_process_queued_ope(ope, EFA_RDM_OPE_QUEUED_READ))
continue;
}
/*
* Send data packets until window or data queue is exhausted.
Expand Down
Loading