From c33c63a6e678dc56673fead54b1e78b0203cb94f Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 15 May 2020 16:20:32 +0200 Subject: [PATCH] First implementation of WriterZmqReceiver --- core-buffer/include/WriterZmqReceiver.hpp | 13 ++ core-buffer/src/WriterZmqReceiver.cpp | 253 ++++++++++++---------- 2 files changed, 150 insertions(+), 116 deletions(-) diff --git a/core-buffer/include/WriterZmqReceiver.hpp b/core-buffer/include/WriterZmqReceiver.hpp index 2361577..4a30caa 100644 --- a/core-buffer/include/WriterZmqReceiver.hpp +++ b/core-buffer/include/WriterZmqReceiver.hpp @@ -1,15 +1,28 @@ #ifndef SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP #define SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP +#include +#include "WriterH5Writer.hpp" +#include + class WriterZmqReceiver { + + const size_t n_modules_; + std::vector sockets_; + + CompressedModuleFrame frame_metadata; + public: WriterZmqReceiver( void *ctx, const std::string& ipc_prefix, const size_t n_modules); + virtual ~WriterZmqReceiver(); + void get_next_image( + const uint64_t pulse_id, ImageMetadata* image_metadata, char* image_buffer); }; diff --git a/core-buffer/src/WriterZmqReceiver.cpp b/core-buffer/src/WriterZmqReceiver.cpp index 8abda0f..010bc9e 100644 --- a/core-buffer/src/WriterZmqReceiver.cpp +++ b/core-buffer/src/WriterZmqReceiver.cpp @@ -1,116 +1,137 @@ -//#include -//#include "WriterZmqReceiver.hpp" -// -//void connect() -//{ -// void *sockets[n_modules]; -// for (size_t i = 0; i < n_modules; i++) { -// sockets[i] = zmq_socket(ctx, ZMQ_PULL); -// -// int rcvhwm = WRITER_RCVHWM; -// if (zmq_setsockopt(sockets[i], ZMQ_RCVHWM, &rcvhwm, -// sizeof(rcvhwm)) != 0) { -// throw runtime_error(strerror(errno)); -// } -// int linger = 0; -// if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger, -// sizeof(linger)) != 0) { -// throw runtime_error(strerror(errno)); -// } -// -// stringstream ipc_addr; -// ipc_addr << ipc_prefix << i; -// const auto ipc = ipc_addr.str(); -// -// if (zmq_bind(sockets[i], ipc.c_str()) != 0) { -// throw runtime_error(strerror(errno)); -// } -// } -//} -// -//void disconnect() -//{ -// for (size_t i = 0; i < n_modules; i++) { -// zmq_close(sockets[i]); -// } -//} -// -//void acquire_pulse() -//{ -// frame_meta_buffer->is_good_frame[i_buffer] = true; -// -// for (size_t i_module = 0; i_module < n_modules; i_module++) { -// auto n_bytes_metadata = zmq_recv( -// sockets[i_module], -// module_meta_buffer.get(), -// sizeof(ModuleFrame), -// 0); -// -// if (n_bytes_metadata != sizeof(ModuleFrame)) { -// throw runtime_error("Wrong number of metadata bytes."); -// } -// -// if (module_meta_buffer->pulse_id != current_pulse_id) { -// stringstream err_msg; -// -// using namespace date; -// using namespace chrono; -// err_msg << "[" << system_clock::now() << "]"; -// err_msg << "[sf_writer::receive_replay]"; -// err_msg << " Read unexpected pulse_id. "; -// err_msg << " Expected " << current_pulse_id; -// err_msg << " received "; -// err_msg << module_meta_buffer->pulse_id << endl; -// -// throw runtime_error(err_msg.str()); -// } -// -// // Initialize buffers in first iteration for each pulse_id. -// if (i_module == 0) { -// frame_meta_buffer->pulse_id[i_buffer] = -// module_meta_buffer->pulse_id; -// frame_meta_buffer->frame_index[i_buffer] = -// module_meta_buffer->frame_index; -// frame_meta_buffer->daq_rec[i_buffer] = -// module_meta_buffer->daq_rec; -// frame_meta_buffer->n_received_packets[i_buffer] = -// module_meta_buffer->n_received_packets; -// -// if ( module_meta_buffer->n_received_packets != 128 ) frame_meta_buffer->is_good_frame[i_buffer] = false; -// -// } else { -// if (module_meta_buffer->pulse_id != frame_meta_buffer->pulse_id[i_buffer]) frame_meta_buffer->is_good_frame[i_buffer] = false; -// -// if (module_meta_buffer->frame_index != frame_meta_buffer->frame_index[i_buffer]) frame_meta_buffer->is_good_frame[i_buffer] = false; -// -// if (module_meta_buffer->daq_rec != frame_meta_buffer->daq_rec[i_buffer]) frame_meta_buffer->is_good_frame[i_buffer] = false; -// -// if (module_meta_buffer->n_received_packets != 128 ) frame_meta_buffer->is_good_frame[i_buffer] = false; -// } -// -// if (frame_meta_buffer->pulse_id[i_buffer] != -// module_meta_buffer->pulse_id) { -// throw runtime_error("Unexpected pulse_id received."); -// } -// -// // Offset due to frame in buffer. -// size_t offset = MODULE_N_BYTES * n_modules * i_buffer; -// // offset due to module in frame. -// offset += MODULE_N_BYTES * i_module; -// -// auto n_bytes_image = zmq_recv( -// sockets[i_module], -// (frame_buffer + offset), -// MODULE_N_BYTES, -// 0); -// -// if (n_bytes_image != MODULE_N_BYTES) { -// throw runtime_error("Wrong number of data bytes."); -// } -// } -//} -// -//WriterZmqReceiver::WriterZmqReceiver(void *ctx, const std::string ipc_prefix, -// const size_t n_modules) { -// -//} +#include "WriterZmqReceiver.hpp" +#include "zmq.h" +#include "date.h" +#include +#include + +using namespace std; +using namespace core_buffer; + +WriterZmqReceiver::WriterZmqReceiver( + void *ctx, + const string &ipc_prefix, + const size_t n_modules) : + n_modules_(n_modules), + sockets_(n_modules) +{ + + for (size_t i = 0; i < n_modules; i++) { + sockets_[i] = zmq_socket(ctx, ZMQ_PULL); + + int rcvhwm = WRITER_RCVHWM; + if (zmq_setsockopt(sockets_[i], ZMQ_RCVHWM, &rcvhwm, + sizeof(rcvhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + int linger = 0; + if (zmq_setsockopt(sockets_[i], ZMQ_LINGER, &linger, + sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + stringstream ipc_addr; + ipc_addr << ipc_prefix << i; + const auto ipc = ipc_addr.str(); + + if (zmq_bind(sockets_[i], ipc.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + } +} + +WriterZmqReceiver::~WriterZmqReceiver() +{ + for (size_t i = 0; i < n_modules_; i++) { + zmq_close(sockets_[i]); + } +} + +void WriterZmqReceiver::get_next_image( + const uint64_t pulse_id, + ImageMetadata* image_metadata, + char* image_buffer) +{ + // Init the image metadata. + image_metadata->pulse_id = pulse_id; + image_metadata->frame_index = 0; + image_metadata->daq_rec = 0; + image_metadata->compressed_image_size = 0; + image_metadata->is_good_frame = 1; + bool image_metadata_init = false; + + size_t image_buffer_offset = 0; + + for (size_t i_module = 0; i_module < n_modules_; i_module++) { + + auto n_bytes_metadata = zmq_recv( + sockets_[i_module], + &frame_metadata, + sizeof(CompressedModuleFrame), + 0); + + if (n_bytes_metadata != sizeof(CompressedModuleFrame)) { + throw runtime_error("Wrong number of metadata bytes."); + } + + // sf_replay should always send the right pulse_id. + if (frame_metadata.module_frame.pulse_id != pulse_id) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[sf_writer::receive_replay]"; + err_msg << " Read unexpected pulse_id. "; + err_msg << " Expected " << pulse_id; + err_msg << " received "; + err_msg << frame_metadata.module_frame.pulse_id << endl; + + throw runtime_error(err_msg.str()); + } + + if (!frame_metadata.is_frame_present) { + image_metadata->is_good_frame = 0; + + // Init the image metadata with the first valid frame. + } else if (!image_metadata_init) { + image_metadata_init = true; + + image_metadata->frame_index = + frame_metadata.module_frame.frame_index; + image_metadata->daq_rec = + frame_metadata.module_frame.daq_rec; + } + + // Once the image is not good, we don't care to re-flag it. + if (image_metadata->is_good_frame == 1) { + if (frame_metadata.module_frame.frame_index != + image_metadata->frame_index) { + image_metadata->is_good_frame = 0; + } + + if (frame_metadata.module_frame.daq_rec != + image_metadata->daq_rec) { + image_metadata->is_good_frame = 0; + } + + if (frame_metadata.module_frame.n_received_packets != + JUNGFRAU_N_PACKETS_PER_FRAME) { + image_metadata->is_good_frame = 0; + } + } + + auto n_bytes_image = zmq_recv( + sockets_[i_module], + (image_buffer + image_buffer_offset), + frame_metadata.compressed_size, + 0); + + if (n_bytes_image != frame_metadata.compressed_size) { + throw runtime_error("Wrong number of data bytes."); + } + + image_buffer_offset += n_bytes_image; + } + + image_metadata->compressed_image_size = image_buffer_offset; +}