Skip to content

Commit

Permalink
Merge pull request #749 from movementlabsxyz/l-monninger/application-…
Browse files Browse the repository at this point in the history
…priority

Application Priority in Memseq and Integration with Aptos
  • Loading branch information
l-monninger authored Nov 14, 2024
2 parents 430b36a + 7c22ea7 commit f8e6e30
Show file tree
Hide file tree
Showing 13 changed files with 168 additions and 70 deletions.
4 changes: 3 additions & 1 deletion networks/suzuka/suzuka-full-node/src/partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ where
/// Runs the executor until crash or shutdown.
pub async fn run(self) -> Result<(), anyhow::Error> {
let (transaction_sender, transaction_receiver) = mpsc::channel(16);
let (context, exec_background) = self.executor.background(transaction_sender)?;
let (context, exec_background) = self
.executor
.background(transaction_sender, &self.config.execution_config.maptos_config)?;
let services = context.services();
let mut movement_rest = self.movement_rest;
movement_rest.set_context(services.opt_api_context());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use std::time::{Duration, Instant};
const LOGGING_UID: AtomicU64 = AtomicU64::new(0);

pub struct Task {
transaction_receiver: mpsc::Receiver<SignedTransaction>,
transaction_receiver: mpsc::Receiver<(u64, SignedTransaction)>,
da_light_node_client: LightNodeServiceClient<tonic::transport::Channel>,
da_light_node_config: LightNodeConfig,
}

impl Task {
pub(crate) fn new(
transaction_receiver: mpsc::Receiver<SignedTransaction>,
transaction_receiver: mpsc::Receiver<(u64, SignedTransaction)>,
da_light_node_client: LightNodeServiceClient<tonic::transport::Channel>,
da_light_node_config: LightNodeConfig,
) -> Self {
Expand Down Expand Up @@ -63,7 +63,7 @@ impl Task {
.await
{
Ok(transaction) => match transaction {
Some(transaction) => {
Some((application_priority, transaction)) => {
info!(
target : "movement_timing",
batch_id = %batch_id,
Expand All @@ -75,6 +75,7 @@ impl Task {
let serialized_aptos_transaction = serde_json::to_vec(&transaction)?;
let movement_transaction = movement_types::transaction::Transaction::new(
serialized_aptos_transaction,
application_priority,
transaction.sequence_number(),
);
let serialized_transaction = serde_json::to_vec(&movement_transaction)?;
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion protocol-units/execution/dof/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ pub trait DynOptFinExecutor {
/// Initialize the background task responsible for transaction processing.
fn background(
&self,
transaction_sender: Sender<SignedTransaction>,
transaction_sender: Sender<(u64, SignedTransaction)>,
config: &Config,
) -> Result<
(Self::Context, impl Future<Output = Result<(), anyhow::Error>> + Send + 'static),
anyhow::Error,
Expand Down
33 changes: 17 additions & 16 deletions protocol-units/execution/dof/src/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ impl DynOptFinExecutor for Executor {

fn background(
&self,
transaction_sender: Sender<SignedTransaction>,
transaction_sender: Sender<(u64, SignedTransaction)>,
config: &Config,
) -> Result<
(Context, impl Future<Output = Result<(), anyhow::Error>> + Send + 'static),
anyhow::Error,
Expand Down Expand Up @@ -220,9 +221,9 @@ mod tests {
let private_key = Ed25519PrivateKey::generate_for_testing();
let mut config = Config::default();
config.chain.maptos_private_key = private_key.clone();
let (executor, _tempdir) = setup(config)?;
let (executor, _tempdir) = setup(config.clone())?;
let (tx_sender, mut tx_receiver) = mpsc::channel(16);
let (context, background) = executor.background(tx_sender)?;
let (context, background) = executor.background(tx_sender, &config)?;
let services = context.services();
let api = services.get_opt_apis();

Expand All @@ -239,7 +240,7 @@ mod tests {

services_handle.abort();
background_handle.abort();
let received_transaction = tx_receiver.recv().await.unwrap();
let (_application_priority, received_transaction) = tx_receiver.recv().await.unwrap();
assert_eq!(received_transaction, comparison_user_transaction);

Ok(())
Expand All @@ -252,8 +253,8 @@ mod tests {
config.chain.maptos_private_key = private_key.clone();
config.chain.maptos_read_only = true;
let (tx_sender, _tx_receiver) = mpsc::channel(16);
let (executor, _tempdir) = setup(config)?;
let (context, background) = executor.background(tx_sender)?;
let (executor, _tempdir) = setup(config.clone())?;
let (context, background) = executor.background(tx_sender, &config)?;
let services = context.services();
let api = services.get_opt_apis();

Expand All @@ -280,9 +281,9 @@ mod tests {
let private_key = Ed25519PrivateKey::generate_for_testing();
let mut config = Config::default();
config.chain.maptos_private_key = private_key.clone();
let (executor, _tempdir) = setup(config)?;
let (executor, _tempdir) = setup(config.clone())?;
let (tx_sender, mut tx_receiver) = mpsc::channel(16);
let (context, background) = executor.background(tx_sender)?;
let (context, background) = executor.background(tx_sender, &config)?;
let services = context.services();
let api = services.get_opt_apis();

Expand All @@ -297,7 +298,7 @@ mod tests {
let request = SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction));
api.transactions.submit_transaction(AcceptType::Bcs, request).await?;

let received_transaction = tx_receiver.recv().await.unwrap();
let (_application_priority, received_transaction) = tx_receiver.recv().await.unwrap();
assert_eq!(received_transaction, comparison_user_transaction);

// Now execute the block
Expand Down Expand Up @@ -339,9 +340,9 @@ mod tests {
let private_key = Ed25519PrivateKey::generate_for_testing();
let mut config = Config::default();
config.chain.maptos_private_key = private_key.clone();
let (executor, _tempdir) = setup(config)?;
let (executor, _tempdir) = setup(config.clone())?;
let (tx_sender, mut tx_receiver) = mpsc::channel(16);
let (context, background) = executor.background(tx_sender)?;
let (context, background) = executor.background(tx_sender, &config)?;
let services = context.services();
let api = services.get_opt_apis();

Expand All @@ -366,7 +367,7 @@ mod tests {
SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction));
api.transactions.submit_transaction(AcceptType::Bcs, request).await?;

let received_transaction = tx_receiver.recv().await.unwrap();
let (_application_priority, received_transaction) = tx_receiver.recv().await.unwrap();
assert_eq!(received_transaction, comparison_user_transaction);

// Now execute the block
Expand Down Expand Up @@ -421,8 +422,8 @@ mod tests {
// Create an executor instance from the environment configuration.
let config = Config::default();
let (tx_sender, _tx_receiver) = mpsc::channel(16);
let executor = Executor::try_from_config(config)?;
let (context, background) = executor.background(tx_sender)?;
let executor = Executor::try_from_config(config.clone())?;
let (context, background) = executor.background(tx_sender, &config)?;
let config = executor.config();
let services = context.services();
let apis = services.get_opt_apis();
Expand Down Expand Up @@ -494,8 +495,8 @@ mod tests {
// Create an executor instance from the environment configuration.
let config = Config::default();
let (tx_sender, _tx_receiver) = mpsc::channel(16);
let executor = Executor::try_from_config(config)?;
let (context, background) = executor.background(tx_sender)?;
let executor = Executor::try_from_config(config.clone())?;
let (context, background) = executor.background(tx_sender, &config)?;
let config = executor.config();
let services = context.services();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl BackgroundTask {
/// Constructs the full background tasks for transaction processing.
pub(crate) fn transaction_pipe(
mempool_client_receiver: futures_mpsc::Receiver<MempoolClientRequest>,
transaction_sender: mpsc::Sender<SignedTransaction>,
transaction_sender: mpsc::Sender<(u64, SignedTransaction)>,
db_reader: Arc<dyn DbReader>,
node_config: &NodeConfig,
mempool_config: &MempoolConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct TransactionPipe {
// The receiver for the mempool client.
mempool_client_receiver: futures_mpsc::Receiver<MempoolClientRequest>,
// Sender for the channel with accepted transactions.
transaction_sender: mpsc::Sender<SignedTransaction>,
transaction_sender: mpsc::Sender<(u64, SignedTransaction)>,
// Access to the ledger DB. TODO: reuse an instance of VMValidator
db_reader: Arc<dyn DbReader>,
// State of the Aptos mempool
Expand All @@ -52,7 +52,7 @@ enum SequenceNumberValidity {
impl TransactionPipe {
pub(crate) fn new(
mempool_client_receiver: futures_mpsc::Receiver<MempoolClientRequest>,
transaction_sender: mpsc::Sender<SignedTransaction>,
transaction_sender: mpsc::Sender<(u64, SignedTransaction)>,
db_reader: Arc<dyn DbReader>,
node_config: &NodeConfig,
mempool_config: &MempoolConfig,
Expand Down Expand Up @@ -204,6 +204,8 @@ impl TransactionPipe {
// Re-create the validator for each Tx because it uses a frozen version of the ledger.
let vm_validator = VMValidator::new(Arc::clone(&self.db_reader));
let tx_result = vm_validator.validate_transaction(transaction.clone())?;
// invert the application priority with the u64 max minus the score from aptos (which is high to low)
let application_priority = u64::MAX - tx_result.score();
match tx_result.status() {
Some(_) => {
let ms = MempoolStatus::new(MempoolStatusCode::VmError);
Expand Down Expand Up @@ -238,7 +240,7 @@ impl TransactionPipe {
let sender = transaction.sender();
let transaction_sequence_number = transaction.sequence_number();
self.transaction_sender
.send(transaction)
.send((application_priority, transaction))
.await
.map_err(|e| anyhow::anyhow!("Error sending transaction: {:?}", e))?;
// increment transactions in flight
Expand Down Expand Up @@ -291,7 +293,7 @@ mod tests {
use maptos_execution_util::config::chain::Config;
use tempfile::TempDir;

fn setup() -> (Context, TransactionPipe, mpsc::Receiver<SignedTransaction>, TempDir) {
fn setup() -> (Context, TransactionPipe, mpsc::Receiver<(u64, SignedTransaction)>, TempDir) {
let (tx_sender, tx_receiver) = mpsc::channel(16);
let (executor, tempdir) = Executor::try_test_default(GENESIS_KEYPAIR.0.clone()).unwrap();
let (context, background) = executor.background(tx_sender).unwrap();
Expand Down Expand Up @@ -333,7 +335,7 @@ mod tests {

// receive the transaction
let received_transaction = tx_receiver.recv().await.unwrap();
assert_eq!(received_transaction, user_transaction);
assert_eq!(received_transaction.1, user_transaction);

Ok(())
}
Expand Down Expand Up @@ -385,7 +387,7 @@ mod tests {
// receive the transaction
let received_transaction =
tx_receiver.recv().await.ok_or(anyhow::anyhow!("No transaction received"))?;
assert_eq!(received_transaction, user_transaction);
assert_eq!(received_transaction.1, user_transaction);

// send the same transaction again
let (req_sender, callback) = oneshot::channel();
Expand Down Expand Up @@ -424,7 +426,7 @@ mod tests {
let request = SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction));
api.transactions.submit_transaction(AcceptType::Bcs, request).await?;
let received_transaction = tx_receiver.recv().await.unwrap();
assert_eq!(received_transaction, comparison_user_transaction);
assert_eq!(received_transaction.1, comparison_user_transaction);

mempool_handle.abort();

Expand Down Expand Up @@ -457,7 +459,7 @@ mod tests {
api.transactions.submit_transaction(AcceptType::Bcs, request).await?;

let received_transaction = tx_receiver.recv().await.unwrap();
let bcs_received_transaction = bcs::to_bytes(&received_transaction)?;
let bcs_received_transaction = bcs::to_bytes(&received_transaction.1)?;
comparison_user_transactions.insert(bcs_received_transaction.clone());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl Executor {
/// task needs to be running.
pub fn background(
&self,
transaction_sender: mpsc::Sender<SignedTransaction>,
transaction_sender: mpsc::Sender<(u64, SignedTransaction)>,
) -> anyhow::Result<(Context, BackgroundTask)> {
let node_config = self.node_config.clone();
let maptos_config = self.config.clone();
Expand Down
2 changes: 1 addition & 1 deletion protocol-units/execution/opt-executor/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ mod tests {
assert_eq!(status.code, MempoolStatusCode::Accepted);

// receive the transaction
let received_transaction = tx_receiver.recv().await.unwrap();
let (_priority, received_transaction) = tx_receiver.recv().await.unwrap();
assert_eq!(received_transaction, user_transaction);

handle.abort();
Expand Down
Loading

0 comments on commit f8e6e30

Please sign in to comment.