fix(aggregator): (WIP) fetch task on task response if not cached #1351
base: staging
Conversation
I tested on macOS and Linux and everything worked fine. I left a few minor comments.
aggregator/internal/pkg/server.go
Outdated
for i := 0; i < waitForEventRetries; i++ {
    // Lock
    agg.taskMutex.Lock()
    agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
This log is repeated in `ProcessOperatorSignedTaskResponseV2`.
Yes, I can change it if that's more clear
Done
aggregator/internal/pkg/server.go
Outdated
} else {
    break
}
if !agg.waitForTask(signedTaskResponse) {
I find `waitForTask` a bit weird, primarily because it mutates the map and that isn't very clear. Personally, I would prefer to consolidate everything into a single function (`getTask`, for example). Or, at least, rename it to something more descriptive, such as `verifyTaskInMap` or `fetchTask`.
What do you mean by consolidating everything into one function? You mean deleting `waitForTask` and including its contents inside `ProcessOperatorSignedTaskResponseV2`? I think I prefer to have an auxiliary function.
I also find `waitForTask` not very descriptive, but the same happens with other names. I'll keep thinking of an alternative; `tryToFetchTask` is my first candidate.
Done
Looks good, need to test later.
Force-pushed from fa1b39f to c8cb8e7
Need one more test and a review
Force-pushed from a0cd361 to 99c6c88
This reverts commit cfe283d.
Tested with 3 operators!
It isn't working for me. I tested it with 4 operators, waited 15 min, and restarted the aggregator. Only a few tasks were verified, not all of them. Then I tried sending new tasks with the network working normally, and the aggregator wouldn't send the aggregated responses.
The task retrieval part does work, though.
…notherco/aligned_layer into fix/aggregator-recover-lost-batches
Force-pushed from 25c084a to 9392336
Still WIP.
Also missing: avoid fetching logs for the same merkle_root concurrently. Otherwise, the log will be fetched from the blockchain once per operator, even though all the responses are for the same merkle_root.
Aggregator fetch missed batches
Description
This PR adds a fix so the aggregator is able to recover and process missed batches when an unknown response from an operator is received.
Type of change
Checklist
- `testnet` for hotfixes, everything else to `staging`
Basic flow
On recovery, if the aggregator has missed some new batches, the operators will process them and start sending signed responses to the aggregator. The main idea of this PR is that, once this happens, the aggregator should first check whether the received task is known to it (in its internal map) and, if it isn't, try to fetch it from Ethereum logs. This should be done with retries, since the network may be congested and events may take longer to reach certain RPC nodes.
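The flow above can be sketched as a minimal cache-then-fetch helper. `Aggregator`, `getTask`, and `fetchFromLogs` here are illustrative stand-ins, not the actual types and names in `server.go`:

```go
package main

import (
	"fmt"
	"sync"
)

// Illustrative stand-in for the aggregator's internal task cache.
type Aggregator struct {
	mu    sync.Mutex
	tasks map[string]string // batch merkle root -> task data
}

// getTask first checks the internal map; on a miss it tries to recover the
// task through the supplied log-fetching callback and caches the result, so
// responses from the remaining operators hit the map directly.
func (a *Aggregator) getTask(merkleRoot string, fetchFromLogs func(string) (string, bool)) (string, bool) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if task, ok := a.tasks[merkleRoot]; ok {
		return task, true // task was already known to the aggregator
	}
	task, ok := fetchFromLogs(merkleRoot) // miss: recover from Ethereum logs
	if ok {
		a.tasks[merkleRoot] = task // cache the recovered task
	}
	return task, ok
}

func main() {
	agg := &Aggregator{tasks: map[string]string{"root1": "cached-task"}}
	task, _ := agg.getTask("root2", func(r string) (string, bool) { return "fetched-" + r, true })
	fmt.Println(task) // prints "fetched-root2"
}
```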
Retry logic
After a response is received, the aggregator will check for the corresponding task in its internal map with 3 retries, waiting 1 sec, 2 sec, and 4 sec, respectively.
If the task is not found in the internal map, it will try to fetch the logs. While doing so, some calls will be made to Ethereum, each of them having its own retry logic.
How to test
- Run `make batcher_send_infinite_sp1`.
- Leave some batches as `Pending`; the more you wait, the better.
- You should bump the operator `MaxRetries` under `rpc_client.go` so the operators keep retrying sending responses to the aggregator.
- You may also modify the `pending_batch_fetch_block_range` config value under `config-files/config-aggregator.yaml` to test different scenarios. In the same file, you may also bump `garbage_collector_period` or `garbage_collector_tasks_age` so batches are not removed before they get verified.
- We should stress test this PR so we are sure that no concurrency bug is possible; for that, you should try the following:
  - Set the `pending_batch_fetch_block_range` variable to a large number, so all batches are fetched.

Closes #1350