almost done

This commit is contained in:
Dhanya Maliakal
2016-09-09 17:51:36 +02:00
parent f8b62bba64
commit 652d29f2d9
13 changed files with 441 additions and 197 deletions

View File

@ -10,7 +10,7 @@
#include <cstdlib>
#include <math.h>
#include "gitInfoLib.h"
#include <zmq.h>
int slsDetector::initSharedMemory(detectorType type, int id) {
@ -7145,58 +7145,125 @@ int slsDetector::resetFramesCaught(){
int* slsDetector::readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex){
int fnum=F_READ_RECEIVER_FRAME;
int nel=thisDetector->dataBytes/sizeof(int);
int* retval=new int[nel];
int ret=FAIL;
int n;
char mess[MAX_STR_LENGTH]="Nothing";
void slsDetector::readFrameFromReceiver(){
if (setReceiverOnline(ONLINE_FLAG)==ONLINE_FLAG) {
#ifdef VERBOSE
std::cout<< "slsDetector: Reading frame from receiver "<< thisDetector->dataBytes << " " <<nel <<std::endl;
#endif
if (connectData() == OK){
dataSocket->SendDataOnly(&fnum,sizeof(fnum));
dataSocket->ReceiveDataOnly(&ret,sizeof(ret));
//determine number of half readouts
int numReadout = 1;
if(thisDetector->myDetectorType == EIGER) numReadout = 2;
int readoutId = detId*numReadout;
volatile uint64_t runningMask = 0x0;
if (ret==FAIL) {
n= dataSocket->ReceiveDataOnly(mess,sizeof(mess));
std::cout<< "Detector returned: " << mess << " " << n << std::endl;
delete [] retval;
disconnectData();
return NULL;
} else {
n=dataSocket->ReceiveDataOnly(fName,MAX_STR_LENGTH);
n=dataSocket->ReceiveDataOnly(&acquisitionIndex,sizeof(acquisitionIndex));
n=dataSocket->ReceiveDataOnly(&frameIndex,sizeof(frameIndex));
if(thisDetector->myDetectorType == EIGER)
n=dataSocket->ReceiveDataOnly(&subFrameIndex,sizeof(subFrameIndex));
n=dataSocket->ReceiveDataOnly(retval,thisDetector->dataBytes);
//server details
char hostname[numReadout][100];
int portno[numReadout];
int nel=(thisDetector->dataBytes/numReadout)/sizeof(int);
for(int i=0;i<numReadout;++i){
portno[i] = DEFAULT_ZMQ_PORTNO + (readoutId+i);
sprintf(hostname[i], "%s%d", "tcp://127.0.0.1:",portno[i]);
//cout << "ZMQ Client of " << readoutId+i << " at " << hostname[i] << endl;
#ifdef VERBOSE
std::cout<< "Received "<< n << " data bytes" << std::endl;
#endif
if (n!=thisDetector->dataBytes) {
std::cout<<endl<< "wrong data size received: received " << n << " but expected from receiver " << thisDetector->dataBytes << std::endl;
ret=FAIL;
delete [] retval;
disconnectData();
return NULL;
}
//jungfrau masking adcval
if(thisDetector->myDetectorType == JUNGFRAU){
for(unsigned int i=0;i<nel;i++){
retval[i] = (retval[i] & 0x3FFF3FFF);
}
}
}
disconnectData();
}
parentDet->slsframe[readoutId+i]=new int[nel];
}
return retval;
//loop though the half readouts to start sockets
void *context[numReadout];
void *zmqsocket[numReadout];
for(int i=0;i<numReadout;++i){
context[i] = zmq_ctx_new();
zmqsocket[i] = zmq_socket(context[i], ZMQ_SUB);
// an empty string implies receiving any messages
zmq_setsockopt(zmqsocket[i], ZMQ_SUBSCRIBE, "", 0);
// connect to publisher
// the publisher server does not have to be started
zmq_connect(zmqsocket[i], hostname[i]);
runningMask|=(1<<(i));
}
//receive msgs and let multi know
zmq_msg_t message;
int len,idet = 0;
int framecount=0;
while(true){
for(int idet=0; idet<numReadout; ++idet){
if((1 << idet) & runningMask){
sem_wait(&parentDet->sem_slswait[readoutId+idet]);//wait for it to be copied
//update indices
if(!idet) framecount++; //count only once
// receive a message, this is a blocking function
len = zmq_msg_init (&message); /* is this required? Xiaoqiang didnt have it*/
if(len) {cprintf(RED,"Failed to initialize message %d for %d\n",len,readoutId+idet); continue; }//error
len = zmq_msg_recv(&message, zmqsocket[idet], 0);
//int size = zmq_msg_size (&message);
if (len <= 3 ) {
if(!len) cprintf(RED,"Received no data in socket for %d\n", readoutId+idet);
cout<<readoutId+idet <<" sls Received end data"<<endl;
parentDet->slsframe[readoutId+idet] = NULL;
sem_post(&parentDet->sem_slsdone[readoutId+idet]);//let multi know is ready
runningMask^=(1<<idet);
//all done, get out
if(!runningMask){
break;
}
continue;
}
cout<<"Received on " << readoutId+idet << " for frame " << framecount << endl;
if(zmq_msg_data(&message)==NULL)
cprintf(RED,"GOT NULL FROM ZMQ\n");
//if(len == thisDetector->dataBytes/numReadout){//hoow to solve this
memcpy((char*)(parentDet->slsframe[readoutId+idet]),(char*)zmq_msg_data(&message),thisDetector->dataBytes/numReadout);
//memcpy((char*)(parentDet->slsframe[readoutId+idet]),zmq_msg_data(&message[idet]),thisDetector->dataBytes);
//check header, if incorrect frame, copy somewhere and assign a blank subframe
//parentDet->slsframe[readoutId+idet] = (int*)zmq_msg_data(&message[idet]);
//jungfrau masking adcval
if(thisDetector->myDetectorType == JUNGFRAU){
for(unsigned int i=0;i<nel;i++){
parentDet->slsframe[readoutId+idet][i] = (parentDet->slsframe[readoutId+idet][i] & 0x3FFF3FFF);
}
}
//}
sem_post(&parentDet->sem_slsdone[readoutId+idet]);//let multi know is ready
}
}//end of for loop
if(!runningMask){
break;
}
}
zmq_msg_close(&message);
//close socket
for(int i=0;i<numReadout;i++){
zmq_disconnect(zmqsocket[i], hostname[i]);
zmq_close(zmqsocket[i]);
zmq_ctx_destroy(context[i]);
delete [] parentDet->slsframe[readoutId+i];
}
};

View File

@ -1569,14 +1569,9 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
int resetFramesCaught();
/**
* Reads a frame from receiver
* @param fName file name of current frame()
* @param acquisitionIndex current acquisition index
* @param frameIndex current frame index (for each scan)
* @param subFrameIndex current sub frame index (for 32 bit mode for eiger)
/returns a frame read from recever
* Reads frames from receiver through a constant socket
*/
int* readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex);
void readFrameFromReceiver();
/** Locks/Unlocks the connection to the receiver
/param lock sets (1), usets (0), gets (-1) the lock

View File

@ -507,14 +507,9 @@ class slsDetectorBase : public virtual slsDetectorDefs, public virtual errorDef
/**
* Reads a frame from receiver
* @param fName file name of current frame()
* @param acquisitionIndex current acquisition index
* @param frameIndex current frame index (for each scan)
* @param subFrameIndex current sub frame index (for 32 bit mode for eiger)
/returns a frame read from recever
* Reads frames from receiver through a constant socket
*/
virtual int* readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex)=0;
virtual void readFrameFromReceiver()=0;
/** Sets the read receiver frequency

View File

@ -72,10 +72,6 @@ int slsDetectorUtils::acquire(int delflag){
int multiframe = nc*nf;
pthread_mutex_lock(&mg);
acquiringDone = 0;
pthread_mutex_unlock(&mg);
// setTotalProgress();
//moved these 2 here for measurement change
progressIndex=0;
@ -162,7 +158,6 @@ int slsDetectorUtils::acquire(int delflag){
if (*threadedProcessing) {
sem_init(&sem_queue,0,0);
startThread(delflag);
}
#ifdef VERBOSE
@ -341,48 +336,13 @@ int slsDetectorUtils::acquire(int delflag){
//offline
if(setReceiverOnline()==OFFLINE_FLAG){
// wait until data processing thread has finished the data
pthread_mutex_lock(&mg);
acquiringDone = 1;
pthread_mutex_unlock(&mg);
if (*threadedProcessing) {
sem_wait(&sem_queue);
pthread_mutex_lock(&mg);
acquiringDone = 0;
pthread_mutex_unlock(&mg);
if ((getDetectorsType()==GOTTHARD) || (getDetectorsType()==MOENCH) || (getDetectorsType()==JUNGFRAU) ){
if((*correctionMask)&(1<<WRITE_FILE))
closeDataFile();
}
#ifdef VERBOSE
cout << "check data queue size " << endl;
#endif
/*while (dataQueueSize()){
//#ifdef VERBOSE
cout << "AAAAAAAAA check data queue size " << endl;
//#endif
usleep(100000);
}*/
if ((getDetectorsType()==GOTTHARD) || (getDetectorsType()==MOENCH) || (getDetectorsType()==JUNGFRAU) ){
if((*correctionMask)&(1<<WRITE_FILE))
closeDataFile();
}
}
//online
else{
pthread_mutex_lock(&mg);
acquiringDone = 1;
pthread_mutex_unlock(&mg);
// wait until data processing thread has taken the last frame
if (*threadedProcessing) {
sem_wait(&sem_queue);
pthread_mutex_lock(&mg);
acquiringDone = 0;
pthread_mutex_unlock(&mg);
}
pthread_mutex_lock(&mg);
stopReceiver();
pthread_mutex_unlock(&mg);
@ -391,7 +351,6 @@ int slsDetectorUtils::acquire(int delflag){
pthread_mutex_lock(&mp);
if (*stoppedFlag==0) {
executeAction(headerAfter);
@ -506,7 +465,6 @@ int slsDetectorUtils::acquire(int delflag){
#endif
setJoinThread(1);
pthread_join(dataProcessingThread, &status);
sem_destroy(&sem_queue);
#ifdef VERBOSE
cout << "data processing thread joined" << endl;
#endif

View File

@ -29,7 +29,7 @@ extern "C" {
#include <sstream>
#include <queue>
#include <math.h>
#include <semaphore.h>
using namespace std;
@ -641,14 +641,9 @@ virtual int getReceiverCurrentFrameIndex()=0;
virtual int resetFramesCaught()=0;
/**
* Reads a frame from receiver
* @param fName file name of current frame()
* @param acquisitionIndex current acquisition index
* @param frameIndex current frame index (for each scan)
* @param subFrameIndex current sub frame index (for 32 bit mode for eiger)
/returns a frame read from recever
* Reads frames from receiver through a constant socket
*/
virtual int* readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex)=0;
virtual void readFrameFromReceiver()=0;
/**
@ -850,6 +845,14 @@ virtual int setReceiverFifoDepth(int i = -1)=0;
int (*progress_call)(double,void*);
void *pProgressCallArg;
public:
//data call back
//individual sls and multi
sem_t sem_slsdone[MAXDET];
sem_t sem_slswait[MAXDET];
int* slsframe[MAXDET];
};