From 746f99967eb6de6a3d4f411f48b62502913a0235 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 30 Oct 2025 06:39:03 -0300 Subject: [PATCH 1/5] fix(realtime): make realtime default to MainActor --- Sources/Realtime/RealtimeChannelV2.swift | 45 ++---- Sources/Realtime/RealtimeClientV2.swift | 186 +++++++++-------------- Sources/Supabase/SupabaseClient.swift | 16 +- 3 files changed, 89 insertions(+), 158 deletions(-) diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index f4a9744b..0cbc58f0 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -24,14 +24,16 @@ public struct RealtimeChannelConfig: Sendable { public var isPrivate: Bool } -protocol RealtimeChannelProtocol: AnyObject, Sendable { - @MainActor var config: RealtimeChannelConfig { get } +@MainActor +protocol RealtimeChannelProtocol: AnyObject { + var config: RealtimeChannelConfig { get } var topic: String { get } var logger: (any SupabaseLogger)? { get } var socket: any RealtimeClientProtocol { get } } +@MainActor public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { struct MutableState { var clientChanges: [PostgresJoinConfig] = [] @@ -39,17 +41,16 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { var pushes: [String: PushV2] = [:] } - @MainActor private var mutableState = MutableState() let topic: String - @MainActor var config: RealtimeChannelConfig + var config: RealtimeChannelConfig let logger: (any SupabaseLogger)? let socket: any RealtimeClientProtocol - @MainActor var joinRef: String? { mutableState.joinRef } + var joinRef: String? { mutableState.joinRef } let callbackManager = CallbackManager() private let statusSubject = AsyncValueSubject(.unsubscribed) @@ -162,13 +163,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { throw RealtimeError.maxRetryAttemptsReached } - /// Subscribes to the channel. - @available(*, deprecated, message: "Use `subscribeWithError` instead") - @MainActor - public func subscribe() async { - try? await subscribeWithError() - } - /// Calculates retry delay with exponential backoff and jitter private func calculateRetryDelay(for attempt: Int) -> TimeInterval { let baseDelay: TimeInterval = 1.0 @@ -186,7 +180,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { } /// Subscribes to the channel - @MainActor private func _subscribe() async { if socket.status != .connected { if socket.options.connectOnSubscribe != true { @@ -236,20 +229,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { await push(ChannelEvent.leave) } - @available( - *, - deprecated, - message: - "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead." - ) - public func updateAuth(jwt: String?) async { - logger?.debug("Updating auth token for channel \(topic)") - await push( - ChannelEvent.accessToken, - payload: ["access_token": jwt.map { .string($0) } ?? .null] - ) - } - /// Sends a broadcast message explicitly via REST API. /// /// This method always uses the REST API endpoint regardless of WebSocket connection state. @@ -295,7 +274,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { } headers[.authorization] = "Bearer \(accessToken)" - let body = try await JSONEncoder.supabase().encode( + let body = try JSONEncoder.supabase().encode( BroadcastMessagePayload( messages: [ BroadcastMessagePayload.Message( @@ -317,7 +296,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { let response = try await withTimeout(interval: timeout ?? socket.options.timeoutInterval) { [self] in - await Result { + await Result { @Sendable in try await socket.http.send(request) } }.get() @@ -475,7 +454,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { throw RealtimeError("Received a reply with unexpected payload: \(message)") } - await didReceiveReply(ref: ref, status: status) + didReceiveReply(ref: ref, status: status) if message.payload["response"]?.objectValue?.keys .contains(ChannelEvent.postgresChanges) == true @@ -692,9 +671,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { filter: filter ) - Task { @MainActor in - mutableState.clientChanges.append(config) - } + mutableState.clientChanges.append(config) let id = callbackManager.addPostgresCallback(filter: config, callback: callback) return RealtimeSubscription { [weak callbackManager, logger] in @@ -733,7 +710,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { self.onSystem { _ in callback() } } - @MainActor @discardableResult func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus { let message = RealtimeMessageV2( @@ -752,7 +728,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { return await push.send() } - @MainActor private func didReceiveReply(ref: String, status: String) { let push = mutableState.pushes.removeValue(forKey: ref) push?.didReceive(status: PushStatus(rawValue: status) ?? .ok) diff --git a/Sources/Realtime/RealtimeClientV2.swift b/Sources/Realtime/RealtimeClientV2.swift index a621b302..9ecdc062 100644 --- a/Sources/Realtime/RealtimeClientV2.swift +++ b/Sources/Realtime/RealtimeClientV2.swift @@ -13,9 +13,11 @@ import Foundation #endif /// Factory function for returning a new WebSocket connection. -typealias WebSocketTransport = @Sendable (_ url: URL, _ headers: [String: String]) async throws -> +typealias WebSocketTransport = + @Sendable (_ url: URL, _ headers: [String: String]) async throws -> any WebSocket +@MainActor protocol RealtimeClientProtocol: AnyObject, Sendable { var status: RealtimeClientStatus { get } var options: RealtimeClientOptions { get } @@ -29,40 +31,31 @@ protocol RealtimeClientProtocol: AnyObject, Sendable { func _remove(_ channel: any RealtimeChannelProtocol) } -public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { - struct MutableState { - var accessToken: String? - var ref = 0 - var pendingHeartbeatRef: String? +@MainActor +public final class RealtimeClientV2: RealtimeClientProtocol { + var accessToken: String? + var ref = 0 + var pendingHeartbeatRef: String? - /// Long-running task that keeps sending heartbeat messages. - var heartbeatTask: Task? + /// Long-running task that keeps sending heartbeat messages. + var heartbeatTask: Task? - /// Long-running task for listening for incoming messages from WebSocket. - var messageTask: Task? + /// Long-running task for listening for incoming messages from WebSocket. + var messageTask: Task? - var connectionTask: Task? - var channels: [String: RealtimeChannelV2] = [:] - var sendBuffer: [@Sendable () -> Void] = [] + var connectionTask: Task? + var sendBuffer: [@MainActor () -> Void] = [] - var conn: (any WebSocket)? - } + var conn: (any WebSocket)? let url: URL let options: RealtimeClientOptions let wsTransport: WebSocketTransport - let mutableState = LockIsolated(MutableState()) let http: any HTTPClientType let apikey: String - var conn: (any WebSocket)? { - mutableState.conn - } - /// All managed channels indexed by their topics. - public var channels: [String: RealtimeChannelV2] { - mutableState.channels - } + public private(set) var channels: [String: RealtimeChannelV2] = [:] private let statusSubject = AsyncValueSubject(.disconnected) private let heartbeatSubject = AsyncValueSubject(nil) @@ -161,19 +154,15 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { precondition(options.apikey != nil, "API key is required to connect to Realtime") apikey = options.apikey! - mutableState.withValue { [options] in - if let accessToken = options.headers[.authorization]?.split(separator: " ").last { - $0.accessToken = String(accessToken) - } + if let accessToken = options.headers[.authorization]?.split(separator: " ").last { + self.accessToken = String(accessToken) } } deinit { - mutableState.withValue { - $0.heartbeatTask?.cancel() - $0.messageTask?.cancel() - $0.channels = [:] - } + heartbeatTask?.cancel() + messageTask?.cancel() + channels = [:] } /// Connects the socket. @@ -185,7 +174,7 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { func connect(reconnect: Bool) async { if status == .disconnected { - let connectionTask = Task { + self.connectionTask = Task { if reconnect { try? await _clock.sleep(for: .seconds(options.reconnectDelay)) @@ -203,7 +192,7 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { status = .connecting do { - let conn = try await wsTransport( + self.conn = try await wsTransport( Self.realtimeWebSocketURL( baseURL: Self.realtimeBaseURL(url: url), apikey: options.apikey, @@ -211,16 +200,11 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { ), options.headers.dictionary ) - mutableState.withValue { $0.conn = conn } onConnected(reconnect: reconnect) } catch { onError(error) } } - - mutableState.withValue { - $0.connectionTask = connectionTask - } } _ = await statusChange.first { @Sendable in $0 == .connected } @@ -280,43 +264,29 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { _ topic: String, options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in } ) -> RealtimeChannelV2 { - mutableState.withValue { - let realtimeTopic = "realtime:\(topic)" + let realtimeTopic = "realtime:\(topic)" - if let channel = $0.channels[realtimeTopic] { - return channel - } - - var config = RealtimeChannelConfig( - broadcast: BroadcastJoinConfig(acknowledgeBroadcasts: false, receiveOwnBroadcasts: false), - presence: PresenceJoinConfig(key: ""), - isPrivate: false - ) - options(&config) + if let channel = self.channels[realtimeTopic] { + return channel + } - let channel = RealtimeChannelV2( - topic: realtimeTopic, - config: config, - socket: self, - logger: self.options.logger - ) + var config = RealtimeChannelConfig( + broadcast: BroadcastJoinConfig(acknowledgeBroadcasts: false, receiveOwnBroadcasts: false), + presence: PresenceJoinConfig(key: ""), + isPrivate: false + ) + options(&config) - $0.channels[realtimeTopic] = channel + let channel = RealtimeChannelV2( + topic: realtimeTopic, + config: config, + socket: self, + logger: self.options.logger + ) - return channel - } - } + self.channels[realtimeTopic] = channel - @available( - *, - deprecated, - message: - "Client handles channels automatically, this method will be removed on the next major release." - ) - public func addChannel(_ channel: RealtimeChannelV2) { - mutableState.withValue { - $0.channels[channel.topic] = channel - } + return channel } /// Unsubscribe and removes channel. @@ -334,9 +304,7 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { } func _remove(_ channel: any RealtimeChannelProtocol) { - mutableState.withValue { - $0.channels[channel.topic] = nil - } + self.channels[channel.topic] = nil } /// Unsubscribes and removes all channels. @@ -354,7 +322,7 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { if let accessToken = try? await options.accessToken?() { return accessToken } - return mutableState.accessToken + return self.accessToken } private func rejoinChannels() { @@ -366,7 +334,7 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { } private func listenForMessages() { - let messageTask = Task { [weak self] in + self.messageTask = Task { [weak self] in guard let self, let conn = self.conn else { return } do { @@ -382,7 +350,7 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data) await onMessage(message) - case let .close(code, reason): + case .close(let code, let reason): onClose(code: code, reason: reason) } } @@ -390,13 +358,10 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { onError(error) } } - mutableState.withValue { - $0.messageTask = messageTask - } } private func startHeartbeating() { - let heartbeatTask = Task { [weak self, options] in + self.heartbeatTask = Task { [weak self, options] in while !Task.isCancelled { try? await _clock.sleep(for: .seconds(options.heartbeatInterval)) if Task.isCancelled { @@ -405,9 +370,6 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { await self?.sendHeartbeat() } } - mutableState.withValue { - $0.heartbeatTask = heartbeatTask - } } private func sendHeartbeat() async { @@ -416,16 +378,16 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { return } - let pendingHeartbeatRef: String? = mutableState.withValue { - if $0.pendingHeartbeatRef != nil { - $0.pendingHeartbeatRef = nil + let pendingHeartbeatRef: String? = { + if self.pendingHeartbeatRef != nil { + self.pendingHeartbeatRef = nil return nil } let ref = makeRef() - $0.pendingHeartbeatRef = ref + self.pendingHeartbeatRef = ref return ref - } + }() if let pendingHeartbeatRef { push( @@ -455,13 +417,11 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { conn?.close(code: code, reason: reason) - mutableState.withValue { - $0.ref = 0 - $0.messageTask?.cancel() - $0.heartbeatTask?.cancel() - $0.connectionTask?.cancel() - $0.conn = nil - } + ref = 0 + messageTask?.cancel() + heartbeatTask?.cancel() + connectionTask?.cancel() + conn = nil status = .disconnected } @@ -479,13 +439,11 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { tokenToSend = try? await options.accessToken?() } - guard tokenToSend != mutableState.accessToken else { + guard tokenToSend != accessToken else { return } - mutableState.withValue { [token] in - $0.accessToken = token - } + self.accessToken = token for channel in channels.values { if channel.status == .subscribed { @@ -503,17 +461,17 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { heartbeatSubject.yield(message.status == .ok ? .ok : .error) } - let channel = mutableState.withValue { - if let ref = message.ref, ref == $0.pendingHeartbeatRef { - $0.pendingHeartbeatRef = nil + let channel = { + if let ref = message.ref, ref == self.pendingHeartbeatRef { + self.pendingHeartbeatRef = nil options.logger?.debug("heartbeat received") } else { options.logger? .debug("Received event \(message.event) for channel \(message.topic)") } - return $0.channels[message.topic] - } + return self.channels[message.topic] + }() if let channel { await channel.onMessage(message) @@ -524,7 +482,7 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { /// /// If the socket is not connected, the message gets enqueued within a local buffer, and sent out when a connection is next established. public func push(_ message: RealtimeMessageV2) { - let callback = { @Sendable [weak self] in + let callback = { @MainActor [weak self] in do { // Check cancellation before sending, because this push may have been cancelled before a connection was established. try Task.checkCancellation() @@ -546,24 +504,18 @@ public final class RealtimeClientV2: Sendable, RealtimeClientProtocol { if status == .connected { callback() } else { - mutableState.withValue { - $0.sendBuffer.append(callback) - } + self.sendBuffer.append(callback) } } private func flushSendBuffer() { - mutableState.withValue { - $0.sendBuffer.forEach { $0() } - $0.sendBuffer = [] - } + self.sendBuffer.forEach { $0() } + self.sendBuffer = [] } func makeRef() -> String { - mutableState.withValue { - $0.ref += 1 - return $0.ref.description - } + self.ref += 1 + return self.ref.description } static func realtimeBaseURL(url: URL) -> URL { diff --git a/Sources/Supabase/SupabaseClient.swift b/Sources/Supabase/SupabaseClient.swift index d4321478..8b12e9f1 100644 --- a/Sources/Supabase/SupabaseClient.swift +++ b/Sources/Supabase/SupabaseClient.swift @@ -68,14 +68,16 @@ public final class SupabaseClient: Sendable { } } + @MainActor + private var _realtime: RealtimeClientV2? + /// Realtime client for Supabase + @MainActor public var realtimeV2: RealtimeClientV2 { - mutableState.withValue { - if $0.realtime == nil { - $0.realtime = _initRealtimeClient() - } - return $0.realtime! + if _realtime == nil { + _realtime = _initRealtimeClient() } + return _realtime! } /// Supabase Functions allows you to deploy and invoke edge functions. @@ -108,7 +110,6 @@ public final class SupabaseClient: Sendable { var storage: SupabaseStorageClient? var rest: PostgrestClient? var functions: FunctionsClient? - var realtime: RealtimeClientV2? var changedAccessToken: String? } @@ -234,6 +235,7 @@ public final class SupabaseClient: Sendable { } /// Returns all Realtime channels. + @MainActor public var channels: [RealtimeChannelV2] { Array(realtimeV2.channels.values) } @@ -242,6 +244,7 @@ public final class SupabaseClient: Sendable { /// - Parameters: /// - name: The name of the Realtime channel. /// - options: The options to pass to the Realtime channel. + @MainActor public func channel( _ name: String, options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in } @@ -381,6 +384,7 @@ public final class SupabaseClient: Sendable { await realtimeV2.setAuth(accessToken) } + @MainActor private func _initRealtimeClient() -> RealtimeClientV2 { var realtimeOptions = options.realtime realtimeOptions.headers.merge(with: _headers) From b6a66fa9af81413f36f71c212265a076b9946cf6 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 30 Oct 2025 06:46:45 -0300 Subject: [PATCH 2/5] test: fix realtime tests --- Sources/Realtime/RealtimeClientV2.swift | 2 +- .../RealtimeTests/RealtimeChannelTests.swift | 1 + Tests/RealtimeTests/RealtimeTests.swift | 40 +++++++++++-------- Tests/RealtimeTests/_PushTests.swift | 11 ++--- Tests/SupabaseTests/SupabaseClientTests.swift | 4 +- 5 files changed, 34 insertions(+), 24 deletions(-) diff --git a/Sources/Realtime/RealtimeClientV2.swift b/Sources/Realtime/RealtimeClientV2.swift index 9ecdc062..0d6bd45b 100644 --- a/Sources/Realtime/RealtimeClientV2.swift +++ b/Sources/Realtime/RealtimeClientV2.swift @@ -562,7 +562,7 @@ public final class RealtimeClientV2: RealtimeClientProtocol { return url } - var broadcastURL: URL { + nonisolated var broadcastURL: URL { url.appendingPathComponent("api/broadcast") } } diff --git a/Tests/RealtimeTests/RealtimeChannelTests.swift b/Tests/RealtimeTests/RealtimeChannelTests.swift index 4fdbaa67..5db22f4b 100644 --- a/Tests/RealtimeTests/RealtimeChannelTests.swift +++ b/Tests/RealtimeTests/RealtimeChannelTests.swift @@ -12,6 +12,7 @@ import XCTestDynamicOverlay @testable import Realtime +@MainActor final class RealtimeChannelTests: XCTestCase { let sut = RealtimeChannelV2( topic: "topic", diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 9fbbdda4..00494d70 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -11,6 +11,7 @@ import XCTest import FoundationNetworking #endif +@MainActor final class RealtimeTests: XCTestCase { let url = URL(string: "http://localhost:54321/realtime/v1")! let apiKey = "anon.api.key" @@ -33,8 +34,8 @@ final class RealtimeTests: XCTestCase { let reconnectDelay: TimeInterval = RealtimeClientOptions.defaultReconnectDelay let timeoutInterval: TimeInterval = RealtimeClientOptions.defaultTimeoutInterval - override func setUp() { - super.setUp() + override func setUp() async throws { + try await super.setUp() (client, server) = FakeWebSocket.fakes() http = HTTPClientMock() @@ -54,10 +55,10 @@ final class RealtimeTests: XCTestCase { ) } - override func tearDown() { + override func tearDown() async throws { sut.disconnect() - super.tearDown() + try await super.tearDown() } func test_transport() async { @@ -128,10 +129,10 @@ final class RealtimeTests: XCTestCase { XCTAssertEqual(socketStatuses.value, [.disconnected, .connecting, .connected]) - let messageTask = sut.mutableState.messageTask + let messageTask = sut.messageTask XCTAssertNotNil(messageTask) - let heartbeatTask = sut.mutableState.heartbeatTask + let heartbeatTask = sut.heartbeatTask XCTAssertNotNil(heartbeatTask) let channelStatuses = LockIsolated([RealtimeChannelStatus]()) @@ -247,15 +248,22 @@ final class RealtimeTests: XCTestCase { // So we need to wait at least 2.5s to ensure the retry happens await testClock.advance(by: .seconds(2.5)) - let events = client.sentEvents.compactMap { $0.realtimeMessage }.filter { - $0.event == "phx_join" - } - assertInlineSnapshot(of: events, as: .json) { + let events = client.sentEvents.compactMap { $0.realtimeMessage } + + assertInlineSnapshot(of: events, as: .json, record: .failed) { #""" [ + { + "event" : "heartbeat", + "payload" : { + + }, + "ref" : "1", + "topic" : "phoenix" + }, { "event" : "phx_join", - "join_ref" : "1", + "join_ref" : "2", "payload" : { "access_token" : "custom.access.token", "config" : { @@ -274,12 +282,12 @@ final class RealtimeTests: XCTestCase { }, "version" : "realtime-swift\/0.0.0" }, - "ref" : "1", + "ref" : "2", "topic" : "realtime:public:messages" }, { "event" : "phx_join", - "join_ref" : "2", + "join_ref" : "3", "payload" : { "access_token" : "custom.access.token", "config" : { @@ -298,7 +306,7 @@ final class RealtimeTests: XCTestCase { }, "version" : "realtime-swift\/0.0.0" }, - "ref" : "2", + "ref" : "3", "topic" : "realtime:public:messages" } ] @@ -524,7 +532,7 @@ final class RealtimeTests: XCTestCase { await fulfillment(of: [sentHeartbeatExpectation], timeout: 0) - let pendingHeartbeatRef = sut.mutableState.pendingHeartbeatRef + let pendingHeartbeatRef = sut.pendingHeartbeatRef XCTAssertNotNil(pendingHeartbeatRef) // Wait until next heartbeat @@ -614,7 +622,7 @@ final class RealtimeTests: XCTestCase { "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjY0MDkyMjExMjAwfQ.GfiEKLl36X8YWcatHg31jRbilovlGecfUKnOyXMSX9c" await sut.setAuth(validToken) - XCTAssertEqual(sut.mutableState.accessToken, validToken) + XCTAssertEqual(sut.accessToken, validToken) } func testSetAuthWithNonJWT() async throws { diff --git a/Tests/RealtimeTests/_PushTests.swift b/Tests/RealtimeTests/_PushTests.swift index df85752a..2eff83af 100644 --- a/Tests/RealtimeTests/_PushTests.swift +++ b/Tests/RealtimeTests/_PushTests.swift @@ -12,12 +12,13 @@ import XCTest @testable import Realtime #if !os(Android) && !os(Linux) && !os(Windows) + @MainActor final class _PushTests: XCTestCase { var ws: FakeWebSocket! var socket: RealtimeClientV2! - override func setUp() { - super.setUp() + override func setUp() async throws { + try await super.setUp() let (client, server) = FakeWebSocket.fakes() ws = server @@ -43,7 +44,7 @@ import XCTest socket: socket, logger: nil ) - let push = await PushV2( + let push = PushV2( channel: channel, message: RealtimeMessageV2( joinRef: nil, @@ -69,7 +70,7 @@ import XCTest socket: socket, logger: nil ) - let push = await PushV2( + let push = PushV2( channel: channel, message: RealtimeMessageV2( joinRef: nil, @@ -84,7 +85,7 @@ import XCTest await push.send() } await Task.megaYield() - await push.didReceive(status: .ok) + push.didReceive(status: .ok) let status = await task.value XCTAssertEqual(status, .ok) diff --git a/Tests/SupabaseTests/SupabaseClientTests.swift b/Tests/SupabaseTests/SupabaseClientTests.swift index 4a38c844..7c7e9e38 100644 --- a/Tests/SupabaseTests/SupabaseClientTests.swift +++ b/Tests/SupabaseTests/SupabaseClientTests.swift @@ -86,10 +86,10 @@ final class SupabaseClientTests: XCTestCase { let functionsRegion = await client.functions.region XCTAssertEqual(functionsRegion?.rawValue, "ap-northeast-1") - let realtimeURL = client.realtimeV2.url + let realtimeURL = await client.realtimeV2.url XCTAssertEqual(realtimeURL.absoluteString, "https://project-ref.supabase.co/realtime/v1") - let realtimeOptions = client.realtimeV2.options + let realtimeOptions = await client.realtimeV2.options let expectedRealtimeHeader = client._headers.merging(with: [ .init("custom_realtime_header_key")!: "custom_realtime_header_value" ] From f1d13a1707eae884e7a4d63a054de487772426af Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 30 Oct 2025 06:51:47 -0300 Subject: [PATCH 3/5] fix: make CallbackManager isolated to MainActor --- Sources/Helpers/EventEmitter.swift | 4 +- Sources/Realtime/CallbackManager.swift | 96 ++++++------------- Sources/Realtime/RealtimeChannelV2.swift | 4 - .../RealtimeTests/CallbackManagerTests.swift | 1 + 4 files changed, 34 insertions(+), 71 deletions(-) diff --git a/Sources/Helpers/EventEmitter.swift b/Sources/Helpers/EventEmitter.swift index 63d07d97..d005d39a 100644 --- a/Sources/Helpers/EventEmitter.swift +++ b/Sources/Helpers/EventEmitter.swift @@ -13,13 +13,13 @@ import Foundation /// When this token gets deallocated it cancels the observation it was associated with. Store this token in another object to keep the observation alive. public final class ObservationToken: @unchecked Sendable, Hashable { private let _isCancelled = LockIsolated(false) - package var onCancel: @Sendable () -> Void + package var onCancel: () -> Void public var isCancelled: Bool { _isCancelled.withValue { $0 } } - package init(onCancel: @escaping @Sendable () -> Void = {}) { + package init(onCancel: @escaping () -> Void = {}) { self.onCancel = onCancel } diff --git a/Sources/Realtime/CallbackManager.swift b/Sources/Realtime/CallbackManager.swift index b55655ce..bcac6354 100644 --- a/Sources/Realtime/CallbackManager.swift +++ b/Sources/Realtime/CallbackManager.swift @@ -1,45 +1,28 @@ import ConcurrencyExtras import Foundation -final class CallbackManager: Sendable { - struct MutableState { - var id = 0 - var serverChanges: [PostgresJoinConfig] = [] - var callbacks: [RealtimeCallback] = [] - } - - private let mutableState = LockIsolated(MutableState()) - - var serverChanges: [PostgresJoinConfig] { - mutableState.serverChanges - } - - var callbacks: [RealtimeCallback] { - mutableState.callbacks - } - - deinit { - reset() - } +@MainActor +final class CallbackManager { + var id = 0 + var serverChanges: [PostgresJoinConfig] = [] + var callbacks: [RealtimeCallback] = [] @discardableResult func addBroadcastCallback( event: String, callback: @escaping @Sendable (JSONObject) -> Void ) -> Int { - mutableState.withValue { - $0.id += 1 - $0.callbacks.append( - .broadcast( - BroadcastCallback( - id: $0.id, - event: event, - callback: callback - ) + self.id += 1 + self.callbacks.append( + .broadcast( + BroadcastCallback( + id: self.id, + event: event, + callback: callback ) ) - return $0.id - } + ) + return self.id } @discardableResult @@ -47,59 +30,46 @@ final class CallbackManager: Sendable { filter: PostgresJoinConfig, callback: @escaping @Sendable (AnyAction) -> Void ) -> Int { - mutableState.withValue { - $0.id += 1 - $0.callbacks.append( + self.id += 1 + self.callbacks.append( .postgres( PostgresCallback( - id: $0.id, + id: self.id, filter: filter, callback: callback ) ) ) - return $0.id - } + return self.id } @discardableResult func addPresenceCallback(callback: @escaping @Sendable (any PresenceAction) -> Void) -> Int { - mutableState.withValue { - $0.id += 1 - $0.callbacks.append(.presence(PresenceCallback(id: $0.id, callback: callback))) - return $0.id - } + self.id += 1 + self.callbacks.append(.presence(PresenceCallback(id: self.id, callback: callback))) + return self.id } @discardableResult func addSystemCallback(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Int { - mutableState.withValue { - $0.id += 1 - $0.callbacks.append(.system(SystemCallback(id: $0.id, callback: callback))) - return $0.id - } + self.id += 1 + self.callbacks.append(.system(SystemCallback(id: self.id, callback: callback))) + return self.id } func setServerChanges(changes: [PostgresJoinConfig]) { - mutableState.withValue { - $0.serverChanges = changes - } + self.serverChanges = changes } func removeCallback(id: Int) { - mutableState.withValue { - $0.callbacks.removeAll { $0.id == id } - } + self.callbacks.removeAll { $0.id == id } } func triggerPostgresChanges(ids: [Int], data: AnyAction) { - // Read mutableState at start to acquire lock once. - let mutableState = mutableState.value - - let filters = mutableState.serverChanges.filter { + let filters = serverChanges.filter { ids.contains($0.id) } - let postgresCallbacks = mutableState.callbacks.compactMap { + let postgresCallbacks = callbacks.compactMap { if case let .postgres(callback) = $0 { return callback } @@ -118,7 +88,7 @@ final class CallbackManager: Sendable { } func triggerBroadcast(event: String, json: JSONObject) { - let broadcastCallbacks = mutableState.callbacks.compactMap { + let broadcastCallbacks = callbacks.compactMap { if case let .broadcast(callback) = $0 { return callback } @@ -133,7 +103,7 @@ final class CallbackManager: Sendable { leaves: [String: PresenceV2], rawMessage: RealtimeMessageV2 ) { - let presenceCallbacks = mutableState.callbacks.compactMap { + let presenceCallbacks = callbacks.compactMap { if case let .presence(callback) = $0 { return callback } @@ -151,7 +121,7 @@ final class CallbackManager: Sendable { } func triggerSystem(message: RealtimeMessageV2) { - let systemCallbacks = mutableState.callbacks.compactMap { + let systemCallbacks = callbacks.compactMap { if case .system(let callback) = $0 { return callback } @@ -162,10 +132,6 @@ final class CallbackManager: Sendable { systemCallback.callback(message) } } - - func reset() { - mutableState.setValue(MutableState()) - } } struct PostgresCallback { diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index 0cbc58f0..9accd5cd 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -88,10 +88,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { self.socket = socket } - deinit { - callbackManager.reset() - } - /// Subscribes to the channel. public func subscribeWithError() async throws { logger?.debug( diff --git a/Tests/RealtimeTests/CallbackManagerTests.swift b/Tests/RealtimeTests/CallbackManagerTests.swift index d80269cc..f1aeccae 100644 --- a/Tests/RealtimeTests/CallbackManagerTests.swift +++ b/Tests/RealtimeTests/CallbackManagerTests.swift @@ -11,6 +11,7 @@ import XCTest @testable import Realtime +@MainActor final class CallbackManagerTests: XCTestCase { func testIntegration() { let callbackManager = CallbackManager() From 923c39e3339390f8691e71a10bf83e0a8f9ac61f Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 3 Nov 2025 12:12:20 -0300 Subject: [PATCH 4/5] refactor(realtime): drop MutableState structs --- Sources/Realtime/RealtimeChannelV2.swift | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/Sources/Realtime/RealtimeChannelV2.swift b/Sources/Realtime/RealtimeChannelV2.swift index 9accd5cd..b27de019 100644 --- a/Sources/Realtime/RealtimeChannelV2.swift +++ b/Sources/Realtime/RealtimeChannelV2.swift @@ -35,13 +35,9 @@ protocol RealtimeChannelProtocol: AnyObject { @MainActor public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { - struct MutableState { - var clientChanges: [PostgresJoinConfig] = [] - var joinRef: String? - var pushes: [String: PushV2] = [:] - } - - private var mutableState = MutableState() + var clientChanges: [PostgresJoinConfig] = [] + var joinRef: String? + var pushes: [String: PushV2] = [:] let topic: String @@ -50,8 +46,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { let logger: (any SupabaseLogger)? let socket: any RealtimeClientProtocol - var joinRef: String? { mutableState.joinRef } - let callbackManager = CallbackManager() private let statusSubject = AsyncValueSubject(.unsubscribed) @@ -194,7 +188,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { let joinConfig = RealtimeJoinConfig( broadcast: config.broadcast, presence: config.presence, - postgresChanges: mutableState.clientChanges, + postgresChanges: clientChanges, isPrivate: config.isPrivate ) @@ -205,7 +199,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { ) let joinRef = socket.makeRef() - mutableState.joinRef = joinRef + self.joinRef = joinRef logger?.debug("Subscribing to channel with body: \(joinConfig)") @@ -667,7 +661,7 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { filter: filter ) - mutableState.clientChanges.append(config) + clientChanges.append(config) let id = callbackManager.addPostgresCallback(filter: config, callback: callback) return RealtimeSubscription { [weak callbackManager, logger] in @@ -718,14 +712,14 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol { let push = PushV2(channel: self, message: message) if let ref = message.ref { - mutableState.pushes[ref] = push + pushes[ref] = push } return await push.send() } private func didReceiveReply(ref: String, status: String) { - let push = mutableState.pushes.removeValue(forKey: ref) + let push = pushes.removeValue(forKey: ref) push?.didReceive(status: PushStatus(rawValue: status) ?? .ok) } } From 058e9b708e466491899857880a32092a0df91a6e Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 3 Nov 2025 15:13:51 -0300 Subject: [PATCH 5/5] test: fix tests on Xcode 16.4 --- Tests/RealtimeTests/RealtimeTests.swift | 2 +- Tests/RealtimeTests/_PushTests.swift | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 00494d70..f032188a 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -3,7 +3,7 @@ import ConcurrencyExtras import CustomDump import InlineSnapshotTesting import TestHelpers -import XCTest +@preconcurrency import XCTest @testable import Realtime diff --git a/Tests/RealtimeTests/_PushTests.swift b/Tests/RealtimeTests/_PushTests.swift index 2eff83af..38ccb738 100644 --- a/Tests/RealtimeTests/_PushTests.swift +++ b/Tests/RealtimeTests/_PushTests.swift @@ -7,7 +7,7 @@ import ConcurrencyExtras import TestHelpers -import XCTest +@preconcurrency import XCTest @testable import Realtime