Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a3e9fb8
Add implementation
Nov 4, 2024
b714153
Merge branch 'staging' into fix/aggregator-recover-lost-batches
Nov 4, 2024
4ee91c9
Add missed error handling
Nov 4, 2024
eb3fd52
Merge branch 'staging' into fix/aggregator-recover-lost-batches
MauroToscano Nov 4, 2024
6cd8fa9
Merge branch 'staging' into fix/aggregator-recover-lost-batches
MauroToscano Nov 4, 2024
ad522cd
Requested changes
Nov 4, 2024
27bf9c1
Merge branch 'staging' into fix/aggregator-recover-lost-batches
Nov 13, 2024
9c964bb
Refactor with retries
Nov 13, 2024
c8cb8e7
Add block fetch range to config, fix related bug
Nov 13, 2024
7759996
Update aggregator/pkg/server.go
MauroToscano Nov 13, 2024
9072d8b
Update aggregator/pkg/server.go
MauroToscano Nov 13, 2024
99c6c88
Merge branch 'staging' into fix/aggregator-recover-lost-batches
Nov 13, 2024
cfe283d
Restore taskMutex behavior on operator response handler
Nov 13, 2024
4773b0a
Revert "Restore taskMutex behavior on operator response handler"
Nov 13, 2024
009375f
chore: detail in comment
uri-99 Nov 14, 2024
9d56887
chore: detail comments
uri-99 Nov 14, 2024
86bb230
Merge branch 'staging' into fix/aggregator-recover-lost-batches
Nov 20, 2024
4b3eef0
feat: add batcher_start_local_no_fund make target (again)
uri-99 Nov 20, 2024
a44b1a7
Merge branch 'fix/aggregator-recover-lost-batches' of github.com:yeta…
Nov 21, 2024
eac734f
Merge branch 'staging' into fix/aggregator-recover-lost-batches
Nov 21, 2024
9392336
Remove unused imports
Nov 21, 2024
05c7243
Merge branch 'staging' into fix/aggregator-recover-lost-batches
Nov 22, 2024
2c5a7d1
Fix merge
Nov 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions aggregator/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"context"
"encoding/hex"
"fmt"
retry "github.com/yetanotherco/aligned_layer/core"
"github.com/yetanotherco/aligned_layer/core/types"
"net/http"
"net/rpc"
"time"

retry "github.com/yetanotherco/aligned_layer/core"
"github.com/yetanotherco/aligned_layer/core/types"
)

func (agg *Aggregator) ServeOperators() error {
Expand All @@ -34,11 +33,13 @@ func (agg *Aggregator) ServeOperators() error {
return err
}

// Aggregator Methods
// ~~ AGGREGATOR METHODS ~~
// This is the list of methods that the Aggregator exposes to the Operator
// The Operator can call these methods to interact with the Aggregator
// These methods are automatically registered by the RPC server
// This takes a response an adds it to the internal. If reaching the quorum, it sends the aggregated signatures to ethereum

// Takes a response from an operator and processes it. After processing the response, the associated task may reach quorum, triggering a BLS service response.
// If the task related to the response is not known to the aggregator (not stored in internal map), it will try to fetch it from the contract's Events.
// Returns:
// - 0: Success
// - 1: Error
Expand All @@ -48,7 +49,6 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
"SenderAddress", "0x"+hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
"BatchIdentifierHash", "0x"+hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]),
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
taskIndex := uint32(0)

// The Aggregator may receive the Task Identifier after the operators.
// If that's the case, we won't know about the task at this point
Expand All @@ -57,10 +57,24 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash)

if err != nil {
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
*reply = 1
return nil
agg.logger.Warn("Task not found in the internal map, might have been missed. Trying to fetch task data from Ethereum")
batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot, agg.AggregatorConfig.Aggregator.PendingBatchFetchBlockRange)
if err != nil || batch == nil {
agg.logger.Warnf("Pending task with merkle root 0x%x not found in the contract", signedTaskResponse.BatchMerkleRoot)
*reply = 1
return nil // TODO non urgent nice to have: return an error. With it, the Operator would know that his signature corresponded to a not found task
}
agg.logger.Info("Task was found in Ethereum, adding it to the internal map")
agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock)
taskIndex, err = agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash)
if err != nil {
// This shouldn't happen, since we just added the task
agg.logger.Error("Unexpected error trying to get taskIndex from internal map")
*reply = 1
return nil
}
}

agg.telemetry.LogOperatorResponse(signedTaskResponse.BatchMerkleRoot, signedTaskResponse.OperatorId)

