This commit is contained in:
Dhanya Maliakal 2016-10-18 14:05:05 +02:00
parent eec812f92c
commit a7e8ff1c42
3 changed files with 63 additions and 156 deletions

View File

@ -5071,13 +5071,12 @@ void multiSlsDetector::startReceivingDataThread(){
context = zmq_ctx_new(); context = zmq_ctx_new();
zmqsocket = zmq_socket(context, ZMQ_PULL); zmqsocket = zmq_socket(context, ZMQ_PULL);
//int hwmval = 10; //int hwmval = 10;
//zmq_setsockopt(zmqsocket,ZMQ_RCVHWM,&hwmval,sizeof(hwmval)); //set receive HIGH WATER MARK (8-9ms slower) //zmq_setsockopt(zmqsocket,ZMQ_RCVHWM,&hwmval,sizeof(hwmval)); //set receive HIGH WATER MARK (8-9ms slower//should not drop last packets)
zmq_connect(zmqsocket, hostname); zmq_connect(zmqsocket, hostname);
cout << "ZMQ Client of " << ithread << " at " << hostname << endl; cout << "ZMQ Client of " << ithread << " at " << hostname << endl;
cprintf(BLUE,"%d Created socket\n",ithread); cprintf(BLUE,"%d Created socket\n",ithread);
/*
zmq_pollitem_t pollitem = {zmqsocket, 0 , ZMQ_POLLIN , 0};
*/
//initializations //initializations
int numReadoutPerDetector = 1; int numReadoutPerDetector = 1;
bool jungfrau = false; bool jungfrau = false;
@ -5090,9 +5089,6 @@ void multiSlsDetector::startReceivingDataThread(){
int* image = new int[nel]; int* image = new int[nel];
int len,idet = 0; int len,idet = 0;
singleframe[ithread]=NULL; singleframe[ithread]=NULL;
///*
int datavalue = 2;
// */
threadStarted = true; //let calling function know thread started and obtained current threadStarted = true; //let calling function know thread started and obtained current
@ -5109,146 +5105,87 @@ void multiSlsDetector::startReceivingDataThread(){
//scan header------------------------------------------------------------------- //scan header-------------------------------------------------------------------
zmq_msg_init (&message); zmq_msg_init (&message);
/*
//---- with end
len = zmq_msg_recv(&message, zmqsocket, 0); len = zmq_msg_recv(&message, zmqsocket, 0);
if (len == -1) { if (len == -1) {
cprintf(BG_RED,"Could not read header for socket %d\n",ithread);
zmq_msg_close(&message); zmq_msg_close(&message);
cprintf(RED, "%d message null\n",ithread); cprintf(RED, "%d message null\n",ithread);
continue; continue;
} }
//----
*/
// error if you print it
while(1){ // cout << ithread << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl;
//cprintf(BLUE,"%d header %d\n",ithread,len);
len = zmq_msg_recv(&message, zmqsocket, ZMQ_DONTWAIT); rapidjson::Document d;
if(len>0) d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message));
break;//also comment out the next recv
//zmq_poll(&pollitem, 1, 0);
//received something, get out
//if(pollitem.revents & ZMQ_POLLIN){
// pollitem.revents = 0;
// break;
//}uncomment next recv
//received nothing
else if (receiverStoppedFlag){
//one more chance if receiver stopped
datavalue--;
if(!datavalue){
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cprintf(RED,"End of socket for %d\n", ithread); // htype is an array of strings
#endif rapidjson::Value::Array htype = d["htype"].GetArray();
singleframe[ithread] = NULL; for(int i=0; i< htype.Size(); i++)
break; std::cout << ithread << "htype: " << htype[i].GetString() << std::endl;
} // shape is an array of ints
rapidjson::Value::Array shape = d["shape"].GetArray();
cout << ithread << "shape: ";
for(int i=0; i< shape.Size(); i++)
cout << ithread << shape[i].GetInt() << " ";
cout << endl;
//wait to check again only if receiver stopped cout << ithread << "type: " << d["type"].GetString() << endl;
//usleep(4000);
} #endif
usleep(4000); if(!ithread){
currentAcquisitionIndex = d["acqIndex"].GetInt();
currentFrameIndex = d["fIndex"].GetInt();
currentSubFrameIndex = d["subfnum"].GetInt();
strcpy(currentFileName ,d["fname"].GetString());
#ifdef VERYVERBOSE
cout << "Acquisition index: " << currentAcquisitionIndex << endl;
cout << "Frame index: " << currentFrameIndex << endl;
cout << "Subframe index: " << currentSubFrameIndex << endl;
cout << "File name: " << currentFileName << endl;
#endif
if(currentFrameIndex ==-1) cprintf(RED,"multi frame index -1!!\n");
} }
singleframe[ithread]=image;
// close the message
zmq_msg_close(&message);
//scan data-------------------------------------------------------------------
zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket, 0);
if(datavalue){ //cprintf(BLUE,"%d data %d\n",ithread,len);
//len = zmq_msg_recv(&message, zmqsocket, 0); //end of socket ("end")
if (len == -1) { if (len < 1024*256 ) {
zmq_msg_close(&message); if(len == 3){
cprintf(RED, "%d message null\n",ithread); //cprintf(RED,"Received end of acquisition for socket %d\n", ithread);
continue;
}
// error if you print it
// cout << ithread << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl;
//cprintf(BLUE,"%d header %d\n",ithread,len);
rapidjson::Document d;
d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message));
#ifdef VERYVERBOSE
// htype is an array of strings
rapidjson::Value::Array htype = d["htype"].GetArray();
for(int i=0; i< htype.Size(); i++)
std::cout << ithread << "htype: " << htype[i].GetString() << std::endl;
// shape is an array of ints
rapidjson::Value::Array shape = d["shape"].GetArray();
cout << ithread << "shape: ";
for(int i=0; i< shape.Size(); i++)
cout << ithread << shape[i].GetInt() << " ";
cout << endl;
cout << ithread << "type: " << d["type"].GetString() << endl;
#endif
if(!ithread){
currentAcquisitionIndex = d["acqIndex"].GetInt();
currentFrameIndex = d["fIndex"].GetInt();
currentSubFrameIndex = d["subfnum"].GetInt();
strcpy(currentFileName ,d["fname"].GetString());
#ifdef VERYVERBOSE
cout << "Acquisition index: " << currentAcquisitionIndex << endl;
cout << "Frame index: " << currentFrameIndex << endl;
cout << "Subframe index: " << currentSubFrameIndex << endl;
cout << "File name: " << currentFileName << endl;
#endif
if(currentFrameIndex ==-1) cprintf(RED,"multi frame index -1!!\n");
}
singleframe[ithread]=image;
// close the message
zmq_msg_close(&message);
//scan data-------------------------------------------------------------------
zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket, 0);
//cprintf(BLUE,"%d data %d\n",ithread,len);
//end of socket ("end")
if (len < 1024*256 ) {
/*
if(!len) cprintf(RED,"Received no data in socket for %d\n", ithread);
//#ifdef VERYVERBOSE
cprintf(RED,"End of socket for %d\n", ithread);
//#endif
zmq_msg_close(&message);
singleframe[ithread] = NULL; singleframe[ithread] = NULL;
//break; //break;
*/ }else{
cprintf(RED,"Received weird packet size %d in socket for %d\n", len, ithread); cprintf(RED,"Received weird packet size %d in socket for %d\n", len, ithread);
memset((char*)(singleframe[ithread]),0xFF,singleDatabytes/numReadoutPerDetector); memset((char*)(singleframe[ithread]),0xFF,singleDatabytes/numReadoutPerDetector);
} }
else{
//actual data }
//cprintf(BLUE,"%d actual dataaa\n",ithread); else{
memcpy((char*)(singleframe[ithread]),(char*)zmq_msg_data(&message),singleDatabytes/numReadoutPerDetector); //actual data
//cprintf(BLUE,"%d actual dataaa\n",ithread);
memcpy((char*)(singleframe[ithread]),(char*)zmq_msg_data(&message),singleDatabytes/numReadoutPerDetector);
//jungfrau masking adcval //jungfrau masking adcval
if(jungfrau){ if(jungfrau){
for(unsigned int i=0;i<nel;i++){ for(unsigned int i=0;i<nel;i++){
singleframe[ithread][i] = (singleframe[ithread][i] & 0x3FFF3FFF); singleframe[ithread][i] = (singleframe[ithread][i] & 0x3FFF3FFF);
}
} }
} }
}
///*
}
// */
sem_post(&sem_singledone[ithread]);//let multi know is ready sem_post(&sem_singledone[ithread]);//let multi know is ready
zmq_msg_close(&message); // close the message zmq_msg_close(&message); // close the message
///*
datavalue = 2;
// */
} }
cprintf(RED,"%d Closing socket\n",ithread); cprintf(RED,"%d Closing socket\n",ithread);
@ -5638,7 +5575,7 @@ int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
} }
} }
}/*else enable = threadStarted;*/ }else enable = threadStarted;
int ret=-100, ret1; int ret=-100, ret1;
for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) { for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) {
if (detectors[idet]) { if (detectors[idet]) {
@ -5651,10 +5588,10 @@ int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
ret=-1; ret=-1;
} }
} }
///* /*
if(enable == -1) if(enable == -1)
return threadStarted; return threadStarted;
// */ */
return (threadStarted & ret); return (threadStarted & ret);
} }

