Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: operator downtime #1073

Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
187d73f
feat: store operator last verified block
MarcosNicolau Sep 24, 2024
bada32a
feat: basic missed block processing func
MarcosNicolau Sep 24, 2024
6ef381e
refactor: operator new field names
MarcosNicolau Sep 24, 2024
84e09ff
feat: avs_reader get tasks from a block number
MarcosNicolau Sep 25, 2024
bc85333
feat: avs_read in operator to retrive the missed logs
MarcosNicolau Sep 25, 2024
3796a85
fix: write to file through channel to prevent data races
MarcosNicolau Sep 25, 2024
8f991ba
feat: avs_read add check for responded task
MarcosNicolau Sep 25, 2024
836089e
fix: go ci
MarcosNicolau Sep 25, 2024
04e911d
feat: avs_subscriber return latest task if not responded only
MarcosNicolau Sep 25, 2024
2656930
fix: send batch handling to a goroutine when processing missed batches
MarcosNicolau Sep 25, 2024
1e38be7
docs: operator functions
MarcosNicolau Sep 25, 2024
4fac752
chore(operator): add logs and general comments
MarcosNicolau Sep 26, 2024
f3bafb3
reafactor(operator): general addressing some review comments
MarcosNicolau Oct 1, 2024
509dd93
reafactor(operator): afterHandlingBatch when sending to channel
MarcosNicolau Oct 1, 2024
54fbe38
refactor(operator): use bindings when quering logs
MarcosNicolau Oct 1, 2024
a6cccfc
refactor(operator): return err instead of logging on getLatestTask
MarcosNicolau Oct 1, 2024
8663848
chore(config): update operator last processed batch file path
MarcosNicolau Oct 1, 2024
2239b8f
fix(operator): overflow check when passing fromBlock in ProcessMissed…
MarcosNicolau Oct 1, 2024
7cbb30a
Merge branch 'staging' into 962-fixoperator-aggregator-aggregator-or-…
MarcosNicolau Oct 2, 2024
5dc4071
fix(operator): afterHandlingBatch defer
MarcosNicolau Oct 2, 2024
f399d96
refactor(operator): add 0 check before processing missed batches
MarcosNicolau Oct 2, 2024
082a984
chore(config): ignored last processed batch files
MarcosNicolau Oct 2, 2024
3e8572d
chore(operator): update avs_subscriber getLatestNotRespondedTaskFromE…
MarcosNicolau Oct 2, 2024
8a67cec
Merge branch 'staging' into 962-fixoperator-aggregator-aggregator-or-…
MarcosNicolau Oct 2, 2024
00c2b41
Merge branch 'staging' into 962-fixoperator-aggregator-aggregator-or-…
uri-99 Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config-files/config-operator-1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ operator:
enable_metrics: true
metrics_ip_port_address: localhost:9092
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: "operator-1.json"

# Operators variables needed for register it in EigenLayer
el_delegation_manager_address: "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9"
Expand Down
1 change: 1 addition & 0 deletions config-files/config-operator-2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ operator:
staker_opt_out_window_blocks: 0
metadata_url: "https://yetanotherco.github.io/operator_metadata/metadata.json"
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: "operator-2.json"

# Operators variables needed for register it in EigenLayer
el_delegation_manager_address: "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9"
Expand Down
1 change: 1 addition & 0 deletions config-files/config-operator-3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ operator:
staker_opt_out_window_blocks: 0
metadata_url: "https://yetanotherco.github.io/operator_metadata/metadata.json"
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: "operator-3.json"

# Operators variables needed for register it in EigenLayer
el_delegation_manager_address: "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9"
Expand Down
1 change: 1 addition & 0 deletions config-files/config-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ operator:
enable_metrics: true
metrics_ip_port_address: localhost:9092
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: "operator.json"
1 change: 1 addition & 0 deletions config-files/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ operator:
enable_metrics: true
metrics_ip_port_address: localhost:9092
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: "operator.json"
# Operators variables needed for register it in EigenLayer
el_delegation_manager_address: "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9"
private_key_store_path: config-files/anvil.ecdsa.key.json
Expand Down
94 changes: 86 additions & 8 deletions core/chainio/avs_reader.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
package chainio

