Use queues in libratemon_interp
ccanel committed May 1, 2024
1 parent b08c8c9 commit d611df4
Showing 1 changed file with 42 additions and 49 deletions.
ratemon/runtime/libratemon_interp.cpp (91 changes: 42 additions, 49 deletions)
@@ -17,8 +17,8 @@

#include <boost/thread.hpp>
#include <boost/unordered/concurrent_flat_map.hpp>
#include <boost/unordered/concurrent_flat_set.hpp>
#include <unordered_set>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

@@ -28,17 +28,16 @@
volatile bool setup = false;
volatile bool skipped_first = false;

// Protects active_fds_queue and paused_fds_queue.
std::mutex lock;
std::queue<int> active_fds_queue;
std::queue<int> paused_fds_queue;

// BPF things.
struct ratemon_maps_bpf *skel = NULL;
// struct bpf_map *flow_to_rwnd = NULL;
int flow_to_rwnd_fd = 0;

// File descriptors of flows that are allowed to send and therefore do not have
// an entry in flow_to_rwnd.
boost::unordered::concurrent_flat_set<int> active_fds;
// File descriptors of flows that are paused and therefore have an entry of 0 B
// in flow_to_rwnd.
boost::unordered::concurrent_flat_set<int> paused_fds;
// Maps file descriptor to flow struct.
boost::unordered::concurrent_flat_map<int, struct rm_flow> fd_to_flow;

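Throughout this file, fd_to_flow's thread safety comes from Boost.Unordered's concurrent_flat_map API: visit() runs a callback under the map's internal synchronization and returns the number of elements visited (0 or 1), so a visit with an empty callback doubles as an existence check, a pattern the new code relies on below. A minimal standalone sketch of that usage (assumes Boost 1.83 or newer; rm_flow is reduced to a hypothetical stub for illustration):

#include <boost/unordered/concurrent_flat_map.hpp>
#include <cstdio>

struct rm_flow { int local_port; };  // stub for illustration only

int main() {
  boost::unordered::concurrent_flat_map<int, rm_flow> fd_to_flow;
  fd_to_flow.insert({3, rm_flow{9000}});

  // visit() returns the number of elements visited (0 or 1), so an
  // empty callback serves as a thread-safe contains() check.
  if (fd_to_flow.visit(3, [](const auto &) {})) {
    std::printf("FD 3 is known\n");
  }

  // The callback sees the key/value pair under the map's internal lock.
  fd_to_flow.visit(3, [](const auto &p) {
    std::printf("FD %d -> local port %d\n", p.first, p.second.local_port);
  });

  fd_to_flow.erase(3);  // returns the number of elements erased
  return 0;
}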
@@ -64,15 +63,9 @@ inline void trigger_ack(int fd) {
}

void thread_func() {
// Variables used during each scheduling epoch.
// Currently active flows, soon to be previously active.
std::vector<int> prev_active_fds;
// Length of prev_active_fds, which is the number of flows to be unpaused.
size_t num_to_find = 0;
// Previously paused flows that will be activated.
std::vector<int> new_active_fds;
// Preallocate sufficient space.
prev_active_fds.reserve(max_active_flows);
new_active_fds.reserve(max_active_flows);

if (max_active_flows == 0 || epoch_us == 0) {
@@ -98,54 +91,51 @@ void thread_func() {

// If fewer than the max number of flows exist and they are all active, then
// there is no need for scheduling.
if (active_fds.size() < max_active_flows && paused_fds.size() == 0) {
if (active_fds_queue.size() < max_active_flows &&
paused_fds_queue.size() == 0) {
// RM_PRINTF("WARNING insufficient flows, skipping scheduling\n");
continue;
}

RM_PRINTF("Performing scheduling\n");

prev_active_fds.clear();
new_active_fds.clear();

// Make a copy of the currently (soon-to-be previously) active flows.
active_fds.visit_all([&](const int &fd) { prev_active_fds.push_back(fd); });

// For each previously active flow, try to find a paused flow to replace it.
num_to_find = prev_active_fds.size();
paused_fds.visit_while([&](const int &x) {
if (new_active_fds.size() < num_to_find) {
new_active_fds.push_back(x);
return true;
} else {
return false;
lock.lock();

// Try to find max_active_flows FDs to unpause.
while (!paused_fds_queue.empty() &&
new_active_fds.size() < max_active_flows) {
// If we still know about this flow, then we can activate it. Pop the FD
// unconditionally so that a closed (unknown) FD at the front of the queue
// cannot wedge the loop.
int next_fd = paused_fds_queue.front();
paused_fds_queue.pop();
if (fd_to_flow.visit(next_fd, [](const auto &) {})) {
new_active_fds.push_back(next_fd);
}
});
}
auto num_prev_active = active_fds_queue.size();

// For each of the flows chosen to be activated, add it to the active queue
// and remove it from both the paused set and the RWND map. Trigger an ACK
// to wake it up. Note that twice the allowable number of flows will be
// active briefly.
// and remove it from the RWND map. Trigger an ACK to wake it up. Note that
// twice the allowable number of flows will be active briefly.
RM_PRINTF("Activating %lu flows: ", new_active_fds.size());
for (const auto &fd : new_active_fds) {
RM_PRINTF("%d ", fd);
active_fds.insert(fd);
paused_fds.erase(fd);
active_fds_queue.push(fd);
fd_to_flow.visit(fd, [](const auto &p) {
bpf_map_delete_elem(flow_to_rwnd_fd, &p.second);
});
trigger_ack(fd);
}
RM_PRINTF("\n");

// For each of the previously active flows, add it to the paused set, remove
// it from the active set, install an RWND mapping to actually pause it, and
// trigger an ACK to communicate the new RWND value.
RM_PRINTF("Pausing %lu flows: ", prev_active_fds.size());
for (const auto &fd : prev_active_fds) {
// For each of the previously active flows, add it to the paused queue,
// install an RWND mapping to actually pause it, and trigger an ACK to
// communicate the new RWND value.
RM_PRINTF("Pausing %lu flows: ", num_prev_active);
for (unsigned long i = 0; i < num_prev_active; i++) {
int fd = active_fds_queue.front();
active_fds_queue.pop();
RM_PRINTF("%d ", fd);
paused_fds.insert(fd);
active_fds.erase(fd);
paused_fds_queue.push(fd);
fd_to_flow.visit(fd, [](const auto &p) {
bpf_map_update_elem(flow_to_rwnd_fd, &p.second, &zero, BPF_ANY);
});
@@ -154,8 +144,7 @@ void thread_func() {
}
RM_PRINTF("\n");

// Clear temporary data structures.
prev_active_fds.clear();
lock.unlock();
new_active_fds.clear();

fflush(stdout);
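Stripped of the printing and the BPF plumbing, the epoch body above is a queue rotation under a single lock. A condensed, self-contained sketch of just that rotation (unpause_flow and pause_flow are hypothetical stand-ins for the bpf_map_* calls and trigger_ack):

#include <mutex>
#include <queue>
#include <vector>

std::mutex lock;
std::queue<int> active_fds_queue;
std::queue<int> paused_fds_queue;
size_t max_active_flows = 2;

void unpause_flow(int fd) { (void)fd; /* delete RWND entry, trigger ACK */ }
void pause_flow(int fd) { (void)fd; /* set RWND to 0 B, trigger ACK */ }

void one_epoch() {
  std::vector<int> new_active_fds;
  new_active_fds.reserve(max_active_flows);

  std::lock_guard<std::mutex> guard(lock);
  // Dequeue up to max_active_flows paused FDs to activate.
  while (!paused_fds_queue.empty() &&
         new_active_fds.size() < max_active_flows) {
    new_active_fds.push_back(paused_fds_queue.front());
    paused_fds_queue.pop();
  }
  size_t num_prev_active = active_fds_queue.size();
  // Activate the chosen flows first (so, briefly, up to twice
  // max_active_flows flows are active)...
  for (int fd : new_active_fds) {
    active_fds_queue.push(fd);
    unpause_flow(fd);
  }
  // ...then rotate every previously active flow to the back of the
  // paused queue, yielding round-robin service across epochs.
  for (size_t i = 0; i < num_prev_active; i++) {
    int fd = active_fds_queue.front();
    active_fds_queue.pop();
    paused_fds_queue.push(fd);
    pause_flow(fd);
  }
}

The sketch uses std::lock_guard instead of the manual lock()/unlock() pair purely to keep the critical section exception-safe; the behavior is the same.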
@@ -270,17 +259,21 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
// flow.local_addr, flow.local_port);
fd_to_flow.insert({new_fd, flow});

lock.lock();

// Should this flow be active or paused?
if (active_fds.size() < max_active_flows) {
if (active_fds_queue.size() < max_active_flows) {
// Fewer than the max number of flows are active, so make this one active.
active_fds.insert(new_fd);
active_fds_queue.push(new_fd);
} else {
// The max number of flows are active already, so pause this one.
paused_fds.insert(new_fd);
paused_fds_queue.push(new_fd);
// Pausing a flow means setting its RWND to 0 B.
bpf_map_update_elem(flow_to_rwnd_fd, &flow, &zero, BPF_ANY);
}

lock.unlock();

RM_PRINTF("Successful 'accept' for FD=%d, got FD=%d\n", sockfd, new_fd);
return new_fd;
}
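Pausing and unpausing are the same BPF map operation in both accept() and the scheduler thread: a flow is paused by keying a zero receive window into flow_to_rwnd, and unpaused by deleting that key. A hedged helper sketch of the pattern (set_flow_paused is a hypothetical helper, not part of this file; assumes libbpf's bpf_map_update_elem/bpf_map_delete_elem and a 32-bit RWND value, and reuses struct rm_flow as declared elsewhere in ratemon):

#include <bpf/bpf.h>    // bpf_map_update_elem, bpf_map_delete_elem
#include <linux/bpf.h>  // BPF_ANY

struct rm_flow;  // key layout defined elsewhere in ratemon

void set_flow_paused(int flow_to_rwnd_fd, const struct rm_flow *flow,
                     bool paused) {
  if (paused) {
    // RWND=0 tells the kernel-side program to advertise a zero window
    // on this flow's next ACK, stalling the sender.
    unsigned int zero = 0;
    bpf_map_update_elem(flow_to_rwnd_fd, flow, &zero, BPF_ANY);
  } else {
    // Removing the entry lets the flow's true window through again.
    bpf_map_delete_elem(flow_to_rwnd_fd, flow);
  }
}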
@@ -299,14 +292,14 @@ int close(int sockfd) {
return ret;
}

active_fds.erase(sockfd);
paused_fds.erase(sockfd);
// To get the flow struct for this FD, we must use visit() to look it up
// in the concurrent_flat_map. This must happen before the FD is removed
// from fd_to_flow.
fd_to_flow.visit(sockfd, [](const auto &p) {
bpf_map_delete_elem(flow_to_rwnd_fd, &p.second);
});
// Removing the FD from fd_to_flow triggers it to be (eventually) removed from
// scheduling.
fd_to_flow.erase(sockfd);

RM_PRINTF("Successful 'close' for FD=%d\n", sockfd);
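Note the lazy-deletion design this enables: close() never touches active_fds_queue or paused_fds_queue. A closed FD simply fails the fd_to_flow existence check when it reaches the front of the paused queue and is dropped there. The idiom in isolation (a sketch, with std::unordered_set standing in for fd_to_flow's key set):

#include <queue>
#include <unordered_set>

// Lazy deletion: membership lives in `known` (fd_to_flow in the real
// code); the queue may hold stale FDs, which are dropped on dequeue.
int pop_next_known(std::queue<int> &q,
                   const std::unordered_set<int> &known) {
  while (!q.empty()) {
    int fd = q.front();
    q.pop();
    if (known.count(fd)) {
      return fd;  // still open: schedule it
    }
    // Otherwise the FD was closed after it was enqueued; skip it.
  }
  return -1;  // no live FDs queued
}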
