From 1da4b07e73c7b3ef4de90f3c08ebede95e909f8a Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Thu, 15 Sep 2016 17:15:55 +0200 Subject: [PATCH] works, need to do json header and send dataready --- .../multiSlsDetector/multiSlsDetector.cpp | 342 ++++++++++++------ .../multiSlsDetector/multiSlsDetector.h | 27 +- .../slsDetector/slsDetector.cpp | 103 ------ slsDetectorSoftware/slsDetector/slsDetector.h | 2 +- .../slsDetector/slsDetectorUtils.cpp | 1 + .../slsDetector/slsDetectorUtils.h | 8 - .../slsDetectorAnalysis/postProcessing.cpp | 182 +--------- 7 files changed, 247 insertions(+), 418 deletions(-) diff --git a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp index eb902d955..c33b5767d 100644 --- a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp +++ b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp @@ -23,6 +23,7 @@ ID: $Id$ #include #include #include +#include using namespace std; @@ -267,30 +268,28 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1) getNMods(); getMaxMods(); threadpool = 0; - zmqthreadpool = 0; - if(createThreadPool(&threadpool) == FAIL) + if(createThreadPool() == FAIL) exit(-1); } multiSlsDetector::~multiSlsDetector() { //removeSlsDetector(); - destroyThreadPool(&threadpool); - destroyThreadPool(&zmqthreadpool); + destroyThreadPool(); } -int multiSlsDetector::createThreadPool(ThreadPool** t){ - if(*t){ - (ThreadPool*)(*t)->destroy_threadpool(); - *t=0; +int multiSlsDetector::createThreadPool(){ + if(threadpool){ + threadpool->destroy_threadpool(); + threadpool=0; } if(thisMultiDetector->numberOfDetectors < 1){ cout << "No detectors attached to create threadpool" << endl; return OK; } - *t = new ThreadPool(thisMultiDetector->numberOfDetectors); - switch(((ThreadPool*)(*t))->initialize_threadpool()){ + threadpool = new ThreadPool(thisMultiDetector->numberOfDetectors); + switch(threadpool->initialize_threadpool()){ case 0: cerr << "Failed to initialize thread pool!" << endl; return FAIL; @@ -301,19 +300,19 @@ int multiSlsDetector::createThreadPool(ThreadPool** t){ break; default: #ifdef VERBOSE - cout << "Initialized Threadpool " << *t << endl; + cout << "Initialized Threadpool " << threadpool << endl; #endif break; } return OK; } -void multiSlsDetector::destroyThreadPool(ThreadPool** t){ - if(*t){ - (ThreadPool*)(*t)->destroy_threadpool(); - *t=0; +void multiSlsDetector::destroyThreadPool(){ + if(threadpool){ + threadpool->destroy_threadpool(); + threadpool=0; #ifdef VERBOSE - cout<<"Destroyed Threadpool "<< *t << endl; + cout<<"Destroyed Threadpool "<< threadpool << endl; #endif } } @@ -394,7 +393,7 @@ int multiSlsDetector::addSlsDetector(int id, int pos) { //set offsets updateOffsets(); - if(createThreadPool(&threadpool) == FAIL) + if(createThreadPool() == FAIL) exit(-1); @@ -865,7 +864,7 @@ int multiSlsDetector::removeSlsDetector(int pos) { } updateOffsets(); - if(createThreadPool(&threadpool) == FAIL) + if(createThreadPool() == FAIL) exit(-1); return thisMultiDetector->numberOfDetectors; @@ -4957,17 +4956,113 @@ int multiSlsDetector::resetFramesCaught() { } +void* multiSlsDetector::startReceivingDataThread(void* this_pointer){ + ((multiSlsDetector*)this_pointer)->startReceivingData(); + return this_pointer; +} + + +void multiSlsDetector::startReceivingData(){ + + int ithread = currentThreadIndex; //set current thread value index + threadStarted = true; //let calling function know thread started and obtained current + + + int numReadoutPerDetector = 1; + bool jungfrau = false; + if(getDetectorsType() == EIGER){ + numReadoutPerDetector = 2; + }else if(getDetectorsType() == JUNGFRAU) + jungfrau = true; + + //server details + char hostname[100]; + int portno; + int singleDatabytes = detectors[ithread/numReadoutPerDetector]->getDataBytes(); + int nel=(singleDatabytes/numReadoutPerDetector)/sizeof(int); + portno = DEFAULT_ZMQ_PORTNO + (ithread); + sprintf(hostname, "%s%d", "tcp://127.0.0.1:",portno); + //cout << "ZMQ Client of " << ithread << " at " << hostname << endl; + singleframe[ithread]=new int[nel]; + + + //loop though the half readouts to start sockets + void *context; + void *zmqsocket; + context = zmq_ctx_new(); + zmqsocket = zmq_socket(context, ZMQ_PULL); + //zmq_setsockopt(zmqsocket, ZMQ_SUBSCRIBE, "", 0); // an empty string implies receiving any messages + zmq_connect(zmqsocket, hostname); // connect to publisher,the publisher server does not have to be started + pthread_mutex_lock(&ms); + receivingDataThreadMask|=(1<<(ithread)); + //cout<setzeromqThread(); //for debugging //determine number of half readouts and maxX and maxY int maxX=0,maxY=0; @@ -4980,125 +5075,131 @@ void multiSlsDetector::readFrameFromReceiver(){ int numReadouts = numReadoutPerDetector * thisMultiDetector->numberOfDetectors; - - //start all socket tasks - volatile uint64_t runningMask = 0x0; - int slsdatabytes = 0, slsmaxchannels = 0, bytesperchannel = 0, slsmaxX = 0, slsmaxY=0; - if(!zmqthreadpool){ - cout << "Error in creating threadpool. Exiting" << endl; - return; - }else{ - for(int idet=0; idetnumberOfDetectors; idet++){ - if(detectors[idet]){ - sem_init(&sem_slswait[idet*numReadoutPerDetector],1,0); - sem_init(&sem_slsdone[idet*numReadoutPerDetector],1,0); - sem_init(&sem_multiwait[idet*numReadoutPerDetector],1,0); - if(numReadoutPerDetector>1){ - sem_init(&sem_slswait[idet*numReadoutPerDetector+1],1,0); - sem_init(&sem_slsdone[idet*numReadoutPerDetector+1],1,0); - sem_init(&sem_multiwait[idet*numReadoutPerDetector+1],1,0); - } - Task* task = new Task(new func00_t(&slsDetector::readFrameFromReceiver,detectors[idet])); - zmqthreadpool->add_task(task); - if(!slsdatabytes){ - slsdatabytes = detectors[idet]->getDataBytes(); - slsmaxchannels = detectors[idet]->getMaxNumberOfChannels(); - bytesperchannel = slsdatabytes/slsmaxchannels; - slsmaxX = detectors[idet]->getTotalNumberOfChannels(X); - slsmaxY = detectors[idet]->getTotalNumberOfChannels(Y); - } - //set mask - runningMask|=(1<<(idet*numReadoutPerDetector)); - if(numReadoutPerDetector>1) - runningMask|=(1<<(idet*numReadoutPerDetector+1)); - - } + //create threads + /** Data Callback Threads */ + pthread_t receivingDataThreads[numReadouts]; + volatile uint64_t expectedMask = 0x0; + receivingDataThreadMask = 0x0; + currentThreadIndex = -1; + for(int i = 0; i < numReadouts; ++i){ + threadStarted = false; + currentThreadIndex = i; + sem_init(&sem_singlewait[i],1,0); + sem_init(&sem_singledone[i],1,0); + if(pthread_create(&receivingDataThreads[i], NULL,startReceivingDataThread, (void*) this)){ + cprintf(RED, "ERROR: Could not create receiving thread with index %d\n",i); + return; } + while(!threadStarted); + //cout << "Data Thread created successfully for " << i << endl; + expectedMask|=(1<startExecuting(); //tell them to start - for(int i=0;igetDataBytes(); + slsmaxchannels = detectors[0]->getMaxNumberOfChannels(); + bytesperchannel = slsdatabytes/slsmaxchannels; + slsmaxX = detectors[0]->getTotalNumberOfChannels(X); + slsmaxY = detectors[0]->getTotalNumberOfChannels(Y); + } int nel=(thisMultiDetector->dataBytes)/sizeof(int); if(nel <= 0){ - cout << "Multislsdetector databytes not valid :" << thisMultiDetector->dataBytes << endl; + cprintf(RED,"Error: Multislsdetector databytes not valid : %d\n", thisMultiDetector->dataBytes); return; } int* multiframe=new int[nel]; int* p = multiframe; int idet,offsetY,offsetX; int halfreadoutoffset = (slsmaxX/numReadoutPerDetector); - //after reconstruction int framecount=0; int nx =getTotalNumberOfChannels(slsDetectorDefs::X); int ny =getTotalNumberOfChannels(slsDetectorDefs::Y); + + + sem_post(&dataThreadStartedSemaphore); //let utils:acquire continue to start measurement/acquisition + //cprintf(BLUE,"all sockets created\n"); + + //construct complete image and send to callback while(true){ memset(((char*)multiframe),0x0,slsdatabytes*thisMultiDetector->numberOfDetectors); //reset frame memory + + //post all of them to start + for(int ireadout=0; ireadoutoffsetY[idet] + slsmaxY)) * maxX * bytesperchannel; + //the left half or right half + if(!(ireadout%numReadoutPerDetector)) + offsetX = thisMultiDetector->offsetX[idet]; + else + offsetX = thisMultiDetector->offsetX[idet] + halfreadoutoffset; + offsetX *= bytesperchannel; + //cprintf(BLUE,"offsetx:%d offsety:%d maxx:%d slsmaxX:%d slsmaxY:%d bytesperchannel:%d\n", + // offsetX,offsetY,maxX,slsmaxX,slsmaxY,bytesperchannel); + // cprintf(BLUE,"copying bytes:%d\n", (slsmaxX/numReadoutPerDetector)*bytesperchannel); + //itnerleaving with other detectors + + //bottom + if(((idet+1)%2) == 0){ + for(int i=0;ioffsetY[idet] + slsmaxY)) * maxX * bytesperchannel; - //the left half or right half - if(!(ireadout%numReadoutPerDetector)) - offsetX = thisMultiDetector->offsetX[idet]; - else - offsetX = thisMultiDetector->offsetX[idet] + halfreadoutoffset; - offsetX *= bytesperchannel; - //cprintf(BLUE,"offsetx:%d offsety:%d maxx:%d slsmaxX:%d slsmaxY:%d bytesperchannel:%d\n", - // offsetX,offsetY,maxX,slsmaxX,slsmaxY,bytesperchannel); - //cprintf(BLUE,"copying bytes:%d\n", (slsmaxX/numReadout)*bytesperchannel); - //itnerleaving with other detectors - - //bottom - if(((idet+1)%2) == 0){ - for(int i=0;iwait_for_tasks_to_complete(); - for(int i=0;i #include #include "gitInfoLib.h" -#include int slsDetector::initSharedMemory(detectorType type, int id) { @@ -7145,108 +7144,6 @@ int slsDetector::resetFramesCaught(){ -void slsDetector::readFrameFromReceiver(){ - - //determine number of half readouts - int numReadout = 1; - if(thisDetector->myDetectorType == EIGER) numReadout = 2; - int readoutId = detId*numReadout; - volatile uint64_t runningMask = 0x0; - - //server details - char hostname[numReadout][100]; - int portno[numReadout]; - int nel=(thisDetector->dataBytes/numReadout)/sizeof(int); - for(int i=0;islsframe[readoutId+i]=new int[nel]; - } - - - //loop though the half readouts to start sockets - void *context[numReadout]; - void *zmqsocket[numReadout]; - for(int i=0;isem_multiwait[readoutId+i]); //let multi know socket created - } - - //receive msgs and let multi know - zmq_msg_t message; - int len,idet = 0; - int framecount=0; - - //read frame - while(true){ - for(int idet=0; idetsem_slswait[readoutId+idet]);//wait for it to be copied - - if(!idet) framecount++; //update indices, count only once - - // receive a message, this is a blocking function - len = zmq_msg_init (&message); /* is this required? Xiaoqiang didnt have it*/ - if(len) {cprintf(RED,"Failed to initialize message %d for %d\n",len,readoutId+idet); continue; }//error - len = zmq_msg_recv(&message, zmqsocket[idet], 0); - - //end of socket - if (len <= 3 ) { - if(!len) cprintf(RED,"Received no data in socket for %d\n", readoutId+idet); - //cout<slsframe[readoutId+idet] = NULL; - sem_post(&parentDet->sem_slsdone[readoutId+idet]); //let multi know is ready - runningMask^=(1<dataBytes/numReadout){//hoow to solve this - memcpy((char*)(parentDet->slsframe[readoutId+idet]),(char*)zmq_msg_data(&message),thisDetector->dataBytes/numReadout); - //check header, if incorrect frame, copy somewhere and assign a blank subframe and also check size - - //jungfrau masking adcval - if(thisDetector->myDetectorType == JUNGFRAU){ - for(unsigned int i=0;islsframe[readoutId+idet][i] = (parentDet->slsframe[readoutId+idet][i] & 0x3FFF3FFF); - } - } - //} - sem_post(&parentDet->sem_slsdone[readoutId+idet]);//let multi know is ready - } - }//end of for loop - - if(!runningMask){ - break; - } - } - zmq_msg_close(&message); - - //close socket - for(int i=0;islsframe[readoutId+i]; - } -} - - - - - int slsDetector::lockReceiver(int lock){ int fnum=F_LOCK_RECEIVER; diff --git a/slsDetectorSoftware/slsDetector/slsDetector.h b/slsDetectorSoftware/slsDetector/slsDetector.h index 7ae8579fc..58a5f7a61 100644 --- a/slsDetectorSoftware/slsDetector/slsDetector.h +++ b/slsDetectorSoftware/slsDetector/slsDetector.h @@ -1570,7 +1570,7 @@ class slsDetector : public slsDetectorUtils, public energyConversion { /** Reads frames from receiver through a constant socket */ - void readFrameFromReceiver(); + void readFrameFromReceiver(){}; /** Locks/Unlocks the connection to the receiver /param lock sets (1), usets (0), gets (-1) the lock diff --git a/slsDetectorSoftware/slsDetector/slsDetectorUtils.cpp b/slsDetectorSoftware/slsDetector/slsDetectorUtils.cpp index cc10d0c12..cfae1c279 100644 --- a/slsDetectorSoftware/slsDetector/slsDetectorUtils.cpp +++ b/slsDetectorSoftware/slsDetector/slsDetectorUtils.cpp @@ -350,6 +350,7 @@ int slsDetectorUtils::acquire(int delflag){ pthread_mutex_lock(&mg); stopReceiver(); pthread_mutex_unlock(&mg); + // cout<<"***********receiver stopped"< 0){ -#ifdef VERY_VERY_DEBUG - if(acquiringDone == 1) cout << "acquiring seems to be done" << endl; -#endif - - - //IF GUI, check for last frames (counter upto 5) - if(dataReady){ - pthread_mutex_lock(&mg); - acquiringDone++; - pthread_mutex_unlock(&mg); -#ifdef VERY_VERY_DEBUG - cout << "acquiringDone :" << acquiringDone << endl; -#endif - } - - - //post to stopReceiver in acquire(), but continue reading frames - if (!dataReady || (acquiringDone >= 5)){ - if(!dataReady || (!nthframe) ||(!newData)){ -#ifdef VERY_VERY_DEBUG - cout << "gonna post for it to end" << endl; -#endif - sem_post(&sem_queue); -#ifdef VERY_VERY_DEBUG - cout << "Sem posted" << endl; -#endif - } - } - } - //random reads and for nthframe, checks if there is no new data - else if((!nthframe) ||(!newData)){ - //cout <<"cecking now" << endl; - if (checkJoinThread()){ - break; - } - } - - - - - - //for random reads, ask only if it has new data - if(!newData){ - if(caught > progress){ - newData = true; - - // If new data and acquiringDone>0 (= det acq over), reset to get more frames - if(dataReady && (acquiringDone > 0)){ - pthread_mutex_lock(&mg); - acquiringDone = 1; -#ifdef VERY_VERY_DEBUG - cout << "Keeping acquiringDone at 1 " << endl; -#endif - pthread_mutex_unlock(&mg); - } - - } - } - - - - if(newData){ -#ifdef VERY_VERY_DEBUG - cout << "new data" << endl; -#endif - //no gui - if (!dataReady){ - progress = caught; -#ifdef VERY_VERY_DEBUG - cout << "progress:" << progress << endl; -#endif - newData = false; -#ifdef VERY_VERY_DEBUG - cout << "newData set to false" << endl; -#endif - } - //gui - else{ - if(setReceiverOnline()==ONLINE_FLAG){ - //get data - strcpy(currentfName,""); - pthread_mutex_lock(&mg); - //int* receiverData = new int [getTotalNumberOfChannels()]; - int* receiverData = readFrameFromReceiver(currentfName,currentAcquisitionIndex,currentFrameIndex,currentSubFrameIndex); - pthread_mutex_unlock(&mg); - - //if detector returned null - if(setReceiverOnline()==OFFLINE_FLAG) - receiverData = NULL; - - //no data or wrong data for print out - if(receiverData == NULL){ - currentAcquisitionIndex = -1; - cout<<"****Detector Data returned is NULL***"< progress){ -#ifdef VERY_VERY_DEBUG - cout << "GOT data" << endl; -#endif - fdata = decodeData(receiverData); - delete [] receiverData; - if ((fdata) && (dataReady)){ - // cout << "DATAREADY 3" << endl; - thisData = new detectorData(fdata,NULL,NULL,getCurrentProgress(),currentfName,nx,ny); - dataReady(thisData, currentFrameIndex, currentSubFrameIndex, pCallbackArg); - delete thisData; - fdata = NULL; - progress = caught; -#ifdef VERY_VERY_DEBUG - cout << "progress:" << progress << endl; -#endif - newData = false; -#ifdef VERY_VERY_DEBUG - cout << "newData set to false" << endl; -#endif - } - } - } - } - } - - } -*/ - } return 0;