Complete rewrite of receiver, with new changes

This commit is contained in:
Dhanya Maliakal 2017-01-27 15:40:36 +01:00
parent 70a7d83175
commit 01d54a7a4c
32 changed files with 6986 additions and 4797 deletions

View File

@ -21,7 +21,10 @@ LIBZMQ = -L$(LIBZMQDIR) -Wl,-rpath=$(LIBZMQDIR) -lzmq
#-Iinclude -I../slsDetectorCalibration -I$(ASM)
SRC_CLNT = MySocketTCP.cpp UDPInterface.cpp UDPBaseImplementation.cpp UDPStandardImplementation.cpp slsReceiverTCPIPInterface.cpp slsReceiver.cpp slsReceiverUsers.cpp utilities.cpp
SRC_CLNT = MySocketTCP.cpp ThreadObject.cpp Listener.cpp DataProcessor.cpp DataStreamer.cpp Fifo.cpp FileWriter.cpp BinaryFileWriter.cpp UDPInterface.cpp UDPBaseImplementation.cpp UDPStandardImplementation.cpp slsReceiverTCPIPInterface.cpp slsReceiver.cpp slsReceiverUsers.cpp utilities.cpp
ifeq ($(HDF5),yes)
SRC_CLNT += HDF5FileWriter.cpp
endif
ifeq ($(REST), yes)
SRC_CLNT += UDPRESTImplementation.cpp

View File

@ -0,0 +1,34 @@
/************************************************
* @file BinaryFileWriter.h
* @short sets/gets properties for the binary file,
* creates/closes the file and writes data to it
***********************************************/
#ifndef BINARY_FILE_WRITER_H
#define BINARY_FILE_WRITER_H
#include "FileWriter.h"
/**
*@short sets/gets properties for the binary file, creates/closes the file and writes data to it
*/
class BinaryFileWriter : private virtual slsReceiverDefs, public FileWriter {
public:
/**
* Constructor
* creates the File Writer
* @fname pointer to file name prefix
*/
BinaryFileWriter(char* fname);
/**
* Destructor
*/
~BinaryFileWriter();
private:
};
#endif

View File

@ -0,0 +1,104 @@
/************************************************
* @file DataProcessor.h
* @short creates data processor thread that
* pulls pointers to memory addresses from fifos
* and processes data stored in them & writes them to file
***********************************************/
#ifndef DATAPROCESSOR_H
#define DATAPROCESSOR_H
/**
*@short creates & manages a data processor thread each
*/
#include "ThreadObject.h"
class Fifo;
class DataProcessor : private virtual slsReceiverDefs, public ThreadObject {
public:
/**
* Constructor
* Calls Base Class CreateThread(), sets ErrorMask if error and increments NumberofDataProcessors
* @param f address of Fifo pointer
*/
DataProcessor(Fifo*& f);
/**
* Destructor
* Calls Base Class DestroyThread() and decrements NumberofDataProcessors
*/
~DataProcessor();
/**
* Get RunningMask
* @return RunningMask
*/
static uint64_t GetErrorMask();
/**
* Reset RunningMask
*/
static void ResetRunningMask();
/**
* Set bit in RunningMask to allow thread to run
*/
void StartRunning();
/**
* Reset bit in RunningMask to prevent thread from running
*/
void StopRunning();
/**
* Set Fifo pointer to the one given
* @param f address of Fifo pointer
*/
void SetFifo(Fifo*& f);
private:
/**
* Get Type
* @return type
*/
std::string GetType();
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
bool IsRunning();
/**
* Thread Exeution for DataProcessor Class
* Pop bound addresses, process them,
* write to file if needed & free the address
*/
void ThreadExecution();
/** type of thread */
static const std::string TypeName;
/** Total Number of DataProcessor Objects */
static int NumberofDataProcessors;
/** Mask of errors on any object eg.thread creation */
static uint64_t ErrorMask;
/** Mask of all listener objects running */
static uint64_t RunningMask;
/** mutex to update static items among objects (threads)*/
static pthread_mutex_t Mutex;
/** Fifo structure */
Fifo* fifo;
};
#endif

View File

@ -0,0 +1,90 @@
/************************************************
* @file DataStreamer.h
* @short streams data from receiver via ZMQ
***********************************************/
#ifndef DATASTREAMER_H
#define DATASTREAMER_H
/**
*@short creates & manages a data streamer thread each
*/
#include "ThreadObject.h"
class DataStreamer : private virtual slsReceiverDefs, public ThreadObject {
public:
/**
* Constructor
* Calls Base Class CreateThread(), sets ErrorMask if error and increments NumberofDataStreamers
*/
DataStreamer();
/**
* Destructor
* Calls Base Class DestroyThread() and decrements NumberofDataStreamers
*/
~DataStreamer();
/**
* Get RunningMask
* @return RunningMask
*/
static uint64_t GetErrorMask();
/**
* Reset RunningMask
*/
static void ResetRunningMask();
/**
* Set bit in RunningMask to allow thread to run
*/
void StartRunning();
/**
* Reset bit in RunningMask to prevent thread from running
*/
void StopRunning();
private:
/**
* Get Type
* @return type
*/
std::string GetType();
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
bool IsRunning();
/**
* Thread Exeution for DataStreamer Class
* Stream an image via zmq
*/
void ThreadExecution();
/** type of thread */
static const std::string TypeName;
/** Total Number of DataStreamer Objects */
static int NumberofDataStreamers;
/** Mask of errors on any object eg.thread creation */
static uint64_t ErrorMask;
/** Mask of all listener objects running */
static uint64_t RunningMask;
/** mutex to update static items among objects (threads)*/
static pthread_mutex_t Mutex;
};
#endif

View File

@ -0,0 +1,88 @@
/************************************************
* @file Fifo.h
* @short constructs the fifo structure
* which is a circular buffer with pointers to
* parts of allocated memory
***********************************************/
#ifndef FIFO_H
#define FIFO_H
/**
*@short constructs the fifo structure
*/
#include "sls_receiver_defs.h"
#include "logger.h"
#include "circularFifo.h"
class Fifo : private virtual slsReceiverDefs {
public:
/**
* Constructor
* Calls CreateFifos that creates fifos and allocates memory
* @param fifoItemSize size of each fifo item
* @param fifoDepth fifo depth
* @param success true if successful, else false
*/
Fifo(uint32_t fifoItemSize, uint32_t fifoDepth, bool &success);
/**
* Destructor
*/
~Fifo();
/**
* Pops free address from fifoFree
*/
void GetNewAddress(char*& address);
/**
* Frees the bound address by pushing into fifoFree
*/
void FreeAddress(char*& address);
/**
* Pushes bound address into fifoBound
*/
void PushAddress(char*& address);
/**
* Pops bound address from fifoBound to process data
*/
void PopAddress(char*& address);
private:
/**
* Create Fifos, allocate memory & push addresses into fifo
* @param fifoItemSize size of each fifo item
* @param fifoDepth fifo depth
* @return OK if successful, else FAIL
*/
int CreateFifos(uint32_t fifoItemSize, uint32_t fifoDepth);
/**
* Destroy Fifos and deallocate memory
*/
void DestroyFifos();
/** Total Number of Fifo Class Objects */
static int NumberofFifoClassObjects;
/** Self Index */
int index;
/** Memory allocated, whose addresses are pushed into the fifos */
char* memory;
/** Circular Fifo pointing to addresses of bound data in memory */
CircularFifo<char>* fifoBound;
/** Circular Fifo pointing to addresses of freed data in memory */
CircularFifo<char>* fifoFree;
};
#endif

View File

@ -0,0 +1,32 @@
/********************************************//**
* @file FileWriter.h
* @short sets/gets properties for the file, creates/closes the file and writes data to it
***********************************************/
#ifndef FILE_WRITER_H
#define FILE_WRITER_H
/**
*@short sets/gets properties for the file, creates/closes the file and writes data to it
*/
class FileWriter : private virtual slsReceiverDefs {
public:
/**
* Constructor
* creates the File Writer
*/
FileWriter();
/**
* Destructor
*/
~FileWriter();
private:
};
#endif

View File

@ -0,0 +1,45 @@
/************************************************
* @file FileWriter.h
* @short sets/gets properties for the file,
* creates/closes the file and writes data to it
***********************************************/
#ifndef FILE_WRITER_H
#define FILE_WRITER_H
/**
*@short sets/gets properties for the file, creates/closes the file and writes data to it
*/
#include "sls_receiver_defs.h"
#include "logger.h"
class FileWriter : private virtual slsReceiverDefs {
public:
/**
* Constructor
* creates the File Writer
* @fname pointer to file name prefix
*/
FileWriter(char* fname);
/**
* Destructor
*/
~FileWriter();
/**
* Get File Name prefix
* @returns file name prefix
*/
char* GetFileName();
protected:
char* fileName;
};
#endif

View File

@ -0,0 +1,364 @@
/************************************************
* @file GeneralData.h
* @short abstract for setting/getting properties of detector data
***********************************************/
#ifndef GENERAL_DATA_H
#define GENERAL_DATA_H
/**
*@short abstract for setting/getting properties of detector data
*/
typedef double double32_t;
typedef float float32_t;
typedef int int32_t;
class GeneralData {
public:
/** Number of Pixels in x axis */
uint32_t nPixelsX;
/** Number of Pixels in y axis */
uint32_t nPixelsY;
/** Size of just data in 1 packet (in bytes) */
uint32_t dataSize;
/** Size of 1 packet (in bytes) */
uint32_t packetSize;
/** Number of packets in an image (for each listening UDP port) */
uint32_t packetsPerFrame;
/** Image size (in bytes, for each listening UDP port) */
uint32_t imageSize;
/** Frame Number Mask */
uint64_t frameIndexMask;
/** Frame Index Offset */
uint32_t frameIndexOffset;
/** Packet Index Mask */
uint32_t packetIndexMask;
/** Packet Index Offset */
uint32_t packetIndexOffset;
/** Max Frames per binary file */
uint32_t maxFramesPerFile;
/** Data size that is saved into the fifo buffer at a time*/
uint32_t fifoBufferSize;
/** Header size of data saved into fifo buffer at a time*/
uint32_t fifoBufferHeaderSize;
/** Default Fifo depth */
uint32_t defaultFifoDepth;
/** Cosntructor */
GeneralData(){};
/** Destructor */
virtual ~GeneralData(){};
/**
* Get Header Infomation (frame number, packet number)
* @param index thread index for debugging purposes
* @param packetData pointer to data
* @param dynamicRange dynamic range to assign subframenumber if 32 bit mode
* @param frameNumber frame number
* @param packetNumber packet number
* @param subFrameNumber sub frame number if applicable
* @param bunchId bunch id
*/
void GetHeaderInfo(int index, char* packetData, uint32_t dynamicRange,
uint64_t& frameNumber, uint32_t& packetNumber, uint32_t& subFrameNumber, uint64_t bunchId) {
subFrameNumber = 0;
bunchId = 0;
frameNumber = ((uint32_t)(*((uint32_t*)(packetData))));
frameNumber++;
packetNumber = frameNumber&packetIndexMask;
frameNumber = (frameNumber & frameIndexMask) >> frameIndexOffset;
}
};
class GotthardData : public GeneralData {
public:
/** Constructor */
GotthardData(){
nPixelsX = 1280;
nPixelsY = 1;
dataSize = 1280;
packetSize = 1286;
packetsPerFrame = 2;
imageSize = dataSize*packetsPerFrame;
frameIndexMask = 0xFFFFFFFE;
frameIndexOffset = 1;
packetIndexMask = 1;
maxFramesPerFile = MAX_FRAMES_PER_FILE;
fifoBufferSize = packetSize*packetsPerFrame;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS;
defaultFifoDepth = 25000;
};
};
class ShortGotthardData : public GeneralData {
public:
/** Constructor */
ShortGotthardData(){
nPixelsX = 256;
nPixelsY = 1;
dataSize = 512;
packetSize = 518;
packetsPerFrame = 1;
imageSize = dataSize*packetsPerFrame;
frameIndexMask = 0xFFFFFFFF;
maxFramesPerFile = SHORT_MAX_FRAMES_PER_FILE;
fifoBufferSize = packetSize*packetsPerFrame;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS;
defaultFifoDepth = 25000;
};
};
class PropixData : public GeneralData {
private:
/** dynamic range for calculating image size */
const static uint32_t dynamicRange = 16;
public:
/** Constructor */
PropixData(){
nPixelsX = 22;
nPixelsY = 22;
dataSize = 1280;
packetSize = 1286;
packetsPerFrame = 2; //not really
imageSize = nPixelsX*nPixelsY*dynamicRange;
frameIndexMask = 0xFFFFFFFE;
frameIndexOffset = 1;
packetIndexMask = 1;
maxFramesPerFile = MAX_FRAMES_PER_FILE;
fifoBufferSize = packetSize*packetsPerFrame;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS;
defaultFifoDepth = 25000;
};
};
class Moench02Data : public GeneralData {
public:
/** Bytes Per Adc */
const static uint32_t bytesPerAdc = (40*2);
/** Constructor */
Moench02Data(){
nPixelsX = 160;
nPixelsY = 160;
dataSize = 1280;
packetSize = 1286;
packetsPerFrame = 40;
imageSize = dataSize*packetsPerFrame;
frameIndexMask = 0xFFFFFF00;
frameIndexOffset = 8;
packetIndexMask = 0xFF;
maxFramesPerFile = MOENCH_MAX_FRAMES_PER_FILE;
fifoBufferSize = packetSize*packetsPerFrame;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS + FILE_FRAME_HEADER_LENGTH;
defaultFifoDepth = 2500;
};
};
class Moench03Data : public GeneralData {
public:
/** Size of packet header */
const static uint32_t packetHeaderSize = 22;
/** Constructor */
Moench03Data(){
nPixelsX = 400;
nPixelsY = 400;
dataSize = 8192;
packetSize = packetHeaderSize + dataSize;
packetsPerFrame = 40;
imageSize = dataSize*packetsPerFrame;
frameIndexMask = 0xFFFFFFFF;
frameIndexOffset = (6+8);
packetIndexMask = 0xFFFFFFFF;
maxFramesPerFile = JFRAU_MAX_FRAMES_PER_FILE;
fifoBufferSize = packetSize*packetsPerFrame;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS + FILE_FRAME_HEADER_LENGTH;
defaultFifoDepth = 2500;
};
};
class JCTBData : public GeneralData {
public:
/** Bytes Per Adc */
const static uint32_t bytesPerAdc = 2;
/** Constructor */
JCTBData(){
nPixelsX = 32;
nPixelsY = 128;
dataSize = 8192;
packetSize = 8224;
packetsPerFrame = 1;
imageSize = dataSize*packetsPerFrame;
maxFramesPerFile = JFCTB_MAX_FRAMES_PER_FILE;
fifoBufferSize = packetSize*packetsPerFrame;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS + FILE_FRAME_HEADER_LENGTH;
defaultFifoDepth = 2500;
};
};
class JungfrauData : public GeneralData {
private:
/** Structure of an jungfrau packet header */
typedef struct {
unsigned char emptyHeader[6];
unsigned char reserved[4];
unsigned char packetNumber[1];
unsigned char frameNumber[3];
unsigned char bunchid[8];
} jfrau_packet_header_t;
public:
/** Size of packet header */
const static uint32_t packetHeaderSize = 22;
/** Constructor */
JungfrauData(){
nPixelsX = (256*4);
nPixelsY = 256;
dataSize = 8192;
packetSize = packetHeaderSize + dataSize;
packetsPerFrame = 128;
imageSize = dataSize*packetsPerFrame;
frameIndexMask = 0xffffff;
frameIndexOffset = 0;
packetIndexMask = 0;
packetIndexOffset = 0;
maxFramesPerFile = JFRAU_MAX_FRAMES_PER_FILE;
fifoBufferSize = packetSize*packetsPerFrame;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS + FILE_FRAME_HEADER_LENGTH;
defaultFifoDepth = 2500;
};
/**
* Get Header Infomation (frame number, packet number)
* @param index thread index for debugging purposes
* @param packetData pointer to data
* @param dynamicRange dynamic range to assign subframenumber if 32 bit mode
* @param frameNumber frame number
* @param packetNumber packet number
* @param subFrameNumber sub frame number if applicable
* @param bunchId bunch id
*/
void GetHeaderInfo(int index, char* packetData, uint32_t dynamicRange,
uint64_t& frameNumber, uint32_t& packetNumber, uint32_t& subFrameNumber, uint64_t bunchId) {
subFrameNumber = 0;
jfrau_packet_header_t* header = (jfrau_packet_header_t*)(packetData);
frameNumber = (*( (uint32_t*) header->frameNumber))&frameIndexMask;
packetNumber = (uint32_t)(*( (uint8_t*) header->packetNumber));
bunchId = (*((uint64_t*) header->bunchid));
}
};
class EigerData : public GeneralData {
private:
/** Structure of an eiger packet header */
typedef struct {
unsigned char subFrameNumber[4];
unsigned char missingPacket[2];
unsigned char portIndex[1];
unsigned char dynamicRange[1];
} eiger_packet_header_t;
/** Structure of an eiger packet footer */
typedef struct {
unsigned char frameNumber[6];
unsigned char packetNumber[2];
} eiger_packet_footer_t;
public:
/** Size of packet header */
const static uint32_t packetHeaderSize = 8;
/** Footer offset */
uint32_t footerOffset;
/** Constructor */
EigerData(){
nPixelsX = (256*2);
nPixelsY = 256;
dataSize = 1024;
packetSize = 1040;
packetsPerFrame = 1;
imageSize = dataSize*packetsPerFrame;
frameIndexMask = 0xFFFFFFFF;
frameIndexOffset = 0;
packetIndexMask = 0;
packetIndexOffset = 0;
maxFramesPerFile = EIGER_MAX_FRAMES_PER_FILE;
fifoBufferSize = packetSize*packetsPerFrame;
fifoBufferHeaderSize= HEADER_SIZE_NUM_TOT_PACKETS + FILE_FRAME_HEADER_LENGTH;
defaultFifoDepth = 100;
footerOffset = packetHeaderSize+dataSize;
};
/**
* Get Header Infomation (frame number, packet number)
* @param index thread index for debugging purposes
* @param packetData pointer to data
* @param dynamicRange dynamic range to assign subframenumber if 32 bit mode
* @param frameNumber frame number
* @param packetNumber packet number
* @param subFrameNumber sub frame number if applicable
* @param bunchId bunch id
*/
void GetHeaderInfo(int index, char* packetData, uint32_t dynamicRange,
uint64_t& frameNumber, uint32_t& packetNumber, uint32_t& subFrameNumber, uint64_t bunchId) {
bunchId = 0;
subFrameNumber = 0;
eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(packetData + footerOffset);
frameNumber = (uint64_t)(*( (uint64_t*) footer));
packetNumber = (*( (uint16_t*) footer->packetNumber))-1;
if (dynamicRange == 32) {
eiger_packet_header_t* header = (eiger_packet_header_t*) (packetData);
subFrameNumber = *( (uint32_t*) header->subFrameNumber);
}
}
};
#endif

