From 3df0ffcc2cf34d6ce42245e5f7d164732255fbf8 Mon Sep 17 00:00:00 2001
From: Tristan Wilson
Date: Tue, 26 Aug 2025 10:12:29 +0200
Subject: [PATCH 1/7] Refactor consensus-execution sync to push sync data

This change eliminates a circular dependency between the consensus and execution layers by transforming the sync status flow from a pull-based to a push-based model. Previously, the execution layer would query the consensus layer for sync status through the ConsensusInfo interface, creating a tight coupling between the layers.

The new architecture introduces a ConsensusSyncData structure that contains sync status, target message count, and progress information. The ConsensusExecutionSyncer now periodically pushes this data from consensus to execution, where it's stored using an atomic pointer for lock-free reads. This approach maintains consistency with the existing finality data push mechanism and provides better performance through reduced lock contention.

As part of this refactoring, the ConsensusInfo interface has been simplified to only include the BlockMetadataAtMessageIndex method, removing the now-redundant Synced, FullSyncProgressMap, and SyncTargetMessageCount methods. This cleaner separation of concerns better supports alternative client implementations by clearly defining the data flow boundaries between consensus and execution layers.
--- arbnode/consensus_execution_syncer.go | 26 ++++++++++- arbnode/inbox_test.go | 4 ++ arbnode/node.go | 14 +----- execution/gethexec/node.go | 5 +++ execution/gethexec/sync_monitor.go | 63 ++++++++++++++------------- execution/interface.go | 11 +++-- 6 files changed, 76 insertions(+), 47 deletions(-) diff --git a/arbnode/consensus_execution_syncer.go b/arbnode/consensus_execution_syncer.go index d7edb8c0ad..a1a494bc98 100644 --- a/arbnode/consensus_execution_syncer.go +++ b/arbnode/consensus_execution_syncer.go @@ -29,7 +29,7 @@ var DefaultConsensusExecutionSyncerConfig = ConsensusExecutionSyncerConfig{ } func ConsensusExecutionSyncerConfigAddOptions(prefix string, f *flag.FlagSet) { - f.Duration(prefix+".sync-interval", DefaultConsensusExecutionSyncerConfig.SyncInterval, "Interval in which finality data is pushed from consensus to execution") + f.Duration(prefix+".sync-interval", DefaultConsensusExecutionSyncerConfig.SyncInterval, "Interval in which finality and sync data is pushed from consensus to execution") } type ConsensusExecutionSyncer struct { @@ -41,6 +41,7 @@ type ConsensusExecutionSyncer struct { execClient execution.ExecutionClient blockValidator *staker.BlockValidator txStreamer *TransactionStreamer + syncMonitor *SyncMonitor } func NewConsensusExecutionSyncer( @@ -49,6 +50,7 @@ func NewConsensusExecutionSyncer( execClient execution.ExecutionClient, blockValidator *staker.BlockValidator, txStreamer *TransactionStreamer, + syncMonitor *SyncMonitor, ) *ConsensusExecutionSyncer { return &ConsensusExecutionSyncer{ config: config, @@ -56,12 +58,14 @@ func NewConsensusExecutionSyncer( execClient: execClient, blockValidator: blockValidator, txStreamer: txStreamer, + syncMonitor: syncMonitor, } } func (c *ConsensusExecutionSyncer) Start(ctx_in context.Context) { c.StopWaiter.Start(ctx_in, c) c.CallIteratively(c.pushFinalityDataFromConsensusToExecution) + c.CallIteratively(c.pushConsensusSyncDataToExecution) } func (c *ConsensusExecutionSyncer) 
getFinalityData( @@ -140,3 +144,23 @@ func (c *ConsensusExecutionSyncer) pushFinalityDataFromConsensusToExecution(ctx return c.config().SyncInterval } + +func (c *ConsensusExecutionSyncer) pushConsensusSyncDataToExecution(ctx context.Context) time.Duration { + syncData := &execution.ConsensusSyncData{ + Synced: c.syncMonitor.Synced(), + SyncTargetMessageCount: c.syncMonitor.SyncTargetMessageCount(), + SyncProgressMap: c.syncMonitor.FullSyncProgressMap(), + } + + _, err := c.execClient.SetConsensusSyncData(ctx, syncData).Await(ctx) + if err != nil { + log.Error("Error pushing sync data from consensus to execution", "err", err) + } else { + log.Debug("Pushed sync data from consensus to execution", + "synced", syncData.Synced, + "syncTarget", syncData.SyncTargetMessageCount, + ) + } + + return c.config().SyncInterval +} diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index 26a4d047d9..63e898cd77 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -88,6 +88,10 @@ func (w *execClientWrapper) SetFinalityData( return containers.NewReadyPromise(struct{}{}, nil) } +func (w *execClientWrapper) SetConsensusSyncData(ctx context.Context, syncData *execution.ConsensusSyncData) containers.PromiseInterface[struct{}] { + return containers.NewReadyPromise(struct{}{}, nil) +} + func (w *execClientWrapper) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.MessageResult] { return containers.NewReadyPromise(w.ExecutionEngine.DigestMessage(num, msg, msgForPrefetch)) } diff --git a/arbnode/node.go b/arbnode/node.go index 51e93992f2..83819d85f6 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -1123,7 +1123,7 @@ func createNodeImpl( consensusExecutionSyncerConfigFetcher := func() *ConsensusExecutionSyncerConfig { return &configFetcher.Get().ConsensusExecutionSyncer } - consensusExecutionSyncer := 
NewConsensusExecutionSyncer(consensusExecutionSyncerConfigFetcher, inboxReader, executionClient, blockValidator, txStreamer) + consensusExecutionSyncer := NewConsensusExecutionSyncer(consensusExecutionSyncerConfigFetcher, inboxReader, executionClient, blockValidator, txStreamer, syncMonitor) return &Node{ ArbDB: arbDb, @@ -1513,18 +1513,6 @@ func (n *Node) GetBatchParentChainBlock(seqNum uint64) containers.PromiseInterfa return containers.NewReadyPromise(n.InboxTracker.GetBatchParentChainBlock(seqNum)) } -func (n *Node) FullSyncProgressMap() containers.PromiseInterface[map[string]interface{}] { - return containers.NewReadyPromise(n.SyncMonitor.FullSyncProgressMap(), nil) -} - -func (n *Node) Synced() containers.PromiseInterface[bool] { - return containers.NewReadyPromise(n.SyncMonitor.Synced(), nil) -} - -func (n *Node) SyncTargetMessageCount() containers.PromiseInterface[arbutil.MessageIndex] { - return containers.NewReadyPromise(n.SyncMonitor.SyncTargetMessageCount(), nil) -} - func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult, blockMetadata common.BlockMetadata) containers.PromiseInterface[struct{}] { err := n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta, msgResult, blockMetadata) return containers.NewReadyPromise(struct{}{}, err) diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 20f3dbd840..14abfde142 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -587,6 +587,11 @@ func (n *ExecutionNode) SetFinalityData( return containers.NewReadyPromise(struct{}{}, err) } +func (n *ExecutionNode) SetConsensusSyncData(ctx context.Context, syncData *execution.ConsensusSyncData) containers.PromiseInterface[struct{}] { + n.SyncMonitor.SetConsensusSyncData(syncData) + return containers.NewReadyPromise(struct{}{}, nil) +} + func (n *ExecutionNode) InitializeTimeboost(ctx context.Context, chainConfig *params.ChainConfig) error 
{ execNodeConfig := n.ConfigFetcher() if execNodeConfig.Sequencer.Timeboost.Enable { diff --git a/execution/gethexec/sync_monitor.go b/execution/gethexec/sync_monitor.go index a156f9d6b4..2fb5d29f15 100644 --- a/execution/gethexec/sync_monitor.go +++ b/execution/gethexec/sync_monitor.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync/atomic" flag "github.com/spf13/pflag" @@ -34,6 +35,8 @@ type SyncMonitor struct { config *SyncMonitorConfig consensus execution.ConsensusInfo exec *ExecutionEngine + + consensusSyncData atomic.Pointer[execution.ConsensusSyncData] } func NewSyncMonitor(config *SyncMonitorConfig, exec *ExecutionEngine) *SyncMonitor { @@ -43,20 +46,25 @@ func NewSyncMonitor(config *SyncMonitorConfig, exec *ExecutionEngine) *SyncMonit } } +// SetConsensusSyncData updates the sync data pushed from consensus +func (s *SyncMonitor) SetConsensusSyncData(syncData *execution.ConsensusSyncData) { + s.consensusSyncData.Store(syncData) +} + func (s *SyncMonitor) FullSyncProgressMap(ctx context.Context) map[string]interface{} { - res, err := s.consensus.FullSyncProgressMap().Await(ctx) - if err != nil { - res = make(map[string]interface{}) - res["fullSyncProgressMapError"] = err + data := s.consensusSyncData.Load() + if data == nil { + return map[string]interface{}{"error": "no consensus sync data available"} } - consensusSyncTarget, err := s.consensus.SyncTargetMessageCount().Await(ctx) - if err != nil { - res["consensusSyncTargetError"] = err - } else { - res["consensusSyncTarget"] = consensusSyncTarget + res := make(map[string]interface{}) + for k, v := range data.SyncProgressMap { + res[k] = v } + res["consensusSyncTarget"] = data.SyncTargetMessageCount + + // Add execution-specific data header, err := s.exec.getCurrentHeader() if err != nil { res["currentHeaderError"] = err @@ -82,32 +90,27 @@ func (s *SyncMonitor) SyncProgressMap(ctx context.Context) map[string]interface{ } func (s *SyncMonitor) Synced(ctx context.Context) bool { - synced, err := 
s.consensus.Synced().Await(ctx) - if err != nil { - log.Error("Error checking if consensus is synced", "err", err) + data := s.consensusSyncData.Load() + if data == nil { return false } - if synced { - built, err := s.exec.HeadMessageIndex() - if err != nil { - log.Error("Error getting head message index", "err", err) - return false - } - consensusSyncTarget, err := s.consensus.SyncTargetMessageCount().Await(ctx) - if err != nil { - log.Error("Error getting consensus sync target", "err", err) - return false - } - if consensusSyncTarget == 0 { - return false - } + if !data.Synced { + return false + } - if built+1 >= consensusSyncTarget { - return true - } + // Additional execution-side validation + built, err := s.exec.HeadMessageIndex() + if err != nil { + log.Error("Error getting head message index", "err", err) + return false } - return false + + if data.SyncTargetMessageCount == 0 { + return false + } + + return built+1 >= data.SyncTargetMessageCount } func (s *SyncMonitor) SetConsensusInfo(consensus execution.ConsensusInfo) { diff --git a/execution/interface.go b/execution/interface.go index 780584b9db..a2100faab6 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -33,6 +33,13 @@ type InboxBatch struct { Found bool } +// ConsensusSyncData contains sync status information pushed from consensus to execution +type ConsensusSyncData struct { + Synced bool + SyncTargetMessageCount arbutil.MessageIndex + SyncProgressMap map[string]interface{} +} + var ErrRetrySequencer = errors.New("please retry transaction") var ErrSequencerInsertLockTaken = errors.New("insert lock taken") @@ -45,6 +52,7 @@ type ExecutionClient interface { MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) containers.PromiseInterface[uint64] BlockNumberToMessageIndex(blockNum uint64) containers.PromiseInterface[arbutil.MessageIndex] SetFinalityData(ctx context.Context, safeFinalityData *arbutil.FinalityData, finalizedFinalityData *arbutil.FinalityData, validatedFinalityData 
*arbutil.FinalityData) containers.PromiseInterface[struct{}] + SetConsensusSyncData(ctx context.Context, syncData *ConsensusSyncData) containers.PromiseInterface[struct{}] MarkFeedStart(to arbutil.MessageIndex) containers.PromiseInterface[struct{}] TriggerMaintenance() containers.PromiseInterface[struct{}] @@ -91,9 +99,6 @@ type BatchFetcher interface { } type ConsensusInfo interface { - Synced() containers.PromiseInterface[bool] - FullSyncProgressMap() containers.PromiseInterface[map[string]interface{}] - SyncTargetMessageCount() containers.PromiseInterface[arbutil.MessageIndex] BlockMetadataAtMessageIndex(msgIdx arbutil.MessageIndex) containers.PromiseInterface[common.BlockMetadata] } From 8418a7d5dd74a720f414ab2fd420ebc70b844e56 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Tue, 26 Aug 2025 22:32:34 +0200 Subject: [PATCH 2/7] Start the ConsensusExecutionSyncer for l2-only mode Also lower the sync interval for tests. --- arbnode/consensus_execution_syncer.go | 5 ++- arbnode/node.go | 64 ++++++++++++++++----------- system_tests/seq_coordinator_test.go | 1 + 3 files changed, 44 insertions(+), 26 deletions(-) diff --git a/arbnode/consensus_execution_syncer.go b/arbnode/consensus_execution_syncer.go index a1a494bc98..3868178c44 100644 --- a/arbnode/consensus_execution_syncer.go +++ b/arbnode/consensus_execution_syncer.go @@ -64,7 +64,9 @@ func NewConsensusExecutionSyncer( func (c *ConsensusExecutionSyncer) Start(ctx_in context.Context) { c.StopWaiter.Start(ctx_in, c) - c.CallIteratively(c.pushFinalityDataFromConsensusToExecution) + if c.inboxReader != nil { + c.CallIteratively(c.pushFinalityDataFromConsensusToExecution) + } c.CallIteratively(c.pushConsensusSyncDataToExecution) } @@ -159,6 +161,7 @@ func (c *ConsensusExecutionSyncer) pushConsensusSyncDataToExecution(ctx context. 
log.Debug("Pushed sync data from consensus to execution", "synced", syncData.Synced, "syncTarget", syncData.SyncTargetMessageCount, + "syncProgressMap", syncData.SyncProgressMap, ) } diff --git a/arbnode/node.go b/arbnode/node.go index 83819d85f6..79ba4612da 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -978,32 +978,46 @@ func getNodeParentChainReaderDisabled( configFetcher ConfigFetcher, blockMetadataFetcher *BlockMetadataFetcher, ) *Node { + // Create ConsensusExecutionSyncer even in L2-only mode to push sync data + consensusExecutionSyncerConfigFetcher := func() *ConsensusExecutionSyncerConfig { + return &configFetcher.Get().ConsensusExecutionSyncer + } + consensusExecutionSyncer := NewConsensusExecutionSyncer( + consensusExecutionSyncerConfigFetcher, + nil, // inboxReader + executionClient, + nil, // blockValidator + txStreamer, + syncMonitor, + ) + return &Node{ - ArbDB: arbDb, - Stack: stack, - ExecutionClient: executionClient, - ExecutionSequencer: executionSequencer, - ExecutionRecorder: executionRecorder, - L1Reader: nil, - TxStreamer: txStreamer, - DeployInfo: nil, - BlobReader: blobReader, - InboxReader: nil, - InboxTracker: nil, - DelayedSequencer: nil, - BatchPoster: nil, - MessagePruner: nil, - BlockValidator: nil, - StatelessBlockValidator: nil, - Staker: nil, - BroadcastServer: broadcastServer, - BroadcastClients: broadcastClients, - SeqCoordinator: coordinator, - MaintenanceRunner: maintenanceRunner, - SyncMonitor: syncMonitor, - configFetcher: configFetcher, - ctx: ctx, - blockMetadataFetcher: blockMetadataFetcher, + ArbDB: arbDb, + Stack: stack, + ExecutionClient: executionClient, + ExecutionSequencer: executionSequencer, + ExecutionRecorder: executionRecorder, + L1Reader: nil, + TxStreamer: txStreamer, + DeployInfo: nil, + BlobReader: blobReader, + InboxReader: nil, + InboxTracker: nil, + DelayedSequencer: nil, + BatchPoster: nil, + MessagePruner: nil, + BlockValidator: nil, + StatelessBlockValidator: nil, + Staker: nil, + 
BroadcastServer: broadcastServer, + BroadcastClients: broadcastClients, + SeqCoordinator: coordinator, + MaintenanceRunner: maintenanceRunner, + SyncMonitor: syncMonitor, + configFetcher: configFetcher, + ctx: ctx, + blockMetadataFetcher: blockMetadataFetcher, + ConsensusExecutionSyncer: consensusExecutionSyncer, } } diff --git a/system_tests/seq_coordinator_test.go b/system_tests/seq_coordinator_test.go index c126999332..3b67765eb0 100644 --- a/system_tests/seq_coordinator_test.go +++ b/system_tests/seq_coordinator_test.go @@ -52,6 +52,7 @@ func TestRedisSeqCoordinatorPriorities(t *testing.T) { builder.takeOwnership = false builder.nodeConfig.SeqCoordinator.Enable = true builder.nodeConfig.SeqCoordinator.RedisUrl = redisutil.CreateTestRedis(ctx, t) + builder.nodeConfig.ConsensusExecutionSyncer.SyncInterval = 10 * time.Millisecond l2Info := builder.L2Info From 3d0834ce1fc8a43423586ad7a73f30a42a06ed21 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Fri, 29 Aug 2025 19:17:30 +0200 Subject: [PATCH 3/7] Fix review comments about sync timing issues Only populate SyncProgressMap when not synced. MaxMessageCount is now a dedicated field that's always sent. Fix stale sync targets caused by push delay. Instead of consensus sending pre-calculated targets, it now sends raw MaxMessageCount. Execution maintains a sliding window history and calculates its own target using values from 1-2 MsgLag ago, properly accounting for the push delay. The default push interval and execution message lag are both 1 second so they work together well. Includes unit tests for the sliding window implementation. 
--- arbnode/consensus_execution_syncer.go | 28 +++- arbnode/sync_monitor.go | 13 +- execution/gethexec/sync_monitor.go | 117 ++++++++++++-- execution/gethexec/sync_monitor_test.go | 199 ++++++++++++++++++++++++ execution/interface.go | 8 +- system_tests/seq_coordinator_test.go | 1 + 6 files changed, 346 insertions(+), 20 deletions(-) create mode 100644 execution/gethexec/sync_monitor_test.go diff --git a/arbnode/consensus_execution_syncer.go b/arbnode/consensus_execution_syncer.go index 3868178c44..78c4e1d1bc 100644 --- a/arbnode/consensus_execution_syncer.go +++ b/arbnode/consensus_execution_syncer.go @@ -148,20 +148,36 @@ func (c *ConsensusExecutionSyncer) pushFinalityDataFromConsensusToExecution(ctx } func (c *ConsensusExecutionSyncer) pushConsensusSyncDataToExecution(ctx context.Context) time.Duration { + synced := c.syncMonitor.Synced() + + maxMessageCount, err := c.syncMonitor.GetMaxMessageCount() + if err != nil { + log.Error("Error getting max message count", "err", err) + return c.config().SyncInterval + } + + var syncProgressMap map[string]interface{} + if !synced { + // Only populate the full progress map when not synced (for debugging) + syncProgressMap = c.syncMonitor.FullSyncProgressMap() + } + syncData := &execution.ConsensusSyncData{ - Synced: c.syncMonitor.Synced(), - SyncTargetMessageCount: c.syncMonitor.SyncTargetMessageCount(), - SyncProgressMap: c.syncMonitor.FullSyncProgressMap(), + Synced: synced, + MaxMessageCount: maxMessageCount, + SyncProgressMap: syncProgressMap, + UpdatedAt: time.Now(), } - _, err := c.execClient.SetConsensusSyncData(ctx, syncData).Await(ctx) + _, err = c.execClient.SetConsensusSyncData(ctx, syncData).Await(ctx) if err != nil { log.Error("Error pushing sync data from consensus to execution", "err", err) } else { log.Debug("Pushed sync data from consensus to execution", "synced", syncData.Synced, - "syncTarget", syncData.SyncTargetMessageCount, - "syncProgressMap", syncData.SyncProgressMap, + "maxMessageCount", 
syncData.MaxMessageCount, + "updatedAt", syncData.UpdatedAt, + "hasProgressMap", syncData.SyncProgressMap != nil, ) } diff --git a/arbnode/sync_monitor.go b/arbnode/sync_monitor.go index 8374aa4319..86396c9413 100644 --- a/arbnode/sync_monitor.go +++ b/arbnode/sync_monitor.go @@ -84,6 +84,10 @@ func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.Message return 0, nil } +func (s *SyncMonitor) GetMaxMessageCount() (arbutil.MessageIndex, error) { + return s.maxMessageCount() +} + func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) { msgCount, err := s.txStreamer.GetMessageCount() if err != nil { @@ -136,7 +140,14 @@ func (s *SyncMonitor) FullSyncProgressMap() map[string]interface{} { } syncTarget := s.SyncTargetMessageCount() - res["syncTargetMsgCount"] = syncTarget + res["consensusSyncTargetMsgCount"] = syncTarget + + maxMsgCount, err := s.maxMessageCount() + if err != nil { + res["maxMessageCountError"] = err.Error() + return res + } + res["maxMessageCount"] = maxMsgCount msgCount, err := s.txStreamer.GetMessageCount() if err != nil { diff --git a/execution/gethexec/sync_monitor.go b/execution/gethexec/sync_monitor.go index 2fb5d29f15..46b110cb67 100644 --- a/execution/gethexec/sync_monitor.go +++ b/execution/gethexec/sync_monitor.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "sync" "sync/atomic" + "time" flag "github.com/spf13/pflag" @@ -16,19 +18,86 @@ import ( "github.com/offchainlabs/nitro/execution" ) +type syncDataEntry struct { + maxMessageCount arbutil.MessageIndex + timestamp time.Time +} + +// syncHistory maintains a time-based sliding window of sync data +type syncHistory struct { + mutex sync.RWMutex + entries []syncDataEntry + msgLag time.Duration +} + +func newSyncHistory(msgLag time.Duration) *syncHistory { + return &syncHistory{ + entries: make([]syncDataEntry, 0), + msgLag: msgLag, + } +} + +// add adds a new entry and trims old entries beyond 2*msgLag +func (h *syncHistory) add(maxMessageCount 
arbutil.MessageIndex, timestamp time.Time) { + h.mutex.Lock() + defer h.mutex.Unlock() + + h.entries = append(h.entries, syncDataEntry{ + maxMessageCount: maxMessageCount, + timestamp: timestamp, + }) + + // Trim entries older than 2*msgLag + cutoff := timestamp.Add(-2 * h.msgLag) + i := 0 + for i < len(h.entries) && h.entries[i].timestamp.Before(cutoff) { + i++ + } + if i > 0 { + h.entries = h.entries[i:] + } +} + +// getSyncTarget returns the appropriate sync target based on msgLag timing +// Returns 0 if no appropriate entry is found +func (h *syncHistory) getSyncTarget(now time.Time) arbutil.MessageIndex { + h.mutex.RLock() + defer h.mutex.RUnlock() + + if len(h.entries) == 0 { + return 0 + } + + // Find entries between msgLag and 2*msgLag ago + windowStart := now.Add(-2 * h.msgLag) + windowEnd := now.Add(-h.msgLag) + + for _, entry := range h.entries { + if !entry.timestamp.Before(windowStart) && !entry.timestamp.After(windowEnd) { + // Return the first (oldest) entry in the window + return entry.maxMessageCount + } + } + + return 0 +} + type SyncMonitorConfig struct { - SafeBlockWaitForBlockValidator bool `koanf:"safe-block-wait-for-block-validator"` - FinalizedBlockWaitForBlockValidator bool `koanf:"finalized-block-wait-for-block-validator"` + SafeBlockWaitForBlockValidator bool `koanf:"safe-block-wait-for-block-validator"` + FinalizedBlockWaitForBlockValidator bool `koanf:"finalized-block-wait-for-block-validator"` + MsgLag time.Duration `koanf:"msg-lag"` } var DefaultSyncMonitorConfig = SyncMonitorConfig{ SafeBlockWaitForBlockValidator: false, FinalizedBlockWaitForBlockValidator: false, + MsgLag: time.Second, } func SyncMonitorConfigAddOptions(prefix string, f *flag.FlagSet) { f.Bool(prefix+".safe-block-wait-for-block-validator", DefaultSyncMonitorConfig.SafeBlockWaitForBlockValidator, "wait for block validator to complete before returning safe block number") f.Bool(prefix+".finalized-block-wait-for-block-validator", 
DefaultSyncMonitorConfig.FinalizedBlockWaitForBlockValidator, "wait for block validator to complete before returning finalized block number") + f.Duration(prefix+".msg-lag", DefaultSyncMonitorConfig.MsgLag, "allowed message lag while still considered in sync") } type SyncMonitor struct { @@ -37,18 +106,25 @@ type SyncMonitor struct { exec *ExecutionEngine consensusSyncData atomic.Pointer[execution.ConsensusSyncData] + syncHistory *syncHistory } func NewSyncMonitor(config *SyncMonitorConfig, exec *ExecutionEngine) *SyncMonitor { return &SyncMonitor{ - config: config, - exec: exec, + config: config, + exec: exec, + syncHistory: newSyncHistory(config.MsgLag), } } // SetConsensusSyncData updates the sync data pushed from consensus func (s *SyncMonitor) SetConsensusSyncData(syncData *execution.ConsensusSyncData) { s.consensusSyncData.Store(syncData) + + // Add the max message count to history for sync target calculation + if syncData != nil && syncData.MaxMessageCount > 0 { + s.syncHistory.add(syncData.MaxMessageCount, syncData.UpdatedAt) + } } func (s *SyncMonitor) FullSyncProgressMap(ctx context.Context) map[string]interface{} { @@ -58,11 +134,21 @@ func (s *SyncMonitor) FullSyncProgressMap(ctx context.Context) map[string]interf } res := make(map[string]interface{}) - for k, v := range data.SyncProgressMap { - res[k] = v + + // Copy sync progress map if it exists (may be nil when synced) + if data.SyncProgressMap != nil { + for k, v := range data.SyncProgressMap { + res[k] = v + } } - res["consensusSyncTarget"] = data.SyncTargetMessageCount + // Always add the max message count + res["maxMessageCount"] = data.MaxMessageCount + + // Add execution-calculated sync target + now := time.Now() + executionSyncTarget := s.syncHistory.getSyncTarget(now) + res["executionSyncTarget"] = executionSyncTarget // Add execution-specific data header, err := s.exec.getCurrentHeader() @@ -95,22 +181,33 @@ func (s *SyncMonitor) Synced(ctx context.Context) bool { return false } + // Check 
that the sync data is fresh (not older than MsgLag) + now := time.Now() + if now.Sub(data.UpdatedAt) > s.config.MsgLag { + return false + } + + // Consensus must report being synced if !data.Synced { return false } - // Additional execution-side validation + // Get execution's current message index built, err := s.exec.HeadMessageIndex() if err != nil { log.Error("Error getting head message index", "err", err) return false } - if data.SyncTargetMessageCount == 0 { + // Calculate the sync target based on historical data + syncTarget := s.syncHistory.getSyncTarget(now) + if syncTarget == 0 { + // No valid sync target available yet return false } - return built+1 >= data.SyncTargetMessageCount + // Check if execution has reached the calculated sync target + return built+1 >= syncTarget } func (s *SyncMonitor) SetConsensusInfo(consensus execution.ConsensusInfo) { diff --git a/execution/gethexec/sync_monitor_test.go b/execution/gethexec/sync_monitor_test.go new file mode 100644 index 0000000000..02f9ed8b78 --- /dev/null +++ b/execution/gethexec/sync_monitor_test.go @@ -0,0 +1,199 @@ +// Copyright 2025, Offchain Labs, Inc. 
+// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md + +package gethexec + +import ( + "testing" + "time" + + "github.com/offchainlabs/nitro/arbutil" +) + +func TestSyncHistory_Add(t *testing.T) { + msgLag := 100 * time.Millisecond + h := newSyncHistory(msgLag) + + now := time.Now() + + // Add some entries + h.add(arbutil.MessageIndex(100), now) + h.add(arbutil.MessageIndex(200), now.Add(50*time.Millisecond)) + h.add(arbutil.MessageIndex(300), now.Add(100*time.Millisecond)) + + // Check that all entries are present + if len(h.entries) != 3 { + t.Errorf("Expected 3 entries, got %d", len(h.entries)) + } + + // Add an entry that should trigger trimming (more than 2*msgLag later) + h.add(arbutil.MessageIndex(400), now.Add(250*time.Millisecond)) + + // First entry should be trimmed (it's older than 2*msgLag from newest entry) + if len(h.entries) != 3 { + t.Errorf("Expected 3 entries after trimming, got %d", len(h.entries)) + } + + // Verify the first entry was trimmed + if h.entries[0].maxMessageCount != 200 { + t.Errorf("Expected first entry to have maxMessageCount 200, got %d", h.entries[0].maxMessageCount) + } +} + +func TestSyncHistory_GetSyncTarget(t *testing.T) { + msgLag := 100 * time.Millisecond + h := newSyncHistory(msgLag) + + now := time.Now() + + // Test empty history + target := h.getSyncTarget(now) + if target != 0 { + t.Errorf("Expected 0 for empty history, got %d", target) + } + + // Add entries at various times + h.add(arbutil.MessageIndex(100), now.Add(-250*time.Millisecond)) // Too old (beyond 2*msgLag) + h.add(arbutil.MessageIndex(200), now.Add(-180*time.Millisecond)) // In window (between msgLag and 2*msgLag) + h.add(arbutil.MessageIndex(300), now.Add(-150*time.Millisecond)) // In window + h.add(arbutil.MessageIndex(400), now.Add(-120*time.Millisecond)) // In window + h.add(arbutil.MessageIndex(500), now.Add(-80*time.Millisecond)) // Too recent (less than msgLag) + h.add(arbutil.MessageIndex(600), 
now.Add(-50*time.Millisecond)) // Too recent + + // Should return the oldest entry in the window (200) + target = h.getSyncTarget(now) + if target != 200 { + t.Errorf("Expected sync target 200, got %d", target) + } +} + +func TestSyncHistory_GetSyncTarget_NoValidEntries(t *testing.T) { + msgLag := 100 * time.Millisecond + h := newSyncHistory(msgLag) + + now := time.Now() + + // Add only entries outside the valid window + h.add(arbutil.MessageIndex(100), now.Add(-250*time.Millisecond)) // Too old + h.add(arbutil.MessageIndex(200), now.Add(-50*time.Millisecond)) // Too recent + + // Should return 0 as no entries are in the valid window + target := h.getSyncTarget(now) + if target != 0 { + t.Errorf("Expected sync target 0, got %d", target) + } +} + +func TestSyncHistory_GetSyncTarget_ExactBoundaries(t *testing.T) { + msgLag := 100 * time.Millisecond + h := newSyncHistory(msgLag) + + now := time.Now() + + // Add entries exactly at the boundaries + h.add(arbutil.MessageIndex(100), now.Add(-2*msgLag)) // Exactly at 2*msgLag ago (inclusive) + h.add(arbutil.MessageIndex(200), now.Add(-msgLag)) // Exactly at msgLag ago (inclusive) + + // Both should be in the window, return the oldest (100) + target := h.getSyncTarget(now) + if target != 100 { + t.Errorf("Expected sync target 100, got %d", target) + } +} + +func TestSyncHistory_Trimming(t *testing.T) { + msgLag := 100 * time.Millisecond + h := newSyncHistory(msgLag) + + baseTime := time.Now() + + // Add many entries - they will get trimmed as we go + // With msgLag=100ms, we keep entries within 200ms of the newest + for i := 0; i < 10; i++ { + // #nosec G115 + h.add(arbutil.MessageIndex(i*100), baseTime.Add(time.Duration(i*50)*time.Millisecond)) + } + + // After adding entry at 450ms, we keep entries from 250ms onwards + // That's entries at 250ms, 300ms, 350ms, 400ms, 450ms = 5 entries + if len(h.entries) != 5 { + t.Errorf("Expected 5 entries after incremental adds, got %d", len(h.entries)) + } + + // Verify the first 
entry is the one at 250ms (index 5) + if h.entries[0].maxMessageCount != 500 { + t.Errorf("Expected first entry to be 500, got %d", h.entries[0].maxMessageCount) + } + + // Add an entry much later that should trigger more aggressive trimming + futureTime := baseTime.Add(1 * time.Second) + h.add(arbutil.MessageIndex(1000), futureTime) + + // Should have trimmed all old entries (keeping only the new one since all others are > 200ms old) + if len(h.entries) != 1 { + t.Errorf("Expected 1 entry after adding future entry, got %d", len(h.entries)) + } + + if h.entries[0].maxMessageCount != 1000 { + t.Errorf("Expected remaining entry to be 1000, got %d", h.entries[0].maxMessageCount) + } +} + +func TestSyncHistory_ConcurrentAccess(t *testing.T) { + msgLag := 10 * time.Millisecond + h := newSyncHistory(msgLag) + + done := make(chan bool) + now := time.Now() + + // Concurrent adds + go func() { + for i := 0; i < 100; i++ { + // #nosec G115 + h.add(arbutil.MessageIndex(i), now.Add(time.Duration(i)*time.Millisecond)) + time.Sleep(time.Microsecond) + } + done <- true + }() + + // Concurrent reads + go func() { + for i := 0; i < 100; i++ { + h.getSyncTarget(now.Add(time.Duration(i) * time.Millisecond)) + time.Sleep(time.Microsecond) + } + done <- true + }() + + // Wait for both goroutines + <-done + <-done + + // Just verify we didn't panic and have some entries + if len(h.entries) == 0 { + t.Error("Expected some entries after concurrent operations") + } +} + +func TestSyncHistory_EdgeCases(t *testing.T) { + msgLag := 100 * time.Millisecond + h := newSyncHistory(msgLag) + + now := time.Now() + + // Test with single entry in window + h.add(arbutil.MessageIndex(100), now.Add(-150*time.Millisecond)) + target := h.getSyncTarget(now) + if target != 100 { + t.Errorf("Expected sync target 100 for single entry, got %d", target) + } + + // Test with msgLag = 0 (edge case) + h2 := newSyncHistory(0) + h2.add(arbutil.MessageIndex(200), now) + target2 := h2.getSyncTarget(now) + // With 
msgLag=0, the window is from 0 to 0 ago, so current entry should match + if target2 != 200 { + t.Errorf("Expected sync target 200 for msgLag=0, got %d", target2) + } +} diff --git a/execution/interface.go b/execution/interface.go index 1d4c701434..756b32e5ab 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -3,6 +3,7 @@ package execution import ( "context" "errors" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" @@ -36,9 +37,10 @@ type InboxBatch struct { // ConsensusSyncData contains sync status information pushed from consensus to execution type ConsensusSyncData struct { - Synced bool - SyncTargetMessageCount arbutil.MessageIndex - SyncProgressMap map[string]interface{} + Synced bool + MaxMessageCount arbutil.MessageIndex + SyncProgressMap map[string]interface{} // Only populated when !Synced for debugging + UpdatedAt time.Time } var ErrRetrySequencer = errors.New("please retry transaction") diff --git a/system_tests/seq_coordinator_test.go b/system_tests/seq_coordinator_test.go index 3b67765eb0..8969b0b627 100644 --- a/system_tests/seq_coordinator_test.go +++ b/system_tests/seq_coordinator_test.go @@ -53,6 +53,7 @@ func TestRedisSeqCoordinatorPriorities(t *testing.T) { builder.nodeConfig.SeqCoordinator.Enable = true builder.nodeConfig.SeqCoordinator.RedisUrl = redisutil.CreateTestRedis(ctx, t) builder.nodeConfig.ConsensusExecutionSyncer.SyncInterval = 10 * time.Millisecond + builder.execConfig.SyncMonitor.MsgLag = 10 * time.Millisecond l2Info := builder.L2Info From cfc1913fbb18800873f4ff0abc17f50bc7d3e9e1 Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Thu, 4 Sep 2025 13:45:51 +0200 Subject: [PATCH 4/7] Add SyncMonitor default to ConfigDefault --- execution/gethexec/node.go | 1 + 1 file changed, 1 insertion(+) diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 65a896cd78..d3b1350032 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -218,6 
+218,7 @@ var ConfigDefault = Config{ Caching: DefaultCachingConfig, MultigasCollector: multigascollector.DefaultCollectorConfig, Forwarder: DefaultNodeForwarderConfig, + SyncMonitor: DefaultSyncMonitorConfig, EnablePrefetchBlock: true, StylusTarget: DefaultStylusTargetConfig, From 4756bfde61d31be24b722576c70777d339b0353d Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Fri, 5 Sep 2025 15:12:33 +0200 Subject: [PATCH 5/7] Sleep to give time for ConsensusExecutionSyncer --- arbnode/consensus_execution_syncer.go | 3 +++ system_tests/eth_sync_test.go | 2 ++ 2 files changed, 5 insertions(+) diff --git a/arbnode/consensus_execution_syncer.go b/arbnode/consensus_execution_syncer.go index 78c4e1d1bc..af60ad4341 100644 --- a/arbnode/consensus_execution_syncer.go +++ b/arbnode/consensus_execution_syncer.go @@ -28,6 +28,9 @@ var DefaultConsensusExecutionSyncerConfig = ConsensusExecutionSyncerConfig{ SyncInterval: 1 * time.Second, } +// We don't define a Test config. For most tests we want the Syncer to behave +// the same as in production. 
+ func ConsensusExecutionSyncerConfigAddOptions(prefix string, f *flag.FlagSet) { f.Duration(prefix+".sync-interval", DefaultConsensusExecutionSyncerConfig.SyncInterval, "Interval in which finality and sync data is pushed from consensus to execution") } diff --git a/system_tests/eth_sync_test.go b/system_tests/eth_sync_test.go index af1fd4943f..6143753cd6 100644 --- a/system_tests/eth_sync_test.go +++ b/system_tests/eth_sync_test.go @@ -65,6 +65,7 @@ func TestEthSyncing(t *testing.T) { attempt++ } + // TODO: Use Client.SyncProgressMap to see the full map progress, err := testClientB.Client.SyncProgress(ctx) Require(t, err) if progress == nil { @@ -72,6 +73,7 @@ func TestEthSyncing(t *testing.T) { } for testClientB.ConsensusNode.TxStreamer.ExecuteNextMsg(ctx) { } + time.Sleep(time.Second * 2) progress, err = testClientB.Client.SyncProgress(ctx) Require(t, err) if progress != nil { From 2473e0ba470ae56cb16b7bfd006bd9cffeb7a17b Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Tue, 23 Sep 2025 12:48:23 +0600 Subject: [PATCH 6/7] Address PR review comments on sync monitor - Simplify sync target to use oldest entry < msgLag ago (not 2*msgLag window) - Use min(now, syncData.UpdatedAt) to prevent future timestamps - Rename maxMessageCount to consensusMaxMessageCount for clarity - Update tests to match new msgLag-based trimming behavior --- execution/gethexec/sync_monitor.go | 28 ++++++----- execution/gethexec/sync_monitor_test.go | 64 +++++++++++++------------ 2 files changed, 50 insertions(+), 42 deletions(-) diff --git a/execution/gethexec/sync_monitor.go b/execution/gethexec/sync_monitor.go index 46b110cb67..182483b50a 100644 --- a/execution/gethexec/sync_monitor.go +++ b/execution/gethexec/sync_monitor.go @@ -37,7 +37,7 @@ func newSyncHistory(msgLag time.Duration) *syncHistory { } } -// add adds a new entry and trims old entries beyond 2*msgLag +// add adds a new entry and trims old entries beyond msgLag func (h *syncHistory) add(maxMessageCount 
arbutil.MessageIndex, timestamp time.Time) { h.mutex.Lock() defer h.mutex.Unlock() @@ -47,8 +47,8 @@ func (h *syncHistory) add(maxMessageCount arbutil.MessageIndex, timestamp time.T timestamp: timestamp, }) - // Trim entries older than 2*msgLag - cutoff := timestamp.Add(-2 * h.msgLag) + // Trim entries older than msgLag + cutoff := timestamp.Add(-h.msgLag) i := 0 for i < len(h.entries) && h.entries[i].timestamp.Before(cutoff) { i++ @@ -58,7 +58,10 @@ func (h *syncHistory) add(maxMessageCount arbutil.MessageIndex, timestamp time.T } } -// getSyncTarget returns the appropriate sync target based on msgLag timing +// getSyncTarget returns the sync target based on msgLag timing. +// The sync target is the consensusMaxMessageCount from the oldest +// syncDataEntry that was received more recently than 1 msgLag ago. +// There may be no entries if the syncHistory has not been updated recently. // Returns 0 if no appropriate entry is found func (h *syncHistory) getSyncTarget(now time.Time) arbutil.MessageIndex { h.mutex.RLock() @@ -68,13 +71,11 @@ func (h *syncHistory) getSyncTarget(now time.Time) arbutil.MessageIndex { return 0 } - // Find entries between msgLag and 2*msgLag ago - windowStart := now.Add(-2 * h.msgLag) - windowEnd := now.Add(-h.msgLag) + // Find oldest entry newer than now-msgLag + windowStart := now.Add(-h.msgLag) for _, entry := range h.entries { - if !entry.timestamp.Before(windowStart) && !entry.timestamp.After(windowEnd) { - // Return the first (oldest) entry in the window + if !entry.timestamp.Before(windowStart) { return entry.maxMessageCount } } @@ -123,7 +124,12 @@ func (s *SyncMonitor) SetConsensusSyncData(syncData *execution.ConsensusSyncData // Add the max message count to history for sync target calculation if syncData != nil && syncData.MaxMessageCount > 0 { - s.syncHistory.add(syncData.MaxMessageCount, syncData.UpdatedAt) + syncTime := time.Now() + if syncTime.After(syncData.UpdatedAt) { + syncTime = syncData.UpdatedAt + } + + 
s.syncHistory.add(syncData.MaxMessageCount, syncTime) } } @@ -143,7 +149,7 @@ func (s *SyncMonitor) FullSyncProgressMap(ctx context.Context) map[string]interf } // Always add the max message count - res["maxMessageCount"] = data.MaxMessageCount + res["consensusMaxMessageCount"] = data.MaxMessageCount // Add execution-calculated sync target now := time.Now() diff --git a/execution/gethexec/sync_monitor_test.go b/execution/gethexec/sync_monitor_test.go index 02f9ed8b78..ed5b5999b4 100644 --- a/execution/gethexec/sync_monitor_test.go +++ b/execution/gethexec/sync_monitor_test.go @@ -26,17 +26,19 @@ func TestSyncHistory_Add(t *testing.T) { t.Errorf("Expected 3 entries, got %d", len(h.entries)) } - // Add an entry that should trigger trimming (more than 2*msgLag later) + // Add an entry that should trigger trimming (more than msgLag later) + // The new entry is at now+250ms, so we keep entries from (now+250ms - 100ms) = now+150ms onwards + // All previous entries are before now+150ms, so they get trimmed h.add(arbutil.MessageIndex(400), now.Add(250*time.Millisecond)) - // First entry should be trimmed (it's older than 2*msgLag from newest entry) - if len(h.entries) != 3 { - t.Errorf("Expected 3 entries after trimming, got %d", len(h.entries)) + // Only the newest entry should remain after trimming + if len(h.entries) != 1 { + t.Errorf("Expected 1 entry after trimming, got %d", len(h.entries)) } - // Verify the first entry was trimmed - if h.entries[0].maxMessageCount != 200 { - t.Errorf("Expected first entry to have maxMessageCount 200, got %d", h.entries[0].maxMessageCount) + // Verify only the newest entry remains + if h.entries[0].maxMessageCount != 400 { + t.Errorf("Expected first entry to have maxMessageCount 400, got %d", h.entries[0].maxMessageCount) } } @@ -53,17 +55,17 @@ func TestSyncHistory_GetSyncTarget(t *testing.T) { } // Add entries at various times - h.add(arbutil.MessageIndex(100), now.Add(-250*time.Millisecond)) // Too old (beyond 2*msgLag) - 
h.add(arbutil.MessageIndex(200), now.Add(-180*time.Millisecond)) // In window (between msgLag and 2*msgLag) - h.add(arbutil.MessageIndex(300), now.Add(-150*time.Millisecond)) // In window - h.add(arbutil.MessageIndex(400), now.Add(-120*time.Millisecond)) // In window - h.add(arbutil.MessageIndex(500), now.Add(-80*time.Millisecond)) // Too recent (less than msgLag) - h.add(arbutil.MessageIndex(600), now.Add(-50*time.Millisecond)) // Too recent - - // Should return the oldest entry in the window (200) + h.add(arbutil.MessageIndex(100), now.Add(-250*time.Millisecond)) // Too old (beyond msgLag) + h.add(arbutil.MessageIndex(200), now.Add(-180*time.Millisecond)) // Too old (beyond msgLag) + h.add(arbutil.MessageIndex(300), now.Add(-150*time.Millisecond)) // Too old (beyond msgLag) + h.add(arbutil.MessageIndex(400), now.Add(-120*time.Millisecond)) // Too old (beyond msgLag) + h.add(arbutil.MessageIndex(500), now.Add(-80*time.Millisecond)) // In window (less than msgLag ago) + h.add(arbutil.MessageIndex(600), now.Add(-50*time.Millisecond)) // In window (less than msgLag ago) + + // Should return the oldest entry in the window (500) target = h.getSyncTarget(now) - if target != 200 { - t.Errorf("Expected sync target 200, got %d", target) + if target != 500 { + t.Errorf("Expected sync target 500, got %d", target) } } @@ -75,7 +77,7 @@ func TestSyncHistory_GetSyncTarget_NoValidEntries(t *testing.T) { // Add only entries outside the valid window h.add(arbutil.MessageIndex(100), now.Add(-250*time.Millisecond)) // Too old - h.add(arbutil.MessageIndex(200), now.Add(-50*time.Millisecond)) // Too recent + h.add(arbutil.MessageIndex(200), now.Add(-250*time.Millisecond)) // Too old // Should return 0 as no entries are in the valid window target := h.getSyncTarget(now) @@ -91,10 +93,10 @@ func TestSyncHistory_GetSyncTarget_ExactBoundaries(t *testing.T) { now := time.Now() // Add entries exactly at the boundaries - h.add(arbutil.MessageIndex(100), now.Add(-2*msgLag)) // Exactly at 
2*msgLag ago (inclusive) - h.add(arbutil.MessageIndex(200), now.Add(-msgLag)) // Exactly at msgLag ago (inclusive) + h.add(arbutil.MessageIndex(100), now.Add(-msgLag)) // Exactly at msgLag ago (included) + h.add(arbutil.MessageIndex(200), now.Add(-msgLag/2)) // More recent than msgLag ago - // Both should be in the window, return the oldest (100) + // Both entries are in the window, return the oldest (100) target := h.getSyncTarget(now) if target != 100 { t.Errorf("Expected sync target 100, got %d", target) @@ -108,28 +110,28 @@ func TestSyncHistory_Trimming(t *testing.T) { baseTime := time.Now() // Add many entries - they will get trimmed as we go - // With msgLag=100ms, we keep entries within 200ms of the newest + // With msgLag=100ms, we keep entries within 100ms of the newest for i := 0; i < 10; i++ { // #nosec G115 h.add(arbutil.MessageIndex(i*100), baseTime.Add(time.Duration(i*50)*time.Millisecond)) } - // After adding entry at 450ms, we keep entries from 250ms onwards - // That's entries at 250ms, 300ms, 350ms, 400ms, 450ms = 5 entries - if len(h.entries) != 5 { - t.Errorf("Expected 5 entries after incremental adds, got %d", len(h.entries)) + // After adding entry at 450ms, we keep entries from 350ms onwards + // That's entries at 350ms, 400ms, 450ms = 3 entries + if len(h.entries) != 3 { + t.Errorf("Expected 3 entries after incremental adds, got %d", len(h.entries)) } - // Verify the first entry is the one at 250ms (index 5) - if h.entries[0].maxMessageCount != 500 { - t.Errorf("Expected first entry to be 500, got %d", h.entries[0].maxMessageCount) + // Verify the first entry is the one at 350ms (index 7) + if h.entries[0].maxMessageCount != 700 { + t.Errorf("Expected first entry to be 700, got %d", h.entries[0].maxMessageCount) } // Add an entry much later that should trigger more aggressive trimming futureTime := baseTime.Add(1 * time.Second) h.add(arbutil.MessageIndex(1000), futureTime) - // Should have trimmed all old entries (keeping only the new one 
since all others are > 200ms old) + // Should have trimmed all old entries (keeping only the new one since all others are > 100ms old) if len(h.entries) != 1 { t.Errorf("Expected 1 entry after adding future entry, got %d", len(h.entries)) } @@ -182,7 +184,7 @@ func TestSyncHistory_EdgeCases(t *testing.T) { now := time.Now() // Test with single entry in window - h.add(arbutil.MessageIndex(100), now.Add(-150*time.Millisecond)) + h.add(arbutil.MessageIndex(100), now.Add(-50*time.Millisecond)) target := h.getSyncTarget(now) if target != 100 { t.Errorf("Expected sync target 100 for single entry, got %d", target) From 50e0d861a15f7f9ec8aa431293639d13b2e9e2ba Mon Sep 17 00:00:00 2001 From: Tristan Wilson Date: Fri, 3 Oct 2025 12:26:46 +0200 Subject: [PATCH 7/7] Change consensus sync interval, remove TODO --- arbnode/consensus_execution_syncer.go | 2 +- arbnode/node.go | 6 ++++++ system_tests/eth_sync_test.go | 1 - 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/arbnode/consensus_execution_syncer.go b/arbnode/consensus_execution_syncer.go index f9302becc7..3ba0a85591 100644 --- a/arbnode/consensus_execution_syncer.go +++ b/arbnode/consensus_execution_syncer.go @@ -25,7 +25,7 @@ type ConsensusExecutionSyncerConfig struct { } var DefaultConsensusExecutionSyncerConfig = ConsensusExecutionSyncerConfig{ - SyncInterval: 1 * time.Second, + SyncInterval: 300 * time.Millisecond, } // We don't define a Test config. 
For most tests we want the Syncer to behave diff --git a/arbnode/node.go b/arbnode/node.go index 64af0ad4fd..d5513c10da 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -119,6 +119,12 @@ func (c *Config) Validate() error { if c.TransactionStreamer.TrackBlockMetadataFrom != 0 && !c.BlockMetadataFetcher.Enable { log.Warn("track-block-metadata-from is set but blockMetadata fetcher is not enabled") } + // Check that sync-interval is not more than msg-lag / 2 + if c.ConsensusExecutionSyncer.SyncInterval > c.SyncMonitor.MsgLag/2 { + log.Warn("consensus-execution-syncer.sync-interval is more than half of sync-monitor.msg-lag, which may cause sync issues", + "sync-interval", c.ConsensusExecutionSyncer.SyncInterval, + "msg-lag", c.SyncMonitor.MsgLag) + } return nil } diff --git a/system_tests/eth_sync_test.go b/system_tests/eth_sync_test.go index 6143753cd6..a48b408546 100644 --- a/system_tests/eth_sync_test.go +++ b/system_tests/eth_sync_test.go @@ -65,7 +65,6 @@ func TestEthSyncing(t *testing.T) { attempt++ } - // TODO: Use Client.SyncProgressMap to see the full map progress, err := testClientB.Client.SyncProgress(ctx) Require(t, err) if progress == nil {