merged with developer

This commit is contained in:
Dhanya Maliakal
2016-10-28 08:18:01 +02:00
2 changed files with 40 additions and 51 deletions

View File

@ -446,6 +446,7 @@ enum communicationProtocol{
close(file_des); close(file_des);
} }
else { else {
while(!shutdown(socketDescriptor, SHUT_RDWR));
close(socketDescriptor); close(socketDescriptor);
socketDescriptor=-1; socketDescriptor=-1;
} }

View File

@ -1408,12 +1408,14 @@ void UDPStandardImplementation::setThreadPriorities(){
break; break;
} }
} }
for(int i = 0; i < numberofDataCallbackThreads; ++i){ if(dataStreamEnable){
if(rights) for(int i = 0; i < numberofDataCallbackThreads; ++i){
if (pthread_setschedparam(dataCallbackThreads[i], SCHED_RR, &datacallback_param) == EPERM){ if(rights)
rights = false; if (pthread_setschedparam(dataCallbackThreads[i], SCHED_RR, &datacallback_param) == EPERM){
break; rights = false;
} break;
}
}
} }
if (pthread_setschedparam(pthread_self(),5 , &tcp_param) == EPERM) if (pthread_setschedparam(pthread_self(),5 , &tcp_param) == EPERM)
rights = false; rights = false;
@ -1534,12 +1536,6 @@ int UDPStandardImplementation::setupWriter(){
int UDPStandardImplementation::createNewFile(int ithread){ int UDPStandardImplementation::createNewFile(int ithread){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
int index = 0;
if(totalWritingPacketCount[ithread]){
index = frameIndex[ithread];
cout << "\nThread " << ithread << "\tFile:" << completeFileName[ithread] <<endl;
}
//create file name //create file name
if(!frameIndexEnable) if(!frameIndexEnable)
sprintf(completeFileName[ithread], "%s/%s_%lld.raw", filePath,fileNamePerThread[ithread],(long long int)fileIndex); sprintf(completeFileName[ithread], "%s/%s_%lld.raw", filePath,fileNamePerThread[ithread],(long long int)fileIndex);
@ -1576,20 +1572,21 @@ int UDPStandardImplementation::createNewFile(int ithread){
//Print packet loss and filenames //Print packet loss and filenames
if(!totalWritingPacketCount[ithread]){ if(!totalWritingPacketCount[ithread]){
frameNumberInPreviousFile[ithread] = -1; frameNumberInPreviousFile[ithread] = -1;
cout << "Thread " << ithread << " File:" << completeFileName[ithread] << endl; printf("Thread:%d File:%s\n",ithread,completeFileName[ithread]);
}else{ }else{
cout if(frameNumberInPreviousFile[ithread] == -1)
//<< "Packet Loss:" << frameNumberInPreviousFile[ithread] = startFrameIndex -1;
//setw(4)<<fixed << setprecision(4) <<
//dec << (int)((( (currentFrameNumber-1-previousFrameNumber) - ((packetsInFile-numTotMissingPacketsInFile)/packetsPerFrame))/ printf("\nThread:%d File:%s\n"
// (double)(currentFrameNumber-1-previousFrameNumber))*100.000) //"\ttotalpacketsinfile:%d\t"
//<< "%\t" "Packets Lost:%d"
<< "\tPackets Lost:" << dec << ( ((int)(currentFrameNumber[ithread]-1-frameNumberInPreviousFile[ithread])*packetsPerFrame) - //"\tCurrentFrameNumber:%lld\tPreviousFrameNumber:%lld"
totalPacketsInFile[ithread]) "\n",
<< "\tCurrentFrameNumber:" << currentFrameNumber[ithread] ithread,completeFileName[ithread],
<< "\tPreviousFrameNumber:" << frameNumberInPreviousFile[ithread] //totalPacketsInFile[ithread],
//<< "\tIndex:" << dec << index ( ((int)(currentFrameNumber[ithread]-frameNumberInPreviousFile[ithread])*packetsPerFrame) - totalPacketsInFile[ithread])
<< endl; //,currentFrameNumber[ithread],frameNumberInPreviousFile[ithread]
);
} }
//write file header //write file header
@ -1599,7 +1596,7 @@ int UDPStandardImplementation::createNewFile(int ithread){
//reset counters for each new file //reset counters for each new file
if(totalWritingPacketCount[ithread]){ if(totalWritingPacketCount[ithread]){
frameNumberInPreviousFile[ithread] = currentFrameNumber[ithread]-1; frameNumberInPreviousFile[ithread] = currentFrameNumber[ithread];
totalPacketsInFile[ithread] = 0; totalPacketsInFile[ithread] = 0;
} }
@ -2314,7 +2311,7 @@ void UDPStandardImplementation::stopListening(int ithread, int numbytes){
listeningThreadsMask^=(1<<ithread); listeningThreadsMask^=(1<<ithread);
//#ifdef DEBUG4 //#ifdef DEBUG4
//cprintf(BLUE,"Listening_Thread %d: Resetting mask of listening thread. New Mask: 0x%x", ithread, listeningThreadsMask); //cprintf(BLUE,"Listening_Thread %d: Resetting mask of listening thread. New Mask: 0x%x", ithread, listeningThreadsMask);
FILE_LOG(logINFO) << "Listening Thread of " << udpPortNum[ithread] << "got " << totalListeningPacketCount[ithread] << " packets"; FILE_LOG(logINFO) << "Listening Thread of " << udpPortNum[ithread] << " got " << totalListeningPacketCount[ithread] << " packets";
//cprintf(BLUE,"Listening_Thread %d: Frames listened to :%d\n",ithread, (totalListeningPacketCount[ithread]/packetsPerFrame)); //cprintf(BLUE,"Listening_Thread %d: Frames listened to :%d\n",ithread, (totalListeningPacketCount[ithread]/packetsPerFrame));
//#endif //#endif
pthread_mutex_unlock(&(statusMutex)); pthread_mutex_unlock(&(statusMutex));
@ -2682,18 +2679,19 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
} }
if(totalWritingPacketCount[ithread]){ if(totalWritingPacketCount[ithread]){
cout << "\nThread " << ithread << "\tFile:" << completeFileName[ithread] <<endl; if(frameNumberInPreviousFile[ithread]==-1)
//<< "Packet Loss:" << frameNumberInPreviousFile[ithread] = startFrameIndex-1;
//setw(4)<<fixed << setprecision(4) <<
//dec << (int)((( (currentFrameNumber-1-previousFrameNumber) - ((packetsInFile-numTotMissingPacketsInFile)/packetsPerFrame))/ printf("\nThread:%d File:%s\n"
// (double)(currentFrameNumber-1-previousFrameNumber))*100.000) //"\ttotalpacketsinfile:%d\t"
//<< "%\t" "Packets Lost:%d"
cout << "\tPackets Lost:" << dec << ( ((int)(currentFrameNumber[ithread]-frameNumberInPreviousFile[ithread])*packetsPerFrame) - //"\tCurrentFrameNumber:%lld\tPreviousFrameNumber:%lld"
totalPacketsInFile[ithread]) "\n",
<< "\tCurrentFrameNumber:" << currentFrameNumber[ithread] ithread,completeFileName[ithread],
<< "\tPreviousFrameNumber:" << frameNumberInPreviousFile[ithread] //totalPacketsInFile[ithread],
//<< "\tIndex:" << dec << index ( ((int)(currentFrameNumber[ithread]-frameNumberInPreviousFile[ithread])*packetsPerFrame) - totalPacketsInFile[ithread])
<< endl; //,currentFrameNumber[ithread],frameNumberInPreviousFile[ithread]
);
} }
closeFile(ithread); closeFile(ithread);
pthread_mutex_lock(&statusMutex); pthread_mutex_lock(&statusMutex);
@ -2773,20 +2771,10 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
return; return;
} }
//update current frame number
currentFrameNumber[ithread] = tempframenumber;
//set indices
pthread_mutex_lock(&progressMutex);
if((currentFrameNumber[ithread] - startAcquisitionIndex) > acquisitionIndex)
acquisitionIndex = currentFrameNumber[ithread] - startAcquisitionIndex;
if((currentFrameNumber[ithread] - startFrameIndex) > frameIndex[ithread])
frameIndex[ithread] = currentFrameNumber[ithread] - startFrameIndex;
pthread_mutex_unlock(&progressMutex);
//callback to write data //callback to write data
if (cbAction < DO_EVERYTHING) if (cbAction < DO_EVERYTHING)
rawDataReadyCallBack((int)currentFrameNumber[ithread], wbuffer + fifoBufferHeaderSize, npackets * onePacketSize, rawDataReadyCallBack((int)tempframenumber, wbuffer + fifoBufferHeaderSize, npackets * onePacketSize,
sfilefd[ithread], latestData[ithread],pRawDataReady);//know which thread from sfilefd sfilefd[ithread], latestData[ithread],pRawDataReady);//know which thread from sfilefd
@ -2891,7 +2879,7 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
packetsCaught += packetsWritten; packetsCaught += packetsWritten;
totalPacketsCaught += packetsWritten; totalPacketsCaught += packetsWritten;
pthread_mutex_unlock(&writeMutex); pthread_mutex_unlock(&writeMutex);
currentFrameNumber[ithread] += lastFrameNumberInFile[ithread]; currentFrameNumber[ithread] = lastFrameNumberInFile[ithread];
} }
} }
} }