mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2026-01-16 17:19:20 +01:00
Gappixels (#89)
* WIP * WIP virtual delays, imagetest for saturation * WIP, vertical and horizontal * WIP * gap pixels work, fixed 32 bit data out (10gbe=0) for virtual servers * quad works (also in virtual), handling gappixels and quad * jungfrau gapppixels work * jungfrau: done * complete image or missing packets given in json header and gui * eiger virtual 4 bit mode bug fix * working version of zmq add json header, except printout * printout bug * fix for json para * to map WIP * map done * map print , mapwith result left * json result works, testing added * updated server binaries * compiling on rhels7, variable size char array iniitalization * zmqsocket parsing didnt need Document * const to map, json para is strings not map * json add header: mapping cleaner without insert make_pair
This commit is contained in:
@@ -17,9 +17,59 @@
|
||||
|
||||
|
||||
class zmq_msg_t;
|
||||
#include <map>
|
||||
|
||||
/** zmq header structure */
|
||||
struct zmqHeader {
|
||||
/** true if incoming data, false if end of acquisition */
|
||||
bool data{true};
|
||||
uint32_t jsonversion{0};
|
||||
uint32_t dynamicRange{0};
|
||||
uint64_t fileIndex{0};
|
||||
/** number of detectors in x axis */
|
||||
uint32_t ndetx{0};
|
||||
/** number of detectors in y axis */
|
||||
uint32_t ndety{0};
|
||||
/** number of pixels/channels in x axis for this zmq socket */
|
||||
uint32_t npixelsx{0};
|
||||
/** number of pixels/channels in y axis for this zmq socket */
|
||||
uint32_t npixelsy{0};
|
||||
/** number of bytes for an image in this socket */
|
||||
uint32_t imageSize{0};
|
||||
/** frame number from detector */
|
||||
uint64_t acqIndex{0};
|
||||
/** frame index (starting at 0 for each acquisition) */
|
||||
uint64_t frameIndex{0};
|
||||
/** file name prefix */
|
||||
std::string fname{""};
|
||||
/** header from detector */
|
||||
uint64_t frameNumber{0};
|
||||
uint32_t expLength{0};
|
||||
uint32_t packetNumber{0};
|
||||
uint64_t bunchId{0};
|
||||
uint64_t timestamp{0};
|
||||
uint16_t modId{0};
|
||||
uint16_t row{0};
|
||||
uint16_t column{0};
|
||||
uint16_t reserved{0};
|
||||
uint32_t debug{0};
|
||||
uint16_t roundRNumber{0};
|
||||
uint8_t detType{0};
|
||||
uint8_t version{0};
|
||||
/** if image should be flipped across x axis */
|
||||
int flippedDataX{0};
|
||||
/** quad type (eiger hardware specific) */
|
||||
uint32_t quad{0};
|
||||
/** true if complete image, else missing packets */
|
||||
bool completeImage{false};
|
||||
/** additional json header */
|
||||
std::map<std::string, std::string> addJsonHeader;
|
||||
};
|
||||
|
||||
class ZmqSocket {
|
||||
|
||||
public:
|
||||
|
||||
// Socket Options for optimization
|
||||
// ZMQ_LINGER default is already -1 means no messages discarded. use this
|
||||
// options if optimizing required ZMQ_SNDHWM default is 0 means no limit. use
|
||||
@@ -110,47 +160,10 @@ class ZmqSocket {
|
||||
/**
|
||||
* Send Message Header
|
||||
* @param index self index for debugging
|
||||
* @param dummy true if a dummy message for end of acquisition
|
||||
* @param jsonversion json version
|
||||
* @param dynamicrange dynamic range
|
||||
* @param fileIndex file or acquisition index
|
||||
* @param ndetx number of detectors in x axis
|
||||
* @param ndety number of detectors in y axis
|
||||
* @param npixelsx number of pixels/channels in x axis for this zmq socket
|
||||
* @param npixelsy number of pixels/channels in y axis for this zmq socket
|
||||
* @param imageSize number of bytes for an image in this socket
|
||||
* @param frameNumber current frame number
|
||||
* @param expLength exposure length or subframe index if eiger
|
||||
* @param packetNumber number of packets caught for this frame
|
||||
* @param bunchId bunch id
|
||||
* @param timestamp time stamp
|
||||
* @param modId module Id
|
||||
* @param row row index in complete detector
|
||||
* @param column column index in complete detector
|
||||
* @param reserved reserved
|
||||
* @param debug debug
|
||||
* @param roundRNumber not used yet
|
||||
* @param detType detector enum
|
||||
* @param version detector header version
|
||||
* @param gapPixelsEnable gap pixels enable (exception: if gap pixels enable
|
||||
* for 4 bit mode, data is not yet gap pixel enabled in receiver)
|
||||
* @param flippedDataX if it is flipped across x axis
|
||||
* @param quadEnable if quad is enabled
|
||||
* @param additionalJsonHeader additional json header
|
||||
* @param header zmq header (from json)
|
||||
* @returns 0 if error, else 1
|
||||
*/
|
||||
int SendHeaderData(
|
||||
int index, bool dummy, uint32_t jsonversion, uint32_t dynamicrange = 0,
|
||||
uint64_t fileIndex = 0, uint32_t ndetx = 0, uint32_t ndety = 0,
|
||||
uint32_t npixelsx = 0, uint32_t npixelsy = 0, uint32_t imageSize = 0,
|
||||
uint64_t acqIndex = 0, uint64_t fIndex = 0, std::string fname = "",
|
||||
uint64_t frameNumber = 0, uint32_t expLength = 0,
|
||||
uint32_t packetNumber = 0, uint64_t bunchId = 0, uint64_t timestamp = 0,
|
||||
uint16_t modId = 0, uint16_t row = 0, uint16_t column = 0,
|
||||
uint16_t reserved = 0, uint32_t debug = 0, uint16_t roundRNumber = 0,
|
||||
uint8_t detType = 0, uint8_t version = 0, int gapPixelsEnable = 0,
|
||||
int flippedDataX = 0, uint32_t quadEnable = 0,
|
||||
std::string *additionalJsonHeader = 0);
|
||||
int SendHeader(int index, zmqHeader header);
|
||||
|
||||
/**
|
||||
* Send Message Body
|
||||
@@ -159,39 +172,16 @@ class ZmqSocket {
|
||||
* @returns 0 if error, else 1
|
||||
*/
|
||||
int SendData(char *buf, int length);
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Receive Header (Important to close message after parsing header)
|
||||
* Receive Header
|
||||
* @param index self index for debugging
|
||||
* @param document parsed document reference
|
||||
* @param zHeader filled out zmqHeader structure (parsed from json header)
|
||||
* @param version version that has to match, -1 to not care
|
||||
* @returns 0 if error or end of acquisition, else 1 (call
|
||||
* CloseHeaderMessage after parsing header)
|
||||
*/
|
||||
int ReceiveHeader(const int index, rapidjson::Document &document, uint32_t version);
|
||||
|
||||
/**
|
||||
* Close Header Message. Call this function if ReceiveHeader returned 1
|
||||
*/
|
||||
// void CloseHeaderMessage() {
|
||||
// if (headerMessage)
|
||||
// zmq_msg_close(headerMessage);
|
||||
// headerMessage = 0;
|
||||
// };
|
||||
/**
|
||||
* Parse Header
|
||||
* @param index self index for debugging
|
||||
* @param length length of message
|
||||
* @param message message
|
||||
* @param document parsed document reference
|
||||
* @param dummy true if end of acqusition, else false, loaded upon parsing
|
||||
* @param version version that has to match, -1 to not care
|
||||
* @returns true if successful else false
|
||||
*/
|
||||
int ParseHeader(const int index, int length, char *buff, rapidjson::Document &document,
|
||||
bool &dummy, uint32_t version);
|
||||
int ReceiveHeader(const int index, zmqHeader& zHeader, uint32_t version);
|
||||
|
||||
/**
|
||||
* Receive Data
|
||||
@@ -216,6 +206,19 @@ class ZmqSocket {
|
||||
* @returns length of message, -1 if error
|
||||
*/
|
||||
int ReceiveMessage(const int index, zmq_msg_t &message);
|
||||
|
||||
/**
|
||||
* Parse Header
|
||||
* @param index self index for debugging
|
||||
* @param length length of message
|
||||
* @param message message
|
||||
* @param zHeader filled out zmqHeader structure (parsed from json header)
|
||||
* @param version version that has to match, -1 to not care
|
||||
* @returns true if successful else false
|
||||
*/
|
||||
int ParseHeader(const int index, int length, char *buff,
|
||||
zmqHeader& zHeader, uint32_t version);
|
||||
|
||||
/**
|
||||
* Class to close socket descriptors automatically
|
||||
* upon encountering exceptions in the ZmqSocket constructor
|
||||
@@ -246,4 +249,6 @@ class ZmqSocket {
|
||||
|
||||
/** Socket descriptor */
|
||||
mySocketDescriptors sockfd;
|
||||
|
||||
|
||||
};
|
||||
|
||||
@@ -37,7 +37,7 @@
|
||||
#define DEFAULT_ZMQ_RX_PORTNO 30001
|
||||
|
||||
#define SLS_DETECTOR_HEADER_VERSION 0x2
|
||||
#define SLS_DETECTOR_JSON_HEADER_VERSION 0x3
|
||||
#define SLS_DETECTOR_JSON_HEADER_VERSION 0x4
|
||||
|
||||
// ctb/ moench 1g udp (read from fifo)
|
||||
#define UDP_PACKET_DATA_BYTES (1344)
|
||||
@@ -61,6 +61,7 @@
|
||||
|
||||
/** default maximum string length */
|
||||
#define MAX_STR_LENGTH 1000
|
||||
#define SHORT_STR_LENGTH 20
|
||||
|
||||
#define DEFAULT_STREAMING_TIMER_IN_MS 200
|
||||
|
||||
@@ -480,8 +481,6 @@ struct detParameters {
|
||||
int nChipY{0};
|
||||
int nDacs{0};
|
||||
int dynamicRange{0};
|
||||
int nGappixelsX{0};
|
||||
int nGappixelsY{0};
|
||||
|
||||
detParameters() = default;
|
||||
explicit detParameters(slsDetectorDefs::detectorType type) {
|
||||
@@ -493,8 +492,6 @@ struct detParameters {
|
||||
nChipY = 1;
|
||||
nDacs = 8;
|
||||
dynamicRange = 16;
|
||||
nGappixelsX = 0;
|
||||
nGappixelsY = 0;
|
||||
break;
|
||||
case slsDetectorDefs::detectorType::JUNGFRAU:
|
||||
nChanX = 256;
|
||||
@@ -503,8 +500,6 @@ struct detParameters {
|
||||
nChipY = 2;
|
||||
nDacs = 8;
|
||||
dynamicRange = 16;
|
||||
nGappixelsX = 0;
|
||||
nGappixelsY = 0;
|
||||
break;
|
||||
case slsDetectorDefs::detectorType::CHIPTESTBOARD:
|
||||
nChanX = 36;
|
||||
@@ -513,8 +508,6 @@ struct detParameters {
|
||||
nChipY = 1;
|
||||
nDacs = 24;
|
||||
dynamicRange = 16;
|
||||
nGappixelsX = 0;
|
||||
nGappixelsY = 0;
|
||||
break;
|
||||
case slsDetectorDefs::detectorType::MOENCH:
|
||||
nChanX = 32;
|
||||
@@ -523,8 +516,6 @@ struct detParameters {
|
||||
nChipY = 1;
|
||||
nDacs = 8;
|
||||
dynamicRange = 16;
|
||||
nGappixelsX = 0;
|
||||
nGappixelsY = 0;
|
||||
break;
|
||||
case slsDetectorDefs::detectorType::EIGER:
|
||||
nChanX = 256;
|
||||
@@ -533,8 +524,6 @@ struct detParameters {
|
||||
nChipY = 1;
|
||||
nDacs = 16;
|
||||
dynamicRange = 16;
|
||||
nGappixelsX = 6;
|
||||
nGappixelsY = 1;
|
||||
break;
|
||||
case slsDetectorDefs::detectorType::MYTHEN3:
|
||||
nChanX = 128 * 3;
|
||||
@@ -543,8 +532,6 @@ struct detParameters {
|
||||
nChipY = 1;
|
||||
nDacs = 16;
|
||||
dynamicRange = 32;
|
||||
nGappixelsX = 0;
|
||||
nGappixelsY = 0;
|
||||
break;
|
||||
case slsDetectorDefs::detectorType::GOTTHARD2:
|
||||
nChanX = 128;
|
||||
@@ -553,8 +540,6 @@ struct detParameters {
|
||||
nChipY = 1;
|
||||
nDacs = 14;
|
||||
dynamicRange = 16;
|
||||
nGappixelsX = 0;
|
||||
nGappixelsY = 0;
|
||||
break;
|
||||
default:
|
||||
throw sls::RuntimeError("Unknown detector type! " + std::to_string(type));
|
||||
|
||||
@@ -206,7 +206,6 @@ enum detFuncs{
|
||||
F_LOCK_RECEIVER,
|
||||
F_GET_LAST_RECEIVER_CLIENT_IP,
|
||||
F_SET_RECEIVER_PORT,
|
||||
F_UPDATE_RECEIVER_CLIENT,
|
||||
F_GET_RECEIVER_VERSION,
|
||||
F_GET_RECEIVER_TYPE,
|
||||
F_SEND_RECEIVER_DETHOSTNAME,
|
||||
@@ -256,7 +255,6 @@ enum detFuncs{
|
||||
F_GET_RECEIVER_STREAMING_SRC_IP,
|
||||
F_SET_RECEIVER_SILENT_MODE,
|
||||
F_GET_RECEIVER_SILENT_MODE,
|
||||
F_ENABLE_GAPPIXELS_IN_RECEIVER,
|
||||
F_RESTREAM_STOP_FROM_RECEIVER,
|
||||
F_SET_ADDITIONAL_JSON_HEADER,
|
||||
F_GET_ADDITIONAL_JSON_HEADER,
|
||||
@@ -287,6 +285,8 @@ enum detFuncs{
|
||||
F_RECEIVER_SET_ADC_MASK_10G,
|
||||
F_RECEIVER_SET_NUM_COUNTERS,
|
||||
F_INCREMENT_FILE_INDEX,
|
||||
F_SET_ADDITIONAL_JSON_PARAMETER,
|
||||
F_GET_ADDITIONAL_JSON_PARAMETER,
|
||||
NUM_REC_FUNCTIONS
|
||||
};
|
||||
|
||||
@@ -489,7 +489,6 @@ static const char* getFunctionNameFromEnum(enum detFuncs func) {
|
||||
case F_LOCK_RECEIVER: return "F_LOCK_RECEIVER";
|
||||
case F_GET_LAST_RECEIVER_CLIENT_IP: return "F_GET_LAST_RECEIVER_CLIENT_IP";
|
||||
case F_SET_RECEIVER_PORT: return "F_SET_RECEIVER_PORT";
|
||||
case F_UPDATE_RECEIVER_CLIENT: return "F_UPDATE_RECEIVER_CLIENT";
|
||||
case F_GET_RECEIVER_VERSION: return "F_GET_RECEIVER_VERSION";
|
||||
case F_GET_RECEIVER_TYPE: return "F_GET_RECEIVER_TYPE";
|
||||
case F_SEND_RECEIVER_DETHOSTNAME: return "F_SEND_RECEIVER_DETHOSTNAME";
|
||||
@@ -539,7 +538,6 @@ static const char* getFunctionNameFromEnum(enum detFuncs func) {
|
||||
case F_GET_RECEIVER_STREAMING_SRC_IP: return "F_GET_RECEIVER_STREAMING_SRC_IP";
|
||||
case F_SET_RECEIVER_SILENT_MODE: return "F_SET_RECEIVER_SILENT_MODE";
|
||||
case F_GET_RECEIVER_SILENT_MODE: return "F_GET_RECEIVER_SILENT_MODE";
|
||||
case F_ENABLE_GAPPIXELS_IN_RECEIVER: return "F_ENABLE_GAPPIXELS_IN_RECEIVER";
|
||||
case F_RESTREAM_STOP_FROM_RECEIVER: return "F_RESTREAM_STOP_FROM_RECEIVER";
|
||||
case F_SET_ADDITIONAL_JSON_HEADER: return "F_SET_ADDITIONAL_JSON_HEADER";
|
||||
case F_GET_ADDITIONAL_JSON_HEADER: return "F_GET_ADDITIONAL_JSON_HEADER";
|
||||
@@ -570,7 +568,8 @@ static const char* getFunctionNameFromEnum(enum detFuncs func) {
|
||||
case F_RECEIVER_SET_ADC_MASK_10G: return "F_RECEIVER_SET_ADC_MASK_10G";
|
||||
case F_RECEIVER_SET_NUM_COUNTERS: return "F_RECEIVER_SET_NUM_COUNTERS";
|
||||
case F_INCREMENT_FILE_INDEX: return "F_INCREMENT_FILE_INDEX";
|
||||
|
||||
case F_SET_ADDITIONAL_JSON_PARAMETER: return "F_SET_ADDITIONAL_JSON_PARAMETER";
|
||||
case F_GET_ADDITIONAL_JSON_PARAMETER: return "F_GET_ADDITIONAL_JSON_PARAMETER";
|
||||
|
||||
case NUM_REC_FUNCTIONS: return "NUM_REC_FUNCTIONS";
|
||||
default: return "Unknown Function";
|
||||
|
||||
@@ -3,10 +3,10 @@
|
||||
#define APILIB 0x200227
|
||||
#define APIRECEIVER 0x200227
|
||||
#define APIGUI 0x200227
|
||||
#define APICTB 0x200310
|
||||
#define APIGOTTHARD 0x200310
|
||||
#define APIJUNGFRAU 0x200310
|
||||
#define APIMYTHEN3 0x200310
|
||||
#define APIMOENCH 0x200310
|
||||
#define APIEIGER 0x200310
|
||||
#define APIGOTTHARD2 0x200313
|
||||
#define APICTB 0x200311
|
||||
#define APIGOTTHARD 0x200326
|
||||
#define APIGOTTHARD2 0x200326
|
||||
#define APIJUNGFRAU 0x200326
|
||||
#define APIMYTHEN3 0x200311
|
||||
#define APIMOENCH 0x200326
|
||||
#define APIEIGER 0x200326
|
||||
|
||||
@@ -145,6 +145,244 @@ int ZmqSocket::ConvertInternetAddresstoIpString(struct addrinfo *res, char *ip,
|
||||
return 1;
|
||||
}
|
||||
|
||||
int ZmqSocket::SendHeader(
|
||||
int index, zmqHeader header) {
|
||||
|
||||
/** Json Header Format */
|
||||
const char jsonHeaderFormat[] = "{"
|
||||
"\"jsonversion\":%u, "
|
||||
"\"bitmode\":%u, "
|
||||
"\"fileIndex\":%lu, "
|
||||
"\"detshape\":[%u, %u], "
|
||||
"\"shape\":[%u, %u], "
|
||||
"\"size\":%u, "
|
||||
"\"acqIndex\":%lu, "
|
||||
"\"frameIndex\":%lu, "
|
||||
"\"fname\":\"%s\", "
|
||||
"\"data\": %d, "
|
||||
"\"completeImage\": %d, "
|
||||
|
||||
"\"frameNumber\":%lu, "
|
||||
"\"expLength\":%u, "
|
||||
"\"packetNumber\":%u, "
|
||||
"\"bunchId\":%lu, "
|
||||
"\"timestamp\":%lu, "
|
||||
"\"modId\":%u, "
|
||||
"\"row\":%u, "
|
||||
"\"column\":%u, "
|
||||
"\"reserved\":%u, "
|
||||
"\"debug\":%u, "
|
||||
"\"roundRNumber\":%u, "
|
||||
"\"detType\":%u, "
|
||||
"\"version\":%u, "
|
||||
|
||||
// additional stuff
|
||||
"\"flippedDataX\":%u, "
|
||||
"\"quad\":%u"
|
||||
|
||||
; //"}\n";
|
||||
char buf[MAX_STR_LENGTH] = "";
|
||||
sprintf(buf, jsonHeaderFormat,
|
||||
header.jsonversion,
|
||||
header.dynamicRange,
|
||||
header.fileIndex,
|
||||
header.ndetx,
|
||||
header.ndety,
|
||||
header.npixelsx,
|
||||
header.npixelsy,
|
||||
header.imageSize,
|
||||
header.acqIndex,
|
||||
header.frameIndex,
|
||||
header.fname.c_str(),
|
||||
header.data ? 1 : 0,
|
||||
header.completeImage ? 1 : 0,
|
||||
|
||||
header.frameNumber,
|
||||
header.expLength,
|
||||
header.packetNumber,
|
||||
header.bunchId,
|
||||
header.timestamp,
|
||||
header.modId,
|
||||
header.row,
|
||||
header.column,
|
||||
header.reserved,
|
||||
header.debug,
|
||||
header.roundRNumber,
|
||||
header.detType,
|
||||
header.version,
|
||||
|
||||
// additional stuff
|
||||
header.flippedDataX,
|
||||
header.quad);
|
||||
|
||||
if (header.addJsonHeader.size() > 0) {
|
||||
strcat(buf, ", ");
|
||||
strcat(buf, "\"addJsonHeader\": {");
|
||||
for (auto it = header.addJsonHeader.begin(); it != header.addJsonHeader.end(); ++it) {
|
||||
if (it != header.addJsonHeader.begin()) {
|
||||
strcat(buf, ", ");
|
||||
}
|
||||
strcat(buf, "\"");
|
||||
strcat(buf, it->first.c_str());
|
||||
strcat(buf, "\":\"");
|
||||
strcat(buf, it->second.c_str());
|
||||
strcat(buf, "\"");
|
||||
}
|
||||
strcat(buf, " } ");
|
||||
}
|
||||
|
||||
strcat(buf, "}\n");
|
||||
int length = strlen(buf);
|
||||
|
||||
#ifdef VERBOSE
|
||||
// if(!index)
|
||||
cprintf(BLUE, "%d : Streamer: buf: %s\n", index, buf);
|
||||
#endif
|
||||
|
||||
if (zmq_send(sockfd.socketDescriptor, buf, length,
|
||||
header.data ? ZMQ_SNDMORE : 0) < 0) {
|
||||
PrintError();
|
||||
return 0;
|
||||
}
|
||||
#ifdef VERBOSE
|
||||
cprintf(GREEN, "[%u] send header data\n", portno);
|
||||
#endif
|
||||
return 1;
|
||||
}
|
||||
|
||||
int ZmqSocket::SendData(char *buf, int length) {
|
||||
if (zmq_send(sockfd.socketDescriptor, buf, length, 0) < 0) {
|
||||
PrintError();
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
int ZmqSocket::ReceiveHeader(const int index, zmqHeader& zHeader,
|
||||
uint32_t version) {
|
||||
std::vector<char> buffer(MAX_STR_LENGTH);
|
||||
int len =
|
||||
zmq_recv(sockfd.socketDescriptor, buffer.data(), buffer.size(), 0);
|
||||
if (len > 0) {
|
||||
#ifdef ZMQ_DETAIL
|
||||
cprintf(BLUE, "Header %d [%d] Length: %d Header:%s \n", index, portno,
|
||||
len, buffer.data());
|
||||
#endif
|
||||
if (ParseHeader(index, len, buffer.data(), zHeader, version)) {
|
||||
#ifdef ZMQ_DETAIL
|
||||
cprintf(RED, "Parsed Header %d [%d] Length: %d Header:%s \n", index,
|
||||
portno, len, buffer.data());
|
||||
#endif
|
||||
if (!zHeader.data) {
|
||||
#ifdef ZMQ_DETAIL
|
||||
cprintf(RED, "%d [%d] Received end of acquisition\n", index,
|
||||
portno);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
#ifdef ZMQ_DETAIL
|
||||
cprintf(GREEN, "%d [%d] data\n", index, portno);
|
||||
#endif
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
};
|
||||
|
||||
int ZmqSocket::ParseHeader(const int index, int length, char *buff,
|
||||
zmqHeader& zHeader, uint32_t version) {
|
||||
Document document;
|
||||
if (document.Parse(buff, length).HasParseError()) {
|
||||
LOG(logERROR) << index << " Could not parse. len:" << length
|
||||
<< ": Message:" << buff;
|
||||
fflush(stdout);
|
||||
// char* buf = (char*) zmq_msg_data (&message);
|
||||
for (int i = 0; i < length; ++i) {
|
||||
cprintf(RED, "%02x ", buff[i]);
|
||||
}
|
||||
printf("\n");
|
||||
fflush(stdout);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// version check
|
||||
zHeader.jsonversion = document["jsonversion"].GetUint();
|
||||
if (zHeader.jsonversion != version) {
|
||||
LOG(logERROR) << "version mismatch. required " << version << ", got "
|
||||
<< zHeader.jsonversion;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// parse
|
||||
zHeader.data = ((document["data"].GetUint()) == 0) ? false : true;
|
||||
zHeader.dynamicRange = document["bitmode"].GetUint();
|
||||
zHeader.fileIndex = document["fileIndex"].GetUint64();
|
||||
zHeader.ndetx = document["detshape"][0].GetUint();
|
||||
zHeader.ndety = document["detshape"][1].GetUint();
|
||||
zHeader.npixelsx = document["shape"][0].GetUint();
|
||||
zHeader.npixelsy = document["shape"][1].GetUint();
|
||||
zHeader.imageSize = document["size"].GetUint();
|
||||
zHeader.acqIndex = document["acqIndex"].GetUint64();
|
||||
zHeader.frameIndex = document["frameIndex"].GetUint64();
|
||||
zHeader.fname = document["fname"].GetString();
|
||||
|
||||
zHeader.frameNumber = document["frameNumber"].GetUint64();
|
||||
zHeader.expLength = document["expLength"].GetUint();
|
||||
zHeader.packetNumber = document["packetNumber"].GetUint();
|
||||
zHeader.bunchId = document["bunchId"].GetUint64();
|
||||
zHeader.timestamp = document["timestamp"].GetUint64();
|
||||
zHeader.modId = document["modId"].GetUint();
|
||||
zHeader.row = document["row"].GetUint();
|
||||
zHeader.column = document["column"].GetUint();
|
||||
zHeader.reserved = document["reserved"].GetUint();
|
||||
zHeader.debug = document["debug"].GetUint();
|
||||
zHeader.roundRNumber = document["roundRNumber"].GetUint();
|
||||
zHeader.detType = document["detType"].GetUint();
|
||||
zHeader.version = document["version"].GetUint();
|
||||
|
||||
zHeader.flippedDataX = document["flippedDataX"].GetUint();
|
||||
zHeader.quad = document["quad"].GetUint();
|
||||
zHeader.completeImage = document["completeImage"].GetUint();
|
||||
|
||||
if (document.HasMember("addJsonHeader")) {
|
||||
const Value& V = document["addJsonHeader"];
|
||||
zHeader.addJsonHeader.clear();
|
||||
for (Value::ConstMemberIterator iter = V.MemberBegin(); iter != V.MemberEnd(); ++iter){
|
||||
zHeader.addJsonHeader[iter->name.GetString()] = iter->value.GetString();
|
||||
}
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int ZmqSocket::ReceiveData(const int index, char *buf, const int size) {
|
||||
zmq_msg_t message;
|
||||
zmq_msg_init(&message);
|
||||
int length = ReceiveMessage(index, message);
|
||||
if (length == size) {
|
||||
memcpy(buf, (char *)zmq_msg_data(&message), size);
|
||||
} else if (length < size) {
|
||||
memcpy(buf, (char *)zmq_msg_data(&message), length);
|
||||
memset(buf + length, 0xFF, size - length);
|
||||
} else {
|
||||
LOG(logERROR) << "Received weird packet size " << length
|
||||
<< " for socket " << index;
|
||||
memset(buf, 0xFF, size);
|
||||
}
|
||||
|
||||
zmq_msg_close(&message);
|
||||
return length;
|
||||
}
|
||||
|
||||
int ZmqSocket::ReceiveMessage(const int index, zmq_msg_t &message) {
|
||||
int length = zmq_msg_recv(&message, sockfd.socketDescriptor, 0);
|
||||
if (length == -1) {
|
||||
PrintError();
|
||||
LOG(logERROR) << "Could not read header for socket " << index;
|
||||
}
|
||||
return length;
|
||||
}
|
||||
|
||||
void ZmqSocket::PrintError() {
|
||||
switch (errno) {
|
||||
case EINVAL:
|
||||
@@ -210,181 +448,6 @@ void ZmqSocket::PrintError() {
|
||||
}
|
||||
}
|
||||
|
||||
int ZmqSocket::ReceiveData(const int index, char *buf, const int size) {
|
||||
zmq_msg_t message;
|
||||
zmq_msg_init(&message);
|
||||
int length = ReceiveMessage(index, message);
|
||||
if (length == size) {
|
||||
memcpy(buf, (char *)zmq_msg_data(&message), size);
|
||||
} else if (length < size) {
|
||||
memcpy(buf, (char *)zmq_msg_data(&message), length);
|
||||
memset(buf + length, 0xFF, size - length);
|
||||
} else {
|
||||
LOG(logERROR) << "Received weird packet size " << length
|
||||
<< " for socket " << index;
|
||||
memset(buf, 0xFF, size);
|
||||
}
|
||||
|
||||
zmq_msg_close(&message);
|
||||
return length;
|
||||
}
|
||||
|
||||
int ZmqSocket::ParseHeader(const int index, int length, char *buff,
|
||||
Document &document, bool &dummy, uint32_t version) {
|
||||
if (document.Parse(buff, length).HasParseError()) {
|
||||
LOG(logERROR) << index << " Could not parse. len:" << length
|
||||
<< ": Message:" << buff;
|
||||
fflush(stdout);
|
||||
// char* buf = (char*) zmq_msg_data (&message);
|
||||
for (int i = 0; i < length; ++i) {
|
||||
cprintf(RED, "%02x ", buff[i]);
|
||||
}
|
||||
printf("\n");
|
||||
fflush(stdout);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (document["jsonversion"].GetUint() != version) {
|
||||
LOG(logERROR) << "version mismatch. required " << version << ", got "
|
||||
<< document["jsonversion"].GetUint();
|
||||
return 0;
|
||||
}
|
||||
|
||||
dummy = false;
|
||||
int temp = document["data"].GetUint();
|
||||
dummy = temp ? false : true;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int ZmqSocket::ReceiveHeader(const int index, Document &document,
|
||||
uint32_t version) {
|
||||
std::vector<char> buffer(MAX_STR_LENGTH);
|
||||
int len =
|
||||
zmq_recv(sockfd.socketDescriptor, buffer.data(), buffer.size(), 0);
|
||||
if (len > 0) {
|
||||
bool dummy = false;
|
||||
#ifdef ZMQ_DETAIL
|
||||
cprintf(BLUE, "Header %d [%d] Length: %d Header:%s \n", index, portno,
|
||||
len, buffer.data());
|
||||
#endif
|
||||
if (ParseHeader(index, len, buffer.data(), document, dummy, version)) {
|
||||
#ifdef ZMQ_DETAIL
|
||||
cprintf(RED, "Parsed Header %d [%d] Length: %d Header:%s \n", index,
|
||||
portno, len, buffer.data());
|
||||
#endif
|
||||
if (dummy) {
|
||||
#ifdef ZMQ_DETAIL
|
||||
cprintf(RED, "%d [%d] Received end of acquisition\n", index,
|
||||
portno);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
#ifdef ZMQ_DETAIL
|
||||
cprintf(GREEN, "%d [%d] data\n", index, portno);
|
||||
#endif
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
};
|
||||
|
||||
int ZmqSocket::ReceiveMessage(const int index, zmq_msg_t &message) {
|
||||
int length = zmq_msg_recv(&message, sockfd.socketDescriptor, 0);
|
||||
if (length == -1) {
|
||||
PrintError();
|
||||
LOG(logERROR) << "Could not read header for socket " << index;
|
||||
}
|
||||
return length;
|
||||
}
|
||||
|
||||
int ZmqSocket::SendData(char *buf, int length) {
|
||||
if (zmq_send(sockfd.socketDescriptor, buf, length, 0) < 0) {
|
||||
PrintError();
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
int ZmqSocket::SendHeaderData(
|
||||
int index, bool dummy, uint32_t jsonversion, uint32_t dynamicrange,
|
||||
uint64_t fileIndex, uint32_t ndetx, uint32_t ndety, uint32_t npixelsx,
|
||||
uint32_t npixelsy, uint32_t imageSize, uint64_t acqIndex, uint64_t fIndex,
|
||||
std::string fname, uint64_t frameNumber, uint32_t expLength,
|
||||
uint32_t packetNumber, uint64_t bunchId, uint64_t timestamp, uint16_t modId,
|
||||
uint16_t row, uint16_t column, uint16_t reserved, uint32_t debug,
|
||||
uint16_t roundRNumber, uint8_t detType, uint8_t version,
|
||||
int gapPixelsEnable, int flippedDataX, uint32_t quadEnable,
|
||||
std::string *additionalJsonHeader) {
|
||||
|
||||
/** Json Header Format */
|
||||
const char jsonHeaderFormat[] = "{"
|
||||
"\"jsonversion\":%u, "
|
||||
"\"bitmode\":%u, "
|
||||
"\"fileIndex\":%lu, "
|
||||
"\"detshape\":[%u, %u], "
|
||||
"\"shape\":[%u, %u], "
|
||||
"\"size\":%u, "
|
||||
"\"acqIndex\":%lu, "
|
||||
"\"fIndex\":%lu, "
|
||||
"\"fname\":\"%s\", "
|
||||
"\"data\": %d, "
|
||||
|
||||
"\"frameNumber\":%lu, "
|
||||
"\"expLength\":%u, "
|
||||
"\"packetNumber\":%u, "
|
||||
"\"bunchId\":%lu, "
|
||||
"\"timestamp\":%lu, "
|
||||
"\"modId\":%u, "
|
||||
"\"row\":%u, "
|
||||
"\"column\":%u, "
|
||||
"\"reserved\":%u, "
|
||||
"\"debug\":%u, "
|
||||
"\"roundRNumber\":%u, "
|
||||
"\"detType\":%u, "
|
||||
"\"version\":%u, "
|
||||
|
||||
// additional stuff
|
||||
"\"gappixels\":%u, "
|
||||
"\"flippedDataX\":%u, "
|
||||
"\"quad\":%u"
|
||||
|
||||
; //"}\n";
|
||||
char buf[MAX_STR_LENGTH] = "";
|
||||
sprintf(buf, jsonHeaderFormat, jsonversion, dynamicrange, fileIndex, ndetx,
|
||||
ndety, npixelsx, npixelsy, imageSize, acqIndex, fIndex,
|
||||
fname.c_str(), dummy ? 0 : 1,
|
||||
|
||||
frameNumber, expLength, packetNumber, bunchId, timestamp, modId,
|
||||
row, column, reserved, debug, roundRNumber, detType, version,
|
||||
|
||||
// additional stuff
|
||||
gapPixelsEnable, flippedDataX, quadEnable);
|
||||
|
||||
if (additionalJsonHeader && !((*additionalJsonHeader).empty())) {
|
||||
strcat(buf, ", ");
|
||||
strcat(buf, (*additionalJsonHeader).c_str());
|
||||
}
|
||||
strcat(buf, "}\n");
|
||||
int length = strlen(buf);
|
||||
|
||||
#ifdef VERBOSE
|
||||
// if(!index)
|
||||
cprintf(BLUE, "%d : Streamer: buf: %s\n", index, buf);
|
||||
#endif
|
||||
|
||||
if (zmq_send(sockfd.socketDescriptor, buf, length,
|
||||
dummy ? 0 : ZMQ_SNDMORE) < 0) {
|
||||
PrintError();
|
||||
return 0;
|
||||
}
|
||||
#ifdef VERBOSE
|
||||
cprintf(GREEN, "[%u] send header data\n", portno);
|
||||
#endif
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
//Nested class to do RAII handling of socket descriptors
|
||||
ZmqSocket::mySocketDescriptors::mySocketDescriptors()
|
||||
: server(false), contextDescriptor(0), socketDescriptor(0){};
|
||||
|
||||
Reference in New Issue
Block a user