diff --git a/slsReceiverSoftware/include/DataProcessor.h b/slsReceiverSoftware/include/DataProcessor.h index 1e18616c5..aae292a2d 100644 --- a/slsReceiverSoftware/include/DataProcessor.h +++ b/slsReceiverSoftware/include/DataProcessor.h @@ -38,10 +38,34 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { static uint64_t GetErrorMask(); /** - * Reset RunningMask + * Get acquisition started flag + * @return acquisition started flag */ - static void ResetRunningMask(); + static bool GetAcquisitionStartedFlag(); + /** + * Get measurement started flag + * @return measurement started flag + */ + static bool GetMeasurementStartedFlag(); + + /** + * Get Total Complete Frames Caught for an entire acquisition (including all scans) + * @return total number of frames caught for entire acquisition + */ + uint64_t GetNumTotalFramesCaught(); + + /** + * Get Frames Complete Caught for each real time acquisition (eg. for each scan) + * @return number of frames caught for each scan + */ + uint64_t GetNumFramesCaught(); + + /** + * Get Current Frame Index thats been processed for an entire acquisition (including all scans) + * @return -1 if no frames have been caught, else current frame index (represents all scans too) + */ + uint64_t GetProcessedAcquisitionIndex(); /** * Set bit in RunningMask to allow thread to run @@ -59,6 +83,26 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { */ void SetFifo(Fifo*& f); + /** + * Reset parameters for new acquisition (including all scans) + */ + void ResetParametersforNewAcquisition(); + + /** + * Reset parameters for new measurement (eg. for each scan) + */ + void ResetParametersforNewMeasurement(); + + /** + * Create New File + */ + int CreateNewFile(); + + /** + * Closes file + */ + void CloseFile(); + private: /** @@ -99,6 +143,30 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { /** Fifo structure */ Fifo* fifo; + + + // individual members + /** Aquisition Started flag */ + static bool acquisitionStartedFlag; + + /** Measurement Started flag */ + static bool measurementStartedFlag; + + /**Number of complete frames caught for an entire acquisition (including all scans) */ + uint64_t numTotalFramesCaught; + + /** Number of complete frames caught for each real time acquisition (eg. for each scan) */ + uint64_t numFramesCaught; + + /** Frame Number of First Frame of an entire Acquisition (including all scans) */ + uint64_t firstAcquisitionIndex; + + /** Frame Number of First Frame for each real time acquisition (eg. for each scan) */ + uint64_t firstMeasurementIndex; + + /** Frame Number of latest processed frame number of an entire Acquisition (including all scans) */ + uint64_t currentFrameIndex; + }; #endif diff --git a/slsReceiverSoftware/include/GeneralData.h b/slsReceiverSoftware/include/GeneralData.h index 5a38e8095..6e936fa6e 100644 --- a/slsReceiverSoftware/include/GeneralData.h +++ b/slsReceiverSoftware/include/GeneralData.h @@ -60,8 +60,18 @@ public: /** Default Fifo depth */ uint32_t defaultFifoDepth; + /** Threads per receiver */ + uint32_t threadsPerReceiver; + + /** Size of a header packet */ + uint32_t headerPacketSize; + /** Cosntructor */ - GeneralData(){}; + GeneralData(): + packetIndexMask(0), + packetIndexOffset(0), + threadsPerReceiver(1), + headerPacketSize(0){}; /** Destructor */ virtual ~GeneralData(){}; @@ -85,6 +95,24 @@ public: packetNumber = frameNumber&packetIndexMask; frameNumber = (frameNumber & frameIndexMask) >> frameIndexOffset; } + + /** + * Setting dynamic range changes member variables + * @param dr dynamic range + * @param tgEnable true if 10GbE is enabled, else false + */ + virtual void SetDynamicRange(int dr, bool tgEnable) { + //This is a generic function that is overloaded by a dervied class + }; + + /** + * Setting ten giga enable changes member variables + * @param tgEnable true if 10GbE is enabled, else false + * @param dr dynamic range + */ + virtual void SetTenGigaEnable(bool tgEnable, int dr) { + //This is a generic function that is overloaded by a dervied class + }; }; @@ -105,7 +133,7 @@ class GotthardData : public GeneralData { packetIndexMask = 1; maxFramesPerFile = MAX_FRAMES_PER_FILE; fifoBufferSize = packetSize*packetsPerFrame; - fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS; + fifoBufferHeaderSize= FIFO_BUFFER_HEADER_SIZE; defaultFifoDepth = 25000; }; }; @@ -126,7 +154,7 @@ class ShortGotthardData : public GeneralData { frameIndexMask = 0xFFFFFFFF; maxFramesPerFile = SHORT_MAX_FRAMES_PER_FILE; fifoBufferSize = packetSize*packetsPerFrame; - fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS; + fifoBufferHeaderSize= FIFO_BUFFER_HEADER_SIZE; defaultFifoDepth = 25000; }; }; @@ -154,7 +182,7 @@ class PropixData : public GeneralData { packetIndexMask = 1; maxFramesPerFile = MAX_FRAMES_PER_FILE; fifoBufferSize = packetSize*packetsPerFrame; - fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS; + fifoBufferHeaderSize= FIFO_BUFFER_HEADER_SIZE; defaultFifoDepth = 25000; }; }; @@ -180,7 +208,7 @@ class Moench02Data : public GeneralData { packetIndexMask = 0xFF; maxFramesPerFile = MOENCH_MAX_FRAMES_PER_FILE; fifoBufferSize = packetSize*packetsPerFrame; - fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS + FILE_FRAME_HEADER_LENGTH; + fifoBufferHeaderSize= FIFO_BUFFER_HEADER_SIZE + FILE_FRAME_HEADER_SIZE; defaultFifoDepth = 2500; }; }; @@ -206,7 +234,7 @@ class Moench03Data : public GeneralData { packetIndexMask = 0xFFFFFFFF; maxFramesPerFile = JFRAU_MAX_FRAMES_PER_FILE; fifoBufferSize = packetSize*packetsPerFrame; - fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS + FILE_FRAME_HEADER_LENGTH; + fifoBufferHeaderSize= FIFO_BUFFER_HEADER_SIZE + FILE_FRAME_HEADER_SIZE; defaultFifoDepth = 2500; }; }; @@ -229,7 +257,7 @@ class JCTBData : public GeneralData { imageSize = dataSize*packetsPerFrame; maxFramesPerFile = JFCTB_MAX_FRAMES_PER_FILE; fifoBufferSize = packetSize*packetsPerFrame; - fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS + FILE_FRAME_HEADER_LENGTH; + fifoBufferHeaderSize= FIFO_BUFFER_HEADER_SIZE + FILE_FRAME_HEADER_SIZE; defaultFifoDepth = 2500; }; }; @@ -261,13 +289,9 @@ private: packetSize = packetHeaderSize + dataSize; packetsPerFrame = 128; imageSize = dataSize*packetsPerFrame; - frameIndexMask = 0xffffff; - frameIndexOffset = 0; - packetIndexMask = 0; - packetIndexOffset = 0; maxFramesPerFile = JFRAU_MAX_FRAMES_PER_FILE; fifoBufferSize = packetSize*packetsPerFrame; - fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS + FILE_FRAME_HEADER_LENGTH; + fifoBufferHeaderSize= FIFO_BUFFER_HEADER_SIZE + FILE_FRAME_HEADER_SIZE; defaultFifoDepth = 2500; }; @@ -285,7 +309,7 @@ private: uint64_t& frameNumber, uint32_t& packetNumber, uint32_t& subFrameNumber, uint64_t bunchId) { subFrameNumber = 0; jfrau_packet_header_t* header = (jfrau_packet_header_t*)(packetData); - frameNumber = (*( (uint32_t*) header->frameNumber))&frameIndexMask; + frameNumber = (uint64_t)(*( (uint32_t*) header->frameNumber)); packetNumber = (uint32_t)(*( (uint8_t*) header->packetNumber)); bunchId = (*((uint64_t*) header->bunchid)); } @@ -323,17 +347,16 @@ private: nPixelsY = 256; dataSize = 1024; packetSize = 1040; - packetsPerFrame = 1; + packetsPerFrame = 256; imageSize = dataSize*packetsPerFrame; - frameIndexMask = 0xFFFFFFFF; - frameIndexOffset = 0; - packetIndexMask = 0; - packetIndexOffset = 0; + frameIndexMask = 0xffffff; maxFramesPerFile = EIGER_MAX_FRAMES_PER_FILE; fifoBufferSize = packetSize*packetsPerFrame; - fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS + FILE_FRAME_HEADER_LENGTH; + fifoBufferHeaderSize= FIFO_BUFFER_HEADER_SIZE + FILE_FRAME_HEADER_SIZE; defaultFifoDepth = 100; footerOffset = packetHeaderSize+dataSize; + threadsPerReceiver = 2; + headerPacketSize = 48; }; /** @@ -351,13 +374,38 @@ private: bunchId = 0; subFrameNumber = 0; eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(packetData + footerOffset); - frameNumber = (uint64_t)(*( (uint64_t*) footer)); - packetNumber = (*( (uint16_t*) footer->packetNumber))-1; + frameNumber = (uint64_t)((*( (uint64_t*) footer)) & frameIndexMask); + packetNumber = (uint32_t)(*( (uint16_t*) footer->packetNumber))-1; if (dynamicRange == 32) { eiger_packet_header_t* header = (eiger_packet_header_t*) (packetData); - subFrameNumber = *( (uint32_t*) header->subFrameNumber); + subFrameNumber = (uint64_t) *( (uint32_t*) header->subFrameNumber); } } + + /** + * Setting dynamic range changes member variables + * @param dr dynamic range + * @param tgEnable true if 10GbE is enabled, else false + */ + void SetDynamicRange(int dr, bool tgEnable) { + packetsPerFrame = (tgEnable ? 4 : 16) * dr; + imageSize = dataSize*packetsPerFrame; + fifoBufferSize = packetSize*packetsPerFrame; + } + + /** + * Setting ten giga enable changes member variables + * @param tgEnable true if 10GbE is enabled, else false + * @param dr dynamic range + */ + void SetTenGigaEnable(bool tgEnable, int dr) { + dataSize = (tgEnable ? 4096 : 1024); + packetSize = (tgEnable ? 4112 : 1040);; + packetsPerFrame = (tgEnable ? 4 : 16) * dr; + imageSize = dataSize*packetsPerFrame; + fifoBufferSize = packetSize*packetsPerFrame; + footerOffset = packetHeaderSize+dataSize; + }; }; diff --git a/slsReceiverSoftware/include/Listener.h b/slsReceiverSoftware/include/Listener.h index 88e381f16..9cc77bb46 100644 --- a/slsReceiverSoftware/include/Listener.h +++ b/slsReceiverSoftware/include/Listener.h @@ -13,6 +13,7 @@ #include "ThreadObject.h" class Fifo; +class genericSocket; class Listener : private virtual slsReceiverDefs, public ThreadObject { @@ -38,10 +39,27 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { static uint64_t GetErrorMask(); /** - * Reset RunningMask + * Get acquisition started flag + * @return acquisition started flag */ - static void ResetRunningMask(); + static bool GetAcquisitionStartedFlag(); + /** + * Get measurement started flag + * @return measurement started flag + */ + static bool GetMeasurementStartedFlag(); + + /** + * Get Total Packets caught in an acquisition + * @return Total Packets caught in an acquisition + */ + uint64_t GetTotalPacketsCaught(); + + /** + * Get number of bytes currently received in udp buffer + */ + uint64_t GetNumReceivedinUDPBuffer(); /** * Set bit in RunningMask to allow thread to run @@ -59,6 +77,33 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { */ void SetFifo(Fifo*& f); + /** + * Reset parameters for new acquisition (including all scans) + */ + void ResetParametersforNewAcquisition(); + + /** + * Reset parameters for new measurement (eg. for each scan) + */ + void ResetParametersforNewMeasurement(); + + /** + * Creates UDP Sockets + * @param portnumber udp port number + * @param packetSize size of one packet + * @param eth ethernet interface or null + * @param headerPacketSize size of a header packet + * @return OK or FAIL + */ + int CreateUDPSockets(uint32_t portnumber, uint32_t packetSize, const char* eth, uint32_t headerPacketSize); + + /** + * Shuts down and deletes UDP Sockets + */ + void ShutDownUDPSocket(); + + + private: @@ -82,7 +127,6 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { void ThreadExecution(); - /** type of thread */ static const std::string TypeName; @@ -101,8 +145,28 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { /** Fifo structure */ Fifo* fifo; - int count; + // individual members + /** Aquisition Started flag */ + static bool acquisitionStartedFlag; + + /** Measurement Started flag */ + static bool measurementStartedFlag; + + /**Number of complete Packets caught for an entire acquisition (including all scans) */ + uint64_t numTotalPacketsCaught; + + /** Number of complete Packets caught for each real time acquisition (eg. for each scan) */ + uint64_t numPacketsCaught; + + /** Frame Number of First Frame of an entire Acquisition (including all scans) */ + uint64_t firstAcquisitionIndex; + + /** Frame Number of First Frame for each real time acquisition (eg. for each scan) */ + uint64_t firstMeasurementIndex; + + /** UDP Sockets - Detector to Receiver */ + genericSocket* udpSocket; }; #endif diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index fdd8e5886..4d46b68dc 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -127,7 +127,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter /** * Get Current Frame Index for an entire acquisition (including all scans) - * @return current frame index (represents all scans too) + * @return -1 if no frames have been caught, else current frame index (represents all scans too) */ int64_t getAcquisitionIndex() const; @@ -456,9 +456,8 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter /** * Shuts down and deletes UDP Sockets - * @return OK or FAIL */ - int shutDownUDPSockets(); + void shutDownUDPSockets(); /** * Get the buffer-current frame read by receiver @@ -477,10 +476,9 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter void abort(); //FIXME: needed, isn't stopReceiver enough? /** - * Closes file / all files(if multiple files) - * @param ithread writer thread index + * Closes all files */ - void closeFile(int ithread = 0); + void closeFiles(); /** * Activate / Deactivate Receiver @@ -536,8 +534,6 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter detectorType myDetectorType; /** detector hostname */ char detHostname[MAX_STR_LENGTH]; - /** Number of Packets per Frame*/ - uint32_t packetsPerFrame; /** Acquisition Period */ uint64_t acquisitionPeriod; /** Acquisition Time */ @@ -587,16 +583,6 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter /** Data Compression Enable - save only hits */ bool dataCompressionEnable; - //***acquisition count parameters*** - /** Total packets caught for an entire acquisition (including all scans) */ - uint64_t totalPacketsCaught; - /** Packets Caught for each real time acquisition (eg. for each scan) */ - uint64_t packetsCaught; - - //***acquisition indices parameters*** - /** Actual current frame index of an entire acquisition (including all scans) */ - uint64_t acquisitionIndex; - //***acquisition parameters*** /* Short Frame Enable or index of adc enabled, else -1 if all enabled (gotthard specific) TODO: move to setROI */ int shortFrameEnable; @@ -609,8 +595,6 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter static const int DEFAULT_STREAMING_TIMER = 500; - - //***callback parameters*** /** * function being called back for start acquisition diff --git a/slsReceiverSoftware/include/UDPInterface.h b/slsReceiverSoftware/include/UDPInterface.h index 315f12bec..a267d39fa 100644 --- a/slsReceiverSoftware/include/UDPInterface.h +++ b/slsReceiverSoftware/include/UDPInterface.h @@ -188,7 +188,7 @@ class UDPInterface { /** * Get Current Frame Index for an entire acquisition (including all scans) - * @return current frame index (represents all scans too) or -1 if no packets caught + * @return -1 if no frames have been caught, else current frame index (represents all scans too) or -1 if no packets caught */ virtual int64_t getAcquisitionIndex() const = 0; @@ -513,9 +513,8 @@ class UDPInterface { /** * Shuts down and deletes UDP Sockets - * @return OK or FAIL */ - virtual int shutDownUDPSockets() = 0; + virtual void shutDownUDPSockets() = 0; /** * Get the buffer-current frame read by receiver @@ -534,10 +533,9 @@ class UDPInterface { virtual void abort() = 0; //FIXME: needed, isnt stopReceiver enough? /** - * Closes file / all files(if multiple files) - * @param ithread writer thread index + * Closes all files */ - virtual void closeFile(int ithread = 0) = 0; + virtual void closeFiles() = 0; /** * Activate / Deactivate Receiver diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index c4fc66faf..cafa4bdcc 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -35,17 +35,9 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase */ virtual ~UDPStandardImplementation(); - //*** initial parameters (behavioral)*** - /** - * Set receiver type (and corresponding detector variables in derived STANDARD class) - * It is the first function called by the client when connecting to receiver - * @param d detector type - * @return OK or FAIL - */ - int setDetectorType(const detectorType d); - //*** Getters *** + //*** Overloaded Functions called by TCP Interface *** /** * Get Total Frames Caught for an entire acquisition (including all scans) * @return total number of frames caught for entire acquisition @@ -58,11 +50,12 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase */ uint64_t getFramesCaught() const; + /** + * Get Current Frame Index for an entire acquisition (including all scans) + * @return -1 if no frames have been caught, else current frame index (represents all scans too) + */ + int64_t getAcquisitionIndex() const; - - //*** Setters *** - - //*** file parameters *** /** * Set File Name Prefix (without frame index, file index and extension (_f000000000000_8.raw)) * Does not check for file existence since it is created only at startReceiver @@ -70,7 +63,6 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase */ void setFileName(const char c[]); - //*** acquisition parameters *** /** * Set Short Frame Enabled, later will be moved to getROI (so far only for gotthard) * @param i index of adc enabled, else -1 if all enabled @@ -99,9 +91,93 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase */ int setAcquisitionPeriod(const uint64_t i); - //*** Behavioral functions *** + /** + * Set Acquisition Time + * @param i acquisition time + * @return OK or FAIL + */ + int setAcquisitionTime(const uint64_t i); + /** + * Set Number of Frames expected by receiver from detector + * The data receiver status will change from running to idle when it gets this number of frames + * @param i number of frames expected + * @return OK or FAIL + */ + int setNumberOfFrames(const uint64_t i); + /** + * Set Dynamic Range or Number of Bits Per Pixel + * @param i dynamic range that is 4, 8, 16 or 32 + * @return OK or FAIL + */ + int setDynamicRange(const uint32_t i); + + /** + * Set Ten Giga Enable + * @param b true if 10GbE enabled, else false (1G enabled) + * @return OK or FAIL + */ + int setTenGigaEnable(const bool b); + + /** + * Set Fifo Depth + * @param i fifo depth value + * @return OK or FAIL + */ + int setFifoDepth(const uint32_t i); + + /** + * Set receiver type (and corresponding detector variables in derived STANDARD class) + * It is the first function called by the client when connecting to receiver + * @param d detector type + * @return OK or FAIL + */ + int setDetectorType(const detectorType d); + + /** + * Reset acquisition parameters such as total frames caught for an entire acquisition (including all scans) + */ + void resetAcquisitionCount(); + + /** + * Start Listening for Packets by activating all configuration settings to receiver + * When this function returns, it has status RUNNING(upon SUCCESS) or IDLE (upon failure) + * @param c error message if FAIL + * @return OK or FAIL + */ + int startReceiver(char *c=NULL); + + /** + * Stop Listening for Packets + * Calls startReadout(), which stops listening and sets status to Transmitting + * When it has read every frame in buffer, the status changes to Run_Finished + * When this function returns, receiver has status IDLE + * Pre: status is running, semaphores have been instantiated, + * Post: udp sockets shut down, status is idle, semaphores destroyed + */ + void stopReceiver(); + + /** + * Stop Listening to Packets + * and sets status to Transmitting + * Next step would be to get all data and stop receiver completely and return with idle state + * Pre: status is running, udp sockets have been initialized, stop receiver initiated + * Post:udp sockets closed, status is transmitting + */ + void startReadout(); + + /** + * Shuts down and deletes UDP Sockets + * also called in case of illegal shutdown of receiver + */ + void shutDownUDPSockets(); + + /** + * Closes file / all files(data compression involves multiple files) + * TCPIPInterface can also call this in case of illegal shutdown of receiver + */ + void closeFiles(); private: @@ -127,6 +203,31 @@ private: */ int SetupFifoStructure(); + /** + * Reset parameters for new measurement (eg. for each scan) + */ + void ResetParametersforNewMeasurement(); + + /** + * Creates UDP Sockets + * @return OK or FAIL + */ + int CreateUDPSockets(); + + /** + * Creates the first file + * also does the startAcquisitionCallBack + * @return OK or FAIL + */ + int SetupWriter(); + + /** + * Start Running + * Set running mask and post semaphore of the threads + * to start the inner loop in execution thread + */ + void StartRunning(); + //*** Class Members *** diff --git a/slsReceiverSoftware/include/receiver_defs.h b/slsReceiverSoftware/include/receiver_defs.h index f4a9dee04..f215e430c 100755 --- a/slsReceiverSoftware/include/receiver_defs.h +++ b/slsReceiverSoftware/include/receiver_defs.h @@ -4,21 +4,33 @@ #include "sls_receiver_defs.h" #include - -#define GOODBYE -200 - -//local network parameters -#define RECEIVE_SOCKET_BUFFER_SIZE (100*1024*1024) -#define MAX_SOCKET_INPUT_PACKET_QUEUE 250000 +//socket +#define GOODBYE -200 +#define RECEIVE_SOCKET_BUFFER_SIZE (100*1024*1024) +#define MAX_SOCKET_INPUT_PACKET_QUEUE 250000 //files -#define DO_NOTHING 0 -#define CREATE_FILES 1 -#define DO_EVERYTHING 2 +#define DO_NOTHING 0 +#define CREATE_FILES 1 +#define DO_EVERYTHING 2 +//binary +#define FILE_FRAME_HEADER_SIZE 16 + +//fifo +#define FIFO_BUFFER_HEADER_SIZE 4 + +//parameters to calculate fifo depth +#define SAMPLE_TIME_IN_NS 100000000//100ms +#define MAX_JOBS_PER_THREAD 1000 + +#define DUMMY_PACKET_VALUE 0xFFFFFFFF; + + + +/* //binary #define FILE_BUF_SIZE (16*1024*1024) //16mb -#define FILE_FRAME_HEADER_LENGTH 16 #define FILE_HEADER_BUNCHID_OFFSET 8 //hdf5 @@ -30,8 +42,6 @@ #define HEADER_SIZE_NUM_TOT_PACKETS 4 -#define SAMPLE_TIME_IN_NS 100000000//100ms -#define MAX_JOBS_PER_THREAD 1000 #define ALL_MASK_32 0xFFFFFFFF @@ -142,43 +152,5 @@ #define EIGER_PACKET_INDEX_MASK 0x0 -//data structures -/** - * structure of an eiger packet header - * subframenum subframe number for 32 bit mode (already written by firmware) - * missingpacket explicitly put to 0xFF to recognize it in file read (written by software) - * portnum 0 for the first port and 1 for the second port (written by software to file) - * dynamicrange dynamic range or bits per pixel (written by software to file) - */ -typedef struct { - unsigned char subFrameNumber[4]; - unsigned char missingPacket[2]; - unsigned char portIndex[1]; - unsigned char dynamicRange[1]; -} eiger_packet_header_t; - -/** - * structure of an eiger packet footer - * framenum 48 bit frame number (already written by firmware) - * packetnum packet number (already written by firmware) - */ -typedef struct { - unsigned char frameNumber[6]; - unsigned char packetNumber[2]; -} eiger_packet_footer_t; - -/** - * structure of an jungfrau packet header - * empty header - * framenumber - * packetnumber - */ -typedef struct { - unsigned char emptyHeader[6]; - unsigned char reserved[4]; - unsigned char packetNumber[1]; - unsigned char frameNumber[3]; - unsigned char bunchid[8]; -} jfrau_packet_header_t; - +*/ #endif diff --git a/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h b/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h index 75165d214..c6a90e3bb 100644 --- a/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h +++ b/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h @@ -269,15 +269,6 @@ private: /** Lock Status if server locked to a client */ int lockStatus; - /** Short frame */ - int shortFrame; - - /** Packets per frame */ - int packetsPerFrame; - - /** Dynamic Range */ - int dynamicrange; - /** kill tcp server thread */ int killTCPServerThread; diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index c45b5c762..1253dad53 100644 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -23,9 +23,21 @@ uint64_t DataProcessor::RunningMask(0x0); pthread_mutex_t DataProcessor::Mutex = PTHREAD_MUTEX_INITIALIZER; +bool DataProcessor::acquisitionStartedFlag(false); -DataProcessor::DataProcessor(Fifo*& f) : ThreadObject(NumberofDataProcessors), fifo(f) { - FILE_LOG(logDEBUG) << __AT__ << " called"; +bool DataProcessor::measurementStartedFlag(false); + + +DataProcessor::DataProcessor(Fifo*& f) : + ThreadObject(NumberofDataProcessors), + fifo(f), + numTotalFramesCaught(0), + numFramesCaught(0), + firstAcquisitionIndex(0), + firstMeasurementIndex(0), + currentFrameIndex(0) +{ + FILE_LOG (logDEBUG) << __AT__ << " called"; if(ThreadObject::CreateThread()){ pthread_mutex_lock(&Mutex); @@ -33,12 +45,12 @@ DataProcessor::DataProcessor(Fifo*& f) : ThreadObject(NumberofDataProcessors), f pthread_mutex_unlock(&Mutex); } NumberofDataProcessors++; - FILE_LOG(logDEBUG) << "Number of DataProcessors: " << NumberofDataProcessors << endl; + FILE_LOG (logDEBUG) << "Number of DataProcessors: " << NumberofDataProcessors << endl; } DataProcessor::~DataProcessor() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; ThreadObject::DestroyThread(); NumberofDataProcessors--; } @@ -47,35 +59,55 @@ DataProcessor::~DataProcessor() { uint64_t DataProcessor::GetErrorMask() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; return ErrorMask; } -void DataProcessor::ResetRunningMask() { - FILE_LOG(logDEBUG) << __AT__ << " called"; - pthread_mutex_lock(&Mutex); - RunningMask = 0x0; - pthread_mutex_unlock(&Mutex); +bool DataProcessor::GetAcquisitionStartedFlag(){ + FILE_LOG (logDEBUG) << __AT__ << " called"; + return acquisitionStartedFlag; +} + + +bool DataProcessor::GetMeasurementStartedFlag(){ + FILE_LOG (logDEBUG) << __AT__ << " called"; + return measurementStartedFlag; } /** non static functions */ - string DataProcessor::GetType(){ return TypeName; } +uint64_t DataProcessor::GetNumTotalFramesCaught() { + FILE_LOG (logDEBUG) << __AT__ << " called"; + return numTotalFramesCaught; +} + + +uint64_t DataProcessor::GetNumFramesCaught() { + FILE_LOG (logDEBUG) << __AT__ << " called"; + return numFramesCaught; +} + + +uint64_t DataProcessor::GetProcessedAcquisitionIndex() { + FILE_LOG (logDEBUG) << __AT__ << " called"; + return currentFrameIndex - firstAcquisitionIndex; +} + bool DataProcessor::IsRunning() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; return ((1 << index) & RunningMask); } void DataProcessor::StartRunning() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; pthread_mutex_lock(&Mutex); RunningMask |= (1<PopAddress(buffer); #ifdef FIFODEBUG cprintf(BLUE,"DataProcessor %d, pop 0x%p buffer:%s\n", index,(void*)(buffer),buffer); #endif + uint32_t numPackets = (uint32_t)(*((uint32_t*)buffer)); - if(!strcmp(buffer,"done")){ + if(numPackets == DUMMY_PACKET_VALUE){ + cprintf(GREEN,"DataProcessing %d: Got dummy value*****\n"); StopRunning(); + fifo->FreeAddress(buffer); + return; } + + uint64_t fnum; uint32_t pnum; uint32_t snum; uint64_t bcid; + GetHeaderInfo(index,buffer+generalData->fifoBufferHeaderSize,16,fnum,pnum,snum,bcid); + cprintf(GREEN,"DataProcessing %d: fnum:%lld, pnum:%d\n",(long long int)fnum, pnum); + fifo->FreeAddress(buffer); } + + +int DataProcessor::CreateNewFile() { + FILE_LOG (logDEBUG) << __AT__ << " called"; + //create file fileWriter.push_back(new BinaryFileWriter(fileName)) + return OK; +} + + + +void DataProcessor::CloseFile() { + FILE_LOG (logDEBUG) << __AT__ << " called"; + +} diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index aecfa227a..b5304ae88 100644 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -20,8 +20,10 @@ uint64_t DataStreamer::RunningMask(0x0); pthread_mutex_t DataStreamer::Mutex = PTHREAD_MUTEX_INITIALIZER; -DataStreamer::DataStreamer() : ThreadObject(NumberofDataStreamers) { - FILE_LOG(logDEBUG) << __AT__ << " called"; +DataStreamer::DataStreamer() : + ThreadObject(NumberofDataStreamers) +{ + FILE_LOG (logDEBUG) << __AT__ << " called"; if(ThreadObject::CreateThread()){ pthread_mutex_lock(&Mutex); @@ -30,12 +32,12 @@ DataStreamer::DataStreamer() : ThreadObject(NumberofDataStreamers) { } NumberofDataStreamers++; - FILE_LOG(logDEBUG) << "Number of DataStreamers: " << NumberofDataStreamers << endl; + FILE_LOG (logDEBUG) << "Number of DataStreamers: " << NumberofDataStreamers << endl; } DataStreamer::~DataStreamer() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; ThreadObject::DestroyThread(); NumberofDataStreamers--; } @@ -44,13 +46,13 @@ DataStreamer::~DataStreamer() { uint64_t DataStreamer::GetErrorMask() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; return ErrorMask; } void DataStreamer::ResetRunningMask() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; pthread_mutex_lock(&Mutex); RunningMask = 0x0; pthread_mutex_unlock(&Mutex); @@ -66,13 +68,13 @@ string DataStreamer::GetType(){ bool DataStreamer::IsRunning() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; return ((1 << index) & RunningMask); } void DataStreamer::StartRunning() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; pthread_mutex_lock(&Mutex); RunningMask |= (1< #include @@ -23,22 +24,32 @@ uint64_t Listener::RunningMask(0x0); pthread_mutex_t Listener::Mutex = PTHREAD_MUTEX_INITIALIZER; -Listener::Listener(Fifo*& f) : ThreadObject(NumberofListeners), fifo(f) { - FILE_LOG(logDEBUG) << __AT__ << " called"; +bool Listener::acquisitionStartedFlag(false); + +bool Listener::measurementStartedFlag(false); + +Listener::Listener(Fifo*& f) : + ThreadObject(NumberofListeners), + fifo(f), + numTotalPacketsCaught(0), + numPacketsCaught(0), + firstAcquisitionIndex(0), + firstMeasurementIndex(0) +{ + FILE_LOG (logDEBUG) << __AT__ << " called"; if(ThreadObject::CreateThread()){ pthread_mutex_lock(&Mutex); ErrorMask ^= (1<getCurrentTotalReceived(); +} + bool Listener::IsRunning() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; return ((1 << index) & RunningMask); } void Listener::StartRunning() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; pthread_mutex_lock(&Mutex); RunningMask |= (1<GetNewAddress(buffer); @@ -104,15 +160,50 @@ void Listener::ThreadExecution() { cprintf(GREEN,"Listener %d, pop 0x%p buffer:%s\n", index,(void*)(buffer),buffer); #endif - strcpy(buffer,"changed"); + int rc; - if(count == 3){ - strcpy(buffer,"done\0"); + while ((rc>0 && rc < generalData->packetSize)) { + rc = udpSocket->ReceiveDataOnly(buffer + generalData->fifoBufferHeaderSize,fifoBufferSize); + cprintf(BLUE,"Listening %d: rc: %d\n",index,rc); + uint64_t fnum; uint32_t pnum; uint32_t snum; uint64_t bcid; + GetHeaderInfo(index,buffer,16,fnum,pnum,snum,bcid); + cprintf(BLUE,"Listening %d: fnum:%lld, pnum:%d\n",(long long int)fnum, pnum); + *((uint32_t*)(buffer[ithread])) = (rc/generalData->packetSize); + } + + if(rc <=0 ){ + cprintf(BLUE,"Listening %d: Gonna send dummy value*****\n"); + (*((uint32_t*)buffer)) = DUMMY_PACKET_VALUE; StopRunning(); } fifo->PushAddress(buffer); - count++; } +int Listener::CreateUDPSockets(uint32_t portnumber, uint32_t packetSize, const char* eth, uint32_t headerPacketSize) { + FILE_LOG (logDEBUG) << __AT__ << " called"; + + udpSocket = new genericSocket(portnumber, genericSocket::UDP, packetSize, eth, headerPacketSize); + int iret = udpSocket->getErrorStatus(); + if(!iret){ + cout << "UDP port opened at port " << portnumber << endl; + }else{ + FILE_LOG(logERROR) << "Could not create UDP socket on port " << portnumber << " error: " << iret; + return FAIL; + } + return OK; +} + + +void Listener::ShutDownUDPSocket() { + FILE_LOG (logDEBUG) << __AT__ << " called"; + + if(udpSocket){ + udpSocket->ShutDownSocket(); + FILE_LOG(logINFO) << "Shut down UDP Socket " << index; + delete udpSocket; + udpSocket = 0; + } +} + diff --git a/slsReceiverSoftware/src/ThreadObject.cpp b/slsReceiverSoftware/src/ThreadObject.cpp index cc2fa1d5a..ac31fd476 100644 --- a/slsReceiverSoftware/src/ThreadObject.cpp +++ b/slsReceiverSoftware/src/ThreadObject.cpp @@ -17,20 +17,20 @@ ThreadObject::ThreadObject(int ind): alive(false), killThread(false), thread(0) { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; PrintMembers(); } ThreadObject::~ThreadObject() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; DestroyThread(); } void ThreadObject::PrintMembers() { - FILE_LOG(logDEBUG) << __AT__ << " called"; - FILE_LOG(logDEBUG) << "Index : " << index + FILE_LOG (logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << "Index : " << index << "\nalive: " << alive << "\nkillThread: " << killThread << "\npthread: " << thread; @@ -38,7 +38,7 @@ void ThreadObject::PrintMembers() { void ThreadObject::DestroyThread() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; if(alive){ killThread = true; sem_post(&semaphore); @@ -46,40 +46,40 @@ void ThreadObject::DestroyThread() { sem_destroy(&semaphore); killThread = false; alive = false; - FILE_LOG(logDEBUG) << GetType() << " thread with index " << index << " destroyed successfully."; + FILE_LOG (logDEBUG) << GetType() << " thread with index " << index << " destroyed successfully."; } } int ThreadObject::CreateThread() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; if(alive){ - FILE_LOG(logERROR) << "Cannot create thread " << index << ". Already alive"; + FILE_LOG (logERROR) << "Cannot create thread " << index << ". Already alive"; return FAIL; } sem_init(&semaphore,1,0); killThread = false; if(pthread_create(&thread, NULL,StartThread, (void*) this)){ - FILE_LOG(logERROR) << "Could not create " << GetType() << " thread with index " << index; + FILE_LOG (logERROR) << "Could not create " << GetType() << " thread with index " << index; return FAIL; } alive = true; - FILE_LOG(logINFO) << GetType() << " thread " << index << " created successfully."; + FILE_LOG (logINFO) << GetType() << " thread " << index << " created successfully."; return OK; } void* ThreadObject::StartThread(void* thisPointer) { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; ((ThreadObject*)thisPointer)->RunningThread(); return thisPointer; } void ThreadObject::RunningThread() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; while(true) { diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index d5e8c3524..b0b55b074 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -39,7 +39,6 @@ void UDPBaseImplementation::initializeMembers(){ //**detector parameters*** myDetectorType = GENERIC; strcpy(detHostname,""); - packetsPerFrame = 0; acquisitionPeriod = 0; acquisitionTime = 0; numberOfFrames = 0; @@ -70,13 +69,6 @@ void UDPBaseImplementation::initializeMembers(){ overwriteEnable = true; dataCompressionEnable = false; - //***acquisition count parameters*** - totalPacketsCaught = 0; - packetsCaught = 0; - - //***acquisition indices parameters*** - acquisitionIndex = 0; - //***acquisition parameters*** shortFrameEnable = -1; frameToGuiFrequency = 0; @@ -156,17 +148,11 @@ bool UDPBaseImplementation::getOverwriteEnable() const{ FILE_LOG(logDEBUG) << __ bool UDPBaseImplementation::getDataCompressionEnable() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return dataCompressionEnable;} /***acquisition count parameters***/ -uint64_t UDPBaseImplementation::getTotalFramesCaught() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return (totalPacketsCaught/packetsPerFrame);} +uint64_t UDPBaseImplementation::getTotalFramesCaught() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return 0;} -uint64_t UDPBaseImplementation::getFramesCaught() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return (packetsCaught/packetsPerFrame);} +uint64_t UDPBaseImplementation::getFramesCaught() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return 0;} -int64_t UDPBaseImplementation::getAcquisitionIndex() const{ - FILE_LOG(logDEBUG) << __AT__ << " starting"; - - if(!totalPacketsCaught) - return -1; - return acquisitionIndex; -} +int64_t UDPBaseImplementation::getAcquisitionIndex() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return -1;} /***connection parameters***/ @@ -469,8 +455,7 @@ void UDPBaseImplementation::initialize(const char *c){ void UDPBaseImplementation::resetAcquisitionCount(){ FILE_LOG(logDEBUG) << __AT__ << " starting"; - totalPacketsCaught = 0; - FILE_LOG(logINFO) << "totalPacketsCaught:" << totalPacketsCaught; + //overriden by resetting of new acquisition parameters } int UDPBaseImplementation::startReceiver(char *c){ @@ -489,12 +474,9 @@ void UDPBaseImplementation::startReadout(){ FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; } -int UDPBaseImplementation::shutDownUDPSockets(){ +void UDPBaseImplementation::shutDownUDPSockets(){ FILE_LOG(logWARNING) << __AT__ << " doing nothing..."; FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; - - //overridden functions might return FAIL - return OK; } void UDPBaseImplementation::readFrame(int ithread, char* c,char** raw, int64_t &startAcquisitionIndex, int64_t &startFrameIndex){ @@ -509,7 +491,7 @@ void UDPBaseImplementation::abort(){ FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; } -void UDPBaseImplementation::closeFile(int ithread){ +void UDPBaseImplementation::closeFiles(){ FILE_LOG(logWARNING) << __AT__ << " doing nothing..."; FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; } diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index aca9e872e..b6c9ded67 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -23,22 +23,22 @@ using namespace std; /** cosntructor & destructor */ UDPStandardImplementation::UDPStandardImplementation() { - FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG (logDEBUG) << __AT__ << " called"; InitializeMembers(); } -UDPStandardImplementation::~UDPStandardImplementation(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; +UDPStandardImplementation::~UDPStandardImplementation() { + FILE_LOG (logDEBUG) << __AT__ << " called"; DeleteMembers(); } void UDPStandardImplementation::DeleteMembers() { - FILE_LOG(logDEBUG) << __AT__ << " starting"; + FILE_LOG (logDEBUG) << __AT__ << " starting"; - if (generalData){ delete generalData; generalData=0;} + if (generalData) { delete generalData; generalData=0;} listener.clear(); dataProcessor.clear(); dataStreamer.clear(); @@ -47,8 +47,8 @@ void UDPStandardImplementation::DeleteMembers() { } -void UDPStandardImplementation::InitializeMembers(){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; +void UDPStandardImplementation::InitializeMembers() { + FILE_LOG (logDEBUG) << __AT__ << " starting"; UDPBaseImplementation::initializeMembers(); acquisitionPeriod = SAMPLE_TIME_IN_NS; @@ -70,90 +70,40 @@ void UDPStandardImplementation::InitializeMembers(){ } -int UDPStandardImplementation::setDetectorType(const detectorType d) { - FILE_LOG(logDEBUG) << __AT__ << " starting"; +/*** Overloaded Functions called by TCP Interface ***/ - numThreads = EIGER_PORTS_PER_READOUT; - numberofJobs = 1; - //killing all threads, deleting members etc. +uint64_t UDPStandardImplementation::getTotalFramesCaught() const { + FILE_LOG (logDEBUG) << __AT__ << " starting"; + uint64_t sum = 0; + vector::const_iterator it; + for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) + sum += (*it)->GetNumTotalFramesCaught(); + return (sum/dataProcessor.size()); +} +uint64_t UDPStandardImplementation::getFramesCaught() const { + FILE_LOG (logDEBUG) << __AT__ << " starting"; + uint64_t sum = 0; + for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) + sum += (*it)->GetNumFramesCaught(); + return (sum/dataProcessor.size()); +} - //generalData = new GotthardData(); - - - - //only at start or changing parameters - for ( int i=0; i < 1; i++ ) {//numthreads; i++ ) { - - bool success = true; - fifo.push_back(new Fifo(1024*256,5, success)); - if (!success) cprintf(RED,"not successful\n"); - - - listener.push_back(new Listener(fifo[i])); - dataProcessor.push_back(new DataProcessor(fifo[i])); - //dataStreamer.push_back(new DataStreamer(fifo[i])); - - - //listener[i]->SetFifo(fifo[i]); - //dataProcessor[i]->SetFifo(fifo[i]); - - fileWriter.push_back(new BinaryFileWriter(fileName)); - - } - - - if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()){ - cprintf(RED, "Error in creating threads\n"); - } - - - //start receiver functions - //create udp sockets - //create file - //reset status - //reset all masks - Listener::ResetRunningMask(); - DataProcessor::ResetRunningMask(); - //DataStreamer::ResetRunningMask(); - - - for( unsigned int i=0; i < listener.size();i++ ) { - listener[i]->StartRunning(); - dataProcessor[i]->StartRunning(); - listener[i]->Continue(); - dataProcessor[i]->Continue(); - } - - - // for (vector::iterator it = listener.begin(); it != listener.end(); ++it) { - //*it->StartRunning(); - - usleep (5 * 1000 * 1000); - - - SetLocalNetworkParameters(); - - return OK; +int64_t UDPStandardImplementation::getAcquisitionIndex() const { + FILE_LOG (logDEBUG) << __AT__ << " starting"; + //no data processed + if(!DataProcessor::GetAcquisitionStartedFlag()) + return -1; + uint64_t sum = 0; + for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) + sum += (*it)->GetProcessedAcquisitionIndex(); + return (sum/dataProcessor.size()); } -uint64_t UDPStandardImplementation::getTotalFramesCaught() const{ - FILE_LOG(logDEBUG) << __AT__ << " starting"; - //preventing divide by 0 using ternary operator - return (totalPacketsCaught/(packetsPerFrame*(listener.size()>0?listener.size():1))); -} - -uint64_t UDPStandardImplementation::getFramesCaught() const{ - FILE_LOG(logDEBUG) << __AT__ << " starting"; - //preventing divide by 0 using ternary operator - return (packetsCaught/(packetsPerFrame*(listener.size()>0?listener.size():1))); -} - - -void UDPStandardImplementation::setFileName(const char c[]){ - FILE_LOG(logDEBUG) << __AT__ << " starting"; +void UDPStandardImplementation::setFileName(const char c[]) { + FILE_LOG (logDEBUG) << __AT__ << " starting"; if (strlen(c)) { strcpy(fileName, c); //automatically update fileName in Filewriter (pointer) @@ -168,12 +118,12 @@ void UDPStandardImplementation::setFileName(const char c[]){ if (detindex == -1) detID = 0; } - FILE_LOG(logINFO) << "File name:" << fileName; + FILE_LOG (logINFO) << "File name:" << fileName; } -int UDPStandardImplementation::setShortFrameEnable(const int i){ - FILE_LOG(logDEBUG) << __AT__ << " called"; +int UDPStandardImplementation::setShortFrameEnable(const int i) { + FILE_LOG (logDEBUG) << __AT__ << " called"; if (myDetectorType != GOTTHARD) { cprintf(RED, "Error: Can not set short frame for this detector\n"); @@ -190,47 +140,47 @@ int UDPStandardImplementation::setShortFrameEnable(const int i){ else generalData = new GotthardData(); numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure - if(SetupFifoStructure() == FAIL) + if (SetupFifoStructure() == FAIL) return FAIL; } - FILE_LOG(logINFO) << "Short Frame Enable: " << shortFrameEnable; + FILE_LOG (logINFO) << "Short Frame Enable: " << shortFrameEnable; return OK; } -int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq){ - FILE_LOG(logDEBUG) << __AT__ << " called"; +int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) { + FILE_LOG (logDEBUG) << __AT__ << " called"; - if (frameToGuiFrequency != freq){ + if (frameToGuiFrequency != freq) { frameToGuiFrequency = freq; //only the ones lisening to more than 1 frame at a time needs to change fifo structure switch (myDetectorType) { case GOTTHARD: case PROPIX: - if(SetupFifoStructure() == FAIL) + if (SetupFifoStructure() == FAIL) return FAIL; break; default: break; } } - FILE_LOG(logINFO) << "Frame to Gui Frequency: " << frameToGuiFrequency; + FILE_LOG (logINFO) << "Frame to Gui Frequency: " << frameToGuiFrequency; return OK; } -int UDPStandardImplementation::setDataStreamEnable(const bool enable){ - FILE_LOG(logDEBUG) << __AT__ << " called"; +int UDPStandardImplementation::setDataStreamEnable(const bool enable) { + FILE_LOG (logDEBUG) << __AT__ << " called"; if (dataStreamEnable != enable) { dataStreamEnable = enable; //data sockets have to be created again as the client ones are - if(dataStreamer.size()) + if (dataStreamer.size()) dataStreamer.clear(); - if(enable){ + if (enable) { for ( int i=0; i < numThreads; ++i ) { dataStreamer.push_back(new DataStreamer()); if (DataStreamer::GetErrorMask()) { @@ -240,13 +190,13 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable){ } } } - FILE_LOG(logINFO) << "Data Send to Gui: " << dataStreamEnable; + FILE_LOG (logINFO) << "Data Send to Gui: " << dataStreamEnable; return OK; } -int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i){ - FILE_LOG(logDEBUG) << __AT__ << " called"; +int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i) { + FILE_LOG (logDEBUG) << __AT__ << " called"; if (acquisitionPeriod != i) { acquisitionPeriod = i; @@ -255,27 +205,322 @@ int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i){ switch (myDetectorType) { case GOTTHARD: case PROPIX: - if(setupFifoStructure() == FAIL) + if (SetupFifoStructure() == FAIL) return FAIL; break; - case EIGER: - if (fileFormatType == BINARY) - for (int i=0; iSetDynamicRange(i,tengigaEnable); + + numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure + if (SetupFifoStructure() == FAIL) + return FAIL; + } + FILE_LOG (logINFO) << "Dynamic Range: " << dynamicRange; + return OK; +} + + +int UDPStandardImplementation::setTenGigaEnable(const bool b) { + FILE_LOG (logDEBUG) << __AT__ << " called"; + + if (tengigaEnable != b) { + tengigaEnable = b; + //side effects + generalData->SetTenGigaEnable(tengigaEnable,b); + + numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure + if (SetupFifoStructure() == FAIL) + return FAIL; + } + FILE_LOG (logINFO) << "Ten Giga: " << stringEnable(tengigaEnable); + return OK; +} + + +int UDPStandardImplementation::setFifoDepth(const uint32_t i) { + FILE_LOG (logDEBUG) << __AT__ << " called"; + + if (fifoDepth != i) { + fifoDepth = i; + + numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure + if (SetupFifoStructure() == FAIL) + return FAIL; + } + FILE_LOG (logINFO) << "Fifo Depth: " << i << endl; return OK; } +int UDPStandardImplementation::setDetectorType(const detectorType d) { + FILE_LOG (logDEBUG) << __AT__ << " starting"; -void UDPStandardImplementation::SetLocalNetworkParameters(){ + FILE_LOG (logDEBUG) << "Setting receiver type"; + + DeleteMembers(); + InitializeMembers(); + myDetectorType = d; + switch(myDetectorType) { + case GOTTHARD: + case PROPIX: + case MOENCH: + case EIGER: + case JUNGFRAUCTB: + case JUNGFRAU: + FILE_LOG (logINFO) << " ***** " << getDetectorType(d) << " Receiver *****"; + break; + default: + FILE_LOG (logERROR) << "This is an unknown receiver type " << (int)d; + return FAIL; + } + + + //set detector specific variables + switch(myDetectorType) { + case GOTTHARD: generalData = new GotthardData(); break; + case PROPIX: generalData = new PropixData(); break; + case MOENCH: generalData = new Moench02Data(); break; + case EIGER: generalData = new EigerData(); break; + case JUNGFRAUCTB: generalData = new JCTBData(); break; + case JUNGFRAU: generalData = new JungfrauData(); break; + default: break; + } + numThreads = generalData->threadsPerReceiver; + + + for ( int i=0; i < numThreads; ++i ) { + + //create fifo structure + numberofJobs = -1; + if (SetupFifoStructure() == FAIL) { + FILE_LOG (logERROR) << "Error: Could not allocate memory for fifo (index:" << i << ")"; + return FAIL; + } + + //create threads + listener.push_back(new Listener(fifo[i])); + dataProcessor.push_back(new DataProcessor(fifo[i])); + if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) { + FILE_LOG (logERROR) << "Error: Could not creates listener/dataprocessor threads (index:" << i << ")"; + return FAIL; + } + } + + //local network parameters + SetLocalNetworkParameters(); + + FILE_LOG (logDEBUG) << " Detector type set to " << getDetectorType(d); + return OK; +} + + + +void UDPStandardImplementation::resetAcquisitionCount() { + FILE_LOG (logDEBUG) << __AT__ << " starting"; + + for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) + (*it)->ResetParametersforNewAcquisition(); + + for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) + (*it)->ResetParametersforNewAcquisition(); + + FILE_LOG (logINFO) << "Acquisition Count has been reset"; +} + + + +int UDPStandardImplementation::startReceiver(char *c) { + FILE_LOG (logDEBUG) << __AT__ << " called"; + + ResetParametersforNewMeasurement(); + + if (CreateUDPSockets() == FAIL) { + strcpy(c,"Could not create UDP Socket(s)."); + FILE_LOG(logERROR) << c; + return FAIL; + } + + if(fileWriteEnable){ + if (SetupWriter() == FAIL) { + strcpy(c,"Could not create file."); + FILE_LOG(logERROR) << c; + return FAIL; + } + } + + //Let Threads continue to be ready for acquisition + StartRunning(); + + FILE_LOG(logINFO) << "Receiver Started"; + FILE_LOG(logINFO) << "Status: " << runStatusType(status); +} + + + +void UDPStandardImplementation::stopReceiver(){ FILE_LOG(logDEBUG) << __AT__ << " called"; + FILE_LOG(logINFO) << "Stopping Receiver"; + + //set status to transmitting + startReadout(); + + //wait until status is run_finished + while(status == TRANSMITTING){ + usleep(5000); + } + +/* //change status + pthread_mutex_lock(&statusMutex); + status = IDLE; + pthread_mutex_unlock(&(statusMutex)); +*/ + FILE_LOG(logINFO) << "Receiver Stopped"; + FILE_LOG(logINFO) << "Status: " << runStatusType(status); + cout << endl << endl; +} + + + +void UDPStandardImplementation::startReadout(){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + if(status == RUNNING){ + + //needs to wait for packets only if activated + if(activated){ + + //current packets caught + int totalP = 0,prev=-1; + for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) + totalP += (*it)->GetTotalPacketsCaught(); + + //current udp buffer received + int currentReceivedInBuffer=0,prevReceivedInBuffer=-1; + for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) + currentReceivedInBuffer += (*it)->GetNumReceivedinUDPBuffer(); + + //wait for all packets + if((unsigned long long int)totalP!=numberOfFrames*generalData->packetsPerFrame*listener.size()){ + + //wait as long as there is change from prev totalP, + //and also change from received in buffer to previous value + //(as one listens to many at a time, shouldnt cut off in between) + while((prev != totalP) || (prevReceivedInBuffer!= currentReceivedInBuffer)){ +#ifdef VERY_VERBOSE + cprintf(MAGENTA,"waiting for all packets prevP:%d totalP:%d PrevBuffer:%d currentBuffer:%d\n",prev,totalP,prevReceivedInBuffer,currentReceivedInBuffer); + +#endif + //usleep(2*1000*1000); + usleep(5*1000);/* Need to find optimal time **/ + + prev = totalP; + totalP = 0; + prevReceivedInBuffer = currentReceivedInBuffer; + currentReceivedInBuffer = 0; + + for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) { + totalP += (*it)->GetTotalPacketsCaught(); + currentReceivedInBuffer += (*it)->GetNumReceivedinUDPBuffer(); + } +#ifdef VERY_VERBOSE + cprintf(MAGENTA,"\tupdated: totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer); + +#endif + } + } + } + + /*//set status + pthread_mutex_lock(&statusMutex); + status = TRANSMITTING; + pthread_mutex_unlock(&statusMutex);*/ + + FILE_LOG(logINFO) << "Status: Transmitting"; + } + + //shut down udp sockets so as to make listeners push dummy (end) packets for processors + shutDownUDPSockets(); +} + + +void UDPStandardImplementation::shutDownUDPSockets() { + FILE_LOG (logDEBUG) << __AT__ << " called"; + for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) + (*it)->ShutDownUDPSocket(); +} + + + +void UDPStandardImplementation::closeFiles() { + FILE_LOG (logDEBUG) << __AT__ << " called"; + for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) + (*it)->CloseFile(); +} + + + + +void UDPStandardImplementation::SetLocalNetworkParameters() { + FILE_LOG (logDEBUG) << __AT__ << " called"; //to increase socket receiver buffer size and max length of input queue by changing kernel settings if (myDetectorType == EIGER) @@ -285,30 +530,30 @@ void UDPStandardImplementation::SetLocalNetworkParameters(){ //to increase Socket Receiver Buffer size sprintf(command,"echo $((%d)) > /proc/sys/net/core/rmem_max",RECEIVE_SOCKET_BUFFER_SIZE); - if (system(command)){ - FILE_LOG(logWARNING) << "No root permission to change Socket Receiver Buffer size (/proc/sys/net/core/rmem_max)"; + if (system(command)) { + FILE_LOG (logWARNING) << "No root permission to change Socket Receiver Buffer size (/proc/sys/net/core/rmem_max)"; return; } - FILE_LOG(logINFO) << "Socket Receiver Buffer size (/proc/sys/net/core/rmem_max) modified to " << RECEIVE_SOCKET_BUFFER_SIZE ; + FILE_LOG (logINFO) << "Socket Receiver Buffer size (/proc/sys/net/core/rmem_max) modified to " << RECEIVE_SOCKET_BUFFER_SIZE ; // to increase Max length of input packet queue sprintf(command,"echo %d > /proc/sys/net/core/netdev_max_backlog",MAX_SOCKET_INPUT_PACKET_QUEUE); - if (system(command)){ - FILE_LOG(logWARNING) << "No root permission to change Max length of input packet queue (/proc/sys/net/core/netdev_max_backlog)"; + if (system(command)) { + FILE_LOG (logWARNING) << "No root permission to change Max length of input packet queue (/proc/sys/net/core/netdev_max_backlog)"; return; } - FILE_LOG(logINFO) << "Max length of input packet queue (/proc/sys/net/core/netdev_max_backlog) modified to " << MAX_SOCKET_INPUT_PACKET_QUEUE ; + FILE_LOG (logINFO) << "Max length of input packet queue (/proc/sys/net/core/netdev_max_backlog) modified to " << MAX_SOCKET_INPUT_PACKET_QUEUE ; } -int UDPStandardImplementation::SetupFifoStructure(){ - FILE_LOG(logDEBUG) << __AT__ << " called"; +int UDPStandardImplementation::SetupFifoStructure() { + FILE_LOG (logDEBUG) << __AT__ << " called"; //recalculate number of jobs & fifodepth, return if no change - if ((myDetectortype == GOTTHARD) || (myDetectortype = PROPIX)) { + if ((myDetectorType == GOTTHARD) || (myDetectorType = PROPIX)) { int oldnumberofjobs = numberofJobs; //listen to only n jobs at a time @@ -316,38 +561,40 @@ int UDPStandardImplementation::SetupFifoStructure(){ numberofJobs = frameToGuiFrequency; else { //random freq depends on acquisition period/time (calculate upto 100ms/period) - i = ((acquisitionPeriod > 0) ? + int i = ((acquisitionPeriod > 0) ? (SAMPLE_TIME_IN_NS/acquisitionPeriod): ((acquisitionTime > 0) ? (SAMPLE_TIME_IN_NS/acquisitionTime) : SAMPLE_TIME_IN_NS)); //must be > 0 and < max jobs numberofJobs = ((i < 1) ? 1 : ((i > MAX_JOBS_PER_THREAD) ? MAX_JOBS_PER_THREAD : i)); } - FILE_LOG(logINFO) << "Number of Jobs Per Thread:" << numberofJobs << endl; - + FILE_LOG (logINFO) << "Number of Jobs Per Thread:" << numberofJobs << endl; uint32_t oldfifodepth = fifoDepth; //reduce fifo depth if numberofJobsPerBuffer > 1 (to save memory) - if(numberofJobsPerBuffer >1){ - fifoDepth = ((fifoDepth % numberofJobsPerBuffer) ? - ((fifoDepth/numberofJobsPerBuffer)+1) : //if not directly divisible - (fifoDepth/numberofJobsPerBuffer)); + if (numberofJobs >1) { + fifoDepth = ((fifoDepth % numberofJobs) ? + ((fifoDepth/numberofJobs)+1) : //if not directly divisible + (fifoDepth/numberofJobs)); } - FILE_LOG(logINFO) << "Total Fifo Size:" << fifoSize; + FILE_LOG (logINFO) << "Total Fifo Depth Recalculated:" << fifoDepth; //no change, return if ((oldnumberofjobs == numberofJobs) && (oldfifodepth == fifoDepth)) return OK; - } + }else + numberofJobs = 1; - //delete fifostructure + //create fifostructure fifo.clear(); for ( int i=0; i < numThreads; i++ ) { //create fifo structure bool success = true; - fifo.push_back( new Fifo ((generalData->fifoBufferSize) * numberofJobs + (generalData->fifoBufferHeaderSize), success)); - if (!success){ + fifo.push_back( new Fifo ( + (generalData->fifoBufferSize) * numberofJobs + (generalData->fifoBufferHeaderSize), + fifoDepth, success)); + if (!success) { cprintf(BG_RED,"Error: Could not allocate memory for listening \n"); return FAIL; } @@ -356,7 +603,82 @@ int UDPStandardImplementation::SetupFifoStructure(){ listener[i]->SetFifo(fifo[i]); dataProcessor[i]->SetFifo(fifo[i]); } - FILE_LOG(logINFO) << "Fifo structure(s) reconstructed"; + FILE_LOG (logINFO) << "Fifo structure(s) reconstructed"; return OK; } + + +void UDPStandardImplementation::ResetParametersforNewMeasurement() { + FILE_LOG (logDEBUG) << __AT__ << " called"; + for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) + (*it)->ResetParametersforNewMeasurement(); + for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) + (*it)->ResetParametersforNewMeasurement(); +} + + + +int UDPStandardImplementation::CreateUDPSockets() { + FILE_LOG (logDEBUG) << __AT__ << " called"; + + shutDownUDPSockets(); + + //if eth is mistaken with ip address + if (strchr(eth,'.') != NULL){ + strcpy(eth,""); + } + if(!strlen(eth)){ + FILE_LOG(logWARNING) << "eth is empty. Listening to all"; + } + bool error = false; + for (unsigned int i = 0; i < listener.size(); ++i) + if (listener[i]->CreateUDPSockets(udpPortNum[i], generalData->packetSize, + (strlen(eth)?eth:NULL), generalData->headerPacketSize) == FAIL) { + error = true; + break; + } + if (error) { + shutDownUDPSockets(); + return FAIL; + } + + FILE_LOG(logDEBUG) << "UDP socket(s) created successfully."; + cout << "Listener Ready ..." << endl; + return OK; +} + + +int UDPStandardImplementation::SetupWriter() { + FILE_LOG (logDEBUG) << __AT__ << " called"; + + bool error = false; + for (unsigned int i = 0; i < dataProcessor.size(); ++i) + if (dataProcessor[i]->CreateNewFile() == FAIL) { + error = true; + break; + } + if (error) { + shutDownUDPSockets(); + closeFiles(); + return FAIL; + } + + cout << "Writer Ready ..." << endl; + return OK; +} + + +void UDPStandardImplementation::StartRunning() { + FILE_LOG (logDEBUG) << __AT__ << " called"; + + //set running mask and post semaphore to start the inner loop in execution thread + for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) { + (*it)->StartRunning(); + (*it)->Continue(); + } + for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){ + (*it)->StartRunning(); + (*it)->Continue(); + } +} diff --git a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp index eef507e93..a1e981c8d 100644 --- a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp +++ b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp @@ -32,9 +32,6 @@ slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface* receiverBase(rbase), ret(OK), lockStatus(0), - shortFrame(-1), - packetsPerFrame(GOTTHARD_PACKETS_PER_FRAME), - dynamicrange(16), killTCPServerThread(0), tenGigaEnable(0), portNumber(DEFAULT_PORTNO+2), @@ -195,7 +192,7 @@ void slsReceiverTCPIPInterface::startTCPServer(){ receiverBase->shutDownUDPSockets(); cout << "Closing Files... " << endl; - receiverBase->closeFile(); + receiverBase->closeFiles(); } mySock->exitServer(); @@ -334,7 +331,7 @@ int slsReceiverTCPIPInterface::M_nofunc(){ void slsReceiverTCPIPInterface::closeFile(int p){ - receiverBase->closeFile(); + receiverBase->closeFiles(); } @@ -1098,11 +1095,6 @@ int slsReceiverTCPIPInterface::set_short_frame() { else{ receiverBase->setShortFrameEnable(index); retval = receiverBase->getShortFrameEnable(); - shortFrame = retval; - if(shortFrame==-1) - packetsPerFrame=GOTTHARD_PACKETS_PER_FRAME; - else - packetsPerFrame=GOTTHARD_SHORT_PACKETS_PER_FRAME; } } #endif @@ -1144,940 +1136,39 @@ int slsReceiverTCPIPInterface::read_frame(){ -int slsReceiverTCPIPInterface::moench_read_frame(){ - ret=OK; - char fName[MAX_STR_LENGTH]=""; - int acquisitionIndex = -1; - int frameIndex= -1; - int i; +int slsReceiverTCPIPInterface::moench_read_frame(){ return FAIL;} - int bufferSize = MOENCH_BUFFER_SIZE; - int rnel = bufferSize/(sizeof(int)); - int* retval = new int[rnel]; - int* origVal = new int[rnel]; - //all initialized to 0 - for(i=0;igetFramesCaught()){ - startAcquisitionIndex = -1; - cout<<"haven't caught any frame yet"<readFrame(0,fName,&raw,startAcquisitionIndex,startFrameIndex); - /**send garbage with -1 index to try again*/ - if (raw == NULL){ - startAcquisitionIndex = -1; -#ifdef VERYVERBOSE - cout<<"data not ready for gui yet"<> MOENCH_FRAME_INDEX_OFFSET); +int slsReceiverTCPIPInterface::propix_read_frame(){ return FAIL;} - uint32_t numPackets = MOENCH_PACKETS_PER_FRAME; //40 - uint32_t onePacketSize = MOENCH_DATA_BYTES / MOENCH_PACKETS_PER_FRAME; //1280*40 / 40 = 1280 - uint32_t packetDatabytes_row = onePacketSize * (MOENCH_BYTES_IN_ONE_ROW / MOENCH_BYTES_PER_ADC); //1280 * 4 = 5120 - uint32_t partsPerFrame = onePacketSize / MOENCH_BYTES_PER_ADC; // 1280 / 80 = 16 - uint32_t packetOffset = 0; - int packetIndex,x,y; - int iPacket = 0; - offset = 4; - while (iPacket < (int)numPackets){ -#ifdef VERYVERBOSE - printf("iPacket:%d\n",iPacket);cout << endl; -#endif - //if missing packets, dont send to gui - bindex = (*((uint32_t*)(((char*)origVal)+packetOffset))); - if (bindex == 0xFFFFFFFF){ - cout << "Missing Packet,Not sending to gui" << endl; - index = startAcquisitionIndex - 1; - break;//use continue and change index above if you want to display missing packets with 0 value anyway in gui - } - packetIndex = bindex & MOENCH_PACKET_INDEX_MASK; - //cout<<"packetIndex:"<= 40) && (packetIndex < 0)) - cout << "cannot decode packet index:" << packetIndex << endl; - else{ - x = packetIndex / 10; - y = packetIndex % 10; -#ifdef VERYVERBOSE - cout<<"x:"<differentClients){ - FILE_LOG(logDEBUG) << "Force update"; - ret=FORCE_UPDATE; - } - // send answer - mySock->SendDataOnly(&ret,sizeof(ret)); - if(ret==FAIL){ - cprintf(RED,"%s\n",mess); - mySock->SendDataOnly(mess,sizeof(mess)); - } - else{ - mySock->SendDataOnly(fName,MAX_STR_LENGTH); - mySock->SendDataOnly(&acquisitionIndex,sizeof(acquisitionIndex)); - mySock->SendDataOnly(&frameIndex,sizeof(frameIndex)); - mySock->SendDataOnly(retval,MOENCH_DATA_BYTES); - } - //return ok/fail - delete [] retval; - delete [] origVal; - delete [] raw; - return ret; - -} - - - - -int slsReceiverTCPIPInterface::gotthard_read_frame(){ - ret=OK; - char fName[MAX_STR_LENGTH]=""; - int acquisitionIndex = -1; - int frameIndex= -1; - int i; - - - //retval is a full frame - int bufferSize = GOTTHARD_BUFFER_SIZE; - int rnel = bufferSize/(sizeof(int)); - int* retval = new int[rnel]; - int* origVal = new int[rnel]; - //all initialized to 0 - for(i=0;igetFramesCaught()){ - startAcquisitionIndex=-1; - cout<<"haven't caught any frame yet"<readFrame(0,fName,&raw,startAcquisitionIndex,startFrameIndex); - - /**send garbage with -1 index to try again*/ - if (raw == NULL){ - startAcquisitionIndex = -1; -#ifdef VERYVERBOSE - cout<<"data not ready for gui yet"<> GOTTHARD_SHORT_FRAME_INDEX_OFFSET); -#ifdef VERYVERBOSE - cout << "index:" << hex << index << endl; -#endif - }else{ - bindex = ((uint32_t)(*((uint32_t*)raw)))+1; - pindex = (bindex & GOTTHARD_PACKET_INDEX_MASK); - index = ((bindex & GOTTHARD_FRAME_INDEX_MASK) >> GOTTHARD_FRAME_INDEX_OFFSET); - bindex2 = ((uint32_t)(*((uint32_t*)((char*)(raw+onebuffersize)))))+1; - pindex2 =(bindex2 & GOTTHARD_PACKET_INDEX_MASK); - index2 =((bindex2 & GOTTHARD_FRAME_INDEX_MASK) >> GOTTHARD_FRAME_INDEX_OFFSET); -#ifdef VERYVERBOSE - cout << "index1:" << hex << index << endl; - cout << "index2:" << hex << index << endl; -#endif - } - - memcpy(origVal,raw + HEADER_SIZE_NUM_TOT_PACKETS,bufferSize); - raw=NULL; - - - //1 adc - if(shortFrame!=-1){ - if(bindex != 0xFFFFFFFF) - memcpy((((char*)retval)+(GOTTHARD_SHORT_DATABYTES*shortFrame)),((char*) origVal)+4, GOTTHARD_SHORT_DATABYTES); - else{ - index = startAcquisitionIndex - 1; - cout << "Missing Packet,Not sending to gui" << endl; - } - } - //all adc - else{ - /*//ignore if half frame is missing - if ((bindex != 0xFFFFFFFF) && (bindex2 != 0xFFFFFFFF)){*/ - - //should be same frame - if (index == index2){ - //ideal situation (should be odd, even(index+1)) - if(!pindex){ - memcpy(retval,((char*) origVal)+4, onedatasize); - memcpy((((char*)retval)+onedatasize), ((char*) origVal)+10+onedatasize, onedatasize); - } - //swap to even,odd - else{ - memcpy((((char*)retval)+onedatasize),((char*) origVal)+4, onedatasize); - memcpy(retval, ((char*) origVal)+10+onedatasize, onedatasize); - index=index2; - } - }else - cout << "different frames caught. frame1:"<< hex << index << ":"<differentClients){ - FILE_LOG(logDEBUG) << "Force update"; - ret=FORCE_UPDATE; - } - - // send answer - mySock->SendDataOnly(&ret,sizeof(ret)); - if(ret==FAIL){ - cprintf(RED,"%s\n",mess); - mySock->SendDataOnly(mess,sizeof(mess)); - } - else{ - mySock->SendDataOnly(fName,MAX_STR_LENGTH); - mySock->SendDataOnly(&acquisitionIndex,sizeof(acquisitionIndex)); - mySock->SendDataOnly(&frameIndex,sizeof(frameIndex)); - mySock->SendDataOnly(retval,GOTTHARD_DATA_BYTES); - } - - delete [] retval; - delete [] origVal; - delete [] raw; - - return ret; -} - - - - - - - -int slsReceiverTCPIPInterface::propix_read_frame(){ - ret=OK; - char fName[MAX_STR_LENGTH]=""; - int acquisitionIndex = -1; - int frameIndex= -1; - int i; - - - //retval is a full frame - int bufferSize = PROPIX_BUFFER_SIZE; - int onebuffersize = bufferSize/PROPIX_PACKETS_PER_FRAME; - int onedatasize = PROPIX_DATA_BYTES; - - char* raw; - int rnel = bufferSize/(sizeof(int)); - int* retval = new int[rnel]; - int* origVal = new int[rnel]; - //all initialized to 0 - for(i=0;igetFramesCaught()){ - startAcquisitionIndex=-1; - cout<<"haven't caught any frame yet"<readFrame(0,fName,&raw,startAcquisitionIndex,startFrameIndex); - - /**send garbage with -1 index to try again*/ - if (raw == NULL){ - startAcquisitionIndex = -1; -#ifdef VERYVERBOSE - cout<<"data not ready for gui yet"<> PROPIX_FRAME_INDEX_OFFSET); - bindex2 = ((uint32_t)(*((uint32_t*)((char*)(raw+onebuffersize)))))+1; - pindex2 =(bindex2 & PROPIX_PACKET_INDEX_MASK); - index2 =((bindex2 & PROPIX_FRAME_INDEX_MASK) >> PROPIX_FRAME_INDEX_OFFSET); -#ifdef VERYVERBOSE - cout << "index1:" << hex << index << endl; - cout << "index2:" << hex << index << endl; -#endif - - memcpy(origVal,raw + HEADER_SIZE_NUM_TOT_PACKETS,bufferSize); - raw=NULL; - - /*//ignore if half frame is missing - if ((bindex != 0xFFFFFFFF) && (bindex2 != 0xFFFFFFFF)){*/ - - //should be same frame - if (index == index2){ - //ideal situation (should be odd, even(index+1)) - if(!pindex){ - memcpy(retval,((char*) origVal)+4, onedatasize); - memcpy((((char*)retval)+onedatasize), ((char*) origVal)+10+onedatasize, onedatasize); - } - //swap to even,odd - else{ - memcpy((((char*)retval)+onedatasize),((char*) origVal)+4, onedatasize); - memcpy(retval, ((char*) origVal)+10+onedatasize, onedatasize); - index=index2; - } - }else - cout << "different frames caught. frame1:"<< hex << index << ":"<differentClients){ - FILE_LOG(logDEBUG) << "Force update"; - ret=FORCE_UPDATE; - } - - // send answer - mySock->SendDataOnly(&ret,sizeof(ret)); - if(ret==FAIL){ - cprintf(RED,"%s\n",mess); - mySock->SendDataOnly(mess,sizeof(mess)); - } - else{ - mySock->SendDataOnly(fName,MAX_STR_LENGTH); - mySock->SendDataOnly(&acquisitionIndex,sizeof(acquisitionIndex)); - mySock->SendDataOnly(&frameIndex,sizeof(frameIndex)); - mySock->SendDataOnly(retval,PROPIX_DATA_BYTES); - } - - delete [] retval; - delete [] origVal; - delete [] raw; - - return ret; -} - - - - - - - - - - -int slsReceiverTCPIPInterface::eiger_read_frame(){ - ret=OK; -/* - char fName[MAX_STR_LENGTH]=""; - int acquisitionIndex = -1; - int frameIndex= -1; - int index=0; - int subframenumber=-1; - - int frameSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE * packetsPerFrame * EIGER_MAX_PORTS; - int dataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE * packetsPerFrame * EIGER_MAX_PORTS; - int oneDataSize = EIGER_ONE_GIGA_ONE_DATA_SIZE; - int onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; - if(tenGigaEnable){ - frameSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE * packetsPerFrame * EIGER_MAX_PORTS; - dataSize = EIGER_TEN_GIGA_ONE_DATA_SIZE * packetsPerFrame * EIGER_MAX_PORTS; - oneDataSize = EIGER_TEN_GIGA_ONE_DATA_SIZE; - onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; - } - char* raw; - char* origVal = new char[frameSize](); - char* retval = new char[dataSize](); - memset(origVal,0xFF,frameSize); - memset(retval,0xFF,dataSize); - - int64_t startAcquisitionIndex=0; - int64_t startFrameIndex=0; - strcpy(mess,"Could not read frame\n"); - - - // execute action if the arguments correctly arrived -#ifdef SLS_RECEIVER_UDP_FUNCTIONS - - if (receiverBase == NULL){ - strcpy(mess,SET_RECEIVER_ERR_MESSAGE); - ret=FAIL; - } - - //send garbage with -1 index to try again - else if(!receiverBase->getFramesCaught()){ - startAcquisitionIndex=-1; -#ifdef VERYVERBOSE - cout<<"haven't caught any frame yet"<readFrame(i,fName,&raw,startAcquisitionIndex,startFrameIndex); - //send garbage with -1 index to try again - if (raw == NULL){ - startAcquisitionIndex = -1; -#ifdef VERYVERBOSE - cout<<"data not ready for gui yet for "<< i << endl; -#endif - raw=NULL; - break; - } - else{ - eiger_packet_footer_t* wbuf_footer; - wbuf_footer = (eiger_packet_footer_t*)(raw + HEADER_SIZE_NUM_TOT_PACKETS + oneDataSize + sizeof(eiger_packet_header_t)); - index =(uint32_t)(*( (uint64_t*) wbuf_footer)); - index += (startFrameIndex-1); - fnum[i] = index; - if(dynamicrange == 32){ - eiger_packet_header_t* wbuf_header; - wbuf_header = (eiger_packet_header_t*) (raw + HEADER_SIZE_NUM_TOT_PACKETS); - subframenumber = *( (uint32_t*) wbuf_header->subFrameNumber); - } -#ifdef VERYVERBOSE - cout << "index:" << dec << index << endl; - cout << "subframenumber:" << dec << subframenumber << endl; -#endif - int numpackets = (uint32_t)(*( (uint32_t*) raw)); - memcpy(((char*)origVal)+(i*onePacketSize*packetsPerFrame),raw + HEADER_SIZE_NUM_TOT_PACKETS,numpackets*onePacketSize); - raw=NULL; - } - } - //proper frame - if(startAcquisitionIndex != -1){ - //cout<<"**** got proper frame ******"<resetGuiPointer(i); - - if(fnum[0]!=fnum[1]) - cprintf(BG_RED,"Fnums differ %d and %d\n",fnum[0],fnum[1]); - - int c1=8;//first port - int c2=(frameSize/2) + 8; //second port - int retindex=0; - int irow,ibytesperpacket; - int linesperpacket = (16*1/dynamicrange);// 16:1 line, 8:2 lines, 4:4 lines, 32: 0.5 - int numbytesperlineperport=(EIGER_PIXELS_IN_ONE_ROW/EIGER_MAX_PORTS)*dynamicrange/8;//16:1024,8:512,4:256,32:2048 - int datapacketlength = EIGER_ONE_GIGA_ONE_DATA_SIZE; - int total_num_bytes = EIGER_ONE_GIGA_ONE_PACKET_SIZE *(EIGER_ONE_GIGA_CONSTANT *dynamicrange)*2; - - if(tenGigaEnable){ - linesperpacket = (16*4/dynamicrange);// 16:4 line, 8:8 lines, 4:16 lines, 32: 2 - datapacketlength = EIGER_TEN_GIGA_ONE_DATA_SIZE; - total_num_bytes = EIGER_TEN_GIGA_ONE_PACKET_SIZE*(EIGER_TEN_GIGA_CONSTANT*dynamicrange)*2; - } - //if 1GbE, one line is split into two packets for 32 bit mode, so its special - else if(dynamicrange == 32){ - numbytesperlineperport = 1024; - linesperpacket = 1; //we repeat this twice anyway for 32 bit - } - - if(!bottom){ - - for(irow=0;irowdifferentClients){ - FILE_LOG(logDEBUG) << "Force update"; - ret=FORCE_UPDATE; - } - - // send answer - mySock->SendDataOnly(&ret,sizeof(ret)); - if(ret==FAIL){ - cprintf(RED,"%s\n",mess); - mySock->SendDataOnly(mess,sizeof(mess)); - } - else{ - mySock->SendDataOnly(fName,MAX_STR_LENGTH); - mySock->SendDataOnly(&acquisitionIndex,sizeof(acquisitionIndex)); - mySock->SendDataOnly(&frameIndex,sizeof(frameIndex)); - mySock->SendDataOnly(&subframenumber,sizeof(subframenumber)); - mySock->SendDataOnly(retval,dataSize); - } - - delete [] retval; - delete [] origVal; - delete [] raw; -*/ - return ret; -} - - - - - - - -int slsReceiverTCPIPInterface::jungfrau_read_frame(){ - ret=OK; - - char fName[MAX_STR_LENGTH]=""; - int acquisitionIndex = -1; - int frameIndex= -1; - int64_t currentIndex=0; - int64_t startAcquisitionIndex=0; - int64_t startFrameIndex=0; - strcpy(mess,"Could not read frame\n"); - - - int frameSize = JFRAU_ONE_PACKET_SIZE * packetsPerFrame; - int dataSize = JFRAU_ONE_DATA_SIZE * packetsPerFrame; - int oneDataSize = JFRAU_ONE_DATA_SIZE; - - char* raw; - char* origVal = new char[frameSize](); - char* retval = new char[dataSize](); - char* blackpacket = new char[oneDataSize](); - - for(int i=0;igetFramesCaught()){ - startAcquisitionIndex=-1; -#ifdef VERYVERBOSE - cout<<"haven't caught any frame yet"<readFrame(0,fName,&raw,startAcquisitionIndex,startFrameIndex); - //send garbage with -1 index to try again - if (raw == NULL){ - startAcquisitionIndex = -1; -#ifdef VERYVERBOSE - cout<<"data not ready for gui yet"<frameNumber))&0xffffff; -#ifdef VERYVERBOSE - cout << "currentIndex:" << dec << currentIndex << endl; -#endif - - int64_t currentPacket = packetsPerFrame-1; - int offsetsrc = 0; - int offsetdest = 0; - int64_t ifnum=-1; - int64_t ipnum=-1; - - while(currentPacket >= 0){ - header = (jfrau_packet_header_t*) (origVal + offsetsrc); - ifnum = (*( (uint32_t*) header->frameNumber))&0xffffff; - ipnum = (*( (uint8_t*) header->packetNumber)); - if(ifnum != currentIndex) { - cout << "current packet " << currentPacket << " Wrong Frame number " << ifnum << ", copying blank packet" << endl; - memcpy(retval+offsetdest,blackpacket,oneDataSize); - offsetdest += oneDataSize; - //no need to increase offsetsrc as all packets will be wrong - currentPacket--; - continue; - } - if(ipnum!= currentPacket){ - cout << "current packet " << currentPacket << " Wrong packet number " << ipnum << ", copying blank packet" << endl; - memcpy(retval+offsetdest,blackpacket,oneDataSize); - offsetdest += oneDataSize; - //no need to increase offsetsrc until we get the right packet - currentPacket--; - continue; - } - offsetsrc+=JFRAU_HEADER_LENGTH; - memcpy(retval+offsetdest,origVal+offsetsrc,oneDataSize); - offsetdest += oneDataSize; - offsetsrc += oneDataSize; - currentPacket--; - } - - - acquisitionIndex = (int)(currentIndex-startAcquisitionIndex); - if(acquisitionIndex == -1) - startFrameIndex = -1; - else - frameIndex = (int)(currentIndex-startFrameIndex); -#ifdef VERY_VERY_DEBUG - cout << "acquisitionIndex calculated is:" << acquisitionIndex << endl; - cout << "frameIndex calculated is:" << frameIndex << endl; - cout << "currentIndex:" << currentIndex << endl; - cout << "startAcquisitionIndex:" << startAcquisitionIndex << endl; - cout << "startFrameIndex:" << startFrameIndex << endl; -#endif - } - } - -#ifdef VERYVERBOSE - if(frameIndex!=-1){ - cout << "fName:" << fName << endl; - cout << "acquisitionIndex:" << acquisitionIndex << endl; - cout << "frameIndex:" << frameIndex << endl; - cout << "startAcquisitionIndex:" << startAcquisitionIndex << endl; - cout << "startFrameIndex:" << startFrameIndex << endl; - } -#endif - - - -#endif - - if(ret==OK && mySock->differentClients){ - FILE_LOG(logDEBUG) << "Force update"; - ret=FORCE_UPDATE; - } - - // send answer - mySock->SendDataOnly(&ret,sizeof(ret)); - if(ret==FAIL){ - cprintf(RED,"%s\n",mess); - mySock->SendDataOnly(mess,sizeof(mess)); - } - else{ - mySock->SendDataOnly(fName,MAX_STR_LENGTH); - mySock->SendDataOnly(&acquisitionIndex,sizeof(acquisitionIndex)); - mySock->SendDataOnly(&frameIndex,sizeof(frameIndex)); - mySock->SendDataOnly(retval,dataSize); - } - - delete [] retval; - delete [] origVal; - delete [] raw; - - return ret; -} +int slsReceiverTCPIPInterface::jungfrau_read_frame(){ return FAIL;} @@ -2703,16 +1794,6 @@ int slsReceiverTCPIPInterface::set_dynamic_range() { retval = receiverBase->getDynamicRange(); if(dr > 0 && retval != dr) ret = FAIL; - else{ - dynamicrange = retval; - if(myDetectorType == EIGER){ - if(!tenGigaEnable) - packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicrange; - else - packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicrange; - }else if (myDetectorType == JUNGFRAU) - packetsPerFrame = JFRAU_PACKETS_PER_FRAME; - } } } }