Skip to content

Commit

Permalink
Fix channel upgrade logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ljoss17 committed Jan 18, 2024
1 parent a9357fa commit b8f4fba
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 93 deletions.
150 changes: 74 additions & 76 deletions crates/relayer/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,13 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {
chain: ChainA,
counterparty_chain: ChainB,
channel: WorkerChannelObject,
height: Height,
) -> Result<(Channel<ChainA, ChainB>, State), ChannelError> {
let (a_channel, _) = chain
.query_channel(
QueryChannelRequest {
port_id: channel.src_port_id.clone(),
channel_id: channel.src_channel_id.clone(),
height: QueryHeight::Specific(height),
height: QueryHeight::Latest,
},
// IncludeProof::Yes forces a new query when the CachingChainHandle
// is used.
Expand Down Expand Up @@ -394,7 +393,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {
QueryChannelRequest {
port_id: a_channel.remote.port_id.clone(),
channel_id: a_channel.remote.channel_id.clone().unwrap(),
height: QueryHeight::Specific(height),
height: QueryHeight::Latest,
},
// IncludeProof::Yes forces a new query when the CachingChainHandle
// is used.
Expand Down Expand Up @@ -789,7 +788,6 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {
&mut self,
state: State,
) -> Result<(Option<IbcEvent>, Next), ChannelError> {
tracing::warn!("self state `{}` other state `{:#?}`", state, self.counterparty_state());
let event = match (state, self.counterparty_state()?) {
// Open handshake steps
(State::Init, State::Uninitialized) => Some(self.build_chan_open_try_and_send()?),
Expand Down Expand Up @@ -826,50 +824,54 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {
None => Some(self.build_chan_upgrade_cancel_and_send()?),
}
}
/*(State::Flushing, State::Open(UpgradeState::Upgrading)) => {
match self.build_chan_upgrade_ack_and_send()? {
Some(event) => Some(event),
None => Some(self.flipped().build_chan_upgrade_cancel_and_send()?),
}
}*/
(State::Flushing, State::Flushing) => match self.build_chan_upgrade_ack_and_send()? {
Some(event) => Some(event),
None => Some(self.flipped().build_chan_upgrade_cancel_and_send()?),
},
(State::Flushcomplete, State::Flushcomplete) => {
Some(self.build_chan_upgrade_open_and_send()?)
}
(State::Flushcomplete, State::Flushing) => {
match self.build_chan_upgrade_confirm_and_send()? {
Some(event) => Some(event),
None => Some(self.flipped().build_chan_upgrade_cancel_and_send()?),
}
}
(State::Flushing, State::Open(UpgradeState::Upgrading)) => {

let dst_latest_height = self
.dst_chain()
.query_latest_height()
.map_err(|e| ChannelError::chain_query(self.dst_chain().id(), e))?;
let src_latest_height = self
.src_chain()
.query_latest_height()
.map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?;
let (error_receipt, _) = self
.dst_chain()
.query_upgrade_error(
QueryUpgradeErrorRequest {
port_id: self.dst_port_id().to_string(),
channel_id: self.dst_channel_id().unwrap().to_string(),
},
dst_latest_height,
)
.map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?;

let (channel_end, _) = self
.src_chain()
.query_channel(
QueryChannelRequest {
port_id: self.src_port_id().clone(),
channel_id: self.src_channel_id().unwrap().clone(),
height: QueryHeight::Specific(src_latest_height),
},
IncludeProof::Yes,
)
.map_err(|e| ChannelError::query(self.src_chain().id(), e))?;
// Verify if an ErrorReceipt for the Upgrade exists on the Chain which
// has the channel end Open. If it is the case an UpgradeCancel needs to
// be relayed to the Chain with the channel end Flushing.
// Else relay the UpgradeAck.
let dst_latest_height = self
.dst_chain()
.query_latest_height()
.map_err(|e| ChannelError::chain_query(self.dst_chain().id(), e))?;
let src_latest_height = self
.src_chain()
.query_latest_height()
.map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?;
let (error_receipt, _) = self
.dst_chain()
.query_upgrade_error(
QueryUpgradeErrorRequest {
port_id: self.dst_port_id().to_string(),
channel_id: self.dst_channel_id().unwrap().to_string(),
},
dst_latest_height,
)
.map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?;

let (channel_end, _) = self
.src_chain()
.query_channel(
QueryChannelRequest {
port_id: self.src_port_id().clone(),
channel_id: self.src_channel_id().unwrap().clone(),
height: QueryHeight::Specific(src_latest_height),
},
IncludeProof::Yes,
)
.map_err(|e| ChannelError::query(self.src_chain().id(), e))?;

if error_receipt.sequence == channel_end.upgrade_sequence {
Some(self.flipped().build_chan_upgrade_cancel_and_send()?)
Expand All @@ -881,50 +883,46 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {
}
}
(State::Open(UpgradeState::NotUpgrading), State::Flushing) => {

let dst_latest_height = self
.dst_chain()
.query_latest_height()
.map_err(|e| ChannelError::chain_query(self.dst_chain().id(), e))?;
let src_latest_height = self
.src_chain()
.query_latest_height()
.map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?;
let (error_receipt, _) = self
.src_chain()
.query_upgrade_error(
QueryUpgradeErrorRequest {
port_id: self.src_port_id().to_string(),
channel_id: self.src_channel_id().unwrap().to_string(),
},
src_latest_height,
)
.map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?;

let (channel_end, _) = self
.dst_chain()
.query_channel(
QueryChannelRequest {
port_id: self.dst_port_id().clone(),
channel_id: self.dst_channel_id().unwrap().clone(),
height: QueryHeight::Specific(dst_latest_height),
},
IncludeProof::Yes,
)
.map_err(|e| ChannelError::query(self.src_chain().id(), e))?;
let dst_latest_height = self
.dst_chain()
.query_latest_height()
.map_err(|e| ChannelError::chain_query(self.dst_chain().id(), e))?;
let src_latest_height = self
.src_chain()
.query_latest_height()
.map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?;
let (error_receipt, _) = self
.src_chain()
.query_upgrade_error(
QueryUpgradeErrorRequest {
port_id: self.src_port_id().to_string(),
channel_id: self.src_channel_id().unwrap().to_string(),
},
src_latest_height,
)
.map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?;

let (channel_end, _) = self
.dst_chain()
.query_channel(
QueryChannelRequest {
port_id: self.dst_port_id().clone(),
channel_id: self.dst_channel_id().unwrap().clone(),
height: QueryHeight::Specific(dst_latest_height),
},
IncludeProof::Yes,
)
.map_err(|e| ChannelError::query(self.dst_chain().id(), e))?;

if error_receipt.sequence == channel_end.upgrade_sequence {
Some(self.build_chan_upgrade_cancel_and_send()?)
} else {
match self.build_chan_upgrade_ack_and_send()? {
match self.flipped().build_chan_upgrade_ack_and_send()? {
Some(event) => Some(event),
None => Some(self.build_chan_upgrade_cancel_and_send()?),
}
}
}
/*(State::Flushcomplete, State::Open(UpgradeState::Upgrading)) => {
Some(self.flipped().build_chan_upgrade_open_and_send()?)
}*/
(State::Open(UpgradeState::Upgrading), State::Flushcomplete) => {
Some(self.build_chan_upgrade_open_and_send()?)
}
Expand Down
3 changes: 0 additions & 3 deletions crates/relayer/src/supervisor/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,9 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> {
.channel_end
.is_upgrading(&channel_scan.counterparty);

tracing::warn!("is_channel_upgrading: {is_channel_upgrading}");

if (mode.clients.enabled || mode.packets.enabled)
&& chan_state_src.is_open()
&& (chan_state_dst.is_open() || chan_state_dst.is_closed())
//&& !is_channel_upgrading
{
if mode.clients.enabled {
// Spawn the client worker
Expand Down
6 changes: 0 additions & 6 deletions crates/relayer/src/worker/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ pub fn spawn_channel_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
chains.a.clone(),
chains.b.clone(),
channel.clone(),
event_with_height.height,
) {
Ok((mut handshake_channel, _)) => handshake_channel
.step_event(&event_with_height.event, index),
Expand Down Expand Up @@ -94,18 +93,13 @@ pub fn spawn_channel_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
} if complete_handshake_on_new_block => {
debug!("starts processing block event at {:#?}", current_height);

let height = current_height
.decrement()
.map_err(|e| TaskError::Fatal(RunError::ics02(e)))?;

complete_handshake_on_new_block = false;
retry_with_index(
channel_handshake_retry::default_strategy(max_block_times),
|index| match RelayChannel::restore_from_state(
chains.a.clone(),
chains.b.clone(),
channel.clone(),
height,
) {
Ok((mut handshake_channel, state)) => {
handshake_channel.step_state(state, index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use ibc_test_framework::chain::config::{set_max_deposit_period, set_voting_perio
use ibc_test_framework::prelude::*;
use ibc_test_framework::relayer::channel::{
assert_eventually_channel_established, assert_eventually_channel_upgrade_ack,
assert_eventually_channel_upgrade_cancel, assert_eventually_channel_upgrade_open,
assert_eventually_channel_upgrade_try, ChannelUpgradableAttributes, assert_eventually_channel_upgrade_flushing,
assert_eventually_channel_upgrade_cancel, assert_eventually_channel_upgrade_flushing,
assert_eventually_channel_upgrade_open, assert_eventually_channel_upgrade_try,
ChannelUpgradableAttributes,
};

#[test]
Expand Down Expand Up @@ -628,9 +629,6 @@ impl BinaryChannelTest for ChannelUpgradeHandshakeTimeoutWhenFlushingHandshake {
)?;

info!("Will initialise upgrade handshake with governance proposal...");
warn!("id : {}", chains.node_a.chain_driver().chain_id());
warn!("port id: {}", channel.src_port_id());
warn!("channel id: {}", channel.src_channel_id().unwrap());

chains.node_a.chain_driver().initialise_channel_upgrade(
channel.src_port_id().as_str(),
Expand Down Expand Up @@ -708,7 +706,7 @@ impl BinaryChannelTest for ChannelUpgradeHandshakeTimeoutWhenFlushingHandshake {
IbcEventType::UpgradeTimeoutChannel
);

relayer.with_supervisor(||{
relayer.with_supervisor(|| {
info!("Check that the step ChanUpgradeTimeout was correctly executed...");

assert_eventually_channel_upgrade_cancel(
Expand All @@ -718,7 +716,7 @@ impl BinaryChannelTest for ChannelUpgradeHandshakeTimeoutWhenFlushingHandshake {
&channels.port_b.as_ref(),
&old_attrs.flipped(),
)?;

Ok(())
})
}
Expand Down
2 changes: 1 addition & 1 deletion tools/test-framework/src/chain/ext/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl ChainBootstrapMethodsExt for ChainDriver {
assert_eventually_succeed(
&format!("proposal `{}` status: {}", proposal_id, status.as_str()),
10,
Duration::from_secs(2),
Duration::from_secs(3),
|| match query_gov_proposal(
chain_id,
command_path,
Expand Down

0 comments on commit b8f4fba

Please sign in to comment.