View File

@ -0,0 +1,34 @@
/************************************************
* @file HDF5FileWriter.h
* @short sets/gets properties for the HDF5 file,
* creates/closes the file and writes data to it
***********************************************/
#ifndef HDF5_FILE_WRITER_H
#define HDF5_FILE_WRITER_H
#include "FileWriter.h"
/**
*@short sets/gets properties for the HDF5 file, creates/closes the file and writes data to it
*/
class HDF5FileWriter : private virtual slsReceiverDefs, public FileWriter {
public:
/**
* Constructor
* creates the File Writer
* @fname pointer to file name prefix
*/
HDF5FileWriter(char* fname);
/**
* Destructor
*/
~HDF5FileWriter();
private:
};
#endif

View File

@ -0,0 +1,108 @@
/************************************************
* @file Listener.h
* @short creates the listener thread that
* listens to udp sockets, writes data to memory
* & puts pointers to their memory addresses into fifos
***********************************************/
#ifndef LISTENER_H
#define LISTENER_H
/**
*@short creates & manages a listener thread each
*/
#include "ThreadObject.h"
class Fifo;
class Listener : private virtual slsReceiverDefs, public ThreadObject {
public:
/**
* Constructor
* Calls Base Class CreateThread(), sets ErrorMask if error and increments NumberofListerners
* @param f address of Fifo pointer
*/
Listener(Fifo*& f);
/**
* Destructor
* Calls Base Class DestroyThread() and decrements NumberofListerners
*/
~Listener();
/**
* Get RunningMask
* @return RunningMask
*/
static uint64_t GetErrorMask();
/**
* Reset RunningMask
*/
static void ResetRunningMask();
/**
* Set bit in RunningMask to allow thread to run
*/
void StartRunning();
/**
* Reset bit in RunningMask to prevent thread from running
*/
void StopRunning();
/**
* Set Fifo pointer to the one given
* @param f address of Fifo pointer
*/
void SetFifo(Fifo*& f);
private:
/**
* Get Type
* @return type
*/
std::string GetType();
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
bool IsRunning();
/**
* Thread Exeution for Listener Class
* Pop free addresses, listen to udp socket,
* write to memory & push the address into fifo
*/
void ThreadExecution();
/** type of thread */
static const std::string TypeName;
/** Total Number of Listener Objects */
static int NumberofListeners;
/** Mask of errors on any object eg.thread creation */
static uint64_t ErrorMask;
/** Mask of all listener objects running */
static uint64_t RunningMask;
/** Mutex to update static items among objects (threads)*/
static pthread_mutex_t Mutex;
/** Fifo structure */
Fifo* fifo;
int count;
};
#endif

View File

@ -0,0 +1,111 @@
/************************************************
* @file ThreadObject.h
* @short creates/destroys a thread
***********************************************/
#ifndef THREADOBJECT_H
#define THREADOBJECT_H
/**
*@short creates/destroys a thread
*/
#include "sls_receiver_defs.h"
#include "logger.h"
#include <pthread.h>
#include <semaphore.h>
#include <string>
class ThreadObject : private virtual slsReceiverDefs {
public:
/**
* Constructor
* @param ind self index
*/
ThreadObject(int ind);
/**
* Destructor
* if alive, destroys thread
*/
virtual ~ThreadObject();
/**
* Print all member values
*/
void PrintMembers();
/**
* Get Type
* @return type
*/
virtual std::string GetType() = 0;
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
virtual bool IsRunning() = 0;
/**
* What is really being executed in the thread
*/
virtual void ThreadExecution() = 0;
/**
* Post semaphore so thread can continue & start an acquisition
*/
void Continue();
protected:
/**
* Destroy thread, semaphore and resets alive and killThread
*/
void DestroyThread();
/**
* Create Thread, sets semaphore, alive and killThread
* @return OK if successful, else FAIL
*/
int CreateThread();
private:
/**
* Static function using pointer from argument to call RunningThread()
* @param thisPointer pointer to an object of ThreadObject
*/
static void* StartThread(void *thisPointer);
/**
* Actual Thread called: An infinite while loop in which,
* semaphore starts executing its contents as long RunningMask is satisfied
* Then it exits the thread on its own if killThread is true
*/
void RunningThread();
protected:
/** Self Index */
int index;
/** Thread is alive/dead */
bool alive;
/** Variable monitored by thread to kills itself */
bool killThread;
/** Thread variable */
pthread_t thread;
/** Semaphore to synchonize starting of each run */
sem_t semaphore;
};
#endif

View File

@ -173,9 +173,9 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
/**
* Get the data stream enable
* @return 1 to send via zmq, else 0
* @return data stream enable
*/
uint32_t getDataStreamEnable() const;
bool getDataStreamEnable() const;
/**
@ -336,8 +336,9 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
/**
* Set Short Frame Enabled, later will be moved to getROI (so far only for gotthard)
* @param i index of adc enabled, else -1 if all enabled
* @return OK or FAIL
*/
void setShortFrameEnable(const int i);
int setShortFrameEnable(const int i);
/**
* Set the Frequency of Frames Sent to GUI
@ -354,10 +355,10 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
/**
* Set the data stream enable
* @param enable 0 to disable, 1 to enable
* @param enable data stream enable
* @return OK or FAIL
*/
uint32_t setDataStreamEnable(const uint32_t enable);
int setDataStreamEnable(const bool enable);
/**
* Set Acquisition Period
@ -604,7 +605,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
/** Timer of Frames sent to GUI when frequency is 0 */
uint32_t frameToGuiTimerinMS;
/** Data Stream Enable from Receiver */
int32_t dataStreamEnable;
bool dataStreamEnable;
static const int DEFAULT_STREAMING_TIMER = 500;

View File

