diff --git a/linera-base/src/identifiers.rs b/linera-base/src/identifiers.rs index 6438a4564c02..433c28a64469 100644 --- a/linera-base/src/identifiers.rs +++ b/linera-base/src/identifiers.rs @@ -420,6 +420,29 @@ pub struct StreamId { pub stream_name: StreamName, } +/// An event identifier. +#[derive( + Debug, + PartialEq, + Eq, + Hash, + Clone, + Serialize, + Deserialize, + WitLoad, + WitStore, + WitType, + SimpleObject, +)] +pub struct EventId { + /// The ID of the chain that generated this event. + pub chain_id: ChainId, + /// The ID of the stream this event belongs to. + pub stream_id: StreamId, + /// The event key. + pub key: Vec, +} + /// The destination of a message, relative to a particular application. #[derive( Clone, diff --git a/linera-chain/src/data_types.rs b/linera-chain/src/data_types.rs index 690de73d92a4..baa28a702395 100644 --- a/linera-chain/src/data_types.rs +++ b/linera-chain/src/data_types.rs @@ -17,8 +17,8 @@ use linera_base::{ hashed::Hashed, hex_debug, identifiers::{ - Account, BlobId, BlobType, ChainId, ChannelName, Destination, GenericApplicationId, - MessageId, Owner, StreamId, + Account, BlobId, BlobType, ChainId, ChannelName, Destination, EventId, + GenericApplicationId, MessageId, Owner, StreamId, }, }; use linera_execution::{ @@ -431,6 +431,17 @@ pub struct EventRecord { pub value: Vec, } +impl EventRecord { + /// Returns the ID of this event record, given the publisher chain ID. + pub fn id(&self, chain_id: ChainId) -> EventId { + EventId { + chain_id, + stream_id: self.stream_id.clone(), + key: self.key.clone(), + } + } +} + impl<'de> BcsHashable<'de> for EventRecord {} /// The hash and chain ID of a `CertificateValue`. diff --git a/linera-core/src/chain_worker/state/attempted_changes.rs b/linera-core/src/chain_worker/state/attempted_changes.rs index bdfbbee76d93..92a105c01c7c 100644 --- a/linera-core/src/chain_worker/state/attempted_changes.rs +++ b/linera-core/src/chain_worker/state/attempted_changes.rs @@ -347,6 +347,13 @@ where .storage .write_blobs_and_certificate(blobs, &certificate) .await?; + let events = executed_block + .outcome + .events + .iter() + .flatten() + .map(|event| (event.id(chain_id), &event.value[..])); + self.state.storage.write_events(events).await?; } // Update the blob state with last used certificate hash. diff --git a/linera-storage/src/db_storage.rs b/linera-storage/src/db_storage.rs index 418b0bd7ccd4..86f14b970327 100644 --- a/linera-storage/src/db_storage.rs +++ b/linera-storage/src/db_storage.rs @@ -11,7 +11,7 @@ use linera_base::{ crypto::CryptoHash, data_types::{Blob, TimeDelta, Timestamp}, hashed::Hashed, - identifiers::{BlobId, ChainId, UserApplicationId}, + identifiers::{BlobId, ChainId, EventId, UserApplicationId}, }; use linera_chain::{ types::{ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate}, @@ -185,6 +185,28 @@ pub static LOAD_CHAIN_LATENCY: LazyLock = LazyLock::new(|| { ) }); +/// The metric counting how often an event is read from storage. +#[cfg(with_metrics)] +#[doc(hidden)] +pub static READ_EVENT_COUNTER: LazyLock = LazyLock::new(|| { + register_int_counter_vec( + "read_event", + "The metric counting how often an event is read from storage", + &[], + ) +}); + +/// The metric counting how often an event is written to storage. +#[cfg(with_metrics)] +#[doc(hidden)] +pub static WRITE_EVENT_COUNTER: LazyLock = LazyLock::new(|| { + register_int_counter_vec( + "write_event", + "The metric counting how often an event is written to storage", + &[], + ) +}); + trait BatchExt { fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError>; @@ -192,6 +214,8 @@ trait BatchExt { fn add_certificate(&mut self, certificate: &ConfirmedBlockCertificate) -> Result<(), ViewError>; + + fn add_event(&mut self, event_id: EventId, value: &[u8]) -> Result<(), ViewError>; } impl BatchExt for Batch { @@ -222,6 +246,14 @@ impl BatchExt for Batch { self.put_key_value(value_key.to_vec(), certificate.value())?; Ok(()) } + + fn add_event(&mut self, event_id: EventId, value: &[u8]) -> Result<(), ViewError> { + #[cfg(with_metrics)] + WRITE_EVENT_COUNTER.with_label_values(&[]).inc(); + let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?; + self.put_key_value_bytes(event_key.to_vec(), value.to_vec()); + Ok(()) + } } /// Main implementation of the [`Storage`] trait. @@ -242,6 +274,7 @@ enum BaseKey { ConfirmedBlock(CryptoHash), Blob(BlobId), BlobState(BlobId), + Event(EventId), } const INDEX_BLOB: u8 = 3; @@ -741,6 +774,25 @@ where Ok(certificates) } + async fn read_event(&self, event_id: EventId) -> Result, ViewError> { + let event_key = bcs::to_bytes(&BaseKey::Event(event_id.clone()))?; + let maybe_value = self.store.read_value::>(&event_key).await?; + #[cfg(with_metrics)] + READ_EVENT_COUNTER.with_label_values(&[]).inc(); + maybe_value.ok_or_else(|| ViewError::not_found("value for event ID", event_id)) + } + + async fn write_events( + &self, + events: impl IntoIterator + Send, + ) -> Result<(), ViewError> { + let mut batch = Batch::new(); + for (event_id, value) in events { + batch.add_event(event_id, value)?; + } + self.write_batch(batch).await + } + fn wasm_runtime(&self) -> Option { self.wasm_runtime } diff --git a/linera-storage/src/lib.rs b/linera-storage/src/lib.rs index d5521daedd57..6506876fc0fd 100644 --- a/linera-storage/src/lib.rs +++ b/linera-storage/src/lib.rs @@ -16,7 +16,7 @@ use linera_base::{ data_types::{Amount, Blob, BlockHeight, TimeDelta, Timestamp, UserApplicationDescription}, hashed::Hashed, identifiers::{ - BlobId, ChainDescription, ChainId, GenericApplicationId, Owner, UserApplicationId, + BlobId, ChainDescription, ChainId, EventId, GenericApplicationId, Owner, UserApplicationId, }, ownership::ChainOwnership, }; @@ -163,6 +163,15 @@ pub trait Storage: Sized { hashes: I, ) -> Result, ViewError>; + /// Reads the event with the given ID. + async fn read_event(&self, id: EventId) -> Result, ViewError>; + + /// Writes a vector of events. + async fn write_events( + &self, + events: impl IntoIterator + Send, + ) -> Result<(), ViewError>; + /// Loads the view of a chain state and checks that it is active. /// /// # Notes