Skip to content

Development guidelines

mhidalgo-bdai edited this page Apr 3, 2025 · 4 revisions

To streamline development, consider the following rules of thumb:

  1. Do not subclass from rclpy.node.Node, use process-wide APIs instead
  2. If necessary, subclass synchros2.node.Node instead
  3. Do not block during synchros2.node.Node subclass initialization
  4. If necessary, block from the synchros2.node.Node.__post_init__ method
  5. Use logging module to log unless within a node subclass

Do not subclass from rclpy.node.Node, use process-wide APIs instead

Rationale

While node subclassing is the typical approach to ROS 2 development, fostering code reuse, it is easy to run into deadlocks once ros_utilities are embraced and asynchronous programming is slowly replaced by synchronous programming. Calling a service and waiting for a reply, waiting for a transform to become available, are some examples of blocking calls on which an rclpy.node.Node subclass could block indefinitely if used at construction time. These problems go away if process-wide APIs and prebaked auto-spinning nodes are used instead.

Application
from synchros2.tf_listener_wrapper import TFListenerWrapper
from synchros2.context import wait_for_shutdown

-import rclpy
-from rclpy.node import Node
+import synchros2.process as ros_process

-class ApplicationNode(Node):
+class Application:
    def __init__(self):
-       super().__init__("app")
-       self.tf_listener = TFListenerWrapper(self)
+       self.tf_listener = TFListenerWrapper()
        self.tf_listener.wait_for_a_tform_b("map", "robot")  # this would block forever on a node subclass

+@ros_process.main()
def main():
-   rclpy.init()
-   rclpy.spin(ApplicationNode())
+   app = Application()
+   wait_for_shutdown()
-   rclpy.shutdown()

If necessary, subclass synchros2.node.Node instead

Rationale

Sometimes subclassing rclpy.node.Node is necessary: to offer standard ROS 2 reusable nodes, to simplify managing ROS 2 interfaces' lifetimes (e.g. subscriptions, clients, etc.), and so on. If done with care, this is perfectly feasible. For node subclasses that rely on synchronous programming, however, rclpy.node.Node default mutually exclusive callback group can lead to deadlocks (as only one callback may be running at a time). This can be circumvented by manually specifying a reentrant or non-reentrant callback group, or simply by subclassing synchros2.node.Node instead, an rclpy.node.Node subclass with a non-reentrant callback group.

Application
-from rclpy.node import Node
+from synchros2.node import Node

class MyNode(Node):
    pass

Do not block during synchros2.node.Node subclass initialization

Rationale

Neither synchros2.node.Node nor rclpy.node.Node can prevent blocking calls during initialization from running into deadlocks (as these may not be served until the node is added to an executor, after initialization). If at all possible, structure your code such that blocking calls are not required during synchros2.node.Node.__init__ invocation.

Application
from typing import Any

from synchros2.node import Node
-from synchros2.subscription import wait_for_message
+from synchros2.subscription import wait_for_message_async

from std_msgs.msg import String

class ComplexInitNode(Node):
    def __init__(self, **kwargs: Any) -> None
        super().__init__("complex_init_node", **kwargs)
-       message = wait_for_message(String, "foo")
-       # do setup with message
+       future = wait_for_message_async(String, "foo")
+       future.add_done_callback(lambda future: self.setup(future.result()))
+
+   def setup(self, message: String) -> None:
+       # do setup with message

If necessary, block from the synchros2.node.Node.__post_init__ method

Rationale

Unfortunately, sometimes blocking during synchros2.node.Node subclass initialization will be near unavoidable (e.g. third-party code may force it on us). When and if node lifetime is fully managed by process-wide APIs, blocking calls may be relocated to a __post_init__ method. If defined, this method will be invoked right after the __init__ method, once the node has been added to an executor. Note that for this not to hang too, the executor must be spinning in a background thread.

Application
from typing import Any

from synchros2.node import Node
from synchros2.subscription import wait_for_message

from std_msgs.msg import String

class PostInitNode(Node):
    def __init__(self, **kwargs: Any) -> None
        super().__init__("post_init_node", **kwargs)
+
+   def __post_init__(self) -> None:
        message = wait_for_message(String, "foo")
        # do setup with message

+@ros_process.main(prebaked=False, autospin=True)
+def main() -> None:
+    with main.managed(PostInitNode) as main.node:
+        main.wait_for_shutdown()

Use logging module to log unless within a node subclass

Rationale

Bringing ROS 2 specifics to code that is largely ROS 2 agnostic works against reuse.

Application
+import logging
from synchros2.process as ros_process

@ros_process.main()
def main():
-   main.node.get_logger().info("Logging!")
+   logging.info("Logging!")