Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a3e9fb8
Add implementation
Nov 4, 2024
b714153
Merge branch 'staging' into fix/aggregator-recover-lost-batches
Nov 4, 2024
4ee91c9
Add missed error handling
Nov 4, 2024
eb3fd52
Merge branch 'staging' into fix/aggregator-recover-lost-batches
MauroToscano Nov 4, 2024
6cd8fa9
Merge branch 'staging' into fix/aggregator-recover-lost-batches
MauroToscano Nov 4, 2024
ad522cd
Requested changes
Nov 4, 2024
27bf9c1
Merge branch 'staging' into fix/aggregator-recover-lost-batches
Nov 13, 2024
9c964bb
Refactor with retries
Nov 13, 2024
c8cb8e7
Add block fetch range to config, fix related bug
Nov 13, 2024
7759996
Update aggregator/pkg/server.go
MauroToscano Nov 13, 2024
9072d8b
Update aggregator/pkg/server.go
MauroToscano Nov 13, 2024
99c6c88
Merge branch 'staging' into fix/aggregator-recover-lost-batches
Nov 13, 2024
cfe283d
Restore taskMutex behavior on operator response handler
Nov 13, 2024
4773b0a
Revert "Restore taskMutex behavior on operator response handler"
Nov 13, 2024
009375f
chore: detail in comment
uri-99 Nov 14, 2024
9d56887
chore: detail comments
uri-99 Nov 14, 2024
86bb230
Merge branch 'staging' into fix/aggregator-recover-lost-batches
Nov 20, 2024
4b3eef0
feat: add batcher_start_local_no_fund make target (again)
uri-99 Nov 20, 2024
a44b1a7
Merge branch 'fix/aggregator-recover-lost-batches' of github.com:yeta…
Nov 21, 2024
eac734f
Merge branch 'staging' into fix/aggregator-recover-lost-batches
Nov 21, 2024
9392336
Remove unused imports
Nov 21, 2024
05c7243
Merge branch 'staging' into fix/aggregator-recover-lost-batches
Nov 22, 2024
2c5a7d1
Fix merge
Nov 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 54 additions & 17 deletions aggregator/internal/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,48 @@ func (agg *Aggregator) ServeOperators() error {
return err
}

// waitForTaskAndFetchIfLost waits for the task associated with signedTaskResponse
// to be present in the aggregator's internal map.
//
// On every attempt it first looks up the batch identifier hash in
// batchesIdxByIdentifierHash (under taskMutex). If the task is missing, it asks
// the AVS reader for a pending batch with the same merkle root and, when one is
// found, registers it through AddNewTask.
//
// The number of attempts is given by `waitForEventRetries`, and the waiting time
// between consecutive attempts by `waitForEventSleepSeconds`.
//
// It returns true as soon as the task is known (or was recovered from logs) and
// false once every attempt has been exhausted.
func (agg *Aggregator) waitForTaskAndFetchIfLost(signedTaskResponse *types.SignedTaskResponse) bool {
	for i := 0; i < waitForEventRetries; i++ {
		// The mutex is held only for the map lookup, never across the log fetch below.
		agg.taskMutex.Lock()
		agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Check if task is present")
		_, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash]
		agg.logger.Info("- Unlocked Resources: Check if task is present")
		agg.taskMutex.Unlock()
		if ok {
			return true
		}

		// Task was not found in the internal map, try to recover it from logs.
		agg.logger.Info("Trying to fetch missed task from logs...")
		batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot)

		switch {
		case err != nil:
			// Formatted variant is required: the plain Warn does not expand verbs.
			agg.logger.Warnf("Error fetching task from logs: %v", err)
		case batch == nil:
			agg.logger.Info("Task not found in logs")
		default:
			// %x renders the 32-byte root as hex; Infof is needed for the verb to be expanded.
			agg.logger.Infof("Found missed task in logs with merkle root 0x%x", batch.BatchMerkleRoot)
			// Adding new task will fail only if it already exists
			agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock)
			return true
		}

		// Task was not found; wait before the next attempt. Sleeping after the
		// last attempt would only delay the failure result, so it is skipped.
		if i < waitForEventRetries-1 {
			time.Sleep(waitForEventSleepSeconds)
		}
	}

	return false
}

