From e18a24d3411e9ac35f6b41927113f14d08ef9c9b Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 22 Apr 2020 10:34:51 +0200 Subject: [PATCH] Simple SF writer receiver --- sf-writer/sf_h5_writer.cpp | 147 +++++++++++++------------------------ 1 file changed, 52 insertions(+), 95 deletions(-) diff --git a/sf-writer/sf_h5_writer.cpp b/sf-writer/sf_h5_writer.cpp index 978d842..8b077b8 100644 --- a/sf-writer/sf_h5_writer.cpp +++ b/sf-writer/sf_h5_writer.cpp @@ -1,13 +1,10 @@ #include #include -#include -#include -#include -#include "BufferMultiReader.hpp" -#include - #include "config.hpp" -#include "SfFormat.cpp" +#include "zmq.h" +#include +#include +#include using namespace std; @@ -15,10 +12,9 @@ int main (int argc, char *argv[]) { if (argc != 5) { cout << endl; - cout << "Usage: sf_h5_writer [root_folder]"; + cout << "Usage: sf_h5_writer "; cout << " [output_file] [start_pulse_id] [stop_pulse_id]"; cout << endl; - cout << "\troot_folder: Base of the buffer." << endl; cout << "\toutput_file: Complete path to the output file." << endl; cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; @@ -27,98 +23,59 @@ int main (int argc, char *argv[]) exit(-1); } - string root_folder = string(argv[1]); - string output_file = string(argv[2]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[3]); - uint64_t stop_pulse_id = (uint64_t) atoll(argv[4]); + string output_file = string(argv[1]); + uint64_t start_pulse_id = (uint64_t) atoll(argv[2]); + uint64_t stop_pulse_id = (uint64_t) atoll(argv[3]); size_t n_modules = 32; - RingBuffer ring_buffer(3); - ring_buffer.initialize(BufferUtils::FILE_MOD * 2 * 512 *1024); + auto ctx = zmq_ctx_new(); + auto socket = zmq_socket(ctx, ZMQ_PULL); - auto path_suffixes = BufferUtils::get_path_suffixes( - start_pulse_id, stop_pulse_id); - - size_t n_reads = path_suffixes.size() * n_modules; - - auto read_buffer = [=, &ring_buffer]() { - - for (size_t i_module=0; i_module(); - file_metadata->module_id = i_module; - file_metadata->start_pulse_id = suffix.start_pulse_id; - file_metadata->stop_pulse_id = suffix.stop_pulse_id; - - char* buffer = ring_buffer.reserve(file_metadata); - while (buffer == nullptr) { - this_thread::sleep_for(chrono::milliseconds(10)); - buffer = ring_buffer.reserve(file_metadata); - } - - string filename = - root_folder + "/" + - device_name + "/" + - suffix.path; - - cout << "Reading file " << filename << endl; - - H5::H5File input_file(filename, H5F_ACC_RDONLY); - - auto image_dataset = input_file.openDataSet("image"); - image_dataset.read( - buffer, H5::PredType::NATIVE_UINT16); - - auto pulse_id_dataset = input_file.openDataSet("pulse_id"); - pulse_id_dataset.read( - file_metadata->pulse_id, H5::PredType::NATIVE_UINT64); - - auto frame_id_dataset = input_file.openDataSet("frame_id"); - frame_id_dataset.read( - file_metadata->frame_index, H5::PredType::NATIVE_UINT64); - - auto daq_rec_dataset = input_file.openDataSet("daq_rec"); - daq_rec_dataset.read( - file_metadata->daq_rec, H5::PredType::NATIVE_UINT32); - - auto received_packets_dataset = - input_file.openDataSet("received_packets"); - received_packets_dataset.read( - file_metadata->n_received_packets, - H5::PredType::NATIVE_UINT16); - - input_file.close(); - - ring_buffer.commit(file_metadata); - } - } - }; - auto read_thread = thread(read_buffer); - - for (size_t i_read=0; i_readstart_pulse_id; - cout << " to " << data.first->stop_pulse_id << endl; - - ring_buffer.release(data.first->buffer_slot_index); + //TODO: Use ipc? + if (zmq_bind(socket, "tcp://localhost:50000") != 0) { + throw runtime_error("not binding"); } - read_thread.join(); + int rcvhwm = 3; + int status = zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)); + if (status != 0) { + throw runtime_error(strerror (errno)); + } + + auto metadata_buffer = make_unique(); + + auto image_buffer = make_unique( + BufferUtils::FILE_MOD * 512 * 1024); + + while (true) { + auto n_bytes_metadata = zmq_recv( + socket, + metadata_buffer.get(), + sizeof(FileBufferMetadata), + 0); + + if (n_bytes_metadata != sizeof(FileBufferMetadata)) { + throw runtime_error("Unexpected number of bytes in metadata."); + } + + auto n_bytes_image = zmq_recv( + socket, + image_buffer.get(), + BufferUtils::FILE_MOD * 512 * 1024 * 2, + 0); + + if (n_bytes_image != BufferUtils::FILE_MOD * 512 * 1024 * 2) { + throw runtime_error("Unexpected number of bytes in image."); + } + + cout << "Received " << metadata_buffer->start_pulse_id; + cout << " to " << metadata_buffer->stop_pulse_id; + cout << endl; + } + + zmq_close(socket); + zmq_ctx_destroy(ctx); return 0; }