From 32dba2320aaf8605d5a177dc0f8566b4f8358588 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 6 Apr 2020 14:44:41 +0200 Subject: [PATCH] Partial implementation of write thread --- core-writer/src/module/H5WriteModule.cpp | 158 +++++++++++++++++++++++ 1 file changed, 158 insertions(+) diff --git a/core-writer/src/module/H5WriteModule.cpp b/core-writer/src/module/H5WriteModule.cpp index 5829926..bcfdcde 100644 --- a/core-writer/src/module/H5WriteModule.cpp +++ b/core-writer/src/module/H5WriteModule.cpp @@ -1,5 +1,7 @@ #include "H5WriteModule.hpp" #include +#include +#include "BufferedWriter.hpp" using namespace std; @@ -68,3 +70,159 @@ void H5WriteModule::stop_writing() writing_thread_.join(); } } + +void H5WriteModule::write_thread( + const std::string& output_file, + const int n_frames, + const int user_id) +{ + // TODO: Take into account file rollover. + size_t metadata_buffer_length = n_frames; + MetadataBuffer metadata_buffer(metadata_buffer_length, header_values_); + + BufferedWriter writer(output_file, n_frames, metadata_buffer); + writer.close_file(); + + auto raw_frames_dataset_name = config::raw_image_dataset_name; + + try { + + writer.create_file(); + uint64_t last_pulse_id = 0; + + while(is_writing_.load(memory_order_relaxed)) { + + auto received_data = ring_buffer_.read(); + + // .first is nullptr if ringbuffer is empty. + if(received_data.first == nullptr) { + this_thread::sleep_for(chrono::milliseconds( + config::ring_buffer_read_retry_interval)); + continue; + } + + // Write file format before rolling to next file. + if (!writer.is_data_for_current_file( + received_data.first->frame_index)) { + + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + + cout << "[" << system_clock::now() << "]"; + cout << "[H5WriteModule::write_thread] Frame index "; + cout << received_data.first->frame_index; + cout << " does not belong to current file. "; + cout << " Write format before switching file." << endl; + #endif + + writer.write_metadata_to_file(); + write_h5_format(writer.get_h5_file()); + } + + #ifdef PERF_OUTPUT + using namespace date; + using namespace chrono; + + auto start_time_frame = system_clock::now(); + #endif + + // Write image data. + writer.write_data( + raw_frames_dataset_name, + received_data.first->frame_index, + received_data.second, + received_data.first->frame_shape, + received_data.first->frame_bytes_size, + received_data.first->type, + received_data.first->endianness); + + #ifdef PERF_OUTPUT + using namespace date; + using namespace chrono; + + auto frame_time_difference = system_clock::now() - start_time_frame; + + auto frame_diff_ms = + duration(frame_time_difference).count(); + + cout << "[" << system_clock::now() << "]"; + cout << "[H5WriteModule::write_thread] Frame index "; + cout << received_data.first->frame_index; + cout << " written in " << frame_diff_ms << " ms." << endl; + #endif + + ring_buffer_.release(received_data.first->buffer_slot_index); + + #ifdef PERF_OUTPUT + using namespace date; + using namespace chrono; + + auto start_time_metadata = system_clock::now(); + #endif + + for (const auto& header_type : header_values_) { + + auto& name = header_type.first; + auto value = received_data.first->header_values.at(name); + + writer.cache_metadata( + name, + received_data.first->frame_index, + value.get()); + } + + #ifdef PERF_OUTPUT + using namespace date; + using namespace chrono; + + auto metadata_time_difference = system_clock::now() - start_time_metadata; + auto metadata_diff_ms = duration(metadata_time_difference).count(); + + cout << "[" << system_clock::now() << "]"; + cout << "[H5WriteModule::write_thread] Frame metadata index "; + cout << received_data.first->frame_index << " written in " << metadata_diff_ms << " ms." << endl; + #endif + } + + if (writer.is_file_open()) { + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + + cout << "[" << system_clock::now() << "]"; + cout << "[H5WriteModule::write_thread]"; + cout << " Writing file format." << endl; + #endif + + writer.write_metadata_to_file(); + + write_h5_format(writer.get_h5_file()); + } + + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + + cout << "[" << system_clock::now() << "]"; + cout << "[H5WriteModule::write_thread]"; + cout << " Closing file " << output_file << endl; + #endif + + writer.close_file(); + + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + + cout << "[" << system_clock::now() << "]"; + cout << "[H5WriteModule::write_thread]"; + cout << " Writer thread stopped." << endl; + #endif + + writing_completed(); + + } catch (const exception& ex) { + writing_error(ex.what()); + } +}