1+ # # Nim-Codex
2+ # # Copyright (c) 2025 Status Research & Development GmbH
3+ # # Licensed under either of
4+ # # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
5+ # # * MIT license ([LICENSE-MIT](LICENSE-MIT))
6+ # # at your option.
7+ # # This file may not be copied, modified, or distributed except according to
8+ # # those terms.
9+
10+ {.push raises : [].}
11+
112import std/ sugar
213
314import pkg/ questionable
415import pkg/ chronos
516
617import ./ iter
718
8- export iter
9-
10- # # AsyncIter[T] is similar to `Iter[Future[T]]` with addition of methods specific to asynchronous processing
19+ # # AsyncIter[T] is similar to `Iter[Future[T]]` with
20+ # # addition of methods specific to asynchronous processing.
1121# #
12-
13- type AsyncIter * [T] = ref object
14- finished: bool
15- next* : GenNext [Future [T]]
16-
17- proc finish * [T](self: AsyncIter [T]): void =
18- self.finished = true
19-
20- proc finished * [T](self: AsyncIter [T]): bool =
21- self.finished
22-
23- iterator items * [T](self: AsyncIter [T]): Future [T] =
24- while not self.finished:
25- yield self.next ()
26-
27- iterator pairs * [T](self: AsyncIter [T]): tuple [key: int , val: Future [T]] {.inline .} =
28- var i = 0
29- while not self.finished:
30- yield (i, self.next ())
31- inc (i)
32-
33- proc map * [T, U](fut: Future [T], fn: Function [T, U]): Future [U] {.async .} =
34- let t = await fut
35- fn (t)
36-
37- proc flatMap * [T, U](fut: Future [T], fn: Function [T, Future [U]]): Future [U] {.async .} =
22+ # # Public interface:
23+ # #
24+ # # Attributes
25+ # # - next - allows to set a custom function to be called when the next item is requested
26+ # #
27+ # # Operations:
28+ # # - new - to create a new async iterator (AsyncIter)
29+ # # - finish - to finish the async iterator
30+ # # - finished - to check if the async iterator is finished
31+ # # - next - to get the next item from the async iterator
32+ # # - items - to iterate over the async iterator
33+ # # - pairs - to iterate over the async iterator and return the index of each item
34+ # # - mapFuture - to convert a (raising) Future[T] to a (raising) Future[U] using a function fn: auto -> Future[U] - we use auto to handle both raising and non-raising futures
35+ # # - mapAsync - to convert a regular sync iterator (Iter) to an async iterator (AsyncIter)
36+ # # - map - to convert one async iterator (AsyncIter) to another async iterator (AsyncIter)
37+ # # - mapFilter - to convert one async iterator (AsyncIter) to another async iterator (AsyncIter) and apply filtering at the same time
38+ # # - filter - to filter an async iterator (AsyncIter) and return another async iterator (AsyncIter)
39+ # # - delayBy - to delay each item returned by async iterator by a given duration
40+ # # - empty - to create an empty async iterator (AsyncIter)
41+
42+ type
43+ AsyncIterFunc [T, U] =
44+ proc (fut: T): Future [U] {.async .}
45+ AsyncIterIsFinished = proc (): bool {.raises : [], gcsafe .}
46+ AsyncIterGenNext [T] =
47+ proc (): Future [T] {.async .}
48+
49+ AsyncIter * [T] = ref object
50+ finished: bool
51+ next* : AsyncIterGenNext [T]
52+
53+ proc flatMap [T, U](fut: Future [T], fn: AsyncIterFunc [T, U]): Future [U] {.async .} =
3854 let t = await fut
3955 await fn (t)
4056
57+ # #######################################################################
58+ # # AsyncIter public interface methods
59+ # #######################################################################
60+
4161proc new * [T](
4262 _: type AsyncIter [T],
43- genNext: GenNext [ Future [T] ],
44- isFinished: IsFinished ,
63+ genNext: AsyncIterGenNext [T ],
64+ isFinished: AsyncIterIsFinished ,
4565 finishOnErr: bool = true ,
4666): AsyncIter [T] =
4767 # # Creates a new Iter using elements returned by supplier function `genNext`.
@@ -77,8 +97,8 @@ proc new*[T](
7797 iter.next = next
7898 return iter
7999
80- proc mapAsync * [T, U](iter: Iter [T], fn: Function [T, Future [U]]): AsyncIter [U] =
81- AsyncIter [U]. new (genNext = () => fn (iter. next ()), isFinished = () => iter. finished ())
100+ # forward declaration
101+ proc mapAsync * [T, U](iter: Iter [T], fn: AsyncIterFunc [T, U]): AsyncIter [U]
82102
83103proc new * [U, V: Ordinal ](_: type AsyncIter [U], slice: HSlice [U, V]): AsyncIter [U] =
84104 # # Creates new Iter from a slice
@@ -104,25 +124,36 @@ proc new*[U, V, S: Ordinal](
104124 i,
105125 )
106126
107- proc empty * [T](_: type AsyncIter [T]): AsyncIter [T] =
108- # # Creates an empty AsyncIter
109- # #
127+ proc finish * [T](self: AsyncIter [T]): void =
128+ self.finished = true
110129
111- proc genNext (): Future [T] {. raises : [ CatchableError ].} =
112- raise newException ( CatchableError , " Next item requested from an empty AsyncIter " )
130+ proc finished * [T](self: AsyncIter [T]): bool =
131+ self.finished
113132
114- proc isFinished (): bool =
115- true
133+ iterator items * [T](self: AsyncIter [T]): Future [T] =
134+ while not self.finished:
135+ yield self.next ()
136+
137+ iterator pairs * [T](self: AsyncIter [T]): tuple [key: int , val: Future [T]] {.inline .} =
138+ var i = 0
139+ while not self.finished:
140+ yield (i, self.next ())
141+ inc (i)
142+
143+ proc mapFuture * [T, U](fut: Future [T], fn: AsyncIterFunc [T, U]): Future [U] {.async .} =
144+ let t = await fut
145+ fn (t)
116146
117- AsyncIter [T].new (genNext, isFinished)
147+ proc mapAsync * [T, U](iter: Iter [T], fn: AsyncIterFunc [T, U]): AsyncIter [U] =
148+ AsyncIter [U].new (genNext = () => fn (iter.next ()), isFinished = () => iter.finished ())
118149
119- proc map * [T, U](iter: AsyncIter [T], fn: Function [T, Future [U] ]): AsyncIter [U] =
150+ proc map * [T, U](iter: AsyncIter [T], fn: AsyncIterFunc [T, U ]): AsyncIter [U] =
120151 AsyncIter [U].new (
121152 genNext = () => iter.next ().flatMap (fn), isFinished = () => iter.finished
122153 )
123154
124155proc mapFilter * [T, U](
125- iter: AsyncIter [T], mapPredicate: Function [T, Future [ Option [U] ]]
156+ iter: AsyncIter [T], mapPredicate: AsyncIterFunc [T, Option [U]]
126157): Future [AsyncIter [U]] {.async : (raises: [CancelledError ]).} =
127158 var nextFutU: Option [Future [U]]
128159
@@ -156,7 +187,7 @@ proc mapFilter*[T, U](
156187 AsyncIter [U].new (genNext, isFinished)
157188
158189proc filter * [T](
159- iter: AsyncIter [T], predicate: Function [T, Future [ bool ] ]
190+ iter: AsyncIter [T], predicate: AsyncIterFunc [T, bool ]
160191): Future [AsyncIter [T]] {.async : (raises: [CancelledError ]).} =
161192 proc wrappedPredicate (t: T): Future [Option [T]] {.async .} =
162193 if await predicate (t):
@@ -176,3 +207,15 @@ proc delayBy*[T](iter: AsyncIter[T], d: Duration): AsyncIter[T] =
176207 await sleepAsync (d)
177208 t,
178209 )
210+
211+ proc empty * [T](_: type AsyncIter [T]): AsyncIter [T] =
212+ # # Creates an empty AsyncIter
213+ # #
214+
215+ proc genNext (): Future [T] {.async .} =
216+ raise newException (CatchableError , " Next item requested from an empty AsyncIter" )
217+
218+ proc isFinished (): bool =
219+ true
220+
221+ AsyncIter [T].new (genNext, isFinished)
0 commit comments