@ -235,9 +235,9 @@ class UDPInterface {
/**
* Get the data stream enable
* @return 1 to send via zmq, else 0
* @return data stream enable
*/
virtual uint32_t getDataStreamEnable() const = 0;
virtual bool getDataStreamEnable() const = 0;
/**
* Get Acquisition Period
@ -393,8 +393,9 @@ class UDPInterface {
/**
* Set Short Frame Enabled, later will be moved to getROI (so far only for gotthard)
* @param i index of adc enabled, else -1 if all enabled
* @return OK or FAIL
*/
virtual void setShortFrameEnable(const int i) = 0;
virtual int setShortFrameEnable(const int i) = 0;
/**
* Set the Frequency of Frames Sent to GUI
@ -411,10 +412,10 @@ class UDPInterface {
/**
* Set the data stream enable
* @param enable 0 to disable, 1 to enable
* @param enable data stream enable
* @return OK or FAIL
*/
virtual uint32_t setDataStreamEnable(const uint32_t enable) = 0;
virtual int setDataStreamEnable(const bool enable) = 0;
/**
* Set Acquisition Period

View File

@ -5,45 +5,26 @@
* @file UDPBaseImplementation.h
* @short does all the functions for a receiver, set/get parameters, start/stop etc.
***********************************************/
#include "UDPBaseImplementation.h"
#include "genericSocket.h"
#include "circularFifo.h"
#include "singlePhotonDetector.h"
#include "slsReceiverData.h"
#include "moenchCommonMode.h"
#ifdef MYROOT1
#include <TTree.h>
#include <TFile.h>
#endif
#include <string.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
#ifdef HDF5C
#include "H5Cpp.h"
#ifndef H5_NO_NAMESPACE
using namespace H5;
#endif
#endif
/**
* @short does all the functions for a receiver, set/get parameters, start/stop etc.
*/
#include "UDPBaseImplementation.h"
class GeneralData;
class Listener;
class DataProcessor;
class DataStreamer;
class Fifo;
class FileWriter;
#include <vector>
class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBaseImplementation {
public:
/*************************************************************************
* Constructor & Destructor **********************************************
*************************************************************************/
//*** cosntructor & destructor ***
/**
* Constructor
*/
@ -54,36 +35,17 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
*/
virtual ~UDPStandardImplementation();
/*************************************************************************
* Getters ***************************************************************
* They access local cache of configuration or detector parameters *******
*************************************************************************/
/*************************************************************************
* Setters ***************************************************************
* They modify the local cache of configuration or detector parameters ***
*************************************************************************/
//**initial parameters***
//*** file parameters***
//*** initial parameters (behavioral)***
/**
* Set File Name Prefix (without frame index, file index and extension (_d0_f000000000000_8.raw))
* Does not check for file existence since it is created only at startReceiver
* @param c file name (max of 1000 characters)
*/
void setFileName(const char c[]);
/**
* Overridden method
* Set data compression, by saving only hits (so far implemented only for Moench and Gotthard)
* @param b true for data compression enable, else false
* Set receiver type (and corresponding detector variables in derived STANDARD class)
* It is the first function called by the client when connecting to receiver
* @param d detector type
* @return OK or FAIL
*/
int setDataCompressionEnable(const bool b);
int setDetectorType(const detectorType d);
//***acquisition count parameters***
//*** Getters ***
/**
* Get Total Frames Caught for an entire acquisition (including all scans)
* @return total number of frames caught for entire acquisition
@ -96,13 +58,25 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
*/
uint64_t getFramesCaught() const;
//***acquisition parameters***
//*** Setters ***
//*** file parameters ***
/**
* Set File Name Prefix (without frame index, file index and extension (_f000000000000_8.raw))
* Does not check for file existence since it is created only at startReceiver
* @param c file name (max of 1000 characters)
*/
void setFileName(const char c[]);
//*** acquisition parameters ***
/**
* Overridden method
* Set Short Frame Enabled, later will be moved to getROI (so far only for gotthard)
* @param i index of adc enabled, else -1 if all enabled
* @return OK or FAIL
*/
void setShortFrameEnable(const int i);
int setShortFrameEnable(const int i);
/**
* Set the Frequency of Frames Sent to GUI
@ -113,758 +87,80 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
/**
* Set the data stream enable
* @param enable 0 to disable, 1 to enable
* @param enable data stream enable
* @return OK or FAIL
*/
uint32_t setDataStreamEnable(const uint32_t enable);
int setDataStreamEnable(const bool enable);
/**
* Overridden method
* Set Acquisition Period
* @param i acquisition period
* @return OK or FAIL
*/
int setAcquisitionPeriod(const uint64_t i);
/**
* Set Acquisition Time
* @param i acquisition time
* @return OK or FAIL
*/
int setAcquisitionTime(const uint64_t i);
/**
* Overridden method
* Set Number of Frames expected by receiver from detector
* The data receiver status will change from running to idle when it gets this number of frames
* @param i number of frames expected
* @return OK or FAIL
*/
int setNumberOfFrames(const uint64_t i);
/**
* Overridden method
* Set Dynamic Range or Number of Bits Per Pixel
* @param i dynamic range that is 4, 8, 16 or 32
* @return OK or FAIL
*/
int setDynamicRange(const uint32_t i);
/**
* Overridden method
* Set Ten Giga Enable
* @param b true if 10Giga enabled, else false (1G enabled)
* @return OK or FAIL
*/
int setTenGigaEnable(const bool b);
//*** Behavioral functions ***
/**
* Overridden method
* Set Fifo Depth
* @param i fifo depth value
* @return OK or FAIL
*/
int setFifoDepth(const uint32_t i);
/*************************************************************************
* Behavioral functions***************************************************
* They may modify the status of the receiver ****************************
*************************************************************************/
//***initial functions***
/**
* Overridden method
* Set receiver type (and corresponding detector variables in derived STANDARD class)
* It is the first function called by the client when connecting to receiver
* @param d detector type
* @return OK or FAIL
*/
int setDetectorType(const detectorType d);
//***acquisition functions***
/**
* Overridden method
* Reset acquisition parameters such as total frames caught for an entire acquisition (including all scans)
*/
void resetAcquisitionCount();
/**
* Overridden method
* Start Listening for Packets by activating all configuration settings to receiver
* When this function returns, it has status RUNNING(upon SUCCESS) or IDLE (upon failure)
* @param c error message if FAIL
* @return OK or FAIL
*/
int startReceiver(char *c=NULL);
/**
* Overridden method
* Stop Listening for Packets
* Calls startReadout(), which stops listening and sets status to Transmitting
* When it has read every frame in buffer, the status changes to Run_Finished
* When this function returns, receiver has status IDLE
* Pre: status is running, semaphores have been instantiated,
* Post: udp sockets shut down, status is idle, semaphores destroyed
*/
void stopReceiver();
/**
* Overridden method
* Stop Listening to Packets
* and sets status to Transmitting
* Next step would be to get all data and stop receiver completely and return with idle state
* Pre: status is running, udp sockets have been initialized, stop receiver initiated
* Post:udp sockets closed, status is transmitting
*/
void startReadout();
/**
* Overridden method
* Shuts down and deletes UDP Sockets
* TCPIPInterface can also call this in case of illegal shutdown of receiver
* @return OK or FAIL
*/
int shutDownUDPSockets();
/**
* Overridden method
* Get the buffer-current frame read by receiver
* @param ithread writer thread
* @param c pointer to current file name
* @param raw address of pointer, pointing to current frame to send to gui
* @param startAcq start index of the acquisition
* @param startFrame start index of the scan
*/
void readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame);
void resetGuiPointer(int ithread);
/**
* Overridden method
* Closes file / all files(data compression involves multiple files)
* TCPIPInterface can also call this in case of illegal shutdown of receiver
* @param ithread writer thread index
*/
void closeFile(int ithread = 0);
/**
* Activate / Deactivate Receiver
* If deactivated, receiver will write dummy packets 0xFF
* (as it will receive nothing from detector)
*/
int setActivate(int enable = -1);
private:
/*************************************************************************
* Getters ***************************************************************
* They access local cache of configuration or detector parameters *******
*************************************************************************/
/*
uint64_t (*getFrameNumber)();
uint64_t eigerGetFrameNumber();
uint64_t generalGetFrameNumber();
getframenumber = &generalgetframenumber;
if(dettpe == eiger) getframenumber = &eigerGerFramenumber;
call using getframenumber();
*/
//**initial parameters***
/**
* Delete and free member parameters
*/
void deleteMembers();
/**
* Deletes all the filter objects for single photon data
* Deals with data compression
*/
void deleteFilter();
/**
* Initialize base member parameters
*/
void initializeBaseMembers();
void DeleteMembers();
/**
* Initialize member parameters
*/
void initializeMembers();
void InitializeMembers();
/**
* Sets up all the filter objects for single photon data
* Deals with data compression
* Sets local network parameters, but requires permissions
*/
void initializeFilter();
void SetLocalNetworkParameters();
/**
* Set up the Fifo Structure for processing buffers
* between listening and writer threads
* When the parameters ahve been determined and if fifostructure needs to be changes,
* the listerning and writing threads are also destroyed together with this
* between listening and dataprocessor threads
* @return OK or FAIL
*/
int setupFifoStructure();
int SetupFifoStructure();
/*************************************************************************
* Listening and Writing Threads *****************************************
*************************************************************************/
/**
* Create Data Call Back Threads
* @param destroy is true to destroy all the threads
* @return OK or FAIL
*/
int createDataCallbackThreads(bool destroy = false);
/**
* Create Listening Threads
* @param destroy is true to destroy all the threads
*/
int createListeningThreads(bool destroy = false);
/**
* Create Writer Threads
* @param destroy is true to destroy all the threads
* @return OK or FAIL
*/
int createWriterThreads(bool destroy = false);
//*** Class Members ***
/**
* Set Thread Priorities
*/
void setThreadPriorities();
/**
* Creates UDP Sockets
* @return OK or FAIL
*/
int createUDPSockets();
/**
* Initializes writer variables and creates the first file
* also does the startAcquisitionCallBack
* @return OK or FAIL
*/
int setupWriter();
/**
* Creates new file and reset some parameters
* @param ithread writer thread index
* @return OK or FAIL
*/
int createNewFile(int ithread);
/**
* Creates new tree and file for compression
* @param ithread thread number
* @param iframe frame number
* @return OK or FAIL
*/
int createCompressionFile(int ithread, int iframe);
/**
* Static function - Starts Data Callback Thread of this object
* @param this_pointer pointer to this object
*/
static void* startDataCallbackThread(void *this_pointer);
/**
* Static function - Starts Listening Thread of this object
* @param this_pointer pointer to this object
*/
static void* startListeningThread(void *this_pointer);
/**
* Static function - Starts Writing Thread of this object
* @param this_pointer pointer to this object
*/
static void* startWritingThread(void *this_pointer);
/**
* Thread that sends data packets to client
*/
void startDataCallback();
/**
* Thread that listens to packets
* It pops the fifofree for free addresses, listens to packets and pushes them into the fifo
* This is continuously looped for each buffer in a nested loop, which is again looped for each acquisition
* Exits only for changing dynamic range, 10G parameters etc and recreated
*
*/
void startListening();
/**
* Called by startListening
* Listens to buffer, until packet(s) received or shutdownUDPsocket called by client
* Also copies carryovers from previous frame in front of buffer (gotthard and moench)
* For eiger, it ignores packets less than onePacketSize
* @param ithread listening thread index
* @param cSize number of bytes carried on from previous buffer
* @param temp temporary storage of previous buffer
* @return the number of bytes actually received
*/
int prepareAndListenBuffer(int ithread, int cSize, char* temp);
/**
* Called by startListening
* Creates the packets
* @param ithread listening thread index
* @return the number of bytes actually received
*/
int prepareAndListenBufferDeactivated(int ithread);
/**
* Called by startListening
* Listens to each packet and copies only complete frames
* until all receiver or shutdownUDPsocket called by client
* @param ithread listening thread index
* @return the number of bytes copied to buffer
*/
int prepareAndListenBufferCompleteFrames(int ithread);
/**
* Called by startListening
* Its called for the first packet of a scan or acquistion
* Sets the startframeindices and the variables to know if acquisition started
* @param ithread listening thread number
*/
void startFrameIndices(int ithread);
/**
* Called by prepareAndListenBuffer
* This is called when udp socket is shut down by client
* It pushes ffff instead of packet number into fifo
* to inform writers about the end of listening session
* Then sets the listening mask so that it stops listening and wait for next acquisition trigger
* @param ithread listening thread number
* @param numbytes number of bytes received
*/
void stopListening(int ithread, int numbytes);
/*
* Called by startListening for gotthard and moench to handle split frames
* It processes listening thread buffers by ensuring split frames are in the same buffer
* @param ithread listening thread index
* @param cSize number of bytes carried over to the next buffer to reunite with split frame
* @param temp temporary buffer to store the split frame
* @param rc number of bytes received
* @return packet count
*/
uint32_t processListeningBuffer(int ithread, int &cSize,char* temp, int rc);
/**
* Thread started which writes packets to file.
* It calls popAndCheckEndofAcquisition to pop fifo and check if it is a dummy end buffer
* It then calls a function to process and write packets to file and pushes the addresses into the fifoFree
* This is continuously looped for each buffer in a nested loop, which is again looped for each acquisition
* Exits only for changing dynamic range, 10G parameters etc and recreated
*
*/
void startWriting();
/**
* Called by processWritingBuffer and processWritingBufferPacketByPacket
* When dummy-end buffers are popped from all FIFOs (acquisition over), this is called
* It frees the FIFO addresses, closes all files
* For data compression, it waits for all threads to be done
* Changes the status to RUN_FINISHED and prints statistics
* @param ithread writing thread index
* @param wbuffer writing buffer popped out from FIFO
*/
void stopWriting(int ithread, char* wbuffer);
/**
* Called by processWritingBuffer and processWritingBufferPacketByPacket
* Updates parameters, (writes headers for eiger) and writes to file when not a dummy frame
* Copies data for gui display and frees addresses popped from FIFOs
* @param ithread writing thread index
* @param wbuffer writing buffer popped out from FIFO
* @param npackets number of packets
*/
void handleWithoutDataCompression(int ithread, char* wbuffer,uint32_t npackets);
/**
* Called by startWriting for jungfrau and eiger
* writes complete frames to file
* Copies data for gui display and frees addresses popped from FIFOs
* @param ithread writing thread index
* @param wbuffer writing buffer popped out from FIFO
*/
void handleCompleteFramesOnly(int ithread, char* wbuffer);
/**
* Calle by handleWithoutDataCompression
* Creating headers Writing to file without compression
* @param ithread writer thread index
* @param wbuffer is the address of buffer popped out of FIFO
* @param numpackets is the number of packets
*/
void writeFileWithoutCompression(int ithread, char* wbuffer,uint32_t numpackets);
/**
* Called by writeToFileWithoutCompression
* Create headers for file writing (at the moment, this is eiger specific)
* @param wbuffer writing buffer popped from FIFOs
*/
void createHeaders(char* wbuffer);
/**
* Updates the file header char aray, each time the corresp parameter is changed
* @param ithread writer thread index
*/
void updateFileHeader(int ithread);
/**
* Called by handleWithoutDataCompression and handleWithCompression after writing to file
* Copy frames for GUI and updates appropriate parameters for frequency frames to gui
* Uses semaphore for nth frame mode
* @param ithread writer thread index
* @param buffer buffer to copy
* @param numpackets number of packets to copy
*/
void copyFrameToGui(int ithread, char* buffer, uint32_t numpackets);
void waitWritingBufferForNextAcquisition(int ithread);
/**
* Called by processWritingBuffer
* Processing fifo popped buffers for data compression
* Updates parameters and writes to file
* Copies data for gui display and frees addresses popped from FIFOs
* @param ithread writing thread number
* @param wbuffer writer buffer
* @param nf number of frames
*/
void handleDataCompression(int ithread, char* wbuffer, uint64_t &nf);
/**
* Get Frame Number
* @param ithread writer thread index
* @param wbuffer writer buffer
* @param framenumber reference to the frame number
* @param packetnumber reference to the packet number
* @param subframenumber reference to the subframe number
* @oaram bunchid reference to the bunch id
* @return OK or FAIL
*/
int getFrameandPacketNumber(int ithread, char* wbuffer, uint64_t &framenumber, uint32_t &packetnumber, uint32_t &subframenumber, uint64_t &bunchid);
/**
* Find offset upto this frame number and write it to file
* @param ithread writer thread index
* @param wbuffer writer buffer
* @param offset reference of offset to look from and replaces offset to starting of nextframenumber
* @param nextFrameNumber frame number up to which data written
* @param numpackets number of packets in buffer
* @param numPacketsWritten number of packets written to file
*/
int writeUptoFrameNumber(int ithread, char* wbuffer, int &offset, uint64_t nextFrameNumber, uint32_t numpackets, int &numPacketsWritten);
/** function that returns the name variable from the receiver complete file name prefix
@param fname complete file name prefix
@returns file name
*/
string getNameFromReceiverFilePrefix(string fname);
/*************************************************************************
* Class Members *********************************************************
*************************************************************************/
/** Maximum Number of Writer Threads */
#ifdef DCOMPRESS
/**** most likely not used ***/
const static int MAX_NUMBER_OF_WRITER_THREADS = 15;
#else
const static int MAX_NUMBER_OF_WRITER_THREADS = 2;
#endif
//**detector parameters***
//*** detector parameters ***
/*Detector Readout ID*/
int detID;
/** Size of 1 buffer processed at a time */
int bufferSize;
//*** receiver parameters ***
/** Number of Threads */
int numThreads;
/** One Packet Size including headers */
int onePacketSize;
/** Number of Jobs */
int numberofJobs;
/** One Packet Size without headers */
int oneDataSize;
/** Frame Index Mask */
uint64_t frameIndexMask;
//** class objects ***
/** General Data Properties */
GeneralData* generalData;
/** Frame Index Offset */
int frameIndexOffset;
/** Listener Objects that listen to UDP and push into fifo */
std::vector <Listener*> listener;
/** Packet Index Mask */
uint64_t packetIndexMask;
/** DataProcessor Objects that pull from fifo and process data */
std::vector <DataProcessor*> dataProcessor;
/** Footer offset from start of Packet*/
int footerOffset;
/** DataStreamer Objects that stream data via ZMQ */
std::vector <DataStreamer*> dataStreamer;
/** variable to exclude missing packet */
bool excludeMissingPackets;
//***File parameters***
#ifdef MYROOT1
/** Tree where the hits are stored */
TTree *myTree[MAX_NUMBER_OF_WRITER_THREADS];
/** File where the tree is saved */
TFile *myFile[MAX_NUMBER_OF_WRITER_THREADS];
#endif
/** Complete File name */
char completeFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** File Name without frame index, file index and extension (_d0_f000000000000_8.raw)*/
char fileNamePerThread[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** Maximum Frames Per File **/
uint64_t maxFramesPerFile;
const static int progressFrequency = 10;
/** If file created successfully for all Writer Threads */
bool fileCreateSuccess;
/** File header */
const static unsigned int FILE_HEADER_SIZE = 500;
char fileHeader[MAX_NUMBER_OF_WRITER_THREADS][FILE_HEADER_SIZE];
/** File Descriptor */
FILE *sfilefd[MAX_NUMBER_OF_WRITER_THREADS];
#ifdef HDF5C
DataSpace *hdf5_dataspaceId[MAX_NUMBER_OF_WRITER_THREADS];
DataSet *hdf5_datasetId[MAX_NUMBER_OF_WRITER_THREADS];
H5File *hdf5_fileId[MAX_NUMBER_OF_WRITER_THREADS];
H5File* hdf5_masterFileId;
H5File* hdf5_virtualFileId;
DataType hdf5_datatype;
#endif
//***acquisition indices/count parameters***
/** Frame Number of First Frame of an entire Acquisition (including all scans) */
uint64_t startAcquisitionIndex;
/** Frame index at start of each real time acquisition (eg. for each scan) */
uint64_t startFrameIndex;
/** Actual current frame index of each time acquisition (eg. for each scan) */
uint64_t frameIndex[MAX_NUMBER_OF_WRITER_THREADS];
/** Current Frame Number */
uint64_t currentFrameNumber[MAX_NUMBER_OF_WRITER_THREADS];
/** Previous Frame number from buffer to calculate loss */
int64_t frameNumberInPreviousFile[MAX_NUMBER_OF_WRITER_THREADS];
/** Previous Frame number from last check to calculate loss */
int64_t frameNumberInPreviousCheck[MAX_NUMBER_OF_WRITER_THREADS];
/** total packet count from last check */
int64_t totalWritingPacketCountFromLastCheck[MAX_NUMBER_OF_WRITER_THREADS];
/** Pckets currently in current file, starts new file when it reaches max */
int64_t lastFrameNumberInFile[MAX_NUMBER_OF_WRITER_THREADS];
/** packets in current file */
uint64_t totalPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS];
/**Total packet count written by each writing thread */
uint64_t totalWritingPacketCount[MAX_NUMBER_OF_LISTENING_THREADS];
/* Acquisition started */
bool acqStarted;
/* Measurement started - for each thread to get progress print outs*/
bool measurementStarted[MAX_NUMBER_OF_LISTENING_THREADS];
/** Total packet Count listened to by listening threads */
int totalListeningPacketCount[MAX_NUMBER_OF_LISTENING_THREADS];
/** Total packet Count ignored by listening threads */
int totalIgnoredPacketCount[MAX_NUMBER_OF_LISTENING_THREADS];
//***receiver parameters***
/** Receiver Buffer */
char *buffer[MAX_NUMBER_OF_LISTENING_THREADS];
/** Memory allocated */
char *mem0[MAX_NUMBER_OF_LISTENING_THREADS];
/** Circular fifo to point to addresses of data listened to */
CircularFifo<char>* fifo[MAX_NUMBER_OF_LISTENING_THREADS];
/** Circular fifo to point to address already written and freed, to be reused */
CircularFifo<char>* fifoFree[MAX_NUMBER_OF_LISTENING_THREADS];
/** UDP Sockets - Detector to Receiver */
genericSocket* udpSocket[MAX_NUMBER_OF_LISTENING_THREADS];
/** Number of Jobs Per Buffer */
int numberofJobsPerBuffer;
/** Total fifo size */
uint32_t fifoSize;
/** fifo buffer header size */
uint32_t fifoBufferHeaderSize;
/** Dummy Packet identifier value */
const static uint32_t dummyPacketValue = 0xFFFFFFFF;
//***receiver to GUI parameters***
/** Current Frame copied for GUI */
char* latestData[MAX_NUMBER_OF_WRITER_THREADS];
/** Pointer to file name to be sent to GUI */
char guiFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** Number of packets copied to be sent to gui (others padded) */
uint32_t guiNumPackets[MAX_NUMBER_OF_WRITER_THREADS];
/** Semaphore to synchronize Writer and GuiReader threads*/
sem_t writerGuiSemaphore[MAX_NUMBER_OF_WRITER_THREADS]; //datacompression, only first thread sends to gui
/** Semaphore to synchronize Writer and GuiReader threads*/
sem_t dataCallbackWriterSemaphore[MAX_NUMBER_OF_WRITER_THREADS]; //datacompression, only first thread sends to gui
/** counter for nth frame to gui */
int frametoGuiCounter[MAX_NUMBER_OF_WRITER_THREADS];
//***data call back thread parameters***
/** Ensures if zmq threads created successfully */
bool zmqThreadStarted;
/** Number of data callback Threads */
int numberofDataCallbackThreads;
/** Data Callback Threads */
pthread_t dataCallbackThreads[MAX_NUMBER_OF_LISTENING_THREADS];
/** Semaphores Synchronizing DataCallback Threads */
sem_t dataCallbackSemaphore[MAX_NUMBER_OF_LISTENING_THREADS];
/** Mask with each bit indicating status of each data callback thread */
volatile uint32_t dataCallbackThreadsMask;
/** Set to self-terminate data callback threads waiting for semaphores */
bool killAllDataCallbackThreads;
//***general and listening thread parameters***
/** Ensures if threads created successfully */
bool threadStarted;
/** Current Thread Index*/
int currentThreadIndex;
/** Number of Listening Threads */
int numberofListeningThreads;
/** Listening Threads */
pthread_t listeningThreads[MAX_NUMBER_OF_LISTENING_THREADS];
/** Semaphores Synchronizing Listening Threads */
sem_t listenSemaphore[MAX_NUMBER_OF_LISTENING_THREADS];
/** Mask with each bit indicating status of each listening thread */
volatile uint32_t listeningThreadsMask;
/** Set to self-terminate listening threads waiting for semaphores */
bool killAllListeningThreads;
//***writer thread parameters***
/** Number of Writer Threads */
int numberofWriterThreads;
/** Writer Threads */
pthread_t writingThreads[MAX_NUMBER_OF_WRITER_THREADS];
/** Semaphores Synchronizing Writer Threads */
sem_t writerSemaphore[MAX_NUMBER_OF_WRITER_THREADS];
/** Mask with each bit indicating status of each writer thread */
volatile uint32_t writerThreadsMask;
/** Mask with each bit indicating file created for each writer thread*/
volatile uint32_t createFileMask;
/** Set to self-terminate writer threads waiting for semaphores */
bool killAllWritingThreads;
//***data shape ***
/** Number of pixels in x axis */
int NX;
/** Number of pixels in y axis */
int NY;
int TILE_NX;
int TILE_NY;
//***filter parameters***
/** Common Mode Subtraction Enable FIXME: Always false, only moench uses, Ask Anna */
bool commonModeSubtractionEnable;
/** Moench Common Mode Subtraction */
moenchCommonMode *moenchCommonModeSubtraction;
/** Single Photon Detector Object for each writer thread */
singlePhotonDetector<uint16_t> *singlePhotonDetectorObject[MAX_NUMBER_OF_WRITER_THREADS];
/** Receiver Data Object for each writer thread */
slsReceiverData<uint16_t> *receiverData[MAX_NUMBER_OF_WRITER_THREADS];
//***mutex***
/** Status mutex */
pthread_mutex_t statusMutex;
/** Writing mutex */
pthread_mutex_t writeMutex;
/** GuiDataReady Mutex */
pthread_mutex_t dataReadyMutex;
/** Progress (currentFrameNumber) Mutex */
pthread_mutex_t progressMutex;
//***callback***
/** The action which decides what the user and default responsibilities to save data are
* 0 raw data ready callback takes care of open,close,write file
* 1 callback writes file, we have to open, close it
* 2 we open, close, write file, callback does not do anything */
int cbAction;
/** Fifo Structure to store addresses of memory writes */
std::vector <Fifo*> fifo;
/** File writer implemented as binary or hdf5 filewriter */
std::vector <FileWriter*> fileWriter;
};

