Skip to content

[WIP] Keyspace cleaning #9182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions pkg/keyspace/gc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2025 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package keyspace

import (
"context"
"sync"
"time"

Check failure on line 21 in pkg/keyspace/gc.go

View workflow job for this annotation

GitHub Actions / statics

File is not properly formatted (gci)
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/kv"
"go.uber.org/zap"
)

// Some common per-loop config.
const (
	// gcPerLoopTimeout specifies timeout for a single loop, it must be less than minKeyspaceGCInterval (10m)
	// so that one round always finishes (or is cut off) before the next tick can fire.
	gcPerLoopTimeout = 5 * time.Minute
	// gcKeyspaceBatch specifies batch size when fetching keyspaces from storage in one transaction.
	gcKeyspaceBatch = 100
)

// gcWorker is used to clean up keyspace related information.
type gcWorker struct {
	// RWMutex guards gcLifeTime: run takes the read lock to compute the
	// safe point, reload takes the write lock to update the value.
	sync.RWMutex
	// manager is the owning keyspace Manager; its ctx bounds the gc loop.
	manager *Manager
	// member is used to check whether this server is currently the leader.
	member *member.EmbeddedEtcdMember

	// nextKeyspaceID is the scan cursor for the next gc round. It is read
	// and written only from the run goroutine, so it needs no locking.
	nextKeyspaceID uint32
	// ticker drives the gc rounds; it starts out stopped and is reset by reload.
	ticker *time.Ticker
	// gcLifeTime specifies how long should we keep archived keyspace.
	gcLifeTime time.Duration
}

// newGCWorker returns a new gcWorker bound to this manager.
Copy link
Contributor

@ystaticy ystaticy Apr 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduce the concept of 'archive' instead of calling it GC, to prevent other developers from confusing this keyspace GC with MVCC GC.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, do not use the concept of 'GC' when archiving a keyspace, and the naming of safepoint variables should include a prefix to indicate what operation the safepoint corresponds to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, sounds good, will use cleaning & archiving instead of gc

func (manager *Manager) newGCWorker() *gcWorker {
	// Start with a stopped placeholder ticker; reload installs the real
	// interval once gc is enabled and this server is ready.
	placeholder := time.NewTicker(time.Hour)
	placeholder.Stop()

	return &gcWorker{
		manager:    manager,
		member:     manager.member,
		ticker:     placeholder,
		gcLifeTime: manager.config.GetGCLifeTime(),
	}
}

// run starts the main loop of the gc worker.
func (worker *gcWorker) run() {
	for {
		select {
		case <-worker.manager.ctx.Done():
			// Manager's context is done: stop the loop completely.
			worker.ticker.Stop()
			log.Info("[keyspace] gc loop stopped due to context cancel")
			return
		case tickTime := <-worker.ticker.C:
			if !worker.member.IsLeader() {
				// Not the leader: pause the ticker and skip gc. reload will
				// reset the ticker when this server is elected.
				worker.ticker.Stop()
				log.Info("[keyspace] gc loop skipped, server is not leader")
				continue
			}
			// Keyspaces archived before safePoint should be cleaned up.
			worker.RLock()
			lifeTime := worker.gcLifeTime
			worker.RUnlock()
			safePoint := tickTime.Add(-lifeTime).Unix()
			log.Info("[keyspace] starting gc")
			worker.nextKeyspaceID = worker.scanKeyspacesAndDoGC(worker.manager.ctx, safePoint)
		}
	}
}

// reload applies the given config to the worker: it pauses the ticker,
// and — when gc is enabled — refreshes the lifetime and restarts the ticker.
func (worker *gcWorker) reload(cfg Config) {
	worker.ticker.Stop()
	worker.Lock()
	defer worker.Unlock()

	if !cfg.ToGCKeyspace() {
		log.Info("[keyspace] gc disabled")
		return
	}
	lifeTime, interval := cfg.GetGCLifeTime(), cfg.GetGCRunInterval()
	worker.gcLifeTime = lifeTime
	worker.ticker.Reset(interval)
	log.Info("[keyspace] gc config reloaded",
		zap.Duration("gc lifetime", lifeTime),
		zap.Duration("run interval", interval),
	)
}