// Don't wait infinitely if it can't answer
Expand Down Expand Up @@ -120,8 +134,10 @@ TODO: We should refactor the retry duration considering extending it to a larger
func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint32, error) {
getTaskIndex_func := func() (uint32, error) {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Get task index")
taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash]
agg.taskMutex.Unlock()
agg.logger.Info("- Unlocked Resources: Get task index")
if !ok {
return taskIndex, fmt.Errorf("Task not found in the internal map")
} else {
Expand Down
7 changes: 4 additions & 3 deletions config-files/config-aggregator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ aggregator:
enable_metrics: true
metrics_ip_port_address: localhost:9091
telemetry_ip_port_address: localhost:4001
garbage_collector_period: 2m #The period of the GC process. Suggested value for Prod: '168h' (7 days)
garbage_collector_tasks_age: 20 #The age of tasks that will be removed by the GC, in blocks. Suggested value for prod: '216000' (30 days)
garbage_collector_tasks_interval: 10 #The interval of queried blocks to get an old batch. Suggested value for prod: '900' (3 hours)
garbage_collector_period: 2m # The period of the GC process. Suggested value for Prod: '168h' (7 days)
garbage_collector_tasks_age: 20 # The age of tasks that will be removed by the GC, in blocks. Suggested value for prod: '216000' (30 days)
garbage_collector_tasks_interval: 10 # The interval of queried blocks to get an old batch. Suggested value for prod: '900' (3 hours)
bls_service_task_timeout: 168h # The timeout of bls aggregation service tasks. Suggested value for prod '168h' (7 days)
pending_batch_fetch_block_range: 1000 # The interval of queried blocks to get a pending batch by logs. Suggested value for prod: '1000'
gas_base_bump_percentage: 25 # Percentage to overestimate gas price when sending a task
gas_bump_incremental_percentage: 20 # An extra percentage to overestimate in each bump of respond to task. This is additive between tries
# Gas used formula = est_gas_by_node * (gas_base_bump_percentage + gas_bump_incremental_percentage * i) / 100, where i is the iteration number.
Expand Down
43 changes: 43 additions & 0 deletions core/chainio/avs_reader.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package chainio

Check failure on line 1 in core/chainio/avs_reader.go

View workflow job for this annotation

GitHub Actions / lint

: # github.com/yetanotherco/aligned_layer/core/chainio

import (
"context"
Expand Down Expand Up @@ -150,3 +150,46 @@
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
return &batchIdentifierHash, nil
}

// GetPendingBatchFromMerkleRoot returns the first NewBatchV3 event matching merkleRoot
// whose batch has not been responded yet, or nil if there is none.
//
// At most the last blockRange blocks, counted back from the latest block, are searched.
// When the latest block number is not greater than blockRange the search starts at block 0.
//
// Returns:
//   - (batch, nil): an event was found and its on-chain state is not marked as responded
//   - (nil, nil): no event was found in the range, or the batch was already responded
//   - (nil, err): fetching the block number, filtering or iterating the logs, or reading
//     the batch state failed
func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte, blockRange uint64) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
	ctx := context.Background()

	latestBlock, err := r.BlockNumberRetryable(ctx)
	if err != nil {
		return nil, fmt.Errorf("Failed to get latest block number: %w", err)
	}

	// Clamp at genesis instead of underflowing the unsigned subtraction.
	var fromBlock uint64
	if latestBlock > blockRange {
		fromBlock = latestBlock - blockRange
	}

	logs, err := r.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: &latestBlock, Context: ctx}, [][32]byte{merkleRoot})
	if err != nil {
		return nil, err
	}
	// The iterator holds resources until it is closed; release them on every return path.
	// NOTE(review): assumes the iterator is the usual abigen-generated one (Next/Error/Close) — confirm.
	defer logs.Close()

	if !logs.Next() {
		// Next reports false both when there are no events and when retrieval failed,
		// so the error has to be inspected here rather than before iterating.
		if err := logs.Error(); err != nil {
			return nil, err
		}
		return nil, nil // Not an error, but no tasks found
	}

	batch := logs.Event

	// The batch identifier hash is keccak256(merkle root || sender address).
	batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...)
	batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))

	state, err := r.BatchesStateRetryable(nil, batchIdentifierHash)
	if err != nil {
		return nil, err
	}

	if state.Responded {
		return nil, nil // Task found but already responded
	}

	return batch, nil
}
67 changes: 67 additions & 0 deletions core/chainio/retryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,70 @@
}
return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
}

// |---AVS_READER---|

// TODO: These functions are being copied from AvsSubscriber and should be refactored
// we don't actually need access to the AvsReader, AvsSubscriber or AvsWriter, but instead to the AvsContractBindings

// TODO: We should also add the fallback calls to the functions which are missing it

// BlockNumberRetryable queries the latest block number through the main Ethereum
// client and, if that call fails, through the fallback client. The combined
// attempt is handed to retry.RetryWithData.
//   - All errors are considered Transient Errors
//   - Retry times (3 retries): 1 sec, 2 sec, 4 sec.
func (r *AvsReader) BlockNumberRetryable(ctx context.Context) (uint64, error) {
	fetchLatestBlock := func() (uint64, error) {
		// Main connection first.
		if number, mainErr := r.AvsContractBindings.ethClient.BlockNumber(ctx); mainErr == nil {
			return number, nil
		}
		// The main connection failed: answer with whatever the fallback connection reports.
		return r.AvsContractBindings.ethClientFallback.BlockNumber(ctx)
	}

	return retry.RetryWithData(
		fetchLatestBlock,
		retry.MinDelay,
		retry.RetryFactor,
		retry.NumRetries,
		retry.MaxInterval,
		retry.MaxElapsedTime,
	)
}