View File

@ -0,0 +1,873 @@
//#ifdef UDP_BASE_IMPLEMENTATION
#ifndef UDP_STANDARD_IMPLEMENTATION_H
#define UDP_STANDARD_IMPLEMENTATION_H
/********************************************//**
* @file UDPBaseImplementation.h
* @short does all the functions for a receiver, set/get parameters, start/stop etc.
***********************************************/
#include "UDPBaseImplementation.h"
#include "genericSocket.h"
#include "circularFifo.h"
#include "singlePhotonDetector.h"
#include "slsReceiverData.h"
#include "moenchCommonMode.h"
#ifdef MYROOT1
#include <TTree.h>
#include <TFile.h>
#endif
#include <string.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
#ifdef HDF5C
#include "H5Cpp.h"
#ifndef H5_NO_NAMESPACE
using namespace H5;
#endif
#endif
/**
* @short does all the functions for a receiver, set/get parameters, start/stop etc.
*/
class UDPStandardImplementationCopy: private virtual slsReceiverDefs, public UDPBaseImplementation {
public:
/*************************************************************************
* Constructor & Destructor **********************************************
*************************************************************************/
/**
* Constructor
*/
UDPStandardImplementation();
/**
* Destructor
*/
virtual ~UDPStandardImplementation();
/*************************************************************************
* Getters ***************************************************************
* They access local cache of configuration or detector parameters *******
*************************************************************************/
/*************************************************************************
* Setters ***************************************************************
* They modify the local cache of configuration or detector parameters ***
*************************************************************************/
//**initial parameters***
//*** file parameters***
/**
* Set File Name Prefix (without frame index, file index and extension (_d0_f000000000000_8.raw))
* Does not check for file existence since it is created only at startReceiver
* @param c file name (max of 1000 characters)
*/
void setFileName(const char c[]);
/**
* Overridden method
* Set data compression, by saving only hits (so far implemented only for Moench and Gotthard)
* @param b true for data compression enable, else false
* @return OK or FAIL
*/
int setDataCompressionEnable(const bool b);
//***acquisition count parameters***
/**
* Get Total Frames Caught for an entire acquisition (including all scans)
* @return total number of frames caught for entire acquisition
*/
uint64_t getTotalFramesCaught() const;
/**
* Get Frames Caught for each real time acquisition (eg. for each scan)
* @return number of frames caught for each scan
*/
uint64_t getFramesCaught() const;
//***acquisition parameters***
/**
* Overridden method
* Set Short Frame Enabled, later will be moved to getROI (so far only for gotthard)
* @param i index of adc enabled, else -1 if all enabled
*/
void setShortFrameEnable(const int i);
/**
* Set the Frequency of Frames Sent to GUI
* @param freq 0 for random frame requests, n for nth frame frequency
* @return OK or FAIL
*/
int setFrameToGuiFrequency(const uint32_t freq);
/**
* Set the data stream enable
* @param enable 0 to disable, 1 to enable
* @return OK or FAIL
*/
uint32_t setDataStreamEnable(const uint32_t enable);
/**
* Overridden method
* Set Acquisition Period
* @param i acquisition period
* @return OK or FAIL
*/
int setAcquisitionPeriod(const uint64_t i);
/**
* Set Acquisition Time
* @param i acquisition time
* @return OK or FAIL
*/
int setAcquisitionTime(const uint64_t i);
/**
* Overridden method
* Set Number of Frames expected by receiver from detector
* The data receiver status will change from running to idle when it gets this number of frames
* @param i number of frames expected
* @return OK or FAIL
*/
int setNumberOfFrames(const uint64_t i);
/**
* Overridden method
* Set Dynamic Range or Number of Bits Per Pixel
* @param i dynamic range that is 4, 8, 16 or 32
* @return OK or FAIL
*/
int setDynamicRange(const uint32_t i);
/**
* Overridden method
* Set Ten Giga Enable
* @param b true if 10Giga enabled, else false (1G enabled)
* @return OK or FAIL
*/
int setTenGigaEnable(const bool b);
/**
* Overridden method
* Set Fifo Depth
* @param i fifo depth value
* @return OK or FAIL
*/
int setFifoDepth(const uint32_t i);
/*************************************************************************
* Behavioral functions***************************************************
* They may modify the status of the receiver ****************************
*************************************************************************/
//***initial functions***
/**
* Overridden method
* Set receiver type (and corresponding detector variables in derived STANDARD class)
* It is the first function called by the client when connecting to receiver
* @param d detector type
* @return OK or FAIL
*/
int setDetectorType(const detectorType d);
//***acquisition functions***
/**
* Overridden method
* Reset acquisition parameters such as total frames caught for an entire acquisition (including all scans)
*/
void resetAcquisitionCount();
/**
* Overridden method
* Start Listening for Packets by activating all configuration settings to receiver
* When this function returns, it has status RUNNING(upon SUCCESS) or IDLE (upon failure)
* @param c error message if FAIL
* @return OK or FAIL
*/
int startReceiver(char *c=NULL);
/**
* Overridden method
* Stop Listening for Packets
* Calls startReadout(), which stops listening and sets status to Transmitting
* When it has read every frame in buffer, the status changes to Run_Finished
* When this function returns, receiver has status IDLE
* Pre: status is running, semaphores have been instantiated,
* Post: udp sockets shut down, status is idle, semaphores destroyed
*/
void stopReceiver();
/**
* Overridden method
* Stop Listening to Packets
* and sets status to Transmitting
* Next step would be to get all data and stop receiver completely and return with idle state
* Pre: status is running, udp sockets have been initialized, stop receiver initiated
* Post:udp sockets closed, status is transmitting
*/
void startReadout();
/**
* Overridden method
* Shuts down and deletes UDP Sockets
* TCPIPInterface can also call this in case of illegal shutdown of receiver
* @return OK or FAIL
*/
int shutDownUDPSockets();
/**
* Overridden method
* Get the buffer-current frame read by receiver
* @param ithread writer thread
* @param c pointer to current file name
* @param raw address of pointer, pointing to current frame to send to gui
* @param startAcq start index of the acquisition
* @param startFrame start index of the scan
*/
void readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame);
void resetGuiPointer(int ithread);
/**
* Overridden method
* Closes file / all files(data compression involves multiple files)
* TCPIPInterface can also call this in case of illegal shutdown of receiver
* @param ithread writer thread index
*/
void closeFile(int ithread = 0);
/**
* Activate / Deactivate Receiver
* If deactivated, receiver will write dummy packets 0xFF
* (as it will receive nothing from detector)
*/
int setActivate(int enable = -1);
private:
/*************************************************************************
* Getters ***************************************************************
* They access local cache of configuration or detector parameters *******
*************************************************************************/
/*
uint64_t (*getFrameNumber)();
uint64_t eigerGetFrameNumber();
uint64_t generalGetFrameNumber();
getframenumber = &generalgetframenumber;
if(dettpe == eiger) getframenumber = &eigerGerFramenumber;
call using getframenumber();
*/
//**initial parameters***
/**
* Delete and free member parameters
*/
void deleteMembers();
/**
* Deletes all the filter objects for single photon data
* Deals with data compression
*/
void deleteFilter();
/**
* Initialize base member parameters
*/
void initializeBaseMembers();
/**
* Initialize member parameters
*/
void initializeMembers();
/**
* Sets up all the filter objects for single photon data
* Deals with data compression
*/
void initializeFilter();
/**
* Set up the Fifo Structure for processing buffers
* between listening and writer threads
* When the parameters ahve been determined and if fifostructure needs to be changes,
* the listerning and writing threads are also destroyed together with this
* @return OK or FAIL
*/
int setupFifoStructure();
/*************************************************************************
* Listening and Writing Threads *****************************************
*************************************************************************/
/**
* Create Data Call Back Threads
* @param destroy is true to destroy all the threads
* @return OK or FAIL
*/
int createDataCallbackThreads(bool destroy = false);
/**
* Create Listening Threads
* @param destroy is true to destroy all the threads
*/
int createListeningThreads(bool destroy = false);
/**
* Create Writer Threads
* @param destroy is true to destroy all the threads
* @return OK or FAIL
*/
int createWriterThreads(bool destroy = false);
/**
* Set Thread Priorities
*/
void setThreadPriorities();
/**
* Creates UDP Sockets
* @return OK or FAIL
*/
int createUDPSockets();
/**
* Initializes writer variables and creates the first file
* also does the startAcquisitionCallBack
* @return OK or FAIL
*/
int setupWriter();
/**
* Creates new file and reset some parameters
* @param ithread writer thread index
* @return OK or FAIL
*/
int createNewFile(int ithread);
/**
* Creates new tree and file for compression
* @param ithread thread number
* @param iframe frame number
* @return OK or FAIL
*/
int createCompressionFile(int ithread, int iframe);
/**
* Static function - Starts Data Callback Thread of this object
* @param this_pointer pointer to this object
*/
static void* startDataCallbackThread(void *this_pointer);
/**
* Static function - Starts Listening Thread of this object
* @param this_pointer pointer to this object
*/
static void* startListeningThread(void *this_pointer);
/**
* Static function - Starts Writing Thread of this object
* @param this_pointer pointer to this object
*/
static void* startWritingThread(void *this_pointer);
/**
* Thread that sends data packets to client
*/
void startDataCallback();
/**
* Thread that listens to packets
* It pops the fifofree for free addresses, listens to packets and pushes them into the fifo
* This is continuously looped for each buffer in a nested loop, which is again looped for each acquisition
* Exits only for changing dynamic range, 10G parameters etc and recreated
*
*/
void startListening();
/**
* Called by startListening
* Listens to buffer, until packet(s) received or shutdownUDPsocket called by client
* Also copies carryovers from previous frame in front of buffer (gotthard and moench)
* For eiger, it ignores packets less than onePacketSize
* @param ithread listening thread index
* @param cSize number of bytes carried on from previous buffer
* @param temp temporary storage of previous buffer
* @return the number of bytes actually received
*/
int prepareAndListenBuffer(int ithread, int cSize, char* temp);
/**
* Called by startListening
* Creates the packets
* @param ithread listening thread index
* @return the number of bytes actually received
*/
int prepareAndListenBufferDeactivated(int ithread);
/**
* Called by startListening
* Listens to each packet and copies only complete frames
* until all receiver or shutdownUDPsocket called by client
* @param ithread listening thread index
* @return the number of bytes copied to buffer
*/
int prepareAndListenBufferCompleteFrames(int ithread);
/**
* Called by startListening
* Its called for the first packet of a scan or acquistion
* Sets the startframeindices and the variables to know if acquisition started
* @param ithread listening thread number
*/
void startFrameIndices(int ithread);
/**
* Called by prepareAndListenBuffer
* This is called when udp socket is shut down by client
* It pushes ffff instead of packet number into fifo
* to inform writers about the end of listening session
* Then sets the listening mask so that it stops listening and wait for next acquisition trigger
* @param ithread listening thread number
* @param numbytes number of bytes received
*/
void stopListening(int ithread, int numbytes);
/*
* Called by startListening for gotthard and moench to handle split frames
* It processes listening thread buffers by ensuring split frames are in the same buffer
* @param ithread listening thread index
* @param cSize number of bytes carried over to the next buffer to reunite with split frame
* @param temp temporary buffer to store the split frame
* @param rc number of bytes received
* @return packet count
*/
uint32_t processListeningBuffer(int ithread, int &cSize,char* temp, int rc);
/**
* Thread started which writes packets to file.
* It calls popAndCheckEndofAcquisition to pop fifo and check if it is a dummy end buffer
* It then calls a function to process and write packets to file and pushes the addresses into the fifoFree
* This is continuously looped for each buffer in a nested loop, which is again looped for each acquisition
* Exits only for changing dynamic range, 10G parameters etc and recreated
*
*/
void startWriting();
/**
* Called by processWritingBuffer and processWritingBufferPacketByPacket
* When dummy-end buffers are popped from all FIFOs (acquisition over), this is called
* It frees the FIFO addresses, closes all files
* For data compression, it waits for all threads to be done
* Changes the status to RUN_FINISHED and prints statistics
* @param ithread writing thread index
* @param wbuffer writing buffer popped out from FIFO
*/
void stopWriting(int ithread, char* wbuffer);
/**
* Called by processWritingBuffer and processWritingBufferPacketByPacket
* Updates parameters, (writes headers for eiger) and writes to file when not a dummy frame
* Copies data for gui display and frees addresses popped from FIFOs
* @param ithread writing thread index
* @param wbuffer writing buffer popped out from FIFO
* @param npackets number of packets
*/
void handleWithoutDataCompression(int ithread, char* wbuffer,uint32_t npackets);
/**
* Called by startWriting for jungfrau and eiger
* writes complete frames to file
* Copies data for gui display and frees addresses popped from FIFOs
* @param ithread writing thread index
* @param wbuffer writing buffer popped out from FIFO
*/
void handleCompleteFramesOnly(int ithread, char* wbuffer);
/**
* Calle by handleWithoutDataCompression
* Creating headers Writing to file without compression
* @param ithread writer thread index
* @param wbuffer is the address of buffer popped out of FIFO
* @param numpackets is the number of packets
*/
void writeFileWithoutCompression(int ithread, char* wbuffer,uint32_t numpackets);
/**
* Called by writeToFileWithoutCompression
* Create headers for file writing (at the moment, this is eiger specific)
* @param wbuffer writing buffer popped from FIFOs
*/
void createHeaders(char* wbuffer);
/**
* Updates the file header char aray, each time the corresp parameter is changed
* @param ithread writer thread index
*/
void updateFileHeader(int ithread);
/**
* Called by handleWithoutDataCompression and handleWithCompression after writing to file
* Copy frames for GUI and updates appropriate parameters for frequency frames to gui
* Uses semaphore for nth frame mode
* @param ithread writer thread index
* @param buffer buffer to copy
* @param numpackets number of packets to copy
*/
void copyFrameToGui(int ithread, char* buffer, uint32_t numpackets);
void waitWritingBufferForNextAcquisition(int ithread);
/**
* Called by processWritingBuffer
* Processing fifo popped buffers for data compression
* Updates parameters and writes to file
* Copies data for gui display and frees addresses popped from FIFOs
* @param ithread writing thread number
* @param wbuffer writer buffer
* @param nf number of frames
*/
void handleDataCompression(int ithread, char* wbuffer, uint64_t &nf);
/**
* Get Frame Number
* @param ithread writer thread index
* @param wbuffer writer buffer
* @param framenumber reference to the frame number
* @param packetnumber reference to the packet number
* @param subframenumber reference to the subframe number
* @oaram bunchid reference to the bunch id
* @return OK or FAIL
*/
int getFrameandPacketNumber(int ithread, char* wbuffer, uint64_t &framenumber, uint32_t &packetnumber, uint32_t &subframenumber, uint64_t &bunchid);
/**
* Find offset upto this frame number and write it to file
* @param ithread writer thread index
* @param wbuffer writer buffer
* @param offset reference of offset to look from and replaces offset to starting of nextframenumber
* @param nextFrameNumber frame number up to which data written
* @param numpackets number of packets in buffer
* @param numPacketsWritten number of packets written to file
*/
int writeUptoFrameNumber(int ithread, char* wbuffer, int &offset, uint64_t nextFrameNumber, uint32_t numpackets, int &numPacketsWritten);
/** function that returns the name variable from the receiver complete file name prefix
@param fname complete file name prefix
@returns file name
*/
string getNameFromReceiverFilePrefix(string fname);
/*************************************************************************
* Class Members *********************************************************
*************************************************************************/
/** Maximum Number of Writer Threads */
#ifdef DCOMPRESS
/**** most likely not used ***/
const static int MAX_NUMBER_OF_WRITER_THREADS = 15;
#else
const static int MAX_NUMBER_OF_WRITER_THREADS = 2;
#endif
//**detector parameters***
/*Detector Readout ID*/
int detID;
/** Size of 1 buffer processed at a time */
int bufferSize;
/** One Packet Size including headers */
int onePacketSize;
/** One Packet Size without headers */
int oneDataSize;
/** Frame Index Mask */
uint64_t frameIndexMask;
/** Frame Index Offset */
int frameIndexOffset;
/** Packet Index Mask */
uint64_t packetIndexMask;
/** Footer offset from start of Packet*/
int footerOffset;
/** variable to exclude missing packet */
bool excludeMissingPackets;
//***File parameters***
#ifdef MYROOT1
/** Tree where the hits are stored */
TTree *myTree[MAX_NUMBER_OF_WRITER_THREADS];
/** File where the tree is saved */
TFile *myFile[MAX_NUMBER_OF_WRITER_THREADS];
#endif
/** Complete File name */
char completeFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** File Name without frame index, file index and extension (_d0_f000000000000_8.raw)*/
char fileNamePerThread[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** Maximum Frames Per File **/
uint64_t maxFramesPerFile;
const static int progressFrequency = 10;
/** If file created successfully for all Writer Threads */
bool fileCreateSuccess;
/** File header */
const static unsigned int FILE_HEADER_SIZE = 500;
char fileHeader[MAX_NUMBER_OF_WRITER_THREADS][FILE_HEADER_SIZE];
/** File Descriptor */
FILE *sfilefd[MAX_NUMBER_OF_WRITER_THREADS];
#ifdef HDF5C
DataSpace *hdf5_dataspaceId[MAX_NUMBER_OF_WRITER_THREADS];
DataSet *hdf5_datasetId[MAX_NUMBER_OF_WRITER_THREADS];
H5File *hdf5_fileId[MAX_NUMBER_OF_WRITER_THREADS];
H5File* hdf5_masterFileId;
H5File* hdf5_virtualFileId;
DataType hdf5_datatype;
#endif
//***acquisition indices/count parameters***
/** Frame Number of First Frame of an entire Acquisition (including all scans) */
uint64_t startAcquisitionIndex;
/** Frame index at start of each real time acquisition (eg. for each scan) */
uint64_t startFrameIndex;
/** Actual current frame index of each time acquisition (eg. for each scan) */
uint64_t frameIndex[MAX_NUMBER_OF_WRITER_THREADS];
/** Current Frame Number */
uint64_t currentFrameNumber[MAX_NUMBER_OF_WRITER_THREADS];
/** Previous Frame number from buffer to calculate loss */
int64_t frameNumberInPreviousFile[MAX_NUMBER_OF_WRITER_THREADS];
/** Previous Frame number from last check to calculate loss */
int64_t frameNumberInPreviousCheck[MAX_NUMBER_OF_WRITER_THREADS];
/** total packet count from last check */
int64_t totalWritingPacketCountFromLastCheck[MAX_NUMBER_OF_WRITER_THREADS];
/** Pckets currently in current file, starts new file when it reaches max */
int64_t lastFrameNumberInFile[MAX_NUMBER_OF_WRITER_THREADS];
/** packets in current file */
uint64_t totalPacketsInFile[MAX_NUMBER_OF_WRITER_THREADS];
/**Total packet count written by each writing thread */
uint64_t totalWritingPacketCount[MAX_NUMBER_OF_LISTENING_THREADS];
/* Acquisition started */
bool acqStarted;
/* Measurement started - for each thread to get progress print outs*/
bool measurementStarted[MAX_NUMBER_OF_LISTENING_THREADS];
/** Total packet Count listened to by listening threads */
int totalListeningPacketCount[MAX_NUMBER_OF_LISTENING_THREADS];
/** Total packet Count ignored by listening threads */
int totalIgnoredPacketCount[MAX_NUMBER_OF_LISTENING_THREADS];
//***receiver parameters***
/** Receiver Buffer */
char *buffer[MAX_NUMBER_OF_LISTENING_THREADS];
/** Memory allocated */
char *mem0[MAX_NUMBER_OF_LISTENING_THREADS];
/** Circular fifo to point to addresses of data listened to */
CircularFifo<char>* fifo[MAX_NUMBER_OF_LISTENING_THREADS];
/** Circular fifo to point to address already written and freed, to be reused */
CircularFifo<char>* fifoFree[MAX_NUMBER_OF_LISTENING_THREADS];
/** UDP Sockets - Detector to Receiver */
genericSocket* udpSocket[MAX_NUMBER_OF_LISTENING_THREADS];
/** Number of Jobs Per Buffer */
int numberofJobsPerBuffer;
/** Total fifo size */
uint32_t fifoSize;
/** fifo buffer header size */
uint32_t fifoBufferHeaderSize;
/** Dummy Packet identifier value */
const static uint32_t dummyPacketValue = 0xFFFFFFFF;
//***receiver to GUI parameters***
/** Current Frame copied for GUI */
char* latestData[MAX_NUMBER_OF_WRITER_THREADS];
/** Pointer to file name to be sent to GUI */
char guiFileName[MAX_NUMBER_OF_WRITER_THREADS][MAX_STR_LENGTH];
/** Number of packets copied to be sent to gui (others padded) */
uint32_t guiNumPackets[MAX_NUMBER_OF_WRITER_THREADS];
/** Semaphore to synchronize Writer and GuiReader threads*/
sem_t writerGuiSemaphore[MAX_NUMBER_OF_WRITER_THREADS]; //datacompression, only first thread sends to gui
/** Semaphore to synchronize Writer and GuiReader threads*/
sem_t dataCallbackWriterSemaphore[MAX_NUMBER_OF_WRITER_THREADS]; //datacompression, only first thread sends to gui
/** counter for nth frame to gui */
int frametoGuiCounter[MAX_NUMBER_OF_WRITER_THREADS];
//***data call back thread parameters***
/** Ensures if zmq threads created successfully */
bool zmqThreadStarted;
/** Number of data callback Threads */
int numberofDataCallbackThreads;
/** Data Callback Threads */
pthread_t dataCallbackThreads[MAX_NUMBER_OF_LISTENING_THREADS];
/** Semaphores Synchronizing DataCallback Threads */
sem_t dataCallbackSemaphore[MAX_NUMBER_OF_LISTENING_THREADS];
/** Mask with each bit indicating status of each data callback thread */
volatile uint32_t dataCallbackThreadsMask;
/** Set to self-terminate data callback threads waiting for semaphores */
bool killAllDataCallbackThreads;
//***general and listening thread parameters***
/** Ensures if threads created successfully */
bool threadStarted;
/** Current Thread Index*/
int currentThreadIndex;
/** Number of Listening Threads */
int numberofListeningThreads;
/** Listening Threads */
pthread_t listeningThreads[MAX_NUMBER_OF_LISTENING_THREADS];
/** Semaphores Synchronizing Listening Threads */
sem_t listenSemaphore[MAX_NUMBER_OF_LISTENING_THREADS];
/** Mask with each bit indicating status of each listening thread */
volatile uint32_t listeningThreadsMask;
/** Set to self-terminate listening threads waiting for semaphores */
bool killAllListeningThreads;
//***writer thread parameters***
/** Number of Writer Threads */
int numberofWriterThreads;
/** Writer Threads */
pthread_t writingThreads[MAX_NUMBER_OF_WRITER_THREADS];
/** Semaphores Synchronizing Writer Threads */
sem_t writerSemaphore[MAX_NUMBER_OF_WRITER_THREADS];
/** Mask with each bit indicating status of each writer thread */
volatile uint32_t writerThreadsMask;
/** Mask with each bit indicating file created for each writer thread*/
volatile uint32_t createFileMask;
/** Set to self-terminate writer threads waiting for semaphores */
bool killAllWritingThreads;
//***data shape ***
/** Number of pixels in x axis */
int NX;
/** Number of pixels in y axis */
int NY;
int TILE_NX;
int TILE_NY;
//***filter parameters***
/** Common Mode Subtraction Enable FIXME: Always false, only moench uses, Ask Anna */
bool commonModeSubtractionEnable;
/** Moench Common Mode Subtraction */
moenchCommonMode *moenchCommonModeSubtraction;
/** Single Photon Detector Object for each writer thread */
singlePhotonDetector<uint16_t> *singlePhotonDetectorObject[MAX_NUMBER_OF_WRITER_THREADS];
/** Receiver Data Object for each writer thread */
slsReceiverData<uint16_t> *receiverData[MAX_NUMBER_OF_WRITER_THREADS];
//***mutex***
/** Status mutex */
pthread_mutex_t statusMutex;
/** Writing mutex */
pthread_mutex_t writeMutex;
/** GuiDataReady Mutex */
pthread_mutex_t dataReadyMutex;
/** Progress (currentFrameNumber) Mutex */
pthread_mutex_t progressMutex;
//***callback***
/** The action which decides what the user and default responsibilities to save data are
* 0 raw data ready callback takes care of open,close,write file
* 1 callback writes file, we have to open, close it
* 2 we open, close, write file, callback does not do anything */
int cbAction;
};
#endif
//#endif

