diff --git a/slsDetectorGui/forms/form_tab_plot.ui b/slsDetectorGui/forms/form_tab_plot.ui
index c96357286..2e6d56ebf 100755
--- a/slsDetectorGui/forms/form_tab_plot.ui
+++ b/slsDetectorGui/forms/form_tab_plot.ui
@@ -75,7 +75,7 @@
6
- -
+
-
true
@@ -1388,7 +1388,7 @@ Displays minimum, maximum and sum of values for each plot.
- -
+
-
@@ -1789,7 +1789,7 @@ Displays minimum, maximum and sum of values for each plot.
- -
+
-
true
@@ -1990,23 +1990,7 @@ Displays minimum, maximum and sum of values for each plot.
- -
-
-
- Qt::Horizontal
-
-
- QSizePolicy::Fixed
-
-
-
- 10
- 20
-
-
-
-
- -
+
-
@@ -2016,7 +2000,7 @@ Displays minimum, maximum and sum of values for each plot.
- 0
+ 350
65
@@ -2024,7 +2008,7 @@ Displays minimum, maximum and sum of values for each plot.
<html><head/><body><p>Sets the pace at which the receiver streams out images. Images in between the timeout or frequency are not sent out.<br/><br/><span style=" font-weight:600;">Time Interval</span>: Streaming time interval when an image should be streamed. <br/><span style=" font-weight:600;">Every nth Image</span>: Only every nth image is streamed. </p><p><br/></p><p><br/></p></body></html>
- Receiver Streaming Frequency / Timer
+ Receiver Streaming
Qt::AlignLeading|Qt::AlignLeft|Qt::AlignVCenter
@@ -2052,6 +2036,33 @@ Displays minimum, maximum and sum of values for each plot.
0
-
+
+
+ true
+
+
+
+ 0
+ 0
+
+
+
+
+ 80
+ 16777215
+
+
+
+ <nobr>
+Displays minimum, maximum and sum of values for each plot.
+<nobr>
+
+
+ No Plot
+
+
+
+ -
@@ -2061,13 +2072,13 @@ Displays minimum, maximum and sum of values for each plot.
- 150
+ 135
0
- 16777215
+ 135
16777215
@@ -2150,29 +2161,25 @@ Displays minimum, maximum and sum of values for each plot.
- -
-
-
- Qt::Horizontal
-
-
- QSizePolicy::Expanding
-
-
+
-
+
+
- 80
- 20
+ 150
+ 16777215
-
-
- -
-
- 1
+ 0
+
+ 2
+
+
+ 2
+
-
@@ -2181,11 +2188,17 @@ Displays minimum, maximum and sum of values for each plot.
0
+
+
+ 90
+ 30
+
+
false
- 3
+ 2
999999.000000000000000
@@ -2238,6 +2251,9 @@ Displays minimum, maximum and sum of values for each plot.
+
+ 2
+
-
@@ -2267,78 +2283,8 @@ Displays minimum, maximum and sum of values for each plot.
-
-
-
- -
-
-
- true
-
-
-
- 0
- 0
-
-
-
-
- 350
- 65
-
-
-
-
- 16777215
- 16777215
-
-
-
- Plot Type
-
-
- Qt::AlignLeading|Qt::AlignLeft|Qt::AlignVCenter
-
-
- false
-
-
-
- 6
-
-
- 0
-
-
- 6
-
-
- 0
-
-
- 2
-
-
- 0
-
-
-
-
-
-
- 0
- 0
-
-
-
- Data Graph
-
-
- true
-
-
-
- -
-
+
-
+
Qt::Horizontal
@@ -2347,22 +2293,159 @@ Displays minimum, maximum and sum of values for each plot.
- 10
+ 30
20
- -
-
+
-
+
+
+ Snd Hwm:
+
+
+
+ -
+
+
+ Qt::Horizontal
+
+
+ QSizePolicy::Expanding
+
+
+
+ 30
+ 20
+
+
+
+
+ -
+
+
+ Qt::Horizontal
+
+
+ QSizePolicy::Expanding
+
+
+
+ 30
+ 20
+
+
+
+
+ -
+
-
+
0
0
+
+
+ 0
+ 25
+
+
+
+
+ 16777215
+ 30
+
+
+
+ <html><head/><body><p>High water mark for inbound messages for gui.</p><p> #zmqhwm#</p></body></html>
+
+
+
+
+
+
+
+
+ Qt::AlignLeading|Qt::AlignLeft|Qt::AlignVCenter
+
+
+ false
+
+
+
+
+
+
+
+
+ -1
+
+
+ 99999
+
+
+ 1000
+
+
+
+ -
+
+
+
+ 0
+ 0
+
+
+
+
+ 0
+ 25
+
+
+
+
+ 16777215
+ 30
+
+
+
+ <html><head/><body><p>High water mark for outbound messages for receiver.</p><p>#rx_zmqhwm#</p></body></html>
+
+
+
+
+
+
+
+
+ Qt::AlignLeading|Qt::AlignLeft|Qt::AlignVCenter
+
+
+ false
+
+
+
+
+
+
+
+
+ -1
+
+
+ 99999
+
+
+ 1000
+
+
+
+ -
+
- No Plot
+ Rcv Hwm:
@@ -2370,7 +2453,6 @@ Displays minimum, maximum and sum of values for each plot.
- boxPlotType
box1D
boxFrequency
box2D
@@ -2378,8 +2460,6 @@ Displays minimum, maximum and sum of values for each plot.
boxSave
- radioNoPlot
- radioDataGraph
comboFrequency
spinTimeGap
comboTimeGapUnit
diff --git a/slsDetectorGui/include/qDefs.h b/slsDetectorGui/include/qDefs.h
index 20a468b8b..fd053c248 100644
--- a/slsDetectorGui/include/qDefs.h
+++ b/slsDetectorGui/include/qDefs.h
@@ -40,6 +40,7 @@ class qDefs : public QWidget {
static const int Q_FONT_SIZE = 9;
static const int DATA_GAIN_PLOT_RATIO = 5;
static const int MIN_HEIGHT_GAIN_PLOT_1D = 75;
+ static const int GUI_ZMQ_RCV_HWM = 2;
static void DisplayExceptions(std::string emsg, std::string src) {
try {
diff --git a/slsDetectorGui/include/qTabPlot.h b/slsDetectorGui/include/qTabPlot.h
index 5bca30acd..c87d173a5 100644
--- a/slsDetectorGui/include/qTabPlot.h
+++ b/slsDetectorGui/include/qTabPlot.h
@@ -3,7 +3,6 @@
#include "ui_form_tab_plot.h"
class qDrawPlot;
-class QButtonGroup;
class qTabPlot : public QWidget, private Ui::TabPlotObject {
Q_OBJECT
@@ -35,6 +34,8 @@ class qTabPlot : public QWidget, private Ui::TabPlotObject {
void CheckAspectRatio();
void SetZRange();
void SetStreamingFrequency();
+ void SetStreamingHwm(int value);
+ void SetReceivingHwm(int value);
signals:
void DisableZoomSignal(bool);
@@ -45,6 +46,8 @@ class qTabPlot : public QWidget, private Ui::TabPlotObject {
void Select1DPlot(bool enable);
void GetGapPixels();
void GetStreamingFrequency();
+ void GetStreamingHwm();
+ void GetReceivingHwm();
void SetXYRange();
void MaintainAspectRatio(int dimension);
@@ -52,8 +55,6 @@ class qTabPlot : public QWidget, private Ui::TabPlotObject {
qDrawPlot *plot;
bool is1d;
- QButtonGroup *btnGroupPlotType{nullptr};
-
/** default plot and axis titles */
static QString defaultPlotTitle;
static QString defaultHistXAxisTitle;
diff --git a/slsDetectorGui/src/qTabPlot.cpp b/slsDetectorGui/src/qTabPlot.cpp
index 52190f282..1f1e77a4e 100644
--- a/slsDetectorGui/src/qTabPlot.cpp
+++ b/slsDetectorGui/src/qTabPlot.cpp
@@ -1,8 +1,6 @@
#include "qTabPlot.h"
#include "qDefs.h"
#include "qDrawPlot.h"
-#include
-#include
#include
#include
@@ -20,14 +18,9 @@ qTabPlot::qTabPlot(QWidget *parent, sls::Detector *detector, qDrawPlot *p)
LOG(logDEBUG) << "Plot ready";
}
-qTabPlot::~qTabPlot() { delete btnGroupPlotType; }
+qTabPlot::~qTabPlot() {}
void qTabPlot::SetupWidgetWindow() {
- // button group for plot type
- btnGroupPlotType = new QButtonGroup(this);
- btnGroupPlotType->addButton(radioNoPlot, 0);
- btnGroupPlotType->addButton(radioDataGraph, 1);
-
// 1D and 2D options
stackedWidget1D->setCurrentIndex(0);
stackedWidget2D->setCurrentIndex(0);
@@ -76,19 +69,18 @@ void qTabPlot::SetupWidgetWindow() {
Initialization();
Refresh();
- // set default timer
- spinTimeGap->setValue(DEFAULT_STREAMING_TIMER_IN_MS);
- // set to streaming timer
- stackedTimeInterval->setCurrentIndex(0);
- comboFrequency->setCurrentIndex(0);
+ // set zmq high water mark to GUI_ZMQ_RCV_HWM (2)
+ spinSndHwm->setValue(qDefs::GUI_ZMQ_RCV_HWM);
+ spinRcvHwm->setValue(qDefs::GUI_ZMQ_RCV_HWM);
}
void qTabPlot::Initialization() {
- // Plot arguments box
- connect(btnGroupPlotType, SIGNAL(buttonClicked(int)), this,
- SLOT(SetPlot()));
-
// Plotting frequency box
+ connect(chkNoPlot, SIGNAL(toggled(bool)), this, SLOT(SetPlot()));
+ connect(spinSndHwm, SIGNAL(valueChanged(int)), this,
+ SLOT(SetStreamingHwm(int)));
+ connect(spinRcvHwm, SIGNAL(valueChanged(int)), this,
+ SLOT(SetReceivingHwm(int)));
connect(comboFrequency, SIGNAL(currentIndexChanged(int)), this,
SLOT(SetStreamingFrequency()));
connect(comboTimeGapUnit, SIGNAL(currentIndexChanged(int)), this,
@@ -214,13 +206,18 @@ void qTabPlot::Select1DPlot(bool enable) {
void qTabPlot::SetPlot() {
bool plotEnable = false;
- if (radioNoPlot->isChecked()) {
+ if (chkNoPlot->isChecked()) {
LOG(logINFO) << "Setting Plot Type: No Plot";
- } else if (radioDataGraph->isChecked()) {
+ } else {
LOG(logINFO) << "Setting Plot Type: Datagraph";
plotEnable = true;
}
- boxFrequency->setEnabled(plotEnable);
+ comboFrequency->setEnabled(plotEnable);
+ lblSndHwm->setEnabled(plotEnable);
+ spinSndHwm->setEnabled(plotEnable);
+ lblRcvHwm->setEnabled(plotEnable);
+ spinRcvHwm->setEnabled(plotEnable);
+ stackedTimeInterval->setEnabled(plotEnable);
box1D->setEnabled(plotEnable);
box2D->setEnabled(plotEnable);
boxSave->setEnabled(plotEnable);
@@ -705,17 +702,68 @@ void qTabPlot::SetStreamingFrequency() {
&qTabPlot::GetStreamingFrequency)
}
+void qTabPlot::GetStreamingHwm() {
+ LOG(logDEBUG) << "Getting Streaming Hwm for receiver";
+ disconnect(spinSndHwm, SIGNAL(valueChanged(int)), this,
+ SLOT(SetStreamingHwm(int)));
+ try {
+ int value = det->getRxZmqHwm().tsquash(
+ "Inconsistent streaming hwm for all receivers.");
+ LOG(logDEBUG) << "Got streaming hwm for receiver " << value;
+ spinSndHwm->setValue(value);
+ }
+ CATCH_DISPLAY("Could not get streaming hwm for receiver.",
+ "qTabPlot::GetStreamingHwm")
+ connect(spinSndHwm, SIGNAL(valueChanged(int)), this,
+ SLOT(SetStreamingHwm(int)));
+}
+
+void qTabPlot::SetStreamingHwm(int value) {
+ LOG(logINFO) << "Setting Streaming Hwm for receiver to " << value;
+ try {
+ det->setRxZmqHwm(value);
+ }
+ CATCH_HANDLE("Could not set streaming hwm for receiver.",
+ "qTabPlot::SetStreamingHwm", this, &qTabPlot::GetStreamingHwm)
+}
+
+void qTabPlot::GetReceivingHwm() {
+ LOG(logDEBUG) << "Getting Receiving Hwm for client";
+ try {
+ int value = det->getClientZmqHwm();
+ LOG(logDEBUG) << "Got receiving hwm for client " << value;
+ }
+ CATCH_DISPLAY("Could not get receiving hwm for client.",
+ "qTabPlot::GetReceivingHwm")
+}
+
+void qTabPlot::SetReceivingHwm(int value) {
+ LOG(logINFO) << "Setting Streaming Hwm to " << value;
+ try {
+ det->setClientZmqHwm(value);
+ }
+ CATCH_HANDLE("Could not set receiving hwm from client.",
+ "qTabPlot::SetReceivingHwm", this, &qTabPlot::GetReceivingHwm)
+}
+
void qTabPlot::Refresh() {
LOG(logDEBUG) << "**Updating Plot Tab";
if (!plot->GetIsRunning()) {
- boxPlotType->setEnabled(true);
+ boxFrequency->setEnabled(true);
// streaming frequency
- if (!radioNoPlot->isChecked()) {
- boxFrequency->setEnabled(true);
+ if (!chkNoPlot->isChecked()) {
+ comboFrequency->setEnabled(true);
+ stackedTimeInterval->setEnabled(true);
+ lblSndHwm->setEnabled(true);
+ spinSndHwm->setEnabled(true);
+ lblRcvHwm->setEnabled(true);
+ spinRcvHwm->setEnabled(true);
}
GetStreamingFrequency();
+ GetStreamingHwm();
+ GetReceivingHwm();
// gain plot, gap pixels enable
switch (det->getDetectorType().squash()) {
case slsDetectorDefs::EIGER:
@@ -734,7 +782,6 @@ void qTabPlot::Refresh() {
break;
}
} else {
- boxPlotType->setEnabled(false);
boxFrequency->setEnabled(false);
chkGainPlot->setEnabled(false);
chkGainPlot1D->setEnabled(false);
diff --git a/slsDetectorSoftware/include/Detector.h b/slsDetectorSoftware/include/Detector.h
index 89fc94b73..b91bc058e 100644
--- a/slsDetectorSoftware/include/Detector.h
+++ b/slsDetectorSoftware/include/Detector.h
@@ -887,6 +887,26 @@ class Detector {
* if enabled.
*/
void setClientZmqIp(const IpAddr ip, Positions pos = {});
+
+ int getClientZmqHwm() const;
+
+ /** Client's zmq receive high water mark. \n Default is the zmq library's
+ * default (1000), can also be set here using -1. \n This is a high number
+ * and can be set to 2 for gui purposes. \n One must also set the receiver's
+ * send high water mark to similar value. Final effect is sum of them.
+ */
+ void setClientZmqHwm(const int limit);
+
+ Result getRxZmqHwm(Positions pos = {}) const;
+
+ /** Receiver's zmq send high water mark. \n Default is the zmq library's
+ * default (1000) \n This is a high number and can be set to 2 for gui
+ * purposes. \n One must also set the client's receive high water mark to
+ * similar value. Final effect is sum of them. Also restarts receiver zmq
+ * streaming if enabled. \n Can set to -1 to set default.
+ */
+ void setRxZmqHwm(const int limit);
+
///@{
/** @name Eiger Specific */
diff --git a/slsDetectorSoftware/src/CmdProxy.cpp b/slsDetectorSoftware/src/CmdProxy.cpp
index d4fce7586..1cb2bb015 100644
--- a/slsDetectorSoftware/src/CmdProxy.cpp
+++ b/slsDetectorSoftware/src/CmdProxy.cpp
@@ -1257,7 +1257,40 @@ std::string CmdProxy::ReceiverHostname(int action) {
return os.str();
}
/* File */
+
/* ZMQ Streaming Parameters (Receiver<->Client) */
+
+std::string CmdProxy::ZMQHWM(int action) {
+ std::ostringstream os;
+ os << cmd << ' ';
+ if (action == defs::HELP_ACTION) {
+ os << "[n_limit] \n\tClient's zmq receive high water mark. Default is "
+ "the zmq library's default (1000), can also be set here using "
+ "-1. \n This is a high number and can be set to 2 for gui "
+ "purposes. \n One must also set the receiver's send high water "
+ "mark to similar value. Final effect is sum of them.\n\t Setting "
+ "it via command line is useful only before zmq enabled (before "
+ "opening gui)."
+ << '\n';
+ } else if (action == defs::GET_ACTION) {
+ if (!args.empty()) {
+ WrongNumberOfParameters(0);
+ }
+ auto t = det->getClientZmqHwm();
+ os << t << '\n';
+ } else if (action == defs::PUT_ACTION) {
+ if (args.size() != 1) {
+ WrongNumberOfParameters(1);
+ }
+ int t = StringTo(args[0]);
+ det->setClientZmqHwm(t);
+ os << det->getClientZmqHwm() << '\n';
+ } else {
+ throw sls::RuntimeError("Unknown action");
+ }
+ return os.str();
+}
+
/* Eiger Specific */
std::string CmdProxy::Threshold(int action) {
@@ -2681,7 +2714,7 @@ std::string CmdProxy::ExecuteCommand(int action) {
throw sls::RuntimeError("Cannot get.");
} else if (action == defs::PUT_ACTION) {
std::string command;
- for (auto &i: args) {
+ for (auto &i : args) {
command += (i + ' ');
}
auto t = det->executeCommand(command, std::vector{det_id});
diff --git a/slsDetectorSoftware/src/CmdProxy.h b/slsDetectorSoftware/src/CmdProxy.h
index 16782576c..97f1e636a 100644
--- a/slsDetectorSoftware/src/CmdProxy.h
+++ b/slsDetectorSoftware/src/CmdProxy.h
@@ -235,6 +235,36 @@
return os.str(); \
}
+/** int, set no id, get id */
+#define INTEGER_COMMAND_SET_NOID_GET_ID(CMDNAME, GETFCN, SETFCN, CONV, HLPSTR) \
+ std::string CMDNAME(const int action) { \
+ std::ostringstream os; \
+ os << cmd << ' '; \
+ if (action == slsDetectorDefs::HELP_ACTION) \
+ os << HLPSTR << '\n'; \
+ else if (action == slsDetectorDefs::GET_ACTION) { \
+ if (!args.empty()) { \
+ WrongNumberOfParameters(0); \
+ } \
+ auto t = det->GETFCN(std::vector{det_id}); \
+ os << OutString(t) << '\n'; \
+ } else if (action == slsDetectorDefs::PUT_ACTION) { \
+ if (det_id != -1) { \
+ throw sls::RuntimeError( \
+ "Cannot execute this at module level"); \
+ } \
+ if (args.size() != 1) { \
+ WrongNumberOfParameters(1); \
+ } \
+ auto val = CONV(args[0]); \
+ det->SETFCN(val); \
+ os << args.front() << '\n'; \
+ } else { \
+ throw sls::RuntimeError("Unknown action"); \
+ } \
+ return os.str(); \
+ }
+
/** int, no id */
#define INTEGER_COMMAND_NOID(CMDNAME, GETFCN, SETFCN, CONV, HLPSTR) \
std::string CMDNAME(const int action) { \
@@ -868,6 +898,8 @@ class CmdProxy {
{"zmqport", &CmdProxy::zmqport},
{"rx_zmqip", &CmdProxy::rx_zmqip},
{"zmqip", &CmdProxy::zmqip},
+ {"zmqhwm", &CmdProxy::ZMQHWM},
+ {"rx_zmqhwm", &CmdProxy::rx_zmqhwm},
/* Eiger Specific */
{"subexptime", &CmdProxy::subexptime},
@@ -1071,6 +1103,7 @@ class CmdProxy {
std::string ReceiverHostname(int action);
/* File */
/* ZMQ Streaming Parameters (Receiver<->Client) */
+ std::string ZMQHWM(int action);
/* Eiger Specific */
std::string Threshold(int action);
std::string ThresholdNoTb(int action);
@@ -1692,6 +1725,15 @@ class CmdProxy {
"an intermediate process between receiver and client(gui). Also "
"restarts client zmq streaming if enabled.");
+ INTEGER_COMMAND_SET_NOID_GET_ID(
+ rx_zmqhwm, getRxZmqHwm, setRxZmqHwm, StringTo,
+ "[n_value]\n\tReceiver's zmq send high water mark. Default is the zmq "
+ "library's default (1000). This is a high number and can be set to 2 "
+ "for gui purposes. One must also set the client's receive high water "
+ "mark to similar value. Final effect is sum of them. Also restarts "
+ "receiver zmq streaming if enabled. Can set to -1 to set default "
+ "value.");
+
/* Eiger Specific */
TIME_COMMAND(subexptime, getSubExptime, setSubExptime,
diff --git a/slsDetectorSoftware/src/Detector.cpp b/slsDetectorSoftware/src/Detector.cpp
index 6d0558ebe..296293cd4 100644
--- a/slsDetectorSoftware/src/Detector.cpp
+++ b/slsDetectorSoftware/src/Detector.cpp
@@ -643,7 +643,7 @@ Result Detector::getNumberofUDPInterfaces(Positions pos) const {
}
void Detector::setNumberofUDPInterfaces(int n, Positions pos) {
- int previouslyClientStreaming = pimpl->getDataStreamingToClient();
+ bool previouslyClientStreaming = pimpl->getDataStreamingToClient();
bool useReceiver = getUseReceiverFlag().squash(false);
bool previouslyReceiverStreaming = false;
if (useReceiver) {
@@ -656,7 +656,7 @@ void Detector::setNumberofUDPInterfaces(int n, Positions pos) {
setRxZmqPort(startingPort, -1);
}
// redo the zmq sockets if enabled
- if (previouslyClientStreaming != 0) {
+ if (previouslyClientStreaming) {
pimpl->setDataStreamingToClient(false);
pimpl->setDataStreamingToClient(true);
}
@@ -1077,7 +1077,7 @@ Result Detector::getClientZmqPort(Positions pos) const {
}
void Detector::setClientZmqPort(int port, int module_id) {
- int previouslyClientStreaming = pimpl->getDataStreamingToClient();
+ bool previouslyClientStreaming = pimpl->getDataStreamingToClient();
if (module_id == -1) {
std::vector port_list = getPortNumbers(port);
for (int idet = 0; idet < size(); ++idet) {
@@ -1087,7 +1087,7 @@ void Detector::setClientZmqPort(int port, int module_id) {
} else {
pimpl->Parallel(&Module::setClientStreamingPort, {module_id}, port);
}
- if (previouslyClientStreaming != 0) {
+ if (previouslyClientStreaming) {
pimpl->setDataStreamingToClient(false);
pimpl->setDataStreamingToClient(true);
}
@@ -1098,14 +1098,33 @@ Result Detector::getClientZmqIp(Positions pos) const {
}
void Detector::setClientZmqIp(const IpAddr ip, Positions pos) {
- int previouslyClientStreaming = pimpl->getDataStreamingToClient();
+ bool previouslyClientStreaming = pimpl->getDataStreamingToClient();
pimpl->Parallel(&Module::setClientStreamingIP, pos, ip);
- if (previouslyClientStreaming != 0) {
+ if (previouslyClientStreaming) {
pimpl->setDataStreamingToClient(false);
pimpl->setDataStreamingToClient(true);
}
}
+int Detector::getClientZmqHwm() const { return pimpl->getClientStreamingHwm(); }
+
+void Detector::setClientZmqHwm(const int limit) {
+ pimpl->setClientStreamingHwm(limit);
+}
+
+Result Detector::getRxZmqHwm(Positions pos) const {
+ return pimpl->Parallel(&Module::getReceiverStreamingHwm, pos);
+}
+
+void Detector::setRxZmqHwm(const int limit) {
+ bool previouslyReceiverStreaming = getRxZmqDataStream().squash(false);
+ pimpl->Parallel(&Module::setReceiverStreamingHwm, {}, limit);
+ if (previouslyReceiverStreaming) {
+ setRxZmqDataStream(false, {});
+ setRxZmqDataStream(true, {});
+ }
+}
+
// Eiger Specific
Result Detector::getSubExptime(Positions pos) const {
diff --git a/slsDetectorSoftware/src/DetectorImpl.cpp b/slsDetectorSoftware/src/DetectorImpl.cpp
index 6757b32e4..eb29524b6 100644
--- a/slsDetectorSoftware/src/DetectorImpl.cpp
+++ b/slsDetectorSoftware/src/DetectorImpl.cpp
@@ -158,6 +158,8 @@ void DetectorImpl::initializeDetectorStructure() {
multi_shm()->acquiringFlag = false;
multi_shm()->initialChecks = true;
multi_shm()->gapPixels = false;
+ // zmqlib default
+ multi_shm()->zmqHwm = -1;
}
void DetectorImpl::initializeMembers(bool verify) {
@@ -377,16 +379,17 @@ void DetectorImpl::setGapPixelsinCallback(const bool enable) {
multi_shm()->gapPixels = enable;
}
-int DetectorImpl::createReceivingDataSockets(const bool destroy) {
- if (destroy) {
- LOG(logINFO) << "Going to destroy data sockets";
- // close socket
- zmqSocket.clear();
+int DetectorImpl::destroyReceivingDataSockets() {
+ LOG(logINFO) << "Going to destroy data sockets";
+ // close socket
+ zmqSocket.clear();
- client_downstream = false;
- LOG(logINFO) << "Destroyed Receiving Data Socket(s)";
- return OK;
- }
+ client_downstream = false;
+ LOG(logINFO) << "Destroyed Receiving Data Socket(s)";
+ return OK;
+}
+
+int DetectorImpl::createReceivingDataSockets() {
if (client_downstream) {
return OK;
}
@@ -417,11 +420,21 @@ int DetectorImpl::createReceivingDataSockets(const bool destroy) {
.str()
.c_str(),
portnum));
+ // set high water mark
+ int hwm = multi_shm()->zmqHwm;
+ if (hwm >= 0) {
+ zmqSocket[iSocket]->SetReceiveHighWaterMark(hwm);
+ if (zmqSocket[iSocket]->GetReceiveHighWaterMark() != hwm) {
+ throw sls::ZmqSocketError("Could not set zmq rcv hwm to " +
+ std::to_string(hwm));
+ }
+ }
LOG(logINFO) << "Zmq Client[" << iSocket << "] at "
- << zmqSocket.back()->GetZmqServerAddress();
+ << zmqSocket.back()->GetZmqServerAddress() << "[hwm: "
+ << zmqSocket.back()->GetReceiveHighWaterMark() << "]";
} catch (...) {
LOG(logERROR) << "Could not create Zmq socket on port " << portnum;
- createReceivingDataSockets(true);
+ destroyReceivingDataSockets();
return FAIL;
}
}
@@ -449,12 +462,12 @@ void DetectorImpl::readFrameFromReceiver() {
}
std::vector runningList(zmqSocket.size());
std::vector connectList(zmqSocket.size());
- int numRunning = 0;
+ numZmqRunning = 0;
for (size_t i = 0; i < zmqSocket.size(); ++i) {
if (zmqSocket[i]->Connect() == 0) {
connectList[i] = true;
runningList[i] = true;
- ++numRunning;
+ ++numZmqRunning;
} else {
// to remember the list it connected to, to disconnect later
connectList[i] = false;
@@ -480,14 +493,14 @@ void DetectorImpl::readFrameFromReceiver() {
uint32_t currentSubFrameIndex = -1, coordX = -1, coordY = -1,
flippedDataX = -1;
- while (numRunning != 0) {
+ while (numZmqRunning != 0) {
// reset data
data = false;
if (multiframe != nullptr) {
memset(multiframe.get(), 0xFF, multisize);
}
- completeImage = (numRunning == (int)zmqSocket.size());
+ completeImage = (numZmqRunning == (int)zmqSocket.size());
// get each frame
for (unsigned int isocket = 0; isocket < zmqSocket.size(); ++isocket) {
@@ -505,7 +518,7 @@ void DetectorImpl::readFrameFromReceiver() {
// socket
runningList[isocket] = false;
completeImage = false;
- --numRunning;
+ --numZmqRunning;
continue;
}
@@ -967,7 +980,7 @@ bool DetectorImpl::getDataStreamingToClient() { return client_downstream; }
void DetectorImpl::setDataStreamingToClient(bool enable) {
// destroy data threads
if (!enable) {
- createReceivingDataSockets(true);
+ destroyReceivingDataSockets();
// create data threads
} else {
if (createReceivingDataSockets() == FAIL) {
@@ -976,6 +989,51 @@ void DetectorImpl::setDataStreamingToClient(bool enable) {
}
}
+int DetectorImpl::getClientStreamingHwm() const {
+ // disabled
+ if (!client_downstream) {
+ return multi_shm()->zmqHwm;
+ }
+ // enabled
+ sls::Result result;
+ result.reserve(zmqSocket.size());
+ for (auto &it : zmqSocket) {
+ result.push_back(it->GetReceiveHighWaterMark());
+ }
+ int res = result.tsquash("Inconsistent zmq receive hwm values");
+ return res;
+}
+
+void DetectorImpl::setClientStreamingHwm(const int limit) {
+ if (limit < -1) {
+ throw sls::RuntimeError(
+ "Cannot set hwm to less than -1 (-1 is lib default).");
+ }
+ // update shm
+ multi_shm()->zmqHwm = limit;
+
+ // streaming enabled
+ if (client_downstream) {
+ // custom limit, set it directly
+ if (limit >= 0) {
+ for (auto &it : zmqSocket) {
+ it->SetReceiveHighWaterMark(limit);
+ if (it->GetReceiveHighWaterMark() != limit) {
+ multi_shm()->zmqHwm = -1;
+ throw sls::ZmqSocketError("Could not set zmq rcv hwm to " +
+ std::to_string(limit));
+ }
+ }
+ LOG(logINFO) << "Setting Client Zmq socket rcv hwm to " << limit;
+ }
+ // default, disable and enable to get default
+ else {
+ setDataStreamingToClient(false);
+ setDataStreamingToClient(true);
+ }
+ }
+}
+
void DetectorImpl::registerAcquisitionFinishedCallback(void (*func)(double, int,
void *),
void *pArg) {
@@ -1042,6 +1100,12 @@ int DetectorImpl::acquire() {
if (dataReady == nullptr) {
setJoinThreadFlag(true);
}
+ if (receiver) {
+ while (numZmqRunning != 0) {
+ Parallel(&Module::restreamStopFromReceiver, {});
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
+ }
+ }
dataProcessingThread.join();
if (acquisition_finished != nullptr) {
diff --git a/slsDetectorSoftware/src/DetectorImpl.h b/slsDetectorSoftware/src/DetectorImpl.h
index a0f7144af..1e4ae1865 100644
--- a/slsDetectorSoftware/src/DetectorImpl.h
+++ b/slsDetectorSoftware/src/DetectorImpl.h
@@ -16,7 +16,7 @@ class detectorData;
#include
#define MULTI_SHMAPIVERSION 0x190809
-#define MULTI_SHMVERSION 0x200319
+#define MULTI_SHMVERSION 0x201007
#define SHORT_STRING_LENGTH 50
#include
@@ -62,6 +62,8 @@ struct sharedMultiSlsDetector {
bool acquiringFlag;
bool initialChecks;
bool gapPixels;
+ /** high water mark of listening tcp port (only data) */
+ int zmqHwm;
};
class DetectorImpl : public virtual slsDetectorDefs {
@@ -240,6 +242,8 @@ class DetectorImpl : public virtual slsDetectorDefs {
bool getDataStreamingToClient();
void setDataStreamingToClient(bool enable);
+ int getClientStreamingHwm() const;
+ void setClientStreamingHwm(const int limit);
/**
* register callback for accessing acquisition final data
@@ -324,12 +328,8 @@ class DetectorImpl : public virtual slsDetectorDefs {
void updateDetectorSize();
- /**
- * Create Receiving Data Sockets
- * @param destroy is true to destroy all the sockets
- * @returns OK or FAIL
- */
- int createReceivingDataSockets(const bool destroy = false);
+ int destroyReceivingDataSockets();
+ int createReceivingDataSockets();
/**
* Reads frames from receiver through a constant socket
@@ -387,6 +387,9 @@ class DetectorImpl : public virtual slsDetectorDefs {
/** ZMQ Socket - Receiver to Client */
std::vector> zmqSocket;
+ /** number of zmq sockets running currently */
+ volatile int numZmqRunning{0};
+
/** mutex to synchronize main and data processing threads */
mutable std::mutex mp;
diff --git a/slsDetectorSoftware/src/Module.cpp b/slsDetectorSoftware/src/Module.cpp
index 04c8835a3..1cce4a841 100644
--- a/slsDetectorSoftware/src/Module.cpp
+++ b/slsDetectorSoftware/src/Module.cpp
@@ -431,6 +431,10 @@ void Module::stopAcquisition() {
}
}
+void Module::restreamStopFromReceiver() {
+ sendToReceiver(F_RESTREAM_STOP_FROM_RECEIVER);
+}
+
void Module::startAndReadAll() {
shm()->stoppedFlag = false;
sendToDetector(F_START_AND_READ_ALL);
@@ -1041,6 +1045,14 @@ void Module::setClientStreamingIP(const sls::IpAddr ip) {
shm()->zmqip = ip;
}
+int Module::getReceiverStreamingHwm() const {
+ return sendToReceiver(F_GET_RECEIVER_STREAMING_HWM);
+}
+
+void Module::setReceiverStreamingHwm(const int limit) {
+ sendToReceiver(F_SET_RECEIVER_STREAMING_HWM, limit, nullptr);
+}
+
// Eiger Specific
int64_t Module::getSubExptime() const {
@@ -2781,10 +2793,6 @@ void Module::checkReceiverVersionCompatibility() {
sendToReceiver(F_RECEIVER_CHECK_VERSION, int64_t(APIRECEIVER), nullptr);
}
-void Module::restreamStopFromReceiver() {
- sendToReceiver(F_RESTREAM_STOP_FROM_RECEIVER);
-}
-
int Module::sendModule(sls_detector_module *myMod, sls::ClientSocket &client) {
constexpr TLogLevel level = logDEBUG1;
LOG(level) << "Sending Module";
diff --git a/slsDetectorSoftware/src/Module.h b/slsDetectorSoftware/src/Module.h
index c045f2b45..2d8ae07af 100644
--- a/slsDetectorSoftware/src/Module.h
+++ b/slsDetectorSoftware/src/Module.h
@@ -161,6 +161,7 @@ class Module : public virtual slsDetectorDefs {
void stopReceiver();
void startAcquisition();
void stopAcquisition();
+ void restreamStopFromReceiver();
void startAndReadAll();
runStatus getRunStatus() const;
runStatus getReceiverStatus() const;
@@ -290,6 +291,8 @@ class Module : public virtual slsDetectorDefs {
void setClientStreamingPort(int port);
sls::IpAddr getClientStreamingIP() const;
void setClientStreamingIP(const sls::IpAddr ip);
+ int getReceiverStreamingHwm() const;
+ void setReceiverStreamingHwm(const int limit);
/**************************************************
* *
@@ -665,7 +668,6 @@ class Module : public virtual slsDetectorDefs {
void checkDetectorVersionCompatibility();
void checkReceiverVersionCompatibility();
- void restreamStopFromReceiver();
void setModule(sls_detector_module &module, bool trimbits = true);
int sendModule(sls_detector_module *myMod, sls::ClientSocket &client);
void updateReceiverStreamingIP();
diff --git a/slsReceiverSoftware/src/ClientInterface.cpp b/slsReceiverSoftware/src/ClientInterface.cpp
index 756dd3968..71b5ab4d0 100644
--- a/slsReceiverSoftware/src/ClientInterface.cpp
+++ b/slsReceiverSoftware/src/ClientInterface.cpp
@@ -206,6 +206,8 @@ int ClientInterface::functionTable(){
flist[F_SET_RECEIVER_RATE_CORRECT] = &ClientInterface::set_rate_correct;
flist[F_SET_RECEIVER_SCAN] = &ClientInterface::set_scan;
flist[F_RECEIVER_SET_THRESHOLD] = &ClientInterface::set_threshold;
+ flist[F_GET_RECEIVER_STREAMING_HWM] = &ClientInterface::get_streaming_hwm;
+ flist[F_SET_RECEIVER_STREAMING_HWM] = &ClientInterface::set_streaming_hwm;
for (int i = NUM_DET_FUNCTIONS + 1; i < NUM_REC_FUNCTIONS ; i++) {
LOG(logDEBUG1) << "function fnum: " << i << " (" <<
@@ -1657,4 +1659,21 @@ int ClientInterface::set_threshold(Interface &socket) {
verifyIdle(socket);
impl()->setThresholdEnergy(arg);
return socket.Send(OK);
-}
\ No newline at end of file
+}
+
+int ClientInterface::get_streaming_hwm(Interface &socket) {
+ int retval = impl()->getStreamingHwm();
+ LOG(logDEBUG1) << "zmq send hwm limit:" << retval;
+ return socket.sendResult(retval);
+}
+
+int ClientInterface::set_streaming_hwm(Interface &socket) {
+ auto limit = socket.Receive();
+ if (limit < -1) {
+ throw RuntimeError("Invalid zmq send hwm limit " +
+ std::to_string(limit));
+ }
+ verifyIdle(socket);
+ impl()->setStreamingHwm(limit);
+ return socket.Send(OK);
+}
diff --git a/slsReceiverSoftware/src/ClientInterface.h b/slsReceiverSoftware/src/ClientInterface.h
index b4acc1239..391ff0a16 100644
--- a/slsReceiverSoftware/src/ClientInterface.h
+++ b/slsReceiverSoftware/src/ClientInterface.h
@@ -159,6 +159,8 @@ class ClientInterface : private virtual slsDetectorDefs {
int set_rate_correct(sls::ServerInterface &socket);
int set_scan(sls::ServerInterface &socket);
int set_threshold(sls::ServerInterface &socket);
+ int get_streaming_hwm(sls::ServerInterface &socket);
+ int set_streaming_hwm(sls::ServerInterface &socket);
Implementation *impl() {
if (receiver != nullptr) {
diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp
index 74d4e37a6..2549cabfc 100644
--- a/slsReceiverSoftware/src/DataStreamer.cpp
+++ b/slsReceiverSoftware/src/DataStreamer.cpp
@@ -75,18 +75,28 @@ void DataStreamer::SetAdditionalJsonHeader(
}
void DataStreamer::CreateZmqSockets(int *nunits, uint32_t port,
- const sls::IpAddr ip) {
+ const sls::IpAddr ip, int hwm) {
uint32_t portnum = port + index;
std::string sip = ip.str();
try {
zmqSocket = new ZmqSocket(portnum, (ip != 0 ? sip.c_str() : nullptr));
+ // set if custom
+ if (hwm >= 0) {
+ zmqSocket->SetSendHighWaterMark(hwm);
+ if (zmqSocket->GetSendHighWaterMark() != hwm) {
+ throw sls::RuntimeError(
+ "Could not set zmq send high water mark to " +
+ std::to_string(hwm));
+ }
+ }
} catch (...) {
LOG(logERROR) << "Could not create Zmq socket on port " << portnum
<< " for Streamer " << index;
throw;
}
LOG(logINFO) << index << " Streamer: Zmq Server started at "
- << zmqSocket->GetZmqServerAddress();
+ << zmqSocket->GetZmqServerAddress()
+ << "[hwm: " << zmqSocket->GetSendHighWaterMark() << "]";
}
void DataStreamer::CloseZmqSocket() {
diff --git a/slsReceiverSoftware/src/DataStreamer.h b/slsReceiverSoftware/src/DataStreamer.h
index be4908a6f..f04bfc4ac 100644
--- a/slsReceiverSoftware/src/DataStreamer.h
+++ b/slsReceiverSoftware/src/DataStreamer.h
@@ -86,8 +86,10 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
* @param nunits pointer to number of theads/ units per detector
* @param port streaming port start index
* @param ip streaming source ip
+ * @param hwm streaming high water mark
*/
- void CreateZmqSockets(int *nunits, uint32_t port, const sls::IpAddr ip);
+ void CreateZmqSockets(int *nunits, uint32_t port, const sls::IpAddr ip,
+ int hwm);
/**
* Shuts down and deletes Zmq Sockets
diff --git a/slsReceiverSoftware/src/Implementation.cpp b/slsReceiverSoftware/src/Implementation.cpp
index 0a9b672c2..4ebdd68f0 100644
--- a/slsReceiverSoftware/src/Implementation.cpp
+++ b/slsReceiverSoftware/src/Implementation.cpp
@@ -876,7 +876,8 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
(int *)nd, &quadEnable, &numberOfTotalFrames));
dataStreamer[i]->SetGeneralData(generalData);
dataStreamer[i]->CreateZmqSockets(
- &numThreads, streamingPort, streamingSrcIP);
+ &numThreads, streamingPort, streamingSrcIP,
+ streamingHwm);
dataStreamer[i]->SetAdditionalJsonHeader(
additionalJsonHeader);
@@ -1004,7 +1005,8 @@ void Implementation::setDataStreamEnable(const bool enable) {
(int *)nd, &quadEnable, &numberOfTotalFrames));
dataStreamer[i]->SetGeneralData(generalData);
dataStreamer[i]->CreateZmqSockets(
- &numThreads, streamingPort, streamingSrcIP);
+ &numThreads, streamingPort, streamingSrcIP,
+ streamingHwm);
dataStreamer[i]->SetAdditionalJsonHeader(
additionalJsonHeader);
} catch (...) {
@@ -1063,6 +1065,14 @@ void Implementation::setStreamingSourceIP(const sls::IpAddr ip) {
LOG(logINFO) << "Streaming Source IP: " << streamingSrcIP;
}
+int Implementation::getStreamingHwm() const { return streamingHwm; }
+
+void Implementation::setStreamingHwm(const int i) {
+ streamingHwm = i;
+ LOG(logINFO) << "Streaming Hwm: "
+ << (i == -1 ? "Default (-1)" : std::to_string(streamingHwm));
+}
+
std::map
Implementation::getAdditionalJsonHeader() const {
return additionalJsonHeader;
diff --git a/slsReceiverSoftware/src/Implementation.h b/slsReceiverSoftware/src/Implementation.h
index 5563afa1e..4dd47c98d 100644
--- a/slsReceiverSoftware/src/Implementation.h
+++ b/slsReceiverSoftware/src/Implementation.h
@@ -130,6 +130,8 @@ class Implementation : private virtual slsDetectorDefs {
void setStreamingPort(const uint32_t i);
sls::IpAddr getStreamingSourceIP() const;
void setStreamingSourceIP(const sls::IpAddr ip);
+ int getStreamingHwm() const;
+ void setStreamingHwm(const int i);
std::map getAdditionalJsonHeader() const;
void setAdditionalJsonHeader(const std::map &c);
std::string getAdditionalJsonParameter(const std::string &key) const;
@@ -312,6 +314,7 @@ class Implementation : private virtual slsDetectorDefs {
uint32_t streamingStartFnum{0};
uint32_t streamingPort{0};
sls::IpAddr streamingSrcIP = sls::IpAddr{};
+ int streamingHwm{-1};
std::map additionalJsonHeader;
// detector parameters
diff --git a/slsSupportLib/include/ZmqSocket.h b/slsSupportLib/include/ZmqSocket.h
index ae561afe9..ab9000bb4 100644
--- a/slsSupportLib/include/ZmqSocket.h
+++ b/slsSupportLib/include/ZmqSocket.h
@@ -93,6 +93,18 @@ class ZmqSocket {
*/
ZmqSocket(const uint32_t portnumber, const char *ethip);
+ /** Returns high water mark for outbound messages */
+ int GetSendHighWaterMark();
+
+ /** Sets high water mark for outbound messages. Default 1000 (zmqlib) */
+ void SetSendHighWaterMark(int limit);
+
+ /** Returns high water mark for inbound messages */
+ int GetReceiveHighWaterMark();
+
+ /** Sets high water mark for inbound messages. Default 1000 (zmqlib) */
+ void SetReceiveHighWaterMark(int limit);
+
/**
* Returns Port Number
* @returns Port Number
diff --git a/slsSupportLib/include/sls_detector_funcs.h b/slsSupportLib/include/sls_detector_funcs.h
index 8ba7fa39e..3d38dccf1 100755
--- a/slsSupportLib/include/sls_detector_funcs.h
+++ b/slsSupportLib/include/sls_detector_funcs.h
@@ -315,6 +315,8 @@ enum detFuncs {
F_SET_RECEIVER_RATE_CORRECT,
F_SET_RECEIVER_SCAN,
F_RECEIVER_SET_THRESHOLD,
+ F_GET_RECEIVER_STREAMING_HWM,
+ F_SET_RECEIVER_STREAMING_HWM,
NUM_REC_FUNCTIONS
};
@@ -631,6 +633,9 @@ const char* getFunctionNameFromEnum(enum detFuncs func) {
case F_SET_RECEIVER_RATE_CORRECT: return "F_SET_RECEIVER_RATE_CORRECT";
case F_SET_RECEIVER_SCAN: return "F_SET_RECEIVER_SCAN";
case F_RECEIVER_SET_THRESHOLD: return "F_RECEIVER_SET_THRESHOLD";
+ case F_GET_RECEIVER_STREAMING_HWM: return "F_GET_RECEIVER_STREAMING_HWM";
+ case F_SET_RECEIVER_STREAMING_HWM: return "F_SET_RECEIVER_STREAMING_HWM";
+
case NUM_REC_FUNCTIONS: return "NUM_REC_FUNCTIONS";
default: return "Unknown Function";
diff --git a/slsSupportLib/src/ZmqSocket.cpp b/slsSupportLib/src/ZmqSocket.cpp
index 54b6970a4..0a5d97217 100644
--- a/slsSupportLib/src/ZmqSocket.cpp
+++ b/slsSupportLib/src/ZmqSocket.cpp
@@ -48,6 +48,8 @@ ZmqSocket::ZmqSocket(const char *const hostname_or_ip,
PrintError();
throw sls::ZmqSocketError("Could not set ZMQ_LINGER");
}
+ LOG(logDEBUG) << "Default receive high water mark:"
+ << GetReceiveHighWaterMark();
}
ZmqSocket::ZmqSocket(const uint32_t portnumber, const char *ethip)
@@ -63,6 +65,7 @@ ZmqSocket::ZmqSocket(const uint32_t portnumber, const char *ethip)
PrintError();
throw sls::ZmqSocketError("Could not create socket");
}
+ LOG(logDEBUG) << "Default send high water mark:" << GetSendHighWaterMark();
// construct address, can be refactored with libfmt
std::ostringstream oss;
@@ -79,6 +82,44 @@ ZmqSocket::ZmqSocket(const uint32_t portnumber, const char *ethip)
std::this_thread::sleep_for(std::chrono::milliseconds(200));
};
+int ZmqSocket::GetSendHighWaterMark() {
+ int value = 0;
+ size_t value_size = sizeof(value);
+ if (zmq_getsockopt(sockfd.socketDescriptor, ZMQ_SNDHWM, &value,
+ &value_size)) {
+ PrintError();
+ throw sls::ZmqSocketError("Could not get ZMQ_SNDHWM");
+ }
+ return value;
+}
+
+void ZmqSocket::SetSendHighWaterMark(int limit) {
+ if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SNDHWM, &limit,
+ sizeof(limit))) {
+ PrintError();
+ throw sls::ZmqSocketError("Could not set ZMQ_SNDHWM");
+ }
+}
+
+int ZmqSocket::GetReceiveHighWaterMark() {
+ int value = 0;
+ size_t value_size = sizeof(value);
+ if (zmq_getsockopt(sockfd.socketDescriptor, ZMQ_RCVHWM, &value,
+ &value_size)) {
+ PrintError();
+ throw sls::ZmqSocketError("Could not get ZMQ_SNDHWM");
+ }
+ return value;
+}
+
+void ZmqSocket::SetReceiveHighWaterMark(int limit) {
+ if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_RCVHWM, &limit,
+ sizeof(limit))) {
+ PrintError();
+ throw sls::ZmqSocketError("Could not set ZMQ_SNDHWM");
+ }
+}
+
int ZmqSocket::Connect() {
if (zmq_connect(sockfd.socketDescriptor, sockfd.serverAddress.c_str())) {
PrintError();