working simple streamer

This commit is contained in:
Bechir Braham
2024-04-09 17:30:42 +02:00
parent 59b04ad6e8
commit 0755de309e
10 changed files with 99 additions and 31 deletions

View File

@ -20,7 +20,6 @@ class ZmqSocketReceiver : public ZmqSocket {
public:
ZmqSocketReceiver(const std::string &endpoint);
void connect();
ZmqFrame receive(ZmqFrame &zmq_frame);
std::vector<ZmqFrame> receive_n();
private:

View File

@ -43,7 +43,9 @@ void ZmqSocketReceiver::connect() {
ZmqHeader ZmqSocketReceiver::receive_header() {
// receive string ZmqHeader
aare::logger::debug("Receiving header");
size_t header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0);
aare::logger::debug("Bytes: ", header_bytes_received);
m_header_buffer[header_bytes_received] = '\0'; // make sure we zero terminate
if (header_bytes_received < 0) {
@ -80,6 +82,7 @@ int ZmqSocketReceiver::receive_data(std::byte *data, size_t size) {
ZmqFrame ZmqSocketReceiver::receive_zmqframe() {
// receive header from zmq and parse it
ZmqHeader header = receive_header();
if (!header.data) {
// no data following header
return {header, Frame(0, 0, 0)};

View File

@ -17,7 +17,10 @@ void ZmqSocketSender::bind() {
m_context = zmq_ctx_new();
m_socket = zmq_socket(m_context, ZMQ_PUB);
size_t rc = zmq_bind(m_socket, m_endpoint.c_str());
assert(rc == 0);
if (rc != 0) {
std::string error = zmq_strerror(zmq_errno());
throw network_io::NetworkError("zmq_bind failed: " + error);
}
}
/**
@ -33,6 +36,7 @@ size_t ZmqSocketSender::send(const ZmqHeader &header, const std::byte *data, siz
// rc = zmq_send(m_socket, &header, sizeof(ZmqHeader), ZMQ_SNDMORE);
// assert(rc == sizeof(ZmqHeader));
std::string header_str = header.to_string();
aare::logger::debug("Header :", header_str);
rc = zmq_send(m_socket, header_str.c_str(), header_str.size(), ZMQ_SNDMORE);
assert(rc == header_str.size());
if (data == nullptr) {