View File

@ -1,5 +1,5 @@
//#ifndef __LOG_H__
//#define __LOG_H__
#ifndef __LOG_H__
#define __LOG_H__
#include <sstream>
#include <string>
@ -243,4 +243,4 @@ inline void Output2FILE::Output(const std::string& msg, TLogLevel level)
//#endif //__LOG_H__
#endif //__LOG_H__

View File

@ -5,6 +5,144 @@
#include <stdint.h>
#define GOODBYE -200
//local network parameters
#define RECEIVE_SOCKET_BUFFER_SIZE (100*1024*1024)
#define MAX_SOCKET_INPUT_PACKET_QUEUE 250000
//files
#define DO_NOTHING 0
#define CREATE_FILES 1
#define DO_EVERYTHING 2
//binary
#define FILE_BUF_SIZE (16*1024*1024) //16mb
#define FILE_FRAME_HEADER_LENGTH 16
#define FILE_HEADER_BUNCHID_OFFSET 8
//hdf5
#ifdef HDF5C
#define HDF5_WRITER_VERSION 1.0
#define MAX_CHUNKED_IMAGES 1
#define MAX_IMAGES_IN_DATASET 1000
#endif
#define HEADER_SIZE_NUM_TOT_PACKETS 4
#define SAMPLE_TIME_IN_NS 100000000//100ms
#define MAX_JOBS_PER_THREAD 1000
#define ALL_MASK_32 0xFFFFFFFF
//gottard
#define GOTTHARD_FIFO_SIZE 25000 //cannot be less than max jobs per thread = 1000
#define GOTTHARD_PIXELS_IN_ROW 1280
#define GOTTHARD_PIXELS_IN_COL 1
#define GOTTHARD_PACKETS_PER_FRAME 2
#define GOTTHARD_ONE_PACKET_SIZE 1286
#define GOTTHARD_ONE_DATA_SIZE 1280
#define GOTTHARD_BUFFER_SIZE (GOTTHARD_ONE_PACKET_SIZE*GOTTHARD_PACKETS_PER_FRAME) //1286*2
#define GOTTHARD_DATA_BYTES (GOTTHARD_ONE_DATA_SIZE*GOTTHARD_PACKETS_PER_FRAME) //1280*2
#define GOTTHARD_FRAME_INDEX_MASK 0xFFFFFFFE
#define GOTTHARD_FRAME_INDEX_OFFSET 1
#define GOTTHARD_PACKET_INDEX_MASK 0x1
//short gotthard
#define GOTTHARD_SHORT_PIXELS_IN_ROW 256
#define GOTTHARD_SHORT_PIXELS_IN_COL 1
#define GOTTHARD_SHORT_PACKETS_PER_FRAME 1
#define GOTTHARD_SHORT_ONE_PACKET_SIZE 518
#define GOTTHARD_SHORT_ONE_DATA_SIZE 512
#define GOTTHARD_SHORT_BUFFER_SIZE (GOTTHARD_SHORT_ONE_PACKET_SIZE*GOTTHARD_SHORT_PACKETS_PER_FRAME)//518*1
#define GOTTHARD_SHORT_DATABYTES (GOTTHARD_SHORT_ONE_DATA_SIZE*GOTTHARD_SHORT_PACKETS_PER_FRAME) //512*1
#define GOTTHARD_SHORT_FRAME_INDEX_MASK 0xFFFFFFFF
#define GOTTHARD_SHORT_FRAME_INDEX_OFFSET 0
#define GOTTHARD_SHORT_PACKET_INDEX_MASK 0
//propix
#define PROPIX_FIFO_SIZE 25000 //cannot be less than max jobs per thread = 1000
#define PROPIX_PIXELS_IN_ROW 22
#define PROPIX_PIXELS_IN_COL 22
#define PROPIX_PACKETS_PER_FRAME 2
#define PROPIX_ONE_PACKET_SIZE 1286
#define PROPIX_ONE_DATA_SIZE 1280
#define PROPIX_BUFFER_SIZE (PROPIX_ONE_PACKET_SIZE*PROPIX_PACKETS_PER_FRAME) //1286*2
#define PROPIX_DATABYTES_PER_PIXEL 2
//#define PROPIX_DATA_BYTES (1280*PROPIX_PACKETS_PER_FRAME) //1280*2
#define PROPIX_DATA_BYTES (PROPIX_PIXELS_IN_ROW * PROPIX_PIXELS_IN_COL * PROPIX_DATABYTES_PER_PIXEL) //22 * 22 * 2
#define PROPIX_FRAME_INDEX_MASK 0xFFFFFFFE
#define PROPIX_FRAME_INDEX_OFFSET 1
#define PROPIX_PACKET_INDEX_MASK 0x1
//moench
#define MOENCH_FIFO_SIZE 2500 //cannot be less than max jobs per thread = 1000
#define MOENCH_BYTES_IN_ONE_ROW (MOENCH_PIXELS_IN_ONE_ROW*2)
#define MOENCH_BYTES_PER_ADC (40*2)
#define MOENCH_PIXELS_IN_ONE_ROW 160
#define MOENCH_PACKETS_PER_FRAME 40
#define MOENCH_ONE_PACKET_SIZE 1286
#define MOENCH_ONE_DATA_SIZE 1280
#define MOENCH_BUFFER_SIZE (MOENCH_ONE_PACKET_SIZE*MOENCH_PACKETS_PER_FRAME) //1286*40
#define MOENCH_DATA_BYTES (MOENCH_ONE_DATA_SIZE*MOENCH_PACKETS_PER_FRAME) //1280*40
#define MOENCH_FRAME_INDEX_MASK 0xFFFFFF00
#define MOENCH_FRAME_INDEX_OFFSET 8
#define MOENCH_PACKET_INDEX_MASK 0xFF
//jungfrau
#define JFRAU_FIFO_SIZE 2500
#define JFRAU_PIXELS_IN_ONE_ROW (256*4)
#define JFRAU_PIXELS_IN_ONE_COL (256)
#define JFRAU_BYTES_IN_ONE_ROW (JFRAU_PIXELS_IN_ONE_ROW*2)
#define JFRAU_PACKETS_PER_FRAME 128
#define JFRAU_HEADER_LENGTH 22
#define JFRAU_ONE_DATA_SIZE 8192
#define JFRAU_ONE_PACKET_SIZE (JFRAU_HEADER_LENGTH+JFRAU_ONE_DATA_SIZE) //8214
#define JFRAU_BUFFER_SIZE (JFRAU_ONE_PACKET_SIZE*JFRAU_PACKETS_PER_FRAME) //8214*128
#define JFRAU_DATA_BYTES (JFRAU_ONE_DATA_SIZE*JFRAU_PACKETS_PER_FRAME) //8192*128
#define JFRAU_FRAME_INDEX_MASK 0xffffff //mask after using struct (48 bit)
#define JFRAU_FRAME_INDEX_OFFSET 0x0 //Not Applicable, use struct
#define JFRAU_PACKET_INDEX_MASK 0x0 //Not Applicable, use struct
//jungrau chip test board
#define JCTB_FIFO_SIZE 2500
#define JCTB_PIXELS_IN_ONE_ROW 32
#define JCTB_BYTES_IN_ONE_ROW (JCTB_PIXELS_IN_ONE_ROW*2)
#define JCTB_BYTES_PER_ADC (2)
#define JCTB_PACKETS_PER_FRAME 1
#define JCTB_ONE_PACKET_SIZE 8224
#define JCTB_BUFFER_SIZE (JCTB_ONE_PACKET_SIZE*40)
#define JCTB_DATA_BYTES (8192*JCTB_PACKETS_PER_FRAME)
#define JCTB_FRAME_INDEX_MASK 0xFFFFFFFF
#define JCTB_FRAME_INDEX_OFFSET 6+8
#define JCTB_PACKET_INDEX_MASK 0xFFFFFFFF
//eiger
#define EIGER_FIFO_SIZE 100
#define EIGER_PIXELS_IN_ONE_ROW (256*2)
#define EIGER_PIXELS_IN_ONE_COL (256)
#define EIGER_PORTS_PER_READOUT 2
#define EIGER_HEADER_PACKET_LENGTH 48
#define EIGER_ONE_GIGA_CONSTANT 16
#define EIGER_TEN_GIGA_CONSTANT 4
#define EIGER_ONE_GIGA_ONE_PACKET_SIZE 1040
#define EIGER_ONE_GIGA_ONE_DATA_SIZE 1024
#define EIGER_TEN_GIGA_ONE_PACKET_SIZE 4112
#define EIGER_TEN_GIGA_ONE_DATA_SIZE 4096
#define EIGER_DATA_PACKET_HEADER_SIZE 8
#define EIGER_FRAME_INDEX_MASK 0xFFFFFFFF //32 bit for now
#define EIGER_FRAME_INDEX_OFFSET 0
#define EIGER_PACKET_INDEX_MASK 0x0
//data structures
/**
* structure of an eiger packet header
* subframenum subframe number for 32 bit mode (already written by firmware)
@ -43,127 +181,4 @@ typedef struct {
unsigned char bunchid[8];
} jfrau_packet_header_t;
#define HDF5_WRITER_VERSION 1.0
#define GOODBYE -200
#define DO_NOTHING 0
#define CREATE_FILES 1
#define DO_EVERYTHING 2
#define HEADER_SIZE_NUM_TOT_PACKETS 4
#define SAMPLE_TIME_IN_NS 100000000//100ms
#define MAX_JOBS_PER_THREAD 1000
#define ALL_MASK_32 0xFFFFFFFF
//binary file size and file header
#define FILE_BUF_SIZE (16*1024*1024) //16mb
#define FILE_FRAME_HEADER_LENGTH 16
#define FILE_HEADER_BUNCHID_OFFSET 8
//HDF5
#define MAX_CHUNKED_IMAGES 1
#define MAX_IMAGES_IN_DATASET 1000
//all max frames per file defined in sls_receiver_defs.h
#define GOTTHARD_FIFO_SIZE 25000 //cannot be less than max jobs per thread = 1000
#define GOTTHARD_PIXELS_IN_ROW 1280
#define GOTTHARD_PIXELS_IN_COL 1
#define GOTTHARD_PACKETS_PER_FRAME 2
#define GOTTHARD_ONE_PACKET_SIZE 1286
#define GOTTHARD_ONE_DATA_SIZE 1280
#define GOTTHARD_BUFFER_SIZE (GOTTHARD_ONE_PACKET_SIZE*GOTTHARD_PACKETS_PER_FRAME) //1286*2
#define GOTTHARD_DATA_BYTES (GOTTHARD_ONE_DATA_SIZE*GOTTHARD_PACKETS_PER_FRAME) //1280*2
#define GOTTHARD_FRAME_INDEX_MASK 0xFFFFFFFE
#define GOTTHARD_FRAME_INDEX_OFFSET 1
#define GOTTHARD_PACKET_INDEX_MASK 0x1
//short
#define GOTTHARD_SHORT_PIXELS_IN_ROW 256
#define GOTTHARD_SHORT_PIXELS_IN_COL 1
#define GOTTHARD_SHORT_PACKETS_PER_FRAME 1
#define GOTTHARD_SHORT_ONE_PACKET_SIZE 518
#define GOTTHARD_SHORT_ONE_DATA_SIZE 512
#define GOTTHARD_SHORT_BUFFER_SIZE (GOTTHARD_SHORT_ONE_PACKET_SIZE*GOTTHARD_SHORT_PACKETS_PER_FRAME)//518*1
#define GOTTHARD_SHORT_DATABYTES (GOTTHARD_SHORT_ONE_DATA_SIZE*GOTTHARD_SHORT_PACKETS_PER_FRAME) //512*1
#define GOTTHARD_SHORT_FRAME_INDEX_MASK 0xFFFFFFFF
#define GOTTHARD_SHORT_FRAME_INDEX_OFFSET 0
#define GOTTHARD_SHORT_PACKET_INDEX_MASK 0
#define PROPIX_FIFO_SIZE 25000 //cannot be less than max jobs per thread = 1000
#define PROPIX_PIXELS_IN_ROW 22
#define PROPIX_PIXELS_IN_COL 22
#define PROPIX_PACKETS_PER_FRAME 2
#define PROPIX_ONE_PACKET_SIZE 1286
#define PROPIX_ONE_DATA_SIZE 1280
#define PROPIX_BUFFER_SIZE (PROPIX_ONE_PACKET_SIZE*PROPIX_PACKETS_PER_FRAME) //1286*2
#define PROPIX_DATABYTES_PER_PIXEL 2
//#define PROPIX_DATA_BYTES (1280*PROPIX_PACKETS_PER_FRAME) //1280*2
#define PROPIX_DATA_BYTES (PROPIX_PIXELS_IN_ROW * PROPIX_PIXELS_IN_COL * PROPIX_DATABYTES_PER_PIXEL) //22 * 22 * 2
#define PROPIX_FRAME_INDEX_MASK 0xFFFFFFFE
#define PROPIX_FRAME_INDEX_OFFSET 1
#define PROPIX_PACKET_INDEX_MASK 0x1
#define MOENCH_FIFO_SIZE 2500 //cannot be less than max jobs per thread = 1000
#define MOENCH_BYTES_IN_ONE_ROW (MOENCH_PIXELS_IN_ONE_ROW*2)
#define MOENCH_BYTES_PER_ADC (40*2)
#define MOENCH_PIXELS_IN_ONE_ROW 160
#define MOENCH_PACKETS_PER_FRAME 40
#define MOENCH_ONE_PACKET_SIZE 1286
#define MOENCH_ONE_DATA_SIZE 1280
#define MOENCH_BUFFER_SIZE (MOENCH_ONE_PACKET_SIZE*MOENCH_PACKETS_PER_FRAME) //1286*40
#define MOENCH_DATA_BYTES (MOENCH_ONE_DATA_SIZE*MOENCH_PACKETS_PER_FRAME) //1280*40
#define MOENCH_FRAME_INDEX_MASK 0xFFFFFF00
#define MOENCH_FRAME_INDEX_OFFSET 8
#define MOENCH_PACKET_INDEX_MASK 0xFF
#define JFRAU_FIFO_SIZE 2500 //cannot be less than max jobs per thread = 1000
#define JFRAU_PIXELS_IN_ONE_ROW (256*4)
#define JFRAU_PIXELS_IN_ONE_COL (256)
#define JFRAU_BYTES_IN_ONE_ROW (JFRAU_PIXELS_IN_ONE_ROW*2)
#define JFRAU_PACKETS_PER_FRAME 128
#define JFRAU_HEADER_LENGTH 22
#define JFRAU_ONE_DATA_SIZE 8192
#define JFRAU_ONE_PACKET_SIZE (JFRAU_HEADER_LENGTH+JFRAU_ONE_DATA_SIZE) //8214
#define JFRAU_BUFFER_SIZE (JFRAU_ONE_PACKET_SIZE*JFRAU_PACKETS_PER_FRAME) //8214*128
#define JFRAU_DATA_BYTES (JFRAU_ONE_DATA_SIZE*JFRAU_PACKETS_PER_FRAME) //8192*128
#define JFRAU_FRAME_INDEX_MASK 0xffffff //mask after using struct (48 bit)
#define JFRAU_FRAME_INDEX_OFFSET 0x0 //Not Applicable, use struct
#define JFRAU_PACKET_INDEX_MASK 0x0 //Not Applicable, use struct
#define JCTB_FIFO_SIZE 2500 //cannot be less than max jobs per thread = 1000
#define JCTB_PIXELS_IN_ONE_ROW 32
#define JCTB_BYTES_IN_ONE_ROW (JCTB_PIXELS_IN_ONE_ROW*2)
#define JCTB_BYTES_PER_ADC (2)
#define JCTB_PACKETS_PER_FRAME 1
#define JCTB_ONE_PACKET_SIZE 8224
#define JCTB_BUFFER_SIZE (JCTB_ONE_PACKET_SIZE*40)
#define JCTB_DATA_BYTES (8192*JCTB_PACKETS_PER_FRAME)
#define JCTB_FRAME_INDEX_MASK 0xFFFFFFFF
#define JCTB_FRAME_INDEX_OFFSET 6+8
#define JCTB_PACKET_INDEX_MASK 0xFFFFFFFF
#define EIGER_FIFO_SIZE 100
#define EIGER_PIXELS_IN_ONE_ROW (256*2)
#define EIGER_PIXELS_IN_ONE_COL (256)
#define EIGER_MAX_PORTS 2
#define EIGER_HEADER_PACKET_LENGTH 48
#define EIGER_ONE_GIGA_CONSTANT 16
#define EIGER_TEN_GIGA_CONSTANT 4
#define EIGER_ONE_GIGA_ONE_PACKET_SIZE 1040
#define EIGER_ONE_GIGA_ONE_DATA_SIZE 1024
#define EIGER_TEN_GIGA_ONE_PACKET_SIZE 4112
#define EIGER_TEN_GIGA_ONE_DATA_SIZE 4096
#define EIGER_DATA_PACKET_HEADER_SIZE 8
#define EIGER_FRAME_INDEX_MASK 0xFFFFFFFF //32 bit for now
#define EIGER_FRAME_INDEX_OFFSET 0
#define EIGER_PACKET_INDEX_MASK 0x0
#endif

View File

@ -0,0 +1,22 @@
/************************************************
* @file BinaryFileWriter.h
* @short sets/gets properties for the binary file,
* creates/closes the file and writes data to it
***********************************************/
#include "BinaryFileWriter.h"
#include <iostream>
using namespace std;
BinaryFileWriter::BinaryFileWriter(char* fname):
FileWriter(fname) {
}
BinaryFileWriter::~BinaryFileWriter() {
}

