diff --git a/Cargo.lock b/Cargo.lock index c849867..1b47c3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -197,6 +197,7 @@ checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" dependencies = [ "async-trait", "axum-core", + "axum-macros", "base64", "bytes", "futures-util", @@ -247,6 +248,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-macros" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -332,8 +344,7 @@ dependencies = [ [[package]] name = "blockfrost" version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74fd58b20cc3cefd3a9e6179ff6bbc26f38d21e6e47fc8a0c0c1de008d353b12" +source = "git+https://github.com/blockfrost/blockfrost-rust.git?rev=14e22b5#14e22b5f25ac5f004a2f2aed056d73b18f925447" dependencies = [ "blockfrost-openapi", "futures", @@ -348,14 +359,14 @@ dependencies = [ [[package]] name = "blockfrost-openapi" -version = "0.0.3" +version = "0.1.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de47d9046ac8ff2d7c7e4d28426ea8c9e2d0cb31105f9b24d0a31f0b37ae739f" +checksum = "3c7bab97c5a8dc8b2dcdba5aa255982ab69eba0067dbdc24e009214c946434d2" dependencies = [ "serde", - "serde_derive", "serde_json", "serde_with", + "uuid", ] [[package]] @@ -755,6 +766,7 @@ dependencies = [ "async-trait", "axum", "blockfrost", + "blockfrost-openapi", "chrono", "clap", "dashmap", @@ -2794,6 +2806,16 @@ dependencies = [ "zip", ] +[[package]] +name = "uuid" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +dependencies = [ + "getrandom", + "serde", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/firefly-cardanoconnect/Cargo.toml b/firefly-cardanoconnect/Cargo.toml index 4100f71..7366712 100644 --- a/firefly-cardanoconnect/Cargo.toml +++ b/firefly-cardanoconnect/Cargo.toml @@ -8,8 +8,9 @@ repository = "https://github.com/SundaeSwap-finance/firefly-cardano" aide = { version = "0.13", features = ["axum"] } anyhow = "1" async-trait = "0.1" -axum = { version = "0.7", features = ["ws"] } -blockfrost = { version = "1", default-features = false, features = ["rustls-tls"] } +axum = { version = "0.7", features = ["macros", "ws"] } +blockfrost = { git = "https://github.com/blockfrost/blockfrost-rust.git", rev = "14e22b5", default-features = false, features = ["rustls-tls"] } +blockfrost-openapi = "0.1.69" clap = { version = "4", features = ["derive"] } chrono = "0.4" dashmap = "6" diff --git a/firefly-cardanoconnect/src/blockchain.rs b/firefly-cardanoconnect/src/blockchain.rs index c098c34..8e969c4 100644 --- a/firefly-cardanoconnect/src/blockchain.rs +++ b/firefly-cardanoconnect/src/blockchain.rs @@ -6,6 +6,7 @@ use crate::{ }; use anyhow::{bail, Result}; use async_trait::async_trait; +use blockfrost::Blockfrost; use mocks::MockChain; use n2c::NodeToClient; use pallas_primitives::conway::Tx; @@ -13,13 +14,14 @@ use pallas_traverse::wellknown::GenesisValues; use serde::Deserialize; use tokio::sync::RwLock; +mod blockfrost; pub mod mocks; mod n2c; #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct BlockchainConfig { - pub socket: PathBuf, + pub socket: Option, pub blockfrost_key: Option>, pub network: Option, pub network_magic: Option, @@ -66,6 +68,7 @@ impl BlockchainConfig { } enum ClientImpl { + Blockfrost(Blockfrost), NodeToClient(RwLock), Mock(MockChain), } @@ -77,26 +80,32 @@ pub struct BlockchainClient { } impl BlockchainClient { - pub async fn new(config: &CardanoConnectConfig) -> Self { + pub async fn new(config: &CardanoConnectConfig) -> Result { let blockchain = &config.connector.blockchain; - let n2c = { - let client = NodeToClient::new( - &blockchain.socket, - blockchain.magic(), - blockchain.genesis_hash(), - blockchain.genesis_values(), - blockchain.blockfrost_key.as_ref(), - ) - .await; - RwLock::new(client) + let client = match (&blockchain.socket, &blockchain.blockfrost_key) { + (Some(socket), key) => { + let client = NodeToClient::new( + socket, + blockchain.magic(), + blockchain.genesis_hash(), + blockchain.genesis_values(), + key.as_ref(), + ) + .await; + ClientImpl::NodeToClient(RwLock::new(client)) + } + (None, Some(key)) => { + let client = Blockfrost::new(&key.0, blockchain.genesis_hash()); + ClientImpl::Blockfrost(client) + } + (None, None) => bail!("Missing blockchain configuration"), }; - - Self { - client: ClientImpl::NodeToClient(n2c), + Ok(Self { + client, genesis_hash: blockchain.genesis_hash().to_string(), era: blockchain.era, - } + }) } #[allow(unused)] @@ -116,6 +125,7 @@ impl BlockchainClient { pub async fn health(&self) -> Result<()> { match &self.client { + ClientImpl::Blockfrost(_) => Ok(()), ClientImpl::Mock(_) => Ok(()), ClientImpl::NodeToClient(n2c) => { let client = n2c.read().await; @@ -126,6 +136,7 @@ impl BlockchainClient { pub async fn submit(&self, transaction: Tx) -> Result { match &self.client { + ClientImpl::Blockfrost(bf) => bf.submit(transaction).await, ClientImpl::Mock(_) => bail!("mock transaction submission not implemented"), ClientImpl::NodeToClient(n2c) => { let mut client = n2c.write().await; @@ -136,6 +147,7 @@ impl BlockchainClient { pub async fn sync(&self) -> Result { let inner: Box = match &self.client { + ClientImpl::Blockfrost(bf) => Box::new(bf.open_chainsync().await?), ClientImpl::Mock(mock) => Box::new(mock.sync()), ClientImpl::NodeToClient(n2c) => { let client = n2c.read().await; diff --git a/firefly-cardanoconnect/src/blockchain/blockfrost.rs b/firefly-cardanoconnect/src/blockchain/blockfrost.rs new file mode 100644 index 0000000..c578e69 --- /dev/null +++ b/firefly-cardanoconnect/src/blockchain/blockfrost.rs @@ -0,0 +1,309 @@ +use std::{collections::VecDeque, time::Duration}; + +use anyhow::{bail, Result}; +use async_trait::async_trait; +use blockfrost::{ + BlockFrostSettings, BlockfrostAPI, BlockfrostError, BlockfrostResult, Pagination, +}; +use blockfrost_openapi::models::BlockContent; +use pallas_primitives::conway::Tx; +use tokio::time; + +use crate::streams::{BlockInfo, BlockReference}; + +use super::{ChainSyncClient, RequestNextResponse}; + +pub struct Blockfrost { + api: BlockfrostAPI, + genesis_hash: String, +} + +impl Blockfrost { + pub fn new(blockfrost_key: &str, genesis_hash: &str) -> Self { + Self { + api: BlockfrostAPI::new(blockfrost_key, BlockFrostSettings::new()), + genesis_hash: genesis_hash.to_string(), + } + } + + pub async fn submit(&self, transaction: Tx) -> Result { + let transaction_data = { + let mut bytes = vec![]; + minicbor::encode(transaction, &mut bytes).expect("infallible"); + bytes + }; + Ok(self.api.transactions_submit(transaction_data).await?) + } + + pub async fn open_chainsync(&self) -> Result { + BlockfrostChainSync::new(self.api.clone(), self.genesis_hash.clone()).await + } +} + +pub struct BlockfrostChainSync { + api: BlockfrostAPI, + tip: BlockContent, + prev: VecDeque, + head: Point, + next: VecDeque, + genesis_hash: String, +} + +#[async_trait] +impl ChainSyncClient for BlockfrostChainSync { + async fn request_next(&mut self) -> Result { + if let Some(block) = self.fetch_next().await? { + // roll forward + Ok(RequestNextResponse::RollForward( + block, + parse_reference(&self.tip), + )) + } else { + // roll backward + let new_head = self.roll_back().await?; + Ok(RequestNextResponse::RollBackward( + new_head, + parse_reference(&self.tip), + )) + } + } + + async fn find_intersect( + &mut self, + points: &[BlockReference], + ) -> Result<(Option, BlockReference)> { + self.prev.clear(); + self.next.clear(); + for point in points { + let Some(point) = self.request_point(point).await? else { + continue; + }; + + let history = self + .api + .blocks_previous( + &point.hash, + Pagination { + count: 20, + ..Pagination::default() + }, + ) + .await?; + + for block in history { + self.prev.push_back(parse_point(&block)); + } + let head = point.as_reference(); + self.head = point; + + return Ok((Some(head), parse_reference(&self.tip))); + } + self.head = Point { + slot: None, + hash: self.genesis_hash.clone(), + }; + Ok((None, parse_reference(&self.tip))) + } + + async fn request_block(&mut self, block_ref: &BlockReference) -> Result> { + let (requested_slot, requested_hash) = match block_ref { + BlockReference::Origin => (None, &self.genesis_hash), + BlockReference::Point(number, hash) => (*number, hash), + }; + request_block(&self.api, requested_hash, requested_slot).await + } +} + +impl BlockfrostChainSync { + async fn new(api: BlockfrostAPI, genesis_hash: String) -> Result { + let tip = api.blocks_latest().await?; + Ok(Self { + api, + tip, + prev: VecDeque::new(), + head: Point { + slot: None, + hash: genesis_hash.clone(), + }, + next: VecDeque::new(), + genesis_hash, + }) + } + + async fn fetch_next(&mut self) -> Result> { + while self.next.is_empty() { + let Some(next_blocks) = self + .api + .blocks_next(&self.head.hash, Pagination::default()) + .await + .none_on_404()? + else { + // our head is gone, time to roll back + return Ok(None); + }; + + for block in next_blocks.into_iter() { + self.next.push_back(block); + } + + if let Some(latest) = self.next.back() { + // Update the tip if we've fetched newer blocks + if latest.time > self.tip.time { + self.tip = latest.clone(); + } + } else { + // If next is empty at this point, we're at the tip. + // We're now polling until something new appears. + time::sleep(Duration::from_secs(10)).await; + } + } + + // We definitely have the next block to return now + let next = self.next.pop_front().unwrap(); + let next = parse_block(&self.api, next).await?; + + // update prev to point to the next + self.prev.push_back(self.head.clone()); + self.prev.pop_front(); + self.head = Point { + slot: next.block_slot, + hash: next.block_hash.clone(), + }; + + Ok(Some(next)) + } + + async fn roll_back(&mut self) -> Result { + let Some(oldest) = self.prev.pop_front() else { + bail!("chain has rolled too far back!"); + }; + + let mut new_history = self + .api + .blocks_next(&oldest.hash, Pagination::default()) + .await?; + // find where history diverged + let split_index = new_history + .iter() + .zip(&self.prev) + .take_while(|(new, old)| new.hash == old.hash) + .count(); + let new_next = new_history.split_off(split_index); + + // everything before that point is the past + self.prev.clear(); + self.prev.push_back(oldest); + for block in new_history { + self.prev.push_back(parse_point(&block)); + } + + // whatever was at that point is our new head + self.head = self.prev.pop_back().unwrap(); + + // any blocks left after that point are the future + self.next.clear(); + for block in new_next { + self.next.push_back(block); + } + + Ok(self.head.as_reference()) + } + + async fn request_point(&self, block_ref: &BlockReference) -> Result> { + let (requested_slot, requested_hash) = match block_ref { + BlockReference::Origin => (None, &self.genesis_hash), + BlockReference::Point(number, hash) => (*number, hash), + }; + let Some(block) = self.api.try_fetch_block(requested_hash).await? else { + return Ok(None); + }; + + if requested_slot.is_some_and(|s| block.slot.is_some_and(|b| b as u64 != s)) { + bail!("requested_block returned a block in the wrong slot"); + } + + Ok(Some(parse_point(&block))) + } +} + +#[derive(Clone)] +struct Point { + slot: Option, + hash: String, +} +impl Point { + fn as_reference(&self) -> BlockReference { + BlockReference::Point(self.slot, self.hash.clone()) + } +} + +pub async fn request_block( + api: &BlockfrostAPI, + hash: &str, + slot: Option, +) -> Result> { + let Some(block) = api.try_fetch_block(hash).await? else { + return Ok(None); + }; + + if slot.is_some_and(|s| block.slot.is_some_and(|b| b as u64 != s)) { + bail!("requested_block returned a block in the wrong slot"); + } + + Ok(Some(parse_block(api, block).await?)) +} + +fn parse_point(block: &BlockContent) -> Point { + Point { + slot: block.slot.map(|s| s as u64), + hash: block.hash.clone(), + } +} + +fn parse_reference(block: &BlockContent) -> BlockReference { + let point = parse_point(block); + BlockReference::Point(point.slot, point.hash) +} + +async fn parse_block(api: &BlockfrostAPI, block: BlockContent) -> Result { + let block_hash = block.hash; + let block_height = block.height.map(|h| h as u64); + let block_slot = block.slot.map(|s| s as u64); + + let transaction_hashes = api.blocks_txs(&block_hash, Pagination::all()).await?; + + let info = BlockInfo { + block_hash, + block_height, + block_slot, + parent_hash: block.previous_block, + transaction_hashes, + }; + Ok(info) +} + +trait BlockfrostAPIExt { + async fn try_fetch_block(&self, hash: &str) -> Result>; +} + +impl BlockfrostAPIExt for BlockfrostAPI { + async fn try_fetch_block(&self, hash: &str) -> Result> { + self.blocks_by_id(hash).await.none_on_404() + } +} + +trait BlockfrostResultExt { + type T; + fn none_on_404(self) -> Result>; +} + +impl BlockfrostResultExt for BlockfrostResult { + type T = T; + fn none_on_404(self) -> Result> { + match self { + Err(BlockfrostError::Response { reason, .. }) if reason.status_code == 404 => Ok(None), + Err(error) => Err(error.into()), + Ok(res) => Ok(Some(res)), + } + } +} diff --git a/firefly-cardanoconnect/src/blockchain/n2c.rs b/firefly-cardanoconnect/src/blockchain/n2c.rs index 6aa5a66..89995aa 100644 --- a/firefly-cardanoconnect/src/blockchain/n2c.rs +++ b/firefly-cardanoconnect/src/blockchain/n2c.rs @@ -5,7 +5,7 @@ use std::{ use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; -use blockfrost::{BlockFrostSettings, BlockfrostAPI, BlockfrostError, Pagination}; +use blockfrost::{BlockFrostSettings, BlockfrostAPI}; use pallas_crypto::hash::Hasher; use pallas_network::{ facades::NodeClient, @@ -196,35 +196,7 @@ impl ChainSyncClient for N2cChainSync { BlockReference::Origin => (None, &self.genesis_hash), BlockReference::Point(number, hash) => (*number, hash), }; - let block = match self.blockfrost.blocks_by_id(requested_hash).await { - Err(BlockfrostError::Response { reason, .. }) if reason.status_code == 404 => { - return Ok(None) - } - Err(error) => return Err(error.into()), - Ok(block) => block, - }; - - let block_hash = block.hash; - let block_height = block.height.map(|h| h as u64); - let block_slot = block.slot.map(|s| s as u64); - if requested_slot.is_some_and(|s| block.slot.is_some_and(|b| b as u64 != s)) { - bail!("requested_block returned a block in the wrong slot"); - } - - let transaction_hashes = self - .blockfrost - .blocks_txs(requested_hash, Pagination::all()) - .await?; - - let info = BlockInfo { - block_hash, - block_height, - block_slot, - parent_hash: block.previous_block, - transaction_hashes, - }; - - Ok(Some(info)) + super::blockfrost::request_block(&self.blockfrost, requested_hash, requested_slot).await } } diff --git a/firefly-cardanoconnect/src/main.rs b/firefly-cardanoconnect/src/main.rs index f1b5283..669f1d8 100644 --- a/firefly-cardanoconnect/src/main.rs +++ b/firefly-cardanoconnect/src/main.rs @@ -53,7 +53,7 @@ async fn init_state(config: &CardanoConnectConfig, mock_data: bool) -> Result