changed zmq method, and resolved warnings and from esrf

This commit is contained in:
Dhanya Maliakal 2017-05-03 17:57:56 +02:00
parent e2d1d58acf
commit 49b4ae2f56
8 changed files with 110 additions and 91 deletions

View File

@ -188,12 +188,6 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject {
/** mutex to update static items among objects (threads)*/ /** mutex to update static items among objects (threads)*/
static pthread_mutex_t Mutex; static pthread_mutex_t Mutex;
/** Json Header Format for each measurement part */
static const char *jsonHeaderFormat_part1;
/** Json Header Format */
static const char *jsonHeaderFormat;
/** GeneralData (Detector Data) object */ /** GeneralData (Detector Data) object */
const GeneralData* generalData; const GeneralData* generalData;

View File

@ -92,7 +92,7 @@ class ThreadObject : private virtual slsReceiverDefs {
int index; int index;
/** Thread is alive/dead */ /** Thread is alive/dead */
bool alive; volatile bool alive;
/** Variable monitored by thread to kills itself */ /** Variable monitored by thread to kills itself */
volatile bool killThread; volatile bool killThread;

View File

@ -17,9 +17,7 @@
#include <rapidjson/document.h> //json header in zmq stream #include <rapidjson/document.h> //json header in zmq stream
using namespace rapidjson; using namespace rapidjson;
#define DEFAULT_ZMQ_PORTNO 70001 #define DEFAULT_ZMQ_PORTNO 40001
#define DUMMY_MSG_SIZE 3
#define DUMMY_MSG "end"
class ZmqSocket { class ZmqSocket {
@ -187,9 +185,28 @@ public:
/** /**
* Send Message Header * Send Message Header
* @param buf message
* @param length length of message
* @param dummy true if end of acquistion else false
* @returns 0 if error, else 1 * @returns 0 if error, else 1
*/ */
int SendHeaderData (char* buf, int length) { int SendHeaderData (uint32_t jsonversion, uint32_t dynamicrange, uint32_t npixelsx, uint32_t npixelsy,
uint64_t acqIndex, uint64_t fIndex, char* fname, bool dummy,
uint64_t frameNumber, uint32_t expLength, uint32_t packetNumber, uint64_t bunchId, uint64_t timestamp,
uint16_t modId, uint16_t xCoord, uint16_t yCoord, uint16_t zCoord, uint32_t debug, uint16_t roundRNumber,
uint8_t detType, uint8_t version) {
char buf[MAX_STR_LENGTH] = "";
int length = sprintf(buf, jsonHeaderFormat,
jsonversion, dynamicrange, npixelsx, npixelsy,
acqIndex, fIndex, fname, dummy?1:0,
frameNumber, expLength, packetNumber, bunchId, timestamp,
modId, xCoord, yCoord, zCoord, debug, roundRNumber,
detType, version);
#ifdef VERBOSE
printf("%d Streamer: buf:%s\n", index, buf);
#endif
if(zmq_send (socketDescriptor, buf, length, ZMQ_SNDMORE) < 0) { if(zmq_send (socketDescriptor, buf, length, ZMQ_SNDMORE) < 0) {
PrintError (); PrintError ();
return 0; return 0;
@ -199,6 +216,8 @@ public:
/** /**
* Send Message Body * Send Message Body
* @param buf message
* @param length length of message
* @returns 0 if error, else 1 * @returns 0 if error, else 1
*/ */
int SendData (char* buf, int length) { int SendData (char* buf, int length) {
@ -213,9 +232,10 @@ public:
/** /**
* Receive Message * Receive Message
* @param index self index for debugging * @param index self index for debugging
* @param message message
* @returns length of message, -1 if error * @returns length of message, -1 if error
*/ */
int ReceiveMessage(const int index) { int ReceiveMessage(const int index, zmq_msg_t& message) {
int length = zmq_msg_recv (&message, socketDescriptor, 0); int length = zmq_msg_recv (&message, socketDescriptor, 0);
if (length == -1) { if (length == -1) {
PrintError (); PrintError ();
@ -232,18 +252,27 @@ public:
* @param frameIndex address of frame index * @param frameIndex address of frame index
* @param subframeIndex address of subframe index * @param subframeIndex address of subframe index
* @param filename address of file name * @param filename address of file name
* @returns 0 if error, else 1 * @returns 0 if error or end of acquisition, else 1
*/ */
int ReceiveHeader(const int index, uint64_t &acqIndex, int ReceiveHeader(const int index, uint64_t &acqIndex,
uint64_t &frameIndex, uint32_t &subframeIndex, string &filename) uint64_t &frameIndex, uint32_t &subframeIndex, string &filename)
{ {
zmq_msg_t message;
zmq_msg_init (&message); zmq_msg_init (&message);
if (ReceiveMessage(index) > 0) { int len = ReceiveMessage(index, message);
if (ParseHeader(index, acqIndex, frameIndex, subframeIndex, filename)) { if ( len > 0 ) {
zmq_msg_close(&message); bool dummy = false;
if ( ParseHeader (index, len, message, acqIndex, frameIndex, subframeIndex, filename, dummy)) {
zmq_msg_close (&message);
#ifdef VERBOSE #ifdef VERBOSE
cprintf(BLUE,"%d header rxd\n",index); cprintf( RED,"%d Length: %d Header:%s \n", index, length, (char*) zmq_msg_data (&message) );
#endif #endif
if (dummy) {
#ifdef VERBOSE
cprintf(RED,"%d Received end of acquisition\n", index);
#endif
return 0;
}
return 1; return 1;
} }
} }
@ -256,21 +285,13 @@ public:
* @param index self index for debugging * @param index self index for debugging
* @param buf buffer to copy image data to * @param buf buffer to copy image data to
* @param size size of image * @param size size of image
* @returns 0 if error, else 1 * @returns length of data received
*/ */
int ReceiveData(const int index, int* buf, const int size) int ReceiveData(const int index, int* buf, const int size)
{ {
zmq_msg_t message;
zmq_msg_init (&message); zmq_msg_init (&message);
int length = ReceiveMessage(index); int length = ReceiveMessage(index, message);
//dummy
if (length == DUMMY_MSG_SIZE) {
#ifdef VERBOSE
cprintf(RED,"%d Received end of acquisition\n", index);
#endif
zmq_msg_close(&message);
return 0;
}
//actual data //actual data
if (length == size) { if (length == size) {
@ -287,27 +308,41 @@ public:
} }
zmq_msg_close(&message); zmq_msg_close(&message);
return 1; return length;
}; };
/** /**
* Parse Header * Parse Header
* @param index self index for debugging * @param index self index for debugging
* @param length length of message
* @param message message
* @param acqIndex address of acquisition index * @param acqIndex address of acquisition index
* @param frameIndex address of frame index * @param frameIndex address of frame index
* @param subframeIndex address of subframe index * @param subframeIndex address of subframe index
* @param filename address of file name * @param filename address of file name
* @param dummy true if end of acquisition else false
* @returns true if successfull else false
*/ */
int ParseHeader(const int index, uint64_t &acqIndex, int ParseHeader(const int index, int length, zmq_msg_t& message, uint64_t &acqIndex,
uint64_t &frameIndex, uint32_t &subframeIndex, string &filename) uint64_t &frameIndex, uint32_t &subframeIndex, string &filename, bool& dummy)
{ {
Document d; Document d;
if (d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message)).HasParseError()) { if ( d.Parse( (char*) zmq_msg_data (&message), zmq_msg_size (&message)).HasParseError() ) {
cprintf (RED,"Error: Could not parse header for socket %d\n",index); cprintf( RED,"%d Could not parse. len:%d: Message:%s \n", index, length, (char*) zmq_msg_data (&message) );
fflush ( stdout );
char* buf = (char*) zmq_msg_data (&message);
for ( int i= 0; i < length; ++i ) {
cprintf(RED,"%02x ",buf[i]);
}
printf("\n");
fflush( stdout );
return 0; return 0;
} }
if(d["acqIndex"].GetUint64()!=(uint64_t)-1) {
int temp = d["data"].GetUint();
dummy = temp ? true : false;
if (dummy) {
acqIndex = d["acqIndex"].GetUint64(); acqIndex = d["acqIndex"].GetUint64();
frameIndex = d["fIndex"].GetUint64(); frameIndex = d["fIndex"].GetUint64();
subframeIndex = -1; subframeIndex = -1;
@ -316,6 +351,7 @@ public:
} }
filename = d["fname"].GetString(); filename = d["fname"].GetString();
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << "Data: " << temp << endl;
cout << "Acquisition index: " << acqIndex << endl; cout << "Acquisition index: " << acqIndex << endl;
cout << "Frame index: " << frameIndex << endl; cout << "Frame index: " << frameIndex << endl;
cout << "Subframe index: " << subframeIndex << endl; cout << "Subframe index: " << subframeIndex << endl;
@ -399,6 +435,30 @@ private:
/** Server Address */ /** Server Address */
char serverAddress[1000]; char serverAddress[1000];
/** Zmq Message */ /** Json Header Format */
zmq_msg_t message; static const char* jsonHeaderFormat =
"{"
"\"jsonversion\":%u, "
"\"bitmode\":%d, "
"\"shape\":[%d, %d], "
"\"acqIndex\":%llu, "
"\"fIndex\":%llu, "
"\"fname\":\"%s\", "
"\"data\": %d, "
"\"frameNumber\":%llu, "
"\"expLength\":%u, "
"\"packetNumber\":%u, "
"\"bunchId\":%llu, "
"\"timestamp\":%llu, "
"\"modId\":%u, "
"\"xCoord\":%u, "
"\"yCoord\":%u, "
"\"zCoord\":%u, "
"\"debug\":%u, "
"\"roundRNumber\":%u, "
"\"detType\":%u, "
"\"version\":%u"
"}\n\0";
}; };

View File

@ -101,9 +101,9 @@ enum communicationProtocol{
{ {
memset(&serverAddress, 0, sizeof(serverAddress)); memset(&serverAddress, 0, sizeof(serverAddress));
memset(&clientAddress, 0, sizeof(clientAddress)); memset(&clientAddress, 0, sizeof(clientAddress));
strcpy(lastClientIP,"none"); memset(lastClientIP,0,INET_ADDRSTRLEN);
strcpy(thisClientIP,"none1"); memset(thisClientIP,0,INET_ADDRSTRLEN);
strcpy(dummyClientIP,"dummy"); memset(dummyClientIP,0,INET_ADDRSTRLEN);
differentClients = 0; differentClients = 0;
struct hostent *hostInfo = gethostbyname(host_ip_or_name); struct hostent *hostInfo = gethostbyname(host_ip_or_name);
if (hostInfo == NULL){ if (hostInfo == NULL){
@ -166,9 +166,9 @@ enum communicationProtocol{
/* // you can specify an IP address: */ /* // you can specify an IP address: */
/* // or you can let it automatically select one: */ /* // or you can let it automatically select one: */
/* myaddr.sin_addr.s_addr = INADDR_ANY; */ /* myaddr.sin_addr.s_addr = INADDR_ANY; */
strcpy(lastClientIP,"none"); memset(lastClientIP,0,INET_ADDRSTRLEN);
strcpy(thisClientIP,"none1"); memset(thisClientIP,0,INET_ADDRSTRLEN);
strcpy(dummyClientIP,"dummy"); memset(dummyClientIP,0,INET_ADDRSTRLEN);
differentClients = 0; differentClients = 0;

View File

@ -6,16 +6,14 @@
#include <unistd.h> #include <unistd.h>
#include <ansi.h> #include <ansi.h>
#ifdef VERBOSE
#define FILELOG_MAX_LEVEL logDEBUG
#endif
#ifdef VERYVERBOSE
#define FILELOG_MAX_LEVEL logDEBUG4
#endif
#ifdef FIFODEBUG #ifdef FIFODEBUG
#define FILELOG_MAX_LEVEL logDEBUG5 #define FILELOG_MAX_LEVEL logDEBUG5
#elif VERYVERBOSE
#define FILELOG_MAX_LEVEL logDEBUG4
#elif VERBOSE
#define FILELOG_MAX_LEVEL logDEBUG
#endif #endif
#ifndef FILELOG_MAX_LEVEL #ifndef FILELOG_MAX_LEVEL

View File

@ -316,7 +316,7 @@ void DataProcessor::ProcessAnImage(char* buf) {
if (*fileWriteEnable) if (*fileWriteEnable)
file->WriteToFile(buf, generalData->fifoBufferSize + sizeof(sls_detector_header), fnum-firstMeasurementIndex); file->WriteToFile(buf, generalData->fifoBufferSize + sizeof(sls_detector_header), fnum-firstMeasurementIndex);
if (rawDataReadyCallBack) if (rawDataReadyCallBack) {
rawDataReadyCallBack( rawDataReadyCallBack(
header->frameNumber, header->frameNumber,
header->expLength, header->expLength,
@ -334,6 +334,7 @@ void DataProcessor::ProcessAnImage(char* buf) {
buf + sizeof(sls_detector_header), buf + sizeof(sls_detector_header),
generalData->imageSize, generalData->imageSize,
pRawDataReady); pRawDataReady);
}
} }

View File

@ -23,30 +23,6 @@ uint64_t DataStreamer::RunningMask(0x0);
pthread_mutex_t DataStreamer::Mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t DataStreamer::Mutex = PTHREAD_MUTEX_INITIALIZER;
const char* DataStreamer::jsonHeaderFormat =
"{"
"\"jsonversion\":%u, "
"\"bitmode\":%d, "
"\"shape\":[%d, %d], "
"\"acqIndex\":%llu, "
"\"fIndex\":%llu, "
"\"fname\":\"%s\", "
"\"frameNumber\":%llu, "
"\"expLength\":%u, "
"\"packetNumber\":%u, "
"\"bunchId\":%llu, "
"\"timestamp\":%llu, "
"\"modId\":%u, "
"\"xCoord\":%u, "
"\"yCoord\":%u, "
"\"zCoord\":%u, "
"\"debug\":%u, "
"\"roundRNumber\":%u, "
"\"detType\":%u, "
"\"version\":%u"
"}";
DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* timer, int* sEnable) : DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* timer, int* sEnable) :
ThreadObject(NumberofDataStreamers), ThreadObject(NumberofDataStreamers),
@ -232,9 +208,6 @@ void DataStreamer::StopProcessing(char* buf) {
if (!SendHeader(header, true)) if (!SendHeader(header, true))
cprintf(RED,"Error: Could not send zmq dummy header for streamer %d\n", index); cprintf(RED,"Error: Could not send zmq dummy header for streamer %d\n", index);
if (!zmqSocket->SendData((char*)DUMMY_MSG, DUMMY_MSG_SIZE))
cprintf(RED,"Error: Could not send zmq dummy message for streamer %d\n", index);
fifo->FreeAddress(buf); fifo->FreeAddress(buf);
StopRunning(); StopRunning();
#ifdef VERBOSE #ifdef VERBOSE
@ -324,20 +297,12 @@ int DataStreamer::SendHeader(sls_detector_header* header, bool dummy) {
uint64_t acquisitionIndex = header->frameNumber - firstAcquisitionIndex; uint64_t acquisitionIndex = header->frameNumber - firstAcquisitionIndex;
uint32_t subframeIndex = header->expLength; uint32_t subframeIndex = header->expLength;
char buf[1000] = ""; return zmqSocket->SendHeaderData(SLS_DETECTOR_JSON_HEADER_VERSION, *dynamicRange,
generalData->nPixelsX_Streamer, generalData->nPixelsY_Streamer,
acquisitionIndex, frameIndex, fileNametoStream, dummy,
if (dummy) {
frameIndex = -1;
acquisitionIndex = -1;
subframeIndex = -1;
}
int len = sprintf(buf, jsonHeaderFormat,
SLS_DETECTOR_JSON_HEADER_VERSION, *dynamicRange, generalData->nPixelsX_Streamer, generalData->nPixelsY_Streamer, acquisitionIndex, frameIndex, fileNametoStream,
header->frameNumber, header->expLength, header->packetNumber, header->bunchId, header->timestamp, header->frameNumber, header->expLength, header->packetNumber, header->bunchId, header->timestamp,
header->modId, header->xCoord, header->yCoord, header->zCoord, header->debug, header->roundRNumber, header->detType, header->version); header->modId, header->xCoord, header->yCoord, header->zCoord, header->debug, header->roundRNumber,
#ifdef VERBOSE header->detType, header->version
printf("%d Streamer: buf:%s\n", index, buf); );
#endif
return zmqSocket->SendHeaderData(buf, len);
} }

View File

@ -444,8 +444,9 @@ int UDPStandardImplementation::startReceiver(char *c) {
if (startAcquisitionCallBack) { if (startAcquisitionCallBack) {
startAcquisitionCallBack(filePath, fileName, fileIndex, startAcquisitionCallBack(filePath, fileName, fileIndex,
(generalData->fifoBufferSize) * numberofJobs + (generalData->fifoBufferHeaderSize), pStartAcquisition); (generalData->fifoBufferSize) * numberofJobs + (generalData->fifoBufferHeaderSize), pStartAcquisition);
if (rawDataReadyCallBack != NULL) if (rawDataReadyCallBack != NULL) {
cout << "Data Write has been defined externally" << endl; cout << "Data Write has been defined externally" << endl;
}
} }
//processor->writer //processor->writer