diff --git a/firefly-cardanoconnect/src/contracts.rs b/firefly-cardanoconnect/src/contracts.rs index 514cf92..5f50dd1 100644 --- a/firefly-cardanoconnect/src/contracts.rs +++ b/firefly-cardanoconnect/src/contracts.rs @@ -1,7 +1,12 @@ -use std::{path::PathBuf, sync::Arc}; +use std::{ + collections::{BTreeSet, HashMap}, + path::PathBuf, + sync::{Arc, Weak}, +}; use anyhow::{bail, Result}; use balius_runtime::{ledgers::Ledger, Response, Runtime, Store}; +use dashmap::DashMap; use ledger::BlockfrostLedger; use serde::Deserialize; use serde_json::{json, Value}; @@ -9,23 +14,30 @@ use tokio::{ fs, sync::{Mutex, RwLock}, }; +use tracing::error; +use u5c::convert_block; -use crate::blockfrost::BlockfrostClient; +use crate::{ + blockfrost::BlockfrostClient, + streams::{BlockInfo, BlockReference, Event, Listener, ListenerFilter, ListenerId}, +}; mod ledger; +mod u5c; #[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct ContractsConfig { pub components_path: PathBuf, - pub store_path: PathBuf, + pub stores_path: PathBuf, pub cache_size: Option, } pub struct ContractManager { config: Option, - blockfrost: Option, - runtime: RwLock>, + ledger: Option, + invoke_runtime: RwLock>, + listener_runtimes: DashMap, } impl ContractManager { @@ -34,19 +46,25 @@ impl ContractManager { blockfrost: Option, ) -> Result { fs::create_dir_all(&config.components_path).await?; - let runtime = Self::new_runtime(config, blockfrost.clone()).await?; + let ledger = blockfrost.map(|client| { + let ledger = BlockfrostLedger::new(client); + Ledger::Custom(Arc::new(Mutex::new(ledger))) + }); + let runtime = Self::new_invoke_runtime(config, ledger.clone()).await?; Ok(Self { config: Some(config.clone()), - blockfrost, - runtime: RwLock::new(Some(runtime)), + ledger, + invoke_runtime: RwLock::new(Some(runtime)), + listener_runtimes: DashMap::new(), }) } pub fn none() -> Self { Self { config: None, - blockfrost: None, - runtime: RwLock::new(None), + ledger: None, + invoke_runtime: RwLock::new(None), + listener_runtimes: DashMap::new(), } } @@ -56,9 +74,36 @@ impl ContractManager { }; let path = config.components_path.join(format!("{id}.wasm")); fs::write(&path, contract).await?; - let mut rt_lock = self.runtime.write().await; - *rt_lock = None; // drop the old worker before opening the new - *rt_lock = Some(Self::new_runtime(config, self.blockfrost.clone()).await?); + + // update the invoke runtime + let mut rt_lock = self.invoke_runtime.write().await; + *rt_lock = None; // drop the old runtime before creating the new + *rt_lock = Some(Self::new_invoke_runtime(config, self.ledger.clone()).await?); + drop(rt_lock); + + // update any listener runtimes + for entry in self.listener_runtimes.iter() { + let Some(runtime) = entry.runtime.upgrade() else { + let key = entry.key().clone(); + self.listener_runtimes.remove(&key); + continue; + }; + if !entry.contracts.iter().any(|it| it == id) { + continue; + } + let mut rt_lock = runtime.lock().await; + *rt_lock = None; // drop the old runtime before creating the new + *rt_lock = Some( + Self::new_listen_runtime( + config, + self.ledger.clone(), + entry.key(), + &entry.contracts, + ) + .await?, + ); + } + Ok(()) } @@ -69,7 +114,7 @@ impl ContractManager { params: Value, ) -> Result>> { let params = serde_json::to_vec(¶ms)?; - let rt_lock = self.runtime.read().await; + let rt_lock = self.invoke_runtime.read().await; let Some(runtime) = rt_lock.as_ref() else { bail!("Contract manager not configured") }; @@ -80,18 +125,32 @@ impl ContractManager { } } - async fn new_runtime( + pub async fn listen(&self, listener: &Listener) -> ContractListener { + let contracts = find_contract_names(&listener.filters); + let runtime = match self.config.as_ref() { + Some(config) => { + Self::new_listen_runtime(config, self.ledger.clone(), &listener.id, &contracts) + .await + .ok() + } + None => None, + }; + let runtime = Arc::new(Mutex::new(runtime)); + + let handle = ContractListenerHandle { + contracts, + runtime: Arc::downgrade(&runtime), + }; + + self.listener_runtimes.insert(listener.id.clone(), handle); + ContractListener::new(runtime) + } + + async fn new_invoke_runtime( config: &ContractsConfig, - blockfrost: Option, + ledger: Option, ) -> Result { - let store = Store::open(&config.store_path, config.cache_size)?; - let mut runtime_builder = Runtime::builder(store); - if let Some(client) = blockfrost { - let ledger = BlockfrostLedger::new(client); - runtime_builder = - runtime_builder.with_ledger(Ledger::Custom(Arc::new(Mutex::new(ledger)))) - } - let mut runtime = runtime_builder.build()?; + let mut runtime = Self::new_runtime(config, ledger, "invoke")?; let mut entries = fs::read_dir(&config.components_path).await?; while let Some(entry) = entries.next_entry().await? { let extless = entry.path().with_extension(""); @@ -104,4 +163,82 @@ impl ContractManager { } Ok(runtime) } + + async fn new_listen_runtime( + config: &ContractsConfig, + ledger: Option, + listener: &ListenerId, + contracts: &[String], + ) -> Result { + let mut runtime = Self::new_runtime(config, ledger, listener.to_string())?; + for contract in contracts { + let wasm_path = config.components_path.join(contract).with_extension("wasm"); + runtime + .register_worker(contract, wasm_path, json!(null)) + .await?; + } + Ok(runtime) + } + + fn new_runtime( + config: &ContractsConfig, + ledger: Option, + store_name: impl AsRef, + ) -> Result { + let store_path = config + .stores_path + .join(store_name.as_ref()) + .with_extension("wasm"); + let store = Store::open(&store_path, config.cache_size)?; + let mut runtime_builder = Runtime::builder(store); + if let Some(ledger) = ledger { + runtime_builder = runtime_builder.with_ledger(ledger) + } + Ok(runtime_builder.build()?) + } +} + +pub struct ContractListener { + runtime: Arc>>, + cache: HashMap>, +} + +impl ContractListener { + fn new(runtime: Arc>>) -> Self { + Self { + runtime, + cache: HashMap::new(), + } + } + + pub async fn gather_events(&self, rollbacks: &[BlockInfo], block: &BlockInfo) { + let mut lock = self.runtime.lock().await; + let runtime = lock.as_mut().unwrap(); + let undo_blocks = rollbacks.iter().map(convert_block).collect(); + let next_block = convert_block(block); + if let Err(error) = runtime.handle_chain(&undo_blocks, &next_block).await { + error!("could not gather events for new blocks: {error}"); + } + + // TODO: actually gather events + } + + pub async fn events_for(&self, block_ref: &BlockReference) -> Vec { + self.cache.get(block_ref).cloned().unwrap_or_default() + } +} + +struct ContractListenerHandle { + contracts: Vec, + runtime: Weak>>, +} + +fn find_contract_names(filters: &[ListenerFilter]) -> Vec { + let mut result = BTreeSet::new(); + for filter in filters { + if let ListenerFilter::Event { contract, .. } = filter { + result.insert(contract.clone()); + } + } + result.into_iter().collect() } diff --git a/firefly-cardanoconnect/src/contracts/u5c.rs b/firefly-cardanoconnect/src/contracts/u5c.rs new file mode 100644 index 0000000..f428312 --- /dev/null +++ b/firefly-cardanoconnect/src/contracts/u5c.rs @@ -0,0 +1,216 @@ +use pallas_primitives::conway; +use pallas_traverse::ComputeHash as _; +use utxorpc_spec::utxorpc::v1alpha::cardano; + +use crate::streams::BlockInfo; + +fn convert_plutus_data(data: conway::PlutusData) -> cardano::PlutusData { + let inner = match data { + conway::PlutusData::Constr(con) => { + let mut constr = cardano::Constr { + tag: con.tag as u32, + any_constructor: con.any_constructor.unwrap_or_default(), + ..cardano::Constr::default() + }; + for field in con.fields.to_vec() { + constr.fields.push(convert_plutus_data(field)); + } + cardano::plutus_data::PlutusData::Constr(constr) + } + conway::PlutusData::Map(map) => { + let mut new_map = cardano::PlutusDataMap::default(); + for (key, value) in map.to_vec() { + new_map.pairs.push(cardano::PlutusDataPair { + key: Some(convert_plutus_data(key)), + value: Some(convert_plutus_data(value)), + }); + } + cardano::plutus_data::PlutusData::Map(new_map) + } + conway::PlutusData::BigInt(int) => { + let inner = match int { + conway::BigInt::Int(i) => { + let value: i128 = i.into(); + cardano::big_int::BigInt::Int(value as i64) + } + conway::BigInt::BigUInt(i) => cardano::big_int::BigInt::BigUInt(i.to_vec().into()), + conway::BigInt::BigNInt(i) => cardano::big_int::BigInt::BigNInt(i.to_vec().into()), + }; + cardano::plutus_data::PlutusData::BigInt(cardano::BigInt { + big_int: Some(inner), + }) + } + conway::PlutusData::BoundedBytes(bytes) => { + cardano::plutus_data::PlutusData::BoundedBytes(bytes.to_vec().into()) + } + conway::PlutusData::Array(array) => { + let mut new_array = cardano::PlutusDataArray::default(); + for item in array.to_vec() { + new_array.items.push(convert_plutus_data(item)); + } + cardano::plutus_data::PlutusData::Array(new_array) + } + }; + cardano::PlutusData { + plutus_data: Some(inner), + } +} + +fn convert_native_script(script: conway::NativeScript) -> cardano::NativeScript { + fn convert_native_script_list(scripts: Vec) -> cardano::NativeScriptList { + let mut new_list = cardano::NativeScriptList::default(); + for script in scripts { + new_list.items.push(convert_native_script(script)); + } + new_list + } + use cardano::native_script::NativeScript as InnerNativeScript; + let inner = match script { + conway::NativeScript::ScriptPubkey(hash) => { + InnerNativeScript::ScriptPubkey(hash.to_vec().into()) + } + conway::NativeScript::ScriptAll(scripts) => { + InnerNativeScript::ScriptAll(convert_native_script_list(scripts)) + } + conway::NativeScript::ScriptAny(scripts) => { + InnerNativeScript::ScriptAny(convert_native_script_list(scripts)) + } + conway::NativeScript::ScriptNOfK(k, scripts) => { + InnerNativeScript::ScriptNOfK(cardano::ScriptNOfK { + k, + scripts: scripts.into_iter().map(convert_native_script).collect(), + }) + } + conway::NativeScript::InvalidBefore(_) => todo!(), + conway::NativeScript::InvalidHereafter(_) => todo!(), + }; + cardano::NativeScript { + native_script: Some(inner), + } +} + +fn convert_tx(bytes: &[u8]) -> cardano::Tx { + use pallas_primitives::{alonzo, conway}; + + let real_tx: conway::Tx = minicbor::decode(bytes).unwrap(); + let mut tx = cardano::Tx::default(); + for real_output in real_tx.transaction_body.outputs { + let mut output = cardano::TxOutput::default(); + match real_output { + conway::PseudoTransactionOutput::Legacy(txo) => { + output.address = txo.address.to_vec().into(); + if let Some(hash) = txo.datum_hash { + output.datum = Some(cardano::Datum { + hash: hash.to_vec().into(), + ..cardano::Datum::default() + }); + } + match txo.amount { + alonzo::Value::Coin(c) => { + output.coin = c; + } + alonzo::Value::Multiasset(c, assets) => { + output.coin = c; + for (policy_id, policy_assets) in assets.iter() { + let assets = policy_assets + .iter() + .map(|(name, amount)| cardano::Asset { + name: name.to_vec().into(), + output_coin: *amount, + ..cardano::Asset::default() + }) + .collect(); + output.assets.push(cardano::Multiasset { + policy_id: policy_id.to_vec().into(), + assets, + ..cardano::Multiasset::default() + }); + } + } + } + } + pallas_primitives::conway::PseudoTransactionOutput::PostAlonzo(txo) => { + output.address = txo.address.to_vec().into(); + if let Some(datum_option) = txo.datum_option { + let mut datum = cardano::Datum::default(); + match datum_option { + conway::PseudoDatumOption::Hash(hash) => { + datum.hash = hash.to_vec().into(); + } + conway::PseudoDatumOption::Data(data) => { + let mut cbor = vec![]; + minicbor::encode(&data, &mut cbor).expect("infallible"); + datum.hash = data.0.compute_hash().to_vec().into(); + datum.payload = Some(convert_plutus_data(data.0)); + datum.original_cbor = cbor.into(); + } + } + } + match txo.value { + conway::Value::Coin(c) => { + output.coin = c; + } + conway::Value::Multiasset(c, assets) => { + output.coin = c; + for (policy_id, policy_assets) in assets.iter() { + let assets = policy_assets + .iter() + .map(|(name, amount)| cardano::Asset { + name: name.to_vec().into(), + output_coin: amount.into(), + ..cardano::Asset::default() + }) + .collect(); + output.assets.push(cardano::Multiasset { + policy_id: policy_id.to_vec().into(), + assets, + ..cardano::Multiasset::default() + }); + } + } + } + if let Some(script) = txo.script_ref { + let inner = match script.0 { + conway::PseudoScript::NativeScript(script) => { + cardano::script::Script::Native(convert_native_script(script)) + } + conway::PseudoScript::PlutusV1Script(script) => { + cardano::script::Script::PlutusV1(script.0.to_vec().into()) + } + conway::PseudoScript::PlutusV2Script(script) => { + cardano::script::Script::PlutusV2(script.0.to_vec().into()) + } + conway::PseudoScript::PlutusV3Script(script) => { + cardano::script::Script::PlutusV3(script.0.to_vec().into()) + } + }; + output.script = Some(cardano::Script { + script: Some(inner), + }); + } + } + } + tx.outputs.push(output); + } + tx +} + +/** + * Convert a block in our internal format (basically the bytes on the chain) + * into one in balius format (mostly a list of utxorpc-formatted txos) + */ +pub fn convert_block(info: &BlockInfo) -> balius_runtime::Block { + let header = cardano::BlockHeader { + slot: info.block_slot.unwrap_or_default(), + hash: hex::decode(&info.block_hash).unwrap().into(), + height: info.block_height.unwrap_or_default(), + }; + let body = cardano::BlockBody { + tx: info.transactions.iter().map(|tx| convert_tx(tx)).collect(), + }; + let block = cardano::Block { + header: Some(header), + body: Some(body), + }; + balius_runtime::Block::Cardano(block) +} diff --git a/firefly-cardanoconnect/src/main.rs b/firefly-cardanoconnect/src/main.rs index 4dcb5fc..2e0b88d 100644 --- a/firefly-cardanoconnect/src/main.rs +++ b/firefly-cardanoconnect/src/main.rs @@ -80,7 +80,7 @@ async fn init_state(config: &CardanoConnectConfig, mock_data: bool) -> Result Result, sync: ChainListener, + contract: ContractListener, cache: HashMap>, } impl ChainEventStream { - pub fn new(id: ListenerId, filters: Vec, sync: ChainListener) -> Self { + pub fn new( + id: ListenerId, + filters: Vec, + sync: ChainListener, + contract: ContractListener, + ) -> Self { Self { id, filters, sync, + contract, cache: HashMap::new(), } } - pub fn try_get_next_event(&mut self, hwm: &EventReference) -> Option<(EventReference, Event)> { + pub async fn try_get_next_event( + &mut self, + hwm: &EventReference, + ) -> Option<(EventReference, Event)> { let mut next_hwm = hwm.clone(); loop { if let Some(result) = self.next_event_in_memory(&next_hwm) { @@ -33,7 +44,7 @@ impl ChainEventStream { } let (rollbacks, block) = self.try_get_next_block(&next_hwm.block)?; let block_ref = block.as_reference(); - if let Some(event) = self.collect_events(rollbacks, block) { + if let Some(event) = self.collect_events(rollbacks, block).await { return Some(event); } next_hwm = EventReference { @@ -53,7 +64,7 @@ impl ChainEventStream { } let (rollbacks, block) = self.wait_for_next_block(&next_hwm.block).await; let block_ref = block.as_reference(); - if let Some(event) = self.collect_events(rollbacks, block) { + if let Some(event) = self.collect_events(rollbacks, block).await { return event; } next_hwm = EventReference { @@ -113,35 +124,45 @@ impl ChainEventStream { } } - fn collect_events( + async fn collect_events( &mut self, rollbacks: Vec, block: BlockInfo, ) -> Option<(EventReference, Event)> { + if self.cache.contains_key(&block.as_reference()) { + // we already gathered these events + return None; + } + self.contract.gather_events(&rollbacks, &block).await; let mut result = None; for rollback in rollbacks { - let Some(forwards) = self.cache.get(&rollback.as_reference()) else { - continue; - }; - let backwards: Vec<_> = forwards - .iter() - .map(|(_, e)| { - let event_ref = EventReference { - block: block.as_reference(), - rollback: true, - tx_index: Some(e.id.transaction_index), - log_index: Some(e.id.log_index), - }; - let event = e.clone().into_rollback(); - (event_ref, event) - }) - .collect(); + let rollback_ref = rollback.as_reference(); + let mut backwards = self.collect_backward_tx_events(&rollback); + for event in self.contract.events_for(&rollback_ref).await { + let event_ref = EventReference { + block: rollback_ref.clone(), + rollback: true, + tx_index: Some(event.id.transaction_index), + log_index: Some(event.id.log_index), + }; + backwards.push((event_ref, event)); + } result = result.or(backwards.first().cloned()); - self.cache.insert(rollback.as_reference(), backwards); + self.cache.insert(rollback_ref, backwards); + } + let block_ref = block.as_reference(); + let mut events = self.collect_forward_tx_events(&block); + for event in self.contract.events_for(&block_ref).await { + let event_ref = EventReference { + block: block_ref.clone(), + rollback: false, + tx_index: Some(event.id.transaction_index), + log_index: Some(event.id.log_index), + }; + events.push((event_ref, event)); } - let events = self.collect_forward_tx_events(&block); result = result.or(events.first().cloned()); - self.cache.insert(block.as_reference(), events); + self.cache.insert(block_ref, events); result } @@ -176,9 +197,42 @@ impl ChainEventStream { events } + fn collect_backward_tx_events(&self, block: &BlockInfo) -> Vec<(EventReference, Event)> { + let mut events = vec![]; + for (tx_idx, tx_hash) in block.transaction_hashes.iter().enumerate().rev() { + let tx_idx = tx_idx as u64; + if self.matches_tx_filter(tx_hash) { + let id = EventId { + listener_id: self.id.clone(), + signature: "TransactionRolledBack(string, string, string)".into(), + block_hash: block.block_hash.clone(), + block_number: block.block_height, + transaction_hash: tx_hash.clone(), + transaction_index: tx_idx, + log_index: 0, + timestamp: Some(SystemTime::now()), + }; + let event = Event { + id, + data: json!({}), + }; + let event_ref = EventReference { + block: block.as_reference(), + rollback: false, + tx_index: Some(tx_idx), + log_index: Some(0), + }; + events.push((event_ref, event)); + } + } + events + } + fn matches_tx_filter(&self, tx_hash: &str) -> bool { for filter in &self.filters { - let ListenerFilter::TransactionId(id) = filter; + let ListenerFilter::TransactionId(id) = filter else { + continue; + }; if id == tx_hash || id == "any" { return true; } diff --git a/firefly-cardanoconnect/src/streams/manager.rs b/firefly-cardanoconnect/src/streams/manager.rs index d6e03ec..13ab358 100644 --- a/firefly-cardanoconnect/src/streams/manager.rs +++ b/firefly-cardanoconnect/src/streams/manager.rs @@ -5,7 +5,10 @@ use firefly_server::apitypes::{ApiError, ApiResult}; use tokio::sync::broadcast; use ulid::Ulid; -use crate::{blockchain::BlockchainClient, operations::Operation, persistence::Persistence}; +use crate::{ + blockchain::BlockchainClient, contracts::ContractManager, operations::Operation, + persistence::Persistence, +}; use super::{ mux::{Multiplexer, StreamSubscription}, @@ -19,13 +22,14 @@ pub struct StreamManager { impl StreamManager { pub async fn new( - persistence: Arc, blockchain: Arc, + contracts: Arc, + persistence: Arc, operation_sink: broadcast::Sender, ) -> Result { Ok(Self { persistence: persistence.clone(), - mux: Multiplexer::new(persistence, blockchain, operation_sink).await?, + mux: Multiplexer::new(blockchain, contracts, persistence, operation_sink).await?, }) } diff --git a/firefly-cardanoconnect/src/streams/mux.rs b/firefly-cardanoconnect/src/streams/mux.rs index e5a3553..313a310 100644 --- a/firefly-cardanoconnect/src/streams/mux.rs +++ b/firefly-cardanoconnect/src/streams/mux.rs @@ -11,7 +11,12 @@ use tokio::{ }; use tracing::warn; -use crate::{blockchain::BlockchainClient, operations::Operation, persistence::Persistence}; +use crate::{ + blockchain::BlockchainClient, + contracts::{ContractListener, ContractManager}, + operations::Operation, + persistence::Persistence, +}; use super::{ blockchain::{ChainListener, DataSource}, @@ -25,14 +30,16 @@ pub struct Multiplexer { dispatchers: Arc>, stream_ids_by_topic: Arc>, operation_sink: broadcast::Sender, + contracts: Arc, persistence: Arc, data_source: Arc, } impl Multiplexer { pub async fn new( - persistence: Arc, blockchain: Arc, + contracts: Arc, + persistence: Arc, operation_sink: broadcast::Sender, ) -> Result { let data_source = Arc::new(DataSource::new(blockchain, persistence.clone())); @@ -45,6 +52,7 @@ impl Multiplexer { let dispatcher = StreamDispatcher::new( &stream, + contracts.clone(), persistence.clone(), data_source.clone(), operation_sink.clone(), @@ -56,6 +64,7 @@ impl Multiplexer { dispatchers: Arc::new(dispatchers), stream_ids_by_topic: Arc::new(stream_ids_by_topic), operation_sink, + contracts, persistence, data_source, }) @@ -73,6 +82,7 @@ impl Multiplexer { entry.insert( StreamDispatcher::new( stream, + self.contracts.clone(), self.persistence.clone(), self.data_source.clone(), self.operation_sink.clone(), @@ -97,6 +107,7 @@ impl Multiplexer { .data_source .listen(listener.id.clone(), from_block.as_ref()); let block = from_block.unwrap_or(sync.get_tip().await?); + let contract = self.contracts.listen(listener).await; let hwm = EventReference { block, rollback: false, @@ -107,7 +118,7 @@ impl Multiplexer { let Some(dispatcher) = self.dispatchers.get(&listener.stream_id) else { bail!("new listener created for stream we haven't heard of"); }; - dispatcher.add_listener(listener, sync, hwm).await + dispatcher.add_listener(listener, sync, contract, hwm).await } pub async fn handle_listener_delete( @@ -141,6 +152,7 @@ struct StreamDispatcher { impl StreamDispatcher { pub async fn new( stream: &Stream, + contracts: Arc, persistence: Arc, data_source: Arc, operation_sink: broadcast::Sender, @@ -161,13 +173,19 @@ impl StreamDispatcher { for listener in all_listeners { let hwm = old_hwms.get(&listener.id).cloned().unwrap_or_default(); let sync = data_source.listen(listener.id.clone(), Some(&hwm.block)); + let contract = contracts.listen(&listener).await; hwms.insert(listener.id.clone(), hwm); listeners.insert( listener.id.clone(), ListenerState { id: listener.id.clone(), - stream: ChainEventStream::new(listener.id, listener.filters, sync), + stream: ChainEventStream::new( + listener.id, + listener.filters, + sync, + contract, + ), }, ); } @@ -205,6 +223,7 @@ impl StreamDispatcher { &self, listener: &Listener, sync: ChainListener, + contract: ContractListener, hwm: EventReference, ) -> Result<()> { self.state_change_sink @@ -212,6 +231,7 @@ impl StreamDispatcher { listener.id.clone(), listener.filters.clone(), sync, + contract, hwm, )) .await @@ -265,10 +285,10 @@ impl StreamDispatcherWorker { self.batch_size = settings.batch_size; self.batch_timeout = settings.batch_timeout; } - StreamDispatcherStateChange::NewListener(listener_id, filters, sync, hwm) => { + StreamDispatcherStateChange::NewListener(listener_id, filters, sync, contract, hwm) => { self.listeners.insert(listener_id.clone(), ListenerState { id: listener_id.clone(), - stream: ChainEventStream::new(listener_id.clone(), filters.clone(), sync), + stream: ChainEventStream::new(listener_id.clone(), filters, sync, contract), }); self.hwms.insert(listener_id, hwm); } @@ -358,7 +378,7 @@ impl StreamDispatcherWorker { let mut next_event_future = FuturesUnordered::new(); for listener in self.listeners.values_mut() { let hwm = hwms.get(&listener.id).unwrap(); - if let Some((event_ref, event)) = listener.stream.try_get_next_event(hwm) { + if let Some((event_ref, event)) = listener.stream.try_get_next_event(hwm).await { // This listener already has an event waiting to surface. new_events.push((listener.id.clone(), event_ref, event)); } else { @@ -410,7 +430,6 @@ impl StreamDispatcherWorker { } } -#[derive(Debug)] struct ListenerState { id: ListenerId, stream: ChainEventStream, @@ -422,6 +441,7 @@ enum StreamDispatcherStateChange { ListenerId, Vec, ChainListener, + ContractListener, EventReference, ), RemovedListener(ListenerId), diff --git a/firefly-cardanoconnect/src/streams/types.rs b/firefly-cardanoconnect/src/streams/types.rs index 84ad5a1..d424527 100644 --- a/firefly-cardanoconnect/src/streams/types.rs +++ b/firefly-cardanoconnect/src/streams/types.rs @@ -44,6 +44,10 @@ pub enum ListenerType { #[serde(rename_all = "camelCase")] pub enum ListenerFilter { TransactionId(String), + Event { + contract: String, + event_path: String, + }, } #[derive(Clone, Debug)] @@ -133,13 +137,3 @@ pub struct Event { pub id: EventId, pub data: serde_json::Value, } -impl Event { - pub fn into_rollback(mut self) -> Self { - if self.id.signature == "TransactionAccepted(string,string,string)" { - self.id.signature = "TransactionRolledBack(string,string,string)".into(); - } else if self.id.signature == "TransactionRolledBack(string,string,string)" { - self.id.signature = "TransactionAccepted(string,string,string)".into(); - } - self - } -} diff --git a/infra/connect.yaml b/infra/connect.yaml index 1f1a2ce..4564f9d 100644 --- a/infra/connect.yaml +++ b/infra/connect.yaml @@ -6,7 +6,7 @@ connector: network: preview contracts: componentsPath: /contracts/components - storePath: /contracts/store.redb + storesPath: /contracts/stores persistence: type: sqlite path: /db/db.sqlite3 diff --git a/wasm/simple-tx/src/lib.rs b/wasm/simple-tx/src/lib.rs index acfad7e..ecfac13 100644 --- a/wasm/simple-tx/src/lib.rs +++ b/wasm/simple-tx/src/lib.rs @@ -22,8 +22,6 @@ fn send_ada(_: Config<()>, req: Params) -> WorkerResult let from_address = Address::from_bech32(&req.from_address).map_err(|_| BuildError::MalformedAddress)?; - let to_address = - Address::from_bech32(&req.to_address).map_err(|_| BuildError::MalformedAddress)?; let address_source = UtxoSource::Search(UtxoPattern { address: Some(AddressPattern { @@ -36,7 +34,7 @@ fn send_ada(_: Config<()>, req: Params) -> WorkerResult .with_input(CoinSelectionInput(address_source.clone(), req.amount)) .with_output( OutputBuilder::new() - .address(to_address) + .address(req.to_address.clone()) .with_value(req.amount), ) .with_output(FeeChangeReturn(address_source));