From 6d8e113e7b30b398782ab153a002b8fba3f7b649 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 31 Mar 2020 17:45:09 +0200 Subject: [PATCH] First implementation of receiver module --- core-writer/include/ZmqRecvModule.hpp | 33 ++++++++ core-writer/src/ZmqRecvModule.cpp | 104 ++++++++++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 core-writer/include/ZmqRecvModule.hpp create mode 100644 core-writer/src/ZmqRecvModule.cpp diff --git a/core-writer/include/ZmqRecvModule.hpp b/core-writer/include/ZmqRecvModule.hpp new file mode 100644 index 0000000..fa22573 --- /dev/null +++ b/core-writer/include/ZmqRecvModule.hpp @@ -0,0 +1,33 @@ +#ifndef ZMQRECVMODULE_H +#define ZMQRECVMODULE_H + +#include "ZmqReceiver.hpp" +#include "RingBuffer.hpp" + +class ZmqRecvModule +{ + typedef std::unordered_map header_map; + + RingBuffer& ring_buffer_; + const header_map& header_values_; + const std::atomic_bool& is_writing_; + +protected: + void receive_thread( + const std::string& connect_address, + const uint8_t n_receiving_threads); + +public: + ZmqRecvModule( + RingBuffer& ring_buffer, + const header_map& header_values, + const std::atomic_bool& is_writing); + + void start( + const std::string& connect_address, + const uint8_t n_receiving_thread); + + void stop(); +}; + +#endif \ No newline at end of file diff --git a/core-writer/src/ZmqRecvModule.cpp b/core-writer/src/ZmqRecvModule.cpp new file mode 100644 index 0000000..747d362 --- /dev/null +++ b/core-writer/src/ZmqRecvModule.cpp @@ -0,0 +1,104 @@ + +#include +#include +#include "ZmqRecvModule.hpp" + +using namespace std; + +ZmqRecvModule::ZmqRecvModule( + RingBuffer &ringBuffer, + const header_map &header_values, + const std::atomic_bool& is_writing) : + ring_buffer_(ring_buffer_), + header_values_(header_values), + is_writing_(is_writing) +{ + +} + +void ZmqRecvModule::start( + const string& connect_address, + const uint8_t n_receiving_thread) +{ + +} + +void ZmqRecvModule::stop() +{ + +} + +void ZmqRecvModule::receive_thread( + const string& connect_address, + const uint8_t n_receiving_threads) +{ + ZmqReceiver receiver( + connect_address, + config::zmq_n_io_threads, + config::zmq_receive_timeout, + header_values_); + + receiver.connect(); + + while (true) { + + auto frame = receiver.receive(); + + // If no message, first and second = nullptr + if (frame.first == nullptr || + !is_writing_.load(memory_order::memory_order_relaxed)) { + continue; + } + + auto frame_metadata = frame.first; + auto frame_data = frame.second; + + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[ProcessManager::receive_zmq]"; + cout << " Processing FrameMetadata with frame_index "; + cout << frame_metadata->frame_index; + cout << " and frame_shape [" << frame_metadata->frame_shape[0]; + cout << ", " << frame_metadata->frame_shape[1] << "]"; + cout << " and endianness " << frame_metadata->endianness; + cout << " and type " << frame_metadata->type; + cout << " and frame_bytes_size "; + cout << frame_metadata->frame_bytes_size << "." << endl; + #endif + + char* buffer = ring_buffer_.reserve(frame_metadata); + + size_t max_buffer_size = compression::get_bitshuffle_max_buffer_size( + frame_metadata->frame_bytes_size, 1); + + if (max_buffer_size > ring_buffer.get_slot_size()) { + + } + + auto compressed_size = compression::compress_bitshuffle( + static_cast(frame_data), + frame_metadata->frame_bytes_size, + 1, + buffer); + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[ProcessManager::receive_zmq] Compressed image from "; + cout << frame_metadata->frame_bytes_size << " bytes to "; + cout << compressed_size << " bytes." << endl; + #endif + + frame_metadata->frame_bytes_size = compressed_size; + + ring_buffer_.commit(frame_metadata); + } + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[ProcessManager::receive_zmq] Receiver thread stopped." << endl; + #endif +} \ No newline at end of file