3
3
from dataclasses import dataclass
4
4
from typing import Any , ClassVar , Dict , List , SupportsBytes , Type , TypeAlias
5
5
6
- from pydantic import Field , PrivateAttr , TypeAdapter
6
+ from pydantic import BaseModel , Field , PrivateAttr , TypeAdapter
7
7
8
8
from .base_types import Address , Bytes , Hash , HashInt , HexNumber , ZeroPaddedHexNumber
9
9
from .conversions import BytesConvertible , NumberConvertible
@@ -487,7 +487,7 @@ class ForkBlobSchedule(CamelModel):
487
487
488
488
489
489
class BlobSchedule (EthereumTestRootModel [Dict [str , ForkBlobSchedule ]]):
490
- """Blob schedule configuration dictionary."""
490
+ """Blob schedule configuration dictionary. Key is fork name. """
491
491
492
492
root : Dict [str , ForkBlobSchedule ] = Field (default_factory = dict , validate_default = True )
493
493
@@ -503,6 +503,36 @@ def last(self) -> ForkBlobSchedule | None:
503
503
return None
504
504
return list (self .root .values ())[- 1 ]
505
505
506
- def __getitem__ (self , key : str ) -> ForkBlobSchedule :
506
+ def __getitem__ (self , key : str ) -> ForkBlobSchedule | None :
507
507
"""Return the schedule for a given fork."""
508
- return self .root [key ]
508
+ return self .root .get (key )
509
+
510
+
511
+ class TimestampBlobSchedule (BaseModel ):
512
+ """
513
+ Contains a list of dictionaries. Each dictionary is a scheduled BPO fork.
514
+ Each dictionary's key is the activation timestamp and the values are a ForkBlobSchedule
515
+ object with the fields max, target and base_fee_update_fraction.
516
+ """
517
+
518
+ root : List [Dict [int , ForkBlobSchedule ]] = Field (default_factory = list , validate_default = True )
519
+
520
+ @classmethod
521
+ def add_schedule (cls , activation_timestamp : int , schedule : ForkBlobSchedule ):
522
+ """Add a schedule to the schedule list."""
523
+ assert activation_timestamp > - 1
524
+ assert schedule .max_blobs_per_block > 0
525
+ assert schedule .base_fee_update_fraction > 0
526
+ assert schedule .target_blobs_per_block > 0
527
+
528
+ # ensure that the timestamp of each scheduled bpo fork is unique
529
+ existing_keys : set = set ()
530
+ for d in cls .root :
531
+ existing_keys .update (d .keys ())
532
+ assert activation_timestamp not in existing_keys , (
533
+ f"No duplicate activation forks allowed: Timestamp { activation_timestamp } already "
534
+ "exists in the current schedule."
535
+ )
536
+
537
+ # add a scheduled bpo fork
538
+ cls .root .append ({activation_timestamp : schedule })
0 commit comments