From 9df8a9aec0185639fc421329ac0050f639567757 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Mon, 30 Dec 2024 12:14:51 -0500 Subject: [PATCH] feat: separate event production from listener multiplexing --- firefly-cardanoconnect/src/streams.rs | 1 + .../src/streams/blockchain.rs | 110 +++---- firefly-cardanoconnect/src/streams/events.rs | 188 ++++++++++++ firefly-cardanoconnect/src/streams/mux.rs | 281 +++--------------- firefly-cardanoconnect/src/streams/types.rs | 2 +- firefly-cardanoconnect/src/utils.rs | 3 + 6 files changed, 283 insertions(+), 302 deletions(-) create mode 100755 firefly-cardanoconnect/src/streams/events.rs diff --git a/firefly-cardanoconnect/src/streams.rs b/firefly-cardanoconnect/src/streams.rs index 89609d1..7d5526e 100644 --- a/firefly-cardanoconnect/src/streams.rs +++ b/firefly-cardanoconnect/src/streams.rs @@ -1,4 +1,5 @@ mod blockchain; +mod events; mod manager; mod mux; mod types; diff --git a/firefly-cardanoconnect/src/streams/blockchain.rs b/firefly-cardanoconnect/src/streams/blockchain.rs index f7c53a4..d105302 100644 --- a/firefly-cardanoconnect/src/streams/blockchain.rs +++ b/firefly-cardanoconnect/src/streams/blockchain.rs @@ -55,16 +55,19 @@ impl ChainListener { Ok(self.get_impl().await?.get_tip()) } - pub async fn get_event(&mut self, block_ref: &BlockReference) -> Option { - self.get_impl().await.unwrap().get_event(block_ref) + pub fn try_get_next_event(&mut self, block_ref: &BlockReference) -> Option { + let l = self.0.try_get()?.as_mut().ok()?; + l.try_get_next_event(block_ref) } - pub async fn get_next(&mut self, block_ref: &BlockReference) -> Option { - self.get_impl().await.unwrap().get_next(block_ref).await - } - - pub async fn get_next_rollback(&mut self, block_ref: &BlockReference) -> BlockReference { - self.get_impl().await.unwrap().get_next_rollback(block_ref) + pub async fn wait_for_next_event(&mut self, block_ref: &BlockReference) -> ListenerEvent { + let l = self.get_impl().await.unwrap(); + loop { + if let Some(event) = l.try_get_next_event(block_ref) { + return event; + } + l.wait_for_more_events().await; + } } async fn get_impl(&mut self) -> Result<&mut ChainListenerImpl> { @@ -103,13 +106,13 @@ impl ChainListenerImpl { self.history.back().unwrap().as_reference() } - pub fn get_event(&mut self, block_ref: &BlockReference) -> Option { - let (target_slot, target_hash) = match block_ref { + pub fn try_get_next_event(&mut self, block_ref: &BlockReference) -> Option { + let (prev_slot, prev_hash) = match block_ref { BlockReference::Origin => (None, self.genesis_hash.clone()), BlockReference::Point(slot, hash) => (*slot, hash.clone()), }; - if let Some(slot) = target_slot { + if let Some(slot) = prev_slot { // if we haven't seen enough blocks to be "sure" that this one is immutable, apply all pending updates synchronously if self .history @@ -124,73 +127,42 @@ impl ChainListenerImpl { } } - // If we already know this block has been rolled back, just say so - if let Some(rollback) = self.rollbacks.get(block_ref) { - return Some(ListenerEvent::Rollback(rollback.clone())); + // Check if we've rolled back already + if let Some(rollback_to) = self.rollbacks.get(block_ref) { + return Some(ListenerEvent::Rollback(rollback_to.clone())); } - // If we have it already, return it. - // If we don't, no big deal, some other consumer is running at a different point in history - self.history - .iter() - .rev() - .take_while(|b| { - b.block_slot - .is_none_or(|b| target_slot.is_none_or(|t| b >= t)) - }) - .find(|b| b.block_hash == target_hash) - .cloned() - .map(ListenerEvent::Process) - } - - pub async fn get_next(&mut self, block_ref: &BlockReference) -> Option { - let (prev_slot, prev_hash) = match block_ref { - BlockReference::Origin => (None, self.genesis_hash.clone()), - BlockReference::Point(slot, hash) => (*slot, hash.clone()), - }; - - loop { - // Have we rolled back to before this block? If so, don't wait for its successor. - // That successor will never come, and even if it did, we'd ignore it. - if self.rollbacks.contains_key(block_ref) { - return None; - } - - for (index, block) in self.history.iter().enumerate().rev() { - if block.block_hash == prev_hash { - if let Some(next) = self.history.get(index + 1) { - // we already have the block which comes after this! - return Some(next.as_reference()); - } else { - // we don't have that block yet, so process events until we do - break; - } - } - // If we can tell by the slots we've gone too far back, break early - if block - .block_slot - .is_some_and(|slot| prev_slot.is_some_and(|target| slot < target)) - { + for (index, block) in self.history.iter().enumerate().rev() { + if block.block_hash == prev_hash { + if let Some(next) = self.history.get(index + 1) { + // we already have the block which comes after this! + return Some(ListenerEvent::Process(next.clone())); + } else { + // we don't have that block yet, so process events until we do break; } } - - // We don't have it, wait until the chain has progressed before checking again - let mut sync_events = vec![]; - if self.sync_event_source.recv_many(&mut sync_events, 32).await == 0 { - panic!("data source has been shut down") - } - for sync_event in sync_events { - self.handle_sync_event(sync_event); + // If we can tell by the slots we've gone too far back, break early + if block + .block_slot + .is_some_and(|slot| prev_slot.is_some_and(|target| slot < target)) + { + break; } } + + // We don't have it, wait until the chain has progressed before checking again + None } - pub fn get_next_rollback(&mut self, block_ref: &BlockReference) -> BlockReference { - let Some(rollback_to) = self.rollbacks.get(block_ref) else { - panic!("caller is trying to roll back when we didn't tell them to"); - }; - rollback_to.as_reference() + pub async fn wait_for_more_events(&mut self) { + let mut sync_events = vec![]; + if self.sync_event_source.recv_many(&mut sync_events, 32).await == 0 { + panic!("data source has been shut down") + } + for sync_event in sync_events { + self.handle_sync_event(sync_event); + } } fn handle_sync_event(&mut self, sync_event: ChainSyncEvent) { diff --git a/firefly-cardanoconnect/src/streams/events.rs b/firefly-cardanoconnect/src/streams/events.rs new file mode 100755 index 0000000..77cf213 --- /dev/null +++ b/firefly-cardanoconnect/src/streams/events.rs @@ -0,0 +1,188 @@ +use std::{collections::HashMap, time::SystemTime}; + +use serde_json::json; + +use super::{ + blockchain::{ChainListener, ListenerEvent}, + BlockInfo, BlockReference, Event, EventId, EventReference, ListenerFilter, ListenerId, +}; + +#[derive(Debug)] +pub struct ChainEventStream { + id: ListenerId, + filters: Vec, + sync: ChainListener, + cache: HashMap>, +} + +impl ChainEventStream { + pub fn new(id: ListenerId, filters: Vec, sync: ChainListener) -> Self { + Self { + id, + filters, + sync, + cache: HashMap::new(), + } + } + + pub fn try_get_next_event(&mut self, hwm: &EventReference) -> Option<(EventReference, Event)> { + let mut next_hwm = hwm.clone(); + loop { + if let Some(result) = self.next_event_in_memory(&next_hwm) { + return Some(result); + } + let (rollbacks, block) = self.try_get_next_block(&next_hwm.block)?; + let block_ref = block.as_reference(); + if let Some(event) = self.collect_events(rollbacks, block) { + return Some(event); + } + next_hwm = EventReference { + block: block_ref, + rollback: false, + tx_index: None, + log_index: None, + } + } + } + + pub async fn wait_for_next_event(&mut self, hwm: &EventReference) -> (EventReference, Event) { + let mut next_hwm = hwm.clone(); + loop { + if let Some(result) = self.next_event_in_memory(&next_hwm) { + return result; + } + let (rollbacks, block) = self.wait_for_next_block(&next_hwm.block).await; + let block_ref = block.as_reference(); + if let Some(event) = self.collect_events(rollbacks, block) { + return event; + } + next_hwm = EventReference { + block: block_ref, + rollback: false, + tx_index: None, + log_index: None, + } + } + } + + fn next_event_in_memory(&mut self, hwm: &EventReference) -> Option<(EventReference, Event)> { + let cached = self.cache.get(&hwm.block)?; + if hwm.tx_index.is_none() && hwm.log_index.is_none() { + // We haven't processed any events from this block yet, so just process the first + return cached.first().cloned(); + } + let current_index = cached.iter().position(|(e_ref, _)| e_ref == hwm)?; + cached.get(current_index + 1).cloned() + } + + fn try_get_next_block( + &mut self, + block_ref: &BlockReference, + ) -> Option<(Vec, BlockInfo)> { + let mut rollbacks = vec![]; + let mut at = block_ref.clone(); + loop { + match self.sync.try_get_next_event(&at)? { + ListenerEvent::Rollback(block) => { + at = block.as_reference(); + rollbacks.push(block); + } + ListenerEvent::Process(block) => { + return Some((rollbacks, block)); + } + } + } + } + + async fn wait_for_next_block( + &mut self, + block_ref: &BlockReference, + ) -> (Vec, BlockInfo) { + let mut rollbacks = vec![]; + let mut at = block_ref.clone(); + loop { + match self.sync.wait_for_next_event(&at).await { + ListenerEvent::Rollback(block) => { + at = block.as_reference(); + rollbacks.push(block); + } + ListenerEvent::Process(block) => { + return (rollbacks, block); + } + } + } + } + + fn collect_events( + &mut self, + rollbacks: Vec, + block: BlockInfo, + ) -> Option<(EventReference, Event)> { + let mut result = None; + for rollback in rollbacks { + let Some(forwards) = self.cache.get(&rollback.as_reference()) else { + continue; + }; + let backwards: Vec<_> = forwards + .iter() + .map(|(_, e)| { + let event_ref = EventReference { + block: block.as_reference(), + rollback: true, + tx_index: Some(e.id.transaction_index), + log_index: Some(e.id.log_index), + }; + let event = e.clone().into_rollback(); + (event_ref, event) + }) + .collect(); + result = result.or(backwards.first().cloned()); + self.cache.insert(rollback.as_reference(), backwards); + } + let events = self.collect_forward_tx_events(&block); + result = result.or(events.first().cloned()); + self.cache.insert(block.as_reference(), events); + result + } + + fn collect_forward_tx_events(&self, block: &BlockInfo) -> Vec<(EventReference, Event)> { + let mut events = vec![]; + for (tx_idx, tx_hash) in block.transaction_hashes.iter().enumerate() { + let tx_idx = tx_idx as u64; + if self.matches_tx_filter(tx_hash) { + let id = EventId { + listener_id: self.id.clone(), + signature: "TransactionAccepted(string, string, string)".into(), + block_hash: block.block_hash.clone(), + block_number: block.block_height, + transaction_hash: tx_hash.clone(), + transaction_index: tx_idx, + log_index: 0, + timestamp: Some(SystemTime::now()), + }; + let event = Event { + id, + data: json!({}), + }; + let event_ref = EventReference { + block: block.as_reference(), + rollback: false, + tx_index: Some(tx_idx), + log_index: Some(0), + }; + events.push((event_ref, event)); + } + } + events + } + + fn matches_tx_filter(&self, tx_hash: &str) -> bool { + for filter in &self.filters { + let ListenerFilter::TransactionId(id) = filter; + if id == tx_hash || id == "any" { + return true; + } + } + false + } +} diff --git a/firefly-cardanoconnect/src/streams/mux.rs b/firefly-cardanoconnect/src/streams/mux.rs index 4b8915c..e5a3553 100644 --- a/firefly-cardanoconnect/src/streams/mux.rs +++ b/firefly-cardanoconnect/src/streams/mux.rs @@ -1,14 +1,9 @@ -use std::{ - cmp::Ordering, - collections::BTreeMap, - sync::Arc, - time::{Duration, SystemTime}, -}; +use std::{cmp::Ordering, collections::BTreeMap, sync::Arc, time::Duration}; use anyhow::{bail, Context, Result}; use dashmap::{DashMap, Entry}; use firefly_server::apitypes::ToAnyhow; -use serde_json::json; +use futures::{stream::FuturesUnordered, StreamExt}; use tokio::{ select, sync::{broadcast, mpsc, oneshot}, @@ -16,16 +11,12 @@ use tokio::{ }; use tracing::warn; -use crate::{ - blockchain::BlockchainClient, - operations::Operation, - persistence::Persistence, - streams::{blockchain::ListenerEvent, EventId}, -}; +use crate::{blockchain::BlockchainClient, operations::Operation, persistence::Persistence}; use super::{ blockchain::{ChainListener, DataSource}, - BlockInfo, BlockReference, Event, EventReference, Listener, ListenerFilter, ListenerId, Stream, + events::ChainEventStream, + BlockReference, Event, EventReference, Listener, ListenerFilter, ListenerId, Stream, StreamCheckpoint, StreamId, }; @@ -169,15 +160,14 @@ impl StreamDispatcher { let mut hwms = BTreeMap::new(); for listener in all_listeners { let hwm = old_hwms.get(&listener.id).cloned().unwrap_or_default(); - let stream = data_source.listen(listener.id.clone(), Some(&hwm.block)); + let sync = data_source.listen(listener.id.clone(), Some(&hwm.block)); hwms.insert(listener.id.clone(), hwm); listeners.insert( listener.id.clone(), ListenerState { - id: listener.id, - sync: stream, - filters: listener.filters, + id: listener.id.clone(), + stream: ChainEventStream::new(listener.id, listener.filters, sync), }, ); } @@ -278,8 +268,7 @@ impl StreamDispatcherWorker { StreamDispatcherStateChange::NewListener(listener_id, filters, sync, hwm) => { self.listeners.insert(listener_id.clone(), ListenerState { id: listener_id.clone(), - sync, - filters, + stream: ChainEventStream::new(listener_id.clone(), filters.clone(), sync), }); self.hwms.insert(listener_id, hwm); } @@ -364,230 +353,59 @@ impl StreamDispatcherWorker { !self.listeners.is_empty(), "no listeners to produce events!" ); - // TODO: gotta pull events from the balius runtime - loop { - // find the next event to process - let mut sync_events = vec![]; + while events.len() < self.batch_size { + let mut new_events = vec![]; + let mut next_event_future = FuturesUnordered::new(); for listener in self.listeners.values_mut() { let hwm = hwms.get(&listener.id).unwrap(); - if let Some(sync_event) = listener.sync.get_event(&hwm.block).await { - sync_events.push((listener.id.clone(), hwm.block.clone(), sync_event)); + if let Some((event_ref, event)) = listener.stream.try_get_next_event(hwm) { + // This listener already has an event waiting to surface. + new_events.push((listener.id.clone(), event_ref, event)); + } else { + // This listener must already be at the tip. We might have to wait a while for more events. + next_event_future.push(async move { + let (event_ref, event) = listener.stream.wait_for_next_event(hwm).await; + (listener.id.clone(), event_ref, event) + }); } } - let (listener_id, block_ref, sync_event) = sync_events + // If any listeners already had events waiting for us, choose the highest-priority event + let next_event = new_events .into_iter() - .max_by(|l, r| Self::compare_stream_event_priority(&l.2, &r.2)) - .expect("no listeners had anything to do on this event!"); - - // process it, updating our high water marks as we go - match &sync_event { - ListenerEvent::Process(block) => { - self.collect_forward_events(hwms, events, block); - } - ListenerEvent::Rollback(block) => { - self.collect_backward_events(hwms, events, block); - } - } - - // if we still need to fill this batch, update our high water marks to look at the next block - if events.len() >= self.batch_size { - break; - } - let listener = self.listeners.get_mut(&listener_id).unwrap(); - let (update_to, rollback) = match sync_event { - ListenerEvent::Process(_) => { - let update_to = listener.sync.get_next(&block_ref).await; - (update_to, false) - } - ListenerEvent::Rollback(_) => { - let update_to = Some(listener.sync.get_next_rollback(&block_ref).await); - (update_to, true) - } + .max_by(|l, r| Self::compare_event_priority(&l.1, &r.1)); + let (listener_id, event_ref, new_event) = if let Some(event) = next_event { + event + } else { + // Block until some listener has a new event, then use that + next_event_future.next().await.unwrap() }; - if let Some(next_ref) = update_to { - for hwm in hwms.values_mut() { - if hwm.block == block_ref { - *hwm = EventReference { - block: next_ref.clone(), - rollback, - tx_index: None, - log_index: None, - }; - } - } - } + drop(next_event_future); + hwms.insert(listener_id, event_ref); + events.push(new_event); } } - fn collect_forward_events( - &self, - hwms: &mut BTreeMap, - events: &mut Vec, - block: &BlockInfo, - ) { - for (id, hwm) in hwms.iter_mut() { - if !Self::matches_ref(block, &hwm.block) { - continue; - } - let listener = self.listeners.get(id).unwrap(); - for event in self.find_forward_events(listener, hwm, block) { - *hwm = EventReference { - block: hwm.block.clone(), - rollback: false, - tx_index: Some(event.id.transaction_index), - log_index: Some(event.id.log_index), - }; - events.push(event); - if events.len() >= self.batch_size { - return; - } - } - } - } - - fn collect_backward_events( - &self, - hwms: &mut BTreeMap, - events: &mut Vec, - block: &BlockInfo, - ) { - for (id, hwm) in hwms.iter_mut().rev() { - if !Self::matches_ref(block, &hwm.block) { - continue; - } - let listener = self.listeners.get(id).unwrap(); - for event in self.find_backward_events(listener, hwm, block) { - *hwm = EventReference { - block: hwm.block.clone(), - rollback: true, - tx_index: Some(event.id.transaction_index), - log_index: Some(event.id.log_index), - }; - events.push(event); - if events.len() >= self.batch_size { - return; - } - } - } - } - - fn matches_ref(block: &BlockInfo, block_ref: &BlockReference) -> bool { - match block_ref { - BlockReference::Origin => block.block_slot.is_none() && block.block_height.is_none(), - BlockReference::Point(slot, hash) => { - block.block_slot == *slot && block.block_hash == *hash - } - } - } - - fn find_forward_events( - &self, - listener: &ListenerState, - hwm: &EventReference, - block: &BlockInfo, - ) -> Vec { - if hwm.rollback { - // If we were rolling back before, and now we've started rolling forward, - // we rolled back onto a block we've already completely processed. - return vec![]; - } - let mut events = self.find_events(listener, block); - events.retain(|e| { - // throw out any events which came at or before our high-water mark - let tx_cmp = hwm - .tx_index - .as_ref() - .map(|tx| e.id.transaction_index.cmp(tx)); - let log_cmp = hwm.log_index.as_ref().map(|log| e.id.log_index.cmp(log)); - match (tx_cmp, log_cmp) { - (Some(Ordering::Less), _) => false, - (Some(Ordering::Equal), Some(Ordering::Less | Ordering::Equal)) => false, - (_, _) => true, - } - }); - events - } - - fn find_backward_events( - &self, - listener: &ListenerState, - hwm: &EventReference, - block: &BlockInfo, - ) -> Vec { - self.find_events(listener, block) - .into_iter() - .rev() - .filter(|e| { - let tx_cmp = hwm - .tx_index - .as_ref() - .map(|tx| e.id.transaction_index.cmp(tx)); - let log_cmp = hwm.log_index.as_ref().map(|log| e.id.log_index.cmp(log)); - if hwm.rollback { - // if we were rolling back already, throw out events we've already rolled back past - match (tx_cmp, log_cmp) { - (Some(Ordering::Greater), _) => false, - (Some(Ordering::Equal), Some(Ordering::Greater | Ordering::Equal)) => false, - (_, _) => true, - } - } else { - // otherwise, just keep any events that rolling forward would have kept - match (tx_cmp, log_cmp) { - (Some(Ordering::Less), _) => true, - (Some(Ordering::Equal), Some(Ordering::Less | Ordering::Equal)) => true, - (_, _) => false, - } - } - }) - .map(|e| e.into_rollback()) - .collect() - } - - fn find_events(&self, listener: &ListenerState, block: &BlockInfo) -> Vec { - let mut events = vec![]; - for (tx_idx, tx_hash) in block.transaction_hashes.iter().enumerate() { - let tx_idx = tx_idx as u64; - for filter in &listener.filters { - if Self::matches_filter(tx_hash, filter) { - let id = EventId { - listener_id: listener.id.clone(), - signature: "TransactionAccepted(string,string,string)".into(), - block_hash: block.block_hash.clone(), - block_number: block.block_height, - transaction_hash: tx_hash.clone(), - transaction_index: tx_idx, - log_index: 0, - timestamp: Some(SystemTime::now()), - }; - events.push(Event { - id, - data: json!({}), - }) - } - } - } - events - } - // "rollback" events are higher priority than "process" events // "process" events are higher priority the older they are // "rollback" events are higher priority the newer they are - fn compare_stream_event_priority(lhs: &ListenerEvent, rhs: &ListenerEvent) -> Ordering { - match (lhs, rhs) { - (ListenerEvent::Process(_), ListenerEvent::Rollback(_)) => Ordering::Less, - (ListenerEvent::Rollback(_), ListenerEvent::Process(_)) => Ordering::Greater, - (ListenerEvent::Process(l), ListenerEvent::Process(r)) => { - r.block_slot.cmp(&l.block_slot) - } - (ListenerEvent::Rollback(l), ListenerEvent::Rollback(r)) => { - l.block_slot.cmp(&r.block_slot) - } + fn compare_event_priority(lhs: &EventReference, rhs: &EventReference) -> Ordering { + if lhs.rollback && !rhs.rollback { + return Ordering::Less; + } + if !lhs.rollback && rhs.rollback { + return Ordering::Greater; } - } - fn matches_filter(tx: &str, filter: &ListenerFilter) -> bool { - match filter { - ListenerFilter::TransactionId(id) => id == tx || id == "any", + let event_ordering = lhs + .block + .partial_cmp(&rhs.block) + .unwrap_or(Ordering::Equal) + .then(lhs.tx_index.cmp(&rhs.tx_index)) + .then(lhs.log_index.cmp(&rhs.log_index)); + if lhs.rollback && rhs.rollback { + event_ordering.reverse() + } else { + event_ordering } } } @@ -595,8 +413,7 @@ impl StreamDispatcherWorker { #[derive(Debug)] struct ListenerState { id: ListenerId, - sync: ChainListener, - filters: Vec, + stream: ChainEventStream, } enum StreamDispatcherStateChange { diff --git a/firefly-cardanoconnect/src/streams/types.rs b/firefly-cardanoconnect/src/streams/types.rs index 8032917..84ad5a1 100644 --- a/firefly-cardanoconnect/src/streams/types.rs +++ b/firefly-cardanoconnect/src/streams/types.rs @@ -87,7 +87,7 @@ impl PartialOrd for BlockReference { } } -#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct EventReference { pub block: BlockReference, pub rollback: bool, diff --git a/firefly-cardanoconnect/src/utils.rs b/firefly-cardanoconnect/src/utils.rs index 32964ae..b903db9 100644 --- a/firefly-cardanoconnect/src/utils.rs +++ b/firefly-cardanoconnect/src/utils.rs @@ -61,4 +61,7 @@ impl LazyInit { } self.value.as_mut().unwrap() } + pub fn try_get(&mut self) -> Option<&mut T> { + self.value.as_mut() + } }