From a3e12e795526213c352443734d67d64e7fb14b9b Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Thu, 15 Oct 2015 12:11:06 +0200 Subject: [PATCH] somewhere --- .../include/UDPStandardImplementation.h | 37 ++++- .../include/sls_receiver_defs.h | 8 +- .../src/UDPBaseImplementation.cpp | 18 +- .../src/UDPStandardImplementation.cpp | 155 ++++++++---------- 4 files changed, 106 insertions(+), 112 deletions(-) diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index bbe526c40..0c1d4ac43 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -77,8 +77,9 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase * Overridden method * Set data compression, by saving only hits (so far implemented only for Moench and Gotthard) * @param b true for data compression enable, else false + * @return OK or FAIL */ - void setDataCompressionEnable(const bool b); + int setDataCompressionEnable(const bool b); //***acquisition parameters*** /** @@ -367,7 +368,33 @@ private: */ uint32_t processListeningBuffer(int ithread, int cSize,char* temp); - bool popAndCheckEndofAcquisition(char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]); + /** + * Called by StartWriting + * Pops buffer from all the FIFOs and checks for dummy frames and end of acquisition + * @param ithread current thread index + * @param wbuffer the buffer array that is popped from all the FIFOs + * @param ready if that FIFO is allowed to pop (depends on if dummy buffer already popped/ waiting for other FIFO to finish a frame(eiger)) + * @param nP number of packets in the buffer popped out + * @param toFree array of addresses to pop into fifoFree (eiger specific) + * @param toFreeOffset the number of addresses to free for each FIFO (eiger specific) + * @return true if end of acquisition else false + */ + bool popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]); + + /** + * Called by StartWriting + * When dummy-end buffers are popped from all FIFOs (acquisition over), this is called + * It frees the FIFO addresses, closes all files + * For data compression, it waits for all threads to be done + * Changes the status to RUN_FINISHED and prints statistics + * @param ithread writing thread index + * @param wbuffer writing buffer popped out from FIFO + */ + void stopWriting(int ithread, char* wbuffer[]); + + void processWritingBuffer(int ithread, char* wbuffer[], uint32_t nP[]); + void processWritingBufferPacketByPacket(); + /************************************************************************* * Class Members ********************************************************* @@ -669,11 +696,7 @@ private: */ void writeToFile_withoutCompression(char* buf[],int numpackets, uint32_t framenum); - /** - * When acquisition is over, this is called - * @param ithread listening thread number - */ - void stopWriting(int ithread, char* wbuffer[]); + /** * updates parameters and writes to file when not a dummy frame diff --git a/slsReceiverSoftware/include/sls_receiver_defs.h b/slsReceiverSoftware/include/sls_receiver_defs.h index 04b8b19fb..fe0c715b1 100755 --- a/slsReceiverSoftware/include/sls_receiver_defs.h +++ b/slsReceiverSoftware/include/sls_receiver_defs.h @@ -8,8 +8,10 @@ #endif #include +#include #include "ansi.h" + typedef double double32_t; typedef float float32_t; typedef int int32_t; @@ -115,9 +117,9 @@ public: \param b true or false \returns string enabled, disabled */ - static string stringEnable(bool b){\ - if(b) return string("enabled"); \ - else return string("disabled"); \ + static std::string stringEnable(bool b){\ + if(b) return std::string("enabled"); \ + else return std::string("disabled"); \ }; diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index 75f7fa244..18e43edb5 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -5,9 +5,9 @@ ***********************************************/ #include "UDPBaseImplementation.h" +#include "genericSocket.h" #include // stat - #include #include using namespace std; @@ -75,10 +75,6 @@ UDPBaseImplementation::~UDPBaseImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " starting"; cout << "Info: Deleting base member pointers" << endl; - if(detHostname) {delete [] detHostname; detHostname = NULL;} - if(eth) {delete [] eth; eth = NULL;} - if(fileName) {delete [] fileName; fileName = NULL;} - if(filePath) {delete [] filePath; filePath = NULL;} } @@ -129,7 +125,7 @@ char *UDPBaseImplementation::getFilePath() const{ return output; } -uint32_t UDPBaseImplementation::getFileIndex() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return fileIndex;} +uint64_t UDPBaseImplementation::getFileIndex() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return fileIndex;} int UDPBaseImplementation::getScanTag() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return scanTag;} @@ -234,7 +230,7 @@ void UDPBaseImplementation::setFilePath(const char c[]){ FILE_LOG(logINFO) << "File path:" << filePath; } -void UDPBaseImplementation::setFileIndex(const uint32_t i){ +void UDPBaseImplementation::setFileIndex(const uint64_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; fileIndex = i; @@ -394,7 +390,7 @@ void UDPBaseImplementation::resetAcquisitionCount(){ FILE_LOG(logINFO) << "totalPacketsCaught:" << totalPacketsCaught << endl; } -int UDPBaseImplementation::startReceiver(char *c=NULL){ +int UDPBaseImplementation::startReceiver(char *c){ FILE_LOG(logWARNING) << __AT__ << " doing nothing..."; FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; return OK; @@ -433,17 +429,17 @@ void UDPBaseImplementation::closeFile(int i){ /***callback functions***/ -void UDPBaseImplementation::registerCallBackStartAcquisition(int (*func)(char*, char*,int, int, void*),void *arg){ +void UDPBaseImplementation::registerCallBackStartAcquisition(int (*func)(char*, char*,uint64_t, uint32_t, void*),void *arg){ startAcquisitionCallBack=func; pStartAcquisition=arg; } -void UDPBaseImplementation::registerCallBackAcquisitionFinished(void (*func)(int, void*),void *arg){ +void UDPBaseImplementation::registerCallBackAcquisitionFinished(void (*func)(uint64_t, void*),void *arg){ acquisitionFinishedCallBack=func; pAcquisitionFinished=arg; } -void UDPBaseImplementation::registerCallBackRawDataReady(void (*func)(int, char*, int, FILE*, char*, void*),void *arg){ +void UDPBaseImplementation::registerCallBackRawDataReady(void (*func)(uint64_t, char*, uint32_t, FILE*, char*, void*),void *arg){ rawDataReadyCallBack=func; pRawDataReady=arg; } diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index f4d2d968f..1d6a1eb23 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -764,7 +764,7 @@ void UDPStandardImplementation::resetAcquisitionCount(){ } -int UDPStandardImplementation::startReceiver(char *c=NULL){ +int UDPStandardImplementation::startReceiver(char *c){ FILE_LOG(logDEBUG1) << __AT__ << " called"; cout << "Info: Starting Receiver" << endl; @@ -1752,23 +1752,28 @@ void UDPStandardImplementation::startWriting(){ //pop fifo and if end of acquisition - if(popAndCheckEndofAcquisition(wbuf, popReady, numPackets,toFreePointers,toFreePointersOffset)){ + if(popAndCheckEndofAcquisition(ithread, wbuf, popReady, numPackets,toFreePointers,toFreePointersOffset)){ #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread %d: All dummy-end buffers popped\n", ithread); #endif //finish missing packets - if(myDetectorType == EIGER - && ((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numListeningThreads)))); + + if(myDetectorType == EIGER && + ((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numberofListeningThreads)))); else{ stopWriting(ithread,wbuf); continue; } } - //eiger-processWritingPackets(); - //others-processWritingBuffer(); - - + switch(myDetectorType){ + case EIGER: + processWritingBufferPacketByPacket(); + break; + default: + processWritingBuffer(ithread, wbuf, numPackets); + break; + } }/*--end of loop for each buffer (inner loop)*/ @@ -1835,7 +1840,8 @@ void UDPStandardImplementation::startWriting(){ -bool UDPStandardImplementation::popAndCheckEndofAcquisition(char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]){ +bool UDPStandardImplementation::popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],char* toFree[],int toFreeOffset[]){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; bool endofAcquisition = true; int val; @@ -1866,7 +1872,7 @@ bool UDPStandardImplementation::popAndCheckEndofAcquisition(char* wbuffer[], boo #ifdef DEBUG4 switch(myDetectorType){ case EIGER: - wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS); + eiger_packet_footer_t* wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS); //cprintf(BLUE,"footer value:0x%x\n",i,(uint64_t)(*( (uint64_t*) wbuf_footer))); cprintf(BLUE,"Fnum[%d]:%d\n",i,(uint32_t)(*( (uint64_t*) wbuf_footer))); cprintf(BLUE,"Pnum[%d]:%d\n",i,*( (uint16_t*) wbuf_footer->packetNumber)); @@ -1887,6 +1893,54 @@ bool UDPStandardImplementation::popAndCheckEndofAcquisition(char* wbuffer[], boo +void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; + + cprintf(GREEN,"Info: Writing_Thread %d: End of Acquisition\n",ithread); + + //free fifo + for(int i=0; ipush(wbuffer[i])); +#ifdef +} + +void UDPStandardImplementation::processWritingBuffer(int ithread, char* wbuffer[], uint32_t nP[]){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; + + +} + + +void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread, char* wbuffer[], uint32_t nP[]){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; + + uint64_t tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))); + + //for gotthard and normal frame, increment frame number to separate fnum and pnum + if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1)) + tempframenumber++; + + //get frame number + tempframenumber = (tempframenumber & frameIndexMask) >> frameIndexOffset; + + //single thread, just assign and process without compression + if(!dataCompressionEnable){ + currentFrameNumber = tempframenumber; + handleWithoutDataCompression(ithread, wbuffer, nP[0]); + } + + //handling multiple threads + else{ + pthread_mutex_lock(&progressMutex); + if(tempframenumber > currentFrameNumber) + currentFrameNumber = tempframenumber; + pthread_mutex_unlock(&progressMutex); + handleDataCompression(ithread,wbuffer,d, xmax, ymax, nf); + } + + + +} @@ -2173,66 +2227,9 @@ int UDPStandardImplementation::startWriting(){ - //pop - endofacquisition = true; - for(i=0;ipop(wbuf[i]); -#ifdef FIFO_DEBUG - cprintf(GREEN,"%d writer poped 0x%x from fifo %d\n", ithread, (void*)(wbuf[i]), i); -#endif - numpackets[i] = (uint32_t)(*((uint32_t*)wbuf[i])); - -#ifdef VERYDEBUG - cprintf(GREEN,"%d numpackets: %d for fifo :%d\n", ithread, numpackets[i], i); -#endif - if(numpackets < 0){ - cprintf(BG_RED,"negative numpackets[%d]%d\n",i,numpackets[i]); - exit(-1); - } - //dont pop again if dummy packet - else if(numpackets[i] == 0){ - popready[i] = false; -#ifdef EIGER_DEBUG3 - cprintf(GREEN,"%d Dummy frame popped out of fifo %d",ithread, i); -#endif - }else{ - endofacquisition = false; - if(numpackets[i] == onePacketSize){ -#ifdef EIGER_DEBUG3 - wbuf_footer = (eiger_packet_footer_t*)(wbuf[i] + footer_offset + HEADER_SIZE_NUM_TOT_PACKETS); - //cprintf(BLUE,"footer value:0x%x\n",i,(uint64_t)(*( (uint64_t*) wbuf_footer))); - cprintf(BLUE,"tempframenum[%d]:%d\n",i,(uint32_t)(*( (uint64_t*) wbuf_footer))); - cprintf(BLUE,"packetnum[%d]:%d\n",i,*( (uint16_t*) wbuf_footer->packetnum)); -#endif - } - - if(myDetectorType == EIGER){ - tofree[tofreeoffset[i]] = wbuf[i]; - tofreeoffset[i]++; - } - } - - } - } - //END OF ACQUISITION - if(endofacquisition){ -#ifdef EIGER_DEBUG3 - cprintf(GREEN,"%d Both dummy frames\n", ithread); -#endif - //remaining packets to be written - if((myDetectorType == EIGER) && - ((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numListeningThreads)))); - else{ - - stopWriting(ithread,wbuf); - continue; - } - } - if(myDetectorType == EIGER){ @@ -2539,31 +2536,7 @@ int UDPStandardImplementation::startWriting(){ } - //other detectors other than eiger - else{ - //frame number for progress - if ((myDetectorType == PROPIX) ||((myDetectorType == GOTTHARD) && (shortFrame == -1))) - tempframenum[0] = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); - else - tempframenum[0] = ((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); - - if(numWriterThreads == 1) - currframenum = tempframenum[0]; - else{ - pthread_mutex_lock(&progress_mutex); - if(tempframenum[0] > currframenum) - currframenum = tempframenum[0]; - pthread_mutex_unlock(&progress_mutex); - } - - - //without datacompression: write datacall back, or write data, free fifo - if(!dataCompression) handleWithoutDataCompression(ithread,wbuf, numpackets[0]); - //data compression - else handleDataCompression(ithread,wbuf,d, xmax, ymax, nf); - - } }