Zmq communication uses additional headers, moench processing fixed

This commit is contained in:
2018-09-11 17:05:11 +02:00
parent 83600fcb15
commit f288390255
26 changed files with 1826 additions and 868 deletions

View File

@ -0,0 +1,34 @@
#module add CBFlib/0.9.5
INCDIR=-I. -I../ -I../interpolations -I../interpolations/etaVEL -I../dataStructures -I../../slsReceiverSoftware/include
LDFLAG= ../tiffIO.cpp -L/usr/lib64/ -lpthread -lm -lstdc++ -pthread -lrt -ltiff -O3
MAIN=moench03ClusterFinder.cpp
all: moenchClusterFinder moenchMakeEta moenchInterpolation moenchNoInterpolation moenchPhotonCounter moenchAnalog
moenchClusterFinder: moench03ClusterFinder.cpp $(INCS) clean
g++ -o moenchClusterFinder moench03ClusterFinder.cpp $(LDFLAG) $(INCDIR) $(LIBHDF5) $(LIBRARYCBF) -DSAVE_ALL -DNEWRECEIVER
moenchMakeEta: moench03Interpolation.cpp $(INCS) clean
g++ -o moenchMakeEta moench03Interpolation.cpp $(LDFLAG) $(INCDIR) $(LIBHDF5) $(LIBRARYCBF) -DFF
moenchInterpolation: moench03Interpolation.cpp $(INCS) clean
g++ -o moenchInterpolation moench03Interpolation.cpp $(LDFLAG) $(INCDIR) $(LIBHDF5) $(LIBRARYCBF)
moenchNoInterpolation: moench03NoInterpolation.cpp $(INCS) clean
g++ -o moenchNoInterpolation moench03NoInterpolation.cpp $(LDFLAG) $(INCDIR) $(LIBHDF5) $(LIBRARYCBF)
moenchPhotonCounter: moenchPhotonCounter.cpp $(INCS) clean
g++ -o moenchPhotonCounter moenchPhotonCounter.cpp $(LDFLAG) $(INCDIR) $(LIBHDF5) $(LIBRARYCBF) -DNEWRECEIVER
moenchAnalog: moenchPhotonCounter.cpp $(INCS) clean
g++ -o moenchAnalog moenchPhotonCounter.cpp $(LDFLAG) $(INCDIR) $(LIBHDF5) $(LIBRARYCBF) -DNEWRECEIVER -DANALOG
clean:
rm -f moenchClusterFinder moenchMakeEta moenchInterpolation moenchNoInterpolation moenchPhotonCounter moenchAnalog

View File