View File

@ -0,0 +1,112 @@
/************************************************
* @file DataProcessor.h
* @short creates data processor thread that
* pulls pointers to memory addresses from fifos
* and processes data stored in them & writes them to file
***********************************************/
#include "DataProcessor.h"
#include "Fifo.h"
#include <iostream>
#include <cstring>
using namespace std;
const string DataProcessor::TypeName = "DataProcessor";
int DataProcessor::NumberofDataProcessors(0);
uint64_t DataProcessor::ErrorMask(0x0);
uint64_t DataProcessor::RunningMask(0x0);
pthread_mutex_t DataProcessor::Mutex = PTHREAD_MUTEX_INITIALIZER;
DataProcessor::DataProcessor(Fifo*& f) : ThreadObject(NumberofDataProcessors), fifo(f) {
FILE_LOG(logDEBUG) << __AT__ << " called";
if(ThreadObject::CreateThread()){
pthread_mutex_lock(&Mutex);
ErrorMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
}
NumberofDataProcessors++;
FILE_LOG(logDEBUG) << "Number of DataProcessors: " << NumberofDataProcessors << endl;
}
DataProcessor::~DataProcessor() {
FILE_LOG(logDEBUG) << __AT__ << " called";
ThreadObject::DestroyThread();
NumberofDataProcessors--;
}
/** static functions */
uint64_t DataProcessor::GetErrorMask() {
FILE_LOG(logDEBUG) << __AT__ << " called";
return ErrorMask;
}
void DataProcessor::ResetRunningMask() {
FILE_LOG(logDEBUG) << __AT__ << " called";
pthread_mutex_lock(&Mutex);
RunningMask = 0x0;
pthread_mutex_unlock(&Mutex);
}
/** non static functions */
string DataProcessor::GetType(){
return TypeName;
}
bool DataProcessor::IsRunning() {
FILE_LOG(logDEBUG) << __AT__ << " called";
return ((1 << index) & RunningMask);
}
void DataProcessor::StartRunning() {
FILE_LOG(logDEBUG) << __AT__ << " called";
pthread_mutex_lock(&Mutex);
RunningMask |= (1<<index);
pthread_mutex_unlock(&Mutex);
}
void DataProcessor::StopRunning() {
FILE_LOG(logDEBUG) << __AT__ << " called";
pthread_mutex_lock(&Mutex);
RunningMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
}
void DataProcessor::SetFifo(Fifo*& f) {
FILE_LOG(logDEBUG) << __AT__ << " called";
fifo = f;
}
void DataProcessor::ThreadExecution() {
FILE_LOG(logDEBUG) << __AT__ << " called";
char* buffer=0;
fifo->PopAddress(buffer);
#ifdef FIFODEBUG
cprintf(BLUE,"DataProcessor %d, pop 0x%p buffer:%s\n", index,(void*)(buffer),buffer);
#endif
if(!strcmp(buffer,"done")){
StopRunning();
}
fifo->FreeAddress(buffer);
}

