Commit c783034: parallel load

luoxiaojian committed Sep 5, 2024
1 parent fd4f50e

Showing 13 changed files with 654 additions and 302 deletions.
120 changes: 120 additions & 0 deletions examples/analytical_apps/run_app_opt.h
@@ -120,6 +120,72 @@ void RunUndirectedPageRankOpt(const CommSpec& comm_spec,
}
}

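// String-OID variant of RunUndirectedPageRankOpt: loads the graph with
// std::string vertex ids and then selects a push- or pull-based PageRank
// implementation based on the shape of the loaded graph.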
template <LoadStrategy load_strategy>
void RunUndirectedPageRankOptString(const CommSpec& comm_spec,
const std::string& out_prefix,
const ParallelEngineSpec& spec,
double delta, int mr) {
timer_next("load graph");
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
}
if (FLAGS_serialize) {
graph_spec.set_serialize(true, FLAGS_serialization_prefix);
}
graph_spec.partitioner_type =
parse_partitioner_type_name(FLAGS_partitioner_type);
graph_spec.idxer_type = parse_idxer_type_name(FLAGS_idxer_type);

// using FRAG_T = ImmutableEdgecutFragment<int64_t, uint32_t, EmptyType,
using FRAG_T = ImmutableEdgecutFragment<std::string, uint32_t, EmptyType,
EmptyType, load_strategy>;
std::shared_ptr<FRAG_T> fragment =
LoadGraph<FRAG_T>(FLAGS_efile, FLAGS_vfile, comm_spec, graph_spec);
bool push;
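  // Heuristic for choosing between the push- and pull-based implementations:
  // with fewer than 8 fragments always push; otherwise push only when the
  // replication ratio (outer / inner vertices) is below 0.5 or the average
  // degree exceeds 60, and use the pull-based variant in all other cases.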
if (fragment->fnum() >= 8) {
uint64_t local_ivnum = fragment->GetInnerVerticesNum();
uint64_t local_ovnum = fragment->GetOuterVerticesNum();
uint64_t total_ivnum, total_ovnum;
MPI_Allreduce(&local_ivnum, &total_ivnum, 1, MPI_UINT64_T, MPI_SUM,
comm_spec.comm());
MPI_Allreduce(&local_ovnum, &total_ovnum, 1, MPI_UINT64_T, MPI_SUM,
comm_spec.comm());

double avg_degree = static_cast<double>(FLAGS_edge_num) /
static_cast<double>(FLAGS_vertex_num);
double rate =
static_cast<double>(total_ovnum) / static_cast<double>(total_ivnum);

if (rate < 0.5) {
// not too many outer vertices
push = true;
} else if (avg_degree > 60) {
// dense
push = true;
} else {
push = false;
}
} else {
push = true;
}

if (!push) {
using AppType = PageRankOpt<FRAG_T>;
auto app = std::make_shared<AppType>();
DoQuery<FRAG_T, AppType, double, int>(fragment, app, comm_spec, spec,
out_prefix, delta, mr);
} else {
using AppType = PageRankPushOpt<FRAG_T>;
auto app = std::make_shared<AppType>();
DoQuery<FRAG_T, AppType, double, int>(fragment, app, comm_spec, spec,
out_prefix, delta, mr);
}
}
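To put concrete numbers on the heuristic: with 16 fragments, FLAGS_edge_num = 2,000,000 and FLAGS_vertex_num = 100,000 give an average degree of 20, so a gathered outer/inner vertex ratio of, say, 0.8 selects the pull-based PageRankOpt; a ratio below 0.5, or an average degree above 60, would switch the run to PageRankPushOpt.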

template <typename VERTEX_MAP_T>
std::pair<int64_t, int64_t> get_min_max_id(const VERTEX_MAP_T& vm) {
fid_t fnum = vm.GetFragmentNum();
@@ -287,6 +353,37 @@ void CreateAndQueryOpt(const CommSpec& comm_spec, const std::string& out_prefix,
args...);
}

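// Same as CreateAndQueryOpt, but instantiates the fragment with std::string
// vertex ids (OID_T = std::string) so applications can run directly on
// string-keyed inputs.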
template <typename EDATA_T, LoadStrategy load_strategy,
template <class> class APP_T, typename... Args>
void CreateAndQueryOptString(const CommSpec& comm_spec,
const std::string& out_prefix,
const ParallelEngineSpec& spec, Args... args) {
timer_next("load graph");
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
}
if (FLAGS_serialize) {
graph_spec.set_serialize(true, FLAGS_serialization_prefix);
}
graph_spec.partitioner_type =
parse_partitioner_type_name(FLAGS_partitioner_type);
graph_spec.idxer_type = parse_idxer_type_name(FLAGS_idxer_type);

using FRAG_T =
ImmutableEdgecutFragment<std::string, uint32_t, grape::EmptyType, EDATA_T,
load_strategy>;
std::shared_ptr<FRAG_T> fragment =
LoadGraph<FRAG_T>(FLAGS_efile, FLAGS_vfile, comm_spec, graph_spec);
using AppType = APP_T<FRAG_T>;
auto app = std::make_shared<AppType>();
DoQuery<FRAG_T, AppType, Args...>(fragment, app, comm_spec, spec, out_prefix,
args...);
}

template <typename EDATA_T, LoadStrategy load_strategy,
template <class> class APP1_T, template <class> class APP2_T,
typename... Args>
@@ -391,6 +488,29 @@ void RunOpt() {
RunUndirectedPageRankOpt<LoadStrategy::kOnlyOut>(
comm_spec, out_prefix, spec, FLAGS_pr_d, FLAGS_pr_mr);
}
} else if (name == "pagerank_str") {
if (FLAGS_directed) {
if (FLAGS_partitioner_type == "default") {
FLAGS_partitioner_type = "hash";
}
if (FLAGS_idxer_type == "default") {
FLAGS_idxer_type = "pthash";
}
CreateAndQueryOptString<EmptyType, LoadStrategy::kBothOutIn,
PageRankDirected, double, int>(
comm_spec, out_prefix, spec, FLAGS_pr_d, FLAGS_pr_mr);
} else {
FLAGS_rebalance = true;
FLAGS_rebalance_vertex_factor = 0;
if (FLAGS_partitioner_type == "default") {
FLAGS_partitioner_type = "segment";
}
if (FLAGS_idxer_type == "default") {
FLAGS_idxer_type = "sorted_array";
}
RunUndirectedPageRankOptString<LoadStrategy::kOnlyOut>(
comm_spec, out_prefix, spec, FLAGS_pr_d, FLAGS_pr_mr);
}
} else if (name == "cdlp") {
if (FLAGS_directed) {
FLAGS_directed = false;
22 changes: 22 additions & 0 deletions grape/communication/shuffle.h
@@ -306,6 +306,14 @@ void foreach_helper(const Tuple& t, const Func& func,
}
}

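// Applies `func` to the records in positions [begin, end) of the buffer
// tuple, reading each field from the const buffers.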
template <typename Tuple, typename Func, std::size_t... index>
void range_foreach_helper(const Tuple& t, size_t begin, size_t end,
const Func& func, index_sequence<index...>) {
for (size_t i = begin; i < end; ++i) {
func(get_const_buffer<index>(t)[i]...);
}
}

template <typename Tuple, typename Func, std::size_t... index>
void foreach_rval_helper(Tuple& t, const Func& func, index_sequence<index...>) {
size_t size = t.size();
@@ -314,11 +322,25 @@ void foreach_rval_helper(Tuple& t, const Func& func, index_sequence<index...>) {
}
}

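// Same as range_foreach_helper, but moves each field out of its buffer so
// `func` receives rvalues.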
template <typename Tuple, typename Func, std::size_t... index>
void range_foreach_rval_helper(Tuple& t, size_t begin, size_t end,
const Func& func, index_sequence<index...>) {
for (size_t i = begin; i < end; ++i) {
func(std::move(get_buffer<index>(t)[i])...);
}
}

template <typename Tuple, typename Func>
void foreach(Tuple& t, const Func& func) {
foreach_helper(t, func, make_index_sequence<Tuple::tuple_size>{});
}

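// Range-restricted counterpart of foreach_rval: only the records in
// [begin, end) are consumed, e.g., so that several workers can drain
// disjoint slices of the same buffer tuple concurrently.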
template <typename Tuple, typename Func>
void range_foreach_rval(Tuple& t, size_t begin, size_t end, const Func& func) {
range_foreach_rval_helper(t, begin, end, func,
make_index_sequence<Tuple::tuple_size>{});
}
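
A minimal usage sketch of the new range variant (not part of this commit). It assumes the ShuffleBufferTuple type with its Emplace() and size() members defined earlier in this header, and splits one batch of buffered records into two ranges that separate threads could drain:

ShuffleBufferTuple<int64_t, int64_t> edges;  // assumed buffer-tuple type from this header
edges.Emplace(int64_t(0), int64_t(1));
edges.Emplace(int64_t(2), int64_t(3));
edges.Emplace(int64_t(4), int64_t(5));

size_t mid = edges.size() / 2;
// first worker consumes records [0, mid)
range_foreach_rval(edges, 0, mid, [](int64_t&& src, int64_t&& dst) {
  // use the moved-out (src, dst) pair, e.g., append the edge locally
});
// second worker consumes records [mid, size)
range_foreach_rval(edges, mid, edges.size(), [](int64_t&& src, int64_t&& dst) {
  // use the remaining pairs
});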

template <typename Tuple, typename Func>
void foreach_rval(Tuple& t, const Func& func) {
foreach_rval_helper(t, func, make_index_sequence<Tuple::tuple_size>{});
