This commit is contained in:
Dhanya Maliakal
2016-09-15 12:16:01 +02:00
parent 0f9a841c4a
commit a821442b1a
8 changed files with 98 additions and 209 deletions

View File

@ -7161,7 +7161,6 @@ void slsDetector::readFrameFromReceiver(){
portno[i] = DEFAULT_ZMQ_PORTNO + (readoutId+i);
sprintf(hostname[i], "%s%d", "tcp://127.0.0.1:",portno[i]);
//cout << "ZMQ Client of " << readoutId+i << " at " << hostname[i] << endl;
parentDet->slsframe[readoutId+i]=new int[nel];
}
@ -7172,69 +7171,51 @@ void slsDetector::readFrameFromReceiver(){
for(int i=0;i<numReadout;++i){
context[i] = zmq_ctx_new();
zmqsocket[i] = zmq_socket(context[i], ZMQ_SUB);
// an empty string implies receiving any messages
zmq_setsockopt(zmqsocket[i], ZMQ_SUBSCRIBE, "", 0);
// connect to publisher
// the publisher server does not have to be started
zmq_connect(zmqsocket[i], hostname[i]);
zmq_setsockopt(zmqsocket[i], ZMQ_SUBSCRIBE, "", 0); // an empty string implies receiving any messages
zmq_connect(zmqsocket[i], hostname[i]);// connect to publisher,the publisher server does not have to be started
runningMask|=(1<<(i));
sem_post(&parentDet->sem_multiwait[readoutId+i]); //let multi know socket created
}
cout<<detId<<" started"<<endl;
//receive msgs and let multi know
zmq_msg_t message;
int len,idet = 0;
int framecount=0;
//read frame
while(true){
for(int idet=0; idet<numReadout; ++idet){
if((1 << idet) & runningMask){
sem_wait(&parentDet->sem_slswait[readoutId+idet]);//wait for it to be copied
//update indices
if(!idet) framecount++; //count only once
if(!idet) framecount++; //update indices, count only once
// receive a message, this is a blocking function
len = zmq_msg_init (&message); /* is this required? Xiaoqiang didnt have it*/
if(len) {cprintf(RED,"Failed to initialize message %d for %d\n",len,readoutId+idet); continue; }//error
len = zmq_msg_recv(&message, zmqsocket[idet], 0);
//int size = zmq_msg_size (&message);
//end of socket
if (len <= 3 ) {
if(!len) cprintf(RED,"Received no data in socket for %d\n", readoutId+idet);
//cout<<readoutId+idet <<" sls Received end data"<<endl;
parentDet->slsframe[readoutId+idet] = NULL;
sem_post(&parentDet->sem_slsdone[readoutId+idet]);//let multi know is ready
sem_post(&parentDet->sem_slsdone[readoutId+idet]); //let multi know is ready
runningMask^=(1<<idet);
cout<<detId<<" " << idet << " finished"<<endl;
//all done, get out
if(!runningMask){
cout<<detId<<" all done"<<endl;
//cout<<detId<<" " << idet << " finished"<<endl;
if(!runningMask){ //all done, get out
//cout<<detId<<" all done"<<endl;
break;
}
continue;
}
cout<<"Received on " << readoutId+idet << " for frame " << framecount << endl;
if(zmq_msg_data(&message)==NULL) cprintf(RED,"GOT NULL FROM ZMQ\n"); /*not needed most likely*/
if(zmq_msg_data(&message)==NULL)
cprintf(RED,"GOT NULL FROM ZMQ\n");
//cout<<"Received on " << readoutId+idet << " for frame " << framecount << endl;
//if(len == thisDetector->dataBytes/numReadout){//hoow to solve this
memcpy((char*)(parentDet->slsframe[readoutId+idet]),(char*)zmq_msg_data(&message),thisDetector->dataBytes/numReadout);
//memcpy((char*)(parentDet->slsframe[readoutId+idet]),zmq_msg_data(&message[idet]),thisDetector->dataBytes);
//check header, if incorrect frame, copy somewhere and assign a blank subframe
//parentDet->slsframe[readoutId+idet] = (int*)zmq_msg_data(&message[idet]);
//check header, if incorrect frame, copy somewhere and assign a blank subframe and also check size
//jungfrau masking adcval
if(thisDetector->myDetectorType == JUNGFRAU){
@ -7244,29 +7225,23 @@ void slsDetector::readFrameFromReceiver(){
}
//}
sem_post(&parentDet->sem_slsdone[readoutId+idet]);//let multi know is ready
}
}//end of for loop
if(!runningMask){
break;
}
}
zmq_msg_close(&message);
//close socket
for(int i=0;i<numReadout;i++){
zmq_disconnect(zmqsocket[i], hostname[i]);
zmq_close(zmqsocket[i]);
zmq_ctx_destroy(context[i]);
delete [] parentDet->slsframe[readoutId+i];
}
};
}