From 2cd3e654bd13b0e553a1fc116807161474a76a81 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 7 Feb 2018 16:31:30 +0100 Subject: [PATCH] Refactor the writer runner --- src/WriterManager.cpp | 9 +++++++-- src/WriterManager.hpp | 4 +++- src/h5_zmq_writer.cpp | 29 ++++++++++++++++++----------- 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/src/WriterManager.cpp b/src/WriterManager.cpp index f9c6965..89829d2 100644 --- a/src/WriterManager.cpp +++ b/src/WriterManager.cpp @@ -5,8 +5,8 @@ using namespace std; -WriterManager::WriterManager(const map& parameters_type, uint64_t n_frames): - parameters_type(parameters_type), n_frames(n_frames), running_flag(true), killed_flag(false), +WriterManager::WriterManager(const map& parameters_type, const string& output_file, uint64_t n_frames): + parameters_type(parameters_type), output_file(output_file), n_frames(n_frames), running_flag(true), killed_flag(false), n_received_frames(0), n_written_frames(0), n_lost_frames(0) { #ifdef DEBUG_OUTPUT @@ -47,6 +47,11 @@ string WriterManager::get_status() } } +string WriterManager::get_output_file() +{ + return output_file; +} + map WriterManager::get_statistics() { map result = {{"n_received_frames", n_received_frames.load()}, diff --git a/src/WriterManager.hpp b/src/WriterManager.hpp index 56bd284..0afcc03 100644 --- a/src/WriterManager.hpp +++ b/src/WriterManager.hpp @@ -23,15 +23,17 @@ class WriterManager std::atomic n_received_frames; std::atomic n_written_frames; std::atomic n_lost_frames; + std::string output_file; public: - WriterManager(const std::map& parameters_type, uint64_t n_frames=0); + WriterManager(const std::map& parameters_type, const std::string& output_file, uint64_t n_frames=0); void stop(); void kill(); bool is_running(); bool is_killed(); std::string get_status(); bool are_all_parameters_set(); + std::string get_output_file(); const std::map& get_parameters_type(); std::map get_parameters(); diff --git a/src/h5_zmq_writer.cpp b/src/h5_zmq_writer.cpp index 6f4c0c3..4cd687f 100644 --- a/src/h5_zmq_writer.cpp +++ b/src/h5_zmq_writer.cpp @@ -20,9 +20,9 @@ using namespace std; namespace pt = boost::property_tree; -void write_h5(WriterManager& manager, H5Format& format, RingBuffer& ring_buffer, string output_file) +void write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer) { - H5Writer writer(output_file, format.get_raw_frames_dataset_name()); + H5Writer writer(manager.get_output_file(), format.get_raw_frames_dataset_name()); // Run until the running flag is set or the ring_buffer is empty. while(manager.is_running() || !ring_buffer.is_empty()) { @@ -75,7 +75,7 @@ void write_h5(WriterManager& manager, H5Format& format, RingBuffer& ring_buffer, } #ifdef DEBUG_OUTPUT - cout << "[h5_zmq_writer::write] Closing file " << output_file << endl; + cout << "[h5_zmq_writer::write] Closing file " << manager.get_output_file() << endl; #endif writer.close_file(); @@ -88,7 +88,7 @@ void write_h5(WriterManager& manager, H5Format& format, RingBuffer& ring_buffer, exit(0); } -void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, string connect_address, int n_io_threads=1, int receive_timeout=-1) +void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, const string connect_address, int n_io_threads=1, int receive_timeout=-1) { zmq::context_t context(n_io_threads); zmq::socket_t receiver(context, ZMQ_PULL); @@ -161,20 +161,18 @@ void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, string connect #endif } -void run_writer(string connect_address, string output_file, uint64_t n_frames, uint16_t rest_port) +void run_writer(WriterManager& manager, const H5Format& format, const string& connect_address, uint16_t rest_port) { size_t n_slots = config::ring_buffer_n_slots; int n_io_threads = config::zmq_n_io_threads; int receive_timeout = config::zmq_receive_timeout; - NXmxFormat format; - WriterManager manager(format.get_input_value_type(), n_frames); RingBuffer ring_buffer(n_slots); #ifdef DEBUG_OUTPUT cout << "[h5_zmq_writer::run_writer] Running writer"; cout << " with connect_address " << connect_address; - cout << " and output_file " << output_file; + cout << " and output_file " << manager.get_output_file(); cout << " and n_slots " << n_slots; cout << " and n_io_threads " << n_io_threads; cout << " and receive_timeout " << receive_timeout; @@ -182,7 +180,7 @@ void run_writer(string connect_address, string output_file, uint64_t n_frames, u #endif boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), connect_address, n_io_threads, receive_timeout); - boost::thread writer_thread(write_h5, boost::ref(manager), boost::ref(format), boost::ref(ring_buffer), output_file); + boost::thread writer_thread(write_h5, boost::ref(manager), boost::ref(format), boost::ref(ring_buffer)); start_rest_api(manager, rest_port); @@ -208,7 +206,7 @@ int main (int argc, char *argv[]) cout << "Usage: h5_zmq_writer [connection_address] [output_file] [n_frames] [rest_port] [user_id]" << endl; cout << "\tconnection_address: Address to connect to the stream (PULL). Example: tcp://127.0.0.1:40000" << endl; cout << "\toutput_file: Name of the output file." << endl; - cout << "\t_frames: Number of images to acquire. 0 for infinity (untill /stop is called)." << endl; + cout << "\tn_frames: Number of images to acquire. 0 for infinity (untill /stop is called)." << endl; cout << "\trest_port: Port to start the REST Api on." << endl; cout << "\tuser_id: uid under which to run the writer. -1 to leave it as it is." << endl; cout << endl; @@ -232,7 +230,16 @@ int main (int argc, char *argv[]) } } - run_writer(string(argv[1]), string(argv[2]), atoi(argv[3]), atoi(argv[4])); + int n_frames = atoi(argv[3]); + string output_file = string(argv[2]); + + NXmxFormat format; + WriterManager manager(format.get_input_value_type(), output_file, n_frames); + + string connect_address = string(argv[1]); + int rest_port = atoi(argv[4]); + + run_writer(manager, format, connect_address, rest_port); return 0; }