in between

This commit is contained in:
2018-04-04 14:27:53 +02:00
parent 9e8d3e598f
commit 61897cbd41
6 changed files with 238 additions and 316 deletions

View File

@ -24,7 +24,7 @@ ID: $Id$
#include <sys/shm.h>
#include <iostream>
#include <string>
#include <rapidjson/document.h> //json header in zmq stream
char ans[MAX_STR_LENGTH];
@ -6015,243 +6015,165 @@ int multiSlsDetector::createReceivingDataSockets(const bool destroy){
int multiSlsDetector::getData(const int isocket, char* image, const int size,
uint64_t &acqIndex, uint64_t &frameIndex, uint32_t &subframeIndex,
string &filename, uint64_t &fileIndex) {
//fail is on parse error or end of acquisition
if (!zmqSocket[isocket]->ReceiveHeader(isocket, acqIndex, frameIndex, subframeIndex, filename, fileIndex))
return FAIL;
//receiving incorrect size is replaced by 0xFF
zmqSocket[isocket]->ReceiveData(isocket, image, size);
return OK;
}
void multiSlsDetector::readFrameFromReceiver(){
//determine number of sockets
int numSockets = thisMultiDetector->numberOfDetectors;
int numSocketsPerSLSDetector = 1;
bool jungfrau = false;
bool eiger = false;
/*double* gdata = NULL;*/
slsDetectorDefs::detectorType myDetType = getDetectorsType();
switch(myDetType){
case EIGER:
eiger = true;
numSocketsPerSLSDetector = 2;
numSockets *= numSocketsPerSLSDetector;
break;
case JUNGFRAU:
jungfrau = true;
break;
default:
break;
}
int numSockets = thisMultiDetector->numberOfDetectors;
bool gappixelsenable = false;
if (getDetectorsType() == EIGER) {
numSockets *= 2;
gappixelsenable = detectors[0]->enableGapPixels(-1) >= 1 ? true: false;
}
//gui variables
uint64_t currentAcquisitionIndex = -1;
uint64_t currentFrameIndex = -1;
uint32_t currentSubFrameIndex = -1;
uint64_t currentFileIndex = -1;
string currentFileName = "";
bool runningList[numSockets];
bool connectList[numSockets];
int numRunning = 0;
for(int i = 0; i < numSockets; ++i) {
if(!zmqSocket[i]->Connect()) {
connectList[i] = true;
runningList[i] = true;
++numRunning;
} else {
// to remember the list it connected to, to disconnect later
connectList[i] = false;
cprintf(RED,"Error: Could not connect to socket %s\n",zmqSocket[i]->GetZmqServerAddress());
runningList[i] = false;
}
}
int numConnected = numRunning;
bool data = false;
char* image = NULL;
char* multiframe = NULL;
char* multigappixels = NULL
int multisize = 0;
// header info
uint32_t size = 0; // only first message header
uint32_t nPixelsx = 0, nPixelsY = 0;
uint32_t dynamicRange = 0;
string currentFileName = "";
uint64_t currentAcquisitionIndex = -1;
uint64_t currentFrameIndex = -1;
uint32_t currentSubFrameIndex = -1;
uint64_t currentFileIndex = -1;
//getting sls values
int slsdatabytes = 0, slsmaxchannels = 0, slsmaxX = 0, slsmaxY=0;
double bytesperchannel = 0;
bool gappixelsenable = false;
if(detectors[0]){
slsmaxchannels = detectors[0]->getMaxNumberOfChannels(X) * detectors[0]->getMaxNumberOfChannels(Y);
slsdatabytes = detectors[0]->getDataBytes();
bytesperchannel = (double)slsdatabytes/(double)slsmaxchannels;
//wait for real time acquisition to start
bool running = true;
sem_wait(&sem_newRTAcquisition);
if(checkJoinThread())
running = false;
//exit when checkJoinThread() (all sockets done)
while(running){
//get each frame
for(int isocket=0; isocket<numSockets; ++isocket){
// reset data
data = false;
if (multiframe != NULL)
memset(multiframe, 0xFF, multisize);
//if running
if (runningList[isocket]) {
// sub images - header
rapidjson::Document doc;
if(!zmqSocket[isocket]->ReceiveHeader(isocket, doc, SLS_DETECTOR_JSON_HEADER_VERSION)){
zmqSocket[isocket]->CloseHeaderMessage();
// parse error, version error or end of acquisition for socket
runningList[isocket] = false;
--numRunning;
continue;
}
// if first message, allocate (all one time stuff)
if (image == NULL) {
// allocate
size = doc["size"].GetUint();
multisize = size * numSockets;
image = new char[size];
multiframe = new char[multisize];
// one time values
// dynamic range
dynamicRange = doc["bitmode"].GetUint();
// shape
if (dynamicRange == 4) {
nPixelsx = thisMultiDetector->numberOfChannelInclGapPixels[X];
nPixelsY = thisMultiDetector->numberOfChannelInclGapPixels[Y];
} else {
const Value& a = doc["shape"];
nPixelsx = a[0].GetUint(); /* later try doc["shape"].GetUint();*/
nPixelsY = a[1].GetUint();
}
}
// parse rest of header
currentFileName = doc["fname"].GetString();
currentAcquisitionIndex = doc["acqIndex"].GetUint64();
currentFrameIndex = doc["fIndex"].GetUint64();
currentFileIndex = doc["fileIndex"].GetUint64();
currentSubFrameIndex = doc["expLength"].GetUint();
zmqSocket[isocket]->CloseHeaderMessage();
// copying data (receiving incorrect size is replaced by 0xFF)
data = true;
zmqSocket[isocket]->ReceiveData(isocket, image, size);
// creaing multi image
// recalculate with gap pixels (for >= 8 bit mode)
if (bytesperchannel >= 1.0) {
slsdatabytes = detectors[0]->getDataBytesInclGapPixels();
slsmaxchannels = detectors[0]->getMaxNumberOfChannelsInclGapPixels(X)*detectors[0]->getMaxNumberOfChannelsInclGapPixels(Y);
}
}
slsmaxX = (bytesperchannel >= 1.0) ? detectors[0]->getTotalNumberOfChannelsInclGapPixels(X) : detectors[0]->getTotalNumberOfChannels(X);
slsmaxY = (bytesperchannel >= 1.0) ? detectors[0]->getTotalNumberOfChannelsInclGapPixels(Y) : detectors[0]->getTotalNumberOfChannels(Y);
gappixelsenable = detectors[0]->enableGapPixels(-1) >= 1 ? true: false;
}
// max channel values
int maxX = (bytesperchannel >= 1.0) ? thisMultiDetector->numberOfChannelInclGapPixels[X] : thisMultiDetector->numberOfChannel[X];
int maxY = (bytesperchannel >= 1.0) ? thisMultiDetector->numberOfChannelInclGapPixels[Y] : thisMultiDetector->numberOfChannel[Y];
int multidatabytes = (bytesperchannel >= 1.0) ? thisMultiDetector->dataBytesInclGapPixels : thisMultiDetector->dataBytes;
int dr = bytesperchannel * 8;
if (myDetType == JUNGFRAUCTB) {
maxY = (int)(thisMultiDetector->timerValue[SAMPLES_JCTB] * 2)/25; // for moench 03
maxX = 400;
dr = 16;
}
}
//send data to callback
if(data){
// 4bit gap pixels
if (dynamicRange == 4 && gappixelsenable) {
int n = processImageWithGapPixels(multiframe, multigappixels);
thisData = new detectorData(NULL,NULL,NULL,getCurrentProgress(),
currentFileName.c_str(), nPixelsx, nPixelsY,
multigappixels, n, dynamicRange, currentFileIndex);
}
// normal pixels
else
thisData = new detectorData(NULL, NULL, NULL, getCurrentProgress(),
currentFileName.c_str(), nPixelsx, nPixelsY,
multiframe, multisize, dynamicRange, currentFileIndex);
dataReady(thisData, currentFrameIndex,
((dynamicRange == 32) ? currentSubFrameIndex : -1),
pCallbackArg);
delete thisData;
setCurrentProgress(currentAcquisitionIndex + 1);
}
//all done
if(!numRunning){
// let main thread know that all dummy packets have been received (also from external process),
// main thread can now proceed to measurement finished call back
sem_post(&sem_endRTAcquisition);
// wait for next scan/measurement, else join thread
sem_wait(&sem_newRTAcquisition);
//done with complete acquisition
if(checkJoinThread())
running = false;
else{
//starting a new scan/measurement (got dummy data)
for(int i = 0; i < numSockets; ++i)
runningList[i] = connectList[i];
numRunning = numConnected;
}
}
//getting multi values
//calculating offsets (for eiger interleaving ports)
int offsetX[numSockets]; int offsetY[numSockets];
int bottom[numSockets];
if(eiger){
for(int i=0; i<numSockets; ++i){
offsetY[i] = (maxY - (thisMultiDetector->offsetY[i/numSocketsPerSLSDetector] + slsmaxY)) * maxX * bytesperchannel;
//the left half or right half
if(!(i%numSocketsPerSLSDetector))
offsetX[i] = thisMultiDetector->offsetX[i/numSocketsPerSLSDetector];
else
offsetX[i] = thisMultiDetector->offsetX[i/numSocketsPerSLSDetector] + (slsmaxX/numSocketsPerSLSDetector);
offsetX[i] *= bytesperchannel;
bottom[i] = detectors[i/numSocketsPerSLSDetector]->getFlippedData(X);/*only for eiger*/
}
}
}
int expectedslssize = slsdatabytes/numSocketsPerSLSDetector;
char* image = new char[expectedslssize]();
char* multiframe = new char[multidatabytes]();
char* multiframegain = NULL;
char* multigappixels = NULL; // used only for 4 bit mode with gap pixels enabled
if (jungfrau)
multiframegain = new char[multidatabytes]();
bool runningList[numSockets];
bool connectList[numSockets];
for(int i = 0; i < numSockets; ++i) {
if(!zmqSocket[i]->Connect()) {
connectList[i] = true;
runningList[i] = true;
} else {
connectList[i] = false;
cprintf(RED,"Error: Could not connect to socket %s\n",zmqSocket[i]->GetZmqServerAddress());
runningList[i] = false;
}
}
int numRunning = numSockets;
//wait for real time acquisition to start
bool running = true;
sem_wait(&sem_newRTAcquisition);
if(checkJoinThread())
running = false;
//exit when last message for each socket received
while(running){
memset(multiframe,0xFF,slsdatabytes*thisMultiDetector->numberOfDetectors); //reset frame memory
//get each frame
for(int isocket=0; isocket<numSockets; ++isocket){
//if running
if (runningList[isocket]) {
//get individual images
if(FAIL == getData(isocket, image, expectedslssize, currentAcquisitionIndex,currentFrameIndex,currentSubFrameIndex,currentFileName, currentFileIndex)){
runningList[isocket] = false;
--numRunning;
continue;
}
//assemble data with interleaving
if(eiger){
//bottom
if(bottom[isocket]){
//if((((isocket/numSocketsPerSLSDetector)+1)%2) == 0){
for(int i=0;i<slsmaxY;++i){
memcpy(multiframe + offsetY[isocket] + offsetX[isocket] + (int)((slsmaxY-1-i)*maxX*bytesperchannel),
image+ (int)(i*(slsmaxX/numSocketsPerSLSDetector)*bytesperchannel),
(int)((slsmaxX/numSocketsPerSLSDetector)*bytesperchannel));
}
}
//top
else{
for(int i=0;i<slsmaxY;++i){
memcpy(multiframe + offsetY[isocket] + offsetX[isocket] + (int)(i*maxX*bytesperchannel),
image+ (int)(i*(slsmaxX/numSocketsPerSLSDetector)*bytesperchannel),
(int)((slsmaxX/numSocketsPerSLSDetector)*bytesperchannel));
}
}
}
//assemble data with no interleaving, assumed detectors appended vertically
else{
for(int i=0;i<slsmaxY;++i){
memcpy(((char*)multiframe) + (((thisMultiDetector->offsetY[isocket] + i) * maxX) + thisMultiDetector->offsetX[isocket])* (int)bytesperchannel,
(char*)image+ (i*slsmaxX*(int)bytesperchannel),
(slsmaxX*(int)bytesperchannel));
}
}
}
}
//all done
if(!numRunning){
// let main thread know that all dummy packets have been received (also from external process),
// main thread can now proceed to measurement finished call back
sem_post(&sem_endRTAcquisition);
// wait for next scan/measurement, else join thread
sem_wait(&sem_newRTAcquisition);
//done with complete acquisition
if(checkJoinThread())
break;
else{
//starting a new scan/measurement (got dummy data)
for(int i = 0; i < numSockets; ++i)
runningList[i] = true;
numRunning = numSockets;
running = false;
}
}
//send data to callback
if(running){
if (gappixelsenable && bytesperchannel < 1) {//inside this function, allocate if it doesnt exist
int nx = thisMultiDetector->numberOfChannelInclGapPixels[X];
int ny = thisMultiDetector->numberOfChannelInclGapPixels[Y];
int n = processImageWithGapPixels(multiframe, multigappixels);
thisData = new detectorData(NULL,NULL,NULL,getCurrentProgress(),currentFileName.c_str(), nx, ny,multigappixels, n, dr, currentFileIndex);
}
else {
thisData = new detectorData(NULL,NULL,NULL,getCurrentProgress(),currentFileName.c_str(),maxX,maxY,multiframe, multidatabytes, dr, currentFileIndex);
}
dataReady(thisData, currentFrameIndex, (((dr == 32) && (eiger)) ? currentSubFrameIndex : -1), pCallbackArg);
delete thisData;
//cout<<"Send frame #"<< currentFrameIndex << " to gui"<<endl;
setCurrentProgress(currentAcquisitionIndex+1);
}
//setting it back for each scan/measurement
running = true;
}
// Disconnect resources
for (int i = 0; i < numSockets; ++i)
if (connectList[i])
zmqSocket[i]->Disconnect();
//free resources
delete [] image;
delete[] multiframe;
if (jungfrau)
delete [] multiframegain;
if (multigappixels != NULL)
delete [] multigappixels;
// Disconnect resources
for (int i = 0; i < numSockets; ++i)
if (connectList[i])
zmqSocket[i]->Disconnect();
//free resources
if (image != NULL) delete [] image;
if (multiframe != NULL) delete [] multiframe;
if (multigappixels != NULL) delete [] multigappixels;
}

View File

@ -1361,6 +1361,7 @@ class multiSlsDetector : public slsDetectorUtils {
/** Reads frames from receiver through a constant socket
*/
void readFrameFromReceiver();
/** Locks/Unlocks the connection to the receiver
/param lock sets (1), usets (0), gets (-1) the lock
/returns lock status of the receiver
@ -1556,21 +1557,6 @@ class multiSlsDetector : public slsDetectorUtils {
private:
/**
* Gets data from socket
* @param isocket socket index
* @param image image buffer
* @param size size of image
* @param acqIndex address of acquisition index
* @param frameIndex address of frame index
* @param subframeIndex address of subframe index
* @param filename address of file name
* @param fileindex address of file index
*/
int getData(const int isocket, char* image, const int size,
uint64_t &acqIndex, uint64_t &frameIndex, uint32_t &subframeIndex,
string &filename, uint64_t &fileIndex);
/**
* add gap pixels to the image (only for Eiger in 4 bit mode)

View File

@ -576,6 +576,7 @@ class slsDetectorBase : public virtual slsDetectorDefs, public virtual errorDef
*/
virtual void readFrameFromReceiver()=0;
/**
* Enable data streaming to client
* @param enable 0 to disable, 1 to enable, -1 to get the value

View File

@ -48,7 +48,8 @@ public:
portno (portnumber),
server (false),
contextDescriptor (NULL),
socketDescriptor (NULL)
socketDescriptor (NULL),
headerMessage(0)
{
char ip[MAX_STR_LENGTH] = "";
memset(ip, 0, MAX_STR_LENGTH);
@ -104,7 +105,8 @@ public:
portno (portnumber),
server (true),
contextDescriptor (NULL),
socketDescriptor (NULL)
socketDescriptor (NULL),
headerMessage(0)
{
// create context
contextDescriptor = zmq_ctx_new();
@ -365,19 +367,16 @@ public:
/**
* Receive Header
* Receive Header (Important to close message after parsing header)
* @param index self index for debugging
* @param acqIndex address of acquisition index
* @param frameIndex address of frame index
* @param subframeIndex address of subframe index
* @param filename address of file name
* @param fileindex address of file index
* @returns 0 if error or end of acquisition, else 1
* @param document parsed document reference
* @param version version that has to match, -1 to not care
* @returns 0 if error or end of acquisition, else 1 (call CloseHeaderMessage after parsing header)
*/
int ReceiveHeader(const int index, uint64_t &acqIndex,
uint64_t &frameIndex, uint32_t &subframeIndex, std::string &filename, uint64_t &fileIndex)
int ReceiveHeader(const int index, Document& document, uint32_t version)
{
zmq_msg_t message;
headerMessage= &message;
zmq_msg_init (&message);
int len = ReceiveMessage(index, message);
if ( len > 0 ) {
@ -385,11 +384,10 @@ public:
#ifdef ZMQ_DETAIL
cprintf( BLUE,"Header %d [%d] Length: %d Header:%s \n", index, portno, len, (char*) zmq_msg_data (&message) );
#endif
if ( ParseHeader (index, len, message, acqIndex, frameIndex, subframeIndex, filename, fileIndex, dummy)) {
if ( ParseHeader (index, len, message, document, dummy, version)) {
#ifdef ZMQ_DETAIL
cprintf( RED,"Parsed Header %d [%d] Length: %d Header:%s \n", index, portno, len, (char*) zmq_msg_data (&message) );
#endif
zmq_msg_close (&message);
if (dummy) {
#ifdef ZMQ_DETAIL
cprintf(RED,"%d [%d] Received end of acquisition\n", index, portno );
@ -402,10 +400,79 @@ public:
return 1;
}
}
zmq_msg_close(&message);
return 0;
};
/**
* Close Header Message. Call this function if ReceiveHeader returned 1
*/
void CloseHeaderMessage() {
if (headerMessage)
zmq_msg_close(headerMessage);
headerMessage = 0;
};
/**
* Parse Header
* @param index self index for debugging
* @param length length of message
* @param message message
* @param document parsed document reference
* @param dummy true if end of acqusition, else false, loaded upon parsing
* @param version version that has to match, -1 to not care
* @returns true if successful else false
*/
int ParseHeader(const int index, int length, zmq_msg_t& message,
Document& document, bool& dummy, uint32_t version)
{
if ( document.Parse( (char*) zmq_msg_data (&message), zmq_msg_size (&message)).HasParseError() ) {
cprintf( RED,"%d Could not parse. len:%d: Message:%s \n", index, length, (char*) zmq_msg_data (&message) );
fflush ( stdout );
char* buf = (char*) zmq_msg_data (&message);
for ( int i= 0; i < length; ++i ) {
cprintf(RED,"%02x ",buf[i]);
}
printf("\n");
fflush( stdout );
return 0;
}
if (document["jsonversion"].GetUint() != version) {
cprintf( RED, "version mismatch. required %u, got %u\n", version, document["jsonversion"].GetUint());
return 0;
}
dummy = false;
int temp = document["data"].GetUint();
dummy = temp ? false : true;
return 1;
/*
int temp = d["data"].GetUint();
dummy = temp ? false : true;
if (!dummy) {
acqIndex = d["acqIndex"].GetUint64();
frameIndex = d["fIndex"].GetUint64();
fileIndex = d["fileIndex"].GetUint64();
subframeIndex = d["expLength"].GetUint();
filename = d["fname"].GetString();
}
#ifdef VERYVERBOSE
cprintf(BLUE,"%d Dummy:%d\n"
"\tAcqIndex:%lu\n"
"\tFrameIndex:%lu\n"
"\tSubIndex:%u\n"
"\tFileIndex:%lu\n"
"\tBitMode:%u\n"
"\tDetType:%u\n",
index, (int)dummy, acqIndex, frameIndex, subframeIndex, fileIndex,
d["bitmode"].GetUint(),d["detType"].GetUint());
#endif
return 1;
*/
};
/**
* Receive Data
* @param index self index for debugging
@ -446,65 +513,6 @@ public:
};
/**
* Parse Header
* @param index self index for debugging
* @param length length of message
* @param message message
* @param acqIndex address of acquisition index
* @param frameIndex address of frame index
* @param subframeIndex address of subframe index
* @param filename address of file name
* @param fileindex address of file index
* @param dummy true if end of acquisition else false
* @returns true if successfull else false
*/
int ParseHeader(const int index, int length, zmq_msg_t& message, uint64_t &acqIndex,
uint64_t &frameIndex, uint32_t &subframeIndex, std::string &filename, uint64_t &fileIndex, bool& dummy)
{
acqIndex = -1;
frameIndex = -1;
subframeIndex = -1;
fileIndex = -1;
dummy = true;
Document d;
if ( d.Parse( (char*) zmq_msg_data (&message), zmq_msg_size (&message)).HasParseError() ) {
cprintf( RED,"%d Could not parse. len:%d: Message:%s \n", index, length, (char*) zmq_msg_data (&message) );
fflush ( stdout );
char* buf = (char*) zmq_msg_data (&message);
for ( int i= 0; i < length; ++i ) {
cprintf(RED,"%02x ",buf[i]);
}
printf("\n");
fflush( stdout );
return 0;
}
int temp = d["data"].GetUint();
dummy = temp ? false : true;
if (!dummy) {
acqIndex = d["acqIndex"].GetUint64();
frameIndex = d["fIndex"].GetUint64();
fileIndex = d["fileIndex"].GetUint64();
subframeIndex = d["expLength"].GetUint();
filename = d["fname"].GetString();
}
#ifdef VERYVERBOSE
cprintf(BLUE,"%d Dummy:%d\n"
"\tAcqIndex:%lu\n"
"\tFrameIndex:%lu\n"
"\tSubIndex:%u\n"
"\tFileIndex:%lu\n"
"\tBitMode:%u\n"
"\tDetType:%u\n",
index, (int)dummy, acqIndex, frameIndex, subframeIndex, fileIndex,
d["bitmode"].GetUint(),d["detType"].GetUint());
#endif
return 1;
};
/**
* Print error
@ -580,4 +588,7 @@ private:
/** Server Address */
char serverAddress[1000];
/** Header Message pointer */
zmq_msg_t* headerMessage;
};

View File

@ -27,8 +27,7 @@
//versions
#define HDF5_WRITER_VERSION 1.0 //1 decimal places
#define BINARY_WRITER_VERSION 1.0 //1 decimal places
#define SLS_DETECTOR_HEADER_VERSION 0x1
#define SLS_DETECTOR_JSON_HEADER_VERSION 0x2
//parameters to calculate fifo depth
#define SAMPLE_TIME_IN_NS 100000000//100ms

View File

@ -36,6 +36,9 @@ typedef int int32_t;
#define DEFAULT_ZMQ_CL_PORTNO 30001
#define DEFAULT_ZMQ_RX_PORTNO 30001
#define SLS_DETECTOR_HEADER_VERSION 0x1
#define SLS_DETECTOR_JSON_HEADER_VERSION 0x2
/**
\file sls_receiver_defs.h
This file contains all the basic definitions common to the slsReceiver class