diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 2d8e913c..d2e3988d 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - protocol::libp2p::kademlia::{QueryId, Record, RecordKey}, + protocol::libp2p::kademlia::{PeerRecord, QueryId, Record, RecordKey}, PeerId, }; @@ -137,7 +137,7 @@ pub enum KademliaEvent { query_id: QueryId, /// Found record. - record: Record, + record: PeerRecord, }, /// `PUT_VALUE` query succeeded. diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 662ef0e7..e3cee89c 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -50,7 +50,7 @@ use std::collections::{hash_map::Entry, HashMap}; pub use config::{Config, ConfigBuilder}; pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode}; pub use query::QueryId; -pub use record::{Key as RecordKey, Record}; +pub use record::{Key as RecordKey, PeerRecord, Record}; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia"; @@ -639,7 +639,7 @@ impl Kademlia { Ok(()) } QueryAction::GetRecordQueryDone { query_id, record } => { - self.store.put(record.clone()); + self.store.put(record.record.clone()); let _ = self.event_tx.send(KademliaEvent::GetRecordSuccess { query_id, record }).await; @@ -757,7 +757,7 @@ impl Kademlia { (Some(record), Quorum::One) => { let _ = self .event_tx - .send(KademliaEvent::GetRecordSuccess { query_id, record: record.clone() }) + .send(KademliaEvent::GetRecordSuccess { query_id, record: PeerRecord { record: record.clone(), peer: None } }) .await; } (record, _) => { diff --git a/src/protocol/libp2p/kademlia/query/get_record.rs b/src/protocol/libp2p/kademlia/query/get_record.rs index eb9d8235..5f16e2b2 100644 --- a/src/protocol/libp2p/kademlia/query/get_record.rs +++ b/src/protocol/libp2p/kademlia/query/get_record.rs @@ -24,7 +24,7 @@ use crate::{ protocol::libp2p::kademlia::{ message::KademliaMessage, query::{QueryAction, QueryId}, - record::{Key as RecordKey, Record}, + record::{Key as RecordKey, PeerRecord, Record}, types::{Distance, KademliaPeer, Key}, Quorum, }, @@ -66,7 +66,7 @@ pub struct GetRecordContext { pub candidates: BTreeMap, /// Found records. - pub found_records: Vec, + pub found_records: Vec, /// Replication factor. pub replication_factor: usize, @@ -110,7 +110,7 @@ impl GetRecordContext { } /// Get the found record. - pub fn found_record(mut self) -> Record { + pub fn found_record(mut self) -> PeerRecord { self.found_records.pop().expect("record to exist since query succeeded") } @@ -138,7 +138,10 @@ impl GetRecordContext { // TODO: validate record if let Some(record) = record { - self.found_records.push(record); + self.found_records.push(PeerRecord { + record, + peer: Some(peer.peer), + }); } // add the queried peer to `queried` and all new peers which haven't been diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index 690ead05..71f47036 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -22,7 +22,7 @@ use crate::{ protocol::libp2p::kademlia::{ message::KademliaMessage, query::{find_node::FindNodeContext, get_record::GetRecordContext}, - record::{Key as RecordKey, Record}, + record::{Key as RecordKey, PeerRecord, Record}, types::{KademliaPeer, Key}, Quorum, }, @@ -97,7 +97,7 @@ pub enum QueryAction { peers: Vec, }, - /// Store the record to nodest closest to target key. + /// Store the record to nodes closest to target key. // TODO: horrible name PutRecordToFoundNodes { /// Target peer. @@ -113,7 +113,7 @@ pub enum QueryAction { query_id: QueryId, /// Found record. - record: Record, + record: PeerRecord, }, // TODO: remove @@ -571,7 +571,7 @@ mod tests { let mut engine = QueryEngine::new(PeerId::random(), 20usize, 3usize); let record_key = RecordKey::new(&vec![1, 2, 3, 4]); let target_key = Key::new(record_key.clone()); - let original_record = Record::new(record_key, vec![1, 3, 3, 7, 1, 3, 3, 8]); + let original_record = Record::new(record_key.clone(), vec![1, 3, 3, 7, 1, 3, 3, 8]); let distances = { let mut distances = std::collections::BTreeMap::new(); @@ -651,15 +651,58 @@ mod tests { } } - match engine.next_action() { + let peers = match engine.next_action() { Some(QueryAction::PutRecordToFoundNodes { peers, record }) => { assert_eq!(peers.len(), 4); assert_eq!(record.key, original_record.key); assert_eq!(record.value, original_record.value); + peers } _ => panic!("invalid event received"), - } + }; assert!(engine.next_action().is_none()); + + // get records from those peers. + let _query = engine.start_get_record( + QueryId(1341), + record_key.clone(), + vec![ + KademliaPeer::new(peers[0].peer, vec![], ConnectionType::NotConnected), + KademliaPeer::new(peers[1].peer, vec![], ConnectionType::NotConnected), + KademliaPeer::new(peers[2].peer, vec![], ConnectionType::NotConnected), + KademliaPeer::new(peers[3].peer, vec![], ConnectionType::NotConnected), + ] + .into(), + Quorum::All, + 3, + ); + + for _ in 0..4 { + match engine.next_action() { + Some(QueryAction::SendMessage { query, peer, .. }) => { + engine.register_response( + query, + peer, + KademliaMessage::GetRecord { + record: Some(original_record.clone()), + peers: vec![], + key: Some(record_key.clone()), + }, + ); + } + _ => panic!("invalid event received"), + } + } + + let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect(); + match engine.next_action() { + Some(QueryAction::GetRecordQueryDone { record, .. }) => { + assert!(peers.contains(&record.peer.expect("Peer Id must be provided"))); + assert_eq!(record.record.key, original_record.key); + assert_eq!(record.record.value, original_record.value); + } + _ => panic!("invalid event received"), + } } } diff --git a/src/protocol/libp2p/kademlia/record.rs b/src/protocol/libp2p/kademlia/record.rs index 619f6ebf..f0ce855a 100644 --- a/src/protocol/libp2p/kademlia/record.rs +++ b/src/protocol/libp2p/kademlia/record.rs @@ -108,3 +108,14 @@ impl Record { self.expires.map_or(false, |t| now >= t) } } + +/// A record either received by the given peer or retrieved from the local +/// record store. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PeerRecord { + /// The peer from whom the record was received. `None` if the record was + /// retrieved from local storage. + pub peer: Option, + + pub record: Record, +}