Skip to content

Allow bidirectional traffic #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions nimbro_topic_transport/launch/bidirectional_topics.launch
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<launch>
<arg name="allow_bidirectional" default="true" />

<!--
If you set up transports for a topic in both directions, don't forget to
provide the `ignored_publishers` parameter to both senders. Otherwise,
nimbro would enter an infinite republishing loop. You can test it by
playing with the `allow_bidirectional` argument.

The `ignored_publishers` sender parameter is a list of node-names of
all receivers receiving any of the topics this publisher publishes.
-->
<include file="$(find nimbro_topic_transport)/launch/tcp_receiver.launch" />
<include file="$(find nimbro_topic_transport)/launch/tcp_sender.launch">
<arg name="allow_bidirectional" value="$(arg allow_bidirectional)" />
</include>

<include file="$(find nimbro_topic_transport)/launch/tcp_receiver_back.launch" />
<include file="$(find nimbro_topic_transport)/launch/tcp_sender_back.launch">
<arg name="allow_bidirectional" value="$(arg allow_bidirectional)" />
</include>
</launch>
11 changes: 11 additions & 0 deletions nimbro_topic_transport/launch/tcp_receiver_back.launch
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<launch>
<node name="tcp_receiver_back" pkg="nimbro_topic_transport" type="tcp_receiver" output="screen">
<param name="port" value="17002" />

<!-- Remap topics so that sender & receiver do not clash if run on the
same machine. This is not necessary in a typical setup :-)
-->
<remap from="/recv/my_first_topic" to="/my_first_topic" />
<remap from="/recv/my_second_topic" to="/my_second_topic" />
</node>
</launch>
6 changes: 6 additions & 0 deletions nimbro_topic_transport/launch/tcp_sender.launch
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<launch>
<arg name="target" default="localhost" />
<arg name="allow_bidirectional" default="true" />

<!-- The UDP sender node -->
<node name="tcp_sender" pkg="nimbro_topic_transport" type="tcp_sender" output="screen">
Expand All @@ -10,5 +11,10 @@

<!-- Load the list of topics from a YAML file -->
<rosparam command="load" file="$(find nimbro_topic_transport)/launch/topics.yaml" />

<!-- If bidirectional traffic over a topic is expected, fill this parameter.
See details in bidirectional_topics.launch -->
<rosparam param="ignored_publishers" if="$(arg allow_bidirectional)">["/tcp_receiver_back"]</rosparam>
<rosparam param="ignored_publishers" unless="$(arg allow_bidirectional)">[]</rosparam>
</node>
</launch>
20 changes: 20 additions & 0 deletions nimbro_topic_transport/launch/tcp_sender_back.launch
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<launch>
<arg name="target" default="localhost" />
<arg name="allow_bidirectional" default="true" />

<!-- The UDP sender node -->
<node name="tcp_sender_back" pkg="nimbro_topic_transport" type="tcp_sender" output="screen">

<!-- The destination host name or IP address -->
<param name="destination_addr" value="$(arg target)" />
<param name="destination_port" value="17002" />

<!-- Load the list of topics from a YAML file -->
<rosparam command="load" file="$(find nimbro_topic_transport)/launch/topics_back.yaml" />

<!-- If bidirectional traffic over a topic is expected, fill this parameter.
See details in bidirectional_topics.launch -->
<rosparam param="ignored_publishers" if="$(arg allow_bidirectional)">["/tcp_receiver"]</rosparam>
<rosparam param="ignored_publishers" unless="$(arg allow_bidirectional)">[]</rosparam>
</node>
</launch>
1 change: 1 addition & 0 deletions nimbro_topic_transport/launch/topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ topics:
compress: true # enable bz2 compression
rate: 1.0 # rate limit at 1Hz
- name: "/my_second_topic"
latch: True
6 changes: 6 additions & 0 deletions nimbro_topic_transport/launch/topics_back.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
topics:
- name: "/recv/my_first_topic"
compress: true # enable bz2 compression
rate: 1.0 # rate limit at 1Hz
- name: "/recv/my_second_topic"
latch: True
3 changes: 2 additions & 1 deletion nimbro_topic_transport/src/tcp/tcp_packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace nimbro_topic_transport

enum TCPFlag
{
TCP_FLAG_COMPRESSED = (1 << 0)
TCP_FLAG_COMPRESSED = (1 << 0),
TCP_FLAG_LATCHED = (1 << 1)
};

struct TCPHeader
Expand Down
4 changes: 4 additions & 0 deletions nimbro_topic_transport/src/tcp/tcp_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,10 @@ void TCPReceiver::ClientHandler::run()
topic_info::getMsgDef(type)
);

if (header.flags() & TCP_FLAG_LATCHED) {
options.latch = true;
}

// It will take subscribers some time to connect to our publisher.
// Therefore, latch messages so they will not be lost.
// No, this is often unexpected. Instead, wait before publishing.
Expand Down
63 changes: 57 additions & 6 deletions nimbro_topic_transport/src/tcp/tcp_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <netinet/tcp.h>
#include <boost/algorithm/string/replace.hpp>

#include "ros/message_traits.h"

#include <netdb.h>

namespace nimbro_topic_transport
Expand Down Expand Up @@ -77,11 +79,14 @@ TCPSender::TCPSender()
if(entry.hasMember("compress") && ((bool)entry["compress"]) == true)
flags |= TCP_FLAG_COMPRESSED;

boost::function<void(const topic_tools::ShapeShifter::ConstPtr&)> func;
func = boost::bind(&TCPSender::send, this, topic, flags, _1);
if(entry.hasMember("latch") && ((bool)entry["latch"]) == true)
flags |= TCP_FLAG_LATCHED;

