Skip to content

Commit

Permalink
Write events to storage. (#3301)
Browse files Browse the repository at this point in the history
## Motivation

We want to use event streams for committee changes to address the admin
chain's scalability issues. Ultimately we want to replace pub-sub
channels with event streams entirely. (#365)

## Proposal

Write committed blocks' events to storage.

## Test Plan

This doesn't use them yet, and just ports parts of
#2309 to `main`, so we
don't have to rebase them again in the future. CI should catch any
regressions, and the event functionality itself will be tested in
upcoming PRs, when events are actually used.

## Release Plan

- Nothing to do / These changes follow the usual release cycle.

## Links

- Porting parts of #2309.
- Related to #365.
- [reviewer
checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
  • Loading branch information
afck authored Feb 12, 2025
1 parent e57957f commit fbe0732
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 4 deletions.
23 changes: 23 additions & 0 deletions linera-base/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,29 @@ pub struct StreamId {
pub stream_name: StreamName,
}

/// An event identifier.
#[derive(
Debug,
PartialEq,
Eq,
Hash,
Clone,
Serialize,
Deserialize,
WitLoad,
WitStore,
WitType,
SimpleObject,
)]
pub struct EventId {
/// The ID of the chain that generated this event.
pub chain_id: ChainId,
/// The ID of the stream this event belongs to.
pub stream_id: StreamId,
/// The event key.
pub key: Vec<u8>,
}

/// The destination of a message, relative to a particular application.
#[derive(
Clone,
Expand Down
15 changes: 13 additions & 2 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use linera_base::{
hashed::Hashed,
hex_debug,
identifiers::{
Account, BlobId, BlobType, ChainId, ChannelName, Destination, GenericApplicationId,
MessageId, Owner, StreamId,
Account, BlobId, BlobType, ChainId, ChannelName, Destination, EventId,
GenericApplicationId, MessageId, Owner, StreamId,
},
};
use linera_execution::{
Expand Down Expand Up @@ -431,6 +431,17 @@ pub struct EventRecord {
pub value: Vec<u8>,
}

impl EventRecord {
/// Returns the ID of this event record, given the publisher chain ID.
pub fn id(&self, chain_id: ChainId) -> EventId {
EventId {
chain_id,
stream_id: self.stream_id.clone(),
key: self.key.clone(),
}
}
}

impl<'de> BcsHashable<'de> for EventRecord {}

/// The hash and chain ID of a `CertificateValue`.
Expand Down
7 changes: 7 additions & 0 deletions linera-core/src/chain_worker/state/attempted_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,13 @@ where
.storage
.write_blobs_and_certificate(blobs, &certificate)
.await?;
let events = executed_block
.outcome
.events
.iter()
.flatten()
.map(|event| (event.id(chain_id), &event.value[..]));
self.state.storage.write_events(events).await?;
}

// Update the blob state with last used certificate hash.
Expand Down
54 changes: 53 additions & 1 deletion linera-storage/src/db_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use linera_base::{
crypto::CryptoHash,
data_types::{Blob, TimeDelta, Timestamp},
hashed::Hashed,
identifiers::{BlobId, ChainId, UserApplicationId},
identifiers::{BlobId, ChainId, EventId, UserApplicationId},
};
use linera_chain::{
types::{ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
Expand Down Expand Up @@ -185,13 +185,37 @@ pub static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
)
});

/// The metric counting how often an event is read from storage.
#[cfg(with_metrics)]
#[doc(hidden)]
pub static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec(
"read_event",
"The metric counting how often an event is read from storage",
&[],
)
});

/// The metric counting how often an event is written to storage.
#[cfg(with_metrics)]
#[doc(hidden)]
pub static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec(
"write_event",
"The metric counting how often an event is written to storage",
&[],
)
});

trait BatchExt {
fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError>;

fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError>;

fn add_certificate(&mut self, certificate: &ConfirmedBlockCertificate)
-> Result<(), ViewError>;

fn add_event(&mut self, event_id: EventId, value: &[u8]) -> Result<(), ViewError>;
}

impl BatchExt for Batch {
Expand Down Expand Up @@ -222,6 +246,14 @@ impl BatchExt for Batch {
self.put_key_value(value_key.to_vec(), certificate.value())?;
Ok(())
}

fn add_event(&mut self, event_id: EventId, value: &[u8]) -> Result<(), ViewError> {
#[cfg(with_metrics)]
WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
let event_key = bcs::to_bytes(&BaseKey::Event(event_id))?;
self.put_key_value_bytes(event_key.to_vec(), value.to_vec());
Ok(())
}
}

/// Main implementation of the [`Storage`] trait.
Expand All @@ -242,6 +274,7 @@ enum BaseKey {
ConfirmedBlock(CryptoHash),
Blob(BlobId),
BlobState(BlobId),
Event(EventId),
}

const INDEX_BLOB: u8 = 3;
Expand Down Expand Up @@ -741,6 +774,25 @@ where
Ok(certificates)
}

async fn read_event(&self, event_id: EventId) -> Result<Vec<u8>, ViewError> {
let event_key = bcs::to_bytes(&BaseKey::Event(event_id.clone()))?;
let maybe_value = self.store.read_value::<Vec<u8>>(&event_key).await?;
#[cfg(with_metrics)]
READ_EVENT_COUNTER.with_label_values(&[]).inc();
maybe_value.ok_or_else(|| ViewError::not_found("value for event ID", event_id))
}

async fn write_events(
&self,
events: impl IntoIterator<Item = (EventId, &[u8])> + Send,
) -> Result<(), ViewError> {
let mut batch = Batch::new();
for (event_id, value) in events {
batch.add_event(event_id, value)?;
}
self.write_batch(batch).await
}

fn wasm_runtime(&self) -> Option<WasmRuntime> {
self.wasm_runtime
}
Expand Down
11 changes: 10 additions & 1 deletion linera-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use linera_base::{
data_types::{Amount, Blob, BlockHeight, TimeDelta, Timestamp, UserApplicationDescription},
hashed::Hashed,
identifiers::{
BlobId, ChainDescription, ChainId, GenericApplicationId, Owner, UserApplicationId,
BlobId, ChainDescription, ChainId, EventId, GenericApplicationId, Owner, UserApplicationId,
},
ownership::ChainOwnership,
};
Expand Down Expand Up @@ -163,6 +163,15 @@ pub trait Storage: Sized {
hashes: I,
) -> Result<Vec<ConfirmedBlockCertificate>, ViewError>;

/// Reads the event with the given ID.
async fn read_event(&self, id: EventId) -> Result<Vec<u8>, ViewError>;

/// Writes a vector of events.
async fn write_events(
&self,
events: impl IntoIterator<Item = (EventId, &[u8])> + Send,
) -> Result<(), ViewError>;

/// Loads the view of a chain state and checks that it is active.
///
/// # Notes
Expand Down

0 comments on commit fbe0732

Please sign in to comment.