From 4eceb3b5f76ec26140d22a831c375822cf32d240 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Fri, 2 Sep 2016 15:47:28 +0200 Subject: [PATCH] kinda --- .../include/UDPStandardImplementation.h | 8 +- .../src/UDPBaseImplementation.cpp | 1 + .../src/UDPStandardImplementation.cpp | 221 +++++++++--------- .../src/slsReceiverTCPIPInterface.cpp | 36 +-- 4 files changed, 139 insertions(+), 127 deletions(-) diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index b17882baa..b043eeedf 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -220,6 +220,8 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase */ void readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame); + + void resetGuiPointer(int ithread); /** * Overridden method * Closes file / all files(data compression involves multiple files) @@ -456,8 +458,9 @@ private: * Uses semaphore for nth frame mode * @param ithread writer thread index * @param buffer buffer to copy + * @param numpackets number of packets to copy */ - void copyFrameToGui(int ithread, char* buffer); + void copyFrameToGui(int ithread, char* buffer, uint32_t numpackets); void waitWritingBufferForNextAcquisition(int ithread); @@ -589,6 +592,9 @@ private: /** packets in current file */ uint64_t totalPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS]; + /**Total packet count written by each writing thread */ + uint64_t totalWritingPacketCount[MAX_NUMBER_OF_LISTENING_THREADS]; + diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index 858e968c2..d4bbbcfbd 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -436,6 +436,7 @@ void UDPBaseImplementation::readFrame(int ithread, char* c,char** raw, int64_t & FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; } + //FIXME: needed, isnt stopReceiver enough? void UDPBaseImplementation::abort(){ FILE_LOG(logWARNING) << __AT__ << " doing nothing..."; diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 4ba857b2c..7f649d5c3 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -161,6 +161,7 @@ void UDPStandardImplementation::initializeMembers(){ frameNumberInPreviousFile[i] = -1; lastFrameNumberInFile[i] = -1; totalPacketsInFile[i] = 0; + totalWritingPacketCount[i] = 0; } @@ -849,6 +850,7 @@ int UDPStandardImplementation::startReceiver(char *c){ //reset file parameters lastFrameNumberInFile[i] = -1; totalPacketsInFile[i] = 0; + totalWritingPacketCount[i] = 0; if(sfilefd[i]){ fclose(sfilefd[i]); sfilefd[i] = NULL; @@ -1015,7 +1017,7 @@ void UDPStandardImplementation::startReadout(){ //wait as long as there is change from prev totalP, //and also change from received in buffer to previous value //(as one listens to many at a time, shouldnt cut off in between) - while((prev != totalP) && (prevReceivedInBuffer!= currentReceivedInBuffer)){ + while((prev != totalP) || (prevReceivedInBuffer!= currentReceivedInBuffer)){ #ifdef DEBUG5 cprintf(MAGENTA,"waiting for all packets totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer); @@ -1031,7 +1033,12 @@ void UDPStandardImplementation::startReadout(){ currentReceivedInBuffer = 0; for(i=0; igetCurrentTotalReceived(); +#ifdef DEBUG5 + cprintf(MAGENTA,"\tupdated: totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer); + +#endif } + } //set status @@ -1080,6 +1087,7 @@ void UDPStandardImplementation::readFrame(int ithread, char* c,char** raw, int64 cprintf(CYAN,"Info: gui data ready\n"); #endif *raw = guiData[ithread]; + guiData[ithread] = NULL; //for nth frame to gui, post semaphore so writer stops waiting @@ -1097,7 +1105,25 @@ void UDPStandardImplementation::readFrame(int ithread, char* c,char** raw, int64 } } +/* +void UDPStandardImplementation::resetGuiPointer(int ithread){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + guiData[ithread] = NULL; + + //for nth frame to gui, post semaphore so writer stops waiting + if((FrameToGuiFrequency) && (writerThreadsMask)){ +#ifdef DEBUG4 + cprintf(CYAN,"Info: gonna post\n"); +#endif + //release after getting data + sem_post(&writerGuiSemaphore[ithread]); + } +#ifdef DEBUG4 + cprintf(CYAN,"Info: done post\n"); +#endif +} +*/ void UDPStandardImplementation::closeFile(int ithread){ FILE_LOG(logDEBUG) << __AT__ << " called for " << ithread ; @@ -1408,7 +1434,7 @@ int UDPStandardImplementation::createNewFile(int ithread){ FILE_LOG(logDEBUG) << __AT__ << " called"; int index = 0; - if(packetsCaught) + if(totalWritingPacketCount[ithread]) index = frameIndex[ithread]; //create file name @@ -1445,7 +1471,7 @@ int UDPStandardImplementation::createNewFile(int ithread){ //Print packet loss and filenames - if(!packetsCaught){ + if(!totalWritingPacketCount[ithread]){ frameNumberInPreviousFile[ithread] = -1; cout << "Thread " << ithread << " File:" << completeFileName[ithread] << endl; }else{ @@ -1458,7 +1484,8 @@ int UDPStandardImplementation::createNewFile(int ithread){ << "\tPacket Loss: " << setw(4)<0){ if(myDetectorType == JUNGFRAU){ jfrau_packet_header_t* header; @@ -1681,7 +1708,7 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch (uint32_t)(*( (uint64_t*) footer))); } } -//#endif +#endif #ifdef DEBUG cprintf(BLUE, "Listening_Thread %d : Received bytes: %d. Expected bytes: %d\n", ithread, receivedSize, bufferSize * numberofJobsPerBuffer-cSize); #endif @@ -2202,31 +2229,33 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){ //statistics FILE_LOG(logINFO) << "Status: Run Finished"; - if(totalPacketsCaught < ((uint64_t)numberOfFrames*packetsPerFrame*numberofListeningThreads)){ - cprintf(RED, "Total Missing Packets: %lld\n",(long long int)numberOfFrames*packetsPerFrame*numberofListeningThreads-totalPacketsCaught); - cprintf(RED, "Total Packets Caught: %lld\n",(long long int)totalPacketsCaught); - cprintf(RED, "Total Frames Caught: %lld\n",(long long int)(totalPacketsCaught/(packetsPerFrame*numberofListeningThreads))); - int64_t lastFrameNumber = 0; - for(int i=0;i= packetsPerFrame)/**needs to be reworked*/ - copyFrameToGui(ithread, wbuffer); + //if(npackets >= (packetsPerFrame/numberofListeningThreads)) + if(npackets) + copyFrameToGui(ithread, wbuffer,npackets); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: Copied frame\n"); #endif @@ -2331,6 +2361,7 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w //update stats numpackets -= packetsWritten; totalPacketsInFile[ithread] += packetsWritten; + totalWritingPacketCount[ithread] += packetsWritten; pthread_mutex_lock(&writeMutex); packetsCaught += packetsWritten; totalPacketsCaught += packetsWritten; @@ -2342,9 +2373,9 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w while(numpackets){ //new file //create new file only if something has been written and modulus works - if((lastFrameNumberInFile[ithread]>=0) &&(!((lastFrameNumberInFile[ithread]+1) % maxFramesPerFile))){ + if((lastFrameNumberInFile[ithread]>=0) &&(!((lastFrameNumberInFile[ithread]+1) % maxFramesPerFile))) createNewFile(ithread); - } + //frames to save in one file nextFileFrameNumber = (lastFrameNumberInFile[ithread]+1) + @@ -2357,6 +2388,7 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w //update stats numpackets -= packetsWritten; totalPacketsInFile[ithread] += packetsWritten; + totalWritingPacketCount[ithread] += packetsWritten; pthread_mutex_lock(&writeMutex); packetsCaught += packetsWritten; totalPacketsCaught += packetsWritten; @@ -2378,8 +2410,10 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w return; } totalPacketsInFile[ithread] += numpackets; + totalWritingPacketCount[ithread] += numpackets; lastFrameNumberInFile[ithread] = finalLastFrameNumberToSave; currentFrameNumber[ithread] = finalLastFrameNumberToSave; + } if(numberofWriterThreads > 1) pthread_mutex_lock(&writeMutex); @@ -2447,7 +2481,7 @@ void UDPStandardImplementation::updateFileHeader(int ithread){ } -void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer){ +void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32_t numpackets){ FILE_LOG(logDEBUG) << __AT__ << " called"; @@ -2475,7 +2509,7 @@ void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer){ #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: CopyingFrame: guidataready is 0, Copying data\n"); #endif - memcpy(latestData[ithread],buffer + HEADER_SIZE_NUM_TOT_PACKETS,bufferSize); + memcpy(latestData[ithread],buffer , numpackets*onePacketSize); strcpy(guiFileName[ithread],completeFileName[ithread]); guiDataReady[ithread]=1; pthread_mutex_unlock(&dataReadyMutex); @@ -2612,6 +2646,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer #ifndef ALLFILE totalPacketsInFile[ithread] += (bufferSize/packetsPerFrame); + totalWritingPacketCount[ithread] += (bufferSize/packetsPerFrame); pthread_mutex_lock(&writeMutex); if((packetsCaught%packetsPerFrame) >= (uint32_t)maxFramesPerFile) createNewFile(ithread); @@ -2622,7 +2657,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer #endif if(!once){ - copyFrameToGui(ithread, buff[0]); + copyFrameToGui(ithread, buff[0],(uint32_t)packetsPerFrame); once = 1; } } @@ -2662,7 +2697,6 @@ int UDPStandardImplementation::getFrameNumber(int ithread, char* wbuffer, uint64 if(!((uint32_t)(*( (uint64_t*) footer)))){ tempframenumber = -1; FILE_LOG(logERROR) << "Fifo "<< ithread << ": Frame Number is zero from firmware."; - exit(-1); return FAIL; } #ifdef DEBUG4 @@ -2713,103 +2747,66 @@ int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer, FILE_LOG(logDEBUG) << __AT__ << " called"; //if(ithread) cout<<"at writeUptoFrameNumber " << nextFrameNumber<< endl; - int bigIncrements = onePacketSize * packetsPerFrame; //a frame at a time - if(numberofJobsPerBuffer == 1) bigIncrements = onePacketSize; //a packet at a time as we listen to only one frame in a buffer int startoffset = offset; int endoffset = startoffset + numpackets * onePacketSize; - - int expectedoffset = startoffset + ((nextFrameNumber - (lastFrameNumberInFile[ithread]+1)) * onePacketSize * packetsPerFrame); - bool expectedoffsetATlastpacket = false; - if(expectedoffset >= endoffset){ - expectedoffset = startoffset + ((numpackets -1) * onePacketSize); - expectedoffsetATlastpacket = true; - } - offset = expectedoffset; - - - //get frame number at expected offset uint64_t tempframenumber=-1; - uint64_t frameNumberWritten=-1;//if(ithread) cout<<"frame number at expected ofset"<push(wbuffer)); return FAIL; } + //last packet's frame number < nextframenumber + if(tempframenumber=nextFrameNumber){ - while(tempframenumber>=nextFrameNumber){ - offset -= bigIncrements; - if(offsetpush(wbuffer)); - return FAIL; - } - } - if(offsetpush(wbuffer)); - return FAIL; - } - } - while(tempframenumberpush(wbuffer)); - return FAIL; - } - } + + //somewhere in between + int bigIncrements = onePacketSize * packetsPerFrame * 10; //10 frames at a time + if(numberofJobsPerBuffer == 1) bigIncrements = onePacketSize; //a packet at a time as we listen to only one frame in a buffer + + cout<=nextFrameNumber){ + offset -= bigIncrements; + if(offsetpush(wbuffer)); + return FAIL; } - - //if tempframenumber is too low, go forwards fast (by frame) and then slowly (by each packet) backwards - else{ - while(tempframenumberendoffset) - break;//if(ithread) cout<<"frame number at going forwards fast f#:"<push(wbuffer)); - return FAIL; - } - } - if(offset>endoffset){ - offset = endoffset;//if(ithread) cout<<"frame number at offset>endoffset f#:"<push(wbuffer)); - return FAIL; - } - } - while(tempframenumber>nextFrameNumber){ - offset -= onePacketSize;//if(ithread) cout<<"frame number at going bacckwards slow f#:"<push(wbuffer)); - return FAIL; - } - } - offset += onePacketSize; + } + if(offsetpush(wbuffer)); + return FAIL; + } + } + while(tempframenumberpush(wbuffer)); + return FAIL; } - frameNumberWritten = nextFrameNumber-1; } fwrite(wbuffer + startoffset, 1, offset-startoffset, sfilefd[ithread]); numPacketsWritten += ((offset-startoffset)/onePacketSize); - lastFrameNumberInFile[ithread] = frameNumberWritten; + lastFrameNumberInFile[ithread] = (nextFrameNumber-1); //if(ithread) cout<<"done with writeUptoFrameNumber" << endl; return OK; } diff --git a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp index abb1308e8..500db2988 100644 --- a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp +++ b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp @@ -1164,7 +1164,7 @@ int slsReceiverTCPIPInterface::moench_read_frame(){ else{ bindex = ((uint32_t)(*((uint32_t*)raw))); - memcpy(origVal,raw,bufferSize); + memcpy(origVal,raw + HEADER_SIZE_NUM_TOT_PACKETS,bufferSize); raw=NULL; //************** packet number order********************** @@ -1369,7 +1369,7 @@ int slsReceiverTCPIPInterface::gotthard_read_frame(){ #endif } - memcpy(origVal,raw,bufferSize); + memcpy(origVal,raw + HEADER_SIZE_NUM_TOT_PACKETS,bufferSize); raw=NULL; @@ -1535,7 +1535,7 @@ int slsReceiverTCPIPInterface::propix_read_frame(){ cout << "index2:" << hex << index << endl; #endif - memcpy(origVal,raw,bufferSize); + memcpy(origVal,raw + HEADER_SIZE_NUM_TOT_PACKETS,bufferSize); raw=NULL; /*//ignore if half frame is missing @@ -1649,8 +1649,8 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){ char* raw; char* origVal = new char[frameSize]; char* retval = new char[dataSize]; - memset(origVal,0xF,frameSize); - memset(retval,0xF,dataSize); + memset(origVal,0xFF,frameSize); + memset(retval,0xFF,dataSize); int64_t startAcquisitionIndex=0; int64_t startFrameIndex=0; @@ -1677,6 +1677,9 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){ // acq started else{ ret = OK; + int fnum[EIGER_MAX_PORTS]; + for(int i=0;ireadFrame(i,fName,&raw,startAcquisitionIndex,startFrameIndex); @@ -1684,27 +1687,28 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){ if (raw == NULL){ startAcquisitionIndex = -1; #ifdef VERYVERBOSE - cout<<"data not ready for gui yet"<subFrameNumber); } #ifdef VERYVERBOSE cout << "index:" << dec << index << endl; - if(index>10000) exit(-1); cout << "subframenumber:" << dec << subframenumber << endl; #endif - - memcpy(((char*)origVal)+(i*onePacketSize*packetsPerFrame),raw,(frameSize/EIGER_MAX_PORTS)); + int numpackets = (uint32_t)(*( (uint32_t*) raw)); + memcpy(((char*)origVal)+(i*onePacketSize*packetsPerFrame),raw + HEADER_SIZE_NUM_TOT_PACKETS,numpackets*onePacketSize); raw=NULL; } } @@ -1712,8 +1716,12 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){ if(startAcquisitionIndex != -1){ //cout<<"**** got proper frame ******"<resetGuiPointer(i); - + if(fnum[0]!=fnum[1]) + cprintf(BG_RED,"Fnums differ %d and %d\n",fnum[0],fnum[1]); int c1=8;//first port int c2=(frameSize/2) + 8; //second port @@ -1942,7 +1950,7 @@ int slsReceiverTCPIPInterface::jungfrau_read_frame(){ //proper frame else{ //cout<<"**** got proper frame ******"<