diff --git a/nimbro_topic_transport/launch/topics.yaml b/nimbro_topic_transport/launch/topics.yaml index 1cdefce..090176e 100644 --- a/nimbro_topic_transport/launch/topics.yaml +++ b/nimbro_topic_transport/launch/topics.yaml @@ -3,3 +3,4 @@ topics: compress: true # enable bz2 compression rate: 1.0 # rate limit at 1Hz - name: "/my_second_topic" + latch: True diff --git a/nimbro_topic_transport/src/tcp/tcp_packet.h b/nimbro_topic_transport/src/tcp/tcp_packet.h index b77be62..851fae5 100644 --- a/nimbro_topic_transport/src/tcp/tcp_packet.h +++ b/nimbro_topic_transport/src/tcp/tcp_packet.h @@ -11,7 +11,8 @@ namespace nimbro_topic_transport enum TCPFlag { - TCP_FLAG_COMPRESSED = (1 << 0) + TCP_FLAG_COMPRESSED = (1 << 0), + TCP_FLAG_LATCHED = (1 << 1) }; struct TCPHeader diff --git a/nimbro_topic_transport/src/tcp/tcp_receiver.cpp b/nimbro_topic_transport/src/tcp/tcp_receiver.cpp index 135eabb..e9605ec 100644 --- a/nimbro_topic_transport/src/tcp/tcp_receiver.cpp +++ b/nimbro_topic_transport/src/tcp/tcp_receiver.cpp @@ -345,6 +345,10 @@ void TCPReceiver::ClientHandler::run() topic_info::getMsgDef(type) ); + if (header.flags() & TCP_FLAG_LATCHED) { + options.latch = true; + } + // It will take subscribers some time to connect to our publisher. // Therefore, latch messages so they will not be lost. // No, this is often unexpected. Instead, wait before publishing. diff --git a/nimbro_topic_transport/src/tcp/tcp_sender.cpp b/nimbro_topic_transport/src/tcp/tcp_sender.cpp index 13f8cc0..2097ab5 100644 --- a/nimbro_topic_transport/src/tcp/tcp_sender.cpp +++ b/nimbro_topic_transport/src/tcp/tcp_sender.cpp @@ -77,8 +77,11 @@ TCPSender::TCPSender() if(entry.hasMember("compress") && ((bool)entry["compress"]) == true) flags |= TCP_FLAG_COMPRESSED; + if(entry.hasMember("latch") && ((bool)entry["latch"]) == true) + flags |= TCP_FLAG_LATCHED; + boost::function func; - func = boost::bind(&TCPSender::send, this, topic, flags, _1); + func = boost::bind(&TCPSender::send, this, topic, flags, _1, true); ros::SubscribeOptions options; options.initByFullCallbackType(topic, 20, func); @@ -138,7 +141,7 @@ TCPSender::TCPSender() boost::bind(&TCPSender::updateStats, this) ); m_statsTimer.start(); -} +} TCPSender::~TCPSender() { @@ -218,6 +221,8 @@ bool TCPSender::connect() ROS_WARN("Not setting TCP_USER_TIMEOUT"); #endif + this->sendLatched(); + return true; } @@ -234,7 +239,8 @@ class PtrStream uint8_t* m_ptr; }; -void TCPSender::send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter) +void TCPSender::send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter, + const bool reconnect) { #if WITH_CONFIG_SERVER if (! (*m_enableTopic[topic])() ) @@ -250,6 +256,9 @@ void TCPSender::send(const std::string& topic, int flags, const topic_tools::Sha if(flags & TCP_FLAG_COMPRESSED) maxDataSize = size + size / 100 + 1200; // taken from bzip2 docs + if (flags & TCP_FLAG_LATCHED) + this->m_latchedMessages[topic] = std::make_pair(shifter, flags); + m_packet.resize( sizeof(TCPHeader) + topic.length() + type.length() + maxDataSize ); @@ -312,10 +321,12 @@ void TCPSender::send(const std::string& topic, int flags, const topic_tools::Sha { if(m_fd == -1) { - if(!connect()) + if(reconnect && !connect()) { ROS_WARN("Connection failed, trying again"); continue; + } else if (!reconnect) { + break; } } @@ -363,6 +374,18 @@ void TCPSender::updateStats() m_sentBytesInStatsInterval = 0; } +void TCPSender::sendLatched() { + + std::map>::iterator it = + this->m_latchedMessages.begin(); + + // send all latched messages + while (it != this->m_latchedMessages.end()) { + this->send(it->first, it->second.second, it->second.first, false); + it++; + } +} + } diff --git a/nimbro_topic_transport/src/tcp/tcp_sender.h b/nimbro_topic_transport/src/tcp/tcp_sender.h index ebedf5b..09a6e1b 100644 --- a/nimbro_topic_transport/src/tcp/tcp_sender.h +++ b/nimbro_topic_transport/src/tcp/tcp_sender.h @@ -14,6 +14,8 @@ #include #endif +#include + #include namespace nimbro_topic_transport @@ -27,7 +29,9 @@ class TCPSender bool connect(); - void send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter); + void send(const std::string& topic, int flags, const topic_tools::ShapeShifter::ConstPtr& shifter, + const bool reconnect = true); + void sendLatched(); private: void updateStats(); @@ -43,6 +47,8 @@ class TCPSender std::vector m_packet; std::vector m_compressionBuf; + std::map > m_latchedMessages; + #if WITH_CONFIG_SERVER std::map>> m_enableTopic; #endif diff --git a/nimbro_topic_transport/src/udp/udp_sender.cpp b/nimbro_topic_transport/src/udp/udp_sender.cpp index e32fcf0..ae9db86 100644 --- a/nimbro_topic_transport/src/udp/udp_sender.cpp +++ b/nimbro_topic_transport/src/udp/udp_sender.cpp @@ -140,6 +140,10 @@ UDPSender::UDPSender() if(list[i].hasMember("type")) type = (std::string)(list[i]["type"]); + if(list[i].hasMember("latch")) + ROS_WARN_STREAM("Ignoring 'latch' flag at UDP topic " << ((std::string)list[i]["name"]).c_str() << + " (UDP topics can't be latched)."); + TopicSender* sender = new TopicSender(this, &nh, list[i]["name"], rate, resend, flags, enabled, type); if(m_relayMode)