diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index 5945074..c461bc3 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -45,7 +45,7 @@ struct ImageBinaryFormat { ImageMetadata meta; char* data = nullptr; const size_t size; - ImageBinaryFormat(size_t H, size_t W, size_t D): size(H*W*D) { data = (char*) calloc(8*H*W*D, sizeof(int)); }; + ImageBinaryFormat(size_t H, size_t W, size_t D): size(H*W*D) { data = (char*) calloc(H*W*D, sizeof(int)); }; ~ImageBinaryFormat(){ free(data); std::cout << "ImageBinaryFormat destructor called!" << std::endl; } }; diff --git a/jfj-combined/include/ZmqImagePublisher.hpp b/jfj-combined/include/ZmqImagePublisher.hpp new file mode 100644 index 0000000..12cdba9 --- /dev/null +++ b/jfj-combined/include/ZmqImagePublisher.hpp @@ -0,0 +1,63 @@ +#ifndef SF_DAQ_BUFFER_ZMQ_IMAGE_PUBLISHER_HPP +#define SF_DAQ_BUFFER_ZMQ_IMAGE_PUBLISHER_HPP + +#include +#include +#include "../../core-buffer/include/formats.hpp" + + +#define ASSERT_FALSE(expr, msg) + if(bool(expr)){ \ + std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \ + text = text + "Message:" + msg + "\nErrno: " + std::to_sting(errno); \ + throw std::runtime_error(text); \ + } \ + +/** ZMQ Publisher + + Lightweight wrapper base class to initialize a ZMQ Publisher. + Nothing data specific, but everything is only 'protected'. + It also has an internal mutex that can be used for threadsafe + access to the undelying connection; +**/ +class ZmqPublisher { + protected: + const uint16_t m_port; + std::string m_address; + zmq::context_t m_ctx; + zmq::socket_t m_socket; + std::mutex g_zmq_socket; + + public: + ZmqPublisher(const uint16_t port): + m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(1), m_socket(m_ctx, ZMQ_PUB) { + // Bind the socket + auto err = m_socket.bind(m_address.c_str(); + ASSERT_FALSE( err, "Failed to bind ZMQ socket" ) + std::cout << "Initialized ZMQ publisher at " << m_address << std::endl; + }; + + ~ZmqPublisher(){}; +}; + + +/** ZMQ Image Publisher + + Specialized publisher to send 'ImageBinaryFormat' data format as + multipart message. It also takes care of thread safety. +**/ +class ZmqImagePublisher: public ZmqPublisher { + public: + void sendImage(ImageBinaryFormat& image){ + std::lock_guard guard(g_zmq_socket); + int err = 0; + err |= m_socket.send(&image.meta, sizeof(image.meta), ZMQ_SNDMORE); + err |= m_socket.send(image.data, image.size, 0); + ASSERT_FALSE( err, "Failed to send image data" ) + std::cout << "Sent ZMQ stream of pulse: " << image.meta.pulse_id << std::endl; + } +}; + + + +#endif //SF_DAQ_BUFFER_ZMQ_IMAGE_PUBLISHER_HPP diff --git a/jfj-combined/src/ZmqImagePublisher.cpp b/jfj-combined/src/ZmqImagePublisher.cpp new file mode 100644 index 0000000..84de80b --- /dev/null +++ b/jfj-combined/src/ZmqImagePublisher.cpp @@ -0,0 +1,61 @@ +#include +#include + +using namespace std; +using namespace buffer_config; + +PacketUdpReceiver::PacketUdpReceiver() : socket_fd_(-1) { } + +PacketUdpReceiver::~PacketUdpReceiver() { + disconnect(); +} + +void PacketUdpReceiver::bind(const uint16_t port){ + if (socket_fd_ > -1) { + throw runtime_error("Socket already bound."); + } + + socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); + if (socket_fd_ < 0) { + throw runtime_error("Cannot open socket."); + } + + sockaddr_in server_address = {0}; + server_address.sin_family = AF_INET; + server_address.sin_addr.s_addr = INADDR_ANY; + server_address.sin_port = htons(port); + + timeval udp_socket_timeout; + udp_socket_timeout.tv_sec = 0; + udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT; + + if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, &udp_socket_timeout, sizeof(timeval)) == -1) { + throw runtime_error("Cannot set SO_RCVTIMEO. " + string(strerror(errno))); + } + + if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) { + throw runtime_error("Cannot set SO_RCVBUF. " + string(strerror(errno))); + }; + //TODO: try to set SO_RCVLOWAT + + auto bind_result = ::bind(socket_fd_, reinterpret_cast(&server_address), sizeof(server_address)); + + if (bind_result < 0) { + throw runtime_error("Cannot bind socket."); + } +} + +int PacketUdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs){ + return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0); +} + +bool PacketUdpReceiver::receive(void* buffer, const size_t buffer_n_bytes){ + auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0); + + return (data_len == buffer_n_bytes) ? true : false; +} + +void PacketUdpReceiver::disconnect(){ + close(socket_fd_); + socket_fd_ = -1; +}