From a3e88f96d6d1cfd2ad050006dc632c88437c8715 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Thu, 8 Oct 2015 12:19:07 +0200 Subject: [PATCH] some more changes --- .../include/UDPBaseImplementation.h | 67 +- slsReceiverSoftware/include/UDPInterface.h | 30 +- .../include/UDPStandardImplementation.h | 694 ++++--- slsReceiverSoftware/include/receiver_defs.h | 8 +- slsReceiverSoftware/include/slsReceiver.h | 1 - .../include/sls_receiver_defs.h | 10 + .../src/UDPBaseImplementation.cpp | 91 +- slsReceiverSoftware/src/UDPInterface.cpp | 4 +- .../src/UDPStandardImplementation.cpp | 1838 ++++++++--------- .../src/slsReceiverTCPIPInterface.cpp | 43 +- 10 files changed, 1436 insertions(+), 1350 deletions(-) diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index 29d9dd009..293ac6763 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -7,13 +7,12 @@ ***********************************************/ -#include "sls_receiver_defs.h" +//#include "sls_receiver_defs.h" #include "UDPInterface.h" -#include -#include +//#include /** - * @short does all the functions for a receiver, set/get parameters, start/stop etc. + * @short does all the base functions for a receiver, set/get parameters, start/stop etc. */ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInterface { @@ -123,13 +122,13 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter * Get UDP Port Number * @return udp port number */ - uint32_t getUDPPortNo() const; + uint32_t getUDPPortNumber() const; /** * Get Second UDP Port Number (eiger specific) * @return second udp port number */ - uint32_t getUDPPortNo2() const; + uint32_t getUDPPortNumber2() const; /** * Get Ehernet Interface @@ -254,9 +253,9 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter /** * Set data compression, by saving only hits (so far implemented only for Moench and Gotthard) * @param b true for data compression enable, else false + * @return OK or FAIL */ - void setDataCompressionEnable(const bool b); - + int setDataCompressionEnable(const bool b); //***connection parameters*** @@ -264,13 +263,13 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter * Set UDP Port Number * @param i udp port number */ - void setUDPPortNo(const uint32_t i); + void setUDPPortNumber(const uint32_t i); /** * Set Second UDP Port Number (eiger specific) * @return second udp port number */ - void setUDPPortNo2(const uint32_t i); + void setUDPPortNumber2(const uint32_t i); /** * Set Ethernet Interface to listen to @@ -279,7 +278,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter void setEthernetInterface(const char* c); - //***connection parameters*** + //***acquisition parameters*** /** * Set Short Frame Enabled, later will be moved to getROI (so far only for gotthard) * @param i index of adc enabled, else -1 if all enabled @@ -289,14 +288,16 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter /** * Set the Frequency of Frames Sent to GUI * @param i 0 for random frame requests, n for nth frame frequency + * @return OK or FAIL */ - void setFrameToGuiFrequency(const uint32_t i); + int setFrameToGuiFrequency(const uint32_t i); /** * Set Acquisition Period * @param i acquisition period + * @return OK or FAIL */ - void setAcquisitionPeriod(const uint64_t i); + int setAcquisitionPeriod(const uint64_t i); /** * Set Number of Frames expected by receiver from detector @@ -308,15 +309,16 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter /** * Set Dynamic Range or Number of Bits Per Pixel * @param i dynamic range that is 4, 8, 16 or 32 + * @return OK or FAIL */ - void setDynamicRange(const uint32_t i); + int setDynamicRange(const uint32_t i); /** * Set Ten Giga Enable * @param b true if 10Giga enabled, else false (1G enabled) + * @return OK or FAIL */ - void setTenGigaEnable(const bool b); - + int setTenGigaEnable(const bool b); /************************************************************************* @@ -324,6 +326,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter * They may modify the status of the receiver **************************** *************************************************************************/ + //***initial functions*** /** * Set receiver type (and corresponding detector variables in derived STANDARD class) @@ -387,7 +390,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter * abort acquisition with minimum damage: close open files, cleanup. * does nothing if state already is 'idle' */ - void abort(); //FIXME: needed, isnt stopReceiver enough? + void abort(); //FIXME: needed, isn't stopReceiver enough? /** * Closes all files @@ -436,36 +439,12 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter protected: //**detector parameters*** - /** - * structure of an eiger packet header - * subframenum subframe number for 32 bit mode (already written by firmware) - * missingpacket explicitly put to 0xFF to recognize it in file read (written by software) - * portnum 0 for the first port and 1 for the second port (written by software to file) - * dynamicrange dynamic range or bits per pixel (written by software to file) - */ - typedef struct { - unsigned char subframenum[4]; - unsigned char missingpacket[2]; - unsigned char portnum[1]; - unsigned char dynamicrange[1]; - } eiger_packet_header_t; - /** - * structure of an eiger packet footer - * framenum 48 bit frame number (already written by firmware) - * packetnum packet number (already written by firmware) - */ - typedef struct { - unsigned char framenum[6]; - unsigned char packetnum[2]; - } eiger_packet_footer_t; - - /** detector type */ detectorType myDetectorType; /** detector hostname */ char detHostname[MAX_STR_LENGTH]; /** Number of Packets per Frame*/ - uint64_t packetsPerFrame; + uint32_t packetsPerFrame; /** Acquisition Period */ int64_t acquisitionPeriod; /** Frame Number */ @@ -479,7 +458,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter //***receiver parameters*** /** Maximum Number of Listening Threads/ UDP Ports */ - const static int MAX_NUM_LISTENING_THREADS = 2; + const static int MAX_NUMBER_OF_LISTENING_THREADS = 2; /** Receiver Status */ runStatus status; @@ -487,7 +466,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter /** Ethernet Interface */ char eth[MAX_STR_LENGTH]; /** Server UDP Port Number*/ - uint32_t udpPortNum[MAX_NUM_LISTENING_THREADS]; + uint32_t udpPortNum[MAX_NUMBER_OF_LISTENING_THREADS]; //***file parameters*** /** File Name without frame index, file index and extension (_d0_f000000000000_8.raw)*/ diff --git a/slsReceiverSoftware/include/UDPInterface.h b/slsReceiverSoftware/include/UDPInterface.h index 2e89b59be..65c275a29 100644 --- a/slsReceiverSoftware/include/UDPInterface.h +++ b/slsReceiverSoftware/include/UDPInterface.h @@ -17,8 +17,6 @@ #include "sls_receiver_defs.h" #include "receiver_defs.h" -#include "MySocketTCP.h" - #include "utilities.h" #include "logger.h" @@ -184,13 +182,13 @@ class UDPInterface { * Get UDP Port Number * @return udp port number */ - virtual uint32_t getUDPPortNo() const = 0; + virtual uint32_t getUDPPortNumber() const = 0; /** * Get Second UDP Port Number (eiger specific) * @return second udp port number */ - virtual uint32_t getUDPPortNo2() const = 0; + virtual uint32_t getUDPPortNumber2() const = 0; /** * Get Ehernet Interface @@ -314,22 +312,22 @@ class UDPInterface { /** * Set data compression, by saving only hits (so far implemented only for Moench and Gotthard) * @param b true for data compression enable, else false + * @return OK or FAIL */ - virtual void setDataCompressionEnable(const bool b) = 0; - + virtual int setDataCompressionEnable(const bool b) = 0; //***connection parameters*** /** * Set UDP Port Number * @param i udp port number */ - virtual void setUDPPortNo(const uint32_t i) = 0; + virtual void setUDPPortNumber(const uint32_t i) = 0; /** * Set Second UDP Port Number (eiger specific) * @return second udp port number */ - virtual void setUDPPortNo2(const uint32_t i) = 0; + virtual void setUDPPortNumber2(const uint32_t i) = 0; /** * Set Ethernet Interface to listen to @@ -338,7 +336,7 @@ class UDPInterface { virtual void setEthernetInterface(const char* c) = 0; - //***connection parameters*** + //***acquisition parameters*** /** * Set Short Frame Enabled, later will be moved to getROI (so far only for gotthard) * @param i index of adc enabled, else -1 if all enabled @@ -348,14 +346,16 @@ class UDPInterface { /** * Set the Frequency of Frames Sent to GUI * @param i 0 for random frame requests, n for nth frame frequency + * @return OK or FAIL */ - virtual void setFrameToGuiFrequency(const uint32_t i) = 0; + virtual int setFrameToGuiFrequency(const uint32_t i) = 0; /** * Set Acquisition Period * @param i acquisition period + * @return OK or FAIL */ - virtual void setAcquisitionPeriod(const uint64_t i) = 0; + virtual int setAcquisitionPeriod(const uint64_t i) = 0; /** * Set Number of Frames expected by receiver from detector @@ -367,15 +367,16 @@ class UDPInterface { /** * Set Dynamic Range or Number of Bits Per Pixel * @param i dynamic range that is 4, 8, 16 or 32 + * @return OK or FAIL */ - virtual void setDynamicRange(const uint32_t i) = 0; + virtual int setDynamicRange(const uint32_t i) = 0; /** * Set Ten Giga Enable * @param b true if 10Giga enabled, else false (1G enabled) + * @return OK or FAIL */ - virtual void setTenGigaEnable(const bool b) = 0; - + virtual int setTenGigaEnable(const bool b) = 0; /************************************************************************* @@ -383,6 +384,7 @@ class UDPInterface { * They may modify the status of the receiver **************************** *************************************************************************/ + //***initial functions*** /** * Set receiver type (and corresponding detector variables in derived STANDARD class) diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 04bd7000e..747eaad34 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -6,25 +6,22 @@ * @short does all the functions for a receiver, set/get parameters, start/stop etc. ***********************************************/ +#include "UDPBaseImplementation.h" -#include "sls_receiver_defs.h" -#include "receiver_defs.h" +//#include "sls_receiver_defs.h" +//#include "receiver_defs.h" #include "genericSocket.h" #include "circularFifo.h" #include "singlePhotonDetector.h" #include "slsReceiverData.h" #include "moenchCommonMode.h" -//#include "UDPInterface.h" -#include "UDPBaseImplementation.h" #ifdef MYROOT1 #include #include #endif - - #include #include #include @@ -38,7 +35,13 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBaseImplementation { public: - /** + + + /************************************************************************* + * Constructor & Destructor ********************************************** + * They access local cache of configuration or detector parameters ******* + *************************************************************************/ + /** * Constructor */ UDPStandardImplementation(); @@ -48,17 +51,382 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase */ virtual ~UDPStandardImplementation(); + + /************************************************************************* + * Getters *************************************************************** + * They access local cache of configuration or detector parameters ******* + *************************************************************************/ + + //***acquisition count parameters*** + + + /************************************************************************* + * Setters *************************************************************** + * They modify the local cache of configuration or detector parameters *** + *************************************************************************/ + + //**initial parameters*** + + /** + * Overridden method + * Configure command line parameters + * @param config_map mapping of config parameters passed from command line arguments + */ void configure(map config_map); + //*** file parameters*** /** - * delete and free member parameters + * Overridden method + * Set data compression, by saving only hits (so far implemented only for Moench and Gotthard) + * @param b true for data compression enable, else false */ - void deleteMembers(); + void setDataCompressionEnable(const bool b); + + //***acquisition parameters*** + /** + * Overridden method + * Set Short Frame Enabled, later will be moved to getROI (so far only for gotthard) + * @param i index of adc enabled, else -1 if all enabled + */ + void setShortFrameEnable(const int i); /** - * initialize member parameters + * Overridden method + * Set the Frequency of Frames Sent to GUI + * @param i 0 for random frame requests, n for nth frame frequency + * @return OK or FAIL + */ + int setFrameToGuiFrequency(const uint32_t i); + + /** + * Overridden method + * Set Acquisition Period + * @param i acquisition period + * @return OK or FAIL + */ + int setAcquisitionPeriod(const uint64_t i); + + /** + * Overridden method + * Set Dynamic Range or Number of Bits Per Pixel + * @param i dynamic range that is 4, 8, 16 or 32 + * @return OK or FAIL + */ + int setDynamicRange(const uint32_t i); + + /** + * Overridden method + * Set Ten Giga Enable + * @param b true if 10Giga enabled, else false (1G enabled) + * @return OK or FAIL + */ + int setTenGigaEnable(const bool b); + + + + /************************************************************************* + * Behavioral functions*************************************************** + * They may modify the status of the receiver **************************** + *************************************************************************/ + + //***initial functions*** + /** + * Overridden method + * Set receiver type (and corresponding detector variables in derived STANDARD class) + * It is the first function called by the client when connecting to receiver + * @param d detector type + * @return OK or FAIL + */ + int setDetectorType(const slsReceiverDefs::detectorType d); + + //***acquisition functions*** + /** + * Overridden method + * Reset acquisition parameters such as total frames caught for an entire acquisition (including all scans) + */ + void resetAcquisitionCount(); + + /** + * Overridden method + * Start Listening for Packets by activating all configuration settings to receiver + * @param c error message if FAIL + * @return OK or FAIL + */ + int startReceiver(char *c=NULL); + + + +private: + + /************************************************************************* + * Setters *************************************************************** + * They modify the local cache of configuration or detector parameters *** + *************************************************************************/ + //**initial parameters*** + + /** + * Delete and free base member parameters + */ + void deleteBaseMembers(); + + /** + * Delete and free member parameters + */ + void deleteMembers(); + + /** + * Deletes all the filter objects for single photon data + * Deals with data compression + */ + void deleteFilter(); + + /** + * Initialize base member parameters + */ + void initializeBaseMembers(); + + /** + * Initialize member parameters */ void initializeMembers(); + + /** + * Sets up all the filter objects for single photon data + * Deals with data compression + */ + void initializeFilter(); + + /** + * Create Listening Threads + * @param destroy is true to destroy all the threads + */ + int createListeningThreads(bool destroy = false); + + /** + * Create Writer Threads + * @param destroy is true to destroy all the threads + * @return OK or FAIL + */ + int createWriterThreads(bool destroy = false); + + /** + * Set Thread Priorities + */ + void setThreadPriorities(); + + /** + * Set up the Fifo Structure for processing buffers + * between listening and writer threads + * @return OK or FAIL + */ + int setupFifoStructure(); + + /** + * Creates UDP Sockets + * @return OK or FAIL + */ + int createUDPSockets(); + + + + //**detector parameters*** + /** + * structure of an eiger packet header + * subframenum subframe number for 32 bit mode (already written by firmware) + * missingpacket explicitly put to 0xFF to recognize it in file read (written by software) + * portnum 0 for the first port and 1 for the second port (written by software to file) + * dynamicrange dynamic range or bits per pixel (written by software to file) + */ + typedef struct { + unsigned char subFameNumber[4]; + unsigned char missingPacket[2]; + unsigned char portIndex[1]; + unsigned char dynamicRange[1]; + } eiger_packet_header_t; + /** + * structure of an eiger packet footer + * framenum 48 bit frame number (already written by firmware) + * packetnum packet number (already written by firmware) + */ + typedef struct { + unsigned char frameNumber[6]; + unsigned char packetNumber[2]; + } eiger_packet_footer_t; + + /** Size of 1 Frame including headers */ + int frameSize; + + /** Size of 1 buffer processed at a time */ + int bufferSize; + + /** One Packet Size including headers */ + int onePacketSize; + + /** One Packet Size without headers */ + int oneDataSize; + + /** Frame Index Mask */ + uint64_t frameIndexMask; + + /** Frame Index Offset */ + int frameIndexOffset; + + /** Packet Index Mask */ + uint64_t packetIndexMask; + + /** Footer offset from start of Packet*/ + int footerOffset; + + //***File parameters*** + /** Maximum Packets Per File **/ + int maxPacketsPerFile; + + + + //***acquisition indices parameters*** + /** Frame Number of First Frame of an Acquisition */ + uint64_t startAcquisitionIndex; + + /** Frame index at start of each real time acquisition (eg. for each scan) */ + uint64_t startFrameIndex; + + /** Current Frame Number */ + uint64_t currentFrameNumber; + + /* Acquisition started */ + bool acqStarted; + + /* Measurement started */ + bool measurementStarted; + + /** Total Frame Count listened to by listening threads */ + int totalListeningFrameCount[MAX_NUMBER_OF_LISTENING_THREADS]; + + + + + //***receiver parameters*** + /** Receiver Buffer */ + char *buffer[MAX_NUMBER_OF_LISTENING_THREADS]; + + /** Memory allocated */ + char *mem0[MAX_NUMBER_OF_LISTENING_THREADS]; + + /** Circular fifo to point to addresses of data listened to */ + CircularFifo* fifo[MAX_NUMBER_OF_LISTENING_THREADS]; + + /** Circular fifo to point to address already written and freed, to be reused */ + CircularFifo* fifoFree[MAX_NUMBER_OF_LISTENING_THREADS]; + + /** Number of Jobs Per Buffer */ + int numberofJobsPerBuffer; + + /** Fifo Depth */ + uint32_t fifoSize; + + /** Current Frame copied for Gui */ + char* latestData; + + + + //***general and listening thread parameters*** + /** Ensures if threads created successfully */ + bool threadStarted; + + /** Number of Listening Threads */ + int numberofListeningThreads; + + /** Listening Threads */ + pthread_t listeningThreads[MAX_NUMBER_OF_LISTENING_THREADS]; + + /** Semaphores Synchronizing Listening Threads */ + sem_t listenSemaphore[MAX_NUMBER_OF_LISTENING_THREADS]; + + /** Current Listening Thread Index*/ + int currentListeningThreadIndex; + + /** Mask with each bit indicating status of each listening thread */ + volatile uint32_t listeningThreadsMask; + + /** Set to self-terminate listening threads waiting for semaphores */ + bool killAllListeningThreads; + + + + //***writer thread parameters*** + /** Maximum Number of Writer Threads */ + const static int MAX_NUMBER_OF_WRITER_THREADS = 15; + + /** Number of Writer Threads */ + int numberofWriterThreads; + + /** Writer Threads */ + pthread_t writingThreads[MAX_NUMBER_OF_WRITER_THREADS]; + + /** Semaphores Synchronizing Writer Threads */ + sem_t writerSemaphore[MAX_NUMBER_OF_WRITER_THREADS]; + + /** Current Writer Thread Index*/ + int currentWriterThreadIndex; + + /** Mask with each bit indicating status of each writer thread */ + volatile uint32_t writerThreadsMask; + + /** Mask with each bit indicating file created for each writer thread*/ + volatile uint32_t createFileMask; + + /** Set to self-terminate writer threads waiting for semaphores */ + bool killAllWritingThreads; + + + + + + + //***filter parameters*** + /** Common Mode Subtraction Enable FIXME: Always false, only moench uses, Ask Anna */ + bool commonModeSubtractionEnable; + + /** Moench Common Mode Subtraction */ + moenchCommonMode *moenchCommonModeSubtraction; + + /** Single Photon Detector Object for each writer thread */ + singlePhotonDetector *singlePhotonDetectorObject[MAX_NUMBER_OF_WRITER_THREADS]; + + /** Receiver Data Object for each writer thread */ + slsReceiverData *receiverData[MAX_NUMBER_OF_WRITER_THREADS]; + + + + + //***mutex*** + /** mutex for status */ + pthread_mutex_t status_mutex; + + + + + + + + + + + + + + + + + + + + + + + + + /** * Set receiver type @@ -74,26 +442,11 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase */ //uint32_t getStartAcquisitionIndex(); - /** - * Returns current Frame Index Caught for an entire acquisition (including all scans) - */ - //uint32_t getAcquisitionIndex(); - /** * Returns if acquisition started */ //bool getAcquistionStarted(); - /** - * Returns Frames Caught for each real time acquisition (eg. for each scan) - */ - //int getFramesCaught(); - - /** - * Returns Total Frames Caught for an entire acquisition (including all scans) - */ - //int getTotalFramesCaught(); - /** * Returns the frame index at start of each real time acquisition (eg. for each scan) */ @@ -117,71 +470,8 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase //void resetTotalFramesCaught(); - //file parameters - /** - * Returns File Path - */ - //char* getFilePath() const; - /** - * Set File Path - * @param c file path - */ - //char* setFilePath(const char c[]); - /** - * Returns File Name - */ - //char* getFileName() const; - - /** - * Set File Name (without frame index, file index and extension) - * @param c file name - */ - //char* setFileName(const char c[]); - - /** - * Returns File Index - */ - //int getFileIndex(); - - /** - * Set File Index - * @param i file index - */ - //int setFileIndex(int i); - - /** - * Set Frame Index Needed - * @param i frame index needed - */ - //int setFrameIndexNeeded(int i); - - /** - * Set enable file write - * @param i file write enable - * Returns file write enable - */ - //int setEnableFileWrite(int i); - - /** - * Enable/disable overwrite - * @param i enable - * Returns enable over write - */ - //int setEnableOverwrite(int i); - - /** - * Returns file write enable - * 1: YES 0: NO - */ - //int getEnableFileWrite() const; - - /** - * Returns file over write enable - * 1: YES 0: NO - */ - //int getEnableOverwrite() const; //other parameters @@ -202,83 +492,7 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase */ void setDetectorHostname(const char *detectorHostName); - /* Returns detector hostname - /returns hostname - * caller needs to deallocate the returned char array. - * if uninitialized, it must return NULL - */ - char *getDetectorHostname() const; - /** - * Set Ethernet Interface or IP to listen to - */ - void setEthernetInterface(char* c); - - /** - * Set UDP Port Number - */ - void setUDPPortNo(int p); - /** - * Set UDP Port Number - */ - void setUDPPortNo2(int p); - - /* - * Returns number of frames to receive - * This is the number of frames to expect to receiver from the detector. - * The data receiver will change from running to idle when it got this number of frames - */ - int getNumberOfFrames() const; - - /** - * set frame number if a positive number - */ - int32_t setNumberOfFrames(int32_t fnum); - - /** - * Returns scan tag - */ - int getScanTag() const; - - /** - * set scan tag if its is a positive number - */ - int32_t setScanTag(int32_t stag); - - /** - * Returns the number of bits per pixel - */ - int getDynamicRange() const; - - /** - * set dynamic range if its is a positive number - */ - int32_t setDynamicRange(int32_t dr); - - /** - * Set short frame - * @param i if shortframe i=1 - */ - int setShortFrame(int i); - - /** - * Set the variable to send every nth frame to gui - * or if 0,send frame only upon gui request - */ - int setNFrameToGui(int i); - - /** set acquisition period if a positive number - */ - int64_t setAcquisitionPeriod(int64_t index); - - /** get data compression, by saving only hits - */ - bool getDataCompression(); - - /** enabl data compression, by saving only hits - /returns if failed - */ - int enableDataCompression(bool enable); /** * enable 10Gbe @@ -337,20 +551,9 @@ private: std::cout << "[WARNING] Method " << method_name << " not implemented!" << std::endl; }; */ - /** - * Deletes all the filter objects for single photon data - */ - void deleteFilter(); - /** - * Constructs the filter for single photon data - */ - void setupFilter(); - /** - * set up fifo according to the new numjobsperthread - */ - void setupFifoStructure (); + /** * Copy frames to gui @@ -358,28 +561,6 @@ private: */ void copyFrameToGui(char* startbuf[], char* buf=NULL); - /** - * creates udp sockets - * \returns if success or fail - */ - int createUDPSockets(); - - /** - * create listening thread - * @param destroy is true to kill all threads and start again - */ - int createListeningThreads(bool destroy = false); - - /** - * create writer threads - * @param destroy is true to kill all threads and start again - */ - int createWriterThreads(bool destroy = false); - - /** - * set thread priorities - */ - void setThreadPriorities(); /** * initializes variables and creates the first file @@ -507,8 +688,7 @@ private: - /** max number of writer threads */ - const static int MAX_NUM_WRITER_THREADS = 15; + /** missing packet identifier value */ const static uint16_t missingPacketValue = 0xFFFF; @@ -517,21 +697,9 @@ private: /** UDP Socket between Receiver and Detector */ genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS]; - /** max packets per file **/ - int maxPacketsPerFile; - - /** Frame Index at start of an entire acquisition (including all scans) */ - uint64_t startAcquisitionIndex; /** Complete File name */ char savefilename[MAX_STR_LENGTH]; - /* Measurement started */ - bool measurementStarted; - /* Acquisition started */ - bool acqStarted; - /** Frame index at start of each real time acquisition (eg. for each scan) */ - uint32_t startFrameIndex; - /** Actual current frame index of each time acquisition (eg. for each scan) */ uint32_t frameIndex; @@ -547,34 +715,9 @@ private: /** Number of missing packets per buffer*/ uint32_t numMissingPackets; - /** frame index mask */ - uint32_t frameIndexMask; - - /** packet index mask */ - uint32_t packetIndexMask; - - /** frame index offset */ - int frameIndexOffset; - /** Current Frame Number */ - uint64_t currframenum; - /** Previous Frame number from buffer */ int prevframenum; - /** size of one frame */ - int frameSize; - - /** buffer size. different from framesize as we wait for one packet instead of frame for eiger */ - int bufferSize; - - /** one buffer size */ - int onePacketSize; - - /** one buffer size */ - int oneDataSize; - - /** latest data */ - char* latestData; /** gui data ready */ int guiDataReady; @@ -585,71 +728,9 @@ private: /** points to the filename to send to gui */ char* guiFileName; - /** fifo size */ - unsigned int fifosize; - - /** number of jobs per thread for data compression */ - int numJobsPerThread; - - /** memory allocated for the buffer */ - char *mem0[MAX_NUM_LISTENING_THREADS]; - - /** circular fifo to store addresses of data read */ - CircularFifo* fifo[MAX_NUM_LISTENING_THREADS]; - - /** circular fifo to store addresses of data already written and ready to be resued*/ - CircularFifo* fifoFree[MAX_NUM_LISTENING_THREADS]; - - /** Receiver buffer */ - char *buffer[MAX_NUM_LISTENING_THREADS]; - - /** number of writer threads */ - int numListeningThreads; - - /** number of writer threads */ - int numWriterThreads; - - /** to know if listening and writer threads created properly */ - int thread_started; - - /** current listening thread index*/ - int currentListeningThreadIndex; - - /** current writer thread index*/ - int currentWriterThreadIndex; - - /** thread listening to packets */ - pthread_t listening_thread[MAX_NUM_LISTENING_THREADS]; - - /** thread writing packets */ - pthread_t writing_thread[MAX_NUM_WRITER_THREADS]; - - /** total frame count the listening thread has listened to */ - int totalListeningFrameCount[MAX_NUM_LISTENING_THREADS]; - - /** mask showing which listening threads are running */ - volatile uint32_t listeningthreads_mask; - - /** mask showing which writer threads are running */ - volatile uint32_t writerthreads_mask; - - /** mask showing which threads have created files*/ - volatile uint32_t createfile_mask; - - /** OK if file created was successful */ +/** OK if file created was successful */ int ret_createfile; - /** variable used to self terminate threads waiting for semaphores */ - int killAllListeningThreads; - - /** variable used to self terminate threads waiting for semaphores */ - int killAllWritingThreads; - - - - /** footer offset is different for 1g and 10g*/ - int footer_offset; - // TODO: not properly sure where to put these... /** structure of an eiger image header*/ @@ -659,19 +740,11 @@ private: //semaphores /** semaphore to synchronize writer and guireader threads */ sem_t smp; - /** semaphore to synchronize listener threads */ - sem_t listensmp[MAX_NUM_LISTENING_THREADS]; - /** semaphore to synchronize writer threads */ - sem_t writersmp[MAX_NUM_WRITER_THREADS]; - //mutex /** guiDataReady mutex */ pthread_mutex_t dataReadyMutex; - /** mutex for status */ - pthread_mutex_t status_mutex; - /** mutex for progress variable currframenum */ pthread_mutex_t progress_mutex; @@ -682,10 +755,7 @@ private: FILE *sfilefd; //filter - singlePhotonDetector *singlePhotonDet[MAX_NUM_WRITER_THREADS]; - slsReceiverData *receiverdata[MAX_NUM_WRITER_THREADS]; - moenchCommonMode *cmSub; - bool commonModeSubtractionEnable; + #ifdef MYROOT1 /** Tree where the hits are stored */ diff --git a/slsReceiverSoftware/include/receiver_defs.h b/slsReceiverSoftware/include/receiver_defs.h index 206c8b92b..bac0df2d6 100755 --- a/slsReceiverSoftware/include/receiver_defs.h +++ b/slsReceiverSoftware/include/receiver_defs.h @@ -27,8 +27,9 @@ /*#define GOTTHARD_ALIGNED_FRAME_SIZE 4096*/ #define GOTTHARD_PACKETS_PER_FRAME 2 #define GOTTHARD_ONE_PACKET_SIZE 1286 +#define GOTTHARD_ONE_DATA_SIZE 1280 #define GOTTHARD_BUFFER_SIZE (GOTTHARD_ONE_PACKET_SIZE*GOTTHARD_PACKETS_PER_FRAME) //1286*2 -#define GOTTHARD_DATA_BYTES (1280*GOTTHARD_PACKETS_PER_FRAME) //1280*2 +#define GOTTHARD_DATA_BYTES (GOTTHARD_ONE_DATA_SIZE*GOTTHARD_PACKETS_PER_FRAME) //1280*2 #define GOTTHARD_FRAME_INDEX_MASK 0xFFFFFFFE #define GOTTHARD_FRAME_INDEX_OFFSET 1 @@ -39,7 +40,7 @@ #define GOTTHARD_SHORT_PACKETS_PER_FRAME 1 -#define GOTTHARD_SHORT_ONE_PACKET_SIZE 518 +#define GOTTHARD_SHORT_ONE_PACKET_SIZE 518 #define GOTTHARD_SHORT_BUFFER_SIZE 518 #define GOTTHARD_SHORT_DATABYTES 512 #define GOTTHARD_SHORT_FRAME_INDEX_MASK 0xFFFFFFFF @@ -75,8 +76,9 @@ /*#define MOENCH_ALIGNED_FRAME_SIZE 65536*/ #define MOENCH_PACKETS_PER_FRAME 40 #define MOENCH_ONE_PACKET_SIZE 1286 +#define MOENCH_ONE_DATA_SIZE 1280 #define MOENCH_BUFFER_SIZE (MOENCH_ONE_PACKET_SIZE*MOENCH_PACKETS_PER_FRAME) //1286*40 -#define MOENCH_DATA_BYTES (1280*MOENCH_PACKETS_PER_FRAME) //1280*40 +#define MOENCH_DATA_BYTES (MOENCH_ONE_DATA_SIZE*MOENCH_PACKETS_PER_FRAME) //1280*40 #define MOENCH_FRAME_INDEX_MASK 0xFFFFFF00 #define MOENCH_FRAME_INDEX_OFFSET 8 diff --git a/slsReceiverSoftware/include/slsReceiver.h b/slsReceiverSoftware/include/slsReceiver.h index 4eb024776..e76ea7d1b 100644 --- a/slsReceiverSoftware/include/slsReceiver.h +++ b/slsReceiverSoftware/include/slsReceiver.h @@ -8,7 +8,6 @@ #include "slsReceiverTCPIPInterface.h" #include "UDPInterface.h" -//#include "UDPBaseImplementation.h" #include "receiver_defs.h" #include "MySocketTCP.h" diff --git a/slsReceiverSoftware/include/sls_receiver_defs.h b/slsReceiverSoftware/include/sls_receiver_defs.h index 11641334a..04b8b19fb 100755 --- a/slsReceiverSoftware/include/sls_receiver_defs.h +++ b/slsReceiverSoftware/include/sls_receiver_defs.h @@ -111,6 +111,16 @@ public: }; + /** returns string from enabled/disabled + \param b true or false + \returns string enabled, disabled + */ + static string stringEnable(bool b){\ + if(b) return string("enabled"); \ + else return string("disabled"); \ + }; + + #ifdef __cplusplus protected: diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index e16060865..0c1c30248 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -4,25 +4,12 @@ * @short does all the functions for a receiver, set/get parameters, start/stop etc. ***********************************************/ - #include "UDPBaseImplementation.h" -#include "moench02ModuleData.h" -#include "gotthardModuleData.h" -#include "gotthardShortModuleData.h" - - -#include // SIGINT #include // stat -#include // socket(), bind(), listen(), accept(), shut down -#include // sock_addr_in, htonl, INADDR_ANY -#include // exit() -#include //set precision -#include //munmap -#include #include - +#include using namespace std; @@ -32,7 +19,11 @@ using namespace std; * They access local cache of configuration or detector parameters ******* *************************************************************************/ UDPBaseImplementation::UDPBaseImplementation(){ + FILE_LOG(logDEBUG) << __AT__ << " starting"; + + cout << "Info: Initializing base members" << endl; //**detector parameters*** + myDetectorType = GENERIC; strcpy(detHostname,""); packetsPerFrame = 0; acquisitionPeriod = 0; @@ -51,7 +42,7 @@ UDPBaseImplementation::UDPBaseImplementation(){ } //***file parameters*** - strcpy(fileName,""); + strcpy(fileName,"run"); strcpy(filePath,""); fileIndex = 0; scanTag = 0; @@ -78,9 +69,17 @@ UDPBaseImplementation::UDPBaseImplementation(){ pAcquisitionFinished = NULL; rawDataReadyCallBack = NULL; pRawDataReady = NULL; -}; +} -UDPBaseImplementation::~UDPBaseImplementation(){}; +UDPBaseImplementation::~UDPBaseImplementation(){ + FILE_LOG(logDEBUG) << __AT__ << " starting"; + + cout << "Info: Deleting base member pointers" << endl; + if(detHostname) {delete [] detHostname; detHostname = NULL;} + if(eth) {delete [] eth; eth = NULL;} + if(fileName) {delete [] fileName; fileName = NULL;} + if(filePath) {delete [] filePath; filePath = NULL;} +} /************************************************************************* @@ -157,9 +156,9 @@ int64_t UDPBaseImplementation::getAcquisitionIndex() const{ /***connection parameters***/ -uint32_t UDPBaseImplementation::getUDPPortNo() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return udpPortNum[0];} +uint32_t UDPBaseImplementation::getUDPPortNumber() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return udpPortNum[0];} -uint32_t UDPBaseImplementation::getUDPPortNo2() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return udpPortNum[1];} +uint32_t UDPBaseImplementation::getUDPPortNumber2() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return udpPortNum[1];} char *UDPBaseImplementation::getEthernetInterface() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; @@ -205,7 +204,7 @@ void UDPBaseImplementation::setBottomEnable(const bool b){ FILE_LOG(logDEBUG) << __AT__ << " starting"; bottomEnable = b; - FILE_LOG(logINFO) << "Bottom Enable:" << bottomEnable; + FILE_LOG(logINFO) << "Bottom Enable: " << stringEnable(bottomEnable); } @@ -255,41 +254,46 @@ void UDPBaseImplementation::setFrameIndexEnable(const bool b){ FILE_LOG(logDEBUG) << __AT__ << " starting"; frameIndexEnable = b; - FILE_LOG(logINFO) << "Frame Index Enable:" << frameIndexEnable; + FILE_LOG(logINFO) << "Frame Index Enable: " << stringEnable(frameIndexEnable); } void UDPBaseImplementation::setFileWriteEnable(const bool b){ FILE_LOG(logDEBUG) << __AT__ << " starting"; fileWriteEnable = b; - FILE_LOG(logINFO) << "File Write Enable:" << fileWriteEnable; + FILE_LOG(logINFO) << "File Write Enable: " << stringEnable(fileWriteEnable); } void UDPBaseImplementation::setOverwriteEnable(const bool b){ FILE_LOG(logDEBUG) << __AT__ << " starting"; overwriteEnable = b; - FILE_LOG(logINFO) << "Overwrite Enable:" << overwriteEnable; + FILE_LOG(logINFO) << "Overwrite Enable: " << stringEnable(overwriteEnable); } -void UDPBaseImplementation::setDataCompressionEnable(const bool b){ +int UDPBaseImplementation::setDataCompressionEnable(const bool b){ FILE_LOG(logDEBUG) << __AT__ << " starting"; dataCompressionEnable = b; - FILE_LOG(logINFO) << "Data Compression Enable:" << dataCompressionEnable; -} + FILE_LOG(logINFO) << "Data Compression Enable: " << stringEnable(dataCompressionEnable); + //overridden methods might return FAIL + return OK; +} /***connection parameters***/ -void UDPBaseImplementation::setUDPPortNo(const uint32_t i){ +void UDPBaseImplementation::setUDPPortNumber(const uint32_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; - udpPortNum[0] = i; + if(bottomEnable) + udpPortNum[1] = i; + else + udpPortNum[0] = i; FILE_LOG(logINFO) << "udpPortNum[0]:" << udpPortNum[0]; } -void UDPBaseImplementation::setUDPPortNo2(const uint32_t i){ +void UDPBaseImplementation::setUDPPortNumber2(const uint32_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; udpPortNum[1] = i; @@ -304,26 +308,32 @@ void UDPBaseImplementation::setEthernetInterface(const char* c){ } -/***connection parameters***/ +/***acquisition parameters***/ void UDPBaseImplementation::setShortFrameEnable(const int i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; shortFrameEnable = i; - FILE_LOG(logINFO) << "Short Frame Enable:" << shortFrameEnable; + FILE_LOG(logINFO) << "Short Frame Enable: " << stringEnable(shortFrameEnable); } -void UDPBaseImplementation::setFrameToGuiFrequency(const uint32_t i){ +int UDPBaseImplementation::setFrameToGuiFrequency(const uint32_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; FrameToGuiFrequency = i; FILE_LOG(logINFO) << "Frame To Gui Frequency:" << FrameToGuiFrequency; + + //overrridden child classes might return FAIL + return OK; } -void UDPBaseImplementation::setAcquisitionPeriod(const uint64_t i){ +int UDPBaseImplementation::setAcquisitionPeriod(const uint64_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; acquisitionPeriod = i; FILE_LOG(logINFO) << "Acquisition Period:" << acquisitionPeriod; + + //overrridden child classes might return FAIL + return OK; } void UDPBaseImplementation::setNumberOfFrames(const uint64_t i){ @@ -333,20 +343,25 @@ void UDPBaseImplementation::setNumberOfFrames(const uint64_t i){ FILE_LOG(logINFO) << "Number of Frames:" << numberOfFrames; } -void UDPBaseImplementation::setDynamicRange(const uint32_t i){ +int UDPBaseImplementation::setDynamicRange(const uint32_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; dynamicRange = i; FILE_LOG(logINFO) << "Dynamic Range:" << dynamicRange; + + //overrridden child classes might return FAIL + return OK; } -void UDPBaseImplementation::setTenGigaEnable(const bool b){ +int UDPBaseImplementation::setTenGigaEnable(const bool b){ FILE_LOG(logDEBUG) << __AT__ << " starting"; tengigaEnable = b; - FILE_LOG(logINFO) << "Ten Giga Enable:" << tengigaEnable; -} + FILE_LOG(logINFO) << "Ten Giga Enable: " << stringEnable(tengigaEnable); + //overridden functions might return FAIL + return OK; +} /************************************************************************* @@ -354,11 +369,13 @@ void UDPBaseImplementation::setTenGigaEnable(const bool b){ * They may modify the status of the receiver **************************** *************************************************************************/ + /***initial functions***/ int UDPBaseImplementation::setDetectorType(const slsReceiverDefs::detectorType d){ FILE_LOG(logDEBUG) << __AT__ << " starting"; myDetectorType = d; + //if eiger, set numberofListeningThreads = 2; FILE_LOG(logINFO) << "Detector Type:" << slsDetectorBase::getDetectorType(d); return OK; } diff --git a/slsReceiverSoftware/src/UDPInterface.cpp b/slsReceiverSoftware/src/UDPInterface.cpp index 952098811..937035845 100644 --- a/slsReceiverSoftware/src/UDPInterface.cpp +++ b/slsReceiverSoftware/src/UDPInterface.cpp @@ -5,11 +5,11 @@ ***********************************************/ - -#include #include +#include using namespace std; + #include "UDPInterface.h" #include "UDPBaseImplementation.h" #include "UDPStandardImplementation.h" diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index dfbe51493..0f4290c9c 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -10,24 +10,920 @@ #include "gotthardModuleData.h" #include "gotthardShortModuleData.h" - -#include // SIGINT -#include // stat -#include // socket(), bind(), listen(), accept(), shut down -#include // sock_addr_in, htonl, INADDR_ANY +//#include // socket(), bind(), listen(), accept(), shut down +//#include // sock_addr_in, htonl, INADDR_ANY #include // exit() -#include //set precision -#include //munmap +#include //set precision for printing parameters for create new file +#include //map +//#include //munmap - - -#include #include +#include #include using namespace std; - #define WRITE_HEADERS + +/************************************************************************* + * Constructor & Destructor ********************************************** + * They access local cache of configuration or detector parameters ******* + *************************************************************************/ + +UDPStandardImplementation::UDPStandardImplementation(){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + initializeMembers(); +} + +UDPStandardImplementation::~UDPStandardImplementation(){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + deleteMembers(); +} + + +/************************************************************************* + * Setters *************************************************************** + * They modify the local cache of configuration or detector parameters *** + *************************************************************************/ + +/***initial parameters***/ +void UDPStandardImplementation::deleteBaseMembers(){ + FILE_LOG(logDEBUG) << __AT__ << " starting"; + + UDPBaseImplementation::~UDPBaseImplementation(); +} + +void UDPStandardImplementation::deleteMembers(){ + FILE_LOG(logDEBUG) << __AT__ << " starting"; + + cout << "Info: Deleting member pointers" << endl; + //filter + deleteFilter(); + //kill threads + if(threadStarted){ + createListeningThreads(true); + createWriterThreads(true); + } + //shutdownudpsockets + //close file + if(latestData) {delete[] latestData; latestData = NULL;} +} + +void UDPStandardImplementation::deleteFilter(){ + FILE_LOG(logDEBUG) << __AT__ << " starting"; + + moenchCommonModeSubtraction = NULL; + for(int i=0; i(receiverData[i], csize, sigma, sign, commonModeSubtractionEnable); +} + + + +int UDPStandardImplementation::createListeningThreads(bool destroy){ + FILE_LOG(logDEBUG) << __AT__ << " starting"; + + //reset masks + killAllListeningThreads = false; + pthread_mutex_lock(&status_mutex); + listeningThreadsMask = 0x0; + pthread_mutex_unlock(&(status_mutex)); + + //destroy + if(destroy){ + cout << "Info: Destroying Listening Thread(s)" << endl; + + killAllListeningThreads = true; + for(int i = 0; i < numberofListeningThreads; ++i){ + sem_post(&listenSemaphore[i]); + pthread_join(listeningThreads[i],NULL); + cout <<"."< MAX_JOBS_PER_THREAD) + numberofJobsPerBuffer = MAX_JOBS_PER_THREAD; + else if (i < 1) + numberofJobsPerBuffer = 1; + else + numberofJobsPerBuffer = i; + + } + cout << "Info: Number of Frames per buffer:" << numberofJobsPerBuffer << endl; + } + + //set fifo depth + //eiger listens to 1 packet at a time and size changes depending on packets per frame + if(myDetectorType == EIGER) + fifoSize = EIGER_FIFO_SIZE * packetsPerFrame; + else{ + fifoSize = GOTTHARD_FIFO_SIZE; + if(myDetectorType == MOENCH) + fifoSize = MOENCH_FIFO_SIZE; + else if(myDetectorType == PROPIX) + fifoSize = PROPIX_FIFO_SIZE; + //reduce fifo depth if more frames listened to at a time + if(fifoSize % numberofJobsPerBuffer) + fifoSize = (fifoSize/numberofJobsPerBuffer)+1; + else + fifoSize = fifoSize/numberofJobsPerBuffer; + } +#ifdef VERBOSE + cout << "Info: Fifo Depth:" << fifoSize << endl; +#endif + + + //do not rebuild fifo structure if it is the same + if((oldNumberofJobsPerBuffer == numberofJobsPerBuffer) && (oldFifoSize == fifoSize)) + return OK; + + + //set up fifo structure + for(int i=0;iisEmpty()) + fifoFree[i]->pop(buffer[i]); +#ifdef FIFO_DEBUG + cprintf(GREEN,"%d fifostructure popped from fifofree %p\n", i, (void*)(buffer[i])); +#endif + delete fifoFree[i]; + } + if(fifo[i]) delete fifo[i]; + if(mem0[i]) free(mem0[i]); + + //creating + fifoFree[i] = new CircularFifo(fifoSize); + fifo[i] = new CircularFifo(fifoSize); + + //allocate memory + mem0[i] = (char*)malloc((bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS) * fifoSize); + if (mem0[i] == NULL){ + cprintf(BG_RED,"Error: Could not allocate memory for listening \n"); + return FAIL; + } + + //push free address into fifoFree + buffer[i]=mem0[i]; + while (buffer[i] < (mem0[i]+(bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS) * (fifoSize-1))) { + fifoFree[i]->push(buffer[i]); +#ifdef FIFO_DEBUG + cprintf(BLUE,"%d fifostructure free pushed into fifofree %p\n", i, (void*)(buffer[i])); +#endif + buffer[i] += (bufferSize * numberofJobsPerBuffer + HEADER_SIZE_NUM_TOT_PACKETS); + } + } + cout << "Info: Fifo structure(s) reconstructed" << endl; +} + + + + +int UDPStandardImplementation::createUDPSockets(){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + + + + return OK; +} + + + + +void UDPStandardImplementation::configure(map config_map){ + FILE_LOG(logDEBUG) << __AT__ << " starting"; + + map::const_iterator pos; + pos = config_map.find("mode"); + if (pos != config_map.end() ){ + int b; + if(!sscanf(pos->second.c_str(), "%d", &b)){ + cout << "Warning: Could not parse mode. Assuming top mode." << endl; + b = 0; + } + bottomEnable = b!= 0; + cout << "Info: Bottom Enable: " << stringEnable(bottomEnable) << endl; + } + +} + + +/***file parameters***/ +int UDPStandardImplementation::setDataCompressionEnable(const bool b){ + FILE_LOG(logDEBUG) << __AT__ << " starting"; + + cout << "Info: Setting up Data Compression Enable to " << stringEnable(b); +#ifdef MYROOT1 + cout << " WITH ROOT" << endl; +#else + cout << " WITHOUT ROOT" << endl; +#endif + + //set data compression enable + dataCompressionEnable = b; + + //-- create writer threads depending on enable + pthread_mutex_lock(&status_mutex); + writerThreadsMask = 0x0; + pthread_mutex_unlock(&(status_mutex)); + + createWriterThreads(true); + if(b) + numberofWriterThreads = MAX_NUMBER_OF_WRITER_THREADS; + else + numberofWriterThreads = 1; + if(createWriterThreads() == FAIL){ + cprintf(BG_RED,"Error: Could not create writer threads\n"); + return FAIL; + } + //-- end of create writer threads + setThreadPriorities(); + + //filter + deleteFilter(); + if(b) + initializeFilter(); + + cout << "Info: Data Compression " << stringEnable(dataCompressionEnable) << endl; + + return OK; +} + + +/***acquisition parameters***/ +void UDPStandardImplementation::setShortFrameEnable(const int i){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + shortFrameEnable = i; + + if(shortFrameEnable!=-1){ + frameSize = GOTTHARD_SHORT_BUFFER_SIZE; + bufferSize = GOTTHARD_SHORT_BUFFER_SIZE; + onePacketSize = GOTTHARD_SHORT_BUFFER_SIZE; + oneDataSize = GOTTHARD_SHORT_DATABYTES; + maxPacketsPerFile = SHORT_MAX_FRAMES_PER_FILE * GOTTHARD_SHORT_PACKETS_PER_FRAME; + packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME; + frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK; + frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET; + packetIndexMask = GOTTHARD_SHORT_PACKET_INDEX_MASK; + + }else{ + frameSize = GOTTHARD_BUFFER_SIZE; + bufferSize = GOTTHARD_BUFFER_SIZE; + onePacketSize = GOTTHARD_ONE_PACKET_SIZE; + oneDataSize = GOTTHARD_ONE_DATA_SIZE; + maxPacketsPerFile = MAX_FRAMES_PER_FILE * GOTTHARD_PACKETS_PER_FRAME; + packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; + frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; + frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; + packetIndexMask = GOTTHARD_PACKET_INDEX_MASK; + } + + //filter + deleteFilter(); + if(dataCompressionEnable) + initializeFilter(); + + cout << "Info: Short Frame Enable set to " << shortFrameEnable << endl; +} + + +int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t i){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + if(i >= 0){ + FrameToGuiFrequency = i; + if(setupFifoStructure() == FAIL) + return FAIL; + } + + cout << "Info: Frame to Gui Frequency set to " << FrameToGuiFrequency << endl; + + return OK; +} + + +int UDPStandardImplementation::setAcquisitionPeriod(int64_t i){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + if(i >= 0){ + acquisitionPeriod = i; + if(setupFifoStructure() == FAIL) + return FAIL; + } + + cout << "Info: Acquisition Period set to " << acquisitionPeriod << endl; + + + return OK; +} + +int UDPStandardImplementation::setDynamicRange(const uint32_t i){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + int oldDynamicRange = dynamicRange; + + cout << "Info: Setting Dynamic Range to " << i << endl; + dynamicRange = i; + + if(myDetectorType == EIGER){ + + //set parameters depending on new dynamic range. + packetsPerFrame = (tengigaEnable ? EIGER_TEN_GIGA_CONSTANT : EIGER_ONE_GIGA_CONSTANT) + * dynamicRange * EIGER_MAX_PORTS; + frameSize = onePacketSize * packetsPerFrame; + maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; + + //new dynamic range, then restart threads and resetup fifo structure + if(oldDynamicRange != dynamicRange){ + + //delete threads + if(threadStarted){ + createListeningThreads(true); + createWriterThreads(true); + } + + //gui buffer + if(latestData){delete[] latestData; latestData = NULL;} + latestData = new char[frameSize]; + + //restructure fifo + if(setupFifoStructure() == FAIL) + return FAIL; + + //create threads + if(createListeningThreads() == FAIL){ + cprintf(BG_RED,"Error: Could not create listening thread\n"); + return FAIL; + } + if(createWriterThreads() == FAIL){ + cprintf(BG_RED,"Error: Could not create writer threads\n"); + return FAIL; + } + setThreadPriorities(); + } + + } + + cout << "Info: Dynamic Range set to " << dynamicRange << endl; + + return OK; +} + + + +int UDPStandardImplementation::setTenGigaEnable(const bool b){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + cout << "Info: Setting Ten Giga to " << string(b) << endl; + bool oldTenGigaEnable = tengigaEnable; + tengigaEnable = b; + + if(myDetectorType == EIGER){ + + //set parameters depending on 10g + if(tengigaEnable){ + packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; + oneDataSize = EIGER_TEN_GIGA_ONE_DATA_SIZE; + }else{ + packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; + oneDataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE; + } + frameSize = onePacketSize * packetsPerFrame; + bufferSize = onePacketSize; + maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; + + FILE_LOG(logDEBUG1) << dec << + "packetsPerFrame:" << packetsPerFrame << + "\nonePacketSize:" << onePacketSize << + "\noneDataSize:" << oneDataSize << + "\nframesize:" << frameSize << + "\nbufferSize:" << bufferSize << + "\nmaxPacketsPerFile:" << maxPacketsPerFile << endl; + + + + //new enable, then restart threads and resetup fifo structure + if(oldTenGigaEnable != tengigaEnable){ + + //delete threads + if(threadStarted){ + createListeningThreads(true); + createWriterThreads(true); + } + + //gui buffer + if(latestData){delete[] latestData; latestData = NULL;} + latestData = new char[frameSize]; + + //restructure fifo + if(setupFifoStructure() == FAIL) + return FAIL; + + //create threads + if(createListeningThreads() == FAIL){ + cprintf(BG_RED,"Error: Could not create listening thread\n"); + return FAIL; + } + if(createWriterThreads() == FAIL){ + cprintf(BG_RED,"Error: Could not create writer threads\n"); + return FAIL; + } + setThreadPriorities(); + } + + } + + cout << "Info: Ten Giga " << string(tengigaEnable) << endl; + + return OK; +} + + + +/************************************************************************* + * Behavioral functions*************************************************** + * They may modify the status of the receiver **************************** + *************************************************************************/ + + +/***initial functions***/ +int UDPStandardImplementation::setDetectorType(const slsReceiverDefs::detectorType d){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + cout << "Setting receiver type ..." << endl; + + deleteBaseMembers(); + deleteMembers(); + initializeBaseMembers(); + initializeMembers(); + + myDetectorType = d; + switch(myDetectorType){ + case GOTTHARD: + case PROPIX: + case MOENCH: + case EIGER: + case JUNGFRAUCTB: + case JUNGFRAU: + cout << "Info: ***** This is a " << slsDetectorBase::getDetectorType(d) << " Receiver *****" << endl; + break; + default: + cout << "Error: This is an unknown receiver type " << (int)d << endl; + return FAIL; + } + + //set detector specific variables + switch(myDetectorType){ + case GOTTHARD: + packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; + onePacketSize = GOTTHARD_ONE_PACKET_SIZE; + oneDataSize = GOTTHARD_ONE_DATA_SIZE; + frameSize = GOTTHARD_BUFFER_SIZE; + bufferSize = GOTTHARD_BUFFER_SIZE; + frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; + frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; + packetIndexMask = GOTTHARD_PACKET_INDEX_MASK; + maxPacketsPerFile = MAX_FRAMES_PER_FILE * GOTTHARD_PACKETS_PER_FRAME; + fifoSize = GOTTHARD_FIFO_SIZE; + //footerOffset = Not applicable; + break; + case PROPIX: + packetsPerFrame = PROPIX_PACKETS_PER_FRAME; + onePacketSize = PROPIX_ONE_PACKET_SIZE; + //oneDataSize = Not applicable; + frameSize = PROPIX_BUFFER_SIZE; + bufferSize = PROPIX_BUFFER_SIZE; + frameIndexMask = PROPIX_FRAME_INDEX_MASK; + frameIndexOffset = PROPIX_FRAME_INDEX_OFFSET; + packetIndexMask = PROPIX_PACKET_INDEX_MASK; + maxPacketsPerFile = MAX_FRAMES_PER_FILE * PROPIX_PACKETS_PER_FRAME; + fifoSize = PROPIX_FIFO_SIZE; + //footerOffset = Not applicable; + break; + case MOENCH: + packetsPerFrame = MOENCH_PACKETS_PER_FRAME; + onePacketSize = MOENCH_ONE_PACKET_SIZE; + oneDataSize = MOENCH_ONE_DATA_SIZE; + frameSize = MOENCH_BUFFER_SIZE; + bufferSize = MOENCH_BUFFER_SIZE; + frameIndexMask = MOENCH_FRAME_INDEX_MASK; + frameIndexOffset = MOENCH_FRAME_INDEX_OFFSET; + packetIndexMask = MOENCH_PACKET_INDEX_MASK; + maxPacketsPerFile = MOENCH_MAX_FRAMES_PER_FILE * MOENCH_PACKETS_PER_FRAME; + fifoSize = MOENCH_FIFO_SIZE; + //footerOffset = Not applicable; + break; + case EIGER: + //assuming 1G in the beginning + packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; + oneDataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE; + frameSize = onePacketSize * packetsPerFrame; + bufferSize = onePacketSize; + frameIndexMask = EIGER_FRAME_INDEX_MASK; + frameIndexOffset = EIGER_FRAME_INDEX_OFFSET; + packetIndexMask = EIGER_PACKET_INDEX_MASK; + maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; + fifoSize = EIGER_FIFO_SIZE; + footerOffset = EIGER_PACKET_HEADER_SIZE + oneDataSize; + break; + case JUNGFRAUCTB: + case JUNGFRAU: + packetsPerFrame = JCTB_PACKETS_PER_FRAME; + onePacketSize = JCTB_ONE_PACKET_SIZE; + //oneDataSize = Not applicable; + frameSize = JCTB_BUFFER_SIZE; + bufferSize = JCTB_BUFFER_SIZE; + frameIndexMask = JCTB_FRAME_INDEX_MASK; + frameIndexOffset = JCTB_FRAME_INDEX_OFFSET; + packetIndexMask = JCTB_PACKET_INDEX_MASK; + maxPacketsPerFile = JFCTB_MAX_FRAMES_PER_FILE * JCTB_PACKETS_PER_FRAME; + fifoSize = JCTB_FIFO_SIZE; + //footerOffset = Not applicable; + break; + } + + //delete threads and set number of listening threads + if(myDetectorType == EIGER){ + pthread_mutex_lock(&status_mutex); + listeningThreadsMask = 0x0; + pthread_mutex_unlock(&(status_mutex)); + if(threadStarted) + createListeningThreads(true); + numberofListeningThreads = MAX_NUMBER_OF_LISTENING_THREADS; + } + + //set up fifo structure -1 for numberofJobsPerBuffer ensure it is done + numberofJobsPerBuffer = -1; + setupFifoStructure(); + + //create threads + if(createListeningThreads() == FAIL){ + cprintf(BG_RED,"Error: Could not create listening thread\n"); + exit (-1); + } + if(createWriterThreads() == FAIL){ + cprintf(BG_RED,"Error: Could not create writer threads\n"); + exit (-1); + } + setThreadPriorities(); + + //allocate for latest data (frame copy for gui) + latestData = new char[frameSize]; + + cout << " Detector type set to " << slsDetectorBase::getDetectorType(d) << endl; + cout << "Ready..." << endl; + + return OK; +} + + +/***acquisition functions***/ +void UDPStandardImplementation::resetAcquisitionCount(){ + FILE_LOG(logDEBUG) << __AT__ << " starting"; + + totalPacketsCaught = 0; + acqStarted = false; + startAcquisitionIndex = 0; + + cout << "Info: Acquisition Count has been reset" << endl; +} + + +int UDPStandardImplementation::startReceiver(char *c=NULL){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + cout << "Info: Starting Receiver" << endl; + + //reset measurement variables + measurementStarted = false; + startFrameIndex = 0; + if(!acqStarted) + currentFrameNumber = 0; //has to be zero to add to startframeindex for each scan + for(int i = 0; i < numberofListeningThreads; ++i) + totalListeningFrameCount[i] = 0; + + //create UDP sockets + if(createUDPSockets() == FAIL){ + + } + + return OK; +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + UDPStandardImplementation::UDPStandardImplementation(){ @@ -36,7 +932,7 @@ UDPStandardImplementation::UDPStandardImplementation(){ latestData = NULL; guiFileName = NULL; tengigaEnable = 0; - footer_offset = 0; + for(int i=0;i config_map){ - FILE_LOG(logWARNING) << __AT__ << " called"; - - map::const_iterator pos; - pos = config_map.find("mode"); - if (pos != config_map.end() ){ - int b; - if(!sscanf(pos->second.c_str(), "%d", &b)){ - cout << "Warning: Could not parse mode. Assuming top mode." << endl; - b = 0; - } - bottom = b!= 0; - cout << "bottom:"<< bottom << endl; - } -}; void UDPStandardImplementation::initializeMembers(){ myDetectorType = GENERIC; - maxPacketsPerFile = 0; enableFileWrite = 1; overwrite = 1; fileIndex = 0; scanTag = 0; frameIndexNeeded = 0; - acqStarted = false; - measurementStarted = false; - startFrameIndex = 0; + + frameIndex = 0; packetsCaught = 0; totalPacketsCaught = 0; @@ -119,7 +998,7 @@ void UDPStandardImplementation::initializeMembers(){ numMissingPackets = 0; startAcquisitionIndex = 0; acquisitionIndex = 0; - packetsPerFrame = 0; + frameIndexMask = 0; packetIndexMask = 0; frameIndexOffset = 0; @@ -129,39 +1008,24 @@ void UDPStandardImplementation::initializeMembers(){ shortFrame = -1; currframenum = 0; prevframenum = 0; - frameSize = 0; - bufferSize = 0; - onePacketSize = 0; - oneDataSize = 0; + guiDataReady = 0; nFrameToGui = 0; - fifosize = 0; - numJobsPerThread = -1; dataCompression = false; numListeningThreads = 1; numWriterThreads = 1; thread_started = 0; - currentListeningThreadIndex = -1; - currentWriterThreadIndex = -1; - for(int i=0;i=0) - fileIndex = i; - return getFileIndex(); -} -*/ -/* -int UDPStandardImplementation::setFrameIndexNeeded(int i){ - frameIndexNeeded = i; - return frameIndexNeeded; -} -*/ -/* -int UDPStandardImplementation::getEnableFileWrite() const{ - return enableFileWrite; -} -*/ - -/* -int UDPStandardImplementation::setEnableFileWrite(int i){ - enableFileWrite=i; - return getEnableFileWrite(); -} -*/ - -/* -int UDPStandardImplementation::getEnableOverwrite() const{ - return overwrite; -} -*/ - -/* -int UDPStandardImplementation::setEnableOverwrite(int i){ - overwrite=i; - return getEnableOverwrite(); -} -*/ /*other parameters*/ -slsReceiverDefs::runStatus UDPStandardImplementation::getStatus() const{ - FILE_LOG(logDEBUG) << __AT__ << " called, status: " << status; - - - return status; -} - - -void UDPStandardImplementation::setDetectorHostname(const char *detectorHostName){ - if(strlen(detectorHostName)) - strcpy(detHostname,detectorHostName); -} - - -char *UDPStandardImplementation::getDetectorHostname() const{ - if(!strlen(detHostname)) - return NULL; - return (char*)detHostname; -} - -void UDPStandardImplementation::setEthernetInterface(char* c){ FILE_LOG(logDEBUG) << __AT__ << " called"; - - strcpy(eth,c); -} - - -void UDPStandardImplementation::setUDPPortNo(int p){ -FILE_LOG(logDEBUG) << __AT__ << " called"; - server_port[0] = p; -} - - -void UDPStandardImplementation::setUDPPortNo2(int p){ -FILE_LOG(logDEBUG) << __AT__ << " called"; - server_port[1] = p; -} - - -int UDPStandardImplementation::getNumberOfFrames() const { - return numberOfFrames; -} - - -int32_t UDPStandardImplementation::setNumberOfFrames(int32_t fnum){ FILE_LOG(logDEBUG) << __AT__ << " called"; - - if(fnum >= 0) - numberOfFrames = fnum; - - return getNumberOfFrames(); -} - -int UDPStandardImplementation::getScanTag() const{ - return scanTag; -} - - -int32_t UDPStandardImplementation::setScanTag(int32_t stag){ FILE_LOG(logDEBUG) << __AT__ << " called"; - - if(stag >= 0) - scanTag = stag; - - return getScanTag(); -} - - -int UDPStandardImplementation::getDynamicRange() const{ - return dynamicRange; -} - -int32_t UDPStandardImplementation::setDynamicRange(int32_t dr){ FILE_LOG(logDEBUG) << __AT__ << " called"; - - int olddr = dynamicRange; - - if(dr >= 0){ - cout << "Setting Dynamic Range to " << dr << endl; - - dynamicRange = dr; - - if(myDetectorType == EIGER){ - - - if(!tengigaEnable){ - packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; - onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; - oneDataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE; - }else{ - packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; - onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; - oneDataSize = EIGER_TEN_GIGA_ONE_DATA_SIZE; - } - - footer_offset = EIGER_PACKET_HEADER_SIZE + oneDataSize; - frameSize = onePacketSize * packetsPerFrame; - bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) - maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; - - - - if(olddr != dr){ - - //del - if(thread_started){ - createListeningThreads(true); - createWriterThreads(true); - } - for(int i=0;i=0){ - nFrameToGui = i; - setupFifoStructure(); - } - return nFrameToGui; -} - - - -int64_t UDPStandardImplementation::setAcquisitionPeriod(int64_t index){ FILE_LOG(logDEBUG) << __AT__ << " called"; - - - if(index >= 0){ - if(index != acquisitionPeriod){ - acquisitionPeriod = index; - setupFifoStructure(); - } - } - return acquisitionPeriod; -} - - -bool UDPStandardImplementation::getDataCompression(){ FILE_LOG(logDEBUG) << __AT__ << " called"; -return dataCompression;} - -int UDPStandardImplementation::enableDataCompression(bool enable){ FILE_LOG(logDEBUG) << __AT__ << " called"; - - cout << "Data compression "; - if(enable) - cout << "enabled" << endl; - else - cout << "disabled" << endl; -#ifdef MYROOT1 - cout << " WITH ROOT" << endl; -#else - cout << " WITHOUT ROOT" << endl; -#endif - //delete filter for the current number of threads - deleteFilter(); - - dataCompression = enable; - pthread_mutex_lock(&status_mutex); - writerthreads_mask = 0x0; - pthread_mutex_unlock(&(status_mutex)); - - createWriterThreads(true); - - if(enable) - numWriterThreads = MAX_NUM_WRITER_THREADS; - else - numWriterThreads = 1; - - if(createWriterThreads() == FAIL){ - cprintf(BG_RED,"ERROR: Could not create writer threads\n"); - return FAIL; - } - setThreadPriorities(); - - - if(enable) - setupFilter(); - - return OK; -} - - - - - - - - - - - - -/*other functions*/ - - -void UDPStandardImplementation::deleteFilter(){ FILE_LOG(logDEBUG) << __AT__ << " called"; - - int i; - cmSub=NULL; - - for(i=0;i(receiverdata[i], csize, sigma, sign, cmSub); - -} - - - -//LEO: it is not clear to me.. -void UDPStandardImplementation::setupFifoStructure(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - int64_t i; - int oldn = numJobsPerThread; - - //if every nth frame mode - if(nFrameToGui) - numJobsPerThread = nFrameToGui; - - //random nth frame mode - else{ - if(!acquisitionPeriod) - i = SAMPLE_TIME_IN_NS; - else - i = SAMPLE_TIME_IN_NS/acquisitionPeriod; - if (i > MAX_JOBS_PER_THREAD) - numJobsPerThread = MAX_JOBS_PER_THREAD; - else if (i < 1) - numJobsPerThread = 1; - else - numJobsPerThread = i; - } - - //if same, return - if(oldn == numJobsPerThread) - return; - - if(myDetectorType == EIGER) - numJobsPerThread = 1; - - //otherwise memory too much if numjobsperthread is at max = 1000 - fifosize = GOTTHARD_FIFO_SIZE; - if(myDetectorType == MOENCH) - fifosize = MOENCH_FIFO_SIZE; - if(myDetectorType == PROPIX) - fifosize = PROPIX_FIFO_SIZE; - else if(myDetectorType == EIGER) - fifosize = EIGER_FIFO_SIZE * packetsPerFrame; - - if(fifosize % numJobsPerThread) - fifosize = (fifosize/numJobsPerThread)+1; - else - fifosize = fifosize/numJobsPerThread; - - if(myDetectorType == EIGER) - cout << "1 packet per buffer" << endl; - else - cout << "Number of Frames per buffer:" << numJobsPerThread << endl; -#ifdef VERBOSE - cout << "Fifo Size:" << fifosize << endl; -#endif - /* - //for testing - numJobsPerThread = 3; fifosize = 11; - */ - - for(int i=0;iisEmpty()) - fifoFree[i]->pop(buffer[i]); -#ifdef FIFO_DEBUG - //cprintf(GREEN,"%d fifostructure popped from fifofree %x\n", i, (void*)(buffer[i])); -#endif - delete fifoFree[i]; - } - if(fifo[i]) delete fifo[i]; - if(mem0[i]) free(mem0[i]); - fifoFree[i] = new CircularFifo(fifosize); - fifo[i] = new CircularFifo(fifosize); - - - int whatperbuffer = bufferSize; - if(myDetectorType == EIGER) - whatperbuffer = onePacketSize; - - //allocate memory - mem0[i]=(char*)malloc((whatperbuffer * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); - /** shud let the client know about this */ - if (mem0[i]==NULL){ - cprintf(BG_RED,"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++\n"); - exit(-1); - } - - buffer[i]=mem0[i]; - //push the addresses into freed fifoFree and writingFifoFree - while (buffer[i]<(mem0[i]+(whatperbuffer * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { - fifoFree[i]->push(buffer[i]); -#ifdef FIFO_DEBUG - cprintf(BLUE,"%d fifostructure free pushed into fifofree %x\n", i, (void*)(buffer[i])); -#endif - buffer[i]+=(whatperbuffer * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); - } - } - cout << "Fifo structure(s) reconstructed" << endl; -} - - - - @@ -1113,164 +1342,6 @@ int UDPStandardImplementation::shutDownUDPSockets(){ -int UDPStandardImplementation::createListeningThreads(bool destroy){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - int i; - void* status; - - killAllListeningThreads = 0; - - pthread_mutex_lock(&status_mutex); - listeningthreads_mask = 0x0; - pthread_mutex_unlock(&(status_mutex)); - - FILE_LOG(logDEBUG) << "Starting " << __func__ << endl; - - if(!destroy){ - - //start listening threads - cout << "Creating Listening Threads(s)"; - - currentListeningThreadIndex = -1; - - for(i = 0; i < numListeningThreads; ++i){ - sem_init(&listensmp[i],1,0); - thread_started = 0; - currentListeningThreadIndex = i; - if(pthread_create(&listening_thread[i], NULL,startListeningThread, (void*) this)){ - cout << "Could not create listening thread with index " << i << endl; - return FAIL; - } - while(!thread_started); - cout << "."; - cout << flush; - } -#ifdef VERBOSE - cout << "Listening thread(s) created successfully." << endl; -#else - cout << endl; -#endif - }else{ - cout<<"Destroying Listening Thread(s)"<= 0){ - - tengigaEnable = enable; - - if(myDetectorType == EIGER){ - - if(!tengigaEnable){ - packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; - onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; - oneDataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE; - }else{ - packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; - onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; - oneDataSize = EIGER_TEN_GIGA_ONE_DATA_SIZE; - } - frameSize = onePacketSize * packetsPerFrame; - bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) - maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; - -#ifdef VERBOSE - cout<<"packetsPerFrame:"<= 0) - receiverBase->setFrameToGuiFrequency(index); + if(index >= 0){ + ret = receiverBase->setFrameToGuiFrequency(index); + if(ret == FAIL) + strcpy(mess, "Could not allocate memory for listening fifo\n"); + } retval=receiverBase->getFrameToGuiFrequency(); if(index>=0 && retval!=index) ret = FAIL; @@ -2107,8 +2110,11 @@ int slsReceiverTCPIPInterface::set_timer() { } else{ if(index[0] == slsReceiverDefs::FRAME_PERIOD){ - if(index[1]>=0) - receiverBase->setAcquisitionPeriod(index[1]); + if(index[1]>=0){ + ret = receiverBase->setAcquisitionPeriod(index[1]); + if(ret == FAIL) + strcpy(mess,"Could not allocate memory for listening fifo\n") + } retval=receiverBase->getAcquisitionPeriod(); }else{ if(index[1]>=0) @@ -2183,17 +2189,19 @@ int slsReceiverTCPIPInterface::enable_compression() { } else{ if(enable >= 0) - receiverBase->setDataCompressionEnable(enable); + ret = receiverBase->setDataCompressionEnable(enable); } } - if (receiverBase == NULL){ - strcpy(mess,"Receiver not set up\n"); - ret=FAIL; - }else{ - retval = receiverBase->getDataCompressionEnable(); - if(enable >= 0 && retval != enable) - ret = FAIL; + if(ret != FAIL){ + if (receiverBase == NULL){ + strcpy(mess,"Receiver not set up\n"); + ret=FAIL; + }else{ + retval = receiverBase->getDataCompressionEnable(); + if(enable >= 0 && retval != enable) + ret = FAIL; + } } } @@ -2324,8 +2332,11 @@ int slsReceiverTCPIPInterface::set_dynamic_range() { strcpy(mess,"Receiver not set up\n"); ret=FAIL; }else{ - if(dr > 0) - receiverBase->setDynamicRange(dr); + if(dr > 0){ + ret = receiverBase->setDynamicRange(dr); + if(ret == FAIL) + strcpy(mess, "Could not allocate memory for fifo or could not start listening/writing threads\n"); + } retval = receiverBase->getDynamicRange(); if(dr > 0 && retval != dr) ret = FAIL; @@ -2461,8 +2472,8 @@ int slsReceiverTCPIPInterface::enable_tengiga() { } else{ if(val >= 0) - receiverBase->setDataCompressionEnable(val); - retval=receiverBase->getDataCompressionEnable(); + ret = receiverBase->setTenGigaEnable(val); + retval=receiverBase->getTenGigaEnable(); if((val >= 0) && (val != retval)) ret = FAIL; else