diff --git a/networks/suzuka/suzuka-full-node/src/partial.rs b/networks/suzuka/suzuka-full-node/src/partial.rs index 43dda8a0d..e322f032b 100644 --- a/networks/suzuka/suzuka-full-node/src/partial.rs +++ b/networks/suzuka/suzuka-full-node/src/partial.rs @@ -31,7 +31,9 @@ where /// Runs the executor until crash or shutdown. pub async fn run(self) -> Result<(), anyhow::Error> { let (transaction_sender, transaction_receiver) = mpsc::channel(16); - let (context, exec_background) = self.executor.background(transaction_sender)?; + let (context, exec_background) = self + .executor + .background(transaction_sender, &self.config.execution_config.maptos_config)?; let services = context.services(); let mut movement_rest = self.movement_rest; movement_rest.set_context(services.opt_api_context()); diff --git a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs index a02108f53..4fbecbd6f 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs @@ -14,14 +14,14 @@ use std::time::{Duration, Instant}; const LOGGING_UID: AtomicU64 = AtomicU64::new(0); pub struct Task { - transaction_receiver: mpsc::Receiver, + transaction_receiver: mpsc::Receiver<(u64, SignedTransaction)>, da_light_node_client: LightNodeServiceClient, da_light_node_config: LightNodeConfig, } impl Task { pub(crate) fn new( - transaction_receiver: mpsc::Receiver, + transaction_receiver: mpsc::Receiver<(u64, SignedTransaction)>, da_light_node_client: LightNodeServiceClient, da_light_node_config: LightNodeConfig, ) -> Self { @@ -63,7 +63,7 @@ impl Task { .await { Ok(transaction) => match transaction { - Some(transaction) => { + Some((application_priority, transaction)) => { info!( target : "movement_timing", batch_id = %batch_id, @@ -75,6 +75,7 @@ impl Task { let serialized_aptos_transaction = serde_json::to_vec(&transaction)?; let movement_transaction = movement_types::transaction::Transaction::new( serialized_aptos_transaction, + application_priority, transaction.sequence_number(), ); let serialized_transaction = serde_json::to_vec(&movement_transaction)?; diff --git a/package-lock.json b/package-lock.json index 915069eb0..eb8637b2b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,5 +1,5 @@ { - "name": "movement", + "name": "0", "lockfileVersion": 3, "requires": true, "packages": { diff --git a/protocol-units/execution/dof/src/lib.rs b/protocol-units/execution/dof/src/lib.rs index 6eae44165..5710ca011 100644 --- a/protocol-units/execution/dof/src/lib.rs +++ b/protocol-units/execution/dof/src/lib.rs @@ -26,7 +26,8 @@ pub trait DynOptFinExecutor { /// Initialize the background task responsible for transaction processing. fn background( &self, - transaction_sender: Sender, + transaction_sender: Sender<(u64, SignedTransaction)>, + config: &Config, ) -> Result< (Self::Context, impl Future> + Send + 'static), anyhow::Error, diff --git a/protocol-units/execution/dof/src/v1.rs b/protocol-units/execution/dof/src/v1.rs index 1aafede1a..35a9e731a 100644 --- a/protocol-units/execution/dof/src/v1.rs +++ b/protocol-units/execution/dof/src/v1.rs @@ -52,7 +52,8 @@ impl DynOptFinExecutor for Executor { fn background( &self, - transaction_sender: Sender, + transaction_sender: Sender<(u64, SignedTransaction)>, + config: &Config, ) -> Result< (Context, impl Future> + Send + 'static), anyhow::Error, @@ -220,9 +221,9 @@ mod tests { let private_key = Ed25519PrivateKey::generate_for_testing(); let mut config = Config::default(); config.chain.maptos_private_key = private_key.clone(); - let (executor, _tempdir) = setup(config)?; + let (executor, _tempdir) = setup(config.clone())?; let (tx_sender, mut tx_receiver) = mpsc::channel(16); - let (context, background) = executor.background(tx_sender)?; + let (context, background) = executor.background(tx_sender, &config)?; let services = context.services(); let api = services.get_opt_apis(); @@ -239,7 +240,7 @@ mod tests { services_handle.abort(); background_handle.abort(); - let received_transaction = tx_receiver.recv().await.unwrap(); + let (_application_priority, received_transaction) = tx_receiver.recv().await.unwrap(); assert_eq!(received_transaction, comparison_user_transaction); Ok(()) @@ -252,8 +253,8 @@ mod tests { config.chain.maptos_private_key = private_key.clone(); config.chain.maptos_read_only = true; let (tx_sender, _tx_receiver) = mpsc::channel(16); - let (executor, _tempdir) = setup(config)?; - let (context, background) = executor.background(tx_sender)?; + let (executor, _tempdir) = setup(config.clone())?; + let (context, background) = executor.background(tx_sender, &config)?; let services = context.services(); let api = services.get_opt_apis(); @@ -280,9 +281,9 @@ mod tests { let private_key = Ed25519PrivateKey::generate_for_testing(); let mut config = Config::default(); config.chain.maptos_private_key = private_key.clone(); - let (executor, _tempdir) = setup(config)?; + let (executor, _tempdir) = setup(config.clone())?; let (tx_sender, mut tx_receiver) = mpsc::channel(16); - let (context, background) = executor.background(tx_sender)?; + let (context, background) = executor.background(tx_sender, &config)?; let services = context.services(); let api = services.get_opt_apis(); @@ -297,7 +298,7 @@ mod tests { let request = SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction)); api.transactions.submit_transaction(AcceptType::Bcs, request).await?; - let received_transaction = tx_receiver.recv().await.unwrap(); + let (_application_priority, received_transaction) = tx_receiver.recv().await.unwrap(); assert_eq!(received_transaction, comparison_user_transaction); // Now execute the block @@ -339,9 +340,9 @@ mod tests { let private_key = Ed25519PrivateKey::generate_for_testing(); let mut config = Config::default(); config.chain.maptos_private_key = private_key.clone(); - let (executor, _tempdir) = setup(config)?; + let (executor, _tempdir) = setup(config.clone())?; let (tx_sender, mut tx_receiver) = mpsc::channel(16); - let (context, background) = executor.background(tx_sender)?; + let (context, background) = executor.background(tx_sender, &config)?; let services = context.services(); let api = services.get_opt_apis(); @@ -366,7 +367,7 @@ mod tests { SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction)); api.transactions.submit_transaction(AcceptType::Bcs, request).await?; - let received_transaction = tx_receiver.recv().await.unwrap(); + let (_application_priority, received_transaction) = tx_receiver.recv().await.unwrap(); assert_eq!(received_transaction, comparison_user_transaction); // Now execute the block @@ -421,8 +422,8 @@ mod tests { // Create an executor instance from the environment configuration. let config = Config::default(); let (tx_sender, _tx_receiver) = mpsc::channel(16); - let executor = Executor::try_from_config(config)?; - let (context, background) = executor.background(tx_sender)?; + let executor = Executor::try_from_config(config.clone())?; + let (context, background) = executor.background(tx_sender, &config)?; let config = executor.config(); let services = context.services(); let apis = services.get_opt_apis(); @@ -494,8 +495,8 @@ mod tests { // Create an executor instance from the environment configuration. let config = Config::default(); let (tx_sender, _tx_receiver) = mpsc::channel(16); - let executor = Executor::try_from_config(config)?; - let (context, background) = executor.background(tx_sender)?; + let executor = Executor::try_from_config(config.clone())?; + let (context, background) = executor.background(tx_sender, &config)?; let config = executor.config(); let services = context.services(); diff --git a/protocol-units/execution/opt-executor/src/background/task.rs b/protocol-units/execution/opt-executor/src/background/task.rs index 96f0d1cd4..429ed1817 100644 --- a/protocol-units/execution/opt-executor/src/background/task.rs +++ b/protocol-units/execution/opt-executor/src/background/task.rs @@ -28,7 +28,7 @@ impl BackgroundTask { /// Constructs the full background tasks for transaction processing. pub(crate) fn transaction_pipe( mempool_client_receiver: futures_mpsc::Receiver, - transaction_sender: mpsc::Sender, + transaction_sender: mpsc::Sender<(u64, SignedTransaction)>, db_reader: Arc, node_config: &NodeConfig, mempool_config: &MempoolConfig, diff --git a/protocol-units/execution/opt-executor/src/background/transaction_pipe.rs b/protocol-units/execution/opt-executor/src/background/transaction_pipe.rs index 1c3a6e48b..e16c345b3 100644 --- a/protocol-units/execution/opt-executor/src/background/transaction_pipe.rs +++ b/protocol-units/execution/opt-executor/src/background/transaction_pipe.rs @@ -29,7 +29,7 @@ pub struct TransactionPipe { // The receiver for the mempool client. mempool_client_receiver: futures_mpsc::Receiver, // Sender for the channel with accepted transactions. - transaction_sender: mpsc::Sender, + transaction_sender: mpsc::Sender<(u64, SignedTransaction)>, // Access to the ledger DB. TODO: reuse an instance of VMValidator db_reader: Arc, // State of the Aptos mempool @@ -52,7 +52,7 @@ enum SequenceNumberValidity { impl TransactionPipe { pub(crate) fn new( mempool_client_receiver: futures_mpsc::Receiver, - transaction_sender: mpsc::Sender, + transaction_sender: mpsc::Sender<(u64, SignedTransaction)>, db_reader: Arc, node_config: &NodeConfig, mempool_config: &MempoolConfig, @@ -204,6 +204,8 @@ impl TransactionPipe { // Re-create the validator for each Tx because it uses a frozen version of the ledger. let vm_validator = VMValidator::new(Arc::clone(&self.db_reader)); let tx_result = vm_validator.validate_transaction(transaction.clone())?; + // invert the application priority with the u64 max minus the score from aptos (which is high to low) + let application_priority = u64::MAX - tx_result.score(); match tx_result.status() { Some(_) => { let ms = MempoolStatus::new(MempoolStatusCode::VmError); @@ -238,7 +240,7 @@ impl TransactionPipe { let sender = transaction.sender(); let transaction_sequence_number = transaction.sequence_number(); self.transaction_sender - .send(transaction) + .send((application_priority, transaction)) .await .map_err(|e| anyhow::anyhow!("Error sending transaction: {:?}", e))?; // increment transactions in flight @@ -291,7 +293,7 @@ mod tests { use maptos_execution_util::config::chain::Config; use tempfile::TempDir; - fn setup() -> (Context, TransactionPipe, mpsc::Receiver, TempDir) { + fn setup() -> (Context, TransactionPipe, mpsc::Receiver<(u64, SignedTransaction)>, TempDir) { let (tx_sender, tx_receiver) = mpsc::channel(16); let (executor, tempdir) = Executor::try_test_default(GENESIS_KEYPAIR.0.clone()).unwrap(); let (context, background) = executor.background(tx_sender).unwrap(); @@ -333,7 +335,7 @@ mod tests { // receive the transaction let received_transaction = tx_receiver.recv().await.unwrap(); - assert_eq!(received_transaction, user_transaction); + assert_eq!(received_transaction.1, user_transaction); Ok(()) } @@ -385,7 +387,7 @@ mod tests { // receive the transaction let received_transaction = tx_receiver.recv().await.ok_or(anyhow::anyhow!("No transaction received"))?; - assert_eq!(received_transaction, user_transaction); + assert_eq!(received_transaction.1, user_transaction); // send the same transaction again let (req_sender, callback) = oneshot::channel(); @@ -424,7 +426,7 @@ mod tests { let request = SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction)); api.transactions.submit_transaction(AcceptType::Bcs, request).await?; let received_transaction = tx_receiver.recv().await.unwrap(); - assert_eq!(received_transaction, comparison_user_transaction); + assert_eq!(received_transaction.1, comparison_user_transaction); mempool_handle.abort(); @@ -457,7 +459,7 @@ mod tests { api.transactions.submit_transaction(AcceptType::Bcs, request).await?; let received_transaction = tx_receiver.recv().await.unwrap(); - let bcs_received_transaction = bcs::to_bytes(&received_transaction)?; + let bcs_received_transaction = bcs::to_bytes(&received_transaction.1)?; comparison_user_transactions.insert(bcs_received_transaction.clone()); } diff --git a/protocol-units/execution/opt-executor/src/executor/initialization.rs b/protocol-units/execution/opt-executor/src/executor/initialization.rs index cdc8e57ed..49e0f7475 100644 --- a/protocol-units/execution/opt-executor/src/executor/initialization.rs +++ b/protocol-units/execution/opt-executor/src/executor/initialization.rs @@ -135,7 +135,7 @@ impl Executor { /// task needs to be running. pub fn background( &self, - transaction_sender: mpsc::Sender, + transaction_sender: mpsc::Sender<(u64, SignedTransaction)>, ) -> anyhow::Result<(Context, BackgroundTask)> { let node_config = self.node_config.clone(); let maptos_config = self.config.clone(); diff --git a/protocol-units/execution/opt-executor/src/service.rs b/protocol-units/execution/opt-executor/src/service.rs index a206ec686..45ed519a6 100644 --- a/protocol-units/execution/opt-executor/src/service.rs +++ b/protocol-units/execution/opt-executor/src/service.rs @@ -140,7 +140,7 @@ mod tests { assert_eq!(status.code, MempoolStatusCode::Accepted); // receive the transaction - let received_transaction = tx_receiver.recv().await.unwrap(); + let (_priority, received_transaction) = tx_receiver.recv().await.unwrap(); assert_eq!(received_transaction, user_transaction); handle.abort(); diff --git a/protocol-units/mempool/move-rocks/src/lib.rs b/protocol-units/mempool/move-rocks/src/lib.rs index c9f267a29..d96494631 100644 --- a/protocol-units/mempool/move-rocks/src/lib.rs +++ b/protocol-units/mempool/move-rocks/src/lib.rs @@ -20,24 +20,26 @@ pub struct RocksdbMempool { db: Arc, } -fn construct_mempool_transaction_key(transaction: &MempoolTransaction) -> String { +fn construct_mempool_transaction_key(transaction: &MempoolTransaction) -> Result { // Pre-allocate a string with the required capacity - let mut key = String::with_capacity(32 + 1 + 32 + 1 + 32); + let mut key = String::with_capacity(32 + 1 + 32 + 1 + 32 + 1 + 32); // Write key components. The numbers are zero-padded to 32 characters. key.write_fmt(format_args!( - "{:032}:{:032}:{}", + "{:032}:{:032}:{:032}:{}", + transaction.transaction.application_priority(), transaction.timestamp, transaction.transaction.sequence_number(), transaction.transaction.id(), )) - .unwrap(); - key + .map_err(|_| Error::msg("Error writing mempool transaction key"))?; + Ok(key) } -fn construct_timestamp_threshold_key(timestamp_threshold: u64) -> String { +fn construct_timestamp_threshold_key(timestamp_threshold: u64) -> Result { let mut key = String::with_capacity(32 + 1); - key.write_fmt(format_args!("{:032}:", timestamp_threshold)).unwrap(); - key + key.write_fmt(format_args!("{:032}:", timestamp_threshold)) + .map_err(|_| Error::msg("Error writing timestamp threshold key"))?; + Ok(key) } impl RocksdbMempool { @@ -139,7 +141,7 @@ impl MempoolTransactionOperations for RocksdbMempool { } let serialized_transaction = bcs::to_bytes(&transaction)?; - let key = construct_mempool_transaction_key(&transaction); + let key = construct_mempool_transaction_key(&transaction)?; batch.put_cf(&mempool_transactions_cf_handle, &key, &serialized_transaction); batch.put_cf( &transaction_lookups_cf_handle, @@ -174,7 +176,7 @@ impl MempoolTransactionOperations for RocksdbMempool { let mut batch = WriteBatch::default(); - let key = construct_mempool_transaction_key(&transaction); + let key = construct_mempool_transaction_key(&transaction)?; batch.put_cf(&mempool_transactions_cf_handle, &key, &serialized_transaction); batch.put_cf( &transaction_lookups_cf_handle, @@ -335,7 +337,7 @@ impl MempoolTransactionOperations for RocksdbMempool { .ok_or_else(|| Error::msg("CF handle not found"))?; let mut read_options = ReadOptions::default(); read_options - .set_iterate_upper_bound(construct_timestamp_threshold_key(timestamp_threshold)); + .set_iterate_upper_bound(construct_timestamp_threshold_key(timestamp_threshold)?); let mut iter = db.iterator_cf_opt(&cf_handle, read_options, IteratorMode::Start); let mut transaction_count = 0; let mut batch = WriteBatch::default(); @@ -469,14 +471,14 @@ pub mod tests { let path = temp_dir.path().to_str().unwrap(); let mempool = RocksdbMempool::try_new(path)?; - let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0), 2); + let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 2); let transaction1_id = transaction1.id(); mempool.add_mempool_transaction(transaction1).await?; assert!(mempool.has_transaction(transaction1_id).await?); sleep(Duration::from_secs(2)).await; - let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0), 64); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0, 0), 64); let transaction2_id = transaction2.id(); let transaction2_timestamp = transaction2.timestamp; mempool.add_mempool_transaction(transaction2).await?; @@ -495,9 +497,9 @@ pub mod tests { let path = temp_dir.path().to_str().unwrap(); let mempool = RocksdbMempool::try_new(path)?; - let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0), 2); - let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0), 64); - let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 0), 128); + let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 2); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0, 0), 64); + let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 0, 0), 128); mempool.add_mempool_transaction(transaction2.clone()).await?; mempool.add_mempool_transaction(transaction1.clone()).await?; @@ -517,9 +519,9 @@ pub mod tests { let path = temp_dir.path().to_str().unwrap(); let mempool = RocksdbMempool::try_new(path)?; - let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0), 2); - let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 1), 2); - let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 0), 64); + let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 2); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0, 1), 2); + let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 0, 0), 64); mempool.add_mempool_transaction(transaction2.clone()).await?; mempool.add_mempool_transaction(transaction1.clone()).await?; @@ -539,9 +541,9 @@ pub mod tests { let path = temp_dir.path().to_str().unwrap(); let mempool = RocksdbMempool::try_new(path)?; - let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0), 0); - let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 1), 0); - let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 2), 0); + let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 0); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0, 1), 0); + let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 0, 2), 0); mempool.add_mempool_transaction(transaction2.clone()).await?; mempool.add_mempool_transaction(transaction1.clone()).await?; @@ -554,4 +556,57 @@ pub mod tests { Ok(()) } + + #[tokio::test] + async fn test_application_priority_based_ordering() -> Result<(), Error> { + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().to_str().unwrap(); + let mempool = RocksdbMempool::try_new(path)?; + + let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 0); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 1, 0), 0); + let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 2, 0), 0); + + mempool.add_mempool_transaction(transaction2.clone()).await?; + mempool.add_mempool_transaction(transaction1.clone()).await?; + mempool.add_mempool_transaction(transaction3.clone()).await?; + + let transactions = mempool.pop_mempool_transactions(3).await?; + assert_eq!(transactions[0], transaction1); + assert_eq!(transactions[1], transaction2); + assert_eq!(transactions[2], transaction3); + + Ok(()) + } + + #[tokio::test] + async fn test_total_ordering() -> Result<(), Error> { + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().to_str().unwrap(); + let mempool = RocksdbMempool::try_new(path)?; + + let transaction1 = MempoolTransaction::at_time(Transaction::new(vec![1], 0, 0), 0); + let transaction2 = MempoolTransaction::at_time(Transaction::new(vec![2], 0, 1), 0); + let transaction3 = MempoolTransaction::at_time(Transaction::new(vec![3], 0, 1), 2); + let transaction4 = MempoolTransaction::at_time(Transaction::new(vec![4], 1, 1), 2); + let transaction5 = MempoolTransaction::at_time(Transaction::new(vec![5], 1, 2), 4); + let transaction6 = MempoolTransaction::at_time(Transaction::new(vec![6], 1, 2), 6); + + mempool.add_mempool_transaction(transaction2.clone()).await?; + mempool.add_mempool_transaction(transaction1.clone()).await?; + mempool.add_mempool_transaction(transaction3.clone()).await?; + mempool.add_mempool_transaction(transaction5.clone()).await?; + mempool.add_mempool_transaction(transaction4.clone()).await?; + mempool.add_mempool_transaction(transaction6.clone()).await?; + + let transactions = mempool.pop_mempool_transactions(6).await?; + assert_eq!(transactions[0], transaction1); + assert_eq!(transactions[1], transaction2); + assert_eq!(transactions[2], transaction3); + assert_eq!(transactions[3], transaction4); + assert_eq!(transactions[4], transaction5); + assert_eq!(transactions[5], transaction6); + + Ok(()) + } } diff --git a/protocol-units/mempool/util/src/lib.rs b/protocol-units/mempool/util/src/lib.rs index a633f62c3..350e1e31f 100644 --- a/protocol-units/mempool/util/src/lib.rs +++ b/protocol-units/mempool/util/src/lib.rs @@ -76,6 +76,7 @@ pub trait MempoolTransactionOperations { self.has_mempool_transaction(transaction_id).await } + /// Adds transactions to the mempool. async fn add_transactions(&self, transactions: Vec) -> Result<(), anyhow::Error> { let mempool_transactions = transactions.into_iter().map(MempoolTransaction::slot_now).collect(); @@ -158,7 +159,18 @@ impl PartialOrd for MempoolTransaction { /// This allows us to use a BTreeSet to order transactions by slot_seconds, and then by transaction and pop them off in order. impl Ord for MempoolTransaction { fn cmp(&self, other: &Self) -> Ordering { - // First, compare by timestamps + // First, compare the application priority + // Note: this also happens again in the inner transaction comparison, but the priority should come first both in the [MempoolTransaction] and in the [Transaction] by itself. + match self + .transaction + .application_priority() + .cmp(&other.transaction.application_priority()) + { + Ordering::Equal => {} + non_equal => return non_equal, + } + + // Then, compare by timestamps match self.timestamp.cmp(&other.timestamp) { Ordering::Equal => {} non_equal => return non_equal, diff --git a/protocol-units/sequencing/memseq/sequencer/src/lib.rs b/protocol-units/sequencing/memseq/sequencer/src/lib.rs index 0dd351432..89ced0b6e 100644 --- a/protocol-units/sequencing/memseq/sequencer/src/lib.rs +++ b/protocol-units/sequencing/memseq/sequencer/src/lib.rs @@ -156,7 +156,7 @@ pub mod test { // Add some transactions for i in 0..5 { - let transaction = Transaction::new(vec![i as u8], 0); + let transaction = Transaction::new(vec![i as u8], 0, 0); memseq.publish(transaction).await?; } @@ -177,7 +177,7 @@ pub mod test { let parent_block = Arc::new(RwLock::new(block::Id::default())); let memseq = Memseq::new(mempool, 10, parent_block, 1000); - let transaction = Transaction::new(vec![1, 2, 3], 0); + let transaction = Transaction::new(vec![1, 2, 3], 0, 0); let result = memseq.publish(transaction).await; assert!(result.is_err()); assert_eq!(result.unwrap_err().to_string(), "Mock add_transaction"); @@ -200,7 +200,7 @@ pub mod test { for i in 0..100 { let memseq_clone = Arc::clone(&memseq); let handle = tokio::spawn(async move { - let transaction = Transaction::new(vec![i as u8], 0); + let transaction = Transaction::new(vec![i as u8], 0, 0); memseq_clone.publish(transaction).await.unwrap(); }); handles.push(handle); @@ -225,7 +225,7 @@ pub mod test { let memseq_clone = Arc::clone(&memseq); let handle = async move { for n in 0..10 { - let transaction = Transaction::new(vec![i * 10 + n as u8], 0); + let transaction = Transaction::new(vec![i * 10 + n as u8], 0, 0); memseq_clone.publish(transaction).await?; } Ok::<_, anyhow::Error>(()) @@ -324,7 +324,7 @@ pub mod test { let path = dir.path().to_path_buf(); let memseq = Memseq::try_move_rocks(path, 128, 250)?; - let transaction: Transaction = Transaction::new(vec![1, 2, 3], 0); + let transaction: Transaction = Transaction::new(vec![1, 2, 3], 0, 0); memseq.publish(transaction.clone()).await?; let block = memseq.wait_for_next_block().await?; @@ -348,7 +348,7 @@ pub mod test { let mut transactions = Vec::new(); for i in 0..block_size * 2 { - let transaction: Transaction = Transaction::new(vec![i as u8], 0); + let transaction: Transaction = Transaction::new(vec![i as u8], 0, 0); memseq.publish(transaction.clone()).await?; transactions.push(transaction); } @@ -389,7 +389,7 @@ pub mod test { // add half of the transactions for i in 0..block_size / 2 { - let transaction: Transaction = Transaction::new(vec![i as u8], 0); + let transaction: Transaction = Transaction::new(vec![i as u8], 0, 0); memseq.publish(transaction.clone()).await?; } @@ -397,7 +397,7 @@ pub mod test { // add the rest of the transactions for i in block_size / 2..block_size - 2 { - let transaction: Transaction = Transaction::new(vec![i as u8], 0); + let transaction: Transaction = Transaction::new(vec![i as u8], 0, 0); memseq.publish(transaction.clone()).await?; } diff --git a/util/movement-types/src/transaction.rs b/util/movement-types/src/transaction.rs index 15f6d1119..3b421568a 100644 --- a/util/movement-types/src/transaction.rs +++ b/util/movement-types/src/transaction.rs @@ -43,17 +43,19 @@ impl fmt::Display for Id { #[derive(Serialize, Deserialize, Clone, Default, Debug, PartialEq, Eq, Hash)] pub struct Transaction { data: Vec, + // Application priority is stored low to high, i.e., 0 is the highest priority. + application_priority: u64, sequence_number: u64, id: Id, } impl Transaction { - pub fn new(data: Vec, sequence_number: u64) -> Self { + pub fn new(data: Vec, application_priority: u64, sequence_number: u64) -> Self { let mut hasher = blake3::Hasher::new(); hasher.update(&data); hasher.update(&sequence_number.to_le_bytes()); let id = Id(hasher.finalize().into()); - Self { data, sequence_number, id } + Self { data, sequence_number, application_priority, id } } pub fn id(&self) -> Id { @@ -64,18 +66,31 @@ impl Transaction { &self.data } + /// Returns the application priority of the transaction. + /// The lower the value, the higher the priority. + /// If you are using a high value, high priority scheme, simply subtract the priority from the maximum value. + pub fn application_priority(&self) -> u64 { + self.application_priority + } + pub fn sequence_number(&self) -> u64 { self.sequence_number } pub fn test() -> Self { - Self::new(vec![0], 0) + Self::new(vec![0], 0, 0) } } impl Ord for Transaction { fn cmp(&self, other: &Self) -> Ordering { - // First, compare by sequence_number + // First, compare by application_priority + match self.application_priority.cmp(&other.application_priority) { + Ordering::Equal => {} + non_equal => return non_equal, + } + + // Then compare by sequence number match self.sequence_number().cmp(&other.sequence_number()) { Ordering::Equal => {} non_equal => return non_equal, @@ -98,9 +113,18 @@ mod test { #[test] fn test_transaction_ordering() { + // priority based ordering + let transaction = Transaction::new(vec![1], 0, 0); + let transaction2 = Transaction::new(vec![1], 1, 0); + let transaction3 = Transaction::new(vec![1], 2, 0); + + assert!(transaction < transaction2); + assert!(transaction2 < transaction3); + + // sequencer number based ordering let transaction = Transaction::test(); - let transaction2 = Transaction::new(vec![1], 1); - let transaction3 = Transaction::new(vec![1], 2); + let transaction2 = Transaction::new(vec![1], 0, 1); + let transaction3 = Transaction::new(vec![1], 0, 2); assert!(transaction < transaction2); assert!(transaction2 < transaction3);