diff --git a/fabtests/common/shared.c b/fabtests/common/shared.c index e191a98c55d..316b39d41d7 100644 --- a/fabtests/common/shared.c +++ b/fabtests/common/shared.c @@ -1585,6 +1585,18 @@ int ft_init_av(void) int ft_exchange_addresses_oob(struct fid_av *av_ptr, struct fid_ep *ep_ptr, fi_addr_t *remote_addr) +{ + int ret; + + ret = ft_send_addr_oob(ep_ptr); + if (ret) + return ret; + + return ft_recv_addr_oob(av_ptr, remote_addr); +} + +/* Only send local address out-of-band */ +int ft_send_addr_oob(struct fid_ep *ep_ptr) { char buf[FT_MAX_CTRL_MSG]; int ret; @@ -1596,19 +1608,20 @@ int ft_exchange_addresses_oob(struct fid_av *av_ptr, struct fid_ep *ep_ptr, return ret; } - ret = ft_sock_send(oob_sock, buf, FT_MAX_CTRL_MSG); - if (ret) - return ret; + return ft_sock_send(oob_sock, buf, FT_MAX_CTRL_MSG); +} - ret = ft_sock_recv(oob_sock, buf, FT_MAX_CTRL_MSG); - if (ret) - return ret; +/* Receive and insert peer address out-of-band */ +int ft_recv_addr_oob(struct fid_av *av_ptr, fi_addr_t *remote_addr) +{ + char buf[FT_MAX_CTRL_MSG]; + int ret; - ret = ft_av_insert(av_ptr, buf, 1, remote_addr, 0, NULL); + ret = ft_sock_recv(oob_sock, buf, FT_MAX_CTRL_MSG); if (ret) return ret; - return 0; + return ft_av_insert(av_ptr, buf, 1, remote_addr, 0, NULL); } /* TODO: retry send for unreliable endpoints */ diff --git a/fabtests/include/shared.h b/fabtests/include/shared.h index 721a01d0ae4..0729013b3b0 100644 --- a/fabtests/include/shared.h +++ b/fabtests/include/shared.h @@ -475,6 +475,8 @@ int ft_av_insert(struct fid_av *av, void *addr, size_t count, fi_addr_t *fi_addr uint64_t flags, void *context); int ft_init_av(void); int ft_join_mc(void); +int ft_send_addr_oob(struct fid_ep *ep_ptr); +int ft_recv_addr_oob(struct fid_av *av_ptr, fi_addr_t *remote_addr); int ft_init_av_dst_addr(struct fid_av *av_ptr, struct fid_ep *ep_ptr, fi_addr_t *remote_addr); int ft_init_av_addr(struct fid_av *av, struct fid_ep *ep, diff --git a/fabtests/prov/efa/Makefile.include b/fabtests/prov/efa/Makefile.include index 7f54d1107ed..4eeb64e1ce0 100644 --- a/fabtests/prov/efa/Makefile.include +++ b/fabtests/prov/efa/Makefile.include @@ -35,7 +35,8 @@ bin_PROGRAMS += prov/efa/src/fi_efa_rnr_read_cq_error \ prov/efa/src/fi_efa_rnr_queue_resend \ prov/efa/src/fi_efa_info_test \ prov/efa/src/fi_efa_rdm_remote_exit_early \ - prov/efa/src/fi_efa_multi_ep_mt + prov/efa/src/fi_efa_multi_ep_mt \ + prov/efa/src/fi_efa_implicit_av_test if HAVE_VERBS_DEVEL if HAVE_EFA_DV @@ -78,6 +79,11 @@ prov_efa_src_fi_efa_multi_ep_mt_SOURCES = \ $(efa_rnr_srcs) prov_efa_src_fi_efa_multi_ep_mt_LDADD = libfabtests.la +prov_efa_src_fi_efa_implicit_av_test_SOURCES = \ + prov/efa/src/efa_shared.h \ + prov/efa/src/efa_implicit_av_test.c +prov_efa_src_fi_efa_implicit_av_test_LDADD = libfabtests.la + if HAVE_VERBS_DEVEL if HAVE_EFA_DV efa_exhaust_mr_reg_srcs = \ diff --git a/fabtests/prov/efa/src/efa_implicit_av_test.c b/fabtests/prov/efa/src/efa_implicit_av_test.c new file mode 100644 index 00000000000..fb863e3bebc --- /dev/null +++ b/fabtests/prov/efa/src/efa_implicit_av_test.c @@ -0,0 +1,490 @@ +/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */ +/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All + * rights reserved. */ + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "shared.h" +#include "hmem.h" + +static struct fid_ep **server_eps; +static char **send_bufs, **recv_bufs; +static struct fid_mr **send_mrs, **recv_mrs; +static void **send_descs, **recv_descs; +static struct fi_context2 *recv_ctx, *send_ctx; +static struct fid_cq **server_txcqs, **server_rxcqs; +static struct fid_av **server_avs; +static fi_addr_t *remote_fiaddr; +static int num_server_eps = 3; +static bool directed_recv = false; +static bool unexpected_path = false; +static bool implicit_av = false; + +int get_one_comp(struct fid_cq *cq) +{ + struct fi_cq_err_entry comp; + struct fi_cq_err_entry cq_err; + + memset(&cq_err, 0, sizeof(cq_err)); + int ret; + + do { + ret = fi_cq_read(cq, &comp, 1); + if (ret > 0) + break; + + if (ret < 0) { + if (ret != -FI_EAGAIN) { + printf("fi_cq_read returns error %d\n", ret); + (void) fi_cq_readerr(cq, &cq_err, 0); + return ret; + } + } + } while (1); + + return FI_SUCCESS; +} + +static void free_res(void) +{ + int i; + + for (i = 0; i < num_server_eps; i++) { + FT_CLOSE_FID(send_mrs[i]); + FT_CLOSE_FID(recv_mrs[i]); + + if (send_bufs[i]) + (void) ft_hmem_free(opts.iface, (void *) send_bufs[i]); + if (recv_bufs[i]) + (void) ft_hmem_free(opts.iface, (void *) recv_bufs[i]); + } + + free(send_bufs); + free(recv_bufs); + free(send_mrs); + free(recv_mrs); + free(send_descs); + free(recv_descs); + free(send_ctx); + free(recv_ctx); + free(remote_fiaddr); +} + +static void free_server_res(void) +{ + int i; + + free_res(); + + for (i = 0; i < num_server_eps; i++) { + FT_CLOSE_FID(server_eps[i]); + FT_CLOSE_FID(server_txcqs[i]); + FT_CLOSE_FID(server_rxcqs[i]); + FT_CLOSE_FID(server_avs[i]); + } + + free(server_txcqs); + free(server_rxcqs); + free(server_eps); + free(server_avs); +} + +static int alloc_bufs(void) +{ + int i, ret; + size_t alloc_size; + + remote_fiaddr = calloc(num_server_eps, sizeof(*remote_fiaddr)); + send_ctx = calloc(num_server_eps, sizeof(*send_ctx)); + recv_ctx = calloc(num_server_eps, sizeof(*recv_ctx)); + + send_bufs = calloc(num_server_eps, sizeof(*send_bufs)); + recv_bufs = calloc(num_server_eps, sizeof(*recv_bufs)); + + if (!send_bufs || !recv_bufs || !remote_fiaddr || !send_ctx || + !recv_ctx) + return -FI_ENOMEM; + + alloc_size = opts.transfer_size < FT_MAX_CTRL_MSG ? FT_MAX_CTRL_MSG : opts.transfer_size; + for (i = 0; i < num_server_eps; i++) { + ret = ft_hmem_alloc(opts.iface, opts.device, + (void **) &send_bufs[i], + alloc_size); + if (ret) + return ret; + + ret = ft_hmem_alloc(opts.iface, opts.device, + (void **) &recv_bufs[i], + alloc_size); + if (ret) + return ret; + + if (ft_check_opts(FT_OPT_VERIFY_DATA)) { + ret = ft_fill_buf(send_bufs[i], opts.transfer_size); + if (ret) + return ret; + } + } + + return 0; +} + +static int alloc_server_res(void) +{ + int ret; + + server_eps = calloc(num_server_eps, sizeof(*server_eps)); + server_txcqs = calloc(num_server_eps, sizeof(*server_txcqs)); + server_rxcqs = calloc(num_server_eps, sizeof(*server_rxcqs)); + server_avs = calloc(num_server_eps, sizeof(*server_avs)); + + if (!server_eps || !server_txcqs || !server_rxcqs || !server_avs) + return -FI_ENOMEM; + + ret = alloc_bufs(); + if (ret) + return ret; + + return 0; +} + +static int setup_server_ep(int idx) +{ + int ret; + + ret = fi_endpoint(domain, fi, &server_eps[idx], NULL); + if (ret) { + FT_PRINTERR("fi_endpoint", ret); + return ret; + } + + ret = ft_alloc_ep_res(fi, &server_txcqs[idx], &server_rxcqs[idx], + NULL, NULL, NULL, &server_avs[idx]); + if (ret) + return ret; + + ret = ft_enable_ep(server_eps[idx], eq, server_avs[idx], + server_txcqs[idx], server_rxcqs[idx], + NULL, NULL, NULL); + if (ret) + return ret; + + return 0; +} + +static int reg_mrs(void) +{ + int i, ret; + + send_mrs = calloc(num_server_eps, sizeof(*send_mrs)); + recv_mrs = calloc(num_server_eps, sizeof(*recv_mrs)); + send_descs = calloc(num_server_eps, sizeof(*send_descs)); + recv_descs = calloc(num_server_eps, sizeof(*recv_descs)); + + if (!send_mrs || !recv_mrs || !send_descs || + !recv_descs) + return -FI_ENOMEM; + + for (i = 0; i < num_server_eps; i++) { + ret = ft_reg_mr(fi, send_bufs[i], opts.transfer_size, + ft_info_to_mr_access(fi), + (FT_MR_KEY + 1) * (i + 1), opts.iface, + opts.device, &send_mrs[i], + &send_descs[i]); + if (ret) + return ret; + + ret = ft_reg_mr(fi, recv_bufs[i], opts.transfer_size, + ft_info_to_mr_access(fi), + (FT_MR_KEY + 2) * (i + 2), opts.iface, + opts.device, &recv_mrs[i], + &recv_descs[i]); + if (ret) + return ret; + } + + return 0; +} + +static int server_post_send(int idx) +{ + return ft_post_tx_buf(server_eps[idx], remote_fiaddr[idx], + opts.transfer_size, NO_CQ_DATA, &send_ctx[idx], + send_bufs[idx], send_descs[idx], ft_tag); +} + +static int client_post_recv(void) +{ + int i, ret = 0; + fi_addr_t addr; + + for (i = 0; i < num_server_eps; i++) { + if (directed_recv) + addr = remote_fiaddr[i]; + else + addr = FI_ADDR_UNSPEC; + + ret = ft_post_rx_buf(ep, addr, opts.transfer_size, + &recv_ctx[i], recv_bufs[i], recv_descs[i], + ft_tag); + if (ret) { + FT_PRINTERR("client_post_recv_directed", ret); + return ret; + } + } + + return ret; +} + +static int run_test(void) +{ + int i, ret; + + if (opts.dst_addr) { + /* Client side - single endpoint */ + FT_INFO("Client: Step 1 - Post receive buffers\n"); + + ret = alloc_bufs(); + if (ret) + goto cleanup_client; + + ret = reg_mrs(); + if (ret) + goto cleanup_client; + + if (!unexpected_path) { + ret = client_post_recv(); + if (ret) + goto cleanup_client; + } + + FT_INFO("Client: Initial sync\n"); + ft_sync_oob(); + + if (implicit_av) { + FT_INFO("Implicit AV. Only server inserts client's address\n"); + /* Send client endpoint address to server */ + for (i = 0; i < num_server_eps; i++) { + ret = ft_send_addr_oob(ep); + if (ret) { + FT_PRINTERR("ft_server_insert_addr_oob", ret); + goto cleanup_client; + } + } + } else { + FT_INFO("Not using implicit AV. Full address exchange \n"); + for (i = 0; i < num_server_eps; i++) { + ft_init_av_dst_addr(av, ep, &remote_fiaddr[i]); + } + } + + /* TODO: poll CQ while waiting for the OOB sync from the sender. + * Doing so will allow the test to run with large message sizes + * that use emulated RMA protocols and with delivery complete */ + FT_INFO("Client: Sync after send complete\n"); + ft_sync_oob(); + + if (unexpected_path) { + ret = client_post_recv(); + if (ret) + goto cleanup_client; + } + + FT_INFO("Client: Waiting for messages from %d server endpoints\n", num_server_eps); + /* Wait for all receive completions */ + for (i = 0; i < num_server_eps; i++) { + ret = get_one_comp(rxcq); + if (ret) { + FT_PRINTERR("get_client_comp", ret); + goto cleanup_client; + } + } + + if (ft_check_opts(FT_OPT_VERIFY_DATA)) { + for (i = 0; i < num_server_eps; i++) { + ret = ft_check_buf(recv_bufs[i], + opts.transfer_size); + if (ret) { + FT_PRINTERR("ft_check_buf", ret); + goto cleanup_client; + } + } + } + + FT_INFO("Client: Final sync before end of test\n"); + ft_sync_oob(); + +cleanup_client: + free_res(); + + } else { + /* Server side - multiple endpoints */ + FT_INFO("Server: Creating %d endpoints\n", num_server_eps); + + /* Create server endpoints */ + ret = alloc_server_res(); + if (ret) + return ret; + + for (i = 0; i < num_server_eps; i++) { + ret = setup_server_ep(i); + if (ret) + goto cleanup_server; + } + + ret = reg_mrs(); + if (ret) + goto cleanup_server; + + FT_INFO("Server: Initial sync\n"); + ft_sync_oob(); + + if (implicit_av) { + /* Initialize AV for each endpoint OOB */ + FT_INFO("Implicit AV. Only sender inserts receiver's address\n"); + for (i = 0; i < num_server_eps; i++) { + ret = ft_recv_addr_oob(server_avs[i], &remote_fiaddr[i]); + if (ret) { + FT_PRINTERR("ft_server_insert_addr_oob", ret); + goto cleanup_server; + } + } + } else { + FT_INFO("Not using implicit AV. Full address exchange \n"); + for (i = 0; i < num_server_eps; i++) { + ft_init_av_dst_addr(server_avs[i], server_eps[i], &remote_fiaddr[i]); + FT_INFO("fi_addr %ld\n", remote_fiaddr[i]); + } + } + + FT_INFO("Server: Step 1 - Send messages from all endpoints\n"); + /* Send from all endpoints */ + for (i = 0; i < num_server_eps; i++) { + ret = server_post_send(i); + if (ret) { + FT_PRINTERR("server_post_send", ret); + goto cleanup_server; + } + } + + /* Wait for all send completions */ + for (i = 0; i < num_server_eps; i++) { + ret = get_one_comp(server_txcqs[i]); + if (ret) { + FT_PRINTERR("get_server_comp", ret); + goto cleanup_server; + } + } + + FT_INFO("Server: Sync after send complete\n"); + ft_sync_oob(); + + FT_INFO("Server: Final sync before end of test\n"); + ft_sync_oob(); + +cleanup_server: + free_server_res(); + } + + if (ret == FI_SUCCESS) + FT_INFO("Test completed successfully\n"); + return ret; +} + +int main(int argc, char **argv) +{ + int i, op, ret; + + opts = INIT_OPTS; + opts.transfer_size = 64; + opts.options |= FT_OPT_OOB_ADDR_EXCH; + + hints = fi_allocinfo(); + if (!hints) + return EXIT_FAILURE; + + while ((op = getopt(argc, argv, "c:XLrvUh" ADDR_OPTS INFO_OPTS CS_OPTS)) != -1) { + switch (op) { + default: + ft_parse_addr_opts(op, optarg, &opts); + ft_parseinfo(op, optarg, hints, &opts); + ft_parsecsopts(op, optarg, &opts); + break; + case 'c': + num_server_eps = atoi(optarg); + break; + case 'X': + unexpected_path = true; + break; + case 'L': + implicit_av = true; + break; + case 'r': + directed_recv = true; + break; + case 'v': + opts.options |= FT_OPT_VERIFY_DATA; + break; + case 'U': + FT_ERR("Test does not support delivery complete\n"); + return -FI_EINVAL; + case '?': + case 'h': + ft_usage(argv[0], "AV message order test"); + FT_PRINT_OPTS_USAGE("-c ", + "number of server endpoints (default 3)"); + return EXIT_FAILURE; + } + } + + if (directed_recv && implicit_av) { + FT_ERR("Directed receive cannot be used with implicit AV\n"); + return -FI_EINVAL; + } + + /* Exchange addresses OOB and avoid posting initial receive to avoid + * conflict with receives posted in the test */ + opts.options |= FT_OPT_OOB_ADDR_EXCH | FT_OPT_SKIP_MSG_ALLOC; + + if (optind < argc) + opts.dst_addr = argv[optind]; + + hints->caps = FI_MSG; + hints->mode = FI_CONTEXT | FI_CONTEXT2; + hints->ep_attr->type = FI_EP_RDM; + hints->domain_attr->mr_mode = opts.mr_mode; + hints->addr_format = opts.address_format; + + ret = ft_init_fabric(); + if (ret) + return ret; + + if (!(opts.options & FT_OPT_SIZE)) { + for (i = 0; i < TEST_CNT; i++) { + if (!ft_use_size(i, opts.sizes_enabled)) + continue; + opts.transfer_size = test_size[i].size; + FT_INFO("Running test for message size: %ld\n", opts.transfer_size); + ret = run_test(); + if (ret) + return ret; + } + } else { + FT_INFO("Running test for message size: %ld\n", + opts.transfer_size); + ret = run_test(); + if (ret) + return ret; + } + + ft_free_res(); + return ft_exit_code(ret); +} diff --git a/fabtests/pytest/efa/test_rdm.py b/fabtests/pytest/efa/test_rdm.py index bbe664e3e34..0039d23367a 100644 --- a/fabtests/pytest/efa/test_rdm.py +++ b/fabtests/pytest/efa/test_rdm.py @@ -29,53 +29,6 @@ def test_rdm_pingpong(cmdline_args, iteration_type, completion_semantic, direct_message_size if fabric == "efa-direct" else "all", completion_type=completion_type, fabric=fabric) -# These two tests skip efa-direct because efa-direct does not -# do memory registrations on behalf of the application -@pytest.mark.functional -@pytest.mark.serial -def test_mr_exhaustion_rdm_pingpong(cmdline_args, completion_semantic): - import os - binpath = cmdline_args.binpath or "" - if not os.path.exists(os.path.join(binpath, "fi_efa_exhaust_mr_reg_rdm_pingpong")): - pytest.skip("fi_efa_exhaust_mr_reg_rdm_pingpong requires efadv") - efa_run_client_server_test(cmdline_args, "fi_efa_exhaust_mr_reg_rdm_pingpong", "short", - completion_semantic, "host_to_host", "all", timeout=1000, - fabric="efa") - -@pytest.mark.parametrize("mr_cache", [True, False]) -@pytest.mark.parametrize("iteration_type", - [pytest.param("short", marks=pytest.mark.short), - pytest.param("standard", marks=pytest.mark.standard)]) -def test_rdm_pingpong_no_mr_local(cmdline_args, iteration_type, completion_semantic, - memory_type_bi_dir, completion_type, mr_cache): - command = "fi_rdm_pingpong -M mr_local" + " " + perf_progress_model_cli - - additional_env = '' - if not mr_cache: - additional_env = 'FI_EFA_MR_CACHE_ENABLE=0' - - efa_run_client_server_test(cmdline_args, command, iteration_type, - completion_semantic, memory_type_bi_dir, "all", - completion_type=completion_type, fabric="efa", - additional_env=additional_env) - -@pytest.mark.parametrize("mr_cache", [True, False]) -@pytest.mark.parametrize("iteration_type", - [pytest.param("short", marks=pytest.mark.short), - pytest.param("standard", marks=pytest.mark.standard)]) -def test_rma_pingpong_no_mr_local(cmdline_args, iteration_type, completion_semantic, - memory_type_bi_dir, mr_cache): - command = "fi_rma_pingpong -o writedata -M mr_local" + " " + perf_progress_model_cli - - additional_env = '' - if not mr_cache: - additional_env = 'FI_EFA_MR_CACHE_ENABLE=0' - - efa_run_client_server_test(cmdline_args, command, iteration_type, - completion_semantic, memory_type_bi_dir, "all", - completion_type="queue", fabric="efa", - additional_env=additional_env) - @pytest.mark.functional def test_rdm_pingpong_range(cmdline_args, completion_semantic, memory_type_bi_dir, message_size, direct_message_size, fabric): efa_run_client_server_test(cmdline_args, "fi_rdm_pingpong", "short", @@ -215,3 +168,121 @@ def test_rdm_pingpong_sread(cmdline_args, completion_semantic, memory_type_bi_di efa_run_client_server_test(cmdline_args, f"fi_rdm_pingpong -c {comp_method}", "short", completion_semantic, memory_type_bi_dir, direct_message_size, fabric="efa-direct") + + +# These tests skip efa-direct because efa-direct does not +# do memory registrations on behalf of the application +@pytest.mark.functional +@pytest.mark.serial +def test_mr_exhaustion_rdm_pingpong(cmdline_args, completion_semantic): + import os + binpath = cmdline_args.binpath or "" + if not os.path.exists(os.path.join(binpath, "fi_efa_exhaust_mr_reg_rdm_pingpong")): + pytest.skip("fi_efa_exhaust_mr_reg_rdm_pingpong requires efadv") + efa_run_client_server_test(cmdline_args, "fi_efa_exhaust_mr_reg_rdm_pingpong", "short", + completion_semantic, "host_to_host", "all", timeout=1000, + fabric="efa") + +@pytest.mark.parametrize("mr_cache", [True, False]) +@pytest.mark.parametrize("iteration_type", + [pytest.param("short", marks=pytest.mark.short), + pytest.param("standard", marks=pytest.mark.standard)]) +def test_rdm_pingpong_no_mr_local(cmdline_args, iteration_type, completion_semantic, + memory_type_bi_dir, completion_type, mr_cache): + command = "fi_rdm_pingpong -M mr_local" + " " + perf_progress_model_cli + + additional_env = '' + if not mr_cache: + additional_env = 'FI_EFA_MR_CACHE_ENABLE=0' + + efa_run_client_server_test(cmdline_args, command, iteration_type, + completion_semantic, memory_type_bi_dir, "all", + completion_type=completion_type, fabric="efa", + additional_env=additional_env) + +@pytest.mark.parametrize("mr_cache", [True, False]) +@pytest.mark.parametrize("iteration_type", + [pytest.param("short", marks=pytest.mark.short), + pytest.param("standard", marks=pytest.mark.standard)]) +def test_rma_pingpong_no_mr_local(cmdline_args, iteration_type, completion_semantic, + memory_type_bi_dir, mr_cache): + command = "fi_rma_pingpong -o writedata -M mr_local" + " " + perf_progress_model_cli + + additional_env = '' + if not mr_cache: + additional_env = 'FI_EFA_MR_CACHE_ENABLE=0' + + efa_run_client_server_test(cmdline_args, command, iteration_type, + completion_semantic, memory_type_bi_dir, "all", + completion_type="queue", fabric="efa", + additional_env=additional_env) + + +@pytest.mark.functional +@pytest.mark.serial +@pytest.mark.parametrize("unexpected_path", [True, False]) +# TODO: Add test with (1) larger message size that uses the long read protocol +# and (2) SHM provider with CMA protocol after fixing the test to poll CQ while +# waiting for the server to get send completions +@pytest.mark.parametrize("msg_size", [1, 1024, 8192, 24756]) +# @pytest.mark.parametrize("msg_count", [1, 1024, 2048]) # below and above shm's default rx size +def test_implicit_av(cmdline_args, unexpected_path, msg_size): + import os + binpath = cmdline_args.binpath or "" + if not os.path.exists(os.path.join(binpath, "fi_efa_implicit_av_test")): + pytest.skip("implicit AV test not found") + + if (cmdline_args.server_id == cmdline_args.client_id) and unexpected_path and msg_size > 1024: + pytest.skip("SHM provider will use CMA protocol needs test modifications") + + test_cmd = f"fi_efa_implicit_av_test -L -c 5 -S {msg_size}" + if unexpected_path: + test_cmd += " -X" + + efa_run_client_server_test(cmdline_args, test_cmd, "short", + "transmit_complete", "host_to_host", "all", fabric="efa") + +@pytest.mark.functional +@pytest.mark.serial +# TODO: Add test with larger message size that uses the long read protocol after +# fixing the test to poll CQ while waiting for the server to get send completions +@pytest.mark.parametrize("msg_size", [1, 1024, 8192, 24756]) +def test_implicit_av_limited_av_size_expected_path(cmdline_args, msg_size): + import os + binpath = cmdline_args.binpath or "" + if not os.path.exists(os.path.join(binpath, "fi_efa_implicit_av_test")): + pytest.skip("implicit AV test not found") + + additional_env = 'FI_EFA_IMPLICIT_AV_SIZE=3' + + test_cmd = f"fi_efa_implicit_av_test -L -c 5 -S {msg_size}" + + efa_run_client_server_test(cmdline_args, test_cmd, "short", + "transmit_complete", "host_to_host", "all", fabric="efa", + additional_env=additional_env, timeout=30) + +@pytest.mark.functional +@pytest.mark.serial +# This test is expected to fail because the implicit AV size of 3 is smaller than +# the number of server endpoints of 5. So implicit AV entries will get evicted +# on the client and cause the test to hang. +@pytest.mark.xfail(strict=True) +# TODO: Add test with larger message size that uses the long read protocol after +# fixing the test to poll CQ while waiting for the server to get send completions +@pytest.mark.parametrize("msg_size", [1, 1024, 8192, 24756]) +def test_implicit_av_limited_av_size_unexpected_path(cmdline_args, msg_size, node_count): + import os + binpath = cmdline_args.binpath or "" + if not os.path.exists(os.path.join(binpath, "fi_efa_implicit_av_test")): + pytest.skip("implicit AV test not found") + + if cmdline_args.server_id == cmdline_args.client_id: + pytest.skip("This test will only fail for the EFA protocol path - which requires 2 nodes.") + + additional_env = 'FI_EFA_IMPLICIT_AV_SIZE=3' + + test_cmd = f"fi_efa_implicit_av_test -L -X -c 5 -S {msg_size}" + + efa_run_client_server_test(cmdline_args, test_cmd, "short", + "transmit_complete", "host_to_host", "all", fabric="efa", + additional_env=additional_env, timeout=15) diff --git a/include/ofi_util.h b/include/ofi_util.h index 11f990524fc..a473108af5b 100644 --- a/include/ofi_util.h +++ b/include/ofi_util.h @@ -1287,6 +1287,20 @@ struct ofi_ops_flow_ctrl { ssize_t (*send_handler)(struct fid_ep *ep, uint64_t credits)); }; +enum util_rx_entry_status { + /* rx entries with status RX_ENTRY_POSTED are associated with + application posted fi_recv calls that have not yet been matched to + packets from peer endpoints */ + RX_ENTRY_POSTED = 0, + /* rx entries with status RX_ENTRY_UNEXP are associated with + packets from peer endpoints that have not yet been matched to + application posted fi_recv */ + RX_ENTRY_UNEXP, + /* rx entries with status RX_ENTRY_MATCHED are associated with matched + pairs of packets and application posted receives */ + RX_ENTRY_MATCHED +}; + struct util_rx_entry { union { struct dlist_entry d_entry; @@ -1296,6 +1310,7 @@ struct util_rx_entry { uint64_t seq_no; uint64_t ignore; int multi_recv_ref; + enum util_rx_entry_status status; /* extra memory allocated at the end of each entry to hold iovecs and * MR descriptors. The amount of memory is determined by the provider's * iov limit. diff --git a/prov/efa/src/efa_av.c b/prov/efa/src/efa_av.c index eafcfeed462..663118a206f 100644 --- a/prov/efa/src/efa_av.c +++ b/prov/efa/src/efa_av.c @@ -573,11 +573,11 @@ static inline int efa_av_implicit_av_lru_insert(struct efa_av *av, dlist_pop_front(&av->implicit_av_lru_list, struct efa_conn, conn_to_release, implicit_av_lru_entry); EFA_INFO(FI_LOG_AV, - "Evicting AV entry for peer AHN %" PRIu16 " QPN %" PRIu16 - " QKEY %" PRIu32 " from " + "Evicting AV entry for peer implicit fi_addr %" PRIu64 + " AHN %" PRIu16 " QPN %" PRIu16 " QKEY %" PRIu32 " from " "implicit AV\n", - conn_to_release->ah->ahn, conn_to_release->ep_addr->qpn, - conn_to_release->ep_addr->qkey); + conn_to_release->implicit_fi_addr, conn_to_release->ah->ahn, + conn_to_release->ep_addr->qpn, conn_to_release->ep_addr->qkey); /* Add to hashset with list of evicted peers */ ep_addr_hashable = malloc(sizeof(struct efa_ep_addr_hashable)); diff --git a/prov/efa/src/rdm/efa_rdm_pke_nonreq.c b/prov/efa/src/rdm/efa_rdm_pke_nonreq.c index 88769b713c9..2ab06351e52 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_nonreq.c +++ b/prov/efa/src/rdm/efa_rdm_pke_nonreq.c @@ -99,8 +99,11 @@ void efa_rdm_pke_handle_handshake_recv(struct efa_rdm_pke *pkt_entry) peer = pkt_entry->peer; assert(peer); - EFA_DBG(FI_LOG_CQ, - "HANDSHAKE received from %" PRIu64 "\n", pkt_entry->peer->conn->fi_addr); + EFA_INFO(FI_LOG_CQ, + "HANDSHAKE received from peer with explicit fi_addr %" PRIu64 + " implicit fi_addr %" PRIu64 "\n", + pkt_entry->peer->conn->fi_addr, + pkt_entry->peer->conn->implicit_fi_addr); handshake_pkt = (struct efa_rdm_handshake_hdr *)pkt_entry->wiredata; diff --git a/prov/efa/src/rdm/efa_rdm_pke_rtm.c b/prov/efa/src/rdm/efa_rdm_pke_rtm.c index bc7a14e64d1..06a446418e8 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_rtm.c +++ b/prov/efa/src/rdm/efa_rdm_pke_rtm.c @@ -270,7 +270,7 @@ ssize_t efa_rdm_pke_proc_matched_rtm(struct efa_rdm_pke *pkt_entry) * * @param[in,out] pkt_entry non-tagged RTM packet entry */ -static ssize_t efa_rdm_pke_proc_msgrtm(struct efa_rdm_pke *pkt_entry) +ssize_t efa_rdm_pke_proc_msgrtm(struct efa_rdm_pke *pkt_entry) { ssize_t err; struct efa_rdm_ep *ep; diff --git a/prov/efa/src/rdm/efa_rdm_pke_rtm.h b/prov/efa/src/rdm/efa_rdm_pke_rtm.h index f0d562b2106..ced0ee4bce1 100644 --- a/prov/efa/src/rdm/efa_rdm_pke_rtm.h +++ b/prov/efa/src/rdm/efa_rdm_pke_rtm.h @@ -92,6 +92,8 @@ void efa_rdm_pke_rtm_update_rxe(struct efa_rdm_pke *pkt_entry, ssize_t efa_rdm_pke_proc_matched_rtm(struct efa_rdm_pke *pkt_entry); +ssize_t efa_rdm_pke_proc_msgrtm(struct efa_rdm_pke *pkt_entry); + ssize_t efa_rdm_pke_proc_rtm_rta(struct efa_rdm_pke *pkt_entry, struct efa_rdm_peer *peer); void efa_rdm_pke_handle_rtm_rta_recv(struct efa_rdm_pke *pkt_entry); diff --git a/prov/efa/test/efa_unit_test_srx.c b/prov/efa/test/efa_unit_test_srx.c index 6282dbf683d..c5b708ef9b4 100644 --- a/prov/efa/test/efa_unit_test_srx.c +++ b/prov/efa/test/efa_unit_test_srx.c @@ -6,6 +6,7 @@ #include "efa_rdm_ep.h" #include "efa_rdm_msg.h" #include "efa_av.h" +#include "efa_rdm_pke_rtm.h" /** * @brief This test validates whether the default min_multi_recv size is correctly @@ -76,62 +77,71 @@ void test_efa_srx_lock(struct efa_resource **state) */ void test_efa_srx_unexp_pkt(struct efa_resource **state) { - struct efa_resource *resource = *state; - struct efa_rdm_ep *efa_rdm_ep; - struct util_srx_ctx *srx_ctx; - struct efa_rdm_ope *rxe; - struct efa_rdm_pke *pke; + struct efa_resource *resource = *state; + struct efa_rdm_ep *efa_rdm_ep; + struct util_srx_ctx *srx_ctx; + struct efa_rdm_ope *rxe; + struct efa_rdm_pke *pke; struct efa_ep_addr raw_addr = {0}; size_t raw_addr_len = sizeof(raw_addr); - struct efa_conn conn = {0}; - struct efa_rdm_peer peer; - struct efa_unit_test_eager_rtm_pkt_attr pke_attr = { - .msg_id = 0, - .connid = 0x1234 - }; - - g_efa_unit_test_mocks.efa_rdm_pke_proc_matched_rtm = &efa_mock_efa_rdm_pke_proc_matched_rtm_no_op; - - efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME); - - efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid); - srx_ctx = efa_rdm_ep_get_peer_srx_ctx(efa_rdm_ep); - - /* Fake a rx pkt entry */ - pke = efa_rdm_pke_alloc(efa_rdm_ep, efa_rdm_ep->efa_rx_pkt_pool, EFA_RDM_PKE_FROM_EFA_RX_POOL); - assert_non_null(pke); - efa_rdm_ep->efa_rx_pkts_posted = efa_rdm_ep_get_rx_pool_size(efa_rdm_ep); - - /* Create a fake peer */ - /* TODO: peer must be constructed by CQ read path */ - assert_int_equal(fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0); - raw_addr.qpn = 0; - raw_addr.qkey = 0x1234; - conn.ep_addr = &raw_addr; - efa_rdm_peer_construct(&peer, efa_rdm_ep, &conn); - pke->peer = &peer; - - efa_unit_test_eager_msgrtm_pkt_construct(pke, &pke_attr); - /** - * Allocate an rxe with the rx pkt. - * Since there is no recv posted, the rxe must be unexpected - */ - ofi_genlock_lock(srx_ctx->lock); - rxe = efa_rdm_msg_alloc_rxe_for_msgrtm(efa_rdm_ep, &pke); - assert_true(rxe->state == EFA_RDM_RXE_UNEXP); - assert_true(rxe->unexp_pkt == pke); - - /* Start progressing the unexpected rxe */ - srx_ctx->peer_srx.peer_ops->start_msg(rxe->peer_rxe); - - /* Make sure rxe is updated as mateched and unexp_pkt is NULL */ - assert_true(rxe->state == EFA_RDM_RXE_MATCHED); - assert_true(rxe->unexp_pkt == NULL); - - efa_rdm_pke_release_rx(pke); - efa_rdm_rxe_release(rxe); - ofi_genlock_unlock(srx_ctx->lock); - - /* Destroy the fake peer constructed above */ - efa_rdm_peer_destruct(&peer, efa_rdm_ep); + struct efa_conn conn = {0}; + struct efa_rdm_peer peer; + struct efa_unit_test_eager_rtm_pkt_attr pke_attr = {.msg_id = 0, + .connid = 0x1234}; + void *desc; + struct iovec iov; + + g_efa_unit_test_mocks.efa_rdm_pke_proc_matched_rtm = + &efa_mock_efa_rdm_pke_proc_matched_rtm_no_op; + + efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME); + + efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, + base_ep.util_ep.ep_fid); + srx_ctx = efa_rdm_ep_get_peer_srx_ctx(efa_rdm_ep); + + /* Fake a rx pkt entry */ + pke = efa_rdm_pke_alloc(efa_rdm_ep, efa_rdm_ep->efa_rx_pkt_pool, + EFA_RDM_PKE_FROM_EFA_RX_POOL); + assert_non_null(pke); + efa_rdm_ep->efa_rx_pkts_posted = + efa_rdm_ep_get_rx_pool_size(efa_rdm_ep); + + /* Create a fake peer */ + /* TODO: peer must be constructed by CQ read path */ + assert_int_equal( + fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0); + raw_addr.qpn = 0; + raw_addr.qkey = 0x1234; + conn.ep_addr = &raw_addr; + efa_rdm_peer_construct(&peer, efa_rdm_ep, &conn); + pke->peer = &peer; + + efa_unit_test_eager_msgrtm_pkt_construct(pke, &pke_attr); + /** + * Allocate an rxe with the rx pkt. + * Since there is no recv posted, the rxe must be unexpected + */ + ofi_genlock_lock(srx_ctx->lock); + rxe = efa_rdm_msg_alloc_rxe_for_msgrtm(efa_rdm_ep, &pke); + assert_true(rxe->state == EFA_RDM_RXE_UNEXP); + assert_true(rxe->unexp_pkt == pke); + srx_ctx->peer_srx.owner_ops->queue_msg(rxe->peer_rxe); + ofi_genlock_unlock(srx_ctx->lock); + + /* Fake an application posted receive */ + util_srx_generic_recv(efa_rdm_ep->peer_srx_ep, &iov, &desc, 1, 0, NULL, + 0); + + /* Make sure rxe is updated as mateched and unexp_pkt is NULL */ + assert_true(rxe->state == EFA_RDM_RXE_MATCHED); + assert_true(rxe->unexp_pkt == NULL); + + ofi_genlock_lock(srx_ctx->lock); + efa_rdm_pke_release_rx(pke); + efa_rdm_rxe_release(rxe); + ofi_genlock_unlock(srx_ctx->lock); + + /* Destroy the fake peer constructed above */ + efa_rdm_peer_destruct(&peer, efa_rdm_ep); } diff --git a/prov/util/src/util_srx.c b/prov/util/src/util_srx.c index 53d6ac35a30..3ed2527ce5b 100644 --- a/prov/util/src/util_srx.c +++ b/prov/util/src/util_srx.c @@ -58,6 +58,7 @@ static void util_init_rx_entry(struct util_rx_entry *entry, size_t count, fi_addr_t addr, void *context, uint64_t tag, uint64_t flags) { + entry->status = RX_ENTRY_POSTED; memcpy(entry->peer_entry.iov, iov, sizeof(*iov) * count); if (desc) memcpy(entry->peer_entry.desc, desc, sizeof(*desc) * count); @@ -103,6 +104,7 @@ static struct util_rx_entry *util_init_unexp(struct util_srx_ctx *srx, if (!util_entry) return NULL; + util_entry->status = RX_ENTRY_UNEXP; util_entry->peer_entry.owner_context = NULL; util_entry->peer_entry.msg_size = attr->msg_size; util_entry->peer_entry.addr = attr->addr; @@ -148,8 +150,10 @@ static struct util_rx_entry *util_process_multi_recv(struct util_srx_ctx *srx, return NULL; if (util_adjust_multi_recv(srx, &owner_entry->peer_entry, - attr->msg_size)) + attr->msg_size)) { + util_entry->status = RX_ENTRY_MATCHED; slist_remove_head(queue); + } util_entry->peer_entry.owner_context = owner_entry; owner_entry->multi_recv_ref++; @@ -176,6 +180,7 @@ static int util_match_msg(struct fid_peer_srx *srx, } else { head = srx_ctx->msg_queue.head; util_entry = container_of(head, struct util_rx_entry, s_entry); + assert(util_entry->status == RX_ENTRY_POSTED); if (util_entry->peer_entry.flags & FI_MULTI_RECV) { util_entry = util_process_multi_recv(srx_ctx, &srx_ctx->msg_queue, attr, util_entry); @@ -186,6 +191,7 @@ static int util_match_msg(struct fid_peer_srx *srx, return -FI_ENOMEM; } } else { + util_entry->status = RX_ENTRY_MATCHED; (void) slist_remove_head(&srx_ctx->msg_queue); } util_entry->peer_entry.srx = srx; @@ -224,6 +230,8 @@ static int util_get_msg(struct fid_peer_srx *srx, } } + util_entry->status = RX_ENTRY_MATCHED; + if (util_entry->peer_entry.flags & FI_MULTI_RECV) { util_entry = util_process_multi_recv(srx_ctx, queue, attr, util_entry); @@ -253,8 +261,10 @@ static int util_match_tag(struct fid_peer_srx *srx, srx_ctx = srx->ep_fid.fid.context; slist_foreach(&srx_ctx->tag_queue, item, prev) { util_entry = container_of(item, struct util_rx_entry, s_entry); + assert(util_entry->status == RX_ENTRY_POSTED); if (ofi_match_tag(util_entry->peer_entry.tag, util_entry->ignore, attr->tag)) { + util_entry->status = RX_ENTRY_MATCHED; util_entry->peer_entry.srx = srx; srx_ctx->update_func(srx_ctx, util_entry); slist_remove(&srx_ctx->tag_queue, item, prev); @@ -318,6 +328,7 @@ static int util_get_tag(struct fid_peer_srx *srx, break; } } + util_entry->status = RX_ENTRY_MATCHED; util_entry->peer_entry.srx = srx; srx_ctx->update_func(srx_ctx, util_entry); *rx_entry = &util_entry->peer_entry; @@ -334,6 +345,7 @@ static int util_queue_msg(struct fi_peer_rx_entry *rx_entry) assert(ofi_genlock_held(srx_ctx->lock)); util_entry = container_of(rx_entry, struct util_rx_entry, peer_entry); + assert(util_entry->status == RX_ENTRY_UNEXP); if (rx_entry->addr == FI_ADDR_UNSPEC) { dlist_insert_tail(&util_entry->d_entry, &srx_ctx->unspec_unexp_msg_queue); @@ -358,6 +370,7 @@ static int util_queue_tag(struct fi_peer_rx_entry *rx_entry) assert(ofi_genlock_held(srx_ctx->lock)); util_entry = container_of(rx_entry, struct util_rx_entry, peer_entry); + assert(util_entry->status == RX_ENTRY_UNEXP); if (rx_entry->addr == FI_ADDR_UNSPEC) { dlist_insert_tail(&util_entry->d_entry, &srx_ctx->unspec_unexp_tag_queue); @@ -377,6 +390,9 @@ static int util_queue_tag(struct fi_peer_rx_entry *rx_entry) static void util_free_entry(struct fi_peer_rx_entry *entry) { struct util_srx_ctx *srx; + struct util_unexp_peer *unexp_peer; + struct slist *queue; + struct slist_entry *item, *prev; struct util_rx_entry *util_entry, *owner_entry; srx = (struct util_srx_ctx *) entry->srx->ep_fid.fid.context; @@ -397,6 +413,30 @@ static void util_free_entry(struct fi_peer_rx_entry *entry) ofi_buf_free(owner_entry); } } + + if (util_entry->status == RX_ENTRY_UNEXP) { + if (util_entry->peer_entry.addr == FI_ADDR_UNSPEC) { + dlist_remove(&util_entry->d_entry); + } else { + unexp_peer = ofi_array_at(&srx->src_unexp_peers, + util_entry->peer_entry.addr); + queue = util_entry->peer_entry.flags & FI_TAGGED ? + &unexp_peer->tag_queue : + &unexp_peer->msg_queue; + + slist_foreach(queue, item, prev) { + if ((struct util_rx_entry *) item == util_entry) + slist_remove(queue, item, prev); + } + + if (!--unexp_peer->cnt) { + assert(slist_empty(&unexp_peer->msg_queue) && + slist_empty(&unexp_peer->tag_queue)); + dlist_remove(&unexp_peer->entry); + } + } + } + ofi_buf_free(util_entry); } @@ -551,6 +591,7 @@ static ssize_t util_generic_mrecv(struct util_srx_ctx *srx, rx_entry = util_search_unexp_msg(srx, addr); while (rx_entry) { + rx_entry->status = RX_ENTRY_MATCHED; buf_done = util_unexp_mrecv(srx, mrecv_entry, rx_entry); srx->update_func(srx, rx_entry);