Skip to content

Commit 4f2d63b

Browse files
committed
Add p2p v7, and make other versions obsolete one day before crescendo
1 parent 47abc36 commit 4f2d63b

File tree

3 files changed

+176
-10
lines changed

3 files changed

+176
-10
lines changed

protocol/flows/src/flow_context.rs

+27-10
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1-
use crate::flowcontext::{
2-
orphans::{OrphanBlocksPool, OrphanOutput},
3-
process_queue::ProcessQueue,
4-
transactions::TransactionsSpread,
1+
use crate::{
2+
flowcontext::{
3+
orphans::{OrphanBlocksPool, OrphanOutput},
4+
process_queue::ProcessQueue,
5+
transactions::TransactionsSpread,
6+
},
7+
v7,
58
};
69
use crate::{v5, v6};
710
use async_trait::async_trait;
@@ -58,8 +61,8 @@ use tokio::sync::{
5861
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
5962
use uuid::Uuid;
6063

61-
/// The P2P protocol version. Currently the only one supported.
62-
const PROTOCOL_VERSION: u32 = 6;
64+
/// The P2P protocol version.
65+
const PROTOCOL_VERSION: u32 = 7;
6366

6467
/// See `check_orphan_resolution_range`
6568
const BASELINE_ORPHAN_RESOLUTION_RANGE: u32 = 5;
@@ -776,10 +779,24 @@ impl ConnectionInitializer for FlowContext {
776779
debug!("protocol versions - self: {}, peer: {}", PROTOCOL_VERSION, peer_version.protocol_version);
777780

778781
// Register all flows according to version
779-
let (flows, applied_protocol_version) = match peer_version.protocol_version {
780-
v if v >= PROTOCOL_VERSION => (v6::register(self.clone(), router.clone()), PROTOCOL_VERSION),
781-
5 => (v5::register(self.clone(), router.clone()), 5),
782-
v => return Err(ProtocolError::VersionMismatch(PROTOCOL_VERSION, v)),
782+
const CONNECT_ONLY_NEW_VERSIONS_THRESHOLD_MILLIS: u64 = 24 * 3600 * 3600 * 1000;
783+
let threshold = CONNECT_ONLY_NEW_VERSIONS_THRESHOLD_MILLIS / self.config.target_time_per_block().before();
784+
let sink_daa_score = self.consensus().unguarded_session().async_get_sink_daa_score_timestamp().await.daa_score;
785+
let connect_only_new_versions = self.config.crescendo_activation.is_active(sink_daa_score)
786+
|| self.config.crescendo_activation.is_within_range_before_activation(sink_daa_score, threshold).is_some();
787+
788+
let (flows, applied_protocol_version) = if connect_only_new_versions {
789+
match peer_version.protocol_version {
790+
v if v >= PROTOCOL_VERSION => (v7::register(self.clone(), router.clone()), PROTOCOL_VERSION),
791+
v => return Err(ProtocolError::VersionMismatch(PROTOCOL_VERSION, v)),
792+
}
793+
} else {
794+
match peer_version.protocol_version {
795+
v if v >= PROTOCOL_VERSION => (v7::register(self.clone(), router.clone()), PROTOCOL_VERSION),
796+
6 => (v6::register(self.clone(), router.clone()), PROTOCOL_VERSION),
797+
5 => (v5::register(self.clone(), router.clone()), 5),
798+
v => return Err(ProtocolError::VersionMismatch(PROTOCOL_VERSION, v)),
799+
}
783800
};
784801

785802
// Build and register the peer properties

protocol/flows/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ pub mod flowcontext;
44
pub mod service;
55
pub mod v5;
66
pub mod v6;
7+
pub mod v7;

protocol/flows/src/v7/mod.rs

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use crate::v5::{
2+
address::{ReceiveAddressesFlow, SendAddressesFlow},
3+
blockrelay::{flow::HandleRelayInvsFlow, handle_requests::HandleRelayBlockRequests},
4+
ibd::IbdFlow,
5+
ping::{ReceivePingsFlow, SendPingsFlow},
6+
request_antipast::HandleAntipastRequests,
7+
request_block_locator::RequestBlockLocatorFlow,
8+
request_headers::RequestHeadersFlow,
9+
request_ibd_blocks::HandleIbdBlockRequests,
10+
request_ibd_chain_block_locator::RequestIbdChainBlockLocatorFlow,
11+
request_pp_proof::RequestPruningPointProofFlow,
12+
request_pruning_point_utxo_set::RequestPruningPointUtxoSetFlow,
13+
txrelay::flow::{RelayTransactionsFlow, RequestTransactionsFlow},
14+
};
15+
use crate::{flow_context::FlowContext, flow_trait::Flow};
16+
17+
use kaspa_p2p_lib::{KaspadMessagePayloadType, Router, SharedIncomingRoute};
18+
use kaspa_utils::channel;
19+
use std::sync::Arc;
20+
21+
use crate::v6::request_pruning_point_and_anticone::PruningPointAndItsAnticoneRequestsFlow;
22+
23+
pub fn register(ctx: FlowContext, router: Arc<Router>) -> Vec<Box<dyn Flow>> {
24+
// IBD flow <-> invs flow communication uses a job channel in order to always
25+
// maintain at most a single pending job which can be updated
26+
let (ibd_sender, relay_receiver) = channel::job();
27+
28+
let mut flows: Vec<Box<dyn Flow>> = vec![
29+
Box::new(IbdFlow::new(
30+
ctx.clone(),
31+
router.clone(),
32+
router.subscribe(vec![
33+
KaspadMessagePayloadType::BlockHeaders,
34+
KaspadMessagePayloadType::DoneHeaders,
35+
KaspadMessagePayloadType::IbdBlockLocatorHighestHash,
36+
KaspadMessagePayloadType::IbdBlockLocatorHighestHashNotFound,
37+
KaspadMessagePayloadType::BlockWithTrustedDataV4,
38+
KaspadMessagePayloadType::DoneBlocksWithTrustedData,
39+
KaspadMessagePayloadType::IbdChainBlockLocator,
40+
KaspadMessagePayloadType::IbdBlock,
41+
KaspadMessagePayloadType::TrustedData,
42+
KaspadMessagePayloadType::PruningPoints,
43+
KaspadMessagePayloadType::PruningPointProof,
44+
KaspadMessagePayloadType::UnexpectedPruningPoint,
45+
KaspadMessagePayloadType::PruningPointUtxoSetChunk,
46+
KaspadMessagePayloadType::DonePruningPointUtxoSetChunks,
47+
]),
48+
relay_receiver,
49+
)),
50+
Box::new(HandleRelayBlockRequests::new(
51+
ctx.clone(),
52+
router.clone(),
53+
router.subscribe(vec![KaspadMessagePayloadType::RequestRelayBlocks]),
54+
)),
55+
Box::new(ReceivePingsFlow::new(ctx.clone(), router.clone(), router.subscribe(vec![KaspadMessagePayloadType::Ping]))),
56+
Box::new(SendPingsFlow::new(ctx.clone(), router.clone(), router.subscribe(vec![KaspadMessagePayloadType::Pong]))),
57+
Box::new(RequestHeadersFlow::new(
58+
ctx.clone(),
59+
router.clone(),
60+
router.subscribe(vec![KaspadMessagePayloadType::RequestHeaders, KaspadMessagePayloadType::RequestNextHeaders]),
61+
)),
62+
Box::new(RequestPruningPointProofFlow::new(
63+
ctx.clone(),
64+
router.clone(),
65+
router.subscribe(vec![KaspadMessagePayloadType::RequestPruningPointProof]),
66+
)),
67+
Box::new(RequestIbdChainBlockLocatorFlow::new(
68+
ctx.clone(),
69+
router.clone(),
70+
router.subscribe(vec![KaspadMessagePayloadType::RequestIbdChainBlockLocator]),
71+
)),
72+
Box::new(PruningPointAndItsAnticoneRequestsFlow::new(
73+
ctx.clone(),
74+
router.clone(),
75+
router.subscribe(vec![
76+
KaspadMessagePayloadType::RequestPruningPointAndItsAnticone,
77+
KaspadMessagePayloadType::RequestNextPruningPointAndItsAnticoneBlocks,
78+
]),
79+
)),
80+
Box::new(RequestPruningPointUtxoSetFlow::new(
81+
ctx.clone(),
82+
router.clone(),
83+
router.subscribe(vec![
84+
KaspadMessagePayloadType::RequestPruningPointUtxoSet,
85+
KaspadMessagePayloadType::RequestNextPruningPointUtxoSetChunk,
86+
]),
87+
)),
88+
Box::new(HandleIbdBlockRequests::new(
89+
ctx.clone(),
90+
router.clone(),
91+
router.subscribe(vec![KaspadMessagePayloadType::RequestIbdBlocks]),
92+
)),
93+
Box::new(HandleAntipastRequests::new(
94+
ctx.clone(),
95+
router.clone(),
96+
router.subscribe(vec![KaspadMessagePayloadType::RequestAntipast]),
97+
)),
98+
Box::new(RelayTransactionsFlow::new(
99+
ctx.clone(),
100+
router.clone(),
101+
router
102+
.subscribe_with_capacity(vec![KaspadMessagePayloadType::InvTransactions], RelayTransactionsFlow::invs_channel_size()),
103+
router.subscribe_with_capacity(
104+
vec![KaspadMessagePayloadType::Transaction, KaspadMessagePayloadType::TransactionNotFound],
105+
RelayTransactionsFlow::txs_channel_size(),
106+
),
107+
)),
108+
Box::new(RequestTransactionsFlow::new(
109+
ctx.clone(),
110+
router.clone(),
111+
router.subscribe(vec![KaspadMessagePayloadType::RequestTransactions]),
112+
)),
113+
Box::new(ReceiveAddressesFlow::new(ctx.clone(), router.clone(), router.subscribe(vec![KaspadMessagePayloadType::Addresses]))),
114+
Box::new(SendAddressesFlow::new(
115+
ctx.clone(),
116+
router.clone(),
117+
router.subscribe(vec![KaspadMessagePayloadType::RequestAddresses]),
118+
)),
119+
Box::new(RequestBlockLocatorFlow::new(
120+
ctx.clone(),
121+
router.clone(),
122+
router.subscribe(vec![KaspadMessagePayloadType::RequestBlockLocator]),
123+
)),
124+
];
125+
126+
let invs_route = router.subscribe_with_capacity(vec![KaspadMessagePayloadType::InvRelayBlock], ctx.block_invs_channel_size());
127+
let shared_invs_route = SharedIncomingRoute::new(invs_route);
128+
129+
let num_relay_flows = (ctx.config.bps().upper_bound() as usize / 2).max(1);
130+
flows.extend((0..num_relay_flows).map(|_| {
131+
Box::new(HandleRelayInvsFlow::new(
132+
ctx.clone(),
133+
router.clone(),
134+
shared_invs_route.clone(),
135+
router.subscribe(vec![]),
136+
ibd_sender.clone(),
137+
)) as Box<dyn Flow>
138+
}));
139+
140+
// The reject message is handled as a special case by the router
141+
// KaspadMessagePayloadType::Reject,
142+
143+
// We do not register the below two messages since they are deprecated also in go-kaspa
144+
// KaspadMessagePayloadType::BlockWithTrustedData,
145+
// KaspadMessagePayloadType::IbdBlockLocator,
146+
147+
flows
148+
}

0 commit comments

Comments
 (0)