import (
"context"
"fmt"
"math/big"
"strings"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
gethcommon "github.com/ethereum/go-ethereum/common"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager"
contractERC20Mock "github.com/yetanotherco/aligned_layer/contracts/bindings/ERC20Mock"
"github.com/yetanotherco/aligned_layer/core/config"

Expand All @@ -13,8 +22,9 @@ import (

type AvsReader struct {
*sdkavsregistry.ChainReader
AvsContractBindings *AvsServiceBindings
logger logging.Logger
AvsContractBindings *AvsServiceBindings
AlignedLayerServiceManagerAddr ethcommon.Address
logger logging.Logger
}

func NewAvsReaderFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.EcdsaConfig) (*AvsReader, error) {
Expand All @@ -41,13 +51,14 @@ func NewAvsReaderFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
}

return &AvsReader{
ChainReader: chainReader,
AvsContractBindings: avsServiceBindings,
logger: baseConfig.Logger,
ChainReader: chainReader,
AvsContractBindings: avsServiceBindings,
AlignedLayerServiceManagerAddr: baseConfig.AlignedLayerDeploymentConfig.AlignedLayerServiceManagerAddr,
logger: baseConfig.Logger,
}, nil
}

func (r *AvsReader) GetErc20Mock(tokenAddr gethcommon.Address) (*contractERC20Mock.ContractERC20Mock, error) {
func (r *AvsReader) GetErc20Mock(tokenAddr ethcommon.Address) (*contractERC20Mock.ContractERC20Mock, error) {
erc20Mock, err := contractERC20Mock.NewContractERC20Mock(tokenAddr, r.AvsContractBindings.ethClient)
if err != nil {
// Retry with fallback client
Expand All @@ -59,6 +70,73 @@ func (r *AvsReader) GetErc20Mock(tokenAddr gethcommon.Address) (*contractERC20Mo
return erc20Mock, nil
}

func (r *AvsReader) IsOperatorRegistered(address gethcommon.Address) (bool, error) {
func (r *AvsReader) IsOperatorRegistered(address ethcommon.Address) (bool, error) {
return r.ChainReader.IsOperatorRegistered(&bind.CallOpts{}, address)
}

// Returns all the "NewBatchV3" logs that have not been responded starting from the given block number
func (r *AvsReader) GetNotRespondedTasksFrom(fromBlock uint64) ([]servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
latestBlock, err := r.AvsContractBindings.ethClient.BlockNumber(context.Background())
if err != nil {
latestBlock, err = r.AvsContractBindings.ethClientFallback.BlockNumber(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to get latest block number: %w", err)
}
}

alignedLayerServiceManagerABI, err := abi.JSON(strings.NewReader(servicemanager.ContractAlignedLayerServiceManagerMetaData.ABI))
if err != nil {
return nil, fmt.Errorf("failed to parse ABI: %w", err)
}

newBatchEvent := alignedLayerServiceManagerABI.Events["NewBatchV3"]
if newBatchEvent.ID == (ethcommon.Hash{}) {
return nil, fmt.Errorf("NewBatch event not found in ABI")
}

query := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(fromBlock)),
ToBlock: big.NewInt(int64(latestBlock)),
Addresses: []ethcommon.Address{r.AlignedLayerServiceManagerAddr},
Topics: [][]ethcommon.Hash{{newBatchEvent.ID, {}}},
}

logs, err := r.AvsContractBindings.ethClient.FilterLogs(context.Background(), query)
if err != nil {
logs, err = r.AvsContractBindings.ethClientFallback.FilterLogs(context.Background(), query)
if err != nil {
return nil, fmt.Errorf("failed to get logs: %w", err)
}
}

var tasks []servicemanager.ContractAlignedLayerServiceManagerNewBatchV3

for _, logEntry := range logs {
var task servicemanager.ContractAlignedLayerServiceManagerNewBatchV3

err := alignedLayerServiceManagerABI.UnpackIntoInterface(&task, "NewBatchV3", logEntry.Data)
if err != nil {
return nil, fmt.Errorf("failed to unpack log data: %w", err)
}

// The second topic is the batch merkle root, as it is an indexed variable in the contract
task.BatchMerkleRoot = logEntry.Topics[1]

// now check if its finalized or not before appending
batchIdentifier := append(task.BatchMerkleRoot[:], task.SenderAddress[:]...)
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(nil, batchIdentifierHash)

if err != nil {
return nil, fmt.Errorf("err while getting batch state: %w", err)
}

// append the task if not responded yet
if !state.Responded {
tasks = append(tasks, task)
}

}

return tasks, nil
}
55 changes: 43 additions & 12 deletions core/chainio/avs_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"encoding/hex"
"fmt"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
ethcommon "github.com/ethereum/go-ethereum/common"
"math/big"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
ethcommon "github.com/ethereum/go-ethereum/common"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
Expand Down Expand Up @@ -102,7 +103,9 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
s.logger.Debug("Failed to get latest task from blockchain", "err", err)
continue
}
s.processNewBatchV2(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
if latestBatch != nil {
s.processNewBatchV2(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
}
}
}

Expand Down Expand Up @@ -132,7 +135,7 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema

return errorChannel, nil
}

