Skip to content

Commit

Permalink
Support streaming from earliest, latest, or specific blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
SupernaviX committed Sep 11, 2024
1 parent 353209d commit d784c6d
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 21 deletions.
4 changes: 2 additions & 2 deletions firefly-cardanoconnect/src/blockchain/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl MockChainSync {
pub async fn find_intersect(
&mut self,
points: &[BlockReference],
) -> (Option<BlockInfo>, BlockReference) {
) -> (Option<BlockReference>, BlockReference) {
let chain = self.chain.read_lock().await;
let intersect = points.iter().find_map(|point| match point {
BlockReference::Origin => chain.first(),
Expand All @@ -64,7 +64,7 @@ impl MockChainSync {
});
self.consumer_tip = intersect.map(|b| b.as_reference()).unwrap_or_default();
let tip = chain.last().map(|b| b.as_reference()).unwrap_or_default();
(intersect.cloned(), tip)
(intersect.map(|i| i.as_reference()), tip)
}
}

Expand Down
35 changes: 32 additions & 3 deletions firefly-cardanoconnect/src/routes/streams.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use axum::extract::{Path, Query};
use axum::{extract::State, Json};
use firefly_server::apitypes::{ApiDuration, ApiResult, NoContent};
use firefly_server::apitypes::{ApiDuration, ApiError, ApiResult, NoContent};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::streams::{ListenerFilter, ListenerType, Stream};
use crate::streams::{BlockReference, ListenerFilter, ListenerType, Stream};
use crate::AppState;

fn example_batch_size() -> usize {
Expand All @@ -15,6 +15,10 @@ fn example_opt_batch_size() -> Option<usize> {
Some(example_batch_size())
}

fn example_from_block() -> Option<String> {
Some("earliest".into())
}

#[derive(Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateStreamRequest {
Expand Down Expand Up @@ -71,6 +75,8 @@ pub struct CreateListenerRequest {
pub name: String,
#[serde(rename = "type")]
pub type_: ListenerType,
#[schemars(example = "example_from_block")]
pub from_block: Option<String>,
#[serde(default)]
pub filters: Vec<ListenerFilter>,
}
Expand Down Expand Up @@ -158,8 +164,12 @@ pub async fn create_listener(
Json(req): Json<CreateListenerRequest>,
) -> ApiResult<Json<Listener>> {
let stream_id = stream_id.into();
let from_block = match req.from_block {
Some(block) => parse_block_reference(&block)?,
None => None,
};
let listener = stream_manager
.create_listener(&stream_id, &req.name, req.type_, &req.filters)
.create_listener(&stream_id, &req.name, req.type_, from_block, &req.filters)
.await?;
Ok(Json(listener.into()))
}
Expand Down Expand Up @@ -207,3 +217,22 @@ pub async fn list_listeners(
.await?;
Ok(Json(listeners.into_iter().map(|l| l.into()).collect()))
}

fn parse_block_reference(value: &str) -> ApiResult<Option<BlockReference>> {
match value {
"earliest" => Ok(Some(BlockReference::Origin)),
"latest" => Ok(None),
other => {
let Some((slot_number, slot_hash)) = other.split_once(".") else {
return Err(ApiError::bad_request("invalid block reference"));
};
let Ok(slot_number) = slot_number.parse() else {
return Err(ApiError::bad_request("invalid block reference"));
};
Ok(Some(BlockReference::Point(
slot_number,
slot_hash.to_string(),
)))
}
}
}
45 changes: 38 additions & 7 deletions firefly-cardanoconnect/src/streams/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ impl DataSource {
}
}

pub async fn listen(&self, id: ListenerId, from: &BlockReference) -> Result<ChainListener> {
pub async fn listen(
&self,
id: ListenerId,
from: Option<&BlockReference>,
) -> Result<ChainListener> {
ChainListener::init(id, &self.chain, &self.db, from).await
}
}
Expand All @@ -56,7 +60,7 @@ impl ChainListener {
id: ListenerId,
chain: &MockChain,
db: &Arc<BlockDatabase>,
from: &BlockReference,
from: Option<&BlockReference>,
) -> Result<Self> {
if let Some(records) = db.load_history(&id).await {
Self::init_existing(id, chain, db, records).await
Expand All @@ -65,6 +69,10 @@ impl ChainListener {
}
}

pub fn get_tip(&self) -> BlockReference {
self.history.front().unwrap().as_reference()
}

pub fn get_event(&mut self, block_ref: &BlockReference) -> ListenerEvent {
let (target_number, target_hash) = match block_ref {
BlockReference::Origin => (0, None),
Expand Down Expand Up @@ -230,12 +238,15 @@ impl ChainListener {

let mut sync = chain.sync();
let points: Vec<_> = history.iter().rev().map(|b| b.as_reference()).collect();
let (head, _) = sync.find_intersect(&points).await;
let Some(head) = head else {
let (head_ref, _) = sync.find_intersect(&points).await;
let Some(head_ref) = head_ref else {
// The chain didn't recognize any of the blocks we saved from this chain.
// We have no way to recover.
bail!("listener {id} is on a fork which no longer exists");
};
let Some(head) = chain.request_block(&head_ref).await else {
bail!("listener {id} is on a fork which no longer exists");
};

let mut rollbacks = HashMap::new();
while history
Expand All @@ -262,11 +273,31 @@ impl ChainListener {
id: ListenerId,
chain: &MockChain,
db: &Arc<BlockDatabase>,
from: &BlockReference,
from: Option<&BlockReference>,
) -> Result<Self> {
let mut sync = chain.sync();
let (head, _) = sync.find_intersect(&[from.clone()]).await;
let Some(head) = head else {
let head_ref = match from {
Some(block_ref) => {
// If the caller passed a block reference, they're starting from either the origin or a specific point
let (head, _) = sync.find_intersect(&[block_ref.clone()]).await;
let Some(head) = head else {
// Trying to init a fresh listener from a ref which does not exist
bail!("could not start listening from {from:?}, as it does not exist on-chain");
};
head
}
None => {
// Otherwise, they just want to follow from the tip
let (_, tip) = sync.find_intersect(&[]).await;
// Call find_intersect again so the chainsync protocol knows we're following from the tip
let (head, _) = sync.find_intersect(&[tip.clone()]).await;
if !head.is_some_and(|h| h == tip) {
bail!("could not start listening from latest: rollback occurred while we were connecting");
};
tip
}
};
let Some(head) = chain.request_block(&head_ref).await else {
// Trying to init a fresh listener from a ref which does not exist
bail!("could not start listening from {from:?}, as it does not exist on-chain");
};
Expand Down
5 changes: 3 additions & 2 deletions firefly-cardanoconnect/src/streams/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::persistence::Persistence;

use super::{
mux::{Batch, Multiplexer},
Listener, ListenerFilter, ListenerId, ListenerType, Stream, StreamId,
BlockReference, Listener, ListenerFilter, ListenerId, ListenerType, Stream, StreamId,
};

pub struct StreamManager {
Expand Down Expand Up @@ -89,6 +89,7 @@ impl StreamManager {
stream_id: &StreamId,
name: &str,
listener_type: ListenerType,
from_block: Option<BlockReference>,
filters: &[ListenerFilter],
) -> ApiResult<Listener> {
if listener_type != ListenerType::Events {
Expand All @@ -105,7 +106,7 @@ impl StreamManager {
filters: filters.to_vec(),
};
self.persistence.write_listener(&listener).await?;
match self.mux.handle_listener_write(&listener).await {
match self.mux.handle_listener_write(&listener, from_block).await {
Ok(()) => Ok(listener),
Err(err) => {
self.persistence
Expand Down
24 changes: 17 additions & 7 deletions firefly-cardanoconnect/src/streams/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,27 @@ impl Multiplexer {
self.dispatchers.remove(id);
}

pub async fn handle_listener_write(&self, listener: &Listener) -> Result<()> {
let hwm = EventReference::default(); // TODO: caller can provide this
pub async fn handle_listener_write(
&self,
listener: &Listener,
from_block: Option<BlockReference>,
) -> Result<()> {
let sync = self
.data_source
.listen(listener.id.clone(), &hwm.block)
.listen(listener.id.clone(), from_block.as_ref())
.await?;
let block = from_block.unwrap_or(sync.get_tip());
let hwm = EventReference {
block,
rollback: false,
tx_index: None,
log_index: None,
};

let Some(dispatcher) = self.dispatchers.get(&listener.stream_id) else {
bail!("new listener created for stream we haven't heard of");
};
dispatcher
.add_listener(listener, sync, EventReference::default())
.await
dispatcher.add_listener(listener, sync, hwm).await
}

pub async fn handle_listener_delete(
Expand Down Expand Up @@ -141,7 +149,9 @@ impl StreamDispatcher {
let mut hwms = BTreeMap::new();
for listener in all_listeners {
let hwm = old_hwms.get(&listener.id).cloned().unwrap_or_default();
let stream = data_source.listen(listener.id.clone(), &hwm.block).await?;
let stream = data_source
.listen(listener.id.clone(), Some(&hwm.block))
.await?;

hwms.insert(listener.id.clone(), hwm);
listeners.insert(
Expand Down
1 change: 1 addition & 0 deletions firefly-cardanoconnect/src/streams/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub enum ListenerType {
}

#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub enum ListenerFilter {
TransactionId(String),
}
Expand Down
3 changes: 3 additions & 0 deletions firefly-server/src/apitypes/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ impl ApiError {
}
err.into()
}
pub fn bad_request(message: impl Into<String>) -> Self {
Self::new(StatusCode::BAD_REQUEST, message)
}
pub fn not_found(message: impl Into<String>) -> Self {
Self::new(StatusCode::NOT_FOUND, message)
}
Expand Down

0 comments on commit d784c6d

Please sign in to comment.