Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions arbnode/consensus_execution_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ type ConsensusExecutionSyncerConfig struct {
}

// DefaultConsensusExecutionSyncerConfig is the production default.
// 300ms keeps execution's view of consensus state fresh; Config.Validate warns
// if this interval exceeds half of sync-monitor.msg-lag.
var DefaultConsensusExecutionSyncerConfig = ConsensusExecutionSyncerConfig{
	SyncInterval: 300 * time.Millisecond,
}

// We don't define a Test config. For most tests we want the Syncer to behave
// the same as in production.

// ConsensusExecutionSyncerConfigAddOptions registers the syncer's CLI flags
// under the given prefix.
func ConsensusExecutionSyncerConfigAddOptions(prefix string, f *pflag.FlagSet) {
	f.Duration(prefix+".sync-interval", DefaultConsensusExecutionSyncerConfig.SyncInterval, "Interval in which finality and sync data is pushed from consensus to execution")
}

type ConsensusExecutionSyncer struct {
Expand All @@ -41,6 +44,7 @@ type ConsensusExecutionSyncer struct {
execClient execution.ExecutionClient
blockValidator *staker.BlockValidator
txStreamer *TransactionStreamer
syncMonitor *SyncMonitor
}

func NewConsensusExecutionSyncer(
Expand All @@ -49,19 +53,24 @@ func NewConsensusExecutionSyncer(
execClient execution.ExecutionClient,
blockValidator *staker.BlockValidator,
txStreamer *TransactionStreamer,
syncMonitor *SyncMonitor,
) *ConsensusExecutionSyncer {
return &ConsensusExecutionSyncer{
config: config,
inboxReader: inboxReader,
execClient: execClient,
blockValidator: blockValidator,
txStreamer: txStreamer,
syncMonitor: syncMonitor,
}
}

// Start launches the syncer's periodic push loops via the embedded StopWaiter.
// The finality push only runs when an inboxReader is present (it is nil in
// L2-only / parent-chain-reader-disabled mode); the sync-data push always runs.
func (c *ConsensusExecutionSyncer) Start(ctx context.Context) {
	c.StopWaiter.Start(ctx, c)
	if c.inboxReader != nil {
		c.CallIteratively(c.pushFinalityDataFromConsensusToExecution)
	}
	c.CallIteratively(c.pushConsensusSyncDataToExecution)
}

func (c *ConsensusExecutionSyncer) getFinalityData(
Expand Down Expand Up @@ -140,3 +149,40 @@ func (c *ConsensusExecutionSyncer) pushFinalityDataFromConsensusToExecution(ctx

return c.config().SyncInterval
}

// pushConsensusSyncDataToExecution snapshots the consensus node's sync status
// and forwards it to the execution client. It is run repeatedly by
// CallIteratively; the returned duration is the delay until the next run.
func (c *ConsensusExecutionSyncer) pushConsensusSyncDataToExecution(ctx context.Context) time.Duration {
	isSynced := c.syncMonitor.Synced()

	maxMsgCount, err := c.syncMonitor.GetMaxMessageCount()
	if err != nil {
		log.Error("Error getting max message count", "err", err)
		return c.config().SyncInterval
	}

	// The full progress map is a debugging aid for an out-of-sync node, so we
	// only pay the cost of building it while unsynced; otherwise it stays nil.
	var progress map[string]interface{}
	if !isSynced {
		progress = c.syncMonitor.FullSyncProgressMap()
	}

	data := &execution.ConsensusSyncData{
		Synced:          isSynced,
		MaxMessageCount: maxMsgCount,
		SyncProgressMap: progress,
		UpdatedAt:       time.Now(),
	}

	if _, err = c.execClient.SetConsensusSyncData(ctx, data).Await(ctx); err != nil {
		log.Error("Error pushing sync data from consensus to execution", "err", err)
	} else {
		log.Debug("Pushed sync data from consensus to execution",
			"synced", data.Synced,
			"maxMessageCount", data.MaxMessageCount,
			"updatedAt", data.UpdatedAt,
			"hasProgressMap", data.SyncProgressMap != nil,
		)
	}

	return c.config().SyncInterval
}
4 changes: 4 additions & 0 deletions arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (w *execClientWrapper) SetFinalityData(
return containers.NewReadyPromise(struct{}{}, nil)
}

// SetConsensusSyncData is a test stub: it discards syncData and reports
// immediate success so consensus-side code can push sync data in tests.
func (w *execClientWrapper) SetConsensusSyncData(ctx context.Context, syncData *execution.ConsensusSyncData) containers.PromiseInterface[struct{}] {
	return containers.NewReadyPromise(struct{}{}, nil)
}

// DigestMessage delegates to the embedded ExecutionEngine and wraps its
// (result, error) pair in an already-resolved promise.
func (w *execClientWrapper) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.MessageResult] {
	return containers.NewReadyPromise(w.ExecutionEngine.DigestMessage(num, msg, msgForPrefetch))
}
Expand Down
84 changes: 46 additions & 38 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ func (c *Config) Validate() error {
if c.TransactionStreamer.TrackBlockMetadataFrom != 0 && !c.BlockMetadataFetcher.Enable {
log.Warn("track-block-metadata-from is set but blockMetadata fetcher is not enabled")
}
// Check that sync-interval is not more than msg-lag / 2
if c.ConsensusExecutionSyncer.SyncInterval > c.SyncMonitor.MsgLag/2 {
log.Warn("consensus-execution-syncer.sync-interval is more than half of sync-monitor.msg-lag, which may cause sync issues",
"sync-interval", c.ConsensusExecutionSyncer.SyncInterval,
"msg-lag", c.SyncMonitor.MsgLag)
}
return nil
}

Expand Down Expand Up @@ -978,32 +984,46 @@ func getNodeParentChainReaderDisabled(
configFetcher ConfigFetcher,
blockMetadataFetcher *BlockMetadataFetcher,
) *Node {
// Create ConsensusExecutionSyncer even in L2-only mode to push sync data
consensusExecutionSyncerConfigFetcher := func() *ConsensusExecutionSyncerConfig {
return &configFetcher.Get().ConsensusExecutionSyncer
}
consensusExecutionSyncer := NewConsensusExecutionSyncer(
consensusExecutionSyncerConfigFetcher,
nil, // inboxReader
executionClient,
nil, // blockValidator
txStreamer,
syncMonitor,
)

return &Node{
ArbDB: arbDb,
Stack: stack,
ExecutionClient: executionClient,
ExecutionSequencer: executionSequencer,
ExecutionRecorder: executionRecorder,
L1Reader: nil,
TxStreamer: txStreamer,
DeployInfo: nil,
BlobReader: blobReader,
InboxReader: nil,
InboxTracker: nil,
DelayedSequencer: nil,
BatchPoster: nil,
MessagePruner: nil,
BlockValidator: nil,
StatelessBlockValidator: nil,
Staker: nil,
BroadcastServer: broadcastServer,
BroadcastClients: broadcastClients,
SeqCoordinator: coordinator,
MaintenanceRunner: maintenanceRunner,
SyncMonitor: syncMonitor,
configFetcher: configFetcher,
ctx: ctx,
blockMetadataFetcher: blockMetadataFetcher,
ArbDB: arbDb,
Stack: stack,
ExecutionClient: executionClient,
ExecutionSequencer: executionSequencer,
ExecutionRecorder: executionRecorder,
L1Reader: nil,
TxStreamer: txStreamer,
DeployInfo: nil,
BlobReader: blobReader,
InboxReader: nil,
InboxTracker: nil,
DelayedSequencer: nil,
BatchPoster: nil,
MessagePruner: nil,
BlockValidator: nil,
StatelessBlockValidator: nil,
Staker: nil,
BroadcastServer: broadcastServer,
BroadcastClients: broadcastClients,
SeqCoordinator: coordinator,
MaintenanceRunner: maintenanceRunner,
SyncMonitor: syncMonitor,
configFetcher: configFetcher,
ctx: ctx,
blockMetadataFetcher: blockMetadataFetcher,
ConsensusExecutionSyncer: consensusExecutionSyncer,
}
}

Expand Down Expand Up @@ -1123,7 +1143,7 @@ func createNodeImpl(
consensusExecutionSyncerConfigFetcher := func() *ConsensusExecutionSyncerConfig {
return &configFetcher.Get().ConsensusExecutionSyncer
}
consensusExecutionSyncer := NewConsensusExecutionSyncer(consensusExecutionSyncerConfigFetcher, inboxReader, executionClient, blockValidator, txStreamer)
consensusExecutionSyncer := NewConsensusExecutionSyncer(consensusExecutionSyncerConfigFetcher, inboxReader, executionClient, blockValidator, txStreamer, syncMonitor)

return &Node{
ArbDB: arbDb,
Expand Down Expand Up @@ -1513,18 +1533,6 @@ func (n *Node) GetBatchParentChainBlock(seqNum uint64) containers.PromiseInterfa
return containers.NewReadyPromise(n.InboxTracker.GetBatchParentChainBlock(seqNum))
}

// FullSyncProgressMap returns the sync monitor's full progress map wrapped in
// an already-resolved promise.
func (n *Node) FullSyncProgressMap() containers.PromiseInterface[map[string]interface{}] {
	return containers.NewReadyPromise(n.SyncMonitor.FullSyncProgressMap(), nil)
}

// Synced reports the sync monitor's synced flag as a ready promise.
func (n *Node) Synced() containers.PromiseInterface[bool] {
	return containers.NewReadyPromise(n.SyncMonitor.Synced(), nil)
}

// SyncTargetMessageCount returns the sync monitor's current sync-target
// message count as a ready promise.
func (n *Node) SyncTargetMessageCount() containers.PromiseInterface[arbutil.MessageIndex] {
	return containers.NewReadyPromise(n.SyncMonitor.SyncTargetMessageCount(), nil)
}

func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult, blockMetadata common.BlockMetadata) containers.PromiseInterface[struct{}] {
err := n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta, msgResult, blockMetadata)
return containers.NewReadyPromise(struct{}{}, err)
Expand Down
13 changes: 12 additions & 1 deletion arbnode/sync_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.Message
return 0, nil
}

// GetMaxMessageCount exposes the private maxMessageCount computation to other
// components (e.g. the consensus-execution syncer).
func (s *SyncMonitor) GetMaxMessageCount() (arbutil.MessageIndex, error) {
	return s.maxMessageCount()
}

func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) {
msgCount, err := s.txStreamer.GetMessageCount()
if err != nil {
Expand Down Expand Up @@ -136,7 +140,14 @@ func (s *SyncMonitor) FullSyncProgressMap() map[string]interface{} {
}

syncTarget := s.SyncTargetMessageCount()
res["syncTargetMsgCount"] = syncTarget
res["consensusSyncTargetMsgCount"] = syncTarget

maxMsgCount, err := s.maxMessageCount()
if err != nil {
res["maxMessageCountError"] = err.Error()
return res
}
res["maxMessageCount"] = maxMsgCount

msgCount, err := s.txStreamer.GetMessageCount()
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ var ConfigDefault = Config{
TxPreChecker: DefaultTxPreCheckerConfig,
Caching: DefaultCachingConfig,
Forwarder: DefaultNodeForwarderConfig,
SyncMonitor: DefaultSyncMonitorConfig,

EnablePrefetchBlock: true,
StylusTarget: DefaultStylusTargetConfig,
Expand Down Expand Up @@ -570,6 +571,11 @@ func (n *ExecutionNode) SetFinalityData(
return containers.NewReadyPromise(struct{}{}, err)
}

// SetConsensusSyncData stores the consensus-reported sync status on the
// execution-side sync monitor and returns an already-resolved promise.
// The ctx parameter is unused; the update itself is synchronous.
func (n *ExecutionNode) SetConsensusSyncData(ctx context.Context, syncData *execution.ConsensusSyncData) containers.PromiseInterface[struct{}] {
	n.SyncMonitor.SetConsensusSyncData(syncData)
	return containers.NewReadyPromise(struct{}{}, nil)
}

func (n *ExecutionNode) InitializeTimeboost(ctx context.Context, chainConfig *params.ChainConfig) error {
execNodeConfig := n.ConfigFetcher()
if execNodeConfig.Sequencer.Timeboost.Enable {
Expand Down
Loading
Loading