Fix operator merkle root retrocompatibility (#996)
uri-99 authored Sep 19, 2024
1 parent 047a65b commit 45c9a50
Showing 16 changed files with 4,416 additions and 32 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build-go.yml
@@ -35,6 +35,8 @@ jobs:
run: make build_halo2_ipa_linux
- name: Build Merkle Tree bindings
run: make build_merkle_tree_linux
- name: Build Old Merkle Tree bindings
run: make build_merkle_tree_linux_old
- name: Build operator
run: go build operator/cmd/main.go
- name: Build aggregator
24 changes: 24 additions & 0 deletions Makefile
@@ -541,15 +541,29 @@ build_merkle_tree_macos:
@cp operator/merkle_tree/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.dylib operator/merkle_tree/lib/libmerkle_tree.dylib
@cp operator/merkle_tree/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.a operator/merkle_tree/lib/libmerkle_tree.a

build_merkle_tree_macos_old:
@cd operator/merkle_tree_old/lib && cargo build $(RELEASE_FLAG)
@cp operator/merkle_tree_old/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.dylib operator/merkle_tree_old/lib/libmerkle_tree.dylib
@cp operator/merkle_tree_old/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.a operator/merkle_tree_old/lib/libmerkle_tree.a

build_merkle_tree_linux:
@cd operator/merkle_tree/lib && cargo build $(RELEASE_FLAG)
@cp operator/merkle_tree/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.so operator/merkle_tree/lib/libmerkle_tree.so
@cp operator/merkle_tree/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.a operator/merkle_tree/lib/libmerkle_tree.a

build_merkle_tree_linux_old:
@cd operator/merkle_tree_old/lib && cargo build $(RELEASE_FLAG)
@cp operator/merkle_tree_old/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.so operator/merkle_tree_old/lib/libmerkle_tree.so
@cp operator/merkle_tree_old/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.a operator/merkle_tree_old/lib/libmerkle_tree.a

test_merkle_tree_rust_ffi:
@echo "Testing Merkle Tree Rust FFI source code..."
@cd operator/merkle_tree/lib && RUST_MIN_STACK=83886080 cargo t --release

test_merkle_tree_rust_ffi_old:
@echo "Testing Old Merkle Tree Rust FFI source code..."
@cd operator/merkle_tree_old/lib && RUST_MIN_STACK=83886080 cargo t --release

test_merkle_tree_go_bindings_macos: build_merkle_tree_macos
@echo "Testing Merkle Tree Go bindings..."
go test ./operator/merkle_tree/... -v
@@ -558,6 +572,14 @@ test_merkle_tree_go_bindings_linux: build_merkle_tree_linux
@echo "Testing Merkle Tree Go bindings..."
go test ./operator/merkle_tree/... -v

test_merkle_tree_old_go_bindings_macos: build_merkle_tree_macos_old
@echo "Testing Old Merkle Tree Go bindings..."
go test ./operator/merkle_tree_old/... -v

test_merkle_tree_go_bindings_linux_old: build_merkle_tree_linux_old
@echo "Testing Merkle Tree Go bindings..."
go test ./operator/merkle_tree_old/... -v

__HALO2_KZG_FFI__: ##
build_halo2_kzg_macos:
@cd operator/halo2kzg/lib && cargo build $(RELEASE_FLAG)
@@ -632,6 +654,7 @@ build_all_ffi_macos: ## Build all FFIs for macOS
@$(MAKE) build_sp1_macos
@$(MAKE) build_risc_zero_macos
@$(MAKE) build_merkle_tree_macos
@$(MAKE) build_merkle_tree_macos_old
@$(MAKE) build_halo2_ipa_macos
@$(MAKE) build_halo2_kzg_macos
@echo "All macOS FFIs built successfully."
@@ -641,6 +664,7 @@ build_all_ffi_linux: ## Build all FFIs for Linux
@$(MAKE) build_sp1_linux
@$(MAKE) build_risc_zero_linux
@$(MAKE) build_merkle_tree_linux
@$(MAKE) build_merkle_tree_linux_old
@$(MAKE) build_halo2_ipa_linux
@$(MAKE) build_halo2_kzg_linux
@echo "All Linux FFIs built successfully."
2 changes: 1 addition & 1 deletion aggregator/internal/pkg/subscriber.go
@@ -24,7 +24,7 @@ func (agg *Aggregator) SubscribeToNewTasks() error {
func (agg *Aggregator) subscribeToNewTasks() error {
var err error

agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasks(agg.NewBatchChan)
agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan)

if err != nil {
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err)
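
The aggregator now consumes the V3 event stream. The sketch below (not part of this commit) shows how a caller might drive `SubscribeToNewTasksV3` and drain both channels; the surrounding wiring (`chainio.AvsSubscriber` construction, logger, and imports) is assumed to match the packages already used in `core/chainio/avs_subscriber.go`.

```go
// Illustrative consumer of SubscribeToNewTasksV3; types are assumed to come
// from the same servicemanager bindings and sdklogging packages used above.
func consumeNewBatches(sub *chainio.AvsSubscriber, logger sdklogging.Logger) error {
	newBatchChan := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)

	errChan, err := sub.SubscribeToNewTasksV3(newBatchChan)
	if err != nil {
		return err
	}

	for {
		select {
		case batch := <-newBatchChan:
			// Each NewBatchV3 event carries the batch merkle root and sender address.
			logger.Info("New batch",
				"merkleRoot", hex.EncodeToString(batch.BatchMerkleRoot[:]),
				"sender", hex.EncodeToString(batch.SenderAddress[:]))
		case err := <-errChan:
			// Errors are only forwarded when the subscriber could not recover
			// by resubscribing internally.
			return err
		}
	}
}
```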
202 changes: 191 additions & 11 deletions core/chainio/avs_subscriber.go
@@ -64,19 +64,87 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber,
logger: baseConfig.Logger,
}, nil
}

func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) (chan error, error) {
// Create a new channel to receive new tasks
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)

// Subscribe to new tasks
sub, err := subscribeToNewTasksV2(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
if err != nil {
s.logger.Error("Failed to subscribe to new AlignedLayer tasks", "err", err)
return nil, err
}

subFallback, err := subscribeToNewTasksV2(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
if err != nil {
s.logger.Error("Failed to subscribe to new AlignedLayer tasks", "err", err)
return nil, err
}

// Create a new channel to forward errors
errorChannel := make(chan error)

pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)

// Forward the new tasks to the provided channel
go func() {
defer pollLatestBatchTicker.Stop()
newBatchMutex := &sync.Mutex{}
batchesSet := make(map[[32]byte]struct{})
for {
select {
case newBatch := <-internalChannel:
s.processNewBatchV2(newBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
case <-pollLatestBatchTicker.C:
latestBatch, err := s.getLatestTaskFromEthereumV2()
if err != nil {
s.logger.Debug("Failed to get latest task from blockchain", "err", err)
continue
}
s.processNewBatchV2(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
}
}

}()

// Handle errors and resubscribe
go func() {
for {
select {
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription", "err", err)
sub.Unsubscribe()
sub, err = subscribeToNewTasksV2(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
if err != nil {
errorChannel <- err
}
case err := <-subFallback.Err():
s.logger.Warn("Error in fallback new task subscription", "err", err)
subFallback.Unsubscribe()
subFallback, err = subscribeToNewTasksV2(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
if err != nil {
errorChannel <- err
}
}
}
}()

return errorChannel, nil
}

func (s *AvsSubscriber) SubscribeToNewTasks(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) (chan error, error) {
func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) (chan error, error) {
// Create a new channel to receive new tasks
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)

// Subscribe to new tasks
sub, err := subscribeToNewTasks(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
sub, err := subscribeToNewTasksV3(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
if err != nil {
s.logger.Error("Failed to subscribe to new AlignedLayer tasks", "err", err)
return nil, err
}

subFallback, err := subscribeToNewTasks(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
subFallback, err := subscribeToNewTasksV3(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
if err != nil {
s.logger.Error("Failed to subscribe to new AlignedLayer tasks", "err", err)
return nil, err
@@ -95,14 +163,14 @@ func (s *AvsSubscriber) SubscribeToNewTasks(newTaskCreatedChan chan *servicemana
for {
select {
case newBatch := <-internalChannel:
s.processNewBatch(newBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
s.processNewBatchV3(newBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
case <-pollLatestBatchTicker.C:
latestBatch, err := s.getLatestTaskFromEthereum()
latestBatch, err := s.getLatestTaskFromEthereumV3()
if err != nil {
s.logger.Debug("Failed to get latest task from blockchain", "err", err)
continue
}
s.processNewBatch(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
s.processNewBatchV3(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
}
}

@@ -115,14 +183,14 @@ func (s *AvsSubscriber) SubscribeToNewTasks(newTaskCreatedChan chan *servicemana
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription", "err", err)
sub.Unsubscribe()
sub, err = subscribeToNewTasks(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
sub, err = subscribeToNewTasksV3(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
if err != nil {
errorChannel <- err
}
case err := <-subFallback.Err():
s.logger.Warn("Error in fallback new task subscription", "err", err)
subFallback.Unsubscribe()
subFallback, err = subscribeToNewTasks(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
subFallback, err = subscribeToNewTasksV3(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
if err != nil {
errorChannel <- err
}
@@ -133,7 +201,29 @@ func (s *AvsSubscriber) SubscribeToNewTasks(newTaskCreatedChan chan *servicemana
return errorChannel, nil
}

func subscribeToNewTasks(
func subscribeToNewTasksV2(
serviceManager *servicemanager.ContractAlignedLayerServiceManager,
newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2,
logger sdklogging.Logger,
) (event.Subscription, error) {
for i := 0; i < MaxRetries; i++ {
sub, err := serviceManager.WatchNewBatchV2(
&bind.WatchOpts{}, newTaskCreatedChan, nil,
)
if err != nil {
logger.Warn("Failed to subscribe to new AlignedLayer tasks", "err", err)
time.Sleep(RetryInterval)
continue
}

logger.Info("Subscribed to new AlignedLayer tasks")
return sub, nil
}

return nil, fmt.Errorf("failed to subscribe to new AlignedLayer tasks after %d retries", MaxRetries)
}

func subscribeToNewTasksV3(
serviceManager *servicemanager.ContractAlignedLayerServiceManager,
newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3,
logger sdklogging.Logger,
@@ -155,7 +245,7 @@ func subscribeToNewTasks(
return nil, fmt.Errorf("failed to subscribe to new AlignedLayer tasks after %d retries", MaxRetries)
}

func (s *AvsSubscriber) processNewBatch(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) {
func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) {
newBatchMutex.Lock()
defer newBatchMutex.Unlock()

@@ -181,10 +271,100 @@ func (s *AvsSubscriber) processNewBatch(batch *servicemanager.ContractAlignedLay
}
}

func (s *AvsSubscriber) processNewBatchV3(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) {
newBatchMutex.Lock()
defer newBatchMutex.Unlock()

batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...)
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))

if _, ok := batchesSet[batchIdentifierHash]; !ok {
s.logger.Info("Received new task",
"batchMerkleRoot", hex.EncodeToString(batch.BatchMerkleRoot[:]),
"senderAddress", hex.EncodeToString(batch.SenderAddress[:]),
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]),)

batchesSet[batchIdentifierHash] = struct{}{}
newTaskCreatedChan <- batch

// Remove the batch from the set after RemoveBatchFromSetInterval time
go func() {
time.Sleep(RemoveBatchFromSetInterval)
newBatchMutex.Lock()
delete(batchesSet, batchIdentifierHash)
newBatchMutex.Unlock()
}()
}
}

// getLatestTaskFromEthereumV2 queries the blockchain for the latest task using the FilterLogs method.
// The alternative is the FilterNewBatch method from the contract's filterer, but it requires iterating
// over all the logs, which is not efficient and not needed since we only need the latest task.
func (s *AvsSubscriber) getLatestTaskFromEthereumV2() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, error) {
latestBlock, err := s.AvsContractBindings.ethClient.BlockNumber(context.Background())
if err != nil {
latestBlock, err = s.AvsContractBindings.ethClientFallback.BlockNumber(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to get latest block number: %w", err)
}
}

var fromBlock uint64

if latestBlock < BlockInterval {
fromBlock = 0
} else {
fromBlock = latestBlock - BlockInterval
}

alignedLayerServiceManagerABI, err := abi.JSON(strings.NewReader(servicemanager.ContractAlignedLayerServiceManagerMetaData.ABI))
if err != nil {
return nil, fmt.Errorf("failed to parse ABI: %w", err)
}

// We just care about the NewBatchV2 event
newBatchEvent := alignedLayerServiceManagerABI.Events["NewBatchV2"]
if newBatchEvent.ID == (ethcommon.Hash{}) {
return nil, fmt.Errorf("NewBatch event not found in ABI")
}

query := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(fromBlock)),
ToBlock: big.NewInt(int64(latestBlock)),
Addresses: []ethcommon.Address{s.AlignedLayerServiceManagerAddr},
Topics: [][]ethcommon.Hash{{newBatchEvent.ID, {}}},
}

logs, err := s.AvsContractBindings.ethClient.FilterLogs(context.Background(), query)
if err != nil {
logs, err = s.AvsContractBindings.ethClientFallback.FilterLogs(context.Background(), query)
if err != nil {
return nil, fmt.Errorf("failed to get logs: %w", err)
}
}

if len(logs) == 0 {
return nil, fmt.Errorf("no logs found")
}

lastLog := logs[len(logs)-1]

var latestTask servicemanager.ContractAlignedLayerServiceManagerNewBatchV2
err = alignedLayerServiceManagerABI.UnpackIntoInterface(&latestTask, "NewBatchV2", lastLog.Data)
if err != nil {
return nil, fmt.Errorf("failed to unpack log data: %w", err)
}

// The second topic is the batch merkle root, as it is an indexed variable in the contract
latestTask.BatchMerkleRoot = lastLog.Topics[1]

return &latestTask, nil
}

// getLatestTaskFromEthereum queries the blockchain for the latest task using the FilterLogs method.
// The alternative is the FilterNewBatch method from the contract's filterer, but it requires iterating
// over all the logs, which is not efficient and not needed since we only need the latest task.
func (s *AvsSubscriber) getLatestTaskFromEthereum() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
func (s *AvsSubscriber) getLatestTaskFromEthereumV3() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
latestBlock, err := s.AvsContractBindings.ethClient.BlockNumber(context.Background())
if err != nil {
latestBlock, err = s.AvsContractBindings.ethClientFallback.BlockNumber(context.Background())
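
For reference, `processNewBatchV3` above deduplicates events on the Keccak-256 hash of the batch merkle root concatenated with the sender address, so the same root submitted by two different senders is treated as two distinct batches. The self-contained sketch below (with made-up input values, not part of the commit) reproduces that derivation:

```go
package main

import (
	"encoding/hex"
	"fmt"

	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	// Made-up values standing in for a NewBatchV3 event's fields.
	var batchMerkleRoot [32]byte
	var senderAddress [20]byte
	copy(batchMerkleRoot[:], []byte("example merkle root, 32 bytes..."))
	copy(senderAddress[:], []byte("example sender, 20B."))

	// Same derivation as processNewBatchV3: keccak256(root || sender).
	batchIdentifier := append(batchMerkleRoot[:], senderAddress[:]...)
	batchIdentifierHash := crypto.Keccak256Hash(batchIdentifier)

	fmt.Println("batchIdentifierHash:", hex.EncodeToString(batchIdentifierHash[:]))
}
```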
2 changes: 2 additions & 0 deletions operator/merkle_tree_old/.gitignore
@@ -0,0 +1,2 @@
lib/libmerkle_tree.a
lib/libmerkle_tree.dylib
