mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-06-13 21:37:13 +02:00
added zmqasocket
This commit is contained in:
288
slsReceiverSoftware/include/ZmqSocket.h
Normal file
288
slsReceiverSoftware/include/ZmqSocket.h
Normal file
@ -0,0 +1,288 @@
|
||||
#pragma once
|
||||
/************************************************
|
||||
* @file zmqSocket.h
|
||||
* @short functions to open/close zmq sockets
|
||||
***********************************************/
|
||||
/**
|
||||
*@short functions to open/close zmq sockets
|
||||
*/
|
||||
|
||||
#include "ansi.h"
|
||||
|
||||
#include <zmq.h>
|
||||
#include <rapidjson/document.h> //json header in zmq stream
|
||||
#include <errno.h>
|
||||
|
||||
#define DEFAULT_ZMQ_PORTNO 70001
|
||||
|
||||
class ZmqSocket {
|
||||
|
||||
public:
|
||||
|
||||
//Socket Options for optimization
|
||||
//ZMQ_LINGER default is already -1 means no messages discarded. use this options if optimizing required
|
||||
//ZMQ_SNDHWM default is 0 means no limit. use this to optimize if optimizing required
|
||||
// eg. int value = -1;
|
||||
// if (zmq_setsockopt(socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) {
|
||||
// Close();
|
||||
// }
|
||||
|
||||
/**
|
||||
* Constructor for a client
|
||||
* Creates socket, context and connects to server
|
||||
* @param hostname hostname or ip of server
|
||||
* @param portnumber port number
|
||||
*/
|
||||
ZmqSocket (const char* const hostname, const uint32_t portnumber):
|
||||
portno (portnumber),
|
||||
server (false),
|
||||
contextDescriptor (NULL),
|
||||
socketDescriptor (NULL)
|
||||
{
|
||||
// construct address
|
||||
if (strchr (hostname, '.') != NULL) {
|
||||
// convert hostname to ip
|
||||
hostname = ConvertHostnameToIp (hostname);
|
||||
if (hostname == NULL)
|
||||
return;
|
||||
}
|
||||
sprintf (serverAddress, "tcp://%s:%d", hostname, portno);
|
||||
|
||||
// create context
|
||||
contextDescriptor = zmq_ctx_new();
|
||||
if (contextDescriptor == NULL)
|
||||
return;
|
||||
|
||||
// create publisher
|
||||
socketDescriptor = zmq_socket (contextDescriptor, ZMQ_PULL);
|
||||
if (socketDescriptor == NULL) {
|
||||
PrintError ();
|
||||
Close ();
|
||||
}
|
||||
|
||||
//Socket Options provided above
|
||||
|
||||
//connect socket
|
||||
if (zmq_connect(socketDescriptor, serverAddress)) {
|
||||
PrintError ();
|
||||
Close ();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Constructor for a server
|
||||
* Creates socket, context and connects to server
|
||||
* @param hostname hostname or ip of server
|
||||
* @param portnumber port number
|
||||
*/
|
||||
ZmqSocket (const uint32_t portnumber):
|
||||
portno (portnumber),
|
||||
server (true),
|
||||
contextDescriptor (NULL),
|
||||
socketDescriptor (NULL)
|
||||
{
|
||||
// create context
|
||||
contextDescriptor = zmq_ctx_new();
|
||||
if (contextDescriptor == NULL)
|
||||
return;
|
||||
// create publisher
|
||||
socketDescriptor = zmq_socket (contextDescriptor, ZMQ_PUSH);
|
||||
if (socketDescriptor == NULL) {
|
||||
PrintError ();
|
||||
Close ();
|
||||
}
|
||||
|
||||
//Socket Options provided above
|
||||
|
||||
// construct address
|
||||
sprintf (serverAddress,"tcp://*:%d", portno);
|
||||
// bind address
|
||||
if (zmq_bind (socketDescriptor, serverAddress)) {
|
||||
PrintError ();
|
||||
Close ();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
~ZmqSocket () {
|
||||
Disconnect();
|
||||
Close();
|
||||
};
|
||||
|
||||
/**
|
||||
* Returns error status
|
||||
* @returns 1 if error else 0
|
||||
*/
|
||||
int GetErrorStatus() { if (socketDescriptor == NULL) return 1; return 0; };
|
||||
|
||||
/**
|
||||
* Returns Server Address
|
||||
* @returns Server Address
|
||||
*/
|
||||
char* GetZmqServerAddress () { return serverAddress; };
|
||||
|
||||
/**
|
||||
* Returns Port Number
|
||||
* @returns Port Number
|
||||
*/
|
||||
uint32_t GetPortNumber () { return portno; };
|
||||
|
||||
/**
|
||||
* Returns Socket Descriptor
|
||||
* @reutns Socket descriptor
|
||||
*/
|
||||
|
||||
int GetsocketDescriptor () { return socketDescriptor; };
|
||||
|
||||
/**
|
||||
* Unbinds the Socket
|
||||
*/
|
||||
void Disconnect () {
|
||||
if (server)
|
||||
zmq_unbind (socketDescriptor, serverAddress);
|
||||
else
|
||||
zmq_disconnect (socketDescriptor, serverAddress);
|
||||
};
|
||||
|
||||
/**
|
||||
* Close Socket and destroy Context
|
||||
*/
|
||||
void Close () {
|
||||
if (socketDescriptor != NULL) {
|
||||
zmq_close (socketDescriptor);
|
||||
socketDescriptor = NULL;
|
||||
}
|
||||
if (contextDescriptor != NULL) {
|
||||
zmq_ctx_destroy (contextDescriptor);
|
||||
contextDescriptor = NULL;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Convert Hostname to ip
|
||||
* @param hostname hostname
|
||||
* @returns string with ip or NULL if error
|
||||
*/
|
||||
char* ConvertHostnameToIp (const char* const hostname) {
|
||||
struct hostent *he = gethostbyname (hostname);
|
||||
if (he == NULL){
|
||||
cprintf (RED,"Error: Could not convert hostname to ip (zmq)\n");
|
||||
return NULL;
|
||||
}
|
||||
return inet_ntoa (*(struct in_addr*)he->h_addr);
|
||||
};
|
||||
|
||||
/**
|
||||
* Send Message Header
|
||||
* @returns 0 if error, else 1
|
||||
*/
|
||||
int SendHeaderData (char* buf, int length) {
|
||||
if(!zmq_send (socketDescriptor, buf, length, ZMQ_SNDMORE))
|
||||
return 1;
|
||||
PrintError();
|
||||
return 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Send Message Body
|
||||
* @returns 0 if error, else 1
|
||||
*/
|
||||
int SendDataOnly (char* buf, int length) {
|
||||
if(!zmq_send (socketDescriptor, buf, length, 0))
|
||||
return 1;
|
||||
PrintError ();
|
||||
return 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Receive Message (header/data)
|
||||
* @param index self index for debugging
|
||||
* @returns length of message, -1 if error
|
||||
*/
|
||||
int ReceiveData(int index, zmq_msg_t& message) {
|
||||
// scan header
|
||||
zmq_msg_init (&message);
|
||||
int length = zmq_msg_recv (&message, socketDescriptor, 0);
|
||||
if (length == -1) {
|
||||
PrintError ();
|
||||
cprintf (BG_RED,"Error: Could not read header for socket %d\n",index);
|
||||
zmq_msg_close (&message);
|
||||
}
|
||||
return length;
|
||||
};
|
||||
|
||||
/**
|
||||
* Print error
|
||||
*/
|
||||
void PrintError () {
|
||||
switch (errno) {
|
||||
case EINVAL:
|
||||
cprintf(RED, "Error: The socket type/option or value/endpoint supplied is invalid (zmq)\n");
|
||||
break;
|
||||
case EAGAIN:
|
||||
cprintf(RED, "Error: Non-blocking mode was requested and the message cannot be sent/available at the moment (zmq)\n");
|
||||
break;
|
||||
case ENOTSUP:
|
||||
cprintf(RED, "Error: The zmq_send()/zmq_msg_recv() operation is not supported by this socket type (zmq)\n");
|
||||
break;
|
||||
case EFSM:
|
||||
cprintf(RED, "Error: The zmq_send()/zmq_msg_recv() unavailable now as socket in inappropriate state (eg. ZMQ_REP). Look up messaging patterns (zmq)\n");
|
||||
break;
|
||||
case EFAULT:
|
||||
cprintf(RED, "Error: The provided context/message is invalid (zmq)\n");
|
||||
break;
|
||||
case EMFILE:
|
||||
cprintf(RED, "Error: The limit on the total number of open ØMQ sockets has been reached (zmq)\n");
|
||||
break;
|
||||
case EPROTONOSUPPORT:
|
||||
cprintf(RED, "Error: The requested transport protocol is not supported (zmq)\n");
|
||||
break;
|
||||
case ENOCOMPATPROTO:
|
||||
cprintf(RED, "Error: The requested transport protocol is not compatible with the socket type (zmq)\n");
|
||||
break;
|
||||
case EADDRINUSE:
|
||||
cprintf(RED, "Error: The requested address is already in use (zmq)\n");
|
||||
break;
|
||||
case EADDRNOTAVAIL:
|
||||
cprintf(RED, "Error: The requested address was not local (zmq)\n");
|
||||
break;
|
||||
case ENODEV:
|
||||
cprintf(RED, "Error: The requested address specifies a nonexistent interface (zmq)\n");
|
||||
break;
|
||||
case ETERM:
|
||||
cprintf(RED, "Error: The ØMQ context associated with the specified socket was terminated (zmq)\n");
|
||||
break;
|
||||
case ENOTSOCK:
|
||||
cprintf(RED, "Error: The provided socket was invalid (zmq)\n");
|
||||
break;
|
||||
case EINTR:
|
||||
cprintf(RED, "Error: The operation was interrupted by delivery of a signal (zmq)\n");
|
||||
break;
|
||||
case EMTHREAD:
|
||||
cprintf(RED, "Error: No I/O thread is available to accomplish the task (zmq)\n");
|
||||
break;
|
||||
default:
|
||||
cprintf(RED, "Error: Unknown socket error (zmq)\n");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
private:
|
||||
/** Port Number */
|
||||
uint32_t portno;
|
||||
|
||||
/** true if server, else false */
|
||||
bool server;
|
||||
|
||||
/** Context Descriptor */
|
||||
void* contextDescriptor;
|
||||
|
||||
/** Socket Descriptor */
|
||||
void* socketDescriptor;
|
||||
|
||||
/** Server Address */
|
||||
char serverAddress[1000];
|
||||
};
|
Reference in New Issue
Block a user