diff --git a/src/h5_zmq_writer.cpp b/src/h5_zmq_writer.cpp index fda3ef7..ba63492 100644 --- a/src/h5_zmq_writer.cpp +++ b/src/h5_zmq_writer.cpp @@ -1,20 +1,20 @@ #include #include -#include #include "rapidjson/document.h" #include #include "RingBuffer.hpp" #include "H5ChunkedWriter.hpp" +#include "WriterManager.hpp" using namespace std; -void write(RingBuffer *ring_buffer, string output_file, atomic_bool* running_flag) +void write(RingBuffer *ring_buffer, string output_file, WriterManager &manager) { string dataset_name = "data"; HDF5ChunkedWriter writer(output_file, dataset_name); // Run until the running flag is set or the ring_buffer is empty. - while(*running_flag || !ring_buffer->is_empty()) { + while(manager.is_running() || !ring_buffer->is_empty()) { pair received_data = ring_buffer->read(); writer.write_data(received_data.first.frame_index, @@ -23,32 +23,30 @@ void write(RingBuffer *ring_buffer, string output_file, atomic_bool* running_fla received_data.second); ring_buffer->release(received_data.first.buffer_slot_index); + + manager.written_frame(received_data.first.frame_index); } writer.close_file(); } -void receive(int num_io_threads, string connect_address, uint64_t n_images, int n_slots=100) +void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_io_threads=1) { - bool ring_buffer_initialized = false; + WriterManager manager(n_images); RingBuffer ring_buffer(n_slots); - atomic_bool running_flag(true); thread writer_thread(write, &ring_buffer, "output.h5", &running_flag); - zmq::context_t context(num_io_threads); + zmq::context_t context(n_io_threads); zmq::socket_t receiver(context, ZMQ_PULL); receiver.connect(connect_address); - - // int has_more = 0; - // size_t has_more_size = sizeof(has_more); zmq::message_t message_data; FrameMetadata frame_metadata; rapidjson::Document header_parser; - for (uint64_t i=0; i(message_data.data())); - } - running_flag = false; + manager.received_frame(frame_metadata.frame_index); + } writer_thread.join(); } int main (int argc, char *argv[]) { - if (argc != 2) { - cout << "Usage: h5_zmq_writer [n_images]" << endl; + if (argc != 3) { + cout << "Usage: h5_zmq_writer [connection_address] [n_images]" << endl; exit(-1); } - int num_io_threads = 1; - receive(num_io_threads, "tcp://127.0.0.1:40000", atoi(argv[1])); + receive(string(argv[1]), atoi(argv[2])); return 0; }