From 829ba49c1c585f5a9de43948617ab610cd18afb0 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Tue, 28 Feb 2017 15:30:53 +0100 Subject: [PATCH] moving subframenumber into a 32 bit number, also in stremer, changing json header to mention 4 bit and version number --- slsReceiverSoftware/include/DataStreamer.h | 3 +- slsReceiverSoftware/include/GeneralData.h | 53 ++++++++++++++------- slsReceiverSoftware/include/ZmqSocket.h | 17 ++++--- slsReceiverSoftware/include/receiver_defs.h | 11 +++-- slsReceiverSoftware/src/DataProcessor.cpp | 1 - slsReceiverSoftware/src/DataStreamer.cpp | 21 ++++---- slsReceiverSoftware/src/Listener.cpp | 23 +++++++-- 7 files changed, 81 insertions(+), 48 deletions(-) diff --git a/slsReceiverSoftware/include/DataStreamer.h b/slsReceiverSoftware/include/DataStreamer.h index 4aebc565b..26dc19249 100644 --- a/slsReceiverSoftware/include/DataStreamer.h +++ b/slsReceiverSoftware/include/DataStreamer.h @@ -172,10 +172,11 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { /** * Create and send Json Header * @param fnum frame number + * @param snum sub frame number * @param dummy true if its a dummy header * @returns 0 if error, else 1 */ - int SendHeader(uint64_t fnum, bool dummy = false); + int SendHeader(uint64_t fnum, uint32_t snum, bool dummy = false); /** type of thread */ static const std::string TypeName; diff --git a/slsReceiverSoftware/include/GeneralData.h b/slsReceiverSoftware/include/GeneralData.h index c1cb9ec2f..28d49fc6d 100644 --- a/slsReceiverSoftware/include/GeneralData.h +++ b/slsReceiverSoftware/include/GeneralData.h @@ -82,7 +82,8 @@ public: * @param frameNumber frame number * @param packetNumber packet number */ - virtual void GetHeaderInfo(int index, char* packetData, uint64_t& frameNumber, uint32_t& packetNumber) const { + virtual void GetHeaderInfo(int index, char* packetData, uint64_t& frameNumber, uint32_t& packetNumber) const + { frameNumber = ((uint32_t)(*((uint32_t*)(packetData)))); frameNumber++; packetNumber = frameNumber&packetIndexMask; @@ -93,15 +94,20 @@ public: * Get Header Infomation (frame number, packet number) * @param index thread index for debugging purposes * @param packetData pointer to data - * @param dynamicRange dynamic range to assign subframenumber if 32 bit mode * @param frameNumber frame number * @param packetNumber packet number * @param subFrameNumber sub frame number if applicable * @param bunchId bunch id */ - virtual void GetHeaderInfo(int index, char* packetData, uint32_t dynamicRange, - uint64_t& frameNumber, uint32_t& packetNumber, uint32_t& subFrameNumber, uint64_t bunchId) const { - cprintf(RED,"This is a generic function that should be overloaded by a derived class\n"); + virtual void GetHeaderInfo(int index, char* packetData, uint64_t& frameNumber, + uint32_t& packetNumber, uint32_t& subFrameNumber, uint64_t bunchId) const + { + subFrameNumber = -1; + bunchId = -1; + frameNumber = ((uint32_t)(*((uint32_t*)(packetData)))); + frameNumber++; + packetNumber = frameNumber&packetIndexMask; + frameNumber = (frameNumber & frameIndexMask) >> frameIndexOffset; } /** @@ -368,21 +374,36 @@ private: * Get Header Infomation (frame number, packet number) * @param index thread index for debugging purposes * @param packetData pointer to data - * @param dynamicRange dynamic range to assign subframenumber if 32 bit mode + * @param frameNumber frame number + * @param packetNumber packet number + */ + virtual void GetHeaderInfo(int index, char* packetData, uint64_t& frameNumber, uint32_t& packetNumber) const + { + jfrau_packet_header_t* header = (jfrau_packet_header_t*)(packetData); + frameNumber = (uint64_t)(*( (uint32_t*) header->frameNumber)); + packetNumber = (uint32_t)(*( (uint8_t*) header->packetNumber)); + } + + /** + * Get Header Infomation (frame number, packet number) + * @param index thread index for debugging purposes + * @param packetData pointer to data * @param frameNumber frame number * @param packetNumber packet number * @param subFrameNumber sub frame number if applicable * @param bunchId bunch id */ - void GetHeaderInfo(int index, char* packetData, uint32_t dynamicRange, - uint64_t& frameNumber, uint32_t& packetNumber, uint32_t& subFrameNumber, uint64_t bunchId) const { - subFrameNumber = 0; + void GetHeaderInfo(int index, char* packetData, uint64_t& frameNumber, + uint32_t& packetNumber, uint32_t& subFrameNumber, uint64_t bunchId) const + { + subFrameNumber = -1; jfrau_packet_header_t* header = (jfrau_packet_header_t*)(packetData); frameNumber = (uint64_t)(*( (uint32_t*) header->frameNumber)); packetNumber = (uint32_t)(*( (uint8_t*) header->packetNumber)); bunchId = (*((uint64_t*) header->bunchid)); } + /** * Print all variables */ @@ -454,23 +475,19 @@ private: * Get Header Infomation (frame number, packet number) * @param index thread index for debugging purposes * @param packetData pointer to data - * @param dynamicRange dynamic range to assign subframenumber if 32 bit mode * @param frameNumber frame number * @param packetNumber packet number * @param subFrameNumber sub frame number if applicable * @param bunchId bunch id */ - void GetHeaderInfo(int index, char* packetData, uint32_t dynamicRange, - uint64_t& frameNumber, uint32_t& packetNumber, uint32_t& subFrameNumber, uint64_t bunchId) const { - bunchId = 0; - subFrameNumber = 0; + void GetHeaderInfo(int index, char* packetData, uint64_t& frameNumber, + uint32_t& packetNumber, uint32_t& subFrameNumber, uint64_t bunchId) const { + bunchId = -1; eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(packetData + footerOffset); frameNumber = (uint64_t)((*( (uint64_t*) footer)) & frameIndexMask); packetNumber = (uint32_t)(*( (uint16_t*) footer->packetNumber))-1; - if (dynamicRange == 32) { - eiger_packet_header_t* header = (eiger_packet_header_t*) (packetData); - subFrameNumber = (uint64_t) *( (uint32_t*) header->subFrameNumber); - } + eiger_packet_header_t* header = (eiger_packet_header_t*) (packetData); + subFrameNumber = (uint64_t) *( (uint32_t*) header->subFrameNumber); } /** diff --git a/slsReceiverSoftware/include/ZmqSocket.h b/slsReceiverSoftware/include/ZmqSocket.h index b83b9907d..1f02bd2e0 100644 --- a/slsReceiverSoftware/include/ZmqSocket.h +++ b/slsReceiverSoftware/include/ZmqSocket.h @@ -234,7 +234,7 @@ public: * @returns 0 if error, else 1 */ int ReceiveHeader(const int index, uint64_t &acqIndex, - uint64_t &frameIndex, uint64_t &subframeIndex, string &filename) + uint64_t &frameIndex, uint32_t &subframeIndex, string &filename) { zmq_msg_init (&message); if (ReceiveMessage(index) > 0) { @@ -299,7 +299,7 @@ public: * @param filename address of file name */ int ParseHeader(const int index, uint64_t &acqIndex, - uint64_t &frameIndex, uint64_t &subframeIndex, string &filename) + uint64_t &frameIndex, uint32_t &subframeIndex, string &filename) { Document d; if (d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message)).HasParseError()) { @@ -307,23 +307,22 @@ public: return 0; } #ifdef VERYVERBOSE - // htype is an array of strings - rapidjson::Value::Array htype = d["htype"].GetArray(); - for (int i = 0; i < htype.Size(); i++) - printf("%d: htype: %s\n", index, htype[i].GetString()); + printf("version:%.1f\n", d["version"].GetDouble()); + // shape is an array of ints rapidjson::Value::Array shape = d["shape"].GetArray(); printf("%d: shape: ", index); for (int i = 0; i < shape.Size(); i++) printf("%d: %d ", index, shape[i].GetInt()); printf("\n"); + printf("%d: type: %s\n", index, d["type"].GetString()); #endif if(d["acqIndex"].GetInt()!=-1){ - acqIndex = d["acqIndex"].GetInt(); - frameIndex = d["fIndex"].GetInt(); - subframeIndex = d["subfnum"].GetInt(); + acqIndex = d["acqIndex"].GetUint64(); + frameIndex = d["fIndex"].GetUint64(); + subframeIndex = d["subfnum"].GetUint(); filename = d["fname"].GetString(); #ifdef VERYVERBOSE cout << "Acquisition index: " << acqIndex << endl; diff --git a/slsReceiverSoftware/include/receiver_defs.h b/slsReceiverSoftware/include/receiver_defs.h index 3f72ec253..8ad54f132 100755 --- a/slsReceiverSoftware/include/receiver_defs.h +++ b/slsReceiverSoftware/include/receiver_defs.h @@ -14,19 +14,22 @@ #define CREATE_FILES 1 #define DO_EVERYTHING 2 -//binary -#define FILE_FRAME_HEADER_SIZE 16 +//binary file/ fifo +#define FILE_FRAME_HDR_FNUM_SIZE 8 +#define FILE_FRAME_HDR_SNUM_SIZE 4 +#define FILE_FRAME_HDR_BID_SIZE 8 +#define FILE_FRAME_HEADER_SIZE (FILE_FRAME_HDR_FNUM_SIZE + FILE_FRAME_HDR_SNUM_SIZE + FILE_FRAME_HDR_BID_SIZE) +#define FIFO_HEADER_NUMBYTES 4 #define FILE_BUFFER_SIZE (16*1024*1024) //16mb //hdf5 #define MAX_CHUNKED_IMAGES 1 //versions +#define STREAMER_VERSION 1.0 #define HDF5_WRITER_VERSION 1.0 #define BINARY_WRITER_VERSION 1.0 //1 decimal places -//fifo -#define FIFO_HEADER_NUMBYTES 4 //parameters to calculate fifo depth #define SAMPLE_TIME_IN_NS 100000000//100ms diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index 9eed48b66..17ee40b6f 100644 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -307,7 +307,6 @@ void DataProcessor::ProcessAnImage(char* buf) { RecordFirstIndices(fnum); } - /** bunch id pass as well and then do what with it */ if (fileWriteEnable && *callbackAction == DO_EVERYTHING) file->WriteToFile(buf, generalData->fifoBufferSize + FILE_FRAME_HEADER_SIZE, fnum-firstMeasurementIndex); } diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index 28933c205..962643adf 100644 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -25,7 +25,7 @@ pthread_mutex_t DataStreamer::Mutex = PTHREAD_MUTEX_INITIALIZER; const char* DataStreamer::jsonHeaderFormat_part1 = "{" - "\"htype\":[\"chunk-1.0\"], " + "\"version\":%.1f, " "\"type\":\"%s\", " "\"shape\":[%d, %d], "; @@ -33,7 +33,7 @@ const char* DataStreamer::jsonHeaderFormat = "%s" "\"acqIndex\":%lld, " "\"fIndex\":%lld, " - "\"subfnum\":%lld, " + "\"subfnum\":%u, " "\"fname\":\"%s\"}"; @@ -132,7 +132,7 @@ void DataStreamer::ResetParametersforNewMeasurement(){ void DataStreamer::CreateHeaderPart1() { char type[10] = ""; switch (*dynamicRange) { - case 4: strcpy(type, "uint8"); break; + case 4: strcpy(type, "uint4"); break; case 8: strcpy(type, "uint8"); break; case 16: strcpy(type, "uint16"); break; case 32: strcpy(type, "uint32"); break; @@ -143,7 +143,7 @@ void DataStreamer::CreateHeaderPart1() { } sprintf(currentHeader, jsonHeaderFormat_part1, - type, generalData->nPixelsX, generalData->nPixelsY); + STREAMER_VERSION, type, generalData->nPixelsX, generalData->nPixelsY); #ifdef VERBOSE cprintf(BLUE, "%d currentheader: %s\n", index, currentHeader); #endif @@ -233,7 +233,7 @@ void DataStreamer::StopProcessing(char* buf) { if (!SendHeader(0, true)) cprintf(RED,"Error: Could not send zmq dummy header for streamer %d\n", index); - if (!zmqSocket->SendData(DUMMY_MSG, DUMMY_MSG_SIZE)) + if (!zmqSocket->SendData((char*)DUMMY_MSG, DUMMY_MSG_SIZE)) cprintf(RED,"Error: Could not send zmq dummy message for streamer %d\n", index); fifo->FreeAddress(buf); @@ -246,6 +246,7 @@ void DataStreamer::StopProcessing(char* buf) { void DataStreamer::ProcessAnImage(char* buf) { uint64_t fnum = (*((uint64_t*)buf)); + uint32_t snum = (*((uint32_t*)(buf + FILE_FRAME_HDR_FNUM_SIZE))); #ifdef VERBOSE if (!index) cprintf(MAGENTA,"DataStreamer %d: fnum:%lld\n", index, (long long int)fnum); #endif @@ -270,7 +271,7 @@ void DataStreamer::ProcessAnImage(char* buf) { return; } - if (!SendHeader(fnum)) + if (!SendHeader(fnum, snum)) cprintf(RED,"Error: Could not send zmq header for fnum %lld and streamer %d\n", (long long int) fnum, index); @@ -307,21 +308,21 @@ bool DataStreamer::CheckCount() { } -int DataStreamer::SendHeader(uint64_t fnum, bool dummy) { +int DataStreamer::SendHeader(uint64_t fnum, uint32_t snum, bool dummy) { uint64_t frameIndex = -1; uint64_t acquisitionIndex = -1; - uint64_t subframeIndex = -1; + uint32_t subframeIndex = -1; char fname[MAX_STR_LENGTH] = "run"; char buf[1000] = ""; if (!dummy) { frameIndex = fnum - firstMeasurementIndex; acquisitionIndex = fnum - firstAcquisitionIndex; - subframeIndex = -1; /* subframe to be included in fifo buffer? */ + subframeIndex = snum; /* fname to be included in fifo buffer? */ } - int len = sprintf(buf, jsonHeaderFormat, currentHeader, acquisitionIndex, frameIndex, subframeIndex,fname); + int len = sprintf(buf, jsonHeaderFormat, currentHeader, acquisitionIndex, frameIndex, subframeIndex, fname); #ifdef VERBOSE printf("%d Streamer: buf:%s\n", index, buf); #endif diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index 7ab0a34ca..3f07177f2 100644 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -265,6 +265,7 @@ void Listener::ThreadExecution() { } (*((uint32_t*)buffer)) = rc; + //for those returning earlier (*((uint64_t*)(buffer + FIFO_HEADER_NUMBYTES ))) = currentFrameIndex; currentFrameIndex++; @@ -288,8 +289,10 @@ void Listener::StopListening(char* buf) { uint32_t Listener::ListenToAnImage(char* buf) { uint32_t rc = 0; - uint64_t fnum = 0; uint32_t pnum = 0; + uint64_t fnum = 0, bid = 0; + uint32_t pnum = 0, snum = 0; int dsize = generalData->dataSize; + bool isHeaderEmpty = true; //reset to -1 @@ -299,13 +302,18 @@ uint32_t Listener::ListenToAnImage(char* buf) { //look for carry over if (carryOverFlag) { //check if its the current image packet - generalData->GetHeaderInfo(index,carryOverPacket,fnum,pnum); + generalData->GetHeaderInfo(index, carryOverPacket, fnum, pnum, snum, bid); if (fnum != currentFrameIndex) { return generalData->imageSize; } carryOverFlag = false; memcpy(buf + (pnum * dsize), carryOverPacket + generalData->headerSizeinPacket, dsize); - (*((uint64_t*)(buf - FILE_FRAME_HEADER_SIZE))) = fnum; + if(isHeaderEmpty) { + (*((uint64_t*)(buf - FILE_FRAME_HEADER_SIZE))) = fnum; + (*((uint64_t*)(buf - FILE_FRAME_HEADER_SIZE + FILE_FRAME_HDR_FNUM_SIZE))) = snum; + (*((uint64_t*)(buf - FILE_FRAME_HEADER_SIZE + FILE_FRAME_HDR_FNUM_SIZE + FILE_FRAME_HDR_SNUM_SIZE))) = bid; + isHeaderEmpty = false; + } } @@ -319,7 +327,7 @@ uint32_t Listener::ListenToAnImage(char* buf) { //update parameters numPacketsCaught++; //record immediately to get more time before socket shutdown numTotalPacketsCaught++; - generalData->GetHeaderInfo(index,listeningPacket,fnum,pnum); + generalData->GetHeaderInfo(index, listeningPacket, fnum, pnum, snum, bid); lastCaughtFrameIndex = fnum; #ifdef VERBOSE if (!index && !pnum) cprintf(GREEN,"Listening %d: fnum:%lld, pnum:%d\n", index, (long long int)fnum, pnum); @@ -327,7 +335,6 @@ uint32_t Listener::ListenToAnImage(char* buf) { if (!measurementStartedFlag) RecordFirstIndices(fnum); - //future packet if (fnum != currentFrameIndex) { carryOverFlag = true; @@ -337,6 +344,12 @@ uint32_t Listener::ListenToAnImage(char* buf) { //copy packet memcpy(buf + (pnum * dsize), listeningPacket + generalData->headerSizeinPacket, dsize); + if(isHeaderEmpty) { + (*((uint64_t*)(buf - FILE_FRAME_HEADER_SIZE))) = fnum; + (*((uint64_t*)(buf - FILE_FRAME_HEADER_SIZE + FILE_FRAME_HDR_FNUM_SIZE))) = snum; + (*((uint64_t*)(buf - FILE_FRAME_HEADER_SIZE + FILE_FRAME_HDR_FNUM_SIZE + FILE_FRAME_HDR_SNUM_SIZE))) = bid; + isHeaderEmpty = false; + } } return generalData->imageSize;