Skip to content

Commit dce2a8f

Browse files
committed
Revert "Update shared-sequencer/movement-sequencer to match main"
This reverts commit 66a2072.
1 parent 511ec04 commit dce2a8f

File tree

7 files changed

+384
-73
lines changed

7 files changed

+384
-73
lines changed

shared-sequencer/movement-sequencer/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ serde_json = "1.0.113" # https://github.com/serde-rs/json/releases
2929
serde_with = { version = "3.6.1", features = ["hex"] }
3030
tokio = { version = "1.36.0", features = ["fs", "rt-multi-thread"] }
3131
tonic = { version = "0.11.0", features = ["gzip"] }
32+
tempfile = "3.2.0"
3233
futures = "0.3.30"
3334

3435
# new deps

shared-sequencer/movement-sequencer/src/block/mod.rs

+40-1
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,33 @@ use serde_with::serde_as;
2121

2222
#[serde_as]
2323
#[derive(Serialize, Deserialize, Clone, Derivative, Default)]
24-
#[derivative(Debug, PartialEq, Eq)]
24+
#[derivative(Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
2525
pub struct Transaction {
26+
// the id of the rollup chain that the transaction is for
2627
#[serde_as(as = "Hex0xBytes")]
2728
pub consumer_id : Vec<u8>,
29+
// the data of the transaction
2830
#[serde_as(as = "Hex0xBytes")]
2931
pub data : Vec<u8>,
3032
}
3133

