Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Executor API cleanup and simplification #61

Merged
merged 2 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions networks/monza/monza-full-node/src/partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,18 @@ impl <T : MonzaExecutor + Send + Sync + Clone>MonzaPartialNode<T> {
}
}

pub async fn bind_transaction_channel(&mut self) -> Result<(), anyhow::Error> {
self.executor.set_tx_channel(self.transaction_sender.clone()).await?;
Ok(())
}

pub async fn bound(executor : T, light_node_client: LightNodeServiceClient<tonic::transport::Channel>) -> Result<Self, anyhow::Error> {
let mut node = Self::new(executor, light_node_client);
node.bind_transaction_channel().await?;
Ok(node)
}
fn bind_transaction_channel(&mut self) {
self.executor.set_tx_channel(self.transaction_sender.clone());
}

pub fn bound(
executor: T,
light_node_client: LightNodeServiceClient<tonic::transport::Channel>,
) -> Result<Self, anyhow::Error> {
let mut node = Self::new(executor, light_node_client);
node.bind_transaction_channel();
Ok(node)
}

pub async fn tick_write_transactions_to_da(&self) -> Result<(), anyhow::Error> {

Expand Down Expand Up @@ -166,7 +168,7 @@ impl <T : MonzaExecutor + Send + Sync + Clone>MonzaPartialNode<T> {
);
let block_id = executable_block.block_id;
self.executor.execute_block(
&FinalityMode::Opt,
FinalityMode::Opt,
executable_block
).await?;

Expand Down Expand Up @@ -225,7 +227,7 @@ impl MonzaPartialNode<MonzaExecutorV1> {
let executor = MonzaExecutorV1::try_from_env(tx).await.context(
"Failed to get executor from environment"
)?;
Self::bound(executor, light_node_client).await
Self::bound(executor, light_node_client)
}

}
26 changes: 14 additions & 12 deletions networks/suzuka/suzuka-full-node/src/partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,18 @@ impl <T : SuzukaExecutor + Send + Sync + Clone>SuzukaPartialNode<T> {
}
}

pub async fn bind_transaction_channel(&mut self) -> Result<(), anyhow::Error> {
self.executor.set_tx_channel(self.transaction_sender.clone()).await?;
Ok(())
}

pub async fn bound(executor : T, light_node_client: LightNodeServiceClient<tonic::transport::Channel>) -> Result<Self, anyhow::Error> {
let mut node = Self::new(executor, light_node_client);
node.bind_transaction_channel().await?;
Ok(node)
}
fn bind_transaction_channel(&mut self) {
self.executor.set_tx_channel(self.transaction_sender.clone());
}

pub fn bound(
executor: T,
light_node_client: LightNodeServiceClient<tonic::transport::Channel>,
) -> Result<Self, anyhow::Error> {
let mut node = Self::new(executor, light_node_client);
node.bind_transaction_channel();
Ok(node)
}

pub async fn tick_write_transactions_to_da(&self) -> Result<(), anyhow::Error> {

Expand Down Expand Up @@ -166,7 +168,7 @@ impl <T : SuzukaExecutor + Send + Sync + Clone>SuzukaPartialNode<T> {
);
let block_id = executable_block.block_id;
self.executor.execute_block(
&FinalityMode::Opt,
FinalityMode::Opt,
executable_block
).await?;

Expand Down Expand Up @@ -225,7 +227,7 @@ impl SuzukaPartialNode<SuzukaExecutorV1> {
let executor = SuzukaExecutorV1::try_from_env(tx).await.context(
"Failed to get executor from environment"
)?;
Self::bound(executor, light_node_client).await
Self::bound(executor, light_node_client)
}

}
19 changes: 8 additions & 11 deletions protocol-units/execution/maptos/opt-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,12 @@ impl Executor {
Ok(())
}

