Refactor the writer runner

This commit is contained in:
2018-02-07 16:31:30 +01:00
parent 857123cd29
commit 2cd3e654bd
3 changed files with 28 additions and 14 deletions
+7 -2
View File
@@ -5,8 +5,8 @@
using namespace std;
WriterManager::WriterManager(const map<string, DATA_TYPE>& parameters_type, uint64_t n_frames):
parameters_type(parameters_type), n_frames(n_frames), running_flag(true), killed_flag(false),
WriterManager::WriterManager(const map<string, DATA_TYPE>& parameters_type, const string& output_file, uint64_t n_frames):
parameters_type(parameters_type), output_file(output_file), n_frames(n_frames), running_flag(true), killed_flag(false),
n_received_frames(0), n_written_frames(0), n_lost_frames(0)
{
#ifdef DEBUG_OUTPUT
@@ -47,6 +47,11 @@ string WriterManager::get_status()
}
}
string WriterManager::get_output_file()
{
return output_file;
}
map<string, uint64_t> WriterManager::get_statistics()
{
map<string, uint64_t> result = {{"n_received_frames", n_received_frames.load()},
+3 -1
View File
@@ -23,15 +23,17 @@ class WriterManager
std::atomic<uint64_t> n_received_frames;
std::atomic<uint64_t> n_written_frames;
std::atomic<uint64_t> n_lost_frames;
std::string output_file;
public:
WriterManager(const std::map<std::string, DATA_TYPE>& parameters_type, uint64_t n_frames=0);
WriterManager(const std::map<std::string, DATA_TYPE>& parameters_type, const std::string& output_file, uint64_t n_frames=0);
void stop();
void kill();
bool is_running();
bool is_killed();
std::string get_status();
bool are_all_parameters_set();
std::string get_output_file();
const std::map<std::string, DATA_TYPE>& get_parameters_type();
std::map<std::string, boost::any> get_parameters();
+18 -11
View File
@@ -20,9 +20,9 @@
using namespace std;
namespace pt = boost::property_tree;
void write_h5(WriterManager& manager, H5Format& format, RingBuffer& ring_buffer, string output_file)
void write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer)
{
H5Writer writer(output_file, format.get_raw_frames_dataset_name());
H5Writer writer(manager.get_output_file(), format.get_raw_frames_dataset_name());
// Run until the running flag is set or the ring_buffer is empty.
while(manager.is_running() || !ring_buffer.is_empty()) {
@@ -75,7 +75,7 @@ void write_h5(WriterManager& manager, H5Format& format, RingBuffer& ring_buffer,
}
#ifdef DEBUG_OUTPUT
cout << "[h5_zmq_writer::write] Closing file " << output_file << endl;
cout << "[h5_zmq_writer::write] Closing file " << manager.get_output_file() << endl;
#endif
writer.close_file();
@@ -88,7 +88,7 @@ void write_h5(WriterManager& manager, H5Format& format, RingBuffer& ring_buffer,
exit(0);
}
void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, string connect_address, int n_io_threads=1, int receive_timeout=-1)
void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, const string connect_address, int n_io_threads=1, int receive_timeout=-1)
{
zmq::context_t context(n_io_threads);
zmq::socket_t receiver(context, ZMQ_PULL);
@@ -161,20 +161,18 @@ void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, string connect
#endif
}
void run_writer(string connect_address, string output_file, uint64_t n_frames, uint16_t rest_port)
void run_writer(WriterManager& manager, const H5Format& format, const string& connect_address, uint16_t rest_port)
{
size_t n_slots = config::ring_buffer_n_slots;
int n_io_threads = config::zmq_n_io_threads;
int receive_timeout = config::zmq_receive_timeout;
NXmxFormat format;
WriterManager manager(format.get_input_value_type(), n_frames);
RingBuffer ring_buffer(n_slots);
#ifdef DEBUG_OUTPUT
cout << "[h5_zmq_writer::run_writer] Running writer";
cout << " with connect_address " << connect_address;
cout << " and output_file " << output_file;
cout << " and output_file " << manager.get_output_file();
cout << " and n_slots " << n_slots;
cout << " and n_io_threads " << n_io_threads;
cout << " and receive_timeout " << receive_timeout;
@@ -182,7 +180,7 @@ void run_writer(string connect_address, string output_file, uint64_t n_frames, u
#endif
boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), connect_address, n_io_threads, receive_timeout);
boost::thread writer_thread(write_h5, boost::ref(manager), boost::ref(format), boost::ref(ring_buffer), output_file);
boost::thread writer_thread(write_h5, boost::ref(manager), boost::ref(format), boost::ref(ring_buffer));
start_rest_api(manager, rest_port);
@@ -208,7 +206,7 @@ int main (int argc, char *argv[])
cout << "Usage: h5_zmq_writer [connection_address] [output_file] [n_frames] [rest_port] [user_id]" << endl;
cout << "\tconnection_address: Address to connect to the stream (PULL). Example: tcp://127.0.0.1:40000" << endl;
cout << "\toutput_file: Name of the output file." << endl;
cout << "\t_frames: Number of images to acquire. 0 for infinity (untill /stop is called)." << endl;
cout << "\tn_frames: Number of images to acquire. 0 for infinity (untill /stop is called)." << endl;
cout << "\trest_port: Port to start the REST Api on." << endl;
cout << "\tuser_id: uid under which to run the writer. -1 to leave it as it is." << endl;
cout << endl;
@@ -232,7 +230,16 @@ int main (int argc, char *argv[])
}
}
run_writer(string(argv[1]), string(argv[2]), atoi(argv[3]), atoi(argv[4]));
int n_frames = atoi(argv[3]);
string output_file = string(argv[2]);
NXmxFormat format;
WriterManager manager(format.get_input_value_type(), output_file, n_frames);
string connect_address = string(argv[1]);
int rest_port = atoi(argv[4]);
run_writer(manager, format, connect_address, rest_port);
return 0;
}