mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-06-18 15:57:13 +02:00
included exitReceiver, using different threads to listen and write packets in receiver, edited circularfifo to use pointer references, and acquire returns frames caught
git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@362 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
@ -15,6 +15,8 @@
|
||||
#include <arpa/inet.h> // sock_addr_in, htonl, INADDR_ANY
|
||||
#include <stdlib.h> // exit()
|
||||
|
||||
#include <iomanip> //set precision
|
||||
|
||||
|
||||
#include <string.h>
|
||||
#include <iostream>
|
||||
@ -35,15 +37,18 @@ slsReceiverFunctionList::slsReceiverFunctionList(bool shortfname):
|
||||
startAcquisitionIndex(-1),
|
||||
acquisitionIndex(0),
|
||||
framesInFile(0),
|
||||
prevframenum(0),
|
||||
status(IDLE),
|
||||
latestData(NULL),
|
||||
udpSocket(NULL),
|
||||
server_port(DEFAULT_UDP_PORT)
|
||||
server_port(DEFAULT_UDP_PORT),
|
||||
fifo(NULL)
|
||||
{
|
||||
strcpy(savefilename,"");
|
||||
strcpy(actualfilename,"");
|
||||
strcpy(filePath,"");
|
||||
strcpy(fileName,"run");
|
||||
strcpy(buffer,"");
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -52,7 +57,7 @@ int slsReceiverFunctionList::getFrameIndex(){
|
||||
if(startFrameIndex==-1)
|
||||
frameIndex=0;
|
||||
else
|
||||
frameIndex=((int)(*((int*)buffer)) - startFrameIndex)/2;
|
||||
frameIndex=((int)(*((int*)latestData)) - startFrameIndex)/2;
|
||||
return frameIndex;
|
||||
}
|
||||
|
||||
@ -62,7 +67,7 @@ int slsReceiverFunctionList::getAcquisitionIndex(){
|
||||
if(startAcquisitionIndex==-1)
|
||||
acquisitionIndex=0;
|
||||
else
|
||||
acquisitionIndex=((int)(*((int*)buffer)) - startAcquisitionIndex)/2;
|
||||
acquisitionIndex=((int)(*((int*)latestData)) - startAcquisitionIndex)/2;
|
||||
return acquisitionIndex;
|
||||
}
|
||||
|
||||
@ -112,18 +117,38 @@ int slsReceiverFunctionList::startReceiver(){
|
||||
#endif
|
||||
int err = 0;
|
||||
if(!listening_thread_running){
|
||||
printf("Starting new acquisition threadddd ....\n");
|
||||
cout << "Starting new acquisition threadddd ...." << endl;
|
||||
listening_thread_running=1;
|
||||
err = pthread_create(&listening_thread, NULL,startListeningThread, (void*) this);
|
||||
if(!err){
|
||||
while(status!=RUNNING);
|
||||
cout << endl << "Thread created successfully." << endl;
|
||||
}else{
|
||||
|
||||
fifo = new CircularFifo<dataStruct,FIFO_SIZE>();
|
||||
|
||||
//error creating writing thread
|
||||
err = pthread_create(&writing_thread, NULL,startWritingThread, (void*) this);
|
||||
if(err){
|
||||
listening_thread_running=0;
|
||||
status = IDLE;
|
||||
cout << endl << "Cant create thread. Status:" << status << endl;
|
||||
if(fifo) delete fifo;
|
||||
cout << "Cant create writing thread. Status:" << status << endl << endl;
|
||||
return FAIL;
|
||||
}
|
||||
cout << "Writing thread created successfully." << endl;
|
||||
|
||||
|
||||
//error creating listenign thread
|
||||
err = 0;
|
||||
err = pthread_create(&listening_thread, NULL,startListeningThread, (void*) this);
|
||||
if(err){
|
||||
listening_thread_running=0;
|
||||
status = IDLE;
|
||||
//stop writing thread
|
||||
pthread_join(writing_thread,NULL);
|
||||
if(fifo) delete fifo;
|
||||
cout << endl << "Cant create listening thread. Status:" << status << endl << endl;
|
||||
return FAIL;
|
||||
}
|
||||
|
||||
while(status!=RUNNING);
|
||||
cout << "Listening thread created successfully." << endl;
|
||||
}
|
||||
|
||||
return OK;
|
||||
@ -140,14 +165,19 @@ int slsReceiverFunctionList::stopReceiver(){
|
||||
|
||||
if(listening_thread_running){
|
||||
cout << "Stopping new acquisition threadddd ...." << endl;
|
||||
//stop thread
|
||||
//stop listening thread
|
||||
listening_thread_running=0;
|
||||
udpSocket->ShutDownSocket();
|
||||
pthread_join(listening_thread,NULL);
|
||||
status = IDLE;
|
||||
|
||||
//stop writing thread
|
||||
pthread_join(writing_thread,NULL);
|
||||
if(fifo) delete fifo;
|
||||
}
|
||||
cout << "Status:" << status << endl;
|
||||
|
||||
|
||||
return OK;
|
||||
}
|
||||
|
||||
@ -161,33 +191,26 @@ void* slsReceiverFunctionList::startListeningThread(void* this_pointer){
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
int slsReceiverFunctionList::startListening(){
|
||||
#ifdef VERYVERBOSE
|
||||
cout << "In startListening()\n");
|
||||
#endif
|
||||
|
||||
// Variable and structure definitions
|
||||
int rc, currframenum, prevframenum;
|
||||
int rc;
|
||||
dataStruct *dataReadFrame;
|
||||
|
||||
//reset variables for each acquisition
|
||||
framesInFile=0;
|
||||
framesCaught=0;
|
||||
startFrameIndex=-1;
|
||||
frameIndex=0;
|
||||
shortFileNameIndex=1;
|
||||
|
||||
|
||||
//create file name
|
||||
if(!frameIndexNeeded)
|
||||
sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex);
|
||||
else
|
||||
sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,framesCaught,fileIndex);
|
||||
|
||||
if(!shortFileName)
|
||||
strcpy(actualfilename,savefilename);
|
||||
else
|
||||
sprintf(actualfilename, "%s/%s_%d_%09d.raw", filePath,fileName,fileIndex, 0);
|
||||
|
||||
// A do/while(FALSE) loop is used to make error cleanup easier. The
|
||||
// close() of each of the socket descriptors is only done once at the
|
||||
// very end of the program.
|
||||
@ -198,60 +221,42 @@ int slsReceiverFunctionList::startListening(){
|
||||
break;
|
||||
}
|
||||
|
||||
sfilefd = fopen((const char *) (actualfilename), "w");
|
||||
cout << "Saving to " << actualfilename << ". Ready! " << endl;
|
||||
|
||||
while (listening_thread_running) {
|
||||
|
||||
if (framesInFile == 20000) {
|
||||
fclose(sfilefd);
|
||||
|
||||
currframenum=(int)(*((int*)buffer));
|
||||
//create file name
|
||||
if(!frameIndexNeeded)
|
||||
sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex);
|
||||
else
|
||||
sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,framesCaught,fileIndex);
|
||||
|
||||
if(!shortFileName)
|
||||
strcpy(actualfilename,savefilename);
|
||||
else
|
||||
sprintf(actualfilename, "%s/%s_%d_%09d.raw", filePath,fileName,fileIndex, shortFileNameIndex);
|
||||
|
||||
shortFileNameIndex++;
|
||||
|
||||
cout << "saving to " << actualfilename << "\t\t"
|
||||
"packet loss " << ((currframenum-prevframenum-(2*framesInFile))/(double)(2*framesInFile))*100.000 << "%\t\t"
|
||||
"framenum " << currframenum << endl;
|
||||
sfilefd = fopen((const char *) (actualfilename), "w");
|
||||
prevframenum=currframenum;
|
||||
framesInFile = 0;
|
||||
}
|
||||
status = RUNNING;
|
||||
|
||||
|
||||
buffer = new char[BUFFER_SIZE];
|
||||
//receiver 2 half frames
|
||||
rc = udpSocket->ReceiveDataOnly(buffer,sizeof(buffer));
|
||||
if( rc < 0)
|
||||
cerr << "recvfrom() failed" << endl;
|
||||
|
||||
//for each scan
|
||||
//start for each scan
|
||||
if(startFrameIndex==-1){
|
||||
startFrameIndex=(int)(*((int*)buffer))-2;
|
||||
//cout<<"startFrameIndex:"<<startFrameIndex<<endl;
|
||||
prevframenum=startFrameIndex;
|
||||
}
|
||||
//start of acquisition
|
||||
if(startAcquisitionIndex==-1)
|
||||
if(startAcquisitionIndex==-1){
|
||||
startAcquisitionIndex=startFrameIndex;
|
||||
//cout<<"startAcquisitionIndex:"<<startAcquisitionIndex<<endl;
|
||||
}
|
||||
|
||||
|
||||
|
||||
//so that it doesnt write the last frame twice
|
||||
if(listening_thread_running){
|
||||
fwrite(buffer, 1, rc, sfilefd);
|
||||
framesInFile++;
|
||||
framesCaught++;
|
||||
totalFramesCaught++;
|
||||
//s.assign(buffer);
|
||||
if(fifo->isFull())
|
||||
cout<<"**********************FIFO FULLLLLLLL************************"<<endl;
|
||||
else{
|
||||
dataReadFrame = new dataStruct;
|
||||
dataReadFrame->buffer=buffer;
|
||||
dataReadFrame->rc=rc;
|
||||
//cout<<"read buffer:"<<dataReadFrame<<endl;
|
||||
fifo->push(dataReadFrame);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -262,10 +267,108 @@ int slsReceiverFunctionList::startListening(){
|
||||
//Close down any open socket descriptors
|
||||
udpSocket->Disconnect();
|
||||
|
||||
//close file
|
||||
fclose(sfilefd);
|
||||
|
||||
#ifdef VERBOSE
|
||||
cout << "listening_thread_running:" << listening_thread_running << endl;
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void* slsReceiverFunctionList::startWritingThread(void* this_pointer){
|
||||
((slsReceiverFunctionList*)this_pointer)->startWriting();
|
||||
|
||||
return this_pointer;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
int slsReceiverFunctionList::startWriting(){
|
||||
#ifdef VERYVERBOSE
|
||||
cout << "In startWriting()" <<endl;
|
||||
#endif
|
||||
// Variable and structure definitions
|
||||
int currframenum,ret;
|
||||
dataStruct *dataWriteFrame;
|
||||
|
||||
//reset variables for each acquisition
|
||||
framesInFile=0;
|
||||
framesCaught=0;
|
||||
frameIndex=0;
|
||||
|
||||
|
||||
latestData = new char[BUFFER_SIZE];
|
||||
|
||||
//create file name
|
||||
if(!frameIndexNeeded) sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex);
|
||||
else sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,framesCaught,fileIndex);
|
||||
|
||||
//for sebastian
|
||||
if(!shortFileName) strcpy(actualfilename,savefilename);
|
||||
else sprintf(actualfilename, "%s/%s_%d_%09d.raw", filePath,fileName,fileIndex, 0);
|
||||
|
||||
//start writing
|
||||
sfilefd = fopen((const char *) (actualfilename), "w");
|
||||
cout << "Saving to " << actualfilename << ". Ready! " << endl;
|
||||
|
||||
|
||||
while(listening_thread_running){
|
||||
|
||||
//when it reaches 20000,start writing new file
|
||||
if (framesInFile == 20000) {
|
||||
fclose(sfilefd);
|
||||
|
||||
//create file name
|
||||
if(!frameIndexNeeded) sprintf(savefilename, "%s/%s_%d.raw", filePath,fileName,fileIndex);
|
||||
else sprintf(savefilename, "%s/%s_f%012d_%d.raw", filePath,fileName,framesCaught,fileIndex);
|
||||
//for sebastian
|
||||
if(!shortFileName) strcpy(actualfilename,savefilename);
|
||||
else sprintf(actualfilename, "%s/%s_%d_%09d.raw", filePath,fileName,fileIndex, shortFileNameIndex);
|
||||
shortFileNameIndex++;
|
||||
|
||||
|
||||
currframenum=(int)(*((int*)latestData));
|
||||
cout << "saving to " << actualfilename << "\t"
|
||||
"packet loss " << fixed << setprecision(4) << ((currframenum-prevframenum-(2*framesInFile))/(double)(2*framesInFile))*100.000 << "%\t\t"
|
||||
"framenum " << currframenum << "\t\t"
|
||||
"p " << prevframenum << endl;
|
||||
|
||||
//start writing in new file
|
||||
sfilefd = fopen((const char *) (actualfilename), "w");
|
||||
prevframenum=currframenum;
|
||||
framesInFile = 0;
|
||||
}
|
||||
|
||||
//actual writing from fifo
|
||||
if(!fifo->isEmpty()){
|
||||
|
||||
dataWriteFrame = new dataStruct;
|
||||
if(fifo->pop(dataWriteFrame)){
|
||||
//cout<<"write buffer:"<<dataWriteFrame<<endl<<endl;
|
||||
framesCaught++;
|
||||
totalFramesCaught++;
|
||||
memcpy(latestData,dataWriteFrame->buffer,BUFFER_SIZE);
|
||||
fwrite(dataWriteFrame->buffer, 1, dataWriteFrame->rc, sfilefd);
|
||||
framesInFile++;
|
||||
delete dataWriteFrame->buffer;
|
||||
delete dataWriteFrame;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cout << "Total Frames Caught:"<< totalFramesCaught << endl;
|
||||
//delete latestData;
|
||||
//close file
|
||||
fclose(sfilefd);
|
||||
|
||||
#ifdef VERBOSE
|
||||
cout << "sfield:" << (int)sfilefd << endl;
|
||||
#endif
|
||||
|
||||
@ -277,9 +380,13 @@ int slsReceiverFunctionList::startListening(){
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
char* slsReceiverFunctionList::readFrame(char* c){
|
||||
//cout<<"latestdata:"<<(int)(*(int*)latestData)<<endl;
|
||||
strcpy(c,savefilename);
|
||||
return buffer;
|
||||
return latestData;
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user