Skip to content

Commit 76c62fd

Browse files
authored
Merge pull request open-mpi#12032 from wenduwan/topo_aware_coll_comm_v2
Topo aware coll comm v2
2 parents 256994f + 61c9403 commit 76c62fd

File tree

5 files changed

+49
-14
lines changed

5 files changed

+49
-14
lines changed

contrib/check_unnecessary_headers.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
# Copyright (c) 2004-2005 The Regents of the University of California.
1212
# All rights reserved.
1313
# Copyright (c) 2009 Oak Ridge National Labs. All rights reserved.
14-
# Copyright (c) 2022 Amazon.com, Inc. or its affiliates.
14+
# Copyright (c) Amazon.com, Inc. or its affiliates.
1515
# All Rights reserved.
1616
#
1717
#
@@ -181,8 +181,8 @@ SEARCH_HEADER[0]="ompi/attribute/attribute.h ATTR_HASH_SIZE OMPI_KEYVAL_PREDEFIN
181181
SEARCH_HEADER[1]="ompi/class/ompi_free_list.h ompi_free_list_item_init_fn_t ompi_free_list_t ompi_free_list_item_t ompi_free_list_init_ex ompi_free_list_init ompi_free_list_init_ex_new ompi_free_list_init_new ompi_free_list_grow ompi_free_list_resize ompi_free_list_pos_t OMPI_FREE_LIST_POS_BEGINNING ompi_free_list_parse OMPI_FREE_LIST_GET OMPI_FREE_LIST_WAIT __ompi_free_list_wait OMPI_FREE_LIST_RETURN"
182182
SEARCH_HEADER[2]="ompi/class/ompi_rb_tree.h ompi_rb_tree_nodecolor_t ompi_rb_tree_node_t ompi_rb_tree_comp_fn_t ompi_rb_tree_t ompi_rb_tree_condition_fn_t ompi_rb_tree_action_fn_t ompi_rb_tree_construct ompi_rb_tree_destruct ompi_rb_tree_init ompi_rb_tree_insert ompi_rb_tree_find_with ompi_rb_tree_find ompi_rb_tree_delete ompi_rb_tree_destroy ompi_rb_tree_traverse ompi_rb_tree_size"
183183
SEARCH_HEADER[3]="ompi/class/ompi_seq_tracker.h ompi_seq_tracker_range_t ompi_seq_tracker_t ompi_seq_tracker_check_duplicate ompi_seq_tracker_insert ompi_seq_tracker_copy"
184-
SEARCH_HEADER[4]="ompi/communicator/communicator.h MPI_Comm MPI_COMM_WORLD ompi_communicator_t OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_world ompi_mpi_comm_self ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dyncomm ompi_mpi_cxx_comm_errhandler_invoke"
185-
SEARCH_HEADER[5]="ompi/datatype/convertor.h OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dync CONVERTOR_DATATYPE_MASK CONVERTOR_SEND_CONVERSION CONVERTOR_RECV CONVERTOR_SEND CONVERTOR_HOMOGENEOUS CONVERTOR_NO_OP CONVERTOR_WITH_CHECKSUM CONVERTOR_TYPE_MASK CONVERTOR_STATE_START CONVERTOR_STATE_COMPLETE CONVERTOR_STATE_ALLOC CONVERTOR_COMPLETED ompi_convertor_t ompi_convertor_master_t dt_stack_t DT_STATIC_STACK_SIZE ompi_convertor_get_checksum ompi_convertor_pack ompi_convertor_unpack ompi_convertor_create ompi_convertor_cleanup ompi_convertor_need_buffers ompi_convertor_get_packed_size ompi_convertor_get_unpacked_size ompi_convertor_get_current_pointer ompi_convertor_prepare_for_send ompi_convertor_copy_and_prepare_for_send ompi_convertor_prepare_for_recv ompi_convertor_copy_and_prepare_for_recv ompi_convertor_raw ompi_convertor_set_position_nocheck ompi_convertor_set_position ompi_convertor_personalize ompi_convertor_clone ompi_convertor_clone_with_position ompi_convertor_dump ompi_ddt_dump_stack ompi_convertor_generic_simple_position MPI_Datatype"
184+
SEARCH_HEADER[4]="ompi/communicator/communicator.h MPI_Comm MPI_COMM_WORLD ompi_communicator_t OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_DISJOINT_SET OMPI_COMM_DISJOINT OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_world ompi_mpi_comm_self ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dyncomm ompi_mpi_cxx_comm_errhandler_invoke"
185+
SEARCH_HEADER[5]="ompi/datatype/convertor.h OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_DISJOINT_SET OMPI_COMM_DISJOINT OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dync CONVERTOR_DATATYPE_MASK CONVERTOR_SEND_CONVERSION CONVERTOR_RECV CONVERTOR_SEND CONVERTOR_HOMOGENEOUS CONVERTOR_NO_OP CONVERTOR_WITH_CHECKSUM CONVERTOR_TYPE_MASK CONVERTOR_STATE_START CONVERTOR_STATE_COMPLETE CONVERTOR_STATE_ALLOC CONVERTOR_COMPLETED ompi_convertor_t ompi_convertor_master_t dt_stack_t DT_STATIC_STACK_SIZE ompi_convertor_get_checksum ompi_convertor_pack ompi_convertor_unpack ompi_convertor_create ompi_convertor_cleanup ompi_convertor_need_buffers ompi_convertor_get_packed_size ompi_convertor_get_unpacked_size ompi_convertor_get_current_pointer ompi_convertor_prepare_for_send ompi_convertor_copy_and_prepare_for_send ompi_convertor_prepare_for_recv ompi_convertor_copy_and_prepare_for_recv ompi_convertor_raw ompi_convertor_set_position_nocheck ompi_convertor_set_position ompi_convertor_personalize ompi_convertor_clone ompi_convertor_clone_with_position ompi_convertor_dump ompi_ddt_dump_stack ompi_convertor_generic_simple_position MPI_Datatype"
186186
SEARCH_HEADER[6]="ompi/datatype/datatype.h MPI_Datatype DT_MAX_PREDEFINED DT_FLAG_ MAX_DT_COMPONENT_COUNT opal_ddt_count_t dt_type_desc_t ompi_datatype_t ompi_predefined_datatype_t ompi_ddt_init ompi_ddt_finalize ompi_ddt_create_ ompi_ddt_duplicate ompi_ddt_is_predefined ompi_ddt_create_from_packed_description"
187187
SEARCH_HEADER[7]="ompi/datatype/datatype_internal.h DDT_DUMP_STACK DT_ ddt_elem_id_description ddt_elem_desc ddt_elem_desc_t ddt_loop_desc ddt_loop_desc_t ddt_endloop_desc ddt_endloop_desc_t dt_elem_desc CREATE_LOOP_START CREATE_LOOP_END CREATE_ELEM ompi_complex_float_t ompi_complex_double_t ompi_complex_long_double_t ompi_ddt_basicDatatypes BASIC_DDT_FROM_ELEM ompi_ddt_default_convertors_init ompi_ddt_default_convertors_fini SAVE_STACK PUSH_STACK ompi_ddt_safeguard_pointer_debug_breakpoint OMPI_DDT_SAFEGUARD_POINTER GET_FIRST_NON_LOOP UPDATE_INTERNAL_COUNTERS ompi_ddt_print_args"
188188
SEARCH_HEADER[8]="ompi/errhandler/errhandler.h OMPI_ERRHANDLER_LANG_ ompi_errhandler_lang_t OMPI_ERRHANDLER_TYPE_ ompi_errhandler_type_t ompi_errhandler_t ompi_predefined_errhandler_t ompi_mpi_errhandler_null OMPI_ERRHANDLER_CHECK OMPI_ERRHANDLER_RETURN ompi_errhandler_init ompi_errhandler_finalize OMPI_ERRHANDLER_INVOKE ompi_errhandler_invoke ompi_errhandler_request_invoke ompi_errhandler_create ompi_errhandler_is_intrinsic ompi_errhandler_fortran_handler_fn_t OMPI_ERR_INIT_FINALIZE MPI_Errhandler"

