From 724b7a1444138e7dad3986263fd6a43d5293f3ac Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 10 Jan 2018 11:56:26 +0100 Subject: [PATCH] Runner script refactoring --- src/h5_zmq_writer.cpp | 55 ++++++++++++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/src/h5_zmq_writer.cpp b/src/h5_zmq_writer.cpp index 1fbcff2..fb409b2 100644 --- a/src/h5_zmq_writer.cpp +++ b/src/h5_zmq_writer.cpp @@ -4,13 +4,15 @@ #include #include "rapidjson/document.h" +#include "config.hpp" #include "WriterManager.hpp" #include "H5ChunkedWriter.hpp" #include "RingBuffer.hpp" +#include "rest_interface.hpp" using namespace std; -void write(RingBuffer *ring_buffer, string output_file, WriterManager *manager) +void write(WriterManager *manager, RingBuffer *ring_buffer, string output_file) { string dataset_name = "data"; HDF5ChunkedWriter writer(output_file, dataset_name); @@ -32,13 +34,8 @@ void write(RingBuffer *ring_buffer, string output_file, WriterManager *manager) writer.close_file(); } -void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_io_threads=1) +void receive(WriterManager *manager, RingBuffer *ring_buffer, string connect_address, int n_io_threads=1) { - WriterManager manager(n_images); - RingBuffer ring_buffer(n_slots); - - thread writer_thread(write, &ring_buffer, "output.h5", &manager); - zmq::context_t context(n_io_threads); zmq::socket_t receiver(context, ZMQ_PULL); receiver.connect(connect_address); @@ -48,7 +45,7 @@ void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_i rapidjson::Document header_parser; - while (manager.is_running()) { + while (manager->is_running()) { // Get the message header. receiver.recv(&message_data); @@ -68,23 +65,53 @@ void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_i frame_metadata.frame_bytes_size = message_data.size(); // Commit the frame to the buffer. - ring_buffer.write(frame_metadata, static_cast(message_data.data())); + ring_buffer->write(frame_metadata, static_cast(message_data.data())); - manager.received_frame(frame_metadata.frame_index); + manager->received_frame(frame_metadata.frame_index); } +} - writer_thread.join(); +void run_writer(string connect_address, string output_file, uint64_t n_images){ + + size_t n_slots = config::n_slots; + int n_io_threads = config::n_io_threads; + + WriterManager manager(n_images); + RingBuffer ring_buffer(n_slots); + + #ifdef DEBUG + cout << "[h5_zmq_writer::run_writer] Running writer with "; + cout << "connect_address " << connect_address << " " << std::endl; + cout << "output_file " << output_file << " " << std::endl; + cout << "n_slots " << n_slots << " " << std::endl; + cout << "n_io_threads " << n_io_threads << " " << std::endl; + #endif + + thread receiver_thread(receive, &manager, &ring_buffer, connect_address, n_io_threads); + thread writer_thread(write, &manager, &ring_buffer, output_file); + + start_rest_api(manager, config::rest_port); + + receiver_thread.join(); + writer_thread.join(); + + #ifdef DEBUG + cout << "[h5_zmq_writer::run_writer] Writer properly stopped." << endl; + #endif } int main (int argc, char *argv[]) { - if (argc != 3) { - cout << "Usage: h5_zmq_writer [connection_address] [n_images]" << endl; + if (argc != 4) { + cout << "Usage: h5_zmq_writer [connection_address] [output_file] [n_images] [rest_port]" << endl; cout << "\tconnection_address: Address to connect to the stream (PULL). Example: tcp://127.0.0.1:40000" << endl; + cout << "\output_file: Name of the output file." << endl; cout << "\tn_images: Number of images to acquire. 0 for infinity (untill STOP is called)." << endl; + exit(-1); } - receive(string(argv[1]), atoi(argv[2])); + run_writer(string(argv[1]), string(argv[2]), atoi(argv[3])); + return 0; }