boost::function<void(const ros::MessageEvent<topic_tools::ShapeShifter const>&)> func;
func = boost::bind(&TCPSender::messageCallback, this, topic, flags, _1);

ros::SubscribeOptions options;
options.initByFullCallbackType<topic_tools::ShapeShifter::ConstPtr>(topic, 20, func);
options.initByFullCallbackType<const ros::MessageEvent<topic_tools::ShapeShifter const>&>(topic, 20, func);

if(entry.hasMember("type"))
{
Expand Down Expand Up @@ -114,6 +119,9 @@ TCPSender::TCPSender()
#endif
}

if (m_nh.hasParam("ignored_publishers"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please indent using tabs.

m_nh.getParam("ignored_publishers", m_ignoredPubs);

char hostnameBuf[256];
gethostname(hostnameBuf, sizeof(hostnameBuf));
hostnameBuf[sizeof(hostnameBuf)-1] = 0;
Expand All @@ -138,7 +146,7 @@ TCPSender::TCPSender()
boost::bind(&TCPSender::updateStats, this)
);
m_statsTimer.start();
}
}

TCPSender::~TCPSender()
{
Expand Down Expand Up @@ -218,6 +226,8 @@ bool TCPSender::connect()
ROS_WARN("Not setting TCP_USER_TIMEOUT");
#endif

this->sendLatched();

return true;
}

Expand All @@ -234,7 +244,31 @@ class PtrStream
uint8_t* m_ptr;
};

void TCPSender::send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter)
void TCPSender::messageCallback(const std::string& topic, int flags,
const ros::MessageEvent<topic_tools::ShapeShifter const>& event)
{
#if WITH_CONFIG_SERVER
if (! (*m_enableTopic[topic])() )
return;
#endif

if (m_ignoredPubs.size() > 0) // check if the message wasn't published by an ignored publisher
{
std::string messagePublisher = event.getConnectionHeader()["callerid"];
for (std::vector<std::string>::iterator ignoredPublisher = m_ignoredPubs.begin();
ignoredPublisher != m_ignoredPubs.end(); ignoredPublisher++) {
if (messagePublisher == *ignoredPublisher) {
return;
}
}
}

this->send(topic, flags, event.getMessage());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please get rid of this->.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did it in the new PR. Just curious, why is it better?

}


void TCPSender::send(const std::string& topic, int flags, topic_tools::ShapeShifter::ConstPtr shifter,
const bool reconnect)
{
#if WITH_CONFIG_SERVER
if (! (*m_enableTopic[topic])() )
Expand All @@ -250,6 +284,9 @@ void TCPSender::send(const std::string& topic, int flags, const topic_tools::Sha
if(flags & TCP_FLAG_COMPRESSED)
maxDataSize = size + size / 100 + 1200; // taken from bzip2 docs

if (flags & TCP_FLAG_LATCHED)
this->m_latchedMessages[topic] = std::make_pair(shifter, flags);

m_packet.resize(
sizeof(TCPHeader) + topic.length() + type.length() + maxDataSize
);
Expand Down Expand Up @@ -312,10 +349,12 @@ void TCPSender::send(const std::string& topic, int flags, const topic_tools::Sha
{
if(m_fd == -1)
{
if(!connect())
if(reconnect && !connect())
{
ROS_WARN("Connection failed, trying again");
continue;
} else if (!reconnect) {
break;
}
}

Expand Down Expand Up @@ -363,6 +402,18 @@ void TCPSender::updateStats()
m_sentBytesInStatsInterval = 0;
}

void TCPSender::sendLatched() {

std::map<std::string, std::pair<topic_tools::ShapeShifter::ConstPtr, int> >::iterator it =
this->m_latchedMessages.begin();

// send all latched messages
while (it != this->m_latchedMessages.end()) {
this->send(it->first, it->second.second, it->second.first, false);
it++;
}
}

}


Expand Down
13 changes: 12 additions & 1 deletion nimbro_topic_transport/src/tcp/tcp_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
#include <config_server/parameter.h>
#endif

#include <map>

#include "ros/message_event.h"

#include <nimbro_topic_transport/SenderStats.h>

namespace nimbro_topic_transport
Expand All @@ -27,7 +31,11 @@ class TCPSender

bool connect();

void send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter);
void send(const std::string& topic, int flags, topic_tools::ShapeShifter::ConstPtr shifter,
const bool reconnect = true);
void messageCallback(const std::string& topic, int flags,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent using tabs.

const ros::MessageEvent<topic_tools::ShapeShifter const>& shifter);
void sendLatched();
private:
void updateStats();

Expand All @@ -42,6 +50,9 @@ class TCPSender
std::vector<ros::Subscriber> m_subs;
std::vector<uint8_t> m_packet;
std::vector<uint8_t> m_compressionBuf;
std::vector<std::string> m_ignoredPubs;

std::map<std::string, std::pair<topic_tools::ShapeShifter::ConstPtr, int> > m_latchedMessages;

#if WITH_CONFIG_SERVER
std::map<std::string, boost::shared_ptr<config_server::Parameter<bool>>> m_enableTopic;
Expand Down
4 changes: 4 additions & 0 deletions nimbro_topic_transport/src/udp/udp_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ UDPSender::UDPSender()
if(list[i].hasMember("type"))
type = (std::string)(list[i]["type"]);

if(list[i].hasMember("latch"))
ROS_WARN_STREAM("Ignoring 'latch' flag at UDP topic " << ((std::string)list[i]["name"]).c_str() <<
" (UDP topics can't be latched).");

TopicSender* sender = new TopicSender(this, &nh, list[i]["name"], rate, resend, flags, enabled, type);

if(m_relayMode)
Expand Down