only missing data left to be handled in zmqthread in receiver

This commit is contained in:
Dhanya Maliakal 2016-09-19 17:21:23 +02:00
parent 38d477a1ad
commit 904d21d0be
2 changed files with 48 additions and 51 deletions

View File

@ -4989,32 +4989,24 @@ void multiSlsDetector::startReceivingData(){
//loop though the half readouts to start sockets
zmq_msg_t message;
int len,idet = 0;
void *context;
void *zmqsocket;
context = zmq_ctx_new();
zmqsocket = zmq_socket(context, ZMQ_PULL);
//zmq_setsockopt(zmqsocket, ZMQ_SUBSCRIBE, "", 0); // an empty string implies receiving any messages
zmq_connect(zmqsocket, hostname); // connect to publisher,the publisher server does not have to be started
zmq_connect(zmqsocket, hostname); // connect to publisher,the publisher server does not have to be started
pthread_mutex_lock(&ms);
receivingDataThreadMask|=(1<<(ithread));
//cout<<ithread<< " single created "<<hex<<receivingDataThreadMask<<endl;
pthread_mutex_unlock(&ms);
//receive msgs and let multi know
zmq_msg_t message;
int len,idet = 0;
int framecount=0;
//read frame
while(true){
//cprintf(GREEN,"single %d waiting for multi\n",ithread);
sem_wait(&sem_singlewait[ithread]); //wait for it to be copied
//cprintf(GREEN,"single %d got multi\n",ithread);
if(!idet) framecount++; //update indices, count only once
//scan header--------------------------------------------------------
//scan header-------------------------------------------------------------------
zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket, 0);
if (len == -1) {
@ -5023,57 +5015,61 @@ void multiSlsDetector::startReceivingData(){
continue;
}
// error if you print it
// cout << ithread << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl;
rapidjson::Document d;
// d.Parse((char*)zmq_msg_data(&message));
d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message));
#ifdef VERYVERBOSE
// htype is an array of strings
rapidjson::Value::Array htype = d["htype"].GetArray();
for(int i=0; i< htype.Size(); i++)
std::cout << ithread << "htype: " << htype[i].GetString() << std::endl;
for(int i=0; i< htype.Size(); i++)
std::cout << ithread << "htype: " << htype[i].GetString() << std::endl;
// shape is an array of ints
rapidjson::Value::Array shape = d["shape"].GetArray();
cout << ithread << "shape: ";
for(int i=0; i< shape.Size(); i++)
cout << ithread << shape[i].GetInt() << " ";
cout << endl;
cout << ithread << "shape: ";
for(int i=0; i< shape.Size(); i++)
cout << ithread << shape[i].GetInt() << " ";
cout << endl;
cout << ithread << "type: " << d["type"].GetString() << endl;
#endif
if(!ithread){
currentAcquisitionIndex = d["acqIndex"].GetInt();
currentFrameIndex = d["fIndex"].GetInt();
currentSubFrameIndex = d["subfnum"].GetInt();
strcpy(currentFileName ,d["fname"].GetString());
cout << ithread << "Acquisition index: " << currentAcquisitionIndex << endl;
cout << ithread << "Frame index: " << currentFrameIndex << endl;
cout << ithread << "Subframe index: " << currentSubFrameIndex << endl;
cout << ithread << "File name: " << currentFileName << endl;
}
// close the message
zmq_msg_close(&message);
//scan data--------------------------------------------------------
//scan data-------------------------------------------------------------------
zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket, 0);
//last one
if(len<1024*256){
cprintf(RED,"got less than planned for socket %d, length:%d\n",ithread,len);
//end of socket
//if (len <= 3 ) {
//end of socket ("end")
if (len < 1024*256 ) {
if(!len) cprintf(RED,"Received no data in socket for %d\n", ithread);
cprintf(RED,"End of socket for %d\n", ithread);
zmq_msg_close(&message);
//cout<<ithread <<" sls Received end data"<<endl;
singleframe[ithread] = NULL;
pthread_mutex_lock(&ms);
receivingDataThreadMask^=(1<<ithread);
pthread_mutex_unlock(&ms);
sem_post(&sem_singledone[ithread]); //let multi know is ready
//cprintf(GREEN,"single %d posted done (finished) for multi\n",ithread);
//cout << ithread << " single finished" << endl;
break;
}
//actual data
//cout<<"Received on " << ithread << " for frame " << framecount << endl;
//if(len == singleDatabytes/numReadoutPerDetector){//hoow to solve this
memcpy((char*)(singleframe[ithread]),(char*)zmq_msg_data(&message),singleDatabytes/numReadoutPerDetector);
//check header, if incorrect frame, copy somewhere and assign a blank subframe and also check size
//jungfrau masking adcval
if(jungfrau){
@ -5081,12 +5077,9 @@ void multiSlsDetector::startReceivingData(){
singleframe[ithread][i] = (singleframe[ithread][i] & 0x3FFF3FFF);
}
}
//}
sem_post(&sem_singledone[ithread]);//let multi know is ready
//cprintf(GREEN,"single %d posted done for multi\n",ithread);
// close the message
zmq_msg_close(&message);
sem_post(&sem_singledone[ithread]);//let multi know is ready
zmq_msg_close(&message); // close the message
}
//close socket
@ -5116,6 +5109,7 @@ void multiSlsDetector::readFrameFromReceiver(){
volatile uint64_t expectedMask = 0x0;
receivingDataThreadMask = 0x0;
currentThreadIndex = -1;
strcpy(currentFileName,"");
for(int i = 0; i < numReadouts; ++i){
threadStarted = false;
currentThreadIndex = i;
@ -5153,7 +5147,6 @@ void multiSlsDetector::readFrameFromReceiver(){
int* p = multiframe;
int idet,offsetY,offsetX;
int halfreadoutoffset = (slsmaxX/numReadoutPerDetector);
int framecount=0;
int nx =getTotalNumberOfChannels(slsDetectorDefs::X);
int ny =getTotalNumberOfChannels(slsDetectorDefs::Y);
@ -5161,7 +5154,6 @@ void multiSlsDetector::readFrameFromReceiver(){
sem_post(&dataThreadStartedSemaphore); //let utils:acquire continue to start measurement/acquisition
//cprintf(BLUE,"all sockets created\n");
//construct complete image and send to callback
@ -5172,7 +5164,6 @@ void multiSlsDetector::readFrameFromReceiver(){
for(int ireadout=0; ireadout<numReadouts; ++ireadout){
if((1 << ireadout) & receivingDataThreadMask){
sem_post(&sem_singlewait[ireadout]); //sls to continue
//cprintf(BLUE,"multi posted det %d\n",ireadout);
}
}
@ -5180,10 +5171,8 @@ void multiSlsDetector::readFrameFromReceiver(){
for(int ireadout=0; ireadout<numReadouts; ++ireadout){
idet = ireadout/numReadoutPerDetector;
if((1 << ireadout) & receivingDataThreadMask){ //if running
//wait for single to copy
//cprintf(BLUE,"multi waiting done by det %d\n",ireadout);
sem_wait(&sem_singledone[ireadout]); //wait for sls to copy
//cprintf(BLUE,"multi got done by det %d\n",ireadout);
//this socket closed
if(!((1 << ireadout) & receivingDataThreadMask)){ //if running
@ -5228,7 +5217,6 @@ void multiSlsDetector::readFrameFromReceiver(){
}
}
//cout<<"receivingDataThreadMask:"<<hex<<receivingDataThreadMask<<endl;
if(!receivingDataThreadMask){
break;
@ -5238,14 +5226,15 @@ void multiSlsDetector::readFrameFromReceiver(){
//send data to callback
fdata = decodeData(multiframe);
if ((fdata) && (dataReady)){
thisData = new detectorData(fdata,NULL,NULL,getCurrentProgress(),"noname.raw",nx,ny);
dataReady(thisData, framecount, framecount, pCallbackArg);//should be fnum and subfnum from json header
thisData = new detectorData(fdata,NULL,NULL,getCurrentProgress(),currentFileName,nx,ny);
dataReady(thisData, currentFrameIndex, currentSubFrameIndex, pCallbackArg);//should be fnum and subfnum from json header
delete thisData;
fdata = NULL;
//cout<<"Send frame #"<< framecount << " to gui"<<endl;
//cout<<"Send frame #"<< currentFrameIndex << " to gui"<<endl;
}
framecount++;
setCurrentProgress(framecount);
setCurrentProgress(currentAcquisitionIndex+1);
}

View File

@ -1370,9 +1370,17 @@ private:
*/
void startReceivingData();
/* synchronizing between zmq threads */
sem_t sem_singledone[MAXDET];
sem_t sem_singlewait[MAXDET];
int* singleframe[MAXDET];
/* Parameters given to the gui picked up from zmq threads*/
int currentAcquisitionIndex;
int currentFrameIndex;
int currentSubFrameIndex;
char currentFileName[MAX_STR_LENGTH];
/** Ensures if threads created successfully */
bool threadStarted;
/** Current Thread Index*/