Exceptions: zmq socket class descriptors made into its own class for auto destruction upon construction exception, similarly for other try blocks. slsDetector and multislsdetector left to do

This commit is contained in:
maliakal_d 2018-08-09 18:12:56 +02:00
parent 1102153d2b
commit a0512a01d5
7 changed files with 140 additions and 138 deletions

View File

@ -721,8 +721,8 @@ bool multiSlsDetector::initSharedMemory(bool verify) {
cprintf(RED, "Multi shared memory (%d) version mismatch " cprintf(RED, "Multi shared memory (%d) version mismatch "
"(expected 0x%x but got 0x%x)\n", detId, "(expected 0x%x but got 0x%x)\n", detId,
MULTI_SHMVERSION, thisMultiDetector->shmversion); MULTI_SHMVERSION, thisMultiDetector->shmversion);
sharedMemory->UnmapSharedMemory(thisMultiDetector);/** is this unncessary? */ sharedMemory->UnmapSharedMemory(thisMultiDetector);
delete sharedMemory;/** is this unncessary? */ delete sharedMemory;
sharedMemory = 0; sharedMemory = 0;
throw SharedMemoryException(); throw SharedMemoryException();
} }
@ -732,8 +732,8 @@ bool multiSlsDetector::initSharedMemory(bool verify) {
created = true; created = true;
} catch(...) { } catch(...) {
sharedMemory->RemoveSharedMemory(); sharedMemory->RemoveSharedMemory();
delete sharedMemory;/** is this unncessary? */ delete sharedMemory;
sharedMemory = 0;/** is this unncessary? */ sharedMemory = 0;
thisMultiDetector = 0; thisMultiDetector = 0;
throw; throw;
} }
@ -4852,6 +4852,7 @@ int multiSlsDetector::createReceivingDataSockets(const bool destroy) {
cprintf(MAGENTA, "Going to destroy data sockets\n"); cprintf(MAGENTA, "Going to destroy data sockets\n");
//close socket //close socket
for (vector<ZmqSocket*>::const_iterator it = zmqSocket.begin(); it != zmqSocket.end(); ++it) { for (vector<ZmqSocket*>::const_iterator it = zmqSocket.begin(); it != zmqSocket.end(); ++it) {
(*it)->Close();
delete(*it); delete(*it);
} }
zmqSocket.clear(); zmqSocket.clear();

View File

@ -113,7 +113,8 @@ public:
// create multiSlsDetector class if required // create multiSlsDetector class if required
if (myDetector==NULL) { \ if (myDetector==NULL) { \
try { \ try { \
myDetector = new multiSlsDetector(id, verify, update); \ multiSlsDetector* m = new multiSlsDetector(id, verify, update); \
myDetector = m; \
} catch (const SharedMemoryException & e) { \ } catch (const SharedMemoryException & e) { \
cout << e.GetMessage() << endl; \ cout << e.GetMessage() << endl; \
return; \ return; \

View File

@ -1001,7 +1001,7 @@ slsDetectorDefs::sls_detector_module* slsDetector::createModule(detectorType ty
nd = detlist.nDacs; nd = detlist.nDacs;
na = detlist.nAdcs; na = detlist.nAdcs;
} catch(...) { } catch(...) {
;// FIXME do what here? return NULL;
} }
dacs_t *dacs=new dacs_t[nd]; dacs_t *dacs=new dacs_t[nd];
dacs_t *adcs=new dacs_t[na]; dacs_t *adcs=new dacs_t[na];

View File

@ -270,13 +270,17 @@ public:
dset->extend(dims); dset->extend(dims);
delete dspace; delete dspace;
dspace = new DataSpace(dset->getSpace()); dspace = 0;
DataSpace* d = new DataSpace(dset->getSpace());
dspace = d;
hsize_t dims_para[1] = {dims[0]}; hsize_t dims_para[1] = {dims[0]};
for (unsigned int i = 0; i < dset_para.size(); ++i) for (unsigned int i = 0; i < dset_para.size(); ++i)
dset_para[i]->extend(dims_para); dset_para[i]->extend(dims_para);
delete dspace_para; delete dspace_para;
dspace_para = new DataSpace(dset_para[0]->getSpace()); dspace_para = 0;
DataSpace* ds = new DataSpace(dset_para[0]->getSpace());
dspace_para = ds;
} }
catch(Exception error){ catch(Exception error){
@ -318,14 +322,16 @@ public:
FileAccPropList flist; FileAccPropList flist;
flist.setFcloseDegree(H5F_CLOSE_STRONG); flist.setFcloseDegree(H5F_CLOSE_STRONG);
int k = 0;
if(!owenable) if(!owenable)
fd = new H5File( fname.c_str(), H5F_ACC_EXCL, k = new H5File( fname.c_str(), H5F_ACC_EXCL,
FileCreatPropList::DEFAULT, FileCreatPropList::DEFAULT,
flist ); flist );
else else
fd = new H5File( fname.c_str(), H5F_ACC_TRUNC, k = new H5File( fname.c_str(), H5F_ACC_TRUNC,
FileCreatPropList::DEFAULT, FileCreatPropList::DEFAULT,
flist ); flist );
fd = k;
//variables //variables
DataSpace dataspace = DataSpace (H5S_SCALAR); DataSpace dataspace = DataSpace (H5S_SCALAR);
@ -416,6 +422,7 @@ public:
} catch(Exception error) { } catch(Exception error) {
cprintf(RED,"Error in creating master HDF5 handles\n"); cprintf(RED,"Error in creating master HDF5 handles\n");
error.printErrorStack(); error.printErrorStack();
if (fd) fd->close();
return 1; return 1;
} }
return 0; return 0;
@ -459,14 +466,16 @@ public:
//file //file
FileAccPropList fapl; FileAccPropList fapl;
fapl.setFcloseDegree(H5F_CLOSE_STRONG); fapl.setFcloseDegree(H5F_CLOSE_STRONG);
int k = 0;
if(!owenable) if(!owenable)
fd = new H5File( fname.c_str(), H5F_ACC_EXCL, k = new H5File( fname.c_str(), H5F_ACC_EXCL,
FileCreatPropList::DEFAULT, FileCreatPropList::DEFAULT,
fapl ); fapl );
else else
fd = new H5File( fname.c_str(), H5F_ACC_TRUNC, k = new H5File( fname.c_str(), H5F_ACC_TRUNC,
FileCreatPropList::DEFAULT, FileCreatPropList::DEFAULT,
fapl ); fapl );
fd = k;
//attributes - version //attributes - version
double dValue=version; double dValue=version;
@ -477,7 +486,8 @@ public:
//dataspace //dataspace
hsize_t srcdims[3] = {nDimx, nDimy, nDimz}; hsize_t srcdims[3] = {nDimx, nDimy, nDimz};
hsize_t srcdimsmax[3] = {H5S_UNLIMITED, nDimy, nDimz}; hsize_t srcdimsmax[3] = {H5S_UNLIMITED, nDimy, nDimz};
dspace = new DataSpace (3,srcdims,srcdimsmax); DataSpace* d = new DataSpace (3,srcdims,srcdimsmax);
dspace = d;
//dataset name //dataset name
@ -494,12 +504,14 @@ public:
// always create chunked dataset as unlimited is only supported with chunked layout // always create chunked dataset as unlimited is only supported with chunked layout
hsize_t chunk_dims[3] ={maxchunkedimages, nDimy, nDimz}; hsize_t chunk_dims[3] ={maxchunkedimages, nDimy, nDimz};
plist.setChunk(3, chunk_dims); plist.setChunk(3, chunk_dims);
dset = new DataSet (fd->createDataSet(dsetname.c_str(), dtype, *dspace, plist)); DataSet* ds = new DataSet (fd->createDataSet(dsetname.c_str(), dtype, *dspace, plist));
dset = ds;
//create parameter datasets //create parameter datasets
hsize_t dims[1] = {nDimx}; hsize_t dims[1] = {nDimx};
hsize_t dimsmax[1] = {H5S_UNLIMITED}; hsize_t dimsmax[1] = {H5S_UNLIMITED};
dspace_para = new DataSpace (1,dims,dimsmax); DataSpace* dsp = new DataSpace (1,dims,dimsmax);
dspace_para = dsp;
// always create chunked dataset as unlimited is only supported with chunked layout // always create chunked dataset as unlimited is only supported with chunked layout
DSetCreatPropList paralist; DSetCreatPropList paralist;
@ -515,7 +527,7 @@ public:
catch(Exception error){ catch(Exception error){
cprintf(RED,"Error in creating HDF5 handles in object %d\n",ind); cprintf(RED,"Error in creating HDF5 handles in object %d\n",ind);
error.printErrorStack(); error.printErrorStack();
fd->close(); if (fd) fd->close();
return 1; return 1;
} }
return 0; return 0;

View File

@ -45,12 +45,7 @@ public:
* @param portnumber port number * @param portnumber port number
*/ */
ZmqSocket (const char* const hostname_or_ip, const uint32_t portnumber): ZmqSocket (const char* const hostname_or_ip, const uint32_t portnumber):
portno (portnumber), portno (portnumber) {
server (false),
contextDescriptor (NULL),
socketDescriptor (NULL),
headerMessage(0)
{
char ip[MAX_STR_LENGTH] = ""; char ip[MAX_STR_LENGTH] = "";
memset(ip, 0, MAX_STR_LENGTH); memset(ip, 0, MAX_STR_LENGTH);
@ -61,19 +56,19 @@ public:
throw std::exception(); throw std::exception();
// construct address // construct address
sprintf (serverAddress, "tcp://%s:%d", ip, portno); sprintf (sockfd.serverAddress, "tcp://%s:%d", ip, portno);
#ifdef VERBOSE #ifdef VERBOSE
cprintf(BLUE,"address:%s\n",serverAddress); cprintf(BLUE,"address:%s\n",sockfd.serverAddress);
#endif #endif
// create context // create context
contextDescriptor = zmq_ctx_new(); sockfd.contextDescriptor = zmq_ctx_new();
if (contextDescriptor == NULL) if (sockfd.contextDescriptor == 0)
throw std::exception(); throw std::exception();
// create publisher // create publisher
socketDescriptor = zmq_socket (contextDescriptor, ZMQ_SUB); sockfd.socketDescriptor = zmq_socket (sockfd.contextDescriptor, ZMQ_SUB);
if (socketDescriptor == NULL) { if (sockfd.socketDescriptor == 0) {
PrintError (); PrintError ();
Close (); Close ();
throw std::exception(); throw std::exception();
@ -81,7 +76,7 @@ public:
//Socket Options provided above //Socket Options provided above
// an empty string implies receiving any messages // an empty string implies receiving any messages
if ( zmq_setsockopt(socketDescriptor, ZMQ_SUBSCRIBE, "", 0)) { if ( zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SUBSCRIBE, "", 0)) {
PrintError (); PrintError ();
Close(); Close();
throw std::exception(); throw std::exception();
@ -90,7 +85,7 @@ public:
//ZMQ_SNDHWM default is 0 means no limit. use this to optimize if optimizing required //ZMQ_SNDHWM default is 0 means no limit. use this to optimize if optimizing required
// eg. int value = -1; // eg. int value = -1;
int value = 0; int value = 0;
if (zmq_setsockopt(socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) { if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) {
PrintError (); PrintError ();
Close(); Close();
throw std::exception(); throw std::exception();
@ -105,19 +100,16 @@ public:
* @param ethip is the ip of the ethernet interface to stream zmq from * @param ethip is the ip of the ethernet interface to stream zmq from
*/ */
ZmqSocket (const uint32_t portnumber, const char *ethip): ZmqSocket (const uint32_t portnumber, const char *ethip):
portno (portnumber), portno (portnumber) {
server (true), sockfd.server = true;
contextDescriptor (NULL),
socketDescriptor (NULL),
headerMessage(0)
{
// create context // create context
contextDescriptor = zmq_ctx_new(); sockfd.contextDescriptor = zmq_ctx_new();
if (contextDescriptor == NULL) if (sockfd.contextDescriptor == 0)
throw std::exception(); throw std::exception();
// create publisher // create publisher
socketDescriptor = zmq_socket (contextDescriptor, ZMQ_PUB); sockfd.socketDescriptor = zmq_socket (sockfd.contextDescriptor, ZMQ_PUB);
if (socketDescriptor == NULL) { if (sockfd.socketDescriptor == 0) {
PrintError (); PrintError ();
Close (); Close ();
throw std::exception(); throw std::exception();
@ -126,12 +118,12 @@ public:
//Socket Options provided above //Socket Options provided above
// construct addresss // construct addresss
sprintf (serverAddress,"tcp://%s:%d", ethip, portno); sprintf (sockfd.serverAddress,"tcp://%s:%d", ethip, portno);
#ifdef VERBOSE #ifdef VERBOSE
cprintf(BLUE,"address:%s\n",serverAddress); cprintf(BLUE,"address:%s\n",sockfd.serverAddress);
#endif #endif
// bind address // bind address
if (zmq_bind (socketDescriptor, serverAddress) < 0) { if (zmq_bind (sockfd.socketDescriptor, sockfd.serverAddress) < 0) {
PrintError (); PrintError ();
Close (); Close ();
throw std::exception(); throw std::exception();
@ -145,69 +137,49 @@ public:
* Destructor * Destructor
*/ */
~ZmqSocket () { ~ZmqSocket () {
Disconnect(); //mySocketDescriptor destructor also gets called
Close();
}; };
/**
* Returns Server Address
* @returns Server Address
*/
char* GetZmqServerAddress () { return serverAddress; };
/** /**
* Returns Port Number * Returns Port Number
* @returns Port Number * @returns Port Number
*/ */
uint32_t GetPortNumber () { return portno; }; uint32_t GetPortNumber () { return portno; };
/**
* Returns Server Address
* @returns Server Address
*/
char* GetZmqServerAddress () { return sockfd.serverAddress; };
/** /**
* Returns Socket Descriptor * Returns Socket Descriptor
* @reutns Socket descriptor * @reutns Socket descriptor
*/ */
void* GetsocketDescriptor () { return socketDescriptor; }; void* GetsocketDescriptor () { return sockfd.socketDescriptor; };
/** /**
* Connect client socket to server socket * Connect client socket to server socket
* @returns 1 for fail, 0 for success * @returns 1 for fail, 0 for success
*/ */
int Connect() { int Connect() {
if (zmq_connect(socketDescriptor, serverAddress) < 0) { if (zmq_connect(sockfd.socketDescriptor, sockfd.serverAddress) < 0) {
PrintError (); PrintError ();
Close ();
return 1; return 1;
} }
return 0; return 0;
} }
/** /**
* Unbinds the Socket * Unbinds the Socket
*/ */
void Disconnect () { void Disconnect () {sockfd.Disconnect();};
if (server)
zmq_unbind (socketDescriptor, serverAddress);
else
zmq_disconnect (socketDescriptor, serverAddress);
};
/** /**
* Close Socket and destroy Context * Close Socket and destroy Context
*/ */
void Close () { void Close () { sockfd.Close(); };
if (socketDescriptor != NULL) {
zmq_close (socketDescriptor);
socketDescriptor = NULL;
}
if (contextDescriptor != NULL) {
zmq_ctx_destroy (contextDescriptor);
contextDescriptor = NULL;
}
};
/** /**
* Convert Hostname to Internet address info structure * Convert Hostname to Internet address info structure
@ -332,7 +304,7 @@ public:
cprintf(BLUE,"%d : Streamer: buf: %s\n", index, buf); cprintf(BLUE,"%d : Streamer: buf: %s\n", index, buf);
#endif #endif
if(zmq_send (socketDescriptor, buf, length, dummy?0:ZMQ_SNDMORE) < 0) { if(zmq_send (sockfd.socketDescriptor, buf, length, dummy?0:ZMQ_SNDMORE) < 0) {
PrintError (); PrintError ();
return 0; return 0;
} }
@ -349,7 +321,7 @@ public:
* @returns 0 if error, else 1 * @returns 0 if error, else 1
*/ */
int SendData (char* buf, int length) { int SendData (char* buf, int length) {
if(zmq_send (socketDescriptor, buf, length, 0) < 0) { if(zmq_send (sockfd.socketDescriptor, buf, length, 0) < 0) {
PrintError (); PrintError ();
return 0; return 0;
} }
@ -367,7 +339,7 @@ public:
* @returns length of message, -1 if error * @returns length of message, -1 if error
*/ */
int ReceiveMessage(const int index, zmq_msg_t& message) { int ReceiveMessage(const int index, zmq_msg_t& message) {
int length = zmq_msg_recv (&message, socketDescriptor, 0); int length = zmq_msg_recv (&message, sockfd.socketDescriptor, 0);
if (length == -1) { if (length == -1) {
PrintError (); PrintError ();
cprintf (BG_RED,"Error: Could not read header for socket %d\n",index); cprintf (BG_RED,"Error: Could not read header for socket %d\n",index);
@ -461,29 +433,6 @@ public:
dummy = temp ? false : true; dummy = temp ? false : true;
return 1; return 1;
/*
int temp = d["data"].GetUint();
dummy = temp ? false : true;
if (!dummy) {
acqIndex = d["acqIndex"].GetUint64();
frameIndex = d["fIndex"].GetUint64();
fileIndex = d["fileIndex"].GetUint64();
subframeIndex = d["expLength"].GetUint();
filename = d["fname"].GetString();
}
#ifdef VERYVERBOSE
cprintf(BLUE,"%d Dummy:%d\n"
"\tAcqIndex:%lu\n"
"\tFrameIndex:%lu\n"
"\tSubIndex:%u\n"
"\tFileIndex:%lu\n"
"\tBitMode:%u\n"
"\tDetType:%u\n",
index, (int)dummy, acqIndex, frameIndex, subframeIndex, fileIndex,
d["bitmode"].GetUint(),d["detType"].GetUint());
#endif
return 1;
*/
}; };
@ -585,24 +534,60 @@ public:
}; };
private:
/**
* Class to close socket descriptors automatically
* upon encountering exceptions in the ZmqSocket constructor
*/
class mySocketDescriptors {
public:
/** Constructor */
mySocketDescriptors():
server(false),
contextDescriptor(0),
socketDescriptor(0) {};
/** Destructor */
~mySocketDescriptors() {
Disconnect();
Close();
}
/** Unbinds the Socket */
void Disconnect () {
if (server)
zmq_unbind (socketDescriptor, serverAddress);
else
zmq_disconnect (socketDescriptor, serverAddress);
};
/** Close Socket and destroy Context */
void Close () {
if (socketDescriptor != NULL) {
zmq_close (socketDescriptor);
socketDescriptor = NULL;
}
if (contextDescriptor != NULL) {
zmq_ctx_destroy (contextDescriptor);
contextDescriptor = NULL;
}
};
/** true if server, else false */
bool server;
/** Server Address */
char serverAddress[1000];
/** Context Descriptor */
void* contextDescriptor;
/** Socket Descriptor */
void* socketDescriptor;
};
private: private:
/** Port Number */ /** Port Number */
uint32_t portno; uint32_t portno;
/** true if server, else false */ /** Socket descriptor */
bool server; mySocketDescriptors sockfd;
/** Context Descriptor */
void* contextDescriptor;
/** Socket Descriptor */
void* socketDescriptor;
/** Server Address */
char serverAddress[1000];
/** Header Message pointer */ /** Header Message pointer */
zmq_msg_t* headerMessage; zmq_msg_t* headerMessage;
}; };

View File

@ -50,31 +50,6 @@ using namespace std;
#define DEFAULT_BACKLOG 5 #define DEFAULT_BACKLOG 5
/**
* Class to close socket descriptors automatically
* upon encountering exceptions in the constructor
*/
class mySocketDescriptors {
public:
mySocketDescriptors():fd(-1), newfd(-1){};
~mySocketDescriptors() {
// close TCP server new socket descriptor from accept
if (newfd >= 0) {
close(newfd);
}
// close socket descriptor
if (fd >= 0) {
close(fd);
}
}
/** socket descriptor */
int fd;
/** new socket descriptor in TCP server from accept */
int newfd;
};
class genericSocket{ class genericSocket{
public: public:
@ -814,6 +789,33 @@ public:
char thisClientIP[INET_ADDRSTRLEN]; char thisClientIP[INET_ADDRSTRLEN];
int differentClients; int differentClients;
private:
/**
* Class to close socket descriptors automatically
* upon encountering exceptions in the genericSocket constructor
*/
class mySocketDescriptors {
public:
/** Constructor */
mySocketDescriptors():fd(-1), newfd(-1){};
/** Destructor */
~mySocketDescriptors() {
// close TCP server new socket descriptor from accept
if (newfd >= 0) {
close(newfd);
}
// close socket descriptor
if (fd >= 0) {
close(fd);
}
}
/** socket descriptor */
int fd;
/** new socket descriptor in TCP server from accept */
int newfd;
};
protected: protected:
int portno; int portno;
communicationProtocol protocol; communicationProtocol protocol;

View File

@ -132,7 +132,8 @@ void DataStreamer::CreateZmqSockets(int* nunits, uint32_t port, const char* srci
uint32_t portnum = port + index; uint32_t portnum = port + index;
try { try {
zmqSocket = new ZmqSocket(portnum, (strlen(srcip)?srcip:NULL)); ZmqSocket* z = new ZmqSocket(portnum, (strlen(srcip)?srcip:NULL));
zmqSocket = z;
} catch (...) { } catch (...) {
cprintf(RED, "Error: Could not create Zmq socket on port %d for Streamer %d\n", portnum, index); cprintf(RED, "Error: Could not create Zmq socket on port %d for Streamer %d\n", portnum, index);
throw; throw;