Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
image: ["swift:6.1"]
image: ["swift:6.2"]

container:
image: ${{ matrix.image }}
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/soundness.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ jobs:
with:
api_breakage_check_container_image: swift:latest
docs_check_container_image: swift:latest
# 6.2 does not understand `nonisolated(nonsending)` correctly
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use // swift-format-ignore to get around swift-format not understanding nonisolated(nonsending)

format_check_container_image: swiftlang/swift:nightly-main-noble
unacceptable_language_check_enabled: false # Valkey commands contain unacceptable language
8 changes: 7 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import PackageDescription

let defaultSwiftSettings: [SwiftSetting] =
var defaultSwiftSettings: [SwiftSetting] =
[
.swiftLanguageMode(.v6),
.enableExperimentalFeature("AvailabilityMacro=valkeySwift 1.0:macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0"),
Expand All @@ -15,6 +15,12 @@ let defaultSwiftSettings: [SwiftSetting] =
.enableExperimentalFeature("Lifetimes"),
]

#if compiler(>=6.2)
defaultSwiftSettings.append(
.enableUpcomingFeature("NonisolatedNonsendingByDefault")
)
#endif

let package = Package(
name: "valkey-swift",
products: [
Expand Down
6 changes: 2 additions & 4 deletions Sources/Valkey/Cluster/ValkeyClusterClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -463,20 +463,18 @@ public final class ValkeyClusterClient: Sendable {
/// - Parameters:
/// - keys: Keys affected by operation. This is used to choose the cluster node
/// - readOnly: Is this connection only going to be used with readonly commands
/// - isolation: Actor isolation
/// - operation: Closure handling Valkey connection
/// - Returns: Value returned by closure
@inlinable
public func withConnection<Value>(
forKeys keys: some Collection<ValkeyKey>,
readOnly: Bool = false,
isolation: isolated (any Actor)? = #isolation,
operation: (ValkeyConnection) async throws -> sending Value
operation: (ValkeyConnection) async throws -> Value
) async throws -> Value {
let hashSlots = keys.compactMap { HashSlot(key: $0) }
let nodeSelection = getNodeSelection(readOnly: readOnly)
let node = try await self.nodeClient(for: hashSlots, nodeSelection: nodeSelection)
return try await node.withConnection(isolation: isolation, operation: operation)
return try await node.withConnection(operation: operation)
}

@inlinable
Expand Down
4 changes: 1 addition & 3 deletions Sources/Valkey/Connection/ValkeyConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,21 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
/// connection.
///
/// To avoid the cost of acquiring the connection and then closing it, it is always
/// preferable to use ``ValkeyClient/withConnection(isolation:operation:)`` which
/// preferable to use ``ValkeyClient/withConnection(operation:)`` which
/// uses a persistent connection pool to provide connections to your Valkey database.
///
/// - Parameters:
/// - address: Internet address of database
/// - configuration: Configuration of Valkey connection
/// - eventLoop: EventLoop to run connection on
/// - logger: Logger for connection
/// - isolation: Actor isolation
/// - operation: Closure handling Valkey connection
/// - Returns: Return value of operation closure
public static func withConnection<Value>(
address: ValkeyServerAddress,
configuration: ValkeyConnectionConfiguration = .init(),
eventLoop: any EventLoop = MultiThreadedEventLoopGroup.singleton.any(),
logger: Logger,
isolation: isolated (any Actor)? = #isolation,
operation: (ValkeyConnection) async throws -> sending Value
) async throws -> sending Value {
let connection = try await connect(
Expand Down
6 changes: 3 additions & 3 deletions Sources/Valkey/Documentation.docc/Pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ try await valkeyClient.publish(channel: "channel1", message: "Hello, World!")

### Subscribing

Use ``ValkeyConnection/subscribe(to:isolation:process:)-(String...,_,_)`` to subscribe to a single or multiple channels and receive every message published to the channel via an AsyncSequence. When you exit the closure provided, the Valkey client sends the relevant `UNSUBSCRIBE` messages.
Use ``ValkeyConnection/subscribe(to:process:)-(String...,_)`` to subscribe to a single or multiple channels and receive every message published to the channel via an AsyncSequence. When you exit the closure provided, the Valkey client sends the relevant `UNSUBSCRIBE` messages.

```swift
try await valkeyClient.withConnection { connection in
Expand All @@ -43,7 +43,7 @@ try await connection.subscribe(to: ["channel1"]) { subscription in

### Patterns

Valkey allows you to use glob style patterns to subscribe to a range of channels. These are available with the function ``ValkeyConnection/psubscribe(to:isolation:process:)-([String],_,_)``. This is formatted in a similar manner to normal subscriptions.
Valkey allows you to use glob style patterns to subscribe to a range of channels. These are available with the function ``ValkeyConnection/psubscribe(to:process:)-([String],_)``. This is formatted in a similar manner to normal subscriptions.

```swift
try await connection.subscribe(to: ["channel*"]) { subscription in
Expand Down Expand Up @@ -85,7 +85,7 @@ try await connection.clientTracking(

#### Subscribing to Invalidation Events

Once tracking is enabled you can subscribe to invalidation events using ``ValkeyConnection/subscribeKeyInvalidations(isolation:process:)``. The AsyncSequence passed to the `process` closure is a list of keys that have been invalidated.
Once tracking is enabled you can subscribe to invalidation events using ``ValkeyConnection/subscribeKeyInvalidations(process:)``. The AsyncSequence passed to the `process` closure is a list of keys that have been invalidated.

```swift
try await connection.subscribeKeyInvalidations { keys in
Expand Down
4 changes: 1 addition & 3 deletions Sources/Valkey/Node/ValkeyNodeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,10 @@ extension ValkeyNodeClient {
/// Get connection from connection pool and run operation using connection
///
/// - Parameters:
/// - isolation: Actor isolation
/// - operation: Closure handling Valkey connection
/// - Returns: Value returned by closure
public func withConnection<Value>(
isolation: isolated (any Actor)? = #isolation,
operation: (ValkeyConnection) async throws -> sending Value
operation: (ValkeyConnection) async throws -> Value
) async throws -> Value {
let lease = try await self.connectionPool.leaseConnection()
defer { lease.release() }
Expand Down
25 changes: 10 additions & 15 deletions Sources/Valkey/Subscriptions/ValkeyClient+subscribe.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ extension ValkeyClient {
/// Run operation with the valkey subscription connection
///
/// - Parameters:
/// - isolation: Actor isolation
/// - operation: Closure to run with subscription connection
@inlinable
func withSubscriptionConnection<Value>(
isolation: isolated (any Actor)? = #isolation,
_ operation: (ValkeyConnection) async throws -> sending Value
) async throws -> sending Value {
_ operation: (ValkeyConnection) async throws -> Value
) async throws -> Value {
let node = self.node
let id = node.subscriptionConnectionIDGenerator.next()

Expand Down Expand Up @@ -49,16 +47,14 @@ extension ValkeyClient {
/// all subscriptions.
///
/// - Parameters:
/// - isolation: Actor isolation
/// - process: Closure that is called with async sequence of key invalidations and the client id
/// of the connection the subscription is running on.
/// - Returns: Return value of closure
@inlinable
public func subscribeKeyInvalidations<Value>(
isolation: isolated (any Actor)? = #isolation,
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>, Int) async throws -> sending Value
) async throws -> sending Value {
try await withSubscriptionConnection { connection in
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>, Int) async throws -> Value
) async throws -> Value {
try await self.withSubscriptionConnection { connection in
let id = try await connection.clientId()
return try await connection.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in
let keys = subscription.map { ValkeyKey($0.message) }
Expand All @@ -71,16 +67,15 @@ extension ValkeyClient {
/// AsyncSequence
///
/// This should not be called directly, used the related commands
/// ``ValkeyClient/subscribe(to:isolation:process:)`` or
/// ``ValkeyClient/psubscribe(to:isolation:process:)``
/// ``ValkeyClient/subscribe(to:process:)`` or
/// ``ValkeyClient/psubscribe(to:process:)``
@inlinable
public func _subscribe<Value>(
command: some ValkeySubscribeCommand,
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> sending Value {
process: (ValkeySubscription) async throws -> Value
) async throws -> Value {
try await self.withSubscriptionConnection { connection in
try await connection._subscribe(command: command, isolation: isolation, process: process)
try await connection._subscribe(command: command, process: process)
}
}
}
25 changes: 10 additions & 15 deletions Sources/Valkey/Subscriptions/ValkeyClusterClient+subscribe.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ extension ValkeyClusterClient {
/// Run operation with the valkey subscription connection
///
/// - Parameters:
/// - isolation: Actor isolation
/// - operation: Closure to run with subscription connection
@inlinable
func withSubscriptionConnection<Value>(
isolation: isolated (any Actor)? = #isolation,
_ operation: (ValkeyConnection) async throws -> sending Value
) async throws -> sending Value {
_ operation: (ValkeyConnection) async throws -> Value
) async throws -> Value {
let node = try await self.nodeClient(for: [], nodeSelection: .primary)
let id = node.subscriptionConnectionIDGenerator.next()

Expand Down Expand Up @@ -49,16 +47,14 @@ extension ValkeyClusterClient {
/// all subscriptions.
///
/// - Parameters:
/// - isolation: Actor isolation
/// - process: Closure that is called with async sequence of key invalidations and the client id
/// of the connection the subscription is running on.
/// - Returns: Return value of closure
@inlinable
public func subscribeKeyInvalidations<Value>(
isolation: isolated (any Actor)? = #isolation,
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>, Int) async throws -> sending Value
) async throws -> sending Value {
try await withSubscriptionConnection { connection in
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>, Int) async throws -> Value
) async throws -> Value {
try await self.withSubscriptionConnection { connection in
let id = try await connection.clientId()
return try await connection.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in
let keys = subscription.map { ValkeyKey($0.message) }
Expand All @@ -71,16 +67,15 @@ extension ValkeyClusterClient {
/// AsyncSequence
///
/// This should not be called directly, used the related commands
/// ``ValkeyClusterClient/subscribe(to:isolation:process:)`` or
/// ``ValkeyClusterClient/psubscribe(to:isolation:process:)``
/// ``ValkeyClusterClient/subscribe(to:process:)`` or
/// ``ValkeyClusterClient/psubscribe(to:process:)``
@inlinable
public func _subscribe<Value>(
command: some ValkeySubscribeCommand,
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> sending Value {
process: (ValkeySubscription) async throws -> Value
) async throws -> Value {
try await self.withSubscriptionConnection { connection in
try await connection._subscribe(command: command, isolation: isolation, process: process)
try await connection._subscribe(command: command, process: process)
}
}
}
72 changes: 48 additions & 24 deletions Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ extension ValkeyConnection {
///
/// - Parameters:
/// - shardchannels: list of shard channels to subscribe to
/// - isolation: Actor isolation
/// - process: Closure that is called with subscription async sequence
/// - Returns: Return value of closure
@inlinable
public func ssubscribe<Value>(
public nonisolated func ssubscribe<Value>(
to shardchannels: String...,
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> sending Value {
process: (ValkeySubscription) async throws -> Value
) async throws -> Value {
try await self.ssubscribe(to: shardchannels, process: process)
}

Expand All @@ -39,18 +37,15 @@ extension ValkeyConnection {
///
/// - Parameters:
/// - shardchannels: list of shard channels to subscribe to
/// - isolation: Actor isolation
/// - process: Closure that is called with subscription async sequence
/// - Returns: Return value of closure
@inlinable
public func ssubscribe<Value>(
public nonisolated func ssubscribe<Value>(
to shardchannels: [String],
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> sending Value {
process: (ValkeySubscription) async throws -> Value
) async throws -> Value {
try await self._subscribe(
command: SSUBSCRIBE(shardchannels: shardchannels),
isolation: isolation,
process: process
)
}
Expand All @@ -65,33 +60,31 @@ extension ValkeyConnection {
/// channel
///
/// - Parameters:
/// - isolation: Actor isolation
/// - process: Closure that is called with async sequence of key invalidations
/// - Returns: Return value of closure
@inlinable
public func subscribeKeyInvalidations<Value>(
isolation: isolated (any Actor)? = #isolation,
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>) async throws -> sending Value
) async throws -> sending Value {
public nonisolated func subscribeKeyInvalidations<Value>(
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>) async throws -> Value
) async throws -> Value {
try await self.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in
let keys = subscription.map { ValkeyKey($0.message) }
return try await process(keys)
}
}

#if compiler(>=6.2)
/// Execute subscribe command and run closure using related ``ValkeySubscription``
/// AsyncSequence
///
/// This should not be called directly, used the related commands
/// ``ValkeyConnection/subscribe(to:isolation:process:)`` or
/// ``ValkeyConnection/psubscribe(to:isolation:process:)``
/// ``ValkeyConnection/subscribe(to:process:)`` or
/// ``ValkeyConnection/psubscribe(to:process:)``
@inlinable
public func _subscribe<Value>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using a later swift-format which the majority of people aren't using yet can you add this instead

    // swift-format-ignore
    @inlinable
    public nonisolated(nonsending) func _subscribe<Value>(
        command: some ValkeySubscribeCommand,
        process: nonisolated(nonsending) (ValkeySubscription) async throws -> Value
    ) async throws -> Value {

public nonisolated(nonsending) func _subscribe<Value>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this nonisolated(nonsending) when the functions that call it are nonisolated?

command: some ValkeySubscribeCommand,
isolation: isolated (any Actor)? = #isolation,
process: (ValkeySubscription) async throws -> sending Value
) async throws -> sending Value {
let (id, stream) = try await subscribe(command: command, filters: command.filters)
process: nonisolated(nonsending) (ValkeySubscription) async throws -> Value
) async throws -> Value {
let (id, stream) = try await self.subscribe(command: command, filters: command.filters)
let value: Value
do {
value = try await process(stream)
Expand All @@ -109,6 +102,37 @@ extension ValkeyConnection {
}.value
return value
}
#else
/// Execute subscribe command and run closure using related ``ValkeySubscription``
/// AsyncSequence
///
/// This should not be called directly, used the related commands
/// ``ValkeyConnection/subscribe(to:process:)`` or
/// ``ValkeyConnection/psubscribe(to:process:)``
@inlinable
public nonisolated func _subscribe<Value>(
command: some ValkeySubscribeCommand,
process: (ValkeySubscription) async throws -> Value
) async throws -> Value {
let (id, stream) = try await self.subscribe(command: command, filters: command.filters)
let value: Value
do {
value = try await process(stream)
try Task.checkCancellation()
} catch {
// call unsubscribe in unstructured Task to avoid it being cancelled
_ = await Task {
try await unsubscribe(id: id)
}.result
throw error
}
// call unsubscribe in unstructured Task to avoid it being cancelled
_ = try await Task {
try await unsubscribe(id: id)
}.value
return value
}
#endif

@usableFromInline
func subscribe(
Expand All @@ -120,7 +144,7 @@ extension ValkeyConnection {
if Task.isCancelled {
throw ValkeyClientError(.cancelled)
}
let subscriptionID: Int = try await withCheckedThrowingContinuation { continuation in
let subscriptionID: Int = try await withCheckedThrowingContinuation(isolation: self) { continuation in
self.channelHandler.subscribe(
command: command,
streamContinuation: streamContinuation,
Expand Down
11 changes: 11 additions & 0 deletions Sources/Valkey/Subscriptions/ValkeySubscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,19 @@ public struct ValkeySubscription: AsyncSequence, Sendable {
public struct AsyncIterator: AsyncIteratorProtocol {
var base: BaseAsyncSequence.AsyncIterator

#if compiler(>=6.2)
@concurrent
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be

public mutating func next() async throws -> Element? {
    try await self.base.next(isolation: #isolation)
}

Instead of adding @concurrent

public mutating func next() async throws -> Element? {
try await self.base.next()
}
#else
public mutating func next() async throws -> Element? {
try await self.base.next()
}
#endif

public mutating func next(isolation actor: isolated (any Actor)?) async throws(any Error) -> ValkeySubscriptionMessage? {
try await self.base.next(isolation: actor)
}
}
}
Loading
Loading