From 1a3f803ba0b9e2383292eaefde34f2befedc9099 Mon Sep 17 00:00:00 2001 From: Babicaa Date: Tue, 14 May 2019 09:05:39 +0200 Subject: [PATCH] WIP --- lib/src/ProcessManager.cpp | 65 ++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 23 deletions(-) diff --git a/lib/src/ProcessManager.cpp b/lib/src/ProcessManager.cpp index 7901b34..d9b8de4 100644 --- a/lib/src/ProcessManager.cpp +++ b/lib/src/ProcessManager.cpp @@ -12,13 +12,25 @@ #include "ProcessManager.hpp" #include "config.hpp" #include "BufferedWriter.hpp" +#include "compression/compression.h" using namespace std; -ProcessManager::ProcessManager(WriterManager& writer_manager, ZmqReceiver& receiver, RingBuffer& ring_buffer, - const H5Format& format, uint16_t rest_port, const string& bsread_rest_address, hsize_t frames_per_file) : - writer_manager(writer_manager), receiver(receiver), ring_buffer(ring_buffer), format(format), rest_port(rest_port), - bsread_rest_address(bsread_rest_address), frames_per_file(frames_per_file) +ProcessManager::ProcessManager( + WriterManager& writer_manager, + ZmqReceiver& receiver, + RingBuffer& ring_buffer, + const H5Format& format, + uint16_t rest_port, + const string& bsread_rest_address, + hsize_t frames_per_file) : + writer_manager(writer_manager), + receiver(receiver), + ring_buffer(ring_buffer), + format(format), + rest_port(rest_port), + bsread_rest_address(bsread_rest_address), + frames_per_file(frames_per_file) { } @@ -26,31 +38,13 @@ ProcessManager::ProcessManager(WriterManager& writer_manager, ZmqReceiver& recei void ProcessManager::run_receivers(uint8_t n_receiving_threads) { - #ifdef DEBUG_OUTPUT - using namespace date; - cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ProcessManager::run_writer] Running writer"; - cout << " with n_receiving_threads " << n_receiving_threads; - cout << endl; - #endif - - boost::thread_group receivers; - for (uint8_t i=0; iframe_bytes_size, 1); + + if (max_buffer_size > ring_buffer.get_slot_size()) { + + } + + auto compressed_size = compression::compress_bitshuffle( + static_cast(frame_data), + frame_metadata->frame_bytes_size, + 1, + buffer); + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[ProcessManager::receive_zmq] Compressed image from "; + cout << frame_metadata->frame_bytes_size << " bytes to "; + cout << compressed_size << " bytes." << endl; + #endif + + frame_metadata->frame_bytes_size = compressed_size; + + ring_buffer.commit(frame_metadata); } #ifdef DEBUG_OUTPUT