From 1466d8d8cecc6d4b5b83d7d54a9896d6c17b537e Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Fri, 7 Feb 2025 14:39:49 -0500 Subject: [PATCH] feat: add address to custom events Signed-off-by: Simon Gellis --- firefly-cardanoconnect/src/contracts.rs | 6 ++--- .../src/contracts/runtime.rs | 22 ++++++++++++++++--- firefly-cardanoconnect/src/routes/ws.rs | 2 ++ firefly-cardanoconnect/src/streams/events.rs | 4 +++- firefly-cardanoconnect/src/streams/types.rs | 1 + 5 files changed, 28 insertions(+), 7 deletions(-) diff --git a/firefly-cardanoconnect/src/contracts.rs b/firefly-cardanoconnect/src/contracts.rs index f078eeb..667654b 100644 --- a/firefly-cardanoconnect/src/contracts.rs +++ b/firefly-cardanoconnect/src/contracts.rs @@ -8,8 +8,8 @@ use anyhow::{bail, Result}; use balius_runtime::{ledgers::Ledger, Response}; use dashmap::{DashMap, Entry}; use ledger::BlockfrostLedger; +pub use runtime::ContractEvent; use runtime::ContractRuntime; -pub use runtime::RawEvent; use serde::Deserialize; use serde_json::Value; use tokio::{fs, sync::Mutex}; @@ -151,7 +151,7 @@ struct ContractListenerContract { pub struct ContractListener { contracts: Vec, - cache: HashMap>, + cache: HashMap>, } impl ContractListener { @@ -163,7 +163,7 @@ impl ContractListener { } } - pub async fn events_for(&mut self, block_ref: &BlockReference) -> &[RawEvent] { + pub async fn events_for(&mut self, block_ref: &BlockReference) -> &[ContractEvent] { match self.cache.entry(block_ref.clone()) { hash_map::Entry::Occupied(entry) => entry.into_mut(), hash_map::Entry::Vacant(entry) => { diff --git a/firefly-cardanoconnect/src/contracts/runtime.rs b/firefly-cardanoconnect/src/contracts/runtime.rs index c8261c3..1e89b61 100644 --- a/firefly-cardanoconnect/src/contracts/runtime.rs +++ b/firefly-cardanoconnect/src/contracts/runtime.rs @@ -100,7 +100,7 @@ impl ContractRuntime { rx.await? } - pub async fn events(&self, block_ref: &BlockReference) -> Result> { + pub async fn events(&self, block_ref: &BlockReference) -> Result> { let key = match block_ref { BlockReference::Origin => { return Ok(vec![]); @@ -112,7 +112,15 @@ impl ContractRuntime { }; let mut lock = kv.lock().await; let raw_events: Vec = lock.get(key).await?.unwrap_or_default(); - Ok(raw_events) + Ok(raw_events + .into_iter() + .map(|e| ContractEvent { + address: self.contract.clone(), + tx_hash: hex::encode(e.tx_hash), + signature: e.signature, + data: e.data, + }) + .collect()) } } @@ -264,8 +272,16 @@ impl ContractRuntimeWorker { } } +#[derive(Clone)] +pub struct ContractEvent { + pub address: String, + pub tx_hash: String, + pub signature: String, + pub data: serde_json::Value, +} + #[derive(Clone, Deserialize)] -pub struct RawEvent { +struct RawEvent { pub tx_hash: Vec, pub signature: String, pub data: serde_json::Value, diff --git a/firefly-cardanoconnect/src/routes/ws.rs b/firefly-cardanoconnect/src/routes/ws.rs index 3c10562..824745e 100644 --- a/firefly-cardanoconnect/src/routes/ws.rs +++ b/firefly-cardanoconnect/src/routes/ws.rs @@ -49,6 +49,7 @@ async fn send_batch(socket: &mut WebSocket, topic: &str, batch: Batch) -> Result .iter() .map(|e| Event { listener_id: Some(e.id.listener_id.clone().into()), + address: e.id.address.clone(), signature: e.id.signature.clone(), block_number: e.id.block_number, block_hash: e.id.block_hash.clone(), @@ -170,6 +171,7 @@ async fn read_message(socket: &mut WebSocket) -> Result> #[serde(rename_all = "camelCase")] struct Event { listener_id: Option, + address: Option, signature: String, block_hash: String, block_number: Option, diff --git a/firefly-cardanoconnect/src/streams/events.rs b/firefly-cardanoconnect/src/streams/events.rs index 6f758c2..0a52f27 100755 --- a/firefly-cardanoconnect/src/streams/events.rs +++ b/firefly-cardanoconnect/src/streams/events.rs @@ -157,7 +157,7 @@ impl ChainEventStream { let mut contract_events: HashMap<_, Vec<_>> = HashMap::new(); for contract_event in self.contract.events_for(&block_ref).await { contract_events - .entry(hex::encode(&contract_event.tx_hash)) + .entry(contract_event.tx_hash.clone()) .or_default() .push(contract_event.clone()); } @@ -173,6 +173,7 @@ impl ChainEventStream { let id = EventId { listener_id: self.id.clone(), + address: None, signature: tx_event_signature.into(), block_hash: block.block_hash.clone(), block_number: block.block_height, @@ -197,6 +198,7 @@ impl ChainEventStream { for contract_event in contract_events.remove(tx_hash).into_iter().flatten() { let id = EventId { listener_id: self.id.clone(), + address: Some(contract_event.address), signature: contract_event.signature, block_hash: block.block_hash.clone(), block_number: block.block_height, diff --git a/firefly-cardanoconnect/src/streams/types.rs b/firefly-cardanoconnect/src/streams/types.rs index d424527..71beb42 100644 --- a/firefly-cardanoconnect/src/streams/types.rs +++ b/firefly-cardanoconnect/src/streams/types.rs @@ -123,6 +123,7 @@ pub struct BlockRecord { #[derive(Clone, Debug)] pub struct EventId { pub listener_id: ListenerId, + pub address: Option, pub signature: String, pub block_hash: String, pub block_number: Option,