diff --git a/core-buffer/include/BufferUtils.hpp b/core-buffer/include/BufferUtils.hpp index 2569318..d24802c 100644 --- a/core-buffer/include/BufferUtils.hpp +++ b/core-buffer/include/BufferUtils.hpp @@ -18,7 +18,6 @@ namespace BufferUtils const std::string detector_name; const int n_modules; - const int n_submodules; const int start_udp_port; const std::string buffer_folder; @@ -32,7 +31,6 @@ namespace BufferUtils << det_config.GAIN_FILENAME << ' ' << det_config.detector_name << ' ' << det_config.n_modules << ' ' - << det_config.n_submodules << ' ' << det_config.start_udp_port << ' ' << det_config.buffer_folder << ' '; } diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index 7ae731f..bb134c3 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -16,6 +16,10 @@ struct ModuleFrame { }; #pragma pack(pop) +// ImageMetadata status convention +// 0 good image +// 1 frames with missing packets +// 2 frames with different ids #pragma pack(push) #pragma pack(1) @@ -26,11 +30,10 @@ struct ImageMetadata { uint64_t dtype; uint64_t encoding; uint64_t source_id; - uint64_t status; + uint64_t status; uint64_t user_1; uint64_t user_2; }; #pragma pack(pop) - #endif //SF_DAQ_BUFFER_FORMATS_HPP diff --git a/core-buffer/src/BufferUtils.cpp b/core-buffer/src/BufferUtils.cpp index dedcba9..abe0a3d 100644 --- a/core-buffer/src/BufferUtils.cpp +++ b/core-buffer/src/BufferUtils.cpp @@ -167,7 +167,6 @@ BufferUtils::DetectorConfig BufferUtils::read_json_config( config_parameters["gain_file"].GetString(), config_parameters["detector_name"].GetString(), config_parameters["n_modules"].GetInt(), - config_parameters["n_submodules"].GetInt(), config_parameters["start_udp_port"].GetInt(), config_parameters["buffer_folder"].GetString(), }; diff --git a/core-buffer/src/RamBuffer.cpp b/core-buffer/src/RamBuffer.cpp index f52027f..b7a20da 100644 --- a/core-buffer/src/RamBuffer.cpp +++ b/core-buffer/src/RamBuffer.cpp @@ -84,9 +84,9 @@ void RamBuffer::write_frame( const char *src_data) const { auto *dst_meta = (ModuleFrame*) get_frame_meta( - src_meta.pulse_id, src_meta.module_id); + src_meta.id, src_meta.module_id); auto *dst_data = get_frame_data( - src_meta.pulse_id, src_meta.module_id); + src_meta.id, src_meta.module_id); #ifdef DEBUG_OUTPUT using namespace date; @@ -96,7 +96,6 @@ void RamBuffer::write_frame( cout << " || src_meta.n_recv_packets " << src_meta.n_recv_packets; cout << " || src_meta.daq_rec " << src_meta.daq_rec; cout << " || src_meta.module_id " << src_meta.module_id; - cout << " || dst_meta " << &dst_meta; cout << endl; #endif diff --git a/core-buffer/test/test_RamBuffer.cpp b/core-buffer/test/test_RamBuffer.cpp index aa7c8f2..6078958 100644 --- a/core-buffer/test/test_RamBuffer.cpp +++ b/core-buffer/test/test_RamBuffer.cpp @@ -33,5 +33,5 @@ TEST(RamBuffer, simple_store) ASSERT_EQ(image_meta.pulse_id, frame_meta.pulse_id); ASSERT_EQ(image_meta.daq_rec, frame_meta.daq_rec); ASSERT_EQ(image_meta.frame_index, frame_meta.frame_index); - ASSERT_EQ(image_meta.is_good_image, 1); + ASSERT_EQ(image_meta.status, 0); } diff --git a/eiger/sf-daq-4/config/eiger.json b/eiger/sf-daq-4/config/eiger.json index 9cfbe1b..51280da 100644 --- a/eiger/sf-daq-4/config/eiger.json +++ b/eiger/sf-daq-4/config/eiger.json @@ -5,9 +5,9 @@ "live_rate": 10, "pedestal_file": "", "gain_file": "", - "detector_name": "Eiger-16", - "n_modules": 1, - "n_submodules":4, + "detector_name": "eiger-16", + "detector_type": "eiger", + "n_modules": 4, "start_udp_port": 50200, "buffer_folder": "" } diff --git a/jf-assembler/include/EigerAssembler.hpp b/jf-assembler/include/EigerAssembler.hpp index c232739..350b050 100644 --- a/jf-assembler/include/EigerAssembler.hpp +++ b/jf-assembler/include/EigerAssembler.hpp @@ -10,6 +10,8 @@ class EigerAssembler { const int bit_depth_; const int n_eiger_modules_; + int last_image_status_; + const uint32_t n_bytes_per_frame_; const uint32_t n_bytes_per_module_line_; const uint32_t n_packets_per_frame_; @@ -21,12 +23,15 @@ class EigerAssembler { const int n_lines_per_frame_; const int image_bytes_; + + public: EigerAssembler(const int n_modules, const int bit_depth); void assemble_image(const char* src_meta, const char* src_data, - char* dst_meta, char* dst_data) const; + char* dst_meta, char* dst_data); size_t get_image_n_bytes() const; size_t get_module_n_bytes() const; + int get_last_img_status() const; friend std::ostream& operator<<(std::ostream& os, const EigerAssembler& p) { diff --git a/jf-assembler/src/AssemblerStats.cpp b/jf-assembler/src/AssemblerStats.cpp index ce038c9..15e821a 100644 --- a/jf-assembler/src/AssemblerStats.cpp +++ b/jf-assembler/src/AssemblerStats.cpp @@ -28,7 +28,7 @@ void AssemblerStats::record_stats( image_counter_++; n_sync_lost_images_ += n_lost_pulses; - if (!meta->is_good_image) { + if (meta->status != 0) { n_corrupted_images_++; } diff --git a/jf-assembler/src/EigerAssembler.cpp b/jf-assembler/src/EigerAssembler.cpp index 7b6508d..73c88dd 100644 --- a/jf-assembler/src/EigerAssembler.cpp +++ b/jf-assembler/src/EigerAssembler.cpp @@ -5,6 +5,7 @@ #include #include "EigerAssembler.hpp" +#include "buffer_config.hpp" #include "eiger.hpp" #include "date.h" @@ -30,6 +31,11 @@ EigerAssembler::EigerAssembler(const int n_modules, const int bit_depth): } +int EigerAssembler::get_last_img_status() const +{ + return last_image_status_; +} + size_t EigerAssembler::get_module_n_bytes() const { return n_bytes_per_frame_; @@ -43,14 +49,46 @@ size_t EigerAssembler::get_image_n_bytes() const void EigerAssembler::assemble_image(const char* src_meta, const char* src_data, char* dst_meta, - char* dst_data) const + char* dst_data) { - + auto is_pulse_init = false; for (int i_module = 0; i_module < n_modules_; i_module++) { // module frame metadata auto frame_meta = (ModuleFrame *)(src_meta + (sizeof(ModuleFrame)* i_module)); // module frame data auto *frame_data = src_data + (n_bytes_per_frame_ * i_module); + // image metadata + auto image_meta = (ImageMetadata *) dst_meta; + // initializes image metadata + if (!is_pulse_init){ + // init good image status = 0 + image_meta->status = 0; + image_meta->id = frame_meta->id; + // todo fill rest of image metadata + // image_meta->height + // image_meta->width + is_pulse_init = 1; + } + + // missing packets: bad status = 1 + if (frame_meta->n_recv_packets != n_packets_per_frame_){ + image_meta->status = 1; + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [EigerAssembler::assemble_image] bad frame :"; + cout << "frame_meta->frame_index != n_packets_per_frame_ "; + cout << "|| i_module: " << i_module; + cout << "|| frame_meta->n_recv_packets " << frame_meta->n_recv_packets; + cout << "|| n_packets_per_frame_" << n_packets_per_frame_; + cout << endl; + #endif + } + + // frame id false: bad status = 2 + if (frame_meta->frame_index != image_meta->id) { + image_meta->status = 2; + } // top uint32_t source_offset = 0; @@ -59,7 +97,7 @@ void EigerAssembler::assemble_image(const char* src_meta, uint32_t dest_offset = 0; // If bottom -> reversed - const auto reverse = IS_BOTTOM(frame_meta->row); + const auto reverse = IS_BOTTOM(frame_meta->pos_x); if (reverse == -1) { line_number = MODULE_Y_SIZE + GAP_Y_MODULE_PIXELS; reverse_factor = MODULE_Y_SIZE - 1; @@ -68,8 +106,9 @@ void EigerAssembler::assemble_image(const char* src_meta, source_offset = (MODULE_Y_SIZE-1) * n_bytes_per_module_line_; } - const auto i_module_row = frame_meta->row; - const auto i_module_column = frame_meta->column; + const auto i_module_row = frame_meta->pos_x; + const auto i_module_column = frame_meta->pos_y; + uint32_t dest_module_line = line_number; @@ -79,23 +118,26 @@ void EigerAssembler::assemble_image(const char* src_meta, #ifdef DEBUG_OUTPUT using namespace date; - // if (i_module == 1){ - cout << " [" << std::chrono::system_clock::now(); - cout << "] [MODULE " << i_module; - cout << "] (row " << i_module_row; - cout << " , column)" << i_module_column; - cout << " || reverse_factor" << reverse_factor; - cout << " || line_number" << line_number; - cout << endl; + // if (i_module == 0){ + cout << " [" << std::chrono::system_clock::now(); + cout << "] [MODULE " << i_module; + cout << "] (row " << i_module_row; + cout << " , column" << i_module_column; + cout << ") || reverse_factor" << reverse_factor; + cout << " || line_number" << line_number; + cout << " || N_RECV_PACKETS" << frame_meta->n_recv_packets; + cout << endl; // } #endif + int counter = 0; + for (uint32_t frame_line = 0; frame_line < n_lines_per_frame_; frame_line++) { // void * destination, const void * source, size_t num memcpy ( (char*)(dst_data + dest_offset), - (char*) (src_data + source_offset), + (char*)(src_data + source_offset), n_bytes_per_module_line_ ); @@ -105,10 +147,10 @@ void EigerAssembler::assemble_image(const char* src_meta, // beginning and end of each frame if (counter < 5 || counter > 508){ cout << " [" << std::chrono::system_clock::now(); - cout << "] [MODULE :" << i_module; - cout << "] ROW :" << i_module_row; - cout << "] COLUMN :" << i_module_column; - cout << " || source_offset" << source_offset; + cout << "] [MODULE " << i_module; + cout << "] (row " << i_module_row; + cout << ",column " << i_module_column; + cout << ") source_offset" << source_offset; cout << " || dest_offset " << dest_offset; cout << " || frame_line " << frame_line; cout << " || COUNTER " << counter; @@ -121,5 +163,6 @@ void EigerAssembler::assemble_image(const char* src_meta, } line_number += n_lines_per_frame_; dest_module_line = line_number + n_lines_per_frame_ - 1; + last_image_status_ = image_meta->status; } } diff --git a/jf-assembler/src/ZmqPulseSyncReceiver.cpp b/jf-assembler/src/ZmqPulseSyncReceiver.cpp index 79f122c..efd0f46 100644 --- a/jf-assembler/src/ZmqPulseSyncReceiver.cpp +++ b/jf-assembler/src/ZmqPulseSyncReceiver.cpp @@ -8,6 +8,7 @@ #include #include #include "date.h" +#include "buffer_config.hpp" #include "assembler_config.hpp" diff --git a/jf-assembler/src/main.cpp b/jf-assembler/src/main.cpp index 2d0aacb..9550526 100644 --- a/jf-assembler/src/main.cpp +++ b/jf-assembler/src/main.cpp @@ -1,17 +1,24 @@ #include #include #include +#include "date.h" +#include + +#ifdef USE_EIGER + #include "eiger.hpp" +#else + #include "jungfrau.hpp" +#endif + + #include #include #include -#include - -#include "date.h" #include "EigerAssembler.hpp" #include "assembler_config.hpp" #include "ZmqPulseSyncReceiver.hpp" - +#include "buffer_config.hpp" using namespace std; using namespace buffer_config; @@ -22,12 +29,15 @@ int main (int argc, char *argv[]) if (argc != 3) { cout << endl; #ifndef USE_EIGER - cout << "Usage: jf_assembler [detector_json_filename] [bit_depth]" << endl; + cout << "Usage: jf_assembler [detector_json_filename] " + " [bit_depth]" << endl; #else - cout << "Usage: eiger_assembler [detector_json_filename] [bit_depth]" << endl; + cout << "Usage: eiger_assembler [detector_json_filename] " + " [bit_depth]" << endl; #endif - cout << "\tdetector_json_filename: detector config file path." << endl; - cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; + cout << "\tdetector_json_filename: detector config file path."; + cout << endl; + cout << "\tbit_depth: bit depth of the image."; cout << endl; exit(-1); @@ -41,7 +51,8 @@ int main (int argc, char *argv[]) zmq_ctx_set(ctx, ZMQ_IO_THREADS, ASSEMBLER_ZMQ_IO_THREADS); auto sender = BufferUtils::bind_socket( ctx, config.detector_name, stream_name); - const int n_receivers = config.n_modules * config.n_submodules; + auto receiver_sync = BufferUtils::connect_socket( + ctx, config.detector_name, "sync"); #ifdef DEBUG_OUTPUT using namespace date; @@ -50,11 +61,13 @@ int main (int argc, char *argv[]) cout << " Details of Assembler:"; cout << " detector_name: " << config.detector_name; cout << " || n_modules: " << config.n_modules; - cout << " || n_receivers: " << n_receivers; cout << endl; #endif - EigerAssembler assembler(n_receivers, bit_depth); + const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8; + const size_t N_PACKETS_PER_FRAME = FRAME_N_BYTES / DATA_BYTES_PER_PACKET; + + EigerAssembler assembler(config.n_modules, bit_depth); #ifdef DEBUG_OUTPUT using namespace date; @@ -66,7 +79,7 @@ int main (int argc, char *argv[]) #endif RamBuffer frame_buffer(config.detector_name, - sizeof(ModuleFrame), N_BYTES_PER_MODULE_FRAME(bit_depth), n_receivers, + sizeof(ModuleFrame), FRAME_N_BYTES, config.n_modules, buffer_config::RAM_BUFFER_N_SLOTS); @@ -74,23 +87,34 @@ int main (int argc, char *argv[]) sizeof(ImageMetadata), assembler.get_image_n_bytes(), 1, buffer_config::RAM_BUFFER_N_SLOTS); - ZmqPulseSyncReceiver receiver(ctx, config.detector_name, n_receivers); AssemblerStats stats(config.detector_name, ASSEMBLER_STATS_MODULO); + + uint64_t image_id = 0; + while (true) { - auto pulse_and_sync = receiver.get_next_pulse_id(); + // receives the synced image id + zmq_recv(receiver_sync, &image_id, sizeof(image_id), 0); + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [ASSEMBLER::receiver_sync] image_id: "; + cout << image_id; + cout << endl; + #endif // metadata - auto* src_meta = frame_buffer.get_slot_meta(pulse_and_sync.pulse_id); - auto* src_data = frame_buffer.get_slot_data(pulse_and_sync.pulse_id); + auto* src_meta = frame_buffer.get_slot_meta(image_id); + auto* src_data = frame_buffer.get_slot_data(image_id); // data - auto* dst_meta = image_buffer.get_slot_meta(pulse_and_sync.pulse_id); - auto* dst_data = image_buffer.get_slot_data(pulse_and_sync.pulse_id); + auto* dst_meta = image_buffer.get_slot_meta(image_id); + auto* dst_data = image_buffer.get_slot_data(image_id); // assemble assembler.assemble_image(src_meta, src_data, dst_meta, dst_data); zmq_send(sender, dst_meta, sizeof(ImageMetadata), 0); stats.record_stats( - (ImageMetadata*)dst_meta, pulse_and_sync.n_lost_pulses); + (ImageMetadata*)dst_meta, assembler.get_last_img_status()); } } diff --git a/std-udp-recv/src/FrameUdpReceiver.cpp b/std-udp-recv/src/FrameUdpReceiver.cpp index 92b7a9c..ba11a20 100644 --- a/std-udp-recv/src/FrameUdpReceiver.cpp +++ b/std-udp-recv/src/FrameUdpReceiver.cpp @@ -121,7 +121,7 @@ inline uint64_t FrameUdpReceiver::process_packets( cout << "] [frameudpreceiver::process_packets] :"; cout << " frame " << metadata.frame_index << " || "; cout << packet_buffer_[i_packet].packetnum << " packets received."; - cout << " pulse id "<< metadata.pulse_id; + cout << " id "<< metadata.id; cout << endl; #endif // buffer is loaded only if this is not the last message. diff --git a/std-udp-recv/src/main.cpp b/std-udp-recv/src/main.cpp index c9c1a06..35a8b72 100644 --- a/std-udp-recv/src/main.cpp +++ b/std-udp-recv/src/main.cpp @@ -45,7 +45,8 @@ int main (int argc, char *argv[]) { FrameUdpReceiver receiver(udp_port, N_PACKETS_PER_FRAME); RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), - FRAME_N_BYTES, config.n_modules); + FRAME_N_BYTES, config.n_modules, + buffer_config::RAM_BUFFER_N_SLOTS); FrameStats stats(config.detector_name, module_id, N_PACKETS_PER_FRAME, STATS_TIME); @@ -61,6 +62,7 @@ int main (int argc, char *argv[]) { while (true) { // Reset the metadata and frame buffer for the next frame. meta.frame_index = 0; + meta.n_recv_packets = 0; memset(data, 0, FRAME_N_BYTES); receiver.get_frame_from_udp(meta, data); diff --git a/std-udp-sync/src/main.cpp b/std-udp-sync/src/main.cpp index 1068091..3a1702d 100644 --- a/std-udp-sync/src/main.cpp +++ b/std-udp-sync/src/main.cpp @@ -43,18 +43,9 @@ int main (int argc, char *argv[]) auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "sync"); - #ifdef DEBUG_OUTPUT - using namespace date; - cout << " [" << std::chrono::system_clock::now(); - cout << "] [Assembler] :"; - cout << " Details of Assembler:"; - cout << " detector_name: " << config.detector_name; - cout << " || n_modules: " << config.n_modules; - cout << endl; - #endif - RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), - FRAME_N_BYTES, config.n_modules); + FRAME_N_BYTES, config.n_modules, + buffer_config::RAM_BUFFER_N_SLOTS); ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); SyncStats stats(config.detector_name, SYNC_STATS_MODULO);