-
Notifications
You must be signed in to change notification settings - Fork 87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
0xmovses/aptos event monitoring #388
Conversation
client.counterparty_address, | ||
struct_tag.as_str(), | ||
"bridge_transfer_assets_locked", | ||
Some(1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here it's polling the first page, unlimited. Taking a look at the aptos method,
async fn get_bcs_with_page(
&self,
url: Url,
start: Option<u64>,
limit: Option<u16>,
) -> AptosResult<Response<bytes::Bytes>> {
let mut request = self.inner.get(url).header(ACCEPT, BCS);
if let Some(start) = start {
request = request.query(&[("start", start)])
}
if let Some(limit) = limit {
request = request.query(&[("limit", limit)])
}
let response = request.send().await?;
self.check_and_parse_bcs_response(response).await
}
```
not sure if pagination is being handled correctly as you'd want to use the previous start as the limit. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good point. I can only meaningfully tweak this once I have the integration test so I can see what behaviour its giving me. I can try here but I'd be flying blind. But I have an idea for how to handle pagination within the Stream
.map_err(|e| Error::msg(e.to_string()))?; | ||
|
||
// Process responses and return results | ||
let locked_events = process_response(locked_response, CounterpartyEventKind::Locked) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like process_response could be process_responses instead and return a tuple of locked_Events, completed_eventes and cancelled_events
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, pending completion of the outstanding todos and integration testing
_rest_url: &str, | ||
_listener: UnboundedReceiver<MovementChainEvent<MovementAddress, MovementHash>>, | ||
) -> Result<Self, anyhow::Error> { | ||
todo!() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps it would make sense to list the todos in the Outstanding Issues section of the PR?
closing as all these changes are in #399 |
Summary
Implements
Stream
forMovementCounterpartyMonitoring
. We cannot subscribe to a ws on aptos-core because this functionality is not provided out-of-the-box, additionally the indexer only streams transactions no events. So we must query the rest end point directly.MovementCounterpartyMonitoring
must beStream
in order for the BridgeService to correctly operate, this is exemplified in theshared/tests
code, it is also a trait bound.Changelog
Stream
forMovementCounterpartyMonitoring
Testing
More top-level code needs to be added before I can integrate test this. In an attempt to keep the PR small. Will add that in a separate PR.