diff --git a/slsReceiverSoftware/include/UDPInterface.h b/slsReceiverSoftware/include/UDPInterface.h index 798c45f6c..b00f038cb 100644 --- a/slsReceiverSoftware/include/UDPInterface.h +++ b/slsReceiverSoftware/include/UDPInterface.h @@ -39,13 +39,17 @@ class UDPInterface { * * set*() : anytime after initialize(), multiple times * - * startReceiver(): anytime after initialize(). Will fail if state already is 'running': + * startReceiver(): anytime after initialize(). Will fail in TCPIP itself if state already is 'running': * * Only startReceiver() does change the data receiver configuration, it does pass the whole configuration cache to the data receiver. * * abort(), //FIXME: needed? * * stopReceiver() : anytime after initialize(). Will do nothing if state already is idle. + * Otherwise, sets status to transmitting when shutting down sockets + * then to run_finished when all data obtained + * then to idle when returning from this function + * * * getStatus() returns the actual state of the data receiver - idle, running or error, enum defined in include/sls_receiver_defs.h * diff --git a/slsReceiverSoftware/include/UDPRESTImplementation.h b/slsReceiverSoftware/include/UDPRESTImplementation.h index f335ba9d2..374199721 100644 --- a/slsReceiverSoftware/include/UDPRESTImplementation.h +++ b/slsReceiverSoftware/include/UDPRESTImplementation.h @@ -7,36 +7,21 @@ ***********************************************/ -#include "sls_receiver_defs.h" -#include "receiver_defs.h" -#include "genericSocket.h" -#include "circularFifo.h" -#include "singlePhotonDetector.h" -#include "slsReceiverData.h" -#include "moenchCommonMode.h" - #include "UDPBaseImplementation.h" - -#ifdef MYROOT1 -#include -#include -#endif - #include "RestHelper.h" - #include -#include #include -#include /** * @short does all the functions for a receiver, set/get parameters, start/stop etc. */ class UDPRESTImplementation : protected virtual slsReceiverDefs, public UDPBaseImplementation { - - public: +public: + /************************************************************************* + * Constructor & Destructor ********************************************** + *************************************************************************/ /** * Constructor */ @@ -49,787 +34,116 @@ class UDPRESTImplementation : protected virtual slsReceiverDefs, public UDPBaseI protected: - void initialize_REST(); + + /************************************************************************* + * Getters *************************************************************** + * They access local cache of configuration or detector parameters ******* + *************************************************************************/ + /** + * Get Rest State + */ int get_rest_state(RestHelper * rest, string *rest_state); + /************************************************************************* + * Setters *************************************************************** + * They modify the local cache of configuration or detector parameters *** + *************************************************************************/ + /** + * Initialize REST + */ + void initialize_REST(); + + + + public: + /************************************************************************* + * Getters *************************************************************** + * They access local cache of configuration or detector parameters ******* + *************************************************************************/ + + + /************************************************************************* + * Setters *************************************************************** + * They modify the local cache of configuration or detector parameters *** + *************************************************************************/ + + /** + * Overridden method + * Configure command line parameters + * @param config_map mapping of config parameters passed from command line arguments + */ void configure(map config_map); - /** - * delete and free member parameters - */ - void deleteMembers(); + + /************************************************************************* + * Behavioral functions*************************************************** + * They may modify the status of the receiver **************************** + *************************************************************************/ /** - * initialize member parameters + * Overridden method + * Start Listening for Packets by activating all configuration settings to receiver + * When this function returns, it has status RUNNING(upon SUCCESS) or IDLE (upon failure) + * @param c error message if FAIL + * @return OK or FAIL */ - //void initializeMembers(); + int startReceiver(char *c=NULL); /** - * Set detector hostname - * @param c hostname + * Overridden method + * Stop Listening for Packets + * Calls startReadout(), which stops listening and sets status to Transmitting + * When it has read every frame in buffer, the status changes to Run_Finished + * When this function returns, receiver has status IDLE + * Pre: status is running, semaphores have been instantiated, + * Post: udp sockets shut down, status is idle, semaphores destroyed */ - //void initialize(const char *detectorHostName); - - /* Returns detector hostname - /returns hostname - * caller needs to deallocate the returned char array. - * if uninitialized, it must return NULL - */ - //char *getDetectorHostname() const; - + void stopReceiver(); /** - * Set receiver type - * @param det detector type - * Returns success or FAIL - */ - //int setDetectorType(detectorType det); - - - //Frame indices and numbers caught - /** - * Returns the frame index at start of entire acquisition (including all scans) - */ - uint32_t getStartAcquisitionIndex(); - - /** - * Returns current Frame Index Caught for an entire acquisition (including all scans) - */ - uint32_t getAcquisitionIndex(); - - /** - * Returns if acquisition started - */ - bool getAcquistionStarted(); - - /** - * Returns Frames Caught for each real time acquisition (eg. for each scan) - */ - int getFramesCaught(); - - /** - * Returns Total Frames Caught for an entire acquisition (including all scans) - */ - int getTotalFramesCaught(); - - /** - * Returns the frame index at start of each real time acquisition (eg. for each scan) - */ - uint32_t getStartFrameIndex(); - - /** - * Returns current Frame Index for each real time acquisition (eg. for each scan) - */ - uint32_t getFrameIndex(); - - /** - * Returns if measurement started - */ - bool getMeasurementStarted(); - - /** - * Resets the Total Frames Caught - * This is how the receiver differentiates between entire acquisitions - * Returns 0 - */ - void resetTotalFramesCaught(); - - - - - //file parameters - /** - * Returns File Path - */ - //char* getFilePath() const; - - /** - * Set File Path - * @param c file path - */ - //char* setFilePath(const char c[]); - - /** - * Returns File Name - */ - //char* getFileName() const; - - /** - * Set File Name (without frame index, file index and extension) - * @param c file name - */ - //char* setFileName(const char c[]); - - /** - * Returns File Index - */ - int getFileIndex(); - - /** - * Set File Index - * @param i file index - */ - int setFileIndex(int i); - - /** - * Set Frame Index Needed - * @param i frame index needed - */ - int setFrameIndexNeeded(int i); - - /** - * Set enable file write - * @param i file write enable - * Returns file write enable - */ - //int setEnableFileWrite(int i); - - /** - * Enable/disable overwrite - * @param i enable - * Returns enable over write - */ - //int setEnableOverwrite(int i); - - /** - * Returns file write enable - * 1: YES 0: NO - */ - //int getEnableFileWrite() const; - - /** - * Returns file over write enable - * 1: YES 0: NO - */ - //int getEnableOverwrite() const; - -//other parameters - - /** - * abort acquisition with minimum damage: close open files, cleanup. - * does nothing if state already is 'idle' - */ - void abort() {}; - - /** - * Returns status of receiver: idle, running or error - */ - runStatus getStatus() const; - - - /** - * Set Ethernet Interface or IP to listen to - */ - void setEthernetInterface(char* c); - - /** - * Set UDP Port Number - */ - void setUDPPortNo(int p); - void setUDPPortNo2(int p); - - /* - * Returns number of frames to receive - * This is the number of frames to expect to receiver from the detector. - * The data receiver will change from running to idle when it got this number of frames - */ - - //int getNumberOfFrames() const; - - /** - * set frame number if a positive number - */ - //int32_t setNumberOfFrames(int32_t fnum); - - - /** - * Returns scan tag - */ - //int getScanTag() const; - - /** - * set scan tag if its is a positive number - */ - //int32_t setScanTag(int32_t stag); - - /** - * Returns the number of bits per pixel - */ - //int getDynamicRange() const; - - /** - * set dynamic range if its is a positive number - */ - int32_t setDynamicRange(int32_t dr); - - /** - * Set short frame - * @param i if shortframe i=1 - */ - int setShortFrame(int i); - - /** - * Set the variable to send every nth frame to gui - * or if 0,send frame only upon gui request - */ - int setNFrameToGui(int i); - - /** set acquisition period if a positive number - */ - int64_t setAcquisitionPeriod(int64_t index); - - /** get data compression, by saving only hits - */ - bool getDataCompression(); - - /** enabl data compression, by saving only hits - /returns if failed - */ - int enableDataCompression(bool enable); - - /** - * enable 10Gbe - @param enable 1 for 10Gbe or 0 for 1 Gbe, -1 to read out - \returns enable for 10Gbe - */ - int enableTenGiga(int enable = -1); - - - -//other functions - - /** - * Returns the buffer-current frame read by receiver - * @param c pointer to current file name - * @param raw address of pointer, pointing to current frame to send to gui - * @param fnum frame number for eiger as it is not in the packet - * @param startAcquisitionIndex is the start index of the acquisition - * @param startFrameIndex is the start index of the scan - */ - void readFrame(char* c,char** raw, uint32_t &fnum, uint32_t &startAcquisitionIndex, uint32_t &startFrameIndex); - - /** - * Closes all files - * @param ithr thread index - */ - void closeFile(int ithr = -1); - - /** - * Starts Receiver - starts to listen for packets - * @param message is the error message if there is an error - * Returns success - */ - int startReceiver(char message[]); - - /** - * Stops Receiver - stops listening for packets - * Returns success - */ - int stopReceiver(); - - /** set status to transmitting and - * when fifo is empty later, sets status to run_finished + * Overridden method + * Stop Listening to Packets + * and sets status to Transmitting + * Next step would be to get all data and stop receiver completely and return with idle state + * Pre: status is running, udp sockets have been initialized, stop receiver initiated + * Post:udp sockets closed, status is transmitting, */ void startReadout(); /** - * shuts down the udp sockets - * \returns if success or fail + * Overridden method + * Shuts down and deletes UDP Sockets + * TCPIPInterface can also call this in case of illegal shutdown of receiver + * @return OK or FAIL */ int shutDownUDPSockets(); + /** + * Overridden method + * Get the buffer-current frame read by receiver + * @param c pointer to current file name + * @param raw address of pointer, pointing to current frame to send to gui + * @param startAcq start index of the acquisition + * @param startFrame start index of the scan + */ + void readFrame(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame); + + /** + * Overridden method + * Closes file / all files(data compression involves multiple files) + * TCPIPInterface can also call this in case of illegal shutdown of receiver + * @param i thread index valid for datacompression using root files, -1 for all threads + */ + void closeFile(int i = -1); + private: - /* - void not_implemented(string method_name){ - std::cout << "[WARNING] Method " << method_name << " not implemented!" << std::endl; - }; - */ - /** - * Deletes all the filter objects for single photon data - */ - void deleteFilter(); - - /** - * Constructs the filter for single photon data - */ - void setupFilter(); - - /** - * set up fifo according to the new numjobsperthread - */ - void setupFifoStructure (); - - /** - * Copy frames to gui - * uses semaphore for nth frame mode - */ - void copyFrameToGui(char* startbuf[], uint32_t fnum=-1, char* buf=NULL); - - /** - * creates udp sockets - * \returns if success or fail - */ - int createUDPSockets(); - - /** - * create listening thread - * @param destroy is true to kill all threads and start again - */ - int createListeningThreads(bool destroy = false); - - /** - * create writer threads - * @param destroy is true to kill all threads and start again - */ - int createWriterThreads(bool destroy = false); - - /** - * set thread priorities - */ - void setThreadPriorities(); - - /** - * initializes variables and creates the first file - * also does the startAcquisitionCallBack - * \returns FAIL or OK - */ - int setupWriter(); - - /** - * Creates new tree and file for compression - * @param ithr thread number - * @param iframe frame number - *\returns OK for succces or FAIL for failure - */ - int createCompressionFile(int ithr, int iframe); - - /** - * Creates new file - *\returns OK for succces or FAIL for failure - */ - int createNewFile(); - - /** - * Static function - Thread started which listens to packets. - * Called by startReceiver() - * @param this_pointer pointer to this object - */ - static void* startListeningThread(void *this_pointer); - - /** - * Static function - Thread started which writes packets to file. - * Called by startReceiver() - * @param this_pointer pointer to this object - */ - static void* startWritingThread(void *this_pointer); - - /** - * Thread started which listens to packets. - * Called by startReceiver() - * - */ - int startListening(); - - /** - * Thread started which writes packets to file. - * Called by startReceiver() - * - */ - int startWriting(); - - /** - * Writing to file without compression - * @param buf is the address of buffer popped out of fifo - * @param numpackets is the number of packets - * @param framenum current frame number - */ - void writeToFile_withoutCompression(char* buf,int numpackets, uint32_t framenum); - - /** - * Its called for the first packet of a scan or acquistion - * Sets the startframeindices and the variables to know if acquisition started - * @param ithread listening thread number - */ - void startFrameIndices(int ithread); - - /** - * This is called when udp socket is shut down - * It pops ffff instead of packet number into fifo - * to inform writers about the end of listening session - * @param ithread listening thread number - * @param rc number of bytes received - * @param pc packet count - * @param t total packets listened to - */ - void stopListening(int ithread, int rc, int &pc, int &t); - - /** - * When acquisition is over, this is called - * @param ithread listening thread number - * @param wbuffer writer buffer - */ - void stopWriting(int ithread, char* wbuffer[]); - - - /** - * data compression for each fifo output - * @param ithread listening thread number - * @param wbuffer writer buffer - * @param npackets number of packets from the fifo - * @param data pointer to the next packet start - * @param xmax max pixels in x direction - * @param ymax max pixels in y direction - * @param nf nf - */ - void handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf); - - - /** structure of an eiger image header*/ - typedef struct - { - unsigned char header_before[20]; - unsigned char fnum[4]; - unsigned char header_after[24]; - } eiger_image_header; - - - /** structure of an eiger image header*/ - typedef struct - { - unsigned char num1[4]; - unsigned char num2[4]; - } eiger_packet_header; - - /** max number of listening threads */ - const static int MAX_NUM_LISTENING_THREADS = EIGER_MAX_PORTS; - - /** max number of writer threads */ - const static int MAX_NUM_WRITER_THREADS = 15; - - /** detector type */ - //detectorType myDetectorType; - - /** detector hostname */ - //char detHostname[MAX_STR_LENGTH]; - - /** status of receiver */ - //runStatus status; - - /** UDP Socket between Receiver and Detector */ - //genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS]; - - /** Server UDP Port*/ - //int server_port[MAX_NUM_LISTENING_THREADS]; - - /** ethernet interface or IP to listen to */ - //char *eth; - - /** max packets per file **/ - //int maxPacketsPerFile; - - /** File write enable */ - //int enableFileWrite; - - /** File over write enable */ - //int overwrite; - - /** Complete File name */ - //char savefilename[MAX_STR_LENGTH]; - - /** File Name without frame index, file index and extension*/ - //char fileName[MAX_STR_LENGTH]; - - /** File Path */ - //char filePath[MAX_STR_LENGTH]; - - /** File Index */ - //int fileIndex; - - /** scan tag */ - //int scanTag; - - /** if frame index required in file name */ - //int frameIndexNeeded; - - /* Acquisition started */ - //bool acqStarted; - - /* Measurement started */ - //bool measurementStarted; - - /** Frame index at start of each real time acquisition (eg. for each scan) */ - //uint32_t startFrameIndex; - - /** Actual current frame index of each time acquisition (eg. for each scan) */ - //uint32_t frameIndex; - - /** Frames Caught for each real time acquisition (eg. for each scan) */ - //int packetsCaught; - - /** Total packets caught for an entire acquisition (including all scans) */ - //int totalPacketsCaught; - - /** Pckets currently in current file, starts new file when it reaches max */ - //int packetsInFile; - - /** Frame index at start of an entire acquisition (including all scans) */ - //uint32_t startAcquisitionIndex; - - /** Actual current frame index of an entire acquisition (including all scans) */ - //uint32_t acquisitionIndex; - - /** number of packets per frame*/ - //int packetsPerFrame; - - /** frame index mask */ - //uint32_t frameIndexMask; - - /** packet index mask */ - //uint32_t packetIndexMask; - - /** frame index offset */ - //int frameIndexOffset; - - /** acquisition period */ - //int64_t acquisitionPeriod; - - /** frame number */ - //int32_t numberOfFrames; - - /** dynamic range */ - //int dynamicRange; - - /** short frames */ - //int shortFrame; - - /** current frame number */ - //uint32_t currframenum; - - /** Previous Frame number from buffer */ - //uint32_t prevframenum; - - /** size of one frame */ - //int frameSize; - - /** buffer size. different from framesize as we wait for one packet instead of frame for eiger */ - //int bufferSize; - - /** oen buffer size */ - //int onePacketSize; - - /** latest data */ - //char* latestData; - - /** gui data ready */ - //int guiDataReady; - - /** points to the data to send to gui */ - //char* guiData; - - /** points to the filename to send to gui */ - //char* guiFileName; - - /** temporary number for eiger frame number as its not included in the packet */ - //uint32_t guiFrameNumber; - - /** send every nth frame to gui or only upon gui request*/ - //int nFrameToGui; - - /** fifo size */ - //unsigned int fifosize; - - /** number of jobs per thread for data compression */ - //int numJobsPerThread; - - /** datacompression - save only hits */ - //bool dataCompression; - - /** memory allocated for the buffer */ - //char *mem0[MAX_NUM_LISTENING_THREADS]; - - /** circular fifo to store addresses of data read */ - //CircularFifo* fifo[MAX_NUM_LISTENING_THREADS]; - - /** circular fifo to store addresses of data already written and ready to be resued*/ - //CircularFifo* fifoFree[MAX_NUM_LISTENING_THREADS]; - - /** Receiver buffer */ - //char *buffer[MAX_NUM_LISTENING_THREADS]; - - /** number of writer threads */ - //intt numListeningThreads; - - /** number of writer threads */ - //int numWriterThreads; - - /** to know if listening and writer threads created properly */ - //int thread_started; - - /** current listening thread index*/ - //int currentListeningThreadIndex; - - /** current writer thread index*/ - //int currentWriterThreadIndex; - - /** thread listening to packets */ - //pthread_t listening_thread[MAX_NUM_LISTENING_THREADS]; - - /** thread writing packets */ - //pthread_t writing_thread[MAX_NUM_WRITER_THREADS]; - - /** total frame count the listening thread has listened to */ - //int totalListeningFrameCount[MAX_NUM_LISTENING_THREADS]; - - /** mask showing which listening threads are running */ - //volatile uint32_t listeningthreads_mask; - - /** mask showing which writer threads are running */ - //volatile uint32_t writerthreads_mask; - - /** mask showing which threads have created files*/ - //volatile uint32_t createfile_mask; - - /** OK if file created was successful */ - //int ret_createfile; - - /** variable used to self terminate threads waiting for semaphores */ - //int killAllListeningThreads; - - /** variable used to self terminate threads waiting for semaphores */ - //int killAllWritingThreads; - - /** 10Gbe enable*/ - //int tengigaEnable; - - - - -//semaphores - /** semaphore to synchronize writer and guireader threads */ - //sem_t smp; - /** semaphore to synchronize listener threads */ - //sem_t listensmp[MAX_NUM_LISTENING_THREADS]; - /** semaphore to synchronize writer threads */ - //sem_t writersmp[MAX_NUM_WRITER_THREADS]; - - -//mutex - /** guiDataReady mutex */ - //pthread_mutex_t dataReadyMutex; - - /** mutex for status */ - //pthread_mutex_t status_mutex; - - /** mutex for progress variable currframenum */ - //pthread_mutex_t progress_mutex; - - /** mutex for writing data to file */ - //pthread_mutex_t write_mutex; - - /** File Descriptor */ - //FILE *sfilefd; - - //filter - //singlePhotonDetector *singlePhotonDet[MAX_NUM_WRITER_THREADS]; - //slsReceiverData *receiverdata[MAX_NUM_WRITER_THREADS]; - //moenchCommonMode *cmSub; - //bool commonModeSubtractionEnable; - -#ifdef MYROOT1 - /** Tree where the hits are stored */ - TTree *myTree[MAX_NUM_WRITER_THREADS]; - - /** File where the tree is saved */ - TFile *myFile[MAX_NUM_WRITER_THREADS]; -#endif - - - - /** - callback arguments are - filepath - filename - fileindex - data size - - return value is - 0 callback takes care of open,close,write file - 1 callback writes file, we have to open, close it - 2 we open, close, write file, callback does not do anything - - */ - int (*startAcquisitionCallBack)(char*, char*,int, int, void*); - void *pStartAcquisition; - - /** - args to acquisition finished callback - total frames caught - - */ - void (*acquisitionFinishedCallBack)(int, void*); - void *pAcquisitionFinished; - - - /** - args to raw data ready callback are - framenum - datapointer - datasize in bytes - file descriptor - guidatapointer (NULL, no data required) - */ - void (*rawDataReadyCallBack)(int, char*, int, FILE*, char*, void*); - void *pRawDataReady; - - /** The action which decides what the user and default responsibilites to save data are - * 0 raw data ready callback takes care of open,close,write file - * 1 callback writes file, we have to open, close it - * 2 we open, close, write file, callback does not do anything */ - int cbAction; - - -public: - - - /** - callback arguments are - filepath - filename - fileindex - datasize - - return value is - 0 callback takes care of open,close,wrie file - 1 callback writes file, we have to open, close it - 2 we open, close, write file, callback does not do anything - */ - void registerCallBackStartAcquisition(int (*func)(char*, char*,int, int, void*),void *arg){startAcquisitionCallBack=func; pStartAcquisition=arg;}; - - /** - callback argument is - toatal frames caught - */ - void registerCallBackAcquisitionFinished(void (*func)(int, void*),void *arg){acquisitionFinishedCallBack=func; pAcquisitionFinished=arg;}; - - /** - args to raw data ready callback are - framenum - datapointer - datasize in bytes - file descriptor - guidatapointer (NULL, no data required) - */ - void registerCallBackRawDataReady(void (*func)(int, char*, int, FILE*, char*, void*),void *arg){rawDataReadyCallBack=func; pRawDataReady=arg;}; - - - //REST specific bool isInitialized; RestHelper * rest ; - int rest_port; // receiver backend port - string rest_hostname; // receiver hostname + int rest_port; // receiver backend port + string rest_hostname; // receiver hostname }; diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 3c256450a..7ec6672aa 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -8,8 +8,6 @@ #include "UDPBaseImplementation.h" -//#include "sls_receiver_defs.h" -//#include "receiver_defs.h" #include "genericSocket.h" #include "circularFifo.h" #include "singlePhotonDetector.h" @@ -39,7 +37,6 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase /************************************************************************* * Constructor & Destructor ********************************************** - * They access local cache of configuration or detector parameters ******* *************************************************************************/ /** * Constructor @@ -155,6 +152,7 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase /** * Overridden method * Start Listening for Packets by activating all configuration settings to receiver + * When this function returns, it has status RUNNING(upon SUCCESS) or IDLE (upon failure) * @param c error message if FAIL * @return OK or FAIL */ @@ -164,7 +162,8 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase * Overridden method * Stop Listening for Packets * Calls startReadout(), which stops listening and sets status to Transmitting - * When it has read every frame in buffer,it returns with the status Run_Finished + * When it has read every frame in buffer, the status changes to Run_Finished + * When this function returns, receiver has status IDLE * Pre: status is running, semaphores have been instantiated, * Post: udp sockets shut down, status is idle, semaphores destroyed */ @@ -174,6 +173,7 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase * Overridden method * Stop Listening to Packets * and sets status to Transmitting + * Next step would be to get all data and stop receiver completely and return with idle state * Pre: status is running, udp sockets have been initialized, stop receiver initiated * Post:udp sockets closed, status is transmitting */ @@ -182,6 +182,7 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase /** * Overridden method * Shuts down and deletes UDP Sockets + * TCPIPInterface can also call this in case of illegal shutdown of receiver * @return OK or FAIL */ int shutDownUDPSockets(); @@ -199,6 +200,7 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase /** * Overridden method * Closes file / all files(data compression involves multiple files) + * TCPIPInterface can also call this in case of illegal shutdown of receiver * @param i thread index valid for datacompression using root files, -1 for all threads */ void closeFile(int i = -1); diff --git a/slsReceiverSoftware/src/UDPRESTImplementation.cpp b/slsReceiverSoftware/src/UDPRESTImplementation.cpp index 4b2b67e4b..ab650257f 100644 --- a/slsReceiverSoftware/src/UDPRESTImplementation.cpp +++ b/slsReceiverSoftware/src/UDPRESTImplementation.cpp @@ -7,32 +7,21 @@ #include "UDPRESTImplementation.h" -#include "moench02ModuleData.h" -#include "gotthardModuleData.h" -#include "gotthardShortModuleData.h" - - -#include // SIGINT -#include // stat -#include // socket(), bind(), listen(), accept(), shut down -#include // sock_addr_in, htonl, INADDR_ANY #include // exit() #include // set precision -#include // munmap - -#include +#include // map #include +#include +#include #include - //#include "utilities.h" - using namespace std; /* TODO + filePath != getFilePath + better state handling. Now it is only IDLE - RUNNING - IDLE -*/ + */ UDPRESTImplementation::UDPRESTImplementation(){ @@ -40,6 +29,8 @@ UDPRESTImplementation::UDPRESTImplementation(){ //TODO I do not really know what to do with bottom... // Default values + isInitialized = false; + rest = NULL; rest_hostname = "localhost"; rest_port = 8081; } @@ -69,9 +60,9 @@ void UDPRESTImplementation::configure(map config_map){ for(map::const_iterator i=config_map.begin(); i != config_map.end(); i++){ std::cout << i->first << " " << i->second<< std::endl; } - */ + */ -}; +} int UDPRESTImplementation::get_rest_state(RestHelper * rest, string *rest_state){ @@ -84,13 +75,14 @@ int UDPRESTImplementation::get_rest_state(RestHelper * rest, string *rest_state) } return code; -}; +} + void UDPRESTImplementation::initialize_REST(){ - + FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " REST status is initialized: " << isInitialized; - + if (rest_hostname.empty()) { FILE_LOG(logDEBUG) << __AT__ <<"can't initialize with empty string or NULL for detectorHostname"; } @@ -99,7 +91,7 @@ void UDPRESTImplementation::initialize_REST(){ } else { FILE_LOG(logDEBUG) << __AT__ << "with receiverHostName=" << rest_hostname << ":" << rest_port; - + rest = new RestHelper() ; std::string answer; int code; @@ -107,7 +99,7 @@ void UDPRESTImplementation::initialize_REST(){ rest->init(rest_hostname, rest_port); code = get_rest_state(rest, &answer); std::cout << "AAAAAAAa " << answer << std::endl; - + if (code != 0){ FILE_LOG(logERROR) << __AT__ << " REST state returned: " << answer; @@ -123,21 +115,21 @@ void UDPRESTImplementation::initialize_REST(){ FILE_LOG(logERROR) << __func__ << ": " << e; throw; } - + //JsonBox::Object json_object; //json_object["configfile"] = JsonBox::Value("FILENAME"); JsonBox::Value json_request; //json_request["configfile"] = "config.py"; json_request["path"] = filePath; - + stringstream ss; string test; //std::cout << "GetSTring: " << json_request << std::endl; json_request.writeToStream(ss, false); //ss << json_request; ss >> test; - - + + code = rest->get_json("state", &answer); FILE_LOG(logDEBUG) << __AT__ << " state got " << code << " " << answer << "\n"; if (answer != "INITIALIZED"){ @@ -151,8 +143,8 @@ void UDPRESTImplementation::initialize_REST(){ FILE_LOG(logDEBUG) << __AT__ << " state/configure got " << code; code = rest->get_json("state", &answer); FILE_LOG(logDEBUG) << __AT__ << " state got " << code << " " << answer << "\n"; - - + + /* std::std::cout << string << std::endl; << "---- REST test 3: true, json object "<< std::endl; JsonBox::Value json_value; @@ -160,566 +152,53 @@ void UDPRESTImplementation::initialize_REST(){ std::cout << "JSON " << json_value["status"] << std::endl; */ } - + FILE_LOG(logDEBUG) << __func__ << ": initialize() done"; } -/* -int UDPRESTImplementation::setDetectorType(detectorType det){ - cout << "[WARNING] This is a base implementation, " << __func__ << " not correctly implemented" << endl; - return OK; -} -*/ - - -/*Frame indices and numbers caught*/ - -bool UDPRESTImplementation::getAcquistionStarted(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - return acqStarted; -}; - -bool UDPRESTImplementation::getMeasurementStarted(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - return measurementStarted; -}; - -int UDPRESTImplementation::getFramesCaught(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - return (packetsCaught/packetsPerFrame); -} - -int UDPRESTImplementation::getTotalFramesCaught(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - if (packetsPerFrame == 0){ - FILE_LOG(logWARNING) << __AT__ << " packetsPerFrame is 0!!!"; - return 0; - } - return (totalPacketsCaught/packetsPerFrame); -} - -uint32_t UDPRESTImplementation::getStartAcquisitionIndex(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - return startAcquisitionIndex; -} - -uint32_t UDPRESTImplementation::getStartFrameIndex(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - return startFrameIndex; -} - -uint32_t UDPRESTImplementation::getFrameIndex(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - if(!packetsCaught) - frameIndex=-1; - else - frameIndex = currframenum - startFrameIndex; - return frameIndex; -} - - -uint32_t UDPRESTImplementation::getAcquisitionIndex(){ - //FILE_LOG(logDEBUG) << __AT__ << " called, idx: " << acquisitionIndex; - if(!totalPacketsCaught) - acquisitionIndex = -1; - else - acquisitionIndex = currframenum - startAcquisitionIndex; - - //FILE_LOG(logDEBUG) << __AT__ << " idx: " << acquisitionIndex - // << " currframenum: " << currframenum - // << " startAcqIdx: " << startAcquisitionIndex; - - return acquisitionIndex; -} - - -void UDPRESTImplementation::resetTotalFramesCaught(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - acqStarted = false; - startAcquisitionIndex = 0; - totalPacketsCaught = 0; -} - - -/*file parameters*/ -int UDPRESTImplementation::getFileIndex(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - return fileIndex; -} - -int UDPRESTImplementation::setFileIndex(int i){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - if(i>=0) - fileIndex = i; - - return getFileIndex(); -} - - -int UDPRESTImplementation::setFrameIndexNeeded(int i){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - frameIndexNeeded = i; - return frameIndexNeeded; -} - - -/* -int UDPRESTImplementation::getEnableFileWrite() const{ - return enableFileWrite; -} - -int UDPRESTImplementation::setEnableFileWrite(int i){ - enableFileWrite=i; - return getEnableFileWrite(); -} - -int UDPRESTImplementation::getEnableOverwrite() const{ - return overwrite; -} - -int UDPRESTImplementation::setEnableOverwrite(int i){ - overwrite=i; - return getEnableOverwrite(); -} -*/ - - - - -/*other parameters*/ - -slsReceiverDefs::runStatus UDPRESTImplementation::getStatus() const{ - FILE_LOG(logDEBUG) << __AT__ << " called, status: " << status; - return status; -} - - - -/* -char *UDPRESTImplementation::getDetectorHostname() const{ - return (char*)detHostname; -} -*/ - -void UDPRESTImplementation::setEthernetInterface(char* c){ - FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting"; - - // TODO: this segfaults - //strcpy(eth,c); - //FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " done"; -} - -/* -void UDPRESTImplementation::setUDPPortNo(int p){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - for(int i=0;i= 0) - numberOfFrames = fnum; - - return getNumberOfFrames(); -} -*/ -/* -int UDPRESTImplementation::getScanTag() const{ - return scanTag; -} -*/ - -/* -int32_t UDPRESTImplementation::setScanTag(int32_t stag){ - if(stag >= 0) - scanTag = stag; - - return getScanTag(); -} -*/ - -int32_t UDPRESTImplementation::setDynamicRange(int32_t dr){ - FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting"; - - int olddr = dynamicRange; - if(dr >= 0){ - dynamicRange = dr; - } - - FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " " << getDynamicRange(); - return getDynamicRange(); - - -} - -/* -int32_t UDPRESTImplementation::getDynamicRange() const{ - FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting"; - - return dynamicRange; -} -*/ - -int UDPRESTImplementation::setShortFrame(int i){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - shortFrame=i; - - if(shortFrame!=-1){ - bufferSize = GOTTHARD_SHORT_ONE_PACKET_SIZE; - frameSize = GOTTHARD_SHORT_BUFFER_SIZE; - maxPacketsPerFile = SHORT_MAX_FRAMES_PER_FILE * GOTTHARD_SHORT_PACKETS_PER_FRAME; - packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME; - frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK; - frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET; - - }else{ - onePacketSize = GOTTHARD_ONE_PACKET_SIZE; - bufferSize = GOTTHARD_BUFFER_SIZE; - frameSize = GOTTHARD_BUFFER_SIZE; - maxPacketsPerFile = MAX_FRAMES_PER_FILE * GOTTHARD_PACKETS_PER_FRAME; - packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; - frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; - frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; - } - - - deleteFilter(); - if(dataCompression) - setupFilter(); - - return shortFrame; -} - - -int UDPRESTImplementation::setNFrameToGui(int i){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - if(i>=0){ - nFrameToGui = i; - setupFifoStructure(); - } - return nFrameToGui; -} - - - -int64_t UDPRESTImplementation::setAcquisitionPeriod(int64_t index){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - if(index >= 0){ - if(index != acquisitionPeriod){ - acquisitionPeriod = index; - setupFifoStructure(); - } - } - return acquisitionPeriod; -} - - -bool UDPRESTImplementation::getDataCompression(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - return dataCompression; -} - -int UDPRESTImplementation::enableDataCompression(bool enable){ - FILE_LOG(logDEBUG) << __AT__ << " called, doing nothing"; - return OK; -} - - - -/*other functions*/ - - -void UDPRESTImplementation::deleteFilter(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - int i; - cmSub=NULL; - - for(i=0;i(receiverdata[i], csize, sigma, sign, cmSub); - -} - - - -//LEO: it is not clear to me.. -void UDPRESTImplementation::setupFifoStructure(){ - FILE_LOG(logDEBUG) << __AT__ << " called, doing nothing"; -} -/* -void UDPRESTImplementation::setupFifoStructure(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - int64_t i; - int oldn = numJobsPerThread; - - //if every nth frame mode - if(nFrameToGui) - numJobsPerThread = nFrameToGui; - - //random nth frame mode - else{ - if(!acquisitionPeriod) - i = SAMPLE_TIME_IN_NS; - else - i = SAMPLE_TIME_IN_NS/acquisitionPeriod; - if (i > MAX_JOBS_PER_THREAD) - numJobsPerThread = MAX_JOBS_PER_THREAD; - else if (i < 1) - numJobsPerThread = 1; - else - numJobsPerThread = i; - } - - //if same, return - if(oldn == numJobsPerThread) - return; - - if(myDetectorType == EIGER) - numJobsPerThread = 1; - - //otherwise memory too much if numjobsperthread is at max = 1000 - fifosize = GOTTHARD_FIFO_SIZE; - if(myDetectorType == MOENCH) - fifosize = MOENCH_FIFO_SIZE; - else if(myDetectorType == EIGER) - fifosize = EIGER_FIFO_SIZE; - - if(fifosize % numJobsPerThread) - fifosize = (fifosize/numJobsPerThread)+1; - else - fifosize = fifosize/numJobsPerThread; - - - cout << "Number of Frames per buffer:" << numJobsPerThread << endl; - cout << "Fifo Size:" << fifosize << endl; - - - for(int i=0;iisEmpty()) - fifoFree[i]->pop(buffer[i]); - delete fifoFree[i]; - } - if(fifo[i]) delete fifo[i]; - if(mem0[i]) free(mem0[i]); - fifoFree[i] = new CircularFifo(fifosize); - fifo[i] = new CircularFifo(fifosize); - - - //allocate memory - mem0[i]=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); - // shud let the client know about this - if (mem0[i]==NULL){ - cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; - exit(-1); - } - buffer[i]=mem0[i]; - //push the addresses into freed fifoFree and writingFifoFree - while (buffer[i]<(mem0[i]+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { - fifoFree[i]->push(buffer[i]); - buffer[i]+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); - } - } - cout << "Fifo structure(s) reconstructed" << endl; -} -*/ - - - /** acquisition functions */ -void UDPRESTImplementation::readFrame(char* c,char** raw, uint32_t &fnum, uint32_t &startAcquisitionIndex, uint32_t &startFrameIndex){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - //point to gui data - if (guiData == NULL){ - guiData = latestData; - } - - //copy data and filename - strcpy(c,guiFileName); - fnum = guiFrameNumber; - startAcquisitionIndex = getStartAcquisitionIndex(); - startFrameIndex = getStartFrameIndex(); - //could not get gui data - if(!guiDataReady){ - *raw = NULL; - } - //data ready, set guidata to receive new data - else{ - *raw = guiData; - guiData = NULL; - if((nFrameToGui) && (writerthreads_mask)){ - //release after getting data - sem_post(&smp); - } - } -} - - - - - -void UDPRESTImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum, char* buf){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - //random read when gui not ready - if((!nFrameToGui) && (!guiData)){ - pthread_mutex_lock(&dataReadyMutex); - guiDataReady=0; - pthread_mutex_unlock(&dataReadyMutex); - } - - //random read or nth frame read, gui needs data now - else{ - /* - //nth frame read, block current process if the guireader hasnt read it yet - if(nFrameToGui) - sem_wait(&smp); -*/ - pthread_mutex_lock(&dataReadyMutex); - guiDataReady=0; - //eiger - if(startbuf != NULL){ - int offset = 0; - int size = frameSize/EIGER_MAX_PORTS; - for(int j=0;jpost_json("state/configure", &answer, request_body); + code = rest->get_json("state", &answer); + FILE_LOG(logDEBUG) << __FILE__ << "::" << " got: " << answer; - //error - int iret; - for(int i=0;igetErrorStatus(); - if(iret){ -#ifdef VERBOSE - cout << "Could not create UDP socket on port " << server_port[i] << " error:" << iret << endl; -#endif - return FAIL; - } - } + //code = rest->post_json("state/open", &answer); + //code = rest->get_json("state", &answer); + + status = RUNNING; return OK; } @@ -727,9 +206,46 @@ int UDPRESTImplementation::createUDPSockets(){ +void UDPRESTImplementation::stopReceiver(){ + + FILE_LOG(logDEBUG) << __AT__ << "called"; + + if(status == RUNNING) + startReadout(); + + while(status == TRANSMITTING) + usleep(5000); + + //change status + status = IDLE; + + FILE_LOG(logDEBUG) << __AT__ << "exited, status " << endl; + +} + + +void UDPRESTImplementation::startReadout(){ + FILE_LOG(logDEBUG) << __AT__ << " starting"; + + status = TRANSMITTING; + + //kill udp socket to tell the listening thread to push last packet + shutDownUDPSockets(); + FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " done"; + +} + + + + + +/* FIXME + * Its also called by TCP in case of illegal shut down such as Ctrl + c. + * Upto you what you want to do with it. + */ int UDPRESTImplementation::shutDownUDPSockets(){ FILE_LOG(logDEBUG) << __AT__ << "called"; @@ -755,19 +271,19 @@ int UDPRESTImplementation::shutDownUDPSockets(){ } // getting the state - FILE_LOG(logWARNING) << "PLEASE WAIT WHILE CHECKING AND SHUTTING DOWN ALL CONNECTIONS!"; + FILE_LOG(logWARNING) << "PLEASE WAIT WHILE CHECKING AND SHUTTING DOWN ALL CONNECTIONS!"; code = rest->get_json("state", &answer); be_state = answer["state"].getString(); // LEO: this is probably wrong if (be_state == "OPEN"){ while (be_state != "TRANSIENT"){ - code = rest->get_json("state", &answer); - be_state = answer["state"].getString(); - cout << "be_State: " << be_state << endl; - usleep(10000); + code = rest->get_json("state", &answer); + be_state = answer["state"].getString(); + cout << "be_State: " << be_state << endl; + usleep(10000); } - + code = rest->post_json("state/close", &answer); std::cout <post_json("state/reset", &answer); @@ -787,1443 +303,29 @@ int UDPRESTImplementation::shutDownUDPSockets(){ - - -int UDPRESTImplementation::createListeningThreads(bool destroy){ - +/* FIXME + * do you really need this, this is called if registerDataCallback() is activated + * in your gui to get data from receiver. you probably have a different way + * of reconstructing complete data set from all receivers + */ +void UDPRESTImplementation::readFramee(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame){ FILE_LOG(logDEBUG) << __AT__ << " called"; - int i; - void* status; - - killAllListeningThreads = 0; - - pthread_mutex_lock(&status_mutex); - listeningthreads_mask = 0x0; - pthread_mutex_unlock(&(status_mutex)); - - if(!destroy){ - - //start listening threads - cout << "Creating Listening Threads(s)"; - - currentListeningThreadIndex = -1; - - for(i = 0; i < numListeningThreads; ++i){ - sem_init(&listensmp[i],1,0); - thread_started = 0; - currentListeningThreadIndex = i; - if(pthread_create(&listening_thread[i], NULL,startListeningThread, (void*) this)){ - cout << "Could not create listening thread with index " << i << endl; - return FAIL; - } - while(!thread_started); - cout << "."; - cout << flush; - } -#ifdef VERBOSE - cout << "Listening thread(s) created successfully." << endl; -#else - cout << endl; -#endif - }else{ - cout<<"Destroying Listening Thread(s)"<initEventTree(temp, &iframe); - //resets the pedestalSubtraction array and the commonModeSubtraction - singlePhotonDet[ithr]->newDataSet(); - if(myFile[ithr]==NULL){ - cout<<"file null"<IsOpen()){ - cout<<"file not open"< DO_NOTHING){ - //close - if(sfilefd){ - fclose(sfilefd); - sfilefd = NULL; - } - //open file - if(!overwrite){ - if (NULL == (sfilefd = fopen((const char *) (savefilename), "wx"))){ - cout << "Error: Could not create new file " << savefilename << endl; - return FAIL; - } - }else if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ - cout << "Error: Could not create file " << savefilename << endl; - return FAIL; - } - //setting buffer - setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); - - //printing packet losses and file names - if(!packetsCaught) - cout << savefilename << endl; - else{ - cout << savefilename - << "\tpacket loss " - << setw(4)<GetCurrentFile(); - - if(myFile[ithr]->Write()) - //->Write(tall->GetName(),TObject::kOverwrite); - cout << "Thread " << ithr <<": wrote frames to file" << endl; - else - cout << "Thread " << ithr << ": could not write frames to file" << endl; - - }else - cout << "Thread " << ithr << ": could not write frames to file: No file or No Tree" << endl; - //close file - if(myTree[ithr] && myFile[ithr]) - myFile[ithr] = myTree[ithr]->GetCurrentFile(); - if(myFile[ithr] != NULL) - myFile[ithr]->Close(); - myFile[ithr] = NULL; - myTree[ithr] = NULL; - pthread_mutex_unlock(&write_mutex); - -#endif - } - FILE_LOG(logDEBUG) << __AT__ << "exited for thread " << ithr; } - - -int UDPRESTImplementation::startReceiver(char message[]){ - int i; - - FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting"; - initialize_REST(); - FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " initialized"; - - cout << "Starting Receiver" << endl; - - std::string answer; - int code; - //char *intStr = itoa(a); - //string str = string(intStr); - // TODO: remove hardcode!!! - stringstream ss; - ss << getDynamicRange(); - string str_dr = ss.str(); - stringstream ss2; - ss2 << getNumberOfFrames(); - string str_n = ss2.str(); - - - cout << "Starting Receiver" << endl; - - std::string request_body = "{\"settings\": {\"bit_depth\": " + str_dr + ", \"nimages\": " + str_n + "}}"; - //std::string request_body = "{\"settings\": {\"nimages\":1, \"scanid\":999, \"bit_depth\":16}}"; - FILE_LOG(logDEBUG) << __FILE__ << "::" << " sending this configuration body: " << request_body; - code = rest->post_json("state/configure", &answer, request_body); - code = rest->get_json("state", &answer); - FILE_LOG(logDEBUG) << __FILE__ << "::" << " got: " << answer; - - //code = rest->post_json("state/open", &answer); - //code = rest->get_json("state", &answer); - - status = RUNNING; - - //reset listening thread variables - /* - measurementStarted = false; - //should be set to zero as its added to get next start frame indices for scans for eiger - if(!acqStarted) currframenum = 0; - startFrameIndex = 0; - - for(int i = 0; i < numListeningThreads; ++i) - totalListeningFrameCount[i] = 0; - */ - //udp socket - /* - if(createUDPSockets() == FAIL){ - strcpy(message,"Could not create UDP Socket(s).\n"); - cout << endl << message << endl; - return FAIL; - } - cout << "UDP socket(s) created successfully. 1st port " << server_port[0] << endl; - - */ - /* - if(setupWriter() == FAIL){ - //stop udp socket - shutDownUDPSockets(); - - sprintf(message,"Could not create file %s.\n",savefilename); - return FAIL; - } - cout << "Successfully created file(s)" << endl; - - //done to give the gui some proper name instead of always the last file name - if(dataCompression) - sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex); - - //initialize semaphore - sem_init(&smp,1,0); - - //status - pthread_mutex_lock(&status_mutex); - status = RUNNING; - for(i=0;istartListening(); - - return this_pointer; -} - - - -void* UDPRESTImplementation::startWritingThread(void* this_pointer){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - FILE_LOG(logDEBUG) << __AT__ << " doing a big bunch of nothing"; - - //((UDPRESTImplementation*)this_pointer)->startWriting(); - return this_pointer; -} - - - - - - -int UDPRESTImplementation::startListening(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - FILE_LOG(logDEBUG) << __AT__ << " doing a big bunch of nothing"; - - /* - int ithread = currentListeningThreadIndex; -#ifdef VERYVERBOSE - cout << "In startListening() " << endl; -#endif - - thread_started = 1; - - int i,total; - int lastpacketoffset, expected, rc, rc1,packetcount, maxBufferSize, carryonBufferSize; - uint32_t lastframeheader;// for moench to check for all the packets in last frame - char* tempchar = NULL; - int imageheader = 0; - if(myDetectorType==EIGER) - imageheader = EIGER_IMAGE_HEADER_SIZE; - - - while(1){ - //variables that need to be checked/set before each acquisition - carryonBufferSize = 0; - //if more than 1 listening thread, listen one packet at a time, else need to interleaved frame later - maxBufferSize = bufferSize * numJobsPerThread; -#ifdef VERYDEBUG - cout << " maxBufferSize:" << maxBufferSize << ",carryonBufferSize:" << carryonBufferSize << endl; -#endif - - if(tempchar) {delete [] tempchar;tempchar = NULL;} - if(myDetectorType != EIGER) - tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size - - - while((1<pop(buffer[ithread]); -#ifdef VERYDEBUG - cout << ithread << " *** popped from fifo free" << (void*)buffer[ithread] << endl; -#endif - - - //receive - if(udpSocket[ithread] == NULL){ - rc = 0; - cout << ithread << "UDP Socket is NULL" << endl; - } - //normal listening - else if(!carryonBufferSize){ - - rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); - expected = maxBufferSize; - - } - //the remaining packets from previous buffer - else{ -#ifdef VERYDEBUG - cout << ithread << " ***carry on buffer" << carryonBufferSize << endl; - cout << ithread << " framennum in temochar:"<<((((uint32_t)(*((uint32_t*)tempchar))) - & (frameIndexMask)) >> frameIndexOffset)<ReceiveDataOnly((buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + carryonBufferSize),maxBufferSize - carryonBufferSize); - expected = maxBufferSize - carryonBufferSize; - } - -#ifdef VERYDEBUG - cout << ithread << " *** rc:" << dec << rc << ". expected:" << dec << expected << endl; -#endif - - - - - //start indices for each start of scan/acquisition - eiger does it before - if((!measurementStarted) && (rc > 0) && (!ithread)) - startFrameIndices(ithread); - - //problem in receiving or end of acquisition - if((rc < expected)||(rc <= 0)){ - stopListening(ithread,rc,packetcount,total); - continue; - } - - - - //reset - packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread; - carryonBufferSize = 0; - - - - //check if last packet valid and calculate packet count - switch(myDetectorType){ - - case MOENCH: - lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); -#ifdef VERYDEBUG - cout <<"first packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)) << endl; - cout <<"first header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset) << endl; - cout << "last packet offset:" << lastpacketoffset << endl; - cout <<"last packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)) << endl; - cout <<"last header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset) << endl; -#endif - //moench last packet value is 0 - if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask))){ - lastframeheader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset; - carryonBufferSize += onePacketSize; - lastpacketoffset -= onePacketSize; - --packetcount; - while (lastframeheader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset)){ - carryonBufferSize += onePacketSize; - lastpacketoffset -= onePacketSize; - --packetcount; - } - memcpy(tempchar, buffer[ithread]+(lastpacketoffset+onePacketSize), carryonBufferSize); -#ifdef VERYDEBUG - cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))) - & (frameIndexMask)) >> frameIndexOffset) << endl; - cout <<"tempchar packet:"<< ((((uint32_t)(*((uint32_t*)(tempchar))))) - & (packetIndexMask)) << endl; -#endif - } - break; - - case GOTTHARD: - if(shortFrame == -1){ - lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); -#ifdef VERYDEBUG - cout << "last packet offset:" << lastpacketoffset << endl; -#endif - - if((unsigned int)(packetsPerFrame -1) != ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))+1) & (packetIndexMask))){ - memcpy(tempchar,buffer[ithread]+lastpacketoffset, onePacketSize); -#ifdef VERYDEBUG - cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))+1) - & (frameIndexMask)) >> frameIndexOffset) << endl; -#endif - carryonBufferSize = onePacketSize; - --packetcount; - } - } -#ifdef VERYDEBUG - cout << "header:" << (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) - & (frameIndexMask)) >> frameIndexOffset) << endl; -#endif - break; - default: - - break; - - } - - - // cout<<"*********** "<fnum)<push(buffer[ithread])); -#ifdef VERYDEBUG - if(!ithread) cout << ithread << " *** pushed into listening fifo" << endl; -#endif - } - - sem_wait(&listensmp[ithread]); - - //make sure its not exiting thread - if(killAllListeningThreads){ - cout << ithread << " good bye listening thread" << endl; - if(tempchar) {delete [] tempchar;tempchar = NULL;} - pthread_exit(NULL); - } - } - */ - return OK; -} - - - - - - - - - - - - - -int UDPRESTImplementation::startWriting(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - FILE_LOG(logDEBUG) << __AT__ << " doing a big bunch of nothing"; - /* - int ithread = currentWriterThreadIndex; -#ifdef VERYVERBOSE - cout << ithread << "In startWriting()" <pop(wbuf[i]); - numpackets = (uint16_t)(*((uint16_t*)wbuf[i])); -#ifdef VERYDEBUG - cout << ithread << " numpackets:" << dec << numpackets << endl; -#endif - } - -#ifdef VERYDEBUG - cout << ithread << " numpackets:" << dec << numpackets << endl; - cout << ithread << " *** writer popped from fifo " << (void*) wbuf[0]<< endl; - cout << ithread << " *** writer popped from fifo " << (void*) wbuf[1]<< endl; -#endif - - - //last dummy packet - if(numpackets == 0xFFFF){ - stopWriting(ithread,wbuf); - continue; - } - - - - - //for progress - if(myDetectorType == EIGER){ - tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum); - tempframenum += (startFrameIndex-1); //eiger frame numbers start at 1, so need to -1 - }else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); - else - tempframenum = ((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); - - if(numWriterThreads == 1) - currframenum = tempframenum; - else{ - pthread_mutex_lock(&progress_mutex); - if(tempframenum > currframenum) - currframenum = tempframenum; - pthread_mutex_unlock(&progress_mutex); - } -//#ifdef VERYDEBUG - if(myDetectorType == EIGER) - cout << endl < 0){ - for(i=0;ipush(wbuf[i])); -#ifdef VERYDEBUG - cout << ithread << ":" << i+j << " fifo freed:" << (void*)wbuf[i] << endl; -#endif - } - - - } - else{ - //copy to gui - copyFrameToGui(NULL,-1,wbuf[0]+HEADER_SIZE_NUM_TOT_PACKETS); -#ifdef VERYVERBOSE - cout << ithread << " finished copying" << endl; -#endif - while(!fifoFree[0]->push(wbuf[0])); -#ifdef VERYVERBOSE - cout<<"buf freed:"<<(void*)wbuf[0]<fnum); - //gotthard has +1 for frame number and not a short frame - else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) - & (frameIndexMask)) >> frameIndexOffset); - else - startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) - & (frameIndexMask)) >> frameIndexOffset); - - - //start of acquisition - if(!acqStarted){ - startAcquisitionIndex=startFrameIndex; - currframenum = startAcquisitionIndex; - acqStarted = true; - cout << "startAcquisitionIndex:" << startAcquisitionIndex<push(buffer[ithread]); - exit(-1); - } - //push the last buffer into fifo - if(rc > 0){ - pc = (rc/onePacketSize); -#ifdef VERYDEBUG - cout << ithread << " *** last packetcount:" << pc << endl; -#endif - (*((uint16_t*)(buffer[ithread]))) = pc; - totalListeningFrameCount[ithread] += pc; - while(!fifo[ithread]->push(buffer[ithread])); -#ifdef VERYDEBUG - cout << ithread << " *** last lbuf1:" << (void*)buffer[ithread] << endl; -#endif - } - - - //push dummy buffer to all writer threads - for(i=0;ipop(buffer[ithread]); - (*((uint16_t*)(buffer[ithread]))) = 0xFFFF; -#ifdef VERYDEBUG - cout << ithread << " going to push in dummy buffer:" << (void*)buffer[ithread] << " with num packets:"<< (*((uint16_t*)(buffer[ithread]))) << endl; -#endif - while(!fifo[ithread]->push(buffer[ithread])); -#ifdef VERYDEBUG - cout << ithread << " pushed in dummy buffer:" << (void*)buffer[ithread] << endl; -#endif - } - - //reset mask and exit loop - pthread_mutex_lock(&status_mutex); - listeningthreads_mask^=(1< 1) - cout << "Waiting for listening to be done.. current mask:" << hex << listeningthreads_mask << endl; -#endif - while(listeningthreads_mask) - usleep(5000); -#ifdef VERYDEBUG - t = 0; - for(i=0;ipush(wbuffer[i])); -#ifdef VERYDEBUG - cout << ithread << ":" << i<< " fifo freed:" << (void*)wbuffer[i] << endl; -#endif - } - - - - //all threads need to close file, reset mask and exit loop - closeFile(ithread); - pthread_mutex_lock(&status_mutex); - writerthreads_mask^=(1< 0){ - - //for progress and packet loss calculation(new files) - if(myDetectorType == EIGER); - else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); - else - tempframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); - - if(numWriterThreads == 1) - currframenum = tempframenum; - else{ - if(tempframenum > currframenum) - currframenum = tempframenum; - } -#ifdef VERYDEBUG - cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; -#endif - - //lock - if(numWriterThreads > 1) - pthread_mutex_lock(&write_mutex); - - - //to create new file when max reached - packetsToSave = maxPacketsPerFile - packetsInFile; - if(packetsToSave > numpackets) - packetsToSave = numpackets; -/**next time offset is still plus header length*/ - fwrite(buf+offset, 1, packetsToSave * onePacketSize, sfilefd); - packetsInFile += packetsToSave; - packetsCaught += packetsToSave; - totalPacketsCaught += packetsToSave; - - - //new file - if(packetsInFile >= maxPacketsPerFile){ - //for packet loss - lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); - if(myDetectorType == EIGER); - else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) - tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset); - else - tempframenum = ((((uint32_t)(*((uint32_t*)(buf + lastpacket))))& (frameIndexMask)) >> frameIndexOffset); - - if(numWriterThreads == 1) - currframenum = tempframenum; - else{ - if(tempframenum > currframenum) - currframenum = tempframenum; - } -#ifdef VERYDEBUG - cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; -#endif - //create - createNewFile(); - } - - //unlock - if(numWriterThreads > 1) - pthread_mutex_unlock(&write_mutex); - - - offset += (packetsToSave * onePacketSize); - numpackets -= packetsToSave; - } - - } - else{ - if(numWriterThreads > 1) - pthread_mutex_lock(&write_mutex); - packetsInFile += numpackets; - packetsCaught += numpackets; - totalPacketsCaught += numpackets; - if(numWriterThreads > 1) - pthread_mutex_unlock(&write_mutex); - } -} - - - - - - - - - - - - - - -void UDPRESTImplementation::handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf){ - - FILE_LOG(logDEBUG) << __AT__ << " called"; - -#if defined(MYROOT1) && defined(ALLFILE_DEBUG) - writeToFile_withoutCompression(wbuf[0], numpackets,currframenum); -#endif - - eventType thisEvent = PEDESTAL; - int ndata; - char* buff = 0; - data = wbuffer[0]+ HEADER_SIZE_NUM_TOT_PACKETS; - int remainingsize = npackets * onePacketSize; - int np; - int once = 0; - double tot, tl, tr, bl, br; - int xmin = 1, ymin = 1, ix, iy; - - - while(buff = receiverdata[ithread]->findNextFrame(data,ndata,remainingsize)){ - np = ndata/onePacketSize; - - //cout<<"buff framnum:"<> frameIndexOffset)<newFrame(); - - //only for moench - if(commonModeSubtractionEnable){ - for(ix = xmin - 1; ix < xmax+1; ix++){ - for(iy = ymin - 1; iy < ymax+1; iy++){ - thisEvent = singlePhotonDet[ithread]->getEventType(buff, ix, iy, 0); - } - } - } - - - for(ix = xmin - 1; ix < xmax+1; ix++) - for(iy = ymin - 1; iy < ymax+1; iy++){ - thisEvent=singlePhotonDet[ithread]->getEventType(buff, ix, iy, commonModeSubtractionEnable); - if (nf>1000) { - tot=0; - tl=0; - tr=0; - bl=0; - br=0; - if (thisEvent==PHOTON_MAX) { - receiverdata[ithread]->getFrameNumber(buff); - //iFrame=receiverdata[ithread]->getFrameNumber(buff); -#ifdef MYROOT1 - myTree[ithread]->Fill(); - //cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl; -#else - pthread_mutex_lock(&write_mutex); - if((enableFileWrite) && (sfilefd)) - singlePhotonDet[ithread]->writeCluster(sfilefd); - pthread_mutex_unlock(&write_mutex); -#endif - } - } - } - - nf++; -#ifndef ALLFILE - pthread_mutex_lock(&progress_mutex); - packetsInFile += packetsPerFrame; - packetsCaught += packetsPerFrame; - totalPacketsCaught += packetsPerFrame; - if(packetsInFile >= maxPacketsPerFile) - createNewFile(); - pthread_mutex_unlock(&progress_mutex); - -#endif - if(!once){ - copyFrameToGui(NULL,-1,buff); - once = 1; - } - } - - remainingsize -= ((buff + ndata) - data); - data = buff + ndata; - if(data > (wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) ) - cout <<" **************ERROR SHOULD NOT COME HERE, Error 142536!"<push(wbuffer[0])); -#ifdef VERYVERBOSE - cout<<"buf freed:"<<(void*)wbuffer[0]<= 0){ - - tengigaEnable = enable; - - if(myDetectorType == EIGER){ - - if(!tengigaEnable){ - packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; - onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; - maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; - }else{ - packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; - onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; - maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame*4; - } - frameSize = onePacketSize * packetsPerFrame; - bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) - //maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; - - - cout<<"packetsPerFrame:"< // socket(), bind(), listen(), accept(), shut down -//#include // sock_addr_in, htonl, INADDR_ANY #include // exit() #include //set precision for printing parameters for create new file #include //map -//#include //munmap - #include #include #include -#include using namespace std; #define WRITE_HEADERS @@ -789,7 +784,7 @@ void UDPStandardImplementation::resetAcquisitionCount(){ int UDPStandardImplementation::startReceiver(char *c){ FILE_LOG(logDEBUG) << __AT__ << " called"; - cout << "Starting Receiver" << endl; + FILE_LOG(logINFO) << "Stopping Receiver"; //RESET @@ -888,7 +883,7 @@ int UDPStandardImplementation::startReceiver(char *c){ void UDPStandardImplementation::stopReceiver(){ FILE_LOG(logDEBUG) << __AT__ << " called"; - cout << "Stopping Receiver" << endl; + FILE_LOG(logINFO) << "Stopping Receiver"; //set status to transmitting startReadout();