Skip to content

Commit

Permalink
Start querying the real chain
Browse files Browse the repository at this point in the history
  • Loading branch information
SupernaviX committed Sep 20, 2024
1 parent 06fada9 commit 85d13bb
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 85 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions firefly-cardanoconnect/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ repository = "https://github.com/SundaeSwap-finance/firefly-cardano"
[dependencies]
aide = { version = "0.13", features = ["axum"] }
anyhow = "1"
async-trait = "0.1"
axum = { version = "0.7", features = ["ws"] }
clap = { version = "4", features = ["derive"] }
chrono = "0.4"
Expand Down
112 changes: 78 additions & 34 deletions firefly-cardanoconnect/src/blockchain.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::path::PathBuf;

use crate::config::CardanoConnectConfig;
use anyhow::{bail, Context, Result};
use pallas_crypto::hash::Hasher;
use pallas_network::{
facades::NodeClient,
miniprotocols::localtxsubmission::{EraTx, Response},
use crate::{
config::CardanoConnectConfig,
streams::{BlockInfo, BlockReference},
};
use anyhow::{bail, Result};
use async_trait::async_trait;
use mocks::MockChain;
use n2c::NodeToClient;
use pallas_primitives::conway::Tx;
use serde::Deserialize;
use tokio::sync::Mutex;
use tokio::sync::RwLock;

pub mod mocks;
mod n2c;

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -43,8 +45,13 @@ impl BlockchainConfig {
}
}

enum ClientImpl {
NodeToClient(RwLock<NodeToClient>),
Mock(MockChain),
}

pub struct BlockchainClient {
n2c: Mutex<NodeClient>,
client: ClientImpl,
era: u16,
}

Expand All @@ -53,41 +60,78 @@ impl BlockchainClient {
let blockchain = &config.connector.blockchain;

let n2c = {
let client = NodeClient::connect(&blockchain.socket, blockchain.magic())
.await
.context("could not connect to socket")?;
Mutex::new(client)
let client = NodeToClient::new(&blockchain.socket, blockchain.magic()).await?;
RwLock::new(client)
};

Ok(Self {
n2c,
client: ClientImpl::NodeToClient(n2c),
era: blockchain.era,
})
}

#[allow(unused)]
pub fn mock() -> Self {
let mock_chain = MockChain::new(3000);
Self {
client: ClientImpl::Mock(mock_chain),
era: 0,
}
}

