Dev/zmq stream all ipv6 adn remove rx_zmqip (#958)

* enable ipv6 in zmq socket

* removed rx_zmqip API and field in gui, changed client updaterxrzip to updateclientzmqip to have the rx_hostname ip if 0. 

* updated command line for rx_zmqip to give a warning. 

* Replaced 'depreciated' to 'deprecated' everywhere

* switching from * to 0.0.0.0 works for rebinding zmq sockets

* fixed help in command line for rx_zmqip * to 0.0.0.0 and removed cmd in python

* remove publisher zmq socket ip also for moench post processing

* fixed tests

* publisher zmq ip macros to be reused
This commit is contained in:
2024-09-10 15:19:08 +02:00
committed by GitHub
parent 15e8c0d9f1
commit e848293916
35 changed files with 168 additions and 426 deletions

View File

@ -53,10 +53,19 @@ ZmqSocket::ZmqSocket(const char *const hostname_or_ip,
}
LOG(logDEBUG) << "Default receive high water mark:"
<< GetReceiveHighWaterMark();
// enable IPv6 addresses
int ipv6 = 1;
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_IPV6, &ipv6,
sizeof(ipv6))) {
PrintError();
throw ZmqSocketError("Could not set ZMQ_IPV6");
}
}
ZmqSocket::ZmqSocket(const uint16_t portnumber, const char *ethip)
ZmqSocket::ZmqSocket(const uint16_t portnumber)
: portno(portnumber), sockfd(true) {
// create context
sockfd.contextDescriptor = zmq_ctx_new();
if (sockfd.contextDescriptor == nullptr)
@ -72,32 +81,44 @@ ZmqSocket::ZmqSocket(const uint16_t portnumber, const char *ethip)
// construct address, can be refactored with libfmt
std::ostringstream oss;
oss << "tcp://" << ethip << ":" << portno;
oss << "tcp://" << ZMQ_PUBLISHER_IP << ":" << portno;
sockfd.serverAddress = oss.str();
LOG(logDEBUG) << "zmq address: " << sockfd.serverAddress;
// enable IPv6 addresses
int ipv6 = 1;
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_IPV6, &ipv6,
sizeof(ipv6))) {
PrintError();
throw ZmqSocketError("Could not set ZMQ_IPV6");
}
// Socket Options for keepalive
// enable TCP keepalive
int keepalive = 1;
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_TCP_KEEPALIVE, &keepalive, sizeof(keepalive))) {
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_TCP_KEEPALIVE, &keepalive,
sizeof(keepalive))) {
PrintError();
throw ZmqSocketError("Could set socket opt ZMQ_TCP_KEEPALIVE");
}
// set the number of keepalives before death
keepalive = 10;
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_TCP_KEEPALIVE_CNT, &keepalive, sizeof(keepalive))) {
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_TCP_KEEPALIVE_CNT,
&keepalive, sizeof(keepalive))) {
PrintError();
throw ZmqSocketError("Could set socket opt ZMQ_TCP_KEEPALIVE_CNT");
}
// set the time before the first keepalive
keepalive = 60;
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_TCP_KEEPALIVE_IDLE, &keepalive, sizeof(keepalive))) {
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_TCP_KEEPALIVE_IDLE,
&keepalive, sizeof(keepalive))) {
PrintError();
throw ZmqSocketError("Could set socket opt ZMQ_TCP_KEEPALIVE_IDLE");
}
// set the interval between keepalives
keepalive = 1;
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_TCP_KEEPALIVE_INTVL, &keepalive, sizeof(keepalive))) {
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_TCP_KEEPALIVE_INTVL,
&keepalive, sizeof(keepalive))) {
PrintError();
throw ZmqSocketError("Could set socket opt ZMQ_TCP_KEEPALIVE_INTVL");
}
@ -109,7 +130,7 @@ ZmqSocket::ZmqSocket(const uint16_t portnumber, const char *ethip)
}
// sleep to allow a slow-joiner
std::this_thread::sleep_for(std::chrono::milliseconds(200));
};
}
int ZmqSocket::GetSendHighWaterMark() {
int value = 0;
@ -214,8 +235,9 @@ void ZmqSocket::SetReceiveBuffer(int limit) {
}
}
void ZmqSocket::Rebind() { // the purpose is to apply HWL changes, which are
// frozen at bind, which is in the constructor.
void ZmqSocket::Rebind() {
// the purpose is to apply HWL changes, which are
// frozen at bind, which is in the constructor.
// unbbind
if (zmq_unbind(sockfd.socketDescriptor, sockfd.serverAddress.c_str())) {
@ -498,8 +520,11 @@ void ZmqSocket::PrintError() {
LOG(logERROR)
<< "No I/O thread is available to accomplish the task (zmq)";
break;
case ENOENT:
LOG(logERROR) << "The requested endpoint does not exist (zmq)";
break;
default:
LOG(logERROR) << "Unknown socket error (zmq)";
LOG(logERROR) << "Unknown socket error (zmq). Error code: " << errno;
break;
}
}