Skip to content

Commit 4703923

Browse files
authored
Merge pull request #1369 from JuliaGPU/tb/async_errors
Make nonblocking synchronization robust to errors.
2 parents 64ca1c8 + bacd69a commit 4703923

File tree

4 files changed

+32 -7 lines changed

lib/cudadrv/execution.jl

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -147,7 +147,8 @@ function launch(f::Base.Callable; stream::CuStream=stream())
147 147       f()
148 148       close(async_cond)
149 149   end
150     - # FIXME: protect this from GC
    150 +
    151 + # the condition object is embedded in a task, so the Julia scheduler keeps it alive
151 152
152 153   # callback = @cfunction(async_send, Cint, (Ptr{Cvoid},))
153 154   # See https://github.com/JuliaGPU/CUDA.jl/issues/1314.

lib/cudadrv/stream.jl

Lines changed: 28 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -153,13 +153,37 @@ end
153 153           spins += 1
154 154       end
155 155
156     -     # minimize CPU usage of long-running kernels
157     -     # by waiting for an event signalled by CUDA
158     -     event = Threads.Event()
    156 +     # minimize CPU usage of long-running kernels by waiting for an event signalled by CUDA
    157 +     event = Base.Event()
159 158       launch(; stream) do
160 159           notify(event)
161 160       end
162     -     Base.wait(event)
    161 +     # if an error occurs, the callback may never fire, so use a timer to detect such cases
    162 +     dev = device()
    163 +     timer = Timer(0; interval=1)
    164 +     Base.@sync begin
    165 +         Threads.@spawn try
    166 +             device!(dev)
    167 +             while true
    168 +                 try
    169 +                     Base.wait(timer)
    170 +                 catch err
    171 +                     err isa EOFError && break
    172 +                     rethrow()
    173 +                 end
    174 +                 if unsafe_cuStreamQuery(stream) != ERROR_NOT_READY
    175 +                     break
    176 +                 end
    177 +             end
    178 +         finally
    179 +             notify(event)
    180 +         end
    181 +
    182 +         Threads.@spawn begin
    183 +             Base.wait(event)
    184 +             close(timer)
    185 +         end
    186 +     end
163 187
164 188       return
165 189   end

src/compiler/execution.jl

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -311,7 +311,7 @@ function run_and_collect(cmd)
311 311   proc = run(pipeline(ignorestatus(cmd); stdout, stderr=stdout), wait=false)
312 312   close(stdout.in)
313 313
314     - reader = @async String(read(stdout))
    314 + reader = Threads.@spawn String(read(stdout))
315 315   Base.wait(proc)
316 316   log = strip(fetch(reader))
317 317

src/pool.jl

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -89,7 +89,7 @@ function pool_mark(dev::CuDevice)
89 89
90 90       # launch a task to periodically trim the pool
91 91       if isinteractive() && !isassigned(__pool_cleanup)
92    -         __pool_cleanup[] = @async pool_cleanup()
   92 +         __pool_cleanup[] = errormonitor(Threads.@spawn pool_cleanup())
93 93       end
94 94   end
95 95   status[] = true

0 commit comments

Comments
 (0)