From be2bc15ab5c5e26adb72275610900e206e9cba5b Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Tue, 30 Aug 2016 16:10:46 +0200 Subject: [PATCH] not done --- .../include/UDPBaseImplementation.h | 2 +- .../include/UDPStandardImplementation.h | 89 +- slsReceiverSoftware/include/genericSocket.h | 24 +- slsReceiverSoftware/include/receiver_defs.h | 2 +- .../src/UDPStandardImplementation.cpp | 1224 ++++++----------- .../src/slsReceiverTCPIPInterface.cpp | 5 +- 6 files changed, 505 insertions(+), 841 deletions(-) diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index f9c5e9273..6083b0723 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -513,7 +513,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter //***acquisition count parameters*** /** Total packets caught for an entire acquisition (including all scans) */ uint64_t totalPacketsCaught; - /** Frames Caught for each real time acquisition (eg. for each scan) */ + /** Packets Caught for each real time acquisition (eg. for each scan) */ uint64_t packetsCaught; //***acquisition indices parameters*** diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 587b88dbd..fcec7fc0d 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -78,6 +78,19 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase */ int setDataCompressionEnable(const bool b); + //***acquisition count parameters*** + /** + * Get Total Frames Caught for an entire acquisition (including all scans) + * @return total number of frames caught for entire acquisition + */ + uint64_t getTotalFramesCaught() const; + + /** + * Get Frames Caught for each real time acquisition (eg. for each scan) + * @return number of frames caught for each scan + */ + uint64_t getFramesCaught() const; + //***acquisition parameters*** /** * Overridden method @@ -394,18 +407,6 @@ private: */ void startWriting(); - /** - * Called by processWritingBuffer and processWritingBufferPacketByPacket - * Pops buffer from all the FIFOs and checks for dummy frames and end of acquisition - * @param ithread current thread index - * @param wbuffer the buffer array that is popped from all the FIFOs - * @param ready if that FIFO is allowed to pop (depends on if dummy buffer already popped/ waiting for other FIFO to finish a frame(eiger)) - * @param nP number of packets in the buffer popped out - * @param fifoTempFree circular fifo to save addresses of packets adding upto a frame before pushing into fifofree (eiger specific) - * @return true if end of acquisition else false - */ - bool popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],CircularFifo* fifoTempFree[]); - /** * Called by processWritingBuffer and processWritingBufferPacketByPacket * When dummy-end buffers are popped from all FIFOs (acquisition over), this is called @@ -434,14 +435,14 @@ private: * @param wbuffer is the address of buffer popped out of FIFO * @param numpackets is the number of packets */ - void writeFileWithoutCompression(int ithread, char* wbuffer[],uint32_t numpackets); + void writeFileWithoutCompression(int ithread, char* wbuffer,uint32_t numpackets); /** * Called by writeToFileWithoutCompression * Create headers for file writing (at the moment, this is eiger specific) * @param wbuffer writing buffer popped from FIFOs */ - void createHeaders(char* wbuffer[]); + void createHeaders(char* wbuffer); /** * Updates the file header char aray, each time the corresp parameter is changed @@ -456,11 +457,7 @@ private: * @param ithread writer thread index * @param buffer buffer to copy */ - void copyFrameToGui(int ithread, char* buffer[]); - - void processWritingBuffer(int ithread); - - void processWritingBufferPacketByPacket(int ithread); + void copyFrameToGui(int ithread, char* buffer); void waitWritingBufferForNextAcquisition(int ithread); @@ -473,10 +470,28 @@ private: * @param wbuffer writer buffer * @param nf number of frames */ - void handleDataCompression(int ithread, char* wbuffer[], uint64_t &nf); + void handleDataCompression(int ithread, char* wbuffer, uint64_t &nf); + /** + * Get Frame Number + * @param ithread writer thread index + * @param wbuffer writer buffer + * @param tempframenumber reference to the frame number + * @return OK or FAIL + */ + int getFrameNumber(int ithread, char* wbuffer, uint64_t &tempframenumber); + /** + * Find offset upto this frame number and write it to file + * @param ithread writer thread index + * @param wbuffer writer buffer + * @param offset reference of offset to look from and replaces offset to starting of nextframenumber + * @param nextFrameNumber frame number up to which data written + * @param numpackets number of packets in buffer + * @param numPacketsWritten number of packets written to file + */ + int writeUptoFrameNumber(int ithread, char* wbuffer, int &offset, uint64_t nextFrameNumber, uint32_t numpackets, int &numPacketsWritten); /************************************************************************* * Class Members ********************************************************* @@ -526,8 +541,11 @@ private: /** Complete File name */ char completeFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH]; + /** File Name without frame index, file index and extension (_d0_f000000000000_8.raw)*/ + char fileNamePerThread[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH]; + /** Maximum Frames Per File **/ - int maxFramesPerFile; + uint64_t maxFramesPerFile; /** If file created successfully for all Writer Threads */ bool fileCreateSuccess; @@ -550,12 +568,12 @@ private: /** Current Frame Number */ uint64_t currentFrameNumber[MAX_NUMBER_OF_WRITER_THREADS]; - /** Previous Frame number from buffer to calculate loss */ - int64_t previousFrameNumber[MAX_NUMBER_OF_WRITER_THREADS]; + int64_t frameNumberInPreviousFile[MAX_NUMBER_OF_WRITER_THREADS]; /** Last Frame Index Listened To */ - int32_t lastFrameIndex[MAX_NUMBER_OF_WRITER_THREADS]; + int64_t lastFrameIndex[MAX_NUMBER_OF_WRITER_THREADS]; + /* Acquisition started */ @@ -564,25 +582,14 @@ private: /* Measurement started - for each thread to get progress print outs*/ bool measurementStarted[MAX_NUMBER_OF_LISTENING_THREADS]; - /** Total Frame Count listened to by listening threads */ - int totalListeningFrameCount[MAX_NUMBER_OF_LISTENING_THREADS]; + /** Total packet Count listened to by listening threads */ + int totalListeningPacketCount[MAX_NUMBER_OF_LISTENING_THREADS]; /** Pckets currently in current file, starts new file when it reaches max */ - uint32_t packetsInFile[MAX_NUMBER_OF_WRITER_THREADS]; - - /** Number of Missing Packets per buffer*/ - uint32_t numMissingPackets[MAX_NUMBER_OF_WRITER_THREADS]; - - /** Total Number of Missing Packets in acquisition*/ - uint32_t numTotMissingPackets; - - /** Number of Missing Packets in file */ - uint32_t numTotMissingPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS]; - - /** packets caught per thread */ - uint64_t packetsCaughtPerThread[MAX_NUMBER_OF_WRITER_THREADS]; - + uint64_t lastFrameNumberInFile[MAX_NUMBER_OF_WRITER_THREADS]; + /** packets in current file */ + uint64_t totalPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS]; @@ -721,8 +728,6 @@ private: /** Progress (currentFrameNumber) Mutex */ pthread_mutex_t progressMutex; - /** Progress (currentFrameNumber) Mutex */ - pthread_mutex_t udpSocketMutex[MAX_NUMBER_OF_LISTENING_THREADS]; //***callback*** /** The action which decides what the user and default responsibilities to save data are diff --git a/slsReceiverSoftware/include/genericSocket.h b/slsReceiverSoftware/include/genericSocket.h index 3facb0a2b..0f52c0413 100644 --- a/slsReceiverSoftware/include/genericSocket.h +++ b/slsReceiverSoftware/include/genericSocket.h @@ -613,25 +613,13 @@ enum communicationProtocol{ while(length>0){ nsending = (length>packet_size) ? packet_size:length; - /* - //created for debugging on 11.05.2015 - nsending=5000; - nsent = recvfrom(socketDescriptor,(char*)buf,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); - if(nsent <1000){ - if(nsent < 48){ - cout << " "<fnum)<< "\t"; - cout << k <<" packets" << endl; - k = 0; - } - } - else - k++; - */ nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length); - if(!nsent) break; + if(nsent < packet_size) { + if(nsent){ + cout << "Incomplete Packet size " << nsent << endl; + } + break; + } length-=nsent; total_sent+=nsent; } diff --git a/slsReceiverSoftware/include/receiver_defs.h b/slsReceiverSoftware/include/receiver_defs.h index 4c621a67f..0c60b892c 100755 --- a/slsReceiverSoftware/include/receiver_defs.h +++ b/slsReceiverSoftware/include/receiver_defs.h @@ -139,7 +139,7 @@ typedef struct { #define JFRAU_BUFFER_SIZE (JFRAU_ONE_PACKET_SIZE*JFRAU_PACKETS_PER_FRAME) //8214*128 -#define JFRAU_FRAME_INDEX_MASK 0x0 //Not Applicable, use struct +#define JFRAU_FRAME_INDEX_MASK 0xffffff //mask after using struct (48 bit) #define JFRAU_FRAME_INDEX_OFFSET 0x0 //Not Applicable, use struct #define JFRAU_PACKET_INDEX_MASK 0x0//Not Applicable, use struct diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 62da115d3..887b827a4 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -10,8 +10,6 @@ #include "gotthardModuleData.h" #include "gotthardShortModuleData.h" -#include "fileIOStatic.h" - #include // exit() #include //set precision for printing parameters for create new file #include //map @@ -38,8 +36,6 @@ UDPStandardImplementation::UDPStandardImplementation(){ pthread_mutex_init(&writeMutex,NULL); pthread_mutex_init(&dataReadyMutex,NULL); pthread_mutex_init(&progressMutex,NULL); - for(int i=0;iShutDownSocket(); FILE_LOG(logINFO) << "Shut down UDP Socket " << i; delete udpSocket[i]; udpSocket[i] = NULL; - pthread_mutex_unlock(&udpSocketMutex[i]); } } return OK; @@ -998,10 +1005,10 @@ void UDPStandardImplementation::startReadout(){ //check if all packets got int totalP = 0,prev,i; for(i=0; iinitEventTree(temp, &iframe); //resets the pedestalSubtraction array and the commonModeSubtraction singlePhotonDetectorObject[ithread]->newDataSet(); @@ -1564,20 +1570,16 @@ void UDPStandardImplementation::startListening(){ #endif - pthread_mutex_lock(&udpSocketMutex[ithread]); //udpsocket doesnt exist - if(udpSocket[ithread] == NULL){ + if(status == TRANSMITTING){ FILE_LOG(logERROR) << "Listening_Thread " << ithread << ": UDP Socket not created or shut down earlier"; stopListening(ithread,0); - pthread_mutex_unlock(&udpSocketMutex[ithread]); continue; } rc = prepareAndListenBuffer(ithread, carryonBufferSize, tempBuffer); carryonBufferSize = 0; - pthread_mutex_unlock(&udpSocketMutex[ithread]); - //start indices for each start of scan/acquisition if((!measurementStarted) && (rc > 0)) startFrameIndices(ithread); @@ -1589,7 +1591,7 @@ void UDPStandardImplementation::startListening(){ //write packet count to buffer if(myDetectorType == EIGER) - (*((uint32_t*)(buffer[ithread]))) = rc/onePacketSize; + (*((uint32_t*)(buffer[ithread]))) = (rc/onePacketSize); if(dataCompressionEnable) (*((uint32_t*)(buffer[ithread]))) = processListeningBuffer(ithread, carryonBufferSize, tempBuffer, rc); @@ -1634,12 +1636,18 @@ void UDPStandardImplementation::startListening(){ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, char* temp){ FILE_LOG(logDEBUG) << __AT__ << " called"; + int receivedSize = 0; + //carry over from previous buffer if(cSize) memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize); - int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, (bufferSize * numberofJobsPerBuffer) - cSize); + if(status != TRANSMITTING) + receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, (bufferSize * numberofJobsPerBuffer) - cSize); + //throw away packets that is not one packet size - while(myDetectorType == EIGER && receivedSize != onePacketSize) { + while((myDetectorType == EIGER) && + (receivedSize != ((bufferSize * numberofJobsPerBuffer) - cSize)) && + (status != TRANSMITTING)) { if(receivedSize != EIGER_HEADER_LENGTH) cprintf(RED,"Listening_Thread %d: Listened to a weird packet size %d\n",ithread, receivedSize); #ifdef DEBUG @@ -1647,10 +1655,12 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch cprintf(BLUE,"Listening_Thread %d: Listened to a header packet\n",ithread); #endif //listen again - receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS); + if(status != TRANSMITTING) + receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, (bufferSize * numberofJobsPerBuffer) - cSize); + //cout<0){ @@ -1761,7 +1771,7 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){ //push last non empty buffer into fifo else{ (*((uint32_t*)(buffer[ithread]))) = numbytes/onePacketSize; - totalListeningFrameCount[ithread] += (numbytes/onePacketSize); + totalListeningPacketCount[ithread] += (numbytes/onePacketSize); #ifdef DEBUG cprintf(BLUE,"Listening_Thread %d: Last Buffer numBytes:%d\n",ithread, numbytes); cprintf(BLUE,"Listening_Thread %d: Last Buffer packet count:%d\n",ithread, numbytes/onePacketSize); @@ -1813,7 +1823,7 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){ listeningThreadsMask^=(1<> frameIndexOffset), @@ -1909,7 +1919,7 @@ uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int &cSi case JUNGFRAU: - lastPacketOffset = (((numberofJobsPerBuffer * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); + lastPacketOffset = (((packetCount - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); #ifdef DEBUG4 header = (jfrau_packet_header_t*) (buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS); cprintf(BLUE, "Listening_Thread: First Header:%d\t First Packet:%d\n", @@ -1971,22 +1981,6 @@ void UDPStandardImplementation::startWriting(){ //let calling function know thread started and obtained current threadStarted = 1; - switch(myDetectorType){ - case EIGER: - processWritingBufferPacketByPacket(ithread); - break; - default: - processWritingBuffer(ithread); - break; - } - -} - - - -void UDPStandardImplementation::processWritingBuffer(int ithread){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - //variable definitions char* wbuf; //buffer popped from FIFO sfilefd[ithread] = NULL; //file pointer @@ -2000,7 +1994,7 @@ void UDPStandardImplementation::processWritingBuffer(int ithread){ //--reset parameters before acquisition nf = 0; - guiData[ithread] = latestData[ithread]; //so that the first frame is always copied + guiData[ithread] = latestData[ithread]; //so that the first frame is always copied if(dataCompressionEnable) listenfifoIndex = 0; //compression has only one listening thread @@ -2060,361 +2054,6 @@ void UDPStandardImplementation::processWritingBuffer(int ithread){ -void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - //variable definitions - char* packetBuffer[numberofListeningThreads]; //buffer popped from FIFO - sfilefd = NULL; //file pointer - bool popReady[numberofListeningThreads]; //if the FIFO can be popped - uint32_t numPackets[numberofListeningThreads]; //number of packets popped from the FIFO - - int MAX_NUM_PACKETS = 1024; //highest 32 bit has 1024 number of packets - uint32_t LAST_PACKET_VALUE; //last packet number - - CircularFifo* fifoTempFree[numberofListeningThreads];//ciruclar fifo to keep track of one frame packets to be freed and reused later - char* temp = NULL; - - char* frameBuffer[MAX_NUM_PACKETS]; //buffer offset created for a whole frame - int frameBufferoffset[numberofListeningThreads]; //buffer offset created for a whole frame for both listening threads - char* blankframe[MAX_NUM_PACKETS]; //blank buffer for a whole frame with missing packets - int blankoffset; //blank buffer offset - - bool fullframe[numberofListeningThreads]; //if full frame processed for each listening thread - volatile uint32_t threadFrameNumber[numberofListeningThreads]; //thread frame number for each listening thread buffer popped out - volatile uint32_t presentFrameNumber; //the current frame number aiming to be built - volatile uint32_t lastPacketNumber[numberofListeningThreads]; //last packet number got - volatile uint32_t currentPacketNumber[numberofListeningThreads];//current packet number - volatile int numberofMissingPackets[numberofListeningThreads]; // number of missing packets in this buffer - - for(int i=0; iisEmpty()){ - fifoTempFree[i]->pop(temp); -#ifdef EVERYFIFODEBUG - if(fifoTempFree[i]->getSemValue()>((packetsPerFrame/numberofListeningThreads)-3)) - cprintf(RED,"FifoTempFree[%d]: value:%d, pop 0x%x\n",i,fifoTempFree[i]->getSemValue(),(void*)(temp)); -#endif - } - delete fifoTempFree[i]; - fifoTempFree[i] = NULL; - } - fifoTempFree[i] = new CircularFifo(MAX_NUM_PACKETS); - } - - for(uint32_t i=0; imissingPacket) = missingPacketValue; - *( (uint16_t*) blankframe_footer->packetNumber) = i+1; - - //set each value inside blank frame to 0xff - for(int j=0;j<(oneDataSize);++j){ - unsigned char* blankframe_data = (unsigned char*)blankframe[i] + sizeof(eiger_packet_header_t) + j; - *(blankframe_data) = 0xFF; - } - } - //last frame read out - lastFrameIndex = -1; - - - - - /* inner loop - loop for each buffer */ - //until mask unset (udp sockets shut down by client) - while((1 << ithread) & writerThreadsMask){ - - - //pop fifo and if end of acquisition - //cprintf(BLUE,"popready[0]:%d popready[1]:%d\n",popReady[0],popReady[1]); - if(popAndCheckEndofAcquisition(ithread, packetBuffer, popReady, numPackets,fifoTempFree)){ -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread All dummy-end buffers popped\n"); -#endif - //finish missing packets - if(((frameBufferoffset[0]!=0) || (frameBufferoffset[1]!=((int)packetsPerFrame/numberofListeningThreads)))); - else{ - stopWriting(ithread,packetBuffer); - continue; - } - } -#ifdef DEBUG4 - else{cprintf(BLUE,"POPped but i see?\n");} -#endif - - //get a full frame------------------------------------------------------------------------------------------------------- - for(int i=0;ipacketNumber); -#ifdef DEBUG4 - cprintf(MAGENTA,"Fifo %d: threadframenumber original:%d currentpacketnumber real:%d\n", - i,threadFrameNumber[i],currentPacketNumber[i]); -#endif - } - - //calculate number of missing packets----------------------------------------------------- - numberofMissingPackets[i] = 0; - if((numPackets[i] == dummyPacketValue) || (threadFrameNumber[i] != presentFrameNumber)) - numberofMissingPackets[i] = (LAST_PACKET_VALUE - lastPacketNumber[i]); - else - numberofMissingPackets[i] = (currentPacketNumber[i] - lastPacketNumber[i] - 1); - numMissingPackets += numberofMissingPackets[i]; - -#ifdef DEBUG4 - if(numPackets[i] == dummyPacketValue) - cprintf(GREEN, "Fifo %d: Calc missing packets (Dummy): Adding missing packets %d to the last frame\n", - i, numberofMissingPackets[i]); - else{ - cprintf(GREEN,"Fifo %d: Calc missing packets: fnum %d, fnum_thread %d, " - "pnum %d, last_pnum %d, pnum_offset %d missing_packets %d\n", - i,presentFrameNumber,threadFrameNumber[i], - currentPacketNumber[i],lastPacketNumber[i],frameBufferoffset[i],numberofMissingPackets[i]); - } -#endif - - - //add missing packets--------------------------------------------------------------------- - for(int j=0;jmissingPacket)!= missingPacketValue){ - eiger_packet_header_t* blankframe_header = (eiger_packet_header_t*) blankframe[blankoffset]; - cprintf(BG_RED, "Fifo %d: Add Missing Packet Error: " - "pnum_offset %d, pnum %d, fnum_thread %d, missingpacket_buffer 0x%x, missingpacket_blank 0x%x\n", - i,frameBufferoffset[i],currentPacketNumber[i],threadFrameNumber[i], - *( (uint16_t*) frameBuffer_header->missingPacket), - *( (uint16_t*) blankframe_header->missingPacket)); - exit(-1); - }else{ -#ifdef DEBUG4 - cprintf(RED, "Fifo %d: Add Missing Packet success: " - "pnum_offset %d, pnum_got %d, fnum_thread %d, missingpacket_buffer 0x%x\n", - i,frameBufferoffset[i],currentPacketNumber[i],threadFrameNumber[i], - *( (uint16_t*) frameBuffer_header->missingPacket)); -#endif - frameBufferoffset[i]=frameBufferoffset[i]+1; - } - } - - //missed packets/future packet: do not pop over and determine fullframe-------------------- - popReady[i] = false; - if((numPackets[i] == dummyPacketValue) ||(threadFrameNumber[i] != presentFrameNumber)) - fullframe[i] = true; - else - fullframe[i] = false; - if(threadFrameNumber[i] != presentFrameNumber) - threadFrameNumber[i] = presentFrameNumber; - - - //add current packet-------------------------------------------------------------- - if(fullframe[i] == false){ - if(currentPacketNumber[i] != (uint32_t)(frameBufferoffset[i]-(i*packetsPerFrame/numberofListeningThreads))+1){ - cprintf(BG_RED, "Fifo %d: Correct Packet Offset Error: " - "pnum_offset %d,pnum %d fnum_thread %d\n", - i,frameBufferoffset[i],currentPacketNumber[i],threadFrameNumber[i]); - exit(-1); - } - - - while(!fifoTempFree[i]->push(packetBuffer[i])); -#ifdef EVERYFIFODEBUG - if(fifoTempFree[i]->getSemValue()>((packetsPerFrame/numberofListeningThreads)-3)) - cprintf(YELLOW,"FifoTempfree[%d]: value:%d, push 0x%x\n",i,fifoTempFree[i]->getSemValue(),(void*)(wbuffer[i])); -#endif - - - - //cprintf(RED,"Current Packet frameBufferoffset[i]:%d\n",frameBufferoffset[i]); - frameBuffer[frameBufferoffset[i]] = (packetBuffer[i] + HEADER_SIZE_NUM_TOT_PACKETS); -#ifdef DEBUG4 - eiger_packet_header_t* frameBuffer_header = (eiger_packet_header_t*) frameBuffer[frameBufferoffset[i]]; - eiger_packet_footer_t* frameBuffer_footer = (eiger_packet_footer_t*) (frameBuffer[frameBufferoffset[i]] + footerOffset); - cprintf(GREEN, "Fifo %d: Current Packet added success:" - "pnum_offset %d, pnum %d, real pnum %d fnum_thread %d, missingpacket_buffer 0x%x\n", - i,frameBufferoffset[i],currentPacketNumber[i],*( (uint16_t*) frameBuffer_footer->packetNumber),threadFrameNumber[i], - *( (uint16_t*) frameBuffer_header->missingPacket)); -#endif - frameBufferoffset[i]=frameBufferoffset[i]+1; - //update last packet - lastPacketNumber[i] = currentPacketNumber[i]; - popReady[i] = true; - fullframe[i] = false; - if(currentPacketNumber[i] == LAST_PACKET_VALUE){ -#ifdef DEBUG4 - cprintf(GREEN, "Fifo %d: Got last packet\n",i); -#endif - popReady[i] = false; - fullframe[i] = true; - } //end of last packet - }//end of add current packet - }//end of if(!fullframe) - }//end of for listening threads - - - //full frame - if(fullframe[0] && fullframe[1]){ - currentFrameNumber = presentFrameNumber; - numTotMissingPacketsInFile += numMissingPackets; - numTotMissingPackets += numMissingPackets;/**requires a lock*/ - -/* - cprintf(CYAN,"**framenum:%lld\n ",(long long int)currentFrameNumber); - if(currentFrameNumber>500){ - cprintf(BG_RED,"too high frame number %lld \n",(long long int)currentFrameNumber ); - exit(-1); - } - for(int i=0;ipacketNumber), (void*)(packetBuffer[i])); - }*/ -#ifdef DEBUG4 - cprintf(BLUE," nummissingpackets:%d\n",numMissingPackets); -#endif -#ifdef FNUM_DEBUG - cprintf(GREEN,"**fnum:%lld**\n",(long long int)currentFrameNumber); -#endif -#ifdef MISSINGP_DEBUG - if(numMissingPackets){ - cprintf(RED, "Total missing packets %d for fnum %d\n",numMissingPackets,currentFrameNumber); - for (int j=0;jmissingPacket)==missingPacketValue) - cprintf(RED,"Found missing packet at pnum %d\n",j); - } - } -#endif - - //write and copy to gui - handleWithoutDataCompression(ithread,frameBuffer,packetsPerFrame); - - //reset a few stuff - presentFrameNumber++; - for(int i=0; iisEmpty()){ - fifoTempFree[i]->pop(temp); -#ifdef EVERYFIFODEBUG - if(fifoTempFree[i]->getSemValue()>((packetsPerFrame/numberofListeningThreads)-3)) - cprintf(GRAY,"FifoTempFree[%d]: value:%d, pop 0x%x\n",i,fifoTempFree[i]->getSemValue(),(void*)(temp)); -#endif - while(!fifoFree[i]->push(temp)); -#ifdef EVERYFIFODEBUG - if(fifoFree[i]->getSemValue()<100) - cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",i,fifoFree[i]->getSemValue(),(void*)(temp)); -#endif -#ifdef CFIFODEBUG - if(i==0) - cprintf(CYAN,"Fifo %d: Writing_Thread freed: pushed into fifofree %p\n",i, (void*)(temp)); - else - cprintf(YELLOW,"Fifo %d: Writing_Thread freed: pushed into fifofree %p\n",i, (void*)(temp)); -#endif - } - } -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread: finished freeing\n"); -#endif - - - }//end of full frame - - }/*--end of loop for each buffer (inner loop)*/ - - waitWritingBufferForNextAcquisition(ithread); - - }/*--end of loop for each acquisition (outer loop) */ -} - - - - void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread){ FILE_LOG(logDEBUG) << __AT__ << " called"; @@ -2451,24 +2090,20 @@ void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread) //change the detector index in the file names if(myDetectorType == EIGER){ int detindex = -1; - string tempname(fileName[ithread]); - cout<<"tempname:"<* fifoTempFree[]){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - bool endofAcquisition = true; - for(int i=0; ipop(wbuffer[i]); -#ifdef EVERYFIFODEBUG - if(fifo[i]->getSemValue()>(fifoSize-100)) - cprintf(CYAN,"Fifo[%d]: value:%d, pop 0x%x\n",i,fifo[i]->getSemValue(),(void*)(wbuffer[i])); -#endif -#ifdef CFIFODEBUG - if(i == 0) - cprintf(CYAN,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuffer[i]),i); - else - cprintf(YELLOW,"Writing_Thread %d: Popped %p from FIFO %d\n", ithread, (void*)(wbuffer[i]),i); -#endif - nP[i] = (uint32_t)(*((uint32_t*)wbuffer[i])); -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread %d: Number of Packets: %d for FIFO %d\n", ithread, nP[i], i); -#endif - //dummy-end buffer - if(nP[i] == dummyPacketValue){ - ready[i] = false; -#ifdef DEBUG3 - cprintf(GREEN,"Writing_Thread %d: Dummy frame popped out of FIFO %d",ithread, i); -#endif - } - //normal buffer popped out - else{ - endofAcquisition = false; -#ifdef DEBUG4 - if(myDetectorType == EIGER){ - eiger_packet_footer_t* wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS); - //cprintf(BLUE,"footer value:0x%x\n",i,(uint64_t)(*( (uint64_t*) wbuf_footer))); - //if(*( (uint16_t*) wbuf_footer->packetNumber) == 1){ - cprintf(BLUE,"Fnum[%d]:%d\n",i,(uint32_t)(*( (uint64_t*) wbuf_footer))); - cprintf(BLUE,"Pnum[%d]:%d\n",i,*( (uint16_t*) wbuf_footer->packetNumber)); - //} - } -#endif - } - } - //when both are not popped but curretn frame number is being processed - else{ - if(nP[i] != dummyPacketValue) - endofAcquisition = false; - } - } - - return endofAcquisition; -} - - void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){ FILE_LOG(logDEBUG) << __AT__ << " called"; @@ -2626,294 +2206,200 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){ //statistics FILE_LOG(logINFO) << "Status: Run Finished"; FILE_LOG(logINFO) << "Last Frame Number Caught:" << lastFrameIndex[ithread]; - if(totalPacketsCaught < ((uint64_t)numberOfFrames*packetsPerFrame)){ - cprintf(RED, "Total Missing Packets padded: %d\n",numTotMissingPackets); + if(totalPacketsCaught < ((uint64_t)numberOfFrames*packetsPerFrame*numberofListeningThreads)){ + cprintf(RED, "Total Missing Packets: %lld\n",(long long int)numberOfFrames*packetsPerFrame*numberofListeningThreads-totalPacketsCaught); cprintf(RED, "Total Packets Caught: %lld\n",(long long int)totalPacketsCaught); - cprintf(RED, "Total Frames Caught: %lld\n",(long long int)(totalPacketsCaught/packetsPerFrame)); + cprintf(RED, "Total Frames Caught: %lld\n",(long long int)(totalPacketsCaught/(packetsPerFrame*numberofListeningThreads))); }else{ - cprintf(GREEN, "Total Missing Packets padded: %d\n",numTotMissingPackets); - cprintf(GREEN, "Total Packets Caught:%lld\n", (long long int)totalPacketsCaught); - cprintf(GREEN, "Total Frames Caught:%lld\n",(long long int)(totalPacketsCaught/packetsPerFrame)); + cprintf(GREEN, "Total Missing Packets: %lld\n",(long long int)numberOfFrames*packetsPerFrame*numberofListeningThreads-totalPacketsCaught); + cprintf(GREEN, "Total Packets Caught: %lld\n",(long long int)totalPacketsCaught); + cprintf(GREEN, "Total Frames Caught: %lld\n",(long long int)(totalPacketsCaught/(packetsPerFrame*numberofListeningThreads))); } //acquisition end if (acquisitionFinishedCallBack) - acquisitionFinishedCallBack((int)(totalPacketsCaught/packetsPerFrame), pAcquisitionFinished); + acquisitionFinishedCallBack((int)totalPacketsCaught, pAcquisitionFinished); } } -void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer,uint32_t npackets){ +void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer, uint32_t npackets){ FILE_LOG(logDEBUG) << __AT__ << " called"; - //get frame number (eiger already gets it when it does packet to packet processing) - if(myDetectorType != EIGER){ - if(myDetectorType == JUNGFRAU){ - jfrau_packet_header_t* header = (jfrau_packet_header_t*)(wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS); - currentFrameNumber = (*( (uint32_t*) header->frameNumber))&0xffffff; - }else{ - uint64_t tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS)))); - //for gotthard and normal frame, increment frame number to separate fnum and pnum - if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1)) - tempframenumber++; - //get frame number - currentFrameNumber = (tempframenumber & frameIndexMask) >> frameIndexOffset; - } - //set indices - acquisitionIndex = currentFrameNumber - startAcquisitionIndex; - frameIndex = currentFrameNumber - startFrameIndex; + //get current frame number + uint64_t tempframenumber; + if(getFrameNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS,tempframenumber) == FAIL){ + //error in frame number sent by fpga + while(!fifoFree[ithread]->push(wbuffer)); + + return; } + //update current frame number + lastFrameIndex[ithread] = tempframenumber; + if(myDetectorType == EIGER) + currentFrameNumber[ithread] = tempframenumber + (startFrameIndex - 1); + else + currentFrameNumber[ithread] = tempframenumber-startFrameIndex; - + //set indices + pthread_mutex_lock(&progressMutex); + if((currentFrameNumber[ithread] - startAcquisitionIndex) > acquisitionIndex) + acquisitionIndex = currentFrameNumber[ithread] - startAcquisitionIndex; + if((currentFrameNumber[ithread] - startFrameIndex) > frameIndex[ithread]) + frameIndex[ithread] = currentFrameNumber[ithread] - startFrameIndex; + pthread_mutex_unlock(&progressMutex); //callback to write data - if (cbAction < DO_EVERYTHING){ - switch(myDetectorType){ - case EIGER: - for(uint32_t i=0;i 0) - writeFileWithoutCompression(wbuffer, npackets); + writeFileWithoutCompression(ithread, wbuffer, npackets); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: Writing done\nGoing to copy frame\n"); #endif //copy frame for gui - if(npackets >= packetsPerFrame) - copyFrameToGui(wbuffer); + if(npackets >= packetsPerFrame)/**needs to be reworked*/ + copyFrameToGui(ithread, wbuffer); #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: Copied frame\n"); #endif - //free fifo addresses (eiger frees for each packet later) - if(myDetectorType != EIGER){ - while(!fifoFree[0]->push(wbuffer[0])); + //free fifo addresses + int listenfifoThread = ithread; + if(dataCompressionEnable) + listenfifoThread = 0; + while(!fifoFree[listenfifoThread]->push(wbuffer)); #ifdef EVERYFIFODEBUG - if(fifoFree[0]->getSemValue()<100) - cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",0,fifoFree[0]->getSemValue(),(void*)(wbuffer[0])); + if(fifoFree[listenfifoThread]->getSemValue()<100) + cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",listenfifoThread,fifoFree[listenfifoThread]->getSemValue(),(void*)(wbuffer)); #endif #ifdef DEBUG5 - cprintf(GREEN,"Writing_Thread %d: Freed buffer, pushed into fifofree %p for listener 0\n",ithread, (void*)(wbuffer[0])); + cprintf(GREEN,"Writing_Thread %d: Freed buffer, pushed into fifofree %p for listener %d \n",listenfifoThread, (void*)(wbuffer), listenfifoThread); #endif - } + } -void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* wbuffer[],uint32_t numpackets){ +void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* wbuffer,uint32_t numpackets){ FILE_LOG(logDEBUG) << __AT__ << " called"; - - //create headers for eiger -#ifdef WRITE_HEADERS - if (myDetectorType == EIGER && cbAction == DO_EVERYTHING) - createHeaders(wbuffer); -#endif - //if write enabled - if((fileWriteEnable) && (sfilefd)){ - int offset = HEADER_SIZE_NUM_TOT_PACKETS; //offset (not eiger) to keep track of how many packets saved - uint32_t packetsToSave; //how many packets to save at a time - volatile uint64_t tempframenumber; - int lastpacket; + if((fileWriteEnable) && (sfilefd[ithread])){ +cout< 0){ - //new file - if(packetsInFile >= (uint32_t)maxPacketsPerFile){ - //for packet loss, because currframenum is the latest one for eiger - //get frame number (eiger already gets it when it does packet to packet processing) - if(myDetectorType != EIGER){ - lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); - if(myDetectorType == JUNGFRAU){ - jfrau_packet_header_t* header = (jfrau_packet_header_t*) (wbuffer[0] + lastpacket); - currentFrameNumber = (*( (uint32_t*) header->frameNumber))&0xffffff; - }else{ - tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[0] + lastpacket)))); - //for gotthard and normal frame, increment frame number to separate fnum and pnum - if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1)) - tempframenumber++; - //get frame number - currentFrameNumber = (tempframenumber & frameIndexMask) >> frameIndexOffset; - } - - //set indices - acquisitionIndex = currentFrameNumber - startAcquisitionIndex; - frameIndex = currentFrameNumber - startFrameIndex; + //handle half frames from previous buffer + //second part to not check when there has been something written previously + if(numpackets &&(lastFrameNumberInFile[ithread])){ + //get start frame (required to create new file at the right juncture) + uint64_t startframe =-1; + if(getFrameNumber(ithread, wbuffer + offset, startframe) == FAIL){ + //error in frame number sent by fpga + while(!fifoFree[ithread]->push(wbuffer)); + return; } -#ifdef DEBUG3 - cprintf(GREEN,"Writing_Thread: Current Frame Number:%d\n",currentFrameNumber); -#endif - createNewFile(ithread); - } - //to create new file when max reached - packetsToSave = maxPacketsPerFile - packetsInFile; - if(packetsToSave > numpackets) - packetsToSave = numpackets; + cout<<"222"<push(wbuffer)); + return; + } + totalPacketsInFile[ithread] += numpackets; + lastFrameNumberInFile[ithread] = finalLastFrameNumberToSave+1; + currentFrameNumber[ithread] = finalLastFrameNumberToSave; + } + if(numberofWriterThreads > 1) pthread_mutex_lock(&writeMutex); - packetsInFile += numpackets; - packetsCaughtPerThread[ithread] += (numpackets - numMissingPackets); - pthread_mutex_lock(&progressMutex); - packetsCaught += (numpackets - numMissingPackets); - pthread_mutex_unlock(&progressMutex); - totalPacketsCaught += (numpackets - numMissingPackets); - numMissingPackets = 0; + packetsCaught += numpackets; + totalPacketsCaught += numpackets; if(numberofWriterThreads > 1) pthread_mutex_unlock(&writeMutex); } + //set indices + pthread_mutex_lock(&progressMutex); + if((currentFrameNumber[ithread] - startAcquisitionIndex) > acquisitionIndex) + acquisitionIndex = currentFrameNumber[ithread] - startAcquisitionIndex; + if((currentFrameNumber[ithread] - startFrameIndex) > frameIndex[ithread]) + frameIndex[ithread] = currentFrameNumber[ithread] - startFrameIndex; + pthread_mutex_unlock(&progressMutex); } -void UDPStandardImplementation::createHeaders(char* wbuffer[]){ - - - int port = 0, missingPacket; - bool exitVal = 0; - eiger_packet_header_t* wbuf_header; - eiger_packet_footer_t* wbuf_footer; - - for (uint32_t i = 0; i < packetsPerFrame; i++){ - - - wbuf_header = (eiger_packet_header_t*) wbuffer[i]; - wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footerOffset); -#ifdef DEBUG4 - cprintf(GREEN, "Loop index:%d Pnum:%d real fnum %d,missingPacket 0x%x\n", - i, - *( (uint16_t*) wbuf_footer->packetNumber), - (uint32_t)(*( (uint64_t*) wbuf_footer)), - *( (uint16_t*) wbuf_header->missingPacket) - ); cout <missingPacket)== missingPacketValue){ -#ifdef DEBUG4 - cprintf(RED,"-Missing packet at Loop Index %d\n", i); -#endif - missingPacket = 1; - - //DEBUGGING - if(*( (uint16_t*) wbuf_footer->packetNumber) != (i+1)){ - cprintf(BG_RED, "Writing_Thread: Packet Number Mismatch (missing p)! " - "i %d, real pnum %d, real fnum %d, missingPacket 0x%x\n", - i, - *( (uint16_t*) wbuf_footer->packetNumber), - (uint32_t)(*( (uint64_t*) wbuf_footer)), - *( (uint16_t*) wbuf_header->missingPacket)); - exitVal =1; - } - - //add frame number - *( (uint64_t*) wbuf_footer) = (currentFrameNumber+1) | (((uint64_t)(*( (uint16_t*) wbuf_footer->packetNumber)))<<0x30); - //*( (uint16_t*) wbuf_footer->packetNumber) = (i+1); // missing frames already have the right packet number -#ifdef DEBUG4 - cprintf(RED, "Missing Packet Loop index:%d fnum:%d Pnum:%d\n",i, - (uint32_t)(*( (uint64_t*) wbuf_footer)), - *( (uint16_t*) wbuf_footer->packetNumber)); -#endif - } - //normal packet - else{ - missingPacket = 0; - - //DEBUGGING - if(*( (uint16_t*) wbuf_footer->packetNumber) != ( (i>((packetsPerFrame/2)-1)?(i-(packetsPerFrame/2)+1):i+1) )){ - cprintf(BG_RED, "Writing_Thread: Packet Number Mismatch! " - "i %d, real pnum %d, real fnum %d, missingPacket 0x%x\n", - i, - *( (uint16_t*) wbuf_footer->packetNumber), - (uint32_t)(*( (uint64_t*) wbuf_footer)), - *( (uint16_t*) wbuf_header->missingPacket)); - exitVal =1; - } - - uint16_t p = *( (uint16_t*) wbuf_footer->packetNumber); - //correct the packet numbers of port2 so that port1 and 2 are not the same - if(port) *( (uint16_t*) wbuf_footer->packetNumber) = (p +(packetsPerFrame/2)); - - } - - //overwriting port number and dynamic range - *( (uint8_t*) wbuf_header->portIndex) = (uint8_t)port; - //*( (uint8_t*) wbuf_header->dynamicRange) = (uint8_t)dynamicRange; - - //DEBUGGING - if(*( (uint16_t*) wbuf_footer->packetNumber) != (i+1)){ - cprintf(BG_RED, "Writing_Thread: Packet Number Mismatch! " - "i %d, real pnum %d, real fnum %d, missingPacket 0x%x\n", - i, - *( (uint16_t*) wbuf_footer->packetNumber), - (uint32_t)(*( (uint64_t*) wbuf_footer)), - *( (uint16_t*) wbuf_header->missingPacket)); - exitVal =1; - } - } - - if(exitVal){exit(-1);} - -} - void UDPStandardImplementation::updateFileHeader(int ithread){ int xpix=-1,ypix=-1; @@ -2960,23 +2446,23 @@ void UDPStandardImplementation::updateFileHeader(int ithread){ } -void UDPStandardImplementation::copyFrameToGui(char* buffer[]){ +void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer){ FILE_LOG(logDEBUG) << __AT__ << " called"; //random read (gui not ready) //need to toggle guiDataReady or the second frame wont be copied - if((!FrameToGuiFrequency) && (!guiData)){ + if((!FrameToGuiFrequency) && (!guiData[ithread])){ #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: CopyingFrame: Resetting guiDataReady\n"); #endif pthread_mutex_lock(&dataReadyMutex); - guiDataReady=0; + guiDataReady[ithread]=0; pthread_mutex_unlock(&dataReadyMutex); } //if nthe frame, wait for your turn (1st frame always shown as its zero) - else if(FrameToGuiFrequency && ((frametoGuiCounter)%FrameToGuiFrequency)); + else if(FrameToGuiFrequency && ((frametoGuiCounter[ithread])%FrameToGuiFrequency)); //random read (gui ready) or nth frame read: gui needs data now or it is the first frame else{ @@ -2984,22 +2470,15 @@ void UDPStandardImplementation::copyFrameToGui(char* buffer[]){ cprintf(GREEN,"Writing_Thread: CopyingFrame: Gui needs data now OR 1st frame\n"); #endif pthread_mutex_lock(&dataReadyMutex); - guiDataReady=0; + guiDataReady[ithread]=0; #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: CopyingFrame: guidataready is 0, Copying data\n"); #endif - switch(myDetectorType){ - case EIGER: - for(uint32_t i=0; i> frameIndexOffset; - //handle multi threads - pthread_mutex_lock(&progressMutex); - if(tempframenumber > currentFrameNumber) - currentFrameNumber = tempframenumber; - pthread_mutex_unlock(&progressMutex); - //set indices - acquisitionIndex = currentFrameNumber - startAcquisitionIndex; - frameIndex = currentFrameNumber - startFrameIndex; + uint64_t tempframenumber=-1; + if(getFrameNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, tempframenumber) == FAIL){ + //error in frame number sent by fpga + while(!fifoFree[ithread]->push(wbuffer)); + return; + } + currentFrameNumber[ithread] = tempframenumber; + //set indices + pthread_mutex_lock(&progressMutex); + if((currentFrameNumber[ithread] - startAcquisitionIndex) > acquisitionIndex) + acquisitionIndex = currentFrameNumber[ithread] - startAcquisitionIndex; + if((currentFrameNumber[ithread] - startFrameIndex) > frameIndex[ithread]) + frameIndex[ithread] = currentFrameNumber[ithread] - startFrameIndex; + pthread_mutex_unlock(&progressMutex); //variable definitions char* buff[2]={0,0}; //an array just to be compatible with copyframetogui - char* data = wbuffer[0]+ HEADER_SIZE_NUM_TOT_PACKETS; //data pointer to the next memory to be analysed + char* data = wbuffer+ HEADER_SIZE_NUM_TOT_PACKETS; //data pointer to the next memory to be analysed int ndata; //size of data returned uint32_t np; //remaining number of packets returned - uint32_t npackets = (uint32_t)(*((uint32_t*)wbuffer[0])); //number of total packets + uint32_t npackets = (uint32_t)(*((uint32_t*)wbuffer)); //number of total packets int remainingsize = npackets * onePacketSize; //size of the memory slot to be analyzed eventType thisEvent = PEDESTAL; @@ -3120,8 +2599,8 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer //cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl; #else pthread_mutex_lock(&writeMutex); - if((fileWriteEnable) && (sfilefd)) - singlePhotonDetectorObject[ithread]->writeCluster(sfilefd); + if((fileWriteEnable) && (sfilefd[0])) + singlePhotonDetectorObject[ithread]->writeCluster(sfilefd[0]); pthread_mutex_unlock(&writeMutex); #endif } @@ -3129,38 +2608,229 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer } nf++; + + #ifndef ALLFILE - pthread_mutex_lock(&progressMutex); - packetsInFile += packetsPerFrame; - packetsCaughtPerThread[0] += packetsPerFrame; - packetsCaught += packetsPerFrame; - totalPacketsCaught += packetsPerFrame; - if(packetsInFile >= (uint32_t)maxPacketsPerFile) - createNewFile(0); - pthread_mutex_unlock(&progressMutex); + totalPacketsInFile[ithread] += (bufferSize/packetsPerFrame); + pthread_mutex_lock(&writeMutex); + if((packetsCaught%packetsPerFrame) >= (uint32_t)maxFramesPerFile) + createNewFile(ithread); + packetsCaught += (bufferSize/packetsPerFrame); + totalPacketsCaught += (bufferSize/packetsPerFrame); + pthread_mutex_unlock(&writeMutex); + #endif if(!once){ - copyFrameToGui(buff); + copyFrameToGui(ithread, buff[0]); once = 1; } } remainingsize -= ((buff[0] + ndata) - data); data = buff[0] + ndata; - if(data > (wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) ) + if(data > (wbuffer + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) ) cprintf(BG_RED,"Writing_Thread %d: Error: Compression data goes out of bounds!\n", ithread); } - while(!fifoFree[0]->push(wbuffer[0])); + while(!fifoFree[0]->push(wbuffer)); #ifdef EVERYFIFODEBUG if(fifoFree[0]->getSemValue()<100) - cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",0,fifoFree[0]->getSemValue(),(void*)(wbuffer[0])); + cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",0,fifoFree[0]->getSemValue(),(void*)(wbuffer)); #endif #ifdef DEBUG5 - cprintf(GREEN,"Writing_Thread %d: Compression free pushed into fifofree %p for listerner 0\n", ithread, (void*)(wbuffer[0])); + cprintf(GREEN,"Writing_Thread %d: Compression free pushed into fifofree %p for listerner 0\n", ithread, (void*)(wbuffer)); #endif } + +int UDPStandardImplementation::getFrameNumber(int ithread, char* wbuffer, uint64_t &tempframenumber){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + eiger_packet_footer_t* footer=0; + jfrau_packet_header_t* header=0; + int pnum=-1; + + switch(myDetectorType){ + + case EIGER: + footer = (eiger_packet_footer_t*)(wbuffer + footerOffset); + tempframenumber = (uint32_t)(*( (uint64_t*) footer)); + //error in frame number sent by fpga + if(!((uint32_t)(*( (uint64_t*) footer)))){ + tempframenumber = -1; + FILE_LOG(logERROR) << "Fifo "<< ithread << ": Frame Number is zero from firmware."; + return FAIL; + } +//#ifdef DEBUG4 + if(!ithread) cprintf(GREEN,"Writing_Thread %d: fnum:%lld pnum:%d FPGA_fnum:%d footeroffset:%d\n", + ithread, + (long long int)tempframenumber, + (*( (uint16_t*) footer->packetNumber)), + (uint32_t)(*( (uint64_t*) footer)), + footerOffset); +//#endif + break; + + case JUNGFRAU: + header = (jfrau_packet_header_t*)(wbuffer); + tempframenumber = (*( (uint32_t*) header->frameNumber))&frameIndexMask; +#ifdef DEBUG4 + cprintf(GREEN, "Writing_Thread %d: fnum:%lld\t pnum:%d\n", + (long long int)tempframenumber, + (*( (uint8_t*) header->packetNumber))); +#endif + break; + + default: + tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer)))); + //for gotthard and normal frame, increment frame number to separate fnum and pnum + if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1)) + tempframenumber++; + pnum = tempframenumber&packetIndexMask; + tempframenumber = (tempframenumber & frameIndexMask) >> frameIndexOffset; +#ifdef DEBUG4 + cprintf(GREEN, "Writing_Thread %d: fnum:%lld\t pnum:%d\n", + (long long int)tempframenumber, + pnum); +#endif + + break; + } + return OK; +} + + + + + +int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer, int &offset, uint64_t nextFrameNumber, uint32_t numpackets, int &numPacketsWritten){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + + bool expectedoffsetATlastpacket = false; + int startoffset = offset; + if(!ithread) cout<= endoffset){ + expectedoffset = startoffset + ((numpackets -1) * onePacketSize); + expectedoffsetATlastpacket = true; + } + offset = expectedoffset; + if(!ithread) cout<push(wbuffer)); + return FAIL; + } + if(!ithread) cout<=nextFrameNumber){ + if(!ithread) cout<=nextFrameNumber){ + offset -= (onePacketSize*packetsPerFrame);/** its ok..if jonbsperthread is 1, go packet by packet*/ + if(!ithread) cout<push(wbuffer)); + return FAIL; + } + if(!ithread) cout<push(wbuffer)); + return FAIL; + } + if(!ithread) cout<endoffset){if(!ithread) cout<push(wbuffer)); + return FAIL; + } + if(!ithread) cout<endoffset){ + offset = endoffset; + if(!ithread) cout< end offset so offset now:"<nextFrameNumber){ + offset -= onePacketSize; + if(!ithread) cout<push(wbuffer)); + return FAIL; + } + if(!ithread) cout<getTotalFramesCaught(); + cout<<"frames caught sent:"<differentClients){ FILE_LOG(logDEBUG) << "Force update"; @@ -2487,9 +2488,9 @@ int slsReceiverTCPIPInterface::set_dynamic_range() { dynamicrange = retval; if(myDetectorType == EIGER){ if(!tenGigaEnable) - packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicrange * EIGER_MAX_PORTS; + packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicrange; else - packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicrange * EIGER_MAX_PORTS; + packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicrange; }else if (myDetectorType == JUNGFRAU) packetsPerFrame = JFRAU_PACKETS_PER_FRAME; }