Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions ompi/communicator/comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,10 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group,
/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
/*
** Counterpart to MPI_Comm_split. To be used within OMPI (e.g. MPI_Cart_sub).
*/
int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
ompi_communicator_t **newcomm, bool pass_on_topo )

int ompi_comm_split_with_info( ompi_communicator_t* comm, int color, int key,
opal_info_t *info,
ompi_communicator_t **newcomm, bool pass_on_topo )
{
int myinfo[2];
int size, my_size;
Expand Down Expand Up @@ -611,7 +610,11 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d SPLIT FROM %d",
newcomp->c_contextid, comm->c_contextid );


/* Copy info if there is one */
if (info) {
newcomp->super.s_info = OBJ_NEW(opal_info_t);
opal_info_dup(info, &(newcomp->super.s_info));
}

/* Activate the communicator and init coll-component */
rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode);
Expand All @@ -638,6 +641,15 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
}


/*
** Counterpart to MPI_Comm_split. To be used within OMPI (e.g. MPI_Cart_sub).
*/
int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
ompi_communicator_t **newcomm, bool pass_on_topo )
{
return ompi_comm_split_with_info(comm, color, key, NULL, newcomm, pass_on_topo);
}

/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
Expand Down
15 changes: 15 additions & 0 deletions ompi/communicator/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,21 @@ int ompi_topo_dist_graph_create_adjacent(ompi_communicator_t *old_comm,
OMPI_DECLSPEC int ompi_comm_split (ompi_communicator_t *comm, int color, int key,
ompi_communicator_t** newcomm, bool pass_on_topo);

/**
* split a communicator based on color and key. Parameters
* are identical to the MPI-counterpart of the function.
* Similar to \see ompi_comm_split with an additional info parameter.
*
* @param comm: input communicator
* @param color
* @param key
*
* @
*/
OMPI_DECLSPEC int ompi_comm_split_with_info( ompi_communicator_t* comm, int color, int key,
opal_info_t *info,
ompi_communicator_t **newcomm, bool pass_on_topo );

/**
* split a communicator based on type and key. Parameters
* are identical to the MPI-counterpart of the function.
Expand Down
28 changes: 28 additions & 0 deletions ompi/group/group.c
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,31 @@ bool ompi_group_have_remote_peers (ompi_group_t *group)

return false;
}

/**
* Count the number of processes on this group that share the same node as
* this process.
*/
int ompi_group_count_local_peers (ompi_group_t *group)
{
int local_peers = 0;
for (int i = 0 ; i < group->grp_proc_count ; ++i) {
ompi_proc_t *proc = NULL;
#if OMPI_GROUP_SPARSE
proc = ompi_group_peer_lookup (group, i);
#else
proc = ompi_group_get_proc_ptr_raw (group, i);
if (ompi_proc_is_sentinel (proc)) {
/* the proc must be stored in the group or cached in the proc
* hash table if the process resides in the local node
* (see ompi_proc_complete_init) */
continue;
}
#endif
if (OPAL_PROC_ON_LOCAL_NODE(proc->super.proc_flags)) {
local_peers++;
}
}

return local_peers;
}
8 changes: 8 additions & 0 deletions ompi/group/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,16 @@ static inline struct ompi_proc_t *ompi_group_peer_lookup_existing (ompi_group_t
return ompi_group_get_proc_ptr (group, peer_id, false);
}

/**
* Return true if all processes in the group are not on the local node.
*/
bool ompi_group_have_remote_peers (ompi_group_t *group);

/**
* Count the number of processes on the local node.
*/
int ompi_group_count_local_peers (ompi_group_t *group);

/**
* Function to print the group info
*/
Expand Down
4 changes: 2 additions & 2 deletions ompi/mca/coll/adapt/coll_adapt_ibcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ static int send_cb(ompi_request_t * req)
|| (context->con->tree->tree_nextsize > 0 && rank != context->con->root
&& num_sent == context->con->tree->tree_nextsize * context->con->num_segs
&& num_recv_fini == context->con->num_segs)) {
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in send\n",
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Signal in send\n",
ompi_comm_rank(context->con->comm)));
ibcast_request_fini(context);
}
Expand Down Expand Up @@ -306,7 +306,7 @@ static int recv_cb(ompi_request_t * req)
&& num_recv_fini == context->con->num_segs)
|| (context->con->tree->tree_nextsize == 0
&& num_recv_fini == context->con->num_segs)) {
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in recv\n",
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Signal in recv\n",
ompi_comm_rank(context->con->comm)));
ibcast_request_fini(context);
}
Expand Down
101 changes: 98 additions & 3 deletions ompi/mca/coll/base/coll_base_comm_select.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "mpi.h"
#include "ompi/communicator/communicator.h"
#include "opal/util/output.h"
#include "opal/util/argv.h"
#include "opal/util/show_help.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_object.h"
Expand Down Expand Up @@ -312,6 +313,20 @@ static int avail_coll_compare (opal_list_item_t **a,
return 0;
}