pub async fn try_get_context(&self) -> Result<Arc<Context>, anyhow::Error> {
Ok(self.context.clone())
fn context(&self) -> Arc<Context> {
self.context.clone()
}

pub async fn try_get_apis(&self) -> Result<Apis, anyhow::Error> {
let context = self.try_get_context().await?;
Ok(get_apis(context))
pub fn get_apis(&self) -> Apis {
get_apis(self.context())
}

pub async fn run_service(&self) -> Result<(), anyhow::Error> {
Expand All @@ -313,9 +312,7 @@ impl Executor {

}


let context = self.try_get_context().await?;
let api_service = get_api_service(context).server(
let api_service = get_api_service(self.context()).server(
format!("http://{:?}", self.aptos_config.aptos_rest_listen_url)
);

Expand Down Expand Up @@ -650,7 +647,7 @@ mod tests {
executor.execute_block(block).await?;

// Retrieve the executor's API interface and fetch the transaction by each hash.
let apis = executor.try_get_apis().await?;
let apis = executor.get_apis();
for hash in transaction_hashes {
let _ = apis.transactions.get_transaction_by_hash_inner(
&AcceptType::Bcs,
Expand Down Expand Up @@ -743,7 +740,7 @@ mod tests {
Ok(()) as Result<(), anyhow::Error>
});

let api = executor.try_get_apis().await?;
let api = executor.get_apis();
let user_transaction = create_signed_transaction(0, executor.aptos_config.chain_id.clone());
let comparison_user_transaction = user_transaction.clone();
let bcs_user_transaction = bcs::to_bytes(&user_transaction)?;
Expand Down Expand Up @@ -774,7 +771,7 @@ mod tests {
Ok(()) as Result<(), anyhow::Error>
});

let api = executor.try_get_apis().await?;
let api = executor.get_apis();
let mut user_transactions = BTreeSet::new();
let mut comparison_user_transactions = BTreeSet::new();
for _ in 0..25 {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum FinalityMode {
Dyn,
Opt,
Expand Down
16 changes: 8 additions & 8 deletions protocol-units/execution/monza/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ pub trait MonzaExecutor {
/// Executes a block dynamically
async fn execute_block(
&self,
mode : &FinalityMode,
mode: FinalityMode,
block: ExecutableBlock,
) -> Result<(), anyhow::Error>;

/// Sets the transaction channel.
async fn set_tx_channel(&mut self, tx_channel: Sender<SignedTransaction>) -> Result<(), anyhow::Error>;
/// Sets the transaction channel.
fn set_tx_channel(
&mut self,
tx_channel: Sender<SignedTransaction>,
);

/// Gets the dyn API.
async fn get_api(
&self,
mode : &FinalityMode,
) -> Result<Apis, anyhow::Error>;
/// Gets the dyn API.
fn get_api(&self, mode: FinalityMode) -> Apis;

/// Get block head height.
async fn get_block_head_height(&self) -> Result<u64, anyhow::Error>;
Expand Down
31 changes: 15 additions & 16 deletions protocol-units/execution/monza/executor/src/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl MonzaExecutor for MonzaExecutorV1 {
/// Executes a block dynamically
async fn execute_block(
&self,
mode: &FinalityMode,
mode: FinalityMode,
block: ExecutableBlock,
) -> Result<(), anyhow::Error> {
match mode {
Expand All @@ -57,19 +57,18 @@ impl MonzaExecutor for MonzaExecutorV1 {
}

/// Sets the transaction channel.
async fn set_tx_channel(
fn set_tx_channel(
&mut self,
tx_channel: Sender<SignedTransaction>,
) -> Result<(), anyhow::Error> {
) {
self.transaction_channel = tx_channel;
Ok(())
}

/// Gets the API.
async fn get_api(&self, _mode: &FinalityMode) -> Result<Apis, anyhow::Error> {
match _mode {
fn get_api(&self, mode: FinalityMode) -> Apis {
match mode {
FinalityMode::Dyn => unimplemented!(),
FinalityMode::Opt => Ok(self.executor.try_get_apis().await?),
FinalityMode::Opt => self.executor.get_apis(),
FinalityMode::Fin => unimplemented!(),
}
}
Expand Down Expand Up @@ -131,15 +130,15 @@ mod tests {

#[tokio::test]
async fn test_execute_opt_block() -> Result<(), anyhow::Error> {
let (tx, rx) = async_channel::unbounded();
let mut executor = MonzaExecutorV1::try_from_env(tx).await?;
let (tx, _rx) = async_channel::unbounded();
let executor = MonzaExecutorV1::try_from_env(tx).await?;
let block_id = HashValue::random();
let tx = SignatureVerifiedTransaction::Valid(Transaction::UserTransaction(
create_signed_transaction(0),
));
let txs = ExecutableTransactions::Unsharded(vec![tx]);
let block = ExecutableBlock::new(block_id.clone(), txs);
executor.execute_block(&FinalityMode::Opt, block).await?;
executor.execute_block(FinalityMode::Opt, block).await?;
Ok(())
}

Expand All @@ -166,7 +165,7 @@ mod tests {
let bcs_user_transaction = bcs::to_bytes(&user_transaction)?;

let request = SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction));
let api = executor.get_api(&FinalityMode::Opt).await?;
let api = executor.get_api(FinalityMode::Opt);
api.transactions.submit_transaction(AcceptType::Bcs, request).await?;

services_handle.abort();
Expand Down Expand Up @@ -200,7 +199,7 @@ mod tests {
let bcs_user_transaction = bcs::to_bytes(&user_transaction)?;

let request = SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction));
let api = executor.get_api(&FinalityMode::Opt).await?;
let api = executor.get_api(FinalityMode::Opt);
api.transactions.submit_transaction(AcceptType::Bcs, request).await?;

let received_transaction = rx.recv().await?;
Expand All @@ -212,7 +211,7 @@ mod tests {
SignatureVerifiedTransaction::Valid(Transaction::UserTransaction(received_transaction));
let txs = ExecutableTransactions::Unsharded(vec![tx]);
let block = ExecutableBlock::new(block_id.clone(), txs);
executor.execute_block(&FinalityMode::Opt, block).await?;
executor.execute_block(FinalityMode::Opt, block).await?;

services_handle.abort();
background_handle.abort();
Expand Down Expand Up @@ -261,7 +260,7 @@ mod tests {

let request =
SubmitTransactionPost::Bcs(aptos_api::bcs_payload::Bcs(bcs_user_transaction));
let api = executor.get_api(&FinalityMode::Opt).await?;
let api = executor.get_api(FinalityMode::Opt);
api.transactions.submit_transaction(AcceptType::Bcs, request).await?;

let received_transaction = rx.recv().await?;
Expand All @@ -274,7 +273,7 @@ mod tests {
));
let txs = ExecutableTransactions::Unsharded(vec![tx]);
let block = ExecutableBlock::new(block_id.clone(), txs);
executor.execute_block(&FinalityMode::Opt, block).await?;
executor.execute_block(FinalityMode::Opt, block).await?;

blockheight += 1;
committed_blocks.insert(
Expand Down Expand Up @@ -369,7 +368,7 @@ mod tests {
executor.execute_block(block).await?;

// Retrieve the executor's API interface and fetch the transaction by each hash.
let apis = executor.try_get_apis().await?;
let apis = executor.get_apis();
for hash in transaction_hashes {
let _ = apis
.transactions
Expand Down
16 changes: 8 additions & 8 deletions protocol-units/execution/suzuka/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ pub trait SuzukaExecutor {
/// Executes a block dynamically
async fn execute_block(
&self,
mode : &FinalityMode,
mode: FinalityMode,
block: ExecutableBlock,
) -> Result<(), anyhow::Error>;

/// Sets the transaction channel.
async fn set_tx_channel(&mut self, tx_channel: Sender<SignedTransaction>) -> Result<(), anyhow::Error>;
/// Sets the transaction channel.
fn set_tx_channel(
&mut self,
tx_channel: Sender<SignedTransaction>,
);

/// Gets the dyn API.
async fn get_api(
&self,
mode : &FinalityMode,
) -> Result<Apis, anyhow::Error>;
/// Gets the dyn API.
fn get_api(&self, mode: FinalityMode) -> Apis;

/// Get block head height.
async fn get_block_head_height(&self) -> Result<u64, anyhow::Error>;
Expand Down
Loading
Loading