Skip to content

Testing guidelines

mhidalgo-bdai edited this page Nov 21, 2023 · 7 revisions

Unit testing ROS 2 code is no different from unit testing any other software, but some care must be exercised.

Considerations for unit testing

  • Data transport in ROS 2 is non-deterministic. So is callback scheduling when multi-threaded executors (as the one instantiated by process-wide APIs by default) are in place. As such, time sensitive and execution order dependent tests are bound to fail even if only sporadically. A multi-threaded executor spinning in a background thread, as provided by bdai_ros_wrappers.scope functionality, enables synchronization primitives as a mechanism to avoid these issues.
  • ROS 2 middlewares perform peer discovery by default. This allows distributed architectures in production but leads to cross-talk during parallelized testing. domain_coordinator functionality simplifies ROS domain ID assignment enforcing host-wide uniqueness and with it, middleware isolation.

Therefore, as rules of thumb consider:

  1. Using bdai_ros2_wrappers.scope.top to setup ROS 2 in your test fixtures.
    • Isolate it by passing a unique domain ID, as provided by domain_coordinator.domain_id.
  2. Using synchronization primitives to wait with timeouts.
    • Note timeouts make the test time sensitive. Pick timeouts an order of magnitude above the expected test timing.

Writing unit tests using pytest (recommended)

pytest is a testing framework for Python software, the most common in ROS 2 Python codebases.

import domain_coordinator
import pytest

from typing import Iterator

import bdai_ros2_wrappers.scope as ros_scope
from bdai_ros2_wrappers.action_client import ActionClientWrapper
from bdai_ros2_wrappers.futures import wait_for_future
from bdai_ros2_wrappers.scope import ROSAwareScope
from bdai_ros2_wrappers.single_goal_action_server import SingleGoalActionServer
from bdai_ros2_wrappers.subscription import wait_for_message

from std_msgs.msg import String
from std_srvs.srv import Trigger
from example_interfaces.action import Fibonacci

from rclpy.action.server import ServerGoalHandle
from rclpy.qos import QoSProfile, HistoryPolicy, DurabilityPolicy

@pytest.fixture
def ros() -> Iterator[ROSAwareScope]:
    """
    A pytest fixture that will set up and yield a ROS 2 aware global scope to each test that requests it.

    See https://docs.pytest.org/en/7.4.x/fixture.html for a primer on pytest fixtures.
    """
    with domain_coordinator.domain_id() as domain_id:  # to ensure node isolation 
        with ros_scope.top(global_=True, namespace="fixture", domain_id=domain_id) as top:
            yield top

def test_topic_pub_sub(ros: ROSAwareScope) -> None:
    """Asserts that a published message can be received on the other end."""
    qos_profile = QoSProfile(
        depth=100,
        history=HistoryPolicy.KEEP_LAST,
        durability=DurabilityPolicy.TRANSIENT_LOCAL,
    )
    pub = ros.node.create_publisher(String, "test", qos_profile)
    pub.publish(String(data="test"))
    # Message will arrive at an unspecified point in the future, thus
    assert wait_for_message(String, "test", qos_profile=qos_profile, timeout_sec=5.0)

def test_service_server_client(ros: ROSAwareScope) -> None:
    """Asserts that a service server replies to client requests."""
    def callback(_: Trigger.Request, response: Trigger.Response) -> Trigger.Response:
        response.success = True
        return response
    server = ros.node.create_service(Trigger, "trigger", callback)
    client = ros.node.create_client(Trigger, "trigger")
    assert client.wait_for_service(timeout_sec=5.0)
    future = client.call_async(Trigger.Request())
    assert wait_for_future(future, timeout_sec=5.0)
    response = future.result()
    assert response and response.success

def test_action_server_client(ros: ROSAwareScope) -> None:
    """Asserts that an action server reacts to action client requests."""
    def callback(goal_handle: ServerGoalHandle) -> Fibonacci.Result:
        result = Fibonacci.Result()
        result.sequence = [0, 1]
        for i in range(1, goal_handle.request.order):
            result.sequence.append(result.sequence[i] + result.sequence[i-1])
        goal_handle.succeed()
        return result
    server = SingleGoalActionServer(ros.node, Fibonacci, "compute", callback)
    client = ActionClientWrapper(Fibonacci, "compute", ros.node)
    assert client.wait_for_server(timeout_sec=5.0)
    goal = Fibonacci.Goal(order=3)
    result = client.send_goal_and_wait("compute", goal, timeout_sec=5.0)
    assert result and list(result.sequence) == [0, 1, 1, 2]

