Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions cluster_smigrating_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package redis

import (
"context"
"sync/atomic"
"testing"

"github.com/redis/go-redis/v9/maintnotifications"
)

// TestClusterClientSMigratedCallback tests that ClusterClient sets up SMIGRATED callback on node clients
func TestClusterClientSMigratedCallback(t *testing.T) {
t.Run("CallbackSetupWithMaintNotifications", func(t *testing.T) {
// Track if state reload was called
var reloadCalled atomic.Bool

// Create cluster options with maintnotifications enabled
opt := &ClusterOptions{
Addrs: []string{"localhost:7000"}, // Dummy address
MaintNotificationsConfig: &maintnotifications.Config{
Mode: maintnotifications.ModeEnabled,
},
// Use custom NewClient to track when nodes are created
NewClient: func(opt *Options) *Client {
client := NewClient(opt)
return client
},
}

// Create cluster client
cluster := NewClusterClient(opt)
defer cluster.Close()

// Manually trigger node creation by calling GetOrCreate
// This simulates what happens during normal cluster operations
node, err := cluster.nodes.GetOrCreate("localhost:7000")
if err != nil {
t.Fatalf("Failed to create node: %v", err)
}

// Get the maintnotifications manager from the node client
manager := node.Client.GetMaintNotificationsManager()
if manager == nil {
t.Skip("MaintNotifications manager not initialized (expected if not connected to real Redis)")
return
}

// Set up cluster state reload callback for testing
var receivedHostPort string
var receivedSlotRanges []string
manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) {
reloadCalled.Store(true)
receivedHostPort = hostPort
receivedSlotRanges = slotRanges
})

// Trigger the callback (this is what SMIGRATED notification would do)
ctx := context.Background()
testHostPort := "127.0.0.1:6379"
testSlotRanges := []string{"1234", "5000-6000"}
manager.TriggerClusterStateReload(ctx, testHostPort, testSlotRanges)

// Verify callback was called
if !reloadCalled.Load() {
t.Error("Cluster state reload callback should have been called")
}

// Verify host:port was passed correctly
if receivedHostPort != testHostPort {
t.Errorf("Expected host:port %s, got %s", testHostPort, receivedHostPort)
}

// Verify slot ranges were passed correctly
if len(receivedSlotRanges) != len(testSlotRanges) {
t.Errorf("Expected %d slot ranges, got %d", len(testSlotRanges), len(receivedSlotRanges))
}
})

t.Run("NoCallbackWithoutMaintNotifications", func(t *testing.T) {
// Create cluster options WITHOUT maintnotifications
opt := &ClusterOptions{
Addrs: []string{"localhost:7000"}, // Dummy address
// MaintNotificationsConfig is nil
}

// Create cluster client
cluster := NewClusterClient(opt)
defer cluster.Close()

// The OnNewNode callback should not be registered when MaintNotificationsConfig is nil
// This test just verifies that the cluster client doesn't panic
})
}

// TestClusterClientSMigratedIntegration tests SMIGRATED notification handling in cluster context
func TestClusterClientSMigratedIntegration(t *testing.T) {
t.Run("SMigratedTriggersStateReload", func(t *testing.T) {
// This test verifies the integration between SMIGRATED notification and cluster state reload
// We verify that the callback is properly set up to call cluster.state.LazyReload()

// Create cluster options with maintnotifications enabled
opt := &ClusterOptions{
Addrs: []string{"localhost:7000"},
MaintNotificationsConfig: &maintnotifications.Config{
Mode: maintnotifications.ModeEnabled,
},
}

// Create cluster client
cluster := NewClusterClient(opt)
defer cluster.Close()

// Create a node
node, err := cluster.nodes.GetOrCreate("localhost:7000")
if err != nil {
t.Fatalf("Failed to create node: %v", err)
}

// Get the maintnotifications manager
manager := node.Client.GetMaintNotificationsManager()
if manager == nil {
t.Skip("MaintNotifications manager not initialized (expected if not connected to real Redis)")
return
}

// Verify that the callback is set by checking it's not nil
// We can't directly test LazyReload being called without a real cluster,
// but we can verify the callback mechanism works
var callbackWorks atomic.Bool
var receivedHostPort string
var receivedSlotRanges []string
manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) {
callbackWorks.Store(true)
receivedHostPort = hostPort
receivedSlotRanges = slotRanges
})

ctx := context.Background()
testHostPort := "127.0.0.1:7000"
testSlotRanges := []string{"5678"}
manager.TriggerClusterStateReload(ctx, testHostPort, testSlotRanges)

if !callbackWorks.Load() {
t.Error("Callback mechanism should work")
}

if receivedHostPort != testHostPort {
t.Errorf("Expected host:port %s, got %s", testHostPort, receivedHostPort)
}

if len(receivedSlotRanges) != 1 || receivedSlotRanges[0] != "5678" {
t.Errorf("Expected slot ranges [5678], got %v", receivedSlotRanges)
}
})
}

