Go to the documentation of this file.
18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19 #define IGNITION_TRANSPORT_DISCOVERY_HH_
31 #include <sys/types.h>
33 #include <sys/socket.h>
37 #include <arpa/inet.h>
41 #include <netinet/in.h>
47 #pragma warning(push, 0)
52 #pragma warning(disable: 4503)
54 #pragma warning(disable: 4996)
58 #include <condition_variable>
66 #include "ignition/transport/Export.hh"
95 template<
typename Pub>
105 const bool _verbose =
false)
109 silenceInterval(kDefSilenceInterval),
110 activityInterval(kDefActivityInterval),
111 heartbeatInterval(kDefHeartbeatInterval),
112 connectionCb(nullptr),
113 disconnectionCb(nullptr),
116 numHeartbeatsUninitialized(0),
121 if (
env(
"IGN_IP", ignIp) && !ignIp.
empty())
122 this->hostInterfaces = {ignIp};
130 WORD wVersionRequested;
134 wVersionRequested = MAKEWORD(2, 2);
136 if (WSAStartup(wVersionRequested, &wsaData) != 0)
142 for (
const auto &netIface : this->hostInterfaces)
144 auto succeed = this->RegisterNetIface(netIface);
149 if (netIface == this->hostAddr && !succeed)
151 this->RegisterNetIface(
"127.0.0.1");
152 std::cerr <<
"Did you set the environment variable IGN_IP with a "
154 <<
" [" << netIface <<
"] seems an invalid local IP "
156 <<
" Using 127.0.0.1 as hostname." <<
std::endl;
157 this->hostAddr =
"127.0.0.1";
166 if (setsockopt(this->sockets.
at(0), SOL_SOCKET, SO_REUSEADDR,
167 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
169 std::cerr <<
"Error setting socket option (SO_REUSEADDR)."
180 if (setsockopt(this->sockets.
at(0), SOL_SOCKET, SO_REUSEPORT,
181 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
183 std::cerr <<
"Error setting socket option (SO_REUSEPORT)."
189 sockaddr_in localAddr;
190 memset(&localAddr, 0,
sizeof(localAddr));
191 localAddr.sin_family = AF_INET;
192 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
193 localAddr.sin_port = htons(
static_cast<u_short
>(this->port));
195 if (bind(this->sockets.
at(0),
196 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
203 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
204 this->mcastAddr.sin_family = AF_INET;
205 this->mcastAddr.sin_addr.s_addr =
206 inet_addr(this->kMulticastGroup.
c_str());
207 this->mcastAddr.sin_port = htons(
static_cast<u_short
>(this->port));
217 this->exitMutex.
lock();
222 if (this->threadReception.
joinable())
223 this->threadReception.
join();
231 for (
const auto &sock : this->sockets)
254 this->enabled =
true;
258 this->timeNextHeartbeat = now;
259 this->timeNextActivity = now;
262 this->threadReception =
std::thread(&Discovery::RecvMessages,
this);
285 this->SendMsg(
AdvType, _publisher);
312 cb = this->connectionCb;
317 pub.SetPUuid(this->pUuid);
332 for (
const auto &node : proc.second)
405 return this->hostAddr;
415 return this->activityInterval;
426 return this->heartbeatInterval;
436 return this->silenceInterval;
445 this->activityInterval = _ms;
454 this->heartbeatInterval = _ms;
463 this->silenceInterval = _ms;
473 this->connectionCb =
_cb;
483 this->disconnectionCb =
_cb;
497 std::cout <<
"\tActivity: " << this->activityInterval
499 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
501 std::cout <<
"\tSilence: " << this->silenceInterval
543 if (!this->initialized)
545 this->initializedCv.
wait(
lk, [
this]{
return this->initialized;});
552 private:
void UpdateActivity()
558 if (now < this->timeNextActivity)
561 for (
auto it = this->
activity.
cbegin(); it != this->activity.cend();)
564 auto elapsed = now - it->second;
567 if (std::chrono::duration_cast<std::chrono::milliseconds>
568 (elapsed).count() > this->silenceInterval)
577 publisher.SetPUuid(it->first);
578 this->disconnectionCb(publisher);
592 private:
void UpdateHeartbeat()
599 if (now < this->timeNextHeartbeat)
603 Publisher pub(
"",
"", this->pUuid,
"", AdvertiseOptions());
614 for (
const auto &topic : nodes)
616 for (
const auto &node : topic.second)
622 if (!this->initialized)
624 ++this->numHeartbeatsUninitialized;
625 if (this->numHeartbeatsUninitialized == 2)
629 this->initialized =
true;
650 private:
int NextTimeout()
const
653 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
654 auto timeUntilNextActivity = this->timeNextActivity - now;
656 int t =
static_cast<int>(
657 std::chrono::duration_cast<std::chrono::milliseconds>
658 (
std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
659 int t2 =
std::min(t, this->kTimeout);
664 private:
void RecvMessages()
666 bool timeToExit =
false;
670 int timeout = this->NextTimeout();
674 this->RecvDiscoveryUpdate();
680 this->UpdateHeartbeat();
681 this->UpdateActivity();
693 private:
void RecvDiscoveryUpdate()
695 char rcvStr[Discovery::kMaxRcvStr];
698 sockaddr_in clntAddr;
699 socklen_t addrLen =
sizeof(clntAddr);
701 if ((recvfrom(this->sockets.
at(0),
702 reinterpret_cast<raw_type *
>(rcvStr),
704 reinterpret_cast<sockaddr *
>(&clntAddr),
705 reinterpret_cast<socklen_t *
>(&addrLen))) < 0)
707 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error"
711 srcAddr = inet_ntoa(clntAddr.sin_addr);
712 srcPort = ntohs(clntAddr.sin_port);
716 std::cout <<
"\nReceived discovery update from " << srcAddr <<
": "
720 this->DispatchDiscoveryMsg(srcAddr, rcvStr);
727 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
735 pBody += header.HeaderLength();
738 if (this->kWireVersion != header.Version())
741 auto recvPUuid = header.PUuid();
744 if (recvPUuid == this->pUuid)
748 DiscoveryCallback<Pub> connectCb;
749 DiscoveryCallback<Pub> disconnectCb;
753 connectCb = this->connectionCb;
754 disconnectCb = this->disconnectionCb;
757 switch (header.Type())
762 transport::AdvertiseMessage<Pub> advMsg;
763 advMsg.Unpack(pBody);
768 _fromIp != this->hostAddr))
780 if (added && connectCb)
783 connectCb(advMsg.Publisher());
791 SubscriptionMsg subMsg;
792 subMsg.Unpack(pBody);
793 auto recvTopic = subMsg.Topic();
808 for (
const auto &nodeInfo :
addresses[this->pUuid])
813 _fromIp != this->hostAddr))
819 this->SendMsg(
AdvType, nodeInfo);
840 pub.SetPUuid(recvPUuid);
856 transport::AdvertiseMessage<Pub> advMsg;
857 advMsg.Unpack(pBody);
862 _fromIp != this->hostAddr))
870 disconnectCb(advMsg.Publisher());
877 advMsg.Publisher().PUuid(), advMsg.Publisher().NUuid());
884 std::cerr <<
"Unknown message type [" << header.Type() <<
"]\n";
896 private:
template<
typename T>
897 void SendMsg(
const uint8_t _type,
899 const uint16_t _flags = 0)
const
902 Header header(this->Version(), _pub.PUuid(), _type, _flags);
914 transport::AdvertiseMessage<T> advMsg(header, _pub);
917 buffer.
resize(advMsg.MsgLength());
918 advMsg.Pack(
reinterpret_cast<char*
>(&buffer[0]));
919 msgLength =
static_cast<int>(advMsg.MsgLength());
925 SubscriptionMsg subMsg(header, topic);
928 buffer.
resize(subMsg.MsgLength());
929 subMsg.Pack(
reinterpret_cast<char*
>(&buffer[0]));
930 msgLength =
static_cast<int>(subMsg.MsgLength());
937 buffer.
resize(header.HeaderLength());
938 header.Pack(
reinterpret_cast<char*
>(&buffer[0]));
939 msgLength = header.HeaderLength();
943 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message"
944 <<
" type [" << _type <<
"]" <<
std::endl;
950 for (
const auto &sock : this->Sockets())
952 if (sendto(sock,
reinterpret_cast<const raw_type *
>(
953 reinterpret_cast<unsigned char*
>(&buffer[0])),
955 reinterpret_cast<const sockaddr *
>(this->MulticastAddr()),
956 sizeof(*(this->MulticastAddr()))) != msgLength)
966 <<
" msg [" << topic <<
"]" <<
std::endl;
974 return this->sockets;
979 private:
const sockaddr_in *MulticastAddr()
const
981 return &this->mcastAddr;
986 private:
bool Verbose()
const
988 return this->verbose;
993 private: uint8_t Version()
const
995 return this->kWireVersion;
1002 private:
bool RegisterNetIface(
const std::string &_ip)
1005 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1015 struct in_addr ifAddr;
1016 ifAddr.s_addr = inet_addr(_ip.
c_str());
1017 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1018 reinterpret_cast<const char*
>(&ifAddr),
sizeof(ifAddr)) != 0)
1020 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)."
1030 struct ip_mreq group;
1031 group.imr_multiaddr.s_addr =
1032 inet_addr(this->kMulticastGroup.
c_str());
1033 group.imr_interface.s_addr = inet_addr(_ip.
c_str());
1034 if (setsockopt(this->sockets.
at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1035 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1037 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)."
1048 private:
static const unsigned int kDefActivityInterval = 100;
1053 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1058 private:
static const unsigned int kDefSilenceInterval = 3000;
1061 private:
const std::string kMulticastGroup =
"224.0.0.7";
1064 private:
const int kTimeout = 250;
1067 private:
static const int kMaxRcvStr = 65536;
1071 private:
static const uint8_t kWireVersion = 8;
1088 private:
unsigned int silenceInterval;
1093 private:
unsigned int activityInterval;
1098 private:
unsigned int heartbeatInterval;
1101 private: DiscoveryCallback<Pub> connectionCb;
1104 private: DiscoveryCallback<Pub> disconnectionCb;
1107 private: TopicStorage<Pub> info;
1116 private:
bool verbose;
1122 private: sockaddr_in mcastAddr;
1143 private:
bool initialized;
1146 private:
unsigned int numHeartbeatsUninitialized;
1155 private:
bool enabled;
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected,...
Definition: Discovery.hh:470
bool Publishers(const std::string &_topic, std::map< std::string, std::vector< T >> &_info) const
Get the map of publishers stored for a given topic.
Definition: TopicStorage.hh:203
bool DelPublisherByNode(const std::string &_topic, const std::string &_pUuid, const std::string &_nUuid)
Remove a publisher associated to a given topic and UUID pair.
Definition: TopicStorage.hh:218
@ HOST
Topic/service only available to subscribers in the same machine as the publisher.
Definition: AdvertiseOptions.hh:28
This class stores all the information about a publisher. It stores the topic name that publishes,...
Definition: Publisher.hh:38
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1113
static const uint8_t UnadvType
Definition: Packet.hh:37
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:442
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds.
Definition: Discovery.hh:412
static const uint8_t AdvType
Definition: Packet.hh:35
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:539
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently stored.
Definition: TopicStorage.hh:335
static const uint8_t SubType
Definition: Packet.hh:36
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:350
static const std::vector< std::string > MsgTypesStr
Used for debugging the message type received/send.
Definition: Packet.hh:44
A class for customizing the publication options for a topic or service advertised....
Definition: AdvertiseOptions.hh:55
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:433
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:214
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes....
Definition: Discovery.hh:423
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:487
*brief Advertise a new service without any output parameter *In this version the callback is a free function *param[in] _topic Topic name associated to the service *param[in] _cb Callback to handle the service request with the *following void(* _cb)(const RequestT &_req)
Definition: Node.hh:527
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:151
A class that is used to store information about an advertised publisher. An instance of this class is...
Definition: Node.hh:521
static const uint8_t ByeType
Definition: Packet.hh:39
bool DelPublishersByProc(const std::string &_pUuid)
Remove all the publishers associated to a given process.
Definition: TopicStorage.hh:258
bool AddPublisher(const T &_publisher)
Add a new address associated to a given topic and node UUID.
Definition: TopicStorage.hh:49
std::unique_lock< std::recursive_mutex > lk(this->Shared() ->mutex)
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:374
cb(_internalRep, _internalResult)
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:97
*brief Advertise a new service without any output parameter *In this version the callback is a free function *param[in] _topic Topic name associated to the service *param[in] _cb Callback to handle the service request with the *following void(*) const AdvertiseServiceOptions ReplyT const std::string _topic)
Definition: Node.hh:558
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:460
bool HasAnyPublishers(const std::string &_topic, const std::string &_pUuid) const
Return if there is any publisher stored for the given topic and process UUID.
Definition: TopicStorage.hh:130
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:530
T duration_cast(T... args)
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:402
void PublishersByProc(const std::string &_pUuid, std::map< std::string, std::vector< T >> &_pubs) const
Given a process UUID, the function returns the list of publishers contained in this process UUID with...
Definition: TopicStorage.hh:282
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active,...
Definition: Discovery.hh:480
void Print() const
Print all the information for debugging purposes.
Definition: TopicStorage.hh:342
void raw_type
Definition: Discovery.hh:43
SrvAddresses_M addresses
Definition: Node.hh:986
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:103
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:451
static const uint8_t HeartbeatType
Definition: Packet.hh:38
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:245
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:300
@ PROCESS
Topic/service only available to subscribers in the same process as the publisher.
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:269
bool Publisher(const std::string &_topic, const std::string &_pUuid, const std::string &_nUuid, T &_publisher) const
Get the address information for a given topic and node UUID.
Definition: TopicStorage.hh:165
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github....
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:360