diff --git a/src/WriterManager.cpp b/src/WriterManager.cpp index 502a263..82b0f06 100644 --- a/src/WriterManager.cpp +++ b/src/WriterManager.cpp @@ -6,7 +6,8 @@ using namespace std; WriterManager::WriterManager(const map* parameters_type, uint64_t n_frames): - parameters_type(parameters_type), n_frames(n_frames), running_flag(true), killed_flag(false), n_received_frames(0), n_written_frames(0) + parameters_type(parameters_type), n_frames(n_frames), running_flag(true), killed_flag(false), + n_received_frames(0), n_written_frames(0), n_lost_frames(0) { #ifdef DEBUG_OUTPUT cout << "[WriterManager::WriterManager] Writer manager for n_frames " << n_frames << endl; @@ -50,6 +51,7 @@ map WriterManager::get_statistics() { map result = {{"n_received_frames", n_received_frames.load()}, {"n_written_frames", n_written_frames.load()}, + {"n_lost_frames", n_lost_frames.load()}, {"total_expected_frames", n_frames}}; return result; @@ -87,7 +89,8 @@ void WriterManager::set_parameters(const map& new_parameters #endif } -const map* WriterManager::get_parameters_type() { +const map* WriterManager::get_parameters_type() +{ return parameters_type; } @@ -116,7 +119,13 @@ void WriterManager::written_frame(size_t frame_index) n_written_frames++; } -bool WriterManager::are_all_parameters_set() { +void WriterManager::lost_frame(size_t frame_index) +{ + n_lost_frames++; +} + +bool WriterManager::are_all_parameters_set() +{ lock_guard lock(parameters_mutex); for (const auto& parameter : *parameters_type) { diff --git a/src/WriterManager.hpp b/src/WriterManager.hpp index c761ec7..0c745d4 100644 --- a/src/WriterManager.hpp +++ b/src/WriterManager.hpp @@ -22,6 +22,7 @@ class WriterManager std::atomic_bool killed_flag; std::atomic n_received_frames; std::atomic n_written_frames; + std::atomic n_lost_frames; public: WriterManager(const std::map* parameters_type, uint64_t n_frames=0); @@ -39,6 +40,7 @@ class WriterManager std::map get_statistics(); void received_frame(size_t frame_index); void written_frame(size_t frame_index); + void lost_frame(size_t frame_index); }; #endif \ No newline at end of file diff --git a/src/h5_zmq_writer.cpp b/src/h5_zmq_writer.cpp index 640b6d0..f03cbec 100644 --- a/src/h5_zmq_writer.cpp +++ b/src/h5_zmq_writer.cpp @@ -30,7 +30,7 @@ void write_h5(WriterManager& manager, RingBuffer& ring_buffer, string output_fil continue; } - pair received_data = ring_buffer.read(); + const pair received_data = ring_buffer.read(); // NULL pointer means that the ringbuffer->read() timeouted. Faster than rising an exception. if(!received_data.second) { @@ -61,7 +61,7 @@ void write_h5(WriterManager& manager, RingBuffer& ring_buffer, string output_fil // Need to check again if we have all parameters to write down the format. if (manager.are_all_parameters_set()) { - auto parameters = manager.get_parameters(); + const auto parameters = manager.get_parameters(); // Even if we can't write the format, lets try to preserve the data. try { @@ -116,9 +116,9 @@ void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, string connect frame_metadata.frame_index = json_header.get("frame"); uint8_t index = 0; - for (auto item : json_header.get_child("shape")) { + for (const auto& item : json_header.get_child("shape")) { frame_metadata.frame_shape[index] = item.second.get_value(); - index++; + ++index; } // Array 1.0 specified little endian as the default encoding. @@ -127,8 +127,14 @@ void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, string connect frame_metadata.type = json_header.get("type"); // Get the message data. - // TODO: Check if the message was received, otherwise log the exception. - receiver.recv(&message_data); + if (!receiver.recv(&message_data)) { + cout << "[h5_zmq_writer::receive_zmq] ERROR: Error while reading from ZMQ. Frame index " << frame_metadata.frame_index << " lost."; + cout << " Trying to continue with the next frame." << endl; + + manager.lost_frame(frame_metadata.frame_index); + + continue; + } frame_metadata.frame_bytes_size = message_data.size();