Skip to content

Commit

Permalink
add global priority queue
Browse files Browse the repository at this point in the history
  • Loading branch information
jiazheng.jia committed Jan 9, 2025
1 parent cce48a0 commit 7ca337d
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/bthread/parking_lot.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
int val;
};

ParkingLot() : _pending_signal(0){}
ParkingLot() : _pending_signal(0) {}

// Wake up at most `num_task' workers.
// Returns #workers woken up.
Expand Down
41 changes: 15 additions & 26 deletions src/bthread/task_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ TaskControl::TaskControl()
, _signal_per_second(&_cumulated_signal_count)
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
, _priority_qs(FLAGS_task_group_ntags)
, _pl(FLAGS_task_group_ntags)
, _epoll_tid_states(FLAGS_task_group_ntags)
{}

int TaskControl::init(int concurrency) {
Expand All @@ -208,6 +208,10 @@ int TaskControl::init(int concurrency) {
_tagged_worker_usage_second.push_back(new bvar::PerSecond<bvar::PassiveStatus<double>>(
"bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1));
_tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", tag_str));
if (_priority_qs[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
LOG(FATAL) << "Fail to init _priority_q";
return -1;
}
}

// Make sure TimerThread is ready.
Expand Down Expand Up @@ -431,15 +435,9 @@ int TaskControl::_destroy_group(TaskGroup* g) {

bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
auto tag = tls_task_group->tag();
// epoll tid should be stolen first.
for (auto &epoll_state : _epoll_tid_states[tag]) {
bool expected_state = true;
if (epoll_state.second.compare_exchange_strong(
expected_state, false, butil::memory_order_seq_cst,
butil::memory_order_relaxed)) {
*tid = epoll_state.first;
return true;
}

if (_priority_qs[tag].steal(tid)) {
return true;
}

// 1: Acquiring fence is paired with releasing fence in _add_group to
Expand Down Expand Up @@ -482,30 +480,22 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) {
if (num_task > 2) {
num_task = 2;
}
if (ParkingLot::_waiting_count.load(std::memory_order_acquire) == 0) {
if (FLAGS_bthread_min_concurrency > 0 &&
_concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
// TODO: Reduce this lock
BAIDU_SCOPED_LOCK(g_task_control_mutex);
if (_concurrency.load(butil::memory_order_acquire) < FLAGS_bthread_concurrency) {
add_workers(1, tag);
}
} else {
return;
}
}
auto& pl = tag_pl(tag);
int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
num_task -= pl[start_index].signal(1);
if (num_task > 0) {
// WARNING: This allow some bad case happen when wait_count is not accurente.
auto wait_count = ParkingLot::_waiting_count.load(butil::memory_order_relaxed);
if (wait_count > 0) {
num_task -= pl[start_index].signal(1);
}
if (num_task > 0 && wait_count > 0) {
for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
if (++start_index >= PARKING_LOT_NUM) {
start_index = 0;
}
num_task -= pl[start_index].signal(1);
}
}
if (num_task > 0 &&
if (num_task > 0 && wait_count >0 &&
FLAGS_bthread_min_concurrency > 0 && // test min_concurrency for performance
_concurrency.load(butil::memory_order_relaxed) < FLAGS_bthread_concurrency) {
// TODO: Reduce this lock
Expand Down Expand Up @@ -600,7 +590,6 @@ bvar::LatencyRecorder* TaskControl::create_exposed_pending_time() {
}

void TaskControl::set_group_epoll_tid(bthread_tag_t tag, bthread_t tid) {
_epoll_tid_states[tag][tid] = false;
auto groups = tag_group(tag);
const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire);
for (size_t i = 0; i < ngroup; i++) {
Expand Down
8 changes: 3 additions & 5 deletions src/bthread/task_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,14 @@ friend bthread_t init_for_pthread_stack_trace();
// Only deal once when init epoll bthread.
void set_group_epoll_tid(bthread_tag_t tag, bthread_t tid);

void epoll_waiting(bthread_tag_t tag, bthread_t tid) {
_epoll_tid_states[tag][tid].store(true, butil::memory_order_release);
void push_priority_q(bthread_tag_t tag, bthread_t tid) {
_priority_qs[tag].push(tid);
}

private:
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
static const int PARKING_LOT_NUM = 4;
typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot;
typedef std::unordered_map<bthread_t, butil::atomic<bool>> EpollTidState;
// Add/Remove a TaskGroup.
// Returns 0 on success, -1 otherwise.
int _add_group(TaskGroup*, bthread_tag_t tag);
Expand Down Expand Up @@ -161,14 +160,13 @@ friend bthread_t init_for_pthread_stack_trace();
std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;
std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second;
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
std::vector<WorkStealingQueue<bthread_t>> _priority_qs;

std::vector<TaggedParkingLot> _pl;

#ifdef BRPC_BTHREAD_TRACER
TaskTracer _task_tracer;
#endif // BRPC_BTHREAD_TRACER

std::vector<EpollTidState> _epoll_tid_states;
};

inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() {
Expand Down
6 changes: 3 additions & 3 deletions src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
// NOSIGNAL affects current task, not the new task.
RemainedFn fn = NULL;
if (g->cur_epoll_tid()) {
fn = ready_to_run_epoll;
fn = priority_to_run;
} else if (g->current_task()->about_to_quit) {
fn = ready_to_run_in_worker_ignoresignal;
} else {
Expand Down Expand Up @@ -809,9 +809,9 @@ void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) {
return tls_task_group->push_rq(args->meta->tid);
}

void TaskGroup::ready_to_run_epoll(void* args_in) {
void TaskGroup::priority_to_run(void* args_in) {
ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
return tls_task_group->control()->epoll_waiting(args->tag, args->meta->tid);
return tls_task_group->control()->push_priority_q(args->tag, args->meta->tid);
}

struct SleepArgs {
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/task_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ friend class TaskControl;
};
static void ready_to_run_in_worker(void*);
static void ready_to_run_in_worker_ignoresignal(void*);
static void ready_to_run_epoll(void*);
static void priority_to_run(void*);

// Wait for a task to run.
// Returns true on success, false is treated as permanent error and the
Expand Down

0 comments on commit 7ca337d

Please sign in to comment.