Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions Sources/TecoCLSLogging/CLSLogHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@ import protocol TecoSigner.Credential
public struct CLSLogHandler: LogHandler {

public let client: CLSLogClient
internal let accumulator: CLSLogAccumulator
public let queue: CLSLogQueue

public init(
client: HTTPClient,
credentialProvider: @escaping () -> Credential,
region: String,
topicID: String,
maxBatchSize: UInt = 4,
maxWaitNanoseconds: UInt? = nil
queueConfig: CLSLogQueue.Configuration = .init()
) {
self.client = .init(client: client, credentialProvider: credentialProvider, region: region, topicID: topicID)
self.accumulator = .init(maxBatchSize: maxBatchSize, maxWaitNanoseconds: maxWaitNanoseconds, uploader: self.client.uploadLogs)
self.queue = .init(configuration: queueConfig, uploader: self.client.uploadLogs)
}

// MARK: Log handler implemenation
Expand All @@ -38,7 +37,7 @@ public struct CLSLogHandler: LogHandler {
let metadata = self.resolveMetadata(metadata)
let log = Cls_LogGroup(level, message: message, metadata: metadata, source: source, file: file, function: function, line: line)
assert(log.isInitialized)
self.accumulator.addLog(log)
self.queue.enqueue(log)
}

// MARK: Internal implemenation
Expand All @@ -51,8 +50,8 @@ public struct CLSLogHandler: LogHandler {

// MARK: Test utility

internal init(client: CLSLogClient, accumulator: CLSLogAccumulator) {
internal init(client: CLSLogClient, queue: CLSLogQueue) {
self.client = client
self.accumulator = accumulator
self.queue = queue
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
import Dispatch
import struct NIOConcurrencyHelpers.NIOLock

class CLSLogAccumulator {
public class CLSLogQueue {

private var lock: NIOLock = .init()
private var logQueue: [Cls_LogGroup] = []
private var logs: [Cls_LogGroup] = []
private var deadline: DispatchWallTime = .distantFuture

let maxBatchSize: Int
let maxWaitTime: DispatchTimeInterval?
let uploader: ([Cls_LogGroup]) async throws -> String
public let configuration: Configuration

init(maxBatchSize: UInt, maxWaitNanoseconds: UInt?, uploader: @escaping ([Cls_LogGroup]) async throws -> String) {
self.maxBatchSize = Int(maxBatchSize)
if let maxWaitNanoseconds = maxWaitNanoseconds {
self.maxWaitTime = .nanoseconds(Int(maxWaitNanoseconds))
} else {
self.maxWaitTime = nil
public struct Configuration {
public let maxBatchSize: Int
public let maxWaitTime: DispatchTimeInterval?

public init(maxBatchSize: UInt = 4, maxWaitNanoseconds: UInt? = nil) {
self.maxBatchSize = Int(maxBatchSize)
if let maxWaitNanoseconds = maxWaitNanoseconds {
self.maxWaitTime = .nanoseconds(Int(maxWaitNanoseconds))
} else {
self.maxWaitTime = nil
}
}
}

private let uploader: ([Cls_LogGroup]) async throws -> String

init(configuration: Configuration = .init(), uploader: @escaping ([Cls_LogGroup]) async throws -> String) {
self.configuration = configuration
self.uploader = uploader
}

Expand Down Expand Up @@ -49,16 +58,16 @@ class CLSLogAccumulator {
}
}

func addLog(_ log: Cls_LogGroup) {
func enqueue(_ log: Cls_LogGroup) {
// set deadline and append log
self.lock.withLock {
if let maxWaitTime = maxWaitTime {
if let maxWaitTime = configuration.maxWaitTime {
let deadline = DispatchWallTime.now() + maxWaitTime
if self.deadline > deadline {
self.deadline = deadline
}
}
self.logQueue.append(log)
self.logs.append(log)
}

// upload if required
Expand All @@ -71,26 +80,26 @@ class CLSLogAccumulator {

private func batchUploadPayload(force: Bool = false) -> [Cls_LogGroup]? {
// get log queue length
guard !logQueue.isEmpty else {
guard !logs.isEmpty else {
return nil
}
let queued = logQueue.count
let queued = logs.count
assert(queued > 0)

// compute batch size
guard queued >= maxBatchSize || deadline < .now() || force else {
guard queued >= configuration.maxBatchSize || deadline < .now() || force else {
return nil
}
let batchSize = min(queued, maxBatchSize)
assert(logQueue.count >= batchSize)
let batchSize = min(queued, configuration.maxBatchSize)
assert(logs.count >= batchSize)

// dequeue the batch
return self.lock.withLock {
let batch = self.logQueue.prefix(batchSize)
let batch = self.logs.prefix(batchSize)
assert(batch.count == batchSize)
self.logQueue.removeFirst(batchSize)
self.logs.removeSubrange(batch.indices)

if !self.logQueue.isEmpty, let maxWaitTime = maxWaitTime {
if !self.logs.isEmpty, let maxWaitTime = configuration.maxWaitTime {
self.deadline = .now() + maxWaitTime
} else {
self.deadline = .distantFuture
Expand Down
8 changes: 2 additions & 6 deletions Tests/TecoCLSLoggingTests/CLSLogHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,15 @@ final class CLSLogHandlerTests: XCTestCase {
return "mock-upload-id"
}

// create log handler with custom accumulator
// create log handler with custom queue
let logHandler = CLSLogHandler(
client: .init(
client: .init(eventLoopGroupProvider: .createNew),
credentialProvider: { StaticCredential(secretId: "", secretKey: "") },
region: "ap-guangzhou",
topicID: "xxxxxxxx-xxxx-xxxx-xxxx"
),
accumulator: .init(
maxBatchSize: 3,
maxWaitNanoseconds: nil,
uploader: upload
)
queue: .init(configuration: .init(maxBatchSize: 3), uploader: upload)
)

// we're not actually sending any requests here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,34 @@ import XCTest
import AsyncHTTPClient
import Atomics

final class CLSLogAccumulatorTests: XCTestCase {
final class CLSLogQueueTests: XCTestCase {
func testBatchSize() async throws {
// set up test helpers
let batches = ManagedAtomic(0)
func upload(_ logs: [Cls_LogGroup]) throws -> String {
XCTAssertLessThanOrEqual(logs.count, 2)
batches.wrappingIncrement(ordering: .relaxed)
batches.wrappingIncrement(ordering: .sequentiallyConsistent)
return "mock-upload-id"
}

// create log accumulator
let accumulator = CLSLogAccumulator(
maxBatchSize: 2,
maxWaitNanoseconds: nil,
uploader: upload
)
// create log queue
let queue = CLSLogQueue(configuration: .init(maxBatchSize: 2), uploader: upload)

// test adding logs
for id in 0...10 {
accumulator.addLog(
queue.enqueue(
.init(.debug, message: "Hello with ID#\(id)",
source: "TecoCLSLoggingTests",
file: #fileID, function: #function, line: #line)
)
}

// force flush the logger to upload logs
try accumulator.forceFlush()
try queue.forceFlush()
try await Task.sleep(nanoseconds: 10_000_000)

// assert batch counts
XCTAssertEqual(batches.load(ordering: .acquiring), 6)
XCTAssertEqual(batches.load(ordering: .sequentiallyConsistent), 6)
}

func testWaitDuration() async throws {
Expand All @@ -43,16 +40,15 @@ final class CLSLogAccumulatorTests: XCTestCase {
return "mock-upload-id"
}

// create log accumulator
let accumulator = CLSLogAccumulator(
maxBatchSize: 5,
maxWaitNanoseconds: 200_000_000,
// create log queue
let queue = CLSLogQueue(
configuration: .init(maxBatchSize: 5, maxWaitNanoseconds: 200_000_000),
uploader: upload
)

// test adding logs
for id in 0...10 {
accumulator.addLog(
queue.enqueue(
.init(.debug, message: "Hello with ID#\(id)",
source: "TecoCLSLoggingTests",
file: #fileID, function: #function, line: #line)
Expand Down