Merge branch 'developer' into separateRxr

This commit is contained in:
maliakal_d 2020-04-20 18:33:37 +02:00
commit e39ec65d19
22 changed files with 267 additions and 457 deletions

View File

@ -898,7 +898,7 @@ int Feb_Control_SendDACValue(unsigned int dst_num, unsigned int ch, unsigned int
int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int *trimbits) {
int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int *trimbits, int top) {
LOG(logINFO, ("Setting Trimbits\n"));
//for (int iy=10000;iy<20020;++iy)//263681
@ -963,7 +963,7 @@ int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int *trimbits) {
int i;
for(i=0;i<8;i++) { // column loop i
if (Module_TopAddressIsValid(&modules[1])) {
if (top==1) {
trimbits_to_load_l[offset+chip_sc] |= ( 0x7 & trimbits[row_set*16480+super_column_start_position_l+i])<<((7-i)*4);//low
trimbits_to_load_l[offset+chip_sc+32] |= ((0x38 & trimbits[row_set*16480+super_column_start_position_l+i])>>3)<<((7-i)*4);//upper
trimbits_to_load_r[offset+chip_sc] |= ( 0x7 & trimbits[row_set*16480+super_column_start_position_r+i])<<((7-i)*4);//low
@ -1572,12 +1572,12 @@ int Feb_Control_StopAcquisition() {
int Feb_Control_SaveAllTrimbitsTo(int value) {
int Feb_Control_SaveAllTrimbitsTo(int value, int top) {
unsigned int chanregs[Feb_Control_trimbit_size];
int i;
for(i=0;i<Feb_Control_trimbit_size;i++)
chanregs[i] = value;
return Feb_Control_SetTrimbits(0,chanregs);
return Feb_Control_SetTrimbits(0,chanregs, top);
}

View File

@ -93,9 +93,9 @@ int Feb_Control_SetDAC(char* s, int value, int is_a_voltage_mv);
int Feb_Control_GetDAC(char* s, int* ret_value, int voltage_mv);
int Feb_Control_GetDACName(unsigned int dac_num,char* s);
int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int* trimbits);
int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int* trimbits, int top);
unsigned int* Feb_Control_GetTrimbits();
int Feb_Control_SaveAllTrimbitsTo(int value);
int Feb_Control_SaveAllTrimbitsTo(int value, int top);
int Feb_Control_Reset();
int Feb_Control_PrepareForAcquisition();

View File

@ -890,7 +890,7 @@ int setModule(sls_detector_module myMod, char* mess) {
}
//set trimbits
if (!Feb_Control_SetTrimbits(Feb_Control_GetModuleNumber(), tt)) {
if (!Feb_Control_SetTrimbits(Feb_Control_GetModuleNumber(), tt,top)) {
sprintf(mess, "Could not set module. Could not set trimbits\n");
LOG(logERROR, (mess));
setSettings(UNDEFINED);
@ -1671,7 +1671,7 @@ void setExternalGating(int enable[]) {
int setAllTrimbits(int val) {
#ifndef VIRTUAL
if (!Feb_Control_SaveAllTrimbitsTo(val)) {
if (!Feb_Control_SaveAllTrimbitsTo(val,top)) {
LOG(logERROR, ("Could not set all trimbits\n"));
return FAIL;
}

View File

@ -24,19 +24,16 @@ using Interface = sls::ServerInterface;
ClientInterface::~ClientInterface() {
killTcpThread = true;
// shut down tcp sockets
if (server.get() != nullptr) {
LOG(logINFO) << "Shutting down TCP Socket on port " << portNumber;
server->shutDownSocket();
LOG(logDEBUG) << "TCP Socket closed on port " << portNumber;
}
// shut down tcp thread
LOG(logINFO) << "Shutting down TCP Socket on port " << portNumber;
server.shutdown();
LOG(logDEBUG) << "TCP Socket closed on port " << portNumber;
tcpThread->join();
}
ClientInterface::ClientInterface(int portNumber)
: myDetectorType(GOTTHARD),
portNumber(portNumber > 0 ? portNumber : DEFAULT_PORTNO + 2) {
portNumber(portNumber > 0 ? portNumber : DEFAULT_PORTNO + 2),
server(portNumber) {
functionTable();
// start up tcp thread
tcpThread = sls::make_unique<std::thread>(&ClientInterface::startTCPServer, this);
@ -73,11 +70,11 @@ void ClientInterface::startTCPServer() {
LOG(logINFOBLUE) << "Created [ TCP server Tid: " << syscall(SYS_gettid) << "]";
LOG(logINFO) << "SLS Receiver starting TCP Server on port "
<< portNumber << '\n';
server = sls::make_unique<sls::ServerSocket>(portNumber);
while (true) {
// server = sls::make_unique<sls::ServerSocket>(portNumber);
while (!killTcpThread) {
LOG(logDEBUG1) << "Start accept loop";
try {
auto socket = server->accept();
auto socket = server.accept();
try {
verifyLock();
ret = decodeFunction(socket);
@ -95,10 +92,6 @@ void ClientInterface::startTCPServer() {
} catch (const RuntimeError &e) {
LOG(logERROR) << "Accept failed";
}
// destructor to kill this thread
if (killTcpThread) {
break;
}
}
if (receiver) {
@ -253,7 +246,7 @@ void ClientInterface::validate(T arg, T retval, const std::string& modename,
}
void ClientInterface::verifyLock() {
if (lockedByClient && server->getThisClient() != server->getLockedBy()) {
if (lockedByClient && server.getThisClient() != server.getLockedBy()) {
throw sls::SocketError("Receiver locked\n");
}
}
@ -299,10 +292,10 @@ int ClientInterface::lock_receiver(Interface &socket) {
auto lock = socket.Receive<int>();
LOG(logDEBUG1) << "Locking Server to " << lock;
if (lock >= 0) {
if (!lockedByClient || (server->getLockedBy() == server->getThisClient())) {
if (!lockedByClient || (server.getLockedBy() == server.getThisClient())) {
lockedByClient = lock;
lock ? server->setLockedBy(server->getThisClient())
: server->setLockedBy(sls::IpAddr{});
lock ? server.setLockedBy(server.getThisClient())
: server.setLockedBy(sls::IpAddr{});
} else {
throw RuntimeError("Receiver locked\n");
}
@ -311,7 +304,7 @@ int ClientInterface::lock_receiver(Interface &socket) {
}
int ClientInterface::get_last_client_ip(Interface &socket) {
return socket.sendResult(server->getLastClient());
return socket.sendResult(server.getLastClient());
}
int ClientInterface::set_port(Interface &socket) {
@ -321,9 +314,11 @@ int ClientInterface::set_port(Interface &socket) {
" is too low (<1024)");
LOG(logINFO) << "TCP port set to " << p_number << std::endl;
auto new_server = sls::make_unique<sls::ServerSocket>(p_number);
new_server->setLockedBy(server->getLockedBy());
new_server->setLastClient(server->getThisClient());
sls::ServerSocket new_server(p_number);
// auto new_server = sls::make_unique<sls::ServerSocket>(p_number);
new_server.setLockedBy(server.getLockedBy());
new_server.setLastClient(server.getThisClient());
// server = std::move(new_server);
server = std::move(new_server);
socket.sendResult(p_number);
return OK;

View File

@ -10,8 +10,18 @@ class ServerInterface;
#include <future>
class ClientInterface : private virtual slsDetectorDefs {
private:
enum numberMode { DEC, HEX };
detectorType myDetectorType;
int portNumber{0};
sls::ServerSocket server;
std::unique_ptr<Implementation> receiver;
std::unique_ptr<std::thread> tcpThread;
int ret{OK};
int fnum{-1};
int lockedByClient{0};
std::atomic<bool> killTcpThread{false};
public:
virtual ~ClientInterface();
@ -49,7 +59,6 @@ class ClientInterface : private virtual slsDetectorDefs {
void verifyLock();
void verifyIdle(sls::ServerInterface &socket);
int exec_command(sls::ServerInterface &socket);
int exit_server(sls::ServerInterface &socket);
int lock_receiver(sls::ServerInterface &socket);
@ -144,7 +153,6 @@ class ClientInterface : private virtual slsDetectorDefs {
int get_additional_json_parameter(sls::ServerInterface &socket);
int get_progress(sls::ServerInterface &socket);
Implementation *impl() {
if (receiver != nullptr) {
return receiver.get();
@ -154,18 +162,8 @@ class ClientInterface : private virtual slsDetectorDefs {
}
}
detectorType myDetectorType;
std::unique_ptr<Implementation> receiver{nullptr};
int (ClientInterface::*flist[NUM_REC_FUNCTIONS])(
sls::ServerInterface &socket);
int ret{OK};
int fnum{-1};
int lockedByClient{0};
int portNumber{0};
std::atomic<bool> killTcpThread{false};
std::unique_ptr<std::thread> tcpThread;
//***callback parameters***
@ -179,6 +177,6 @@ class ClientInterface : private virtual slsDetectorDefs {
void *) = nullptr;
void *pRawDataReady{nullptr};
protected:
std::unique_ptr<sls::ServerSocket> server{nullptr};
};

View File

@ -29,13 +29,9 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
uint32_t* freq, uint32_t* timer,
bool* fp, bool* act, bool* depaden, bool* sm, bool* qe,
std::vector <int> * cdl, int* cdo, int* cad) :
ThreadObject(ind, TypeName),
runningFlag(false),
generalData(nullptr),
fifo(f),
myDetectorType(dtype),
file(nullptr),
dataStreamEnable(dsEnable),
fileFormatType(ftype),
fileWriteEnable(fwenable),
@ -43,7 +39,6 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
dynamicRange(dr),
streamingFrequency(freq),
streamingTimerInMs(timer),
currentFreqCount(0),
activated(act),
deactivatedPaddingEnable(depaden),
silentMode(sm),
@ -51,14 +46,7 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
framePadding(fp),
ctbDbitList(cdl),
ctbDbitOffset(cdo),
ctbAnalogDataBytes(cad),
startedFlag(false),
firstIndex(0),
numFramesCaught(0),
currentFrameIndex(0),
rawDataReadyCallBack(nullptr),
rawDataModifyReadyCallBack(nullptr),
pRawDataReady(nullptr)
ctbAnalogDataBytes(cad)
{
LOG(logDEBUG) << "DataProcessor " << ind << " created";
memset((void*)&timerBegin, 0, sizeof(timespec));
@ -71,10 +59,6 @@ DataProcessor::~DataProcessor() {
/** getters */
bool DataProcessor::IsRunning() {
return runningFlag;
}
bool DataProcessor::GetStartedFlag(){
return startedFlag;
}
@ -91,18 +75,6 @@ uint64_t DataProcessor::GetProcessedIndex() {
return currentFrameIndex - firstIndex;
}
/** setters */
void DataProcessor::StartRunning() {
runningFlag = true;
}
void DataProcessor::StopRunning() {
runningFlag = false;
}
void DataProcessor::SetFifo(Fifo* f) {
fifo = f;
}

View File

@ -59,11 +59,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
//*** getters ***
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
bool IsRunning() override;
/**
* Get acquisition started flag
@ -89,17 +84,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
*/
uint64_t GetProcessedIndex();
//*** setters ***
/**
* Set bit in RunningMask to allow thread to run
*/
void StartRunning();
/**
* Reset bit in RunningMask to prevent thread from running
*/
void StopRunning();
/**
* Set Fifo pointer to the one given
* @param f address of Fifo pointer
@ -254,11 +238,8 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
/** type of thread */
static const std::string TypeName;
/** Object running status */
std::atomic<bool> runningFlag;
/** GeneralData (Detector Data) object */
const GeneralData* generalData;
const GeneralData* generalData{nullptr};
/** Fifo structure */
Fifo* fifo;
@ -269,7 +250,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
detectorType myDetectorType;
/** File writer implemented as binary or hdf5 File */
File* file;
File* file{nullptr};
/** Data Stream Enable */
bool* dataStreamEnable;
@ -293,7 +274,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
uint32_t* streamingTimerInMs;
/** Current frequency count */
uint32_t currentFreqCount;
uint32_t currentFreqCount{0};
/** timer beginning stamp for random streaming */
struct timespec timerBegin;
@ -324,21 +305,18 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
//acquisition start
/** Aquisition Started flag */
bool startedFlag;
std::atomic<bool> startedFlag{false};
/** Frame Number of First Frame */
uint64_t firstIndex;
std::atomic<uint64_t> firstIndex{0};
//for statistics
/** Number of complete frames caught */
uint64_t numFramesCaught;
uint64_t numFramesCaught{0};
/** Frame Number of latest processed frame number */
uint64_t currentFrameIndex;
std::atomic<uint64_t> currentFrameIndex{0};
//call back
/**
@ -349,7 +327,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
* dataSize in bytes is the size of the data in bytes.
*/
void (*rawDataReadyCallBack)(char*,
char*, uint32_t, void*);
char*, uint32_t, void*) = nullptr;
/**
* Call back for raw data (modified)
@ -359,9 +337,9 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
* revDatasize is the reference of data size in bytes. Can be modified to the new size to be written/streamed. (only smaller value).
*/
void (*rawDataModifyReadyCallBack)(char*,
char*, uint32_t &, void*);
char*, uint32_t &, void*) = nullptr;
void *pRawDataReady;
void *pRawDataReady{nullptr};

View File

@ -19,18 +19,11 @@ const std::string DataStreamer::TypeName = "DataStreamer";
DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r,
uint64_t* fi, int fd, int* nd, bool* qe, uint64_t* tot) :
ThreadObject(ind, TypeName),
runningFlag(0),
generalData(nullptr),
fifo(f),
zmqSocket(nullptr),
dynamicRange(dr),
roi(r),
adcConfigured(-1),
fileIndex(fi),
flippedDataX(fd),
startedFlag(false),
firstIndex(0),
completeBuffer(nullptr),
quadEnable(qe),
totalNumFrames(tot)
{
@ -46,23 +39,6 @@ DataStreamer::~DataStreamer() {
delete [] completeBuffer;
}
/** getters */
bool DataStreamer::IsRunning() {
return runningFlag;
}
/** setters */
void DataStreamer::StartRunning() {
runningFlag = true;
}
void DataStreamer::StopRunning() {
runningFlag = false;
}
void DataStreamer::SetFifo(Fifo* f) {
fifo = f;
}

View File

@ -42,25 +42,6 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
*/
~DataStreamer();
//*** getters ***
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
bool IsRunning();
//*** setters ***
/**
* Set bit in RunningMask to allow thread to run
*/
void StartRunning();
/**
* Reset bit in RunningMask to prevent thread from running
*/
void StopRunning();
/**
* Set Fifo pointer to the one given
* @param f address of Fifo pointer
@ -158,19 +139,14 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
/** type of thread */
static const std::string TypeName;
/** Object running status */
bool runningFlag;
/** GeneralData (Detector Data) object */
const GeneralData* generalData;
const GeneralData* generalData{nullptr};
/** Fifo structure */
Fifo* fifo;
/** ZMQ Socket - Receiver to Client */
ZmqSocket* zmqSocket;
ZmqSocket* zmqSocket{nullptr};
/** Pointer to dynamic range */
uint32_t* dynamicRange;
@ -179,7 +155,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
ROI* roi;
/** adc Configured */
int adcConfigured;
int adcConfigured{-1};
/** Pointer to file index */
uint64_t* fileIndex;
@ -191,16 +167,16 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
std::map<std::string, std::string> additionJsonHeader;
/** Aquisition Started flag */
bool startedFlag;
bool startedFlag{nullptr};
/** Frame Number of First Frame */
uint64_t firstIndex;
uint64_t firstIndex{0};
/* File name to stream */
std::string fileNametoStream;
/** Complete buffer used for roi, eg. shortGotthard */
char* completeBuffer;
char* completeBuffer{nullptr};
/** Number of Detectors in X and Y dimension */
int numDet[2];

View File

@ -12,6 +12,7 @@
#include "container_utils.h" // For sls::make_unique<>
#include "sls_detector_exceptions.h"
#include "UdpRxSocket.h"
#include "network_utils.h"
#include <cerrno>
#include <cstring>
@ -25,12 +26,9 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic<runStatus>*
int64_t* us, int64_t* as, uint32_t* fpf,
frameDiscardPolicy* fdp, bool* act, bool* depaden, bool* sm) :
ThreadObject(ind, TypeName),
runningFlag(0),
generalData(nullptr),
fifo(f),
myDetectorType(dtype),
status(s),
udpSocket(nullptr),
udpPortNumber(portno),
eth(e),
numImages(nf),
@ -41,19 +39,7 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic<runStatus>*
frameDiscardMode(fdp),
activated(act),
deactivatedPaddingEnable(depaden),
silentMode(sm),
row(0),
column(0),
startedFlag(false),
firstIndex(0),
numPacketsCaught(0),
lastCaughtFrameIndex(0),
currentFrameIndex(0),
carryOverFlag(0),
udpSocketAlive(0),
numPacketsStatistic(0),
numFramesStatistic(0),
oddStartingPacket(true)
silentMode(sm)
{
LOG(logDEBUG) << "Listener " << ind << " created";
}
@ -66,20 +52,15 @@ Listener::~Listener() {
}
}
/** getters */
bool Listener::IsRunning() {
return runningFlag;
}
uint64_t Listener::GetPacketsCaught() {
uint64_t Listener::GetPacketsCaught() const {
return numPacketsCaught;
}
uint64_t Listener::GetLastFrameIndexCaught() {
uint64_t Listener::GetLastFrameIndexCaught() const {
return lastCaughtFrameIndex;
}
uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) {
uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const {
if (!stoppedFlag) {
return (numPackets - numPacketsCaught);
}
@ -89,22 +70,10 @@ uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) {
return (lastCaughtFrameIndex - firstIndex + 1) * generalData->packetsPerFrame - numPacketsCaught;
}
/** setters */
void Listener::StartRunning() {
runningFlag = true;
}
void Listener::StopRunning() {
runningFlag = false;
}
void Listener::SetFifo(Fifo* f) {
fifo = f;
}
void Listener::ResetParametersforNewAcquisition() {
runningFlag = false;
startedFlag = false;
@ -177,7 +146,7 @@ void Listener::CreateUDPSockets() {
sem_init(&semaphore_socket,1,0);
// doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = udpSocket->getActualUDPSocketBufferSize();
*actualUDPSocketBufferSize = udpSocket->getBufferSize();
}
@ -185,7 +154,7 @@ void Listener::CreateUDPSockets() {
void Listener::ShutDownUDPSocket() {
if(udpSocket){
udpSocketAlive = false;
udpSocket->ShutDownSocket();
udpSocket->Shutdown();
LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber;
fflush(stdout);
// wait only if the threads have started as it is the threads that
@ -220,7 +189,7 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int64_t s) {
*udpSocketBufferSize);
// doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = g.getActualUDPSocketBufferSize();
*actualUDPSocketBufferSize = g.getBufferSize();
if (*actualUDPSocketBufferSize == -1) {
*udpSocketBufferSize = temp;
} else {

View File

@ -12,13 +12,10 @@
#include <memory>
#include <atomic>
#include "ThreadObject.h"
#include "UdpRxSocket.h"
class GeneralData;
class Fifo;
namespace sls{
class UdpRxSocket;
}
class Listener : private virtual slsDetectorDefs, public ThreadObject {
@ -53,40 +50,20 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
*/
~Listener();
//*** getters ***
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
bool IsRunning() override;
/**
* Get Packets caught
* @return Packets caught
*/
uint64_t GetPacketsCaught();
uint64_t GetPacketsCaught() const;
/**
* Get Last Frame index caught
* @return last frame index caught
*/
uint64_t GetLastFrameIndexCaught();
uint64_t GetLastFrameIndexCaught() const;
/** Get number of missing packets */
uint64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets);
//*** setters ***
/**
* Set bit in RunningMask to allow thread to run
*/
void StartRunning();
/**
* Reset bit in RunningMask to prevent thread from running
*/
void StopRunning();
uint64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const;
/**
* Set Fifo pointer to the one given
@ -140,7 +117,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
void RecordFirstIndex(uint64_t fnum);
/**
* Thread Exeution for Listener Class
* Thread Execution for Listener Class
* Pop free addresses, listen to udp socket,
* write to memory & push the address into fifo
*/
@ -168,16 +145,11 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
*/
void PrintFifoStatistics();
/** type of thread */
static const std::string TypeName;
/** Object running status */
std::atomic<bool> runningFlag;
/** GeneralData (Detector Data) object */
GeneralData* generalData;
GeneralData* generalData{nullptr};
/** Fifo structure */
Fifo* fifo;
@ -190,7 +162,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
std::atomic<runStatus>* status;
/** UDP Socket - Detector to Receiver */
std::unique_ptr<sls::UdpRxSocket> udpSocket;
std::unique_ptr<sls::UdpRxSocket> udpSocket{nullptr};
/** UDP Port Number */
uint32_t* udpPortNumber;
@ -228,36 +200,34 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
/** row hardcoded as 1D or 2d,
* if detector does not send them yet or
* missing packets/deactivated (eiger/jungfrau sends 2d pos) **/
uint16_t row;
uint16_t row{0};
/** column hardcoded as 2D,
* deactivated eiger/missing packets (eiger/jungfrau sends 2d pos) **/
uint16_t column;
uint16_t column{0};
// acquisition start
/** Aquisition Started flag */
std::atomic<bool> startedFlag;
std::atomic<bool> startedFlag{false};
/** Frame Number of First Frame */
uint64_t firstIndex;
uint64_t firstIndex{0};
// for acquisition summary
/** Number of complete Packets caught */
std::atomic<uint64_t> numPacketsCaught;
std::atomic<uint64_t> numPacketsCaught{0};
/** Last Frame Index caught from udp network */
std::atomic<uint64_t> lastCaughtFrameIndex;
std::atomic<uint64_t> lastCaughtFrameIndex{0};
// parameters to acquire image
/** Current Frame Index, default value is 0
* ( always check startedFlag for validity first)
*/
uint64_t currentFrameIndex;
uint64_t currentFrameIndex{0};
/** True if there is a packet carry over from previous Image */
bool carryOverFlag;
bool carryOverFlag{false};
/** Carry over packet buffer */
std::unique_ptr<char []> carryOverPacket;
@ -266,22 +236,22 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
std::unique_ptr<char []> listeningPacket;
/** if the udp socket is connected */
std::atomic<bool> udpSocketAlive;
std::atomic<bool> udpSocketAlive{false};
/** Semaphore to synchonize deleting udp socket */
/** Semaphore to synchronize deleting udp socket */
sem_t semaphore_socket;
// for print progress during acqusition
// for print progress during acquisition
/** number of packets for statistic */
uint32_t numPacketsStatistic;
uint32_t numPacketsStatistic{0};
/** number of images for statistic */
uint32_t numFramesStatistic;
uint32_t numFramesStatistic{0};
/**
* starting packet number is odd or evern, accordingly increment frame number
* starting packet number is odd or even, accordingly increment frame number
* to get first packet number as 0
* (pecific to gotthard, can vary between modules, hence defined here) */
bool oddStartingPacket;
bool oddStartingPacket{true};
};

View File

@ -42,17 +42,14 @@ int main(int argc, char *argv[]) {
LOG(logERROR) << "Could not set handler function for SIGPIPE";
}
std::unique_ptr<Receiver> receiver = nullptr;
try {
receiver = sls::make_unique<Receiver>(argc, argv);
Receiver r(argc, argv);
LOG(logINFO) << "[ Press \'Ctrl+c\' to exit ]";
sem_wait(&semaphore);
sem_destroy(&semaphore);
} catch (...) {
LOG(logINFOBLUE) << "Exiting [ Tid: " << syscall(SYS_gettid) << " ]";
throw;
//pass
}
LOG(logINFO) << "[ Press \'Ctrl+c\' to exit ]";
sem_wait(&semaphore);
sem_destroy(&semaphore);
LOG(logINFOBLUE) << "Exiting [ Tid: " << syscall(SYS_gettid) << " ]";
LOG(logINFO) << "Exiting Receiver";
return 0;

View File

@ -36,6 +36,17 @@ ThreadObject::~ThreadObject() {
sem_destroy(&semaphore);
}
bool ThreadObject::IsRunning() const{
return runningFlag;
}
void ThreadObject::StartRunning() {
runningFlag = true;
}
void ThreadObject::StopRunning() {
runningFlag = false;
}
void ThreadObject::RunningThread() {
LOG(logINFOBLUE) << "Created [ " << type << "Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";

View File

@ -21,7 +21,9 @@ class ThreadObject : private virtual slsDetectorDefs {
public:
ThreadObject(int threadIndex, std::string threadType);
virtual ~ThreadObject();
virtual bool IsRunning() = 0;
bool IsRunning() const;
void StartRunning();
void StopRunning();
void Continue();
void SetThreadPriority(int priority);
@ -41,6 +43,7 @@ class ThreadObject : private virtual slsDetectorDefs {
int index{0};
std::string type;
std::atomic<bool> killThread{false};
std::atomic<bool> runningFlag{false};
std::unique_ptr<std::thread> threadObject;
sem_t semaphore;
};

View File

@ -8,6 +8,7 @@ set(SOURCES
src/ToString.cpp
src/network_utils.cpp
src/ZmqSocket.cpp
src/UdpRxSocket.cpp
)
set(HEADERS

View File

@ -21,7 +21,7 @@ class DataSocket {
//No copy since the class manage the underlying socket
DataSocket(const DataSocket &) = delete;
DataSocket &operator=(DataSocket const &) = delete;
int getSocketId() const { return socketId_; }
int getSocketId() const { return sockfd_; }
int Send(const void *buffer, size_t size);
@ -51,9 +51,10 @@ class DataSocket {
int setReceiveTimeout(int us);
void close();
void shutDownSocket();
void shutdown();
private:
int socketId_ = -1;
int sockfd_ = -1;
};
}; // namespace sls

View File

@ -1,141 +1,30 @@
#pragma once
/*
UdpRxSocket provies socket control to receive
data on a udp socket.
It provides a drop in replacement for
genericSocket. But please be careful since
this might be deprecated in the future
UDP socket class to receive data. The intended use is in the
receiver listener loop. Should be used RAII style...
*/
#include "network_utils.h"
#include "sls_detector_exceptions.h"
#include <cstdint>
#include <errno.h>
#include <iostream>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>
#include <sys/types.h> //ssize_t
namespace sls {
class UdpRxSocket {
const ssize_t packet_size;
char *buff;
int fd = -1;
const ssize_t packet_size_;
int sockfd_{-1};
public:
UdpRxSocket(int port, ssize_t packet_size, const char *hostname = nullptr,
ssize_t buffer_size = 0)
: packet_size(packet_size) {
/* hostname = nullptr -> wildcard */
size_t kernel_buffer_size = 0);
~UdpRxSocket();
bool ReceivePacket(char *dst) noexcept;
size_t getBufferSize() const;
void setBufferSize(ssize_t size);
ssize_t getPacketSize() const noexcept;
void Shutdown();
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = 0;
hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
struct addrinfo *res = 0;
const std::string portname = std::to_string(port);
if (getaddrinfo(hostname, portname.c_str(), &hints, &res)) {
throw RuntimeError("Failed at getaddrinfo with " +
std::string(hostname));
}
fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (fd == -1) {
throw RuntimeError("Failed to create UDP RX socket");
}
if (bind(fd, res->ai_addr, res->ai_addrlen) == -1) {
throw RuntimeError("Failed to bind UDP RX socket");
}
freeaddrinfo(res);
// If we get a specified buffer size that is larger than the set one
// we set it. Otherwise we leave it there since it could have been
// set by the rx_udpsocksize command
if (buffer_size) {
auto current = getBufferSize() / 2;
if (current < buffer_size) {
setBufferSize(buffer_size);
if (getBufferSize() / 2 < buffer_size) {
LOG(logWARNING)
<< "Could not set buffer size. Got: "
<< getBufferSize() / 2 << " instead of " << buffer_size;
}
}
}
// Allocate at the end to avoid memory leak if we throw
buff = new char[packet_size];
}
~UdpRxSocket() {
delete[] buff;
Shutdown();
}
const char *LastPacket() const noexcept { return buff; }
ssize_t getPacketSize() const noexcept { return packet_size; }
bool ReceivePacket() noexcept { return ReceivePacket(buff); }
bool ReceivePacket(char *dst, int flags = 0) noexcept {
auto bytes_received =
recvfrom(fd, dst, packet_size, flags, nullptr, nullptr);
return bytes_received == packet_size;
}
bool PeekPacket() noexcept{
return ReceivePacket(buff, MSG_PEEK);
}
// Only for backwards compatibility this function will be removed during
// refactoring of the receiver
ssize_t ReceiveDataOnly(char *dst) {
auto r = recvfrom(fd, dst, packet_size, 0, nullptr, nullptr);
constexpr ssize_t eiger_header_packet =
40; // only detector that has this
if (r == eiger_header_packet) {
LOG(logWARNING) << "Got header pkg";
r = recvfrom(fd, dst, packet_size, 0, nullptr, nullptr);
}
return r;
}
ssize_t getBufferSize() const {
uint64_t ret_size = 0;
socklen_t optlen = sizeof(uint64_t);
if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1)
return -1;
else
return ret_size;
}
// Only for backwards compatibility will be removed
ssize_t getActualUDPSocketBufferSize() const { return getBufferSize(); }
// Only for backwards compatibility will be removed
void ShutDownSocket() { Shutdown(); }
void setBufferSize(ssize_t size) {
socklen_t optlen = sizeof(size);
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, optlen)) {
throw RuntimeError("Could not set socket buffer size");
}
}
void Shutdown() {
shutdown(fd, SHUT_RDWR);
if (fd >= 0) {
close(fd);
fd = -1;
}
}
// Only for backwards compatibility, this drops the EIGER small pkt, may be
// removed
ssize_t ReceiveDataOnly(char *dst) noexcept;
};
} // namespace sls

View File

@ -15,13 +15,13 @@
namespace sls {
DataSocket::DataSocket(int socketId) : socketId_(socketId) {
DataSocket::DataSocket(int socketId) : sockfd_(socketId) {
int value = 1;
setsockopt(socketId_, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
}
DataSocket::~DataSocket() {
if (socketId_ <= 0) {
if (sockfd_ <= 0) {
return;
} else {
try {
@ -32,7 +32,7 @@ DataSocket::~DataSocket() {
}
void DataSocket::swap(DataSocket &other) noexcept {
std::swap(socketId_, other.socketId_);
std::swap(sockfd_, other.sockfd_);
}
DataSocket::DataSocket(DataSocket &&move) noexcept { move.swap(*this); }
@ -121,19 +121,23 @@ int DataSocket::setTimeOut(int t_seconds) {
}
void DataSocket::close() {
if (socketId_ > 0) {
if (::close(socketId_)) {
if (sockfd_ > 0) {
if (::close(sockfd_)) {
throw SocketError("could not close socket");
}
socketId_ = -1;
sockfd_ = -1;
} else {
throw std::runtime_error("Socket ERROR: close called on bad socket\n");
}
}
void DataSocket::shutDownSocket() {
shutdown(getSocketId(), SHUT_RDWR);
::shutdown(getSocketId(), SHUT_RDWR);
close();
}
void DataSocket::shutdown(){
::shutdown(sockfd_, SHUT_RDWR);
}
} // namespace sls

View File

@ -0,0 +1,96 @@
#include "UdpRxSocket.h"
#include "network_utils.h"
#include "sls_detector_exceptions.h"
#include <cstdint>
#include <errno.h>
#include <iostream>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
namespace sls {
UdpRxSocket::UdpRxSocket(int port, ssize_t packet_size, const char *hostname,
size_t kernel_buffer_size)
: packet_size_(packet_size) {
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = 0;
hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
struct addrinfo *res = 0;
const std::string portname = std::to_string(port);
if (getaddrinfo(hostname, portname.c_str(), &hints, &res)) {
throw RuntimeError("Failed at getaddrinfo with " +
std::string(hostname));
}
sockfd_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (sockfd_ == -1) {
throw RuntimeError("Failed to create UDP RX socket");
}
if (bind(sockfd_, res->ai_addr, res->ai_addrlen) == -1) {
throw RuntimeError("Failed to bind UDP RX socket");
}
freeaddrinfo(res);
// If we get a specified buffer size that is larger than the set one
// we set it. Otherwise we leave it there since it could have been
// set by the rx_udpsocksize command
if (kernel_buffer_size) {
auto current = getBufferSize() / 2;
if (current < kernel_buffer_size) {
setBufferSize(kernel_buffer_size);
if (getBufferSize() / 2 < kernel_buffer_size) {
LOG(logWARNING)
<< "Could not set buffer size. Got: " << getBufferSize() / 2
<< " instead of " << kernel_buffer_size;
}
}
}
}
UdpRxSocket::~UdpRxSocket() { Shutdown(); }
ssize_t UdpRxSocket::getPacketSize() const noexcept { return packet_size_; }
bool UdpRxSocket::ReceivePacket(char *dst) noexcept{
auto bytes_received =
recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr);
return bytes_received == packet_size_;
}
ssize_t UdpRxSocket::ReceiveDataOnly(char *dst) noexcept {
auto r = recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr);
constexpr ssize_t eiger_header_packet =
40; // only detector that has this
if (r == eiger_header_packet) {
LOG(logWARNING) << "Got header pkg";
r = recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr);
}
return r;
}
size_t UdpRxSocket::getBufferSize() const {
size_t ret = 0;
socklen_t optlen = sizeof(ret);
if (getsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &ret, &optlen) == -1)
throw RuntimeError("Could not get socket buffer size");
return ret;
}
void UdpRxSocket::setBufferSize(ssize_t size) {
if (setsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)))
throw RuntimeError("Could not set socket buffer size");
}
void UdpRxSocket::Shutdown() {
shutdown(sockfd_, SHUT_RDWR);
if (sockfd_ >= 0) {
close(sockfd_);
sockfd_ = -1;
}
}
} // namespace sls

View File

@ -109,7 +109,7 @@ TEST_CASE("run status"){
using defs = slsDetectorDefs;
REQUIRE(ToString(defs::runStatus::ERROR) == "error");
REQUIRE(ToString(defs::runStatus::WAITING) == "waiting");
REQUIRE(ToString(defs::runStatus::TRANSMITTING) == "data"); //??
REQUIRE(ToString(defs::runStatus::TRANSMITTING) == "transmitting");
REQUIRE(ToString(defs::runStatus::RUN_FINISHED) == "finished");
REQUIRE(ToString(defs::runStatus::STOPPED) == "stopped");
REQUIRE(ToString(defs::runStatus::IDLE) == "idle");

View File

@ -4,6 +4,14 @@
#include <future>
#include <thread>
#include <vector>
#include <cstdint>
#include <errno.h>
#include <iostream>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
constexpr int default_port = 50001;
@ -29,46 +37,49 @@ int open_socket(int port) {
throw sls::RuntimeError("Failed to create UDP RX socket");
}
if (connect(fd, res->ai_addr, res->ai_addrlen)){
if (connect(fd, res->ai_addr, res->ai_addrlen)) {
throw sls::RuntimeError("Failed to connect socket");
}
freeaddrinfo(res);
return fd;
}
TEST_CASE("Receive data on localhost") {
TEST_CASE("Get packet size returns the packet size we set in the constructor"){
constexpr int port = 50001;
constexpr ssize_t packet_size = 8000;
sls::UdpRxSocket s{port, packet_size};
CHECK(s.getPacketSize() == packet_size);
}
TEST_CASE("Receive data from a vector") {
constexpr int port = 50001;
std::vector<int> data_to_send{4, 5, 3, 2, 5, 7, 2, 3};
std::vector<int> data_received(data_to_send.size());
ssize_t packet_size =
sizeof(decltype(data_to_send)::value_type) * data_to_send.size();
sls::UdpRxSocket udpsock{port, packet_size};
sls::UdpRxSocket udpsock{port, packet_size};
int fd = open_socket(port);
auto n = write(fd, data_to_send.data(), packet_size);
CHECK(n == packet_size);
CHECK(udpsock.ReceivePacket());
CHECK(udpsock.ReceivePacket((char*)data_received.data()));
close(fd);
// Copy data from buffer and compare values
std::vector<int> data_received(data_to_send.size());
memcpy(data_received.data(), udpsock.LastPacket(), udpsock.getPacketSize());
CHECK(data_received.size() == data_to_send.size()); // sanity check
for (size_t i = 0; i != data_to_send.size(); ++i) {
CHECK(data_to_send[i] == data_received[i]);
}
CHECK(data_to_send == data_received);
}
TEST_CASE("Shutdown socket without hanging when waiting for data") {
constexpr int port = 50001;
constexpr ssize_t packet_size = 8000;
sls::UdpRxSocket s{port, packet_size};
char buff[packet_size];
// Start a thread and wait for package
// if the socket is left open we would block
std::future<bool> ret =
std::async(static_cast<bool (sls::UdpRxSocket::*)()>(
&sls::UdpRxSocket::ReceivePacket),
&s);
std::async(&sls::UdpRxSocket::ReceivePacket, &s, (char *)&buff);
s.Shutdown();
auto r = ret.get();
@ -76,60 +87,23 @@ TEST_CASE("Shutdown socket without hanging when waiting for data") {
CHECK(r == false); // since we didn't get the packet
}
TEST_CASE("Too small packet"){
TEST_CASE("Too small packet") {
constexpr int port = 50001;
sls::UdpRxSocket s(port, 2*sizeof(uint32_t));
sls::UdpRxSocket s(port, 2 * sizeof(uint32_t));
auto fd = open_socket(port);
uint32_t val = 10;
write(fd, &val, sizeof(val));
CHECK(s.ReceivePacket() == false);
uint32_t buff[2];
CHECK(s.ReceivePacket((char *)&buff) == false);
close(fd);
}
TEST_CASE("Receive an int to internal buffer"){
TEST_CASE("Receive an int to an external buffer") {
int to_send = 5;
int received = -1;
auto fd = open_socket(default_port);
sls::UdpRxSocket s(default_port, sizeof(int));
write(fd, &to_send, sizeof(to_send));
CHECK(s.ReceivePacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(s.ReceivePacket(reinterpret_cast<char *>(&received)));
CHECK(received == to_send);
}
TEST_CASE("Receive an int to an external buffer"){
int to_send = 5;
int received = -1;
auto fd = open_socket(default_port);
sls::UdpRxSocket s(default_port, sizeof(int));
write(fd, &to_send, sizeof(to_send));
CHECK(s.ReceivePacket(reinterpret_cast<char*>(&received)));
CHECK(received == to_send);
}
TEST_CASE("PEEK data"){
int to_send = 5;
int to_send2 = 12;
int received = -1;
auto fd = open_socket(default_port);
sls::UdpRxSocket s(default_port, sizeof(int));
write(fd, &to_send, sizeof(to_send));
write(fd, &to_send2, sizeof(to_send));
CHECK(s.PeekPacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send);
CHECK(s.PeekPacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send);
CHECK(s.ReceivePacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send);
CHECK(s.ReceivePacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send2);
}