detached sync and assembler with zmq recv.

This commit is contained in:
lhdamiani
2021-07-06 11:57:43 +02:00
parent 300d992acf
commit 8fd1c4cf7a
14 changed files with 129 additions and 64 deletions
-2
View File
@@ -18,7 +18,6 @@ namespace BufferUtils
const std::string detector_name;
const int n_modules;
const int n_submodules;
const int start_udp_port;
const std::string buffer_folder;
@@ -32,7 +31,6 @@ namespace BufferUtils
<< det_config.GAIN_FILENAME << ' '
<< det_config.detector_name << ' '
<< det_config.n_modules << ' '
<< det_config.n_submodules << ' '
<< det_config.start_udp_port << ' '
<< det_config.buffer_folder << ' ';
}
+5 -2
View File
@@ -16,6 +16,10 @@ struct ModuleFrame {
};
#pragma pack(pop)
// ImageMetadata status convention
// 0 good image
// 1 frames with missing packets
// 2 frames with different ids
#pragma pack(push)
#pragma pack(1)
@@ -26,11 +30,10 @@ struct ImageMetadata {
uint64_t dtype;
uint64_t encoding;
uint64_t source_id;
uint64_t status;
uint64_t status;
uint64_t user_1;
uint64_t user_2;
};
#pragma pack(pop)
#endif //SF_DAQ_BUFFER_FORMATS_HPP
-1
View File
@@ -167,7 +167,6 @@ BufferUtils::DetectorConfig BufferUtils::read_json_config(
config_parameters["gain_file"].GetString(),
config_parameters["detector_name"].GetString(),
config_parameters["n_modules"].GetInt(),
config_parameters["n_submodules"].GetInt(),
config_parameters["start_udp_port"].GetInt(),
config_parameters["buffer_folder"].GetString(),
};
+2 -3
View File
@@ -84,9 +84,9 @@ void RamBuffer::write_frame(
const char *src_data) const
{
auto *dst_meta = (ModuleFrame*) get_frame_meta(
src_meta.pulse_id, src_meta.module_id);
src_meta.id, src_meta.module_id);
auto *dst_data = get_frame_data(
src_meta.pulse_id, src_meta.module_id);
src_meta.id, src_meta.module_id);
#ifdef DEBUG_OUTPUT
using namespace date;
@@ -96,7 +96,6 @@ void RamBuffer::write_frame(
cout << " || src_meta.n_recv_packets " << src_meta.n_recv_packets;
cout << " || src_meta.daq_rec " << src_meta.daq_rec;
cout << " || src_meta.module_id " << src_meta.module_id;
cout << " || dst_meta " << &dst_meta;
cout << endl;
#endif
+1 -1
View File
@@ -33,5 +33,5 @@ TEST(RamBuffer, simple_store)
ASSERT_EQ(image_meta.pulse_id, frame_meta.pulse_id);
ASSERT_EQ(image_meta.daq_rec, frame_meta.daq_rec);
ASSERT_EQ(image_meta.frame_index, frame_meta.frame_index);
ASSERT_EQ(image_meta.is_good_image, 1);
ASSERT_EQ(image_meta.status, 0);
}
+3 -3
View File
@@ -5,9 +5,9 @@
"live_rate": 10,
"pedestal_file": "",
"gain_file": "",
"detector_name": "Eiger-16",
"n_modules": 1,
"n_submodules":4,
"detector_name": "eiger-16",
"detector_type": "eiger",
"n_modules": 4,
"start_udp_port": 50200,
"buffer_folder": ""
}
+6 -1
View File
@@ -10,6 +10,8 @@ class EigerAssembler {
const int bit_depth_;
const int n_eiger_modules_;
int last_image_status_;
const uint32_t n_bytes_per_frame_;
const uint32_t n_bytes_per_module_line_;
const uint32_t n_packets_per_frame_;
@@ -21,12 +23,15 @@ class EigerAssembler {
const int n_lines_per_frame_;
const int image_bytes_;
public:
EigerAssembler(const int n_modules, const int bit_depth);
void assemble_image(const char* src_meta, const char* src_data,
char* dst_meta, char* dst_data) const;
char* dst_meta, char* dst_data);
size_t get_image_n_bytes() const;
size_t get_module_n_bytes() const;
int get_last_img_status() const;
friend std::ostream& operator<<(std::ostream& os, const EigerAssembler& p)
{
+1 -1
View File
@@ -28,7 +28,7 @@ void AssemblerStats::record_stats(
image_counter_++;
n_sync_lost_images_ += n_lost_pulses;
if (!meta->is_good_image) {
if (meta->status != 0) {
n_corrupted_images_++;
}
+61 -18
View File
@@ -5,6 +5,7 @@
#include <cstring>
#include "EigerAssembler.hpp"
#include "buffer_config.hpp"
#include "eiger.hpp"
#include "date.h"
@@ -30,6 +31,11 @@ EigerAssembler::EigerAssembler(const int n_modules, const int bit_depth):
}
int EigerAssembler::get_last_img_status() const
{
return last_image_status_;
}
size_t EigerAssembler::get_module_n_bytes() const
{
return n_bytes_per_frame_;
@@ -43,14 +49,46 @@ size_t EigerAssembler::get_image_n_bytes() const
void EigerAssembler::assemble_image(const char* src_meta,
const char* src_data,
char* dst_meta,
char* dst_data) const
char* dst_data)
{
auto is_pulse_init = false;
for (int i_module = 0; i_module < n_modules_; i_module++) {
// module frame metadata
auto frame_meta = (ModuleFrame *)(src_meta + (sizeof(ModuleFrame)* i_module));
// module frame data
auto *frame_data = src_data + (n_bytes_per_frame_ * i_module);
// image metadata
auto image_meta = (ImageMetadata *) dst_meta;
// initializes image metadata
if (!is_pulse_init){
// init good image status = 0
image_meta->status = 0;
image_meta->id = frame_meta->id;
// todo fill rest of image metadata
// image_meta->height
// image_meta->width
is_pulse_init = 1;
}
// missing packets: bad status = 1
if (frame_meta->n_recv_packets != n_packets_per_frame_){
image_meta->status = 1;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [EigerAssembler::assemble_image] bad frame :";
cout << "frame_meta->frame_index != n_packets_per_frame_ ";
cout << "|| i_module: " << i_module;
cout << "|| frame_meta->n_recv_packets " << frame_meta->n_recv_packets;
cout << "|| n_packets_per_frame_" << n_packets_per_frame_;
cout << endl;
#endif
}
// frame id false: bad status = 2
if (frame_meta->frame_index != image_meta->id) {
image_meta->status = 2;
}
// top
uint32_t source_offset = 0;
@@ -59,7 +97,7 @@ void EigerAssembler::assemble_image(const char* src_meta,
uint32_t dest_offset = 0;
// If bottom -> reversed
const auto reverse = IS_BOTTOM(frame_meta->row);
const auto reverse = IS_BOTTOM(frame_meta->pos_x);
if (reverse == -1) {
line_number = MODULE_Y_SIZE + GAP_Y_MODULE_PIXELS;
reverse_factor = MODULE_Y_SIZE - 1;
@@ -68,8 +106,9 @@ void EigerAssembler::assemble_image(const char* src_meta,
source_offset = (MODULE_Y_SIZE-1) * n_bytes_per_module_line_;
}
const auto i_module_row = frame_meta->row;
const auto i_module_column = frame_meta->column;
const auto i_module_row = frame_meta->pos_x;
const auto i_module_column = frame_meta->pos_y;
uint32_t dest_module_line = line_number;
@@ -79,23 +118,26 @@ void EigerAssembler::assemble_image(const char* src_meta,
#ifdef DEBUG_OUTPUT
using namespace date;
// if (i_module == 1){
cout << " [" << std::chrono::system_clock::now();
cout << "] [MODULE " << i_module;
cout << "] (row " << i_module_row;
cout << " , column)" << i_module_column;
cout << " || reverse_factor" << reverse_factor;
cout << " || line_number" << line_number;
cout << endl;
// if (i_module == 0){
cout << " [" << std::chrono::system_clock::now();
cout << "] [MODULE " << i_module;
cout << "] (row " << i_module_row;
cout << " , column" << i_module_column;
cout << ") || reverse_factor" << reverse_factor;
cout << " || line_number" << line_number;
cout << " || N_RECV_PACKETS" << frame_meta->n_recv_packets;
cout << endl;
// }
#endif
int counter = 0;
for (uint32_t frame_line = 0;
frame_line < n_lines_per_frame_; frame_line++) {
// void * destination, const void * source, size_t num
memcpy (
(char*)(dst_data + dest_offset),
(char*) (src_data + source_offset),
(char*)(src_data + source_offset),
n_bytes_per_module_line_
);
@@ -105,10 +147,10 @@ void EigerAssembler::assemble_image(const char* src_meta,
// beginning and end of each frame
if (counter < 5 || counter > 508){
cout << " [" << std::chrono::system_clock::now();
cout << "] [MODULE :" << i_module;
cout << "] ROW :" << i_module_row;
cout << "] COLUMN :" << i_module_column;
cout << " || source_offset" << source_offset;
cout << "] [MODULE " << i_module;
cout << "] (row " << i_module_row;
cout << ",column " << i_module_column;
cout << ") source_offset" << source_offset;
cout << " || dest_offset " << dest_offset;
cout << " || frame_line " << frame_line;
cout << " || COUNTER " << counter;
@@ -121,5 +163,6 @@ void EigerAssembler::assemble_image(const char* src_meta,
}
line_number += n_lines_per_frame_;
dest_module_line = line_number + n_lines_per_frame_ - 1;
last_image_status_ = image_meta->status;
}
}
@@ -8,6 +8,7 @@
#include <algorithm>
#include <iostream>
#include "date.h"
#include "buffer_config.hpp"
#include "assembler_config.hpp"
+43 -19
View File
@@ -1,17 +1,24 @@
#include <iostream>
#include <string>
#include <zmq.h>
#include "date.h"
#include <chrono>
#ifdef USE_EIGER
#include "eiger.hpp"
#else
#include "jungfrau.hpp"
#endif
#include <RamBuffer.hpp>
#include <BufferUtils.hpp>
#include <AssemblerStats.hpp>
#include <chrono>
#include "date.h"
#include "EigerAssembler.hpp"
#include "assembler_config.hpp"
#include "ZmqPulseSyncReceiver.hpp"
#include "buffer_config.hpp"
using namespace std;
using namespace buffer_config;
@@ -22,12 +29,15 @@ int main (int argc, char *argv[])
if (argc != 3) {
cout << endl;
#ifndef USE_EIGER
cout << "Usage: jf_assembler [detector_json_filename] [bit_depth]" << endl;
cout << "Usage: jf_assembler [detector_json_filename] "
" [bit_depth]" << endl;
#else
cout << "Usage: eiger_assembler [detector_json_filename] [bit_depth]" << endl;
cout << "Usage: eiger_assembler [detector_json_filename] "
" [bit_depth]" << endl;
#endif
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << "\tbit_depth: bit depth of the incoming udp packets." << endl;
cout << "\tdetector_json_filename: detector config file path.";
cout << endl;
cout << "\tbit_depth: bit depth of the image.";
cout << endl;
exit(-1);
@@ -41,7 +51,8 @@ int main (int argc, char *argv[])
zmq_ctx_set(ctx, ZMQ_IO_THREADS, ASSEMBLER_ZMQ_IO_THREADS);
auto sender = BufferUtils::bind_socket(
ctx, config.detector_name, stream_name);
const int n_receivers = config.n_modules * config.n_submodules;
auto receiver_sync = BufferUtils::connect_socket(
ctx, config.detector_name, "sync");
#ifdef DEBUG_OUTPUT
using namespace date;
@@ -50,11 +61,13 @@ int main (int argc, char *argv[])
cout << " Details of Assembler:";
cout << " detector_name: " << config.detector_name;
cout << " || n_modules: " << config.n_modules;
cout << " || n_receivers: " << n_receivers;
cout << endl;
#endif
EigerAssembler assembler(n_receivers, bit_depth);
const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8;
const size_t N_PACKETS_PER_FRAME = FRAME_N_BYTES / DATA_BYTES_PER_PACKET;
EigerAssembler assembler(config.n_modules, bit_depth);
#ifdef DEBUG_OUTPUT
using namespace date;
@@ -66,7 +79,7 @@ int main (int argc, char *argv[])
#endif
RamBuffer frame_buffer(config.detector_name,
sizeof(ModuleFrame), N_BYTES_PER_MODULE_FRAME(bit_depth), n_receivers,
sizeof(ModuleFrame), FRAME_N_BYTES, config.n_modules,
buffer_config::RAM_BUFFER_N_SLOTS);
@@ -74,23 +87,34 @@ int main (int argc, char *argv[])
sizeof(ImageMetadata), assembler.get_image_n_bytes(), 1,
buffer_config::RAM_BUFFER_N_SLOTS);
ZmqPulseSyncReceiver receiver(ctx, config.detector_name, n_receivers);
AssemblerStats stats(config.detector_name, ASSEMBLER_STATS_MODULO);
uint64_t image_id = 0;
while (true) {
auto pulse_and_sync = receiver.get_next_pulse_id();
// receives the synced image id
zmq_recv(receiver_sync, &image_id, sizeof(image_id), 0);
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [ASSEMBLER::receiver_sync] image_id: ";
cout << image_id;
cout << endl;
#endif
// metadata
auto* src_meta = frame_buffer.get_slot_meta(pulse_and_sync.pulse_id);
auto* src_data = frame_buffer.get_slot_data(pulse_and_sync.pulse_id);
auto* src_meta = frame_buffer.get_slot_meta(image_id);
auto* src_data = frame_buffer.get_slot_data(image_id);
// data
auto* dst_meta = image_buffer.get_slot_meta(pulse_and_sync.pulse_id);
auto* dst_data = image_buffer.get_slot_data(pulse_and_sync.pulse_id);
auto* dst_meta = image_buffer.get_slot_meta(image_id);
auto* dst_data = image_buffer.get_slot_data(image_id);
// assemble
assembler.assemble_image(src_meta, src_data, dst_meta, dst_data);
zmq_send(sender, dst_meta, sizeof(ImageMetadata), 0);
stats.record_stats(
(ImageMetadata*)dst_meta, pulse_and_sync.n_lost_pulses);
(ImageMetadata*)dst_meta, assembler.get_last_img_status());
}
}
+1 -1
View File
@@ -121,7 +121,7 @@ inline uint64_t FrameUdpReceiver::process_packets(
cout << "] [frameudpreceiver::process_packets] :";
cout << " frame " << metadata.frame_index << " || ";
cout << packet_buffer_[i_packet].packetnum << " packets received.";
cout << " pulse id "<< metadata.pulse_id;
cout << " id "<< metadata.id;
cout << endl;
#endif
// buffer is loaded only if this is not the last message.
+3 -1
View File
@@ -45,7 +45,8 @@ int main (int argc, char *argv[]) {
FrameUdpReceiver receiver(udp_port, N_PACKETS_PER_FRAME);
RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame),
FRAME_N_BYTES, config.n_modules);
FRAME_N_BYTES, config.n_modules,
buffer_config::RAM_BUFFER_N_SLOTS);
FrameStats stats(config.detector_name, module_id,
N_PACKETS_PER_FRAME, STATS_TIME);
@@ -61,6 +62,7 @@ int main (int argc, char *argv[]) {
while (true) {
// Reset the metadata and frame buffer for the next frame.
meta.frame_index = 0;
meta.n_recv_packets = 0;
memset(data, 0, FRAME_N_BYTES);
receiver.get_frame_from_udp(meta, data);
+2 -11
View File
@@ -43,18 +43,9 @@ int main (int argc, char *argv[])
auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "sync");
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [Assembler] :";
cout << " Details of Assembler:";
cout << " detector_name: " << config.detector_name;
cout << " || n_modules: " << config.n_modules;
cout << endl;
#endif
RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame),
FRAME_N_BYTES, config.n_modules);
FRAME_N_BYTES, config.n_modules,
buffer_config::RAM_BUFFER_N_SLOTS);
ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules);
SyncStats stats(config.detector_name, SYNC_STATS_MODULO);