diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index cc69e0da0..2a8a553c7 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -463,6 +463,16 @@ private: */ void handleWithoutDataCompression(int ithread, char* wbuffer,uint32_t npackets); + /** + * Called by processWritingBuffer for jungfrau + * writes to dummy file, doesnt need to read packet numbers + * Copies data for gui display and frees addresses popped from FIFOs + * @param ithread writing thread index + * @param wbuffer writing buffer popped out from FIFO + * @param npackets number of packets + */ + void handleWithoutMissingPackets(int ithread, char* wbuffer,uint32_t npackets); + /** * Calle by handleWithoutDataCompression * Creating headers Writing to file without compression diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 1f726bb7e..08627ffc8 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -2099,6 +2099,7 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch int offset = fifoBufferHeaderSize; int pnum = packetsPerFrame-1; int currentpnum; + int currentfnum=-1; //read first packet header receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset, JFRAU_HEADER_LENGTH); @@ -2110,6 +2111,9 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch //correct packet if(currentpnum == pnum){ + //complete frame, get frame number while u can + if(pnum == 0) + (*((uint32_t*)(buffer[ithread]+8))) = (*( (uint32_t*) header->frameNumber))&frameIndexMask; receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset, oneDataSize); if(!receivedSize) return 0; offset+=oneDataSize; @@ -2133,7 +2137,6 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch header = (jfrau_packet_header_t*)(buffer[ithread] + offset); currentpnum = (*( (uint8_t*) header->packetNumber)); } - } }//----- got a whole frame ------- @@ -2516,8 +2519,11 @@ void UDPStandardImplementation::startWriting(){ + //jungfrau + if(myDetectorType == JUNGFRAU) + handleWithoutMissingPackets(ithread, wbuf, numPackets); //normal - if(!dataCompressionEnable) + else if(!dataCompressionEnable) handleWithoutDataCompression(ithread, wbuf, numPackets); //compression @@ -2760,7 +2766,6 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* wbuffer, uint32_t npackets){ FILE_LOG(logDEBUG) << __AT__ << " called"; - //get current frame number uint64_t tempframenumber; uint32_t pnum; @@ -2772,13 +2777,13 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* return; } + //callback to write data if (cbAction < DO_EVERYTHING) rawDataReadyCallBack((int)tempframenumber, wbuffer + fifoBufferHeaderSize, npackets * onePacketSize, sfilefd[ithread], latestData[ithread],pRawDataReady);//know which thread from sfilefd - //write to file if enabled and update write parameters if(npackets > 0) writeFileWithoutCompression(ithread, wbuffer, npackets); @@ -2814,6 +2819,76 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* +void UDPStandardImplementation::handleWithoutMissingPackets(int ithread, char* wbuffer, uint32_t npackets){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + //get current frame number + uint64_t tempframenumber; + tempframenumber = (*((uint32_t*)(buffer[ithread]+8))); + cout<<"handling: frame number:"< 0){ + if((fileWriteEnable) && (sfilefd[ithread])){ + if(tempframenumber >= maxFramesPerFile) + createNewFile(ithread); + fwrite(wbuffer, 1, oneDataSize*packetsPerFrame+fifoBufferHeaderSize, sfilefd[ithread]); + } + + totalPacketsInFile[ithread] += npackets; + totalWritingPacketCount[ithread] += npackets; + lastFrameNumberInFile[ithread] = tempframenumber; + currentFrameNumber[ithread] = tempframenumber; + + if(numberofWriterThreads > 1) + pthread_mutex_lock(&writeMutex); + packetsCaught += npackets; + totalPacketsCaught += npackets; + if((currentFrameNumber[ithread] - startAcquisitionIndex) > acquisitionIndex) + acquisitionIndex = currentFrameNumber[ithread] - startAcquisitionIndex; + if((currentFrameNumber[ithread] - startFrameIndex) > frameIndex[ithread]) + frameIndex[ithread] = currentFrameNumber[ithread] - startFrameIndex; + + if(numberofWriterThreads > 1) + pthread_mutex_unlock(&writeMutex); + + } +#ifdef DEBUG4 + cprintf(GREEN,"Writing_Thread: Writing done\nGoing to copy frame\n"); +#endif + + + //copy frame for gui + //if(npackets >= (packetsPerFrame/numberofListeningThreads)) + if(dataStreamEnable && npackets > 0) + copyFrameToGui(ithread, wbuffer,npackets); +#ifdef DEBUG4 + cprintf(GREEN,"Writing_Thread: Copied frame\n"); +#endif + + + //free fifo addresses + int listenfifoThread = ithread; + if(dataCompressionEnable) + listenfifoThread = 0; + while(!fifoFree[listenfifoThread]->push(wbuffer)); +#ifdef EVERYFIFODEBUG + if(fifoFree[listenfifoThread]->getSemValue()<100) + cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",listenfifoThread,fifoFree[listenfifoThread]->getSemValue(),(void*)(wbuffer)); +#endif +#ifdef DEBUG5 + cprintf(GREEN,"Writing_Thread %d: Freed buffer, pushed into fifofree %p for listener %d \n",listenfifoThread, (void*)(wbuffer), listenfifoThread); +#endif + +} + + + void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* wbuffer,uint32_t numpackets){ FILE_LOG(logDEBUG) << __AT__ << " called";