improved packet loss statistics and multi threaded locking for receiver

git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@709 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
l_maliakal_d 2013-12-17 09:30:46 +00:00
parent 3fa2fe5392
commit 356801a3dd

View File

@ -801,7 +801,7 @@ int slsReceiverFunctionList::createNewFile(){
<< "%\tframenum " << "%\tframenum "
<< dec << currframenum //<< "\t\t p " << prevframenum << dec << currframenum //<< "\t\t p " << prevframenum
<< "\tindex " << dec << getFrameIndex() << "\tindex " << dec << getFrameIndex()
<< "\tpackets lost " << dec << (currframenum-prevframenum)-(packetsInFile/packetsPerFrame) << endl; << "\tlost " << dec << (((int)(currframenum-prevframenum))-(packetsInFile/packetsPerFrame)) << endl;
} }
} }
@ -1111,11 +1111,12 @@ int slsReceiverFunctionList::startWriting(){
thread_started = 1; thread_started = 1;
int numpackets; int numpackets,tempframenum;
char* wbuf; char* wbuf;
while(1){ while(1){
while(receiver_threads_running){ while(receiver_threads_running){
@ -1172,12 +1173,20 @@ int slsReceiverFunctionList::startWriting(){
//for progress //for progress
if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
currframenum = (((((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset);
else else
currframenum = ((((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); tempframenum = ((((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset);
if(numWriterThreads == 1)
currframenum = tempframenum;
else{
pthread_mutex_lock(&progress_mutex);
if(tempframenum > currframenum)
currframenum = tempframenum;
pthread_mutex_unlock(&progress_mutex);
}
#ifdef VERYDEBUG #ifdef VERYDEBUG
cout << ithread << " currframnum:" << dec << currframenum << endl; cout << ithread << " tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl;
#endif #endif
@ -1185,8 +1194,13 @@ int slsReceiverFunctionList::startWriting(){
if(!dataCompression){ if(!dataCompression){
if (cbAction < DO_EVERYTHING) if (cbAction < DO_EVERYTHING)
rawDataReadyCallBack(currframenum, wbuf, numpackets * onePacketSize, sfilefd, guiData,pRawDataReady); rawDataReadyCallBack(currframenum, wbuf, numpackets * onePacketSize, sfilefd, guiData,pRawDataReady);
else if (numpackets > 0) else if (numpackets > 0){
if(numWriterThreads >1)
pthread_mutex_lock(&progress_mutex);
writeToFile_withoutCompression(wbuf, numpackets); writeToFile_withoutCompression(wbuf, numpackets);
if(numWriterThreads >1)
pthread_mutex_unlock(&progress_mutex);
}
while(!fifoFree->push(wbuf)); while(!fifoFree->push(wbuf));
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout<<"buf freed:"<<(void*)wbuf<<endl; cout<<"buf freed:"<<(void*)wbuf<<endl;
@ -1214,9 +1228,7 @@ int slsReceiverFunctionList::startWriting(){
void slsReceiverFunctionList::writeToFile_withoutCompression(char* buf,int numpackets){ void slsReceiverFunctionList::writeToFile_withoutCompression(char* buf,int numpackets){
int packetsToSave, offset; int packetsToSave, offset,tempframenum,lastpacket;
pthread_mutex_lock(&progress_mutex);
//file write //file write
if((enableFileWrite) && (sfilefd)){ if((enableFileWrite) && (sfilefd)){
@ -1225,12 +1237,18 @@ void slsReceiverFunctionList::writeToFile_withoutCompression(char* buf,int numpa
while(numpackets > 0){ while(numpackets > 0){
//for progress and packet loss calculation(new files) //for progress and packet loss calculation(new files)
if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
currframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset);
else else
currframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); tempframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset);
if(numWriterThreads == 1)
currframenum = tempframenum;
else{
if(tempframenum > currframenum)
currframenum = tempframenum;
}
#ifdef VERYDEBUG #ifdef VERYDEBUG
cout << " currframnum:" << dec << currframenum << endl; cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl;
#endif #endif
//to create new file when max reached //to create new file when max reached
@ -1239,15 +1257,36 @@ void slsReceiverFunctionList::writeToFile_withoutCompression(char* buf,int numpa
packetsToSave = numpackets; packetsToSave = numpackets;
fwrite(buf+offset, 1, packetsToSave * onePacketSize, sfilefd); fwrite(buf+offset, 1, packetsToSave * onePacketSize, sfilefd);
offset += (packetsToSave * onePacketSize);
packetsInFile += packetsToSave; packetsInFile += packetsToSave;
packetsCaught += packetsToSave; packetsCaught += packetsToSave;
totalPacketsCaught += packetsToSave; totalPacketsCaught += packetsToSave;
numpackets -= packetsToSave;
//new file //new file
if(packetsInFile >= maxPacketsPerFile) if(packetsInFile >= maxPacketsPerFile){
lastpacket = (((packetsToSave - 1) * onePacketSize) + offset);
//for packet loss
if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset);
else
tempframenum = ((((uint32_t)(*((uint32_t*)(buf + lastpacket))))& (frameIndexMask)) >> frameIndexOffset);
if(numWriterThreads == 1)
currframenum = tempframenum;
else{
if(tempframenum > currframenum)
currframenum = tempframenum;
}
#ifdef VERYDEBUG
cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl;
#endif
createNewFile(); createNewFile();
} }
offset += (packetsToSave * onePacketSize);
numpackets -= packetsToSave;
}
} }
//no file write //no file write
else{ else{
@ -1256,7 +1295,6 @@ void slsReceiverFunctionList::writeToFile_withoutCompression(char* buf,int numpa
totalPacketsCaught += numpackets; totalPacketsCaught += numpackets;
} }
pthread_mutex_unlock(&(progress_mutex));
} }
#endif #endif