esrf changes: Listener, DataProcessor, DataStreamer, Fifo, ZmqSocket constructors return an exception when it fails

This commit is contained in:
2018-05-02 16:49:27 +02:00
parent fdd19bc316
commit d3a0319863
16 changed files with 120 additions and 141 deletions

View File

@ -24,7 +24,6 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject {
/**
* Constructor
* Calls Base Class CreateThread(), sets ErrorMask if error and increments NumberofDataProcessors
* @param ret OK or FAIL if thread creation succeeded or failed
* @param ind self index
* @param f address of Fifo pointer
* @param ftype pointer to file format type
@ -37,7 +36,7 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject {
* @param dataReadycb pointer to data ready call back function
* @param pDataReadycb pointer to arguments of data ready call back function. To write/stream a smaller size of processed data, change this value (only smaller value is allowed).
*/
DataProcessor(int& ret, int ind, Fifo*& f, fileFormat* ftype, bool fwenable, bool* dsEnable, bool* gpEnable, uint32_t* dr,
DataProcessor(int ind, Fifo*& f, fileFormat* ftype, bool fwenable, bool* dsEnable, bool* gpEnable, uint32_t* dr,
uint32_t* freq, uint32_t* timer,
void (*dataReadycb)(uint64_t, uint32_t, uint32_t, uint64_t,
uint64_t, uint16_t, uint16_t, uint16_t, uint16_t,

View File

@ -20,7 +20,6 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject {
/**
* Constructor
* Calls Base Class CreateThread(), sets ErrorMask if error and increments NumberofDataStreamers
* @param ret OK or FAIL if thread creation succeeded or failed
* @param ind self index
* @param f address of Fifo pointer
* @param dr pointer to dynamic range
@ -29,7 +28,7 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject {
* @param fd flipped data enable for x and y dimensions
* @param ajh additional json header
*/
DataStreamer(int& ret, int ind, Fifo*& f, uint32_t* dr, int* sEnable, uint64_t* fi, int* fd, char* ajh);
DataStreamer(int ind, Fifo*& f, uint32_t* dr, int* sEnable, uint64_t* fi, int* fd, char* ajh);
/**
* Destructor
@ -87,12 +86,12 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject {
/**
* Creates Zmq Sockets
* (throws an exception if it couldnt create zmq sockets)
* @param nunits pointer to number of theads/ units per detector
* @param port streaming port start index
* @param srcip streaming source ip
* @return OK or FAIL
*/
int CreateZmqSockets(int* nunits, uint32_t port, const char* srcip);
void CreateZmqSockets(int* nunits, uint32_t port, const char* srcip);
/**
* Shuts down and deletes Zmq Sockets

View File

@ -23,9 +23,8 @@ class Fifo : private virtual slsReceiverDefs {
* @param ind self index
* @param fifoItemSize size of each fifo item
* @param depth fifo depth
* @param success true if successful, else false
*/
Fifo(int ind, uint32_t fifoItemSize, uint32_t depth, bool &success);
Fifo(int ind, uint32_t fifoItemSize, uint32_t depth);
/**
* Destructor

View File

@ -21,7 +21,6 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject {
/**
* Constructor
* Calls Base Class CreateThread(), sets ErrorMask if error and increments NumberofListerners
* @param ret OK or FAIL if thread creation succeeded or failed
* @param ind self index
* @param dtype detector type
* @param f address of Fifo pointer
@ -34,7 +33,7 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject {
* @param us pointer to udp socket buffer size
* @param as pointer to actual udp socket buffer size
*/
Listener(int& ret, int ind, detectorType dtype, Fifo*& f, runStatus* s,
Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s,
uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr,
uint32_t* us, uint32_t* as);

View File

@ -58,7 +58,7 @@ public:
struct addrinfo *result;
if ((ConvertHostnameToInternetAddress(hostname_or_ip, &result)) ||
(ConvertInternetAddresstoIpString(result, ip, MAX_STR_LENGTH)))
return;
throw std::exception();
// construct address
sprintf (serverAddress, "tcp://%s:%d", ip, portno);
@ -69,29 +69,32 @@ public:
// create context
contextDescriptor = zmq_ctx_new();
if (contextDescriptor == NULL)
return;
throw std::exception();
// create publisher
socketDescriptor = zmq_socket (contextDescriptor, ZMQ_SUB);
if (socketDescriptor == NULL) {
PrintError ();
Close ();
PrintError ();
Close ();
throw std::exception();
}
//Socket Options provided above
// an empty string implies receiving any messages
if ( zmq_setsockopt(socketDescriptor, ZMQ_SUBSCRIBE, "", 0)) {
PrintError ();
Close();
PrintError ();
Close();
throw std::exception();
}
//ZMQ_LINGER default is already -1 means no messages discarded. use this options if optimizing required
//ZMQ_SNDHWM default is 0 means no limit. use this to optimize if optimizing required
// eg. int value = -1;
int value = 0;
if (zmq_setsockopt(socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) {
PrintError ();
Close();
}
if (zmq_setsockopt(socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) {
PrintError ();
Close();
throw std::exception();
}
};
/**
@ -111,13 +114,13 @@ public:
// create context
contextDescriptor = zmq_ctx_new();
if (contextDescriptor == NULL)
return;
throw std::exception();
// create publisher
socketDescriptor = zmq_socket (contextDescriptor, ZMQ_PUB);
if (socketDescriptor == NULL) {
PrintError ();
Close ();
return;
throw std::exception();
}
//Socket Options provided above
@ -131,7 +134,7 @@ public:
if (zmq_bind (socketDescriptor, serverAddress) < 0) {
PrintError ();
Close ();
return;
throw std::exception();
}
//sleep for a few milliseconds to allow a slow-joiner
@ -146,11 +149,6 @@ public:
Close();
};
/**
* Returns error status
* @returns true if error else false
*/
bool IsError() { if (socketDescriptor == NULL) return true; return false; };
/**
* Returns Server Address

View File

@ -231,11 +231,11 @@ enum communicationProtocol{
// confirm if sufficient
if (getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1) {
FILE_LOG(logWARNING) << "[Port " << port_number << "] Could not get Socket Receive Buffer Size";
FILE_LOG(logWARNING) << "[Port " << port_number << "] Could not get rx socket receive buffer size";
} else if (ret_size >= real_size) {
actual_udp_socket_buffer_size = ret_size;
#ifdef VEBOSE
FILE_LOG(logINFO) << "[Port " << port_number << "] UDP Socket Buffer Size is sufficient (" << ret_size << ")";
FILE_LOG(logINFO) << "[Port " << port_number << "] UDP rx socket buffer size is sufficient (" << ret_size << ")";
#endif
}
@ -243,16 +243,16 @@ enum communicationProtocol{
else {
// set buffer size (could not set)
if (setsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &desired_size, optlen) == -1) {
FILE_LOG(logWARNING) << "[Port " << port_number << "] Could not set Socket Receive Buffer Size to "
<< desired_size << ". No Root Privileges?";
FILE_LOG(logWARNING) << "[Port " << port_number << "] Could not set rx socket buffer size to "
<< desired_size << ". (No Root Privileges?)";
}
// confirm size
else if (getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1) {
FILE_LOG(logWARNING) << "[Port " << port_number << "] Could not get Socket Receive Buffer Size";
FILE_LOG(logWARNING) << "[Port " << port_number << "] Could not get rx socket buffer size";
}
else if (ret_size >= real_size) {
actual_udp_socket_buffer_size = ret_size;
FILE_LOG(logINFO) << "[Port " << port_number << "] UDP Socket Buffer Size modified to " << ret_size;
FILE_LOG(logINFO) << "[Port " << port_number << "] UDP rx socket buffer size modified to " << ret_size;
}
// buffer size too large
else {
@ -262,13 +262,13 @@ enum communicationProtocol{
getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen);
if (ret == -1) {
FILE_LOG(logWARNING) << "[Port " << port_number << "] "
"Could not force Socket Receive Buffer Size to "
<< desired_size << ". Real size is " << ret_size <<
"Could not force rx socket buffer size to "
<< desired_size << ".\n Real size: " << ret_size <<
". (No Root Privileges?)\n"
"Set rx_udpsocksize from the client to <= " <<
(ret_size/2) << " (Real size:" << ret_size << ") to remove this warning.\n";
" To remove this warning: set rx_udpsocksize from client to <= " <<
(ret_size/2) << " (Real size:" << ret_size << ").";
} else {
FILE_LOG(logINFO) << "[Port " << port_number << "] UDP socket buffer size modified to " << ret_size;
FILE_LOG(logINFO) << "[Port " << port_number << "] UDP rx socket buffer size modified to " << ret_size;
}
}
}