Skip to content

feature wait 50 blocks to process Ethereum events #338

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Dec 19, 2020
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
158c3e8
create event queue for delay send tx to sifchain.
juniuszhou Dec 2, 2020
6bd3ad5
set delay blocks as 30.
juniuszhou Dec 2, 2020
7dff3d6
add adr file to describe ebrelayer.
juniuszhou Dec 4, 2020
b826e67
update account according to new init.sh
juniuszhou Dec 4, 2020
9281f0e
Merge branch 'develop' into feature/wait-blocks-confirm
ElliotFriedman Dec 5, 2020
97d4378
update docs
ElliotFriedman Dec 5, 2020
e41df80
Merge branch 'develop' into feature/wait-blocks-confirm
utx0 Dec 9, 2020
8092120
new algorithm to handle orphan block.
juniuszhou Dec 9, 2020
0fc75c8
Merge branch 'feature/wait-blocks-confirm' of https://github.com/sifc…
juniuszhou Dec 9, 2020
bedfd69
fix linter issue.
juniuszhou Dec 9, 2020
61e53fc
Merge branch 'develop' into feature/wait-blocks-confirm
juniuszhou Dec 10, 2020
67d0a7d
fix two errors.
juniuszhou Dec 11, 2020
619bdcf
add scripts to generate 30 new blocks.
juniuszhou Dec 11, 2020
7143e5e
print out balance for check.
juniuszhou Dec 11, 2020
47a5bd6
print out balance for check.
juniuszhou Dec 11, 2020
cd109ae
Merge branch 'develop' into feature/wait-blocks-confirm
ElliotFriedman Dec 11, 2020
b149e54
Merge branch 'develop' into feature/wait-blocks-confirm
ElliotFriedman Dec 14, 2020
38d510e
Merge branch 'develop' into feature/wait-blocks-confirm
ElliotFriedman Dec 16, 2020
f9c37a1
remove the eth lock in integration env, it impact the balance of ceth.
juniuszhou Dec 16, 2020
990fe7c
make the transfer script send transaction according to input.
juniuszhou Dec 16, 2020
616a4e3
add sleep time to make sure new block produced.
juniuszhou Dec 17, 2020
8087b40
Pull in peggy-basic-test-docker.py and test/integration/peggy-e2e-tes…
banshee Dec 17, 2020
85a5388
remove send transfer, create advanceBlock script
ElliotFriedman Dec 17, 2020
6524980
Merge branch 'feature/wait-blocks-confirm' of github.com:Sifchain/sif…
ElliotFriedman Dec 17, 2020
065d83c
rebase pr with develop branch.
juniuszhou Dec 18, 2020
0cc027b
fix the conflict.
juniuszhou Dec 18, 2020
0c4791c
Merge branch 'develop' into feature/wait-blocks-confirm
banshee Dec 19, 2020
29ee90f
Merge branch 'develop' of ssh://github.com/Sifchain/sifnode into feat…
banshee Dec 19, 2020
90cfcaf
Don't wrap block advance test in an exception handler
banshee Dec 19, 2020
c42372d
Require 50 block delay instead of 30
banshee Dec 19, 2020
c080913
update docs
ElliotFriedman Dec 19, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 75 additions & 22 deletions cmd/ebrelayer/relayer/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ type EthereumSub struct {
TxBldr authtypes.TxBuilder
PrivateKey *ecdsa.PrivateKey
TempPassword string
EventsBuffer types.EthEventBuffer
Logger tmLog.Logger
}

