mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-06-23 01:58:00 +02:00
almost there
This commit is contained in:
@ -5070,10 +5070,14 @@ void multiSlsDetector::startReceivingDataThread(){
|
||||
void *zmqsocket;
|
||||
context = zmq_ctx_new();
|
||||
zmqsocket = zmq_socket(context, ZMQ_PULL);
|
||||
//int hwmval = 10;
|
||||
//zmq_setsockopt(zmqsocket,ZMQ_RCVHWM,&hwmval,sizeof(hwmval)); //set receive HIGH WATER MARK (8-9ms slower)
|
||||
zmq_connect(zmqsocket, hostname);
|
||||
cout << "ZMQ Client of " << ithread << " at " << hostname << endl;
|
||||
cprintf(BLUE,"%d Created socket\n",ithread);
|
||||
|
||||
/*
|
||||
zmq_pollitem_t pollitem = {zmqsocket, 0 , ZMQ_POLLIN , 0};
|
||||
*/
|
||||
//initializations
|
||||
int numReadoutPerDetector = 1;
|
||||
bool jungfrau = false;
|
||||
@ -5086,6 +5090,7 @@ void multiSlsDetector::startReceivingDataThread(){
|
||||
int* image = new int[nel];
|
||||
int len,idet = 0;
|
||||
singleframe[ithread]=NULL;
|
||||
int datavalue = 2;
|
||||
threadStarted = true; //let calling function know thread started and obtained current
|
||||
|
||||
|
||||
@ -5102,82 +5107,112 @@ void multiSlsDetector::startReceivingDataThread(){
|
||||
|
||||
//scan header-------------------------------------------------------------------
|
||||
zmq_msg_init (&message);
|
||||
len = zmq_msg_recv(&message, zmqsocket, 0);
|
||||
if (len == -1) {
|
||||
zmq_msg_close(&message);
|
||||
cprintf(RED, "%d message null\n",ithread);
|
||||
continue;
|
||||
while(1){
|
||||
|
||||
len = zmq_msg_recv(&message, zmqsocket, ZMQ_DONTWAIT);
|
||||
if(len>0)
|
||||
break;//also comment out the next recv
|
||||
/*
|
||||
zmq_poll(&pollitem, 1, 0);
|
||||
//received something, get out
|
||||
if(pollitem.revents & ZMQ_POLLIN){
|
||||
pollitem.revents = 0;
|
||||
break;
|
||||
}
|
||||
*/
|
||||
//received nothing
|
||||
else if (receiverStoppedFlag){
|
||||
//one more chance if receiver stopped
|
||||
datavalue--;
|
||||
|
||||
if(!datavalue){
|
||||
//#ifdef VERYVERBOSE
|
||||
cprintf(RED,"End of socket for %d\n", ithread);
|
||||
//#endif
|
||||
singleframe[ithread] = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
//wait to check again only if receiver stopped
|
||||
//usleep(4000);
|
||||
}
|
||||
usleep(4000);
|
||||
}
|
||||
|
||||
// error if you print it
|
||||
// cout << ithread << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl;
|
||||
//cprintf(BLUE,"%d header %d\n",ithread,len);
|
||||
rapidjson::Document d;
|
||||
d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message));
|
||||
#ifdef VERYVERBOSE
|
||||
// htype is an array of strings
|
||||
rapidjson::Value::Array htype = d["htype"].GetArray();
|
||||
for(int i=0; i< htype.Size(); i++)
|
||||
std::cout << ithread << "htype: " << htype[i].GetString() << std::endl;
|
||||
// shape is an array of ints
|
||||
rapidjson::Value::Array shape = d["shape"].GetArray();
|
||||
cout << ithread << "shape: ";
|
||||
for(int i=0; i< shape.Size(); i++)
|
||||
cout << ithread << shape[i].GetInt() << " ";
|
||||
cout << endl;
|
||||
if(datavalue){
|
||||
//len = zmq_msg_recv(&message, zmqsocket, 0);
|
||||
if (len == -1) {
|
||||
zmq_msg_close(&message);
|
||||
cprintf(RED, "%d message null\n",ithread);
|
||||
continue;
|
||||
}
|
||||
|
||||
cout << ithread << "type: " << d["type"].GetString() << endl;
|
||||
// error if you print it
|
||||
// cout << ithread << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl;
|
||||
//cprintf(BLUE,"%d header %d\n",ithread,len);
|
||||
rapidjson::Document d;
|
||||
d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message));
|
||||
#ifdef VERYVERBOSE
|
||||
// htype is an array of strings
|
||||
rapidjson::Value::Array htype = d["htype"].GetArray();
|
||||
for(int i=0; i< htype.Size(); i++)
|
||||
std::cout << ithread << "htype: " << htype[i].GetString() << std::endl;
|
||||
// shape is an array of ints
|
||||
rapidjson::Value::Array shape = d["shape"].GetArray();
|
||||
cout << ithread << "shape: ";
|
||||
for(int i=0; i< shape.Size(); i++)
|
||||
cout << ithread << shape[i].GetInt() << " ";
|
||||
cout << endl;
|
||||
|
||||
cout << ithread << "type: " << d["type"].GetString() << endl;
|
||||
|
||||
#endif
|
||||
if(!ithread){
|
||||
currentAcquisitionIndex = d["acqIndex"].GetInt();
|
||||
currentFrameIndex = d["fIndex"].GetInt();
|
||||
currentSubFrameIndex = d["subfnum"].GetInt();
|
||||
strcpy(currentFileName ,d["fname"].GetString());
|
||||
if(!ithread){
|
||||
currentAcquisitionIndex = d["acqIndex"].GetInt();
|
||||
currentFrameIndex = d["fIndex"].GetInt();
|
||||
currentSubFrameIndex = d["subfnum"].GetInt();
|
||||
strcpy(currentFileName ,d["fname"].GetString());
|
||||
#ifdef VERYVERBOSE
|
||||
cout << "Acquisition index: " << currentAcquisitionIndex << endl;
|
||||
cout << "Frame index: " << currentFrameIndex << endl;
|
||||
cout << "Subframe index: " << currentSubFrameIndex << endl;
|
||||
cout << "File name: " << currentFileName << endl;
|
||||
cout << "Acquisition index: " << currentAcquisitionIndex << endl;
|
||||
cout << "Frame index: " << currentFrameIndex << endl;
|
||||
cout << "Subframe index: " << currentSubFrameIndex << endl;
|
||||
cout << "File name: " << currentFileName << endl;
|
||||
#endif
|
||||
if(currentFrameIndex ==-1) cprintf(RED,"multi frame index -1!!\n");
|
||||
}
|
||||
singleframe[ithread]=image;
|
||||
if(currentFrameIndex ==-1) cprintf(RED,"multi frame index -1!!\n");
|
||||
}
|
||||
singleframe[ithread]=image;
|
||||
|
||||
// close the message
|
||||
zmq_msg_close(&message);
|
||||
|
||||
|
||||
//scan data-------------------------------------------------------------------
|
||||
zmq_msg_init (&message);
|
||||
len = zmq_msg_recv(&message, zmqsocket, 0);
|
||||
|
||||
//cprintf(BLUE,"%d data %d\n",ithread,len);
|
||||
//end of socket ("end")
|
||||
if (len < 1024*256 ) {
|
||||
if(!len) cprintf(RED,"Received no data in socket for %d\n", ithread);
|
||||
//#ifdef VERYVERBOSE
|
||||
cprintf(RED,"End of socket for %d\n", ithread);
|
||||
//#endif
|
||||
// close the message
|
||||
zmq_msg_close(&message);
|
||||
singleframe[ithread] = NULL;
|
||||
//break;
|
||||
}
|
||||
else{
|
||||
//actual data
|
||||
//cprintf(BLUE,"%d actual dataaa\n",ithread);
|
||||
memcpy((char*)(singleframe[ithread]),(char*)zmq_msg_data(&message),singleDatabytes/numReadoutPerDetector);
|
||||
|
||||
|
||||
//jungfrau masking adcval
|
||||
if(jungfrau){
|
||||
for(unsigned int i=0;i<nel;i++){
|
||||
singleframe[ithread][i] = (singleframe[ithread][i] & 0x3FFF3FFF);
|
||||
//scan data-------------------------------------------------------------------
|
||||
zmq_msg_init (&message);
|
||||
len = zmq_msg_recv(&message, zmqsocket, 0);
|
||||
|
||||
//cprintf(BLUE,"%d data %d\n",ithread,len);
|
||||
//end of socket ("end")
|
||||
if (len < 1024*256 ) {
|
||||
cprintf(RED,"Received weird packet size %d in socket for %d\n", len, ithread);
|
||||
memset((char*)(singleframe[ithread]),0xFF,singleDatabytes/numReadoutPerDetector);
|
||||
}
|
||||
else{
|
||||
//actual data
|
||||
//cprintf(BLUE,"%d actual dataaa\n",ithread);
|
||||
memcpy((char*)(singleframe[ithread]),(char*)zmq_msg_data(&message),singleDatabytes/numReadoutPerDetector);
|
||||
|
||||
|
||||
//jungfrau masking adcval
|
||||
if(jungfrau){
|
||||
for(unsigned int i=0;i<nel;i++){
|
||||
singleframe[ithread][i] = (singleframe[ithread][i] & 0x3FFF3FFF);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
sem_post(&sem_singledone[ithread]);//let multi know is ready
|
||||
zmq_msg_close(&message); // close the message
|
||||
datavalue = 2;
|
||||
}
|
||||
|
||||
cprintf(RED,"%d Closing socket\n",ithread);
|
||||
|
Reference in New Issue
Block a user