diff --git a/slsDetectorServers/eigerDetectorServer/FebControl.c b/slsDetectorServers/eigerDetectorServer/FebControl.c index 91e3f8ab2..0ccfa98e6 100755 --- a/slsDetectorServers/eigerDetectorServer/FebControl.c +++ b/slsDetectorServers/eigerDetectorServer/FebControl.c @@ -898,7 +898,7 @@ int Feb_Control_SendDACValue(unsigned int dst_num, unsigned int ch, unsigned int -int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int *trimbits) { +int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int *trimbits, int top) { LOG(logINFO, ("Setting Trimbits\n")); //for (int iy=10000;iy<20020;++iy)//263681 @@ -963,7 +963,7 @@ int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int *trimbits) { int i; for(i=0;i<8;i++) { // column loop i - if (Module_TopAddressIsValid(&modules[1])) { + if (top==1) { trimbits_to_load_l[offset+chip_sc] |= ( 0x7 & trimbits[row_set*16480+super_column_start_position_l+i])<<((7-i)*4);//low trimbits_to_load_l[offset+chip_sc+32] |= ((0x38 & trimbits[row_set*16480+super_column_start_position_l+i])>>3)<<((7-i)*4);//upper trimbits_to_load_r[offset+chip_sc] |= ( 0x7 & trimbits[row_set*16480+super_column_start_position_r+i])<<((7-i)*4);//low @@ -1572,12 +1572,12 @@ int Feb_Control_StopAcquisition() { -int Feb_Control_SaveAllTrimbitsTo(int value) { +int Feb_Control_SaveAllTrimbitsTo(int value, int top) { unsigned int chanregs[Feb_Control_trimbit_size]; int i; for(i=0;ishutDownSocket(); - LOG(logDEBUG) << "TCP Socket closed on port " << portNumber; - } - // shut down tcp thread + LOG(logINFO) << "Shutting down TCP Socket on port " << portNumber; + server.shutdown(); + LOG(logDEBUG) << "TCP Socket closed on port " << portNumber; tcpThread->join(); } ClientInterface::ClientInterface(int portNumber) : myDetectorType(GOTTHARD), - portNumber(portNumber > 0 ? portNumber : DEFAULT_PORTNO + 2) { + portNumber(portNumber > 0 ? portNumber : DEFAULT_PORTNO + 2), + server(portNumber) { functionTable(); // start up tcp thread tcpThread = sls::make_unique(&ClientInterface::startTCPServer, this); @@ -73,11 +70,11 @@ void ClientInterface::startTCPServer() { LOG(logINFOBLUE) << "Created [ TCP server Tid: " << syscall(SYS_gettid) << "]"; LOG(logINFO) << "SLS Receiver starting TCP Server on port " << portNumber << '\n'; - server = sls::make_unique(portNumber); - while (true) { + // server = sls::make_unique(portNumber); + while (!killTcpThread) { LOG(logDEBUG1) << "Start accept loop"; try { - auto socket = server->accept(); + auto socket = server.accept(); try { verifyLock(); ret = decodeFunction(socket); @@ -95,10 +92,6 @@ void ClientInterface::startTCPServer() { } catch (const RuntimeError &e) { LOG(logERROR) << "Accept failed"; } - // destructor to kill this thread - if (killTcpThread) { - break; - } } if (receiver) { @@ -253,7 +246,7 @@ void ClientInterface::validate(T arg, T retval, const std::string& modename, } void ClientInterface::verifyLock() { - if (lockedByClient && server->getThisClient() != server->getLockedBy()) { + if (lockedByClient && server.getThisClient() != server.getLockedBy()) { throw sls::SocketError("Receiver locked\n"); } } @@ -299,10 +292,10 @@ int ClientInterface::lock_receiver(Interface &socket) { auto lock = socket.Receive(); LOG(logDEBUG1) << "Locking Server to " << lock; if (lock >= 0) { - if (!lockedByClient || (server->getLockedBy() == server->getThisClient())) { + if (!lockedByClient || (server.getLockedBy() == server.getThisClient())) { lockedByClient = lock; - lock ? server->setLockedBy(server->getThisClient()) - : server->setLockedBy(sls::IpAddr{}); + lock ? server.setLockedBy(server.getThisClient()) + : server.setLockedBy(sls::IpAddr{}); } else { throw RuntimeError("Receiver locked\n"); } @@ -311,7 +304,7 @@ int ClientInterface::lock_receiver(Interface &socket) { } int ClientInterface::get_last_client_ip(Interface &socket) { - return socket.sendResult(server->getLastClient()); + return socket.sendResult(server.getLastClient()); } int ClientInterface::set_port(Interface &socket) { @@ -321,9 +314,11 @@ int ClientInterface::set_port(Interface &socket) { " is too low (<1024)"); LOG(logINFO) << "TCP port set to " << p_number << std::endl; - auto new_server = sls::make_unique(p_number); - new_server->setLockedBy(server->getLockedBy()); - new_server->setLastClient(server->getThisClient()); + sls::ServerSocket new_server(p_number); + // auto new_server = sls::make_unique(p_number); + new_server.setLockedBy(server.getLockedBy()); + new_server.setLastClient(server.getThisClient()); + // server = std::move(new_server); server = std::move(new_server); socket.sendResult(p_number); return OK; diff --git a/slsReceiverSoftware/src/ClientInterface.h b/slsReceiverSoftware/src/ClientInterface.h index f83b7cab9..76063b5ce 100755 --- a/slsReceiverSoftware/src/ClientInterface.h +++ b/slsReceiverSoftware/src/ClientInterface.h @@ -10,8 +10,18 @@ class ServerInterface; #include class ClientInterface : private virtual slsDetectorDefs { - private: enum numberMode { DEC, HEX }; + detectorType myDetectorType; + int portNumber{0}; + sls::ServerSocket server; + std::unique_ptr receiver; + std::unique_ptr tcpThread; + int ret{OK}; + int fnum{-1}; + int lockedByClient{0}; + + std::atomic killTcpThread{false}; + public: virtual ~ClientInterface(); @@ -49,7 +59,6 @@ class ClientInterface : private virtual slsDetectorDefs { void verifyLock(); void verifyIdle(sls::ServerInterface &socket); - int exec_command(sls::ServerInterface &socket); int exit_server(sls::ServerInterface &socket); int lock_receiver(sls::ServerInterface &socket); @@ -144,7 +153,6 @@ class ClientInterface : private virtual slsDetectorDefs { int get_additional_json_parameter(sls::ServerInterface &socket); int get_progress(sls::ServerInterface &socket); - Implementation *impl() { if (receiver != nullptr) { return receiver.get(); @@ -154,19 +162,9 @@ class ClientInterface : private virtual slsDetectorDefs { } } - detectorType myDetectorType; - std::unique_ptr receiver{nullptr}; int (ClientInterface::*flist[NUM_REC_FUNCTIONS])( sls::ServerInterface &socket); - int ret{OK}; - int fnum{-1}; - int lockedByClient{0}; - int portNumber{0}; - std::atomic killTcpThread{false}; - std::unique_ptr tcpThread; - - - + //***callback parameters*** int (*startAcquisitionCallBack)(std::string, std::string, uint64_t, uint32_t, @@ -179,6 +177,6 @@ class ClientInterface : private virtual slsDetectorDefs { void *) = nullptr; void *pRawDataReady{nullptr}; - protected: - std::unique_ptr server{nullptr}; + + }; diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index ad8a89a77..e45a29a13 100755 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -29,13 +29,9 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f, uint32_t* freq, uint32_t* timer, bool* fp, bool* act, bool* depaden, bool* sm, bool* qe, std::vector * cdl, int* cdo, int* cad) : - ThreadObject(ind, TypeName), - runningFlag(false), - generalData(nullptr), fifo(f), myDetectorType(dtype), - file(nullptr), dataStreamEnable(dsEnable), fileFormatType(ftype), fileWriteEnable(fwenable), @@ -43,7 +39,6 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f, dynamicRange(dr), streamingFrequency(freq), streamingTimerInMs(timer), - currentFreqCount(0), activated(act), deactivatedPaddingEnable(depaden), silentMode(sm), @@ -51,14 +46,7 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f, framePadding(fp), ctbDbitList(cdl), ctbDbitOffset(cdo), - ctbAnalogDataBytes(cad), - startedFlag(false), - firstIndex(0), - numFramesCaught(0), - currentFrameIndex(0), - rawDataReadyCallBack(nullptr), - rawDataModifyReadyCallBack(nullptr), - pRawDataReady(nullptr) + ctbAnalogDataBytes(cad) { LOG(logDEBUG) << "DataProcessor " << ind << " created"; memset((void*)&timerBegin, 0, sizeof(timespec)); @@ -71,10 +59,6 @@ DataProcessor::~DataProcessor() { /** getters */ -bool DataProcessor::IsRunning() { - return runningFlag; -} - bool DataProcessor::GetStartedFlag(){ return startedFlag; } @@ -91,18 +75,6 @@ uint64_t DataProcessor::GetProcessedIndex() { return currentFrameIndex - firstIndex; } - - -/** setters */ -void DataProcessor::StartRunning() { - runningFlag = true; -} - - -void DataProcessor::StopRunning() { - runningFlag = false; -} - void DataProcessor::SetFifo(Fifo* f) { fifo = f; } diff --git a/slsReceiverSoftware/src/DataProcessor.h b/slsReceiverSoftware/src/DataProcessor.h index b6cdbd21d..10f5f2f5d 100755 --- a/slsReceiverSoftware/src/DataProcessor.h +++ b/slsReceiverSoftware/src/DataProcessor.h @@ -59,11 +59,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { //*** getters *** - /** - * Returns if the thread is currently running - * @returns true if thread is running, else false - */ - bool IsRunning() override; /** * Get acquisition started flag @@ -89,17 +84,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { */ uint64_t GetProcessedIndex(); - //*** setters *** - /** - * Set bit in RunningMask to allow thread to run - */ - void StartRunning(); - - /** - * Reset bit in RunningMask to prevent thread from running - */ - void StopRunning(); - /** * Set Fifo pointer to the one given * @param f address of Fifo pointer @@ -254,11 +238,8 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { /** type of thread */ static const std::string TypeName; - /** Object running status */ - std::atomic runningFlag; - /** GeneralData (Detector Data) object */ - const GeneralData* generalData; + const GeneralData* generalData{nullptr}; /** Fifo structure */ Fifo* fifo; @@ -269,7 +250,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { detectorType myDetectorType; /** File writer implemented as binary or hdf5 File */ - File* file; + File* file{nullptr}; /** Data Stream Enable */ bool* dataStreamEnable; @@ -293,7 +274,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { uint32_t* streamingTimerInMs; /** Current frequency count */ - uint32_t currentFreqCount; + uint32_t currentFreqCount{0}; /** timer beginning stamp for random streaming */ struct timespec timerBegin; @@ -324,21 +305,18 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { //acquisition start /** Aquisition Started flag */ - bool startedFlag; + std::atomic startedFlag{false}; /** Frame Number of First Frame */ - uint64_t firstIndex; + std::atomic firstIndex{0}; //for statistics /** Number of complete frames caught */ - uint64_t numFramesCaught; + uint64_t numFramesCaught{0}; /** Frame Number of latest processed frame number */ - uint64_t currentFrameIndex; - - - + std::atomic currentFrameIndex{0}; //call back /** @@ -349,7 +327,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { * dataSize in bytes is the size of the data in bytes. */ void (*rawDataReadyCallBack)(char*, - char*, uint32_t, void*); + char*, uint32_t, void*) = nullptr; /** * Call back for raw data (modified) @@ -359,9 +337,9 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { * revDatasize is the reference of data size in bytes. Can be modified to the new size to be written/streamed. (only smaller value). */ void (*rawDataModifyReadyCallBack)(char*, - char*, uint32_t &, void*); + char*, uint32_t &, void*) = nullptr; - void *pRawDataReady; + void *pRawDataReady{nullptr}; diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index 2b840dab1..9c9b5f1a1 100755 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -19,18 +19,11 @@ const std::string DataStreamer::TypeName = "DataStreamer"; DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r, uint64_t* fi, int fd, int* nd, bool* qe, uint64_t* tot) : ThreadObject(ind, TypeName), - runningFlag(0), - generalData(nullptr), fifo(f), - zmqSocket(nullptr), dynamicRange(dr), roi(r), - adcConfigured(-1), fileIndex(fi), flippedDataX(fd), - startedFlag(false), - firstIndex(0), - completeBuffer(nullptr), quadEnable(qe), totalNumFrames(tot) { @@ -46,23 +39,6 @@ DataStreamer::~DataStreamer() { delete [] completeBuffer; } -/** getters */ - -bool DataStreamer::IsRunning() { - return runningFlag; -} - - -/** setters */ -void DataStreamer::StartRunning() { - runningFlag = true; -} - - -void DataStreamer::StopRunning() { - runningFlag = false; -} - void DataStreamer::SetFifo(Fifo* f) { fifo = f; } diff --git a/slsReceiverSoftware/src/DataStreamer.h b/slsReceiverSoftware/src/DataStreamer.h index 4a31fa550..8b7d81881 100755 --- a/slsReceiverSoftware/src/DataStreamer.h +++ b/slsReceiverSoftware/src/DataStreamer.h @@ -42,25 +42,6 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { */ ~DataStreamer(); - //*** getters *** - /** - * Returns if the thread is currently running - * @returns true if thread is running, else false - */ - bool IsRunning(); - - - //*** setters *** - /** - * Set bit in RunningMask to allow thread to run - */ - void StartRunning(); - - /** - * Reset bit in RunningMask to prevent thread from running - */ - void StopRunning(); - /** * Set Fifo pointer to the one given * @param f address of Fifo pointer @@ -158,19 +139,14 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { /** type of thread */ static const std::string TypeName; - /** Object running status */ - bool runningFlag; - /** GeneralData (Detector Data) object */ - const GeneralData* generalData; + const GeneralData* generalData{nullptr}; /** Fifo structure */ Fifo* fifo; - - /** ZMQ Socket - Receiver to Client */ - ZmqSocket* zmqSocket; + ZmqSocket* zmqSocket{nullptr}; /** Pointer to dynamic range */ uint32_t* dynamicRange; @@ -179,7 +155,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { ROI* roi; /** adc Configured */ - int adcConfigured; + int adcConfigured{-1}; /** Pointer to file index */ uint64_t* fileIndex; @@ -191,16 +167,16 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { std::map additionJsonHeader; /** Aquisition Started flag */ - bool startedFlag; + bool startedFlag{nullptr}; /** Frame Number of First Frame */ - uint64_t firstIndex; + uint64_t firstIndex{0}; /* File name to stream */ std::string fileNametoStream; /** Complete buffer used for roi, eg. shortGotthard */ - char* completeBuffer; + char* completeBuffer{nullptr}; /** Number of Detectors in X and Y dimension */ int numDet[2]; diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index 6c0b054ff..48b9e133d 100755 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -12,6 +12,7 @@ #include "container_utils.h" // For sls::make_unique<> #include "sls_detector_exceptions.h" #include "UdpRxSocket.h" +#include "network_utils.h" #include #include @@ -25,12 +26,9 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic* int64_t* us, int64_t* as, uint32_t* fpf, frameDiscardPolicy* fdp, bool* act, bool* depaden, bool* sm) : ThreadObject(ind, TypeName), - runningFlag(0), - generalData(nullptr), fifo(f), myDetectorType(dtype), status(s), - udpSocket(nullptr), udpPortNumber(portno), eth(e), numImages(nf), @@ -41,19 +39,7 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic* frameDiscardMode(fdp), activated(act), deactivatedPaddingEnable(depaden), - silentMode(sm), - row(0), - column(0), - startedFlag(false), - firstIndex(0), - numPacketsCaught(0), - lastCaughtFrameIndex(0), - currentFrameIndex(0), - carryOverFlag(0), - udpSocketAlive(0), - numPacketsStatistic(0), - numFramesStatistic(0), - oddStartingPacket(true) + silentMode(sm) { LOG(logDEBUG) << "Listener " << ind << " created"; } @@ -66,20 +52,15 @@ Listener::~Listener() { } } -/** getters */ -bool Listener::IsRunning() { - return runningFlag; -} - -uint64_t Listener::GetPacketsCaught() { +uint64_t Listener::GetPacketsCaught() const { return numPacketsCaught; } -uint64_t Listener::GetLastFrameIndexCaught() { +uint64_t Listener::GetLastFrameIndexCaught() const { return lastCaughtFrameIndex; } -uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) { +uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const { if (!stoppedFlag) { return (numPackets - numPacketsCaught); } @@ -89,22 +70,10 @@ uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) { return (lastCaughtFrameIndex - firstIndex + 1) * generalData->packetsPerFrame - numPacketsCaught; } -/** setters */ -void Listener::StartRunning() { - runningFlag = true; -} - - -void Listener::StopRunning() { - runningFlag = false; -} - - void Listener::SetFifo(Fifo* f) { fifo = f; } - void Listener::ResetParametersforNewAcquisition() { runningFlag = false; startedFlag = false; @@ -177,7 +146,7 @@ void Listener::CreateUDPSockets() { sem_init(&semaphore_socket,1,0); // doubled due to kernel bookkeeping (could also be less due to permissions) - *actualUDPSocketBufferSize = udpSocket->getActualUDPSocketBufferSize(); + *actualUDPSocketBufferSize = udpSocket->getBufferSize(); } @@ -185,7 +154,7 @@ void Listener::CreateUDPSockets() { void Listener::ShutDownUDPSocket() { if(udpSocket){ udpSocketAlive = false; - udpSocket->ShutDownSocket(); + udpSocket->Shutdown(); LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber; fflush(stdout); // wait only if the threads have started as it is the threads that @@ -220,7 +189,7 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int64_t s) { *udpSocketBufferSize); // doubled due to kernel bookkeeping (could also be less due to permissions) - *actualUDPSocketBufferSize = g.getActualUDPSocketBufferSize(); + *actualUDPSocketBufferSize = g.getBufferSize(); if (*actualUDPSocketBufferSize == -1) { *udpSocketBufferSize = temp; } else { diff --git a/slsReceiverSoftware/src/Listener.h b/slsReceiverSoftware/src/Listener.h index 83e788cd1..38fdd072e 100755 --- a/slsReceiverSoftware/src/Listener.h +++ b/slsReceiverSoftware/src/Listener.h @@ -12,13 +12,10 @@ #include #include #include "ThreadObject.h" +#include "UdpRxSocket.h" class GeneralData; class Fifo; -namespace sls{ - class UdpRxSocket; -} - class Listener : private virtual slsDetectorDefs, public ThreadObject { @@ -53,40 +50,20 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { */ ~Listener(); - - //*** getters *** - /** - * Returns if the thread is currently running - * @returns true if thread is running, else false - */ - bool IsRunning() override; - /** * Get Packets caught * @return Packets caught */ - uint64_t GetPacketsCaught(); + uint64_t GetPacketsCaught() const; /** * Get Last Frame index caught * @return last frame index caught */ - uint64_t GetLastFrameIndexCaught(); + uint64_t GetLastFrameIndexCaught() const; /** Get number of missing packets */ - uint64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets); - - - //*** setters *** - /** - * Set bit in RunningMask to allow thread to run - */ - void StartRunning(); - - /** - * Reset bit in RunningMask to prevent thread from running - */ - void StopRunning(); + uint64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const; /** * Set Fifo pointer to the one given @@ -140,7 +117,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { void RecordFirstIndex(uint64_t fnum); /** - * Thread Exeution for Listener Class + * Thread Execution for Listener Class * Pop free addresses, listen to udp socket, * write to memory & push the address into fifo */ @@ -168,16 +145,11 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { */ void PrintFifoStatistics(); - - /** type of thread */ static const std::string TypeName; - /** Object running status */ - std::atomic runningFlag; - /** GeneralData (Detector Data) object */ - GeneralData* generalData; + GeneralData* generalData{nullptr}; /** Fifo structure */ Fifo* fifo; @@ -190,7 +162,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { std::atomic* status; /** UDP Socket - Detector to Receiver */ - std::unique_ptr udpSocket; + std::unique_ptr udpSocket{nullptr}; /** UDP Port Number */ uint32_t* udpPortNumber; @@ -228,36 +200,34 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { /** row hardcoded as 1D or 2d, * if detector does not send them yet or * missing packets/deactivated (eiger/jungfrau sends 2d pos) **/ - uint16_t row; + uint16_t row{0}; /** column hardcoded as 2D, * deactivated eiger/missing packets (eiger/jungfrau sends 2d pos) **/ - uint16_t column; - + uint16_t column{0}; // acquisition start /** Aquisition Started flag */ - std::atomic startedFlag; + std::atomic startedFlag{false}; /** Frame Number of First Frame */ - uint64_t firstIndex; + uint64_t firstIndex{0}; // for acquisition summary /** Number of complete Packets caught */ - std::atomic numPacketsCaught; + std::atomic numPacketsCaught{0}; /** Last Frame Index caught from udp network */ - std::atomic lastCaughtFrameIndex; - + std::atomic lastCaughtFrameIndex{0}; // parameters to acquire image /** Current Frame Index, default value is 0 * ( always check startedFlag for validity first) */ - uint64_t currentFrameIndex; + uint64_t currentFrameIndex{0}; /** True if there is a packet carry over from previous Image */ - bool carryOverFlag; + bool carryOverFlag{false}; /** Carry over packet buffer */ std::unique_ptr carryOverPacket; @@ -266,22 +236,22 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { std::unique_ptr listeningPacket; /** if the udp socket is connected */ - std::atomic udpSocketAlive; + std::atomic udpSocketAlive{false}; - /** Semaphore to synchonize deleting udp socket */ + /** Semaphore to synchronize deleting udp socket */ sem_t semaphore_socket; - // for print progress during acqusition + // for print progress during acquisition /** number of packets for statistic */ - uint32_t numPacketsStatistic; + uint32_t numPacketsStatistic{0}; /** number of images for statistic */ - uint32_t numFramesStatistic; + uint32_t numFramesStatistic{0}; /** - * starting packet number is odd or evern, accordingly increment frame number + * starting packet number is odd or even, accordingly increment frame number * to get first packet number as 0 * (pecific to gotthard, can vary between modules, hence defined here) */ - bool oddStartingPacket; + bool oddStartingPacket{true}; }; diff --git a/slsReceiverSoftware/src/ReceiverApp.cpp b/slsReceiverSoftware/src/ReceiverApp.cpp index 89f1687cf..cbe4305ae 100755 --- a/slsReceiverSoftware/src/ReceiverApp.cpp +++ b/slsReceiverSoftware/src/ReceiverApp.cpp @@ -42,17 +42,14 @@ int main(int argc, char *argv[]) { LOG(logERROR) << "Could not set handler function for SIGPIPE"; } - std::unique_ptr receiver = nullptr; try { - receiver = sls::make_unique(argc, argv); + Receiver r(argc, argv); + LOG(logINFO) << "[ Press \'Ctrl+c\' to exit ]"; + sem_wait(&semaphore); + sem_destroy(&semaphore); } catch (...) { - LOG(logINFOBLUE) << "Exiting [ Tid: " << syscall(SYS_gettid) << " ]"; - throw; + //pass } - - LOG(logINFO) << "[ Press \'Ctrl+c\' to exit ]"; - sem_wait(&semaphore); - sem_destroy(&semaphore); LOG(logINFOBLUE) << "Exiting [ Tid: " << syscall(SYS_gettid) << " ]"; LOG(logINFO) << "Exiting Receiver"; return 0; diff --git a/slsReceiverSoftware/src/ThreadObject.cpp b/slsReceiverSoftware/src/ThreadObject.cpp index e78b5e3ca..08d2a5a67 100755 --- a/slsReceiverSoftware/src/ThreadObject.cpp +++ b/slsReceiverSoftware/src/ThreadObject.cpp @@ -36,6 +36,17 @@ ThreadObject::~ThreadObject() { sem_destroy(&semaphore); } +bool ThreadObject::IsRunning() const{ + return runningFlag; +} + +void ThreadObject::StartRunning() { + runningFlag = true; +} + +void ThreadObject::StopRunning() { + runningFlag = false; +} void ThreadObject::RunningThread() { LOG(logINFOBLUE) << "Created [ " << type << "Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]"; diff --git a/slsReceiverSoftware/src/ThreadObject.h b/slsReceiverSoftware/src/ThreadObject.h index cc9ffddf1..624dec1cf 100755 --- a/slsReceiverSoftware/src/ThreadObject.h +++ b/slsReceiverSoftware/src/ThreadObject.h @@ -21,7 +21,9 @@ class ThreadObject : private virtual slsDetectorDefs { public: ThreadObject(int threadIndex, std::string threadType); virtual ~ThreadObject(); - virtual bool IsRunning() = 0; + bool IsRunning() const; + void StartRunning(); + void StopRunning(); void Continue(); void SetThreadPriority(int priority); @@ -41,6 +43,7 @@ class ThreadObject : private virtual slsDetectorDefs { int index{0}; std::string type; std::atomic killThread{false}; + std::atomic runningFlag{false}; std::unique_ptr threadObject; sem_t semaphore; }; diff --git a/slsSupportLib/CMakeLists.txt b/slsSupportLib/CMakeLists.txt index 98904031e..67cc82d19 100755 --- a/slsSupportLib/CMakeLists.txt +++ b/slsSupportLib/CMakeLists.txt @@ -8,6 +8,7 @@ set(SOURCES src/ToString.cpp src/network_utils.cpp src/ZmqSocket.cpp + src/UdpRxSocket.cpp ) set(HEADERS diff --git a/slsSupportLib/include/DataSocket.h b/slsSupportLib/include/DataSocket.h index deaba27d8..470a72a13 100755 --- a/slsSupportLib/include/DataSocket.h +++ b/slsSupportLib/include/DataSocket.h @@ -21,7 +21,7 @@ class DataSocket { //No copy since the class manage the underlying socket DataSocket(const DataSocket &) = delete; DataSocket &operator=(DataSocket const &) = delete; - int getSocketId() const { return socketId_; } + int getSocketId() const { return sockfd_; } int Send(const void *buffer, size_t size); @@ -51,9 +51,10 @@ class DataSocket { int setReceiveTimeout(int us); void close(); void shutDownSocket(); + void shutdown(); private: - int socketId_ = -1; + int sockfd_ = -1; }; }; // namespace sls diff --git a/slsSupportLib/include/UdpRxSocket.h b/slsSupportLib/include/UdpRxSocket.h index a6ffffbb3..3a4ee5685 100644 --- a/slsSupportLib/include/UdpRxSocket.h +++ b/slsSupportLib/include/UdpRxSocket.h @@ -1,141 +1,30 @@ +#pragma once /* -UdpRxSocket provies socket control to receive -data on a udp socket. - -It provides a drop in replacement for -genericSocket. But please be careful since -this might be deprecated in the future - +UDP socket class to receive data. The intended use is in the +receiver listener loop. Should be used RAII style... */ -#include "network_utils.h" -#include "sls_detector_exceptions.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include - +#include //ssize_t namespace sls { class UdpRxSocket { - const ssize_t packet_size; - char *buff; - int fd = -1; + const ssize_t packet_size_; + int sockfd_{-1}; public: UdpRxSocket(int port, ssize_t packet_size, const char *hostname = nullptr, - ssize_t buffer_size = 0) - : packet_size(packet_size) { - /* hostname = nullptr -> wildcard */ + size_t kernel_buffer_size = 0); + ~UdpRxSocket(); + bool ReceivePacket(char *dst) noexcept; + size_t getBufferSize() const; + void setBufferSize(ssize_t size); + ssize_t getPacketSize() const noexcept; + void Shutdown(); - struct addrinfo hints; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol = 0; - hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; - struct addrinfo *res = 0; - - const std::string portname = std::to_string(port); - if (getaddrinfo(hostname, portname.c_str(), &hints, &res)) { - throw RuntimeError("Failed at getaddrinfo with " + - std::string(hostname)); - } - fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); - if (fd == -1) { - throw RuntimeError("Failed to create UDP RX socket"); - } - if (bind(fd, res->ai_addr, res->ai_addrlen) == -1) { - throw RuntimeError("Failed to bind UDP RX socket"); - } - freeaddrinfo(res); - - // If we get a specified buffer size that is larger than the set one - // we set it. Otherwise we leave it there since it could have been - // set by the rx_udpsocksize command - if (buffer_size) { - auto current = getBufferSize() / 2; - if (current < buffer_size) { - setBufferSize(buffer_size); - if (getBufferSize() / 2 < buffer_size) { - LOG(logWARNING) - << "Could not set buffer size. Got: " - << getBufferSize() / 2 << " instead of " << buffer_size; - } - } - } - // Allocate at the end to avoid memory leak if we throw - buff = new char[packet_size]; - } - - ~UdpRxSocket() { - delete[] buff; - Shutdown(); - } - - const char *LastPacket() const noexcept { return buff; } - ssize_t getPacketSize() const noexcept { return packet_size; } - - bool ReceivePacket() noexcept { return ReceivePacket(buff); } - - bool ReceivePacket(char *dst, int flags = 0) noexcept { - auto bytes_received = - recvfrom(fd, dst, packet_size, flags, nullptr, nullptr); - return bytes_received == packet_size; - } - - bool PeekPacket() noexcept{ - return ReceivePacket(buff, MSG_PEEK); - } - - // Only for backwards compatibility this function will be removed during - // refactoring of the receiver - ssize_t ReceiveDataOnly(char *dst) { - auto r = recvfrom(fd, dst, packet_size, 0, nullptr, nullptr); - constexpr ssize_t eiger_header_packet = - 40; // only detector that has this - if (r == eiger_header_packet) { - LOG(logWARNING) << "Got header pkg"; - r = recvfrom(fd, dst, packet_size, 0, nullptr, nullptr); - } - return r; - } - - ssize_t getBufferSize() const { - uint64_t ret_size = 0; - socklen_t optlen = sizeof(uint64_t); - if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1) - return -1; - else - return ret_size; - } - - // Only for backwards compatibility will be removed - ssize_t getActualUDPSocketBufferSize() const { return getBufferSize(); } - - // Only for backwards compatibility will be removed - void ShutDownSocket() { Shutdown(); } - - void setBufferSize(ssize_t size) { - socklen_t optlen = sizeof(size); - if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, optlen)) { - throw RuntimeError("Could not set socket buffer size"); - } - } - - void Shutdown() { - shutdown(fd, SHUT_RDWR); - if (fd >= 0) { - close(fd); - fd = -1; - } - } + // Only for backwards compatibility, this drops the EIGER small pkt, may be + // removed + ssize_t ReceiveDataOnly(char *dst) noexcept; }; } // namespace sls diff --git a/slsSupportLib/src/DataSocket.cpp b/slsSupportLib/src/DataSocket.cpp index abef23141..48af8a9a6 100755 --- a/slsSupportLib/src/DataSocket.cpp +++ b/slsSupportLib/src/DataSocket.cpp @@ -15,13 +15,13 @@ namespace sls { -DataSocket::DataSocket(int socketId) : socketId_(socketId) { +DataSocket::DataSocket(int socketId) : sockfd_(socketId) { int value = 1; - setsockopt(socketId_, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)); + setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)); } DataSocket::~DataSocket() { - if (socketId_ <= 0) { + if (sockfd_ <= 0) { return; } else { try { @@ -32,7 +32,7 @@ DataSocket::~DataSocket() { } void DataSocket::swap(DataSocket &other) noexcept { - std::swap(socketId_, other.socketId_); + std::swap(sockfd_, other.sockfd_); } DataSocket::DataSocket(DataSocket &&move) noexcept { move.swap(*this); } @@ -121,19 +121,23 @@ int DataSocket::setTimeOut(int t_seconds) { } void DataSocket::close() { - if (socketId_ > 0) { - if (::close(socketId_)) { + if (sockfd_ > 0) { + if (::close(sockfd_)) { throw SocketError("could not close socket"); } - socketId_ = -1; + sockfd_ = -1; } else { throw std::runtime_error("Socket ERROR: close called on bad socket\n"); } } void DataSocket::shutDownSocket() { - shutdown(getSocketId(), SHUT_RDWR); + ::shutdown(getSocketId(), SHUT_RDWR); close(); } +void DataSocket::shutdown(){ + ::shutdown(sockfd_, SHUT_RDWR); +} + } // namespace sls diff --git a/slsSupportLib/src/UdpRxSocket.cpp b/slsSupportLib/src/UdpRxSocket.cpp new file mode 100644 index 000000000..558d9ee7e --- /dev/null +++ b/slsSupportLib/src/UdpRxSocket.cpp @@ -0,0 +1,96 @@ +#include "UdpRxSocket.h" +#include "network_utils.h" +#include "sls_detector_exceptions.h" +#include +#include +#include +#include +#include +#include +#include +#include + +namespace sls { + +UdpRxSocket::UdpRxSocket(int port, ssize_t packet_size, const char *hostname, + size_t kernel_buffer_size) + : packet_size_(packet_size) { + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; + struct addrinfo *res = 0; + + const std::string portname = std::to_string(port); + if (getaddrinfo(hostname, portname.c_str(), &hints, &res)) { + throw RuntimeError("Failed at getaddrinfo with " + + std::string(hostname)); + } + sockfd_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (sockfd_ == -1) { + throw RuntimeError("Failed to create UDP RX socket"); + } + if (bind(sockfd_, res->ai_addr, res->ai_addrlen) == -1) { + throw RuntimeError("Failed to bind UDP RX socket"); + } + freeaddrinfo(res); + + // If we get a specified buffer size that is larger than the set one + // we set it. Otherwise we leave it there since it could have been + // set by the rx_udpsocksize command + if (kernel_buffer_size) { + auto current = getBufferSize() / 2; + if (current < kernel_buffer_size) { + setBufferSize(kernel_buffer_size); + if (getBufferSize() / 2 < kernel_buffer_size) { + LOG(logWARNING) + << "Could not set buffer size. Got: " << getBufferSize() / 2 + << " instead of " << kernel_buffer_size; + } + } + } +} + +UdpRxSocket::~UdpRxSocket() { Shutdown(); } +ssize_t UdpRxSocket::getPacketSize() const noexcept { return packet_size_; } + +bool UdpRxSocket::ReceivePacket(char *dst) noexcept{ + auto bytes_received = + recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr); + return bytes_received == packet_size_; +} + +ssize_t UdpRxSocket::ReceiveDataOnly(char *dst) noexcept { + auto r = recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr); + constexpr ssize_t eiger_header_packet = + 40; // only detector that has this + if (r == eiger_header_packet) { + LOG(logWARNING) << "Got header pkg"; + r = recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr); + } + return r; + } + +size_t UdpRxSocket::getBufferSize() const { + size_t ret = 0; + socklen_t optlen = sizeof(ret); + if (getsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &ret, &optlen) == -1) + throw RuntimeError("Could not get socket buffer size"); + return ret; +} + +void UdpRxSocket::setBufferSize(ssize_t size) { + if (setsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size))) + throw RuntimeError("Could not set socket buffer size"); +} + +void UdpRxSocket::Shutdown() { + shutdown(sockfd_, SHUT_RDWR); + if (sockfd_ >= 0) { + close(sockfd_); + sockfd_ = -1; + } + } +} // namespace sls \ No newline at end of file diff --git a/slsSupportLib/tests/test-ToString.cpp b/slsSupportLib/tests/test-ToString.cpp index 7fc6b4bcc..8bbbeb034 100644 --- a/slsSupportLib/tests/test-ToString.cpp +++ b/slsSupportLib/tests/test-ToString.cpp @@ -109,7 +109,7 @@ TEST_CASE("run status"){ using defs = slsDetectorDefs; REQUIRE(ToString(defs::runStatus::ERROR) == "error"); REQUIRE(ToString(defs::runStatus::WAITING) == "waiting"); - REQUIRE(ToString(defs::runStatus::TRANSMITTING) == "data"); //?? + REQUIRE(ToString(defs::runStatus::TRANSMITTING) == "transmitting"); REQUIRE(ToString(defs::runStatus::RUN_FINISHED) == "finished"); REQUIRE(ToString(defs::runStatus::STOPPED) == "stopped"); REQUIRE(ToString(defs::runStatus::IDLE) == "idle"); diff --git a/slsSupportLib/tests/test-UdpRxSocket.cpp b/slsSupportLib/tests/test-UdpRxSocket.cpp index d922b5e5c..07c7e115c 100644 --- a/slsSupportLib/tests/test-UdpRxSocket.cpp +++ b/slsSupportLib/tests/test-UdpRxSocket.cpp @@ -4,6 +4,14 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include constexpr int default_port = 50001; @@ -29,46 +37,49 @@ int open_socket(int port) { throw sls::RuntimeError("Failed to create UDP RX socket"); } - if (connect(fd, res->ai_addr, res->ai_addrlen)){ + if (connect(fd, res->ai_addr, res->ai_addrlen)) { throw sls::RuntimeError("Failed to connect socket"); } freeaddrinfo(res); return fd; } -TEST_CASE("Receive data on localhost") { +TEST_CASE("Get packet size returns the packet size we set in the constructor"){ + constexpr int port = 50001; + constexpr ssize_t packet_size = 8000; + sls::UdpRxSocket s{port, packet_size}; + CHECK(s.getPacketSize() == packet_size); +} + +TEST_CASE("Receive data from a vector") { constexpr int port = 50001; std::vector data_to_send{4, 5, 3, 2, 5, 7, 2, 3}; + std::vector data_received(data_to_send.size()); ssize_t packet_size = sizeof(decltype(data_to_send)::value_type) * data_to_send.size(); + sls::UdpRxSocket udpsock{port, packet_size}; - int fd = open_socket(port); auto n = write(fd, data_to_send.data(), packet_size); CHECK(n == packet_size); - CHECK(udpsock.ReceivePacket()); + + CHECK(udpsock.ReceivePacket((char*)data_received.data())); close(fd); - // Copy data from buffer and compare values - std::vector data_received(data_to_send.size()); - memcpy(data_received.data(), udpsock.LastPacket(), udpsock.getPacketSize()); - CHECK(data_received.size() == data_to_send.size()); // sanity check - for (size_t i = 0; i != data_to_send.size(); ++i) { - CHECK(data_to_send[i] == data_received[i]); - } + CHECK(data_to_send == data_received); + } TEST_CASE("Shutdown socket without hanging when waiting for data") { constexpr int port = 50001; constexpr ssize_t packet_size = 8000; sls::UdpRxSocket s{port, packet_size}; + char buff[packet_size]; // Start a thread and wait for package // if the socket is left open we would block std::future ret = - std::async(static_cast( - &sls::UdpRxSocket::ReceivePacket), - &s); + std::async(&sls::UdpRxSocket::ReceivePacket, &s, (char *)&buff); s.Shutdown(); auto r = ret.get(); @@ -76,60 +87,23 @@ TEST_CASE("Shutdown socket without hanging when waiting for data") { CHECK(r == false); // since we didn't get the packet } -TEST_CASE("Too small packet"){ +TEST_CASE("Too small packet") { constexpr int port = 50001; - sls::UdpRxSocket s(port, 2*sizeof(uint32_t)); + sls::UdpRxSocket s(port, 2 * sizeof(uint32_t)); auto fd = open_socket(port); uint32_t val = 10; write(fd, &val, sizeof(val)); - CHECK(s.ReceivePacket() == false); + uint32_t buff[2]; + CHECK(s.ReceivePacket((char *)&buff) == false); close(fd); } - -TEST_CASE("Receive an int to internal buffer"){ +TEST_CASE("Receive an int to an external buffer") { int to_send = 5; int received = -1; auto fd = open_socket(default_port); sls::UdpRxSocket s(default_port, sizeof(int)); write(fd, &to_send, sizeof(to_send)); - CHECK(s.ReceivePacket()); - memcpy(&received, s.LastPacket(), sizeof(int)); + CHECK(s.ReceivePacket(reinterpret_cast(&received))); CHECK(received == to_send); } - -TEST_CASE("Receive an int to an external buffer"){ - int to_send = 5; - int received = -1; - auto fd = open_socket(default_port); - sls::UdpRxSocket s(default_port, sizeof(int)); - write(fd, &to_send, sizeof(to_send)); - CHECK(s.ReceivePacket(reinterpret_cast(&received))); - CHECK(received == to_send); -} - - -TEST_CASE("PEEK data"){ - int to_send = 5; - int to_send2 = 12; - int received = -1; - auto fd = open_socket(default_port); - sls::UdpRxSocket s(default_port, sizeof(int)); - write(fd, &to_send, sizeof(to_send)); - write(fd, &to_send2, sizeof(to_send)); - CHECK(s.PeekPacket()); - memcpy(&received, s.LastPacket(), sizeof(int)); - CHECK(received == to_send); - - CHECK(s.PeekPacket()); - memcpy(&received, s.LastPacket(), sizeof(int)); - CHECK(received == to_send); - - CHECK(s.ReceivePacket()); - memcpy(&received, s.LastPacket(), sizeof(int)); - CHECK(received == to_send); - - CHECK(s.ReceivePacket()); - memcpy(&received, s.LastPacket(), sizeof(int)); - CHECK(received == to_send2); -}