From 9d56da96f91b5af61ac2ceaa05ef3b2bfb43a6ae Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 17:59:09 +0200 Subject: [PATCH] Add stub of std-stream-send --- std-stream-send/CMakeLists.txt | 18 ++ std-stream-send/README.md | 179 +++++++++++++++++++ std-stream-send/build_stdstream.sh | 10 ++ std-stream-send/debug.Dockerfile | 13 ++ std-stream-send/include/StreamSendConfig.hpp | 33 ++++ std-stream-send/include/stream_config.hpp | 14 ++ std-stream-send/src/main.cpp | 52 ++++++ std-stream-send/test/CMakeLists.txt | 7 + std-stream-send/test/main.cpp | 8 + 9 files changed, 334 insertions(+) create mode 100644 std-stream-send/CMakeLists.txt create mode 100644 std-stream-send/README.md create mode 100644 std-stream-send/build_stdstream.sh create mode 100644 std-stream-send/debug.Dockerfile create mode 100644 std-stream-send/include/StreamSendConfig.hpp create mode 100644 std-stream-send/include/stream_config.hpp create mode 100644 std-stream-send/src/main.cpp create mode 100644 std-stream-send/test/CMakeLists.txt create mode 100644 std-stream-send/test/main.cpp diff --git a/std-stream-send/CMakeLists.txt b/std-stream-send/CMakeLists.txt new file mode 100644 index 0000000..3d8b419 --- /dev/null +++ b/std-stream-send/CMakeLists.txt @@ -0,0 +1,18 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(std-stream-send-lib STATIC ${SOURCES}) +target_include_directories(std-stream-send-lib PUBLIC include/) +target_link_libraries(std-stream-send-lib + core-buffer-lib) + +add_executable(std-stream-send src/main.cpp) +set_target_properties(std-stream-send PROPERTIES OUTPUT_NAME std_stream_send) +target_link_libraries(std-stream-send + std-stream-send-lib + zmq + pthread + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/std-stream-send/README.md b/std-stream-send/README.md new file mode 100644 index 0000000..ce21a4d --- /dev/null +++ b/std-stream-send/README.md @@ -0,0 +1,179 @@ +# sf-stream +sf-stream is the component that receives a live stream of frame data from +sf-buffers over ZMQ and assembles them into images. This images are then +sent again over ZMQ to external components. There is always only 1 sf-stream +per detector. + +It currently has 3 output streams: + +- **Full data full meta** rate stream (send all images and meta) +- **Reduced data full meta** rate stream (send less images, but +all meta) +- **Pulse_id** stream (send only the current pulse_id) + +In addition to receiving and assembling images, sf-stream also calculates +additional meta and constructs the structures needed to send data in +Array 1.0 protocol. + +This component does not guarantee that the streams will always contain all +the data - it can happen that frame resynchronization is needed, and in this +case 1 or more frames could potentially be lost. This happens so rarely that in +practice is not a problem. + +## Overview + +![image_stream_overview](../docs/sf_daq_buffer-overview-stream.jpg) + +sf-stream is a single threaded application (without counting the ZMQ IO threads) +that is used for providing live assembled images to anyone willing to listen. + +In addition, it also provides a pulse_id stream, which is the most immediate +pulse_id feedback we currently have in case we need to synchronize external +components to the current machine pulse_id. + +## ZMQ receiving +Each ZMQ stream is coming from a separate sf-buffer. This means that we have as +many connections as we have modules in a detector. + +Messages are multipart (2 parts) and are received in PUB/SUB mode. + +There is no need for special synchronization between modules as we expect that +frames will always be in the correct order and all modules will provide the +same frame more or less at the same time. If any of this 2 conditions is not +met, the detector is not working properly and we cannot guaranty that sf-stream +will work correctly. + +Nonetheless we provide the capability to synchronize the streams in image +assembly phase - this is needed rarely, but occasionally happens. In this sort +of hiccups we usually loose only a couple of consecutive images. + +### Messages format +Each message is composed by 2 parts: + +- Serialization of ModuleFrame in the first part. +- Frame data in the second part. + +Module frame is defined as: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) +``` + +The frame data is a 1MB (1024*512 pixels * 2 bytes/pixel) blob of data in +**uint16** representing the detector image. + +## Image assembly +We first synchronize the modules. We do this by reading all sockets and +deciding the largest frame pulse_id among them (max_pulse_id). We then calculate +the diff between a specific socket pulse_id and the max_pulse_id. +This difference tells us how many messages we need to discard from a specific socket. + +This discarding is the source of possible missing images in the output stream. +It can happen in 3 cases: + +- At least one of the detector modules did not sent any packets for the specific +pulse_id. +- All the packets from a specific module for a pulse_id were lost before UDP +receiving them. +- ZMQ HWM was reached (either on the sf-buffer or sf-stream) and the message was +dropped. + +All this 3 cases are highly unlikely, so synchronization is mostly needed when +first starting sf-stream. Different sockets connect to sf-buffers at different +times. Apart from the initial synchronization there should be no need to +re-synchronize modules in a healthy running environment. + +If an image is missing any ZMQ messages from sf-buffers (not all modules data +arrived), the image will be dropped. We do not do partial reconstruction in +sf-stream. However, it is important to note, that this does not cover the case +where frames are incomplete (missing UDP packets on sf-buffer) - we still +assemble this images as long as at least 1 packet/frame for a specific pulse_id +arrived. + +## ZMQ sending + +We devide the ZMQ sending to 3 types of stream: + +- Data processing stream. This is basically the complete stream from +the detector with all meta and data. It can be described as full data full +meta stream. Only 1 client at the time can be connected to this stream +(PUSH/PULL for load balancing). + +- Live viewing stream. This is a reduced data full meta stream. We send +meta for all frames, but data only for subset of them (10Hz, for example). +Any number of clients can connect to the 10Hz stream, because we use PUB/SUB +for this socket. + +- Pulse_id stream. This is a stream that sends out only the current pulse_id. +It can be used to synchronize any external system with the current pulse_id +being recorded. Multiple clients can connect to this stream. + +In the data processing and live viewing stream we use +[Array 1.0](https://github.com/paulscherrerinstitute/htypes/blob/master/array-1.0.md) +as our protocol to be compatible with currently available external components. + +We use following fields in the JSON header: + +| Name | Type | Comment | +| --- | --- | --- | +| pulse_id | uint64 |bunchid from detector header| +|frame|uint64|frame_index from detector header| +|is_good_frame|bool|true if all packets for this frame are present| +|daq_rec|uint32|daqrec from detector header| +|pedestal_file|string|Path to pedestal file| +|gain_file|string|Path to gain file| +|number_frames_expected|int|Number of expected frames| +|run_name|string|Name of the run| +|detector_name|string|Name of the detector| +|htype|string|Value: "array-1.0"| +|type|string|Value: "uint16"| +|shape|Array[uint64]|Shape of the image in stream| + +### Full data full meta stream + +This stream runs at detector frequency and uses PUSH/PULL to distribute data +to max 1 client (this client can have many processes, but it needs to be a +single logical entity, since the images are evenly distributed to all +connected sockets). + +![image_full_stream](../docs/sf_daq_buffer-FullStream.jpg) + +The goal here is to provide a complete copy of the detector image stream +for purposes of online analysis. Given the large amount of data on this +stream only "pre-approved" applications that can handle the load should be +attached here. + +### Reduced data full meta stream + +This streams also runs at detector frequency for JSON headers (meta), but +it sends only part of the images in the stream. The rest of the images are +sent as empty buffers (the receiver needs to be aware of this behaviour, as +Array 1.0 alone does not define it). + +![image_reduced_stream](../docs/sf_daq_buffer-ReducedStream.jpg) + +This is the lightweight version of the image stream. Any number of clients +can connect to this stream (PUB/SUB) but no client can do load +balancing automatically (it would require PUSH/PULL). + +This is a "public interface" for anyone who wants to get detector data live, +and can do with only a subset of images. + +### Pulse_id stream + +This stream runs ar detector frequency in PUB/SUB mode. The only thing it +does is sends out the pulse_id (of the just received image) in uint64_t +format. + +![image_pulse_stream](../docs/sf_daq_buffer-PulseStream.jpg) + +This is also a "public interface" for anyone who wants to get the current +system pulse_id. \ No newline at end of file diff --git a/std-stream-send/build_stdstream.sh b/std-stream-send/build_stdstream.sh new file mode 100644 index 0000000..d954ab2 --- /dev/null +++ b/std-stream-send/build_stdstream.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +VERSION=1.0.0 + +docker build --no-cache=true -f debug.Dockerfile -t paulscherrerinstitute/std-stream-send-sim . +docker tag paulscherrerinstitute/std-stream-send-sim paulscherrerinstitute/std-stream-send-sim:$VERSION + +docker login +docker push paulscherrerinstitute/std-stream-send-sim:$VERSION +docker push paulscherrerinstitute/std-stream-send-sim \ No newline at end of file diff --git a/std-stream-send/debug.Dockerfile b/std-stream-send/debug.Dockerfile new file mode 100644 index 0000000..97a772a --- /dev/null +++ b/std-stream-send/debug.Dockerfile @@ -0,0 +1,13 @@ +FROM paulscherrerinstitute/sf-daq_phdf5:1.0.0 + +COPY . /sf_daq_buffer/ + +RUN mkdir /sf_daq_buffer/build && \ + cd /sf_daq_buffer/build && \ + cmake3 .. && \ + make std-stream-send + +WORKDIR /sf_daq_buffer/build + +ENTRYPOINT ["/sf_daq_buffer/build/std_stream"] + diff --git a/std-stream-send/include/StreamSendConfig.hpp b/std-stream-send/include/StreamSendConfig.hpp new file mode 100644 index 0000000..36ecd2c --- /dev/null +++ b/std-stream-send/include/StreamSendConfig.hpp @@ -0,0 +1,33 @@ +#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP +#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP + + +#include +#include +#include +#include +#include + +struct StreamSendConfig { + static StreamSendConfig from_json_file(const std::string& filename) { + std::ifstream ifs(filename); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + return { + config_parameters["detector_name"].GetString(), + config_parameters["detector_type"].GetString(), + config_parameters["n_modules"].GetInt(), + config_parameters["start_udp_port"].GetInt(), + }; + } + + const std::string detector_name; + const std::string detector_type; + const int n_modules; + const int start_udp_port; +}; + + +#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP diff --git a/std-stream-send/include/stream_config.hpp b/std-stream-send/include/stream_config.hpp new file mode 100644 index 0000000..a0f1d72 --- /dev/null +++ b/std-stream-send/include/stream_config.hpp @@ -0,0 +1,14 @@ +namespace stream_config +{ + // N of IO threads to receive data from modules. + const int STREAM_ZMQ_IO_THREADS = 1; + // How long should the RECV queue be. + const size_t STREAM_RCVHWM = 100; + + const int PROCESSING_ZMQ_SNDHWM = 10; + // Keep the last second of pulses in the buffer. + const int PULSE_ZMQ_SNDHWM = 100; + + // Number of pulses between each statistics print out. + const size_t STREAM_STATS_MODULO = 1000; +} diff --git a/std-stream-send/src/main.cpp b/std-stream-send/src/main.cpp new file mode 100644 index 0000000..c832578 --- /dev/null +++ b/std-stream-send/src/main.cpp @@ -0,0 +1,52 @@ +#include +#include +#include "stream_config.hpp" +#include +#include +#include "RamBuffer.hpp" + + +using namespace std; +using namespace stream_config; + +int main (int argc, char *argv[]) +{ + auto ctx = zmq_ctx_new(); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); + + auto sender = zmq_socket(ctx, ZMQ_PUSH); + const int sndhwm = PROCESSING_ZMQ_SNDHWM; + if (zmq_setsockopt( + sender, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + const int linger = 0; + if (zmq_setsockopt( + sender, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_bind(sender, "tcp://127.0.0.1:10000") != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + RamBuffer image_buffer(config.detector_name + "_assembler", + sizeof(ImageMetadata), assembler.get_image_n_bytes(), 1); + + while (true) { + + image_id = 123; + + zmq_send(sender, + ram_buffer.get_slot_meta(image_id), + sizeof(ImageMetadata), ZMQ_SNDMORE); + + zmq_send(sender, + ram_buffer.get_slot_data(image_id), + buffer_config::MODULE_N_BYTES * 4, 0); + + pulse_id++; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } +} diff --git a/std-stream-send/test/CMakeLists.txt b/std-stream-send/test/CMakeLists.txt new file mode 100644 index 0000000..2778a7a --- /dev/null +++ b/std-stream-send/test/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(std-stream-send-tests main.cpp) + +target_link_libraries(std-stream-send-tests + std-stream-send-lib + gtest + ) + diff --git a/std-stream-send/test/main.cpp b/std-stream-send/test/main.cpp new file mode 100644 index 0000000..e819294 --- /dev/null +++ b/std-stream-send/test/main.cpp @@ -0,0 +1,8 @@ +#include "gtest/gtest.h" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}