ompi/communicator/comm_cid.c

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ struct ompi_comm_cid_context_t {
9898
int remote_leader;
9999
int iter;
100100
/** storage for activate barrier: max local peer count, reduced with MPI_MAX to decide disjointness */
101-
int ok;
101+
int max_local_peers;
102102
char *port_string;
103103
bool send_first;
104104
int pml_tag;
@@ -266,7 +266,7 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t
266266

267267
context->send_first = send_first;
268268
context->iter = 0;
269-
context->ok = 1;
269+
context->max_local_peers = ompi_group_count_local_peers(newcomm->c_local_group);
270270

271271
return context;
272272
}
@@ -771,9 +771,33 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
771771
/* Non-blocking version of ompi_comm_activate */
772772
static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request);
773773

774-
static int ompi_comm_activate_complete (ompi_communicator_t **newcomm, ompi_communicator_t *comm)
774+
/* Callback function to set communicator disjointness flags */
775+
static inline void ompi_comm_set_disjointness_nb_complete(ompi_comm_cid_context_t *context)
776+
{
777+
if (OMPI_COMM_IS_DISJOINT_SET(*context->newcommp)) {
778+
opal_show_help("help-comm.txt", "disjointness-set-again", true);
779+
return;
780+
}
781+
782+
if (1 == context->max_local_peers) {
783+
(*context->newcommp)->c_flags |= OMPI_COMM_DISJOINT;
784+
} else {
785+
(*context->newcommp)->c_flags &= ~OMPI_COMM_DISJOINT;
786+
}
787+
(*context->newcommp)->c_flags |= OMPI_COMM_DISJOINT_SET;
788+
}
789+
790+
static int ompi_comm_activate_complete (ompi_comm_cid_context_t *context)
775791
{
776792
int ret;
793+
ompi_communicator_t **newcomm = context->newcommp, *comm = context->comm;
794+
795+
/**
796+
* Determine the new communicator's disjointness based on
797+
* context->max_local_peers. It is reduced on the communicator
798+
* before ompi_comm_activate_nb_complete is called.
799+
*/
800+
ompi_comm_set_disjointness_nb_complete(context);
777801

778802
/**
779803
* Check to see if this process is in the new communicator.
@@ -846,7 +870,7 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c
846870
ompi_comm_cid_context_t *context;
847871
ompi_comm_request_t *request;
848872
ompi_request_t *subreq;
849-
int ret = 0;
873+
int ret = 0, local_peers = -1;
850874

851875
/* the caller should not pass NULL for comm (it may be the same as *newcomm) */
852876
assert (NULL != comm);
@@ -878,10 +902,13 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c
878902
OMPI_COMM_SET_PML_ADDED(*newcomm);
879903
}
880904

