From 819a7ef93b232459acf02322cfabb2c13b25e821 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Thu, 23 May 2024 22:28:38 +0000 Subject: [PATCH] feat: [MR-551] Implement `CanisterQueue` --- .../queues/message_pool/tests.rs | 9 +- .../src/canister_state/queues/queue.rs | 354 +++++++++++++++-- .../src/canister_state/queues/queue/tests.rs | 362 +++++++++++++++++- rs/types/types/src/funds/cycles.rs | 7 + 4 files changed, 687 insertions(+), 45 deletions(-) diff --git a/rs/replicated_state/src/canister_state/queues/message_pool/tests.rs b/rs/replicated_state/src/canister_state/queues/message_pool/tests.rs index 0b7007a9af8..f93d5c91c10 100644 --- a/rs/replicated_state/src/canister_state/queues/message_pool/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/message_pool/tests.rs @@ -982,13 +982,8 @@ fn assert_trimmed_priority_queues(pool: &MessagePool) { } /// Generates a `MessageId` for a best-effort inbound request. -pub(crate) fn new_request_message_id(generator: u64) -> MessageId { - MessageId::new( - Kind::Request, - Context::Inbound, - Class::BestEffort, - generator, - ) +pub(crate) fn new_request_message_id(generator: u64, class: Class) -> MessageId { + MessageId::new(Kind::Request, Context::Inbound, class, generator) } /// Generates a `MessageId` for an inbound response. diff --git a/rs/replicated_state/src/canister_state/queues/queue.rs b/rs/replicated_state/src/canister_state/queues/queue.rs index b279390f4cf..33d27e78a3c 100644 --- a/rs/replicated_state/src/canister_state/queues/queue.rs +++ b/rs/replicated_state/src/canister_state/queues/queue.rs @@ -1,19 +1,318 @@ -use crate::StateError; -#[cfg(test)] -mod tests; +// TODO(MR-569) Remove when `CanisterQueues` has been updated to use this. +#![allow(dead_code)] +use super::message_pool::{Class, MessageId, MessagePool}; +use crate::StateError; use ic_base_types::CanisterId; use ic_protobuf::proxy::ProxyDecodeError; use ic_protobuf::state::{ingress::v1 as pb_ingress, queues::v1 as pb_queues}; -use ic_types::messages::{Ingress, Request, RequestOrResponse, Response}; +use ic_types::messages::{Ingress, Request, RequestOrResponse, Response, NO_DEADLINE}; use ic_types::{CountBytes, Cycles, Time}; -use std::collections::BTreeMap; -use std::{ - collections::VecDeque, - convert::{From, TryFrom, TryInto}, - mem::size_of, - sync::Arc, -}; +use std::collections::{BTreeMap, VecDeque}; +use std::convert::{From, TryFrom, TryInto}; +use std::mem::size_of; +use std::sync::Arc; + +#[cfg(test)] +mod tests; + +/// A reference to a message, used as `CanisterQueue` item. +/// +/// May be a weak reference into the message pool; or identify a reject response to +/// a specific callback. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(super) enum MessageReference { + /// Weak reference to a `Request` held in the message pool. + /// + /// Guaranteed response call requests in output queues and best-effort requests + /// in input or output queues may time out and be dropped from the pool. Such + /// stale references can be safely ignored. Guaranteed response call requests in + /// input queues never time out. + /// + /// All best-effort requests are subject to load shedding and may be dropped + /// from the pool at any time. + Request(MessageId), + + /// Weak reference to a `Response` held in the message pool. + /// + /// Best-effort responses in output queues may time out and be dropped from the + /// pool. Best-effort responses in input queues and guaranteed responses never + /// time out. Additionally, best-effort responses are subject to load shedding + /// and may be dropped from the pool at any time. + /// + /// Stale response references in output queues can be safely ignored. + /// + /// Stale response references in input queues were either enqueued as dangling + /// timeout reject response markers (with the corresponding response never + /// inserted into the pool); or they are the result of shedding a best-effort + /// response. Meaning that stale response references in input queues are always + /// `SYS_UNKNOWN` reject responses to best-effort calls ("timeout" if deadline + /// has expired, "drop" otherwise.) + Response(MessageId), + // + // TODO(MR-552) Define and use variants for best-effort and guaranteed reject + // responses, so we don't need to allocate a full message. + + // Local known (i.e. `SYS_TRANSIENT`) reject for a guaranteed response call: + // "timeout" if deadline has expired, "drop" otherwise. + // LocalRejectGuaranteedResponse(CallbackId), + + // Local known (i.e. `SYS_TRANSIENT`) reject for a best-effort response call: + // "timeout" if deadline has expired, "drop" otherwise. + // LocalRejectBestEffortResponse(CallbackId), +} + +impl MessageReference { + /// Returns `true` if this is a reference to a response; or a reject response. + pub(super) fn is_response(&self) -> bool { + matches!(self, Self::Response(_)) + } + + /// Returns the `MessageId` behind this reference. + pub(super) fn id(&self) -> MessageId { + match self { + Self::Request(id) | Self::Response(id) => *id, + } + } +} + +/// A FIFO queue with equal but separate capacities for requests and responses, +/// ensuring full-duplex communication up to the capacity. +/// +/// For the most part (with the exception of transient reject response markers) +/// the queue holds weak references into a `MessagePool`. The messages that +/// these references point to may expire or be shed, resulting in stale +/// references that are not immediately removed from the queue. Which is why the +/// queue stats track "request slots" and "response slots" instead of "requests" +/// and "responses" and `len()` returns the length of the queue, not the number +/// of messages that can be popped. +/// +/// Backpressure (limiting number of open callbacks to a given destination) is +/// enforced by making enqueuing a request contingent on reserving a slot for +/// the eventual response in the reverse queue; and bounding the number of +/// responses (actually enqueued plus reserved slots) by the queue capacity. +/// Note that this ensures that a response is only ever enqueued into a slot +/// already reserved for it. +/// +/// Backpressure should implicitly limit the number of requests (since there +/// cannot be more live requests than callbacks). It is however possible for +/// requests to time out; produce a reject response in the reverse queue; and +/// for that response to be consumed while the request still consumes a slot in +/// the queue; so we must additionally explicitly limit the number of slots used +/// by requests to the queue capacity. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct CanisterQueue { + /// A FIFO queue of all requests and responses. + /// + /// Since responses may be enqueued at arbitrary points in time, reserved slots + /// for responses cannot be explicitly represented in `queue`. They only exist + /// as the difference between `response_slots` and the number of actually + /// enqueued response references (calculated as `request_slots + response_slots + /// - queue.len()`). + queue: VecDeque, + + /// Maximum number of requests; or responses + reserved slots; that can be held + /// in the queue at any one time. + capacity: usize, + + /// Number of enqueued request references. + request_slots: usize, + + /// Number of slots used by response references or reserved for expected + /// responses. + response_slots: usize, + + /// Memory reservations for expected guaranteed responses. + response_memory_reservations: usize, +} + +impl CanisterQueue { + /// The memoty overhead of an empty `CanisterQueue`, in bytes. + pub const EMPTY_SIZE_BYTES: usize = size_of::(); + + /// Creates a new `CanisterQueue` with the given capacity. + pub(super) fn new(capacity: usize) -> Self { + Self { + queue: VecDeque::new(), + capacity, + request_slots: 0, + response_slots: 0, + response_memory_reservations: 0, + } + } + + /// Returns the number slots available for requests. + pub(super) fn available_request_slots(&self) -> usize { + debug_assert!(self.request_slots <= self.capacity); + self.capacity - self.request_slots + } + + /// Returns `Ok(())` if there exists at least one available request slot, + /// `Err(StateError::QueueFull)` otherwise. + pub(super) fn check_has_request_slot(&self) -> Result<(), StateError> { + if self.request_slots >= self.capacity { + return Err(StateError::QueueFull { + capacity: self.capacity, + }); + } + Ok(()) + } + + /// Enqueues a request. + /// + /// Panics if there is no available request slot. + pub(super) fn push_request(&mut self, id: MessageId) { + assert!(self.request_slots < self.capacity); + + self.queue.push_back(MessageReference::Request(id)); + self.request_slots += 1; + + debug_assert!(self.check_invariants()); + } + + /// Returns the number of response slots available for reservation. + pub(super) fn available_response_slots(&self) -> usize { + debug_assert!(self.response_slots <= self.capacity); + self.capacity - self.response_slots + } + + /// Reserves a slot for the response to the given request, if available; else + /// returns `Err(StateError::QueueFull)`. + /// + /// If the request is for a guaranteed response call, also reserves memory for + /// the response. + pub(super) fn try_reserve_response_slot(&mut self, req: &Request) -> Result<(), StateError> { + debug_assert!(self.response_slots <= self.capacity); + if self.response_slots >= self.capacity { + return Err(StateError::QueueFull { + capacity: self.capacity, + }); + } + + self.response_slots += 1; + if req.deadline == NO_DEADLINE { + self.response_memory_reservations += 1; + } + debug_assert!(self.check_invariants()); + Ok(()) + } + + /// Returns the number of reserved response slots. + pub(super) fn reserved_slots(&self) -> usize { + debug_assert!(self.request_slots + self.response_slots >= self.queue.len()); + self.request_slots + self.response_slots - self.queue.len() + } + + /// Returns `Ok(())` if there exists at least one reserved response slot and + /// (iff `class` is `GuaranteedResponse`) one response memory reservation, + /// `Err(StateError::InvariantBroken)` otherwise. + pub(super) fn check_has_reserved_response_slot(&self, class: Class) -> Result<(), StateError> { + if self.request_slots + self.response_slots <= self.queue.len() { + return Err(StateError::InvariantBroken( + "No reserved response slot".to_string(), + )); + } + + if class == Class::GuaranteedResponse && self.response_memory_reservations == 0 { + // Guaranteed response, we must have a memory reservation for it. + return Err(StateError::InvariantBroken( + "No guaranteed response memory reservation".to_string(), + )); + } + + Ok(()) + } + + /// Returns the number of guaranteed response memory reservations. + pub(super) fn response_memory_reservations(&self) -> usize { + self.response_memory_reservations + } + + /// Enqueues a response into a reserved slot, consuming the slot. + /// + /// Panics if there is no reserved response slot or if this is a guaranteed + /// response and there is no matching guaranteed response memory reservation. + pub(super) fn push_response(&mut self, id: MessageId) { + self.check_has_reserved_response_slot(id.class()).unwrap(); + + if id.class() == Class::GuaranteedResponse { + // Guaranteed response, consume one memory reservation. + self.response_memory_reservations -= 1; + } + + self.queue.push_back(MessageReference::Response(id)); + debug_assert!(self.check_invariants()); + } + + /// Pops a reference from the queue. Returns `None` if the queue is empty. + pub(super) fn pop(&mut self) -> Option { + let reference = self.queue.pop_front()?; + + if reference.is_response() { + debug_assert!(self.response_slots > 0); + self.response_slots -= 1; + } else { + debug_assert!(self.request_slots > 0); + self.request_slots -= 1; + } + debug_assert!(self.check_invariants()); + + Some(reference) + } + + /// Returns a reference to the next item in the queue; or `None` if + /// the queue is empty. + pub(super) fn peek(&self) -> Option<&MessageReference> { + self.queue.front() + } + + /// Returns `true` if the queue has one or more used slots. + /// + /// This is basically an `is_empty()` test, except it also looks at reserved + /// slots, so it is named differently to make it clear it doesn't only check for + /// enqueued references. + pub(super) fn has_used_slots(&self) -> bool { + !self.queue.is_empty() || self.response_slots > 0 + } + + /// Returns the length of the queue (including stale references but not + /// including reserved slots). + pub(super) fn len(&self) -> usize { + self.queue.len() + } + + /// Queue invariant check that panics if any invariant does not hold. Intended + /// to be called from within a `debug_assert!()` in production code. + /// + /// Time complexity: `O(n)`. + fn check_invariants(&self) -> bool { + // Requests and response slots at or below capacity. + assert!(self.request_slots <= self.capacity); + assert!(self.response_slots <= self.capacity); + + let responses = self.queue.iter().filter(|msg| msg.is_response()).count(); + // At most as many responses as response slots (difference is reserved slots). + assert!(responses <= self.response_slots); + // Queue contains only requests and responses. + assert_eq!(self.queue.len(), self.request_slots + responses); + // Cannot have more response memory reservations than slot reservations. + assert!(self.response_memory_reservations <= self.reserved_slots()); + + true + } + + /// Returns an iterator over the underlying messages. + /// + /// For testing purposes only. + pub(super) fn iter_for_testing<'a>( + &'a self, + pool: &'a MessagePool, + ) -> impl Iterator + 'a { + self.queue + .iter() + .filter_map(|reference| pool.get(reference.id()).cloned()) + } +} /// Trait for queue items in `InputQueue` and `OutputQueue`. Such items must /// either be a response or a request (including timed out requests). @@ -64,12 +363,12 @@ impl QueueItem> for Option { /// Requests are handled in a straightforward manner: pushing a request onto the /// queue succeeds if there are available request slots, fails if there aren't. /// -/// Response slots are used by either actual responses or by reservations for -/// expected responses. Since an (incoming or outgoing) response always results -/// from an (outgoing or, respectively, incoming) request, it is required to -/// first make a reservation for a response; and later push the response into -/// the reserved slot, consuming the reservation. Attempting to push a response -/// with no reservations available will produce an error. +/// Response slots are either used by responses or reserved for expected +/// responses. Since an (incoming or outgoing) response always results from an +/// (outgoing or, respectively, incoming) request, it is required to first +/// reserve a slot for a response; and later push the response into the reserved +/// slot, consuming the slot reservation. Attempting to push a response with no +/// reserved slot available will produce an error. #[derive(Clone, Debug, PartialEq, Eq, Hash)] struct QueueWithReservation + std::clone::Clone> { /// A FIFO queue of all requests and responses. Since responses may be enqueued @@ -363,7 +662,7 @@ impl InputQueue { } } - pub fn peek(&self) -> Option<&RequestOrResponse> { + pub(super) fn peek(&self) -> Option<&RequestOrResponse> { self.queue.peek() } @@ -647,7 +946,7 @@ impl OutputQueue { } /// Number of actual messages in the queue (`None` are ignored). - pub fn num_messages(&self) -> usize { + pub(super) fn num_messages(&self) -> usize { self.num_messages } @@ -699,7 +998,7 @@ impl OutputQueue { /// Returns an iterator over the underlying messages. /// /// For testing purposes only. - pub fn iter_for_testing(&self) -> impl Iterator> { + pub(super) fn iter_for_testing(&self) -> impl Iterator> { self.queue.queue.iter() } } @@ -848,10 +1147,11 @@ pub(super) struct IngressQueue { size_bytes: usize, } -const PER_CANISTER_QUEUE_OVERHEAD_BYTES: usize = - size_of::>() + size_of::>>(); - impl IngressQueue { + /// The memoty overhead of a per-canister ingress queue, in bytes. + const PER_CANISTER_QUEUE_OVERHEAD_BYTES: usize = + size_of::>() + size_of::>>(); + /// Pushes a new ingress message to the back of the queue. pub(super) fn push(&mut self, msg: Ingress) { let msg_size = Self::ingress_size_bytes(&msg); @@ -859,7 +1159,7 @@ impl IngressQueue { if receiver_ingress_queue.is_empty() { self.schedule.push_back(msg.effective_canister_id); - self.size_bytes += PER_CANISTER_QUEUE_OVERHEAD_BYTES; + self.size_bytes += Self::PER_CANISTER_QUEUE_OVERHEAD_BYTES; } receiver_ingress_queue.push_back(Arc::new(msg)); @@ -884,7 +1184,7 @@ impl IngressQueue { self.schedule.push_back(canister_id); } else { self.queues.remove(&canister_id); - self.size_bytes -= PER_CANISTER_QUEUE_OVERHEAD_BYTES; + self.size_bytes -= Self::PER_CANISTER_QUEUE_OVERHEAD_BYTES; } let msg = res.unwrap(); @@ -958,7 +1258,7 @@ impl IngressQueue { let canister_ingress_queue = self.queues.get(canister_id).unwrap(); if canister_ingress_queue.is_empty() { self.queues.remove(canister_id); - self.size_bytes -= PER_CANISTER_QUEUE_OVERHEAD_BYTES; + self.size_bytes -= Self::PER_CANISTER_QUEUE_OVERHEAD_BYTES; false } else { true @@ -986,7 +1286,7 @@ impl IngressQueue { .iter() .map(|i| Self::ingress_size_bytes(i)) .sum::() - + PER_CANISTER_QUEUE_OVERHEAD_BYTES; + + Self::PER_CANISTER_QUEUE_OVERHEAD_BYTES; } size } diff --git a/rs/replicated_state/src/canister_state/queues/queue/tests.rs b/rs/replicated_state/src/canister_state/queues/queue/tests.rs index 3e0f90bd97c..f9fdbb8efd5 100644 --- a/rs/replicated_state/src/canister_state/queues/queue/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/queue/tests.rs @@ -1,12 +1,353 @@ +use super::super::message_pool::tests::*; use super::*; -use ic_test_utilities_types::{ - arbitrary, - ids::{canister_test_id, message_test_id, user_test_id}, - messages::{IngressBuilder, RequestBuilder, ResponseBuilder}, -}; -use ic_types::{messages::RequestOrResponse, time::UNIX_EPOCH, Time}; +use assert_matches::assert_matches; +use ic_test_utilities_types::arbitrary; +use ic_test_utilities_types::ids::{canister_test_id, message_test_id, user_test_id}; +use ic_test_utilities_types::messages::{IngressBuilder, RequestBuilder, ResponseBuilder}; +use ic_types::messages::{CallbackId, RequestOrResponse}; +use ic_types::time::{CoarseTime, UNIX_EPOCH}; +use ic_types::Time; use proptest::prelude::*; +#[test] +fn canister_queue_constructor_test() { + const CAPACITY: usize = 14; + let mut queue = CanisterQueue::new(CAPACITY); + + assert_eq!(0, queue.len()); + assert!(!queue.has_used_slots()); + assert_eq!(CAPACITY, queue.capacity); + assert_eq!(CAPACITY, queue.available_request_slots()); + assert_eq!(CAPACITY, queue.available_response_slots()); + assert_eq!(Ok(()), queue.check_has_request_slot()); + assert_eq!(0, queue.reserved_slots()); + assert_matches!( + queue.check_has_reserved_response_slot(Class::BestEffort), + Err(StateError::InvariantBroken(_)) + ); + assert_matches!( + queue.check_has_reserved_response_slot(Class::GuaranteedResponse), + Err(StateError::InvariantBroken(_)) + ); + assert_eq!(queue.peek(), None); + assert_eq!(queue.pop(), None); +} + +// Pushing a request succeeds if there is space. +#[test] +fn canister_queue_push_request_succeeds() { + const CAPACITY: usize = 1; + let mut queue = CanisterQueue::new(CAPACITY); + + let id = new_request_message_id(13, Class::BestEffort); + queue.push_request(id); + + assert_eq!(1, queue.len()); + assert!(queue.has_used_slots()); + assert_eq!(CAPACITY - 1, queue.available_request_slots()); + assert_eq!( + Err(StateError::QueueFull { capacity: CAPACITY }), + queue.check_has_request_slot() + ); + assert_eq!(CAPACITY, queue.available_response_slots()); + assert_eq!(0, queue.reserved_slots()); + assert_matches!( + queue.check_has_reserved_response_slot(Class::BestEffort), + Err(StateError::InvariantBroken(_)) + ); + assert_eq!(0, queue.response_memory_reservations()); + + // Peek, then pop the request. + assert_eq!(Some(&MessageReference::Request(id)), queue.peek()); + assert_eq!(Some(MessageReference::Request(id)), queue.pop()); + + assert_eq!(0, queue.len()); + assert!(!queue.has_used_slots()); + assert_eq!(CAPACITY, queue.available_request_slots()); + assert_eq!(Ok(()), queue.check_has_request_slot()); + assert_eq!(CAPACITY, queue.available_response_slots()); + assert_eq!(0, queue.reserved_slots()); + assert_matches!( + queue.check_has_reserved_response_slot(Class::BestEffort), + Err(StateError::InvariantBroken(_)) + ); + assert_eq!(0, queue.response_memory_reservations()); +} + +// Reserving a slot, then pushing a response succeeds if there is space. +#[test] +fn canister_queue_push_response_succeeds() { + use Class::*; + + const CAPACITY: usize = 1; + let mut queue = CanisterQueue::new(CAPACITY); + + // Reserve a slot. + queue + .try_reserve_response_slot(&make_request(13, GuaranteedResponse)) + .unwrap(); + + assert_eq!(0, queue.len()); + assert!(queue.has_used_slots()); + assert_eq!(CAPACITY, queue.available_request_slots()); + assert_eq!(Ok(()), queue.check_has_request_slot()); + assert_eq!(CAPACITY - 1, queue.available_response_slots()); + assert_eq!(1, queue.reserved_slots()); + assert_eq!( + Ok(()), + queue.check_has_reserved_response_slot(GuaranteedResponse) + ); + assert_eq!(1, queue.response_memory_reservations()); + + // Push response into reseerved slot. + let id = new_response_message_id(13, GuaranteedResponse); + queue.push_response(id); + + assert_eq!(1, queue.len()); + assert!(queue.has_used_slots()); + assert_eq!(CAPACITY, queue.available_request_slots()); + assert_eq!(Ok(()), queue.check_has_request_slot()); + assert_eq!(CAPACITY - 1, queue.available_response_slots()); + assert_eq!(0, queue.reserved_slots()); + assert_matches!( + queue.check_has_reserved_response_slot(BestEffort), + Err(StateError::InvariantBroken(_)) + ); + assert_eq!(0, queue.response_memory_reservations()); + + // Peek, then pop the response reference. + assert_eq!(Some(&MessageReference::Response(id)), queue.peek()); + assert_eq!(Some(MessageReference::Response(id)), queue.pop()); + + assert_eq!(0, queue.len()); + assert!(!queue.has_used_slots()); + assert_eq!(CAPACITY, queue.available_request_slots()); + assert_eq!(Ok(()), queue.check_has_request_slot()); + assert_eq!(CAPACITY, queue.available_response_slots()); + assert_eq!(0, queue.reserved_slots()); + assert_matches!( + queue.check_has_reserved_response_slot(BestEffort), + Err(StateError::InvariantBroken(_)) + ); + assert_eq!(0, queue.response_memory_reservations()); +} + +/// Test that overfilling an output queue with requests results in failed +/// pushes; also verifies that pushes below capacity succeed. +#[test] +#[should_panic(expected = "assertion failed: self.request_slots < self.capacity")] +fn canister_queue_push_request_to_full_queue_fails() { + // First fill up the queue. + const CAPACITY: usize = 2; + let mut queue = CanisterQueue::new(CAPACITY); + for i in 0..CAPACITY { + queue.push_request(new_request_message_id(i as u64, Class::BestEffort)); + } + + assert_eq!(CAPACITY, queue.len()); + assert!(queue.has_used_slots()); + assert_eq!(0, queue.available_request_slots()); + assert_eq!( + Err(StateError::QueueFull { capacity: CAPACITY }), + queue.check_has_request_slot() + ); + assert_eq!(CAPACITY, queue.available_response_slots()); + assert_eq!(0, queue.reserved_slots()); + assert_matches!( + queue.check_has_reserved_response_slot(Class::BestEffort), + Err(StateError::InvariantBroken(_)) + ); + assert_eq!(0, queue.response_memory_reservations()); + + queue.push_request(new_request_message_id(13, Class::BestEffort)); +} + +/// Test that overfilling an output queue with slot reservations results in +/// failed slot reservations; also verifies that slot reservations below +/// capacity succeed. +#[test] +fn canister_queue_try_reserve_response_slot_in_full_queue_fails() { + use Class::*; + + const CAPACITY: usize = 2; + let mut queue = CanisterQueue::new(CAPACITY); + + // Reserve all response slots. + for i in 0..CAPACITY { + let class = if i % 2 == 0 { + BestEffort + } else { + GuaranteedResponse + }; + queue + .try_reserve_response_slot(&make_request(i as u64, class)) + .unwrap(); + } + + assert_eq!(0, queue.len()); + assert!(queue.has_used_slots()); + assert_eq!(CAPACITY, queue.available_request_slots()); + assert_eq!(Ok(()), queue.check_has_request_slot()); + assert_eq!(0, queue.available_response_slots()); + assert_eq!(CAPACITY, queue.reserved_slots()); + assert_eq!(Ok(()), queue.check_has_reserved_response_slot(BestEffort)); + assert_eq!(CAPACITY / 2, queue.response_memory_reservations()); + + // Trying to reserve a slot fails. + assert_eq!( + Err(StateError::QueueFull { capacity: CAPACITY }), + queue.try_reserve_response_slot(&make_request(13, BestEffort)) + ); + + // Fill the queue with responses. + for i in 0..CAPACITY { + let class = if i % 2 == 0 { + BestEffort + } else { + GuaranteedResponse + }; + queue.push_response(new_response_message_id(i as u64, class)); + } + + assert_eq!(2, queue.len()); + assert!(queue.has_used_slots()); + assert_eq!(CAPACITY, queue.available_request_slots()); + assert_eq!(Ok(()), queue.check_has_request_slot()); + assert_eq!(0, queue.available_response_slots()); + assert_eq!(0, queue.reserved_slots()); + assert_matches!( + queue.check_has_reserved_response_slot(BestEffort), + Err(StateError::InvariantBroken(_)) + ); + assert_eq!(0, queue.response_memory_reservations()); + + // Trying to reserve a slot still fails. + assert_eq!( + Err(StateError::QueueFull { capacity: CAPACITY }), + queue.try_reserve_response_slot(&make_request(13, BestEffort)) + ); +} + +/// Test that a queue can be filled with both requests and responses at the +/// same time. +#[test] +fn canister_queue_full_duplex() { + // First fill up the queue. + const CAPACITY: usize = 2; + let mut queue = CanisterQueue::new(CAPACITY); + for i in 0..CAPACITY as u64 { + queue.push_request(new_request_message_id(i * 2, Class::BestEffort)); + queue + .try_reserve_response_slot(&make_request(i, Class::BestEffort)) + .unwrap(); + queue.push_response(new_response_message_id(i * 2 + 1, Class::BestEffort)); + } + + assert_eq!(2 * CAPACITY, queue.len()); + assert!(queue.has_used_slots()); + assert_eq!(0, queue.available_request_slots()); + assert_eq!( + Err(StateError::QueueFull { capacity: CAPACITY }), + queue.check_has_request_slot() + ); + assert_eq!(0, queue.available_response_slots()); + assert_eq!( + Err(StateError::QueueFull { capacity: CAPACITY }), + queue.try_reserve_response_slot(&make_request(13, Class::BestEffort)), + ); +} + +#[test] +#[should_panic(expected = "InvariantBroken(\"No reserved response slot\")")] +fn canister_queue_push_without_reserved_slot_panics() { + let mut queue = CanisterQueue::new(10); + queue.push_response(new_response_message_id(13, Class::BestEffort)); +} + +#[test] +#[should_panic(expected = "InvariantBroken(\"No guaranteed response memory reservation\")")] +fn canister_queue_push_without_memory_reservation_panics() { + let mut queue = CanisterQueue::new(10); + // Reserve a best-effort slot. + queue + .try_reserve_response_slot(&make_request(1, Class::BestEffort)) + .unwrap(); + // Push a guaranteed response. + queue.push_response(new_response_message_id(13, Class::GuaranteedResponse)); +} + +#[test] +#[should_panic( + expected = "assertion failed: self.response_memory_reservations <= self.reserved_slots()" +)] +fn canister_queue_push_without_consuming_memory_reservation_panics() { + let mut queue = CanisterQueue::new(10); + // Reserve a guaranteed response slot. + queue + .try_reserve_response_slot(&make_request(1, Class::GuaranteedResponse)) + .unwrap(); + // Push a best-effort response. + queue.push_response(new_response_message_id(13, Class::BestEffort)); +} + +/// Generator for an arbitrary `MessageReference`. +fn arbitrary_message_reference() -> impl Strategy { + prop_oneof![ + 1 => any::().prop_map(|gen| MessageReference::Request(new_request_message_id(gen, Class::GuaranteedResponse))), + 1 => any::().prop_map(|gen| MessageReference::Request(new_request_message_id(gen, Class::BestEffort))), + 1 => any::().prop_map(|gen| MessageReference::Response(new_response_message_id(gen, Class::GuaranteedResponse))), + 1 => any::().prop_map(|gen| MessageReference::Response(new_response_message_id(gen, Class::BestEffort))), + ] +} + +proptest! { + #[test] + fn canister_queue_push_and_pop( + mut references in proptest::collection::vec_deque( + arbitrary_message_reference(), + 10..20, + ) + ) { + // Create a queue with large enough capacity. + let mut queue = CanisterQueue::new(20); + + // Push all references onto the queue. + for reference in references.iter() { + match reference { + MessageReference::Request(id) => { + queue.push_request(*id); + } + MessageReference::Response(id) => { + queue.try_reserve_response_slot(&make_request(13, id.class())).unwrap(); + queue.push_response(*id); + } + } + prop_assert!(queue.check_invariants()); + } + + // Check the contents of the queue via `peek` and `pop`. + while let Some(r) = queue.peek() { + let reference = references.pop_front(); + prop_assert_eq!(reference.as_ref(), Some(r)); + prop_assert_eq!(reference, queue.pop()); + } + + // All references should have been consumed. + prop_assert!(references.is_empty()); + } +} + +fn make_request(callback_id: u64, class: Class) -> Request { + RequestBuilder::default() + .sender_reply_callback(CallbackId::from(callback_id)) + .deadline(if class == Class::BestEffort { + CoarseTime::from_secs_since_unix_epoch(1) + } else { + NO_DEADLINE + }) + .build() +} + #[test] fn input_queue_constructor_test() { let capacity: usize = 14; @@ -409,7 +750,6 @@ prop_compose! { fn arb_output_queue() ( (time, num_pop, mut q) in arb_output_queue_no_timeout(5..=20) .prop_flat_map(|q| (arb_time_for_output_queue_timeouts(&q), 0..3_usize, Just(q))) - ) -> OutputQueue { q.time_out_requests(time).count(); q.check_invariants(); @@ -658,7 +998,7 @@ fn output_queue_decode_check_num_requests_and_responses() { ) .is_ok()); - // Check we get an error with one more request. + // Check that we get an error with one more request. queue.push_back(Some(RequestOrResponse::Request( RequestBuilder::default().build().into(), ))); @@ -671,7 +1011,7 @@ fn output_queue_decode_check_num_requests_and_responses() { .is_err()); queue.pop_back(); - // Check we get an error with one more `None`. + // Check that we get an error with one more `None`. queue.push_back(None); assert!(output_queue_roundtrip_from_vec_deque( queue.clone(), @@ -682,7 +1022,7 @@ fn output_queue_decode_check_num_requests_and_responses() { .is_err()); queue.pop_back(); - // Check we get an error with one more response. + // Check that we get an error with one more response. queue.push_back(Some(RequestOrResponse::Response( ResponseBuilder::default().build().into(), ))); @@ -695,7 +1035,7 @@ fn output_queue_decode_check_num_requests_and_responses() { .is_err()); queue.pop_back(); - // Check we get an error with one more reservation. + // Check that we get an error with one more slot reservation. assert!(output_queue_roundtrip_from_vec_deque( queue.clone(), num_request_slots, diff --git a/rs/types/types/src/funds/cycles.rs b/rs/types/types/src/funds/cycles.rs index 8a77b16259b..7780235db89 100644 --- a/rs/types/types/src/funds/cycles.rs +++ b/rs/types/types/src/funds/cycles.rs @@ -5,6 +5,7 @@ use ic_protobuf::state::canister_state_bits::v1::CyclesAccount as pbCyclesAccoun use ic_protobuf::state::queues::v1::Cycles as PbCycles; use serde::{Deserialize, Serialize}; use std::convert::TryInto; +use std::iter::Sum; use std::{ fmt, ops::{Add, AddAssign, Div, Mul, Sub, SubAssign}, @@ -189,6 +190,12 @@ impl Div for Cycles { } } +impl Sum for Cycles { + fn sum>(iter: I) -> Self { + iter.fold(Cycles::zero(), Cycles::add) + } +} + impl fmt::Display for Cycles { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0.separate_with_underscores())