View File

@ -0,0 +1,94 @@
/************************************************
* @file DataStreamer.h
* @short streams data from receiver via ZMQ
***********************************************/
#include "DataStreamer.h"
#include <iostream>
using namespace std;
const string DataStreamer::TypeName = "DataStreamer";
int DataStreamer::NumberofDataStreamers(0);
uint64_t DataStreamer::ErrorMask(0x0);
uint64_t DataStreamer::RunningMask(0x0);
pthread_mutex_t DataStreamer::Mutex = PTHREAD_MUTEX_INITIALIZER;
DataStreamer::DataStreamer() : ThreadObject(NumberofDataStreamers) {
FILE_LOG(logDEBUG) << __AT__ << " called";
if(ThreadObject::CreateThread()){
pthread_mutex_lock(&Mutex);
ErrorMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
}
NumberofDataStreamers++;
FILE_LOG(logDEBUG) << "Number of DataStreamers: " << NumberofDataStreamers << endl;
}
DataStreamer::~DataStreamer() {
FILE_LOG(logDEBUG) << __AT__ << " called";
ThreadObject::DestroyThread();
NumberofDataStreamers--;
}
/** static functions */
uint64_t DataStreamer::GetErrorMask() {
FILE_LOG(logDEBUG) << __AT__ << " called";
return ErrorMask;
}
void DataStreamer::ResetRunningMask() {
FILE_LOG(logDEBUG) << __AT__ << " called";
pthread_mutex_lock(&Mutex);
RunningMask = 0x0;
pthread_mutex_unlock(&Mutex);
}
/** non static functions */
string DataStreamer::GetType(){
return TypeName;
}
bool DataStreamer::IsRunning() {
FILE_LOG(logDEBUG) << __AT__ << " called";
return ((1 << index) & RunningMask);
}
void DataStreamer::StartRunning() {
FILE_LOG(logDEBUG) << __AT__ << " called";
pthread_mutex_lock(&Mutex);
RunningMask |= (1<<index);
pthread_mutex_unlock(&Mutex);
}
void DataStreamer::StopRunning() {
FILE_LOG(logDEBUG) << __AT__ << " called";
pthread_mutex_lock(&Mutex);
RunningMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
}
void DataStreamer::ThreadExecution() {
FILE_LOG(logDEBUG) << __AT__ << " called";
}

