From 356801a3ddfcde0cd2d7cff3aaef9088889832e5 Mon Sep 17 00:00:00 2001 From: l_maliakal_d Date: Tue, 17 Dec 2013 09:30:46 +0000 Subject: [PATCH] improved packet loss statistics and multi threaded locking for receiver git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@709 951219d9-93cf-4727-9268-0efd64621fa3 --- .../slsReceiver/slsReceiverFunctionList.cpp | 70 ++++++++++++++----- 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.cpp b/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.cpp index 124c3d7cb..545fb32bc 100644 --- a/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.cpp +++ b/slsDetectorSoftware/slsReceiver/slsReceiverFunctionList.cpp @@ -801,7 +801,7 @@ int slsReceiverFunctionList::createNewFile(){ << "%\tframenum " << dec << currframenum //<< "\t\t p " << prevframenum << "\tindex " << dec << getFrameIndex() - << "\tpackets lost " << dec << (currframenum-prevframenum)-(packetsInFile/packetsPerFrame) << endl; + << "\tlost " << dec << (((int)(currframenum-prevframenum))-(packetsInFile/packetsPerFrame)) << endl; } } @@ -1111,11 +1111,12 @@ int slsReceiverFunctionList::startWriting(){ thread_started = 1; - int numpackets; + int numpackets,tempframenum; char* wbuf; while(1){ + while(receiver_threads_running){ @@ -1172,12 +1173,20 @@ int slsReceiverFunctionList::startWriting(){ //for progress if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - currframenum = (((((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); + tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); else - currframenum = ((((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + tempframenum = ((((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + if(numWriterThreads == 1) + currframenum = tempframenum; + else{ + pthread_mutex_lock(&progress_mutex); + if(tempframenum > currframenum) + currframenum = tempframenum; + pthread_mutex_unlock(&progress_mutex); + } #ifdef VERYDEBUG - cout << ithread << " currframnum:" << dec << currframenum << endl; + cout << ithread << " tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; #endif @@ -1185,8 +1194,13 @@ int slsReceiverFunctionList::startWriting(){ if(!dataCompression){ if (cbAction < DO_EVERYTHING) rawDataReadyCallBack(currframenum, wbuf, numpackets * onePacketSize, sfilefd, guiData,pRawDataReady); - else if (numpackets > 0) + else if (numpackets > 0){ + if(numWriterThreads >1) + pthread_mutex_lock(&progress_mutex); writeToFile_withoutCompression(wbuf, numpackets); + if(numWriterThreads >1) + pthread_mutex_unlock(&progress_mutex); + } while(!fifoFree->push(wbuf)); #ifdef VERYVERBOSE cout<<"buf freed:"<<(void*)wbuf< 0){ //for progress and packet loss calculation(new files) if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - currframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); + tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); else - currframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + tempframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + if(numWriterThreads == 1) + currframenum = tempframenum; + else{ + if(tempframenum > currframenum) + currframenum = tempframenum; + } #ifdef VERYDEBUG - cout << " currframnum:" << dec << currframenum << endl; + cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; #endif //to create new file when max reached @@ -1239,14 +1257,35 @@ void slsReceiverFunctionList::writeToFile_withoutCompression(char* buf,int numpa packetsToSave = numpackets; fwrite(buf+offset, 1, packetsToSave * onePacketSize, sfilefd); - offset += (packetsToSave * onePacketSize); packetsInFile += packetsToSave; packetsCaught += packetsToSave; totalPacketsCaught += packetsToSave; - numpackets -= packetsToSave; + //new file - if(packetsInFile >= maxPacketsPerFile) + if(packetsInFile >= maxPacketsPerFile){ + lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); + + //for packet loss + if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset); + else + tempframenum = ((((uint32_t)(*((uint32_t*)(buf + lastpacket))))& (frameIndexMask)) >> frameIndexOffset); + + if(numWriterThreads == 1) + currframenum = tempframenum; + else{ + if(tempframenum > currframenum) + currframenum = tempframenum; + } +#ifdef VERYDEBUG + cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; +#endif + createNewFile(); + } + + offset += (packetsToSave * onePacketSize); + numpackets -= packetsToSave; } } //no file write @@ -1256,7 +1295,6 @@ void slsReceiverFunctionList::writeToFile_withoutCompression(char* buf,int numpa totalPacketsCaught += numpackets; } - pthread_mutex_unlock(&(progress_mutex)); } #endif