mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-12 08:55:35 +02:00
First implementation of WriterZmqReceiver
This commit is contained in:
@@ -1,15 +1,28 @@
|
||||
#ifndef SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP
|
||||
#define SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP
|
||||
|
||||
#include <string>
|
||||
#include "WriterH5Writer.hpp"
|
||||
#include <vector>
|
||||
|
||||
|
||||
class WriterZmqReceiver {
|
||||
|
||||
const size_t n_modules_;
|
||||
std::vector<void*> sockets_;
|
||||
|
||||
CompressedModuleFrame frame_metadata;
|
||||
|
||||
public:
|
||||
WriterZmqReceiver(
|
||||
void *ctx,
|
||||
const std::string& ipc_prefix,
|
||||
const size_t n_modules);
|
||||
|
||||
virtual ~WriterZmqReceiver();
|
||||
|
||||
void get_next_image(
|
||||
const uint64_t pulse_id,
|
||||
ImageMetadata* image_metadata,
|
||||
char* image_buffer);
|
||||
};
|
||||
|
||||
@@ -1,116 +1,137 @@
|
||||
//#include <string>
|
||||
//#include "WriterZmqReceiver.hpp"
|
||||
//
|
||||
//void connect()
|
||||
//{
|
||||
// void *sockets[n_modules];
|
||||
// for (size_t i = 0; i < n_modules; i++) {
|
||||
// sockets[i] = zmq_socket(ctx, ZMQ_PULL);
|
||||
//
|
||||
// int rcvhwm = WRITER_RCVHWM;
|
||||
// if (zmq_setsockopt(sockets[i], ZMQ_RCVHWM, &rcvhwm,
|
||||
// sizeof(rcvhwm)) != 0) {
|
||||
// throw runtime_error(strerror(errno));
|
||||
// }
|
||||
// int linger = 0;
|
||||
// if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger,
|
||||
// sizeof(linger)) != 0) {
|
||||
// throw runtime_error(strerror(errno));
|
||||
// }
|
||||
//
|
||||
// stringstream ipc_addr;
|
||||
// ipc_addr << ipc_prefix << i;
|
||||
// const auto ipc = ipc_addr.str();
|
||||
//
|
||||
// if (zmq_bind(sockets[i], ipc.c_str()) != 0) {
|
||||
// throw runtime_error(strerror(errno));
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//void disconnect()
|
||||
//{
|
||||
// for (size_t i = 0; i < n_modules; i++) {
|
||||
// zmq_close(sockets[i]);
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//void acquire_pulse()
|
||||
//{
|
||||
// frame_meta_buffer->is_good_frame[i_buffer] = true;
|
||||
//
|
||||
// for (size_t i_module = 0; i_module < n_modules; i_module++) {
|
||||
// auto n_bytes_metadata = zmq_recv(
|
||||
// sockets[i_module],
|
||||
// module_meta_buffer.get(),
|
||||
// sizeof(ModuleFrame),
|
||||
// 0);
|
||||
//
|
||||
// if (n_bytes_metadata != sizeof(ModuleFrame)) {
|
||||
// throw runtime_error("Wrong number of metadata bytes.");
|
||||
// }
|
||||
//
|
||||
// if (module_meta_buffer->pulse_id != current_pulse_id) {
|
||||
// stringstream err_msg;
|
||||
//
|
||||
// using namespace date;
|
||||
// using namespace chrono;
|
||||
// err_msg << "[" << system_clock::now() << "]";
|
||||
// err_msg << "[sf_writer::receive_replay]";
|
||||
// err_msg << " Read unexpected pulse_id. ";
|
||||
// err_msg << " Expected " << current_pulse_id;
|
||||
// err_msg << " received ";
|
||||
// err_msg << module_meta_buffer->pulse_id << endl;
|
||||
//
|
||||
// throw runtime_error(err_msg.str());
|
||||
// }
|
||||
//
|
||||
// // Initialize buffers in first iteration for each pulse_id.
|
||||
// if (i_module == 0) {
|
||||
// frame_meta_buffer->pulse_id[i_buffer] =
|
||||
// module_meta_buffer->pulse_id;
|
||||
// frame_meta_buffer->frame_index[i_buffer] =
|
||||
// module_meta_buffer->frame_index;
|
||||
// frame_meta_buffer->daq_rec[i_buffer] =
|
||||
// module_meta_buffer->daq_rec;
|
||||
// frame_meta_buffer->n_received_packets[i_buffer] =
|
||||
// module_meta_buffer->n_received_packets;
|
||||
//
|
||||
// if ( module_meta_buffer->n_received_packets != 128 ) frame_meta_buffer->is_good_frame[i_buffer] = false;
|
||||
//
|
||||
// } else {
|
||||
// if (module_meta_buffer->pulse_id != frame_meta_buffer->pulse_id[i_buffer]) frame_meta_buffer->is_good_frame[i_buffer] = false;
|
||||
//
|
||||
// if (module_meta_buffer->frame_index != frame_meta_buffer->frame_index[i_buffer]) frame_meta_buffer->is_good_frame[i_buffer] = false;
|
||||
//
|
||||
// if (module_meta_buffer->daq_rec != frame_meta_buffer->daq_rec[i_buffer]) frame_meta_buffer->is_good_frame[i_buffer] = false;
|
||||
//
|
||||
// if (module_meta_buffer->n_received_packets != 128 ) frame_meta_buffer->is_good_frame[i_buffer] = false;
|
||||
// }
|
||||
//
|
||||
// if (frame_meta_buffer->pulse_id[i_buffer] !=
|
||||
// module_meta_buffer->pulse_id) {
|
||||
// throw runtime_error("Unexpected pulse_id received.");
|
||||
// }
|
||||
//
|
||||
// // Offset due to frame in buffer.
|
||||
// size_t offset = MODULE_N_BYTES * n_modules * i_buffer;
|
||||
// // offset due to module in frame.
|
||||
// offset += MODULE_N_BYTES * i_module;
|
||||
//
|
||||
// auto n_bytes_image = zmq_recv(
|
||||
// sockets[i_module],
|
||||
// (frame_buffer + offset),
|
||||
// MODULE_N_BYTES,
|
||||
// 0);
|
||||
//
|
||||
// if (n_bytes_image != MODULE_N_BYTES) {
|
||||
// throw runtime_error("Wrong number of data bytes.");
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//WriterZmqReceiver::WriterZmqReceiver(void *ctx, const std::string ipc_prefix,
|
||||
// const size_t n_modules) {
|
||||
//
|
||||
//}
|
||||
#include "WriterZmqReceiver.hpp"
|
||||
#include "zmq.h"
|
||||
#include "date.h"
|
||||
#include <chrono>
|
||||
#include <sstream>
|
||||
|
||||
using namespace std;
|
||||
using namespace core_buffer;
|
||||
|
||||
WriterZmqReceiver::WriterZmqReceiver(
|
||||
void *ctx,
|
||||
const string &ipc_prefix,
|
||||
const size_t n_modules) :
|
||||
n_modules_(n_modules),
|
||||
sockets_(n_modules)
|
||||
{
|
||||
|
||||
for (size_t i = 0; i < n_modules; i++) {
|
||||
sockets_[i] = zmq_socket(ctx, ZMQ_PULL);
|
||||
|
||||
int rcvhwm = WRITER_RCVHWM;
|
||||
if (zmq_setsockopt(sockets_[i], ZMQ_RCVHWM, &rcvhwm,
|
||||
sizeof(rcvhwm)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
int linger = 0;
|
||||
if (zmq_setsockopt(sockets_[i], ZMQ_LINGER, &linger,
|
||||
sizeof(linger)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
stringstream ipc_addr;
|
||||
ipc_addr << ipc_prefix << i;
|
||||
const auto ipc = ipc_addr.str();
|
||||
|
||||
if (zmq_bind(sockets_[i], ipc.c_str()) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
WriterZmqReceiver::~WriterZmqReceiver()
|
||||
{
|
||||
for (size_t i = 0; i < n_modules_; i++) {
|
||||
zmq_close(sockets_[i]);
|
||||
}
|
||||
}
|
||||
|
||||
void WriterZmqReceiver::get_next_image(
|
||||
const uint64_t pulse_id,
|
||||
ImageMetadata* image_metadata,
|
||||
char* image_buffer)
|
||||
{
|
||||
// Init the image metadata.
|
||||
image_metadata->pulse_id = pulse_id;
|
||||
image_metadata->frame_index = 0;
|
||||
image_metadata->daq_rec = 0;
|
||||
image_metadata->compressed_image_size = 0;
|
||||
image_metadata->is_good_frame = 1;
|
||||
bool image_metadata_init = false;
|
||||
|
||||
size_t image_buffer_offset = 0;
|
||||
|
||||
for (size_t i_module = 0; i_module < n_modules_; i_module++) {
|
||||
|
||||
auto n_bytes_metadata = zmq_recv(
|
||||
sockets_[i_module],
|
||||
&frame_metadata,
|
||||
sizeof(CompressedModuleFrame),
|
||||
0);
|
||||
|
||||
if (n_bytes_metadata != sizeof(CompressedModuleFrame)) {
|
||||
throw runtime_error("Wrong number of metadata bytes.");
|
||||
}
|
||||
|
||||
// sf_replay should always send the right pulse_id.
|
||||
if (frame_metadata.module_frame.pulse_id != pulse_id) {
|
||||
stringstream err_msg;
|
||||
|
||||
using namespace date;
|
||||
using namespace chrono;
|
||||
err_msg << "[" << system_clock::now() << "]";
|
||||
err_msg << "[sf_writer::receive_replay]";
|
||||
err_msg << " Read unexpected pulse_id. ";
|
||||
err_msg << " Expected " << pulse_id;
|
||||
err_msg << " received ";
|
||||
err_msg << frame_metadata.module_frame.pulse_id << endl;
|
||||
|
||||
throw runtime_error(err_msg.str());
|
||||
}
|
||||
|
||||
if (!frame_metadata.is_frame_present) {
|
||||
image_metadata->is_good_frame = 0;
|
||||
|
||||
// Init the image metadata with the first valid frame.
|
||||
} else if (!image_metadata_init) {
|
||||
image_metadata_init = true;
|
||||
|
||||
image_metadata->frame_index =
|
||||
frame_metadata.module_frame.frame_index;
|
||||
image_metadata->daq_rec =
|
||||
frame_metadata.module_frame.daq_rec;
|
||||
}
|
||||
|
||||
// Once the image is not good, we don't care to re-flag it.
|
||||
if (image_metadata->is_good_frame == 1) {
|
||||
if (frame_metadata.module_frame.frame_index !=
|
||||
image_metadata->frame_index) {
|
||||
image_metadata->is_good_frame = 0;
|
||||
}
|
||||
|
||||
if (frame_metadata.module_frame.daq_rec !=
|
||||
image_metadata->daq_rec) {
|
||||
image_metadata->is_good_frame = 0;
|
||||
}
|
||||
|
||||
if (frame_metadata.module_frame.n_received_packets !=
|
||||
JUNGFRAU_N_PACKETS_PER_FRAME) {
|
||||
image_metadata->is_good_frame = 0;
|
||||
}
|
||||
}
|
||||
|
||||
auto n_bytes_image = zmq_recv(
|
||||
sockets_[i_module],
|
||||
(image_buffer + image_buffer_offset),
|
||||
frame_metadata.compressed_size,
|
||||
0);
|
||||
|
||||
if (n_bytes_image != frame_metadata.compressed_size) {
|
||||
throw runtime_error("Wrong number of data bytes.");
|
||||
}
|
||||
|
||||
image_buffer_offset += n_bytes_image;
|
||||
}
|
||||
|
||||
image_metadata->compressed_image_size = image_buffer_offset;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user