diff --git a/CMakeLists.txt b/CMakeLists.txt index 1fd9a7e..63c25dc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,6 +31,7 @@ add_subdirectory( add_subdirectory("core-buffer") add_subdirectory("jf-udp-recv") add_subdirectory("jf-buffer-writer") +add_subdirectory("jf-assembler") add_subdirectory("sf-stream") add_subdirectory("sf-writer") -#add_subdirectory("jf-live-writer") +add_subdirectory("jf-live-writer") diff --git a/core-buffer/include/BufferUtils.hpp b/core-buffer/include/BufferUtils.hpp index 835e602..e3ffcef 100644 --- a/core-buffer/include/BufferUtils.hpp +++ b/core-buffer/include/BufferUtils.hpp @@ -51,9 +51,14 @@ namespace BufferUtils void create_destination_folder(const std::string& output_file); void* bind_socket( - void* ctx, const std::string& detector_name, const int source_id); + void* ctx, + const std::string& detector_name, + const std::string& stream_name); + void* connect_socket( - void* ctx, const std::string& detector_name, const int source_id); + void* ctx, + const std::string& detector_name, + const std::string& stream_name); DetectorConfig read_json_config(const std::string& filename); } diff --git a/core-buffer/include/RamBuffer.hpp b/core-buffer/include/RamBuffer.hpp index 45b67a1..91872cb 100644 --- a/core-buffer/include/RamBuffer.hpp +++ b/core-buffer/include/RamBuffer.hpp @@ -30,7 +30,9 @@ public: const uint64_t module_id, ModuleFrame &meta, char *data) const; - char* read_image(const uint64_t pulse_id, ImageMetadata &image_meta) const; + char* read_image(const uint64_t pulse_id) const; + void assemble_image( + const uint64_t pulse_id, ImageMetadata &image_meta) const; }; diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index 73c97b5..b2e68ca 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -20,15 +20,17 @@ namespace buffer_config { const size_t FOLDER_MOD = 100000; // Extension of our file format. const std::string FILE_EXTENSION = ".bin"; - // Number of pulses between each statistics print out. - const size_t STATS_MODULO = 100; + // Number of pulses between each statistics print out (buffer_writer, stream2vis...) + const size_t STATS_MODULO = 1000; + // Number of seconds after which statistics is print out (udp_recv) + const size_t STATS_TIME = 10; // If the RB is empty, how much time to wait before trying to read it again. const size_t RB_READ_RETRY_INTERVAL_MS = 5; // How many frames to read at once from file. const size_t BUFFER_BLOCK_SIZE = 100; - const size_t BUFFER_UDP_N_RECV_MSG = 64; + const size_t BUFFER_UDP_N_RECV_MSG = 128; // Size of UDP recv buffer const int BUFFER_UDP_RCVBUF_N_SLOTS = 100; // 8246 bytes for each UDP packet. diff --git a/core-buffer/src/BufferUtils.cpp b/core-buffer/src/BufferUtils.cpp index 9358930..7fd100b 100644 --- a/core-buffer/src/BufferUtils.cpp +++ b/core-buffer/src/BufferUtils.cpp @@ -73,11 +73,11 @@ void BufferUtils::create_destination_folder(const string& output_file) } void* BufferUtils::connect_socket( - void* ctx, const string& detector_name, const int source_id) + void* ctx, const string& detector_name, const string& stream_name) { string ipc_address = BUFFER_LIVE_IPC_URL + detector_name + "-" + - to_string(source_id); + stream_name; void* socket = zmq_socket(ctx, ZMQ_SUB); if (socket == nullptr) { @@ -106,11 +106,11 @@ void* BufferUtils::connect_socket( } void* BufferUtils::bind_socket( - void* ctx, const string& detector_name, const int source_id) + void* ctx, const string& detector_name, const string& stream_name) { string ipc_address = BUFFER_LIVE_IPC_URL + detector_name + "-" + - to_string(source_id); + stream_name; void* socket = zmq_socket(ctx, ZMQ_PUB); diff --git a/core-buffer/src/RamBuffer.cpp b/core-buffer/src/RamBuffer.cpp index 13405f9..e03016e 100644 --- a/core-buffer/src/RamBuffer.cpp +++ b/core-buffer/src/RamBuffer.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include "RamBuffer.hpp" #include "buffer_config.hpp" @@ -118,15 +119,12 @@ void RamBuffer::read_frame( memcpy(dst_data, src_data, MODULE_N_BYTES); } -char* RamBuffer::read_image(const uint64_t pulse_id, - ImageMetadata &image_meta) const +void RamBuffer::assemble_image( + const uint64_t pulse_id, ImageMetadata &image_meta) const { const size_t slot_n = pulse_id % n_slots_; - ModuleFrame *src_meta = meta_buffer_ + (n_modules_ * slot_n); - char *src_data = image_buffer_ + (image_bytes_ * slot_n); - auto is_pulse_init = false; auto is_good_image = true; @@ -153,7 +151,21 @@ char* RamBuffer::read_image(const uint64_t pulse_id, cout << endl; #endif if (frame_meta->pulse_id != pulse_id) { - throw runtime_error("Wrong pulse_id in ram buffer slot."); + stringstream err_msg; + err_msg << "[RamBuffer::read_image]"; + err_msg << " Unexpected pulse_id in ram buffer."; + err_msg << " expected=" << pulse_id; + err_msg << " got=" << frame_meta->pulse_id; + + for (int i = 0; i < n_modules_; i++) { + ModuleFrame *meta = src_meta + i_module; + + err_msg << " (module " << i << ", "; + err_msg << meta->pulse_id << "),"; + } + err_msg << endl; + + throw runtime_error(err_msg.str()); } image_meta.pulse_id = frame_meta->pulse_id; @@ -186,7 +198,12 @@ char* RamBuffer::read_image(const uint64_t pulse_id, image_meta.frame_index = 0; image_meta.daq_rec = 0; } +} + +char* RamBuffer::read_image(const uint64_t pulse_id) const +{ + const size_t slot_n = pulse_id % n_slots_; + char *src_data = image_buffer_ + (image_bytes_ * slot_n); return src_data; } - diff --git a/core-buffer/test/CMakeLists.txt b/core-buffer/test/CMakeLists.txt index bf61f07..aa1173a 100644 --- a/core-buffer/test/CMakeLists.txt +++ b/core-buffer/test/CMakeLists.txt @@ -4,7 +4,5 @@ target_link_libraries(core-buffer-tests core-buffer-lib external rt - hdf5 - hdf5_cpp zmq gtest) diff --git a/core-buffer/test/test_RamBuffer.cpp b/core-buffer/test/test_RamBuffer.cpp index e2975b1..aa7c8f2 100644 --- a/core-buffer/test/test_RamBuffer.cpp +++ b/core-buffer/test/test_RamBuffer.cpp @@ -29,7 +29,7 @@ TEST(RamBuffer, simple_store) } ImageMetadata image_meta; - buffer.read_image(frame_meta.pulse_id, image_meta); + buffer.assemble_image(frame_meta.pulse_id, image_meta); ASSERT_EQ(image_meta.pulse_id, frame_meta.pulse_id); ASSERT_EQ(image_meta.daq_rec, frame_meta.daq_rec); ASSERT_EQ(image_meta.frame_index, frame_meta.frame_index); diff --git a/jf-assembler/CMakeLists.txt b/jf-assembler/CMakeLists.txt new file mode 100644 index 0000000..95e755c --- /dev/null +++ b/jf-assembler/CMakeLists.txt @@ -0,0 +1,21 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(jf-assembler-lib STATIC ${SOURCES}) +target_include_directories(jf-assembler-lib PUBLIC include/) +target_link_libraries(jf-assembler-lib + external + core-buffer-lib) + +add_executable(jf-assembler src/main.cpp) +set_target_properties(jf-assembler PROPERTIES OUTPUT_NAME jf_assembler) +target_link_libraries(jf-assembler + external + core-buffer-lib + jf-assembler-lib + zmq + pthread + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/jf-assembler/README.md b/jf-assembler/README.md new file mode 100644 index 0000000..ce21a4d --- /dev/null +++ b/jf-assembler/README.md @@ -0,0 +1,179 @@ +# sf-stream +sf-stream is the component that receives a live stream of frame data from +sf-buffers over ZMQ and assembles them into images. This images are then +sent again over ZMQ to external components. There is always only 1 sf-stream +per detector. + +It currently has 3 output streams: + +- **Full data full meta** rate stream (send all images and meta) +- **Reduced data full meta** rate stream (send less images, but +all meta) +- **Pulse_id** stream (send only the current pulse_id) + +In addition to receiving and assembling images, sf-stream also calculates +additional meta and constructs the structures needed to send data in +Array 1.0 protocol. + +This component does not guarantee that the streams will always contain all +the data - it can happen that frame resynchronization is needed, and in this +case 1 or more frames could potentially be lost. This happens so rarely that in +practice is not a problem. + +## Overview + +![image_stream_overview](../docs/sf_daq_buffer-overview-stream.jpg) + +sf-stream is a single threaded application (without counting the ZMQ IO threads) +that is used for providing live assembled images to anyone willing to listen. + +In addition, it also provides a pulse_id stream, which is the most immediate +pulse_id feedback we currently have in case we need to synchronize external +components to the current machine pulse_id. + +## ZMQ receiving +Each ZMQ stream is coming from a separate sf-buffer. This means that we have as +many connections as we have modules in a detector. + +Messages are multipart (2 parts) and are received in PUB/SUB mode. + +There is no need for special synchronization between modules as we expect that +frames will always be in the correct order and all modules will provide the +same frame more or less at the same time. If any of this 2 conditions is not +met, the detector is not working properly and we cannot guaranty that sf-stream +will work correctly. + +Nonetheless we provide the capability to synchronize the streams in image +assembly phase - this is needed rarely, but occasionally happens. In this sort +of hiccups we usually loose only a couple of consecutive images. + +### Messages format +Each message is composed by 2 parts: + +- Serialization of ModuleFrame in the first part. +- Frame data in the second part. + +Module frame is defined as: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) +``` + +The frame data is a 1MB (1024*512 pixels * 2 bytes/pixel) blob of data in +**uint16** representing the detector image. + +## Image assembly +We first synchronize the modules. We do this by reading all sockets and +deciding the largest frame pulse_id among them (max_pulse_id). We then calculate +the diff between a specific socket pulse_id and the max_pulse_id. +This difference tells us how many messages we need to discard from a specific socket. + +This discarding is the source of possible missing images in the output stream. +It can happen in 3 cases: + +- At least one of the detector modules did not sent any packets for the specific +pulse_id. +- All the packets from a specific module for a pulse_id were lost before UDP +receiving them. +- ZMQ HWM was reached (either on the sf-buffer or sf-stream) and the message was +dropped. + +All this 3 cases are highly unlikely, so synchronization is mostly needed when +first starting sf-stream. Different sockets connect to sf-buffers at different +times. Apart from the initial synchronization there should be no need to +re-synchronize modules in a healthy running environment. + +If an image is missing any ZMQ messages from sf-buffers (not all modules data +arrived), the image will be dropped. We do not do partial reconstruction in +sf-stream. However, it is important to note, that this does not cover the case +where frames are incomplete (missing UDP packets on sf-buffer) - we still +assemble this images as long as at least 1 packet/frame for a specific pulse_id +arrived. + +## ZMQ sending + +We devide the ZMQ sending to 3 types of stream: + +- Data processing stream. This is basically the complete stream from +the detector with all meta and data. It can be described as full data full +meta stream. Only 1 client at the time can be connected to this stream +(PUSH/PULL for load balancing). + +- Live viewing stream. This is a reduced data full meta stream. We send +meta for all frames, but data only for subset of them (10Hz, for example). +Any number of clients can connect to the 10Hz stream, because we use PUB/SUB +for this socket. + +- Pulse_id stream. This is a stream that sends out only the current pulse_id. +It can be used to synchronize any external system with the current pulse_id +being recorded. Multiple clients can connect to this stream. + +In the data processing and live viewing stream we use +[Array 1.0](https://github.com/paulscherrerinstitute/htypes/blob/master/array-1.0.md) +as our protocol to be compatible with currently available external components. + +We use following fields in the JSON header: + +| Name | Type | Comment | +| --- | --- | --- | +| pulse_id | uint64 |bunchid from detector header| +|frame|uint64|frame_index from detector header| +|is_good_frame|bool|true if all packets for this frame are present| +|daq_rec|uint32|daqrec from detector header| +|pedestal_file|string|Path to pedestal file| +|gain_file|string|Path to gain file| +|number_frames_expected|int|Number of expected frames| +|run_name|string|Name of the run| +|detector_name|string|Name of the detector| +|htype|string|Value: "array-1.0"| +|type|string|Value: "uint16"| +|shape|Array[uint64]|Shape of the image in stream| + +### Full data full meta stream + +This stream runs at detector frequency and uses PUSH/PULL to distribute data +to max 1 client (this client can have many processes, but it needs to be a +single logical entity, since the images are evenly distributed to all +connected sockets). + +![image_full_stream](../docs/sf_daq_buffer-FullStream.jpg) + +The goal here is to provide a complete copy of the detector image stream +for purposes of online analysis. Given the large amount of data on this +stream only "pre-approved" applications that can handle the load should be +attached here. + +### Reduced data full meta stream + +This streams also runs at detector frequency for JSON headers (meta), but +it sends only part of the images in the stream. The rest of the images are +sent as empty buffers (the receiver needs to be aware of this behaviour, as +Array 1.0 alone does not define it). + +![image_reduced_stream](../docs/sf_daq_buffer-ReducedStream.jpg) + +This is the lightweight version of the image stream. Any number of clients +can connect to this stream (PUB/SUB) but no client can do load +balancing automatically (it would require PUSH/PULL). + +This is a "public interface" for anyone who wants to get detector data live, +and can do with only a subset of images. + +### Pulse_id stream + +This stream runs ar detector frequency in PUB/SUB mode. The only thing it +does is sends out the pulse_id (of the just received image) in uint64_t +format. + +![image_pulse_stream](../docs/sf_daq_buffer-PulseStream.jpg) + +This is also a "public interface" for anyone who wants to get the current +system pulse_id. \ No newline at end of file diff --git a/jf-assembler/include/AssemblerStats.hpp b/jf-assembler/include/AssemblerStats.hpp new file mode 100644 index 0000000..a8267a6 --- /dev/null +++ b/jf-assembler/include/AssemblerStats.hpp @@ -0,0 +1,28 @@ +#ifndef SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP +#define SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP + +#include +#include +#include + +class AssemblerStats { + const std::string detector_name_; + const size_t stats_modulo_; + + int image_counter_; + int n_corrupted_images_; + int n_sync_lost_images_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + AssemblerStats(const std::string &detector_name, + const size_t stats_modulo); + + void record_stats(const ImageMetadata &meta, const uint32_t n_lost_pulses); +}; + + +#endif //SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP diff --git a/jf-assembler/include/ZmqPulseSyncReceiver.hpp b/jf-assembler/include/ZmqPulseSyncReceiver.hpp new file mode 100644 index 0000000..624de34 --- /dev/null +++ b/jf-assembler/include/ZmqPulseSyncReceiver.hpp @@ -0,0 +1,34 @@ +#ifndef SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP +#define SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP + + +#include +#include +#include + +#include "formats.hpp" + +struct PulseAndSync { + const uint64_t pulse_id; + const uint32_t n_lost_pulses; +}; + +class ZmqPulseSyncReceiver { + + void* ctx_; + const int n_modules_; + + std::vector sockets_; + +public: + ZmqPulseSyncReceiver( + void* ctx, + const std::string& detector_name, + const int n_modules); + ~ZmqPulseSyncReceiver(); + + PulseAndSync get_next_pulse_id() const; +}; + + +#endif //SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP diff --git a/jf-assembler/include/assembler_config.hpp b/jf-assembler/include/assembler_config.hpp new file mode 100644 index 0000000..b0e277d --- /dev/null +++ b/jf-assembler/include/assembler_config.hpp @@ -0,0 +1,14 @@ +namespace assembler_config +{ + // N of IO threads to send image metadata. + const int ASSEMBLER_ZMQ_IO_THREADS = 1; + + // If the modules are offset more than 1000 pulses, crush. + const uint64_t PULSE_OFFSET_LIMIT = 100; + + // Number of times we try to re-sync in case of failure. + const int SYNC_RETRY_LIMIT = 3; + + // Number of pulses between each statistics print out. + const size_t ASSEMBLER_STATS_MODULO = 1000; +} diff --git a/jf-assembler/src/AssemblerStats.cpp b/jf-assembler/src/AssemblerStats.cpp new file mode 100644 index 0000000..e295da6 --- /dev/null +++ b/jf-assembler/src/AssemblerStats.cpp @@ -0,0 +1,62 @@ +#include "AssemblerStats.hpp" + +#include + +using namespace std; +using namespace chrono; + +AssemblerStats::AssemblerStats( + const std::string &detector_name, + const size_t stats_modulo) : + detector_name_(detector_name), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void AssemblerStats::reset_counters() +{ + image_counter_ = 0; + n_sync_lost_images_ = 0; + n_corrupted_images_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void AssemblerStats::record_stats( + const ImageMetadata &meta, const uint32_t n_lost_pulses) +{ + image_counter_++; + n_sync_lost_images_ += n_lost_pulses; + + if (!meta.is_good_image) { + n_corrupted_images_++; + } + + if (image_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void AssemblerStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_assembler"; + cout << ",detector_name=" << detector_name_; + cout << " "; + cout << "n_processed_images=" << image_counter_ << "i"; + cout << ",n_corrupted_images=" << n_corrupted_images_ << "i"; + cout << ",n_sync_lost_images=" << n_sync_lost_images_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} + diff --git a/jf-assembler/src/ZmqPulseSyncReceiver.cpp b/jf-assembler/src/ZmqPulseSyncReceiver.cpp new file mode 100644 index 0000000..6dbe2fc --- /dev/null +++ b/jf-assembler/src/ZmqPulseSyncReceiver.cpp @@ -0,0 +1,115 @@ +#include "ZmqPulseSyncReceiver.hpp" +#include "BufferUtils.hpp" + +#include +#include +#include +#include +#include +#include + +#include "assembler_config.hpp" + +using namespace std; +using namespace chrono; +using namespace buffer_config; +using namespace assembler_config; + + +ZmqPulseSyncReceiver::ZmqPulseSyncReceiver( + void * ctx, + const string& detector_name, + const int n_modules) : + ctx_(ctx), + n_modules_(n_modules) +{ + sockets_.reserve(n_modules_); + + for (int i=0; i::max();; + uint64_t max_pulse_id = 0; + + for (int i = 0; i < n_modules_; i++) { + min_pulse_id = min(min_pulse_id, pulses[i]); + max_pulse_id = max(max_pulse_id, pulses[i]); + } + + auto max_diff = max_pulse_id - min_pulse_id; + if (max_diff > PULSE_OFFSET_LIMIT) { + stringstream err_msg; + err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; + err_msg << " PULSE_OFFSET_LIMIT exceeded."; + err_msg << " max_diff=" << max_diff << " pulses."; + + for (int i = 0; i < n_modules_; i++) { + err_msg << " (module " << i << ", "; + err_msg << pulses[i] << "),"; + } + err_msg << endl; + + throw runtime_error(err_msg.str()); + } + + modules_in_sync = true; + // Max pulses we lost in this sync attempt. + uint32_t i_sync_lost_pulses = 0; + for (int i = 0; i < n_modules_; i++) { + // How many pulses we lost for this specific module. + uint32_t i_module_lost_pulses = 0; + while (pulses[i] < max_pulse_id) { + zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); + i_module_lost_pulses++; + } + + i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); + + if (pulses[i] != max_pulse_id) { + modules_in_sync = false; + } + } + n_lost_pulses += i_sync_lost_pulses; + + if (modules_in_sync) { + return {pulses[0], n_lost_pulses}; + } + } + + stringstream err_msg; + err_msg << "[ZmqLiveReceiver::get_next_pulse_id]"; + err_msg << " SYNC_RETRY_LIMIT exceeded."; + err_msg << endl; + + throw runtime_error(err_msg.str()); +} diff --git a/jf-assembler/src/main.cpp b/jf-assembler/src/main.cpp new file mode 100644 index 0000000..e1b76ab --- /dev/null +++ b/jf-assembler/src/main.cpp @@ -0,0 +1,47 @@ +#include +#include +#include +#include +#include +#include + +#include "assembler_config.hpp" +#include "ZmqPulseSyncReceiver.hpp" + +using namespace std; +using namespace buffer_config; +using namespace assembler_config; + +int main (int argc, char *argv[]) +{ + if (argc != 2) { + cout << endl; + cout << "Usage: jf_assembler [detector_json_filename]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << endl; + + exit(-1); + } + + auto config = BufferUtils::read_json_config(string(argv[1])); + auto const stream_name = "assembler"; + + auto ctx = zmq_ctx_new(); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, ASSEMBLER_ZMQ_IO_THREADS); + auto sender = BufferUtils::bind_socket( + ctx, config.detector_name, stream_name); + + ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); + RamBuffer ram_buffer(config.detector_name, config.n_modules); + AssemblerStats stats(config.detector_name, ASSEMBLER_STATS_MODULO); + + ImageMetadata meta; + while (true) { + auto pulse_and_sync = receiver.get_next_pulse_id(); + ram_buffer.assemble_image(pulse_and_sync.pulse_id, meta); + + zmq_send(sender, &meta, sizeof(meta), 0); + + stats.record_stats(meta, pulse_and_sync.n_lost_pulses); + } +} diff --git a/jf-assembler/test/CMakeLists.txt b/jf-assembler/test/CMakeLists.txt new file mode 100644 index 0000000..bb240e7 --- /dev/null +++ b/jf-assembler/test/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(jf-assembler-tests main.cpp) + +target_link_libraries(jf-assembler-tests + jf-assembler-lib + gtest + ) + diff --git a/jf-assembler/test/main.cpp b/jf-assembler/test/main.cpp new file mode 100644 index 0000000..e819294 --- /dev/null +++ b/jf-assembler/test/main.cpp @@ -0,0 +1,8 @@ +#include "gtest/gtest.h" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/jf-buffer-writer/src/BufferStats.cpp b/jf-buffer-writer/src/BufferStats.cpp index 325e930..173a35c 100644 --- a/jf-buffer-writer/src/BufferStats.cpp +++ b/jf-buffer-writer/src/BufferStats.cpp @@ -51,7 +51,7 @@ void BufferStats::print_stats() system_clock::now()).time_since_epoch().count(); // Output in InfluxDB line protocol - cout << "jf-buffer-writer"; + cout << "jf_buffer_writer"; cout << ",detector_name=" << detector_name_; cout << ",module_name=M" << module_id_; cout << " "; diff --git a/jf-buffer-writer/src/main.cpp b/jf-buffer-writer/src/main.cpp index e1eb961..cd65986 100644 --- a/jf-buffer-writer/src/main.cpp +++ b/jf-buffer-writer/src/main.cpp @@ -39,7 +39,8 @@ int main (int argc, char *argv[]) { BufferStats stats(config.detector_name, module_id, STATS_MODULO); auto ctx = zmq_ctx_new(); - auto socket = connect_socket(ctx, config.detector_name, module_id); + auto socket = connect_socket( + ctx, config.detector_name, to_string(module_id)); auto file_buff = new BufferBinaryFormat(); uint64_t pulse_id; diff --git a/jf-live-writer/include/BinaryReader.hpp b/jf-live-writer/include/BinaryReader.hpp deleted file mode 100644 index 85d2a0c..0000000 --- a/jf-live-writer/include/BinaryReader.hpp +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef SF_DAQ_BUFFER_BINARYREADER_HPP -#define SF_DAQ_BUFFER_BINARYREADER_HPP - - -#include - -class BinaryReader { - - const std::string detector_folder_; - const std::string module_name_; - - std::string current_input_file_; - int input_file_fd_; - - void open_file(const std::string& filename); - void close_current_file(); - -public: - BinaryReader(const std::string &detector_folder, - const std::string &module_name); - - ~BinaryReader(); - - void get_frame(const uint64_t pulse_id, BufferBinaryFormat *buffer); -}; - - -#endif //SF_DAQ_BUFFER_BINARYREADER_HPP diff --git a/jf-live-writer/include/ImageBinaryWriter.hpp b/jf-live-writer/include/ImageBinaryWriter.hpp new file mode 100644 index 0000000..8e6ebfb --- /dev/null +++ b/jf-live-writer/include/ImageBinaryWriter.hpp @@ -0,0 +1,33 @@ +#ifndef BINARYWRITER_HPP +#define BINARYWRITER_HPP + +#include + +#include "formats.hpp" + +class ImageBinaryWriter { + + const size_t MAX_FILE_BYTES = + buffer_config::FILE_MOD * sizeof(BufferBinaryFormat); + + const std::string detector_folder_; + std::string latest_filename_; + + std::string current_output_filename_; + int output_file_fd_; + + void open_file(const std::string& filename); + void close_current_file(); + + +public: + ImageBinaryWriter(const std::string& detector_folder); + + virtual ~ImageBinaryWriter(); + + void write(const uint64_t pulse_id, const BufferBinaryFormat* buffer); + +}; + + +#endif //BINARYWRITER_HPP diff --git a/jf-live-writer/include/JFH5LiveWriter.hpp b/jf-live-writer/include/JFH5LiveWriter.hpp deleted file mode 100644 index a417631..0000000 --- a/jf-live-writer/include/JFH5LiveWriter.hpp +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef SFWRITER_HPP -#define SFWRITER_HPP - -#include -#include -#include - -#include "LiveImageAssembler.hpp" - -const auto& H5_UINT64 = H5::PredType::NATIVE_UINT64; -const auto& H5_UINT32 = H5::PredType::NATIVE_UINT32; -const auto& H5_UINT16 = H5::PredType::NATIVE_UINT16; -const auto& H5_UINT8 = H5::PredType::NATIVE_UINT8; - -class JFH5LiveWriter { - - const std::string detector_name_; - const size_t n_modules_; - const size_t n_pulses_; - - size_t write_index_; - - H5::H5File file_; - H5::DataSet image_dataset_; - - uint64_t* b_pulse_id_; - uint64_t* b_frame_index_; - uint32_t* b_daq_rec_; - uint8_t* b_is_good_frame_ ; - - void init_file(const std::string &output_file); - void write_dataset(const std::string name, - const void *buffer, - const H5::PredType &type); - void write_metadata(); - std::string get_detector_name(const std::string& detector_folder); - - void close_file(); - -public: - JFH5LiveWriter(const std::string& output_file, - const std::string& detector_folder, - const size_t n_modules, - const size_t n_pulses); - ~JFH5LiveWriter(); - void write(const ImageMetadata* metadata, const char* data); -}; - -#endif //SFWRITER_HPP diff --git a/jf-live-writer/include/LiveImageAssembler.hpp b/jf-live-writer/include/LiveImageAssembler.hpp deleted file mode 100644 index 5bcb749..0000000 --- a/jf-live-writer/include/LiveImageAssembler.hpp +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP -#define SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP - -#include - -#include "buffer_config.hpp" -#include "formats.hpp" - -const uint64_t IA_EMPTY_SLOT_VALUE = 0; - -struct ImageMetadata -{ - uint64_t pulse_id; - uint64_t frame_index; - uint32_t daq_rec; - uint8_t is_good_image; -}; - -class LiveImageAssembler { - const size_t n_modules_; - const size_t image_buffer_slot_n_bytes_; - - char* image_buffer_; - ImageMetadata* image_meta_buffer_; - ModuleFrame* frame_meta_buffer_; - std::atomic_int* buffer_status_; - std::atomic_uint64_t* buffer_pulse_id_; - - size_t get_data_offset(const uint64_t slot_id, const int i_module); - size_t get_frame_metadata_offset(const uint64_t slot_id, const int i_module); - -public: - LiveImageAssembler(const size_t n_modules); - - virtual ~LiveImageAssembler(); - - bool is_slot_free(const uint64_t pulse_id); - bool is_slot_full(const uint64_t pulse_id); - - void process(const uint64_t pulse_id, - const int i_module, - const BufferBinaryFormat* block_buffer); - - void free_slot(const uint64_t pulse_id); - - ImageMetadata* get_metadata_buffer(const uint64_t pulse_id); - char* get_data_buffer(const uint64_t pulse_id); -}; - - -#endif //SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP diff --git a/jf-live-writer/include/WriterStats.hpp b/jf-live-writer/include/WriterStats.hpp new file mode 100644 index 0000000..cb023a7 --- /dev/null +++ b/jf-live-writer/include/WriterStats.hpp @@ -0,0 +1,30 @@ +#include +#include +#include + +#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_FRAMESTATS_HPP + + +class WriterStats { + const std::string detector_name_; + size_t stats_modulo_; + + int image_counter_; + uint32_t total_buffer_write_us_; + uint32_t max_buffer_write_us_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + WriterStats( + const std::string &detector_name, + const size_t stats_modulo); + void start_image_write(); + void end_image_write(); +}; + + +#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jf-live-writer/include/live_writer_config.hpp b/jf-live-writer/include/live_writer_config.hpp index 0a62457..76d9b05 100644 --- a/jf-live-writer/include/live_writer_config.hpp +++ b/jf-live-writer/include/live_writer_config.hpp @@ -2,8 +2,6 @@ namespace live_writer_config { - // MS to retry reading from the image assembler. - const size_t ASSEMBLER_RETRY_MS = 5; - // Number of slots in the reconstruction buffer. - const size_t WRITER_IA_N_SLOTS = 200; + // N of IO threads to receive data from modules. + const int LIVE_ZMQ_IO_THREADS = 1; } \ No newline at end of file diff --git a/jf-live-writer/src/BinaryReader.cpp b/jf-live-writer/src/BinaryReader.cpp deleted file mode 100644 index 07f8149..0000000 --- a/jf-live-writer/src/BinaryReader.cpp +++ /dev/null @@ -1,103 +0,0 @@ -#include "BinaryReader.hpp" - -#include -#include -#include -#include -#include - -#include "BufferUtils.hpp" -#include "buffer_config.hpp" - -using namespace std; -using namespace buffer_config; - -BinaryReader::BinaryReader( - const std::string &detector_folder, - const std::string &module_name) : - detector_folder_(detector_folder), - module_name_(module_name), - current_input_file_(""), - input_file_fd_(-1) -{} - -BinaryReader::~BinaryReader() -{ - close_current_file(); -} - -void BinaryReader::get_frame( - const uint64_t pulse_id, BufferBinaryFormat* buffer) -{ - - auto current_frame_file = BufferUtils::get_filename( - detector_folder_, module_name_, pulse_id); - - if (current_frame_file != current_input_file_) { - open_file(current_frame_file); - } - - size_t file_index = BufferUtils::get_file_frame_index(pulse_id); - size_t n_bytes_offset = file_index * sizeof(BufferBinaryFormat); - - auto lseek_result = lseek(input_file_fd_, n_bytes_offset, SEEK_SET); - if (lseek_result < 0) { - stringstream err_msg; - - err_msg << "[BinaryReader::get_frame]"; - err_msg << " Error while lseek on file "; - err_msg << current_input_file_ << " for n_bytes_offset "; - err_msg << n_bytes_offset << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - auto n_bytes = ::read(input_file_fd_, buffer, sizeof(BufferBinaryFormat)); - - if (n_bytes < sizeof(BufferBinaryFormat)) { - stringstream err_msg; - - err_msg << "[BinaryReader::get_block]"; - err_msg << " Error while reading from file "; - err_msg << current_input_file_ << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } -} - -void BinaryReader::open_file(const std::string& filename) -{ - close_current_file(); - - input_file_fd_ = open(filename.c_str(), O_RDONLY); - - if (input_file_fd_ < 0) { - stringstream err_msg; - - err_msg << "[BinaryReader::open_file]"; - err_msg << " Cannot open file " << filename << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - current_input_file_ = filename; -} - -void BinaryReader::close_current_file() -{ - if (input_file_fd_ != -1) { - if (close(input_file_fd_) < 0) { - stringstream err_msg; - - err_msg << "[BinaryWriter::close_current_file]"; - err_msg << " Error while closing file " << current_input_file_; - err_msg << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - input_file_fd_ = -1; - current_input_file_ = ""; - } -} diff --git a/jf-live-writer/src/ImageBinaryWriter.cpp b/jf-live-writer/src/ImageBinaryWriter.cpp new file mode 100644 index 0000000..c3f70f5 --- /dev/null +++ b/jf-live-writer/src/ImageBinaryWriter.cpp @@ -0,0 +1,165 @@ +#include "ImageBinaryWriter.hpp" + +#include +#include +#include "date.h" +#include +#include +#include +#include + +#include "BufferUtils.hpp" + +using namespace std; + +ImageBinaryWriter::ImageBinaryWriter( + const string& detector_folder): + detector_folder_(detector_folder), + latest_filename_(detector_folder + "/LATEST"), + current_output_filename_(""), + output_file_fd_(-1) +{ +} + +ImageBinaryWriter::~ImageBinaryWriter() +{ + close_current_file(); +} + +void ImageBinaryWriter::write( + const uint64_t pulse_id, + const BufferBinaryFormat* buffer) +{ + auto current_frame_file = + BufferUtils::get_filename(detector_folder_, module_name_, pulse_id); + + if (current_frame_file != current_output_filename_) { + open_file(current_frame_file); + } + + size_t n_bytes_offset = + BufferUtils::get_file_frame_index(pulse_id) * + sizeof(BufferBinaryFormat); + + auto lseek_result = lseek(output_file_fd_, n_bytes_offset, SEEK_SET); + if (lseek_result < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryWriter::write]"; + err_msg << " Error while lseek on file "; + err_msg << current_output_filename_; + err_msg << " for n_bytes_offset "; + err_msg << n_bytes_offset << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + auto n_bytes = ::write(output_file_fd_, buffer, sizeof(BufferBinaryFormat)); + if (n_bytes < sizeof(BufferBinaryFormat)) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryWriter::write]"; + err_msg << " Error while writing to file "; + err_msg << current_output_filename_ << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } +} + +void ImageBinaryWriter::open_file(const std::string& filename) +{ + close_current_file(); + + BufferUtils::create_destination_folder(filename); + + output_file_fd_ = ::open(filename.c_str(), O_WRONLY | O_CREAT, + S_IRWXU | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); + if (output_file_fd_ < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BinaryWriter::open_file]"; + err_msg << " Cannot create file "; + err_msg << filename << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + // TODO: Remove context if test successful. + + /** Setting the buffer file size in advance to try to lower the number of + metadata updates on GPFS. */ + { + // TODO: Try instead to use fallocate. + if (lseek(output_file_fd_, MAX_FILE_BYTES, SEEK_SET) < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryWriter::open_file]"; + err_msg << " Error while lseek on end of file "; + err_msg << current_output_filename_; + err_msg << " for MAX_FILE_BYTES "; + err_msg << MAX_FILE_BYTES << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + const uint8_t mark = 255; + if(::write(output_file_fd_, &mark, sizeof(mark)) != sizeof(mark)) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryWriter::open_file]"; + err_msg << " Error while writing to file "; + err_msg << current_output_filename_ << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + } + + + current_output_filename_ = filename; +} + +void ImageBinaryWriter::close_current_file() +{ + if (output_file_fd_ != -1) { + if (close(output_file_fd_) < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryWriter::close_current_file]"; + err_msg << " Error while closing file "; + err_msg << current_output_filename_ << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + output_file_fd_ = -1; + + BufferUtils::update_latest_file( + latest_filename_, current_output_filename_); + + current_output_filename_ = ""; + } +} \ No newline at end of file diff --git a/jf-live-writer/src/JFH5LiveWriter.cpp b/jf-live-writer/src/JFH5LiveWriter.cpp deleted file mode 100644 index 332fc6c..0000000 --- a/jf-live-writer/src/JFH5LiveWriter.cpp +++ /dev/null @@ -1,133 +0,0 @@ -#include "JFH5LiveWriter.hpp" - -#include -#include - - -#include "buffer_config.hpp" - -using namespace std; -using namespace buffer_config; - -JFH5LiveWriter::JFH5LiveWriter(const string& output_file, - const string& detector_folder, - const size_t n_modules, - const size_t n_pulses) : - detector_name_(get_detector_name(detector_folder)), - n_modules_(n_modules), - n_pulses_(n_pulses), - write_index_(0) -{ - b_pulse_id_ = new uint64_t[n_pulses_]; - b_frame_index_= new uint64_t[n_pulses_]; - b_daq_rec_ = new uint32_t[n_pulses_]; - b_is_good_frame_ = new uint8_t[n_pulses_]; - - init_file(output_file); -} - -void JFH5LiveWriter::init_file(const string& output_file) -{ - file_ = H5::H5File(output_file, H5F_ACC_TRUNC); - file_.createGroup("/data"); - file_.createGroup("/data/" + detector_name_); - - H5::DataSpace att_space(H5S_SCALAR); - H5::DataType data_type = H5::StrType(0, H5T_VARIABLE); - - file_.createGroup("/general"); - auto detector_dataset = file_.createDataSet( - "/general/detector_name", data_type ,att_space); - - detector_dataset.write(detector_name_, data_type); - - hsize_t image_dataset_dims[3] = - {n_pulses_, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE}; - - H5::DataSpace image_dataspace(3, image_dataset_dims); - - hsize_t image_dataset_chunking[3] = - {1, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DSetCreatPropList image_dataset_properties; - image_dataset_properties.setChunk(3, image_dataset_chunking); - - image_dataset_ = file_.createDataSet( - "/data/" + detector_name_ + "/data", - H5::PredType::NATIVE_UINT16, - image_dataspace, - image_dataset_properties); -} - - -std::string JFH5LiveWriter::get_detector_name(const string& detector_folder) -{ - size_t last_separator; - if ((last_separator = detector_folder.rfind("/")) == string::npos) { - return detector_folder; - } - - return detector_folder.substr(last_separator + 1); -} - -JFH5LiveWriter::~JFH5LiveWriter() -{ - close_file(); - - delete[] b_pulse_id_; - delete[] b_frame_index_; - delete[] b_daq_rec_; - delete[] b_is_good_frame_; -} - -void JFH5LiveWriter::write_dataset( - const string name, const void* buffer, const H5::PredType& type) -{ - hsize_t b_m_dims[] = {n_pulses_}; - H5::DataSpace b_m_space (1, b_m_dims); - - hsize_t f_m_dims[] = {n_pulses_, 1}; - H5::DataSpace f_m_space(2, f_m_dims); - - auto complete_name = "/data/" + detector_name_ + "/" + name; - auto dataset = file_.createDataSet(complete_name, type, f_m_space); - - dataset.write(buffer, type, b_m_space, f_m_space); - - dataset.close(); -} - -void JFH5LiveWriter::write_metadata() -{ - write_dataset("pulse_id", &b_pulse_id_, H5::PredType::NATIVE_UINT64); - write_dataset("frame_index", &b_frame_index_, H5::PredType::NATIVE_UINT64); - write_dataset("daq_rec", &b_daq_rec_, H5::PredType::NATIVE_UINT32); - write_dataset("is_good_frame", &b_is_good_frame_, H5::PredType::NATIVE_UINT8); -} - -void JFH5LiveWriter::close_file() -{ - if (file_.getId() == -1) { - return; - } - - image_dataset_.close(); - - write_metadata(); - - file_.close(); -} - -void JFH5LiveWriter::write(const ImageMetadata* metadata, const char* data) -{ - hsize_t offset[] = {write_index_, 0, 0}; - - H5DOwrite_chunk(image_dataset_.getId(), H5P_DEFAULT, 0, - offset, MODULE_N_BYTES * n_modules_, data); - - b_pulse_id_[write_index_] = metadata->pulse_id; - b_frame_index_[write_index_] = metadata->frame_index; - b_daq_rec_[write_index_] = metadata->daq_rec; - b_is_good_frame_[write_index_] = metadata->is_good_image; - - write_index_++; -} diff --git a/jf-live-writer/src/LiveImageAssembler.cpp b/jf-live-writer/src/LiveImageAssembler.cpp deleted file mode 100644 index 57cf48b..0000000 --- a/jf-live-writer/src/LiveImageAssembler.cpp +++ /dev/null @@ -1,159 +0,0 @@ -#include - -#include "LiveImageAssembler.hpp" -#include "buffer_config.hpp" -#include "live_writer_config.hpp" - -using namespace std; -using namespace buffer_config; -using namespace live_writer_config; - -LiveImageAssembler::LiveImageAssembler(const size_t n_modules) : - n_modules_(n_modules), - image_buffer_slot_n_bytes_(MODULE_N_BYTES * n_modules_) -{ - image_buffer_ = new char[WRITER_IA_N_SLOTS * image_buffer_slot_n_bytes_]; - image_meta_buffer_ = new ImageMetadata[WRITER_IA_N_SLOTS]; - frame_meta_buffer_ = new ModuleFrame[WRITER_IA_N_SLOTS * n_modules]; - buffer_status_ = new atomic_int[WRITER_IA_N_SLOTS]; - buffer_pulse_id_ = new atomic_uint64_t[WRITER_IA_N_SLOTS]; - - for (size_t i=0; i < WRITER_IA_N_SLOTS; i++) { - free_slot(i); - } -} - -LiveImageAssembler::~LiveImageAssembler() -{ - delete[] image_buffer_; - delete[] image_meta_buffer_; -} - -bool LiveImageAssembler::is_slot_free(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - - uint64_t slot_pulse_id = IA_EMPTY_SLOT_VALUE; - if (buffer_pulse_id_[slot_id].compare_exchange_strong( - slot_pulse_id, pulse_id)) { - return true; - } - - auto is_free = buffer_status_[slot_id].load(memory_order_relaxed) > 0; - return is_free && (slot_pulse_id == pulse_id); -} - -bool LiveImageAssembler::is_slot_full(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - return buffer_status_[slot_id].load(memory_order_relaxed) == 0; -} - -size_t LiveImageAssembler::get_data_offset( - const uint64_t slot_id, const int i_module) -{ - size_t slot_i_offset = slot_id * image_buffer_slot_n_bytes_; - size_t module_i_offset = i_module * MODULE_N_BYTES; - - return slot_i_offset + module_i_offset; -} - -size_t LiveImageAssembler::get_frame_metadata_offset( - const uint64_t slot_id, const int i_module) -{ - size_t slot_m_offset = slot_id * n_modules_; - size_t module_m_offset = i_module; - - return slot_m_offset + module_m_offset; -} - -void LiveImageAssembler::process( - const uint64_t pulse_id, - const int i_module, - const BufferBinaryFormat* file_buffer) -{ - const auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - - auto frame_meta_offset = get_frame_metadata_offset(slot_id, i_module); - auto image_offset = get_data_offset(slot_id, i_module); - - memcpy( - &(frame_meta_buffer_[frame_meta_offset]), - &(file_buffer->metadata), - sizeof(file_buffer->metadata)); - - memcpy( - image_buffer_ + image_offset, - &(file_buffer->data[0]), - MODULE_N_BYTES); - - buffer_status_[slot_id].fetch_sub(1, memory_order_relaxed); -} - -void LiveImageAssembler::free_slot(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - buffer_status_[slot_id].store(n_modules_, memory_order_relaxed); - buffer_pulse_id_[slot_id].store(IA_EMPTY_SLOT_VALUE, memory_order_relaxed); -} - -ImageMetadata* LiveImageAssembler::get_metadata_buffer(const uint64_t pulse_id) -{ - const auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - - ImageMetadata& image_meta = image_meta_buffer_[slot_id]; - - auto frame_meta_offset = get_frame_metadata_offset(slot_id, 0); - - auto is_pulse_init = false; - image_meta.is_good_image = 1; - image_meta.pulse_id = 0; - - for (size_t i_module=0; i_module < n_modules_; i_module++) { - - auto& frame_meta = frame_meta_buffer_[frame_meta_offset]; - frame_meta_offset += 1; - - auto is_good_frame = - frame_meta.n_recv_packets == JF_N_PACKETS_PER_FRAME; - - if (!is_good_frame) { - image_meta.pulse_id = 0; - continue; - } - - if (!is_pulse_init) { - image_meta.pulse_id = frame_meta.pulse_id; - image_meta.frame_index = frame_meta.frame_index; - image_meta.daq_rec = frame_meta.daq_rec; - - is_pulse_init = true; - } - - if (image_meta.is_good_image == 1) { - if (frame_meta.pulse_id != image_meta.pulse_id) { - image_meta.is_good_image = 0; - } - - if (frame_meta.frame_index != image_meta.frame_index) { - image_meta.is_good_image = 0; - } - - if (frame_meta.daq_rec != image_meta.daq_rec) { - image_meta.is_good_image = 0; - } - - if (frame_meta.n_recv_packets != JF_N_PACKETS_PER_FRAME) { - image_meta.is_good_image = 0; - } - } - } - - return &image_meta; -} - -char* LiveImageAssembler::get_data_buffer(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - return image_buffer_ + (slot_id * image_buffer_slot_n_bytes_); -} diff --git a/jf-live-writer/src/WriterStats.cpp b/jf-live-writer/src/WriterStats.cpp new file mode 100644 index 0000000..1d67947 --- /dev/null +++ b/jf-live-writer/src/WriterStats.cpp @@ -0,0 +1,61 @@ +#include +#include "WriterStats.hpp" + +using namespace std; +using namespace chrono; + +WriterStats::WriterStats( + const string& detector_name, + const size_t stats_modulo) : + detector_name_(detector_name), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void WriterStats::reset_counters() +{ + image_counter_ = 0; + total_buffer_write_us_ = 0; + max_buffer_write_us_ = 0; +} + +void WriterStats::start_image_write() +{ + stats_interval_start_ = steady_clock::now(); +} + +void WriterStats::end_image_write() +{ + image_counter_++; + + uint32_t write_us_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + total_buffer_write_us_ += write_us_duration; + max_buffer_write_us_ = max(max_buffer_write_us_, write_us_duration); + + if (image_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void WriterStats::print_stats() +{ + float avg_buffer_write_us = total_buffer_write_us_ / image_counter_; + + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_buffer_writer"; + cout << ",detector_name=" << detector_name_; + cout << " "; + cout << "n_written_images=" << image_counter_ << "i"; + cout << " ,avg_buffer_write_us=" << avg_buffer_write_us; + cout << ",max_buffer_write_us=" << max_buffer_write_us_ << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} diff --git a/jf-live-writer/src/main.cpp b/jf-live-writer/src/main.cpp index 139a34f..1b912f8 100644 --- a/jf-live-writer/src/main.cpp +++ b/jf-live-writer/src/main.cpp @@ -1,195 +1,46 @@ #include #include -#include -#include -#include - -#include "zmq.h" +#include +#include +#include #include "live_writer_config.hpp" -#include "buffer_config.hpp" -#include "bitshuffle/bitshuffle.h" -#include "JFH5LiveWriter.hpp" -#include "LiveImageAssembler.hpp" -#include "BinaryReader.hpp" +#include "../../jf-buffer-writer/include/BufferStats.hpp" + using namespace std; -using namespace chrono; using namespace buffer_config; using namespace live_writer_config; -void read_buffer( - const string detector_folder, - const string module_name, - const int i_module, - const vector& pulse_ids_to_write, - LiveImageAssembler& image_assembler, - void* ctx) -{ - BinaryReader reader(detector_folder, module_name); - auto frame_buffer = new BufferBinaryFormat(); - - void* socket = zmq_socket(ctx, ZMQ_SUB); - if (socket == nullptr) { - throw runtime_error(zmq_strerror(errno)); - } - - int rcvhwm = 100; - if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - int linger = 0; - if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - // In milliseconds. - int rcvto = 2000; - if (zmq_setsockopt(socket, ZMQ_RCVTIMEO, &rcvto, sizeof(rcvto)) != 0 ){ - throw runtime_error(zmq_strerror(errno)); - } - - if (zmq_connect(socket, "tcp://127.0.0.1:51234") != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - const uint64_t PULSE_ID_DELAY = 100; - - uint64_t live_pulse_id = pulse_ids_to_write.front(); - for (uint64_t pulse_id:pulse_ids_to_write) { - - while(!image_assembler.is_slot_free(pulse_id)) { - this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS)); - } - - auto start_time = steady_clock::now(); - - // Enforce a delay of 1 second for writing. - while (live_pulse_id - pulse_id < PULSE_ID_DELAY) { - if (zmq_recv(socket, &live_pulse_id, - sizeof(live_pulse_id), 0) == -1) { - if (errno == EAGAIN) { - throw runtime_error("Did not receive pulse_id in time."); - } else { - throw runtime_error(zmq_strerror(errno)); - } - } - } - - reader.get_frame(pulse_id, frame_buffer); - - auto end_time = steady_clock::now(); - uint64_t read_us_duration = duration_cast( - end_time-start_time).count(); - - start_time = steady_clock::now(); - - image_assembler.process(pulse_id, i_module, frame_buffer); - - end_time = steady_clock::now(); - uint64_t compose_us_duration = duration_cast( - end_time-start_time).count(); - - cout << "sf_writer:avg_read_us "; - cout << read_us_duration / BUFFER_BLOCK_SIZE << endl; - cout << "sf_writer:avg_assemble_us "; - cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl; - } - - delete frame_buffer; -} - int main (int argc, char *argv[]) { - if (argc != 7) { + if (argc != 3) { cout << endl; - cout << "Usage: sf_writer [output_file] [detector_folder] [n_modules]"; - cout << " [start_pulse_id] [n_pulses] [pulse_id_step]"; - cout << endl; - cout << "\toutput_file: Complete path to the output file." << endl; - cout << "\tdetector_folder: Absolute path to detector buffer." << endl; - cout << "\tn_modules: number of modules" << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tn_pulses: Number of pulses to write." << endl; - cout << "\tpulse_id_step: 1==100Hz, 2==50hz, 4==25Hz.." << endl; + cout << "Usage: jf_live_writer [detector_json_filename]" + " [stream_name]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; cout << endl; exit(-1); } - string output_file = string(argv[1]); - const string detector_folder = string(argv[2]); - size_t n_modules = atoi(argv[3]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[4]); - size_t n_pulses = (size_t) atoll(argv[5]); - int pulse_id_step = atoi(argv[6]); - - std::vector pulse_ids_to_write; - uint64_t i_pulse_id = start_pulse_id; - for (size_t i=0; i reading_threads(n_modules); - for (size_t i_module=0; i_module( - end_time-start_time).count(); - - image_assembler.free_slot(pulse_id); - - cout << "sf_writer:avg_write_us "; - cout << write_us_duration / BUFFER_BLOCK_SIZE << endl; - } - - for (auto& reading_thread : reading_threads) { - if (reading_thread.joinable()) { - reading_thread.join(); - } - } - - return 0; } diff --git a/jf-udp-recv/include/FrameStats.hpp b/jf-udp-recv/include/FrameStats.hpp index dd4ef95..7839a38 100644 --- a/jf-udp-recv/include/FrameStats.hpp +++ b/jf-udp-recv/include/FrameStats.hpp @@ -9,11 +9,12 @@ class FrameStats { const std::string detector_name_; const int module_id_; - size_t stats_modulo_; + size_t stats_time_; int frames_counter_; int n_missed_packets_; int n_corrupted_frames_; + int n_corrupted_pulse_id_; std::chrono::time_point stats_interval_start_; void reset_counters(); @@ -22,8 +23,8 @@ class FrameStats { public: FrameStats(const std::string &detector_name, const int module_id, - const size_t stats_modulo); - void record_stats(const ModuleFrame &meta); + const size_t stats_time); + void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); }; diff --git a/jf-udp-recv/src/FrameStats.cpp b/jf-udp-recv/src/FrameStats.cpp index 95d1931..5cc33ab 100644 --- a/jf-udp-recv/src/FrameStats.cpp +++ b/jf-udp-recv/src/FrameStats.cpp @@ -7,10 +7,10 @@ using namespace chrono; FrameStats::FrameStats( const std::string &detector_name, const int module_id, - const size_t stats_modulo) : + const size_t stats_time) : detector_name_(detector_name), module_id_(module_id), - stats_modulo_(stats_modulo) + stats_time_(stats_time) { reset_counters(); } @@ -20,11 +20,17 @@ void FrameStats::reset_counters() frames_counter_ = 0; n_missed_packets_ = 0; n_corrupted_frames_ = 0; + n_corrupted_pulse_id_ = 0; stats_interval_start_ = steady_clock::now(); } -void FrameStats::record_stats(const ModuleFrame &meta) +void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) { + + if (bad_pulse_id) { + n_corrupted_pulse_id_++; + } + if (meta.n_recv_packets < N_PACKETS_PER_FRAME) { n_missed_packets_ += N_PACKETS_PER_FRAME - meta.n_recv_packets; n_corrupted_frames_++; @@ -32,7 +38,10 @@ void FrameStats::record_stats(const ModuleFrame &meta) frames_counter_++; - if (frames_counter_ == stats_modulo_) { + auto time_passed = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { print_stats(); reset_counters(); } @@ -48,13 +57,14 @@ void FrameStats::print_stats() system_clock::now()).time_since_epoch().count(); // Output in InfluxDB line protocol - cout << "jf-udp-recv"; + cout << "jf_udp_recv"; cout << ",detector_name=" << detector_name_; cout << ",module_name=M" << module_id_; cout << " "; cout << "n_missed_packets=" << n_missed_packets_ << "i"; cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; cout << ",repetition_rate=" << rep_rate << "i"; + cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; cout << " "; cout << timestamp; cout << endl; diff --git a/jf-udp-recv/src/FrameUdpReceiver.cpp b/jf-udp-recv/src/FrameUdpReceiver.cpp index 2a5f47a..81efbe6 100644 --- a/jf-udp-recv/src/FrameUdpReceiver.cpp +++ b/jf-udp-recv/src/FrameUdpReceiver.cpp @@ -92,8 +92,8 @@ inline uint64_t FrameUdpReceiver::process_packets( init_frame(metadata, i_packet); // Happens if the last packet from the previous frame gets lost. - // In the jungfrau_packet, pulse_id is called bunchid. - } else if (metadata.pulse_id != packet_buffer_[i_packet].bunchid) { + // In the jungfrau_packet, framenum is the trigger number (how many triggers from detector power-on) happened + } else if (metadata.frame_index != packet_buffer_[i_packet].framenum) { packet_buffer_loaded_ = true; // Continue on this packet. packet_buffer_offset_ = i_packet; @@ -186,4 +186,4 @@ uint64_t FrameUdpReceiver::get_frame_from_udp( return pulse_id; } } -} \ No newline at end of file +} diff --git a/jf-udp-recv/src/main.cpp b/jf-udp-recv/src/main.cpp index 177e495..1038b54 100644 --- a/jf-udp-recv/src/main.cpp +++ b/jf-udp-recv/src/main.cpp @@ -39,22 +39,42 @@ int main (int argc, char *argv[]) { const auto udp_port = config.start_udp_port + module_id; FrameUdpReceiver receiver(udp_port, module_id); RamBuffer buffer(config.detector_name, config.n_modules); - FrameStats stats(config.detector_name, module_id, STATS_MODULO); + FrameStats stats(config.detector_name, module_id, STATS_TIME); auto ctx = zmq_ctx_new(); - auto socket = bind_socket(ctx, config.detector_name, module_id); + auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); ModuleFrame meta; char* data = new char[MODULE_N_BYTES]; + uint64_t pulse_id_previous = 0; + uint64_t frame_index_previous = 0; + while (true) { + auto pulse_id = receiver.get_frame_from_udp(meta, data); - buffer.write_frame(meta, data); + bool bad_pulse_id = false; - zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); + if ( ( meta.frame_index != (frame_index_previous+1) ) || + ( (pulse_id-pulse_id_previous) < 0 ) || + ( (pulse_id-pulse_id_previous) > 1000 ) ) { + + bad_pulse_id = true; + + } else { + + buffer.write_frame(meta, data); + + zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); + + } + + stats.record_stats(meta, bad_pulse_id); + + pulse_id_previous = pulse_id; + frame_index_previous = meta.frame_index; - stats.record_stats(meta); } delete[] data; diff --git a/sf-stream/include/StreamStats.hpp b/sf-stream/include/StreamStats.hpp new file mode 100644 index 0000000..bca5ce0 --- /dev/null +++ b/sf-stream/include/StreamStats.hpp @@ -0,0 +1,29 @@ +#ifndef SF_DAQ_BUFFER_STREAMSTATS_HPP +#define SF_DAQ_BUFFER_STREAMSTATS_HPP + +#include +#include +#include + +class StreamStats { + const std::string detector_name_; + const std::string stream_name_; + const size_t stats_modulo_; + + int image_counter_; + int n_corrupted_images_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + StreamStats(const std::string &detector_name, + const std::string &stream_name, + const size_t stats_modulo); + + void record_stats(const ImageMetadata &meta); +}; + + +#endif //SF_DAQ_BUFFER_STREAMSTATS_HPP diff --git a/sf-stream/include/ZmqPulseSyncReceiver.hpp b/sf-stream/include/ZmqPulseSyncReceiver.hpp index aa00035..624de34 100644 --- a/sf-stream/include/ZmqPulseSyncReceiver.hpp +++ b/sf-stream/include/ZmqPulseSyncReceiver.hpp @@ -8,6 +8,11 @@ #include "formats.hpp" +struct PulseAndSync { + const uint64_t pulse_id; + const uint32_t n_lost_pulses; +}; + class ZmqPulseSyncReceiver { void* ctx_; @@ -22,7 +27,7 @@ public: const int n_modules); ~ZmqPulseSyncReceiver(); - uint64_t get_next_pulse_id() const; + PulseAndSync get_next_pulse_id() const; }; diff --git a/sf-stream/include/stream_config.hpp b/sf-stream/include/stream_config.hpp index 9b7e693..8f8b977 100644 --- a/sf-stream/include/stream_config.hpp +++ b/sf-stream/include/stream_config.hpp @@ -14,4 +14,7 @@ namespace stream_config const int PULSE_ZMQ_SNDHWM = 100; // Number of times we try to re-sync in case of failure. const int SYNC_RETRY_LIMIT = 3; + + // Number of pulses between each statistics print out. + const size_t STREAM_STATS_MODULO = 1000; } diff --git a/sf-stream/src/StreamStats.cpp b/sf-stream/src/StreamStats.cpp new file mode 100644 index 0000000..7408629 --- /dev/null +++ b/sf-stream/src/StreamStats.cpp @@ -0,0 +1,62 @@ +#include "StreamStats.hpp" + +#include + +using namespace std; +using namespace chrono; + +StreamStats::StreamStats( + const std::string &detector_name, + const std::string &stream_name, + const size_t stats_modulo) : + detector_name_(detector_name), + stream_name_(stream_name), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void StreamStats::reset_counters() +{ + image_counter_ = 0; + n_corrupted_images_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void StreamStats::record_stats( + const ImageMetadata &meta) +{ + image_counter_++; + + if (!meta.is_good_image) { + n_corrupted_images_++; + } + + if (image_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void StreamStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "sf_stream"; + cout << ",detector_name=" << detector_name_; + cout << ",stream_name=" << stream_name_; + cout << " "; + cout << "n_processed_images=" << image_counter_ << "i"; + cout << ",n_corrupted_images=" << n_corrupted_images_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} + diff --git a/sf-stream/src/ZmqPulseSyncReceiver.cpp b/sf-stream/src/ZmqPulseSyncReceiver.cpp index f309389..96221f3 100644 --- a/sf-stream/src/ZmqPulseSyncReceiver.cpp +++ b/sf-stream/src/ZmqPulseSyncReceiver.cpp @@ -27,7 +27,7 @@ ZmqPulseSyncReceiver::ZmqPulseSyncReceiver( for (int i=0; i::max();; uint64_t max_pulse_id = 0; @@ -83,18 +83,26 @@ uint64_t ZmqPulseSyncReceiver::get_next_pulse_id() const } modules_in_sync = true; + // Max pulses we lost in this sync attempt. + uint32_t i_sync_lost_pulses = 0; for (int i = 0; i < n_modules_; i++) { + // How many pulses we lost for this specific module. + uint32_t i_module_lost_pulses = 0; while (pulses[i] < max_pulse_id) { zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); + i_module_lost_pulses++; } + i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); + if (pulses[i] != max_pulse_id) { modules_in_sync = false; } } + n_lost_pulses += i_sync_lost_pulses; if (modules_in_sync) { - return pulses[0]; + return {pulses[0], n_lost_pulses}; } } diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index dbd588f..dbe313a 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -3,11 +3,10 @@ #include #include #include +#include -#include "buffer_config.hpp" #include "stream_config.hpp" #include "ZmqLiveSender.hpp" -#include "ZmqPulseSyncReceiver.hpp" using namespace std; using namespace buffer_config; @@ -15,57 +14,36 @@ using namespace stream_config; int main (int argc, char *argv[]) { - if (argc != 2) { + if (argc != 3) { cout << endl; - cout << "Usage: sf_stream [detector_json_filename]" << endl; + cout << "Usage: sf_stream [detector_json_filename]" + " [stream_name]" << endl; cout << "\tdetector_json_filename: detector config file path." << endl; cout << endl; exit(-1); } + const auto stream_name = string(argv[2]); + // TODO: Add stream_name to config reading - multiple stream definitions. auto config = BufferUtils::read_json_config(string(argv[1])); - string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.detector_name + "-"; auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); + auto receiver = BufferUtils::connect_socket( + ctx, config.detector_name, "assembler"); - ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); RamBuffer ram_buffer(config.detector_name, config.n_modules); + StreamStats stats(config.detector_name, stream_name, STREAM_STATS_MODULO); ZmqLiveSender sender(ctx, config); - // TODO: Remove stats trash. - uint64_t last_pulse_id = 0; - uint64_t last_pulse_id_range = 0; - uint16_t n_good_images = 0; - ImageMetadata meta; while (true) { - auto pulse_id = receiver.get_next_pulse_id(); - char* data = ram_buffer.read_image(pulse_id, meta); + zmq_recv(receiver, &meta, sizeof(meta), 0); + char* data = ram_buffer.read_image(meta.pulse_id); sender.send(meta, data); - // TODO: This logic works only at 100Hz. Fix it systematically. - uint64_t sync_lost_pulses = pulse_id - last_pulse_id; - if (last_pulse_id > 0 && sync_lost_pulses > 1) { - cout << "sf_stream:sync_lost_pulses " << sync_lost_pulses << endl; - } - last_pulse_id = pulse_id; - - uint64_t curr_pulse_id_range = pulse_id / 10000; - if (last_pulse_id_range != curr_pulse_id_range) { - if (last_pulse_id_range > 0) { - cout << "sf_stream:n_good_images " << n_good_images; - cout << endl; - } - - last_pulse_id_range = curr_pulse_id_range; - n_good_images = 0; - } - - if (meta.is_good_image) { - n_good_images++; - } + stats.record_stats(meta); } } diff --git a/sf-writer/CMakeLists.txt b/sf-writer/CMakeLists.txt index dc87ad2..5b520f5 100644 --- a/sf-writer/CMakeLists.txt +++ b/sf-writer/CMakeLists.txt @@ -11,6 +11,7 @@ add_executable(sf-writer src/main.cpp) set_target_properties(sf-writer PROPERTIES OUTPUT_NAME sf_writer) target_link_libraries(sf-writer sf-writer-lib + zmq hdf5 hdf5_hl hdf5_cpp