Skip to content

Commit 84267ad

Browse files
lexnvdmitry-markin
andauthored
kad: Implement put_record_to and try_put_record_to
This PR implements the `put_record_to` and `try_put_record_to` to selectively pick peers to update their records. The main use-case from substrate would be the following: - A peer is discovered to have an outdated authority record (needs #76) - Update the record with the latest authority record available (part of this PR) This PR provided peers to the engine if the peers are part of the kBucket. The first step of the discovery in substrate motivates this assumption. We can probably do things a bit more optimally since we know the peers part of the kBucket were discovered previously (or currently connected): - The query starts with a [FindNodeContext](https://github.com/paritytech/litep2p/blob/96e827b54f9f937c6d0489bef6a438b48cf50e58/src/protocol/libp2p/kademlia/query/find_node.rs#L37), which in this case will do a peer discovery as well - We could implement a `PutNodeContext` which circumvents the need to discover the peers and just forwards a kad `PUT_VALUE` to those peers We'd have to double check that with libp2p as well (my brief looking over code points to this direction). To unblock paritytech/polkadot-sdk#3786 we can merge this and then come back with a better / optimal solution for this Builds on top of: #76 cc @paritytech/networking --------- --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Dmitry Markin <dmitry@markin.tech>
1 parent d7fa12b commit 84267ad

File tree

5 files changed

+190
-6
lines changed

5 files changed

+190
-6
lines changed

src/protocol/libp2p/kademlia/handle.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,20 @@ pub(crate) enum KademliaCommand {
9090
query_id: QueryId,
9191
},
9292

93+
/// Store record to DHT to the given peers.
94+
///
95+
/// Similar to [`KademliaCommand::PutRecord`] but allows user to specify the peers.
96+
PutRecordToPeers {
97+
/// Record.
98+
record: Record,
99+
100+
/// Query ID for the query.
101+
query_id: QueryId,
102+
103+
/// Use the following peers for the put request.
104+
peers: Vec<PeerId>,
105+
},
106+
93107
/// Get record from DHT.
94108
GetRecord {
95109
/// Record key.
@@ -207,6 +221,21 @@ impl KademliaHandle {
207221
query_id
208222
}
209223

224+
/// Store record to DHT to the given peers.
225+
pub async fn put_record_to_peers(&mut self, record: Record, peers: Vec<PeerId>) -> QueryId {
226+
let query_id = self.next_query_id();
227+
let _ = self
228+
.cmd_tx
229+
.send(KademliaCommand::PutRecordToPeers {
230+
record,
231+
query_id,
232+
peers,
233+
})
234+
.await;
235+
236+
query_id
237+
}
238+
210239
/// Get record from DHT.
211240
pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId {
212241
let query_id = self.next_query_id();
@@ -247,6 +276,24 @@ impl KademliaHandle {
247276
.map_err(|_| ())
248277
}
249278

279+
/// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged,
280+
/// return an error.
281+
pub fn try_put_record_to_peers(
282+
&mut self,
283+
record: Record,
284+
peers: Vec<PeerId>,
285+
) -> Result<QueryId, ()> {
286+
let query_id = self.next_query_id();
287+
self.cmd_tx
288+
.try_send(KademliaCommand::PutRecordToPeers {
289+
record,
290+
query_id,
291+
peers,
292+
})
293+
.map(|_| query_id)
294+
.map_err(|_| ())
295+
}
296+
250297
/// Try to initiate `GET_VALUE` query and if the channel is clogged, return an error.
251298
pub fn try_get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result<QueryId, ()> {
252299
let query_id = self.next_query_id();

src/protocol/libp2p/kademlia/mod.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -741,15 +741,34 @@ impl Kademlia {
741741
Some(KademliaCommand::PutRecord { record, query_id }) => {
742742
tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT");
743743

744-
self.store.put(record.clone());
745744
let key = Key::new(record.key.clone());
746745

746+
self.store.put(record.clone());
747+
747748
self.engine.start_put_record(
748749
query_id,
749750
record,
750751
self.routing_table.closest(key, self.replication_factor).into(),
751752
);
752753
}
754+
Some(KademliaCommand::PutRecordToPeers { record, query_id, peers }) => {
755+
tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT to specified peers");
756+
757+
// Put the record to the specified peers.
758+
let peers = peers.into_iter().filter_map(|peer| {
759+
match self.routing_table.entry(Key::from(peer)) {
760+
KBucketEntry::Occupied(entry) => Some(entry.clone()),
761+
KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => Some(entry.clone()),
762+
_ => None,
763+
}
764+
}).collect();
765+
766+
self.engine.start_put_record_to_peers(
767+
query_id,
768+
record,
769+
peers,
770+
);
771+
}
753772
Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
754773
tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT");
755774

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2023 litep2p developers
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use crate::{
22+
protocol::libp2p::kademlia::{
23+
query::{QueryAction, QueryId},
24+
types::KademliaPeer,
25+
},
26+
PeerId,
27+
};
28+
29+
/// Context for multiple `FIND_NODE` queries.
30+
// TODO: implement finding nodes not present in the routing table,
31+
// see https://github.com/paritytech/litep2p/issues/80.
32+
#[derive(Debug)]
33+
pub struct FindManyNodesContext {
34+
/// Query ID.
35+
pub query: QueryId,
36+
37+
/// The peers we are looking for.
38+
pub peers_to_report: Vec<KademliaPeer>,
39+
}
40+
41+
impl FindManyNodesContext {
42+
/// Creates a new [`FindManyNodesContext`].
43+
pub fn new(query: QueryId, peers_to_report: Vec<KademliaPeer>) -> Self {
44+
Self {
45+
query,
46+
peers_to_report,
47+
}
48+
}
49+
50+
/// Register response failure for `peer`.
51+
pub fn register_response_failure(&mut self, _peer: PeerId) {}
52+
53+
/// Register `FIND_NODE` response from `peer`.
54+
pub fn register_response(&mut self, _peer: PeerId, _peers: Vec<KademliaPeer>) {}
55+
56+
/// Get next action for `peer`.
57+
pub fn next_peer_action(&mut self, _peer: &PeerId) -> Option<QueryAction> {
58+
None
59+
}
60+
61+
/// Get next action for a `FIND_NODE` query.
62+
pub fn next_action(&mut self) -> Option<QueryAction> {
63+
return Some(QueryAction::QuerySucceeded { query: self.query });
64+
}
65+
}

src/protocol/libp2p/kademlia/query/mod.rs

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ use bytes::Bytes;
3333

3434
use std::collections::{HashMap, VecDeque};
3535

36+
use self::find_many_nodes::FindManyNodesContext;
37+
38+
mod find_many_nodes;
3639
mod find_node;
3740
mod get_record;
3841

@@ -63,6 +66,15 @@ enum QueryType {
6366
context: FindNodeContext<RecordKey>,
6467
},
6568

69+
/// `PUT_VALUE` query to specified peers.
70+
PutRecordToPeers {
71+
/// Record that needs to be stored.
72+
record: Record,
73+
74+
/// Context for finding peers.
75+
context: FindManyNodesContext,
76+
},
77+
6678
/// `GET_VALUE` query.
6779
GetRecord {
6880
/// Context for the `GET_VALUE` query.
@@ -227,6 +239,32 @@ impl QueryEngine {
227239
query_id
228240
}
229241

242+
/// Start `PUT_VALUE` query to specified peers.
243+
pub fn start_put_record_to_peers(
244+
&mut self,
245+
query_id: QueryId,
246+
record: Record,
247+
peers_to_report: Vec<KademliaPeer>,
248+
) -> QueryId {
249+
tracing::debug!(
250+
target: LOG_TARGET,
251+
?query_id,
252+
target = ?record.key,
253+
num_peers = ?peers_to_report.len(),
254+
"start `PUT_VALUE` query to peers"
255+
);
256+
257+
self.queries.insert(
258+
query_id,
259+
QueryType::PutRecordToPeers {
260+
record,
261+
context: FindManyNodesContext::new(query_id, peers_to_report),
262+
},
263+
);
264+
265+
query_id
266+
}
267+
230268
/// Start `GET_VALUE` query.
231269
pub fn start_get_record(
232270
&mut self,
@@ -280,6 +318,9 @@ impl QueryEngine {
280318
Some(QueryType::PutRecord { context, .. }) => {
281319
context.register_response_failure(peer);
282320
}
321+
Some(QueryType::PutRecordToPeers { context, .. }) => {
322+
context.register_response_failure(peer);
323+
}
283324
Some(QueryType::GetRecord { context }) => {
284325
context.register_response_failure(peer);
285326
}
@@ -307,6 +348,12 @@ impl QueryEngine {
307348
}
308349
_ => unreachable!(),
309350
},
351+
Some(QueryType::PutRecordToPeers { context, .. }) => match message {
352+
KademliaMessage::FindNode { peers, .. } => {
353+
context.register_response(peer, peers);
354+
}
355+
_ => unreachable!(),
356+
},
310357
Some(QueryType::GetRecord { context }) => match message {
311358
KademliaMessage::GetRecord { record, peers, .. } => {
312359
context.register_response(peer, record, peers);
@@ -323,11 +370,12 @@ impl QueryEngine {
323370
match self.queries.get_mut(query) {
324371
None => {
325372
tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
326-
return None;
373+
None
327374
}
328-
Some(QueryType::FindNode { context }) => return context.next_peer_action(peer),
329-
Some(QueryType::PutRecord { context, .. }) => return context.next_peer_action(peer),
330-
Some(QueryType::GetRecord { context }) => return context.next_peer_action(peer),
375+
Some(QueryType::FindNode { context }) => context.next_peer_action(peer),
376+
Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer),
377+
Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer),
378+
Some(QueryType::GetRecord { context }) => context.next_peer_action(peer),
331379
}
332380
}
333381

@@ -344,6 +392,10 @@ impl QueryEngine {
344392
record,
345393
peers: context.responses.into_iter().map(|(_, peer)| peer).collect::<Vec<_>>(),
346394
},
395+
QueryType::PutRecordToPeers { record, context } => QueryAction::PutRecordToFoundNodes {
396+
record,
397+
peers: context.peers_to_report,
398+
},
347399
QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone {
348400
query_id: context.query,
349401
record: context.found_record(),
@@ -365,6 +417,7 @@ impl QueryEngine {
365417
let action = match state {
366418
QueryType::FindNode { context } => context.next_action(),
367419
QueryType::PutRecord { context, .. } => context.next_action(),
420+
QueryType::PutRecordToPeers { context, .. } => context.next_action(),
368421
QueryType::GetRecord { context } => context.next_action(),
369422
};
370423

src/protocol/libp2p/kademlia/routing_table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ impl RoutingTable {
176176
}
177177
}
178178

179-
/// Get `limit` closests peers to `target` from the k-buckets.
179+
/// Get `limit` closest peers to `target` from the k-buckets.
180180
pub fn closest<K: Clone>(&mut self, target: Key<K>, limit: usize) -> Vec<KademliaPeer> {
181181
ClosestBucketsIter::new(self.local_key.distance(&target))
182182
.map(|index| self.buckets[index.get()].closest_iter(&target))

0 commit comments

Comments
 (0)