Skip to content

Commit 77739e6

Browse files
authored
Feat/purge blob eo438 (#697)
* scaffolding purge commands [EO-438] Signed-off-by: Frédéric BIDON <[email protected]> * added core implementation Signed-off-by: Frédéric BIDON <[email protected]> * updated goleak exclusions Signed-off-by: Frédéric BIDON <[email protected]> * purge testing Signed-off-by: Frédéric BIDON <[email protected]> * added report about index/purge activity Signed-off-by: Frédéric BIDON <[email protected]> * added unit & integration tests Signed-off-by: Frederic BIDON <[email protected]> * added a few docs Signed-off-by: Frederic BIDON <[email protected]> * added doc for local index path flag Signed-off-by: Frederic BIDON <[email protected]> Signed-off-by: Frédéric BIDON <[email protected]> Signed-off-by: Frederic BIDON <[email protected]>
1 parent 118f9fa commit 77739e6

20 files changed

+1627
-16
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ output/new data that is generated from the existing data.
4545
#### Extra tools
4646

4747
* Scripted interface to use as a sidecar container (e.g. for ARGO workflows)
48+
* [Cleaning-up unused storage](docs/purge.md)
4849

4950
#### Experimental
5051

cmd/datamon/cmd/flags.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ type flagsT struct {
9595
noConflicts bool
9696
}
9797
upgrade upgradeFlags
98+
purge struct {
99+
Force bool
100+
LocalStorePath string
101+
DryRun bool
102+
}
98103
}
99104

100105
var datamonFlags = flagsT{}
@@ -574,6 +579,30 @@ func addVerifyBlobHashFlag(cmd *cobra.Command) string {
574579
return c
575580
}
576581

582+
func addPurgeForceFlag(cmd *cobra.Command) string {
583+
const c = "force"
584+
if cmd != nil {
585+
cmd.PersistentFlags().BoolVar(&datamonFlags.purge.Force, c, false, "Forces a locked purge job to run. You MUST make sure that no such concurrent job is running")
586+
}
587+
return c
588+
}
589+
590+
func addPurgeDryRunFlag(cmd *cobra.Command) string {
591+
const c = "dry-run"
592+
if cmd != nil {
593+
cmd.Flags().BoolVar(&datamonFlags.purge.DryRun, c, false, "Report about the purge, but don't actually delete anything")
594+
}
595+
return c
596+
}
597+
598+
func addPurgeLocalPathFlag(cmd *cobra.Command) string {
599+
const c = "local-work-dir"
600+
if cmd != nil {
601+
cmd.PersistentFlags().StringVar(&datamonFlags.purge.LocalStorePath, c, ".datamon-index", "Indicates the local folder that datamon will use as its working area")
602+
}
603+
return c
604+
}
605+
577606
/** parameters struct from other formats */
578607

579608
// apply config file + env vars to structure used to parse cli flags

