somewhere, but weird threads

This commit is contained in:
Dhanya Maliakal 2016-10-05 15:26:58 +02:00
parent 39baeade37
commit 6da59ca382
8 changed files with 145 additions and 220 deletions

View File

@ -63,7 +63,7 @@ using namespace std;
#define RATE_CORRECTION_NO_TAU_PROVIDED 0x0000000001000000ULL #define RATE_CORRECTION_NO_TAU_PROVIDED 0x0000000001000000ULL
#define PROGRAMMING_ERROR 0x0000000002000000ULL #define PROGRAMMING_ERROR 0x0000000002000000ULL
#define RECEIVER_ACTIVATE 0x0000000004000000ULL #define RECEIVER_ACTIVATE 0x0000000004000000ULL
#define DATA_STREAMING_IN_RECEIVER 0x0000000008000000ULL #define DATA_STREAMING 0x0000000008000000ULL
// 0x00000000FFFFFFFFULL // 0x00000000FFFFFFFFULL
/** @short class returning all error messages for error mask */ /** @short class returning all error messages for error mask */
@ -207,8 +207,8 @@ public:
if(slsErrorMask&RECEIVER_ACTIVATE) if(slsErrorMask&RECEIVER_ACTIVATE)
retval.append("Could not activate/deactivate receiver\n"); retval.append("Could not activate/deactivate receiver\n");
if(slsErrorMask&DATA_STREAMING_IN_RECEIVER) if(slsErrorMask&DATA_STREAMING)
retval.append("Could not set/reset Data Streaming in Receiver\n"); retval.append("Could not set/reset Data Streaming\n");

View File

