some more changes

This commit is contained in:
Dhanya Maliakal
2015-10-08 12:19:07 +02:00
parent 92b73e0e88
commit a3e88f96d6
10 changed files with 1436 additions and 1350 deletions

View File

@ -6,25 +6,22 @@
* @short does all the functions for a receiver, set/get parameters, start/stop etc.
***********************************************/
#include "UDPBaseImplementation.h"
#include "sls_receiver_defs.h"
#include "receiver_defs.h"
//#include "sls_receiver_defs.h"
//#include "receiver_defs.h"
#include "genericSocket.h"
#include "circularFifo.h"
#include "singlePhotonDetector.h"
#include "slsReceiverData.h"
#include "moenchCommonMode.h"
//#include "UDPInterface.h"
#include "UDPBaseImplementation.h"
#ifdef MYROOT1
#include <TTree.h>
#include <TFile.h>
#endif
#include <string.h>
#include <pthread.h>
#include <stdio.h>
@ -38,7 +35,13 @@
class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBaseImplementation {
public:
/**
/*************************************************************************
* Constructor & Destructor **********************************************
* They access local cache of configuration or detector parameters *******
*************************************************************************/
/**
* Constructor
*/
UDPStandardImplementation();
@ -48,17 +51,382 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
*/
virtual ~UDPStandardImplementation();
/*************************************************************************
* Getters ***************************************************************
* They access local cache of configuration or detector parameters *******
*************************************************************************/
//***acquisition count parameters***
/*************************************************************************
* Setters ***************************************************************
* They modify the local cache of configuration or detector parameters ***
*************************************************************************/
//**initial parameters***
/**
* Overridden method
* Configure command line parameters
* @param config_map mapping of config parameters passed from command line arguments
*/
void configure(map<string, string> config_map);
//*** file parameters***
/**
* delete and free member parameters
* Overridden method
* Set data compression, by saving only hits (so far implemented only for Moench and Gotthard)
* @param b true for data compression enable, else false
*/
void deleteMembers();
void setDataCompressionEnable(const bool b);
//***acquisition parameters***
/**
* Overridden method
* Set Short Frame Enabled, later will be moved to getROI (so far only for gotthard)
* @param i index of adc enabled, else -1 if all enabled
*/
void setShortFrameEnable(const int i);
/**
* initialize member parameters
* Overridden method
* Set the Frequency of Frames Sent to GUI
* @param i 0 for random frame requests, n for nth frame frequency
* @return OK or FAIL
*/
int setFrameToGuiFrequency(const uint32_t i);
/**
* Overridden method
* Set Acquisition Period
* @param i acquisition period
* @return OK or FAIL
*/
int setAcquisitionPeriod(const uint64_t i);
/**
* Overridden method
* Set Dynamic Range or Number of Bits Per Pixel
* @param i dynamic range that is 4, 8, 16 or 32
* @return OK or FAIL
*/
int setDynamicRange(const uint32_t i);
/**
* Overridden method
* Set Ten Giga Enable
* @param b true if 10Giga enabled, else false (1G enabled)
* @return OK or FAIL
*/
int setTenGigaEnable(const bool b);
/*************************************************************************
* Behavioral functions***************************************************
* They may modify the status of the receiver ****************************
*************************************************************************/
//***initial functions***
/**
* Overridden method
* Set receiver type (and corresponding detector variables in derived STANDARD class)
* It is the first function called by the client when connecting to receiver
* @param d detector type
* @return OK or FAIL
*/
int setDetectorType(const slsReceiverDefs::detectorType d);
//***acquisition functions***
/**
* Overridden method
* Reset acquisition parameters such as total frames caught for an entire acquisition (including all scans)
*/
void resetAcquisitionCount();
/**
* Overridden method
* Start Listening for Packets by activating all configuration settings to receiver
* @param c error message if FAIL
* @return OK or FAIL
*/
int startReceiver(char *c=NULL);
private:
/*************************************************************************
* Setters ***************************************************************
* They modify the local cache of configuration or detector parameters ***
*************************************************************************/
//**initial parameters***
/**
* Delete and free base member parameters
*/
void deleteBaseMembers();
/**
* Delete and free member parameters
*/
void deleteMembers();
/**
* Deletes all the filter objects for single photon data
* Deals with data compression
*/
void deleteFilter();
/**
* Initialize base member parameters
*/
void initializeBaseMembers();
/**
* Initialize member parameters
*/
void initializeMembers();
/**
* Sets up all the filter objects for single photon data
* Deals with data compression
*/
void initializeFilter();
/**
* Create Listening Threads
* @param destroy is true to destroy all the threads
*/
int createListeningThreads(bool destroy = false);
/**
* Create Writer Threads
* @param destroy is true to destroy all the threads
* @return OK or FAIL
*/
int createWriterThreads(bool destroy = false);
/**
* Set Thread Priorities
*/
void setThreadPriorities();
/**
* Set up the Fifo Structure for processing buffers
* between listening and writer threads
* @return OK or FAIL
*/
int setupFifoStructure();
/**
* Creates UDP Sockets
* @return OK or FAIL
*/
int createUDPSockets();
//**detector parameters***
/**
* structure of an eiger packet header
* subframenum subframe number for 32 bit mode (already written by firmware)
* missingpacket explicitly put to 0xFF to recognize it in file read (written by software)
* portnum 0 for the first port and 1 for the second port (written by software to file)
* dynamicrange dynamic range or bits per pixel (written by software to file)
*/
typedef struct {
unsigned char subFameNumber[4];
unsigned char missingPacket[2];
unsigned char portIndex[1];
unsigned char dynamicRange[1];
} eiger_packet_header_t;
/**
* structure of an eiger packet footer
* framenum 48 bit frame number (already written by firmware)
* packetnum packet number (already written by firmware)
*/
typedef struct {
unsigned char frameNumber[6];
unsigned char packetNumber[2];
} eiger_packet_footer_t;
/** Size of 1 Frame including headers */
int frameSize;
/** Size of 1 buffer processed at a time */
int bufferSize;
/** One Packet Size including headers */
int onePacketSize;
/** One Packet Size without headers */
int oneDataSize;
/** Frame Index Mask */
uint64_t frameIndexMask;
/** Frame Index Offset */
int frameIndexOffset;
/** Packet Index Mask */
uint64_t packetIndexMask;
/** Footer offset from start of Packet*/
int footerOffset;
//***File parameters***
/** Maximum Packets Per File **/
int maxPacketsPerFile;
//***acquisition indices parameters***
/** Frame Number of First Frame of an Acquisition */
uint64_t startAcquisitionIndex;
/** Frame index at start of each real time acquisition (eg. for each scan) */
uint64_t startFrameIndex;
/** Current Frame Number */
uint64_t currentFrameNumber;
/* Acquisition started */
bool acqStarted;
/* Measurement started */
bool measurementStarted;
/** Total Frame Count listened to by listening threads */
int totalListeningFrameCount[MAX_NUMBER_OF_LISTENING_THREADS];
//***receiver parameters***
/** Receiver Buffer */
char *buffer[MAX_NUMBER_OF_LISTENING_THREADS];
/** Memory allocated */
char *mem0[MAX_NUMBER_OF_LISTENING_THREADS];
/** Circular fifo to point to addresses of data listened to */
CircularFifo<char>* fifo[MAX_NUMBER_OF_LISTENING_THREADS];
/** Circular fifo to point to address already written and freed, to be reused */
CircularFifo<char>* fifoFree[MAX_NUMBER_OF_LISTENING_THREADS];
/** Number of Jobs Per Buffer */
int numberofJobsPerBuffer;
/** Fifo Depth */
uint32_t fifoSize;
/** Current Frame copied for Gui */
char* latestData;
//***general and listening thread parameters***
/** Ensures if threads created successfully */
bool threadStarted;
/** Number of Listening Threads */
int numberofListeningThreads;
/** Listening Threads */
pthread_t listeningThreads[MAX_NUMBER_OF_LISTENING_THREADS];
/** Semaphores Synchronizing Listening Threads */
sem_t listenSemaphore[MAX_NUMBER_OF_LISTENING_THREADS];
/** Current Listening Thread Index*/
int currentListeningThreadIndex;
/** Mask with each bit indicating status of each listening thread */
volatile uint32_t listeningThreadsMask;
/** Set to self-terminate listening threads waiting for semaphores */
bool killAllListeningThreads;
//***writer thread parameters***
/** Maximum Number of Writer Threads */
const static int MAX_NUMBER_OF_WRITER_THREADS = 15;
/** Number of Writer Threads */
int numberofWriterThreads;
/** Writer Threads */
pthread_t writingThreads[MAX_NUMBER_OF_WRITER_THREADS];
/** Semaphores Synchronizing Writer Threads */
sem_t writerSemaphore[MAX_NUMBER_OF_WRITER_THREADS];
/** Current Writer Thread Index*/
int currentWriterThreadIndex;
/** Mask with each bit indicating status of each writer thread */
volatile uint32_t writerThreadsMask;
/** Mask with each bit indicating file created for each writer thread*/
volatile uint32_t createFileMask;
/** Set to self-terminate writer threads waiting for semaphores */
bool killAllWritingThreads;
//***filter parameters***
/** Common Mode Subtraction Enable FIXME: Always false, only moench uses, Ask Anna */
bool commonModeSubtractionEnable;
/** Moench Common Mode Subtraction */
moenchCommonMode *moenchCommonModeSubtraction;
/** Single Photon Detector Object for each writer thread */
singlePhotonDetector<uint16_t> *singlePhotonDetectorObject[MAX_NUMBER_OF_WRITER_THREADS];
/** Receiver Data Object for each writer thread */
slsReceiverData<uint16_t> *receiverData[MAX_NUMBER_OF_WRITER_THREADS];
//***mutex***
/** mutex for status */
pthread_mutex_t status_mutex;
/**
* Set receiver type
@ -74,26 +442,11 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
*/
//uint32_t getStartAcquisitionIndex();
/**
* Returns current Frame Index Caught for an entire acquisition (including all scans)
*/
//uint32_t getAcquisitionIndex();
/**
* Returns if acquisition started
*/
//bool getAcquistionStarted();
/**
* Returns Frames Caught for each real time acquisition (eg. for each scan)
*/
//int getFramesCaught();
/**
* Returns Total Frames Caught for an entire acquisition (including all scans)
*/
//int getTotalFramesCaught();
/**
* Returns the frame index at start of each real time acquisition (eg. for each scan)
*/
@ -117,71 +470,8 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
//void resetTotalFramesCaught();
//file parameters
/**
* Returns File Path
*/
//char* getFilePath() const;
/**
* Set File Path
* @param c file path
*/
//char* setFilePath(const char c[]);
/**
* Returns File Name
*/
//char* getFileName() const;
/**
* Set File Name (without frame index, file index and extension)
* @param c file name
*/
//char* setFileName(const char c[]);
/**
* Returns File Index
*/
//int getFileIndex();
/**
* Set File Index
* @param i file index
*/
//int setFileIndex(int i);
/**
* Set Frame Index Needed
* @param i frame index needed
*/
//int setFrameIndexNeeded(int i);
/**
* Set enable file write
* @param i file write enable
* Returns file write enable
*/
//int setEnableFileWrite(int i);
/**
* Enable/disable overwrite
* @param i enable
* Returns enable over write
*/
//int setEnableOverwrite(int i);
/**
* Returns file write enable
* 1: YES 0: NO
*/
//int getEnableFileWrite() const;
/**
* Returns file over write enable
* 1: YES 0: NO
*/
//int getEnableOverwrite() const;
//other parameters
@ -202,83 +492,7 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
*/
void setDetectorHostname(const char *detectorHostName);
/* Returns detector hostname
/returns hostname
* caller needs to deallocate the returned char array.
* if uninitialized, it must return NULL
*/
char *getDetectorHostname() const;
/**
* Set Ethernet Interface or IP to listen to
*/
void setEthernetInterface(char* c);
/**
* Set UDP Port Number
*/
void setUDPPortNo(int p);
/**
* Set UDP Port Number
*/
void setUDPPortNo2(int p);
/*
* Returns number of frames to receive
* This is the number of frames to expect to receiver from the detector.
* The data receiver will change from running to idle when it got this number of frames
*/
int getNumberOfFrames() const;
/**
* set frame number if a positive number
*/
int32_t setNumberOfFrames(int32_t fnum);
/**
* Returns scan tag
*/
int getScanTag() const;
/**
* set scan tag if its is a positive number
*/
int32_t setScanTag(int32_t stag);
/**
* Returns the number of bits per pixel
*/
int getDynamicRange() const;
/**
* set dynamic range if its is a positive number
*/
int32_t setDynamicRange(int32_t dr);
/**
* Set short frame
* @param i if shortframe i=1
*/
int setShortFrame(int i);
/**
* Set the variable to send every nth frame to gui
* or if 0,send frame only upon gui request
*/
int setNFrameToGui(int i);
/** set acquisition period if a positive number
*/
int64_t setAcquisitionPeriod(int64_t index);
/** get data compression, by saving only hits
*/
bool getDataCompression();
/** enabl data compression, by saving only hits
/returns if failed
*/
int enableDataCompression(bool enable);
/**
* enable 10Gbe
@ -337,20 +551,9 @@ private:
std::cout << "[WARNING] Method " << method_name << " not implemented!" << std::endl;
};
*/
/**
* Deletes all the filter objects for single photon data
*/
void deleteFilter();
/**
* Constructs the filter for single photon data
*/
void setupFilter();
/**
* set up fifo according to the new numjobsperthread
*/
void setupFifoStructure ();
/**
* Copy frames to gui
@ -358,28 +561,6 @@ private:
*/
void copyFrameToGui(char* startbuf[], char* buf=NULL);
/**
* creates udp sockets
* \returns if success or fail
*/
int createUDPSockets();
/**
* create listening thread
* @param destroy is true to kill all threads and start again
*/
int createListeningThreads(bool destroy = false);
/**
* create writer threads
* @param destroy is true to kill all threads and start again
*/
int createWriterThreads(bool destroy = false);
/**
* set thread priorities
*/
void setThreadPriorities();
/**
* initializes variables and creates the first file
@ -507,8 +688,7 @@ private:
/** max number of writer threads */
const static int MAX_NUM_WRITER_THREADS = 15;
/** missing packet identifier value */
const static uint16_t missingPacketValue = 0xFFFF;
@ -517,21 +697,9 @@ private:
/** UDP Socket between Receiver and Detector */
genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS];
/** max packets per file **/
int maxPacketsPerFile;
/** Frame Index at start of an entire acquisition (including all scans) */
uint64_t startAcquisitionIndex;
/** Complete File name */
char savefilename[MAX_STR_LENGTH];
/* Measurement started */
bool measurementStarted;
/* Acquisition started */
bool acqStarted;
/** Frame index at start of each real time acquisition (eg. for each scan) */
uint32_t startFrameIndex;
/** Actual current frame index of each time acquisition (eg. for each scan) */
uint32_t frameIndex;
@ -547,34 +715,9 @@ private:
/** Number of missing packets per buffer*/
uint32_t numMissingPackets;
/** frame index mask */
uint32_t frameIndexMask;
/** packet index mask */
uint32_t packetIndexMask;
/** frame index offset */
int frameIndexOffset;
/** Current Frame Number */
uint64_t currframenum;
/** Previous Frame number from buffer */
int prevframenum;
/** size of one frame */
int frameSize;
/** buffer size. different from framesize as we wait for one packet instead of frame for eiger */
int bufferSize;
/** one buffer size */
int onePacketSize;
/** one buffer size */
int oneDataSize;
/** latest data */
char* latestData;
/** gui data ready */
int guiDataReady;
@ -585,71 +728,9 @@ private:
/** points to the filename to send to gui */
char* guiFileName;
/** fifo size */
unsigned int fifosize;
/** number of jobs per thread for data compression */
int numJobsPerThread;
/** memory allocated for the buffer */
char *mem0[MAX_NUM_LISTENING_THREADS];
/** circular fifo to store addresses of data read */
CircularFifo<char>* fifo[MAX_NUM_LISTENING_THREADS];
/** circular fifo to store addresses of data already written and ready to be resued*/
CircularFifo<char>* fifoFree[MAX_NUM_LISTENING_THREADS];
/** Receiver buffer */
char *buffer[MAX_NUM_LISTENING_THREADS];
/** number of writer threads */
int numListeningThreads;
/** number of writer threads */
int numWriterThreads;
/** to know if listening and writer threads created properly */
int thread_started;
/** current listening thread index*/
int currentListeningThreadIndex;
/** current writer thread index*/
int currentWriterThreadIndex;
/** thread listening to packets */
pthread_t listening_thread[MAX_NUM_LISTENING_THREADS];
/** thread writing packets */
pthread_t writing_thread[MAX_NUM_WRITER_THREADS];
/** total frame count the listening thread has listened to */
int totalListeningFrameCount[MAX_NUM_LISTENING_THREADS];
/** mask showing which listening threads are running */
volatile uint32_t listeningthreads_mask;
/** mask showing which writer threads are running */
volatile uint32_t writerthreads_mask;
/** mask showing which threads have created files*/
volatile uint32_t createfile_mask;
/** OK if file created was successful */
/** OK if file created was successful */
int ret_createfile;
/** variable used to self terminate threads waiting for semaphores */
int killAllListeningThreads;
/** variable used to self terminate threads waiting for semaphores */
int killAllWritingThreads;
/** footer offset is different for 1g and 10g*/
int footer_offset;
// TODO: not properly sure where to put these...
/** structure of an eiger image header*/
@ -659,19 +740,11 @@ private:
//semaphores
/** semaphore to synchronize writer and guireader threads */
sem_t smp;
/** semaphore to synchronize listener threads */
sem_t listensmp[MAX_NUM_LISTENING_THREADS];
/** semaphore to synchronize writer threads */
sem_t writersmp[MAX_NUM_WRITER_THREADS];
//mutex
/** guiDataReady mutex */
pthread_mutex_t dataReadyMutex;
/** mutex for status */
pthread_mutex_t status_mutex;
/** mutex for progress variable currframenum */
pthread_mutex_t progress_mutex;
@ -682,10 +755,7 @@ private:
FILE *sfilefd;
//filter
singlePhotonDetector<uint16_t> *singlePhotonDet[MAX_NUM_WRITER_THREADS];
slsReceiverData<uint16_t> *receiverdata[MAX_NUM_WRITER_THREADS];
moenchCommonMode *cmSub;
bool commonModeSubtractionEnable;
#ifdef MYROOT1
/** Tree where the hits are stored */