mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-03 18:04:12 +02:00
Make use of common connect function in Pulse Receiver
This commit is contained in:
@@ -10,16 +10,16 @@
|
||||
|
||||
class ZmqPulseReceiver {
|
||||
|
||||
const std::vector<std::string> ipc_urls_;
|
||||
const int n_modules_;
|
||||
void* ctx_;
|
||||
const int n_modules_;
|
||||
|
||||
std::vector<void*> sockets_;
|
||||
|
||||
void* connect_socket(const std::string url);
|
||||
|
||||
public:
|
||||
ZmqPulseReceiver(const std::vector<std::string>& ipc_urls, void* ctx);
|
||||
ZmqPulseReceiver(
|
||||
void* ctx,
|
||||
const std::string& detector_name,
|
||||
const int n_modules);
|
||||
~ZmqPulseReceiver();
|
||||
|
||||
uint64_t get_next_pulse_id() const;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
#include "ZmqPulseReceiver.hpp"
|
||||
#include "BufferUtils.hpp"
|
||||
|
||||
#include <zmq.h>
|
||||
#include <stdexcept>
|
||||
@@ -16,16 +17,17 @@ using namespace stream_config;
|
||||
|
||||
|
||||
ZmqPulseReceiver::ZmqPulseReceiver(
|
||||
const vector<string>& ipc_urls,
|
||||
void* ctx) :
|
||||
ipc_urls_(ipc_urls),
|
||||
n_modules_(ipc_urls_.size()),
|
||||
ctx_(ctx)
|
||||
void * ctx,
|
||||
const string& detector_name,
|
||||
const int n_modules) :
|
||||
ctx_(ctx),
|
||||
n_modules_(n_modules)
|
||||
{
|
||||
sockets_.reserve(ipc_urls_.size());
|
||||
sockets_.reserve(n_modules_);
|
||||
|
||||
for (const auto& url : ipc_urls_) {
|
||||
sockets_.push_back(connect_socket(url));
|
||||
for (int i=0; i<n_modules_; i++) {
|
||||
sockets_.push_back(
|
||||
BufferUtils::connect_socket(ctx_, detector_name, i));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,34 +38,6 @@ ZmqPulseReceiver::~ZmqPulseReceiver()
|
||||
}
|
||||
}
|
||||
|
||||
void* ZmqPulseReceiver::connect_socket(const string url)
|
||||
{
|
||||
void* socket = zmq_socket(ctx_, ZMQ_SUB);
|
||||
if (socket == nullptr) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
int rcvhwm = STREAM_RCVHWM;
|
||||
if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
int linger = 0;
|
||||
if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
if (zmq_connect(socket, url.c_str()) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
return socket;
|
||||
}
|
||||
|
||||
uint64_t ZmqPulseReceiver::get_next_pulse_id() const
|
||||
{
|
||||
uint64_t pulses[n_modules_];
|
||||
@@ -100,7 +74,7 @@ uint64_t ZmqPulseReceiver::get_next_pulse_id() const
|
||||
err_msg << " max_diff=" << max_diff << " pulses.";
|
||||
|
||||
for (int i = 0; i < n_modules_; i++) {
|
||||
err_msg << " (" << ipc_urls_[i] << ", ";
|
||||
err_msg << " (module " << i << ", ";
|
||||
err_msg << pulses[i] << "),";
|
||||
}
|
||||
err_msg << endl;
|
||||
|
||||
Reference in New Issue
Block a user