pub async fn submit(&self, transaction: Tx) -> Result<String> {
let txid = {
let txid_bytes = Hasher::<256>::hash_cbor(&transaction.transaction_body);
hex::encode(txid_bytes)
};
let era_tx = {
let mut bytes = vec![];
minicbor::encode(transaction, &mut bytes).expect("infallible");
EraTx(self.era, bytes)
};
let response = {
let mut client = self.n2c.lock().await;
client
.submission()
.submit_tx(era_tx)
.await
.context("could not submit transaction")?
};
match response {
Response::Accepted => Ok(txid),
Response::Rejected(reason) => {
bail!("transaction was rejected: {}", hex::encode(&reason.0));
match &self.client {
ClientImpl::Mock(_) => bail!("mock transaction submission not implemented"),
ClientImpl::NodeToClient(n2c) => {
let mut client = n2c.write().await;
client.submit(transaction, self.era).await
}
}
}

pub async fn sync(&self) -> Result<ChainSyncClientWrapper> {
let inner: Box<dyn ChainSyncClient + Send + Sync> = match &self.client {
ClientImpl::Mock(mock) => Box::new(mock.sync()),
ClientImpl::NodeToClient(n2c) => {
let client = n2c.read().await;
Box::new(client.open_chainsync().await?)
}
};
Ok(ChainSyncClientWrapper { inner })
}
}

#[async_trait]
pub trait ChainSyncClient {
async fn request_next(&mut self) -> Result<RequestNextResponse>;
async fn find_intersect(
&mut self,
points: &[BlockReference],
) -> Result<(Option<BlockReference>, BlockReference)>;
async fn request_block(&self, block_ref: &BlockReference) -> Result<Option<BlockInfo>>;
}

pub struct ChainSyncClientWrapper {
inner: Box<dyn ChainSyncClient + Send + Sync>,
}

#[async_trait]
impl ChainSyncClient for ChainSyncClientWrapper {
async fn request_next(&mut self) -> Result<RequestNextResponse> {
self.inner.request_next().await
}
async fn find_intersect(
&mut self,
points: &[BlockReference],
) -> Result<(Option<BlockReference>, BlockReference)> {
self.inner.find_intersect(points).await
}
async fn request_block(&self, block_ref: &BlockReference) -> Result<Option<BlockInfo>> {
self.inner.request_block(block_ref).await
}
}

pub enum RequestNextResponse {
RollForward(BlockInfo, #[expect(dead_code)] BlockReference),
RollBackward(BlockReference, #[expect(dead_code)] BlockReference),
}
38 changes: 21 additions & 17 deletions firefly-cardanoconnect/src/blockchain/mocks.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use async_trait::async_trait;
use dashmap::DashMap;
use rand::{Rng as _, SeedableRng as _};
use rand_chacha::ChaChaRng;
Expand All @@ -10,26 +12,24 @@ use tokio::{

use crate::streams::{BlockInfo, BlockReference};

pub enum RequestNextResponse {
RollForward(BlockInfo, #[expect(dead_code)] BlockReference),
RollBackward(BlockReference, #[expect(dead_code)] BlockReference),
}
use super::{ChainSyncClient, RequestNextResponse};

pub struct MockChainSync {
chain: MockChain,
consumer_tip: BlockReference,
}

impl MockChainSync {
pub async fn request_next(&mut self) -> RequestNextResponse {
#[async_trait]
impl ChainSyncClient for MockChainSync {
async fn request_next(&mut self) -> Result<RequestNextResponse> {
loop {
let chain = self.chain.read_lock().await;
let tip = chain.last().map(|b| b.as_reference()).unwrap_or_default();

// If they need ro roll back, let em know
if let Some(rollback_to) = self.chain.find_rollback(&self.consumer_tip) {
self.consumer_tip = rollback_to.clone();
return RequestNextResponse::RollBackward(rollback_to, tip);
return Ok(RequestNextResponse::RollBackward(rollback_to, tip));
}

// what are you waiting for?
Expand All @@ -42,7 +42,7 @@ impl MockChainSync {
if let Some(info) = chain.get(requested_block_number as usize) {
self.consumer_tip =
BlockReference::Point(requested_block_number, info.block_hash.clone());
return RequestNextResponse::RollForward(info.clone(), tip);
return Ok(RequestNextResponse::RollForward(info.clone(), tip));
}

// and now we wait until the chain changes and try again
Expand All @@ -51,10 +51,10 @@ impl MockChainSync {
}
}

pub async fn find_intersect(
async fn find_intersect(
&mut self,
points: &[BlockReference],
) -> (Option<BlockReference>, BlockReference) {
) -> Result<(Option<BlockReference>, BlockReference)> {
let chain = self.chain.read_lock().await;
let intersect = points.iter().find_map(|point| match point {
BlockReference::Origin => chain.first(),
Expand All @@ -64,7 +64,11 @@ impl MockChainSync {
});
self.consumer_tip = intersect.map(|b| b.as_reference()).unwrap_or_default();
let tip = chain.last().map(|b| b.as_reference()).unwrap_or_default();
(intersect.map(|i| i.as_reference()), tip)
Ok((intersect.map(|i| i.as_reference()), tip))
}

async fn request_block(&self, block_ref: &BlockReference) -> Result<Option<BlockInfo>> {
self.chain.request_block(block_ref).await
}
}

Expand Down Expand Up @@ -103,11 +107,11 @@ impl MockChain {
}
}

pub async fn read_lock(&self) -> RwLockReadGuard<Vec<BlockInfo>> {
async fn read_lock(&self) -> RwLockReadGuard<Vec<BlockInfo>> {
self.chain.read().await
}

pub async fn wait_for_new_block(&self) {
async fn wait_for_new_block(&self) {
self.new_block.notified().await;
}

Expand All @@ -123,15 +127,15 @@ impl MockChain {
}

// this is a simpler version of request_range from the block fetch protocol.
pub async fn request_block(&self, block_ref: &BlockReference) -> Option<BlockInfo> {
pub async fn request_block(&self, block_ref: &BlockReference) -> Result<Option<BlockInfo>> {
match block_ref {
BlockReference::Origin => None,
BlockReference::Origin => Ok(None),
BlockReference::Point(number, hash) => {
let chain = self.chain.read().await;
chain
Ok(chain
.get(*number as usize)
.filter(|b| b.block_hash == *hash)
.cloned()
.cloned())
}
}
}
Expand Down
Loading

0 comments on commit 85d13bb

Please sign in to comment.