Skip to content

Commit

Permalink
Add gossipsub as a Lighthouse behaviour (#5066)
Browse files Browse the repository at this point in the history
* Move gossipsub as a lighthouse behaviour

* Update dependencies, pin to corrected libp2p version

* Merge latest unstable

* Fix test

* Remove unused dep

* Fix cargo.lock

* Re-order behaviour, pin upstream libp2p

* Pin discv5 to latest version
  • Loading branch information
AgeManning authored Jan 31, 2024
1 parent b9c519d commit 4273004
Show file tree
Hide file tree
Showing 41 changed files with 17,720 additions and 182 deletions.
614 changes: 460 additions & 154 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ criterion = "0.3"
delay_map = "0.3"
derivative = "2"
dirs = "3"
discv5 = { git="https://github.com/sigp/discv5", rev="e30a2c31b7ac0c57876458b971164654dfa4513b", features = ["libp2p"] }
either = "1.9"
discv5 = { version = "0.4.1", features = ["libp2p"] }
env_logger = "0.9"
error-chain = "0.12"
ethereum-types = "0.14"
Expand Down Expand Up @@ -160,6 +161,7 @@ tempfile = "3"
tokio = { version = "1", features = ["rt-multi-thread", "sync", "signal"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = { version = "0.6", features = ["codec", "compat", "time"] }
tracing = "0.1.40"
tracing-appender = "0.2"
tracing-core = "0.1"
tracing-log = "0.2"
Expand Down
26 changes: 21 additions & 5 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,29 @@ superstruct = { workspace = true }
prometheus-client = "0.22.0"
unused_port = { workspace = true }
delay_map = { workspace = true }
void = "1"
libp2p-mplex = { git = "https://github.com/sigp/rust-libp2p/", rev = "cfa3275ca17e502799ed56e555b6c0611752e369" }
tracing = { workspace = true }
byteorder = { workspace = true }
bytes = { workspace = true }
either = { workspace = true }

# Local dependencies
futures-ticker = "0.0.3"
futures-timer = "3.0.2"
getrandom = "0.2.11"
hex_fmt = "0.3.0"
instant = "0.1.12"
quick-protobuf = "0.8"
void = "1.0.2"
async-channel = "1.9.0"
asynchronous-codec = "0.7.0"
base64 = "0.21.5"
libp2p-mplex = "0.41"
quick-protobuf-codec = "0.3"

[dependencies.libp2p]
git = "https://github.com/sigp/rust-libp2p/"
rev = "cfa3275ca17e502799ed56e555b6c0611752e369"
version = "0.53"
default-features = false
features = ["identify", "yamux", "noise", "gossipsub", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa", "metrics", "quic"]
features = ["identify", "yamux", "noise", "dns", "tcp", "tokio", "plaintext", "secp256k1", "macros", "ecdsa", "metrics", "quic"]

[dev-dependencies]
slog-term = { workspace = true }
Expand All @@ -58,6 +73,7 @@ tempfile = { workspace = true }
exit-future = { workspace = true }
quickcheck = { workspace = true }
quickcheck_macros = { workspace = true }
async-std = { version = "1.6.3", features = ["unstable"] }

[features]
libp2p-websocket = []
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::gossipsub;
use crate::listen_addr::{ListenAddr, ListenAddress};
use crate::rpc::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use crate::types::GossipKind;
use crate::{Enr, PeerIdSerialized};
use directory::{
DEFAULT_BEACON_NODE_DIR, DEFAULT_HARDCODED_NETWORK, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR,
};
use libp2p::gossipsub;
use libp2p::Multiaddr;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
Expand Down Expand Up @@ -158,7 +158,7 @@ pub struct Config {
/// Configuration for the inbound rate limiter (requests received by this node).
pub inbound_rate_limiter_config: Option<InboundRateLimiterConfig>,

/// Whether to disable logging duplicate gossip messages as WARN. If set to true, duplicate
/// Whether to disable logging duplicate gossip messages as WARN. If set to true, duplicate
/// errors will be logged at DEBUG level.
pub disable_duplicate_warn_logs: bool,
}
Expand Down
175 changes: 175 additions & 0 deletions beacon_node/lighthouse_network/src/gossipsub/backoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Data structure for efficiently storing known back-off's when pruning peers.
use crate::gossipsub::topic::TopicHash;
use instant::Instant;
use libp2p::identity::PeerId;
use std::collections::{
hash_map::{Entry, HashMap},
HashSet,
};
use std::time::Duration;

#[derive(Copy, Clone)]
struct HeartbeatIndex(usize);

/// Stores backoffs in an efficient manner.
pub(crate) struct BackoffStorage {
/// Stores backoffs and the index in backoffs_by_heartbeat per peer per topic.
backoffs: HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
/// Stores peer topic pairs per heartbeat (this is cyclic the current index is
/// heartbeat_index).
backoffs_by_heartbeat: Vec<HashSet<(TopicHash, PeerId)>>,
/// The index in the backoffs_by_heartbeat vector corresponding to the current heartbeat.
heartbeat_index: HeartbeatIndex,
/// The heartbeat interval duration from the config.
heartbeat_interval: Duration,
/// Backoff slack from the config.
backoff_slack: u32,
}

impl BackoffStorage {
fn heartbeats(d: &Duration, heartbeat_interval: &Duration) -> usize {
((d.as_nanos() + heartbeat_interval.as_nanos() - 1) / heartbeat_interval.as_nanos())
as usize
}

pub(crate) fn new(
prune_backoff: &Duration,
heartbeat_interval: Duration,
backoff_slack: u32,
) -> BackoffStorage {
// We add one additional slot for partial heartbeat
let max_heartbeats =
Self::heartbeats(prune_backoff, &heartbeat_interval) + backoff_slack as usize + 1;
BackoffStorage {
backoffs: HashMap::new(),
backoffs_by_heartbeat: vec![HashSet::new(); max_heartbeats],
heartbeat_index: HeartbeatIndex(0),
heartbeat_interval,
backoff_slack,
}
}

/// Updates the backoff for a peer (if there is already a more restrictive backoff then this call
/// doesn't change anything).
pub(crate) fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) {
let instant = Instant::now() + time;
let insert_into_backoffs_by_heartbeat =
|heartbeat_index: HeartbeatIndex,
backoffs_by_heartbeat: &mut Vec<HashSet<_>>,
heartbeat_interval,
backoff_slack| {
let pair = (topic.clone(), *peer);
let index = (heartbeat_index.0
+ Self::heartbeats(&time, heartbeat_interval)
+ backoff_slack as usize)
% backoffs_by_heartbeat.len();
backoffs_by_heartbeat[index].insert(pair);
HeartbeatIndex(index)
};
match self.backoffs.entry(topic.clone()).or_default().entry(*peer) {
Entry::Occupied(mut o) => {
let (backoff, index) = o.get();
if backoff < &instant {
let pair = (topic.clone(), *peer);
if let Some(s) = self.backoffs_by_heartbeat.get_mut(index.0) {
s.remove(&pair);
}
let index = insert_into_backoffs_by_heartbeat(
self.heartbeat_index,
&mut self.backoffs_by_heartbeat,
&self.heartbeat_interval,
self.backoff_slack,
);
o.insert((instant, index));
}
}
Entry::Vacant(v) => {
let index = insert_into_backoffs_by_heartbeat(
self.heartbeat_index,
&mut self.backoffs_by_heartbeat,
&self.heartbeat_interval,
self.backoff_slack,
);
v.insert((instant, index));
}
};
}

/// Checks if a given peer is backoffed for the given topic. This method respects the
/// configured BACKOFF_SLACK and may return true even if the backup is already over.
/// It is guaranteed to return false if the backoff is not over and eventually if enough time
/// passed true if the backoff is over.
///
/// This method should be used for deciding if we can already send a GRAFT to a previously
/// backoffed peer.
pub(crate) fn is_backoff_with_slack(&self, topic: &TopicHash, peer: &PeerId) -> bool {
self.backoffs
.get(topic)
.map_or(false, |m| m.contains_key(peer))
}

pub(crate) fn get_backoff_time(&self, topic: &TopicHash, peer: &PeerId) -> Option<Instant> {
Self::get_backoff_time_from_backoffs(&self.backoffs, topic, peer)
}

fn get_backoff_time_from_backoffs(
backoffs: &HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
topic: &TopicHash,
peer: &PeerId,
) -> Option<Instant> {
backoffs
.get(topic)
.and_then(|m| m.get(peer).map(|(i, _)| *i))
}

/// Applies a heartbeat. That should be called regularly in intervals of length
/// `heartbeat_interval`.
pub(crate) fn heartbeat(&mut self) {
// Clean up backoffs_by_heartbeat
if let Some(s) = self.backoffs_by_heartbeat.get_mut(self.heartbeat_index.0) {
let backoffs = &mut self.backoffs;
let slack = self.heartbeat_interval * self.backoff_slack;
let now = Instant::now();
s.retain(|(topic, peer)| {
let keep = match Self::get_backoff_time_from_backoffs(backoffs, topic, peer) {
Some(backoff_time) => backoff_time + slack > now,
None => false,
};
if !keep {
//remove from backoffs
if let Entry::Occupied(mut m) = backoffs.entry(topic.clone()) {
if m.get_mut().remove(peer).is_some() && m.get().is_empty() {
m.remove();
}
}
}

keep
});
}

// Increase heartbeat index
self.heartbeat_index =
HeartbeatIndex((self.heartbeat_index.0 + 1) % self.backoffs_by_heartbeat.len());
}
}
Loading

0 comments on commit 4273004

Please sign in to comment.