Skip to content

Commit d07c455

Browse files
kad: Fix not retrieving local records (#221)
This PR makes sure we always return a locally stored record if we have it. This changes the existing behavior, so I am unsure if it breaks any assumptions in the client code. We should mention this breaking change in the release notes.
1 parent 7c05d29 commit d07c455

File tree

5 files changed

+127
-13
lines changed

5 files changed

+127
-13
lines changed

src/protocol/libp2p/kademlia/handle.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,12 +254,12 @@ pub enum KademliaEvent {
254254
/// The type of the DHT records.
255255
#[derive(Debug, Clone)]
256256
pub enum RecordsType {
257-
/// Record was found in the local store.
257+
/// Record was found in the local store and [`Quorum::One`] was used.
258258
///
259259
/// This contains only a single result.
260260
LocalStore(Record),
261261

262-
/// Records found in the network.
262+
/// Records found in the network. This can include the locally found record.
263263
Network(Vec<PeerRecord>),
264264
}
265265

src/protocol/libp2p/kademlia/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1118,7 +1118,7 @@ impl Kademlia {
11181118
.closest(&Key::new(key), self.replication_factor)
11191119
.into(),
11201120
quorum,
1121-
if record.is_some() { 1 } else { 0 },
1121+
record.cloned(),
11221122
);
11231123
}
11241124
}

