@@ -29,7 +29,7 @@ extension LambdaHTTPServer {
2929
3030 enum State : ~ Copyable {
3131 case buffer( Deque < T > )
32- case continuation( CheckedContinuation < T , any Error > ? )
32+ case continuation( CheckedContinuation < T , any Error > )
3333 }
3434
3535 private let lock = Mutex < State > ( . buffer( [ ] ) )
@@ -55,6 +55,16 @@ extension LambdaHTTPServer {
5555 maybeContinuation? . resume ( returning: invocation)
5656 }
5757
58+ /// AsyncSequence's standard next() function
59+ /// Returns:
60+ /// - nil when the task is cancelled
61+ /// - an element when there is one in the queue
62+ ///
63+ /// When there is no element in the queue, the task will be suspended until an element is pushed to the queue
64+ /// or the task is cancelled
65+ ///
66+ /// - Throws: PoolError if the next() function is called twice concurrently
67+ @Sendable
5868 func next( ) async throws -> T ? {
5969 // exit the async for loop if the task is cancelled
6070 guard !Task. isCancelled else {
@@ -63,29 +73,34 @@ extension LambdaHTTPServer {
6373
6474 return try await withTaskCancellationHandler {
6575 try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < T , any Error > ) in
66- let ( nextAction, nextError ) = self . lock. withLock { state -> ( T ? , PoolError ? ) in
76+ let nextAction : Result < T , PoolError > ? = self . lock. withLock { state -> Result < T , PoolError > ? in
6777 switch consume state {
6878 case . buffer( var buffer) :
6979 if let first = buffer. popFirst ( ) {
7080 state = . buffer( buffer)
71- return ( first, nil )
81+ return . success ( first)
7282 } else {
7383 state = . continuation( continuation)
74- return ( nil , nil )
84+ return nil
7585 }
7686
7787 case . continuation( let previousContinuation) :
7888 state = . buffer( [ ] )
79- return ( nil , PoolError ( cause: . nextCalledTwice( [ previousContinuation, continuation ] ) ) )
89+ return . failure ( PoolError ( cause: . nextCalledTwice( previousContinuation) ) )
8090 }
8191 }
8292
83- if let nextError,
84- case let . nextCalledTwice( continuations) = nextError. cause
85- {
86- for continuation in continuations { continuation? . resume ( throwing: nextError) }
87- } else if let nextAction {
88- continuation. resume ( returning: nextAction)
93+ switch nextAction {
94+ case . success( let action) :
95+ continuation. resume ( returning: action)
96+ case . failure( let error) :
97+ if case let . nextCalledTwice( continuation) = error. cause {
98+ continuation. resume ( throwing: error)
99+ }
100+ continuation. resume ( throwing: error)
101+ case . none:
102+ // do nothing
103+ break
89104 }
90105 }
91106 } onCancel: {
@@ -95,7 +110,7 @@ extension LambdaHTTPServer {
95110 state = . buffer( buffer)
96111 case . continuation( let continuation) :
97112 state = . buffer( [ ] )
98- continuation? . resume ( throwing: CancellationError ( ) )
113+ continuation. resume ( throwing: CancellationError ( ) )
99114 }
100115 }
101116 }
@@ -115,9 +130,9 @@ extension LambdaHTTPServer {
115130 }
116131
117132 enum Cause {
118- case nextCalledTwice( [ CheckedContinuation < T , any Error > ? ] )
133+ case nextCalledTwice( CheckedContinuation < T , any Error > )
119134 }
120135 }
121136 }
122137}
123- #endif
138+ #endif
0 commit comments