Skip to content

Commit 186f606

Browse files
committed
Implement Sync Rate Rule
1 parent 8e414f8 commit 186f606

File tree

4 files changed

+158
-90
lines changed

4 files changed

+158
-90
lines changed

protocol/mining/src/rule_engine.rs

+29-89
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
use std::{
2-
sync::{
3-
atomic::{AtomicBool, Ordering},
4-
Arc,
5-
},
2+
sync::{atomic::AtomicBool, Arc},
63
time::{Duration, Instant},
74
};
85

96
use kaspa_consensus_core::{
107
api::counters::ProcessingCounters,
11-
config::{params::NEW_DIFFICULTY_WINDOW_DURATION, Config},
8+
config::Config,
129
daa_score_timestamp::DaaScoreTimestamp,
1310
mining_rules::MiningRules,
1411
network::NetworkType::{Mainnet, Testnet},
@@ -20,14 +17,14 @@ use kaspa_core::{
2017
tick::{TickReason, TickService},
2118
},
2219
time::unix_now,
23-
trace, warn,
20+
trace,
2421
};
2522
use kaspa_p2p_lib::Hub;
2623

27-
use crate::rules::mining_rule::MiningRule;
24+
use crate::rules::{mining_rule::MiningRule, sync_rate_rule::SyncRateRule, ExtraData};
2825

2926
const RULE_ENGINE: &str = "mining-rule-engine";
30-
const SYNC_RATE_THRESHOLD: f64 = 0.10;
27+
pub const SNAPSHOT_INTERVAL: u64 = 10;
3128