src/protocol/libp2p/kademlia/query/get_record.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,11 @@ pub struct GetRecordContext {
106106

107107
impl GetRecordContext {
108108
/// Create new [`GetRecordContext`].
109-
pub fn new(config: GetRecordConfig, in_peers: VecDeque<KademliaPeer>) -> Self {
109+
pub fn new(
110+
config: GetRecordConfig,
111+
in_peers: VecDeque<KademliaPeer>,
112+
found_records: Vec<PeerRecord>,
113+
) -> Self {
110114
let mut candidates = BTreeMap::new();
111115

112116
for candidate in &in_peers {
@@ -123,7 +127,7 @@ impl GetRecordContext {
123127
candidates,
124128
pending: HashMap::new(),
125129
queried: HashSet::new(),
126-
found_records: Vec::new(),
130+
found_records,
127131
}
128132
}
129133

@@ -378,7 +382,7 @@ mod tests {
378382
#[test]
379383
fn completes_when_no_candidates() {
380384
let config = default_config();
381-
let mut context = GetRecordContext::new(config, VecDeque::new());
385+
let mut context = GetRecordContext::new(config, VecDeque::new(), Vec::new());
382386
assert!(context.is_done());
383387
let event = context.next_action().unwrap();
384388
assert_eq!(event, QueryAction::QueryFailed { query: QueryId(0) });
@@ -387,7 +391,7 @@ mod tests {
387391
known_records: 1,
388392
..default_config()
389393
};
390-
let mut context = GetRecordContext::new(config, VecDeque::new());
394+
let mut context = GetRecordContext::new(config, VecDeque::new(), Vec::new());
391395
assert!(context.is_done());
392396
let event = context.next_action().unwrap();
393397
assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });
@@ -405,7 +409,7 @@ mod tests {
405409
assert_eq!(in_peers_set.len(), 3);
406410

407411
let in_peers = in_peers_set.iter().map(|peer| peer_to_kad(*peer)).collect();
408-
let mut context = GetRecordContext::new(config, in_peers);
412+
let mut context = GetRecordContext::new(config, in_peers, Vec::new());
409413

410414
for num in 0..3 {
411415
let event = context.next_action().unwrap();
@@ -444,7 +448,7 @@ mod tests {
444448
assert_eq!(in_peers_set.len(), 3);
445449

446450
let in_peers = [peer_a, peer_b, peer_c].iter().map(|peer| peer_to_kad(*peer)).collect();
447-
let mut context = GetRecordContext::new(config, in_peers);
451+
let mut context = GetRecordContext::new(config, in_peers, Vec::new());
448452

449453
// Schedule peer queries.
450454
for num in 0..3 {

src/protocol/libp2p/kademlia/query/mod.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ impl QueryEngine {
318318
target: RecordKey,
319319
candidates: VecDeque<KademliaPeer>,
320320
quorum: Quorum,
321-
count: usize,
321+
local_record: Option<Record>,
322322
) -> QueryId {
323323
tracing::debug!(
324324
target: LOG_TARGET,
@@ -331,18 +331,26 @@ impl QueryEngine {
331331
let target = Key::new(target);
332332
let config = GetRecordConfig {
333333
local_peer_id: self.local_peer_id,
334-
known_records: count,
334+
known_records: if local_record.is_some() { 1 } else { 0 },
335335
quorum,
336336
replication_factor: self.replication_factor,
337337
parallelism_factor: self.parallelism_factor,
338338
query: query_id,
339339
target,
340340
};
341341

342+
let found_records = local_record
343+
.into_iter()
344+
.map(|record| PeerRecord {
345+
peer: self.local_peer_id,
346+
record,
347+
})
348+
.collect();
349+
342350
self.queries.insert(
343351
query_id,
344352
QueryType::GetRecord {
345-
context: GetRecordContext::new(config, candidates),
353+
context: GetRecordContext::new(config, candidates, found_records),
346354
},
347355
);
348356

@@ -883,7 +891,7 @@ mod tests {
883891
]
884892
.into(),
885893
Quorum::All,
886-
3,
894+
None,
887895
);
888896

889897
for _ in 0..4 {

tests/protocol/kademlia.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,108 @@ async fn get_record_retrieves_remote_records() {
464464
}
465465
}
466466

467+
#[tokio::test]
468+
async fn get_record_retrieves_local_and_remote_records() {
469+
let (kad_config1, mut kad_handle1) = KademliaConfigBuilder::new().build();
470+
let (kad_config2, mut kad_handle2) = KademliaConfigBuilder::new().build();
471+
472+
let config1 = ConfigBuilder::new()
473+
.with_tcp(TcpConfig {
474+
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
475+
..Default::default()
476+
})
477+
.with_libp2p_kademlia(kad_config1)
478+
.build();
479+
480+
let config2 = ConfigBuilder::new()
481+
.with_tcp(TcpConfig {
482+
listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
483+
..Default::default()
484+
})
485+
.with_libp2p_kademlia(kad_config2)
486+
.build();
487+
488+
let mut litep2p1 = Litep2p::new(config1).unwrap();
489+
let mut litep2p2 = Litep2p::new(config2).unwrap();
490+
491+
// Let peers jnow about each other
492+
kad_handle1
493+
.add_known_peer(
494+
*litep2p2.local_peer_id(),
495+
litep2p2.listen_addresses().cloned().collect(),
496+
)
497+
.await;
498+
kad_handle2
499+
.add_known_peer(
500+
*litep2p1.local_peer_id(),
501+
litep2p1.listen_addresses().cloned().collect(),
502+
)
503+
.await;
504+
505+
// Store the record on `litep2p1``.
506+
let original_record = Record::new(vec![1, 2, 3], vec![0x01]);
507+
let query1 = kad_handle1.put_record(original_record.clone()).await;
508+
509+
let (mut peer1_stored, mut peer2_stored) = (false, false);
510+
let mut query3 = None;
511+
512+
loop {
513+
tokio::select! {
514+
_ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
515+
panic!("record was not retrieved in 10 secs")
516+
}
517+
event = litep2p1.next_event() => {}
518+
event = litep2p2.next_event() => {}
519+
event = kad_handle1.next() => {}
520+
event = kad_handle2.next() => {
521+
match event {
522+
Some(KademliaEvent::IncomingRecord { record: got_record }) => {
523+
assert_eq!(got_record.key, original_record.key);
524+
assert_eq!(got_record.value, original_record.value);
525+
assert_eq!(got_record.publisher.unwrap(), *litep2p1.local_peer_id());
526+
assert!(got_record.expires.is_some());
527+
528+
// Get record.
529+
let query_id = kad_handle2
530+
.get_record(RecordKey::from(vec![1, 2, 3]), Quorum::All).await;
531+
query3 = Some(query_id);
532+
}
533+
Some(KademliaEvent::GetRecordSuccess { query_id: _, records }) => {
534+
match records {
535+
RecordsType::LocalStore(_) => {
536+
panic!("the record was retrieved only from peer2")
537+
}
538+
RecordsType::Network(records) => {
539+
assert_eq!(records.len(), 2);
540+
541+
// Locally retrieved record goes first.
542+
assert_eq!(records[0].peer, *litep2p2.local_peer_id());
543+
assert_eq!(records[0].record.key, original_record.key);
544+
assert_eq!(records[0].record.value, original_record.value);
545+
assert_eq!(records[0].record.publisher.unwrap(), *litep2p1.local_peer_id());
546+
assert!(records[0].record.expires.is_some());
547+
548+
// Remote record from peer 1.
549+
assert_eq!(records[1].peer, *litep2p1.local_peer_id());
550+
assert_eq!(records[1].record.key, original_record.key);
551+
assert_eq!(records[1].record.value, original_record.value);
552+
assert_eq!(records[1].record.publisher.unwrap(), *litep2p1.local_peer_id());
553+
assert!(records[1].record.expires.is_some());
554+
555+
break
556+
}
557+
}
558+
}
559+
Some(KademliaEvent::QueryFailed { query_id: _ }) => {
560+
panic!("peer2 query failed")
561+
}
562+
_ => {}
563+
}
564+
}
565+
}
566+
}
567+
}
568+
467569
#[tokio::test]
468570
async fn provider_retrieved_by_remote_node() {
469571
let (kad_config1, mut kad_handle1) = KademliaConfigBuilder::new().build();

0 commit comments

Comments
 (0)