@@ -2,22 +2,26 @@ use crate::{CommitmentStream, McrSettlementClientOperations};
2
2
use movement_types:: BlockCommitment ;
3
3
use std:: collections:: HashMap ;
4
4
use std:: sync:: Arc ;
5
- use tokio:: sync:: { mpsc, Mutex } ;
5
+ use tokio:: sync:: { mpsc, RwLock } ;
6
6
7
7
pub struct McrSettlementClient {
8
- commitments : Arc < Mutex < HashMap < u64 , BlockCommitment > > > ,
8
+ commitments : Arc < RwLock < HashMap < u64 , BlockCommitment > > > ,
9
9
stream_sender : mpsc:: Sender < Result < BlockCommitment , anyhow:: Error > > ,
10
10
// todo: this is logically dangerous, but it's just a stub
11
- stream_receiver : Arc < Mutex < mpsc:: Receiver < Result < BlockCommitment , anyhow:: Error > > > > ,
11
+ stream_receiver : Arc < RwLock < mpsc:: Receiver < Result < BlockCommitment , anyhow:: Error > > > > ,
12
+ pub current_height : Arc < RwLock < u64 > > ,
13
+ pub block_lead_tolerance : u64
12
14
}
13
15
14
16
impl McrSettlementClient {
15
17
pub fn new ( ) -> Self {
16
18
let ( stream_sender, receiver) = mpsc:: channel ( 10 ) ;
17
19
McrSettlementClient {
18
- commitments : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
20
+ commitments : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
19
21
stream_sender,
20
- stream_receiver : Arc :: new ( Mutex :: new ( receiver) ) ,
22
+ stream_receiver : Arc :: new ( RwLock :: new ( receiver) ) ,
23
+ current_height : Arc :: new ( RwLock :: new ( 0 ) ) ,
24
+ block_lead_tolerance : 16 ,
21
25
}
22
26
}
23
27
}
@@ -29,9 +33,22 @@ impl McrSettlementClientOperations for McrSettlementClient {
29
33
& self ,
30
34
block_commitment : BlockCommitment ,
31
35
) -> Result < ( ) , anyhow:: Error > {
32
- let mut commitments = self . commitments . lock ( ) . await ;
33
- commitments. insert ( block_commitment. height , block_commitment. clone ( ) ) ;
34
- self . stream_sender . send ( Ok ( block_commitment) ) . await ?; // Simulate sending to the stream.
36
+
37
+ let height = block_commitment. height ;
38
+
39
+ {
40
+ let mut commitments = self . commitments . write ( ) . await ;
41
+ commitments. insert ( block_commitment. height , block_commitment. clone ( ) ) ;
42
+ self . stream_sender . send ( Ok ( block_commitment) ) . await ?; // Simulate sending to the stream.
43
+ }
44
+
45
+ {
46
+ let mut current_height = self . current_height . write ( ) . await ;
47
+ if height > * current_height {
48
+ * current_height = height;
49
+ }
50
+ }
51
+
35
52
Ok ( ( ) )
36
53
}
37
54
@@ -45,7 +62,7 @@ impl McrSettlementClientOperations for McrSettlementClient {
45
62
async fn stream_block_commitments ( & self ) -> Result < CommitmentStream , anyhow:: Error > {
46
63
let receiver = self . stream_receiver . clone ( ) ;
47
64
let stream = async_stream:: try_stream! {
48
- let mut receiver = receiver. lock ( ) . await ;
65
+ let mut receiver = receiver. write ( ) . await ;
49
66
while let Some ( commitment) = receiver. recv( ) . await {
50
67
yield commitment?;
51
68
}
@@ -57,9 +74,14 @@ impl McrSettlementClientOperations for McrSettlementClient {
57
74
& self ,
58
75
height : u64 ,
59
76
) -> Result < Option < BlockCommitment > , anyhow:: Error > {
60
- let guard = self . commitments . lock ( ) . await ;
77
+ let guard = self . commitments . write ( ) . await ;
61
78
Ok ( guard. get ( & height) . cloned ( ) )
62
79
}
80
+
81
+ async fn get_max_tolerable_block_height ( & self ) -> Result < u64 , anyhow:: Error > {
82
+ Ok ( * self . current_height . read ( ) . await + self . block_lead_tolerance )
83
+ }
84
+
63
85
}
64
86
65
87
#[ cfg( test) ]
@@ -71,15 +93,20 @@ pub mod test {
71
93
72
94
#[ tokio:: test]
73
95
async fn test_post_block_commitment ( ) -> Result < ( ) , anyhow:: Error > {
96
+
74
97
let client = McrSettlementClient :: new ( ) ;
75
98
let commitment = BlockCommitment {
76
99
height : 1 ,
77
100
block_id : Default :: default ( ) ,
78
101
commitment : Commitment :: test ( ) ,
79
102
} ;
80
103
client. post_block_commitment ( commitment. clone ( ) ) . await . unwrap ( ) ;
81
- let guard = client. commitments . lock ( ) . await ;
104
+ let guard = client. commitments . write ( ) . await ;
82
105
assert_eq ! ( guard. get( & 1 ) , Some ( & commitment) ) ;
106
+
107
+ assert_eq ! ( * client. current_height. read( ) . await , 1 ) ;
108
+ assert_eq ! ( client. get_max_tolerable_block_height( ) . await ?, 17 ) ;
109
+
83
110
Ok ( ( ) )
84
111
}
85
112
@@ -100,7 +127,7 @@ pub mod test {
100
127
commitment. clone( ) ,
101
128
commitment2. clone( ) ,
102
129
] ) . await . unwrap ( ) ;
103
- let guard = client. commitments . lock ( ) . await ;
130
+ let guard = client. commitments . write ( ) . await ;
104
131
assert_eq ! ( guard. get( & 1 ) , Some ( & commitment) ) ;
105
132
assert_eq ! ( guard. get( & 2 ) , Some ( & commitment2) ) ;
106
133
Ok ( ( ) )
0 commit comments