From df63a6dffef38f5edd259a65ced0554be6deb84a Mon Sep 17 00:00:00 2001 From: Dhanya Thattil Date: Fri, 17 Apr 2020 12:09:23 +0200 Subject: [PATCH] rx_statusL unknown, but others good WIP --- slsDetectorSoftware/include/Detector.h | 6 +- slsDetectorSoftware/src/CmdProxy.cpp | 17 ++- slsDetectorSoftware/src/Detector.cpp | 74 +++++++++---- slsDetectorSoftware/src/DetectorImpl.cpp | 22 ++-- slsDetectorSoftware/src/DetectorImpl.h | 132 +++++++++++++++++++++++ slsDetectorSoftware/src/Module.cpp | 58 ---------- slsDetectorSoftware/src/Module.h | 29 ----- slsDetectorSoftware/src/Receiver.cpp | 45 +++++++- slsDetectorSoftware/src/Receiver.h | 82 +++++++------- 9 files changed, 303 insertions(+), 162 deletions(-) diff --git a/slsDetectorSoftware/include/Detector.h b/slsDetectorSoftware/include/Detector.h index 5ede9688d..a97004305 100644 --- a/slsDetectorSoftware/include/Detector.h +++ b/slsDetectorSoftware/include/Detector.h @@ -346,7 +346,11 @@ class Detector { Result getDetectorStatus(Positions pos = {}) const; - Result getReceiverStatus(Positions pos = {}) const; + Result getReceiverStatus() const; + /** interface is by 1 (primary udp interface), + * 2 for second udp interface [Eiger][Jungfrau] */ + Result getReceiverStatus(const int udpInterface, + Positions pos = {}) const; Result getFramesCaught(Positions pos = {}) const; diff --git a/slsDetectorSoftware/src/CmdProxy.cpp b/slsDetectorSoftware/src/CmdProxy.cpp index c8e72ab92..952d093e7 100644 --- a/slsDetectorSoftware/src/CmdProxy.cpp +++ b/slsDetectorSoftware/src/CmdProxy.cpp @@ -821,16 +821,29 @@ std::vector CmdProxy::DacCommands() { /* acquisition */ std::string CmdProxy::ReceiverStatus(int action) { + int udpInterface = 1; + if (cmd == "rx_status") { + udpInterface = 1; + } else if (cmd == "rx_status2") { + udpInterface = 2; + } else { + throw sls::RuntimeError("Unknown command, use list to list all commands"); + } std::ostringstream os; os << cmd << ' '; if (action == defs::HELP_ACTION) { - os << "running, idle]\n\tReceiver listener status." + if (cmd == "rx_status") { + os << "running, idle]\n\tReceiver listener status." << '\n'; + } else { + os << "running, idle]\n\tReceiver listener status for second udp port." + << '\n'; + } } else if (action == defs::GET_ACTION) { if (args.size() != 0) { WrongNumberOfParameters(0); } - auto t = det->getReceiverStatus({det_id}); + auto t = det->getReceiverStatus(udpInterface, {det_id}); os << OutString(t) << '\n'; } else if (action == defs::PUT_ACTION) { throw sls::RuntimeError("Cannot put. Did you mean to use command 'rx_start' or 'rx_stop'?"); diff --git a/slsDetectorSoftware/src/Detector.cpp b/slsDetectorSoftware/src/Detector.cpp index 635401c60..a93ffdfc5 100644 --- a/slsDetectorSoftware/src/Detector.cpp +++ b/slsDetectorSoftware/src/Detector.cpp @@ -463,11 +463,11 @@ void Detector::acquire() { pimpl->acquire(); } void Detector::clearAcquiringFlag() { pimpl->setAcquiringFlag(0); } void Detector::startReceiver() { - pimpl->Parallel(&Module::startReceiver, {}); + pimpl->Parallel3(&Receiver::start); } void Detector::stopReceiver() { - pimpl->Parallel(&Module::stopReceiver, {}); + pimpl->Parallel3(&Receiver::stop); } void Detector::startDetector() { @@ -478,15 +478,49 @@ void Detector::startDetector() { } void Detector::stopDetector() { + // get status before stopping acquisition + defs::runStatus s = defs::ERROR, r = defs::ERROR; + bool restreamStop = false; + if (pimpl->isReceiverInitialized(1) && getRxZmqDataStream().squash(true)) { + s = getDetectorStatus().squash(defs::ERROR); + r = getReceiverStatus().squash(defs::ERROR); + // if rxr streaming and acquisition finished, restream dummy stop packet + if (s == defs::IDLE && r == defs::IDLE) { + restreamStop = true; + } + } pimpl->Parallel(&Module::stopAcquisition, {}); + if (pimpl->isReceiverInitialized(1)) { + pimpl->Parallel1(&Receiver::setStoppedFlag, {}, {}); + if (restreamStop) { + pimpl->Parallel1(&Receiver::restreamStop, {}, {}); + } + } else if (pimpl->isReceiverInitialized(2)) { + pimpl->Parallel2(&Receiver::setStoppedFlag, {}, {}); + if (restreamStop) { + pimpl->Parallel2(&Receiver::restreamStop, {}, {}); + } + } } Result Detector::getDetectorStatus(Positions pos) const { return pimpl->Parallel(&Module::getRunStatus, pos); } -Result Detector::getReceiverStatus(Positions pos) const { - return pimpl->Parallel(&Module::getReceiverStatus, pos); +Result Detector::getReceiverStatus() const { + return pimpl->Parallel3(&Receiver::getStatus); +} + +Result Detector::getReceiverStatus(const int udpInterface, Positions pos) const { + switch (udpInterface) { + case 1: + return pimpl->Parallel1(&Receiver::getStatus, pos, {}); + case 2: + return pimpl->Parallel2(&Receiver::getStatus, pos, {}); + default: + throw RuntimeError("Invalid udp interface number " + + std::to_string(udpInterface)); + } } Result Detector::getFramesCaught(Positions pos) const { @@ -703,9 +737,9 @@ Result Detector::getUseReceiverFlag(Positions pos) const { Result Detector::getRxHostname(const int udpInterface, Positions pos) const { switch (udpInterface) { case 1: - return pimpl->Parallel1(&Receiver::getHostname, pos, {0}); + return pimpl->Parallel1(&Receiver::getHostname, pos, {}); case 2: - return pimpl->Parallel2(&Receiver::getHostname, pos, {0}); + return pimpl->Parallel2(&Receiver::getHostname, pos, {}); default: throw RuntimeError("Invalid udp interface number " + std::to_string(udpInterface)); @@ -713,13 +747,17 @@ Result Detector::getRxHostname(const int udpInterface, Positions po } void Detector::setRxHostname(const int udpInterface, const std::string &hostname, Positions pos) { + if (getDetectorStatus(pos).squash(defs::ERROR) == defs::RUNNING) { + LOG(logWARNING) << "Acquisition already running, Stopping it."; + stopDetector(); + } if (!pimpl->isReceiverInitialized(udpInterface)) { pimpl->initReceiver(udpInterface); } if (udpInterface == 1) { - pimpl->Parallel1(&Receiver::setHostname, pos, {0}, hostname); + pimpl->Parallel1(&Receiver::setHostname, pos, {}, hostname); } else { - pimpl->Parallel2(&Receiver::setHostname, pos, {0}, hostname); + pimpl->Parallel2(&Receiver::setHostname, pos, {}, hostname); } } @@ -729,20 +767,20 @@ void Detector::setRxHostname(const int udpInterface, const std::string &hostname pimpl->initReceiver(udpInterface); } if (udpInterface == 1) { - pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {0}, port); - pimpl->Parallel1(&Receiver::setHostname, {module_id}, {0}, hostname); + pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {}, port); + pimpl->Parallel1(&Receiver::setHostname, {module_id}, {}, hostname); } else { - pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {0}, port); - pimpl->Parallel2(&Receiver::setHostname, {module_id}, {0}, hostname); + pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {}, port); + pimpl->Parallel2(&Receiver::setHostname, {module_id}, {}, hostname); } } Result Detector::getRxPort(const int udpInterface, Positions pos) const { switch (udpInterface) { case 1: - return pimpl->Parallel1(&Receiver::getTCPPort, pos, {0}); + return pimpl->Parallel1(&Receiver::getTCPPort, pos, {}); case 2: - return pimpl->Parallel2(&Receiver::getTCPPort, pos, {0}); + return pimpl->Parallel2(&Receiver::getTCPPort, pos, {}); default: throw RuntimeError("Invalid udp interface number " + std::to_string(udpInterface)); @@ -757,21 +795,21 @@ void Detector::setRxPort(const int udpInterface, int port, int module_id) { if (module_id == -1) { std::vector port_list = getPortNumbers(port); for (int idet = 0; idet < size(); ++idet) { - pimpl->Parallel1(&Receiver::setTCPPort, {idet}, {0}, + pimpl->Parallel1(&Receiver::setTCPPort, {idet}, {}, port_list[idet]); } } else { - pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {0}, port); + pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {}, port); } } else { if (module_id == -1) { std::vector port_list = getPortNumbers(port); for (int idet = 0; idet < size(); ++idet) { - pimpl->Parallel2(&Receiver::setTCPPort, {idet}, {0}, + pimpl->Parallel2(&Receiver::setTCPPort, {idet}, {}, port_list[idet]); } } else { - pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {0}, port); + pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {}, port); } } } diff --git a/slsDetectorSoftware/src/DetectorImpl.cpp b/slsDetectorSoftware/src/DetectorImpl.cpp index f891cbcfc..f35adfd45 100755 --- a/slsDetectorSoftware/src/DetectorImpl.cpp +++ b/slsDetectorSoftware/src/DetectorImpl.cpp @@ -1174,16 +1174,16 @@ int DetectorImpl::acquire() { // receiver/ext process) sem_init(&sem_endRTAcquisition, 1, 0); - bool receiver = - Parallel(&Module::getUseReceiverFlag, {}).squash(false); + bool receiver1 = isReceiverInitialized(1); + bool receiver2 = isReceiverInitialized(2); + bool receiver = receiver1 || receiver2; setJoinThreadFlag(false); // verify receiver is idle if (receiver) { - if (Parallel(&Module::getReceiverStatus, {}).squash(ERROR) != - IDLE) { - Parallel(&Module::stopReceiver, {}); + if (Parallel3(&Receiver::getStatus).squash(ERROR) != IDLE) { + Parallel3(&Receiver::stop); } } @@ -1191,7 +1191,7 @@ int DetectorImpl::acquire() { // start receiver if (receiver) { - Parallel(&Module::startReceiver, {}); + Parallel3(&Receiver::start); // let processing thread listen to these packets sem_post(&sem_newRTAcquisition); } @@ -1203,13 +1203,13 @@ int DetectorImpl::acquire() { } Parallel(&Module::startAndReadAll, {}); } catch (...) { - Parallel(&Module::stopReceiver, {}); + Parallel3(&Receiver::stop); throw; } // stop receiver if (receiver) { - Parallel(&Module::stopReceiver, {}); + Parallel3(&Receiver::stop); if (dataReady != nullptr) { sem_wait(&sem_endRTAcquisition); // waits for receiver's } @@ -1226,7 +1226,7 @@ int DetectorImpl::acquire() { if (acquisition_finished != nullptr) { int status = Parallel(&Module::getRunStatus, {}).squash(ERROR); - auto a = Parallel(&Module::getReceiverProgress, {}); + auto a = Parallel3(&Receiver::getProgress); int progress = (*std::min_element (a.begin(), a.end())); acquisition_finished((double)progress, status, acqFinished_p); } @@ -1278,7 +1278,7 @@ void DetectorImpl::processData() { } } // get and print progress - double temp = (double)Parallel(&Module::getReceiverProgress, {0}).squash(); + double temp = (double)Parallel1(&Receiver::getProgress, {0}, {0}).squash(); if (temp != progress) { printProgress(progress); progress = temp; @@ -1287,7 +1287,7 @@ void DetectorImpl::processData() { // exiting loop if (getJoinThreadFlag()) { // print progress one final time before exiting - progress = (double)Parallel(&Module::getReceiverProgress, {0}).squash(); + progress = (double)Parallel1(&Receiver::getProgress, {0}, {0}).squash(); printProgress(progress); break; } diff --git a/slsDetectorSoftware/src/DetectorImpl.h b/slsDetectorSoftware/src/DetectorImpl.h index 416159199..2ea3c5ab7 100755 --- a/slsDetectorSoftware/src/DetectorImpl.h +++ b/slsDetectorSoftware/src/DetectorImpl.h @@ -496,6 +496,138 @@ class DetectorImpl : public virtual slsDetectorDefs { + // for all , but dont complain if receiver2 doesnt exist + template + sls::Result Parallel3(RT (sls::Receiver::*somefunc)(CT...), + typename NonDeduced::type... Args) { + + if (receivers.size() == 0) + throw sls::RuntimeError("No receivers added"); + std::vector dPositions; + dPositions.resize(receivers.size()); + std::iota(begin(dPositions), end(dPositions), 0); + std::vector rxPositions; + rxPositions.resize(receivers[0].size()); + std::iota(begin(rxPositions), end(rxPositions), 0); + // multiply by 2 if receivers2 exists + size_t futureSize = dPositions.size() * rxPositions.size() * + (receivers2.size() > 0 ? 2 : 1); + std::vector> futures; + futures.reserve(futureSize); + for (size_t i : dPositions) { + // each entry + for (size_t j : rxPositions) { + futures.push_back(std::async(std::launch::async, somefunc, + receivers[i][j].get(), Args...)); + futures.push_back(std::async(std::launch::async, somefunc, + receivers2[i][j].get(), Args...)); + } + } + sls::Result result; + result.reserve(futureSize); + for (auto &i : futures) { + result.push_back(i.get()); + } + return result; + } + + template + sls::Result Parallel3(RT (sls::Receiver::*somefunc)(CT...) const, + typename NonDeduced::type... Args) const { + + if (receivers.size() == 0) + throw sls::RuntimeError("No receivers added"); + std::vector dPositions; + dPositions.resize(receivers.size()); + std::iota(begin(dPositions), end(dPositions), 0); + std::vector rxPositions; + rxPositions.resize(receivers[0].size()); + std::iota(begin(rxPositions), end(rxPositions), 0); + // multiply by 2 if receivers2 exists + size_t futureSize = dPositions.size() * rxPositions.size() * + (receivers2.size() > 0 ? 2 : 1); + std::vector> futures; + futures.reserve(futureSize); + for (size_t i : dPositions) { + // each entry + for (size_t j : rxPositions) { + futures.push_back(std::async(std::launch::async, somefunc, + receivers[i][j].get(), Args...)); + futures.push_back(std::async(std::launch::async, somefunc, + receivers2[i][j].get(), Args...)); + } + } + sls::Result result; + result.reserve(futureSize); + for (auto &i : futures) { + result.push_back(i.get()); + } + return result; + } + + template + void Parallel3(void (sls::Receiver::*somefunc)(CT...), + typename NonDeduced::type... Args) { + + if (receivers.size() == 0) + throw sls::RuntimeError("No receivers added"); + std::vector dPositions; + dPositions.resize(receivers.size()); + std::iota(begin(dPositions), end(dPositions), 0); + std::vector rxPositions; + rxPositions.resize(receivers[0].size()); + std::iota(begin(rxPositions), end(rxPositions), 0); + // multiply by 2 if receivers2 exists + size_t futureSize = dPositions.size() * rxPositions.size() * + (receivers2.size() > 0 ? 2 : 1); + std::vector> futures; + futures.reserve(futureSize); + for (size_t i : dPositions) { + // each entry + for (size_t j : rxPositions) { + futures.push_back(std::async(std::launch::async, somefunc, + receivers[i][j].get(), Args...)); + futures.push_back(std::async(std::launch::async, somefunc, + receivers2[i][j].get(), Args...)); + } + } + for (auto &i : futures) { + i.get(); + } + } + + template + void Parallel3(void (sls::Receiver::*somefunc)(CT...) const, + typename NonDeduced::type... Args) const { + + if (receivers.size() == 0) + throw sls::RuntimeError("No receivers added"); + std::vector dPositions; + dPositions.resize(receivers.size()); + std::iota(begin(dPositions), end(dPositions), 0); + std::vector rxPositions; + rxPositions.resize(receivers[0].size()); + std::iota(begin(rxPositions), end(rxPositions), 0); + // multiply by 2 if receivers2 exists + size_t futureSize = dPositions.size() * rxPositions.size() * + (receivers2.size() > 0 ? 2 : 1); + std::vector> futures; + futures.reserve(futureSize); + for (size_t i : dPositions) { + // each entry + for (size_t j : rxPositions) { + futures.push_back(std::async(std::launch::async, somefunc, + receivers[i][j].get(), Args...)); + futures.push_back(std::async(std::launch::async, somefunc, + receivers2[i][j].get(), Args...)); + } + } + for (auto &i : futures) { + i.get(); + } + } + + /** set acquiring flag in shared memory */ void setAcquiringFlag(bool flag); diff --git a/slsDetectorSoftware/src/Module.cpp b/slsDetectorSoftware/src/Module.cpp index 84576b593..7a91c8f4c 100755 --- a/slsDetectorSoftware/src/Module.cpp +++ b/slsDetectorSoftware/src/Module.cpp @@ -421,7 +421,6 @@ void Module::initializeDetectorStructure(detectorType type) { (moduleId * ((shm()->myDetectorType == EIGER) ? 2 : 1)); shm()->zmqip = IpAddr{}; shm()->numUDPInterfaces = 1; - shm()->stoppedFlag = false; // get the detector parameters based on type detParameters parameters{type}; @@ -985,28 +984,14 @@ void Module::prepareAcquisition() { void Module::startAcquisition() { LOG(logDEBUG1) << "Starting Acquisition"; - shm()->stoppedFlag = false; sendToDetector(F_START_ACQUISITION); LOG(logDEBUG1) << "Starting Acquisition successful"; } void Module::stopAcquisition() { - // get status before stopping acquisition - runStatus s = ERROR, r = ERROR; - bool zmqstreaming = false; - if (shm()->useReceiver && getReceiverStreaming()) { - zmqstreaming = true; - s = getRunStatus(); - r = getReceiverStatus(); - } LOG(logDEBUG1) << "Stopping Acquisition"; sendToDetectorStop(F_STOP_ACQUISITION); - shm()->stoppedFlag = true; LOG(logDEBUG1) << "Stopping Acquisition successful"; - // if rxr streaming and acquisition finished, restream dummy stop packet - if (zmqstreaming && (s == IDLE) && (r == IDLE)) { - restreamStopFromReceiver(); - } } void Module::sendSoftwareTrigger() { @@ -1017,7 +1002,6 @@ void Module::sendSoftwareTrigger() { void Module::startAndReadAll() { LOG(logDEBUG1) << "Starting and reading all frames"; - shm()->stoppedFlag = false; sendToDetector(F_START_AND_READ_ALL); LOG(logDEBUG1) << "Detector successfully finished acquisition"; } @@ -2995,32 +2979,6 @@ void Module::setPartialFramesPadding(bool padding) { sendToReceiver(F_SET_RECEIVER_PADDING, arg, nullptr); } -void Module::startReceiver() { - LOG(logDEBUG1) << "Starting Receiver"; - shm()->stoppedFlag = false; - if (shm()->useReceiver) { - sendToReceiver(F_START_RECEIVER, nullptr, nullptr); - } -} - -void Module::stopReceiver() { - LOG(logDEBUG1) << "Stopping Receiver"; - if (shm()->useReceiver) { - int arg = static_cast(shm()->stoppedFlag); - sendToReceiver(F_STOP_RECEIVER, arg, nullptr); - } -} - -slsDetectorDefs::runStatus Module::getReceiverStatus() const { - runStatus retval = ERROR; - LOG(logDEBUG1) << "Getting Receiver Status"; - if (shm()->useReceiver) { - sendToReceiver(F_GET_RECEIVER_STATUS, nullptr, retval); - LOG(logDEBUG1) << "Receiver Status: " << ToString(retval); - } - return retval; -} - int64_t Module::getFramesCaughtByReceiver() const { int64_t retval = -1; LOG(logDEBUG1) << "Getting Frames Caught by Receiver"; @@ -3068,15 +3026,6 @@ uint64_t Module::getReceiverCurrentFrameIndex() const { return retval; } -int Module::getReceiverProgress() const { - int retval = -1; - if (shm()->useReceiver) { - sendToReceiver(F_GET_RECEIVER_PROGRESS, nullptr, retval); - LOG(logDEBUG1) << "Current Progress of Receiver: " << retval; - } - return retval; -} - void Module::setFileWrite(bool value) { if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file write enable)"); @@ -3208,13 +3157,6 @@ void Module::setReceiverSilentMode(bool enable) { sendToReceiver(F_SET_RECEIVER_SILENT_MODE, arg, nullptr); } -void Module::restreamStopFromReceiver() { - LOG(logDEBUG1) << "Restream stop dummy from Receiver via zmq"; - if (shm()->useReceiver) { - sendToReceiver(F_RESTREAM_STOP_FROM_RECEIVER, nullptr, nullptr); - } -} - void Module::setPattern(const std::string &fname) { uint64_t word; uint64_t addr = 0; diff --git a/slsDetectorSoftware/src/Module.h b/slsDetectorSoftware/src/Module.h index ce38d4fea..6534b7d68 100755 --- a/slsDetectorSoftware/src/Module.h +++ b/slsDetectorSoftware/src/Module.h @@ -84,9 +84,6 @@ struct sharedModule { /** num udp interfaces */ int numUDPInterfaces; - - /** stopped flag to inform rxr */ - bool stoppedFlag; }; class Module : public virtual slsDetectorDefs { @@ -1349,22 +1346,6 @@ class Module : public virtual slsDetectorDefs { void setPartialFramesPadding(bool padding); - /** - * Receiver starts listening to packets - */ - void startReceiver(); - - /** - * Stops the listening mode of receiver - */ - void stopReceiver(); - - /** - * Gets the status of the listening mode of receiver - * @returns status - */ - runStatus getReceiverStatus() const; - /** * Gets the number of frames caught by receiver * @returns number of frames caught by receiver @@ -1379,8 +1360,6 @@ class Module : public virtual slsDetectorDefs { * @returns current frame index of receiver */ uint64_t getReceiverCurrentFrameIndex() const; - int getReceiverProgress() const; - void setFileWrite(bool value); bool getFileWrite(); @@ -1431,14 +1410,6 @@ class Module : public virtual slsDetectorDefs { bool getReceiverSilentMode(); void setReceiverSilentMode(bool enable); - /** - * If data streaming in receiver is enabled, - * restream the stop dummy packet from receiver - * Used usually for Moench, - * in case it is lost in network due to high data rate - */ - void restreamStopFromReceiver(); - /** * Opens pattern file and sends pattern to CTB * @param fname pattern file to open diff --git a/slsDetectorSoftware/src/Receiver.cpp b/slsDetectorSoftware/src/Receiver.cpp index 38505d3d4..a314ac5f2 100755 --- a/slsDetectorSoftware/src/Receiver.cpp +++ b/slsDetectorSoftware/src/Receiver.cpp @@ -2,6 +2,7 @@ #include "ClientSocket.h" #include "string_utils.h" #include "versionAPI.h" +#include "ToString.h" namespace sls { @@ -116,7 +117,7 @@ Receiver::Receiver(int detector_id, int module_id, int receiver_id, shm()->shmversion = RECEIVER_SHMVERSION; memset(shm()->hostname, 0, MAX_STR_LENGTH); shm()->tcpPort = DEFAULT_RX_PORTNO + receiver_id; - shm()->valid = false; + shm()-> stoppedFlag = false; shm()->zmqPort = DEFAULT_ZMQ_RX_PORTNO + receiver_id; shm()->zmqIp = IpAddr{}; @@ -174,7 +175,7 @@ int Receiver::getTCPPort() const { void Receiver::setTCPPort(const int port) { if (port >= 0 && port != shm()->tcpPort) { - if (shm()->valid) { + if (strlen(shm()->hostname) != 0) { // send to receiver to change tcpp port shm()->tcpPort = port; // for now } else { @@ -184,10 +185,8 @@ void Receiver::setTCPPort(const int port) { } void Receiver::configure() { - shm()->valid = false; LOG(logINFOBLUE) << receiverId << " configured!"; checkVersionCompatibility(); - shm()->valid = true; } void Receiver::checkVersionCompatibility() { @@ -198,4 +197,42 @@ void Receiver::checkVersionCompatibility() { sendToReceiver(F_RECEIVER_CHECK_VERSION, arg, nullptr); } +void Receiver::start() { + LOG(logDEBUG1) << "Starting Receiver"; + shm()->stoppedFlag = false; + sendToReceiver(F_START_RECEIVER, nullptr, nullptr); +} + +void Receiver::stop() { + LOG(logDEBUG1) << "Stopping Receiver"; + int arg = static_cast(shm()->stoppedFlag); + sendToReceiver(F_STOP_RECEIVER, arg, nullptr); +} + +slsDetectorDefs::runStatus Receiver::getStatus() const { + runStatus retval = ERROR; + LOG(logDEBUG1) << "Getting Receiver Status"; + sendToReceiver(F_GET_RECEIVER_STATUS, nullptr, retval); + LOG(logDEBUG1) << "Receiver Status: " << ToString(retval); + return retval; +} + + +int Receiver::getProgress() const { + int retval = -1; + sendToReceiver(F_GET_RECEIVER_PROGRESS, nullptr, retval); + LOG(logDEBUG1) << "Current Progress of Receiver: " << retval; + return retval; +} + +void Receiver::setStoppedFlag() { + shm()->stoppedFlag = true; +} + +void Receiver::restreamStop() { + LOG(logDEBUG1) << "Restream stop dummy from Receiver via zmq"; + sendToReceiver(F_RESTREAM_STOP_FROM_RECEIVER, nullptr, nullptr); +} + + } // namespace sls \ No newline at end of file diff --git a/slsDetectorSoftware/src/Receiver.h b/slsDetectorSoftware/src/Receiver.h index b98617bc4..ec853850c 100755 --- a/slsDetectorSoftware/src/Receiver.h +++ b/slsDetectorSoftware/src/Receiver.h @@ -4,20 +4,18 @@ #include "sls_detector_defs.h" #include "network_utils.h" -#define RECEIVER_SHMVERSION 0x200414 +#define RECEIVER_SHMVERSION 0x200417 namespace sls { struct sharedReceiver { /* FIXED PATTERN FOR STATIC FUNCTIONS. DO NOT CHANGE, ONLY APPEND ------*/ - int shmversion; char hostname[MAX_STR_LENGTH]; int tcpPort; - bool valid; - /** END OF FIXED PATTERN -----------------------------------------------*/ + int stoppedFlag; int zmqPort; sls::IpAddr zmqIp; @@ -49,55 +47,61 @@ namespace sls { void setHostname(const std::string &hostname); int getTCPPort() const; void setTCPPort(const int port); - void configure(); - void checkVersionCompatibility(); + + void start(); + void stop(); + slsDetectorDefs::runStatus getStatus() const; + int getProgress() const; + void setStoppedFlag(); + void restreamStop(); private: - /** - * Send function parameters to receiver - * @param fnum function enum - * @param args argument pointer - * @param args_size size of argument - * @param retval return pointers - * @param retval_size size of return value - */ - void sendToReceiver(int fnum, const void *args, size_t args_size, - void *retval, size_t retval_size); + /** + * Send function parameters to receiver + * @param fnum function enum + * @param args argument pointer + * @param args_size size of argument + * @param retval return pointers + * @param retval_size size of return value + */ + void sendToReceiver(int fnum, const void *args, size_t args_size, + void *retval, size_t retval_size); - void sendToReceiver(int fnum, const void *args, size_t args_size, - void *retval, size_t retval_size) const; + void sendToReceiver(int fnum, const void *args, size_t args_size, + void *retval, size_t retval_size) const; - template - void sendToReceiver(int fnum, const Arg &args, Ret &retval); + template + void sendToReceiver(int fnum, const Arg &args, Ret &retval); - template - void sendToReceiver(int fnum, const Arg &args, Ret &retval) const; + template + void sendToReceiver(int fnum, const Arg &args, Ret &retval) const; - template - void sendToReceiver(int fnum, const Arg &args, std::nullptr_t); + template + void sendToReceiver(int fnum, const Arg &args, std::nullptr_t); - template - void sendToReceiver(int fnum, const Arg &args, std::nullptr_t) const; + template + void sendToReceiver(int fnum, const Arg &args, std::nullptr_t) const; - template - void sendToReceiver(int fnum, std::nullptr_t, Ret &retval); + template + void sendToReceiver(int fnum, std::nullptr_t, Ret &retval); - template - void sendToReceiver(int fnum, std::nullptr_t, Ret &retval) const; + template + void sendToReceiver(int fnum, std::nullptr_t, Ret &retval) const; - template - Ret sendToReceiver(int fnum); + template + Ret sendToReceiver(int fnum); - template - Ret sendToReceiver(int fnum) const; + template + Ret sendToReceiver(int fnum) const; - template - Ret sendToReceiver(int fnum, const Arg &args); - - template - Ret sendToReceiver(int fnum, const Arg &args) const; + template + Ret sendToReceiver(int fnum, const Arg &args); + template + Ret sendToReceiver(int fnum, const Arg &args) const; + void configure(); + void checkVersionCompatibility(); const int receiverId{0}; const int moduleId{0}; mutable sls::SharedMemory shm{0, 0, 0, true};