mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-01 04:32:22 +02:00
Almost compiles but needs more metadata
This commit is contained in:
+12
-18
@@ -1,18 +1,12 @@
|
||||
file(GLOB SOURCES
|
||||
src/*.cpp)
|
||||
|
||||
add_library(jf-udp-recv-lib STATIC ${SOURCES})
|
||||
target_include_directories(jf-udp-recv-lib PUBLIC include/)
|
||||
target_link_libraries(jf-udp-recv-lib
|
||||
external
|
||||
core-buffer-lib)
|
||||
|
||||
add_executable(jf-udp-recv src/main.cpp)
|
||||
set_target_properties(jf-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv)
|
||||
target_link_libraries(jf-udp-recv
|
||||
jf-udp-recv-lib
|
||||
zmq
|
||||
rt)
|
||||
|
||||
enable_testing()
|
||||
add_subdirectory(test/)
|
||||
file(GLOB SOURCES src/*.cpp)
|
||||
|
||||
add_library(jfj-udp-recv-lib STATIC ${SOURCES})
|
||||
target_include_directories(jfj-udp-recv-lib PUBLIC include/)
|
||||
target_link_libraries(jfj-udp-recv-lib external core-buffer-lib)
|
||||
|
||||
add_executable(jfj-udp-recv src/main.cpp)
|
||||
set_target_properties(jfj-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv)
|
||||
target_link_libraries(jfj-udp-recv jfj-udp-recv-lib zmq rt)
|
||||
|
||||
enable_testing()
|
||||
# add_subdirectory(test/)
|
||||
|
||||
+164
-164
@@ -1,164 +1,164 @@
|
||||
# sf-buffer
|
||||
sf-buffer is the component that receives the detector data in form of UDP
|
||||
packages and writes them down to disk to a binary format. In addition, it
|
||||
sends a copy of the module frame to sf-stream via ZMQ.
|
||||
|
||||
Each sf-buffer process is taking care of a single detector module. The
|
||||
processes are all independent and do not rely on any external data input
|
||||
to maximize isolation and possible interactions in our system.
|
||||
|
||||
The main design principle is simplicity and decoupling:
|
||||
|
||||
- No interprocess dependencies/communication.
|
||||
- No dependencies on external libraries (as much as possible).
|
||||
- Using POSIX as much as possible.
|
||||
|
||||
We are optimizing for maintainability and long term stability. Performance is
|
||||
of concern only if the performance criteria are not met.
|
||||
|
||||
## Overview
|
||||
|
||||

|
||||
|
||||
sf-buffer is a single threaded application (without counting the ZMQ IO threads)
|
||||
that does both receiving, assembling, writing and sending in the same thread.
|
||||
|
||||
### UDP receiving
|
||||
|
||||
Each process listens to one udp port. Packets coming to this udp port are
|
||||
assembled into frames. Frames (either complete or with missing packets) are
|
||||
passed forward. The number of received packets is saved so we can later
|
||||
(at image assembly time) determine if the frame is valid or not. At this point
|
||||
we do no validation.
|
||||
|
||||
We are currently using **recvmmsg** to minimize the number of switches to
|
||||
kernel mode.
|
||||
|
||||
We expect all packets to come in order or not come at all. Once we see the
|
||||
package for the next pulse_id we can assume no more packages are coming for
|
||||
the previous one, and send the assembled frame down the program.
|
||||
|
||||
### File writing
|
||||
|
||||
Files are written to disk in frames - one write to disk per frame. This gives
|
||||
us a relaxed 10ms interval of 1 MB writes.
|
||||
|
||||
#### File format
|
||||
|
||||
The binary file on disk is just a serialization of multiple
|
||||
**BufferBinaryFormat** structs:
|
||||
```c++
|
||||
#pragma pack(push)
|
||||
#pragma pack(1)
|
||||
struct ModuleFrame {
|
||||
uint64_t pulse_id;
|
||||
uint64_t frame_index;
|
||||
uint64_t daq_rec;
|
||||
uint64_t n_recv_packets;
|
||||
uint64_t module_id;
|
||||
};
|
||||
#pragma pack(pop)
|
||||
|
||||
#pragma pack(push)
|
||||
#pragma pack(1)
|
||||
struct BufferBinaryFormat {
|
||||
const char FORMAT_MARKER = 0xBE;
|
||||
ModuleFrame meta;
|
||||
char data[buffer_config::MODULE_N_BYTES];
|
||||
};
|
||||
#pragma pack(pop)
|
||||
```
|
||||
|
||||

|
||||
|
||||
Each frame is composed by:
|
||||
|
||||
- **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame.
|
||||
- **ModuleFrame** - frame meta used in image assembly phase.
|
||||
- **Data** - assembled frame from a single module.
|
||||
|
||||
Frames are written one after another to a specific offset in the file. The
|
||||
offset is calculated based on the pulse_id, so each frame has a specific place
|
||||
in the file and there is no need to have an index for frame retrieval.
|
||||
|
||||
The offset where a specific pulse_id is written in a file is calculated:
|
||||
|
||||
```c++
|
||||
// We save 1000 pulses in each file.
|
||||
const uint64_t FILE_MOD = 1000
|
||||
|
||||
// Relative index of pulse_id inside file.
|
||||
size_t file_base = pulse_id % FILE_MOD;
|
||||
// Offset in bytes of relative index in file.
|
||||
size_t file_offset = file_base * sizeof(BufferBinaryFormat);
|
||||
```
|
||||
|
||||
We now know where to look for data inside the file, but we still don't know
|
||||
inside which file to look. For this we need to discuss the folder structure.
|
||||
|
||||
#### Folder structure
|
||||
|
||||
The folder (as well as file) structure is deterministic in the sense that given
|
||||
a specific pulse_id, we can directly calculate the folder, file, and file
|
||||
offset where the data is stored. This allows us to have independent writing
|
||||
and reading from the buffer without building any indexes.
|
||||
|
||||
The binary files written by sf_buffer are saved to:
|
||||
|
||||
[detector_folder]/[module_folder]/[data_folder]/[data_file].bin
|
||||
|
||||
- **detector\_folder** should always be passed as an absolute path. This is the
|
||||
container that holds all data related to a specific detector.
|
||||
- **module\_folder** is usually composed like "M00", "M01". It separates data
|
||||
from different modules of one detector.
|
||||
- **data\_folder** and **data\_file** are automatically calculated based on the
|
||||
current pulse_id, FOLDER_MOD and FILE_MOD attributes. This folders act as our
|
||||
index for accessing data.
|
||||
|
||||

|
||||
|
||||
```c++
|
||||
// FOLDER_MOD = 100000
|
||||
int data_folder = (pulse_id % FOLDER_MOD) * FOLDER_MOD;
|
||||
// FILE_MOD = 1000
|
||||
int data_file = (pulse_id % FILE_MOD) * FILE_MOD;
|
||||
```
|
||||
|
||||
The data_folder and data_file folders are named as the first pulse_id that
|
||||
should be stored inside them.
|
||||
|
||||
FOLDER_MOD == 100000 means that each data_folder will contain data for 100000
|
||||
pulses, while FILE_MOD == 1000 means that each file inside the data_folder
|
||||
will contain 1000 pulses. The total number of data_files in each data_folder
|
||||
will therefore be **FILE\_MOD / FOLDER\_MOD = 100**.
|
||||
|
||||
#### Analyzing the buffer on disk
|
||||
In **sf-utils** there is a Python module that allows you to read directly the
|
||||
buffer in order to debug it or to verify the consistency between the HDF5 file
|
||||
and the received data.
|
||||
|
||||
- VerifyH5DataConsistency.py checks the consistency between the H5 file and
|
||||
buffer.
|
||||
- BinaryBufferReader.py reads the buffer and prints meta. The class inside
|
||||
can also be used in external scripts.
|
||||
|
||||
### ZMQ sending
|
||||
|
||||
A copy of the data written to disk is also send via ZMQ to the sf-stream. This
|
||||
is used to provide live viewing / processing capabilities. Each module data is
|
||||
sent separately, and this is later assembled in the sf-stream.
|
||||
|
||||
We use the PUB/SUB mechanism for distributing this data - we cannot control the
|
||||
rate of the producer, and we would like to avoid distributed image assembly
|
||||
if possible, so PUSH/PULL does not make sense in this case.
|
||||
|
||||
We provide no guarantees on live data delivery, but in practice the number of
|
||||
dropped or incomplete frames in currently negligible.
|
||||
|
||||
The protocol is a serialization of the same data structures we use to
|
||||
write on disk (no need for additional memory operations before sending out
|
||||
data). It uses a 2 part multipart ZMQ message:
|
||||
|
||||
- The first part is a serialization of the ModuleFrame struct (see above).
|
||||
- The second part is the data field in the BufferBinaryFormat struct (the frame
|
||||
data).
|
||||
# sf-buffer
|
||||
sf-buffer is the component that receives the detector data in form of UDP
|
||||
packages and writes them down to disk to a binary format. In addition, it
|
||||
sends a copy of the module frame to sf-stream via ZMQ.
|
||||
|
||||
Each sf-buffer process is taking care of a single detector module. The
|
||||
processes are all independent and do not rely on any external data input
|
||||
to maximize isolation and possible interactions in our system.
|
||||
|
||||
The main design principle is simplicity and decoupling:
|
||||
|
||||
- No interprocess dependencies/communication.
|
||||
- No dependencies on external libraries (as much as possible).
|
||||
- Using POSIX as much as possible.
|
||||
|
||||
We are optimizing for maintainability and long term stability. Performance is
|
||||
of concern only if the performance criteria are not met.
|
||||
|
||||
## Overview
|
||||
|
||||

|
||||
|
||||
sf-buffer is a single threaded application (without counting the ZMQ IO threads)
|
||||
that does both receiving, assembling, writing and sending in the same thread.
|
||||
|
||||
### UDP receiving
|
||||
|
||||
Each process listens to one udp port. Packets coming to this udp port are
|
||||
assembled into frames. Frames (either complete or with missing packets) are
|
||||
passed forward. The number of received packets is saved so we can later
|
||||
(at image assembly time) determine if the frame is valid or not. At this point
|
||||
we do no validation.
|
||||
|
||||
We are currently using **recvmmsg** to minimize the number of switches to
|
||||
kernel mode.
|
||||
|
||||
We expect all packets to come in order or not come at all. Once we see the
|
||||
package for the next pulse_id we can assume no more packages are coming for
|
||||
the previous one, and send the assembled frame down the program.
|
||||
|
||||
### File writing
|
||||
|
||||
Files are written to disk in frames - one write to disk per frame. This gives
|
||||
us a relaxed 10ms interval of 1 MB writes.
|
||||
|
||||
#### File format
|
||||
|
||||
The binary file on disk is just a serialization of multiple
|
||||
**BufferBinaryFormat** structs:
|
||||
```c++
|
||||
#pragma pack(push)
|
||||
#pragma pack(1)
|
||||
struct ModuleFrame {
|
||||
uint64_t pulse_id;
|
||||
uint64_t frame_index;
|
||||
uint64_t daq_rec;
|
||||
uint64_t n_recv_packets;
|
||||
uint64_t module_id;
|
||||
};
|
||||
#pragma pack(pop)
|
||||
|
||||
#pragma pack(push)
|
||||
#pragma pack(1)
|
||||
struct BufferBinaryFormat {
|
||||
const char FORMAT_MARKER = 0xBE;
|
||||
ModuleFrame meta;
|
||||
char data[buffer_config::MODULE_N_BYTES];
|
||||
};
|
||||
#pragma pack(pop)
|
||||
```
|
||||
|
||||

|
||||
|
||||
Each frame is composed by:
|
||||
|
||||
- **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame.
|
||||
- **ModuleFrame** - frame meta used in image assembly phase.
|
||||
- **Data** - assembled frame from a single module.
|
||||
|
||||
Frames are written one after another to a specific offset in the file. The
|
||||
offset is calculated based on the pulse_id, so each frame has a specific place
|
||||
in the file and there is no need to have an index for frame retrieval.
|
||||
|
||||
The offset where a specific pulse_id is written in a file is calculated:
|
||||
|
||||
```c++
|
||||
// We save 1000 pulses in each file.
|
||||
const uint64_t FILE_MOD = 1000
|
||||
|
||||
// Relative index of pulse_id inside file.
|
||||
size_t file_base = pulse_id % FILE_MOD;
|
||||
// Offset in bytes of relative index in file.
|
||||
size_t file_offset = file_base * sizeof(BufferBinaryFormat);
|
||||
```
|
||||
|
||||
We now know where to look for data inside the file, but we still don't know
|
||||
inside which file to look. For this we need to discuss the folder structure.
|
||||
|
||||
#### Folder structure
|
||||
|
||||
The folder (as well as file) structure is deterministic in the sense that given
|
||||
a specific pulse_id, we can directly calculate the folder, file, and file
|
||||
offset where the data is stored. This allows us to have independent writing
|
||||
and reading from the buffer without building any indexes.
|
||||
|
||||
The binary files written by sf_buffer are saved to:
|
||||
|
||||
[detector_folder]/[module_folder]/[data_folder]/[data_file].bin
|
||||
|
||||
- **detector\_folder** should always be passed as an absolute path. This is the
|
||||
container that holds all data related to a specific detector.
|
||||
- **module\_folder** is usually composed like "M00", "M01". It separates data
|
||||
from different modules of one detector.
|
||||
- **data\_folder** and **data\_file** are automatically calculated based on the
|
||||
current pulse_id, FOLDER_MOD and FILE_MOD attributes. This folders act as our
|
||||
index for accessing data.
|
||||
|
||||

|
||||
|
||||
```c++
|
||||
// FOLDER_MOD = 100000
|
||||
int data_folder = (pulse_id % FOLDER_MOD) * FOLDER_MOD;
|
||||
// FILE_MOD = 1000
|
||||
int data_file = (pulse_id % FILE_MOD) * FILE_MOD;
|
||||
```
|
||||
|
||||
The data_folder and data_file folders are named as the first pulse_id that
|
||||
should be stored inside them.
|
||||
|
||||
FOLDER_MOD == 100000 means that each data_folder will contain data for 100000
|
||||
pulses, while FILE_MOD == 1000 means that each file inside the data_folder
|
||||
will contain 1000 pulses. The total number of data_files in each data_folder
|
||||
will therefore be **FILE\_MOD / FOLDER\_MOD = 100**.
|
||||
|
||||
#### Analyzing the buffer on disk
|
||||
In **sf-utils** there is a Python module that allows you to read directly the
|
||||
buffer in order to debug it or to verify the consistency between the HDF5 file
|
||||
and the received data.
|
||||
|
||||
- VerifyH5DataConsistency.py checks the consistency between the H5 file and
|
||||
buffer.
|
||||
- BinaryBufferReader.py reads the buffer and prints meta. The class inside
|
||||
can also be used in external scripts.
|
||||
|
||||
### ZMQ sending
|
||||
|
||||
A copy of the data written to disk is also send via ZMQ to the sf-stream. This
|
||||
is used to provide live viewing / processing capabilities. Each module data is
|
||||
sent separately, and this is later assembled in the sf-stream.
|
||||
|
||||
We use the PUB/SUB mechanism for distributing this data - we cannot control the
|
||||
rate of the producer, and we would like to avoid distributed image assembly
|
||||
if possible, so PUSH/PULL does not make sense in this case.
|
||||
|
||||
We provide no guarantees on live data delivery, but in practice the number of
|
||||
dropped or incomplete frames in currently negligible.
|
||||
|
||||
The protocol is a serialization of the same data structures we use to
|
||||
write on disk (no need for additional memory operations before sending out
|
||||
data). It uses a 2 part multipart ZMQ message:
|
||||
|
||||
- The first part is a serialization of the ModuleFrame struct (see above).
|
||||
- The second part is the data field in the BufferBinaryFormat struct (the frame
|
||||
data).
|
||||
|
||||
@@ -1,31 +1,28 @@
|
||||
#include <cstddef>
|
||||
#include <formats.hpp>
|
||||
#include <chrono>
|
||||
|
||||
#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP
|
||||
#define SF_DAQ_BUFFER_FRAMESTATS_HPP
|
||||
|
||||
|
||||
class FrameStats {
|
||||
const std::string detector_name_;
|
||||
const int module_id_;
|
||||
size_t stats_time_;
|
||||
|
||||
int frames_counter_;
|
||||
int n_missed_packets_;
|
||||
int n_corrupted_frames_;
|
||||
int n_corrupted_pulse_id_;
|
||||
std::chrono::time_point<std::chrono::steady_clock> stats_interval_start_;
|
||||
|
||||
void reset_counters();
|
||||
void print_stats();
|
||||
|
||||
public:
|
||||
FrameStats(const std::string &detector_name,
|
||||
const int module_id,
|
||||
const size_t stats_time);
|
||||
void record_stats(const ModuleFrame &meta, const bool bad_pulse_id);
|
||||
};
|
||||
|
||||
|
||||
#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP
|
||||
#include <cstddef>
|
||||
#include <formats.hpp>
|
||||
#include <chrono>
|
||||
|
||||
#ifndef SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP
|
||||
#define SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP
|
||||
|
||||
|
||||
class FrameStats {
|
||||
const std::string detector_name_;
|
||||
size_t stats_time_;
|
||||
|
||||
int frames_counter_;
|
||||
int n_missed_packets_;
|
||||
int n_corrupted_frames_;
|
||||
int n_corrupted_pulse_id_;
|
||||
std::chrono::time_point<std::chrono::steady_clock> stats_interval_start_;
|
||||
|
||||
void reset_counters();
|
||||
void print_stats();
|
||||
|
||||
public:
|
||||
FrameStats(const std::string &detector_name, const size_t stats_time);
|
||||
void record_stats(const ImageMetadata &meta, const bool bad_pulse_id);
|
||||
};
|
||||
|
||||
|
||||
#endif //SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
#include "formats.hpp"
|
||||
#include "buffer_config.hpp"
|
||||
#include "PacketBuffer.hpp"
|
||||
|
||||
#include "jungfraujoch.hpp"
|
||||
|
||||
/** JungfrauJoch UDP receiver
|
||||
|
||||
@@ -14,11 +14,12 @@
|
||||
NOTE: This design will not scale well for higher frame rates...
|
||||
**/
|
||||
class JfjFrameUdpReceiver {
|
||||
PacketUdpReceiver udp_receiver_;
|
||||
PacketUdpReceiver m_udp_receiver;
|
||||
uint64_t m_frame_index;
|
||||
|
||||
PacketBuffer<jfjoch_packet_t, buffer_config::BUFFER_UDP_N_RECV_MSG> m_buffer;
|
||||
|
||||
inline void init_frame(ImageMetadata& frame_metadata, jfjoch_packet_t& c_packet);
|
||||
inline void init_frame(ImageMetadata& frame_metadata, const jfjoch_packet_t& c_packet);
|
||||
inline uint64_t process_packets(ImageMetadata& metadata, char* frame_buffer);
|
||||
|
||||
public:
|
||||
|
||||
@@ -34,8 +34,8 @@ public:
|
||||
/**Diagnostics**/
|
||||
size_t size() const { return ( idx_write-idx_read ); }
|
||||
size_t capacity() const { return m_capacity; }
|
||||
bool is_full() const { return (idx_write >= m_capacity); }
|
||||
bool is_empty() const { return (idx_write <= idx_read); }
|
||||
bool is_full() const { return bool(idx_write >= m_capacity); }
|
||||
bool is_empty() const { return bool(idx_write <= idx_read); }
|
||||
|
||||
/**Operators**/
|
||||
void reset(){ idx_write = 0; idx_read = 0; }; // Reset the buffer
|
||||
@@ -43,15 +43,15 @@ public:
|
||||
mmsghdr& msgs(){ return m_msgs; };
|
||||
|
||||
/**Element access**/
|
||||
const T& pop_front(); //Destructive read
|
||||
T& pop_front(); //Destructive read
|
||||
const T& peek_front(); //Non-destructive read
|
||||
void push_back(T item); //Write new element to buffer
|
||||
|
||||
/**Fill from UDP receiver**/
|
||||
template <typename TY>
|
||||
void fill_fom(TY& recv){
|
||||
void fill_from(TY& recv){
|
||||
std::lock_guard<std::mutex> g_guard(m_mutex);
|
||||
this->idx_write = recv.receive_many(this->msgs(), this->capacity());
|
||||
this->idx_write = recv.receive_many(m_msgs, this->capacity());
|
||||
this->idx_read = 0;
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ private:
|
||||
Standard read access to queues (i.e. progress the read pointer).
|
||||
Throws 'std::length_error' if container is empty. **/
|
||||
template <typename T, size_t CAPACITY>
|
||||
const T& PacketBuffer<T, CAPACITY>::pop_front(){
|
||||
T& PacketBuffer<T, CAPACITY>::pop_front(){
|
||||
std::lock_guard<std::mutex> g_guard(m_mutex);
|
||||
if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); }
|
||||
idx_read++;
|
||||
|
||||
@@ -1,22 +1,22 @@
|
||||
#ifndef UDPRECEIVER_H
|
||||
#define UDPRECEIVER_H
|
||||
|
||||
#include <sys/socket.h>
|
||||
|
||||
class PacketUdpReceiver {
|
||||
|
||||
int socket_fd_;
|
||||
|
||||
public:
|
||||
PacketUdpReceiver();
|
||||
virtual ~PacketUdpReceiver();
|
||||
|
||||
bool receive(void* buffer, const size_t buffer_n_bytes);
|
||||
int receive_many(mmsghdr* msgs, const size_t n_msgs);
|
||||
|
||||
void bind(const uint16_t port);
|
||||
void disconnect();
|
||||
};
|
||||
|
||||
|
||||
#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H
|
||||
#ifndef UDPRECEIVER_H
|
||||
#define UDPRECEIVER_H
|
||||
|
||||
#include <sys/socket.h>
|
||||
|
||||
class PacketUdpReceiver {
|
||||
|
||||
int socket_fd_;
|
||||
|
||||
public:
|
||||
PacketUdpReceiver();
|
||||
virtual ~PacketUdpReceiver();
|
||||
|
||||
bool receive(void* buffer, const size_t buffer_n_bytes);
|
||||
int receive_many(mmsghdr* msgs, const size_t n_msgs);
|
||||
|
||||
void bind(const uint16_t port);
|
||||
void disconnect();
|
||||
};
|
||||
|
||||
|
||||
#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
|
||||
#include "include/PacketBuffer.hpp"
|
||||
|
||||
|
||||
struct DummyContainer{
|
||||
uint64_t index;
|
||||
uint64_t timestamp;
|
||||
uint16_t data[32];
|
||||
};
|
||||
|
||||
|
||||
int main (int argc, char *argv[]) {
|
||||
PacketBuffer<DummyContainer, 64> b;
|
||||
|
||||
}
|
||||
@@ -1,71 +1,54 @@
|
||||
#include <iostream>
|
||||
#include "FrameStats.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace chrono;
|
||||
|
||||
FrameStats::FrameStats(
|
||||
const std::string &detector_name,
|
||||
const int module_id,
|
||||
const size_t stats_time) :
|
||||
detector_name_(detector_name),
|
||||
module_id_(module_id),
|
||||
stats_time_(stats_time)
|
||||
{
|
||||
reset_counters();
|
||||
}
|
||||
|
||||
void FrameStats::reset_counters()
|
||||
{
|
||||
frames_counter_ = 0;
|
||||
n_missed_packets_ = 0;
|
||||
n_corrupted_frames_ = 0;
|
||||
n_corrupted_pulse_id_ = 0;
|
||||
stats_interval_start_ = steady_clock::now();
|
||||
}
|
||||
|
||||
void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id)
|
||||
{
|
||||
|
||||
if (bad_pulse_id) {
|
||||
n_corrupted_pulse_id_++;
|
||||
}
|
||||
|
||||
if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) {
|
||||
n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets;
|
||||
n_corrupted_frames_++;
|
||||
}
|
||||
|
||||
frames_counter_++;
|
||||
|
||||
auto time_passed = duration_cast<milliseconds>(
|
||||
steady_clock::now()-stats_interval_start_).count();
|
||||
|
||||
if (time_passed >= stats_time_*1000) {
|
||||
print_stats();
|
||||
reset_counters();
|
||||
}
|
||||
}
|
||||
|
||||
void FrameStats::print_stats()
|
||||
{
|
||||
auto interval_ms_duration = duration_cast<milliseconds>(
|
||||
steady_clock::now()-stats_interval_start_).count();
|
||||
// * 1000 because milliseconds, + 250 because of truncation.
|
||||
int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration;
|
||||
uint64_t timestamp = time_point_cast<nanoseconds>(
|
||||
system_clock::now()).time_since_epoch().count();
|
||||
|
||||
// Output in InfluxDB line protocol
|
||||
cout << "jf_udp_recv";
|
||||
cout << ",detector_name=" << detector_name_;
|
||||
cout << ",module_name=M" << module_id_;
|
||||
cout << " ";
|
||||
cout << "n_missed_packets=" << n_missed_packets_ << "i";
|
||||
cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i";
|
||||
cout << ",repetition_rate=" << rep_rate << "i";
|
||||
cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i";
|
||||
cout << " ";
|
||||
cout << timestamp;
|
||||
cout << endl;
|
||||
}
|
||||
#include <iostream>
|
||||
#include "JfjFrameStats.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace chrono;
|
||||
|
||||
FrameStats::FrameStats(const std::string &detector_name, const size_t stats_time) :
|
||||
detector_name_(detector_name), stats_time_(stats_time) {
|
||||
reset_counters();
|
||||
}
|
||||
|
||||
void FrameStats::reset_counters()
|
||||
{
|
||||
frames_counter_ = 0;
|
||||
n_corrupted_frames_ = 0;
|
||||
n_corrupted_pulse_id_ = 0;
|
||||
stats_interval_start_ = steady_clock::now();
|
||||
}
|
||||
|
||||
void FrameStats::record_stats(const ImageMetadata &meta, const bool bad_pulse_id)
|
||||
{
|
||||
|
||||
if (bad_pulse_id) {
|
||||
n_corrupted_pulse_id_++;
|
||||
n_corrupted_frames_++;
|
||||
}
|
||||
|
||||
frames_counter_++;
|
||||
|
||||
auto time_passed = duration_cast<milliseconds>(steady_clock::now()-stats_interval_start_).count();
|
||||
|
||||
if (time_passed >= stats_time_*1000) {
|
||||
print_stats();
|
||||
reset_counters();
|
||||
}
|
||||
}
|
||||
|
||||
void FrameStats::print_stats(){
|
||||
auto interval_ms_duration = duration_cast<milliseconds>(steady_clock::now()-stats_interval_start_).count();
|
||||
// * 1000 because milliseconds, + 250 because of truncation.
|
||||
int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration;
|
||||
uint64_t timestamp = time_point_cast<nanoseconds>(system_clock::now()).time_since_epoch().count();
|
||||
|
||||
// Output in InfluxDB line protocol
|
||||
cout << "jfj_udp_recv";
|
||||
cout << ",detector_name=" << detector_name_;
|
||||
cout << " ";
|
||||
cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i";
|
||||
cout << ",repetition_rate=" << rep_rate << "i";
|
||||
cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i";
|
||||
cout << " ";
|
||||
cout << timestamp;
|
||||
cout << endl;
|
||||
}
|
||||
|
||||
@@ -1,23 +1,23 @@
|
||||
#include <cstring>
|
||||
#include <jungfrau.hpp>
|
||||
#include <jungfraujoch.hpp>
|
||||
#include "JfjFrameUdpReceiver.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace buffer_config;
|
||||
|
||||
JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port) {
|
||||
udp_receiver_.bind(port);
|
||||
m_udp_receiver.bind(port);
|
||||
}
|
||||
|
||||
JfjFrameUdpReceiver::~JfjFrameUdpReceiver() {
|
||||
udp_receiver_.disconnect();
|
||||
m_udp_receiver.disconnect();
|
||||
}
|
||||
|
||||
inline void JfjFrameUdpReceiver::init_frame(ImageMetadata& frame_metadata, const jfjoch_packet_t& c_packet) {
|
||||
frame_metadata.pulse_id = c_packet.timestamp;
|
||||
frame_metadata.frame_index = c_packet.framenum;
|
||||
frame_metadata.daq_rec = (uint32_t) c_packet.debug;
|
||||
frame_metadata.is_good_image = (int32_t) true;
|
||||
inline void JfjFrameUdpReceiver::init_frame(ImageMetadata& metadata, const jfjoch_packet_t& c_packet) {
|
||||
metadata.pulse_id = c_packet.timestamp;
|
||||
metadata.frame_index = c_packet.framenum;
|
||||
metadata.daq_rec = (uint32_t) c_packet.debug;
|
||||
metadata.is_good_image = (int32_t) true;
|
||||
}
|
||||
|
||||
inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, char* frame_buffer){
|
||||
@@ -26,7 +26,7 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch
|
||||
// Happens if the last packet from the previous frame gets lost.
|
||||
if (m_frame_index != m_buffer.peek_front().framenum) {
|
||||
m_frame_index = m_buffer.peek_front().framenum;
|
||||
frame_metadata.is_good_image = (int32_t) false;
|
||||
metadata.is_good_image = (int32_t) false;
|
||||
return metadata.pulse_id;
|
||||
}
|
||||
|
||||
@@ -38,12 +38,11 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch
|
||||
this->init_frame(metadata, c_packet);
|
||||
|
||||
// Copy data to frame buffer
|
||||
size_t offset = JUNGFRAU_DATA_BYTES_PER_PACKET * c_packet.packetnum;
|
||||
memcpy( (void*) (frame_buffer + offset), c_packet.data, JUNGFRAU_DATA_BYTES_PER_PACKET);
|
||||
metadata.n_recv_packets++;
|
||||
size_t offset = JFJOCH_DATA_BYTES_PER_PACKET * c_packet.packetnum;
|
||||
memcpy( (void*) (frame_buffer + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET);
|
||||
|
||||
// Last frame packet received. Frame finished.
|
||||
if (c_packet.packetnum == JFJ_N_PACKETS_PER_FRAME - 1){
|
||||
if (c_packet.packetnum == JFJOCH_N_PACKETS_PER_FRAME - 1){
|
||||
return metadata.pulse_id;
|
||||
}
|
||||
}
|
||||
@@ -54,10 +53,9 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch
|
||||
}
|
||||
|
||||
uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){
|
||||
// Reset the metadata and frame buffer for the next frame.
|
||||
// Reset the metadata and frame buffer for the next frame. (really needed?)
|
||||
metadata.pulse_id = 0;
|
||||
metadata.n_recv_packets = 0;
|
||||
memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME);
|
||||
memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_PACKET);
|
||||
|
||||
// Process leftover packages in the buffer
|
||||
if (!m_buffer.is_empty()) {
|
||||
|
||||
+13
-20
@@ -5,9 +5,9 @@
|
||||
|
||||
#include "formats.hpp"
|
||||
#include "buffer_config.hpp"
|
||||
#include "FrameUdpReceiver.hpp"
|
||||
#include "JfjFrameUdpReceiver.hpp"
|
||||
#include "BufferUtils.hpp"
|
||||
#include "FrameStats.hpp"
|
||||
#include "JfjFrameStats.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace chrono;
|
||||
@@ -18,7 +18,7 @@ int main (int argc, char *argv[]) {
|
||||
|
||||
if (argc != 3) {
|
||||
cout << endl;
|
||||
cout << "Usage: jf_udp_recv [detector_json_filename] [module_id]";
|
||||
cout << "Usage: jfj_udp_recv [detector_json_filename] [module_id]";
|
||||
cout << endl;
|
||||
cout << "\tdetector_json_filename: detector config file path." << endl;
|
||||
cout << "\tmodule_id: id of the module for this process." << endl;
|
||||
@@ -30,18 +30,18 @@ int main (int argc, char *argv[]) {
|
||||
const auto config = read_json_config(string(argv[1]));
|
||||
const int module_id = atoi(argv[2]);
|
||||
|
||||
const auto udp_port = config.start_udp_port + module_id;
|
||||
ImageUdpReceiver receiver(udp_port, module_id);
|
||||
const auto udp_port = config.start_udp_port;
|
||||
JfjFrameUdpReceiver receiver(udp_port);
|
||||
RamBuffer buffer(config.detector_name, config.n_modules);
|
||||
ImageStats stats(config.detector_name, module_id, STATS_TIME);
|
||||
FrameStats stats(config.detector_name, STATS_TIME);
|
||||
|
||||
auto ctx = zmq_ctx_new();
|
||||
auto socket = bind_socket(ctx, config.detector_name, to_string(module_id));
|
||||
|
||||
zmq_ctx_set(ctx, ZMQ_IO_THREADS, ZMQ_IO_THREADS);
|
||||
auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "jungfraujoch");
|
||||
|
||||
// Might be better creating a structure for double buffering
|
||||
ImageMetadata metaBufferA;
|
||||
char* dataBufferA = new char[IMAGE_N_BYTES];
|
||||
char* dataBufferA = new char[JFJOCH_DATA_BYTES_PER_FRAME];
|
||||
|
||||
uint64_t pulse_id_previous = 0;
|
||||
uint64_t frame_index_previous = 0;
|
||||
@@ -49,22 +49,15 @@ int main (int argc, char *argv[]) {
|
||||
|
||||
while (true) {
|
||||
// NOTE: Needs to be pipelined for really high frame rates
|
||||
auto pulse_id = receiver.get_image_from_udp(metaBufferA, dataBufferA);
|
||||
auto pulse_id = receiver.get_frame_from_udp(metaBufferA, dataBufferA);
|
||||
|
||||
bool bad_pulse_id = false;
|
||||
|
||||
if ( ( metaBufferA.frame_index != (frame_index_previous+1) ) ||
|
||||
( (pulse_id-pulse_id_previous) < 0 ) ||
|
||||
( (pulse_id-pulse_id_previous) > 1000 ) ) {
|
||||
|
||||
if ( ( metaBufferA.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) {
|
||||
bad_pulse_id = true;
|
||||
|
||||
} else {
|
||||
|
||||
buffer.write_frame(metaBufferA, dataBufferA);
|
||||
|
||||
zmq_send(socket, &pulse_id, sizeof(pulse_id), 0);
|
||||
|
||||
zmq_send(sender, &metaBufferA, sizeof(metaBufferA), 0);
|
||||
}
|
||||
|
||||
stats.record_stats(metaBufferA, bad_pulse_id);
|
||||
@@ -74,5 +67,5 @@ int main (int argc, char *argv[]) {
|
||||
|
||||
}
|
||||
|
||||
delete[] data;
|
||||
delete[] dataBufferA;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user