@@ -50,9 +50,11 @@ type EthereumSub struct {
50
50
TxBldr authtypes.TxBuilder
51
51
PrivateKey * ecdsa.PrivateKey
52
52
TempPassword string
53
+ EventsBuffer types.EthEventBuffer
53
54
Logger tmLog.Logger
54
55
}
55
56
57
+ // NewKeybase create a new keybase instance
56
58
func NewKeybase (validatorMoniker , mnemonic , password string ) (keys.Keybase , keys.Info , error ) {
57
59
keybase := keys .NewInMemory ()
58
60
hdpath := * hd .NewFundraiserParams (0 , sdk .CoinType , 0 )
@@ -97,6 +99,7 @@ func NewEthereumSub(inBuf io.Reader, rpcURL string, cdc *codec.Codec, validatorM
97
99
TxBldr : txBldr ,
98
100
PrivateKey : privateKey ,
99
101
TempPassword : tempPassword ,
102
+ EventsBuffer : types .NewEthEventBuffer (),
100
103
Logger : logger ,
101
104
}, nil
102
105
}
@@ -158,8 +161,16 @@ func (sub EthereumSub) Start(completionEvent *sync.WaitGroup) {
158
161
bridgeBankAddress , subBridgeBank := sub .startContractEventSub (logs , client , txs .BridgeBank )
159
162
defer subBridgeBank .Unsubscribe ()
160
163
bridgeBankContractABI := contract .LoadABI (txs .BridgeBank )
161
- eventLogLockSignature := bridgeBankContractABI .Events [types .LogLock .String ()].ID ().Hex ()
162
- eventLogBurnSignature := bridgeBankContractABI .Events [types .LogBurn .String ()].ID ().Hex ()
164
+
165
+ // Listen the new header
166
+ heads := make (chan * ctypes.Header )
167
+ defer close (heads )
168
+ subHead , err := client .SubscribeNewHead (context .Background (), heads )
169
+ if err != nil {
170
+ log .Println (err )
171
+ return
172
+ }
173
+ defer subHead .Unsubscribe ()
163
174
164
175
for {
165
176
select {
@@ -171,26 +182,47 @@ func (sub EthereumSub) Start(completionEvent *sync.WaitGroup) {
171
182
completionEvent .Add (1 )
172
183
go sub .Start (completionEvent )
173
184
return
185
+ case err := <- subHead .Err ():
186
+ sub .Logger .Error (err .Error ())
187
+ completionEvent .Add (1 )
188
+ go sub .Start (completionEvent )
189
+ return
190
+ case newHead := <- heads :
191
+ sub .Logger .Info (fmt .Sprintf ("New header %d with hash %v" , newHead .Number , newHead .Hash ()))
192
+
193
+ // Add new header info to buffer
194
+ sub .EventsBuffer .AddHeader (newHead .Number , newHead .Hash (), newHead .ParentHash )
195
+
196
+ for {
197
+ fifty := big .NewInt (50 )
198
+ fifty .Add (fifty , sub .EventsBuffer .MinHeight )
199
+ if fifty .Cmp (newHead .Number ) <= 0 {
200
+ events := sub .EventsBuffer .GetHeaderEvents ()
201
+ for _ , event := range events {
202
+ err := sub .handleEthereumEvent (event )
203
+
204
+ if err != nil {
205
+ sub .Logger .Error (err .Error ())
206
+ completionEvent .Add (1 )
207
+ }
208
+ }
209
+
210
+ sub .EventsBuffer .RemoveHeight ()
211
+
212
+ } else {
213
+ break
214
+ }
215
+ }
216
+
174
217
// vLog is raw event data
175
218
case vLog := <- logs :
176
219
sub .Logger .Info (fmt .Sprintf ("Witnessed tx %s on block %d\n " , vLog .TxHash .Hex (), vLog .BlockNumber ))
177
- log .Println ("Found event from the ethereum bridgebank contract: " , types .LogLock .String ())
178
- var err error
179
- switch vLog .Topics [0 ].Hex () {
180
- case eventLogBurnSignature :
181
- err = sub .handleEthereumEvent (clientChainID , bridgeBankAddress , bridgeBankContractABI ,
182
- types .LogBurn .String (), vLog )
183
- case eventLogLockSignature :
184
- log .Println ("Found loglock event from the ethereum bridgebank contract: " , types .LogLock .String ())
185
- err = sub .handleEthereumEvent (clientChainID , bridgeBankAddress , bridgeBankContractABI ,
186
- types .LogLock .String (), vLog )
187
- }
188
- // TODO: Check local events store for status, if retryable, attempt relay again
220
+ event , isBurnLock , err := sub .logToEvent (clientChainID , bridgeBankAddress , bridgeBankContractABI , vLog )
189
221
if err != nil {
190
- sub .Logger .Error (err . Error () )
191
- completionEvent . Add ( 1 )
192
- go sub .Start ( completionEvent )
193
- return
222
+ sub .Logger .Error ("Failed to get event from ethereum log" )
223
+ } else if isBurnLock {
224
+ sub .Logger . Info ( "Add event into buffer" )
225
+ sub . EventsBuffer . AddEvent ( big . NewInt ( int64 ( vLog . BlockNumber )), vLog . BlockHash , event )
194
226
}
195
227
}
196
228
}
@@ -219,14 +251,31 @@ func (sub EthereumSub) startContractEventSub(logs chan ctypes.Log, client *ethcl
219
251
return subContractAddress , contractSub
220
252
}
221
253
222
- // handleEthereumEvent unpacks an Ethereum event, converts it to a ProphecyClaim, and relays a tx to Cosmos
223
- func (sub EthereumSub ) handleEthereumEvent (clientChainID * big.Int , contractAddress common.Address ,
224
- contractABI abi.ABI , eventName string , cLog ctypes.Log ) error {
254
+ // logToEvent unpacks an Ethereum event
255
+ func (sub EthereumSub ) logToEvent (clientChainID * big.Int , contractAddress common.Address ,
256
+ contractABI abi.ABI , cLog ctypes.Log ) (types. EthereumEvent , bool , error ) {
225
257
// Parse the event's attributes via contract ABI
226
258
event := types.EthereumEvent {}
259
+ eventLogLockSignature := contractABI .Events [types .LogLock .String ()].ID ().Hex ()
260
+ eventLogBurnSignature := contractABI .Events [types .LogBurn .String ()].ID ().Hex ()
261
+
262
+ var eventName string
263
+ switch cLog .Topics [0 ].Hex () {
264
+ case eventLogBurnSignature :
265
+ eventName = types .LogBurn .String ()
266
+ case eventLogLockSignature :
267
+ eventName = types .LogLock .String ()
268
+ }
269
+
270
+ // If event is not expected
271
+ if eventName == "" {
272
+ return event , false , nil
273
+ }
274
+
227
275
err := contractABI .Unpack (& event , eventName , cLog .Data )
228
276
if err != nil {
229
- sub .Logger .Error ("error unpacking: %v" , err )
277
+ sub .Logger .Error (err .Error ())
278
+ return event , false , err
230
279
}
231
280
event .BridgeContractAddress = contractAddress
232
281
event .EthereumChainID = clientChainID
@@ -239,7 +288,11 @@ func (sub EthereumSub) handleEthereumEvent(clientChainID *big.Int, contractAddre
239
288
240
289
// Add the event to the record
241
290
types .NewEventWrite (cLog .TxHash .Hex (), event )
291
+ return event , true , nil
292
+ }
242
293
294
+ // handleEthereumEvent unpacks an Ethereum event, converts it to a ProphecyClaim, and relays a tx to Cosmos
295
+ func (sub EthereumSub ) handleEthereumEvent (event types.EthereumEvent ) error {
243
296
prophecyClaim , err := txs .EthereumEventToEthBridgeClaim (sub .ValidatorAddress , & event )
244
297
if err != nil {
245
298
return err
0 commit comments