merged with zmqdata

This commit is contained in:
Dhanya Maliakal
2016-10-20 08:54:29 +02:00
49 changed files with 17399 additions and 1398 deletions

View File

@ -70,6 +70,13 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
void configure(map<string, string> config_map);
//*** file parameters***
/**
* Set File Name Prefix (without frame index, file index and extension (_d0_f000000000000_8.raw))
* Does not check for file existence since it is created only at startReceiver
* @param c file name (max of 1000 characters)
*/
void setFileName(const char c[]);
/**
* Overridden method
* Set data compression, by saving only hits (so far implemented only for Moench and Gotthard)
@ -78,6 +85,19 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
*/
int setDataCompressionEnable(const bool b);
//***acquisition count parameters***
/**
* Get Total Frames Caught for an entire acquisition (including all scans)
* @return total number of frames caught for entire acquisition
*/
uint64_t getTotalFramesCaught() const;
/**
* Get Frames Caught for each real time acquisition (eg. for each scan)
* @return number of frames caught for each scan
*/
uint64_t getFramesCaught() const;
//***acquisition parameters***
/**
* Overridden method
@ -87,12 +107,18 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
void setShortFrameEnable(const int i);
/**
* Overridden method
* Set the Frequency of Frames Sent to GUI
* @param i 0 for random frame requests, n for nth frame frequency
* @param freq 0 for random frame requests, n for nth frame frequency
* @return OK or FAIL
*/
int setFrameToGuiFrequency(const uint32_t i);
int setFrameToGuiFrequency(const uint32_t freq);
/**
* Set the data stream enable
* @param enable 0 to disable, 1 to enable
* @return OK or FAIL
*/
uint32_t setDataStreamEnable(const uint32_t enable);
/**
* Overridden method
@ -199,20 +225,23 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
/**
* Overridden method
* Get the buffer-current frame read by receiver
* @param ithread writer thread
* @param c pointer to current file name
* @param raw address of pointer, pointing to current frame to send to gui
* @param startAcq start index of the acquisition
* @param startFrame start index of the scan
*/
void readFrame(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame);
void readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame);
void resetGuiPointer(int ithread);
/**
* Overridden method
* Closes file / all files(data compression involves multiple files)
* TCPIPInterface can also call this in case of illegal shutdown of receiver
* @param i thread index valid for datacompression using root files, -1 for all threads
* @param ithread writer thread index
*/
void closeFile(int i = -1);
void closeFile(int ithread = 0);
private:
/*************************************************************************
@ -273,6 +302,12 @@ private:
/*************************************************************************
* Listening and Writing Threads *****************************************
*************************************************************************/
/**
* Create Data Call Back Threads
* @param destroy is true to destroy all the threads
* @return OK or FAIL
*/
int createDataCallbackThreads(bool destroy = false);
/**
* Create Listening Threads
@ -287,6 +322,9 @@ private:
*/
int createWriterThreads(bool destroy = false);
/**
* Set Thread Priorities
*/
@ -307,9 +345,10 @@ private:
/**
* Creates new file and reset some parameters
* @param ithread writer thread index
* @return OK or FAIL
*/
int createNewFile();
int createNewFile(int ithread);
/**
* Creates new tree and file for compression
@ -319,6 +358,12 @@ private:
*/
int createCompressionFile(int ithread, int iframe);
/**
* Static function - Starts Data Callback Thread of this object
* @param this_pointer pointer to this object
*/
static void* startDataCallbackThread(void *this_pointer);
/**
* Static function - Starts Listening Thread of this object
* @param this_pointer pointer to this object
@ -331,6 +376,11 @@ private:
*/
static void* startWritingThread(void *this_pointer);
/**
* Thread that sends data packets to client
*/
void startDataCallback();
/**
* Thread that listens to packets
* It pops the fifofree for free addresses, listens to packets and pushes them into the fifo
@ -346,12 +396,11 @@ private:
* Also copies carryovers from previous frame in front of buffer (gotthard and moench)
* For eiger, it ignores packets less than onePacketSize
* @param ithread listening thread index
* @param lSize number of bytes to listen to
* @param cSize number of bytes carried on from previous buffer
* @param temp temporary storage of previous buffer
* @return the number of bytes actually received
*/
int prepareAndListenBuffer(int ithread, int lSize, int cSize, char* temp);
int prepareAndListenBuffer(int ithread, int cSize, char* temp);
/**
* Called by startListening
@ -378,9 +427,10 @@ private:
* @param ithread listening thread index
* @param cSize number of bytes carried over to the next buffer to reunite with split frame
* @param temp temporary buffer to store the split frame
* @param rc number of bytes received
* @return packet count
*/
uint32_t processListeningBuffer(int ithread, int cSize,char* temp);
uint32_t processListeningBuffer(int ithread, int &cSize,char* temp, int rc);
/**
* Thread started which writes packets to file.
@ -392,18 +442,6 @@ private:
*/
void startWriting();
/**
* Called by processWritingBuffer and processWritingBufferPacketByPacket
* Pops buffer from all the FIFOs and checks for dummy frames and end of acquisition
* @param ithread current thread index
* @param wbuffer the buffer array that is popped from all the FIFOs
* @param ready if that FIFO is allowed to pop (depends on if dummy buffer already popped/ waiting for other FIFO to finish a frame(eiger))
* @param nP number of packets in the buffer popped out
* @param fifoTempFree circular fifo to save addresses of packets adding upto a frame before pushing into fifofree (eiger specific)
* @return true if end of acquisition else false
*/
bool popAndCheckEndofAcquisition(int ithread, char* wbuffer[], bool ready[], uint32_t nP[],CircularFifo<char>* fifoTempFree[]);
/**
* Called by processWritingBuffer and processWritingBufferPacketByPacket
* When dummy-end buffers are popped from all FIFOs (acquisition over), this is called
@ -413,7 +451,7 @@ private:
* @param ithread writing thread index
* @param wbuffer writing buffer popped out from FIFO
*/
void stopWriting(int ithread, char* wbuffer[]);
void stopWriting(int ithread, char* wbuffer);
/**
* Called by processWritingBuffer and processWritingBufferPacketByPacket
@ -423,39 +461,39 @@ private:
* @param wbuffer writing buffer popped out from FIFO
* @param npackets number of packets
*/
void handleWithoutDataCompression(int ithread, char* wbuffer[],uint32_t npackets);
void handleWithoutDataCompression(int ithread, char* wbuffer,uint32_t npackets);
/**
* Calle by handleWithoutDataCompression
* Creating headers Writing to file without compression
* @param ithread writer thread index
* @param wbuffer is the address of buffer popped out of FIFO
* @param numpackets is the number of packets
*/
void writeFileWithoutCompression(char* wbuffer[],uint32_t numpackets);
void writeFileWithoutCompression(int ithread, char* wbuffer,uint32_t numpackets);
/**
* Called by writeToFileWithoutCompression
* Create headers for file writing (at the moment, this is eiger specific)
* @param wbuffer writing buffer popped from FIFOs
*/
void createHeaders(char* wbuffer[]);
void createHeaders(char* wbuffer);
/**
* Updates the file header char aray, each time the corresp parameter is changed
* @param ithread writer thread index
*/
void updateFileHeader();
void updateFileHeader(int ithread);
/**
* Called by handleWithoutDataCompression and handleWithCompression after writing to file
* Copy frames for GUI and updates appropriate parameters for frequency frames to gui
* Uses semaphore for nth frame mode
* @param ithread writer thread index
* @param buffer buffer to copy
* @param numpackets number of packets to copy
*/
void copyFrameToGui(char* buffer[]);
void processWritingBuffer(int ithread);
void processWritingBufferPacketByPacket(int ithread);
void copyFrameToGui(int ithread, char* buffer, uint32_t numpackets);
void waitWritingBufferForNextAcquisition(int ithread);
@ -468,18 +506,47 @@ private:
* @param wbuffer writer buffer
* @param nf number of frames
*/
void handleDataCompression(int ithread, char* wbuffer[], uint64_t &nf);
void handleDataCompression(int ithread, char* wbuffer, uint64_t &nf);
/**
* Get Frame Number
* @param ithread writer thread index
* @param wbuffer writer buffer
* @param framenumber reference to the frame number
* @param packetnumber reference to the packet number
* @param subframenumber reference to the subframe number
* @return OK or FAIL
*/
int getFrameandPacketNumber(int ithread, char* wbuffer, uint64_t &framenumber, uint32_t &packetnumber, uint32_t &subframenumber);
/**
* Find offset upto this frame number and write it to file
* @param ithread writer thread index
* @param wbuffer writer buffer
* @param offset reference of offset to look from and replaces offset to starting of nextframenumber
* @param nextFrameNumber frame number up to which data written
* @param numpackets number of packets in buffer
* @param numPacketsWritten number of packets written to file
*/
int writeUptoFrameNumber(int ithread, char* wbuffer, int &offset, uint64_t nextFrameNumber, uint32_t numpackets, int &numPacketsWritten);
/*************************************************************************
* Class Members *********************************************************
*************************************************************************/
/** Maximum Number of Writer Threads */
#ifdef DCOMPRESS
/**** most likely not used ***/
const static int MAX_NUMBER_OF_WRITER_THREADS = 15;
#else
const static int MAX_NUMBER_OF_WRITER_THREADS = 2;
#endif
//**detector parameters***
/** Size of 1 Frame including headers */
int frameSize;
/*Detector Readout ID*/
int detID;
/** Size of 1 buffer processed at a time */
int bufferSize;
@ -513,15 +580,20 @@ private:
#endif
/** Complete File name */
char completeFileName[MAX_STR_LENGTH];
char completeFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** Maximum Packets Per File **/
int maxPacketsPerFile;
/** File Name without frame index, file index and extension (_d0_f000000000000_8.raw)*/
char fileNamePerThread[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** Maximum Frames Per File **/
uint64_t maxFramesPerFile;
/** If file created successfully for all Writer Threads */
bool fileCreateSuccess;
char fileHeader[1000];
const static int FILE_HEADER_SIZE = 400;
char fileHeader[MAX_NUMBER_OF_WRITER_THREADS][FILE_HEADER_SIZE];
@ -534,38 +606,34 @@ private:
uint64_t startFrameIndex;
/** Actual current frame index of each time acquisition (eg. for each scan) */
uint64_t frameIndex;
uint64_t frameIndex[MAX_NUMBER_OF_WRITER_THREADS];
/** Current Frame Number */
uint64_t currentFrameNumber;
uint64_t currentFrameNumber[MAX_NUMBER_OF_WRITER_THREADS];
/** Previous Frame number from buffer to calculate loss */
int64_t previousFrameNumber;
int64_t frameNumberInPreviousFile[MAX_NUMBER_OF_WRITER_THREADS];
/** Last Frame Index Listened To */
int32_t lastFrameIndex;
/* Acquisition started */
bool acqStarted;
/* Measurement started */
bool measurementStarted;
/* Measurement started - for each thread to get progress print outs*/
bool measurementStarted[MAX_NUMBER_OF_LISTENING_THREADS];
/** Total Frame Count listened to by listening threads */
int totalListeningFrameCount[MAX_NUMBER_OF_LISTENING_THREADS];
/** Total packet Count listened to by listening threads */
int totalListeningPacketCount[MAX_NUMBER_OF_LISTENING_THREADS];
/** Pckets currently in current file, starts new file when it reaches max */
uint32_t packetsInFile;
int64_t lastFrameNumberInFile[MAX_NUMBER_OF_WRITER_THREADS];
/** Number of Missing Packets per buffer*/
uint32_t numMissingPackets;
/** Total Number of Missing Packets in acquisition*/
uint32_t numTotMissingPackets;
/** Number of Missing Packets in file */
uint32_t numTotMissingPacketsInFile;
/** packets in current file */
uint64_t totalPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS];
/**Total packet count written by each writing thread */
uint64_t totalWritingPacketCount[MAX_NUMBER_OF_LISTENING_THREADS];
@ -587,7 +655,7 @@ private:
genericSocket* udpSocket[MAX_NUMBER_OF_LISTENING_THREADS];
/** File Descriptor */
FILE *sfilefd;
FILE *sfilefd[MAX_NUMBER_OF_WRITER_THREADS];
/** Number of Jobs Per Buffer */
int numberofJobsPerBuffer;
@ -595,9 +663,8 @@ private:
/** Total fifo size */
uint32_t fifoSize;
/** Missing Packet identifier value */
const static uint16_t missingPacketValue = 0xFFFF;
const static uint16_t deactivatedPacketValue = 0xFEFE;
/** Missing Packet */
int missingPacketinFile;
/** Dummy Packet identifier value */
const static uint32_t dummyPacketValue = 0xFFFFFFFF;
@ -606,25 +673,45 @@ private:
//***receiver to GUI parameters***
/** Current Frame copied for GUI */
char* latestData;
/** If Data to be sent to GUI is ready */
bool guiDataReady;
/** Pointer to data to be sent to GUI */
char* guiData;
char* latestData[MAX_NUMBER_OF_WRITER_THREADS];
/** Pointer to file name to be sent to GUI */
char guiFileName[MAX_STR_LENGTH];
char guiFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** Number of packets copied to be sent to gui (others padded) */
int guiNumPackets[MAX_NUMBER_OF_WRITER_THREADS];
/** Semaphore to synchronize Writer and GuiReader threads*/
sem_t writerGuiSemaphore;
sem_t writerGuiSemaphore[MAX_NUMBER_OF_WRITER_THREADS]; //datacompression, only first thread sends to gui
/** Semaphore to synchronize Writer and GuiReader threads*/
sem_t dataCallbackWriterSemaphore[MAX_NUMBER_OF_WRITER_THREADS]; //datacompression, only first thread sends to gui
/** counter for nth frame to gui */
int frametoGuiCounter;
int frametoGuiCounter[MAX_NUMBER_OF_WRITER_THREADS];
//***data call back thread parameters***
/** Ensures if zmq threads created successfully */
bool zmqThreadStarted;
/** Number of data callback Threads */
int numberofDataCallbackThreads;
/** Data Callback Threads */
pthread_t dataCallbackThreads[MAX_NUMBER_OF_LISTENING_THREADS];
/** Semaphores Synchronizing DataCallback Threads */
sem_t dataCallbackSemaphore[MAX_NUMBER_OF_LISTENING_THREADS];
/** Mask with each bit indicating status of each data callback thread */
volatile uint32_t dataCallbackThreadsMask;
/** Set to self-terminate data callback threads waiting for semaphores */
bool killAllDataCallbackThreads;
//***general and listening thread parameters***
/** Ensures if threads created successfully */
@ -642,9 +729,6 @@ private:
/** Semaphores Synchronizing Listening Threads */
sem_t listenSemaphore[MAX_NUMBER_OF_LISTENING_THREADS];
/** Current Listening Thread Index*/
int currentListeningThreadIndex;
/** Mask with each bit indicating status of each listening thread */
volatile uint32_t listeningThreadsMask;
@ -654,9 +738,6 @@ private:
//***writer thread parameters***
/** Maximum Number of Writer Threads */
const static int MAX_NUMBER_OF_WRITER_THREADS = 15;
/** Number of Writer Threads */
int numberofWriterThreads;
@ -680,6 +761,10 @@ private:
uint64_t deactivated_framenumber[MAX_NUMBER_OF_LISTENING_THREADS];
uint32_t deactivated_packetnumber[MAX_NUMBER_OF_LISTENING_THREADS];
//***deactivated parameters***
uint64_t deactivatedFrameNumber[MAX_NUMBER_OF_LISTENING_THREADS];
int deactivatedFrameIncrement;
//***filter parameters***
@ -711,6 +796,7 @@ private:
/** Progress (currentFrameNumber) Mutex */
pthread_mutex_t progressMutex;
//***callback***
/** The action which decides what the user and default responsibilities to save data are
* 0 raw data ready callback takes care of open,close,write file