diff --git a/slsReceiverSoftware/gitInfo.txt b/slsReceiverSoftware/gitInfo.txt index 09ad992a7..bfc610dae 100644 --- a/slsReceiverSoftware/gitInfo.txt +++ b/slsReceiverSoftware/gitInfo.txt @@ -1,9 +1,9 @@ Path: slsDetectorsPackage/slsReceiverSoftware URL: origin git@git.psi.ch:sls_detectors_software/sls_receiver_software.git Repository Root: origin git@git.psi.ch:sls_detectors_software/sls_receiver_software.git -Repsitory UUID: 9f0ea629975864abb9bb3fd9f59c1d9188e0f3d1 -Revision: 512 +Repsitory UUID: a453c1d55d73fecc18af9df6ada21621a989ee4b +Revision: 514 Branch: 2.3 Last Changed Author: Dhanya_Maliakal -Last Changed Rev: 512 -Last Changed Date: 2017-03-23 15:17:21 +0100 +Last Changed Rev: 514 +Last Changed Date: 2017-03-24 13:41:56 +0100 diff --git a/slsReceiverSoftware/include/BinaryFile.h b/slsReceiverSoftware/include/BinaryFile.h index 5bff5f486..7b6e8e0b7 100644 --- a/slsReceiverSoftware/include/BinaryFile.h +++ b/slsReceiverSoftware/include/BinaryFile.h @@ -46,12 +46,6 @@ class BinaryFile : private virtual slsReceiverDefs, public File, public BinaryFi */ void PrintMembers(); - /** - * Get File Handle pointer - * @returns file handle pointer - */ - FILE* GetFileHandle(); - /** * Create file * @param fnum current frame index to include in file name diff --git a/slsReceiverSoftware/include/BinaryFileStatic.h b/slsReceiverSoftware/include/BinaryFileStatic.h index e57374616..f33296d12 100644 --- a/slsReceiverSoftware/include/BinaryFileStatic.h +++ b/slsReceiverSoftware/include/BinaryFileStatic.h @@ -129,16 +129,16 @@ class BinaryFileStatic { time_t t = time(0); char message[MAX_STR_LENGTH]; sprintf(message, - "Version : %.1f\n" - "Dynamic Range : %d\n" - "Ten Giga : %d\n" - "Image Size : %d bytes\n" - "x : %d pixels\n" - "y : %d pixels\n" - "Total Frames : %lld\n" - "Exptime (ns) : %lld\n" - "Period (ns) : %lld\n" - "Timestamp : %s\n\n" + "Version : %.1f\n" + "Dynamic Range : %d\n" + "Ten Giga : %d\n" + "Image Size : %d bytes\n" + "x : %d pixels\n" + "y : %d pixels\n" + "Total Frames : %lld\n" + "Exptime (ns) : %lld\n" + "Period (ns) : %lld\n" + "Timestamp : %s\n\n" "#Frame Header\n" "Frame Number : 8 bytes\n" diff --git a/slsReceiverSoftware/include/DataProcessor.h b/slsReceiverSoftware/include/DataProcessor.h index cc714ed53..8285129f6 100644 --- a/slsReceiverSoftware/include/DataProcessor.h +++ b/slsReceiverSoftware/include/DataProcessor.h @@ -28,14 +28,12 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { * @param ftype pointer to file format type * @param fwenable pointer to file writer enable * @param dsEnable pointer to data stream enable - * @param cbaction pointer to call back action * @param dataReadycb pointer to data ready call back function * @param pDataReadycb pointer to arguments of data ready call back function */ DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dsEnable, - int* cbaction, void (*dataReadycb)(uint64_t, uint32_t, uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t, uint16_t, uint32_t, uint16_t, uint8_t, uint8_t, - char*, uint32_t, FILE*, void*), + char*, uint32_t, void*), void *pDataReadycb); /** @@ -286,13 +284,6 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { bool* fileWriteEnable; - //***callback parameters*** - /** Pointer to the action which decides what the user and default responsibilities to save data are - * 0 raw data ready callback takes care of open,close,write file - * 1 callback writes file, we have to open, close it - * 2 we open, close, write file, callback does not do anything */ - int* callbackAction; - /** * Call back for raw data * args to raw data ready callback are @@ -311,10 +302,9 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { * version is the version number of this structure format * dataPointer is the pointer to the data * dataSize in bytes is the size of the data in bytes - * fileDescriptor is the file descriptor */ void (*rawDataReadyCallBack)(uint64_t, uint32_t, uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t, uint16_t, uint32_t, uint16_t, uint8_t, uint8_t, - char*, uint32_t, FILE*, void*); + char*, uint32_t, void*); void *pRawDataReady; diff --git a/slsReceiverSoftware/include/File.h b/slsReceiverSoftware/include/File.h index 019e0bcb5..908c9f4ee 100644 --- a/slsReceiverSoftware/include/File.h +++ b/slsReceiverSoftware/include/File.h @@ -11,16 +11,6 @@ #include "sls_receiver_defs.h" #include "logger.h" -#ifdef HDF5C -//#ifndef HDF5DEFINED -//#define HDF5DEFINED -#include "H5Cpp.h" -#ifndef H5_NO_NAMESPACE - using namespace H5; -#endif -#endif -//#endif - #include class File : private virtual slsReceiverDefs { @@ -67,15 +57,6 @@ class File : private virtual slsReceiverDefs { */ virtual fileFormat GetFileType() = 0; - /** - * Get File Handle pointer - * @returns file handle pointer - */ - virtual FILE* GetBinaryFileHandle() {return NULL;}; -#ifdef HDF5C - virtual H5File* GetHDF5FileHandle() {return NULL;}; -#endif - /** * Get Member Pointer Values before the object is destroyed * @param nd pointer to number of detectors in each dimension diff --git a/slsReceiverSoftware/include/HDF5File.h b/slsReceiverSoftware/include/HDF5File.h index 563ed01a2..cc28d6474 100644 --- a/slsReceiverSoftware/include/HDF5File.h +++ b/slsReceiverSoftware/include/HDF5File.h @@ -13,14 +13,10 @@ #include "File.h" #include "HDF5FileStatic.h" -//#ifndef HDF5DEFINED -//#define HDF5DEFINED #include "H5Cpp.h" #ifndef H5_NO_NAMESPACE using namespace H5; #endif -//#endif - #include class HDF5File : private virtual slsReceiverDefs, public File, public HDF5FileStatic { @@ -58,12 +54,6 @@ class HDF5File : private virtual slsReceiverDefs, public File, public HDF5FileSt */ void PrintMembers(); - /** - * Get File Handle pointer - * @returns file handle pointer - */ - H5File* GetHDF5FileHandle(); - /** * Set Number of pixels * @param nx number of pixels in x direction diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index 5e163b72d..bde8874e9 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -518,9 +518,9 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter * fileindex * datasize * - * return value is - * 0 callback takes care of open,close,wrie file - * 1 we open, close, write file, callback does not do anything + * return value is insignificant at the moment + * we write depending on file write enable + * users get data to write depending on call backs registered */ void registerCallBackStartAcquisition(int (*func)(char*, char*, uint64_t, uint32_t, void*),void *arg); @@ -640,9 +640,9 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter * fileindex * datasize * - * return value is - * 0 callback takes care of open,close,wrie file - * 1 we open, close, write file, callback does not do anything + * return value is insignificant at the moment + * we write depending on file write enable + * users get data to write depending on call backs registered */ int (*startAcquisitionCallBack)(char*, char*, uint64_t, uint32_t, void*); void *pStartAcquisition; diff --git a/slsReceiverSoftware/include/UDPInterface.h b/slsReceiverSoftware/include/UDPInterface.h index 5eaacded5..81f8a7faa 100644 --- a/slsReceiverSoftware/include/UDPInterface.h +++ b/slsReceiverSoftware/include/UDPInterface.h @@ -577,9 +577,9 @@ class UDPInterface { * fileindex * datasize * - * return value is - * 0 callback takes care of open,close,write file - * 1 we open, close, write file, callback does not do anything + * return value is insignificant at the moment + * we write depending on file write enable + * users get data to write depending on call backs registered */ virtual void registerCallBackStartAcquisition(int (*func)(char*, char*, uint64_t, uint32_t, void*),void *arg) = 0; diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 030887b93..933728b47 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -248,12 +248,6 @@ private: /** Number of Jobs */ int numberofJobs; - /** Pointer to the action which decides what the user and default responsibilities to save data are - * 0 raw data ready callback takes care of open,close,write file - * 1 callback writes file, we have to open, close it - * 2 we open, close, write file, callback does not do anything */ - int callbackAction; - //*** mutex *** /** Status mutex */ pthread_mutex_t statusMutex; diff --git a/slsReceiverSoftware/include/UDPStandardImplementation_copy.h b/slsReceiverSoftware/include/UDPStandardImplementation_copy.h deleted file mode 100644 index 392740972..000000000 --- a/slsReceiverSoftware/include/UDPStandardImplementation_copy.h +++ /dev/null @@ -1,869 +0,0 @@ -#pragma once -/********************************************//** - * @file UDPBaseImplementation.h - * @short does all the functions for a receiver, set/get parameters, start/stop etc. - ***********************************************/ - -#include "UDPBaseImplementation.h" - -#include "genericSocket.h" -#include "circularFifo.h" -#include "singlePhotonDetector.h" -#include "slsReceiverData.h" -#include "moenchCommonMode.h" - - -#ifdef MYROOT1 -#include -#include -#endif - -#include -#include -#include -#include - -#ifdef HDF5C -#include "H5Cpp.h" -#ifndef H5_NO_NAMESPACE - using namespace H5; -#endif -#endif - -/** - * @short does all the functions for a receiver, set/get parameters, start/stop etc. - */ - - -class UDPStandardImplementationCopy: private virtual slsReceiverDefs, public UDPBaseImplementation { - public: - - - /************************************************************************* - * Constructor & Destructor ********************************************** - *************************************************************************/ - /** - * Constructor - */ - UDPStandardImplementation(); - - /** - * Destructor - */ - virtual ~UDPStandardImplementation(); - - - /************************************************************************* - * Getters *************************************************************** - * They access local cache of configuration or detector parameters ******* - *************************************************************************/ - - - /************************************************************************* - * Setters *************************************************************** - * They modify the local cache of configuration or detector parameters *** - *************************************************************************/ - - //**initial parameters*** - //*** file parameters*** - /** - * Set File Name Prefix (without frame index, file index and extension (_d0_f000000000000_8.raw)) - * Does not check for file existence since it is created only at startReceiver - * @param c file name (max of 1000 characters) - */ - void setFileName(const char c[]); - - /** - * Overridden method - * Set data compression, by saving only hits (so far implemented only for Moench and Gotthard) - * @param b true for data compression enable, else false - * @return OK or FAIL - */ - int setDataCompressionEnable(const bool b); - - //***acquisition count parameters*** - /** - * Get Total Frames Caught for an entire acquisition (including all scans) - * @return total number of frames caught for entire acquisition - */ - uint64_t getTotalFramesCaught() const; - - /** - * Get Frames Caught for each real time acquisition (eg. for each scan) - * @return number of frames caught for each scan - */ - uint64_t getFramesCaught() const; - - //***acquisition parameters*** - /** - * Overridden method - * Set Short Frame Enabled, later will be moved to getROI (so far only for gotthard) - * @param i index of adc enabled, else -1 if all enabled - */ - void setShortFrameEnable(const int i); - - /** - * Set the Frequency of Frames Sent to GUI - * @param freq 0 for random frame requests, n for nth frame frequency - * @return OK or FAIL - */ - int setFrameToGuiFrequency(const uint32_t freq); - - /** - * Set the data stream enable - * @param enable 0 to disable, 1 to enable - * @return OK or FAIL - */ - uint32_t setDataStreamEnable(const uint32_t enable); - - /** - * Overridden method - * Set Acquisition Period - * @param i acquisition period - * @return OK or FAIL - */ - int setAcquisitionPeriod(const uint64_t i); - - /** - * Set Acquisition Time - * @param i acquisition time - * @return OK or FAIL - */ - int setAcquisitionTime(const uint64_t i); - - /** - * Overridden method - * Set Number of Frames expected by receiver from detector - * The data receiver status will change from running to idle when it gets this number of frames - * @param i number of frames expected - * @return OK or FAIL - */ - int setNumberOfFrames(const uint64_t i); - - /** - * Overridden method - * Set Dynamic Range or Number of Bits Per Pixel - * @param i dynamic range that is 4, 8, 16 or 32 - * @return OK or FAIL - */ - int setDynamicRange(const uint32_t i); - - /** - * Overridden method - * Set Ten Giga Enable - * @param b true if 10Giga enabled, else false (1G enabled) - * @return OK or FAIL - */ - int setTenGigaEnable(const bool b); - - - /** - * Overridden method - * Set Fifo Depth - * @param i fifo depth value - * @return OK or FAIL - */ - int setFifoDepth(const uint32_t i); - - /************************************************************************* - * Behavioral functions*************************************************** - * They may modify the status of the receiver **************************** - *************************************************************************/ - - //***initial functions*** - /** - * Overridden method - * Set receiver type (and corresponding detector variables in derived STANDARD class) - * It is the first function called by the client when connecting to receiver - * @param d detector type - * @return OK or FAIL - */ - int setDetectorType(const detectorType d); - - //***acquisition functions*** - /** - * Overridden method - * Reset acquisition parameters such as total frames caught for an entire acquisition (including all scans) - */ - void resetAcquisitionCount(); - - /** - * Overridden method - * Start Listening for Packets by activating all configuration settings to receiver - * When this function returns, it has status RUNNING(upon SUCCESS) or IDLE (upon failure) - * @param c error message if FAIL - * @return OK or FAIL - */ - int startReceiver(char *c=NULL); - - /** - * Overridden method - * Stop Listening for Packets - * Calls startReadout(), which stops listening and sets status to Transmitting - * When it has read every frame in buffer, the status changes to Run_Finished - * When this function returns, receiver has status IDLE - * Pre: status is running, semaphores have been instantiated, - * Post: udp sockets shut down, status is idle, semaphores destroyed - */ - void stopReceiver(); - - /** - * Overridden method - * Stop Listening to Packets - * and sets status to Transmitting - * Next step would be to get all data and stop receiver completely and return with idle state - * Pre: status is running, udp sockets have been initialized, stop receiver initiated - * Post:udp sockets closed, status is transmitting - */ - void startReadout(); - - /** - * Overridden method - * Shuts down and deletes UDP Sockets - * TCPIPInterface can also call this in case of illegal shutdown of receiver - * @return OK or FAIL - */ - int shutDownUDPSockets(); - - /** - * Overridden method - * Get the buffer-current frame read by receiver - * @param ithread writer thread - * @param c pointer to current file name - * @param raw address of pointer, pointing to current frame to send to gui - * @param startAcq start index of the acquisition - * @param startFrame start index of the scan - */ - void readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame); - - - void resetGuiPointer(int ithread); - /** - * Overridden method - * Closes file / all files(data compression involves multiple files) - * TCPIPInterface can also call this in case of illegal shutdown of receiver - * @param ithread writer thread index - */ - void closeFile(int ithread = 0); - - /** - * Activate / Deactivate Receiver - * If deactivated, receiver will write dummy packets 0xFF - * (as it will receive nothing from detector) - */ - int setActivate(int enable = -1); - -private: - /************************************************************************* - * Getters *************************************************************** - * They access local cache of configuration or detector parameters ******* - *************************************************************************/ - -/* - uint64_t (*getFrameNumber)(); - uint64_t eigerGetFrameNumber(); - uint64_t generalGetFrameNumber(); - getframenumber = &generalgetframenumber; - if(dettpe == eiger) getframenumber = &eigerGerFramenumber; - - call using getframenumber(); -*/ - - //**initial parameters*** - - /** - * Delete and free member parameters - */ - void deleteMembers(); - - /** - * Deletes all the filter objects for single photon data - * Deals with data compression - */ - void deleteFilter(); - - /** - * Initialize base member parameters - */ - void initializeBaseMembers(); - - /** - * Initialize member parameters - */ - void initializeMembers(); - - /** - * Sets up all the filter objects for single photon data - * Deals with data compression - */ - void initializeFilter(); - - /** - * Set up the Fifo Structure for processing buffers - * between listening and writer threads - * When the parameters ahve been determined and if fifostructure needs to be changes, - * the listerning and writing threads are also destroyed together with this - * @return OK or FAIL - */ - int setupFifoStructure(); - - - - /************************************************************************* - * Listening and Writing Threads ***************************************** - *************************************************************************/ - /** - * Create Data Call Back Threads - * @param destroy is true to destroy all the threads - * @return OK or FAIL - */ - int createDataCallbackThreads(bool destroy = false); - - /** - * Create Listening Threads - * @param destroy is true to destroy all the threads - */ - int createListeningThreads(bool destroy = false); - - /** - * Create Writer Threads - * @param destroy is true to destroy all the threads - * @return OK or FAIL - */ - int createWriterThreads(bool destroy = false); - - - - - /** - * Set Thread Priorities - */ - void setThreadPriorities(); - - /** - * Creates UDP Sockets - * @return OK or FAIL - */ - int createUDPSockets(); - - /** - * Initializes writer variables and creates the first file - * also does the startAcquisitionCallBack - * @return OK or FAIL - */ - int setupWriter(); - - /** - * Creates new file and reset some parameters - * @param ithread writer thread index - * @return OK or FAIL - */ - int createNewFile(int ithread); - - /** - * Creates new tree and file for compression - * @param ithread thread number - * @param iframe frame number - * @return OK or FAIL - */ - int createCompressionFile(int ithread, int iframe); - - /** - * Static function - Starts Data Callback Thread of this object - * @param this_pointer pointer to this object - */ - static void* startDataCallbackThread(void *this_pointer); - - /** - * Static function - Starts Listening Thread of this object - * @param this_pointer pointer to this object - */ - static void* startListeningThread(void *this_pointer); - - /** - * Static function - Starts Writing Thread of this object - * @param this_pointer pointer to this object - */ - static void* startWritingThread(void *this_pointer); - - /** - * Thread that sends data packets to client - */ - void startDataCallback(); - - /** - * Thread that listens to packets - * It pops the fifofree for free addresses, listens to packets and pushes them into the fifo - * This is continuously looped for each buffer in a nested loop, which is again looped for each acquisition - * Exits only for changing dynamic range, 10G parameters etc and recreated - * - */ - void startListening(); - - /** - * Called by startListening - * Listens to buffer, until packet(s) received or shutdownUDPsocket called by client - * Also copies carryovers from previous frame in front of buffer (gotthard and moench) - * For eiger, it ignores packets less than onePacketSize - * @param ithread listening thread index - * @param cSize number of bytes carried on from previous buffer - * @param temp temporary storage of previous buffer - * @return the number of bytes actually received - */ - int prepareAndListenBuffer(int ithread, int cSize, char* temp); - - /** - * Called by startListening - * Creates the packets - * @param ithread listening thread index - * @return the number of bytes actually received - */ - int prepareAndListenBufferDeactivated(int ithread); - - - /** - * Called by startListening - * Listens to each packet and copies only complete frames - * until all receiver or shutdownUDPsocket called by client - * @param ithread listening thread index - * @return the number of bytes copied to buffer - */ - int prepareAndListenBufferCompleteFrames(int ithread); - - /** - * Called by startListening - * Its called for the first packet of a scan or acquistion - * Sets the startframeindices and the variables to know if acquisition started - * @param ithread listening thread number - */ - void startFrameIndices(int ithread); - - /** - * Called by prepareAndListenBuffer - * This is called when udp socket is shut down by client - * It pushes ffff instead of packet number into fifo - * to inform writers about the end of listening session - * Then sets the listening mask so that it stops listening and wait for next acquisition trigger - * @param ithread listening thread number - * @param numbytes number of bytes received - */ - void stopListening(int ithread, int numbytes); - - /* - * Called by startListening for gotthard and moench to handle split frames - * It processes listening thread buffers by ensuring split frames are in the same buffer - * @param ithread listening thread index - * @param cSize number of bytes carried over to the next buffer to reunite with split frame - * @param temp temporary buffer to store the split frame - * @param rc number of bytes received - * @return packet count - */ - uint32_t processListeningBuffer(int ithread, int &cSize,char* temp, int rc); - - /** - * Thread started which writes packets to file. - * It calls popAndCheckEndofAcquisition to pop fifo and check if it is a dummy end buffer - * It then calls a function to process and write packets to file and pushes the addresses into the fifoFree - * This is continuously looped for each buffer in a nested loop, which is again looped for each acquisition - * Exits only for changing dynamic range, 10G parameters etc and recreated - * - */ - void startWriting(); - - /** - * Called by processWritingBuffer and processWritingBufferPacketByPacket - * When dummy-end buffers are popped from all FIFOs (acquisition over), this is called - * It frees the FIFO addresses, closes all files - * For data compression, it waits for all threads to be done - * Changes the status to RUN_FINISHED and prints statistics - * @param ithread writing thread index - * @param wbuffer writing buffer popped out from FIFO - */ - void stopWriting(int ithread, char* wbuffer); - - /** - * Called by processWritingBuffer and processWritingBufferPacketByPacket - * Updates parameters, (writes headers for eiger) and writes to file when not a dummy frame - * Copies data for gui display and frees addresses popped from FIFOs - * @param ithread writing thread index - * @param wbuffer writing buffer popped out from FIFO - * @param npackets number of packets - */ - void handleWithoutDataCompression(int ithread, char* wbuffer,uint32_t npackets); - - /** - * Called by startWriting for jungfrau and eiger - * writes complete frames to file - * Copies data for gui display and frees addresses popped from FIFOs - * @param ithread writing thread index - * @param wbuffer writing buffer popped out from FIFO - */ - void handleCompleteFramesOnly(int ithread, char* wbuffer); - - /** - * Calle by handleWithoutDataCompression - * Creating headers Writing to file without compression - * @param ithread writer thread index - * @param wbuffer is the address of buffer popped out of FIFO - * @param numpackets is the number of packets - */ - void writeFileWithoutCompression(int ithread, char* wbuffer,uint32_t numpackets); - - /** - * Called by writeToFileWithoutCompression - * Create headers for file writing (at the moment, this is eiger specific) - * @param wbuffer writing buffer popped from FIFOs - */ - void createHeaders(char* wbuffer); - - /** - * Updates the file header char aray, each time the corresp parameter is changed - * @param ithread writer thread index - */ - void updateFileHeader(int ithread); - - /** - * Called by handleWithoutDataCompression and handleWithCompression after writing to file - * Copy frames for GUI and updates appropriate parameters for frequency frames to gui - * Uses semaphore for nth frame mode - * @param ithread writer thread index - * @param buffer buffer to copy - * @param numpackets number of packets to copy - */ - void copyFrameToGui(int ithread, char* buffer, uint32_t numpackets); - - void waitWritingBufferForNextAcquisition(int ithread); - - /** - * Called by processWritingBuffer - * Processing fifo popped buffers for data compression - * Updates parameters and writes to file - * Copies data for gui display and frees addresses popped from FIFOs - * @param ithread writing thread number - * @param wbuffer writer buffer - * @param nf number of frames - */ - void handleDataCompression(int ithread, char* wbuffer, uint64_t &nf); - - - /** - * Get Frame Number - * @param ithread writer thread index - * @param wbuffer writer buffer - * @param framenumber reference to the frame number - * @param packetnumber reference to the packet number - * @param subframenumber reference to the subframe number - * @oaram bunchid reference to the bunch id - * @return OK or FAIL - */ - int getFrameandPacketNumber(int ithread, char* wbuffer, uint64_t &framenumber, uint32_t &packetnumber, uint32_t &subframenumber, uint64_t &bunchid); - - /** - * Find offset upto this frame number and write it to file - * @param ithread writer thread index - * @param wbuffer writer buffer - * @param offset reference of offset to look from and replaces offset to starting of nextframenumber - * @param nextFrameNumber frame number up to which data written - * @param numpackets number of packets in buffer - * @param numPacketsWritten number of packets written to file - */ - int writeUptoFrameNumber(int ithread, char* wbuffer, int &offset, uint64_t nextFrameNumber, uint32_t numpackets, int &numPacketsWritten); - - /** function that returns the name variable from the receiver complete file name prefix - @param fname complete file name prefix - @returns file name - */ - string getNameFromReceiverFilePrefix(string fname); - - /************************************************************************* - * Class Members ********************************************************* - *************************************************************************/ - - /** Maximum Number of Writer Threads */ - -#ifdef DCOMPRESS - /**** most likely not used ***/ - const static int MAX_NUMBER_OF_WRITER_THREADS = 15; -#else - const static int MAX_NUMBER_OF_WRITER_THREADS = 2; -#endif - - //**detector parameters*** - /*Detector Readout ID*/ - int detID; - - /** Size of 1 buffer processed at a time */ - int bufferSize; - - /** One Packet Size including headers */ - int onePacketSize; - - /** One Packet Size without headers */ - int oneDataSize; - - /** Frame Index Mask */ - uint64_t frameIndexMask; - - /** Frame Index Offset */ - int frameIndexOffset; - - /** Packet Index Mask */ - uint64_t packetIndexMask; - - /** Footer offset from start of Packet*/ - int footerOffset; - - /** variable to exclude missing packet */ - bool excludeMissingPackets; - - - //***File parameters*** -#ifdef MYROOT1 - /** Tree where the hits are stored */ - TTree *myTree[MAX_NUMBER_OF_WRITER_THREADS]; - - /** File where the tree is saved */ - TFile *myFile[MAX_NUMBER_OF_WRITER_THREADS]; -#endif - - /** Complete File name */ - char completeFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH]; - - /** File Name without frame index, file index and extension (_d0_f000000000000_8.raw)*/ - char fileNamePerThread[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH]; - - /** Maximum Frames Per File **/ - uint64_t maxFramesPerFile; - const static int progressFrequency = 10; - - /** If file created successfully for all Writer Threads */ - bool fileCreateSuccess; - - /** File header */ - const static unsigned int FILE_HEADER_SIZE = 500; - char fileHeader[MAX_NUMBER_OF_WRITER_THREADS][FILE_HEADER_SIZE]; - - /** File Descriptor */ - FILE *sfilefd[MAX_NUMBER_OF_WRITER_THREADS]; -#ifdef HDF5C - DataSpace *hdf5_dataspaceId[MAX_NUMBER_OF_WRITER_THREADS]; - DataSet *hdf5_datasetId[MAX_NUMBER_OF_WRITER_THREADS]; - H5File *hdf5_fileId[MAX_NUMBER_OF_WRITER_THREADS]; - H5File* hdf5_masterFileId; - H5File* hdf5_virtualFileId; - DataType hdf5_datatype; -#endif - //***acquisition indices/count parameters*** - /** Frame Number of First Frame of an entire Acquisition (including all scans) */ - uint64_t startAcquisitionIndex; - - /** Frame index at start of each real time acquisition (eg. for each scan) */ - uint64_t startFrameIndex; - - /** Actual current frame index of each time acquisition (eg. for each scan) */ - uint64_t frameIndex[MAX_NUMBER_OF_WRITER_THREADS]; - - /** Current Frame Number */ - uint64_t currentFrameNumber[MAX_NUMBER_OF_WRITER_THREADS]; - - /** Previous Frame number from buffer to calculate loss */ - int64_t frameNumberInPreviousFile[MAX_NUMBER_OF_WRITER_THREADS]; - - /** Previous Frame number from last check to calculate loss */ - int64_t frameNumberInPreviousCheck[MAX_NUMBER_OF_WRITER_THREADS]; - - /** total packet count from last check */ - int64_t totalWritingPacketCountFromLastCheck[MAX_NUMBER_OF_WRITER_THREADS]; - - /** Pckets currently in current file, starts new file when it reaches max */ - int64_t lastFrameNumberInFile[MAX_NUMBER_OF_WRITER_THREADS]; - - /** packets in current file */ - uint64_t totalPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS]; - - /**Total packet count written by each writing thread */ - uint64_t totalWritingPacketCount[MAX_NUMBER_OF_LISTENING_THREADS]; - - - /* Acquisition started */ - bool acqStarted; - - /* Measurement started - for each thread to get progress print outs*/ - bool measurementStarted[MAX_NUMBER_OF_LISTENING_THREADS]; - - /** Total packet Count listened to by listening threads */ - int totalListeningPacketCount[MAX_NUMBER_OF_LISTENING_THREADS]; - - /** Total packet Count ignored by listening threads */ - int totalIgnoredPacketCount[MAX_NUMBER_OF_LISTENING_THREADS]; - - - - - - - //***receiver parameters*** - /** Receiver Buffer */ - char *buffer[MAX_NUMBER_OF_LISTENING_THREADS]; - - /** Memory allocated */ - char *mem0[MAX_NUMBER_OF_LISTENING_THREADS]; - - /** Circular fifo to point to addresses of data listened to */ - CircularFifo* fifo[MAX_NUMBER_OF_LISTENING_THREADS]; - - /** Circular fifo to point to address already written and freed, to be reused */ - CircularFifo* fifoFree[MAX_NUMBER_OF_LISTENING_THREADS]; - - /** UDP Sockets - Detector to Receiver */ - genericSocket* udpSocket[MAX_NUMBER_OF_LISTENING_THREADS]; - - /** Number of Jobs Per Buffer */ - int numberofJobsPerBuffer; - - /** Total fifo size */ - uint32_t fifoSize; - - /** fifo buffer header size */ - uint32_t fifoBufferHeaderSize; - - /** Dummy Packet identifier value */ - const static uint32_t dummyPacketValue = 0xFFFFFFFF; - - - - //***receiver to GUI parameters*** - /** Current Frame copied for GUI */ - char* latestData[MAX_NUMBER_OF_WRITER_THREADS]; - - /** Pointer to file name to be sent to GUI */ - char guiFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH]; - - /** Number of packets copied to be sent to gui (others padded) */ - uint32_t guiNumPackets[MAX_NUMBER_OF_WRITER_THREADS]; - - /** Semaphore to synchronize Writer and GuiReader threads*/ - sem_t writerGuiSemaphore[MAX_NUMBER_OF_WRITER_THREADS]; //datacompression, only first thread sends to gui - - /** Semaphore to synchronize Writer and GuiReader threads*/ - sem_t dataCallbackWriterSemaphore[MAX_NUMBER_OF_WRITER_THREADS]; //datacompression, only first thread sends to gui - - /** counter for nth frame to gui */ - int frametoGuiCounter[MAX_NUMBER_OF_WRITER_THREADS]; - - - - //***data call back thread parameters*** - /** Ensures if zmq threads created successfully */ - bool zmqThreadStarted; - - /** Number of data callback Threads */ - int numberofDataCallbackThreads; - - /** Data Callback Threads */ - pthread_t dataCallbackThreads[MAX_NUMBER_OF_LISTENING_THREADS]; - - /** Semaphores Synchronizing DataCallback Threads */ - sem_t dataCallbackSemaphore[MAX_NUMBER_OF_LISTENING_THREADS]; - - /** Mask with each bit indicating status of each data callback thread */ - volatile uint32_t dataCallbackThreadsMask; - - /** Set to self-terminate data callback threads waiting for semaphores */ - bool killAllDataCallbackThreads; - - - - //***general and listening thread parameters*** - /** Ensures if threads created successfully */ - bool threadStarted; - - /** Current Thread Index*/ - int currentThreadIndex; - - /** Number of Listening Threads */ - int numberofListeningThreads; - - /** Listening Threads */ - pthread_t listeningThreads[MAX_NUMBER_OF_LISTENING_THREADS]; - - /** Semaphores Synchronizing Listening Threads */ - sem_t listenSemaphore[MAX_NUMBER_OF_LISTENING_THREADS]; - - /** Mask with each bit indicating status of each listening thread */ - volatile uint32_t listeningThreadsMask; - - /** Set to self-terminate listening threads waiting for semaphores */ - bool killAllListeningThreads; - - - - //***writer thread parameters*** - /** Number of Writer Threads */ - int numberofWriterThreads; - - /** Writer Threads */ - pthread_t writingThreads[MAX_NUMBER_OF_WRITER_THREADS]; - - /** Semaphores Synchronizing Writer Threads */ - sem_t writerSemaphore[MAX_NUMBER_OF_WRITER_THREADS]; - - /** Mask with each bit indicating status of each writer thread */ - volatile uint32_t writerThreadsMask; - - /** Mask with each bit indicating file created for each writer thread*/ - volatile uint32_t createFileMask; - - /** Set to self-terminate writer threads waiting for semaphores */ - bool killAllWritingThreads; - - //***data shape *** - /** Number of pixels in x axis */ - int NX; - /** Number of pixels in y axis */ - int NY; - - int TILE_NX; - int TILE_NY; - - //***filter parameters*** - /** Common Mode Subtraction Enable FIXME: Always false, only moench uses, Ask Anna */ - bool commonModeSubtractionEnable; - - /** Moench Common Mode Subtraction */ - moenchCommonMode *moenchCommonModeSubtraction; - - /** Single Photon Detector Object for each writer thread */ - singlePhotonDetector *singlePhotonDetectorObject[MAX_NUMBER_OF_WRITER_THREADS]; - - /** Receiver Data Object for each writer thread */ - slsReceiverData *receiverData[MAX_NUMBER_OF_WRITER_THREADS]; - - - - - //***mutex*** - /** Status mutex */ - pthread_mutex_t statusMutex; - - /** Writing mutex */ - pthread_mutex_t writeMutex; - - /** GuiDataReady Mutex */ - pthread_mutex_t dataReadyMutex; - - /** Progress (currentFrameNumber) Mutex */ - pthread_mutex_t progressMutex; - - - //***callback*** - /** The action which decides what the user and default responsibilities to save data are - * 0 raw data ready callback takes care of open,close,write file - * 1 callback writes file, we have to open, close it - * 2 we open, close, write file, callback does not do anything */ - int cbAction; - -}; - - -//#endif diff --git a/slsReceiverSoftware/include/gitInfoReceiver.h b/slsReceiverSoftware/include/gitInfoReceiver.h index aab086be3..29510e4e9 100644 --- a/slsReceiverSoftware/include/gitInfoReceiver.h +++ b/slsReceiverSoftware/include/gitInfoReceiver.h @@ -1,11 +1,11 @@ //#define SVNPATH "" #define SVNURL "git@git.psi.ch:sls_detectors_software/sls_receiver_software.git" //#define SVNREPPATH "" -#define SVNREPUUID "9f0ea629975864abb9bb3fd9f59c1d9188e0f3d1" -//#define SVNREV 0x512 +#define SVNREPUUID "a453c1d55d73fecc18af9df6ada21621a989ee4b" +//#define SVNREV 0x514 //#define SVNKIND "" //#define SVNSCHED "" #define SVNAUTH "Dhanya_Maliakal" -#define SVNREV 0x512 -#define SVNDATE 0x20170323 +#define SVNREV 0x514 +#define SVNDATE 0x20170324 // diff --git a/slsReceiverSoftware/include/slsReceiver.h b/slsReceiverSoftware/include/slsReceiver.h index a66afc321..25741401d 100644 --- a/slsReceiverSoftware/include/slsReceiver.h +++ b/slsReceiverSoftware/include/slsReceiver.h @@ -65,9 +65,9 @@ class slsReceiver : private virtual slsReceiverDefs { * fileindex * datasize * - * return value is - * 0 callback takes care of open,close,wrie file - * 1 we open, close, write file, callback does not do anything + * return value is undefined at the moment + * we write depending on file write enable + * users get data to write depending on call backs registered */ void registerCallBackStartAcquisition(int (*func)(char*, char*, uint64_t, uint32_t, void*),void *arg); diff --git a/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h b/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h index 7952ac35e..02befbf9c 100644 --- a/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h +++ b/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h @@ -64,9 +64,9 @@ class slsReceiverTCPIPInterface : private virtual slsReceiverDefs { * fileindex * datasize * - * return value is - * 0 callback takes care of open,close,wrie file - * 1 we open, close, write file, callback does not do anything + * return value is insignificant at the moment + * we write depending on file write enable + * users get data to write depending on call backs registered */ void registerCallBackStartAcquisition(int (*func)(char*, char*, uint64_t, uint32_t, void*),void *arg); @@ -306,9 +306,9 @@ private: * fileindex * datasize * - * return value is - * 0 callback takes care of open,close,wrie file - * 1 we open, close, write file, callback does not do anything + * return value is insignificant at the moment + * we write depending on file write enable + * users get data to write depending on call backs registered */ int (*startAcquisitionCallBack)(char*, char*, uint64_t, uint32_t, void*); void *pStartAcquisition; diff --git a/slsReceiverSoftware/include/slsReceiverUsers.h b/slsReceiverSoftware/include/slsReceiverUsers.h index 91b8a5564..ac8c4a560 100644 --- a/slsReceiverSoftware/include/slsReceiverUsers.h +++ b/slsReceiverSoftware/include/slsReceiverUsers.h @@ -51,7 +51,7 @@ public: @sort register calbback for starting the acquisition \param func callback to be called when starting the acquisition. Its arguments are filepath, filename, fileindex, datasize - \returns 0 callback takes care of open,close,write file; 1 we open, close, write file, callback does not do anything + \return value is insignificant at the moment, we write depending on file write enable, users get data to write depending on call backs registered */ void registerCallBackStartAcquisition(int (*func)(char* filepath, char* filename, uint64_t fileindex, uint32_t datasize, void*),void *arg); diff --git a/slsReceiverSoftware/src/BinaryFile.cpp b/slsReceiverSoftware/src/BinaryFile.cpp index 6ee32b21e..ff4ba17f5 100644 --- a/slsReceiverSoftware/src/BinaryFile.cpp +++ b/slsReceiverSoftware/src/BinaryFile.cpp @@ -35,10 +35,6 @@ void BinaryFile::PrintMembers() { printf("Number of Frames in File: %d\n",numFramesInFile); } -FILE* BinaryFile::GetFileHandle() { - return filefd; -} - slsReceiverDefs::fileFormat BinaryFile::GetFileType() { return BINARY; } diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index 01ec63487..c5d2ae9e5 100644 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -32,9 +32,8 @@ pthread_mutex_t DataProcessor::Mutex = PTHREAD_MUTEX_INITIALIZER; DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dsEnable, - int* cbaction, void (*dataReadycb)(uint64_t, uint32_t, uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t, uint16_t, uint32_t, uint16_t, uint8_t, uint8_t, - char*, uint32_t, FILE*, void*), + char*, uint32_t, void*), void *pDataReadycb) : ThreadObject(NumberofDataProcessors), @@ -51,7 +50,6 @@ DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* file(0), fileFormatType(ftype), fileWriteEnable(fwenable), - callbackAction(cbaction), rawDataReadyCallBack(dataReadycb), pRawDataReady(pDataReadycb) { @@ -314,32 +312,28 @@ void DataProcessor::ProcessAnImage(char* buf) { RecordFirstIndices(fnum); } - if (*callbackAction == DO_EVERYTHING) { - if (*fileWriteEnable) - file->WriteToFile(buf, generalData->fifoBufferSize + sizeof(sls_detector_header), fnum-firstMeasurementIndex); - } else { - /* - if (rawDataReadyCallBack) - rawDataReadyCallBack( - header->frameNumber, - header->expLength, - header->packetNumber, - header->bunchId, - header->timestamp, - header->modId, - header->xCoord, - header->yCoord, - header->zCoord, - header->debug, - header->roundRNumber, - header->detType, - header->version, - buf + sizeof(sls_detector_header), - generalData->imageSize, - file->GetFileHandle(), pRawDataReady); - */ - file->GetBinaryFileHandle(); - //GetHDF5FileHandle - } + + if (*fileWriteEnable) + file->WriteToFile(buf, generalData->fifoBufferSize + sizeof(sls_detector_header), fnum-firstMeasurementIndex); + + if (rawDataReadyCallBack) + rawDataReadyCallBack( + header->frameNumber, + header->expLength, + header->packetNumber, + header->bunchId, + header->timestamp, + header->modId, + header->xCoord, + header->yCoord, + header->zCoord, + header->debug, + header->roundRNumber, + header->detType, + header->version, + buf + sizeof(sls_detector_header), + generalData->imageSize, + pRawDataReady); + } diff --git a/slsReceiverSoftware/src/HDF5File.cpp b/slsReceiverSoftware/src/HDF5File.cpp index 73022dd55..03f35840a 100644 --- a/slsReceiverSoftware/src/HDF5File.cpp +++ b/slsReceiverSoftware/src/HDF5File.cpp @@ -66,10 +66,6 @@ void HDF5File::PrintMembers() { } -H5File* HDF5File::GetHDF5FileHandle() { - return filefd; -} - void HDF5File::SetNumberofPixels(uint32_t nx, uint32_t ny) { nPixelsX = nx; nPixelsY = ny; diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index cdd91ea2e..f66d1a2d6 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -53,7 +53,6 @@ void UDPStandardImplementation::InitializeMembers() { //*** receiver parameters *** numThreads = 1; numberofJobs = 1; - callbackAction = DO_EVERYTHING; //*** mutex *** pthread_mutex_init(&statusMutex,NULL); @@ -373,7 +372,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) { for ( int i=0; i < numThreads; ++i ) { listener.push_back(new Listener(fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange)); dataProcessor.push_back(new DataProcessor(fifo[i], &fileFormatType, &fileWriteEnable, &dataStreamEnable, - &callbackAction, rawDataReadyCallBack,pRawDataReady)); + rawDataReadyCallBack,pRawDataReady)); if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) { FILE_LOG (logERROR) << "Error: Could not creates listener/dataprocessor threads (index:" << i << ")"; for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) @@ -442,28 +441,22 @@ int UDPStandardImplementation::startReceiver(char *c) { } //callbacks - callbackAction = DO_EVERYTHING; if (startAcquisitionCallBack) { - callbackAction=startAcquisitionCallBack(filePath, fileName, fileIndex, + startAcquisitionCallBack(filePath, fileName, fileIndex, (generalData->fifoBufferSize) * numberofJobs + (generalData->fifoBufferHeaderSize), pStartAcquisition); - if (callbackAction == DO_NOTHING) { - if (acquisitionFinishedCallBack == NULL || rawDataReadyCallBack == NULL) { - FILE_LOG(logERROR) << "Callback action 0: All the call backs must be registered"; - return FAIL; - } - cout << "Start Acquisition, Acquisition Finished and Data Write has been defined externally" << endl; - } + if (rawDataReadyCallBack != NULL) + cout << "Data Write has been defined externally" << endl; } //processor->writer if (fileWriteEnable) { - if (callbackAction == DO_EVERYTHING && SetupWriter() == FAIL) { + if (SetupWriter() == FAIL) { strcpy(c,"Could not create file."); FILE_LOG(logERROR) << c; return FAIL; } } else - cout << " Data will not be saved" << endl; + cout << "File Write Disabled" << endl; cout << "Ready ..." << endl; diff --git a/slsReceiverSoftware/src/UDPStandardImplementation_copy.cpp b/slsReceiverSoftware/src/UDPStandardImplementation_copy.cpp deleted file mode 100644 index ad2f16af0..000000000 --- a/slsReceiverSoftware/src/UDPStandardImplementation_copy.cpp +++ /dev/null @@ -1,4026 +0,0 @@ -/********************************************//** - * @file UDPStandardImplementation.cpp - * @short does all the functions for a receiver, set/get parameters, start/stop etc. - ***********************************************/ - - -#include "UDPStandardImplementation.h" - -#include "moench02ModuleData.h" -#include "gotthardModuleData.h" -#include "gotthardShortModuleData.h" - -#include // exit() -#include //set precision for printing parameters for create new file -#include //map -#include -#include -#include -#include - -#include //zmq -#include - - -using namespace std; - -#define WRITE_HEADERS - -/************************************************************************* - * Constructor & Destructor ********************************************** - * They access local cache of configuration or detector parameters ******* - *************************************************************************/ - -UDPStandardImplementation::UDPStandardImplementation(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - initializeMembers(); - - //***mutex*** - pthread_mutex_init(&statusMutex,NULL); - pthread_mutex_init(&writeMutex,NULL); - pthread_mutex_init(&dataReadyMutex,NULL); - pthread_mutex_init(&progressMutex,NULL); - - //to increase socket receiver buffer size and max length of input queue by changing kernel settings - if(myDetectorType == EIGER); - else if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max")){ - FILE_LOG(logDEBUG) << "Warning: No root permission to change socket receiver buffer size in file /proc/sys/net/core/rmem_max"; - }else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")){ - FILE_LOG(logDEBUG) << "Warning: No root permission to change max length of input queue in file /proc/sys/net/core/netdev_max_backlog"; - } - - /** permanent setting by heiner - net.core.rmem_max = 104857600 # 100MiB - net.core.netdev_max_backlog = 250000 - sysctl -p - // from the manual - sysctl -w net.core.rmem_max=16777216 - sysctl -w net.core.netdev_max_backlog=250000 - */ -} - -UDPStandardImplementation::~UDPStandardImplementation(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - for(int i=0;i(receiverData[i], csize, sigma, sign, moenchCommonModeSubtraction); -} - - - - -int UDPStandardImplementation::setupFifoStructure(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - - //number of jobs per buffer - int64_t i; - int oldNumberofJobsPerBuffer = numberofJobsPerBuffer; - //eiger always listens to 1 packet at a time - if(excludeMissingPackets){ - numberofJobsPerBuffer = 1; - FILE_LOG(logDEBUG) << "Info: 1 packet per buffer"; - } - //else calculate best possible number of frames to listen to at a time (for fast readouts like gotthard) - else{ - //if frequency to gui is not random (every nth frame), then listen to only n frames per buffer - if(frameToGuiFrequency) - numberofJobsPerBuffer = frameToGuiFrequency; - //random frame sent to gui, then frames per buffer depends on acquisition period - else{ - //calculate 100ms/period to get frames to listen to at a time - if(acquisitionPeriod) - i = SAMPLE_TIME_IN_NS/acquisitionPeriod; - else{ - if(acquisitionTime) - i = SAMPLE_TIME_IN_NS/acquisitionTime; - else - i = SAMPLE_TIME_IN_NS; - } - //max frames to listen to at a time is limited by 1000 - if (i > MAX_JOBS_PER_THREAD) - numberofJobsPerBuffer = MAX_JOBS_PER_THREAD; - else if (i < 1) - numberofJobsPerBuffer = 1; - else - numberofJobsPerBuffer = i; - - } - FILE_LOG(logINFO) << "Number of Frames per buffer:" << numberofJobsPerBuffer << endl; - } - - - - // fifo depth - uint32_t oldFifoSize = fifoSize; - - - //reduce fifo depth if > 1 numberofJobsPerBuffer - if(fifoDepth % numberofJobsPerBuffer) - fifoSize = (fifoDepth/numberofJobsPerBuffer)+1; - else - fifoSize = fifoDepth/numberofJobsPerBuffer; - - //do not rebuild fifo structure if it is the same (oldfifosize differs only for different packetsperframe) - if((oldNumberofJobsPerBuffer == numberofJobsPerBuffer) && (oldFifoSize == fifoSize)) - return OK; - FILE_LOG(logINFO) << "Info: Total Fifo Size:" << fifoSize; - - - - //delete threads - if(threadStarted){ - createListeningThreads(true); - createWriterThreads(true); - } - - - //set up fifo structure - for(int i=0;iisEmpty()){ - fifoFree[i]->pop(buffer[i]); - //cprintf(BLUE,"FifoFree[%d]: value:%d, pop 0x%x\n",i,fifoFree[i]->getSemValue(),(void*)(buffer[i])); - } - delete fifoFree[i]; - fifoFree[i] = 0; - } - if(fifo[i]){ - while(!fifo[i]->isEmpty()){ - fifo[i]->pop(buffer[i]); - //cprintf(CYAN,"Fifo[%d]: value:%d, pop 0x%x\n",i,fifo[i]->getSemValue(),(void*)(buffer[i])); - } - delete fifo[i]; - fifo[i] = 0; - } - if(mem0[i]){ - free(mem0[i]); - mem0[i] = 0; - } - - } - - - for(int i=0;i(fifoSize); - fifo[i] = new CircularFifo(fifoSize); - - //allocate memory - mem0[i] = (char*)calloc((bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize) * fifoSize,sizeof(char)); - if (mem0[i] == NULL){ - cprintf(BG_RED,"Error: Could not allocate memory for listening \n"); - return FAIL; - } - - //push free address into fifoFree - buffer[i]=mem0[i]; - while (buffer[i] < (mem0[i]+(bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize) * (fifoSize-1))) { - //cprintf(BLUE,"fifofree %d: push 0x%p\n",i,(void*)buffer[i]); - /*for(int k=0;kpush(buffer[i])); - //cprintf(GREEN,"Fifofree[%d]: value:%d, push 0x%x\n",i,fifoFree[i]->getSemValue(),(void*)(buffer[i])); -#ifdef DEBUG5 - cprintf(BLUE,"Info: %d fifostructure free pushed into fifofree %p\n", i, (void*)(buffer[i])); -#endif - buffer[i] += (bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize); - } - } - cout << "Fifo structure(s) reconstructed" << endl; - - //create threads - if(createListeningThreads() == FAIL){ - FILE_LOG(logERROR) << "Could not create listening thread"; - return FAIL; - } - if(createWriterThreads() == FAIL){ - FILE_LOG(logERROR) << "Could not create writer threads"; - return FAIL; - } - setThreadPriorities(); - - return OK; -} - - - - - - - - -void UDPStandardImplementation::setFileName(const char c[]){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; - - char oldfilename[MAX_STR_LENGTH]; - strcpy(oldfilename,fileName); - - if(strlen(c)) - strcpy(fileName, c); - - if(strlen(fileName)){ - int detindex = -1; - string tempname(fileName); - size_t uscore=tempname.rfind("_"); - if (uscore!=string::npos){ - if (sscanf(tempname.substr(uscore+1,tempname.size()-uscore-1).c_str(),"d%d",&detindex)) { - detID = detindex; - } - } - if(detindex == -1) - detID = 0; - } - - FILE_LOG(logINFO) << "File name:" << fileName; -} - -int UDPStandardImplementation::setDataCompressionEnable(const bool b){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; - - if(myDetectorType != EIGER){ - cout << "Info: Setting up Data Compression Enable to " << stringEnable(b); -#ifdef MYROOT1 - cout << " WITH ROOT" << endl; -#else - cout << " WITHOUT ROOT" << endl; -#endif - } - - //set data compression enable - dataCompressionEnable = b; - - //-- create writer threads depending on enable - pthread_mutex_lock(&statusMutex); - writerThreadsMask = 0x0; - pthread_mutex_unlock(&(statusMutex)); - - createWriterThreads(true); - if(b) - numberofWriterThreads = MAX_NUMBER_OF_WRITER_THREADS; - else - numberofWriterThreads = 1; - if(createWriterThreads() == FAIL){ - cprintf(BG_RED,"Error: Could not create writer threads\n"); - return FAIL; - } - //-- end of create writer threads - setThreadPriorities(); - - //filter - deleteFilter(); - if(b) - initializeFilter(); - - FILE_LOG(logINFO) << "Data Compression: " << stringEnable(dataCompressionEnable); - - return OK; -} - -/***acquisition count parameters***/ -uint64_t UDPStandardImplementation::getTotalFramesCaught() const{ - FILE_LOG(logDEBUG) << __AT__ << " starting"; - - return (totalPacketsCaught/(packetsPerFrame*numberofListeningThreads)); -} - -uint64_t UDPStandardImplementation::getFramesCaught() const{ - FILE_LOG(logDEBUG) << __AT__ << " starting"; - return (packetsCaught/(packetsPerFrame*numberofListeningThreads)); -} - -/***acquisition parameters***/ -void UDPStandardImplementation::setShortFrameEnable(const int i){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - shortFrameEnable = i; - - if(shortFrameEnable!=-1){ - bufferSize = GOTTHARD_SHORT_BUFFER_SIZE; - onePacketSize = GOTTHARD_SHORT_BUFFER_SIZE; - oneDataSize = GOTTHARD_SHORT_DATABYTES; - maxFramesPerFile = SHORT_MAX_FRAMES_PER_FILE; - packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME; - frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK; - frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET; - packetIndexMask = GOTTHARD_SHORT_PACKET_INDEX_MASK; - - }else{ - bufferSize = GOTTHARD_BUFFER_SIZE; - onePacketSize = GOTTHARD_ONE_PACKET_SIZE; - oneDataSize = GOTTHARD_ONE_DATA_SIZE; - maxFramesPerFile = MAX_FRAMES_PER_FILE; - packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; - frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; - frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; - packetIndexMask = GOTTHARD_PACKET_INDEX_MASK; - } - - //filter - deleteFilter(); - if(dataCompressionEnable) - initializeFilter(); - - FILE_LOG(logINFO) << "Short Frame Enable: " << shortFrameEnable; -} - - -int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - frameToGuiFrequency = freq; - if(setupFifoStructure() == FAIL) - return FAIL; - - FILE_LOG(logINFO) << "Frame to Gui Frequency: " << frameToGuiFrequency; - - return OK; -} - - - -uint32_t UDPStandardImplementation::setDataStreamEnable(const uint32_t enable){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - int oldvalue = dataStreamEnable; - dataStreamEnable = enable; - FILE_LOG(logINFO) << "Data Send to Gui: " << dataStreamEnable; - - - if(oldvalue!=dataStreamEnable){ - //data sockets have to be created again as the client ones are - if(zmqThreadStarted) - createDataCallbackThreads(true); - - if(dataStreamEnable){ - numberofDataCallbackThreads = numberofListeningThreads; - if(createDataCallbackThreads() == FAIL){ - cprintf(BG_RED,"Error: Could not create data callback threads\n"); - } - } - } - - return OK; -} - - - -int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - acquisitionPeriod = i; - if(setupFifoStructure() == FAIL) - return FAIL; - - FILE_LOG(logINFO) << "Acquisition Period: " << (double)acquisitionPeriod/(1E9) << "s"; - - if(myDetectorType == EIGER && fileFormatType == BINARY) - for(int i=0; iShutDownSocket(); - FILE_LOG(logINFO) << "Shut down UDP Socket " << i; - delete udpSocket[i]; - udpSocket[i] = 0; - } - } - return OK; -} - - - - -/** - * Pre: status is running, udp sockets have been initialized, stop receiver initiated - * Post:udp sockets closed, status is transmitting - * */ -void UDPStandardImplementation::startReadout(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - FILE_LOG(logDEBUG) << "Transmitting last data"; - - if(status == RUNNING){ - - - //needs to wait for packets only if activated - if(activated){ - //check if all packets got - int totalP = 0,prev=-1; - for(int i=0; igetCurrentTotalReceived(); - - //wait for all packets - if((unsigned long long int)totalP!=numberOfFrames*packetsPerFrame*numberofListeningThreads){ - - //wait as long as there is change from prev totalP, - //and also change from received in buffer to previous value - //(as one listens to many at a time, shouldnt cut off in between) - while((prev != totalP) || (prevReceivedInBuffer!= currentReceivedInBuffer)){ -#ifdef DEBUG5 - cprintf(MAGENTA,"waiting for all packets prevP:%d totalP:%d PrevBuffer:%d currentBuffer:%d\n",prev,totalP,prevReceivedInBuffer,currentReceivedInBuffer); - -#endif - //usleep(2*1000*1000); - usleep(5*1000);/* Need to find optimal time (exposure time and acquisition period) **/ - prev = totalP; - totalP = 0; - for(int i=0; igetCurrentTotalReceived(); -#ifdef DEBUG5 - cprintf(MAGENTA,"\tupdated: totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer); - -#endif - } - - } - } - - //set status - pthread_mutex_lock(&statusMutex); - status = TRANSMITTING; - pthread_mutex_unlock(&statusMutex); - - FILE_LOG(logINFO) << "Status: Transmitting"; - } - - //shut down udp sockets and make listeners push dummy (end) packets for writers - shutDownUDPSockets(); -} - - - -/**make this better by asking all of it at once*/ -void UDPStandardImplementation::readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - -} - - - -void UDPStandardImplementation::closeFile(int ithread){ - FILE_LOG(logDEBUG) << __AT__ << " called for " << ithread ; - - //normal - if(!dataCompressionEnable){ - if(sfilefd[ithread]){ -#ifdef DEBUG4 - FILE_LOG(logDEBUG4) << "Going to close file: " << fileno(sfilefd)); -#endif - fflush(sfilefd[ithread]); - fclose(sfilefd[ithread]); - sfilefd[ithread] = 0; - } -#ifdef HDF5C - pthread_mutex_lock(&writeMutex); - try{ - Exception::dontPrint(); //to handle errors - if(hdf5_dataspaceId[ithread]) {delete hdf5_dataspaceId[ithread]; hdf5_dataspaceId[ithread] = 0;} - if(hdf5_datasetId[ithread]) {delete hdf5_datasetId[ithread]; hdf5_datasetId[ithread] = 0;} - if(hdf5_fileId[ithread]) {delete hdf5_fileId[ithread]; hdf5_fileId[ithread] = 0;} - if(hdf5_masterFileId) {delete hdf5_masterFileId; hdf5_masterFileId = 0;} - if(hdf5_virtualFileId) {delete hdf5_virtualFileId; hdf5_virtualFileId = 0;} - } - catch(Exception error){ - cprintf(RED,"Error in closing HDF5 handles\n"); - error.printError(); - } - pthread_mutex_unlock(&writeMutex); -#endif - } - - //compression - else{ -#if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1) - if(sfilefd[0]){ -#ifdef DEBUG4 - FILE_LOG(logDEBUG4) << "sfilefd: " << (int)sfilefd[0]; -#endif - fclose(sfilefd[0]); - sfilefd[0] = 0; - } -#endif - -#ifdef MYROOT1 - pthread_mutex_lock(&writeMutex); - //write to file - if(myTree[ithread] && myFile[ithread]){ - myFile[ithread] = myTree[ithread]->GetCurrentFile(); - - if(myFile[ithread]->Write()) - //->Write(tall->GetName(),TObject::kOverwrite); - cout << "Thread " << ithread <<": wrote frames to file" << endl; - else - cout << "Thread " << ithread << ": could not write frames to file" << endl; - - }else - cout << "Thread " << ithread << ": could not write frames to file: No file or No Tree" << endl; - //close file - if(myTree[ithread] && myFile[ithread]) - myFile[ithread] = myTree[ithread]->GetCurrentFile(); - if(myFile[ithread] != 0) - myFile[ithread]->Close(); - myFile[ithread] = 0; - myTree[ithread] = 0; - pthread_mutex_unlock(&writeMutex); - -#endif - } -} - - -//eiger only -int UDPStandardImplementation::setActivate(int enable){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; - - if(enable != -1){ - activated = enable; - FILE_LOG(logINFO) << "Activation: " << stringEnable(activated); - } - - if(fileFormatType == BINARY) - for(int i=0; igetErrorStatus(); - if(!iret){ - cout << "UDP port opened at port " << port[i] << endl; - }else{ - FILE_LOG(logERROR) << "Could not create UDP socket on port " << port[i] << " error: " << iret; - shutDownUDPSockets(); - return FAIL; - } - } - - FILE_LOG(logDEBUG) << "UDP socket(s) created successfully."; - cout << "Listener Ready ..." << endl; - - return OK; -} - - - -int UDPStandardImplementation::setupWriter(){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; - - //acquisition start call back returns enable write - cbAction = DO_EVERYTHING; - if (startAcquisitionCallBack) - cbAction=startAcquisitionCallBack(filePath,fileNamePerThread[0],(int)fileIndex,bufferSize,pStartAcquisition); - - - if(cbAction < DO_EVERYTHING){ - FILE_LOG(logINFO) << "Call back activated. Data saving must be taken care of by user in call back."; - if (rawDataReadyCallBack){ - FILE_LOG(logINFO) << "Data Write has been defined externally"; - } - }else if(!fileWriteEnable){ - FILE_LOG(logINFO) << "Data will not be saved"; - } - - - //creating first file - //setting all value to 1 - pthread_mutex_lock(&statusMutex); - for(int i=0; i DO_NOTHING){ - - - if(fileFormatType == BINARY){ - //close file pointers - if(sfilefd[ithread]){ - //all threads need to close file, reset mask and exit loop - if(myDetectorType == EIGER && fileWriteEnable && (cbAction > DO_NOTHING)){ - updateFileHeader(ithread); - fseek(sfilefd[ithread],0,0); - fwrite((void*)fileHeader[ithread], 1, FILE_HEADER_SIZE, sfilefd[ithread]); - } - fflush(sfilefd[ithread]); - fclose(sfilefd[ithread]); - sfilefd[ithread] = 0; - } - - - //create file - if(!overwriteEnable){ - if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "wx"))){ - FILE_LOG(logERROR) << "Could not create/overwrite file" << completeFileName[ithread]; - sfilefd[ithread] = 0; - return FAIL; - } - }else if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "w"))){ - FILE_LOG(logERROR) << "Could not create file" << completeFileName[ithread]; - sfilefd[ithread] = 0; - return FAIL; - } - //setting file buffer size to 16mb - setvbuf(sfilefd[ithread],NULL,_IOFBF,FILE_BUF_SIZE); - - - //Print packet loss and filenames - if(totalWritingPacketCount[ithread]){ - if(numberofWriterThreads>1){ - cprintf(BLUE,"File:%s" - "\nThread:%d" - "\tLost:%lld" - "\t\tPackets:%lld" - "\tFrame#:%lld" - "\tPFrame#:%lld\n", - completeFileName[ithread],ithread, - ((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames) - ?(long long int)((numberOfFrames-(frameNumberInPreviousFile[ithread]+1))*packetsPerFrame - totalPacketsInFile[ithread]) - :(long long int)((frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread])*packetsPerFrame - totalPacketsInFile[ithread]), - (long long int)totalPacketsInFile[ithread], - (long long int)currentFrameNumber[ithread], - (long long int)frameNumberInPreviousFile[ithread] - ); - }else{ - cprintf(BLUE,"File:%s" - "\nLost:%lld" - "\t\tPackets:%lld" - "\tFrame#:%lld" - "\tPFrame#:%lld\n", - completeFileName[ithread], - ((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames) - ?(long long int)(numberOfFrames-(frameNumberInPreviousFile[ithread]+1)) - :(long long int)(frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread]), - (long long int)totalPacketsInFile[ithread], - (long long int)currentFrameNumber[ithread], - (long long int)frameNumberInPreviousFile[ithread] - ); - } - - }else - printf("Thread:%d File opened:%s\n",ithread, completeFileName[ithread]); - - //write file header - if(myDetectorType == EIGER) - fwrite((void*)fileHeader[ithread], 1, FILE_HEADER_SIZE, sfilefd[ithread]); - } - - -#ifdef HDF5C - else if(fileFormatType == HDF5){ - struct timespec begin,end; - - pthread_mutex_lock(&writeMutex); - if(!ithread) - clock_gettime(CLOCK_REALTIME, &begin); - //closing file - try{ - Exception::dontPrint(); //to handle errors - if(hdf5_dataspaceId[ithread]) {delete hdf5_dataspaceId[ithread]; hdf5_dataspaceId[ithread] = 0;} - if(hdf5_datasetId[ithread]) {delete hdf5_datasetId[ithread]; hdf5_datasetId[ithread] = 0;} - if(hdf5_fileId[ithread]) {delete hdf5_fileId[ithread]; hdf5_fileId[ithread] = 0; } - if(hdf5_masterFileId) {delete hdf5_masterFileId; hdf5_masterFileId = 0;} - if(hdf5_virtualFileId) {delete hdf5_virtualFileId; hdf5_virtualFileId = 0;} - } - catch(AttributeIException error){ - cprintf(RED,"Error in creating attributes in thread %d\n",ithread); - error.printError(); - pthread_mutex_unlock(&writeMutex); - return FAIL; - } - catch(Exception error){ - cprintf(RED,"Error in closing HDF5 handles in thread %d\n",ithread); - error.printError(); - pthread_mutex_unlock(&writeMutex); - return FAIL; - }//end of closing file - - char masterFileName[1000]="", virtualFileName[1000]=""; - sprintf(masterFileName, "%s/%s_master_%lld.h5", filePath,getNameFromReceiverFilePrefix(fileName).c_str(),(long long int)fileIndex); - sprintf(virtualFileName, "%s/%s_%lld.h5", filePath,getNameFromReceiverFilePrefix(fileName).c_str(),(long long int)fileIndex); - - - //creating file - try{ - Exception::dontPrint(); //to handle errors - - if(!detID && !ithread){ - //creating master file with metadata - try{ - //creating file - FileAccPropList flist; - flist.setFcloseDegree(H5F_CLOSE_STRONG); - if(!overwriteEnable) - hdf5_masterFileId = new H5File( masterFileName, H5F_ACC_EXCL, NULL, flist ); - else - hdf5_masterFileId = new H5File( masterFileName, H5F_ACC_TRUNC, NULL, flist ); - - //create attributes - DataSpace dataspace = DataSpace (H5S_SCALAR); - Attribute attribute; - double dValue=0; - - //version - dValue=HDF5_WRITER_VERSION; - attribute = hdf5_masterFileId->createAttribute("version",PredType::NATIVE_DOUBLE, dataspace); - attribute.write(PredType::NATIVE_DOUBLE, &dValue); - - - //Create a group in the file - Group group1( hdf5_masterFileId->createGroup( "entry" ) ); - Group group2( group1.createGroup("data") ); - Group group3( group1.createGroup("instrument") ); - Group group4( group3.createGroup("beam") ); - Group group5( group3.createGroup("detector") ); - Group group6( group1.createGroup("sample") ); - - - int iValue=0; - StrType strdatatype(PredType::C_S1,256); - DataSet dataset; - - //Dynamic Range - dataset = group5.createDataSet ( "dynamic range", PredType::NATIVE_INT, dataspace ); - dataset.write ( &dynamicRange, PredType::NATIVE_INT); - attribute = dataset.createAttribute("unit",strdatatype, dataspace); - attribute.write(strdatatype, string("bits")); - //Ten Giga - iValue = tengigaEnable; - dataset = group5.createDataSet ( "ten giga enable", PredType::NATIVE_INT, dataspace ); - dataset.write ( &iValue, PredType::NATIVE_INT); - //Image Size - dataset = group5.createDataSet ( "image size", PredType::NATIVE_INT, dataspace ); - dataset.write ( &bufferSize, PredType::NATIVE_INT); - attribute = dataset.createAttribute("unit",strdatatype, dataspace); - attribute.write(strdatatype, string("bytes")); - //x - dataset = group5.createDataSet ( "number of pixels in x axis", PredType::NATIVE_INT, dataspace ); - dataset.write ( &NX, PredType::NATIVE_INT); - //y - dataset = group5.createDataSet ( "number of pixels in y axis", PredType::NATIVE_INT, dataspace ); - dataset.write ( &NY, PredType::NATIVE_INT); - //Total Frames - dataset = group5.createDataSet ( "total frames", PredType::STD_U64LE, dataspace ); - dataset.write ( &numberOfFrames, PredType::STD_U64LE); - //Exptime - dataset = group5.createDataSet ( "exposure time", PredType::STD_U64LE, dataspace ); - dataset.write ( &acquisitionTime, PredType::STD_U64LE); - attribute = dataset.createAttribute("unit",strdatatype, dataspace); - attribute.write(strdatatype, string("ns")); - //Period - dataset = group5.createDataSet ( "acquisition period", PredType::STD_U64LE, dataspace ); - dataset.write ( &acquisitionPeriod, PredType::STD_U64LE); - attribute = dataset.createAttribute("unit",strdatatype, dataspace); - attribute.write(strdatatype, string("ns")); - //Timestamp - time_t t = time(0); - dataset = group5.createDataSet ( "timestamp", strdatatype, dataspace ); - dataset.write ( string(ctime(&t)), strdatatype ); - - group1.close(); - group2.close(); - group3.close(); - group4.close(); - group5.close(); - group6.close(); - - }catch(Exception error){ - cprintf(RED,"Error in creating HDF5 master file in thread %d\n",ithread); - error.printError(); - pthread_mutex_unlock(&writeMutex); - return FAIL; - } - -/* - // creating virtual file - try{ - FileAccPropList flist; - flist.setFcloseDegree(H5F_CLOSE_STRONG); - if(!overwriteEnable) - hdf5_virtualFileId = new H5File( virtualFileName, H5F_ACC_EXCL, NULL,flist ); - else - hdf5_virtualFileId = new H5File( virtualFileName, H5F_ACC_TRUNC, NULL, flist ); - - //create dataspace for the dataset in the file - char dsetname[100]; - try{ - int numimagesindataset = ((numberOfFrames < MAX_IMAGES_IN_DATASET)? numberOfFrames:MAX_IMAGES_IN_DATASET); - hsize_t srcdims[3] = { numimagesindataset, NY*TILE_NY, NX*TILE*NX }; - if(dynamicRange == 4) - srcdims[2] = NX/2; - DataSpace dataspace(3,srcdims); - sprintf(dsetname, "/virtualdata_%012lld", (long long int)currentFrameNumber[ithread]+1); - - //create property list for a dataset - DSetCreatPropList plist = H5P_DEFAULT; - - //create chunked dataset if greater than max_chunked_images - if(numimagesindataset > MAX_CHUNKED_IMAGES){ - //set up fill values - int fillvalue = -1; /*Aldo suggested its time consuming*/ - - /* - plist.setFillValue(hdf5_datatype, &fillvalue); - hsize_t chunk_dims[3] ={MAX_CHUNKED_IMAGES, srcdims[1], srcdims[2]}; - plist.setChunk(3, chunk_dims); - } - - //Create dataset and write it into the file - hdf5_datasetId[ithread] = new DataSet (hdf5_fileId[ithread]->createDataSet( - dsetname, hdf5_datatype, dataspace, plist)); - -*/ - - /* fix this to mapp - hsize_t src_dims[1]={NZ}; - hsize_t start[RANK]= {0,0,0},count[RANK] = {1,1,NZ}; - for(int i=0;iclose(); - pthread_mutex_unlock(&writeMutex); - return FAIL; - } - - }catch(Exception error){ - cprintf(RED,"Error in creating HDF5 file %s in thread %d\n",completeFileName[ithread], ithread); - error.printError(); - hdf5_masterFileId->close(); - pthread_mutex_unlock(&writeMutex); - return FAIL; - } - */ - - /*//link ... master file should create link to the virtual file.. - try{ - char linkPath[1000]=""; - sprintf(linkPath,"/entry/data/%s",dsetname); - //herr_t H5Lcreate_external( const char *target_file_name, const char *target_obj_name, hid_t link_loc_id, const char *link_name, hid_t lcpl_id, hid_t lapl_id ) - H5Lcreate_external(masterFileName, dsetname, "/entry/data",dsetname,H5P_DEFAULT,H5P_DEFAULT - //hdf5_fileId[ithread]->link(H5G_LINK_HARD,dsetname,linkPath); - }catch(Exception error){ - cprintf(RED,"Error in creating link in thread %d\n", ithread); - error.printError(); - pthread_mutex_unlock(&writeMutex); - return FAIL; - } - */ - } - - - //creating file - FileAccPropList flist; - flist.setFcloseDegree(H5F_CLOSE_STRONG); - try{ - if(!overwriteEnable) - hdf5_fileId[ithread] = new H5File( completeFileName[ithread], H5F_ACC_EXCL, NULL,flist ); - else - hdf5_fileId[ithread] = new H5File( completeFileName[ithread], H5F_ACC_TRUNC, NULL, flist ); - }catch(Exception error){ - cprintf(RED,"Error in creating HDF5 file %s in thread %d\n",completeFileName[ithread], ithread); - error.printError(); - hdf5_masterFileId->close(); - pthread_mutex_unlock(&writeMutex); - return FAIL; - } - //create attributes - - try{ - DataSpace dataspace = DataSpace (H5S_SCALAR); - Attribute attribute; - double dValue=0; - - //version - dValue=HDF5_WRITER_VERSION; - attribute = hdf5_fileId[ithread]->createAttribute("version",PredType::NATIVE_DOUBLE, dataspace); - attribute.write(PredType::NATIVE_DOUBLE, &dValue); - - - int iValue=0; - StrType strdatatype(PredType::C_S1,256); - DataSet dataset; - if(myDetectorType == EIGER){ - //top - iValue = (flippedData[0]?0:1); - attribute = hdf5_fileId[ithread]->createAttribute("top",PredType::NATIVE_INT, dataspace); - attribute.write(PredType::NATIVE_INT, &iValue); - /*dataset = group5.createDataSet ( "top", PredType::NATIVE_INT, dataspace );dataset.write ( &iValue, PredType::NATIVE_INT);*/ - //left - iValue = (ithread?0:1); - attribute = hdf5_fileId[ithread]->createAttribute("left",PredType::NATIVE_INT, dataspace); - attribute.write(PredType::NATIVE_INT, &iValue); - /*dataset = group5.createDataSet ( "left", PredType::NATIVE_INT, dataspace );dataset.write ( &iValue, PredType::NATIVE_INT);*/ - //active - iValue = activated; - attribute = hdf5_fileId[ithread]->createAttribute("active",PredType::NATIVE_INT, dataspace); - attribute.write(PredType::NATIVE_INT, &iValue); - /*dataset = group5.createDataSet ( "active", PredType::NATIVE_INT, dataspace );dataset.write ( &iValue, PredType::NATIVE_INT);*/ - } - - }catch(Exception error){ - cprintf(RED,"Error in creating attribute to file in thread %d\n", ithread); - error.printError(); - hdf5_masterFileId->close(); - pthread_mutex_unlock(&writeMutex); - return FAIL; - } - //data - //create dataspace for the dataset in the file - char dsetname[100]; - try{ - - /*int numimagesindataset = ((numberOfFrames < MAX_IMAGES_IN_DATASET)? numberOfFrames:MAX_IMAGES_IN_DATASET); - hsize_t srcdims[3] = {numimagesindataset,NY,NX}; - */ - hsize_t srcdims[3] = {numberOfFrames,NY,NX}; - - - if(dynamicRange == 4) - srcdims[2] = NX/2; - hdf5_dataspaceId[ithread] = new DataSpace (3,srcdims); - sprintf(dsetname, "/data_%012lld", (long long int)currentFrameNumber[ithread]+1); - - //create chunked dataset if greater than max_chunked_images - /*if(numimagesindataset > MAX_CHUNKED_IMAGES){*/ - //create property list for a dataset - DSetCreatPropList plist; - //set up fill values - /*Aldo suggested its time consuming*/ - int fillvalue = -1; - plist.setFillValue(hdf5_datatype, &fillvalue); - - hsize_t chunk_dims[3] ={MAX_CHUNKED_IMAGES, NY, srcdims[2]}; - plist.setChunk(3, chunk_dims); - //Create dataset and write it into the file - hdf5_datasetId[ithread] = new DataSet (hdf5_fileId[ithread]->createDataSet( - dsetname, hdf5_datatype, *hdf5_dataspaceId[ithread], plist)); - /*}else - hdf5_datasetId[ithread] = new DataSet (hdf5_fileId[ithread]->createDataSet( - dsetname, hdf5_datatype, *hdf5_dataspaceId[ithread]));*/ - }catch(Exception error){ - cprintf(RED,"Error in creating dataset in thread %d\n",ithread); - error.printError(); - hdf5_masterFileId->close(); - pthread_mutex_unlock(&writeMutex); - return FAIL; - } - - } - catch(Exception error){ - cprintf(RED,"Error in creating HDF5 handles in thread %d\n",ithread); - error.printError(); - hdf5_masterFileId->close(); - pthread_mutex_unlock(&writeMutex); - return FAIL; - }//end of creating file - - if(!detID && !ithread){ - if(hdf5_masterFileId) - hdf5_masterFileId->close(); - if(hdf5_virtualFileId) - hdf5_virtualFileId->close(); - } - - if(!ithread){ - clock_gettime(CLOCK_REALTIME, &end); - cprintf(RED,"%d Elapsed time:%f seconds\n",ithread,( end.tv_sec - begin.tv_sec ) + ( end.tv_nsec - begin.tv_nsec ) / 1000000000.0); - } - pthread_mutex_unlock(&writeMutex); - - if(!totalWritingPacketCount[ithread]) - printf("Thread:%d File opened:%s\n",ithread, completeFileName[ithread]); - } -#endif - - } - - //reset counters for each new file - if(totalWritingPacketCount[ithread]){ - frameNumberInPreviousFile[ithread] = currentFrameNumber[ithread]; - totalPacketsInFile[ithread] = 0; - }else{ - frameNumberInPreviousFile[ithread] = -1; - frameNumberInPreviousCheck[ithread] = -1; - } - - - - return OK; -} - - - - -int UDPStandardImplementation::createCompressionFile(int ithread, int iframe){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - -#ifdef MYROOT1 - char temp[MAX_STR_LENGTH]; - //create file name for gui purposes, and set up acquistion parameters - sprintf(temp, "%s/%s_fxxx_%d_%d.root", filePath,fileNamePerThread[ithread],fileIndex,ithread); - //file - myFile[ithread] = new TFile(temp,"RECREATE");/** later return error if it exists */ - cprintf(GREEN,"Writing_Thread %d: Created Compression File: %s\n",ithread, temp); - //tree - sprintf(temp, "%s_fxxx_%d_%d",fileNamePerThread[ithread],fileIndex,ithread); - myTree[ithread]=singlePhotonDetectorObject[ithread]->initEventTree(temp, &iframe); - //resets the pedestalSubtraction array and the commonModeSubtraction - singlePhotonDetectorObject[ithread]->newDataSet(); - if(myFile[ithread]==NULL){ - FILE_LOG(logERROR) << "File Null"; - return FAIL; - } - if(!myFile[ithread]->IsOpen()){ - FILE_LOG(logERROR) << "File Not Open"; - return FAIL; - } - return OK; -#endif - return FAIL; -} - - - -void* UDPStandardImplementation::startDataCallbackThread(void* this_pointer){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - ((UDPStandardImplementation*)this_pointer)->startDataCallback(); - return this_pointer; -} - - - -void* UDPStandardImplementation::startListeningThread(void* this_pointer){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - ((UDPStandardImplementation*)this_pointer)->startListening(); - return this_pointer; -} - - - -void* UDPStandardImplementation::startWritingThread(void* this_pointer){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - ((UDPStandardImplementation*)this_pointer)->startWriting(); - return this_pointer; -} - - - - -void UDPStandardImplementation::startDataCallback(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - //set current thread value index - int ithread = currentThreadIndex; - struct timespec begin,end; - - // server address to bind - char hostName[100] = "tcp://*:";//"tcp://127.0.0.1:"; - int portno = DEFAULT_ZMQ_PORTNO + (detID*numberofListeningThreads+ithread); - sprintf(hostName,"%s%d",hostName,portno); - - //socket details - void *context = zmq_ctx_new(); - void *zmqsocket = zmq_socket(context, ZMQ_PUSH); // create a publisher - int val = -1; - zmq_setsockopt(zmqsocket, ZMQ_LINGER, &val,sizeof(val)); // wait for the unsent packets before closing socket - //val = 10; - //zmq_setsockopt(zmqsocket,ZMQ_SNDHWM,&val,sizeof(val)); //set SEND HIGH WATER MARK (8-9ms slower) - zmq_bind(zmqsocket,hostName); // bind - FILE_LOG(logINFO) << "Thread" << ithread << ": ZMQ Server at " << hostName; - - - int headersize=0; - switch(myDetectorType){ - case EIGER: - headersize = EIGER_DATA_PACKET_HEADER_SIZE; break; - default: - headersize = 0; break; - } - - //let calling function know thread started and obtained current (after sockets created) - if(!zmqThreadStarted) - zmqThreadStarted = true; - - /* outer loop - loops once for each acquisition */ - //infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again) - while(true){ - - int oneframesize = oneDataSize * packetsPerFrame; - char* buffer = new char[packetsPerFrame*oneDataSize](); - memset(buffer,0xFF,oneframesize); - int size = 0; - int offset = 0; - uint32_t currentfnum = 0; - uint64_t fnum = 0; - uint32_t pnum = 0; - uint32_t snum = 0; - uint64_t bid = 0; - bool randomSendNow = true; - bool newFrame = false; - - - - //header details - const char *type = "float64"; - const char *shape= "[1024, 512]"; - const char *jsonFmt ="{\"htype\":[\"chunk-1.0\"], \"type\":\"%s\", \"shape\":%s, \"acqIndex\":%d, \"fIndex\":%d, \"subfnum\":%d, \"fname\":\"%s\"}"; - char buf[1000]; - int acquisitionIndex = -1; - int frameIndex = -1; - int subframeIndex = -1; -#ifdef DEBUG - int oldpnum = -1; -#endif - int datapacketscaught = 0; - - /* inner loop - loop for each buffer */ - //until mask reset (dummy pcaket got by writer) - while((1 << ithread) & dataCallbackThreadsMask){ - - //let the writer thread continue, while we process carry over if any - sem_post(&writerGuiSemaphore[ithread]); - //wait for receiver to send more data - sem_wait(&dataCallbackWriterSemaphore[ithread]); - - //end if acquistion - if(guiNumPackets[ithread] == dummyPacketValue){ - - //sending previous half frames if any - if(!excludeMissingPackets && newFrame){ - //send header - //update frame details - frameIndex = fnum; - acquisitionIndex = fnum - startAcquisitionIndex; - if(dynamicRange == 32) subframeIndex = snum; - int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); - zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE); - //send data - zmq_send(zmqsocket, buffer, oneframesize, 0); - newFrame = false; - } - - - - //send final header - //update frame details -#ifdef DEBUG - cout << "sending dummy" << endl; -#endif - frameIndex = -9; - acquisitionIndex = -9; - subframeIndex = -9; - int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); - zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE); - //send final data - zmq_send (zmqsocket, "end", 3, 0); - - pthread_mutex_lock(&statusMutex); - dataCallbackThreadsMask^=(1<0) && (getFrameandPacketNumber(ithread, latestData[ithread]+offset, fnum, pnum,snum,bid)==FAIL)){ - offset+= onePacketSize; - } - //if(!ithread) cout<< ithread <<" fnum:"<< fnum<<" pnum:"<= size) - break; - - if(!frameToGuiFrequency) - currentfnum = fnum; - - - //last packet of same frame - if(fnum == currentfnum && pnum == (packetsPerFrame-1)){ -#ifdef DEBUG - oldpnum=0; -#endif - memcpy(buffer+(pnum*oneDataSize), latestData[ithread]+offset+headersize,oneDataSize); - offset+= onePacketSize; - //send header - //update frame details - frameIndex = fnum; - acquisitionIndex = fnum - startAcquisitionIndex; - if(dynamicRange == 32) subframeIndex = snum; - int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); - zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE); - //send data - zmq_send(zmqsocket, buffer, oneframesize, 0); - newFrame = false; -#ifdef DEBUG - if(!ithread)cprintf(BLUE,"%d sent (last packet)\n",ithread); -#endif - currentfnum++; - //start clock after sending - if(!frameToGuiFrequency){ - randomSendNow = false; - clock_gettime(CLOCK_REALTIME, &begin); - } - memset(buffer,0xFF,oneframesize); - - } - //same frame (not last) or next frame - else { - //next frame -#ifdef DEBUG - int once = true; -#endif - while(fnum > currentfnum){ -#ifdef DEBUG - if(once){ - if((fnum-currentfnum-1)>1) cprintf(RED,"%d Complete sub image missing:%d (cfnum:%d nfnum:%d)\n", - ithread,fnum-currentfnum-1,currentfnum,fnum); - once = false; - } -#endif - //send header - //update frame details - frameIndex = fnum; - acquisitionIndex = fnum - startAcquisitionIndex; - if(dynamicRange == 32) subframeIndex = snum; - int len = sprintf(buf,jsonFmt,type,shape, acquisitionIndex, frameIndex, subframeIndex,completeFileName[ithread]); - zmq_send(zmqsocket, buf,len, ZMQ_SNDMORE); - //send data - zmq_send(zmqsocket, buffer, oneframesize, 0); - newFrame = false; -#ifdef DEBUG - cprintf(BLUE,"%d sent (last packet of previous frame)\n",ithread); -#endif - currentfnum++; - //start clock after sending - if(!frameToGuiFrequency){ - randomSendNow = false; - clock_gettime(CLOCK_REALTIME, &begin); - } - memset(buffer,0xFF,oneframesize); - } - - memcpy(buffer+(pnum*oneDataSize), latestData[ithread]+offset+headersize,oneDataSize); - offset+= onePacketSize; - newFrame = true; - } - - } - } - - - - }/*--end of loop for each buffer (inner loop)*/ - - //free resources - delete[] buffer; - - //end of acquisition, wait for next acquisition/change of parameters - sem_wait(&dataCallbackSemaphore[ithread]); - - - //check to exit thread (for change of parameters) - only EXIT possibility - if(killAllDataCallbackThreads){ - break;//pthread_exit(NULL); - } - - }/*--end of loop for each acquisition (outer loop) */ - - - //free resources - zmq_unbind(zmqsocket, hostName); /* will this be too soon and cut the sending*/ - zmq_close(zmqsocket); - zmq_ctx_destroy(context); - cprintf(MAGENTA,"DataCallback_Thread %d:Goodbye!\n",ithread); -} - - - - -void UDPStandardImplementation::startListening(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - //set current thread value index - int ithread = currentThreadIndex; - //let calling function know thread started and obtained current - threadStarted = 1; - - - uint32_t rc = 0; //size of buffer received in bytes - //split frames for data compression - int carryonBufferSize = 0; //from previous buffer to keep frames together in a buffer - char* tempBuffer = 0; //temporary buffer to store split frames - - - /* outer loop - loops once for each acquisition */ - //infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again) - while(true){ - - //compression variables reset before acquisition - if(dataCompressionEnable){ - carryonBufferSize = 0; - if(tempBuffer){delete []tempBuffer;tempBuffer=0;} - tempBuffer = new char[onePacketSize * (packetsPerFrame - 1)](); //store maximum of 1 packets less in a frame - } - - /* inner loop - loop for each buffer */ - //until mask reset (udp sockets shut down by client) - while((1 << ithread) & listeningThreadsMask){ - - - //pop from fifo - fifoFree[ithread]->pop(buffer[ithread]); - - //udpsocket doesnt exist - if(activated && !udpSocket[ithread]){ - FILE_LOG(logERROR) << "Listening_Thread " << ithread << ": UDP Socket not created or shut down earlier"; - stopListening(ithread,0); - continue; - } - - if(!activated) //eiger not activated modules - rc = prepareAndListenBufferDeactivated(ithread); - else if(excludeMissingPackets) //eiger and jungfrau - rc = prepareAndListenBufferCompleteFrames(ithread); - else{ - rc = prepareAndListenBuffer(ithread, carryonBufferSize, tempBuffer); //others - carryonBufferSize = 0; - } - - //problem in receiving or end of acquisition - if (status == TRANSMITTING||(rc <= 0 && activated == 0)){ - stopListening(ithread,rc); - continue; - } - - if(dataCompressionEnable) - (*((uint32_t*)(buffer[ithread]))) = processListeningBuffer(ithread, carryonBufferSize, tempBuffer, rc); - - //push buffer to FIFO - while(!fifo[ithread]->push(buffer[ithread])); - - }/*--end of loop for each buffer (inner loop)*/ - - //end of acquisition, wait for next acquisition/change of parameters - sem_wait(&listenSemaphore[ithread]); - - //check to exit thread (for change of parameters) - only EXIT possibility - if(killAllListeningThreads){ - cprintf(BLUE,"Listening_Thread %d:Goodbye!\n",ithread); - //free resources at exit - if(tempBuffer) delete[] tempBuffer; - pthread_exit(NULL); - } - - }/*--end of loop for each acquisition (outer loop) */ -} - - - - - -int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, char* temp){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - int receivedSize = 0; - - //carry over from previous buffer - if(cSize) - memcpy(buffer[ithread] + fifoBufferHeaderSize, temp, cSize); - - //listen to after the carry over buffer - if(status != TRANSMITTING) - receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + fifoBufferHeaderSize + cSize, - (bufferSize * numberofJobsPerBuffer) - cSize); - - if(receivedSize > 0){ - //write packet count to buffer - *((uint32_t*)(buffer[ithread])) = (receivedSize/onePacketSize); - totalListeningPacketCount[ithread] += (receivedSize/onePacketSize); - - //start indices for each start of scan/acquisition - if(!measurementStarted[ithread]) //and rc>0 - startFrameIndices(ithread); - } -#ifdef DEBUG - cprintf(BLUE, "Listening_Thread %d : Received bytes: %d. Expected bytes: %d\n", ithread, receivedSize, bufferSize * numberofJobsPerBuffer-cSize); -#endif - return receivedSize; -} - - - - - - - -int UDPStandardImplementation::prepareAndListenBufferDeactivated(int ithread){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - //last - if(currentFrameNumber[ithread] == numberOfFrames) - return 0; - - //copy dummy packets - memset(buffer[ithread] + fifoBufferHeaderSize, 0xFF,bufferSize); - - //write fnum and number of packets - (*((uint64_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))) = currentFrameNumber[ithread]+1; - (*((uint32_t*)(buffer[ithread]))) = packetsPerFrame; - - //start indices for each start of scan/acquisition (rc > 0) - if(!measurementStarted[ithread]) - startFrameIndices(ithread); - - return bufferSize; -} - - - - -int UDPStandardImplementation::prepareAndListenBufferCompleteFrames(int ithread){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - int headerlength = 0; - uint32_t LASTPNUM = 0; - uint32_t FIRSTPNUM = 0; - int INCORDECR = 0; - switch(myDetectorType){ - case JUNGFRAU: - headerlength = JFRAU_HEADER_LENGTH; - FIRSTPNUM = packetsPerFrame-1; - LASTPNUM = 0; - INCORDECR = -1; - break; - case EIGER: - headerlength = EIGER_DATA_PACKET_HEADER_SIZE; - FIRSTPNUM = 0; - LASTPNUM = packetsPerFrame-1; - INCORDECR = 1; - break; - default:break; - } - - - int offset = fifoBufferHeaderSize; - uint32_t pnum = 0; - uint64_t fnum = 0; - uint64_t bnum = 0; - int rc = 0; - //from getframeandpacketnumber() - uint32_t pi = 0; - uint64_t fi = 0; - uint64_t bi = 0; - uint32_t si = 0; - - - //read first packet - pnum = FIRSTPNUM; //first packet number to validate - if(status != TRANSMITTING) rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset); - if(rc <= 0) return 0; - if(getFrameandPacketNumber(ithread,buffer[ithread] + offset,fi,pi,si,bi) == FAIL){ - pi = ALL_MASK_32; //got 0 from fpga - fi = ALL_MASK_32; - } - else - fnum = fi; //fnum of first packet - bnum = bi; //bnum of first packet - totalListeningPacketCount[ithread]++; -#ifdef VERBOSE - if(!ithread) cout << "1 pnum:" << pnum << endl; -#endif - //start indices for each start of scan/acquisition (rc > 0) - if(!measurementStarted[ithread]) - startFrameIndices(ithread); - - - while(true){ - - //------------------------------------------------------ correct packet -------------------------------------------------------- - if((myDetectorType == JUNGFRAU && pnum == pi) || //jungfrau only looks at pnum - (myDetectorType == EIGER && pnum == pi && fnum == fi)){ // eiger looks at pnum and fnum -#ifdef VERBOSE - if(!ithread) cout << "correct packet" << endl; -#endif - //copy only data - memcpy(buffer[ithread] + offset,buffer[ithread] + offset + headerlength, oneDataSize); - offset+=oneDataSize; - - //if complete frame - if(pnum == LASTPNUM) - break; - //else increment/decrement - pnum += INCORDECR; - - rc=0; //listen again - if(status != TRANSMITTING) - rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset); - if(rc <= 0){ //end: update ignored and return - if(myDetectorType == JUNGFRAU) - totalIgnoredPacketCount[ithread] += (packetsPerFrame - pnum); - else - totalIgnoredPacketCount[ithread] += (pnum + 1); - return 0; - } - totalListeningPacketCount[ithread]++; - if(getFrameandPacketNumber(ithread, buffer[ithread] + offset,fi,pi,si,bi) == FAIL){ - pi = ALL_MASK_32; //got 0 from fpga - fi = ALL_MASK_32; - totalIgnoredPacketCount[ithread] += (pnum + 1); - } - else if(myDetectorType == EIGER) - fnum = fi; //update currentfnum for eiger (next packets should have currentfnum value) -#ifdef VERBOSE - if(!ithread) cout << "next currentpnum :" << pnum << endl; -#endif - } - - //------------------------------------------------------ wrong packet -------------------------------------------------------- - else{ -#ifdef VERBOSE - if(!ithread) cprintf(RED,"wrong packet %d, expected packet %d fnum of last good one:%d\n", - pi,pnum,(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))); -#endif - if(myDetectorType == JUNGFRAU) - totalIgnoredPacketCount[ithread] += (packetsPerFrame - pnum -1); //extra 1 subtracted now to be added in the while loop anyway - else - totalIgnoredPacketCount[ithread] += pnum; //extra 1 subtracted now to be added in the while loop anyway - pnum = FIRSTPNUM; - offset = fifoBufferHeaderSize; - - //find the start of next image - while(pnum != pi){ - totalIgnoredPacketCount[ithread]++; - - rc=0; - if(status != TRANSMITTING) - rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + offset); - if(rc <= 0){ - if(myDetectorType == JUNGFRAU) - totalIgnoredPacketCount[ithread] += (packetsPerFrame - pnum); - else - totalIgnoredPacketCount[ithread] += (pnum + 1); - return 0; - } - totalListeningPacketCount[ithread]++; - if(getFrameandPacketNumber(ithread, buffer[ithread] + offset,fi,pi,si,bi) == FAIL){ - pi = ALL_MASK_32; //got 0 from fpga - fi = ALL_MASK_32; - } -#ifdef VERBOSE - if(!ithread) cout << "trying to find pnum:" << pnum << " got " << pi << endl; -#endif - } - if(fi!=ALL_MASK_32) - fnum = fi; //fnum of first packet - bnum = bi; //bnum of first packet - } - } - //------------------------------------------------------ got a complete frame -------------------------------------------------------- - - //write frame number - (*((uint64_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))) = fnum + startAcquisitionIndex; -#ifdef VERBOSE - if(!ithread) cout << "fnum:" << (*((uint64_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))) << endl; -#endif - if(myDetectorType == JUNGFRAU) - (*((uint64_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + FILE_HEADER_BUNCHID_OFFSET))) = bnum; - //write packet count to buffer - *((uint32_t*)(buffer[ithread])) = packetsPerFrame; - return bufferSize; -} - - - -void UDPStandardImplementation::startFrameIndices(int ithread){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - - jfrau_packet_header_t* header=0; - switch(myDetectorType){ - case EIGER: - startFrameIndex = 1; //frame number always resets - break; - case JUNGFRAU: - header = (jfrau_packet_header_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS); - startFrameIndex = (*( (uint32_t*) header->frameNumber))&frameIndexMask; - break; - default: - if(shortFrameEnable < 0){ - startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + fifoBufferHeaderSize))))+1) - & (frameIndexMask)) >> frameIndexOffset); - }else{ - startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+fifoBufferHeaderSize)))) - & (frameIndexMask)) >> frameIndexOffset); - } - break; - } - - //start of entire acquisition - if(!acqStarted){ - pthread_mutex_lock(&progressMutex); - startAcquisitionIndex = startFrameIndex; - acqStarted = true; - pthread_mutex_unlock(&progressMutex); - cprintf(BLUE,"Listening_Thread %d: startAcquisitionIndex:%lld\n",ithread,(long long int)startAcquisitionIndex); - } - - //set start of scan/real time measurement - cprintf(BLUE,"Listening_Thread %d: startFrameIndex: %lld\n", ithread,(long long int)startFrameIndex); - measurementStarted[ithread] = true; -} - - - - - - - -void UDPStandardImplementation::stopListening(int ithread, int numbytes){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - -#ifdef DEBUG4 - cprintf(BLUE,"Listening_Thread %d: Stop Listening\nStatus: %s numbytes:%d\n", ithread, runStatusType(status).c_str(),numbytes); -#endif - - //free empty buffer - if(numbytes <= 0){ - FILE_LOG(logINFO) << "Listening "<< ithread << ": End of Acquisition"; - while(!fifoFree[ithread]->push(buffer[ithread])); - } - - - //push last non empty buffer into fifo - else{ - if(excludeMissingPackets){ - (*((uint32_t*)(buffer[ithread]))) = numbytes/oneDataSize; - totalListeningPacketCount[ithread] += (numbytes/oneDataSize); - }else{ - (*((uint32_t*)(buffer[ithread]))) = numbytes/onePacketSize; - totalListeningPacketCount[ithread] += (numbytes/onePacketSize); - } -#ifdef DEBUG - cprintf(BLUE,"Listening_Thread %d: Last Buffer numBytes:%d\n",ithread, numbytes); - cprintf(BLUE,"Listening_Thread %d: Last Buffer packet count:%d\n",ithread,(*((uint32_t*)(buffer[ithread]))) ); -#endif - while(!fifo[ithread]->push(buffer[ithread])); - } - - //push dummy-end buffer into fifo for all writer threads - fifoFree[ithread]->pop(buffer[ithread]); - - //creating dummy-end buffer with pc=0xFFFF - (*((uint32_t*)(buffer[ithread]))) = dummyPacketValue; - while(!fifo[ithread]->push(buffer[ithread])); - - - //reset mask and exit loop - pthread_mutex_lock(&statusMutex); - listeningThreadsMask^=(1<> frameIndexOffset)); -#endif - cSize = onePacketSize; - --packetCount; - } - } -#ifdef DEBUG4 - cprintf(BLUE, "Listening_Thread %d: First Header:%d\n", (((((uint32_t)(*((uint32_t*)(buffer[ithread] + fifoBufferHeaderSize))))+1) - & (frameIndexMask)) >> frameIndexOffset)); -#endif - break; - - case MOENCH: - lastPacketOffset = (((packetCount - 1) * onePacketSize) + fifoBufferHeaderSize); -#ifdef DEBUG4 - cprintf(BLUE, "Listening_Thread %d: First Header:%d\t First Packet:%d\t Last Header:%d\t Last Packet:%d\tLast Packet Offset:%d\n", - (((((uint32_t)(*((uint32_t*)(buffer[ithread]+fifoBufferHeaderSize))))) & (frameIndexMask)) >> frameIndexOffset), - ((((uint32_t)(*((uint32_t*)(buffer[ithread]+fifoBufferHeaderSize))))) & (packetIndexMask)), - (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset), - ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)), - lastPacketOffset); -#endif - //moench last packet value is 0, so find the last packet and store the others in a temp storage - if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastPacketOffset))))) & (packetIndexMask))){ - lastFrameHeader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastPacketOffset))))) - & (frameIndexMask)) >> frameIndexOffset; - cSize += onePacketSize; - lastPacketOffset -= onePacketSize; - --packetCount; - while (lastFrameHeader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastPacketOffset))))) & (frameIndexMask)) >> frameIndexOffset)){ - cSize += onePacketSize; - lastPacketOffset -= onePacketSize; - --packetCount; - } - memcpy(temp, buffer[ithread]+(lastPacketOffset+onePacketSize), cSize); -#ifdef DEBUG4 - cprintf(BLUE, "Listening_Thread %d: temp Header:%d\t temp Packet:%d\n", - (((((uint32_t)(*((uint32_t*)(temp)))))& (frameIndexMask)) >> frameIndexOffset), - ((((uint32_t)(*((uint32_t*)(temp))))) & (packetIndexMask))); -#endif - } - break; - - default: - cprintf(RED,"Listening_Thread %d: Error: This detector %s is not implemented in the receiver\n", - ithread, getDetectorType(myDetectorType).c_str()); - break; - } - -#ifdef DEBUG4 - cprintf(BLUE,"Listening_Thread %d: PacketCount:%d CarryonBufferSize:%d\n",ithread, packetCount, cSize); -#endif - - return packetCount; -} - - - - - - -void UDPStandardImplementation::startWriting(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - int ithread = currentThreadIndex; //set current thread value index - threadStarted = 1; //let calling function know thread started and obtained current - - char* wbuf = NULL; //buffer popped from FIFO - sfilefd[ithread] = 0; //file pointer - uint64_t nf = 0; //for compression, number of frames - - - /* outer loop - loops once for each acquisition */ - //infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again) - while(true){ - - //--reset parameters before acquisition (depending on compression) - nf = 0; //compression has only one listening thread (anything not eiger) - - - /* inner loop - loop for each buffer */ - //until mask unset (udp sockets shut down by client) - while((1 << ithread) & writerThreadsMask){ - - //pop - if(!dataCompressionEnable) - fifo[ithread]->pop(wbuf); - else - fifo[0]->pop(wbuf); - uint32_t numPackets = (uint32_t)(*((uint32_t*)wbuf)); - -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread %d: Number of Packets: %d for FIFO %d\n", ithread, numPackets, dataCompressionEnable?0:ithread); -#endif - - - //end of acquisition - if(numPackets == dummyPacketValue){ -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread %d: Dummy frame popped out of FIFO %d",ithread, dataCompressionEnable?0:ithread); -#endif - stopWriting(ithread,wbuf); - continue; - } - - //jungfrau and eiger - if(excludeMissingPackets) - handleCompleteFramesOnly(ithread, wbuf); - //normal - else if(!dataCompressionEnable) - handleWithoutDataCompression(ithread, wbuf, numPackets); - - //compression - else{ -#if defined(MYROOT1) && defined(ALLFILE_DEBUG) - if(npackets > 0) - writeFileWithoutCompression(wbuf, numPackets); -#endif - handleDataCompression(ithread,wbuf,nf); - } - }/*--end of loop for each buffer (inner loop)*/ - - waitWritingBufferForNextAcquisition(ithread); - - }/*--end of loop for each acquisition (outer loop) */ -} - - - - - - - - -void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - //in case they are not closed already - closeFile(ithread); -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread %d: Done with acquisition. Waiting for 1st sem to create new file/change of parameters\n", ithread); -#endif - //end of acquisition, wait for file create/change of parameters - sem_wait(&writerSemaphore[ithread]); - //check to exit thread (for change of parameters) - only EXIT possibility - if(killAllWritingThreads){ - cprintf(GREEN,"Writing_Thread %d:Goodbye!\n",ithread); - pthread_exit(NULL); - } -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread %d: Got 1st post. Creating File\n", ithread); -#endif - - - //pop fifo so that its empty - char* temp; - while(!fifo[ithread]->isEmpty()){ - cprintf(RED,"%d:fifo emptied\n", ithread); - fifo[ithread]->pop(temp); - fifoFree[ithread]->push(temp); - } - - //create file - if((1<push(wbuffer)); - - if(dataStreamEnable){ - sem_wait(&writerGuiSemaphore[ithread]); //ensure previous frame was processed - guiNumPackets[ithread] = dummyPacketValue; - sem_post(&dataCallbackWriterSemaphore[ithread]); //let it know its got data - } - - - //all threads need to close file, reset mask and exit loop - if(fileWriteEnable && (cbAction > DO_NOTHING)){ - if(fileFormatType == BINARY && myDetectorType == EIGER){ - updateFileHeader(ithread); - fseek(sfilefd[ithread],0,0); - fwrite((void*)fileHeader[ithread], 1, FILE_HEADER_SIZE, sfilefd[ithread]); - } - } - - //Print packet loss - //if(totalWritingPacketCountFromLastCheck[ithread]){ -#ifdef VERBOSE - if(fileFormatType == BINARY){ - if(numberofWriterThreads>1){ - printf("Thread:%d" - "\tLost:%lld" - "\t\tPackets:%lld" - "\tFrame#:%lld" - "\tPFrame#:%lld\n", - ithread, - ((frameNumberInPreviousCheck[ithread]+1+(maxFramesPerFile/progressFrequency))>numberOfFrames) - ?(long long int)((numberOfFrames-(frameNumberInPreviousCheck[ithread]+1))*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]) - :(long long int)((frameNumberInPreviousCheck[ithread]+(maxFramesPerFile/progressFrequency) - frameNumberInPreviousCheck[ithread])*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]), - (long long int)totalWritingPacketCountFromLastCheck[ithread], - (long long int)currentFrameNumber[ithread], - (long long int)frameNumberInPreviousCheck[ithread] - ); - }else{ - printf("Lost:%lld" - "\t\tPackets:%lld" - "\tFrame#:%lld" - "\tPFrame#:%lld\n", - ((frameNumberInPreviousCheck[ithread]+1+(maxFramesPerFile/progressFrequency))>numberOfFrames) - ?(long long int)((numberOfFrames-(frameNumberInPreviousCheck[ithread]+1))*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]) - :(long long int)((frameNumberInPreviousCheck[ithread]+(maxFramesPerFile/progressFrequency) - frameNumberInPreviousCheck[ithread])*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]), - (long long int)totalWritingPacketCountFromLastCheck[ithread], - (long long int)currentFrameNumber[ithread], - (long long int)frameNumberInPreviousCheck[ithread] - ); - } - - if(numberofWriterThreads>1){ - cprintf(BLUE,"File:%s" - "\nThread:%d" - "\tLost:%lld" - "\t\tPackets:%lld" - "\tFrame#:%lld" - "\tPFrame#:%lld\n", - completeFileName[ithread],ithread, - ((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames) - ?(long long int)((numberOfFrames-(frameNumberInPreviousFile[ithread]+1))*packetsPerFrame - totalPacketsInFile[ithread]) - :(long long int)((frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread])*packetsPerFrame - totalPacketsInFile[ithread]), - (long long int)totalPacketsInFile[ithread], - (long long int)currentFrameNumber[ithread], - (long long int)frameNumberInPreviousFile[ithread] - ); - }else{ - cprintf(BLUE,"File:%s" - "\nLost:%lld" - "\t\tPackets:%lld" - "\tFrame#:%lld" - "\tPFrame#:%lld\n", - completeFileName[ithread], - ((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames) - ?(long long int)(numberOfFrames-(frameNumberInPreviousFile[ithread]+1)) - :(long long int)(frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread]), - (long long int)totalPacketsInFile[ithread], - (long long int)currentFrameNumber[ithread], - (long long int)frameNumberInPreviousFile[ithread] - ); - } - } -#endif - //} - - closeFile(ithread); - pthread_mutex_lock(&statusMutex); - writerThreadsMask^=(1<push(wbuffer)); - - return; - } - - - //callback to write data - if (cbAction < DO_EVERYTHING) - rawDataReadyCallBack((int)tempframenumber, wbuffer + fifoBufferHeaderSize, npackets * onePacketSize, - sfilefd[ithread], latestData[ithread],pRawDataReady);//know which thread from sfilefd - - - //write to file if enabled and update write parameters - if(npackets > 0) - writeFileWithoutCompression(ithread, wbuffer, npackets); -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread: Writing done\nGoing to copy frame\n"); -#endif - - - //copy frame for gui - //if(npackets >= (packetsPerFrame/numberofListeningThreads)) - if(dataStreamEnable && npackets > 0) - copyFrameToGui(ithread, wbuffer,npackets); -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread: Copied frame\n"); -#endif - - - //free fifo addresses - int listenfifoThread = ithread; - if(dataCompressionEnable) - listenfifoThread = 0; - while(!fifoFree[listenfifoThread]->push(wbuffer)); -#ifdef EVERYFIFODEBUG - if(fifoFree[listenfifoThread]->getSemValue()<100) - cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",listenfifoThread,fifoFree[listenfifoThread]->getSemValue(),(void*)(wbuffer)); -#endif -#ifdef DEBUG5 - cprintf(GREEN,"Writing_Thread %d: Freed buffer, pushed into fifofree %p for listener %d \n",listenfifoThread, (void*)(wbuffer), listenfifoThread); -#endif - -} - - - - -void UDPStandardImplementation::handleCompleteFramesOnly(int ithread, char* wbuffer){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - //get current frame number - uint64_t tempframenumber; - tempframenumber = (*((uint64_t*)(wbuffer+HEADER_SIZE_NUM_TOT_PACKETS))); - tempframenumber -= startFrameIndex; - - - if (cbAction < DO_EVERYTHING) - rawDataReadyCallBack((int)tempframenumber, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, bufferSize + FILE_FRAME_HEADER_LENGTH, - sfilefd[ithread], latestData[ithread],pRawDataReady); - - - //write to file if enabled and update write parameters - if((fileWriteEnable) && (sfilefd[ithread] -#ifdef HDF5C - ||hdf5_fileId[ithread])){ -#else - )){ -#endif - - if(fileFormatType == BINARY){ - if(tempframenumber && (tempframenumber%maxFramesPerFile) == 0) - createNewFile(ithread); - fwrite(wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, 1, (bufferSize + FILE_FRAME_HEADER_LENGTH), sfilefd[ithread]); - } - - - -#ifdef HDF5C - else if (fileFormatType == HDF5){ - pthread_mutex_lock(&writeMutex); - - /* if(tempframenumber && (tempframenumber%MAX_IMAGES_IN_DATASET) == 0){ - try{ - Exception::dontPrint(); //to handle errors - if(hdf5_datasetId[ithread]) {delete hdf5_datasetId[ithread]; hdf5_datasetId[ithread] = 0;} - char dsetname[100]; - sprintf(dsetname, "/entry/data/data_%012lld", (long long int)currentFrameNumber[ithread]+1); - //create new dataspace if fewer than max images/dataset - int numimagesindataset = (((numberOfFrames-tempframenumber) < MAX_IMAGES_IN_DATASET)? (numberOfFrames-tempframenumber):MAX_IMAGES_IN_DATASET); - if(numimagesindataset MAX_CHUNKED_IMAGES){ - DSetCreatPropList plist; - hsize_t chunk_dims[3] ={MAX_CHUNKED_IMAGES, NY, NX}; - if(dynamicRange == 4) - chunk_dims[2] = NX/2; - plist.setChunk(3, chunk_dims); - - hdf5_datasetId[ithread] = new DataSet (hdf5_fileId[ithread]->createDataSet( - dsetname, hdf5_datatype, *hdf5_dataspaceId[ithread],plist)); - } - else - hdf5_datasetId[ithread] = new DataSet (hdf5_fileId[ithread]->createDataSet( - dsetname, hdf5_datatype, *hdf5_dataspaceId[ithread])); - - //link - char linkPath[1000]=""; - sprintf(linkPath,"/entry/data/%s",dsetname); - hdf5_fileId[ithread]->link(H5G_LINK_HARD,dsetname,linkPath); - } - catch(Exception error){ - cprintf(RED,"Error in closing HDF5 dataset to create a new one in thread %d\n",ithread); - error.printError(); - } - } -*/ - struct timespec begin,end; - if(!ithread && !tempframenumber) - clock_gettime(CLOCK_REALTIME, &begin); - - //wite to file - hsize_t count[3] = {1, NY,NX}; - hsize_t start[3] = {tempframenumber%MAX_IMAGES_IN_DATASET, 0 , 0}; - hsize_t dims2[2]={NY,NX}; - if(dynamicRange == 4){ - dims2[1] = NX/2; - count[1] = NX/2; - } - - try{ - Exception::dontPrint(); //to handle errors - hdf5_dataspaceId[ithread]->selectHyperslab( H5S_SELECT_SET, count, start); - DataSpace memspace(2,dims2); - hdf5_datasetId[ithread]->write(wbuffer + fifoBufferHeaderSize, hdf5_datatype, memspace, *hdf5_dataspaceId[ithread]); - memspace.close(); - } - catch(Exception error){ - cprintf(RED,"Error in writing to file in thread %d\n",ithread); - error.printError(); - } - - if(!ithread && !tempframenumber){ - clock_gettime(CLOCK_REALTIME, &end); - cprintf(RED,"%d Writing packets Elapsed time:%f seconds\n",ithread,( end.tv_sec - begin.tv_sec ) + ( end.tv_nsec - begin.tv_nsec ) / 1000000000.0); - } - - pthread_mutex_unlock(&writeMutex); - } -#endif - - } - - - //progress - if(tempframenumber && (tempframenumber%(maxFramesPerFile/progressFrequency)) == 0){ - if(numberofWriterThreads>1){ - printf("Thread:%d" - "\tLost:%lld" - "\t\tPackets:%lld" - "\tFrame#:%lld" - "\tPFrame#:%lld\n", - ithread, - ((frameNumberInPreviousCheck[ithread]+1+(maxFramesPerFile/progressFrequency))>numberOfFrames) - ?(long long int)((numberOfFrames-(frameNumberInPreviousCheck[ithread]+1))*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]) - :(long long int)((frameNumberInPreviousCheck[ithread]+(maxFramesPerFile/progressFrequency) - frameNumberInPreviousCheck[ithread])*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]), - (long long int)totalWritingPacketCountFromLastCheck[ithread], - (long long int)currentFrameNumber[ithread], - (long long int)frameNumberInPreviousCheck[ithread] - ); - }else{ - printf("Lost:%lld" - "\t\tPackets:%lld" - "\tFrame#:%lld" - "\tPFrame#:%lld\n", - ((frameNumberInPreviousCheck[ithread]+1+(maxFramesPerFile/progressFrequency))>numberOfFrames) - ?(long long int)((numberOfFrames-(frameNumberInPreviousCheck[ithread]+1))*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]) - :(long long int)((frameNumberInPreviousCheck[ithread]+(maxFramesPerFile/progressFrequency) - frameNumberInPreviousCheck[ithread])*packetsPerFrame - totalWritingPacketCountFromLastCheck[ithread]), - (long long int)totalWritingPacketCountFromLastCheck[ithread], - (long long int)currentFrameNumber[ithread], - (long long int)frameNumberInPreviousCheck[ithread] - ); - } - //reset counters for each new file - frameNumberInPreviousCheck[ithread] = currentFrameNumber[ithread]; - totalWritingPacketCountFromLastCheck[ithread] = 0; - } - - totalWritingPacketCountFromLastCheck[ithread]+= packetsPerFrame; - totalPacketsInFile[ithread] += packetsPerFrame; - totalWritingPacketCount[ithread] += packetsPerFrame; - lastFrameNumberInFile[ithread] = tempframenumber; - currentFrameNumber[ithread] = tempframenumber; - //cout<<"curentframenumber:"< 1) - pthread_mutex_lock(&writeMutex); - - packetsCaught += packetsPerFrame; - totalPacketsCaught += packetsPerFrame; - if((currentFrameNumber[ithread] - startAcquisitionIndex) > acquisitionIndex) - acquisitionIndex = currentFrameNumber[ithread] - startAcquisitionIndex; - if((currentFrameNumber[ithread] - startFrameIndex) > frameIndex[ithread]) - frameIndex[ithread] = currentFrameNumber[ithread] - startFrameIndex; - - if(numberofWriterThreads > 1) - pthread_mutex_unlock(&writeMutex); - - if(!activated) - currentFrameNumber[ithread]++; - -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread: Writing done\nGoing to copy frame\n"); -#endif - - - //copy frame for gui - if(dataStreamEnable) - copyFrameToGui(ithread, wbuffer, packetsPerFrame); -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread: Copied frame\n"); -#endif - - - //free fifo addresses - while(!fifoFree[ithread]->push(wbuffer)); -#ifdef DEBUG5 - cprintf(GREEN,"Writing_Thread %d: Freed buffer, pushed into fifofree %p for listener %d \n",ithread, (void*)(wbuffer), ithread); -#endif - -} - - - -void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* wbuffer,uint32_t numpackets){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - //if write enabled - if((fileWriteEnable) && (sfilefd[ithread])){ - if(numpackets){ - int offset = fifoBufferHeaderSize; - uint64_t nextFileFrameNumber; - int packetsWritten = 0; - //if(ithread) cout<<"numpackets:"<=0)){ - //get start frame (required to create new file at the right juncture) - uint64_t startframe = 0; - uint32_t pnum = 0; - uint32_t snum = 0; - uint64_t bunchid = 0; - //if(ithread) cout<<"getting start frame number"<push(wbuffer)); - return; - } - //if(ithread) cout<<"done getting start frame number"<=0) &&(!((lastFrameNumberInFile[ithread]+1) % maxFramesPerFile))) - createNewFile(ithread); - - - //frames to save in one file - nextFileFrameNumber = (lastFrameNumberInFile[ithread]+1) + - (maxFramesPerFile - ((lastFrameNumberInFile[ithread]+1)%maxFramesPerFile)); - - if(writeUptoFrameNumber(ithread, wbuffer, offset, nextFileFrameNumber, numpackets, packetsWritten) == FAIL) - //weird frame number of zero from fpga - return; - - //update stats - numpackets -= packetsWritten; - totalPacketsInFile[ithread] += packetsWritten; - totalWritingPacketCount[ithread] += packetsWritten; - pthread_mutex_lock(&writeMutex); - packetsCaught += packetsWritten; - totalPacketsCaught += packetsWritten; - pthread_mutex_unlock(&writeMutex); - currentFrameNumber[ithread] = lastFrameNumberInFile[ithread]; - } - } - } - - //only update parameters - else{ - - if(numpackets){ - //get last frame number - uint64_t finalLastFrameNumberToSave = 0; - uint32_t pnum; - uint32_t snum; - uint64_t bunchid = 0; - if(getFrameandPacketNumber(ithread, wbuffer + fifoBufferHeaderSize + ((numpackets - 1) * onePacketSize), finalLastFrameNumberToSave,pnum,snum,bunchid) == FAIL){ - //error in frame number sent by fpga - while(!fifoFree[ithread]->push(wbuffer)); - return; - } - totalPacketsInFile[ithread] += numpackets; - totalWritingPacketCount[ithread] += numpackets; - lastFrameNumberInFile[ithread] = finalLastFrameNumberToSave; - currentFrameNumber[ithread] = finalLastFrameNumberToSave; - - } - - if(numberofWriterThreads > 1) pthread_mutex_lock(&writeMutex); - packetsCaught += numpackets; - totalPacketsCaught += numpackets; - if(numberofWriterThreads > 1) pthread_mutex_unlock(&writeMutex); - } - - //set indices - pthread_mutex_lock(&progressMutex); - if((currentFrameNumber[ithread] - startAcquisitionIndex) > acquisitionIndex) - acquisitionIndex = currentFrameNumber[ithread] - startAcquisitionIndex; - if((currentFrameNumber[ithread] - startFrameIndex) > frameIndex[ithread]) - frameIndex[ithread] = currentFrameNumber[ithread] - startFrameIndex; - pthread_mutex_unlock(&progressMutex); -} - - - - - - -void UDPStandardImplementation::updateFileHeader(int ithread){ - //update file header - time_t t = time(0); - sprintf(fileHeader[ithread], - "\nHeader\t\t: %d bytes\n" - "Top\t\t: %d\n" - "Left\t\t: %d\n" - "Active\t\t: %d\n" - "Frames Caught\t: %lld\n" - "Frames Lost\t: %lld\n" - "Dynamic Range\t: %d\n" - "Ten Giga\t: %d\n" - "Image Size\t: %d bytes\n" - "x\t\t: %d pixels\n" - "y\t\t: %d pixels\n" - "Total Frames\t: %lld\n" - "Exptime (ns)\t: %lld\n" - "Period (ns)\t: %lld\n" - "Timestamp\t: %s\n\n" - - "#Frame Header\n" - "Frame Number\t: 8 bytes\n" - "Bunch ID\t: 8 bytes\n", - FILE_HEADER_SIZE, - (flippedData[0]?0:1), - (ithread?0:1), - activated, - (long long int)(totalPacketsInFile[ithread]/packetsPerFrame), - ((frameNumberInPreviousFile[ithread]+1+maxFramesPerFile)>numberOfFrames) - ?(long long int)((numberOfFrames-(frameNumberInPreviousFile[ithread]+1)) - (totalPacketsInFile[ithread]/packetsPerFrame)) - :(long long int)((frameNumberInPreviousFile[ithread]+maxFramesPerFile - frameNumberInPreviousFile[ithread]) - (totalPacketsInFile[ithread]/packetsPerFrame)), - dynamicRange,tengigaEnable, - bufferSize, - //only for eiger right now - EIGER_PIXELS_IN_ONE_ROW,EIGER_PIXELS_IN_ONE_COL, - (long long int)numberOfFrames, - (long long int)acquisitionTime, - (long long int)acquisitionPeriod, - ctime(&t)); - if(strlen(fileHeader[ithread]) > FILE_HEADER_SIZE) - cprintf(BG_RED,"File Header Size %d is too small for fixed file header size %d\n",(int)strlen(fileHeader[ithread]),(int)FILE_HEADER_SIZE); - - -} - -//called only if datacallback enabled -void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32_t numpackets){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - //if nthe frame, wait for your turn (1st frame always shown as its zero) - if(frameToGuiFrequency && ((frametoGuiCounter[ithread])%frameToGuiFrequency)); - - //random read (gui ready) or nth frame read: gui needs data now or it is the first frame - else{ - -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread: CopyingFrame: Going to copy data\n"); -#endif - //ensure previous frame was processed - sem_wait(&writerGuiSemaphore[ithread]); - - //copy date - guiNumPackets[ithread] = numpackets; - strcpy(guiFileName[ithread],completeFileName[ithread]); - - if(excludeMissingPackets) //copy also the header - memcpy(latestData[ithread],buffer+HEADER_SIZE_NUM_TOT_PACKETS, bufferSize + FILE_FRAME_HEADER_LENGTH); - else //copy only the data - memcpy(latestData[ithread],buffer+ fifoBufferHeaderSize , numpackets*onePacketSize); - //let it know its got data - sem_post(&dataCallbackWriterSemaphore[ithread]); - -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread: CopyingFrame: Copied Data\n"); -#endif - - } - - //update the counter for nth frame - if(frameToGuiFrequency) - frametoGuiCounter[ithread]++; - - -} - - - - - -void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer, uint64_t &nf){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - //get frame number - uint64_t tempframenumber=-1; - uint32_t pnum; - uint32_t snum; - uint64_t bunchid=-1; - if(getFrameandPacketNumber(ithread, wbuffer + fifoBufferHeaderSize, tempframenumber,pnum,snum,bunchid) == FAIL){ - //error in frame number sent by fpga - while(!fifoFree[ithread]->push(wbuffer)); - return; - } - currentFrameNumber[ithread] = tempframenumber; - - - //set indices - pthread_mutex_lock(&progressMutex); - if((currentFrameNumber[ithread] - startAcquisitionIndex) > acquisitionIndex) - acquisitionIndex = currentFrameNumber[ithread] - startAcquisitionIndex; - if((currentFrameNumber[ithread] - startFrameIndex) > frameIndex[ithread]) - frameIndex[ithread] = currentFrameNumber[ithread] - startFrameIndex; - pthread_mutex_unlock(&progressMutex); - - //variable definitions - char* buff[2]={0,0}; //an array just to be compatible with copyframetogui - char* data = wbuffer+ fifoBufferHeaderSize; //data pointer to the next memory to be analysed - int ndata; //size of data returned - uint32_t np; //remaining number of packets returned - uint32_t npackets = (uint32_t)(*((uint32_t*)wbuffer)); //number of total packets - int remainingsize = npackets * onePacketSize; //size of the memory slot to be analyzed - - eventType thisEvent = PEDESTAL; - int once = 0; - int xmax = 0, ymax = 0; //max pixels in x and y direction - int xmin = 1, ymin = 1; //min pixels in x and y direction - double tot, tl, tr, bl, br; - - //determining xmax and ymax - switch(myDetectorType){ - case MOENCH: - xmax = MOENCH_PIXELS_IN_ONE_ROW-1; - ymax = MOENCH_PIXELS_IN_ONE_ROW-1; - break; - case GOTTHARD: - if(shortFrameEnable == -1){ - xmax = GOTTHARD_PIXELS_IN_ROW-1; - ymax = GOTTHARD_PIXELS_IN_COL-1; - }else{ - xmax = GOTTHARD_SHORT_PIXELS_IN_ROW-1; - ymax = GOTTHARD_SHORT_PIXELS_IN_COL-1; - } - break; - default: - break; - } - - while(buff[0] = receiverData[ithread]->findNextFrame(data,ndata,remainingsize)){ - - //remaining number of packets - np = ndata/onePacketSize; - - if ((np == packetsPerFrame) && (buff[0]!=NULL)){ - if(nf == 1000) - cprintf(GREEN, "Writing_Thread %d: pedestal done\n", ithread); - - singlePhotonDetectorObject[ithread]->newFrame(); - - //only for moench - if(commonModeSubtractionEnable){ - for(int ix = xmin - 1; ix < xmax+1; ix++){ - for(int iy = ymin - 1; iy < ymax+1; iy++){ - thisEvent = singlePhotonDetectorObject[ithread]->getEventType(buff[0], ix, iy, 0); - } - } - } - - - for(int ix = xmin - 1; ix < xmax+1; ix++) - for(int iy = ymin - 1; iy < ymax+1; iy++){ - thisEvent=singlePhotonDetectorObject[ithread]->getEventType(buff[0], ix, iy, commonModeSubtractionEnable); - if (nf>1000) { - tot=0; - tl=0; - tr=0; - bl=0; - br=0; - if (thisEvent==PHOTON_MAX) { - receiverData[ithread]->getFrameNumber(buff[0]); - //iFrame=receiverData[ithread]->getFrameNumber(buff); -#ifdef MYROOT1 - myTree[ithread]->Fill(); - //cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl; -#else - pthread_mutex_lock(&writeMutex); - if((fileWriteEnable) && (sfilefd[0])) - singlePhotonDetectorObject[ithread]->writeCluster(sfilefd[0]); - pthread_mutex_unlock(&writeMutex); -#endif - } - } - } - - nf++; - - -#ifndef ALLFILE - totalPacketsInFile[ithread] += (bufferSize/packetsPerFrame); - totalWritingPacketCount[ithread] += (bufferSize/packetsPerFrame); - pthread_mutex_lock(&writeMutex); - if((packetsCaught%packetsPerFrame) >= (uint32_t)maxFramesPerFile) - createNewFile(ithread); - packetsCaught += (bufferSize/packetsPerFrame); - totalPacketsCaught += (bufferSize/packetsPerFrame); - pthread_mutex_unlock(&writeMutex); - - -#endif - if(!once){ - if(dataStreamEnable) - copyFrameToGui(ithread, buff[0],(uint32_t)packetsPerFrame); - once = 1; - } - } - - remainingsize -= ((buff[0] + ndata) - data); - data = buff[0] + ndata; - if(data > (wbuffer + fifoBufferHeaderSize + npackets * onePacketSize) ) - cprintf(BG_RED,"Writing_Thread %d: Error: Compression data goes out of bounds!\n", ithread); - } - - - while(!fifoFree[0]->push(wbuffer)); -#ifdef EVERYFIFODEBUG - if(fifoFree[0]->getSemValue()<100) - cprintf(GREEN,"FifoFree[%d]: value:%d, push 0x%x\n",0,fifoFree[0]->getSemValue(),(void*)(wbuffer)); -#endif -#ifdef DEBUG5 - cprintf(GREEN,"Writing_Thread %d: Compression free pushed into fifofree %p for listerner 0\n", ithread, (void*)(wbuffer)); -#endif -} - - - -int UDPStandardImplementation::getFrameandPacketNumber(int ithread, char* wbuffer, uint64_t &framenumber, uint32_t &packetnumber,uint32_t &subframenumber, uint64_t &bunchid){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - eiger_packet_footer_t* footer=0; - eiger_packet_header_t* e_header=0; - jfrau_packet_header_t* header=0; - framenumber = 0; - packetnumber = 0; - subframenumber = 0; - bunchid = 0; - - switch(myDetectorType){ - - case EIGER: - footer = (eiger_packet_footer_t*)(wbuffer + footerOffset); - framenumber = (uint32_t)(*( (uint64_t*) footer)); - //error in frame number sent by fpga - if(((uint32_t)(*( (uint64_t*) footer)))==0){ - framenumber = 0; - FILE_LOG(logERROR) << "Fifo "<< ithread << ": Frame Number is zero from firmware."; - return FAIL; - } - packetnumber = (*( (uint16_t*) footer->packetNumber))-1; - e_header = (eiger_packet_header_t*) (wbuffer); - subframenumber = *( (uint32_t*) e_header->subFrameNumber); -#ifdef DEBUG4 - if(!ithread) cprintf(GREEN,"Writing_Thread %d: fnum:%lld pnum:%d FPGA_fnum:%d subfnum:%d footeroffset:%d\n", - ithread, - (long long int)framenumber, - packetnumber, - framenumber, - subframenumber, - footerOffset); -#endif - framenumber -= startFrameIndex; - break; - - case JUNGFRAU: - header = (jfrau_packet_header_t*)(wbuffer); - framenumber = (*( (uint32_t*) header->frameNumber))&frameIndexMask; - packetnumber = (uint32_t)(*( (uint8_t*) header->packetNumber)); - bunchid = (*((uint64_t*) header->bunchid)); -#ifdef DEBUG4 - cprintf(GREEN, "Writing_Thread %d: fnum:%lld\t pnum:%d bunchid:%lld\n", - (long long int)framenumber, - packetnumber, - (long long int)bunchid); -#endif - framenumber -= startFrameIndex; - break; - - default: - framenumber = ((uint32_t)(*((uint32_t*)(wbuffer)))); - //for gotthard and normal frame, increment frame number to separate fnum and pnum - if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1)) - framenumber++; - packetnumber = framenumber&packetIndexMask; - framenumber = (framenumber & frameIndexMask) >> frameIndexOffset; -#ifdef DEBUG4 - cprintf(GREEN, "Writing_Thread %d: fnum:%lld\t pnum:%d\n", - (long long int)framenumber, - packetnumber); -#endif - framenumber -= startFrameIndex; - break; - } - return OK; -} - - - - - -int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer, int &offset, uint64_t nextFrameNumber, uint32_t numpackets, int &numPacketsWritten){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - //if(ithread) cout<<"at writeUptoFrameNumber " << nextFrameNumber<< endl; - - - int startoffset = offset; - int endoffset = startoffset + numpackets * onePacketSize; - uint64_t tempframenumber=-1; - offset = endoffset; - uint32_t pnum; - uint32_t snum; - uint64_t bunchid=-1; - //get last frame number - if(getFrameandPacketNumber(ithread, wbuffer + (endoffset-onePacketSize), tempframenumber,pnum,snum,bunchid) == FAIL){ - //error in frame number sent by fpga - while(!fifoFree[ithread]->push(wbuffer)); - return FAIL; - } - //last packet's frame number < nextframenumber - if(tempframenumber=nextFrameNumber){ - offset -= bigIncrements; - if(offsetpush(wbuffer)); - return FAIL; - } - } - if(offsetpush(wbuffer)); - return FAIL; - } - } - while(tempframenumberpush(wbuffer)); - return FAIL; - } - } - - - fwrite(wbuffer + startoffset, 1, offset-startoffset, sfilefd[ithread]); - numPacketsWritten += ((offset-startoffset)/onePacketSize); - lastFrameNumberInFile[ithread] = (nextFrameNumber-1); - //if(ithread) cout<<"done with writeUptoFrameNumber" << endl; - return OK; -} - - - - -/** function that returns the name variable from the receiver complete file name prefix - \param fname complete file name prefix - \returns file name -*/ -string UDPStandardImplementation::getNameFromReceiverFilePrefix(string fname) { - int i; - string s=fname; - size_t uscore=s.rfind("_"); - if (sscanf( s.substr(uscore+1,s.size()-uscore-1).c_str(),"d%d",&i)) - s=fname.substr(0,uscore); - return s; -}; - - - - -