Add runner script

This commit is contained in:
2018-01-09 17:45:00 +01:00
parent db0c3e88e0
commit 467a6d5f45
+15 -18
View File
@@ -1,20 +1,20 @@
#include <iostream>
#include <zmq.hpp>
#include <atomic>
#include "rapidjson/document.h"
#include <cstdlib>
#include "RingBuffer.hpp"
#include "H5ChunkedWriter.hpp"
#include "WriterManager.hpp"
using namespace std;
void write(RingBuffer *ring_buffer, string output_file, atomic_bool* running_flag)
void write(RingBuffer *ring_buffer, string output_file, WriterManager &manager)
{
string dataset_name = "data";
HDF5ChunkedWriter writer(output_file, dataset_name);
// Run until the running flag is set or the ring_buffer is empty.
while(*running_flag || !ring_buffer->is_empty()) {
while(manager.is_running() || !ring_buffer->is_empty()) {
pair<FrameMetadata, char*> received_data = ring_buffer->read();
writer.write_data(received_data.first.frame_index,
@@ -23,32 +23,30 @@ void write(RingBuffer *ring_buffer, string output_file, atomic_bool* running_fla
received_data.second);
ring_buffer->release(received_data.first.buffer_slot_index);
manager.written_frame(received_data.first.frame_index);
}
writer.close_file();
}
void receive(int num_io_threads, string connect_address, uint64_t n_images, int n_slots=100)
void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_io_threads=1)
{
bool ring_buffer_initialized = false;
WriterManager manager(n_images);
RingBuffer ring_buffer(n_slots);
atomic_bool running_flag(true);
thread writer_thread(write, &ring_buffer, "output.h5", &running_flag);
zmq::context_t context(num_io_threads);
zmq::context_t context(n_io_threads);
zmq::socket_t receiver(context, ZMQ_PULL);
receiver.connect(connect_address);
// int has_more = 0;
// size_t has_more_size = sizeof(has_more);
zmq::message_t message_data;
FrameMetadata frame_metadata;
rapidjson::Document header_parser;
for (uint64_t i=0; i<n_images; i++) {
for (manager.is_running()) {
// Get the message header.
receiver.recv(&message_data);
@@ -65,7 +63,7 @@ void receive(int num_io_threads, string connect_address, uint64_t n_images, int
// Get the message data.
receiver.recv(&message_data);
frame_metadata.frame_bytes_size = message_data.size();
frame_metadata.frame_bytes_size = message_data.size();
if (!ring_buffer_initialized) {
ring_buffer.initialize(frame_metadata.frame_bytes_size);
@@ -74,21 +72,20 @@ void receive(int num_io_threads, string connect_address, uint64_t n_images, int
// Commit the frame to the buffer.
ring_buffer.write(frame_metadata, static_cast<char*>(message_data.data()));
}
running_flag = false;
manager.received_frame(frame_metadata.frame_index);
}
writer_thread.join();
}
int main (int argc, char *argv[])
{
if (argc != 2) {
cout << "Usage: h5_zmq_writer [n_images]" << endl;
if (argc != 3) {
cout << "Usage: h5_zmq_writer [connection_address] [n_images]" << endl;
exit(-1);
}
int num_io_threads = 1;
receive(num_io_threads, "tcp://127.0.0.1:40000", atoi(argv[1]));
receive(string(argv[1]), atoi(argv[2]));
return 0;
}