// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "ZMQWrappers.h" #include ZMQContext::ZMQContext() { context = zmq_ctx_new(); // Default is to have 2 I/O threads per ZMQ context if (zmq_ctx_set(context, ZMQ_IO_THREADS, 2) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Cannot set number of I/O threads"); } ZMQContext &ZMQContext::NumThreads(int32_t threads) { if (zmq_ctx_set(context, ZMQ_IO_THREADS, threads) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Cannot set number of I/O threads"); return *this; } ZMQContext::~ZMQContext() { zmq_ctx_destroy(context); } void *ZMQContext::GetContext() const { return context; } ZMQSocket::ZMQSocket(ZMQSocketType in_socket_type) : socket_type(in_socket_type) { socket = zmq_socket(context.GetContext(), static_cast(socket_type)); if (socket == nullptr) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_socket failed"); EnableKeepAlive(); } void ZMQSocket::Bind(const std::string &addr) { if (zmq_bind(socket, addr.c_str()) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_bind failed"); } void ZMQSocket::Connect(const std::string &addr) { if (zmq_connect(socket, addr.c_str()) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_connect failed"); } void ZMQSocket::Disconnect(const std::string &addr) { if (zmq_disconnect(socket, addr.c_str()) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_disconnect failed"); } void ZMQSocket::Send() { std::unique_lock ul(m); if (zmq_send(socket, nullptr, 0, 0) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_send() failed: " + std::string(std::strerror(errno))); } bool ZMQSocket::Send(const void *buf, size_t buf_size, bool blocking, bool multipart) { std::unique_lock ul(m); if (zmq_send(socket, buf, buf_size, (blocking ? 0 : ZMQ_DONTWAIT) | (multipart ? ZMQ_SNDMORE : 0)) == buf_size) return true; else { if ((errno == EAGAIN) || (errno == EINTR)) return false; else throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_send(buf) failed: " + std::string(std::strerror(errno))); } } bool ZMQSocket::Send(const std::string &s, bool blocking, bool multipart) { return Send((void *) s.c_str(), s.size(), blocking, multipart); } void ZMQSocket::Send(const int32_t &value) { std::unique_lock ul(m); if (zmq_send(socket, &value, sizeof(int32_t), 0) != sizeof(int32_t)) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_send(int) failed: " + std::string(std::strerror(errno))); } void ZMQSocket::Send(zmq_msg_t *msg) { std::unique_lock ul(m); if (zmq_msg_send(msg, socket, 0) < 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_send failed"); } void ZMQSocket::SendZeroCopy(void *data, size_t size,void (*callback)(void *, void *), void *hint) { std::unique_lock ul(m); zmq_msg_t msg; if (zmq_msg_init_data(&msg, data, size, callback, hint) != 0) { callback(data, hint); throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_init_data failed"); } if (zmq_msg_send(&msg, socket, 0) < 0) { callback(data, hint); throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_send failed"); } } void ZMQSocket::SetSocketOption(int32_t option_name, int32_t value) { if (zmq_setsockopt(socket, option_name, &value, sizeof(value)) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Cannot set socket option"); } ZMQSocket &ZMQSocket::ReceiveTimeout(std::chrono::milliseconds input) { SetSocketOption(ZMQ_RCVTIMEO, input.count()); return *this; } ZMQSocket &ZMQSocket::NoReceiveTimeout() { SetSocketOption(ZMQ_RCVTIMEO, -1); return *this; } ZMQSocket &ZMQSocket::SendTimeout(std::chrono::milliseconds input) { SetSocketOption(ZMQ_SNDTIMEO, input.count()); return *this; } ZMQSocket &ZMQSocket::NoSendTimeout() { SetSocketOption(ZMQ_SNDTIMEO, -1); return *this; } ZMQSocket &ZMQSocket::ReceiverBufferSize(int32_t bytes) { SetSocketOption(ZMQ_RCVBUF, bytes); return *this; } ZMQSocket &ZMQSocket::SendBufferSize(int32_t bytes) { SetSocketOption(ZMQ_SNDBUF, bytes); return *this; } ZMQSocket &ZMQSocket::SubscribeAll() { if (socket_type != ZMQSocketType::Sub) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "subscribe only possible for Sub socket"); zmq_setsockopt(socket, ZMQ_SUBSCRIBE, nullptr, 0); return *this; } ZMQSocket &ZMQSocket::Subscribe(const std::string &topic) { if (socket_type != ZMQSocketType::Sub) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "subscribe only possible for Sub socket"); zmq_setsockopt(socket, ZMQ_SUBSCRIBE, topic.c_str(), topic.size()); return *this; } ZMQSocket::~ZMQSocket() { zmq_close(socket); } ZMQSocket& ZMQSocket::ReceiveWaterMark(int32_t msgs) { SetSocketOption(ZMQ_RCVHWM, msgs); return *this; } ZMQSocket& ZMQSocket::SendWaterMark(int32_t msgs) { SetSocketOption(ZMQ_SNDHWM, msgs); return *this; } ZMQSocket &ZMQSocket::NoLinger() { SetSocketOption(ZMQ_LINGER, 0); return *this; } ZMQSocket &ZMQSocket::Conflate(bool input) { SetSocketOption(ZMQ_CONFLATE, input ? 1 : 0); return *this; } ZMQSocket &ZMQSocket::EnableKeepAlive() { SetSocketOption(ZMQ_TCP_KEEPALIVE, 1); SetSocketOption(ZMQ_TCP_KEEPALIVE_CNT, 10); SetSocketOption(ZMQ_TCP_KEEPALIVE_IDLE, 60); SetSocketOption(ZMQ_TCP_KEEPALIVE_INTVL, 1); return *this; } std::string ZMQSocket::GetEndpointName() { char tmp[256]; size_t len = 255; zmq_getsockopt(socket, ZMQ_LAST_ENDPOINT, tmp, &len); return tmp; } bool ZMQSocket::Receive(ZMQMessage &msg, bool blocking) { std::unique_lock ul(m); return (zmq_msg_recv(msg.GetMsg(), socket, blocking ? 0 : ZMQ_DONTWAIT) >= 0); } ZMQMessage::ZMQMessage() { int rc = zmq_msg_init(&msg); if (rc) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Cannot init zmq_msg"); } ZMQMessage::~ZMQMessage() { zmq_msg_close(&msg); } zmq_msg_t *ZMQMessage::GetMsg() { return &msg; } size_t ZMQMessage::size() { return zmq_msg_size(&msg); } const uint8_t *ZMQMessage::data() { return (uint8_t *) zmq_msg_data(&msg); }