Skip to content

Commit 7349f7b

Browse files
juniuszhouElliotFriedmanutx0banshee
authored
feature wait 50 blocks to process Ethereum events (#338)
* create event queue for delay send tx to sifchain. * set delay blocks as 30. * add adr file to describe ebrelayer. * update account according to new init.sh * update docs * new algorithm to handle orphan block. * fix linter issue. * fix two errors. * add scripts to generate 30 new blocks. * print out balance for check. * print out balance for check. * remove the eth lock in integration env, it impact the balance of ceth. * make the transfer script send transaction according to input. * add sleep time to make sure new block produced. * Pull in peggy-basic-test-docker.py and test/integration/peggy-e2e-test.py from develop (commit bb368ec) We're going to do block advancement a different way * remove send transfer, create advanceBlock script * fix the conflict. * Don't wrap block advance test in an exception handler * Require 50 block delay instead of 30 * update docs Co-authored-by: Elliot Friedman <elliotfriedman3@gmail.com> Co-authored-by: utx0_ <90531+utx0@users.noreply.github.com> Co-authored-by: Elliot <34463580+ElliotFriedman@users.noreply.github.com> Co-authored-by: James Moore <james@sifchain.finance>
1 parent 5ae91ae commit 7349f7b

File tree

7 files changed

+317
-46
lines changed

7 files changed

+317
-46
lines changed

Diff for: cmd/ebrelayer/relayer/ethereum.go

+75-22
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,11 @@ type EthereumSub struct {
5050
TxBldr authtypes.TxBuilder
5151
PrivateKey *ecdsa.PrivateKey
5252
TempPassword string
53+
EventsBuffer types.EthEventBuffer
5354
Logger tmLog.Logger
5455
}
5556

57+
// NewKeybase create a new keybase instance
5658
func NewKeybase(validatorMoniker, mnemonic, password string) (keys.Keybase, keys.Info, error) {
5759
keybase := keys.NewInMemory()
5860
hdpath := *hd.NewFundraiserParams(0, sdk.CoinType, 0)
@@ -97,6 +99,7 @@ func NewEthereumSub(inBuf io.Reader, rpcURL string, cdc *codec.Codec, validatorM
9799
TxBldr: txBldr,
98100
PrivateKey: privateKey,
99101
TempPassword: tempPassword,
102+
EventsBuffer: types.NewEthEventBuffer(),
100103
Logger: logger,
101104
}, nil
102105
}
@@ -158,8 +161,16 @@ func (sub EthereumSub) Start(completionEvent *sync.WaitGroup) {
158161
bridgeBankAddress, subBridgeBank := sub.startContractEventSub(logs, client, txs.BridgeBank)
159162
defer subBridgeBank.Unsubscribe()
160163
bridgeBankContractABI := contract.LoadABI(txs.BridgeBank)
161-
eventLogLockSignature := bridgeBankContractABI.Events[types.LogLock.String()].ID().Hex()
162-
eventLogBurnSignature := bridgeBankContractABI.Events[types.LogBurn.String()].ID().Hex()
164+
165+
// Listen the new header
166+
heads := make(chan *ctypes.Header)
167+
defer close(heads)
168+
subHead, err := client.SubscribeNewHead(context.Background(), heads)
169+
if err != nil {
170+
log.Println(err)
171+
return
172+
}
173+
defer subHead.Unsubscribe()
163174

164175
for {
165176
select {
@@ -171,26 +182,47 @@ func (sub EthereumSub) Start(completionEvent *sync.WaitGroup) {
171182
completionEvent.Add(1)
172183
go sub.Start(completionEvent)
173184
return
185+
case err := <-subHead.Err():
186+
sub.Logger.Error(err.Error())
187+
completionEvent.Add(1)
188+
go sub.Start(completionEvent)
189+
return
190+
case newHead := <-heads:
191+
sub.Logger.Info(fmt.Sprintf("New header %d with hash %v", newHead.Number, newHead.Hash()))
192+
193+
// Add new header info to buffer
194+
sub.EventsBuffer.AddHeader(newHead.Number, newHead.Hash(), newHead.ParentHash)
195+
196+
for {
197+
fifty := big.NewInt(50)
198+
fifty.Add(fifty, sub.EventsBuffer.MinHeight)
199+
if fifty.Cmp(newHead.Number) <= 0 {
200+
events := sub.EventsBuffer.GetHeaderEvents()
201+
for _, event := range events {
202+
err := sub.handleEthereumEvent(event)
203+
204+
if err != nil {
205+
sub.Logger.Error(err.Error())
206+
completionEvent.Add(1)
207+
}
208+
}
209+
210+
sub.EventsBuffer.RemoveHeight()
211+
212+
} else {
213+
break
214+
}
215+
}
216+
174217
// vLog is raw event data
175218
case vLog := <-logs:
176219
sub.Logger.Info(fmt.Sprintf("Witnessed tx %s on block %d\n", vLog.TxHash.Hex(), vLog.BlockNumber))
177-
log.Println("Found event from the ethereum bridgebank contract: ", types.LogLock.String())
178-
var err error
179-
switch vLog.Topics[0].Hex() {
180-
case eventLogBurnSignature:
181-
err = sub.handleEthereumEvent(clientChainID, bridgeBankAddress, bridgeBankContractABI,
182-
types.LogBurn.String(), vLog)
183-
case eventLogLockSignature:
184-
log.Println("Found loglock event from the ethereum bridgebank contract: ", types.LogLock.String())
185-
err = sub.handleEthereumEvent(clientChainID, bridgeBankAddress, bridgeBankContractABI,
186-
types.LogLock.String(), vLog)
187-
}
188-
// TODO: Check local events store for status, if retryable, attempt relay again
220+
event, isBurnLock, err := sub.logToEvent(clientChainID, bridgeBankAddress, bridgeBankContractABI, vLog)
189221
if err != nil {
190-
sub.Logger.Error(err.Error())
191-
completionEvent.Add(1)
192-
go sub.Start(completionEvent)
193-
return
222+
sub.Logger.Error("Failed to get event from ethereum log")
223+
} else if isBurnLock {
224+
sub.Logger.Info("Add event into buffer")
225+
sub.EventsBuffer.AddEvent(big.NewInt(int64(vLog.BlockNumber)), vLog.BlockHash, event)
194226
}
195227
}
196228
}
@@ -219,14 +251,31 @@ func (sub EthereumSub) startContractEventSub(logs chan ctypes.Log, client *ethcl
219251
return subContractAddress, contractSub
220252
}
221253

222-
// handleEthereumEvent unpacks an Ethereum event, converts it to a ProphecyClaim, and relays a tx to Cosmos
223-
func (sub EthereumSub) handleEthereumEvent(clientChainID *big.Int, contractAddress common.Address,
224-
contractABI abi.ABI, eventName string, cLog ctypes.Log) error {
254+
// logToEvent unpacks an Ethereum event
255+
func (sub EthereumSub) logToEvent(clientChainID *big.Int, contractAddress common.Address,
256+
contractABI abi.ABI, cLog ctypes.Log) (types.EthereumEvent, bool, error) {
225257
// Parse the event's attributes via contract ABI
226258
event := types.EthereumEvent{}
259+
eventLogLockSignature := contractABI.Events[types.LogLock.String()].ID().Hex()
260+
eventLogBurnSignature := contractABI.Events[types.LogBurn.String()].ID().Hex()
261+
262+
var eventName string
263+
switch cLog.Topics[0].Hex() {
264+
case eventLogBurnSignature:
265+
eventName = types.LogBurn.String()
266+
case eventLogLockSignature:
267+
eventName = types.LogLock.String()
268+
}
269+
270+
// If event is not expected
271+
if eventName == "" {
272+
return event, false, nil
273+
}
274+
227275
err := contractABI.Unpack(&event, eventName, cLog.Data)
228276
if err != nil {
229-
sub.Logger.Error("error unpacking: %v", err)
277+
sub.Logger.Error(err.Error())
278+
return event, false, err
230279
}
231280
event.BridgeContractAddress = contractAddress
232281
event.EthereumChainID = clientChainID
@@ -239,7 +288,11 @@ func (sub EthereumSub) handleEthereumEvent(clientChainID *big.Int, contractAddre
239288

240289
// Add the event to the record
241290
types.NewEventWrite(cLog.TxHash.Hex(), event)
291+
return event, true, nil
292+
}
242293

294+
// handleEthereumEvent unpacks an Ethereum event, converts it to a ProphecyClaim, and relays a tx to Cosmos
295+
func (sub EthereumSub) handleEthereumEvent(event types.EthereumEvent) error {
243296
prophecyClaim, err := txs.EthereumEventToEthBridgeClaim(sub.ValidatorAddress, &event)
244297
if err != nil {
245298
return err

Diff for: cmd/ebrelayer/types/ethEventBuffer.go

+165
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package types
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"math/big"
7+
8+
"github.com/ethereum/go-ethereum/common"
9+
)
10+
11+
// EventsInBlock store all events in a block, parent hash used to determine the best chain
12+
type EventsInBlock struct {
13+
ParentHash common.Hash
14+
Events []EthereumEvent
15+
}
16+
17+
// NewEventsInBlock create new instance with parent hash
18+
func NewEventsInBlock() EventsInBlock {
19+
return EventsInBlock{
20+
ParentHash: common.Hash{},
21+
Events: []EthereumEvent{},
22+
}
23+
}
24+
25+
// AddEvent append new event to list
26+
func (e *EventsInBlock) AddEvent(event EthereumEvent) {
27+
// avoid add the same event twice
28+
for _, n := range e.Events {
29+
if n.Equal(event) {
30+
log.Println("EventsInBlock event already in list")
31+
return
32+
}
33+
}
34+
e.Events = append(e.Events, event)
35+
}
36+
37+
// EventsInHeight store all events at the same height
38+
type EventsInHeight struct {
39+
// map the block hash to its parent hash and event list
40+
EventsMap map[common.Hash]*EventsInBlock
41+
}
42+
43+
// NewEventsInHeight create a new instance
44+
func NewEventsInHeight() EventsInHeight {
45+
return EventsInHeight{
46+
EventsMap: make(map[common.Hash]*EventsInBlock),
47+
}
48+
}
49+
50+
// AddEvent append event
51+
func (e *EventsInHeight) AddEvent(blockHash common.Hash, event EthereumEvent) {
52+
events, ok := e.EventsMap[blockHash]
53+
if ok {
54+
events.AddEvent(event)
55+
} else {
56+
newEventsInBlock := NewEventsInBlock()
57+
newEventsInBlock.AddEvent(event)
58+
e.EventsMap[blockHash] = &newEventsInBlock
59+
}
60+
}
61+
62+
// AddHeader add a new block hash into map
63+
func (e *EventsInHeight) AddHeader(blockHash common.Hash, parentHash common.Hash) {
64+
events, ok := e.EventsMap[blockHash]
65+
// the events list the block hash already existed, then update the parent hash
66+
if ok {
67+
events.ParentHash = parentHash
68+
} else {
69+
newEventsInBlock := NewEventsInBlock()
70+
newEventsInBlock.ParentHash = parentHash
71+
e.EventsMap[blockHash] = &newEventsInBlock
72+
}
73+
}
74+
75+
// EthEventBuffer store all events from Ethereum smart contract
76+
type EthEventBuffer struct {
77+
Buffer map[string]EventsInHeight
78+
MinHeight *big.Int
79+
}
80+
81+
// NewEthEventBuffer create a new instance of EthEventBuffer
82+
func NewEthEventBuffer() EthEventBuffer {
83+
return EthEventBuffer{
84+
Buffer: make(map[string]EventsInHeight),
85+
MinHeight: big.NewInt(0),
86+
}
87+
}
88+
89+
// AddEvent insert a new event to queue
90+
// func (buff *EthEventBuffer) AddEvent(blockNumber *big.Int, blockHash common.Hash, event EthereumEvent) {
91+
func (buff *EthEventBuffer) AddEvent(blockNumber fmt.Stringer, blockHash common.Hash, event EthereumEvent) {
92+
// Check if block number already in the map
93+
events, ok := buff.Buffer[blockNumber.String()]
94+
if ok {
95+
events.AddEvent(blockHash, event)
96+
} else {
97+
newEvents := NewEventsInHeight()
98+
newEvents.AddEvent(blockHash, event)
99+
buff.Buffer[blockNumber.String()] = newEvents
100+
}
101+
}
102+
103+
// AddHeader create new entry for new header
104+
func (buff *EthEventBuffer) AddHeader(blockNumber *big.Int, blockHash common.Hash, parentHash common.Hash) {
105+
if buff.MinHeight.Cmp(big.NewInt(0)) == 0 {
106+
buff.MinHeight = blockNumber
107+
}
108+
// Check if block number already in the map
109+
eventsInHeight, ok := buff.Buffer[blockNumber.String()]
110+
if ok {
111+
eventsInHeight.AddHeader(blockHash, parentHash)
112+
} else {
113+
newEventsInHeight := NewEventsInHeight()
114+
newEventsInHeight.AddHeader(blockHash, parentHash)
115+
buff.Buffer[blockNumber.String()] = newEventsInHeight
116+
}
117+
}
118+
119+
// GetDepth get the depth of a block
120+
func (buff *EthEventBuffer) GetDepth(blockNumber *big.Int, blockHash common.Hash) uint64 {
121+
eventsInHeight, ok := buff.Buffer[blockNumber.String()]
122+
if ok {
123+
// if there is block's parent is the block hash
124+
for key, eventsInBlock := range eventsInHeight.EventsMap {
125+
if eventsInBlock.ParentHash == blockHash {
126+
one := big.NewInt(1)
127+
one.Add(one, blockNumber)
128+
129+
// recursive to its child block
130+
return buff.GetDepth(one, key) + 1
131+
}
132+
}
133+
}
134+
return 0
135+
}
136+
137+
// RemoveHeight remove an entry
138+
func (buff *EthEventBuffer) RemoveHeight() {
139+
delete(buff.Buffer, buff.MinHeight.String())
140+
buff.MinHeight.Add(buff.MinHeight, big.NewInt(1))
141+
}
142+
143+
// GetHeaderEvents get the events in block of best chain
144+
func (buff *EthEventBuffer) GetHeaderEvents() []EthereumEvent {
145+
eventsInHeight, ok := buff.Buffer[buff.MinHeight.String()]
146+
if ok {
147+
maxDepth := uint64(0)
148+
result := []EthereumEvent{}
149+
one := big.NewInt(1)
150+
one.Add(one, buff.MinHeight)
151+
for blockHash, eventsInBlock := range eventsInHeight.EventsMap {
152+
153+
depth := buff.GetDepth(one, blockHash)
154+
155+
if depth >= maxDepth {
156+
maxDepth = depth
157+
result = eventsInBlock.Events
158+
}
159+
}
160+
161+
return result
162+
}
163+
164+
return []EthereumEvent{}
165+
}

Diff for: cmd/ebrelayer/types/types.go

+14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package types
22

33
import (
4+
"bytes"
45
"fmt"
56
"math/big"
67

@@ -47,6 +48,19 @@ type EthereumEvent struct {
4748
ClaimType ethbridge.ClaimType
4849
}
4950

51+
// Equal two events
52+
func (e EthereumEvent) Equal(other EthereumEvent) bool {
53+
return e.EthereumChainID == other.EthereumChainID &&
54+
e.BridgeContractAddress == other.BridgeContractAddress &&
55+
bytes.Equal(e.ID[:], other.ID[:]) &&
56+
e.From == other.From &&
57+
bytes.Equal(e.To, other.To) &&
58+
e.Symbol == other.Symbol &&
59+
e.Value.Cmp(other.Value) == 0 &&
60+
e.Nonce.Cmp(other.Nonce) == 0 &&
61+
e.ClaimType == other.ClaimType
62+
}
63+
5064
// String implements fmt.Stringer
5165
func (e EthereumEvent) String() string {
5266
return fmt.Sprintf("\nChain ID: %v\nBridge contract address: %v\nToken symbol: %v\nToken "+

Diff for: docs/adr-002-ebrelayer-eth-subscribe.md

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# ADR 002: Ebrelayer Ethereum Subscribe
2+
3+
## Changelog
4+
5+
- 2020/10/21: Initial version
6+
7+
## Status
8+
9+
*Proposed*
10+
11+
## Context
12+
In this ADR, we discuss the solution for ebrelayer how to subscribe the events from Ethereum and process these events.
13+
### Summary
14+
15+
For ebrelayer, it just needs to subscribe to the events from the BridgeBank smart contract, both LogLock event and LogBurn event, then process both events and send transaction to Sifchain. Basically, there are two problems.
16+
1. The block produced in Ethereum maybe be reorganized, most of system opt to confirm the finalization of block after 6 blocks. So it is better to process the event with some dalay. For the events once we receive from subscription, we store them in the buffer and wait for more blocks produced.
17+
2. How to store the events, there are two options. First one is store them in memory, but events will be lost if ebrelayer restarted. Second solution is store in local db, message queue like Kafka, but will increase the complexity of ebrelayer's deployment.
18+
19+
## Current solution
20+
We start to process the events happened 50 blocks before. 50 blocks can guarantee the finalization of block. Then there is no impact from block reorganization. Repeated events and invalid events are abondoned in events buffer. We choose memory to store events for now, it is simple and easy to implements. The ebrelayer will miss some events if ebrelayer restarted. Considering the whole Sifchain system is a decentralized, we can tolerate some ebrelayer offline and not send transaction to Sifnode.
21+
22+
### Pros and Cons
23+
24+
Pros: The solution is easy to implement.
25+
26+
Cons: The events lost if ebrelayer restart. We will store the events in persistent storage like local database or message queue system. It depends on the requirement of product in the future.
27+
28+
## Consequences
29+
We will see obvious of delay for Sifchain get the message of cross-chain asset transfer. The end to end tests are impacted, need extra operations and transactions to verify the cases both transfering eth/erc20 asset to Sifchain and burn pegged Cosmos asset back to Sifchain.
30+
31+
### Positive
32+
33+
- We can quickly deliver our MVP
34+
35+
### Negative
36+
37+
- Nothing major
38+
39+
### Neutral
40+
41+
- Nothing major
42+

0 commit comments

Comments
 (0)