222 lines
6.6 KiB
C++
222 lines
6.6 KiB
C++
// Copyright (2019-2023) Paul Scherrer Institute
|
|
|
|
#include "ZMQWrappers.h"
|
|
#include <cerrno>
|
|
|
|
ZMQContext::ZMQContext() {
|
|
context = zmq_ctx_new();
|
|
|
|
// Default is to have 2 I/O threads per ZMQ context
|
|
if (zmq_ctx_set(context, ZMQ_IO_THREADS, 2) != 0)
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Cannot set number of I/O threads");
|
|
}
|
|
|
|
ZMQContext &ZMQContext::NumThreads(int32_t threads) {
|
|
if (zmq_ctx_set(context, ZMQ_IO_THREADS, threads) != 0)
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ,
|
|
"Cannot set number of I/O threads");
|
|
return *this;
|
|
}
|
|
|
|
ZMQContext::~ZMQContext() {
|
|
zmq_ctx_destroy(context);
|
|
}
|
|
|
|
void *ZMQContext::GetContext() const {
|
|
return context;
|
|
}
|
|
|
|
ZMQSocket::ZMQSocket(ZMQSocketType in_socket_type) : socket_type(in_socket_type) {
|
|
socket = zmq_socket(context.GetContext(), static_cast<int>(socket_type));
|
|
|
|
if (socket == nullptr)
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_socket failed");
|
|
|
|
EnableKeepAlive();
|
|
}
|
|
|
|
void ZMQSocket::Bind(const std::string &addr) {
|
|
if (zmq_bind(socket, addr.c_str()) != 0)
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_bind failed");
|
|
}
|
|
|
|
void ZMQSocket::Connect(const std::string &addr) {
|
|
if (zmq_connect(socket, addr.c_str()) != 0)
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_connect failed");
|
|
}
|
|
|
|
void ZMQSocket::Disconnect(const std::string &addr) {
|
|
if (zmq_disconnect(socket, addr.c_str()) != 0)
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_disconnect failed");
|
|
}
|
|
|
|
void ZMQSocket::Send() {
|
|
std::unique_lock<std::mutex> ul(m);
|
|
if (zmq_send(socket, nullptr, 0, 0) != 0)
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_send() failed: " + std::string(std::strerror(errno)));
|
|
}
|
|
|
|
bool ZMQSocket::Send(const void *buf, size_t buf_size, bool blocking, bool multipart) {
|
|
std::unique_lock<std::mutex> ul(m);
|
|
if (zmq_send(socket, buf, buf_size, (blocking ? 0 : ZMQ_DONTWAIT) | (multipart ? ZMQ_SNDMORE : 0)) == buf_size)
|
|
return true;
|
|
else {
|
|
if ((errno == EAGAIN) || (errno == EINTR))
|
|
return false;
|
|
else
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ,
|
|
"zmq_send(buf) failed: " + std::string(std::strerror(errno)));
|
|
}
|
|
}
|
|
|
|
bool ZMQSocket::Send(const std::string &s, bool blocking, bool multipart) {
|
|
return Send((void *) s.c_str(), s.size(), blocking, multipart);
|
|
}
|
|
|
|
void ZMQSocket::Send(const int32_t &value) {
|
|
std::unique_lock<std::mutex> ul(m);
|
|
if (zmq_send(socket, &value, sizeof(int32_t), 0) != sizeof(int32_t))
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_send(int) failed: " + std::string(std::strerror(errno)));
|
|
}
|
|
|
|
void ZMQSocket::Send(zmq_msg_t *msg) {
|
|
std::unique_lock<std::mutex> ul(m);
|
|
if (zmq_msg_send(msg, socket, 0) < 0)
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_send failed");
|
|
}
|
|
|
|
void zmq_socket_free(void *data, void *hint) {
|
|
auto z = (ZeroCopyReturnValue *) hint;
|
|
z->release();
|
|
}
|
|
|
|
void ZMQSocket::SendZeroCopy(const void *buf, size_t buf_size, ZeroCopyReturnValue *zero_copy_ret_val) {
|
|
std::unique_lock<std::mutex> ul(m);
|
|
zmq_msg_t msg;
|
|
if (zmq_msg_init_data(&msg, const_cast<void *>(buf), buf_size, zmq_socket_free, zero_copy_ret_val) != 0) {
|
|
zero_copy_ret_val->release();
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_init_data failed");
|
|
}
|
|
if (zmq_msg_send(&msg, socket, 0) < 0) {
|
|
zero_copy_ret_val->release();
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_send failed");
|
|
}
|
|
}
|
|
|
|
void ZMQSocket::SetSocketOption(int32_t option_name, int32_t value) {
|
|
if (zmq_setsockopt(socket, option_name, &value, sizeof(value)) != 0)
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Cannot set socket option");
|
|
|
|
}
|
|
|
|
ZMQSocket &ZMQSocket::ReceiveTimeout(std::chrono::milliseconds input) {
|
|
SetSocketOption(ZMQ_RCVTIMEO, input.count());
|
|
return *this;
|
|
}
|
|
|
|
ZMQSocket &ZMQSocket::NoReceiveTimeout() {
|
|
SetSocketOption(ZMQ_RCVTIMEO, -1);
|
|
return *this;
|
|
}
|
|
|
|
ZMQSocket &ZMQSocket::SendTimeout(std::chrono::milliseconds input) {
|
|
SetSocketOption(ZMQ_SNDTIMEO, input.count());
|
|
return *this;
|
|
}
|
|
|
|
ZMQSocket &ZMQSocket::NoSendTimeout() {
|
|
SetSocketOption(ZMQ_SNDTIMEO, -1);
|
|
return *this;
|
|
}
|
|
|
|
ZMQSocket &ZMQSocket::ReceiverBufferSize(int32_t bytes) {
|
|
SetSocketOption(ZMQ_RCVBUF, bytes);
|
|
return *this;
|
|
}
|
|
|
|
ZMQSocket &ZMQSocket::SendBufferSize(int32_t bytes) {
|
|
SetSocketOption(ZMQ_SNDBUF, bytes);
|
|
return *this;
|
|
}
|
|
|
|
ZMQSocket &ZMQSocket::SubscribeAll() {
|
|
if (socket_type != ZMQSocketType::Sub)
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "subscribe only possible for Sub socket");
|
|
zmq_setsockopt(socket, ZMQ_SUBSCRIBE, nullptr, 0);
|
|
return *this;
|
|
}
|
|
|
|
ZMQSocket &ZMQSocket::Subscribe(const std::string &topic) {
|
|
if (socket_type != ZMQSocketType::Sub)
|
|
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "subscribe only possible for Sub socket");
|
|
zmq_setsockopt(socket, ZMQ_SUBSCRIBE, topic.c_str(), topic.size());
|
|
return *this;
|
|
}
|
|
|
|
ZMQSocket::~ZMQSocket() {
|
|
zmq_close(socket);
|
|
}
|
|
|
|
ZMQSocket& ZMQSocket::ReceiveWaterMark(int32_t msgs) {
|
|
SetSocketOption(ZMQ_RCVHWM, msgs);
|
|
return *this;
|
|
}
|
|
|
|
ZMQSocket& ZMQSocket::SendWaterMark(int32_t msgs) {
|
|
SetSocketOption(ZMQ_SNDHWM, msgs);
|
|
return *this;
|
|
}
|
|
|
|
ZMQSocket &ZMQSocket::NoLinger() {
|
|
SetSocketOption(ZMQ_LINGER, 0);
|
|
return *this;
|
|
}
|
|
|
|
ZMQSocket &ZMQSocket::Conflate(bool input) {
|
|
SetSocketOption(ZMQ_CONFLATE, input ? 1 : 0);
|
|
return *this;
|
|
}
|
|
|
|
ZMQSocket &ZMQSocket::EnableKeepAlive() {
|
|
SetSocketOption(ZMQ_TCP_KEEPALIVE, 1);
|
|
SetSocketOption(ZMQ_TCP_KEEPALIVE_CNT, 10);
|
|
SetSocketOption(ZMQ_TCP_KEEPALIVE_IDLE, 60);
|
|
SetSocketOption(ZMQ_TCP_KEEPALIVE_INTVL, 1);
|
|
return *this;
|
|
}
|
|
|
|
std::string ZMQSocket::GetEndpointName() {
|
|
char tmp[256];
|
|
size_t len = 255;
|
|
zmq_getsockopt(socket, ZMQ_LAST_ENDPOINT, tmp, &len);
|
|
return tmp;
|
|
}
|
|
|
|
bool ZMQSocket::Receive(ZMQMessage &msg, bool blocking) {
|
|
std::unique_lock<std::mutex> ul(m);
|
|
return (zmq_msg_recv(msg.GetMsg(), socket, blocking ? 0 : ZMQ_DONTWAIT) >= 0);
|
|
}
|
|
|
|
ZMQMessage::ZMQMessage() {
|
|
int rc = zmq_msg_init(&msg);
|
|
if (rc) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Cannot init zmq_msg");
|
|
}
|
|
|
|
ZMQMessage::~ZMQMessage() {
|
|
zmq_msg_close(&msg);
|
|
}
|
|
|
|
zmq_msg_t *ZMQMessage::GetMsg() {
|
|
return &msg;
|
|
}
|
|
|
|
size_t ZMQMessage::size() {
|
|
return zmq_msg_size(&msg);
|
|
}
|
|
|
|
const uint8_t *ZMQMessage::data() {
|
|
return (uint8_t *) zmq_msg_data(&msg);
|
|
}
|
|
|
|
|