diff --git a/core-buffer/include/BufferUtils.hpp b/core-buffer/include/BufferUtils.hpp index 2569318..d24802c 100644 --- a/core-buffer/include/BufferUtils.hpp +++ b/core-buffer/include/BufferUtils.hpp @@ -18,7 +18,6 @@ namespace BufferUtils const std::string detector_name; const int n_modules; - const int n_submodules; const int start_udp_port; const std::string buffer_folder; @@ -32,7 +31,6 @@ namespace BufferUtils << det_config.GAIN_FILENAME << ' ' << det_config.detector_name << ' ' << det_config.n_modules << ' ' - << det_config.n_submodules << ' ' << det_config.start_udp_port << ' ' << det_config.buffer_folder << ' '; } diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index e643af4..3fc1c6b 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -18,6 +18,10 @@ struct ModuleFrame { }; #pragma pack(pop) +// ImageMetadata status convention +// 0 good image +// 1 frames with missing packets +// 2 frames with different ids #pragma pack(push) #pragma pack(1) @@ -28,11 +32,10 @@ struct ImageMetadata { uint64_t dtype; uint64_t encoding; uint64_t source_id; - uint64_t status; + uint64_t status; uint64_t user_1; uint64_t user_2; }; #pragma pack(pop) - #endif //SF_DAQ_BUFFER_FORMATS_HPP diff --git a/core-buffer/src/BufferUtils.cpp b/core-buffer/src/BufferUtils.cpp index dedcba9..abe0a3d 100644 --- a/core-buffer/src/BufferUtils.cpp +++ b/core-buffer/src/BufferUtils.cpp @@ -167,7 +167,6 @@ BufferUtils::DetectorConfig BufferUtils::read_json_config( config_parameters["gain_file"].GetString(), config_parameters["detector_name"].GetString(), config_parameters["n_modules"].GetInt(), - config_parameters["n_submodules"].GetInt(), config_parameters["start_udp_port"].GetInt(), config_parameters["buffer_folder"].GetString(), }; diff --git a/core-buffer/src/RamBuffer.cpp b/core-buffer/src/RamBuffer.cpp index adef0c1..8b3f298 100644 --- a/core-buffer/src/RamBuffer.cpp +++ b/core-buffer/src/RamBuffer.cpp @@ -29,8 +29,8 @@ RamBuffer::RamBuffer(const string& buffer_name, #ifdef DEBUG_OUTPUT using namespace date; cout << " [" << std::chrono::system_clock::now(); - cout << "] [RamBuffer::RamBuffer] :"; - cout << " || buffer_name_ " << buffer_name; + cout << "] [RamBuffer::RamBuffer] "; + cout << " buffer_name_ " << buffer_name; cout << " || n_modules_ " << n_modules_; cout << " || n_slots_ " << n_slots_; cout << " || meta_bytes_" << meta_bytes_; @@ -90,13 +90,12 @@ void RamBuffer::write_frame( #ifdef DEBUG_OUTPUT using namespace date; cout << " [" << std::chrono::system_clock::now(); - cout << "] [RamBuffer::write_frame] :"; - cout << " || src_meta.id " << src_meta.id; + cout << "] [RamBuffer::write_frame] "; + cout << " src_meta.id " << src_meta.id; cout << " || src_meta.frame_index " << src_meta.frame_index; cout << " || src_meta.n_recv_packets " << src_meta.n_recv_packets; cout << " || src_meta.daq_rec " << src_meta.daq_rec; cout << " || src_meta.module_id " << src_meta.module_id; - cout << " || dst_meta " << &dst_meta; cout << endl; #endif diff --git a/eiger/sf-daq-4/config/eiger.json b/eiger/sf-daq-4/config/eiger.json index 9cfbe1b..51280da 100644 --- a/eiger/sf-daq-4/config/eiger.json +++ b/eiger/sf-daq-4/config/eiger.json @@ -5,9 +5,9 @@ "live_rate": 10, "pedestal_file": "", "gain_file": "", - "detector_name": "Eiger-16", - "n_modules": 1, - "n_submodules":4, + "detector_name": "eiger-16", + "detector_type": "eiger", + "n_modules": 4, "start_udp_port": 50200, "buffer_folder": "" } diff --git a/jf-assembler/include/EigerAssembler.hpp b/jf-assembler/include/EigerAssembler.hpp index c232739..350b050 100644 --- a/jf-assembler/include/EigerAssembler.hpp +++ b/jf-assembler/include/EigerAssembler.hpp @@ -10,6 +10,8 @@ class EigerAssembler { const int bit_depth_; const int n_eiger_modules_; + int last_image_status_; + const uint32_t n_bytes_per_frame_; const uint32_t n_bytes_per_module_line_; const uint32_t n_packets_per_frame_; @@ -21,12 +23,15 @@ class EigerAssembler { const int n_lines_per_frame_; const int image_bytes_; + + public: EigerAssembler(const int n_modules, const int bit_depth); void assemble_image(const char* src_meta, const char* src_data, - char* dst_meta, char* dst_data) const; + char* dst_meta, char* dst_data); size_t get_image_n_bytes() const; size_t get_module_n_bytes() const; + int get_last_img_status() const; friend std::ostream& operator<<(std::ostream& os, const EigerAssembler& p) { diff --git a/jf-assembler/src/AssemblerStats.cpp b/jf-assembler/src/AssemblerStats.cpp index ce038c9..15e821a 100644 --- a/jf-assembler/src/AssemblerStats.cpp +++ b/jf-assembler/src/AssemblerStats.cpp @@ -28,7 +28,7 @@ void AssemblerStats::record_stats( image_counter_++; n_sync_lost_images_ += n_lost_pulses; - if (!meta->is_good_image) { + if (meta->status != 0) { n_corrupted_images_++; } diff --git a/jf-assembler/src/EigerAssembler.cpp b/jf-assembler/src/EigerAssembler.cpp index 7b6508d..73c88dd 100644 --- a/jf-assembler/src/EigerAssembler.cpp +++ b/jf-assembler/src/EigerAssembler.cpp @@ -5,6 +5,7 @@ #include #include "EigerAssembler.hpp" +#include "buffer_config.hpp" #include "eiger.hpp" #include "date.h" @@ -30,6 +31,11 @@ EigerAssembler::EigerAssembler(const int n_modules, const int bit_depth): } +int EigerAssembler::get_last_img_status() const +{ + return last_image_status_; +} + size_t EigerAssembler::get_module_n_bytes() const { return n_bytes_per_frame_; @@ -43,14 +49,46 @@ size_t EigerAssembler::get_image_n_bytes() const void EigerAssembler::assemble_image(const char* src_meta, const char* src_data, char* dst_meta, - char* dst_data) const + char* dst_data) { - + auto is_pulse_init = false; for (int i_module = 0; i_module < n_modules_; i_module++) { // module frame metadata auto frame_meta = (ModuleFrame *)(src_meta + (sizeof(ModuleFrame)* i_module)); // module frame data auto *frame_data = src_data + (n_bytes_per_frame_ * i_module); + // image metadata + auto image_meta = (ImageMetadata *) dst_meta; + // initializes image metadata + if (!is_pulse_init){ + // init good image status = 0 + image_meta->status = 0; + image_meta->id = frame_meta->id; + // todo fill rest of image metadata + // image_meta->height + // image_meta->width + is_pulse_init = 1; + } + + // missing packets: bad status = 1 + if (frame_meta->n_recv_packets != n_packets_per_frame_){ + image_meta->status = 1; + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [EigerAssembler::assemble_image] bad frame :"; + cout << "frame_meta->frame_index != n_packets_per_frame_ "; + cout << "|| i_module: " << i_module; + cout << "|| frame_meta->n_recv_packets " << frame_meta->n_recv_packets; + cout << "|| n_packets_per_frame_" << n_packets_per_frame_; + cout << endl; + #endif + } + + // frame id false: bad status = 2 + if (frame_meta->frame_index != image_meta->id) { + image_meta->status = 2; + } // top uint32_t source_offset = 0; @@ -59,7 +97,7 @@ void EigerAssembler::assemble_image(const char* src_meta, uint32_t dest_offset = 0; // If bottom -> reversed - const auto reverse = IS_BOTTOM(frame_meta->row); + const auto reverse = IS_BOTTOM(frame_meta->pos_x); if (reverse == -1) { line_number = MODULE_Y_SIZE + GAP_Y_MODULE_PIXELS; reverse_factor = MODULE_Y_SIZE - 1; @@ -68,8 +106,9 @@ void EigerAssembler::assemble_image(const char* src_meta, source_offset = (MODULE_Y_SIZE-1) * n_bytes_per_module_line_; } - const auto i_module_row = frame_meta->row; - const auto i_module_column = frame_meta->column; + const auto i_module_row = frame_meta->pos_x; + const auto i_module_column = frame_meta->pos_y; + uint32_t dest_module_line = line_number; @@ -79,23 +118,26 @@ void EigerAssembler::assemble_image(const char* src_meta, #ifdef DEBUG_OUTPUT using namespace date; - // if (i_module == 1){ - cout << " [" << std::chrono::system_clock::now(); - cout << "] [MODULE " << i_module; - cout << "] (row " << i_module_row; - cout << " , column)" << i_module_column; - cout << " || reverse_factor" << reverse_factor; - cout << " || line_number" << line_number; - cout << endl; + // if (i_module == 0){ + cout << " [" << std::chrono::system_clock::now(); + cout << "] [MODULE " << i_module; + cout << "] (row " << i_module_row; + cout << " , column" << i_module_column; + cout << ") || reverse_factor" << reverse_factor; + cout << " || line_number" << line_number; + cout << " || N_RECV_PACKETS" << frame_meta->n_recv_packets; + cout << endl; // } #endif + int counter = 0; + for (uint32_t frame_line = 0; frame_line < n_lines_per_frame_; frame_line++) { // void * destination, const void * source, size_t num memcpy ( (char*)(dst_data + dest_offset), - (char*) (src_data + source_offset), + (char*)(src_data + source_offset), n_bytes_per_module_line_ ); @@ -105,10 +147,10 @@ void EigerAssembler::assemble_image(const char* src_meta, // beginning and end of each frame if (counter < 5 || counter > 508){ cout << " [" << std::chrono::system_clock::now(); - cout << "] [MODULE :" << i_module; - cout << "] ROW :" << i_module_row; - cout << "] COLUMN :" << i_module_column; - cout << " || source_offset" << source_offset; + cout << "] [MODULE " << i_module; + cout << "] (row " << i_module_row; + cout << ",column " << i_module_column; + cout << ") source_offset" << source_offset; cout << " || dest_offset " << dest_offset; cout << " || frame_line " << frame_line; cout << " || COUNTER " << counter; @@ -121,5 +163,6 @@ void EigerAssembler::assemble_image(const char* src_meta, } line_number += n_lines_per_frame_; dest_module_line = line_number + n_lines_per_frame_ - 1; + last_image_status_ = image_meta->status; } } diff --git a/jf-assembler/src/ZmqPulseSyncReceiver.cpp b/jf-assembler/src/ZmqPulseSyncReceiver.cpp index 79f122c..efd0f46 100644 --- a/jf-assembler/src/ZmqPulseSyncReceiver.cpp +++ b/jf-assembler/src/ZmqPulseSyncReceiver.cpp @@ -8,6 +8,7 @@ #include #include #include "date.h" +#include "buffer_config.hpp" #include "assembler_config.hpp" diff --git a/jf-assembler/src/main.cpp b/jf-assembler/src/main.cpp index 53c73aa..7ddd6db 100644 --- a/jf-assembler/src/main.cpp +++ b/jf-assembler/src/main.cpp @@ -1,13 +1,13 @@ #include #include #include +#include "date.h" +#include + #include #include #include -#include - -#include "date.h" #include "EigerAssembler.hpp" #include "assembler_config.hpp" #include "ZmqPulseSyncReceiver.hpp" @@ -27,12 +27,15 @@ int main (int argc, char *argv[]) if (argc != 3) { cout << endl; #ifndef USE_EIGER - cout << "Usage: jf_assembler [detector_json_filename] [bit_depth]" << endl; + cout << "Usage: jf_assembler [detector_json_filename] " + " [bit_depth]" << endl; #else - cout << "Usage: eiger_assembler [detector_json_filename] [bit_depth]" << endl; + cout << "Usage: eiger_assembler [detector_json_filename] " + " [bit_depth]" << endl; #endif - cout << "\tdetector_json_filename: detector config file path." << endl; - cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; + cout << "\tdetector_json_filename: detector config file path."; + cout << endl; + cout << "\tbit_depth: bit depth of the image."; cout << endl; exit(-1); @@ -46,7 +49,8 @@ int main (int argc, char *argv[]) zmq_ctx_set(ctx, ZMQ_IO_THREADS, ASSEMBLER_ZMQ_IO_THREADS); auto sender = BufferUtils::bind_socket( ctx, config.detector_name, stream_name); - const int n_receivers = config.n_modules * config.n_submodules; + auto receiver_sync = BufferUtils::connect_socket( + ctx, config.detector_name, "sync"); #ifdef DEBUG_OUTPUT using namespace date; @@ -55,11 +59,13 @@ int main (int argc, char *argv[]) cout << " Details of Assembler:"; cout << " detector_name: " << config.detector_name; cout << " || n_modules: " << config.n_modules; - cout << " || n_receivers: " << n_receivers; cout << endl; #endif - EigerAssembler assembler(n_receivers, bit_depth); + const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8; + const size_t N_PACKETS_PER_FRAME = FRAME_N_BYTES / DATA_BYTES_PER_PACKET; + + EigerAssembler assembler(config.n_modules, bit_depth); #ifdef DEBUG_OUTPUT using namespace date; @@ -71,7 +77,7 @@ int main (int argc, char *argv[]) #endif RamBuffer frame_buffer(config.detector_name, - sizeof(ModuleFrame), N_BYTES_PER_MODULE_FRAME(bit_depth), n_receivers, + sizeof(ModuleFrame), FRAME_N_BYTES, config.n_modules, buffer_config::RAM_BUFFER_N_SLOTS); @@ -79,23 +85,34 @@ int main (int argc, char *argv[]) sizeof(ImageMetadata), assembler.get_image_n_bytes(), 1, buffer_config::RAM_BUFFER_N_SLOTS); - ZmqPulseSyncReceiver receiver(ctx, config.detector_name, n_receivers); AssemblerStats stats(config.detector_name, ASSEMBLER_STATS_MODULO); - + + + uint64_t image_id = 0; + while (true) { - auto pulse_and_sync = receiver.get_next_pulse_id(); + // receives the synced image id + zmq_recv(receiver_sync, &image_id, sizeof(image_id), 0); + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [ASSEMBLER::receiver_sync] image_id: "; + cout << image_id; + cout << endl; + #endif // metadata - auto* src_meta = frame_buffer.get_slot_meta(pulse_and_sync.pulse_id); - auto* src_data = frame_buffer.get_slot_data(pulse_and_sync.pulse_id); + auto* src_meta = frame_buffer.get_slot_meta(image_id); + auto* src_data = frame_buffer.get_slot_data(image_id); // data - auto* dst_meta = image_buffer.get_slot_meta(pulse_and_sync.pulse_id); - auto* dst_data = image_buffer.get_slot_data(pulse_and_sync.pulse_id); + auto* dst_meta = image_buffer.get_slot_meta(image_id); + auto* dst_data = image_buffer.get_slot_data(image_id); // assemble assembler.assemble_image(src_meta, src_data, dst_meta, dst_data); zmq_send(sender, dst_meta, sizeof(ImageMetadata), 0); stats.record_stats( - (ImageMetadata*)dst_meta, pulse_and_sync.n_lost_pulses); + (ImageMetadata*)dst_meta, assembler.get_last_img_status()); } } diff --git a/jf-udp-recv/src/main.cpp b/jf-udp-recv/src/main.cpp deleted file mode 100644 index 598c16c..0000000 --- a/jf-udp-recv/src/main.cpp +++ /dev/null @@ -1,80 +0,0 @@ -#include -#include -#include -#include - -#include "formats.hpp" -#include "buffer_config.hpp" -#include "FrameUdpReceiver.hpp" -#include "BufferUtils.hpp" -#include "FrameStats.hpp" - -using namespace std; -using namespace chrono; -using namespace buffer_config; -using namespace BufferUtils; - - - -int main (int argc, char *argv[]) { - - if (argc != 4) { - cout << endl; - #ifndef USE_EIGER - cout << "Usage: jf_udp_recv [detector_json_filename] [module_id] [bit_depth]"; - #else - cout << "Usage: eiger_udp_recv [detector_json_filename] [module_id] [bit_depth]"; - #endif - cout << endl; - cout << "\tdetector_json_filename: detector config file path." << endl; - cout << "\tmodule_id: id of the module for this process." << endl; - cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; - cout << endl; - exit(-1); - } - - const auto config = read_json_config(string(argv[1])); - const int module_id = atoi(argv[2]); - const int bit_depth = atoi(argv[3]); - const int n_receivers = config.n_modules * config.n_submodules; - const auto udp_port = config.start_udp_port + module_id; - - FrameUdpReceiver receiver(module_id, udp_port, n_receivers, config.n_submodules, bit_depth); - RamBuffer buffer(config.detector_name, sizeof(ModuleFrame), N_BYTES_PER_MODULE_FRAME(bit_depth), n_receivers, - buffer_config::RAM_BUFFER_N_SLOTS); - FrameStats stats(config.detector_name, n_receivers, module_id, bit_depth, STATS_TIME); - - auto ctx = zmq_ctx_new(); - auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); - - ModuleFrame meta; - char* data = new char[MODULE_N_PIXELS * bit_depth / 8]; - - uint64_t pulse_id_previous = 0; - uint64_t frame_index_previous = 0; - - while (true) { - - auto pulse_id = receiver.get_frame_from_udp(meta, data); - - bool bad_pulse_id = false; - if ( ( meta.frame_index != (frame_index_previous+1) ) || - ( (meta.frame_index-frame_index_previous) <= 0 ) || - ( (meta.frame_index-frame_index_previous) > 1000 ) ){ - bad_pulse_id = true; - } else { - buffer.write_frame(meta, data); - - zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); - - } - - stats.record_stats(meta, bad_pulse_id); - - pulse_id_previous = pulse_id; - frame_index_previous = meta.frame_index; - - } - - delete[] data; -} diff --git a/std-udp-recv/src/FrameUdpReceiver.cpp b/std-udp-recv/src/FrameUdpReceiver.cpp index cd12760..e3f2a3a 100644 --- a/std-udp-recv/src/FrameUdpReceiver.cpp +++ b/std-udp-recv/src/FrameUdpReceiver.cpp @@ -124,7 +124,6 @@ inline uint64_t FrameUdpReceiver::process_packets( cout << " frame " << metadata.frame_index << " || "; cout << packet_buffer_[i_packet].packetnum+1; cout << " packets received."; - cout << " pulse id "<< metadata.pulse_id; cout << endl; #endif // buffer is loaded only if this is not the last message. diff --git a/std-udp-recv/src/main.cpp b/std-udp-recv/src/main.cpp index 591d9b5..c9dc862 100644 --- a/std-udp-recv/src/main.cpp +++ b/std-udp-recv/src/main.cpp @@ -59,6 +59,9 @@ int main (int argc, char *argv[]) { char* data = new char[FRAME_N_BYTES]; while (true) { + // Reset the metadata and frame buffer for the next frame. + meta.frame_index = 0; + meta.n_recv_packets = 0; // Reset the data buffer. memset(data, 0, FRAME_N_BYTES); diff --git a/std-udp-sync/src/ZmqPulseSyncReceiver.cpp b/std-udp-sync/src/ZmqPulseSyncReceiver.cpp index 63682bd..85c9841 100644 --- a/std-udp-sync/src/ZmqPulseSyncReceiver.cpp +++ b/std-udp-sync/src/ZmqPulseSyncReceiver.cpp @@ -53,9 +53,17 @@ PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const } if (modules_in_sync) { + #ifdef DEBUG_OUTPUT + using namespace date; + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << " [ZmqPulseSyncReceiver::get_next_pulse_id]"; + cout << " Modules in sync ("; + cout << " pulse_id " << ids[0] <<")."; + cout << endl; + #endif return {ids[0], i_sync}; } - + #ifdef DEBUG_OUTPUT using namespace date; cout << "[" << std::chrono::system_clock::now() << "]"; diff --git a/std-udp-sync/src/main.cpp b/std-udp-sync/src/main.cpp index 2bfd5dd..b2d5150 100644 --- a/std-udp-sync/src/main.cpp +++ b/std-udp-sync/src/main.cpp @@ -45,16 +45,6 @@ int main (int argc, char *argv[]) auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "sync"); - #ifdef DEBUG_OUTPUT - using namespace date; - cout << " [" << std::chrono::system_clock::now(); - cout << "] [Assembler] :"; - cout << " Details of Assembler:"; - cout << " detector_name: " << config.detector_name; - cout << " || n_modules: " << config.n_modules; - cout << endl; - #endif - RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), FRAME_N_BYTES, config.n_modules, RAM_BUFFER_N_SLOTS);