Skip to content

Commit

Permalink
Reset nodes were briefly identified as alive. (#133)
Browse files Browse the repository at this point in the history
The reason is that we call report_heartbeat in the reset procedure.
The purpose of this call is to make sure that we have a record in the failure detector for the node.

This PR just creates the failure detector entry and avoids recording a
heartbeat change.

This PR also makes sure we do not count the first heartbeat record as an event.

Closes #132
  • Loading branch information
fulmicoton authored Feb 28, 2024
1 parent b3f109b commit 867207e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 6 deletions.
14 changes: 11 additions & 3 deletions chitchat/src/failure_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ impl FailureDetector {
}

/// Reports node heartbeat.
pub fn report_heartbeat(&mut self, chitchat_id: &ChitchatId) {
debug!(node_id=%chitchat_id.node_id, "reporting node heartbeat.");
pub(crate) fn get_or_create_sampling_window(
&mut self,
chitchat_id: &ChitchatId,
) -> &mut SamplingWindow {
self.node_samples
.entry(chitchat_id.clone())
.or_insert_with(|| {
Expand All @@ -42,6 +44,12 @@ impl FailureDetector {
self.config.initial_interval,
)
})
}

/// Records a heartbeat for the node identified by `chitchat_id`.
///
/// The node's sampling window is fetched through
/// `get_or_create_sampling_window` (which inserts one when the node is not
/// yet tracked), and the heartbeat is then reported on that window.
pub fn report_heartbeat(&mut self, chitchat_id: &ChitchatId) {
    debug!(node_id=%chitchat_id.node_id, "reporting node heartbeat.");
    let sampling_window = self.get_or_create_sampling_window(chitchat_id);
    sampling_window.report_heartbeat();
}

Expand Down Expand Up @@ -179,7 +187,7 @@ impl AdditiveSmoothing {

/// A fixed-sized window that keeps track of the most recent heartbeat arrival intervals.
#[derive(Debug)]
struct SamplingWindow {
pub(crate) struct SamplingWindow {
/// The set of collected intervals.
intervals: BoundedArrayStats,
/// Last heartbeat reported time.
Expand Down
27 changes: 24 additions & 3 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,15 @@ impl Chitchat {
if node_state.max_version() >= max_version {
return;
}
if node_state.max_version() == 0 {
self.failure_detector.report_heartbeat(chitchat_id);
}

// We make sure that the node is listed in the failure detector,
// so that we won't forget to GC the state.
//
// We don't report the heartbeat, however, to make sure that we
// avoid identifying a reset node as alive.
self.failure_detector
.get_or_create_sampling_window(chitchat_id);

// We don't want to call listeners for keys that are already up to date so we must do this
// dance instead of clearing the node state and then setting the new values.
let mut previous_keys: HashSet<String> = node_state
Expand Down Expand Up @@ -557,6 +563,21 @@ mod tests {
assert_nodes_sync(&[&node1, &node2]);
}

/// Regression test: a node whose state was just reset must not be reported
/// as alive merely because its heartbeat was recorded.
#[test]
fn test_chitchat_dead_node_liveness() {
    let config = ChitchatConfig::for_test(10_001);
    let seeds_rx = watch::channel(Default::default()).1;
    let mut chitchat =
        Chitchat::with_chitchat_id_and_seeds(config, seeds_rx.clone(), Vec::new());
    let remote_id = ChitchatId::for_local_test(10u16);

    // Reset the remote node's state, then report the very same heartbeat value twice.
    chitchat.reset_node_state(&remote_id, std::iter::empty(), 10_000, 10u64);
    for _ in 0..2 {
        chitchat.report_heartbeat(&remote_id, Heartbeat(10_000u64));
    }
    chitchat.update_nodes_liveness();

    // Exactly one node is expected to be live (presumably the local node itself;
    // the remote node must not be counted).
    let alive: HashSet<&ChitchatId> = chitchat.live_nodes().collect();
    assert_eq!(alive.len(), 1);
}

#[tokio::test]
async fn test_live_node_channel() {
let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE);
Expand Down
8 changes: 8 additions & 0 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,16 @@ impl NodeState {

/// Attempts to set the heartbeat of another node.
/// If the value is actually not an update, just ignore the data and return false.
/// As a corner case, the first value is not considered an update.
///
/// Otherwise, returns true.
pub fn try_set_heartbeat(&mut self, heartbeat_new_value: Heartbeat) -> bool {
if self.heartbeat.0 == 0 {
// This is the first heartbeat.
// Let's set it, but we do not consider it as an update.
self.heartbeat = heartbeat_new_value;
return false;
}
if heartbeat_new_value > self.heartbeat {
self.heartbeat = heartbeat_new_value;
true
Expand Down

0 comments on commit 867207e

Please sign in to comment.