3234
impl Transaction {
3335

36+
pub fn new(consumer_id: Vec<u8>, data: Vec<u8>) -> Self {
37+
Self {
38+
consumer_id,
39+
data,
40+
}
41+
}
42+
43+
/// Creates a transaction for testing
44+
pub fn test() -> Self {
45+
Self {
46+
consumer_id: vec![0, 1, 2, 3, 4],
47+
data: vec![5, 6, 7, 8, 9],
48+
}
49+
}
50+
3451
#[must_use]
3552
pub fn id(&self) -> ids::Id {
3653
let data_and_consumer_id = [self.data.clone(), self.consumer_id.clone()].concat();
@@ -72,6 +89,28 @@ pub struct Block {
7289

7390

7491
impl Block {
92+
93+
/// Creates a transaction for testing
94+
pub fn test() -> Self {
95+
let consumer_id = vec![0, 1, 2, 3, 4];
96+
let data = vec![5, 6, 7, 8, 9];
97+
let transactions = vec![Transaction::new(consumer_id, data)];
98+
let parent_id = ids::Id::empty();
99+
let height = 0;
100+
let timestamp = Utc::now().timestamp() as u64;
101+
let status = choices::status::Status::default();
102+
Self {
103+
parent_id,
104+
height,
105+
timestamp,
106+
transactions,
107+
status,
108+
bytes: vec![],
109+
id: ids::Id::empty(),
110+
state: state::State::default(),
111+
}
112+
}
113+
75114
/// Can fail if the block can't be serialized to JSON.
76115
/// # Errors
77116
/// Will fail if the block can't be serialized to JSON.

shared-sequencer/movement-sequencer/src/mempool/mod.rs

+144-13
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,84 @@
1+
use serde::{Deserialize, Serialize};
2+
13
use crate::block::{
24
Transaction,
35
Block
46
};
7+
use std::cmp::Ordering;
58
pub mod rocksdb;
69

710
#[tonic::async_trait]
811
pub trait MempoolTransactionOperations {
912

10-
/// Checks whether a transaction exists in the mempool.
11-
async fn has_transaction(&self, transaction_id : avalanche_types::ids::Id) -> Result<bool, anyhow::Error>;
13+
// todo: move mempool_transaction methods into separate trait
14+
15+
/// Checks whether a mempool transaction exists in the mempool.
16+
async fn has_mempool_transaction(&self, transaction_id : avalanche_types::ids::Id) -> Result<bool, anyhow::Error>;
17+
18+
/// Adds a mempool transaction to the mempool.
19+
async fn add_mempool_transaction(&self, tx: MempoolTransaction) -> Result<(), anyhow::Error>;
20+
21+
/// Removes a mempool transaction from the mempool.
22+
async fn remove_mempool_transaction(&self, transaction_id: avalanche_types::ids::Id) -> Result<(), anyhow::Error>;
23+
24+
/// Pops mempool transaction from the mempool.
25+
async fn pop_mempool_transaction(&self) -> Result<Option<MempoolTransaction>, anyhow::Error>;
26+
27+
/// Gets a mempool transaction from the mempool.
28+
async fn get_mempool_transaction(&self, transaction_id: avalanche_types::ids::Id) -> Result<Option<MempoolTransaction>, anyhow::Error>;
29+
30+
/// Pops the next n mempool transactions from the mempool.
31+
async fn pop_mempool_transactions(&self, n: usize) -> Result<Vec<MempoolTransaction>, anyhow::Error> {
32+
let mut mempool_transactions = Vec::new();
33+
for _ in 0..n {
34+
if let Some(mempool_transaction) = self.pop_mempool_transaction().await? {
35+
mempool_transactions.push(mempool_transaction);
36+
} else {
37+
break;
38+
}
39+
}
40+
Ok(mempool_transactions)
41+
}
42+
43+
/// Checks whether the mempool has the transaction.
44+
async fn has_transaction(&self, transaction_id: avalanche_types::ids::Id) -> Result<bool, anyhow::Error> {
45+
self.has_mempool_transaction(transaction_id).await
46+
}
1247

1348
/// Adds a transaction to the mempool.
14-
async fn add_transaction(&self, tx: Transaction) -> Result<(), anyhow::Error>;
49+
async fn add_transaction(&self, tx: Transaction) -> Result<(), anyhow::Error> {
50+
51+
if self.has_transaction(tx.id()).await? {
52+
return Ok(());
53+
}
54+
55+
let mempool_transaction = MempoolTransaction::slot_now(tx);
56+
self.add_mempool_transaction(mempool_transaction).await
57+
58+
}
1559

1660
/// Removes a transaction from the mempool.
17-
async fn remove_transaction(&self, transaction_id: avalanche_types::ids::Id) -> Result<(), anyhow::Error>;
61+
async fn remove_transaction(&self, transaction_id: avalanche_types::ids::Id) -> Result<(), anyhow::Error> {
62+
self.remove_mempool_transaction(transaction_id).await
63+
}
1864

1965
/// Pops transaction from the mempool.
20-
async fn pop_transaction(&self, transaction_id: avalanche_types::ids::Id) -> Result<Transaction, anyhow::Error>;
21-
66+
async fn pop_transaction(&self) -> Result<Option<Transaction>, anyhow::Error> {
67+
let mempool_transaction = self.pop_mempool_transaction().await?;
68+
Ok(mempool_transaction.map(|mempool_transaction| mempool_transaction.transaction))
69+
}
70+
2271
/// Gets a transaction from the mempool.
23-
async fn get_transaction(&self, transaction_id: avalanche_types::ids::Id) -> Result<Transaction, anyhow::Error>;
72+
async fn get_transaction(&self, transaction_id: avalanche_types::ids::Id) -> Result<Option<Transaction>, anyhow::Error> {
73+
let mempool_transaction = self.get_mempool_transaction(transaction_id).await?;
74+
Ok(mempool_transaction.map(|mempool_transaction| mempool_transaction.transaction))
75+
}
2476

25-
/// Provides well-ordered transaction iterable
26-
async fn iter(&self) -> Result<impl Iterator<Item = Transaction>, anyhow::Error>;
77+
/// Pops the next n transactions from the mempool.
78+
async fn pop_transactions(&self, n: usize) -> Result<Vec<Transaction>, anyhow::Error> {
79+
let mempool_transactions = self.pop_mempool_transactions(n).await?;
80+
Ok(mempool_transactions.into_iter().map(|mempool_transaction| mempool_transaction.transaction).collect())
81+
}
2782

2883
}
2984

@@ -38,15 +93,91 @@ pub trait MempoolBlockOperations {
3893

3994
/// Removes a block from the mempool.
4095
async fn remove_block(&self, block_id: avalanche_types::ids::Id) -> Result<(), anyhow::Error>;
41-
42-
/// Pops block from the mempool.
43-
async fn pop_block(&self, block_id: avalanche_types::ids::Id) -> Result<Block, anyhow::Error>;
4496

4597
/// Gets a block from the mempool.
46-
async fn get_block(&self, block_id: avalanche_types::ids::Id) -> Result<Block, anyhow::Error>;
98+
async fn get_block(&self, block_id: avalanche_types::ids::Id) -> Result<Option<Block>, anyhow::Error>;
99+
100+
}
47101

102+
/// Wraps a transaction with a timestamp for help ordering.
103+
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
104+
pub struct MempoolTransaction {
105+
pub transaction: Transaction,
106+
pub timestamp: u64,
107+
pub slot_seconds : u64
48108
}
49109

110+
impl PartialOrd for MempoolTransaction {
111+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
112+
Some(self.cmp(other))
113+
}
114+
}
115+
116+
/// Ordered first by slot_seconds, then by transaction.
117+
/// This allows us to use a BTreeSet to order transactions by slot_seconds, and then by transaction and pop them off in order.
118+
impl Ord for MempoolTransaction {
119+
fn cmp(&self, other: &Self) -> Ordering {
120+
// First, compare by slot_seconds
121+
match self.slot_seconds.cmp(&other.slot_seconds) {
122+
Ordering::Equal => {}
123+
non_equal => return non_equal,
124+
}
125+
// If slot_seconds are equal, then compare by transaction
126+
self.transaction.cmp(&other.transaction)
127+
}
128+
}
129+
130+
impl MempoolTransaction {
131+
132+
const SLOT_SECONDS : u64 = 2;
133+
134+
/// Creates a test MempoolTransaction.
135+
pub fn test() -> Self {
136+
Self {
137+
transaction: Transaction::test(),
138+
timestamp: 0,
139+
slot_seconds: Self::SLOT_SECONDS
140+
}
141+
}
142+
143+
pub fn at_time(transaction: Transaction, timestamp: u64) -> Self {
144+
let floor = (
145+
timestamp / Self::SLOT_SECONDS
146+
) * Self::SLOT_SECONDS;
147+
Self {
148+
transaction,
149+
timestamp : floor,
150+
slot_seconds: Self::SLOT_SECONDS
151+
}
152+
}
153+
154+
pub fn new(transaction: Transaction, timestamp: u64, slot_seconds: u64) -> Self {
155+
Self {
156+
transaction,
157+
timestamp,
158+
slot_seconds
159+
}
160+
}
161+
162+
/// Creates a new MempoolTransaction with the current timestamp floored to the nearest slot.
163+
/// todo: probably want to move this out to a factory.
164+
pub fn slot_now(transaction : Transaction) -> MempoolTransaction {
165+
166+
let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
167+
168+
Self::at_time(transaction, timestamp)
169+
170+
}
171+
172+
pub fn id(&self) -> avalanche_types::ids::Id {
173+
self.transaction.id()
174+
}
175+
176+
177+
}
178+
179+
180+
/// Combines RocksdbMempool with InMemoryMempool.
50181
#[derive(Debug, Clone)]
51182
pub struct Mempool {
52183

0 commit comments

Comments
 (0)