diff --git a/slsDetectorCalibration/moenchExecutables/moenchZmqProcess.cpp b/slsDetectorCalibration/moenchExecutables/moenchZmqProcess.cpp index 5a0080937..45d5ed86f 100644 --- a/slsDetectorCalibration/moenchExecutables/moenchZmqProcess.cpp +++ b/slsDetectorCalibration/moenchExecutables/moenchZmqProcess.cpp @@ -245,7 +245,7 @@ int main(int argc, char *argv[]) { delete zmqsocket; return EXIT_FAILURE; } else - printf("Zmq Client at %s\n", zmqsocket->GetZmqServerAddress()); + printf("Zmq Client at %s\n", zmqsocket->GetZmqServerAddress().c_str()); // send socket ZmqSocket* zmqsocket2 = 0; diff --git a/slsDetectorSoftware/CMakeLists.txt b/slsDetectorSoftware/CMakeLists.txt index b892d81ab..264ab07b9 100755 --- a/slsDetectorSoftware/CMakeLists.txt +++ b/slsDetectorSoftware/CMakeLists.txt @@ -15,7 +15,7 @@ add_library(slsDetectorShared SHARED ) -if(SLS_LTO_AVAILABLE) +if((CMAKE_BUILD_TYPE STREQUAL "Release") AND SLS_LTO_AVAILABLE) set_property(TARGET slsDetectorShared PROPERTY INTERPROCEDURAL_OPTIMIZATION True) endif() diff --git a/slsReceiverSoftware/CMakeLists.txt b/slsReceiverSoftware/CMakeLists.txt index 5e9131484..ecc9a262a 100755 --- a/slsReceiverSoftware/CMakeLists.txt +++ b/slsReceiverSoftware/CMakeLists.txt @@ -30,7 +30,7 @@ add_library(slsReceiverShared SHARED ) -if(SLS_LTO_AVAILABLE) +if((CMAKE_BUILD_TYPE STREQUAL "Release") AND SLS_LTO_AVAILABLE) set_property(TARGET slsReceiverShared PROPERTY INTERPROCEDURAL_OPTIMIZATION True) endif() diff --git a/slsSupportLib/CMakeLists.txt b/slsSupportLib/CMakeLists.txt index af8ac7383..c595d317e 100755 --- a/slsSupportLib/CMakeLists.txt +++ b/slsSupportLib/CMakeLists.txt @@ -39,6 +39,7 @@ if(SLS_DEVEL_HEADERS) include/StaticVector.h include/UdpRxSocket.h include/versionAPI.h + include/ZmqSocket.h ) endif() @@ -48,7 +49,7 @@ add_library(slsSupportLib SHARED ) -if(SLS_LTO_AVAILABLE) +if((CMAKE_BUILD_TYPE STREQUAL "Release") AND SLS_LTO_AVAILABLE) set_property(TARGET slsSupportLib PROPERTY INTERPROCEDURAL_OPTIMIZATION True) endif() diff --git a/slsSupportLib/include/ZmqSocket.h b/slsSupportLib/include/ZmqSocket.h index d529e32e1..033392b0f 100644 --- a/slsSupportLib/include/ZmqSocket.h +++ b/slsSupportLib/include/ZmqSocket.h @@ -16,8 +16,9 @@ #define ROIVERBOSITY class zmq_msg_t; +#include "container_utils.h" #include - +#include /** zmq header structure */ struct zmqHeader { /** true if incoming data, false if end of acquisition */ @@ -42,7 +43,7 @@ struct zmqHeader { /** progress in percentage */ int progress{0}; /** file name prefix */ - std::string fname{""}; + std::string fname; /** header from detector */ uint64_t frameNumber{0}; uint32_t expLength{0}; @@ -93,11 +94,6 @@ class ZmqSocket { */ ZmqSocket(const uint32_t portnumber, const char *ethip); - /** - * Destructor - */ - ~ZmqSocket() = default; - /** * Returns Port Number * @returns Port Number @@ -108,14 +104,7 @@ class ZmqSocket { * Returns Server Address * @returns Server Address */ - char *GetZmqServerAddress() { return sockfd.serverAddress; } - - /** - * Returns Socket Descriptor - * @reutns Socket descriptor - */ - - void *GetsocketDescriptor() { return sockfd.socketDescriptor; } + std::string GetZmqServerAddress() { return sockfd.serverAddress; } /** * Connect client socket to server socket @@ -126,35 +115,7 @@ class ZmqSocket { /** * Unbinds the Socket */ - void Disconnect() { sockfd.Disconnect(); }; - - /** - * Close Socket and destroy Context - */ - void Close() { sockfd.Close(); }; - - /** - * Convert Hostname to Internet address info structure - * One must use freeaddrinfo(res) after using it - * @param hostname hostname - * @param res address of pointer to address info structure - * @return 1 for fail, 0 for success - */ - // Do not make this static (for multi threading environment) - int ConvertHostnameToInternetAddress(const char *const hostname, - struct addrinfo **res); - - /** - * Convert Internet Address structure pointer to ip string (char*) - * Clears the internet address structure as well - * @param res pointer to internet address structure - * @param ip pointer to char array to store result in - * @param ipsize size available in ip buffer - * @return 1 for fail, 0 for success - */ - // Do not make this static (for multi threading environment) - int ConvertInternetAddresstoIpString(struct addrinfo *res, char *ip, - const int ipsize); + void Disconnect() { sockfd.Disconnect(); } /** * Send Message Header @@ -224,7 +185,7 @@ class ZmqSocket { class mySocketDescriptors { public: /** Constructor */ - mySocketDescriptors(); + mySocketDescriptors(bool server); /** Destructor */ ~mySocketDescriptors(); /** Unbinds the Socket */ @@ -232,19 +193,21 @@ class ZmqSocket { /** Close Socket and destroy Context */ void Close(); /** true if server, else false */ - bool server; + const bool server; /** Server Address */ - char serverAddress[1000]; + std::string serverAddress; /** Context Descriptor */ void *contextDescriptor; /** Socket Descriptor */ void *socketDescriptor; }; - private: /** Port Number */ uint32_t portno; /** Socket descriptor */ mySocketDescriptors sockfd; + + std::unique_ptr header_buffer = + sls::make_unique(MAX_STR_LENGTH); }; diff --git a/slsSupportLib/src/ZmqSocket.cpp b/slsSupportLib/src/ZmqSocket.cpp index f3ebe7069..059e73340 100644 --- a/slsSupportLib/src/ZmqSocket.cpp +++ b/slsSupportLib/src/ZmqSocket.cpp @@ -1,35 +1,25 @@ #include "ZmqSocket.h" #include "logger.h" -#include //inet_ntoa #include #include -#include //gethostbyname() #include #include //usleep in some machines #include +#include #include +#include "network_utils.h" //ip using namespace rapidjson; ZmqSocket::ZmqSocket(const char *const hostname_or_ip, const uint32_t portnumber) - : portno(portnumber) -// headerMessage(0) + : portno(portnumber), sockfd(false) { - char ip[MAX_STR_LENGTH] = ""; - memset(ip, 0, MAX_STR_LENGTH); - - // convert hostname to ip (not required, but a test that returns if failed) - struct addrinfo *result; - if ((ConvertHostnameToInternetAddress(hostname_or_ip, &result)) || - (ConvertInternetAddresstoIpString(result, ip, MAX_STR_LENGTH))) - throw sls::ZmqSocketError("Could convert IP to string"); - - std::string sip(ip); - // construct address - sprintf(sockfd.serverAddress, "tcp://%s:%d", sip.c_str(), portno); -#ifdef VERBOSE - cprintf(BLUE, "address:%s\n", sockfd.serverAddress); -#endif + // Extra check that throws if conversion fails, could be removed + auto ipstr = sls::HostnameToIp(hostname_or_ip).str(); + std::ostringstream oss; + oss << "tcp://" << ipstr << ":" << portno; + sockfd.serverAddress = oss.str(); + LOG(logDEBUG) << "zmq address: " << sockfd.serverAddress; // create context sockfd.contextDescriptor = zmq_ctx_new(); @@ -40,7 +30,6 @@ ZmqSocket::ZmqSocket(const char *const hostname_or_ip, sockfd.socketDescriptor = zmq_socket(sockfd.contextDescriptor, ZMQ_SUB); if (sockfd.socketDescriptor == nullptr) { PrintError(); - Close(); throw sls::ZmqSocketError("Could not create socket"); } @@ -48,104 +37,57 @@ ZmqSocket::ZmqSocket(const char *const hostname_or_ip, // an empty string implies receiving any messages if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SUBSCRIBE, "", 0)) { PrintError(); - Close(); throw sls::ZmqSocketError("Could set socket opt"); } // ZMQ_LINGER default is already -1 means no messages discarded. use this // options if optimizing required ZMQ_SNDHWM default is 0 means no limit. // use this to optimize if optimizing required eg. int value = -1; - int value = 0; + const int value = 0; if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_LINGER, &value, sizeof(value))) { PrintError(); - Close(); throw sls::ZmqSocketError("Could not set ZMQ_LINGER"); } } ZmqSocket::ZmqSocket(const uint32_t portnumber, const char *ethip) - : - - portno(portnumber) -// headerMessage(0) + :portno(portnumber), sockfd(true) { - sockfd.server = true; - // create context sockfd.contextDescriptor = zmq_ctx_new(); if (sockfd.contextDescriptor == nullptr) throw sls::ZmqSocketError("Could not create contextDescriptor"); + // create publisher sockfd.socketDescriptor = zmq_socket(sockfd.contextDescriptor, ZMQ_PUB); if (sockfd.socketDescriptor == nullptr) { PrintError(); - Close(); throw sls::ZmqSocketError("Could not create socket"); } - // Socket Options provided above + // construct address, can be refactored with libfmt + std::ostringstream oss; + oss << "tcp://" << ethip << ":" << portno; + sockfd.serverAddress = oss.str(); + LOG(logDEBUG) << "zmq address: " << sockfd.serverAddress; - // construct addresss - sprintf(sockfd.serverAddress, "tcp://%s:%d", ethip, portno); -#ifdef VERBOSE - cprintf(BLUE, "address:%s\n", sockfd.serverAddress); -#endif // bind address - if (zmq_bind(sockfd.socketDescriptor, sockfd.serverAddress) < 0) { + if (zmq_bind(sockfd.socketDescriptor, sockfd.serverAddress.c_str())) { PrintError(); - Close(); throw sls::ZmqSocketError("Could not bind socket"); } - // sleep for a few milliseconds to allow a slow-joiner usleep(200 * 1000); }; int ZmqSocket::Connect() { - if (zmq_connect(sockfd.socketDescriptor, sockfd.serverAddress) < 0) { + if (zmq_connect(sockfd.socketDescriptor, sockfd.serverAddress.c_str())) { PrintError(); return 1; } return 0; } -int ZmqSocket::ConvertHostnameToInternetAddress(const char *const hostname, - struct addrinfo **res) { - // criteria in selecting socket address structures returned by res - struct addrinfo hints; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - // get host info into res - int errcode = getaddrinfo(hostname, nullptr, &hints, res); - if (errcode != 0) { - LOG(logERROR) << "Error: Could not convert hostname " << hostname - << " to internet address (zmq):" << gai_strerror(errcode); - } else { - if (*res == nullptr) { - LOG(logERROR) << "Could not convert hostname " << hostname - << " to internet address (zmq): " - "gettaddrinfo returned null"; - } else { - return 0; - } - } - LOG(logERROR) << "Could not convert hostname to internet address"; - return 1; -}; - -int ZmqSocket::ConvertInternetAddresstoIpString(struct addrinfo *res, char *ip, - const int ipsize) { - if (inet_ntop(res->ai_family, - &((struct sockaddr_in *)res->ai_addr)->sin_addr, ip, - ipsize) != nullptr) { - freeaddrinfo(res); - return 0; - } - LOG(logERROR) << "Could not convert internet address to ip string"; - return 1; -} - int ZmqSocket::SendHeader(int index, zmqHeader header) { /** Json Header Format */ @@ -182,8 +124,8 @@ int ZmqSocket::SendHeader(int index, zmqHeader header) { "\"quad\":%u" ; //"}\n"; - char buf[MAX_STR_LENGTH] = ""; - sprintf(buf, jsonHeaderFormat, header.jsonversion, header.dynamicRange, + memset(header_buffer.get(),'\0',MAX_STR_LENGTH); //TODO! Do we need this + sprintf(header_buffer.get(), jsonHeaderFormat, header.jsonversion, header.dynamicRange, header.fileIndex, header.ndetx, header.ndety, header.npixelsx, header.npixelsy, header.imageSize, header.acqIndex, header.frameIndex, header.progress, header.fname.c_str(), @@ -198,31 +140,31 @@ int ZmqSocket::SendHeader(int index, zmqHeader header) { header.flippedDataX, header.quad); if (header.addJsonHeader.size() > 0) { - strcat(buf, ", "); - strcat(buf, "\"addJsonHeader\": {"); + strcat(header_buffer.get(), ", "); + strcat(header_buffer.get(), "\"addJsonHeader\": {"); for (auto it = header.addJsonHeader.begin(); it != header.addJsonHeader.end(); ++it) { if (it != header.addJsonHeader.begin()) { - strcat(buf, ", "); + strcat(header_buffer.get(), ", "); } - strcat(buf, "\""); - strcat(buf, it->first.c_str()); - strcat(buf, "\":\""); - strcat(buf, it->second.c_str()); - strcat(buf, "\""); + strcat(header_buffer.get(), "\""); + strcat(header_buffer.get(), it->first.c_str()); + strcat(header_buffer.get(), "\":\""); + strcat(header_buffer.get(), it->second.c_str()); + strcat(header_buffer.get(), "\""); } - strcat(buf, " } "); + strcat(header_buffer.get(), " } "); } - strcat(buf, "}\n"); - int length = strlen(buf); + strcat(header_buffer.get(), "}\n"); + int length = strlen(header_buffer.get()); #ifdef VERBOSE // if(!index) cprintf(BLUE, "%d : Streamer: buf: %s\n", index, buf); #endif - if (zmq_send(sockfd.socketDescriptor, buf, length, + if (zmq_send(sockfd.socketDescriptor, header_buffer.get(), length, header.data ? ZMQ_SNDMORE : 0) < 0) { PrintError(); return 0; @@ -243,18 +185,17 @@ int ZmqSocket::SendData(char *buf, int length) { int ZmqSocket::ReceiveHeader(const int index, zmqHeader &zHeader, uint32_t version) { - std::vector buffer(MAX_STR_LENGTH); - int len = - zmq_recv(sockfd.socketDescriptor, buffer.data(), buffer.size(), 0); - if (len > 0) { + const int bytes_received = + zmq_recv(sockfd.socketDescriptor, header_buffer.get(), MAX_STR_LENGTH, 0); + if (bytes_received > 0) { #ifdef ZMQ_DETAIL cprintf(BLUE, "Header %d [%d] Length: %d Header:%s \n", index, portno, - len, buffer.data()); + bytes_received, buffer.data()); #endif - if (ParseHeader(index, len, buffer.data(), zHeader, version)) { + if (ParseHeader(index, bytes_received, header_buffer.get(), zHeader, version)) { #ifdef ZMQ_DETAIL cprintf(RED, "Parsed Header %d [%d] Length: %d Header:%s \n", index, - portno, len, buffer.data()); + portno, bytes_received, buffer.data()); #endif if (!zHeader.data) { #ifdef ZMQ_DETAIL @@ -278,13 +219,10 @@ int ZmqSocket::ParseHeader(const int index, int length, char *buff, if (document.Parse(buff, length).HasParseError()) { LOG(logERROR) << index << " Could not parse. len:" << length << ": Message:" << buff; - fflush(stdout); - // char* buf = (char*) zmq_msg_data (&message); for (int i = 0; i < length; ++i) { cprintf(RED, "%02x ", buff[i]); } - printf("\n"); - fflush(stdout); + std::cout << std::endl; return 0; } @@ -435,17 +373,17 @@ void ZmqSocket::PrintError() { } // Nested class to do RAII handling of socket descriptors -ZmqSocket::mySocketDescriptors::mySocketDescriptors() - : server(false), contextDescriptor(nullptr), socketDescriptor(nullptr){}; +ZmqSocket::mySocketDescriptors::mySocketDescriptors(bool server) + : server(server), contextDescriptor(nullptr), socketDescriptor(nullptr){}; ZmqSocket::mySocketDescriptors::~mySocketDescriptors() { Disconnect(); Close(); } void ZmqSocket::mySocketDescriptors::Disconnect() { if (server) - zmq_unbind(socketDescriptor, serverAddress); + zmq_unbind(socketDescriptor, serverAddress.c_str()); else - zmq_disconnect(socketDescriptor, serverAddress); + zmq_disconnect(socketDescriptor, serverAddress.c_str()); }; void ZmqSocket::mySocketDescriptors::Close() { if (socketDescriptor != nullptr) { diff --git a/slsSupportLib/tests/test-ZmqSocket.cpp b/slsSupportLib/tests/test-ZmqSocket.cpp index 36e93ff10..520738bae 100644 --- a/slsSupportLib/tests/test-ZmqSocket.cpp +++ b/slsSupportLib/tests/test-ZmqSocket.cpp @@ -1,25 +1,114 @@ #include "ZmqSocket.h" #include "catch.hpp" +TEST_CASE("Throws when cannot create socket") { + REQUIRE_THROWS(ZmqSocket("sdiasodjajpvv", 5076001)); +} + +TEST_CASE("Get port number for sub") { + constexpr int port = 50001; + ZmqSocket sub("localhost", port); + REQUIRE(sub.GetPortNumber() == port); +} + +TEST_CASE("Get port number for pub") { + constexpr int port = 50001; + ZmqSocket pub(port, "*"); + REQUIRE(pub.GetPortNumber() == port); +} + +TEST_CASE("Server address") { + constexpr int port = 50001; + ZmqSocket pub(port, "*"); + REQUIRE(pub.GetZmqServerAddress() == std::string("tcp://*:50001")); +} + TEST_CASE("Send header on localhost") { constexpr int port = 50001; ZmqSocket sub("localhost", port); sub.Connect(); ZmqSocket pub(port, "*"); - + // Header to send zmqHeader header; - + header.data = false; // if true we wait for the data + header.jsonversion = 0; + header.dynamicRange = 32; + header.fileIndex = 7; + header.ndetx = 3; + header.ndety = 1; + header.npixelsx = 724; + header.npixelsy = 324; + header.imageSize = 200; header.fname = "hej"; - header.data = 0; pub.SendHeader(0, header); - - zmqHeader received_header; sub.ReceiveHeader(0, received_header, 0); REQUIRE(received_header.fname == "hej"); + REQUIRE(received_header.dynamicRange == 32); + REQUIRE(received_header.fileIndex == 7); + REQUIRE(received_header.ndetx == 3); + REQUIRE(received_header.ndety == 1); + REQUIRE(received_header.npixelsx == 724); + REQUIRE(received_header.npixelsy == 324); + REQUIRE(received_header.imageSize == 200); +} + +TEST_CASE("Send serveral headers of different length") { + constexpr int port = 50001; + ZmqSocket sub("localhost", port); + sub.Connect(); + + ZmqSocket pub(port, "*"); + + zmqHeader header; + header.data = false; // if true we wait for the data + header.fname = "short_name"; + + zmqHeader received_header; + + pub.SendHeader(0, header); + sub.ReceiveHeader(0, received_header, 0); + REQUIRE(received_header.fname == "short_name"); + + header.fname = "this_time_a_much_longer_name"; + pub.SendHeader(0, header); + sub.ReceiveHeader(0, received_header, 0); + REQUIRE(received_header.fname == "this_time_a_much_longer_name"); + + header.fname = "short"; + pub.SendHeader(0, header); + sub.ReceiveHeader(0, received_header, 0); + REQUIRE(received_header.fname == "short"); +} + +TEST_CASE("Send header and data") { + constexpr int port = 50001; + ZmqSocket sub("localhost", port); + sub.Connect(); + + ZmqSocket pub(port, "*"); + + std::vector data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + const int nbytes = data.size() * sizeof(decltype(data)::value_type); + zmqHeader header; + header.data = true; + header.imageSize = nbytes; + + pub.SendHeader(0, header); + pub.SendData((char *)data.data(), nbytes); + + zmqHeader received_header; + sub.ReceiveHeader(0, received_header, 0); + std::vector received_data(received_header.imageSize / sizeof(int)); + sub.ReceiveData(0, (char *)received_data.data(), received_header.imageSize); + + REQUIRE(data.size() == received_data.size()); + for (size_t i = 0; i != data.size(); ++i) { + REQUIRE(data[i] == received_data[i]); + } } \ No newline at end of file