Writing unit tests using unittest

unittest is the testing framework in Python's standard library.

import contextlib
import domain_coordinator
import unittest

import bdai_ros2_wrappers.scope as ros_scope
from bdai_ros2_wrappers.action_client import ActionClientWrapper
from bdai_ros2_wrappers.futures import wait_for_future
from bdai_ros2_wrappers.scope import ROSAwareScope
from bdai_ros2_wrappers.single_goal_action_server import SingleGoalActionServer
from bdai_ros2_wrappers.subscription import wait_for_message

from std_msgs.msg import String
from std_srvs.srv import Trigger
from example_interfaces.action import Fibonacci

from rclpy.action.server import ServerGoalHandle
from rclpy.qos import QoSProfile, HistoryPolicy, DurabilityPolicy

class TestCase(unittest.TestCase):

    def setUp(self) -> None:
        """Sets up an isolated ROS 2 aware scope for all tests in the test case."""
        self.fixture = contextlib.ExitStack()
        domain_id = self.fixture.enter_context(domain_coordinator.domain_id())
        self.ros = self.fixture.enter_context(ros_scope.top(
            global_=True, namespace="fixture", domain_id=domain_id
        ))

    def tearDown(self) -> None:
        self.fixture.close()  # exits all contexts

    def test_topic_pub_sub(self) -> None:
        """Asserts that a published message can be received on the other end."""
        qos_profile = QoSProfile(
            depth=100,
            history=HistoryPolicy.KEEP_LAST,
            durability=DurabilityPolicy.TRANSIENT_LOCAL,
        )
        pub = self.ros.node.create_publisher(String, "test", qos_profile)
        pub.publish(String(data="test"))
        # Message will arrive at an unspecified point in the future, thus
        self.assertIsNotNone(wait_for_message(
            String, "test", qos_profile=qos_profile, timeout_sec=5.0))

    def test_service_server_client(self) -> None:
        """Asserts that a service server replies to client requests."""
        def callback(_: Trigger.Request, response: Trigger.Response) -> Trigger.Response:
            response.success = True
            return response
        server = self.ros.node.create_service(Trigger, "trigger", callback)
        client = self.ros.node.create_client(Trigger, "trigger")
        self.assertTrue(client.wait_for_service(timeout_sec=5.0))
        future = client.call_async(Trigger.Request())
        self.assertTrue(wait_for_future(future, timeout_sec=5.0))
        response = future.result()
        self.assertIsNotNone(response)
        self.assertTrue(response.success)

    def test_action_server_client(self) -> None:
        """Asserts that an action server reacts to action client requests."""
        def callback(goal_handle: ServerGoalHandle) -> Fibonacci.Result:
            result = Fibonacci.Result()
            result.sequence = [0, 1]
            for i in range(1, goal_handle.request.order):
                result.sequence.append(result.sequence[i] + result.sequence[i-1])
            goal_handle.succeed()
            return result
        server = SingleGoalActionServer(self.ros.node, Fibonacci, "compute", callback)
        client = ActionClientWrapper(Fibonacci, "compute", self.ros.node)
        self.assertTrue(client.wait_for_server(timeout_sec=5.0))
        goal = Fibonacci.Goal(order=3)
        result = client.send_goal_and_wait("compute", goal, timeout_sec=5.0)
        self.assertIsNotNone(result)
        self.assertEqual(list(result.sequence), [0, 1, 1, 2])

Adding unit tests to a package

A package's type and build system dictate how unit tests are to be added. Unit tests for ROS 2 packages are typically hosted under the test subdirectory, so the following assumes this convention is observed.

For ament_cmake packages, the CMakeLists.txt file should have:

if(BUILD_TESTING)
    find_package(ament_cmake_pytest REQUIRED)
    # Define an arbitrary target for your tests such as:
    ament_add_pytest_test(unit_tests test)
endif()

For ament_python packages, the setup.py file should have:

setup(
    # ...
    tests_require=['pytest'],
)

Useful references

Clone this wiki locally