cmd/datamon/cmd/mocks_test.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,12 @@ func setupConfig(t *testing.T, flags flagsT) func() {
9090
testContext := testContext()
9191
client, err := gcsStorage.NewClient(context.Background(), option.WithScopes(gcsStorage.ScopeFullControl))
9292
require.NoError(t, err, "couldn't create bucket client")
93-
err = client.Bucket(bucketConfig).Create(context.Background(), projectID(), nil)
94-
require.NoError(t, err, "couldn't create config bucket")
93+
94+
require.NoError(t,
95+
client.Bucket(bucketConfig).Create(context.Background(), projectID(), nil),
96+
"couldn't create config bucket",
97+
)
98+
9599
runCmd(t, []string{
96100
"context",
97101
"create",
@@ -111,10 +115,13 @@ func setupConfig(t *testing.T, flags flagsT) func() {
111115
flags.context.Descriptor.ReadLog,
112116
// "--loglevel", "debug",
113117
}, "test and create context", false)
114-
err = os.Setenv("DATAMON_GLOBAL_CONFIG", bucketConfig)
115-
require.NoError(t, err)
116-
err = os.Setenv("DATAMON_CONTEXT", testContext)
117-
require.NoError(t, err)
118+
require.NoError(t,
119+
os.Setenv("DATAMON_GLOBAL_CONFIG", bucketConfig),
120+
)
121+
require.NoError(t,
122+
os.Setenv("DATAMON_CONTEXT", testContext),
123+
)
124+
118125
cleanup := func() {
119126
deleteBucket(context.Background(), t, client, bucketConfig)
120127
}
@@ -175,6 +182,7 @@ func setupTests(t *testing.T) func() {
175182
_ = os.RemoveAll(destinationDir)
176183
doBucketCleanup()
177184
}
185+
178186
return cleanup
179187
}
180188

cmd/datamon/cmd/purge.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package cmd
2+
3+
import (
4+
"github.com/spf13/cobra"
5+
)
6+
7+
// purgeCmd represents the purge related commands
8+
var purgeCmd = &cobra.Command{
9+
Use: "purge",
10+
Short: "Commands to purge unused blob storage",
11+
Long: `Purge allows owners of a BLOB storage to actually delete data that is no longer referenced by any repo.
12+
13+
To effectively proceed to a purge, proceed with the following steps:
14+
1. Use "datamon repo delete" to delete repositories. This will remove references to a repo. Actual BLOB storage is maintained.
15+
2. Use "datamon purge build-reverse-lookup". This will build an index all currently active BLOB references for _all_ repositories.
16+
3. Use "datamon purge delete-unused". This will delete BLOB resources that are not present in the index.
17+
18+
NOTES:
19+
* datamon purge delete-unused-blobs won't start if no reverse-lookup index is present
20+
* datamon purge build-reverse-lookup may be run again, thus updating the index
21+
* the update time considered for the reverse-lookup index is the time the build command is launched
22+
* any repo or file object that is created while building the index will be ignored in the index.
23+
* when running delete-unused, BLOB pages that are more recent than the index won't be removed.
24+
`,
25+
PreRun: func(cmd *cobra.Command, args []string) {
26+
if err := newCliOptionInputs(config, &datamonFlags).populateRemoteConfig(); err != nil {
27+
wrapFatalln("populate remote config", err)
28+
}
29+
},
30+
}

cmd/datamon/cmd/purge_delete_index.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/oneconcern/datamon/pkg/core"
8+
"github.com/spf13/cobra"
9+
"go.uber.org/zap"
10+
)
11+
12+
// deleteLookupCmd represents the command to build a reverse-lookup index of BLOB resources.
13+
var deleteLookupCmd = &cobra.Command{
14+
Use: "delete-reverse-lookup",
15+
Short: "Command to delete a reverse-lookup index from the metadata",
16+
Long: `The index maybe quite large and only really used when we need to purge BLOBs.
17+
18+
This command allows to remove the index file from the metadata.
19+
Only ONE instance of this command may run: dropping index concurrently is not supported.
20+
21+
A deletion of the index may be forced using the "--force" flag.
22+
23+
You MUST make sure that no concurrent build-reverse-lookup or delete job is still running before doing that.
24+
`,
25+
PreRun: func(cmd *cobra.Command, args []string) {
26+
if err := newCliOptionInputs(config, &datamonFlags).populateRemoteConfig(); err != nil {
27+
wrapFatalln("populate remote config", err)
28+
}
29+
},
30+
Run: func(cmd *cobra.Command, args []string) {
31+
var err error
32+
33+
defer func(t0 time.Time) {
34+
cliUsage(t0, "purge build-reverse-lookup", err)
35+
}(time.Now())
36+
37+
ctx := context.Background()
38+
optionInputs := newCliOptionInputs(config, &datamonFlags)
39+
remoteStores, err := optionInputs.datamonContext(ctx)
40+
if err != nil {
41+
wrapFatalln("create remote stores", err)
42+
return
43+
}
44+
logger, err := optionInputs.getLogger()
45+
46+
logger.Info("deleting reverse-lookup index",
47+
zap.String("context", datamonFlags.context.Descriptor.Name),
48+
zap.Bool("force?", datamonFlags.purge.Force),
49+
zap.String("context BLOB bucket", datamonFlags.context.Descriptor.Blob),
50+
zap.String("context metadata bucket", datamonFlags.context.Descriptor.Metadata),
51+
)
52+
opts := []core.PurgeOption{
53+
core.WithPurgeForce(datamonFlags.purge.Force),
54+
core.WithPurgeLogger(logger),
55+
}
56+
57+
err = core.PurgeLock(remoteStores, opts...)
58+
if err != nil {
59+
wrapFatalln("deleting reverse-lookup: another purge job is running", err)
60+
61+
return
62+
}
63+
64+
err = core.PurgeDropReverseIndex(remoteStores, opts...)
65+
erp := core.PurgeUnlock(remoteStores, opts...)
66+
67+
if erh := handlePurgeErrors(cmd.Name(), err, erp); erh != nil {
68+
wrapFatalln(cmd.Name(), erh)
69+
}
70+
},
71+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/oneconcern/datamon/pkg/core"
8+
"github.com/spf13/cobra"
9+
"go.uber.org/zap"
10+
)
11+
12+
// deleteUnusedCmd represents the command to delete BLOB resources that are not present in the reverse-lookup index
13+
var deleteUnusedCmd = &cobra.Command{
14+
Use: "delete-unused",
15+
Short: "Command to delete BLOB resources that are not present in the reverse-lookup index",
16+
Long: `The reverse-lookup index MUST have been created.
17+
18+
Any BLOB resource that is more recent than the index last update date is kept.
19+
20+
Only ONE instance of this command may run: concurrent deletion is not supported.
21+
Index updates cannot be performed while the deletion is ongoing.
22+
23+
If the delete-unused job fais to complete, it may be run again.
24+
25+
To retry on a failed deletion, use the "--force" flag to bypass the lock.
26+
You MUST make sure that no delete job is still running before doing that.
27+
`,
28+
PreRun: func(cmd *cobra.Command, args []string) {
29+
if err := newCliOptionInputs(config, &datamonFlags).populateRemoteConfig(); err != nil {
30+
wrapFatalln("populate remote config", err)
31+
}
32+
},
33+
Run: func(cmd *cobra.Command, args []string) {
34+
var err error
35+
36+
defer func(t0 time.Time) {
37+
cliUsage(t0, "purge delete-unused", err)
38+
}(time.Now())
39+
40+
ctx := context.Background()
41+
optionInputs := newCliOptionInputs(config, &datamonFlags)
42+
remoteStores, err := optionInputs.datamonContext(ctx)
43+
if err != nil {
44+
wrapFatalln("create remote stores", err)
45+
return
46+
}
47+
logger, err := optionInputs.getLogger()
48+
49+
logger.Info("deleting unused blobs",
50+
zap.String("context", datamonFlags.context.Descriptor.Name),
51+
zap.Bool("force?", datamonFlags.purge.Force),
52+
zap.String("context BLOB bucket", datamonFlags.context.Descriptor.Blob),
53+
zap.String("context metadata bucket", datamonFlags.context.Descriptor.Metadata),
54+
)
55+
opts := []core.PurgeOption{
56+
core.WithPurgeForce(datamonFlags.purge.Force),
57+
core.WithPurgeLogger(logger),
58+
core.WithPurgeLocalStore(datamonFlags.purge.LocalStorePath),
59+
core.WithPurgeDryRun(datamonFlags.purge.DryRun),
60+
}
61+
62+
err = core.PurgeLock(remoteStores, opts...)
63+
if err != nil {
64+
wrapFatalln("delete-unused: another purge job is running", err)
65+
66+
return
67+
}
68+
69+
descriptor, err := core.PurgeDeleteUnused(remoteStores, opts...)
70+
erp := core.PurgeUnlock(remoteStores, opts...)
71+
72+
if erh := handlePurgeErrors(cmd.Name(), err, erp); erh != nil {
73+
wrapFatalln(cmd.Name(), erh)
74+
75+
return
76+
}
77+
78+
log.Printf(
79+
"unused blob keys removed (none is actually removed if this is a dry-run).\n"+
80+
"Metadata store: %v\n"+
81+
"Blob store: %s\n"+
82+
"Index built at: %v\n"+
83+
"Num blob keys scanned: %d\n"+
84+
"Num blob keys found in use: %d\n"+
85+
"Num blob keys found more recent than index: %d\n"+
86+
"Num blob keys deleted: %d\n"+
87+
"Num bytes relinquished: %d\n"+
88+
"Dry-run: %t\n",
89+
remoteStores.Metadata(),
90+
remoteStores.Blob(),
91+
descriptor.IndexTime,
92+
descriptor.ScannedEntries,
93+
descriptor.IndexedEntries,
94+
descriptor.MoreRecentEntries,
95+
descriptor.DeletedEntries,
96+
descriptor.DeletedSize,
97+
descriptor.DryRun,
98+
)
99+
},
100+
}

0 commit comments

Comments
 (0)