Record lost frames in manager

This commit is contained in:
2018-01-31 15:24:41 +01:00
parent af5b08bc00
commit 0952aeaa49
3 changed files with 26 additions and 9 deletions
+12 -3
View File
@@ -6,7 +6,8 @@
using namespace std;
WriterManager::WriterManager(const map<string, DATA_TYPE>* parameters_type, uint64_t n_frames):
parameters_type(parameters_type), n_frames(n_frames), running_flag(true), killed_flag(false), n_received_frames(0), n_written_frames(0)
parameters_type(parameters_type), n_frames(n_frames), running_flag(true), killed_flag(false),
n_received_frames(0), n_written_frames(0), n_lost_frames(0)
{
#ifdef DEBUG_OUTPUT
cout << "[WriterManager::WriterManager] Writer manager for n_frames " << n_frames << endl;
@@ -50,6 +51,7 @@ map<string, uint64_t> WriterManager::get_statistics()
{
map<string, uint64_t> result = {{"n_received_frames", n_received_frames.load()},
{"n_written_frames", n_written_frames.load()},
{"n_lost_frames", n_lost_frames.load()},
{"total_expected_frames", n_frames}};
return result;
@@ -87,7 +89,8 @@ void WriterManager::set_parameters(const map<string, boost::any>& new_parameters
#endif
}
const map<string, DATA_TYPE>* WriterManager::get_parameters_type() {
const map<string, DATA_TYPE>* WriterManager::get_parameters_type()
{
return parameters_type;
}
@@ -116,7 +119,13 @@ void WriterManager::written_frame(size_t frame_index)
n_written_frames++;
}
bool WriterManager::are_all_parameters_set() {
void WriterManager::lost_frame(size_t frame_index)
{
n_lost_frames++;
}
bool WriterManager::are_all_parameters_set()
{
lock_guard<mutex> lock(parameters_mutex);
for (const auto& parameter : *parameters_type) {
+2
View File
@@ -22,6 +22,7 @@ class WriterManager
std::atomic_bool killed_flag;
std::atomic<uint64_t> n_received_frames;
std::atomic<uint64_t> n_written_frames;
std::atomic<uint64_t> n_lost_frames;
public:
WriterManager(const std::map<std::string, DATA_TYPE>* parameters_type, uint64_t n_frames=0);
@@ -39,6 +40,7 @@ class WriterManager
std::map<std::string, uint64_t> get_statistics();
void received_frame(size_t frame_index);
void written_frame(size_t frame_index);
void lost_frame(size_t frame_index);
};
#endif
+12 -6
View File
@@ -30,7 +30,7 @@ void write_h5(WriterManager& manager, RingBuffer& ring_buffer, string output_fil
continue;
}
pair<FrameMetadata, char*> received_data = ring_buffer.read();
const pair<FrameMetadata, char*> received_data = ring_buffer.read();
// NULL pointer means that the ringbuffer->read() timeouted. Faster than rising an exception.
if(!received_data.second) {
@@ -61,7 +61,7 @@ void write_h5(WriterManager& manager, RingBuffer& ring_buffer, string output_fil
// Need to check again if we have all parameters to write down the format.
if (manager.are_all_parameters_set()) {
auto parameters = manager.get_parameters();
const auto parameters = manager.get_parameters();
// Even if we can't write the format, lets try to preserve the data.
try {
@@ -116,9 +116,9 @@ void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, string connect
frame_metadata.frame_index = json_header.get<uint64_t>("frame");
uint8_t index = 0;
for (auto item : json_header.get_child("shape")) {
for (const auto& item : json_header.get_child("shape")) {
frame_metadata.frame_shape[index] = item.second.get_value<size_t>();
index++;
++index;
}
// Array 1.0 specified little endian as the default encoding.
@@ -127,8 +127,14 @@ void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, string connect
frame_metadata.type = json_header.get<string>("type");
// Get the message data.
// TODO: Check if the message was received, otherwise log the exception.
receiver.recv(&message_data);
if (!receiver.recv(&message_data)) {
cout << "[h5_zmq_writer::receive_zmq] ERROR: Error while reading from ZMQ. Frame index " << frame_metadata.frame_index << " lost.";
cout << " Trying to continue with the next frame." << endl;
manager.lost_frame(frame_metadata.frame_index);
continue;
}
frame_metadata.frame_bytes_size = message_data.size();