11
11
pubsub_client:: PubsubClientError ,
12
12
rpc_config:: { RpcAccountInfoConfig , RpcProgramAccountsConfig } ,
13
13
rpc_filter:: { Memcmp , RpcFilterType } ,
14
+ rpc_response:: { Response , RpcKeyedAccount } ,
14
15
} ,
15
16
solana_sdk:: pubkey:: Pubkey ,
16
17
std:: { fs, str:: FromStr , time:: Duration } ,
@@ -40,6 +41,62 @@ fn find_message_pda(wormhole_pid: &Pubkey, slot: u64) -> Pubkey {
40
41
. 0
41
42
}
42
43
44
+ const FAILED_TO_DECODE : & str = "Failed to decode account data" ;
45
+ const INVALID_UNRELIABLE_DATA_FORMAT : & str = "Invalid unreliable data format" ;
46
+
47
+ #[ derive( Debug ) ]
48
+ enum VerifyUpdateError {
49
+ InvalidMessagePDA ,
50
+ InvalidEmitterChain ,
51
+ InvalidAccumulatorAddress ,
52
+ #[ allow( dead_code) ]
53
+ DecodingError ( String ) ,
54
+ }
55
+
56
+ fn decode_and_verify_update (
57
+ wormhole_pid : & Pubkey ,
58
+ accumulator_address : & Pubkey ,
59
+ update : Response < RpcKeyedAccount > ,
60
+ ) -> Result < PostedMessageUnreliableData , VerifyUpdateError > {
61
+ if find_message_pda ( wormhole_pid, update. context . slot ) . to_string ( ) != update. value . pubkey {
62
+ return Err ( VerifyUpdateError :: InvalidMessagePDA ) ;
63
+ }
64
+ let data = update
65
+ . value
66
+ . account
67
+ . data
68
+ . decode ( )
69
+ . ok_or ( VerifyUpdateError :: DecodingError (
70
+ FAILED_TO_DECODE . to_string ( ) ,
71
+ ) ) ?;
72
+ let unreliable_data: PostedMessageUnreliableData =
73
+ BorshDeserialize :: deserialize ( & mut data. as_slice ( ) ) . map_err ( |e| {
74
+ VerifyUpdateError :: DecodingError ( format ! ( "{}: {}" , INVALID_UNRELIABLE_DATA_FORMAT , e) )
75
+ } ) ?;
76
+
77
+ if Chain :: Pythnet != unreliable_data. emitter_chain . into ( ) {
78
+ return Err ( VerifyUpdateError :: InvalidEmitterChain ) ;
79
+ }
80
+
81
+ if accumulator_address != & Pubkey :: from ( unreliable_data. emitter_address ) {
82
+ return Err ( VerifyUpdateError :: InvalidAccumulatorAddress ) ;
83
+ }
84
+
85
+ Ok ( unreliable_data)
86
+ }
87
+
88
+ fn new_body ( unreliable_data : & PostedMessageUnreliableData ) -> Body < & RawMessage > {
89
+ Body {
90
+ timestamp : unreliable_data. submission_time ,
91
+ nonce : unreliable_data. nonce ,
92
+ emitter_chain : unreliable_data. emitter_chain . into ( ) ,
93
+ emitter_address : Address ( unreliable_data. emitter_address ) ,
94
+ sequence : unreliable_data. sequence ,
95
+ consistency_level : unreliable_data. consistency_level ,
96
+ payload : RawMessage :: new ( unreliable_data. payload . as_slice ( ) ) ,
97
+ }
98
+ }
99
+
43
100
async fn run_listener ( input : RunListenerInput ) -> Result < ( ) , PubsubClientError > {
44
101
let client = PubsubClient :: new ( input. ws_url . as_str ( ) ) . await ?;
45
102
let ( mut stream, unsubscribe) = client
@@ -63,49 +120,22 @@ async fn run_listener(input: RunListenerInput) -> Result<(), PubsubClientError>
63
120
. await ?;
64
121
65
122
while let Some ( update) = stream. next ( ) . await {
66
- if find_message_pda ( & input. wormhole_pid , update. context . slot ) . to_string ( )
67
- != update. value . pubkey
68
- {
69
- continue ; // Skip updates that are not for the expected PDA
70
- }
71
-
72
- let unreliable_data: PostedMessageUnreliableData = {
73
- let data = match update. value . account . data . decode ( ) {
74
- Some ( data) => data,
75
- None => {
76
- tracing:: error!( "Failed to decode account data" ) ;
77
- continue ;
78
- }
79
- } ;
80
-
81
- match BorshDeserialize :: deserialize ( & mut data. as_slice ( ) ) {
123
+ let unreliable_data =
124
+ match decode_and_verify_update ( & input. wormhole_pid , & input. accumulator_address , update)
125
+ {
82
126
Ok ( data) => data,
83
127
Err ( e) => {
84
- tracing:: error!( error = ?e, "Invalid unreliable data format" ) ;
128
+ if !matches ! ( e, VerifyUpdateError :: InvalidMessagePDA ) {
129
+ tracing:: error!( error = ?e, "Received an invalid update" ) ;
130
+ }
85
131
continue ;
86
132
}
87
- }
88
- } ;
89
-
90
- if Chain :: Pythnet != unreliable_data. emitter_chain . into ( ) {
91
- continue ;
92
- }
93
- if input. accumulator_address != Pubkey :: from ( unreliable_data. emitter_address ) {
94
- continue ;
95
- }
133
+ } ;
96
134
97
135
tokio:: spawn ( {
98
136
let api_client = input. api_client . clone ( ) ;
99
137
async move {
100
- let body = Body {
101
- timestamp : unreliable_data. submission_time ,
102
- nonce : unreliable_data. nonce ,
103
- emitter_chain : unreliable_data. emitter_chain . into ( ) ,
104
- emitter_address : Address ( unreliable_data. emitter_address ) ,
105
- sequence : unreliable_data. sequence ,
106
- consistency_level : unreliable_data. consistency_level ,
107
- payload : RawMessage :: new ( unreliable_data. payload . as_slice ( ) ) ,
108
- } ;
138
+ let body = new_body ( & unreliable_data) ;
109
139
match Observation :: try_new ( body. clone ( ) , input. secret_key ) {
110
140
Ok ( observation) => {
111
141
if let Err ( e) = api_client. post_observation ( observation) . await {
@@ -171,3 +201,202 @@ async fn main() {
171
201
}
172
202
}
173
203
}
204
+
205
+ #[ cfg( test) ]
206
+ mod tests {
207
+ use base64:: Engine ;
208
+ use borsh:: BorshSerialize ;
209
+ use solana_account_decoder:: { UiAccount , UiAccountData } ;
210
+
211
+ use super :: * ;
212
+ use crate :: posted_message:: MessageData ;
213
+
214
+ fn get_wormhole_pid ( ) -> Pubkey {
215
+ Pubkey :: from_str ( "H3fxXJ86ADW2PNuDDmZJg6mzTtPxkYCpNuQUTgmJ7AjU" ) . unwrap ( )
216
+ }
217
+
218
+ fn get_accumulator_address ( ) -> Pubkey {
219
+ Pubkey :: from_str ( "G9LV2mp9ua1znRAfYwZz5cPiJMAbo1T6mbjdQsDZuMJg" ) . unwrap ( )
220
+ }
221
+
222
+ fn get_payload ( ) -> Vec < u8 > {
223
+ vec ! [
224
+ 65 , 85 , 87 , 86 , 0 , 0 , 0 , 0 , 0 , 13 , 74 , 15 , 90 , 0 , 0 , 39 , 16 , 172 , 145 , 156 , 108 , 253 ,
225
+ 178 , 4 , 138 , 51 , 74 , 110 , 116 , 101 , 139 , 121 , 254 , 152 , 165 , 24 , 190 ,
226
+ ]
227
+ }
228
+
229
+ fn get_unreliable_data ( ) -> PostedMessageUnreliableData {
230
+ PostedMessageUnreliableData {
231
+ message : MessageData {
232
+ submission_time : 1749732585 ,
233
+ nonce : 0 ,
234
+ emitter_chain : Chain :: Pythnet . into ( ) ,
235
+ emitter_address : [
236
+ 225 , 1 , 250 , 237 , 172 , 88 , 81 , 227 , 43 , 155 , 35 , 181 , 249 , 65 , 26 , 140 , 43 ,
237
+ 172 , 74 , 174 , 62 , 212 , 221 , 123 , 129 , 29 , 209 , 167 , 46 , 164 , 170 , 113 ,
238
+ ] ,
239
+ sequence : 138184361 ,
240
+ consistency_level : 1 ,
241
+ payload : get_payload ( ) ,
242
+ vaa_version : 1 ,
243
+ vaa_time : 0 ,
244
+ vaa_signature_account : [ 0 ; 32 ] ,
245
+ } ,
246
+ }
247
+ }
248
+
249
+ fn get_update ( unreliable_data : PostedMessageUnreliableData ) -> Response < RpcKeyedAccount > {
250
+ let message = unreliable_data. try_to_vec ( ) . unwrap ( ) ;
251
+ let message = base64:: engine:: general_purpose:: STANDARD . encode ( & message) ;
252
+ Response {
253
+ context : solana_client:: rpc_response:: RpcResponseContext {
254
+ slot : 123456 ,
255
+ api_version : None ,
256
+ } ,
257
+ value : RpcKeyedAccount {
258
+ pubkey : find_message_pda ( & get_wormhole_pid ( ) , 123456 ) . to_string ( ) ,
259
+ account : UiAccount {
260
+ lamports : 0 ,
261
+ data : UiAccountData :: Binary ( message, UiAccountEncoding :: Base64 ) ,
262
+ owner : get_accumulator_address ( ) . to_string ( ) ,
263
+ executable : false ,
264
+ rent_epoch : 0 ,
265
+ space : None ,
266
+ } ,
267
+ } ,
268
+ }
269
+ }
270
+
271
+ #[ test]
272
+ fn test_find_message_pda ( ) {
273
+ assert_eq ! (
274
+ find_message_pda( & get_wormhole_pid( ) , 123456 ) . to_string( ) ,
275
+ "Ed9gRoBySmUjSVFxovuhTk6AcFkv9uE8EovvshtHWLNT"
276
+ ) ;
277
+ }
278
+
279
+ #[ test]
280
+ fn test_get_body ( ) {
281
+ let unreliable_data = get_unreliable_data ( ) ;
282
+ let body = new_body ( & unreliable_data) ;
283
+ assert_eq ! ( body. timestamp, unreliable_data. submission_time) ;
284
+ assert_eq ! ( body. nonce, unreliable_data. nonce) ;
285
+ assert_eq ! ( body. emitter_chain, Chain :: Pythnet ) ;
286
+ assert_eq ! (
287
+ body. emitter_address,
288
+ Address ( unreliable_data. emitter_address)
289
+ ) ;
290
+ assert_eq ! ( body. sequence, unreliable_data. sequence) ;
291
+ assert_eq ! ( body. payload, RawMessage :: new( get_payload( ) . as_slice( ) ) ) ;
292
+ }
293
+
294
+ #[ test]
295
+ fn test_decode_and_verify_update ( ) {
296
+ let expected_unreliable_data = get_unreliable_data ( ) ;
297
+ let update = get_update ( expected_unreliable_data. clone ( ) ) ;
298
+ let result =
299
+ decode_and_verify_update ( & get_wormhole_pid ( ) , & get_accumulator_address ( ) , update) ;
300
+
301
+ assert ! ( result. is_ok( ) ) ;
302
+ let unreliable_data = result. unwrap ( ) ;
303
+
304
+ assert_eq ! (
305
+ expected_unreliable_data. consistency_level,
306
+ unreliable_data. consistency_level
307
+ ) ;
308
+ assert_eq ! (
309
+ expected_unreliable_data. emitter_chain,
310
+ unreliable_data. emitter_chain
311
+ ) ;
312
+ assert_eq ! (
313
+ expected_unreliable_data. emitter_address,
314
+ unreliable_data. emitter_address
315
+ ) ;
316
+ assert_eq ! ( expected_unreliable_data. sequence, unreliable_data. sequence) ;
317
+ assert_eq ! (
318
+ expected_unreliable_data. submission_time,
319
+ unreliable_data. submission_time
320
+ ) ;
321
+ assert_eq ! ( expected_unreliable_data. nonce, unreliable_data. nonce) ;
322
+ assert_eq ! ( expected_unreliable_data. payload, unreliable_data. payload) ;
323
+ assert_eq ! (
324
+ expected_unreliable_data. vaa_version,
325
+ unreliable_data. vaa_version
326
+ ) ;
327
+ assert_eq ! ( expected_unreliable_data. vaa_time, unreliable_data. vaa_time) ;
328
+ assert_eq ! (
329
+ expected_unreliable_data. vaa_signature_account,
330
+ unreliable_data. vaa_signature_account
331
+ ) ;
332
+ }
333
+
334
+ #[ test]
335
+ fn test_decode_and_verify_update_invalid_pda ( ) {
336
+ let mut update = get_update ( get_unreliable_data ( ) ) ;
337
+ update. context . slot += 1 ;
338
+ let result =
339
+ decode_and_verify_update ( & get_wormhole_pid ( ) , & get_accumulator_address ( ) , update) ;
340
+ assert ! ( matches!( result, Err ( VerifyUpdateError :: InvalidMessagePDA ) ) ) ;
341
+ }
342
+
343
+ #[ test]
344
+ fn test_decode_and_verify_update_failed_decode ( ) {
345
+ let mut update = get_update ( get_unreliable_data ( ) ) ;
346
+ update. value . account . data =
347
+ UiAccountData :: Binary ( "invalid_base64" . to_string ( ) , UiAccountEncoding :: Base64 ) ;
348
+ let result =
349
+ decode_and_verify_update ( & get_wormhole_pid ( ) , & get_accumulator_address ( ) , update) ;
350
+ assert ! (
351
+ matches!( result, Err ( VerifyUpdateError :: DecodingError ( ref msg) ) if msg == FAILED_TO_DECODE ) ,
352
+ ) ;
353
+ }
354
+
355
+ #[ test]
356
+ fn test_decode_and_verify_update_invalid_unreliable_data ( ) {
357
+ let mut update = get_update ( get_unreliable_data ( ) ) ;
358
+ let message = base64:: engine:: general_purpose:: STANDARD . encode ( vec ! [ 4 , 1 , 2 , 3 , 4 ] ) ;
359
+ update. value . account . data = UiAccountData :: Binary ( message, UiAccountEncoding :: Base64 ) ;
360
+ let result =
361
+ decode_and_verify_update ( & get_wormhole_pid ( ) , & get_accumulator_address ( ) , update) ;
362
+ let error_message = format ! (
363
+ "{}: {}" ,
364
+ INVALID_UNRELIABLE_DATA_FORMAT ,
365
+ "Magic mismatch. Expected [109, 115, 117] but got [4, 1, 2]"
366
+ ) ;
367
+ assert ! (
368
+ matches!( result, Err ( VerifyUpdateError :: DecodingError ( ref msg) )
369
+ if * msg == error_message)
370
+ ) ;
371
+ }
372
+
373
+ #[ test]
374
+ fn test_decode_and_verify_update_invalid_emitter_chain ( ) {
375
+ let mut unreliable_data = get_unreliable_data ( ) ;
376
+ unreliable_data. emitter_chain = Chain :: Solana . into ( ) ;
377
+ let result = decode_and_verify_update (
378
+ & get_wormhole_pid ( ) ,
379
+ & get_accumulator_address ( ) ,
380
+ get_update ( unreliable_data) ,
381
+ ) ;
382
+ assert ! ( matches!(
383
+ result,
384
+ Err ( VerifyUpdateError :: InvalidEmitterChain )
385
+ ) ) ;
386
+ }
387
+
388
+ #[ test]
389
+ fn test_decode_and_verify_update_invalid_emitter_address ( ) {
390
+ let mut unreliable_data = get_unreliable_data ( ) ;
391
+ unreliable_data. emitter_address = Pubkey :: new_unique ( ) . to_bytes ( ) ;
392
+ let result = decode_and_verify_update (
393
+ & get_wormhole_pid ( ) ,
394
+ & get_accumulator_address ( ) ,
395
+ get_update ( unreliable_data) ,
396
+ ) ;
397
+ assert ! ( matches!(
398
+ result,
399
+ Err ( VerifyUpdateError :: InvalidAccumulatorAddress )
400
+ ) ) ;
401
+ }
402
+ }
0 commit comments