From 169a91b6aceb7b952815e08d73d5d63408d8a602 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Fri, 16 Oct 2015 10:47:21 +0200 Subject: [PATCH] some --- .../include/UDPStandardImplementation.h | 80 +- .../src/UDPStandardImplementation.cpp | 817 ++++++++---------- 2 files changed, 405 insertions(+), 492 deletions(-) diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 0c1d4ac43..6b08fab2b 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -317,15 +317,6 @@ private: */ void startListening(); - /** - * Thread started which writes packets to file. - * It pops the fifo, processes and writes packets to file and pushes the addresses into the fifoFree - * This is continuously looped for each buffer in a nested loop, which is again looped for each acquisition - * Exits only for changing dynamic range, 10G parameters etc and recreated - * - */ - void startWriting(); - /** * Called by startListening * Listens to buffer, until packet(s) received or shutdownUDPsocket called by client @@ -368,6 +359,16 @@ private: */ uint32_t processListeningBuffer(int ithread, int cSize,char* temp); + /** + * Thread started which writes packets to file. + * It calls popAndCheckEndofAcquisition to pop fifo and check if it is a dummy end buffer + * It then calls a function to process and write packets to file and pushes the addresses into the fifoFree + * This is continuously looped for each buffer in a nested loop, which is again looped for each acquisition + * Exits only for changing dynamic range, 10G parameters etc and recreated + * + */ + void startWriting(); + /** * Called by StartWriting * Pops buffer from all the FIFOs and checks for dummy frames and end of acquisition @@ -392,9 +393,40 @@ private: */ void stopWriting(int ithread, char* wbuffer[]); - void processWritingBuffer(int ithread, char* wbuffer[], uint32_t nP[]); - void processWritingBufferPacketByPacket(); + /** + * Called by startWriting or processWritingBufferPacketByPacket upon reading a frame (for eiger) + * Updates parameters, (writes headers for eiger) and writes to file when not a dummy frame + * Copies data for gui display and frees addresses popped from FIFOs + * @param ithread writing thread index + * @param wbuffer writing buffer popped out from FIFO + * @param npackets number of packets + */ + void handleWithoutDataCompression(int ithread, char* wbuffer[],int npackets); + /** + * Calle by handleWithoutDataCompression + * Creating headers Writing to file without compression + * @param wbuffer is the address of buffer popped out of FIFO + * @param numpackets is the number of packets + */ + void writeFileWithoutCompression(char* wbuffer[],int numpackets); + + /** + * Called by writeToFileWithoutCompression() + * Create headers for file writing (at the moment, this is eiger specific) + * @param wbuffer writing buffer popped from FIFOs + */ + void createHeaders(char* wbuffer[]); + + /** + * Called by handleWithoutDataCompression and handleWithCompression after writing to file + * Copy frames for GUI and updates appropriate parameters for frequency frames to gui + * Uses semaphore for nth frame mode + * @param buffer buffer to copy + */ + void copyFrameToGui(char* buffer[]); + + void processWritingBufferPacketByPacket(); /************************************************************************* * Class Members ********************************************************* @@ -674,11 +706,7 @@ private: private: - /** - * Copy frames to gui - * uses semaphore for nth frame mode - */ - void copyFrameToGui(char* startbuf[], char* buf=NULL); + /** * Creates new tree and file for compression @@ -688,26 +716,6 @@ private: */ int createCompressionFile(int ithr, int iframe); - /** - * Writing to file without compression - * @param buf is the address of buffer popped out of fifo - * @param numpackets is the number of packets - * @param framenum current frame number - */ - void writeToFile_withoutCompression(char* buf[],int numpackets, uint32_t framenum); - - - - /** - * updates parameters and writes to file when not a dummy frame - * Also calls writeToFile_withoutCompression or handleDataCompression - * Called by startWriting() - * @param ithread writing thread number - * @param wbuffer writer buffer - * @param npackets number of packets - */ - void handleWithoutDataCompression(int ithread, char* wbuffer[],int npackets); - /** * data compression for each fifo output * @param ithread writing thread number diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 1d6a1eb23..f01997cf8 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -775,8 +775,11 @@ int UDPStandardImplementation::startReceiver(char *c){ measurementStarted = false; startFrameIndex = 0; frameIndex = 0; - if(!acqStarted) + if(!acqStarted){ currentFrameNumber = 0; //has to be zero to add to startframeindex for each scan + acquisitionIndex = 0; + frameIndex = 0; + } for(int i = 0; i < numberofListeningThreads; ++i) totalListeningFrameCount[i] = 0; packetsCaught = 0; @@ -983,7 +986,7 @@ void UDPStandardImplementation::readFrame(char* c,char** raw, uint64_t &startAcq cprintf(CYAN,"Info: gonna post\n"); #endif //release after getting data - sem_post(&smp); + sem_post(&writerGuiSemaphore); } #ifdef DEBUG4 cprintf(CYAN,"Info: done post\n"); @@ -1771,7 +1774,10 @@ void UDPStandardImplementation::startWriting(){ processWritingBufferPacketByPacket(); break; default: - processWritingBuffer(ithread, wbuf, numPackets); + if(!dataCompressionEnable) + handleWithoutDataCompression(ithread, wbuf, numPackets[0]); + else + handleDataCompression(ithread,wbuf,d, xmax, ymax, nf); break; } @@ -1899,180 +1905,372 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){ cprintf(GREEN,"Info: Writing_Thread %d: End of Acquisition\n",ithread); //free fifo - for(int i=0; ipush(wbuffer[i])); -#ifdef -} - -void UDPStandardImplementation::processWritingBuffer(int ithread, char* wbuffer[], uint32_t nP[]){ - FILE_LOG(logDEBUG1) << __AT__ << " called"; - - -} - - -void UDPStandardImplementation::processWritingBufferPacketByPacket(int ithread, char* wbuffer[], uint32_t nP[]){ - FILE_LOG(logDEBUG1) << __AT__ << " called"; - - uint64_t tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))); - - //for gotthard and normal frame, increment frame number to separate fnum and pnum - if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1)) - tempframenumber++; - - //get frame number - tempframenumber = (tempframenumber & frameIndexMask) >> frameIndexOffset; - - //single thread, just assign and process without compression - if(!dataCompressionEnable){ - currentFrameNumber = tempframenumber; - handleWithoutDataCompression(ithread, wbuffer, nP[0]); +#ifdef FIFODEBUG + cprintf(GREEN,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer[i]),i); +#endif } - //handling multiple threads - else{ - pthread_mutex_lock(&progressMutex); - if(tempframenumber > currentFrameNumber) - currentFrameNumber = tempframenumber; - pthread_mutex_unlock(&progressMutex); - handleDataCompression(ithread,wbuffer,d, xmax, ymax, nf); + //all threads need to close file, reset mask and exit loop + closeFile(ithread); + pthread_mutex_lock(&statusMutex); + writerThreadsMask^=(1<> frameIndexOffset; + //set indices + acquisitionIndex = currentFrameNumber - startAcquisitionIndex; + frameIndex = currentFrameNumber - startFrameIndex; } - -} - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -/* - acquisitionIndex = currframenum - startAcquisitionIndex - frameIndex = currframenum - startFrameIndex; - -} -*/ - - - - -void UDPStandardImplementation::copyFrameToGui(char* startbuf[], char* buf){ - FILE_LOG(logDEBUG1) << __AT__ << " called"; -#ifdef VERY_VERY_DEBUG -cout << "copyframe" << endl; + //callback to write data + if (cbAction < DO_EVERYTHING){ + switch(myDetectorType){ + case EIGER: + for(i=0;i 0) + writeFileWithoutCompression(wbuffer, npackets); +#ifdef DEBUG4 + cprintf(GREEN,"Writing_Thread: Writing done\nGoing to copy frame\n"); #endif - //random read when gui not ready , also command line doesnt have nthframetogui - //else guidata always null as guidataready is always 1 after 1st frame, and seccond data never gets copied - if((!nFrameToGui) && (!guiData)){ -#ifdef VERY_VERY_DEBUG - cprintf(GREEN,"doing nothing\n"); + + //copy frame for gui + if(npackets >= packetsPerFrame) + copyFrameToGui(wbuffer); +#ifdef DEBUG4 + cprintf(GREEN,"Writing_Thread: Copied frame\n"); +#endif + + + //free fifo addresses (eiger frees for each packet later) + if(myDetectorType != EIGER){ + while(!fifoFree[0]->push(wbuffer[0])); +#ifdef FIFODEBUG + cprintf(GREEN,"Writing_Thread %d: Freed buffer, pushed into fifofree %p for listener 0\n",ithread, (void*)(wbuffer[0])); +#endif + } +} + + + + +void UDPStandardImplementation::writeFileWithoutCompression(char* wbuffer[],int numpackets){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; + + int i; + + //create headers for eiger +#ifdef WRITE_HEADERS + if (myDetectorType == EIGER && cbAction == DO_EVERYTHING) + createHeaders(wbuffer); +#endif + + //if write enabled + if((fileWriteEnable) && (sfilefd)){ + int offset = HEADER_SIZE_NUM_TOT_PACKETS; //offset (not eiger) to keep track of how many packets saved + int packetsToSave; //how many packets to save at a time + volatile uint64_t tempframenumber; + int lastpacket; + + //loop to take care of creating new files when it reaches max packets per file + while(numpackets > 0){ + + //to create new file when max reached + packetsToSave = maxPacketsPerFile - packetsInFile; + if(packetsToSave > numpackets) + packetsToSave = numpackets; + + //write to file + if(cbAction == DO_EVERYTHING){ + switch(myDetectorType){ + case EIGER: + for(i=0; i= (uint32_t)maxPacketsPerFile){ + //for packet loss, because currframenum is the latest one for eiger + if(myDetectorType != EIGER){ + lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); + + //get frame number (eiger already gets it when it does packet to packet processing) + tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[0] + lastpacket)))); + //for gotthard and normal frame, increment frame number to separate fnum and pnum + if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1)) + tempframenumber++; + //get frame number + currentFrameNumber = (tempframenumber & frameIndexMask) >> frameIndexOffset; + //set indices + acquisitionIndex = currentFrameNumber - startAcquisitionIndex; + frameIndex = currentFrameNumber - startFrameIndex; + } +#ifdef DEBUG3 + cprintf(GREEN,"Writing_Thread: Current Frame Number:%d\n",currentFrameNumber); +#endif + createNewFile(); + } + + //increase offset + if(myDetectorType != EIGER) + offset += (packetsToSave * onePacketSize); + numpackets -= packetsToSave; + } + } + + //only update parameters + else{ + if(numberofWriterThreads > 1) pthread_mutex_lock(&writeMutex); + packetsInFile += numpackets; + packetsCaught += (numpackets - numMissingPackets); + totalPacketsCaught += (numpackets - numMissingPackets); + numMissingPackets = 0; + if(numberofWriterThreads > 1) pthread_mutex_unlock(&writeMutex); + } + +} + + + + + +void UDPStandardImplementation::createHeaders(char* wbuffer[]){ + + eiger_packet_header_t* wbuf_header=0; + eiger_packet_footer_t* wbuf_footer=0; + int port = 0, missingPacket; + + for (int i = 0; i < packetsPerFrame; i++){ + + wbuf_header = (eiger_packet_header_t*) wbuffer[i]; + wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footerOffset); +#ifdef DEBUG4 + cprintf(GREEN, "Loop index:%d Pnum:%d\n",i,*( (uint16_t*) wbuf_footer->packetNumber)); +#endif + //which port + if (i ==(packetsPerFrame/2)) port = 1; + + //missing packet + if (*( (uint16_t*) wbuf_header->missingPacket)== missingPacketValue){ +#ifdef DEBUG4 + cprintf(GREEN,"Missing packet at %d\n", i+1); +#endif + missingPacket = 1; + //add frame and packet numbers + *( (uint64_t*) wbuf_footer) = (uint64_t)((currentFrameNumber+1)); + *( (uint16_t*) wbuf_footer->packetNumber) = (i+1); + } + //normal packet + else{ + missingPacket = 0; + //correct the packet numbers of port2 so that port1 and 2 are not the same + if(port) *( (uint16_t*) wbuf_footer->packetNumber) = (*( (uint16_t*) wbuf_footer->packetNumber))+(packetsPerFrame/2); + } + //DEBUGGING + if(*( (uint16_t*) wbuf_footer->packetNumber) != (i+1)){ + cprintf(BG_RED, "Packet Number Mismatch! i:%d pnum:%d fnum:%d missingPacket:%d\n", + i,*( (uint16_t*) wbuf_footer->packetNumber),currentFrameNumber,*( (uint16_t*) wbuf_header->missingPacket)); + exit(-1); + } + //overwriting port number and dynamic range + *( (uint8_t*) wbuf_header->portIndex) = port; + *( (uint8_t*) wbuf_header->dynamicRange) = dynamicRange; + } +} + + +void UDPStandardImplementation::copyFrameToGui(char* buffer[]){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; + + int i; + + //random read (gui not ready) + //need to toggle guiDataReady or the second frame wont be copied + if((!FrameToGuiFrequency) && (!guiData)){ +#ifdef DEBUG4 + cprintf(GREEN,"Writing_Thread: CopyingFrame: Resetting guiDataReady\n"); #endif pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; pthread_mutex_unlock(&dataReadyMutex); } - //random read or nth frame read, gui needs data now or it is the first frame + //random read (gui ready) or nth frame read: gui needs data now or it is the first frame else{ -#ifdef VERY_VERY_DEBUG - cprintf(GREEN,"gui needs data now or 1st frame\n"); +#ifdef DEBUG4 + cprintf(GREEN,"Writing_Thread: CopyingFrame: Gui needs data now OR 1st frame\n"); #endif pthread_mutex_lock(&dataReadyMutex); guiDataReady=0; -#ifdef VERY_VERY_DEBUG - cprintf(GREEN,"guidataready is 0, copying data\n"); +#ifdef DEBUG4 + cprintf(GREEN,"Writing_Thread: CopyingFrame: guidataready is 0, Copying data\n"); #endif - //eiger - if(startbuf != NULL){ + switch(myDetectorType){ + case EIGER: + for(int i=0; ipush(wbuffer[i])); -#ifdef FIFO_DEBUG - cprintf(GREEN,"%d writer free dummy pushed into fifofree %x for listener %d\n", ithread,(void*)(wbuffer[i]),i); -#endif - } - - - - //all threads need to close file, reset mask and exit loop - closeFile(ithread); - pthread_mutex_lock(&statusMutex); - writerthreads_mask^=(1< 0){ - - //for progress and packet loss calculation(new files) - if(myDetectorType == EIGER); - else if ((myDetectorType == PROPIX)||((myDetectorType == GOTTHARD) && (shortFrame == -1))) - tempframenum = (((((uint32_t)(*((uint32_t*)(buf[0] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); - else - tempframenum = ((((uint32_t)(*((uint32_t*)(buf[0] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); - - if(numWriterThreads == 1) - currframenum = tempframenum; - else{ - if(tempframenum > currframenum) - currframenum = tempframenum; - } -#ifdef VERYDEBUG - cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; -#endif - - //lock - if(numWriterThreads > 1) - pthread_mutex_lock(&write_mutex); - - - //to create new file when max reached - packetsToSave = maxPacketsPerFile - packetsInFile; - if(packetsToSave > numpackets) - packetsToSave = numpackets; -/**next time offset is still plus header length*/ - if(myDetectorType == EIGER){ - for(i=0;i= (uint32_t)maxPacketsPerFile){ - - //for packet loss, because currframenum is the latest one for eiger - if(myDetectorType != EIGER){ - lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); - - if ((myDetectorType == PROPIX)||((myDetectorType == GOTTHARD) && (shortFrame == -1))) - - tempframenum = (((((uint32_t)(*((uint32_t*)(buf[0] + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset); - else - tempframenum = ((((uint32_t)(*((uint32_t*)(buf[0] + lastpacket))))& (frameIndexMask)) >> frameIndexOffset); - } - if(numWriterThreads == 1) - currframenum = tempframenum; - else{ - if(tempframenum > currframenum) - currframenum = tempframenum; - } -#ifdef VERYDEBUG - cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; -#endif - //create - createNewFile(); - } - - //unlock - if(numWriterThreads > 1) - pthread_mutex_unlock(&write_mutex); - - if(myDetectorType != EIGER) - offset += (packetsToSave * onePacketSize); - numpackets -= packetsToSave; - } - - } - else{ - if(numWriterThreads > 1) - pthread_mutex_lock(&write_mutex); - packetsInFile += numpackets; - packetsCaught += (numpackets - numMissingPackets); - totalPacketsCaught += (numpackets - numMissingPackets); - numMissingPackets = 0; - if(numWriterThreads > 1) - pthread_mutex_unlock(&write_mutex); - } -} - - - - - - - - -void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer[],int npackets){ - int i, missingpacket,port = 0; - - - if (cbAction < DO_EVERYTHING){ - if (myDetectorType == EIGER){ - for(i=0;i 0){ - -#ifdef WRITE_HEADERS - if (myDetectorType == EIGER){ - - eiger_packet_header_t* wbuf_header=0; - eiger_packet_footer_t* wbuf_footer=0; - - - for (i = 0; i < packetsPerFrame; i++){ - - wbuf_header = (eiger_packet_header_t*) wbuffer[i]; - wbuf_footer = (eiger_packet_footer_t*)(wbuffer[i] + footer_offset); -#ifdef EIGER_DEBUG3 - cprintf(GREEN, "i:%d pnum:%d \n",i,*( (uint16_t*) wbuf_footer->packetnum)); -#endif - //which port - if (i ==(packetsPerFrame/2)) - port = 1; - - - - //missing packet - if (*( (uint16_t*) wbuf_header->missingpacket)== missingPacketValue){ -#ifdef VERY_VERBOSE - cprintf(GREEN,"missing packet at %d\n", i+1); -#endif - missingpacket = 1; - //add frame and packet numbers - *( (uint64_t*) wbuf_footer) = (uint64_t)((currframenum+1)); - *( (uint16_t*) wbuf_footer->packetnum) = (i+1); - }else{ - missingpacket = 0; - - if(*( (uint16_t*) wbuf_footer->packetnum)!= (i-(port*packetsPerFrame/numListeningThreads))+1){ - cprintf(BG_RED, "pnum mismatch num4! i:%d pnum:%d fnum:%d\n", - i,*( (uint16_t*) wbuf_footer->packetnum),currframenum); - exit(-1); - } - - //move packet numbers to num2, and compensate for port1 starting pnum from 0 - if(port) - *( (uint16_t*) wbuf_footer->packetnum) = (*( (uint16_t*) wbuf_footer->packetnum))+(packetsPerFrame/2); - } - - if(*( (uint16_t*) wbuf_footer->packetnum) != (i+1)){ - cprintf(BG_RED, "pnum mismatch! i:%d pnum:%d fnum:%d\n", - i,*( (uint16_t*) wbuf_footer->packetnum),currframenum); - if (*( (uint16_t*) wbuf_header->missingpacket) == missingPacketValue) - cprintf(BG_RED,"missing packet though\n"); - exit(-1); - } - - //overwriting port number and dynamic range - *( (uint8_t*) wbuf_header->portnum) = port; - *( (uint8_t*) wbuf_header->dynamicrange) = dynamicRange; - - - -#ifdef VERYDEBUG - if((i==0)||(i==1)){ - cprintf(GREEN, "%d packet header:0x%016llx missingpacket:0x%x\n",i, - (uint64_t)()*( (uint64_t*) wbuf_header)), *( (uint16_t*) wbuf_header->missingpacket)); - - cprintf(GREEN, "%d - 0x%x - %d\n", i, - *( (uint16_t*) wbuf_header->missingpacket), *( (uint16_t*) wbuf_footer->packetnum)); - } -#endif - - } - } -#endif - - writeToFile_withoutCompression(wbuffer, npackets,currframenum); - } - -#ifdef VERYDEBUG - cprintf(GREEN,"written everyting\n"); -#endif - } - - - if(myDetectorType == EIGER) { - -#ifdef VERYDEBUG - cprintf(GREEN,"gonna copy frame\n"); -#endif - copyFrameToGui(wbuffer); -#ifdef VERYDEBUG - cprintf(GREEN,"copied frame\n"); -#endif - }else{ - //copy to gui - if(npackets >= packetsPerFrame){//min 1 frame, but neednt be - //if(npackets == packetsPerFrame * numJobsPerThread){ //only full frames - copyFrameToGui(NULL,wbuffer[0]+HEADER_SIZE_NUM_TOT_PACKETS); -#ifdef VERYVERBOSE - cout << ithread << " finished copying" << endl; -#endif - } - - //else cout << "unfinished buffersize" << endl; - while(!fifoFree[0]->push(wbuffer[0])); -#ifdef FIFO_DEBUG - cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener 0\n",ithread, (void*)(wbuffer[0])); -#endif - - } -} @@ -2875,6 +2764,22 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer[], char* data, int xmax, int ymax, int &nf){ FILE_LOG(logDEBUG1) << __AT__ << " called"; + uint64_t tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))); + //for gotthard and normal frame, increment frame number to separate fnum and pnum + if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1)) + tempframenumber++; + //get frame number + tempframenumber = (tempframenumber & frameIndexMask) >> frameIndexOffset; + + + pthread_mutex_lock(&progressMutex); + if(tempframenumber > currentFrameNumber) + currentFrameNumber = tempframenumber; + pthread_mutex_unlock(&progressMutex); + //set indices + acquisitionIndex = currentFrameNumber - startAcquisitionIndex; + frameIndex = currentFrameNumber - startFrameIndex; + #if defined(MYROOT1) && defined(ALLFILE_DEBUG) writeToFile_withoutCompression(wbuf[0], numpackets,currframenum); #endif