From 652d29f2d904b7ce91b9f7a4581b17d419f07b04 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Fri, 9 Sep 2016 17:51:36 +0200 Subject: [PATCH] almost done --- slsDetectorSoftware/Makefile | 7 +- .../multiSlsDetector/multiSlsDetector.cpp | 218 ++++++++++++++++-- .../multiSlsDetector/multiSlsDetector.h | 15 +- .../slsDetector/slsDetector.cpp | 165 +++++++++---- slsDetectorSoftware/slsDetector/slsDetector.h | 9 +- .../slsDetector/slsDetectorBase.h | 9 +- .../slsDetector/slsDetectorUtils.cpp | 48 +--- .../slsDetector/slsDetectorUtils.h | 19 +- .../slsDetectorAnalysis/postProcessing.cpp | 18 +- .../slsDetectorAnalysis/postProcessing.h | 18 +- slsDetectorSoftware/threadFiles/Task.h | 44 +++- .../threadFiles/ThreadPool.cpp | 60 +++-- slsDetectorSoftware/threadFiles/ThreadPool.h | 8 +- 13 files changed, 441 insertions(+), 197 deletions(-) diff --git a/slsDetectorSoftware/Makefile b/slsDetectorSoftware/Makefile index 17422ee95..eb3708d50 100644 --- a/slsDetectorSoftware/Makefile +++ b/slsDetectorSoftware/Makefile @@ -18,6 +18,9 @@ INCLUDES?= -IcommonFiles -IslsDetector -I../slsReceiverSoftware/MySocketTCP -Ius SRC_CLNT=slsDetectorAnalysis/fileIO.cpp usersFunctions/usersFunctions.cpp slsDetector/slsDetectorUtils.cpp slsDetector/slsDetectorCommand.cpp slsDetectorAnalysis/angularConversion.cpp slsDetectorAnalysis/angularConversionStatic.cpp slsDetectorAnalysis/energyConversion.cpp slsDetector/slsDetectorActions.cpp slsDetectorAnalysis/postProcessing.cpp slsDetector/slsDetector.cpp multiSlsDetector/multiSlsDetector.cpp slsDetectorAnalysis/postProcessingFuncs.cpp slsReceiverInterface/receiverInterface.cpp slsDetector/slsDetectorUsers.cpp threadFiles/CondVar.cpp threadFiles/Mutex.cpp threadFiles/ThreadPool.cpp #../slsReceiverSoftware/MySocketTCP/MySocketTCP.cpp +LIBZMQDIR = ../slsReceiverSoftware/include +LIBZMQ = -L$(LIBZMQDIR) -Wl,-rpath=$(LIBZMQDIR) -lzmq + $(info ) $(info #######################################) $(info # Compiling slsDetectorSoftware #) @@ -66,14 +69,14 @@ gotthardVirtualServer: $(SRC_MYTHEN_SVC) %.o : %.cpp %.h Makefile - $(CXX) -o $@ -c $< $(INCLUDES) $(DFLAGS) -fPIC $(EPICSFLAGS) -lpthread #$(FLAGS) + $(CXX) -o $@ -c $< $(INCLUDES) $(DFLAGS) -fPIC $(EPICSFLAGS) -lpthread -lrt $(LIBZMQ) #$(FLAGS) package: $(OBJS) $(DESTDIR)/libSlsDetector.so $(DESTDIR)/libSlsDetector.a $(DESTDIR)/libSlsDetector.so: $(OBJS) - $(CXX) -shared -Wl,-soname,libSlsDetector.so -o libSlsDetector.so $(OBJS) -lc $(INCLUDES) $(DFLAGS) $(FLAGS) $(EPICSFLAGS) -L/usr/lib64 -lpthread + $(CXX) -shared -Wl,-soname,libSlsDetector.so -o libSlsDetector.so $(OBJS) -lc $(INCLUDES) $(DFLAGS) $(FLAGS) $(EPICSFLAGS) -L/usr/lib64 -lpthread -lrt $(LIBZMQ) $(shell test -d $(DESTDIR) || mkdir -p $(DESTDIR)) mv libSlsDetector.so $(DESTDIR) diff --git a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp index f902e24d7..36a37143f 100644 --- a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp +++ b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp @@ -267,28 +267,30 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1) getNMods(); getMaxMods(); threadpool = 0; - if(createThreadPool() == FAIL) + zmqthreadpool = 0; + if(createThreadPool(&threadpool) == FAIL) exit(-1); } multiSlsDetector::~multiSlsDetector() { //removeSlsDetector(); - destroyThreadPool(); + destroyThreadPool(&threadpool); + destroyThreadPool(&zmqthreadpool); } -int multiSlsDetector::createThreadPool(){ - if(threadpool){ - threadpool->destroy_threadpool(); - threadpool=0; +int multiSlsDetector::createThreadPool(ThreadPool** t){ + if(*t){ + (ThreadPool*)(*t)->destroy_threadpool(); + *t=0; } if(thisMultiDetector->numberOfDetectors < 1){ cout << "No detectors attached to create threadpool" << endl; return OK; } - threadpool = new ThreadPool(thisMultiDetector->numberOfDetectors); - switch(threadpool->initialize_threadpool()){ + *t = new ThreadPool(thisMultiDetector->numberOfDetectors); + switch(((ThreadPool*)(*t))->initialize_threadpool()){ case 0: cerr << "Failed to initialize thread pool!" << endl; return FAIL; @@ -299,19 +301,19 @@ int multiSlsDetector::createThreadPool(){ break; default: #ifdef VERBOSE - cout << "Initialized Threadpool" << endl; + cout << "Initialized Threadpool " << *t << endl; #endif break; } return OK; } -void multiSlsDetector::destroyThreadPool(){ - if(threadpool){ - threadpool->destroy_threadpool(); - threadpool=0; +void multiSlsDetector::destroyThreadPool(ThreadPool** t){ + if(*t){ + (ThreadPool*)(*t)->destroy_threadpool(); + *t=0; #ifdef VERBOSE - cout<<"Destroyed Threadpool"<numberOfDetectors; @@ -1196,6 +1196,7 @@ slsDetectorDefs::detectorSettings multiSlsDetector::getSettings(int pos) { threadpool->add_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=posmin; idetadd_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=posmin; idetadd_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=posmin; idetmasterPosition) && (detectors[idet])){ @@ -3318,6 +3321,7 @@ char* multiSlsDetector::setNetworkParameter(networkParameter p, string s){ threadpool->add_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=0; idetnumberOfDetectors; idet++){ if(detectors[idet]){ @@ -3926,6 +3930,7 @@ int multiSlsDetector::executeTrimming(trimMode mode, int par1, int par2, int imo threadpool->add_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=0; idetnumberOfDetectors; idet++){ if(detectors[idet]){ @@ -3977,6 +3982,7 @@ int multiSlsDetector::loadSettingsFile(string fname, int imod) { threadpool->add_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=0; idetnumberOfDetectors; idet++){ if(detectors[idet]){ @@ -4051,6 +4057,7 @@ int multiSlsDetector::setAllTrimbits(int val, int imod){ threadpool->add_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=0; idetnumberOfDetectors; idet++){ if(detectors[idet]){ @@ -4100,6 +4107,7 @@ int multiSlsDetector::loadCalibrationFile(string fname, int imod) { threadpool->add_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=0; idetnumberOfDetectors; idet++){ if(detectors[idet]){ @@ -4752,6 +4760,7 @@ int multiSlsDetector::startReceiver(){ threadpool->add_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=posmin; idetmasterPosition) && (detectors[idet])){ @@ -4813,6 +4822,7 @@ int multiSlsDetector::stopReceiver(){ threadpool->add_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=posmin; idetmasterPosition) && (detectors[idet])){ @@ -4948,7 +4958,173 @@ int multiSlsDetector::resetFramesCaught() { -int* multiSlsDetector::readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex){ +void multiSlsDetector::readFrameFromReceiver(){ + int value; + if(createThreadPool(&zmqthreadpool) == FAIL){ + cprintf(BG_RED,"Error: Could not create the zmq threads\n"); + return; + } + zmqthreadpool->setzeromqThread(); + + //determine number of half readouts and maxX and maxY + int maxX=0,maxY=0; + int numReadout = 1; + if(getDetectorsType() == EIGER){ + numReadout = 2; + maxX = thisMultiDetector->numberOfChannel[X]; + maxY = thisMultiDetector->numberOfChannel[Y]; + } + + //Note:num threads correspond to num detectors as task calls each slsdet + //(eiger udp ports/half readouts will have to do it serially) + + //start all socket tasks + volatile uint64_t runningMask = 0x0; + int slsdatabytes = 0, slsmaxchannels = 0, bytesperchannel = 0, slsmaxX = 0, slsmaxY=0; + if(!zmqthreadpool){ + cout << "Error in creating threadpool. Exiting" << endl; + return; + }else{ + for(int idet=0; idetnumberOfDetectors; idet++){ + if(detectors[idet]){ + sem_init(&sem_slswait[idet*numReadout],1,0); + sem_init(&sem_slsdone[idet*numReadout],1,0); + if(numReadout>1){ + sem_init(&sem_slswait[idet*numReadout+1],1,0); + sem_init(&sem_slsdone[idet*numReadout+1],1,0); + } + Task* task = new Task(new func00_t(&slsDetector::readFrameFromReceiver,detectors[idet])); + zmqthreadpool->add_task(task); + if(!slsdatabytes){ + slsdatabytes = detectors[idet]->getDataBytes(); + slsmaxchannels = detectors[idet]->getMaxNumberOfChannels(); + bytesperchannel = slsdatabytes/slsmaxchannels; + slsmaxX = detectors[idet]->getTotalNumberOfChannels(X); + slsmaxY = detectors[idet]->getTotalNumberOfChannels(Y); + } + //set mask + runningMask|=(1<<(idet*numReadout)); + if(numReadout>1) + runningMask|=(1<<(idet*numReadout+1)); + + } + } + } + + zmqthreadpool->startExecuting();//tell them to start + + + int nel=(thisMultiDetector->dataBytes)/sizeof(int); + + if(nel <= 0){ + cout << "Multislsdetector databytes not valid :" << thisMultiDetector->dataBytes << endl; + return; + } + int* multiframe=new int[nel]; + int* p = multiframe; + int idet,offsetY,offsetX; + int halfreadoutoffset = (slsmaxX/numReadout); + //after reconstruction + int framecount=0; + int nx =getTotalNumberOfChannels(slsDetectorDefs::X); + int ny =getTotalNumberOfChannels(slsDetectorDefs::Y); + + + + while(true){ + memset(((char*)multiframe),0x0,slsdatabytes*thisMultiDetector->numberOfDetectors); + + for(int ireadout=0; ireadoutnumberOfDetectors*numReadout; ++ireadout){ + idet = ireadout/numReadout; + + if(detectors[idet]){ + if((1 << ireadout) & runningMask){ + + + sem_post(&sem_slswait[ireadout]); //sls to continue + sem_wait(&sem_slsdone[ireadout]); //wait for sls to copy + + //this socket closed + if(slsframe[ireadout] == NULL){ + runningMask^=(1<offsetY[idet] + slsmaxY)) * maxX * bytesperchannel; + if(!(ireadout%numReadout)) offsetX = thisMultiDetector->offsetX[idet]; + else offsetX = thisMultiDetector->offsetX[idet] + halfreadoutoffset; + offsetX *= bytesperchannel; + cprintf(BLUE,"offsetx:%d offsety:%d maxx:%d slsmaxX:%d slsmaxY:%d bytesperchannel:%d\n", + offsetX,offsetY,maxX,slsmaxX,slsmaxY,bytesperchannel); + + cprintf(BLUE,"copying bytes:%d\n", (slsmaxX/numReadout)*bytesperchannel); + //itnerleaving with other detectors + for(int i=0;iwait_for_tasks_to_complete(); + destroyThreadPool(&zmqthreadpool); + delete[] multiframe; + + + + + + + + +/* int nel=(thisMultiDetector->dataBytes)/sizeof(int); if(nel <= 0){ cout << "Multislsdetector databytes not valid :" << thisMultiDetector->dataBytes << endl; @@ -5039,6 +5215,7 @@ int* multiSlsDetector::readFrameFromReceiver(char* fName, int &acquisitionIndex if((getDetectorsType() == EIGER) &&(complete ==FAIL)) acquisitionIndex = -1; return retval; + */ }; @@ -5468,6 +5645,7 @@ int multiSlsDetector::pulsePixel(int n,int x,int y) { threadpool->add_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=0; idetnumberOfDetectors; idet++){ if(detectors[idet]){ @@ -5504,6 +5682,7 @@ int multiSlsDetector::pulsePixelNMove(int n,int x,int y) { threadpool->add_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=0; idetnumberOfDetectors; idet++){ if(detectors[idet]){ @@ -5540,6 +5719,7 @@ int multiSlsDetector::pulseChip(int n) { threadpool->add_task(task); } } + threadpool->startExecuting(); threadpool->wait_for_tasks_to_complete(); for(int idet=0; idetnumberOfDetectors; idet++){ if(detectors[idet]){ diff --git a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.h b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.h index 07c4fe839..2c72b848a 100644 --- a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.h +++ b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.h @@ -245,10 +245,10 @@ class multiSlsDetector : public slsDetectorUtils { * Creates all the threads in the threadpool \returns OK or FAIL */ - int createThreadPool(); + int createThreadPool(ThreadPool** t); /** destroys all the threads in the threadpool */ - void destroyThreadPool(); + void destroyThreadPool(ThreadPool** t); /** frees the shared memory occpied by the sharedMultiSlsDetector structure */ int freeSharedMemory() ; @@ -1180,14 +1180,9 @@ class multiSlsDetector : public slsDetectorUtils { int resetFramesCaught(); /** - * Reads a frame from receiver - * @param fName file name of current frame() - * @param acquisitionIndex current acquisition index - * @param frameIndex current frame index (for each scan) - * @param subFrameIndex current sub frame index (for 32 bit mode for eiger) - /returns a frame read from recever + * Reads frames from receiver through a constant socket */ - int* readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex); + void readFrameFromReceiver(); /** Locks/Unlocks the connection to the receiver /param lock sets (1), usets (0), gets (-1) the lock @@ -1378,7 +1373,7 @@ class multiSlsDetector : public slsDetectorUtils { private: ThreadPool* threadpool; - + ThreadPool* zmqthreadpool; }; diff --git a/slsDetectorSoftware/slsDetector/slsDetector.cpp b/slsDetectorSoftware/slsDetector/slsDetector.cpp index 7565aeaf8..892c6a3a5 100644 --- a/slsDetectorSoftware/slsDetector/slsDetector.cpp +++ b/slsDetectorSoftware/slsDetector/slsDetector.cpp @@ -10,7 +10,7 @@ #include #include #include "gitInfoLib.h" - +#include int slsDetector::initSharedMemory(detectorType type, int id) { @@ -7145,58 +7145,125 @@ int slsDetector::resetFramesCaught(){ -int* slsDetector::readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex){ - int fnum=F_READ_RECEIVER_FRAME; - int nel=thisDetector->dataBytes/sizeof(int); - int* retval=new int[nel]; - int ret=FAIL; - int n; - char mess[MAX_STR_LENGTH]="Nothing"; +void slsDetector::readFrameFromReceiver(){ - if (setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG) { -#ifdef VERBOSE - std::cout<< "slsDetector: Reading frame from receiver "<< thisDetector->dataBytes << " " <SendDataOnly(&fnum,sizeof(fnum)); - dataSocket->ReceiveDataOnly(&ret,sizeof(ret)); + //determine number of half readouts + int numReadout = 1; + if(thisDetector->myDetectorType == EIGER) numReadout = 2; + int readoutId = detId*numReadout; + volatile uint64_t runningMask = 0x0; - if (ret==FAIL) { - n= dataSocket->ReceiveDataOnly(mess,sizeof(mess)); - std::cout<< "Detector returned: " << mess << " " << n << std::endl; - delete [] retval; - disconnectData(); - return NULL; - } else { - n=dataSocket->ReceiveDataOnly(fName,MAX_STR_LENGTH); - n=dataSocket->ReceiveDataOnly(&acquisitionIndex,sizeof(acquisitionIndex)); - n=dataSocket->ReceiveDataOnly(&frameIndex,sizeof(frameIndex)); - if(thisDetector->myDetectorType == EIGER) - n=dataSocket->ReceiveDataOnly(&subFrameIndex,sizeof(subFrameIndex)); - n=dataSocket->ReceiveDataOnly(retval,thisDetector->dataBytes); + //server details + char hostname[numReadout][100]; + int portno[numReadout]; + int nel=(thisDetector->dataBytes/numReadout)/sizeof(int); + for(int i=0;idataBytes) { - std::cout<dataBytes << std::endl; - ret=FAIL; - delete [] retval; - disconnectData(); - return NULL; - } - - //jungfrau masking adcval - if(thisDetector->myDetectorType == JUNGFRAU){ - for(unsigned int i=0;islsframe[readoutId+i]=new int[nel]; } - return retval; + + + //loop though the half readouts to start sockets + void *context[numReadout]; + void *zmqsocket[numReadout]; + for(int i=0;isem_slswait[readoutId+idet]);//wait for it to be copied + + + //update indices + if(!idet) framecount++; //count only once + + // receive a message, this is a blocking function + len = zmq_msg_init (&message); /* is this required? Xiaoqiang didnt have it*/ + if(len) {cprintf(RED,"Failed to initialize message %d for %d\n",len,readoutId+idet); continue; }//error + len = zmq_msg_recv(&message, zmqsocket[idet], 0); + + //int size = zmq_msg_size (&message); + if (len <= 3 ) { + if(!len) cprintf(RED,"Received no data in socket for %d\n", readoutId+idet); + cout<slsframe[readoutId+idet] = NULL; + sem_post(&parentDet->sem_slsdone[readoutId+idet]);//let multi know is ready + + runningMask^=(1<dataBytes/numReadout){//hoow to solve this + memcpy((char*)(parentDet->slsframe[readoutId+idet]),(char*)zmq_msg_data(&message),thisDetector->dataBytes/numReadout); + //memcpy((char*)(parentDet->slsframe[readoutId+idet]),zmq_msg_data(&message[idet]),thisDetector->dataBytes); + //check header, if incorrect frame, copy somewhere and assign a blank subframe + //parentDet->slsframe[readoutId+idet] = (int*)zmq_msg_data(&message[idet]); + + //jungfrau masking adcval + if(thisDetector->myDetectorType == JUNGFRAU){ + for(unsigned int i=0;islsframe[readoutId+idet][i] = (parentDet->slsframe[readoutId+idet][i] & 0x3FFF3FFF); + } + } + //} + sem_post(&parentDet->sem_slsdone[readoutId+idet]);//let multi know is ready + + } + }//end of for loop + + if(!runningMask){ + break; + } + + } + zmq_msg_close(&message); + + + + //close socket + for(int i=0;islsframe[readoutId+i]; + } + }; diff --git a/slsDetectorSoftware/slsDetector/slsDetector.h b/slsDetectorSoftware/slsDetector/slsDetector.h index 673729b22..54d6b27e5 100644 --- a/slsDetectorSoftware/slsDetector/slsDetector.h +++ b/slsDetectorSoftware/slsDetector/slsDetector.h @@ -1569,14 +1569,9 @@ class slsDetector : public slsDetectorUtils, public energyConversion { int resetFramesCaught(); /** - * Reads a frame from receiver - * @param fName file name of current frame() - * @param acquisitionIndex current acquisition index - * @param frameIndex current frame index (for each scan) - * @param subFrameIndex current sub frame index (for 32 bit mode for eiger) - /returns a frame read from recever + * Reads frames from receiver through a constant socket */ - int* readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex); + void readFrameFromReceiver(); /** Locks/Unlocks the connection to the receiver /param lock sets (1), usets (0), gets (-1) the lock diff --git a/slsDetectorSoftware/slsDetector/slsDetectorBase.h b/slsDetectorSoftware/slsDetector/slsDetectorBase.h index 62eb83006..9ccf313e5 100644 --- a/slsDetectorSoftware/slsDetector/slsDetectorBase.h +++ b/slsDetectorSoftware/slsDetector/slsDetectorBase.h @@ -507,14 +507,9 @@ class slsDetectorBase : public virtual slsDetectorDefs, public virtual errorDef /** - * Reads a frame from receiver - * @param fName file name of current frame() - * @param acquisitionIndex current acquisition index - * @param frameIndex current frame index (for each scan) - * @param subFrameIndex current sub frame index (for 32 bit mode for eiger) - /returns a frame read from recever + * Reads frames from receiver through a constant socket */ - virtual int* readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex)=0; + virtual void readFrameFromReceiver()=0; /** Sets the read receiver frequency diff --git a/slsDetectorSoftware/slsDetector/slsDetectorUtils.cpp b/slsDetectorSoftware/slsDetector/slsDetectorUtils.cpp index d390c8398..a4e3030ff 100644 --- a/slsDetectorSoftware/slsDetector/slsDetectorUtils.cpp +++ b/slsDetectorSoftware/slsDetector/slsDetectorUtils.cpp @@ -72,10 +72,6 @@ int slsDetectorUtils::acquire(int delflag){ int multiframe = nc*nf; - pthread_mutex_lock(&mg); - acquiringDone = 0; - pthread_mutex_unlock(&mg); - // setTotalProgress(); //moved these 2 here for measurement change progressIndex=0; @@ -162,7 +158,6 @@ int slsDetectorUtils::acquire(int delflag){ if (*threadedProcessing) { - sem_init(&sem_queue,0,0); startThread(delflag); } #ifdef VERBOSE @@ -341,48 +336,13 @@ int slsDetectorUtils::acquire(int delflag){ //offline if(setReceiverOnline()==OFFLINE_FLAG){ - // wait until data processing thread has finished the data - pthread_mutex_lock(&mg); - acquiringDone = 1; - pthread_mutex_unlock(&mg); - if (*threadedProcessing) { - sem_wait(&sem_queue); - pthread_mutex_lock(&mg); - acquiringDone = 0; - pthread_mutex_unlock(&mg); + if ((getDetectorsType()==GOTTHARD) || (getDetectorsType()==MOENCH) || (getDetectorsType()==JUNGFRAU) ){ + if((*correctionMask)&(1< #include #include - +#include using namespace std; @@ -641,14 +641,9 @@ virtual int getReceiverCurrentFrameIndex()=0; virtual int resetFramesCaught()=0; /** - * Reads a frame from receiver - * @param fName file name of current frame() - * @param acquisitionIndex current acquisition index - * @param frameIndex current frame index (for each scan) - * @param subFrameIndex current sub frame index (for 32 bit mode for eiger) - /returns a frame read from recever + * Reads frames from receiver through a constant socket */ -virtual int* readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex)=0; +virtual void readFrameFromReceiver()=0; /** @@ -850,6 +845,14 @@ virtual int setReceiverFifoDepth(int i = -1)=0; int (*progress_call)(double,void*); void *pProgressCallArg; + public: + //data call back + //individual sls and multi + sem_t sem_slsdone[MAXDET]; + sem_t sem_slswait[MAXDET]; + int* slsframe[MAXDET]; + + }; diff --git a/slsDetectorSoftware/slsDetectorAnalysis/postProcessing.cpp b/slsDetectorSoftware/slsDetectorAnalysis/postProcessing.cpp index 9fcb6c9c0..60db31654 100644 --- a/slsDetectorSoftware/slsDetectorAnalysis/postProcessing.cpp +++ b/slsDetectorSoftware/slsDetectorAnalysis/postProcessing.cpp @@ -7,6 +7,7 @@ #include "usersFunctions.h" #endif + //#define VERBOSE static void* startProcessData(void *n){ @@ -461,12 +462,7 @@ void* postProcessing::processData(int delflag) { } - /** IF detector acquisition is done, let the acquire() thread know to finish up and force join thread */ - if(acquiringDone){ - sem_post(&sem_queue); - // cout << "Sem posted" << endl; - } //else - // cout << "Sem not posted" << endl; + /* IF THERE ARE NO DATA look if acquisition is finished */ if (checkJoinThread()) { if (dataQueueSize()==0) { @@ -488,6 +484,14 @@ void* postProcessing::processData(int delflag) { else{ + + readFrameFromReceiver(); + + + + + +/* int progress = 0; char currentfName[MAX_STR_LENGTH]=""; int caught = -1; @@ -663,6 +667,8 @@ void* postProcessing::processData(int delflag) { } } +*/ + } return 0; diff --git a/slsDetectorSoftware/slsDetectorAnalysis/postProcessing.h b/slsDetectorSoftware/slsDetectorAnalysis/postProcessing.h index 90f00085a..546457fa1 100644 --- a/slsDetectorSoftware/slsDetectorAnalysis/postProcessing.h +++ b/slsDetectorSoftware/slsDetectorAnalysis/postProcessing.h @@ -17,7 +17,7 @@ #include #include #include -#include + class postProcessingFuncs; @@ -300,11 +300,7 @@ s /** data queue size */ int queuesize; - /** queue mutex */ - sem_t sem_queue; - /** set when detector finishes acquiring */ - int acquiringDone; /** @@ -333,18 +329,23 @@ s - private: double *fdata; - int (*dataReady)(detectorData*,int, int,void*); void *pCallbackArg; + detectorData *thisData; + private: + // double *fdata; + + // int (*dataReady)(detectorData*,int, int,void*); + // void *pCallbackArg; + int (*rawDataReady)(double*,int,void*); void *pRawDataArg; postProcessingFuncs *ppFun; - detectorData *thisData; + //detectorData *thisData; double *ang; @@ -374,4 +375,5 @@ s + #endif diff --git a/slsDetectorSoftware/threadFiles/Task.h b/slsDetectorSoftware/threadFiles/Task.h index e7f8f12f8..785298d46 100644 --- a/slsDetectorSoftware/threadFiles/Task.h +++ b/slsDetectorSoftware/threadFiles/Task.h @@ -16,6 +16,18 @@ using namespace std; class slsDetector; +template +class func00_t{ +public: + func00_t(_Ret (_Class::*fn)(),_Class* ptr): + m_fn(fn),m_ptr(ptr){} + ~func00_t() {} + void operator()() const {((m_ptr->*m_fn)());} +private: + _Class* m_ptr; + _Ret (_Class::*m_fn)(); +}; + template class func0_t{ public: @@ -94,27 +106,32 @@ private: class Task: public virtual slsDetectorDefs{ public: /* Return: int, Param: int */ - Task(func1_t * t): m1(t),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){}; + Task(func1_t * t): m1(t),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0),m12(0){}; /* Return: int, Param: string,int */ - Task(func2_t * t): m1(0),m2(t),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){}; + Task(func2_t * t): m1(0),m2(t),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0),m12(0){}; /* Return: string, Param: string */ - Task(func1_t * t): m1(0),m2(0),m3(t),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){}; + Task(func1_t * t): m1(0),m2(0),m3(t),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0),m12(0){}; /* Return: char*, Param: char* */ - Task(func1_t * t): m1(0),m2(0),m3(0),m4(t),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){}; + Task(func1_t * t): m1(0),m2(0),m3(0),m4(t),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0),m12(0){}; /* Return: detectorSettings, Param: int */ - Task(func1_t * t): m1(0),m2(0),m3(0),m4(0),m5(t),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0){}; + Task(func1_t * t): m1(0),m2(0),m3(0),m4(0),m5(t),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0),m12(0){}; /* Return: detectorSettings, Param: detectorSettings,int */ - Task(func2_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(t),m7(0),m8(0),m9(0),m10(0),m11(0){}; + Task(func2_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(t),m7(0),m8(0),m9(0),m10(0),m11(0),m12(0){}; /* Return: int, Param: int,int */ - Task(func2_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(t),m8(0),m9(0),m10(0),m11(0){}; + Task(func2_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(t),m8(0),m9(0),m10(0),m11(0),m12(0){}; /* Return: int, Param: int,int */ - Task(func3_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(t),m9(0),m10(0),m11(0){}; + Task(func3_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(t),m9(0),m10(0),m11(0),m12(0){}; /* Return: int, Param: trimMode,int,int,int */ - Task(func4_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(t),m10(0),m11(0){}; - /* Return: int, Param: int */ - Task(func0_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(t),m11(0){}; + Task(func4_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(t),m10(0),m11(0),m12(0){}; + /* Return: int, Param: none */ + Task(func0_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(t),m11(0),m12(0){}; /* Return: char*, Param: networkParameter,string,string */ - Task(func2_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(t){}; + Task(func2_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(t),m12(0){}; + /* Return: void, Param: none */ + Task(func00_t * t): m1(0),m2(0),m3(0),m4(0),m5(0),m6(0),m7(0),m8(0),m9(0),m10(0),m11(0),m12(t){}; + + + ~Task(){} void operator()(){ @@ -129,6 +146,7 @@ public: else if(m9) (*m9)(); else if(m10) (*m10)(); else if(m11) (*m11)(); + else if(m12) (*m12)(); } private: @@ -154,6 +172,8 @@ private: func0_t * m10; /* Return: char*, Param: networkParameter,string,string */ func2_t * m11; + /* Return: void, Param: none */ + func00_t * m12; }; diff --git a/slsDetectorSoftware/threadFiles/ThreadPool.cpp b/slsDetectorSoftware/threadFiles/ThreadPool.cpp index 223983a73..347a15e31 100644 --- a/slsDetectorSoftware/threadFiles/ThreadPool.cpp +++ b/slsDetectorSoftware/threadFiles/ThreadPool.cpp @@ -7,8 +7,10 @@ ThreadPool::ThreadPool(int pool_size) : m_pool_size(pool_size){ #endif m_tasks_loaded = false; thread_started = false; + zmqthreadpool = false; current_thread_number = -1; number_of_ongoing_tasks = 0; + number_of_total_tasks = 0; } ThreadPool::~ThreadPool(){ @@ -34,6 +36,7 @@ int ThreadPool::initialize_threadpool(){ m_pool_state = STARTED; int ret = -1; sem_init(&semStart,1,0); + sem_init(&semDone,1,0); for (int i = 0; i < m_pool_size; i++) { pthread_t tid; thread_started = false; @@ -68,12 +71,15 @@ int ThreadPool::destroy_threadpool(){ for (int i = 0; i < m_pool_size; i++) { void* result; sem_post(&semStart); + sem_post(&semDone); ret = pthread_join(m_threads[i], &result); /*cout << "pthread_join() returned " << ret << ": " << strerror(errno) << endl;*/ m_task_cond_var.broadcast(); // try waking up a bunch of threads that are still waiting } sem_destroy(&semStart); + sem_destroy(&semDone); number_of_ongoing_tasks = 0; + number_of_total_tasks = 0; /* cout << m_pool_size << " threads exited from the thread pool" << endl;*/ return 0; } @@ -109,25 +115,28 @@ void* ThreadPool::execute_thread(){ /*cout << "Unlocking: " << pthread_self() << endl;*/ m_task_mutex.unlock(); + //if(zmqthreadpool) cout<<"***"<run(arg); /*cout << ithread <<" Done executing thread " << pthread_self() << endl;*/ - m_all_tasks_mutex.lock(); + m_task_mutex.lock(); number_of_ongoing_tasks--; - m_all_tasks_mutex.unlock(); + m_task_mutex.unlock(); + //if(zmqthreadpool) cout< m_tasks; volatile int m_pool_state; - Mutex m_all_tasks_mutex; - CondVar m_all_tasks_cond_var; bool m_tasks_loaded; bool thread_started; int current_thread_number; //volatile uint64_t tasks_done_mask; volatile int number_of_ongoing_tasks; + volatile int number_of_total_tasks; sem_t semStart; - + sem_t semDone; + bool zmqthreadpool; };