diff --git a/benchmark/hermes_api_bench.cc b/benchmark/hermes_api_bench.cc
index 47c2f8547..f1a8f6644 100644
--- a/benchmark/hermes_api_bench.cc
+++ b/benchmark/hermes_api_bench.cc
@@ -26,7 +26,7 @@ void GatherTimes(std::string test_name, size_t io_size, MpiTimer &t) {
   if (t.rank_ == 0) {
     double max = t.GetSec();
     double mbps = io_size / t.GetUsec();
-    HIPRINT("{}: Time: {} sec, MBps (or MOps): {}, Count: {}, Nprocs: {}\n",
+    HILOG(kInfo, "{}: Time: {} sec, MBps (or MOps): {}, Count: {}, Nprocs: {}\n",
            test_name, max, mbps, io_size, t.nprocs_);
   }
 }
@@ -41,6 +41,7 @@ void PutTest(int nprocs, int rank,
   t.Resume();
   for (int j = 0; j < repeat; ++j) {
     for (size_t i = 0; i < blobs_per_rank; ++i) {
+      // HILOG(kInfo, "On blob {}", i);
       size_t blob_name_int = rank * blobs_per_rank + i;
       std::string name = std::to_string(blob_name_int);
       bkt.AsyncPut(name, blob, ctx);
diff --git a/hrun/include/hrun/queue_manager/queues/hshm_queue.h b/hrun/include/hrun/queue_manager/queues/hshm_queue.h
index 907ce422d..11db153a1 100644
--- a/hrun/include/hrun/queue_manager/queues/hshm_queue.h
+++ b/hrun/include/hrun/queue_manager/queues/hshm_queue.h
@@ -239,10 +239,20 @@ struct MultiQueueT : public hipc::ShmContainer {

   /** Emplace a SHM pointer to a task */
   HSHM_ALWAYS_INLINE
-  bool Emplace(u32 prio, u32 lane_hash, hipc::Pointer &p, bool complete = false) {
+  bool Emplace(u32 prio, u32 lane_hash,
+               hipc::Pointer &p, bool complete = false) {
     return Emplace(prio, lane_hash, LaneData(p, complete));
   }

+  /**
+   * Emplace a SHM pointer to a task only if the lane is at most 50% full
+   */
+  HSHM_ALWAYS_INLINE
+  bool EmplaceFrac(u32 prio, u32 lane_hash,
+                   hipc::Pointer &p, bool complete = false) {
+    return EmplaceFrac(prio, lane_hash, LaneData(p, complete));
+  }
+
   /** Emplace a SHM pointer to a task */
   bool Emplace(u32 prio, u32 lane_hash, const LaneData &data) {
     if (IsEmplacePlugged()) {
@@ -255,6 +265,23 @@ struct MultiQueueT : public hipc::ShmContainer {
     return !ret.IsNull();
   }

+  /**
+   * Emplace a SHM pointer to a task only if the lane is at most 50% full
+   */
+  bool EmplaceFrac(u32 prio, u32 lane_hash, const LaneData &data) {
+    if (IsEmplacePlugged()) {
+      WaitForEmplacePlug();
+    }
+    LaneGroup &lane_group = GetGroup(prio);
+    u32 lane_id = lane_hash % lane_group.num_lanes_;
+    Lane &lane = GetLane(lane_group, lane_id);
+    if (lane.GetSize() * 2 > lane.GetDepth()) {
+      return false;
+    }
+    hshm::qtok_t ret = lane.emplace(data);
+    return !ret.IsNull();
+  }
+
   /**
    * Change the number of active lanes
    * This assumes that PlugForResize and UnplugForResize are called externally.
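Note (EmplaceFrac): unlike Emplace, this rejects the task rather than enqueueing it once the destination lane passes 50% utilization (lane.GetSize() * 2 > lane.GetDepth()), so every caller must handle a false return. A minimal caller-side sketch, not part of this patch; the helper name, retry bound, and MultiQueue pointer are illustrative assumptions:

```cpp
// Hypothetical submit helper built on the EmplaceFrac API added above.
// EmplaceFrac() returns false when the lane is more than half full, so we
// yield and retry instead of blocking inside the queue's emplace path.
bool SubmitWithBackpressure(MultiQueue *queue, u32 prio, u32 lane_hash,
                            hipc::Pointer &task_shm, size_t max_retries) {
  for (size_t i = 0; i < max_retries; ++i) {
    if (queue->EmplaceFrac(prio, lane_hash, task_shm)) {
      return true;   // enqueued while below the 50% watermark
    }
    HERMES_THREAD_MODEL->Yield();  // let a worker drain the lane
  }
  return false;      // lane still congested; caller decides how to proceed
}
```

The 50% threshold presumably leaves headroom so internal follow-up tasks can still be emplaced while external producers are throttled.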
diff --git a/hrun/include/hrun/queue_manager/queues/mpsc_queue.h b/hrun/include/hrun/queue_manager/queues/mpsc_queue.h
index abf459b32..46dee8a76 100644
--- a/hrun/include/hrun/queue_manager/queues/mpsc_queue.h
+++ b/hrun/include/hrun/queue_manager/queues/mpsc_queue.h
@@ -175,7 +175,8 @@ class mpsc_queue : public ShmContainer {
     // Check if there's space in the queue.
     if (size > queue_->size()) {
-      HILOG(kInfo, "Queue {}/{} is full, waiting for space", id_, queue_->size());
+      HILOG(kDebug, "Queue {}/{} is full, waiting for space",
+            id_, queue_->size());
       while (true) {
         head = head_.load();
         size = tail - head + 1;
@@ -184,7 +185,7 @@ class mpsc_queue : public ShmContainer {
         }
         HERMES_THREAD_MODEL->Yield();
       }
-      HILOG(kInfo, "Queue {}/{} got scheduled", id_, queue_->size());
+      HILOG(kDebug, "Queue {}/{} got scheduled", id_, queue_->size());
     }

     // Emplace into queue at our slot
@@ -284,6 +285,22 @@ class mpsc_queue : public ShmContainer {
       return qtok_t::GetNull();
     }
   }
+
+  /** Current size of queue (approximate under concurrent access) */
+  size_t GetSize() {
+    size_t tail = tail_.load();
+    size_t head = head_.load();
+    if (tail <= head) {
+      return 0;
+    } else {
+      return tail - head;
+    }
+  }
+
+  /** Max depth of queue */
+  size_t GetDepth() {
+    return (*queue_).size();
+  }
 };

}  // namespace hshm::ipc
diff --git a/hrun/tasks_required/proc_queue/src/proc_queue.cc b/hrun/tasks_required/proc_queue/src/proc_queue.cc
index 9a4af3378..993c85ac4 100644
--- a/hrun/tasks_required/proc_queue/src/proc_queue.cc
+++ b/hrun/tasks_required/proc_queue/src/proc_queue.cc
@@ -48,8 +48,11 @@ class Server : public TaskLib {
         task->is_fire_forget_ = true;
       }
       MultiQueue *real_queue = HRUN_CLIENT->GetQueue(QueueId(ptr->task_state_));
-      real_queue->Emplace(ptr->prio_, ptr->lane_hash_, task->sub_run_.shm_);
-      task->phase_ = PushTaskPhase::kWaitSchedule;
+      bool ret = real_queue->EmplaceFrac(
+          ptr->prio_, ptr->lane_hash_, task->sub_run_.shm_);
+      if (ret) {
+        task->phase_ = PushTaskPhase::kWaitSchedule;
+      }
     }
     case PushTaskPhase::kWaitSchedule: {
       Task *&ptr = task->sub_run_.ptr_;
diff --git a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h
index 6f911e863..3cb367c9d 100644
--- a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h
+++ b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h
@@ -424,9 +424,10 @@ class Client : public TaskLibClient {

   /** Initialize automatic flushing */
   void AsyncFlushDataConstruct(FlushDataTask *task,
-                               const TaskNode &task_node) {
+                               const TaskNode &task_node,
+                               size_t period_ms) {
     HRUN_CLIENT->ConstructTask<FlushDataTask>(
-        task, task_node, id_);
+        task, task_node, id_, period_ms);
   }
   HRUN_TASK_NODE_PUSH_ROOT(FlushData);
diff --git a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h
index 77eff2907..f3307a772 100644
--- a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h
+++ b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h
@@ -1158,7 +1158,8 @@ struct FlushDataTask : public Task, TaskFlags {
   HSHM_ALWAYS_INLINE explicit
   FlushDataTask(hipc::Allocator *alloc,
                 const TaskNode &task_node,
-                const TaskStateId &state_id) : Task(alloc) {
+                const TaskStateId &state_id,
+                size_t period_ms) : Task(alloc) {
     // Initialize task
     task_node_ = task_node;
     lane_hash_ = 0;
@@ -1171,7 +1172,7 @@ struct FlushDataTask : public Task, TaskFlags {
                 TASK_LONG_RUNNING |
                 TASK_COROUTINE |
                 TASK_REMOTE_DEBUG_MARK);
-    SetPeriodSec(5);  // TODO(llogan): don't hardcode this
+    SetPeriodMs((double)period_ms);
     domain_id_ = DomainId::GetLocal();
   }
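Note (proc_queue.cc hunk above): EmplaceFrac can now fail, and the kSchedule case deliberately has no break, so the fallthrough deserves a word. If the emplace is rejected, phase_ stays at kSchedule; execution still falls into kWaitSchedule this round, but a subtask that was never emplaced cannot report completion, so the task just returns and retries the emplace on its next poll. A simplified sketch of that control flow; variable names and the IsComplete() check are assumptions inferred from the hunk, not verbatim source:

```cpp
// Simplified control flow of the PushTask state machine after this patch.
switch (task->phase_) {
  case PushTaskPhase::kSchedule: {
    // EmplaceFrac refuses the subtask when the lane is over half full.
    if (real_queue->EmplaceFrac(prio, lane_hash, task->sub_run_.shm_)) {
      task->phase_ = PushTaskPhase::kWaitSchedule;
    }
    // No break: intentional fallthrough into kWaitSchedule.
  }
  case PushTaskPhase::kWaitSchedule: {
    Task *&ptr = task->sub_run_.ptr_;
    if (!ptr->IsComplete()) {
      return;  // not scheduled yet (or still running); poll again later
    }
    // ... propagate results, then finish the PushTask ...
    task->SetModuleComplete();
  }
}
```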
diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc
index a13d3f36b..9576d390b 100644
--- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc
+++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc
@@ -139,7 +139,8 @@ class Server : public TaskLib {
     bkt_mdm_.Init(task->bkt_mdm_, HRUN_ADMIN->queue_id_);
     stager_mdm_.Init(task->stager_mdm_, HRUN_ADMIN->queue_id_);
     op_mdm_.Init(task->op_mdm_, HRUN_ADMIN->queue_id_);
-    flush_task_ = blob_mdm_.AsyncFlushData(task->task_node_ + 1);
+    flush_task_ = blob_mdm_.AsyncFlushData(
+        task->task_node_ + 1, HERMES_SERVER_CONF.borg_.flush_period_);
   }
   task->SetModuleComplete();
 }
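Note (flush period): the flush interval is now plumbed from the server configuration instead of the hardcoded SetPeriodSec(5). Mind the unit change: the task applies the value with SetPeriodMs, so borg_.flush_period_ must be expressed in milliseconds. A sketch of the path the value takes; the config field name comes from this patch, while the YAML spelling is an assumption:

```cpp
// 1. Server config (assumed YAML key: buffer_organizer.flush_period, in ms)
//    is parsed into HERMES_SERVER_CONF.borg_.flush_period_.
size_t period_ms = HERMES_SERVER_CONF.borg_.flush_period_;
// 2. The blob MDM server forwards it when spawning the long-running task.
flush_task_ = blob_mdm_.AsyncFlushData(task->task_node_ + 1, period_ms);
// 3. FlushDataTask's constructor applies it as the polling interval:
//    SetPeriodMs((double)period_ms);
```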