Skip to content

Commit

Permalink
Update resource counters with write operations (#1541)
Browse files Browse the repository at this point in the history
* Simplify `Batch` size calculation

Use a more functional paradigm.

* Count the number of bytes written per block

Update the `RuntimeCounts` on every write call from applications.

* Add a `Batch::num_operations` method

Returns the number of write operations contained in the batch.

* Keep track of the number of writes per block

Update the `RuntimeCounts` on every write request from an application.

* Add write operation count to `RuntimeCounts`

Prepare to keep track of the number of write operations executed.

* Derive `Debug` for `Batch`

Make it simple to print out the batch operations.

* Impl. `Debug` for `execution_state_actor::Request`

Make it easy to print unexpected requests in tests.

* Derive `Clone` and `Eq` for `Batch`

Make it more ergonomic to use `Batch` in tests.

* Test if resources are updated by `write_batch`

Request to write a dummy batch and check that the resource consumption
counters are updated appropriately.
  • Loading branch information
jvff authored Jan 22, 2024
1 parent f26ef5b commit bce66c1
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 17 deletions.
61 changes: 61 additions & 0 deletions linera-execution/src/execution_state_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use linera_views::{
views::{View, ViewError},
};
use oneshot::Sender;
use std::fmt::{self, Debug, Formatter};

pub(crate) type ExecutionStateSender = mpsc::UnboundedSender<Request>;

Expand Down Expand Up @@ -167,3 +168,63 @@ pub enum Request {
callback: Sender<()>,
},
}

