From b751238fc122e05effbdb7386d267a8108dce3b0 Mon Sep 17 00:00:00 2001 From: Erik Frojdh Date: Wed, 18 Mar 2020 12:12:01 +0100 Subject: [PATCH] ZmqSocket --- slsReceiverSoftware/src/Implementation.cpp | 1 + slsSupportLib/CMakeLists.txt | 1 + slsSupportLib/include/ZmqSocket.h | 753 +++++---------------- slsSupportLib/src/ZmqSocket.cpp | 410 +++++++++++ 4 files changed, 600 insertions(+), 565 deletions(-) create mode 100644 slsSupportLib/src/ZmqSocket.cpp diff --git a/slsReceiverSoftware/src/Implementation.cpp b/slsReceiverSoftware/src/Implementation.cpp index cb7ba7877..8f1b53faa 100755 --- a/slsReceiverSoftware/src/Implementation.cpp +++ b/slsReceiverSoftware/src/Implementation.cpp @@ -15,6 +15,7 @@ #include #include #include // stat +#include /** cosntructor & destructor */ diff --git a/slsSupportLib/CMakeLists.txt b/slsSupportLib/CMakeLists.txt index 749cd61d9..54f58ce2d 100755 --- a/slsSupportLib/CMakeLists.txt +++ b/slsSupportLib/CMakeLists.txt @@ -6,6 +6,7 @@ set(SOURCES src/ServerSocket.cpp src/ServerInterface.cpp src/network_utils.cpp + src/ZmqSocket.cpp ) set(HEADERS diff --git a/slsSupportLib/include/ZmqSocket.h b/slsSupportLib/include/ZmqSocket.h index 29fa36c53..e4a4a75db 100755 --- a/slsSupportLib/include/ZmqSocket.h +++ b/slsSupportLib/include/ZmqSocket.h @@ -7,424 +7,170 @@ *@short functions to open/close zmq sockets */ - #include "sls_detector_exceptions.h" - -#include //inet_ntoa -#include -#include -#include //gethostbyname() #include //json header in zmq stream -#include -#include //usleep in some machines -#include -#include -using namespace rapidjson; #define MAX_STR_LENGTH 1000 // #define ZMQ_DETAIL #define ROIVERBOSITY - +class zmq_msg_t; class ZmqSocket { -public: + public: + // Socket Options for optimization + // ZMQ_LINGER default is already -1 means no messages discarded. use this + // options if optimizing required ZMQ_SNDHWM default is 0 means no limit. use + // this to optimize if optimizing required + // eg. int value = -1; + // if (zmq_setsockopt(socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) { + // Close(); + /** + * Constructor for a client + * Creates socket, context and connects to server + * @param hostname hostname or ip of server + * @param portnumber port number + */ + ZmqSocket(const char *const hostname_or_ip, const uint32_t portnumber); - //Socket Options for optimization - //ZMQ_LINGER default is already -1 means no messages discarded. use this options if optimizing required - //ZMQ_SNDHWM default is 0 means no limit. use this to optimize if optimizing required - // eg. int value = -1; - // if (zmq_setsockopt(socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) { - // Close(); - /** - * Constructor for a client - * Creates socket, context and connects to server - * @param hostname hostname or ip of server - * @param portnumber port number - */ - ZmqSocket (const char* const hostname_or_ip, const uint32_t portnumber): - portno (portnumber) - // headerMessage(0) - { - char ip[MAX_STR_LENGTH] = ""; - memset(ip, 0, MAX_STR_LENGTH); + /** + * Constructor for a server + * Creates socket, context and connects to server + * @param hostname hostname or ip of server + * @param portnumber port number + * @param ethip is the ip of the ethernet interface to stream zmq from + */ + ZmqSocket(const uint32_t portnumber, const char *ethip); - // convert hostname to ip (not required, but a test that returns if failed) - struct addrinfo *result; - if ((ConvertHostnameToInternetAddress(hostname_or_ip, &result)) || - (ConvertInternetAddresstoIpString(result, ip, MAX_STR_LENGTH))) - throw sls::ZmqSocketError("Could convert IP to string"); + /** + * Destructor + */ + ~ZmqSocket() = default; - // construct address - sprintf (sockfd.serverAddress, "tcp://%s:%d", ip, portno); -#ifdef VERBOSE - cprintf(BLUE,"address:%s\n",sockfd.serverAddress); -#endif + /** + * Returns Port Number + * @returns Port Number + */ + uint32_t GetPortNumber() { return portno; } - // create context - sockfd.contextDescriptor = zmq_ctx_new(); - if (sockfd.contextDescriptor == 0) - throw sls::ZmqSocketError("Could not create contextDescriptor"); + /** + * Returns Server Address + * @returns Server Address + */ + char *GetZmqServerAddress() { return sockfd.serverAddress; } - // create publisher - sockfd.socketDescriptor = zmq_socket (sockfd.contextDescriptor, ZMQ_SUB); - if (sockfd.socketDescriptor == 0) { - PrintError (); - Close (); - throw sls::ZmqSocketError("Could not create socket"); - } + /** + * Returns Socket Descriptor + * @reutns Socket descriptor + */ - //Socket Options provided above - // an empty string implies receiving any messages - if ( zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SUBSCRIBE, "", 0)) { - PrintError (); - Close(); - throw sls::ZmqSocketError("Could set socket opt"); - } - //ZMQ_LINGER default is already -1 means no messages discarded. use this options if optimizing required - //ZMQ_SNDHWM default is 0 means no limit. use this to optimize if optimizing required - // eg. int value = -1; - int value = 0; - if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) { - PrintError (); - Close(); - throw sls::ZmqSocketError("Could not set ZMQ_LINGER"); - } - }; + void *GetsocketDescriptor() { return sockfd.socketDescriptor; } - /** - * Constructor for a server - * Creates socket, context and connects to server - * @param hostname hostname or ip of server - * @param portnumber port number - * @param ethip is the ip of the ethernet interface to stream zmq from - */ - ZmqSocket (const uint32_t portnumber, const char *ethip): + /** + * Connect client socket to server socket + * @returns 1 for fail, 0 for success + */ + int Connect(); - portno (portnumber) - // headerMessage(0) - { - sockfd.server = true; + /** + * Unbinds the Socket + */ + void Disconnect() { sockfd.Disconnect(); }; - // create context - sockfd.contextDescriptor = zmq_ctx_new(); - if (sockfd.contextDescriptor == 0) - throw sls::ZmqSocketError("Could not create contextDescriptor"); - // create publisher - sockfd.socketDescriptor = zmq_socket (sockfd.contextDescriptor, ZMQ_PUB); - if (sockfd.socketDescriptor == 0) { - PrintError (); - Close (); - throw sls::ZmqSocketError("Could not create socket"); - } + /** + * Close Socket and destroy Context + */ + void Close() { sockfd.Close(); }; - //Socket Options provided above + /** + * Convert Hostname to Internet address info structure + * One must use freeaddrinfo(res) after using it + * @param hostname hostname + * @param res address of pointer to address info structure + * @return 1 for fail, 0 for success + */ + // Do not make this static (for multi threading environment) + int ConvertHostnameToInternetAddress(const char *const hostname, + struct addrinfo **res); - // construct addresss - sprintf (sockfd.serverAddress,"tcp://%s:%d", ethip, portno); -#ifdef VERBOSE - cprintf(BLUE,"address:%s\n",sockfd.serverAddress); -#endif - // bind address - if (zmq_bind (sockfd.socketDescriptor, sockfd.serverAddress) < 0) { - PrintError (); - Close (); - throw sls::ZmqSocketError("Could not bind socket"); - } + /** + * Convert Internet Address structure pointer to ip string (char*) + * Clears the internet address structure as well + * @param res pointer to internet address structure + * @param ip pointer to char array to store result in + * @param ipsize size available in ip buffer + * @return 1 for fail, 0 for success + */ + // Do not make this static (for multi threading environment) + int ConvertInternetAddresstoIpString(struct addrinfo *res, char *ip, + const int ipsize); - //sleep for a few milliseconds to allow a slow-joiner - usleep(200* 1000); - }; + /** + * Send Message Header + * @param index self index for debugging + * @param dummy true if a dummy message for end of acquisition + * @param jsonversion json version + * @param dynamicrange dynamic range + * @param fileIndex file or acquisition index + * @param ndetx number of detectors in x axis + * @param ndety number of detectors in y axis + * @param npixelsx number of pixels/channels in x axis for this zmq socket + * @param npixelsy number of pixels/channels in y axis for this zmq socket + * @param imageSize number of bytes for an image in this socket + * @param frameNumber current frame number + * @param expLength exposure length or subframe index if eiger + * @param packetNumber number of packets caught for this frame + * @param bunchId bunch id + * @param timestamp time stamp + * @param modId module Id + * @param row row index in complete detector + * @param column column index in complete detector + * @param reserved reserved + * @param debug debug + * @param roundRNumber not used yet + * @param detType detector enum + * @param version detector header version + * @param gapPixelsEnable gap pixels enable (exception: if gap pixels enable + * for 4 bit mode, data is not yet gap pixel enabled in receiver) + * @param flippedDataX if it is flipped across x axis + * @param quadEnable if quad is enabled + * @param additionalJsonHeader additional json header + * @returns 0 if error, else 1 + */ + int SendHeaderData( + int index, bool dummy, uint32_t jsonversion, uint32_t dynamicrange = 0, + uint64_t fileIndex = 0, uint32_t ndetx = 0, uint32_t ndety = 0, + uint32_t npixelsx = 0, uint32_t npixelsy = 0, uint32_t imageSize = 0, + uint64_t acqIndex = 0, uint64_t fIndex = 0, const char *fname = nullptr, + uint64_t frameNumber = 0, uint32_t expLength = 0, + uint32_t packetNumber = 0, uint64_t bunchId = 0, uint64_t timestamp = 0, + uint16_t modId = 0, uint16_t row = 0, uint16_t column = 0, + uint16_t reserved = 0, uint32_t debug = 0, uint16_t roundRNumber = 0, + uint8_t detType = 0, uint8_t version = 0, int gapPixelsEnable = 0, + int flippedDataX = 0, uint32_t quadEnable = 0, + std::string *additionalJsonHeader = 0); - /** - * Destructor - */ - ~ZmqSocket () { - //mySocketDescriptor destructor also gets called - }; + /** + * Send Message Body + * @param buf message + * @param length length of message + * @returns 0 if error, else 1 + */ + int SendData(char *buf, int length); - /** - * Returns Port Number - * @returns Port Number - */ - uint32_t GetPortNumber () { return portno; }; - - /** - * Returns Server Address - * @returns Server Address - */ - char* GetZmqServerAddress () { return sockfd.serverAddress; }; - - /** - * Returns Socket Descriptor - * @reutns Socket descriptor - */ - - void* GetsocketDescriptor () { return sockfd.socketDescriptor; }; - - /** - * Connect client socket to server socket - * @returns 1 for fail, 0 for success - */ - int Connect() { - if (zmq_connect(sockfd.socketDescriptor, sockfd.serverAddress) < 0) { - PrintError (); - return 1; - } - return 0; - } - - /** - * Unbinds the Socket - */ - void Disconnect () {sockfd.Disconnect();}; - - /** - * Close Socket and destroy Context - */ - void Close () { sockfd.Close(); }; - - /** - * Convert Hostname to Internet address info structure - * One must use freeaddrinfo(res) after using it - * @param hostname hostname - * @param res address of pointer to address info structure - * @return 1 for fail, 0 for success - */ - // Do not make this static (for multi threading environment) - int ConvertHostnameToInternetAddress (const char* const hostname, struct addrinfo **res) { - // criteria in selecting socket address structures returned by res - struct addrinfo hints; - memset (&hints, 0, sizeof (hints)); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - // get host info into res - int errcode = getaddrinfo (hostname, NULL, &hints, res); - if (errcode != 0) { - LOG(logERROR) << "Error: Could not convert hostname " << hostname << " to internet address (zmq):" - << gai_strerror(errcode); - } else { - if (*res == NULL) { - LOG(logERROR) << "Could not convert hostname " << hostname << " to internet address (zmq): " - "gettaddrinfo returned null"; - } else{ - return 0; - } - } - LOG(logERROR) << "Could not convert hostname to internet address"; - return 1; - }; - - /** - * Convert Internet Address structure pointer to ip string (char*) - * Clears the internet address structure as well - * @param res pointer to internet address structure - * @param ip pointer to char array to store result in - * @param ipsize size available in ip buffer - * @return 1 for fail, 0 for success - */ - // Do not make this static (for multi threading environment) - int ConvertInternetAddresstoIpString (struct addrinfo *res, char* ip, const int ipsize) { - if (inet_ntop (res->ai_family, &((struct sockaddr_in *) res->ai_addr)->sin_addr, ip, ipsize) != NULL) { - freeaddrinfo(res); - return 0; - } - LOG(logERROR) << "Could not convert internet address to ip string"; - return 1; - } - - - - /** - * Send Message Header - * @param index self index for debugging - * @param dummy true if a dummy message for end of acquisition - * @param jsonversion json version - * @param dynamicrange dynamic range - * @param fileIndex file or acquisition index - * @param ndetx number of detectors in x axis - * @param ndety number of detectors in y axis - * @param npixelsx number of pixels/channels in x axis for this zmq socket - * @param npixelsy number of pixels/channels in y axis for this zmq socket - * @param imageSize number of bytes for an image in this socket - * @param frameNumber current frame number - * @param expLength exposure length or subframe index if eiger - * @param packetNumber number of packets caught for this frame - * @param bunchId bunch id - * @param timestamp time stamp - * @param modId module Id - * @param row row index in complete detector - * @param column column index in complete detector - * @param reserved reserved - * @param debug debug - * @param roundRNumber not used yet - * @param detType detector enum - * @param version detector header version - * @param gapPixelsEnable gap pixels enable (exception: if gap pixels enable for 4 bit mode, data is not yet gap pixel enabled in receiver) - * @param flippedDataX if it is flipped across x axis - * @param quadEnable if quad is enabled - * @param additionalJsonHeader additional json header - * @returns 0 if error, else 1 - */ - int SendHeaderData ( int index, bool dummy, uint32_t jsonversion, uint32_t dynamicrange = 0, uint64_t fileIndex = 0, - uint32_t ndetx = 0, uint32_t ndety = 0, uint32_t npixelsx = 0, uint32_t npixelsy = 0, uint32_t imageSize = 0, - uint64_t acqIndex = 0, uint64_t fIndex = 0, const char* fname = NULL, - uint64_t frameNumber = 0, uint32_t expLength = 0, uint32_t packetNumber = 0, - uint64_t bunchId = 0, uint64_t timestamp = 0, - uint16_t modId = 0, uint16_t row = 0, uint16_t column = 0, uint16_t reserved = 0, - uint32_t debug = 0, uint16_t roundRNumber = 0, - uint8_t detType = 0, uint8_t version = 0, int gapPixelsEnable = 0, int flippedDataX = 0, - uint32_t quadEnable = 0, - std::string* additionalJsonHeader = 0) { - - - - /** Json Header Format */ - const char jsonHeaderFormat[] = - "{" - "\"jsonversion\":%u, " - "\"bitmode\":%u, " - "\"fileIndex\":%lu, " - "\"detshape\":[%u, %u], " - "\"shape\":[%u, %u], " - "\"size\":%u, " - "\"acqIndex\":%lu, " - "\"fIndex\":%lu, " - "\"fname\":\"%s\", " - "\"data\": %d, " - - "\"frameNumber\":%lu, " - "\"expLength\":%u, " - "\"packetNumber\":%u, " - "\"bunchId\":%lu, " - "\"timestamp\":%lu, " - "\"modId\":%u, " - "\"row\":%u, " - "\"column\":%u, " - "\"reserved\":%u, " - "\"debug\":%u, " - "\"roundRNumber\":%u, " - "\"detType\":%u, " - "\"version\":%u, " - - //additional stuff - "\"gappixels\":%u, " - "\"flippedDataX\":%u, " - "\"quad\":%u" - - ;//"}\n"; - char buf[MAX_STR_LENGTH] = ""; - sprintf(buf, jsonHeaderFormat, - jsonversion, dynamicrange, fileIndex, ndetx, ndety, npixelsx, npixelsy, imageSize, - acqIndex, fIndex, (fname == NULL)? "":fname, dummy?0:1, - - frameNumber, expLength, packetNumber, bunchId, timestamp, - modId, row, column, reserved, debug, roundRNumber, - detType, version, - - //additional stuff - gapPixelsEnable, - flippedDataX, - quadEnable - ); - - if (additionalJsonHeader && !((*additionalJsonHeader).empty())) { - strcat(buf, ", "); - strcat(buf, (*additionalJsonHeader).c_str()); - } - strcat(buf, "}\n"); - int length = strlen(buf); - -#ifdef VERBOSE - //if(!index) - cprintf(BLUE,"%d : Streamer: buf: %s\n", index, buf); -#endif - - if(zmq_send (sockfd.socketDescriptor, buf, length, dummy?0:ZMQ_SNDMORE) < 0) { - PrintError (); - return 0; - } -#ifdef VERBOSE - cprintf(GREEN,"[%u] send header data\n",portno); -#endif - return 1; - }; - - /** - * Send Message Body - * @param buf message - * @param length length of message - * @returns 0 if error, else 1 - */ - int SendData (char* buf, int length) { - if(zmq_send (sockfd.socketDescriptor, buf, length, 0) < 0) { - PrintError (); - return 0; - } -#ifdef VERBOSE - cprintf(GREEN,"[%u] send data\n",portno); -#endif - return 1; - }; - - - /** - * Receive Message - * @param index self index for debugging - * @param message message - * @returns length of message, -1 if error - */ - int ReceiveMessage(const int index, zmq_msg_t& message) { - int length = zmq_msg_recv (&message, sockfd.socketDescriptor, 0); - if (length == -1) { - PrintError (); - LOG(logERROR) << "Could not read header for socket " << index; - } -#ifdef VERBOSE - else - cprintf( RED,"Message %d Length: %d Header:%s \n", index, length, (char*) zmq_msg_data (&message) ); -#endif - return length; - }; - - - /** - * Receive Header (Important to close message after parsing header) - * @param index self index for debugging - * @param document parsed document reference - * @param version version that has to match, -1 to not care - * @returns 0 if error or end of acquisition, else 1 (call CloseHeaderMessage after parsing header) - */ - int ReceiveHeader(const int index, Document& document, uint32_t version) - { - std::vectorbuffer(MAX_STR_LENGTH); - int len = zmq_recv(sockfd.socketDescriptor, buffer.data(), buffer.size(),0); - if ( len > 0 ) { - bool dummy = false; -#ifdef ZMQ_DETAIL - cprintf( BLUE,"Header %d [%d] Length: %d Header:%s \n", index, portno, len, buffer.data()); -#endif - if ( ParseHeader (index, len, buffer.data(), document, dummy, version)) { -#ifdef ZMQ_DETAIL - cprintf( RED,"Parsed Header %d [%d] Length: %d Header:%s \n", index, portno, len, buffer.data() ); -#endif - if (dummy) { -#ifdef ZMQ_DETAIL - cprintf(RED,"%d [%d] Received end of acquisition\n", index, portno ); -#endif - return 0; - } -#ifdef ZMQ_DETAIL - cprintf(GREEN,"%d [%d] data\n",index, portno ); -#endif - return 1; - } - } - return 0; - }; + + /** + * Receive Header (Important to close message after parsing header) + * @param index self index for debugging + * @param document parsed document reference + * @param version version that has to match, -1 to not care + * @returns 0 if error or end of acquisition, else 1 (call + * CloseHeaderMessage after parsing header) + */ + int ReceiveHeader(const int index, rapidjson::Document &document, uint32_t version); /** * Close Header Message. Call this function if ReceiveHeader returned 1 @@ -444,183 +190,60 @@ public: * @param version version that has to match, -1 to not care * @returns true if successful else false */ - int ParseHeader(const int index, int length, char* buff, - Document& document, bool& dummy, uint32_t version) - { - if ( document.Parse( buff, length).HasParseError() ) { - LOG(logERROR) << index << " Could not parse. len:" << length << ": Message:" << buff; - fflush ( stdout ); - // char* buf = (char*) zmq_msg_data (&message); - for ( int i= 0; i < length; ++i ) { - cprintf(RED,"%02x ",buff[i]); - } - printf("\n"); - fflush( stdout ); - return 0; - } + int ParseHeader(const int index, int length, char *buff, rapidjson::Document &document, + bool &dummy, uint32_t version); - if (document["jsonversion"].GetUint() != version) { - LOG(logERROR) << "version mismatch. required " << version << ", got " << document["jsonversion"].GetUint(); - return 0; - } + /** + * Receive Data + * @param index self index for debugging + * @param buf buffer to copy image data to + * @param size size of image + * @returns length of data received + */ + int ReceiveData(const int index, char *buf, const int size); - dummy = false; - int temp = document["data"].GetUint(); - dummy = temp ? false : true; + /** + * Print error + */ + void PrintError(); - return 1; + private: + + /** + * Receive Message + * @param index self index for debugging + * @param message message + * @returns length of message, -1 if error + */ + int ReceiveMessage(const int index, zmq_msg_t &message); + /** + * Class to close socket descriptors automatically + * upon encountering exceptions in the ZmqSocket constructor + */ + class mySocketDescriptors { + public: + /** Constructor */ + mySocketDescriptors(); + /** Destructor */ + ~mySocketDescriptors(); + /** Unbinds the Socket */ + void Disconnect(); + /** Close Socket and destroy Context */ + void Close(); + /** true if server, else false */ + bool server; + /** Server Address */ + char serverAddress[1000]; + /** Context Descriptor */ + void *contextDescriptor; + /** Socket Descriptor */ + void *socketDescriptor; }; + private: + /** Port Number */ + uint32_t portno; - /** - * Receive Data - * @param index self index for debugging - * @param buf buffer to copy image data to - * @param size size of image - * @returns length of data received - */ - int ReceiveData(const int index, char* buf, const int size) - { - zmq_msg_t message; - zmq_msg_init (&message); - int length = ReceiveMessage(index, message); - - //actual data - if (length == size) { -#ifdef VERBOSE - cprintf(BLUE,"%d actual data\n", index); -#endif - memcpy(buf, (char*)zmq_msg_data(&message), size); - } - - //incorrect size (smaller) - else if (length < size){ -#ifdef ROIVERBOSITY - cprintf(RED,"Error: Received smaller packet size %d for socket %d\n", length, index); -#endif - memcpy(buf, (char*)zmq_msg_data(&message), length); - memset(buf+length,0xFF,size-length); - } - //incorrect size (larger) - else { - LOG(logERROR) << "Received weird packet size " << length << " for socket " << index; - memset(buf,0xFF,size); - } - - zmq_msg_close(&message); - return length; - }; - - - - /** - * Print error - */ - void PrintError () { - switch (errno) { - case EINVAL: - LOG(logERROR) << "The socket type/option or value/endpoint supplied is invalid (zmq)"; - break; - case EAGAIN: - LOG(logERROR) << "Non-blocking mode was requested and the message cannot be sent/available at the moment (zmq)"; - break; - case ENOTSUP: - LOG(logERROR) << "The zmq_send()/zmq_msg_recv() operation is not supported by this socket type (zmq)"; - break; - case EFSM: - LOG(logERROR) << "The zmq_send()/zmq_msg_recv() unavailable now as socket in inappropriate state (eg. ZMQ_REP). Look up messaging patterns (zmq)"; - break; - case EFAULT: - LOG(logERROR) << "The provided context/message is invalid (zmq)"; - break; - case EMFILE: - LOG(logERROR) << "The limit on the total number of open ØMQ sockets has been reached (zmq)"; - break; - case EPROTONOSUPPORT: - LOG(logERROR) << "The requested transport protocol is not supported (zmq)"; - break; - case ENOCOMPATPROTO: - LOG(logERROR) << "The requested transport protocol is not compatible with the socket type (zmq)"; - break; - case EADDRINUSE: - LOG(logERROR) << "The requested address is already in use (zmq)"; - break; - case EADDRNOTAVAIL: - LOG(logERROR) << "The requested address was not local (zmq)"; - break; - case ENODEV: - LOG(logERROR) << "The requested address specifies a nonexistent interface (zmq)"; - break; - case ETERM: - LOG(logERROR) << "The ØMQ context associated with the specified socket was terminated (zmq)"; - break; - case ENOTSOCK: - LOG(logERROR) << "The provided socket was invalid (zmq)"; - break; - case EINTR: - LOG(logERROR) << "The operation was interrupted by delivery of a signal (zmq)"; - break; - case EMTHREAD: - LOG(logERROR) << "No I/O thread is available to accomplish the task (zmq)"; - break; - default: - LOG(logERROR) << "Unknown socket error (zmq)"; - break; - } - }; - - -private: - - /** - * Class to close socket descriptors automatically - * upon encountering exceptions in the ZmqSocket constructor - */ - class mySocketDescriptors { - public: - /** Constructor */ - mySocketDescriptors(): - server(false), - contextDescriptor(0), - socketDescriptor(0) {}; - /** Destructor */ - ~mySocketDescriptors() { - Disconnect(); - Close(); - } - /** Unbinds the Socket */ - void Disconnect () { - if (server) - zmq_unbind (socketDescriptor, serverAddress); - else - zmq_disconnect (socketDescriptor, serverAddress); - }; - /** Close Socket and destroy Context */ - void Close () { - if (socketDescriptor != NULL) { - zmq_close (socketDescriptor); - socketDescriptor = NULL; - } - - if (contextDescriptor != NULL) { - zmq_ctx_destroy (contextDescriptor); - contextDescriptor = NULL; - } - }; - /** true if server, else false */ - bool server; - /** Server Address */ - char serverAddress[1000]; - /** Context Descriptor */ - void* contextDescriptor; - /** Socket Descriptor */ - void* socketDescriptor; - }; - -private: - /** Port Number */ - uint32_t portno; - - /** Socket descriptor */ - mySocketDescriptors sockfd; + /** Socket descriptor */ + mySocketDescriptors sockfd; }; diff --git a/slsSupportLib/src/ZmqSocket.cpp b/slsSupportLib/src/ZmqSocket.cpp new file mode 100644 index 000000000..0d606fd6c --- /dev/null +++ b/slsSupportLib/src/ZmqSocket.cpp @@ -0,0 +1,410 @@ +#include "ZmqSocket.h" +#include +#include +#include //inet_ntoa +#include +#include +#include //gethostbyname() +#include +#include //usleep in some machines + +using namespace rapidjson; +ZmqSocket::ZmqSocket(const char *const hostname_or_ip, + const uint32_t portnumber) + : portno(portnumber) +// headerMessage(0) +{ + char ip[MAX_STR_LENGTH] = ""; + memset(ip, 0, MAX_STR_LENGTH); + + // convert hostname to ip (not required, but a test that returns if failed) + struct addrinfo *result; + if ((ConvertHostnameToInternetAddress(hostname_or_ip, &result)) || + (ConvertInternetAddresstoIpString(result, ip, MAX_STR_LENGTH))) + throw sls::ZmqSocketError("Could convert IP to string"); + + // construct address + sprintf(sockfd.serverAddress, "tcp://%s:%d", ip, portno); +#ifdef VERBOSE + cprintf(BLUE, "address:%s\n", sockfd.serverAddress); +#endif + + // create context + sockfd.contextDescriptor = zmq_ctx_new(); + if (sockfd.contextDescriptor == 0) + throw sls::ZmqSocketError("Could not create contextDescriptor"); + + // create publisher + sockfd.socketDescriptor = zmq_socket(sockfd.contextDescriptor, ZMQ_SUB); + if (sockfd.socketDescriptor == 0) { + PrintError(); + Close(); + throw sls::ZmqSocketError("Could not create socket"); + } + + // Socket Options provided above + // an empty string implies receiving any messages + if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SUBSCRIBE, "", 0)) { + PrintError(); + Close(); + throw sls::ZmqSocketError("Could set socket opt"); + } + // ZMQ_LINGER default is already -1 means no messages discarded. use this + // options if optimizing required ZMQ_SNDHWM default is 0 means no limit. + // use this to optimize if optimizing required eg. int value = -1; + int value = 0; + if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_LINGER, &value, + sizeof(value))) { + PrintError(); + Close(); + throw sls::ZmqSocketError("Could not set ZMQ_LINGER"); + } +} + +ZmqSocket::ZmqSocket(const uint32_t portnumber, const char *ethip) + : + + portno(portnumber) +// headerMessage(0) +{ + sockfd.server = true; + + // create context + sockfd.contextDescriptor = zmq_ctx_new(); + if (sockfd.contextDescriptor == 0) + throw sls::ZmqSocketError("Could not create contextDescriptor"); + // create publisher + sockfd.socketDescriptor = zmq_socket(sockfd.contextDescriptor, ZMQ_PUB); + if (sockfd.socketDescriptor == 0) { + PrintError(); + Close(); + throw sls::ZmqSocketError("Could not create socket"); + } + + // Socket Options provided above + + // construct addresss + sprintf(sockfd.serverAddress, "tcp://%s:%d", ethip, portno); +#ifdef VERBOSE + cprintf(BLUE, "address:%s\n", sockfd.serverAddress); +#endif + // bind address + if (zmq_bind(sockfd.socketDescriptor, sockfd.serverAddress) < 0) { + PrintError(); + Close(); + throw sls::ZmqSocketError("Could not bind socket"); + } + + // sleep for a few milliseconds to allow a slow-joiner + usleep(200 * 1000); +}; + +int ZmqSocket::Connect() { + if (zmq_connect(sockfd.socketDescriptor, sockfd.serverAddress) < 0) { + PrintError(); + return 1; + } + return 0; +} + +int ZmqSocket::ConvertHostnameToInternetAddress(const char *const hostname, + struct addrinfo **res) { + // criteria in selecting socket address structures returned by res + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + // get host info into res + int errcode = getaddrinfo(hostname, NULL, &hints, res); + if (errcode != 0) { + LOG(logERROR) << "Error: Could not convert hostname " << hostname + << " to internet address (zmq):" << gai_strerror(errcode); + } else { + if (*res == NULL) { + LOG(logERROR) << "Could not convert hostname " << hostname + << " to internet address (zmq): " + "gettaddrinfo returned null"; + } else { + return 0; + } + } + LOG(logERROR) << "Could not convert hostname to internet address"; + return 1; +}; + +int ZmqSocket::ConvertInternetAddresstoIpString(struct addrinfo *res, char *ip, + const int ipsize) { + if (inet_ntop(res->ai_family, + &((struct sockaddr_in *)res->ai_addr)->sin_addr, ip, + ipsize) != NULL) { + freeaddrinfo(res); + return 0; + } + LOG(logERROR) << "Could not convert internet address to ip string"; + return 1; +} + +void ZmqSocket::PrintError() { + switch (errno) { + case EINVAL: + LOG(logERROR) << "The socket type/option or value/endpoint supplied is " + "invalid (zmq)"; + break; + case EAGAIN: + LOG(logERROR) << "Non-blocking mode was requested and the message " + "cannot be sent/available at the moment (zmq)"; + break; + case ENOTSUP: + LOG(logERROR) << "The zmq_send()/zmq_msg_recv() operation is not " + "supported by this socket type (zmq)"; + break; + case EFSM: + LOG(logERROR) << "The zmq_send()/zmq_msg_recv() unavailable now as " + "socket in inappropriate state (eg. ZMQ_REP). Look up " + "messaging patterns (zmq)"; + break; + case EFAULT: + LOG(logERROR) << "The provided context/message is invalid (zmq)"; + break; + case EMFILE: + LOG(logERROR) << "The limit on the total number of open ØMQ sockets " + "has been reached (zmq)"; + break; + case EPROTONOSUPPORT: + LOG(logERROR) + << "The requested transport protocol is not supported (zmq)"; + break; + case ENOCOMPATPROTO: + LOG(logERROR) << "The requested transport protocol is not compatible " + "with the socket type (zmq)"; + break; + case EADDRINUSE: + LOG(logERROR) << "The requested address is already in use (zmq)"; + break; + case EADDRNOTAVAIL: + LOG(logERROR) << "The requested address was not local (zmq)"; + break; + case ENODEV: + LOG(logERROR) + << "The requested address specifies a nonexistent interface (zmq)"; + break; + case ETERM: + LOG(logERROR) << "The ØMQ context associated with the specified socket " + "was terminated (zmq)"; + break; + case ENOTSOCK: + LOG(logERROR) << "The provided socket was invalid (zmq)"; + break; + case EINTR: + LOG(logERROR) + << "The operation was interrupted by delivery of a signal (zmq)"; + break; + case EMTHREAD: + LOG(logERROR) + << "No I/O thread is available to accomplish the task (zmq)"; + break; + default: + LOG(logERROR) << "Unknown socket error (zmq)"; + break; + } +} + +int ZmqSocket::ReceiveData(const int index, char *buf, const int size) { + zmq_msg_t message; + zmq_msg_init(&message); + int length = ReceiveMessage(index, message); + if (length == size) { + memcpy(buf, (char *)zmq_msg_data(&message), size); + } else if (length < size) { + memcpy(buf, (char *)zmq_msg_data(&message), length); + memset(buf + length, 0xFF, size - length); + } else { + LOG(logERROR) << "Received weird packet size " << length + << " for socket " << index; + memset(buf, 0xFF, size); + } + + zmq_msg_close(&message); + return length; +} + +int ZmqSocket::ParseHeader(const int index, int length, char *buff, + Document &document, bool &dummy, uint32_t version) { + if (document.Parse(buff, length).HasParseError()) { + LOG(logERROR) << index << " Could not parse. len:" << length + << ": Message:" << buff; + fflush(stdout); + // char* buf = (char*) zmq_msg_data (&message); + for (int i = 0; i < length; ++i) { + cprintf(RED, "%02x ", buff[i]); + } + printf("\n"); + fflush(stdout); + return 0; + } + + if (document["jsonversion"].GetUint() != version) { + LOG(logERROR) << "version mismatch. required " << version << ", got " + << document["jsonversion"].GetUint(); + return 0; + } + + dummy = false; + int temp = document["data"].GetUint(); + dummy = temp ? false : true; + + return 1; +} + +int ZmqSocket::ReceiveHeader(const int index, Document &document, + uint32_t version) { + std::vector buffer(MAX_STR_LENGTH); + int len = + zmq_recv(sockfd.socketDescriptor, buffer.data(), buffer.size(), 0); + if (len > 0) { + bool dummy = false; +#ifdef ZMQ_DETAIL + cprintf(BLUE, "Header %d [%d] Length: %d Header:%s \n", index, portno, + len, buffer.data()); +#endif + if (ParseHeader(index, len, buffer.data(), document, dummy, version)) { +#ifdef ZMQ_DETAIL + cprintf(RED, "Parsed Header %d [%d] Length: %d Header:%s \n", index, + portno, len, buffer.data()); +#endif + if (dummy) { +#ifdef ZMQ_DETAIL + cprintf(RED, "%d [%d] Received end of acquisition\n", index, + portno); +#endif + return 0; + } +#ifdef ZMQ_DETAIL + cprintf(GREEN, "%d [%d] data\n", index, portno); +#endif + return 1; + } + } + return 0; +}; + +int ZmqSocket::ReceiveMessage(const int index, zmq_msg_t &message) { + int length = zmq_msg_recv(&message, sockfd.socketDescriptor, 0); + if (length == -1) { + PrintError(); + LOG(logERROR) << "Could not read header for socket " << index; + } + return length; +} + +int ZmqSocket::SendData(char *buf, int length) { + if (zmq_send(sockfd.socketDescriptor, buf, length, 0) < 0) { + PrintError(); + return 0; + } + return 1; +} + +int ZmqSocket::SendHeaderData( + int index, bool dummy, uint32_t jsonversion, uint32_t dynamicrange, + uint64_t fileIndex, uint32_t ndetx, uint32_t ndety, uint32_t npixelsx, + uint32_t npixelsy, uint32_t imageSize, uint64_t acqIndex, uint64_t fIndex, + const char *fname, uint64_t frameNumber, uint32_t expLength, + uint32_t packetNumber, uint64_t bunchId, uint64_t timestamp, uint16_t modId, + uint16_t row, uint16_t column, uint16_t reserved, uint32_t debug, + uint16_t roundRNumber, uint8_t detType, uint8_t version, + int gapPixelsEnable, int flippedDataX, uint32_t quadEnable, + std::string *additionalJsonHeader) { + + /** Json Header Format */ + const char jsonHeaderFormat[] = "{" + "\"jsonversion\":%u, " + "\"bitmode\":%u, " + "\"fileIndex\":%lu, " + "\"detshape\":[%u, %u], " + "\"shape\":[%u, %u], " + "\"size\":%u, " + "\"acqIndex\":%lu, " + "\"fIndex\":%lu, " + "\"fname\":\"%s\", " + "\"data\": %d, " + + "\"frameNumber\":%lu, " + "\"expLength\":%u, " + "\"packetNumber\":%u, " + "\"bunchId\":%lu, " + "\"timestamp\":%lu, " + "\"modId\":%u, " + "\"row\":%u, " + "\"column\":%u, " + "\"reserved\":%u, " + "\"debug\":%u, " + "\"roundRNumber\":%u, " + "\"detType\":%u, " + "\"version\":%u, " + + // additional stuff + "\"gappixels\":%u, " + "\"flippedDataX\":%u, " + "\"quad\":%u" + + ; //"}\n"; + char buf[MAX_STR_LENGTH] = ""; + sprintf(buf, jsonHeaderFormat, jsonversion, dynamicrange, fileIndex, ndetx, + ndety, npixelsx, npixelsy, imageSize, acqIndex, fIndex, + (fname == NULL) ? "" : fname, dummy ? 0 : 1, + + frameNumber, expLength, packetNumber, bunchId, timestamp, modId, + row, column, reserved, debug, roundRNumber, detType, version, + + // additional stuff + gapPixelsEnable, flippedDataX, quadEnable); + + if (additionalJsonHeader && !((*additionalJsonHeader).empty())) { + strcat(buf, ", "); + strcat(buf, (*additionalJsonHeader).c_str()); + } + strcat(buf, "}\n"); + int length = strlen(buf); + +#ifdef VERBOSE + // if(!index) + cprintf(BLUE, "%d : Streamer: buf: %s\n", index, buf); +#endif + + if (zmq_send(sockfd.socketDescriptor, buf, length, + dummy ? 0 : ZMQ_SNDMORE) < 0) { + PrintError(); + return 0; + } +#ifdef VERBOSE + cprintf(GREEN, "[%u] send header data\n", portno); +#endif + return 1; +} + + +//Nested class to do RAII handling of socket descriptors +ZmqSocket::mySocketDescriptors::mySocketDescriptors() + : server(false), contextDescriptor(0), socketDescriptor(0){}; +ZmqSocket::mySocketDescriptors::~mySocketDescriptors() { + Disconnect(); + Close(); +} +void ZmqSocket::mySocketDescriptors::Disconnect() { + if (server) + zmq_unbind(socketDescriptor, serverAddress); + else + zmq_disconnect(socketDescriptor, serverAddress); +}; +void ZmqSocket::mySocketDescriptors::Close() { + if (socketDescriptor != NULL) { + zmq_close(socketDescriptor); + socketDescriptor = NULL; + } + + if (contextDescriptor != NULL) { + zmq_ctx_destroy(contextDescriptor); + contextDescriptor = NULL; + } +};