Runner script refactoring

This commit is contained in:
2018-01-10 11:56:26 +01:00
parent 3562a468be
commit 724b7a1444
+41 -14
View File
@@ -4,13 +4,15 @@
#include <thread>
#include "rapidjson/document.h"
#include "config.hpp"
#include "WriterManager.hpp"
#include "H5ChunkedWriter.hpp"
#include "RingBuffer.hpp"
#include "rest_interface.hpp"
using namespace std;
void write(RingBuffer *ring_buffer, string output_file, WriterManager *manager)
void write(WriterManager *manager, RingBuffer *ring_buffer, string output_file)
{
string dataset_name = "data";
HDF5ChunkedWriter writer(output_file, dataset_name);
@@ -32,13 +34,8 @@ void write(RingBuffer *ring_buffer, string output_file, WriterManager *manager)
writer.close_file();
}
void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_io_threads=1)
void receive(WriterManager *manager, RingBuffer *ring_buffer, string connect_address, int n_io_threads=1)
{
WriterManager manager(n_images);
RingBuffer ring_buffer(n_slots);
thread writer_thread(write, &ring_buffer, "output.h5", &manager);
zmq::context_t context(n_io_threads);
zmq::socket_t receiver(context, ZMQ_PULL);
receiver.connect(connect_address);
@@ -48,7 +45,7 @@ void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_i
rapidjson::Document header_parser;
while (manager.is_running()) {
while (manager->is_running()) {
// Get the message header.
receiver.recv(&message_data);
@@ -68,23 +65,53 @@ void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_i
frame_metadata.frame_bytes_size = message_data.size();
// Commit the frame to the buffer.
ring_buffer.write(frame_metadata, static_cast<char*>(message_data.data()));
ring_buffer->write(frame_metadata, static_cast<char*>(message_data.data()));
manager.received_frame(frame_metadata.frame_index);
manager->received_frame(frame_metadata.frame_index);
}
}
writer_thread.join();
void run_writer(string connect_address, string output_file, uint64_t n_images){
size_t n_slots = config::n_slots;
int n_io_threads = config::n_io_threads;
WriterManager manager(n_images);
RingBuffer ring_buffer(n_slots);
#ifdef DEBUG
cout << "[h5_zmq_writer::run_writer] Running writer with ";
cout << "connect_address " << connect_address << " " << std::endl;
cout << "output_file " << output_file << " " << std::endl;
cout << "n_slots " << n_slots << " " << std::endl;
cout << "n_io_threads " << n_io_threads << " " << std::endl;
#endif
thread receiver_thread(receive, &manager, &ring_buffer, connect_address, n_io_threads);
thread writer_thread(write, &manager, &ring_buffer, output_file);
start_rest_api(manager, config::rest_port);
receiver_thread.join();
writer_thread.join();
#ifdef DEBUG
cout << "[h5_zmq_writer::run_writer] Writer properly stopped." << endl;
#endif
}
int main (int argc, char *argv[])
{
if (argc != 3) {
cout << "Usage: h5_zmq_writer [connection_address] [n_images]" << endl;
if (argc != 4) {
cout << "Usage: h5_zmq_writer [connection_address] [output_file] [n_images] [rest_port]" << endl;
cout << "\tconnection_address: Address to connect to the stream (PULL). Example: tcp://127.0.0.1:40000" << endl;
cout << "\output_file: Name of the output file." << endl;
cout << "\tn_images: Number of images to acquire. 0 for infinity (untill STOP is called)." << endl;
exit(-1);
}
receive(string(argv[1]), atoi(argv[2]));
run_writer(string(argv[1]), string(argv[2]), atoi(argv[3]));
return 0;
}