ZMQ placeholder

This commit is contained in:
Mohacsi Istvan
2021-06-28 13:40:58 +02:00
parent 35cebb59e4
commit 9bc39432fb
3 changed files with 125 additions and 1 deletions
+1 -1
View File
@@ -45,7 +45,7 @@ struct ImageBinaryFormat {
ImageMetadata meta;
char* data = nullptr;
const size_t size;
ImageBinaryFormat(size_t H, size_t W, size_t D): size(H*W*D) { data = (char*) calloc(8*H*W*D, sizeof(int)); };
ImageBinaryFormat(size_t H, size_t W, size_t D): size(H*W*D) { data = (char*) calloc(H*W*D, sizeof(int)); };
~ImageBinaryFormat(){ free(data); std::cout << "ImageBinaryFormat destructor called!" << std::endl; }
};
@@ -0,0 +1,63 @@
#ifndef SF_DAQ_BUFFER_ZMQ_IMAGE_PUBLISHER_HPP
#define SF_DAQ_BUFFER_ZMQ_IMAGE_PUBLISHER_HPP
#include <iostream>
#include <zmq.hpp>
#include "../../core-buffer/include/formats.hpp"
#define ASSERT_FALSE(expr, msg)
if(bool(expr)){ \
std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \
text = text + "Message:" + msg + "\nErrno: " + std::to_sting(errno); \
throw std::runtime_error(text); \
} \
/** ZMQ Publisher
Lightweight wrapper base class to initialize a ZMQ Publisher.
Nothing data specific, but everything is only 'protected'.
It also has an internal mutex that can be used for threadsafe
access to the undelying connection;
**/
class ZmqPublisher {
protected:
const uint16_t m_port;
std::string m_address;
zmq::context_t m_ctx;
zmq::socket_t m_socket;
std::mutex g_zmq_socket;
public:
ZmqPublisher(const uint16_t port):
m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(1), m_socket(m_ctx, ZMQ_PUB) {
// Bind the socket
auto err = m_socket.bind(m_address.c_str();
ASSERT_FALSE( err, "Failed to bind ZMQ socket" )
std::cout << "Initialized ZMQ publisher at " << m_address << std::endl;
};
~ZmqPublisher(){};
};
/** ZMQ Image Publisher
Specialized publisher to send 'ImageBinaryFormat' data format as
multipart message. It also takes care of thread safety.
**/
class ZmqImagePublisher: public ZmqPublisher {
public:
void sendImage(ImageBinaryFormat& image){
std::lock_guard<std::mutex> guard(g_zmq_socket);
int err = 0;
err |= m_socket.send(&image.meta, sizeof(image.meta), ZMQ_SNDMORE);
err |= m_socket.send(image.data, image.size, 0);
ASSERT_FALSE( err, "Failed to send image data" )
std::cout << "Sent ZMQ stream of pulse: " << image.meta.pulse_id << std::endl;
}
};
#endif //SF_DAQ_BUFFER_ZMQ_IMAGE_PUBLISHER_HPP
+61
View File
@@ -0,0 +1,61 @@
#include <iostream>
#include <zmq.hpp>
using namespace std;
using namespace buffer_config;
PacketUdpReceiver::PacketUdpReceiver() : socket_fd_(-1) { }
PacketUdpReceiver::~PacketUdpReceiver() {
disconnect();
}
void PacketUdpReceiver::bind(const uint16_t port){
if (socket_fd_ > -1) {
throw runtime_error("Socket already bound.");
}
socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
if (socket_fd_ < 0) {
throw runtime_error("Cannot open socket.");
}
sockaddr_in server_address = {0};
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = INADDR_ANY;
server_address.sin_port = htons(port);
timeval udp_socket_timeout;
udp_socket_timeout.tv_sec = 0;
udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT;
if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, &udp_socket_timeout, sizeof(timeval)) == -1) {
throw runtime_error("Cannot set SO_RCVTIMEO. " + string(strerror(errno)));
}
if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) {
throw runtime_error("Cannot set SO_RCVBUF. " + string(strerror(errno)));
};
//TODO: try to set SO_RCVLOWAT
auto bind_result = ::bind(socket_fd_, reinterpret_cast<const sockaddr *>(&server_address), sizeof(server_address));
if (bind_result < 0) {
throw runtime_error("Cannot bind socket.");
}
}
int PacketUdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs){
return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0);
}
bool PacketUdpReceiver::receive(void* buffer, const size_t buffer_n_bytes){
auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0);
return (data_len == buffer_n_bytes) ? true : false;
}
void PacketUdpReceiver::disconnect(){
close(socket_fd_);
socket_fd_ = -1;
}