diff --git a/CMakeLists.txt b/CMakeLists.txt index 74c7972..19194fc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,13 +31,12 @@ add_subdirectory( EXCLUDE_FROM_ALL) add_subdirectory("core-buffer") -add_subdirectory("jf-udp-recv") -add_subdirectory("jf-buffer-writer") +#add_subdirectory("std-stream-send") +add_subdirectory("std-udp-recv") +add_subdirectory("std-udp-sync") +#add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") -add_subdirectory("sf-stream") -add_subdirectory("sf-writer") - -if(BUILD_JF_LIVE_WRITER) - add_subdirectory("jf-live-writer") -endif() +#add_subdirectory("sf-stream") +#add_subdirectory("sf-writer") +add_subdirectory("jf-live-writer") diff --git a/core-buffer/include/RamBuffer.hpp b/core-buffer/include/RamBuffer.hpp index 36dde00..b14f293 100644 --- a/core-buffer/include/RamBuffer.hpp +++ b/core-buffer/include/RamBuffer.hpp @@ -3,6 +3,7 @@ #include #include "formats.hpp" +#include "buffer_config.hpp" class RamBuffer { const std::string buffer_name_; diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index f33e601..c6f64be 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -24,7 +24,6 @@ namespace buffer_config { const size_t BUFFER_BLOCK_SIZE = 100; - const size_t BUFFER_UDP_N_RECV_MSG = 128; // Size of UDP recv buffer const int BUFFER_UDP_RCVBUF_N_SLOTS = 100; // 8246 bytes for each UDP packet. diff --git a/core-buffer/include/eiger.hpp b/core-buffer/include/eiger.hpp index ab0b0c2..6283cdc 100644 --- a/core-buffer/include/eiger.hpp +++ b/core-buffer/include/eiger.hpp @@ -4,6 +4,10 @@ #include #include +#define IS_BOTTOM(n) ((n%2 != 0) ? -1 : 1) + +const std::string DETECTOR_TYPE = "eiger"; + #define N_MODULES 1 #define BYTES_PER_PACKET 4144 #define DATA_BYTES_PER_PACKET 4096 @@ -20,7 +24,6 @@ #define MODULE_Y_SIZE 512 #define MODULE_N_PIXELS 131072 #define PIXEL_N_BYTES 2 -#define MODULE_N_BYTES 262144 #define GAP_X_MODULE_PIXELS 2 #define GAP_Y_MODULE_PIXELS 2 #define GAP_X_EIGERMOD_PIXELS 8 @@ -28,7 +31,7 @@ #define N_BYTES_PER_MODULE_LINE(bit_depth) ((MODULE_X_SIZE * bit_depth) / 8) -#define N_BYTES_PER_MODULE_FRAME(bit_depth) ((131072 * bit_depth) / 8) +#define N_BYTES_PER_MODULE_FRAME(bit_depth) ((MODULE_N_PIXELS * bit_depth) / 8) // #define N_BYTES_PER_IMAGE_LINE(bit_depth, n_submodules) ((n_submodules / 2 * MODULE_X_SIZE * bit_depth) / 8) diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index ffc065e..7ae731f 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -1,27 +1,18 @@ #ifndef SF_DAQ_BUFFER_FORMATS_HPP #define SF_DAQ_BUFFER_FORMATS_HPP -#include "buffer_config.hpp" - -#ifndef USE_EIGER -#include "jungfrau.hpp" -#else -#include "eiger.hpp" -#endif - -#define IS_BOTTOM(n) ((n%2 != 0) ? -1 : 1) - #pragma pack(push) #pragma pack(1) struct ModuleFrame { + uint64_t id; uint64_t pulse_id; uint64_t frame_index; uint64_t daq_rec; uint64_t n_recv_packets; uint64_t module_id; uint16_t bit_depth; - uint16_t row; - uint16_t column; + uint16_t pos_y; + uint16_t pos_x; }; #pragma pack(pop) @@ -29,33 +20,17 @@ struct ModuleFrame { #pragma pack(push) #pragma pack(1) struct ImageMetadata { - uint64_t pulse_id; - uint64_t frame_index; - uint32_t daq_rec; - uint32_t is_good_image; + uint64_t id; + uint64_t height; + uint64_t width; + uint64_t dtype; + uint64_t encoding; + uint64_t source_id; + uint64_t status; + uint64_t user_1; + uint64_t user_2; }; #pragma pack(pop) -struct ModuleFrameBuffer { - ModuleFrame module[N_MODULES]; -}; - -#pragma pack(push) -#pragma pack(1) -struct BufferBinaryFormat { - const char FORMAT_MARKER = 0xBE; - ModuleFrame meta; - char data[MODULE_N_BYTES]; -}; -#pragma pack(pop) - -#pragma pack(push) -#pragma pack(1) -struct BufferBinaryBlock -{ - BufferBinaryFormat frame[buffer_config::BUFFER_BLOCK_SIZE]; - uint64_t start_pulse_id; -}; -#pragma pack(pop) #endif //SF_DAQ_BUFFER_FORMATS_HPP diff --git a/core-buffer/include/jungfrau.hpp b/core-buffer/include/jungfrau.hpp index eb4f174..b7ad43d 100644 --- a/core-buffer/include/jungfrau.hpp +++ b/core-buffer/include/jungfrau.hpp @@ -3,6 +3,8 @@ #include +const std::string DETECTOR_TYPE = "jungfrau"; + #define N_MODULES 32 #define BYTES_PER_PACKET 8240 #define DATA_BYTES_PER_PACKET 8192 @@ -40,5 +42,4 @@ struct det_packet { }; #pragma pack(pop) - #endif diff --git a/jf-live-writer/src/main.cpp b/jf-live-writer/src/main.cpp index 179e3aa..e2940e7 100644 --- a/jf-live-writer/src/main.cpp +++ b/jf-live-writer/src/main.cpp @@ -62,7 +62,7 @@ int main (int argc, char *argv[]) // Fair distribution of images among writers. if (meta.i_image % n_writers == i_writer) { - char* data = ram_buffer.read_image(meta.image_metadata.pulse_id); + char* data = ram_buffer.get_slot_data(meta.image_metadata.pulse_id); stats.start_image_write(); writer.write_data(meta.run_id, meta.i_image, data); diff --git a/jf-udp-recv/CMakeLists.txt b/jf-udp-recv/CMakeLists.txt deleted file mode 100644 index 3667afb..0000000 --- a/jf-udp-recv/CMakeLists.txt +++ /dev/null @@ -1,26 +0,0 @@ -file(GLOB SOURCES - src/*.cpp) - -add_library(jf-udp-recv-lib STATIC ${SOURCES}) -target_include_directories(jf-udp-recv-lib PUBLIC include/) -target_link_libraries(jf-udp-recv-lib - external - core-buffer-lib) - -add_executable(jf-udp-recv src/main.cpp) - -if (USE_EIGER) - set (LIB_NAME_UDP_RECV "eiger_udp_recv") -else() - set (LIB_NAME_UDP_RECV "jf_udp_recv") -endif() - -set_target_properties(jf-udp-recv PROPERTIES OUTPUT_NAME ${LIB_NAME_UDP_RECV}) - -target_link_libraries(jf-udp-recv - jf-udp-recv-lib - zmq - rt) - -enable_testing() -add_subdirectory(test/) diff --git a/std-stream-send/CMakeLists.txt b/std-stream-send/CMakeLists.txt new file mode 100644 index 0000000..3d8b419 --- /dev/null +++ b/std-stream-send/CMakeLists.txt @@ -0,0 +1,18 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(std-stream-send-lib STATIC ${SOURCES}) +target_include_directories(std-stream-send-lib PUBLIC include/) +target_link_libraries(std-stream-send-lib + core-buffer-lib) + +add_executable(std-stream-send src/main.cpp) +set_target_properties(std-stream-send PROPERTIES OUTPUT_NAME std_stream_send) +target_link_libraries(std-stream-send + std-stream-send-lib + zmq + pthread + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/std-stream-send/README.md b/std-stream-send/README.md new file mode 100644 index 0000000..ce21a4d --- /dev/null +++ b/std-stream-send/README.md @@ -0,0 +1,179 @@ +# sf-stream +sf-stream is the component that receives a live stream of frame data from +sf-buffers over ZMQ and assembles them into images. This images are then +sent again over ZMQ to external components. There is always only 1 sf-stream +per detector. + +It currently has 3 output streams: + +- **Full data full meta** rate stream (send all images and meta) +- **Reduced data full meta** rate stream (send less images, but +all meta) +- **Pulse_id** stream (send only the current pulse_id) + +In addition to receiving and assembling images, sf-stream also calculates +additional meta and constructs the structures needed to send data in +Array 1.0 protocol. + +This component does not guarantee that the streams will always contain all +the data - it can happen that frame resynchronization is needed, and in this +case 1 or more frames could potentially be lost. This happens so rarely that in +practice is not a problem. + +## Overview + +![image_stream_overview](../docs/sf_daq_buffer-overview-stream.jpg) + +sf-stream is a single threaded application (without counting the ZMQ IO threads) +that is used for providing live assembled images to anyone willing to listen. + +In addition, it also provides a pulse_id stream, which is the most immediate +pulse_id feedback we currently have in case we need to synchronize external +components to the current machine pulse_id. + +## ZMQ receiving +Each ZMQ stream is coming from a separate sf-buffer. This means that we have as +many connections as we have modules in a detector. + +Messages are multipart (2 parts) and are received in PUB/SUB mode. + +There is no need for special synchronization between modules as we expect that +frames will always be in the correct order and all modules will provide the +same frame more or less at the same time. If any of this 2 conditions is not +met, the detector is not working properly and we cannot guaranty that sf-stream +will work correctly. + +Nonetheless we provide the capability to synchronize the streams in image +assembly phase - this is needed rarely, but occasionally happens. In this sort +of hiccups we usually loose only a couple of consecutive images. + +### Messages format +Each message is composed by 2 parts: + +- Serialization of ModuleFrame in the first part. +- Frame data in the second part. + +Module frame is defined as: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) +``` + +The frame data is a 1MB (1024*512 pixels * 2 bytes/pixel) blob of data in +**uint16** representing the detector image. + +## Image assembly +We first synchronize the modules. We do this by reading all sockets and +deciding the largest frame pulse_id among them (max_pulse_id). We then calculate +the diff between a specific socket pulse_id and the max_pulse_id. +This difference tells us how many messages we need to discard from a specific socket. + +This discarding is the source of possible missing images in the output stream. +It can happen in 3 cases: + +- At least one of the detector modules did not sent any packets for the specific +pulse_id. +- All the packets from a specific module for a pulse_id were lost before UDP +receiving them. +- ZMQ HWM was reached (either on the sf-buffer or sf-stream) and the message was +dropped. + +All this 3 cases are highly unlikely, so synchronization is mostly needed when +first starting sf-stream. Different sockets connect to sf-buffers at different +times. Apart from the initial synchronization there should be no need to +re-synchronize modules in a healthy running environment. + +If an image is missing any ZMQ messages from sf-buffers (not all modules data +arrived), the image will be dropped. We do not do partial reconstruction in +sf-stream. However, it is important to note, that this does not cover the case +where frames are incomplete (missing UDP packets on sf-buffer) - we still +assemble this images as long as at least 1 packet/frame for a specific pulse_id +arrived. + +## ZMQ sending + +We devide the ZMQ sending to 3 types of stream: + +- Data processing stream. This is basically the complete stream from +the detector with all meta and data. It can be described as full data full +meta stream. Only 1 client at the time can be connected to this stream +(PUSH/PULL for load balancing). + +- Live viewing stream. This is a reduced data full meta stream. We send +meta for all frames, but data only for subset of them (10Hz, for example). +Any number of clients can connect to the 10Hz stream, because we use PUB/SUB +for this socket. + +- Pulse_id stream. This is a stream that sends out only the current pulse_id. +It can be used to synchronize any external system with the current pulse_id +being recorded. Multiple clients can connect to this stream. + +In the data processing and live viewing stream we use +[Array 1.0](https://github.com/paulscherrerinstitute/htypes/blob/master/array-1.0.md) +as our protocol to be compatible with currently available external components. + +We use following fields in the JSON header: + +| Name | Type | Comment | +| --- | --- | --- | +| pulse_id | uint64 |bunchid from detector header| +|frame|uint64|frame_index from detector header| +|is_good_frame|bool|true if all packets for this frame are present| +|daq_rec|uint32|daqrec from detector header| +|pedestal_file|string|Path to pedestal file| +|gain_file|string|Path to gain file| +|number_frames_expected|int|Number of expected frames| +|run_name|string|Name of the run| +|detector_name|string|Name of the detector| +|htype|string|Value: "array-1.0"| +|type|string|Value: "uint16"| +|shape|Array[uint64]|Shape of the image in stream| + +### Full data full meta stream + +This stream runs at detector frequency and uses PUSH/PULL to distribute data +to max 1 client (this client can have many processes, but it needs to be a +single logical entity, since the images are evenly distributed to all +connected sockets). + +![image_full_stream](../docs/sf_daq_buffer-FullStream.jpg) + +The goal here is to provide a complete copy of the detector image stream +for purposes of online analysis. Given the large amount of data on this +stream only "pre-approved" applications that can handle the load should be +attached here. + +### Reduced data full meta stream + +This streams also runs at detector frequency for JSON headers (meta), but +it sends only part of the images in the stream. The rest of the images are +sent as empty buffers (the receiver needs to be aware of this behaviour, as +Array 1.0 alone does not define it). + +![image_reduced_stream](../docs/sf_daq_buffer-ReducedStream.jpg) + +This is the lightweight version of the image stream. Any number of clients +can connect to this stream (PUB/SUB) but no client can do load +balancing automatically (it would require PUSH/PULL). + +This is a "public interface" for anyone who wants to get detector data live, +and can do with only a subset of images. + +### Pulse_id stream + +This stream runs ar detector frequency in PUB/SUB mode. The only thing it +does is sends out the pulse_id (of the just received image) in uint64_t +format. + +![image_pulse_stream](../docs/sf_daq_buffer-PulseStream.jpg) + +This is also a "public interface" for anyone who wants to get the current +system pulse_id. \ No newline at end of file diff --git a/std-stream-send/build_stdstream.sh b/std-stream-send/build_stdstream.sh new file mode 100644 index 0000000..d954ab2 --- /dev/null +++ b/std-stream-send/build_stdstream.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +VERSION=1.0.0 + +docker build --no-cache=true -f debug.Dockerfile -t paulscherrerinstitute/std-stream-send-sim . +docker tag paulscherrerinstitute/std-stream-send-sim paulscherrerinstitute/std-stream-send-sim:$VERSION + +docker login +docker push paulscherrerinstitute/std-stream-send-sim:$VERSION +docker push paulscherrerinstitute/std-stream-send-sim \ No newline at end of file diff --git a/std-stream-send/debug.Dockerfile b/std-stream-send/debug.Dockerfile new file mode 100644 index 0000000..97a772a --- /dev/null +++ b/std-stream-send/debug.Dockerfile @@ -0,0 +1,13 @@ +FROM paulscherrerinstitute/sf-daq_phdf5:1.0.0 + +COPY . /sf_daq_buffer/ + +RUN mkdir /sf_daq_buffer/build && \ + cd /sf_daq_buffer/build && \ + cmake3 .. && \ + make std-stream-send + +WORKDIR /sf_daq_buffer/build + +ENTRYPOINT ["/sf_daq_buffer/build/std_stream"] + diff --git a/std-stream-send/include/StreamSendConfig.hpp b/std-stream-send/include/StreamSendConfig.hpp new file mode 100644 index 0000000..36ecd2c --- /dev/null +++ b/std-stream-send/include/StreamSendConfig.hpp @@ -0,0 +1,33 @@ +#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP +#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP + + +#include +#include +#include +#include +#include + +struct StreamSendConfig { + static StreamSendConfig from_json_file(const std::string& filename) { + std::ifstream ifs(filename); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + return { + config_parameters["detector_name"].GetString(), + config_parameters["detector_type"].GetString(), + config_parameters["n_modules"].GetInt(), + config_parameters["start_udp_port"].GetInt(), + }; + } + + const std::string detector_name; + const std::string detector_type; + const int n_modules; + const int start_udp_port; +}; + + +#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP diff --git a/std-stream-send/include/stream_config.hpp b/std-stream-send/include/stream_config.hpp new file mode 100644 index 0000000..a0f1d72 --- /dev/null +++ b/std-stream-send/include/stream_config.hpp @@ -0,0 +1,14 @@ +namespace stream_config +{ + // N of IO threads to receive data from modules. + const int STREAM_ZMQ_IO_THREADS = 1; + // How long should the RECV queue be. + const size_t STREAM_RCVHWM = 100; + + const int PROCESSING_ZMQ_SNDHWM = 10; + // Keep the last second of pulses in the buffer. + const int PULSE_ZMQ_SNDHWM = 100; + + // Number of pulses between each statistics print out. + const size_t STREAM_STATS_MODULO = 1000; +} diff --git a/std-stream-send/src/main.cpp b/std-stream-send/src/main.cpp new file mode 100644 index 0000000..c832578 --- /dev/null +++ b/std-stream-send/src/main.cpp @@ -0,0 +1,52 @@ +#include +#include +#include "stream_config.hpp" +#include +#include +#include "RamBuffer.hpp" + + +using namespace std; +using namespace stream_config; + +int main (int argc, char *argv[]) +{ + auto ctx = zmq_ctx_new(); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); + + auto sender = zmq_socket(ctx, ZMQ_PUSH); + const int sndhwm = PROCESSING_ZMQ_SNDHWM; + if (zmq_setsockopt( + sender, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + const int linger = 0; + if (zmq_setsockopt( + sender, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_bind(sender, "tcp://127.0.0.1:10000") != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + RamBuffer image_buffer(config.detector_name + "_assembler", + sizeof(ImageMetadata), assembler.get_image_n_bytes(), 1); + + while (true) { + + image_id = 123; + + zmq_send(sender, + ram_buffer.get_slot_meta(image_id), + sizeof(ImageMetadata), ZMQ_SNDMORE); + + zmq_send(sender, + ram_buffer.get_slot_data(image_id), + buffer_config::MODULE_N_BYTES * 4, 0); + + pulse_id++; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } +} diff --git a/std-stream-send/test/CMakeLists.txt b/std-stream-send/test/CMakeLists.txt new file mode 100644 index 0000000..2778a7a --- /dev/null +++ b/std-stream-send/test/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(std-stream-send-tests main.cpp) + +target_link_libraries(std-stream-send-tests + std-stream-send-lib + gtest + ) + diff --git a/std-stream-send/test/main.cpp b/std-stream-send/test/main.cpp new file mode 100644 index 0000000..e819294 --- /dev/null +++ b/std-stream-send/test/main.cpp @@ -0,0 +1,8 @@ +#include "gtest/gtest.h" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/std-udp-recv/CMakeLists.txt b/std-udp-recv/CMakeLists.txt new file mode 100644 index 0000000..001dbf0 --- /dev/null +++ b/std-udp-recv/CMakeLists.txt @@ -0,0 +1,20 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(std-udp-recv-lib STATIC ${SOURCES}) +target_include_directories(std-udp-recv-lib PUBLIC include/) +target_link_libraries(std-udp-recv-lib + external + core-buffer-lib) + +add_executable(std-udp-recv src/main.cpp) + +set_target_properties(std-udp-recv PROPERTIES OUTPUT_NAME std_udp_recv) + +target_link_libraries(std-udp-recv + std-udp-recv-lib + zmq + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/jf-udp-recv/README.md b/std-udp-recv/README.md similarity index 100% rename from jf-udp-recv/README.md rename to std-udp-recv/README.md diff --git a/jf-udp-recv/include/FrameStats.hpp b/std-udp-recv/include/FrameStats.hpp similarity index 60% rename from jf-udp-recv/include/FrameStats.hpp rename to std-udp-recv/include/FrameStats.hpp index da2d000..c144845 100644 --- a/jf-udp-recv/include/FrameStats.hpp +++ b/std-udp-recv/include/FrameStats.hpp @@ -8,28 +8,24 @@ class FrameStats { const std::string detector_name_; - const int n_modules_; const int module_id_; - const int bit_depth_; const int n_packets_per_frame_; - size_t stats_time_; + const size_t stats_time_; int frames_counter_; int n_missed_packets_; int n_corrupted_frames_; - int n_corrupted_pulse_id_; std::chrono::time_point stats_interval_start_; void reset_counters(); void print_stats(); -public:////config.detector_name, n_receivers, module_id, bit_depth, STATS_TIME - FrameStats(const std::string &detector_name, - const int n_modules, +public: + FrameStats(std::string detector_name, const int module_id, - const int bit_depth, + const int n_packets_per_frame, const size_t stats_time); - void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); + void record_stats(const ModuleFrame &meta); }; diff --git a/jf-udp-recv/include/FrameUdpReceiver.hpp b/std-udp-recv/include/FrameUdpReceiver.hpp similarity index 56% rename from jf-udp-recv/include/FrameUdpReceiver.hpp rename to std-udp-recv/include/FrameUdpReceiver.hpp index 8487e0f..6038bf7 100644 --- a/jf-udp-recv/include/FrameUdpReceiver.hpp +++ b/std-udp-recv/include/FrameUdpReceiver.hpp @@ -6,18 +6,20 @@ #include "formats.hpp" #include "buffer_config.hpp" +#ifdef USE_EIGER + #include "eiger.hpp" +#else + #include "jungfrau.hpp" +#endif + class FrameUdpReceiver { - const int module_id_; - const int bit_depth_; - const size_t n_packets_per_frame_; - const size_t data_bytes_per_frame_; - PacketUdpReceiver udp_receiver_; + const int n_packets_per_frame_; - det_packet packet_buffer_[buffer_config::BUFFER_UDP_N_RECV_MSG]; - iovec recv_buff_ptr_[buffer_config::BUFFER_UDP_N_RECV_MSG]; - mmsghdr msgs_[buffer_config::BUFFER_UDP_N_RECV_MSG]; - sockaddr_in sock_from_[buffer_config::BUFFER_UDP_N_RECV_MSG]; + det_packet* const packet_buffer_; + iovec* const recv_buff_ptr_; + mmsghdr* const msgs_; + sockaddr_in* const sock_from_; bool packet_buffer_loaded_ = false; int packet_buffer_n_packets_ = 0; @@ -30,9 +32,9 @@ class FrameUdpReceiver { const int n_packets, ModuleFrame& metadata, char* frame_buffer); public: - FrameUdpReceiver(const int module_id, const uint16_t port, const int n_modules, const int n_submodules, const int bit_depth); + FrameUdpReceiver(const uint16_t port, const int n_packets_per_frame); virtual ~FrameUdpReceiver(); - uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); + uint64_t get_frame_from_udp(ModuleFrame& meta, char* frame_buffer); }; diff --git a/jf-udp-recv/include/PacketUdpReceiver.hpp b/std-udp-recv/include/PacketUdpReceiver.hpp similarity index 100% rename from jf-udp-recv/include/PacketUdpReceiver.hpp rename to std-udp-recv/include/PacketUdpReceiver.hpp diff --git a/std-udp-recv/include/UdpRecvConfig.hpp b/std-udp-recv/include/UdpRecvConfig.hpp new file mode 100644 index 0000000..847439b --- /dev/null +++ b/std-udp-recv/include/UdpRecvConfig.hpp @@ -0,0 +1,33 @@ +#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP +#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP + + +#include +#include +#include +#include +#include + +struct UdpRecvConfig { + static UdpRecvConfig from_json_file(const std::string& filename) { + std::ifstream ifs(filename); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + return { + config_parameters["detector_name"].GetString(), + config_parameters["detector_type"].GetString(), + config_parameters["n_modules"].GetInt(), + config_parameters["start_udp_port"].GetInt(), + }; + } + + const std::string detector_name; + const std::string detector_type; + const int n_modules; + const int start_udp_port; +}; + + +#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP diff --git a/jf-udp-recv/src/FrameStats.cpp b/std-udp-recv/src/FrameStats.cpp similarity index 67% rename from jf-udp-recv/src/FrameStats.cpp rename to std-udp-recv/src/FrameStats.cpp index 46bc8a7..22948d1 100644 --- a/jf-udp-recv/src/FrameStats.cpp +++ b/std-udp-recv/src/FrameStats.cpp @@ -1,20 +1,18 @@ #include +#include #include "FrameStats.hpp" #include "date.h" using namespace std; using namespace chrono; FrameStats::FrameStats( - const std::string &detector_name, - const int n_modules, + string detector_name, const int module_id, - const int bit_depth, + const int n_packets_per_frame, const size_t stats_time) : - detector_name_(detector_name), - n_modules_(n_modules), + detector_name_(move(detector_name)), module_id_(module_id), - bit_depth_(bit_depth), - n_packets_per_frame_(bit_depth_ * MODULE_N_PIXELS / 8 / DATA_BYTES_PER_PACKET / n_modules), + n_packets_per_frame_(n_packets_per_frame), stats_time_(stats_time) { reset_counters(); @@ -25,36 +23,31 @@ void FrameStats::reset_counters() frames_counter_ = 0; n_missed_packets_ = 0; n_corrupted_frames_ = 0; - n_corrupted_pulse_id_ = 0; stats_interval_start_ = steady_clock::now(); } -void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) +void FrameStats::record_stats(const ModuleFrame &meta) { - if (bad_pulse_id) { - n_corrupted_pulse_id_++; - } - if (meta.n_recv_packets < n_packets_per_frame_) { n_missed_packets_ += n_packets_per_frame_ - meta.n_recv_packets; n_corrupted_frames_++; + #ifdef DEBUG_OUTPUT using namespace date; - cout << " [" << std::chrono::system_clock::now(); - cout << "] [FrameStats::record_stats] :"; - cout << " meta.frame "<< meta.frame_index; + cout << " [" << std::chrono::system_clock::now() << "]"; + cout << " [FrameStats::record_stats] :"; + cout << " meta.pulse_id "<< meta.pulse_id; + cout << " meta.frame_index "<< meta.frame_index; cout << " || meta.n_recv_packets " << meta.n_recv_packets; cout << " || n_missed_packets_ " << n_missed_packets_; cout << endl; #endif - - } frames_counter_++; - auto time_passed = duration_cast( + const auto time_passed = duration_cast( steady_clock::now()-stats_interval_start_).count(); if (time_passed >= stats_time_*1000) { @@ -73,14 +66,13 @@ void FrameStats::print_stats() system_clock::now()).time_since_epoch().count(); // Output in InfluxDB line protocol - cout << "jf_udp_recv"; + cout << "std_udp_recv,"; cout << ",detector_name=" << detector_name_; - cout << ",module_name=M" << module_id_; + cout << ",module_id=" << module_id_; cout << " "; cout << "n_missed_packets=" << n_missed_packets_ << "i"; cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; cout << ",repetition_rate=" << rep_rate << "i"; - cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; cout << " "; cout << timestamp; cout << endl; diff --git a/jf-udp-recv/src/FrameUdpReceiver.cpp b/std-udp-recv/src/FrameUdpReceiver.cpp similarity index 61% rename from jf-udp-recv/src/FrameUdpReceiver.cpp rename to std-udp-recv/src/FrameUdpReceiver.cpp index a21ad99..92b7a9c 100644 --- a/jf-udp-recv/src/FrameUdpReceiver.cpp +++ b/std-udp-recv/src/FrameUdpReceiver.cpp @@ -3,37 +3,32 @@ #include #include #include -#include #include "date.h" using namespace std; using namespace buffer_config; FrameUdpReceiver::FrameUdpReceiver( - const int module_id, - const uint16_t port, - const int n_modules, - const int n_submodules, - const int bit_depth): - module_id_(module_id), - bit_depth_(bit_depth), - n_packets_per_frame_(bit_depth_ * MODULE_N_PIXELS / 8 / DATA_BYTES_PER_PACKET / n_modules * n_submodules), - data_bytes_per_frame_(n_packets_per_frame_ * DATA_BYTES_PER_PACKET) + const uint16_t port, const int n_packets_per_frame) : + n_packets_per_frame_(n_packets_per_frame), + packet_buffer_(new det_packet[n_packets_per_frame_]), + recv_buff_ptr_(new iovec[n_packets_per_frame_]), + msgs_(new mmsghdr[n_packets_per_frame_]), + sock_from_(new sockaddr_in[n_packets_per_frame_]) { #ifdef DEBUG_OUTPUT using namespace date; cout << " [" << std::chrono::system_clock::now(); cout << "] [FrameUdpReceiver::FrameUdpReceiver] :"; cout << " Details of FrameUdpReceiver:"; - cout << "module_id: " << module_id_; cout << " || port: " << port; - cout << " || bit_depth : " << bit_depth_; - cout << " || n_packets_per_frame_ : " << n_packets_per_frame_; - cout << " || data_bytes_per_frame_: " << data_bytes_per_frame_ << " !!"; + cout << " || n_packets_per_frame: " << n_packets_per_frame_; cout << endl; #endif + udp_receiver_.bind(port); - for (int i = 0; i < BUFFER_UDP_N_RECV_MSG; i++) { + + for (int i = 0; i < n_packets_per_frame_; i++) { recv_buff_ptr_[i].iov_base = (void*) &(packet_buffer_[i]); recv_buff_ptr_[i].iov_len = sizeof(det_packet); @@ -46,28 +41,28 @@ FrameUdpReceiver::FrameUdpReceiver( FrameUdpReceiver::~FrameUdpReceiver() { udp_receiver_.disconnect(); + + delete[] packet_buffer_; + delete[] recv_buff_ptr_; + delete[] msgs_; + delete[] sock_from_; } inline void FrameUdpReceiver::init_frame( ModuleFrame& frame_metadata, const int i_packet) { - // Eiger has no pulse_id, frame number instead - frame_metadata.pulse_id = packet_buffer_[i_packet].framenum; + frame_metadata.pulse_id = packet_buffer_[i_packet].bunchid; frame_metadata.frame_index = packet_buffer_[i_packet].framenum; frame_metadata.daq_rec = (uint64_t) packet_buffer_[i_packet].debug; - frame_metadata.module_id = (int64_t) module_id_; - - frame_metadata.bit_depth = (int16_t) bit_depth_; - frame_metadata.row = (int16_t) packet_buffer_[i_packet].row; - frame_metadata.column = (int16_t) packet_buffer_[i_packet].column; + frame_metadata.pos_y = (int16_t) packet_buffer_[i_packet].row; + frame_metadata.pos_x = (int16_t) packet_buffer_[i_packet].column; #ifdef DEBUG_OUTPUT using namespace date; cout << " [" << std::chrono::system_clock::now(); cout << "] [FrameUdpReceiver::init_frame] :"; - cout << "module_id: " << module_id_; - cout << " || row: " << frame_metadata.row; - cout << " || column: " << frame_metadata.column; + cout << " || pos_y: " << frame_metadata.pos_x; + cout << " || pos_x: " << frame_metadata.pos_x; cout << " || pulse_id: " << frame_metadata.pulse_id; cout << " || frame_index: " << frame_metadata.frame_index; cout << endl; @@ -100,11 +95,12 @@ inline uint64_t FrameUdpReceiver::process_packets( i_packet++) { // First packet for this frame. - if (metadata.pulse_id == 0) { + if (metadata.frame_index == 0) { init_frame(metadata, i_packet); // Happens if the last packet from the previous frame gets lost. - // In the jungfrau_packet, framenum is the trigger number (how many triggers from detector power-on) happened + // In the jungfrau_packet, framenum is the trigger number + // (how many triggers from detector power-on) happened } else if (metadata.frame_index != packet_buffer_[i_packet].framenum) { packet_buffer_loaded_ = true; // Continue on this packet. @@ -122,21 +118,19 @@ inline uint64_t FrameUdpReceiver::process_packets( #ifdef DEBUG_OUTPUT using namespace date; cout << " [" << std::chrono::system_clock::now(); - cout << "] [FrameUdpReceiver::process_packets] :"; - cout << " Frame " << metadata.frame_index; - cout << " || N_RECV_PACKETS " << metadata.n_recv_packets; - cout << " || PULSE ID "<< metadata.pulse_id; - cout << " metadata.row " << metadata.row; - cout << " metadata.column " << metadata.column; + cout << "] [frameudpreceiver::process_packets] :"; + cout << " frame " << metadata.frame_index << " || "; + cout << packet_buffer_[i_packet].packetnum << " packets received."; + cout << " pulse id "<< metadata.pulse_id; cout << endl; #endif - // Buffer is loaded only if this is not the last message. + // buffer is loaded only if this is not the last message. if (i_packet+1 != packet_buffer_n_packets_) { packet_buffer_loaded_ = true; - // Continue on next packet. + // continue on next packet. packet_buffer_offset_ = i_packet + 1; - // If i_packet is the last packet the buffer is empty. + // if i_packet is the last packet the buffer is empty. } else { packet_buffer_loaded_ = false; packet_buffer_offset_ = 0; @@ -153,36 +147,31 @@ inline uint64_t FrameUdpReceiver::process_packets( } uint64_t FrameUdpReceiver::get_frame_from_udp( - ModuleFrame& metadata, char* frame_buffer) + ModuleFrame& meta, char* frame_buffer) { - // Reset the metadata and frame buffer for the next frame. - metadata.pulse_id = 0; - metadata.n_recv_packets = 0; - memset(frame_buffer, 0, data_bytes_per_frame_); - // Happens when last packet from previous frame was missed. if (packet_buffer_loaded_) { - auto pulse_id = process_packets( - packet_buffer_offset_, metadata, frame_buffer); - if (pulse_id != 0) { - return pulse_id; + auto frame_index = process_packets( + packet_buffer_offset_, meta, frame_buffer); + if (frame_index != 0) { + return frame_index; } } while (true) { packet_buffer_n_packets_ = udp_receiver_.receive_many( - msgs_, BUFFER_UDP_N_RECV_MSG); + msgs_, n_packets_per_frame_); if (packet_buffer_n_packets_ == 0) { continue; } - auto pulse_id = process_packets(0, metadata, frame_buffer); + auto frame_index = process_packets(0, meta, frame_buffer); - if (pulse_id != 0) { - return pulse_id; + if (frame_index != 0) { + return frame_index; } } } diff --git a/jf-udp-recv/src/PacketUdpReceiver.cpp b/std-udp-recv/src/PacketUdpReceiver.cpp similarity index 100% rename from jf-udp-recv/src/PacketUdpReceiver.cpp rename to std-udp-recv/src/PacketUdpReceiver.cpp diff --git a/std-udp-recv/src/main.cpp b/std-udp-recv/src/main.cpp new file mode 100644 index 0000000..c9c1a06 --- /dev/null +++ b/std-udp-recv/src/main.cpp @@ -0,0 +1,83 @@ +#include +#include +#include +#include + +#include "formats.hpp" +#include "buffer_config.hpp" +#include "FrameUdpReceiver.hpp" +#include "BufferUtils.hpp" +#include "FrameStats.hpp" +#include "UdpRecvConfig.hpp" + +using namespace std; +using namespace chrono; +using namespace buffer_config; +using namespace BufferUtils; + + +int main (int argc, char *argv[]) { + + if (argc != 4) { + cout << endl; + cout << "Usage: std_udp_recv [udp_recv_config_filename] [module_id] " + "[bit_depth]"; + cout << endl; + cout << "\tudp_recv_config_filename: detector config file path." << endl; + cout << "\tmodule_id: id of the module for this process." << endl; + cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; + cout << endl; + exit(-1); + } + + const auto config = UdpRecvConfig::from_json_file(string(argv[1])); + const int module_id = atoi(argv[2]); + const int bit_depth = atoi(argv[3]); + + if (DETECTOR_TYPE != config.detector_type) { + throw runtime_error("UDP recv version for " + DETECTOR_TYPE + + " but config for " + config.detector_type); + } + + const auto udp_port = config.start_udp_port + module_id; + const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8; + const size_t N_PACKETS_PER_FRAME = FRAME_N_BYTES / DATA_BYTES_PER_PACKET; + + FrameUdpReceiver receiver(udp_port, N_PACKETS_PER_FRAME); + RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), + FRAME_N_BYTES, config.n_modules); + FrameStats stats(config.detector_name, module_id, + N_PACKETS_PER_FRAME, STATS_TIME); + + auto ctx = zmq_ctx_new(); + auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); + + ModuleFrame meta; + meta.module_id = module_id; + meta.bit_depth = bit_depth; + + char* data = new char[FRAME_N_BYTES]; + + while (true) { + // Reset the metadata and frame buffer for the next frame. + meta.frame_index = 0; + memset(data, 0, FRAME_N_BYTES); + + receiver.get_frame_from_udp(meta, data); + + // Assign the image_id based on the detector type. +#ifdef USE_EIGER + const uint64_t image_id = meta.frame_index; +#else + const uint64_t image_id = meta.pulse_id; +#endif + meta.id = image_id; + + frame_buffer.write_frame(meta, data); + zmq_send(socket, &image_id, sizeof(image_id), 0); + + stats.record_stats(meta); + } + + delete[] data; +} diff --git a/jf-udp-recv/test/CMakeLists.txt b/std-udp-recv/test/CMakeLists.txt similarity index 100% rename from jf-udp-recv/test/CMakeLists.txt rename to std-udp-recv/test/CMakeLists.txt diff --git a/jf-udp-recv/test/main.cpp b/std-udp-recv/test/main.cpp similarity index 100% rename from jf-udp-recv/test/main.cpp rename to std-udp-recv/test/main.cpp diff --git a/jf-udp-recv/test/mock/udp.hpp b/std-udp-recv/test/mock/udp.hpp similarity index 100% rename from jf-udp-recv/test/mock/udp.hpp rename to std-udp-recv/test/mock/udp.hpp diff --git a/jf-udp-recv/test/test_FrameUdpReceiver.cpp b/std-udp-recv/test/test_FrameUdpReceiver.cpp similarity index 100% rename from jf-udp-recv/test/test_FrameUdpReceiver.cpp rename to std-udp-recv/test/test_FrameUdpReceiver.cpp diff --git a/jf-udp-recv/test/test_PacketUdpReceiver.cpp b/std-udp-recv/test/test_PacketUdpReceiver.cpp similarity index 100% rename from jf-udp-recv/test/test_PacketUdpReceiver.cpp rename to std-udp-recv/test/test_PacketUdpReceiver.cpp diff --git a/std-udp-sync/CMakeLists.txt b/std-udp-sync/CMakeLists.txt new file mode 100644 index 0000000..7466d1e --- /dev/null +++ b/std-udp-sync/CMakeLists.txt @@ -0,0 +1,22 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(std-udp-sync-lib STATIC ${SOURCES}) +target_include_directories(std-udp-sync-lib PUBLIC include/) +target_link_libraries(std-udp-sync-lib + external + core-buffer-lib) + +add_executable(std-udp-sync src/main.cpp) + +set_target_properties(std-udp-sync PROPERTIES OUTPUT_NAME std_udp_sync) +target_link_libraries(std-udp-sync + external + core-buffer-lib + std-udp-sync-lib + zmq + pthread + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/std-udp-sync/README.md b/std-udp-sync/README.md new file mode 100644 index 0000000..ce21a4d --- /dev/null +++ b/std-udp-sync/README.md @@ -0,0 +1,179 @@ +# sf-stream +sf-stream is the component that receives a live stream of frame data from +sf-buffers over ZMQ and assembles them into images. This images are then +sent again over ZMQ to external components. There is always only 1 sf-stream +per detector. + +It currently has 3 output streams: + +- **Full data full meta** rate stream (send all images and meta) +- **Reduced data full meta** rate stream (send less images, but +all meta) +- **Pulse_id** stream (send only the current pulse_id) + +In addition to receiving and assembling images, sf-stream also calculates +additional meta and constructs the structures needed to send data in +Array 1.0 protocol. + +This component does not guarantee that the streams will always contain all +the data - it can happen that frame resynchronization is needed, and in this +case 1 or more frames could potentially be lost. This happens so rarely that in +practice is not a problem. + +## Overview + +![image_stream_overview](../docs/sf_daq_buffer-overview-stream.jpg) + +sf-stream is a single threaded application (without counting the ZMQ IO threads) +that is used for providing live assembled images to anyone willing to listen. + +In addition, it also provides a pulse_id stream, which is the most immediate +pulse_id feedback we currently have in case we need to synchronize external +components to the current machine pulse_id. + +## ZMQ receiving +Each ZMQ stream is coming from a separate sf-buffer. This means that we have as +many connections as we have modules in a detector. + +Messages are multipart (2 parts) and are received in PUB/SUB mode. + +There is no need for special synchronization between modules as we expect that +frames will always be in the correct order and all modules will provide the +same frame more or less at the same time. If any of this 2 conditions is not +met, the detector is not working properly and we cannot guaranty that sf-stream +will work correctly. + +Nonetheless we provide the capability to synchronize the streams in image +assembly phase - this is needed rarely, but occasionally happens. In this sort +of hiccups we usually loose only a couple of consecutive images. + +### Messages format +Each message is composed by 2 parts: + +- Serialization of ModuleFrame in the first part. +- Frame data in the second part. + +Module frame is defined as: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) +``` + +The frame data is a 1MB (1024*512 pixels * 2 bytes/pixel) blob of data in +**uint16** representing the detector image. + +## Image assembly +We first synchronize the modules. We do this by reading all sockets and +deciding the largest frame pulse_id among them (max_pulse_id). We then calculate +the diff between a specific socket pulse_id and the max_pulse_id. +This difference tells us how many messages we need to discard from a specific socket. + +This discarding is the source of possible missing images in the output stream. +It can happen in 3 cases: + +- At least one of the detector modules did not sent any packets for the specific +pulse_id. +- All the packets from a specific module for a pulse_id were lost before UDP +receiving them. +- ZMQ HWM was reached (either on the sf-buffer or sf-stream) and the message was +dropped. + +All this 3 cases are highly unlikely, so synchronization is mostly needed when +first starting sf-stream. Different sockets connect to sf-buffers at different +times. Apart from the initial synchronization there should be no need to +re-synchronize modules in a healthy running environment. + +If an image is missing any ZMQ messages from sf-buffers (not all modules data +arrived), the image will be dropped. We do not do partial reconstruction in +sf-stream. However, it is important to note, that this does not cover the case +where frames are incomplete (missing UDP packets on sf-buffer) - we still +assemble this images as long as at least 1 packet/frame for a specific pulse_id +arrived. + +## ZMQ sending + +We devide the ZMQ sending to 3 types of stream: + +- Data processing stream. This is basically the complete stream from +the detector with all meta and data. It can be described as full data full +meta stream. Only 1 client at the time can be connected to this stream +(PUSH/PULL for load balancing). + +- Live viewing stream. This is a reduced data full meta stream. We send +meta for all frames, but data only for subset of them (10Hz, for example). +Any number of clients can connect to the 10Hz stream, because we use PUB/SUB +for this socket. + +- Pulse_id stream. This is a stream that sends out only the current pulse_id. +It can be used to synchronize any external system with the current pulse_id +being recorded. Multiple clients can connect to this stream. + +In the data processing and live viewing stream we use +[Array 1.0](https://github.com/paulscherrerinstitute/htypes/blob/master/array-1.0.md) +as our protocol to be compatible with currently available external components. + +We use following fields in the JSON header: + +| Name | Type | Comment | +| --- | --- | --- | +| pulse_id | uint64 |bunchid from detector header| +|frame|uint64|frame_index from detector header| +|is_good_frame|bool|true if all packets for this frame are present| +|daq_rec|uint32|daqrec from detector header| +|pedestal_file|string|Path to pedestal file| +|gain_file|string|Path to gain file| +|number_frames_expected|int|Number of expected frames| +|run_name|string|Name of the run| +|detector_name|string|Name of the detector| +|htype|string|Value: "array-1.0"| +|type|string|Value: "uint16"| +|shape|Array[uint64]|Shape of the image in stream| + +### Full data full meta stream + +This stream runs at detector frequency and uses PUSH/PULL to distribute data +to max 1 client (this client can have many processes, but it needs to be a +single logical entity, since the images are evenly distributed to all +connected sockets). + +![image_full_stream](../docs/sf_daq_buffer-FullStream.jpg) + +The goal here is to provide a complete copy of the detector image stream +for purposes of online analysis. Given the large amount of data on this +stream only "pre-approved" applications that can handle the load should be +attached here. + +### Reduced data full meta stream + +This streams also runs at detector frequency for JSON headers (meta), but +it sends only part of the images in the stream. The rest of the images are +sent as empty buffers (the receiver needs to be aware of this behaviour, as +Array 1.0 alone does not define it). + +![image_reduced_stream](../docs/sf_daq_buffer-ReducedStream.jpg) + +This is the lightweight version of the image stream. Any number of clients +can connect to this stream (PUB/SUB) but no client can do load +balancing automatically (it would require PUSH/PULL). + +This is a "public interface" for anyone who wants to get detector data live, +and can do with only a subset of images. + +### Pulse_id stream + +This stream runs ar detector frequency in PUB/SUB mode. The only thing it +does is sends out the pulse_id (of the just received image) in uint64_t +format. + +![image_pulse_stream](../docs/sf_daq_buffer-PulseStream.jpg) + +This is also a "public interface" for anyone who wants to get the current +system pulse_id. \ No newline at end of file diff --git a/std-udp-sync/include/SyncStats.hpp b/std-udp-sync/include/SyncStats.hpp new file mode 100644 index 0000000..18b9d1d --- /dev/null +++ b/std-udp-sync/include/SyncStats.hpp @@ -0,0 +1,27 @@ +#ifndef SF_DAQ_BUFFER_SYNCSTATS_HPP +#define SF_DAQ_BUFFER_SYNCSTATS_HPP + +#include +#include +#include + +class SyncStats { + const std::string detector_name_; + const size_t stats_modulo_; + + int image_counter_; + int n_sync_lost_images_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + SyncStats(const std::string &detector_name, + const size_t stats_modulo); + + void record_stats(const uint32_t n_lost_pulses); +}; + + +#endif //SF_DAQ_BUFFER_SYNCSTATS_HPP diff --git a/std-udp-sync/include/UdpSyncConfig.hpp b/std-udp-sync/include/UdpSyncConfig.hpp new file mode 100644 index 0000000..89d735a --- /dev/null +++ b/std-udp-sync/include/UdpSyncConfig.hpp @@ -0,0 +1,29 @@ +#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP +#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP + + +#include +#include +#include +#include +#include + +struct UdpSyncConfig { + static UdpSyncConfig from_json_file(const std::string& filename) { + std::ifstream ifs(filename); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + return { + config_parameters["detector_name"].GetString(), + config_parameters["n_modules"].GetInt() + }; + } + + const std::string detector_name; + const int n_modules; +}; + + +#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP diff --git a/std-udp-sync/include/ZmqPulseSyncReceiver.hpp b/std-udp-sync/include/ZmqPulseSyncReceiver.hpp new file mode 100644 index 0000000..26a5d5b --- /dev/null +++ b/std-udp-sync/include/ZmqPulseSyncReceiver.hpp @@ -0,0 +1,34 @@ +#ifndef SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP +#define SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP + + +#include +#include +#include + +#include "formats.hpp" + +struct PulseAndSync { + const uint64_t image_id; + const uint32_t n_lost_pulses; +}; + +class ZmqPulseSyncReceiver { + + void* ctx_; + const int n_modules_; + + std::vector sockets_; + +public: + ZmqPulseSyncReceiver( + void* ctx, + const std::string& detector_name, + const int n_modules); + ~ZmqPulseSyncReceiver(); + + PulseAndSync get_next_pulse_id() const; +}; + + +#endif //SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP diff --git a/std-udp-sync/include/sync_config.hpp b/std-udp-sync/include/sync_config.hpp new file mode 100644 index 0000000..ff8f500 --- /dev/null +++ b/std-udp-sync/include/sync_config.hpp @@ -0,0 +1,11 @@ +namespace sync_config +{ + // If the modules are offset more than 1000 pulses, crush. + const uint64_t PULSE_OFFSET_LIMIT = 100; + + // Number of times we try to re-sync in case of failure. + const int SYNC_RETRY_LIMIT = 3; + + // Number of pulses between each statistics print out. + const size_t SYNC_STATS_MODULO = 1000; +} diff --git a/std-udp-sync/src/SyncStats.cpp b/std-udp-sync/src/SyncStats.cpp new file mode 100644 index 0000000..e9bb76d --- /dev/null +++ b/std-udp-sync/src/SyncStats.cpp @@ -0,0 +1,55 @@ +#include "SyncStats.hpp" + +#include + +using namespace std; +using namespace chrono; + +SyncStats::SyncStats( + const std::string &detector_name, + const size_t stats_modulo) : + detector_name_(detector_name), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void SyncStats::reset_counters() +{ + image_counter_ = 0; + n_sync_lost_images_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void SyncStats::record_stats(const uint32_t n_lost_pulses) +{ + image_counter_++; + n_sync_lost_images_ += n_lost_pulses; + + if (image_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void SyncStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "std_udp_sync"; + cout << ",detector_name=" << detector_name_; + cout << " "; + cout << "n_processed_images=" << image_counter_ << "i"; + cout << ",n_sync_lost_images=" << n_sync_lost_images_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} + diff --git a/std-udp-sync/src/ZmqPulseSyncReceiver.cpp b/std-udp-sync/src/ZmqPulseSyncReceiver.cpp new file mode 100644 index 0000000..c8a87cb --- /dev/null +++ b/std-udp-sync/src/ZmqPulseSyncReceiver.cpp @@ -0,0 +1,127 @@ +#include "ZmqPulseSyncReceiver.hpp" +#include "BufferUtils.hpp" + +#include +#include +#include +#include +#include +#include "date.h" + +#include "sync_config.hpp" + +using namespace std; +using namespace chrono; +using namespace sync_config; + + +ZmqPulseSyncReceiver::ZmqPulseSyncReceiver( + void * ctx, + const string& detector_name, + const int n_modules) : + ctx_(ctx), + n_modules_(n_modules) +{ + sockets_.reserve(n_modules_); + + for (int i=0; i::max();; + uint64_t max_pulse_id = 0; + + for (int i = 0; i < n_modules_; i++) { + min_pulse_id = min(min_pulse_id, pulses[i]); + max_pulse_id = max(max_pulse_id, pulses[i]); + } + + auto max_diff = max_pulse_id - min_pulse_id; + if (max_diff > PULSE_OFFSET_LIMIT) { + stringstream err_msg; + err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; + err_msg << " PULSE_OFFSET_LIMIT exceeded."; + err_msg << " max_diff=" << max_diff << " pulses."; + + for (int i = 0; i < n_modules_; i++) { + err_msg << " (module " << i << ", "; + err_msg << pulses[i] << "),"; + } + err_msg << endl; + + throw runtime_error(err_msg.str()); + } + + modules_in_sync = true; + // Max pulses we lost in this sync attempt. + uint32_t i_sync_lost_pulses = 0; + for (int i = 0; i < n_modules_; i++) { + // How many pulses we lost for this specific module. + uint32_t i_module_lost_pulses = 0; + while (pulses[i] < max_pulse_id) { + zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); + i_module_lost_pulses++; + } + + i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); + + if (pulses[i] != max_pulse_id) { + modules_in_sync = false; + } + } + n_lost_pulses += i_sync_lost_pulses; + + if (modules_in_sync) { + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id]"; + cout << " modules_in_sync false"; + cout << endl; + #endif + return {pulses[0], n_lost_pulses}; + } + } + + stringstream err_msg; + err_msg << "[ZmqLiveReceiver::get_next_pulse_id]"; + err_msg << " SYNC_RETRY_LIMIT exceeded."; + err_msg << endl; + + throw runtime_error(err_msg.str()); +} diff --git a/std-udp-sync/src/main.cpp b/std-udp-sync/src/main.cpp new file mode 100644 index 0000000..1068091 --- /dev/null +++ b/std-udp-sync/src/main.cpp @@ -0,0 +1,69 @@ +#include +#include +#include +#include +#include +#include + +#include "date.h" +#include +#include "sync_config.hpp" +#include "ZmqPulseSyncReceiver.hpp" +#include "UdpSyncConfig.hpp" + + +using namespace std; +using namespace sync_config; + +#ifdef USE_EIGER + #include "eiger.hpp" +#else + #include "jungfrau.hpp" +#endif + +int main (int argc, char *argv[]) +{ + if (argc != 3) { + cout << endl; + cout << "Usage: std_udp_sync [detector_json_filename] [bit_depth]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; + cout << endl; + + exit(-1); + } + + const auto config = UdpSyncConfig::from_json_file(string(argv[1])); + const int bit_depth = atoi(argv[2]); + + const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8; + + auto ctx = zmq_ctx_new(); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, 1); + + auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "sync"); + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [Assembler] :"; + cout << " Details of Assembler:"; + cout << " detector_name: " << config.detector_name; + cout << " || n_modules: " << config.n_modules; + cout << endl; + #endif + + RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), + FRAME_N_BYTES, config.n_modules); + + ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); + SyncStats stats(config.detector_name, SYNC_STATS_MODULO); + + while (true) { + auto meta = receiver.get_next_pulse_id(); + + zmq_send(sender, &meta.image_id, sizeof(meta.image_id), 0); + + stats.record_stats(meta.n_lost_pulses); + } +} diff --git a/std-udp-sync/test/CMakeLists.txt b/std-udp-sync/test/CMakeLists.txt new file mode 100644 index 0000000..01fc383 --- /dev/null +++ b/std-udp-sync/test/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(std-udp-sync-tests main.cpp) + +target_link_libraries(std-udp-sync-tests + std-udp-sync-lib + gtest + ) + diff --git a/std-udp-sync/test/main.cpp b/std-udp-sync/test/main.cpp new file mode 100644 index 0000000..e819294 --- /dev/null +++ b/std-udp-sync/test/main.cpp @@ -0,0 +1,8 @@ +#include "gtest/gtest.h" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}