From 5ac37c2154e92d980c14ec584701f8243a5c67a6 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Tue, 13 Oct 2015 15:22:30 +0200 Subject: [PATCH] some more --- .../include/UDPBaseImplementation.h | 13 +- slsReceiverSoftware/include/UDPInterface.h | 10 +- .../include/UDPStandardImplementation.h | 371 ++- slsReceiverSoftware/include/logger.h | 6 + .../src/UDPBaseImplementation.cpp | 2 +- .../src/UDPStandardImplementation.cpp | 2256 ++++++++--------- 6 files changed, 1241 insertions(+), 1417 deletions(-) diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index c1de8fa6f..43f98b511 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -381,10 +381,10 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter * Get the buffer-current frame read by receiver * @param c pointer to current file name * @param raw address of pointer, pointing to current frame to send to gui - * @param startAcquisitionIndex start index of the acquisition - * @param startFrameIndex start index of the scan + * @param startAcq start index of the acquisition + * @param startFrame start index of the scan */ - void readFrame(char* c,char** raw, uint64_t &startAcquisitionIndex, uint64_t &startFrameIndex); + void readFrame(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame); /** * abort acquisition with minimum damage: close open files, cleanup. @@ -393,8 +393,8 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter void abort(); //FIXME: needed, isn't stopReceiver enough? /** - * Closes all files - * @param i thread index, -1 for all threads + * Closes file / all files(if multiple files) + * @param i thread index (if multiple files used eg. root files) -1 for all threads */ void closeFile(int i = -1); @@ -438,6 +438,9 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter protected: + /************************************************************************* + * Class Members ********************************************************* + *************************************************************************/ //**detector parameters*** /** detector type */ detectorType myDetectorType; diff --git a/slsReceiverSoftware/include/UDPInterface.h b/slsReceiverSoftware/include/UDPInterface.h index a3766b847..b62ad2eb0 100644 --- a/slsReceiverSoftware/include/UDPInterface.h +++ b/slsReceiverSoftware/include/UDPInterface.h @@ -439,10 +439,10 @@ class UDPInterface { * Get the buffer-current frame read by receiver * @param c pointer to current file name * @param raw address of pointer, pointing to current frame to send to gui - * @param startAcquisitionIndex start index of the acquisition - * @param startFrameIndex start index of the scan + * @param startAcq start index of the acquisition + * @param startFrame start index of the scan */ - virtual void readFrame(char* c,char** raw, uint64_t &startAcquisitionIndex, uint64_t &startFrameIndex)=0; + virtual void readFrame(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame)=0; /** * abort acquisition with minimum damage: close open files, cleanup. @@ -451,8 +451,8 @@ class UDPInterface { virtual void abort() = 0; //FIXME: needed, isnt stopReceiver enough? /** - * Closes all files - * @param i thread index, -1 for all threads + * Closes file / all files(if multiple files) + * @param i thread index (if multiple files used eg. root files) -1 for all threads */ virtual void closeFile(int i = -1) = 0; diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 825806e83..cbd46d433 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -153,12 +153,63 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase int startReceiver(char *c=NULL); /** + * Overridden method + * Stop Listening for Packets + * Calls startReadout(), which stops listening and sets status to Transmitting + * When it has read every frame in buffer,it returns with the status Run_Finished + * Pre: status is running, semaphores have been instantiated, + * Post: udp sockets shut down, status is idle, semaphores destroyed + */ + void stopReceiver(); + + /** + * Overridden method + * Stop Listening to Packets + * and sets status to Transmitting + * Pre: status is running, udp sockets have been initialized, stop receiver initiated + * Post:udp sockets closed, status is transmitting + */ + void startReadout(); + + /** + * Overridden method * Shuts down and deletes UDP Sockets * @return OK or FAIL */ int shutDownUDPSockets(); + /** + * Overridden method + * Get the buffer-current frame read by receiver + * @param c pointer to current file name + * @param raw address of pointer, pointing to current frame to send to gui + * @param startAcq start index of the acquisition + * @param startFrame start index of the scan + */ + void readFrame(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame); + + /** + * Overridden method + * Closes file / all files(data compression involves multiple files) + * @param i thread index valid for datacompression using root files, -1 for all threads + */ + void closeFile(int i = -1); + private: + /************************************************************************* + * Getters *************************************************************** + * They access local cache of configuration or detector parameters ******* + *************************************************************************/ + +/* + uint64_t (*getFrameNumber)(); + uint64_t eigerGetFrameNumber(); + uint64_t generalGetFrameNumber(); + getframenumber = &generalgetframenumber; + if(dettpe == eiger) getframenumber = &eigerGerFramenumber; + + call using getframenumber(); +*/ //**initial parameters*** @@ -194,6 +245,19 @@ private: */ void initializeFilter(); + /** + * Set up the Fifo Structure for processing buffers + * between listening and writer threads + * @return OK or FAIL + */ + int setupFifoStructure(); + + + + /************************************************************************* + * Listening and Writing Threads ***************************************** + *************************************************************************/ + /** * Create Listening Threads * @param destroy is true to destroy all the threads @@ -212,13 +276,6 @@ private: */ void setThreadPriorities(); - /** - * Set up the Fifo Structure for processing buffers - * between listening and writer threads - * @return OK or FAIL - */ - int setupFifoStructure(); - /** * Creates UDP Sockets * @return OK or FAIL @@ -232,7 +289,87 @@ private: */ int setupWriter(); + /** + * Creates new file + * @return OK or FAIL + */ + int createNewFile(); + /** + * Static function - Starts Listening Thread of this object + * @param this_pointer pointer to this object + */ + static void* startListeningThread(void *this_pointer); + + /** + * Static function - Starts Writing Thread of this object + * @param this_pointer pointer to this object + */ + static void* startWritingThread(void *this_pointer); + + /** + * Thread that listens to packets + * It pops the fifofree for free addresses, listens to packets and pushes them into the fifo + * This is continuously looped for each buffer in a nested loop, which is again looped for each acquisition + * Exits only for changing dynamic range, 10G parameters etc and recreated + * + */ + void startListening(); + + /** + * Thread started which writes packets to file. + * It pops the fifo, processes and writes packets to file and pushes the addresses into the fifoFree + * This is continuously looped for each buffer in a nested loop, which is again looped for each acquisition + * Exits only for changing dynamic range, 10G parameters etc and recreated + * + */ + void startWriting(); + + /** + * Called by startListening + * Listens to buffer, until packet(s) received or shutdownUDPsocket called by client + * Also copies carryovers from previous frame in front of buffer (gotthard and moench) + * For eiger, it ignores packets less than onePacketSize + * @param ithread listening thread index + * @param lSize number of bytes to listen to + * @param cSize number of bytes carried on from previous buffer + * @param temp temporary storage of previous buffer + * @return the number of bytes actually received + */ + int prepareAndListenBuffer(int ithread, int lSize, int cSize, char* temp); + + /** + * Called by startListening + * Its called for the first packet of a scan or acquistion + * Sets the startframeindices and the variables to know if acquisition started + * @param ithread listening thread number + */ + void startFrameIndices(int ithread); + + /** + * Called by prepareAndListenBuffer + * This is called when udp socket is shut down by client + * It pushes ffff instead of packet number into fifo + * to inform writers about the end of listening session + * Then sets the listening mask so that it stops listening and wait for next acquisition trigger + * @param ithread listening thread number + * @param numbytes number of bytes received + */ + void stopListening(int ithread, int numbytes); + + /* + * Called by startListening for gotthard and moench to handle split frames + * It processes listening thread buffers by ensuring split frames are in the same buffer + * @param ithread listening thread index + * @param cSize number of bytes carried over to the next buffer to reunite with split frame + * @param temp temporary buffer to store the split frame + * @return packet count + */ + uint32_t processListeningBuffer(int ithread, int cSize,char* temp); + + /************************************************************************* + * Class Members ********************************************************* + *************************************************************************/ //**detector parameters*** /** @@ -284,7 +421,18 @@ private: //***File parameters*** - /** Maximum Packets Per File **/ +#ifdef MYROOT1 + /** Tree where the hits are stored */ + TTree *myTree[MAX_NUMBER_OF_WRITER_THREADS]; + + /** File where the tree is saved */ + TFile *myFile[MAX_NUMBER_OF_WRITER_THREADS]; +#endif + + /** Complete File name */ + char completeFileName[MAX_STR_LENGTH]; + + /** Maximum Packets Per File **/ int maxPacketsPerFile; /** If file created successfully for all Writer Threads */ @@ -306,6 +454,9 @@ private: /** Current Frame Number */ uint64_t currentFrameNumber; + /** Previous Frame number from buffer to calculate loss */ + uint64_t previousFrameNumber; + /* Acquisition started */ bool acqStarted; @@ -356,6 +507,11 @@ private: /** Fifo Depth */ uint32_t fifoSize; + /** Missing Packet identifier value */ + const static uint16_t missingPacketValue = 0xFFFF; + + /** Dummy Packet identifier value */ + const static uint32_t dummyPacketValue = 0xFFFFFFFF; //***receiver to GUI parameters*** /** Current Frame copied for GUI */ @@ -381,6 +537,9 @@ private: /** Ensures if threads created successfully */ bool threadStarted; + /** Current Thread Index*/ + int currentThreadIndex; + /** Number of Listening Threads */ int numberofListeningThreads; @@ -414,9 +573,6 @@ private: /** Semaphores Synchronizing Writer Threads */ sem_t writerSemaphore[MAX_NUMBER_OF_WRITER_THREADS]; - /** Current Writer Thread Index*/ - int currentWriterThreadIndex; - /** Mask with each bit indicating status of each writer thread */ volatile uint32_t writerThreadsMask; @@ -447,9 +603,17 @@ private: //***mutex*** - /** mutex for status */ - pthread_mutex_t status_mutex; + /** Status mutex */ + pthread_mutex_t statusMutex; + /** Writing mutex */ + pthread_mutex_t writeMutex; + + /** GuiDataReady Mutex */ + pthread_mutex_t dataReadyMutex; + + /** Progress (currentFrameNumber) Mutex */ + pthread_mutex_t progressMutex; //***callback*** /** The action which decides what the user and default responsibilities to save data are @@ -475,32 +639,6 @@ private: - /** - * Returns the buffer-current frame read by receiver - * @param c pointer to current file name - * @param raw address of pointer, pointing to current frame to send to gui - * @param startAcquisitionIndex is the start index of the acquisition - * @param startFrameIndex is the start index of the scan - */ - void readFrame(char* c,char** raw, uint32_t &startAcquisitionIndex, uint32_t &startFrameIndex); - - /** - * Closes all files - * @param ithr thread index - */ - void closeFile(int ithr = -1); - - - /** - * Stops Receiver - stops listening for packets - * Returns success - */ - int stopReceiver(); - - /** set status to transmitting and - * when fifo is empty later, sets status to run_finished - */ - void startReadout(); @@ -513,8 +651,6 @@ private: */ void copyFrameToGui(char* startbuf[], char* buf=NULL); - - /** * Creates new tree and file for compression * @param ithr thread number @@ -523,40 +659,6 @@ private: */ int createCompressionFile(int ithr, int iframe); - /** - * Creates new file - *\returns OK for succces or FAIL for failure - */ - int createNewFile(); - - /** - * Static function - Thread started which listens to packets. - * Called by startReceiver() - * @param this_pointer pointer to this object - */ - static void* startListeningThread(void *this_pointer); - - /** - * Static function - Thread started which writes packets to file. - * Called by startReceiver() - * @param this_pointer pointer to this object - */ - static void* startWritingThread(void *this_pointer); - - /** - * Thread started which listens to packets. - * Called by startReceiver() - * - */ - int startListening(); - - /** - * Thread started which writes packets to file. - * Called by startReceiver() - * - */ - int startWriting(); - /** * Writing to file without compression * @param buf is the address of buffer popped out of fifo @@ -565,25 +667,6 @@ private: */ void writeToFile_withoutCompression(char* buf[],int numpackets, uint32_t framenum); - /** - * Its called for the first packet of a scan or acquistion - * Sets the startframeindices and the variables to know if acquisition started - * @param ithread listening thread number - * @param numbytes number of bytes it listened to - */ - void startFrameIndices(int ithread, int numbytes); - - /** - * This is called when udp socket is shut down - * It pops ffff instead of packet number into fifo - * to inform writers about the end of listening session - * @param ithread listening thread number - * @param rc number of bytes received - * @param pc packet count - * @param t total packets listened to - */ - void stopListening(int ithread, int rc, int &pc, int &t); - /** * When acquisition is over, this is called * @param ithread listening thread number @@ -615,110 +698,10 @@ private: - - - - - - - - - - - - - - - - - - - - - - /** missing packet identifier value */ - const static uint16_t missingPacketValue = 0xFFFF; - - -/** Complete File name */ - char savefilename[MAX_STR_LENGTH]; - - - - /** Previous Frame number from buffer */ - int prevframenum; - - - - - - - // TODO: not properly sure where to put these... - /** structure of an eiger image header*/ - - - - -//semaphores - - -//mutex - /** guiDataReady mutex */ - pthread_mutex_t dataReadyMutex; - - /** mutex for progress variable currframenum */ - pthread_mutex_t progress_mutex; - - /** mutex for writing data to file */ - pthread_mutex_t write_mutex; - //filter -#ifdef MYROOT1 - /** Tree where the hits are stored */ - TTree *myTree[MAX_NUM_WRITER_THREADS]; - - /** File where the tree is saved */ - TFile *myFile[MAX_NUM_WRITER_THREADS]; -#endif - - - - -public: - - - /** - callback arguments are - filepath - filename - fileindex - datasize - - return value is - 0 callback takes care of open,close,wrie file - 1 callback writes file, we have to open, close it - 2 we open, close, write file, callback does not do anything - */ - void registerCallBackStartAcquisition(int (*func)(char*, char*,int, int, void*),void *arg){startAcquisitionCallBack=func; pStartAcquisition=arg;}; - - /** - callback argument is - toatal frames caught - */ - void registerCallBackAcquisitionFinished(void (*func)(int, void*),void *arg){acquisitionFinishedCallBack=func; pAcquisitionFinished=arg;}; - - /** - args to raw data ready callback are - framenum - datapointer - datasize in bytes - file descriptor - guidatapointer (NULL, no data required) - */ - void registerCallBackRawDataReady(void (*func)(int, char*, int, FILE*, char*, void*),void *arg){rawDataReadyCallBack=func; pRawDataReady=arg;}; }; diff --git a/slsReceiverSoftware/include/logger.h b/slsReceiverSoftware/include/logger.h index cba284fb1..5e964da27 100644 --- a/slsReceiverSoftware/include/logger.h +++ b/slsReceiverSoftware/include/logger.h @@ -23,6 +23,12 @@ #define TOSTRING(x) STRINGIFY(x) #define MYCONCAT(x,y) #define __AT__ string(__FILE__) + string("::") + string(__func__) + string("(): ") +#define __SHORT_FORM_OF_FILE__ \ +(strrchr(__FILE__,'/') \ +? strrchr(__FILE__,'/')+1 \ +: __FILE__ \ +) +#define __SHORT_AT__ string(__SHORT_FORM_OF_FILE__) + string("::") + string(__func__) + string("(): ") //":" TOSTRING(__LINE__) diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index ec5c27ad5..75f7fa244 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -37,7 +37,7 @@ UDPBaseImplementation::UDPBaseImplementation(){ //***connection parameters*** strcpy(eth,""); - for(int i=0;i /proc/sys/net/core/rmem_max")) + cout << "Warning: No root permission to change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; + else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) + cout << "Warning: No root permission to change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; + /** permanent setting by heiner + net.core.rmem_max = 104857600 # 100MiB + net.core.netdev_max_backlog = 250000 + sysctl -p + // from the manual + sysctl -w net.core.rmem_max=16777216 + sysctl -w net.core.netdev_max_backlog=250000 + */ } UDPStandardImplementation::~UDPStandardImplementation(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG(logDEBUG1) << __AT__ << " called"; deleteMembers(); } @@ -49,13 +69,13 @@ UDPStandardImplementation::~UDPStandardImplementation(){ /***initial parameters***/ void UDPStandardImplementation::deleteBaseMembers(){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; + FILE_LOG(logDEBUG1) << __AT__ << " starting"; UDPBaseImplementation::~UDPBaseImplementation(); } void UDPStandardImplementation::deleteMembers(){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; + FILE_LOG(logDEBUG1) << __AT__ << " starting"; cout << "Info: Deleting member pointers" << endl; shutDownUDPSockets(); @@ -77,7 +97,7 @@ void UDPStandardImplementation::deleteMembers(){ } void UDPStandardImplementation::deleteFilter(){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; + FILE_LOG(logDEBUG1) << __AT__ << " starting"; moenchCommonModeSubtraction = NULL; for(int i=0; igetErrorStatus(); - if(!iret){ - cout << "Info: UDP port opened at port " << port[i] << endl; - }else{ -#ifdef VERBOSE - cprintf(BG_RED,"Error: Could not create UDP socket on port %d error: %d\n", port[i], iret); -#endif - shutDownUDPSockets(); - return FAIL; - } - } - - cout << "Info: UDP socket(s) created successfully." << endl; - cout << "Info: Listener Ready ..." << endl; - - return OK; -} - - - -int UDPStandardImplementation::setupWriter(){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; - - //acquisition start call back returns enable write - cbAction = DO_EVERYTHING; - if (startAcquisitionCallBack) - cbAction=startAcquisitionCallBack(filePath,fileName,fileIndex,bufferSize,pStartAcquisition); - - if(cbAction < DO_EVERYTHING){ - cout << "Info: Call back activated. Data saving must be taken care of by user in call back." << endl; - if (rawDataReadyCallBack) - cout << "Info: Data Write has been defined externally" << endl; - }else if(!fileWriteEnable) - cout << "Info: Data will not be saved" << endl; - - - - //creating first file - //setting all value to 1 - pthread_mutex_lock(&status_mutex); - for(int i=0; i config_map){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; + FILE_LOG(logDEBUG1) << __AT__ << " starting"; map::const_iterator pos; pos = config_map.find("mode"); @@ -596,7 +376,7 @@ void UDPStandardImplementation::configure(map config_map){ /***file parameters***/ int UDPStandardImplementation::setDataCompressionEnable(const bool b){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; + FILE_LOG(logDEBUG1) << __AT__ << " starting"; cout << "Info: Setting up Data Compression Enable to " << stringEnable(b); #ifdef MYROOT1 @@ -609,9 +389,9 @@ int UDPStandardImplementation::setDataCompressionEnable(const bool b){ dataCompressionEnable = b; //-- create writer threads depending on enable - pthread_mutex_lock(&status_mutex); + pthread_mutex_lock(&statusMutex); writerThreadsMask = 0x0; - pthread_mutex_unlock(&(status_mutex)); + pthread_mutex_unlock(&(statusMutex)); createWriterThreads(true); if(b) @@ -638,7 +418,7 @@ int UDPStandardImplementation::setDataCompressionEnable(const bool b){ /***acquisition parameters***/ void UDPStandardImplementation::setShortFrameEnable(const int i){ - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG(logDEBUG1) << __AT__ << " called"; shortFrameEnable = i; @@ -675,7 +455,7 @@ void UDPStandardImplementation::setShortFrameEnable(const int i){ int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t i){ - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG(logDEBUG1) << __AT__ << " called"; if(i >= 0){ FrameToGuiFrequency = i; @@ -690,7 +470,7 @@ int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t i){ int UDPStandardImplementation::setAcquisitionPeriod(int64_t i){ - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG(logDEBUG1) << __AT__ << " called"; if(i >= 0){ acquisitionPeriod = i; @@ -705,7 +485,7 @@ int UDPStandardImplementation::setAcquisitionPeriod(int64_t i){ } int UDPStandardImplementation::setDynamicRange(const uint32_t i){ - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG(logDEBUG1) << __AT__ << " called"; int oldDynamicRange = dynamicRange; @@ -759,7 +539,7 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i){ int UDPStandardImplementation::setTenGigaEnable(const bool b){ - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG(logDEBUG1) << __AT__ << " called"; cout << "Info: Setting Ten Giga to " << string(b) << endl; bool oldTenGigaEnable = tengigaEnable; @@ -829,6 +609,10 @@ int UDPStandardImplementation::setTenGigaEnable(const bool b){ + + + + /************************************************************************* * Behavioral functions*************************************************** * They may modify the status of the receiver **************************** @@ -837,7 +621,7 @@ int UDPStandardImplementation::setTenGigaEnable(const bool b){ /***initial functions***/ int UDPStandardImplementation::setDetectorType(const slsReceiverDefs::detectorType d){ - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG(logDEBUG1) << __AT__ << " called"; cout << "Setting receiver type ..." << endl; @@ -857,7 +641,7 @@ int UDPStandardImplementation::setDetectorType(const slsReceiverDefs::detectorTy cout << "Info: ***** This is a " << slsDetectorBase::getDetectorType(d) << " Receiver *****" << endl; break; default: - cout << "Error: This is an unknown receiver type " << (int)d << endl; + cprintf(BG_RED, "Error: This is an unknown receiver type %d\n", (int)d); return FAIL; } @@ -934,9 +718,9 @@ int UDPStandardImplementation::setDetectorType(const slsReceiverDefs::detectorTy //delete threads and set number of listening threads if(myDetectorType == EIGER){ - pthread_mutex_lock(&status_mutex); + pthread_mutex_lock(&statusMutex); listeningThreadsMask = 0x0; - pthread_mutex_unlock(&(status_mutex)); + pthread_mutex_unlock(&(statusMutex)); if(threadStarted) createListeningThreads(true); numberofListeningThreads = MAX_NUMBER_OF_LISTENING_THREADS; @@ -949,11 +733,11 @@ int UDPStandardImplementation::setDetectorType(const slsReceiverDefs::detectorTy //create threads if(createListeningThreads() == FAIL){ cprintf(BG_RED,"Error: Could not create listening thread\n"); - exit (-1); + return FAIL; } if(createWriterThreads() == FAIL){ cprintf(BG_RED,"Error: Could not create writer threads\n"); - exit (-1); + return FAIL; } setThreadPriorities(); @@ -969,7 +753,7 @@ int UDPStandardImplementation::setDetectorType(const slsReceiverDefs::detectorTy /***acquisition functions***/ void UDPStandardImplementation::resetAcquisitionCount(){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; + FILE_LOG(logDEBUG1) << __AT__ << " starting"; totalPacketsCaught = 0; acqStarted = false; @@ -980,7 +764,7 @@ void UDPStandardImplementation::resetAcquisitionCount(){ int UDPStandardImplementation::startReceiver(char *c=NULL){ - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG(logDEBUG1) << __AT__ << " called"; cout << "Info: Starting Receiver" << endl; @@ -1009,11 +793,11 @@ int UDPStandardImplementation::startReceiver(char *c=NULL){ guiDataReady=0; strcpy(guiFileName,""); //reset masks - pthread_mutex_lock(&status_mutex); + pthread_mutex_lock(&statusMutex); writerThreadsMask = 0x0; createFileMask = 0x0; fileCreateSuccess = false; - pthread_mutex_unlock(&status_mutex); + pthread_mutex_unlock(&statusMutex); //Print Receiver Configuration @@ -1024,6 +808,8 @@ int UDPStandardImplementation::startReceiver(char *c=NULL){ cout << "Info: Number of Jobs Per Buffer: " << numberofJobsPerBuffer << endl; if(FrameToGuiFrequency) cout << "Info: Frequency of frames sent to gui" << FrameToGuiFrequency << endl; + else + cout << "Info: Random frames sent to gui" << endl; @@ -1031,54 +817,84 @@ int UDPStandardImplementation::startReceiver(char *c=NULL){ if(createUDPSockets() == FAIL){ strcpy(c,"Could not create UDP Socket(s).\n"); cout << endl; - cout << "Error: "<< c << endl; + cprintf(BG_RED, "Error: %s\n",c); return FAIL; } if(setupWriter() == FAIL){ //stop udp socket shutDownUDPSockets(); - sprintf(c,"Could not create file %s.\n",savefilename); + sprintf(c,"Could not create file %s.\n",completeFileName); cout << endl; - cout << "Error: "<< c << endl; + cprintf(BG_RED, "Error: %s\n",c); return FAIL; } - //For compression, done to give the gui some proper name instead of always the last file name + //For compression, just for gui purposes if(dataCompressionEnable) - sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex); + sprintf(completeFileName, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex); //initialize semaphore to synchronize between writer and gui reader threads sem_init(&writerGuiSemaphore,1,0); //status and thread masks - pthread_mutex_lock(&status_mutex); + pthread_mutex_lock(&statusMutex); status = RUNNING; - for(int i=0;i /proc/sys/net/core/rmem_max")) - cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; - else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) - cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; - - /** permanent setting heiner - net.core.rmem_max = 104857600 # 100MiB - net.core.netdev_max_backlog = 250000 - sysctl -p - // from the manual - sysctl -w net.core.rmem_max=16777216 - sysctl -w net.core.netdev_max_backlog=250000 - */ + if(status == RUNNING){ + //wait for all packets + uint64_t prev = totalPacketsCaught; + usleep(50000); + while(prev!=totalPacketsCaught){ + prev=totalPacketsCaught; + usleep(50000); } + //set status + pthread_mutex_lock(&statusMutex); + status = TRANSMITTING; + pthread_mutex_unlock(&statusMutex); + cout << "Info: Status: Transmitting" << endl; + } - -void UDPStandardImplementation::initializeMembers(){ - myDetectorType = GENERIC; - enableFileWrite = 1; - overwrite = 1; - fileIndex = 0; - scanTag = 0; - frameIndexNeeded = 0; + //shut down udp sockets and make listeners push dummy (end) packets for writers + shutDownUDPSockets(); +} - packetsCaught = 0; - totalPacketsCaught = 0; - startAcquisitionIndex = 0; - acquisitionIndex = 0; +void UDPStandardImplementation::readFrame(char* c,char** raw, uint64_t &startAcq, uint64_t &startFrame){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; - frameIndexMask = 0; - packetIndexMask = 0; - frameIndexOffset = 0; - acquisitionPeriod = SAMPLE_TIME_IN_NS; - numberOfFrames = 0; - dynamicRange = 16; - shortFrame = -1; - currframenum = 0; - prevframenum = 0; + //point to gui data, to let writer thread know that gui is back for data + if (guiData == NULL){ + guiData = latestData; +#ifdef DEBUG4 + cprintf(CYAN,"Info: gui data not null anymore - ready to get data\n"); +#endif + } + + //copy data and filename + strcpy(c,guiFileName); + startAcq = startAcquisitionIndex; + startFrame = startFrameIndex; - nFrameToGui = 0; - dataCompression = false; - numListeningThreads = 1; - numWriterThreads = 1; - thread_started = 0; + //gui data not copied yet + if(!guiDataReady){ +#ifdef DEBUG4 + cprintf(CYAN,"Info: gui data not ready\n"); +#endif + *raw = NULL; + } + + //gui data ready, pass address to gui to copy the data + else{ +#ifdef DEBUG4 + cprintf(CYAN,"Info: gui data ready\n"); +#endif + *raw = guiData; + guiData = NULL; + + //for nth frame to gui, post semaphore so writer stops waiting + if((FrameToGuiFrequency) && (writerThreadsMask)){ +#ifdef DEBUG4 + cprintf(CYAN,"Info: gonna post\n"); +#endif + //release after getting data + sem_post(&smp); + } +#ifdef DEBUG4 + cprintf(CYAN,"Info: done post\n"); +#endif + + } +} - tengigaEnable = 0; +void UDPStandardImplementation::closeFile(int i){ + FILE_LOG(logDEBUG1) << __AT__ << " called for " << i ; + //normal + if(!dataCompressionEnable){ + if(sfilefd){ +#ifdef DEBUG4 + cprintf(YELLOW, "Going to close file:%d\n",fileno(sfilefd)); +#endif + fclose(sfilefd); + sfilefd = NULL; + } + } + //compression + else{ +#if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1) + if(sfilefd){ +#ifdef DEBUG4 + cout << "sfield:" << (int)sfilefd << endl; +#endif + fclose(sfilefd); + sfilefd = NULL; + } +#endif - eth = NULL; - - - - cmSub = NULL; - - - //diff threads - for(int i=0;iGetCurrentFile(); + + if(myFile[i]->Write()) + //->Write(tall->GetName(),TObject::kOverwrite); + cout << "Info: Thread " << i <<": wrote frames to file" << endl; + else + cout << "Info: Thread " << i << ": could not write frames to file" << endl; + + }else + cout << "Info: Thread " << i << ": could not write frames to file: No file or No Tree" << endl; + //close file + if(myTree[i] && myFile[i]) + myFile[i] = myTree[i]->GetCurrentFile(); + if(myFile[i] != NULL) + myFile[i]->Close(); + myFile[i] = NULL; + myTree[i] = NULL; + pthread_mutex_unlock(&writeMutex); + +#endif + } +} + + + + +/************************************************************************* + * Listening and Writing Threads ***************************************** + *************************************************************************/ + + +int UDPStandardImplementation::createListeningThreads(bool destroy){ + FILE_LOG(logDEBUG1) << __AT__ << " starting"; + + //reset masks + killAllListeningThreads = false; + pthread_mutex_lock(&statusMutex); + listeningThreadsMask = 0x0; + pthread_mutex_unlock(&(statusMutex)); + + //destroy + if(destroy){ + cout << "Info: Destroying Listening Thread(s)" << endl; + + killAllListeningThreads = true; + for(int i = 0; i < numberofListeningThreads; ++i){ + sem_post(&listenSemaphore[i]); + pthread_join(listeningThreads[i],NULL); + cout <<"."<getErrorStatus(); + if(!iret){ + cout << "Info: UDP port opened at port " << port[i] << endl; + }else{ +#ifdef VERBOSE + cprintf(BG_RED,"Error: Could not create UDP socket on port %d error: %d\n", port[i], iret); +#endif + shutDownUDPSockets(); + return FAIL; + } + } + + cout << "Info: UDP socket(s) created successfully." << endl; + cout << "Info: Listener Ready ..." << endl; + + return OK; +} + + + +int UDPStandardImplementation::setupWriter(){ + FILE_LOG(logDEBUG1) << __AT__ << " starting"; + + //acquisition start call back returns enable write + cbAction = DO_EVERYTHING; + if (startAcquisitionCallBack) + cbAction=startAcquisitionCallBack(filePath,fileName,fileIndex,bufferSize,pStartAcquisition); + + if(cbAction < DO_EVERYTHING){ + cout << "Info: Call back activated. Data saving must be taken care of by user in call back." << endl; + if (rawDataReadyCallBack) + cout << "Info: Data Write has been defined externally" << endl; + }else if(!fileWriteEnable) + cout << "Info: Data will not be saved" << endl; + + + + //creating first file + //setting all value to 1 + pthread_mutex_lock(&statusMutex); + for(int i=0; i DO_NOTHING){ + + //close file pointers + if(sfilefd){ + fclose(sfilefd); + sfilefd = NULL; + } + + //create file + if(!overwriteEnable){ + if (NULL == (sfilefd = fopen((const char *) (completeFileName), "wx"))){ + cprintf(BG_RED,"Error: Could not create/overwrite file %s\n",completeFileName); + return FAIL; + } + }else if (NULL == (sfilefd = fopen((const char *) (completeFileName), "w"))){ + cprintf(BG_RED,"Error: Could not create file %s\n",completeFileName); + return FAIL; + } + //setting file buffer size to 16mb + setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); + + //Print packet loss and filenames + if(!packetsCaught){ + previousFrameNumber = -1; + cout << "Info: " << completeFileName << endl; + }else{ + cout << "Info:" << completeFileName + << "\tPacket Loss: " << setw(4)<startListening(); + return this_pointer; +} + + + +void* UDPStandardImplementation::startWritingThread(void* this_pointer){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; + ((UDPStandardImplementation*)this_pointer)->startWriting(); + return this_pointer; +} + + + + +void UDPStandardImplementation::startListening(){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; + + //set current thread value index + int ithread = currentThreadIndex; + //let calling function know thread started and obtained current + threadStarted = 1; + + + //variable definitions + int listenSize = 0; //listen to only 1 packet + uint32_t rc; //size of buffer received in bytes + //split frames + int carryonBufferSize; //from previous buffer to keep frames together in a buffer + char* tempBuffer = NULL; //temporary buffer to store split frames + if(myDetectorType != EIGER){ + listenSize = bufferSize * numberofJobsPerBuffer; //listen to more than 1 packet + tempBuffer = new char[onePacketSize * (packetsPerFrame - 1)]; //store maximum of 1 packets less in a frame + } + /* outer loop - loops once for each acquisition */ + //infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again) + while(true){ + + //reset parameters before acquisition + carryonBufferSize = 0; + + /* inner loop - loop for each buffer */ + //until mask unset (udp sockets shut down by client) + while((1 << ithread) & listeningThreadsMask){ + + //pop from fifo + fifoFree[ithread]->pop(buffer[ithread]); +#ifdef FIFODEBUG + cprintf(BLUE,"%d :Listener popped from fifofree %p\n", ithread, (void*)(buffer[ithread])); +#endif + + //udpsocket doesnt exist + if(udpSocket[ithread] == NULL){ + cprintf(RED, "Error: Thread %d :UDP Socket not created\n",ithread); + stopListening(ithread,0); + continue; + } + + rc = prepareAndListenBuffer(ithread, listenSize, carryonBufferSize, tempBuffer); + + //start indices for each start of scan/acquisition + if((!measurementStarted) && (rc > 0)){ + pthread_mutex_lock(&progressMutex); + if(!measurementStarted) + startFrameIndices(ithread); + pthread_mutex_unlock(&progressMutex); + } + + //problem in receiving or end of acquisition + if (status == TRANSMITTING){ + stopListening(ithread,rc); + continue; + } + + //write packet count to buffer + if(myDetectorType == EIGER) + (*((uint32_t*)(buffer[ithread]))) = 1; + //handling split frames and writing packet Count to buffer + else + (*((uint32_t*)(buffer[ithread]))) = processListeningBuffer(ithread, carryonBufferSize, tempBuffer); + + + //push buffer to FIFO + while(!fifo[ithread]->push(buffer[ithread])); +#ifdef FIFODEBUG + cprintf(BLUE,"Listening_Thread %d: Listener pushed into fifo %p\n",ithread, (void*)(buffer[ithread])); +#endif + + }/*--end of loop for each buffer (inner loop)*/ + + //end of acquisition, wait for next acquisition/change of parameters + sem_wait(&listenSemaphore[ithread]); + + //check to exit thread (for change of parameters) - only EXIT possibility + if(killAllListeningThreads){ + cprintf(GREEN,"Listening_Thread %d:Goodbye!\n",ithread); + //free resources at exit + if(tempBuffer) delete[] tempBuffer; + pthread_exit(NULL); + } + + }/*--end of loop for each acquisition (outer loop) */ +} + + + + + +int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, int cSize, char* temp){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; + + //listen to UDP packets + memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize); + int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, lSize + cSize); + + //throw away packets that is not one packet size, need to check status if socket is shut down + while(status != TRANSMITTING && myDetectorType == EIGER && receivedSize != onePacketSize) { + if(receivedSize != EIGER_HEADER_LENGTH) + cprintf(RED,"Listening_Thread %d: Listened to a weird packet size %d\n",receivedSize); +#ifdef DEBUG + else + cprintf(BLUE,"Listening_Thread %d: Listened to a header packet\n",ithread); +#endif + receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS); + } + +#ifdef DEBUG + cprintf(BLUE, "Listening_Thread %d : Received bytes: %d. Expected bytes: %d\n", ithread, receivedSize, expected-cSize); +#endif + return receivedSize; +} + + +void UDPStandardImplementation::startFrameIndices(int ithread){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; + + //determine startFrameIndex + switch(myDetectorType){ + case EIGER: + startFrameIndex = 0; //frame number always resets + break; + default: + if(shortFrameEnable < 0){ + startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + & (frameIndexMask)) >> frameIndexOffset); + }else{ + startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) + & (frameIndexMask)) >> frameIndexOffset); + } + break; + } + + //start of entire acquisition + if(!acqStarted){ + startAcquisitionIndex = startFrameIndex; + acqStarted = true; + cprintf(BLUE,"Info: Thread %d: startAcquisitionIndex:%d\n",ithread,startAcquisitionIndex); + } + + //set start of scan/real time measurement + cprintf(BLUE,"Info: Thread %d: startFrameIndex: %d\n", ithread,startFrameIndex); + measurementStarted = true; +} + + + + + +void UDPStandardImplementation::stopListening(int ithread, int numbytes){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; + + cout << "Info: Stop Listening. Status:" << slsDetectorBase::runStatusType(status) << endl; + + + //less than 1 packet size (especially for eiger), ignore the buffer (so that 2 dummy buffers are not sent with pc=0) + if(numbytes < onePacketSize) + numbytes = 0; + + + //free empty buffer + if(numbytes <= 0){ + cprintf(BLUE,"Info: Thread %d :End of Acquisition for Listening Thread\n", ithread); + while(!fifoFree[ithread]->push(buffer[ithread])); +#ifdef FIFODEBUG + cprintf(BLUE,"Listening_Thread %d :Listener push empty buffer into fifofree %p\n", ithread, (void*)(buffer[ithread])); #endif } - - - strcpy(savefilename,""); - - - //strcpy(filePath,""); - //strcpy(fileName,"run"); - - - - //status - pthread_mutex_lock(&status_mutex); - status = IDLE; - pthread_mutex_unlock(&(status_mutex)); - -} - - - -UDPStandardImplementation::~UDPStandardImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " called"; - - createListeningThreads(true); - createWriterThreads(true); - deleteMembers(); -} - - - - -void UDPStandardImplementation::deleteMembers(){ FILE_LOG(logDEBUG) << __AT__ << " called"; - - //kill threads - if(thread_started){ - createListeningThreads(true); - createWriterThreads(true); + //push last non empty buffer into fifo + else{ + (*((uint32_t*)(buffer[ithread]))) = numbytes/onePacketSize; + totalListeningFrameCount[ithread] += (numbytes/onePacketSize); +#ifdef DEBUG + cprintf(BLUE,"Listening_Thread %d: Last Buffer numBytes:%d\n",ithread, numbytes); + cprintf(BLUE,"Listening_Thread %d: Last Buffer packet count:%d\n",ithread, numbytes/onePacketSize); +#endif + while(!fifo[ithread]->push(buffer[ithread])); +#ifdef FIFODEBUG + cprintf(BLUE,"Listening_Thread %d: Listener Last Buffer pushed into fifo %p\n", ithread,(void*)(buffer[ithread])); +#endif } - for(int i=0;ipop(buffer[ithread]); + //creating dummy-end buffer with pc=0xFFFF + (*((uint32_t*)(buffer[ithread]))) = dummyPacketValue; + while(!fifo[ithread]->push(buffer[ithread])); +#ifdef FIFODEBUG + cprintf(BLUE,"Listening_Thread %d: Listener pushed dummy-end buffer into fifo %p\n", ithread,(void*)(buffer[ithread])); +#endif + } + + + //reset mask and exit loop + pthread_mutex_lock(&statusMutex); + listeningThreadsMask^=(1< 1) + cprintf(BLUE,"Listening_Thread %d: Waiting for other listening threads to be done.. current mask:0x%x\n", ithread, listeningThreadsMask); +#endif + while(listeningThreadsMask) + usleep(5000); +#ifdef DEBUG4 + int t=0; + for(i=0;i> frameIndexOffset)); +#endif + cSize = onePacketSize; + --packetCount; + } } - if(receiverdata[i]){ - delete receiverdata[i]; - receiverdata[i] = NULL; - } - } - shutDownUDPSockets(); - if(eth) {delete [] eth; eth = NULL;} - if(latestData) {delete [] latestData; latestData = NULL;} +#ifdef DEBUG4 + cprintf(BLUE, "Listening_Thread %d: First Header:%d\n", (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + & (frameIndexMask)) >> frameIndexOffset)); +#endif + break; - for(int i=0;i> frameIndexOffset), + ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)), + (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset), + ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)), + lastPacketOffset); +#endif + //moench last packet value is 0, so find the last packet and store the others in a temp storage + if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastPacketOffset))))) & (packetIndexMask))){ + lastFrameHeader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastPacketOffset))))) + & (frameIndexMask)) >> frameIndexOffset; + cSize += onePacketSize; + lastPacketOffset -= onePacketSize; + --packetCount; + while (lastFrameHeader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastPacketOffset))))) & (frameIndexMask)) >> frameIndexOffset)){ + cSize += onePacketSize; + lastPacketOffset -= onePacketSize; + --packetCount; + } + memcpy(temp, buffer[ithread]+(lastPacketOffset+onePacketSize), cSize); +#ifdef DEBUG4 + cprintf(BLUE, "Listening_Thread %d: temp Header:%d\t temp Packet:%d\n", + (((((uint32_t)(*((uint32_t*)(temp)))))& (frameIndexMask)) >> frameIndexOffset), + ((((uint32_t)(*((uint32_t*)(temp))))) & (packetIndexMask))); +#endif + } + break; + + default: + cprintf(RED,"Listening_Thread %d: Error: This detector is not implemented in the receiver" + + slsDetectorBase::getDetectorType(myDetectorType).c_str() + "\n"); + break; } +#ifdef DEBUG4 + cprintf(BLUE,"Listening_Thread %d: PacketCount:%d CarryonBufferSize:%d\n",ithread, packetCount, cSize); +#endif + + return packetCount; +} + + + + + + +void UDPStandardImplementation::startWriting(){ + FILE_LOG(logDEBUG1) << __AT__ << " called"; + + //set current thread value index + int ithread = currentThreadIndex; + //let calling function know thread started and obtained current + threadStarted = 1; + + //variable definitions + char* wbuf[MAX_NUMBER_OF_LISTENING_THREADS] = NULL; + sfilefd = NULL; + + + /* outer loop - loops once for each acquisition */ + //infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again) + while(true){ + + //--reset parameters before acquisition + //--end of reset parameters before acquisition + + /* inner loop - loop for each buffer */ + //until mask unset (udp sockets shut down by client) + while((1 << ithread) & writerThreadsMask){ + + + + }/*--end of loop for each buffer (inner loop)*/ + + + //in case they are not closed already + closeFile(); +#ifdef DEBUG4 + cprintf(GREEN,"Writing_Thread %d: Done with acquisition. Waiting for 1st sem to create new file/change of parameters\n", ithread); +#endif + //end of acquisition, wait for file create/change of parameters + sem_wait(&writerSemaphore[ithread]); + //check to exit thread (for change of parameters) - only EXIT possibility + if(killAllWritingThreads){ + cprintf(GREEN,"Writing_Thread %d:Goodbye!\n",ithread); + //free resources at exit + for(int i=0; i DO_NOTHING){ - //close - if(sfilefd){ - if(fclose(sfilefd)){ - cprintf(RED, "file close problem %d\n",fileno(sfilefd)); - fclose(sfilefd); - } - sfilefd = NULL; - } - - //open file - if(!overwrite){ - if (NULL == (sfilefd = fopen((const char *) (savefilename), "wx"))){ - cprintf(BG_RED,"Error: Could not create new file %s\n",savefilename); - return FAIL; - } - }else if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ - cprintf(BG_RED,"Error: Could not create file %s\n",savefilename); - return FAIL; - } - //setting buffer - setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); - - - //printing packet losses and file names - if(!packetsCaught) - cout << savefilename << endl; - else{ - cout << savefilename - << "\tpacket loss " - << setw(4)<GetCurrentFile(); - - if(myFile[ithr]->Write()) - //->Write(tall->GetName(),TObject::kOverwrite); - cout << "Thread " << ithr <<": wrote frames to file" << endl; - else - cout << "Thread " << ithr << ": could not write frames to file" << endl; - - }else - cout << "Thread " << ithr << ": could not write frames to file: No file or No Tree" << endl; - //close file - if(myTree[ithr] && myFile[ithr]) - myFile[ithr] = myTree[ithr]->GetCurrentFile(); - if(myFile[ithr] != NULL) - myFile[ithr]->Close(); - myFile[ithr] = NULL; - myTree[ithr] = NULL; - pthread_mutex_unlock(&write_mutex); - -#endif - } -} - - - - -/** - * Pre: status is running, semaphores have been instantiated, - * Post: udp sockets shut down, status is idle, sempahores destroyed - * */ - -int UDPStandardImplementation::stopReceiver(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - if(status != IDLE){ - //#ifdef VERBOSE - cout << "Stopping Receiver" << endl; - //#endif - - startReadout(); - - while(status == TRANSMITTING){ - sem_post(&smp); - usleep(5000); - } - - //semaphore destroy - sem_destroy(&smp); - - //change status - pthread_mutex_lock(&status_mutex); - status = IDLE; - pthread_mutex_unlock(&(status_mutex)); - - cout << "Receiver Stopped.\nStatus:" << status << endl << endl; - }else cout <<" Not idle to stop receiver" << endl; - - - //sem_post(&smp); - - return OK; -} - - - - -/** - * Pre: status is running, udp sockets have been initialized, - * stop receiver initiated - * Post:udp sockets closed, status is transmitting - * */ -void UDPStandardImplementation::startReadout(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - //#ifdef VERBOSE - cout << "Start Receiver Readout" << endl; - //#endif - - if(status == RUNNING){ - - //wait so that all packets which take time has arrived - usleep(5000); - - /********************************************/ - //usleep(10000000); - //usleep(2000000); - uint32_t prev = totalPacketsCaught; - usleep(50000); - while(prev!=totalPacketsCaught){ - prev=totalPacketsCaught; - usleep(50000); - } - pthread_mutex_lock(&status_mutex); - status = TRANSMITTING; - pthread_mutex_unlock(&status_mutex); - cout << "Status: Transmitting" << endl; - } - - //kill udp socket to tell the listening thread to push last packet - shutDownUDPSockets(); - -} - - - -void* UDPStandardImplementation::startListeningThread(void* this_pointer){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - ((UDPStandardImplementation*)this_pointer)->startListening(); - - return this_pointer; -} - - - -void* UDPStandardImplementation::startWritingThread(void* this_pointer){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - ((UDPStandardImplementation*)this_pointer)->startWriting(); - return this_pointer; -} - - - - - - -int UDPStandardImplementation::startListening(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - int ithread = currentListeningThreadIndex; -#ifdef VERYVERBOSE - cprintf(BLUE, "In startListening()\n "); -#endif - - thread_started = 1; - - int total; - int lastpacketoffset, expected, rc,packetcount, maxBufferSize, carryonBufferSize; - uint32_t lastframeheader;// for moench to check for all the packets in last frame - char* tempchar = NULL; - - while(1){ - //variables that need to be checked/set before each acquisition - carryonBufferSize = 0; - maxBufferSize = bufferSize * numJobsPerThread; -#ifdef VERYDEBUG - cprintf(BLUE, "%d maxBufferSize:%d carryonBufferSize:%d\n", ithread,maxBufferSize,carryonBufferSize); -#endif - - //missing packets compensation in listening thread - if(tempchar) {delete [] tempchar;tempchar = NULL;} - if(myDetectorType != EIGER) - tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size - else - maxBufferSize = 0; - - - while((1<pop(buffer[ithread]); -#ifdef FIFO_DEBUG - cprintf(BLUE,"%d listener popped from fifofree %x\n", ithread, (void*)(buffer[ithread])); -#endif - - - - //ensure udpsocket exists - if(udpSocket[ithread] == NULL){ - rc = 0; - cprintf(BLUE, "%d UDP Socket is NULL\n",ithread); - } - - - //normal listening - else if(!carryonBufferSize){ -#ifdef SOCKET_DEBUG - if(!ithread){ -#endif - rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); - if(rc == EIGER_HEADER_LENGTH && myDetectorType == EIGER) { - while(rc == EIGER_HEADER_LENGTH){ - rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); - } - } - expected = maxBufferSize; -#ifdef SOCKET_DEBUG - }else{ - while(1) usleep(100000000); - } -#endif - } - - - //the remaining packets from previous buffer, copy it and listen to n less frame - else{ -#ifdef VERYDEBUG - cprintf(BLUE, "%d carry on buffer size:%d\n",ithread,carryonBufferSize); - cprintf(BLUE, "%d framennum in tempchar:%d\n",((((uint32_t)(*((uint32_t*)tempchar))) - & (frameIndexMask)) >> frameIndexOffset)); - cprintf(BLUE, "%d tempchar packet:%d\n", ((((uint32_t)(*((uint32_t*)(tempchar))))) - & (packetIndexMask))); -#endif - memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, tempchar, carryonBufferSize); - rc = udpSocket[ithread]->ReceiveDataOnly((buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + carryonBufferSize),maxBufferSize - carryonBufferSize); - expected = maxBufferSize - carryonBufferSize; - } - - -#ifdef EIGER_DEBUG - cprintf(BLUE, "%d rc: %d. expected: %d\n", ithread, rc, expected); -#endif - - - //start indices for each start of scan/acquisition - if((!measurementStarted) && (rc > 0)){ - pthread_mutex_lock(&progress_mutex); - if(!measurementStarted) - startFrameIndices(ithread, rc); - pthread_mutex_unlock(&progress_mutex); - } - - - //problem in receiving or end of acquisition - if (status == TRANSMITTING){ - stopListening(ithread,rc,packetcount,total); - continue; - } - - - //reset - packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread; - carryonBufferSize = 0; - - - - //check if last packet valid and calculate packet count - switch(myDetectorType){ - case MOENCH: - lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); -#ifdef VERYDEBUG - cout <<"first packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)) << endl; - cout <<"first header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset) << endl; - cout << "last packet offset:" << lastpacketoffset << endl; - cout <<"last packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)) << endl; - cout <<"last header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset) << endl; -#endif - //moench last packet value is 0 - if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask))){ - lastframeheader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset; - carryonBufferSize += onePacketSize; - lastpacketoffset -= onePacketSize; - --packetcount; - while (lastframeheader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset)){ - carryonBufferSize += onePacketSize; - lastpacketoffset -= onePacketSize; - --packetcount; - } - memcpy(tempchar, buffer[ithread]+(lastpacketoffset+onePacketSize), carryonBufferSize); -#ifdef VERYDEBUG - cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))) - & (frameIndexMask)) >> frameIndexOffset) << endl; - cout <<"tempchar packet:"<< ((((uint32_t)(*((uint32_t*)(tempchar))))) - & (packetIndexMask)) << endl; -#endif - } - break; - - case GOTTHARD: - case PROPIX: - if(shortFrame == -1){ - lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); -#ifdef VERYDEBUG - cprintf(BLUE, "%d last packet offset:%d\n",ithread, lastpacketoffset); -#endif - //if not last packet - if((unsigned int)(packetsPerFrame -1) != ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))+1) & (packetIndexMask))){ - memcpy(tempchar,buffer[ithread]+lastpacketoffset, onePacketSize); -#ifdef VERYDEBUG - cprintf(BLUE, "%d tempchar header:%d\n",ithread,(((((uint32_t)(*((uint32_t*)(tempchar))))+1) - & (frameIndexMask)) >> frameIndexOffset)); -#endif - carryonBufferSize = onePacketSize; - --packetcount; - } - } -#ifdef VERYDEBUG - cprintf(BLUE, "%d header:%d\n", (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) - & (frameIndexMask)) >> frameIndexOffset)); -#endif - break; - - - - case EIGER: - //because even headers might be included, so not packet count - (*((uint32_t*)(buffer[ithread]))) = rc; - packetcount = 1; - break; - - default: - break; - } - - - - //write packet count and push -#ifdef VERYDEBUG - cprintf(BLUE, "%d packetcount:%d carryonbuffer:%d\n", ithread, packetcount, carryonBufferSize); -#endif - if(myDetectorType != EIGER) - (*((uint32_t*)(buffer[ithread]))) = packetcount; - totalListeningFrameCount[ithread] += packetcount; -#ifdef VERYDEBUG - cprintf(BLUE,"%d listener going to push fifo: 0x%x\n", ithread,(void*)(buffer[ithread])); -#endif - while(!fifo[ithread]->push(buffer[ithread])); -#ifdef FIFO_DEBUG - cprintf(BLUE, "%d listener pushed into fifo %x\n",ithread, (void*)(buffer[ithread])); -#endif - - - - } - - sem_wait(&listensmp[ithread]); - - //make sure its not exiting thread - if(killAllListeningThreads){ - cout << ithread << " good bye listening thread" << endl; - if(tempchar) {delete [] tempchar;tempchar = NULL;} - pthread_exit(NULL); - } - - if(tempchar) {delete [] tempchar;tempchar = NULL;} - } - - return OK; -} - - - - - - - @@ -1946,16 +1978,10 @@ int UDPStandardImplementation::startListening(){ int UDPStandardImplementation::startWriting(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG(logDEBUG1) << __AT__ << " called"; - int ithread = currentWriterThreadIndex; -#ifdef VERYVERBOSE - cprintf(GREEN,"%d In startWriting()\n", ithread); -#endif - thread_started = 1; - char* wbuf[numListeningThreads];//interleaved char *d=new char[bufferSize*numListeningThreads]; int xmax=0,ymax=0; int ret,i,j; @@ -2468,66 +2494,9 @@ int UDPStandardImplementation::startWriting(){ } } -#ifdef VERYVERBOSE - cprintf(GREEN,"%d gonna wait for 1st sem\n", ithread); -#endif - //wait - sem_wait(&writersmp[ithread]); - if(killAllWritingThreads){ - for(i=0;i> frameIndexOffset); - else - startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) - & (frameIndexMask)) >> frameIndexOffset); - - - //start of acquisition - if(!acqStarted){ - startAcquisitionIndex=startFrameIndex; - //currframenum = startAcquisitionIndex; - acqStarted = true; - cprintf(BLUE,"%d startAcquisitionIndex:%d\n", ithread, startAcquisitionIndex); - } - - cprintf(BLUE,"%d startFrameIndex: %d\n", ithread,startFrameIndex); - prevframenum=startFrameIndex-1; //so that there is no packet loss, when currframenum(max,20) - prevframenum(1) - measurementStarted = true; - -} - - - -void UDPStandardImplementation::stopListening(int ithread, int rc, int &pc, int &t){ - FILE_LOG(logDEBUG) << __AT__ << " called"; - - - int i; - -#ifdef VERYVERBOSE - cprintf(BLUE, "%d Stop Listening\n", ithread); -#endif - - - if(status != TRANSMITTING){ - cprintf(BG_RED,"%d *** udp socket not shut down from client ***********************\n", ithread); - while(!fifoFree[ithread]->push(buffer[ithread])); - exit(-1); - } - - - //free buffer - if(rc <= 0){ - cprintf(BLUE,"%d End of acquisition for Listening Thread\n", ithread); - while(!fifoFree[ithread]->push(buffer[ithread])); -#ifdef FIFO_DEBUG - cprintf(BLUE,"%d listener empty buffer pushed into fifofree %x\n", ithread, (void*)(buffer[ithread])); -#endif - } - - - //push the last buffer into fifo - else{ - if(myDetectorType == EIGER){ - (*((uint32_t*)(buffer[ithread]))) = rc; - pc = 1; - }else{ - pc = (rc/onePacketSize); - (*((uint32_t*)(buffer[ithread]))) = pc; - } -#ifdef VERYDEBUG - cprintf(BLUE,"%d last rc:%d\n",ithread, rc); - cprintf(BLUE,"%d last packetcount:%d\n", ithread, pc); -#endif - - totalListeningFrameCount[ithread] += pc; - while(!fifo[ithread]->push(buffer[ithread])); -#ifdef FIFO_DEBUG - cprintf(BLUE,"%d listener last buffer pushed into fifo %x\n", ithread,(void*)(buffer[ithread])); -#endif - } - - - - - //push dummy buffer to all writer threads - for(i=0;ipop(buffer[ithread]); -#ifdef FIFO_DEBUG - cprintf(BLUE,"%d listener popped dummy buffer from fifofree %x\n", ithread,(void*)(buffer[ithread])); -#endif - (*((uint32_t*)(buffer[ithread]))) = 0x0; -#ifdef VERYDEBUG - cprintf(BLUE,"%d dummy buffer num packets:%d\n", ithread(*((uint16_t*)(buffer[ithread])))); -#endif - while(!fifo[ithread]->push(buffer[ithread])); -#ifdef FIFO_DEBUG - cprintf(BLUE,"%d listener pushed dummy buffer into fifo %x\n", ithread,(void*)(buffer[ithread])); -#endif - } - - - - //reset mask and exit loop - pthread_mutex_lock(&status_mutex); - listeningthreads_mask^=(1< 1) - cprintf(BLUE,"%d Waiting for listening to be done.. current mask:0x%x\n", ithread, listeningthreads_mask); -#endif - while(listeningthreads_mask) - usleep(5000); -#ifdef VERYDEBUG - t = 0; - for(i=0;i