881-
/* Step 1: the barrier, after which it is allowed to
882-
* send messages over the new communicator
905+
/**
906+
* Dual-purpose barrier:
907+
* 1. The communicator's disjointness is inferred from max_local_peers.
908+
* 2. After the operation it is allowed to send messages over the new communicator.
883909
*/
884-
ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MIN, context,
910+
local_peers = context->max_local_peers;
911+
ret = context->allreduce_fn (&local_peers, &context->max_local_peers, 1, MPI_MAX, context,
885912
&subreq);
886913
if (OMPI_SUCCESS != ret) {
887914
ompi_comm_request_return (request);
@@ -920,7 +947,7 @@ int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm
920947
static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request)
921948
{
922949
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
923-
return ompi_comm_activate_complete (context->newcommp, context->comm);
950+
return ompi_comm_activate_complete (context);
924951
}
925952

926953
/**************************************************************************/

ompi/communicator/communicator.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
6262
#define OMPI_COMM_DYNAMIC 0x00000008
6363
#define OMPI_COMM_ISFREED 0x00000010
6464
#define OMPI_COMM_INVALID 0x00000020
65+
#define OMPI_COMM_DISJOINT_SET 0x00000040
66+
#define OMPI_COMM_DISJOINT 0x00000080
6567
#define OMPI_COMM_CART 0x00000100
6668
#define OMPI_COMM_GRAPH 0x00000200
6769
#define OMPI_COMM_DIST_GRAPH 0x00000400
@@ -80,6 +82,8 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
8082
#define OMPI_COMM_IS_FREED(comm) ((comm)->c_flags & OMPI_COMM_ISFREED)
8183
#define OMPI_COMM_IS_DYNAMIC(comm) ((comm)->c_flags & OMPI_COMM_DYNAMIC)
8284
#define OMPI_COMM_IS_INVALID(comm) ((comm)->c_flags & OMPI_COMM_INVALID)
85+
#define OMPI_COMM_IS_DISJOINT_SET(comm) ((comm)->c_flags & OMPI_COMM_DISJOINT_SET)
86+
#define OMPI_COMM_IS_DISJOINT(comm) ((comm)->c_flags & OMPI_COMM_DISJOINT)
8387
#define OMPI_COMM_IS_PML_ADDED(comm) ((comm)->c_flags & OMPI_COMM_PML_ADDED)
8488
#define OMPI_COMM_IS_EXTRA_RETAIN(comm) ((comm)->c_flags & OMPI_COMM_EXTRA_RETAIN)
8589
#define OMPI_COMM_IS_TOPO(comm) (OMPI_COMM_IS_CART((comm)) || \