@ -5,13 +5,17 @@ LIBRARYCBF=$(CBFLIBDIR)/lib/*.o
INCDIR=-I../../slsReceiverSoftware/include -I$(CBFLIBDIR)/include/ -I. -I../dataStructures ../tiffIO.cpp -I../ -I../interpolations/
LIBHDF5=
#-I../interpolations/etaVEL
LDFLAG= -L/usr/lib64/ -lpthread -lm -lstdc++ -L. -lzmq -pthread -lrt -lhdf5 -ltiff -L$(ZMQLIB) -L$(CBFLIBDIR)/lib/
LDFLAG= -L/usr/lib64/ -lpthread -lm -lstdc++ -L. -lzmq -pthread -lrt -lhdf5 -ltiff -L$(ZMQLIB) -L$(CBFLIBDIR)/lib/ -O3
#-L../../bin
#DESTDIR?=../bin
aaa: moenchZmqProcess
all: moenchZmqClusterFinder moenchZmqInterpolating moenchZmqAnalog
moenchZmqProcess: moenchZmqProcess.cpp $(INCS) clean
g++ -o moenchZmqProcess moenchZmqProcess.cpp $(LDFLAG) $(INCDIR) $(LIBHDF5) $(LIBRARYCBF) -DNEWZMQ
moenchZmqInterpolating: $(MAIN) $(INCS) clean
g++ -o moenchZmqInterpolating moenchZmqInterpolating.cpp $(LDFLAG) $(INCDIR) $(LIBHDF5) $(LIBRARYCBF) -DSAVE_ALL
@ -23,6 +27,6 @@ moenchZmqAnalog: $(MAIN) $(INCS) clean
g++ -o moenchZmqAnalog moenchZmqAnalog.cpp $(LDFLAG) $(INCDIR) $(LIBHDF5) $(LIBRARYCBF) -DSAVE_ALL
clean:
rm -f moench03ZmqInterpolating moench03ZmqClusterFinder moenchZmqAnalog
rm -f moench03ZmqInterpolating moench03ZmqClusterFinder moenchZmqAnalog moenchZmqProcess

View File

@ -26,24 +26,45 @@ using namespace std;
#define XTALK
int main(int argc, char *argv[]) {
/**
* trial.o [socket ip] [starting port number] [outfname]
*
*/
#ifndef FF
if (argc<9) {
cout << "Wrong usage! Should be: "<< argv[0] << " infile " << " etafile outfile runmin runmax ns cmin cmax" << endl;
cout << "Wrong usage! Should be: "<< argv[0] << " infile etafile outfile runmin runmax ns cmin cmax" << endl;
return 1;
}
#endif
#ifdef FF
if (argc<7) {
cout << "Wrong usage! Should be: "<< argv[0] << " infile etafile runmin runmax cmin cmax" << endl;
return 1;
}
#endif
int iarg=4;
char infname[10000];
char fname[10000];
char outfname[10000];
int runmin=atoi(argv[4]);
int runmax=atoi(argv[5]);
int nsubpix=atoi(argv[6]);
float cmin=atof(argv[7]);
float cmax=atof(argv[8]);
#ifndef FF
iarg=4;
#endif
#ifdef FF
iarg=3;
#endif
int runmin=atoi(argv[iarg++]);
int runmax=atoi(argv[iarg++]);
cout << "Run min: " << runmin << endl;
cout << "Run max: " << runmax << endl;
int nsubpix=4;
#ifndef FF
nsubpix=atoi(argv[iarg++]);
cout << "Subpix: " << nsubpix << endl;
#endif
float cmin=atof(argv[iarg++]);
float cmax=atof(argv[iarg++]);
cout << "Energy min: " << cmin << endl;
cout << "Energy max: " << cmax << endl;
int etabins=1000;//nsubpix*2*100;
double etamin=-1, etamax=2;
@ -58,9 +79,6 @@ int main(int argc, char *argv[]) {
int ix, iy, isx, isy;
int nframes=0, lastframe=-1;
double d_x, d_y, res=5, xx, yy;
#ifdef MANYFILES
int ff=1000;
#endif
int nph=0, badph=0, totph=0;
FILE *f=NULL;
@ -72,32 +90,19 @@ int main(int argc, char *argv[]) {
single_photon_hit cl(3,3);
#endif
#ifdef XTALK
int old_val[3][3];
int new_val[3][3];
double xcorr=0.04;
// int ix=0;
#endif
eta2InterpolationPosXY *interp=new eta2InterpolationPosXY(NC, NR, nsubpix, etabins, etamin, etamax);
eta3InterpolationPosXY *interp3=new eta3InterpolationPosXY(NC, NR, nsubpix, etabins, eta3min, eta3max);
noInterpolation *dummy=new noInterpolation(NC, NR, nsubpix);
noInterpolation *nointerp=new noInterpolation(NC, NR, nsubpix);
noInterpolation *mult=new noInterpolation(NC, NR, nsubpix);
//etaInterpolationAdaptiveBins *interp=new etaInterpolationAdaptiveBins (NC, NR, nsubpix, etabins, etamin, etamax);
//etaInterpolationRandomBins *interp=new etaInterpolationRandomBins (NC, NR, nsubpix, etabins, etamin, etamax);
//#ifndef FF
#ifndef FF
cout << "read ff " << argv[2] << endl;
sprintf(fname,"%s_eta2.tiff",argv[2]);
sprintf(fname,"%s",argv[2]);
interp->readFlatField(fname);
interp->prepareInterpolation(ok);
sprintf(fname,"%s_eta3.tiff",argv[2]);
interp3->readFlatField(fname);
interp3->prepareInterpolation(ok);
//#endif
#endif
#ifdef FF
cout << "Will write eta file " << argv[2] << endl;
#endif
int *img;
float *totimg=new float[NC*NR*nsubpix*nsubpix];
@ -110,20 +115,18 @@ int main(int argc, char *argv[]) {
}
}
}
#ifdef FF
float ff[nsubpix*nsubpix];
float *ffimg=new float[NC*NR*nsubpix*nsubpix];
float totff=0;
#endif
#ifdef FF
sprintf(outfname,argv[2]);
#endif
int irun;
for (irun=runmin; irun<runmax; irun++) {
sprintf(infname,argv[1],irun);
#ifndef MANYFILES
#ifndef FF
sprintf(outfname,argv[3],irun);
#endif
f=fopen(infname,"r");
if (f) {
cout << infname << endl;
@ -138,185 +141,71 @@ int main(int argc, char *argv[]) {
// f0=cl.iframe;
if (nframes==0) f0=lastframe;
nframes++;
#ifdef MANYFILES
if (nframes%ff==0) {
sprintf(outfname,argv[3],irun,nframes-ff);
cout << outfname << endl;
interp->writeInterpolatedImage(outfname);
interp->clearInterpolatedImage();
}
#endif
}
#ifdef XTALK
if ((cl.x+1)%25!=0) {
for (int ix=-1; ix<2; ix++) {
for (int iy=-1; iy<2; iy++) {
old_val[iy+1][ix+1]=cl.get_data(ix,iy);
if (ix>=0) {
new_val[iy+1][ix+1]=old_val[iy+1][ix+1]-old_val[iy+1][ix]*xcorr;
cl.set_data(new_val[iy+1][ix+1],ix,iy);
}
}
}
}
#endif
quad=interp->calcQuad(cl.get_cluster(), sum, totquad, sDum);
if (sum>cmin && totquad/sum>0.8 && totquad/sum<1.2 && totquad<cmax && quad<cmax) {
nph++;
// if (sum>200 && sum<580) {
// interp->getInterpolatedPosition(cl.x,cl.y, totquad,quad,cl.get_cluster(),int_x, int_y);
if (sum>cmin && totquad/sum>0.8 && totquad/sum<1.2 && totquad<cmax && quad<cmax) {
nph++;
// if (sum>200 && sum<580) {
// interp->getInterpolatedPosition(cl.x,cl.y, totquad,quad,cl.get_cluster(),int_x, int_y);
#ifndef FF
interp->getInterpolatedPosition(cl.x,cl.y, cl.get_cluster(),int_x, int_y);
interp->addToImage(int_x, int_y);
interp3->getInterpolatedPosition(cl.x,cl.y, cl.get_cluster(),int3_x, int3_y);
interp3->addToImage(int3_x, int3_y);
nointerp->getInterpolatedPosition(cl.x,cl.y, cl.get_cluster(),noint_x, noint_y);
nointerp->addToImage(noint_x, noint_y);
#endif
#ifdef FF
interp->addToFlatField(cl.get_cluster(), etax, etay);
#endif
d_x= (int_x-int3_x)*25.;
d_y= (int_y-int3_y)*25.;
dummy->calcEta(totquad, sDum, etax, etay);
xx=int_x;
yy=int_y;
if (etax<0.1 || etax>0.9) xx=int3_x;
if (etay<0.1 || etay>0.9) yy=int3_y;
dummy->addToImage(xx,yy);
if (d_x>res || d_x<-res || d_y>res || d_y<-res) {
badph++;
// cout << "delta (um): "<< d_x << " " << d_y << " " << cl.x << " " << cl.y << endl;
// cout << sum << " " << totquad << " " << etax << " "<< etay << endl;
// //cout<< int_x << " " << int_y << " " << int3_x << " " << int3_y << endl;
if (nph%1000000==0) cout << nph << endl;
if (nph%100000000==0) {
#ifndef FF
interp->writeInterpolatedImage(outfname);
#endif
#ifdef FF
interp->writeFlatField(outfname);
#endif
}
mult->addToImage(noint_x, noint_y);
if (nph%1000000==0) cout << nph << endl;
if (nph%100000000==0) {
sprintf(outfname,"%s_inteta2.tiff", argv[3]);
interp->writeInterpolatedImage(outfname);
sprintf(outfname,"%s_inteta3.tiff", argv[3]);
interp3->writeInterpolatedImage(outfname);
sprintf(outfname,"%s_mix.tiff", argv[3]);
dummy->writeInterpolatedImage(outfname);
sprintf(outfname,"%s_noint.tiff", argv[3]);
nointerp->writeInterpolatedImage(outfname);
sprintf(outfname,"%s_mult.tiff", argv[3]);
mult->writeInterpolatedImage(outfname);
}
} else {
mult->getInterpolatedPosition(cl.x,cl.y, cl.get_cluster(),int_x, int_y);
for (int imult=0; imult<2.*sum/(cmax+cmin); imult++) mult->addToImage(int_x, int_y);
}
}
}
fclose(f);
#ifdef FF
interp->writeFlatField(outfname);
#endif
#ifndef FF
interp->writeInterpolatedImage(outfname);
img=interp->getInterpolatedImage();
for (isx=0; isx<nsubpix; isx++) {
for (isy=0; isy<nsubpix; isy++) {
ff[isy*nsubpix+isx]=0;
}
}
totff=0;
for (ix=0; ix<NC; ix++) {
for (iy=0; iy<NR-100; iy++) {
for (iy=0; iy<NR; iy++) {
for (isx=0; isx<nsubpix; isx++) {
for (isy=0; isy<nsubpix; isy++) {
ff[isy*nsubpix+isx]+=img[ix*nsubpix+isx+(iy*nsubpix+isy)*(NC*nsubpix)];
}
totimg[ix*nsubpix+isx+(iy*nsubpix+isy)*(NC*nsubpix)]+=img[ix*nsubpix+isx+(iy*nsubpix+isy)*(NC*nsubpix)];
}
}
}
}
for (isx=0; isx<nsubpix; isx++) {
for (isy=0; isy<nsubpix; isy++) {
totff+=ff[isy*nsubpix+isx];
}
}
totff/=nsubpix*nsubpix;
if (totff) {
cout << "ff: " << totff << endl;
for (isx=0; isx<nsubpix; isx++) {
for (isy=0; isy<nsubpix; isy++) {
ff[isy*nsubpix+isx]/=totff;
cout << ff[isy*nsubpix+isx] << "\t";
}
cout << endl;
}
for (ix=0; ix<NC; ix++) {
for (iy=0; iy<NR; iy++) {
for (isx=0; isx<nsubpix; isx++) {
for (isy=0; isy<nsubpix; isy++) {
ffimg[ix*nsubpix+isx+(iy*nsubpix+isy)*(NC*nsubpix)]=img[ix*nsubpix+isx+(iy*nsubpix+isy)*(NC*nsubpix)]/ff[isy*nsubpix+isx];
totimg[ix*nsubpix+isx+(iy*nsubpix+isy)*(NC*nsubpix)]+=ffimg[ix*nsubpix+isx+(iy*nsubpix+isy)*(NC*nsubpix)];
}
}
}
}
cout << "writing eta!" << endl;
WriteToTiff(ffimg, outfname,NC*nsubpix,NR*nsubpix);
}
#endif
#ifndef FF
#ifdef MANYFILES
sprintf(outfname,argv[3],irun,nframes-ff);
cout << outfname << endl;
#endif
sprintf(outfname,"%s_inteta2.tiff", argv[3]);
interp->writeInterpolatedImage(outfname);
sprintf(outfname,"%s_inteta3.tiff", argv[3]);
interp3->writeInterpolatedImage(outfname);
sprintf(outfname,"%s_mix.tiff", argv[3]);
dummy->writeInterpolatedImage(outfname);
sprintf(outfname,"%s_mult.tiff", argv[3]);
mult->writeInterpolatedImage(outfname);
#ifndef MANYFILES
img=interp->getInterpolatedImage();
for (ix=0; ix<NC; ix++) {
for (iy=0; iy<NR; iy++) {
for (isx=0; isx<nsubpix; isx++) {
for (isy=0; isy<nsubpix; isy++) {
totimg[ix*nsubpix+isx+(iy*nsubpix+isy)*(NC*nsubpix)]+=img[ix*nsubpix+isx+(iy*nsubpix+isy)*(NC*nsubpix)];
}
}
}
}
#endif
#endif
cout << "Read " << nframes << " frames (first frame: " << f0 << " last frame: " << lastframe << " delta:" << lastframe-f0 << ")"<<endl;
cout << "Read " << nframes << " frames (first frame: " << f0 << " last frame: " << lastframe << " delta:" << lastframe-f0 << ")"<<endl;
interp->clearInterpolatedImage();
interp3->clearInterpolatedImage();
dummy->clearInterpolatedImage();
mult->clearInterpolatedImage();
#endif
} else
cout << "could not open file " << infname << endl;
}
cout << irun << " " << runmax << endl;
#ifndef MANYFILES
#ifndef FF
sprintf(outfname,argv[3],11111);
WriteToTiff(totimg, outfname,NC*nsubpix,NR*nsubpix);
WriteToTiff(totimg, outfname,NC*nsubpix,NR*nsubpix);
#endif
#ifdef FF
interp->writeFlatField(outfname);
#endif
cout << "Filled " << nph << " (/"<< totph <<") of which " << badph << " badly interpolated " << endl;
cout << "Filled " << nph << " (/"<< totph <<") " << endl;
return 0;
}