View File

@ -58,9 +58,6 @@ int slsDetectorUtils::acquire(int delflag){
if(!receiver){ if(!receiver){
setDetectorIndex(-1); setDetectorIndex(-1);
}else{ }else{
///*
receiverStoppedFlag = 0;
//*/
//put receiver read frequency to random if no gui //put receiver read frequency to random if no gui
int ret = setReadReceiverFrequency(0); int ret = setReadReceiverFrequency(0);
if(ret>0 && (dataReady == NULL)){ if(ret>0 && (dataReady == NULL)){
@ -151,27 +148,16 @@ int slsDetectorUtils::acquire(int delflag){
if(receiver){ if(receiver){
if(getReceiverStatus()!=IDLE) if(getReceiverStatus()!=IDLE)
stopReceiver(); stopReceiver();
if(setReceiverOnline()==OFFLINE_FLAG){ if(setReceiverOnline()==OFFLINE_FLAG)
*stoppedFlag=1; *stoppedFlag=1;
///*
receiverStoppedFlag = 1;
//*/
}
//multi detectors shouldnt have different receiver read frequencies enabled/disabled //multi detectors shouldnt have different receiver read frequencies enabled/disabled
if(setReadReceiverFrequency(0) < 0){ if(setReadReceiverFrequency(0) < 0){
std::cout << "Error: The receiver read frequency is invalid:" << setReadReceiverFrequency(0) << std::endl; std::cout << "Error: The receiver read frequency is invalid:" << setReadReceiverFrequency(0) << std::endl;
*stoppedFlag=1; *stoppedFlag=1;
///*
receiverStoppedFlag = 1;
//*/
} }
if(setReceiverOnline()==OFFLINE_FLAG){ if(setReceiverOnline()==OFFLINE_FLAG)
*stoppedFlag=1; *stoppedFlag=1;
///*
receiverStoppedFlag = 1;
//*/
}
} }
@ -304,9 +290,6 @@ int slsDetectorUtils::acquire(int delflag){
if(setReceiverOnline()==OFFLINE_FLAG){ if(setReceiverOnline()==OFFLINE_FLAG){
stopReceiver(); stopReceiver();
*stoppedFlag=1; *stoppedFlag=1;
///*
receiverStoppedFlag = 1;
//*/
pthread_mutex_unlock(&mg); pthread_mutex_unlock(&mg);
break; break;
} }
@ -314,9 +297,6 @@ int slsDetectorUtils::acquire(int delflag){
if(startReceiver() == FAIL) { if(startReceiver() == FAIL) {
stopReceiver(); stopReceiver();
*stoppedFlag=1; *stoppedFlag=1;
///*
receiverStoppedFlag = 1;
//*/
pthread_mutex_unlock(&mg); pthread_mutex_unlock(&mg);
break; break;
} }
@ -363,9 +343,6 @@ int slsDetectorUtils::acquire(int delflag){
pthread_mutex_lock(&mg); pthread_mutex_lock(&mg);
//offline //offline
if(setReceiverOnline()==OFFLINE_FLAG){ if(setReceiverOnline()==OFFLINE_FLAG){
///*
receiverStoppedFlag = 1;
//*/
if ((getDetectorsType()==GOTTHARD) || (getDetectorsType()==MOENCH) || (getDetectorsType()==JUNGFRAU) ){ if ((getDetectorsType()==GOTTHARD) || (getDetectorsType()==MOENCH) || (getDetectorsType()==JUNGFRAU) ){
if((*correctionMask)&(1<<WRITE_FILE)) if((*correctionMask)&(1<<WRITE_FILE))
closeDataFile(); closeDataFile();
@ -374,9 +351,6 @@ int slsDetectorUtils::acquire(int delflag){
//online //online
else{ else{
stopReceiver(); stopReceiver();
///*
receiverStoppedFlag = 1;
//*/
// cout<<"***********receiver stopped"<<endl; // cout<<"***********receiver stopped"<<endl;
} }
pthread_mutex_unlock(&mg); pthread_mutex_unlock(&mg);

View File

@ -837,10 +837,6 @@ virtual int setReceiverFifoDepth(int i = -1)=0;
//protected: //protected:
int *stoppedFlag; int *stoppedFlag;
///*
int receiverStoppedFlag;
// */
int64_t *timerValue; int64_t *timerValue;
detectorSettings *currentSettings; detectorSettings *currentSettings;
int *currentThresholdEV; int *currentThresholdEV;