Skip to content

kad: Expose the peer that provided the kad record #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
protocol::libp2p::kademlia::{QueryId, Record, RecordKey},
protocol::libp2p::kademlia::{PeerRecord, QueryId, Record, RecordKey},
PeerId,
};

Expand Down Expand Up @@ -137,7 +137,7 @@ pub enum KademliaEvent {
query_id: QueryId,

/// Found record.
record: Record,
record: PeerRecord,
},

/// `PUT_VALUE` query succeeded.
Expand Down
6 changes: 3 additions & 3 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use std::collections::{hash_map::Entry, HashMap};
pub use config::{Config, ConfigBuilder};
pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode};
pub use query::QueryId;
pub use record::{Key as RecordKey, Record};
pub use record::{Key as RecordKey, PeerRecord, Record};

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia";
Expand Down Expand Up @@ -639,7 +639,7 @@ impl Kademlia {
Ok(())
}
QueryAction::GetRecordQueryDone { query_id, record } => {
self.store.put(record.clone());
self.store.put(record.record.clone());

let _ =
self.event_tx.send(KademliaEvent::GetRecordSuccess { query_id, record }).await;
Expand Down Expand Up @@ -757,7 +757,7 @@ impl Kademlia {
(Some(record), Quorum::One) => {
let _ = self
.event_tx
.send(KademliaEvent::GetRecordSuccess { query_id, record: record.clone() })
.send(KademliaEvent::GetRecordSuccess { query_id, record: PeerRecord { record: record.clone(), peer: None } })
.await;
}
(record, _) => {
Expand Down
11 changes: 7 additions & 4 deletions src/protocol/libp2p/kademlia/query/get_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
protocol::libp2p::kademlia::{
message::KademliaMessage,
query::{QueryAction, QueryId},
record::{Key as RecordKey, Record},
record::{Key as RecordKey, PeerRecord, Record},
types::{Distance, KademliaPeer, Key},
Quorum,
},
Expand Down Expand Up @@ -66,7 +66,7 @@ pub struct GetRecordContext {
pub candidates: BTreeMap<Distance, KademliaPeer>,

/// Found records.
pub found_records: Vec<Record>,
pub found_records: Vec<PeerRecord>,

/// Replication factor.
pub replication_factor: usize,
Expand Down Expand Up @@ -110,7 +110,7 @@ impl GetRecordContext {
}

/// Get the found record.
pub fn found_record(mut self) -> Record {
pub fn found_record(mut self) -> PeerRecord {
self.found_records.pop().expect("record to exist since query succeeded")
}

Expand Down Expand Up @@ -138,7 +138,10 @@ impl GetRecordContext {

// TODO: validate record
if let Some(record) = record {
self.found_records.push(record);
self.found_records.push(PeerRecord {
record,
peer: Some(peer.peer),
});
}

// add the queried peer to `queried` and all new peers which haven't been
Expand Down
55 changes: 49 additions & 6 deletions src/protocol/libp2p/kademlia/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
protocol::libp2p::kademlia::{
message::KademliaMessage,
query::{find_node::FindNodeContext, get_record::GetRecordContext},
record::{Key as RecordKey, Record},
record::{Key as RecordKey, PeerRecord, Record},
types::{KademliaPeer, Key},
Quorum,
},
Expand Down Expand Up @@ -97,7 +97,7 @@ pub enum QueryAction {
peers: Vec<KademliaPeer>,
},

/// Store the record to nodest closest to target key.
/// Store the record to nodes closest to target key.
// TODO: horrible name
PutRecordToFoundNodes {
/// Target peer.
Expand All @@ -113,7 +113,7 @@ pub enum QueryAction {
query_id: QueryId,

/// Found record.
record: Record,
record: PeerRecord,
},

// TODO: remove
Expand Down Expand Up @@ -571,7 +571,7 @@ mod tests {
let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize);
let record_key = RecordKey::new(&vec![1, 2, 3, 4]);
let target_key = Key::new(record_key.clone());
let original_record = Record::new(record_key, vec![1, 3, 3, 7, 1, 3, 3, 8]);
let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]);

let distances = {
let mut distances = std::collections::BTreeMap::new();
Expand Down Expand Up @@ -651,15 +651,58 @@ mod tests {
}
}

match engine.next_action() {
let peers = match engine.next_action() {
Some(QueryAction::PutRecordToFoundNodes { peers, record }) => {
assert_eq!(peers.len(), 4);
assert_eq!(record.key, original_record.key);
assert_eq!(record.value, original_record.value);
peers
}
_ => panic!("invalid event received"),
}
};

assert!(engine.next_action().is_none());

// get records from those peers.
let _query = engine.start_get_record(
QueryId(1341),
record_key.clone(),
vec![
KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected),
KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected),
KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected),
KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected),
]
.into(),
Quorum::All,
3,
);

for _ in 0..4 {
match engine.next_action() {
Some(QueryAction::SendMessage { query, peer, .. }) => {
engine.register_response(
query,
peer,
KademliaMessage::GetRecord {
record: Some(original_record.clone()),
peers: vec![],
key: Some(record_key.clone()),
},
);
}
_ => panic!("invalid event received"),
}
}

let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
match engine.next_action() {
Some(QueryAction::GetRecordQueryDone { record, .. }) => {
assert!(peers.contains(&record.peer.expect("Peer Id must be provided")));
assert_eq!(record.record.key, original_record.key);
assert_eq!(record.record.value, original_record.value);
}
_ => panic!("invalid event received"),
}
}
}
11 changes: 11 additions & 0 deletions src/protocol/libp2p/kademlia/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,14 @@ impl Record {
self.expires.map_or(false, |t| now >= t)
}
}

/// A record either received by the given peer or retrieved from the local
/// record store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
/// The peer from whom the record was received. `None` if the record was
/// retrieved from local storage.
pub peer: Option<PeerId>,

pub record: Record,
}