From d536ad2b5b5171ad934060fe547460c8ff68c861 Mon Sep 17 00:00:00 2001 From: Dhanya Thattil Date: Thu, 16 Apr 2020 13:58:59 +0200 Subject: [PATCH] WIP, rxr constr done --- slsDetectorSoftware/include/Detector.h | 24 +- slsDetectorSoftware/src/CmdProxy.cpp | 140 +++++++--- slsDetectorSoftware/src/CmdProxy.h | 11 +- slsDetectorSoftware/src/Detector.cpp | 143 +++++++--- slsDetectorSoftware/src/DetectorImpl.cpp | 198 +++++++++++--- slsDetectorSoftware/src/DetectorImpl.h | 335 ++++++++++++++++++++++- slsDetectorSoftware/src/Module.cpp | 205 ++++++++------ slsDetectorSoftware/src/Module.h | 14 +- slsDetectorSoftware/src/Receiver.cpp | 83 +++--- slsDetectorSoftware/src/Receiver.h | 26 +- slsDetectorSoftware/src/SharedMemory.h | 41 ++- 11 files changed, 943 insertions(+), 277 deletions(-) diff --git a/slsDetectorSoftware/include/Detector.h b/slsDetectorSoftware/include/Detector.h index 6f3f4e658..4f7ffb03f 100644 --- a/slsDetectorSoftware/include/Detector.h +++ b/slsDetectorSoftware/include/Detector.h @@ -19,7 +19,7 @@ class IpAddr; //Free function to avoid dependence on class //and avoid the option to free another objects //shm by mistake -void freeSharedMemory(int detectorId, int detPos = -1); +void freeSharedMemory(int detectorId, int moduleId = -1); /** @@ -57,6 +57,8 @@ class Detector { /* Frees shared memory, adds detectors to the list * and updates local detector cache */ void setHostname(const std::vector &hostname); + void setHostname(const std::vector &hostname, + const std::vector &port); /** connects to n servers at local host starting at specific control port */ void setVirtualDetectorServers(int numServers, int startingPort); @@ -507,24 +509,30 @@ class Detector { Result getUseReceiverFlag(Positions pos = {}) const; Result getRxHostname(Positions pos = {}) const; + Result getRxHostname2(Positions pos = {}) const; /** * Validates and sets the receiver. - * Updates local receiver cache parameters * Configures the detector to the receiver as UDP destination - * @param receiver receiver hostname or IP address, can include tcp port eg. hostname:port */ - void setRxHostname(const std::string &receiver, Positions pos = {}); - - /** multiple rx hostnames (same as setRxHostname) */ - void setRxHostname(const std::vector &name); + void setRxHostname(const std::string &hostname, Positions pos = {}); + /** receiver hostname for udp port 2 */ + void setRxHostname2(const std::string &hostname, Positions pos = {}); + /** cannot be for multiple detectors as port is unique*/ + void setRxHostname(const std::string &hostname, const int port, + int module_id); + void setRxHostname2(const std::string &hostname, const int port, + int module_id); Result getRxPort(Positions pos = {}) const; + /** for 2nd udp port receiver */ + Result getRxPort2(Positions pos = {}) const; /** Receiver TCP port (for client communication with Receiver) * module_id is -1 for all detectors, ports for each module is calculated * (increments) */ - void setRxPort(int port, int module_id = -1); + void setRxPort(const int port, int module_id = -1); + void setRxPort2(const int port, int module_id = -1); Result getRxFifoDepth(Positions pos = {}) const; diff --git a/slsDetectorSoftware/src/CmdProxy.cpp b/slsDetectorSoftware/src/CmdProxy.cpp index 1c578757e..91dc393c3 100644 --- a/slsDetectorSoftware/src/CmdProxy.cpp +++ b/slsDetectorSoftware/src/CmdProxy.cpp @@ -115,13 +115,27 @@ std::string CmdProxy::ListCommands(int action) { } /* configuration */ +std::pair + CmdProxy::parseHostnameAndPort(std::string name) { + std::string host = name; + std::string hostname; + int port = 0; + auto res = sls::split(host, ':'); + if (res.size() > 1) { + hostname = res[0]; + port = StringTo(res[1]); + } + return std::make_pair(host, port); +} + std::string CmdProxy::Hostname(int action) { std::ostringstream os; os << cmd << ' '; if (action == defs::HELP_ACTION) { os << "\n\tFrees shared memory and sets hostname (or IP address) of " - "all modules concatenated by +." + "all modules concatenated by +.\n\t" + "[hostname or ip address]:[tcp port] Use this for virtual servers\n\t" << '\n'; } else if (action == defs::GET_ACTION) { if (!args.empty()) { @@ -136,17 +150,35 @@ std::string CmdProxy::Hostname(int action) { if (det_id != -1) { throw sls::RuntimeError("Cannot execute this at module level"); } + std::vector arguments; // only args[0], but many hostames concatenated with + if (args[0].find('+') != std::string::npos) { - auto t = sls::split(args[0], '+'); - det->setHostname(t); - os << ToString(t) << '\n'; + if (args.size() > 1) { + throw sls::RuntimeError("Cannot have concatenated hostnames and" + "multiple arguments"); + } + arguments = sls::split(args[0], '+'); } // either hostnames separated by space, or single hostname else { - det->setHostname(args); - os << ToString(args) << '\n'; + arguments.assign(args.begin(), args.end()); } + // separate hostname and port + std::vector hostnames; + std::vector ports; + for (size_t i = 0; i < arguments.size(); ++i) { + std::pair res = parseHostnameAndPort(arguments[i]); + hostnames.push_back(res.first); + if (res.second == 0) { + ports.push_back(DEFAULT_PORTNO); + } else { + ports.push_back(res.second); + } + } + det->setHostname(hostnames, ports); + auto t = det->getHostname({det_id}); + os << OutString(t) << '\n'; + } else { throw sls::RuntimeError("Unknown action"); } @@ -905,16 +937,19 @@ std::string CmdProxy::UDPDestinationIP2(int action) { } /* Receiver Config */ -std::string CmdProxy::ReceiveHostname(int action) { +std::string CmdProxy::ReceiverHostname(int action) { std::ostringstream os; os << cmd << ' '; if (action == defs::HELP_ACTION) { os << "[hostname or ip address]\n\t" "[hostname or ip address]:[tcp port]\n\t" - "[hostname1]:[tcp_port1]+[hostname2]:[tcp_port2]+\n\t" - "Receiver hostname or IP. If port included, then the receiver tcp port.\n\t" + "Receiver hostname or IP. Port is the receiver tcp port (optional).\n\t" "Used for TCP control communication between client and receiver " - "to configure receiver. Also updates receiver with detector parameters." + "to configure receiver. Also updates receiver with detector parameters.\n\t" + "TCP port must be unique, if included.\n\t" + "If port not included and not set earlier, then it takes default port 1954" + " and calculates from there. \n\t" + "[Eiger][Jungfrau] For the 2nd udp interface, use rx_hostname2." << '\n'; } else if (action == defs::GET_ACTION) { if (!args.empty()) { @@ -923,38 +958,69 @@ std::string CmdProxy::ReceiveHostname(int action) { auto t = det->getRxHostname({det_id}); os << OutString(t) << '\n'; } else if (action == defs::PUT_ACTION) { - if (args.size() < 1) { + if (args.size() != 1) { WrongNumberOfParameters(1); } - // multiple arguments - if (args.size() > 1) { - // multiple in mulitple - if (args[0].find('+') != std::string::npos) { - throw sls::RuntimeError("Cannot add multiple receivers at module level"); + if (args[0].find('+') != std::string::npos) { + throw sls::RuntimeError("Cannot concatenate receiver hostnames"); + } + std::pair res = parseHostnameAndPort(args[0]); + std::string hostname = res.first; + int port = res.second; + if (port == 0) { + det->setRxHostname(hostname, {det_id}); + } else { + if (det_id == -1) { + throw sls::RuntimeError("Cannot set same tcp port " + "for all receiver hostnames"); } - if (det_id != -1) { - throw sls::RuntimeError("Cannot add multiple receivers at module level"); - } - det->setRxHostname(args); - os << ToString(args) << '\n'; + det->setRxHostname(hostname, port, det_id); } - // single argument - else { - // multiple receivers concatenated with + - if (args[0].find('+') != std::string::npos) { - if (det_id != -1) { - throw sls::RuntimeError("Cannot add multiple receivers at module level"); - } - auto t = sls::split(args[0], '+'); - det->setRxHostname(t); - os << ToString(t) << '\n'; - } - // single receiver - else { - det->setRxHostname(args[0], {det_id}); - os << ToString(args) << '\n'; - } + auto t = det->getRxHostname({det_id}); + os << OutString(t) << '\n'; + } else { + throw sls::RuntimeError("Unknown action"); + } + return os.str(); +} + +std::string CmdProxy::ReceiverHostname2(int action) { + std::ostringstream os; + os << cmd << ' '; + if (action == defs::HELP_ACTION) { + os << "[hostname or ip address]\n\t" + "[hostname or ip address]:[tcp port]\n\t" + "[Eiger][Jungfrau] Receiver hostname or IP for the second udp port. " + "Port is the receiver tcp port (optional).\n\t" + "Refer rx_hostname help for details" + << '\n'; + } else if (action == defs::GET_ACTION) { + if (!args.empty()) { + WrongNumberOfParameters(0); } + auto t = det->getRxHostname2({det_id}); + os << OutString(t) << '\n'; + } else if (action == defs::PUT_ACTION) { + if (args.size() != 1) { + WrongNumberOfParameters(1); + } + if (args[0].find('+') != std::string::npos) { + throw sls::RuntimeError("Cannot concatenate receiver hostnames"); + } + std::pair res = parseHostnameAndPort(args[0]); + std::string hostname = res.first; + int port = res.second; + if (port == 0) { + det->setRxHostname2(hostname, {det_id}); + } else { + if (det_id == -1) { + throw sls::RuntimeError("Cannot set same tcp port " + "for all receiver hostnames"); + } + det->setRxHostname2(hostname, port, det_id); + } + auto t = det->getRxHostname2({det_id}); + os << OutString(t) << '\n'; } else { throw sls::RuntimeError("Unknown action"); } diff --git a/slsDetectorSoftware/src/CmdProxy.h b/slsDetectorSoftware/src/CmdProxy.h index bc48de4ba..2d4490ee5 100644 --- a/slsDetectorSoftware/src/CmdProxy.h +++ b/slsDetectorSoftware/src/CmdProxy.h @@ -719,8 +719,10 @@ class CmdProxy { {"txndelay_right", &CmdProxy::txndelay_right}, /* Receiver Config */ - {"rx_hostname", &CmdProxy::ReceiveHostname}, + {"rx_hostname", &CmdProxy::ReceiverHostname}, + {"rx_hostname2", &CmdProxy::ReceiverHostname2}, {"rx_tcpport", &CmdProxy::rx_tcpport}, + {"rx_tcpport2", &CmdProxy::rx_tcpport2}, {"rx_fifodepth", &CmdProxy::rx_fifodepth}, {"rx_silent", &CmdProxy::rx_silent}, {"rx_discardpolicy", &CmdProxy::rx_discardpolicy}, @@ -911,6 +913,7 @@ class CmdProxy { /* configuration */ std::string free(int action); // std::string config2(int action); + std::pair parseHostnameAndPort(std::string name); std::string Hostname(int action); std::string VirtualServer(int action); std::string FirmwareVersion(int action); @@ -940,7 +943,8 @@ class CmdProxy { std::string UDPDestinationIP(int action); std::string UDPDestinationIP2(int action); /* Receiver Config */ - std::string ReceiveHostname(int action); + std::string ReceiverHostname(int action); + std::string ReceiverHostname2(int action); /* File */ /* ZMQ Streaming Parameters (Receiver<->Client) */ /* Eiger Specific */ @@ -1433,6 +1437,9 @@ class CmdProxy { INTEGER_COMMAND(rx_tcpport, getRxPort, setRxPort, StringTo, "[port]\n\tTCP port for client-receiver communication. Default is 1954. Must be different if multiple receivers on same pc. Must be first command to set a receiver parameter. Multi command will automatically increment for individual modules."); + + INTEGER_COMMAND(rx_tcpport2, getRxPort2, setRxPort2, StringTo, + "[port]\n\t[Eiger][Jungfrau] TCP port for client-receiver communication for 2nd udp port. For details, refer rx_tcpport."); INTEGER_COMMAND(rx_fifodepth, getRxFifoDepth, setRxFifoDepth, StringTo, "[n_frames]\n\tSet the number of frames in the receiver fifo (buffer between listener and writer threads)."); diff --git a/slsDetectorSoftware/src/Detector.cpp b/slsDetectorSoftware/src/Detector.cpp index a31064f6f..3b42ecb7f 100644 --- a/slsDetectorSoftware/src/Detector.cpp +++ b/slsDetectorSoftware/src/Detector.cpp @@ -6,6 +6,7 @@ #include "logger.h" #include "DetectorImpl.h" #include "Module.h" +#include "Receiver.h" #include "sls_detector_defs.h" #include "versionAPI.h" @@ -13,13 +14,25 @@ namespace sls { -void freeSharedMemory(int detectorId, int detPos) { +void freeSharedMemory(int detectorId, int moduleId) { // single - if (detPos >= 0) { - SharedMemory moduleShm(detectorId, detPos); + if (moduleId >= 0) { + SharedMemory moduleShm(detectorId, moduleId); + int numReceivers = 0, numReceivers2 = 0; if (moduleShm.IsExisting()) { + moduleShm.OpenSharedMemory(); + if (Module::hasSharedMemoryReceiverList(moduleShm()->shmversion)) { + numReceivers = moduleShm()->numberOfReceivers; + numReceivers2 = moduleShm()->numberOfReceivers2; + } moduleShm.RemoveSharedMemory(); } + for (int iReceiver = 0; iReceiver < numReceivers + numReceivers2; ++iReceiver) { + SharedMemory receiverShm(detectorId, moduleId, iReceiver); + if (receiverShm.IsExisting()) { + receiverShm.RemoveSharedMemory(); + } + } return; } @@ -29,13 +42,27 @@ void freeSharedMemory(int detectorId, int detPos) { if (detectorShm.IsExisting()) { detectorShm.OpenSharedMemory(); - numDetectors = detectorShm()->numberOfDetectors; + numDetectors = detectorShm()->numberOfModules; detectorShm.RemoveSharedMemory(); } - for (int i = 0; i < numDetectors; ++i) { - SharedMemory moduleShm(detectorId, i); - moduleShm.RemoveSharedMemory(); + for (int iModule = 0; iModule < numDetectors; ++iModule) { + SharedMemory moduleShm(detectorId, iModule); + int numReceivers = 0, numReceivers2 = 0; + if (moduleShm.IsExisting()) { + moduleShm.OpenSharedMemory(); + if (Module::hasSharedMemoryReceiverList(moduleShm()->shmversion)) { + numReceivers = moduleShm()->numberOfReceivers; + numReceivers2 = moduleShm()->numberOfReceivers2; + } + moduleShm.RemoveSharedMemory(); + } + for (int iReceiver = 0; iReceiver < numReceivers + numReceivers2; ++iReceiver) { + SharedMemory receiverShm(detectorId, iModule, iReceiver); + if (receiverShm.IsExisting()) { + receiverShm.RemoveSharedMemory(); + } + } } } @@ -91,6 +118,11 @@ void Detector::setHostname(const std::vector &hostname) { pimpl->setHostname(hostname); } +void Detector::setHostname(const std::vector &hostname, + const std::vector &port) { + pimpl->setHostname(hostname, port); +} + void Detector::setVirtualDetectorServers(int numServers, int startingPort) { pimpl->setVirtualDetectorServers(numServers, startingPort); } @@ -664,46 +696,80 @@ Result Detector::getUseReceiverFlag(Positions pos) const { } Result Detector::getRxHostname(Positions pos) const { - return pimpl->Parallel(&Module::getReceiverHostname, pos); + return pimpl->Parallel1(&Receiver::getHostname, pos, {0}); } -void Detector::setRxHostname(const std::string &receiver, Positions pos) { - pimpl->Parallel(&Module::setReceiverHostname, pos, receiver); +Result Detector::getRxHostname2(Positions pos) const { + return pimpl->Parallel2(&Receiver::getHostname, pos, {0}); } -void Detector::setRxHostname(const std::vector &name) { - // set all to same rx_hostname - if (name.size() == 1) { - pimpl->Parallel(&Module::setReceiverHostname, {}, name[0]); - } else { - if ((int)name.size() != size()) { - throw RuntimeError("Receiver hostnames size " + - std::to_string(name.size()) + " does not match detector size " + - std::to_string(size())); - } - // set each rx_hostname - for (int idet = 0; idet < size(); ++idet) { - pimpl->Parallel(&Module::setReceiverHostname, {idet}, name[idet]); - } +void Detector::setRxHostname(const std::string &hostname, Positions pos) { + if (!pimpl->isReceiverInitialized()) { + pimpl->initReceiver(); } + pimpl->Parallel1(&Receiver::setHostname, pos, {0}, hostname); +} + +void Detector::setRxHostname2(const std::string &hostname, Positions pos) { + if (!pimpl->isReceiver2Initialized()) { + pimpl->initReceiver2(); + } + pimpl->Parallel2(&Receiver::setHostname, pos, {0}, hostname); +} + +void Detector::setRxHostname(const std::string &hostname, const int port, + int module_id) { + if (!pimpl->isReceiverInitialized()) { + pimpl->initReceiver(); + } + pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {0}, port); + pimpl->Parallel1(&Receiver::setHostname, {module_id}, {0}, hostname); +} + +void Detector::setRxHostname2(const std::string &hostname, const int port, + int module_id) { + if (!pimpl->isReceiver2Initialized()) { + pimpl->initReceiver2(); + } + pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {0}, port); + pimpl->Parallel2(&Receiver::setHostname, {module_id}, {0}, hostname); } Result Detector::getRxPort(Positions pos) const { - return pimpl->Parallel(&Module::getReceiverPort, pos); + return pimpl->Parallel1(&Receiver::getTCPPort, pos, {0}); +} + +Result Detector::getRxPort2(Positions pos) const { + return pimpl->Parallel2(&Receiver::getTCPPort, pos, {0}); } void Detector::setRxPort(int port, int module_id) { + if (!pimpl->isReceiverInitialized()) { + pimpl->initReceiver(); + } if (module_id == -1) { - std::vector port_list(size()); - for (auto &it : port_list) { - it = port++; - } + std::vector port_list = getPortNumbers(port); for (int idet = 0; idet < size(); ++idet) { - pimpl->Parallel(&Module::setReceiverPort, {idet}, + pimpl->Parallel1(&Receiver::setTCPPort, {idet}, {0}, port_list[idet]); } } else { - pimpl->Parallel(&Module::setReceiverPort, {module_id}, port); + pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {0}, port); + } +} + +void Detector::setRxPort2(int port, int module_id) { + if (!pimpl->isReceiver2Initialized()) { + pimpl->initReceiver2(); + } + if (module_id == -1) { + std::vector port_list = getPortNumbers(port); + for (int idet = 0; idet < size(); ++idet) { + pimpl->Parallel2(&Receiver::setTCPPort, {idet}, {0}, + port_list[idet]); + } + } else { + pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {0}, port); } } @@ -1822,23 +1888,10 @@ Result Detector::getRxCurrentFrameIndex(Positions pos) const { } std::vector Detector::getPortNumbers(int start_port) { - int num_sockets_per_detector = 1; - switch (getDetectorType().squash()) { - case defs::EIGER: - num_sockets_per_detector *= 2; - break; - case defs::JUNGFRAU: - if (getNumberofUDPInterfaces().squash() == 2) { - num_sockets_per_detector *= 2; - } - break; - default: - break; - } std::vector res; res.reserve(size()); for (int idet = 0; idet < size(); ++idet) { - res.push_back(start_port + (idet * num_sockets_per_detector)); + res.push_back(start_port + idet); } return res; } diff --git a/slsDetectorSoftware/src/DetectorImpl.cpp b/slsDetectorSoftware/src/DetectorImpl.cpp index c06b46174..96ac19723 100755 --- a/slsDetectorSoftware/src/DetectorImpl.cpp +++ b/slsDetectorSoftware/src/DetectorImpl.cpp @@ -5,6 +5,7 @@ #include "file_utils.h" #include "logger.h" #include "Module.h" +#include "Receiver.h" #include "sls_detector_exceptions.h" #include "versionAPI.h" @@ -49,12 +50,24 @@ void DetectorImpl::setAcquiringFlag(bool flag) { int DetectorImpl::getDetectorId() const { return detectorId; } -void DetectorImpl::freeSharedMemory(int detectorId, int detPos) { +void DetectorImpl::freeSharedMemory(int detectorId, int moduleId) { // single - if (detPos >= 0) { - SharedMemory module_shm(detectorId, detPos); - if (module_shm.IsExisting()) { - module_shm.RemoveSharedMemory(); + if (moduleId >= 0) { + SharedMemory moduleShm(detectorId, moduleId); + int numReceivers = 0, numReceivers2 = 0; + if (moduleShm.IsExisting()) { + moduleShm.OpenSharedMemory(); + if (Module::hasSharedMemoryReceiverList(moduleShm()->shmversion)) { + numReceivers = moduleShm()->numberOfReceivers; + numReceivers2 = moduleShm()->numberOfReceivers2; + } + moduleShm.RemoveSharedMemory(); + } + for (int iReceiver = 0; iReceiver < numReceivers + numReceivers2; ++iReceiver) { + SharedMemory receiverShm(detectorId, moduleId, iReceiver); + if (receiverShm.IsExisting()) { + receiverShm.RemoveSharedMemory(); + } } return; } @@ -65,13 +78,27 @@ void DetectorImpl::freeSharedMemory(int detectorId, int detPos) { if (detectorShm.IsExisting()) { detectorShm.OpenSharedMemory(); - numDetectors = detectorShm()->numberOfDetectors; + numDetectors = detectorShm()->numberOfModules; detectorShm.RemoveSharedMemory(); } - for (int i = 0; i < numDetectors; ++i) { - SharedMemory module_shm(detectorId, i); - module_shm.RemoveSharedMemory(); + for (int iModule = 0; iModule < numDetectors; ++iModule) { + SharedMemory moduleShm(detectorId, iModule); + int numReceivers = 0, numReceivers2 = 0; + if (moduleShm.IsExisting()) { + moduleShm.OpenSharedMemory(); + if (Module::hasSharedMemoryReceiverList(moduleShm()->shmversion)) { + numReceivers = moduleShm()->numberOfReceivers; + numReceivers2 = moduleShm()->numberOfReceivers2; + } + moduleShm.RemoveSharedMemory(); + } + for (int iReceiver = 0; iReceiver < numReceivers + numReceivers2; ++iReceiver) { + SharedMemory receiverShm(detectorId, iModule, iReceiver); + if (receiverShm.IsExisting()) { + receiverShm.RemoveSharedMemory(); + } + } } } @@ -81,7 +108,18 @@ void DetectorImpl::freeSharedMemory() { d->freeSharedMemory(); } detectors.clear(); - + for (auto &dr : receivers) { + for (auto &r : dr) { + r->freeSharedMemory(); + } + } + receivers.clear(); + for (auto &dr : receivers2) { + for (auto &r : dr) { + r->freeSharedMemory(); + } + } + receivers2.clear(); // clear multi detector shm detector_shm.RemoveSharedMemory(); client_downstream = false; @@ -149,7 +187,7 @@ void DetectorImpl::initSharedMemory(bool verify) { void DetectorImpl::initializeDetectorStructure() { detector_shm()->shmversion = DETECTOR_SHMVERSION; - detector_shm()->numberOfDetectors = 0; + detector_shm()->numberOfModules = 0; detector_shm()->multiDetectorType = GENERIC; detector_shm()->numberOfDetector.x = 0; detector_shm()->numberOfDetector.y = 0; @@ -163,16 +201,37 @@ void DetectorImpl::initializeDetectorStructure() { void DetectorImpl::initializeMembers(bool verify) { // DetectorImpl zmqSocket.clear(); + int numModules = detector_shm()->numberOfModules; // get objects from single det shared memory (open) - for (int i = 0; i < detector_shm()->numberOfDetectors; i++) { - try { + try { + for (int iModule = 0; iModule < numModules; ++iModule) { detectors.push_back( - sls::make_unique(detectorId, i, verify)); - } catch (...) { - detectors.clear(); - throw; + sls::make_unique(detectorId, iModule, verify)); + int numReceivers = detectors[iModule]->getNumberOfReceivers(); + if (numReceivers != 0) { + receivers.resize(numModules); + for (int iReceiver = 0; iReceiver < numReceivers; ++iReceiver) { + receivers[iModule].push_back( + sls::make_unique(detectorId, iModule, iReceiver, + true, verify)); + } + } + int numReceivers2 = detectors[iModule]->getNumberOfReceivers2(); + if (numReceivers2 != 0) { + receivers2.resize(numModules); + for (int iReceiver = 0; iReceiver < numReceivers2; ++iReceiver) { + receivers2[iModule].push_back( + sls::make_unique(detectorId, iModule, iReceiver, + false, verify)); + } + } } + } catch (...) { + detectors.clear(); + receivers.clear(); + receivers2.clear(); + throw; } } @@ -222,17 +281,18 @@ std::string DetectorImpl::exec(const char *cmd) { void DetectorImpl::setVirtualDetectorServers(const int numdet, const int port) { std::vector hostnames; + std::vector ports; for (int i = 0; i < numdet; ++i) { + hostnames.push_back(std::string("localhost")); // * 2 is for control and stop port - hostnames.push_back(std::string("localhost:") + - std::to_string(port + i * 2)); + ports.push_back(port + i * 2); } - setHostname(hostnames); + setHostname(hostnames, ports); } void DetectorImpl::setHostname(const std::vector &name) { // this check is there only to allow the previous detsizechan command - if (detector_shm()->numberOfDetectors != 0) { + if (detector_shm()->numberOfModules != 0) { LOG(logWARNING) << "There are already detector(s) in shared memory." "Freeing Shared memory now."; @@ -242,27 +302,41 @@ void DetectorImpl::setHostname(const std::vector &name) { detector_shm()->initialChecks = initialChecks; } for (const auto &hostname : name) { - addSlsDetector(hostname); + addModule(hostname, DEFAULT_PORTNO); } updateDetectorSize(); } -void DetectorImpl::addSlsDetector(const std::string &hostname) { +void DetectorImpl::setHostname(const std::vector &name, + const std::vector &port) { + if (name.size() != port.size()) { + throw RuntimeError("hostname vector size and port vector size do not match"); + } + // this check is there only to allow the previous detsizechan command + if (detector_shm()->numberOfModules != 0) { + LOG(logWARNING) + << "There are already detector(s) in shared memory." + "Freeing Shared memory now."; + bool initialChecks = detector_shm()->initialChecks; + freeSharedMemory(); + setupDetector(); + detector_shm()->initialChecks = initialChecks; + } + for (size_t i = 0; i < name.size(); ++i) { + addModule(name[i], port[i]); + } + updateDetectorSize(); +} + +void DetectorImpl::addModule(const std::string &hostname, + const int port) { LOG(logINFO) << "Adding detector " << hostname; - int port = DEFAULT_PORTNO; - std::string host = hostname; - auto res = sls::split(hostname, ':'); - if (res.size() > 1) { - host = res[0]; - port = StringTo(res[1]); - } - - if (host != "localhost") { + if (hostname != "localhost") { for (auto &d : detectors) { - if (d->getHostname() == host) { + if (d->getHostname() == hostname) { LOG(logWARNING) - << "Detector " << host + << "Detector " << hostname << "already part of the Detector!" << std::endl << "Remove it before adding it back in a new position!"; return; @@ -271,14 +345,14 @@ void DetectorImpl::addSlsDetector(const std::string &hostname) { } // get type by connecting - detectorType type = Module::getTypeFromDetector(host, port); + detectorType type = Module::getTypeFromDetector(hostname, port); auto pos = detectors.size(); detectors.emplace_back( sls::make_unique(type, detectorId, pos, false)); - detector_shm()->numberOfDetectors = detectors.size(); + detector_shm()->numberOfModules = detectors.size(); detectors[pos]->setControlPort(port); detectors[pos]->setStopPort(port + 1); - detectors[pos]->setHostname(host, detector_shm()->initialChecks); + detectors[pos]->setHostname(hostname, detector_shm()->initialChecks); // detector type updated by now detector_shm()->multiDetectorType = Parallel(&Module::getDetectorType, {}) @@ -287,6 +361,54 @@ void DetectorImpl::addSlsDetector(const std::string &hostname) { detectors[pos]->updateNumberOfChannels(); } +void DetectorImpl::initReceiver() { + if (receivers.size() != 0) { + throw RuntimeError("receiver vector already initialized"); + } + int tcpPort = DEFAULT_RX_PORTNO; + int zmqPort = DEFAULT_ZMQ_CL_PORTNO; + try { + for (int iModule = 0; iModule < size(); ++iModule) { + receivers.resize(detectors.size()); + receivers[iModule].push_back( + sls::make_unique(detectorId, iModule, 0, + true, tcpPort++, "", zmqPort++)); + detectors[iModule]->setNumberOfReceivers(1); + } + } catch (...) { + receivers.clear(); + throw; + } +} + +bool DetectorImpl::isReceiverInitialized() { + return (receivers.size() > 0); +} + +void DetectorImpl::initReceiver2() { + if (receivers2.size() != 0) { + throw RuntimeError("receiver2 vector already initialized"); + } + int tcpPort = DEFAULT_RX_PORTNO + size(); + int zmqPort = DEFAULT_ZMQ_CL_PORTNO + size(); + try { + for (int iModule = 0; iModule < size(); ++iModule) { + receivers2.resize(detectors.size()); + receivers2[iModule].push_back( + sls::make_unique(detectorId, iModule, 0, + false, tcpPort++, "", zmqPort++)); + detectors[iModule]->setNumberOfReceivers2(1); + } + } catch (...) { + receivers.clear(); + throw; + } +} + +bool DetectorImpl::isReceiver2Initialized() { + return (receivers2.size() > 0); +} + void DetectorImpl::updateDetectorSize() { LOG(logDEBUG) << "Updating Detector Size: " << size(); @@ -429,7 +551,7 @@ void DetectorImpl::readFrameFromReceiver() { int nDetPixelsY = 0; bool quadEnable = false; bool eiger = false; - bool numInterfaces = + int numInterfaces = Parallel(&Module::getNumberofUDPInterfacesFromShm, {}) .squash(); // cannot pick up from zmq diff --git a/slsDetectorSoftware/src/DetectorImpl.h b/slsDetectorSoftware/src/DetectorImpl.h index 583cd3a8f..531ccbd98 100755 --- a/slsDetectorSoftware/src/DetectorImpl.h +++ b/slsDetectorSoftware/src/DetectorImpl.h @@ -25,6 +25,7 @@ class detectorData; namespace sls{ class Module; +class Receiver; /** * @short structure allocated in shared memory to store detector settings @@ -47,7 +48,7 @@ struct sharedDetector { /** last time stamp when accessing the shared memory */ char lastDate[SHORT_STRING_LENGTH]; - int numberOfDetectors; + int numberOfModules; slsDetectorDefs::detectorType multiDetectorType; /** END OF FIXED PATTERN @@ -189,6 +190,320 @@ class DetectorImpl : public virtual slsDetectorDefs { } + + + + + + + template + sls::Result Parallel1(RT (sls::Receiver::*somefunc)(CT...), + std::vector dPositions, + std::vector rxPositions, + typename NonDeduced::type... Args) { + + if (receivers.size() == 0) + throw sls::RuntimeError("No receivers added"); + if (dPositions.empty() || + (dPositions.size() == 1 && dPositions[0] == -1)) { + dPositions.resize(receivers.size()); + std::iota(begin(dPositions), end(dPositions), 0); + } + std::vector> futures; + futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping + for (size_t i : dPositions) { + if (i >= receivers.size()) + throw sls::RuntimeError("Detector out of range"); + // each entry + std::vector rxPos(rxPositions); + if (rxPositions.empty() || + (rxPositions.size() == 1 && rxPositions[0] == -1)) { + rxPos.resize(receivers[i].size()); + std::iota(begin(rxPos), end(rxPos), 0); + } + for (size_t j : rxPos) { + futures.push_back(std::async(std::launch::async, somefunc, + receivers[i][j].get(), Args...)); + } + } + sls::Result result; + result.reserve(dPositions.size());// cannot know rxPositions.size() without looping + for (auto &i : futures) { + result.push_back(i.get()); + } + return result; + } + + template + sls::Result Parallel1(RT (sls::Receiver::*somefunc)(CT...) const, + std::vector dPositions, + std::vector rxPositions, + typename NonDeduced::type... Args) const { + + if (receivers.size() == 0) + throw sls::RuntimeError("No receivers added"); + if (dPositions.empty() || + (dPositions.size() == 1 && dPositions[0] == -1)) { + dPositions.resize(receivers.size()); + std::iota(begin(dPositions), end(dPositions), 0); + } + std::vector> futures; + futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping + for (size_t i : dPositions) { + if (i >= receivers.size()) + throw sls::RuntimeError("Detector out of range"); + // each entry + std::vector rxPos(rxPositions); + if (rxPositions.empty() || + (rxPositions.size() == 1 && rxPositions[0] == -1)) { + rxPos.resize(receivers[i].size()); + std::iota(begin(rxPos), end(rxPos), 0); + } + for (size_t j : rxPos) { + futures.push_back(std::async(std::launch::async, somefunc, + receivers[i][j].get(), Args...)); + } + } + sls::Result result; + result.reserve(dPositions.size());// cannot know rxPositions.size() without looping + for (auto &i : futures) { + result.push_back(i.get()); + } + return result; + } + + template + void Parallel1(void (sls::Receiver::*somefunc)(CT...), + std::vector dPositions, + std::vector rxPositions, + typename NonDeduced::type... Args) { + + if (receivers.size() == 0) + throw sls::RuntimeError("No receivers added"); + if (dPositions.empty() || + (dPositions.size() == 1 && dPositions[0] == -1)) { + dPositions.resize(receivers.size()); + std::iota(begin(dPositions), end(dPositions), 0); + } + std::vector> futures; + futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping + for (size_t i : dPositions) { + if (i >= receivers.size()) + throw sls::RuntimeError("Detector out of range"); + // each entry + std::vector rxPos(rxPositions); + if (rxPositions.empty() || + (rxPositions.size() == 1 && rxPositions[0] == -1)) { + rxPos.resize(receivers[i].size()); + std::iota(begin(rxPos), end(rxPos), 0); + } + for (size_t j : rxPos) { + futures.push_back(std::async(std::launch::async, somefunc, + receivers[i][j].get(), Args...)); + } + } + for (auto &i : futures) { + i.get(); + } + } + + template + void Parallel1(void (sls::Receiver::*somefunc)(CT...) const, + std::vector dPositions, + std::vector rxPositions, + typename NonDeduced::type... Args) const { + + if (receivers.size() == 0) + throw sls::RuntimeError("No receivers added"); + if (dPositions.empty() || + (dPositions.size() == 1 && dPositions[0] == -1)) { + dPositions.resize(receivers.size()); + std::iota(begin(dPositions), end(dPositions), 0); + } + std::vector> futures; + futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping + for (size_t i : dPositions) { + if (i >= receivers.size()) + throw sls::RuntimeError("Detector out of range"); + // each entry + std::vector rxPos(rxPositions); + if (rxPositions.empty() || + (rxPositions.size() == 1 && rxPositions[0] == -1)) { + rxPos.resize(receivers[i].size()); + std::iota(begin(rxPos), end(rxPos), 0); + } + for (size_t j : rxPos) { + futures.push_back(std::async(std::launch::async, somefunc, + receivers[i][j].get(), Args...)); + } + } + for (auto &i : futures) { + i.get(); + } + } + + + + + + + + + + + + + + template + sls::Result Parallel2(RT (sls::Receiver::*somefunc)(CT...), + std::vector dPositions, + std::vector rxPositions, + typename NonDeduced::type... Args) { + + if (receivers2.size() == 0) + throw sls::RuntimeError("No receivers2 added"); + if (dPositions.empty() || + (dPositions.size() == 1 && dPositions[0] == -1)) { + dPositions.resize(receivers2.size()); + std::iota(begin(dPositions), end(dPositions), 0); + } + std::vector> futures; + futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping + for (size_t i : dPositions) { + if (i >= receivers2.size()) + throw sls::RuntimeError("Detector out of range"); + // each entry + std::vector rxPos(rxPositions); + if (rxPositions.empty() || + (rxPositions.size() == 1 && rxPositions[0] == -1)) { + rxPos.resize(receivers2[i].size()); + std::iota(begin(rxPos), end(rxPos), 0); + } + for (size_t j : rxPos) { + futures.push_back(std::async(std::launch::async, somefunc, + receivers2[i][j].get(), Args...)); + } + } + sls::Result result; + result.reserve(dPositions.size());// cannot know rxPositions.size() without looping + for (auto &i : futures) { + result.push_back(i.get()); + } + return result; + } + + template + sls::Result Parallel2(RT (sls::Receiver::*somefunc)(CT...) const, + std::vector dPositions, + std::vector rxPositions, + typename NonDeduced::type... Args) const { + + if (receivers2.size() == 0) + throw sls::RuntimeError("No receivers2 added"); + if (dPositions.empty() || + (dPositions.size() == 1 && dPositions[0] == -1)) { + dPositions.resize(receivers2.size()); + std::iota(begin(dPositions), end(dPositions), 0); + } + std::vector> futures; + futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping + for (size_t i : dPositions) { + if (i >= receivers2.size()) + throw sls::RuntimeError("Detector out of range"); + // each entry + std::vector rxPos(rxPositions); + if (rxPositions.empty() || + (rxPositions.size() == 1 && rxPositions[0] == -1)) { + rxPos.resize(receivers2[i].size()); + std::iota(begin(rxPos), end(rxPos), 0); + } + for (size_t j : rxPos) { + futures.push_back(std::async(std::launch::async, somefunc, + receivers2[i][j].get(), Args...)); + } + } + sls::Result result; + result.reserve(dPositions.size());// cannot know rxPositions.size() without looping + for (auto &i : futures) { + result.push_back(i.get()); + } + return result; + } + + template + void Parallel2(void (sls::Receiver::*somefunc)(CT...), + std::vector dPositions, + std::vector rxPositions, + typename NonDeduced::type... Args) { + + if (receivers2.size() == 0) + throw sls::RuntimeError("No receivers2 added"); + if (dPositions.empty() || + (dPositions.size() == 1 && dPositions[0] == -1)) { + dPositions.resize(receivers2.size()); + std::iota(begin(dPositions), end(dPositions), 0); + } + std::vector> futures; + futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping + for (size_t i : dPositions) { + if (i >= receivers2.size()) + throw sls::RuntimeError("Detector out of range"); + // each entry + std::vector rxPos(rxPositions); + if (rxPositions.empty() || + (rxPositions.size() == 1 && rxPositions[0] == -1)) { + rxPos.resize(receivers2[i].size()); + std::iota(begin(rxPos), end(rxPos), 0); + } + for (size_t j : rxPos) { + futures.push_back(std::async(std::launch::async, somefunc, + receivers2[i][j].get(), Args...)); + } + } + for (auto &i : futures) { + i.get(); + } + } + + template + void Parallel2(void (sls::Receiver::*somefunc)(CT...) const, + std::vector dPositions, + std::vector rxPositions, + typename NonDeduced::type... Args) const { + + if (receivers2.size() == 0) + throw sls::RuntimeError("No receivers2 added"); + if (dPositions.empty() || + (dPositions.size() == 1 && dPositions[0] == -1)) { + dPositions.resize(receivers2.size()); + std::iota(begin(dPositions), end(dPositions), 0); + } + std::vector> futures; + futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping + for (size_t i : dPositions) { + if (i >= receivers2.size()) + throw sls::RuntimeError("Detector out of range"); + // each entry + std::vector rxPos(rxPositions); + if (rxPositions.empty() || + (rxPositions.size() == 1 && rxPositions[0] == -1)) { + rxPos.resize(receivers2[i].size()); + std::iota(begin(rxPos), end(rxPos), 0); + } + for (size_t j : rxPos) { + futures.push_back(std::async(std::launch::async, somefunc, + receivers2[i][j].get(), Args...)); + } + } + for (auto &i : futures) { + i.get(); + } + } + + + + + /** set acquiring flag in shared memory */ void setAcquiringFlag(bool flag); @@ -196,7 +511,7 @@ class DetectorImpl : public virtual slsDetectorDefs { int getDetectorId() const; /** Free specific shared memory from the command line without creating object */ - static void freeSharedMemory(int detectorId, int detPos = -1); + static void freeSharedMemory(int detectorId, int moduleId = -1); /** Free all modules from current multi Id shared memory and delete members */ void freeSharedMemory(); @@ -217,8 +532,14 @@ class DetectorImpl : public virtual slsDetectorDefs { */ void setVirtualDetectorServers(const int numdet, const int port); - /** Sets the hostname of all sls detectors in shared memory and updates local cache */ void setHostname(const std::vector &name); + void setHostname(const std::vector &name, + const std::vector &port); + + void initReceiver(); + bool isReceiverInitialized(); + void initReceiver2(); + bool isReceiver2Initialized(); /** Gets the total number of detectors */ int size() const; @@ -324,7 +645,7 @@ class DetectorImpl : public virtual slsDetectorDefs { /** Execute command in terminal and return result */ std::string exec(const char *cmd); - void addSlsDetector(const std::string &hostname); + void addModule(const std::string &hostname, const int port); void updateDetectorSize(); @@ -385,6 +706,12 @@ class DetectorImpl : public virtual slsDetectorDefs { /** pointers to the Module structures */ std::vector> detectors; + /** pointers to the Receiver structures, each row for a module */ + std::vector>> receivers; + /** for the second udp port [Eiger][Jungfrau] */ + std::vector>> receivers2; + + /** data streaming (down stream) enabled in client (zmq sckets created) */ bool client_downstream{false}; diff --git a/slsDetectorSoftware/src/Module.cpp b/slsDetectorSoftware/src/Module.cpp index 684bf3f0b..84576b593 100755 --- a/slsDetectorSoftware/src/Module.cpp +++ b/slsDetectorSoftware/src/Module.cpp @@ -55,6 +55,27 @@ bool Module::isFixedPatternSharedMemoryCompatible() { return (shm()->shmversion >= MODULE_SHMAPIVERSION); } +bool Module::hasSharedMemoryReceiverList(int version) { + return (version >= MODULE_SHMRXVERSION); +} + + +int Module::getNumberOfReceivers() const { + return shm()->numberOfReceivers; +} + +void Module::setNumberOfReceivers(const int num) { + shm()->numberOfReceivers = num; +} + +int Module::getNumberOfReceivers2() const { + return shm()->numberOfReceivers2; +} + +void Module::setNumberOfReceivers2(const int num) { + shm()->numberOfReceivers2 = num; +} + void Module::checkDetectorVersionCompatibility() { int fnum = F_CHECK_VERSION; int64_t arg = 0; @@ -128,7 +149,7 @@ int64_t Module::getSerialNumber() { int64_t Module::getReceiverSoftwareVersion() const { LOG(logDEBUG1) << "Getting receiver software version"; int64_t retval = -1; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_GET_RECEIVER_VERSION, nullptr, retval); } return retval; @@ -386,6 +407,8 @@ void Module::initializeDetectorStructure(detectorType type) { shm()->shmversion = MODULE_SHMVERSION; memset(shm()->hostname, 0, MAX_STR_LENGTH); shm()->myDetectorType = type; + shm()->numberOfReceivers = 0; + shm()->numberOfReceivers2 = 0; shm()->detectorSize.x = 0; shm()->detectorSize.y = 0; shm()->controlPort = DEFAULT_PORTNO; @@ -393,7 +416,7 @@ void Module::initializeDetectorStructure(detectorType type) { sls::strcpy_safe(shm()->settingsDir, getenv("HOME")); sls::strcpy_safe(shm()->rxHostname, "none"); shm()->rxTCPPort = DEFAULT_PORTNO + 2; - shm()->useReceiverFlag = false; + shm()->useReceiver = false; shm()->zmqport = DEFAULT_ZMQ_CL_PORTNO + (moduleId * ((shm()->myDetectorType == EIGER) ? 2 : 1)); shm()->zmqip = IpAddr{}; @@ -562,7 +585,7 @@ void Module::setQuad(const bool enable) { LOG(logDEBUG1) << "Setting Quad type to " << value; sendToDetector(F_SET_QUAD, value, nullptr); LOG(logDEBUG1) << "Setting Quad type to " << value << " in Receiver"; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_SET_RECEIVER_QUAD, value, nullptr); } } @@ -572,7 +595,7 @@ void Module::setReadNLines(const int value) { sendToDetector(F_SET_READ_N_LINES, value, nullptr); LOG(logDEBUG1) << "Setting read n lines to " << value << " in Receiver"; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_SET_RECEIVER_READ_N_LINES, value, nullptr); } } @@ -625,7 +648,7 @@ int Module::setStopPort(int port_number) { int Module::setReceiverPort(int port_number) { LOG(logDEBUG1) << "Setting reciever port to " << port_number; if (port_number >= 0 && port_number != shm()->rxTCPPort) { - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { int retval = -1; sendToReceiver(F_SET_RECEIVER_PORT, port_number, retval); shm()->rxTCPPort = retval; @@ -971,7 +994,7 @@ void Module::stopAcquisition() { // get status before stopping acquisition runStatus s = ERROR, r = ERROR; bool zmqstreaming = false; - if (shm()->useReceiverFlag && getReceiverStreaming()) { + if (shm()->useReceiver && getReceiverStreaming()) { zmqstreaming = true; s = getRunStatus(); r = getReceiverStatus(); @@ -1034,7 +1057,7 @@ int64_t Module::getNumberOfFrames() { void Module::setNumberOfFrames(int64_t value) { LOG(logDEBUG1) << "Setting number of frames to " << value; sendToDetector(F_SET_NUM_FRAMES, value, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { LOG(logDEBUG1) << "Sending number of frames to Receiver: " << value; sendToReceiver(F_RECEIVER_SET_NUM_FRAMES, value, nullptr); } @@ -1050,7 +1073,7 @@ int64_t Module::getNumberOfTriggers() { void Module::setNumberOfTriggers(int64_t value) { LOG(logDEBUG1) << "Setting number of triggers to " << value; sendToDetector(F_SET_NUM_TRIGGERS, value, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { LOG(logDEBUG1) << "Sending number of triggers to Receiver: " << value; sendToReceiver(F_SET_RECEIVER_NUM_TRIGGERS, value, nullptr); } @@ -1066,7 +1089,7 @@ int64_t Module::getNumberOfBursts() { void Module::setNumberOfBursts(int64_t value) { LOG(logDEBUG1) << "Setting number of bursts to " << value; sendToDetector(F_SET_NUM_BURSTS, value, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { LOG(logDEBUG1) << "Sending number of bursts to Receiver: " << value; sendToReceiver(F_SET_RECEIVER_NUM_BURSTS, value, nullptr); } @@ -1096,7 +1119,7 @@ void Module::setNumberOfAnalogSamples(int value) { sendToDetector(F_SET_NUM_ANALOG_SAMPLES, value, nullptr); // update #nchan, as it depends on #samples, adcmask updateNumberOfChannels(); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { LOG(logDEBUG1) << "Sending number of analog samples to Receiver: " << value; sendToReceiver(F_RECEIVER_SET_NUM_ANALOG_SAMPLES, value, nullptr); } @@ -1114,7 +1137,7 @@ void Module::setNumberOfDigitalSamples(int value) { sendToDetector(F_SET_NUM_DIGITAL_SAMPLES, value, nullptr); // update #nchan, as it depends on #samples, adcmask updateNumberOfChannels(); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { LOG(logDEBUG1) << "Sending number of digital samples to Receiver: " << value; sendToReceiver(F_RECEIVER_SET_NUM_DIGITAL_SAMPLES, value, nullptr); } @@ -1131,7 +1154,7 @@ void Module::setExptime(int64_t value) { } LOG(logDEBUG1) << "Setting exptime to " << value << "ns"; sendToDetector(F_SET_EXPTIME, value, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { LOG(logDEBUG1) << "Sending exptime to Receiver: " << value; sendToReceiver(F_RECEIVER_SET_EXPTIME, value, nullptr); } @@ -1147,7 +1170,7 @@ int64_t Module::getPeriod() { void Module::setPeriod(int64_t value) { LOG(logDEBUG1) << "Setting period to " << value << "ns"; sendToDetector(F_SET_PERIOD, value, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { LOG(logDEBUG1) << "Sending period to Receiver: " << value; sendToReceiver(F_RECEIVER_SET_PERIOD, value, nullptr); } @@ -1182,7 +1205,7 @@ void Module::setSubExptime(int64_t value) { } LOG(logDEBUG1) << "Setting sub exptime to " << value << "ns"; sendToDetector(F_SET_SUB_EXPTIME, value, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { LOG(logDEBUG1) << "Sending sub exptime to Receiver: " << value; sendToReceiver(F_RECEIVER_SET_SUB_EXPTIME, value, nullptr); } @@ -1198,7 +1221,7 @@ int64_t Module::getSubDeadTime() { void Module::setSubDeadTime(int64_t value) { LOG(logDEBUG1) << "Setting sub deadtime to " << value << "ns"; sendToDetector(F_SET_SUB_DEADTIME, value, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { LOG(logDEBUG1) << "Sending sub deadtime to Receiver: " << value; sendToReceiver(F_RECEIVER_SET_SUB_DEADTIME, value, nullptr); } @@ -1298,7 +1321,7 @@ void Module::setTimingMode(timingMode value) { timingMode retval = GET_TIMING_MODE; LOG(logDEBUG1) << "Setting timing mode to " << value; sendToDetector(F_SET_TIMING_MODE, static_cast(value), retval); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { LOG(logDEBUG1) << "Sending timing mode to Receiver: " << value; sendToReceiver(F_SET_RECEIVER_TIMING_MODE, value, nullptr); } @@ -1323,7 +1346,7 @@ void Module::setDynamicRange(int n) { sendToDetector(F_SET_DYNAMIC_RANGE, n, retval); LOG(logDEBUG1) << "Dynamic Range: " << retval; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { int arg = retval; retval = -1; LOG(logDEBUG1) << "Sending dynamic range to receiver: " << arg; @@ -1438,7 +1461,7 @@ void Module::setReadoutMode(const slsDetectorDefs::readoutMode mode) { if (shm()->myDetectorType == CHIPTESTBOARD) { updateNumberOfChannels(); } - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_RECEIVER_SET_READOUT_MODE, mode, nullptr); } } @@ -1495,7 +1518,7 @@ void Module::setReceiverHostname(const std::string &receiverIP) { if (receiverIP == "none") { memset(shm()->rxHostname, 0, MAX_STR_LENGTH); sls::strcpy_safe(shm()->rxHostname, "none"); - shm()->useReceiverFlag = false; + shm()->useReceiver = false; } // stop acquisition if running @@ -1512,7 +1535,7 @@ void Module::setReceiverHostname(const std::string &receiverIP) { shm()->rxTCPPort = std::stoi(res[1]); } sls::strcpy_safe(shm()->rxHostname, host.c_str()); - shm()->useReceiverFlag = true; + shm()->useReceiver = true; checkReceiverVersionCompatibility(); // populate parameters from detector @@ -1665,7 +1688,7 @@ void Module::setDestinationUDPIP(const IpAddr ip) { throw RuntimeError("Invalid destination udp ip address"); } sendToDetector(F_SET_DEST_UDP_IP, ip, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sls::MacAddr retval(0LU); sendToReceiver(F_SET_RECEIVER_UDP_IP, ip, retval); LOG(logINFO) << "Setting destination udp mac of detector " << moduleId << " to " << retval; @@ -1688,7 +1711,7 @@ void Module::setDestinationUDPIP2(const IpAddr ip) { } sendToDetector(F_SET_DEST_UDP_IP2, ip, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sls::MacAddr retval(0LU); sendToReceiver(F_SET_RECEIVER_UDP_IP2, ip, retval); LOG(logINFO) << "Setting destination udp mac2 of detector " << moduleId << " to " << retval; @@ -1741,7 +1764,7 @@ sls::MacAddr Module::getDestinationUDPMAC2() { void Module::setDestinationUDPPort(const int port) { LOG(logDEBUG1) << "Setting destination udp port to " << port; sendToDetector(F_SET_DEST_UDP_PORT, port, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_SET_RECEIVER_UDP_PORT, port, nullptr); } } @@ -1758,7 +1781,7 @@ int Module::getDestinationUDPPort() { void Module::setDestinationUDPPort2(const int port) { LOG(logDEBUG1) << "Setting destination udp port2 to " << port; sendToDetector(F_SET_DEST_UDP_PORT2, port, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_SET_RECEIVER_UDP_PORT2, port, nullptr); } } @@ -1775,7 +1798,7 @@ void Module::setNumberofUDPInterfaces(int n) { LOG(logDEBUG1) << "Setting number of udp interfaces to " << n; sendToDetector(F_SET_NUM_INTERFACES, n, nullptr); shm()->numUDPInterfaces = n; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_SET_RECEIVER_NUM_INTERFACES, n, nullptr); } } @@ -1812,14 +1835,14 @@ void Module::setClientStreamingPort(int port) { shm()->zmqport = port; } int Module::getClientStreamingPort() { return shm()->zmqport; } void Module::setReceiverStreamingPort(int port) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq port)"); } sendToReceiver(F_SET_RECEIVER_STREAMING_PORT, port, nullptr); } int Module::getReceiverStreamingPort() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to get receiver parameters (zmq port)"); } return sendToReceiver(F_GET_RECEIVER_STREAMING_PORT); @@ -1836,7 +1859,7 @@ void Module::setClientStreamingIP(const sls::IpAddr ip) { sls::IpAddr Module::getClientStreamingIP() { return shm()->zmqip; } void Module::setReceiverStreamingIP(const sls::IpAddr ip) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (streaming ip)"); } if (ip == 0) { @@ -1851,7 +1874,7 @@ void Module::setReceiverStreamingIP(const sls::IpAddr ip) { } sls::IpAddr Module::getReceiverStreamingIP() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (streaming ip)"); } return sendToReceiver(F_GET_RECEIVER_STREAMING_SRC_IP); @@ -1921,7 +1944,7 @@ void Module::setTransmissionDelayRight(int value) { void Module::setAdditionalJsonHeader(const std::map &jsonHeader) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq json header)"); } for (auto &it : jsonHeader) { @@ -1959,7 +1982,7 @@ void Module::setAdditionalJsonHeader(const std::map &j } std::map Module::getAdditionalJsonHeader() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq json header)"); } int fnum = F_GET_ADDITIONAL_JSON_HEADER; @@ -1990,7 +2013,7 @@ std::map Module::getAdditionalJsonHeader() { } void Module::setAdditionalJsonParameter(const std::string &key, const std::string &value) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq json parameter)"); } if (key.empty() || key.length() > SHORT_STR_LENGTH || @@ -2005,7 +2028,7 @@ void Module::setAdditionalJsonParameter(const std::string &key, const std::strin } std::string Module::getAdditionalJsonParameter(const std::string &key) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq json parameter)"); } char arg[SHORT_STR_LENGTH]{}; @@ -2019,7 +2042,7 @@ int64_t Module::setReceiverUDPSocketBufferSize(int64_t udpsockbufsize) { LOG(logDEBUG1) << "Sending UDP Socket Buffer size to receiver: " << udpsockbufsize; int64_t retval = -1; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_RECEIVER_UDP_SOCK_BUF_SIZE, udpsockbufsize, retval); LOG(logDEBUG1) << "Receiver UDP Socket Buffer size: " << retval; } @@ -2196,7 +2219,7 @@ void Module::setBurstMode(slsDetectorDefs::burstMode value) { int arg = static_cast(value); LOG(logDEBUG1) << "Setting burst mode to " << arg; sendToDetector(F_SET_BURST_MODE, arg, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { LOG(logDEBUG1) << "Sending burst mode to Receiver: " << value; sendToReceiver(F_SET_RECEIVER_BURST_MODE, value, nullptr); } @@ -2253,7 +2276,7 @@ void Module::setROI(slsDetectorDefs::ROI arg) { LOG(logDEBUG) << "Sending ROI to detector [" << arg.xmin << ", " << arg.xmax << "]"; sendToDetector(F_SET_ROI, args, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { LOG(logDEBUG1) << "Sending ROI to receiver"; sendToReceiver(F_RECEIVER_SET_ROI, arg, nullptr); } @@ -2283,7 +2306,7 @@ void Module::setADCEnableMask(uint32_t mask) { if (shm()->myDetectorType == MOENCH) setAdditionalJsonParameter("adcmask_1g", std::to_string(mask)); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { int fnum = F_RECEIVER_SET_ADC_MASK; int retval = -1; LOG(logDEBUG1) << "Setting ADC Enable mask to 0x" << std::hex @@ -2313,7 +2336,7 @@ void Module::setTenGigaADCEnableMask(uint32_t mask) { if (shm()->myDetectorType == MOENCH) setAdditionalJsonParameter("adcmask_10g", std::to_string(mask)); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { int fnum = F_RECEIVER_SET_ADC_MASK_10G; int retval = -1; LOG(logDEBUG1) << "Setting 10Gb ADC Enable mask to 0x" << std::hex @@ -2369,7 +2392,7 @@ int Module::setExternalSampling(int value) { int Module::getExternalSampling() { return setExternalSampling(-1); } void Module::setReceiverDbitList(const std::vector& list) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (dbit list)"); } @@ -2389,7 +2412,7 @@ void Module::setReceiverDbitList(const std::vector& list) { } std::vector Module::getReceiverDbitList() const { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (dbit list)"); } sls::FixedCapacityContainer retval; @@ -2398,14 +2421,14 @@ std::vector Module::getReceiverDbitList() const { } void Module::setReceiverDbitOffset(int value) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (dbit offset)"); } sendToReceiver(F_SET_RECEIVER_DBIT_OFFSET, value, nullptr); } int Module::getReceiverDbitOffset() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (dbit offset)"); } return sendToReceiver(F_GET_RECEIVER_DBIT_OFFSET); @@ -2424,21 +2447,21 @@ int Module::activate(int enable) { sendToDetector(F_ACTIVATE, enable, retval); sendToDetectorStop(F_ACTIVATE, enable, retval); LOG(logDEBUG1) << "Activate: " << retval; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_RECEIVER_ACTIVATE, retval, nullptr); } return retval; } bool Module::getDeactivatedRxrPaddingMode() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (deactivated padding)"); } return sendToReceiver(F_GET_RECEIVER_DEACTIVATED_PADDING); } void Module::setDeactivatedRxrPaddingMode(bool padding) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (deactivated padding)"); } int arg = static_cast(padding); @@ -2446,7 +2469,7 @@ void Module::setDeactivatedRxrPaddingMode(bool padding) { } bool Module::getFlippedDataX() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (flipped data x)"); } int retval = -1; @@ -2457,7 +2480,7 @@ bool Module::getFlippedDataX() { } void Module::setFlippedDataX(bool value) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (flipped data x)"); } int retval = -1; @@ -2808,12 +2831,12 @@ std::string Module::printReceiverConfiguration() { return os.str(); } -bool Module::getUseReceiverFlag() const { return shm()->useReceiverFlag; } +bool Module::getUseReceiverFlag() const { return shm()->useReceiver; } int Module::lockReceiver(int lock) { LOG(logDEBUG1) << "Setting receiver server lock to " << lock; int retval = -1; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_LOCK_RECEIVER, lock, retval); LOG(logDEBUG1) << "Receiver Lock: " << retval; } @@ -2823,7 +2846,7 @@ int Module::lockReceiver(int lock) { sls::IpAddr Module::getReceiverLastClientIP() const { sls::IpAddr retval; LOG(logDEBUG1) << "Getting last client ip to receiver server"; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_GET_LAST_RECEIVER_CLIENT_IP, nullptr, retval); LOG(logDEBUG1) << "Last client IP from receiver: " << retval; } @@ -2832,7 +2855,7 @@ sls::IpAddr Module::getReceiverLastClientIP() const { void Module::exitReceiver() { LOG(logDEBUG1) << "Sending exit command to receiver server"; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_EXIT_RECEIVER, nullptr, nullptr); } } @@ -2842,14 +2865,14 @@ void Module::execReceiverCommand(const std::string &cmd) { char retval[MAX_STR_LENGTH]{}; sls::strcpy_safe(arg, cmd.c_str()); LOG(logDEBUG1) << "Sending command to receiver: " << arg; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_EXEC_RECEIVER_COMMAND, arg, retval); LOG(logINFO) << "Receiver " << moduleId << " returned:\n" << retval; } } std::string Module::getFilePath() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file path)"); } char retvals[MAX_STR_LENGTH]{}; @@ -2858,7 +2881,7 @@ std::string Module::getFilePath() { } void Module::setFilePath(const std::string &path) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file path)"); } if (path.empty()) { @@ -2870,7 +2893,7 @@ void Module::setFilePath(const std::string &path) { } std::string Module::getFileName() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file name prefix)"); } char retvals[MAX_STR_LENGTH]{}; @@ -2879,7 +2902,7 @@ std::string Module::getFileName() { } void Module::setFileName(const std::string &fname) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file name prefix)"); } if (fname.empty()) { @@ -2891,28 +2914,28 @@ void Module::setFileName(const std::string &fname) { } int64_t Module::getFileIndex() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file index)"); } return sendToReceiver(F_GET_RECEIVER_FILE_INDEX); } void Module::setFileIndex(int64_t file_index) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file index)"); } sendToReceiver(F_SET_RECEIVER_FILE_INDEX, file_index, nullptr); } void Module::incrementFileIndex() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (increment file index)"); } sendToReceiver(F_INCREMENT_FILE_INDEX, nullptr, nullptr); } slsDetectorDefs::fileFormat Module::getFileFormat() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file format)"); } return static_cast( @@ -2920,7 +2943,7 @@ slsDetectorDefs::fileFormat Module::getFileFormat() { } void Module::setFileFormat(fileFormat f) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file format)"); } int arg = static_cast(f); @@ -2928,21 +2951,21 @@ void Module::setFileFormat(fileFormat f) { } int Module::getFramesPerFile() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (frames per file)"); } return sendToReceiver(F_GET_RECEIVER_FRAMES_PER_FILE); } void Module::setFramesPerFile(int n_frames) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (frames per file)"); } sendToReceiver(F_SET_RECEIVER_FRAMES_PER_FILE, n_frames, nullptr); } slsDetectorDefs::frameDiscardPolicy Module::getReceiverFramesDiscardPolicy() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (frame discard policy)"); } return static_cast( @@ -2950,7 +2973,7 @@ slsDetectorDefs::frameDiscardPolicy Module::getReceiverFramesDiscardPolicy() { } void Module::setReceiverFramesDiscardPolicy(frameDiscardPolicy f) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (frame discard policy)"); } int arg = static_cast(f); @@ -2958,14 +2981,14 @@ void Module::setReceiverFramesDiscardPolicy(frameDiscardPolicy f) { } bool Module::getPartialFramesPadding() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (frame padding)"); } return sendToReceiver(F_GET_RECEIVER_PADDING); } void Module::setPartialFramesPadding(bool padding) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (frame padding)"); } int arg = static_cast(padding); @@ -2975,14 +2998,14 @@ void Module::setPartialFramesPadding(bool padding) { void Module::startReceiver() { LOG(logDEBUG1) << "Starting Receiver"; shm()->stoppedFlag = false; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_START_RECEIVER, nullptr, nullptr); } } void Module::stopReceiver() { LOG(logDEBUG1) << "Stopping Receiver"; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { int arg = static_cast(shm()->stoppedFlag); sendToReceiver(F_STOP_RECEIVER, arg, nullptr); } @@ -2991,7 +3014,7 @@ void Module::stopReceiver() { slsDetectorDefs::runStatus Module::getReceiverStatus() const { runStatus retval = ERROR; LOG(logDEBUG1) << "Getting Receiver Status"; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_GET_RECEIVER_STATUS, nullptr, retval); LOG(logDEBUG1) << "Receiver Status: " << ToString(retval); } @@ -3001,7 +3024,7 @@ slsDetectorDefs::runStatus Module::getReceiverStatus() const { int64_t Module::getFramesCaughtByReceiver() const { int64_t retval = -1; LOG(logDEBUG1) << "Getting Frames Caught by Receiver"; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_GET_RECEIVER_FRAMES_CAUGHT, nullptr, retval); LOG(logDEBUG1) << "Frames Caught by Receiver: " << retval; } @@ -3010,7 +3033,7 @@ int64_t Module::getFramesCaughtByReceiver() const { std::vector Module::getNumMissingPackets() const { LOG(logDEBUG1) << "Getting num missing packets"; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { int fnum = F_GET_NUM_MISSING_PACKETS; int ret = FAIL; auto client = ReceiverSocket(shm()->rxHostname, shm()->rxTCPPort); @@ -3038,7 +3061,7 @@ std::vector Module::getNumMissingPackets() const { uint64_t Module::getReceiverCurrentFrameIndex() const { uint64_t retval = -1; LOG(logDEBUG1) << "Getting Current Frame Index of Receiver"; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_GET_RECEIVER_FRAME_INDEX, nullptr, retval); LOG(logDEBUG1) << "Current Frame Index of Receiver: " << retval; } @@ -3047,7 +3070,7 @@ uint64_t Module::getReceiverCurrentFrameIndex() const { int Module::getReceiverProgress() const { int retval = -1; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_GET_RECEIVER_PROGRESS, nullptr, retval); LOG(logDEBUG1) << "Current Progress of Receiver: " << retval; } @@ -3055,7 +3078,7 @@ int Module::getReceiverProgress() const { } void Module::setFileWrite(bool value) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file write enable)"); } int arg = static_cast(value); @@ -3063,14 +3086,14 @@ void Module::setFileWrite(bool value) { } bool Module::getFileWrite() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file_write enable)"); } return sendToReceiver(F_GET_RECEIVER_FILE_WRITE); } void Module::setMasterFileWrite(bool value) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (master file write enable)"); } int arg = static_cast(value); @@ -3078,14 +3101,14 @@ void Module::setMasterFileWrite(bool value) { } bool Module::getMasterFileWrite() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (master file write enable)"); } return sendToReceiver(F_GET_RECEIVER_MASTER_FILE_WRITE); } void Module::setFileOverWrite(bool value) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file overwrite enable)"); } int arg = static_cast(value); @@ -3093,21 +3116,21 @@ void Module::setFileOverWrite(bool value) { } bool Module::getFileOverWrite() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (file overwrite enable)"); } return sendToReceiver(F_GET_RECEIVER_OVERWRITE); } int Module::getReceiverStreamingFrequency() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (streaming/read frequency)"); } return sendToReceiver(F_GET_RECEIVER_STREAMING_FREQUENCY); } void Module::setReceiverStreamingFrequency(int freq) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (streaming/read frequency)"); } if (freq < 0) { @@ -3119,7 +3142,7 @@ void Module::setReceiverStreamingFrequency(int freq) { int Module::setReceiverStreamingTimer(int time_in_ms) { int retval = -1; LOG(logDEBUG1) << "Sending read timer to receiver: " << time_in_ms; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_RECEIVER_STREAMING_TIMER, time_in_ms, retval); LOG(logDEBUG1) << "Receiver read timer: " << retval; } @@ -3127,14 +3150,14 @@ int Module::setReceiverStreamingTimer(int time_in_ms) { } bool Module::getReceiverStreaming() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to get receiver parameters (zmq enable)"); } return sendToReceiver(F_GET_RECEIVER_STREAMING); } void Module::setReceiverStreaming(bool enable) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq enable)"); } int arg = static_cast(enable); @@ -3151,7 +3174,7 @@ bool Module::enableTenGigabitEthernet(int value) { } LOG(logDEBUG1) << "10Gbe: " << retval; value = retval; - if (shm()->useReceiverFlag && value != -1) { + if (shm()->useReceiver && value != -1) { int retval = -1; LOG(logDEBUG1) << "Sending 10Gbe enable to receiver: " << value; sendToReceiver(F_ENABLE_RECEIVER_TEN_GIGA, value, retval); @@ -3163,7 +3186,7 @@ bool Module::enableTenGigabitEthernet(int value) { int Module::setReceiverFifoDepth(int n_frames) { int retval = -1; LOG(logDEBUG1) << "Sending Receiver Fifo Depth: " << n_frames; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_SET_RECEIVER_FIFO_DEPTH, n_frames, retval); LOG(logDEBUG1) << "Receiver Fifo Depth: " << retval; } @@ -3171,14 +3194,14 @@ int Module::setReceiverFifoDepth(int n_frames) { } bool Module::getReceiverSilentMode() { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (silent mode)"); } return sendToReceiver(F_GET_RECEIVER_SILENT_MODE); } void Module::setReceiverSilentMode(bool enable) { - if (!shm()->useReceiverFlag) { + if (!shm()->useReceiver) { throw RuntimeError("Set rx_hostname first to use receiver parameters (silent mode)"); } int arg = static_cast(enable); @@ -3187,7 +3210,7 @@ void Module::setReceiverSilentMode(bool enable) { void Module::restreamStopFromReceiver() { LOG(logDEBUG1) << "Restream stop dummy from Receiver via zmq"; - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { sendToReceiver(F_RESTREAM_STOP_FROM_RECEIVER, nullptr, nullptr); } } @@ -3392,7 +3415,7 @@ void Module::setPipeline(int clkIndex, int value) { void Module::setCounterMask(uint32_t countermask) { LOG(logDEBUG1) << "Setting Counter mask to " << countermask; sendToDetector(F_SET_COUNTER_MASK, countermask, nullptr); - if (shm()->useReceiverFlag) { + if (shm()->useReceiver) { int ncounters = __builtin_popcount(countermask); LOG(logDEBUG1) << "Sending Reciver #counters: " << ncounters; sendToReceiver(F_RECEIVER_SET_NUM_COUNTERS, ncounters, nullptr); diff --git a/slsDetectorSoftware/src/Module.h b/slsDetectorSoftware/src/Module.h index 1401defe5..ce38d4fea 100755 --- a/slsDetectorSoftware/src/Module.h +++ b/slsDetectorSoftware/src/Module.h @@ -13,8 +13,9 @@ class ServerInterface; +#define MODULE_SHMRXVERSION 0x200415 #define MODULE_SHMAPIVERSION 0x190726 -#define MODULE_SHMVERSION 0x200402 +#define MODULE_SHMVERSION 0x200415 namespace sls{ @@ -36,6 +37,9 @@ struct sharedModule { /** detector type \ see :: detectorType*/ slsDetectorDefs::detectorType myDetectorType; + int numberOfReceivers; + int numberOfReceivers2; + /** END OF FIXED PATTERN -----------------------------------------------*/ /** Number of detectors in multi list in x dir and y dir */ @@ -70,7 +74,7 @@ struct sharedModule { /** is set if the receiver hostname given and is connected, * unset if socket connection is not possible */ - bool useReceiverFlag; + bool useReceiver; /** tcp port from gui/different process to receiver (only data) */ int zmqport; @@ -118,6 +122,12 @@ class Module : public virtual slsDetectorDefs { */ bool isFixedPatternSharedMemoryCompatible(); + static bool hasSharedMemoryReceiverList(int version); + + int getNumberOfReceivers() const; + void setNumberOfReceivers(const int num); + int getNumberOfReceivers2() const; + void setNumberOfReceivers2(const int num); /** * Check version compatibility with receiver software */ diff --git a/slsDetectorSoftware/src/Receiver.cpp b/slsDetectorSoftware/src/Receiver.cpp index d3d4f55cf..7cdc0cfbc 100755 --- a/slsDetectorSoftware/src/Receiver.cpp +++ b/slsDetectorSoftware/src/Receiver.cpp @@ -3,17 +3,13 @@ namespace sls { -size_t Receiver::NUM_RECEIVERS{0}; - -size_t Receiver::getNumReceivers() { - return NUM_RECEIVERS; -} - // create shm Receiver::Receiver(int detector_id, int module_id, int receiver_id, - int tcp_port, std::string hostname) - : receiverId(receiver_id), shm(detector_id, module_id, receiver_id) { + bool primaryInterface, int tcp_port, std::string hostname, + int zmq_port) : + receiverId(receiver_id), moduleId(module_id), + shm(detector_id, module_id, receiver_id, primaryInterface) { // ensure shared memory was not created before if (shm.IsExisting()) { @@ -22,19 +18,24 @@ Receiver::Receiver(int detector_id, int module_id, int receiver_id, << shm.GetName() << ". Freeing it again"; shm.RemoveSharedMemory(); } - shm = SharedMemory(detector_id, module_id, receiver_id); - ++NUM_RECEIVERS; + shm = SharedMemory(detector_id, module_id, receiver_id, + primaryInterface); + shm.CreateSharedMemory(); // initalize receiver structure shm()->shmversion = RECEIVER_SHMVERSION; memset(shm()->hostname, 0, MAX_STR_LENGTH); - shm()->tcpPort = DEFAULT_RX_PORTNO + NUM_RECEIVERS - 1; - shm()->zmqPort = DEFAULT_ZMQ_RX_PORTNO + NUM_RECEIVERS - 1; + shm()->tcpPort = DEFAULT_RX_PORTNO + receiver_id; + shm()->valid = false; + shm()->zmqPort = DEFAULT_ZMQ_RX_PORTNO + receiver_id; shm()->zmqIp = IpAddr{}; // copy port, hostname if given if (tcp_port != 0) { - shm()->tcpPort = tcp_port; + setTCPPort(tcp_port); + } + if (zmq_port != 0) { + shm()->zmqPort = zmq_port; } if (!hostname.empty()) { setHostname(hostname); @@ -43,42 +44,60 @@ Receiver::Receiver(int detector_id, int module_id, int receiver_id, // open shm Receiver::Receiver(int detector_id, int module_id, int receiver_id, - bool verify) - : receiverId(receiver_id), shm(detector_id, module_id, receiver_id) { + bool primaryInterface, bool verify) : + receiverId(receiver_id), moduleId(module_id), + shm(detector_id, module_id, receiver_id, primaryInterface) { shm.OpenSharedMemory(); if (verify && shm()->shmversion != RECEIVER_SHMVERSION) { std::ostringstream ss; - ss << "Receiver shared memory (" << detector_id << "-" << receiverId - << ":) version mismatch (expected 0x" << std::hex + ss << "Receiver shared memory (" << detector_id << "-" << moduleId + << ":" << receiverId << ") version mismatch (expected 0x" << std::hex << RECEIVER_SHMVERSION << " but got 0x" << shm()->shmversion << ")" << std::dec << ". Clear Shared memory to continue."; throw SharedMemoryError(ss.str()); } } -Receiver::~Receiver() { - --NUM_RECEIVERS; +Receiver::~Receiver() = default; + +void Receiver::freeSharedMemory() { + if (shm.IsExisting()) { + shm.RemoveSharedMemory(); + } } -void Receiver::setHostname(const std::string &ip_port) { - if (ip_port.empty()) { +std::string Receiver::getHostname() const { + return shm()->hostname; +} + +void Receiver::setHostname(const std::string &hostname) { + if (hostname.empty()) { throw RuntimeError("Invalid receiver hostname. Cannot be empty."); } - // parse tcp port from this hostname:port string - std::string host = ip_port; - auto res = sls::split(host, ':'); - if (res.size() > 1) { - host = res[0]; - shm()->tcpPort = std::stoi(res[1]); - } - sls::strcpy_safe(shm()->hostname, host.c_str()); - - updateReceiver(); + sls::strcpy_safe(shm()->hostname, hostname.c_str()); + configure(); } -void Receiver::updateReceiver() { +void Receiver::configure() { + shm()->valid = false; + LOG(logINFOBLUE) << receiverId << " configured!"; //checkReceiverVersionCompatibility(); + shm()->valid = true; } +int Receiver::getTCPPort() const { + return shm()->tcpPort; +} + +void Receiver::setTCPPort(const int port) { + if (port >= 0 && port != shm()->tcpPort) { + if (shm()->valid) { + // send to receiver to change tcpp port + shm()->tcpPort = port; // for now + } else { + shm()->tcpPort = port; + } + } +} } // namespace sls \ No newline at end of file diff --git a/slsDetectorSoftware/src/Receiver.h b/slsDetectorSoftware/src/Receiver.h index 7ffc18c54..17ff9d404 100755 --- a/slsDetectorSoftware/src/Receiver.h +++ b/slsDetectorSoftware/src/Receiver.h @@ -14,6 +14,7 @@ namespace sls { int shmversion; char hostname[MAX_STR_LENGTH]; int tcpPort; + bool valid; /** END OF FIXED PATTERN -----------------------------------------------*/ @@ -27,20 +28,33 @@ namespace sls { static size_t getNumReceivers(); // create shm explicit Receiver(int detector_id, int module_id, int receiver_id, - int tcp_port = 0, std::string hostname = ""); + bool primaryInterface, int tcp_port = 0, std::string hostname = "", + int zmq_port = 0); // open shm explicit Receiver(int detector_id, int module_id, int receiver_id, - bool verify); + bool primaryInterface, bool verify); virtual ~Receiver(); - void setHostname(const std::string &ip_port); - void updateReceiver(); + + /** + * Free shared memory and delete shared memory structure + * occupied by the sharedReceiver structure + * Is only safe to call if one deletes the Receiver object afterward + * and frees multi shared memory/updates + * thisMultiDetector->numberOfReceivers + */ + void freeSharedMemory(); + std::string getHostname() const; + void setHostname(const std::string &hostname); + void configure(); + int getTCPPort() const; + void setTCPPort(const int port); private: - static size_t NUM_RECEIVERS; const int receiverId{0}; - mutable sls::SharedMemory shm{0, 0, 0}; + const int moduleId{0}; + mutable sls::SharedMemory shm{0, 0, 0, true}; }; } // sls \ No newline at end of file diff --git a/slsDetectorSoftware/src/SharedMemory.h b/slsDetectorSoftware/src/SharedMemory.h index 5d5c714a6..52c9d8d1c 100755 --- a/slsDetectorSoftware/src/SharedMemory.h +++ b/slsDetectorSoftware/src/SharedMemory.h @@ -25,6 +25,7 @@ #define SHM_MULTI_PREFIX "/slsDetectorPackage_multi_" #define SHM_MODULE_PREFIX "_module_" #define SHM_RECEIVER_PREFIX "_receiver_" +#define SHM_RECEIVER2_PREFIX "_receiver2_" #define SHM_ENV_NAME "SLSDETNAME" #include @@ -42,9 +43,10 @@ class SharedMemory { * @param multiId multi detector id * @param moduleId module detector id, -1 if a multi detector shared memory * @param receiverId receiver id, -1 if a multi detector or module shared memory + * @param primaryInterface is false, for the second udp port receiver */ - SharedMemory(int multiId, int moduleId, int receiverId = -1) { - name = ConstructSharedMemoryName(multiId, moduleId, receiverId); + SharedMemory(int multiId, int moduleId, int receiverId = -1, bool primaryInterface = true) { + name = ConstructSharedMemoryName(multiId, moduleId, receiverId, primaryInterface); } /** @@ -186,7 +188,8 @@ class SharedMemory { // silent exit if shm did not exist anyway if (errno == ENOENT) return; - std::string msg = "Free Shared Memory " + name + " Failed: " + strerror(errno); + std::string msg = "Free Shared Memory " + name + " Failed: " + + strerror(errno); LOG(logERROR) << msg; throw SharedMemoryError(msg); } @@ -219,9 +222,11 @@ class SharedMemory { * @param multiId multi detector id * @param moduleId module detector id, -1 if a multi detector shared memory * @param receiverId receiver id, -1 if a multi detector or module shared memory + * @param primaryInterface false, if second udp port receiver * @returns shared memory name */ - std::string ConstructSharedMemoryName(int multiId, int moduleId, int receiverId) { + std::string ConstructSharedMemoryName(int multiId, int moduleId, + int receiverId, bool primaryInterface) { // using environment path std::string sEnvPath = ""; @@ -232,13 +237,20 @@ class SharedMemory { } std::stringstream ss; - if (moduleId < 0) + if (moduleId < 0 && receiverId < 0) ss << SHM_MULTI_PREFIX << multiId << sEnvPath; else if (receiverId < 0) - ss << SHM_MULTI_PREFIX << multiId << SHM_MODULE_PREFIX << moduleId << sEnvPath; - else - ss << SHM_MULTI_PREFIX << multiId << SHM_MODULE_PREFIX << moduleId << + ss << SHM_MULTI_PREFIX << multiId << + SHM_MODULE_PREFIX << moduleId << sEnvPath; + else if (primaryInterface) + ss << SHM_MULTI_PREFIX << multiId << + SHM_MODULE_PREFIX << moduleId << SHM_RECEIVER_PREFIX << receiverId << sEnvPath; + else + ss << SHM_MULTI_PREFIX << multiId << + SHM_MODULE_PREFIX << moduleId << + SHM_RECEIVER2_PREFIX << receiverId << sEnvPath; + std::string temp = ss.str(); if (temp.length() > NAME_MAX_LENGTH) { @@ -259,9 +271,11 @@ class SharedMemory { */ T *MapSharedMemory() { - void *addr = mmap(nullptr, sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + void *addr = mmap(nullptr, sizeof(T), PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); if (addr == MAP_FAILED) { - std::string msg = "Mapping shared memory " + name + " failed: " + strerror(errno); + std::string msg = "Mapping shared memory " + name + " failed: " + + strerror(errno); LOG(logERROR) << msg; close(fd); throw SharedMemoryError(msg); @@ -280,7 +294,8 @@ class SharedMemory { struct stat sb; // could not fstat if (fstat(fd, &sb) < 0) { - std::string msg = "Could not verify existing shared memory " + name + " size match " + "(could not fstat): " + strerror(errno); + std::string msg = "Could not verify existing shared memory " + + name + " size match " + "(could not fstat): " + strerror(errno); LOG(logERROR) << msg; close(fd); throw SharedMemoryError(msg); @@ -289,7 +304,9 @@ class SharedMemory { //size does not match auto sz = static_cast(sb.st_size); if (sz != expectedSize) { - std::string msg = "Existing shared memory " + name + " size does not match" + "Expected " + std::to_string(expectedSize) + ", found " + std::to_string(sz); + std::string msg = "Existing shared memory " + name + + " size does not match" + "Expected " + + std::to_string(expectedSize) + ", found " + std::to_string(sz); LOG(logERROR) << msg; throw SharedMemoryError(msg); return 1;