works, need to do json header and send dataready

This commit is contained in:
Dhanya Maliakal
2016-09-15 17:15:55 +02:00
parent a821442b1a
commit 1da4b07e73
7 changed files with 247 additions and 418 deletions

View File

@ -23,6 +23,7 @@ ID: $Id$
#include <sys/shm.h>
#include <iostream>
#include <string>
#include <zmq.h>
using namespace std;
@ -267,30 +268,28 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1)
getNMods();
getMaxMods();
threadpool = 0;
zmqthreadpool = 0;
if(createThreadPool(&threadpool) == FAIL)
if(createThreadPool() == FAIL)
exit(-1);
}
multiSlsDetector::~multiSlsDetector() {
//removeSlsDetector();
destroyThreadPool(&threadpool);
destroyThreadPool(&zmqthreadpool);
destroyThreadPool();
}
int multiSlsDetector::createThreadPool(ThreadPool** t){
if(*t){
(ThreadPool*)(*t)->destroy_threadpool();
*t=0;
int multiSlsDetector::createThreadPool(){
if(threadpool){
threadpool->destroy_threadpool();
threadpool=0;
}
if(thisMultiDetector->numberOfDetectors < 1){
cout << "No detectors attached to create threadpool" << endl;
return OK;
}
*t = new ThreadPool(thisMultiDetector->numberOfDetectors);
switch(((ThreadPool*)(*t))->initialize_threadpool()){
threadpool = new ThreadPool(thisMultiDetector->numberOfDetectors);
switch(threadpool->initialize_threadpool()){
case 0:
cerr << "Failed to initialize thread pool!" << endl;
return FAIL;
@ -301,19 +300,19 @@ int multiSlsDetector::createThreadPool(ThreadPool** t){
break;
default:
#ifdef VERBOSE
cout << "Initialized Threadpool " << *t << endl;
cout << "Initialized Threadpool " << threadpool << endl;
#endif
break;
}
return OK;
}
void multiSlsDetector::destroyThreadPool(ThreadPool** t){
if(*t){
(ThreadPool*)(*t)->destroy_threadpool();
*t=0;
void multiSlsDetector::destroyThreadPool(){
if(threadpool){
threadpool->destroy_threadpool();
threadpool=0;
#ifdef VERBOSE
cout<<"Destroyed Threadpool "<< *t << endl;
cout<<"Destroyed Threadpool "<< threadpool << endl;
#endif
}
}
@ -394,7 +393,7 @@ int multiSlsDetector::addSlsDetector(int id, int pos) {
//set offsets
updateOffsets();
if(createThreadPool(&threadpool) == FAIL)
if(createThreadPool() == FAIL)
exit(-1);
@ -865,7 +864,7 @@ int multiSlsDetector::removeSlsDetector(int pos) {
}
updateOffsets();
if(createThreadPool(&threadpool) == FAIL)
if(createThreadPool() == FAIL)
exit(-1);
return thisMultiDetector->numberOfDetectors;
@ -4957,17 +4956,113 @@ int multiSlsDetector::resetFramesCaught() {
}
void* multiSlsDetector::startReceivingDataThread(void* this_pointer){
((multiSlsDetector*)this_pointer)->startReceivingData();
return this_pointer;
}
void multiSlsDetector::startReceivingData(){
int ithread = currentThreadIndex; //set current thread value index
threadStarted = true; //let calling function know thread started and obtained current
int numReadoutPerDetector = 1;
bool jungfrau = false;
if(getDetectorsType() == EIGER){
numReadoutPerDetector = 2;
}else if(getDetectorsType() == JUNGFRAU)
jungfrau = true;
//server details
char hostname[100];
int portno;
int singleDatabytes = detectors[ithread/numReadoutPerDetector]->getDataBytes();
int nel=(singleDatabytes/numReadoutPerDetector)/sizeof(int);
portno = DEFAULT_ZMQ_PORTNO + (ithread);
sprintf(hostname, "%s%d", "tcp://127.0.0.1:",portno);
//cout << "ZMQ Client of " << ithread << " at " << hostname << endl;
singleframe[ithread]=new int[nel];
//loop though the half readouts to start sockets
void *context;
void *zmqsocket;
context = zmq_ctx_new();
zmqsocket = zmq_socket(context, ZMQ_PULL);
//zmq_setsockopt(zmqsocket, ZMQ_SUBSCRIBE, "", 0); // an empty string implies receiving any messages
zmq_connect(zmqsocket, hostname); // connect to publisher,the publisher server does not have to be started
pthread_mutex_lock(&ms);
receivingDataThreadMask|=(1<<(ithread));
//cout<<ithread<< " single created "<<hex<<receivingDataThreadMask<<endl;
pthread_mutex_unlock(&ms);
//receive msgs and let multi know
zmq_msg_t message;
int len,idet = 0;
int framecount=0;
//read frame
while(true){
//cprintf(GREEN,"single %d waiting for multi\n",ithread);
sem_wait(&sem_singlewait[ithread]); //wait for it to be copied
//cprintf(GREEN,"single %d got multi\n",ithread);
if(!idet) framecount++; //update indices, count only once
// receive a message, this is a blocking function
len = zmq_msg_init (&message); /* is this required? Xiaoqiang didnt have it*/
if(len) {cprintf(RED,"Failed to initialize message %d for %d\n",len,ithread); continue; } //error
len = zmq_msg_recv(&message, zmqsocket, 0);
//if(len<1024*256)
//cprintf(RED,"got less than planned for socket %d\n",ithread);
//end of socket
if (len <= 3 ) {
if(!len) cprintf(RED,"Received no data in socket for %d\n", ithread);
//cout<<ithread <<" sls Received end data"<<endl;
singleframe[ithread] = NULL;
pthread_mutex_lock(&ms);
receivingDataThreadMask^=(1<<ithread);
pthread_mutex_unlock(&ms);
sem_post(&sem_singledone[ithread]); //let multi know is ready
//cprintf(GREEN,"single %d posted done (finished) for multi\n",ithread);
//cout << ithread << " single finished" << endl;
break;
}
if(zmq_msg_data(&message)==NULL) cprintf(RED,"GOT NULL FROM ZMQ\n"); /*not needed most likely*/
//cout<<"Received on " << ithread << " for frame " << framecount << endl;
//if(len == singleDatabytes/numReadoutPerDetector){//hoow to solve this
memcpy((char*)(singleframe[ithread]),(char*)zmq_msg_data(&message),singleDatabytes/numReadoutPerDetector);
//check header, if incorrect frame, copy somewhere and assign a blank subframe and also check size
//jungfrau masking adcval
if(jungfrau){
for(unsigned int i=0;i<nel;i++){
singleframe[ithread][i] = (singleframe[ithread][i] & 0x3FFF3FFF);
}
}
//}
sem_post(&sem_singledone[ithread]);//let multi know is ready
//cprintf(GREEN,"single %d posted done for multi\n",ithread);
}
zmq_msg_close(&message);
//close socket
zmq_disconnect(zmqsocket, hostname);
zmq_close(zmqsocket);
zmq_ctx_destroy(context);
}
void multiSlsDetector::readFrameFromReceiver(){
//Note:num threads = (num slsDets = num tasks)
//so, half slsdet readouts read serially in each task (eiger udp ports)
//create zmq threads
if(createThreadPool(&zmqthreadpool) == FAIL){
cprintf(BG_RED,"Error: Could not create the zmq threads\n");
return;
}
zmqthreadpool->setzeromqThread(); //for debugging
//determine number of half readouts and maxX and maxY
int maxX=0,maxY=0;
@ -4980,125 +5075,131 @@ void multiSlsDetector::readFrameFromReceiver(){
int numReadouts = numReadoutPerDetector * thisMultiDetector->numberOfDetectors;
//start all socket tasks
volatile uint64_t runningMask = 0x0;
int slsdatabytes = 0, slsmaxchannels = 0, bytesperchannel = 0, slsmaxX = 0, slsmaxY=0;
if(!zmqthreadpool){
cout << "Error in creating threadpool. Exiting" << endl;
return;
}else{
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++){
if(detectors[idet]){
sem_init(&sem_slswait[idet*numReadoutPerDetector],1,0);
sem_init(&sem_slsdone[idet*numReadoutPerDetector],1,0);
sem_init(&sem_multiwait[idet*numReadoutPerDetector],1,0);
if(numReadoutPerDetector>1){
sem_init(&sem_slswait[idet*numReadoutPerDetector+1],1,0);
sem_init(&sem_slsdone[idet*numReadoutPerDetector+1],1,0);
sem_init(&sem_multiwait[idet*numReadoutPerDetector+1],1,0);
}
Task* task = new Task(new func00_t<void, slsDetector>(&slsDetector::readFrameFromReceiver,detectors[idet]));
zmqthreadpool->add_task(task);
if(!slsdatabytes){
slsdatabytes = detectors[idet]->getDataBytes();
slsmaxchannels = detectors[idet]->getMaxNumberOfChannels();
bytesperchannel = slsdatabytes/slsmaxchannels;
slsmaxX = detectors[idet]->getTotalNumberOfChannels(X);
slsmaxY = detectors[idet]->getTotalNumberOfChannels(Y);
}
//set mask
runningMask|=(1<<(idet*numReadoutPerDetector));
if(numReadoutPerDetector>1)
runningMask|=(1<<(idet*numReadoutPerDetector+1));
}
//create threads
/** Data Callback Threads */
pthread_t receivingDataThreads[numReadouts];
volatile uint64_t expectedMask = 0x0;
receivingDataThreadMask = 0x0;
currentThreadIndex = -1;
for(int i = 0; i < numReadouts; ++i){
threadStarted = false;
currentThreadIndex = i;
sem_init(&sem_singlewait[i],1,0);
sem_init(&sem_singledone[i],1,0);
if(pthread_create(&receivingDataThreads[i], NULL,startReceivingDataThread, (void*) this)){
cprintf(RED, "ERROR: Could not create receiving thread with index %d\n",i);
return;
}
while(!threadStarted);
//cout << "Data Thread created successfully for " << i << endl;
expectedMask|=(1<<i);
}
zmqthreadpool->startExecuting(); //tell them to start
for(int i=0;i<numReadouts;++i) //wait for all of them to have started
sem_wait(&sem_multiwait[i]);
sem_post(&dataThreadStartedSemaphore); //let utils:acquire continue to start measurement/acquisition
//cout<<"multi waiting for all threads to be created "<<hex<<receivingDataThreadMask<<" to be matched with " <<expectedMask<< endl;
//wait for the last few threads remaining to be ready
while(receivingDataThreadMask != expectedMask);
//cout<<"multi threads created"<<endl;
int slsdatabytes = 0, slsmaxchannels = 0, bytesperchannel = 0, slsmaxX = 0, slsmaxY=0;
if(detectors[0]){
slsdatabytes = detectors[0]->getDataBytes();
slsmaxchannels = detectors[0]->getMaxNumberOfChannels();
bytesperchannel = slsdatabytes/slsmaxchannels;
slsmaxX = detectors[0]->getTotalNumberOfChannels(X);
slsmaxY = detectors[0]->getTotalNumberOfChannels(Y);
}
int nel=(thisMultiDetector->dataBytes)/sizeof(int);
if(nel <= 0){
cout << "Multislsdetector databytes not valid :" << thisMultiDetector->dataBytes << endl;
cprintf(RED,"Error: Multislsdetector databytes not valid : %d\n", thisMultiDetector->dataBytes);
return;
}
int* multiframe=new int[nel];
int* p = multiframe;
int idet,offsetY,offsetX;
int halfreadoutoffset = (slsmaxX/numReadoutPerDetector);
//after reconstruction
int framecount=0;
int nx =getTotalNumberOfChannels(slsDetectorDefs::X);
int ny =getTotalNumberOfChannels(slsDetectorDefs::Y);
sem_post(&dataThreadStartedSemaphore); //let utils:acquire continue to start measurement/acquisition
//cprintf(BLUE,"all sockets created\n");
//construct complete image and send to callback
while(true){
memset(((char*)multiframe),0x0,slsdatabytes*thisMultiDetector->numberOfDetectors); //reset frame memory
//post all of them to start
for(int ireadout=0; ireadout<numReadouts; ++ireadout){
if((1 << ireadout) & receivingDataThreadMask){
sem_post(&sem_singlewait[ireadout]); //sls to continue
//cprintf(BLUE,"multi posted det %d\n",ireadout);
}
}
//get each frame
for(int ireadout=0; ireadout<numReadouts; ++ireadout){
idet = ireadout/numReadoutPerDetector;
if(detectors[idet]){
if((1 << ireadout) & runningMask){ //if running
sem_post(&sem_slswait[ireadout]); //sls to continue
sem_wait(&sem_slsdone[ireadout]); //wait for sls to copy
if((1 << ireadout) & receivingDataThreadMask){ //if running
//wait for single to copy
//cprintf(BLUE,"multi waiting done by det %d\n",ireadout);
sem_wait(&sem_singledone[ireadout]); //wait for sls to copy
//cprintf(BLUE,"multi got done by det %d\n",ireadout);
//this socket closed
if(slsframe[ireadout] == NULL){
runningMask^=(1<<ireadout);
if(!runningMask){ //all done, get out
break;
}
continue;
//this socket closed
if(!((1 << ireadout) & receivingDataThreadMask)){ //if running
continue;
}
//assemble data
if(maxX){ //eiger, so interleaving between ports in one readout itself
offsetY = (maxY - (thisMultiDetector->offsetY[idet] + slsmaxY)) * maxX * bytesperchannel;
//the left half or right half
if(!(ireadout%numReadoutPerDetector))
offsetX = thisMultiDetector->offsetX[idet];
else
offsetX = thisMultiDetector->offsetX[idet] + halfreadoutoffset;
offsetX *= bytesperchannel;
//cprintf(BLUE,"offsetx:%d offsety:%d maxx:%d slsmaxX:%d slsmaxY:%d bytesperchannel:%d\n",
// offsetX,offsetY,maxX,slsmaxX,slsmaxY,bytesperchannel);
// cprintf(BLUE,"copying bytes:%d\n", (slsmaxX/numReadoutPerDetector)*bytesperchannel);
//itnerleaving with other detectors
//bottom
if(((idet+1)%2) == 0){
for(int i=0;i<slsmaxY;++i)
memcpy(((char*)multiframe) + offsetY + offsetX + ((slsmaxY-i)*maxX*bytesperchannel),
(char*)singleframe[ireadout]+ i*(slsmaxX/numReadoutPerDetector)*bytesperchannel,
(slsmaxX/numReadoutPerDetector)*bytesperchannel);
}
//assemble data
if(maxX){ //eiger, so interleaving between ports in one readout itself
offsetY = (maxY - (thisMultiDetector->offsetY[idet] + slsmaxY)) * maxX * bytesperchannel;
//the left half or right half
if(!(ireadout%numReadoutPerDetector))
offsetX = thisMultiDetector->offsetX[idet];
else
offsetX = thisMultiDetector->offsetX[idet] + halfreadoutoffset;
offsetX *= bytesperchannel;
//cprintf(BLUE,"offsetx:%d offsety:%d maxx:%d slsmaxX:%d slsmaxY:%d bytesperchannel:%d\n",
// offsetX,offsetY,maxX,slsmaxX,slsmaxY,bytesperchannel);
//cprintf(BLUE,"copying bytes:%d\n", (slsmaxX/numReadout)*bytesperchannel);
//itnerleaving with other detectors
//bottom
if(((idet+1)%2) == 0){
for(int i=0;i<slsmaxY;++i)
memcpy(((char*)multiframe) + offsetY + offsetX + ((slsmaxY-i)*maxX*bytesperchannel),
(char*)slsframe[ireadout]+ i*(slsmaxX/numReadoutPerDetector)*bytesperchannel,
(slsmaxX/numReadoutPerDetector)*bytesperchannel);
}
//top
else{
for(int i=0;i<slsmaxY;++i)
memcpy(((char*)multiframe) + offsetY + offsetX + (i*maxX*bytesperchannel),
(char*)slsframe[ireadout]+ i*(slsmaxX/numReadoutPerDetector)*bytesperchannel,
(slsmaxX/numReadoutPerDetector)*bytesperchannel);
}
}
//no interleaving, just add to the end
//numReadout always 1 here
//top
else{
memcpy(p,multiframe,slsdatabytes);
p+=slsdatabytes/sizeof(int);
for(int i=0;i<slsmaxY;++i)
memcpy(((char*)multiframe) + offsetY + offsetX + (i*maxX*bytesperchannel),
(char*)singleframe[ireadout]+ i*(slsmaxX/numReadoutPerDetector)*bytesperchannel,
(slsmaxX/numReadoutPerDetector)*bytesperchannel);
}
}
//no interleaving, just add to the end
//numReadout always 1 here
else{
memcpy(p,multiframe,slsdatabytes);
p+=slsdatabytes/sizeof(int);
}
}
}//end of for loop
}
if(!runningMask){
//cout<<"receivingDataThreadMask:"<<hex<<receivingDataThreadMask<<endl;
if(!receivingDataThreadMask){
break;
}
//send data to callback
fdata = decodeData(multiframe);
if ((fdata) && (dataReady)){
@ -5112,13 +5213,14 @@ void multiSlsDetector::readFrameFromReceiver(){
setCurrentProgress(framecount);
}
zmqthreadpool->wait_for_tasks_to_complete();
for(int i=0;i<numReadouts;++i){
sem_destroy(&sem_slsdone[i]);
sem_destroy(&sem_slswait[i]);
sem_destroy(&sem_multiwait[i]);
//free resources
for(int i = 0; i < numReadouts; ++i){
pthread_join(receivingDataThreads[i],NULL);
sem_destroy(&sem_singlewait[i]);
sem_destroy(&sem_singledone[i]);
delete [] singleframe[i];
}
destroyThreadPool(&zmqthreadpool);
delete[] multiframe;
}