View File

@ -8,6 +8,12 @@
#include <fstream>
#include "tiffIO.h"
//#define NEWZMQ
#ifdef NEWZMQ
#include <rapidjson/document.h> //json header in zmq stream
#endif
#include<iostream>
//#include "analogDetector.h"
@ -30,87 +36,129 @@ int main(int argc, char *argv[]) {
int fifosize=1000;
int nthreads=20;
// help
if (argc < 3 ) {
cprintf(RED, "Help: ./trial [receive socket ip] [receive starting port number] [send_socket ip] [send starting port number]\n");
if (argc < 3 ) {
cprintf(RED, "Help: ./trial [receive socket ip] [receive starting port number] [send_socket ip] [send starting port number]\n");
return EXIT_FAILURE;
}
// receive parameters
bool send = false;
char* socketip=argv[1];
uint32_t portnum = atoi(argv[2]);
int maxSize = 32*2*8192;//5000;//atoi(argv[3]);
int size= 32*2*5000;
int multisize=size;
// send parameters if any
char* socketip2 = 0;
uint32_t portnum2 = 0;
if (argc > 3) {
send = true;
socketip2 = argv[3];
portnum2 = atoi(argv[4]);
}
cout << "\nrx socket ip : " << socketip <<
"\nrx port num : " << portnum ;
if (send) {
cout << "\nsd socket ip : " << socketip2 <<
"\nsd port num : " << portnum2;
}
cout << endl;
//slsDetectorData *det=new moench03T1ZmqDataNew();
moench03T1ZmqDataNew *det=new moench03T1ZmqDataNew();
//analogDetector<uint16_t> *filter=new analogDetector<uint16_t>(det,1,NULL,1000);
singlePhotonDetector *filter=new singlePhotonDetector(det,3, 5, 1, 0, 1000, 10);
char* buff;
multiThreadedAnalogDetector *mt=new multiThreadedAnalogDetector(filter,nthreads,fifosize);
mt->setFrameMode(eFrame);
mt->StartThreads();
mt->popFree(buff);
ZmqSocket* zmqsocket=NULL;
#ifdef NEWZMQ
// receive socket
try{
#endif
zmqsocket = new ZmqSocket(socketip,portnum);
#ifdef NEWZMQ
} catch (...) {
cprintf(RED, "Error: Could not create Zmq socket on port %d with ip %s\n", portnum, socketip);
delete zmqsocket;
return EXIT_FAILURE;
}
}
#endif
// receive parameters
bool send = false;
char* socketip=argv[1];
uint32_t portnum = atoi(argv[2]);
int size = 32*2*5000;//atoi(argv[3]);
// send parameters if any
char* socketip2 = 0;
uint32_t portnum2 = 0;
if (argc > 3) {
send = true;
socketip2 = argv[3];
portnum2 = atoi(argv[4]);
}
cout << "\nrx socket ip : " << socketip <<
"\nrx port num : " << portnum ;
if (send) {
cout << "\nsd socket ip : " << socketip2 <<
"\nsd port num : " << portnum2;
}
cout << endl;
//slsDetectorData *det=new moench03T1ZmqDataNew();
moench03T1ZmqDataNew *det=new moench03T1ZmqDataNew();
//analogDetector<uint16_t> *filter=new analogDetector<uint16_t>(det,1,NULL,1000);
singlePhotonDetector *filter=new singlePhotonDetector(det,3, 5, 1, 0, 1000, 10);
char* buff;
multiThreadedAnalogDetector *mt=new multiThreadedAnalogDetector(filter,nthreads,fifosize);
mt->setFrameMode(eFrame);
mt->StartThreads();
mt->popFree(buff);
// receive socket
ZmqSocket* zmqsocket = new ZmqSocket(socketip,portnum);
#ifndef NEWZMQ
if (zmqsocket->IsError()) {
cprintf(RED, "Error: Could not create Zmq socket on port %d with ip %s\n", portnum, socketip);
delete zmqsocket;
return EXIT_FAILURE;
}
zmqsocket->Connect();
printf("Zmq Client at %s\n", zmqsocket->GetZmqServerAddress());
#endif
if (zmqsocket->Connect()) {
cprintf(RED, "Error: Could not connect to socket %s\n",
zmqsocket->GetZmqServerAddress());
delete zmqsocket;
return EXIT_FAILURE;
} else
printf("Zmq Client at %s\n", zmqsocket->GetZmqServerAddress());
// send socket
ZmqSocket* zmqsocket2 = 0;
cout << "zmq2 " << endl;
if (send) {
#ifdef NEWZMQ
// receive socket
try{
#endif
zmqsocket2 = new ZmqSocket(portnum2, socketip2);
#ifdef NEWZMQ
} catch (...) {
cprintf(RED, "Error: Could not create Zmq socket server on port %d and ip %s\n", portnum2, socketip2);
delete zmqsocket2;
delete zmqsocket;
return EXIT_FAILURE;
}
#endif
#ifndef NEWZMQ
if (zmqsocket2->IsError()) {
bprintf(RED, "Error: Could not create Zmq socket server on port %d and ip %s\n", portnum2, socketip2);
cprintf(RED, "Error: Could not create Zmq socket server on port %d and ip %s\n", portnum2, socketip2);
delete zmqsocket2;
delete zmqsocket;
return EXIT_FAILURE;
}
zmqsocket2->Connect();
printf("Zmq Server started at %s\n", zmqsocket2->GetZmqServerAddress());
#endif
if (zmqsocket2->Connect()) {
cprintf(RED, "Error: Could not connect to socket %s\n",
zmqsocket2->GetZmqServerAddress());
delete zmqsocket2;
return EXIT_FAILURE;
} else
printf("Zmq Client at %s\n", zmqsocket2->GetZmqServerAddress());
}
// header variables
uint64_t acqIndex = -1;
uint64_t frameIndex = -1;
uint32_t subframeIndex = -1;
uint32_t subFrameIndex = -1;
uint64_t fileindex = -1;
string filename = "";
char* image = new char[size];
// char* image = new char[size];
//int* image = new int[(size/sizeof(int))]();
uint32_t flippedDataX = -1;
int *nph;
int iframe=0;
char ofname[10000];
@ -119,17 +167,57 @@ int main(int argc, char *argv[]) {
int length;
int *detimage;
int nnx, nny,nns;
uint32_t imageSize = 0, nPixelsX = 0, nPixelsY = 0, dynamicRange = 0;
filter->getImageSize(nnx, nny,nns);
int16_t *dout=new int16_t [nnx*nny];
// infinite loop
uint32_t packetNumber = 0;
uint64_t bunchId = 0;
uint64_t timestamp = 0;
int16_t modId = 0;
uint16_t xCoord = 0;
uint16_t yCoord = 0;
uint16_t zCoord = 0;
uint32_t debug = 0;
uint32_t dr = 16;
uint16_t roundRNumber = 0;
uint8_t detType = 0;
uint8_t version = 0;
int* flippedData = 0;
char* additionalJsonHeader = 0;
uint32_t threshold=0;
uint32_t xmin=0, xmax=400, ymin=0, ymax=400;
string frameMode_s, detectorMode_s;
int emin, emax;
int newFrame=1;
while(1) {
// cout << "+++++++++++++++++++++++++++++++LOOP" << endl;
// get header, (if dummy, fail is on parse error or end of acquisition)
#ifndef NEWZMQ
if (!zmqsocket->ReceiveHeader(0, acqIndex, frameIndex, subframeIndex, filename, fileindex)){
#endif
#ifdef NEWZMQ
rapidjson::Document doc;
if (!zmqsocket->ReceiveHeader(0, doc, SLS_DETECTOR_JSON_HEADER_VERSION)) {
zmqsocket->CloseHeaderMessage();
#endif
// if (!zmqsocket->ReceiveHeader(0, acqIndex, frameIndex, subframeIndex, filename, fileindex)) {
cprintf(RED, "Got Dummy\n");
while (mt->isBusy()) {;}//wait until all data are processed from the queues
@ -142,8 +230,18 @@ int main(int argc, char *argv[]) {
if (send) {
strcpy(fname,filename.c_str());
// zmqsocket2->SendHeaderData(0, false, SLS_DETECTOR_JSON_HEADER_VERSION,16,fileindex,400,400,400*400, acqIndex,frameIndex,fname, acqIndex, 0,0,0,0,0,0,0,0,0,0,0,1);
zmqsocket2->SendHeaderData(0, false, SLS_DETECTOR_JSON_HEADER_VERSION,0,0,0,0,0, 0,0,fname, 0, 0,0,0,0,0,0,0,0,0,0,0,1);
#ifdef NEWZMQ
//zmqsocket2->SendHeaderData (0, false, SLS_DETECTOR_JSON_HEADER_VERSION, dynamicRange, fileindex,
// nnx, nny, nns*dynamicRange/8,acqIndex, frameIndex, fname, acqIndex, subFrameIndex, packetNumber,bunchId, timestamp, modId, xCoord, yCoord, zCoord,debug, roundRNumber, detType, version, flippedData, additionalJsonHeader);
zmqsocket2->SendHeaderData (0, false, SLS_DETECTOR_JSON_HEADER_VERSION, dr, fileindex,
nnx, nny, nns*dr/8,acqIndex, frameIndex, fname, acqIndex, subFrameIndex, packetNumber,bunchId, timestamp, modId, xCoord, yCoord, zCoord,debug, roundRNumber, detType, version, flippedData, additionalJsonHeader);
#endif
#ifndef NEWZMQ
zmqsocket2->SendHeaderData(0, false, SLS_DETECTOR_JSON_HEADER_VERSION,0,0,0,0,0, 0,0,fname, 0, 0,0,0,0,0,0,0,0,0,0,0,1);
#endif
zmqsocket2->SendData((char*)dout,length);
cprintf(GREEN, "Sent Data\n");
@ -161,9 +259,112 @@ int main(int argc, char *argv[]) {
of=NULL;
}
newFrame=1;
continue; //continue to not get out
}
#ifdef NEWZMQ
if (newFrame) {
// acqIndex, frameIndex, subframeIndex, filename, fileindex
size = doc["size"].GetUint();
multisize = size;// * zmqsocket->size();
dynamicRange = doc["bitmode"].GetUint();
nPixelsX = doc["shape"][0].GetUint();
nPixelsY = doc["shape"][1].GetUint();
filename = doc["fname"].GetString();
acqIndex = doc["acqIndex"].GetUint64();
frameIndex = doc["fIndex"].GetUint64();
fileindex = doc["fileIndex"].GetUint64();
subFrameIndex = doc["expLength"].GetUint();
xCoord = doc["xCoord"].GetUint();
yCoord = doc["yCoord"].GetUint();
zCoord = doc["zCoord"].GetUint();
flippedDataX=doc["flippedDataX"].GetUint();
packetNumber=doc["packetNumber"].GetUint();
bunchId=doc["bunchId"].GetUint();
timestamp=doc["timestamp"].GetUint();
modId=doc["modId"].GetUint();
debug=doc["debug"].GetUint();
roundRNumber=doc["roundRNumber"].GetUint();
detType=doc["detType"].GetUint();
version=doc["version"].GetUint();
cprintf(BLUE, "Header Info:\n"
"size: %u\n"
"multisize: %u\n"
"dynamicRange: %u\n"
"nPixelsX: %u\n"
"nPixelsY: %u\n"
"currentFileName: %s\n"
"currentAcquisitionIndex: %lu\n"
"currentFrameIndex: %lu\n"
"currentFileIndex: %lu\n"
"currentSubFrameIndex: %u\n"
"xCoordX: %u\n"
"yCoordY: %u\n"
"zCoordZ: %u\n"
"flippedDataX: %u\n"
"packetNumber: %u\n"
"bunchId: %u\n"
"timestamp: %u\n"
"modId: %u\n"
"debug: %u\n"
"roundRNumber: %u\n"
"detType: %u\n"
"version: %u\n",
size, multisize, dynamicRange, nPixelsX, nPixelsY,
filename.c_str(), acqIndex,
frameIndex, fileindex, subFrameIndex,
xCoord, yCoord,zCoord,
flippedDataX, packetNumber, bunchId, timestamp, modId, debug, roundRNumber, detType, version);
if (doc.HasMember("threshold")) {
version=doc["threshold"].GetUint();
}
if (doc.HasMember("roi")) {
xmin=doc["roi"][0].GetUint();
xmax=doc["roi"][1].GetUint();
ymin=doc["roi"][2].GetUint();
ymax=doc["roi"][3].GetUint();
}
if (doc.HasMember("frameMode")) {
frameMode_s=doc["frameMode"].GetString();
}
if (doc.HasMember("detectorMode")) {
detectorMode_s=doc["detectorMode"].GetString();
}
if (doc.HasMember("energyRange")) {
emin=doc["energyRange"][0].GetUint();
emax=doc["energyRange"][0].GetUint();
}
if (doc.HasMember("dynamicRange")) {
dr=doc["dynamicRange"].GetUint();
}
if (doc.HasMember("nSubPixels")) {
nsubPixels=doc["nSubPixels"].GetUint();
}
newFrame=0;
zmqsocket->CloseHeaderMessage();
}
#endif
if (of==NULL) {
sprintf(ofname,"%s_%d.clust",filename.c_str(),fileindex);

View File

@ -0,0 +1,529 @@
#include "sls_receiver_defs.h"
#include "ZmqSocket.h"
#include "moench03T1ZmqDataNew.h"
#include <vector>
#include <string>
#include <sstream>
#include <iomanip>
#include <fstream>
#include "tiffIO.h"
//#define NEWZMQ
#ifdef NEWZMQ
#include <rapidjson/document.h> //json header in zmq stream
#endif
#include<iostream>
//#include "analogDetector.h"
//#include "singlePhotonDetector.h"
#include "interpolatingDetector.h"
#include "multiThreadedAnalogDetector.h"
#include "ansi.h"
#include <iostream>
using namespace std;
#define SLS_DETECTOR_JSON_HEADER_VERSION 0x2
// myDet->setNetworkParameter(ADDITIONAL_JSON_HEADER, " \"what\":\"nothing\" ");
int main(int argc, char *argv[]) {
/**
* trial.o [socket ip] [starting port number] [send_socket ip] [send port number]
*
*/
FILE *of=NULL;
int fifosize=1000;
int nthreads=20;
// help
if (argc < 3 ) {
cprintf(RED, "Help: ./trial [receive socket ip] [receive starting port number] [send_socket ip] [send starting port number]\n");
return EXIT_FAILURE;
}
// receive parameters
bool send = false;
char* socketip=argv[1];
uint32_t portnum = atoi(argv[2]);
int maxSize = 32*2*8192;//5000;//atoi(argv[3]);
int size= 32*2*5000;
int multisize=size;
// send parameters if any
char* socketip2 = 0;
uint32_t portnum2 = 0;
if (argc > 3) {
send = true;
socketip2 = argv[3];
portnum2 = atoi(argv[4]);
}
cout << "\nrx socket ip : " << socketip <<
"\nrx port num : " << portnum ;
if (send) {
cout << "\ntx socket ip : " << socketip2 <<
"\ntx port num : " << portnum2;
}
cout << endl;
//slsDetectorData *det=new moench03T1ZmqDataNew();
moench03T1ZmqDataNew *det=new moench03T1ZmqDataNew();
cout << endl << " det" <<endl;
int npx, npy;
det->getDetectorSize(npx, npy);
//analogDetector<uint16_t> *filter=new analogDetector<uint16_t>(det,1,NULL,1000);
singlePhotonDetector *filter=new singlePhotonDetector(det,3, 5, 1, 0, 1000, 10);
// interpolatingDetector *filter=new interpolatingDetector(det,NULL, 5, 1, 0, 1000, 10);
cout << " filter" <<endl;
char* buff;
multiThreadedAnalogDetector *mt=new multiThreadedAnalogDetector(filter,nthreads,fifosize);
cout << " multi" <<endl;
mt->setFrameMode(eFrame);
mt->StartThreads();
cout << " start" <<endl;
mt->popFree(buff);
cout << " pop" <<endl;
ZmqSocket* zmqsocket=NULL;
#ifdef NEWZMQ
// receive socket
try{
#endif
zmqsocket = new ZmqSocket(socketip,portnum);
#ifdef NEWZMQ
} catch (...) {
cprintf(RED, "Error: Could not create Zmq socket on port %d with ip %s\n", portnum, socketip);
delete zmqsocket;
return EXIT_FAILURE;
}
#endif
#ifndef NEWZMQ
if (zmqsocket->IsError()) {
cprintf(RED, "Error: Could not create Zmq socket on port %d with ip %s\n", portnum, socketip);
delete zmqsocket;
return EXIT_FAILURE;
}
#endif
if (zmqsocket->Connect()) {
cprintf(RED, "Error: Could not connect to socket %s\n",
zmqsocket->GetZmqServerAddress());
delete zmqsocket;
return EXIT_FAILURE;
} else
printf("Zmq Client at %s\n", zmqsocket->GetZmqServerAddress());
// send socket
ZmqSocket* zmqsocket2 = 0;
cout << "zmq2 " << endl;
if (send) {
#ifdef NEWZMQ
// receive socket
try{
#endif
zmqsocket2 = new ZmqSocket(portnum2, socketip2);
#ifdef NEWZMQ
} catch (...) {
cprintf(RED, "Error: Could not create Zmq socket server on port %d and ip %s\n", portnum2, socketip2);
delete zmqsocket2;
delete zmqsocket;
return EXIT_FAILURE;
}
#endif
#ifndef NEWZMQ
if (zmqsocket2->IsError()) {
cprintf(RED, "Error: Could not create Zmq socket server on port %d and ip %s\n", portnum2, socketip2);
delete zmqsocket2;
delete zmqsocket;
return EXIT_FAILURE;
}
#endif
if (zmqsocket2->Connect()) {
cprintf(RED, "Error: Could not connect to socket %s\n",
zmqsocket2->GetZmqServerAddress());
delete zmqsocket2;
return EXIT_FAILURE;
} else
printf("Zmq Client at %s\n", zmqsocket2->GetZmqServerAddress());
}
// header variables
uint64_t acqIndex = -1;
uint64_t frameIndex = -1;
uint32_t subFrameIndex = -1;
uint64_t fileindex = -1;
string filename = "";
// char* image = new char[size];
//int* image = new int[(size/sizeof(int))]();
uint32_t flippedDataX = -1;
int *nph;
int iframe=0;
char ofname[10000];
char fname[10000];
int length;
int *detimage;
int nnx, nny,nns;
uint32_t imageSize = 0, nPixelsX = 0, nPixelsY = 0, dynamicRange = 0;
filter->getImageSize(nnx, nny,nns);
// infinite loop
uint32_t packetNumber = 0;
uint64_t bunchId = 0;
uint64_t timestamp = 0;
int16_t modId = 0;
uint16_t xCoord = 0;
uint16_t yCoord = 0;
uint16_t zCoord = 0;
uint32_t debug = 0;
uint32_t dr = 16;
int16_t *dout=new int16_t [nnx*nny];
//uint32_t dr = 32;
//int32_t *dout=new int32_t [nnx*nny];
uint32_t nSigma=5;
uint16_t roundRNumber = 0;
uint8_t detType = 0;
uint8_t version = 0;
int* flippedData = 0;
char* additionalJsonHeader = 0;
int32_t threshold=0;
int32_t xmin=0, xmax=400, ymin=0, ymax=400;
string frameMode_s, detectorMode_s, intMode_s;
int emin, emax;
int resetFlat=0;
int resetPed=0;
int nsubPixels=1;
int isPedestal;
int isFlat=0;
int newFrame=1;
detectorMode dMode;
frameMode fMode;
double *ped;
while(1) {
// cout << "+++++++++++++++++++++++++++++++LOOP" << endl;
// get header, (if dummy, fail is on parse error or end of acquisition)
#ifndef NEWZMQ
if (!zmqsocket->ReceiveHeader(0, acqIndex, frameIndex, subframeIndex, filename, fileindex)){
#endif
#ifdef NEWZMQ
rapidjson::Document doc;
if (!zmqsocket->ReceiveHeader(0, doc, SLS_DETECTOR_JSON_HEADER_VERSION)) {
zmqsocket->CloseHeaderMessage();
#endif
// if (!zmqsocket->ReceiveHeader(0, acqIndex, frameIndex, subframeIndex, filename, fileindex)) {
cprintf(RED, "Got Dummy\n");
while (mt->isBusy()) {;}//wait until all data are processed from the queues
detimage=mt->getImage(nnx,nny,nns);
if (fMode==ePedestal) {
nns=1;
// cout << "get pedestal " << endl;
ped=mt->getPedestal();
// cout << "got pedestal " << endl;
for (int ix=0; ix<nnx; ix++) {
for (int iy=0; iy<nny; iy++) {
dout[iy*nnx+ix]=ped[(iy)*nnx+ix];
// cout << ped[(iy)*nnx+ix] << " " ;
}
}
// cout << endl ;
// cout << "done " << endl;
} else {
for (int ix=0; ix<nnx; ix++) {
for (int iy=0; iy<nny; iy++) {
for (int isx=0; isx<nns; isx++) {
for (int isy=0; isy<nns; isy++) {
if (isx==0 && isy==0)
dout[iy*nnx+ix]=detimage[(iy+isy)*nnx*nns+ix+isx];
else
dout[iy*nnx+ix]+=detimage[(iy+isy)*nnx*nns+ix+isx];
}
}
if (dout[iy*nnx+ix]<0) dout[iy*nnx+ix]=0;
}
}
}
// cout << nns*nnx*nny*nns*dr/8 << " " << length << endl;
if (send) {
strcpy(fname,filename.c_str());
#ifdef NEWZMQ
//zmqsocket2->SendHeaderData (0, false, SLS_DETECTOR_JSON_HEADER_VERSION, dynamicRange, fileindex,
// nnx, nny, nns*dynamicRange/8,acqIndex, frameIndex, fname, acqIndex, subFrameIndex, packetNumber,bunchId, timestamp, modId, xCoord, yCoord, zCoord,debug, roundRNumber, detType, version, flippedData, additionalJsonHeader);
// zmqsocket2->SendHeaderData (0, false, SLS_DETECTOR_JSON_HEADER_VERSION, dr, fileindex, nnx, nny, nns*dr/8,acqIndex, frameIndex, fname, acqIndex, subFrameIndex, packetNumber,bunchId, timestamp, modId, xCoord, yCoord, zCoord,debug, roundRNumber, detType, version, flippedData, additionalJsonHeader);
zmqsocket2->SendHeaderData (0, false, SLS_DETECTOR_JSON_HEADER_VERSION, dr, fileindex, nnx, nny, nnx*nny*dr/8,acqIndex, frameIndex, fname, acqIndex, subFrameIndex, packetNumber,bunchId, timestamp, modId, xCoord, yCoord, zCoord,debug, roundRNumber, detType, version, flippedData, additionalJsonHeader);
#endif
#ifndef NEWZMQ
zmqsocket2->SendHeaderData(0, false, SLS_DETECTOR_JSON_HEADER_VERSION,0,0,0,0,0, 0,0,fname, 0, 0,0,0,0,0,0,0,0,0,0,0,1);
#endif
zmqsocket2->SendData((char*)dout,length);//nns*dr/8);
cprintf(GREEN, "Sent Data\n");
}
// stream dummy to socket2 to signal end of acquisition
if (send) {
zmqsocket2->SendHeaderData(0, true, SLS_DETECTOR_JSON_HEADER_VERSION);
cprintf(RED, "Sent Dummy\n");
}
mt->clearImage();
if (of) {
fclose(of);
of=NULL;
}
newFrame=1;
continue; //continue to not get out
}
#ifdef NEWZMQ
if (newFrame) {
// acqIndex, frameIndex, subframeIndex, filename, fileindex
size = doc["size"].GetUint();
multisize = size;// * zmqsocket->size();
dynamicRange = doc["bitmode"].GetUint();
nPixelsX = doc["shape"][0].GetUint();
nPixelsY = doc["shape"][1].GetUint();
filename = doc["fname"].GetString();
acqIndex = doc["acqIndex"].GetUint64();
frameIndex = doc["fIndex"].GetUint64();
fileindex = doc["fileIndex"].GetUint64();
subFrameIndex = doc["expLength"].GetUint();
xCoord = doc["xCoord"].GetUint();
yCoord = doc["yCoord"].GetUint();
zCoord = doc["zCoord"].GetUint();
flippedDataX=doc["flippedDataX"].GetUint();
packetNumber=doc["packetNumber"].GetUint();
bunchId=doc["bunchId"].GetUint();
timestamp=doc["timestamp"].GetUint();
modId=doc["modId"].GetUint();
debug=doc["debug"].GetUint();
roundRNumber=doc["roundRNumber"].GetUint();
detType=doc["detType"].GetUint();
version=doc["version"].GetUint();
cprintf(BLUE, "Header Info:\n"
"size: %u\n"
"multisize: %u\n"
"dynamicRange: %u\n"
"nPixelsX: %u\n"
"nPixelsY: %u\n"
"currentFileName: %s\n"
"currentAcquisitionIndex: %lu\n"
"currentFrameIndex: %lu\n"
"currentFileIndex: %lu\n"
"currentSubFrameIndex: %u\n"
"xCoordX: %u\n"
"yCoordY: %u\n"
"zCoordZ: %u\n"
"flippedDataX: %u\n"
"packetNumber: %u\n"
"bunchId: %u\n"
"timestamp: %u\n"
"modId: %u\n"
"debug: %u\n"
"roundRNumber: %u\n"
"detType: %u\n"
"version: %u\n",
size, multisize, dynamicRange, nPixelsX, nPixelsY,
filename.c_str(), acqIndex,
frameIndex, fileindex, subFrameIndex,
xCoord, yCoord,zCoord,
flippedDataX, packetNumber, bunchId, timestamp, modId, debug, roundRNumber, detType, version);
/* Analog detector commands */
isPedestal=0;
isFlat=0;
fMode=eFrame;
frameMode_s="frame";
cprintf(MAGENTA, "Frame mode: ");
if (doc.HasMember("frameMode")) {
if (doc["frameMode"].IsString()) {
frameMode_s=doc["frameMode"].GetString();
if (frameMode_s == "pedestal"){
fMode=ePedestal;
isPedestal=1;
} else if (frameMode_s == "newPedestal"){
mt->newDataSet(); //resets pedestal
// cprintf(MAGENTA, "Resetting pedestal\n");
fMode=ePedestal;
isPedestal=1;
} else if (frameMode_s == "flatfield") {
fMode=eFlat;
isFlat=1;
} else if (frameMode_s == "newFlatfield") {
//mt->resetFlatfield();
isFlat=1;
//cprintf(MAGENTA, "Resetting flatfield\n");
fMode=eFlat;
} else
fMode=eFrame;
}
}
cprintf(MAGENTA, "%s\n" , frameMode_s.c_str());
mt->setFrameMode(fMode);
threshold=0;
cprintf(MAGENTA, "Threshold: ");
if (doc.HasMember("threshold")) {
if (doc["threshold"].IsInt())
threshold=doc["threshold"].GetInt();
}
mt->setThreshold(threshold);
cprintf(MAGENTA, "%d\n", threshold);
xmin=0;
xmax=npx;
ymin=0;
ymax=npy;
cprintf(MAGENTA, "ROI: ");
if (doc.HasMember("roi")) {
if (doc["roi"].IsArray()) {
if (doc["roi"].Size() > 0 )
if (doc["roi"][0].IsInt())
xmin=doc["roi"][0].GetInt();
if (doc["roi"].Size() > 1 )
if (doc["roi"][1].IsInt())
xmax=doc["roi"][1].GetInt();
if (doc["roi"].Size() > 2 )
if (doc["roi"][2].IsInt())
ymin=doc["roi"][2].GetInt();
if (doc["roi"].Size() > 3 )
if (doc["roi"][3].IsInt())
ymax=doc["roi"][3].GetInt();
}
}
cprintf(MAGENTA, "%d %d %d %d\n", xmin, xmax, ymin, ymax);
mt->setROI(xmin, xmax, ymin, ymax);
// if (doc.HasMember("dynamicRange")) {
// dr=doc["dynamicRange"].GetUint();
// }
dMode=eAnalog;
detectorMode_s="analog";
cprintf(MAGENTA, "Detector mode: ");
if (doc.HasMember("detectorMode")) {
if (doc["detectorMode"].IsString()) {
detectorMode_s=doc["detectorMode"].GetString();
if (detectorMode_s == "interpolating"){
dMode=eInterpolating;
} else if (detectorMode_s == "counting"){
dMode=ePhotonCounting;
} else {
dMode=eAnalog;
}
}
}
cprintf(MAGENTA, "%s\n" , detectorMode_s.c_str());
mt->setDetectorMode(dMode);
/* Single Photon Detector commands */
// if (doc.HasMember("nSigma")) {
// nSigma=doc["nSigma"].GetUint();
// }
// if (doc.HasMember("energyRange")) {
// emin=doc["energyRange"][0].GetUint();
// emax=doc["energyRange"][1].GetUint();
// }
/* interpolating detector commands */
// if (doc.HasMember("nSubPixels")) {
// nsubPixels=doc["nSubPixels"].GetUint();
// }
//if (doc.HasMember("interpolation")) {
// intMode_s=doc["intepolation"].GetString();
//}
newFrame=0;
zmqsocket->CloseHeaderMessage();
}
#endif
if (of==NULL) {
sprintf(ofname,"%s_%d.clust",filename.c_str(),fileindex);
of=fopen(ofname,"w");
if (of) {
mt->setFilePointer(of);
}else {
cout << "Could not open "<< ofname << " for writing " << endl;
mt->setFilePointer(NULL);
}
}
// get data
length = zmqsocket->ReceiveData(0, buff, size);
mt->pushData(buff);
mt->nextThread();
mt->popFree(buff);
iframe++;
} // exiting infinite loop
delete zmqsocket;
if (send)
delete zmqsocket2;
cout<<"Goodbye"<< endl;
return 0;
}