From 191ea4eaf5bd2f3ee066c39a9d9ac9eae20a3008 Mon Sep 17 00:00:00 2001 From: Babicaa Date: Fri, 26 Apr 2019 10:50:29 +0200 Subject: [PATCH] Merge PSIWriter with WriterManager It does not make sense to have 2 separate entities for this. The writer manager only responsibility was to control the PSIWriter. --- lib/src/PSIWriter.cpp | 222 ------------------------------------------ lib/src/PSIWriter.hpp | 48 --------- 2 files changed, 270 deletions(-) delete mode 100644 lib/src/PSIWriter.cpp delete mode 100644 lib/src/PSIWriter.hpp diff --git a/lib/src/PSIWriter.cpp b/lib/src/PSIWriter.cpp deleted file mode 100644 index 237e9fa..0000000 --- a/lib/src/PSIWriter.cpp +++ /dev/null @@ -1,222 +0,0 @@ -#include "PSIWriter.hpp" -#include "config.hpp" -#include "BufferedWriter.hpp" - -using namespace std; - - -void PSIWriter::run_writer(WriterManager& writer_manager, - string output_file, - uint64_t n_frames) -{ - writing_thread = boost::thread(&PSIWriter::write_h5, - this, - boost::ref(writer_manager), - output_file, - n_frames); -} - -void PSIWriter::join_writer(){ - writing_thread.join(); -} - -void PSIWriter::write_h5(WriterManager& writer_manager, - string output_file, - uint64_t n_frames) -{ - try { - - size_t metadata_buffer_size = - frames_per_file != 0 ? frames_per_file : n_frames; - - auto metadata_buffer = unique_ptr( - new MetadataBuffer(metadata_buffer_size, - header_values_type)); - - auto writer = get_buffered_writer( - output_file, - n_frames, - move(metadata_buffer), - frames_per_file, - config::dataset_increase_step); - - writer->create_file(); - - auto raw_frames_dataset_name = config::raw_image_dataset_name; - - uint64_t last_pulse_id = 0; - - while(writer_manager.is_writing() || !ring_buffer.is_empty()) { - - if (ring_buffer.is_empty()) { - boost::this_thread::sleep_for( - boost::chrono::milliseconds( - config::ring_buffer_read_retry_interval)); - continue; - } - - const pair< shared_ptr, char* > received_data = - ring_buffer.read(); - - // NULL pointer means that the ringbuffer->read() timeouted. - if(!received_data.first) { - continue; - } - - // The acquisition stops when there are no more frames to write. - if (!writer_manager.write_frame()) { - break; - } - - // Write file format before rolling to next file. - if (!writer->is_data_for_current_file( - received_data.first->frame_index)) { - - #ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[PSIWriter::write_h5] Frame index "; - cout << received_data.first->frame_index; - cout << " does not belong to current file. "; - cout << " Write format before switching file." << endl; - #endif - - writer->write_metadata_to_file(); - - write_h5_format(writer->get_h5_file()); - } - - #ifdef PERF_OUTPUT - using namespace date; - using namespace chrono; - - auto start_time_frame = system_clock::now(); - #endif - - // Write image data. - writer->write_data(raw_frames_dataset_name, - received_data.first->frame_index, - received_data.second, - received_data.first->frame_shape, - received_data.first->frame_bytes_size, - received_data.first->type, - received_data.first->endianness); - - #ifdef PERF_OUTPUT - using namespace date; - using namespace chrono; - - auto frame_time_difference = system_clock::now() - start_time_frame; - - auto frame_diff_ms = - duration(frame_time_difference).count(); - - cout << "[" << system_clock::now() << "]"; - cout << "[PSIWriter::write_h5] Frame index "; - cout << received_data.first->frame_index; - cout << " written in " << frame_diff_ms << " ms." << endl; - #endif - - ring_buffer.release(received_data.first->buffer_slot_index); - - #ifdef PERF_OUTPUT - using namespace date; - using namespace chrono; - - auto start_time_metadata = system_clock::now(); - #endif - - // Write image metadata if mapping specified. - if (header_values_type) { - - for (const auto& header_type : *header_values_type) { - - auto& name = header_type.first; - auto value = received_data.first->header_values.at(name); - - // TODO: Ugly hack until we get the start sequence in bsread. - if (name == "pulse_id") { - if (!last_pulse_id) { - last_pulse_id = *(reinterpret_cast(value.get())); - //notify_first_pulse_id(last_pulse_id); - } else { - last_pulse_id = *(reinterpret_cast(value.get())); - } - } - - writer->cache_metadata(name, received_data.first->frame_index, value.get()); - } - } - - #ifdef PERF_OUTPUT - using namespace date; - using namespace chrono; - - auto metadata_time_difference = system_clock::now() - start_time_metadata; - auto metadata_diff_ms = duration(metadata_time_difference).count(); - - cout << "[" << system_clock::now() << "]"; - cout << "[ProcessManager::write_h5] Frame metadata index "; - cout << received_data.first->frame_index << " written in " << metadata_diff_ms << " ms." << endl; - #endif - } - - // Send the last_pulse_id only if it was set. - if (last_pulse_id) { - //notify_last_pulse_id(last_pulse_id); - } - - if (writer->is_file_open()) { - #ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[ProcessManager::write] Writing file format." << endl; - #endif - - writer->write_metadata_to_file(); - - write_h5_format(writer->get_h5_file()); - } - - #ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[ProcessManager::write] Closing file " << writer_manager.get_output_file() << endl; - #endif - - writer->close_file(); - - #ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[ProcessManager::write] Writer thread stopped." << endl; - #endif - - writer_manager.writing_completed(); - - } catch (const exception& ex) { - writer_manager.writing_error(ex.what()); - } -} - -void PSIWriter::write_h5_format(H5::H5File& file) { - - try { - H5FormatUtils::write_format(file, format, {}); - } catch (const runtime_error& ex) { - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[ProcessManager::write_h5_format] Error while"; - cout << " trying to write file format: "<< ex.what() << endl; - } -} diff --git a/lib/src/PSIWriter.hpp b/lib/src/PSIWriter.hpp deleted file mode 100644 index da9a0b8..0000000 --- a/lib/src/PSIWriter.hpp +++ /dev/null @@ -1,48 +0,0 @@ - -#ifndef PSIWRITER_H -#define PSIWRITER_H - -#include -#include "date.h" -#include - -#include "WriterManager.hpp" -#include "H5Format.hpp" -#include "RingBuffer.hpp" -#include "MetadataBuffer.hpp" -#include "ZmqReceiver.hpp" - - -class PSIWriter -{ - - RingBuffer& ring_buffer; - const H5Format& format; - hsize_t frames_per_file; - - typedef std::unordered_map header_map; - std::shared_ptr header_values_type = NULL; - - protected: - boost::thread writing_thread; - - void write_h5(WriterManager& writer_manager, - std::string output_file, - uint64_t n_frames); - - void write_h5_format(H5::H5File& file); - - public: - PSIWriter(RingBuffer& ring_buffer, - const H5Format& format, - std::shared_ptr header_values_type, - hsize_t frames_per_file=0); - - void run_writer(WriterManager& writer_manager, - std::string output_file, - uint64_t n_frames); - - void join_writer(); -}; - -#endif