mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-06-23 10:07:59 +02:00
Jf zeromq display (#644)
* modified ZmqSocket to expose the SO_RCVBUF and SO_SENDBUF parameters. Modified DataStreamer.cpp to change the SENDBUF to 1MB when HWL is <10. Modified Datastreamer.cpp so that when HWL is changed, socket is unbind/rebind. Added rebind to ZmqSocket.cpp. Changed slot UpdatePlot() in qdrawplot.h from privat tto public, added plot->UpdatePlot() after every axis range change, so the plots are updated immediatly and not at the next image. Added onlinedisp_zmq program which connects to the receiver ZMQ port and show images and histos. Compiled against ROOT 6.22/02. Added examples files. * added setbuffer size also for gui, moved hardcoded values to a macro, removed unnecessary return of ok or success, added actual zmq exception message to could not create sockets error * zmq: changing buffer size done within hwm --------- Co-authored-by: mozzanica <l_mozzanica@mpc2012.psi.ch> Co-authored-by: Dhanya Thattil <dhanya.thattil@psi.ch>
This commit is contained in:
@ -31,6 +31,10 @@ namespace sls {
|
||||
// #define ZMQ_DETAIL
|
||||
#define ROIVERBOSITY
|
||||
|
||||
// high water mark for gui
|
||||
#define DEFFAULT_LOW_HWM (25)
|
||||
#define DEFAULT_LOW_HWM_BUFFERSIZE (1024 * 1024) // 1MB
|
||||
|
||||
/** zmq header structure */
|
||||
struct zmqHeader {
|
||||
/** true if incoming data, false if end of acquisition */
|
||||
@ -108,15 +112,29 @@ class ZmqSocket {
|
||||
/** Returns high water mark for outbound messages */
|
||||
int GetSendHighWaterMark();
|
||||
|
||||
/** Sets high water mark for outbound messages. Default 1000 (zmqlib) */
|
||||
/** Sets high water mark for outbound messages. Default 1000 (zmqlib). Also
|
||||
* changes send buffer size depending on low hwm. Must rebind. */
|
||||
void SetSendHighWaterMark(int limit);
|
||||
|
||||
/** Returns high water mark for inbound messages */
|
||||
int GetReceiveHighWaterMark();
|
||||
|
||||
/** Sets high water mark for inbound messages. Default 1000 (zmqlib) */
|
||||
/** Sets high water mark for inbound messages. Default 1000 (zmqlib). Also
|
||||
* changes receiver buffer size depending on low hwm. Must reconnect */
|
||||
void SetReceiveHighWaterMark(int limit);
|
||||
|
||||
/** Gets kernel buffer for outbound messages. Default 0 (os) */
|
||||
int GetSendBuffer();
|
||||
|
||||
/** Sets kernel buffer for outbound messages. Default 0 (os) */
|
||||
void SetSendBuffer(int limit);
|
||||
|
||||
/** Gets kernel buffer for inbound messages. Default 0 (os) */
|
||||
int GetReceiveBuffer();
|
||||
|
||||
/** Sets kernel buffer for inbound messages. Default 0 (os) */
|
||||
void SetReceiveBuffer(int limit);
|
||||
|
||||
/**
|
||||
* Returns Port Number
|
||||
* @returns Port Number
|
||||
@ -129,6 +147,9 @@ class ZmqSocket {
|
||||
*/
|
||||
std::string GetZmqServerAddress() { return sockfd.serverAddress; }
|
||||
|
||||
/** unbinds and rebind, to apply changes of HWM */
|
||||
void Rebind();
|
||||
|
||||
/**
|
||||
* Connect client socket to server socket
|
||||
* @returns 1 for fail, 0 for success
|
||||
|
@ -102,6 +102,13 @@ void ZmqSocket::SetSendHighWaterMark(int limit) {
|
||||
PrintError();
|
||||
throw ZmqSocketError("Could not set ZMQ_SNDHWM");
|
||||
}
|
||||
if (GetSendHighWaterMark() != limit) {
|
||||
throw ZmqSocketError("Could not set ZMQ_SNDHWM to " +
|
||||
std::to_string(limit));
|
||||
}
|
||||
if (limit < DEFFAULT_LOW_HWM) {
|
||||
SetSendBuffer(DEFAULT_LOW_HWM_BUFFERSIZE);
|
||||
}
|
||||
}
|
||||
|
||||
int ZmqSocket::GetReceiveHighWaterMark() {
|
||||
@ -110,7 +117,7 @@ int ZmqSocket::GetReceiveHighWaterMark() {
|
||||
if (zmq_getsockopt(sockfd.socketDescriptor, ZMQ_RCVHWM, &value,
|
||||
&value_size)) {
|
||||
PrintError();
|
||||
throw ZmqSocketError("Could not get ZMQ_SNDHWM");
|
||||
throw ZmqSocketError("Could not get ZMQ_RCVHWM");
|
||||
}
|
||||
return value;
|
||||
}
|
||||
@ -119,7 +126,75 @@ void ZmqSocket::SetReceiveHighWaterMark(int limit) {
|
||||
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_RCVHWM, &limit,
|
||||
sizeof(limit))) {
|
||||
PrintError();
|
||||
throw ZmqSocketError("Could not set ZMQ_SNDHWM");
|
||||
throw ZmqSocketError("Could not set ZMQ_RCVHWM");
|
||||
}
|
||||
if (GetReceiveHighWaterMark() != limit) {
|
||||
throw ZmqSocketError("Could not set ZMQ_RCVHWM to " +
|
||||
std::to_string(limit));
|
||||
}
|
||||
if (limit < DEFFAULT_LOW_HWM) {
|
||||
SetReceiveBuffer(DEFAULT_LOW_HWM_BUFFERSIZE);
|
||||
}
|
||||
}
|
||||
|
||||
int ZmqSocket::GetSendBuffer() {
|
||||
int value = 0;
|
||||
size_t value_size = sizeof(value);
|
||||
if (zmq_getsockopt(sockfd.socketDescriptor, ZMQ_SNDBUF, &value,
|
||||
&value_size)) {
|
||||
PrintError();
|
||||
throw ZmqSocketError("Could not get ZMQ_SNDBUF");
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
void ZmqSocket::SetSendBuffer(int limit) {
|
||||
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_SNDBUF, &limit,
|
||||
sizeof(limit))) {
|
||||
PrintError();
|
||||
throw ZmqSocketError("Could not set ZMQ_SNDBUF");
|
||||
}
|
||||
if (GetSendBuffer() != limit) {
|
||||
throw ZmqSocketError("Could not set ZMQ_SNDBUF to " +
|
||||
std::to_string(limit));
|
||||
}
|
||||
}
|
||||
|
||||
int ZmqSocket::GetReceiveBuffer() {
|
||||
int value = 0;
|
||||
size_t value_size = sizeof(value);
|
||||
if (zmq_getsockopt(sockfd.socketDescriptor, ZMQ_RCVBUF, &value,
|
||||
&value_size)) {
|
||||
PrintError();
|
||||
throw ZmqSocketError("Could not get ZMQ_RCVBUF");
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
void ZmqSocket::SetReceiveBuffer(int limit) {
|
||||
if (zmq_setsockopt(sockfd.socketDescriptor, ZMQ_RCVBUF, &limit,
|
||||
sizeof(limit))) {
|
||||
PrintError();
|
||||
throw ZmqSocketError("Could not set ZMQ_RCVBUF");
|
||||
}
|
||||
if (GetReceiveBuffer() != limit) {
|
||||
throw ZmqSocketError("Could not set ZMQ_RCVBUF to " +
|
||||
std::to_string(limit));
|
||||
}
|
||||
}
|
||||
|
||||
void ZmqSocket::Rebind() { // the purpose is to apply HWL changes, which are
|
||||
// frozen at bind, which is in the constructor.
|
||||
|
||||
// unbbind
|
||||
if (zmq_unbind(sockfd.socketDescriptor, sockfd.serverAddress.c_str())) {
|
||||
PrintError();
|
||||
throw ZmqSocketError("Could not unbind socket");
|
||||
}
|
||||
// bind address
|
||||
if (zmq_bind(sockfd.socketDescriptor, sockfd.serverAddress.c_str())) {
|
||||
PrintError();
|
||||
throw ZmqSocketError("Could not bind socket");
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user