
kad: Implement put_record_to and try_put_record_to #77

Merged · 11 commits · Apr 19, 2024
47 changes: 45 additions & 2 deletions src/protocol/libp2p/kademlia/handle.rs
@@ -88,6 +88,9 @@ pub(crate) enum KademliaCommand {

         /// Query ID for the query.
         query_id: QueryId,
+
+        /// Use the following peers for the put request.
+        peers: Option<Vec<PeerId>>,
     },

/// Get record from DHT.
@@ -202,7 +205,29 @@ impl KademliaHandle {
     /// Store record to DHT.
     pub async fn put_record(&mut self, record: Record) -> QueryId {
         let query_id = self.next_query_id();
-        let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await;
+        let _ = self
+            .cmd_tx
+            .send(KademliaCommand::PutRecord {
+                record,
+                query_id,
+                peers: None,
+            })
+            .await;
 
         query_id
     }
+
+    /// Store record to DHT to the given peers.
+    pub async fn put_record_to(&mut self, record: Record, peers: Vec<PeerId>) -> QueryId {
+        let query_id = self.next_query_id();
+        let _ = self
+            .cmd_tx
+            .send(KademliaCommand::PutRecord {
+                record,
+                query_id,
+                peers: Some(peers),
+            })
+            .await;
+
+        query_id
+    }
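
For context, a minimal usage sketch of the new method; `kademlia_handle`, `peer_a`, `peer_b`, and the `Record` construction are illustrative assumptions, not taken from this diff:

```rust
// Hypothetical usage: publish a record only to two specific peers rather
// than to the `replication_factor` closest peers found via FindNode.
let record = Record::new(record_key.clone(), serialized_value.clone());
let query_id = kademlia_handle.put_record_to(record, vec![peer_a, peer_b]).await;
// `query_id` can later be matched against the emitted `KademliaEvent`s.
```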
@@ -242,7 +267,25 @@ impl KademliaHandle {
     pub fn try_put_record(&mut self, record: Record) -> Result<QueryId, ()> {
         let query_id = self.next_query_id();
         self.cmd_tx
-            .try_send(KademliaCommand::PutRecord { record, query_id })
+            .try_send(KademliaCommand::PutRecord {
+                record,
+                query_id,
+                peers: None,
+            })
             .map(|_| query_id)
             .map_err(|_| ())
     }
+
+    /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged,
+    /// return an error.
+    pub fn try_put_record_to(&mut self, record: Record, peers: Vec<PeerId>) -> Result<QueryId, ()> {
+        let query_id = self.next_query_id();
+        self.cmd_tx
+            .try_send(KademliaCommand::PutRecord {
+                record,
+                query_id,
+                peers: Some(peers),
+            })
+            .map(|_| query_id)
+            .map_err(|_| ())
+    }
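
The non-blocking variant pairs with it; a sketch under the same hypothetical setup as above:

```rust
// `try_put_record_to` fails fast instead of awaiting channel capacity.
match kademlia_handle.try_put_record_to(record, vec![peer_a, peer_b]) {
    Ok(query_id) => tracing::trace!(?query_id, "PUT_VALUE query started"),
    Err(()) => {
        // Command channel clogged: back off and retry, or fall back to the
        // async `put_record_to`, which applies backpressure instead.
    }
}
```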
35 changes: 27 additions & 8 deletions src/protocol/libp2p/kademlia/mod.rs
@@ -45,7 +45,7 @@ use futures::StreamExt;
 use multiaddr::Multiaddr;
 use tokio::sync::mpsc::{Receiver, Sender};
 
-use std::collections::{hash_map::Entry, HashMap};
+use std::collections::{hash_map::Entry, HashMap, VecDeque};
 
 pub use config::{Config, ConfigBuilder};
 pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode};
@@ -738,17 +738,36 @@ impl Kademlia {
                         self.routing_table.closest(Key::from(peer), self.replication_factor).into()
                     );
                 }
-                Some(KademliaCommand::PutRecord { record, query_id }) => {
+                Some(KademliaCommand::PutRecord { record, query_id, peers }) => {
                     tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT");
 
-                    self.store.put(record.clone());
                     let key = Key::new(record.key.clone());
 
-                    self.engine.start_put_record(
-                        query_id,
-                        record,
-                        self.routing_table.closest(key, self.replication_factor).into(),
-                    );
+                    if let Some(peers) = peers {
+                        // Put the record to the specified peers.
+                        let peers: VecDeque<_> = peers.into_iter().filter_map(|peer| {
+                            if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) {
+                                Some(entry.clone())
+                            } else {
+                                None
+                            }
+                        }).collect();
+
+                        self.engine.start_put_record(
+                            query_id,
+                            record,
+                            peers
+                        );
Review comment (Collaborator):
Going once again through the start_put_record() implementation and the FindNodeContext query used under the hood, this seems to be doing something slightly different from what's needed by authority-discovery. Seeding the query with specific candidate peers would result in a QueryType::PutRecord query initialized with those peers, but if FindNode collects more than 20 peers that are closer to the target than the original "seed" peers, the "seed" peers would end up not being updated.

To update specific peers, we can go one of two routes:

  1. Update peers only if they are already in the routing table. For this, we just need to send KademliaMessage::put_value directly to the peers from the routing table (see the sketch after this list). This should be enough for authority-discovery's needs, as the peers should already be in the routing table.
  2. Update peers even if they are not in the routing table. Basically, do a FindNode query for the needed peers first, followed by sending KademliaMessage::put_value to the discovered addresses. We likely need this to conform to libp2p's put_record_to implementation, but it can be done in a follow-up.
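
For concreteness, a rough sketch of route 1; `send_put_value` is a hypothetical helper standing in for opening a substream to the peer and writing the serialized `KademliaMessage::put_value` payload (it is not part of this diff):

```rust
// Route 1 (illustrative only): bypass the FindNode-driven query engine and
// send `PUT_VALUE` directly to requested peers already in the routing table.
for peer in peers {
    match self.routing_table.entry(Key::from(peer)) {
        KBucketEntry::Occupied(entry) => {
            // Hypothetical helper: serializes `KademliaMessage::put_value`
            // and writes it on a substream opened to `entry`.
            self.send_put_value(&entry, record.clone());
        }
        // Peers missing from the routing table are skipped; route 2 would
        // first discover them with a `FIND_NODE` query.
        _ => {}
    }
}
```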

+                    } else {
+                        self.store.put(record.clone());
+
+                        self.engine.start_put_record(
+                            query_id,
+                            record,
+                            self.routing_table.closest(key, self.replication_factor).into(),
+                        );
+                    }
                 }
                 Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
                     tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT");
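Worth noting from the new `PutRecord` arm above: when `peers` is `Some`, only peers that already occupy a routing-table slot are targeted and unknown peers are silently dropped, and the record is not stored locally (`self.store.put` runs only in the `else` branch). A self-contained sketch of that selection step, with simplified stand-in types (the real `KBucketEntry` and peer types differ):

```rust
use std::collections::VecDeque;

// Simplified stand-ins for the real routing-table types (illustrative only).
#[derive(Clone, Debug)]
struct KademliaPeer(u64);

enum KBucketEntry {
    Occupied(KademliaPeer),
    Vacant,
}

// Pretend only even-numbered peers are present in the routing table.
fn entry(peer: u64) -> KBucketEntry {
    if peer % 2 == 0 {
        KBucketEntry::Occupied(KademliaPeer(peer))
    } else {
        KBucketEntry::Vacant
    }
}

fn main() {
    let requested = vec![1u64, 2, 3, 4];

    // Same shape as the `filter_map` in the diff above: keep only peers
    // with an occupied routing-table entry.
    let targets: VecDeque<_> = requested
        .into_iter()
        .filter_map(|peer| match entry(peer) {
            KBucketEntry::Occupied(p) => Some(p),
            KBucketEntry::Vacant => None,
        })
        .collect();

    // Prints `[KademliaPeer(2), KademliaPeer(4)]`: peers 1 and 3 are
    // silently dropped rather than queried.
    println!("{targets:?}");
}
```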
2 changes: 1 addition & 1 deletion src/protocol/libp2p/kademlia/routing_table.rs
@@ -176,7 +176,7 @@ impl RoutingTable {
         }
     }
 
-    /// Get `limit` closests peers to `target` from the k-buckets.
+    /// Get `limit` closest peers to `target` from the k-buckets.
     pub fn closest<K: Clone>(&mut self, target: Key<K>, limit: usize) -> Vec<KademliaPeer> {
         ClosestBucketsIter::new(self.local_key.distance(&target))
             .map(|index| self.buckets[index.get()].closest_iter(&target))