From 85d13bb7be56b04d6b3e7132f217864abc4b6259 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Fri, 20 Sep 2024 11:19:36 -0400 Subject: [PATCH] Start querying the real chain --- Cargo.lock | 1 + firefly-cardanoconnect/Cargo.toml | 1 + firefly-cardanoconnect/src/blockchain.rs | 112 +++++++++---- .../src/blockchain/mocks.rs | 38 +++-- firefly-cardanoconnect/src/blockchain/n2c.rs | 158 ++++++++++++++++++ firefly-cardanoconnect/src/main.rs | 5 +- .../src/streams/blockchain.rs | 65 ++++--- firefly-cardanoconnect/src/streams/manager.rs | 9 +- firefly-cardanoconnect/src/streams/mux.rs | 8 +- 9 files changed, 312 insertions(+), 85 deletions(-) create mode 100644 firefly-cardanoconnect/src/blockchain/n2c.rs diff --git a/Cargo.lock b/Cargo.lock index 3cb9735..3ef5746 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -555,6 +555,7 @@ version = "0.1.0" dependencies = [ "aide", "anyhow", + "async-trait", "axum", "chrono", "clap", diff --git a/firefly-cardanoconnect/Cargo.toml b/firefly-cardanoconnect/Cargo.toml index 6955d49..f856ddb 100644 --- a/firefly-cardanoconnect/Cargo.toml +++ b/firefly-cardanoconnect/Cargo.toml @@ -7,6 +7,7 @@ repository = "https://github.com/SundaeSwap-finance/firefly-cardano" [dependencies] aide = { version = "0.13", features = ["axum"] } anyhow = "1" +async-trait = "0.1" axum = { version = "0.7", features = ["ws"] } clap = { version = "4", features = ["derive"] } chrono = "0.4" diff --git a/firefly-cardanoconnect/src/blockchain.rs b/firefly-cardanoconnect/src/blockchain.rs index 6711ea7..6e52355 100644 --- a/firefly-cardanoconnect/src/blockchain.rs +++ b/firefly-cardanoconnect/src/blockchain.rs @@ -1,17 +1,19 @@ use std::path::PathBuf; -use crate::config::CardanoConnectConfig; -use anyhow::{bail, Context, Result}; -use pallas_crypto::hash::Hasher; -use pallas_network::{ - facades::NodeClient, - miniprotocols::localtxsubmission::{EraTx, Response}, +use crate::{ + config::CardanoConnectConfig, + streams::{BlockInfo, BlockReference}, }; +use anyhow::{bail, Result}; +use async_trait::async_trait; +use mocks::MockChain; +use n2c::NodeToClient; use pallas_primitives::conway::Tx; use serde::Deserialize; -use tokio::sync::Mutex; +use tokio::sync::RwLock; pub mod mocks; +mod n2c; #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] @@ -43,8 +45,13 @@ impl BlockchainConfig { } } +enum ClientImpl { + NodeToClient(RwLock), + Mock(MockChain), +} + pub struct BlockchainClient { - n2c: Mutex, + client: ClientImpl, era: u16, } @@ -53,41 +60,78 @@ impl BlockchainClient { let blockchain = &config.connector.blockchain; let n2c = { - let client = NodeClient::connect(&blockchain.socket, blockchain.magic()) - .await - .context("could not connect to socket")?; - Mutex::new(client) + let client = NodeToClient::new(&blockchain.socket, blockchain.magic()).await?; + RwLock::new(client) }; Ok(Self { - n2c, + client: ClientImpl::NodeToClient(n2c), era: blockchain.era, }) } + #[allow(unused)] + pub fn mock() -> Self { + let mock_chain = MockChain::new(3000); + Self { + client: ClientImpl::Mock(mock_chain), + era: 0, + } + } + pub async fn submit(&self, transaction: Tx) -> Result { - let txid = { - let txid_bytes = Hasher::<256>::hash_cbor(&transaction.transaction_body); - hex::encode(txid_bytes) - }; - let era_tx = { - let mut bytes = vec![]; - minicbor::encode(transaction, &mut bytes).expect("infallible"); - EraTx(self.era, bytes) - }; - let response = { - let mut client = self.n2c.lock().await; - client - .submission() - .submit_tx(era_tx) - .await - .context("could not submit transaction")? - }; - match response { - Response::Accepted => Ok(txid), - Response::Rejected(reason) => { - bail!("transaction was rejected: {}", hex::encode(&reason.0)); + match &self.client { + ClientImpl::Mock(_) => bail!("mock transaction submission not implemented"), + ClientImpl::NodeToClient(n2c) => { + let mut client = n2c.write().await; + client.submit(transaction, self.era).await } } } + + pub async fn sync(&self) -> Result { + let inner: Box = match &self.client { + ClientImpl::Mock(mock) => Box::new(mock.sync()), + ClientImpl::NodeToClient(n2c) => { + let client = n2c.read().await; + Box::new(client.open_chainsync().await?) + } + }; + Ok(ChainSyncClientWrapper { inner }) + } +} + +#[async_trait] +pub trait ChainSyncClient { + async fn request_next(&mut self) -> Result; + async fn find_intersect( + &mut self, + points: &[BlockReference], + ) -> Result<(Option, BlockReference)>; + async fn request_block(&self, block_ref: &BlockReference) -> Result>; +} + +pub struct ChainSyncClientWrapper { + inner: Box, +} + +#[async_trait] +impl ChainSyncClient for ChainSyncClientWrapper { + async fn request_next(&mut self) -> Result { + self.inner.request_next().await + } + async fn find_intersect( + &mut self, + points: &[BlockReference], + ) -> Result<(Option, BlockReference)> { + self.inner.find_intersect(points).await + } + async fn request_block(&self, block_ref: &BlockReference) -> Result> { + self.inner.request_block(block_ref).await + } +} + +pub enum RequestNextResponse { + RollForward(BlockInfo, #[expect(dead_code)] BlockReference), + RollBackward(BlockReference, #[expect(dead_code)] BlockReference), } diff --git a/firefly-cardanoconnect/src/blockchain/mocks.rs b/firefly-cardanoconnect/src/blockchain/mocks.rs index a004f76..9d5a4c6 100644 --- a/firefly-cardanoconnect/src/blockchain/mocks.rs +++ b/firefly-cardanoconnect/src/blockchain/mocks.rs @@ -1,5 +1,7 @@ use std::{sync::Arc, time::Duration}; +use anyhow::Result; +use async_trait::async_trait; use dashmap::DashMap; use rand::{Rng as _, SeedableRng as _}; use rand_chacha::ChaChaRng; @@ -10,18 +12,16 @@ use tokio::{ use crate::streams::{BlockInfo, BlockReference}; -pub enum RequestNextResponse { - RollForward(BlockInfo, #[expect(dead_code)] BlockReference), - RollBackward(BlockReference, #[expect(dead_code)] BlockReference), -} +use super::{ChainSyncClient, RequestNextResponse}; pub struct MockChainSync { chain: MockChain, consumer_tip: BlockReference, } -impl MockChainSync { - pub async fn request_next(&mut self) -> RequestNextResponse { +#[async_trait] +impl ChainSyncClient for MockChainSync { + async fn request_next(&mut self) -> Result { loop { let chain = self.chain.read_lock().await; let tip = chain.last().map(|b| b.as_reference()).unwrap_or_default(); @@ -29,7 +29,7 @@ impl MockChainSync { // If they need ro roll back, let em know if let Some(rollback_to) = self.chain.find_rollback(&self.consumer_tip) { self.consumer_tip = rollback_to.clone(); - return RequestNextResponse::RollBackward(rollback_to, tip); + return Ok(RequestNextResponse::RollBackward(rollback_to, tip)); } // what are you waiting for? @@ -42,7 +42,7 @@ impl MockChainSync { if let Some(info) = chain.get(requested_block_number as usize) { self.consumer_tip = BlockReference::Point(requested_block_number, info.block_hash.clone()); - return RequestNextResponse::RollForward(info.clone(), tip); + return Ok(RequestNextResponse::RollForward(info.clone(), tip)); } // and now we wait until the chain changes and try again @@ -51,10 +51,10 @@ impl MockChainSync { } } - pub async fn find_intersect( + async fn find_intersect( &mut self, points: &[BlockReference], - ) -> (Option, BlockReference) { + ) -> Result<(Option, BlockReference)> { let chain = self.chain.read_lock().await; let intersect = points.iter().find_map(|point| match point { BlockReference::Origin => chain.first(), @@ -64,7 +64,11 @@ impl MockChainSync { }); self.consumer_tip = intersect.map(|b| b.as_reference()).unwrap_or_default(); let tip = chain.last().map(|b| b.as_reference()).unwrap_or_default(); - (intersect.map(|i| i.as_reference()), tip) + Ok((intersect.map(|i| i.as_reference()), tip)) + } + + async fn request_block(&self, block_ref: &BlockReference) -> Result> { + self.chain.request_block(block_ref).await } } @@ -103,11 +107,11 @@ impl MockChain { } } - pub async fn read_lock(&self) -> RwLockReadGuard> { + async fn read_lock(&self) -> RwLockReadGuard> { self.chain.read().await } - pub async fn wait_for_new_block(&self) { + async fn wait_for_new_block(&self) { self.new_block.notified().await; } @@ -123,15 +127,15 @@ impl MockChain { } // this is a simpler version of request_range from the block fetch protocol. - pub async fn request_block(&self, block_ref: &BlockReference) -> Option { + pub async fn request_block(&self, block_ref: &BlockReference) -> Result> { match block_ref { - BlockReference::Origin => None, + BlockReference::Origin => Ok(None), BlockReference::Point(number, hash) => { let chain = self.chain.read().await; - chain + Ok(chain .get(*number as usize) .filter(|b| b.block_hash == *hash) - .cloned() + .cloned()) } } } diff --git a/firefly-cardanoconnect/src/blockchain/n2c.rs b/firefly-cardanoconnect/src/blockchain/n2c.rs new file mode 100644 index 0000000..df193a6 --- /dev/null +++ b/firefly-cardanoconnect/src/blockchain/n2c.rs @@ -0,0 +1,158 @@ +use std::path::{Path, PathBuf}; + +use anyhow::{bail, Context, Ok, Result}; +use async_trait::async_trait; +use pallas_crypto::hash::Hasher; +use pallas_network::{ + facades::NodeClient, + miniprotocols::{ + chainsync::{BlockContent, NextResponse}, + localtxsubmission::{EraTx, Response}, + Point, + }, +}; +use pallas_primitives::conway::{MintedBlock, Tx}; + +use crate::streams::{BlockInfo, BlockReference}; + +use super::{ChainSyncClient, RequestNextResponse}; + +pub struct NodeToClient { + socket: PathBuf, + magic: u64, + client: NodeClient, +} + +impl NodeToClient { + pub async fn new(socket: &Path, magic: u64) -> Result { + let client = Self::connect(socket, magic).await?; + Ok(Self { + socket: socket.to_path_buf(), + magic, + client, + }) + } + + pub async fn submit(&mut self, transaction: Tx, era: u16) -> Result { + let txid = { + let txid_bytes = Hasher::<256>::hash_cbor(&transaction.transaction_body); + hex::encode(txid_bytes) + }; + let era_tx = { + let mut bytes = vec![]; + minicbor::encode(transaction, &mut bytes).expect("infallible"); + EraTx(era, bytes) + }; + let response = { + self.client + .submission() + .submit_tx(era_tx) + .await + .context("could not submit transaction")? + }; + match response { + Response::Accepted => Ok(txid), + Response::Rejected(reason) => { + bail!("transaction was rejected: {}", hex::encode(&reason.0)); + } + } + } + + pub async fn open_chainsync(&self) -> Result { + let client = Self::connect(&self.socket, self.magic).await?; + Ok(N2cChainSync { client }) + } + + async fn connect(socket: &Path, magic: u64) -> Result { + NodeClient::connect(socket, magic) + .await + .context("could not connect to socket") + } +} + +pub struct N2cChainSync { + client: NodeClient, +} + +#[async_trait] +impl ChainSyncClient for N2cChainSync { + async fn request_next(&mut self) -> Result { + loop { + let res = self + .client + .chainsync() + .request_or_await_next() + .await + .context("error waiting for next response")?; + match res { + NextResponse::Await => continue, + NextResponse::RollForward(content, tip) => { + let info = content_to_block_info(content).context("error parsing new block")?; + let tip = point_to_block_ref(tip.0); + return Ok(RequestNextResponse::RollForward(info, tip)); + } + NextResponse::RollBackward(point, tip) => { + let point = point_to_block_ref(point); + let tip = point_to_block_ref(tip.0); + return Ok(RequestNextResponse::RollBackward(point, tip)); + } + }; + } + } + async fn find_intersect( + &mut self, + points: &[BlockReference], + ) -> Result<(Option, BlockReference)> { + let points: Result> = points.iter().map(block_ref_to_point).collect(); + let (intersect, tip) = self.client.chainsync().find_intersect(points?).await?; + let intersect = intersect.map(point_to_block_ref); + let tip = point_to_block_ref(tip.0); + + Ok((intersect, tip)) + } + async fn request_block(&self, _block_ref: &BlockReference) -> Result> { + bail!("request_block not implemented") + } +} + +fn block_ref_to_point(block_ref: &BlockReference) -> Result { + match block_ref { + BlockReference::Origin => Ok(Point::Origin), + BlockReference::Point(number, hash) => Ok(Point::Specific(*number, hex::decode(hash)?)), + } +} + +fn point_to_block_ref(point: Point) -> BlockReference { + match point { + Point::Origin => BlockReference::Origin, + Point::Specific(number, hash) => BlockReference::Point(number, hex::encode(hash)), + } +} + +fn content_to_block_info(content: BlockContent) -> Result { + type BlockWrapper<'b> = (u16, MintedBlock<'b>); + + let (_, block): BlockWrapper = minicbor::decode(&content.0)?; + let block_hash = { + let header = block.header.raw_cbor(); + let hash = Hasher::<256>::hash(header); + hex::encode(hash) + }; + let header = &block.header.header_body; + let parent_hash = header.prev_hash.map(hex::encode).unwrap_or_default(); + let transaction_hashes = block + .transaction_bodies + .iter() + .map(|t| { + let body = t.raw_cbor(); + let hash = Hasher::<256>::hash(body); + hex::encode(hash) + }) + .collect(); + Ok(BlockInfo { + block_number: header.block_number, + block_hash, + parent_hash, + transaction_hashes, + }) +} diff --git a/firefly-cardanoconnect/src/main.rs b/firefly-cardanoconnect/src/main.rs index 4e36c14..c812b05 100644 --- a/firefly-cardanoconnect/src/main.rs +++ b/firefly-cardanoconnect/src/main.rs @@ -48,11 +48,12 @@ struct AppState { #[instrument(err(Debug))] async fn init_state(config: &CardanoConnectConfig) -> Result { let persistence = Arc::new(Persistence::default()); + let blockchain = Arc::new(BlockchainClient::new(config).await?); let state = AppState { - blockchain: Arc::new(BlockchainClient::new(config).await?), + blockchain: blockchain.clone(), signer: Arc::new(CardanoSigner::new(config)?), - stream_manager: Arc::new(StreamManager::new(persistence).await?), + stream_manager: Arc::new(StreamManager::new(persistence, blockchain).await?), }; Ok(state) diff --git a/firefly-cardanoconnect/src/streams/blockchain.rs b/firefly-cardanoconnect/src/streams/blockchain.rs index f017bbf..cc2e907 100644 --- a/firefly-cardanoconnect/src/streams/blockchain.rs +++ b/firefly-cardanoconnect/src/streams/blockchain.rs @@ -6,8 +6,9 @@ use std::{ use anyhow::{bail, Result}; use dashmap::DashMap; use tokio::sync::mpsc; +use tracing::warn; -use crate::blockchain::mocks::{MockChain, MockChainSync, RequestNextResponse}; +use crate::blockchain::{BlockchainClient, ChainSyncClient, RequestNextResponse}; use super::{BlockInfo, BlockReference, ListenerId}; @@ -18,22 +19,16 @@ pub enum ListenerEvent { } pub struct DataSource { - chain: Arc, + blockchain: Arc, db: Arc, } -impl Default for DataSource { - fn default() -> Self { - Self::new() - } -} - const APPROXIMATELY_IMMUTABLE_LENGTH: usize = 20; impl DataSource { - pub fn new() -> Self { + pub fn new(blockchain: Arc) -> Self { Self { - chain: Arc::new(MockChain::new(3000)), + blockchain, db: Arc::new(BlockDatabase::default()), } } @@ -43,7 +38,7 @@ impl DataSource { id: ListenerId, from: Option<&BlockReference>, ) -> Result { - ChainListener::init(id, &self.chain, &self.db, from).await + ChainListener::init(id, &self.blockchain, &self.db, from).await } } @@ -58,7 +53,7 @@ pub struct ChainListener { impl ChainListener { async fn init( id: ListenerId, - chain: &MockChain, + chain: &BlockchainClient, db: &Arc, from: Option<&BlockReference>, ) -> Result { @@ -220,7 +215,7 @@ impl ChainListener { async fn init_existing( id: ListenerId, - chain: &MockChain, + chain: &BlockchainClient, db: &Arc, records: Vec, ) -> Result { @@ -236,15 +231,15 @@ impl ChainListener { history.sort_by_key(|b| b.block_number); let mut history: VecDeque = history.into(); - let mut sync = chain.sync(); + let mut sync = chain.sync().await?; let points: Vec<_> = history.iter().rev().map(|b| b.as_reference()).collect(); - let (head_ref, _) = sync.find_intersect(&points).await; + let (head_ref, _) = sync.find_intersect(&points).await?; let Some(head_ref) = head_ref else { // The chain didn't recognize any of the blocks we saved from this chain. // We have no way to recover. bail!("listener {id} is on a fork which no longer exists"); }; - let Some(head) = chain.request_block(&head_ref).await else { + let Some(head) = sync.request_block(&head_ref).await? else { bail!("listener {id} is on a fork which no longer exists"); }; @@ -271,15 +266,15 @@ impl ChainListener { async fn init_new( id: ListenerId, - chain: &MockChain, + chain: &BlockchainClient, db: &Arc, from: Option<&BlockReference>, ) -> Result { - let mut sync = chain.sync(); + let mut sync = chain.sync().await?; let head_ref = match from { Some(block_ref) => { // If the caller passed a block reference, they're starting from either the origin or a specific point - let (head, _) = sync.find_intersect(&[block_ref.clone()]).await; + let (head, _) = sync.find_intersect(&[block_ref.clone()]).await?; let Some(head) = head else { // Trying to init a fresh listener from a ref which does not exist bail!("could not start listening from {from:?}, as it does not exist on-chain"); @@ -288,16 +283,16 @@ impl ChainListener { } None => { // Otherwise, they just want to follow from the tip - let (_, tip) = sync.find_intersect(&[]).await; + let (_, tip) = sync.find_intersect(&[]).await?; // Call find_intersect again so the chainsync protocol knows we're following from the tip - let (head, _) = sync.find_intersect(&[tip.clone()]).await; + let (head, _) = sync.find_intersect(&[tip.clone()]).await?; if !head.is_some_and(|h| h == tip) { bail!("could not start listening from latest: rollback occurred while we were connecting"); }; tip } }; - let Some(head) = chain.request_block(&head_ref).await else { + let Some(head) = sync.request_block(&head_ref).await? else { // Trying to init a fresh listener from a ref which does not exist bail!("could not start listening from {from:?}, as it does not exist on-chain"); }; @@ -312,9 +307,15 @@ impl ChainListener { break; } let prev_ref = BlockReference::Point(oldest_number - 1, prev_hash); - let Some(prev_block) = chain.request_block(&prev_ref).await else { - // The chain rolled back while we were building up history - bail!("block {from:?} was rolled back before we could finish setting it up"); + let prev_block = match sync.request_block(&prev_ref).await { + Err(err) => { + warn!("could not populate a history for this listener, it may not be able to recover from rollback: {}", err); + break; + } + Ok(None) => { + bail!("block {from:?} was rolled back before we could finish setting it up") + } + Ok(Some(prev_block)) => prev_block, }; oldest_number = prev_block.block_number; @@ -334,9 +335,19 @@ impl ChainListener { }) } - async fn stay_in_sync(mut sync: MockChainSync, sync_event_sink: mpsc::Sender) { + async fn stay_in_sync( + mut sync: impl ChainSyncClient, + sync_event_sink: mpsc::Sender, + ) { loop { - let next_event = match sync.request_next().await { + let next_response = match sync.request_next().await { + Ok(response) => response, + Err(error) => { + warn!("Error syncing with chain: {:#}", error); + break; + } + }; + let next_event = match next_response { RequestNextResponse::RollForward(tip, _) => ChainSyncEvent::RollForward(tip), RequestNextResponse::RollBackward(rollback_to, _) => { ChainSyncEvent::RollBackward(rollback_to) diff --git a/firefly-cardanoconnect/src/streams/manager.rs b/firefly-cardanoconnect/src/streams/manager.rs index 3543928..0ef916a 100644 --- a/firefly-cardanoconnect/src/streams/manager.rs +++ b/firefly-cardanoconnect/src/streams/manager.rs @@ -5,7 +5,7 @@ use firefly_server::apitypes::{ApiError, ApiResult}; use tokio::sync::mpsc; use ulid::Ulid; -use crate::persistence::Persistence; +use crate::{blockchain::BlockchainClient, persistence::Persistence}; use super::{ mux::{Batch, Multiplexer}, @@ -18,10 +18,13 @@ pub struct StreamManager { } impl StreamManager { - pub async fn new(persistence: Arc) -> Result { + pub async fn new( + persistence: Arc, + blockchain: Arc, + ) -> Result { Ok(Self { persistence: persistence.clone(), - mux: Multiplexer::new(persistence).await?, + mux: Multiplexer::new(persistence, blockchain).await?, }) } diff --git a/firefly-cardanoconnect/src/streams/mux.rs b/firefly-cardanoconnect/src/streams/mux.rs index 6388f15..fdc2875 100644 --- a/firefly-cardanoconnect/src/streams/mux.rs +++ b/firefly-cardanoconnect/src/streams/mux.rs @@ -16,6 +16,7 @@ use tokio::{ use tracing::warn; use crate::{ + blockchain::BlockchainClient, persistence::Persistence, streams::{blockchain::ListenerEvent, EventData, EventId}, }; @@ -35,8 +36,11 @@ pub struct Multiplexer { } impl Multiplexer { - pub async fn new(persistence: Arc) -> Result { - let data_source = Arc::new(DataSource::new()); + pub async fn new( + persistence: Arc, + blockchain: Arc, + ) -> Result { + let data_source = Arc::new(DataSource::new(blockchain)); let dispatchers = DashMap::new(); let stream_ids_by_topic = DashMap::new();