diff --git a/CMakeLists.txt b/CMakeLists.txt index 491fb13..659699f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29,6 +29,10 @@ add_subdirectory( EXCLUDE_FROM_ALL) add_subdirectory("core-buffer") -add_subdirectory("sf-buffer") +add_subdirectory("jf-udp-recv") +add_subdirectory("jf-buffer-writer") +add_subdirectory("jf-assembler") add_subdirectory("sf-stream") add_subdirectory("sf-writer") +#add_subdirectory("jf-live-writer") + diff --git a/core-buffer/CMakeLists.txt b/core-buffer/CMakeLists.txt index f8e3b5c..e87a3b2 100644 --- a/core-buffer/CMakeLists.txt +++ b/core-buffer/CMakeLists.txt @@ -5,43 +5,8 @@ file(GLOB SOURCES add_library(core-buffer-lib STATIC ${SOURCES}) target_include_directories(core-buffer-lib PUBLIC include/) - -#if(CMAKE_BUILD_TYPE STREQUAL "Debug") -# target_compile_definitions(core-buffer-lib PRIVATE DEBUG_OUTPUT) -#endif() - -#add_executable(sf-replay src/replay/sf_replay.cpp) -#set_target_properties(sf-replay PROPERTIES OUTPUT_NAME sf_replay) -#target_link_libraries(sf-replay -# core-buffer -# external -# zmq -# hdf5 -# hdf5_hl -# hdf5_cpp -# boost_system -# pthread) -# -#add_executable(sf-writer src/writer/sf_writer.cpp) -#set_target_properties(sf-writer PROPERTIES OUTPUT_NAME sf_writer) -#target_link_libraries(sf-writer -# core-buffer -# external -# zmq -# hdf5 -# hdf5_hl -# hdf5_cpp -# boost_system -# pthread) -# -#add_executable(sf-stream src/stream/sf_stream.cpp) -#set_target_properties(sf-stream PROPERTIES OUTPUT_NAME sf_stream) -#target_link_libraries(sf-stream -# core-buffer -# zmq -# jsoncpp -# boost_system -# pthread) +target_link_libraries(core-buffer-lib + external) enable_testing() add_subdirectory(test/) \ No newline at end of file diff --git a/core-buffer/include/BufferUtils.hpp b/core-buffer/include/BufferUtils.hpp index 37f9f62..a1219ff 100644 --- a/core-buffer/include/BufferUtils.hpp +++ b/core-buffer/include/BufferUtils.hpp @@ -6,18 +6,52 @@ namespace BufferUtils { - std::string get_filename( - std::string detector_folder, - std::string module_name, - uint64_t pulse_id); - std::size_t get_file_frame_index(uint64_t pulse_id); + struct DetectorConfig { + const std::string streamvis_address; + const int reduction_factor_streamvis; + const std::string live_analysis_address; + const int reduction_factor_live_analysis; + const std::string PEDE_FILENAME; + const std::string GAIN_FILENAME; + + const std::string detector_name; + const int n_modules; + const int start_udp_port; + const std::string buffer_folder; + const int image_y_size; + const int image_x_size; + }; + + + std::string get_filename( + const std::string& detector_folder, + const std::string& module_name, + const uint64_t pulse_id); + + std::string get_image_filename( + const std::string& detector_folder, + const uint64_t pulse_id); + + std::size_t get_file_frame_index(const uint64_t pulse_id); void update_latest_file( const std::string& latest_filename, const std::string& filename_to_write); void create_destination_folder(const std::string& output_file); + + void* bind_socket( + void* ctx, + const std::string& detector_name, + const std::string& stream_name); + + void* connect_socket( + void* ctx, + const std::string& detector_name, + const std::string& stream_name); + + DetectorConfig read_json_config(const std::string& filename); } #endif //BUFFER_UTILS_HPP diff --git a/core-buffer/include/RamBuffer.hpp b/core-buffer/include/RamBuffer.hpp new file mode 100644 index 0000000..91872cb --- /dev/null +++ b/core-buffer/include/RamBuffer.hpp @@ -0,0 +1,39 @@ +#ifndef SF_DAQ_BUFFER_RAMBUFFER_HPP +#define SF_DAQ_BUFFER_RAMBUFFER_HPP + +#include +#include "formats.hpp" + +class RamBuffer { + const std::string detector_name_; + const int n_modules_; + const int n_slots_; + + const size_t meta_bytes_; + const size_t image_bytes_; + const size_t buffer_bytes_; + + int shm_fd_; + void* buffer_; + + ModuleFrame* meta_buffer_; + char* image_buffer_; + +public: + RamBuffer(const std::string& detector_name, + const int n_modules, + const int n_slots=buffer_config::RAM_BUFFER_N_SLOTS); + ~RamBuffer(); + + void write_frame(const ModuleFrame &src_meta, const char *src_data) const; + void read_frame(const uint64_t pulse_id, + const uint64_t module_id, + ModuleFrame &meta, + char *data) const; + char* read_image(const uint64_t pulse_id) const; + void assemble_image( + const uint64_t pulse_id, ImageMetadata &image_meta) const; +}; + + +#endif //SF_DAQ_BUFFER_RAMBUFFER_HPP diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index 6f58c22..b2e68ca 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -20,15 +20,17 @@ namespace buffer_config { const size_t FOLDER_MOD = 100000; // Extension of our file format. const std::string FILE_EXTENSION = ".bin"; - // Number of pulses between each statistics print out. - const size_t STATS_MODULO = 100; + // Number of pulses between each statistics print out (buffer_writer, stream2vis...) + const size_t STATS_MODULO = 1000; + // Number of seconds after which statistics is print out (udp_recv) + const size_t STATS_TIME = 10; // If the RB is empty, how much time to wait before trying to read it again. const size_t RB_READ_RETRY_INTERVAL_MS = 5; // How many frames to read at once from file. const size_t BUFFER_BLOCK_SIZE = 100; - const size_t BUFFER_UDP_N_RECV_MSG = 64; + const size_t BUFFER_UDP_N_RECV_MSG = 128; // Size of UDP recv buffer const int BUFFER_UDP_RCVBUF_N_SLOTS = 100; // 8246 bytes for each UDP packet. @@ -38,8 +40,12 @@ namespace buffer_config { const int BUFFER_UDP_US_TIMEOUT = 2 * 1000; // HWM for live stream from buffer. const int BUFFER_ZMQ_SNDHWM = 100; + // HWM for live stream from buffer. + const int BUFFER_ZMQ_RCVHWM = 100; // IPC address of the live stream. const std::string BUFFER_LIVE_IPC_URL = "ipc:///tmp/sf-live-"; + // Number of image slots in ram buffer - 10 seconds should be enough + const int RAM_BUFFER_N_SLOTS = 100 * 10; } #endif //BUFFERCONFIG_HPP diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index a48cb8b..dfa3589 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -15,6 +15,17 @@ struct ModuleFrame { }; #pragma pack(pop) + +#pragma pack(push) +#pragma pack(1) +struct ImageMetadata { + uint64_t pulse_id; + uint64_t frame_index; + uint32_t daq_rec; + uint32_t is_good_image; +}; +#pragma pack(pop) + struct ModuleFrameBuffer { ModuleFrame module[JUNGFRAU_N_MODULES]; }; @@ -23,7 +34,7 @@ struct ModuleFrameBuffer { #pragma pack(1) struct BufferBinaryFormat { const char FORMAT_MARKER = 0xBE; - ModuleFrame metadata; + ModuleFrame meta; char data[buffer_config::MODULE_N_BYTES]; }; #pragma pack(pop) diff --git a/core-buffer/src/BufferUtils.cpp b/core-buffer/src/BufferUtils.cpp index c2a60a4..be85b4b 100644 --- a/core-buffer/src/BufferUtils.cpp +++ b/core-buffer/src/BufferUtils.cpp @@ -2,13 +2,37 @@ #include #include +#include +#include +#include +#include +#include using namespace std; +using namespace buffer_config; + +string BufferUtils::get_image_filename( + const std::string& detector_folder, + const uint64_t pulse_id) +{ + uint64_t data_folder = pulse_id / buffer_config::FOLDER_MOD; + data_folder *= buffer_config::FOLDER_MOD; + + uint64_t data_file = pulse_id / buffer_config::FILE_MOD; + data_file *= buffer_config::FILE_MOD; + + stringstream folder; + folder << detector_folder << "/"; + folder << data_folder << "/"; + folder << data_file << buffer_config::FILE_EXTENSION; + + return folder.str(); +} string BufferUtils::get_filename( - std::string detector_folder, - std::string module_name, - uint64_t pulse_id) + const std::string& detector_folder, + const std::string& module_name, + const uint64_t pulse_id) { uint64_t data_folder = pulse_id / buffer_config::FOLDER_MOD; data_folder *= buffer_config::FOLDER_MOD; @@ -25,7 +49,7 @@ string BufferUtils::get_filename( return folder.str(); } -size_t BufferUtils::get_file_frame_index(uint64_t pulse_id) +size_t BufferUtils::get_file_frame_index(const uint64_t pulse_id) { uint64_t file_base = pulse_id / buffer_config::FILE_MOD; file_base *= buffer_config::FILE_MOD; @@ -60,4 +84,87 @@ void BufferUtils::create_destination_folder(const string& output_file) string create_folder_command("mkdir -p " + output_folder); system(create_folder_command.c_str()); } -} \ No newline at end of file +} + +void* BufferUtils::connect_socket( + void* ctx, const string& detector_name, const string& stream_name) +{ + string ipc_address = BUFFER_LIVE_IPC_URL + + detector_name + "-" + + stream_name; + + void* socket = zmq_socket(ctx, ZMQ_SUB); + if (socket == nullptr) { + throw runtime_error(zmq_strerror(errno)); + } + + int rcvhwm = BUFFER_ZMQ_RCVHWM; + if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + int linger = 0; + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_connect(socket, ipc_address.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + return socket; +} + +void* BufferUtils::bind_socket( + void* ctx, const string& detector_name, const string& stream_name) +{ + string ipc_address = BUFFER_LIVE_IPC_URL + + detector_name + "-" + + stream_name; + + void* socket = zmq_socket(ctx, ZMQ_PUB); + + const int sndhwm = BUFFER_ZMQ_SNDHWM; + if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + const int linger = 0; + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_bind(socket, ipc_address.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + return socket; +} + +BufferUtils::DetectorConfig BufferUtils::read_json_config( + const std::string& filename) +{ + std::ifstream ifs(filename); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + return { + config_parameters["streamvis_stream"].GetString(), + config_parameters["streamvis_rate"].GetInt(), + config_parameters["live_stream"].GetString(), + config_parameters["live_rate"].GetInt(), + config_parameters["pedestal_file"].GetString(), + config_parameters["gain_file"].GetString(), + config_parameters["detector_name"].GetString(), + config_parameters["n_modules"].GetInt(), + config_parameters["start_udp_port"].GetInt(), + config_parameters["buffer_folder"].GetString(), + config_parameters["image_y_size"].GetInt(), + config_parameters["image_x_size"].GetInt() + }; +} diff --git a/core-buffer/src/RamBuffer.cpp b/core-buffer/src/RamBuffer.cpp new file mode 100644 index 0000000..ff14011 --- /dev/null +++ b/core-buffer/src/RamBuffer.cpp @@ -0,0 +1,166 @@ +#include +#include +#include +#include +#include +#include +#include "RamBuffer.hpp" +#include "buffer_config.hpp" + +using namespace std; +using namespace buffer_config; + +RamBuffer::RamBuffer( + const string &detector_name, + const int n_modules, + const int n_slots) : + detector_name_(detector_name), + n_modules_(n_modules), + n_slots_(n_slots), + meta_bytes_(sizeof(ModuleFrame) * n_modules_), + image_bytes_(MODULE_N_BYTES * n_modules_), + buffer_bytes_((meta_bytes_ + image_bytes_) * n_slots_) +{ + shm_fd_ = shm_open(detector_name_.c_str(), O_RDWR | O_CREAT, 0777); + if (shm_fd_ < 0) { + throw runtime_error(strerror(errno)); + } + + if ((ftruncate(shm_fd_, buffer_bytes_)) == -1) { + throw runtime_error(strerror(errno)); + } + + // TODO: Test with MAP_HUGETLB + buffer_ = mmap(NULL, buffer_bytes_, PROT_WRITE, MAP_SHARED, shm_fd_, 0); + if (buffer_ == MAP_FAILED) { + throw runtime_error(strerror(errno)); + } + + // Metadata buffer is located at the start of the memory region. + meta_buffer_ = (ModuleFrame *) buffer_; + // Image buffer start right after metadata buffer. + image_buffer_ = (char*)buffer_ + (meta_bytes_ * n_slots_); +} + +RamBuffer::~RamBuffer() +{ + munmap(buffer_, buffer_bytes_); + close(shm_fd_); + shm_unlink(detector_name_.c_str()); +} + +void RamBuffer::write_frame( + const ModuleFrame& src_meta, + const char *src_data) const +{ + const int slot_n = src_meta.pulse_id % n_slots_; + + ModuleFrame *dst_meta = meta_buffer_ + + (n_modules_ * slot_n) + + src_meta.module_id; + + char *dst_data = image_buffer_ + + (image_bytes_ * slot_n) + + (MODULE_N_BYTES * src_meta.module_id); + + memcpy(dst_meta, &src_meta, sizeof(ModuleFrame)); + memcpy(dst_data, src_data, MODULE_N_BYTES); +} + +void RamBuffer::read_frame( + const uint64_t pulse_id, + const uint64_t module_id, + ModuleFrame& dst_meta, + char* dst_data) const +{ + const size_t slot_n = pulse_id % n_slots_; + + ModuleFrame *src_meta = meta_buffer_ + (n_modules_ * slot_n) + module_id; + + char *src_data = image_buffer_ + + (image_bytes_ * slot_n) + + (MODULE_N_BYTES * module_id); + + memcpy(&dst_meta, src_meta, sizeof(ModuleFrame)); + memcpy(dst_data, src_data, MODULE_N_BYTES); +} + +void RamBuffer::assemble_image( + const uint64_t pulse_id, ImageMetadata &image_meta) const +{ + const size_t slot_n = pulse_id % n_slots_; + ModuleFrame *src_meta = meta_buffer_ + (n_modules_ * slot_n); + + auto is_pulse_init = false; + auto is_good_image = true; + + for (int i_module=0; i_module < n_modules_; i_module++) { + ModuleFrame *frame_meta = src_meta + i_module; + + auto is_good_frame = + frame_meta->n_recv_packets == JF_N_PACKETS_PER_FRAME; + + if (!is_good_frame) { + is_good_image = false; + continue; + } + + if (!is_pulse_init) { + + if (frame_meta->pulse_id != pulse_id) { + stringstream err_msg; + err_msg << "[RamBuffer::read_image]"; + err_msg << " Unexpected pulse_id in ram buffer."; + err_msg << " expected=" << pulse_id; + err_msg << " got=" << frame_meta->pulse_id; + + for (int i = 0; i < n_modules_; i++) { + ModuleFrame *meta = src_meta + i_module; + + err_msg << " (module " << i << ", "; + err_msg << meta->pulse_id << "),"; + } + err_msg << endl; + + throw runtime_error(err_msg.str()); + } + + image_meta.pulse_id = frame_meta->pulse_id; + image_meta.frame_index = frame_meta->frame_index; + image_meta.daq_rec = frame_meta->daq_rec; + + is_pulse_init = 1; + } + + if (is_good_image) { + if (frame_meta->pulse_id != image_meta.pulse_id) { + is_good_image = false; + // TODO: Add some diagnostics in case this happens. + } + + if (frame_meta->frame_index != image_meta.frame_index) { + is_good_image = false; + } + + if (frame_meta->daq_rec != image_meta.daq_rec) { + is_good_image = false; + } + } + } + + image_meta.is_good_image = is_good_image; + + if (!is_pulse_init) { + image_meta.pulse_id = 0; + image_meta.frame_index = 0; + image_meta.daq_rec = 0; + } +} + +char* RamBuffer::read_image(const uint64_t pulse_id) const +{ + const size_t slot_n = pulse_id % n_slots_; + char *src_data = image_buffer_ + (image_bytes_ * slot_n); + + return src_data; +} diff --git a/core-buffer/test/CMakeLists.txt b/core-buffer/test/CMakeLists.txt index f127886..aa1173a 100644 --- a/core-buffer/test/CMakeLists.txt +++ b/core-buffer/test/CMakeLists.txt @@ -3,7 +3,6 @@ add_executable(core-buffer-tests main.cpp) target_link_libraries(core-buffer-tests core-buffer-lib external - hdf5 - hdf5_cpp + rt zmq gtest) diff --git a/core-buffer/test/main.cpp b/core-buffer/test/main.cpp index e379d99..732038f 100644 --- a/core-buffer/test/main.cpp +++ b/core-buffer/test/main.cpp @@ -1,6 +1,7 @@ #include "gtest/gtest.h" #include "test_buffer_utils.cpp" #include "test_bitshuffle.cpp" +#include "test_RamBuffer.cpp" using namespace std; diff --git a/core-buffer/test/test_RamBuffer.cpp b/core-buffer/test/test_RamBuffer.cpp new file mode 100644 index 0000000..d28dd81 --- /dev/null +++ b/core-buffer/test/test_RamBuffer.cpp @@ -0,0 +1,37 @@ +#include "gtest/gtest.h" +#include "RamBuffer.hpp" + +using namespace std; +using namespace buffer_config; + +TEST(RamBuffer, simple_store) +{ + const int n_modules = 3; + RamBuffer buffer("test_detector", n_modules, 10); + + ModuleFrame frame_meta; + frame_meta.pulse_id = 123523; + frame_meta.daq_rec = 1234; + frame_meta.frame_index = 12342300; + frame_meta.n_recv_packets = JF_N_PACKETS_PER_FRAME; + + auto frame_buffer = make_unique(MODULE_N_PIXELS); + + + for (size_t i = 0; i < MODULE_N_PIXELS; i++) { + frame_buffer[i] = i % 100; + } + + for (int i_module=0; i_module +#include +#include + +class AssemblerStats { + const std::string detector_name_; + const size_t stats_modulo_; + + int image_counter_; + int n_corrupted_images_; + int n_sync_lost_images_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + AssemblerStats(const std::string &detector_name, + const size_t stats_modulo); + + void record_stats(const ImageMetadata &meta, const uint32_t n_lost_pulses); +}; + + +#endif //SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP diff --git a/jf-assembler/include/ZmqPulseSyncReceiver.hpp b/jf-assembler/include/ZmqPulseSyncReceiver.hpp new file mode 100644 index 0000000..624de34 --- /dev/null +++ b/jf-assembler/include/ZmqPulseSyncReceiver.hpp @@ -0,0 +1,34 @@ +#ifndef SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP +#define SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP + + +#include +#include +#include + +#include "formats.hpp" + +struct PulseAndSync { + const uint64_t pulse_id; + const uint32_t n_lost_pulses; +}; + +class ZmqPulseSyncReceiver { + + void* ctx_; + const int n_modules_; + + std::vector sockets_; + +public: + ZmqPulseSyncReceiver( + void* ctx, + const std::string& detector_name, + const int n_modules); + ~ZmqPulseSyncReceiver(); + + PulseAndSync get_next_pulse_id() const; +}; + + +#endif //SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP diff --git a/jf-assembler/include/assembler_config.hpp b/jf-assembler/include/assembler_config.hpp new file mode 100644 index 0000000..b0e277d --- /dev/null +++ b/jf-assembler/include/assembler_config.hpp @@ -0,0 +1,14 @@ +namespace assembler_config +{ + // N of IO threads to send image metadata. + const int ASSEMBLER_ZMQ_IO_THREADS = 1; + + // If the modules are offset more than 1000 pulses, crush. + const uint64_t PULSE_OFFSET_LIMIT = 100; + + // Number of times we try to re-sync in case of failure. + const int SYNC_RETRY_LIMIT = 3; + + // Number of pulses between each statistics print out. + const size_t ASSEMBLER_STATS_MODULO = 1000; +} diff --git a/jf-assembler/src/AssemblerStats.cpp b/jf-assembler/src/AssemblerStats.cpp new file mode 100644 index 0000000..e295da6 --- /dev/null +++ b/jf-assembler/src/AssemblerStats.cpp @@ -0,0 +1,62 @@ +#include "AssemblerStats.hpp" + +#include + +using namespace std; +using namespace chrono; + +AssemblerStats::AssemblerStats( + const std::string &detector_name, + const size_t stats_modulo) : + detector_name_(detector_name), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void AssemblerStats::reset_counters() +{ + image_counter_ = 0; + n_sync_lost_images_ = 0; + n_corrupted_images_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void AssemblerStats::record_stats( + const ImageMetadata &meta, const uint32_t n_lost_pulses) +{ + image_counter_++; + n_sync_lost_images_ += n_lost_pulses; + + if (!meta.is_good_image) { + n_corrupted_images_++; + } + + if (image_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void AssemblerStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_assembler"; + cout << ",detector_name=" << detector_name_; + cout << " "; + cout << "n_processed_images=" << image_counter_ << "i"; + cout << ",n_corrupted_images=" << n_corrupted_images_ << "i"; + cout << ",n_sync_lost_images=" << n_sync_lost_images_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} + diff --git a/jf-assembler/src/ZmqPulseSyncReceiver.cpp b/jf-assembler/src/ZmqPulseSyncReceiver.cpp new file mode 100644 index 0000000..6dbe2fc --- /dev/null +++ b/jf-assembler/src/ZmqPulseSyncReceiver.cpp @@ -0,0 +1,115 @@ +#include "ZmqPulseSyncReceiver.hpp" +#include "BufferUtils.hpp" + +#include +#include +#include +#include +#include +#include + +#include "assembler_config.hpp" + +using namespace std; +using namespace chrono; +using namespace buffer_config; +using namespace assembler_config; + + +ZmqPulseSyncReceiver::ZmqPulseSyncReceiver( + void * ctx, + const string& detector_name, + const int n_modules) : + ctx_(ctx), + n_modules_(n_modules) +{ + sockets_.reserve(n_modules_); + + for (int i=0; i::max();; + uint64_t max_pulse_id = 0; + + for (int i = 0; i < n_modules_; i++) { + min_pulse_id = min(min_pulse_id, pulses[i]); + max_pulse_id = max(max_pulse_id, pulses[i]); + } + + auto max_diff = max_pulse_id - min_pulse_id; + if (max_diff > PULSE_OFFSET_LIMIT) { + stringstream err_msg; + err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; + err_msg << " PULSE_OFFSET_LIMIT exceeded."; + err_msg << " max_diff=" << max_diff << " pulses."; + + for (int i = 0; i < n_modules_; i++) { + err_msg << " (module " << i << ", "; + err_msg << pulses[i] << "),"; + } + err_msg << endl; + + throw runtime_error(err_msg.str()); + } + + modules_in_sync = true; + // Max pulses we lost in this sync attempt. + uint32_t i_sync_lost_pulses = 0; + for (int i = 0; i < n_modules_; i++) { + // How many pulses we lost for this specific module. + uint32_t i_module_lost_pulses = 0; + while (pulses[i] < max_pulse_id) { + zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); + i_module_lost_pulses++; + } + + i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); + + if (pulses[i] != max_pulse_id) { + modules_in_sync = false; + } + } + n_lost_pulses += i_sync_lost_pulses; + + if (modules_in_sync) { + return {pulses[0], n_lost_pulses}; + } + } + + stringstream err_msg; + err_msg << "[ZmqLiveReceiver::get_next_pulse_id]"; + err_msg << " SYNC_RETRY_LIMIT exceeded."; + err_msg << endl; + + throw runtime_error(err_msg.str()); +} diff --git a/jf-assembler/src/main.cpp b/jf-assembler/src/main.cpp new file mode 100644 index 0000000..e1b76ab --- /dev/null +++ b/jf-assembler/src/main.cpp @@ -0,0 +1,47 @@ +#include +#include +#include +#include +#include +#include + +#include "assembler_config.hpp" +#include "ZmqPulseSyncReceiver.hpp" + +using namespace std; +using namespace buffer_config; +using namespace assembler_config; + +int main (int argc, char *argv[]) +{ + if (argc != 2) { + cout << endl; + cout << "Usage: jf_assembler [detector_json_filename]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << endl; + + exit(-1); + } + + auto config = BufferUtils::read_json_config(string(argv[1])); + auto const stream_name = "assembler"; + + auto ctx = zmq_ctx_new(); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, ASSEMBLER_ZMQ_IO_THREADS); + auto sender = BufferUtils::bind_socket( + ctx, config.detector_name, stream_name); + + ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); + RamBuffer ram_buffer(config.detector_name, config.n_modules); + AssemblerStats stats(config.detector_name, ASSEMBLER_STATS_MODULO); + + ImageMetadata meta; + while (true) { + auto pulse_and_sync = receiver.get_next_pulse_id(); + ram_buffer.assemble_image(pulse_and_sync.pulse_id, meta); + + zmq_send(sender, &meta, sizeof(meta), 0); + + stats.record_stats(meta, pulse_and_sync.n_lost_pulses); + } +} diff --git a/jf-assembler/test/CMakeLists.txt b/jf-assembler/test/CMakeLists.txt new file mode 100644 index 0000000..bb240e7 --- /dev/null +++ b/jf-assembler/test/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(jf-assembler-tests main.cpp) + +target_link_libraries(jf-assembler-tests + jf-assembler-lib + gtest + ) + diff --git a/jf-assembler/test/main.cpp b/jf-assembler/test/main.cpp new file mode 100644 index 0000000..e819294 --- /dev/null +++ b/jf-assembler/test/main.cpp @@ -0,0 +1,8 @@ +#include "gtest/gtest.h" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/jf-buffer-writer/CMakeLists.txt b/jf-buffer-writer/CMakeLists.txt new file mode 100644 index 0000000..a389502 --- /dev/null +++ b/jf-buffer-writer/CMakeLists.txt @@ -0,0 +1,18 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(jf-buffer-writer-lib STATIC ${SOURCES}) +target_include_directories(jf-buffer-writer-lib PUBLIC include/) +target_link_libraries(jf-buffer-writer-lib + external + core-buffer-lib) + +add_executable(jf-buffer-writer src/main.cpp) +set_target_properties(jf-buffer-writer PROPERTIES OUTPUT_NAME jf_buffer_writer) +target_link_libraries(jf-buffer-writer + jf-buffer-writer-lib + zmq + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/sf-buffer/include/BufferBinaryWriter.hpp b/jf-buffer-writer/include/BufferBinaryWriter.hpp similarity index 100% rename from sf-buffer/include/BufferBinaryWriter.hpp rename to jf-buffer-writer/include/BufferBinaryWriter.hpp diff --git a/jf-buffer-writer/include/BufferStats.hpp b/jf-buffer-writer/include/BufferStats.hpp new file mode 100644 index 0000000..3aff6cb --- /dev/null +++ b/jf-buffer-writer/include/BufferStats.hpp @@ -0,0 +1,32 @@ +#include +#include +#include + +#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_FRAMESTATS_HPP + + +class BufferStats { + const std::string detector_name_; + const int module_id_; + size_t stats_modulo_; + + int frames_counter_; + uint32_t total_buffer_write_us_; + uint32_t max_buffer_write_us_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + BufferStats( + const std::string &detector_name, + const int module_id, + const size_t stats_modulo); + void start_frame_write(); + void end_frame_write(); +}; + + +#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/sf-buffer/src/BufferBinaryWriter.cpp b/jf-buffer-writer/src/BufferBinaryWriter.cpp similarity index 99% rename from sf-buffer/src/BufferBinaryWriter.cpp rename to jf-buffer-writer/src/BufferBinaryWriter.cpp index f892704..78eef20 100644 --- a/sf-buffer/src/BufferBinaryWriter.cpp +++ b/jf-buffer-writer/src/BufferBinaryWriter.cpp @@ -29,7 +29,7 @@ BufferBinaryWriter::~BufferBinaryWriter() } void BufferBinaryWriter::write( - uint64_t pulse_id, + const uint64_t pulse_id, const BufferBinaryFormat* buffer) { auto current_frame_file = diff --git a/jf-buffer-writer/src/BufferStats.cpp b/jf-buffer-writer/src/BufferStats.cpp new file mode 100644 index 0000000..173a35c --- /dev/null +++ b/jf-buffer-writer/src/BufferStats.cpp @@ -0,0 +1,63 @@ +#include +#include "BufferStats.hpp" + +using namespace std; +using namespace chrono; + +BufferStats::BufferStats( + const string& detector_name, + const int module_id, + const size_t stats_modulo) : + detector_name_(detector_name), + module_id_(module_id), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void BufferStats::reset_counters() +{ + frames_counter_ = 0; + total_buffer_write_us_ = 0; + max_buffer_write_us_ = 0; +} + +void BufferStats::start_frame_write() +{ + stats_interval_start_ = steady_clock::now(); +} + +void BufferStats::end_frame_write() +{ + frames_counter_++; + + uint32_t write_us_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + total_buffer_write_us_ += write_us_duration; + max_buffer_write_us_ = max(max_buffer_write_us_, write_us_duration); + + if (frames_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void BufferStats::print_stats() +{ + float avg_buffer_write_us = total_buffer_write_us_ / frames_counter_; + + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_buffer_writer"; + cout << ",detector_name=" << detector_name_; + cout << ",module_name=M" << module_id_; + cout << " "; + cout << "avg_buffer_write_us=" << avg_buffer_write_us; + cout << ",max_buffer_write_us=" << max_buffer_write_us_ << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} diff --git a/jf-buffer-writer/src/main.cpp b/jf-buffer-writer/src/main.cpp new file mode 100644 index 0000000..cd65986 --- /dev/null +++ b/jf-buffer-writer/src/main.cpp @@ -0,0 +1,61 @@ +#include +#include +#include +#include +#include + +#include "formats.hpp" +#include "BufferUtils.hpp" +#include "buffer_config.hpp" +#include "jungfrau.hpp" +#include "BufferBinaryWriter.hpp" + +using namespace std; +using namespace buffer_config; +using namespace BufferUtils; + +int main (int argc, char *argv[]) { + + if (argc != 3) { + cout << endl; + cout << "Usage: jf_buffer_writer [detector_json_filename] [module_id]"; + cout << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tmodule_id: id of the module for this process." << endl; + cout << endl; + + exit(-1); + } + + const auto config = read_json_config(string(argv[1])); + const int module_id = atoi(argv[2]); + + // TODO: Remove this also in reader. + const string module_prefix = (module_id < 10) ? "0" : ""; + const auto module_name = "M" + module_prefix + to_string(module_id); + + BufferBinaryWriter writer(config.buffer_folder, module_name); + RamBuffer ram_buff(config.detector_name, config.n_modules); + BufferStats stats(config.detector_name, module_id, STATS_MODULO); + + auto ctx = zmq_ctx_new(); + auto socket = connect_socket( + ctx, config.detector_name, to_string(module_id)); + + auto file_buff = new BufferBinaryFormat(); + uint64_t pulse_id; + + while (true) { + zmq_recv(socket, &pulse_id, sizeof(pulse_id), 0); + + stats.start_frame_write(); + + // TODO: Memory copy here. Optimize this one out. + ram_buff.read_frame( + pulse_id, module_id, file_buff->meta, file_buff->data); + + writer.write(pulse_id, file_buff); + + stats.end_frame_write(); + } +} diff --git a/jf-buffer-writer/test/CMakeLists.txt b/jf-buffer-writer/test/CMakeLists.txt new file mode 100644 index 0000000..4e32fd8 --- /dev/null +++ b/jf-buffer-writer/test/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(jf-buffer-writer-tests main.cpp) + +target_link_libraries(jf-buffer-writer-tests + jf-buffer-writer-lib + zmq + gtest + ) diff --git a/jf-buffer-writer/test/main.cpp b/jf-buffer-writer/test/main.cpp new file mode 100644 index 0000000..1c656b8 --- /dev/null +++ b/jf-buffer-writer/test/main.cpp @@ -0,0 +1,10 @@ +#include "gtest/gtest.h" +#include "test_BufferBinaryWriter.cpp" + + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/sf-buffer/test/test_BufferBinaryWriter.cpp b/jf-buffer-writer/test/test_BufferBinaryWriter.cpp similarity index 76% rename from sf-buffer/test/test_BufferBinaryWriter.cpp rename to jf-buffer-writer/test/test_BufferBinaryWriter.cpp index 5076c34..92e5d92 100644 --- a/sf-buffer/test/test_BufferBinaryWriter.cpp +++ b/jf-buffer-writer/test/test_BufferBinaryWriter.cpp @@ -12,10 +12,10 @@ TEST(BinaryWriter, basic_interaction) BufferBinaryWriter writer(module_name, detector_folder); BufferBinaryFormat frame_data; - frame_data.metadata.pulse_id = 1; - frame_data.metadata.frame_index = 2; - frame_data.metadata.daq_rec = 3; - frame_data.metadata.n_recv_packets = 4; + frame_data.meta.pulse_id = 1; + frame_data.meta.frame_index = 2; + frame_data.meta.daq_rec = 3; + frame_data.meta.n_recv_packets = 4; writer.write(5, &frame_data); @@ -33,11 +33,11 @@ TEST(BinaryWriter, basic_interaction) ::read(read_fd, &read_data, sizeof(BufferBinaryFormat)); ASSERT_EQ(frame_data.FORMAT_MARKER, read_data.FORMAT_MARKER); - ASSERT_EQ(frame_data.metadata.pulse_id, read_data.metadata.pulse_id); - ASSERT_EQ(frame_data.metadata.frame_index, read_data.metadata.frame_index); - ASSERT_EQ(frame_data.metadata.daq_rec, read_data.metadata.daq_rec); - ASSERT_EQ(frame_data.metadata.n_recv_packets, - read_data.metadata.n_recv_packets); + ASSERT_EQ(frame_data.meta.pulse_id, read_data.meta.pulse_id); + ASSERT_EQ(frame_data.meta.frame_index, read_data.meta.frame_index); + ASSERT_EQ(frame_data.meta.daq_rec, read_data.meta.daq_rec); + ASSERT_EQ(frame_data.meta.n_recv_packets, + read_data.meta.n_recv_packets); } TEST(BinaryWriter, test_format_marker) @@ -49,10 +49,10 @@ TEST(BinaryWriter, test_format_marker) BufferBinaryWriter writer(module_name, detector_folder); BufferBinaryFormat frame_data; - frame_data.metadata.pulse_id = 1; - frame_data.metadata.frame_index = 2; - frame_data.metadata.daq_rec = 3; - frame_data.metadata.n_recv_packets = 4; + frame_data.meta.pulse_id = 1; + frame_data.meta.frame_index = 2; + frame_data.meta.daq_rec = 3; + frame_data.meta.n_recv_packets = 4; writer.write(5, &frame_data); diff --git a/jf-live-writer/CMakeLists.txt b/jf-live-writer/CMakeLists.txt new file mode 100644 index 0000000..b6f1045 --- /dev/null +++ b/jf-live-writer/CMakeLists.txt @@ -0,0 +1,31 @@ +find_package(MPI REQUIRED) +# Because of openmpi. +add_definitions(-DOMPI_SKIP_MPICXX) + +file(GLOB SOURCES + src/*.cpp) + +add_library(jf-live-writer-lib STATIC ${SOURCES}) +target_include_directories(jf-live-writer-lib + PUBLIC include/ + SYSTEM ${MPI_INCLUDE_PATH}) + +target_link_libraries(jf-live-writer-lib + external + core-buffer-lib + ${MPI_LIBRARIES} + ) + +add_executable(jf-live-writer src/main.cpp) +set_target_properties(jf-live-writer PROPERTIES OUTPUT_NAME jf_live_writer) +target_link_libraries(jf-live-writer + jf-live-writer-lib + zmq + hdf5 + hdf5_hl + hdf5_cpp + rt + ) + +enable_testing() +add_subdirectory(test/) \ No newline at end of file diff --git a/jf-live-writer/README.md b/jf-live-writer/README.md new file mode 100644 index 0000000..ce4349c --- /dev/null +++ b/jf-live-writer/README.md @@ -0,0 +1,13 @@ +# jf-live-writer + +## Install PHDF5 manually +``` +wget https://support.hdfgroup.org/ftp/HDF5/releases/hdf5-1.12/hdf5-1.12.0/src/hdf5-1.12.0.tar.gz +tar -xzf hdf5-1.12.0.tar.gz +cd hdf5-1.10.7 +./configure --enable-parallel +make install +sudo ln -v -s `pwd`/hdf5/lib/* /usr/lib64/ +sudo ln -v -s `pwd`/hdf5/include/* /usr/include/ +``` + diff --git a/jf-live-writer/include/JFH5Writer.hpp b/jf-live-writer/include/JFH5Writer.hpp new file mode 100644 index 0000000..3a681f2 --- /dev/null +++ b/jf-live-writer/include/JFH5Writer.hpp @@ -0,0 +1,58 @@ +#ifndef SFWRITER_HPP +#define SFWRITER_HPP + +#include +#include +#include +#include + +extern "C" { + #include +} + +class JFH5Writer { + + const std::string root_folder_; + const std::string detector_name_; + + static const int64_t NO_RUN_ID; + + // Run specific variables. + int64_t current_run_id_ = NO_RUN_ID; + uint32_t image_y_size_ = 0; + uint32_t image_x_size_ = 0; + uint32_t bits_per_pixel_ = 0; + + // Open file specific variables. + hid_t file_id_ = -1; + hid_t image_dataset_id_ = -1; + hid_t pulse_dataset_id_= -1; + hid_t frame_dataset_id_ = -1; + hid_t daq_rec_dataset_id_ = -1; + hid_t is_good_dataset_id_ = -1; + + hid_t get_datatype(const int bits_per_pixel); + void open_file(const std::string& output_file, const uint32_t n_images); + void close_file(); + +public: + JFH5Writer(const BufferUtils::DetectorConfig config); + ~JFH5Writer(); + + void open_run(const int64_t run_id, + const uint32_t n_images, + const uint32_t image_y_size, + const uint32_t image_x_size, + const uint32_t bits_per_pixel); + void close_run(); + + void write_data(const int64_t run_id, + const uint32_t index, + const char* data); + + void write_meta(const int64_t run_id, + const uint32_t index, + const ImageMetadata& meta); +}; + +#endif //SFWRITER_HPP diff --git a/jf-live-writer/include/WriterStats.hpp b/jf-live-writer/include/WriterStats.hpp new file mode 100644 index 0000000..775a46f --- /dev/null +++ b/jf-live-writer/include/WriterStats.hpp @@ -0,0 +1,32 @@ +#include +#include +#include + +#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_FRAMESTATS_HPP + + +class WriterStats { + const std::string detector_name_; + const size_t stats_modulo_; + const size_t image_n_bytes_; + + int image_counter_; + uint32_t total_buffer_write_us_; + uint32_t max_buffer_write_us_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + WriterStats( + const std::string &detector_name, + const size_t stats_modulo, + const size_t image_n_bytes); + void start_image_write(); + void end_image_write(); +}; + + +#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jf-live-writer/include/broker_format.hpp b/jf-live-writer/include/broker_format.hpp new file mode 100644 index 0000000..ecdca44 --- /dev/null +++ b/jf-live-writer/include/broker_format.hpp @@ -0,0 +1,23 @@ +#ifndef SF_DAQ_BUFFER_BROKER_FORMAT_HPP +#define SF_DAQ_BUFFER_BROKER_FORMAT_HPP + +#include "formats.hpp" + +const static uint8_t OP_START = 1; +const static uint8_t OP_END = 2; + +#pragma pack(push) +#pragma pack(1) +struct StoreStream { + int64_t run_id; + uint32_t i_image; + uint32_t n_images; + uint32_t image_y_size; + uint32_t image_x_size; + uint32_t op_code; + uint32_t bits_per_pixel; + + ImageMetadata image_metadata; +}; +#pragma pack(pop) +#endif //SF_DAQ_BUFFER_BROKER_FORMAT_HPP diff --git a/jf-live-writer/include/live_writer_config.hpp b/jf-live-writer/include/live_writer_config.hpp new file mode 100644 index 0000000..d47c531 --- /dev/null +++ b/jf-live-writer/include/live_writer_config.hpp @@ -0,0 +1,9 @@ +#include + +namespace live_writer_config +{ + // N of IO threads to receive data from modules. + const int LIVE_ZMQ_IO_THREADS = 1; + + const std::string OUTPUT_FOLDER_SYMLINK = "OUTPUT/" +} \ No newline at end of file diff --git a/jf-live-writer/src/JFH5Writer.cpp b/jf-live-writer/src/JFH5Writer.cpp new file mode 100644 index 0000000..a9d2a52 --- /dev/null +++ b/jf-live-writer/src/JFH5Writer.cpp @@ -0,0 +1,234 @@ +#include "JFH5Writer.hpp" + +#include +#include +#include + + +#include "live_writer_config.hpp" +#include "buffer_config.hpp" +#include "formats.hpp" + +extern "C" +{ + #include "H5DOpublic.h" + #include +} + +using namespace std; +using namespace buffer_config; +using namespace live_writer_config; + +JFH5Writer::JFH5Writer(const BufferUtils::DetectorConfig config): + root_folder_(config.buffer_folder), + detector_name_(config.detector_name), +{ +} + +JFH5Writer::~JFH5Writer() +{ + close_file(); +} + +void JFH5Writer::open_run(const int64_t run_id, + const uint32_t n_images, + const uint32_t image_y_size, + const uint32_t image_x_size, + const uint32_t bits_per_pixel) +{ + close_run(); + + const string output_folder = root_folder_ + "/" + OUTPUT_FOLDER_SYMLINK; + // TODO: Maybe add leading zeros to filename? + const string output_file = output_folder + to_string(run_id) + ".h5"; + + current_run_id_ = run_id; + image_y_size_ = image_y_size; + image_x_size_ = image_x_size; + bits_per_pixel_ = bits_per_pixel; + + open_file(output_file, n_images); +} + +void JFH5Writer::close_run() +{ + close_file(); + + current_run_id_ = NO_RUN_ID; + image_y_size_ = 0; + image_x_size_ = 0; + bits_per_pixel_ = 0; +} + +void JFH5Writer::open_file(const string& output_file, const uint32_t n_images) +{ + // Create file + auto fcpl_id = H5Pcreate(H5P_FILE_ACCESS); + if (fcpl_id == -1) { + throw runtime_error("Error in file access property list."); + } + + if (H5Pset_fapl_mpio(fcpl_id, MPI_COMM_WORLD, MPI_INFO_NULL) < 0) { + throw runtime_error("Cannot set mpio to property list."); + } + + file_id_ = H5Fcreate( + output_file.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, fcpl_id); + if (file_id_ < 0) { + throw runtime_error("Cannot create output file."); + } + + H5Pclose(fcpl_id); + + // Create group + auto data_group_id = H5Gcreate(file_id_, detector_name_.c_str(), + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); + if (data_group_id < 0) { + throw runtime_error("Cannot create data group."); + } + + // Create image dataset. + auto dcpl_id = H5Pcreate(H5P_DATASET_CREATE); + if (dcpl_id < 0) { + throw runtime_error("Error in creating dataset create property list."); + } + + hsize_t image_dataset_chunking[] = {1, image_y_size_, image_x_size_}; + if (H5Pset_chunk(dcpl_id, 3, image_dataset_chunking) < 0) { + throw runtime_error("Cannot set image dataset chunking."); + } + + if (H5Pset_fill_time(dcpl_id, H5D_FILL_TIME_NEVER) < 0) { + throw runtime_error("Cannot set image dataset fill time."); + } + + if (H5Pset_alloc_time(dcpl_id, H5D_ALLOC_TIME_EARLY) < 0) { + throw runtime_error("Cannot set image dataset allocation time."); + } + + hsize_t image_dataset_dims[] = {n_images, image_y_size_, image_x_size_}; + auto image_space_id = H5Screate_simple(3, image_dataset_dims, NULL); + if (image_space_id < 0) { + throw runtime_error("Cannot create image dataset space."); + } + + // TODO: Enable compression. +// bshuf_register_h5filter(); +// uint filter_prop[] = {PIXEL_N_BYTES, BSHUF_H5_COMPRESS_LZ4}; +// if (H5Pset_filter(dcpl_id, BSHUF_H5FILTER, H5Z_FLAG_MANDATORY, +// 2, filter_prop) < 0) { +// throw runtime_error("Cannot set compression filter on dataset."); +// } + + image_dataset_id_ = H5Dcreate( + data_group_id, "data", get_datatype(bits_per_pixel_), + image_space_id, H5P_DEFAULT, dcpl_id, H5P_DEFAULT); + if (image_dataset_id_ < 0) { + throw runtime_error("Cannot create image dataset."); + } + + // Create metadata datasets. + hsize_t meta_dataset_dims[] = {n_images}; + auto meta_space_id = H5Screate_simple(1, meta_dataset_dims, NULL); + if (meta_space_id < 0) { + throw runtime_error("Cannot create meta dataset space."); + } + + auto create_meta_dataset = [&](string name, hid_t data_type) { + auto dataset_id = H5Dcreate( + data_group_id, name.c_str(), data_type, meta_space_id, + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); + if (dataset_id < 0) { + throw runtime_error("Cannot create " + name + " dataset."); + } + + return dataset_id; + }; + + pulse_dataset_id_ = create_meta_dataset("pulse_id", H5T_NATIVE_UINT64); + frame_dataset_id_ = create_meta_dataset("frame_index", H5T_NATIVE_UINT64); + daq_rec_dataset_id_ = create_meta_dataset("daq_rec", H5T_NATIVE_UINT32); + is_good_dataset_id_ = create_meta_dataset("is_good_frame", H5T_NATIVE_UINT8); + + H5Sclose(meta_space_id); + H5Sclose(image_space_id); + H5Pclose(dcpl_id); + H5Gclose(data_group_id); +} + +void JFH5Writer::close_file() +{ + if (file_id_ < 0) { + return; + } + + H5Dclose(image_dataset_id_); + image_dataset_id_ = -1; + + H5Dclose(pulse_dataset_id_); + pulse_dataset_id_ = -1; + + H5Dclose(frame_dataset_id_); + frame_dataset_id_ = -1; + + H5Dclose(daq_rec_dataset_id_); + daq_rec_dataset_id_ = -1; + + H5Dclose(is_good_dataset_id_); + is_good_dataset_id_ = -1; + + H5Fclose(file_id_); + file_id_ = -1; +} + +void JFH5Writer::write_data( + const int64_t run_id, const uint32_t index, const char* data) +{ + if (run_id != current_run_id_) { + throw runtime_error("Invalid run_id."); + } + +// hsize_t b_i_dims[3] = {BUFFER_BLOCK_SIZE, +// MODULE_Y_SIZE * n_modules_, +// MODULE_X_SIZE}; +// H5::DataSpace b_i_space(3, b_i_dims); +// hsize_t b_i_count[] = {n_images_to_copy, +// MODULE_Y_SIZE * n_modules_, +// MODULE_X_SIZE}; +// hsize_t b_i_start[] = {n_images_offset, 0, 0}; +// b_i_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); +// +// hsize_t f_i_dims[3] = {n_images_, +// MODULE_Y_SIZE * n_modules_, +// MODULE_X_SIZE}; +// H5::DataSpace f_i_space(3, f_i_dims); +// hsize_t f_i_count[] = {n_images_to_copy, +// MODULE_Y_SIZE * n_modules_, +// MODULE_X_SIZE}; +// hsize_t f_i_start[] = {data_write_index_, 0, 0}; +// f_i_space.selectHyperslab(H5S_SELECT_SET, f_i_count, f_i_start); +// +// image_dataset_.write( +// data, H5::PredType::NATIVE_UINT16, b_i_space, f_i_space); + + hsize_t offset[] = {data_write_index_, 0, 0}; + size_t data_offset = i_image * MODULE_N_BYTES * n_modules_; + + H5DOwrite_chunk( + image_dataset_.getId(), + H5P_DEFAULT, + 0, + offset, + MODULE_N_BYTES * n_modules_, + data + data_offset); +} + +void JFH5Writer::write_meta( + const int64_t run_id, const uint32_t index, const ImageMetadata& meta) +{ + if (run_id != current_run_id_) { + throw runtime_error("Invalid run_id."); + } + + +} diff --git a/jf-live-writer/src/WriterStats.cpp b/jf-live-writer/src/WriterStats.cpp new file mode 100644 index 0000000..51a0922 --- /dev/null +++ b/jf-live-writer/src/WriterStats.cpp @@ -0,0 +1,69 @@ +#include +#include "WriterStats.hpp" + +using namespace std; +using namespace chrono; + +WriterStats::WriterStats( + const string& detector_name, + const size_t stats_modulo, + const size_t image_n_bytes) : + detector_name_(detector_name), + stats_modulo_(stats_modulo), + image_n_bytes_(image_n_bytes) +{ + reset_counters(); +} + +void WriterStats::reset_counters() +{ + image_counter_ = 0; + total_buffer_write_us_ = 0; + max_buffer_write_us_ = 0; +} + +void WriterStats::start_image_write() +{ + stats_interval_start_ = steady_clock::now(); +} + +void WriterStats::end_image_write() +{ + image_counter_++; + + uint32_t write_us_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + total_buffer_write_us_ += write_us_duration; + max_buffer_write_us_ = max(max_buffer_write_us_, write_us_duration); + + if (image_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void WriterStats::print_stats() +{ + const float avg_buffer_write_us = total_buffer_write_us_ / image_counter_; + + const uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + const uint64_t avg_throughput = + // bytes -> megabytes + (image_n_bytes_ / 1024 / 1024) / + // micro seconds -> seconds + (avg_buffer_write_us * 1000 * 1000); + + // Output in InfluxDB line protocol + cout << "jf_buffer_writer"; + cout << ",detector_name=" << detector_name_; + cout << " "; + cout << "n_written_images=" << image_counter_ << "i"; + cout << " ,avg_buffer_write_us=" << avg_buffer_write_us; + cout << " ,max_buffer_write_us=" << max_buffer_write_us_ << "i"; + cout << " ,avg_throughput=" << avg_throughput; + cout << timestamp; + cout << endl; +} diff --git a/jf-live-writer/src/main.cpp b/jf-live-writer/src/main.cpp new file mode 100644 index 0000000..4d77d29 --- /dev/null +++ b/jf-live-writer/src/main.cpp @@ -0,0 +1,88 @@ +#include +#include +#include +#include +#include +#include "live_writer_config.hpp" +#include "WriterStats.hpp" +#include "broker_format.hpp" +#include +#include + + +using namespace std; +using namespace buffer_config; +using namespace live_writer_config; + +int main (int argc, char *argv[]) +{ + if (argc != 3) { + cout << endl; + cout << "Usage: jf_live_writer [detector_json_filename]" + " [bits_per_pixel]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tbits_per_pixel: Number of bits in each pixel." << endl; + cout << endl; + + exit(-1); + } + + auto const config = BufferUtils::read_json_config(string(argv[1])); + auto const bits_per_pixel = atoi(argv[2]); + + MPI_Init(NULL, NULL); + + int n_writers; + MPI_Comm_size(MPI_COMM_WORLD, &n_writers); + + int i_writer; + MPI_Comm_size(MPI_COMM_WORLD, &i_writer); + + auto ctx = zmq_ctx_new(); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, LIVE_ZMQ_IO_THREADS); + auto receiver = BufferUtils::connect_socket( + ctx, config.detector_name, "broker-agent"); + + RamBuffer ram_buffer(config.detector_name, config.n_modules); + + const uint64_t image_n_bytes = + config.image_y_size * config.image_x_size * bits_per_pixel; + + JFH5Writer writer(config); + WriterStats stats(config.detector_name, STATS_MODULO, image_n_bytes); + + StoreStream meta = {}; + while (true) { + zmq_recv(receiver, &meta, sizeof(meta), 0); + + if (meta.op_code == OP_START) { + writer.open_run(meta.run_id, + meta.n_images, + meta.image_y_size, + meta.image_x_size, + meta.bits_per_pixel); + continue; + } + + if (meta.op_code == OP_END) { + writer.close_run(); + continue; + } + + // Fair distribution of images among writers. + if (meta.i_image % n_writers == i_writer) { + char* data = ram_buffer.read_image(meta.image_metadata.pulse_id); + + stats.start_image_write(); + writer.write_data(meta.run_id, meta.i_image, data); + stats.end_image_write(); + } + + // Only the first instance writes metadata. + if (i_writer == 0) { + writer.write_meta(meta.run_id, meta.i_image, meta.image_metadata); + } + } + + MPI_Finalize(); +} diff --git a/jf-live-writer/test/CMakeLists.txt b/jf-live-writer/test/CMakeLists.txt new file mode 100644 index 0000000..8f806b0 --- /dev/null +++ b/jf-live-writer/test/CMakeLists.txt @@ -0,0 +1,8 @@ +add_executable(jf-live-writer-tests main.cpp) + +target_link_libraries(jf-live-writer-tests + jf-live-writer-lib + zmq + rt + gtest + ) diff --git a/jf-live-writer/test/main.cpp b/jf-live-writer/test/main.cpp new file mode 100644 index 0000000..e819294 --- /dev/null +++ b/jf-live-writer/test/main.cpp @@ -0,0 +1,8 @@ +#include "gtest/gtest.h" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/jf-udp-recv/CMakeLists.txt b/jf-udp-recv/CMakeLists.txt new file mode 100644 index 0000000..3c83127 --- /dev/null +++ b/jf-udp-recv/CMakeLists.txt @@ -0,0 +1,18 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(jf-udp-recv-lib STATIC ${SOURCES}) +target_include_directories(jf-udp-recv-lib PUBLIC include/) +target_link_libraries(jf-udp-recv-lib + external + core-buffer-lib) + +add_executable(jf-udp-recv src/main.cpp) +set_target_properties(jf-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv) +target_link_libraries(jf-udp-recv + jf-udp-recv-lib + zmq + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/sf-buffer/README.md b/jf-udp-recv/README.md similarity index 97% rename from sf-buffer/README.md rename to jf-udp-recv/README.md index a738087..504281f 100644 --- a/sf-buffer/README.md +++ b/jf-udp-recv/README.md @@ -63,7 +63,7 @@ struct ModuleFrame { #pragma pack(1) struct BufferBinaryFormat { const char FORMAT_MARKER = 0xBE; - ModuleFrame metadata; + ModuleFrame meta; char data[buffer_config::MODULE_N_BYTES]; }; #pragma pack(pop) @@ -74,7 +74,7 @@ struct BufferBinaryFormat { Each frame is composed by: - **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame. -- **ModuleFrame** - frame metadata used in image assembly phase. +- **ModuleFrame** - frame meta used in image assembly phase. - **Data** - assembled frame from a single module. Frames are written one after another to a specific offset in the file. The @@ -139,7 +139,7 @@ and the received data. - VerifyH5DataConsistency.py checks the consistency between the H5 file and buffer. -- BinaryBufferReader.py reads the buffer and prints metadata. The class inside +- BinaryBufferReader.py reads the buffer and prints meta. The class inside can also be used in external scripts. ### ZMQ sending diff --git a/jf-udp-recv/include/FrameStats.hpp b/jf-udp-recv/include/FrameStats.hpp new file mode 100644 index 0000000..7839a38 --- /dev/null +++ b/jf-udp-recv/include/FrameStats.hpp @@ -0,0 +1,31 @@ +#include +#include +#include + +#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_FRAMESTATS_HPP + + +class FrameStats { + const std::string detector_name_; + const int module_id_; + size_t stats_time_; + + int frames_counter_; + int n_missed_packets_; + int n_corrupted_frames_; + int n_corrupted_pulse_id_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + FrameStats(const std::string &detector_name, + const int module_id, + const size_t stats_time); + void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); +}; + + +#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/sf-buffer/include/FrameUdpReceiver.hpp b/jf-udp-recv/include/FrameUdpReceiver.hpp similarity index 92% rename from sf-buffer/include/FrameUdpReceiver.hpp rename to jf-udp-recv/include/FrameUdpReceiver.hpp index 6dece49..eea4d46 100644 --- a/sf-buffer/include/FrameUdpReceiver.hpp +++ b/jf-udp-recv/include/FrameUdpReceiver.hpp @@ -7,7 +7,7 @@ #include "buffer_config.hpp" class FrameUdpReceiver { - const int source_id_; + const int module_id_; PacketUdpReceiver udp_receiver_; @@ -27,7 +27,7 @@ class FrameUdpReceiver { const int n_packets, ModuleFrame& metadata, char* frame_buffer); public: - FrameUdpReceiver(const uint16_t port, const int source_id); + FrameUdpReceiver(const uint16_t port, const int module_id); virtual ~FrameUdpReceiver(); uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); }; diff --git a/sf-buffer/include/PacketUdpReceiver.hpp b/jf-udp-recv/include/PacketUdpReceiver.hpp similarity index 100% rename from sf-buffer/include/PacketUdpReceiver.hpp rename to jf-udp-recv/include/PacketUdpReceiver.hpp diff --git a/jf-udp-recv/src/FrameStats.cpp b/jf-udp-recv/src/FrameStats.cpp new file mode 100644 index 0000000..28161c7 --- /dev/null +++ b/jf-udp-recv/src/FrameStats.cpp @@ -0,0 +1,71 @@ +#include +#include "FrameStats.hpp" + +using namespace std; +using namespace chrono; + +FrameStats::FrameStats( + const std::string &detector_name, + const int module_id, + const size_t stats_time) : + detector_name_(detector_name), + module_id_(module_id), + stats_time_(stats_time) +{ + reset_counters(); +} + +void FrameStats::reset_counters() +{ + frames_counter_ = 0; + n_missed_packets_ = 0; + n_corrupted_frames_ = 0; + n_corrupted_pulse_id_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) +{ + + if (bad_pulse_id) { + n_corrupted_pulse_id_++; + } + + if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { + n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; + n_corrupted_frames_++; + } + + frames_counter_++; + + auto time_passed = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { + print_stats(); + reset_counters(); + } +} + +void FrameStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_udp_recv"; + cout << ",detector_name=" << detector_name_; + cout << ",module_name=M" << module_id_; + cout << " "; + cout << "n_missed_packets=" << n_missed_packets_ << "i"; + cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} diff --git a/sf-buffer/src/FrameUdpReceiver.cpp b/jf-udp-recv/src/FrameUdpReceiver.cpp similarity index 89% rename from sf-buffer/src/FrameUdpReceiver.cpp rename to jf-udp-recv/src/FrameUdpReceiver.cpp index bcad86e..cb78a7a 100644 --- a/sf-buffer/src/FrameUdpReceiver.cpp +++ b/jf-udp-recv/src/FrameUdpReceiver.cpp @@ -7,8 +7,8 @@ using namespace buffer_config; FrameUdpReceiver::FrameUdpReceiver( const uint16_t port, - const int source_id) : - source_id_(source_id) + const int module_id) : + module_id_(module_id) { udp_receiver_.bind(port); @@ -33,7 +33,7 @@ inline void FrameUdpReceiver::init_frame( frame_metadata.pulse_id = packet_buffer_[i_packet].bunchid; frame_metadata.frame_index = packet_buffer_[i_packet].framenum; frame_metadata.daq_rec = (uint64_t) packet_buffer_[i_packet].debug; - frame_metadata.module_id = (int64_t) source_id_; + frame_metadata.module_id = (int64_t) module_id_; } inline void FrameUdpReceiver::copy_packet_to_buffers( @@ -54,18 +54,17 @@ inline uint64_t FrameUdpReceiver::process_packets( ModuleFrame& metadata, char* frame_buffer) { - for ( - int i_packet=start_offset; - i_packet < packet_buffer_n_packets_; - i_packet++) { + for (int i_packet=start_offset; + i_packet < packet_buffer_n_packets_; + i_packet++) { // First packet for this frame. if (metadata.pulse_id == 0) { init_frame(metadata, i_packet); // Happens if the last packet from the previous frame gets lost. - // In the jungfrau_packet, pulse_id is called bunchid. - } else if (metadata.pulse_id != packet_buffer_[i_packet].bunchid) { + // In the jungfrau_packet, framenum is the trigger number (how many triggers from detector power-on) happened + } else if (metadata.frame_index != packet_buffer_[i_packet].framenum) { packet_buffer_loaded_ = true; // Continue on this packet. packet_buffer_offset_ = i_packet; @@ -135,4 +134,4 @@ uint64_t FrameUdpReceiver::get_frame_from_udp( return pulse_id; } } -} \ No newline at end of file +} diff --git a/sf-buffer/src/PacketUdpReceiver.cpp b/jf-udp-recv/src/PacketUdpReceiver.cpp similarity index 100% rename from sf-buffer/src/PacketUdpReceiver.cpp rename to jf-udp-recv/src/PacketUdpReceiver.cpp diff --git a/jf-udp-recv/src/main.cpp b/jf-udp-recv/src/main.cpp new file mode 100644 index 0000000..bca57e2 --- /dev/null +++ b/jf-udp-recv/src/main.cpp @@ -0,0 +1,75 @@ +#include +#include +#include +#include + +#include "formats.hpp" +#include "buffer_config.hpp" +#include "FrameUdpReceiver.hpp" +#include "BufferUtils.hpp" +#include "FrameStats.hpp" + +using namespace std; +using namespace chrono; +using namespace buffer_config; +using namespace BufferUtils; + +int main (int argc, char *argv[]) { + + if (argc != 3) { + cout << endl; + cout << "Usage: jf_udp_recv [detector_json_filename] [module_id]"; + cout << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tmodule_id: id of the module for this process." << endl; + cout << endl; + + exit(-1); + } + + const auto config = read_json_config(string(argv[1])); + const int module_id = atoi(argv[2]); + + const auto udp_port = config.start_udp_port + module_id; + FrameUdpReceiver receiver(udp_port, module_id); + RamBuffer buffer(config.detector_name, config.n_modules); + FrameStats stats(config.detector_name, module_id, STATS_TIME); + + auto ctx = zmq_ctx_new(); + auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); + + ModuleFrame meta; + char* data = new char[MODULE_N_BYTES]; + + uint64_t pulse_id_previous = 0; + uint64_t frame_index_previous = 0; + + while (true) { + + auto pulse_id = receiver.get_frame_from_udp(meta, data); + + bool bad_pulse_id = false; + + if ( ( meta.frame_index != (frame_index_previous+1) ) || + ( (pulse_id-pulse_id_previous) < 0 ) || + ( (pulse_id-pulse_id_previous) > 1000 ) ) { + + bad_pulse_id = true; + + } else { + + buffer.write_frame(meta, data); + + zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); + + } + + stats.record_stats(meta, bad_pulse_id); + + pulse_id_previous = pulse_id; + frame_index_previous = meta.frame_index; + + } + + delete[] data; +} diff --git a/jf-udp-recv/test/CMakeLists.txt b/jf-udp-recv/test/CMakeLists.txt new file mode 100644 index 0000000..25c729a --- /dev/null +++ b/jf-udp-recv/test/CMakeLists.txt @@ -0,0 +1,8 @@ +add_executable(jf-udp-recv-tests main.cpp) + +target_link_libraries(jf-udp-recv-tests + core-buffer-lib + jf-udp-recv-lib + gtest + ) + diff --git a/sf-buffer/test/main.cpp b/jf-udp-recv/test/main.cpp similarity index 85% rename from sf-buffer/test/main.cpp rename to jf-udp-recv/test/main.cpp index a8035f5..8f2cd01 100644 --- a/sf-buffer/test/main.cpp +++ b/jf-udp-recv/test/main.cpp @@ -1,6 +1,5 @@ #include "gtest/gtest.h" #include "test_PacketUdpReceiver.cpp" -#include "test_BufferBinaryWriter.cpp" #include "test_FrameUdpReceiver.cpp" using namespace std; diff --git a/sf-buffer/test/mock/udp.hpp b/jf-udp-recv/test/mock/udp.hpp similarity index 100% rename from sf-buffer/test/mock/udp.hpp rename to jf-udp-recv/test/mock/udp.hpp diff --git a/sf-buffer/test/test_FrameUdpReceiver.cpp b/jf-udp-recv/test/test_FrameUdpReceiver.cpp similarity index 100% rename from sf-buffer/test/test_FrameUdpReceiver.cpp rename to jf-udp-recv/test/test_FrameUdpReceiver.cpp diff --git a/sf-buffer/test/test_PacketUdpReceiver.cpp b/jf-udp-recv/test/test_PacketUdpReceiver.cpp similarity index 100% rename from sf-buffer/test/test_PacketUdpReceiver.cpp rename to jf-udp-recv/test/test_PacketUdpReceiver.cpp diff --git a/scripts/JF01-buffer-worker.sh b/scripts/JF01-buffer-worker.sh index e9a101e..a7ae3db 100644 --- a/scripts/JF01-buffer-worker.sh +++ b/scripts/JF01-buffer-worker.sh @@ -13,5 +13,6 @@ coreAssociatedBuffer=(12 12 12) initialUDPport=50010 port=$((${initialUDPport}+10#${M})) DETECTOR=JF01T03V01 +N_MODULES=3 -taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF02-buffer-worker.sh b/scripts/JF02-buffer-worker.sh index a358eec..1bb35e0 100644 --- a/scripts/JF02-buffer-worker.sh +++ b/scripts/JF02-buffer-worker.sh @@ -24,5 +24,6 @@ esac initialUDPport=50020 port=$((${initialUDPport}+10#${M})) DETECTOR=JF02T09V02 +N_MODULES=9 -taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF06-buffer-worker.sh b/scripts/JF06-buffer-worker.sh index 1c94d44..3124952 100644 --- a/scripts/JF06-buffer-worker.sh +++ b/scripts/JF06-buffer-worker.sh @@ -13,5 +13,6 @@ coreAssociatedBuffer=(4 4 4 4 5 5 5 5 6 6 6 6 7 7 7 7 8 8 8 8 9 9 9 9 10 10 10 1 initialUDPport=50060 port=$((${initialUDPport}+10#${M})) DETECTOR=JF06T32V02 +N_MODULES=32 -taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF06_4M-buffer-worker.sh b/scripts/JF06_4M-buffer-worker.sh index 7bc9afd..846f5eb 100644 --- a/scripts/JF06_4M-buffer-worker.sh +++ b/scripts/JF06_4M-buffer-worker.sh @@ -13,5 +13,6 @@ coreAssociatedBuffer=(4 5 6 7 8 9 10 11) initialUDPport=50060 port=$((${initialUDPport}+10#${M})) DETECTOR=JF06T08V02 +N_MODULES=8 -taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/local/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF07-buffer-worker.sh b/scripts/JF07-buffer-worker.sh index 5d744e1..2698a36 100644 --- a/scripts/JF07-buffer-worker.sh +++ b/scripts/JF07-buffer-worker.sh @@ -23,5 +23,6 @@ coreAssociatedBuffer=(4 4 4 4 5 5 5 5 6 6 6 6 7 7 7 7 8 8 8 8 9 9 9 9 10 10 10 1 initialUDPport=50100 port=$((${initialUDPport}+10#${M})) DETECTOR=JF07T32V01 +N_MODULES=32 -taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/scripts/JF13-buffer-worker.sh b/scripts/JF13-buffer-worker.sh index c269179..21fff2a 100644 --- a/scripts/JF13-buffer-worker.sh +++ b/scripts/JF13-buffer-worker.sh @@ -16,5 +16,6 @@ coreAssociatedBuffer=(13) initialUDPport=50190 port=$((${initialUDPport}+10#${M})) DETECTOR=JF13T01V01 +N_MODULES=1 -taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} +taskset -c ${coreAssociatedBuffer[10#${M}]} /usr/bin/sf_buffer ${DETECTOR} ${N_MODULES} M${M} ${port} /gpfs/photonics/swissfel/buffer/${DETECTOR} ${M} diff --git a/sf-buffer/CMakeLists.txt b/sf-buffer/CMakeLists.txt deleted file mode 100644 index 5063955..0000000 --- a/sf-buffer/CMakeLists.txt +++ /dev/null @@ -1,17 +0,0 @@ -file(GLOB SOURCES - src/*.cpp) - -add_library(sf-buffer-lib STATIC ${SOURCES}) -target_include_directories(sf-buffer-lib PUBLIC include/) -target_link_libraries(sf-buffer-lib - external - core-buffer-lib) - -add_executable(sf-buffer src/main.cpp) -set_target_properties(sf-buffer PROPERTIES OUTPUT_NAME sf_buffer) -target_link_libraries(sf-buffer - sf-buffer-lib - zmq) - -enable_testing() -add_subdirectory(test/) diff --git a/sf-buffer/src/main.cpp b/sf-buffer/src/main.cpp deleted file mode 100644 index a82ff25..0000000 --- a/sf-buffer/src/main.cpp +++ /dev/null @@ -1,141 +0,0 @@ -#include -#include -#include -#include -#include -#include - -#include "formats.hpp" -#include "buffer_config.hpp" -#include "jungfrau.hpp" -#include "FrameUdpReceiver.hpp" -#include "BufferBinaryWriter.hpp" - -using namespace std; -using namespace chrono; -using namespace buffer_config; - -void* get_live_stream_socket(const string& detector_name, const int source_id) -{ - stringstream ipc_stream; - string LIVE_IPC_URL = BUFFER_LIVE_IPC_URL + detector_name + "-"; - ipc_stream << LIVE_IPC_URL << source_id; - const auto ipc_address = ipc_stream.str(); - - void* ctx = zmq_ctx_new(); - void* socket = zmq_socket(ctx, ZMQ_PUB); - - const int sndhwm = BUFFER_ZMQ_SNDHWM; - if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - const int linger = 0; - if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - if (zmq_bind(socket, ipc_address.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - return socket; -} - -int main (int argc, char *argv[]) { - - if (argc != 6) { - cout << endl; - cout << "Usage: sf_buffer [detector_name] [device_name]"; - cout << " [udp_port] [root_folder] [source_id]"; - cout << endl; - cout << "\tdetector_name: Detector name, example JF07T32V01" << endl; - cout << "\tdevice_name: Name to write to disk."; - cout << "\tudp_port: UDP port to connect to." << endl; - cout << "\troot_folder: FS root folder." << endl; - cout << "\tsource_id: ID of the source for live stream." << endl; - cout << endl; - - exit(-1); - } - - string detector_name = string(argv[1]); - string device_name = string(argv[2]); - int udp_port = atoi(argv[3]); - string root_folder = string(argv[4]); - int source_id = atoi(argv[5]); - - uint64_t stats_counter(0); - uint64_t n_missed_packets = 0; - uint64_t n_corrupted_frames = 0; - - BufferBinaryWriter writer(root_folder, device_name); - FrameUdpReceiver receiver(udp_port, source_id); - - auto binary_buffer = new BufferBinaryFormat(); - auto socket = get_live_stream_socket(detector_name, source_id); - - size_t write_total_us = 0; - size_t write_max_us = 0; - size_t send_total_us = 0; - size_t send_max_us = 0; - - while (true) { - - auto pulse_id = receiver.get_frame_from_udp( - binary_buffer->metadata, binary_buffer->data); - - auto start_time = steady_clock::now(); - - writer.write(pulse_id, binary_buffer); - - auto write_end_time = steady_clock::now(); - size_t write_us_duration = duration_cast( - write_end_time-start_time).count(); - - start_time = steady_clock::now(); - - zmq_send(socket, &(binary_buffer->metadata), sizeof(ModuleFrame), - ZMQ_SNDMORE); - zmq_send(socket, binary_buffer->data, MODULE_N_BYTES, 0); - - auto send_end_time = steady_clock::now(); - size_t send_us_duration = duration_cast( - send_end_time-start_time).count(); - - // TODO: Make real statistics, please. - stats_counter++; - write_total_us += write_us_duration; - send_total_us += send_us_duration; - - write_max_us = max(write_max_us, write_us_duration); - send_max_us = max(send_max_us, send_us_duration); - - if (binary_buffer->metadata.n_recv_packets < JF_N_PACKETS_PER_FRAME) { - n_missed_packets += JF_N_PACKETS_PER_FRAME - - binary_buffer->metadata.n_recv_packets; - n_corrupted_frames++; - } - - if (stats_counter == STATS_MODULO) { - cout << "sf_buffer:device_name " << device_name; - cout << " sf_buffer:n_missed_packets " << n_missed_packets; - cout << " sf_buffer:n_corrupted_frames " << n_corrupted_frames; - - cout << " sf_buffer:write_total_us " << write_total_us/STATS_MODULO; - cout << " sf_buffer:write_max_us " << write_max_us; - cout << " sf_buffer:send_total_us " << send_total_us/STATS_MODULO; - cout << " sf_buffer:send_max_us " << send_max_us; - cout << endl; - - stats_counter = 0; - n_missed_packets = 0; - n_corrupted_frames = 0; - - write_total_us = 0; - write_max_us = 0; - send_total_us = 0; - send_max_us = 0; - } - } -} diff --git a/sf-buffer/test/CMakeLists.txt b/sf-buffer/test/CMakeLists.txt deleted file mode 100644 index 4ae80a8..0000000 --- a/sf-buffer/test/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -add_executable(sf-buffer-tests main.cpp) - -target_link_libraries(sf-buffer-tests - core-buffer-lib - sf-buffer-lib - gtest - ) - diff --git a/sf-stream/CMakeLists.txt b/sf-stream/CMakeLists.txt index fafe832..9957767 100644 --- a/sf-stream/CMakeLists.txt +++ b/sf-stream/CMakeLists.txt @@ -10,9 +10,12 @@ target_link_libraries(sf-stream-lib add_executable(sf-stream src/main.cpp) set_target_properties(sf-stream PROPERTIES OUTPUT_NAME sf_stream) target_link_libraries(sf-stream + external + core-buffer-lib sf-stream-lib zmq - pthread) + pthread + rt) enable_testing() add_subdirectory(test/) diff --git a/sf-stream/README.md b/sf-stream/README.md index 1ccd5fd..ce21a4d 100644 --- a/sf-stream/README.md +++ b/sf-stream/README.md @@ -6,13 +6,13 @@ per detector. It currently has 3 output streams: -- **Full data full metadata** rate stream (send all images and metadata) -- **Reduced data full metadata** rate stream (send less images, but -all metadata) +- **Full data full meta** rate stream (send all images and meta) +- **Reduced data full meta** rate stream (send less images, but +all meta) - **Pulse_id** stream (send only the current pulse_id) In addition to receiving and assembling images, sf-stream also calculates -additional metadata and constructs the structures needed to send data in +additional meta and constructs the structures needed to send data in Array 1.0 protocol. This component does not guarantee that the streams will always contain all @@ -103,12 +103,12 @@ arrived. We devide the ZMQ sending to 3 types of stream: - Data processing stream. This is basically the complete stream from -the detector with all metadata and data. It can be described as full data full -metadata stream. Only 1 client at the time can be connected to this stream +the detector with all meta and data. It can be described as full data full +meta stream. Only 1 client at the time can be connected to this stream (PUSH/PULL for load balancing). -- Live viewing stream. This is a reduced data full metadata stream. We send -metadata for all frames, but data only for subset of them (10Hz, for example). +- Live viewing stream. This is a reduced data full meta stream. We send +meta for all frames, but data only for subset of them (10Hz, for example). Any number of clients can connect to the 10Hz stream, because we use PUB/SUB for this socket. @@ -137,7 +137,7 @@ We use following fields in the JSON header: |type|string|Value: "uint16"| |shape|Array[uint64]|Shape of the image in stream| -### Full data full metadata stream +### Full data full meta stream This stream runs at detector frequency and uses PUSH/PULL to distribute data to max 1 client (this client can have many processes, but it needs to be a @@ -151,9 +151,9 @@ for purposes of online analysis. Given the large amount of data on this stream only "pre-approved" applications that can handle the load should be attached here. -### Reduced data full metadata stream +### Reduced data full meta stream -This streams also runs at detector frequency for JSON headers (metadata), but +This streams also runs at detector frequency for JSON headers (meta), but it sends only part of the images in the stream. The rest of the images are sent as empty buffers (the receiver needs to be aware of this behaviour, as Array 1.0 alone does not define it). diff --git a/sf-stream/include/StreamStats.hpp b/sf-stream/include/StreamStats.hpp new file mode 100644 index 0000000..bca5ce0 --- /dev/null +++ b/sf-stream/include/StreamStats.hpp @@ -0,0 +1,29 @@ +#ifndef SF_DAQ_BUFFER_STREAMSTATS_HPP +#define SF_DAQ_BUFFER_STREAMSTATS_HPP + +#include +#include +#include + +class StreamStats { + const std::string detector_name_; + const std::string stream_name_; + const size_t stats_modulo_; + + int image_counter_; + int n_corrupted_images_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + StreamStats(const std::string &detector_name, + const std::string &stream_name, + const size_t stats_modulo); + + void record_stats(const ImageMetadata &meta); +}; + + +#endif //SF_DAQ_BUFFER_STREAMSTATS_HPP diff --git a/sf-stream/include/ZmqLiveReceiver.hpp b/sf-stream/include/ZmqLiveReceiver.hpp deleted file mode 100644 index c12e34e..0000000 --- a/sf-stream/include/ZmqLiveReceiver.hpp +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef SF_DAQ_BUFFER_ZMQLIVERECEIVER_HPP -#define SF_DAQ_BUFFER_ZMQLIVERECEIVER_HPP - - -#include -#include -#include - -#include "formats.hpp" - -class ZmqLiveReceiver { - - const size_t n_modules_; - void* ctx_; - const std::string ipc_prefix_; - std::vector sockets_; - - void* connect_socket(size_t module_id); - void recv_single_module(void* socket, ModuleFrame* meta, char* data); - uint64_t align_modules(ModuleFrameBuffer *meta, char *data); - -public: - ZmqLiveReceiver(const size_t n_modules, - void* ctx, - const std::string& ipc_prefix); - - ~ZmqLiveReceiver(); - - uint64_t get_next_image(ModuleFrameBuffer* meta, char* data); -}; - - -#endif //SF_DAQ_BUFFER_ZMQLIVERECEIVER_HPP diff --git a/sf-stream/include/ZmqLiveSender.hpp b/sf-stream/include/ZmqLiveSender.hpp index ee197d5..5a0a1b4 100644 --- a/sf-stream/include/ZmqLiveSender.hpp +++ b/sf-stream/include/ZmqLiveSender.hpp @@ -2,43 +2,23 @@ #define SF_DAQ_BUFFER_ZMQLIVESENDER_HPP #include -#include -#include -#include -#include -#include - #include "formats.hpp" +#include "BufferUtils.hpp" -struct LiveStreamConfig { - const std::string streamvis_address; - const int reduction_factor_streamvis; - const std::string live_analysis_address; - const int reduction_factor_live_analysis; - const std::string PEDE_FILENAME; - const std::string GAIN_FILENAME; - const std::string DETECTOR_NAME; - const int n_modules; - const std::string pulse_address; -}; - -LiveStreamConfig read_json_config(const std::string filename); - class ZmqLiveSender { const void* ctx_; - const LiveStreamConfig config_; + const BufferUtils::DetectorConfig config_; void* socket_streamvis_; void* socket_live_; - void* socket_pulse_; public: ZmqLiveSender(void* ctx, - const LiveStreamConfig& config); + const BufferUtils::DetectorConfig& config); ~ZmqLiveSender(); - void send(const ModuleFrameBuffer* meta, const char* data); + void send(const ImageMetadata& meta, const char* data); }; diff --git a/sf-stream/include/ZmqPulseSyncReceiver.hpp b/sf-stream/include/ZmqPulseSyncReceiver.hpp new file mode 100644 index 0000000..624de34 --- /dev/null +++ b/sf-stream/include/ZmqPulseSyncReceiver.hpp @@ -0,0 +1,34 @@ +#ifndef SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP +#define SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP + + +#include +#include +#include + +#include "formats.hpp" + +struct PulseAndSync { + const uint64_t pulse_id; + const uint32_t n_lost_pulses; +}; + +class ZmqPulseSyncReceiver { + + void* ctx_; + const int n_modules_; + + std::vector sockets_; + +public: + ZmqPulseSyncReceiver( + void* ctx, + const std::string& detector_name, + const int n_modules); + ~ZmqPulseSyncReceiver(); + + PulseAndSync get_next_pulse_id() const; +}; + + +#endif //SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP diff --git a/sf-stream/include/stream_config.hpp b/sf-stream/include/stream_config.hpp index 745b9cd..8f8b977 100644 --- a/sf-stream/include/stream_config.hpp +++ b/sf-stream/include/stream_config.hpp @@ -1,15 +1,20 @@ namespace stream_config { // N of IO threads to receive data from modules. - const int STREAM_ZMQ_IO_THREADS = 5; + const int STREAM_ZMQ_IO_THREADS = 1; // How long should the RECV queue be. const size_t STREAM_RCVHWM = 100; // Size of buffer between the receiving and sending part. const int STREAM_FASTQUEUE_SLOTS = 5; // If the modules are offset more than 1000 pulses, crush. - const uint64_t PULSE_OFFSET_LIMIT = 1000; + const uint64_t PULSE_OFFSET_LIMIT = 100; // SNDHWM for live processing socket. const int PROCESSING_ZMQ_SNDHWM = 10; // Keep the last second of pulses in the buffer. const int PULSE_ZMQ_SNDHWM = 100; + // Number of times we try to re-sync in case of failure. + const int SYNC_RETRY_LIMIT = 3; + + // Number of pulses between each statistics print out. + const size_t STREAM_STATS_MODULO = 1000; } diff --git a/sf-stream/src/StreamStats.cpp b/sf-stream/src/StreamStats.cpp new file mode 100644 index 0000000..7408629 --- /dev/null +++ b/sf-stream/src/StreamStats.cpp @@ -0,0 +1,62 @@ +#include "StreamStats.hpp" + +#include + +using namespace std; +using namespace chrono; + +StreamStats::StreamStats( + const std::string &detector_name, + const std::string &stream_name, + const size_t stats_modulo) : + detector_name_(detector_name), + stream_name_(stream_name), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void StreamStats::reset_counters() +{ + image_counter_ = 0; + n_corrupted_images_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void StreamStats::record_stats( + const ImageMetadata &meta) +{ + image_counter_++; + + if (!meta.is_good_image) { + n_corrupted_images_++; + } + + if (image_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void StreamStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "sf_stream"; + cout << ",detector_name=" << detector_name_; + cout << ",stream_name=" << stream_name_; + cout << " "; + cout << "n_processed_images=" << image_counter_ << "i"; + cout << ",n_corrupted_images=" << n_corrupted_images_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} + diff --git a/sf-stream/src/ZmqLiveReceiver.cpp b/sf-stream/src/ZmqLiveReceiver.cpp deleted file mode 100644 index 5ac915e..0000000 --- a/sf-stream/src/ZmqLiveReceiver.cpp +++ /dev/null @@ -1,171 +0,0 @@ -#include "ZmqLiveReceiver.hpp" - -#include -#include -#include -#include - -#include "buffer_config.hpp" -#include "stream_config.hpp" - -using namespace std; -using namespace chrono; -using namespace buffer_config; -using namespace stream_config; - - -ZmqLiveReceiver::ZmqLiveReceiver( - const size_t n_modules, - void* ctx, - const std::string &ipc_prefix) : - n_modules_(n_modules), - ctx_(ctx), - ipc_prefix_(ipc_prefix), - sockets_(n_modules) -{ - for (size_t i = 0; i < n_modules_; i++) { - sockets_[i] = connect_socket(i); - } -} - -ZmqLiveReceiver::~ZmqLiveReceiver() -{ - for (auto& socket:sockets_) { - zmq_close(socket); - } -} - -void* ZmqLiveReceiver::connect_socket(size_t module_id) -{ - void* socket = zmq_socket(ctx_, ZMQ_SUB); - if (socket == nullptr) { - throw runtime_error(zmq_strerror(errno)); - } - - int rcvhwm = STREAM_RCVHWM; - if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - int linger = 0; - if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - stringstream ipc_addr; - ipc_addr << ipc_prefix_ << module_id; - const auto ipc = ipc_addr.str(); - - if (zmq_connect(socket, ipc.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - return socket; -} - -void ZmqLiveReceiver::recv_single_module( - void* socket, ModuleFrame* meta, char* data) -{ - auto n_bytes_meta = zmq_recv(socket, meta, sizeof(ModuleFrame), 0); - - if (n_bytes_meta == -1) { - throw runtime_error(zmq_strerror(errno)); - } - if (n_bytes_meta != sizeof(ModuleFrame)) { - throw runtime_error("Stream header of wrong size."); - } - if (meta->pulse_id == 0) { - throw runtime_error("Received invalid pulse_id=0."); - } - - auto n_bytes_frame = zmq_recv(socket, data, MODULE_N_BYTES, 0); - - if (n_bytes_frame == -1) { - throw runtime_error(zmq_strerror(errno)); - } - if (n_bytes_frame != MODULE_N_BYTES) { - throw runtime_error("Stream data of wrong size."); - } -} - -uint64_t ZmqLiveReceiver::align_modules(ModuleFrameBuffer *meta, char *data) -{ - uint64_t max_pulse_id = 0; - uint64_t min_pulse_id = numeric_limits::max(); - - // First pass - determine current min and max pulse_id. - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - auto& module_meta = meta->module[i_module]; - - min_pulse_id = min(min_pulse_id, module_meta.pulse_id); - max_pulse_id = max(max_pulse_id, module_meta.pulse_id); - } - - auto max_diff = max_pulse_id - min_pulse_id; - if (max_diff > PULSE_OFFSET_LIMIT) { - stringstream err_msg; - - err_msg << "[ZmqLiveReceiver::align_modules]"; - err_msg << " PULSE_OFFSET_LIMIT exceeded."; - err_msg << " Modules out of sync for " << max_diff << " pulses."; - - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - auto& module_meta = meta->module[i_module]; - - err_msg << " (" << module_meta.module_id << ", "; - err_msg << module_meta.pulse_id << "),"; - } - - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - - // Second pass - align all receivers to max_pulse_id. - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - auto& module_meta = meta->module[i_module]; - - while (module_meta.pulse_id < max_pulse_id) { - recv_single_module( - sockets_[i_module], - &module_meta, - data + (MODULE_N_BYTES * i_module)); - } - - if (module_meta.pulse_id != max_pulse_id) { - throw runtime_error("Cannot align pulse_ids."); - } - } - - return max_pulse_id - min_pulse_id; -} - -uint64_t ZmqLiveReceiver::get_next_image(ModuleFrameBuffer* meta, char* data) -{ - uint64_t frame_pulse_id; - bool sync_needed = false; - - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - auto& module_meta = meta->module[i_module]; - - char* buffer = data + (MODULE_N_BYTES * i_module); - recv_single_module(sockets_[i_module], &module_meta, buffer); - - if (i_module == 0) { - frame_pulse_id = module_meta.pulse_id; - } else if (frame_pulse_id != module_meta.pulse_id) { - sync_needed = true; - } - } - - if (sync_needed) { - auto lost_pulses = align_modules(meta, data); - return lost_pulses; - } - - return 0; -} diff --git a/sf-stream/src/ZmqLiveSender.cpp b/sf-stream/src/ZmqLiveSender.cpp index 5a2719b..67b44c7 100644 --- a/sf-stream/src/ZmqLiveSender.cpp +++ b/sf-stream/src/ZmqLiveSender.cpp @@ -3,35 +3,18 @@ #include "zmq.h" #include +#include +#include +#include #include // using namespace std; using namespace stream_config; -LiveStreamConfig read_json_config(const std::string filename) -{ - std::ifstream ifs(filename); - rapidjson::IStreamWrapper isw(ifs); - rapidjson::Document config_parameters; - config_parameters.ParseStream(isw); - - return { - config_parameters["streamvis_stream"].GetString(), - config_parameters["streamvis_rate"].GetInt(), - config_parameters["live_stream"].GetString(), - config_parameters["live_rate"].GetInt(), - config_parameters["pedestal_file"].GetString(), - config_parameters["gain_file"].GetString(), - config_parameters["detector_name"].GetString(), - config_parameters["n_modules"].GetInt(), - "tcp://127.0.0.1:51234" - }; -} - ZmqLiveSender::ZmqLiveSender( void* ctx, - const LiveStreamConfig& config) : + const BufferUtils::DetectorConfig& config) : ctx_(ctx), config_(config) { @@ -69,7 +52,7 @@ ZmqLiveSender::~ZmqLiveSender() zmq_close(socket_live_); } -void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) +void ZmqLiveSender::send(const ImageMetadata& meta, const char *data) { uint16_t data_empty [] = { 0, 0, 0, 0}; @@ -77,40 +60,11 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) auto& header_alloc = header.GetAllocator(); string text_header; - uint64_t pulse_id = 0; - uint64_t frame_index = 0; - uint64_t daq_rec = 0; - bool is_good_frame = true; - - for (size_t i_module = 0; i_module < config_.n_modules; i_module++) { - // TODO: Place this tests in the appropriate spot. - auto& module_metadata = meta->module[i_module]; - if (i_module == 0) { - pulse_id = module_metadata.pulse_id; - frame_index = module_metadata.frame_index; - daq_rec = module_metadata.daq_rec; - - if (module_metadata.n_recv_packets != 128 ) is_good_frame = false; - } else { - if (module_metadata.pulse_id != pulse_id) is_good_frame = false; - - if (module_metadata.frame_index != frame_index) is_good_frame = false; - - if (module_metadata.daq_rec != daq_rec) is_good_frame = false; - - if (module_metadata.n_recv_packets != 128 ) is_good_frame = false; - } - if (pulse_id % 10000 == 0 && is_good_frame != true) { - cout << "Frame is not good " << pulse_id << " module : " << i_module << " frame_index(0) : " << frame_index << " frame_index : " << module_metadata.frame_index << endl; - } - } - // TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) - - header.AddMember("frame", frame_index, header_alloc); - header.AddMember("is_good_frame", is_good_frame, header_alloc); - header.AddMember("daq_rec", daq_rec, header_alloc); - header.AddMember("pulse_id", pulse_id, header_alloc); + header.AddMember("frame", meta.frame_index, header_alloc); + header.AddMember("is_good_frame", meta.is_good_image, header_alloc); + header.AddMember("daq_rec", meta.daq_rec, header_alloc); + header.AddMember("pulse_id", meta.pulse_id, header_alloc); rapidjson::Value pedestal_file; pedestal_file.SetString(config_.PEDE_FILENAME.c_str(), header_alloc); @@ -124,12 +78,12 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) rapidjson::Value run_name; run_name.SetString( - to_string(uint64_t(pulse_id/10000)*10000).c_str(), + to_string(uint64_t(meta.pulse_id/10000)*10000).c_str(), header_alloc); header.AddMember("run_name", run_name, header_alloc); rapidjson::Value detector_name; - detector_name.SetString(config_.DETECTOR_NAME.c_str(), header_alloc); + detector_name.SetString(config_.detector_name.c_str(), header_alloc); header.AddMember("detector_name", detector_name, header_alloc); header.AddMember("htype", "array-1.0", header_alloc); diff --git a/sf-stream/src/ZmqPulseSyncReceiver.cpp b/sf-stream/src/ZmqPulseSyncReceiver.cpp new file mode 100644 index 0000000..96221f3 --- /dev/null +++ b/sf-stream/src/ZmqPulseSyncReceiver.cpp @@ -0,0 +1,115 @@ +#include "ZmqPulseSyncReceiver.hpp" +#include "BufferUtils.hpp" + +#include +#include +#include +#include +#include +#include + +#include "stream_config.hpp" + +using namespace std; +using namespace chrono; +using namespace buffer_config; +using namespace stream_config; + + +ZmqPulseSyncReceiver::ZmqPulseSyncReceiver( + void * ctx, + const string& detector_name, + const int n_modules) : + ctx_(ctx), + n_modules_(n_modules) +{ + sockets_.reserve(n_modules_); + + for (int i=0; i::max();; + uint64_t max_pulse_id = 0; + + for (int i = 0; i < n_modules_; i++) { + min_pulse_id = min(min_pulse_id, pulses[i]); + max_pulse_id = max(max_pulse_id, pulses[i]); + } + + auto max_diff = max_pulse_id - min_pulse_id; + if (max_diff > PULSE_OFFSET_LIMIT) { + stringstream err_msg; + err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; + err_msg << " PULSE_OFFSET_LIMIT exceeded."; + err_msg << " max_diff=" << max_diff << " pulses."; + + for (int i = 0; i < n_modules_; i++) { + err_msg << " (module " << i << ", "; + err_msg << pulses[i] << "),"; + } + err_msg << endl; + + throw runtime_error(err_msg.str()); + } + + modules_in_sync = true; + // Max pulses we lost in this sync attempt. + uint32_t i_sync_lost_pulses = 0; + for (int i = 0; i < n_modules_; i++) { + // How many pulses we lost for this specific module. + uint32_t i_module_lost_pulses = 0; + while (pulses[i] < max_pulse_id) { + zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); + i_module_lost_pulses++; + } + + i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); + + if (pulses[i] != max_pulse_id) { + modules_in_sync = false; + } + } + n_lost_pulses += i_sync_lost_pulses; + + if (modules_in_sync) { + return {pulses[0], n_lost_pulses}; + } + } + + stringstream err_msg; + err_msg << "[ZmqLiveReceiver::get_next_pulse_id]"; + err_msg << " SYNC_RETRY_LIMIT exceeded."; + err_msg << endl; + + throw runtime_error(err_msg.str()); +} diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index 07e0ac3..dbe313a 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -1,95 +1,49 @@ #include #include -#include -#include #include +#include +#include +#include -#include "buffer_config.hpp" #include "stream_config.hpp" #include "ZmqLiveSender.hpp" -#include "ZmqLiveReceiver.hpp" using namespace std; -using namespace chrono; using namespace buffer_config; using namespace stream_config; int main (int argc, char *argv[]) { - if (argc != 2) { + if (argc != 3) { cout << endl; - cout << "Usage: sf_stream "; - cout << " [config_json_file]"; - cout << endl; - cout << "\tconfig_json_file: json file with the configuration " - "parameters(detector name, number of modules, pedestal and " - "gain files" << endl; + cout << "Usage: sf_stream [detector_json_filename]" + " [stream_name]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; cout << endl; exit(-1); } - auto config = read_json_config(string(argv[1])); - string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.DETECTOR_NAME + "-"; - - ModuleFrameBuffer* meta = new ModuleFrameBuffer(); - char* data = new char[config.n_modules * MODULE_N_BYTES]; + const auto stream_name = string(argv[2]); + // TODO: Add stream_name to config reading - multiple stream definitions. + auto config = BufferUtils::read_json_config(string(argv[1])); auto ctx = zmq_ctx_new(); - zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); + auto receiver = BufferUtils::connect_socket( + ctx, config.detector_name, "assembler"); - ZmqLiveReceiver receiver(config.n_modules, ctx, RECV_IPC_URL); + RamBuffer ram_buffer(config.detector_name, config.n_modules); + StreamStats stats(config.detector_name, stream_name, STREAM_STATS_MODULO); ZmqLiveSender sender(ctx, config); - // TODO: Remove stats trash. - int stats_counter = 0; - size_t read_total_us = 0; - size_t read_max_us = 0; - size_t send_total_us = 0; - size_t send_max_us = 0; - + ImageMetadata meta; while (true) { - - auto start_time = steady_clock::now(); - - auto n_lost_pulses = receiver.get_next_image(meta, data); - - if (n_lost_pulses > 0) { - cout << "sf_stream:sync_lost_pulses " << n_lost_pulses << endl; - } - - auto end_time = steady_clock::now(); - size_t read_us_duration = duration_cast( - end_time - start_time).count(); - - start_time = steady_clock::now(); + zmq_recv(receiver, &meta, sizeof(meta), 0); + char* data = ram_buffer.read_image(meta.pulse_id); sender.send(meta, data); - end_time = steady_clock::now(); - size_t send_us_duration = duration_cast( - end_time - start_time).count(); - - // TODO: Some poor statistics. - stats_counter++; - read_total_us += read_us_duration; - send_total_us += send_us_duration; - - read_max_us = max(read_max_us, read_us_duration); - send_max_us = max(send_max_us, send_us_duration); - - if (stats_counter == STATS_MODULO) { - cout << "sf_stream:read_us " << read_total_us / STATS_MODULO; - cout << " sf_stream:read_max_us " << read_max_us; - cout << " sf_stream:send_us " << send_total_us / STATS_MODULO; - cout << " sf_stream:send_max_us " << send_max_us; - cout << endl; - - stats_counter = 0; - read_total_us = 0; - read_max_us = 0; - send_total_us = 0; - send_max_us = 0; - } + stats.record_stats(meta); } }