// scanKeyspacesAndDoGC scans all current keyspaces and attempts to do one round of gc within gcPerLoopTimeout.
// It starts with nextKeyspaceID, and returns the last garbage collected keyspace's id + 1 as the starting id
// for the next round.
func (worker *gcWorker) scanKeyspacesAndDoGC(ctx context.Context, safePoint int64) uint32 {
	ctx, cancel := context.WithTimeout(ctx, gcPerLoopTimeout)
	defer cancel()
	nextID := worker.nextKeyspaceID
	manager := worker.manager
	// Scan all keyspaces in batches of gcKeyspaceBatch.
	for {
		select {
		case <-ctx.Done():
			log.Info("[keyspace] stopping gc loop due to context cancel")
			return nextID
		default:
		}

		// Load the next batch inside a transaction. Keep the error local to
		// the closure instead of capturing an outer variable, so the outer
		// err cannot be written from two places.
		var batch []*keyspacepb.KeyspaceMeta
		err := manager.store.RunInTxn(ctx, func(txn kv.Txn) error {
			var loadErr error
			batch, loadErr = manager.store.LoadRangeKeyspace(txn, nextID, gcKeyspaceBatch)
			return loadErr
		})
		if err != nil {
			log.Error("[keyspace] stopping gc loop, failed to fetch keyspace meta", zap.Error(err))
			return nextID
		}
		if len(batch) == 0 {
			// Return here directly and wait for the next tick instead of redo scanning from the beginning,
			// this could prevent scanning keyspace at high frequency when keyspace count is low.
			return 0
		}
		for _, meta := range batch {
			if canGC(meta, safePoint) {
				// Failures inside gcKeyspace are retried on the next scan.
				worker.gcKeyspace(ctx, meta)
			}
		}
		// Resume the next batch right after the last keyspace we saw.
		nextID = batch[len(batch)-1].GetId() + 1
	}
}

// gcKeyspace cleans up one keyspace's related information.
// It will be tried again in the next scan if it fails.
func (worker *gcWorker) gcKeyspace(ctx context.Context, meta *keyspacepb.KeyspaceMeta) {
	// Bail out quickly if the per-loop deadline has already expired.
	select {
	case <-ctx.Done():
		log.Info("[keyspace] skipping gc due to context cancel",
			zap.Uint32("ID", meta.GetId()), zap.String("name", meta.GetName()))
		return
	default:
	}

	log.Info("[keyspace] start cleaning keyspace meta data",
		zap.String("name", meta.GetName()),
		zap.Uint32("ID", meta.GetId()),
		zap.String("state", meta.GetState().String()),
		zap.Int64("last state change", meta.GetStateChangedAt()),
	)

	// Following section should be idempotent:
	// TODO: Clean TiKV range.
	// TODO: Clean TiFlash placement rules.
	// TODO: Clean Region Label rules.
	// TODO: Clean keyspace related etcd paths.
	// And only when all of the above succeeded:
	// TODO: Set keyspace state to TOMBSTONE

	// Inject a failpoint to test cleaning framework during test.
	failpoint.Inject("doGC", func() {
		_, err := worker.manager.UpdateKeyspaceState(meta.GetName(), keyspacepb.KeyspaceState_TOMBSTONE, time.Now().Unix())
		if err != nil {
			// Attach the error and keyspace identity so the failure is diagnosable.
			log.Warn("[keyspace] fail to tombstone keyspace when doing gc",
				zap.Uint32("ID", meta.GetId()),
				zap.String("name", meta.GetName()),
				zap.Error(err))
		}
	})
}
17 changes: 16 additions & 1 deletion pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"strconv"
"time"

"github.com/tikv/pd/pkg/member"

Check failure on line 23 in pkg/keyspace/keyspace.go

View workflow job for this annotation

GitHub Actions / statics

File is not properly formatted (gci)
"go.uber.org/zap"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -70,6 +71,9 @@
ToWaitRegionSplit() bool
GetWaitRegionSplitTimeout() time.Duration
GetCheckRegionSplitInterval() time.Duration
ToGCKeyspace() bool
GetGCRunInterval() time.Duration
GetGCLifeTime() time.Duration
}

// Manager manages keyspace related data.
Expand All @@ -85,12 +89,16 @@
store endpoint.KeyspaceStorage
// rc is the raft cluster of the server.
cluster core.ClusterInformer
// member is the current pd's member information, used to check if server is leader.
member *member.EmbeddedEtcdMember
// config is the configurations of the manager.
config Config
// kgm is the keyspace group manager of the server.
kgm *GroupManager
// nextPatrolStartID is the next start id of keyspace assignment patrol.
nextPatrolStartID uint32
// gcWorker is used to clean up archived keyspace.
gcWorker *gcWorker
}

