diff --git a/src/h5_zmq_writer.cpp b/src/h5_zmq_writer.cpp new file mode 100644 index 0000000..923f094 --- /dev/null +++ b/src/h5_zmq_writer.cpp @@ -0,0 +1,79 @@ +#include +#include +#include +#include +#include + +#include "RingBuffer.hpp" +#include "H5ChunkedWriter.hpp" + +using namespace std; + +void write(RingBuffer *ring_buffer, string output_file) +{ + string dataset_name = "data"; + HDF5ChunkedWriter writer(output_file, dataset_name); + + while (true) { + pair received_data = ring_buffer->read(); + + writer.write_data(received_data.first.frame_index, + received_data.first.frame_shape, + received_data.first.frame_bytes_size, + received_data.second); + + ring_buffer->release(received_data.first.buffer_slot_index); + } + + writer.close_file(); +} + +void receive(int num_io_threads, string connect_address, int n_slots=100) +{ + bool ring_buffer_initialized = false; + RingBuffer ring_buffer(n_slots); + + thread writer_thread(write, &ring_buffer, "output.h5"); + + zmq::context_t context(num_io_threads); + zmq::socket_t receiver(context, ZMQ_PULL); + receiver.connect(connect_address); + + // int has_more = 0; + // size_t has_more_size = sizeof(has_more); + + zmq::message_t message_data; + FrameMetadata frame_metadata; + + for (int i=0; i<10; i++) { + // Get the message header. + receiver.recv(&message_data); + string header = string(static_cast(message_data.data()), message_data.size()); + frame_metadata.frame_header = header; + + // Extract frame_index and frame_shape from the message header. + frame_metadata.frame_index = i; + frame_metadata.frame_shape[0] = 1; + frame_metadata.frame_shape[1] = 10; + + // Get the message data. + receiver.recv(&message_data); + frame_metadata.frame_bytes_size = message_data.size(); + + if (!ring_buffer_initialized) { + ring_buffer.initialize(frame_metadata.frame_bytes_size); + } + + // Commit the frame to the buffer. + ring_buffer.write(frame_metadata, static_cast(message_data.data())); + } + +} + + +int main (int argc, char *argv[]) +{ + int num_io_threads = 1; + receive(num_io_threads, "tcp://127.0.0.1:40000"); + return 0; +}