receiver modified: fifofreed using callback, last fifo freed after all the process, using locks for fifofree, updating currentframenum only in singlephoton if datacompression

git-svn-id: file:///afs/psi.ch/project/sls_det_software/svn/slsDetectorSoftware@701 951219d9-93cf-4727-9268-0efd64621fa3
This commit is contained in:
l_maliakal_d 2013-11-29 11:39:22 +00:00
parent 03d0d5e6cd
commit adb500c4a0
5 changed files with 73 additions and 30 deletions

View File

@ -13,7 +13,7 @@
singlePhotonFilter::singlePhotonFilter(int nx, int ny, singlePhotonFilter::singlePhotonFilter(int nx, int ny,
int fmask, int pmask, int foffset, int poffset, int pperf, int iValue, int fmask, int pmask, int foffset, int poffset, int pperf, int iValue,
int16_t *m, int16_t *s, CircularFifo<char>* f, int d, int* tfcaught, int* fcaught): int16_t *m, int16_t *s, CircularFifo<char>* f, int d, int* tfcaught, int* fcaught,uint32_t* cframenum):
#ifdef MYROOT1 #ifdef MYROOT1
myTree(NULL), myTree(NULL),
myFile(NULL), myFile(NULL),
@ -57,6 +57,7 @@ singlePhotonFilter::singlePhotonFilter(int nx, int ny,
fifo(f), fifo(f),
totalFramesCaught(tfcaught), totalFramesCaught(tfcaught),
framesCaught(fcaught), framesCaught(fcaught),
currentframenum(cframenum),
freeFifoCallBack(NULL), freeFifoCallBack(NULL),
pFreeFifo(NULL){ pFreeFifo(NULL){
#ifndef MYROOT1 #ifndef MYROOT1
@ -93,6 +94,7 @@ singlePhotonFilter::singlePhotonFilter(int nx, int ny,
pthread_mutex_init(&write_mutex,NULL); pthread_mutex_init(&write_mutex,NULL);
pthread_mutex_init(&running_mutex,NULL); pthread_mutex_init(&running_mutex,NULL);
pthread_mutex_init(&frnum_mutex,NULL);
@ -264,6 +266,7 @@ void singlePhotonFilter::setupAcquisitionParameters(char *outfpath, char* outfna
fnum = 0; pnum = 0; ptot = 0; f0 = 0; firstTime = true; currentThread = -1; fnum = 0; pnum = 0; ptot = 0; f0 = 0; firstTime = true; currentThread = -1;
*framesCaught = 0; *framesCaught = 0;
*currentframenum = 0;
//initialize //initialize
for (int ir=0; ir<nChannelsX; ir++){ for (int ir=0; ir<nChannelsX; ir++){
@ -449,6 +452,11 @@ void singlePhotonFilter::findHits(){
cout<<"got data semwait:["<<index<<"]:"<<dum<<endl; cout<<"got data semwait:["<<index<<"]:"<<dum<<endl;
isData += HEADER_SIZE_NUM_FRAMES; isData += HEADER_SIZE_NUM_FRAMES;
if(clusteriframe > *currentframenum){
pthread_mutex_lock(&frnum_mutex);
*currentframenum = clusteriframe;
pthread_mutex_unlock(&frnum_mutex);
}
//for all the frames in one buffer //for all the frames in one buffer
for (i=0; i < numFramesAlloted[index]; ++i){ for (i=0; i < numFramesAlloted[index]; ++i){
@ -462,6 +470,13 @@ void singlePhotonFilter::findHits(){
isData += HEADER_SIZE_NUM_PACKETS; isData += HEADER_SIZE_NUM_PACKETS;
clusteriframe = (((uint32_t)(*((uint32_t*)(isData)))& frame_index_mask) >>frame_index_offset); clusteriframe = (((uint32_t)(*((uint32_t*)(isData)))& frame_index_mask) >>frame_index_offset);
//progress
if((clusteriframe + PROGRESS_INCREMENT) > *currentframenum){
pthread_mutex_lock(&frnum_mutex);
*currentframenum = clusteriframe;
pthread_mutex_unlock(&frnum_mutex);
}
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << "scurrframnum:" << clusteriframe << endl; cout << "scurrframnum:" << clusteriframe << endl;
#endif #endif

View File

@ -81,8 +81,9 @@ public:
* @param s mask as to which adcs are inverted * @param s mask as to which adcs are inverted
* @param f circular fifo buffer, which needs to be freed * @param f circular fifo buffer, which needs to be freed
* @param d Size of data with the headers * @param d Size of data with the headers
* @param tfcaught pointer to total frames caught * @param tfcaught pointer to total frames caught- needs updation for client
* @param fcaught pointer to frames caught * @param fcaught pointer to frames caught - needs updation for client
* @param cframenum pointer to currentframe num- needs updation for progress for gui
*/ */
singlePhotonFilter( singlePhotonFilter(
int nx, int nx,
@ -98,7 +99,8 @@ public:
CircularFifo<char>* f, CircularFifo<char>* f,
int d, int d,
int* tfcaught, int* tfcaught,
int* fcaught); int* fcaught,
uint32_t* cframenum);
/** virtual destructor */ /** virtual destructor */
virtual ~singlePhotonFilter(); virtual ~singlePhotonFilter();
@ -359,7 +361,9 @@ private:
volatile int threads_mask; volatile int threads_mask;
pthread_mutex_t write_mutex; pthread_mutex_t write_mutex;
pthread_mutex_t running_mutex; pthread_mutex_t running_mutex;
pthread_mutex_t frnum_mutex;
static const int PROGRESS_INCREMENT = 50;
/** current thread the job being allotted to */ /** current thread the job being allotted to */
int currentThread; int currentThread;
@ -408,6 +412,9 @@ private:
/** frames caught */ /** frames caught */
int* framesCaught; int* framesCaught;
/** current frame number */
uint32_t* currentframenum;
/** call back function */ /** call back function */
void (*freeFifoCallBack)(char*, void*); void (*freeFifoCallBack)(char*, void*);

View File

@ -18,6 +18,7 @@
//#include "sls_detector_defs.h" //#include "sls_detector_defs.h"
#include <semaphore.h> #include <semaphore.h>
#include <vector> #include <vector>
#include <iostream>
using namespace std; using namespace std;
typedef double double32_t; typedef double double32_t;
@ -45,6 +46,8 @@ public:
bool isEmpty() const; bool isEmpty() const;
bool isFull() const; bool isFull() const;
int getSemValue();
private: private:
volatile unsigned int tail; // input index volatile unsigned int tail; // input index
vector <Element*> array; vector <Element*> array;
@ -55,7 +58,13 @@ private:
unsigned int increment(unsigned int idx_) const; unsigned int increment(unsigned int idx_) const;
}; };
template<typename Element>
int CircularFifo<Element>::getSemValue()
{
int value;
sem_getvalue(&free_mutex, &value);
return value;
}
/** Producer only: Adds item to the circular queue. /** Producer only: Adds item to the circular queue.
@ -67,6 +76,7 @@ private:
template<typename Element> template<typename Element>
bool CircularFifo<Element>::push(Element*& item_) bool CircularFifo<Element>::push(Element*& item_)
{ {
int nextTail = increment(tail); int nextTail = increment(tail);
if(nextTail != head) if(nextTail != head)
{ {

View File

@ -155,7 +155,7 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
filter = new singlePhotonFilter(x,y, MOENCH_FRAME_INDEX_MASK, MOENCH_PACKET_INDEX_MASK, MOENCH_FRAME_INDEX_OFFSET, filter = new singlePhotonFilter(x,y, MOENCH_FRAME_INDEX_MASK, MOENCH_PACKET_INDEX_MASK, MOENCH_FRAME_INDEX_OFFSET,
0, MOENCH_PACKETS_PER_FRAME, 0,map, mask,fifofree,MOENCH_BUFFER_SIZE,&totalFramesCaught,&framesCaught); 0, MOENCH_PACKETS_PER_FRAME, 0,map, mask,fifofree,MOENCH_BUFFER_SIZE,&totalFramesCaught,&framesCaught,&currframenum);
break; break;
default: default:
x = 1; x = 1;
@ -185,7 +185,7 @@ slsReceiverFunctionList::slsReceiverFunctionList(detectorType det):
filter = new singlePhotonFilter(x,y,GOTTHARD_FRAME_INDEX_MASK, GOTTHARD_PACKET_INDEX_MASK, GOTTHARD_FRAME_INDEX_OFFSET, filter = new singlePhotonFilter(x,y,GOTTHARD_FRAME_INDEX_MASK, GOTTHARD_PACKET_INDEX_MASK, GOTTHARD_FRAME_INDEX_OFFSET,
0, GOTTHARD_PACKETS_PER_FRAME, 1,map, mask,fifofree,GOTTHARD_BUFFER_SIZE,&totalFramesCaught,&framesCaught); 0, GOTTHARD_PACKETS_PER_FRAME, 1,map, mask,fifofree,GOTTHARD_BUFFER_SIZE,&totalFramesCaught,&framesCaught,&currframenum);
break; break;
} }
@ -544,8 +544,11 @@ int slsReceiverFunctionList::startListening(){
framesCount = 0; framesCount = 0;
packetsCount = 0; packetsCount = 0;
//pop freefifo //pop freefifo
if(!fifofree->isEmpty()) /*while(fifofree->isEmpty());*/
fifofree->pop(buffer); fifofree->pop(buffer);
#ifdef VERYVERBOSE
cout << "lbuf1 popped:" << (void*)buffer << endl;
#endif
//increment offsets //increment offsets
offset = HEADER_SIZE_NUM_FRAMES; offset = HEADER_SIZE_NUM_FRAMES;
offset += HEADER_SIZE_NUM_PACKETS; offset += HEADER_SIZE_NUM_PACKETS;
@ -580,9 +583,9 @@ int slsReceiverFunctionList::startListening(){
//receive 1 packet //receive 1 packet
rc = udpSocket->ReceiveDataOnly(buffer+offset,oneBufferSize); rc = udpSocket->ReceiveDataOnly(buffer+offset,oneBufferSize);
if( rc <= 0){ if( rc <= 0){
//#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cerr << "recvfrom() failed:"<<endl; cerr << "recvfrom() failed:"<<endl;
//#endif #endif
if(status != TRANSMITTING){ if(status != TRANSMITTING){
continue; continue;
} }
@ -692,6 +695,8 @@ int slsReceiverFunctionList::startListening(){
framesCount++; framesCount++;
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
totalcount++; totalcount++;
#endif
#ifdef VERYVERBOSE
cout<<"lcurrframnum:"<< dec<< cout<<"lcurrframnum:"<< dec<<
(((uint32_t)(*((uint32_t*)(buffer+offset))) & frameIndexMask) >> frameIndexOffset)<<"*"<<endl; (((uint32_t)(*((uint32_t*)(buffer+offset))) & frameIndexMask) >> frameIndexOffset)<<"*"<<endl;
#endif #endif
@ -803,7 +808,7 @@ int slsReceiverFunctionList::startWriting(){
cout << "Ready!" << endl; cout << "Ready!" << endl;
//will always run till acquisition over and then runs till fifo is empty //will always run till acquisition over and then runs till fifo is empty
while(receiver_threads_running || (!fifo->isEmpty())){ while(receiver_threads_running){// || (!fifo->isEmpty())){
//pop fifo //pop fifo
if(fifo->pop(wbuf)){ if(fifo->pop(wbuf)){
@ -817,12 +822,12 @@ int slsReceiverFunctionList::startWriting(){
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << "popped last dummy frame:" << (void*)wbuf << endl; cout << "popped last dummy frame:" << (void*)wbuf << endl;
#endif #endif
fifofree->push(wbuf); //fifofree->push(wbuf);
/* /*
cout <<"fifofree is full?:" << fifofree->isFull()<<endl; cout <<"fifofree is full?:" << fifofree->isFull()<<endl;
cout <<"fifo is empty?:" << fifo->isEmpty()<<endl; cout <<"fifo is empty?:" << fifo->isEmpty()<<endl;
*/ */
if(status == TRANSMITTING)cout<<"transmitting as well"<<endl; else cout <<"not transmitting"<<endl; //if(status == TRANSMITTING)cout<<"transmitting as well"<<endl; else cout <<"not transmitting"<<endl;
if(fifo->isEmpty() && status == TRANSMITTING){ if(fifo->isEmpty() && status == TRANSMITTING){
//data compression //data compression
if(dataCompression){ if(dataCompression){
@ -830,6 +835,10 @@ int slsReceiverFunctionList::startWriting(){
while(!filter->checkIfJobsDone()) while(!filter->checkIfJobsDone())
usleep(50000); usleep(50000);
} }
#ifdef VERYVERBOSE
cout << "fifo freed:" << (void*)wbuf << endl;
#endif
fifofree->push(wbuf);
/* /*
cout <<"fifofree is full?:" << fifofree->isFull()<<endl; cout <<"fifofree is full?:" << fifofree->isFull()<<endl;
cout <<"fifo is empty?:" << fifo->isEmpty()<<endl; cout <<"fifo is empty?:" << fifo->isEmpty()<<endl;
@ -840,12 +849,9 @@ int slsReceiverFunctionList::startWriting(){
cout << "Status: Run Finished" << endl; cout << "Status: Run Finished" << endl;
break; break;
}else{ }else{
while(!fifo->isEmpty()){ cout<<"*************************should not be here!"<<endl;
cout<<"popped:"<<fifo->pop(buffer)<<endl; numJobsPerThread = -1;
cout<<"buff val:"<<(void*)buffer<<endl; break;
exit(-1);
}
} }
} }
@ -857,11 +863,13 @@ int slsReceiverFunctionList::startWriting(){
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << "buf1:" << (void*)wbuf << endl; cout << "buf1:" << (void*)wbuf << endl;
#endif #endif
currframenum = (((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_FRAMES + HEADER_SIZE_NUM_PACKETS)))& frameIndexMask) >>frameIndexOffset); //must be updated only in singlephoton if compression, else lock issues. (exception in the beginning for startframeindex)
if(!dataCompression){
currframenum = (((uint32_t)(*((uint32_t*)(wbuf + HEADER_SIZE_NUM_FRAMES + HEADER_SIZE_NUM_PACKETS)))& frameIndexMask) >>frameIndexOffset);
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << "currframnum:" << dec << currframenum << endl; cout << "currframnum:" << dec << currframenum << endl;
#endif #endif
}
//send it for writing and data compression //send it for writing and data compression
if(dataCompression) if(dataCompression)
filter->assignJobsForThread(wbuf, numFrames); filter->assignJobsForThread(wbuf, numFrames);
@ -1128,7 +1136,7 @@ int slsReceiverFunctionList::setShortFrame(int i){
delete filter; delete filter;
filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset, filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset,
0, GOTTHARD_SHORT_PACKETS_PER_FRAME, 0,map, mask,fifofree,GOTTHARD_SHORT_BUFFER_SIZE,&totalFramesCaught,&framesCaught); 0, GOTTHARD_SHORT_PACKETS_PER_FRAME, 0,map, mask,fifofree,GOTTHARD_SHORT_BUFFER_SIZE,&totalFramesCaught,&framesCaught,&currframenum);
break; break;
default: //normal readout for gotthard default: //normal readout for gotthard
@ -1159,7 +1167,7 @@ int slsReceiverFunctionList::setShortFrame(int i){
delete filter; delete filter;
filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset, filter = new singlePhotonFilter(x,y,frameIndexMask, GOTTHARD_PACKET_INDEX_MASK, frameIndexOffset,
0, GOTTHARD_PACKETS_PER_FRAME, 1,map, mask,fifofree,GOTTHARD_BUFFER_SIZE,&totalFramesCaught,&framesCaught); 0, GOTTHARD_PACKETS_PER_FRAME, 1,map, mask,fifofree,GOTTHARD_BUFFER_SIZE,&totalFramesCaught,&framesCaught,&currframenum);
break; break;
} }
@ -1232,6 +1240,8 @@ void slsReceiverFunctionList::setupFifoStructure(){
numJobsPerThread = i; numJobsPerThread = i;
} }
/*for testing
numJobsPerThread = 3;*/
//if same, return //if same, return
if(oldn == numJobsPerThread) if(oldn == numJobsPerThread)
@ -1261,10 +1271,11 @@ void slsReceiverFunctionList::setupFifoStructure(){
fifosize = fifosize/numJobsPerThread; fifosize = fifosize/numJobsPerThread;
/*
fifosize = 11; /*for testing
numJobsPerThread = 3; fifosize = 11;*/
*/
//allocate memory //allocate memory
mem0=(char*)malloc(((bufferSize+HEADER_SIZE_NUM_PACKETS)*numJobsPerThread+HEADER_SIZE_NUM_FRAMES)*fifosize); mem0=(char*)malloc(((bufferSize+HEADER_SIZE_NUM_PACKETS)*numJobsPerThread+HEADER_SIZE_NUM_FRAMES)*fifosize);

View File

@ -234,7 +234,7 @@ public:
/** free fifo buffer, called back from single photon filter */ /** free fifo buffer, called back from single photon filter */
static void freeFifoBufferCallBack (char* fbuffer, void *this_pointer){((slsReceiverFunctionList*)this_pointer)->freeFifoBuffer(fbuffer);}; static void freeFifoBufferCallBack (char* fbuffer, void *this_pointer){((slsReceiverFunctionList*)this_pointer)->freeFifoBuffer(fbuffer);};
void freeFifoBuffer(char* fbuffer){fifofree->push(fbuffer);}; void freeFifoBuffer(char* fbuffer){/*cout<< "fifo freed:"<<(void*)fbuffer<<endl;*/fifofree->push(fbuffer);};