@@ -240,15 +240,15 @@ defmodule K8s.Client.Mint.HTTPAdapter do
240240
241241 @ impl true
242242 def handle_call ( { :request , method , path , headers , body } , from , state ) do
243- make_request ( state , method , path , headers , body , type: :sync , caller: from )
243+ make_request ( state , method , path , headers , body , from , type: :sync , caller: from )
244244 end
245245
246- def handle_call ( { :stream , method , path , headers , body } , _from , state ) do
247- make_request ( state , method , path , headers , body , type: :stream )
246+ def handle_call ( { :stream , method , path , headers , body } , from , state ) do
247+ make_request ( state , method , path , headers , body , from , type: :stream )
248248 end
249249
250- def handle_call ( { :stream_to , method , path , headers , body , stream_to } , _from , state ) do
251- make_request ( state , method , path , headers , body , type: :stream_to , stream_to: stream_to )
250+ def handle_call ( { :stream_to , method , path , headers , body , stream_to } , from , state ) do
251+ make_request ( state , method , path , headers , body , from , type: :stream_to , stream_to: stream_to )
252252 end
253253
254254 def handle_call ( { :websocket_request , path , headers } , from , state ) do
@@ -326,13 +326,35 @@ defmodule K8s.Client.Mint.HTTPAdapter do
326326 end
327327 end
328328
329+ def handle_info ( { :DOWN , ref , :process , _pid , _reason } , state ) do
330+ state =
331+ state . requests
332+ |> Map . filter ( fn { _request_ref , request } -> request . caller_ref == ref end )
333+ |> Map . keys ( )
334+ |> Enum . reduce_while ( state , fn
335+ request_ref , state ->
336+ case pop_in ( state . requests [ request_ref ] ) do
337+ { % HTTPRequest { } , state } -> { :cont , state }
338+ { _ , state } -> { :halt , { :stop , state } }
339+ end
340+ end )
341+
342+ case state do
343+ { :stop , state } ->
344+ { :stop , :normal , state }
345+
346+ state ->
347+ { :noreply , state }
348+ end
349+ end
350+
329351 @ impl true
330352 def terminate ( _reason , state ) do
331353 state = flush_buffer ( state )
332354
333355 state
334356 |> Map . get ( :requests )
335- |> Enum . filter ( fn { _ref , request } -> is_map_key ( request , :websocket ) end )
357+ |> Enum . filter ( fn { _ref , request } -> is_struct ( request , WebSocketRequest ) end )
336358 |> Enum . each ( fn { request_ref , request } ->
337359 { :ok , _websocket , data } = Mint.WebSocket . encode ( request . websocket , :close )
338360 Mint.WebSocket . stream_request_body ( state . conn , request_ref , data )
@@ -342,15 +364,25 @@ defmodule K8s.Client.Mint.HTTPAdapter do
342364 :ok
343365 end
344366
345- @ spec make_request ( t ( ) , binary ( ) , binary ( ) , Mint.Types . headers ( ) , binary ( ) , keyword ( ) ) ::
367+ @ spec make_request (
368+ t ( ) ,
369+ binary ( ) ,
370+ binary ( ) ,
371+ Mint.Types . headers ( ) ,
372+ binary ( ) ,
373+ GenServer . from ( ) ,
374+ keyword ( )
375+ ) ::
346376 { :noreply , t ( ) } | { :reply , :ok | { :ok , reference ( ) } | { :error , HTTPError . t ( ) } , t ( ) }
347- defp make_request ( state , method , path , headers , body , extra ) do
377+ defp make_request ( state , method , path , headers , body , caller , extra ) do
378+ caller_ref = caller |> elem ( 0 ) |> Process . monitor ( )
379+
348380 case Mint.HTTP . request ( state . conn , method , path , headers , body ) do
349381 { :ok , conn , request_ref } ->
350382 state =
351383 put_in (
352384 state . requests [ request_ref ] ,
353- HTTPRequest . new ( extra )
385+ extra |> Keyword . put ( :caller_ref , caller_ref ) |> HTTPRequest . new ( )
354386 )
355387
356388 case extra [ :type ] do
@@ -366,15 +398,27 @@ defmodule K8s.Client.Mint.HTTPAdapter do
366398 end
367399 end
368400
369- @ spec upgrade_to_websocket ( t ( ) , binary ( ) , Mint.Types . headers ( ) , pid ( ) , WebSocketRequest . t ( ) ) ::
401+ @ spec upgrade_to_websocket (
402+ t ( ) ,
403+ binary ( ) ,
404+ Mint.Types . headers ( ) ,
405+ GenServer . from ( ) ,
406+ WebSocketRequest . t ( )
407+ ) ::
370408 { :noreply , t ( ) } | { :reply , { :error , HTTPError . t ( ) , t ( ) } }
371409 defp upgrade_to_websocket ( state , path , headers , caller , websocket_request ) do
410+ caller_ref = caller |> elem ( 0 ) |> Process . monitor ( )
411+
372412 case Mint.WebSocket . upgrade ( :wss , state . conn , path , headers ) do
373413 { :ok , conn , request_ref } ->
374414 state =
375415 put_in (
376416 state . requests [ request_ref ] ,
377- UpgradeRequest . new ( caller: caller , websocket_request: websocket_request )
417+ UpgradeRequest . new (
418+ caller: caller ,
419+ caller_ref: caller_ref ,
420+ websocket_request: struct! ( websocket_request , caller_ref: caller_ref )
421+ )
378422 )
379423
380424 { :noreply , struct! ( state , conn: conn ) }
0 commit comments