Modifications for circular FIFO and memory leak corrected done during beamtime - to be tested

git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@447 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
bergamaschi 2013-02-04 10:13:48 +00:00
parent 09d9e2fd03
commit 247e84ba7b
4 changed files with 105 additions and 31 deletions

View File

@ -1,5 +1,5 @@
CC = g++ CC = g++
CLAGS += -DSLS_RECEIVER_FUNCTION_LIST -DGOTTHARDD CLAGS += -DSLS_RECEIVER_FUNCTION_LIST -DGOTTHARDD
LDLIBS += -lm -lstdc++ -lpthread LDLIBS += -lm -lstdc++ -lpthread
INCLUDES = -I ../MySocketTCP -I ../commonFiles -I . INCLUDES = -I ../MySocketTCP -I ../commonFiles -I .

View File

@ -47,7 +47,9 @@ slsReceiverFunctionList::slsReceiverFunctionList(bool shortfname):
fifo(NULL), fifo(NULL),
shortFrame(-1), shortFrame(-1),
bufferSize(BUFFER_SIZE), bufferSize(BUFFER_SIZE),
packetsPerFrame(2) packetsPerFrame(2),
guiRequiresData(0),
currframenum(0)
{ {
strcpy(savefilename,""); strcpy(savefilename,"");
strcpy(actualfilename,""); strcpy(actualfilename,"");
@ -55,6 +57,30 @@ slsReceiverFunctionList::slsReceiverFunctionList(bool shortfname):
strcpy(fileName,"run"); strcpy(fileName,"run");
eth = new char[MAX_STR_LENGTH]; eth = new char[MAX_STR_LENGTH];
strcpy(eth,""); strcpy(eth,"");
// dataWriteFrame = new dataStruct;
latestData = new char[bufferSize];
fifofree = new CircularFifo<char,FIFO_SIZE>();
fifo = new CircularFifo<dataStruct,FIFO_SIZE>();
mem0=(char*)malloc(4096*FIFO_SIZE);
if (mem0==NULL) {
cout<<"++++++++++++++++++++++ COULD NON ALLOCATE MEMORY!!!!!!!+++++++++++++++++++++" << endl;
}
buffer=mem0;
while (buffer<(mem0+4096*(FIFO_SIZE-1))) {
fifofree->push(buffer);
buffer+=4096;
}
// cout<<"DEFAULT BUFFER SIZE IS "<< BUFFER_SIZE << endl;
// buffer=mem0;
} }
@ -77,7 +103,7 @@ int slsReceiverFunctionList::getFrameIndex(){
if(startFrameIndex==-1) if(startFrameIndex==-1)
frameIndex=0; frameIndex=0;
else if(framesCaught) else if(framesCaught)
frameIndex=((int)(*((int*)latestData)) - startFrameIndex)/packetsPerFrame; frameIndex=(currframenum - startFrameIndex)/packetsPerFrame;
return frameIndex; return frameIndex;
} }
@ -87,7 +113,7 @@ int slsReceiverFunctionList::getAcquisitionIndex(){
if(startAcquisitionIndex==-1) if(startAcquisitionIndex==-1)
acquisitionIndex=0; acquisitionIndex=0;
else if(framesCaught) else if(framesCaught)
acquisitionIndex=((int)(*((int*)latestData)) - startAcquisitionIndex)/packetsPerFrame; acquisitionIndex=(currframenum - startAcquisitionIndex)/packetsPerFrame;
return acquisitionIndex; return acquisitionIndex;
} }
@ -147,14 +173,13 @@ int slsReceiverFunctionList::startReceiver(){
cout << "Starting new acquisition threadddd ...." << endl; cout << "Starting new acquisition threadddd ...." << endl;
listening_thread_running=1; listening_thread_running=1;
fifo = new CircularFifo<dataStruct,FIFO_SIZE>();
//error creating writing thread //error creating writing thread
err = pthread_create(&writing_thread, NULL,startWritingThread, (void*) this); err = pthread_create(&writing_thread, NULL,startWritingThread, (void*) this);
if(err){ if(err){
listening_thread_running=0; listening_thread_running=0;
status = IDLE; status = IDLE;
if(fifo) delete fifo; // if(fifo) delete fifo;
cout << "Cant create writing thread. Status:" << status << endl << endl; cout << "Cant create writing thread. Status:" << status << endl << endl;
return FAIL; return FAIL;
} }
@ -169,7 +194,7 @@ int slsReceiverFunctionList::startReceiver(){
status = IDLE; status = IDLE;
//stop writing thread //stop writing thread
pthread_join(writing_thread,NULL); pthread_join(writing_thread,NULL);
if(fifo) delete fifo; // if(fifo) delete fifo;
cout << endl << "Cant create listening thread. Status:" << status << endl << endl; cout << endl << "Cant create listening thread. Status:" << status << endl << endl;
return FAIL; return FAIL;
} }
@ -200,7 +225,7 @@ int slsReceiverFunctionList::stopReceiver(){
//stop writing thread //stop writing thread
pthread_join(writing_thread,NULL); pthread_join(writing_thread,NULL);
if(fifo) delete fifo; // if(fifo) delete fifo;
//if(latestData) delete latestData;/**new*/ //if(latestData) delete latestData;/**new*/
} }
cout << "Status:" << status << endl; cout << "Status:" << status << endl;
@ -228,7 +253,7 @@ void* slsReceiverFunctionList::startListeningThread(void* this_pointer){
int slsReceiverFunctionList::startListening(){ int slsReceiverFunctionList::startListening(){
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << "In startListening()\n"); i cout << "In startListening()\n");
#endif #endif
// Variable and structure definitions // Variable and structure definitions
int rc; int rc;
@ -246,13 +271,13 @@ int slsReceiverFunctionList::startListening(){
if(!strlen(eth)){ //if(!strlen(eth)){
cout<<"warning:eth is empty.listening to all"<<endl; cout<<"warning:eth is empty.listening to all"<<endl;
udpSocket = new genericSocket(server_port,genericSocket::UDP,bufferSize/packetsPerFrame,packetsPerFrame); udpSocket = new genericSocket(server_port,genericSocket::UDP,bufferSize/packetsPerFrame,packetsPerFrame);
}else{ //}else{
cout<<"eth:"<<eth<<endl; // cout<<"eth:"<<eth<<endl;
udpSocket = new genericSocket(server_port,genericSocket::UDP,bufferSize/packetsPerFrame,packetsPerFrame,eth); // udpSocket = new genericSocket(server_port,genericSocket::UDP,bufferSize/packetsPerFrame,packetsPerFrame,eth);
} //}
if (udpSocket->getErrorStatus()){ if (udpSocket->getErrorStatus()){
#ifdef VERBOSE #ifdef VERBOSE
@ -265,10 +290,11 @@ int slsReceiverFunctionList::startListening(){
while (listening_thread_running) { while (listening_thread_running) {
status = RUNNING; status = RUNNING;
if (!fifofree->isEmpty()) {
fifofree->pop(buffer);
buffer = new char[bufferSize];
//receiver 2 half frames //receiver 2 half frames
rc = udpSocket->ReceiveDataOnly(buffer,sizeof(buffer)); rc = udpSocket->ReceiveDataOnly(buffer,bufferSize);//sizeof(buffer));
if( rc < 0) if( rc < 0)
cerr << "recvfrom() failed" << endl; cerr << "recvfrom() failed" << endl;
@ -282,6 +308,7 @@ int slsReceiverFunctionList::startListening(){
//start of acquisition //start of acquisition
if(startAcquisitionIndex==-1){ if(startAcquisitionIndex==-1){
startAcquisitionIndex=startFrameIndex; startAcquisitionIndex=startFrameIndex;
currframenum =startAcquisitionIndex;
cout<<"startAcquisitionIndex:"<<startAcquisitionIndex<<endl; cout<<"startAcquisitionIndex:"<<startAcquisitionIndex<<endl;
} }
@ -300,8 +327,11 @@ int slsReceiverFunctionList::startListening(){
//cout<<"read buffer:"<<dataReadFrame<<endl; //cout<<"read buffer:"<<dataReadFrame<<endl;
fifo->push(dataReadFrame); fifo->push(dataReadFrame);
} }
}
}
}
} }
} while (listening_thread_running); } while (listening_thread_running);
listening_thread_running=0; listening_thread_running=0;
@ -337,21 +367,20 @@ int slsReceiverFunctionList::startWriting(){
cout << "In startWriting()" <<endl; cout << "In startWriting()" <<endl;
#endif #endif
// Variable and structure definitions // Variable and structure definitions
int currframenum,ret; int ret,sleepnumber=0;//currframenum,ret;
dataStruct *dataWriteFrame; // dataStruct *dataWriteFrame;
//reset variables for each acquisition //reset variables for each acquisition
framesInFile=0; framesInFile=0;
framesCaught=0; framesCaught=0;
frameIndex=0; frameIndex=0;
latestData = new char[bufferSize];
if(sfilefd) sfilefd=0; if(sfilefd) sfilefd=0;
strcpy(savefilename,""); strcpy(savefilename,"");
strcpy(actualfilename,""); strcpy(actualfilename,"");
cout << "Max Frames Per File:" << maxFramesPerFile << endl;
cout << "Ready!" << endl; cout << "Ready!" << endl;
if(enableFileWrite){ if(enableFileWrite){
@ -388,10 +417,10 @@ int slsReceiverFunctionList::startWriting(){
cout << "saving to " << actualfilename << "\t"; cout << "saving to " << actualfilename << "\t";
} }
currframenum=(int)(*((int*)latestData)); //currframenum=(int)(*((int*)latestData));
cout << "packet loss " << fixed << setprecision(4) << ((currframenum-prevframenum-(packetsPerFrame*framesInFile))/(double)(packetsPerFrame*framesInFile))*100.000 << "%\t\t" cout << "packet loss " << fixed << setprecision(4) << ((currframenum-prevframenum-(packetsPerFrame*framesInFile))/(double)(packetsPerFrame*framesInFile))*100.000 << "%\t\t"
"framenum " << currframenum << "\t\t" "framenum " << currframenum << "\t\t"
"p " << prevframenum << endl; "p " << prevframenum << " sleep:" << sleepnumber << endl;
prevframenum=currframenum; prevframenum=currframenum;
framesInFile = 0; framesInFile = 0;
@ -399,22 +428,29 @@ int slsReceiverFunctionList::startWriting(){
//actual writing from fifo //actual writing from fifo
if(!fifo->isEmpty()){ if(!fifo->isEmpty()){
// dataWriteFrame = new dataStruct;
dataWriteFrame = new dataStruct;
if(fifo->pop(dataWriteFrame)){ if(fifo->pop(dataWriteFrame)){
//cout<<"write buffer:"<<dataWriteFrame<<endl<<endl; //cout<<"write buffer:"<<dataWriteFrame<<endl<<endl;
framesCaught++; framesCaught++;
totalFramesCaught++; totalFramesCaught++;
memcpy(latestData,dataWriteFrame->buffer,bufferSize); currframenum = (int)(*((int*)dataWriteFrame->buffer));
if(guiRequiresData){
memcpy(latestData,dataWriteFrame->buffer,bufferSize);
guiRequiresData=0;
}
//cout<<"write index:"<<(int)(*(int*)latestData)<<endl; //cout<<"write index:"<<(int)(*(int*)latestData)<<endl;
if(enableFileWrite) if(enableFileWrite)
fwrite(dataWriteFrame->buffer, 1, dataWriteFrame->rc, sfilefd); fwrite(dataWriteFrame->buffer, 1, dataWriteFrame->rc, sfilefd);
framesInFile++; framesInFile++;
delete dataWriteFrame->buffer; ///ANNA?!?!??!
delete dataWriteFrame; // delete [] dataWriteFrame->buffer;
fifofree->push(dataWriteFrame->buffer);
} }
delete dataWriteFrame;
} }
else{sleepnumber++;sleepnumber++;sleepnumber++;//cout<<"fifo empty, usleep"<<endl;
usleep(50000);
}
} }
cout << "Total Frames Caught:"<< totalFramesCaught << endl; cout << "Total Frames Caught:"<< totalFramesCaught << endl;
@ -424,7 +460,6 @@ int slsReceiverFunctionList::startWriting(){
#ifdef VERBOSE #ifdef VERBOSE
cout << "sfield:" << (int)sfilefd << endl; cout << "sfield:" << (int)sfilefd << endl;
#endif #endif
return 0; return 0;
} }
@ -436,6 +471,23 @@ int slsReceiverFunctionList::startWriting(){
char* slsReceiverFunctionList::readFrame(char* c){ char* slsReceiverFunctionList::readFrame(char* c){
//ask for data
guiRequiresData=1;
//wait for it to be ready, not indefinitely
int i=0;
for(i=0;i<10;i++){
if(guiRequiresData)
usleep(100000);
else
break;
}
//reset it back if not already reset
guiRequiresData=0;
//if no more data //if(guiRequiresData) // retun NULL;
//cout<<"latestdata:"<<(int)(*(int*)latestData)<<endl; //cout<<"latestdata:"<<(int)(*(int*)latestData)<<endl;
strcpy(c,savefilename); strcpy(c,savefilename);
return latestData; return latestData;

View File

@ -246,6 +246,12 @@ private:
/** Receiver buffer */ /** Receiver buffer */
char* buffer; char* buffer;
/** Receiver buffer */
char *mem0, *memfull;
/** latest data */ /** latest data */
char* latestData; char* latestData;
@ -266,6 +272,8 @@ private:
/** circular fifo to read and write data*/ /** circular fifo to read and write data*/
CircularFifo<dataStruct,FIFO_SIZE>* fifo; CircularFifo<dataStruct,FIFO_SIZE>* fifo;
/** circular fifo to read and write data*/
CircularFifo<char,FIFO_SIZE>* fifofree;
/** short frames */ /** short frames */
int shortFrame; int shortFrame;
@ -276,12 +284,20 @@ private:
/** number of packets per frame*/ /** number of packets per frame*/
int packetsPerFrame; int packetsPerFrame;
/** gui wants data */
int guiRequiresData;
/** current frame number */
int currframenum;
public: public:
/** File Descriptor */ /** File Descriptor */
static FILE *sfilefd; static FILE *sfilefd;
/** if the listening thread is running*/ /** if the listening thread is running*/
static int listening_thread_running; static int listening_thread_running;
dataStruct *dataWriteFrame;
}; };

View File

@ -781,6 +781,12 @@ int slsReceiverFuncs::read_frame(){
socket->SendDataOnly(retval,DATA_BYTES); socket->SendDataOnly(retval,DATA_BYTES);
} }
//return ok/fail //return ok/fail
///ADDED BY ANNA?!?!?!?
delete [] retval;
delete [] origVal;
return ret; return ret;
} }