diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index 1e1c7bdf9..4e29e0bc6 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -567,16 +567,19 @@ protected: uint32_t frameIndex; /** Frames Caught for each real time acquisition (eg. for each scan) */ - int packetsCaught; + uint32_t packetsCaught; /** Total packets caught for an entire acquisition (including all scans) */ - int totalPacketsCaught; + uint32_t totalPacketsCaught; /** Pckets currently in current file, starts new file when it reaches max */ - int packetsInFile; + uint32_t packetsInFile; /** Number of missing packets in file (sometimes packetsinFile is incorrect due to padded packets for eiger)*/ - int numTotMissingPacketsInFile; + uint32_t numTotMissingPacketsInFile; + + /** Number of missing packets in an acquisition(sometimes packetsinFile is incorrect due to padded packets for eiger)*/ + uint32_t numMissingPackets; /** Frame index at start of an entire acquisition (including all scans) */ uint32_t startAcquisitionIndex; diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index f53897625..3cd559b52 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -123,6 +123,7 @@ void UDPStandardImplementation::initializeMembers(){ totalPacketsCaught = 0; packetsInFile = 0; numTotMissingPacketsInFile = 0; + numMissingPackets = 0; startAcquisitionIndex = 0; acquisitionIndex = 0; packetsPerFrame = 0; @@ -1278,6 +1279,7 @@ int UDPStandardImplementation::setupWriter(){ //reset writing thread variables packetsInFile=0; numTotMissingPacketsInFile = 0; + numMissingPackets = 0; packetsCaught=0; frameIndex=0; if(sfilefd) sfilefd=NULL; @@ -1936,16 +1938,37 @@ int UDPStandardImplementation::startWriting(){ int ret,i,j; + char* tofree[packetsPerFrame] ; + int tofreeoffset[packetsPerFrame]; char* tempbuffer[packetsPerFrame]; char* blankframe[packetsPerFrame]; int blankoffset; int tempoffset[numListeningThreads]; - if(myDetectorType == EIGER) + if(myDetectorType == EIGER){ for(i=0;inum3)) != 0xFF) + cprintf(RED,"blank frame header is not FF\n"); + + cprintf(GREEN,"packet %d blank frame 0x%x\n",i,(void*)(blankframe[i])); } + //last packet numbers for different dynamic ranges + switch(dynamicRange){ + case 4: LAST_PACKET_VALUE = 0x40; break; + case 8: LAST_PACKET_VALUE = 0x80; break; + case 16: LAST_PACKET_VALUE = 0xff; break; + case 32: LAST_PACKET_VALUE = 0xff; break; + default: break; + } + + } + while(1){ @@ -1972,6 +1995,7 @@ int UDPStandardImplementation::startWriting(){ popready[i] = true; startdatapacket[i] = false; tempoffset[i] = (i*packetsPerFrame/numListeningThreads); + tofreeoffset[i] = (i*packetsPerFrame/numListeningThreads); blankoffset = 0; lastpacketheader[i] = -1; currentpacketheader[i] = -1; @@ -1983,34 +2007,6 @@ int UDPStandardImplementation::startWriting(){ - if(myDetectorType == EIGER){ - - for(i=0;ipop(wbuf[i]); #ifdef FIFO_DEBUG - cprintf(GREEN,"%d writer poped from fifo %x\n", ithread, (void*)(wbuf[i])); + cprintf(GREEN,"%d writer poped 0x%x from fifo %d\n", ithread, (void*)(wbuf[i]), i); #endif numpackets[i] = (uint32_t)(*((uint32_t*)wbuf[i])); #ifdef VERYDEBUG cprintf(GREEN,"%d numpackets: %d for fifo :%d\n", ithread, numpackets[i], i); #endif //dont pop again if dummy packet - if(!numpackets[i]) + if(!numpackets[i]){ popready[i] = false; + cprintf(RED,"%d dummy frame popped out of fifo %d",ithread, i); + }else{ + tofree[tofreeoffset[i]] = wbuf[i]; + tofreeoffset[i]++; + } + } } @@ -2040,11 +2042,31 @@ int UDPStandardImplementation::startWriting(){ //end of acquisition if((!numpackets[0])&& (!numpackets[1])){ -#ifdef VERYDEBUG - cprintf(GREEN,"%d End of Acquisition in Writing Thread\n", ithread); -#endif - stopWriting(ithread,wbuf); - continue; +//#ifdef VERYDEBUG + cprintf(GREEN,"%d Both dummy frames\n", ithread); +//#endif + //remaning packets to be written + if((myDetectorType == EIGER) && + ((tempoffset[0]!=0) || (tempoffset[1]!=(packetsPerFrame/numListeningThreads)))){ + cprintf(RED,"**missing packets and end of acquisition\n"); + for(i=0;iheader_confirm))){ - - //new frame (no datapacket received yet), update frame num and corrected for fnum reset for scans - if(!startdatapacket[i]){ - tempframenum[i] = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum)+(startFrameIndex-1); -//#ifdef VERYVERBOSE - cprintf(GREEN,"**tempfraemnum of %d: %d\n",i,tempframenum[i]); -//#endif - }//next frame, leave - else{ - cprintf(RED,"**missing packets and got header\n"); - //add missing packets - numberofmissingpackets[i] = (LAST_PACKET_VALUE - lastpacketheader[i]); - //to decrement from packetsInFile to calculate packet loss - numTotMissingPacketsInFile += numberofmissingpackets[i]; - for(j=0;jnum4))); -#ifdef VERYVERBOSE - cprintf(GREEN,"** got current packet header of %d: %d lastpacketheader %d\n",i,currentpacketheader[i],lastpacketheader[i]); +#ifdef VERBOSE + else + cprintf(RED, "WARNING: Dummy packet: %d from fifo %d\n", numpackets[i],i); #endif - //same frame packet - continue building frame - if(currentpacketheader[i] > lastpacketheader[i]){ - //add missing packets - numberofmissingpackets[i] = (currentpacketheader[i] - lastpacketheader[i] -1); - //to decrement from packetsInFile to calculate packet loss - numTotMissingPacketsInFile += numberofmissingpackets[i]; - for(j=0;jpush((wbuf[i]))); +//#ifdef FIFO_DEBUG + cprintf(GREEN,"%d writer freed unknown length pushed into fifofree %x for listener %d\n",ithread, (void*)(wbuf[i]),i); +//#endif + } + continue; + } + + //not dummy buffer and not after getting a full frame + if(numpackets[i] && (!fullframe[i])){ + + //header packet + if( 0x01 == (*(uint8_t*)(((eiger_image_header *)((char*)(wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS)))->header_confirm))){ + + //new frame (no datapacket received yet), update frame num and corrected for fnum reset for scans + if(!startdatapacket[i]){ + tempframenum[i] = (htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum)); + if(!tempframenum[i]) + cprintf(RED,"**VERY WEIRD frame numbers for fifo %d: %d\n",i,tempframenum[i]); + tempframenum[i] += (startFrameIndex-1); + //#ifdef VERYVERBOSE + cprintf(GREEN,"**tempfraemnum of %d: %d\n",i,tempframenum[i]); + //#endif + }//next frame, leave + else{ + cprintf(RED,"**missing packets and got header\n"); + //add missing packets + numberofmissingpackets[i] = (LAST_PACKET_VALUE - lastpacketheader[i]); + //to decrement from packetsInFile to calculate packet loss + for(j=0;jnum4))); +#ifdef VERYVERBOSE + cprintf(GREEN,"**fifo:%d currentpacketheader: %d lastpacketheader %d\n",i,currentpacketheader[i],lastpacketheader[i]); +#endif + //same frame packet - continue building frame + if(currentpacketheader[i] > lastpacketheader[i]){ + //add missing packets + numberofmissingpackets[i] = (currentpacketheader[i] - lastpacketheader[i] -1); + //to decrement from packetsInFile to calculate packet loss + for(j=0;jpush(tofree[j])); +#ifdef FIFO_DEBUG + cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener %d\n",ithread, (void*)(tofree[j]),0); +#endif + } + for(j=(packetsPerFrame/numListeningThreads);jpush(tofree[j])); +#ifdef FIFO_DEBUG + cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener %d\n",ithread, (void*)(tofree[j]),1); +#endif + } + + + +//#ifdef VERYDEBUG + cprintf(GREEN,"finished freeing\n"); +//#endif //reset a few stuff for(int i=0;ipush(wbuffer[i])); -#ifdef FIFO_DEBUG +//#ifdef FIFO_DEBUG cprintf(GREEN,"%d writer free dummy pushed into fifofree %x for listener %d\n", ithread,(void*)(wbuffer[i]),i); -#endif +//#endif } @@ -2480,8 +2559,8 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){ cprintf(GREEN, "Status: Run Finished\n"); if(!totalPacketsCaught){ - cprintf(GREEN, "Total Packets Caught:%d\n", totalPacketsCaught); - cprintf(GREEN, "Total Frames Caught:%d\n",(totalPacketsCaught/packetsPerFrame)); + cprintf(RED, "Total Packets Caught: 0\n"); + cprintf(RED, "Total Frames Caught: 0\n"); }else{ cprintf(GREEN, "Total Packets Caught:%d\n", totalPacketsCaught); cprintf(GREEN, "Total Frames Caught:%d\n",(totalPacketsCaught/packetsPerFrame)); @@ -2505,7 +2584,7 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){ void UDPStandardImplementation::writeToFile_withoutCompression(char* buf[],int numpackets, uint32_t framenum){ FILE_LOG(logDEBUG) << __AT__ << " called"; - +cout<<"in write to file numpackets:"< numpackets) packetsToSave = numpackets; /**next time offset is still plus header length*/ - if(myDetectorType == EIGER) - for(i=0;i= maxPacketsPerFile){ @@ -2641,8 +2724,9 @@ void UDPStandardImplementation::writeToFile_withoutCompression(char* buf[],int n if(numWriterThreads > 1) pthread_mutex_lock(&write_mutex); packetsInFile += numpackets; - packetsCaught += numpackets; - totalPacketsCaught += numpackets; + packetsCaught += (numpackets - numMissingPackets); + totalPacketsCaught += (numpackets - numMissingPackets); + numMissingPackets = 0; if(numWriterThreads > 1) pthread_mutex_unlock(&write_mutex); } @@ -2734,16 +2818,22 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char* cprintf(GREEN,"copied frame\n"); //#endif - for(i=0;inum3)) != 0xFF) - while(!fifoFree[i]->push(wbuffer[j+i*(packetsPerFrame/2)])); -#ifdef FIFO_DEBUG - cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener %d\n",ithread, (void*)(wbuffer[j+i*(packetsPerFrame/2)])); -#endif - } +/* + for(j=0;jnum3)) != 0xFF){ + + while(!fifoFree[(j/(packetsPerFrame/2))]->push(&(wbuffer[j] - HEADER_SIZE_NUM_TOT_PACKETS))); + + //#ifdef FIFO_DEBUG + cprintf(GREEN,"%d writer freed pushed into fifofree %x for listener %d\n",ithread, (void*)(wbuffer[j]- HEADER_SIZE_NUM_TOT_PACKETS),(j/(packetsPerFrame/2))); + //#endif + }else cprintf(GREEN,"blank frame 0x%x\n",(void*)(wbuffer[j])); } + + //#ifdef VERYDEBUG + cprintf(GREEN,"finished freeing\n"); + //#endif + */ } else{ //copy to gui