Implement simple ReplayZmqSender

This commit is contained in:
2020-05-25 16:01:53 +02:00
parent 8cd5ea2482
commit cd003d010d
2 changed files with 70 additions and 0 deletions
+22
View File
@@ -0,0 +1,22 @@
#ifndef SF_DAQ_BUFFER_REPLAYZMQSENDER_HPP
#define SF_DAQ_BUFFER_REPLAYZMQSENDER_HPP
#include <string>
#include <jungfrau.hpp>
class ReplayZmqSender {
void* ctx_;
void* socket_;
public:
ReplayZmqSender(const std::string& ipc_id, const int source_id);
virtual ~ReplayZmqSender();
void close();
void send(const ModuleFrame* metadata, const char* data);
};
#endif //SF_DAQ_BUFFER_REPLAYZMQSENDER_HPP
+48
View File
@@ -0,0 +1,48 @@
#include "ReplayZmqSender.hpp"
#include <sstream>
#include <zmq.h>
#include "buffer_config.hpp"
using namespace std;
using namespace core_buffer;
ReplayZmqSender::ReplayZmqSender(const string& ipc_id, const int source_id)
{
auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-";
stringstream ipc_stream;
ipc_stream << ipc_base << source_id;
const auto ipc_address = ipc_stream.str();
ctx_ = zmq_ctx_new();
socket_ = zmq_socket(ctx_, ZMQ_PUSH);
const int sndhwm = REPLAY_SNDHWM;
if (zmq_setsockopt(socket_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0)
throw runtime_error(zmq_strerror (errno));
const int linger_ms = -1;
if (zmq_setsockopt(socket_, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0)
throw runtime_error(zmq_strerror (errno));
if (zmq_bind(socket_, ipc_address.c_str()) != 0)
throw runtime_error(zmq_strerror (errno));
}
ReplayZmqSender::~ReplayZmqSender()
{
close();
}
void ReplayZmqSender::close() {
zmq_close(socket_);
zmq_ctx_destroy(ctx_);
}
void ReplayZmqSender::send(const ModuleFrame* metadata, const char* data)
{
zmq_send(socket_, metadata, sizeof(ModuleFrame), ZMQ_SNDMORE);
zmq_send(socket_, data, MODULE_N_BYTES, 0);
}