3636from osrf_pycommon .process_utils import async_execute_process # type: ignore
3737from osrf_pycommon .process_utils import AsyncSubprocessProtocol
3838
39+ import psutil
40+
3941from .emit_event import EmitEvent
4042from .opaque_function import OpaqueFunction
4143from .timer_action import TimerAction
6466from ..launch_description_entity import LaunchDescriptionEntity
6567from ..some_entities_type import SomeEntitiesType
6668from ..some_substitutions_type import SomeSubstitutionsType
67- from ..substitution import Substitution # noqa: F401
6869from ..substitutions import LaunchConfiguration
69- from ..substitutions import PythonExpression
7070from ..utilities import is_a_subclass
7171from ..utilities import normalize_to_list_of_substitutions
7272from ..utilities import perform_substitutions
@@ -86,6 +86,8 @@ def __init__(
8686 'sigterm_timeout' , default = 5 ),
8787 sigkill_timeout : SomeSubstitutionsType = LaunchConfiguration (
8888 'sigkill_timeout' , default = 5 ),
89+ signal_lingering_subprocesses : SomeSubstitutionsType = LaunchConfiguration (
90+ 'signal_lingering_subprocesses' , default = True ),
8991 emulate_tty : bool = False ,
9092 output : SomeSubstitutionsType = 'log' ,
9193 output_format : Text = '[{this.process_description.final_name}] {line}' ,
@@ -158,6 +160,11 @@ def __init__(
158160 as a string or a list of strings and Substitutions to be resolved
159161 at runtime, defaults to the LaunchConfiguration called
160162 'sigkill_timeout'
163+ :param: signal_lingering_subprocesses if `True`, all subprocesses spawned by the process
164+ will be signaled to make sure they finish, using the same
165+ SIGINT/SIGTERM/SIGKILL sequence that is used to kill the main process.
166+ Subprocesses start being signaled when the main process completes.
167+ Defaults to the LaunchConfiguration called 'signal_lingering_subprocesses'.
161168 :param: emulate_tty emulate a tty (terminal), defaults to False, but can
162169 be overridden with the LaunchConfiguration called 'emulate_tty',
163170 the value of which is evaluated as true or false according to
@@ -190,6 +197,8 @@ def __init__(
190197 self .__shell = shell
191198 self .__sigterm_timeout = normalize_to_list_of_substitutions (sigterm_timeout )
192199 self .__sigkill_timeout = normalize_to_list_of_substitutions (sigkill_timeout )
200+ self .__signal_lingering_subprocesses = normalize_to_list_of_substitutions (
201+ signal_lingering_subprocesses )
193202 self .__emulate_tty = emulate_tty
194203 # Note: we need to use a temporary here so that we don't assign values with different types
195204 # to the same variable
@@ -219,6 +228,7 @@ def __init__(
219228 self .__shutdown_future = None # type: Optional[asyncio.Future]
220229 self .__sigterm_timer = None # type: Optional[TimerAction]
221230 self .__sigkill_timer = None # type: Optional[TimerAction]
231+ self .__children : List [psutil .Process ] = []
222232 self .__stdout_buffer = io .StringIO ()
223233 self .__stderr_buffer = io .StringIO ()
224234
@@ -291,7 +301,11 @@ def _shutdown_process(self, context, *, send_sigint):
291301 self .__shutdown_future .set_result (None )
292302
293303 # Otherwise process is still running, start the shutdown procedures.
294- context .extend_locals ({'process_name' : self .process_details ['name' ]})
304+ context .extend_locals (
305+ {
306+ 'process_name' : self .process_details ['name' ],
307+ 'process_pid' : self .process_details ['pid' ],
308+ })
295309 actions_to_return = self .__get_shutdown_timer_actions ()
296310 if send_sigint :
297311 actions_to_return .append (self .__get_sigint_event ())
@@ -452,23 +466,17 @@ def __get_shutdown_timer_actions(self) -> List[Action]:
452466 base_msg = \
453467 "process[{}] failed to terminate '{}' seconds after receiving '{}', escalating to '{}'"
454468
455- def printer (context , msg , timeout_substitutions ):
456- self .__logger .error (msg .format (
457- context .locals .process_name ,
458- perform_substitutions (context , timeout_substitutions ),
459- ))
469+ def printer (context , msg ):
470+ self .__logger .error (msg .format (context .locals .process_name ))
460471
461- sigterm_timeout = self .__sigterm_timeout
462- sigkill_timeout = [PythonExpression (
463- ('float(' , * self .__sigterm_timeout , ') + float(' , * self .__sigkill_timeout , ')' )
464- )]
465472 # Setup a timer to send us a SIGTERM if we don't shutdown quickly.
473+ sigterm_timeout = self .__sigterm_timeout_value
466474 self .__sigterm_timer = TimerAction (
467475 period = sigterm_timeout ,
468476 actions = [
469477 OpaqueFunction (
470478 function = printer ,
471- args = (base_msg .format ('{}' , '{}' , 'SIGINT' , 'SIGTERM' ), sigterm_timeout )
479+ args = (base_msg .format ('{}' , sigterm_timeout , 'SIGINT' , 'SIGTERM' ), )
472480 ),
473481 EmitEvent (event = SignalProcess (
474482 signal_number = signal .SIGTERM ,
@@ -477,13 +485,14 @@ def printer(context, msg, timeout_substitutions):
477485 ],
478486 cancel_on_shutdown = False ,
479487 )
488+ sigkill_timeout = self .__sigterm_timeout_value + self .__sigkill_timeout_value
480489 # Setup a timer to send us a SIGKILL if we don't shutdown after SIGTERM.
481490 self .__sigkill_timer = TimerAction (
482491 period = sigkill_timeout ,
483492 actions = [
484493 OpaqueFunction (
485494 function = printer ,
486- args = (base_msg .format ('{}' , '{}' , 'SIGTERM' , 'SIGKILL' ), sigkill_timeout )
495+ args = (base_msg .format ('{}' , sigkill_timeout , 'SIGTERM' , 'SIGKILL' ), )
487496 ),
488497 EmitEvent (event = SignalProcess (
489498 signal_number = 'SIGKILL' ,
@@ -492,6 +501,13 @@ def printer(context, msg, timeout_substitutions):
492501 ],
493502 cancel_on_shutdown = False ,
494503 )
504+ self .__children = []
505+ pid = self ._subprocess_transport .get_pid ()
506+ if pid is not None :
507+ try :
508+ self .__children = psutil .Process (pid ).children (recursive = True )
509+ except psutil .NoSuchProcess :
510+ pass
495511 return [
496512 cast (Action , self .__sigterm_timer ),
497513 cast (Action , self .__sigkill_timer ),
@@ -503,12 +519,15 @@ def __get_sigint_event(self):
503519 process_matcher = matches_action (self ),
504520 ))
505521
506- def __cleanup (self ):
507- # Cancel any pending timers we started.
def __cleanup_timers(self):
    """Cancel the SIGTERM and SIGKILL timers, skipping any that is still ``None``."""
    for pending_timer in (self.__sigterm_timer, self.__sigkill_timer):
        if pending_timer is None:
            # This timer was never created (or was already cleared).
            continue
        pending_timer.cancel()
527+
528+ def __cleanup (self ):
529+ # Cancel any pending timers we started.
530+ self .__cleanup_timers ()
512531 # Close subprocess transport if any.
513532 if self ._subprocess_transport is not None :
514533 self ._subprocess_transport .close ()
@@ -541,6 +560,48 @@ def on_stdout_received(self, data: bytes) -> None:
def on_stderr_received(self, data: bytes) -> None:
    """
    Forward a chunk of stderr output as a ``ProcessStderr`` event.

    :param: data the raw bytes passed in by the caller; they become the
        ``text`` field of the emitted event, alongside the stored process
        event arguments.
    """
    emit_sync = self.__context.emit_event_sync
    stderr_event = ProcessStderr(text=data, **self.__process_event_args)
    emit_sync(stderr_event)
543562
async def _signal_subprocesses(self, context):
    """
    Signal the recorded child processes with an escalating signal sequence.

    Every process in ``self.__children`` is sent SIGINT. Processes that are
    still running once the SIGTERM timeout has elapsed are sent SIGTERM, and
    those still running once the SIGTERM plus SIGKILL timeouts have elapsed
    are sent SIGKILL. Both deadlines are measured from the moment this
    coroutine starts. While waiting, the children are polled at most every
    0.5 seconds, and the coroutine returns as soon as none of them is left.

    :param: context object whose ``asyncio_loop.time()`` provides the clock
        used for the deadlines.
    """
    pending = list(self.__children)
    if not pending:
        # Nothing to signal: return right away instead of sleeping in the
        # wait loop just to discover that the list is empty.
        return

    loop = context.asyncio_loop
    start_time = loop.time()
    sigterm_timeout = self.__sigterm_timeout_value
    sigkill_timeout = self.__sigterm_timeout_value + self.__sigkill_timeout_value
    process_pid = self.process_details['pid']
    process_name = self.process_details['name']

    def log_prefix(child_pid):
        # Built in a single f-string so that braces in the process name are
        # never re-interpreted as str.format() replacement fields.
        return f'subprocess[pid={child_pid}] of process[{process_name}, pid={process_pid}]: '

    # Signals to escalate to, paired with the offset from start_time at which
    # each one is sent.
    escalation = iter((
        (signal.SIGTERM, sigterm_timeout),
        (signal.SIGKILL, sigkill_timeout),
    ))
    sig = signal.SIGINT
    while True:
        signaled = []
        for child in pending:
            try:
                child.send_signal(sig)
            except psutil.NoSuchProcess:
                # The child is already gone; there is nothing left to wait for.
                continue
            self.__logger.info(
                f'{log_prefix(child.pid)}sending {sig.name} to subprocess directly.'
            )
            signaled.append(child)
        if not signaled:
            # Every remaining child had already exited.
            return
        try:
            sig, timeout = next(escalation)
        except StopIteration:
            # SIGKILL has been sent; there is no further escalation step.
            return
        deadline = start_time + timeout
        now = loop.time()
        while now < deadline:
            await asyncio.sleep(min(0.5, deadline - now))
            still_running = []
            for child in signaled:
                if child.is_running():
                    still_running.append(child)
                else:
                    self.__logger.info(f'{log_prefix(child.pid)}exited')
            signaled = still_running
            if not signaled:
                return
            now = loop.time()
        # Whatever survived this stage receives the next signal.
        pending = signaled
604+
544605 async def __execute_process (self , context : LaunchContext ) -> None :
545606 process_event_args = self .__process_event_args
546607 if process_event_args is None :
@@ -617,8 +678,13 @@ async def __execute_process(self, context: LaunchContext) -> None:
617678 timeout = self .__respawn_delay
618679 )
619680 if not self .__shutdown_future .done ():
681+ if self .__signal_lingering_subprocesses_value :
682+ await self ._signal_subprocesses (context )
620683 context .asyncio_loop .create_task (self .__execute_process (context ))
621684 return
685+ self .__cleanup_timers ()
686+ if self .__signal_lingering_subprocesses_value :
687+ await self ._signal_subprocesses (context )
622688 self .__cleanup ()
623689
624690 def prepare (self , context : LaunchContext ):
@@ -701,6 +767,12 @@ def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEnti
701767 ]
702768 for event_handler in event_handlers :
703769 context .register_event_handler (event_handler )
770+ self .__sigterm_timeout_value = perform_typed_substitution (
771+ context , self .__sigterm_timeout , float )
772+ self .__sigkill_timeout_value = perform_typed_substitution (
773+ context , self .__sigkill_timeout , float )
774+ self .__signal_lingering_subprocesses_value = perform_typed_substitution (
775+ context , self .__signal_lingering_subprocesses , bool )
704776
705777 try :
706778 self .__completed_future = context .asyncio_loop .create_future ()
0 commit comments