Skip to content

Commit

Permalink
Simplify streams
Browse files Browse the repository at this point in the history
  • Loading branch information
SupernaviX committed Sep 9, 2024
1 parent 1e63516 commit 353209d
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 160 deletions.
101 changes: 6 additions & 95 deletions firefly-cardanoconnect/src/streams/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,14 @@ use crate::blockchain::mocks::{MockChain, MockChainSync, RequestNextResponse};

use super::{BlockInfo, BlockReference, ListenerId};

#[derive(Debug)]
pub enum ListenerEvent {
Process(BlockInfo),
Rollback(BlockInfo),
}
pub struct UpdateRef {
pub from: BlockReference,
pub to: BlockReference,
pub rollback: bool,
}

pub struct DataSource {
chain: Arc<MockChain>,
listeners: Arc<DashMap<ListenerId, ChainListener>>,
db: Arc<BlockDatabase>,
}

Expand All @@ -39,109 +34,25 @@ impl DataSource {
pub fn new() -> Self {
Self {
chain: Arc::new(MockChain::new(3000)),
listeners: Arc::new(DashMap::new()),
db: Arc::new(BlockDatabase::default()),
}
}

pub async fn init_listener(&self, id: ListenerId, from: &BlockReference) -> Result<()> {
let listener = ChainListener::init(id.clone(), &self.chain, &self.db, from).await?;
self.listeners.insert(id, listener);
Ok(())
}

// Returns the current "listener event" which these listeners should be handling.
// We prioritize processing rollbacks for newer blocks,
// then processing blocks from oldest to newest..
pub fn current_event(&self, refs: &[(ListenerId, BlockReference)]) -> ListenerEvent {
assert!(!refs.is_empty());

let events: Vec<ListenerEvent> = refs
.iter()
.map(|(listener_id, block_ref)| {
let mut listener = self
.listeners
.get_mut(listener_id)
.expect("listener should still exist");
listener.get_event(block_ref)
})
.collect();

let mut process_candidate = None;
let mut rollback_candidate = None;
for event in events {
match event {
ListenerEvent::Process(i) => {
if !process_candidate
.as_ref()
.is_some_and(|c: &BlockInfo| c.block_number < i.block_number)
{
process_candidate = Some(i);
}
}
ListenerEvent::Rollback(i) => {
if !rollback_candidate
.as_ref()
.is_some_and(|c: &BlockInfo| c.block_number > i.block_number)
{
rollback_candidate = Some(i);
}
}
}
}

rollback_candidate
.map(ListenerEvent::Rollback)
.or(process_candidate.map(ListenerEvent::Process))
.unwrap()
}

// Called when a set of stream listeners has finished processing a block,
// this method tells them which block they can process next.
pub async fn update_refs(
&self,
refs: &[(ListenerId, BlockReference)],
last_event: &ListenerEvent,
) -> Option<UpdateRef> {
let (block, rollback) = match last_event {
ListenerEvent::Process(block) => (block, false),
ListenerEvent::Rollback(block) => (block, true),
};
let (listener_id, block_ref) = refs
.iter()
.find(|(_, r)| match r {
BlockReference::Origin => block.block_number == 0,
BlockReference::Point(number, hash) => {
block.block_number == *number && block.block_hash == *hash
}
})
.expect("we returned an event that nobody asked us to");
let mut listener = self
.listeners
.get_mut(listener_id)
.expect("listener should still exist");
let next_ref = if rollback {
Some(listener.get_next_rollback(block_ref))
} else {
listener.get_next(block_ref).await
};
next_ref.map(|r| UpdateRef {
from: block_ref.clone(),
to: r,
rollback,
})
pub async fn listen(&self, id: ListenerId, from: &BlockReference) -> Result<ChainListener> {
ChainListener::init(id, &self.chain, &self.db, from).await
}
}

struct ChainListener {
#[derive(Debug)]
pub struct ChainListener {
history: VecDeque<BlockInfo>,
rollbacks: HashMap<BlockReference, BlockInfo>,
sync_event_source: mpsc::Receiver<ChainSyncEvent>,
block_record_sink: mpsc::UnboundedSender<BlockRecord>,
}

impl ChainListener {
pub async fn init(
async fn init(
id: ListenerId,
chain: &MockChain,
db: &Arc<BlockDatabase>,
Expand Down
Loading

0 comments on commit 353209d

Please sign in to comment.