diff --git a/arbnode/node.go b/arbnode/node.go index 667f1954dd..420356ec5c 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -48,6 +48,7 @@ import ( "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/contracts" "github.com/offchainlabs/nitro/util/headerreader" + "github.com/offchainlabs/nitro/util/livedbsnapshotter" "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/util/signature" @@ -74,6 +75,7 @@ type Config struct { ResourceMgmt resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` BlockMetadataFetcher BlockMetadataFetcherConfig `koanf:"block-metadata-fetcher" reload:"hot"` ConsensusExecutionSyncer ConsensusExecutionSyncerConfig `koanf:"consensus-execution-syncer"` + LiveDBSnapshotter livedbsnapshotter.Config `koanf:"live-db-snapshotter"` // SnapSyncConfig is only used for testing purposes, these should not be configured in production. SnapSyncTest SnapSyncConfig } @@ -145,6 +147,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed MaintenanceConfigAddOptions(prefix+".maintenance", f) BlockMetadataFetcherConfigAddOptions(prefix+".block-metadata-fetcher", f) ConsensusExecutionSyncerConfigAddOptions(prefix+".consensus-execution-syncer", f) + livedbsnapshotter.ConfigAddOptions(prefix+".live-db-snapshotter", f) } var ConfigDefault = Config{ @@ -168,6 +171,7 @@ var ConfigDefault = Config{ Maintenance: DefaultMaintenanceConfig, ConsensusExecutionSyncer: DefaultConsensusExecutionSyncerConfig, SnapSyncTest: DefaultSnapSyncConfig, + LiveDBSnapshotter: livedbsnapshotter.DefaultConfig, } func ConfigDefaultL1Test() *Config { @@ -270,6 +274,7 @@ type Node struct { configFetcher ConfigFetcher ctx context.Context ConsensusExecutionSyncer *ConsensusExecutionSyncer + liveDBSnapshotter *livedbsnapshotter.LiveDBSnapshotter } type SnapSyncConfig struct { @@ -942,6 +947,7 @@ func getNodeParentChainReaderDisabled( syncMonitor *SyncMonitor, configFetcher ConfigFetcher, blockMetadataFetcher *BlockMetadataFetcher, + liveDBSnapshotter *livedbsnapshotter.LiveDBSnapshotter, ) *Node { return &Node{ ArbDB: arbDb, @@ -970,6 +976,7 @@ func getNodeParentChainReaderDisabled( configFetcher: configFetcher, ctx: ctx, blockMetadataFetcher: blockMetadataFetcher, + liveDBSnapshotter: liveDBSnapshotter, } } @@ -991,6 +998,7 @@ func createNodeImpl( fatalErrChan chan error, parentChainID *big.Int, blobReader daprovider.BlobReader, + dBSnapshotTrigger chan struct{}, ) (*Node, error) { config := configFetcher.Get() @@ -1041,8 +1049,13 @@ func createNodeImpl( return nil, err } + var liveDBSnapshotter *livedbsnapshotter.LiveDBSnapshotter + if config.LiveDBSnapshotter.Enable { + liveDBSnapshotter = livedbsnapshotter.NewLiveDBSnapshotter(arbDb, "arbitrumdata", dBSnapshotTrigger, config.LiveDBSnapshotter.Dir, false, nil) + } + if !config.ParentChainReader.Enable { - return getNodeParentChainReaderDisabled(ctx, arbDb, stack, executionClient, executionSequencer, executionRecorder, txStreamer, blobReader, broadcastServer, broadcastClients, coordinator, maintenanceRunner, syncMonitor, configFetcher, blockMetadataFetcher), nil + return getNodeParentChainReaderDisabled(ctx, arbDb, stack, executionClient, executionSequencer, executionRecorder, txStreamer, blobReader, broadcastServer, broadcastClients, coordinator, maintenanceRunner, syncMonitor, configFetcher, blockMetadataFetcher, liveDBSnapshotter), nil } delayedBridge, sequencerInbox, err := getDelayedBridgeAndSequencerInbox(deployInfo, l1client) @@ -1118,6 +1131,7 @@ func createNodeImpl( configFetcher: configFetcher, ctx: ctx, ConsensusExecutionSyncer: consensusExecutionSyncer, + liveDBSnapshotter: liveDBSnapshotter, }, nil } @@ -1221,11 +1235,12 @@ func CreateNodeExecutionClient( fatalErrChan chan error, parentChainID *big.Int, blobReader daprovider.BlobReader, + dBSnapshotTrigger chan struct{}, ) (*Node, error) { if executionClient == nil { return nil, errors.New("execution client must be non-nil") } - currentNode, err := createNodeImpl(ctx, stack, executionClient, nil, nil, nil, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader) + currentNode, err := createNodeImpl(ctx, stack, executionClient, nil, nil, nil, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader, dBSnapshotTrigger) if err != nil { return nil, err } @@ -1251,11 +1266,12 @@ func CreateNodeFullExecutionClient( fatalErrChan chan error, parentChainID *big.Int, blobReader daprovider.BlobReader, + dBSnapshotTrigger chan struct{}, ) (*Node, error) { if (executionClient == nil) || (executionSequencer == nil) || (executionRecorder == nil) || (executionBatchPoster == nil) { return nil, errors.New("execution client, sequencer, recorder, and batch poster must be non-nil") } - currentNode, err := createNodeImpl(ctx, stack, executionClient, executionSequencer, executionRecorder, executionBatchPoster, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader) + currentNode, err := createNodeImpl(ctx, stack, executionClient, executionSequencer, executionRecorder, executionBatchPoster, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader, dBSnapshotTrigger) if err != nil { return nil, err } @@ -1403,10 +1419,16 @@ func (n *Node) Start(ctx context.Context) error { if n.ConsensusExecutionSyncer != nil { n.ConsensusExecutionSyncer.Start(ctx) } + if n.liveDBSnapshotter != nil { + n.liveDBSnapshotter.Start(ctx) + } return nil } func (n *Node) StopAndWait() { + if n.liveDBSnapshotter != nil { + n.liveDBSnapshotter.StopAndWait() + } if n.MaintenanceRunner != nil && n.MaintenanceRunner.Started() { n.MaintenanceRunner.StopAndWait() } diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go index 5be7288432..6a78b7e03c 100644 --- a/cmd/nitro/nitro.go +++ b/cmd/nitro/nitro.go @@ -523,6 +523,32 @@ func mainImpl() int { } } + // TODO: after the consensus-execution split, remove this code block and modify LiveDBSnapshotter to + // directly read from sigur2. Currently execution will trigger snapshot creation of consensus side once + // its own snapshotting is complete. + var executionDBSnapShotTrigger, consensusDBSnapShotTrigger chan struct{} + if nodeConfig.Execution.LiveDBSnapshotter.Enable { + executionDBSnapShotTrigger = make(chan struct{}, 1) + go func() { + sigusr := make(chan os.Signal, 1) + signal.Notify(sigusr, syscall.SIGUSR2) + for { + select { + case <-ctx.Done(): + return + case <-sigusr: + select { + case executionDBSnapShotTrigger <- struct{}{}: + default: + } + } + } + }() + } + if nodeConfig.Node.LiveDBSnapshotter.Enable { + consensusDBSnapShotTrigger = make(chan struct{}, 1) + } + execNode, err := gethexec.CreateExecutionNode( ctx, stack, @@ -531,6 +557,8 @@ func mainImpl() int { l1Client, func() *gethexec.Config { return &liveNodeConfig.Get().Execution }, liveNodeConfig.Get().Node.TransactionStreamer.SyncTillBlock, + executionDBSnapShotTrigger, + consensusDBSnapShotTrigger, // execution will invoke conensus's db snapshotting ) if err != nil { log.Error("failed to create execution node", "err", err) @@ -555,6 +583,7 @@ func mainImpl() int { fatalErrChan, new(big.Int).SetUint64(nodeConfig.ParentChain.ID), blobReader, + consensusDBSnapShotTrigger, ) if err != nil { log.Error("failed to create node", "err", err) diff --git a/cmd/replay/db.go b/cmd/replay/db.go index ed03b5ebbf..7105286207 100644 --- a/cmd/replay/db.go +++ b/cmd/replay/db.go @@ -18,6 +18,10 @@ import ( type PreimageDb struct{} +func (db PreimageDb) CreateDBSnapshot(dir string) error { + return errors.New("createDBSnapshot method is not supported by PreimageDb") +} + func (db PreimageDb) Has(key []byte) (bool, error) { if len(key) != 32 { return false, nil diff --git a/execution/gethexec/api.go b/execution/gethexec/api.go index 879ac29e97..d16cff5682 100644 --- a/execution/gethexec/api.go +++ b/execution/gethexec/api.go @@ -11,6 +11,7 @@ import ( "math/big" "sync" "sync/atomic" + "syscall" "time" "github.com/ethereum/go-ethereum/arbitrum" @@ -84,13 +85,21 @@ func (a *ArbTimeboostAPI) SendExpressLaneTransaction(ctx context.Context, msg *t } type ArbDebugAPI struct { - blockchain *core.BlockChain - blockRangeBound uint64 - timeoutQueueBound uint64 + blockchain *core.BlockChain + blockRangeBound uint64 + timeoutQueueBound uint64 + enableLiveDBSnapshotting bool } -func NewArbDebugAPI(blockchain *core.BlockChain, blockRangeBound uint64, timeoutQueueBound uint64) *ArbDebugAPI { - return &ArbDebugAPI{blockchain, blockRangeBound, timeoutQueueBound} +func NewArbDebugAPI(blockchain *core.BlockChain, blockRangeBound uint64, timeoutQueueBound uint64, enableLiveDBSnapshotting bool) *ArbDebugAPI { + return &ArbDebugAPI{blockchain, blockRangeBound, timeoutQueueBound, enableLiveDBSnapshotting} +} + +func (api *ArbDebugAPI) CreateDBSnapshot(ctx context.Context) error { + if !api.enableLiveDBSnapshotting { + return errors.New("live database snapshot creation is not enabled") + } + return syscall.Kill(syscall.Getpid(), syscall.SIGUSR2) } type PricingModelHistory struct { diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 63f14c89b1..8e3a667119 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -47,6 +47,7 @@ import ( "github.com/offchainlabs/nitro/cmd/chaininfo" "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/util/arbmath" + "github.com/offchainlabs/nitro/util/livedbsnapshotter" "github.com/offchainlabs/nitro/util/sharedmetrics" "github.com/offchainlabs/nitro/util/stopwaiter" ) @@ -103,6 +104,8 @@ type ExecutionEngine struct { cachedL1PriceData *L1PriceData syncTillBlock uint64 + + liveDBSnapshotter *livedbsnapshotter.LiveDBSnapshotter } func NewL1PriceData() *L1PriceData { @@ -226,6 +229,16 @@ func (s *ExecutionEngine) SetReorgEventsNotifier(reorgEventsNotifier chan struct s.reorgEventsNotifier = reorgEventsNotifier } +func (s *ExecutionEngine) SetLiveDBSnapshotter(liveDBSnapshotter *livedbsnapshotter.LiveDBSnapshotter) { + if s.Started() { + panic("trying to set liveDBSnapshotter after start") + } + if s.liveDBSnapshotter != nil { + panic("trying to set liveDBSnapshotter when already set") + } + s.liveDBSnapshotter = liveDBSnapshotter +} + func (s *ExecutionEngine) EnableReorgSequencing() { if s.Started() { panic("trying to enable reorg sequencing after start") @@ -798,6 +811,10 @@ func (s *ExecutionEngine) appendBlock(block *types.Block, statedb *state.StateDB blockGasUsedHistogram.Update(int64(blockGasused)) gasUsedSinceStartupCounter.Inc(int64(blockGasused)) s.updateL1GasPriceEstimateMetric() + if s.liveDBSnapshotter != nil && s.liveDBSnapshotter.IsSnapshotDue() { + s.bc.FlushToDisk(false) + s.liveDBSnapshotter.CreateDBSnapshot() + } return nil } diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 871bacaee7..559b2a34a0 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -33,6 +33,7 @@ import ( "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/dbutil" "github.com/offchainlabs/nitro/util/headerreader" + "github.com/offchainlabs/nitro/util/livedbsnapshotter" ) type StylusTargetConfig struct { @@ -86,21 +87,22 @@ func StylusTargetConfigAddOptions(prefix string, f *flag.FlagSet) { } type Config struct { - ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` - Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"` - RecordingDatabase BlockRecorderConfig `koanf:"recording-database"` - TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"` - Forwarder ForwarderConfig `koanf:"forwarder"` - ForwardingTarget string `koanf:"forwarding-target"` - SecondaryForwardingTarget []string `koanf:"secondary-forwarding-target"` - Caching CachingConfig `koanf:"caching"` - RPC arbitrum.Config `koanf:"rpc"` - TxLookupLimit uint64 `koanf:"tx-lookup-limit"` - EnablePrefetchBlock bool `koanf:"enable-prefetch-block"` - SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` - StylusTarget StylusTargetConfig `koanf:"stylus-target"` - BlockMetadataApiCacheSize uint64 `koanf:"block-metadata-api-cache-size"` - BlockMetadataApiBlocksLimit uint64 `koanf:"block-metadata-api-blocks-limit"` + ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"` + Sequencer SequencerConfig `koanf:"sequencer" reload:"hot"` + RecordingDatabase BlockRecorderConfig `koanf:"recording-database"` + TxPreChecker TxPreCheckerConfig `koanf:"tx-pre-checker" reload:"hot"` + Forwarder ForwarderConfig `koanf:"forwarder"` + ForwardingTarget string `koanf:"forwarding-target"` + SecondaryForwardingTarget []string `koanf:"secondary-forwarding-target"` + Caching CachingConfig `koanf:"caching"` + RPC arbitrum.Config `koanf:"rpc"` + TxLookupLimit uint64 `koanf:"tx-lookup-limit"` + EnablePrefetchBlock bool `koanf:"enable-prefetch-block"` + SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"` + StylusTarget StylusTargetConfig `koanf:"stylus-target"` + BlockMetadataApiCacheSize uint64 `koanf:"block-metadata-api-cache-size"` + BlockMetadataApiBlocksLimit uint64 `koanf:"block-metadata-api-blocks-limit"` + LiveDBSnapshotter livedbsnapshotter.Config `koanf:"live-db-snapshotter"` forwardingTarget string } @@ -145,6 +147,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) { StylusTargetConfigAddOptions(prefix+".stylus-target", f) f.Uint64(prefix+".block-metadata-api-cache-size", ConfigDefault.BlockMetadataApiCacheSize, "size (in bytes) of lru cache storing the blockMetadata to service arb_getRawBlockMetadata") f.Uint64(prefix+".block-metadata-api-blocks-limit", ConfigDefault.BlockMetadataApiBlocksLimit, "maximum number of blocks allowed to be queried for blockMetadata per arb_getRawBlockMetadata query. Enabled by default, set 0 to disable the limit") + livedbsnapshotter.ConfigAddOptions(prefix+".live-db-snapshotter", f) } var ConfigDefault = Config{ @@ -162,6 +165,7 @@ var ConfigDefault = Config{ StylusTarget: DefaultStylusTargetConfig, BlockMetadataApiCacheSize: 100 * 1024 * 1024, BlockMetadataApiBlocksLimit: 100, + LiveDBSnapshotter: livedbsnapshotter.DefaultConfig, } type ConfigFetcher func() *Config @@ -183,6 +187,7 @@ type ExecutionNode struct { ClassicOutbox *ClassicOutboxRetriever started atomic.Bool bulkBlockMetadataFetcher *BulkBlockMetadataFetcher + liveDBSnapshotter *livedbsnapshotter.LiveDBSnapshotter } func CreateExecutionNode( @@ -193,6 +198,8 @@ func CreateExecutionNode( l1client *ethclient.Client, configFetcher ConfigFetcher, syncTillBlock uint64, + dBSnapshotTrigger chan struct{}, + consensusDBSnapshotTrigger chan struct{}, ) (*ExecutionNode, error) { config := configFetcher() execEngine, err := NewExecutionEngine(l2BlockChain, syncTillBlock) @@ -276,6 +283,12 @@ func CreateExecutionNode( bulkBlockMetadataFetcher := NewBulkBlockMetadataFetcher(l2BlockChain, execEngine, config.BlockMetadataApiCacheSize, config.BlockMetadataApiBlocksLimit) + var liveDBSnapshotter *livedbsnapshotter.LiveDBSnapshotter + if config.LiveDBSnapshotter.Enable { + liveDBSnapshotter = livedbsnapshotter.NewLiveDBSnapshotter(chainDB, "l2chaindata, ancient, wasm", dBSnapshotTrigger, config.LiveDBSnapshotter.Dir, true, consensusDBSnapshotTrigger) + execEngine.SetLiveDBSnapshotter(liveDBSnapshotter) + } + apis := []rpc.API{{ Namespace: "arb", Version: "1.0", @@ -302,6 +315,7 @@ func CreateExecutionNode( l2BlockChain, config.RPC.ArbDebug.BlockRangeBound, config.RPC.ArbDebug.TimeoutQueueBound, + config.LiveDBSnapshotter.Enable, ), Public: false, }) @@ -338,6 +352,7 @@ func CreateExecutionNode( ParentChainReader: parentChainReader, ClassicOutbox: classicOutbox, bulkBlockMetadataFetcher: bulkBlockMetadataFetcher, + liveDBSnapshotter: liveDBSnapshotter, }, nil } @@ -389,6 +404,9 @@ func (n *ExecutionNode) Start(ctx context.Context) containers.PromiseInterface[s n.ParentChainReader.Start(ctx) } n.bulkBlockMetadataFetcher.Start(ctx) + if n.liveDBSnapshotter != nil { + n.liveDBSnapshotter.Start(ctx) + } return containers.NewReadyPromise(struct{}{}, nil) } @@ -396,6 +414,9 @@ func (n *ExecutionNode) StopAndWait() containers.PromiseInterface[struct{}] { if !n.started.Load() { return containers.NewReadyPromise(struct{}{}, nil) } + if n.liveDBSnapshotter != nil { + n.liveDBSnapshotter.StopAndWait() + } n.bulkBlockMetadataFetcher.StopAndWait() // TODO after separation // n.Stack.StopRPC() // does nothing if not running diff --git a/go-ethereum b/go-ethereum index e6c8bea35d..bffaf9989c 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit e6c8bea35d519098cf7cc9c0d3765aef9ab72cbb +Subproject commit bffaf9989c2f515f5f6bd39f5cfd2ef7bbe1a965 diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 02421e0f4d..e98d2bc216 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -501,13 +501,13 @@ func buildOnParentChain( Require(t, execConfig.Validate()) execConfigToBeUsedInConfigFetcher := execConfig execConfigFetcher := func() *gethexec.Config { return execConfigToBeUsedInConfigFetcher } - execNode, err := gethexec.CreateExecutionNode(ctx, chainTestClient.Stack, chainDb, blockchain, parentChainTestClient.Client, execConfigFetcher, 0) + execNode, err := gethexec.CreateExecutionNode(ctx, chainTestClient.Stack, chainDb, blockchain, parentChainTestClient.Client, execConfigFetcher, 0, nil, nil) Require(t, err) fatalErrChan := make(chan error, 10) chainTestClient.ConsensusNode, err = arbnode.CreateNodeFullExecutionClient( ctx, chainTestClient.Stack, execNode, execNode, execNode, execNode, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), parentChainTestClient.Client, - addresses, validatorTxOptsPtr, sequencerTxOptsPtr, dataSigner, fatalErrChan, parentChainId, nil) + addresses, validatorTxOptsPtr, sequencerTxOptsPtr, dataSigner, fatalErrChan, parentChainId, nil, nil) Require(t, err) err = chainTestClient.ConsensusNode.Start(ctx) @@ -625,13 +625,13 @@ func (b *NodeBuilder) BuildL2(t *testing.T) func() { Require(t, b.execConfig.Validate()) execConfig := b.execConfig execConfigFetcher := func() *gethexec.Config { return execConfig } - execNode, err := gethexec.CreateExecutionNode(b.ctx, b.L2.Stack, chainDb, blockchain, nil, execConfigFetcher, 0) + execNode, err := gethexec.CreateExecutionNode(b.ctx, b.L2.Stack, chainDb, blockchain, nil, execConfigFetcher, 0, nil, nil) Require(t, err) fatalErrChan := make(chan error, 10) b.L2.ConsensusNode, err = arbnode.CreateNodeFullExecutionClient( b.ctx, b.L2.Stack, execNode, execNode, execNode, execNode, arbDb, NewFetcherFromConfig(b.nodeConfig), blockchain.Config(), - nil, nil, nil, nil, nil, fatalErrChan, big.NewInt(1337), nil) + nil, nil, nil, nil, nil, fatalErrChan, big.NewInt(1337), nil, nil) Require(t, err) // Give the node an init message @@ -674,11 +674,11 @@ func (b *NodeBuilder) RestartL2Node(t *testing.T) { l2info, stack, chainDb, arbDb, blockchain := createNonL1BlockChainWithStackConfig(t, b.L2Info, b.dataDir, b.chainConfig, b.initMessage, b.l2StackConfig, b.execConfig, b.wasmCacheTag, b.useFreezer) execConfigFetcher := func() *gethexec.Config { return b.execConfig } - execNode, err := gethexec.CreateExecutionNode(b.ctx, stack, chainDb, blockchain, nil, execConfigFetcher, 0) + execNode, err := gethexec.CreateExecutionNode(b.ctx, stack, chainDb, blockchain, nil, execConfigFetcher, 0, nil, nil) Require(t, err) feedErrChan := make(chan error, 10) - currentNode, err := arbnode.CreateNodeFullExecutionClient(b.ctx, stack, execNode, execNode, execNode, execNode, arbDb, NewFetcherFromConfig(b.nodeConfig), blockchain.Config(), nil, nil, nil, nil, nil, feedErrChan, big.NewInt(1337), nil) + currentNode, err := arbnode.CreateNodeFullExecutionClient(b.ctx, stack, execNode, execNode, execNode, execNode, arbDb, NewFetcherFromConfig(b.nodeConfig), blockchain.Config(), nil, nil, nil, nil, nil, feedErrChan, big.NewInt(1337), nil, nil) Require(t, err) Require(t, currentNode.Start(b.ctx)) @@ -1569,14 +1569,14 @@ func Create2ndNodeWithConfig( Require(t, nodeConfig.Validate()) configFetcher := func() *gethexec.Config { return execConfig } - currentExec, err := gethexec.CreateExecutionNode(ctx, chainStack, chainDb, blockchain, parentChainClient, configFetcher, 0) + currentExec, err := gethexec.CreateExecutionNode(ctx, chainStack, chainDb, blockchain, parentChainClient, configFetcher, 0, nil, nil) Require(t, err) var currentNode *arbnode.Node if useExecutionClientOnly { - currentNode, err = arbnode.CreateNodeExecutionClient(ctx, chainStack, currentExec, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), parentChainClient, addresses, &validatorTxOpts, &sequencerTxOpts, dataSigner, feedErrChan, big.NewInt(1337), nil) + currentNode, err = arbnode.CreateNodeExecutionClient(ctx, chainStack, currentExec, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), parentChainClient, addresses, &validatorTxOpts, &sequencerTxOpts, dataSigner, feedErrChan, big.NewInt(1337), nil, nil) } else { - currentNode, err = arbnode.CreateNodeFullExecutionClient(ctx, chainStack, currentExec, currentExec, currentExec, currentExec, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), parentChainClient, addresses, &validatorTxOpts, &sequencerTxOpts, dataSigner, feedErrChan, big.NewInt(1337), nil) + currentNode, err = arbnode.CreateNodeFullExecutionClient(ctx, chainStack, currentExec, currentExec, currentExec, currentExec, arbDb, NewFetcherFromConfig(nodeConfig), blockchain.Config(), parentChainClient, addresses, &validatorTxOpts, &sequencerTxOpts, dataSigner, feedErrChan, big.NewInt(1337), nil, nil) } Require(t, err) diff --git a/util/livedbsnapshotter/live_db_snapshotter.go b/util/livedbsnapshotter/live_db_snapshotter.go new file mode 100644 index 0000000000..a8b49303fd --- /dev/null +++ b/util/livedbsnapshotter/live_db_snapshotter.go @@ -0,0 +1,104 @@ +package livedbsnapshotter + +import ( + "context" + "sync/atomic" + "time" + + flag "github.com/spf13/pflag" + + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + + "github.com/offchainlabs/nitro/util/stopwaiter" +) + +type Config struct { + Enable bool `koanf:"enable"` + Dir string `koanf:"dir"` +} + +var DefaultConfig = Config{ + Enable: false, + Dir: "", +} + +func ConfigAddOptions(prefix string, f *flag.FlagSet) { + f.Bool(prefix+".enable", DefaultConfig.Enable, "enable creation of live db snapshots") + f.String(prefix+".dir", DefaultConfig.Dir, "path to the directory for saving db snapshots") +} + +type LiveDBSnapshotter struct { + stopwaiter.StopWaiter + db ethdb.Database + dbName string + trigger chan struct{} + dir string + isSnapshotDue atomic.Bool + withScheduling bool + + chainedTrigger chan struct{} +} + +func NewLiveDBSnapshotter(db ethdb.Database, dbName string, trigger chan struct{}, dir string, withScheduling bool, chainedTrigger chan struct{}) *LiveDBSnapshotter { + return &LiveDBSnapshotter{ + db: db, + dbName: dbName, + trigger: trigger, + dir: dir, + withScheduling: withScheduling, + chainedTrigger: chainedTrigger, + } +} + +func (l *LiveDBSnapshotter) Start(ctx context.Context) { + l.StopWaiter.Start(ctx, l) + l.LaunchThread(l.scheduleSnapshotCreation) +} + +func (l *LiveDBSnapshotter) scheduleSnapshotCreation(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-l.trigger: + log.Info("Live databases snapshot creation scheduled", "databases", l.dbName) + } + + if l.dir == "" { + log.Error("Aborting live databases snapshot creation as destination directory is empty") + continue + } + l.isSnapshotDue.Store(true) + if !l.withScheduling && l.IsSnapshotDue() { + l.CreateDBSnapshot() + } + } +} + +func (l *LiveDBSnapshotter) IsSnapshotDue() bool { + return l.isSnapshotDue.Load() +} + +func (l *LiveDBSnapshotter) CreateDBSnapshot() { + if l.Stopped() { + return + } + + startTime := time.Now() + log.Info("Beginning snapshot creation", "databases", l.dbName) + if err := l.db.CreateDBSnapshot(l.dir); err != nil { + log.Error("Snapshot creation for database failed", "databases", l.dbName, "err", err, "timeTaken", time.Since(startTime)) + } else { + log.Info("Live snapshot was successfully created", "databases", l.dbName, "timeTaken", time.Since(startTime)) + } + l.isSnapshotDue.Store(false) + + // As snapshot of consensus can only be taken after execution's is done + if l.chainedTrigger != nil { + select { + case l.chainedTrigger <- struct{}{}: + default: + } + } +}