From 5339e16101a926e07dba1b4821661582512f3695 Mon Sep 17 00:00:00 2001 From: Dhanya Thattil Date: Wed, 22 Apr 2020 17:55:02 +0200 Subject: [PATCH] port sequences and client zmq implementation needs revisit, WIP --- slsDetectorSoftware/include/Detector.h | 2 - slsDetectorSoftware/src/Detector.cpp | 62 ++++++++++-------------- slsDetectorSoftware/src/DetectorImpl.cpp | 17 ++++++- slsDetectorSoftware/src/DetectorImpl.h | 1 + slsDetectorSoftware/src/Module.cpp | 37 -------------- slsDetectorSoftware/src/Module.h | 24 --------- slsDetectorSoftware/src/Receiver.cpp | 28 ++++++++++- slsDetectorSoftware/src/Receiver.h | 19 +++++--- 8 files changed, 81 insertions(+), 109 deletions(-) diff --git a/slsDetectorSoftware/include/Detector.h b/slsDetectorSoftware/include/Detector.h index a97004305..983a6aa94 100644 --- a/slsDetectorSoftware/include/Detector.h +++ b/slsDetectorSoftware/include/Detector.h @@ -1370,8 +1370,6 @@ class Detector { Result getRxCurrentFrameIndex(Positions pos = {}) const; - private: - std::vector getPortNumbers(int start_port); }; } // namespace sls \ No newline at end of file diff --git a/slsDetectorSoftware/src/Detector.cpp b/slsDetectorSoftware/src/Detector.cpp index 1fa17021a..10b90f9e2 100644 --- a/slsDetectorSoftware/src/Detector.cpp +++ b/slsDetectorSoftware/src/Detector.cpp @@ -633,7 +633,7 @@ Result Detector::getDestinationUDPIP(Positions pos) const { void Detector::setDestinationUDPIP(const IpAddr ip, Positions pos) { pimpl->Parallel(&Module::setDestinationUDPIP, pos, ip); - auto mac = pimpl->Parallel1(&Receiver::setDestinationUDPIP, pos, {}, ip).squash(); + auto mac = pimpl->Parallel1(&Receiver::setUDPIP, pos, {}, ip).squash(); setDestinationUDPMAC(mac, pos); } @@ -643,6 +643,8 @@ Result Detector::getDestinationUDPIP2(Positions pos) const { void Detector::setDestinationUDPIP2(const IpAddr ip, Positions pos) { pimpl->Parallel(&Module::setDestinationUDPIP2, pos, ip); + auto mac = pimpl->Parallel2(&Receiver::setUDPIP, pos, {}, ip).squash(); + setDestinationUDPMAC2(mac, pos); } Result Detector::getDestinationUDPMAC(Positions pos) const { @@ -667,13 +669,14 @@ Result Detector::getDestinationUDPPort(Positions pos) const { void Detector::setDestinationUDPPort(int port, int module_id) { if (module_id == -1) { - std::vector port_list = getPortNumbers(port); - for (int idet = 0; idet < size(); ++idet) { - pimpl->Parallel(&Module::setDestinationUDPPort, {idet}, - port_list[idet]); + for (int iModule = 0; iModule < size(); ++iModule) { + pimpl->Parallel(&Module::setDestinationUDPPort, {iModule}, port); + pimpl->Parallel1(&Receiver::setUDPPort, {iModule}, {}, port); + ++port; } } else { pimpl->Parallel(&Module::setDestinationUDPPort, {module_id}, port); + pimpl->Parallel1(&Receiver::setUDPPort, {module_id}, {}, port); } } @@ -683,14 +686,14 @@ Result Detector::getDestinationUDPPort2(Positions pos) const { void Detector::setDestinationUDPPort2(int port, int module_id) { if (module_id == -1) { - std::vector port_list = getPortNumbers(port); - for (int idet = 0; idet < size(); ++idet) { - pimpl->Parallel(&Module::setDestinationUDPPort2, {idet}, - port_list[idet]); + for (int iModule = 0; iModule < size(); ++iModule) { + pimpl->Parallel(&Module::setDestinationUDPPort2, {iModule}, port++); + pimpl->Parallel2(&Receiver::setUDPPort, {iModule}, {}, port); + ++port; } } else { - pimpl->Parallel(&Module::setDestinationUDPPort2, {module_id}, - port); + pimpl->Parallel(&Module::setDestinationUDPPort2, {module_id}, port); + pimpl->Parallel2(&Receiver::setUDPPort, {module_id}, {}, port); } } @@ -792,20 +795,18 @@ void Detector::setRxPort(const int udpInterface, int port, int module_id) { } if (udpInterface == 1) { if (module_id == -1) { - std::vector port_list = getPortNumbers(port); for (int idet = 0; idet < size(); ++idet) { pimpl->Parallel1(&Receiver::setTCPPort, {idet}, {}, - port_list[idet]); + port++); } } else { pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {}, port); } } else { if (module_id == -1) { - std::vector port_list = getPortNumbers(port); for (int idet = 0; idet < size(); ++idet) { pimpl->Parallel2(&Receiver::setTCPPort, {idet}, {}, - port_list[idet]); + port++); } } else { pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {}, port); @@ -966,19 +967,18 @@ void Detector::setRxZmqTimer(int time_in_ms, Positions pos) { } Result Detector::getRxZmqPort(Positions pos) const { - return pimpl->Parallel(&Module::getReceiverStreamingPort, pos); + return pimpl->Parallel1(&Receiver::getReceiverZmqPort, pos, {}); } void Detector::setRxZmqPort(int port, int module_id) { if (module_id == -1) { - std::vector port_list = getPortNumbers(port); for (int idet = 0; idet < size(); ++idet) { - pimpl->Parallel(&Module::setReceiverStreamingPort, {idet}, - port_list[idet]); + pimpl->Parallel1(&Receiver::setReceiverZmqPort, {idet}, + {}, port++); } } else { - pimpl->Parallel(&Module::setReceiverStreamingPort, {module_id}, - port); + pimpl->Parallel1(&Receiver::setReceiverZmqPort, {module_id}, + {}, port++); } } @@ -996,19 +996,18 @@ void Detector::setRxZmqIP(const IpAddr ip, Positions pos) { } Result Detector::getClientZmqPort(Positions pos) const { - return pimpl->Parallel(&Module::getClientStreamingPort, pos); + return pimpl->Parallel1(&Receiver::getClientZmqPort, pos, {}); } void Detector::setClientZmqPort(int port, int module_id) { if (module_id == -1) { - std::vector port_list = getPortNumbers(port); for (int idet = 0; idet < size(); ++idet) { - pimpl->Parallel(&Module::setClientStreamingPort, {idet}, - port_list[idet]); + pimpl->Parallel1(&Receiver::setClientZmqPort, {idet}, + {}, port++); } } else { - pimpl->Parallel(&Module::setClientStreamingPort, {module_id}, - port); + pimpl->Parallel1(&Receiver::setClientZmqPort, {module_id}, + {}, port);// FIXME: Needs a clientzmqport2 } } @@ -1967,13 +1966,4 @@ Result Detector::getRxCurrentFrameIndex(Positions pos) const { return pimpl->Parallel(&Module::getReceiverCurrentFrameIndex, pos); } -std::vector Detector::getPortNumbers(int start_port) { - std::vector res; - res.reserve(size()); - for (int idet = 0; idet < size(); ++idet) { - res.push_back(start_port + idet); - } - return res; -} - } // namespace sls \ No newline at end of file diff --git a/slsDetectorSoftware/src/DetectorImpl.cpp b/slsDetectorSoftware/src/DetectorImpl.cpp index 9c48b97ac..98d99dfc6 100755 --- a/slsDetectorSoftware/src/DetectorImpl.cpp +++ b/slsDetectorSoftware/src/DetectorImpl.cpp @@ -361,6 +361,19 @@ void DetectorImpl::addModule(const std::string &hostname, detectors[pos]->updateNumberOfChannels(); } +int DetectorImpl::getNumberofReceiversPerModule() const { + int retval = receivers.size(); + if (receivers2.size()) { + retval *= 2; + } + // for round robin + if (retval) { + retval *= receivers[0].size(); + } + return retval; +} + + void DetectorImpl::initReceiver(const int udpInterface) { if (udpInterface == 1) { if (receivers.size() != 0) { @@ -601,8 +614,8 @@ int DetectorImpl::createReceivingDataSockets(const bool destroy) { numSockets *= numSocketsPerDetector; for (size_t iSocket = 0; iSocket < numSockets; ++iSocket) { - uint32_t portnum = (detectors[iSocket / numSocketsPerDetector] - ->getClientStreamingPort()); + uint32_t portnum = (receivers[iSocket / numSocketsPerDetector][0] + ->getClientZmqPort());//FIXME 2 receivers portnum += (iSocket % numSocketsPerDetector); try { zmqSocket.push_back(sls::make_unique( diff --git a/slsDetectorSoftware/src/DetectorImpl.h b/slsDetectorSoftware/src/DetectorImpl.h index 2b77915bf..aa71aaaed 100755 --- a/slsDetectorSoftware/src/DetectorImpl.h +++ b/slsDetectorSoftware/src/DetectorImpl.h @@ -668,6 +668,7 @@ class DetectorImpl : public virtual slsDetectorDefs { void setHostname(const std::vector &name, const std::vector &port); + int getNumberofReceiversPerModule() const; void initReceiver(const int udpInterface); bool isReceiverInitialized(const int udpInterface); void removeReceivers(const int udpInterface); diff --git a/slsDetectorSoftware/src/Module.cpp b/slsDetectorSoftware/src/Module.cpp index ae26e0a4c..2b52a8cfa 100755 --- a/slsDetectorSoftware/src/Module.cpp +++ b/slsDetectorSoftware/src/Module.cpp @@ -1462,12 +1462,6 @@ void Module::setDestinationUDPIP(const IpAddr ip) { throw RuntimeError("Invalid destination udp ip address"); } sendToDetector(F_SET_DEST_UDP_IP, ip, nullptr); - if (shm()->useReceiver) { - sls::MacAddr retval(0LU); - sendToReceiver(F_SET_RECEIVER_UDP_IP, ip, retval); - LOG(logINFO) << "Setting destination udp mac of detector " << moduleId << " to " << retval; - sendToDetector(F_SET_DEST_UDP_MAC, retval, nullptr); - } } sls::IpAddr Module::getDestinationUDPIP() { @@ -1483,14 +1477,7 @@ void Module::setDestinationUDPIP2(const IpAddr ip) { if (ip == 0) { throw RuntimeError("Invalid destination udp ip address2"); } - sendToDetector(F_SET_DEST_UDP_IP2, ip, nullptr); - if (shm()->useReceiver) { - sls::MacAddr retval(0LU); - sendToReceiver(F_SET_RECEIVER_UDP_IP2, ip, retval); - LOG(logINFO) << "Setting destination udp mac2 of detector " << moduleId << " to " << retval; - sendToDetector(F_SET_DEST_UDP_MAC2, retval, nullptr); - } } sls::IpAddr Module::getDestinationUDPIP2() { @@ -1538,9 +1525,6 @@ sls::MacAddr Module::getDestinationUDPMAC2() { void Module::setDestinationUDPPort(const int port) { LOG(logDEBUG1) << "Setting destination udp port to " << port; sendToDetector(F_SET_DEST_UDP_PORT, port, nullptr); - if (shm()->useReceiver) { - sendToReceiver(F_SET_RECEIVER_UDP_PORT, port, nullptr); - } } int Module::getDestinationUDPPort() { @@ -1555,9 +1539,6 @@ int Module::getDestinationUDPPort() { void Module::setDestinationUDPPort2(const int port) { LOG(logDEBUG1) << "Setting destination udp port2 to " << port; sendToDetector(F_SET_DEST_UDP_PORT2, port, nullptr); - if (shm()->useReceiver) { - sendToReceiver(F_SET_RECEIVER_UDP_PORT2, port, nullptr); - } } int Module::getDestinationUDPPort2() { @@ -1632,24 +1613,6 @@ std::string Module::printUDPConfiguration() { return oss.str(); } -void Module::setClientStreamingPort(int port) { shm()->zmqport = port; } - -int Module::getClientStreamingPort() { return shm()->zmqport; } - -void Module::setReceiverStreamingPort(int port) { - if (!shm()->useReceiver) { - throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq port)"); - } - sendToReceiver(F_SET_RECEIVER_STREAMING_PORT, port, nullptr); -} - -int Module::getReceiverStreamingPort() { - if (!shm()->useReceiver) { - throw RuntimeError("Set rx_hostname first to get receiver parameters (zmq port)"); - } - return sendToReceiver(F_GET_RECEIVER_STREAMING_PORT); -} - void Module::setClientStreamingIP(const sls::IpAddr ip) { LOG(logDEBUG1) << "Setting client zmq ip to " << ip; if (ip == 0) { diff --git a/slsDetectorSoftware/src/Module.h b/slsDetectorSoftware/src/Module.h index 872f64740..09c231e46 100755 --- a/slsDetectorSoftware/src/Module.h +++ b/slsDetectorSoftware/src/Module.h @@ -792,30 +792,6 @@ class Module : public virtual slsDetectorDefs { std::string printUDPConfiguration(); - /** - * Sets the client zmq port - * @param port client zmq port - */ - void setClientStreamingPort(int port); - - /** - * Returns the client zmq port - * @returns the client zmq port - */ - int getClientStreamingPort(); - - /** - * Sets the receiver zmq port - * @param port receiver zmq port - */ - void setReceiverStreamingPort(int port); - - /** - * Returns the receiver zmq port - * @returns the receiver zmq port - */ - int getReceiverStreamingPort(); - /** * Sets the client zmq ip * @param ip client zmq ip diff --git a/slsDetectorSoftware/src/Receiver.cpp b/slsDetectorSoftware/src/Receiver.cpp index 54524bef3..43e408d0c 100755 --- a/slsDetectorSoftware/src/Receiver.cpp +++ b/slsDetectorSoftware/src/Receiver.cpp @@ -361,8 +361,8 @@ void Receiver::restreamStop() { } /** Network Configuration (Detector<->Receiver) */ -sls::MacAddr Receiver::setDestinationUDPIP(const IpAddr ip) { - LOG(logDEBUG1) << "Setting destination udp ip to " << ip; +sls::MacAddr Receiver::setUDPIP(const IpAddr ip) { + LOG(logDEBUG1) << "Setting udp ip to receier: " << ip; if (ip == 0) { throw RuntimeError("Invalid destination udp ip address"); } @@ -371,6 +371,30 @@ sls::MacAddr Receiver::setDestinationUDPIP(const IpAddr ip) { return retval; } +void Receiver::setUDPPort(const int port) { + LOG(logDEBUG1) << "Setting udp port to receiver: " << port; + sendToReceiver(F_SET_RECEIVER_UDP_PORT, port, nullptr); +} + +/** ZMQ Streaming Parameters (Receiver<->Client) */ +void Receiver::setClientZmqPort(const int port) { + shm()->zmqPort = port; +} + +int Receiver::getClientZmqPort() const { + return shm()->zmqPort; +} + +void Receiver::setReceiverZmqPort(int port) { + sendToReceiver(F_SET_RECEIVER_STREAMING_PORT, port, nullptr); +} + +int Receiver::getReceiverZmqPort() const { + return sendToReceiver(F_GET_RECEIVER_STREAMING_PORT); +} + + + /** Detector Parameters */ void Receiver::setNumberOfFrames(int64_t value) { LOG(logDEBUG1) << "Sending number of frames to Receiver: " << value; diff --git a/slsDetectorSoftware/src/Receiver.h b/slsDetectorSoftware/src/Receiver.h index 0686a6b39..21b724a7b 100755 --- a/slsDetectorSoftware/src/Receiver.h +++ b/slsDetectorSoftware/src/Receiver.h @@ -76,7 +76,19 @@ class Receiver : public virtual slsDetectorDefs { * Network Configuration (Detector<->Receiver) * * * * ************************************************/ - sls::MacAddr setDestinationUDPIP(const sls::IpAddr ip); + sls::MacAddr setUDPIP(const sls::IpAddr ip); + void setUDPPort(int udpport); + + /************************************************** + * * + * ZMQ Streaming Parameters (Receiver<->Client)* + * * + * ************************************************/ + int getClientZmqPort() const; + void setClientZmqPort(const int port); + int getReceiverZmqPort() const; + void setReceiverZmqPort(int port); + /************************************************** * * @@ -116,11 +128,6 @@ class Receiver : public virtual slsDetectorDefs { * File * * * * ************************************************/ - /************************************************** - * * - * ZMQ Streaming Parameters (Receiver<->Client)* - * * - * ************************************************/ private: void sendToReceiver(int fnum, const void *args, size_t args_size,