static inline int
component_in_argv(char **argv, const char* component_name)
{
if( NULL != argv ) {
while( NULL != *argv ) {
if( 0 == strcmp(component_name, *argv) ) {
return 1;
}
argv++; /* move to the next argument */
}
}
return 0;
}

/*
* For each module in the list, check and see if it wants to run, and
* do the resulting priority comparison. Make a list of modules to be
Expand All @@ -321,20 +336,80 @@ static int avail_coll_compare (opal_list_item_t **a,
static opal_list_t *check_components(opal_list_t * components,
ompi_communicator_t * comm)
{
int priority;
int priority, flag;
const mca_base_component_t *component;
mca_base_component_list_item_t *cli;
mca_coll_base_module_2_3_0_t *module;
opal_list_t *selectable;
mca_coll_base_avail_coll_t *avail;

char info_val[OPAL_MAX_INFO_VAL+1];
char **coll_argv = NULL, **coll_exclude = NULL, **coll_include = NULL;

/* Check if this communicator comes with restrictions on the collective modules
* it wants to use. The restrictions are consistent with the MCA parameter
* to limit the collective components loaded, but it applies for each
* communicator and is provided as an info key during the communicator
* creation. Unlike the MCA param, this info key is used not to select
* components but either to prevent components from being used or to
* force a change in the component priority.
*/
if( NULL != comm->super.s_info) {
opal_info_get(comm->super.s_info, "ompi_comm_coll_preference",
sizeof(info_val), info_val, &flag);
if( !flag ) {
goto proceed_to_select;
}
coll_argv = opal_argv_split(info_val, ',');
if(NULL == coll_argv) {
goto proceed_to_select;
}
int idx2, count_include = opal_argv_count(coll_argv);
/* Allocate the coll_include argv */
coll_include = (char**)malloc((count_include + 1) * sizeof(char*));
coll_include[count_include] = NULL; /* NULL terminated array */
/* Dispatch the include/exclude in the corresponding arrays */
for( int idx = 0; NULL != coll_argv[idx]; idx++ ) {
if( '^' == coll_argv[idx][0] ) {
coll_include[idx] = NULL; /* NULL terminated array */

/* Allocate the coll_exclude argv */
coll_exclude = (char**)malloc((count_include - idx + 1) * sizeof(char*));
/* save the exclude components */
for( idx2 = idx; NULL != coll_argv[idx2]; idx2++ ) {
coll_exclude[idx2 - idx] = coll_argv[idx2];
}
coll_exclude[idx2 - idx] = NULL; /* NULL-terminated array */
coll_exclude[0] = coll_exclude[0] + 1; /* get rid of the ^ */
count_include = idx;
break;
}
coll_include[idx] = coll_argv[idx];
}
/* Reverse the order of the coll_inclide argv to faciliate the ordering of
* the selected components reverse.
*/
for( idx2 = 0; idx2 < (count_include - 1); idx2++ ) {
char* temp = coll_include[idx2];
coll_include[idx2] = coll_include[count_include - 1];
coll_include[count_include - 1] = temp;
count_include--;
}
}
proceed_to_select:
/* Make a list of the components that query successfully */
selectable = OBJ_NEW(opal_list_t);

/* Scan through the list of components */
OPAL_LIST_FOREACH(cli, &ompi_coll_base_framework.framework_components, mca_base_component_list_item_t) {
component = cli->cli_component;

/* dont bother is we have this component in the exclusion list */
if( component_in_argv(coll_exclude, component->mca_component_name) ) {
opal_output_verbose(10, ompi_coll_base_framework.framework_output,
"coll:base:comm_select: component disqualified: %s (due to communicator info key)",
component->mca_component_name );
continue;
}
priority = check_one_component(comm, component, &module);
if (priority >= 0) {
/* We have a component that indicated that it wants to run
Expand Down Expand Up @@ -370,6 +445,27 @@ static opal_list_t *check_components(opal_list_t * components,
/* Put this list in priority order */
opal_list_sort(selectable, avail_coll_compare);

/* For all valid component reorder them not on their provided priorities but on
* the order requested in the info key. As at this point the coll_include is
* already ordered backward we can simply prepend the components.
*/
mca_coll_base_avail_coll_t *item, *item_next;
OPAL_LIST_FOREACH_SAFE(item, item_next,
selectable, mca_coll_base_avail_coll_t) {
if( component_in_argv(coll_include, item->ac_component_name) ) {
opal_list_remove_item(selectable, &item->super);
opal_list_prepend(selectable, &item->super);
}
}

opal_argv_free(coll_argv);
if( NULL != coll_exclude ) {
free(coll_exclude);
}
if( NULL != coll_include ) {
free(coll_include);
}

/* All done */
return selectable;
}
Expand Down Expand Up @@ -403,7 +499,6 @@ static int check_one_component(ompi_communicator_t * comm,
return priority;
}


/**************************************************************************
* Query functions
**************************************************************************/
Expand Down
Loading