diff --git a/slsDetectorSoftware/include/Detector.h b/slsDetectorSoftware/include/Detector.h index ff84c62fb..f3a5b7d93 100644 --- a/slsDetectorSoftware/include/Detector.h +++ b/slsDetectorSoftware/include/Detector.h @@ -674,6 +674,9 @@ class Detector { void setRxZmqIP(const IpAddr ip, Positions pos = {}); + Result getClientZmq(Positions pos = {}) const; + void setClientZmq(const bool enable, Positions pos = {}); + Result getClientZmqPort(Positions pos = {}) const; /** diff --git a/slsDetectorSoftware/src/Detector.cpp b/slsDetectorSoftware/src/Detector.cpp index 396dacd44..8800a233d 100644 --- a/slsDetectorSoftware/src/Detector.cpp +++ b/slsDetectorSoftware/src/Detector.cpp @@ -209,6 +209,7 @@ void Detector::registerDataCallback(void (*func)(detectorData *, uint64_t, uint32_t, void *), void *pArg) { pimpl->registerDataCallback(func, pArg); + setClientZmq(true); } bool Detector::getGapPixelsinCallback() const { @@ -577,11 +578,12 @@ Result Detector::getNumberofUDPInterfaces(Positions pos) const { } void Detector::setNumberofUDPInterfaces(int n, Positions pos) { - int previouslyClientStreaming = pimpl->enableDataStreamingToClient(); + bool prevClientZmq = getClientZmq().tsquash( + "Inconsistent number client zmq sockets"); bool useReceiver = getUseReceiverFlag().squash(false); - bool previouslyReceiverStreaming = false; + bool prevRxZmq = false; if (useReceiver) { - previouslyReceiverStreaming = getRxZmqDataStream(pos).squash(true); + prevRxZmq = getRxZmqDataStream(pos).squash(true); } pimpl->Parallel(&Module::setNumberofUDPInterfaces, pos, n); // ensure receiver zmq socket ports are multiplied by 2 (2 interfaces) @@ -590,11 +592,11 @@ void Detector::setNumberofUDPInterfaces(int n, Positions pos) { setRxZmqPort(startingPort, -1); } // redo the zmq sockets if enabled - if (previouslyClientStreaming != 0) { - pimpl->enableDataStreamingToClient(0); - pimpl->enableDataStreamingToClient(1); + if (prevClientZmq) { + setClientZmq(false); + setClientZmq(true); } - if (previouslyReceiverStreaming) { + if (prevRxZmq) { setRxZmqDataStream(false, pos); setRxZmqDataStream(true, pos); } @@ -987,6 +989,8 @@ Result Detector::getRxZmqPort(Positions pos) const { } void Detector::setRxZmqPort(int port, int module_id) { + bool previouslyReceiverStreaming = + getRxZmqDataStream({module_id}).squash(false); if (module_id == -1) { for (int idet = 0; idet < size(); ++idet) { pimpl->Parallel1(&Receiver::setZmqPort, {idet}, @@ -996,6 +1000,10 @@ void Detector::setRxZmqPort(int port, int module_id) { pimpl->Parallel1(&Receiver::setZmqPort, {module_id}, {}, port++); } + if (previouslyReceiverStreaming) { + setRxZmqDataStream(false, {module_id}); + setRxZmqDataStream(true, {module_id}); + } } Result Detector::getRxZmqIP(Positions pos) const { @@ -1011,11 +1019,25 @@ void Detector::setRxZmqIP(const IpAddr ip, Positions pos) { } } +Result Detector::getClientZmq(Positions pos) const { + return pimpl->Parallel3(&Receiver::getClientZmq); +} + +void Detector::setClientZmq(const bool enable, Positions pos) { + try { + pimpl->Parallel3(&Receiver::setClientZmq, enable); + } catch (...) { + throw; + } +} + Result Detector::getClientZmqPort(Positions pos) const { return pimpl->Parallel1(&Receiver::getClientZmqPort, pos, {}); } void Detector::setClientZmqPort(int port, int module_id) { + bool prevClientZmq = getClientZmq().tsquash( + "Inconsistent number of client zmq sockets"); if (module_id == -1) { for (int idet = 0; idet < size(); ++idet) { pimpl->Parallel1(&Receiver::setClientZmqPort, {idet}, @@ -1025,6 +1047,10 @@ void Detector::setClientZmqPort(int port, int module_id) { pimpl->Parallel1(&Receiver::setClientZmqPort, {module_id}, {}, port);// FIXME: Needs a clientzmqport2 } + if (prevClientZmq) { + setClientZmq(false); + setClientZmq(true); + } } Result Detector::getClientZmqIp(Positions pos) const { @@ -1032,11 +1058,12 @@ Result Detector::getClientZmqIp(Positions pos) const { } void Detector::setClientZmqIp(const IpAddr ip, Positions pos) { - int previouslyClientStreaming = pimpl->enableDataStreamingToClient(-1); + bool prevClientZmq = getClientZmq().tsquash( + "Inconsistent number of client zmq sockets"); pimpl->Parallel3(&Receiver::setClientZmqIP, ip); - if (previouslyClientStreaming != 0) { - pimpl->enableDataStreamingToClient(0); - pimpl->enableDataStreamingToClient(1); + if (prevClientZmq) { + setClientZmq(false); + setClientZmq(true); } } diff --git a/slsDetectorSoftware/src/DetectorImpl.cpp b/slsDetectorSoftware/src/DetectorImpl.cpp index 6e8ab952d..755869a36 100755 --- a/slsDetectorSoftware/src/DetectorImpl.cpp +++ b/slsDetectorSoftware/src/DetectorImpl.cpp @@ -115,7 +115,6 @@ void DetectorImpl::freeSharedMemory(int detectorId, int moduleId) { } void DetectorImpl::freeSharedMemory() { - zmqSocket.clear(); for (auto &d : detectors) { d->freeSharedMemory(); } @@ -134,7 +133,6 @@ void DetectorImpl::freeSharedMemory() { receivers2.clear(); // clear multi detector shm detector_shm.RemoveSharedMemory(); - client_downstream = false; } std::string DetectorImpl::getUserDetails() { @@ -212,7 +210,6 @@ void DetectorImpl::initializeDetectorStructure() { void DetectorImpl::initializeMembers(bool verify) { // DetectorImpl - zmqSocket.clear(); int numModules = detector_shm()->numberOfModules; // get objects from single det shared memory (open) @@ -600,58 +597,6 @@ void DetectorImpl::setGapPixelsinCallback(const bool enable) { detector_shm()->gapPixels = enable; } -int DetectorImpl::createReceivingDataSockets(const bool destroy) { - if (destroy) { - LOG(logINFO) << "Going to destroy data sockets"; - // close socket - zmqSocket.clear(); - - client_downstream = false; - LOG(logINFO) << "Destroyed Receiving Data Socket(s)"; - return OK; - } - if (client_downstream) { - return OK; - } - LOG(logINFO) << "Going to create data sockets"; - - size_t numSockets = detectors.size(); - size_t numSocketsPerDetector = 1; - if (detector_shm()->multiDetectorType == EIGER) { - numSocketsPerDetector = 2; - } - if (Parallel(&Module::getNumberofUDPInterfacesFromShm, {}).squash() == - 2) { - numSocketsPerDetector = 2; - } - numSockets *= numSocketsPerDetector; - - for (size_t iSocket = 0; iSocket < numSockets; ++iSocket) { - uint32_t portnum = (receivers[iSocket / numSocketsPerDetector][0] - ->getClientZmqPort());//FIXME 2 receivers - portnum += (iSocket % numSocketsPerDetector); - try { - zmqSocket.push_back(sls::make_unique( - receivers[iSocket / numSocketsPerDetector][0] - ->getClientZmqIP() - .str() - .c_str(), - portnum)); - LOG(logINFO) << "Zmq Client[" << iSocket << "] at " - << zmqSocket.back()->GetZmqServerAddress(); - } catch (...) { - LOG(logERROR) - << "Could not create Zmq socket on port " << portnum; - createReceivingDataSockets(true); - return FAIL; - } - } - - client_downstream = true; - LOG(logINFO) << "Receiving Data Socket(s) created"; - return OK; -} - void DetectorImpl::readFrameFromReceiver() { bool gapPixels = detector_shm()->gapPixels; @@ -667,19 +612,28 @@ void DetectorImpl::readFrameFromReceiver() { Parallel(&Module::getNumberofUDPInterfacesFromShm, {}) .squash(); // cannot pick up from zmq - bool runningList[zmqSocket.size()], connectList[zmqSocket.size()]; + size_t nZmq = receivers.size() + receivers2.size(); + std::vector zmqSockets; + for (size_t i = 0; i < receivers.size(); ++i) { + zmqSockets.push_back(receivers[i][0]->getZmqSocket()); + if (receivers2.size()) { + zmqSockets.push_back(receivers2[i][0]->getZmqSocket()); + } + } + bool runningList[nZmq], connectList[nZmq]; int numRunning = 0; - for (size_t i = 0; i < zmqSocket.size(); ++i) { - if (zmqSocket[i]->Connect() == 0) { + for (size_t i = 0; i < nZmq; ++i) { + if (zmqSockets[i]->Connect() == 0) { connectList[i] = true; runningList[i] = true; ++numRunning; } else { // to remember the list it connected to, to disconnect later connectList[i] = false; - LOG(logERROR) << "Could not connect to socket " - << zmqSocket[i]->GetZmqServerAddress(); runningList[i] = false; + LOG(logERROR) << "Could not connect to socket " + << zmqSockets[i]->GetZmqServerAddress(); + } } int numConnected = numRunning; @@ -716,7 +670,7 @@ void DetectorImpl::readFrameFromReceiver() { completeImage = true; // get each frame - for (unsigned int isocket = 0; isocket < zmqSocket.size(); ++isocket) { + for (unsigned int isocket = 0; isocket < nZmq; ++isocket) { // if running if (runningList[isocket]) { @@ -724,7 +678,7 @@ void DetectorImpl::readFrameFromReceiver() { // HEADER { zmqHeader zHeader; - if (zmqSocket[isocket]->ReceiveHeader( + if (zmqSockets[isocket]->ReceiveHeader( isocket, zHeader, SLS_DETECTOR_JSON_HEADER_VERSION) == 0) { // parse error, version error or end of acquisition for @@ -738,7 +692,7 @@ void DetectorImpl::readFrameFromReceiver() { if (image == nullptr) { // allocate size = zHeader.imageSize; - multisize = size * zmqSocket.size(); + multisize = size * nZmq; image = new char[size]; multiframe = new char[multisize]; memset(multiframe, 0xFF, multisize); @@ -803,7 +757,7 @@ void DetectorImpl::readFrameFromReceiver() { // DATA data = true; - zmqSocket[isocket]->ReceiveData(isocket, image, size); + zmqSockets[isocket]->ReceiveData(isocket, image, size); // creating multi image { @@ -890,7 +844,7 @@ void DetectorImpl::readFrameFromReceiver() { running = false; } else { // starting a new scan/measurement (got dummy data) - for (size_t i = 0; i < zmqSocket.size(); ++i) { + for (size_t i = 0; i < zmqSockets.size(); ++i) { runningList[i] = connectList[i]; } numRunning = numConnected; @@ -899,9 +853,9 @@ void DetectorImpl::readFrameFromReceiver() { } // Disconnect resources - for (size_t i = 0; i < zmqSocket.size(); ++i) { + for (size_t i = 0; i < zmqSockets.size(); ++i) { if (connectList[i]) { - zmqSocket[i]->Disconnect(); + zmqSockets[i]->Disconnect(); } } @@ -1207,23 +1161,6 @@ int DetectorImpl::InsertGapPixels(char *image, char *&gpImage, return imagesize; } - - -bool DetectorImpl::enableDataStreamingToClient(int enable) { - if (enable >= 0) { - // destroy data threads - if (enable == 0) { - createReceivingDataSockets(true); - // create data threads - } else { - if (createReceivingDataSockets() == FAIL) { - throw RuntimeError("Could not create data threads in client."); - } - } - } - return client_downstream; -} - void DetectorImpl::registerAcquisitionFinishedCallback(void (*func)(double, int, void *), void *pArg) { @@ -1237,7 +1174,6 @@ void DetectorImpl::registerDataCallback(void (*userCallback)(detectorData *, void *pArg) { dataReady = userCallback; pCallbackArg = pArg; - enableDataStreamingToClient(dataReady == nullptr ? 0 : 1); } int DetectorImpl::acquire() { diff --git a/slsDetectorSoftware/src/DetectorImpl.h b/slsDetectorSoftware/src/DetectorImpl.h index aa71aaaed..2fc90f786 100755 --- a/slsDetectorSoftware/src/DetectorImpl.h +++ b/slsDetectorSoftware/src/DetectorImpl.h @@ -5,7 +5,6 @@ #include "logger.h" #include "sls_detector_defs.h" -class ZmqSocket; class detectorData; #include @@ -693,13 +692,6 @@ class DetectorImpl : public virtual slsDetectorDefs { /** [Eiger][Jungfrau] */ void setGapPixelsinCallback(const bool enable); - /** - * Enable data streaming to client - * @param enable 0 to disable, 1 to enable, -1 to get the value - * @returns data streaming to client enable - */ - bool enableDataStreamingToClient(int enable = -1); - /** * register callback for accessing acquisition final data * @param func function to be called at the end of the acquisition. @@ -785,13 +777,6 @@ class DetectorImpl : public virtual slsDetectorDefs { void updateDetectorSize(); - /** - * Create Receiving Data Sockets - * @param destroy is true to destroy all the sockets - * @returns OK or FAIL - */ - int createReceivingDataSockets(const bool destroy = false); - /** * Reads frames from receiver through a constant socket * Called during acquire() when call back registered or when using gui @@ -847,13 +832,6 @@ class DetectorImpl : public virtual slsDetectorDefs { /** for the second udp port [Eiger][Jungfrau] */ std::vector>> receivers2; - - /** data streaming (down stream) enabled in client (zmq sckets created) */ - bool client_downstream{false}; - - /** ZMQ Socket - Receiver to Client */ - std::vector> zmqSocket; - /** semaphore to let postprocessing thread continue for next * scan/measurement */ sem_t sem_newRTAcquisition; diff --git a/slsDetectorSoftware/src/Receiver.cpp b/slsDetectorSoftware/src/Receiver.cpp index 6fc9fadfc..f5fa70b23 100755 --- a/slsDetectorSoftware/src/Receiver.cpp +++ b/slsDetectorSoftware/src/Receiver.cpp @@ -1,10 +1,13 @@ #include "Receiver.h" #include "ClientSocket.h" +#include "ZmqSocket.h" #include "FixedCapacityContainer.h" #include "string_utils.h" #include "versionAPI.h" #include "ToString.h" +#include "container_utils.h" + namespace sls { // create shm @@ -144,7 +147,7 @@ sls::MacAddr Receiver::configure(slsDetectorDefs::rxParameters arg) { arg.udpInterfaces = 2; } - LOG(logINFO) + LOG(logDEBUG) << "detType:" << arg.detType << std::endl << "detectorSize.x:" << arg.detectorSize.x << std::endl << "detectorSize.y:" << arg.detectorSize.y << std::endl @@ -389,6 +392,38 @@ void Receiver::setClientZmqIP(const sls::IpAddr ip) { shm()->zmqIp = ip; } +bool Receiver::getClientZmq() const { + return (zmqSocket != nullptr); +} + +void Receiver::setClientZmq(const bool enable) { + // destroy + if (!enable) { + if (zmqSocket != nullptr) { + zmqSocket.reset(); + } + } + // create + else { + if (zmqSocket == nullptr) { + try { + zmqSocket = sls::make_unique( + shm()->zmqIp.str().c_str(), shm()->zmqPort); + LOG(logINFO) << "Zmq Client[" << indexString << "] at " + << zmqSocket->GetZmqServerAddress(); + } catch(...) { + throw RuntimeError( + "Could not create Zmq socket [" + indexString + + " on port " + std::to_string(shm()->zmqPort)); + } + } + } +} + +ZmqSocket* Receiver::getZmqSocket() { + return zmqSocket.get(); +} + /** Receiver Parameters */ bool Receiver::getLock() const { diff --git a/slsDetectorSoftware/src/Receiver.h b/slsDetectorSoftware/src/Receiver.h index dfa99544d..c6770eac0 100755 --- a/slsDetectorSoftware/src/Receiver.h +++ b/slsDetectorSoftware/src/Receiver.h @@ -5,9 +5,12 @@ #include "network_utils.h" #include +#include #define RECEIVER_SHMVERSION 0x200421 +class ZmqSocket; + namespace sls { struct sharedReceiver { @@ -105,8 +108,9 @@ class Receiver : public virtual slsDetectorDefs { void setClientZmqPort(const int port); sls::IpAddr getClientZmqIP() const; void setClientZmqIP(const sls::IpAddr ip); - - + bool getClientZmq() const; + void setClientZmq(const bool enable); + ZmqSocket* getZmqSocket(); /************************************************** * * @@ -248,6 +252,7 @@ class Receiver : public virtual slsDetectorDefs { const int moduleId{0}; std::string indexString; mutable sls::SharedMemory shm{0, 0, 0, 0}; + std::unique_ptr zmqSocket; }; } // sls \ No newline at end of file