Zmqsocket merge with Erik's (removing header message) and mine(socketdescriptor safe destruction in constructor exception)

This commit is contained in:
2018-08-09 18:19:47 +02:00
2 changed files with 31 additions and 25 deletions

View File

@ -4962,7 +4962,7 @@ void multiSlsDetector::readFrameFromReceiver() {
rapidjson::Document doc; rapidjson::Document doc;
if (!zmqSocket[isocket]->ReceiveHeader(isocket, doc, if (!zmqSocket[isocket]->ReceiveHeader(isocket, doc,
SLS_DETECTOR_JSON_HEADER_VERSION)) { SLS_DETECTOR_JSON_HEADER_VERSION)) {
zmqSocket[isocket]->CloseHeaderMessage(); // zmqSocket[isocket]->CloseHeaderMessage();
// parse error, version error or end of acquisition for socket // parse error, version error or end of acquisition for socket
runningList[isocket] = false; runningList[isocket] = false;
--numRunning; --numRunning;
@ -5023,7 +5023,7 @@ void multiSlsDetector::readFrameFromReceiver() {
coordX, coordY, coordX, coordY,
flippedDataX); flippedDataX);
#endif #endif
zmqSocket[isocket]->CloseHeaderMessage(); // zmqSocket[isocket]->CloseHeaderMessage();
} }
// DATA // DATA

View File

@ -18,12 +18,13 @@
#include <rapidjson/document.h> //json header in zmq stream #include <rapidjson/document.h> //json header in zmq stream
#include <string.h> #include <string.h>
#include <unistd.h> //usleep in some machines #include <unistd.h> //usleep in some machines
#include <vector>
using namespace rapidjson; using namespace rapidjson;
#define MAX_STR_LENGTH 1000 #define MAX_STR_LENGTH 1000
//#define ZMQ_DETAIL // #define ZMQ_DETAIL
#define ROIVERBOSITY #define ROIVERBOSITY
class ZmqSocket { class ZmqSocket {
@ -36,8 +37,6 @@ public:
// eg. int value = -1; // eg. int value = -1;
// if (zmq_setsockopt(socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) { // if (zmq_setsockopt(socketDescriptor, ZMQ_LINGER, &value,sizeof(value))) {
// Close(); // Close();
// }
/** /**
* Constructor for a client * Constructor for a client
* Creates socket, context and connects to server * Creates socket, context and connects to server
@ -45,7 +44,9 @@ public:
* @param portnumber port number * @param portnumber port number
*/ */
ZmqSocket (const char* const hostname_or_ip, const uint32_t portnumber): ZmqSocket (const char* const hostname_or_ip, const uint32_t portnumber):
portno (portnumber) { portno (portnumber)
// headerMessage(0)
{
char ip[MAX_STR_LENGTH] = ""; char ip[MAX_STR_LENGTH] = "";
memset(ip, 0, MAX_STR_LENGTH); memset(ip, 0, MAX_STR_LENGTH);
@ -100,7 +101,10 @@ public:
* @param ethip is the ip of the ethernet interface to stream zmq from * @param ethip is the ip of the ethernet interface to stream zmq from
*/ */
ZmqSocket (const uint32_t portnumber, const char *ethip): ZmqSocket (const uint32_t portnumber, const char *ethip):
portno (portnumber) {
portno (portnumber)
// headerMessage(0)
{
sockfd.server = true; sockfd.server = true;
// create context // create context
@ -361,18 +365,20 @@ public:
*/ */
int ReceiveHeader(const int index, Document& document, uint32_t version) int ReceiveHeader(const int index, Document& document, uint32_t version)
{ {
zmq_msg_t message; // zmq_msg_t message;
headerMessage= &message; // headerMessage= &message;
zmq_msg_init (&message); // zmq_msg_init (&message);
int len = ReceiveMessage(index, message); // int len = ReceiveMessage(index, message);
std::vector<char>buffer(MAX_STR_LENGTH);
int len = zmq_recv(socketDescriptor, buffer.data(), buffer.size(),0);
if ( len > 0 ) { if ( len > 0 ) {
bool dummy = false; bool dummy = false;
#ifdef ZMQ_DETAIL #ifdef ZMQ_DETAIL
cprintf( BLUE,"Header %d [%d] Length: %d Header:%s \n", index, portno, len, (char*) zmq_msg_data (&message) ); cprintf( BLUE,"Header %d [%d] Length: %d Header:%s \n", index, portno, len, buffer.data());
#endif #endif
if ( ParseHeader (index, len, message, document, dummy, version)) { if ( ParseHeader (index, len, buffer.data(), document, dummy, version)) {
#ifdef ZMQ_DETAIL #ifdef ZMQ_DETAIL
cprintf( RED,"Parsed Header %d [%d] Length: %d Header:%s \n", index, portno, len, (char*) zmq_msg_data (&message) ); cprintf( RED,"Parsed Header %d [%d] Length: %d Header:%s \n", index, portno, len, buffer.data() );
#endif #endif
if (dummy) { if (dummy) {
#ifdef ZMQ_DETAIL #ifdef ZMQ_DETAIL
@ -393,11 +399,11 @@ public:
/** /**
* Close Header Message. Call this function if ReceiveHeader returned 1 * Close Header Message. Call this function if ReceiveHeader returned 1
*/ */
void CloseHeaderMessage() { // void CloseHeaderMessage() {
if (headerMessage) // if (headerMessage)
zmq_msg_close(headerMessage); // zmq_msg_close(headerMessage);
headerMessage = 0; // headerMessage = 0;
}; // };
/** /**
* Parse Header * Parse Header
* @param index self index for debugging * @param index self index for debugging
@ -408,15 +414,15 @@ public:
* @param version version that has to match, -1 to not care * @param version version that has to match, -1 to not care
* @returns true if successful else false * @returns true if successful else false
*/ */
int ParseHeader(const int index, int length, zmq_msg_t& message, int ParseHeader(const int index, int length, char* buff,
Document& document, bool& dummy, uint32_t version) Document& document, bool& dummy, uint32_t version)
{ {
if ( document.Parse( (char*) zmq_msg_data (&message), zmq_msg_size (&message)).HasParseError() ) { if ( document.Parse( buff, length).HasParseError() ) {
cprintf( RED,"%d Could not parse. len:%d: Message:%s \n", index, length, (char*) zmq_msg_data (&message) ); cprintf( RED,"%d Could not parse. len:%d: Message:%s \n", index, length, buff );
fflush ( stdout ); fflush ( stdout );
char* buf = (char*) zmq_msg_data (&message); // char* buf = (char*) zmq_msg_data (&message);
for ( int i= 0; i < length; ++i ) { for ( int i= 0; i < length; ++i ) {
cprintf(RED,"%02x ",buf[i]); cprintf(RED,"%02x ",buff[i]);
} }
printf("\n"); printf("\n");
fflush( stdout ); fflush( stdout );
@ -589,5 +595,5 @@ private:
mySocketDescriptors sockfd; mySocketDescriptors sockfd;
/** Header Message pointer */ /** Header Message pointer */
zmq_msg_t* headerMessage; //zmq_msg_t* headerMessage;
}; };