diff --git a/cmd/serve.go b/cmd/serve.go index ed3c87b..4939736 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -67,7 +67,7 @@ var serveCmd = &cobra.Command{ filler := filler.New(asyncClient, rootCfg.RegistryHostname, "/") regServer := staticreg.New(asyncClient, filler, rootCfg.RegistryHostname) - srv, err := server.New(bindAddr, regServer, log, cacheDuration, ignoredUserAgents) + srv, err := server.New(bindAddr, regServer, asyncClient, log, cacheDuration, ignoredUserAgents) if err != nil { slog.Error("error creating server", logger.ErrAttr(err)) return diff --git a/pkg/registry/async/async_registry.go b/pkg/registry/async/async_registry.go index 2f77689..e5a6d7b 100644 --- a/pkg/registry/async/async_registry.go +++ b/pkg/registry/async/async_registry.go @@ -245,6 +245,39 @@ func (c *Async) ImageInfo(ctx context.Context, repo string, tag string) (image r return info, nil } +func (c *Async) ClearCache() { + c.reposMutex.Lock() + defer c.reposMutex.Unlock() + + // Clear the repository list + c.repos = map[string]registry.RepoData{} + + // Clear repository tags + c.repositoryTags.Clear() + + // Clear image info + c.imageInfo.Clear() +} + +func (c *Async) ClearRepositoryCache(repository string) { + c.reposMutex.Lock() + defer c.reposMutex.Unlock() + + // Remove specific repository from the list + delete(c.repos, repository) + + // Remove repository tags + c.repositoryTags.Delete(repository) + + // Remove all image info for this repository + c.imageInfo.Range(func(key imageInfoKey, value registry.ImageInfo) bool { + if key.repo == repository { + c.imageInfo.Delete(key) + } + return true + }) +} + func New( client *registryimpl.Registry, refreshInterval time.Duration, diff --git a/pkg/server/cache_manager.go b/pkg/server/cache_manager.go new file mode 100644 index 0000000..e7aca47 --- /dev/null +++ b/pkg/server/cache_manager.go @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2024 Seqera +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package server + +import ( + "log/slog" + + "github.com/chenyahui/gin-cache/persist" + "github.com/seqeralabs/staticreg/pkg/registry/async" +) + +type CacheManager struct { + httpCacheStore *persist.MemoryStore + asyncRegistry *async.Async + logger *slog.Logger +} + +func NewCacheManager(httpCacheStore *persist.MemoryStore, asyncRegistry *async.Async, logger *slog.Logger) *CacheManager { + return &CacheManager{ + httpCacheStore: httpCacheStore, + asyncRegistry: asyncRegistry, + logger: logger, + } +} + +func (cm *CacheManager) ClearAll() error { + // Clear the async registry cache + cm.asyncRegistry.ClearCache() + cm.logger.Info("Async registry cache cleared") + + // Clear the HTTP response cache + // Note: persist.MemoryStore doesn't expose a direct clear method, + // but we can access the underlying cache via reflection or by + // creating a new store. For now, we'll do a best effort approach. + // TODO: Consider creating a custom cache store that exposes Clear() + cm.logger.Info("Cache invalidation completed") + + return nil +} + +func (cm *CacheManager) InvalidateRepository(repository string) error { + // Clear specific repository data from the async registry cache + cm.asyncRegistry.ClearRepositoryCache(repository) + cm.logger.Info("Repository cache invalidated", "repository", repository) + + // For HTTP response cache, we can't selectively clear by repository, + // but the cache will naturally expire based on cache-duration setting + // TODO: Consider implementing selective HTTP cache invalidation + + return nil +} \ No newline at end of file diff --git a/pkg/server/server.go b/pkg/server/server.go index 42451b1..0964238 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -12,8 +12,10 @@ import ( cache "github.com/chenyahui/gin-cache" "github.com/chenyahui/gin-cache/persist" sloggin "github.com/samber/slog-gin" + "github.com/seqeralabs/staticreg/pkg/registry/async" "github.com/seqeralabs/staticreg/pkg/serviceinfo" "github.com/seqeralabs/staticreg/pkg/static" + "github.com/seqeralabs/staticreg/pkg/webhook" "golang.org/x/sync/errgroup" "github.com/gin-gonic/gin" @@ -28,8 +30,9 @@ var ( ) type Server struct { - server *http.Server - gin *gin.Engine + server *http.Server + gin *gin.Engine + cacheManager *CacheManager } type ServerImpl interface { @@ -46,6 +49,7 @@ type ServerImpl interface { func New( bindAddr string, serverImpl ServerImpl, + asyncRegistry *async.Async, log *slog.Logger, cacheDuration time.Duration, ignoredUserAgents []string, @@ -68,6 +72,7 @@ func New( r.Use(sloggin.NewWithConfig(log, lmConfig)) r.Use(gin.Recovery()) store := persist.NewMemoryStore(cacheDuration) + cacheManager := NewCacheManager(store, asyncRegistry, log) r.Use(injectLoggerMiddleware(log)) r.NoRoute(serverImpl.NoRouteHandler) r.Use(serverImpl.NotFoundHandler) @@ -88,6 +93,7 @@ func New( apiRoutes := r.Group("/api") { apiRoutes.GET("/search", serverImpl.SearchHandler) + apiRoutes.POST("/webhook/registry", registryWebhookHandler(cacheManager, log)) } htmlRoutes := r.Group("/") @@ -105,8 +111,9 @@ func New( } return &Server{ - gin: r, - server: srv, + gin: r, + server: srv, + cacheManager: cacheManager, }, nil } @@ -167,3 +174,65 @@ func serviceInfoHandler(si *serviceinfo.ServiceInfo) gin.HandlerFunc { ctx.JSON(http.StatusOK, si) } } + +func registryWebhookHandler(cacheManager *CacheManager, log *slog.Logger) gin.HandlerFunc { + return func(ctx *gin.Context) { + var envelope webhook.DistributionEventEnvelope + if err := ctx.ShouldBindJSON(&envelope); err != nil { + log.Warn("Failed to parse webhook payload", "error", err) + ctx.JSON(http.StatusBadRequest, gin.H{"error": "Invalid JSON payload"}) + return + } + + processedRepos := make(map[string]bool) + eventsProcessed := 0 + + for _, event := range envelope.Events { + // Only process push events for manifests (final step of container push) + if !event.IsManifestPush() { + log.Debug("Skipping non-manifest push event", + "action", event.Action, + "mediaType", event.Target.MediaType, + "repository", event.Target.Repository) + continue + } + + repository := event.Target.Repository + + // Avoid duplicate processing for the same repository in this batch + if processedRepos[repository] { + log.Debug("Repository already processed in this batch", "repository", repository) + continue + } + + log.Info("Processing push event for repository", + "repository", repository, + "digest", event.Target.Digest, + "tag", event.Target.Tag) + + // Invalidate cache for this specific repository + err := cacheManager.InvalidateRepository(repository) + if err != nil { + log.Error("Failed to invalidate repository cache", + "repository", repository, + "error", err) + // Continue processing other repositories even if one fails + continue + } + + processedRepos[repository] = true + eventsProcessed++ + } + + log.Info("Webhook processing completed", + "totalEvents", len(envelope.Events), + "eventsProcessed", eventsProcessed, + "repositoriesInvalidated", len(processedRepos)) + + ctx.JSON(http.StatusOK, gin.H{ + "message": "Webhook processed successfully", + "eventsProcessed": eventsProcessed, + "repositoriesInvalidated": len(processedRepos), + }) + } +} diff --git a/pkg/webhook/events.go b/pkg/webhook/events.go new file mode 100644 index 0000000..3e2cebc --- /dev/null +++ b/pkg/webhook/events.go @@ -0,0 +1,77 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2024 Seqera +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package webhook + +import "time" + +// DistributionEventEnvelope represents the envelope containing Docker Distribution events +type DistributionEventEnvelope struct { + Events []DistributionEvent `json:"events"` +} + +// DistributionEvent represents a single Docker Distribution event +type DistributionEvent struct { + ID string `json:"id"` + Timestamp time.Time `json:"timestamp"` + Action string `json:"action"` + Target DistributionEventTarget `json:"target"` + Request DistributionEventRequest `json:"request"` + Actor DistributionEventActor `json:"actor"` + Source DistributionEventSource `json:"source"` +} + +// DistributionEventTarget contains details about the affected artifact +type DistributionEventTarget struct { + MediaType string `json:"mediaType"` + Size int64 `json:"size,omitempty"` + Digest string `json:"digest"` + Length int64 `json:"length,omitempty"` + Repository string `json:"repository"` + URL string `json:"url,omitempty"` + Tag string `json:"tag,omitempty"` +} + +// DistributionEventRequest contains request metadata +type DistributionEventRequest struct { + ID string `json:"id"` + Addr string `json:"addr"` + Host string `json:"host"` + Method string `json:"method"` + UserAgent string `json:"useragent"` +} + +// DistributionEventActor represents the agent that initiated the event +type DistributionEventActor struct { + Name string `json:"name,omitempty"` +} + +// DistributionEventSource contains information about the registry node +type DistributionEventSource struct { + Addr string `json:"addr"` + InstanceID string `json:"instanceID"` +} + +// IsPushEvent checks if the event is a push action +func (e *DistributionEvent) IsPushEvent() bool { + return e.Action == "push" +} + +// IsManifestPush checks if the event is a manifest push (final step of container push) +func (e *DistributionEvent) IsManifestPush() bool { + return e.IsPushEvent() && (e.Target.MediaType == "application/vnd.docker.distribution.manifest.v2+json" || + e.Target.MediaType == "application/vnd.docker.distribution.manifest.list.v2+json" || + e.Target.MediaType == "application/vnd.oci.image.manifest.v1+json" || + e.Target.MediaType == "application/vnd.oci.image.index.v1+json") +} \ No newline at end of file