diff --git a/Cargo.lock b/Cargo.lock index 3471c98..3fe86a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -405,7 +405,7 @@ dependencies = [ [[package]] name = "blockfrost" version = "1.0.3" -source = "git+https://github.com/SupernaviX/blockfrost-rust.git?rev=ffb40ae#ffb40aea05cf583fdfe321b88bb3fe65db656c28" +source = "git+https://github.com/blockfrost/blockfrost-rust.git?rev=14e22b5#14e22b5f25ac5f004a2f2aed056d73b18f925447" dependencies = [ "blockfrost-openapi", "futures", @@ -1071,6 +1071,7 @@ dependencies = [ "axum", "balius-runtime", "blockfrost", + "blockfrost-openapi", "chrono", "clap", "dashmap", diff --git a/firefly-cardanoconnect/Cargo.toml b/firefly-cardanoconnect/Cargo.toml index 658bc81..b5c2cbf 100644 --- a/firefly-cardanoconnect/Cargo.toml +++ b/firefly-cardanoconnect/Cargo.toml @@ -10,7 +10,8 @@ anyhow = "1" async-trait = "0.1" axum = { version = "0.7", features = ["macros", "ws"] } balius-runtime = { git = "https://github.com/SupernaviX/balius.git", rev = "3a899f9" } -blockfrost = { git = "https://github.com/SupernaviX/blockfrost-rust.git", rev = "ffb40ae", default-features = false, features = ["rustls-tls"] } +blockfrost = { git = "https://github.com/blockfrost/blockfrost-rust.git", rev = "14e22b5", default-features = false, features = ["rustls-tls"] } +blockfrost-openapi = "0.1.69" clap = { version = "4", features = ["derive"] } chrono = "0.4" dashmap = "6" diff --git a/firefly-cardanoconnect/src/blockchain.rs b/firefly-cardanoconnect/src/blockchain.rs index c098c34..8e969c4 100644 --- a/firefly-cardanoconnect/src/blockchain.rs +++ b/firefly-cardanoconnect/src/blockchain.rs @@ -6,6 +6,7 @@ use crate::{ }; use anyhow::{bail, Result}; use async_trait::async_trait; +use blockfrost::Blockfrost; use mocks::MockChain; use n2c::NodeToClient; use pallas_primitives::conway::Tx; @@ -13,13 +14,14 @@ use pallas_traverse::wellknown::GenesisValues; use serde::Deserialize; use tokio::sync::RwLock; +mod blockfrost; pub mod mocks; mod n2c; #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct BlockchainConfig { - pub socket: PathBuf, + pub socket: Option, pub blockfrost_key: Option>, pub network: Option, pub network_magic: Option, @@ -66,6 +68,7 @@ impl BlockchainConfig { } enum ClientImpl { + Blockfrost(Blockfrost), NodeToClient(RwLock), Mock(MockChain), } @@ -77,26 +80,32 @@ pub struct BlockchainClient { } impl BlockchainClient { - pub async fn new(config: &CardanoConnectConfig) -> Self { + pub async fn new(config: &CardanoConnectConfig) -> Result { let blockchain = &config.connector.blockchain; - let n2c = { - let client = NodeToClient::new( - &blockchain.socket, - blockchain.magic(), - blockchain.genesis_hash(), - blockchain.genesis_values(), - blockchain.blockfrost_key.as_ref(), - ) - .await; - RwLock::new(client) + let client = match (&blockchain.socket, &blockchain.blockfrost_key) { + (Some(socket), key) => { + let client = NodeToClient::new( + socket, + blockchain.magic(), + blockchain.genesis_hash(), + blockchain.genesis_values(), + key.as_ref(), + ) + .await; + ClientImpl::NodeToClient(RwLock::new(client)) + } + (None, Some(key)) => { + let client = Blockfrost::new(&key.0, blockchain.genesis_hash()); + ClientImpl::Blockfrost(client) + } + (None, None) => bail!("Missing blockchain configuration"), }; - - Self { - client: ClientImpl::NodeToClient(n2c), + Ok(Self { + client, genesis_hash: blockchain.genesis_hash().to_string(), era: blockchain.era, - } + }) } #[allow(unused)] @@ -116,6 +125,7 @@ impl BlockchainClient { pub async fn health(&self) -> Result<()> { match &self.client { + ClientImpl::Blockfrost(_) => Ok(()), ClientImpl::Mock(_) => Ok(()), ClientImpl::NodeToClient(n2c) => { let client = n2c.read().await; @@ -126,6 +136,7 @@ impl BlockchainClient { pub async fn submit(&self, transaction: Tx) -> Result { match &self.client { + ClientImpl::Blockfrost(bf) => bf.submit(transaction).await, ClientImpl::Mock(_) => bail!("mock transaction submission not implemented"), ClientImpl::NodeToClient(n2c) => { let mut client = n2c.write().await; @@ -136,6 +147,7 @@ impl BlockchainClient { pub async fn sync(&self) -> Result { let inner: Box = match &self.client { + ClientImpl::Blockfrost(bf) => Box::new(bf.open_chainsync().await?), ClientImpl::Mock(mock) => Box::new(mock.sync()), ClientImpl::NodeToClient(n2c) => { let client = n2c.read().await; diff --git a/firefly-cardanoconnect/src/blockchain/blockfrost.rs b/firefly-cardanoconnect/src/blockchain/blockfrost.rs new file mode 100644 index 0000000..612dbed --- /dev/null +++ b/firefly-cardanoconnect/src/blockchain/blockfrost.rs @@ -0,0 +1,154 @@ +use anyhow::{bail, Context as _, Result}; +use async_trait::async_trait; +use blockfrost::{BlockFrostSettings, BlockfrostAPI, BlockfrostError, Pagination}; +use blockfrost_openapi::models::BlockContent; +use futures::future::try_join_all; +use pallas_primitives::conway::Tx; + +use crate::streams::{BlockInfo, BlockReference}; + +use super::{ChainSyncClient, RequestNextResponse}; + +pub struct Blockfrost { + api: BlockfrostAPI, + genesis_hash: String, +} + +impl Blockfrost { + pub fn new(blockfrost_key: &str, genesis_hash: &str) -> Self { + Self { + api: BlockfrostAPI::new(blockfrost_key, BlockFrostSettings::new()), + genesis_hash: genesis_hash.to_string(), + } + } + + pub async fn submit(&self, transaction: Tx) -> Result { + let transaction_data = { + let mut bytes = vec![]; + minicbor::encode(transaction, &mut bytes).expect("infallible"); + bytes + }; + Ok(self.api.transactions_submit(transaction_data).await?) + } + + pub async fn open_chainsync(&self) -> Result { + BlockfrostChainSync::new(self.api.clone(), self.genesis_hash.clone()).await + } +} + +pub struct BlockfrostChainSync { + api: BlockfrostAPI, + tip: BlockInfo, + genesis_hash: String, +} + +impl BlockfrostChainSync { + async fn new(api: BlockfrostAPI, genesis_hash: String) -> Result { + let tip = api.blocks_latest().await?; + let tip = parse_block(&api, tip).await?; + Ok(Self { + api, + tip, + genesis_hash, + }) + } + + async fn request_ref(&self, block_ref: &BlockReference) -> Result> { + let (requested_slot, requested_hash) = match block_ref { + BlockReference::Origin => (None, &self.genesis_hash), + BlockReference::Point(number, hash) => (*number, hash), + }; + let block = match self.api.blocks_by_id(requested_hash).await { + Err(BlockfrostError::Response { reason, .. }) if reason.status_code == 404 => { + return Ok(None) + } + Err(error) => return Err(error.into()), + Ok(block) => block, + }; + + if requested_slot.is_some_and(|s| block.slot.is_some_and(|b| b as u64 != s)) { + bail!("requested_block returned a block in the wrong slot"); + } + + let block_ref = BlockReference::Point(block.slot.map(|s| s as u64), block.hash); + Ok(Some(block_ref)) + } +} + +#[async_trait] +impl ChainSyncClient for BlockfrostChainSync { + async fn request_next(&mut self) -> Result { + bail!("not done yet") + } + async fn find_intersect( + &mut self, + points: &[BlockReference], + ) -> Result<(Option, BlockReference)> { + for point in points { + let Some(block) = self.request_ref(point).await? else { + continue; + }; + return Ok((Some(block), self.tip.as_reference())); + } + Ok((None, self.tip.as_reference())) + } + async fn request_block(&mut self, block_ref: &BlockReference) -> Result> { + let (requested_slot, requested_hash) = match block_ref { + BlockReference::Origin => (None, &self.genesis_hash), + BlockReference::Point(number, hash) => (*number, hash), + }; + request_block(&self.api, requested_hash, requested_slot).await + } +} + +pub async fn request_block( + api: &BlockfrostAPI, + hash: &str, + slot: Option, +) -> Result> { + let block = match api.blocks_by_id(hash).await { + Err(BlockfrostError::Response { reason, .. }) if reason.status_code == 404 => { + return Ok(None) + } + Err(error) => return Err(error.into()), + Ok(block) => block, + }; + + if slot.is_some_and(|s| block.slot.is_some_and(|b| b as u64 != s)) { + bail!("requested_block returned a block in the wrong slot"); + } + + Ok(Some(parse_block(api, block).await?)) +} + +async fn parse_block(api: &BlockfrostAPI, block: BlockContent) -> Result { + let block_hash = block.hash; + let block_height = block.height.map(|h| h as u64); + let block_slot = block.slot.map(|s| s as u64); + + let transaction_hashes = api.blocks_txs(&block_hash, Pagination::all()).await?; + + let tx_fetch_jobs = transaction_hashes + .iter() + .map(|tx_hash| fetch_tx(api, tx_hash)); + let transactions = try_join_all(tx_fetch_jobs).await?; + + let info = BlockInfo { + block_hash, + block_height, + block_slot, + parent_hash: block.previous_block, + transaction_hashes, + transactions, + }; + Ok(info) +} + +async fn fetch_tx(blockfrost: &BlockfrostAPI, hash: &str) -> Result> { + let tx_body = blockfrost + .transactions_cbor(hash) + .await + .context("could not fetch tx body")?; + let bytes = hex::decode(&tx_body.cbor)?; + Ok(bytes) +} diff --git a/firefly-cardanoconnect/src/blockchain/n2c.rs b/firefly-cardanoconnect/src/blockchain/n2c.rs index f6e07c6..85be165 100644 --- a/firefly-cardanoconnect/src/blockchain/n2c.rs +++ b/firefly-cardanoconnect/src/blockchain/n2c.rs @@ -5,8 +5,7 @@ use std::{ use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; -use blockfrost::{BlockFrostSettings, BlockfrostAPI, BlockfrostError, Pagination}; -use futures::future::try_join_all; +use blockfrost::{BlockFrostSettings, BlockfrostAPI}; use pallas_crypto::hash::Hasher; use pallas_network::{ facades::NodeClient, @@ -197,41 +196,7 @@ impl ChainSyncClient for N2cChainSync { BlockReference::Origin => (None, &self.genesis_hash), BlockReference::Point(number, hash) => (*number, hash), }; - let block = match self.blockfrost.blocks_by_id(requested_hash).await { - Err(BlockfrostError::Response { reason, .. }) if reason.status_code == 404 => { - return Ok(None) - } - Err(error) => return Err(error.into()), - Ok(block) => block, - }; - - let block_hash = block.hash; - let block_height = block.height.map(|h| h as u64); - let block_slot = block.slot.map(|s| s as u64); - if requested_slot.is_some_and(|s| block.slot.is_some_and(|b| b as u64 != s)) { - bail!("requested_block returned a block in the wrong slot"); - } - - let transaction_hashes = self - .blockfrost - .blocks_txs(requested_hash, Pagination::all()) - .await?; - - let tx_fetch_jobs = transaction_hashes - .iter() - .map(|hash| fetch_tx(&self.blockfrost, hash)); - let transactions = try_join_all(tx_fetch_jobs).await?; - - let info = BlockInfo { - block_hash, - block_height, - block_slot, - parent_hash: block.previous_block, - transaction_hashes, - transactions, - }; - - Ok(Some(info)) + super::blockfrost::request_block(&self.blockfrost, requested_hash, requested_slot).await } } @@ -305,12 +270,3 @@ fn point_to_block_ref(point: Point) -> BlockReference { Point::Specific(number, hash) => BlockReference::Point(Some(number), hex::encode(hash)), } } - -async fn fetch_tx(blockfrost: &BlockfrostAPI, hash: &str) -> Result> { - let tx_body = blockfrost - .transactions_cbor(hash) - .await - .context("could not fetch tx body")?; - let bytes = hex::decode(&tx_body.cbor)?; - Ok(bytes) -} diff --git a/firefly-cardanoconnect/src/main.rs b/firefly-cardanoconnect/src/main.rs index a58c0f5..330015b 100644 --- a/firefly-cardanoconnect/src/main.rs +++ b/firefly-cardanoconnect/src/main.rs @@ -60,7 +60,7 @@ async fn init_state(config: &CardanoConnectConfig, mock_data: bool) -> Result