From 55cb38f57f9d54d1c6b628753dd9024553bdbabe Mon Sep 17 00:00:00 2001 From: Liam Monninger Date: Fri, 25 Oct 2024 10:06:02 -0700 Subject: [PATCH 1/7] feat: application priority. --- package-lock.json | 2 +- protocol-units/mempool/move-rocks/src/lib.rs | 80 ++++++++++++++++--- protocol-units/mempool/util/src/lib.rs | 14 +++- .../sequencing/memseq/sequencer/src/lib.rs | 16 ++-- util/movement-types/src/transaction.rs | 36 +++++++-- 5 files changed, 119 insertions(+), 29 deletions(-) diff --git a/package-lock.json b/package-lock.json index 915069eb0..eb8637b2b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,5 +1,5 @@ { - "name": "movement", + "name": "0", "lockfileVersion": 3, "requires": true, "packages": { diff --git a/protocol-units/mempool/move-rocks/src/lib.rs b/protocol-units/mempool/move-rocks/src/lib.rs index c9f267a29..b649f62b7 100644 --- a/protocol-units/mempool/move-rocks/src/lib.rs +++ b/protocol-units/mempool/move-rocks/src/lib.rs @@ -22,10 +22,11 @@ pub struct RocksdbMempool { fn construct_mempool_transaction_key(transaction: &MempoolTransaction) -> String { // Pre-allocate a string with the required capacity - let mut key = String::with_capacity(32 + 1 + 32 + 1 + 32); + let mut key = String::with_capacity(32 + 1 + 32 + 1 + 32 + 1 + 32); // Write key components. The numbers are zero-padded to 32 characters. key.write_fmt(format_args!( - "{:032}:{:032}:{}", + "{:032}:{:032}:{:032}:{}", + transaction.transaction.application_priority(), transaction.timestamp, transaction.transaction.sequence_number(), transaction.transaction.id(), @@ -469,14 +470,14 @@ pub mod tests { let path = temp_dir.path().to_str().unwrap(); let mempool = RocksdbMempool::try_new(path)?; - let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0), 2); + let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 2); let transaction1_id = transaction1.id(); mempool.add_mempool_transaction(transaction1).await?; assert!(mempool.has_transaction(transaction1_id).await?); sleep(Duration::from_secs(2)).await; - let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0), 64); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0, 0), 64); let transaction2_id = transaction2.id(); let transaction2_timestamp = transaction2.timestamp; mempool.add_mempool_transaction(transaction2).await?; @@ -495,9 +496,9 @@ pub mod tests { let path = temp_dir.path().to_str().unwrap(); let mempool = RocksdbMempool::try_new(path)?; - let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0), 2); - let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0), 64); - let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 0), 128); + let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 2); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0, 0), 64); + let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 0, 0), 128); mempool.add_mempool_transaction(transaction2.clone()).await?; mempool.add_mempool_transaction(transaction1.clone()).await?; @@ -517,9 +518,9 @@ pub mod tests { let path = temp_dir.path().to_str().unwrap(); let mempool = RocksdbMempool::try_new(path)?; - let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0), 2); - let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 1), 2); - let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 0), 64); + let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 2); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0, 1), 2); + let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 0, 0), 64); mempool.add_mempool_transaction(transaction2.clone()).await?; mempool.add_mempool_transaction(transaction1.clone()).await?; @@ -539,9 +540,9 @@ pub mod tests { let path = temp_dir.path().to_str().unwrap(); let mempool = RocksdbMempool::try_new(path)?; - let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0), 0); - let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 1), 0); - let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 2), 0); + let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 0); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0, 1), 0); + let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 0, 2), 0); mempool.add_mempool_transaction(transaction2.clone()).await?; mempool.add_mempool_transaction(transaction1.clone()).await?; @@ -554,4 +555,57 @@ pub mod tests { Ok(()) } + + #[tokio::test] + async fn test_application_priority_based_ordering() -> Result<(), Error> { + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().to_str().unwrap(); + let mempool = RocksdbMempool::try_new(path)?; + + let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 0); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 1, 0), 0); + let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 2, 0), 0); + + mempool.add_mempool_transaction(transaction2.clone()).await?; + mempool.add_mempool_transaction(transaction1.clone()).await?; + mempool.add_mempool_transaction(transaction3.clone()).await?; + + let transactions = mempool.pop_mempool_transactions(3).await?; + assert_eq!(transactions[0], transaction1); + assert_eq!(transactions[1], transaction2); + assert_eq!(transactions[2], transaction3); + + Ok(()) + } + + #[tokio::test] + async fn test_total_ordering() -> Result<(), Error> { + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().to_str().unwrap(); + let mempool = RocksdbMempool::try_new(path)?; + + let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 0); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 1), 0); + let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 1), 1); + let transaction4 = MempoolTransaction::at_time(Transaction::new(vec![2], 0, 1), 1); + let transaction5 = MempoolTransaction::at_time(Transaction::new(vec![1], 1, 1), 1); + let transaction6 = MempoolTransaction::at_time(Transaction::new(vec![1], 1, 2), 1); + + mempool.add_mempool_transaction(transaction2.clone()).await?; + mempool.add_mempool_transaction(transaction1.clone()).await?; + mempool.add_mempool_transaction(transaction3.clone()).await?; + mempool.add_mempool_transaction(transaction5.clone()).await?; + mempool.add_mempool_transaction(transaction4.clone()).await?; + mempool.add_mempool_transaction(transaction6.clone()).await?; + + let transactions = mempool.pop_mempool_transactions(6).await?; + assert_eq!(transactions[0], transaction1); + assert_eq!(transactions[1], transaction2); + assert_eq!(transactions[2], transaction3); + assert_eq!(transactions[3], transaction4); + assert_eq!(transactions[4], transaction5); + assert_eq!(transactions[5], transaction6); + + Ok(()) + } } diff --git a/protocol-units/mempool/util/src/lib.rs b/protocol-units/mempool/util/src/lib.rs index a633f62c3..350e1e31f 100644 --- a/protocol-units/mempool/util/src/lib.rs +++ b/protocol-units/mempool/util/src/lib.rs @@ -76,6 +76,7 @@ pub trait MempoolTransactionOperations { self.has_mempool_transaction(transaction_id).await } + /// Adds transactions to the mempool. async fn add_transactions(&self, transactions: Vec) -> Result<(), anyhow::Error> { let mempool_transactions = transactions.into_iter().map(MempoolTransaction::slot_now).collect(); @@ -158,7 +159,18 @@ impl PartialOrd for MempoolTransaction { /// This allows us to use a BTreeSet to order transactions by slot_seconds, and then by transaction and pop them off in order. impl Ord for MempoolTransaction { fn cmp(&self, other: &Self) -> Ordering { - // First, compare by timestamps + // First, compare the application priority + // Note: this also happens again in the inner transaction comparison, but the priority should come first both in the [MempoolTransaction] and in the [Transaction] by itself. + match self + .transaction + .application_priority() + .cmp(&other.transaction.application_priority()) + { + Ordering::Equal => {} + non_equal => return non_equal, + } + + // Then, compare by timestamps match self.timestamp.cmp(&other.timestamp) { Ordering::Equal => {} non_equal => return non_equal, diff --git a/protocol-units/sequencing/memseq/sequencer/src/lib.rs b/protocol-units/sequencing/memseq/sequencer/src/lib.rs index 0dd351432..89ced0b6e 100644 --- a/protocol-units/sequencing/memseq/sequencer/src/lib.rs +++ b/protocol-units/sequencing/memseq/sequencer/src/lib.rs @@ -156,7 +156,7 @@ pub mod test { // Add some transactions for i in 0..5 { - let transaction = Transaction::new(vec![i as u8], 0); + let transaction = Transaction::new(vec![i as u8], 0, 0); memseq.publish(transaction).await?; } @@ -177,7 +177,7 @@ pub mod test { let parent_block = Arc::new(RwLock::new(block::Id::default())); let memseq = Memseq::new(mempool, 10, parent_block, 1000); - let transaction = Transaction::new(vec![1, 2, 3], 0); + let transaction = Transaction::new(vec![1, 2, 3], 0, 0); let result = memseq.publish(transaction).await; assert!(result.is_err()); assert_eq!(result.unwrap_err().to_string(), "Mock add_transaction"); @@ -200,7 +200,7 @@ pub mod test { for i in 0..100 { let memseq_clone = Arc::clone(&memseq); let handle = tokio::spawn(async move { - let transaction = Transaction::new(vec![i as u8], 0); + let transaction = Transaction::new(vec![i as u8], 0, 0); memseq_clone.publish(transaction).await.unwrap(); }); handles.push(handle); @@ -225,7 +225,7 @@ pub mod test { let memseq_clone = Arc::clone(&memseq); let handle = async move { for n in 0..10 { - let transaction = Transaction::new(vec![i * 10 + n as u8], 0); + let transaction = Transaction::new(vec![i * 10 + n as u8], 0, 0); memseq_clone.publish(transaction).await?; } Ok::<_, anyhow::Error>(()) @@ -324,7 +324,7 @@ pub mod test { let path = dir.path().to_path_buf(); let memseq = Memseq::try_move_rocks(path, 128, 250)?; - let transaction: Transaction = Transaction::new(vec![1, 2, 3], 0); + let transaction: Transaction = Transaction::new(vec![1, 2, 3], 0, 0); memseq.publish(transaction.clone()).await?; let block = memseq.wait_for_next_block().await?; @@ -348,7 +348,7 @@ pub mod test { let mut transactions = Vec::new(); for i in 0..block_size * 2 { - let transaction: Transaction = Transaction::new(vec![i as u8], 0); + let transaction: Transaction = Transaction::new(vec![i as u8], 0, 0); memseq.publish(transaction.clone()).await?; transactions.push(transaction); } @@ -389,7 +389,7 @@ pub mod test { // add half of the transactions for i in 0..block_size / 2 { - let transaction: Transaction = Transaction::new(vec![i as u8], 0); + let transaction: Transaction = Transaction::new(vec![i as u8], 0, 0); memseq.publish(transaction.clone()).await?; } @@ -397,7 +397,7 @@ pub mod test { // add the rest of the transactions for i in block_size / 2..block_size - 2 { - let transaction: Transaction = Transaction::new(vec![i as u8], 0); + let transaction: Transaction = Transaction::new(vec![i as u8], 0, 0); memseq.publish(transaction.clone()).await?; } diff --git a/util/movement-types/src/transaction.rs b/util/movement-types/src/transaction.rs index 15f6d1119..3b421568a 100644 --- a/util/movement-types/src/transaction.rs +++ b/util/movement-types/src/transaction.rs @@ -43,17 +43,19 @@ impl fmt::Display for Id { #[derive(Serialize, Deserialize, Clone, Default, Debug, PartialEq, Eq, Hash)] pub struct Transaction { data: Vec, + // Application priority is stored low to high, i.e., 0 is the highest priority. + application_priority: u64, sequence_number: u64, id: Id, } impl Transaction { - pub fn new(data: Vec, sequence_number: u64) -> Self { + pub fn new(data: Vec, application_priority: u64, sequence_number: u64) -> Self { let mut hasher = blake3::Hasher::new(); hasher.update(&data); hasher.update(&sequence_number.to_le_bytes()); let id = Id(hasher.finalize().into()); - Self { data, sequence_number, id } + Self { data, sequence_number, application_priority, id } } pub fn id(&self) -> Id { @@ -64,18 +66,31 @@ impl Transaction { &self.data } + /// Returns the application priority of the transaction. + /// The lower the value, the higher the priority. + /// If you are using a high value, high priority scheme, simply subtract the priority from the maximum value. + pub fn application_priority(&self) -> u64 { + self.application_priority + } + pub fn sequence_number(&self) -> u64 { self.sequence_number } pub fn test() -> Self { - Self::new(vec![0], 0) + Self::new(vec![0], 0, 0) } } impl Ord for Transaction { fn cmp(&self, other: &Self) -> Ordering { - // First, compare by sequence_number + // First, compare by application_priority + match self.application_priority.cmp(&other.application_priority) { + Ordering::Equal => {} + non_equal => return non_equal, + } + + // Then compare by sequence number match self.sequence_number().cmp(&other.sequence_number()) { Ordering::Equal => {} non_equal => return non_equal, @@ -98,9 +113,18 @@ mod test { #[test] fn test_transaction_ordering() { + // priority based ordering + let transaction = Transaction::new(vec![1], 0, 0); + let transaction2 = Transaction::new(vec![1], 1, 0); + let transaction3 = Transaction::new(vec![1], 2, 0); + + assert!(transaction < transaction2); + assert!(transaction2 < transaction3); + + // sequencer number based ordering let transaction = Transaction::test(); - let transaction2 = Transaction::new(vec![1], 1); - let transaction3 = Transaction::new(vec![1], 2); + let transaction2 = Transaction::new(vec![1], 0, 1); + let transaction3 = Transaction::new(vec![1], 0, 2); assert!(transaction < transaction2); assert!(transaction2 < transaction3); From d63561e3de782b343e56c9a4906816a3bcf023cb Mon Sep 17 00:00:00 2001 From: Liam Monninger Date: Fri, 25 Oct 2024 12:57:38 -0700 Subject: [PATCH 2/7] feat: application priority. --- .../src/tasks/transaction_ingress.rs | 7 ++++--- protocol-units/execution/dof/src/lib.rs | 2 +- protocol-units/execution/dof/src/v1.rs | 8 ++++---- .../opt-executor/src/executor/initialization.rs | 2 +- .../execution/opt-executor/src/service.rs | 2 +- .../opt-executor/src/transaction_pipe.rs | 17 +++++++++-------- 6 files changed, 20 insertions(+), 18 deletions(-) diff --git a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs index a02108f53..4fbecbd6f 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs @@ -14,14 +14,14 @@ use std::time::{Duration, Instant}; const LOGGING_UID: AtomicU64 = AtomicU64::new(0); pub struct Task { - transaction_receiver: mpsc::Receiver, + transaction_receiver: mpsc::Receiver<(u64, SignedTransaction)>, da_light_node_client: LightNodeServiceClient, da_light_node_config: LightNodeConfig, } impl Task { pub(crate) fn new( - transaction_receiver: mpsc::Receiver, + transaction_receiver: mpsc::Receiver<(u64, SignedTransaction)>, da_light_node_client: LightNodeServiceClient, da_light_node_config: LightNodeConfig, ) -> Self { @@ -63,7 +63,7 @@ impl Task { .await { Ok(transaction) => match transaction { - Some(transaction) => { + Some((application_priority, transaction)) => { info!( target : "movement_timing", batch_id = %batch_id, @@ -75,6 +75,7 @@ impl Task { let serialized_aptos_transaction = serde_json::to_vec(&transaction)?; let movement_transaction = movement_types::transaction::Transaction::new( serialized_aptos_transaction, + application_priority, transaction.sequence_number(), ); let serialized_transaction = serde_json::to_vec(&movement_transaction)?; diff --git a/protocol-units/execution/dof/src/lib.rs b/protocol-units/execution/dof/src/lib.rs index a1beebf16..5710ca011 100644 --- a/protocol-units/execution/dof/src/lib.rs +++ b/protocol-units/execution/dof/src/lib.rs @@ -26,7 +26,7 @@ pub trait DynOptFinExecutor { /// Initialize the background task responsible for transaction processing. fn background( &self, - transaction_sender: Sender, + transaction_sender: Sender<(u64, SignedTransaction)>, config: &Config, ) -> Result< (Self::Context, impl Future> + Send + 'static), diff --git a/protocol-units/execution/dof/src/v1.rs b/protocol-units/execution/dof/src/v1.rs index 21544ac10..58fec6396 100644 --- a/protocol-units/execution/dof/src/v1.rs +++ b/protocol-units/execution/dof/src/v1.rs @@ -52,7 +52,7 @@ impl DynOptFinExecutor for Executor { fn background( &self, - transaction_sender: Sender, + transaction_sender: Sender<(u64, SignedTransaction)>, config: &Config, ) -> Result< (Context, impl Future> + Send + 'static), @@ -226,7 +226,7 @@ mod tests { services_handle.abort(); background_handle.abort(); - let received_transaction = tx_receiver.recv().await.unwrap(); + let (_application_priority, received_transaction) = tx_receiver.recv().await.unwrap(); assert_eq!(received_transaction, comparison_user_transaction); Ok(()) @@ -252,7 +252,7 @@ mod tests { let request = SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction)); api.transactions.submit_transaction(AcceptType::Bcs, request).await?; - let received_transaction = tx_receiver.recv().await.unwrap(); + let (_application_priority, received_transaction) = tx_receiver.recv().await.unwrap(); assert_eq!(received_transaction, comparison_user_transaction); // Now execute the block @@ -319,7 +319,7 @@ mod tests { SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction)); api.transactions.submit_transaction(AcceptType::Bcs, request).await?; - let received_transaction = tx_receiver.recv().await.unwrap(); + let (_application_priority, received_transaction) = tx_receiver.recv().await.unwrap(); assert_eq!(received_transaction, comparison_user_transaction); // Now execute the block diff --git a/protocol-units/execution/opt-executor/src/executor/initialization.rs b/protocol-units/execution/opt-executor/src/executor/initialization.rs index c52e5b815..676e5333f 100644 --- a/protocol-units/execution/opt-executor/src/executor/initialization.rs +++ b/protocol-units/execution/opt-executor/src/executor/initialization.rs @@ -117,7 +117,7 @@ impl Executor { /// task needs to be running. pub fn background( &self, - transaction_sender: mpsc::Sender, + transaction_sender: mpsc::Sender<(u64, SignedTransaction)>, ) -> anyhow::Result<(Context, TransactionPipe)> { let node_config = self.node_config.clone(); let maptos_config = self.config.clone(); diff --git a/protocol-units/execution/opt-executor/src/service.rs b/protocol-units/execution/opt-executor/src/service.rs index 0d814ae35..ff5beb54f 100644 --- a/protocol-units/execution/opt-executor/src/service.rs +++ b/protocol-units/execution/opt-executor/src/service.rs @@ -139,7 +139,7 @@ mod tests { assert_eq!(status.code, MempoolStatusCode::Accepted); // receive the transaction - let received_transaction = tx_receiver.recv().await.unwrap(); + let (_priority, received_transaction) = tx_receiver.recv().await.unwrap(); assert_eq!(received_transaction, user_transaction); handle.abort(); diff --git a/protocol-units/execution/opt-executor/src/transaction_pipe.rs b/protocol-units/execution/opt-executor/src/transaction_pipe.rs index 5f66137fc..02a3c6c5f 100644 --- a/protocol-units/execution/opt-executor/src/transaction_pipe.rs +++ b/protocol-units/execution/opt-executor/src/transaction_pipe.rs @@ -43,7 +43,7 @@ pub struct TransactionPipe { // The receiver for the mempool client. mempool_client_receiver: futures_mpsc::Receiver, // Sender for the channel with accepted transactions. - transaction_sender: mpsc::Sender, + transaction_sender: mpsc::Sender<(u64, SignedTransaction)>, // Access to the ledger DB. TODO: reuse an instance of VMValidator db_reader: Arc, // State of the Aptos mempool @@ -66,7 +66,7 @@ enum SequenceNumberValidity { impl TransactionPipe { pub(crate) fn new( mempool_client_receiver: futures_mpsc::Receiver, - transaction_sender: mpsc::Sender, + transaction_sender: mpsc::Sender<(u64, SignedTransaction)>, db_reader: Arc, node_config: &NodeConfig, transactions_in_flight: Arc, @@ -215,6 +215,7 @@ impl TransactionPipe { // Re-create the validator for each Tx because it uses a frozen version of the ledger. let vm_validator = VMValidator::new(Arc::clone(&self.db_reader)); let tx_result = vm_validator.validate_transaction(transaction.clone())?; + let application_priority = tx_result.score(); match tx_result.status() { Some(_) => { let ms = MempoolStatus::new(MempoolStatusCode::VmError); @@ -249,7 +250,7 @@ impl TransactionPipe { let sender = transaction.sender(); let transaction_sequence_number = transaction.sequence_number(); self.transaction_sender - .send(transaction) + .send((application_priority, transaction)) .await .map_err(|e| anyhow::anyhow!("Error sending transaction: {:?}", e))?; // increment transactions in flight @@ -302,9 +303,9 @@ mod tests { use futures::SinkExt; use maptos_execution_util::config::chain::Config; - fn setup() -> (TransactionPipe, MempoolClientSender, mpsc::Receiver) { + fn setup() -> (TransactionPipe, MempoolClientSender, mpsc::Receiver<(u64, SignedTransaction)>) { let (tx_sender, tx_receiver) = mpsc::channel(16); - let (executor, config, _tempdir) = + let (executor, _config, _tempdir) = Executor::try_test_default(GENESIS_KEYPAIR.0.clone()).unwrap(); let (context, transaction_pipe) = executor.background(tx_sender).unwrap(); (transaction_pipe, context.mempool_client_sender(), tx_receiver) @@ -342,7 +343,7 @@ mod tests { assert_eq!(status.code, MempoolStatusCode::Accepted); // receive the transaction - let received_transaction = tx_receiver.recv().await.unwrap(); + let (_application_priority, received_transaction) = tx_receiver.recv().await.unwrap(); assert_eq!(received_transaction, user_transaction); Ok(()) @@ -391,7 +392,7 @@ mod tests { assert_eq!(status.code, MempoolStatusCode::Accepted); // receive the transaction - let received_transaction = + let (_application_priority, received_transaction) = tx_receiver.recv().await.ok_or(anyhow::anyhow!("No transaction received"))?; assert_eq!(received_transaction, user_transaction); @@ -433,7 +434,7 @@ mod tests { let bcs_user_transaction = bcs::to_bytes(&user_transaction)?; let request = SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction)); api.transactions.submit_transaction(AcceptType::Bcs, request).await?; - let received_transaction = tx_receiver.recv().await.unwrap(); + let (_application_priority, received_transaction) = tx_receiver.recv().await.unwrap(); assert_eq!(received_transaction, comparison_user_transaction); mempool_handle.abort(); From d636ad3a209d990132e4fdebeebc53857797ff6e Mon Sep 17 00:00:00 2001 From: Liam Monninger Date: Fri, 25 Oct 2024 13:04:03 -0700 Subject: [PATCH 3/7] feat: aptos application priority. --- protocol-units/execution/opt-executor/src/transaction_pipe.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/protocol-units/execution/opt-executor/src/transaction_pipe.rs b/protocol-units/execution/opt-executor/src/transaction_pipe.rs index 02a3c6c5f..a2cdc6a97 100644 --- a/protocol-units/execution/opt-executor/src/transaction_pipe.rs +++ b/protocol-units/execution/opt-executor/src/transaction_pipe.rs @@ -215,7 +215,8 @@ impl TransactionPipe { // Re-create the validator for each Tx because it uses a frozen version of the ledger. let vm_validator = VMValidator::new(Arc::clone(&self.db_reader)); let tx_result = vm_validator.validate_transaction(transaction.clone())?; - let application_priority = tx_result.score(); + // invert the application priority with the u64 max minus the score from aptos (which is high to low) + let application_priority = u64::MAX - tx_result.score(); match tx_result.status() { Some(_) => { let ms = MempoolStatus::new(MempoolStatusCode::VmError); From fac18085d54edef19bac327d0050e14f37710d23 Mon Sep 17 00:00:00 2001 From: Liam Monninger Date: Fri, 25 Oct 2024 13:18:03 -0700 Subject: [PATCH 4/7] fix: application priority in comparison. --- protocol-units/execution/opt-executor/src/transaction_pipe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol-units/execution/opt-executor/src/transaction_pipe.rs b/protocol-units/execution/opt-executor/src/transaction_pipe.rs index a2cdc6a97..50c78e5d0 100644 --- a/protocol-units/execution/opt-executor/src/transaction_pipe.rs +++ b/protocol-units/execution/opt-executor/src/transaction_pipe.rs @@ -470,7 +470,7 @@ mod tests { SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction)); api.transactions.submit_transaction(AcceptType::Bcs, request).await?; - let received_transaction = tx_receiver.recv().await.unwrap(); + let (_application_priority, received_transaction) = tx_receiver.recv().await.unwrap(); let bcs_received_transaction = bcs::to_bytes(&received_transaction)?; comparison_user_transactions.insert(bcs_received_transaction.clone()); } From cddacac48b3cb14b96614c3310bc56af473f0129 Mon Sep 17 00:00:00 2001 From: Liam Monninger Date: Fri, 25 Oct 2024 15:24:29 -0700 Subject: [PATCH 5/7] fix: total ordering test. --- protocol-units/mempool/move-rocks/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/protocol-units/mempool/move-rocks/src/lib.rs b/protocol-units/mempool/move-rocks/src/lib.rs index b649f62b7..44382f76a 100644 --- a/protocol-units/mempool/move-rocks/src/lib.rs +++ b/protocol-units/mempool/move-rocks/src/lib.rs @@ -585,11 +585,11 @@ pub mod tests { let mempool = RocksdbMempool::try_new(path)?; let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 0); - let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 1), 0); - let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 1), 1); - let transaction4 = MempoolTransaction::at_time(Transaction::new(vec![2], 0, 1), 1); - let transaction5 = MempoolTransaction::at_time(Transaction::new(vec![1], 1, 1), 1); - let transaction6 = MempoolTransaction::at_time(Transaction::new(vec![1], 1, 2), 1); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0, 1), 0); + let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 0, 1), 2); + let transaction4 = MempoolTransaction::at_time(Transaction::new(vec![4], 1, 1), 2); + let transaction5 = MempoolTransaction::at_time(Transaction::new(vec![5], 1, 2), 4); + let transaction6 = MempoolTransaction::at_time(Transaction::new(vec![6], 1, 2), 6); mempool.add_mempool_transaction(transaction2.clone()).await?; mempool.add_mempool_transaction(transaction1.clone()).await?; From a575da4226935fe1a220ba303e2daec1663d8396 Mon Sep 17 00:00:00 2001 From: Liam Monninger Date: Fri, 25 Oct 2024 15:26:34 -0700 Subject: [PATCH 6/7] fix: please stop introducing unwraps. --- protocol-units/mempool/move-rocks/src/lib.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/protocol-units/mempool/move-rocks/src/lib.rs b/protocol-units/mempool/move-rocks/src/lib.rs index 44382f76a..d96494631 100644 --- a/protocol-units/mempool/move-rocks/src/lib.rs +++ b/protocol-units/mempool/move-rocks/src/lib.rs @@ -20,7 +20,7 @@ pub struct RocksdbMempool { db: Arc, } -fn construct_mempool_transaction_key(transaction: &MempoolTransaction) -> String { +fn construct_mempool_transaction_key(transaction: &MempoolTransaction) -> Result { // Pre-allocate a string with the required capacity let mut key = String::with_capacity(32 + 1 + 32 + 1 + 32 + 1 + 32); // Write key components. The numbers are zero-padded to 32 characters. @@ -31,14 +31,15 @@ fn construct_mempool_transaction_key(transaction: &MempoolTransaction) -> String transaction.transaction.sequence_number(), transaction.transaction.id(), )) - .unwrap(); - key + .map_err(|_| Error::msg("Error writing mempool transaction key"))?; + Ok(key) } -fn construct_timestamp_threshold_key(timestamp_threshold: u64) -> String { +fn construct_timestamp_threshold_key(timestamp_threshold: u64) -> Result { let mut key = String::with_capacity(32 + 1); - key.write_fmt(format_args!("{:032}:", timestamp_threshold)).unwrap(); - key + key.write_fmt(format_args!("{:032}:", timestamp_threshold)) + .map_err(|_| Error::msg("Error writing timestamp threshold key"))?; + Ok(key) } impl RocksdbMempool { @@ -140,7 +141,7 @@ impl MempoolTransactionOperations for RocksdbMempool { } let serialized_transaction = bcs::to_bytes(&transaction)?; - let key = construct_mempool_transaction_key(&transaction); + let key = construct_mempool_transaction_key(&transaction)?; batch.put_cf(&mempool_transactions_cf_handle, &key, &serialized_transaction); batch.put_cf( &transaction_lookups_cf_handle, @@ -175,7 +176,7 @@ impl MempoolTransactionOperations for RocksdbMempool { let mut batch = WriteBatch::default(); - let key = construct_mempool_transaction_key(&transaction); + let key = construct_mempool_transaction_key(&transaction)?; batch.put_cf(&mempool_transactions_cf_handle, &key, &serialized_transaction); batch.put_cf( &transaction_lookups_cf_handle, @@ -336,7 +337,7 @@ impl MempoolTransactionOperations for RocksdbMempool { .ok_or_else(|| Error::msg("CF handle not found"))?; let mut read_options = ReadOptions::default(); read_options - .set_iterate_upper_bound(construct_timestamp_threshold_key(timestamp_threshold)); + .set_iterate_upper_bound(construct_timestamp_threshold_key(timestamp_threshold)?); let mut iter = db.iterator_cf_opt(&cf_handle, read_options, IteratorMode::Start); let mut transaction_count = 0; let mut batch = WriteBatch::default(); From 7c22ea70d508f37d4eaf94c74ce6ee9722fe4947 Mon Sep 17 00:00:00 2001 From: Liam Monninger Date: Thu, 14 Nov 2024 16:25:13 +0100 Subject: [PATCH 7/7] fix: bad merge elements. --- .../suzuka/suzuka-full-node/src/partial.rs | 4 +++- protocol-units/execution/dof/src/v1.rs | 24 +++++++++---------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/networks/suzuka/suzuka-full-node/src/partial.rs b/networks/suzuka/suzuka-full-node/src/partial.rs index 43dda8a0d..e322f032b 100644 --- a/networks/suzuka/suzuka-full-node/src/partial.rs +++ b/networks/suzuka/suzuka-full-node/src/partial.rs @@ -31,7 +31,9 @@ where /// Runs the executor until crash or shutdown. pub async fn run(self) -> Result<(), anyhow::Error> { let (transaction_sender, transaction_receiver) = mpsc::channel(16); - let (context, exec_background) = self.executor.background(transaction_sender)?; + let (context, exec_background) = self + .executor + .background(transaction_sender, &self.config.execution_config.maptos_config)?; let services = context.services(); let mut movement_rest = self.movement_rest; movement_rest.set_context(services.opt_api_context()); diff --git a/protocol-units/execution/dof/src/v1.rs b/protocol-units/execution/dof/src/v1.rs index 20e1b2230..35a9e731a 100644 --- a/protocol-units/execution/dof/src/v1.rs +++ b/protocol-units/execution/dof/src/v1.rs @@ -221,9 +221,9 @@ mod tests { let private_key = Ed25519PrivateKey::generate_for_testing(); let mut config = Config::default(); config.chain.maptos_private_key = private_key.clone(); - let (executor, _tempdir) = setup(config)?; + let (executor, _tempdir) = setup(config.clone())?; let (tx_sender, mut tx_receiver) = mpsc::channel(16); - let (context, background) = executor.background(tx_sender)?; + let (context, background) = executor.background(tx_sender, &config)?; let services = context.services(); let api = services.get_opt_apis(); @@ -253,8 +253,8 @@ mod tests { config.chain.maptos_private_key = private_key.clone(); config.chain.maptos_read_only = true; let (tx_sender, _tx_receiver) = mpsc::channel(16); - let (executor, _tempdir) = setup(config)?; - let (context, background) = executor.background(tx_sender)?; + let (executor, _tempdir) = setup(config.clone())?; + let (context, background) = executor.background(tx_sender, &config)?; let services = context.services(); let api = services.get_opt_apis(); @@ -281,9 +281,9 @@ mod tests { let private_key = Ed25519PrivateKey::generate_for_testing(); let mut config = Config::default(); config.chain.maptos_private_key = private_key.clone(); - let (executor, _tempdir) = setup(config)?; + let (executor, _tempdir) = setup(config.clone())?; let (tx_sender, mut tx_receiver) = mpsc::channel(16); - let (context, background) = executor.background(tx_sender)?; + let (context, background) = executor.background(tx_sender, &config)?; let services = context.services(); let api = services.get_opt_apis(); @@ -340,9 +340,9 @@ mod tests { let private_key = Ed25519PrivateKey::generate_for_testing(); let mut config = Config::default(); config.chain.maptos_private_key = private_key.clone(); - let (executor, _tempdir) = setup(config)?; + let (executor, _tempdir) = setup(config.clone())?; let (tx_sender, mut tx_receiver) = mpsc::channel(16); - let (context, background) = executor.background(tx_sender)?; + let (context, background) = executor.background(tx_sender, &config)?; let services = context.services(); let api = services.get_opt_apis(); @@ -422,8 +422,8 @@ mod tests { // Create an executor instance from the environment configuration. let config = Config::default(); let (tx_sender, _tx_receiver) = mpsc::channel(16); - let executor = Executor::try_from_config(config)?; - let (context, background) = executor.background(tx_sender)?; + let executor = Executor::try_from_config(config.clone())?; + let (context, background) = executor.background(tx_sender, &config)?; let config = executor.config(); let services = context.services(); let apis = services.get_opt_apis(); @@ -495,8 +495,8 @@ mod tests { // Create an executor instance from the environment configuration. let config = Config::default(); let (tx_sender, _tx_receiver) = mpsc::channel(16); - let executor = Executor::try_from_config(config)?; - let (context, background) = executor.background(tx_sender)?; + let executor = Executor::try_from_config(config.clone())?; + let (context, background) = executor.background(tx_sender, &config)?; let config = executor.config(); let services = context.services();