// Copyright (2019-2022) Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-or-later #include "ZMQWrappers.h" #include ZMQContext::ZMQContext() { context = zmq_ctx_new(); } ZMQContext &ZMQContext::NumThreads(int32_t threads) { if (zmq_ctx_set(context, ZMQ_IO_THREADS, threads) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Cannot set number of I/O threads"); return *this; } ZMQContext::~ZMQContext() { zmq_ctx_destroy(context); } void *ZMQContext::GetContext() const { return context; } ZMQSocket::ZMQSocket(ZMQContext &context, ZMQSocketType in_socket_type) : socket_type(in_socket_type) { socket = zmq_socket(context.GetContext(), static_cast(socket_type)); if (socket == nullptr) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_socket failed"); } void ZMQSocket::Bind(const std::string &addr) { if (zmq_bind(socket, addr.c_str()) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_bind failed"); } void ZMQSocket::Connect(const std::string &addr) { if (zmq_connect(socket, addr.c_str()) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_connect failed"); } void ZMQSocket::Disconnect(const std::string &addr) { if (zmq_disconnect(socket, addr.c_str()) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_disconnect failed"); } void ZMQSocket::Send() { std::unique_lock ul(m); if (zmq_send(socket, nullptr, 0, 0) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_send() failed: " + std::string(std::strerror(errno))); } void ZMQSocket::Send(const void *buf, size_t buf_size, bool blocking, bool multipart) { std::unique_lock ul(m); if (zmq_send(socket, buf, buf_size, (blocking ? 0 : ZMQ_DONTWAIT) | (multipart ? ZMQ_SNDMORE : 0)) != buf_size) { if (errno != EAGAIN) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_send(buf) failed: " + std::string(std::strerror(errno))); } } void ZMQSocket::Send(const std::string &s, bool blocking, bool multipart) { Send((void *) s.c_str(), s.size(), blocking, multipart); } void ZMQSocket::Send(const int32_t &value) { std::unique_lock ul(m); if (zmq_send(socket, &value, sizeof(int32_t), 0) != sizeof(int32_t)) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_send(int) failed: " + std::string(std::strerror(errno))); } void ZMQSocket::Send(zmq_msg_t *msg) { std::unique_lock ul(m); if (zmq_msg_send(msg, socket, 0) < 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_send failed"); } void zmq_socket_free(void *data, void *hint) { auto z = (ZeroCopyReturnValue *) hint; z->release(); } void ZMQSocket::SendZeroCopy(const void *buf, size_t buf_size, ZeroCopyReturnValue *zero_copy_ret_val) { std::unique_lock ul(m); zmq_msg_t msg; if (zmq_msg_init_data(&msg, const_cast(buf), buf_size, zmq_socket_free, zero_copy_ret_val) != 0) { zero_copy_ret_val->release(); throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_init_data failed"); } if (zmq_msg_send(&msg, socket, 0) < 0) { zero_copy_ret_val->release(); throw JFJochException(JFJochExceptionCategory::ZeroMQ, "zmq_msg_send failed"); } } int64_t ZMQSocket::Receive(bool blocking) { std::vector msg; return Receive(msg, blocking, true); } int64_t ZMQSocket::Receive(std::string &s, bool blocking) { std::vector v; int64_t rc = Receive(v, blocking, true); if (rc > 0) s = std::string(v.begin(), v.end()); return rc; } void ZMQSocket::SetSocketOption(int32_t option_name, int32_t value) { if (zmq_setsockopt(socket, option_name, &value, sizeof(value)) != 0) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Cannot set socket option"); } ZMQSocket &ZMQSocket::ReceiveTimeout(std::chrono::milliseconds input) { SetSocketOption(ZMQ_RCVTIMEO, input.count()); return *this; } ZMQSocket &ZMQSocket::NoReceiveTimeout() { SetSocketOption(ZMQ_RCVTIMEO, -1); return *this; } ZMQSocket &ZMQSocket::ReceiverBufferSize(int32_t bytes) { SetSocketOption(ZMQ_RCVBUF, bytes); return *this; } ZMQSocket &ZMQSocket::SendBufferSize(int32_t bytes) { SetSocketOption(ZMQ_SNDBUF, bytes); return *this; } ZMQSocket &ZMQSocket::SubscribeAll() { if (socket_type != ZMQSocketType::Sub) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "subscribe only possible for Sub socket"); zmq_setsockopt(socket, ZMQ_SUBSCRIBE, nullptr, 0); return *this; } ZMQSocket &ZMQSocket::Subscribe(const std::string &topic) { if (socket_type != ZMQSocketType::Sub) throw JFJochException(JFJochExceptionCategory::ZeroMQ, "subscribe only possible for Sub socket"); zmq_setsockopt(socket, ZMQ_SUBSCRIBE, topic.c_str(), topic.size()); return *this; } ZMQSocket::~ZMQSocket() { zmq_close(socket); } ZMQSocket& ZMQSocket::ReceiveWaterMark(int32_t msgs) { SetSocketOption(ZMQ_RCVHWM, msgs); return *this; } ZMQSocket& ZMQSocket::SendWaterMark(int32_t msgs) { SetSocketOption(ZMQ_SNDHWM, msgs); return *this; } ZMQSocket &ZMQSocket::NoLinger() { SetSocketOption(ZMQ_LINGER, 0); return *this; } ZMQSocket &ZMQSocket::Conflate(bool input) { SetSocketOption(ZMQ_CONFLATE, input ? 1 : 0); return *this; }