ompi/communicator/help-comm.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,6 @@ in a call to MPI_Comm_split_type between peers in the communicator.
3434
[unexpected-split-type]
3535
Detected an unexpected split type in a call to MPI_Comm_split_type.
3636
split_type: %s (%d)
37+
[disjointness-set-again]
38+
Communicator disjointness should only be set once at initialization.
39+
Attempts to modify the state are illegal and shall be ignored.

ompi/mca/coll/han/coll_han_subcomms.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm,
282282
opal_info_set(&comm_info, "ompi_comm_coll_preference", "tuned,^han");
283283
ompi_comm_split_type(comm, MPI_COMM_TYPE_SHARED, 0,
284284
&comm_info, &(low_comms[0]));
285+
assert(OMPI_COMM_IS_DISJOINT_SET(low_comms[0]) && !OMPI_COMM_IS_DISJOINT(low_comms[0]));
285286

286287
/*
287288
* Get my local rank and the local size
@@ -296,6 +297,7 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm,
296297
opal_info_set(&comm_info, "ompi_comm_coll_preference", "sm,^han");
297298
ompi_comm_split_type(comm, MPI_COMM_TYPE_SHARED, 0,
298299
&comm_info, &(low_comms[1]));
300+
assert(OMPI_COMM_IS_DISJOINT_SET(low_comms[1]) && !OMPI_COMM_IS_DISJOINT(low_comms[1]));
299301

300302
/*
301303
* Upgrade libnbc module priority to set up up_comms[0] with libnbc module
@@ -304,15 +306,16 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm,
304306
*/
305307
opal_info_set(&comm_info, "ompi_comm_coll_preference", "libnbc,^han");
306308
ompi_comm_split_with_info(comm, low_rank, w_rank, &comm_info, &(up_comms[0]), false);
307-
308309
up_rank = ompi_comm_rank(up_comms[0]);
310+
assert(OMPI_COMM_IS_DISJOINT_SET(up_comms[0]) && OMPI_COMM_IS_DISJOINT(up_comms[0]));
309311

310312
/*
311313
* Upgrade adapt module priority to set up up_comms[1] with adapt module
312314
* This sub-communicator contains one process per node.
313315
*/
314316
opal_info_set(&comm_info, "ompi_comm_coll_preference", "adapt,^han");
315317
ompi_comm_split_with_info(comm, low_rank, w_rank, &comm_info, &(up_comms[1]), false);
318+
assert(OMPI_COMM_IS_DISJOINT_SET(up_comms[1]) && OMPI_COMM_IS_DISJOINT(up_comms[1]));
316319

317320
/*
318321
* Set my virtual rank number.
@@ -350,5 +353,3 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm,
350353
OBJ_DESTRUCT(&comm_info);
351354
return OMPI_SUCCESS;
352355
}
353-
354-

0 commit comments

Comments
 (0)