Placeholder based on jungfrau

This commit is contained in:
2021-05-28 17:44:15 +02:00
parent be206af5e2
commit 0ae9eb1e88
17 changed files with 1168 additions and 0 deletions
+1
View File
@@ -34,6 +34,7 @@ add_subdirectory("core-buffer")
add_subdirectory("jf-udp-recv")
add_subdirectory("jf-buffer-writer")
add_subdirectory("jf-assembler")
add_subdirectory("jfj-udp-recv")
add_subdirectory("sf-stream")
add_subdirectory("sf-writer")
+37
View File
@@ -0,0 +1,37 @@
#ifndef JUNGFRAUJOCH_H
#define JUNGFRAUJOCH_H
#include <cstdint>
#define JFJOCH_N_MODULES 32
#define JFJOCH_BYTES_PER_PACKET 8240
#define JFJOCH_DATA_BYTES_PER_PACKET 8192
#define JFJOCH_N_PACKETS_PER_FRAME JFJOCH_N_MODULES * 128
#define JFJOCH_DATA_BYTES_PER_FRAME JFJOCH_N_MODULES * 1048576
// 48 bytes + 8192 bytes = 8240 bytes
#pragma pack(push)
#pragma pack(2)
struct jfjoch_packet_t {
uint64_t framenum;
uint32_t exptime;
uint32_t packetnum;
double bunchid;
uint64_t timestamp;
uint16_t moduleID;
uint16_t xCoord;
uint16_t yCoord;
uint16_t zCoord;
uint32_t debug;
uint16_t roundRobin;
uint8_t detectortype;
uint8_t headerVersion;
char data[JFJOCH_DATA_BYTES_PER_PACKET];
};
#pragma pack(pop)
#endif
+18
View File
@@ -0,0 +1,18 @@
file(GLOB SOURCES
src/*.cpp)
add_library(jf-udp-recv-lib STATIC ${SOURCES})
target_include_directories(jf-udp-recv-lib PUBLIC include/)
target_link_libraries(jf-udp-recv-lib
external
core-buffer-lib)
add_executable(jf-udp-recv src/main.cpp)
set_target_properties(jf-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv)
target_link_libraries(jf-udp-recv
jf-udp-recv-lib
zmq
rt)
enable_testing()
add_subdirectory(test/)
+164
View File
@@ -0,0 +1,164 @@
# sf-buffer
sf-buffer is the component that receives the detector data in form of UDP
packages and writes them down to disk to a binary format. In addition, it
sends a copy of the module frame to sf-stream via ZMQ.
Each sf-buffer process is taking care of a single detector module. The
processes are all independent and do not rely on any external data input
to maximize isolation and possible interactions in our system.
The main design principle is simplicity and decoupling:
- No interprocess dependencies/communication.
- No dependencies on external libraries (as much as possible).
- Using POSIX as much as possible.
We are optimizing for maintainability and long term stability. Performance is
of concern only if the performance criteria are not met.
## Overview
![image_buffer_overview](../docs/sf_daq_buffer-overview-buffer.jpg)
sf-buffer is a single threaded application (without counting the ZMQ IO threads)
that does both receiving, assembling, writing and sending in the same thread.
### UDP receiving
Each process listens to one udp port. Packets coming to this udp port are
assembled into frames. Frames (either complete or with missing packets) are
passed forward. The number of received packets is saved so we can later
(at image assembly time) determine if the frame is valid or not. At this point
we do no validation.
We are currently using **recvmmsg** to minimize the number of switches to
kernel mode.
We expect all packets to come in order or not come at all. Once we see the
package for the next pulse_id we can assume no more packages are coming for
the previous one, and send the assembled frame down the program.
### File writing
Files are written to disk in frames - one write to disk per frame. This gives
us a relaxed 10ms interval of 1 MB writes.
#### File format
The binary file on disk is just a serialization of multiple
**BufferBinaryFormat** structs:
```c++
#pragma pack(push)
#pragma pack(1)
struct ModuleFrame {
uint64_t pulse_id;
uint64_t frame_index;
uint64_t daq_rec;
uint64_t n_recv_packets;
uint64_t module_id;
};
#pragma pack(pop)
#pragma pack(push)
#pragma pack(1)
struct BufferBinaryFormat {
const char FORMAT_MARKER = 0xBE;
ModuleFrame meta;
char data[buffer_config::MODULE_N_BYTES];
};
#pragma pack(pop)
```
![file_layout_image](../docs/sf_daq_buffer-FileLayout.jpg)
Each frame is composed by:
- **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame.
- **ModuleFrame** - frame meta used in image assembly phase.
- **Data** - assembled frame from a single module.
Frames are written one after another to a specific offset in the file. The
offset is calculated based on the pulse_id, so each frame has a specific place
in the file and there is no need to have an index for frame retrieval.
The offset where a specific pulse_id is written in a file is calculated:
```c++
// We save 1000 pulses in each file.
const uint64_t FILE_MOD = 1000
// Relative index of pulse_id inside file.
size_t file_base = pulse_id % FILE_MOD;
// Offset in bytes of relative index in file.
size_t file_offset = file_base * sizeof(BufferBinaryFormat);
```
We now know where to look for data inside the file, but we still don't know
inside which file to look. For this we need to discuss the folder structure.
#### Folder structure
The folder (as well as file) structure is deterministic in the sense that given
a specific pulse_id, we can directly calculate the folder, file, and file
offset where the data is stored. This allows us to have independent writing
and reading from the buffer without building any indexes.
The binary files written by sf_buffer are saved to:
[detector_folder]/[module_folder]/[data_folder]/[data_file].bin
- **detector\_folder** should always be passed as an absolute path. This is the
container that holds all data related to a specific detector.
- **module\_folder** is usually composed like "M00", "M01". It separates data
from different modules of one detector.
- **data\_folder** and **data\_file** are automatically calculated based on the
current pulse_id, FOLDER_MOD and FILE_MOD attributes. This folders act as our
index for accessing data.
![folder_layout_image](../docs/sf_daq_buffer-FolderLayout.jpg)
```c++
// FOLDER_MOD = 100000
int data_folder = (pulse_id % FOLDER_MOD) * FOLDER_MOD;
// FILE_MOD = 1000
int data_file = (pulse_id % FILE_MOD) * FILE_MOD;
```
The data_folder and data_file folders are named as the first pulse_id that
should be stored inside them.
FOLDER_MOD == 100000 means that each data_folder will contain data for 100000
pulses, while FILE_MOD == 1000 means that each file inside the data_folder
will contain 1000 pulses. The total number of data_files in each data_folder
will therefore be **FILE\_MOD / FOLDER\_MOD = 100**.
#### Analyzing the buffer on disk
In **sf-utils** there is a Python module that allows you to read directly the
buffer in order to debug it or to verify the consistency between the HDF5 file
and the received data.
- VerifyH5DataConsistency.py checks the consistency between the H5 file and
buffer.
- BinaryBufferReader.py reads the buffer and prints meta. The class inside
can also be used in external scripts.
### ZMQ sending
A copy of the data written to disk is also send via ZMQ to the sf-stream. This
is used to provide live viewing / processing capabilities. Each module data is
sent separately, and this is later assembled in the sf-stream.
We use the PUB/SUB mechanism for distributing this data - we cannot control the
rate of the producer, and we would like to avoid distributed image assembly
if possible, so PUSH/PULL does not make sense in this case.
We provide no guarantees on live data delivery, but in practice the number of
dropped or incomplete frames in currently negligible.
The protocol is a serialization of the same data structures we use to
write on disk (no need for additional memory operations before sending out
data). It uses a 2 part multipart ZMQ message:
- The first part is a serialization of the ModuleFrame struct (see above).
- The second part is the data field in the BufferBinaryFormat struct (the frame
data).
+92
View File
@@ -0,0 +1,92 @@
#ifndef CIRCULAR_BUFFER_TEMPLATE_HPP
#define CIRCULAR_BUFFER_TEMPLATE_HPP
#include <algorithm>
#include <cstddef>
#include <cassert>
#include <stdexcept>
#include <iostream>
/**Linear data buffer
A simplified version of FIFO.
**/
template <typename T, size_t CAP>
class DataBuffer{
public:
DataBuffer() {};
~DataBuffer() {};
/**Diagnostics**/
size_t size() const { return ( _write-_read ); }
size_t capacity() const { return _capacity; }
bool is_full(){ return ( (_write - _read)<_capacity ); }
bool is_empty(){ return (_write ==_read); }
/**Operators**/
void zero(){ memset(m_cont, 0, sizeof(m_cont)); }
T& operator[](size_t index); // Array subscript operator
T& container(){ return (_cont; } // Direct container reference
/**Element access**/
const T& pop_front(); //Destructive read
const T& get_front(); //Non-destructive read
void push_back(T item); //Write new element to buffer
/**Guards**/
std::mutex g_mutex;
private:
T m_cont[CAP];
const size_t m_capacity = CAP;
size_t ptr_write = 0;
size_t ptr_read = 0;
};
/** Array subscript operator
Throws 'std::length_error' if out of range.
**/
template<typename T>
T& DataBuffer<T>::operator[](size_t idx){
if(idx > m_capacity){
std::string msg = "Buffer index '" + std::to_string(idx) + "' is out of range with capacity '" + std::to_sting(m_capacity) + "'" + std::endl;
throw std::out_of_range(msg);
}
return m_buffer[idx];
}
template<typename T>
T& DataBuffer<T>::container(){
return m_buffer;
}
/*********************************************************************/
/** Destructive read (i.e. progress the read pointer) **/
template<typename T>
const T& DataBuffer<T>::pop_front(){
std::lock_guard<std::mutex> g_guard;
ptr_read++;
return _buffer[ptr_read-1];
}
/**Push a new element to the ringbuffer (do not progress read pointer)**/
template<typename T>
const T& DataBuffer<T>::peek_front(){
return m_buffer[ptr_read];
}
/**Push a new element to the ringbuffer**/
template<typename T>
void DataBuffer<T>::push_back(T item){
std::lock_guard<std::mutex> g_guard;
if(ptr_write==m_capacity-1){
std::string msg = "Buffer with '" + std::to_sting(m_capacity) + "' capacity is full" + std::endl;
throw std::out_of_range(msg);
}
m_buffer[ptr_write] = item;
ptr_write++;
}
#endif // CIRCULAR_BUFFER_TEMPLATE_HPP
+31
View File
@@ -0,0 +1,31 @@
#include <cstddef>
#include <formats.hpp>
#include <chrono>
#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP
#define SF_DAQ_BUFFER_FRAMESTATS_HPP
class FrameStats {
const std::string detector_name_;
const int module_id_;
size_t stats_time_;
int frames_counter_;
int n_missed_packets_;
int n_corrupted_frames_;
int n_corrupted_pulse_id_;
std::chrono::time_point<std::chrono::steady_clock> stats_interval_start_;
void reset_counters();
void print_stats();
public:
FrameStats(const std::string &detector_name,
const int module_id,
const size_t stats_time);
void record_stats(const ModuleFrame &meta, const bool bad_pulse_id);
};
#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP
@@ -0,0 +1,33 @@
#ifndef SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP
#define SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP
#include <netinet/in.h>
#include "PacketUdpReceiver.hpp"
#include "formats.hpp"
#include "buffer_config.hpp"
class JFJochUdpReceiver {
PacketUdpReceiver m_udp_receiver;
// Incoming packet buffers
jfjoch_packet_t m_packet_buffer[buffer_config::BUFFER_UDP_N_RECV_MSG];
iovec m_recv_buff_ptr[buffer_config::BUFFER_UDP_N_RECV_MSG];
mmsghdr m_msgs[buffer_config::BUFFER_UDP_N_RECV_MSG];
sockaddr_in m_sock_from[buffer_config::BUFFER_UDP_N_RECV_MSG];
bool packet_buffer_loaded_ = false;
int packet_buffer_n_packets_ = 0;
int packet_buffer_offset_ = 0;
inline void init_frame(ImageMetadata& frame_metadata, const int i_packet);
inline void copy_packet_to_buffers(ImageMetadata& metadata, char* frame_buffer, const int i_packet);
inline uint64_t m_process_packets(const int n_packets, ImageMetadata& metadata, char* frame_buffer);
public:
JFJochUdpReceiver(const uint16_t port, const int module_id);
virtual ~JFJochUdpReceiver();
uint64_t get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer);
};
#endif //SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP
@@ -0,0 +1,22 @@
#ifndef UDPRECEIVER_H
#define UDPRECEIVER_H
#include <sys/socket.h>
class PacketUdpReceiver {
int socket_fd_;
public:
PacketUdpReceiver();
virtual ~PacketUdpReceiver();
bool receive(void* buffer, const size_t buffer_n_bytes);
int receive_many(mmsghdr* msgs, const size_t n_msgs);
void bind(const uint16_t port);
void disconnect();
};
#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H
+71
View File
@@ -0,0 +1,71 @@
#include <iostream>
#include "FrameStats.hpp"
using namespace std;
using namespace chrono;
FrameStats::FrameStats(
const std::string &detector_name,
const int module_id,
const size_t stats_time) :
detector_name_(detector_name),
module_id_(module_id),
stats_time_(stats_time)
{
reset_counters();
}
void FrameStats::reset_counters()
{
frames_counter_ = 0;
n_missed_packets_ = 0;
n_corrupted_frames_ = 0;
n_corrupted_pulse_id_ = 0;
stats_interval_start_ = steady_clock::now();
}
void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id)
{
if (bad_pulse_id) {
n_corrupted_pulse_id_++;
}
if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) {
n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets;
n_corrupted_frames_++;
}
frames_counter_++;
auto time_passed = duration_cast<milliseconds>(
steady_clock::now()-stats_interval_start_).count();
if (time_passed >= stats_time_*1000) {
print_stats();
reset_counters();
}
}
void FrameStats::print_stats()
{
auto interval_ms_duration = duration_cast<milliseconds>(
steady_clock::now()-stats_interval_start_).count();
// * 1000 because milliseconds, + 250 because of truncation.
int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration;
uint64_t timestamp = time_point_cast<nanoseconds>(
system_clock::now()).time_since_epoch().count();
// Output in InfluxDB line protocol
cout << "jf_udp_recv";
cout << ",detector_name=" << detector_name_;
cout << ",module_name=M" << module_id_;
cout << " ";
cout << "n_missed_packets=" << n_missed_packets_ << "i";
cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i";
cout << ",repetition_rate=" << rep_rate << "i";
cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i";
cout << " ";
cout << timestamp;
cout << endl;
}
+112
View File
@@ -0,0 +1,112 @@
#include <cstring>
#include <jungfraujoch.hpp>
#include "JFJochUdpReceiver.hpp"
using namespace std;
using namespace buffer_config;
JFJochUdpReceiver::JFJochUdpReceiver(const uint16_t port, const int module_id) : module_id_(module_id){
udp_receiver_.bind(port);
for (int i = 0; i < BUFFER_UDP_N_RECV_MSG; i++) {
m_recv_buff_ptr[i].iov_base = (void*) &(m_packet_buffer[i]);
m_recv_buff_ptr[i].iov_len = sizeof(jfjoch_packet_t);
msgs_[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i];
msgs_[i].msg_hdr.msg_iovlen = 1;
msgs_[i].msg_hdr.msg_name = &m_sock_from[i];
msgs_[i].msg_hdr.msg_namelen = sizeof(sockaddr_in);
}
}
JFJochUdpReceiver::~JFJochUdpReceiver() {
m_udp_receiver.disconnect();
}
inline void JFJochUdpReceiver::init_frame(ImageMetadata& image_metadata, const int i_packet) {
image_metadata.pulse_id = m_packet_buffer[i_packet].bunchid;
image_metadata.frame_index = m_packet_buffer[i_packet].framenum;
image_metadata.daq_rec = m_packet_buffer[i_packet].debug;
image_metadata.is_good_image = 0;
}
inline void JFJochUdpReceiver::copy_packet_to_buffers(ImageMetadata& metadata, char* frame_buffer, const int idx_packet){
size_t buffer_offset = JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer_[idx_packet].packetnum;
memcpy((void*) (frame_buffer + buffer_offset), m_packet_buffer[idx_packet].data, JUNGFRAU_DATA_BYTES_PER_PACKET);
metadata.n_recv_packets++;
}
/** Copy the contents of the packet buffer into a single assembled image
NOTE: In the jungfrau_packet, framenum is the trigger number
NOTE: Even partial frames are valid
**/
inline uint64_t JFJochUdpReceiver::m_process_packets(const int start_offset, ImageMetadata& metadata, char* frame_buffer){
for (int i_packet=start_offset; i_packet < packet_buffer_n_packets_; i_packet++) {
// First packet for this frame (sucks if this one is missed)
if (metadata.pulse_id == 0) {
init_frame(metadata, i_packet);
}
// Unexpected jump (if the last packet from the previous frame got lost)
if (metadata.frame_index != m_packet_buffer[i_packet].framenum) {
// Save queue status (lazy fifo queue)
m_packet_buffer_loaded_ = true;
m_packet_buffer_offset_ = i_packet;
// Even partial frames are valid?
return metadata.pulse_id;
}
copy_packet_to_buffers(metadata, frame_buffer, i_packet);
// Last frame packet received (frame finished)
if (packet_buffer_[i_packet].packetnum == JFJ_N_PACKETS_PER_FRAME - 1) {
// Buffer is loaded only if this is not the last message.
if (i_packet+1 != packet_buffer_n_packets_) {
// Continue on next packet
m_packet_buffer_loaded = true;
m_packet_buffer_offset = i_packet + 1;
// If i_packet is the last packet the buffer is empty.
} else {
m_packet_buffer_loaded = true;
m_packet_buffer_offset = 0;
}
return metadata.pulse_id;
}
}
// We emptied the buffer.
m_packet_buffer_loaded = false;
m_packet_buffer_offset = 0;
return 0;
}
uint64_t JFJochUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){
// Reset the metadata and frame buffer for the next frame.
metadata.pulse_id = 0;
metadata.n_recv_packets = 0;
memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME);
// Happens when last packet from previous frame was missed.
if (packet_buffer_loaded_) {
auto pulse_id = m_process_packets(packet_buffer_offset_, metadata, frame_buffer);
if (pulse_id != 0) { return pulse_id; }
}
// Otherwise read a new one
while (true) {
packet_buffer_n_packets_ = udp_receiver_.receive_many(msgs_, BUFFER_UDP_N_RECV_MSG);
if (packet_buffer_n_packets_ > 0) {
auto pulse_id = m_process_packets(0, metadata, frame_buffer);
if (pulse_id != 0) { return pulse_id; }
}
}
}
+66
View File
@@ -0,0 +1,66 @@
#include <netinet/in.h>
#include <iostream>
#include "PacketUdpReceiver.hpp"
#include "jungfrau.hpp"
#include <unistd.h>
#include <cstring>
#include "buffer_config.hpp"
using namespace std;
using namespace buffer_config;
PacketUdpReceiver::PacketUdpReceiver() : socket_fd_(-1) { }
PacketUdpReceiver::~PacketUdpReceiver() {
disconnect();
}
void PacketUdpReceiver::bind(const uint16_t port){
if (socket_fd_ > -1) {
throw runtime_error("Socket already bound.");
}
socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
if (socket_fd_ < 0) {
throw runtime_error("Cannot open socket.");
}
sockaddr_in server_address = {0};
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = INADDR_ANY;
server_address.sin_port = htons(port);
timeval udp_socket_timeout;
udp_socket_timeout.tv_sec = 0;
udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT;
if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, &udp_socket_timeout, sizeof(timeval)) == -1) {
throw runtime_error("Cannot set SO_RCVTIMEO. " + string(strerror(errno)));
}
if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) {
throw runtime_error("Cannot set SO_RCVBUF. " + string(strerror(errno)));
};
//TODO: try to set SO_RCVLOWAT
auto bind_result = ::bind(socket_fd_, reinterpret_cast<const sockaddr *>(&server_address), sizeof(server_address));
if (bind_result < 0) {
throw runtime_error("Cannot bind socket.");
}
}
int PacketUdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs){
return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0);
}
bool PacketUdpReceiver::receive(void* buffer, const size_t buffer_n_bytes){
auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0);
return (data_len == buffer_n_bytes) ? true : false;
}
void PacketUdpReceiver::disconnect(){
close(socket_fd_);
socket_fd_ = -1;
}
+78
View File
@@ -0,0 +1,78 @@
#include <iostream>
#include <stdexcept>
#include <zmq.h>
#include <RamBuffer.hpp>
#include "formats.hpp"
#include "buffer_config.hpp"
#include "FrameUdpReceiver.hpp"
#include "BufferUtils.hpp"
#include "FrameStats.hpp"
using namespace std;
using namespace chrono;
using namespace buffer_config;
using namespace BufferUtils;
int main (int argc, char *argv[]) {
if (argc != 3) {
cout << endl;
cout << "Usage: jf_udp_recv [detector_json_filename] [module_id]";
cout << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << "\tmodule_id: id of the module for this process." << endl;
cout << endl;
exit(-1);
}
const auto config = read_json_config(string(argv[1]));
const int module_id = atoi(argv[2]);
const auto udp_port = config.start_udp_port + module_id;
ImageUdpReceiver receiver(udp_port, module_id);
RamBuffer buffer(config.detector_name, config.n_modules);
ImageStats stats(config.detector_name, module_id, STATS_TIME);
auto ctx = zmq_ctx_new();
auto socket = bind_socket(ctx, config.detector_name, to_string(module_id));
// Might be better creating a structure for double buffering
ImageMetadata metaBufferA;
char* dataBufferA = new char[IMAGE_N_BYTES];
uint64_t pulse_id_previous = 0;
uint64_t frame_index_previous = 0;
while (true) {
// NOTE: Needs to be pipelined for really high frame rates
auto pulse_id = receiver.get_image_from_udp(metaBufferA, dataBufferA);
bool bad_pulse_id = false;
if ( ( metaBufferA.frame_index != (frame_index_previous+1) ) ||
( (pulse_id-pulse_id_previous) < 0 ) ||
( (pulse_id-pulse_id_previous) > 1000 ) ) {
bad_pulse_id = true;
} else {
buffer.write_frame(metaBufferA, dataBufferA);
zmq_send(socket, &pulse_id, sizeof(pulse_id), 0);
}
stats.record_stats(metaBufferA, bad_pulse_id);
pulse_id_previous = pulse_id;
frame_index_previous = metaBufferA.frame_index;
}
delete[] data;
}
+8
View File
@@ -0,0 +1,8 @@
add_executable(jf-udp-recv-tests main.cpp)
target_link_libraries(jf-udp-recv-tests
core-buffer-lib
jf-udp-recv-lib
gtest
)
+10
View File
@@ -0,0 +1,10 @@
#include "gtest/gtest.h"
#include "test_PacketUdpReceiver.cpp"
#include "test_FrameUdpReceiver.cpp"
using namespace std;
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
+16
View File
@@ -0,0 +1,16 @@
#ifndef MOCK_UDP_H
#define MOCK_UDP_H
const int MOCK_UDP_PORT(13000);
sockaddr_in get_server_address(uint16_t udp_port)
{
sockaddr_in server_address = {0};
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = INADDR_ANY;
server_address.sin_port = htons(udp_port);
return server_address;
}
#endif
+239
View File
@@ -0,0 +1,239 @@
#include <netinet/in.h>
#include <jungfrau.hpp>
#include "gtest/gtest.h"
#include "FrameUdpReceiver.hpp"
#include "mock/udp.hpp"
#include <thread>
#include <chrono>
#include <future>
using namespace std;
TEST(BufferUdpReceiver, simple_recv)
{
auto n_packets = JF_N_PACKETS_PER_FRAME;
int source_id = 1234;
int n_frames = 5;
uint16_t udp_port = MOCK_UDP_PORT;
auto server_address = get_server_address(udp_port);
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
FrameUdpReceiver udp_receiver(udp_port, source_id);
auto handle = async(launch::async, [&](){
for (int i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
jungfrau_packet send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
send_udp_buffer.bunchid = i_frame + 1;
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
}
}
});
handle.wait();
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(JUNGFRAU_DATA_BYTES_PER_FRAME);
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(
metadata, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, n_packets);
ASSERT_EQ(metadata.module_id, source_id);
}
::close(send_socket_fd);
}
TEST(BufferUdpReceiver, missing_middle_packet)
{
auto n_packets = JF_N_PACKETS_PER_FRAME;
int source_id = 1234;
int n_frames = 3;
uint16_t udp_port = MOCK_UDP_PORT;
auto server_address = get_server_address(udp_port);
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
FrameUdpReceiver udp_receiver(udp_port, source_id);
auto handle = async(launch::async, [&](){
for (int i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// Skip some random middle packet.
if (i_packet == 10) {
continue;
}
jungfrau_packet send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
send_udp_buffer.bunchid = i_frame + 1;
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
}
}
});
handle.wait();
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(JUNGFRAU_DATA_BYTES_PER_FRAME);
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(
metadata, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
ASSERT_EQ(metadata.module_id, source_id);
}
::close(send_socket_fd);
}
TEST(BufferUdpReceiver, missing_first_packet)
{
auto n_packets = JF_N_PACKETS_PER_FRAME;
int source_id = 1234;
int n_frames = 3;
uint16_t udp_port = MOCK_UDP_PORT;
auto server_address = get_server_address(udp_port);
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
FrameUdpReceiver udp_receiver(udp_port, source_id);
auto handle = async(launch::async, [&](){
for (int i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// Skip first packet.
if (i_packet == 0) {
continue;
}
jungfrau_packet send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
send_udp_buffer.bunchid = i_frame + 1;
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
}
}
});
handle.wait();
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(JUNGFRAU_DATA_BYTES_PER_FRAME);
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(
metadata, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
ASSERT_EQ(metadata.module_id, source_id);
}
::close(send_socket_fd);
}
TEST(BufferUdpReceiver, missing_last_packet)
{
auto n_packets = JF_N_PACKETS_PER_FRAME;
int source_id = 1234;
int n_frames = 3;
uint16_t udp_port = MOCK_UDP_PORT;
auto server_address = get_server_address(udp_port);
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
FrameUdpReceiver udp_receiver(udp_port, source_id);
auto handle = async(launch::async, [&](){
for (int i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// Skip the last packet.
if (i_packet == n_packets-1) {
continue;
}
jungfrau_packet send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
send_udp_buffer.bunchid = i_frame + 1;
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
}
}
});
handle.wait();
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(JUNGFRAU_DATA_BYTES_PER_FRAME);
// n_frames -1 because the last frame is not complete.
for (int i_frame=0; i_frame < n_frames - 1; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(
metadata, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
ASSERT_EQ(metadata.module_id, source_id);
}
::close(send_socket_fd);
}
@@ -0,0 +1,170 @@
#include <netinet/in.h>
#include <jungfrau.hpp>
#include "gtest/gtest.h"
#include "mock/udp.hpp"
#include "PacketUdpReceiver.hpp"
#include <thread>
#include <chrono>
using namespace std;
TEST(PacketUdpReceiver, simple_recv)
{
uint16_t udp_port = MOCK_UDP_PORT;
auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0);
ASSERT_TRUE(send_socket_fd >= 0);
PacketUdpReceiver udp_receiver;
udp_receiver.bind(udp_port);
jungfrau_packet send_udp_buffer;
send_udp_buffer.packetnum = 91;
send_udp_buffer.framenum = 92;
send_udp_buffer.bunchid = 93;
send_udp_buffer.debug = 94;
auto server_address = get_server_address(udp_port);
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
this_thread::sleep_for(chrono::milliseconds(100));
jungfrau_packet recv_udp_buffer;
ASSERT_TRUE(udp_receiver.receive(
&recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET));
EXPECT_EQ(send_udp_buffer.packetnum, recv_udp_buffer.packetnum);
EXPECT_EQ(send_udp_buffer.framenum, recv_udp_buffer.framenum);
EXPECT_EQ(send_udp_buffer.bunchid, recv_udp_buffer.bunchid);
EXPECT_EQ(send_udp_buffer.debug, recv_udp_buffer.debug);
ASSERT_FALSE(udp_receiver.receive(
&recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET));
udp_receiver.disconnect();
::close(send_socket_fd);
}
TEST(PacketUdpReceiver, false_recv)
{
uint16_t udp_port = MOCK_UDP_PORT;
auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0);
ASSERT_TRUE(send_socket_fd >= 0);
PacketUdpReceiver udp_receiver;
udp_receiver.bind(udp_port);
jungfrau_packet send_udp_buffer;
jungfrau_packet recv_udp_buffer;
auto server_address = get_server_address(udp_port);
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET-1,
0,
(sockaddr*) &server_address,
sizeof(server_address));
ASSERT_FALSE(udp_receiver.receive(
&recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET));
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
ASSERT_TRUE(udp_receiver.receive(
&recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET));
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET-1,
0,
(sockaddr*) &server_address,
sizeof(server_address));
ASSERT_TRUE(udp_receiver.receive(
&recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET-1));
udp_receiver.disconnect();
::close(send_socket_fd);
}
TEST(PacketUdpReceiver, receive_many)
{
auto n_msg_buffer = JF_N_PACKETS_PER_FRAME;
jungfrau_packet recv_buffer[n_msg_buffer];
iovec recv_buff_ptr[n_msg_buffer];
struct mmsghdr msgs[n_msg_buffer];
struct sockaddr_in sockFrom[n_msg_buffer];
for (int i = 0; i < n_msg_buffer; i++) {
recv_buff_ptr[i].iov_base = (void*) &(recv_buffer[i]);
recv_buff_ptr[i].iov_len = sizeof(jungfrau_packet);
msgs[i].msg_hdr.msg_iov = &recv_buff_ptr[i];
msgs[i].msg_hdr.msg_iovlen = 1;
msgs[i].msg_hdr.msg_name = &sockFrom[i];
msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in);
}
uint16_t udp_port = MOCK_UDP_PORT;
auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0);
ASSERT_TRUE(send_socket_fd >= 0);
PacketUdpReceiver udp_receiver;
udp_receiver.bind(udp_port);
jungfrau_packet send_udp_buffer;
auto server_address = get_server_address(udp_port);
send_udp_buffer.bunchid = 0;
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
send_udp_buffer.bunchid = 1;
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
this_thread::sleep_for(chrono::milliseconds(10));
auto n_msgs = udp_receiver.receive_many(msgs, JF_N_PACKETS_PER_FRAME);
ASSERT_EQ(n_msgs, 2);
for (size_t i=0;i<n_msgs;i++) {
ASSERT_EQ(msgs[i].msg_len, JUNGFRAU_BYTES_PER_PACKET);
ASSERT_EQ(recv_buffer[i].bunchid, i);
}
n_msgs = udp_receiver.receive_many(msgs, JF_N_PACKETS_PER_FRAME);
ASSERT_EQ(n_msgs, -1);
udp_receiver.disconnect();
::close(send_socket_fd);
}