From 447c5bb8fe45a77fb801f07b51f2334b2c7d6785 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Tue, 28 Feb 2017 13:42:56 +0100 Subject: [PATCH] gui works --- slsReceiverSoftware/include/DataStreamer.h | 13 +- slsReceiverSoftware/include/ZmqSocket.h | 178 +++++++++++++++--- slsReceiverSoftware/include/genericSocket.h | 2 +- slsReceiverSoftware/src/DataProcessor.cpp | 4 +- slsReceiverSoftware/src/DataStreamer.cpp | 75 +++++--- slsReceiverSoftware/src/Listener.cpp | 8 +- slsReceiverSoftware/src/ThreadObject.cpp | 2 +- .../src/UDPBaseImplementation.cpp | 36 ++-- .../src/UDPStandardImplementation.cpp | 30 +-- .../src/slsReceiverTCPIPInterface.cpp | 1 + 10 files changed, 254 insertions(+), 95 deletions(-) diff --git a/slsReceiverSoftware/include/DataStreamer.h b/slsReceiverSoftware/include/DataStreamer.h index e8150f1c5..4aebc565b 100644 --- a/slsReceiverSoftware/include/DataStreamer.h +++ b/slsReceiverSoftware/include/DataStreamer.h @@ -84,11 +84,6 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { */ void ResetParametersforNewMeasurement(); - /** - * Create Part1 of Json Header which includes common attributes in an acquisition - */ - void CreateHeaderPart1(); - /** * Set GeneralData pointer to the one given * @param g address of GeneralData (Detector Data) pointer @@ -129,6 +124,11 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { */ bool IsRunning(); + /** + * Create Part1 of Json Header which includes common attributes in an acquisition + */ + void CreateHeaderPart1(); + /** * Record First Indices (firstAcquisitionIndex, firstMeasurementIndex) * @param fnum frame index to record @@ -172,9 +172,10 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { /** * Create and send Json Header * @param fnum frame number + * @param dummy true if its a dummy header * @returns 0 if error, else 1 */ - int SendHeader(uint64_t fnum); + int SendHeader(uint64_t fnum, bool dummy = false); /** type of thread */ static const std::string TypeName; diff --git a/slsReceiverSoftware/include/ZmqSocket.h b/slsReceiverSoftware/include/ZmqSocket.h index 2d1163641..b83b9907d 100644 --- a/slsReceiverSoftware/include/ZmqSocket.h +++ b/slsReceiverSoftware/include/ZmqSocket.h @@ -10,10 +10,15 @@ #include "ansi.h" #include -#include //json header in zmq stream #include +#include //gethostbyname() +#include //inet_ntoa +#include //json header in zmq stream +using namespace rapidjson; -#define DEFAULT_ZMQ_PORTNO 70001 +#define DEFAULT_ZMQ_PORTNO 70001 +#define DUMMY_MSG_SIZE 3 +#define DUMMY_MSG "end" class ZmqSocket { @@ -33,20 +38,25 @@ public: * @param hostname hostname or ip of server * @param portnumber port number */ - ZmqSocket (const char* const hostname, const uint32_t portnumber): + ZmqSocket (const char* const hostname_or_ip, const uint32_t portnumber): portno (portnumber), server (false), contextDescriptor (NULL), socketDescriptor (NULL) { + char ip[MAX_STR_LENGTH] = ""; + strcpy(ip, hostname_or_ip); + // construct address - if (strchr (hostname, '.') != NULL) { + if (strchr (hostname_or_ip, '.') != NULL) { // convert hostname to ip - hostname = ConvertHostnameToIp (hostname); - if (hostname == NULL) + char* ptr = ConvertHostnameToIp (hostname_or_ip); + if (ptr == NULL) return; + strcpy(ip, ptr); + delete ptr; } - sprintf (serverAddress, "tcp://%s:%d", hostname, portno); + sprintf (serverAddress, "tcp://%s:%d", ip, portno); // create context contextDescriptor = zmq_ctx_new(); @@ -63,7 +73,7 @@ public: //Socket Options provided above //connect socket - if (zmq_connect(socketDescriptor, serverAddress)) { + if (zmq_connect(socketDescriptor, serverAddress) < 0) { PrintError (); Close (); } @@ -97,7 +107,7 @@ public: // construct address sprintf (serverAddress,"tcp://*:%d", portno); // bind address - if (zmq_bind (socketDescriptor, serverAddress)) { + if (zmq_bind (socketDescriptor, serverAddress) < 0) { PrintError (); Close (); } @@ -113,9 +123,9 @@ public: /** * Returns error status - * @returns 1 if error else 0 + * @returns true if error else false */ - int GetErrorStatus() { if (socketDescriptor == NULL) return 1; return 0; }; + bool IsError() { if (socketDescriptor == NULL) return true; return false; }; /** * Returns Server Address @@ -134,7 +144,7 @@ public: * @reutns Socket descriptor */ - int GetsocketDescriptor () { return socketDescriptor; }; + void* GetsocketDescriptor () { return socketDescriptor; }; /** * Unbinds the Socket @@ -179,40 +189,153 @@ public: * @returns 0 if error, else 1 */ int SendHeaderData (char* buf, int length) { - if(!zmq_send (socketDescriptor, buf, length, ZMQ_SNDMORE)) - return 1; - PrintError(); - return 0; + if(zmq_send (socketDescriptor, buf, length, ZMQ_SNDMORE) < 0) { + PrintError (); + return 0; + } + return 1; }; /** * Send Message Body * @returns 0 if error, else 1 */ - int SendDataOnly (char* buf, int length) { - if(!zmq_send (socketDescriptor, buf, length, 0)) - return 1; - PrintError (); - return 0; + int SendData (char* buf, int length) { + if(zmq_send (socketDescriptor, buf, length, 0) < 0) { + PrintError (); + return 0; + } + return 1; }; + /** - * Receive Message (header/data) + * Receive Message * @param index self index for debugging * @returns length of message, -1 if error */ - int ReceiveData(int index, zmq_msg_t& message) { - // scan header - zmq_msg_init (&message); + int ReceiveMessage(const int index) { int length = zmq_msg_recv (&message, socketDescriptor, 0); if (length == -1) { PrintError (); cprintf (BG_RED,"Error: Could not read header for socket %d\n",index); - zmq_msg_close (&message); } return length; }; + + /** + * Receive Header + * @param index self index for debugging + * @param acqIndex address of acquisition index + * @param frameIndex address of frame index + * @param subframeIndex address of subframe index + * @param filename address of file name + * @returns 0 if error, else 1 + */ + int ReceiveHeader(const int index, uint64_t &acqIndex, + uint64_t &frameIndex, uint64_t &subframeIndex, string &filename) + { + zmq_msg_init (&message); + if (ReceiveMessage(index) > 0) { + if (ParseHeader(index, acqIndex, frameIndex, subframeIndex, filename)) { + zmq_msg_close(&message); +#ifdef VERBOSE + cprintf(BLUE,"%d header rxd\n",index); +#endif + return 1; + } + } + zmq_msg_close(&message); + return 0; + }; + + /** + * Receive Data + * @param index self index for debugging + * @param buf buffer to copy image data to + * @param size size of image + * @returns 0 if error, else 1 + */ + int ReceiveData(const int index, int* buf, const int size) + { + zmq_msg_init (&message); + int length = ReceiveMessage(index); + + //dummy + if (length == DUMMY_MSG_SIZE) { +#ifdef VERBOSE + cprintf(RED,"%d Received end of acquisition\n", index); +#endif + zmq_msg_close(&message); + return 0; + } + + //actual data + if (length == size) { +#ifdef VERBOSE + cprintf(BLUE,"%d actual data\n", index); +#endif + memcpy((char*)buf, (char*)zmq_msg_data(&message), size); + } + + //incorrect size + else { + cprintf(RED,"Error: Received weird packet size %d for socket %d\n", length, index); + memset((char*)buf,0xFF,size); + } + + zmq_msg_close(&message); + return 1; + }; + + + /** + * Parse Header + * @param index self index for debugging + * @param acqIndex address of acquisition index + * @param frameIndex address of frame index + * @param subframeIndex address of subframe index + * @param filename address of file name + */ + int ParseHeader(const int index, uint64_t &acqIndex, + uint64_t &frameIndex, uint64_t &subframeIndex, string &filename) + { + Document d; + if (d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message)).HasParseError()) { + cprintf (RED,"Error: Could not parse header for socket %d\n",index); + return 0; + } +#ifdef VERYVERBOSE + // htype is an array of strings + rapidjson::Value::Array htype = d["htype"].GetArray(); + for (int i = 0; i < htype.Size(); i++) + printf("%d: htype: %s\n", index, htype[i].GetString()); + // shape is an array of ints + rapidjson::Value::Array shape = d["shape"].GetArray(); + printf("%d: shape: ", index); + for (int i = 0; i < shape.Size(); i++) + printf("%d: %d ", index, shape[i].GetInt()); + printf("\n"); + printf("%d: type: %s\n", index, d["type"].GetString()); +#endif + + if(d["acqIndex"].GetInt()!=-1){ + acqIndex = d["acqIndex"].GetInt(); + frameIndex = d["fIndex"].GetInt(); + subframeIndex = d["subfnum"].GetInt(); + filename = d["fname"].GetString(); +#ifdef VERYVERBOSE + cout << "Acquisition index: " << acqIndex << endl; + cout << "Frame index: " << frameIndex << endl; + cout << "Subframe index: " << subframeIndex << endl; + cout << "File name: " << filename << endl; +#endif + } + return 1; + }; + + /** * Print error */ @@ -285,4 +408,7 @@ private: /** Server Address */ char serverAddress[1000]; + + /** Zmq Message */ + zmq_msg_t message; }; diff --git a/slsReceiverSoftware/include/genericSocket.h b/slsReceiverSoftware/include/genericSocket.h index c344c8eb8..61e6a4eb7 100644 --- a/slsReceiverSoftware/include/genericSocket.h +++ b/slsReceiverSoftware/include/genericSocket.h @@ -71,7 +71,7 @@ using namespace std; #define DEFAULT_BACKLOG 5 #define DEFAULT_UDP_PORTNO 50001 #define DEFAULT_GUI_PORTNO 65000 -//#define DEFAULT_ZMQ_PORTNO 70001 +#define DEFAULT_ZMQ_PORTNO 70001 class genericSocket{ diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index a9a65d764..9eed48b66 100644 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -284,7 +284,9 @@ void DataProcessor::StopProcessing(char* buf) { file->CloseCurrentFile(); StopRunning(); - cprintf(BLUE,"%d: Processing Completed\n", index); +#ifdef VERBOSE + printf("%d: Processing Completed\n", index); +#endif } diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index 7b085714f..28933c205 100644 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -27,7 +27,7 @@ const char* DataStreamer::jsonHeaderFormat_part1 = "{" "\"htype\":[\"chunk-1.0\"], " "\"type\":\"%s\", " - "\"shape\":%s, "; + "\"shape\":[%d, %d], "; const char* DataStreamer::jsonHeaderFormat = "%s" @@ -52,7 +52,6 @@ DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* tim firstAcquisitionIndex(0), firstMeasurementIndex(0) { - memset(timerBegin, 0xFF, sizeof(timespec)); if(ThreadObject::CreateThread()){ pthread_mutex_lock(&Mutex); ErrorMask ^= (1<nPixelsX, generalData->nPixelsY); - - sprintf(currentHeader, jsonHeaderFormat_part1, type, shape); + sprintf(currentHeader, jsonHeaderFormat_part1, + type, generalData->nPixelsX, generalData->nPixelsY); +#ifdef VERBOSE + cprintf(BLUE, "%d currentheader: %s\n", index, currentHeader); +#endif } @@ -182,14 +185,13 @@ int DataStreamer::SetThreadPriority(int priority) { int DataStreamer::CreateZmqSockets(int* dindex, int* nunits) { uint32_t portnum = DEFAULT_ZMQ_PORTNO + ((*dindex) * (*nunits) + index); - printf("%d Streamer: Port number: %d\n", index, portnum); zmqSocket = new ZmqSocket(portnum); - if (zmqSocket->GetErrorStatus()) { + if (zmqSocket->IsError()) { cprintf(RED, "Error: Could not create Zmq socket on port %d for Streamer %d\n", portnum, index); return FAIL; } - printf("%d Streamer: Zmq Server started at %s\n",zmqSocket->GetZmqServerAddress()); + printf("%d Streamer: Zmq Server started at %s\n",index, zmqSocket->GetZmqServerAddress()); return OK; } @@ -226,9 +228,19 @@ void DataStreamer::ThreadExecution() { void DataStreamer::StopProcessing(char* buf) { + + //send dummy header and data + if (!SendHeader(0, true)) + cprintf(RED,"Error: Could not send zmq dummy header for streamer %d\n", index); + + if (!zmqSocket->SendData(DUMMY_MSG, DUMMY_MSG_SIZE)) + cprintf(RED,"Error: Could not send zmq dummy message for streamer %d\n", index); + fifo->FreeAddress(buf); StopRunning(); - cprintf(MAGENTA,"%d: Streaming Completed\n", index); +#ifdef VERBOSE + printf("%d: Streaming Completed\n", index); +#endif } @@ -258,14 +270,17 @@ void DataStreamer::ProcessAnImage(char* buf) { return; } - if(!SendHeader(fnum)) + if (!SendHeader(fnum)) cprintf(RED,"Error: Could not send zmq header for fnum %lld and streamer %d\n", (long long int) fnum, index); - - Send Datat(); + if (!zmqSocket->SendData(buf + FILE_FRAME_HEADER_SIZE, generalData->imageSize)) + cprintf(RED,"Error: Could not send zmq data for fnum %lld and streamer %d\n", + (long long int) fnum, index); } + + bool DataStreamer::CheckTimer() { struct timespec end; clock_gettime(CLOCK_REALTIME, &end); @@ -273,7 +288,7 @@ bool DataStreamer::CheckTimer() { cprintf(BLUE,"%d Timer elapsed time:%f seconds\n", index, ( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec ) / 1000000000.0); #endif //still less than streaming timer, keep waiting - if((( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec ) / 1000000000.0) < (streamingTimerInMs/1000)) + if((( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec ) / 1000000000.0) < (*streamingTimerInMs/1000)) return false; //restart timer @@ -281,6 +296,7 @@ bool DataStreamer::CheckTimer() { return true; } + bool DataStreamer::CheckCount() { if (currentFreqCount == *streamingFrequency ) { currentFreqCount = 1; @@ -290,11 +306,24 @@ bool DataStreamer::CheckCount() { return false; } -int DataStreamer::SendHeader(uint64_t fnum) { - uint64_t frameIndex = fnum - firstMeasurementIndex; - uint64_t acquisitionIndex = fnum - firstAcquisitionIndex; - uint64_t subframeIndex = -1; /* subframe to be included in fifo buffer? */ - char buf[1000]; - int len = sprintf(buf, jsonHeaderFormat, jsonHeaderFormat_part1, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); - return zmqSocket->SendDataOnly(buf, len); + +int DataStreamer::SendHeader(uint64_t fnum, bool dummy) { + uint64_t frameIndex = -1; + uint64_t acquisitionIndex = -1; + uint64_t subframeIndex = -1; + char fname[MAX_STR_LENGTH] = "run"; + char buf[1000] = ""; + + if (!dummy) { + frameIndex = fnum - firstMeasurementIndex; + acquisitionIndex = fnum - firstAcquisitionIndex; + subframeIndex = -1; /* subframe to be included in fifo buffer? */ + /* fname to be included in fifo buffer? */ + } + + int len = sprintf(buf, jsonHeaderFormat, currentHeader, acquisitionIndex, frameIndex, subframeIndex,fname); +#ifdef VERBOSE + printf("%d Streamer: buf:%s\n", index, buf); +#endif + return zmqSocket->SendHeaderData(buf, len); } diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index d05d09c5c..7ab0a34ca 100644 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -164,7 +164,7 @@ void Listener::RecordFirstIndices(uint64_t fnum) { acquisitionStartedFlag = true; firstAcquisitionIndex = fnum; } - if (!index) cprintf(GREEN,"%d First Acquisition Index:%lld\n" + if (!index) cprintf(BLUE,"%d First Acquisition Index:%lld\n" "%d First Measurement Index:%lld\n", index, (long long int)firstAcquisitionIndex, index, (long long int)firstMeasurementIndex); @@ -205,7 +205,7 @@ int Listener::CreateUDPSockets() { generalData->packetSize, (strlen(eth)?eth:NULL), generalData->headerPacketSize); int iret = udpSocket->getErrorStatus(); if(!iret){ - cout << "UDP port opened at port " << *udpPortNumber << endl; + cout << index << ": UDP port opened at port " << *udpPortNumber << endl; }else{ FILE_LOG(logERROR) << "Could not create UDP socket on port " << *udpPortNumber << " error: " << iret; return FAIL; @@ -218,7 +218,7 @@ int Listener::CreateUDPSockets() { void Listener::ShutDownUDPSocket() { if(udpSocket){ udpSocket->ShutDownSocket(); - FILE_LOG(logINFO) << "Shut down UDP Socket " << index; + FILE_LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber; delete udpSocket; udpSocket = 0; } @@ -280,8 +280,8 @@ void Listener::StopListening(char* buf) { StopRunning(); #ifdef VERBOSE cprintf(GREEN,"%d: Listening Packets (%d) : %d\n", index, *udpPortNumber, numPacketsCaught); + printf("%d: Listening Completed\n", index); #endif - cprintf(GREEN,"%d: Listening Completed\n", index); } diff --git a/slsReceiverSoftware/src/ThreadObject.cpp b/slsReceiverSoftware/src/ThreadObject.cpp index e1798517a..5b18bb7ba 100644 --- a/slsReceiverSoftware/src/ThreadObject.cpp +++ b/slsReceiverSoftware/src/ThreadObject.cpp @@ -61,7 +61,7 @@ int ThreadObject::CreateThread() { return FAIL; } alive = true; - FILE_LOG (logINFO) << GetType() << " thread " << index << " created successfully."; + FILE_LOG (logDEBUG) << GetType() << " thread " << index << " created successfully."; return OK; } diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index 22a9ab1fb..e01f21aaa 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -254,7 +254,7 @@ void UDPBaseImplementation::setFileFormat(const fileFormat f){ break; } - FILE_LOG(logINFO) << "File Format:" << getFileFormatType(fileFormatType); + FILE_LOG(logINFO) << "File Format: " << getFileFormatType(fileFormatType); } @@ -263,7 +263,7 @@ void UDPBaseImplementation::setFileName(const char c[]){ if(strlen(c)) strcpy(fileName, c); - FILE_LOG(logINFO) << "File name:" << fileName; + FILE_LOG(logINFO) << "File name: " << fileName; } void UDPBaseImplementation::setFilePath(const char c[]){ @@ -276,18 +276,18 @@ void UDPBaseImplementation::setFilePath(const char c[]){ strcpy(filePath,c); else{ strcpy(filePath,""); - FILE_LOG(logWARNING) << "FilePath does not exist:" << filePath; + FILE_LOG(logWARNING) << "FilePath does not exist: " << filePath; } strcpy(filePath, c); } - FILE_LOG(logDEBUG) << "Info: File path:" << filePath; + FILE_LOG(logDEBUG) << "Info: File path: " << filePath; } void UDPBaseImplementation::setFileIndex(const uint64_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; fileIndex = i; - FILE_LOG(logINFO) << "File Index:" << fileIndex; + FILE_LOG(logINFO) << "File Index: " << fileIndex; } //FIXME: needed? @@ -295,7 +295,7 @@ void UDPBaseImplementation::setScanTag(const int i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; scanTag = i; - FILE_LOG(logINFO) << "Scan Tag:" << scanTag; + FILE_LOG(logINFO) << "Scan Tag: " << scanTag; } @@ -336,14 +336,14 @@ void UDPBaseImplementation::setUDPPortNumber(const uint32_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; udpPortNum[0] = i; - FILE_LOG(logINFO) << "UDP Port Number[0]:" << udpPortNum[0]; + FILE_LOG(logINFO) << "UDP Port Number[0]: " << udpPortNum[0]; } void UDPBaseImplementation::setUDPPortNumber2(const uint32_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; udpPortNum[1] = i; - FILE_LOG(logINFO) << "UDP Port Number[1]:" << udpPortNum[1]; + FILE_LOG(logINFO) << "UDP Port Number[1]: " << udpPortNum[1]; } void UDPBaseImplementation::setEthernetInterface(const char* c){ @@ -368,7 +368,7 @@ int UDPBaseImplementation::setFrameToGuiFrequency(const uint32_t freq){ FILE_LOG(logDEBUG) << __AT__ << " starting"; frameToGuiFrequency = freq; - FILE_LOG(logINFO) << "Frame To Gui Frequency:" << frameToGuiFrequency; + FILE_LOG(logINFO) << "Frame To Gui Frequency: " << frameToGuiFrequency; //overrridden child classes might return FAIL return OK; @@ -378,7 +378,7 @@ void UDPBaseImplementation::setFrameToGuiTimer(const uint32_t time_in_ms){ FILE_LOG(logDEBUG) << __AT__ << " starting"; frameToGuiTimerinMS = time_in_ms; - FILE_LOG(logINFO) << "Frame To Gui Timer:" << frameToGuiTimerinMS; + FILE_LOG(logINFO) << "Frame To Gui Timer: " << frameToGuiTimerinMS; } @@ -386,7 +386,7 @@ int UDPBaseImplementation::setDataStreamEnable(const bool enable){ FILE_LOG(logDEBUG) << __AT__ << " starting"; dataStreamEnable = enable; - FILE_LOG(logINFO) << "Streaming Data from Receiver:" << dataStreamEnable; + FILE_LOG(logINFO) << "Streaming Data from Receiver: " << dataStreamEnable; //overrridden child classes might return FAIL return OK; @@ -397,7 +397,7 @@ int UDPBaseImplementation::setAcquisitionPeriod(const uint64_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; acquisitionPeriod = i; - FILE_LOG(logINFO) << "Acquisition Period:" << (double)acquisitionPeriod/(1E9) << "s"; + FILE_LOG(logINFO) << "Acquisition Period: " << (double)acquisitionPeriod/(1E9) << "s"; //overrridden child classes might return FAIL return OK; @@ -407,7 +407,7 @@ int UDPBaseImplementation::setAcquisitionTime(const uint64_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; acquisitionTime = i; - FILE_LOG(logINFO) << "Acquisition Time:" << (double)acquisitionTime/(1E9) << "s"; + FILE_LOG(logINFO) << "Acquisition Time: " << (double)acquisitionTime/(1E9) << "s"; //overrridden child classes might return FAIL return OK; @@ -417,7 +417,7 @@ int UDPBaseImplementation::setNumberOfFrames(const uint64_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; numberOfFrames = i; - FILE_LOG(logINFO) << "Number of Frames:" << numberOfFrames; + FILE_LOG(logINFO) << "Number of Frames: " << numberOfFrames; //overrridden child classes might return FAIL return OK; @@ -427,7 +427,7 @@ int UDPBaseImplementation::setDynamicRange(const uint32_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; dynamicRange = i; - FILE_LOG(logINFO) << "Dynamic Range:" << dynamicRange; + FILE_LOG(logINFO) << "Dynamic Range: " << dynamicRange; //overrridden child classes might return FAIL return OK; @@ -465,7 +465,7 @@ int UDPBaseImplementation::setDetectorType(const detectorType d){ myDetectorType = d; //if eiger, set numberofListeningThreads = 2; - FILE_LOG(logINFO) << "Detector Type:" << getDetectorType(d); + FILE_LOG(logINFO) << "Detector Type: " << getDetectorType(d); return OK; } @@ -473,7 +473,7 @@ void UDPBaseImplementation::setDetectorPositionId(const int i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; detID = i; - FILE_LOG(logINFO) << "Detector Position Id:" << detID; + FILE_LOG(logINFO) << "Detector Position Id: " << detID; } void UDPBaseImplementation::initialize(const char *c){ @@ -481,7 +481,7 @@ void UDPBaseImplementation::initialize(const char *c){ if(strlen(c)) strcpy(detHostname, c); - FILE_LOG(logINFO) << "Detector Hostname:" << detHostname; + FILE_LOG(logINFO) << "Detector Hostname: " << detHostname; } diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 3c5572609..499d20460 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -216,9 +216,9 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) { if (enable) { bool error = false; for ( int i = 0; i < numThreads; ++i ) { - dataStreamer.push_back(new DataStreamer(fifo[i], &frameToGuiFrequency, &frameToGuiTimerinMS, &dynamicRange)); + dataStreamer.push_back(new DataStreamer(fifo[i], &dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS)); dataStreamer[i]->SetGeneralData(generalData); - if (dataStreamer[i]->CreateZmqSockets() == FAIL) { + if (dataStreamer[i]->CreateZmqSockets(&detID, &numThreads) == FAIL) { error = true; break; } @@ -478,6 +478,9 @@ int UDPStandardImplementation::VerifyCallBackAction() { } int UDPStandardImplementation::startReceiver(char *c) { + cout << endl << endl; + FILE_LOG(logINFO) << "Starting Receiver"; + ResetParametersforNewMeasurement(); //listener @@ -486,7 +489,6 @@ int UDPStandardImplementation::startReceiver(char *c) { FILE_LOG(logERROR) << c; return FAIL; } - cout << "Listener Ready ..." << endl; //callbacks callbackAction = DO_EVERYTHING; @@ -502,9 +504,9 @@ int UDPStandardImplementation::startReceiver(char *c) { return FAIL; } } else - cout << "Data will not be saved" << endl; - cout << "Processor Ready ..." << endl; + cout << " Data will not be saved" << endl; + cout << "Ready ..." << endl; //status pthread_mutex_lock(&statusMutex); @@ -550,15 +552,15 @@ void UDPStandardImplementation::stopReceiver(){ tot += dataProcessor[i]->GetNumFramesCaught(); if (dataProcessor[i]->GetNumFramesCaught() < numberOfFrames) { - cprintf(RED, "\nPort %d\n",udpPortNum[i]); - cprintf(RED, "Missing Packets \t: %lld\n",(long long int)numberOfFrames*generalData->packetsPerFrame-listener[i]->GetTotalPacketsCaught()); - cprintf(RED, "Frames Caught \t\t: %lld\n",(long long int)dataProcessor[i]->GetNumFramesCaught()); - cprintf(RED, "Last Frame Number Caught :%lld\n",(long long int)listener[i]->GetLastFrameIndexCaught()); + cprintf(RED, "\n[Port %d]\n",udpPortNum[i]); + cprintf(RED, "Missing Packets\t\t: %lld\n",(long long int)numberOfFrames*generalData->packetsPerFrame-listener[i]->GetTotalPacketsCaught()); + cprintf(RED, "Frames Caught\t\t: %lld\n",(long long int)dataProcessor[i]->GetNumFramesCaught()); + cprintf(RED, "Last Frame Caught\t: %lld\n",(long long int)listener[i]->GetLastFrameIndexCaught()); }else{ - cprintf(GREEN, "\nPort %d\n",udpPortNum[i]); - cprintf(GREEN, "Missing Packets \t: %lld\n",(long long int)numberOfFrames*generalData->packetsPerFrame-listener[i]->GetTotalPacketsCaught()); - cprintf(GREEN, "Frames Caught \t\t: %lld\n",(long long int)dataProcessor[i]->GetNumFramesCaught()); - cprintf(GREEN, "Last Frame Number Caught :%lld\n",(long long int)listener[i]->GetLastFrameIndexCaught()); + cprintf(GREEN, "\n[Port %d]\n",udpPortNum[i]); + cprintf(GREEN, "Missing Packets\t\t: %lld\n",(long long int)numberOfFrames*generalData->packetsPerFrame-listener[i]->GetTotalPacketsCaught()); + cprintf(GREEN, "Frames Caught\t\t: %lld\n",(long long int)dataProcessor[i]->GetNumFramesCaught()); + cprintf(GREEN, "Last Frame Caught\t: %lld\n",(long long int)listener[i]->GetLastFrameIndexCaught()); } } if(!activated) @@ -575,7 +577,6 @@ void UDPStandardImplementation::stopReceiver(){ FILE_LOG(logINFO) << "Receiver Stopped"; FILE_LOG(logINFO) << "Status: " << runStatusType(status); - cout << endl << endl; } @@ -816,7 +817,6 @@ int UDPStandardImplementation::SetupWriter() { return FAIL; } - cout << "Writer Ready ..." << endl; return OK; } diff --git a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp index 2aca8d2df..68648c758 100644 --- a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp +++ b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp @@ -2339,6 +2339,7 @@ int slsReceiverTCPIPInterface::set_multi_detector_size() { else if (receiverBase == NULL){ strcpy(mess,SET_RECEIVER_ERR_MESSAGE); ret=FAIL; + cprintf(RED, "%s", mess); } else if(receiverBase->getStatus()!= IDLE){ strcpy(mess,"Can not set position file id while receiver not idle\n");