moench zmq process implemented - command implementation is missing

This commit is contained in:
2018-09-13 16:07:43 +02:00
parent f288390255
commit 19e7ced332
12 changed files with 867 additions and 738 deletions

View File

@ -17,9 +17,12 @@
#include<iostream>
//#include "analogDetector.h"
//#include "multiThreadedAnalogDetector.h"
//#include "singlePhotonDetector.h"
#include "interpolatingDetector.h"
#include "multiThreadedAnalogDetector.h"
//#include "interpolatingDetector.h"
//#include "multiThreadedCountingDetector.h"
#include "multiThreadedInterpolatingDetector.h"
#include "etaInterpolationPosXY.h"
#include "ansi.h"
#include <iostream>
using namespace std;
@ -36,27 +39,34 @@ int main(int argc, char *argv[]) {
*/
FILE *of=NULL;
int fifosize=1000;
int nthreads=20;
int etabins=1000;//nsubpix*2*100;
double etamin=-1, etamax=2;
// help
if (argc < 3 ) {
cprintf(RED, "Help: ./trial [receive socket ip] [receive starting port number] [send_socket ip] [send starting port number]\n");
return EXIT_FAILURE;
cprintf(RED, "Help: ./trial [receive socket ip] [receive starting port number] [send_socket ip] [send starting port number] [nthreads] [nsubpix] [etafile]\n");
return EXIT_FAILURE;
}
// receive parameters
bool send = false;
char* socketip=argv[1];
uint32_t portnum = atoi(argv[2]);
int maxSize = 32*2*8192;//5000;//atoi(argv[3]);
int size= 32*2*5000;
int multisize=size;
// send parameters if any
char* socketip2 = 0;
uint32_t portnum2 = 0;
if (argc > 3) {
send = true;
int ok;
if (argc > 4) {
socketip2 = argv[3];
portnum2 = atoi(argv[4]);
if (portnum2>0)
send = true;
}
cout << "\nrx socket ip : " << socketip <<
"\nrx port num : " << portnum ;
@ -64,26 +74,56 @@ int main(int argc, char *argv[]) {
cout << "\ntx socket ip : " << socketip2 <<
"\ntx port num : " << portnum2;
}
cout << endl;
int nthreads=5;
if (argc>5)
nthreads=atoi(argv[5]);
cout << "Number of threads is: " << nthreads << endl;
int nSubPixels=2;
if (argc>6)
nSubPixels=atoi(argv[6]);
cout << "Number of subpixels is: " << nSubPixels << endl;
char *etafname=NULL;
if (argc>7) {
etafname=argv[7];
cout << "Eta file name is: " << etafname << endl;
}
//slsDetectorData *det=new moench03T1ZmqDataNew();
moench03T1ZmqDataNew *det=new moench03T1ZmqDataNew();
cout << endl << " det" <<endl;
int npx, npy;
det->getDetectorSize(npx, npy);
int maxSize = npx*npy*2;//32*2*8192;//5000;//atoi(argv[3]);
int size= maxSize;//32*2*5000;
int multisize=size;
int dataSize=size;
//analogDetector<uint16_t> *filter=new analogDetector<uint16_t>(det,1,NULL,1000);
singlePhotonDetector *filter=new singlePhotonDetector(det,3, 5, 1, 0, 1000, 10);
// interpolatingDetector *filter=new interpolatingDetector(det,NULL, 5, 1, 0, 1000, 10);
//singlePhotonDetector *filter=new singlePhotonDetector(det,3, 5, 1, 0, 1000, 10);
eta2InterpolationPosXY *interp=new eta2InterpolationPosXY(npx, npy, nSubPixels, etabins, etamin, etamax);
if (etafname) interp->readFlatField(etafname);
interpolatingDetector *filter=new interpolatingDetector(det,interp, 5, 1, 0, 1000, 10);
cout << " filter" <<endl;
char* buff;
multiThreadedAnalogDetector *mt=new multiThreadedAnalogDetector(filter,nthreads,fifosize);
cout << " multi" <<endl;
// multiThreadedCountingDetector *mt=new multiThreadedCountingDetector(filter,nthreads,fifosize);
// multiThreadedAnalogDetector *mt=new multiThreadedAnalogDetector(filter,nthreads,fifosize);
multiThreadedInterpolatingDetector *mt=new multiThreadedInterpolatingDetector(filter,nthreads,fifosize);
mt->setFrameMode(eFrame);
mt->StartThreads();
cout << " start" <<endl;
mt->popFree(buff);
cout << " pop" <<endl;
ZmqSocket* zmqsocket=NULL;
@ -120,7 +160,7 @@ int main(int argc, char *argv[]) {
// send socket
ZmqSocket* zmqsocket2 = 0;
cout << "zmq2 " << endl;
// cout << "zmq2 " << endl;
if (send) {
#ifdef NEWZMQ
// receive socket
@ -133,25 +173,29 @@ int main(int argc, char *argv[]) {
#ifdef NEWZMQ
} catch (...) {
cprintf(RED, "Error: Could not create Zmq socket server on port %d and ip %s\n", portnum2, socketip2);
delete zmqsocket2;
delete zmqsocket;
return EXIT_FAILURE;
// delete zmqsocket2;
// zmqsocket2=NULL;
// delete zmqsocket;
// return EXIT_FAILURE;
send = false;
}
#endif
#ifndef NEWZMQ
if (zmqsocket2->IsError()) {
cprintf(RED, "Error: Could not create Zmq socket server on port %d and ip %s\n", portnum2, socketip2);
delete zmqsocket2;
delete zmqsocket;
return EXIT_FAILURE;
cprintf(RED, "AAA Error: Could not create Zmq socket server on port %d and ip %s\n", portnum2, socketip2);
// delete zmqsocket2;
//delete zmqsocket;
// return EXIT_FAILURE;
send = false;
}
#endif
if (zmqsocket2->Connect()) {
cprintf(RED, "Error: Could not connect to socket %s\n",
cprintf(RED, "BBB Error: Could not connect to socket %s\n",
zmqsocket2->GetZmqServerAddress());
delete zmqsocket2;
return EXIT_FAILURE;
// delete zmqsocket2;
send = false;
// return EXIT_FAILURE;
} else
printf("Zmq Client at %s\n", zmqsocket2->GetZmqServerAddress());
@ -176,7 +220,6 @@ int main(int argc, char *argv[]) {
int *detimage;
int nnx, nny,nns;
uint32_t imageSize = 0, nPixelsX = 0, nPixelsY = 0, dynamicRange = 0;
filter->getImageSize(nnx, nny,nns);
// infinite loop
uint32_t packetNumber = 0;
uint64_t bunchId = 0;
@ -187,7 +230,7 @@ int main(int argc, char *argv[]) {
uint16_t zCoord = 0;
uint32_t debug = 0;
uint32_t dr = 16;
int16_t *dout=new int16_t [nnx*nny];
int16_t *dout;//=new int16_t [nnx*nny];
//uint32_t dr = 32;
//int32_t *dout=new int32_t [nnx*nny];
uint32_t nSigma=5;
@ -214,6 +257,13 @@ int main(int argc, char *argv[]) {
frameMode fMode;
double *ped;
filter->getImageSize(nnx, nny,nns);
while(1) {
@ -237,48 +287,76 @@ int main(int argc, char *argv[]) {
while (mt->isBusy()) {;}//wait until all data are processed from the queues
detimage=mt->getImage(nnx,nny,nns);
if (fMode==ePedestal) {
nns=1;
// cout << "get pedestal " << endl;
ped=mt->getPedestal();
// cout << "got pedestal " << endl;
for (int ix=0; ix<nnx; ix++) {
for (int iy=0; iy<nny; iy++) {
dout[iy*nnx+ix]=ped[(iy)*nnx+ix];
// cout << ped[(iy)*nnx+ix] << " " ;
}
}
// cout << endl ;
// cout << "done " << endl;
} else {
for (int ix=0; ix<nnx; ix++) {
for (int iy=0; iy<nny; iy++) {
for (int isx=0; isx<nns; isx++) {
for (int isy=0; isy<nns; isy++) {
if (isx==0 && isy==0)
dout[iy*nnx+ix]=detimage[(iy+isy)*nnx*nns+ix+isx];
else
dout[iy*nnx+ix]+=detimage[(iy+isy)*nnx*nns+ix+isx];
}
}
if (dout[iy*nnx+ix]<0) dout[iy*nnx+ix]=0;
}
if (of) {
fclose(of);
of=NULL;
}
if (fMode==ePedestal) {
sprintf(ofname,"ped_%s_%d.tiff",fname,fileindex);
mt->writePedestal(ofname);
} else if (fMode==eFlat) {
mt->prepareInterpolation(ok);
sprintf(ofname,"eta_%s_%d.tiff",fname,fileindex);
mt->writeFlatField(ofname);
} else {
sprintf(ofname,"%s_%d.tiff",fname,fileindex);
mt->writeImage(ofname);
}
// cout << nns*nnx*nny*nns*dr/8 << " " << length << endl;
if (send) {
strcpy(fname,filename.c_str());
#ifdef NEWZMQ
//zmqsocket2->SendHeaderData (0, false, SLS_DETECTOR_JSON_HEADER_VERSION, dynamicRange, fileindex,
// nnx, nny, nns*dynamicRange/8,acqIndex, frameIndex, fname, acqIndex, subFrameIndex, packetNumber,bunchId, timestamp, modId, xCoord, yCoord, zCoord,debug, roundRNumber, detType, version, flippedData, additionalJsonHeader);
if (fMode==ePedestal) {
nns=1;
nnx=npx;
nny=npy;
dout= new int16_t[nnx*nny*nns*nns];
// cout << "get pedestal " << endl;
ped=mt->getPedestal();
// cout << "got pedestal " << endl;
for (int ix=0; ix<nnx*nny; ix++) {
// zmqsocket2->SendHeaderData (0, false, SLS_DETECTOR_JSON_HEADER_VERSION, dr, fileindex, nnx, nny, nns*dr/8,acqIndex, frameIndex, fname, acqIndex, subFrameIndex, packetNumber,bunchId, timestamp, modId, xCoord, yCoord, zCoord,debug, roundRNumber, detType, version, flippedData, additionalJsonHeader);
dout[ix]=ped[ix];
}
} else if (fMode==eFlat) {
int nb;
double emi, ema;
int *ff=mt->getFlatField(nb, emi, ema);
nnx=nb;
nny=nb;
dout= new int16_t[nb*nb];
for (int ix=0; ix<nb*nb; ix++) {
dout[ix]=ff[ix];
}
} else {
detimage=mt->getImage(nnx,nny,nns);
// nns=1;
// nnx=npx;
// nny=npy;
nnx=nnx*nns;
nny=nny*nns;
dout= new int16_t[nnx*nny];
for (int ix=0; ix<nnx*nns; ix++) {
// for (int iy=0; iy<nny*nns; iy++) {
// for (int isx=0; isx<nns; isx++) {
// for (int isy=0; isy<nns; isy++) {
// if (isx==0 && isy==0)
// dout[iy*nnx+ix]=detimage[(iy+isy)*nnx*nns+ix+isx];
// else
// dout[iy*nnx+ix]+=detimage[(iy+isy)*nnx*nns+ix+isx];
// }
// }
dout[ix]=detimage[ix];
if (dout[ix]<0) dout[ix]=0;
// }
}
}
#ifdef NEWZMQ
zmqsocket2->SendHeaderData (0, false, SLS_DETECTOR_JSON_HEADER_VERSION, dr, fileindex, nnx, nny, nnx*nny*dr/8,acqIndex, frameIndex, fname, acqIndex, subFrameIndex, packetNumber,bunchId, timestamp, modId, xCoord, yCoord, zCoord,debug, roundRNumber, detType, version, flippedData, additionalJsonHeader);
@ -289,27 +367,27 @@ int main(int argc, char *argv[]) {
#endif
zmqsocket2->SendData((char*)dout,length);//nns*dr/8);
cprintf(GREEN, "Sent Data\n");
}
// stream dummy to socket2 to signal end of acquisition
if (send) {
zmqsocket2->SendHeaderData(0, true, SLS_DETECTOR_JSON_HEADER_VERSION);
cprintf(RED, "Sent Dummy\n");
cprintf(GREEN, "Sent Data\n");
zmqsocket2->SendHeaderData(0, true, SLS_DETECTOR_JSON_HEADER_VERSION);
cprintf(RED, "Sent Dummy\n");
delete [] dout;
}
mt->clearImage();
if (of) {
fclose(of);
of=NULL;
}
newFrame=1;
continue; //continue to not get out
}
#ifdef NEWZMQ
if (newFrame) {
// acqIndex, frameIndex, subframeIndex, filename, fileindex
size = doc["size"].GetUint();
multisize = size;// * zmqsocket->size();
@ -334,36 +412,38 @@ int main(int argc, char *argv[]) {
detType=doc["detType"].GetUint();
version=doc["version"].GetUint();
dataSize=size;
strcpy(fname,filename.c_str());
cprintf(BLUE, "Header Info:\n"
"size: %u\n"
"multisize: %u\n"
"dynamicRange: %u\n"
"nPixelsX: %u\n"
"nPixelsY: %u\n"
"currentFileName: %s\n"
"currentAcquisitionIndex: %lu\n"
"currentFrameIndex: %lu\n"
"currentFileIndex: %lu\n"
"currentSubFrameIndex: %u\n"
"xCoordX: %u\n"
"yCoordY: %u\n"
"zCoordZ: %u\n"
"flippedDataX: %u\n"
"packetNumber: %u\n"
"bunchId: %u\n"
"timestamp: %u\n"
"modId: %u\n"
"debug: %u\n"
"roundRNumber: %u\n"
"detType: %u\n"
"version: %u\n",
size, multisize, dynamicRange, nPixelsX, nPixelsY,
filename.c_str(), acqIndex,
frameIndex, fileindex, subFrameIndex,
xCoord, yCoord,zCoord,
flippedDataX, packetNumber, bunchId, timestamp, modId, debug, roundRNumber, detType, version);
// cprintf(BLUE, "Header Info:\n"
// "size: %u\n"
// "multisize: %u\n"
// "dynamicRange: %u\n"
// "nPixelsX: %u\n"
// "nPixelsY: %u\n"
// "currentFileName: %s\n"
// "currentAcquisitionIndex: %lu\n"
// "currentFrameIndex: %lu\n"
// "currentFileIndex: %lu\n"
// "currentSubFrameIndex: %u\n"
// "xCoordX: %u\n"
// "yCoordY: %u\n"
// "zCoordZ: %u\n"
// "flippedDataX: %u\n"
// "packetNumber: %u\n"
// "bunchId: %u\n"
// "timestamp: %u\n"
// "modId: %u\n"
// "debug: %u\n"
// "roundRNumber: %u\n"
// "detType: %u\n"
// "version: %u\n",
// size, multisize, dynamicRange, nPixelsX, nPixelsY,
// filename.c_str(), acqIndex,
// frameIndex, fileindex, subFrameIndex,
// xCoord, yCoord,zCoord,
// flippedDataX, packetNumber, bunchId, timestamp, modId, debug, roundRNumber, detType, version);
/* Analog detector commands */
isPedestal=0;
@ -386,7 +466,7 @@ int main(int argc, char *argv[]) {
fMode=eFlat;
isFlat=1;
} else if (frameMode_s == "newFlatfield") {
//mt->resetFlatfield();
mt->resetFlatField();
isFlat=1;
//cprintf(MAGENTA, "Resetting flatfield\n");
fMode=eFlat;
@ -397,16 +477,15 @@ int main(int argc, char *argv[]) {
cprintf(MAGENTA, "%s\n" , frameMode_s.c_str());
mt->setFrameMode(fMode);
threshold=0;
// threshold=0;
cprintf(MAGENTA, "Threshold: ");
if (doc.HasMember("threshold")) {
if (doc["threshold"].IsInt())
if (doc["threshold"].IsInt()) {
threshold=doc["threshold"].GetInt();
}
mt->setThreshold(threshold);
mt->setThreshold(threshold);
}
}
cprintf(MAGENTA, "%d\n", threshold);
xmin=0;
xmax=npx;
@ -433,12 +512,13 @@ int main(int argc, char *argv[]) {
}
}
cprintf(MAGENTA, "%d %d %d %d\n", xmin, xmax, ymin, ymax);
cprintf(MAGENTA, "ROI: %d %d %d %d\n", xmin, xmax, ymin, ymax);
mt->setROI(xmin, xmax, ymin, ymax);
// if (doc.HasMember("dynamicRange")) {
// dr=doc["dynamicRange"].GetUint();
// }
if (doc.HasMember("dynamicRange")) {
dr=doc["dynamicRange"].GetUint();
dr=16;
}
dMode=eAnalog;
detectorMode_s="analog";
@ -463,28 +543,44 @@ int main(int argc, char *argv[]) {
/* Single Photon Detector commands */
// if (doc.HasMember("nSigma")) {
// nSigma=doc["nSigma"].GetUint();
// }
nSigma=5;
if (doc.HasMember("nSigma")) {
if (doc["nSigma"].IsInt())
nSigma=doc["nSigma"].GetInt();
mt->setNSigma(nSigma);
}
emin=-1;
emax=-1;
if (doc.HasMember("energyRange")) {
if (doc["energyRange"].IsArray()) {
if (doc["energyRange"].Size() > 0 )
if (doc["energyRange"][0].IsInt())
emin=doc["energyRange"][0].GetInt();
if (doc["energyRange"].Size() > 1 )
if (doc["energyRange"][1].IsInt())
emax=doc["energyRange"][1].GetUint();
}
}
if (doc.HasMember("eMin")) {
if (doc["eMin"][1].IsInt())
emin=doc["eMin"].GetInt();
}
if (doc.HasMember("eMax")) {
if (doc["eMax"][1].IsInt())
emin=doc["eMax"].GetInt();
}
mt->setEnergyRange(emin,emax);
// if (doc.HasMember("energyRange")) {
// emin=doc["energyRange"][0].GetUint();
// emax=doc["energyRange"][1].GetUint();
// }
/* interpolating detector commands */
// if (doc.HasMember("nSubPixels")) {
// nsubPixels=doc["nSubPixels"].GetUint();
// }
if (doc.HasMember("nSubPixels")) {
if (doc["nSubPixels"].IsUint())
nSubPixels=doc["nSubPixels"].GetUint();
mt->setNSubPixels(nSubPixels);
}
//if (doc.HasMember("interpolation")) {
// intMode_s=doc["intepolation"].GetString();
//}
newFrame=0;
zmqsocket->CloseHeaderMessage();
@ -514,6 +610,7 @@ int main(int argc, char *argv[]) {
iframe++;
} // exiting infinite loop