creating and destroying sockets for each receiver start and stop in a single acquisition (to deal with scans etc)

This commit is contained in:
Dhanya Maliakal 2016-09-21 16:37:29 +02:00
parent a1df8bdc15
commit d0501c3139
6 changed files with 289 additions and 143 deletions

View File

@ -4958,17 +4958,111 @@ int multiSlsDetector::resetFramesCaught() {
}
int multiSlsDetector::createReceivingDataThreads(bool destroy){
int numReadouts = thisMultiDetector->numberOfDetectors;
if(getDetectorsType() == EIGER)
numReadouts *= 2;
//reset masks
killAllReceivingDataThreads = false;
pthread_mutex_lock(&ms);
receivingDataThreadMask = 0x0;
pthread_mutex_unlock(&(ms));
//destroy
if(destroy){
#ifdef DEBUG
cout << "Destroying Receiving Data Thread(s)" << endl;
#endif
killAllReceivingDataThreads = true;
for(int i = 0; i < numReadouts; ++i){
sem_post(&receivingDataSemaphore[i]);
pthread_join(receivingDataThreads[i],NULL);
sem_destroy(&receivingDataSemaphore[i]);
sem_destroy(&receivingDataSocketsCreatedSemaphore[i]);
sem_destroy(&sem_singlewait[i]);
sem_destroy(&sem_singledone[i]);
delete [] singleframe[i];
#ifdef DEBUG
cout << "." << flush << endl;
#endif
}
killAllReceivingDataThreads = false;
threadStarted = false;
cout << "Destroyed Receiving Data Thread(s)" << endl;
}
//create
else{
#ifdef DEBUG
cout << "Creating Receiving Data Thread(s)" << endl;
#endif
//reset current index
currentThreadIndex = -1;
for(int i = 0; i < numReadouts; ++i){
sem_init(&receivingDataSemaphore[i],1,0);
sem_init(&receivingDataSocketsCreatedSemaphore[i],1,0);
sem_init(&sem_singlewait[i],1,0);
sem_init(&sem_singledone[i],1,0);
threadStarted = false;
currentThreadIndex = i;
if(pthread_create(&receivingDataThreads[i], NULL,startReceivingDataThread, (void*) this)){
cout << "Could not create receiving data thread with index " << i << endl;
return FAIL;
}
while(!threadStarted);
#ifdef DEBUG
cout << "." << flush << endl;
#endif
}
//cout << "Receiving Data Thread(s) created" << endl;
for(int i=0;i<numReadouts;i++)
sem_wait(&receivingDataSocketsCreatedSemaphore[i]); //wait for the initial sockets created
cout << "Receiving Data Threads Ready" << endl;
sem_wait(&dataThreadStartedSemaphore); //wait for processing thread to be ready
cout << "Post Processing thread ready" << endl;
}
return OK;
}
int multiSlsDetector::startReceivingData(){
/**stopReceiving needed only if sockets will be terminated */
int numReadouts = thisMultiDetector->numberOfDetectors;
if(getDetectorsType() == EIGER)
numReadouts *= 2;
if(threadStarted){
for(int i=0;i<numReadouts;i++)
receivingDataThreadMask|=(1<<i);
for(int i=0;i<numReadouts;i++)
sem_post(&receivingDataSemaphore[i]);
for(int i=0;i<numReadouts;i++)
sem_wait(&receivingDataSocketsCreatedSemaphore[i]); //wait for the sockets created
cout << "Receiving data sockets created" << endl;
}else
return FAIL;
return OK;
}
void* multiSlsDetector::startReceivingDataThread(void* this_pointer){
((multiSlsDetector*)this_pointer)->startReceivingData();
((multiSlsDetector*)this_pointer)->startReceivingDataThread();
return this_pointer;
}
void multiSlsDetector::startReceivingData(){
void multiSlsDetector::startReceivingDataThread(){
int ithread = currentThreadIndex; //set current thread value index
threadStarted = true; //let calling function know thread started and obtained current
//cout << ithread << " thread created" << endl;
int numReadoutPerDetector = 1;
bool jungfrau = false;
@ -4987,111 +5081,129 @@ void multiSlsDetector::startReceivingData(){
//cout << "ZMQ Client of " << ithread << " at " << hostname << endl;
singleframe[ithread]=new int[nel];
//loop though the half readouts to start sockets
zmq_msg_t message;
int len,idet = 0;
void *context;
void *zmqsocket;
context = zmq_ctx_new();
zmqsocket = zmq_socket(context, ZMQ_PULL);
zmq_connect(zmqsocket, hostname); // connect to publisher,the publisher server does not have to be started
pthread_mutex_lock(&ms);
receivingDataThreadMask|=(1<<(ithread));
pthread_mutex_unlock(&ms);
//read frame
/* outer loop - loops once for each acquisition */
//infinite loop, exited only at the end of acquire()
while(true){
sem_wait(&sem_singlewait[ithread]); //wait for it to be copied
zmq_msg_t message;
int len,idet = 0;
void *context;
void *zmqsocket;
context = zmq_ctx_new();
zmqsocket = zmq_socket(context, ZMQ_PULL);
zmq_connect(zmqsocket, hostname);
//cprintf(BLUE,"%d ZMQ Client Socket at %s\n",ithread, hostname);
sem_post(&receivingDataSocketsCreatedSemaphore[ithread]);
//scan header-------------------------------------------------------------------
zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket, 0);
if (len == -1) {
zmq_msg_close(&message);
cprintf(RED, "%d message null\n",ithread);
continue;
}
// error if you print it
// cout << ithread << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl;
rapidjson::Document d;
d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message));
#ifdef VERYVERBOSE
// htype is an array of strings
rapidjson::Value::Array htype = d["htype"].GetArray();
for(int i=0; i< htype.Size(); i++)
std::cout << ithread << "htype: " << htype[i].GetString() << std::endl;
// shape is an array of ints
rapidjson::Value::Array shape = d["shape"].GetArray();
cout << ithread << "shape: ";
for(int i=0; i< shape.Size(); i++)
cout << ithread << shape[i].GetInt() << " ";
cout << endl;
cout << ithread << "type: " << d["type"].GetString() << endl;
#endif
if(!ithread){
currentAcquisitionIndex = d["acqIndex"].GetInt();
currentFrameIndex = d["fIndex"].GetInt();
currentSubFrameIndex = d["subfnum"].GetInt();
strcpy(currentFileName ,d["fname"].GetString());
#ifdef VERYVERBOSE
cout << "Acquisition index: " << currentAcquisitionIndex << endl;
cout << "Frame index: " << currentFrameIndex << endl;
cout << "Subframe index: " << currentSubFrameIndex << endl;
cout << "File name: " << currentFileName << endl;
#endif
if(currentFrameIndex ==-1) cprintf(RED,"multi frame index -1!!\n");
}
// close the message
zmq_msg_close(&message);
/* inner loop - loop for each buffer */
//enters at receiver start and exits at receiver stop
while((1 << ithread) & receivingDataThreadMask){
//scan data-------------------------------------------------------------------
zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket, 0);
sem_wait(&sem_singlewait[ithread]); //wait for it to be copied
//end of socket ("end")
if (len < 1024*256 ) {
if(!len) cprintf(RED,"Received no data in socket for %d\n", ithread);
//#ifdef VERYVERBOSE
cprintf(RED,"End of socket for %d\n", ithread);
//#endif
zmq_msg_close(&message);
singleframe[ithread] = NULL;
pthread_mutex_lock(&ms);
receivingDataThreadMask^=(1<<ithread);
pthread_mutex_unlock(&ms);
sem_post(&sem_singledone[ithread]); //let multi know is ready
break;
}
//actual data
memcpy((char*)(singleframe[ithread]),(char*)zmq_msg_data(&message),singleDatabytes/numReadoutPerDetector);
//jungfrau masking adcval
if(jungfrau){
for(unsigned int i=0;i<nel;i++){
singleframe[ithread][i] = (singleframe[ithread][i] & 0x3FFF3FFF);
//scan header-------------------------------------------------------------------
zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket, 0);
if (len == -1) {
zmq_msg_close(&message);
cprintf(RED, "%d message null\n",ithread);
continue;
}
// error if you print it
// cout << ithread << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl;
//cout << ithread << "header " << endl;
rapidjson::Document d;
d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message));
#ifdef VERYVERBOSE
// htype is an array of strings
rapidjson::Value::Array htype = d["htype"].GetArray();
for(int i=0; i< htype.Size(); i++)
std::cout << ithread << "htype: " << htype[i].GetString() << std::endl;
// shape is an array of ints
rapidjson::Value::Array shape = d["shape"].GetArray();
cout << ithread << "shape: ";
for(int i=0; i< shape.Size(); i++)
cout << ithread << shape[i].GetInt() << " ";
cout << endl;
cout << ithread << "type: " << d["type"].GetString() << endl;
#endif
if(!ithread){
currentAcquisitionIndex = d["acqIndex"].GetInt();
currentFrameIndex = d["fIndex"].GetInt();
currentSubFrameIndex = d["subfnum"].GetInt();
strcpy(currentFileName ,d["fname"].GetString());
#ifdef VERYVERBOSE
cout << "Acquisition index: " << currentAcquisitionIndex << endl;
cout << "Frame index: " << currentFrameIndex << endl;
cout << "Subframe index: " << currentSubFrameIndex << endl;
cout << "File name: " << currentFileName << endl;
#endif
if(currentFrameIndex ==-1) cprintf(RED,"multi frame index -1!!\n");
}
// close the message
zmq_msg_close(&message);
//scan data-------------------------------------------------------------------
zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket, 0);
//end of socket ("end")
if (len < 1024*256 ) {
if(!len) cprintf(RED,"Received no data in socket for %d\n", ithread);
//#ifdef VERYVERBOSE
cprintf(RED,"End of socket for %d\n", ithread);
//#endif
zmq_msg_close(&message);
singleframe[ithread] = NULL;
pthread_mutex_lock(&ms);
receivingDataThreadMask^=(1<<ithread);
pthread_mutex_unlock(&ms);
sem_post(&sem_singledone[ithread]); //let multi know is ready
break;
}
//actual data
//cout << ithread << "data " << endl;
memcpy((char*)(singleframe[ithread]),(char*)zmq_msg_data(&message),singleDatabytes/numReadoutPerDetector);
//jungfrau masking adcval
if(jungfrau){
for(unsigned int i=0;i<nel;i++){
singleframe[ithread][i] = (singleframe[ithread][i] & 0x3FFF3FFF);
}
}
sem_post(&sem_singledone[ithread]);//let multi know is ready
zmq_msg_close(&message); // close the message
}/*--end of loop for each buffer (inner loop)*/
//close socket
zmq_disconnect(zmqsocket, hostname);
zmq_close(zmqsocket);
zmq_ctx_destroy(context);
//end of acquisition, wait for next acquisition/change of parameters
sem_wait(&receivingDataSemaphore[ithread]);
//check to exit thread (for change of parameters) - only EXIT possibility
if(killAllReceivingDataThreads){
#ifdef DEBUG
cprintf(MAGENTA,"Receiving Data Thread %d:Goodbye!\n",ithread);
#endif
pthread_exit(NULL);
}
sem_post(&sem_singledone[ithread]);//let multi know is ready
zmq_msg_close(&message); // close the message
}
//close socket
zmq_disconnect(zmqsocket, hostname);
zmq_close(zmqsocket);
zmq_ctx_destroy(context);
}/*--end of loop for each acquisition (outer loop) */
}
@ -5108,34 +5220,14 @@ void multiSlsDetector::readFrameFromReceiver(){
}
int numReadouts = numReadoutPerDetector * thisMultiDetector->numberOfDetectors;
//create threads
/** Data Callback Threads */
pthread_t receivingDataThreads[numReadouts];
volatile uint64_t expectedMask = 0x0;
receivingDataThreadMask = 0x0;
currentThreadIndex = -1;
//initializing variables
strcpy(currentFileName,"");
for(int i = 0; i < numReadouts; ++i){
threadStarted = false;
currentThreadIndex = i;
sem_init(&sem_singlewait[i],1,0);
sem_init(&sem_singledone[i],1,0);
if(pthread_create(&receivingDataThreads[i], NULL,startReceivingDataThread, (void*) this)){
cprintf(RED, "ERROR: Could not create receiving thread with index %d\n",i);
return;
}
while(!threadStarted);
//cout << "Data Thread created successfully for " << i << endl;
expectedMask|=(1<<i);
}
//cout<<"multi waiting for all threads to be created "<<hex<<receivingDataThreadMask<<" to be matched with " <<expectedMask<< endl;
//wait for the last few threads remaining to be ready
while(receivingDataThreadMask != expectedMask);
//cout<<"multi threads created"<<endl;
currentAcquisitionIndex = -1;
currentFrameIndex = -1;
currentSubFrameIndex = -1;
//getting values
int slsdatabytes = 0, slsmaxchannels = 0, bytesperchannel = 0, slsmaxX = 0, slsmaxY=0;
if(detectors[0]){
slsdatabytes = detectors[0]->getDataBytes();
@ -5158,9 +5250,14 @@ void multiSlsDetector::readFrameFromReceiver(){
sem_post(&dataThreadStartedSemaphore); //let utils:acquire continue to start measurement/acquisition
volatile uint64_t expectedMask = 0x0;
for(int i = 0; i < numReadouts; ++i)
expectedMask|=(1<<i);
while(receivingDataThreadMask != expectedMask);//wait for all receibvin threads to be ready
//construct complete image and send to callback
while(true){
@ -5197,7 +5294,7 @@ void multiSlsDetector::readFrameFromReceiver(){
offsetX *= bytesperchannel;
//cprintf(BLUE,"offsetx:%d offsety:%d maxx:%d slsmaxX:%d slsmaxY:%d bytesperchannel:%d\n",
// offsetX,offsetY,maxX,slsmaxX,slsmaxY,bytesperchannel);
// cprintf(BLUE,"copying bytes:%d\n", (slsmaxX/numReadoutPerDetector)*bytesperchannel);
// cprintf(BLUE,"copying bytes:%d\n", (slsmaxX/numReadoutPerDetector)*bytesperchannel);
//itnerleaving with other detectors
//bottom
@ -5243,15 +5340,11 @@ void multiSlsDetector::readFrameFromReceiver(){
setCurrentProgress(currentAcquisitionIndex+1);
}
cout << "All zmq sockets closed" << endl;
#ifdef DEBUG
cout << "All zmq sockets closed" << endl;
#endif
//free resources
for(int i = 0; i < numReadouts; ++i){
pthread_join(receivingDataThreads[i],NULL);
sem_destroy(&sem_singlewait[i]);
sem_destroy(&sem_singledone[i]);
delete [] singleframe[i];
}
delete[] multiframe;
}

View File

@ -1176,9 +1176,23 @@ class multiSlsDetector : public slsDetectorUtils {
/**
* resets framescaught
* @param index frames caught by receiver
*/
*/
int resetFramesCaught();
/**
* Create Receiving Data Threads
* @param destroy is true to destroy all the threads
* @return OK or FAIL
*/
int createReceivingDataThreads(bool destroy = false);
/**
* Start Receiving Data Threads
* @return OK or FAIL
*/
int startReceivingData();
/** Reads frames from receiver through a constant socket
*/
void readFrameFromReceiver();
@ -1368,7 +1382,7 @@ private:
/**
* Thread that receives data packets from receiver
*/
void startReceivingData();
void startReceivingDataThread();
/* synchronizing between zmq threads */
sem_t sem_singledone[MAXDET];
@ -1381,12 +1395,18 @@ private:
int currentSubFrameIndex;
char currentFileName[MAX_STR_LENGTH];
pthread_t receivingDataThreads[MAXDET];
sem_t receivingDataSemaphore[MAXDET];
/** Ensures if threads created successfully */
bool threadStarted;
/** Current Thread Index*/
int currentThreadIndex;
/** Mask with each bit indicating status of each receiving data thread */
volatile uint64_t receivingDataThreadMask;
/** Semaphore indicating socket created for each receiving data thread */
sem_t receivingDataSocketsCreatedSemaphore[MAXDET];
/** Set to self-terminate data receiving threads waiting for semaphores */
bool killAllReceivingDataThreads;
protected:

View File

@ -1568,6 +1568,19 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
*/
int resetFramesCaught();
/**
* Create Receiving Data Threads
* @param destroy is true to destroy all the threads
* @return OK or FAIL
*/
int createReceivingDataThreads(bool destroy = false){};
/**
* Start Receiving Data Threads
* @return OK or FAIL
*/
int startReceivingData(){};
/** Reads frames from receiver through a constant socket
*/
void readFrameFromReceiver(){};

View File

@ -173,9 +173,11 @@ int slsDetectorUtils::acquire(int delflag){
if (*threadedProcessing) {
if(dataReady)
sem_init(&dataThreadStartedSemaphore,1,0);
startThread(delflag);
sem_init(&dataThreadStartedSemaphore,1,0);
startThread(delflag);
if(dataReady)
createReceivingDataThreads();
}
#ifdef VERBOSE
cout << " starting thread " << endl;
@ -186,9 +188,6 @@ int slsDetectorUtils::acquire(int delflag){
resetFramesCaught();
}
if(*threadedProcessing && dataReady)
sem_wait(&dataThreadStartedSemaphore);
for(int im=0;im<nm;im++) {
#ifdef VERBOSE
@ -315,6 +314,11 @@ int slsDetectorUtils::acquire(int delflag){
break;
}
pthread_mutex_unlock(&mg);
//start the receiving sockets in their threads
if(*threadedProcessing && dataReady){
startReceivingData();
}
}
#ifdef VERBOSE
cout << "Acquiring " << endl;
@ -484,13 +488,18 @@ int slsDetectorUtils::acquire(int delflag){
#ifdef VERBOSE
cout << "wait for data processing thread" << endl;
#endif
if(dataReady){
//if mask is not cleared, clear it
}
setJoinThread(1);
pthread_join(dataProcessingThread, &status);
#ifdef VERBOSE
cout << "data processing thread joined" << endl;
#endif
if(dataReady)
if(dataReady){
createReceivingDataThreads(true);
sem_destroy(&dataThreadStartedSemaphore);
}
}

View File

@ -640,6 +640,18 @@ virtual int getReceiverCurrentFrameIndex()=0;
*/
virtual int resetFramesCaught()=0;
/**
* Create Receiving Data Threads
* @param destroy is true to destroy all the threads
* @return OK or FAIL
*/
virtual int createReceivingDataThreads(bool destroy = false)=0;
/**
* Start Receiving Data Threads
* @return OK or FAIL
*/
virtual int startReceivingData()=0;
/** Reads frames from receiver through a constant socket
*/

View File

@ -333,7 +333,6 @@ s
int (*dataReady)(detectorData*,int, int,void*);
void *pCallbackArg;
detectorData *thisData;
sem_t dataThreadStartedSemaphore;
private: