From 49b4ae2f561870b73c78b6ac73a7d254e268a1c3 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Wed, 3 May 2017 17:57:56 +0200 Subject: [PATCH] changed zmq method, and resolved warnings and from esrf --- slsReceiverSoftware/include/DataStreamer.h | 6 - slsReceiverSoftware/include/ThreadObject.h | 2 +- slsReceiverSoftware/include/ZmqSocket.h | 118 +++++++++++++----- slsReceiverSoftware/include/genericSocket.h | 12 +- slsReceiverSoftware/include/logger.h | 10 +- slsReceiverSoftware/src/DataProcessor.cpp | 3 +- slsReceiverSoftware/src/DataStreamer.cpp | 47 +------ .../src/UDPStandardImplementation.cpp | 3 +- 8 files changed, 110 insertions(+), 91 deletions(-) diff --git a/slsReceiverSoftware/include/DataStreamer.h b/slsReceiverSoftware/include/DataStreamer.h index a86ee8fb2..94fbb0640 100644 --- a/slsReceiverSoftware/include/DataStreamer.h +++ b/slsReceiverSoftware/include/DataStreamer.h @@ -188,12 +188,6 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { /** mutex to update static items among objects (threads)*/ static pthread_mutex_t Mutex; - /** Json Header Format for each measurement part */ - static const char *jsonHeaderFormat_part1; - - /** Json Header Format */ - static const char *jsonHeaderFormat; - /** GeneralData (Detector Data) object */ const GeneralData* generalData; diff --git a/slsReceiverSoftware/include/ThreadObject.h b/slsReceiverSoftware/include/ThreadObject.h index 1f4abdf26..5b5639cea 100644 --- a/slsReceiverSoftware/include/ThreadObject.h +++ b/slsReceiverSoftware/include/ThreadObject.h @@ -92,7 +92,7 @@ class ThreadObject : private virtual slsReceiverDefs { int index; /** Thread is alive/dead */ - bool alive; + volatile bool alive; /** Variable monitored by thread to kills itself */ volatile bool killThread; diff --git a/slsReceiverSoftware/include/ZmqSocket.h b/slsReceiverSoftware/include/ZmqSocket.h index 0549c3453..b14849694 100644 --- a/slsReceiverSoftware/include/ZmqSocket.h +++ b/slsReceiverSoftware/include/ZmqSocket.h @@ -17,9 +17,7 @@ #include //json header in zmq stream using namespace rapidjson; -#define DEFAULT_ZMQ_PORTNO 70001 -#define DUMMY_MSG_SIZE 3 -#define DUMMY_MSG "end" +#define DEFAULT_ZMQ_PORTNO 40001 class ZmqSocket { @@ -187,9 +185,28 @@ public: /** * Send Message Header + * @param buf message + * @param length length of message + * @param dummy true if end of acquistion else false * @returns 0 if error, else 1 */ - int SendHeaderData (char* buf, int length) { + int SendHeaderData (uint32_t jsonversion, uint32_t dynamicrange, uint32_t npixelsx, uint32_t npixelsy, + uint64_t acqIndex, uint64_t fIndex, char* fname, bool dummy, + uint64_t frameNumber, uint32_t expLength, uint32_t packetNumber, uint64_t bunchId, uint64_t timestamp, + uint16_t modId, uint16_t xCoord, uint16_t yCoord, uint16_t zCoord, uint32_t debug, uint16_t roundRNumber, + uint8_t detType, uint8_t version) { + + char buf[MAX_STR_LENGTH] = ""; + int length = sprintf(buf, jsonHeaderFormat, + jsonversion, dynamicrange, npixelsx, npixelsy, + acqIndex, fIndex, fname, dummy?1:0, + frameNumber, expLength, packetNumber, bunchId, timestamp, + modId, xCoord, yCoord, zCoord, debug, roundRNumber, + detType, version); +#ifdef VERBOSE + printf("%d Streamer: buf:%s\n", index, buf); +#endif + if(zmq_send (socketDescriptor, buf, length, ZMQ_SNDMORE) < 0) { PrintError (); return 0; @@ -199,6 +216,8 @@ public: /** * Send Message Body + * @param buf message + * @param length length of message * @returns 0 if error, else 1 */ int SendData (char* buf, int length) { @@ -213,9 +232,10 @@ public: /** * Receive Message * @param index self index for debugging + * @param message message * @returns length of message, -1 if error */ - int ReceiveMessage(const int index) { + int ReceiveMessage(const int index, zmq_msg_t& message) { int length = zmq_msg_recv (&message, socketDescriptor, 0); if (length == -1) { PrintError (); @@ -232,18 +252,27 @@ public: * @param frameIndex address of frame index * @param subframeIndex address of subframe index * @param filename address of file name - * @returns 0 if error, else 1 + * @returns 0 if error or end of acquisition, else 1 */ int ReceiveHeader(const int index, uint64_t &acqIndex, uint64_t &frameIndex, uint32_t &subframeIndex, string &filename) { + zmq_msg_t message; zmq_msg_init (&message); - if (ReceiveMessage(index) > 0) { - if (ParseHeader(index, acqIndex, frameIndex, subframeIndex, filename)) { - zmq_msg_close(&message); + int len = ReceiveMessage(index, message); + if ( len > 0 ) { + bool dummy = false; + if ( ParseHeader (index, len, message, acqIndex, frameIndex, subframeIndex, filename, dummy)) { + zmq_msg_close (&message); #ifdef VERBOSE - cprintf(BLUE,"%d header rxd\n",index); + cprintf( RED,"%d Length: %d Header:%s \n", index, length, (char*) zmq_msg_data (&message) ); #endif + if (dummy) { +#ifdef VERBOSE + cprintf(RED,"%d Received end of acquisition\n", index); +#endif + return 0; + } return 1; } } @@ -256,21 +285,13 @@ public: * @param index self index for debugging * @param buf buffer to copy image data to * @param size size of image - * @returns 0 if error, else 1 + * @returns length of data received */ int ReceiveData(const int index, int* buf, const int size) { + zmq_msg_t message; zmq_msg_init (&message); - int length = ReceiveMessage(index); - - //dummy - if (length == DUMMY_MSG_SIZE) { -#ifdef VERBOSE - cprintf(RED,"%d Received end of acquisition\n", index); -#endif - zmq_msg_close(&message); - return 0; - } + int length = ReceiveMessage(index, message); //actual data if (length == size) { @@ -287,27 +308,41 @@ public: } zmq_msg_close(&message); - return 1; + return length; }; /** * Parse Header * @param index self index for debugging + * @param length length of message + * @param message message * @param acqIndex address of acquisition index * @param frameIndex address of frame index * @param subframeIndex address of subframe index * @param filename address of file name + * @param dummy true if end of acquisition else false + * @returns true if successfull else false */ - int ParseHeader(const int index, uint64_t &acqIndex, - uint64_t &frameIndex, uint32_t &subframeIndex, string &filename) + int ParseHeader(const int index, int length, zmq_msg_t& message, uint64_t &acqIndex, + uint64_t &frameIndex, uint32_t &subframeIndex, string &filename, bool& dummy) { Document d; - if (d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message)).HasParseError()) { - cprintf (RED,"Error: Could not parse header for socket %d\n",index); + if ( d.Parse( (char*) zmq_msg_data (&message), zmq_msg_size (&message)).HasParseError() ) { + cprintf( RED,"%d Could not parse. len:%d: Message:%s \n", index, length, (char*) zmq_msg_data (&message) ); + fflush ( stdout ); + char* buf = (char*) zmq_msg_data (&message); + for ( int i= 0; i < length; ++i ) { + cprintf(RED,"%02x ",buf[i]); + } + printf("\n"); + fflush( stdout ); return 0; } - if(d["acqIndex"].GetUint64()!=(uint64_t)-1) { + + int temp = d["data"].GetUint(); + dummy = temp ? true : false; + if (dummy) { acqIndex = d["acqIndex"].GetUint64(); frameIndex = d["fIndex"].GetUint64(); subframeIndex = -1; @@ -316,6 +351,7 @@ public: } filename = d["fname"].GetString(); #ifdef VERYVERBOSE + cout << "Data: " << temp << endl; cout << "Acquisition index: " << acqIndex << endl; cout << "Frame index: " << frameIndex << endl; cout << "Subframe index: " << subframeIndex << endl; @@ -399,6 +435,30 @@ private: /** Server Address */ char serverAddress[1000]; - /** Zmq Message */ - zmq_msg_t message; + /** Json Header Format */ + static const char* jsonHeaderFormat = + "{" + "\"jsonversion\":%u, " + "\"bitmode\":%d, " + "\"shape\":[%d, %d], " + "\"acqIndex\":%llu, " + "\"fIndex\":%llu, " + "\"fname\":\"%s\", " + "\"data\": %d, " + + "\"frameNumber\":%llu, " + "\"expLength\":%u, " + "\"packetNumber\":%u, " + "\"bunchId\":%llu, " + "\"timestamp\":%llu, " + "\"modId\":%u, " + "\"xCoord\":%u, " + "\"yCoord\":%u, " + "\"zCoord\":%u, " + "\"debug\":%u, " + "\"roundRNumber\":%u, " + "\"detType\":%u, " + "\"version\":%u" + "}\n\0"; + }; diff --git a/slsReceiverSoftware/include/genericSocket.h b/slsReceiverSoftware/include/genericSocket.h index 61e6a4eb7..75064c1c5 100644 --- a/slsReceiverSoftware/include/genericSocket.h +++ b/slsReceiverSoftware/include/genericSocket.h @@ -101,9 +101,9 @@ enum communicationProtocol{ { memset(&serverAddress, 0, sizeof(serverAddress)); memset(&clientAddress, 0, sizeof(clientAddress)); - strcpy(lastClientIP,"none"); - strcpy(thisClientIP,"none1"); - strcpy(dummyClientIP,"dummy"); + memset(lastClientIP,0,INET_ADDRSTRLEN); + memset(thisClientIP,0,INET_ADDRSTRLEN); + memset(dummyClientIP,0,INET_ADDRSTRLEN); differentClients = 0; struct hostent *hostInfo = gethostbyname(host_ip_or_name); if (hostInfo == NULL){ @@ -166,9 +166,9 @@ enum communicationProtocol{ /* // you can specify an IP address: */ /* // or you can let it automatically select one: */ /* myaddr.sin_addr.s_addr = INADDR_ANY; */ - strcpy(lastClientIP,"none"); - strcpy(thisClientIP,"none1"); - strcpy(dummyClientIP,"dummy"); + memset(lastClientIP,0,INET_ADDRSTRLEN); + memset(thisClientIP,0,INET_ADDRSTRLEN); + memset(dummyClientIP,0,INET_ADDRSTRLEN); differentClients = 0; diff --git a/slsReceiverSoftware/include/logger.h b/slsReceiverSoftware/include/logger.h index 10de5ede3..61b3141a7 100644 --- a/slsReceiverSoftware/include/logger.h +++ b/slsReceiverSoftware/include/logger.h @@ -6,16 +6,14 @@ #include #include -#ifdef VERBOSE -#define FILELOG_MAX_LEVEL logDEBUG -#endif -#ifdef VERYVERBOSE -#define FILELOG_MAX_LEVEL logDEBUG4 -#endif #ifdef FIFODEBUG #define FILELOG_MAX_LEVEL logDEBUG5 +#elif VERYVERBOSE +#define FILELOG_MAX_LEVEL logDEBUG4 +#elif VERBOSE +#define FILELOG_MAX_LEVEL logDEBUG #endif #ifndef FILELOG_MAX_LEVEL diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index 2d15507e2..410e098e9 100644 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -316,7 +316,7 @@ void DataProcessor::ProcessAnImage(char* buf) { if (*fileWriteEnable) file->WriteToFile(buf, generalData->fifoBufferSize + sizeof(sls_detector_header), fnum-firstMeasurementIndex); - if (rawDataReadyCallBack) + if (rawDataReadyCallBack) { rawDataReadyCallBack( header->frameNumber, header->expLength, @@ -334,6 +334,7 @@ void DataProcessor::ProcessAnImage(char* buf) { buf + sizeof(sls_detector_header), generalData->imageSize, pRawDataReady); + } } diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index 8fd5c97bc..885cd0713 100644 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -23,30 +23,6 @@ uint64_t DataStreamer::RunningMask(0x0); pthread_mutex_t DataStreamer::Mutex = PTHREAD_MUTEX_INITIALIZER; -const char* DataStreamer::jsonHeaderFormat = - "{" - "\"jsonversion\":%u, " - "\"bitmode\":%d, " - "\"shape\":[%d, %d], " - "\"acqIndex\":%llu, " - "\"fIndex\":%llu, " - "\"fname\":\"%s\", " - - "\"frameNumber\":%llu, " - "\"expLength\":%u, " - "\"packetNumber\":%u, " - "\"bunchId\":%llu, " - "\"timestamp\":%llu, " - "\"modId\":%u, " - "\"xCoord\":%u, " - "\"yCoord\":%u, " - "\"zCoord\":%u, " - "\"debug\":%u, " - "\"roundRNumber\":%u, " - "\"detType\":%u, " - "\"version\":%u" - "}"; - DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* timer, int* sEnable) : ThreadObject(NumberofDataStreamers), @@ -232,9 +208,6 @@ void DataStreamer::StopProcessing(char* buf) { if (!SendHeader(header, true)) cprintf(RED,"Error: Could not send zmq dummy header for streamer %d\n", index); - if (!zmqSocket->SendData((char*)DUMMY_MSG, DUMMY_MSG_SIZE)) - cprintf(RED,"Error: Could not send zmq dummy message for streamer %d\n", index); - fifo->FreeAddress(buf); StopRunning(); #ifdef VERBOSE @@ -324,20 +297,12 @@ int DataStreamer::SendHeader(sls_detector_header* header, bool dummy) { uint64_t acquisitionIndex = header->frameNumber - firstAcquisitionIndex; uint32_t subframeIndex = header->expLength; - char buf[1000] = ""; + return zmqSocket->SendHeaderData(SLS_DETECTOR_JSON_HEADER_VERSION, *dynamicRange, + generalData->nPixelsX_Streamer, generalData->nPixelsY_Streamer, + acquisitionIndex, frameIndex, fileNametoStream, dummy, - if (dummy) { - frameIndex = -1; - acquisitionIndex = -1; - subframeIndex = -1; - } - - int len = sprintf(buf, jsonHeaderFormat, - SLS_DETECTOR_JSON_HEADER_VERSION, *dynamicRange, generalData->nPixelsX_Streamer, generalData->nPixelsY_Streamer, acquisitionIndex, frameIndex, fileNametoStream, header->frameNumber, header->expLength, header->packetNumber, header->bunchId, header->timestamp, - header->modId, header->xCoord, header->yCoord, header->zCoord, header->debug, header->roundRNumber, header->detType, header->version); -#ifdef VERBOSE - printf("%d Streamer: buf:%s\n", index, buf); -#endif - return zmqSocket->SendHeaderData(buf, len); + header->modId, header->xCoord, header->yCoord, header->zCoord, header->debug, header->roundRNumber, + header->detType, header->version + ); } diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index f66d1a2d6..866b9cacc 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -444,8 +444,9 @@ int UDPStandardImplementation::startReceiver(char *c) { if (startAcquisitionCallBack) { startAcquisitionCallBack(filePath, fileName, fileIndex, (generalData->fifoBufferSize) * numberofJobs + (generalData->fifoBufferHeaderSize), pStartAcquisition); - if (rawDataReadyCallBack != NULL) + if (rawDataReadyCallBack != NULL) { cout << "Data Write has been defined externally" << endl; + } } //processor->writer