func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) (chan error, error) {
// Create a new channel to receive new tasks
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)
Expand Down Expand Up @@ -170,7 +173,9 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
s.logger.Debug("Failed to get latest task from blockchain", "err", err)
continue
}
s.processNewBatchV3(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
if latestBatch != nil {
s.processNewBatchV3(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
}
}
}

Expand Down Expand Up @@ -254,9 +259,9 @@ func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedL

if _, ok := batchesSet[batchIdentifierHash]; !ok {
s.logger.Info("Received new task",
"batchMerkleRoot", hex.EncodeToString(batch.BatchMerkleRoot[:]),
"senderAddress", hex.EncodeToString(batch.SenderAddress[:]),
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]),)
"batchMerkleRoot", hex.EncodeToString(batch.BatchMerkleRoot[:]),
"senderAddress", hex.EncodeToString(batch.SenderAddress[:]),
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))

batchesSet[batchIdentifierHash] = struct{}{}
newTaskCreatedChan <- batch
Expand All @@ -280,9 +285,9 @@ func (s *AvsSubscriber) processNewBatchV3(batch *servicemanager.ContractAlignedL

if _, ok := batchesSet[batchIdentifierHash]; !ok {
s.logger.Info("Received new task",
"batchMerkleRoot", hex.EncodeToString(batch.BatchMerkleRoot[:]),
"senderAddress", hex.EncodeToString(batch.SenderAddress[:]),
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]),)
"batchMerkleRoot", hex.EncodeToString(batch.BatchMerkleRoot[:]),
"senderAddress", hex.EncodeToString(batch.SenderAddress[:]),
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))

batchesSet[batchIdentifierHash] = struct{}{}
newTaskCreatedChan <- batch
Expand Down Expand Up @@ -358,6 +363,19 @@ func (s *AvsSubscriber) getLatestTaskFromEthereumV2() (*servicemanager.ContractA
// The second topic is the batch merkle root, as it is an indexed variable in the contract
latestTask.BatchMerkleRoot = lastLog.Topics[1]

// return the task if has not been responded only
batchIdentifier := append(latestTask.BatchMerkleRoot[:], latestTask.SenderAddress[:]...)
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
state, err := s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(nil, batchIdentifierHash)

if err != nil {
return nil, fmt.Errorf("err while getting batch state: %w", err)
}

if state.Responded {
return nil, nil
}

return &latestTask, nil
}

Expand Down Expand Up @@ -422,6 +440,19 @@ func (s *AvsSubscriber) getLatestTaskFromEthereumV3() (*servicemanager.ContractA
// The second topic is the batch merkle root, as it is an indexed variable in the contract
latestTask.BatchMerkleRoot = lastLog.Topics[1]

// return the task if has not been responded only
batchIdentifier := append(latestTask.BatchMerkleRoot[:], latestTask.SenderAddress[:]...)
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
state, err := s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(nil, batchIdentifierHash)

if err != nil {
return nil, fmt.Errorf("err while getting batch state: %w", err)
}

if state.Responded {
return nil, nil
}

return &latestTask, nil
}

Expand Down
3 changes: 3 additions & 0 deletions core/config/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type OperatorConfig struct {
EnableMetrics bool
MetricsIpPortAddress string
MaxBatchSize int64
LastProcessedBatchFilePath string
}
}

Expand All @@ -43,6 +44,7 @@ type OperatorConfigFromYaml struct {
EnableMetrics bool `yaml:"enable_metrics"`
MetricsIpPortAddress string `yaml:"metrics_ip_port_address"`
MaxBatchSize int64 `yaml:"max_batch_size"`
LastProcessedBatchFilePath string `yaml:"last_processed_batch_filepath"`
} `yaml:"operator"`
EcdsaConfigFromYaml EcdsaConfigFromYaml `yaml:"ecdsa"`
BlsConfigFromYaml BlsConfigFromYaml `yaml:"bls"`
Expand Down Expand Up @@ -92,6 +94,7 @@ func NewOperatorConfig(configFilePath string) *OperatorConfig {
EnableMetrics bool
MetricsIpPortAddress string
MaxBatchSize int64
LastProcessedBatchFilePath string
}(operatorConfigFromYaml.Operator),
}
}
Loading