From d611df49c4821d9a229e49e640e259f150d7a990 Mon Sep 17 00:00:00 2001 From: Christopher Canel Date: Wed, 1 May 2024 00:03:25 +0000 Subject: [PATCH] Use queues in libratemon_interp --- ratemon/runtime/libratemon_interp.cpp | 91 +++++++++++++-------------- 1 file changed, 42 insertions(+), 49 deletions(-) diff --git a/ratemon/runtime/libratemon_interp.cpp b/ratemon/runtime/libratemon_interp.cpp index 9606497..bb6385e 100644 --- a/ratemon/runtime/libratemon_interp.cpp +++ b/ratemon/runtime/libratemon_interp.cpp @@ -17,8 +17,8 @@ #include #include -#include -#include +#include +#include #include #include @@ -28,17 +28,16 @@ volatile bool setup = false; volatile bool skipped_first = false; +// Protects active_fds_queue and paused_fds_queue. +std::mutex lock; +std::queue active_fds_queue; +std::queue paused_fds_queue; + // BPF things. struct ratemon_maps_bpf *skel = NULL; // struct bpf_map *flow_to_rwnd = NULL; int flow_to_rwnd_fd = 0; -// File descriptors of flows that are allowed to send and therefore do not have -// an entry in flow_to_rwnd. -boost::unordered::concurrent_flat_set active_fds; -// File descriptors of flows that are paused and therefore have an entry of 0 B -// in flow_to_rwnd. -boost::unordered::concurrent_flat_set paused_fds; // Maps file descriptor to flow struct. boost::unordered::concurrent_flat_map fd_to_flow; @@ -64,15 +63,9 @@ inline void trigger_ack(int fd) { } void thread_func() { - // Variables used during each scheduling epoch. - // Currently active flows, soon to be previously active. - std::vector prev_active_fds; - // Length of prev_active_fds, which is the number of flows to be unpaused. - size_t num_to_find = 0; // Previously paused flows that will be activated. std::vector new_active_fds; // Preallocate suffient space. - prev_active_fds.reserve(max_active_flows); new_active_fds.reserve(max_active_flows); if (max_active_flows == 0 || epoch_us == 0) { @@ -98,39 +91,35 @@ void thread_func() { // If fewer than the max number of flows exist and they are all active, then // there is no need for scheduling. - if (active_fds.size() < max_active_flows && paused_fds.size() == 0) { + if (active_fds_queue.size() < max_active_flows && + paused_fds_queue.size() == 0) { // RM_PRINTF("WARNING insufficient flows, skipping scheduling\n"); continue; } RM_PRINTF("Performing scheduling\n"); - - prev_active_fds.clear(); new_active_fds.clear(); - - // Make a copy of the currently (soon-to-be previously) active flows. - active_fds.visit_all([&](const int &fd) { prev_active_fds.push_back(fd); }); - - // For each previously active flow, try to find a paused flow to replace it. - num_to_find = prev_active_fds.size(); - paused_fds.visit_while([&](const int &x) { - if (new_active_fds.size() < num_to_find) { - new_active_fds.push_back(x); - return true; - } else { - return false; + lock.lock(); + + // Try to find max_active_flows FDs to unpause. + while (!paused_fds_queue.empty() and + new_active_fds.size() < max_active_flows) { + // If we still know about this flow, then we can activate it. + int next_fd = paused_fds_queue.front(); + if (fd_to_flow.visit(next_fd, [](const auto &) {})) { + paused_fds_queue.pop(); + new_active_fds.push_back(next_fd); } - }); + } + auto num_prev_active = active_fds_queue.size(); // For each of the flows chosen to be activated, add it to the active set - // and remove it from both the paused set and the RWND map. Trigger an ACK - // to wake it up. Note that twice the allowable number of flows will be - // active briefly. + // and remove it from the RWND map. Trigger an ACK to wake it up. Note that + // twice the allowable number of flows will be active briefly. RM_PRINTF("Activating %lu flows: ", new_active_fds.size()); for (const auto &fd : new_active_fds) { RM_PRINTF("%d ", fd); - active_fds.insert(fd); - paused_fds.erase(fd); + active_fds_queue.push(fd); fd_to_flow.visit(fd, [](const auto &p) { bpf_map_delete_elem(flow_to_rwnd_fd, &p.second); }); @@ -138,14 +127,15 @@ void thread_func() { } RM_PRINTF("\n"); - // For each fo the previously active flows, add it to the paused set, remove - // it from the active set, install an RWND mapping to actually pause it, and - // trigger an ACK to communicate the new RWND value. - RM_PRINTF("Pausing %lu flows: ", prev_active_fds.size()); - for (const auto &fd : prev_active_fds) { + // For each fo the previously active flows, add it to the paused set, + // install an RWND mapping to actually pause it, and trigger an ACK to + // communicate the new RWND value. + RM_PRINTF("Pausing %lu flows: ", num_prev_active); + for (unsigned long i = 0; i < num_prev_active; i++) { + int fd = active_fds_queue.front(); + active_fds_queue.pop(); RM_PRINTF("%d ", fd); - paused_fds.insert(fd); - active_fds.erase(fd); + paused_fds_queue.push(fd); fd_to_flow.visit(fd, [](const auto &p) { bpf_map_update_elem(flow_to_rwnd_fd, &p.second, &zero, BPF_ANY); }); @@ -154,8 +144,7 @@ void thread_func() { } RM_PRINTF("\n"); - // Clear temporary data structures. - prev_active_fds.clear(); + lock.unlock(); new_active_fds.clear(); fflush(stdout); @@ -270,17 +259,21 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { // flow.local_addr, flow.local_port); fd_to_flow.insert({new_fd, flow}); + lock.lock(); + // Should this flow be active or paused? - if (active_fds.size() < max_active_flows) { + if (active_fds_queue.size() < max_active_flows) { // Less than the max number of flows are active, so make this one active. - active_fds.insert(new_fd); + active_fds_queue.push(new_fd); } else { // The max number of flows are active already, so pause this one. - paused_fds.insert(new_fd); + paused_fds_queue.push(new_fd); // Pausing a flow means retting its RWND to 0 B. bpf_map_update_elem(flow_to_rwnd_fd, &flow, &zero, BPF_ANY); } + lock.unlock(); + RM_PRINTF("Successful 'accept' for FD=%d, got FD=%d\n", sockfd, new_fd); return new_fd; } @@ -299,14 +292,14 @@ int close(int sockfd) { return ret; } - active_fds.erase(sockfd); - paused_fds.erase(sockfd); // To get the flow struct for this FD, we must use visit() to look it up // in the concurrent_flat_map. Obviously, do this before removing the FD // from fd_to_flow. fd_to_flow.visit(sockfd, [](const auto &p) { bpf_map_delete_elem(flow_to_rwnd_fd, &p.second); }); + // Removing the FD from fd_to_flow triggers it to be (eventually) removed from + // scheduling. fd_to_flow.erase(sockfd); RM_PRINTF("Successful 'close' for FD=%d\n", sockfd);