diff --git a/lib/src/ProcessManager.cpp b/lib/src/ProcessManager.cpp index 4600a97..db0178e 100644 --- a/lib/src/ProcessManager.cpp +++ b/lib/src/ProcessManager.cpp @@ -149,152 +149,158 @@ void ProcessManager::receive_zmq() void ProcessManager::write_h5 (string output_file, uint64_t n_frames) { - size_t metadata_buffer_size = frames_per_file != 0 ? frames_per_file : n_frames; - auto metadata_buffer = unique_ptr(new MetadataBuffer(metadata_buffer_size, receiver.get_header_values_type())); + try { - auto writer = get_buffered_writer(output_file, n_frames, move(metadata_buffer), - frames_per_file, config::dataset_increase_step); - - writer->create_file(); - - auto raw_frames_dataset_name = config::raw_image_dataset_name; - - uint64_t last_pulse_id = 0; + size_t metadata_buffer_size = frames_per_file != 0 ? frames_per_file : n_frames; + auto metadata_buffer = unique_ptr(new MetadataBuffer(metadata_buffer_size, receiver.get_header_values_type())); - while(writer_manager.is_writing() || !ring_buffer.is_empty()) { + auto writer = get_buffered_writer(output_file, n_frames, move(metadata_buffer), + frames_per_file, config::dataset_increase_step); + + writer->create_file(); + + auto raw_frames_dataset_name = config::raw_image_dataset_name; + + uint64_t last_pulse_id = 0; - if (ring_buffer.is_empty()) { - boost::this_thread::sleep_for(boost::chrono::milliseconds(config::ring_buffer_read_retry_interval)); - continue; + while(writer_manager.is_writing() || !ring_buffer.is_empty()) { + + if (ring_buffer.is_empty()) { + boost::this_thread::sleep_for(boost::chrono::milliseconds(config::ring_buffer_read_retry_interval)); + continue; + } + + const pair< shared_ptr, char* > received_data = ring_buffer.read(); + + // NULL pointer means that the ringbuffer->read() timeouted. Faster than rising an exception. + if(!received_data.first) { + continue; + } + + // The acquisition stops when there are no more frames to write. + if (!writer_manager.write_frame()) { + break; + } + + // When using file roll over, write the file format before switching to the next file. + if (!writer->is_data_for_current_file(received_data.first->frame_index)) { + #ifdef DEBUG_OUTPUT + using namespace date; + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[ProcessManager::write_h5] Frame index " << received_data.first->frame_index; + cout << " does not belong to current file. Write format before the file will be closed." << endl; + #endif + + writer->write_metadata_to_file(); + + write_h5_format(writer->get_h5_file()); + } + + #ifdef PERF_OUTPUT + using namespace date; + auto start_time_frame = std::chrono::system_clock::now(); + #endif + + // Write image data. + writer->write_data(raw_frames_dataset_name, + received_data.first->frame_index, + received_data.second, + received_data.first->frame_shape, + received_data.first->frame_bytes_size, + received_data.first->type, + received_data.first->endianness); + + #ifdef PERF_OUTPUT + using namespace date; + using namespace std::chrono; + + auto frame_time_difference = std::chrono::system_clock::now() - start_time_frame; + auto frame_diff_ms = duration(frame_time_difference).count(); + + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[ProcessManager::write_h5] Frame index "; + cout << received_data.first->frame_index << " written in " << frame_diff_ms << " ms." << endl; + #endif + + ring_buffer.release(received_data.first->buffer_slot_index); + + #ifdef PERF_OUTPUT + using namespace date; + auto start_time_metadata = std::chrono::system_clock::now(); + #endif + + // Write image metadata if mapping specified. + auto header_values_type = receiver.get_header_values_type(); + if (header_values_type) { + + for (const auto& header_type : *header_values_type) { + + auto& name = header_type.first; + auto value = received_data.first->header_values.at(name); + + // TODO: Ugly hack until we get the start sequence in the bsread stream itself. + if (name == "pulse_id") { + if (!last_pulse_id) { + last_pulse_id = *(reinterpret_cast(value.get())); + notify_first_pulse_id(last_pulse_id); + } else { + last_pulse_id = *(reinterpret_cast(value.get())); + } + } + + writer->cache_metadata(name, received_data.first->frame_index, value.get()); + } + } + + #ifdef PERF_OUTPUT + using namespace date; + using namespace std::chrono; + + auto metadata_time_difference = std::chrono::system_clock::now() - start_time_metadata; + auto metadata_diff_ms = duration(metadata_time_difference).count(); + + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[ProcessManager::write_h5] Frame metadata index "; + cout << received_data.first->frame_index << " written in " << metadata_diff_ms << " ms." << endl; + #endif } - - const pair< shared_ptr, char* > received_data = ring_buffer.read(); - - // NULL pointer means that the ringbuffer->read() timeouted. Faster than rising an exception. - if(!received_data.first) { - continue; + + // Send the last_pulse_id only if it was set. + if (last_pulse_id) { + notify_last_pulse_id(last_pulse_id); } - - // The acquisition stops when there are no more frames to write. - if (!writer_manager.write_frame()) { - break; - } - - // When using file roll over, write the file format before switching to the next file. - if (!writer->is_data_for_current_file(received_data.first->frame_index)) { + + if (writer->is_file_open()) { #ifdef DEBUG_OUTPUT using namespace date; cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ProcessManager::write_h5] Frame index " << received_data.first->frame_index; - cout << " does not belong to current file. Write format before the file will be closed." << endl; + cout << "[ProcessManager::write] Writing file format." << endl; #endif - + writer->write_metadata_to_file(); - + write_h5_format(writer->get_h5_file()); } - - #ifdef PERF_OUTPUT - using namespace date; - auto start_time_frame = std::chrono::system_clock::now(); - #endif - - // Write image data. - writer->write_data(raw_frames_dataset_name, - received_data.first->frame_index, - received_data.second, - received_data.first->frame_shape, - received_data.first->frame_bytes_size, - received_data.first->type, - received_data.first->endianness); - - #ifdef PERF_OUTPUT - using namespace date; - using namespace std::chrono; - - auto frame_time_difference = std::chrono::system_clock::now() - start_time_frame; - auto frame_diff_ms = duration(frame_time_difference).count(); - - cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ProcessManager::write_h5] Frame index "; - cout << received_data.first->frame_index << " written in " << frame_diff_ms << " ms." << endl; - #endif - - ring_buffer.release(received_data.first->buffer_slot_index); - - #ifdef PERF_OUTPUT - using namespace date; - auto start_time_metadata = std::chrono::system_clock::now(); - #endif - - // Write image metadata if mapping specified. - auto header_values_type = receiver.get_header_values_type(); - if (header_values_type) { - - for (const auto& header_type : *header_values_type) { - - auto& name = header_type.first; - auto value = received_data.first->header_values.at(name); - - // TODO: Ugly hack until we get the start sequence in the bsread stream itself. - if (name == "pulse_id") { - if (!last_pulse_id) { - last_pulse_id = *(reinterpret_cast(value.get())); - notify_first_pulse_id(last_pulse_id); - } else { - last_pulse_id = *(reinterpret_cast(value.get())); - } - } - - writer->cache_metadata(name, received_data.first->frame_index, value.get()); - } - } - - #ifdef PERF_OUTPUT - using namespace date; - using namespace std::chrono; - - auto metadata_time_difference = std::chrono::system_clock::now() - start_time_metadata; - auto metadata_diff_ms = duration(metadata_time_difference).count(); - - cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ProcessManager::write_h5] Frame metadata index "; - cout << received_data.first->frame_index << " written in " << metadata_diff_ms << " ms." << endl; - #endif - } - - // Send the last_pulse_id only if it was set. - if (last_pulse_id) { - notify_last_pulse_id(last_pulse_id); - } - - if (writer->is_file_open()) { + #ifdef DEBUG_OUTPUT using namespace date; cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ProcessManager::write] Writing file format." << endl; + cout << "[ProcessManager::write] Closing file " << writer_manager.get_output_file() << endl; #endif - - writer->write_metadata_to_file(); - write_h5_format(writer->get_h5_file()); + writer->close_file(); + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[ProcessManager::write] Writer thread stopped." << endl; + #endif + + writer_manager.writing_completed(); + + } catch (const exception& ex) { + writer_manager.writing_error(ex.what()); } - - #ifdef DEBUG_OUTPUT - using namespace date; - cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ProcessManager::write] Closing file " << writer_manager.get_output_file() << endl; - #endif - - writer->close_file(); - - #ifdef DEBUG_OUTPUT - using namespace date; - cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ProcessManager::write] Writer thread stopped." << endl; - #endif - - writer_manager.writing_completed(); } void ProcessManager::write_h5_format(H5::H5File& file) { diff --git a/lib/src/WriterManager.cpp b/lib/src/WriterManager.cpp index 03858e7..86ea492 100644 --- a/lib/src/WriterManager.cpp +++ b/lib/src/WriterManager.cpp @@ -105,7 +105,7 @@ void WriterManager::start(const unordered_map& new_parameter stringstream output_message; using namespace date; output_message << "[" << std::chrono::system_clock::now() << "]"; - output_message << "[WriterManager::sttart] Starting with parameters: "; + output_message << "[WriterManager::start] Starting with parameters: "; for (const auto& parameter : new_parameters) { auto& parameter_name = parameter.first; @@ -154,4 +154,29 @@ bool WriterManager::write_frame() { void WriterManager::writing_completed() { writing_flag = false; + + #ifdef DEBUG_OUTPUT + stringstream output_message; + using namespace date; + output_message << "[" << std::chrono::system_clock::now() << "]"; + output_message << "[WriterManager::writing_completed] Writing has finished."; + output_message << endl; + #endif + + //TODO: Send this event somewhere somehow? +} + +void WriterManager::writing_error(string error_message) { + writing_flag = false; + + + #ifdef DEBUG_OUTPUT + stringstream output_message; + using namespace date; + output_message << "[" << std::chrono::system_clock::now() << "]"; + output_message << "[WriterManager::writing_error] Error while writing: "; + output_message << error_message << endl; + #endif + + // TODO: Send this error somewhere? } diff --git a/lib/src/WriterManager.hpp b/lib/src/WriterManager.hpp index 6151af3..29a8edd 100644 --- a/lib/src/WriterManager.hpp +++ b/lib/src/WriterManager.hpp @@ -59,8 +59,11 @@ class WriterManager bool write_frame(); // True if the writing should continue. bool is_writing() const; + + // Signal that the writing has completed. void writing_completed(); + void writing_error(std::string error_message); }; #endif