diff --git a/examples/analytical_apps/run_app_opt.h b/examples/analytical_apps/run_app_opt.h index 23c358da..7004e49d 100644 --- a/examples/analytical_apps/run_app_opt.h +++ b/examples/analytical_apps/run_app_opt.h @@ -120,6 +120,72 @@ void RunUndirectedPageRankOpt(const CommSpec& comm_spec, } } +template +void RunUndirectedPageRankOptString(const CommSpec& comm_spec, + const std::string& out_prefix, + const ParallelEngineSpec& spec, + double delta, int mr) { + timer_next("load graph"); + LoadGraphSpec graph_spec = DefaultLoadGraphSpec(); + graph_spec.set_directed(FLAGS_directed); + graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor); + graph_spec.load_concurrency = FLAGS_load_concurrency; + if (FLAGS_deserialize) { + graph_spec.set_deserialize(true, FLAGS_serialization_prefix); + } + if (FLAGS_serialize) { + graph_spec.set_serialize(true, FLAGS_serialization_prefix); + } + graph_spec.partitioner_type = + parse_partitioner_type_name(FLAGS_partitioner_type); + graph_spec.idxer_type = parse_idxer_type_name(FLAGS_idxer_type); + + // using FRAG_T = ImmutableEdgecutFragment; + std::shared_ptr fragment = + LoadGraph(FLAGS_efile, FLAGS_vfile, comm_spec, graph_spec); + bool push; + if (fragment->fnum() >= 8) { + uint64_t local_ivnum = fragment->GetInnerVerticesNum(); + uint64_t local_ovnum = fragment->GetOuterVerticesNum(); + uint64_t total_ivnum, total_ovnum; + MPI_Allreduce(&local_ivnum, &total_ivnum, 1, MPI_UINT64_T, MPI_SUM, + comm_spec.comm()); + MPI_Allreduce(&local_ovnum, &total_ovnum, 1, MPI_UINT64_T, MPI_SUM, + comm_spec.comm()); + + double avg_degree = static_cast(FLAGS_edge_num) / + static_cast(FLAGS_vertex_num); + double rate = + static_cast(total_ovnum) / static_cast(total_ivnum); + + if (rate < 0.5) { + // not to many outer vertices + push = true; + } else if (avg_degree > 60) { + // dense + push = true; + } else { + push = false; + } + } else { + push = true; + } + + if (!push) { + using AppType = PageRankOpt; + auto app = std::make_shared(); + DoQuery(fragment, app, comm_spec, spec, + out_prefix, delta, mr); + } else { + using AppType = PageRankPushOpt; + auto app = std::make_shared(); + DoQuery(fragment, app, comm_spec, spec, + out_prefix, delta, mr); + } +} + template std::pair get_min_max_id(const VERTEX_MAP_T& vm) { fid_t fnum = vm.GetFragmentNum(); @@ -287,6 +353,37 @@ void CreateAndQueryOpt(const CommSpec& comm_spec, const std::string& out_prefix, args...); } +template class APP_T, typename... Args> +void CreateAndQueryOptString(const CommSpec& comm_spec, + const std::string& out_prefix, + const ParallelEngineSpec& spec, Args... args) { + timer_next("load graph"); + LoadGraphSpec graph_spec = DefaultLoadGraphSpec(); + graph_spec.set_directed(FLAGS_directed); + graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor); + graph_spec.load_concurrency = FLAGS_load_concurrency; + if (FLAGS_deserialize) { + graph_spec.set_deserialize(true, FLAGS_serialization_prefix); + } + if (FLAGS_serialize) { + graph_spec.set_serialize(true, FLAGS_serialization_prefix); + } + graph_spec.partitioner_type = + parse_partitioner_type_name(FLAGS_partitioner_type); + graph_spec.idxer_type = parse_idxer_type_name(FLAGS_idxer_type); + + using FRAG_T = + ImmutableEdgecutFragment; + std::shared_ptr fragment = + LoadGraph(FLAGS_efile, FLAGS_vfile, comm_spec, graph_spec); + using AppType = APP_T; + auto app = std::make_shared(); + DoQuery(fragment, app, comm_spec, spec, out_prefix, + args...); +} + template class APP1_T, template class APP2_T, typename... 
Args> @@ -391,6 +488,29 @@ void RunOpt() { RunUndirectedPageRankOpt( comm_spec, out_prefix, spec, FLAGS_pr_d, FLAGS_pr_mr); } + } else if (name == "pagerank_str") { + if (FLAGS_directed) { + if (FLAGS_partitioner_type == "default") { + FLAGS_partitioner_type = "hash"; + } + if (FLAGS_idxer_type == "default") { + FLAGS_idxer_type = "pthash"; + } + CreateAndQueryOptString( + comm_spec, out_prefix, spec, FLAGS_pr_d, FLAGS_pr_mr); + } else { + FLAGS_rebalance = true; + FLAGS_rebalance_vertex_factor = 0; + if (FLAGS_partitioner_type == "default") { + FLAGS_partitioner_type = "segment"; + } + if (FLAGS_idxer_type == "default") { + FLAGS_idxer_type = "sorted_array"; + } + RunUndirectedPageRankOptString( + comm_spec, out_prefix, spec, FLAGS_pr_d, FLAGS_pr_mr); + } } else if (name == "cdlp") { if (FLAGS_directed) { FLAGS_directed = false; diff --git a/grape/communication/shuffle.h b/grape/communication/shuffle.h index ab3ee77b..5559795c 100644 --- a/grape/communication/shuffle.h +++ b/grape/communication/shuffle.h @@ -306,6 +306,14 @@ void foreach_helper(const Tuple& t, const Func& func, } } +template +void range_foreach_helper(const Tuple& t, size_t begin, size_t end, + const Func& func, index_sequence) { + for (size_t i = begin; i < end; ++i) { + func(get_const_buffer(t)[i]...); + } +} + template void foreach_rval_helper(Tuple& t, const Func& func, index_sequence) { size_t size = t.size(); @@ -314,11 +322,25 @@ void foreach_rval_helper(Tuple& t, const Func& func, index_sequence) { } } +template +void range_foreach_rval_helper(Tuple& t, size_t begin, size_t end, + const Func& func, index_sequence) { + for (size_t i = begin; i < end; ++i) { + func(std::move(get_buffer(t)[i])...); + } +} + template void foreach(Tuple& t, const Func& func) { foreach_helper(t, func, make_index_sequence{}); } +template +void range_foreach_rval(Tuple& t, size_t begin, size_t end, const Func& func) { + range_foreach_rval_helper(t, begin, end, func, + make_index_sequence{}); +} + template void foreach_rval(Tuple& t, const Func& func) { foreach_rval_helper(t, func, make_index_sequence{}); diff --git a/grape/fragment/basic_efile_fragment_loader.h b/grape/fragment/basic_efile_fragment_loader.h index ad7af090..ca54c70d 100644 --- a/grape/fragment/basic_efile_fragment_loader.h +++ b/grape/fragment/basic_efile_fragment_loader.h @@ -27,6 +27,15 @@ limitations under the License. 
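Note: `RunUndirectedPageRankOptString` above chooses between the push-based and pull-based PageRank variants from three global statistics: the fragment count, the average degree, and the outer/inner vertex ratio. Below is a minimal standalone sketch of that decision; `GraphStats` and `choose_push` are illustrative names, and only the thresholds (`fnum >= 8`, ratio `< 0.5`, average degree `> 60`) come from the code in this patch.

```cpp
// Sketch of the push/pull selection used in RunUndirectedPageRankOptString.
// GraphStats and choose_push are hypothetical names for illustration only.
#include <cstdint>

struct GraphStats {
  uint64_t total_inner_vertices;  // sum of GetInnerVerticesNum() over workers
  uint64_t total_outer_vertices;  // sum of GetOuterVerticesNum() over workers
  uint64_t vertex_num;            // FLAGS_vertex_num
  uint64_t edge_num;              // FLAGS_edge_num
  int fnum;                       // number of fragments
};

// Returns true when the push-based variant is expected to be cheaper.
inline bool choose_push(const GraphStats& s) {
  if (s.fnum < 8) {
    return true;  // few fragments: pick push unconditionally
  }
  double avg_degree =
      static_cast<double>(s.edge_num) / static_cast<double>(s.vertex_num);
  double outer_ratio = static_cast<double>(s.total_outer_vertices) /
                       static_cast<double>(s.total_inner_vertices);
  // Not too many outer vertices, or a dense graph: push is preferred;
  // otherwise fall back to the pull-based PageRankOpt.
  return outer_ratio < 0.5 || avg_degree > 60;
}
```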
namespace grape { +inline size_t custom_hash(size_t val) { + val = (val ^ 61) ^ (val >> 16); + val = val + (val << 3); + val = val ^ (val >> 4); + val = val * 0x27d4eb2d; + val = val ^ (val >> 15); + return val; +} + template class BasicEFileFragmentLoader : public BasicFragmentLoaderBase { using fragment_t = FRAG_T; @@ -129,49 +138,119 @@ class BasicEFileFragmentLoader : public BasicFragmentLoaderBase { } } else { std::atomic idx(0); - size_t buf_num = got_edges_.size(); std::vector> vertices(concurrency_); + std::vector> vertices_mat(concurrency_ * + concurrency_); std::vector threads; + std::vector thread_time(concurrency_, 0); for (int i = 0; i < concurrency_; ++i) { threads.emplace_back( [&, this](int tid) { + double tt = -grape::GetCurrentTime(); fid_t fid = comm_spec_.fid(); - auto& vec = vertices[tid]; - while (true) { - size_t cur = idx.fetch_add(1); - if (cur >= buf_num) { - break; + // auto& vec = vertices[tid]; + for (auto& buffer : got_edges_) { + size_t size = buffer.size(); + size_t chunk = (size + concurrency_ - 1) / concurrency_; + size_t start = std::min(size, chunk * tid); + size_t end = std::min(size, start + chunk); + if (spec_.idxer_type == IdxerType::kLocalIdxer) { + range_foreach_helper( + buffer, start, end, + [&](const internal_oid_t& src, + const internal_oid_t& dst) { + int src_hash = + custom_hash(std::hash()(src)) % + concurrency_; + vertices_mat[tid * concurrency_ + src_hash] + .emplace_back(src); + int dst_hash = + custom_hash(std::hash()(dst)) % + concurrency_; + vertices_mat[tid * concurrency_ + dst_hash] + .emplace_back(dst); + // vec.emplace_back(src); + // vec.emplace_back(dst); + }, + make_index_sequence<2>{}); + } else { + range_foreach_helper( + buffer, start, end, + [&](const internal_oid_t& src, + const internal_oid_t& dst) { + if (builder.get_fragment_id(src) == fid) { + int src_hash = + custom_hash(std::hash()(src)) % + concurrency_; + vertices_mat[tid * concurrency_ + src_hash] + .emplace_back(src); + // vec.emplace_back(src); + } + if (builder.get_fragment_id(dst) == fid) { + int dst_hash = + custom_hash(std::hash()(dst)) % + concurrency_; + vertices_mat[tid * concurrency_ + dst_hash] + .emplace_back(dst); + // vec.emplace_back(dst); + } + }, + make_index_sequence<2>{}); } - auto& buffer = got_edges_[cur]; - foreach_helper( - buffer, - [&](const internal_oid_t& src, - const internal_oid_t& dst) { - if (builder.get_fragment_id(src) == fid) { - vec.emplace_back(src); - } - if (builder.get_fragment_id(dst) == fid) { - vec.emplace_back(dst); - } - }, - make_index_sequence<2>{}); } - DistinctSort(vec); + // DistinctSort(vec); + tt += grape::GetCurrentTime(); + thread_time[tid] = tt; }, i); } for (auto& thrd : threads) { thrd.join(); } + show_thread_timing(thread_time, "parse vertices"); + std::vector aggregate_threads; + for (int i = 0; i < concurrency_; ++i) { + aggregate_threads.emplace_back( + [&, this](int tid) { + double tt = -grape::GetCurrentTime(); + auto& vec = vertices[tid]; + for (int j = 0; j < concurrency_; ++j) { + vec.insert(vec.end(), + vertices_mat[j * concurrency_ + tid].begin(), + vertices_mat[j * concurrency_ + tid].end()); + } + DistinctSort(vec); + tt += grape::GetCurrentTime(); + thread_time[tid] = tt; + }, + i); + } + for (auto& thrd : aggregate_threads) { + thrd.join(); + } + show_thread_timing(thread_time, "aggregate vertices"); + // TODO(luoxiaojian): parallelize this part + double tx = -grape::GetCurrentTime(); for (auto& vec : vertices) { for (auto& v : vec) { builder.add_vertex(v); } } + tx += grape::GetCurrentTime(); + 
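Note: the reworked vertex pass in `BasicEFileFragmentLoader` avoids locks by sharding candidate ids into a `concurrency x concurrency` matrix: writer thread `tid` only appends to row `tid` and picks the column with `custom_hash`, and a second pass lets thread `tid` concatenate and deduplicate column `tid`. A self-contained sketch of that two-phase pattern follows, with plain `std::string` ids and `std::sort`/`std::unique` standing in for `DistinctSort`.

```cpp
// Self-contained sketch of the two-phase, hash-sharded collection pattern
// used in BasicEFileFragmentLoader above. Types are simplified (std::string
// ids, a plain std::vector input); custom_hash is copied from the patch.
#include <algorithm>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <vector>

inline size_t custom_hash(size_t val) {
  val = (val ^ 61) ^ (val >> 16);
  val = val + (val << 3);
  val = val ^ (val >> 4);
  val = val * 0x27d4eb2d;
  val = val ^ (val >> 15);
  return val;
}

std::vector<std::vector<std::string>> collect_distinct(
    const std::vector<std::string>& ids, int concurrency) {
  // Phase 1: thread tid writes only into row tid, so no synchronization is
  // needed; the column is chosen by hashing the id.
  std::vector<std::vector<std::string>> mat(concurrency * concurrency);
  std::vector<std::thread> threads;
  for (int tid = 0; tid < concurrency; ++tid) {
    threads.emplace_back([&, tid]() {
      size_t chunk = (ids.size() + concurrency - 1) / concurrency;
      size_t begin = std::min(ids.size(), chunk * tid);
      size_t end = std::min(ids.size(), begin + chunk);
      for (size_t i = begin; i < end; ++i) {
        int col = custom_hash(std::hash<std::string>()(ids[i])) % concurrency;
        mat[tid * concurrency + col].emplace_back(ids[i]);
      }
    });
  }
  for (auto& t : threads) t.join();

  // Phase 2: thread tid owns column tid, so equal ids always land in the
  // same output vector and can be deduplicated locally.
  std::vector<std::vector<std::string>> out(concurrency);
  threads.clear();
  for (int tid = 0; tid < concurrency; ++tid) {
    threads.emplace_back([&, tid]() {
      auto& vec = out[tid];
      for (int row = 0; row < concurrency; ++row) {
        auto& cell = mat[row * concurrency + tid];
        vec.insert(vec.end(), cell.begin(), cell.end());
      }
      std::sort(vec.begin(), vec.end());
      vec.erase(std::unique(vec.begin(), vec.end()), vec.end());
    });
  }
  for (auto& t : threads) t.join();
  return out;
}
```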
LOG(INFO) << "[worker-" << comm_spec_.worker_id() + << "] finished adding vertices, time: " << tx << " s"; } + double ty = -grape::GetCurrentTime(); builder.finish(comm_spec_, *vm_ptr); + ty += grape::GetCurrentTime(); + LOG(INFO) << "[worker-" << comm_spec_.worker_id() + << "] finished building vertex map, time: " << ty << " s"; } + MPI_Barrier(comm_spec_.comm()); t0 += grape::GetCurrentTime(); + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished constructing vertex_map, time: " << t0 << " s"; + } double t1 = -grape::GetCurrentTime(); std::vector> processed_edges; @@ -195,39 +274,51 @@ class BasicEFileFragmentLoader : public BasicFragmentLoaderBase { total += buffers.size(); } processed_edges.resize(total); - std::atomic idx(0); - size_t buf_num = got_edges_.size(); std::vector threads; + std::vector thread_time(concurrency_, 0); for (int i = 0; i < concurrency_; ++i) { - threads.emplace_back([&, this]() { - while (true) { - size_t cur = idx.fetch_add(1); - if (cur >= buf_num) { - break; - } - auto& buffer = got_edges_[cur]; - size_t offset = offsets[cur]; - foreach_rval(buffer, [&](internal_oid_t&& src, internal_oid_t&& dst, - edata_t&& data) { - vid_t src_gid, dst_gid; - if (vm_ptr->GetGid(oid_t(src), src_gid) && - vm_ptr->GetGid(oid_t(dst), dst_gid)) { - processed_edges[offset] = - Edge(src_gid, dst_gid, std::move(data)); - } else { - processed_edges[offset] = Edge( - std::numeric_limits::max(), - std::numeric_limits::max(), std::move(data)); + threads.emplace_back( + [&, this](int tid) { + double tt = -grape::GetCurrentTime(); + size_t global_offset = 0; + for (auto& buffer : got_edges_) { + size_t size = buffer.size(); + size_t chunk = (size + concurrency_ - 1) / concurrency_; + size_t start = std::min(size, chunk * tid); + size_t end = std::min(size, start + chunk); + size_t local_offset = global_offset + start; + global_offset += size; + range_foreach_rval( + buffer, start, end, + [&](internal_oid_t&& src, internal_oid_t&& dst, + edata_t&& data) { + vid_t src_gid, dst_gid; + if (vm_ptr->GetGidFromInternalOid(src, src_gid) && + vm_ptr->GetGidFromInternalOid(dst, dst_gid)) { + processed_edges[local_offset++] = Edge( + src_gid, dst_gid, std::move(data)); + } else { + processed_edges[local_offset++] = Edge( + std::numeric_limits::max(), + std::numeric_limits::max(), std::move(data)); + } + }); } - }); - } - }); + tt += grape::GetCurrentTime(); + thread_time[tid] = tt; + }, + i); } for (auto& thrd : threads) { thrd.join(); } + show_thread_timing(thread_time, "construct edges"); } + MPI_Barrier(comm_spec_.comm()); t1 += grape::GetCurrentTime(); + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished parsing edges, time: " << t1 << " s"; + } double t2 = -grape::GetCurrentTime(); fragment = std::make_shared(); @@ -239,11 +330,11 @@ class BasicEFileFragmentLoader : public BasicFragmentLoaderBase { fragment->ParallelInit(comm_spec_, spec_.directed, std::move(vm_ptr), fake_vertices, processed_edges, concurrency_); } + MPI_Barrier(comm_spec_.comm()); t2 += grape::GetCurrentTime(); - LOG(INFO) << "[worker-" << comm_spec_.worker_id() - << "] basic loader: construct vertices time: " << t0 - << " s, construct edges: " << t1 - << " s, construct fragment: " << t2 << " s"; + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished initializing fragment, time: " << t2 << " s"; + } if (!std::is_same::value) { this->InitOuterVertexData(fragment); diff --git a/grape/fragment/csr_edgecut_fragment_base.h b/grape/fragment/csr_edgecut_fragment_base.h index 13ce0292..d3bd5135 100644 --- 
a/grape/fragment/csr_edgecut_fragment_base.h +++ b/grape/fragment/csr_edgecut_fragment_base.h @@ -411,9 +411,11 @@ class CSREdgecutFragmentBase using base_t::IsInnerVertexGid; using base_t::IsInnerVertexLid; using base_t::OuterVertexGid2Lid; - void buildCSR(const typename csr_builder_t::vertex_range_t& vertex_range, + using vertices_t = typename TRAITS_T::vertices_t; + + void buildCSR(const vertices_t& vertex_range, std::vector>& edges, - LoadStrategy load_strategy) { + LoadStrategy load_strategy, int concurrency = 1) { csr_builder_t ie_builder, oe_builder; ie_builder.init(vertex_range); oe_builder.init(vertex_range); @@ -497,42 +499,6 @@ class CSREdgecutFragmentBase ie_builder.inc_degree(e.dst); } }; - if (load_strategy == LoadStrategy::kOnlyIn) { - if (this->directed_) { - for (auto& e : edges) { - parse_iter_in(e); - } - } else { - for (auto& e : edges) { - parse_iter_in_undirected(e); - } - } - } else if (load_strategy == LoadStrategy::kOnlyOut) { - if (this->directed_) { - for (auto& e : edges) { - parse_iter_out(e); - } - } else { - for (auto& e : edges) { - parse_iter_out_undirected(e); - } - } - } else if (load_strategy == LoadStrategy::kBothOutIn) { - if (this->directed_) { - for (auto& e : edges) { - parse_iter_out_in(e); - } - } else { - for (auto& e : edges) { - parse_iter_out_in_undirected(e); - } - } - } else { - LOG(FATAL) << "Invalid load strategy"; - } - - ie_builder.build_offsets(); - oe_builder.build_offsets(); auto insert_iter_in = [&](const Edge& e) { if (e.src != invalid_vid) { @@ -593,137 +559,47 @@ class CSREdgecutFragmentBase } }; - if (load_strategy == LoadStrategy::kOnlyIn) { - if (this->directed_) { - for (auto& e : edges) { - insert_iter_in(e); - } - } else { - for (auto& e : edges) { - insert_iter_in_undirected(e); - } - } - } else if (load_strategy == LoadStrategy::kOnlyOut) { - if (this->directed_) { - for (auto& e : edges) { - insert_iter_out(e); - } - } else { - for (auto& e : edges) { - insert_iter_out_undirected(e); - } - } - } else if (load_strategy == LoadStrategy::kBothOutIn) { - if (this->directed_) { - for (auto& e : edges) { - insert_iter_out_in(e); - } - } else { - for (auto& e : edges) { - insert_iter_out_in_undirected(e); - } - } - } else { - LOG(FATAL) << "Invalid load strategy"; - } - - ie_builder.finish(ie_); - oe_builder.finish(oe_); - } - - void parallelBuildCSR( - const typename csr_builder_t::vertex_range_t& vertex_range, - std::vector>& edges, LoadStrategy load_strategy, - int concurrency) { - csr_builder_t ie_builder, oe_builder; - ie_builder.init(vertex_range); - oe_builder.init(vertex_range); - - static constexpr VID_T invalid_vid = std::numeric_limits::max(); - auto parse_iter_in = [&](Edge& e) { - if (e.src != invalid_vid) { - if (IsInnerVertexGid(e.src)) { - InnerVertexGid2Lid(e.src, e.src); - } else { - CHECK(OuterVertexGid2Lid(e.src, e.src)); - oe_builder.inc_degree(e.src); - } - InnerVertexGid2Lid(e.dst, e.dst); - ie_builder.inc_degree(e.dst); - } - }; - auto parse_iter_out = [&](Edge& e) { - if (e.src != invalid_vid) { - InnerVertexGid2Lid(e.src, e.src); - oe_builder.inc_degree(e.src); - if (IsInnerVertexGid(e.dst)) { - InnerVertexGid2Lid(e.dst, e.dst); - } else { - CHECK(OuterVertexGid2Lid(e.dst, e.dst)); - ie_builder.inc_degree(e.dst); - } - } - }; - auto parse_iter_out_in = [&](Edge& e) { - if (e.src != invalid_vid) { - Gid2Lid(e.src, e.src); - oe_builder.inc_degree(e.src); - Gid2Lid(e.dst, e.dst); - ie_builder.inc_degree(e.dst); - } - }; - auto parse_iter_in_undirected = [&](Edge& e) { - if (e.src != invalid_vid) { - if 
(IsInnerVertexGid(e.src)) { - InnerVertexGid2Lid(e.src, e.src); - ie_builder.inc_degree(e.src); - } else { - CHECK(OuterVertexGid2Lid(e.src, e.src)); - oe_builder.inc_degree(e.src); - } - if (IsInnerVertexGid(e.dst)) { - InnerVertexGid2Lid(e.dst, e.dst); - ie_builder.inc_degree(e.dst); + if (concurrency == 1) { + if (load_strategy == LoadStrategy::kOnlyIn) { + if (this->directed_) { + for (auto& e : edges) { + parse_iter_in(e); + } } else { - CHECK(OuterVertexGid2Lid(e.dst, e.dst)); - oe_builder.inc_degree(e.dst); + for (auto& e : edges) { + parse_iter_in_undirected(e); + } } - } - }; - auto parse_iter_out_undirected = [&](Edge& e) { - if (e.src != invalid_vid) { - if (IsInnerVertexGid(e.src)) { - InnerVertexGid2Lid(e.src, e.src); - oe_builder.inc_degree(e.src); + } else if (load_strategy == LoadStrategy::kOnlyOut) { + if (this->directed_) { + for (auto& e : edges) { + parse_iter_out(e); + } } else { - CHECK(OuterVertexGid2Lid(e.src, e.src)); - ie_builder.inc_degree(e.src); + for (auto& e : edges) { + parse_iter_out_undirected(e); + } } - if (IsInnerVertexGid(e.dst)) { - InnerVertexGid2Lid(e.dst, e.dst); - oe_builder.inc_degree(e.dst); + } else if (load_strategy == LoadStrategy::kBothOutIn) { + if (this->directed_) { + for (auto& e : edges) { + parse_iter_out_in(e); + } } else { - CHECK(OuterVertexGid2Lid(e.dst, e.dst)); - ie_builder.inc_degree(e.dst); + for (auto& e : edges) { + parse_iter_out_in_undirected(e); + } } + } else { + LOG(FATAL) << "Invalid load strategy"; } - }; - auto parse_iter_out_in_undirected = [&](Edge& e) { - if (e.src != invalid_vid) { - Gid2Lid(e.src, e.src); - oe_builder.inc_degree(e.src); - ie_builder.inc_degree(e.src); - Gid2Lid(e.dst, e.dst); - oe_builder.inc_degree(e.dst); - ie_builder.inc_degree(e.dst); - } - }; - - { + } else { std::vector threads; + std::vector thread_time(concurrency, 0); for (int i = 0; i < concurrency; ++i) { threads.emplace_back( [&, this](int tid) { + double tt = -grape::GetCurrentTime(); size_t batch = (edges.size() + concurrency - 1) / concurrency; size_t begin = std::min(batch * tid, edges.size()); size_t end = std::min(begin + batch, edges.size()); @@ -760,81 +636,61 @@ class CSREdgecutFragmentBase } else { LOG(FATAL) << "Invalid load strategy"; } + tt += grape::GetCurrentTime(); + thread_time[tid] = tt; }, i); } for (auto& thrd : threads) { thrd.join(); } + show_thread_timing(thread_time, "inc degree"); } ie_builder.build_offsets(); oe_builder.build_offsets(); - auto insert_iter_in = [&](const Edge& e) { - if (e.src != invalid_vid) { - ie_builder.add_edge(e.dst, nbr_t(e.src, e.edata)); - if (!IsInnerVertexLid(e.src)) { - oe_builder.add_edge(e.src, nbr_t(e.dst, e.edata)); - } - } - }; - auto insert_iter_out = [&](const Edge& e) { - if (e.src != invalid_vid) { - oe_builder.add_edge(e.src, nbr_t(e.dst, e.edata)); - if (!IsInnerVertexLid(e.dst)) { - ie_builder.add_edge(e.dst, nbr_t(e.src, e.edata)); - } - } - }; - auto insert_iter_out_in = [&](const Edge& e) { - if (e.src != invalid_vid) { - ie_builder.add_edge(e.dst, nbr_t(e.src, e.edata)); - oe_builder.add_edge(e.src, nbr_t(e.dst, e.edata)); - } - }; - auto insert_iter_in_undirected = [&](const Edge& e) { - if (e.src != invalid_vid) { - if (IsInnerVertexLid(e.src)) { - ie_builder.add_edge(e.src, nbr_t(e.dst, e.edata)); - } else { - oe_builder.add_edge(e.src, nbr_t(e.dst, e.edata)); - } - if (IsInnerVertexLid(e.dst)) { - ie_builder.add_edge(e.dst, nbr_t(e.src, e.edata)); + if (concurrency == 1) { + if (load_strategy == LoadStrategy::kOnlyIn) { + if (this->directed_) { + for (auto& e : 
edges) { + insert_iter_in(e); + } } else { - oe_builder.add_edge(e.dst, nbr_t(e.src, e.edata)); + for (auto& e : edges) { + insert_iter_in_undirected(e); + } } - } - }; - auto insert_iter_out_undirected = [&](const Edge& e) { - if (e.src != invalid_vid) { - if (IsInnerVertexLid(e.src)) { - oe_builder.add_edge(e.src, nbr_t(e.dst, e.edata)); + } else if (load_strategy == LoadStrategy::kOnlyOut) { + if (this->directed_) { + for (auto& e : edges) { + insert_iter_out(e); + } } else { - ie_builder.add_edge(e.src, nbr_t(e.dst, e.edata)); + for (auto& e : edges) { + insert_iter_out_undirected(e); + } } - if (IsInnerVertexLid(e.dst)) { - oe_builder.add_edge(e.dst, nbr_t(e.src, e.edata)); + } else if (load_strategy == LoadStrategy::kBothOutIn) { + if (this->directed_) { + for (auto& e : edges) { + insert_iter_out_in(e); + } } else { - ie_builder.add_edge(e.dst, nbr_t(e.src, e.edata)); + for (auto& e : edges) { + insert_iter_out_in_undirected(e); + } } + } else { + LOG(FATAL) << "Invalid load strategy"; } - }; - auto insert_iter_out_in_undirected = [&](const Edge& e) { - if (e.src != invalid_vid) { - ie_builder.add_edge(e.dst, nbr_t(e.src, e.edata)); - ie_builder.add_edge(e.src, nbr_t(e.dst, e.edata)); - oe_builder.add_edge(e.src, nbr_t(e.dst, e.edata)); - oe_builder.add_edge(e.dst, nbr_t(e.src, e.edata)); - } - }; - - { + } else { std::vector threads; + std::vector thread_time(concurrency, 0); for (int i = 0; i < concurrency; ++i) { threads.emplace_back( [&, this](int tid) { + double tt = -grape::GetCurrentTime(); size_t batch = (edges.size() + concurrency - 1) / concurrency; size_t begin = std::min(batch * tid, edges.size()); size_t end = std::min(begin + batch, edges.size()); @@ -871,16 +727,19 @@ class CSREdgecutFragmentBase } else { LOG(FATAL) << "Invalid load strategy"; } + tt += grape::GetCurrentTime(); + thread_time[tid] = tt; }, i); } for (auto& thrd : threads) { thrd.join(); } + show_thread_timing(thread_time, "insert edge"); } - ie_builder.finish(ie_); - oe_builder.finish(oe_); + ie_builder.finish(ie_, concurrency); + oe_builder.finish(oe_, concurrency); } template diff --git a/grape/fragment/ev_fragment_loader.h b/grape/fragment/ev_fragment_loader.h index cf7cd433..be5c6be6 100644 --- a/grape/fragment/ev_fragment_loader.h +++ b/grape/fragment/ev_fragment_loader.h @@ -114,8 +114,10 @@ class EVFragmentLoader { } } - double t0 = -grape::GetCurrentTime(); if (!vfile.empty()) { + MPI_Barrier(comm_spec_.comm()); + double t0 = -grape::GetCurrentTime(); + auto io_adaptor = std::unique_ptr(new IOADAPTOR_T(vfile)); io_adaptor->SetPartialRead(comm_spec_.worker_id(), comm_spec_.worker_num()); @@ -141,12 +143,26 @@ class EVFragmentLoader { basic_fragment_loader_->AddVertex(vertex_id, v_data); } io_adaptor->Close(); - } - basic_fragment_loader_->ConstructVertices(); - t0 += grape::GetCurrentTime(); + MPI_Barrier(comm_spec_.comm()); + t0 += grape::GetCurrentTime(); + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished reading vertices inputs, time: " << t0 << " s"; + } + + double t1 = -grape::GetCurrentTime(); + basic_fragment_loader_->ConstructVertices(); + + MPI_Barrier(comm_spec_.comm()); + t1 += grape::GetCurrentTime(); + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished constructing vertices, time: " << t1 << " s"; + } + } else { + basic_fragment_loader_->ConstructVertices(); + } - double t1 = -grape::GetCurrentTime(); + double t2 = -grape::GetCurrentTime(); { auto io_adaptor = std::unique_ptr(new IOADAPTOR_T(std::string(efile))); @@ -178,19 +194,20 @@ class EVFragmentLoader { } 
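Note: the parallel loops above share one slicing scheme: each thread takes a contiguous chunk of size `(n + concurrency - 1) / concurrency`; the vertex and edge passes then walk their slice with the new `range_foreach` helpers from shuffle.h, while `buildCSR` indexes the edge vector directly. The sketch below is a simplified analogue over a `std::tuple` of `std::vector`s; grape's `ShuffleBufferTuple`/`get_buffer` machinery is not reproduced.

```cpp
// Simplified analogue of range_foreach over a tuple of parallel buffers,
// plus the chunk/start/end slicing used by each worker thread.
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

template <typename Tuple, typename Func, std::size_t... Is>
void range_foreach_impl(const Tuple& t, size_t begin, size_t end,
                        const Func& func, std::index_sequence<Is...>) {
  for (size_t i = begin; i < end; ++i) {
    func(std::get<Is>(t)[i]...);  // pass the i-th element of every buffer
  }
}

template <typename Func, typename... Ts>
void range_foreach(const std::tuple<std::vector<Ts>...>& t, size_t begin,
                   size_t end, const Func& func) {
  range_foreach_impl(t, begin, end, func, std::index_sequence_for<Ts...>{});
}

int main() {
  std::vector<std::string> srcs = {"a", "b", "c", "d"};
  std::vector<std::string> dsts = {"b", "c", "d", "a"};
  auto edges = std::make_tuple(srcs, dsts);

  size_t size = std::get<0>(edges).size();
  int concurrency = 2, tid = 1;
  // Same slicing as in the loader: each thread takes one contiguous chunk.
  size_t chunk = (size + concurrency - 1) / concurrency;
  size_t start = std::min(size, chunk * tid);
  size_t end = std::min(size, start + chunk);
  range_foreach(edges, start, end,
                [](const std::string& src, const std::string& dst) {
                  std::cout << src << " -> " << dst << "\n";
                });
  return 0;
}
```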
io_adaptor->Close(); } - t1 += grape::GetCurrentTime(); - - VLOG(1) << "[worker-" << comm_spec_.worker_id() - << "] finished add vertices and edges"; + MPI_Barrier(comm_spec_.comm()); + t2 += grape::GetCurrentTime(); + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished reading edges inputs, time: " << t2 << " s"; + } - double t2 = -grape::GetCurrentTime(); + double t3 = -grape::GetCurrentTime(); basic_fragment_loader_->ConstructFragment(fragment); - t2 += grape::GetCurrentTime(); + MPI_Barrier(comm_spec_.comm()); + t3 += grape::GetCurrentTime(); - LOG(INFO) << "[worker-" << comm_spec_.worker_id() - << "] load graph: construct vertices time: " << t0 - << "s, construct edges time: " << t1 - << ", construct fragment time: " << t2 << "s"; + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished constructing fragment, time: " << t3 << " s"; + } if (spec.serialize) { bool serialized = SerializeFragment( diff --git a/grape/fragment/fragment_base.h b/grape/fragment/fragment_base.h index d56e053e..11d88e4e 100644 --- a/grape/fragment/fragment_base.h +++ b/grape/fragment/fragment_base.h @@ -183,6 +183,14 @@ class FragmentBase { return oid; } + using internal_id_t = typename InternalOID::type; + + internal_id_t GetInternalId(const Vertex& v) const { + internal_id_t oid{}; + vm_ptr_->GetInternalOid(Vertex2Gid(v), oid); + return oid; + } + OID_T Gid2Oid(VID_T gid) const { OID_T oid; vm_ptr_->GetOid(gid, oid); diff --git a/grape/fragment/immutable_edgecut_fragment.h b/grape/fragment/immutable_edgecut_fragment.h index 5c529388..838a3051 100644 --- a/grape/fragment/immutable_edgecut_fragment.h +++ b/grape/fragment/immutable_edgecut_fragment.h @@ -59,7 +59,7 @@ struct ImmutableEdgecutFragmentTraits { using fragment_const_adj_list_t = ConstAdjList; using csr_t = ImmutableCSR>; - using csr_builder_t = ImmutableCSRBuild>; + using csr_builder_t = ImmutableCSRParallelBuilder>; using mirror_vertices_t = std::vector>; }; @@ -157,7 +157,6 @@ class ImmutableEdgecutFragment using base_t::buildCSR; using base_t::init; using base_t::IsInnerVertexGid; - using base_t::parallelBuildCSR; static std::string type_info() { std::string ret = "ImmutableEdgecutFragment<"; @@ -372,7 +371,6 @@ class ImmutableEdgecutFragment double t0 = -grape::GetCurrentTime(); static constexpr VID_T invalid_vid = std::numeric_limits::max(); { - std::vector outer_vertices; auto iter_in = [&](Edge& e, std::vector& outer_vertices) { if (IsInnerVertexGid(e.dst)) { @@ -437,9 +435,11 @@ class ImmutableEdgecutFragment std::vector> outer_vertices_vec(concurrency); std::vector threads; + std::vector thread_time(concurrency, 0); for (int i = 0; i < concurrency; ++i) { threads.emplace_back( [&, this](int tid) { + double tt = -grape::GetCurrentTime(); size_t batch = (edges.size() + concurrency - 1) / concurrency; size_t begin = std::min(batch * tid, edges.size()); size_t end = std::min(begin + batch, edges.size()); @@ -472,12 +472,16 @@ class ImmutableEdgecutFragment LOG(FATAL) << "Invalid load strategy"; } DistinctSort(vec); + tt += grape::GetCurrentTime(); + thread_time[tid] = tt; }, i); } for (auto& thrd : threads) { thrd.join(); } + show_thread_timing(thread_time, "construct outer vertices time"); + std::vector outer_vertices; for (auto& vec : outer_vertices_vec) { outer_vertices.insert(outer_vertices.end(), vec.begin(), vec.end()); } @@ -503,7 +507,7 @@ class ImmutableEdgecutFragment t1 += grape::GetCurrentTime(); double t2 = -grape::GetCurrentTime(); - parallelBuildCSR(this->Vertices(), edges, load_strategy, concurrency); + 
buildCSR(this->Vertices(), edges, load_strategy, concurrency); t2 += grape::GetCurrentTime(); double t3 = -grape::GetCurrentTime(); @@ -529,8 +533,6 @@ class ImmutableEdgecutFragment LOG(INFO) << "[frag-" << fid_ << "] construct vertices time: " << t0 << ", init time: " << t1 << ", build csr time: " << t2 << ", init outer vertices time: " << t3; - LOG(INFO) << "[frag-" << fid_ << "] ivnum: " << ivnum_ - << ", ovnum: " << ovnum_ << ", edge_num: " << this->GetEdgeNum(); } template diff --git a/grape/graph/de_mutable_csr.h b/grape/graph/de_mutable_csr.h index cea96b61..011b7f90 100644 --- a/grape/graph/de_mutable_csr.h +++ b/grape/graph/de_mutable_csr.h @@ -84,15 +84,15 @@ class DeMutableCSRBuilder> { } } - void finish(DeMutableCSR>& ret) { + void finish(DeMutableCSR>& ret, int concurrency) { ret.min_id_ = min_id_; ret.max_id_ = max_id_; ret.max_head_id_ = max_head_id_; ret.min_tail_id_ = min_tail_id_; ret.dedup_ = dedup_; - head_builder_.finish(ret.head_); - tail_builder_.finish(ret.tail_); + head_builder_.finish(ret.head_, concurrency); + tail_builder_.finish(ret.tail_, concurrency); if (dedup_) { VID_T head_num = ret.head_.vertex_num(); diff --git a/grape/graph/immutable_csr.h b/grape/graph/immutable_csr.h index a19b2c48..a3f81be6 100644 --- a/grape/graph/immutable_csr.h +++ b/grape/graph/immutable_csr.h @@ -33,15 +33,15 @@ template class ImmutableCSR; template -class ImmutableCSRBuild { +class ImmutableCSRParallelBuilder { using vid_t = VID_T; using nbr_t = NBR_T; public: using vertex_range_t = VertexRange; - ImmutableCSRBuild() {} - ~ImmutableCSRBuild() {} + ImmutableCSRParallelBuilder() {} + ~ImmutableCSRParallelBuilder() {} void init(VID_T vnum) { vnum_ = vnum; @@ -101,9 +101,33 @@ class ImmutableCSRBuild { } } - void finish(ImmutableCSR& ret) { - for (VID_T i = 0; i < vnum_; ++i) { - std::sort(offsets_[i], offsets_[i + 1]); + void finish(ImmutableCSR& ret, int concurrency) { + if (concurrency == 1) { + for (VID_T i = 0; i < vnum_; ++i) { + std::sort(offsets_[i], offsets_[i + 1]); + } + } else { + std::vector threads; + std::atomic offset(0); + static constexpr VID_T chunk = 4096; + for (int i = 0; i < concurrency; ++i) { + threads.emplace_back([this, &offset]() { + while (true) { + VID_T begin = std::min(offset.fetch_add(chunk), vnum_); + VID_T end = std::min(begin + chunk, vnum_); + if (begin == end) { + break; + } + while (begin < end) { + std::sort(offsets_[begin], offsets_[begin + 1]); + ++begin; + } + } + }); + } + for (auto& thrd : threads) { + thrd.join(); + } } ret.edges_.swap(edges_); @@ -120,6 +144,90 @@ class ImmutableCSRBuild { std::vector> iter_; }; +template +class ImmutableCSRBuilder { + using vid_t = VID_T; + using nbr_t = NBR_T; + + public: + using vertex_range_t = VertexRange; + + ImmutableCSRBuilder() {} + ~ImmutableCSRBuilder() {} + + void init(VID_T vnum) { + vnum_ = vnum; + degree_.clear(); + degree_.resize(vnum, 0); + } + + void init(const VertexRange& range) { + assert(range.begin_value() == 0); + vnum_ = range.size(); + degree_.clear(); + degree_.resize(vnum_, 0); + } + + void inc_degree(VID_T i) { + if (i < vnum_) { + ++degree_[i]; + } + } + + void build_offsets() { + edge_num_ = 0; + for (auto d : degree_) { + edge_num_ += d; + } + edges_.clear(); + edges_.resize(edge_num_); + offsets_.clear(); + offsets_.resize(vnum_ + 1); + offsets_[0] = edges_.data(); + for (VID_T i = 0; i < vnum_; ++i) { + offsets_[i + 1] = offsets_[i] + degree_[i]; + } + CHECK_EQ(offsets_[vnum_], edges_.data() + edge_num_); + { + std::vector tmp; + tmp.swap(degree_); + } + iter_ = 
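Note: `ImmutableCSRParallelBuilder::finish` sorts every vertex's adjacency range; when `concurrency > 1` it hands out fixed-size chunks of vertex ids through a shared `std::atomic` counter so threads stay balanced under skewed degree distributions. A self-contained sketch of that dispatch, with a plain vector of adjacency lists standing in for the `offsets_`/`edges_` arrays:

```cpp
// Sketch of the chunked, atomic work distribution used in
// ImmutableCSRParallelBuilder::finish (nested vectors stand in for the
// CSR offset arrays of the real builder).
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

void parallel_sort_adj_lists(std::vector<std::vector<uint32_t>>& adj,
                             int concurrency) {
  const uint32_t vnum = static_cast<uint32_t>(adj.size());
  static constexpr uint32_t chunk = 4096;  // same chunk size as the patch
  std::atomic<uint32_t> offset(0);
  std::vector<std::thread> threads;
  for (int i = 0; i < concurrency; ++i) {
    threads.emplace_back([&]() {
      while (true) {
        // Each thread grabs the next unclaimed chunk of vertex ids; small
        // chunks keep the load balanced when a few vertices are very dense.
        uint32_t begin = std::min(offset.fetch_add(chunk), vnum);
        uint32_t end = std::min(begin + chunk, vnum);
        if (begin == end) {
          break;
        }
        for (uint32_t v = begin; v < end; ++v) {
          std::sort(adj[v].begin(), adj[v].end());
        }
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
}
```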
offsets_; + } + + void add_edge(VID_T src, const nbr_t& nbr) { + if (src < vnum_) { + nbr_t* ptr = iter_[src]++; + *ptr = nbr; + } + } + + template + void sort(const FUNC_T& func) { + for (VID_T i = 0; i < vnum_; ++i) { + std::sort(offsets_[i], offsets_[i + 1], func); + } + } + + void finish(ImmutableCSR& ret, int concurrency) { + for (VID_T i = 0; i < vnum_; ++i) { + std::sort(offsets_[i], offsets_[i + 1]); + } + + ret.edges_.swap(edges_); + ret.offsets_.swap(offsets_); + } + + private: + VID_T vnum_; + size_t edge_num_; + + Array> edges_; + Array> offsets_; + std::vector degree_; + Array> iter_; +}; + template class ImmutableCSRStreamBuilder { public: @@ -135,7 +243,7 @@ class ImmutableCSRStreamBuilder { degree_.push_back(degree); } - void finish(ImmutableCSR& ret) { + void finish(ImmutableCSR& ret, int concurrency) { ret.edges_.clear(); ret.edges_.resize(edges_.size()); std::copy(edges_.begin(), edges_.end(), ret.edges_.begin()); @@ -158,6 +266,7 @@ class ImmutableCSR { public: using vid_t = VID_T; using nbr_t = NBR_T; + using vertex_range_t = VertexRange; ImmutableCSR() { offsets_.resize(1); @@ -258,7 +367,10 @@ class ImmutableCSR { Array> offsets_; template - friend class ImmutableCSRBuild; + friend class ImmutableCSRBuilder; + + template + friend class ImmutableCSRParallelBuilder; template friend class ImmutableCSRStreamBuilder; diff --git a/grape/graph/mutable_csr.h b/grape/graph/mutable_csr.h index 6ca5467d..8d6524aa 100644 --- a/grape/graph/mutable_csr.h +++ b/grape/graph/mutable_csr.h @@ -112,7 +112,7 @@ class MutableCSRBuilder> { } } - void finish(MutableCSR>& ret) { + void finish(MutableCSR>& ret, int concurrency) { if (vnum_ == 0) { ret.capacity_.clear(); ret.prev_.clear(); diff --git a/grape/util.h b/grape/util.h index 5145f1ea..1960a35b 100644 --- a/grape/util.h +++ b/grape/util.h @@ -223,6 +223,19 @@ inline size_t get_available_memory() { #endif } +void show_thread_timing(const std::vector& thread_time, + const std::string& prefix) { + double total = 0, min_t = std::numeric_limits::max(), max_t = 0; + for (auto& t : thread_time) { + total += t; + min_t = std::min(min_t, t); + max_t = std::max(max_t, t); + } + double avg_t = total / thread_time.size(); + LOG(INFO) << prefix << " min: " << min_t << " max: " << max_t + << " avg: " << avg_t; +} + } // namespace grape #endif // GRAPE_UTIL_H_ diff --git a/grape/vertex_map/idxers/sorted_array_idxer.h b/grape/vertex_map/idxers/sorted_array_idxer.h index 0eaf48da..64f5e15f 100644 --- a/grape/vertex_map/idxers/sorted_array_idxer.h +++ b/grape/vertex_map/idxers/sorted_array_idxer.h @@ -80,12 +80,8 @@ class SortedArrayIdxer public: SortedArrayIdxer() {} - explicit SortedArrayIdxer( - Array>&& id_list) { - for (auto& id : id_list) { - id_list_.emplace_back(id); - } - } + explicit SortedArrayIdxer(StringViewVector&& id_list) + : id_list_(std::move(id_list)) {} ~SortedArrayIdxer() {} bool get_key(VID_T vid, internal_oid_t& oid) const override { @@ -159,6 +155,31 @@ class SortedArrayIdxerDummyBuilder : public IdxerBuilderBase { Array> id_list_; }; +template +class SortedArrayIdxerDummyBuilder + : public IdxerBuilderBase { + public: + using internal_oid_t = typename InternalOID::type; + void add(const internal_oid_t& oid) override {} + + std::unique_ptr> finish() override { + return std::unique_ptr>( + new SortedArrayIdxer(std::move(id_list_))); + } + + void sync_request(const CommSpec& comm_spec, int target, int tag) override { + sync_comm::Recv(id_list_, target, tag, comm_spec.comm()); + } + + void sync_response(const CommSpec& comm_spec, 
int source, int tag) override { + LOG(ERROR) << "SortedArrayIdxerDummyBuilder should not be used to sync " + "response"; + } + + private: + StringViewVector id_list_; +}; + template class SortedArrayIdxerBuilder : public IdxerBuilderBase { public: @@ -193,6 +214,61 @@ class SortedArrayIdxerBuilder : public IdxerBuilderBase { bool sorted_ = false; }; +template +class SortedArrayIdxerBuilder + : public IdxerBuilderBase { + public: + using internal_oid_t = typename InternalOID::type; + void add(const internal_oid_t& oid) override { + keys_.push_back(std::string(oid)); + // keys_.push_back(oid); + } + + std::unique_ptr> finish() override { + if (!sorted_) { + double t0 = -GetCurrentTime(); + DistinctSort(keys_); + t0 += GetCurrentTime(); + double t1 = -GetCurrentTime(); + for (auto& key : keys_) { + id_list_.emplace_back(key); + } + sorted_ = true; + t1 += GetCurrentTime(); + LOG(INFO) << "sort time: " << t0 << ", copy time: " << t1; + } + return std::unique_ptr>( + new SortedArrayIdxer(std::move(id_list_))); + } + + void sync_request(const CommSpec& comm_spec, int target, int tag) override { + LOG(ERROR) << "HashMapIdxerBuilder should not be used to sync request"; + } + + void sync_response(const CommSpec& comm_spec, int source, int tag) override { + if (!sorted_) { + double t0 = -GetCurrentTime(); + DistinctSort(keys_); + t0 += GetCurrentTime(); + double t1 = -GetCurrentTime(); + for (auto& key : keys_) { + id_list_.emplace_back(key); + } + sorted_ = true; + t1 += GetCurrentTime(); + LOG(INFO) << "[worker-" << comm_spec.worker_id() << "] sort time: " << t0 + << ", copy time: " << t1; + } + sync_comm::Send(id_list_, source, tag, comm_spec.comm()); + } + + private: + std::vector keys_; + // std::vector keys_; + StringViewVector id_list_; + bool sorted_ = false; +}; + } // namespace grape #endif // GRAPE_VERTEX_MAP_IDXERS_SORTED_ARRAY_IDXER_H_ diff --git a/grape/vertex_map/vertex_map.h b/grape/vertex_map/vertex_map.h index a1c385bd..e970567d 100644 --- a/grape/vertex_map/vertex_map.h +++ b/grape/vertex_map/vertex_map.h @@ -77,13 +77,25 @@ class VertexMap { return GetOid(fid, GetLidFromGid(gid), oid); } + bool GetInternalOid(const VID_T& gid, internal_oid_t& oid) const { + fid_t fid = GetFidFromGid(gid); + return GetInternalOid(fid, GetLidFromGid(gid), oid); + } + bool GetOid(fid_t fid, const VID_T& lid, OID_T& oid) const { internal_oid_t internal_oid; + if (GetInternalOid(fid, lid, internal_oid)) { + oid = InternalOID::FromInternal(internal_oid); + return true; + } + return false; + } + + bool GetInternalOid(fid_t fid, const VID_T& lid, internal_oid_t& oid) const { if (fid >= fnum_) { return false; } - if (idxers_[fid]->get_key(lid, internal_oid)) { - oid = InternalOID::FromInternal(internal_oid); + if (idxers_[fid]->get_key(lid, oid)) { return true; } return false; @@ -109,6 +121,26 @@ class VertexMap { return GetGid(fid, oid, gid); } + bool GetGidFromInternalOid(fid_t fid, const internal_oid_t& oid, + VID_T& gid) const { + if (fid >= fnum_) { + return false; + } + if (idxers_[fid]->get_index(oid, gid)) { + gid = Lid2Gid(fid, gid); + return true; + } + return false; + } + + bool GetGidFromInternalOid(const internal_oid_t& oid, VID_T& gid) const { + fid_t fid = partitioner_->GetPartitionId(oid); + if (fid == fnum_) { + return false; + } + return GetGidFromInternalOid(fid, oid, gid); + } + void reset() { idxers_.clear(); } void ExtendVertices(const CommSpec& comm_spec,
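Note: the new `GetInternalOid`/`GetGidFromInternalOid` overloads let callers that already hold an internal oid (a string view for string-keyed graphs) translate ids without materializing a `std::string`, which is what the edge pass in `BasicEFileFragmentLoader` now relies on. A minimal usage sketch, assuming the include path below and that `InternalOID<std::string>::type` is the view alias used elsewhere in the patch:

```cpp
// Minimal usage sketch of the internal-oid overloads on VertexMap
// (assumes a VertexMap<std::string, uint32_t> built elsewhere by a loader;
// the include path is the repository's usual layout).
#include <cstdint>
#include <string>

#include "grape/vertex_map/vertex_map.h"

using oid_view_t = grape::InternalOID<std::string>::type;

void translate(const grape::VertexMap<std::string, uint32_t>& vm,
               const oid_view_t& src) {
  uint32_t gid;
  // Resolve the id to a gid without building a temporary std::string key.
  if (vm.GetGidFromInternalOid(src, gid)) {
    oid_view_t key;
    // Map the gid back to a view of the stored key, again without copying.
    if (vm.GetInternalOid(gid, key)) {
      // `key` now references the id owned by the vertex map.
    }
  }
}
```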