Skip to content
This repository was archived by the owner on Jul 26, 2024. It is now read-only.

Use a separate thread to pop k4abt_tracker result. Fix #109. #161

Merged
merged 5 commits into from
Oct 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/azure_kinect_ros_driver/k4a_ros_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ class K4AROSDevice
sensor_msgs::PointCloud2Ptr& point_cloud);

void framePublisherThread();
#if defined(K4A_BODY_TRACKING)
void bodyPublisherThread();
#endif
void imuPublisherThread();

// Gets a timestamp from one of the capture's images
Expand Down Expand Up @@ -157,6 +160,8 @@ class K4AROSDevice
#if defined(K4A_BODY_TRACKING)
// Body tracker
k4abt::tracker k4abt_tracker_;
std::atomic_int16_t k4abt_tracker_queue_size_;
std::thread body_publisher_thread_;
#endif

std::chrono::nanoseconds device_to_realtime_offset_{0};
Expand Down
176 changes: 83 additions & 93 deletions src/k4a_ros_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ K4AROSDevice::K4AROSDevice(const NodeHandle& n, const NodeHandle& p)
// clang-format off
#if defined(K4A_BODY_TRACKING)
k4abt_tracker_(nullptr),
k4abt_tracker_queue_size_(0),
#endif
// clang-format on
node_(n),
Expand Down Expand Up @@ -312,6 +313,13 @@ K4AROSDevice::~K4AROSDevice()
// Start tearing down the publisher threads
running_ = false;

#if defined(K4A_BODY_TRACKING)
// Join the publisher thread
ROS_INFO("Joining body publisher thread");
body_publisher_thread_.join();
ROS_INFO("Body publisher thread joined");
#endif

// Join the publisher thread
ROS_INFO("Joining camera publisher thread");
frame_publisher_thread_.join();
Expand Down Expand Up @@ -388,6 +396,9 @@ k4a_result_t K4AROSDevice::startCameras()

// Start the thread that will poll the cameras and publish frames
frame_publisher_thread_ = thread(&K4AROSDevice::framePublisherThread, this);
#if defined(K4A_BODY_TRACKING)
body_publisher_thread_ = thread(&K4AROSDevice::bodyPublisherThread, this);
#endif

