1
+ use crate :: { CommitmentStream , McrSettlementClientOperations } ;
1
2
use movement_types:: BlockCommitment ;
2
3
use std:: collections:: HashMap ;
3
4
use std:: sync:: Arc ;
4
- use tokio:: sync:: { Mutex , mpsc} ;
5
- use crate :: { McrSettlementClientOperations , CommitmentStream } ;
5
+ use tokio:: sync:: { mpsc, Mutex } ;
6
6
7
7
pub struct McrSettlementClient {
8
- commitments : Arc < Mutex < HashMap < u64 , BlockCommitment > > > ,
9
- stream_sender : mpsc:: Sender < Result < BlockCommitment , anyhow:: Error > > ,
10
- // todo: this is logically dangerous, but it's just a stub
11
- stream_receiver : Arc < Mutex < mpsc:: Receiver < Result < BlockCommitment , anyhow:: Error > > > > ,
8
+ commitments : Arc < Mutex < HashMap < u64 , BlockCommitment > > > ,
9
+ stream_sender : mpsc:: Sender < Result < BlockCommitment , anyhow:: Error > > ,
10
+ // todo: this is logically dangerous, but it's just a stub
11
+ stream_receiver : Arc < Mutex < mpsc:: Receiver < Result < BlockCommitment , anyhow:: Error > > > > ,
12
12
}
13
13
14
14
impl McrSettlementClient {
15
- pub fn new ( ) -> Self {
16
- let ( stream_sender, receiver) = mpsc:: channel ( 10 ) ;
17
- McrSettlementClient {
18
- commitments : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
19
- stream_sender,
20
- stream_receiver : Arc :: new ( Mutex :: new ( receiver) ) ,
21
- }
22
- }
15
+ pub fn new ( ) -> Self {
16
+ let ( stream_sender, receiver) = mpsc:: channel ( 10 ) ;
17
+ McrSettlementClient {
18
+ commitments : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
19
+ stream_sender,
20
+ stream_receiver : Arc :: new ( Mutex :: new ( receiver) ) ,
21
+ }
22
+ }
23
23
}
24
24
25
-
26
25
#[ tonic:: async_trait]
27
26
impl McrSettlementClientOperations for McrSettlementClient {
27
+ async fn post_block_commitment (
28
+ & self ,
29
+ block_commitment : BlockCommitment ,
30
+ ) -> Result < ( ) , anyhow:: Error > {
31
+ let mut commitments = self . commitments . lock ( ) . await ;
32
+ commitments. insert ( block_commitment. height , block_commitment. clone ( ) ) ;
33
+ self . stream_sender . send ( Ok ( block_commitment) ) . await ?; // Simulate sending to the stream.
34
+ Ok ( ( ) )
35
+ }
28
36
29
- async fn post_block_commitment ( & self , block_commitment : BlockCommitment ) -> Result < ( ) , anyhow:: Error > {
30
- let mut commitments = self . commitments . lock ( ) . await ;
31
- commitments. insert ( block_commitment. height , block_commitment. clone ( ) ) ;
32
- self . stream_sender . send ( Ok ( block_commitment) ) . await ?; // Simulate sending to the stream.
33
- Ok ( ( ) )
34
- }
35
-
36
- async fn stream_block_commitments ( & self ) -> Result <
37
- CommitmentStream ,
38
- anyhow:: Error
39
- > {
37
+ async fn stream_block_commitments ( & self ) -> Result < CommitmentStream , anyhow:: Error > {
38
+ let receiver = self . stream_receiver . clone ( ) ;
39
+ let stream = async_stream:: try_stream! {
40
+ let mut receiver = receiver. lock( ) . await ;
41
+ while let Some ( commitment) = receiver. recv( ) . await {
42
+ yield commitment?;
43
+ }
44
+ } ;
40
45
41
- let receiver = self . stream_receiver . clone ( ) ;
42
- let stream = async_stream:: try_stream! {
43
- let mut receiver = receiver. lock( ) . await ;
44
- while let Some ( commitment) = receiver. recv( ) . await {
45
- yield commitment?;
46
- }
47
- } ;
46
+ Ok ( Box :: pin ( stream) as CommitmentStream )
47
+ }
48
48
49
- Ok ( Box :: pin ( stream ) as CommitmentStream )
50
- }
51
-
52
- async fn get_commitment_at_height ( & self , height : u64 ) -> Result < Option < BlockCommitment > , anyhow:: Error > {
53
- let guard = self . commitments . lock ( ) . await ;
54
- Ok ( guard. get ( & height) . cloned ( ) )
55
- }
49
+ async fn get_commitment_at_height (
50
+ & self ,
51
+ height : u64 ,
52
+ ) -> Result < Option < BlockCommitment > , anyhow:: Error > {
53
+ let guard = self . commitments . lock ( ) . await ;
54
+ Ok ( guard. get ( & height) . cloned ( ) )
55
+ }
56
56
}
57
57
58
58
#[ cfg( test) ]
59
59
pub mod test {
60
60
61
- use super :: * ;
62
- use movement_types:: Commitment ;
63
- use tokio_stream:: StreamExt ;
64
-
65
- #[ tokio:: test]
66
- async fn test_post_block_commitment ( ) -> Result < ( ) , anyhow:: Error > {
67
- let client = McrSettlementClient :: new ( ) ;
68
- let commitment = BlockCommitment {
69
- height : 1 ,
70
- block_id : Default :: default ( ) ,
71
- commitment : Commitment :: test ( ) ,
72
- } ;
73
- client. post_block_commitment ( commitment. clone ( ) ) . await . unwrap ( ) ;
74
- let guard = client. commitments . lock ( ) . await ;
75
- assert_eq ! ( guard. get( & 1 ) , Some ( & commitment) ) ;
76
- Ok ( ( ) )
77
- }
61
+ use super :: * ;
62
+ use movement_types:: Commitment ;
63
+ use tokio_stream:: StreamExt ;
78
64
79
- #[ tokio:: test]
80
- async fn test_stream_block_commitments ( ) -> Result < ( ) , anyhow:: Error > {
81
- let client = McrSettlementClient :: new ( ) ;
82
- let commitment = BlockCommitment {
83
- height : 1 ,
84
- block_id : Default :: default ( ) ,
85
- commitment : Commitment :: test ( ) ,
86
- } ;
87
- client. post_block_commitment ( commitment. clone ( ) ) . await . unwrap ( ) ;
88
- let mut stream = client. stream_block_commitments ( ) . await ? ;
89
- assert_eq ! ( stream . next ( ) . await . unwrap ( ) . unwrap ( ) , commitment) ;
90
- Ok ( ( ) )
91
- }
65
+ #[ tokio:: test]
66
+ async fn test_post_block_commitment ( ) -> Result < ( ) , anyhow:: Error > {
67
+ let client = McrSettlementClient :: new ( ) ;
68
+ let commitment = BlockCommitment {
69
+ height : 1 ,
70
+ block_id : Default :: default ( ) ,
71
+ commitment : Commitment :: test ( ) ,
72
+ } ;
73
+ client. post_block_commitment ( commitment. clone ( ) ) . await . unwrap ( ) ;
74
+ let guard = client. commitments . lock ( ) . await ;
75
+ assert_eq ! ( guard . get ( & 1 ) , Some ( & commitment) ) ;
76
+ Ok ( ( ) )
77
+ }
92
78
93
- }
79
+ #[ tokio:: test]
80
+ async fn test_stream_block_commitments ( ) -> Result < ( ) , anyhow:: Error > {
81
+ let client = McrSettlementClient :: new ( ) ;
82
+ let commitment = BlockCommitment {
83
+ height : 1 ,
84
+ block_id : Default :: default ( ) ,
85
+ commitment : Commitment :: test ( ) ,
86
+ } ;
87
+ client. post_block_commitment ( commitment. clone ( ) ) . await . unwrap ( ) ;
88
+ let mut stream = client. stream_block_commitments ( ) . await ?;
89
+ assert_eq ! ( stream. next( ) . await . unwrap( ) . unwrap( ) , commitment) ;
90
+ Ok ( ( ) )
91
+ }
92
+ }
0 commit comments