Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add enable witness to leader to raft group #483

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
/runtime
/output
/test/output

/bld
# Ignore hidden files
.*
*.swp
Expand Down
1 change: 1 addition & 0 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ int NodeImpl::init(const NodeOptions& options) {
rg_options.election_timeout_ms = _options.election_timeout_ms;
rg_options.log_manager = _log_manager;
rg_options.ballot_box = _ballot_box;
rg_options.send_data_to_witness = _options.send_data_to_witness;
rg_options.node = this;
rg_options.snapshot_throttle = _options.snapshot_throttle
? _options.snapshot_throttle->get()
Expand Down
1 change: 1 addition & 0 deletions src/braft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ struct NodeOptions {
// Default: false
bool witness = false;
// Whether log entry data is sent to witness peers during replication.
// Default: true
bool send_data_to_witness = true;
// Construct a default instance
NodeOptions();

int get_catchup_timeout_ms();
Expand Down
19 changes: 18 additions & 1 deletion src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ DEFINE_int32(raft_retry_replicate_interval_ms, 1000,
"Interval of retry to append entries or install snapshot");
BRPC_VALIDATE_GFLAG(raft_retry_replicate_interval_ms,
brpc::PositiveInteger);
DEFINE_bool(raft_use_conn_pool, false, "use conn pool for raft replicator");
BRPC_VALIDATE_GFLAG(raft_use_conn_pool, ::brpc::PassValidate);

DECLARE_bool(raft_enable_witness_to_leader);
DECLARE_int64(raft_append_entry_high_lat_us);
Expand Down Expand Up @@ -115,6 +117,9 @@ int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) {
Replicator* r = new Replicator();
brpc::ChannelOptions channel_opt;
channel_opt.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms;
if (FLAGS_raft_use_conn_pool) {
channel_opt.connection_type = "pooled";
}
channel_opt.timeout_ms = -1; // We don't need RPC timeout
if (r->_sending_channel.Init(options.peer_id.addr, &channel_opt) != 0) {
LOG(ERROR) << "Fail to init sending channel"
Expand Down Expand Up @@ -630,6 +635,11 @@ int Replicator::_prepare_entry(int offset, EntryMeta* em, butil::IOBuf *data) {
} else {
CHECK(entry->type != ENTRY_TYPE_CONFIGURATION) << "log_index=" << log_index;
}
// The group-level option takes precedence over the global flag: when it is
// disabled, do not attach entry data for witness peers.
if (is_witness() && !_options.send_data_to_witness) {
entry->Release();
return 0;
}
if (!is_witness() || FLAGS_raft_enable_witness_to_leader) {
em->set_data_len(entry->data.length());
data->append(entry->data);
Expand Down Expand Up @@ -1382,6 +1392,7 @@ int ReplicatorGroup::init(const NodeId& node_id, const ReplicatorGroupOptions& o
_election_timeout_ms = options.election_timeout_ms;
_common_options.log_manager = options.log_manager;
_common_options.ballot_box = options.ballot_box;
_common_options.send_data_to_witness = options.send_data_to_witness;
_common_options.node = options.node;
_common_options.term = 0;
_common_options.group_id = node_id.group_id;
Expand Down Expand Up @@ -1549,12 +1560,18 @@ int ReplicatorGroup::find_the_next_candidate(
}
const int64_t next_index = Replicator::get_next_index(iter->id_and_status.id);
const int consecutive_error_times = Replicator::get_consecutive_error_times(iter->id_and_status.id);
if (consecutive_error_times == 0 && next_index > max_index && !iter->peer_id.is_witness()) {
if (consecutive_error_times == 0 && next_index > max_index) {
max_index = next_index;
if (peer_id) {
*peer_id = iter->peer_id;
}
}
// On a next_index tie, replace a witness candidate with this peer so that
// leadership transfer prefers a non-witness peer.
if (consecutive_error_times == 0 && next_index == max_index) {
if (peer_id && peer_id->is_witness()) {
*peer_id = iter->peer_id;
}
}
}
if (max_index == 0) {
return -1;
Expand Down
2 changes: 2 additions & 0 deletions src/braft/replicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct ReplicatorOptions {
ReplicatorOptions();
int* dynamic_heartbeat_timeout_ms;
int* election_timeout_ms;
bool send_data_to_witness;
GroupId group_id;
PeerId server_id;
PeerId peer_id;
Expand Down Expand Up @@ -267,6 +268,7 @@ struct ReplicatorGroupOptions {
ReplicatorGroupOptions();
int heartbeat_timeout_ms;
int election_timeout_ms;
bool send_data_to_witness = true;
LogManager* log_manager;
BallotBox* ballot_box;
NodeImpl* node;
Expand Down
Loading