return K4A_RESULT_SUCCEEDED;
}
Expand Down Expand Up @@ -596,7 +607,6 @@ k4a_result_t K4AROSDevice::getRgbPointCloudInDepthFrame(const k4a::capture& capt

point_cloud->header.frame_id = calibration_data_.tf_prefix_ + calibration_data_.depth_camera_frame_;
point_cloud->header.stamp = timestampToROS(k4a_depth_frame.get_device_timestamp());
printTimestampDebugMessage("RGB point cloud", point_cloud->header.stamp);

return fillColorPointCloud(calibration_data_.point_cloud_image_, calibration_data_.transformed_rgb_image_,
point_cloud);
Expand Down Expand Up @@ -629,7 +639,6 @@ k4a_result_t K4AROSDevice::getRgbPointCloudInRgbFrame(const k4a::capture& captur

point_cloud->header.frame_id = calibration_data_.tf_prefix_ + calibration_data_.rgb_camera_frame_;
point_cloud->header.stamp = timestampToROS(k4a_depth_frame.get_device_timestamp());
printTimestampDebugMessage("RGB point cloud", point_cloud->header.stamp);

return fillColorPointCloud(calibration_data_.point_cloud_image_, k4a_bgra_frame, point_cloud);
}
Expand All @@ -646,7 +655,6 @@ k4a_result_t K4AROSDevice::getPointCloud(const k4a::capture& capture, sensor_msg

point_cloud->header.frame_id = calibration_data_.tf_prefix_ + calibration_data_.depth_camera_frame_;
point_cloud->header.stamp = timestampToROS(k4a_depth_frame.get_device_timestamp());
printTimestampDebugMessage("Point cloud", point_cloud->header.stamp);

// Transform depth image to point cloud
calibration_data_.k4a_transformation_.depth_image_to_point_cloud(k4a_depth_frame, K4A_CALIBRATION_TYPE_DEPTH,
Expand Down Expand Up @@ -758,7 +766,6 @@ k4a_result_t K4AROSDevice::getImuFrame(const k4a_imu_sample_t& sample, sensor_ms
{
imu_msg->header.frame_id = calibration_data_.tf_prefix_ + calibration_data_.imu_frame_;
imu_msg->header.stamp = timestampToROS(sample.acc_timestamp_usec);
printTimestampDebugMessage("IMU", imu_msg->header.stamp);

// The correct convention in ROS is to publish the raw sensor data, in the
// sensor coordinate frame. Do that here.
Expand Down Expand Up @@ -989,7 +996,6 @@ void K4AROSDevice::framePublisherThread()
else if (result == K4A_RESULT_SUCCEEDED)
{
capture_time = timestampToROS(capture.get_ir_image().get_device_timestamp());
printTimestampDebugMessage("IR image", capture_time);

// Re-synchronize the timestamps with the capture timestamp
ir_raw_camera_info.header.stamp = capture_time;
Expand Down Expand Up @@ -1021,7 +1027,6 @@ void K4AROSDevice::framePublisherThread()
else if (result == K4A_RESULT_SUCCEEDED)
{
capture_time = timestampToROS(capture.get_depth_image().get_device_timestamp());
printTimestampDebugMessage("Depth image", capture_time);

// Re-synchronize the timestamps with the capture timestamp
depth_raw_camera_info.header.stamp = capture_time;
Expand Down Expand Up @@ -1053,7 +1058,6 @@ void K4AROSDevice::framePublisherThread()
else if (result == K4A_RESULT_SUCCEEDED)
{
capture_time = timestampToROS(capture.get_depth_image().get_device_timestamp());
printTimestampDebugMessage("Depth image", capture_time);

depth_rect_frame->header.stamp = capture_time;
depth_rect_frame->header.frame_id = calibration_data_.tf_prefix_ + calibration_data_.rgb_camera_frame_;
Expand All @@ -1067,11 +1071,9 @@ void K4AROSDevice::framePublisherThread()

#if defined(K4A_BODY_TRACKING)
// Publish body markers when body tracking is enabled and a depth image is available
if (params_.body_tracking_enabled &&
if (params_.body_tracking_enabled && k4abt_tracker_queue_size_ < 3 &&
(body_marker_publisher_.getNumSubscribers() > 0 || body_index_map_publisher_.getNumSubscribers() > 0))
{
capture_time = timestampToROS(capture.get_depth_image().get_device_timestamp());

if (!k4abt_tracker_.enqueue_capture(capture))
{
ROS_ERROR("Error! Add capture to tracker process queue failed!");
Expand All @@ -1080,56 +1082,7 @@ void K4AROSDevice::framePublisherThread()
}
else
{
k4abt::frame body_frame = k4abt_tracker_.pop_result();
if (body_frame == nullptr)
{
ROS_ERROR_STREAM("Pop body frame result failed!");
ros::shutdown();
return;
}
else
{
if (body_marker_publisher_.getNumSubscribers() > 0)
{
// Joint marker array
MarkerArrayPtr markerArrayPtr(new MarkerArray);
auto num_bodies = body_frame.get_num_bodies();
for (size_t i = 0; i < num_bodies; ++i)
{
k4abt_body_t body = body_frame.get_body(i);
for (int j = 0; j < (int) K4ABT_JOINT_COUNT; ++j)
{
MarkerPtr markerPtr(new Marker);
getBodyMarker(body, markerPtr, j, capture_time);
markerArrayPtr->markers.push_back(*markerPtr);
}
}
body_marker_publisher_.publish(markerArrayPtr);
}

if (body_index_map_publisher_.getNumSubscribers() > 0)
{
// Body index map
ImagePtr body_index_map_frame(new Image);
result = getBodyIndexMap(body_frame, body_index_map_frame);

if (result != K4A_RESULT_SUCCEEDED)
{
ROS_ERROR_STREAM("Failed to get body index map");
ros::shutdown();
return;
}
else if (result == K4A_RESULT_SUCCEEDED)
{
// Re-synchronize the timestamps with the capture timestamp
body_index_map_frame->header.stamp = capture_time;
body_index_map_frame->header.frame_id =
calibration_data_.tf_prefix_ + calibration_data_.depth_camera_frame_;

body_index_map_publisher_.publish(body_index_map_frame);
}
}
}
++k4abt_tracker_queue_size_;
}
}
#endif
Expand All @@ -1155,7 +1108,6 @@ void K4AROSDevice::framePublisherThread()
}

capture_time = timestampToROS(capture.get_color_image().get_device_timestamp());
printTimestampDebugMessage("Color image", capture_time);

rgb_jpeg_frame->header.stamp = capture_time;
rgb_jpeg_frame->header.frame_id = calibration_data_.tf_prefix_ + calibration_data_.rgb_camera_frame_;
Expand All @@ -1181,7 +1133,6 @@ void K4AROSDevice::framePublisherThread()
}

capture_time = timestampToROS(capture.get_color_image().get_device_timestamp());
printTimestampDebugMessage("Color image", capture_time);

rgb_raw_frame->header.stamp = capture_time;
rgb_raw_frame->header.frame_id = calibration_data_.tf_prefix_ + calibration_data_.rgb_camera_frame_;
Expand Down Expand Up @@ -1209,7 +1160,6 @@ void K4AROSDevice::framePublisherThread()
}

capture_time = timestampToROS(capture.get_color_image().get_device_timestamp());
printTimestampDebugMessage("Color image", capture_time);

rgb_rect_frame->header.stamp = capture_time;
rgb_rect_frame->header.frame_id = calibration_data_.tf_prefix_ + calibration_data_.depth_camera_frame_;
Expand Down Expand Up @@ -1276,6 +1226,76 @@ void K4AROSDevice::framePublisherThread()
}
}

#if defined(K4A_BODY_TRACKING)
// Worker loop that drains the body-tracker result queue and publishes the
// results. Running this on its own thread decouples the (slow) tracker
// pop_result() from the camera frame publisher thread, which only enqueues
// captures and increments k4abt_tracker_queue_size_.
//
// Runs until running_ is cleared or ROS shuts down. On an unrecoverable
// tracker or body-index-map error it requests ros::shutdown() and exits.
void K4AROSDevice::bodyPublisherThread()
{
  while (running_ && ros::ok() && !ros::isShuttingDown())
  {
    if (k4abt_tracker_queue_size_ <= 0)
    {
      // Nothing enqueued yet; sleep briefly instead of busy-spinning.
      std::this_thread::sleep_for(std::chrono::milliseconds{ 20 });
      continue;
    }

    // A successful enqueue preceded this (queue size > 0), so pop_result()
    // will return a frame, or nullptr on tracker failure.
    k4abt::frame body_frame = k4abt_tracker_.pop_result();
    --k4abt_tracker_queue_size_;

    if (body_frame == nullptr)
    {
      ROS_ERROR_STREAM("Pop body frame result failed!");
      ros::shutdown();
      return;
    }

    // Stamp every published message with the device timestamp of the
    // frame that was actually tracked.
    const auto capture_time = timestampToROS(body_frame.get_device_timestamp());

    if (body_marker_publisher_.getNumSubscribers() > 0)
    {
      // Joint marker array: one marker per joint per detected body.
      MarkerArrayPtr marker_array(new MarkerArray);
      const auto num_bodies = body_frame.get_num_bodies();
      for (size_t body_idx = 0; body_idx < num_bodies; ++body_idx)
      {
        k4abt_body_t body = body_frame.get_body(body_idx);
        for (int joint_idx = 0; joint_idx < (int) K4ABT_JOINT_COUNT; ++joint_idx)
        {
          MarkerPtr marker(new Marker);
          getBodyMarker(body, marker, joint_idx, capture_time);
          marker_array->markers.push_back(*marker);
        }
      }
      body_marker_publisher_.publish(marker_array);
    }

    if (body_index_map_publisher_.getNumSubscribers() > 0)
    {
      // Body index map image, expressed in the depth camera frame.
      ImagePtr body_index_map_frame(new Image);
      if (getBodyIndexMap(body_frame, body_index_map_frame) != K4A_RESULT_SUCCEEDED)
      {
        ROS_ERROR_STREAM("Failed to get body index map");
        ros::shutdown();
        return;
      }

      // Re-synchronize the timestamps with the capture timestamp
      body_index_map_frame->header.stamp = capture_time;
      body_index_map_frame->header.frame_id =
          calibration_data_.tf_prefix_ + calibration_data_.depth_camera_frame_;

      body_index_map_publisher_.publish(body_index_map_frame);
    }
  }
}
#endif

k4a_imu_sample_t K4AROSDevice::computeMeanIMUSample(const std::vector<k4a_imu_sample_t>& samples)
{
// Compute mean sample
Expand Down Expand Up @@ -1503,33 +1523,3 @@ void K4AROSDevice::updateTimestampOffset(const std::chrono::microseconds& k4a_de
std::floor(alpha * (device_to_realtime - device_to_realtime_offset_).count())));
}
}

// Logs (at DEBUG level) how far the given sensor timestamp lags behind
// ros::Time::now(), together with the minimum and maximum lag observed so
// far for that stream.
//
// @param name      Human-readable stream name used as the tracking key
//                  (e.g. "IMU", "Color image").
// @param timestamp The ROS-time stamp extracted from the sensor sample.
//
// NOTE(review): the per-name min/max state lives in a function-local static
// map with no synchronization — calling this from more than one thread
// concurrently is a data race. Confirm all callers run on a single thread.
void printTimestampDebugMessage(const std::string& name, const ros::Time& timestamp)
{
  const ros::Duration lag = ros::Time::now() - timestamp;

  // Tracks the (min, max) lag seen per stream name across calls.
  static std::map<std::string, std::pair<ros::Duration, ros::Duration>> map_min_max;

  // Single-lookup insert: emplace returns the existing entry when the key
  // is already present (avoids the find / insert / find round trip).
  const auto emplace_result = map_min_max.emplace(name, std::make_pair(lag, lag));
  const auto it = emplace_result.first;
  if (!emplace_result.second)
  {
    auto& min_lag = it->second.first;
    auto& max_lag = it->second.second;
    if (lag < min_lag)
    {
      min_lag = lag;
    }
    if (lag > max_lag)
    {
      max_lag = lag;
    }
  }

  ROS_DEBUG_STREAM(name << " timestamp lags ros::Time::now() by\n"
                        << std::setw(23) << lag.toSec() * 1000.0 << " ms. "
                        << "The lag ranges from " << it->second.first.toSec() * 1000.0 << "ms"
                        << " to " << it->second.second.toSec() * 1000.0 << "ms.");
}