changed the receiver to use mmap and msync instead of fwrite each time, so no botteleneck after some frames as before

git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@516 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
l_maliakal_d 2013-04-18 09:06:43 +00:00
parent 461a7029e7
commit fd80a8b628
2 changed files with 58 additions and 59 deletions

View File

@ -20,13 +20,10 @@
#include <sys/socket.h> // socket(), bind(), listen(), accept(), shut down
#include <arpa/inet.h> // sock_addr_in, htonl, INADDR_ANY
#include <stdlib.h> // exit()
#include <iomanip> //set precision
#include <sys/mman.h> //munmap
//#include <sched.h> //sched_idle
//#include <fcntl.h> //posix_fadvice
#include <string.h>
#include <iostream>
using namespace std;
@ -248,7 +245,7 @@ int slsReceiverFunctionList::startReceiver(){
cout << "ERROR: Could not prioritize threads. You need to be super user for that." << endl;
if (pthread_setschedparam(writing_thread, policy, &write_param) == EPERM)
cout << "ERROR: Could not prioritize threads. You need to be super user for that." << endl;
if (pthread_setschedparam(pthread_self(),policy , &tcp_param) == EPERM)
if (pthread_setschedparam(pthread_self(),5 , &tcp_param) == EPERM)
cout << "ERROR: Could not prioritize threads. You need to be super user for that." << endl;
@ -319,8 +316,9 @@ int slsReceiverFunctionList::startListening(){
// very end of the program.
do {
if (strchr(eth,'.')!=NULL) strcpy(eth,"");
//creating udp socket
if (strchr(eth,'.')!=NULL) strcpy(eth,"");
if(!strlen(eth)){
cout<<"warning:eth is empty.listening to all"<<endl;
udpSocket = new genericSocket(server_port,genericSocket::UDP,bufferSize/packetsPerFrame,packetsPerFrame);
@ -328,7 +326,6 @@ int slsReceiverFunctionList::startListening(){
cout<<"eth:"<<eth<<endl;
udpSocket = new genericSocket(server_port,genericSocket::UDP,bufferSize/packetsPerFrame,packetsPerFrame,eth);
}
if (udpSocket->getErrorStatus()){
#ifdef VERBOSE
std::cout<< "Could not create UDP socket "<< server_port << std::endl;
@ -344,7 +341,7 @@ int slsReceiverFunctionList::startListening(){
fifofree->pop(buffer);
//receiver 2 half frames / 1 short frame / 40 moench frames
rc = udpSocket->ReceiveDataOnly(buffer,bufferSize);//sizeof(buffer));
rc = udpSocket->ReceiveDataOnly(buffer,bufferSize);
if( rc < 0)
cerr << "recvfrom() failed" << endl;
@ -413,6 +410,9 @@ int slsReceiverFunctionList::startWriting(){
cout << "In startWriting()" <<endl;
#endif
void *address;
int memsize = bufferSize*maxFramesPerFile;
char *wbuf;
int sleepnumber=0;
@ -422,62 +422,59 @@ int slsReceiverFunctionList::startWriting(){
if(sfilefd) sfilefd=0;
strcpy(savefilename,"");
cout << "Max Frames Per File:" << maxFramesPerFile << endl;
if (writeReceiverData)
cout << "Note: Data Write has been defined exernally" << endl;
if(nFrameToGui)
cout << " Not implemented yet: Sending every " << nFrameToGui << "th frame to gui" << endl;
cout << "Ready!" << endl;
//by default, we read/write everything
cbAction=2;
//acquisition start
//acquisition start call back returns enable write
if (startAcquisitionCallBack)
cbAction=startAcquisitionCallBack(filePath,fileName,fileIndex,bufferSize,pStartAcquisition);
if(enableFileWrite==0 || cbAction==0)
cout << endl << "Note: Data will not be saved" << endl;
//create file name
if(frameIndexNeeded==-1) sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex);
else sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,framesCaught,fileIndex);
//start writing
if(enableFileWrite || cbAction>0){
sfilefd = fopen((const char *) (savefilename), "w");
cout << savefilename << endl;
//posix_fadvise(fileno(sfilefd),0,0,POSIX_FADV_DONTNEED|POSIX_FADV_SEQUENTIAL);
}
cout << "Ready!" << endl;
while(listening_thread_running || (!fifo->isEmpty())){
//when it reaches maxFramesPerFile,start writing new file
if (framesInFile == maxFramesPerFile) {
//start a new file
if ((framesInFile == maxFramesPerFile) || (strlen(savefilename) == 0)){
//create file name
if(frameIndexNeeded==-1) sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex);
else sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,framesCaught,fileIndex);
//start writing in new file
if(enableFileWrite || cbAction>0){
//fsync(fileno(sfilefd));
//sync file and close fd
if(sfilefd) {
msync(address,memsize, MS_ASYNC);
munmap(address,memsize);
fclose(sfilefd);
sfilefd = fopen((const char *) (savefilename), "w");
//posix_fadvise(fileno(sfilefd),0,0,POSIX_FADV_DONTNEED|POSIX_FADV_SEQUENTIAL);
}
//currframenum=(int)(*((int*)latestData));
//create file , truncate size and map file to memory
if (NULL == (sfilefd = fopen((const char *) (savefilename), "w+"))){
cout << "Error: Could not create file " << savefilename << endl;
break;
}
if (-1 == ftruncate(fileno(sfilefd),memsize)) {
perror("Error:Could not truncate file:");
break;
}
address = mmap(NULL,memsize,PROT_READ|PROT_WRITE,MAP_SHARED,fileno(sfilefd),0);
if(address == MAP_FAILED)
perror("Error: Could not map file to memory:");
}
//printing packet losses and file names
if(prevframenum == 0)
cout << savefilename << endl;
else{
cout << savefilename << "\tpacket loss " << fixed << setprecision(4) << ((currframenum-prevframenum-(packetsPerFrame*framesInFile))/(double)(packetsPerFrame*framesInFile))*100.000 << "%\t\t"
"framenum " << currframenum << "\t\t"
"p " << prevframenum << endl;
@ -485,33 +482,37 @@ int slsReceiverFunctionList::startWriting(){
prevframenum=currframenum;
framesInFile = 0;
}
}
//actual writing from fifo
//pop fifo
if(!fifo->isEmpty()){
if(fifo->pop(wbuf)){
framesCaught++;
totalFramesCaught++;
currframenum = (int)(*((int*)wbuf));//cout<<"curreframenm:"<<currframenum<<endl;
currframenum = (int)(*((int*)wbuf));//cout<<"**************curreframenm:"<<currframenum<<endl;
//write data
if(enableFileWrite){
//write data call back
if (writeReceiverData) {
writeReceiverData(wbuf,bufferSize, sfilefd, pwriteReceiverDataArg);
}
//write data call back
if (cbAction<2) {
rawDataReadyCallBack(currframenum, wbuf,sfilefd, guiData,pRawDataReady);
} else {
}
//default writing to file
else {
if(sfilefd)
fwrite(wbuf, 1, bufferSize, sfilefd);
memcpy(address,wbuf, bufferSize);
else{
cout << "You do not have permissions to overwrite: " << savefilename << endl;
usleep(50000);
}
}
}
@ -527,29 +528,27 @@ int slsReceiverFunctionList::startWriting(){
framesInFile++;
// delete [] dataWriteFrame->buffer;
fifofree->push(wbuf);
}
// delete dataWriteFrame;
}
else{//cout<<"************************fifo empty**********************************"<<endl;
sleepnumber++;
usleep(50000);
}
}
listening_thread_running = 0;
cout << "Total Frames Caught:"<< totalFramesCaught << endl;
//close file
if(sfilefd)
//sync file and close fd, memsize is still for maxframes because of truncate
if(sfilefd){
msync(address,memsize, MS_ASYNC);
munmap(address,memsize);
fclose(sfilefd);
//{ fsync(fileno(sfilefd));fclose(sfilefd);}
}
#ifdef VERBOSE
cout << "sfield:" << (int)sfilefd << endl;
#endif
//acquistion over
//acquistion over call back
if (acquisitionFinishedCallBack)
acquisitionFinishedCallBack(totalFramesCaught, pAcquisitionFinished);

View File

@ -291,7 +291,7 @@ private:
/** short frames */
int shortFrame;
/** buffer size can be 1286*2 or 518 */
/** buffer size can be 1286*2 or 518 or 1286*40 */
int bufferSize;
/** number of packets per frame*/