Finished implementation of ZmqRecv for Writer

This commit is contained in:
2020-05-25 15:29:47 +02:00
parent df62017346
commit 97cfa407d3
+74 -78
View File
@@ -10,9 +10,11 @@ using namespace core_buffer;
WriterZmqReceiver::WriterZmqReceiver(
void *ctx,
const string &ipc_prefix,
const size_t n_modules) :
n_modules_(n_modules),
sockets_(n_modules)
const size_t n_modules,
const uint64_t stop_pulse_id_) :
n_modules_(n_modules),
sockets_(n_modules),
end_pulse_id_(stop_pulse_id_)
{
for (size_t i = 0; i < n_modules; i++) {
@@ -48,95 +50,89 @@ WriterZmqReceiver::~WriterZmqReceiver()
void WriterZmqReceiver::get_next_buffer(
const uint64_t start_pulse_id,
ImageMetadataBuffer* image_metadata,
ImageMetadataBuffer* i_meta,
char* image_buffer)
{
// Init the image metadata.
image_metadata->pulse_id = pulse_id;
image_metadata->frame_index = 0;
image_metadata->daq_rec = 0;
image_metadata->data_n_bytes = 0;
image_metadata->is_good_frame = 1;
bool image_metadata_init = false;
auto n_images_in_buffer = WRITER_DATA_CACHE_N_IMAGES;
auto images_left = end_pulse_id_ - start_pulse_id + 1;
if (images_left < n_images_in_buffer) {
n_images_in_buffer = images_left;
}
size_t image_buffer_offset = 0;
i_meta->n_images = (uint16_t)n_images_in_buffer;
for (size_t pulse_id=start_pulse_id)
for (uint64_t i_pulse=0; i_pulse<n_images_in_buffer; i_pulse++) {
for (size_t i_module = 0; i_module < n_modules_; i_module++) {
auto pulse_id = start_pulse_id + i_pulse;
bool pulse_id_initialized = false;
auto n_bytes_metadata = zmq_recv(
sockets_[i_module],
&frame_metadata,
sizeof(ReplayModuleFrameBuffer),
0);
i_meta->pulse_id[i_pulse] = pulse_id;
i_meta->is_good_image[i_pulse] = 1;
i_meta->frame_index[i_pulse] = 0;
i_meta->daq_rec[i_pulse] = 0;
if (n_bytes_metadata != sizeof(ReplayModuleFrameBuffer)) {
throw runtime_error("Wrong number of metadata bytes.");
}
for (size_t i_module = 0; i_module < n_modules_; i_module++) {
for (size_t i=0; i<REPLAY_READ_BUFFER_SIZE; i++) {
auto n_bytes_metadata = zmq_recv(
sockets_[i_module], &f_meta_, sizeof(f_meta_), 0);
}
// sf_replay should always send the right pulse_id.
if (frame_metadata.metadata.pulse_id != pulse_id) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[sf_writer::receive_replay]";
err_msg << " Read unexpected pulse_id. ";
err_msg << " Expected " << pulse_id;
err_msg << " received ";
err_msg << frame_metadata.metadata.pulse_id;
err_msg << " from i_module " << i_module << endl;
throw runtime_error(err_msg.str());
}
if (!frame_metadata.is_frame_present) {
image_metadata->is_good_frame = 0;
// Init the image metadata with the first valid frame.
} else if (!image_metadata_init) {
image_metadata_init = true;
image_metadata->frame_index =
frame_metadata.metadata.frame_index;
image_metadata->daq_rec =
frame_metadata.metadata.daq_rec;
}
// Once the image is not good, we don't care to re-flag it.
if (image_metadata->is_good_frame == 1) {
if (frame_metadata.metadata.frame_index !=
image_metadata->frame_index) {
image_metadata->is_good_frame = 0;
if (n_bytes_metadata != sizeof(f_meta_)) {
throw runtime_error("Wrong number of metadata bytes.");
}
if (frame_metadata.metadata.daq_rec !=
image_metadata->daq_rec) {
image_metadata->is_good_frame = 0;
if (f_meta_.pulse_id == 0) {
i_meta->is_good_image[i_pulse] = 0;
} else {
if (!pulse_id_initialized) {
// Init the image metadata with the first valid frame.
pulse_id_initialized = true;
i_meta->frame_index[i_pulse] = f_meta_.frame_index;
i_meta->daq_rec[i_pulse] = f_meta_.daq_rec;
}
if (f_meta_.pulse_id != i_meta->pulse_id[i_pulse]) {
stringstream err_msg;
err_msg << "[WriterZmqReceiver::get_next_buffer]";
err_msg << " Read unexpected pulse_id. ";
err_msg << " Expected " << pulse_id;
err_msg << " received ";
err_msg << f_meta_.pulse_id;
err_msg << " from i_module " << i_module << endl;
throw runtime_error(err_msg.str());
}
}
if (frame_metadata.metadata.n_received_packets !=
JUNGFRAU_N_PACKETS_PER_FRAME) {
image_metadata->is_good_frame = 0;
// Once the image is not good, we don't care to re-flag it.
if (i_meta->is_good_image[i_pulse] == 1) {
if (f_meta_.frame_index != i_meta->frame_index[i_pulse]) {
i_meta->is_good_image[i_pulse] = 0;
}
if (f_meta_.daq_rec != i_meta->daq_rec[i_pulse]) {
i_meta->is_good_image[i_pulse] = 0;
}
if (f_meta_.n_received_packets != JF_N_PACKETS_PER_FRAME) {
i_meta->is_good_image[i_pulse] = 0;
}
}
auto pulse_offset = i_pulse * n_modules_ * MODULE_N_BYTES ;
auto module_offset = i_module * MODULE_N_BYTES;
auto n_bytes_image = zmq_recv(
sockets_[i_module],
(image_buffer + pulse_offset + module_offset),
MODULE_N_BYTES, 0);
if (n_bytes_image != MODULE_N_BYTES) {
throw runtime_error("Wrong number of data bytes.");
}
}
auto n_bytes_image = zmq_recv(
sockets_[i_module],
(image_buffer + image_buffer_offset),
frame_metadata.data_n_bytes,
0);
if (n_bytes_image != frame_metadata.data_n_bytes) {
throw runtime_error("Wrong number of data bytes.");
}
image_buffer_offset += n_bytes_image;
}
}