diff --git a/clearpath_tests/clearpath_tests/canbus_test.py b/clearpath_tests/clearpath_tests/canbus_test.py index b5e2891a..78833afc 100644 --- a/clearpath_tests/clearpath_tests/canbus_test.py +++ b/clearpath_tests/clearpath_tests/canbus_test.py @@ -29,11 +29,8 @@ import re import subprocess -from clearpath_generator_common.common import BaseGenerator from clearpath_tests.test_node import ClearpathTestNode, ClearpathTestResult -import rclpy - class CanbusTestNode(ClearpathTestNode): """Check the output of candump and count the number if IDs.""" @@ -68,15 +65,6 @@ def __init__( self.test_name = f'CAN {self.can_interface}' - def start(self): - while True: - result = self.read_can_log() - - if result.success: - self.get_logger().info(result.message) - else: - self.get_logger().warning(result.message) - def run_test(self): self.get_logger().info(f'Collecting CAN messages on {self.can_interface}...') result = self.read_can_log() @@ -178,23 +166,3 @@ def get_test_result_details(self): details += '\nDevices that do not use CANopen may be incorrectly identified in the list above' # noqa: E501 return details - - -def main(): - setup_path = BaseGenerator.get_args() - rclpy.init() - - ct = CanbusTestNode(setup_path=setup_path) - - try: - ct.start() - rclpy.spin(ct) - except KeyboardInterrupt: - pass - - ct.destroy_node() - rclpy.shutdown() - - -if __name__ == '__main__': - main() diff --git a/clearpath_tests/clearpath_tests/diagnostic_test.py b/clearpath_tests/clearpath_tests/diagnostic_test.py index 5f159206..193eca5a 100644 --- a/clearpath_tests/clearpath_tests/diagnostic_test.py +++ b/clearpath_tests/clearpath_tests/diagnostic_test.py @@ -29,14 +29,13 @@ import re from clearpath_config.common.types.platform import Platform -from clearpath_generator_common.common import BaseGenerator from clearpath_tests.test_node import ClearpathTestNode, ClearpathTestResult +from clearpath_tests.timer import Timeout from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus import rclpy from rclpy.qos import qos_profile_system_default -from rclpy.time import Duration PLATFORM_ANY = '*' @@ -188,27 +187,21 @@ def diagnostic_callback(self, diagnostic_array): elif status.level == DiagnosticStatus.STALE: pass - def start(self): - self.diagnostc_sub = self.create_subscription( - DiagnosticArray, - f'/{self.namespace}/diagnostics', - self.diagnostic_callback, - qos_profile_system_default - ) - def run_test(self): results = [] - self.test_in_progress = True - self.start() # collect 30s worth of data - start_time = self.get_clock().now() - end_time = start_time + Duration(seconds=30.0) - self.get_logger().info('Collecting 30 seconds of diagnostic data...') - while self.get_clock().now() < end_time: - rclpy.spin_once(self) + self.diagnostc_sub = self.create_subscription( + DiagnosticArray, + f'/{self.namespace}/diagnostics', + self.diagnostic_callback, + qos_profile_system_default + ) + timeout = Timeout(self, 30) + while not timeout.elapsed: + rclpy.spin_once(self, timeout_sec=1.0) if len(self.warnings) == 0 and len(self.errors) == 0 and len(self.allowed_errors) == 0: results.append(ClearpathTestResult(True, 'Diagnostics', 'No errors, no warnings')) @@ -257,23 +250,3 @@ def get_test_result_details(self): details += f'* {warn.name}: {warn.message}\n' return details - - -def main(): - setup_path = BaseGenerator.get_args() - rclpy.init() - - dt = DiagnosticTestNode(setup_path) - - try: - dt.start() - rclpy.spin(dt) - except KeyboardInterrupt: - pass - - dt.destroy_node() - rclpy.shutdown() - - -if __name__ == '__main__': - main() diff --git a/clearpath_tests/clearpath_tests/drive_test.py b/clearpath_tests/clearpath_tests/drive_test.py index b2c03cf6..2d945751 100644 --- a/clearpath_tests/clearpath_tests/drive_test.py +++ b/clearpath_tests/clearpath_tests/drive_test.py @@ -28,8 +28,6 @@ # POSSIBILITY OF SUCH DAMAGE. import math -from clearpath_generator_common.common import BaseGenerator - from clearpath_tests.mobility_test import MobilityTestNode from clearpath_tests.test_node import ClearpathTestResult @@ -185,28 +183,3 @@ def run_test(self): )) return results - - -def main(): - setup_path = BaseGenerator.get_args() - rclpy.init() - - try: - dt = DriveTestNode(setup_path) - dt.start() - try: - while not dt.test_done: - rclpy.spin_once(dt) - dt.get_logger().info('Test complete') - except KeyboardInterrupt: - dt.get_logger().info('User aborted! Cleaning up & exiting...') - dt.destroy_node() - except TimeoutError: - # This error is already logged when it's raised - pass - - rclpy.shutdown() - - -if __name__ == '__main__': - main() diff --git a/clearpath_tests/clearpath_tests/estop_test.py b/clearpath_tests/clearpath_tests/estop_test.py index 7b220a04..af4a169d 100644 --- a/clearpath_tests/clearpath_tests/estop_test.py +++ b/clearpath_tests/clearpath_tests/estop_test.py @@ -29,12 +29,11 @@ import threading import time -from clearpath_generator_common.common import BaseGenerator from clearpath_tests.test_node import ClearpathTestNode, ClearpathTestResult +from clearpath_tests.timer import Timeout from geometry_msgs.msg import TwistStamped import rclpy -from rclpy.duration import Duration from rclpy.qos import qos_profile_sensor_data, qos_profile_system_default from std_msgs.msg import Bool @@ -72,14 +71,6 @@ def estop_callback(self, msg: Bool): state = 'clear' self.get_logger().info(f'{self.estop_type} state: {state}') - def start(self): - self.estop_sub = self.create_subscription( - Bool, - f'/{self.namespace}/platform/emergency_stop', - self.estop_callback, - qos_profile_sensor_data - ) - def run_test(self): if self.optional: user_input = self.promptYN( @@ -90,7 +81,13 @@ def run_test(self): return [ClearpathTestResult(None, self.test_name, 'Skipped; component not installed')] # noqa: E501 self.test_in_progress = True - self.start() + + self.estop_sub = self.create_subscription( + Bool, + f'/{self.namespace}/platform/emergency_stop', + self.estop_callback, + qos_profile_sensor_data + ) self.cmd_vel_pub = self.create_publisher( TwistStamped, @@ -105,7 +102,7 @@ def run_test(self): ui_thread = threading.Thread(target=self.run_ui) ui_thread.start() while not self.test_done: - rclpy.spin_once(self) + rclpy.spin_once(self, timeout_sec=1.0) ui_thread.join() return self.results @@ -133,14 +130,14 @@ def run_ui(self): return # wait until we know the state of the e-stop - start_time = self.get_clock().now() - timeout = Duration(seconds=10) + timeout = Timeout(self, 10.0) print(f'Getting {self.estop_type} status...') while ( - self.estop_engaged is None and - (self.get_clock().now() - start_time) < timeout + self.estop_engaged is None + and not timeout.elapsed ): time.sleep(0.1) + timeout.abort() if self.estop_engaged is None: results.append(ClearpathTestResult( @@ -240,9 +237,8 @@ def run_ui(self): def command_wheels(self): self.cmd_vel.twist.linear.x = 0.1 - start_time = self.get_clock().now() - duration = Duration(seconds=2) - while (self.get_clock().now() - start_time) < duration: + timeout = Timeout(self, 2.0) + while not timeout.elapsed: pass self.cmd_vel.twist.linear.x = 0.0 @@ -256,33 +252,12 @@ def wait_for_estop(self, state, timeout_seconds=10): @return True if the e-stop state is in the desired state, otherwise False """ - start_time = self.get_clock().now() - now = self.get_clock().now() - timeout = Duration(seconds=timeout_seconds) + timeout = Timeout(self, timeout_seconds) while ( - self.estop_engaged != state and - (now - start_time) < timeout + self.estop_engaged != state + and not timeout.elapsed ): - now = self.get_clock().now() + pass + timeout.abort() return self.estop_engaged == state - - -def main(): - setup_path = BaseGenerator.get_args() - rclpy.init() - - st = EstopTestNode('', setup_path=setup_path) - - try: - st.start() - rclpy.spin(st) - except KeyboardInterrupt: - pass - - st.destroy_node() - rclpy.shutdown() - - -if __name__ == '__main__': - main() diff --git a/clearpath_tests/clearpath_tests/fan_test.py b/clearpath_tests/clearpath_tests/fan_test.py index a8d11969..5f99432f 100755 --- a/clearpath_tests/clearpath_tests/fan_test.py +++ b/clearpath_tests/clearpath_tests/fan_test.py @@ -26,12 +26,11 @@ # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. -from clearpath_generator_common.common import BaseGenerator from clearpath_platform_msgs.msg import Fans, Status from clearpath_tests.test_node import ClearpathTestNode, ClearpathTestResult +from clearpath_tests.timer import Timeout import rclpy -from rclpy.duration import Duration from rclpy.qos import qos_profile_sensor_data @@ -65,10 +64,10 @@ def check_firmware_version(self): """ self.mcu_status = None - start_at = self.get_clock().now() - timeout_duration = Duration(seconds=10) - while self.get_clock().now() - start_at <= timeout_duration and self.mcu_status is None: - rclpy.spin_once(self) + timeout = Timeout(self, 10.0) + while not timeout.elapsed and self.mcu_status is None: + rclpy.spin_once(self, timeout_sec=1.0) + timeout.abort() if self.mcu_status is None: return (None, False) @@ -134,10 +133,9 @@ def run_test(self): def wait_3s(): self.get_logger().info('Waiting for fans to spin up/down...') - start = self.get_clock().now() - sleep_time = Duration(seconds=3) - while self.get_clock().now() - start < sleep_time: - rclpy.spin_once(self) + timeout = Timeout(self, 3.0) + while not timeout.elapsed: + rclpy.spin_once(self, timeout_sec=1.0) for i in range(self.n_fans): self.fan_msg.fans[i] = 0 @@ -172,22 +170,3 @@ def wait_3s(): results.append(ClearpathTestResult(False, 'Fans (all on)', None)) return results - - -def main(): - setup_path = BaseGenerator.get_args() - rclpy.init() - - fan_test = FanTestNode(setup_path) - - try: - rclpy.spin(fan_test) - except KeyboardInterrupt: - pass - - fan_test.destroy_node() - rclpy.shutdown() - - -if __name__ == '__main__': - main() diff --git a/clearpath_tests/clearpath_tests/imu_test.py b/clearpath_tests/clearpath_tests/imu_test.py index 39b074f6..21e46f7c 100644 --- a/clearpath_tests/clearpath_tests/imu_test.py +++ b/clearpath_tests/clearpath_tests/imu_test.py @@ -28,16 +28,15 @@ # POSSIBILITY OF SUCH DAMAGE. import math -from clearpath_generator_common.common import BaseGenerator from clearpath_tests.test_node import ( ClearpathTestNode, ClearpathTestResult, ) from clearpath_tests.tf import ConfigurableTransformListener +from clearpath_tests.timer import Timeout from geometry_msgs.msg import Vector3Stamped import rclpy -from rclpy.duration import Duration from rclpy.qos import qos_profile_sensor_data from sensor_msgs.msg import Imu from tf2_geometry_msgs import do_transform_vector3 @@ -104,27 +103,24 @@ def imu_raw_callback(self, imu_data: Imu): self.accel_samples.append(transformed_accel) self.gyro_samples.append(transformed_gyro) - def start(self): - self.imu_sub = self.create_subscription( - Imu, - f'/{self.namespace}/sensors/imu_0/data_raw', - self.imu_raw_callback, - qos_profile=qos_profile_sensor_data, - ) - def run_test(self): def gather_samples(): - sample_duration = Duration(seconds=10) print('Gathering 10s worth of IMU data...') - start_time = self.get_clock().now() + timeout = Timeout(self, 10.0) self.record_data = True - while self.get_clock().now() - start_time < sample_duration: - rclpy.spin_once(self) + while not timeout.elapsed: + rclpy.spin_once(self, timeout_sec=1.0) self.record_data = False self.record_data = False self.test_in_progress = True - self.start() + + self.imu_sub = self.create_subscription( + Imu, + f'/{self.namespace}/sensors/imu_0/data_raw', + self.imu_raw_callback, + qos_profile=qos_profile_sensor_data, + ) results = [] @@ -137,7 +133,9 @@ def gather_samples(): )) else: gather_samples() - results.append(self.check_gravity('level', 0, 0)) + new_results = self.check_gravity('level', 0, 0) + for r in new_results: + results.append(r) self.accel_samples.clear() self.gyro_samples.clear() @@ -150,7 +148,9 @@ def gather_samples(): )) else: gather_samples() - results.append(self.check_gravity('rear raised', math.radians(-20), 0)) + new_results = self.check_gravity('rear raised', math.radians(-20), 0) + for r in new_results: + results.append(r) self.accel_samples.clear() self.gyro_samples.clear() @@ -163,7 +163,9 @@ def gather_samples(): )) else: gather_samples() - results.append(self.check_gravity('left raised', 0, math.radians(20))) + new_results = self.check_gravity('left raised', 0, math.radians(20)) + for r in new_results: + results.append(r) self.accel_samples.clear() self.gyro_samples.clear() @@ -179,7 +181,7 @@ def check_gravity(self, label, x_angle=0.0, y_angle=0.0) -> ClearpathTestResult: @param x_angle The robot's front/back inclination @param y_angle The robot's left/right inclination - @return A ClearpathTestResult indicating if gravity is OK + @return A list of ClearpathTestResults indicating if gravity is OK """ if len(self.accel_samples) < 10: return ClearpathTestResult( @@ -188,13 +190,9 @@ def check_gravity(self, label, x_angle=0.0, y_angle=0.0) -> ClearpathTestResult: f'{len(self.accel_samples)} samples collected; is IMU publishing at the right rate?', # noqa: E501 ) + results = [] + g = 9.807 - expected_x = g * math.sin(x_angle) - expected_y = g * math.sin(y_angle) - if abs(x_angle) > abs(y_angle): - expected_z = g * math.cos(x_angle) - else: - expected_z = g * math.cos(y_angle) avg_x = 0 avg_y = 0 @@ -207,62 +205,38 @@ def check_gravity(self, label, x_angle=0.0, y_angle=0.0) -> ClearpathTestResult: avg_y /= len(self.accel_samples) avg_z /= len(self.accel_samples) - # allow 20% error on the IMU since the ground may never be completely level - # and the calibration may not be super accurate for some models - test_tolerance = 0.2 - - x_lower_limit = expected_x - g * test_tolerance - x_upper_limit = expected_x + g * test_tolerance - - y_lower_limit = expected_y - g * test_tolerance - y_upper_limit = expected_y + g * test_tolerance - - z_lower_limit = expected_z - g * test_tolerance - z_upper_limit = expected_z + g * test_tolerance - if ( - # check that the measurements are witin our error bars - avg_x >= x_lower_limit and avg_x <= x_upper_limit and - avg_y >= y_lower_limit and avg_y <= y_upper_limit and - avg_z >= z_lower_limit and avg_z <= z_upper_limit and - - # ensure gravity is mainly +Z - avg_z > 5.0 and - - # check our inclination is the right way - ( - (avg_x > 0 and x_angle > 0) or - (avg_y > 0 and y_angle > 0) or - (x_angle == 0 and y_angle == 0) - ) - ): - return ClearpathTestResult( - True, - f'{self.test_name} ({label})', - f'Measured gravity vector: ({avg_x:0.2f}, {avg_y:0.2f}, {avg_z:0.2f}) Expected: ({expected_x:0.2f}, {expected_y:0.2f}, {expected_z:0.2f})' # noqa: E501 - ) - else: - return ClearpathTestResult( - False, - f'{self.test_name} ({label})', - f'Measured gravity vector: ({avg_x:0.2f}, {avg_y:0.2f}, {avg_z:0.2f}) Expected: ({expected_x:0.2f}, {expected_y:0.2f}, {expected_z:0.2f})' # noqa: E501 - ) - - -def main(): - setup_path = BaseGenerator.get_args() - rclpy.init() - - it = ImuTestNode(imu_num=0, setup_path=setup_path) - - try: - it.start() - rclpy.spin(it) - except KeyboardInterrupt: - pass - - it.destroy_node() - rclpy.shutdown() + # ensure the magnitude of the vector is about g + measured_g = math.sqrt(avg_x ** 2 + avg_y ** 2 + avg_z ** 2) + g_err = min(measured_g, g) / max(measured_g, g) + results.append(ClearpathTestResult( + g_err > 0.75, + f'{self.test_name} {label} (g magnitude)', + f'Measured gravity: {measured_g:0.2f}m/s^2. Accuracy {g_err:0.2f}', + )) + + # estimate our actual inclination based on the IMU data + angle_slop = 10 * math.pi / 180.0 # allow +/- 10 degree measurement error + calculated_inclination_x = math.asin(avg_x / measured_g) + calculated_inclination_y = math.asin(avg_y / measured_g) + + if x_angle != 0: + results.append(ClearpathTestResult( + ( + x_angle - angle_slop <= calculated_inclination_x + and calculated_inclination_x <= x_angle + angle_slop + ), + f'{self.test_name} {label}', + f'Measured inclination: {calculated_inclination_x * 180.0 / math.pi:0.2f}. Expected: {x_angle * 180 / math.pi:0.2f}', # noqa:E501 + )) + if y_angle != 0: + results.append(ClearpathTestResult( + ( + y_angle - angle_slop <= calculated_inclination_y + and calculated_inclination_y <= y_angle + angle_slop + ), + f'{self.test_name} {label}', + f'Measured inclination: {calculated_inclination_y * 180.0 / math.pi:0.2f}. Expected: {y_angle * 180 / math.pi:0.2f}', # noqa:E501 + )) -if __name__ == '__main__': - main() + return results diff --git a/clearpath_tests/clearpath_tests/light_test.py b/clearpath_tests/clearpath_tests/light_test.py index d66b3f4d..e800f3e1 100755 --- a/clearpath_tests/clearpath_tests/light_test.py +++ b/clearpath_tests/clearpath_tests/light_test.py @@ -29,7 +29,6 @@ import threading from clearpath_config.common.types.platform import Platform -from clearpath_generator_common.common import BaseGenerator from clearpath_platform_msgs.msg import Lights, RGB from clearpath_tests.test_node import ClearpathTestNode, ClearpathTestResult @@ -386,23 +385,3 @@ def run_ui(self): results.append(ClearpathTestResult(True, 'Bottom row white', None)) self.test_done = True - - -def main(): - setup_path = BaseGenerator.get_args() - rclpy.init() - - lt = LightTestNode(setup_path=setup_path) - - try: - lt.start() - rclpy.spin(lt) - except KeyboardInterrupt: - pass - - lt.destroy_node() - rclpy.shutdown() - - -if __name__ == '__main__': - main() diff --git a/clearpath_tests/clearpath_tests/linear_acceleration_test.py b/clearpath_tests/clearpath_tests/linear_acceleration_test.py index 85c663fc..6bdd3bdb 100644 --- a/clearpath_tests/clearpath_tests/linear_acceleration_test.py +++ b/clearpath_tests/clearpath_tests/linear_acceleration_test.py @@ -27,13 +27,13 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. -from clearpath_generator_common.common import BaseGenerator from clearpath_tests.mobility_test import MobilityTestNode from clearpath_tests.test_node import ClearpathTestResult from clearpath_tests.tf import ConfigurableTransformListener +from clearpath_tests.timer import Timeout + from geometry_msgs.msg import Vector3Stamped import rclpy -from rclpy.duration import Duration from rclpy.qos import qos_profile_sensor_data from sensor_msgs.msg import Imu from tf2_geometry_msgs import do_transform_vector3 @@ -104,20 +104,9 @@ def imu_callback(self, imu_data): if self.record_data: self.accel_samples.append(transformed_accel) - def start(self): - super().start() - - # Subscribe to our default IMU - imu_topic = f'/{self.namespace}/sensors/imu_{self.imu_num}/data' - self.get_logger().info(f'Subscribing to IMU data on {imu_topic}...') - self.imu_sub = self.create_subscription( - Imu, - imu_topic, - self.imu_callback, - qos_profile=qos_profile_sensor_data, - ) - def run_test(self): + self.start() + self.cmd_vel.twist.linear.x = 0.0 self.cmd_vel.twist.linear.y = 0.0 self.cmd_vel.twist.linear.z = 0.0 @@ -139,17 +128,26 @@ def run_test(self): return [ClearpathTestResult(False, self.test_name, 'User skipped')] self.get_logger().info('Starting acceleration test') - self.start() + + # Subscribe to our default IMU + imu_topic = f'/{self.namespace}/sensors/imu_{self.imu_num}/data' + self.get_logger().info(f'Subscribing to IMU data on {imu_topic}...') + self.imu_sub = self.create_subscription( + Imu, + imu_topic, + self.imu_callback, + qos_profile=qos_profile_sensor_data, + ) # wait until we get the first IMU message or 10s passes - start_time = self.get_clock().now() - timeout_duration = Duration(seconds=10) + timeout = Timeout(self, 10.0) while ( self.latest_imu is None - and self.get_clock().now() - start_time <= timeout_duration + and not timeout.elapsed and not self.test_error ): - rclpy.spin_once(self) + rclpy.spin_once(self, timeout_sec=1.0) + timeout.abort() if self.test_error: self.get_logger().warning(f'Test aborted due to an error: {self.test_error_msg}') @@ -163,35 +161,37 @@ def run_test(self): )] # accelerate & decelerate over 5s - accel_duration = Duration(seconds=self.acceleration_time) + timeout = Timeout(self, 5.0) start_time = self.get_clock().now() while ( not self.test_error - and self.get_clock().now() - start_time <= accel_duration + and not timeout.elapsed ): dt = (self.get_clock().now() - start_time).nanoseconds / 1_000_000_000 self.cmd_vel.twist.linear.x = self.acceleration * dt self.record_data = True - rclpy.spin_once(self) + rclpy.spin_once(self, timeout_sec=1.0) self.record_data = False + timeout.abort() # smoothly decelerate, but stop don't record any more data + timeout = Timeout(self, 5.0) start_time = self.get_clock().now() while ( not self.test_error - and self.get_clock().now() - start_time <= accel_duration + and not timeout.elapsed ): dt = (self.get_clock().now() - start_time).nanoseconds / 1_000_000_000 self.cmd_vel.twist.linear.x = self.acceleration * (self.acceleration_time - dt) - rclpy.spin_once(self) + rclpy.spin_once(self, timeout_sec=1.0) + timeout.abort() # stop driving # wait 1s to ensure we publish the command self.cmd_vel.twist.linear.x = 0.0 - test_wait = Duration(seconds=1) - start_time = self.get_clock().now() - while self.get_clock().now() - start_time <= test_wait: - rclpy.spin_once(self) + timeout = Timeout(self, 1.0) + while not timeout.elapsed: + rclpy.spin_once(self, timeout_sec=1.0) # process the results results = self.test_results @@ -215,34 +215,17 @@ def run_test(self): # self.test_name, # f'Recorded linear acceleration: {avg_accel:0.2f}m/s^2 (accuracy: {measured_accuracy:0.2f})' # noqa: E501 # )) - results.append(ClearpathTestResult( - avg_accel > 0, - self.test_name, - 'Acceleration oriented correctly' - )) + if avg_accel > 0: + results.append(ClearpathTestResult( + True, + self.test_name, + f'Acceleration oriented correctly ({avg_accel:0.2f}m/s^2)' + )) + else: + results.append(ClearpathTestResult( + False, + self.test_name, + f'Acceleration oriented incorrectly ({avg_accel:0.2f}m/s^2)' + )) return results - - -def main(): - setup_path = BaseGenerator.get_args() - rclpy.init() - - try: - rt = LinearAccelerationTestNode(setup_path) - rt.start() - try: - while not rt.test_done: - rclpy.spin_once(rt) - rt.get_logger().info('Test complete') - except KeyboardInterrupt: - rt.get_logger().info('User aborted! Cleaning up & exiting...') - rt.destroy_node() - except TimeoutError: - # This error is already logged when it's raised - pass - rclpy.shutdown() - - -if __name__ == '__main__': - main() diff --git a/clearpath_tests/clearpath_tests/mcu_test.py b/clearpath_tests/clearpath_tests/mcu_test.py index 8b426bfd..effe6971 100644 --- a/clearpath_tests/clearpath_tests/mcu_test.py +++ b/clearpath_tests/clearpath_tests/mcu_test.py @@ -29,17 +29,14 @@ import os import re import subprocess -import time from clearpath_config.common.types.platform import Platform -from clearpath_generator_common.common import BaseGenerator from clearpath_platform_msgs.msg import Status from clearpath_tests.test_node import ClearpathTestNode, ClearpathTestResult +from clearpath_tests.timer import Timeout import rclpy from rclpy.qos import qos_profile_sensor_data -from rclpy.time import Duration - MCU_IP = 1 MCU_SERIAL = 2 @@ -192,41 +189,23 @@ def get_firmware_version(self): def mcu_callback(status): self.mcu_status = status - mcu_sub = self.create_subscription( + self.mcu_sub = self.create_subscription( Status, f'/{self.namespace}/platform/mcu/status', mcu_callback, qos_profile_sensor_data, ) - start_at = self.get_clock().now() - timeout_duration = Duration(seconds=10) - while self.get_clock().now() - start_at <= timeout_duration and self.mcu_status is None: - rclpy.spin_once(self) - mcu_sub.destroy() + timeout = Timeout(self, 5) + while not timeout.elapsed and self.mcu_status is None: + rclpy.spin_once(self, timeout_sec=1) + timeout.abort() if self.mcu_status is None: return (None, None) else: return (self.mcu_status.hardware_id, self.mcu_status.firmware_version) - def start(self): - while True: - if self.mode == MCU_IP: - if self.ping_ip(self.address): - self.get_logger().info(f'MCU is responded to ping {self.address}') - else: - self.get_logger().warning(f'MCU did not respond to ping {self.address}') - elif self.mode == MCU_SERIAL: - if self.check_serial_exists(self.address): - if self.check_serial_permissions(self.address): - self.get_logger().info(f'MCU handle {self.address} exists with RW permissions') # noqa: E501 - else: - self.get_logger().warning(f'Invalid permissions for MCU handle {self.address}') # noqa: E501 - else: - self.get_logger().warning(f'MCU handle {self.address} does not exist') - time.sleep(5) - def run_test(self): if self.mode == MCU_IP: if self.ping_ip(self.address): @@ -272,23 +251,3 @@ def get_test_result_details(self): * Version: {'unknown' if not firmware_version else firmware_version} """ - - -def main(): - setup_path = BaseGenerator.get_args() - rclpy.init() - - mt = McuTestNode(setup_path=setup_path) - - try: - mt.start() - rclpy.spin(mt) - except KeyboardInterrupt: - pass - - mt.destroy_node() - rclpy.shutdown() - - -if __name__ == '__main__': - main() diff --git a/clearpath_tests/clearpath_tests/mobility_test.py b/clearpath_tests/clearpath_tests/mobility_test.py index 38e0e876..fca07ad6 100644 --- a/clearpath_tests/clearpath_tests/mobility_test.py +++ b/clearpath_tests/clearpath_tests/mobility_test.py @@ -58,6 +58,9 @@ def __init__(self, test_name, node_name, setup_path='/etc/clearpath'): # is the current test finished? self.ne = False + # motor currents sampled during the test + self.motor_currents = [] + # the results of the test, possibly collected from multiple sources self.test_results = [] @@ -81,7 +84,6 @@ def __init__(self, test_name, node_name, setup_path='/etc/clearpath'): self.drive_topic = f'/{self.namespace}/{self.drive_topic}' def start(self): - self.motor_currents = [] if self.platform == Platform.A300: self.motor_fb_sub = self.create_subscription( LynxMultiFeedback, @@ -158,9 +160,12 @@ def calculate_average_motor_currents(self): def get_test_result_details(self): details = '' details += '\n#### Average motor current draw during test\n\n' - avg = self.calculate_average_motor_currents() - for amps in avg: - details += f'* {amps:0.3f}A\n' + if len(self.motor_currents) == 0: + details += 'No motor current data recorded\n' + else: + avg = self.calculate_average_motor_currents() + for amps in avg: + details += f'* {amps:0.3f}A\n' return details def odom_callback(self, odom_msg): diff --git a/clearpath_tests/clearpath_tests/rotation_test.py b/clearpath_tests/clearpath_tests/rotation_test.py index 449ff218..e59841d6 100644 --- a/clearpath_tests/clearpath_tests/rotation_test.py +++ b/clearpath_tests/clearpath_tests/rotation_test.py @@ -28,13 +28,13 @@ # POSSIBILITY OF SUCH DAMAGE. from clearpath_config.common.types.platform import Platform -from clearpath_generator_common.common import BaseGenerator from clearpath_tests.mobility_test import MobilityTestNode from clearpath_tests.test_node import ClearpathTestResult from clearpath_tests.tf import ConfigurableTransformListener +from clearpath_tests.timer import Timeout + from geometry_msgs.msg import Vector3Stamped import rclpy -from rclpy.duration import Duration from rclpy.qos import qos_profile_sensor_data from sensor_msgs.msg import Imu from tf2_geometry_msgs import do_transform_vector3 @@ -140,14 +140,14 @@ def run_test(self): self.start() # wait until we get the first IMU message or 10s passes - start_time = self.get_clock().now() - timeout_duration = Duration(seconds=10) + timeout = Timeout(self, 10.0) while ( self.latest_imu is None - and self.get_clock().now() - start_time <= timeout_duration + and not timeout.elapsed and not self.test_error ): - rclpy.spin_once(self) + rclpy.spin_once(self, timeout_sec=1.0) + timeout.abort() if self.test_error: self.get_logger().warning(f'Test aborted due to an error: {self.test_error_msg}') @@ -162,13 +162,14 @@ def run_test(self): # start turning, but wait 1s for us to get up to speed before recording data self.cmd_vel.twist.angular.z = self.max_speed - startup_wait = Duration(seconds=1.0) - start_time = self.get_clock().now() + timeout = Timeout(self, 1.0) while ( not self.test_error - and self.get_clock().now() - start_time <= startup_wait + and not timeout.elapsed ): - rclpy.spin_once(self) + rclpy.spin_once(self, timeout_sec=1.0) + timeout.abort() + if self.test_error: self.record_data = False self.cmd_vel.twist.angular.z = 0.0 @@ -177,14 +178,15 @@ def run_test(self): # record data for 10s self.record_data = True - test_wait = Duration(seconds=10) - start_time = self.get_clock().now() + timeout = Timeout(self, 10.0) while ( not self.test_error - and self.get_clock().now() - start_time <= test_wait + and not timeout.elapsed ): - rclpy.spin_once(self) + rclpy.spin_once(self, timeout_sec=1.0) + timeout.abort() self.record_data = False + if self.test_error: self.cmd_vel.twist.angular.z = 0.0 self.get_logger().warning(f'Test aborted due to an error: {self.test_error_msg}') @@ -193,10 +195,10 @@ def run_test(self): # stop driving # wait 1s to ensure we publish the command self.cmd_vel.twist.angular.z = 0.0 - test_wait = Duration(seconds=1) - start_time = self.get_clock().now() - while self.get_clock().now() - start_time <= test_wait: - rclpy.spin_once(self) + timeout = Timeout(self, 1.0) + while not timeout.elapsed: + rclpy.spin_once(self, timeout_sec=1.0) + timeout.abort() # process the results results = self.test_results @@ -208,41 +210,30 @@ def run_test(self): )) else: avg_vel = sum(gyro.vector.z for gyro in self.gyro_samples) / len(self.gyro_samples) - min_accuracy = 0.8 + if avg_vel > 0.0: + results.append(ClearpathTestResult( + True, + f'{self.test_name} (direction)', + f'Angular velocity oriented correctly: {avg_vel:0.2f}rad/s' + )) + else: + results.append(ClearpathTestResult( + False, + f'{self.test_name} (direction)', + f'Angular velocity oriented incorrectly: {avg_vel:0.2f}rad/s' + )) + + min_accuracy = 0.75 if self.clearpath_config.platform.get_platform_model() == Platform.J100: # default Jackal IMU is terrible, so allow wider margins - min_accuracy = 0.6 + min_accuracy = 0.5 measured_accuracy = min(avg_vel, self.max_speed) / max(avg_vel, self.max_speed) results.append(ClearpathTestResult( measured_accuracy >= min_accuracy, - self.test_name, - f'Recorded angular velocity: {avg_vel}rad/s (accuracy: {measured_accuracy:0.2f})' + f'{self.test_name} (magnitude)', + f'Recorded angular velocity: {avg_vel:0.2f}rad/s (accuracy: {measured_accuracy:0.2f})' # noqa: E501 )) return results - - -def main(): - setup_path = BaseGenerator.get_args() - rclpy.init() - - try: - rt = RotationTestNode(setup_path) - rt.start() - try: - while not rt.test_done: - rclpy.spin_once(rt) - rt.get_logger().info('Test complete') - except KeyboardInterrupt: - rt.get_logger().info('User aborted! Cleaning up & exiting...') - rt.destroy_node() - except TimeoutError: - # This error is already logged when it's raised - pass - rclpy.shutdown() - - -if __name__ == '__main__': - main() diff --git a/clearpath_tests/clearpath_tests/test_node.py b/clearpath_tests/clearpath_tests/test_node.py index ea4d57e5..0fa8eeec 100644 --- a/clearpath_tests/clearpath_tests/test_node.py +++ b/clearpath_tests/clearpath_tests/test_node.py @@ -78,14 +78,6 @@ def get_test_result_details(self) -> str: """ return None - def start(self): - """ - Run this node normally as a standard ROS node without any user interaction. - - This function must be implemented by all children - """ - raise NotImplementedError() - @staticmethod def promptYN(message: str, default: str = 'Y'): """ diff --git a/clearpath_tests/clearpath_tests/timer.py b/clearpath_tests/clearpath_tests/timer.py new file mode 100644 index 00000000..33abe284 --- /dev/null +++ b/clearpath_tests/clearpath_tests/timer.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +# Software License Agreement (BSD) +# +# @author Chris Iverach-Brereton +# @copyright (c) 2025, Clearpath Robotics, Inc., All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Clearpath Robotics nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +import threading +import time + +from rclpy.duration import Duration +from rclpy.node import Node + + +class Timeout: + """ + Reusable class for implementing basic timeouts during ROS operation. + + Starts a background thread that spins until the time has elapsed. + """ + + def __init__(self, node: Node, duration: float): + self.node = node + self.duration = Duration(seconds=duration) + self.__abort_signal = False + self.__is_elapsed = False + self.time_thread = threading.Thread( + target=self.run_timer + ) + self.time_thread.start() + + @property + def elapsed(self): + return self.__is_elapsed + + def abort(self): + self.__abort_signal = True + + def run_timer(self): + start_time = self.node.get_clock().now() + while ( + self.node.get_clock().now() - start_time < self.duration + and not self.__abort_signal + ): + time.sleep(0.01) + self.__is_elapsed = True diff --git a/clearpath_tests/clearpath_tests/wifi_test.py b/clearpath_tests/clearpath_tests/wifi_test.py index cb9c7c17..9ec91bb4 100644 --- a/clearpath_tests/clearpath_tests/wifi_test.py +++ b/clearpath_tests/clearpath_tests/wifi_test.py @@ -31,9 +31,8 @@ import subprocess import time -from clearpath_generator_common.common import BaseGenerator from clearpath_tests.test_node import ClearpathTestNode, ClearpathTestResult -import rclpy + from wireless_msgs.msg import Connection @@ -92,7 +91,11 @@ def check_connection(self, interface): if key == 'ESSID': c.essid = value.lstrip('"').rstrip('"') elif key == 'Tx-Power': - c.txpower = int(re.split(all_whitespace, value)[0]) + power = re.split(all_whitespace, value)[0] + if power == 'off': + c.txpower = 0 + else: + c.txpower = int(power) elif key == 'Bit Rate': unit = re.split(all_whitespace, value)[-1] multiplier = 1.0 @@ -184,23 +187,3 @@ def run_test(self): )) return results - - -def main(): - setup_path = BaseGenerator.get_args() - rclpy.init() - - wt = WifiTestNode(setup_path) - - try: - wt.start() - rclpy.spin(wt) - except KeyboardInterrupt: - pass - - wt.destroy_node() - rclpy.shutdown() - - -if __name__ == '__main__': - main()