// TestSMigratingAndSMigratedConstants verifies the SMIGRATING and SMIGRATED constants are exported
func TestSMigratingAndSMigratedConstants(t *testing.T) {
// This test verifies that the SMIGRATING and SMIGRATED constants are properly defined
// and accessible from the maintnotifications package
if maintnotifications.NotificationSMigrating != "SMIGRATING" {
t.Errorf("Expected NotificationSMigrating to be 'SMIGRATING', got: %s", maintnotifications.NotificationSMigrating)
}

if maintnotifications.NotificationSMigrated != "SMIGRATED" {
t.Errorf("Expected NotificationSMigrated to be 'SMIGRATED', got: %s", maintnotifications.NotificationSMigrated)
}
}

20 changes: 15 additions & 5 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8905,27 +8905,37 @@ var _ = Describe("Commands", func() {
const key = "latency-monitor-threshold"

old := client.ConfigGet(ctx, key).Val()
client.ConfigSet(ctx, key, "1")
// Use a higher threshold (100ms) to avoid capturing normal operations
// that could cause flakiness due to timing variations
client.ConfigSet(ctx, key, "100")
defer client.ConfigSet(ctx, key, old[key])

result, err := client.Latency(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(result)).Should(Equal(0))

err = client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err()
// Use a longer sleep (150ms) to ensure it exceeds the 100ms threshold
err = client.Do(ctx, "DEBUG", "SLEEP", 0.15).Err()
Expect(err).NotTo(HaveOccurred())

result, err = client.Latency(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(result)).Should(Equal(1))
Expect(len(result)).Should(BeNumerically(">=", 1))

// reset latency by event name
err = client.LatencyReset(ctx, result[0].Name).Err()
eventName := result[0].Name
err = client.LatencyReset(ctx, eventName).Err()
Expect(err).NotTo(HaveOccurred())

// Verify the specific event was reset (not that all events are gone)
// This avoids flakiness from other operations triggering latency events
result, err = client.Latency(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(result)).Should(Equal(0))
for _, event := range result {
if event.Name == eventName {
Fail("Event " + eventName + " should have been reset")
}
}
})
})
})
Expand Down
45 changes: 45 additions & 0 deletions internal/maintnotifications/logs/log_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ const (
UnrelaxedTimeoutMessage = "clearing relaxed timeout"
ManagerNotInitializedMessage = "manager not initialized"
FailedToMarkForHandoffMessage = "failed to mark connection for handoff"
InvalidSeqIDInSMigratingNotificationMessage = "invalid SeqID in SMIGRATING notification"
InvalidSeqIDInSMigratedNotificationMessage = "invalid SeqID in SMIGRATED notification"
InvalidHostPortInSMigratedNotificationMessage = "invalid host:port in SMIGRATED notification"
SlotMigratingMessage = "slots migrating, applying relaxed timeout"
SlotMigratedMessage = "slots migrated, triggering cluster state reload"

// ========================================
// used in pool/conn
Expand Down Expand Up @@ -623,3 +628,43 @@ func ExtractDataFromLogMessage(logMessage string) map[string]interface{} {
// If JSON parsing fails, return empty map
return result
}

// Cluster notification functions
func InvalidSeqIDInSMigratingNotification(seqID interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidSeqIDInSMigratingNotificationMessage, seqID)
return appendJSONIfDebug(message, map[string]interface{}{
"seqID": fmt.Sprintf("%v", seqID),
})
}