// Aggregator Methods
// This is the list of methods that the Aggregator exposes to the Operator
// The Operator can call these methods to interact with the Aggregator
Expand All @@ -49,27 +91,25 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
"SenderAddress", "0x"+hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
"BatchIdentifierHash", "0x"+hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]),
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
taskIndex := uint32(0)
ok := false

for i := 0; i < waitForEventRetries; i++ {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
taskIndex, ok = agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash]
if !ok {
agg.taskMutex.Unlock()
agg.logger.Info("- Unlocked Resources: Task not found in the internal map")
time.Sleep(waitForEventSleepSeconds)
} else {
break
}
if !agg.waitForTaskAndFetchIfLost(signedTaskResponse) {
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
*reply = 1
return nil
}

agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Get task taskIndex")
taskIndex, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash]
// Unlock
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Get task taskIndex")
agg.taskMutex.Unlock()
if !ok {
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
agg.logger.Errorf("Unexpected error fetching for task with merkle root 0x%x", signedTaskResponse.BatchMerkleRoot)
*reply = 1
return nil
}

agg.telemetry.LogOperatorResponse(signedTaskResponse.BatchMerkleRoot, signedTaskResponse.OperatorId)

// Don't wait infinitely if it can't answer
Expand Down Expand Up @@ -109,9 +149,6 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
*reply = 0
}

agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Task response processing finished")
agg.taskMutex.Unlock()

return nil
}

Expand Down
48 changes: 48 additions & 0 deletions core/chainio/avs_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
)

// BatchFetchBlocksRange is the maximum number of blocks, counted back from the
// latest block, that GetPendingBatchFromMerkleRoot scans for a batch event.
const BatchFetchBlocksRange uint64 = 1000

type AvsReader struct {
*sdkavsregistry.ChainReader
AvsContractBindings *AvsServiceBindings
Expand Down Expand Up @@ -150,3 +154,47 @@ func (r *AvsReader) GetOldTaskHash(nBlocksOld uint64, interval uint64) (*[32]byt
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
return &batchIdentifierHash, nil
}

// GetPendingBatchFromMerkleRoot returns a pending batch from its merkle root, or
// nil if it doesn't exist.
//
// It searches the last `BatchFetchBlocksRange` blocks at most. The latest block
// number is read from the primary eth client, falling back to the fallback
// client if that call fails. Only the first matching NewBatchV3 event is
// considered; if the contract reports that batch as already responded, the
// batch is treated as not pending and (nil, nil) is returned.
//
// A nil batch with a nil error means "nothing pending was found"; a non-nil
// error means the chain could not be queried.
func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
	ctx := context.Background()

	latestBlock, err := r.AvsContractBindings.ethClient.BlockNumber(ctx)
	if err != nil {
		latestBlock, err = r.AvsContractBindings.ethClientFallback.BlockNumber(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to get latest block number: %w", err)
		}
	}

	// Clamp the start of the window at block 0 to avoid unsigned underflow on short chains.
	var fromBlock uint64
	if latestBlock > BatchFetchBlocksRange {
		fromBlock = latestBlock - BatchFetchBlocksRange
	}

	logs, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(&bind.FilterOpts{Start: fromBlock, End: &latestBlock, Context: ctx}, [][32]byte{merkleRoot})
	if err != nil {
		return nil, err
	}
	// Release the iterator on every return path.
	// NOTE(review): Close's result is deliberately ignored; generated iterators
	// are expected to only unsubscribe here — confirm against the bindings.
	defer logs.Close()

	if !logs.Next() {
		// Next returning false means either "no more events" or "iteration
		// failed"; Error distinguishes the two, so it must be read afterwards.
		if err := logs.Error(); err != nil {
			return nil, err
		}
		return nil, nil // not an error, but no tasks found
	}

	batch := logs.Event

	// The batch identifier hash is keccak256(merkleRoot || senderAddress).
	batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...)
	batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
	state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(nil, batchIdentifierHash)
	if err != nil {
		return nil, err
	}
	if state.Responded {
		// Already responded on-chain: nothing left to aggregate for this batch.
		return nil, nil
	}

	return batch, nil
}