Minor fixes

This commit is contained in:
2018-01-10 14:53:41 +01:00
parent d8f7755490
commit b9f6994d31
3 changed files with 37 additions and 18 deletions
+8 -4
View File
@@ -13,6 +13,10 @@ WriterManager::WriterManager(uint64_t n_images):
void WriterManager::stop()
{
#ifdef DEBUG_OUTPUT
cout << "[WriterManager::stop] Stopping the writer manager." << endl;
#endif
running_flag = false;
}
@@ -20,7 +24,7 @@ string WriterManager::get_status()
{
if (running_flag) {
return "receiving";
} else if (n_received_frames == n_written_frames) {
} else if (n_received_frames > n_written_frames) {
return "writing";
} else {
return "finished";
@@ -30,8 +34,8 @@ string WriterManager::get_status()
map<string, uint64_t> WriterManager::get_statistics()
{
map<string, uint64_t> result = {{"n_received_frames", n_received_frames.load()},
{"n_written_frames", n_written_frames.load()},
{"total_expected_frames", n_images}};
{"n_written_frames", n_written_frames.load()},
{"total_expected_frames", n_images}};
return result;
}
@@ -51,7 +55,7 @@ void WriterManager::set_parameters(map<string, string>& new_parameters)
bool WriterManager::is_running()
{
// Take into account n_images only if it is <> 0.
if (n_images && n_received_frames.load() > n_images) {
if (n_images && n_received_frames.load() >= n_images) {
running_flag = false;
}
+22 -12
View File
@@ -14,8 +14,7 @@ using namespace std;
void write(WriterManager *manager, RingBuffer *ring_buffer, string output_file)
{
string dataset_name = "data";
HDF5ChunkedWriter writer(output_file, dataset_name);
HDF5ChunkedWriter writer(output_file, config::dataset_name);
// Run until the running flag is set or the ring_buffer is empty.
while(manager->is_running() || !ring_buffer->is_empty()) {
@@ -32,6 +31,10 @@ void write(WriterManager *manager, RingBuffer *ring_buffer, string output_file)
}
writer.close_file();
#ifdef DEBUG_OUTPUT
cout << "[h5_zmq_writer::write] Writer thread stopped." << endl;
#endif
}
void receive(WriterManager *manager, RingBuffer *ring_buffer, string connect_address, int n_io_threads=1)
@@ -69,9 +72,13 @@ void receive(WriterManager *manager, RingBuffer *ring_buffer, string connect_add
manager->received_frame(frame_metadata.frame_index);
}
#ifdef DEBUG_OUTPUT
cout << "[h5_zmq_writer::receive] Receiver thread stopped." << endl;
#endif
}
void run_writer(string connect_address, string output_file, uint64_t n_images){
void run_writer(string connect_address, string output_file, uint64_t n_images, uint16_t rest_port){
size_t n_slots = config::n_slots;
int n_io_threads = config::n_io_threads;
@@ -81,17 +88,17 @@ void run_writer(string connect_address, string output_file, uint64_t n_images){
#ifdef DEBUG_OUTPUT
cout << "[h5_zmq_writer::run_writer] Running writer";
cout << " with connect_address " << connect_address << " ";
cout << " and output_file " << output_file << " ";
cout << " and n_slots " << n_slots << " ";
cout << " and n_io_threads " << n_io_threads << " ";
cout << " with connect_address " << connect_address;
cout << " and output_file " << output_file;
cout << " and n_slots " << n_slots;
cout << " and n_io_threads " << n_io_threads;
cout << endl;
#endif
thread receiver_thread(receive, &manager, &ring_buffer, connect_address, n_io_threads);
thread writer_thread(write, &manager, &ring_buffer, output_file);
start_rest_api(manager, config::rest_port);
start_rest_api(manager, rest_port);
receiver_thread.join();
writer_thread.join();
@@ -103,16 +110,19 @@ void run_writer(string connect_address, string output_file, uint64_t n_images){
int main (int argc, char *argv[])
{
if (argc != 4) {
if (argc != 5) {
cout << endl;
cout << "Usage: h5_zmq_writer [connection_address] [output_file] [n_images] [rest_port]" << endl;
cout << "\tconnection_address: Address to connect to the stream (PULL). Example: tcp://127.0.0.1:40000" << endl;
cout << "\output_file: Name of the output file." << endl;
cout << "\tn_images: Number of images to acquire. 0 for infinity (untill STOP is called)." << endl;
cout << "\toutput_file: Name of the output file." << endl;
cout << "\tn_images: Number of images to acquire. 0 for infinity (untill /stop is called)." << endl;
cout << "\rest_port: Port to start the REST Api on." << endl;
cout << endl;
exit(-1);
}
run_writer(string(argv[1]), string(argv[2]), atoi(argv[3]));
run_writer(string(argv[1]), string(argv[2]), atoi(argv[3]), atoi(argv[4]));
return 0;
}
+7 -2
View File
@@ -8,6 +8,11 @@ using namespace std;
void start_rest_api(WriterManager& writer_manager, uint16_t port)
{
#ifdef DEBUG_OUTPUT
cout << "[rest_interface::start_rest_api] Starting rest interface on port " << port << endl;
#endif
crow::SimpleApp app;
CROW_ROUTE(app, "/kill")([&](){
@@ -15,7 +20,7 @@ void start_rest_api(WriterManager& writer_manager, uint16_t port)
crow::json::wvalue result;
result["status"] = "Writer killed.";
result["status"] = "killed";
app.stop();
@@ -78,6 +83,6 @@ void start_rest_api(WriterManager& writer_manager, uint16_t port)
}
});
app.loglevel(crow::LogLevel::INFO);
app.loglevel(crow::LogLevel::ERROR);
app.port(port).run();
}