// CreateKeyspaceRequest represents necessary arguments to create a keyspace.
Expand All @@ -110,11 +118,12 @@
ctx context.Context,
store endpoint.KeyspaceStorage,
cluster core.ClusterInformer,
member *member.EmbeddedEtcdMember,
idAllocator id.Allocator,
config Config,
kgm *GroupManager,
) *Manager {
return &Manager{
manager := &Manager{
ctx: ctx,
// Remove the lock of the given key from the lock group when unlock to
// keep minimal working set, which is suited for low qps, non-time-critical
Expand All @@ -125,10 +134,13 @@
idAllocator: idAllocator,
store: store,
cluster: cluster,
member: member,
config: config,
kgm: kgm,
nextPatrolStartID: constant.DefaultKeyspaceID,
}
manager.gcWorker = manager.newGCWorker()
return manager
}

// Bootstrap saves default keyspace info.
Expand Down Expand Up @@ -182,12 +194,15 @@
return err
}
}
// start gc loop.
go manager.gcWorker.run()
return nil
}

// UpdateConfig updates keyspace manager's config and propagates it to the
// gc worker so the new lifetime/interval settings take effect immediately.
func (manager *Manager) UpdateConfig(cfg Config) {
	manager.config = cfg
	manager.gcWorker.reload(cfg)
}

// CreateKeyspace create a keyspace meta with given config and save it to storage.
Expand Down
17 changes: 16 additions & 1 deletion pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type mockConfig struct {
WaitRegionSplit bool
WaitRegionSplitTimeout typeutil.Duration
CheckRegionSplitInterval typeutil.Duration
GCKeyspace bool
GCRunInterval time.Duration
GCLifeTime time.Duration
}

func (m *mockConfig) GetPreAlloc() []string {
Expand All @@ -77,13 +80,25 @@ func (m *mockConfig) GetCheckRegionSplitInterval() time.Duration {
return m.CheckRegionSplitInterval.Duration
}

// ToGCKeyspace returns whether keyspace gc is enabled in the mock config.
func (m *mockConfig) ToGCKeyspace() bool {
	return m.GCKeyspace
}

// GetGCRunInterval returns the configured interval between gc rounds.
func (m *mockConfig) GetGCRunInterval() time.Duration {
	return m.GCRunInterval
}

// GetGCLifeTime returns how long archived keyspaces are kept before gc.
func (m *mockConfig) GetGCLifeTime() time.Duration {
	return m.GCLifeTime
}

// SetupTest builds a fresh in-memory keyspace manager (and its group
// manager) before each test and bootstraps both.
func (suite *keyspaceTestSuite) SetupTest() {
	re := suite.Require()
	suite.ctx, suite.cancel = context.WithCancel(context.Background())
	// Storage backed by an in-memory KV; no etcd needed for these tests.
	store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
	allocator := mockid.NewIDAllocator()
	kgm := NewKeyspaceGroupManager(suite.ctx, store, nil)
	// cluster and member are nil here: the tests do not exercise region
	// split waiting or leadership checks.
	suite.manager = NewKeyspaceManager(suite.ctx, store, nil, nil, allocator, &mockConfig{}, kgm)
	re.NoError(kgm.Bootstrap(suite.ctx))
	re.NoError(suite.manager.Bootstrap())
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
suite.kgm = NewKeyspaceGroupManager(suite.ctx, store, nil)
idAllocator := mockid.NewIDAllocator()
cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm)
suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, nil, idAllocator, &mockConfig{}, suite.kgm)
re.NoError(suite.kgm.Bootstrap(suite.ctx))
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,11 @@ func (hp *indexedHeap) Remove(id uint32) *endpoint.KeyspaceGroup {
}
return nil
}

// canGC checks keyspace's state and stateChangedAt against safePoint to determine if we can safely gc it.
// Only keyspaces already ARCHIVED whose last state change happened strictly
// before safePoint are eligible.
func canGC(meta *keyspacepb.KeyspaceMeta, safePoint int64) bool {
	archived := meta.GetState() == keyspacepb.KeyspaceState_ARCHIVED
	return archived && meta.GetStateChangedAt() < safePoint
}
Loading
Loading