
Commit 6803221

LibDistributed version 0.0.9
Major Changes:

+ Added serialization support for std::string
+ Added work_queue_options to provide customization points for parallelism, specifically work group sizes and the root process
+ The queue can now have multiple processes per worker or for the master
+ TaskManager now provides get_subcommunicator for nested parallelism

Minor Changes:

+ Encapsulated some of the calls to MPI; future versions of LibDistributed _may_ support different distributed programming frameworks
1 parent e2064b6 commit 6803221

7 files changed: +597, -56 lines

include/libdistributed_comm.h (+86)
@@ -10,6 +10,7 @@
 #include <variant>
 #include <vector>
 #include <map>
+#include <string>
 #include <mpi.h>

 /**
@@ -730,6 +731,91 @@ struct serializer<std::vector<T>>
   }
 };

+/** specialization of serialization for string types */
+template <class CharT>
+struct serializer<std::basic_string<CharT>>
+{
+  /** is the type serializable using MPI_Datatypes for both the sender and
+   * receiver at compile time?*/
+  using mpi_type = std::false_type;
+  /** \returns a string representing the name of the type */
+  static std::string name() {
+    std::stringstream n;
+    n << "std::string<" << serializer<CharT>::name() << ">";
+    return n.str();
+  }
+  /** \returns a MPI_Datatype to represent the type if mpi_type is true, else MPI_INT */
+  static MPI_Datatype dtype() { return MPI_INT; }
+  /**
+   * Sends a data type from one location to another
+   * \param[in] t the data to send
+   * \param[in] dest the MPI rank to send to
+   * \param[in] tag the MPI tag to send with
+   * \param[in] comm the MPI_Comm to send on
+   * \returns an error code from the underlying send */
+  static int send(std::basic_string<CharT> const& t, int dest, int tag, MPI_Comm comm)
+  {
+    int ret = 0;
+    size_t size = t.size();
+    ret = MPI_Send(&size, 1, mpi_size_t(), dest, tag, comm);
+    if(serializer<CharT>::mpi_type::value) {
+      return MPI_Send(&t.front(), t.size(), serializer<CharT>::dtype(), dest, tag, comm);
+    } else {
+      for (auto const& item : t) {
+        ret |= serializer<CharT>::send(item, dest, tag, comm);
+      }
+    }
+    return ret;
+  }
+  /**
+   * Recv a data type from another location
+   * \param[out] t the data to recv into
+   * \param[in] source the MPI rank to recv from
+   * \param[in] tag the MPI tag to recv with
+   * \param[in] comm the MPI_Comm to recv on
+   * \param[out] status the MPI_Status to update, or MPI_STATUS_IGNORE
+   * \returns an error code from the underlying recv */
+  static int recv(std::basic_string<CharT>& t, int source, int tag, MPI_Comm comm,
+                  MPI_Status* status)
+  {
+    MPI_Status alt_status;
+    if(status == MPI_STATUS_IGNORE) status = &alt_status;
+    size_t size; int ret = 0;
+    ret = MPI_Recv(&size, 1, mpi_size_t(), source, tag, comm, status);
+    t.resize(size);

+    if (serializer<CharT>::mpi_type::value) {
+      return MPI_Recv(&t.front(), t.size(), serializer<CharT>::dtype(), status->MPI_SOURCE, status->MPI_TAG, comm, status);
+    } else {
+      for (auto& item : t) {
+        ret |= serializer<CharT>::recv(item, status->MPI_SOURCE, status->MPI_TAG, comm, status);
+      }
+    }
+    return ret;
+  }

+  /**
+   * Broadcast a data type from another location
+   * \param[in,out] t the data to broadcast
+   * \param[in] root the MPI rank to broadcast from
+   * \param[in] comm the MPI_Comm to broadcast on
+   * \returns an error code from the underlying MPI_Bcast(s) */
+  static int bcast(std::basic_string<CharT>& t, int root, MPI_Comm comm)
+  {
+    size_t size = t.size(); int ret = 0;
+    ret = MPI_Bcast(&size, 1, mpi_size_t(), root, comm);
+    t.resize(size);
+    if (serializer<CharT>::mpi_type::value) {
+      return MPI_Bcast(&t.front(), t.size(), serializer<CharT>::dtype(), root, comm);
+    } else {
+      for (auto& item : t) {
+        ret |= serializer<CharT>::bcast(item, root, comm);
+      }
+    }
+    return ret;
+  }
+};
+
 template <class T, size_t N>
 struct serializer<std::array<T, N>>
 {
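The new specialization follows the same protocol as the std::vector<T> serializer: it first exchanges the string length, then either ships the whole character buffer in a single MPI call (when CharT maps to a native MPI_Datatype) or falls back to element-wise sends. A minimal usage sketch, assuming the serializer lives under distributed::comm::serializer (suggested by the comm::serializer::get_type_registry() call in work_queue below, but not confirmed by this diff alone):

// sketch: broadcasting a std::string with the new serializer specialization
// (namespace and include path are assumptions, not verified here)
#include <iostream>
#include <string>
#include <mpi.h>
#include <libdistributed_comm.h>

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  std::string message;
  if (rank == 0) message = "hello from rank 0";

  // the length is broadcast first, receivers resize, then the buffer follows
  distributed::comm::serializer::serializer<std::string>::bcast(message, 0, MPI_COMM_WORLD);

  std::cout << "rank " << rank << ": " << message << "\n";
  MPI_Finalize();
  return 0;
}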

include/libdistributed_task_manager.h (+6, -1)
@@ -32,7 +32,7 @@ class StopToken {
  * A distributed token which can be used to request stopping computation on a
  * work_queue
  */
-template <class RequestType>
+template <class RequestType, class CommunicatorType>
 class TaskManager: public StopToken {
   public:
   virtual ~TaskManager()=default;
@@ -42,6 +42,11 @@
    * \param[in] request the request that you would like the master to enqueue
    */
   virtual void push(RequestType const& request)=0;
+
+  /**
+   * Request a sub-communicator for the current process group
+   */
+  virtual CommunicatorType get_subcommunicator()=0;
 };

 }
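Because TaskManager is now templated on CommunicatorType, a worker can retrieve the communicator spanning its own work group and run a nested parallel step inside each task. A sketch of such a worker, using the TaskManager<RequestType, MPI_Comm> instantiation that work_queue uses below; the int task type and the reduction are illustrative only:

#include <mpi.h>
#include <libdistributed_work_queue.h>

// illustrative worker: the whole work group collaborates on each task
// through the group's sub-communicator
auto worker_fn = [](int task, distributed::queue::TaskManager<int, MPI_Comm>& manager) {
  MPI_Comm group = manager.get_subcommunicator();
  int group_rank = 0, group_size = 1;
  MPI_Comm_rank(group, &group_rank);
  MPI_Comm_size(group, &group_size);

  // placeholder computation: each member computes a share, then the group
  // combines partial results; the returned value becomes the task's response
  int partial = task * (group_rank + 1);
  int combined = 0;
  MPI_Allreduce(&partial, &combined, 1, MPI_INT, MPI_SUM, group);
  return combined;
};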

include/libdistributed_work_queue.h (+34, -16)
@@ -4,6 +4,7 @@
 #include <utility>
 #include "libdistributed_types.h"
 #include "libdistributed_task_manager.h"
+#include "libdistributed_work_queue_options.h"
 #include "libdistributed_work_queue_impl.h"

 /**
@@ -44,41 +45,58 @@ namespace queue {
  *
  * \see distributed::queue::TaskManager<RequestType> for details on the semantics about cancellation
  */
-template <class TaskForwardIt, class WorkerFunction, class MasterFunction>
+template <class TaskRandomIt, class WorkerFunction, class MasterFunction>
 void work_queue (
-  MPI_Comm comm,
-  TaskForwardIt tasks_begin,
-  TaskForwardIt tasks_end,
+  work_queue_options<typename impl::iterator_to_value_type<TaskRandomIt>::type> const& options,
+  TaskRandomIt tasks_begin,
+  TaskRandomIt tasks_end,
   WorkerFunction worker_fn,
   MasterFunction master_fn
 ) {
   //setup communicator
-  int rank, size;
-  MPI_Comm queue_comm;
-  MPI_Comm_dup(comm, &queue_comm);
-  MPI_Comm_rank(queue_comm, &rank);
-  MPI_Comm_size(queue_comm, &size);

-  using RequestType = typename impl::iterator_to_value_type<TaskForwardIt>::type;
+  using RequestType = typename impl::iterator_to_value_type<TaskRandomIt>::type;
   using ResponseType = decltype( impl::maybe_stop_token( worker_fn,
         std::declval<RequestType>(),
-        std::declval<TaskManager<RequestType>&>()
+        std::declval<TaskManager<RequestType, MPI_Comm>&>()
       )
   );
+
+
+  if(options.get_queue_size() > 1) {
+    //create sub-communicators
+    const int rank = options.get_queue_rank();
+    auto groups = options.get_groups();
+    MPI_Comm subcomm;
+    MPI_Comm_split(
+      options.get_native_queue_comm(),
+      groups[rank],
+      rank,
+      &subcomm
+    );
+    int subrank;
+    MPI_Comm_rank(subcomm, &subrank);

-  if(size > 1) {
     //determine the request and response types from the input
-    if(rank == 0) {
-      impl::master<RequestType, ResponseType>(queue_comm, tasks_begin, tasks_end, master_fn);
+    if(options.is_master()) {
+      if(subrank == 0) {
+        impl::master_main<RequestType, ResponseType>(subcomm, tasks_begin, tasks_end, master_fn, options);
+      } else {
+        impl::master_aux<RequestType, ResponseType>(subcomm, master_fn, options);
+      }
     } else {
-      impl::worker<RequestType, ResponseType>(queue_comm, worker_fn);
+      if(subrank == 0) {
+        impl::worker_main<RequestType, ResponseType>(subcomm, worker_fn, options);
+      } else {
+        impl::worker_aux<RequestType, ResponseType>(subcomm, worker_fn, options);
+      }
     }
+    MPI_Comm_free(&subcomm);

   } else {
     impl::no_workers<RequestType, ResponseType>(tasks_begin, tasks_end, master_fn, worker_fn);
   }
   comm::serializer::get_type_registry().clear();
-  MPI_Comm_free(&queue_comm);
 }

 }
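Callers now pass a work_queue_options object instead of a raw MPI_Comm. The options carry the communicator plus the grouping used by MPI_Comm_split above: every rank whose groups[rank] color matches lands in the same sub-communicator, so the master group and each worker group can hold multiple processes. A hedged end-to-end sketch follows; the work_queue_options constructor taking an MPI_Comm and the master callback signature are assumptions made to illustrate the call shape, not the verified API of libdistributed_work_queue_options.h:

// sketch: driving the new work_queue overload; option setters for group
// sizes and the root process would come from the customization points
// this commit adds (names unverified here)
#include <vector>
#include <mpi.h>
#include <libdistributed_work_queue.h>

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);

  std::vector<int> tasks = {1, 2, 3, 4};

  // assumed convenience construction from a communicator
  distributed::queue::work_queue_options<int> options(MPI_COMM_WORLD);

  distributed::queue::work_queue(
      options, tasks.begin(), tasks.end(),
      // worker: produce one response per task
      [](int task, distributed::queue::TaskManager<int, MPI_Comm>& manager) {
        return task * task;
      },
      // master: called as responses arrive (signature assumed; it may also
      // accept a TaskManager& so the master can push follow-up tasks)
      [](int response) {
        // aggregate or log `response` here
      });

  MPI_Finalize();
  return 0;
}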
