From bd6e85685a219790966452426b11c2161be6bb37 Mon Sep 17 00:00:00 2001 From: Martin Pecka Date: Sat, 29 Oct 2016 13:28:13 +0200 Subject: [PATCH 1/3] Implemented the "server" part of latched topics support. --- nimbro_topic_transport/CMakeLists.txt | 4 +++ nimbro_topic_transport/src/tcp/tcp_packet.h | 3 +- .../src/tcp/tcp_receiver.cpp | 4 +++ nimbro_topic_transport/src/tcp/tcp_sender.cpp | 28 ++++++++++++++++++- nimbro_topic_transport/src/tcp/tcp_sender.h | 8 ++++++ .../srv/LatchedMessageRequest.srv | 3 ++ 6 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 nimbro_topic_transport/srv/LatchedMessageRequest.srv diff --git a/nimbro_topic_transport/CMakeLists.txt b/nimbro_topic_transport/CMakeLists.txt index a856dec..b01ade0 100644 --- a/nimbro_topic_transport/CMakeLists.txt +++ b/nimbro_topic_transport/CMakeLists.txt @@ -16,6 +16,10 @@ add_message_files(FILES TopicBandwidth.msg ) +add_service_files(FILES + LatchedMessageRequest.srv +) + generate_messages(DEPENDENCIES std_msgs ) diff --git a/nimbro_topic_transport/src/tcp/tcp_packet.h b/nimbro_topic_transport/src/tcp/tcp_packet.h index b77be62..851fae5 100644 --- a/nimbro_topic_transport/src/tcp/tcp_packet.h +++ b/nimbro_topic_transport/src/tcp/tcp_packet.h @@ -11,7 +11,8 @@ namespace nimbro_topic_transport enum TCPFlag { - TCP_FLAG_COMPRESSED = (1 << 0) + TCP_FLAG_COMPRESSED = (1 << 0), + TCP_FLAG_LATCHED = (1 << 1) }; struct TCPHeader diff --git a/nimbro_topic_transport/src/tcp/tcp_receiver.cpp b/nimbro_topic_transport/src/tcp/tcp_receiver.cpp index 135eabb..e9605ec 100644 --- a/nimbro_topic_transport/src/tcp/tcp_receiver.cpp +++ b/nimbro_topic_transport/src/tcp/tcp_receiver.cpp @@ -345,6 +345,10 @@ void TCPReceiver::ClientHandler::run() topic_info::getMsgDef(type) ); + if (header.flags() & TCP_FLAG_LATCHED) { + options.latch = true; + } + // It will take subscribers some time to connect to our publisher. // Therefore, latch messages so they will not be lost. // No, this is often unexpected. Instead, wait before publishing. diff --git a/nimbro_topic_transport/src/tcp/tcp_sender.cpp b/nimbro_topic_transport/src/tcp/tcp_sender.cpp index 13f8cc0..99fe6cf 100644 --- a/nimbro_topic_transport/src/tcp/tcp_sender.cpp +++ b/nimbro_topic_transport/src/tcp/tcp_sender.cpp @@ -77,6 +77,9 @@ TCPSender::TCPSender() if(entry.hasMember("compress") && ((bool)entry["compress"]) == true) flags |= TCP_FLAG_COMPRESSED; + if(entry.hasMember("latch") && ((bool)entry["latch"]) == true) + flags |= TCP_FLAG_LATCHED; + boost::function func; func = boost::bind(&TCPSender::send, this, topic, flags, _1); @@ -138,7 +141,10 @@ TCPSender::TCPSender() boost::bind(&TCPSender::updateStats, this) ); m_statsTimer.start(); -} + + m_latchedMessageRequestServer = m_nh.advertiseService( + "request_latched_message", &TCPSender::sendLatched, this); +} TCPSender::~TCPSender() { @@ -250,6 +256,9 @@ void TCPSender::send(const std::string& topic, int flags, const topic_tools::Sha if(flags & TCP_FLAG_COMPRESSED) maxDataSize = size + size / 100 + 1200; // taken from bzip2 docs + if (flags & TCP_FLAG_LATCHED) + this->m_latchedMessages[topic] = std::make_pair(shifter, flags); + m_packet.resize( sizeof(TCPHeader) + topic.length() + type.length() + maxDataSize ); @@ -363,6 +372,23 @@ void TCPSender::updateStats() m_sentBytesInStatsInterval = 0; } +bool TCPSender::sendLatched(nimbro_topic_transport::LatchedMessageRequest::Request& request, + nimbro_topic_transport::LatchedMessageRequest::Response& response) { + + if (this->m_latchedMessages.find(request.topic_name) == this->m_latchedMessages.end()) { + response.message_sent = false; + return true; + } + + std::pair record = + this->m_latchedMessages.find(request.topic_name)->second; + + this->send(request.topic_name, record.second, record.first); + response.message_sent = true; + + return true; +} + } diff --git a/nimbro_topic_transport/src/tcp/tcp_sender.h b/nimbro_topic_transport/src/tcp/tcp_sender.h index ebedf5b..039ee70 100644 --- a/nimbro_topic_transport/src/tcp/tcp_sender.h +++ b/nimbro_topic_transport/src/tcp/tcp_sender.h @@ -14,7 +14,10 @@ #include #endif +#include + #include +#include namespace nimbro_topic_transport { @@ -28,6 +31,8 @@ class TCPSender bool connect(); void send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter); + bool sendLatched(nimbro_topic_transport::LatchedMessageRequest::Request& request, + nimbro_topic_transport::LatchedMessageRequest::Response& response); private: void updateStats(); @@ -43,6 +48,8 @@ class TCPSender std::vector m_packet; std::vector m_compressionBuf; + std::map > m_latchedMessages; + #if WITH_CONFIG_SERVER std::map>> m_enableTopic; #endif @@ -53,6 +60,7 @@ class TCPSender ros::WallTimer m_statsTimer; uint64_t m_sentBytesInStatsInterval; std::map m_topicSendBytesInStatsInteral; + ros::ServiceServer m_latchedMessageRequestServer; }; } diff --git a/nimbro_topic_transport/srv/LatchedMessageRequest.srv b/nimbro_topic_transport/srv/LatchedMessageRequest.srv new file mode 100644 index 0000000..7485df3 --- /dev/null +++ b/nimbro_topic_transport/srv/LatchedMessageRequest.srv @@ -0,0 +1,3 @@ +string topic_name +--- +bool message_sent \ No newline at end of file From 9311af6ffb6ffd16500101617dd687a1608b2d7b Mon Sep 17 00:00:00 2001 From: Martin Pecka Date: Sat, 29 Oct 2016 18:59:31 +0200 Subject: [PATCH 2/3] Finished the latched topic implementation. --- nimbro_topic_transport/CMakeLists.txt | 4 --- nimbro_topic_transport/launch/topics.yaml | 1 + nimbro_topic_transport/src/tcp/tcp_sender.cpp | 35 +++++++++---------- nimbro_topic_transport/src/tcp/tcp_sender.h | 8 ++--- nimbro_topic_transport/src/udp/udp_sender.cpp | 4 +++ .../srv/LatchedMessageRequest.srv | 3 -- 6 files changed, 24 insertions(+), 31 deletions(-) delete mode 100644 nimbro_topic_transport/srv/LatchedMessageRequest.srv diff --git a/nimbro_topic_transport/CMakeLists.txt b/nimbro_topic_transport/CMakeLists.txt index b01ade0..a856dec 100644 --- a/nimbro_topic_transport/CMakeLists.txt +++ b/nimbro_topic_transport/CMakeLists.txt @@ -16,10 +16,6 @@ add_message_files(FILES TopicBandwidth.msg ) -add_service_files(FILES - LatchedMessageRequest.srv -) - generate_messages(DEPENDENCIES std_msgs ) diff --git a/nimbro_topic_transport/launch/topics.yaml b/nimbro_topic_transport/launch/topics.yaml index 1cdefce..090176e 100644 --- a/nimbro_topic_transport/launch/topics.yaml +++ b/nimbro_topic_transport/launch/topics.yaml @@ -3,3 +3,4 @@ topics: compress: true # enable bz2 compression rate: 1.0 # rate limit at 1Hz - name: "/my_second_topic" + latch: True diff --git a/nimbro_topic_transport/src/tcp/tcp_sender.cpp b/nimbro_topic_transport/src/tcp/tcp_sender.cpp index 99fe6cf..2097ab5 100644 --- a/nimbro_topic_transport/src/tcp/tcp_sender.cpp +++ b/nimbro_topic_transport/src/tcp/tcp_sender.cpp @@ -81,7 +81,7 @@ TCPSender::TCPSender() flags |= TCP_FLAG_LATCHED; boost::function func; - func = boost::bind(&TCPSender::send, this, topic, flags, _1); + func = boost::bind(&TCPSender::send, this, topic, flags, _1, true); ros::SubscribeOptions options; options.initByFullCallbackType(topic, 20, func); @@ -141,9 +141,6 @@ TCPSender::TCPSender() boost::bind(&TCPSender::updateStats, this) ); m_statsTimer.start(); - - m_latchedMessageRequestServer = m_nh.advertiseService( - "request_latched_message", &TCPSender::sendLatched, this); } TCPSender::~TCPSender() @@ -224,6 +221,8 @@ bool TCPSender::connect() ROS_WARN("Not setting TCP_USER_TIMEOUT"); #endif + this->sendLatched(); + return true; } @@ -240,7 +239,8 @@ class PtrStream uint8_t* m_ptr; }; -void TCPSender::send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter) +void TCPSender::send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter, + const bool reconnect) { #if WITH_CONFIG_SERVER if (! (*m_enableTopic[topic])() ) @@ -321,10 +321,12 @@ void TCPSender::send(const std::string& topic, int flags, const topic_tools::Sha { if(m_fd == -1) { - if(!connect()) + if(reconnect && !connect()) { ROS_WARN("Connection failed, trying again"); continue; + } else if (!reconnect) { + break; } } @@ -372,21 +374,16 @@ void TCPSender::updateStats() m_sentBytesInStatsInterval = 0; } -bool TCPSender::sendLatched(nimbro_topic_transport::LatchedMessageRequest::Request& request, - nimbro_topic_transport::LatchedMessageRequest::Response& response) { - - if (this->m_latchedMessages.find(request.topic_name) == this->m_latchedMessages.end()) { - response.message_sent = false; - return true; - } +void TCPSender::sendLatched() { - std::pair record = - this->m_latchedMessages.find(request.topic_name)->second; + std::map>::iterator it = + this->m_latchedMessages.begin(); - this->send(request.topic_name, record.second, record.first); - response.message_sent = true; - - return true; + // send all latched messages + while (it != this->m_latchedMessages.end()) { + this->send(it->first, it->second.second, it->second.first, false); + it++; + } } } diff --git a/nimbro_topic_transport/src/tcp/tcp_sender.h b/nimbro_topic_transport/src/tcp/tcp_sender.h index 039ee70..09a6e1b 100644 --- a/nimbro_topic_transport/src/tcp/tcp_sender.h +++ b/nimbro_topic_transport/src/tcp/tcp_sender.h @@ -17,7 +17,6 @@ #include #include -#include namespace nimbro_topic_transport { @@ -30,9 +29,9 @@ class TCPSender bool connect(); - void send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter); - bool sendLatched(nimbro_topic_transport::LatchedMessageRequest::Request& request, - nimbro_topic_transport::LatchedMessageRequest::Response& response); + void send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter, + const bool reconnect = true); + void sendLatched(); private: void updateStats(); @@ -60,7 +59,6 @@ class TCPSender ros::WallTimer m_statsTimer; uint64_t m_sentBytesInStatsInterval; std::map m_topicSendBytesInStatsInteral; - ros::ServiceServer m_latchedMessageRequestServer; }; } diff --git a/nimbro_topic_transport/src/udp/udp_sender.cpp b/nimbro_topic_transport/src/udp/udp_sender.cpp index e32fcf0..ae9db86 100644 --- a/nimbro_topic_transport/src/udp/udp_sender.cpp +++ b/nimbro_topic_transport/src/udp/udp_sender.cpp @@ -140,6 +140,10 @@ UDPSender::UDPSender() if(list[i].hasMember("type")) type = (std::string)(list[i]["type"]); + if(list[i].hasMember("latch")) + ROS_WARN_STREAM("Ignoring 'latch' flag at UDP topic " << ((std::string)list[i]["name"]).c_str() << + " (UDP topics can't be latched)."); + TopicSender* sender = new TopicSender(this, &nh, list[i]["name"], rate, resend, flags, enabled, type); if(m_relayMode) diff --git a/nimbro_topic_transport/srv/LatchedMessageRequest.srv b/nimbro_topic_transport/srv/LatchedMessageRequest.srv deleted file mode 100644 index 7485df3..0000000 --- a/nimbro_topic_transport/srv/LatchedMessageRequest.srv +++ /dev/null @@ -1,3 +0,0 @@ -string topic_name ---- -bool message_sent \ No newline at end of file From 7a879937ca7931d12793e7c4e64a377395fa7e10 Mon Sep 17 00:00:00 2001 From: Martin Pecka Date: Mon, 31 Oct 2016 03:25:45 +0100 Subject: [PATCH 3/3] Added support for bidirectional topics. --- .../launch/bidirectional_topics.launch | 22 +++++++++++ .../launch/tcp_receiver_back.launch | 11 ++++++ .../launch/tcp_sender.launch | 6 +++ .../launch/tcp_sender_back.launch | 20 ++++++++++ .../launch/topics_back.yaml | 6 +++ nimbro_topic_transport/src/tcp/tcp_sender.cpp | 38 ++++++++++++++++--- nimbro_topic_transport/src/tcp/tcp_sender.h | 9 ++++- 7 files changed, 105 insertions(+), 7 deletions(-) create mode 100644 nimbro_topic_transport/launch/bidirectional_topics.launch create mode 100644 nimbro_topic_transport/launch/tcp_receiver_back.launch create mode 100644 nimbro_topic_transport/launch/tcp_sender_back.launch create mode 100644 nimbro_topic_transport/launch/topics_back.yaml diff --git a/nimbro_topic_transport/launch/bidirectional_topics.launch b/nimbro_topic_transport/launch/bidirectional_topics.launch new file mode 100644 index 0000000..df0be1b --- /dev/null +++ b/nimbro_topic_transport/launch/bidirectional_topics.launch @@ -0,0 +1,22 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/nimbro_topic_transport/launch/tcp_receiver_back.launch b/nimbro_topic_transport/launch/tcp_receiver_back.launch new file mode 100644 index 0000000..6539a05 --- /dev/null +++ b/nimbro_topic_transport/launch/tcp_receiver_back.launch @@ -0,0 +1,11 @@ + + + + + + + + + diff --git a/nimbro_topic_transport/launch/tcp_sender.launch b/nimbro_topic_transport/launch/tcp_sender.launch index a09fb14..46e2aaa 100644 --- a/nimbro_topic_transport/launch/tcp_sender.launch +++ b/nimbro_topic_transport/launch/tcp_sender.launch @@ -1,5 +1,6 @@ + @@ -10,5 +11,10 @@ + + + ["/tcp_receiver_back"] + [] diff --git a/nimbro_topic_transport/launch/tcp_sender_back.launch b/nimbro_topic_transport/launch/tcp_sender_back.launch new file mode 100644 index 0000000..267ac88 --- /dev/null +++ b/nimbro_topic_transport/launch/tcp_sender_back.launch @@ -0,0 +1,20 @@ + + + + + + + + + + + + + + + + ["/tcp_receiver"] + [] + + diff --git a/nimbro_topic_transport/launch/topics_back.yaml b/nimbro_topic_transport/launch/topics_back.yaml new file mode 100644 index 0000000..a7ed0a2 --- /dev/null +++ b/nimbro_topic_transport/launch/topics_back.yaml @@ -0,0 +1,6 @@ +topics: + - name: "/recv/my_first_topic" + compress: true # enable bz2 compression + rate: 1.0 # rate limit at 1Hz + - name: "/recv/my_second_topic" + latch: True diff --git a/nimbro_topic_transport/src/tcp/tcp_sender.cpp b/nimbro_topic_transport/src/tcp/tcp_sender.cpp index 2097ab5..86fc927 100644 --- a/nimbro_topic_transport/src/tcp/tcp_sender.cpp +++ b/nimbro_topic_transport/src/tcp/tcp_sender.cpp @@ -9,6 +9,8 @@ #include #include +#include "ros/message_traits.h" + #include namespace nimbro_topic_transport @@ -80,11 +82,11 @@ TCPSender::TCPSender() if(entry.hasMember("latch") && ((bool)entry["latch"]) == true) flags |= TCP_FLAG_LATCHED; - boost::function func; - func = boost::bind(&TCPSender::send, this, topic, flags, _1, true); + boost::function&)> func; + func = boost::bind(&TCPSender::messageCallback, this, topic, flags, _1); ros::SubscribeOptions options; - options.initByFullCallbackType(topic, 20, func); + options.initByFullCallbackType&>(topic, 20, func); if(entry.hasMember("type")) { @@ -117,6 +119,9 @@ TCPSender::TCPSender() #endif } + if (m_nh.hasParam("ignored_publishers")) + m_nh.getParam("ignored_publishers", m_ignoredPubs); + char hostnameBuf[256]; gethostname(hostnameBuf, sizeof(hostnameBuf)); hostnameBuf[sizeof(hostnameBuf)-1] = 0; @@ -239,7 +244,30 @@ class PtrStream uint8_t* m_ptr; }; -void TCPSender::send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter, +void TCPSender::messageCallback(const std::string& topic, int flags, + const ros::MessageEvent& event) +{ +#if WITH_CONFIG_SERVER + if (! (*m_enableTopic[topic])() ) + return; +#endif + + if (m_ignoredPubs.size() > 0) // check if the message wasn't published by an ignored publisher + { + std::string messagePublisher = event.getConnectionHeader()["callerid"]; + for (std::vector::iterator ignoredPublisher = m_ignoredPubs.begin(); + ignoredPublisher != m_ignoredPubs.end(); ignoredPublisher++) { + if (messagePublisher == *ignoredPublisher) { + return; + } + } + } + + this->send(topic, flags, event.getMessage()); +} + + +void TCPSender::send(const std::string& topic, int flags, topic_tools::ShapeShifter::ConstPtr shifter, const bool reconnect) { #if WITH_CONFIG_SERVER @@ -376,7 +404,7 @@ void TCPSender::updateStats() void TCPSender::sendLatched() { - std::map>::iterator it = + std::map >::iterator it = this->m_latchedMessages.begin(); // send all latched messages diff --git a/nimbro_topic_transport/src/tcp/tcp_sender.h b/nimbro_topic_transport/src/tcp/tcp_sender.h index 09a6e1b..7c6a1fa 100644 --- a/nimbro_topic_transport/src/tcp/tcp_sender.h +++ b/nimbro_topic_transport/src/tcp/tcp_sender.h @@ -16,6 +16,8 @@ #include +#include "ros/message_event.h" + #include namespace nimbro_topic_transport @@ -29,8 +31,10 @@ class TCPSender bool connect(); - void send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter, - const bool reconnect = true); + void send(const std::string& topic, int flags, topic_tools::ShapeShifter::ConstPtr shifter, + const bool reconnect = true); + void messageCallback(const std::string& topic, int flags, + const ros::MessageEvent& shifter); void sendLatched(); private: void updateStats(); @@ -46,6 +50,7 @@ class TCPSender std::vector m_subs; std::vector m_packet; std::vector m_compressionBuf; + std::vector m_ignoredPubs; std::map > m_latchedMessages;