@@ -572,13 +572,17 @@ impl Channel {
572572 }
573573
574574 fn on_channel_close_ok_sent ( & self , error : Option < Error > ) {
575- self . set_closed (
576- error
577- . clone ( )
578- . unwrap_or ( ErrorKind :: InvalidChannelState ( ChannelState :: Closing ) . into ( ) ) ,
579- ) ;
580- if let Some ( error) = error {
581- self . error_handler . on_error ( error) ;
575+ if !self . recovery_config . auto_recover_channels
576+ || !error. as_ref ( ) . map_or ( false , |e| e. is_amqp_soft_error ( ) )
577+ {
578+ self . set_closed (
579+ error
580+ . clone ( )
581+ . unwrap_or ( ErrorKind :: InvalidChannelState ( ChannelState :: Closing ) . into ( ) ) ,
582+ ) ;
583+ if let Some ( error) = error {
584+ self . error_handler . on_error ( error) ;
585+ }
582586 }
583587 }
584588
@@ -862,7 +866,19 @@ impl Channel {
862866 resolver : PromiseResolver < Channel > ,
863867 channel : Channel ,
864868 ) -> Result < ( ) > {
865- self . set_state ( ChannelState :: Connected ) ;
869+ if self . recovery_config . auto_recover_channels {
870+ self . status . update_recovery_context ( |ctx| {
871+ ctx. set_expected_replies ( self . frames . take_expected_replies ( self . id ) ) ;
872+ self . frames . drop_frames_for_channel ( channel. id , ctx. cause ( ) ) ;
873+ self . acknowledgements . reset ( ctx. cause ( ) ) ;
874+ self . consumers . error ( ctx. cause ( ) ) ;
875+ } ) ;
876+ if !self . status . confirm ( ) {
877+ self . status . finalize_recovery ( ) ;
878+ }
879+ } else {
880+ self . set_state ( ChannelState :: Connected ) ;
881+ }
866882 resolver. resolve ( channel) ;
867883 Ok ( ( ) )
868884 }
@@ -891,18 +907,38 @@ impl Channel {
891907 }
892908
893909 fn on_channel_close_received ( & self , method : protocol:: channel:: Close ) -> Result < ( ) > {
894- let error = AMQPError :: try_from ( method. clone ( ) ) . map ( |error| {
895- error ! (
896- channel=%self . id, ?method, ?error,
897- "Channel closed"
898- ) ;
899- ErrorKind :: ProtocolError ( error) . into ( )
900- } ) ;
901- self . set_closing ( error. clone ( ) . ok ( ) ) ;
910+ let error: std:: result:: Result < Error , _ > =
911+ AMQPError :: try_from ( method. clone ( ) ) . map ( |error| {
912+ error ! (
913+ channel=%self . id, ?method, ?error,
914+ "Channel closed"
915+ ) ;
916+ ErrorKind :: ProtocolError ( error) . into ( )
917+ } ) ;
918+ match (
919+ self . recovery_config . auto_recover_channels ,
920+ error. clone ( ) . ok ( ) ,
921+ ) {
922+ ( true , Some ( error) ) if error. is_amqp_soft_error ( ) => {
923+ self . status . set_reconnecting ( error)
924+ }
925+ ( _, err) => self . set_closing ( err) ,
926+ }
902927 let error = error. map_err ( |error| info ! ( channel=%self . id, ?method, code_to_error=%error, "Channel closed with a non-error code" ) ) . ok ( ) ;
903928 let channel = self . clone ( ) ;
904- self . internal_rpc
905- . register_internal_future ( async move { channel. channel_close_ok ( error) . await } ) ;
929+ self . internal_rpc . register_internal_future ( async move {
930+ channel. channel_close_ok ( error) . await ?;
931+ if channel. recovery_config . auto_recover_channels {
932+ let ch = channel. clone ( ) ;
933+ channel. channel_open ( ch) . await ?;
934+ if channel. status . confirm ( ) {
935+ channel
936+ . confirm_select ( ConfirmSelectOptions :: default ( ) )
937+ . await ?;
938+ }
939+ }
940+ Ok ( ( ) )
941+ } ) ;
906942 Ok ( ( ) )
907943 }
908944
0 commit comments