From 6c1035aa99277912f6d107a692d465bc95f8b736 Mon Sep 17 00:00:00 2001 From: Dhanya Thattil Date: Thu, 8 Oct 2020 13:01:01 +0200 Subject: [PATCH] zmq hwm are specified to 2 for gui and restreaming of receiver if all zmq not closed at end of acquiistion --- slsDetectorGui/forms/form_tab_plot.ui | 322 ++++++++++++-------- slsDetectorGui/include/qDefs.h | 1 + slsDetectorGui/include/qTabPlot.h | 7 +- slsDetectorGui/src/qTabPlot.cpp | 95 ++++-- slsDetectorSoftware/include/Detector.h | 20 ++ slsDetectorSoftware/src/CmdProxy.cpp | 35 ++- slsDetectorSoftware/src/CmdProxy.h | 42 +++ slsDetectorSoftware/src/Detector.cpp | 31 +- slsDetectorSoftware/src/DetectorImpl.cpp | 98 ++++-- slsDetectorSoftware/src/DetectorImpl.h | 17 +- slsDetectorSoftware/src/Module.cpp | 16 +- slsDetectorSoftware/src/Module.h | 4 +- slsReceiverSoftware/src/ClientInterface.cpp | 21 +- slsReceiverSoftware/src/ClientInterface.h | 2 + slsReceiverSoftware/src/DataStreamer.cpp | 14 +- slsReceiverSoftware/src/DataStreamer.h | 4 +- slsReceiverSoftware/src/Implementation.cpp | 14 +- slsReceiverSoftware/src/Implementation.h | 3 + slsSupportLib/include/ZmqSocket.h | 12 + slsSupportLib/include/sls_detector_funcs.h | 5 + slsSupportLib/src/ZmqSocket.cpp | 41 +++ 21 files changed, 614 insertions(+), 190 deletions(-) diff --git a/slsDetectorGui/forms/form_tab_plot.ui b/slsDetectorGui/forms/form_tab_plot.ui index c96357286..2e6d56ebf 100755 --- a/slsDetectorGui/forms/form_tab_plot.ui +++ b/slsDetectorGui/forms/form_tab_plot.ui @@ -75,7 +75,7 @@ 6 - + true @@ -1388,7 +1388,7 @@ Displays minimum, maximum and sum of values for each plot. - + @@ -1789,7 +1789,7 @@ Displays minimum, maximum and sum of values for each plot. - + true @@ -1990,23 +1990,7 @@ Displays minimum, maximum and sum of values for each plot. - - - - Qt::Horizontal - - - QSizePolicy::Fixed - - - - 10 - 20 - - - - - + @@ -2016,7 +2000,7 @@ Displays minimum, maximum and sum of values for each plot. - 0 + 350 65 @@ -2024,7 +2008,7 @@ Displays minimum, maximum and sum of values for each plot. <html><head/><body><p>Sets the pace at which the receiver streams out images. Images in between the timeout or frequency are not sent out.<br/><br/><span style=" font-weight:600;">Time Interval</span>: Streaming time interval when an image should be streamed. <br/><span style=" font-weight:600;">Every nth Image</span>: Only every nth image is streamed. </p><p><br/></p><p><br/></p></body></html> - Receiver Streaming Frequency / Timer + Receiver Streaming Qt::AlignLeading|Qt::AlignLeft|Qt::AlignVCenter @@ -2052,6 +2036,33 @@ Displays minimum, maximum and sum of values for each plot. 0 + + + true + + + + 0 + 0 + + + + + 80 + 16777215 + + + + <nobr> +Displays minimum, maximum and sum of values for each plot. +<nobr> + + + No Plot + + + + @@ -2061,13 +2072,13 @@ Displays minimum, maximum and sum of values for each plot. - 150 + 135 0 - 16777215 + 135 16777215 @@ -2150,29 +2161,25 @@ Displays minimum, maximum and sum of values for each plot. - - - - Qt::Horizontal - - - QSizePolicy::Expanding - - + + + - 80 - 20 + 150 + 16777215 - - - - - 1 + 0 + + 2 + + + 2 + @@ -2181,11 +2188,17 @@ Displays minimum, maximum and sum of values for each plot. 0 + + + 90 + 30 + + false - 3 + 2 999999.000000000000000 @@ -2238,6 +2251,9 @@ Displays minimum, maximum and sum of values for each plot. + + 2 + @@ -2267,78 +2283,8 @@ Displays minimum, maximum and sum of values for each plot. - - - - - - - true - - - - 0 - 0 - - - - - 350 - 65 - - - - - 16777215 - 16777215 - - - - Plot Type - - - Qt::AlignLeading|Qt::AlignLeft|Qt::AlignVCenter - - - false - - - - 6 - - - 0 - - - 6 - - - 0 - - - 2 - - - 0 - - - - - - 0 - 0 - - - - Data Graph - - - true - - - - - + + Qt::Horizontal @@ -2347,22 +2293,159 @@ Displays minimum, maximum and sum of values for each plot. - 10 + 30 20 - - + + + + Snd Hwm: + + + + + + + Qt::Horizontal + + + QSizePolicy::Expanding + + + + 30 + 20 + + + + + + + + Qt::Horizontal + + + QSizePolicy::Expanding + + + + 30 + 20 + + + + + + - + 0 0 + + + 0 + 25 + + + + + 16777215 + 30 + + + + <html><head/><body><p>High water mark for inbound messages for gui.</p><p> #zmqhwm#</p></body></html> + + + + + + + + + Qt::AlignLeading|Qt::AlignLeft|Qt::AlignVCenter + + + false + + + + + + + + + -1 + + + 99999 + + + 1000 + + + + + + + + 0 + 0 + + + + + 0 + 25 + + + + + 16777215 + 30 + + + + <html><head/><body><p>High water mark for outbound messages for receiver.</p><p>#rx_zmqhwm#</p></body></html> + + + + + + + + + Qt::AlignLeading|Qt::AlignLeft|Qt::AlignVCenter + + + false + + + + + + + + + -1 + + + 99999 + + + 1000 + + + + + - No Plot + Rcv Hwm: @@ -2370,7 +2453,6 @@ Displays minimum, maximum and sum of values for each plot. - boxPlotType box1D boxFrequency box2D @@ -2378,8 +2460,6 @@ Displays minimum, maximum and sum of values for each plot. boxSave - radioNoPlot - radioDataGraph comboFrequency spinTimeGap comboTimeGapUnit diff --git a/slsDetectorGui/include/qDefs.h b/slsDetectorGui/include/qDefs.h index 20a468b8b..fd053c248 100644 --- a/slsDetectorGui/include/qDefs.h +++ b/slsDetectorGui/include/qDefs.h @@ -40,6 +40,7 @@ class qDefs : public QWidget { static const int Q_FONT_SIZE = 9; static const int DATA_GAIN_PLOT_RATIO = 5; static const int MIN_HEIGHT_GAIN_PLOT_1D = 75; + static const int GUI_ZMQ_RCV_HWM = 2; static void DisplayExceptions(std::string emsg, std::string src) { try { diff --git a/slsDetectorGui/include/qTabPlot.h b/slsDetectorGui/include/qTabPlot.h index 5bca30acd..c87d173a5 100644 --- a/slsDetectorGui/include/qTabPlot.h +++ b/slsDetectorGui/include/qTabPlot.h @@ -3,7 +3,6 @@ #include "ui_form_tab_plot.h" class qDrawPlot; -class QButtonGroup; class qTabPlot : public QWidget, private Ui::TabPlotObject { Q_OBJECT @@ -35,6 +34,8 @@ class qTabPlot : public QWidget, private Ui::TabPlotObject { void CheckAspectRatio(); void SetZRange(); void SetStreamingFrequency(); + void SetStreamingHwm(int value); + void SetReceivingHwm(int value); signals: void DisableZoomSignal(bool); @@ -45,6 +46,8 @@ class qTabPlot : public QWidget, private Ui::TabPlotObject { void Select1DPlot(bool enable); void GetGapPixels(); void GetStreamingFrequency(); + void GetStreamingHwm(); + void GetReceivingHwm(); void SetXYRange(); void MaintainAspectRatio(int dimension); @@ -52,8 +55,6 @@ class qTabPlot : public QWidget, private Ui::TabPlotObject { qDrawPlot *plot; bool is1d; - QButtonGroup *btnGroupPlotType{nullptr}; - /** default plot and axis titles */ static QString defaultPlotTitle; static QString defaultHistXAxisTitle; diff --git a/slsDetectorGui/src/qTabPlot.cpp b/slsDetectorGui/src/qTabPlot.cpp index 52190f282..1f1e77a4e 100644 --- a/slsDetectorGui/src/qTabPlot.cpp +++ b/slsDetectorGui/src/qTabPlot.cpp @@ -1,8 +1,6 @@ #include "qTabPlot.h" #include "qDefs.h" #include "qDrawPlot.h" -#include -#include #include #include @@ -20,14 +18,9 @@ qTabPlot::qTabPlot(QWidget *parent, sls::Detector *detector, qDrawPlot *p) LOG(logDEBUG) << "Plot ready"; } -qTabPlot::~qTabPlot() { delete btnGroupPlotType; } +qTabPlot::~qTabPlot() {} void qTabPlot::SetupWidgetWindow() { - // button group for plot type - btnGroupPlotType = new QButtonGroup(this); - btnGroupPlotType->addButton(radioNoPlot, 0); - btnGroupPlotType->addButton(radioDataGraph, 1); - // 1D and 2D options stackedWidget1D->setCurrentIndex(0); stackedWidget2D->setCurrentIndex(0); @@ -76,19 +69,18 @@ void qTabPlot::SetupWidgetWindow() { Initialization(); Refresh(); - // set default timer - spinTimeGap->setValue(DEFAULT_STREAMING_TIMER_IN_MS); - // set to streaming timer - stackedTimeInterval->setCurrentIndex(0); - comboFrequency->setCurrentIndex(0); + // set zmq high water mark to GUI_ZMQ_RCV_HWM (2) + spinSndHwm->setValue(qDefs::GUI_ZMQ_RCV_HWM); + spinRcvHwm->setValue(qDefs::GUI_ZMQ_RCV_HWM); } void qTabPlot::Initialization() { - // Plot arguments box - connect(btnGroupPlotType, SIGNAL(buttonClicked(int)), this, - SLOT(SetPlot())); - // Plotting frequency box + connect(chkNoPlot, SIGNAL(toggled(bool)), this, SLOT(SetPlot())); + connect(spinSndHwm, SIGNAL(valueChanged(int)), this, + SLOT(SetStreamingHwm(int))); + connect(spinRcvHwm, SIGNAL(valueChanged(int)), this, + SLOT(SetReceivingHwm(int))); connect(comboFrequency, SIGNAL(currentIndexChanged(int)), this, SLOT(SetStreamingFrequency())); connect(comboTimeGapUnit, SIGNAL(currentIndexChanged(int)), this, @@ -214,13 +206,18 @@ void qTabPlot::Select1DPlot(bool enable) { void qTabPlot::SetPlot() { bool plotEnable = false; - if (radioNoPlot->isChecked()) { + if (chkNoPlot->isChecked()) { LOG(logINFO) << "Setting Plot Type: No Plot"; - } else if (radioDataGraph->isChecked()) { + } else { LOG(logINFO) << "Setting Plot Type: Datagraph"; plotEnable = true; } - boxFrequency->setEnabled(plotEnable); + comboFrequency->setEnabled(plotEnable); + lblSndHwm->setEnabled(plotEnable); + spinSndHwm->setEnabled(plotEnable); + lblRcvHwm->setEnabled(plotEnable); + spinRcvHwm->setEnabled(plotEnable); + stackedTimeInterval->setEnabled(plotEnable); box1D->setEnabled(plotEnable); box2D->setEnabled(plotEnable); boxSave->setEnabled(plotEnable); @@ -705,17 +702,68 @@ void qTabPlot::SetStreamingFrequency() { &qTabPlot::GetStreamingFrequency) } +void qTabPlot::GetStreamingHwm() { + LOG(logDEBUG) << "Getting Streaming Hwm for receiver"; + disconnect(spinSndHwm, SIGNAL(valueChanged(int)), this, + SLOT(SetStreamingHwm(int))); + try { + int value = det->getRxZmqHwm().tsquash( + "Inconsistent streaming hwm for all receivers."); + LOG(logDEBUG) << "Got streaming hwm for receiver " << value; + spinSndHwm->setValue(value); + } + CATCH_DISPLAY("Could not get streaming hwm for receiver.", + "qTabPlot::GetStreamingHwm") + connect(spinSndHwm, SIGNAL(valueChanged(int)), this, + SLOT(SetStreamingHwm(int))); +} + +void qTabPlot::SetStreamingHwm(int value) { + LOG(logINFO) << "Setting Streaming Hwm for receiver to " << value; + try { + det->setRxZmqHwm(value); + } + CATCH_HANDLE("Could not set streaming hwm for receiver.", + "qTabPlot::SetStreamingHwm", this, &qTabPlot::GetStreamingHwm) +} + +void qTabPlot::GetReceivingHwm() { + LOG(logDEBUG) << "Getting Receiving Hwm for client"; + try { + int value = det->getClientZmqHwm(); + LOG(logDEBUG) << "Got receiving hwm for client " << value; + } + CATCH_DISPLAY("Could not get receiving hwm for client.", + "qTabPlot::GetReceivingHwm") +} + +void qTabPlot::SetReceivingHwm(int value) { + LOG(logINFO) << "Setting Streaming Hwm to " << value; + try { + det->setClientZmqHwm(value); + } + CATCH_HANDLE("Could not set receiving hwm from client.", + "qTabPlot::SetReceivingHwm", this, &qTabPlot::GetReceivingHwm) +} + void qTabPlot::Refresh() { LOG(logDEBUG) << "**Updating Plot Tab"; if (!plot->GetIsRunning()) { - boxPlotType->setEnabled(true); + boxFrequency->setEnabled(true); // streaming frequency - if (!radioNoPlot->isChecked()) { - boxFrequency->setEnabled(true); + if (!chkNoPlot->isChecked()) { + comboFrequency->setEnabled(true); + stackedTimeInterval->setEnabled(true); + lblSndHwm->setEnabled(true); + spinSndHwm->setEnabled(true); + lblRcvHwm->setEnabled(true); + spinRcvHwm->setEnabled(true); } GetStreamingFrequency(); + GetStreamingHwm(); + GetReceivingHwm(); // gain plot, gap pixels enable switch (det->getDetectorType().squash()) { case slsDetectorDefs::EIGER: @@ -734,7 +782,6 @@ void qTabPlot::Refresh() { break; } } else { - boxPlotType->setEnabled(false); boxFrequency->setEnabled(false); chkGainPlot->setEnabled(false); chkGainPlot1D->setEnabled(false); diff --git a/slsDetectorSoftware/include/Detector.h b/slsDetectorSoftware/include/Detector.h index 89fc94b73..b91bc058e 100644 --- a/slsDetectorSoftware/include/Detector.h +++ b/slsDetectorSoftware/include/Detector.h @@ -887,6 +887,26 @@ class Detector { * if enabled. */ void setClientZmqIp(const IpAddr ip, Positions pos = {}); + + int getClientZmqHwm() const; + + /** Client's zmq receive high water mark. \n Default is the zmq library's + * default (1000), can also be set here using -1. \n This is a high number + * and can be set to 2 for gui purposes. \n One must also set the receiver's + * send high water mark to similar value. Final effect is sum of them. + */ + void setClientZmqHwm(const int limit); + + Result getRxZmqHwm(Positions pos = {}) const; + + /** Receiver's zmq send high water mark. \n Default is the zmq library's + * default (1000) \n This is a high number and can be set to 2 for gui + * purposes. \n One must also set the client's receive high water mark to + * similar value. Final effect is sum of them. Also restarts receiver zmq + * streaming if enabled. \n Can set to -1 to set default. + */ + void setRxZmqHwm(const int limit); + ///@{ /** @name Eiger Specific */ diff --git a/slsDetectorSoftware/src/CmdProxy.cpp b/slsDetectorSoftware/src/CmdProxy.cpp index d4fce7586..1cb2bb015 100644 --- a/slsDetectorSoftware/src/CmdProxy.cpp +++ b/slsDetectorSoftware/src/CmdProxy.cpp @@ -1257,7 +1257,40 @@ std::string CmdProxy::ReceiverHostname(int action) { return os.str(); } /* File */ + /* ZMQ Streaming Parameters (Receiver<->Client) */ + +std::string CmdProxy::ZMQHWM(int action) { + std::ostringstream os; + os << cmd << ' '; + if (action == defs::HELP_ACTION) { + os << "[n_limit] \n\tClient's zmq receive high water mark. Default is " + "the zmq library's default (1000), can also be set here using " + "-1. \n This is a high number and can be set to 2 for gui " + "purposes. \n One must also set the receiver's send high water " + "mark to similar value. Final effect is sum of them.\n\t Setting " + "it via command line is useful only before zmq enabled (before " + "opening gui)." + << '\n'; + } else if (action == defs::GET_ACTION) { + if (!args.empty()) { + WrongNumberOfParameters(0); + } + auto t = det->getClientZmqHwm(); + os << t << '\n'; + } else if (action == defs::PUT_ACTION) { + if (args.size() != 1) { + WrongNumberOfParameters(1); + } + int t = StringTo(args[0]); + det->setClientZmqHwm(t); + os << det->getClientZmqHwm() << '\n'; + } else { + throw sls::RuntimeError("Unknown action"); + } + return os.str(); +} + /* Eiger Specific */ std::string CmdProxy::Threshold(int action) { @@ -2681,7 +2714,7 @@ std::string CmdProxy::ExecuteCommand(int action) { throw sls::RuntimeError("Cannot get."); } else if (action == defs::PUT_ACTION) { std::string command; - for (auto &i: args) { + for (auto &i : args) { command += (i + ' '); } auto t = det->executeCommand(command, std::vector{det_id}); diff --git a/slsDetectorSoftware/src/CmdProxy.h b/slsDetectorSoftware/src/CmdProxy.h index 16782576c..97f1e636a 100644 --- a/slsDetectorSoftware/src/CmdProxy.h +++ b/slsDetectorSoftware/src/CmdProxy.h @@ -235,6 +235,36 @@ return os.str(); \ } +/** int, set no id, get id */ +#define INTEGER_COMMAND_SET_NOID_GET_ID(CMDNAME, GETFCN, SETFCN, CONV, HLPSTR) \ + std::string CMDNAME(const int action) { \ + std::ostringstream os; \ + os << cmd << ' '; \ + if (action == slsDetectorDefs::HELP_ACTION) \ + os << HLPSTR << '\n'; \ + else if (action == slsDetectorDefs::GET_ACTION) { \ + if (!args.empty()) { \ + WrongNumberOfParameters(0); \ + } \ + auto t = det->GETFCN(std::vector{det_id}); \ + os << OutString(t) << '\n'; \ + } else if (action == slsDetectorDefs::PUT_ACTION) { \ + if (det_id != -1) { \ + throw sls::RuntimeError( \ + "Cannot execute this at module level"); \ + } \ + if (args.size() != 1) { \ + WrongNumberOfParameters(1); \ + } \ + auto val = CONV(args[0]); \ + det->SETFCN(val); \ + os << args.front() << '\n'; \ + } else { \ + throw sls::RuntimeError("Unknown action"); \ + } \ + return os.str(); \ + } + /** int, no id */ #define INTEGER_COMMAND_NOID(CMDNAME, GETFCN, SETFCN, CONV, HLPSTR) \ std::string CMDNAME(const int action) { \ @@ -868,6 +898,8 @@ class CmdProxy { {"zmqport", &CmdProxy::zmqport}, {"rx_zmqip", &CmdProxy::rx_zmqip}, {"zmqip", &CmdProxy::zmqip}, + {"zmqhwm", &CmdProxy::ZMQHWM}, + {"rx_zmqhwm", &CmdProxy::rx_zmqhwm}, /* Eiger Specific */ {"subexptime", &CmdProxy::subexptime}, @@ -1071,6 +1103,7 @@ class CmdProxy { std::string ReceiverHostname(int action); /* File */ /* ZMQ Streaming Parameters (Receiver<->Client) */ + std::string ZMQHWM(int action); /* Eiger Specific */ std::string Threshold(int action); std::string ThresholdNoTb(int action); @@ -1692,6 +1725,15 @@ class CmdProxy { "an intermediate process between receiver and client(gui). Also " "restarts client zmq streaming if enabled."); + INTEGER_COMMAND_SET_NOID_GET_ID( + rx_zmqhwm, getRxZmqHwm, setRxZmqHwm, StringTo, + "[n_value]\n\tReceiver's zmq send high water mark. Default is the zmq " + "library's default (1000). This is a high number and can be set to 2 " + "for gui purposes. One must also set the client's receive high water " + "mark to similar value. Final effect is sum of them. Also restarts " + "receiver zmq streaming if enabled. Can set to -1 to set default " + "value."); + /* Eiger Specific */ TIME_COMMAND(subexptime, getSubExptime, setSubExptime, diff --git a/slsDetectorSoftware/src/Detector.cpp b/slsDetectorSoftware/src/Detector.cpp index 6d0558ebe..296293cd4 100644 --- a/slsDetectorSoftware/src/Detector.cpp +++ b/slsDetectorSoftware/src/Detector.cpp @@ -643,7 +643,7 @@ Result Detector::getNumberofUDPInterfaces(Positions pos) const { } void Detector::setNumberofUDPInterfaces(int n, Positions pos) { - int previouslyClientStreaming = pimpl->getDataStreamingToClient(); + bool previouslyClientStreaming = pimpl->getDataStreamingToClient(); bool useReceiver = getUseReceiverFlag().squash(false); bool previouslyReceiverStreaming = false; if (useReceiver) { @@ -656,7 +656,7 @@ void Detector::setNumberofUDPInterfaces(int n, Positions pos) { setRxZmqPort(startingPort, -1); } // redo the zmq sockets if enabled - if (previouslyClientStreaming != 0) { + if (previouslyClientStreaming) { pimpl->setDataStreamingToClient(false); pimpl->setDataStreamingToClient(true); } @@ -1077,7 +1077,7 @@ Result Detector::getClientZmqPort(Positions pos) const { } void Detector::setClientZmqPort(int port, int module_id) { - int previouslyClientStreaming = pimpl->getDataStreamingToClient(); + bool previouslyClientStreaming = pimpl->getDataStreamingToClient(); if (module_id == -1) { std::vector port_list = getPortNumbers(port); for (int idet = 0; idet < size(); ++idet) { @@ -1087,7 +1087,7 @@ void Detector::setClientZmqPort(int port, int module_id) { } else { pimpl->Parallel(&Module::setClientStreamingPort, {module_id}, port); } - if (previouslyClientStreaming != 0) { + if (previouslyClientStreaming) { pimpl->setDataStreamingToClient(false); pimpl->setDataStreamingToClient(true); } @@ -1098,14 +1098,33 @@ Result Detector::getClientZmqIp(Positions pos) const { } void Detector::setClientZmqIp(const IpAddr ip, Positions pos) { - int previouslyClientStreaming = pimpl->getDataStreamingToClient(); + bool previouslyClientStreaming = pimpl->getDataStreamingToClient(); pimpl->Parallel(&Module::setClientStreamingIP, pos, ip); - if (previouslyClientStreaming != 0) { + if (previouslyClientStreaming) { pimpl->setDataStreamingToClient(false); pimpl->setDataStreamingToClient(true); } } +int Detector::getClientZmqHwm() const { return pimpl->getClientStreamingHwm(); } + +void Detector::setClientZmqHwm(const int limit) { + pimpl->setClientStreamingHwm(limit); +} + +Result Detector::getRxZmqHwm(Positions pos) const { + return pimpl->Parallel(&Module::getReceiverStreamingHwm, pos); +} + +void Detector::setRxZmqHwm(const int limit) { + bool previouslyReceiverStreaming = getRxZmqDataStream().squash(false); + pimpl->Parallel(&Module::setReceiverStreamingHwm, {}, limit); + if (previouslyReceiverStreaming) { + setRxZmqDataStream(false, {}); + setRxZmqDataStream(true, {}); + } +} + // Eiger Specific Result Detector::getSubExptime(Positions pos) const { diff --git a/slsDetectorSoftware/src/DetectorImpl.cpp b/slsDetectorSoftware/src/DetectorImpl.cpp index 6757b32e4..eb29524b6 100644 --- a/slsDetectorSoftware/src/DetectorImpl.cpp +++ b/slsDetectorSoftware/src/DetectorImpl.cpp @@ -158,6 +158,8 @@ void DetectorImpl::initializeDetectorStructure() { multi_shm()->acquiringFlag = false; multi_shm()->initialChecks = true; multi_shm()->gapPixels = false; + // zmqlib default + multi_shm()->zmqHwm = -1; } void DetectorImpl::initializeMembers(bool verify) { @@ -377,16 +379,17 @@ void DetectorImpl::setGapPixelsinCallback(const bool enable) { multi_shm()->gapPixels = enable; } -int DetectorImpl::createReceivingDataSockets(const bool destroy) { - if (destroy) { - LOG(logINFO) << "Going to destroy data sockets"; - // close socket - zmqSocket.clear(); +int DetectorImpl::destroyReceivingDataSockets() { + LOG(logINFO) << "Going to destroy data sockets"; + // close socket + zmqSocket.clear(); - client_downstream = false; - LOG(logINFO) << "Destroyed Receiving Data Socket(s)"; - return OK; - } + client_downstream = false; + LOG(logINFO) << "Destroyed Receiving Data Socket(s)"; + return OK; +} + +int DetectorImpl::createReceivingDataSockets() { if (client_downstream) { return OK; } @@ -417,11 +420,21 @@ int DetectorImpl::createReceivingDataSockets(const bool destroy) { .str() .c_str(), portnum)); + // set high water mark + int hwm = multi_shm()->zmqHwm; + if (hwm >= 0) { + zmqSocket[iSocket]->SetReceiveHighWaterMark(hwm); + if (zmqSocket[iSocket]->GetReceiveHighWaterMark() != hwm) { + throw sls::ZmqSocketError("Could not set zmq rcv hwm to " + + std::to_string(hwm)); + } + } LOG(logINFO) << "Zmq Client[" << iSocket << "] at " - << zmqSocket.back()->GetZmqServerAddress(); + << zmqSocket.back()->GetZmqServerAddress() << "[hwm: " + << zmqSocket.back()->GetReceiveHighWaterMark() << "]"; } catch (...) { LOG(logERROR) << "Could not create Zmq socket on port " << portnum; - createReceivingDataSockets(true); + destroyReceivingDataSockets(); return FAIL; } } @@ -449,12 +462,12 @@ void DetectorImpl::readFrameFromReceiver() { } std::vector runningList(zmqSocket.size()); std::vector connectList(zmqSocket.size()); - int numRunning = 0; + numZmqRunning = 0; for (size_t i = 0; i < zmqSocket.size(); ++i) { if (zmqSocket[i]->Connect() == 0) { connectList[i] = true; runningList[i] = true; - ++numRunning; + ++numZmqRunning; } else { // to remember the list it connected to, to disconnect later connectList[i] = false; @@ -480,14 +493,14 @@ void DetectorImpl::readFrameFromReceiver() { uint32_t currentSubFrameIndex = -1, coordX = -1, coordY = -1, flippedDataX = -1; - while (numRunning != 0) { + while (numZmqRunning != 0) { // reset data data = false; if (multiframe != nullptr) { memset(multiframe.get(), 0xFF, multisize); } - completeImage = (numRunning == (int)zmqSocket.size()); + completeImage = (numZmqRunning == (int)zmqSocket.size()); // get each frame for (unsigned int isocket = 0; isocket < zmqSocket.size(); ++isocket) { @@ -505,7 +518,7 @@ void DetectorImpl::readFrameFromReceiver() { // socket runningList[isocket] = false; completeImage = false; - --numRunning; + --numZmqRunning; continue; } @@ -967,7 +980,7 @@ bool DetectorImpl::getDataStreamingToClient() { return client_downstream; } void DetectorImpl::setDataStreamingToClient(bool enable) { // destroy data threads if (!enable) { - createReceivingDataSockets(true); + destroyReceivingDataSockets(); // create data threads } else { if (createReceivingDataSockets() == FAIL) { @@ -976,6 +989,51 @@ void DetectorImpl::setDataStreamingToClient(bool enable) { } } +int DetectorImpl::getClientStreamingHwm() const { + // disabled + if (!client_downstream) { + return multi_shm()->zmqHwm; + } + // enabled + sls::Result result; + result.reserve(zmqSocket.size()); + for (auto &it : zmqSocket) { + result.push_back(it->GetReceiveHighWaterMark()); + } + int res = result.tsquash("Inconsistent zmq receive hwm values"); + return res; +} + +void DetectorImpl::setClientStreamingHwm(const int limit) { + if (limit < -1) { + throw sls::RuntimeError( + "Cannot set hwm to less than -1 (-1 is lib default)."); + } + // update shm + multi_shm()->zmqHwm = limit; + + // streaming enabled + if (client_downstream) { + // custom limit, set it directly + if (limit >= 0) { + for (auto &it : zmqSocket) { + it->SetReceiveHighWaterMark(limit); + if (it->GetReceiveHighWaterMark() != limit) { + multi_shm()->zmqHwm = -1; + throw sls::ZmqSocketError("Could not set zmq rcv hwm to " + + std::to_string(limit)); + } + } + LOG(logINFO) << "Setting Client Zmq socket rcv hwm to " << limit; + } + // default, disable and enable to get default + else { + setDataStreamingToClient(false); + setDataStreamingToClient(true); + } + } +} + void DetectorImpl::registerAcquisitionFinishedCallback(void (*func)(double, int, void *), void *pArg) { @@ -1042,6 +1100,12 @@ int DetectorImpl::acquire() { if (dataReady == nullptr) { setJoinThreadFlag(true); } + if (receiver) { + while (numZmqRunning != 0) { + Parallel(&Module::restreamStopFromReceiver, {}); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + } dataProcessingThread.join(); if (acquisition_finished != nullptr) { diff --git a/slsDetectorSoftware/src/DetectorImpl.h b/slsDetectorSoftware/src/DetectorImpl.h index a0f7144af..1e4ae1865 100644 --- a/slsDetectorSoftware/src/DetectorImpl.h +++ b/slsDetectorSoftware/src/DetectorImpl.h @@ -16,7 +16,7 @@ class detectorData; #include #define MULTI_SHMAPIVERSION 0x190809 -#define MULTI_SHMVERSION 0x200319 +#define MULTI_SHMVERSION 0x201007 #define SHORT_STRING_LENGTH 50 #include @@ -62,6 +62,8 @@ struct sharedMultiSlsDetector { bool acquiringFlag; bool initialChecks; bool gapPixels; + /** high water mark of listening tcp port (only data) */ + int zmqHwm; }; class DetectorImpl : public virtual slsDetectorDefs { @@ -240,6 +242,8 @@ class DetectorImpl : public virtual slsDetectorDefs { bool getDataStreamingToClient(); void setDataStreamingToClient(bool enable); + int getClientStreamingHwm() const; + void setClientStreamingHwm(const int limit); /** * register callback for accessing acquisition final data @@ -324,12 +328,8 @@ class DetectorImpl : public virtual slsDetectorDefs { void updateDetectorSize(); - /** - * Create Receiving Data Sockets - * @param destroy is true to destroy all the sockets - * @returns OK or FAIL - */ - int createReceivingDataSockets(const bool destroy = false); + int destroyReceivingDataSockets(); + int createReceivingDataSockets(); /** * Reads frames from receiver through a constant socket @@ -387,6 +387,9 @@ class DetectorImpl : public virtual slsDetectorDefs { /** ZMQ Socket - Receiver to Client */ std::vector> zmqSocket; + /** number of zmq sockets running currently */ + volatile int numZmqRunning{0}; + /** mutex to synchronize main and data processing threads */ mutable std::mutex mp; diff --git a/slsDetectorSoftware/src/Module.cpp b/slsDetectorSoftware/src/Module.cpp index 04c8835a3..1cce4a841 100644 --- a/slsDetectorSoftware/src/Module.cpp +++ b/slsDetectorSoftware/src/Module.cpp @@ -431,6 +431,10 @@ void Module::stopAcquisition() { } } +void Module::restreamStopFromReceiver() { + sendToReceiver(F_RESTREAM_STOP_FROM_RECEIVER); +} + void Module::startAndReadAll() { shm()->stoppedFlag = false; sendToDetector(F_START_AND_READ_ALL); @@ -1041,6 +1045,14 @@ void Module::setClientStreamingIP(const sls::IpAddr ip) { shm()->zmqip = ip; } +int Module::getReceiverStreamingHwm() const { + return sendToReceiver(F_GET_RECEIVER_STREAMING_HWM); +} + +void Module::setReceiverStreamingHwm(const int limit) { + sendToReceiver(F_SET_RECEIVER_STREAMING_HWM, limit, nullptr); +} + // Eiger Specific int64_t Module::getSubExptime() const { @@ -2781,10 +2793,6 @@ void Module::checkReceiverVersionCompatibility() { sendToReceiver(F_RECEIVER_CHECK_VERSION, int64_t(APIRECEIVER), nullptr); } -void Module::restreamStopFromReceiver() { - sendToReceiver(F_RESTREAM_STOP_FROM_RECEIVER); -} - int Module::sendModule(sls_detector_module *myMod, sls::ClientSocket &client) { constexpr TLogLevel level = logDEBUG1; LOG(level) << "Sending Module"; diff --git a/slsDetectorSoftware/src/Module.h b/slsDetectorSoftware/src/Module.h index c045f2b45..2d8ae07af 100644 --- a/slsDetectorSoftware/src/Module.h +++ b/slsDetectorSoftware/src/Module.h @@ -161,6 +161,7 @@ class Module : public virtual slsDetectorDefs { void stopReceiver(); void startAcquisition(); void stopAcquisition(); + void restreamStopFromReceiver(); void startAndReadAll(); runStatus getRunStatus() const; runStatus getReceiverStatus() const; @@ -290,6 +291,8 @@ class Module : public virtual slsDetectorDefs { void setClientStreamingPort(int port); sls::IpAddr getClientStreamingIP() const; void setClientStreamingIP(const sls::IpAddr ip); + int getReceiverStreamingHwm() const; + void setReceiverStreamingHwm(const int limit); /************************************************** * * @@ -665,7 +668,6 @@ class Module : public virtual slsDetectorDefs { void checkDetectorVersionCompatibility(); void checkReceiverVersionCompatibility(); - void restreamStopFromReceiver(); void setModule(sls_detector_module &module, bool trimbits = true); int sendModule(sls_detector_module *myMod, sls::ClientSocket &client); void updateReceiverStreamingIP(); diff --git a/slsReceiverSoftware/src/ClientInterface.cpp b/slsReceiverSoftware/src/ClientInterface.cpp index 756dd3968..71b5ab4d0 100644 --- a/slsReceiverSoftware/src/ClientInterface.cpp +++ b/slsReceiverSoftware/src/ClientInterface.cpp @@ -206,6 +206,8 @@ int ClientInterface::functionTable(){ flist[F_SET_RECEIVER_RATE_CORRECT] = &ClientInterface::set_rate_correct; flist[F_SET_RECEIVER_SCAN] = &ClientInterface::set_scan; flist[F_RECEIVER_SET_THRESHOLD] = &ClientInterface::set_threshold; + flist[F_GET_RECEIVER_STREAMING_HWM] = &ClientInterface::get_streaming_hwm; + flist[F_SET_RECEIVER_STREAMING_HWM] = &ClientInterface::set_streaming_hwm; for (int i = NUM_DET_FUNCTIONS + 1; i < NUM_REC_FUNCTIONS ; i++) { LOG(logDEBUG1) << "function fnum: " << i << " (" << @@ -1657,4 +1659,21 @@ int ClientInterface::set_threshold(Interface &socket) { verifyIdle(socket); impl()->setThresholdEnergy(arg); return socket.Send(OK); -} \ No newline at end of file +} + +int ClientInterface::get_streaming_hwm(Interface &socket) { + int retval = impl()->getStreamingHwm(); + LOG(logDEBUG1) << "zmq send hwm limit:" << retval; + return socket.sendResult(retval); +} + +int ClientInterface::set_streaming_hwm(Interface &socket) { + auto limit = socket.Receive(); + if (limit < -1) { + throw RuntimeError("Invalid zmq send hwm limit " + + std::to_string(limit)); + } + verifyIdle(socket); + impl()->setStreamingHwm(limit); + return socket.Send(OK); +} diff --git a/slsReceiverSoftware/src/ClientInterface.h b/slsReceiverSoftware/src/ClientInterface.h index b4acc1239..391ff0a16 100644 --- a/slsReceiverSoftware/src/ClientInterface.h +++ b/slsReceiverSoftware/src/ClientInterface.h @@ -159,6 +159,8 @@ class ClientInterface : private virtual slsDetectorDefs { int set_rate_correct(sls::ServerInterface &socket); int set_scan(sls::ServerInterface &socket); int set_threshold(sls::ServerInterface &socket); + int get_streaming_hwm(sls::ServerInterface &socket); + int set_streaming_hwm(sls::ServerInterface &socket); Implementation *impl() { if (receiver != nullptr) { diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index 74d4e37a6..2549cabfc 100644 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -75,18 +75,28 @@ void DataStreamer::SetAdditionalJsonHeader( } void DataStreamer::CreateZmqSockets(int *nunits, uint32_t port, - const sls::IpAddr ip) { + const sls::IpAddr ip, int hwm) { uint32_t portnum = port + index; std::string sip = ip.str(); try { zmqSocket = new ZmqSocket(portnum, (ip != 0 ? sip.c_str() : nullptr)); + // set if custom + if (hwm >= 0) { + zmqSocket->SetSendHighWaterMark(hwm); + if (zmqSocket->GetSendHighWaterMark() != hwm) { + throw sls::RuntimeError( + "Could not set zmq send high water mark to " + + std::to_string(hwm)); + } + } } catch (...) { LOG(logERROR) << "Could not create Zmq socket on port " << portnum << " for Streamer " << index; throw; } LOG(logINFO) << index << " Streamer: Zmq Server started at " - << zmqSocket->GetZmqServerAddress(); + << zmqSocket->GetZmqServerAddress() + << "[hwm: " << zmqSocket->GetSendHighWaterMark() << "]"; } void DataStreamer::CloseZmqSocket() { diff --git a/slsReceiverSoftware/src/DataStreamer.h b/slsReceiverSoftware/src/DataStreamer.h index be4908a6f..f04bfc4ac 100644 --- a/slsReceiverSoftware/src/DataStreamer.h +++ b/slsReceiverSoftware/src/DataStreamer.h @@ -86,8 +86,10 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { * @param nunits pointer to number of theads/ units per detector * @param port streaming port start index * @param ip streaming source ip + * @param hwm streaming high water mark */ - void CreateZmqSockets(int *nunits, uint32_t port, const sls::IpAddr ip); + void CreateZmqSockets(int *nunits, uint32_t port, const sls::IpAddr ip, + int hwm); /** * Shuts down and deletes Zmq Sockets diff --git a/slsReceiverSoftware/src/Implementation.cpp b/slsReceiverSoftware/src/Implementation.cpp index 0a9b672c2..4ebdd68f0 100644 --- a/slsReceiverSoftware/src/Implementation.cpp +++ b/slsReceiverSoftware/src/Implementation.cpp @@ -876,7 +876,8 @@ void Implementation::setNumberofUDPInterfaces(const int n) { (int *)nd, &quadEnable, &numberOfTotalFrames)); dataStreamer[i]->SetGeneralData(generalData); dataStreamer[i]->CreateZmqSockets( - &numThreads, streamingPort, streamingSrcIP); + &numThreads, streamingPort, streamingSrcIP, + streamingHwm); dataStreamer[i]->SetAdditionalJsonHeader( additionalJsonHeader); @@ -1004,7 +1005,8 @@ void Implementation::setDataStreamEnable(const bool enable) { (int *)nd, &quadEnable, &numberOfTotalFrames)); dataStreamer[i]->SetGeneralData(generalData); dataStreamer[i]->CreateZmqSockets( - &numThreads, streamingPort, streamingSrcIP); + &numThreads, streamingPort, streamingSrcIP, + streamingHwm); dataStreamer[i]->SetAdditionalJsonHeader( additionalJsonHeader); } catch (...) { @@ -1063,6 +1065,14 @@ void Implementation::setStreamingSourceIP(const sls::IpAddr ip) { LOG(logINFO) << "Streaming Source IP: " << streamingSrcIP; } +int Implementation::getStreamingHwm() const { return streamingHwm; } + +void Implementation::setStreamingHwm(const int i) { + streamingHwm = i; + LOG(logINFO) << "Streaming Hwm: " + << (i == -1 ? "Default (-1)" : std::to_string(streamingHwm)); +} + std::map Implementation::getAdditionalJsonHeader() const { return additionalJsonHeader; diff --git a/slsReceiverSoftware/src/Implementation.h b/slsReceiverSoftware/src/Implementation.h index 5563afa1e..4dd47c98d 100644 --- a/slsReceiverSoftware/src/Implementation.h +++ b/slsReceiverSoftware/src/Implementation.h @@ -130,6 +130,8 @@ class Implementation : private virtual slsDetectorDefs { void setStreamingPort(const uint32_t i); sls::IpAddr getStreamingSourceIP() const; void setStreamingSourceIP(const sls::IpAddr ip); + int getStreamingHwm() const; + void setStreamingHwm(const int i); std::map getAdditionalJsonHeader() const; void setAdditionalJsonHeader(const std::map &c); std::string getAdditionalJsonParameter(const std::string &key) const; @@ -312,6 +314,7 @@ class Implementation : private virtual slsDetectorDefs { uint32_t streamingStartFnum{0}; uint32_t streamingPort{0}; sls::IpAddr streamingSrcIP = sls::IpAddr{}; + int streamingHwm{-1}; std::map additionalJsonHeader; // detector parameters diff --git a/slsSupportLib/include/ZmqSocket.h b/slsSupportLib/include/ZmqSocket.h index ae561afe9..ab9000bb4 100644 --- a/slsSupportLib/include/ZmqSocket.h +++ b/slsSupportLib/include/ZmqSocket.h @@ -93,6 +93,18 @@ class ZmqSocket { */ ZmqSocket(const uint32_t portnumber, const char *ethip); + /** Returns high water mark for outbound messages */ + int GetSendHighWaterMark(); + + /** Sets high water mark for outbound messages. Default 1000 (zmqlib) */ + void SetSendHighWaterMark(int limit); + + /** Returns high water mark for inbound messages */ + int GetReceiveHighWaterMark(); + + /** Sets high water mark for inbound messages. Default 1000 (zmqlib) */ + void SetReceiveHighWaterMark(int limit); + /** * Returns Port Number * @returns Port Number diff --git a/slsSupportLib/include/sls_detector_funcs.h b/slsSupportLib/include/sls_detector_funcs.h index 8ba7fa39e..3d38dccf1 100755 --- a/slsSupportLib/include/sls_detector_funcs.h +++ b/slsSupportLib/include/sls_detector_funcs.h @@ -315,6 +315,8 @@ enum detFuncs { F_SET_RECEIVER_RATE_CORRECT, F_SET_RECEIVER_SCAN, F_RECEIVER_SET_THRESHOLD, + F_GET_RECEIVER_STREAMING_HWM, + F_SET_RECEIVER_STREAMING_HWM, NUM_REC_FUNCTIONS }; @@ -631,6 +633,9 @@ const char* getFunctionNameFromEnum(enum detFuncs func) { case F_SET_RECEIVER_RATE_CORRECT: return "F_SET_RECEIVER_RATE_CORRECT"; case F_SET_RECEIVER_SCAN: return "F_SET_RECEIVER_SCAN"; case F_RECEIVER_SET_THRESHOLD: return "F_RECEIVER_SET_THRESHOLD"; + case F_GET_RECEIVER_STREAMING_HWM: return "F_GET_RECEIVER_STREAMING_HWM"; + case F_SET_RECEIVER_STREAMING_HWM: return "F_SET_RECEIVER_STREAMING_HWM"; + case NUM_REC_FUNCTIONS: return "NUM_REC_FUNCTIONS"; default: return "Unknown Function"; diff --git a/slsSupportLib/src/ZmqSocket.cpp b/slsSupportLib/src/ZmqSocket.cpp index 54b6970a4..0a5d97217 100644 --- a/slsSupportLib/src/ZmqSocket.cpp +++ b/slsSupportLib/src/ZmqSocket.cpp @@ -48,6 +48,8 @@ ZmqSocket::ZmqSocket(const char *const hostname_or_ip, PrintError(); throw sls::ZmqSocketError("Could not set ZMQ_LINGER"); } + LOG(logDEBUG) << "Default receive high water mark:" + << GetReceiveHighWaterMark(); } ZmqSocket::ZmqSocket(const uint32_t portnumber, const char *ethip) @@ -63,6 +65,7 @@ ZmqSocket::ZmqSocket(const uint32_t portnumber, const char *ethip) PrintError(); throw sls::ZmqSocketError("Could not create socket"); } + LOG(logDEBUG) << "Default send high water mark:" << GetSendHighWaterMark(); // construct address, can be refactored with libfmt std::ostringstream oss; @@ -79,6 +82,44 @@ ZmqSocket::ZmqSocket(const uint32_t portnumber, const char *ethip) std::this_thread::sleep_for(std::chrono::milliseconds(200)); }; +int ZmqSocket::GetSendHighWaterMark() { + int value = 0; + size_t value_size = sizeof(value); + if (zmq_getsockopt(sockfd.socketDescriptor, ZMQ_SNDHWM, &value, + &value_size)) { + PrintError(); + throw sls::ZmqSocketError("Could not get ZMQ_SNDHWM"); + } + return value; +} + +void ZmqSocket::SetSendHighWaterMark(int limit) { + if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SNDHWM, &limit, + sizeof(limit))) { + PrintError(); + throw sls::ZmqSocketError("Could not set ZMQ_SNDHWM"); + } +} + +int ZmqSocket::GetReceiveHighWaterMark() { + int value = 0; + size_t value_size = sizeof(value); + if (zmq_getsockopt(sockfd.socketDescriptor, ZMQ_RCVHWM, &value, + &value_size)) { + PrintError(); + throw sls::ZmqSocketError("Could not get ZMQ_SNDHWM"); + } + return value; +} + +void ZmqSocket::SetReceiveHighWaterMark(int limit) { + if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_RCVHWM, &limit, + sizeof(limit))) { + PrintError(); + throw sls::ZmqSocketError("Could not set ZMQ_SNDHWM"); + } +} + int ZmqSocket::Connect() { if (zmq_connect(sockfd.socketDescriptor, sockfd.serverAddress.c_str())) { PrintError();