// NewKeybase create a new keybase instance
func NewKeybase(validatorMoniker, mnemonic, password string) (keys.Keybase, keys.Info, error) {
keybase := keys.NewInMemory()
hdpath := *hd.NewFundraiserParams(0, sdk.CoinType, 0)
Expand Down Expand Up @@ -97,6 +99,7 @@ func NewEthereumSub(inBuf io.Reader, rpcURL string, cdc *codec.Codec, validatorM
TxBldr: txBldr,
PrivateKey: privateKey,
TempPassword: tempPassword,
EventsBuffer: types.NewEthEventBuffer(),
Logger: logger,
}, nil
}
Expand Down Expand Up @@ -158,8 +161,16 @@ func (sub EthereumSub) Start(completionEvent *sync.WaitGroup) {
bridgeBankAddress, subBridgeBank := sub.startContractEventSub(logs, client, txs.BridgeBank)
defer subBridgeBank.Unsubscribe()
bridgeBankContractABI := contract.LoadABI(txs.BridgeBank)
eventLogLockSignature := bridgeBankContractABI.Events[types.LogLock.String()].ID().Hex()
eventLogBurnSignature := bridgeBankContractABI.Events[types.LogBurn.String()].ID().Hex()

// Listen the new header
heads := make(chan *ctypes.Header)
defer close(heads)
subHead, err := client.SubscribeNewHead(context.Background(), heads)
if err != nil {
log.Println(err)
return
}
defer subHead.Unsubscribe()

for {
select {
Expand All @@ -171,26 +182,47 @@ func (sub EthereumSub) Start(completionEvent *sync.WaitGroup) {
completionEvent.Add(1)
go sub.Start(completionEvent)
return
case err := <-subHead.Err():
sub.Logger.Error(err.Error())
completionEvent.Add(1)
go sub.Start(completionEvent)
return
case newHead := <-heads:
sub.Logger.Info(fmt.Sprintf("New header %d with hash %v", newHead.Number, newHead.Hash()))

// Add new header info to buffer
sub.EventsBuffer.AddHeader(newHead.Number, newHead.Hash(), newHead.ParentHash)

for {
thirty := big.NewInt(30)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be 50 blocks per the new requirement.

thirty.Add(thirty, sub.EventsBuffer.MinHeight)
if thirty.Cmp(newHead.Number) <= 0 {
events := sub.EventsBuffer.GetHeaderEvents()
for _, event := range events {
err := sub.handleEthereumEvent(event)

if err != nil {
sub.Logger.Error(err.Error())
completionEvent.Add(1)
}
}

sub.EventsBuffer.RemoveHeight()

} else {
break
}
}

// vLog is raw event data
case vLog := <-logs:
sub.Logger.Info(fmt.Sprintf("Witnessed tx %s on block %d\n", vLog.TxHash.Hex(), vLog.BlockNumber))
log.Println("Found event from the ethereum bridgebank contract: ", types.LogLock.String())
var err error
switch vLog.Topics[0].Hex() {
case eventLogBurnSignature:
err = sub.handleEthereumEvent(clientChainID, bridgeBankAddress, bridgeBankContractABI,
types.LogBurn.String(), vLog)
case eventLogLockSignature:
log.Println("Found loglock event from the ethereum bridgebank contract: ", types.LogLock.String())
err = sub.handleEthereumEvent(clientChainID, bridgeBankAddress, bridgeBankContractABI,
types.LogLock.String(), vLog)
}
// TODO: Check local events store for status, if retryable, attempt relay again
event, isBurnLock, err := sub.logToEvent(clientChainID, bridgeBankAddress, bridgeBankContractABI, vLog)
if err != nil {
sub.Logger.Error(err.Error())
completionEvent.Add(1)
go sub.Start(completionEvent)
return
sub.Logger.Error("Failed to get event from ethereum log")
} else if isBurnLock {
sub.Logger.Info("Add event into buffer")
sub.EventsBuffer.AddEvent(big.NewInt(int64(vLog.BlockNumber)), vLog.BlockHash, event)
}
}
}
Expand Down Expand Up @@ -219,14 +251,31 @@ func (sub EthereumSub) startContractEventSub(logs chan ctypes.Log, client *ethcl
return subContractAddress, contractSub
}

// handleEthereumEvent unpacks an Ethereum event, converts it to a ProphecyClaim, and relays a tx to Cosmos
func (sub EthereumSub) handleEthereumEvent(clientChainID *big.Int, contractAddress common.Address,
contractABI abi.ABI, eventName string, cLog ctypes.Log) error {
// logToEvent unpacks an Ethereum event
func (sub EthereumSub) logToEvent(clientChainID *big.Int, contractAddress common.Address,
contractABI abi.ABI, cLog ctypes.Log) (types.EthereumEvent, bool, error) {
// Parse the event's attributes via contract ABI
event := types.EthereumEvent{}
eventLogLockSignature := contractABI.Events[types.LogLock.String()].ID().Hex()
eventLogBurnSignature := contractABI.Events[types.LogBurn.String()].ID().Hex()

var eventName string
switch cLog.Topics[0].Hex() {
case eventLogBurnSignature:
eventName = types.LogBurn.String()
case eventLogLockSignature:
eventName = types.LogLock.String()
}

// If event is not expected
if eventName == "" {
return event, false, nil
}

err := contractABI.Unpack(&event, eventName, cLog.Data)
if err != nil {
sub.Logger.Error("error unpacking: %v", err)
sub.Logger.Error(err.Error())
return event, false, err
}
event.BridgeContractAddress = contractAddress
event.EthereumChainID = clientChainID
Expand All @@ -239,7 +288,11 @@ func (sub EthereumSub) handleEthereumEvent(clientChainID *big.Int, contractAddre

// Add the event to the record
types.NewEventWrite(cLog.TxHash.Hex(), event)
return event, true, nil
}

// handleEthereumEvent unpacks an Ethereum event, converts it to a ProphecyClaim, and relays a tx to Cosmos
func (sub EthereumSub) handleEthereumEvent(event types.EthereumEvent) error {
prophecyClaim, err := txs.EthereumEventToEthBridgeClaim(sub.ValidatorAddress, &event)
if err != nil {
return err
Expand Down
165 changes: 165 additions & 0 deletions cmd/ebrelayer/types/ethEventBuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package types

import (
"fmt"
"log"
"math/big"

"github.com/ethereum/go-ethereum/common"
)

// EventsInBlock store all events in a block, parent hash used to determine the best chain
type EventsInBlock struct {
ParentHash common.Hash
Events []EthereumEvent
}

// NewEventsInBlock create new instance with parent hash
func NewEventsInBlock() EventsInBlock {
return EventsInBlock{
ParentHash: common.Hash{},
Events: []EthereumEvent{},
}
}

// AddEvent append new event to list
func (e *EventsInBlock) AddEvent(event EthereumEvent) {
// avoid add the same event twice
for _, n := range e.Events {
if n.Equal(event) {
log.Println("EventsInBlock event already in list")
return
}
}
e.Events = append(e.Events, event)
}

// EventsInHeight store all events at the same height
type EventsInHeight struct {
// map the block hash to its parent hash and event list
EventsMap map[common.Hash]*EventsInBlock
}

// NewEventsInHeight create a new instance
func NewEventsInHeight() EventsInHeight {
return EventsInHeight{
EventsMap: make(map[common.Hash]*EventsInBlock),
}
}

// AddEvent append event
func (e *EventsInHeight) AddEvent(blockHash common.Hash, event EthereumEvent) {
events, ok := e.EventsMap[blockHash]
if ok {
events.AddEvent(event)
} else {
newEventsInBlock := NewEventsInBlock()
newEventsInBlock.AddEvent(event)
e.EventsMap[blockHash] = &newEventsInBlock
}
}

// AddHeader add a new block hash into map
func (e *EventsInHeight) AddHeader(blockHash common.Hash, parentHash common.Hash) {
events, ok := e.EventsMap[blockHash]
// the events list the block hash already existed, then update the parent hash
if ok {
events.ParentHash = parentHash
} else {
newEventsInBlock := NewEventsInBlock()
newEventsInBlock.ParentHash = parentHash
e.EventsMap[blockHash] = &newEventsInBlock
}
}

// EthEventBuffer store all events from Ethereum smart contract
type EthEventBuffer struct {
Buffer map[string]EventsInHeight
MinHeight *big.Int
}

// NewEthEventBuffer create a new instance of EthEventBuffer
func NewEthEventBuffer() EthEventBuffer {
return EthEventBuffer{
Buffer: make(map[string]EventsInHeight),
MinHeight: big.NewInt(0),
}
}

// AddEvent insert a new event to queue
// func (buff *EthEventBuffer) AddEvent(blockNumber *big.Int, blockHash common.Hash, event EthereumEvent) {
func (buff *EthEventBuffer) AddEvent(blockNumber fmt.Stringer, blockHash common.Hash, event EthereumEvent) {
// Check if block number already in the map
events, ok := buff.Buffer[blockNumber.String()]
if ok {
events.AddEvent(blockHash, event)
} else {
newEvents := NewEventsInHeight()
newEvents.AddEvent(blockHash, event)
buff.Buffer[blockNumber.String()] = newEvents
}
}

// AddHeader create new entry for new header
func (buff *EthEventBuffer) AddHeader(blockNumber *big.Int, blockHash common.Hash, parentHash common.Hash) {
if buff.MinHeight.Cmp(big.NewInt(0)) == 0 {
buff.MinHeight = blockNumber
}
// Check if block number already in the map
eventsInHeight, ok := buff.Buffer[blockNumber.String()]
if ok {
eventsInHeight.AddHeader(blockHash, parentHash)
} else {
newEventsInHeight := NewEventsInHeight()
newEventsInHeight.AddHeader(blockHash, parentHash)
buff.Buffer[blockNumber.String()] = newEventsInHeight
}
}

// GetDepth get the depth of a block
func (buff *EthEventBuffer) GetDepth(blockNumber *big.Int, blockHash common.Hash) uint64 {
eventsInHeight, ok := buff.Buffer[blockNumber.String()]
if ok {
// if there is block's parent is the block hash
for key, eventsInBlock := range eventsInHeight.EventsMap {
if eventsInBlock.ParentHash == blockHash {
one := big.NewInt(1)
one.Add(one, blockNumber)

// recursive to its child block
return buff.GetDepth(one, key) + 1
}
}
}
return 0
}

// RemoveHeight remove an entry
func (buff *EthEventBuffer) RemoveHeight() {
delete(buff.Buffer, buff.MinHeight.String())
buff.MinHeight.Add(buff.MinHeight, big.NewInt(1))
}

// GetHeaderEvents get the events in block of best chain
func (buff *EthEventBuffer) GetHeaderEvents() []EthereumEvent {
eventsInHeight, ok := buff.Buffer[buff.MinHeight.String()]
if ok {
maxDepth := uint64(0)
result := []EthereumEvent{}
one := big.NewInt(1)
one.Add(one, buff.MinHeight)
for blockHash, eventsInBlock := range eventsInHeight.EventsMap {

depth := buff.GetDepth(one, blockHash)

if depth >= maxDepth {
maxDepth = depth
result = eventsInBlock.Events
}
}

return result
}

return []EthereumEvent{}
}
14 changes: 14 additions & 0 deletions cmd/ebrelayer/types/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package types

import (
"bytes"
"fmt"
"math/big"

Expand Down Expand Up @@ -47,6 +48,19 @@ type EthereumEvent struct {
ClaimType ethbridge.ClaimType
}

// Equal two events
func (e EthereumEvent) Equal(other EthereumEvent) bool {
return e.EthereumChainID == other.EthereumChainID &&
e.BridgeContractAddress == other.BridgeContractAddress &&
bytes.Equal(e.ID[:], other.ID[:]) &&
e.From == other.From &&
bytes.Equal(e.To, other.To) &&
e.Symbol == other.Symbol &&
e.Value.Cmp(other.Value) == 0 &&
e.Nonce.Cmp(other.Nonce) == 0 &&
e.ClaimType == other.ClaimType
}

// String implements fmt.Stringer
func (e EthereumEvent) String() string {
return fmt.Sprintf("\nChain ID: %v\nBridge contract address: %v\nToken symbol: %v\nToken "+
Expand Down
42 changes: 42 additions & 0 deletions docs/adr-002-ebrelayer-eth-subscribe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# ADR 002: Ebrelayer Ethereum Subscribe

## Changelog

- 2020/10/21: Initial version

## Status

*Proposed*

## Context
In this ADR, we discuss the solution for ebrelayer how to subscribe the events from Ethereum and process these events.
### Summary

For ebrelayer, it just needs to subscribe to the events from the BridgeBank smart contract, both LogLock event and LogBurn event, then process both events and send transaction to Sifchain. Basically, there are two problems.
1. The block produced in Ethereum maybe be reorganized, most of system opt to confirm the finalization of block after 6 blocks. So it is better to process the event with some dalay. For the events once we receive from subscription, we store them in the buffer and wait for more blocks produced.
2. How to store the events, there are two options. First one is store them in memory, but events will be lost if ebrelayer restarted. Second solution is store in local db, message queue like Kafka, but will increase the complexity of ebrelayer's deployment.

## Current solution
We start to process the events happened 30 blocks before. 30 blocks can guarantee the finalization of block. Then there is no impact from block reorganization. Repeated events and invalid events are abondoned in events buffer. We choose memory to store events for now, it is simple and easy to implements. The ebrelayer will miss some events if ebrelayer restarted. Considering the whole Sifchain system is a decentralized, we can tolerate some ebrelayer offline and not send transaction to Sifnode.

### Pros and Cons

Pros: The solution is easy to implement.

Cons: The events lost if ebrelayer restart. We will store the events in persistent storage like local database or message queue system. It depends on the requirement of product in the future.

## Consequences
We will see obvious of delay for Sifchain get the message of cross-chain asset transfer. The end to end tests are impacted, need extra operations and transactions to verify the cases both transfering eth/erc20 asset to Sifchain and burn pegged Cosmos asset back to Sifchain.

### Positive

- We can quickly deliver our MVP

### Negative

- Nothing major

### Neutral

- Nothing major

1 change: 1 addition & 0 deletions smart-contracts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"peggy:process": "npx truffle exec scripts/sendProcessProphecy.js",
"peggy:addBridgeToken": "npx truffle exec scripts/sendAddBridgeToken.js",
"peggy:getTokenBalance": "npx truffle exec scripts/getTokenBalance.js",
"peggy:transfer": "npx truffle exec scripts/sendTransfer.js",
"token:address": "npx truffle exec scripts/getTokenContractAddress.js",
"token:mint": "npx truffle exec scripts/mintTestTokens.js",
"token:approve": "npx truffle exec scripts/sendApproveTx.js",
Expand Down
Loading