3229
#[derive(Clone)]
3330
pub struct MiningRuleEngine {
@@ -44,13 +41,11 @@ pub struct MiningRuleEngine {
4441

4542
impl MiningRuleEngine {
4643
pub async fn worker(self: &Arc<MiningRuleEngine>) {
47-
println!(module_path!());
48-
let snapshot_interval = 10;
4944
let mut last_snapshot = self.processing_counters.snapshot();
5045
let mut last_log_time = Instant::now();
5146
loop {
5247
// START: Sync monitor
53-
if let TickReason::Shutdown = self.tick_service.tick(Duration::from_secs(snapshot_interval)).await {
48+
if let TickReason::Shutdown = self.tick_service.tick(Duration::from_secs(SNAPSHOT_INTERVAL)).await {
5449
// Let the system print final logs before exiting
5550
tokio::time::sleep(Duration::from_millis(500)).await;
5651
break;
@@ -70,60 +65,24 @@ impl MiningRuleEngine {
7065
if elapsed_time.as_secs() > 0 {
7166
let session = self.consensus_manager.consensus().unguarded_session();
7267
let sink_daa_timestamp = session.async_get_sink_daa_score_timestamp().await;
73-
let expected_blocks =
74-
(elapsed_time.as_millis() as u64) / self.config.target_time_per_block().get(sink_daa_timestamp.daa_score);
75-
let received_blocks = delta.body_counts.max(delta.header_counts);
76-
let rate: f64 = (received_blocks as f64) / (expected_blocks as f64);
7768

7869
let finality_point = session.async_finality_point().await;
7970
let finality_point_timestamp = session.async_get_header(finality_point).await.unwrap().timestamp;
80-
// Finality point is considered "recent" if it is within 3 finality durations from the current time
81-
let is_finality_recent = finality_point_timestamp
82-
>= unix_now()
83-
.saturating_sub(self.config.finality_duration_in_milliseconds().get(sink_daa_timestamp.daa_score) * 3);
84-
85-
trace!(
86-
"Sync rate: {:.2} | Finality point recent: {} | Elapsed time: {}s | Connected: {} | Found/Expected blocks: {}/{}",
87-
rate,
88-
is_finality_recent,
89-
elapsed_time.as_secs(),
90-
delta.body_counts,
91-
self.has_sufficient_peer_connectivity(),
92-
expected_blocks,
93-
);
94-
95-
if is_finality_recent && rate < SYNC_RATE_THRESHOLD {
96-
// if sync rate rule conditions are met:
97-
if let Ok(false) = self.use_sync_rate_rule.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) {
98-
warn!("Sync rate {:.2} is below threshold: {}", rate, SYNC_RATE_THRESHOLD);
99-
}
100-
} else {
101-
// else when sync rate conditions are not met:
102-
if let Ok(true) = self.use_sync_rate_rule.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) {
103-
if !is_finality_recent {
104-
warn!("Sync rate {:.2} recovered: {} by entering IBD", rate, SYNC_RATE_THRESHOLD);
105-
} else {
106-
warn!("Sync rate {:.2} recovered: {}", rate, SYNC_RATE_THRESHOLD);
107-
}
108-
} else if !is_finality_recent {
109-
trace!("Finality period is old. Timestamp: {}. Sync rate: {:.2}", finality_point_timestamp, rate);
110-
}
111-
}
11271

113-
// END - Sync monitor
72+
let extra_data = ExtraData {
73+
finality_point_timestamp,
74+
target_time_per_block: self.config.target_time_per_block().get(sink_daa_timestamp.daa_score),
75+
has_sufficient_peer_connectivity: self.has_sufficient_peer_connectivity(),
76+
finality_duration: self.config.finality_duration_in_milliseconds().get(sink_daa_timestamp.daa_score),
77+
elapsed_time,
78+
};
11479

115-
// START - Rule Engine
11680
trace!("Current Mining Rule: {:?}", self.mining_rules);
11781

118-
// Blue Parents Only Check:
82+
// Check for all the rules
11983
for rule in &self.rules {
120-
rule.check_rule(&snapshot);
84+
rule.check_rule(&delta, &extra_data);
12185
}
122-
123-
// No Transactions Check:
124-
// TODO: implement this part
125-
126-
// End - Rule Engine
12786
}
12887

12988
last_snapshot = snapshot;
@@ -139,18 +98,10 @@ impl MiningRuleEngine {
13998
hub: Hub,
14099
mining_rules: Arc<MiningRules>,
141100
) -> Self {
142-
let rules: Vec<Arc<(dyn MiningRule + 'static)>> = vec![];
143-
144-
Self {
145-
consensus_manager,
146-
config,
147-
processing_counters,
148-
tick_service,
149-
hub,
150-
use_sync_rate_rule: Arc::new(AtomicBool::new(false)),
151-
mining_rules,
152-
rules,
153-
}
101+
let use_sync_rate_rule = Arc::new(AtomicBool::new(false));
102+
let rules: Vec<Arc<(dyn MiningRule + 'static)>> = vec![Arc::new(SyncRateRule::new(use_sync_rate_rule.clone()))];
103+
104+
Self { consensus_manager, config, processing_counters, tick_service, hub, use_sync_rate_rule, mining_rules, rules }
154105
}
155106

156107
pub fn should_mine(&self, sink_daa_score_timestamp: DaaScoreTimestamp) -> bool {
@@ -169,27 +120,16 @@ impl MiningRuleEngine {
169120
pub fn is_nearly_synced(&self, sink_daa_score_timestamp: DaaScoreTimestamp) -> bool {
170121
let sink_timestamp = sink_daa_score_timestamp.timestamp;
171122

172-
if self.config.net.is_mainnet() {
173-
// We consider the node close to being synced if the sink (virtual selected parent) block
174-
// timestamp is within DAA window duration far in the past. Blocks mined over such DAG state would
175-
// enter the DAA window of fully-synced nodes and thus contribute to overall network difficulty
176-
//
177-
// [Crescendo]: both durations are nearly equal so this decision is negligible
178-
unix_now()
179-
< sink_timestamp
180-
+ self.config.expected_difficulty_window_duration_in_milliseconds().get(sink_daa_score_timestamp.daa_score)
181-
} else {
182-
// For testnets we consider the node to be synced if the sink timestamp is within a time range which
183-
// is overwhelmingly unlikely to pass without mined blocks even if net hashrate decreased dramatically.
184-
//
185-
// This period is smaller than the above mainnet calculation in order to ensure that an IBDing miner
186-
// with significant testnet hashrate does not overwhelm the network with deep side-DAGs.
187-
//
188-
// We use DAA duration as baseline and scale it down with BPS (and divide by 3 for mining only when very close to current time on 10BPS testnets)
189-
let max_expected_duration_without_blocks_in_milliseconds =
190-
self.config.prior_target_time_per_block * NEW_DIFFICULTY_WINDOW_DURATION / 3; // = DAA duration in milliseconds / bps / 3
191-
unix_now() < sink_timestamp + max_expected_duration_without_blocks_in_milliseconds
192-
}
123+
// We consider the node close to being synced if the sink (virtual selected parent) block
124+
// timestamp is within a quarter of the DAA window duration far in the past. Blocks mined over such DAG state would
125+
// enter the DAA window of fully-synced nodes and thus contribute to overall network difficulty
126+
//
127+
// [Crescendo]: both durations are nearly equal so this decision is negligible
128+
let synced_threshold =
129+
self.config.expected_difficulty_window_duration_in_milliseconds().get(sink_daa_score_timestamp.daa_score) / 4;
130+
131+
// Roughly 10mins in all networks
132+
unix_now() < sink_timestamp + synced_threshold
193133
}
194134

195135
fn has_sufficient_peer_connectivity(&self) -> bool {
+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use kaspa_consensus_core::api::counters::ProcessingCountersSnapshot;
22

3+
use super::ExtraData;
4+
35
pub trait MiningRule: Send + Sync + 'static {
4-
fn check_rule(&self, delta: &ProcessingCountersSnapshot);
6+
fn check_rule(&self, delta: &ProcessingCountersSnapshot, extra_data: &ExtraData);
57
}

protocol/mining/src/rules/mod.rs

+12
Original file line numberDiff line numberDiff line change
@@ -1 +1,13 @@
1+
use std::time::Duration;
2+
3+
pub mod sync_rate_rule;
4+
15
pub mod mining_rule;
6+
7+
pub struct ExtraData {
8+
pub finality_point_timestamp: u64,
9+
pub target_time_per_block: u64,
10+
pub has_sufficient_peer_connectivity: bool,
11+
pub finality_duration: u64,
12+
pub elapsed_time: Duration,
13+
}
+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
use std::{
2+
collections::VecDeque,
3+
sync::{
4+
atomic::{AtomicBool, AtomicU64, Ordering},
5+
Arc, RwLock,
6+
},
7+
};
8+
9+
use kaspa_consensus_core::api::counters::ProcessingCountersSnapshot;
10+
use kaspa_core::{time::unix_now, trace, warn};
11+
12+
use crate::rule_engine::SNAPSHOT_INTERVAL;
13+
14+
use super::{mining_rule::MiningRule, ExtraData};
15+
16+
// within a 5 minute period, we expect sync rate
17+
const SYNC_RATE_THRESHOLD: f64 = 0.90;
18+
// number of samples you expect in a 5 minute interval, sampled every 10s
19+
const SYNC_RATE_WINDOW_MAX_SIZE: usize = 5 * 60 / (SNAPSHOT_INTERVAL as usize);
20+
// number of samples required before considering this rule. This allows using the sync rate rule
21+
// even before the full window size is reached. Represents the number of samples in 1 minute
22+
const SYNC_RATE_WINDOW_MIN_THRESHOLD: usize = 60 / (SNAPSHOT_INTERVAL as usize);
23+
24+
pub struct SyncRateRule {
25+
pub use_sync_rate_rule: Arc<AtomicBool>,
26+
sync_rate_samples: RwLock<VecDeque<(u64, u64)>>,
27+
total_expected_blocks: AtomicU64,
28+
total_received_blocks: AtomicU64,
29+
}
30+
31+
impl SyncRateRule {
32+
pub fn new(use_sync_rate_rule: Arc<AtomicBool>) -> Self {
33+
Self {
34+
use_sync_rate_rule,
35+
sync_rate_samples: RwLock::new(VecDeque::new()),
36+
total_expected_blocks: AtomicU64::new(0),
37+
total_received_blocks: AtomicU64::new(0),
38+
}
39+
}
40+
41+
/// Adds current observation of received and expected blocks to the sample window, and removes
42+
/// old samples. Returns true if there are enough samples in the window to start triggering the
43+
/// sync rate rule.
44+
fn update_sync_rate_window(&self, received_blocks: u64, expected_blocks: u64) -> bool {
45+
self.total_received_blocks.fetch_add(received_blocks, Ordering::SeqCst);
46+
self.total_expected_blocks.fetch_add(expected_blocks, Ordering::SeqCst);
47+
48+
let mut samples = self.sync_rate_samples.write().unwrap();
49+
50+
samples.push_back((received_blocks, expected_blocks));
51+
52+
// Remove old samples. Usually is a single op after the window is full per 10s:
53+
while samples.len() > SYNC_RATE_WINDOW_MAX_SIZE {
54+
let (old_received_blocks, old_expected_blocks) = samples.pop_front().unwrap();
55+
self.total_received_blocks.fetch_sub(old_received_blocks, Ordering::SeqCst);
56+
self.total_expected_blocks.fetch_sub(old_expected_blocks, Ordering::SeqCst);
57+
}
58+
59+
samples.len() >= SYNC_RATE_WINDOW_MIN_THRESHOLD
60+
}
61+
}
62+
63+
/// SyncRateRule
64+
/// Allow mining even if the node is "not nearly synced" if the sync rate is below threshold
65+
/// and the finality point is recent. This is to prevent the network from undermining and to allow
66+
/// the network to automatically recover from any short-term mining halt.
67+
///
68+
/// Trigger: Sync rate is below threshold and finality point is recent
69+
/// Recovery: Sync rate is back above threshold
70+
impl MiningRule for SyncRateRule {
71+
fn check_rule(&self, delta: &ProcessingCountersSnapshot, extra_data: &ExtraData) {
72+
let expected_blocks = (extra_data.elapsed_time.as_millis() as u64) / extra_data.target_time_per_block;
73+
let received_blocks = delta.body_counts.max(delta.header_counts);
74+
75+
if !self.update_sync_rate_window(received_blocks, expected_blocks) {
76+
// Don't process the sync rule if the window doesn't have enough samples to filter out noise
77+
return;
78+
}
79+
80+
let rate: f64 =
81+
(self.total_received_blocks.load(Ordering::SeqCst) as f64) / (self.total_expected_blocks.load(Ordering::SeqCst) as f64);
82+
83+
// Finality point is considered "recent" if it is within 3 finality durations from the current time
84+
let is_finality_recent = extra_data.finality_point_timestamp >= unix_now().saturating_sub(extra_data.finality_duration * 3);
85+
86+
trace!(
87+
"Sync rate: {:.2} | Finality point recent: {} | Elapsed time: {}s | Connected: {} | Found/Expected blocks: {}/{}",
88+
rate,
89+
is_finality_recent,
90+
extra_data.elapsed_time.as_secs(),
91+
extra_data.has_sufficient_peer_connectivity,
92+
delta.body_counts,
93+
expected_blocks,
94+
);
95+
96+
if is_finality_recent && rate < SYNC_RATE_THRESHOLD {
97+
// if sync rate rule conditions are met:
98+
if let Ok(false) = self.use_sync_rate_rule.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) {
99+
warn!("Sync rate {:.2} is below threshold: {}", rate, SYNC_RATE_THRESHOLD);
100+
}
101+
} else {
102+
// else when sync rate conditions are not met:
103+
if let Ok(true) = self.use_sync_rate_rule.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) {
104+
if !is_finality_recent {
105+
warn!("Sync rate {:.2} recovered: {} by entering IBD", rate, SYNC_RATE_THRESHOLD);
106+
} else {
107+
warn!("Sync rate {:.2} recovered: {}", rate, SYNC_RATE_THRESHOLD);
108+
}
109+
} else if !is_finality_recent {
110+
trace!("Finality period is old. Timestamp: {}. Sync rate: {:.2}", extra_data.finality_point_timestamp, rate);
111+
}
112+
}
113+
}
114+
}

0 commit comments

Comments
 (0)