diff --git a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp index ae16c2a31..2477ebab8 100644 --- a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp +++ b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp @@ -24,7 +24,7 @@ ID: $Id$ #include #include #include - +#include //json header in zmq stream char ans[MAX_STR_LENGTH]; @@ -6015,243 +6015,165 @@ int multiSlsDetector::createReceivingDataSockets(const bool destroy){ - - - -int multiSlsDetector::getData(const int isocket, char* image, const int size, - uint64_t &acqIndex, uint64_t &frameIndex, uint32_t &subframeIndex, - string &filename, uint64_t &fileIndex) { - - //fail is on parse error or end of acquisition - if (!zmqSocket[isocket]->ReceiveHeader(isocket, acqIndex, frameIndex, subframeIndex, filename, fileIndex)) - return FAIL; - - //receiving incorrect size is replaced by 0xFF - zmqSocket[isocket]->ReceiveData(isocket, image, size); - - return OK; -} - - - - - void multiSlsDetector::readFrameFromReceiver(){ - //determine number of sockets - int numSockets = thisMultiDetector->numberOfDetectors; - int numSocketsPerSLSDetector = 1; - bool jungfrau = false; - bool eiger = false; - /*double* gdata = NULL;*/ - slsDetectorDefs::detectorType myDetType = getDetectorsType(); - switch(myDetType){ - case EIGER: - eiger = true; - numSocketsPerSLSDetector = 2; - numSockets *= numSocketsPerSLSDetector; - break; - case JUNGFRAU: - jungfrau = true; - break; - default: - break; - } + int numSockets = thisMultiDetector->numberOfDetectors; + bool gappixelsenable = false; + if (getDetectorsType() == EIGER) { + numSockets *= 2; + gappixelsenable = detectors[0]->enableGapPixels(-1) >= 1 ? true: false; + } - //gui variables - uint64_t currentAcquisitionIndex = -1; - uint64_t currentFrameIndex = -1; - uint32_t currentSubFrameIndex = -1; - uint64_t currentFileIndex = -1; - string currentFileName = ""; + bool runningList[numSockets]; + bool connectList[numSockets]; + int numRunning = 0; + for(int i = 0; i < numSockets; ++i) { + if(!zmqSocket[i]->Connect()) { + connectList[i] = true; + runningList[i] = true; + ++numRunning; + } else { + // to remember the list it connected to, to disconnect later + connectList[i] = false; + cprintf(RED,"Error: Could not connect to socket %s\n",zmqSocket[i]->GetZmqServerAddress()); + runningList[i] = false; + } + } + int numConnected = numRunning; + bool data = false; + char* image = NULL; + char* multiframe = NULL; + char* multigappixels = NULL + int multisize = 0; + // header info + uint32_t size = 0; // only first message header + uint32_t nPixelsx = 0, nPixelsY = 0; + uint32_t dynamicRange = 0; + string currentFileName = ""; + uint64_t currentAcquisitionIndex = -1; + uint64_t currentFrameIndex = -1; + uint32_t currentSubFrameIndex = -1; + uint64_t currentFileIndex = -1; - //getting sls values - int slsdatabytes = 0, slsmaxchannels = 0, slsmaxX = 0, slsmaxY=0; - double bytesperchannel = 0; - bool gappixelsenable = false; - if(detectors[0]){ - slsmaxchannels = detectors[0]->getMaxNumberOfChannels(X) * detectors[0]->getMaxNumberOfChannels(Y); - slsdatabytes = detectors[0]->getDataBytes(); - bytesperchannel = (double)slsdatabytes/(double)slsmaxchannels; + //wait for real time acquisition to start + bool running = true; + sem_wait(&sem_newRTAcquisition); + if(checkJoinThread()) + running = false; + + //exit when checkJoinThread() (all sockets done) + while(running){ + + //get each frame + for(int isocket=0; isocketReceiveHeader(isocket, doc, SLS_DETECTOR_JSON_HEADER_VERSION)){ + zmqSocket[isocket]->CloseHeaderMessage(); + // parse error, version error or end of acquisition for socket + runningList[isocket] = false; + --numRunning; + continue; + } + + // if first message, allocate (all one time stuff) + if (image == NULL) { + // allocate + size = doc["size"].GetUint(); + multisize = size * numSockets; + image = new char[size]; + multiframe = new char[multisize]; + + // one time values + // dynamic range + dynamicRange = doc["bitmode"].GetUint(); + // shape + if (dynamicRange == 4) { + nPixelsx = thisMultiDetector->numberOfChannelInclGapPixels[X]; + nPixelsY = thisMultiDetector->numberOfChannelInclGapPixels[Y]; + } else { + const Value& a = doc["shape"]; + nPixelsx = a[0].GetUint(); /* later try doc["shape"].GetUint();*/ + nPixelsY = a[1].GetUint(); + } + } + // parse rest of header + currentFileName = doc["fname"].GetString(); + currentAcquisitionIndex = doc["acqIndex"].GetUint64(); + currentFrameIndex = doc["fIndex"].GetUint64(); + currentFileIndex = doc["fileIndex"].GetUint64(); + currentSubFrameIndex = doc["expLength"].GetUint(); + zmqSocket[isocket]->CloseHeaderMessage(); + + // copying data (receiving incorrect size is replaced by 0xFF) + data = true; + zmqSocket[isocket]->ReceiveData(isocket, image, size); + + // creaing multi image - // recalculate with gap pixels (for >= 8 bit mode) - if (bytesperchannel >= 1.0) { - slsdatabytes = detectors[0]->getDataBytesInclGapPixels(); - slsmaxchannels = detectors[0]->getMaxNumberOfChannelsInclGapPixels(X)*detectors[0]->getMaxNumberOfChannelsInclGapPixels(Y); - } + } - slsmaxX = (bytesperchannel >= 1.0) ? detectors[0]->getTotalNumberOfChannelsInclGapPixels(X) : detectors[0]->getTotalNumberOfChannels(X); - slsmaxY = (bytesperchannel >= 1.0) ? detectors[0]->getTotalNumberOfChannelsInclGapPixels(Y) : detectors[0]->getTotalNumberOfChannels(Y); - gappixelsenable = detectors[0]->enableGapPixels(-1) >= 1 ? true: false; - } - // max channel values - int maxX = (bytesperchannel >= 1.0) ? thisMultiDetector->numberOfChannelInclGapPixels[X] : thisMultiDetector->numberOfChannel[X]; - int maxY = (bytesperchannel >= 1.0) ? thisMultiDetector->numberOfChannelInclGapPixels[Y] : thisMultiDetector->numberOfChannel[Y]; - int multidatabytes = (bytesperchannel >= 1.0) ? thisMultiDetector->dataBytesInclGapPixels : thisMultiDetector->dataBytes; - int dr = bytesperchannel * 8; - if (myDetType == JUNGFRAUCTB) { - maxY = (int)(thisMultiDetector->timerValue[SAMPLES_JCTB] * 2)/25; // for moench 03 - maxX = 400; - dr = 16; - } + } + //send data to callback + if(data){ + // 4bit gap pixels + if (dynamicRange == 4 && gappixelsenable) { + int n = processImageWithGapPixels(multiframe, multigappixels); + thisData = new detectorData(NULL,NULL,NULL,getCurrentProgress(), + currentFileName.c_str(), nPixelsx, nPixelsY, + multigappixels, n, dynamicRange, currentFileIndex); + } + // normal pixels + else + thisData = new detectorData(NULL, NULL, NULL, getCurrentProgress(), + currentFileName.c_str(), nPixelsx, nPixelsY, + multiframe, multisize, dynamicRange, currentFileIndex); + dataReady(thisData, currentFrameIndex, + ((dynamicRange == 32) ? currentSubFrameIndex : -1), + pCallbackArg); + delete thisData; + setCurrentProgress(currentAcquisitionIndex + 1); + } + //all done + if(!numRunning){ + // let main thread know that all dummy packets have been received (also from external process), + // main thread can now proceed to measurement finished call back + sem_post(&sem_endRTAcquisition); + // wait for next scan/measurement, else join thread + sem_wait(&sem_newRTAcquisition); + //done with complete acquisition + if(checkJoinThread()) + running = false; + else{ + //starting a new scan/measurement (got dummy data) + for(int i = 0; i < numSockets; ++i) + runningList[i] = connectList[i]; + numRunning = numConnected; + } + } - //getting multi values - //calculating offsets (for eiger interleaving ports) - int offsetX[numSockets]; int offsetY[numSockets]; - int bottom[numSockets]; - if(eiger){ - for(int i=0; ioffsetY[i/numSocketsPerSLSDetector] + slsmaxY)) * maxX * bytesperchannel; - //the left half or right half - if(!(i%numSocketsPerSLSDetector)) - offsetX[i] = thisMultiDetector->offsetX[i/numSocketsPerSLSDetector]; - else - offsetX[i] = thisMultiDetector->offsetX[i/numSocketsPerSLSDetector] + (slsmaxX/numSocketsPerSLSDetector); - offsetX[i] *= bytesperchannel; - bottom[i] = detectors[i/numSocketsPerSLSDetector]->getFlippedData(X);/*only for eiger*/ - } - } + } - int expectedslssize = slsdatabytes/numSocketsPerSLSDetector; - char* image = new char[expectedslssize](); - char* multiframe = new char[multidatabytes](); - char* multiframegain = NULL; - char* multigappixels = NULL; // used only for 4 bit mode with gap pixels enabled - if (jungfrau) - multiframegain = new char[multidatabytes](); - - - bool runningList[numSockets]; - bool connectList[numSockets]; - for(int i = 0; i < numSockets; ++i) { - if(!zmqSocket[i]->Connect()) { - connectList[i] = true; - runningList[i] = true; - } else { - connectList[i] = false; - cprintf(RED,"Error: Could not connect to socket %s\n",zmqSocket[i]->GetZmqServerAddress()); - runningList[i] = false; - } - } - int numRunning = numSockets; - - - //wait for real time acquisition to start - bool running = true; - sem_wait(&sem_newRTAcquisition); - if(checkJoinThread()) - running = false; - - - //exit when last message for each socket received - while(running){ - memset(multiframe,0xFF,slsdatabytes*thisMultiDetector->numberOfDetectors); //reset frame memory - - //get each frame - for(int isocket=0; isocketoffsetY[isocket] + i) * maxX) + thisMultiDetector->offsetX[isocket])* (int)bytesperchannel, - (char*)image+ (i*slsmaxX*(int)bytesperchannel), - (slsmaxX*(int)bytesperchannel)); - } - } - } - - } - - - //all done - if(!numRunning){ - // let main thread know that all dummy packets have been received (also from external process), - // main thread can now proceed to measurement finished call back - sem_post(&sem_endRTAcquisition); - // wait for next scan/measurement, else join thread - sem_wait(&sem_newRTAcquisition); - //done with complete acquisition - if(checkJoinThread()) - break; - else{ - //starting a new scan/measurement (got dummy data) - for(int i = 0; i < numSockets; ++i) - runningList[i] = true; - numRunning = numSockets; - running = false; - } - } - - //send data to callback - if(running){ - if (gappixelsenable && bytesperchannel < 1) {//inside this function, allocate if it doesnt exist - int nx = thisMultiDetector->numberOfChannelInclGapPixels[X]; - int ny = thisMultiDetector->numberOfChannelInclGapPixels[Y]; - int n = processImageWithGapPixels(multiframe, multigappixels); - thisData = new detectorData(NULL,NULL,NULL,getCurrentProgress(),currentFileName.c_str(), nx, ny,multigappixels, n, dr, currentFileIndex); - } - else { - thisData = new detectorData(NULL,NULL,NULL,getCurrentProgress(),currentFileName.c_str(),maxX,maxY,multiframe, multidatabytes, dr, currentFileIndex); - } - dataReady(thisData, currentFrameIndex, (((dr == 32) && (eiger)) ? currentSubFrameIndex : -1), pCallbackArg); - delete thisData; - //cout<<"Send frame #"<< currentFrameIndex << " to gui"<Disconnect(); - - //free resources - delete [] image; - delete[] multiframe; - if (jungfrau) - delete [] multiframegain; - if (multigappixels != NULL) - delete [] multigappixels; + // Disconnect resources + for (int i = 0; i < numSockets; ++i) + if (connectList[i]) + zmqSocket[i]->Disconnect(); + //free resources + if (image != NULL) delete [] image; + if (multiframe != NULL) delete [] multiframe; + if (multigappixels != NULL) delete [] multigappixels; } diff --git a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.h b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.h index a0875185d..79235315d 100644 --- a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.h +++ b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.h @@ -1361,6 +1361,7 @@ class multiSlsDetector : public slsDetectorUtils { /** Reads frames from receiver through a constant socket */ void readFrameFromReceiver(); + /** Locks/Unlocks the connection to the receiver /param lock sets (1), usets (0), gets (-1) the lock /returns lock status of the receiver @@ -1556,21 +1557,6 @@ class multiSlsDetector : public slsDetectorUtils { private: - /** - * Gets data from socket - * @param isocket socket index - * @param image image buffer - * @param size size of image - * @param acqIndex address of acquisition index - * @param frameIndex address of frame index - * @param subframeIndex address of subframe index - * @param filename address of file name - * @param fileindex address of file index - */ - int getData(const int isocket, char* image, const int size, - uint64_t &acqIndex, uint64_t &frameIndex, uint32_t &subframeIndex, - string &filename, uint64_t &fileIndex); - /** * add gap pixels to the image (only for Eiger in 4 bit mode) diff --git a/slsDetectorSoftware/slsDetector/slsDetectorBase.h b/slsDetectorSoftware/slsDetector/slsDetectorBase.h index 11dd4ac0b..d30c3be9a 100644 --- a/slsDetectorSoftware/slsDetector/slsDetectorBase.h +++ b/slsDetectorSoftware/slsDetector/slsDetectorBase.h @@ -576,6 +576,7 @@ class slsDetectorBase : public virtual slsDetectorDefs, public virtual errorDef */ virtual void readFrameFromReceiver()=0; + /** * Enable data streaming to client * @param enable 0 to disable, 1 to enable, -1 to get the value diff --git a/slsReceiverSoftware/include/ZmqSocket.h b/slsReceiverSoftware/include/ZmqSocket.h index 8fc063670..56fa5c91b 100644 --- a/slsReceiverSoftware/include/ZmqSocket.h +++ b/slsReceiverSoftware/include/ZmqSocket.h @@ -48,7 +48,8 @@ public: portno (portnumber), server (false), contextDescriptor (NULL), - socketDescriptor (NULL) + socketDescriptor (NULL), + headerMessage(0) { char ip[MAX_STR_LENGTH] = ""; memset(ip, 0, MAX_STR_LENGTH); @@ -104,7 +105,8 @@ public: portno (portnumber), server (true), contextDescriptor (NULL), - socketDescriptor (NULL) + socketDescriptor (NULL), + headerMessage(0) { // create context contextDescriptor = zmq_ctx_new(); @@ -365,19 +367,16 @@ public: /** - * Receive Header + * Receive Header (Important to close message after parsing header) * @param index self index for debugging - * @param acqIndex address of acquisition index - * @param frameIndex address of frame index - * @param subframeIndex address of subframe index - * @param filename address of file name - * @param fileindex address of file index - * @returns 0 if error or end of acquisition, else 1 + * @param document parsed document reference + * @param version version that has to match, -1 to not care + * @returns 0 if error or end of acquisition, else 1 (call CloseHeaderMessage after parsing header) */ - int ReceiveHeader(const int index, uint64_t &acqIndex, - uint64_t &frameIndex, uint32_t &subframeIndex, std::string &filename, uint64_t &fileIndex) + int ReceiveHeader(const int index, Document& document, uint32_t version) { zmq_msg_t message; + headerMessage= &message; zmq_msg_init (&message); int len = ReceiveMessage(index, message); if ( len > 0 ) { @@ -385,11 +384,10 @@ public: #ifdef ZMQ_DETAIL cprintf( BLUE,"Header %d [%d] Length: %d Header:%s \n", index, portno, len, (char*) zmq_msg_data (&message) ); #endif - if ( ParseHeader (index, len, message, acqIndex, frameIndex, subframeIndex, filename, fileIndex, dummy)) { + if ( ParseHeader (index, len, message, document, dummy, version)) { #ifdef ZMQ_DETAIL cprintf( RED,"Parsed Header %d [%d] Length: %d Header:%s \n", index, portno, len, (char*) zmq_msg_data (&message) ); #endif - zmq_msg_close (&message); if (dummy) { #ifdef ZMQ_DETAIL cprintf(RED,"%d [%d] Received end of acquisition\n", index, portno ); @@ -402,10 +400,79 @@ public: return 1; } } - zmq_msg_close(&message); return 0; }; + + /** + * Close Header Message. Call this function if ReceiveHeader returned 1 + */ + void CloseHeaderMessage() { + if (headerMessage) + zmq_msg_close(headerMessage); + headerMessage = 0; + }; + /** + * Parse Header + * @param index self index for debugging + * @param length length of message + * @param message message + * @param document parsed document reference + * @param dummy true if end of acqusition, else false, loaded upon parsing + * @param version version that has to match, -1 to not care + * @returns true if successful else false + */ + int ParseHeader(const int index, int length, zmq_msg_t& message, + Document& document, bool& dummy, uint32_t version) + { + if ( document.Parse( (char*) zmq_msg_data (&message), zmq_msg_size (&message)).HasParseError() ) { + cprintf( RED,"%d Could not parse. len:%d: Message:%s \n", index, length, (char*) zmq_msg_data (&message) ); + fflush ( stdout ); + char* buf = (char*) zmq_msg_data (&message); + for ( int i= 0; i < length; ++i ) { + cprintf(RED,"%02x ",buf[i]); + } + printf("\n"); + fflush( stdout ); + return 0; + } + + if (document["jsonversion"].GetUint() != version) { + cprintf( RED, "version mismatch. required %u, got %u\n", version, document["jsonversion"].GetUint()); + return 0; + } + + dummy = false; + int temp = document["data"].GetUint(); + dummy = temp ? false : true; + + return 1; + /* + int temp = d["data"].GetUint(); + dummy = temp ? false : true; + if (!dummy) { + acqIndex = d["acqIndex"].GetUint64(); + frameIndex = d["fIndex"].GetUint64(); + fileIndex = d["fileIndex"].GetUint64(); + subframeIndex = d["expLength"].GetUint(); + filename = d["fname"].GetString(); + } +#ifdef VERYVERBOSE + cprintf(BLUE,"%d Dummy:%d\n" + "\tAcqIndex:%lu\n" + "\tFrameIndex:%lu\n" + "\tSubIndex:%u\n" + "\tFileIndex:%lu\n" + "\tBitMode:%u\n" + "\tDetType:%u\n", + index, (int)dummy, acqIndex, frameIndex, subframeIndex, fileIndex, + d["bitmode"].GetUint(),d["detType"].GetUint()); +#endif + return 1; + */ + }; + + /** * Receive Data * @param index self index for debugging @@ -446,65 +513,6 @@ public: }; - /** - * Parse Header - * @param index self index for debugging - * @param length length of message - * @param message message - * @param acqIndex address of acquisition index - * @param frameIndex address of frame index - * @param subframeIndex address of subframe index - * @param filename address of file name - * @param fileindex address of file index - * @param dummy true if end of acquisition else false - * @returns true if successfull else false - */ - int ParseHeader(const int index, int length, zmq_msg_t& message, uint64_t &acqIndex, - uint64_t &frameIndex, uint32_t &subframeIndex, std::string &filename, uint64_t &fileIndex, bool& dummy) - { - - acqIndex = -1; - frameIndex = -1; - subframeIndex = -1; - fileIndex = -1; - dummy = true; - - Document d; - if ( d.Parse( (char*) zmq_msg_data (&message), zmq_msg_size (&message)).HasParseError() ) { - cprintf( RED,"%d Could not parse. len:%d: Message:%s \n", index, length, (char*) zmq_msg_data (&message) ); - fflush ( stdout ); - char* buf = (char*) zmq_msg_data (&message); - for ( int i= 0; i < length; ++i ) { - cprintf(RED,"%02x ",buf[i]); - } - printf("\n"); - fflush( stdout ); - return 0; - } - - int temp = d["data"].GetUint(); - dummy = temp ? false : true; - if (!dummy) { - acqIndex = d["acqIndex"].GetUint64(); - frameIndex = d["fIndex"].GetUint64(); - fileIndex = d["fileIndex"].GetUint64(); - subframeIndex = d["expLength"].GetUint(); - filename = d["fname"].GetString(); - } -#ifdef VERYVERBOSE - cprintf(BLUE,"%d Dummy:%d\n" - "\tAcqIndex:%lu\n" - "\tFrameIndex:%lu\n" - "\tSubIndex:%u\n" - "\tFileIndex:%lu\n" - "\tBitMode:%u\n" - "\tDetType:%u\n", - index, (int)dummy, acqIndex, frameIndex, subframeIndex, fileIndex, - d["bitmode"].GetUint(),d["detType"].GetUint()); -#endif - return 1; - }; - /** * Print error @@ -580,4 +588,7 @@ private: /** Server Address */ char serverAddress[1000]; + /** Header Message pointer */ + zmq_msg_t* headerMessage; + }; diff --git a/slsReceiverSoftware/include/receiver_defs.h b/slsReceiverSoftware/include/receiver_defs.h index 7884ffead..5b7b924c9 100755 --- a/slsReceiverSoftware/include/receiver_defs.h +++ b/slsReceiverSoftware/include/receiver_defs.h @@ -27,8 +27,7 @@ //versions #define HDF5_WRITER_VERSION 1.0 //1 decimal places #define BINARY_WRITER_VERSION 1.0 //1 decimal places -#define SLS_DETECTOR_HEADER_VERSION 0x1 -#define SLS_DETECTOR_JSON_HEADER_VERSION 0x2 + //parameters to calculate fifo depth #define SAMPLE_TIME_IN_NS 100000000//100ms diff --git a/slsReceiverSoftware/include/sls_receiver_defs.h b/slsReceiverSoftware/include/sls_receiver_defs.h index 0e4df0593..6548157c7 100755 --- a/slsReceiverSoftware/include/sls_receiver_defs.h +++ b/slsReceiverSoftware/include/sls_receiver_defs.h @@ -36,6 +36,9 @@ typedef int int32_t; #define DEFAULT_ZMQ_CL_PORTNO 30001 #define DEFAULT_ZMQ_RX_PORTNO 30001 +#define SLS_DETECTOR_HEADER_VERSION 0x1 +#define SLS_DETECTOR_JSON_HEADER_VERSION 0x2 + /** \file sls_receiver_defs.h This file contains all the basic definitions common to the slsReceiver class