Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local resume task queue per task group. #2398

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
abe5a5a
add resume_rq for remote task and update wait_task(),sched(),ending_s…
lzxddz Jun 14, 2023
0145eed
add remote queue size bvar
MrGuin Aug 10, 2023
b0e2b9e
add bvar consume command and socket write latency; remove busy loop i…
MrGuin Aug 11, 2023
bd54270
include fix
MrGuin Aug 25, 2023
8158abd
remove duplicate header
MrGuin Aug 25, 2023
d48aa3c
Update src/bthread/parking_lot.cpp
MrGuin Aug 25, 2023
86048e8
Update src/bthread/moodycamelqueue.h
MrGuin Aug 25, 2023
d77640f
Merge pull request #4 from monographdb/resume_q_zkl
MrGuin Aug 25, 2023
bc3eaab
Add no_signal parameter to notify_one. (#5)
MrGuin Sep 4, 2023
03bc4be
Update minimum virsion requirements for dependancies. (#6)
weidaolee Sep 14, 2023
73fb5a9
change static resume_rq to shared_ptr get from singleton object (#8)
MrGuin Sep 14, 2023
0e8e5a4
Add memory header file
zhangh43 Sep 15, 2023
62a3c88
include headers (#9)
MrGuin Sep 15, 2023
c9b7ad5
set default behaviour for bthread_cond_signal to no signal (#7)
MrGuin Sep 25, 2023
2986f4d
fix the problem that butex_wake does not signal pending tasks (#13)
MrGuin Dec 13, 2023
846f5ac
Redis transaction support. (#12)
MrGuin Dec 14, 2023
a70b93f
local resume_rq each task group
MrGuin Sep 27, 2023
6686bfe
wait_task busy loop before waiting on PL
MrGuin Oct 13, 2023
7e81e6e
add bvar ready_to_run_skip_signal_task_per_second
MrGuin Oct 13, 2023
8009b20
change wait_task busy poll time from 100ms to 15ms
MrGuin Jan 9, 2024
6269bf1
check waiting_worker_num in signal_task
MrGuin Jan 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ SET(CPACK_DEBIAN_PACKAGE_MAINTAINER "brpc authors")
INCLUDE(CPack)

if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
# require at least gcc 4.8
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.8)
message(FATAL_ERROR "GCC is too old, please install a newer version supporting C++11")
# require at least gcc 8
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8) # ref: https://gcc.gnu.org/projects/cxx-status.html
message(FATAL_ERROR "GCC is too old, please install a newer version supporting C++17")
endif()
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
# require at least clang 3.3
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 3.3)
message(FATAL_ERROR "Clang is too old, please install a newer version supporting C++11")
# require at least clang 5
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 5) # ref: https://clang.llvm.org/cxx_status.html
message(FATAL_ERROR "Clang is too old, please install a newer version supporting C++17")
endif()
else()
message(WARNING "You are using an unsupported compiler! Compilation has only been tested with Clang and GCC.")
Expand Down Expand Up @@ -121,21 +121,21 @@ set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} ${DEBUG_SYMBOL} ${THRIFT_CPP_FLAG}")
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -fno-omit-frame-pointer")
set(CMAKE_C_FLAGS "${CMAKE_CPP_FLAGS} -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-unused-parameter -fno-omit-frame-pointer")

macro(use_cxx11)
macro(use_cxx17)
if(CMAKE_VERSION VERSION_LESS "3.1.3")
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17")
endif()
else()
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
endif()
endmacro(use_cxx11)
endmacro(use_cxx17)

use_cxx11()
use_cxx17()

if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
#required by butil/crc32.cc to boost performance for 10x
Expand Down Expand Up @@ -169,8 +169,9 @@ if(WITH_SNAPPY)
endif()

if(WITH_GLOG)
message(NOTICE "BRPC WITH_GLOG=ON")
find_path(GLOG_INCLUDE_PATH NAMES glog/logging.h)
find_library(GLOG_LIB NAMES glog)
find_library(GLOG_LIB NAMES glog VERSION ">=0.6.0" REQUIRE)
if((NOT GLOG_INCLUDE_PATH) OR (NOT GLOG_LIB))
message(FATAL_ERROR "Fail to find glog")
endif()
Expand Down
49 changes: 45 additions & 4 deletions src/brpc/policy/redis_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "brpc/redis.h"
#include "brpc/redis_command.h"
#include "brpc/policy/redis_protocol.h"
#include "bvar/latency_recorder.h"

namespace brpc {

Expand All @@ -59,6 +60,7 @@ class RedisConnContext : public Destroyable {
public:
explicit RedisConnContext(const RedisService* rs)
: redis_service(rs)
, in_transaction(false)
, batched_size(0) {}

~RedisConnContext();
Expand All @@ -68,7 +70,10 @@ class RedisConnContext : public Destroyable {
const RedisService* redis_service;
// If user starts a transaction, transaction_handler indicates the
// handler pointer that runs the transaction command.
std::unique_ptr<RedisCommandHandler> transaction_handler;
std::unique_ptr<TransactionHandler> transaction_handler;
// Whether this connection has begun a transaction. If true, the commands
// received will be handled by transaction_handler.
bool in_transaction;
// >0 if command handler is run in batched mode.
int batched_size;

Expand All @@ -82,15 +87,33 @@ int ConsumeCommand(RedisConnContext* ctx,
butil::IOBufAppender* appender) {
RedisReply output(&ctx->arena);
RedisCommandHandlerResult result = REDIS_CMD_HANDLED;
if (ctx->transaction_handler) {
if (ctx->in_transaction) {
assert(ctx->transaction_handler != nullptr);
result = ctx->transaction_handler->Run(args, &output, flush_batched);
if (result == REDIS_CMD_HANDLED) {
ctx->transaction_handler.reset(NULL);
ctx->in_transaction = false;
} else if (result == REDIS_CMD_BATCHED) {
LOG(ERROR) << "BATCHED should not be returned by a transaction handler.";
return -1;
}
} else {
}
else if (args[0] == "watch" || args[0] == "unwatch") {
if (!ctx->transaction_handler) {
ctx->transaction_handler.reset(ctx->redis_service->NewTransactionHandler());
ctx->in_transaction = false;
}
if (!ctx->transaction_handler) {
output.SetError("ERR Transaction not supported.");
} else {
result = ctx->transaction_handler->Run(args, &output, flush_batched);
if (result == REDIS_CMD_BATCHED) {
LOG(ERROR) << "BATCHED should not be returned by a transaction handler.";
return -1;
}
}
}
else {
RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(args[0]);
if (!ch) {
char buf[64];
Expand All @@ -103,7 +126,16 @@ int ConsumeCommand(RedisConnContext* ctx,
LOG(ERROR) << "CONTINUE should not be returned in a batched process.";
return -1;
}
ctx->transaction_handler.reset(ch->NewTransactionHandler());
if (ctx->transaction_handler == nullptr) {
ctx->transaction_handler.reset(ctx->redis_service->NewTransactionHandler());
}
if (ctx->transaction_handler != nullptr) {
ctx->transaction_handler->Begin();
ctx->in_transaction = true;
}
else {
output.SetError("ERR Transaction not supported.");
}
} else if (result == REDIS_CMD_BATCHED) {
ctx->batched_size++;
}
Expand Down Expand Up @@ -144,6 +176,9 @@ void RedisConnContext::Destroy() {

// ========== impl of RedisConnContext ==========

inline bvar::LatencyRecorder socket_write_latency("socket", "write");
inline bvar::LatencyRecorder consume_cmd_latency("socket", "consume_cmd");

ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
bool read_eof, const void* arg) {
if (read_eof || source->empty()) {
Expand Down Expand Up @@ -174,22 +209,28 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
if (err != PARSE_OK) {
break;
}
int64_t start_time_us = butil::cpuwide_time_us();
if (ConsumeCommand(ctx, current_args, false, &appender) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
consume_cmd_latency << (butil::cpuwide_time_us() - start_time_us);
current_args.swap(next_args);
}
int64_t start_time_us = butil::cpuwide_time_us();
if (ConsumeCommand(ctx, current_args,
true /*must be the last message*/, &appender) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
consume_cmd_latency << (butil::cpuwide_time_us() - start_time_us);
butil::IOBuf sendbuf;
appender.move_to(sendbuf);
CHECK(!sendbuf.empty());
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
start_time_us = butil::cpuwide_time_us();
LOG_IF(WARNING, socket->Write(&sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
socket_write_latency << (butil::cpuwide_time_us() - start_time_us);
if(ctx->parser.ParsedArgsSize() == 0) {
ctx->arena.clear();
}
Expand Down
5 changes: 5 additions & 0 deletions src/brpc/redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,11 @@ RedisCommandHandler* RedisService::FindCommandHandler(const butil::StringPiece&
return NULL;
}

TransactionHandler* RedisService::NewTransactionHandler() const {
LOG(ERROR) << "NewTransactionHandler is not implemented";
return NULL;
}

RedisCommandHandler* RedisCommandHandler::NewTransactionHandler() {
LOG(ERROR) << "NewTransactionHandler is not implemented";
return NULL;
Expand Down
11 changes: 11 additions & 0 deletions src/brpc/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ std::ostream& operator<<(std::ostream& os, const RedisRequest&);
std::ostream& operator<<(std::ostream& os, const RedisResponse&);

class RedisCommandHandler;
class TransactionHandler;

// Container of CommandHandlers.
// Assign an instance to ServerOption.redis_service to enable redis support.
Expand All @@ -231,6 +232,9 @@ class RedisService {
// Call this function to register `handler` that can handle command `name`.
bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler);

// Create a transaction handler to handle commands inside a transaction.
virtual TransactionHandler* NewTransactionHandler() const;

// This function should not be touched by user and used by brpc deverloper only.
RedisCommandHandler* FindCommandHandler(const butil::StringPiece& name) const;

Expand All @@ -243,6 +247,8 @@ enum RedisCommandHandlerResult {
REDIS_CMD_HANDLED = 0,
REDIS_CMD_CONTINUE = 1,
REDIS_CMD_BATCHED = 2,
REDIS_CMD_TXN_START = 3,
REDIS_CMD_TXN_FINISH = 4,
};

// The Command handler for a redis request. User should impletement Run().
Expand Down Expand Up @@ -289,6 +295,11 @@ class RedisCommandHandler {
virtual RedisCommandHandler* NewTransactionHandler();
};

class TransactionHandler : public RedisCommandHandler {
public:
virtual bool Begin() = 0;
};

} // namespace brpc

#endif // BRPC_REDIS_H
6 changes: 5 additions & 1 deletion src/bthread/condition_variable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ extern "C" {

extern int bthread_mutex_unlock(bthread_mutex_t*);
extern int bthread_mutex_lock_contended(bthread_mutex_t*);
extern void bthread_flush();

int bthread_cond_init(bthread_cond_t* __restrict c,
const bthread_condattr_t*) {
Expand All @@ -65,7 +66,10 @@ int bthread_cond_signal(bthread_cond_t* c) {
butil::atomic<int>* const saved_seq = ic->seq;
saved_seq->fetch_add(1, butil::memory_order_release);
// don't touch ic any more
bthread::butex_wake(saved_seq);
bool no_signal = true;
bthread::butex_wake(saved_seq, no_signal);
// flush unsignaled tasks manually
bthread_flush();
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion src/bthread/condition_variable.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ConditionVariable {
DISALLOW_COPY_AND_ASSIGN(ConditionVariable);
public:
typedef bthread_cond_t* native_handler_type;

ConditionVariable() {
CHECK_EQ(0, bthread_cond_init(&_cond, NULL));
}
Expand Down
Loading