diff --git a/src/h5_zmq_writer.cpp b/src/h5_zmq_writer.cpp index ba63492..1fbcff2 100644 --- a/src/h5_zmq_writer.cpp +++ b/src/h5_zmq_writer.cpp @@ -1,20 +1,22 @@ #include #include -#include "rapidjson/document.h" #include -#include "RingBuffer.hpp" -#include "H5ChunkedWriter.hpp" +#include +#include "rapidjson/document.h" + #include "WriterManager.hpp" +#include "H5ChunkedWriter.hpp" +#include "RingBuffer.hpp" using namespace std; -void write(RingBuffer *ring_buffer, string output_file, WriterManager &manager) +void write(RingBuffer *ring_buffer, string output_file, WriterManager *manager) { string dataset_name = "data"; HDF5ChunkedWriter writer(output_file, dataset_name); // Run until the running flag is set or the ring_buffer is empty. - while(manager.is_running() || !ring_buffer->is_empty()) { + while(manager->is_running() || !ring_buffer->is_empty()) { pair received_data = ring_buffer->read(); writer.write_data(received_data.first.frame_index, @@ -24,7 +26,7 @@ void write(RingBuffer *ring_buffer, string output_file, WriterManager &manager) ring_buffer->release(received_data.first.buffer_slot_index); - manager.written_frame(received_data.first.frame_index); + manager->written_frame(received_data.first.frame_index); } writer.close_file(); @@ -35,7 +37,7 @@ void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_i WriterManager manager(n_images); RingBuffer ring_buffer(n_slots); - thread writer_thread(write, &ring_buffer, "output.h5", &running_flag); + thread writer_thread(write, &ring_buffer, "output.h5", &manager); zmq::context_t context(n_io_threads); zmq::socket_t receiver(context, ZMQ_PULL); @@ -46,7 +48,7 @@ void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_i rapidjson::Document header_parser; - for (manager.is_running()) { + while (manager.is_running()) { // Get the message header. receiver.recv(&message_data); @@ -65,11 +67,6 @@ void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_i receiver.recv(&message_data); frame_metadata.frame_bytes_size = message_data.size(); - if (!ring_buffer_initialized) { - ring_buffer.initialize(frame_metadata.frame_bytes_size); - ring_buffer_initialized = true; - } - // Commit the frame to the buffer. ring_buffer.write(frame_metadata, static_cast(message_data.data())); @@ -83,6 +80,8 @@ int main (int argc, char *argv[]) { if (argc != 3) { cout << "Usage: h5_zmq_writer [connection_address] [n_images]" << endl; + cout << "\tconnection_address: Address to connect to the stream (PULL). Example: tcp://127.0.0.1:40000" << endl; + cout << "\tn_images: Number of images to acquire. 0 for infinity (untill STOP is called)." << endl; exit(-1); }