View File

@ -0,0 +1,100 @@
/************************************************
* @file Fifo.h
* @short constructs the fifo structure
* which is a circular buffer with pointers to
* parts of allocated memory
***********************************************/
#include "Fifo.h"
#include <iostream>
#include <cstdlib>
using namespace std;
int Fifo::NumberofFifoClassObjects(0);
Fifo::Fifo(uint32_t fifoItemSize, uint32_t fifoDepth, bool &success):
memory(0),
fifoBound(0),
fifoFree(0) {
FILE_LOG(logDEBUG) << __AT__ << " called";
index = NumberofFifoClassObjects++;
if(CreateFifos(fifoItemSize, fifoDepth) == FAIL)
success = false;
}
Fifo::~Fifo() {
FILE_LOG(logDEBUG) << __AT__ << " called";
DestroyFifos();
NumberofFifoClassObjects--;
}
int Fifo::CreateFifos(uint32_t fifoItemSize, uint32_t fifoDepth) {
FILE_LOG(logDEBUG) << __AT__ << " called";
//destroy if not already
DestroyFifos();
//create fifos
fifoBound = new CircularFifo<char>(fifoDepth);
fifoFree = new CircularFifo<char>(fifoDepth);
//allocate memory
memory = (char*) calloc (fifoItemSize * fifoDepth, sizeof(char));
if (memory == NULL){
FILE_LOG(logERROR) << "Could not allocate memory for fifos";
return FAIL;
}
{ //push free addresses into fifoFree fifo
char* buffer = memory;
while (buffer < (memory + fifoItemSize * (fifoDepth-1))) {
sprintf(buffer,"memory");
#ifdef FIFODEBUG
cprintf(MAGENTA,"Fifofree %d: value:%d, pop 0x%p\n", index, fifoFree->getSemValue(), (void*)(buffer));
#endif
FreeAddress(buffer);
buffer += fifoItemSize;
}
}
FILE_LOG(logINFO) << "Fifo Structure " << index << " reconstructed";
return OK;
}
void Fifo::DestroyFifos(){
FILE_LOG(logDEBUG) << __AT__ << " called";
if (fifoBound) {
delete fifoBound;
fifoBound = 0;
}
if (fifoFree) {
delete fifoFree;
fifoFree = 0;
}
if(memory) {
free(memory);
memory = 0;
}
}
void Fifo::GetNewAddress(char*& address) {
fifoFree->pop(address);
}
void Fifo::FreeAddress(char*& address) {
while(!fifoFree->push(address));
}
void Fifo::PushAddress(char*& address) {
while(!fifoBound->push(address));
}
void Fifo::PopAddress(char*& address) {
fifoBound->pop(address);
}

View File

@ -0,0 +1,19 @@
/********************************************//**
* @file FileWriter.cpp
* @short sets/gets properties for the file, creates/closes the file and writes data to it
***********************************************/
#include "FileWriter.h"
#include <iostream>
using namespace std;
FileWriter::FileWriter(){}
FileWriter::~FileWriter() {}

View File

@ -0,0 +1,24 @@
/************************************************
* @file FileWriter.h
* @short sets/gets properties for the file,
* creates/closes the file and writes data to it
***********************************************/
#include "FileWriter.h"
#include <iostream>
using namespace std;
FileWriter::FileWriter(char* fname):
fileName(fname) {
cout<<"fileName:"<<fileName<<endl;
}
FileWriter::~FileWriter() {
}
char* FileWriter::GetFileName() {
return fileName;
}

View File

@ -0,0 +1,23 @@
/************************************************
* @file HDF5FileWriter.h
* @short sets/gets properties for the HDF5 file,
* creates/closes the file and writes data to it
***********************************************/
#include "HDF5FileWriter.h"
#include <iostream>
using namespace std;
HDF5FileWriter::HDF5FileWriter(char* fname):
FileWriter(fname) {
}
HDF5FileWriter::~HDF5FileWriter() {
}

View File

@ -0,0 +1,118 @@
/************************************************
* @file Listener.cpp
* @short creates the listener thread that
* listens to udp sockets, writes data to memory
* & puts pointers to their memory addresses into fifos
***********************************************/
#include "Listener.h"
#include "Fifo.h"
#include <iostream>
#include <cstring>
using namespace std;
const string Listener::TypeName = "Listener";
int Listener::NumberofListeners(0);
uint64_t Listener::ErrorMask(0x0);
uint64_t Listener::RunningMask(0x0);
pthread_mutex_t Listener::Mutex = PTHREAD_MUTEX_INITIALIZER;
Listener::Listener(Fifo*& f) : ThreadObject(NumberofListeners), fifo(f) {
FILE_LOG(logDEBUG) << __AT__ << " called";
if(ThreadObject::CreateThread()){
pthread_mutex_lock(&Mutex);
ErrorMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
}
count = 0;
NumberofListeners++;
FILE_LOG(logDEBUG) << "Number of Listeners: " << NumberofListeners << endl;
}
Listener::~Listener() {
FILE_LOG(logDEBUG) << __AT__ << " called";
ThreadObject::DestroyThread();
NumberofListeners--;
}
/** static functions */
uint64_t Listener::GetErrorMask() {
FILE_LOG(logDEBUG) << __AT__ << " called";
return ErrorMask;
}
void Listener::ResetRunningMask() {
FILE_LOG(logDEBUG) << __AT__ << " called";
pthread_mutex_lock(&Mutex);
RunningMask = 0x0;
pthread_mutex_unlock(&Mutex);
}
/** non static functions */
string Listener::GetType(){
return TypeName;
}
bool Listener::IsRunning() {
FILE_LOG(logDEBUG) << __AT__ << " called";
return ((1 << index) & RunningMask);
}
void Listener::StartRunning() {
FILE_LOG(logDEBUG) << __AT__ << " called";
pthread_mutex_lock(&Mutex);
RunningMask |= (1<<index);
pthread_mutex_unlock(&Mutex);
}
void Listener::StopRunning() {
FILE_LOG(logDEBUG) << __AT__ << " called";
pthread_mutex_lock(&Mutex);
RunningMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
}
void Listener::SetFifo(Fifo*& f) {
FILE_LOG(logDEBUG) << __AT__ << " called";
fifo = f;
}
void Listener::ThreadExecution() {
FILE_LOG(logDEBUG) << __AT__ << " called";
char* buffer;
fifo->GetNewAddress(buffer);
#ifdef FIFODEBUG
cprintf(GREEN,"Listener %d, pop 0x%p buffer:%s\n", index,(void*)(buffer),buffer);
#endif
strcpy(buffer,"changed");
if(count == 3){
strcpy(buffer,"done\0");
StopRunning();
}
fifo->PushAddress(buffer);
count++;
}

View File

@ -0,0 +1,113 @@
/************************************************
* @file ThreadObject.h
* @short creates/destroys a thread
***********************************************/
#include "ThreadObject.h"
#include <iostream>
using namespace std;
ThreadObject::ThreadObject(int ind):
index(ind),
alive(false),
killThread(false),
thread(0) {
FILE_LOG(logDEBUG) << __AT__ << " called";
PrintMembers();
}
ThreadObject::~ThreadObject() {
FILE_LOG(logDEBUG) << __AT__ << " called";
DestroyThread();
}
void ThreadObject::PrintMembers() {
FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG) << "Index : " << index
<< "\nalive: " << alive
<< "\nkillThread: " << killThread
<< "\npthread: " << thread;
}
void ThreadObject::DestroyThread() {
FILE_LOG(logDEBUG) << __AT__ << " called";
if(alive){
killThread = true;
sem_post(&semaphore);
pthread_join(thread,NULL);
sem_destroy(&semaphore);
killThread = false;
alive = false;
FILE_LOG(logDEBUG) << GetType() << " thread with index " << index << " destroyed successfully.";
}
}
int ThreadObject::CreateThread() {
FILE_LOG(logDEBUG) << __AT__ << " called";
if(alive){
FILE_LOG(logERROR) << "Cannot create thread " << index << ". Already alive";
return FAIL;
}
sem_init(&semaphore,1,0);
killThread = false;
if(pthread_create(&thread, NULL,StartThread, (void*) this)){
FILE_LOG(logERROR) << "Could not create " << GetType() << " thread with index " << index;
return FAIL;
}
alive = true;
FILE_LOG(logINFO) << GetType() << " thread " << index << " created successfully.";
return OK;
}
void* ThreadObject::StartThread(void* thisPointer) {
FILE_LOG(logDEBUG) << __AT__ << " called";
((ThreadObject*)thisPointer)->RunningThread();
return thisPointer;
}
void ThreadObject::RunningThread() {
FILE_LOG(logDEBUG) << __AT__ << " called";
while(true) {
while(IsRunning()) {
ThreadExecution();
}/*--end of inner loop */
//wait till the next acquisition
sem_wait(&semaphore);
if(killThread) {
cprintf(BLUE,"%s Thread %d: Goodbye\n",GetType().c_str(),index);
pthread_exit(NULL);
}
}/*--end of loop for each acquisition (outer loop) */
}
void ThreadObject::Continue() {
sem_post(&semaphore);
}

View File

@ -191,7 +191,7 @@ uint32_t UDPBaseImplementation::getFrameToGuiFrequency() const{ FILE_LOG(logDEBU
uint32_t UDPBaseImplementation::getFrameToGuiTimer() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return frameToGuiTimerinMS;}
uint32_t UDPBaseImplementation::getDataStreamEnable() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return dataStreamEnable;}
bool UDPBaseImplementation::getDataStreamEnable() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return dataStreamEnable;}
uint64_t UDPBaseImplementation::getAcquisitionPeriod() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return acquisitionPeriod;}
@ -342,11 +342,13 @@ void UDPBaseImplementation::setEthernetInterface(const char* c){
/***acquisition parameters***/
void UDPBaseImplementation::setShortFrameEnable(const int i){
int UDPBaseImplementation::setShortFrameEnable(const int i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
shortFrameEnable = i;
FILE_LOG(logINFO) << "Short Frame Enable: " << stringEnable(shortFrameEnable);
//overrridden child classes might return FAIL
return OK;
}
int UDPBaseImplementation::setFrameToGuiFrequency(const uint32_t freq){
@ -367,7 +369,7 @@ void UDPBaseImplementation::setFrameToGuiTimer(const uint32_t time_in_ms){
}
uint32_t UDPBaseImplementation::setDataStreamEnable(const uint32_t enable){
int UDPBaseImplementation::setDataStreamEnable(const bool enable){
FILE_LOG(logDEBUG) << __AT__ << " starting";
dataStreamEnable = enable;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -101,7 +101,7 @@ int main(int argc, char *argv[]) {
//receiver->registerCallBackRawDataReady(rawDataReadyCallBack,NULL);
/*
//start tcp server thread
if(receiver->start() == slsReceiverDefs::OK){
FILE_LOG(logDEBUG1) << "DONE!" << endl;
@ -113,7 +113,7 @@ int main(int argc, char *argv[]) {
//stop tcp server thread, stop udp socket
receiver->stop();
}
*/
deleteReceiver(receiver);
cout << "Goodbye!" << endl;
return 0;

View File

@ -127,6 +127,7 @@ slsReceiver::slsReceiver(int argc, char *argv[], int &success){
udp_interface = UDPInterface::create(udp_interface_type);
udp_interface->configure(configuration_map);
#endif
udp_interface = UDPInterface::create("standard");
tcpipInterface = new slsReceiverTCPIPInterface(success, udp_interface, tcpip_port_no);
}
}

View File

@ -23,8 +23,8 @@ using namespace std;
slsReceiverTCPIPInterface::~slsReceiverTCPIPInterface() {
stop();
if(mySock) {delete mySock; mySock=NULL;}
/*stop();
if(mySock) {delete mySock; mySock=NULL;}*/
}
slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface* rbase, int pn):
@ -58,7 +58,7 @@ slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface*
success=OK;
//create socket
/*//create socket
if(success == OK){
mySock = new MySocketTCP(port_no);
if (mySock->getErrorStatus()) {
@ -76,7 +76,7 @@ slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface*
cout << "Function table assigned." << endl;
#endif
}
}
}*/
}
@ -2927,8 +2927,8 @@ int slsReceiverTCPIPInterface::set_fifo_depth() {
ret=FAIL;
}else{
retval = receiverBase->getFifoDepth();
if(value >= 0 && retval != value)
ret = FAIL;
/*if(value >= 0 && retval != value)
ret = FAIL;*/
}
}