Skip to content

Commit ef26c88

Browse files
committed
REVIEW: address feedback
TL/UCP: update ep flush with cb
1 parent 24d18d3 commit ef26c88

File tree

5 files changed

+34
-4
lines changed

5 files changed

+34
-4
lines changed

src/components/tl/ucp/alltoall/alltoall_onesided.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ void ucc_tl_ucp_alltoall_onesided_put_progress(ucc_coll_task_t *ctask)
147147
PTR_OFFSET(dest, grank * nelems), nelems,
148148
peer, src_memh, dst_memh, team, task),
149149
task, out);
150+
UCPCHECK_GOTO(ucc_tl_ucp_ep_flush(peer, team, task), task, out);
150151

151152
if (!alltoall_onesided_handle_completion(task, posted, completed,
152153
ntokens, npolls)) {
@@ -263,7 +264,7 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
263264
nelems = nelems / UCC_TL_TEAM_SIZE(tl_team);
264265
param.field_mask = UCP_EP_PERF_PARAM_FIELD_MESSAGE_SIZE;
265266
attr.field_mask = UCP_EP_PERF_ATTR_FIELD_ESTIMATED_TIME;
266-
param.message_size = (1 << 20);
267+
param.message_size = nelems * ucc_dt_size(TASK_ARGS(task).src.info.datatype);;
267268
ucc_tl_ucp_get_ep(
268269
tl_team, (UCC_TL_TEAM_RANK(tl_team) + 1) % UCC_TL_TEAM_SIZE(tl_team),
269270
&ep);

src/components/tl/ucp/tl_ucp_coll.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,8 @@ static inline ucc_status_t ucc_tl_ucp_test_ring(ucc_tl_ucp_task_t *task)
201201

202202
#define UCC_TL_UCP_TASK_ONESIDED_P2P_COMPLETE(_task) \
203203
(((_task)->onesided.put_posted == (_task)->onesided.put_completed) && \
204-
((_task)->onesided.get_posted == (_task)->onesided.get_completed))
204+
((_task)->onesided.get_posted == (_task)->onesided.get_completed) && \
205+
((_task)->flush_posted == (_task)->flush_completed))
205206

206207
#define UCC_TL_UCP_TASK_ONESIDED_SYNC_COMPLETE(_task, _end) \
207208
(*((long *)(TASK_ARGS(_task).global_work_buffer)) == _end)

src/components/tl/ucp/tl_ucp_sendrecv.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,19 @@ void ucc_tl_ucp_get_completion_cb(void *request, ucs_status_t status,
6969
ucp_request_free(request);
7070
}
7171

72+
void ucc_tl_ucp_flush_completion_cb(void *request, ucs_status_t status,
73+
void *user_data)
74+
{
75+
ucc_tl_ucp_task_t *task = (ucc_tl_ucp_task_t *)user_data;
76+
if (ucc_unlikely(UCS_OK != status)) {
77+
tl_error(UCC_TASK_LIB(task), "failure in ep flush completion %s",
78+
ucs_status_string(status));
79+
task->super.status = ucs_status_to_ucc_status(status);
80+
}
81+
task->flush_completed++;
82+
ucp_request_free(request);
83+
}
84+
7285
void ucc_tl_ucp_recv_completion_cb_mt(void *request, ucs_status_t status,
7386
const ucp_tag_recv_info_t *info, /* NOLINT */
7487
void *user_data)

src/components/tl/ucp/tl_ucp_sendrecv.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ void ucc_tl_ucp_put_completion_cb(void *request, ucs_status_t status,
6262
void ucc_tl_ucp_get_completion_cb(void *request, ucs_status_t status,
6363
void *user_data);
6464

65+
void ucc_tl_ucp_flush_completion_cb(void *request, ucs_status_t status,
66+
void *user_data);
67+
6568
void ucc_tl_ucp_recv_completion_cb_st(void *request, ucs_status_t status,
6669
const ucp_tag_recv_info_t *info,
6770
void *user_data);
@@ -531,7 +534,8 @@ static inline ucc_status_t ucc_tl_ucp_flush(ucc_tl_ucp_team_t *team)
531534
}
532535

533536
static inline ucc_status_t ucc_tl_ucp_ep_flush(ucc_rank_t dest_group_rank,
534-
ucc_tl_ucp_team_t *team)
537+
ucc_tl_ucp_team_t *team,
538+
ucc_tl_ucp_task_t *task)
535539
{
536540
ucp_request_param_t req_param = {0};
537541
ucc_status_t status;
@@ -543,12 +547,19 @@ static inline ucc_status_t ucc_tl_ucp_ep_flush(ucc_rank_t dest_group_rank,
543547
return status;
544548
}
545549

550+
req_param.op_attr_mask =
551+
UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA;
552+
req_param.cb.send = ucc_tl_ucp_flush_completion_cb;
553+
req_param.user_data = (void *)task;
554+
546555
req = ucp_ep_flush_nbx(ep, &req_param);
556+
task->flush_posted++;
547557
if (UCS_OK != req) {
548558
if (UCS_PTR_IS_ERR(req)) {
549559
return ucs_status_to_ucc_status(UCS_PTR_STATUS(req));
550560
}
551-
ucp_request_free(req);
561+
} else {
562+
task->flush_completed++;
552563
}
553564
return UCC_OK;
554565
}

src/components/tl/ucp/tl_ucp_task.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ typedef struct ucc_tl_ucp_task {
221221
} alltoall_onesided;
222222
char plugin_data[UCC_TL_UCP_TASK_PLUGIN_MAX_DATA];
223223
};
224+
uint32_t flush_posted;
225+
uint32_t flush_completed;
224226
} ucc_tl_ucp_task_t;
225227

226228
static inline void ucc_tl_ucp_task_reset(ucc_tl_ucp_task_t *task,
@@ -230,6 +232,8 @@ static inline void ucc_tl_ucp_task_reset(ucc_tl_ucp_task_t *task,
230232
task->tagged.send_completed = 0;
231233
task->tagged.recv_posted = 0;
232234
task->tagged.recv_completed = 0;
235+
task->flush_posted = 0;
236+
task->flush_completed = 0;
233237
task->super.status = status;
234238
}
235239

0 commit comments

Comments
 (0)