From e915245c104c9cb66c002ab89cb3fb1de11b0f12 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Thu, 8 Oct 2015 17:09:43 +0200 Subject: [PATCH] additional change --- .../include/UDPBaseImplementation.h | 6 +- slsReceiverSoftware/include/UDPInterface.h | 6 +- .../include/UDPStandardImplementation.h | 220 +++----- .../src/UDPBaseImplementation.cpp | 5 +- .../src/UDPStandardImplementation.cpp | 497 ++++++++---------- 5 files changed, 297 insertions(+), 437 deletions(-) diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index 293ac6763..c1de8fa6f 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -111,7 +111,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter uint64_t getFramesCaught() const; /** - * Get Current Frame Index Caught for an entire acquisition (including all scans) + * Get Current Frame Index for an entire acquisition (including all scans) * @return current frame index (represents all scans too) */ int64_t getAcquisitionIndex() const; @@ -372,8 +372,8 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter void startReadout(); /** - * shuts down the udp sockets - * \returns OK or FAIL + * Shuts down and deletes UDP Sockets + * @return OK or FAIL */ int shutDownUDPSockets(); diff --git a/slsReceiverSoftware/include/UDPInterface.h b/slsReceiverSoftware/include/UDPInterface.h index 65c275a29..a3766b847 100644 --- a/slsReceiverSoftware/include/UDPInterface.h +++ b/slsReceiverSoftware/include/UDPInterface.h @@ -171,7 +171,7 @@ class UDPInterface { virtual uint64_t getFramesCaught() const = 0; /** - * Get Current Frame Index Caught for an entire acquisition (including all scans) + * Get Current Frame Index for an entire acquisition (including all scans) * @return current frame index (represents all scans too) or -1 if no packets caught */ virtual int64_t getAcquisitionIndex() const = 0; @@ -430,8 +430,8 @@ class UDPInterface { virtual void startReadout() = 0; /** - * shuts down the udp sockets - * \returns OK or FAIL + * Shuts down and deletes UDP Sockets + * @return OK or FAIL */ virtual int shutDownUDPSockets() = 0; diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 747eaad34..825806e83 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -57,8 +57,6 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase * They access local cache of configuration or detector parameters ******* *************************************************************************/ - //***acquisition count parameters*** - /************************************************************************* * Setters *************************************************************** @@ -154,14 +152,14 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase */ int startReceiver(char *c=NULL); - + /** + * Shuts down and deletes UDP Sockets + * @return OK or FAIL + */ + int shutDownUDPSockets(); private: - /************************************************************************* - * Setters *************************************************************** - * They modify the local cache of configuration or detector parameters *** - *************************************************************************/ //**initial parameters*** /** @@ -227,6 +225,13 @@ private: */ int createUDPSockets(); + /** + * Initializes writer variables and creates the first file + * also does the startAcquisitionCallBack + * @return OK or FAIL + */ + int setupWriter(); + //**detector parameters*** @@ -277,19 +282,27 @@ private: /** Footer offset from start of Packet*/ int footerOffset; + //***File parameters*** /** Maximum Packets Per File **/ int maxPacketsPerFile; + /** If file created successfully for all Writer Threads */ + bool fileCreateSuccess; - //***acquisition indices parameters*** - /** Frame Number of First Frame of an Acquisition */ + + + //***acquisition indices/count parameters*** + /** Frame Number of First Frame of an entire Acquisition (including all scans) */ uint64_t startAcquisitionIndex; /** Frame index at start of each real time acquisition (eg. for each scan) */ uint64_t startFrameIndex; + /** Actual current frame index of each time acquisition (eg. for each scan) */ + uint64_t frameIndex; + /** Current Frame Number */ uint64_t currentFrameNumber; @@ -302,6 +315,19 @@ private: /** Total Frame Count listened to by listening threads */ int totalListeningFrameCount[MAX_NUMBER_OF_LISTENING_THREADS]; + /** Pckets currently in current file, starts new file when it reaches max */ + uint32_t packetsInFile; + + /** Number of Missing Packets per buffer*/ + uint32_t numMissingPackets; + + /** Total Number of Missing Packets in acquisition*/ + uint32_t numTotMissingPackets; + + /** Number of Missing Packets in file */ + uint32_t numTotMissingPacketsInFile; + + @@ -318,15 +344,37 @@ private: /** Circular fifo to point to address already written and freed, to be reused */ CircularFifo* fifoFree[MAX_NUMBER_OF_LISTENING_THREADS]; + /** UDP Sockets - Detector to Receiver */ + genericSocket* udpSocket[MAX_NUMBER_OF_LISTENING_THREADS]; + + /** File Descriptor */ + FILE *sfilefd; + /** Number of Jobs Per Buffer */ int numberofJobsPerBuffer; /** Fifo Depth */ uint32_t fifoSize; - /** Current Frame copied for Gui */ + + //***receiver to GUI parameters*** + /** Current Frame copied for GUI */ char* latestData; + /** If Data to be sent to GUI is ready */ + bool guiDataReady; + + /** Pointer to data to be sent to GUI */ + char* guiData; + + /** Pointer to file name to be sent to GUI */ + char guiFileName[MAX_STR_LENGTH]; + + /** Semaphore to synchronize Writer and GuiReader threads*/ + sem_t writerGuiSemaphore; + + + //***general and listening thread parameters*** @@ -382,7 +430,6 @@ private: - //***filter parameters*** /** Common Mode Subtraction Enable FIXME: Always false, only moench uses, Ask Anna */ bool commonModeSubtractionEnable; @@ -404,6 +451,12 @@ private: pthread_mutex_t status_mutex; + //***callback*** + /** The action which decides what the user and default responsibilities to save data are + * 0 raw data ready callback takes care of open,close,write file + * 1 callback writes file, we have to open, close it + * 2 we open, close, write file, callback does not do anything */ + int cbAction; @@ -422,89 +475,6 @@ private: - - - - - - - /** - * Set receiver type - * @param det detector type - * Returns success or FAIL - */ - int setDetectorType(detectorType det); - - - //Frame indices and numbers caught - /** - * Returns the frame index at start of entire acquisition (including all scans) - */ - //uint32_t getStartAcquisitionIndex(); - - /** - * Returns if acquisition started - */ - //bool getAcquistionStarted(); - - /** - * Returns the frame index at start of each real time acquisition (eg. for each scan) - */ - //uint32_t getStartFrameIndex(); - - /** - * Returns current Frame Index for each real time acquisition (eg. for each scan) - */ - //uint32_t getFrameIndex(); - - /** - * Returns if measurement started - */ - //bool getMeasurementStarted(); - - /** - * Resets the Total Frames Caught - * This is how the receiver differentiates between entire acquisitions - * Returns 0 - */ - //void resetTotalFramesCaught(); - - - - - -//other parameters - - /** - * abort acquisition with minimum damage: close open files, cleanup. - * does nothing if state already is 'idle' - */ - void abort() {}; - - /** - * Returns status of receiver: idle, running or error - */ - runStatus getStatus() const; - - /** - * Set detector hostname - * @param c hostname - */ - void setDetectorHostname(const char *detectorHostName); - - - - /** - * enable 10Gbe - @param enable 1 for 10Gbe or 0 for 1 Gbe, -1 to read out - \returns enable for 10Gbe - */ - int enableTenGiga(int enable = -1); - - - -//other functions - /** * Returns the buffer-current frame read by receiver * @param c pointer to current file name @@ -520,12 +490,6 @@ private: */ void closeFile(int ithr = -1); - /** - * Starts Receiver - starts to listen for packets - * @param message is the error message if there is an error - * Returns success - */ - int startReceiver(char message[]); /** * Stops Receiver - stops listening for packets @@ -538,21 +502,9 @@ private: */ void startReadout(); - /** - * shuts down the udp sockets - * \returns if success or fail - */ - int shutDownUDPSockets(); + private: - - /* - void not_implemented(string method_name){ - std::cout << "[WARNING] Method " << method_name << " not implemented!" << std::endl; - }; - */ - - /** @@ -562,12 +514,6 @@ private: void copyFrameToGui(char* startbuf[], char* buf=NULL); - /** - * initializes variables and creates the first file - * also does the startAcquisitionCallBack - * \returns FAIL or OK - */ - int setupWriter(); /** * Creates new tree and file for compression @@ -694,42 +640,18 @@ private: const static uint16_t missingPacketValue = 0xFFFF; - /** UDP Socket between Receiver and Detector */ - genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS]; - - /** Complete File name */ +/** Complete File name */ char savefilename[MAX_STR_LENGTH]; - /** Actual current frame index of each time acquisition (eg. for each scan) */ - uint32_t frameIndex; - /** Pckets currently in current file, starts new file when it reaches max */ - uint32_t packetsInFile; - - /** Number of missing packets in acquisition*/ - uint32_t numTotMissingPackets; - - /** Number of missing packets in file (sometimes packetsinFile is incorrect due to padded packets for eiger)*/ - uint32_t numTotMissingPacketsInFile; - - /** Number of missing packets per buffer*/ - uint32_t numMissingPackets; /** Previous Frame number from buffer */ int prevframenum; - /** gui data ready */ - int guiDataReady; - /** points to the data to send to gui */ - char* guiData; - /** points to the filename to send to gui */ - char* guiFileName; -/** OK if file created was successful */ - int ret_createfile; // TODO: not properly sure where to put these... /** structure of an eiger image header*/ @@ -738,8 +660,7 @@ private: //semaphores - /** semaphore to synchronize writer and guireader threads */ - sem_t smp; + //mutex /** guiDataReady mutex */ @@ -751,8 +672,6 @@ private: /** mutex for writing data to file */ pthread_mutex_t write_mutex; - /** File Descriptor */ - FILE *sfilefd; //filter @@ -766,11 +685,6 @@ private: #endif - /** The action which decides what the user and default responsibilites to save data are - * 0 raw data ready callback takes care of open,close,write file - * 1 callback writes file, we have to open, close it - * 2 we open, close, write file, callback does not do anything */ - int cbAction; public: diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index 0c1c30248..ec5c27ad5 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -286,10 +286,7 @@ int UDPBaseImplementation::setDataCompressionEnable(const bool b){ void UDPBaseImplementation::setUDPPortNumber(const uint32_t i){ FILE_LOG(logDEBUG) << __AT__ << " starting"; - if(bottomEnable) - udpPortNum[1] = i; - else - udpPortNum[0] = i; + udpPortNum[0] = i; FILE_LOG(logINFO) << "udpPortNum[0]:" << udpPortNum[0]; } diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 0f4290c9c..5901e96a3 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -58,16 +58,22 @@ void UDPStandardImplementation::deleteMembers(){ FILE_LOG(logDEBUG) << __AT__ << " starting"; cout << "Info: Deleting member pointers" << endl; + shutDownUDPSockets(); + closeFile(); //filter deleteFilter(); + for(int i=0; igetErrorStatus(); + if(!iret){ + cout << "Info: UDP port opened at port " << port[i] << endl; + }else{ +#ifdef VERBOSE + cprintf(BG_RED,"Error: Could not create UDP socket on port %d error: %d\n", port[i], iret); +#endif + shutDownUDPSockets(); + return FAIL; + } + } + + cout << "Info: UDP socket(s) created successfully." << endl; + cout << "Info: Listener Ready ..." << endl; return OK; } +int UDPStandardImplementation::setupWriter(){ + FILE_LOG(logDEBUG) << __AT__ << " starting"; + + //acquisition start call back returns enable write + cbAction = DO_EVERYTHING; + if (startAcquisitionCallBack) + cbAction=startAcquisitionCallBack(filePath,fileName,fileIndex,bufferSize,pStartAcquisition); + + if(cbAction < DO_EVERYTHING){ + cout << "Info: Call back activated. Data saving must be taken care of by user in call back." << endl; + if (rawDataReadyCallBack) + cout << "Info: Data Write has been defined externally" << endl; + }else if(!fileWriteEnable) + cout << "Info: Data will not be saved" << endl; + + + + //creating first file + //setting all value to 1 + pthread_mutex_lock(&status_mutex); + for(int i=0; i config_map){ FILE_LOG(logDEBUG) << __AT__ << " starting"; @@ -869,25 +984,113 @@ int UDPStandardImplementation::startReceiver(char *c=NULL){ cout << "Info: Starting Receiver" << endl; + + //RESET //reset measurement variables measurementStarted = false; startFrameIndex = 0; + frameIndex = 0; if(!acqStarted) currentFrameNumber = 0; //has to be zero to add to startframeindex for each scan for(int i = 0; i < numberofListeningThreads; ++i) totalListeningFrameCount[i] = 0; + packetsCaught = 0; + numMissingPackets = 0; + numTotMissingPackets = 0; + numTotMissingPacketsInFile = 0; + //reset file parameters + packetsInFile = 0; + if(sfilefd){ + fclose(sfilefd); + sfilefd = NULL; + } + //reset gui variables + guiData = NULL; + guiDataReady=0; + strcpy(guiFileName,""); + //reset masks + pthread_mutex_lock(&status_mutex); + writerThreadsMask = 0x0; + createFileMask = 0x0; + fileCreateSuccess = false; + pthread_mutex_unlock(&status_mutex); + + + //Print Receiver Configuration + cout << "Info: ***Receiver Configuration***" << endl; + cout << "Info: Max Packets Per File:" << maxPacketsPerFile << endl; + cout << "Info: Data Compression has been " << stringEnable(dataCompressionEnable) << endl; + if(myDetectorType != EIGER) + cout << "Info: Number of Jobs Per Buffer: " << numberofJobsPerBuffer << endl; + if(FrameToGuiFrequency) + cout << "Info: Frequency of frames sent to gui" << FrameToGuiFrequency << endl; + + //create UDP sockets if(createUDPSockets() == FAIL){ - + strcpy(c,"Could not create UDP Socket(s).\n"); + cout << endl; + cout << "Error: "<< c << endl; + return FAIL; } + if(setupWriter() == FAIL){ + //stop udp socket + shutDownUDPSockets(); + sprintf(c,"Could not create file %s.\n",savefilename); + cout << endl; + cout << "Error: "<< c << endl; + return FAIL; + } + + + //For compression, done to give the gui some proper name instead of always the last file name + if(dataCompressionEnable) + sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex); + + //initialize semaphore to synchronize between writer and gui reader threads + sem_init(&writerGuiSemaphore,1,0); + + //status and thread masks + pthread_mutex_lock(&status_mutex); + status = RUNNING; + for(int i=0;iShutDownSocket(); + delete udpSocket[i]; + udpSocket[i] = NULL; + } + } + return OK; +} @@ -929,16 +1132,13 @@ int UDPStandardImplementation::startReceiver(char *c=NULL){ UDPStandardImplementation::UDPStandardImplementation(){ thread_started = 0; eth = NULL; - latestData = NULL; - guiFileName = NULL; + tengigaEnable = 0; for(int i=0;igetErrorStatus(); - if(!iret){ - cout << "UDP port opened at port " << port[i] << endl; - }else{ -#ifdef VERBOSE - cprintf(BG_RED,"Could not create UDP socket on port %d error: %d\n", port[i], iret); -#endif - shutDownUDPSockets(); - return FAIL; - } - } - - - - return OK; -} - - - - - - - -int UDPStandardImplementation::shutDownUDPSockets(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - for(int i=0;iShutDownSocket(); - delete udpSocket[i]; - udpSocket[i] = NULL; - } - } - return OK; -} - - - - - - - -int UDPStandardImplementation::setupWriter(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - //reset writing thread variables - packetsInFile=0; - numTotMissingPackets = 0; - numTotMissingPacketsInFile = 0; - numMissingPackets = 0; - packetsCaught=0; - frameIndex=0; - if(sfilefd) {cprintf(RED,"**FILE not closed!\n");fclose(sfilefd);sfilefd=NULL;} - guiData = NULL; - guiDataReady=0; - strcpy(guiFileName,""); - cbAction = DO_EVERYTHING; - - pthread_mutex_lock(&status_mutex); - writerthreads_mask = 0x0; - createfile_mask = 0x0; - ret_createfile = OK; - pthread_mutex_unlock(&status_mutex); - - //printouts - cout << "Max Packets Per File:" << maxPacketsPerFile << endl; - if (rawDataReadyCallBack) - cout << "Note: Data Write has been defined exernally" << endl; - if (dataCompression) - cout << "Data Compression is enabled with " << numJobsPerThread << " number of jobs per thread" << endl; - if(nFrameToGui) - cout << "Sending every " << nFrameToGui << "th frame to gui" << endl; - - - - //acquisition start call back returns enable write - if (startAcquisitionCallBack) - cbAction=startAcquisitionCallBack(filePath,fileName,fileIndex,bufferSize,pStartAcquisition); - - if(cbAction < DO_EVERYTHING) - cout << endl << "Note: Call back activated. Data saving must be taken care of by user in call back." << endl; - else if(enableFileWrite==0) - cout << endl << "Note: Data will not be saved" << endl; - - - - //creating first file - - //mask - pthread_mutex_lock(&status_mutex); - for(int i=0;i