Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ var serveCmd = &cobra.Command{
filler := filler.New(asyncClient, rootCfg.RegistryHostname, "/")

regServer := staticreg.New(asyncClient, filler, rootCfg.RegistryHostname)
srv, err := server.New(bindAddr, regServer, log, cacheDuration, ignoredUserAgents)
srv, err := server.New(bindAddr, regServer, asyncClient, log, cacheDuration, ignoredUserAgents)
if err != nil {
slog.Error("error creating server", logger.ErrAttr(err))
return
Expand Down
33 changes: 33 additions & 0 deletions pkg/registry/async/async_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,39 @@ func (c *Async) ImageInfo(ctx context.Context, repo string, tag string) (image r
return info, nil
}

func (c *Async) ClearCache() {
c.reposMutex.Lock()
defer c.reposMutex.Unlock()

// Clear the repository list
c.repos = map[string]registry.RepoData{}

// Clear repository tags
c.repositoryTags.Clear()

// Clear image info
c.imageInfo.Clear()
}

func (c *Async) ClearRepositoryCache(repository string) {
c.reposMutex.Lock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can probably rework the data structures a bit if we need to support this model instead of doing this logic.

defer c.reposMutex.Unlock()

// Remove specific repository from the list
delete(c.repos, repository)

// Remove repository tags
c.repositoryTags.Delete(repository)

// Remove all image info for this repository
c.imageInfo.Range(func(key imageInfoKey, value registry.ImageInfo) bool {
if key.repo == repository {
c.imageInfo.Delete(key)
}
return true
})
}

func New(
client *registryimpl.Registry,
refreshInterval time.Duration,
Expand Down
63 changes: 63 additions & 0 deletions pkg/server/cache_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024 Seqera
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server

import (
"log/slog"

"github.com/chenyahui/gin-cache/persist"
"github.com/seqeralabs/staticreg/pkg/registry/async"
)

type CacheManager struct {
httpCacheStore *persist.MemoryStore
asyncRegistry *async.Async
logger *slog.Logger
}

func NewCacheManager(httpCacheStore *persist.MemoryStore, asyncRegistry *async.Async, logger *slog.Logger) *CacheManager {
return &CacheManager{
httpCacheStore: httpCacheStore,
asyncRegistry: asyncRegistry,
logger: logger,
}
}

func (cm *CacheManager) ClearAll() error {
// Clear the async registry cache
cm.asyncRegistry.ClearCache()
cm.logger.Info("Async registry cache cleared")

// Clear the HTTP response cache
// Note: persist.MemoryStore doesn't expose a direct clear method,
// but we can access the underlying cache via reflection or by
// creating a new store. For now, we'll do a best effort approach.
// TODO: Consider creating a custom cache store that exposes Clear()
cm.logger.Info("Cache invalidation completed")

return nil
}

func (cm *CacheManager) InvalidateRepository(repository string) error {
// Clear specific repository data from the async registry cache
cm.asyncRegistry.ClearRepositoryCache(repository)
cm.logger.Info("Repository cache invalidated", "repository", repository)

// For HTTP response cache, we can't selectively clear by repository,
// but the cache will naturally expire based on cache-duration setting
// TODO: Consider implementing selective HTTP cache invalidation

return nil
}
77 changes: 73 additions & 4 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
cache "github.com/chenyahui/gin-cache"
"github.com/chenyahui/gin-cache/persist"
sloggin "github.com/samber/slog-gin"
"github.com/seqeralabs/staticreg/pkg/registry/async"
"github.com/seqeralabs/staticreg/pkg/serviceinfo"
"github.com/seqeralabs/staticreg/pkg/static"
"github.com/seqeralabs/staticreg/pkg/webhook"
"golang.org/x/sync/errgroup"

"github.com/gin-gonic/gin"
Expand All @@ -28,8 +30,9 @@ var (
)

type Server struct {
server *http.Server
gin *gin.Engine
server *http.Server
gin *gin.Engine
cacheManager *CacheManager
}

type ServerImpl interface {
Expand All @@ -46,6 +49,7 @@ type ServerImpl interface {
func New(
bindAddr string,
serverImpl ServerImpl,
asyncRegistry *async.Async,
log *slog.Logger,
cacheDuration time.Duration,
ignoredUserAgents []string,
Expand All @@ -68,6 +72,7 @@ func New(
r.Use(sloggin.NewWithConfig(log, lmConfig))
r.Use(gin.Recovery())
store := persist.NewMemoryStore(cacheDuration)
cacheManager := NewCacheManager(store, asyncRegistry, log)
r.Use(injectLoggerMiddleware(log))
r.NoRoute(serverImpl.NoRouteHandler)
r.Use(serverImpl.NotFoundHandler)
Expand All @@ -88,6 +93,7 @@ func New(
apiRoutes := r.Group("/api")
{
apiRoutes.GET("/search", serverImpl.SearchHandler)
apiRoutes.POST("/webhook/registry", registryWebhookHandler(cacheManager, log))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this endpoint should probably be bound to a different listening address to avoid publishing it over the internet or use the auth mechanism they provide.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep simple, no need for auth

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could use separate port

}

htmlRoutes := r.Group("/")
Expand All @@ -105,8 +111,9 @@ func New(
}

return &Server{
gin: r,
server: srv,
gin: r,
server: srv,
cacheManager: cacheManager,
}, nil
}

Expand Down Expand Up @@ -167,3 +174,65 @@ func serviceInfoHandler(si *serviceinfo.ServiceInfo) gin.HandlerFunc {
ctx.JSON(http.StatusOK, si)
}
}

func registryWebhookHandler(cacheManager *CacheManager, log *slog.Logger) gin.HandlerFunc {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this endpoint needs to go in ServerImpl and the cache manager likely be a dependency of it

return func(ctx *gin.Context) {
var envelope webhook.DistributionEventEnvelope
if err := ctx.ShouldBindJSON(&envelope); err != nil {
log.Warn("Failed to parse webhook payload", "error", err)
ctx.JSON(http.StatusBadRequest, gin.H{"error": "Invalid JSON payload"})
return
}

processedRepos := make(map[string]bool)
eventsProcessed := 0

for _, event := range envelope.Events {
// Only process push events for manifests (final step of container push)
if !event.IsManifestPush() {
log.Debug("Skipping non-manifest push event",
"action", event.Action,
"mediaType", event.Target.MediaType,
"repository", event.Target.Repository)
continue
}

repository := event.Target.Repository

// Avoid duplicate processing for the same repository in this batch
if processedRepos[repository] {
log.Debug("Repository already processed in this batch", "repository", repository)
continue
}

log.Info("Processing push event for repository",
"repository", repository,
"digest", event.Target.Digest,
"tag", event.Target.Tag)

// Invalidate cache for this specific repository
err := cacheManager.InvalidateRepository(repository)
if err != nil {
log.Error("Failed to invalidate repository cache",
"repository", repository,
"error", err)
// Continue processing other repositories even if one fails
continue
}

processedRepos[repository] = true
eventsProcessed++
}

log.Info("Webhook processing completed",
"totalEvents", len(envelope.Events),
"eventsProcessed", eventsProcessed,
"repositoriesInvalidated", len(processedRepos))

ctx.JSON(http.StatusOK, gin.H{
"message": "Webhook processed successfully",
"eventsProcessed": eventsProcessed,
"repositoriesInvalidated": len(processedRepos),
})
}
}
77 changes: 77 additions & 0 deletions pkg/webhook/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024 Seqera
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package webhook

import "time"

// DistributionEventEnvelope represents the envelope containing Docker Distribution events
type DistributionEventEnvelope struct {
Events []DistributionEvent `json:"events"`
}

// DistributionEvent represents a single Docker Distribution event
type DistributionEvent struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
Action string `json:"action"`
Target DistributionEventTarget `json:"target"`
Request DistributionEventRequest `json:"request"`
Actor DistributionEventActor `json:"actor"`
Source DistributionEventSource `json:"source"`
}

// DistributionEventTarget contains details about the affected artifact
type DistributionEventTarget struct {
MediaType string `json:"mediaType"`
Size int64 `json:"size,omitempty"`
Digest string `json:"digest"`
Length int64 `json:"length,omitempty"`
Repository string `json:"repository"`
URL string `json:"url,omitempty"`
Tag string `json:"tag,omitempty"`
}

// DistributionEventRequest contains request metadata
type DistributionEventRequest struct {
ID string `json:"id"`
Addr string `json:"addr"`
Host string `json:"host"`
Method string `json:"method"`
UserAgent string `json:"useragent"`
}

// DistributionEventActor represents the agent that initiated the event
type DistributionEventActor struct {
Name string `json:"name,omitempty"`
}

// DistributionEventSource contains information about the registry node
type DistributionEventSource struct {
Addr string `json:"addr"`
InstanceID string `json:"instanceID"`
}

// IsPushEvent checks if the event is a push action
func (e *DistributionEvent) IsPushEvent() bool {
return e.Action == "push"
}

// IsManifestPush checks if the event is a manifest push (final step of container push)
func (e *DistributionEvent) IsManifestPush() bool {
return e.IsPushEvent() && (e.Target.MediaType == "application/vnd.docker.distribution.manifest.v2+json" ||
e.Target.MediaType == "application/vnd.docker.distribution.manifest.list.v2+json" ||
e.Target.MediaType == "application/vnd.oci.image.manifest.v1+json" ||
e.Target.MediaType == "application/vnd.oci.image.index.v1+json")
}