impl Debug for Request {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
match self {
Request::LoadContract { id, .. } => formatter
.debug_struct("Request::LoadContract")
.field("id", id)
.finish_non_exhaustive(),

Request::LoadService { id, .. } => formatter
.debug_struct("Request::LoadService")
.field("id", id)
.finish_non_exhaustive(),

Request::SystemBalance { .. } => formatter
.debug_struct("Request::SystemBalance")
.finish_non_exhaustive(),

Request::SystemTimestamp { .. } => formatter
.debug_struct("Request::SystemTimestamp")
.finish_non_exhaustive(),

Request::ReadValueBytes { id, key, .. } => formatter
.debug_struct("Request::ReadValueBytes")
.field("id", id)
.field("key", key)
.finish_non_exhaustive(),

Request::ContainsKey { id, key, .. } => formatter
.debug_struct("Request::ContainsKey")
.field("id", id)
.field("key", key)
.finish_non_exhaustive(),

Request::ReadMultiValuesBytes { id, keys, .. } => formatter
.debug_struct("Request::ReadMultiValuesBytes")
.field("id", id)
.field("keys", keys)
.finish_non_exhaustive(),

Request::FindKeysByPrefix { id, key_prefix, .. } => formatter
.debug_struct("Request::FindKeysByPrefix")
.field("id", id)
.field("key_prefix", key_prefix)
.finish_non_exhaustive(),

Request::FindKeyValuesByPrefix { id, key_prefix, .. } => formatter
.debug_struct("Request::FindKeyValuesByPrefix")
.field("id", id)
.field("key_prefix", key_prefix)
.finish_non_exhaustive(),

Request::WriteBatch { id, batch, .. } => formatter
.debug_struct("Request::WriteBatch")
.field("id", id)
.field("batch", batch)
.finish_non_exhaustive(),
}
}
}
14 changes: 14 additions & 0 deletions linera-execution/src/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub struct ResourceTracker {
pub used_fuel: u64,
/// The number of reads in the computation
pub num_reads: u64,
/// The number of writes in the computation
pub num_writes: u64,
/// The total number of bytes read
pub bytes_read: u64,
/// The total number of bytes written
Expand Down Expand Up @@ -124,6 +126,8 @@ pub struct RuntimeCounts {
pub remaining_fuel: u64,
/// The number of read operations
pub num_reads: u64,
/// The number of write operations
pub num_writes: u64,
/// The bytes that have been read
pub bytes_read: u64,
/// The bytes that have been written
Expand All @@ -141,6 +145,16 @@ impl RuntimeCounts {
Ok(())
}

/// Increments the number of executed write operations.
pub fn increment_num_writes(
&mut self,
_limits: &RuntimeLimits,
amount: u64,
) -> Result<(), ExecutionError> {
self.num_writes += amount;
Ok(())
}

pub fn increment_bytes_read(
&mut self,
limits: &RuntimeLimits,
Expand Down
8 changes: 8 additions & 0 deletions linera-execution/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ use std::{
sync::{Arc, Mutex},
};

#[cfg(test)]
#[path = "unit_tests/runtime_tests.rs"]
mod tests;

#[derive(Debug)]
pub struct SyncRuntime<UserInstance>(Arc<Mutex<SyncRuntimeInternal<UserInstance>>>);

Expand Down Expand Up @@ -704,6 +708,10 @@ impl<UserInstance> BaseRuntime for SyncRuntimeInternal<UserInstance> {
let id = self.application_id()?;
let state = self.view_user_states.entry(id).or_default();
state.force_all_pending_queries()?;
self.runtime_counts
.increment_num_writes(&self.runtime_limits, batch.num_operations() as u64)?;
self.runtime_counts
.increment_bytes_written(&self.runtime_limits, batch.size() as u64)?;
self.execution_state_sender
.send_request(|callback| Request::WriteBatch {
id,
Expand Down
122 changes: 122 additions & 0 deletions linera-execution/src/unit_tests/runtime_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Resource consumption unit tests.
use super::{ApplicationStatus, SyncRuntimeInternal};
use crate::{
execution_state_actor::Request, resources::RuntimeLimits, BaseRuntime, UserContractInstance,
};
use futures::{channel::mpsc, StreamExt};
use linera_base::{
data_types::BlockHeight,
identifiers::{ApplicationId, BytecodeId, ChainDescription, MessageId},
};
use linera_views::batch::Batch;
use std::thread;
use tokio::runtime::Runtime;

/// Test writing a batch of changes.
///
/// Ensure that resource consumption counts are updated correctly.
#[test]
fn test_write_batch() {
let (mut runtime, mut execution_state_receiver) = create_contract_runtime();
let mut batch = Batch::new();

let write_key = vec![1, 2, 3, 4, 5];
let write_data = vec![6, 7, 8, 9];
let delete_key = vec![10, 11, 12];
let delete_key_prefix = vec![13, 14, 15, 16, 17, 18];

let expected_bytes_count =
write_key.len() + write_data.len() + delete_key.len() + delete_key_prefix.len();

batch.put_key_value_bytes(write_key, write_data);
batch.delete_key(delete_key);
batch.delete_key_prefix(delete_key_prefix);

let expected_write_count = batch.operations.len();
let expected_application_id = runtime.current_application().id;
let expected_batch = batch.clone();

thread::spawn(move || {
Runtime::new()
.expect("Failed to create Tokio runtime")
.block_on(async move {
let request = execution_state_receiver
.next()
.await
.expect("Missing expected request to write a batch");

let Request::WriteBatch {
id,
batch,
callback,
} = request
else {
panic!("Expected a `Request::WriteBatch` but got {request:?} instead");
};

assert_eq!(id, expected_application_id);
assert_eq!(batch, expected_batch);

callback
.send(())
.expect("Failed to notify that writing the batch finished");
})
});

runtime
.write_batch(batch)
.expect("Failed to write test batch");

assert_eq!(
runtime.runtime_counts.num_writes,
expected_write_count as u64
);
assert_eq!(
runtime.runtime_counts.bytes_written,
expected_bytes_count as u64
);
}

/// Creates a [`SyncRuntimeInternal`] instance for contracts, and returns it and the receiver
/// endpoint for the requests the runtime sends to the [`ExecutionStateView`] actor.
fn create_contract_runtime() -> (
SyncRuntimeInternal<UserContractInstance>,
mpsc::UnboundedReceiver<Request>,
) {
let chain_id = ChainDescription::Root(0).into();
let (execution_state_sender, execution_state_receiver) = mpsc::unbounded();
let limits = RuntimeLimits::default();

let mut runtime = SyncRuntimeInternal::new(chain_id, execution_state_sender, limits, 0);

runtime.push_application(create_dummy_application());

(runtime, execution_state_receiver)
}

/// Create an [`ApplicationStatus`] for a dummy application.
fn create_dummy_application() -> ApplicationStatus {
let chain_id = ChainDescription::Root(1).into();
let id = ApplicationId {
bytecode_id: BytecodeId::new(MessageId {
chain_id,
height: BlockHeight(1),
index: 0,
}),
creation: MessageId {
chain_id,
height: BlockHeight(1),
index: 1,
},
};

ApplicationStatus {
id,
parameters: vec![],
signer: None,
}
}
32 changes: 15 additions & 17 deletions linera-views/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::{
/// * Deletion of a specific key.
/// * Deletion of all keys matching a specific prefix.
/// * Insertion or replacement of a key with a value.
#[derive(Debug)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WriteOperation {
/// Delete the given key.
Delete {
Expand All @@ -58,7 +58,7 @@ pub enum WriteOperation {
}

/// A batch of write operations.
#[derive(Default)]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Batch {
/// The write operations.
pub operations: Vec<WriteOperation>,
Expand Down Expand Up @@ -175,21 +175,19 @@ impl Batch {

/// The total size of the batch
pub fn size(&self) -> usize {
let mut size = 0;
for operation in &self.operations {
match operation {
WriteOperation::Delete { key } => {
size += key.len();
}
WriteOperation::Put { key, value } => {
size += key.len() + value.len();
}
WriteOperation::DeletePrefix { key_prefix } => {
size += key_prefix.len();
}
}
}
size
self.operations
.iter()
.map(|operation| match operation {
WriteOperation::Delete { key } => key.len(),
WriteOperation::Put { key, value } => key.len() + value.len(),
WriteOperation::DeletePrefix { key_prefix } => key_prefix.len(),
})
.sum()
}

/// Returns the number of operations in this [`Batch`].
pub fn num_operations(&self) -> usize {
self.operations.len()
}

/// Builds a batch from a builder function.
Expand Down

0 comments on commit bce66c1

Please sign in to comment.