diff --git a/fabtests/prov/efa/src/multi_ep_mt.c b/fabtests/prov/efa/src/multi_ep_mt.c index ed59bc66010..c560e596a07 100644 --- a/fabtests/prov/efa/src/multi_ep_mt.c +++ b/fabtests/prov/efa/src/multi_ep_mt.c @@ -368,7 +368,7 @@ int open_client(int idx) { int ret; struct fi_av_attr av_attr = {0}; - struct fid_av *av; + int av_idx = idx; if (opts.av_name) { av_attr.name = opts.av_name; @@ -383,11 +383,9 @@ int open_client(int idx) /* ft_enable_ep bind the ep with cq and av before enabling */ if (shared_av) { - av = avs[0]; + av_idx = 0; } else { - av = avs[idx]; - - ret = fi_av_open(domain, &av_attr, &av, NULL); + ret = fi_av_open(domain, &av_attr, &avs[av_idx], NULL); if (ret) { FT_PRINTERR("fi_av_open", ret); return ret; @@ -399,7 +397,7 @@ int open_client(int idx) return ret; /* Use the same remote addr we got from the persistent receiver ep */ - ret = ft_av_insert(av, (void *)remote_raw_addr, 1, &remote_fiaddr[idx], 0, NULL); + ret = ft_av_insert(avs[av_idx], (void *)remote_raw_addr, 1, &remote_fiaddr[idx], 0, NULL); if (ret) return ret; diff --git a/prov/efa/src/efa_cq.c b/prov/efa/src/efa_cq.c index 1b8790757de..18bce4ea672 100644 --- a/prov/efa/src/efa_cq.c +++ b/prov/efa/src/efa_cq.c @@ -33,8 +33,15 @@ static inline uint64_t efa_cq_opcode_to_fi_flags(enum ibv_wc_opcode opcode) { } } -static void efa_cq_construct_cq_entry(struct efa_ibv_cq* cq, - struct fi_cq_tagged_entry *entry, int opcode) +static void efa_cq_read_context_entry(struct efa_ibv_cq *ibv_cq, void *buf, int opcode) +{ + struct fi_cq_entry *entry = buf; + + entry->op_context = (void *)(uintptr_t)ibv_cq->ibv_cq_ex->wr_id; +} + +static inline +void efa_cq_read_entry_common(struct efa_ibv_cq *cq, struct fi_cq_msg_entry *entry, int opcode) { struct ibv_cq_ex *ibv_cqx = cq->ibv_cq_ex; @@ -45,12 +52,21 @@ static void efa_cq_construct_cq_entry(struct efa_ibv_cq* cq, entry->op_context = NULL; entry->flags = efa_cq_opcode_to_fi_flags(opcode); } - entry->len = efa_ibv_cq_wc_read_byte_len(cq); +} + +static void efa_cq_read_msg_entry(struct efa_ibv_cq *cq, void *buf, int opcode) +{ + efa_cq_read_entry_common(cq, (struct fi_cq_msg_entry *)buf, opcode); +} + +static void efa_cq_read_data_entry(struct efa_ibv_cq *cq, void *buf, int opcode) +{ + struct fi_cq_data_entry *entry = buf; + + efa_cq_read_entry_common(cq, (struct fi_cq_msg_entry *)buf, opcode); entry->buf = NULL; entry->data = 0; - entry->tag = 0; - if (efa_ibv_cq_wc_read_wc_flags(cq) & IBV_WC_WITH_IMM) { entry->flags |= FI_REMOTE_CQ_DATA; entry->data = efa_ibv_cq_wc_read_imm_data(cq); @@ -86,7 +102,8 @@ static void efa_cq_handle_error(struct efa_base_ep *base_ep, struct ibv_cq_ex *ibv_cq_ex = cq->ibv_cq_ex; memset(&err_entry, 0, sizeof(err_entry)); - efa_cq_construct_cq_entry(cq, (struct fi_cq_tagged_entry *) &err_entry, efa_ibv_cq_wc_read_opcode(cq)); + /* Use the most informative entry that efa-direct support to construct cq entry for general usage */ + efa_cq_read_data_entry(cq, &err_entry, efa_ibv_cq_wc_read_opcode(cq)); err_entry.err = err; err_entry.prov_errno = prov_errno; @@ -300,7 +317,8 @@ int efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq) break; } - efa_cq_construct_cq_entry(ibv_cq, &cq_entry, opcode); + /* Use the most informative entry that efa-direct support to construct cq entry for general usage */ + efa_cq_read_data_entry(ibv_cq, &cq_entry, opcode); EFA_DBG(FI_LOG_CQ, "Write cq entry of context: %lx, flags: %lx\n", (size_t) cq_entry.op_context, cq_entry.flags); @@ -569,45 +587,6 @@ int efa_cq_signal(struct fid_cq *cq_fid) return 0; } -static void efa_cq_read_context_entry(struct efa_ibv_cq *ibv_cq, void *buf, int opcode) -{ - struct fi_cq_entry *entry = buf; - - entry->op_context = (void *)(uintptr_t)ibv_cq->ibv_cq_ex->wr_id; -} - -static inline -void efa_cq_read_entry_common(struct efa_ibv_cq *cq, struct fi_cq_msg_entry *entry, int opcode) -{ - struct ibv_cq_ex *ibv_cqx = cq->ibv_cq_ex; - - if (!efa_cq_wc_is_unsolicited(cq) && ibv_cqx->wr_id) { - entry->op_context = (void *)ibv_cqx->wr_id; - entry->flags = (opcode == IBV_WC_RECV_RDMA_WITH_IMM) ? efa_cq_opcode_to_fi_flags(opcode): ((struct efa_context *) ibv_cqx->wr_id)->completion_flags; - } else { - entry->op_context = NULL; - entry->flags = efa_cq_opcode_to_fi_flags(opcode); - } - entry->len = efa_ibv_cq_wc_read_byte_len(cq); -} - -static void efa_cq_read_msg_entry(struct efa_ibv_cq *cq, void *buf, int opcode) -{ - efa_cq_read_entry_common(cq, (struct fi_cq_msg_entry *)buf, opcode); -} - -static void efa_cq_read_data_entry(struct efa_ibv_cq *cq, void *buf, int opcode) -{ - struct fi_cq_data_entry *entry = buf; - - efa_cq_read_entry_common(cq, (struct fi_cq_msg_entry *)buf, opcode); - entry->data = 0; - if (efa_ibv_cq_wc_read_wc_flags(cq) & IBV_WC_WITH_IMM) { - entry->flags |= FI_REMOTE_CQ_DATA; - entry->data = efa_ibv_cq_wc_read_imm_data(cq); - } -} - static inline fi_addr_t efa_cq_get_src_addr(struct efa_ibv_cq *ibv_cq, int opcode) { struct efa_cq *efa_cq; @@ -661,7 +640,8 @@ static inline void efa_cq_fill_err_entry(struct efa_ibv_cq *ibv_cq, struct fi_cq int prov_errno = efa_ibv_cq_wc_read_vendor_err(ibv_cq); fi_addr_t addr; - efa_cq_construct_cq_entry(ibv_cq, (struct fi_cq_tagged_entry *) buf, opcode); + /* Use the most informative entry that efa-direct support to construct cq entry for general usage */ + efa_cq_read_data_entry(ibv_cq, buf, opcode); buf->err = to_fi_errno(prov_errno); buf->prov_errno = prov_errno; diff --git a/prov/efa/src/rdm/efa_rdm_ep_fiops.c b/prov/efa/src/rdm/efa_rdm_ep_fiops.c index 7cc63bf804c..b6e31b3badd 100644 --- a/prov/efa/src/rdm/efa_rdm_ep_fiops.c +++ b/prov/efa/src/rdm/efa_rdm_ep_fiops.c @@ -238,8 +238,7 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep) sizeof(struct efa_rdm_ep_peer_map_entry), EFA_RDM_BUFPOOL_ALIGNMENT, 0, /* no limit to max_cnt */ EFA_RDM_EP_MIN_PEER_POOL_SIZE, - /* Don't track usage, because endpoint can be closed without removing entries from AV */ - OFI_BUFPOOL_NO_TRACK); + 0); if (ret) goto err_free; @@ -748,6 +747,25 @@ static void efa_rdm_ep_destroy_buffer_pools(struct efa_rdm_ep *efa_rdm_ep) efa_rdm_txe_release(txe); } + /* Clean up any remaining peers in the hashmap before destroying buffer pools */ + if (efa_rdm_ep->fi_addr_to_peer_map) { + struct efa_rdm_ep_peer_map_entry *map_entry, *map_tmp; + HASH_ITER(hndl, efa_rdm_ep->fi_addr_to_peer_map, map_entry, map_tmp) { + efa_rdm_peer_destruct(&map_entry->peer, efa_rdm_ep); + HASH_DELETE(hndl, efa_rdm_ep->fi_addr_to_peer_map, map_entry); + ofi_buf_free(map_entry); + } + } + + if (efa_rdm_ep->fi_addr_to_peer_map_implicit) { + struct efa_rdm_ep_peer_map_entry *map_entry, *map_tmp; + HASH_ITER(hndl, efa_rdm_ep->fi_addr_to_peer_map_implicit, map_entry, map_tmp) { + efa_rdm_peer_destruct(&map_entry->peer, efa_rdm_ep); + HASH_DELETE(hndl, efa_rdm_ep->fi_addr_to_peer_map_implicit, map_entry); + ofi_buf_free(map_entry); + } + } + if (efa_rdm_ep->ope_pool) ofi_bufpool_destroy(efa_rdm_ep->ope_pool);