Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions fabtests/prov/efa/src/multi_ep_mt.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ int open_client(int idx)
{
int ret;
struct fi_av_attr av_attr = {0};
struct fid_av *av;
int av_idx = idx;

if (opts.av_name) {
av_attr.name = opts.av_name;
Expand All @@ -383,11 +383,9 @@ int open_client(int idx)

/* ft_enable_ep bind the ep with cq and av before enabling */
if (shared_av) {
av = avs[0];
av_idx = 0;
} else {
av = avs[idx];

ret = fi_av_open(domain, &av_attr, &av, NULL);
ret = fi_av_open(domain, &av_attr, &avs[av_idx], NULL);
if (ret) {
FT_PRINTERR("fi_av_open", ret);
return ret;
Expand All @@ -399,7 +397,7 @@ int open_client(int idx)
return ret;

/* Use the same remote addr we got from the persistent receiver ep */
ret = ft_av_insert(av, (void *)remote_raw_addr, 1, &remote_fiaddr[idx], 0, NULL);
ret = ft_av_insert(avs[av_idx], (void *)remote_raw_addr, 1, &remote_fiaddr[idx], 0, NULL);
if (ret)
return ret;

Expand Down
74 changes: 27 additions & 47 deletions prov/efa/src/efa_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,15 @@ static inline uint64_t efa_cq_opcode_to_fi_flags(enum ibv_wc_opcode opcode) {
}
}

static void efa_cq_construct_cq_entry(struct efa_ibv_cq* cq,
struct fi_cq_tagged_entry *entry, int opcode)
static void efa_cq_read_context_entry(struct efa_ibv_cq *ibv_cq, void *buf, int opcode)
{
struct fi_cq_entry *entry = buf;

entry->op_context = (void *)(uintptr_t)ibv_cq->ibv_cq_ex->wr_id;
}

static inline
void efa_cq_read_entry_common(struct efa_ibv_cq *cq, struct fi_cq_msg_entry *entry, int opcode)
{
struct ibv_cq_ex *ibv_cqx = cq->ibv_cq_ex;

Expand All @@ -45,12 +52,21 @@ static void efa_cq_construct_cq_entry(struct efa_ibv_cq* cq,
entry->op_context = NULL;
entry->flags = efa_cq_opcode_to_fi_flags(opcode);
}

entry->len = efa_ibv_cq_wc_read_byte_len(cq);
}

static void efa_cq_read_msg_entry(struct efa_ibv_cq *cq, void *buf, int opcode)
{
efa_cq_read_entry_common(cq, (struct fi_cq_msg_entry *)buf, opcode);
}

static void efa_cq_read_data_entry(struct efa_ibv_cq *cq, void *buf, int opcode)
{
struct fi_cq_data_entry *entry = buf;

efa_cq_read_entry_common(cq, (struct fi_cq_msg_entry *)buf, opcode);
entry->buf = NULL;
entry->data = 0;
entry->tag = 0;

if (efa_ibv_cq_wc_read_wc_flags(cq) & IBV_WC_WITH_IMM) {
entry->flags |= FI_REMOTE_CQ_DATA;
entry->data = efa_ibv_cq_wc_read_imm_data(cq);
Expand Down Expand Up @@ -86,7 +102,8 @@ static void efa_cq_handle_error(struct efa_base_ep *base_ep,
struct ibv_cq_ex *ibv_cq_ex = cq->ibv_cq_ex;

memset(&err_entry, 0, sizeof(err_entry));
efa_cq_construct_cq_entry(cq, (struct fi_cq_tagged_entry *) &err_entry, efa_ibv_cq_wc_read_opcode(cq));
/* Use the most informative entry that efa-direct support to construct cq entry for general usage */
efa_cq_read_data_entry(cq, &err_entry, efa_ibv_cq_wc_read_opcode(cq));
err_entry.err = err;
err_entry.prov_errno = prov_errno;

Expand Down Expand Up @@ -300,7 +317,8 @@ int efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
break;
}

efa_cq_construct_cq_entry(ibv_cq, &cq_entry, opcode);
/* Use the most informative entry that efa-direct support to construct cq entry for general usage */
efa_cq_read_data_entry(ibv_cq, &cq_entry, opcode);
EFA_DBG(FI_LOG_CQ,
"Write cq entry of context: %lx, flags: %lx\n",
(size_t) cq_entry.op_context, cq_entry.flags);
Expand Down Expand Up @@ -569,45 +587,6 @@ int efa_cq_signal(struct fid_cq *cq_fid)
return 0;
}

static void efa_cq_read_context_entry(struct efa_ibv_cq *ibv_cq, void *buf, int opcode)
{
struct fi_cq_entry *entry = buf;

entry->op_context = (void *)(uintptr_t)ibv_cq->ibv_cq_ex->wr_id;
}

static inline
void efa_cq_read_entry_common(struct efa_ibv_cq *cq, struct fi_cq_msg_entry *entry, int opcode)
{
struct ibv_cq_ex *ibv_cqx = cq->ibv_cq_ex;

if (!efa_cq_wc_is_unsolicited(cq) && ibv_cqx->wr_id) {
entry->op_context = (void *)ibv_cqx->wr_id;
entry->flags = (opcode == IBV_WC_RECV_RDMA_WITH_IMM) ? efa_cq_opcode_to_fi_flags(opcode): ((struct efa_context *) ibv_cqx->wr_id)->completion_flags;
} else {
entry->op_context = NULL;
entry->flags = efa_cq_opcode_to_fi_flags(opcode);
}
entry->len = efa_ibv_cq_wc_read_byte_len(cq);
}

static void efa_cq_read_msg_entry(struct efa_ibv_cq *cq, void *buf, int opcode)
{
efa_cq_read_entry_common(cq, (struct fi_cq_msg_entry *)buf, opcode);
}

static void efa_cq_read_data_entry(struct efa_ibv_cq *cq, void *buf, int opcode)
{
struct fi_cq_data_entry *entry = buf;

efa_cq_read_entry_common(cq, (struct fi_cq_msg_entry *)buf, opcode);
entry->data = 0;
if (efa_ibv_cq_wc_read_wc_flags(cq) & IBV_WC_WITH_IMM) {
entry->flags |= FI_REMOTE_CQ_DATA;
entry->data = efa_ibv_cq_wc_read_imm_data(cq);
}
}

static inline fi_addr_t efa_cq_get_src_addr(struct efa_ibv_cq *ibv_cq, int opcode)
{
struct efa_cq *efa_cq;
Expand Down Expand Up @@ -661,7 +640,8 @@ static inline void efa_cq_fill_err_entry(struct efa_ibv_cq *ibv_cq, struct fi_cq
int prov_errno = efa_ibv_cq_wc_read_vendor_err(ibv_cq);
fi_addr_t addr;

efa_cq_construct_cq_entry(ibv_cq, (struct fi_cq_tagged_entry *) buf, opcode);
/* Use the most informative entry that efa-direct support to construct cq entry for general usage */
efa_cq_read_data_entry(ibv_cq, buf, opcode);
buf->err = to_fi_errno(prov_errno);
buf->prov_errno = prov_errno;

Expand Down
22 changes: 20 additions & 2 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep)
sizeof(struct efa_rdm_ep_peer_map_entry),
EFA_RDM_BUFPOOL_ALIGNMENT, 0, /* no limit to max_cnt */
EFA_RDM_EP_MIN_PEER_POOL_SIZE,
/* Don't track usage, because endpoint can be closed without removing entries from AV */
OFI_BUFPOOL_NO_TRACK);
0);
if (ret)
goto err_free;

Expand Down Expand Up @@ -748,6 +747,25 @@ static void efa_rdm_ep_destroy_buffer_pools(struct efa_rdm_ep *efa_rdm_ep)
efa_rdm_txe_release(txe);
}

/* Clean up any remaining peers in the hashmap before destroying buffer pools */
if (efa_rdm_ep->fi_addr_to_peer_map) {
struct efa_rdm_ep_peer_map_entry *map_entry, *map_tmp;
HASH_ITER(hndl, efa_rdm_ep->fi_addr_to_peer_map, map_entry, map_tmp) {
efa_rdm_peer_destruct(&map_entry->peer, efa_rdm_ep);
HASH_DELETE(hndl, efa_rdm_ep->fi_addr_to_peer_map, map_entry);
ofi_buf_free(map_entry);
}
}

if (efa_rdm_ep->fi_addr_to_peer_map_implicit) {
struct efa_rdm_ep_peer_map_entry *map_entry, *map_tmp;
HASH_ITER(hndl, efa_rdm_ep->fi_addr_to_peer_map_implicit, map_entry, map_tmp) {
efa_rdm_peer_destruct(&map_entry->peer, efa_rdm_ep);
HASH_DELETE(hndl, efa_rdm_ep->fi_addr_to_peer_map_implicit, map_entry);
ofi_buf_free(map_entry);
}
}

if (efa_rdm_ep->ope_pool)
ofi_bufpool_destroy(efa_rdm_ep->ope_pool);

Expand Down