Check failure on line 254 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / build

undefined: retry.MinDelay

Check failure on line 254 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / build

too many arguments in call to retry.RetryWithData

Check failure on line 254 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / build

undefined: retry.RetryFactor

Check failure on line 254 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / build

undefined: retry.NumRetries

Check failure on line 254 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / build

undefined: retry.MaxInterval

Check failure on line 254 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / build

undefined: retry.MaxElapsedTime

Check failure on line 254 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / lint

undefined: retry.MinDelay

Check failure on line 254 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / lint

too many arguments in call to retry.RetryWithData

Check failure on line 254 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / lint

undefined: retry.RetryFactor

Check failure on line 254 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / lint

undefined: retry.NumRetries

Check failure on line 254 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / lint

undefined: retry.MaxInterval

Check failure on line 254 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / lint

undefined: retry.MaxElapsedTime
// FilterBatchV3Retryable filters NewBatchV3 events by merkle root through the main
// ServiceManager binding and, if that call fails, through the fallback binding.
// The combined attempt is handed to retry.RetryWithData.
//   - All errors are considered Transient Errors
//   - Retry times (3 retries): 1 sec, 2 sec, 4 sec.
func (r *AvsReader) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
	filterNewBatchV3 := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
		// Main connection first.
		if iterator, mainErr := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot); mainErr == nil {
			return iterator, nil
		}
		// The main connection failed: answer with whatever the fallback connection reports.
		return r.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV3(opts, batchMerkleRoot)
	}

	return retry.RetryWithData(
		filterNewBatchV3,
		retry.MinDelay,
		retry.RetryFactor,
		retry.NumRetries,
		retry.MaxInterval,
		retry.MaxElapsedTime,
	)
}

Check failure on line 271 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / build

undefined: retry.MinDelay

Check failure on line 271 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / build

undefined: retry.RetryFactor

Check failure on line 271 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / build

undefined: retry.NumRetries

Check failure on line 271 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / build

undefined: retry.MaxInterval

Check failure on line 271 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / lint

undefined: retry.MinDelay

Check failure on line 271 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / lint

undefined: retry.RetryFactor

Check failure on line 271 in core/chainio/retryable.go

View workflow job for this annotation

GitHub Actions / lint

undefined: retry.NumRetries
// BatchesStateRetryable reads the state of the batch identified by arg0 through the
// main ServiceManager caller and, if that call fails, through the fallback caller.
// The combined attempt is handed to retry.RetryWithData.
//   - All errors are considered Transient Errors
//   - Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks)
func (r *AvsReader) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct {
	TaskCreatedBlock      uint32
	Responded             bool
	RespondToTaskFeeLimit *big.Int
}, error) {
	readBatchState := func() (struct {
		TaskCreatedBlock      uint32
		Responded             bool
		RespondToTaskFeeLimit *big.Int
	}, error) {
		// Main connection first.
		if batchState, mainErr := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0); mainErr == nil {
			return batchState, nil
		}
		// The main connection failed: answer with whatever the fallback connection reports.
		return r.AvsContractBindings.ServiceManagerFallback.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0)
	}

	return retry.RetryWithData(
		readBatchState,
		retry.MinDelayChain,
		retry.RetryFactor,
		retry.NumRetries,
		retry.MaxIntervalChain,
		retry.MaxElapsedTime,
	)
}
3 changes: 3 additions & 0 deletions core/config/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type AggregatorConfig struct {
GarbageCollectorTasksAge uint64
GarbageCollectorTasksInterval uint64
BlsServiceTaskTimeout time.Duration
PendingBatchFetchBlockRange uint64
GasBaseBumpPercentage uint
GasBumpIncrementalPercentage uint
TimeToWaitBeforeBump time.Duration
Expand All @@ -43,6 +44,7 @@ type AggregatorConfigFromYaml struct {
GarbageCollectorTasksAge uint64 `yaml:"garbage_collector_tasks_age"`
GarbageCollectorTasksInterval uint64 `yaml:"garbage_collector_tasks_interval"`
BlsServiceTaskTimeout time.Duration `yaml:"bls_service_task_timeout"`
PendingBatchFetchBlockRange uint64 `yaml:"pending_batch_fetch_block_range"`
GasBaseBumpPercentage uint `yaml:"gas_base_bump_percentage"`
GasBumpIncrementalPercentage uint `yaml:"gas_bump_incremental_percentage"`
TimeToWaitBeforeBump time.Duration `yaml:"time_to_wait_before_bump"`
Expand Down Expand Up @@ -91,6 +93,7 @@ func NewAggregatorConfig(configFilePath string) *AggregatorConfig {
GarbageCollectorTasksAge uint64
GarbageCollectorTasksInterval uint64
BlsServiceTaskTimeout time.Duration
PendingBatchFetchBlockRange uint64
GasBaseBumpPercentage uint
GasBumpIncrementalPercentage uint
TimeToWaitBeforeBump time.Duration
Expand Down
Loading