@ -4981,9 +4981,6 @@ int multiSlsDetector::createReceivingDataThreads(bool destroy){
//reset masks //reset masks
killAllReceivingDataThreads = false; killAllReceivingDataThreads = false;
pthread_mutex_lock(&ms);
receivingDataThreadMask = 0x0;
pthread_mutex_unlock(&(ms));
//destroy //destroy
if(destroy){ if(destroy){
@ -4992,13 +4989,10 @@ int multiSlsDetector::createReceivingDataThreads(bool destroy){
#endif #endif
killAllReceivingDataThreads = true; killAllReceivingDataThreads = true;
for(int i = 0; i < numReadouts; ++i){ for(int i = 0; i < numReadouts; ++i){
sem_post(&receivingDataSemaphore[i]); sem_post(&sem_singlewait[i]);
pthread_join(receivingDataThreads[i],NULL); pthread_join(receivingDataThreads[i],NULL);
sem_destroy(&receivingDataSemaphore[i]);
sem_destroy(&receivingDataSocketsCreatedSemaphore[i]);
sem_destroy(&sem_singlewait[i]); sem_destroy(&sem_singlewait[i]);
sem_destroy(&sem_singledone[i]); sem_destroy(&sem_singledone[i]);
delete [] singleframe[i];
#ifdef DEBUG #ifdef DEBUG
cout << "." << flush << endl; cout << "." << flush << endl;
#endif #endif
@ -5018,14 +5012,12 @@ int multiSlsDetector::createReceivingDataThreads(bool destroy){
currentThreadIndex = -1; currentThreadIndex = -1;
for(int i = 0; i < numReadouts; ++i){ for(int i = 0; i < numReadouts; ++i){
sem_init(&receivingDataSemaphore[i],1,0);
sem_init(&receivingDataSocketsCreatedSemaphore[i],1,0);
sem_init(&sem_singlewait[i],1,0); sem_init(&sem_singlewait[i],1,0);
sem_init(&sem_singledone[i],1,0); sem_init(&sem_singledone[i],1,0);
threadStarted = false; threadStarted = false;
currentThreadIndex = i; currentThreadIndex = i;
if(pthread_create(&receivingDataThreads[i], NULL,startReceivingDataThread, (void*) this)){ if(pthread_create(&receivingDataThreads[i], NULL,startReceivingDataThread, (void*) this)){
cout << "Could not create receiving data thread with index " << i << endl; cprintf(RED, "Could not create receiving data thread with index %d\n",i);
return FAIL; return FAIL;
} }
while(!threadStarted); while(!threadStarted);
@ -5033,39 +5025,13 @@ int multiSlsDetector::createReceivingDataThreads(bool destroy){
cout << "." << flush << endl; cout << "." << flush << endl;
#endif #endif
} }
//cout << "Receiving Data Thread(s) created" << endl; cout << "Receiving Data Thread(s) created" << endl;
for(int i=0;i<numReadouts;i++)
sem_wait(&receivingDataSocketsCreatedSemaphore[i]); //wait for the initial sockets created
cout << "Receiving Data Threads Ready" << endl;
sem_wait(&dataThreadStartedSemaphore); //wait for processing thread to be ready
cout << "Post Processing thread ready" << endl;
} }
return OK; return OK;
} }
int multiSlsDetector::startReceivingData(){
/**stopReceiving needed only if sockets will be terminated */
int numReadouts = thisMultiDetector->numberOfDetectors;
if(getDetectorsType() == EIGER)
numReadouts *= 2;
if(threadStarted){
for(int i=0;i<numReadouts;i++)
receivingDataThreadMask|=(1<<i);
for(int i=0;i<numReadouts;i++)
sem_post(&receivingDataSemaphore[i]);
for(int i=0;i<numReadouts;i++)
sem_wait(&receivingDataSocketsCreatedSemaphore[i]); //wait for the sockets created
cout << "Receiving data sockets created" << endl;
}else
return FAIL;
return OK;
}
void* multiSlsDetector::startReceivingDataThread(void* this_pointer){ void* multiSlsDetector::startReceivingDataThread(void* this_pointer){
((multiSlsDetector*)this_pointer)->startReceivingDataThread(); ((multiSlsDetector*)this_pointer)->startReceivingDataThread();
@ -5076,9 +5042,9 @@ void* multiSlsDetector::startReceivingDataThread(void* this_pointer){
void multiSlsDetector::startReceivingDataThread(){ void multiSlsDetector::startReceivingDataThread(){
int ithread = currentThreadIndex; //set current thread value index int ithread = currentThreadIndex; //set current thread value index
threadStarted = true; //let calling function know thread started and obtained current
//cout << ithread << " thread created" << endl; //cout << ithread << " thread created" << endl;
//number of readouts
int numReadoutPerDetector = 1; int numReadoutPerDetector = 1;
bool jungfrau = false; bool jungfrau = false;
if(getDetectorsType() == EIGER){ if(getDetectorsType() == EIGER){
@ -5093,29 +5059,32 @@ void multiSlsDetector::startReceivingDataThread(){
int nel=(singleDatabytes/numReadoutPerDetector)/sizeof(int); int nel=(singleDatabytes/numReadoutPerDetector)/sizeof(int);
portno = DEFAULT_ZMQ_PORTNO + (ithread); portno = DEFAULT_ZMQ_PORTNO + (ithread);
sprintf(hostname, "%s%d", "tcp://127.0.0.1:",portno); sprintf(hostname, "%s%d", "tcp://127.0.0.1:",portno);
//cout << "ZMQ Client of " << ithread << " at " << hostname << endl; cout << "ZMQ Client of " << ithread << " at " << hostname << endl;
singleframe[ithread]=new int[nel];
/* outer loop - loops once for each acquisition */
//infinite loop, exited only at the end of acquire()
while(true){
//socket details
zmq_msg_t message; zmq_msg_t message;
int len,idet = 0;
void *context; void *context;
void *zmqsocket; void *zmqsocket;
context = zmq_ctx_new(); context = zmq_ctx_new();
zmqsocket = zmq_socket(context, ZMQ_PULL); zmqsocket = zmq_socket(context, ZMQ_PULL);
zmq_connect(zmqsocket, hostname); zmq_connect(zmqsocket, hostname);
//cprintf(BLUE,"%d ZMQ Client Socket at %s\n",ithread, hostname); threadStarted = true; //let calling function know thread started and obtained current
sem_post(&receivingDataSocketsCreatedSemaphore[ithread]);
/* inner loop - loop for each buffer */ //initializations
//enters at receiver start and exits at receiver stop singleframe[ithread]=new int[nel];
while((1 << ithread) & receivingDataThreadMask){ int len,idet = 0;
//infinite loop, exited only (if gui restarted/ enabledatastreaming called)
while(true){
sem_wait(&sem_singlewait[ithread]); //wait for it to be copied sem_wait(&sem_singlewait[ithread]); //wait for it to be copied
//check to exit thread
if(killAllReceivingDataThreads){
delete [] singleframe[ithread];
break;
}
//scan header------------------------------------------------------------------- //scan header-------------------------------------------------------------------
zmq_msg_init (&message); zmq_msg_init (&message);
@ -5176,12 +5145,7 @@ void multiSlsDetector::startReceivingDataThread(){
//#endif //#endif
zmq_msg_close(&message); zmq_msg_close(&message);
singleframe[ithread] = NULL; singleframe[ithread] = NULL;
pthread_mutex_lock(&ms);
receivingDataThreadMask^=(1<<ithread);
pthread_mutex_unlock(&ms);
sem_post(&sem_singledone[ithread]); //let multi know is ready sem_post(&sem_singledone[ithread]); //let multi know is ready
break; break;
} }
//actual data //actual data
@ -5198,27 +5162,17 @@ void multiSlsDetector::startReceivingDataThread(){
sem_post(&sem_singledone[ithread]);//let multi know is ready sem_post(&sem_singledone[ithread]);//let multi know is ready
zmq_msg_close(&message); // close the message zmq_msg_close(&message); // close the message
}
}/*--end of loop for each buffer (inner loop)*/
//close socket //close socket
zmq_disconnect(zmqsocket, hostname); zmq_disconnect(zmqsocket, hostname);
zmq_close(zmqsocket); zmq_close(zmqsocket);
zmq_ctx_destroy(context); zmq_ctx_destroy(context);
//end of acquisition, wait for next acquisition/change of parameters
sem_wait(&receivingDataSemaphore[ithread]);
//check to exit thread (for change of parameters) - only EXIT possibility
if(killAllReceivingDataThreads){
#ifdef DEBUG #ifdef DEBUG
cprintf(MAGENTA,"Receiving Data Thread %d:Goodbye!\n",ithread); cprintf(MAGENTA,"Receiving Data Thread %d:Goodbye!\n",ithread);
#endif #endif
pthread_exit(NULL);
}
}/*--end of loop for each acquisition (outer loop) */
} }
@ -5264,13 +5218,9 @@ void multiSlsDetector::readFrameFromReceiver(){
int ny =getTotalNumberOfChannels(slsDetectorDefs::Y); int ny =getTotalNumberOfChannels(slsDetectorDefs::Y);
volatile uint64_t dataThreadMask = 0x0;
sem_post(&dataThreadStartedSemaphore); //let utils:acquire continue to start measurement/acquisition
volatile uint64_t expectedMask = 0x0;
for(int i = 0; i < numReadouts; ++i) for(int i = 0; i < numReadouts; ++i)
expectedMask|=(1<<i); dataThreadMask|=(1<<i);
while(receivingDataThreadMask != expectedMask);//wait for all receibvin threads to be ready
@ -5280,7 +5230,7 @@ void multiSlsDetector::readFrameFromReceiver(){
//post all of them to start //post all of them to start
for(int ireadout=0; ireadout<numReadouts; ++ireadout){ for(int ireadout=0; ireadout<numReadouts; ++ireadout){
if((1 << ireadout) & receivingDataThreadMask){ if((1 << ireadout) & dataThreadMask){
sem_post(&sem_singlewait[ireadout]); //sls to continue sem_post(&sem_singlewait[ireadout]); //sls to continue
} }
} }
@ -5289,12 +5239,13 @@ void multiSlsDetector::readFrameFromReceiver(){
for(int ireadout=0; ireadout<numReadouts; ++ireadout){ for(int ireadout=0; ireadout<numReadouts; ++ireadout){
//cprintf(BLUE,"multi checking %d mask:0x%x\n",ireadout,receivingDataThreadMask); //cprintf(BLUE,"multi checking %d mask:0x%x\n",ireadout,receivingDataThreadMask);
idet = ireadout/numReadoutPerDetector; idet = ireadout/numReadoutPerDetector;
if((1 << ireadout) & receivingDataThreadMask){ //if running if((1 << ireadout) & dataThreadMask){ //if running
sem_wait(&sem_singledone[ireadout]); //wait for sls to copy sem_wait(&sem_singledone[ireadout]); //wait for sls to copy
//this socket closed //this socket closed
if(!((1 << ireadout) & receivingDataThreadMask)){ //if running if(singleframe[ireadout] == NULL){ //if got nothing
dataThreadMask^=(1<<ireadout);
continue; continue;
} }
@ -5336,10 +5287,9 @@ void multiSlsDetector::readFrameFromReceiver(){
} }
} }
//all done
if(!receivingDataThreadMask){ if(!dataThreadMask)
break; break;
}
//send data to callback //send data to callback
@ -5351,14 +5301,9 @@ void multiSlsDetector::readFrameFromReceiver(){
fdata = NULL; fdata = NULL;
//cout<<"Send frame #"<< currentFrameIndex << " to gui"<<endl; //cout<<"Send frame #"<< currentFrameIndex << " to gui"<<endl;
} }
setCurrentProgress(currentAcquisitionIndex+1); setCurrentProgress(currentAcquisitionIndex+1);
} }
#ifdef DEBUG
cout << "All zmq sockets closed" << endl;
#endif
//free resources //free resources
delete[] multiframe; delete[] multiframe;
} }
@ -5584,10 +5529,27 @@ int multiSlsDetector::setReadReceiverFrequency(int getFromReceiver, int freq){
} }
// only called from gui or that wants zmq data packets
int multiSlsDetector::enableDataStreamingFromReceiver(int enable){ int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
int ret=-100, ret1;
if(enable >= 0){
//destroy data threads
if(threadStarted)
createReceivingDataThreads(true);
//create data threads if enable is 1
if(enable == 1)
if(createReceivingDataThreads() == FAIL){
std::cout << "Could not create data threads in client. Aborting creating data threads in receiver" << std::endl;
//only for the first det as theres no general one
setErrorMask(getErrorMask()|(1<<0));
detectors[0]->setErrorMask((detectors[0]->getErrorMask())|(DATA_STREAMING));
return -1;
}
}
int ret=-100, ret1;
for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) { for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) {
if (detectors[idet]) { if (detectors[idet]) {
ret1=detectors[idet]->enableDataStreamingFromReceiver(enable); ret1=detectors[idet]->enableDataStreamingFromReceiver(enable);
@ -5600,6 +5562,7 @@ int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
} }
} }
return ret; return ret;
} }

View File

@ -1191,11 +1191,6 @@ class multiSlsDetector : public slsDetectorUtils {
*/ */
int createReceivingDataThreads(bool destroy = false); int createReceivingDataThreads(bool destroy = false);
/**
* Start Receiving Data Threads
* @return OK or FAIL
*/
int startReceivingData();
/** Reads frames from receiver through a constant socket /** Reads frames from receiver through a constant socket
@ -1401,15 +1396,10 @@ private:
char currentFileName[MAX_STR_LENGTH]; char currentFileName[MAX_STR_LENGTH];
pthread_t receivingDataThreads[MAXDET]; pthread_t receivingDataThreads[MAXDET];
sem_t receivingDataSemaphore[MAXDET];
/** Ensures if threads created successfully */ /** Ensures if threads created successfully */
bool threadStarted; bool threadStarted;
/** Current Thread Index*/ /** Current Thread Index*/
int currentThreadIndex; int currentThreadIndex;
/** Mask with each bit indicating status of each receiving data thread */
volatile uint64_t receivingDataThreadMask;
/** Semaphore indicating socket created for each receiving data thread */
sem_t receivingDataSocketsCreatedSemaphore[MAXDET];
/** Set to self-terminate data receiving threads waiting for semaphores */ /** Set to self-terminate data receiving threads waiting for semaphores */
bool killAllReceivingDataThreads; bool killAllReceivingDataThreads;

View File

@ -7720,7 +7720,7 @@ int slsDetector::enableDataStreamingFromReceiver(int enable){
if ((enable > 0) && (retval != enable)){ if ((enable > 0) && (retval != enable)){
cout << "could not set data streaming in receiver to " << enable <<" Returned:" << retval << endl; cout << "could not set data streaming in receiver to " << enable <<" Returned:" << retval << endl;
setErrorMask((getErrorMask())|(RECEIVER_READ_FREQUENCY)); setErrorMask((getErrorMask())|(DATA_STREAMING));
} }
return retval; return retval;
} }

View File

@ -1580,11 +1580,6 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
*/ */
int createReceivingDataThreads(bool destroy = false){}; int createReceivingDataThreads(bool destroy = false){};
/**
* Start Receiving Data Threads
* @return OK or FAIL
*/
int startReceivingData(){};
/** Reads frames from receiver through a constant socket /** Reads frames from receiver through a constant socket
*/ */

View File

@ -172,13 +172,8 @@ int slsDetectorUtils::acquire(int delflag){
} }
if (*threadedProcessing) { if (*threadedProcessing)
sem_init(&dataThreadStartedSemaphore,1,0);
startThread(delflag); startThread(delflag);
if(dataReady)
createReceivingDataThreads();
}
#ifdef VERBOSE #ifdef VERBOSE
cout << " starting thread " << endl; cout << " starting thread " << endl;
#endif #endif
@ -188,6 +183,7 @@ int slsDetectorUtils::acquire(int delflag){
resetFramesCaught(); resetFramesCaught();
} }
for(int im=0;im<nm;im++) { for(int im=0;im<nm;im++) {
#ifdef VERBOSE #ifdef VERBOSE
@ -314,11 +310,6 @@ int slsDetectorUtils::acquire(int delflag){
break; break;
} }
pthread_mutex_unlock(&mg); pthread_mutex_unlock(&mg);
//start the receiving sockets in their threads
if(*threadedProcessing && dataReady){
startReceivingData();
}
} }
#ifdef VERBOSE #ifdef VERBOSE
cout << "Acquiring " << endl; cout << "Acquiring " << endl;
@ -488,19 +479,11 @@ int slsDetectorUtils::acquire(int delflag){
#ifdef VERBOSE #ifdef VERBOSE
cout << "wait for data processing thread" << endl; cout << "wait for data processing thread" << endl;
#endif #endif
if(dataReady){
//if mask is not cleared, clear it
}
setJoinThread(1); setJoinThread(1);
pthread_join(dataProcessingThread, &status); pthread_join(dataProcessingThread, &status);
#ifdef VERBOSE #ifdef VERBOSE
cout << "data processing thread joined" << endl; cout << "data processing thread joined" << endl;
#endif #endif
if(dataReady){
createReceivingDataThreads(true);
sem_destroy(&dataThreadStartedSemaphore);
}
} }

View File

@ -653,11 +653,6 @@ virtual int resetFramesCaught()=0;
*/ */
virtual int createReceivingDataThreads(bool destroy = false)=0; virtual int createReceivingDataThreads(bool destroy = false)=0;
/**
* Start Receiving Data Threads
* @return OK or FAIL
*/
virtual int startReceivingData()=0;
/** Reads frames from receiver through a constant socket /** Reads frames from receiver through a constant socket
*/ */

View File

@ -333,7 +333,6 @@ s
int (*dataReady)(detectorData*,int, int,void*); int (*dataReady)(detectorData*,int, int,void*);
void *pCallbackArg; void *pCallbackArg;
detectorData *thisData; detectorData *thisData;
sem_t dataThreadStartedSemaphore;
private: private:
// double *fdata; // double *fdata;