Skip to content

Commit cab1563

Browse files
authored
Add support for ASK errors being returned by Valkey cluster (#199)
1 parent 0a4f34d commit cab1563

15 files changed

+308
-70
lines changed

Sources/Valkey/Cluster/HashSlotShardMap.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,8 @@ package struct HashSlotShardMap: Sendable {
181181
///
182182
/// This logic was first implemented in `valkey-glide` (see `Notice.txt`) and adopted for Swift here.
183183
@usableFromInline
184-
package mutating func updateSlots(with movedError: ValkeyMovedError) -> UpdateSlotsResult {
184+
package mutating func updateSlots(with movedError: ValkeyClusterRedirectionError) -> UpdateSlotsResult {
185+
assert(movedError.redirection == .move)
185186
if let shardIndex = self.slotToShardID[Int(movedError.slot.rawValue)].value {
186187
// if the slot had a shard assignment before
187188
var shard = self.shardIDToShard[shardIndex]

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -148,18 +148,28 @@ public final class ValkeyClusterClient: Sendable {
148148
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
149149
}
150150

151+
var asking = false
151152
while !Task.isCancelled {
152153
do {
153154
let client = try await clientSelector()
154-
return try await client.execute(command)
155+
if asking {
156+
// if asking we need to call ASKING beforehand otherwise we will get a MOVE error
157+
return try await client.execute(
158+
ASKING(),
159+
command
160+
).1.get()
161+
} else {
162+
return try await client.execute(command)
163+
}
155164
} catch ValkeyClusterError.noNodeToTalkTo {
156165
// TODO: Rerun node discovery!
157166
} catch let error as ValkeyClientError where error.errorCode == .commandError {
158-
guard let errorMessage = error.message, let movedError = ValkeyMovedError(errorMessage) else {
167+
guard let errorMessage = error.message, let redirectError = ValkeyClusterRedirectionError(errorMessage) else {
159168
throw error
160169
}
161-
self.logger.trace("Received move error", metadata: ["error": "\(movedError)"])
162-
clientSelector = { try await self.nodeClient(for: movedError) }
170+
self.logger.trace("Received redirect error", metadata: ["error": "\(redirectError)"])
171+
clientSelector = { try await self.nodeClient(for: redirectError) }
172+
asking = (redirectError.redirection == .ask)
163173
}
164174
}
165175
throw CancellationError()
@@ -366,25 +376,29 @@ public final class ValkeyClusterClient: Sendable {
366376
/// This internal method is used when handling cluster topology changes indicated by
367377
/// MOVED responses from Valkey nodes.
368378
///
369-
/// - Parameter moveError: The MOVED error response from a Valkey node.
379+
/// - Parameter redirectError: The MOVED/ASK error response from a Valkey node.
370380
/// - Returns: A ``ValkeyNode`` connected to the node that can handle the request.
371381
/// - Throws:
372382
/// - `ValkeyClusterError.waitedForDiscoveryAfterMovedErrorThreeTimes` if unable to resolve
373383
/// the MOVED error after multiple attempts
374384
/// - `ValkeyClusterError.clientRequestCancelled` if the request is cancelled
375385
@usableFromInline
376-
/* private */ func nodeClient(for moveError: ValkeyMovedError) async throws -> ValkeyNodeClient {
386+
/* private */ func nodeClient(for redirectError: ValkeyClusterRedirectionError) async throws -> ValkeyNodeClient {
377387
var counter = 0
378388
while counter < 3 {
379389
defer { counter += 1 }
380-
let action = try self.stateLock.withLock { stateMachine throws(ValkeyClusterError) -> StateMachine.PoolForMovedErrorAction in
381-
try stateMachine.poolFastPath(for: moveError)
390+
let action = try self.stateLock.withLock { stateMachine throws(ValkeyClusterError) -> StateMachine.PoolForRedirectErrorAction in
391+
try stateMachine.poolFastPath(for: redirectError)
382392
}
383393

384394
switch action {
385395
case .connectionPool(let node):
386396
return node
387397

398+
case .runAndUseConnectionPool(let node):
399+
self.queueAction(.runClient(node))
400+
return node
401+
388402
case .waitForDiscovery:
389403
break
390404

@@ -432,7 +446,7 @@ public final class ValkeyClusterClient: Sendable {
432446
/// - `ValkeyClusterError.clusterIsUnavailable` if no healthy nodes are available
433447
/// - `ValkeyClusterError.clusterIsMissingSlotAssignment` if the slot assignment cannot be determined
434448
@inlinable
435-
func nodeClient(for slots: [HashSlot]) async throws -> ValkeyNodeClient {
449+
package func nodeClient(for slots: some (Collection<HashSlot> & Sendable)) async throws -> ValkeyNodeClient {
436450
var retries = 0
437451
while retries < 3 {
438452
defer { retries += 1 }
@@ -492,7 +506,7 @@ public final class ValkeyClusterClient: Sendable {
492506
/// Handles the transition to a degraded state when a moved error is received.
493507
///
494508
/// - Parameter action: The action containing operations for degraded mode.
495-
private func runMovedToDegraded(_ action: StateMachine.PoolForMovedErrorAction.MoveToDegraded) {
509+
private func runMovedToDegraded(_ action: StateMachine.PoolForRedirectErrorAction.MoveToDegraded) {
496510
if let cancelToken = action.runDiscoveryAndCancelTimer {
497511
cancelToken.yield()
498512
self.queueAction(.runClusterDiscovery(runNodeDiscovery: false))

Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -559,8 +559,9 @@ package struct ValkeyClusterClientStateMachine<
559559
}
560560

561561
@usableFromInline
562-
package enum PoolForMovedErrorAction {
562+
package enum PoolForRedirectErrorAction {
563563
case connectionPool(ConnectionPool)
564+
case runAndUseConnectionPool(ConnectionPool)
564565
case moveToDegraded(MoveToDegraded)
565566
case waitForDiscovery
566567

@@ -577,7 +578,29 @@ package struct ValkeyClusterClientStateMachine<
577578
}
578579

579580
@usableFromInline
580-
package mutating func poolFastPath(for movedError: ValkeyMovedError) throws(ValkeyClusterError) -> PoolForMovedErrorAction {
581+
package mutating func poolFastPath(for redirectError: ValkeyClusterRedirectionError) throws(ValkeyClusterError) -> PoolForRedirectErrorAction {
582+
switch redirectError.redirection {
583+
case .ask:
584+
return try poolFastPath(forAskError: redirectError)
585+
case .move:
586+
return try poolFastPath(forMovedError: redirectError)
587+
}
588+
}
589+
590+
@usableFromInline
591+
package mutating func poolFastPath(forAskError askError: ValkeyClusterRedirectionError) throws(ValkeyClusterError) -> PoolForRedirectErrorAction {
592+
switch self.runningClients.addNode(ValkeyNodeDescription(redirectionError: askError)) {
593+
case .useExistingPool(let connectionPool):
594+
return .connectionPool(connectionPool)
595+
case .runAndUsePool(let connectionPool):
596+
return .runAndUseConnectionPool(connectionPool)
597+
}
598+
}
599+
600+
@usableFromInline
601+
package mutating func poolFastPath(
602+
forMovedError movedError: ValkeyClusterRedirectionError
603+
) throws(ValkeyClusterError) -> PoolForRedirectErrorAction {
581604
switch self.clusterState {
582605
case .unavailable(let unavailableContext):
583606
if unavailableContext.start.advanced(by: self.configuration.circuitBreakerDuration) > self.clock.now {
@@ -606,7 +629,6 @@ package struct ValkeyClusterClientStateMachine<
606629
return .connectionPool(pool)
607630
}
608631
}
609-
610632
let circuitBreakerTimerID = self.nextTimerID()
611633

612634
self.clusterState = .degraded(

Sources/Valkey/Cluster/ValkeyMovedError.swift renamed to Sources/Valkey/Cluster/ValkeyClusterRedirectionError.swift

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,31 @@
77
//
88
import NIOCore
99

10-
/// Represents a `MOVED` redirection error from a Valkey cluster node.
10+
/// Represents a redirection error from a Valkey cluster node.
1111
///
1212
/// When a client sends a command to a Valkey cluster node that doesn't own
1313
/// the hash slot for the specified key, the node responds with a `MOVED` error
1414
/// containing information about which node actually owns that slot.
1515
///
16+
/// When a client sends a command to a Valkey cluster node for a hash slot that
17+
/// is currently migrating for a key that does not exist the node responds
18+
/// with a `ASK` error containing information about which node is importing
19+
/// that hash slot
20+
///
1621
/// This error provides the necessary information for clients to redirect their
1722
/// request to the correct node in the cluster.
1823
@usableFromInline
19-
package struct ValkeyMovedError: Hashable, Sendable {
24+
package struct ValkeyClusterRedirectionError: Hashable, Sendable {
25+
@usableFromInline
26+
package enum Redirection: Sendable {
27+
case move
28+
case ask
29+
}
30+
31+
/// Request type
32+
@usableFromInline
33+
package var redirection: Redirection
34+
2035
/// The hash slot number that triggered the redirection.
2136
package var slot: HashSlot
2237

@@ -26,7 +41,8 @@ package struct ValkeyMovedError: Hashable, Sendable {
2641
/// The port number of the node that owns the requested hash slot.
2742
package var port: Int
2843

29-
package init(slot: HashSlot, endpoint: String, port: Int) {
44+
package init(request: Redirection, slot: HashSlot, endpoint: String, port: Int) {
45+
self.redirection = request
3046
self.slot = slot
3147
self.endpoint = endpoint
3248
self.port = port
@@ -38,25 +54,33 @@ package struct ValkeyMovedError: Hashable, Sendable {
3854
}
3955
}
4056

41-
extension ValkeyMovedError {
57+
extension ValkeyClusterRedirectionError {
4258
static let movedPrefix = "MOVED "
59+
static let askPrefix = "ASK "
4360

44-
/// Attempts to parse a Valkey MOVED error from a String.
61+
/// Attempts to parse a Valkey MOVED/ASK error from a String.
4562
///
4663
/// This method extracts the hash slot, endpoint, and port information from the string
47-
/// if it represents a Valkey MOVED error. MOVED errors are returned by Valkey cluster
48-
/// nodes when a client attempts to access a key that belongs to a different node.
64+
/// if it represents a Valkey MOVED/ASK error. Redirection errors are returned by Valkey
65+
/// cluster nodes when a client attempts to access a key that belongs to a different node
66+
/// or the hashslot is currently migrating.
4967
///
5068
/// The error format is expected to be: `"MOVED <slot> <endpoint>:<port>"`
5169
///
52-
/// - Returns: A `ValkeyMovedError` if the token represents a valid MOVED error, or `nil` otherwise.
70+
/// - Returns: A `ValkeyClusterRedirectionError` if the token represents a valid MOVED/ASK error, or `nil` otherwise.
5371
@usableFromInline
5472
init?(_ errorMessage: String) {
55-
guard errorMessage.hasPrefix(Self.movedPrefix) else {
73+
let msg: String.SubSequence
74+
let request: Redirection
75+
if errorMessage.hasPrefix(Self.movedPrefix) {
76+
msg = errorMessage.dropFirst(Self.movedPrefix.count)
77+
request = .move
78+
} else if errorMessage.hasPrefix(Self.askPrefix) {
79+
msg = errorMessage.dropFirst(Self.askPrefix.count)
80+
request = .ask
81+
} else {
5682
return nil
5783
}
58-
59-
let msg = errorMessage.dropFirst(Self.movedPrefix.count)
6084
guard let spaceAfterSlotIndex = msg.firstIndex(where: { $0 == " " }) else {
6185
return nil
6286
}
@@ -79,6 +103,6 @@ extension ValkeyMovedError {
79103
return nil
80104
}
81105

82-
self = ValkeyMovedError(slot: slot, endpoint: Swift.String(endpoint), port: port)
106+
self = ValkeyClusterRedirectionError(request: request, slot: slot, endpoint: Swift.String(endpoint), port: port)
83107
}
84108
}

Sources/Valkey/Cluster/ValkeyNodeDescription.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,18 @@ package struct ValkeyNodeDescription: Identifiable, Hashable, Sendable {
8080
self.port = description.tlsPort ?? description.port ?? 6379
8181
}
8282

83+
/// Creates a node description from a redirection error.
84+
///
85+
/// This initializer converts a `ValkeyClusterRedirectionError` to a `ValkeyNodeDescription`.
86+
///
87+
/// - Parameter redirectionError: A `ValkeyClusterRedirectionError` instance.
88+
package init(redirectionError: ValkeyClusterRedirectionError) {
89+
self.host = nil
90+
self.ip = nil
91+
self.endpoint = redirectionError.endpoint
92+
self.port = redirectionError.port
93+
}
94+
8395
/// Determines whether this node description matches a given cluster node description.
8496
///
8597
/// This method compares the essential connection properties of this node with

Sources/Valkey/Commands/ClusterCommands.swift

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,6 @@ public enum CLUSTER {
284284
/// Returns the key names in a hash slot.
285285
@_documentation(visibility: internal)
286286
public struct GETKEYSINSLOT: ValkeyCommand {
287-
public typealias Response = RESPToken.Array
288-
289287
@inlinable public static var name: String { "CLUSTER GETKEYSINSLOT" }
290288

291289
public var slot: Int
@@ -462,8 +460,6 @@ public enum CLUSTER {
462460
/// Returns the ID of a node.
463461
@_documentation(visibility: internal)
464462
public struct MYID: ValkeyCommand {
465-
public typealias Response = ByteBuffer
466-
467463
@inlinable public static var name: String { "CLUSTER MYID" }
468464

469465
@inlinable public init() {
@@ -477,8 +473,6 @@ public enum CLUSTER {
477473
/// Returns the shard ID of a node.
478474
@_documentation(visibility: internal)
479475
public struct MYSHARDID: ValkeyCommand {
480-
public typealias Response = ByteBuffer
481-
482476
@inlinable public static var name: String { "CLUSTER MYSHARDID" }
483477

484478
@inlinable public init() {
@@ -1014,7 +1008,7 @@ extension ValkeyClientProtocol {
10141008
/// - Response: [Array]: An array with up to count elements.
10151009
@inlinable
10161010
@discardableResult
1017-
public func clusterGetkeysinslot(slot: Int, count: Int) async throws -> RESPToken.Array {
1011+
public func clusterGetkeysinslot(slot: Int, count: Int) async throws -> CLUSTER.GETKEYSINSLOT.Response {
10181012
try await execute(CLUSTER.GETKEYSINSLOT(slot: slot, count: count))
10191013
}
10201014

@@ -1108,7 +1102,7 @@ extension ValkeyClientProtocol {
11081102
/// - Response: [String]: The node id.
11091103
@inlinable
11101104
@discardableResult
1111-
public func clusterMyid() async throws -> ByteBuffer {
1105+
public func clusterMyid() async throws -> CLUSTER.MYID.Response {
11121106
try await execute(CLUSTER.MYID())
11131107
}
11141108

@@ -1120,7 +1114,7 @@ extension ValkeyClientProtocol {
11201114
/// - Response: [String]: The node's shard id.
11211115
@inlinable
11221116
@discardableResult
1123-
public func clusterMyshardid() async throws -> ByteBuffer {
1117+
public func clusterMyshardid() async throws -> CLUSTER.MYSHARDID.Response {
11241118
try await execute(CLUSTER.MYSHARDID())
11251119
}
11261120

Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,18 @@
77
//
88
import NIOCore
99

10+
extension CLUSTER.GETKEYSINSLOT {
11+
public typealias Response = [ValkeyKey]
12+
}
13+
14+
extension CLUSTER.MYID {
15+
public typealias Response = String
16+
}
17+
18+
extension CLUSTER.MYSHARDID {
19+
public typealias Response = String
20+
}
21+
1022
extension CLUSTER.SHARDS {
1123
public typealias Response = ValkeyClusterDescription
1224
}

Sources/Valkey/Commands/GenericCommands.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,15 +352,15 @@ public struct MIGRATE<Host: RESPStringRenderable>: ValkeyCommand {
352352
public var respEntries: Int {
353353
switch self {
354354
case .key(let key): key.respEntries
355-
case .emptyString: "\"\"".respEntries
355+
case .emptyString: "".respEntries
356356
}
357357
}
358358

359359
@inlinable
360360
public func encode(into commandEncoder: inout ValkeyCommandEncoder) {
361361
switch self {
362362
case .key(let key): key.encode(into: &commandEncoder)
363-
case .emptyString: "\"\"".encode(into: &commandEncoder)
363+
case .emptyString: "".encode(into: &commandEncoder)
364364
}
365365
}
366366
}

Sources/Valkey/Node/ValkeyNodeClient.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,17 @@ public final class ValkeyNodeClient: Sendable {
3535
ContinuousClock
3636
>
3737
/// Server address
38-
let serverAddress: ValkeyServerAddress
38+
public let serverAddress: ValkeyServerAddress
3939
/// Connection pool
4040
let connectionPool: Pool
4141

4242
let connectionFactory: ValkeyConnectionFactory
4343
/// configuration
44-
var configuration: ValkeyClientConfiguration { self.connectionFactory.configuration }
44+
public var configuration: ValkeyClientConfiguration { self.connectionFactory.configuration }
4545
/// EventLoopGroup to use
46-
let eventLoopGroup: any EventLoopGroup
46+
public let eventLoopGroup: any EventLoopGroup
4747
/// Logger
48-
let logger: Logger
48+
public let logger: Logger
4949

5050
package init(
5151
_ address: ValkeyServerAddress,

Sources/Valkey/Node/ValkeyRunningClients.swift

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,22 @@ struct ValkeyRunningClientsStateMachine<
8989
)
9090
}
9191

92+
enum AddNodeAction {
93+
case useExistingPool(ConnectionPool)
94+
case runAndUsePool(ConnectionPool)
95+
}
96+
97+
mutating func addNode(
98+
_ node: ValkeyNodeDescription
99+
) -> AddNodeAction {
100+
if let pool = self.clientMap[node.id] {
101+
return .useExistingPool(pool.pool)
102+
}
103+
let newPool = self.makePool(for: node)
104+
self.clientMap[node.id] = NodeBundle(pool: newPool, nodeDescription: node)
105+
return .runAndUsePool(newPool)
106+
}
107+
92108
@inlinable
93109
subscript(_ index: ValkeyNodeID) -> NodeBundle? {
94110
self.clientMap[index]

0 commit comments

Comments
 (0)