Conflicts:
	jf-assembler/src/main.cpp
	jf-udp-recv/src/main.cpp
	std-udp-recv/src/FrameUdpReceiver.cpp
This commit is contained in:
lhdamiani
2021-07-05 22:13:15 +02:00
43 changed files with 1137 additions and 168 deletions
+7 -8
View File
@@ -31,13 +31,12 @@ add_subdirectory(
EXCLUDE_FROM_ALL)
add_subdirectory("core-buffer")
add_subdirectory("jf-udp-recv")
add_subdirectory("jf-buffer-writer")
#add_subdirectory("std-stream-send")
add_subdirectory("std-udp-recv")
add_subdirectory("std-udp-sync")
#add_subdirectory("jf-buffer-writer")
add_subdirectory("jf-assembler")
add_subdirectory("sf-stream")
add_subdirectory("sf-writer")
if(BUILD_JF_LIVE_WRITER)
add_subdirectory("jf-live-writer")
endif()
#add_subdirectory("sf-stream")
#add_subdirectory("sf-writer")
add_subdirectory("jf-live-writer")
+1
View File
@@ -3,6 +3,7 @@
#include <string>
#include "formats.hpp"
#include "buffer_config.hpp"
class RamBuffer {
const std::string buffer_name_;
-1
View File
@@ -24,7 +24,6 @@ namespace buffer_config {
const size_t BUFFER_BLOCK_SIZE = 100;
const size_t BUFFER_UDP_N_RECV_MSG = 128;
// Size of UDP recv buffer
const int BUFFER_UDP_RCVBUF_N_SLOTS = 100;
// 8246 bytes for each UDP packet.
+5 -2
View File
@@ -4,6 +4,10 @@
#include <cstdint>
#include <stdint.h>
#define IS_BOTTOM(n) ((n%2 != 0) ? -1 : 1)
const std::string DETECTOR_TYPE = "eiger";
#define N_MODULES 1
#define BYTES_PER_PACKET 4144
#define DATA_BYTES_PER_PACKET 4096
@@ -20,7 +24,6 @@
#define MODULE_Y_SIZE 512
#define MODULE_N_PIXELS 131072
#define PIXEL_N_BYTES 2
#define MODULE_N_BYTES 262144
#define GAP_X_MODULE_PIXELS 2
#define GAP_Y_MODULE_PIXELS 2
#define GAP_X_EIGERMOD_PIXELS 8
@@ -28,7 +31,7 @@
#define N_BYTES_PER_MODULE_LINE(bit_depth) ((MODULE_X_SIZE * bit_depth) / 8)
#define N_BYTES_PER_MODULE_FRAME(bit_depth) ((131072 * bit_depth) / 8)
#define N_BYTES_PER_MODULE_FRAME(bit_depth) ((MODULE_N_PIXELS * bit_depth) / 8)
// #define N_BYTES_PER_IMAGE_LINE(bit_depth, n_submodules) ((n_submodules / 2 * MODULE_X_SIZE * bit_depth) / 8)
+12 -37
View File
@@ -1,27 +1,18 @@
#ifndef SF_DAQ_BUFFER_FORMATS_HPP
#define SF_DAQ_BUFFER_FORMATS_HPP
#include "buffer_config.hpp"
#ifndef USE_EIGER
#include "jungfrau.hpp"
#else
#include "eiger.hpp"
#endif
#define IS_BOTTOM(n) ((n%2 != 0) ? -1 : 1)
#pragma pack(push)
#pragma pack(1)
struct ModuleFrame {
uint64_t id;
uint64_t pulse_id;
uint64_t frame_index;
uint64_t daq_rec;
uint64_t n_recv_packets;
uint64_t module_id;
uint16_t bit_depth;
uint16_t row;
uint16_t column;
uint16_t pos_y;
uint16_t pos_x;
};
#pragma pack(pop)
@@ -29,33 +20,17 @@ struct ModuleFrame {
#pragma pack(push)
#pragma pack(1)
struct ImageMetadata {
uint64_t pulse_id;
uint64_t frame_index;
uint32_t daq_rec;
uint32_t is_good_image;
uint64_t id;
uint64_t height;
uint64_t width;
uint64_t dtype;
uint64_t encoding;
uint64_t source_id;
uint64_t status;
uint64_t user_1;
uint64_t user_2;
};
#pragma pack(pop)
struct ModuleFrameBuffer {
ModuleFrame module[N_MODULES];
};
#pragma pack(push)
#pragma pack(1)
struct BufferBinaryFormat {
const char FORMAT_MARKER = 0xBE;
ModuleFrame meta;
char data[MODULE_N_BYTES];
};
#pragma pack(pop)
#pragma pack(push)
#pragma pack(1)
struct BufferBinaryBlock
{
BufferBinaryFormat frame[buffer_config::BUFFER_BLOCK_SIZE];
uint64_t start_pulse_id;
};
#pragma pack(pop)
#endif //SF_DAQ_BUFFER_FORMATS_HPP
+2 -1
View File
@@ -3,6 +3,8 @@
#include <cstdint>
const std::string DETECTOR_TYPE = "jungfrau";
#define N_MODULES 32
#define BYTES_PER_PACKET 8240
#define DATA_BYTES_PER_PACKET 8192
@@ -40,5 +42,4 @@ struct det_packet {
};
#pragma pack(pop)
#endif
+1 -1
View File
@@ -62,7 +62,7 @@ int main (int argc, char *argv[])
// Fair distribution of images among writers.
if (meta.i_image % n_writers == i_writer) {
char* data = ram_buffer.read_image(meta.image_metadata.pulse_id);
char* data = ram_buffer.get_slot_data(meta.image_metadata.pulse_id);
stats.start_image_write();
writer.write_data(meta.run_id, meta.i_image, data);
-26
View File
@@ -1,26 +0,0 @@
file(GLOB SOURCES
src/*.cpp)
add_library(jf-udp-recv-lib STATIC ${SOURCES})
target_include_directories(jf-udp-recv-lib PUBLIC include/)
target_link_libraries(jf-udp-recv-lib
external
core-buffer-lib)
add_executable(jf-udp-recv src/main.cpp)
if (USE_EIGER)
set (LIB_NAME_UDP_RECV "eiger_udp_recv")
else()
set (LIB_NAME_UDP_RECV "jf_udp_recv")
endif()
set_target_properties(jf-udp-recv PROPERTIES OUTPUT_NAME ${LIB_NAME_UDP_RECV})
target_link_libraries(jf-udp-recv
jf-udp-recv-lib
zmq
rt)
enable_testing()
add_subdirectory(test/)
+18
View File
@@ -0,0 +1,18 @@
# Collect all implementation files of the std-stream-send component.
file(GLOB SOURCES
src/*.cpp)
# Static library with the component logic, reused by the executable and tests.
add_library(std-stream-send-lib STATIC ${SOURCES})
target_include_directories(std-stream-send-lib PUBLIC include/)
# Depends on the shared core-buffer library (RamBuffer, formats, ...).
target_link_libraries(std-stream-send-lib
core-buffer-lib)
# The actual service binary; installed/invoked as "std_stream_send".
add_executable(std-stream-send src/main.cpp)
set_target_properties(std-stream-send PROPERTIES OUTPUT_NAME std_stream_send)
# zmq: message transport; pthread/rt: threading and shared-memory (shm_open) support.
target_link_libraries(std-stream-send
std-stream-send-lib
zmq
pthread
rt)
# Unit tests live under test/ and are registered with CTest.
enable_testing()
add_subdirectory(test/)
+179
View File
@@ -0,0 +1,179 @@
# sf-stream
sf-stream is the component that receives a live stream of frame data from
sf-buffers over ZMQ and assembles them into images. These images are then
sent again over ZMQ to external components. There is always only 1 sf-stream
per detector.
It currently has 3 output streams:
- **Full data full meta** rate stream (send all images and meta)
- **Reduced data full meta** rate stream (send less images, but
all meta)
- **Pulse_id** stream (send only the current pulse_id)
In addition to receiving and assembling images, sf-stream also calculates
additional meta and constructs the structures needed to send data in
Array 1.0 protocol.
This component does not guarantee that the streams will always contain all
the data - it can happen that frame resynchronization is needed, and in this
case 1 or more frames could potentially be lost. This happens so rarely that in
practice is not a problem.
## Overview
![image_stream_overview](../docs/sf_daq_buffer-overview-stream.jpg)
sf-stream is a single threaded application (without counting the ZMQ IO threads)
that is used for providing live assembled images to anyone willing to listen.
In addition, it also provides a pulse_id stream, which is the most immediate
pulse_id feedback we currently have in case we need to synchronize external
components to the current machine pulse_id.
## ZMQ receiving
Each ZMQ stream is coming from a separate sf-buffer. This means that we have as
many connections as we have modules in a detector.
Messages are multipart (2 parts) and are received in PUB/SUB mode.
There is no need for special synchronization between modules as we expect that
frames will always be in the correct order and all modules will provide the
same frame more or less at the same time. If either of these two conditions is not
met, the detector is not working properly and we cannot guarantee that sf-stream
will work correctly.
Nonetheless we provide the capability to synchronize the streams in image
assembly phase - this is needed rarely, but occasionally happens. In this sort
of hiccups we usually lose only a couple of consecutive images.
### Messages format
Each message is composed by 2 parts:
- Serialization of ModuleFrame in the first part.
- Frame data in the second part.
Module frame is defined as:
```c++
#pragma pack(push)
#pragma pack(1)
struct ModuleFrame {
uint64_t pulse_id;
uint64_t frame_index;
uint64_t daq_rec;
uint64_t n_recv_packets;
uint64_t module_id;
};
#pragma pack(pop)
```
The frame data is a 1MB (1024*512 pixels * 2 bytes/pixel) blob of data in
**uint16** representing the detector image.
## Image assembly
We first synchronize the modules. We do this by reading all sockets and
deciding the largest frame pulse_id among them (max_pulse_id). We then calculate
the diff between a specific socket pulse_id and the max_pulse_id.
This difference tells us how many messages we need to discard from a specific socket.
This discarding is the source of possible missing images in the output stream.
It can happen in 3 cases:
- At least one of the detector modules did not send any packets for the specific
pulse_id.
- All the packets from a specific module for a pulse_id were lost before UDP
receiving them.
- ZMQ HWM was reached (either on the sf-buffer or sf-stream) and the message was
dropped.
All three of these cases are highly unlikely, so synchronization is mostly needed when
first starting sf-stream. Different sockets connect to sf-buffers at different
times. Apart from the initial synchronization there should be no need to
re-synchronize modules in a healthy running environment.
If an image is missing any ZMQ messages from sf-buffers (not all modules data
arrived), the image will be dropped. We do not do partial reconstruction in
sf-stream. However, it is important to note, that this does not cover the case
where frames are incomplete (missing UDP packets on sf-buffer) - we still
assemble these images as long as at least 1 packet/frame for a specific pulse_id
arrived.
## ZMQ sending
We divide the ZMQ sending into 3 types of streams:
- Data processing stream. This is basically the complete stream from
the detector with all meta and data. It can be described as full data full
meta stream. Only 1 client at the time can be connected to this stream
(PUSH/PULL for load balancing).
- Live viewing stream. This is a reduced data full meta stream. We send
meta for all frames, but data only for subset of them (10Hz, for example).
Any number of clients can connect to the 10Hz stream, because we use PUB/SUB
for this socket.
- Pulse_id stream. This is a stream that sends out only the current pulse_id.
It can be used to synchronize any external system with the current pulse_id
being recorded. Multiple clients can connect to this stream.
In the data processing and live viewing stream we use
[Array 1.0](https://github.com/paulscherrerinstitute/htypes/blob/master/array-1.0.md)
as our protocol to be compatible with currently available external components.
We use following fields in the JSON header:
| Name | Type | Comment |
| --- | --- | --- |
| pulse_id | uint64 |bunchid from detector header|
|frame|uint64|frame_index from detector header|
|is_good_frame|bool|true if all packets for this frame are present|
|daq_rec|uint32|daqrec from detector header|
|pedestal_file|string|Path to pedestal file|
|gain_file|string|Path to gain file|
|number_frames_expected|int|Number of expected frames|
|run_name|string|Name of the run|
|detector_name|string|Name of the detector|
|htype|string|Value: "array-1.0"|
|type|string|Value: "uint16"|
|shape|Array[uint64]|Shape of the image in stream|
### Full data full meta stream
This stream runs at detector frequency and uses PUSH/PULL to distribute data
to max 1 client (this client can have many processes, but it needs to be a
single logical entity, since the images are evenly distributed to all
connected sockets).
![image_full_stream](../docs/sf_daq_buffer-FullStream.jpg)
The goal here is to provide a complete copy of the detector image stream
for purposes of online analysis. Given the large amount of data on this
stream only "pre-approved" applications that can handle the load should be
attached here.
### Reduced data full meta stream
This streams also runs at detector frequency for JSON headers (meta), but
it sends only part of the images in the stream. The rest of the images are
sent as empty buffers (the receiver needs to be aware of this behaviour, as
Array 1.0 alone does not define it).
![image_reduced_stream](../docs/sf_daq_buffer-ReducedStream.jpg)
This is the lightweight version of the image stream. Any number of clients
can connect to this stream (PUB/SUB) but no client can do load
balancing automatically (it would require PUSH/PULL).
This is a "public interface" for anyone who wants to get detector data live,
and can do with only a subset of images.
### Pulse_id stream
This stream runs at detector frequency in PUB/SUB mode. The only thing it
does is send out the pulse_id (of the just received image) in uint64_t
format.
![image_pulse_stream](../docs/sf_daq_buffer-PulseStream.jpg)
This is also a "public interface" for anyone who wants to get the current
system pulse_id.
+10
View File
@@ -0,0 +1,10 @@
#!/bin/bash
# Build and publish the std-stream-send simulator Docker image.
#
# -e: abort on the first failing command so a broken build is never
#     tagged or pushed; -u: treat unset variables as errors;
# -o pipefail: a failure anywhere in a pipeline fails the pipeline.
set -euo pipefail

VERSION=1.0.0

# --no-cache guarantees the image is rebuilt from the current sources.
docker build --no-cache=true -f debug.Dockerfile -t paulscherrerinstitute/std-stream-send-sim .
docker tag paulscherrerinstitute/std-stream-send-sim paulscherrerinstitute/std-stream-send-sim:$VERSION

docker login
# Push both the versioned tag and the implicit "latest" tag.
docker push paulscherrerinstitute/std-stream-send-sim:$VERSION
docker push paulscherrerinstitute/std-stream-send-sim
+13
View File
@@ -0,0 +1,13 @@
FROM paulscherrerinstitute/sf-daq_phdf5:1.0.0

COPY . /sf_daq_buffer/

# Build only the std-stream-send target inside the image.
RUN mkdir /sf_daq_buffer/build && \
    cd /sf_daq_buffer/build && \
    cmake3 .. && \
    make std-stream-send

WORKDIR /sf_daq_buffer/build

# BUG FIX: CMake sets OUTPUT_NAME to "std_stream_send"; the previous
# entrypoint "/sf_daq_buffer/build/std_stream" does not exist in the image.
ENTRYPOINT ["/sf_daq_buffer/build/std_stream_send"]
@@ -0,0 +1,33 @@
// BUG FIX: the original guard SF_DAQ_BUFFER_UDPRECVCONFIG_HPP was copied from
// UdpRecvConfig.hpp; including both headers in one TU would silently drop one
// of the structs. Use a guard unique to this header.
#ifndef SF_DAQ_BUFFER_STREAMSENDCONFIG_HPP
#define SF_DAQ_BUFFER_STREAMSENDCONFIG_HPP

#include <rapidjson/istreamwrapper.h>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>

#include <string>
#include <fstream>
#include <stdexcept>

// Runtime configuration of the std-stream-send service, loaded from a JSON
// file with the keys: detector_name, detector_type, n_modules, start_udp_port.
struct StreamSendConfig {

    // Parse the given JSON file and construct the configuration.
    // Throws std::runtime_error if the file cannot be opened.
    // NOTE(review): missing or mistyped JSON keys are not validated here —
    // rapidjson will assert/UB on absent members; confirm inputs upstream.
    static StreamSendConfig from_json_file(const std::string& filename) {
        std::ifstream ifs(filename);
        if (!ifs.is_open()) {
            throw std::runtime_error("Cannot open config file: " + filename);
        }
        rapidjson::IStreamWrapper isw(ifs);

        rapidjson::Document config_parameters;
        config_parameters.ParseStream(isw);

        return {
                config_parameters["detector_name"].GetString(),
                config_parameters["detector_type"].GetString(),
                config_parameters["n_modules"].GetInt(),
                config_parameters["start_udp_port"].GetInt(),
        };
    }

    const std::string detector_name;   // e.g. shared-memory / stream prefix
    const std::string detector_type;   // "eiger" or "jungfrau"
    const int n_modules;               // number of detector modules
    const int start_udp_port;          // first UDP port; module i uses start+i
};

#endif //SF_DAQ_BUFFER_STREAMSENDCONFIG_HPP
+14
View File
@@ -0,0 +1,14 @@
namespace stream_config
{
    // Number of ZMQ IO threads dedicated to receiving module data.
    constexpr int STREAM_ZMQ_IO_THREADS = 1;
    // Length of the receive queue (high-water mark) per socket.
    constexpr size_t STREAM_RCVHWM = 100;
    // Send high-water mark of the full-data processing stream.
    constexpr int PROCESSING_ZMQ_SNDHWM = 10;
    // Send high-water mark of the pulse_id stream — roughly the last
    // second worth of pulses.
    constexpr int PULSE_ZMQ_SNDHWM = 100;
    // Emit a statistics line once every this many pulses.
    constexpr size_t STREAM_STATS_MODULO = 1000;
}
+52
View File
@@ -0,0 +1,52 @@
#include <iostream>
#include <zmq.h>
#include "stream_config.hpp"
#include <chrono>
#include <thread>
#include "RamBuffer.hpp"
using namespace std;
using namespace stream_config;
int main (int argc, char *argv[])
{
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS);
auto sender = zmq_socket(ctx, ZMQ_PUSH);
const int sndhwm = PROCESSING_ZMQ_SNDHWM;
if (zmq_setsockopt(
sender, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
const int linger = 0;
if (zmq_setsockopt(
sender, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
if (zmq_bind(sender, "tcp://127.0.0.1:10000") != 0) {
throw runtime_error(zmq_strerror(errno));
}
RamBuffer image_buffer(config.detector_name + "_assembler",
sizeof(ImageMetadata), assembler.get_image_n_bytes(), 1);
while (true) {
image_id = 123;
zmq_send(sender,
ram_buffer.get_slot_meta(image_id),
sizeof(ImageMetadata), ZMQ_SNDMORE);
zmq_send(sender,
ram_buffer.get_slot_data(image_id),
buffer_config::MODULE_N_BYTES * 4, 0);
pulse_id++;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
+7
View File
@@ -0,0 +1,7 @@
add_executable(std-stream-send-tests main.cpp)
target_link_libraries(std-stream-send-tests
std-stream-send-lib
gtest
)
+8
View File
@@ -0,0 +1,8 @@
#include "gtest/gtest.h"

// Test-runner entry point: hand command-line flags to googletest and run
// every registered test case. (Removed an unused `using namespace std;`.)
int main(int argc, char **argv) {
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}
+20
View File
@@ -0,0 +1,20 @@
# Collect all implementation files of the std-udp-recv component.
file(GLOB SOURCES
src/*.cpp)
# Static library with the receiver logic, shared by the executable and tests.
add_library(std-udp-recv-lib STATIC ${SOURCES})
target_include_directories(std-udp-recv-lib PUBLIC include/)
# external: bundled third-party headers; core-buffer-lib: RamBuffer/formats.
target_link_libraries(std-udp-recv-lib
external
core-buffer-lib)
# The service binary; installed/invoked as "std_udp_recv".
add_executable(std-udp-recv src/main.cpp)
set_target_properties(std-udp-recv PROPERTIES OUTPUT_NAME std_udp_recv)
# zmq: message transport; rt: POSIX shared memory (shm_open) support.
target_link_libraries(std-udp-recv
std-udp-recv-lib
zmq
rt)
# Unit tests live under test/ and are registered with CTest.
enable_testing()
add_subdirectory(test/)
@@ -8,28 +8,24 @@
class FrameStats {
const std::string detector_name_;
const int n_modules_;
const int module_id_;
const int bit_depth_;
const int n_packets_per_frame_;
size_t stats_time_;
const size_t stats_time_;
int frames_counter_;
int n_missed_packets_;
int n_corrupted_frames_;
int n_corrupted_pulse_id_;
std::chrono::time_point<std::chrono::steady_clock> stats_interval_start_;
void reset_counters();
void print_stats();
public:////config.detector_name, n_receivers, module_id, bit_depth, STATS_TIME
FrameStats(const std::string &detector_name,
const int n_modules,
public:
FrameStats(std::string detector_name,
const int module_id,
const int bit_depth,
const int n_packets_per_frame,
const size_t stats_time);
void record_stats(const ModuleFrame &meta, const bool bad_pulse_id);
void record_stats(const ModuleFrame &meta);
};
@@ -6,18 +6,20 @@
#include "formats.hpp"
#include "buffer_config.hpp"
#ifdef USE_EIGER
#include "eiger.hpp"
#else
#include "jungfrau.hpp"
#endif
class FrameUdpReceiver {
const int module_id_;
const int bit_depth_;
const size_t n_packets_per_frame_;
const size_t data_bytes_per_frame_;
PacketUdpReceiver udp_receiver_;
const int n_packets_per_frame_;
det_packet packet_buffer_[buffer_config::BUFFER_UDP_N_RECV_MSG];
iovec recv_buff_ptr_[buffer_config::BUFFER_UDP_N_RECV_MSG];
mmsghdr msgs_[buffer_config::BUFFER_UDP_N_RECV_MSG];
sockaddr_in sock_from_[buffer_config::BUFFER_UDP_N_RECV_MSG];
det_packet* const packet_buffer_;
iovec* const recv_buff_ptr_;
mmsghdr* const msgs_;
sockaddr_in* const sock_from_;
bool packet_buffer_loaded_ = false;
int packet_buffer_n_packets_ = 0;
@@ -30,9 +32,9 @@ class FrameUdpReceiver {
const int n_packets, ModuleFrame& metadata, char* frame_buffer);
public:
FrameUdpReceiver(const int module_id, const uint16_t port, const int n_modules, const int n_submodules, const int bit_depth);
FrameUdpReceiver(const uint16_t port, const int n_packets_per_frame);
virtual ~FrameUdpReceiver();
uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer);
uint64_t get_frame_from_udp(ModuleFrame& meta, char* frame_buffer);
};
+33
View File
@@ -0,0 +1,33 @@
#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP
#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP

#include <rapidjson/istreamwrapper.h>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>

#include <string>
#include <fstream>

// Detector-level configuration consumed by the std-udp-recv processes,
// loaded from a JSON file with the keys: detector_name, detector_type,
// n_modules, start_udp_port.
struct UdpRecvConfig {

    // Read and parse the given JSON file into a config instance.
    static UdpRecvConfig from_json_file(const std::string& filename) {
        std::ifstream input_stream(filename);
        rapidjson::IStreamWrapper stream_wrapper(input_stream);

        rapidjson::Document document;
        document.ParseStream(stream_wrapper);

        return UdpRecvConfig{
                document["detector_name"].GetString(),
                document["detector_type"].GetString(),
                document["n_modules"].GetInt(),
                document["start_udp_port"].GetInt()};
    }

    const std::string detector_name;   // shared-memory / stream prefix
    const std::string detector_type;   // "eiger" or "jungfrau"
    const int n_modules;               // number of detector modules
    const int start_udp_port;          // first UDP port; module i uses start+i
};

#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP
@@ -1,20 +1,18 @@
#include <iostream>
#include <utility>
#include "FrameStats.hpp"
#include "date.h"
using namespace std;
using namespace chrono;
FrameStats::FrameStats(
const std::string &detector_name,
const int n_modules,
string detector_name,
const int module_id,
const int bit_depth,
const int n_packets_per_frame,
const size_t stats_time) :
detector_name_(detector_name),
n_modules_(n_modules),
detector_name_(move(detector_name)),
module_id_(module_id),
bit_depth_(bit_depth),
n_packets_per_frame_(bit_depth_ * MODULE_N_PIXELS / 8 / DATA_BYTES_PER_PACKET / n_modules),
n_packets_per_frame_(n_packets_per_frame),
stats_time_(stats_time)
{
reset_counters();
@@ -25,36 +23,31 @@ void FrameStats::reset_counters()
frames_counter_ = 0;
n_missed_packets_ = 0;
n_corrupted_frames_ = 0;
n_corrupted_pulse_id_ = 0;
stats_interval_start_ = steady_clock::now();
}
void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id)
void FrameStats::record_stats(const ModuleFrame &meta)
{
if (bad_pulse_id) {
n_corrupted_pulse_id_++;
}
if (meta.n_recv_packets < n_packets_per_frame_) {
n_missed_packets_ += n_packets_per_frame_ - meta.n_recv_packets;
n_corrupted_frames_++;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [FrameStats::record_stats] :";
cout << " meta.frame "<< meta.frame_index;
cout << " [" << std::chrono::system_clock::now() << "]";
cout << " [FrameStats::record_stats] :";
cout << " meta.pulse_id "<< meta.pulse_id;
cout << " meta.frame_index "<< meta.frame_index;
cout << " || meta.n_recv_packets " << meta.n_recv_packets;
cout << " || n_missed_packets_ " << n_missed_packets_;
cout << endl;
#endif
}
frames_counter_++;
auto time_passed = duration_cast<milliseconds>(
const auto time_passed = duration_cast<milliseconds>(
steady_clock::now()-stats_interval_start_).count();
if (time_passed >= stats_time_*1000) {
@@ -73,14 +66,13 @@ void FrameStats::print_stats()
system_clock::now()).time_since_epoch().count();
// Output in InfluxDB line protocol
cout << "jf_udp_recv";
cout << "std_udp_recv,";
cout << ",detector_name=" << detector_name_;
cout << ",module_name=M" << module_id_;
cout << ",module_id=" << module_id_;
cout << " ";
cout << "n_missed_packets=" << n_missed_packets_ << "i";
cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i";
cout << ",repetition_rate=" << rep_rate << "i";
cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i";
cout << " ";
cout << timestamp;
cout << endl;
@@ -3,37 +3,32 @@
#include <ostream>
#include <iostream>
#include <chrono>
#include <unistd.h>
#include "date.h"
using namespace std;
using namespace buffer_config;
FrameUdpReceiver::FrameUdpReceiver(
const int module_id,
const uint16_t port,
const int n_modules,
const int n_submodules,
const int bit_depth):
module_id_(module_id),
bit_depth_(bit_depth),
n_packets_per_frame_(bit_depth_ * MODULE_N_PIXELS / 8 / DATA_BYTES_PER_PACKET / n_modules * n_submodules),
data_bytes_per_frame_(n_packets_per_frame_ * DATA_BYTES_PER_PACKET)
const uint16_t port, const int n_packets_per_frame) :
n_packets_per_frame_(n_packets_per_frame),
packet_buffer_(new det_packet[n_packets_per_frame_]),
recv_buff_ptr_(new iovec[n_packets_per_frame_]),
msgs_(new mmsghdr[n_packets_per_frame_]),
sock_from_(new sockaddr_in[n_packets_per_frame_])
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [FrameUdpReceiver::FrameUdpReceiver] :";
cout << " Details of FrameUdpReceiver:";
cout << "module_id: " << module_id_;
cout << " || port: " << port;
cout << " || bit_depth : " << bit_depth_;
cout << " || n_packets_per_frame_ : " << n_packets_per_frame_;
cout << " || data_bytes_per_frame_: " << data_bytes_per_frame_ << " !!";
cout << " || n_packets_per_frame: " << n_packets_per_frame_;
cout << endl;
#endif
udp_receiver_.bind(port);
for (int i = 0; i < BUFFER_UDP_N_RECV_MSG; i++) {
for (int i = 0; i < n_packets_per_frame_; i++) {
recv_buff_ptr_[i].iov_base = (void*) &(packet_buffer_[i]);
recv_buff_ptr_[i].iov_len = sizeof(det_packet);
@@ -46,28 +41,28 @@ FrameUdpReceiver::FrameUdpReceiver(
FrameUdpReceiver::~FrameUdpReceiver() {
udp_receiver_.disconnect();
delete[] packet_buffer_;
delete[] recv_buff_ptr_;
delete[] msgs_;
delete[] sock_from_;
}
inline void FrameUdpReceiver::init_frame(
ModuleFrame& frame_metadata, const int i_packet)
{
// Eiger has no pulse_id, frame number instead
frame_metadata.pulse_id = packet_buffer_[i_packet].framenum;
frame_metadata.pulse_id = packet_buffer_[i_packet].bunchid;
frame_metadata.frame_index = packet_buffer_[i_packet].framenum;
frame_metadata.daq_rec = (uint64_t) packet_buffer_[i_packet].debug;
frame_metadata.module_id = (int64_t) module_id_;
frame_metadata.bit_depth = (int16_t) bit_depth_;
frame_metadata.row = (int16_t) packet_buffer_[i_packet].row;
frame_metadata.column = (int16_t) packet_buffer_[i_packet].column;
frame_metadata.pos_y = (int16_t) packet_buffer_[i_packet].row;
frame_metadata.pos_x = (int16_t) packet_buffer_[i_packet].column;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [FrameUdpReceiver::init_frame] :";
cout << "module_id: " << module_id_;
cout << " || row: " << frame_metadata.row;
cout << " || column: " << frame_metadata.column;
cout << " || pos_y: " << frame_metadata.pos_x;
cout << " || pos_x: " << frame_metadata.pos_x;
cout << " || pulse_id: " << frame_metadata.pulse_id;
cout << " || frame_index: " << frame_metadata.frame_index;
cout << endl;
@@ -100,11 +95,12 @@ inline uint64_t FrameUdpReceiver::process_packets(
i_packet++) {
// First packet for this frame.
if (metadata.pulse_id == 0) {
if (metadata.frame_index == 0) {
init_frame(metadata, i_packet);
// Happens if the last packet from the previous frame gets lost.
// In the jungfrau_packet, framenum is the trigger number (how many triggers from detector power-on) happened
// In the jungfrau_packet, framenum is the trigger number
// (how many triggers from detector power-on) happened
} else if (metadata.frame_index != packet_buffer_[i_packet].framenum) {
packet_buffer_loaded_ = true;
// Continue on this packet.
@@ -122,21 +118,19 @@ inline uint64_t FrameUdpReceiver::process_packets(
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [FrameUdpReceiver::process_packets] :";
cout << " Frame " << metadata.frame_index;
cout << " || N_RECV_PACKETS " << metadata.n_recv_packets;
cout << " || PULSE ID "<< metadata.pulse_id;
cout << " metadata.row " << metadata.row;
cout << " metadata.column " << metadata.column;
cout << "] [frameudpreceiver::process_packets] :";
cout << " frame " << metadata.frame_index << " || ";
cout << packet_buffer_[i_packet].packetnum << " packets received.";
cout << " pulse id "<< metadata.pulse_id;
cout << endl;
#endif
// Buffer is loaded only if this is not the last message.
// buffer is loaded only if this is not the last message.
if (i_packet+1 != packet_buffer_n_packets_) {
packet_buffer_loaded_ = true;
// Continue on next packet.
// continue on next packet.
packet_buffer_offset_ = i_packet + 1;
// If i_packet is the last packet the buffer is empty.
// if i_packet is the last packet the buffer is empty.
} else {
packet_buffer_loaded_ = false;
packet_buffer_offset_ = 0;
@@ -153,36 +147,31 @@ inline uint64_t FrameUdpReceiver::process_packets(
}
uint64_t FrameUdpReceiver::get_frame_from_udp(
ModuleFrame& metadata, char* frame_buffer)
ModuleFrame& meta, char* frame_buffer)
{
// Reset the metadata and frame buffer for the next frame.
metadata.pulse_id = 0;
metadata.n_recv_packets = 0;
memset(frame_buffer, 0, data_bytes_per_frame_);
// Happens when last packet from previous frame was missed.
if (packet_buffer_loaded_) {
auto pulse_id = process_packets(
packet_buffer_offset_, metadata, frame_buffer);
if (pulse_id != 0) {
return pulse_id;
auto frame_index = process_packets(
packet_buffer_offset_, meta, frame_buffer);
if (frame_index != 0) {
return frame_index;
}
}
while (true) {
packet_buffer_n_packets_ = udp_receiver_.receive_many(
msgs_, BUFFER_UDP_N_RECV_MSG);
msgs_, n_packets_per_frame_);
if (packet_buffer_n_packets_ == 0) {
continue;
}
auto pulse_id = process_packets(0, metadata, frame_buffer);
auto frame_index = process_packets(0, meta, frame_buffer);
if (pulse_id != 0) {
return pulse_id;
if (frame_index != 0) {
return frame_index;
}
}
}
+83
View File
@@ -0,0 +1,83 @@
#include <iostream>
#include <stdexcept>
#include <cstring>
#include <vector>

#include <zmq.h>
#include <RamBuffer.hpp>

#include "formats.hpp"
#include "buffer_config.hpp"
#include "FrameUdpReceiver.hpp"
#include "BufferUtils.hpp"
#include "FrameStats.hpp"
#include "UdpRecvConfig.hpp"

using namespace std;
using namespace chrono;
using namespace buffer_config;
using namespace BufferUtils;

// Receives UDP packets for one detector module, assembles them into frames,
// writes each frame into the shared RamBuffer and publishes its image_id
// over ZMQ for downstream consumers.
int main (int argc, char *argv[]) {
    if (argc != 4) {
        cout << endl;
        cout << "Usage: std_udp_recv [udp_recv_config_filename] [module_id] "
                "[bit_depth]";
        cout << endl;
        cout << "\tudp_recv_config_filename: detector config file path." << endl;
        cout << "\tmodule_id: id of the module for this process." << endl;
        cout << "\tbit_depth: bit depth of the incoming udp packets." << endl;
        cout << endl;

        exit(-1);
    }

    const auto config = UdpRecvConfig::from_json_file(string(argv[1]));
    const int module_id = atoi(argv[2]);
    const int bit_depth = atoi(argv[3]);

    // Guard against running e.g. an eiger-built binary with a jungfrau config.
    if (DETECTOR_TYPE != config.detector_type) {
        throw runtime_error("UDP recv version for " + DETECTOR_TYPE +
                            " but config for " + config.detector_type);
    }

    const auto udp_port = config.start_udp_port + module_id;
    // Frame size in bytes for this bit depth; assumed to be an exact multiple
    // of DATA_BYTES_PER_PACKET (holds for the supported bit depths).
    const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8;
    const size_t N_PACKETS_PER_FRAME = FRAME_N_BYTES / DATA_BYTES_PER_PACKET;

    FrameUdpReceiver receiver(udp_port, N_PACKETS_PER_FRAME);
    RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame),
                           FRAME_N_BYTES, config.n_modules);
    // BUG FIX: FrameStats takes (detector_name, module_id, bit_depth,
    // n_packets_per_frame, stats_time); bit_depth was missing from the call.
    FrameStats stats(config.detector_name, module_id, bit_depth,
                     N_PACKETS_PER_FRAME, STATS_TIME);

    auto ctx = zmq_ctx_new();
    auto socket = bind_socket(ctx, config.detector_name, to_string(module_id));

    ModuleFrame meta;
    meta.module_id = module_id;
    meta.bit_depth = bit_depth;

    // BUG FIX: the frame buffer was `new char[...]` with an unreachable
    // delete[] after the infinite loop; a vector releases it automatically.
    vector<char> data(FRAME_N_BYTES);

    while (true) {
        // Reset the metadata and frame buffer for the next frame.
        meta.frame_index = 0;
        memset(data.data(), 0, FRAME_N_BYTES);

        receiver.get_frame_from_udp(meta, data.data());

        // Assign the image_id based on the detector type: eiger has no
        // pulse_id, so the frame index is used instead.
#ifdef USE_EIGER
        const uint64_t image_id = meta.frame_index;
#else
        const uint64_t image_id = meta.pulse_id;
#endif
        meta.id = image_id;

        frame_buffer.write_frame(meta, data.data());
        // Notify downstream consumers that this image_id is ready.
        zmq_send(socket, &image_id, sizeof(image_id), 0);

        stats.record_stats(meta);
    }
}
+22
View File
@@ -0,0 +1,22 @@
# Collect all implementation files of the std-udp-sync component.
file(GLOB SOURCES
src/*.cpp)
# Static library with the sync logic, shared by the executable and tests.
add_library(std-udp-sync-lib STATIC ${SOURCES})
target_include_directories(std-udp-sync-lib PUBLIC include/)
# external: bundled third-party headers; core-buffer-lib: RamBuffer/formats.
target_link_libraries(std-udp-sync-lib
external
core-buffer-lib)
# The service binary; installed/invoked as "std_udp_sync".
add_executable(std-udp-sync src/main.cpp)
set_target_properties(std-udp-sync PROPERTIES OUTPUT_NAME std_udp_sync)
# zmq: message transport; pthread/rt: threading and shared-memory support.
target_link_libraries(std-udp-sync
external
core-buffer-lib
std-udp-sync-lib
zmq
pthread
rt)
# Unit tests live under test/ and are registered with CTest.
enable_testing()
add_subdirectory(test/)
+179
View File
@@ -0,0 +1,179 @@
# sf-stream
sf-stream is the component that receives a live stream of frame data from
sf-buffers over ZMQ and assembles them into images. These images are then
sent again over ZMQ to external components. There is always only 1 sf-stream
per detector.
It currently has 3 output streams:
- **Full data full meta** rate stream (send all images and meta)
- **Reduced data full meta** rate stream (send less images, but
all meta)
- **Pulse_id** stream (send only the current pulse_id)
In addition to receiving and assembling images, sf-stream also calculates
additional meta and constructs the structures needed to send data in
Array 1.0 protocol.
This component does not guarantee that the streams will always contain all
the data - it can happen that frame resynchronization is needed, and in this
case 1 or more frames could potentially be lost. This happens so rarely that in
practice is not a problem.
## Overview
![image_stream_overview](../docs/sf_daq_buffer-overview-stream.jpg)
sf-stream is a single threaded application (without counting the ZMQ IO threads)
that is used for providing live assembled images to anyone willing to listen.
In addition, it also provides a pulse_id stream, which is the most immediate
pulse_id feedback we currently have in case we need to synchronize external
components to the current machine pulse_id.
## ZMQ receiving
Each ZMQ stream is coming from a separate sf-buffer. This means that we have as
many connections as we have modules in a detector.
Messages are multipart (2 parts) and are received in PUB/SUB mode.
There is no need for special synchronization between modules as we expect that
frames will always be in the correct order and all modules will provide the
same frame more or less at the same time. If either of these two conditions is not
met, the detector is not working properly and we cannot guarantee that sf-stream
will work correctly.
Nonetheless we provide the capability to synchronize the streams in image
assembly phase - this is needed rarely, but occasionally happens. In this sort
of hiccups we usually lose only a couple of consecutive images.
### Messages format
Each message is composed by 2 parts:
- Serialization of ModuleFrame in the first part.
- Frame data in the second part.
Module frame is defined as:
```c++
#pragma pack(push)
#pragma pack(1)
struct ModuleFrame {
uint64_t pulse_id;
uint64_t frame_index;
uint64_t daq_rec;
uint64_t n_recv_packets;
uint64_t module_id;
};
#pragma pack(pop)
```
The frame data is a 1MB (1024*512 pixels * 2 bytes/pixel) blob of data in
**uint16** representing the detector image.
## Image assembly
We first synchronize the modules. We do this by reading all sockets and
deciding the largest frame pulse_id among them (max_pulse_id). We then calculate
the diff between a specific socket pulse_id and the max_pulse_id.
This difference tells us how many messages we need to discard from a specific socket.
This discarding is the source of possible missing images in the output stream.
It can happen in 3 cases:
- At least one of the detector modules did not send any packets for the specific
pulse_id.
- All the packets from a specific module for a pulse_id were lost before UDP
receiving them.
- ZMQ HWM was reached (either on the sf-buffer or sf-stream) and the message was
dropped.
All these 3 cases are highly unlikely, so synchronization is mostly needed when
first starting sf-stream. Different sockets connect to sf-buffers at different
times. Apart from the initial synchronization there should be no need to
re-synchronize modules in a healthy running environment.
If an image is missing any ZMQ messages from sf-buffers (not all modules data
arrived), the image will be dropped. We do not do partial reconstruction in
sf-stream. However, it is important to note, that this does not cover the case
where frames are incomplete (missing UDP packets on sf-buffer) - we still
assemble these images as long as at least 1 packet/frame for a specific pulse_id
arrived.
## ZMQ sending
We divide the ZMQ sending into 3 types of streams:
- Data processing stream. This is basically the complete stream from
the detector with all meta and data. It can be described as full data full
meta stream. Only 1 client at the time can be connected to this stream
(PUSH/PULL for load balancing).
- Live viewing stream. This is a reduced data full meta stream. We send
meta for all frames, but data only for subset of them (10Hz, for example).
Any number of clients can connect to the 10Hz stream, because we use PUB/SUB
for this socket.
- Pulse_id stream. This is a stream that sends out only the current pulse_id.
It can be used to synchronize any external system with the current pulse_id
being recorded. Multiple clients can connect to this stream.
In the data processing and live viewing stream we use
[Array 1.0](https://github.com/paulscherrerinstitute/htypes/blob/master/array-1.0.md)
as our protocol to be compatible with currently available external components.
We use following fields in the JSON header:
| Name | Type | Comment |
| --- | --- | --- |
| pulse_id | uint64 |bunchid from detector header|
|frame|uint64|frame_index from detector header|
|is_good_frame|bool|true if all packets for this frame are present|
|daq_rec|uint32|daqrec from detector header|
|pedestal_file|string|Path to pedestal file|
|gain_file|string|Path to gain file|
|number_frames_expected|int|Number of expected frames|
|run_name|string|Name of the run|
|detector_name|string|Name of the detector|
|htype|string|Value: "array-1.0"|
|type|string|Value: "uint16"|
|shape|Array[uint64]|Shape of the image in stream|
### Full data full meta stream
This stream runs at detector frequency and uses PUSH/PULL to distribute data
to max 1 client (this client can have many processes, but it needs to be a
single logical entity, since the images are evenly distributed to all
connected sockets).
![image_full_stream](../docs/sf_daq_buffer-FullStream.jpg)
The goal here is to provide a complete copy of the detector image stream
for purposes of online analysis. Given the large amount of data on this
stream only "pre-approved" applications that can handle the load should be
attached here.
### Reduced data full meta stream
This streams also runs at detector frequency for JSON headers (meta), but
it sends only part of the images in the stream. The rest of the images are
sent as empty buffers (the receiver needs to be aware of this behaviour, as
Array 1.0 alone does not define it).
![image_reduced_stream](../docs/sf_daq_buffer-ReducedStream.jpg)
This is the lightweight version of the image stream. Any number of clients
can connect to this stream (PUB/SUB) but no client can do load
balancing automatically (it would require PUSH/PULL).
This is a "public interface" for anyone who wants to get detector data live,
and can do with only a subset of images.
### Pulse_id stream
This stream runs at detector frequency in PUB/SUB mode. The only thing it
does is send out the pulse_id (of the just received image) in uint64_t
format.
![image_pulse_stream](../docs/sf_daq_buffer-PulseStream.jpg)
This is also a "public interface" for anyone who wants to get the current
system pulse_id.
+27
View File
@@ -0,0 +1,27 @@
#ifndef SF_DAQ_BUFFER_SYNCSTATS_HPP
#define SF_DAQ_BUFFER_SYNCSTATS_HPP
#include <chrono>
#include <string>
#include <formats.hpp>
// Accumulates per-image synchronization statistics and periodically prints
// them to stdout in InfluxDB line protocol (implementation in SyncStats.cpp).
class SyncStats {
    // Detector name, used as a tag in the printed statistics.
    const std::string detector_name_;
    // Number of processed images between two consecutive print-outs.
    const size_t stats_modulo_;
    // Images processed since the last print-out.
    int image_counter_;
    // Sum of lost pulses recorded since the last print-out.
    int n_sync_lost_images_;
    // Start of the current statistics interval (used to derive the rate).
    std::chrono::time_point<std::chrono::steady_clock> stats_interval_start_;

    void reset_counters();
    void print_stats();

public:
    SyncStats(const std::string &detector_name,
              const size_t stats_modulo);

    // Record one processed image together with the number of pulses that
    // were lost while synchronizing to it.
    void record_stats(const uint32_t n_lost_pulses);
};
#endif //SF_DAQ_BUFFER_SYNCSTATS_HPP
+29
View File
@@ -0,0 +1,29 @@
#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP
#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP
#include <fstream>
#include <stdexcept>
#include <string>

#include <rapidjson/document.h>
#include <rapidjson/istreamwrapper.h>
#include <rapidjson/stringbuffer.h>
// Runtime configuration for std-udp-sync, loaded from a JSON file.
struct UdpSyncConfig {

    // Load the configuration from a JSON file.
    //
    // Expects the mandatory fields "detector_name" (string) and
    // "n_modules" (int).
    //
    // Throws std::runtime_error when the file cannot be opened, the
    // content is not valid JSON, or a mandatory field is missing.
    // (Previously these errors were silent and caused a crash on the
    // first field access.)
    static UdpSyncConfig from_json_file(const std::string& filename) {
        std::ifstream ifs(filename);
        if (!ifs) {
            throw std::runtime_error(
                    "[UdpSyncConfig::from_json_file]"
                    " Cannot open file: " + filename);
        }

        rapidjson::IStreamWrapper isw(ifs);
        rapidjson::Document config_parameters;
        config_parameters.ParseStream(isw);

        if (config_parameters.HasParseError()) {
            throw std::runtime_error(
                    "[UdpSyncConfig::from_json_file]"
                    " Invalid JSON in file: " + filename);
        }

        if (!config_parameters.HasMember("detector_name") ||
            !config_parameters.HasMember("n_modules")) {
            throw std::runtime_error(
                    "[UdpSyncConfig::from_json_file]"
                    " Missing mandatory field (detector_name, n_modules)"
                    " in file: " + filename);
        }

        return {
            config_parameters["detector_name"].GetString(),
            config_parameters["n_modules"].GetInt()
        };
    }

    // Name of the detector (used for socket naming and stats tagging).
    const std::string detector_name;
    // Number of modules in the detector.
    const int n_modules;
};
#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP
@@ -0,0 +1,34 @@
#ifndef SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP
#define SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP
#include <cstddef>
#include <string>
#include <vector>
#include "formats.hpp"
// Result of one synchronization round: the next pulse_id on which all
// modules agree, and how many pulses were discarded to reach agreement.
struct PulseAndSync {
    const uint64_t image_id;
    const uint32_t n_lost_pulses;
};

// Receives pulse_ids from one ZMQ socket per module and synchronizes
// them so that all modules point to the same pulse.
class ZmqPulseSyncReceiver {
    // Externally owned ZMQ context; not closed by this class.
    void* ctx_;
    const int n_modules_;
    // One connected socket per module; closed in the destructor.
    std::vector<void*> sockets_;

public:
    ZmqPulseSyncReceiver(
            void* ctx,
            const std::string& detector_name,
            const int n_modules);
    ~ZmqPulseSyncReceiver();

    // Block until all modules agree on a pulse_id; return it together
    // with the number of pulses lost during re-synchronization.
    PulseAndSync get_next_pulse_id() const;
};
#endif //SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP
+11
View File
@@ -0,0 +1,11 @@
namespace sync_config
{
    // If the modules are offset by more than this many pulses, crash.
    // (Comment previously claimed 1000, but the limit is 100.)
    const uint64_t PULSE_OFFSET_LIMIT = 100;
    // Number of times we try to re-sync in case of failure.
    const int SYNC_RETRY_LIMIT = 3;
    // Number of pulses between each statistics print out.
    const size_t SYNC_STATS_MODULO = 1000;
}
+55
View File
@@ -0,0 +1,55 @@
#include "SyncStats.hpp"
#include <iostream>
using namespace std;
using namespace chrono;
// Construct the stats collector for one detector; all counters start at
// zero and the first statistics interval starts now.
SyncStats::SyncStats(
        const std::string &detector_name,
        const size_t stats_modulo) :
            detector_name_(detector_name),
            stats_modulo_(stats_modulo)
{
    reset_counters();
}
// Zero the per-interval counters and restart the interval clock.
void SyncStats::reset_counters()
{
    image_counter_ = 0;
    n_sync_lost_images_ = 0;
    stats_interval_start_ = steady_clock::now();
}
// Record one processed image and the pulses lost while syncing to it.
// Prints and resets the statistics every stats_modulo_ images.
void SyncStats::record_stats(const uint32_t n_lost_pulses)
{
    image_counter_++;
    n_sync_lost_images_ += n_lost_pulses;

    // Cast avoids a signed/unsigned comparison (image_counter_ is int,
    // stats_modulo_ is size_t); ">=" instead of "==" guarantees the
    // print-out can never be skipped.
    if (static_cast<size_t>(image_counter_) >= stats_modulo_) {
        print_stats();
        reset_counters();
    }
}
// Print the accumulated statistics to stdout in InfluxDB line protocol.
// Counters are left untouched; the caller resets them afterwards.
void SyncStats::print_stats()
{
    auto interval_ms_duration = duration_cast<milliseconds>(
            steady_clock::now() - stats_interval_start_).count();

    // Guard against integer division by zero (UB) when the whole
    // interval took less than 1 ms.
    if (interval_ms_duration < 1) {
        interval_ms_duration = 1;
    }

    // * 1000 because milliseconds, + 250 because of truncation.
    int rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration;

    uint64_t timestamp = time_point_cast<nanoseconds>(
            system_clock::now()).time_since_epoch().count();

    // Output in InfluxDB line protocol.
    cout << "std_udp_sync";
    cout << ",detector_name=" << detector_name_;
    cout << " ";
    cout << "n_processed_images=" << image_counter_ << "i";
    cout << ",n_sync_lost_images=" << n_sync_lost_images_ << "i";
    cout << ",repetition_rate=" << rep_rate << "i";
    cout << " ";
    cout << timestamp;
    cout << endl;
}
+127
View File
@@ -0,0 +1,127 @@
#include "ZmqPulseSyncReceiver.hpp"
#include "BufferUtils.hpp"
#include <zmq.h>
#include <stdexcept>
#include <sstream>
#include <chrono>
#include <iostream>
#include "date.h"
#include "sync_config.hpp"
using namespace std;
using namespace chrono;
using namespace sync_config;
// Open one pulse_id stream socket per module of the given detector.
// The ZMQ context is borrowed from the caller and never destroyed here.
ZmqPulseSyncReceiver::ZmqPulseSyncReceiver(
        void* ctx,
        const string& detector_name,
        const int n_modules) :
            ctx_(ctx),
            n_modules_(n_modules)
{
    sockets_.reserve(n_modules_);

    for (int i_module = 0; i_module < n_modules_; i_module++) {
        auto socket = BufferUtils::connect_socket(
                ctx_, detector_name, to_string(i_module));
        sockets_.push_back(socket);
    }
}
// Close every module socket; the borrowed context itself stays alive.
ZmqPulseSyncReceiver::~ZmqPulseSyncReceiver()
{
    for (size_t i = 0; i < sockets_.size(); i++) {
        zmq_close(sockets_[i]);
    }
}
// Receive one pulse_id from every module socket and synchronize the
// streams so that all modules point to the same pulse.
//
// Returns the agreed pulse_id and the number of pulses that had to be
// discarded to reach agreement (0 on the fast path).
//
// Throws std::runtime_error when the module streams drift apart by more
// than PULSE_OFFSET_LIMIT pulses, or when SYNC_RETRY_LIMIT sync rounds
// were not enough to re-align them.
PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const
{
    // std::vector instead of a variable-length array: VLAs are a
    // compiler extension, not standard C++.
    vector<uint64_t> pulses(n_modules_);
    bool modules_in_sync = true;

    for (int i = 0; i < n_modules_; i++) {
        zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0);

        if (pulses[0] != pulses[i]) {
            modules_in_sync = false;
        }
    }

    // Fast path: all modules already deliver the same pulse_id.
    if (modules_in_sync) {
#ifdef DEBUG_OUTPUT
        using namespace date;
        cout << " [" << std::chrono::system_clock::now();
        cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id]";
        cout << " (modules_in_sync) Frame index:" << pulses[0];
        cout << endl;
#endif
        return {pulses[0], 0};
    }

    // How many pulses we lost in total to get the next pulse_id.
    uint32_t n_lost_pulses = 0;

    for (int i_sync = 0; i_sync < SYNC_RETRY_LIMIT; i_sync++) {

        uint64_t min_pulse_id = numeric_limits<uint64_t>::max();
        uint64_t max_pulse_id = 0;
        for (int i = 0; i < n_modules_; i++) {
            min_pulse_id = min(min_pulse_id, pulses[i]);
            max_pulse_id = max(max_pulse_id, pulses[i]);
        }

        auto max_diff = max_pulse_id - min_pulse_id;
        if (max_diff > PULSE_OFFSET_LIMIT) {
            stringstream err_msg;
            err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]";
            err_msg << " PULSE_OFFSET_LIMIT exceeded.";
            err_msg << " max_diff=" << max_diff << " pulses.";
            for (int i = 0; i < n_modules_; i++) {
                err_msg << " (module " << i << ", ";
                err_msg << pulses[i] << "),";
            }
            err_msg << endl;
            throw runtime_error(err_msg.str());
        }

        modules_in_sync = true;
        // Max pulses we lost in this sync attempt.
        uint32_t i_sync_lost_pulses = 0;

        for (int i = 0; i < n_modules_; i++) {
            // How many pulses we lost for this specific module.
            uint32_t i_module_lost_pulses = 0;

            // Discard pulses until this module catches up with the most
            // advanced one.
            while (pulses[i] < max_pulse_id) {
                zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0);
                i_module_lost_pulses++;
            }
            i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses);

            // The module can overshoot max_pulse_id; retry in that case.
            if (pulses[i] != max_pulse_id) {
                modules_in_sync = false;
            }
        }

        n_lost_pulses += i_sync_lost_pulses;

        if (modules_in_sync) {
#ifdef DEBUG_OUTPUT
            using namespace date;
            cout << " [" << std::chrono::system_clock::now();
            cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id]";
            // Previous message said "modules_in_sync false" on this
            // success path, which was misleading.
            cout << " re-synchronized on pulse_id " << pulses[0];
            cout << " (n_lost_pulses=" << n_lost_pulses << ")";
            cout << endl;
#endif
            return {pulses[0], n_lost_pulses};
        }
    }

    stringstream err_msg;
    // Was "[ZmqLiveReceiver...]": wrong class name in the error message.
    err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]";
    err_msg << " SYNC_RETRY_LIMIT exceeded.";
    err_msg << endl;
    throw runtime_error(err_msg.str());
}
+69
View File
@@ -0,0 +1,69 @@
#include <iostream>
#include <string>
#include <zmq.h>
#include <RamBuffer.hpp>
#include <BufferUtils.hpp>
#include <SyncStats.hpp>
#include "date.h"
#include <chrono>
#include "sync_config.hpp"
#include "ZmqPulseSyncReceiver.hpp"
#include "UdpSyncConfig.hpp"
using namespace std;
using namespace sync_config;
#ifdef USE_EIGER
#include "eiger.hpp"
#else
#include "jungfrau.hpp"
#endif
// std_udp_sync entry point.
//
// Usage: std_udp_sync [detector_json_filename] [bit_depth]
//
// Binds the "sync" sender socket, synchronizes the per-module pulse_id
// streams and publishes the agreed pulse_id for each image, printing
// statistics every SYNC_STATS_MODULO pulses.
int main (int argc, char *argv[])
{
    if (argc != 3) {
        cout << endl;
        cout << "Usage: std_udp_sync [detector_json_filename] [bit_depth]" << endl;
        cout << "\tdetector_json_filename: detector config file path." << endl;
        cout << "\tbit_depth: bit depth of the incoming udp packets." << endl;
        cout << endl;
        exit(-1);
    }

    const auto config = UdpSyncConfig::from_json_file(string(argv[1]));

    const int bit_depth = atoi(argv[2]);
    // atoi returns 0 on parse failure; a non-positive bit_depth would
    // silently produce a zero-sized frame buffer below.
    if (bit_depth <= 0) {
        cout << "Invalid bit_depth: " << argv[2] << endl;
        exit(-1);
    }

    // Size of one module frame in bytes for the given bit depth.
    const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8;

    auto ctx = zmq_ctx_new();
    zmq_ctx_set(ctx, ZMQ_IO_THREADS, 1);
    auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "sync");

#ifdef DEBUG_OUTPUT
    using namespace date;
    // Label the output with the actual component name (the previous
    // "[Assembler]" tag was copied from another binary).
    cout << " [" << std::chrono::system_clock::now();
    cout << "] [std_udp_sync] :";
    cout << " Details of std_udp_sync:";
    cout << " detector_name: " << config.detector_name;
    cout << " || n_modules: " << config.n_modules;
    cout << endl;
#endif

    RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame),
                           FRAME_N_BYTES, config.n_modules);
    ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules);
    SyncStats stats(config.detector_name, SYNC_STATS_MODULO);

    // Forward every synchronized pulse_id to the sender socket.
    while (true) {
        auto meta = receiver.get_next_pulse_id();
        zmq_send(sender, &meta.image_id, sizeof(meta.image_id), 0);
        stats.record_stats(meta.n_lost_pulses);
    }
}
+7
View File
@@ -0,0 +1,7 @@
# Unit-test runner for std-udp-sync: links the component library against
# GoogleTest. The test entry point lives in this directory's main.cpp.
add_executable(std-udp-sync-tests main.cpp)
target_link_libraries(std-udp-sync-tests
    std-udp-sync-lib
    gtest
)
+8
View File
@@ -0,0 +1,8 @@
#include "gtest/gtest.h"
using namespace std;
// GoogleTest entry point: discovers and runs all registered tests,
// returning non-zero when any test fails.
int main(int argc, char **argv) {
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}