func InvalidSeqIDInSMigratedNotification(seqID interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidSeqIDInSMigratedNotificationMessage, seqID)
return appendJSONIfDebug(message, map[string]interface{}{
"seqID": fmt.Sprintf("%v", seqID),
})
}

func InvalidHostPortInSMigratedNotification(hostPort interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidHostPortInSMigratedNotificationMessage, hostPort)
return appendJSONIfDebug(message, map[string]interface{}{
"hostPort": fmt.Sprintf("%v", hostPort),
})
}

func SlotMigrating(connID uint64, seqID int64, slotRanges []string) string {
message := fmt.Sprintf("conn[%d] %s seqID=%d slots=%v", connID, SlotMigratingMessage, seqID, slotRanges)
return appendJSONIfDebug(message, map[string]interface{}{
"connID": connID,
"seqID": seqID,
"slotRanges": slotRanges,
})
}

func SlotMigrated(seqID int64, hostPort string, slotRanges []string) string {
message := fmt.Sprintf("%s seqID=%d host:port=%s slots=%v", SlotMigratedMessage, seqID, hostPort, slotRanges)
return appendJSONIfDebug(message, map[string]interface{}{
"seqID": seqID,
"hostPort": hostPort,
"slotRanges": slotRanges,
})
}
10 changes: 8 additions & 2 deletions maintnotifications/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@

Seamless Redis connection handoffs during cluster maintenance operations without dropping connections.

## ⚠️ **Important Note**
**Maintenance notifications are currently supported only in standalone Redis clients.** Cluster clients (ClusterClient, FailoverClient, etc.) do not yet support this functionality.
## Cluster Support

**Cluster notifications are now supported for ClusterClient!**

- **SMIGRATING**: `["SMIGRATING", SeqID, slot/range, ...]` - Relaxes timeouts when slots are being migrated
- **SMIGRATED**: `["SMIGRATED", SeqID, host:port, slot/range, ...]` - Reloads cluster state when slot migration completes

**Note:** Other maintenance notifications (MOVING, MIGRATING, MIGRATED, FAILING_OVER, FAILED_OVER) are supported only in standalone Redis clients. Cluster clients support SMIGRATING and SMIGRATED for cluster-specific slot migration handling.

## Quick Start

