diff --git a/lib/src/WriterManager.cpp b/lib/src/WriterManager.cpp index 28fcae2..f3158a3 100644 --- a/lib/src/WriterManager.cpp +++ b/lib/src/WriterManager.cpp @@ -64,13 +64,19 @@ void writer_utils::create_destination_folder(const string& output_file) } WriterManager::WriterManager( + ZmqReceiver& receiver, RingBuffer& ring_buffer, const H5Format& format, std::shared_ptr header_values_type, + uint8_t n_receiving_threads, hsize_t frames_per_file): + receiver(receiver), ring_buffer(ring_buffer), format(format), header_values_type(header_values_type), + n_receiving_threads(n_receiving_threads), + frames_per_file(frames_per_file), + receiving_threads(frames_per_file), logs(10) { running_flag = true; @@ -86,23 +92,33 @@ WriterManager::WriterManager( using namespace chrono; cout << "[" << system_clock::now() << "]"; - cout << "[WriterManager::WriterManager] Writer manager initialized." << endl; + cout << "[WriterManager::WriterManager]" + cout << " Writer manager initialized." << endl; #endif } -WriterManager::~WriterManager(){} +WriterManager::~WriterManager() +{ + stop(); +} void WriterManager::stop() { #ifdef DEBUG_OUTPUT using namespace date; - cout << "[" << std::chrono::system_clock::now() << "]"; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; cout << "[WriterManager::stop] Stopping the writer." << endl; #endif running_flag = false; + + for (auto& thread_ptr:receiving_threads) { + thread_ptr->join(); + } + receiving_threads.clear(); - + writing_thread.join(); } string WriterManager::get_status() @@ -154,10 +170,16 @@ void WriterManager::start(const string output_file, this, output_file, n_frames); + + for (uint8_t i_rec=0; i_rec < n_receiving_threads; i_rec++) { + receiving_threads.push_back(unique_ptr( + new boost::thread(&WriterManager::receive_zmq, this) + )); + } - //TODO: Sent this event somewhere? } + bool WriterManager::is_running() const { return running_flag.load(); @@ -168,7 +190,7 @@ bool WriterManager::is_writing() const return writing_flag.load(); } -bool WriterManager::receive_frame() { +bool WriterManager::should_receive_frame() { if (n_frames_to_receive > 0) { return (n_frames_to_receive.fetch_sub(1) >= 0); } @@ -176,7 +198,7 @@ bool WriterManager::receive_frame() { return false; } -bool WriterManager::write_frame() { +bool WriterManager::should_write_frame() { if (n_frames_to_write > 0) { return (n_frames_to_write.fetch_sub(1) >= 0); } @@ -191,7 +213,8 @@ void WriterManager::writing_completed() { using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; - cout << "[WriterManager::writing_completed] Writing has finished." << endl; + cout << "[WriterManager::writing_completed]" + cout << " Writing has finished." << endl; #endif //TODO: Send this event somewhere somehow? @@ -257,7 +280,7 @@ void WriterManager::write_h5(const string output_file, const uint64_t n_frames) } // The acquisition stops when there are no more frames to write. - if (!write_frame()) { + if (!should_write_frame()) { break; } @@ -400,3 +423,68 @@ void WriterManager::write_h5(const string output_file, const uint64_t n_frames) } } +void WriterManager::receive_zmq() +{ + receiver.connect(); + + while (is_running()) { + + auto frame = receiver.receive(); + + // If receive timeout, both pointers are NULL. + if (!frame.first || !receive_frame()){ + continue; + } + + auto frame_metadata = frame.first; + auto frame_data = frame.second; + + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[ProcessManager::receive_zmq] Processing FrameMetadata"; + cout << " with frame_index " << frame_metadata->frame_index; + cout << " and frame_shape [" << frame_metadata->frame_shape[0]; + cout << ", " << frame_metadata->frame_shape[1] << "]"; + cout << " and endianness " << frame_metadata->endianness; + cout << " and type " << frame_metadata->type; + cout << " and frame_bytes_size "; + cout << frame_metadata->frame_bytes_size << "." << endl; + #endif + + char* buffer = ring_buffer.reserve(frame_metadata); + + size_t max_buffer_size = compression::get_bitshuffle_max_buffer_size( + frame_metadata->frame_bytes_size, 1); + + if (max_buffer_size > ring_buffer.get_slot_size()) { + + } + + auto compressed_size = compression::compress_bitshuffle( + static_cast(frame_data), + frame_metadata->frame_bytes_size, + 1, + buffer); + + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[WriterManager::receive_zmq] Compressed image from "; + cout << frame_metadata->frame_bytes_size << " bytes to "; + cout << compressed_size << " bytes." << endl; + #endif + + frame_metadata->frame_bytes_size = compressed_size; + + ring_buffer.commit(frame_metadata); + } + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[WriterManager::receive_zmq] Receiver thread stopped." << endl; + #endif +} diff --git a/lib/src/WriterManager.hpp b/lib/src/WriterManager.hpp index 76de00c..b21d8d0 100644 --- a/lib/src/WriterManager.hpp +++ b/lib/src/WriterManager.hpp @@ -10,10 +10,13 @@ #include #include "date.h" #include +#include +#include #include "ZmqReceiver.hpp" #include "RingBuffer.hpp" #include "H5Format.hpp" +#include "compression/compression.h" namespace writer_utils { void set_process_id(int user_id); @@ -43,12 +46,16 @@ class WriterManager std::atomic n_frames_to_receive; std::atomic n_frames_to_write; - protected: RingBuffer& ring_buffer; + + ZmqReceiver& receiver; + uint8_t n_receiving_threads; + const H5Format& format; hsize_t frames_per_file; + std::list> receiving_threads; boost::thread writing_thread; typedef std::unordered_map header_map; @@ -56,15 +63,18 @@ class WriterManager const std::deque logs; - void write_h5(std::string output_file, - uint64_t n_frames); + void receive_zmq(); + void write_h5(std::string output_file, uint64_t n_frames); + void write_h5_format(H5::H5File& file); - public: - WriterManager(RingBuffer& ring_buffer, + + WriterManager(ZmqReceiver& receiver, + RingBuffer& ring_buffer, const H5Format& format, std::shared_ptr header_values_type, + uint8_t n_receiving_threads, hsize_t frames_per_file=0); virtual ~WriterManager(); @@ -75,17 +85,13 @@ class WriterManager std::string get_status(); std::unordered_map get_statistics() const; - // Return True if the frame is to be received, False if is to be dropped. bool receive_frame(); - // True if the process should continue. bool is_running() const; bool is_writing() const; - // Return True if the frame is to be written, False otherwise. - bool write_frame(); - // True if the writing should continue. + bool should_write_frame(); + bool should_receive_frame(); - // Signal that the writing has completed. void writing_completed(); void writing_error(std::string error); };