diff --git a/pkg/datasync/consumer_test.go b/pkg/datasync/consumer_test.go index 1d8a4184a5fe3..f91b8309f7eb8 100644 --- a/pkg/datasync/consumer_test.go +++ b/pkg/datasync/consumer_test.go @@ -304,7 +304,7 @@ func TestParseCheckpointLocations(t *testing.T) { assert.NotNil(t, ckp) locations, err := c.parseCheckpointLocations(ctx, ckp.Location) assert.NoError(t, err) - assert.Equal(t, 35, len(locations)) + assert.Equal(t, 65, len(locations)) }) }) } diff --git a/pkg/vm/engine/tae/catalog/object.go b/pkg/vm/engine/tae/catalog/object.go index be8e033ffe114..b22ce4ab7627f 100644 --- a/pkg/vm/engine/tae/catalog/object.go +++ b/pkg/vm/engine/tae/catalog/object.go @@ -728,3 +728,36 @@ func (entry *ObjectEntry) ForeachMVCCNodeInRange(start, end types.TS, f func(*tx } return nil } + +// ForeachMVCCSpecificNodeInRange is used by "do checkpoint". +// The purpose is to ensure that the checkpoint contains all data before the end timestamp. +func (entry *ObjectEntry) ForeachMVCCSpecificNodeInRange(start, end types.TS, f func(*txnbase.TxnMVCCNode) error) error { + needWait, txn := entry.GetLastMVCCNode().NeedWaitCommitting(end.Next()) + if needWait { + txn.GetTxnState(true) + } + + var createIn, deleteIn bool + createIn, _ = entry.CreateNode.PreparedIn(start, end) + if createIn { + if err := f(&entry.CreateNode); err != nil { + return err + } + } + deleteIn, _ = entry.DeleteNode.PreparedIn(start, end) + if !deleteIn { + if !entry.IsAppendable() { + return nil + } + if !createIn { + return nil + } + if !entry.DeleteNode.IsCommitted() { + return nil + } + } + if err := f(&entry.DeleteNode); err != nil { + return err + } + return nil +} diff --git a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go index 67cc929030219..b7817e4959e3c 100644 --- a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go +++ b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go @@ -828,8 +828,8 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( if ckpMaxEnd.GT(checkpointLowWaterMark) { logutil.Warn("GC-PANIC-MERGE-FILES", zap.String("task", c.TaskNameLocked()), - zap.String("ckpMaxEnd", ckpMaxEnd.ToString()), - zap.String("checkpointLowWaterMark", checkpointLowWaterMark.ToString()), + zap.String("ckp-max", ckpMaxEnd.ToString()), + zap.String("ckp-low", checkpointLowWaterMark.ToString()), ) return } @@ -848,7 +848,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( window := c.GetScannedWindowLocked() if ckpMaxEnd.GT(&window.tsRange.end) { logutil.Warn("GC-PANIC-MERGE-FILES", - zap.String("ckpMaxEnd", ckpMaxEnd.ToString()), + zap.String("ckp-max", ckpMaxEnd.ToString()), zap.String("window-end", window.tsRange.end.ToString()), ) return @@ -901,7 +901,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( logutil.Error( "GC-TRACE-MERGE-CHECKPOINT-FILES", zap.String("task", c.TaskNameLocked()), - zap.Int("file len", len(newFiles)), + zap.Int("file-len", len(newFiles)), zap.Error(err), ) return @@ -947,7 +947,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( logutil.Info( "GC-TRACE-DELETE-CHECKPOINT-FILE", zap.String("task", c.TaskNameLocked()), - zap.String("delete file", deleteFile), + zap.String("delete-file", deleteFile), ) c.checkpointCli.RemoveCheckpointMetaFile(decodedFile.GetName()) } @@ -1140,7 +1140,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked( if scanMark.IsEmpty() { logutil.Warn("GC-PANIC-SCANMARK-EMPTY", zap.String("task", c.TaskNameLocked()), - zap.String("mergeMark", mergeMark.ToString())) + zap.String("merge-mark", mergeMark.ToString())) return nil } if waterMark.GT(&scanMark) { @@ -1573,7 +1573,7 @@ func (c *checkpointCleaner) tryScanLocked( cpt := c.checkpointCli.GetCompacted() if cpt == nil { logutil.Info("GC-PANIC-REBUILD-TABLE", - zap.String("max gCkp", maxGCkp.String()), + zap.String("max-gckp", maxGCkp.String()), zap.String("start", start.ToString())) } else { candidates = append(candidates, cpt) @@ -1582,8 +1582,8 @@ func (c *checkpointCleaner) tryScanLocked( gcWaterMark := c.GetGCWaterMark() if gcWaterMark != nil { logutil.Warn("GC-PANIC-REBUILD-GC-WATER-MARK", - zap.String("max gCkp", maxGCkp.String()), - zap.String("gcWaterMark", gcWaterMark.String())) + zap.String("max-gckp", maxGCkp.String()), + zap.String("gc-water-mark", gcWaterMark.String())) } c.updateGCWaterMark(maxGCkp) tryGC = false diff --git a/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go b/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go index dab5d1ee35b2c..dc2b5d7c0e22b 100644 --- a/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go +++ b/pkg/vm/engine/tae/db/gc/v3/diskcleaner.go @@ -346,6 +346,12 @@ func (cleaner *DiskCleaner) Start() { cleaner.onceStart.Do(func() { cleaner.processQueue.Start() step := cleaner.step.Load() + defer func() { + logutil.Info( + "GC-DISK-CLEANER-START", + zap.Uint32("step", cleaner.step.Load()), + ) + }() switch step { case StateStep_Write: if err := cleaner.forceScheduleJob(JT_GCReplayAndExecute); err != nil { @@ -366,7 +372,7 @@ func (cleaner *DiskCleaner) Stop() { cleaner.processQueue.Stop() cleaner.cleaner.Stop() logutil.Info( - "GC-DiskCleaner-Started", + "GC-DISK-CLEANER-STOP", zap.Uint32("step", cleaner.step.Load()), ) }) diff --git a/pkg/vm/engine/tae/db/gc/v3/exec_v1.go b/pkg/vm/engine/tae/db/gc/v3/exec_v1.go index c00792d728efe..7c2423f5c6c32 100644 --- a/pkg/vm/engine/tae/db/gc/v3/exec_v1.go +++ b/pkg/vm/engine/tae/db/gc/v3/exec_v1.go @@ -336,7 +336,7 @@ func MakeSnapshotAndPitrFineFilter( if deleteTS.IsEmpty() { logutil.Warn("GC-PANIC-TS-EMPTY", zap.String("name", name), - zap.String("createTS", createTS.ToString())) + zap.String("create-ts", createTS.ToString())) continue } if !logtail.ObjectIsSnapshotRefers( diff --git a/pkg/vm/engine/tae/logtail/ckp_writer.go b/pkg/vm/engine/tae/logtail/ckp_writer.go index 7053fd7e0974e..6c2237b050a85 100644 --- a/pkg/vm/engine/tae/logtail/ckp_writer.go +++ b/pkg/vm/engine/tae/logtail/ckp_writer.go @@ -90,7 +90,7 @@ func (collector *BaseCollector_V2) Collect(c *catalog.Catalog) (err error) { } func (collector *BaseCollector_V2) visitObject(entry *catalog.ObjectEntry) error { - return entry.ForeachMVCCNodeInRange(collector.start, collector.end, func(node *txnbase.TxnMVCCNode) error { + return entry.ForeachMVCCSpecificNodeInRange(collector.start, collector.end, func(node *txnbase.TxnMVCCNode) error { if node.IsAborted() { return nil }