diff --git a/slsReceiverSoftware/include/ZmqSocket.h b/slsReceiverSoftware/include/ZmqSocket.h new file mode 100644 index 000000000..2d1163641 --- /dev/null +++ b/slsReceiverSoftware/include/ZmqSocket.h @@ -0,0 +1,288 @@ +#pragma once +/************************************************ + * @file zmqSocket.h + * @short functions to open/close zmq sockets + ***********************************************/ +/** + *@short functions to open/close zmq sockets + */ + +#include "ansi.h" + +#include +#include //json header in zmq stream +#include + +#define DEFAULT_ZMQ_PORTNO 70001 + +class ZmqSocket { + +public: + + //Socket Options for optimization + //ZMQ_LINGER default is already -1 means no messages discarded. use this options if optimizing required + //ZMQ_SNDHWM default is 0 means no limit. use this to optimize if optimizing required + // eg. int value = -1; + // if (zmq_setsockopt(socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) { + // Close(); + // } + + /** + * Constructor for a client + * Creates socket, context and connects to server + * @param hostname hostname or ip of server + * @param portnumber port number + */ + ZmqSocket (const char* const hostname, const uint32_t portnumber): + portno (portnumber), + server (false), + contextDescriptor (NULL), + socketDescriptor (NULL) + { + // construct address + if (strchr (hostname, '.') != NULL) { + // convert hostname to ip + hostname = ConvertHostnameToIp (hostname); + if (hostname == NULL) + return; + } + sprintf (serverAddress, "tcp://%s:%d", hostname, portno); + + // create context + contextDescriptor = zmq_ctx_new(); + if (contextDescriptor == NULL) + return; + + // create publisher + socketDescriptor = zmq_socket (contextDescriptor, ZMQ_PULL); + if (socketDescriptor == NULL) { + PrintError (); + Close (); + } + + //Socket Options provided above + + //connect socket + if (zmq_connect(socketDescriptor, serverAddress)) { + PrintError (); + Close (); + } + }; + + /** + * Constructor for a server + * Creates socket, context and connects to server + * @param hostname hostname or ip of server + * @param portnumber port number + */ + ZmqSocket (const uint32_t portnumber): + portno (portnumber), + server (true), + contextDescriptor (NULL), + socketDescriptor (NULL) + { + // create context + contextDescriptor = zmq_ctx_new(); + if (contextDescriptor == NULL) + return; + // create publisher + socketDescriptor = zmq_socket (contextDescriptor, ZMQ_PUSH); + if (socketDescriptor == NULL) { + PrintError (); + Close (); + } + + //Socket Options provided above + + // construct address + sprintf (serverAddress,"tcp://*:%d", portno); + // bind address + if (zmq_bind (socketDescriptor, serverAddress)) { + PrintError (); + Close (); + } + }; + + /** + * Destructor + */ + ~ZmqSocket () { + Disconnect(); + Close(); + }; + + /** + * Returns error status + * @returns 1 if error else 0 + */ + int GetErrorStatus() { if (socketDescriptor == NULL) return 1; return 0; }; + + /** + * Returns Server Address + * @returns Server Address + */ + char* GetZmqServerAddress () { return serverAddress; }; + + /** + * Returns Port Number + * @returns Port Number + */ + uint32_t GetPortNumber () { return portno; }; + + /** + * Returns Socket Descriptor + * @reutns Socket descriptor + */ + + int GetsocketDescriptor () { return socketDescriptor; }; + + /** + * Unbinds the Socket + */ + void Disconnect () { + if (server) + zmq_unbind (socketDescriptor, serverAddress); + else + zmq_disconnect (socketDescriptor, serverAddress); + }; + + /** + * Close Socket and destroy Context + */ + void Close () { + if (socketDescriptor != NULL) { + zmq_close (socketDescriptor); + socketDescriptor = NULL; + } + if (contextDescriptor != NULL) { + zmq_ctx_destroy (contextDescriptor); + contextDescriptor = NULL; + } + }; + + /** + * Convert Hostname to ip + * @param hostname hostname + * @returns string with ip or NULL if error + */ + char* ConvertHostnameToIp (const char* const hostname) { + struct hostent *he = gethostbyname (hostname); + if (he == NULL){ + cprintf (RED,"Error: Could not convert hostname to ip (zmq)\n"); + return NULL; + } + return inet_ntoa (*(struct in_addr*)he->h_addr); + }; + + /** + * Send Message Header + * @returns 0 if error, else 1 + */ + int SendHeaderData (char* buf, int length) { + if(!zmq_send (socketDescriptor, buf, length, ZMQ_SNDMORE)) + return 1; + PrintError(); + return 0; + }; + + /** + * Send Message Body + * @returns 0 if error, else 1 + */ + int SendDataOnly (char* buf, int length) { + if(!zmq_send (socketDescriptor, buf, length, 0)) + return 1; + PrintError (); + return 0; + }; + + /** + * Receive Message (header/data) + * @param index self index for debugging + * @returns length of message, -1 if error + */ + int ReceiveData(int index, zmq_msg_t& message) { + // scan header + zmq_msg_init (&message); + int length = zmq_msg_recv (&message, socketDescriptor, 0); + if (length == -1) { + PrintError (); + cprintf (BG_RED,"Error: Could not read header for socket %d\n",index); + zmq_msg_close (&message); + } + return length; + }; + + /** + * Print error + */ + void PrintError () { + switch (errno) { + case EINVAL: + cprintf(RED, "Error: The socket type/option or value/endpoint supplied is invalid (zmq)\n"); + break; + case EAGAIN: + cprintf(RED, "Error: Non-blocking mode was requested and the message cannot be sent/available at the moment (zmq)\n"); + break; + case ENOTSUP: + cprintf(RED, "Error: The zmq_send()/zmq_msg_recv() operation is not supported by this socket type (zmq)\n"); + break; + case EFSM: + cprintf(RED, "Error: The zmq_send()/zmq_msg_recv() unavailable now as socket in inappropriate state (eg. ZMQ_REP). Look up messaging patterns (zmq)\n"); + break; + case EFAULT: + cprintf(RED, "Error: The provided context/message is invalid (zmq)\n"); + break; + case EMFILE: + cprintf(RED, "Error: The limit on the total number of open ØMQ sockets has been reached (zmq)\n"); + break; + case EPROTONOSUPPORT: + cprintf(RED, "Error: The requested transport protocol is not supported (zmq)\n"); + break; + case ENOCOMPATPROTO: + cprintf(RED, "Error: The requested transport protocol is not compatible with the socket type (zmq)\n"); + break; + case EADDRINUSE: + cprintf(RED, "Error: The requested address is already in use (zmq)\n"); + break; + case EADDRNOTAVAIL: + cprintf(RED, "Error: The requested address was not local (zmq)\n"); + break; + case ENODEV: + cprintf(RED, "Error: The requested address specifies a nonexistent interface (zmq)\n"); + break; + case ETERM: + cprintf(RED, "Error: The ØMQ context associated with the specified socket was terminated (zmq)\n"); + break; + case ENOTSOCK: + cprintf(RED, "Error: The provided socket was invalid (zmq)\n"); + break; + case EINTR: + cprintf(RED, "Error: The operation was interrupted by delivery of a signal (zmq)\n"); + break; + case EMTHREAD: + cprintf(RED, "Error: No I/O thread is available to accomplish the task (zmq)\n"); + break; + default: + cprintf(RED, "Error: Unknown socket error (zmq)\n"); + break; + } + }; + + +private: + /** Port Number */ + uint32_t portno; + + /** true if server, else false */ + bool server; + + /** Context Descriptor */ + void* contextDescriptor; + + /** Socket Descriptor */ + void* socketDescriptor; + + /** Server Address */ + char serverAddress[1000]; +};