diff --git a/firefly-cardanoconnect/src/blockchain.rs b/firefly-cardanoconnect/src/blockchain.rs index 97b64e5..95a02b2 100644 --- a/firefly-cardanoconnect/src/blockchain.rs +++ b/firefly-cardanoconnect/src/blockchain.rs @@ -88,15 +88,10 @@ impl BlockchainClient { let blockchain = &config.connector.blockchain; let client = match (&blockchain.socket, blockfrost) { - (Some(socket), blockfrost) => { - let client = NodeToClient::new( - socket, - blockchain.magic(), - blockchain.genesis_hash(), - blockchain.genesis_values(), - blockfrost, - ) - .await; + (Some(socket), _) => { + let client = + NodeToClient::new(socket, blockchain.magic(), blockchain.genesis_values()) + .await; ClientImpl::NodeToClient(RwLock::new(client)) } (None, Some(blockfrost)) => { @@ -169,7 +164,6 @@ pub trait ChainSyncClient { &mut self, points: &[BlockReference], ) -> Result<(Option, BlockReference)>; - async fn request_block(&mut self, block_ref: &BlockReference) -> Result>; } pub struct ChainSyncClientWrapper { @@ -187,9 +181,6 @@ impl ChainSyncClient for ChainSyncClientWrapper { ) -> Result<(Option, BlockReference)> { self.inner.find_intersect(points).await } - async fn request_block(&mut self, block_ref: &BlockReference) -> Result> { - self.inner.request_block(block_ref).await - } } pub enum RequestNextResponse { diff --git a/firefly-cardanoconnect/src/blockchain/blockfrost.rs b/firefly-cardanoconnect/src/blockchain/blockfrost.rs index d99578e..6bfb3df 100644 --- a/firefly-cardanoconnect/src/blockchain/blockfrost.rs +++ b/firefly-cardanoconnect/src/blockchain/blockfrost.rs @@ -105,14 +105,6 @@ impl ChainSyncClient for BlockfrostChainSync { }; Ok((None, parse_reference(&self.tip))) } - - async fn request_block(&mut self, block_ref: &BlockReference) -> Result> { - let (requested_slot, requested_hash) = match block_ref { - BlockReference::Origin => (None, &self.genesis_hash), - BlockReference::Point(number, hash) => (*number, hash), - }; - request_block(&self.client, requested_hash, requested_slot).await - } } impl BlockfrostChainSync { @@ -237,22 +229,6 @@ impl Point { } } -pub async fn request_block( - client: &BlockfrostClient, - hash: &str, - slot: Option, -) -> Result> { - let Some(block) = client.try_blocks_by_id(hash).await? else { - return Ok(None); - }; - - if slot.is_some_and(|s| block.slot.is_some_and(|b| b as u64 != s)) { - bail!("requested_block returned a block in the wrong slot"); - } - - Ok(Some(parse_block(client, block).await?)) -} - fn parse_point(block: &BlockContent) -> Point { Point { slot: block.slot.map(|s| s as u64), diff --git a/firefly-cardanoconnect/src/blockchain/mocks.rs b/firefly-cardanoconnect/src/blockchain/mocks.rs index 6825fac..351599f 100644 --- a/firefly-cardanoconnect/src/blockchain/mocks.rs +++ b/firefly-cardanoconnect/src/blockchain/mocks.rs @@ -65,10 +65,6 @@ impl ChainSyncClient for MockChainSync { let tip = chain.last().map(|b| b.as_reference()).unwrap_or_default(); Ok((intersect.map(|i| i.as_reference()), tip)) } - - async fn request_block(&mut self, block_ref: &BlockReference) -> Result> { - self.chain.request_block(block_ref).await - } } // Mock implementation of something which can query the chain @@ -134,22 +130,6 @@ impl MockChain { Some(final_rollback_target) } - // this is a simpler version of request_range from the block fetch protocol. - pub async fn request_block(&self, block_ref: &BlockReference) -> Result> { - match block_ref { - BlockReference::Origin => Ok(None), - BlockReference::Point(slot, hash) => { - let chain = self.chain.read().await; - Ok(chain - .iter() - .rev() - .find(|b| b.block_hash == *hash) - .filter(|b| b.block_slot == *slot) - .cloned()) - } - } - } - // TODO: roll back sometimes async fn generate( chain: Arc>>, diff --git a/firefly-cardanoconnect/src/blockchain/n2c.rs b/firefly-cardanoconnect/src/blockchain/n2c.rs index 35c699b..3ee5b3a 100644 --- a/firefly-cardanoconnect/src/blockchain/n2c.rs +++ b/firefly-cardanoconnect/src/blockchain/n2c.rs @@ -20,7 +20,6 @@ use tokio::time; use tracing::warn; use crate::{ - blockfrost::BlockfrostClient, streams::{BlockInfo, BlockReference}, utils::LazyInit, }; @@ -30,27 +29,17 @@ use super::{ChainSyncClient, RequestNextResponse}; pub struct NodeToClient { socket: PathBuf, magic: u64, - genesis_hash: String, genesis_values: GenesisValues, - blockfrost: Option, client: LazyInit, } impl NodeToClient { - pub async fn new( - socket: &Path, - magic: u64, - genesis_hash: &str, - genesis_values: GenesisValues, - blockfrost: Option, - ) -> Self { + pub async fn new(socket: &Path, magic: u64, genesis_values: GenesisValues) -> Self { let client = Self::connect(socket, magic); let mut result = Self { socket: socket.to_path_buf(), magic, - genesis_hash: genesis_hash.to_string(), genesis_values, - blockfrost, client, }; if let Err(error) = result.get_client().await { @@ -93,16 +82,10 @@ impl NodeToClient { } pub fn open_chainsync(&self) -> Result { - let Some(blockfrost) = self.blockfrost.clone() else { - bail!("Cannot use node-to-client without a blockfrost key") - }; let client = Self::connect(&self.socket, self.magic); - let genesis_hash = self.genesis_hash.clone(); let genesis_values = self.genesis_values.clone(); Ok(N2cChainSync { client, - blockfrost, - genesis_hash, genesis_values, }) } @@ -137,8 +120,6 @@ impl NodeToClient { pub struct N2cChainSync { client: LazyInit, - blockfrost: BlockfrostClient, - genesis_hash: String, genesis_values: GenesisValues, } @@ -188,13 +169,6 @@ impl ChainSyncClient for N2cChainSync { Ok((intersect, tip)) } - async fn request_block(&mut self, block_ref: &BlockReference) -> Result> { - let (requested_slot, requested_hash) = match block_ref { - BlockReference::Origin => (None, &self.genesis_hash), - BlockReference::Point(number, hash) => (*number, hash), - }; - super::blockfrost::request_block(&self.blockfrost, requested_hash, requested_slot).await - } } impl N2cChainSync { diff --git a/firefly-cardanoconnect/src/routes/chain.rs b/firefly-cardanoconnect/src/routes/chain.rs index 4b3c9ce..1163d64 100644 --- a/firefly-cardanoconnect/src/routes/chain.rs +++ b/firefly-cardanoconnect/src/routes/chain.rs @@ -3,12 +3,11 @@ use firefly_server::apitypes::{ApiError, ApiResult}; use schemars::JsonSchema; use serde::Serialize; -use crate::{blockchain::ChainSyncClient, AppState}; +use crate::{blockchain::ChainSyncClient, streams::BlockReference, AppState}; #[derive(Serialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct ChainTip { - pub block_height: Option, pub block_slot: Option, pub block_hash: String, } @@ -17,13 +16,11 @@ pub async fn get_chain_tip( State(AppState { blockchain, .. }): State, ) -> ApiResult> { let mut sync = blockchain.sync().await?; - let (_, tip) = sync.find_intersect(&[]).await?; - let Some(block) = sync.request_block(&tip).await? else { + let (_, BlockReference::Point(slot, hash)) = sync.find_intersect(&[]).await? else { return Err(ApiError::not_found("tip of chain not found")); }; Ok(Json(ChainTip { - block_height: block.block_height, - block_slot: block.block_slot, - block_hash: block.block_hash, + block_slot: slot, + block_hash: hash, })) } diff --git a/firefly-cardanoconnect/src/streams/blockchain.rs b/firefly-cardanoconnect/src/streams/blockchain.rs index d105302..9bf5d45 100644 --- a/firefly-cardanoconnect/src/streams/blockchain.rs +++ b/firefly-cardanoconnect/src/streams/blockchain.rs @@ -81,6 +81,7 @@ impl ChainListener { #[derive(Debug)] pub struct ChainListenerImpl { history: VecDeque, + start: BlockReference, rollbacks: HashMap, sync_event_source: mpsc::Receiver, block_record_sink: mpsc::UnboundedSender, @@ -103,7 +104,10 @@ impl ChainListenerImpl { } pub fn get_tip(&self) -> BlockReference { - self.history.back().unwrap().as_reference() + self.history + .back() + .map(|b| b.as_reference()) + .unwrap_or(self.start.clone()) } pub fn try_get_next_event(&mut self, block_ref: &BlockReference) -> Option { @@ -112,18 +116,16 @@ impl ChainListenerImpl { BlockReference::Point(slot, hash) => (*slot, hash.clone()), }; - if let Some(slot) = prev_slot { - // if we haven't seen enough blocks to be "sure" that this one is immutable, apply all pending updates synchronously - if self - .history + // if we haven't seen enough blocks to be "sure" that this one is immutable, apply all pending updates synchronously + if prev_slot.is_none_or(|slot| { + self.history .iter() .rev() .find_map(|b| b.block_slot) - .is_some_and(|tip| tip < slot + APPROXIMATELY_IMMUTABLE_LENGTH) - { - while let Ok(sync_event) = self.sync_event_source.try_recv() { - self.handle_sync_event(sync_event); - } + .is_none_or(|tip| tip < slot + APPROXIMATELY_IMMUTABLE_LENGTH) + }) { + while let Ok(sync_event) = self.sync_event_source.try_recv() { + self.handle_sync_event(sync_event); } } @@ -132,15 +134,10 @@ impl ChainListenerImpl { return Some(ListenerEvent::Rollback(rollback_to.clone())); } - for (index, block) in self.history.iter().enumerate().rev() { - if block.block_hash == prev_hash { - if let Some(next) = self.history.get(index + 1) { - // we already have the block which comes after this! - return Some(ListenerEvent::Process(next.clone())); - } else { - // we don't have that block yet, so process events until we do - break; - } + for block in self.history.iter().rev() { + // If this block's parent is the one we're starting from, this is the next block to process. + if block.parent_hash.as_ref().is_some_and(|h| *h == prev_hash) { + return Some(ListenerEvent::Process(block.clone())); } // If we can tell by the slots we've gone too far back, break early if block @@ -222,23 +219,18 @@ impl ChainListenerImpl { let mut sync = chain.sync().await?; let points: Vec<_> = history.iter().rev().map(|b| b.as_reference()).collect(); let (head_ref, _) = sync.find_intersect(&points).await?; - let Some(head_ref) = head_ref else { + let Some(BlockReference::Point(_, head_hash)) = head_ref else { // The chain didn't recognize any of the blocks we saved from this chain. // We have no way to recover. bail!("listener {id} is on a fork which no longer exists"); }; - let Some(head) = sync.request_block(&head_ref).await? else { - bail!("listener {id} is on a fork which no longer exists"); - }; let mut rollbacks = HashMap::new(); - while history - .back() - .is_some_and(|i| i.block_hash != head.block_hash) - { + while history.back().is_some_and(|i| i.block_hash != head_hash) { let rolled_back = history.pop_back().unwrap(); rollbacks.insert(rolled_back.as_reference(), rolled_back); } + let start = history.back().unwrap().as_reference(); let (sync_event_sink, sync_event_source) = mpsc::channel(16); let (block_record_sink, block_record_source) = mpsc::unbounded_channel(); @@ -246,6 +238,7 @@ impl ChainListenerImpl { tokio::spawn(Self::persist_blocks(id, persistence, block_record_source)); Ok(Self { history, + start, rollbacks, genesis_hash: chain.genesis_hash(), sync_event_source, @@ -260,71 +253,30 @@ impl ChainListenerImpl { from: Option, ) -> Result { let mut sync = chain.sync().await?; - let head_ref = match &from { - Some(block_ref) => { - // If the caller passed a block reference, they're starting from either the origin or a specific point - let (head, _) = sync.find_intersect(&[block_ref.clone()]).await?; - let Some(head) = head else { - // Trying to init a fresh listener from a ref which does not exist - bail!("could not start listening from {from:?}, as it does not exist on-chain"); - }; - head - } - None => { - // Otherwise, they just want to follow from the tip - let (_, tip) = sync.find_intersect(&[]).await?; - // Call find_intersect again so the chainsync protocol knows we're following from the tip - let (head, _) = sync.find_intersect(&[tip.clone()]).await?; - if !head.is_some_and(|h| h == tip) { - bail!("could not start listening from latest: rollback occurred while we were connecting"); - }; - tip - } - }; - let Some(head) = sync.request_block(&head_ref).await? else { - // Trying to init a fresh listener from a ref which does not exist - bail!("could not start listening from {from:?}, as it does not exist on-chain"); - }; - - let mut prev_hash = head.parent_hash.clone(); - let mut history = VecDeque::new(); - history.push_back(head); - - for _ in 0..APPROXIMATELY_IMMUTABLE_LENGTH { - let Some(prev) = prev_hash else { - break; + let start = if let Some(block_ref) = &from { + let (Some(head), _) = sync.find_intersect(&[block_ref.clone()]).await? else { + // Trying to init a fresh listener from a ref which does not exist + bail!("could not start listening from {from:?}, as it does not exist on-chain"); }; - let prev_ref = BlockReference::Point(None, prev); - let prev_block = match sync.request_block(&prev_ref).await { - Err(err) => { - warn!("could not populate a history for this listener, it may not be able to recover from rollback: {}", err); - break; - } - Ok(None) => { - bail!("block {from:?} was rolled back before we could finish setting it up") - } - Ok(Some(prev_block)) => prev_block, + head + } else { + // Otherwise, they just want to follow from the tip + let (_, tip) = sync.find_intersect(&[]).await?; + // Call find_intersect again so the chainsync protocol knows we're following from the tip + let (head, _) = sync.find_intersect(&[tip.clone()]).await?; + if !head.is_some_and(|h| h == tip) { + bail!("could not start listening from latest: rollback occurred while we were connecting"); }; - - prev_hash = prev_block.parent_hash.clone(); - history.push_front(prev_block); - } - - let records: Vec<_> = history - .iter() - .map(|block| BlockRecord { - block: block.clone(), - rolled_back: false, - }) - .collect(); - persistence.save_block_records(&id, records).await?; + tip + }; let (sync_event_sink, sync_event_source) = mpsc::channel(16); let (block_record_sink, block_record_source) = mpsc::unbounded_channel(); tokio::spawn(Self::stay_in_sync(sync, sync_event_sink)); tokio::spawn(Self::persist_blocks(id, persistence, block_record_source)); Ok(Self { - history, + history: VecDeque::new(), + start, rollbacks: HashMap::new(), genesis_hash: chain.genesis_hash(), sync_event_source, diff --git a/scripts/demo/src/firefly.rs b/scripts/demo/src/firefly.rs index 966bd4b..a33ea7f 100644 --- a/scripts/demo/src/firefly.rs +++ b/scripts/demo/src/firefly.rs @@ -200,8 +200,6 @@ struct OperationStatus { #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Tip { - #[allow(unused)] - pub block_height: u64, pub block_slot: u64, pub block_hash: String, }