diff --git a/Sources/TecoCLSLogging/CLSLogHandler.swift b/Sources/TecoCLSLogging/CLSLogHandler.swift index a27b455..0af1087 100644 --- a/Sources/TecoCLSLogging/CLSLogHandler.swift +++ b/Sources/TecoCLSLogging/CLSLogHandler.swift @@ -5,18 +5,17 @@ import protocol TecoSigner.Credential public struct CLSLogHandler: LogHandler { public let client: CLSLogClient - internal let accumulator: CLSLogAccumulator + public let queue: CLSLogQueue public init( client: HTTPClient, credentialProvider: @escaping () -> Credential, region: String, topicID: String, - maxBatchSize: UInt = 4, - maxWaitNanoseconds: UInt? = nil + queueConfig: CLSLogQueue.Configuration = .init() ) { self.client = .init(client: client, credentialProvider: credentialProvider, region: region, topicID: topicID) - self.accumulator = .init(maxBatchSize: maxBatchSize, maxWaitNanoseconds: maxWaitNanoseconds, uploader: self.client.uploadLogs) + self.queue = .init(configuration: queueConfig, uploader: self.client.uploadLogs) } // MARK: Log handler implemenation @@ -38,7 +37,7 @@ public struct CLSLogHandler: LogHandler { let metadata = self.resolveMetadata(metadata) let log = Cls_LogGroup(level, message: message, metadata: metadata, source: source, file: file, function: function, line: line) assert(log.isInitialized) - self.accumulator.addLog(log) + self.queue.enqueue(log) } // MARK: Internal implemenation @@ -51,8 +50,8 @@ public struct CLSLogHandler: LogHandler { // MARK: Test utility - internal init(client: CLSLogClient, accumulator: CLSLogAccumulator) { + internal init(client: CLSLogClient, queue: CLSLogQueue) { self.client = client - self.accumulator = accumulator + self.queue = queue } } diff --git a/Sources/TecoCLSLogging/CLSLogAccumulator.swift b/Sources/TecoCLSLogging/CLSLogQueue.swift similarity index 59% rename from Sources/TecoCLSLogging/CLSLogAccumulator.swift rename to Sources/TecoCLSLogging/CLSLogQueue.swift index 2e2924d..683ea84 100644 --- a/Sources/TecoCLSLogging/CLSLogAccumulator.swift +++ b/Sources/TecoCLSLogging/CLSLogQueue.swift @@ -1,23 +1,32 @@ import Dispatch import struct NIOConcurrencyHelpers.NIOLock -class CLSLogAccumulator { +public class CLSLogQueue { private var lock: NIOLock = .init() - private var logQueue: [Cls_LogGroup] = [] + private var logs: [Cls_LogGroup] = [] private var deadline: DispatchWallTime = .distantFuture - let maxBatchSize: Int - let maxWaitTime: DispatchTimeInterval? - let uploader: ([Cls_LogGroup]) async throws -> String + public let configuration: Configuration - init(maxBatchSize: UInt, maxWaitNanoseconds: UInt?, uploader: @escaping ([Cls_LogGroup]) async throws -> String) { - self.maxBatchSize = Int(maxBatchSize) - if let maxWaitNanoseconds = maxWaitNanoseconds { - self.maxWaitTime = .nanoseconds(Int(maxWaitNanoseconds)) - } else { - self.maxWaitTime = nil + public struct Configuration { + public let maxBatchSize: Int + public let maxWaitTime: DispatchTimeInterval? + + public init(maxBatchSize: UInt = 4, maxWaitNanoseconds: UInt? = nil) { + self.maxBatchSize = Int(maxBatchSize) + if let maxWaitNanoseconds = maxWaitNanoseconds { + self.maxWaitTime = .nanoseconds(Int(maxWaitNanoseconds)) + } else { + self.maxWaitTime = nil + } } + } + + private let uploader: ([Cls_LogGroup]) async throws -> String + + init(configuration: Configuration = .init(), uploader: @escaping ([Cls_LogGroup]) async throws -> String) { + self.configuration = configuration self.uploader = uploader } @@ -49,16 +58,16 @@ class CLSLogAccumulator { } } - func addLog(_ log: Cls_LogGroup) { + func enqueue(_ log: Cls_LogGroup) { // set deadline and append log self.lock.withLock { - if let maxWaitTime = maxWaitTime { + if let maxWaitTime = configuration.maxWaitTime { let deadline = DispatchWallTime.now() + maxWaitTime if self.deadline > deadline { self.deadline = deadline } } - self.logQueue.append(log) + self.logs.append(log) } // upload if required @@ -71,26 +80,26 @@ class CLSLogAccumulator { private func batchUploadPayload(force: Bool = false) -> [Cls_LogGroup]? { // get log queue length - guard !logQueue.isEmpty else { + guard !logs.isEmpty else { return nil } - let queued = logQueue.count + let queued = logs.count assert(queued > 0) // compute batch size - guard queued >= maxBatchSize || deadline < .now() || force else { + guard queued >= configuration.maxBatchSize || deadline < .now() || force else { return nil } - let batchSize = min(queued, maxBatchSize) - assert(logQueue.count >= batchSize) + let batchSize = min(queued, configuration.maxBatchSize) + assert(logs.count >= batchSize) // dequeue the batch return self.lock.withLock { - let batch = self.logQueue.prefix(batchSize) + let batch = self.logs.prefix(batchSize) assert(batch.count == batchSize) - self.logQueue.removeFirst(batchSize) + self.logs.removeSubrange(batch.indices) - if !self.logQueue.isEmpty, let maxWaitTime = maxWaitTime { + if !self.logs.isEmpty, let maxWaitTime = configuration.maxWaitTime { self.deadline = .now() + maxWaitTime } else { self.deadline = .distantFuture diff --git a/Tests/TecoCLSLoggingTests/CLSLogHandlerTests.swift b/Tests/TecoCLSLoggingTests/CLSLogHandlerTests.swift index bc6cd79..f47fe64 100644 --- a/Tests/TecoCLSLoggingTests/CLSLogHandlerTests.swift +++ b/Tests/TecoCLSLoggingTests/CLSLogHandlerTests.swift @@ -99,7 +99,7 @@ final class CLSLogHandlerTests: XCTestCase { return "mock-upload-id" } - // create log handler with custom accumulator + // create log handler with custom queue let logHandler = CLSLogHandler( client: .init( client: .init(eventLoopGroupProvider: .createNew), @@ -107,11 +107,7 @@ final class CLSLogHandlerTests: XCTestCase { region: "ap-guangzhou", topicID: "xxxxxxxx-xxxx-xxxx-xxxx" ), - accumulator: .init( - maxBatchSize: 3, - maxWaitNanoseconds: nil, - uploader: upload - ) + queue: .init(configuration: .init(maxBatchSize: 3), uploader: upload) ) // we're not actually sending any requests here diff --git a/Tests/TecoCLSLoggingTests/CLSLogAccumulatorTests.swift b/Tests/TecoCLSLoggingTests/CLSLogQueueTests.swift similarity index 68% rename from Tests/TecoCLSLoggingTests/CLSLogAccumulatorTests.swift rename to Tests/TecoCLSLoggingTests/CLSLogQueueTests.swift index fa06a4f..4004b32 100644 --- a/Tests/TecoCLSLoggingTests/CLSLogAccumulatorTests.swift +++ b/Tests/TecoCLSLoggingTests/CLSLogQueueTests.swift @@ -3,26 +3,22 @@ import XCTest import AsyncHTTPClient import Atomics -final class CLSLogAccumulatorTests: XCTestCase { +final class CLSLogQueueTests: XCTestCase { func testBatchSize() async throws { // set up test helpers let batches = ManagedAtomic(0) func upload(_ logs: [Cls_LogGroup]) throws -> String { XCTAssertLessThanOrEqual(logs.count, 2) - batches.wrappingIncrement(ordering: .relaxed) + batches.wrappingIncrement(ordering: .sequentiallyConsistent) return "mock-upload-id" } - // create log accumulator - let accumulator = CLSLogAccumulator( - maxBatchSize: 2, - maxWaitNanoseconds: nil, - uploader: upload - ) + // create log queue + let queue = CLSLogQueue(configuration: .init(maxBatchSize: 2), uploader: upload) // test adding logs for id in 0...10 { - accumulator.addLog( + queue.enqueue( .init(.debug, message: "Hello with ID#\(id)", source: "TecoCLSLoggingTests", file: #fileID, function: #function, line: #line) @@ -30,10 +26,11 @@ final class CLSLogAccumulatorTests: XCTestCase { } // force flush the logger to upload logs - try accumulator.forceFlush() + try queue.forceFlush() + try await Task.sleep(nanoseconds: 10_000_000) // assert batch counts - XCTAssertEqual(batches.load(ordering: .acquiring), 6) + XCTAssertEqual(batches.load(ordering: .sequentiallyConsistent), 6) } func testWaitDuration() async throws { @@ -43,16 +40,15 @@ final class CLSLogAccumulatorTests: XCTestCase { return "mock-upload-id" } - // create log accumulator - let accumulator = CLSLogAccumulator( - maxBatchSize: 5, - maxWaitNanoseconds: 200_000_000, + // create log queue + let queue = CLSLogQueue( + configuration: .init(maxBatchSize: 5, maxWaitNanoseconds: 200_000_000), uploader: upload ) // test adding logs for id in 0...10 { - accumulator.addLog( + queue.enqueue( .init(.debug, message: "Hello with ID#\(id)", source: "TecoCLSLoggingTests", file: #fileID, function: #function, line: #line)