9
9
10
10
import launch_testing
11
11
from launch import LaunchDescription
12
- from launch .actions import IncludeLaunchDescription
13
- from launch .launch_description_sources import PythonLaunchDescriptionSource
14
- import psutil
12
+ from launch .actions import (
13
+ IncludeLaunchDescription ,
14
+ )
15
+ from launch .launch_description_sources import (
16
+ PythonLaunchDescriptionSource ,
17
+ )
15
18
16
19
import rclpy
17
- from geometry_msgs .msg import Twist
20
+ from std_msgs .msg import Bool
21
+ from geometry_msgs .msg import Twist , PoseStamped
18
22
from sensor_msgs .msg import Imu
19
23
24
+ from rclpy .qos import (
25
+ QoSProfile ,
26
+ QoSReliabilityPolicy ,
27
+ QoSHistoryPolicy ,
28
+ )
29
+
30
+ from test .utils import stop_gazebo
31
+
20
32
21
33
@pytest .mark .launch_test
22
34
def generate_test_description ():
@@ -62,7 +74,7 @@ def setUpClass(cls):
62
74
cls .node = rclpy .create_node ("test_node" )
63
75
64
76
# wait for topics to be setup
65
- time .sleep (10 )
77
+ time .sleep (15 )
66
78
67
79
@classmethod
68
80
def tearDownClass (cls ):
@@ -71,10 +83,7 @@ def tearDownClass(cls):
71
83
cls .node .destroy_node ()
72
84
rclpy .shutdown ()
73
85
# Stop any running Gazebo processes
74
- for proc in psutil .process_iter (["name" ]):
75
- if proc .info ["name" ] in ["gzserver" , "gazebo" , "ign gazebo" , "gz" ]:
76
- print (f"Stopping gzserver (PID={ proc .pid } )" )
77
- proc .terminate ()
86
+ stop_gazebo ()
78
87
79
88
def test_imu_works (self , proc_output ):
80
89
"""Test that the imu topic works."""
@@ -89,12 +98,19 @@ def test_imu_works(self, proc_output):
89
98
90
99
# Wait for messages (up to 1 second)
91
100
for _ in range (10 ):
92
- rclpy .spin_once (self .__class__ .node , timeout_sec = 0.1 )
101
+ rclpy .spin_once (
102
+ self .__class__ .node ,
103
+ timeout_sec = 0.1 ,
104
+ )
93
105
if msgs :
94
106
break
95
107
96
108
# Check that we received msgs
97
- self .assertGreater (len (msgs ), 0 , msg = "No IMU messages received." )
109
+ self .assertGreater (
110
+ len (msgs ),
111
+ 0 ,
112
+ msg = "No IMU messages received." ,
113
+ )
98
114
99
115
# Clean up subscriptions
100
116
self .__class__ .node .destroy_subscription (sub )
@@ -103,33 +119,67 @@ def test_movement_works(self, proc_output):
103
119
"""Test that the velocity topic works correctly."""
104
120
msgs = []
105
121
122
+ sub_qos_profile = QoSProfile (
123
+ reliability = QoSReliabilityPolicy .BEST_EFFORT ,
124
+ history = QoSHistoryPolicy .KEEP_LAST ,
125
+ depth = 10 ,
126
+ )
127
+
128
+ pub_qos_profile = QoSProfile (
129
+ reliability = QoSReliabilityPolicy .RELIABLE ,
130
+ history = QoSHistoryPolicy .KEEP_LAST ,
131
+ depth = 10 ,
132
+ )
133
+
106
134
# Create a subscription to the velocity topic
107
135
sub = self .__class__ .node .create_subscription (
108
- Twist ,
109
- "/gz/ drone0/cmd_vel " ,
136
+ PoseStamped ,
137
+ "/drone0/ground_truth/pose " ,
110
138
lambda msg : msgs .append (msg ),
111
- qos_profile = 10 ,
139
+ qos_profile = sub_qos_profile ,
140
+ )
141
+
142
+ arm_pub = self .__class__ .node .create_publisher (
143
+ Bool ,
144
+ "/gz/drone0/arm" ,
145
+ qos_profile = pub_qos_profile ,
112
146
)
113
147
114
148
# Create a publisher to the velocity topic
115
149
pub = self .__class__ .node .create_publisher (
116
150
Twist ,
117
151
"/gz/drone0/cmd_vel" ,
118
- qos_profile = 10 ,
152
+ qos_profile = pub_qos_profile ,
119
153
)
120
154
121
155
# Publish a high velocity message to move the vehicle
122
156
twist_msg = Twist ()
123
- twist_msg .linear .x = 45 .0 # Move forward at 45 m/s
124
- twist_msg .angular .z = 120.0 # Turn at 120 rad/s
157
+ twist_msg .linear .x = 1 .0 # Move forward at 1 m/s
158
+ twist_msg .angular .z = 0.1 # Turn at 10 rad/s
125
159
160
+ arm_pub .publish (Bool (data = True ))
161
+ rclpy .spin_once (
162
+ self .__class__ .node ,
163
+ timeout_sec = 0.1 ,
164
+ )
126
165
# Wait for a short time to allow odometry updates
127
- for _ in range (15 ):
166
+ for _ in range (100 ):
128
167
pub .publish (twist_msg )
129
- rclpy .spin_once (self .__class__ .node , timeout_sec = 0.1 )
130
-
168
+ rclpy .spin_once (
169
+ self .__class__ .node ,
170
+ timeout_sec = 0.1 ,
171
+ )
131
172
# Check that we received some messages
132
- self .assertNotEqual (len (msgs ), 0 , msg = "No cmd_vel messages received." )
173
+ self .assertNotAlmostEqual (
174
+ msgs [- 1 ].pose .position .x ,
175
+ msgs [0 ].pose .position .x ,
176
+ msg = "Drone did not move." ,
177
+ )
178
+ self .assertNotEqual (
179
+ len (msgs ),
180
+ 0 ,
181
+ msg = "No cmd_vel messages received." ,
182
+ )
133
183
134
184
# Clean up subscriptions
135
185
self .__class__ .node .destroy_subscription (sub )
0 commit comments