Expand Down
52 changes: 47 additions & 5 deletions maintnotifications/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import (

// Push notification type constants for maintenance
const (
NotificationMoving = "MOVING"
NotificationMigrating = "MIGRATING"
NotificationMigrated = "MIGRATED"
NotificationFailingOver = "FAILING_OVER"
NotificationFailedOver = "FAILED_OVER"
NotificationMoving = "MOVING" // Per-connection handoff notification
NotificationMigrating = "MIGRATING" // Per-connection migration start notification - relaxes timeouts
NotificationMigrated = "MIGRATED" // Per-connection migration complete notification - clears relaxed timeouts
NotificationFailingOver = "FAILING_OVER" // Per-connection failover start notification - relaxes timeouts
NotificationFailedOver = "FAILED_OVER" // Per-connection failover complete notification - clears relaxed timeouts
NotificationSMigrating = "SMIGRATING" // Cluster slot migrating notification - relaxes timeouts
NotificationSMigrated = "SMIGRATED" // Cluster slot migrated notification - triggers cluster state reload
)

// maintenanceNotificationTypes contains all notification types that maintenance handles
Expand All @@ -32,6 +34,8 @@ var maintenanceNotificationTypes = []string{
NotificationMigrated,
NotificationFailingOver,
NotificationFailedOver,
NotificationSMigrating,
NotificationSMigrated,
}

// NotificationHook is called before and after notification processing
Expand Down Expand Up @@ -65,6 +69,10 @@ type Manager struct {
// MOVING operation tracking - using sync.Map for better concurrent performance
activeMovingOps sync.Map // map[MovingOperationKey]*MovingOperation

// SMIGRATED notification deduplication - tracks processed SeqIDs
// Multiple connections may receive the same SMIGRATED notification
processedSMigratedSeqIDs sync.Map // map[int64]bool

// Atomic state tracking - no locks needed for state queries
activeOperationCount atomic.Int64 // Number of active operations
closed atomic.Bool // Manager closed state
Expand All @@ -73,6 +81,9 @@ type Manager struct {
hooks []NotificationHook
hooksMu sync.RWMutex // Protects hooks slice
poolHooksRef *PoolHook

// Cluster state reload callback for SMIGRATED notifications
clusterStateReloadCallback ClusterStateReloadCallback
}

// MovingOperation tracks an active MOVING operation.
Expand All @@ -83,6 +94,14 @@ type MovingOperation struct {
Deadline time.Time
}

// ClusterStateReloadCallback is a callback function that triggers cluster state reload.
// This is used by node clients to notify their parent ClusterClient about SMIGRATED notifications.
// The hostPort parameter indicates the destination node (e.g., "127.0.0.1:6379").
// The slotRanges parameter contains the migrated slots (e.g., ["1234", "5000-6000"]).
// Currently, implementations typically reload the entire cluster state, but in the future
// this could be optimized to reload only the specific slots.
type ClusterStateReloadCallback func(ctx context.Context, hostPort string, slotRanges []string)

// NewManager creates a new simplified manager.
func NewManager(client interfaces.ClientInterface, pool pool.Pooler, config *Config) (*Manager, error) {
if client == nil {
Expand Down Expand Up @@ -223,6 +242,15 @@ func (hm *Manager) GetActiveOperationCount() int64 {
return hm.activeOperationCount.Load()
}

// MarkSMigratedSeqIDProcessed attempts to mark a SMIGRATED SeqID as processed.
// Returns true if this is the first time processing this SeqID (should process),
// false if it was already processed (should skip).
// This prevents duplicate processing when multiple connections receive the same notification.
func (hm *Manager) MarkSMigratedSeqIDProcessed(seqID int64) bool {
_, alreadyProcessed := hm.processedSMigratedSeqIDs.LoadOrStore(seqID, true)
return !alreadyProcessed // Return true if NOT already processed
}

// Close closes the manager.
func (hm *Manager) Close() error {
// Use atomic operation for thread-safe close check
Expand Down Expand Up @@ -318,3 +346,17 @@ func (hm *Manager) AddNotificationHook(notificationHook NotificationHook) {
defer hm.hooksMu.Unlock()
hm.hooks = append(hm.hooks, notificationHook)
}

// SetClusterStateReloadCallback sets the callback function that will be called when a SMIGRATED notification is received.
// This allows node clients to notify their parent ClusterClient to reload cluster state.
func (hm *Manager) SetClusterStateReloadCallback(callback ClusterStateReloadCallback) {
hm.clusterStateReloadCallback = callback
}

// TriggerClusterStateReload calls the cluster state reload callback if it's set.
// This is called when a SMIGRATED notification is received.
func (hm *Manager) TriggerClusterStateReload(ctx context.Context, hostPort string, slotRanges []string) {
if hm.clusterStateReloadCallback != nil {
hm.clusterStateReloadCallback(ctx, hostPort, slotRanges)
}
}
2 changes: 2 additions & 0 deletions maintnotifications/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func TestManagerRefactoring(t *testing.T) {
NotificationMigrated,
NotificationFailingOver,
NotificationFailedOver,
NotificationSMigrating,
NotificationSMigrated,
}

if len(maintenanceNotificationTypes) != len(expectedTypes) {
Expand Down
Loading
Loading