From 7ce959a2936c550dff163648e50c5bcfd0cc0974 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 9 Jan 2018 11:10:57 +0100 Subject: [PATCH] Add temporary zmq receiver --- src/h5_zmq_writer.cpp | 49 ++++++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/src/h5_zmq_writer.cpp b/src/h5_zmq_writer.cpp index 923f094..fda3ef7 100644 --- a/src/h5_zmq_writer.cpp +++ b/src/h5_zmq_writer.cpp @@ -1,22 +1,22 @@ #include #include -#include -#include #include - +#include "rapidjson/document.h" +#include #include "RingBuffer.hpp" #include "H5ChunkedWriter.hpp" using namespace std; -void write(RingBuffer *ring_buffer, string output_file) +void write(RingBuffer *ring_buffer, string output_file, atomic_bool* running_flag) { string dataset_name = "data"; HDF5ChunkedWriter writer(output_file, dataset_name); - while (true) { + // Run until the running flag is set or the ring_buffer is empty. + while(*running_flag || !ring_buffer->is_empty()) { pair received_data = ring_buffer->read(); - + writer.write_data(received_data.first.frame_index, received_data.first.frame_shape, received_data.first.frame_bytes_size, @@ -28,12 +28,13 @@ void write(RingBuffer *ring_buffer, string output_file) writer.close_file(); } -void receive(int num_io_threads, string connect_address, int n_slots=100) +void receive(int num_io_threads, string connect_address, uint64_t n_images, int n_slots=100) { bool ring_buffer_initialized = false; RingBuffer ring_buffer(n_slots); + atomic_bool running_flag(true); - thread writer_thread(write, &ring_buffer, "output.h5"); + thread writer_thread(write, &ring_buffer, "output.h5", &running_flag); zmq::context_t context(num_io_threads); zmq::socket_t receiver(context, ZMQ_PULL); @@ -45,16 +46,22 @@ void receive(int num_io_threads, string connect_address, int n_slots=100) zmq::message_t message_data; FrameMetadata frame_metadata; - for (int i=0; i<10; i++) { + rapidjson::Document header_parser; + + for (uint64_t i=0; i(message_data.data()), message_data.size()); - frame_metadata.frame_header = header; + + // Parse JSON header. + char* header = static_cast(message_data.data()); + header_parser.Parse(header); // Extract frame_index and frame_shape from the message header. - frame_metadata.frame_index = i; - frame_metadata.frame_shape[0] = 1; - frame_metadata.frame_shape[1] = 10; + frame_metadata.frame_index = header_parser["frame"].GetUint64(); + + auto header_shape = header_parser["shape"].GetArray(); + frame_metadata.frame_shape[0] = header_shape[0].GetUint64(); + frame_metadata.frame_shape[1] = header_shape[1].GetUint64(); // Get the message data. receiver.recv(&message_data); @@ -62,18 +69,26 @@ void receive(int num_io_threads, string connect_address, int n_slots=100) if (!ring_buffer_initialized) { ring_buffer.initialize(frame_metadata.frame_bytes_size); + ring_buffer_initialized = true; } // Commit the frame to the buffer. ring_buffer.write(frame_metadata, static_cast(message_data.data())); } - -} + running_flag = false; + + writer_thread.join(); +} int main (int argc, char *argv[]) { + if (argc != 2) { + cout << "Usage: h5_zmq_writer [n_images]" << endl; + exit(-1); + } + int num_io_threads = 1; - receive(num_io_threads, "tcp://127.0.0.1:40000"); + receive(num_io_threads, "tcp://127.0.0.1:40000", atoi(argv[1])); return 0; }