Skip to content

Commit bed1a5d

Browse files
author
Sebastien Stormacq
committed
Split file LocalServer and Pool
1 parent 0cd73da commit bed1a5d

File tree

2 files changed

+124
-105
lines changed

2 files changed

+124
-105
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
#if LocalServerSupport
16+
import DequeModule
17+
import Synchronization
18+
19+
@available(LambdaSwift 2.0, *)
20+
extension LambdaHTTPServer {
21+
/// A shared data structure to store the current invocation or response requests and the continuation objects.
22+
/// This data structure is shared between instances of the HTTPHandler
23+
/// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function).
24+
internal final class Pool<T>: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable {
25+
private let poolName: String
26+
internal init(name: String = "Pool") { self.poolName = name }
27+
28+
typealias Element = T
29+
30+
enum State: ~Copyable {
31+
case buffer(Deque<T>)
32+
case continuation(CheckedContinuation<T, any Error>?)
33+
}
34+
35+
private let lock = Mutex<State>(.buffer([]))
36+
37+
/// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
38+
public func push(_ invocation: T) {
39+
40+
// if the iterator is waiting for an element on `next()``, give it to it
41+
// otherwise, enqueue the element
42+
let maybeContinuation = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in
43+
switch consume state {
44+
case .continuation(let continuation):
45+
state = .buffer([])
46+
return continuation
47+
48+
case .buffer(var buffer):
49+
buffer.append(invocation)
50+
state = .buffer(buffer)
51+
return nil
52+
}
53+
}
54+
55+
maybeContinuation?.resume(returning: invocation)
56+
}
57+
58+
func next() async throws -> T? {
59+
// exit the async for loop if the task is cancelled
60+
guard !Task.isCancelled else {
61+
return nil
62+
}
63+
64+
return try await withTaskCancellationHandler {
65+
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<T, any Error>) in
66+
let (nextAction, nextError) = self.lock.withLock { state -> (T?, PoolError?) in
67+
switch consume state {
68+
case .buffer(var buffer):
69+
if let first = buffer.popFirst() {
70+
state = .buffer(buffer)
71+
return (first, nil)
72+
} else {
73+
state = .continuation(continuation)
74+
return (nil, nil)
75+
}
76+
77+
case .continuation(let previousContinuation):
78+
state = .buffer([])
79+
return (nil, PoolError(cause: .nextCalledTwice([previousContinuation, continuation])))
80+
}
81+
}
82+
83+
if let nextError,
84+
case let .nextCalledTwice(continuations) = nextError.cause
85+
{
86+
for continuation in continuations { continuation?.resume(throwing: nextError) }
87+
} else if let nextAction {
88+
continuation.resume(returning: nextAction)
89+
}
90+
}
91+
} onCancel: {
92+
self.lock.withLock { state in
93+
switch consume state {
94+
case .buffer(let buffer):
95+
state = .buffer(buffer)
96+
case .continuation(let continuation):
97+
state = .buffer([])
98+
continuation?.resume(throwing: CancellationError())
99+
}
100+
}
101+
}
102+
}
103+
104+
func makeAsyncIterator() -> Pool {
105+
self
106+
}
107+
108+
struct PoolError: Error {
109+
let cause: Cause
110+
var message: String {
111+
switch self.cause {
112+
case .nextCalledTwice:
113+
return "Concurrent invocations to next(). This is not allowed."
114+
}
115+
}
116+
117+
enum Cause {
118+
case nextCalledTwice([CheckedContinuation<T, any Error>?])
119+
}
120+
}
121+
}
122+
}
123+
#endif

Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

Lines changed: 1 addition & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,11 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
#if LocalServerSupport
16-
import DequeModule
1716
import Dispatch
1817
import Logging
1918
import NIOCore
2019
import NIOHTTP1
2120
import NIOPosix
22-
import Synchronization
2321

2422
// This functionality is designed for local testing when the LocalServerSupport trait is enabled.
2523

@@ -565,108 +563,6 @@ internal struct LambdaHTTPServer {
565563
}
566564
}
567565

568-
/// A shared data structure to store the current invocation or response requests and the continuation objects.
569-
/// This data structure is shared between instances of the HTTPHandler
570-
/// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function).
571-
internal final class Pool<T>: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable {
572-
private let poolName: String
573-
internal init(name: String = "Pool") { self.poolName = name }
574-
575-
typealias Element = T
576-
577-
enum State: ~Copyable {
578-
case buffer(Deque<T>)
579-
case continuation(CheckedContinuation<T, any Error>?)
580-
}
581-
582-
private let lock = Mutex<State>(.buffer([]))
583-
584-
/// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
585-
public func push(_ invocation: T) {
586-
587-
// if the iterator is waiting for an element on `next()``, give it to it
588-
// otherwise, enqueue the element
589-
let maybeContinuation = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in
590-
switch consume state {
591-
case .continuation(let continuation):
592-
state = .buffer([])
593-
return continuation
594-
595-
case .buffer(var buffer):
596-
buffer.append(invocation)
597-
state = .buffer(buffer)
598-
return nil
599-
}
600-
}
601-
602-
maybeContinuation?.resume(returning: invocation)
603-
}
604-
605-
func next() async throws -> T? {
606-
// exit the async for loop if the task is cancelled
607-
guard !Task.isCancelled else {
608-
return nil
609-
}
610-
611-
return try await withTaskCancellationHandler {
612-
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<T, any Error>) in
613-
let (nextAction, nextError) = self.lock.withLock { state -> (T?, PoolError?) in
614-
switch consume state {
615-
case .buffer(var buffer):
616-
if let first = buffer.popFirst() {
617-
state = .buffer(buffer)
618-
return (first, nil)
619-
} else {
620-
state = .continuation(continuation)
621-
return (nil, nil)
622-
}
623-
624-
case .continuation(let previousContinuation):
625-
state = .buffer([])
626-
return (nil, PoolError(cause: .nextCalledTwice([previousContinuation, continuation])))
627-
}
628-
}
629-
630-
if let nextError,
631-
case let .nextCalledTwice(continuations) = nextError.cause
632-
{
633-
for continuation in continuations { continuation?.resume(throwing: nextError) }
634-
} else if let nextAction {
635-
continuation.resume(returning: nextAction)
636-
}
637-
}
638-
} onCancel: {
639-
self.lock.withLock { state in
640-
switch consume state {
641-
case .buffer(let buffer):
642-
state = .buffer(buffer)
643-
case .continuation(let continuation):
644-
state = .buffer([])
645-
continuation?.resume(throwing: CancellationError())
646-
}
647-
}
648-
}
649-
}
650-
651-
func makeAsyncIterator() -> Pool {
652-
self
653-
}
654-
655-
struct PoolError: Error {
656-
let cause: Cause
657-
var message: String {
658-
switch self.cause {
659-
case .nextCalledTwice:
660-
return "Concurrent invocations to next(). This is not allowed."
661-
}
662-
}
663-
664-
enum Cause {
665-
case nextCalledTwice([CheckedContinuation<T, any Error>?])
666-
}
667-
}
668-
}
669-
670566
private struct LocalServerResponse: Sendable {
671567
let requestId: String?
672568
let status: HTTPResponseStatus?
@@ -687,7 +583,7 @@ internal struct LambdaHTTPServer {
687583
self.final = final
688584
}
689585
}
690-
586+
691587
private struct LocalServerInvocation: Sendable {
692588
let requestId: String
693589
let request: ByteBuffer

0 commit comments

Comments
 (0)