From e4a2c76eeb979256afca1e0f1f10c91ed85391ec Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Fri, 22 Aug 2025 13:43:46 -0500 Subject: [PATCH 1/2] feat: wip consensus_aware_consensus_layer proxyd poller, todo: clean up consensus poller code and look at overwritting responses for optimisim_syncStatus --- proxyd/config.go | 9 +- proxyd/consensus_poller.go | 229 ++++++++++++++++++++++++++++++++++--- proxyd/proxyd.go | 12 +- 3 files changed, 225 insertions(+), 25 deletions(-) diff --git a/proxyd/config.go b/proxyd/config.go index 9addba67..6d4004da 100644 --- a/proxyd/config.go +++ b/proxyd/config.go @@ -147,6 +147,8 @@ func (b *BackendGroupConfig) ValidateRoutingStrategy(bgName string) bool { switch b.RoutingStrategy { case ConsensusAwareRoutingStrategy: return true + case ConsensusAwareCLRoutingStrategy: + return true case MulticallRoutingStrategy: return true case FallbackRoutingStrategy: @@ -161,9 +163,10 @@ func (b *BackendGroupConfig) ValidateRoutingStrategy(bgName string) bool { } const ( - ConsensusAwareRoutingStrategy RoutingStrategy = "consensus_aware" - MulticallRoutingStrategy RoutingStrategy = "multicall" - FallbackRoutingStrategy RoutingStrategy = "fallback" + ConsensusAwareRoutingStrategy RoutingStrategy = "consensus_aware" + ConsensusAwareCLRoutingStrategy RoutingStrategy = "consensus_aware_consensus_layer" + MulticallRoutingStrategy RoutingStrategy = "multicall" + FallbackRoutingStrategy RoutingStrategy = "fallback" ) type BackendGroupConfig struct { diff --git a/proxyd/consensus_poller.go b/proxyd/consensus_poller.go index a17945d1..136e9401 100644 --- a/proxyd/consensus_poller.go +++ b/proxyd/consensus_poller.go @@ -40,6 +40,7 @@ type ConsensusPoller struct { maxBlockLag uint64 maxBlockRange uint64 interval time.Duration + consensusLayer bool } type backendState struct { @@ -248,6 +249,12 @@ func WithMaxBlockLag(maxBlockLag uint64) ConsensusOpt { } } +func WithConsensusLayerConsensusAwareness(clConsensusAware bool) ConsensusOpt { + return func(cp *ConsensusPoller) { + cp.consensusLayer = true + } +} + func WithMaxBlockRange(maxBlockRange uint64) ConsensusOpt { return func(cp *ConsensusPoller) { cp.maxBlockRange = maxBlockRange @@ -341,10 +348,37 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { RecordConsensusBackendPeerCount(be, peerCount) } - latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest") - if err != nil { - log.Warn("error updating backend - latest block will not be updated", "name", be.Name, "err", err) - return + var latestBlockNumber, safeBlockNumber, finalizedBlockNumber hexutil.Uint64 + var latestBlockHash string + if cp.consensusLayer { + syncStatus, err := cp.fetchCLSyncStatus(ctx, be) + if err != nil { + log.Warn("error updating CL backend - backend will not be updated", "name", be.Name, "err", err) + return + } + latestBlockHash = syncStatus.LatestBlockHash + latestBlockNumber = syncStatus.LatestBlockNumber + safeBlockNumber = syncStatus.SafeBlockNumber + finalizedBlockNumber = syncStatus.FinalizedBlockNumber + + } else { + latestBlockNumber, latestBlockHash, err = cp.fetchELBlock(ctx, be, "latest") + if err != nil { + log.Warn("error updating backend - latest block will not be updated", "name", be.Name, "err", err) + return + } + safeBlockNumber, _, err = cp.fetchELBlock(ctx, be, "safe") + if err != nil { + log.Warn("error updating backend - safe block will not be updated", "name", be.Name, "err", err) + return + } + + finalizedBlockNumber, _, err = cp.fetchELBlock(ctx, be, "finalized") + if err != nil { + log.Warn("error updating backend - finalized block will not be updated", "name", be.Name, "err", err) + return + } + } if latestBlockNumber == 0 { log.Warn("error backend responded a 200 with blockheight 0 for latest block", "name", be.Name) @@ -352,24 +386,12 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { return } - safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe") - if err != nil { - log.Warn("error updating backend - safe block will not be updated", "name", be.Name, "err", err) - return - } - if safeBlockNumber == 0 { log.Warn("error backend responded a 200 with blockheight 0 for safe block", "name", be.Name) be.intermittentErrorsSlidingWindow.Incr() return } - finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized") - if err != nil { - log.Warn("error updating backend - finalized block will not be updated", "name", be.Name, "err", err) - return - } - if finalizedBlockNumber == 0 { log.Warn("error backend responded a 200 with blockheight 0 for finalized block", "name", be.Name) be.intermittentErrorsSlidingWindow.Incr() @@ -491,7 +513,16 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { for !hasConsensus { allAgreed := true for be := range candidates { - actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String()) + // For consensus layer check consistency of block N by using optimism_outputAtblock + var err error + var actualBlockNumber hexutil.Uint64 + var actualBlockHash string + + if cp.consensusLayer { + actualBlockNumber, actualBlockHash, err = cp.fetchCLBlock(ctx, be, proposedBlock.String()) + } else { + actualBlockNumber, actualBlockHash, err = cp.fetchELBlock(ctx, be, proposedBlock.String()) + } if err != nil { log.Warn("error updating backend", "name", be.Name, "err", err) continue @@ -621,9 +652,12 @@ func (cp *ConsensusPoller) Reset() { } } -// fetchBlock is a convenient wrapper to make a request to get a block directly from the backend -func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) { +func (cp *ConsensusPoller) fetchELBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) { var rpcRes RPCRes + log.Trace("executing fetchELBlock for backend", + "backend", be.Name, + "safety", string(block), + ) err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false) if err != nil { return 0, "", err @@ -639,8 +673,117 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st return } +type ELSyncStatus struct { + LatestBlockNumber hexutil.Uint64 + LatestBlockHash string + SafeBlockNumber hexutil.Uint64 + SafeBlockHash string + FinalizedBlockNumber hexutil.Uint64 + FinalizedBlockHash string +} + +func (cp *ConsensusPoller) fetchCLSyncStatus(ctx context.Context, be *Backend) (elSyncStatus *ELSyncStatus, err error) { + var rpcRes RPCRes + log.Trace("executing fetchCLBlock for backend", + "backend", be.Name, + ) + err = be.ForwardRPC(ctx, &rpcRes, "67", "optimism_syncStatus") + if err != nil { + return nil, err + } + + // TODO: Update this + elSyncResponse, ok := rpcRes.Result.(map[string]interface{}) + log.Trace("syncStatus response for backend", + "backend", be.Name, + "syncStatus", elSyncResponse, + ) + if !ok { + return nil, fmt.Errorf("unexpected response to optimism_syncStatus on backend %s", be.Name) + } + + latestBlockNumber, latestBlockHash, err := parseCLSyncStatusBlock(elSyncResponse, "unsafe_l2") + if err != nil { + return nil, err + } + + safeBlockNumber, safeBlockHash, err := parseCLSyncStatusBlock(elSyncResponse, "safe_l2") + if err != nil { + return nil, err + } + + finalizedBlockNumber, finalizedBlockHash, err := parseCLSyncStatusBlock(elSyncResponse, "finalized_l2") + if err != nil { + return nil, err + } + + return &ELSyncStatus{ + LatestBlockNumber: hexutil.Uint64(latestBlockNumber), + LatestBlockHash: latestBlockHash, + SafeBlockNumber: hexutil.Uint64(safeBlockNumber), + SafeBlockHash: safeBlockHash, + FinalizedBlockNumber: hexutil.Uint64(finalizedBlockNumber), + FinalizedBlockHash: finalizedBlockHash, + }, nil +} + +func parseCLSyncStatusBlock(jsonMap map[string]interface{}, safety string) (blockNumber uint64, blockHash string, err error) { + safetyMap, ok := jsonMap[safety].(map[string]interface{}) + if !ok { + return 0, "", fmt.Errorf("unexpected unmarshall to optimism_syncStatus on consensus layer backend safety %s", safety) + } + log.Trace("safetyMap", + "safetyMap", safetyMap, + ) + + numberVal, nOk := safetyMap["number"].(float64) + hashVal, hOk := safetyMap["hash"].(string) + if !nOk || !hOk { + return 0, "", fmt.Errorf("missing or invalid 'number' or 'hash' field in %s block", safety) + } + blockNumber = uint64(numberVal) + blockHash = hashVal + + return blockNumber, blockHash, nil +} + +// fetchCLBlock uses optimism_outputAtBlock to get a specific block info +func (cp *ConsensusPoller) fetchCLBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) { + var rpcRes RPCRes + log.Trace("executing fetchCLBlock for backend", + "backend", be.Name, + "block", block, + ) + err = be.ForwardRPC(ctx, &rpcRes, "67", "optimism_outputAtBlock", block) + if err != nil { + return 0, "", err + } + + // TODO: Parse this response correctly + elSyncResponse, ok := rpcRes.Result.(map[string]interface{}) + log.Trace("outputAtBlock response for backend", + "backend", be.Name, + "syncStatus", elSyncResponse, + ) + if !ok { + return 0, "", fmt.Errorf("unexpected response to optimism_outputAtBlock on backend %s", be.Name) + } + blockRef := elSyncResponse["blockRef"].(map[string]interface{}) + blockNumber = hexutil.Uint64(blockRef["number"].(float64)) + blockHash = blockRef["hash"].(string) + + return blockNumber, blockHash, nil +} + // getPeerCount is a convenient wrapper to retrieve the current peer count from the backend func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) { + if cp.consensusLayer { + return cp.fetchCLPeerCount(ctx, be) + } + return cp.fetchELPeerCount(ctx, be) +} + +func (cp *ConsensusPoller) fetchELPeerCount(ctx context.Context, be *Backend) (count uint64, err error) { var rpcRes RPCRes err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount") if err != nil { @@ -657,8 +800,48 @@ func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count return count, nil } +func (cp *ConsensusPoller) fetchCLPeerCount(ctx context.Context, be *Backend) (count uint64, err error) { + var rpcRes RPCRes + // https://docs.optimism.io/operators/node-operators/json-rpc#opp2p_peerstats + log.Trace("executing fetchCLPeerCount", + "backend", be.Name, + ) + err = be.ForwardRPC(ctx, &rpcRes, "67", "opp2p_peerStats") + if err != nil { + return 0, err + } + + jsonMap, ok := rpcRes.Result.(map[string]interface{}) + if !ok { + return 0, fmt.Errorf("unexpected response to net_peerCount on backend %s", be.Name) + } + connectedFloat, ok := jsonMap["connected"].(float64) + if !ok { + return 0, fmt.Errorf("missing or invalid 'connected' field in opp2p_peerStats response from backend %s", be.Name) + } + count = uint64(connectedFloat) + + log.Trace("fetchCLPeerCount result", + "backend", be.Name, + "result", jsonMap, + "count", count, + ) + + return count, nil +} + // isInSync is a convenient wrapper to check if the backend is in sync from the network func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bool, err error) { + if cp.consensusLayer { + // TODO: Implement determining if CL is in sync + return true, nil + } + return cp.isELInSync(ctx, be) + +} + +// isInSync is a convenient wrapper to check if the backend is in sync from the network +func (cp *ConsensusPoller) isELInSync(ctx context.Context, be *Backend) (result bool, err error) { var rpcRes RPCRes err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_syncing") if err != nil { @@ -684,6 +867,14 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo return res, nil } +// TODO: Figure out correct way to determine if CL in Sync +func (cp *ConsensusPoller) isCLInSync(ctx context.Context, be *Backend) (result bool, err error) { + log.Trace("executing isCLInSync for backend", + "backend", be.Name, + ) + return true, nil +} + // GetBackendState creates a copy of backend state so that the caller can use it without locking func (cp *ConsensusPoller) GetBackendState(be *Backend) *backendState { bs := cp.backendState[be] diff --git a/proxyd/proxyd.go b/proxyd/proxyd.go index 05263ec9..dc3f8d3e 100644 --- a/proxyd/proxyd.go +++ b/proxyd/proxyd.go @@ -484,16 +484,22 @@ func Start(config *Config) (*Server, func(), error) { bgcfg := config.BackendGroups[bgName] if !bgcfg.ValidateRoutingStrategy(bgName) { - log.Crit("Invalid routing strategy provided. Valid options: fallback, multicall, consensus_aware, \"\"", "name", bgName) + log.Crit("Invalid routing strategy provided. Valid options: fallback, multicall, consensus_aware, consensus_aware_consensus_layer \"\"", "name", bgName) } log.Info("configuring routing strategy for backend_group", "name", bgName, "routing_strategy", bgcfg.RoutingStrategy) - if bgcfg.RoutingStrategy == ConsensusAwareRoutingStrategy { - log.Info("creating poller for consensus aware backend_group", "name", bgName) + if bgcfg.RoutingStrategy == ConsensusAwareRoutingStrategy || bgcfg.RoutingStrategy == ConsensusAwareCLRoutingStrategy { + log.Info("creating poller for consensus aware backend_group", + "name", bgName, + "routing_strategy", bgcfg.RoutingStrategy, + ) copts := make([]ConsensusOpt, 0) + if bgcfg.RoutingStrategy == ConsensusAwareCLRoutingStrategy { + copts = append(copts, WithConsensusLayerConsensusAwareness(true)) + } if bgcfg.ConsensusAsyncHandler == "noop" { copts = append(copts, WithAsyncHandler(NewNoopAsyncHandler())) } From ca18c829e15e99158121a26451b96c4148e7095f Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Fri, 22 Aug 2025 13:58:08 -0500 Subject: [PATCH 2/2] feat: nit comments --- proxyd/consensus_poller.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/proxyd/consensus_poller.go b/proxyd/consensus_poller.go index 136e9401..8c97d496 100644 --- a/proxyd/consensus_poller.go +++ b/proxyd/consensus_poller.go @@ -692,7 +692,6 @@ func (cp *ConsensusPoller) fetchCLSyncStatus(ctx context.Context, be *Backend) ( return nil, err } - // TODO: Update this elSyncResponse, ok := rpcRes.Result.(map[string]interface{}) log.Trace("syncStatus response for backend", "backend", be.Name, @@ -727,6 +726,7 @@ func (cp *ConsensusPoller) fetchCLSyncStatus(ctx context.Context, be *Backend) ( }, nil } +// parseCLSyncStatusBlock is a helper function to parse the inner map structs of optimism_syncStatusResponse func parseCLSyncStatusBlock(jsonMap map[string]interface{}, safety string) (blockNumber uint64, blockHash string, err error) { safetyMap, ok := jsonMap[safety].(map[string]interface{}) if !ok { @@ -759,18 +759,19 @@ func (cp *ConsensusPoller) fetchCLBlock(ctx context.Context, be *Backend, block return 0, "", err } - // TODO: Parse this response correctly elSyncResponse, ok := rpcRes.Result.(map[string]interface{}) - log.Trace("outputAtBlock response for backend", - "backend", be.Name, - "syncStatus", elSyncResponse, - ) if !ok { return 0, "", fmt.Errorf("unexpected response to optimism_outputAtBlock on backend %s", be.Name) } blockRef := elSyncResponse["blockRef"].(map[string]interface{}) blockNumber = hexutil.Uint64(blockRef["number"].(float64)) blockHash = blockRef["hash"].(string) + log.Trace("outputAtBlock response for backend", + "backend", be.Name, + "syncStatus", elSyncResponse, + "blockNumber", blockNumber, + "blockHash", blockHash, + ) return blockNumber, blockHash, nil } @@ -833,8 +834,7 @@ func (cp *ConsensusPoller) fetchCLPeerCount(ctx context.Context, be *Backend) (c // isInSync is a convenient wrapper to check if the backend is in sync from the network func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bool, err error) { if cp.consensusLayer { - // TODO: Implement determining if CL is in sync - return true, nil + return cp.isCLInSync(ctx, be) } return cp.isELInSync(ctx, be)