ZMQ streamer for jungfrau

This commit is contained in:
2021-09-14 10:43:38 +02:00
parent 4484d70aea
commit ad634a8c8c
21 changed files with 1635 additions and 0 deletions
+12
View File
@@ -0,0 +1,12 @@
file(GLOB SOURCES src/*.cpp)
add_library(jfj-combined-lib STATIC ${SOURCES})
target_include_directories(jfj-combined-lib PUBLIC include/)
target_link_libraries(jfj-combined-lib external core-buffer-lib)
add_executable(jfj-combined src/main.cpp)
set_target_properties(jfj-combined PROPERTIES OUTPUT_NAME jfj_combined)
target_link_libraries(jfj-combined jfj-combined-lib zmq rt pthread)
enable_testing()
add_subdirectory(test/)
+164
View File
@@ -0,0 +1,164 @@
# sf-buffer
sf-buffer is the component that receives the detector data in the form of UDP
packets and writes it to disk in a binary format. In addition, it
sends a copy of each module frame to sf-stream via ZMQ.
Each sf-buffer process takes care of a single detector module. The
processes are all independent and do not rely on any external data input,
in order to maximize isolation and minimize interactions in our system.
The main design principle is simplicity and decoupling:
- No interprocess dependencies/communication.
- No dependencies on external libraries (as much as possible).
- Using POSIX as much as possible.
We are optimizing for maintainability and long-term stability. Performance
becomes a concern only when the performance criteria are not met.
## Overview
![image_buffer_overview](../docs/sf_daq_buffer-overview-buffer.jpg)
sf-buffer is a single-threaded application (not counting the ZMQ IO threads)
that does receiving, assembling, writing and sending all in the same thread.
### UDP receiving
Each process listens on one UDP port. Packets arriving on this port are
assembled into frames. Frames (either complete or with missing packets) are
passed forward. The number of received packets is saved so we can later
(at image assembly time) determine whether the frame is valid. At this point
we do no validation.
We are currently using **recvmmsg** to minimize the number of switches to
kernel mode.
We expect all packets to arrive in order or not arrive at all. Once we see a
packet for the next pulse_id we can assume no more packets are coming for
the previous one, and we send the assembled frame down the pipeline.
### File writing
Files are written to disk frame by frame - one write to disk per frame. This
gives us a relaxed cadence of one ~1 MB write every 10 ms.
#### File format
The binary file on disk is just a serialization of multiple
**BufferBinaryFormat** structs:
```c++
#pragma pack(push)
#pragma pack(1)
struct ModuleFrame {
uint64_t pulse_id;
uint64_t frame_index;
uint64_t daq_rec;
uint64_t n_recv_packets;
uint64_t module_id;
};
#pragma pack(pop)
#pragma pack(push)
#pragma pack(1)
struct BufferBinaryFormat {
const char FORMAT_MARKER = 0xBE;
ModuleFrame meta;
char data[buffer_config::MODULE_N_BYTES];
};
#pragma pack(pop)
```
![file_layout_image](../docs/sf_daq_buffer-FileLayout.jpg)
Each frame is composed of:
- **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame.
- **ModuleFrame** - frame meta used in image assembly phase.
- **Data** - assembled frame from a single module.
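The packed layout pins down the exact slot size on disk. A minimal sketch (assuming `MODULE_N_BYTES` is one 16-bit Jungfrau module, 512 × 1024 × 2 bytes, which is not spelled out in this excerpt):

```cpp
#include <cstdint>
#include <cstddef>

// Assumed value of buffer_config::MODULE_N_BYTES: one 512x1024 module, 2 bytes/pixel.
namespace buffer_config { constexpr size_t MODULE_N_BYTES = 512 * 1024 * 2; }

#pragma pack(push, 1)
struct ModuleFrame {
    uint64_t pulse_id;
    uint64_t frame_index;
    uint64_t daq_rec;
    uint64_t n_recv_packets;
    uint64_t module_id;
};
struct BufferBinaryFormat {
    const char FORMAT_MARKER = '\xBE';  // control byte marking a valid frame
    ModuleFrame meta;
    char data[buffer_config::MODULE_N_BYTES];
};
#pragma pack(pop)

// 1 marker byte + 5 * 8 metadata bytes + 1 MiB of pixel data.
static_assert(sizeof(ModuleFrame) == 40, "packed metadata is 40 bytes");
static_assert(sizeof(BufferBinaryFormat) == 1 + 40 + buffer_config::MODULE_N_BYTES,
              "one frame slot is 1048617 bytes");
```

Because `#pragma pack(1)` removes all padding, every frame slot has the same fixed size, which is what makes the offset arithmetic below possible.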
Frames are written one after another to a specific offset in the file. The
offset is calculated based on the pulse_id, so each frame has a specific place
in the file and there is no need to have an index for frame retrieval.
The offset at which a specific pulse_id is stored in a file is calculated as:
```c++
// We save 1000 pulses in each file.
const uint64_t FILE_MOD = 1000;
// Relative index of pulse_id inside file.
size_t file_base = pulse_id % FILE_MOD;
// Offset in bytes of relative index in file.
size_t file_offset = file_base * sizeof(BufferBinaryFormat);
```
We now know where to look for data inside the file, but we still don't know
inside which file to look. For this we need to discuss the folder structure.
#### Folder structure
The folder (as well as file) structure is deterministic in the sense that given
a specific pulse_id, we can directly calculate the folder, file, and file
offset where the data is stored. This allows us to have independent writing
and reading from the buffer without building any indexes.
The binary files written by sf_buffer are saved to:
[detector_folder]/[module_folder]/[data_folder]/[data_file].bin
- **detector\_folder** should always be passed as an absolute path. This is the
container that holds all data related to a specific detector.
- **module\_folder** is usually composed like "M00", "M01". It separates data
from different modules of one detector.
- **data\_folder** and **data\_file** are automatically calculated based on the
current pulse_id, FOLDER_MOD and FILE_MOD attributes. These folders act as our
index for accessing data.
![folder_layout_image](../docs/sf_daq_buffer-FolderLayout.jpg)
```c++
// FOLDER_MOD = 100000
uint64_t data_folder = (pulse_id / FOLDER_MOD) * FOLDER_MOD;
// FILE_MOD = 1000
uint64_t data_file = (pulse_id / FILE_MOD) * FILE_MOD;
```
The data_folder and data_file folders are named after the first pulse_id that
should be stored inside them.
FOLDER_MOD == 100000 means that each data_folder contains data for 100000
pulses, while FILE_MOD == 1000 means that each file inside the data_folder
contains 1000 pulses. The total number of data_files in each data_folder
is therefore **FOLDER\_MOD / FILE\_MOD = 100**.
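Putting folder, file, and offset together, the full location of a pulse can be computed like this (a sketch; the frame-size constant is an assumption based on the packed struct above, and `buffer_location` is a hypothetical helper, not part of this commit):

```cpp
#include <cstdint>
#include <cstddef>
#include <string>

constexpr uint64_t FOLDER_MOD = 100000;
constexpr uint64_t FILE_MOD = 1000;
constexpr size_t FRAME_BYTES = 1048617;  // assumed sizeof(BufferBinaryFormat)

// Returns "[data_folder]/[data_file].bin" and writes the byte offset
// of the pulse's slot inside that file.
std::string buffer_location(uint64_t pulse_id, size_t* offset) {
    // Folders and files are named after the first pulse_id they contain.
    uint64_t data_folder = (pulse_id / FOLDER_MOD) * FOLDER_MOD;
    uint64_t data_file = (pulse_id / FILE_MOD) * FILE_MOD;
    *offset = (pulse_id % FILE_MOD) * FRAME_BYTES;
    return std::to_string(data_folder) + "/" + std::to_string(data_file) + ".bin";
}
```

For example, pulse_id 4242424242 lands in folder 4242400000, file 4242424000.bin, at relative slot 242 - no index lookup required.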
#### Analyzing the buffer on disk
In **sf-utils** there is a Python module that allows you to read the buffer
directly, in order to debug it or to verify the consistency between the HDF5
file and the received data.
- VerifyH5DataConsistency.py checks the consistency between the H5 file and
the buffer.
- BinaryBufferReader.py reads the buffer and prints its metadata. The class
inside can also be used in external scripts.
### ZMQ sending
A copy of the data written to disk is also sent via ZMQ to sf-stream. This
is used to provide live viewing / processing capabilities. Each module's data
is sent separately and is later assembled in sf-stream.
We use the PUB/SUB mechanism for distributing this data - we cannot control the
rate of the producer, and we would like to avoid distributed image assembly
if possible, so PUSH/PULL does not make sense in this case.
We provide no guarantees on live data delivery, but in practice the number of
dropped or incomplete frames is currently negligible.
The protocol is a serialization of the same data structures we use to
write to disk (no additional memory operations are needed before sending out
data). It uses a two-part multipart ZMQ message:
- The first part is a serialization of the ModuleFrame struct (see above).
- The second part is the data field in the BufferBinaryFormat struct (the frame
data).
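On the receiving side, part 1 can be deserialized straight into the packed struct. A minimal sketch (the ZMQ receive itself is elided; `parse_module_meta` is a hypothetical helper, not part of this commit):

```cpp
#include <cstdint>
#include <cstring>
#include <stdexcept>

#pragma pack(push, 1)
struct ModuleFrame {
    uint64_t pulse_id;
    uint64_t frame_index;
    uint64_t daq_rec;
    uint64_t n_recv_packets;
    uint64_t module_id;
};
#pragma pack(pop)

// Part 1 of the multipart message is exactly sizeof(ModuleFrame) bytes;
// part 2 is the raw frame data and needs no parsing at all.
ModuleFrame parse_module_meta(const void* msg_data, size_t msg_size) {
    if (msg_size != sizeof(ModuleFrame)) {
        throw std::runtime_error("Unexpected metadata part size");
    }
    ModuleFrame meta;
    std::memcpy(&meta, msg_data, sizeof(meta));
    return meta;
}
```

Since the wire format equals the in-memory format, the consumer pays one `memcpy` per frame and nothing more.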
+177
View File
@@ -0,0 +1,177 @@
#ifndef SF_DAQ_FRAME_CACHE_HPP
#define SF_DAQ_FRAME_CACHE_HPP
#include <iostream>
#include <cstring>
#include <deque>
#include <thread>
#include <vector>
#include <functional>
#include <shared_mutex>
#include "../../core-buffer/include/EpicsFieldTypes.hpp"
#include "../../core-buffer/include/formats.hpp"
#include "Watchdog.hpp"
#define MAX_FIFO_LENGTH 32
/** Frame Cache
Similar to a thread-safe RamBuffer; it handles concurrency internally via mutexes.
The class operates on in-memory arrays via pointer/reference access. It uses the
linearly increasing pulse ID for cache addressing. The standard placement method
ensures that no data corruption occurs: lines are always flushed before being
overwritten. A large enough buffer should ensure that there is sufficient time
to retrieve the data from all detector modules.
The cache line is flushed on three occasions:
- A new frame is about to overwrite it (by the frame-worker thread)
- Complete frames are queued for flushing internally (by internal worker)
- Incomplete frames are flushed by a watchdog after a timeout (by watchdog worker)
NOTE: The class is header-only for future template-refactoring.
TODO: Multiple queue workers
**/
class FrameCache{
public:
FrameCache(uint64_t N_CAP, uint64_t modX, uint64_t modY, std::function<void(ImageBinaryFormat&)> callback):
m_capacity(N_CAP), m_modX(modX), m_modY(modY), m_mod(modX*modY), m_valid(N_CAP, 0), m_fill(N_CAP, 0), m_lock(N_CAP),
m_buffer(N_CAP, ImageBinaryFormat(512*modX*modY, 1024, sizeof(uint16_t))),
f_send(callback) {
// Initialize buffer metadata
for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); }
// Initialize the watchdog
std::function<void()> wd_callback = std::bind(&FrameCache::flush_all, this);
m_watchdog = new Watchdog(500, wd_callback);
m_watchdog->Start();
// Start drain worker
m_drainer = std::thread(&FrameCache::drain_loop, this);
};
/** Emplace
Place a recorded frame into its corresponding module location.
This simultaneously handles buffering, assembly and flushing.
Also handles concurrency (shared and unique mutexes).
NOTE: Forced flushing is performed by the current thread.
**/
void emplace(uint64_t pulseID, uint64_t moduleIDX, BufferBinaryFormat& inc_frame){
// Cache-line index
const uint64_t idx = pulseID % m_capacity;
// A new frame is starting
if(inc_frame.meta.pulse_id != m_buffer[idx].meta.id){
// Unique lock to flush and start a new one
std::unique_lock<std::shared_mutex> p_guard(m_lock[idx]);
// Check if condition persists after getting the mutex
if(inc_frame.meta.pulse_id != m_buffer[idx].meta.id){
start_line(idx, inc_frame.meta);
}
}
// Shared lock for concurrent PUT operations
std::shared_lock<std::shared_mutex> s_guard(m_lock[idx]);
// Calculate destination pointer and copy data
char* ptr_dest = m_buffer[idx].data.data() + moduleIDX * m_blocksize;
std::memcpy((void*)ptr_dest, (void*)&inc_frame.data, m_blocksize);
m_fill[idx]++;
m_watchdog->Kick();
// Queue completed frames for draining (queue is bounded to limit backlog)
if(m_fill[idx]==m_mod){
if(m_drain_queue.size() < MAX_FIFO_LENGTH) {
m_drain_queue.push_back(idx);
}
}
}
protected:
/** Flush and start a new line
Flushes a valid cache line and starts another one from the provided metadata.
NOTE : It does not lock, that must be done externally! **/
void start_line(uint64_t idx, ModuleFrame& inc_frame){
// 1. Flush
if(m_valid[idx]){ f_send(m_buffer[idx]); }
// 2. Init new frame
m_buffer[idx].meta.id = inc_frame.pulse_id;
m_buffer[idx].meta.width = 1024 * m_modX;
m_buffer[idx].meta.height = 512 * m_modY;
m_buffer[idx].meta.dtype = (int)DBF_USHORT;
m_buffer[idx].meta.user_1 = inc_frame.frame_index;
m_buffer[idx].meta.user_2 = inc_frame.daq_rec;
m_buffer[idx].meta.status = true;
m_fill[idx] = 0;
m_valid[idx] = 1;
}
/** Flush and invalidate a line
Flushes a valid cache line and invalidates the associated buffer.
NOTE : It does not lock, that must be done externally! **/
void flush_line(uint64_t idx){
if(m_valid[idx]){
f_send(m_buffer[idx]);
m_fill[idx] = 0;
m_valid[idx] = 0;
}
}
/** Flush all lines in the buffer**/
void flush_all(){
for(uint64_t idx=0; idx < m_capacity; idx++){
std::unique_lock<std::shared_mutex> p_guard(m_lock[idx]);
flush_line(idx);
}
}
/** Drain loop
Flushes queued frames from the cache buffer and invalidates line.
It also locks the frame for the duration of flushing! **/
void drain_loop(){
while(true){
if(!m_drain_queue.empty()){
uint32_t idx = m_drain_queue.front();
m_drain_queue.pop_front();
// Lock and flush the frame
std::unique_lock<std::shared_mutex> p_guard(m_lock[idx]);
flush_line(idx);
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}
}
}
/** Variables **/
const uint64_t m_capacity;
const uint64_t m_modX;
const uint64_t m_modY;
const uint64_t m_mod;
const uint64_t m_blocksize = 1024*512*sizeof(uint16_t);
/** Flush function **/
std::function<void(ImageBinaryFormat&)> f_send;
/** Main container and mutex guard **/
std::vector<uint32_t> m_valid;
std::vector<uint32_t> m_fill;
std::vector<std::shared_mutex> m_lock;
std::vector<ImageBinaryFormat> m_buffer;
/** Watchdog timer and flush queue **/
Watchdog *m_watchdog;
std::thread m_drainer;
std::deque<uint32_t> m_drain_queue;
};
#endif // SF_DAQ_FRAME_CACHE_HPP
+31
View File
@@ -0,0 +1,31 @@
#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP
#define SF_DAQ_BUFFER_FRAMESTATS_HPP
#include <cstddef>
#include <chrono>
#include <string>
#include "../../core-buffer/include/formats.hpp"
class FrameStats {
private:
const std::string detector_name_;
const int module_id_;
size_t stats_time_;
int frames_counter_;
int n_missed_packets_;
int n_corrupted_frames_;
int n_corrupted_pulse_id_;
std::chrono::time_point<std::chrono::steady_clock> stats_interval_start_;
void reset_counters();
void print_stats();
public:
FrameStats(const std::string &detector_name, const int module_id, const size_t stats_time);
void record_stats(const ModuleFrame &meta, const bool bad_pulse_id);
};
#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP
+48
View File
@@ -0,0 +1,48 @@
#ifndef SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP
#define SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP
#include <iostream>
#include <cstring>
#include <functional>
#include "../../core-buffer/include/formats.hpp"
#include "PacketUdpReceiver.hpp"
#include "PacketBuffer.hpp"
#include "FrameStats.hpp"
/** JungfrauJoch UDP receiver
Capture UDP data stream from Jungfrau(Joch) FPGA card.
NOTE: This design will not scale well for higher frame rates...
TODO: Direct copy into FrameCache buffer (saves a memcopy)
**/
class JfjFrameWorker {
const std::string m_moduleName;
const uint64_t m_moduleID;
// UDP and statistics interfaces
FrameStats m_moduleStats;
PacketUdpReceiver m_udp_receiver;
// Buffer and helper structures
bool in_progress = false;
uint64_t m_current_index = 0;
PacketBuffer<jfjoch_packet_t, 64> m_buffer;
// Buffer processing
inline uint64_t process_packets(BufferBinaryFormat& buffer);
uint64_t get_frame(BufferBinaryFormat& buffer);
std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> f_push_callback;
public:
JfjFrameWorker(const uint16_t port, std::string moduleName, const uint32_t moduleID,
std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> callback);
virtual ~JfjFrameWorker();
void run();
// Copy semantics : OFF
JfjFrameWorker(JfjFrameWorker const &) = delete;
JfjFrameWorker& operator=(JfjFrameWorker const &) = delete;
};
#endif //SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP
+117
View File
@@ -0,0 +1,117 @@
#ifndef CIRCULAR_BUFFER_TEMPLATE_HPP
#define CIRCULAR_BUFFER_TEMPLATE_HPP
#include <cstddef>
#include <stdexcept>
#include <iostream>
#include <mutex>
#include <cstdint>
#if defined(WIN32) || defined(_WIN32) || defined(MINGW32)
#include <winsock2.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#endif // defined
/** Linear data buffer (NOT a FIFO)
Simplified data buffer that provides pop and push operations and
bundles the actual container with the metadata required by <sys/socket.h>.
It stores the actual data in an accessible C-style array. **/
template <typename T, size_t CAPACITY>
class PacketBuffer{
public:
PacketBuffer() {
// Initialize C-structures as expected by <sys/socket.h>
for (size_t i = 0; i < CAPACITY; i++) {
m_recv_buff_ptr[i].iov_base = (void*) &(m_container[i]);
m_recv_buff_ptr[i].iov_len = sizeof(T);
m_msgs[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i];
m_msgs[i].msg_hdr.msg_iovlen = 1;
m_msgs[i].msg_hdr.msg_name = &m_sock_from[i];
m_msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in);
}
};
// ~PacketBuffer() {};
/**Diagnostics**/
int size() const { return idx_write - idx_read; }
int capacity() const { return m_capacity; }
bool is_full() const { return idx_write >= (int)m_capacity; }
bool is_empty() const { return idx_write <= idx_read; }
/**Operators**/
void reset(){ idx_write = 0; idx_read = 0; }; // Reset the buffer
T* container(){ return m_container; }; // Direct pointer to the container array
mmsghdr* msgs(){ return m_msgs; }; // Direct pointer to the message headers
/**Element access**/
T& pop_front(); //Destructive read
const T& peek_front(); //Non-destructive read
void push_back(T item); //Write new element to buffer
/**Fill from UDP receiver (threadsafe)**/
template <typename TY>
void fill_from(TY& recv){
std::lock_guard<std::mutex> g_guard(m_mutex);
this->idx_write = recv.receive_many(m_msgs, this->capacity());
// Returns -1 with errno=11 if no data received
if(idx_write==-1){ idx_write = 0; }
this->idx_read = 0;
}
private:
// Main container
T m_container[CAPACITY];
const size_t m_capacity = CAPACITY;
/**Guards**/
std::mutex m_mutex;
/**Read and write index**/
int idx_write = 0;
int idx_read = 0;
// C-structures as expected by <sys/socket.h>
mmsghdr m_msgs[CAPACITY];
iovec m_recv_buff_ptr[CAPACITY];
sockaddr_in m_sock_from[CAPACITY];
};
/*********************************************************************/
/*********************************************************************/
/*********************************************************************/
/** Destructive read
Standard queue-style read access (advances the read pointer).
Throws 'std::out_of_range' if the container is empty. **/
template <typename T, size_t CAPACITY>
T& PacketBuffer<T, CAPACITY>::pop_front(){
std::lock_guard<std::mutex> g_guard(m_mutex);
if(this->is_empty()) { throw std::out_of_range("Attempted to read empty queue!"); }
idx_read++;
return m_container[idx_read-1];
}
/** Non-destructive read
Standard read access that does not advance the read pointer.
Throws 'std::out_of_range' if the container is empty. **/
template <typename T, size_t CAPACITY>
const T& PacketBuffer<T, CAPACITY>::peek_front(){
std::lock_guard<std::mutex> g_guard(m_mutex);
if(this->is_empty()) { throw std::out_of_range("Attempted to read empty queue!"); }
return m_container[idx_read];
}
/** Push an element into the end of the buffer**/
template <typename T, size_t CAPACITY>
void PacketBuffer<T, CAPACITY>::push_back(T item){
std::lock_guard<std::mutex> g_guard(m_mutex);
if(this->is_full()) { throw std::out_of_range("Attempted to write a full buffer!"); }
m_container[idx_write] = item;
idx_write++;
}
#endif // CIRCULAR_BUFFER_TEMPLATE_HPP
@@ -0,0 +1,27 @@
#ifndef UDPRECEIVER_H
#define UDPRECEIVER_H
#include <cstdint>
#if defined(WIN32) || defined(_WIN32) || defined(MINGW32)
#include <winsock2.h>
#else
#include <sys/socket.h>
#endif // defined
class PacketUdpReceiver {
int socket_fd_;
public:
PacketUdpReceiver();
virtual ~PacketUdpReceiver();
bool receive(void* buffer, const size_t buffer_n_bytes);
int receive_many(mmsghdr* msgs, const size_t n_msgs);
void bind(const uint16_t port);
void disconnect();
};
#endif //UDPRECEIVER_H
+74
View File
@@ -0,0 +1,74 @@
#ifndef SF_DAQ_WATCHDOG_HPP
#define SF_DAQ_WATCHDOG_HPP
#include <thread>
#include <atomic>
#include <chrono>
#include <functional>
#include <mutex>
#include <iostream>
/** Watchdog timer class
Unless kicked before the timeout (in milliseconds) expires, it calls a
user-defined callback and re-arms itself.
**/
class Watchdog{
public:
Watchdog(int64_t timeout, std::function<void()> callback): m_timeout(timeout), m_callback(callback) {};
~Watchdog() { Stop(); };
void Start();
void Stop();
void Kick();
protected:
int64_t m_timeout;
std::atomic<bool> m_running = false;
std::function<void()> m_callback;
std::chrono::time_point<std::chrono::steady_clock> m_lastkick;
std::thread m_thread;
std::mutex m_mutex;
void Loop();
};
void Watchdog::Start(){
std::unique_lock<std::mutex> lock(m_mutex);
if(m_running == false){
m_running = true;
m_lastkick = std::chrono::steady_clock::now();
m_thread = std::thread(&Watchdog::Loop, this);
}
}
void Watchdog::Stop(){
{
std::unique_lock<std::mutex> g_guard(m_mutex);
if(m_running == false){ return; }
m_running = false;
}
// Join outside the lock: Loop() may be blocked waiting for m_mutex.
if(m_thread.joinable()){ m_thread.join(); }
}
void Watchdog::Kick(){
std::unique_lock<std::mutex> g_guard(m_mutex);
m_lastkick = std::chrono::steady_clock::now();
}
void Watchdog::Loop(){
std::cout << "Starting watchdog" << std::endl;
while(m_running){
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - m_lastkick);
if(elapsed.count() < m_timeout){
// std::cout << "Elapsed " << (int64_t)elapsed.count() << " of " << m_timeout << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
} else {
std::cout << "Expired timer" << std::endl;
m_callback();
// Infinite re-kick
std::unique_lock<std::mutex> g_guard(m_mutex);
m_lastkick = std::chrono::steady_clock::now();
}
}
}
#endif // SF_DAQ_WATCHDOG_HPP
+140
View File
@@ -0,0 +1,140 @@
#ifndef SF_DAQ_BUFFER_ZMQ_IMAGE_PUBLISHER_HPP
#define SF_DAQ_BUFFER_ZMQ_IMAGE_PUBLISHER_HPP
#include <iostream>
#include <mutex>
#include <zmq.hpp>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include "../../core-buffer/include/formats.hpp"
typedef enum {
DBF_STRING,
DBF_CHAR,
DBF_UCHAR,
DBF_SHORT,
DBF_USHORT,
DBF_LONG,
DBF_ULONG,
DBF_INT64,
DBF_UINT64,
DBF_FLOAT,
DBF_DOUBLE,
DBF_ENUM,
DBF_MENU,
DBF_DEVICE,
DBF_INLINK,
DBF_OUTLINK,
DBF_FWDLINK,
DBF_NOACCESS
}dbfType;
#define ASSERT_FALSE(expr, msg) \
if(bool(expr)) { \
std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \
text = text + "Reason: " + std::to_string(expr) + "\n"; \
text = text + "Message: " + msg + "\nErrno: " + std::to_string(errno); \
throw std::runtime_error(text); \
}
#define ASSERT_TRUE(expr, msg) \
if(!bool(expr)) { \
std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \
text = text + "Reason: " + std::to_string(expr) + "\n"; \
text = text + "Message: " + msg + "\nErrno: " + std::to_string(errno); \
throw std::runtime_error(text); \
}
/** ZMQ Publisher
Lightweight wrapper base class that initializes a ZMQ publisher.
Nothing data-specific, but all members are only 'protected'.
It also has an internal mutex that can be used for thread-safe
access to the underlying connection.
**/
class ZmqPublisher {
protected:
const uint16_t m_port;
std::string m_address;
zmq::context_t m_ctx;
zmq::socket_t m_socket;
std::mutex g_zmq_socket;
public:
ZmqPublisher(std::string ip, uint16_t port, uint32_t n_threads) :
m_port(port), m_address("tcp://" + ip + ":" + std::to_string(port)), m_ctx(n_threads), m_socket(m_ctx, ZMQ_PUB) {
// Bind the socket
m_socket.bind(m_address.c_str());
std::cout << "Initialized ZMQ publisher at " << m_address << std::endl;
};
~ZmqPublisher(){};
};
/** ZMQ Image Publisher
Specialized publisher to send 'ImageBinaryFormat' data format as
multipart message. It also takes care of thread safety.
NOTE: This method implements a single publisher! The receiver should take care of load balancing and redistributing.
**/
class ZmqImagePublisher: public ZmqPublisher {
public:
ZmqImagePublisher(std::string ip, uint16_t port, uint32_t n_threads) : ZmqPublisher(ip, port, n_threads) {};
const std::string topic = "IMAGEDATA";
std::string serializer(const ImageMetadata& meta){
rapidjson::Document header(rapidjson::kObjectType);
auto& header_alloc = header.GetAllocator();
// Fill the RapidJSON header with metadata
header.AddMember("version", meta.version, header_alloc);
header.AddMember("id", meta.id, header_alloc);
header.AddMember("height", meta.height, header_alloc);
header.AddMember("width", meta.width, header_alloc);
header.AddMember("dtype", meta.dtype, header_alloc);
header.AddMember("encoding", meta.encoding, header_alloc);
header.AddMember("array_id", meta.array_id, header_alloc);
header.AddMember("status", meta.status, header_alloc);
header.AddMember("user_1", meta.user_1, header_alloc);
header.AddMember("user_2", meta.user_2, header_alloc);
// Set image shape
auto shape_value = rapidjson::Value(rapidjson::kArrayType);
shape_value.PushBack((uint64_t)meta.height, header_alloc);
shape_value.PushBack((uint64_t)meta.width, header_alloc);
header.AddMember("shape", shape_value, header_alloc);
// Serialize header
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
header.Accept(writer);
std::string text_header = buffer.GetString();
return text_header;
}
void sendImage(ImageBinaryFormat& image){
auto meta_str = serializer(image.meta);
std::lock_guard<std::mutex> guard(g_zmq_socket);
int len;
len = m_socket.send(topic.c_str(), topic.size(), ZMQ_SNDMORE);
ASSERT_TRUE( len >=0, "Failed to send topic data" )
len = m_socket.send(meta_str.c_str(), meta_str.length(), ZMQ_SNDMORE);
ASSERT_TRUE( len >=0, "Failed to send meta data" )
len = m_socket.send(image.data.data(), image.data.size(), 0);
ASSERT_TRUE( len >=0, "Failed to send image data" )
if(image.meta.id%100==0){
std::cout << "Sent ZMQ stream of pulse: " << image.meta.id << std::endl;
}
}
};
#endif //SF_DAQ_BUFFER_ZMQ_IMAGE_PUBLISHER_HPP
+68
View File
@@ -0,0 +1,68 @@
#include <iostream>
#include <sstream>
#include "FrameStats.hpp"
using namespace std;
using namespace chrono;
FrameStats::FrameStats(
const std::string &detector_name,
const int module_id,
const size_t stats_time) :
detector_name_(detector_name),
module_id_(module_id),
stats_time_(stats_time)
{
reset_counters();
}
void FrameStats::reset_counters(){
frames_counter_ = 0;
n_missed_packets_ = 0;
n_corrupted_frames_ = 0;
n_corrupted_pulse_id_ = 0;
stats_interval_start_ = steady_clock::now();
}
void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id){
if (bad_pulse_id) {
n_corrupted_pulse_id_++;
}
if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) {
n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets;
n_corrupted_frames_++;
}
frames_counter_++;
auto time_passed = duration_cast<milliseconds>(steady_clock::now()-stats_interval_start_).count();
if (time_passed >= stats_time_*1000) {
print_stats();
reset_counters();
}
}
void FrameStats::print_stats(){
auto interval_ms_duration = duration_cast<milliseconds>(steady_clock::now()-stats_interval_start_).count();
// * 1000 because milliseconds, + 250 because of truncation.
int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration;
uint64_t timestamp = time_point_cast<nanoseconds>(system_clock::now()).time_since_epoch().count();
// Output in InfluxDB line protocol
stringstream ss;
ss << "jf_udp_recv";
ss << ",detector_name=" << detector_name_;
ss << ",module_name=M" << module_id_;
ss << " ";
ss << "n_missed_packets=" << n_missed_packets_ << "i";
ss << ",n_corrupted_frames=" << n_corrupted_frames_ << "i";
ss << ",repetition_rate=" << rep_rate << "i";
ss << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i";
ss << " ";
ss << timestamp;
ss << std::endl;
std::cout << ss.str();
}
+107
View File
@@ -0,0 +1,107 @@
#include <sstream>
#include "FrameWorker.hpp"
JfjFrameWorker::JfjFrameWorker(const uint16_t port, std::string moduleName, const uint32_t moduleID, std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> callback):
m_moduleName(moduleName), m_moduleID(moduleID), m_moduleStats(moduleName, moduleID, 10.0), f_push_callback(callback) {
m_udp_receiver.bind(port);
}
JfjFrameWorker::~JfjFrameWorker() {
m_udp_receiver.disconnect();
}
/** Process Packets
Drains the buffer either until it's empty or the current frame is finished.
Has some optimizations and safety checks before segfaulting right away...
TODO: Direct memcopy into FrameCache for more speed! **/
inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){
while(!m_buffer.is_empty()){
// Happens if the last packet from the previous frame gets lost.
if (m_current_index != m_buffer.peek_front().bunchid) {
m_current_index = m_buffer.peek_front().bunchid;
if(this->in_progress){
this->in_progress = false;
return buffer.meta.pulse_id;
}
}
// Otherwise pop the queue (and set current frame index)
jfjoch_packet_t& c_packet = m_buffer.pop_front();
// Sanity check: rather throw than segfault...
if(c_packet.packetnum >= JF_N_PACKETS_PER_FRAME) {
std::stringstream ss;
ss << "Packet index '" << c_packet.packetnum << "' is out of range of " << JF_N_PACKETS_PER_FRAME << std::endl;
throw std::range_error(ss.str());
}
// Start new frame
if(!this->in_progress) {
m_current_index = c_packet.bunchid;
this->in_progress = true;
// Always copy metadata (otherwise problem when 0th packet gets lost)
buffer.meta.pulse_id = c_packet.bunchid;
buffer.meta.frame_index = c_packet.framenum;
buffer.meta.daq_rec = c_packet.debug;
buffer.meta.module_id = m_moduleID;
}
// Copy data to frame buffer
size_t offset = JUNGFRAU_DATA_BYTES_PER_PACKET * c_packet.packetnum;
std::memcpy( (void*) (buffer.data + offset), c_packet.data, JUNGFRAU_DATA_BYTES_PER_PACKET);
buffer.meta.n_recv_packets++;
// Last frame packet received. Frame finished.
if (c_packet.packetnum == JF_N_PACKETS_PER_FRAME - 1) {
this->in_progress = false;
return buffer.meta.pulse_id;
}
}
// We emptied the buffer.
return 0;
}
uint64_t JfjFrameWorker::get_frame(BufferBinaryFormat& buffer){
// Reset the metadata and frame buffer for the next frame
std::memset(&buffer, 0, sizeof(buffer));
uint64_t pulse_id = 0;
// Loop until process_packets() returns a completed frame
do {
// First make sure the buffer is drained of leftovers
pulse_id = process_packets(buffer);
if (pulse_id != 0) { return pulse_id; }
// Then try to refill buffer...
m_buffer.fill_from(m_udp_receiver);
} while (true);
}
void JfjFrameWorker::run(){
std::cout << "Running worker loop" << std::endl;
// Might be better creating a structure for double buffering
BufferBinaryFormat buffer;
try{
while (true) {
// NOTE: Needs to be pipelined for really high frame rates
auto pulse_id = get_frame(buffer);
m_moduleStats.record_stats(buffer.meta, true); // TODO: bad_pulse_id is hard-coded here
if(pulse_id>10) {
f_push_callback(pulse_id, m_moduleID, buffer);
}
}
} catch (const std::exception& ex) {
std::cout << "Exception in worker loop: " << ex.what() << std::endl;
throw;
};
}
+66
View File
@@ -0,0 +1,66 @@
#include <netinet/in.h>
#include <iostream>
#include <unistd.h>
#include <cstring>
#include "PacketUdpReceiver.hpp"
#include "../../core-buffer/include/buffer_config.hpp"
using namespace std;
using namespace buffer_config;
PacketUdpReceiver::PacketUdpReceiver() : socket_fd_(-1) { }
PacketUdpReceiver::~PacketUdpReceiver() {
disconnect();
}
void PacketUdpReceiver::bind(const uint16_t port){
if (socket_fd_ > -1) {
throw runtime_error("Socket already bound.");
}
socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
if (socket_fd_ < 0) {
throw runtime_error("Cannot open socket.");
}
sockaddr_in server_address = {0};
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = INADDR_ANY;
server_address.sin_port = htons(port);
timeval udp_socket_timeout;
udp_socket_timeout.tv_sec = 0;
udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT;
if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, &udp_socket_timeout, sizeof(timeval)) == -1) {
throw runtime_error("Cannot set SO_RCVTIMEO. " + string(strerror(errno)));
}
if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) {
throw runtime_error("Cannot set SO_RCVBUF. " + string(strerror(errno)));
};
//TODO: try to set SO_RCVLOWAT
auto bind_result = ::bind(socket_fd_, reinterpret_cast<const sockaddr *>(&server_address), sizeof(server_address));
if (bind_result < 0) {
throw runtime_error("Cannot bind socket.");
}
}
int PacketUdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs){
return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0);
}
bool PacketUdpReceiver::receive(void* buffer, const size_t buffer_n_bytes){
auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0);
return data_len == (ssize_t)buffer_n_bytes;
}
void PacketUdpReceiver::disconnect(){
if (socket_fd_ > -1) { close(socket_fd_); }
socket_fd_ = -1;
}
+60
View File
@@ -0,0 +1,60 @@
#include <iostream>
#include <thread>
#include <stdexcept>
#include <zmq.h>
#include "BufferUtils.hpp"
#include "formats.hpp"
#include "../include/FrameCache.hpp"
#include "../include/FrameWorker.hpp"
#include "../include/ZmqImagePublisher.hpp"
int main (int argc, char *argv[]) {
if (argc != 2) {
std::cout << "\nUsage: jfj_combined [detector_json_filename]\n";
std::cout << "\tdetector_json_filename: detector config file path." << std::endl;
std::cout << "\tZMQ publisher port: 5200 (high data rate)" << std::endl;
exit(-1);
}
const auto config = BufferUtils::read_json_config(std::string(argv[1]));
std::cout << "Creating ZMQ sockets..." << std::endl;
ZmqImagePublisher pub("*", 5200, 2);
// ... and extracting sender function
std::function<void(ImageBinaryFormat&)> zmq_publish =
std::bind(&ZmqImagePublisher::sendImage, &pub, std::placeholders::_1);
std::cout << "Creating frame cache..." << std::endl;
FrameCache cache(128, 1, 3, zmq_publish);
// ... and extracting push function
std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> push_cb =
std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
std::cout << "Creating frame workers..." << std::endl;
std::vector<std::shared_ptr<JfjFrameWorker>> vWorkers;
for(int mm=0; mm<config.n_modules; mm++){
// Module name (not really used...)
char m_name[128];
snprintf(m_name, 128, "M%02d", mm);
std::string moduleName(m_name);
vWorkers.emplace_back( std::make_shared<JfjFrameWorker>(config.start_udp_port+mm, moduleName, mm, push_cb) );
}
std::cout << "Starting frame worker threads..." << std::endl;
std::vector<std::thread> vThreads;
for(int mm=0; mm<config.n_modules; mm++){
vThreads.push_back( std::thread(&JfjFrameWorker::run, vWorkers[mm].get()) );
}
for(auto& it: vThreads){
it.join();
}
std::cout << "Exiting program..." << std::endl;
return 0;
}
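The `std::bind` expressions in `main` above pin a member function to an object and forward the remaining arguments via placeholders. Since C++11 the same wiring is usually written as a capturing lambda, which is easier to read and inline. A self-contained illustration (`Publisher` here is a hypothetical stand-in, not the real `ZmqImagePublisher`):

```cpp
#include <functional>
#include <string>

// Stand-in for ZmqImagePublisher; only the callback wiring is the point.
struct Publisher {
    std::string last;
    void sendImage(const std::string& img) { last = img; }
};

std::string demo() {
    Publisher pub;

    // std::bind form, as in main.cpp:
    std::function<void(const std::string&)> via_bind =
        std::bind(&Publisher::sendImage, &pub, std::placeholders::_1);

    // Equivalent lambda form:
    std::function<void(const std::string&)> via_lambda =
        [&pub](const std::string& img) { pub.sendImage(img); };

    via_bind("from_bind");
    via_lambda("from_lambda");
    return pub.last;  // last callback invoked wins
}
```

Both forms produce an identical `std::function`; the lambda avoids the placeholder machinery and inlines better.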
@@ -0,0 +1,8 @@
add_executable(jfj-combined-tests main.cpp)
target_link_libraries(jfj-combined-tests
core-buffer-lib
jfj-combined-lib
gtest
)
@@ -0,0 +1,11 @@
#include "gtest/gtest.h"
#include "test_PacketUdpReceiver.cpp"
#include "test_FrameUdpReceiver.cpp"
#include "test_PacketBuffer.cpp"
using namespace std;
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
@@ -0,0 +1,13 @@
{
"streamvis_stream": "127.0.0.1",
"streamvis_rate": 2,
"live_stream": "127.0.0.1",
"live_rate": 2,
"pedestal_file": "/dev/null",
"gain_file": "/dev/null",
"detector_name": "JF",
"n_modules": 3,
"start_udp_port": 5100,
"buffer_folder": "."
}
@@ -0,0 +1,16 @@
#ifndef MOCK_UDP_H
#define MOCK_UDP_H
#include <cstdint>
#include <netinet/in.h>
const int MOCK_UDP_PORT(13000);
sockaddr_in get_server_address(uint16_t udp_port)
{
sockaddr_in server_address = {0};
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = INADDR_ANY;
server_address.sin_port = htons(udp_port);
return server_address;
}
#endif
@@ -0,0 +1,199 @@
#include <netinet/in.h>
#include <jungfraujoch.hpp>
#include "gtest/gtest.h"
#include "JfjFrameUdpReceiver.hpp"
#include "mock/udp.hpp"
#include <thread>
#include <chrono>
#include <future>
using namespace std;
#define NUM_TEST_MODULES 3
TEST(BufferUdpReceiver, simple_recv){
int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE;
int n_frames = 3;
uint16_t udp_port = MOCK_UDP_PORT;
auto server_address = get_server_address(udp_port);
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES);
auto handle = async(launch::async, [&](){
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
jfjoch_packet_t send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
send_udp_buffer.bunchid = i_frame + 1;
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(send_socket_fd, &send_udp_buffer, JFJOCH_BYTES_PER_PACKET,
0, (sockaddr*) &server_address, sizeof(server_address));
}
}
});
handle.wait();
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(NUM_TEST_MODULES*JFJOCH_DATA_BYTES_PER_MODULE);
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
ASSERT_EQ(metadata.n_recv_packets, n_packets);
}
::close(send_socket_fd);
}
TEST(BufferUdpReceiver, missing_middle_packet){
int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE;
int n_frames = 3;
uint16_t udp_port = MOCK_UDP_PORT;
auto server_address = get_server_address(udp_port);
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES);
auto handle = async(launch::async, [&](){
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// Skip some random middle packet.
if (i_packet == 10) { continue; }
jfjoch_packet_t send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
send_udp_buffer.bunchid = i_frame + 1;
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(send_socket_fd, &send_udp_buffer, JFJOCH_BYTES_PER_PACKET,
0, (sockaddr*) &server_address, sizeof(server_address));
}
}
});
handle.wait();
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE);
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(
metadata, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
}
::close(send_socket_fd);
}
TEST(BufferUdpReceiver, missing_first_packet){
auto n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE;
int n_frames = 3;
uint16_t udp_port = MOCK_UDP_PORT;
auto server_address = get_server_address(udp_port);
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES);
auto handle = async(launch::async, [&](){
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// Skip first packet.
if (i_packet == 0) {continue;}
jfjoch_packet_t send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
send_udp_buffer.bunchid = i_frame + 1;
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(send_socket_fd, &send_udp_buffer, JFJOCH_BYTES_PER_PACKET,
0, (sockaddr*) &server_address, sizeof(server_address));
}
}
});
handle.wait();
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE);
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
}
::close(send_socket_fd);
}
TEST(BufferUdpReceiver, missing_last_packet){
int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE;
int n_frames = 4;
uint16_t udp_port = MOCK_UDP_PORT;
auto server_address = get_server_address(udp_port);
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES);
auto handle = async(launch::async, [&](){
for (int64_t i_frame=0; i_frame < n_frames+1; i_frame++){
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
// Skip the last packet.
if (i_packet == n_packets-1) {continue;}
jfjoch_packet_t send_udp_buffer;
send_udp_buffer.packetnum = i_packet;
send_udp_buffer.bunchid = i_frame + 1;
send_udp_buffer.framenum = i_frame + 1000;
send_udp_buffer.debug = i_frame + 10000;
::sendto(send_socket_fd, &send_udp_buffer, JFJOCH_BYTES_PER_PACKET,
0, (sockaddr*) &server_address, sizeof(server_address));
}
}
});
handle.wait();
ModuleFrame metadata;
auto frame_buffer = make_unique<char[]>(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE);
// n_frames -1 because the last frame is not complete.
for (int i_frame=0; i_frame < n_frames - 1; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
}
::close(send_socket_fd);
}
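What these tests pin down is the assembly bookkeeping: packets are grouped by `bunchid`, the receiver counts how many packets arrived per frame, and a frame is handed on (complete or not, per the `n_recv_packets` assertions) once packets for a newer `bunchid` start arriving. A minimal sketch of that logic under those assumptions; `FrameAssembler` and its names are illustrative, not the real `JfjFrameUdpReceiver` API:

```cpp
#include <cassert>
#include <cstdint>

// Per-frame bookkeeping, mirroring the fields the tests assert on.
struct FrameMeta {
    uint64_t pulse_id = 0;        // bunchid; 0 used as "no frame yet" sentinel
    uint64_t n_recv_packets = 0;
};

class FrameAssembler {
public:
    // Feed one packet's bunchid. When a packet for a newer bunchid arrives,
    // the previous frame is flushed (possibly incomplete) and ok is set true;
    // while still accumulating, ok stays false.
    FrameMeta on_packet(uint64_t bunchid, bool& ok) {
        ok = false;
        FrameMeta done;
        if (m_current.pulse_id != 0 && bunchid != m_current.pulse_id) {
            done = m_current;          // flush finished (maybe incomplete) frame
            ok = true;
            m_current = FrameMeta{};
        }
        m_current.pulse_id = bunchid;
        m_current.n_recv_packets++;
        return done;
    }
private:
    FrameMeta m_current;
};
```

This is also why the `missing_last_packet` test must send extra frames: a frame missing its final packet is only flushed when the next frame's packets arrive.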
@@ -0,0 +1,76 @@
#include <netinet/in.h>
#include <jungfraujoch.hpp>
#include "gtest/gtest.h"
#include "PacketBuffer.hpp"
#include <thread>
#include <chrono>
#include <future>
using namespace std;
std::ostream &operator<<(std::ostream &os, jfjoch_packet_t const &packet) {
os << "Frame number: " << packet.framenum << std::endl;
os << "Packet number: " << packet.packetnum << std::endl;
os << "Bunch id: " << packet.bunchid << std::endl;
os << std::endl;
return os;
}
class MockReceiver{
public:
uint64_t idx_packet = 42000;
uint64_t packet_per_frame = 512;
uint64_t num_bunches = 100;
uint64_t num_packets = 50;
jfjoch_packet_t tmp;
uint64_t receive_many(mmsghdr* msgs, const size_t n_msgs){
// Receive 'num_packets' packets
for(uint64_t ii=0; ii<num_packets; ii++){
tmp.framenum = idx_packet / packet_per_frame;
tmp.bunchid = 1000 + idx_packet / packet_per_frame;
tmp.packetnum = idx_packet % packet_per_frame;
memcpy( msgs[ii].msg_hdr.msg_iov->iov_base, &tmp, sizeof(tmp));
idx_packet++;
}
return num_packets;
};
};
TEST(PacketBuffer, packetbuffer_simple){
PacketBuffer<jfjoch_packet_t, 128> p_buffer;
MockReceiver mockery;
uint64_t prev_bunch, prev_packet;
jfjoch_packet_t p_pop;
mockery.idx_packet = 7*512 + 13;
mockery.num_packets = 25;
p_buffer.fill_from(mockery);
// First packet
ASSERT_FALSE(p_buffer.is_empty());
ASSERT_EQ(p_buffer.size(), 25);
ASSERT_EQ(p_buffer.peek_front().bunchid, 1007);
ASSERT_EQ(p_buffer.peek_front().packetnum, 13);
prev_bunch = p_buffer.peek_front().bunchid;
prev_packet = p_buffer.peek_front().packetnum;
p_pop = p_buffer.pop_front();
ASSERT_EQ(p_buffer.size(), 24);
ASSERT_EQ(p_pop.bunchid, prev_bunch);
ASSERT_EQ(p_pop.packetnum, prev_packet);
ASSERT_EQ(p_buffer.peek_front().bunchid, prev_bunch);
ASSERT_EQ(p_buffer.peek_front().packetnum, prev_packet+1);
};
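`PacketBuffer` itself is not part of this chunk, but the test above fixes its contract: a FIFO ring buffer whose `fill_from` is templated on the receiver type, so any object with a `receive_many(mmsghdr*, size_t)` method works. That duck typing is exactly what lets `MockReceiver` stand in for the real UDP receiver. A sketch under those assumptions (wrap-around handling deliberately simplified):

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <sys/socket.h>   // mmsghdr, iovec (Linux; g++ defines _GNU_SOURCE)

template <typename PacketT, size_t N>
class PacketBuffer {
public:
    bool is_empty() const { return m_size == 0; }
    size_t size() const { return m_size; }
    const PacketT& peek_front() const { return m_data[m_head]; }

    PacketT pop_front() {
        PacketT p = m_data[m_head];
        m_head = (m_head + 1) % N;
        m_size--;
        return p;
    }

    // Duck-typed: any Receiver with receive_many(mmsghdr*, size_t) works,
    // which is how the test swaps in MockReceiver for PacketUdpReceiver.
    template <typename Receiver>
    void fill_from(Receiver& receiver) {
        iovec iov[N];
        mmsghdr msgs[N];
        size_t tail = (m_head + m_size) % N;
        size_t space = N - m_size;
        // Point each message at a free slot so packets land in place.
        for (size_t i = 0; i < space; i++) {
            iov[i].iov_base = &m_data[(tail + i) % N];
            iov[i].iov_len = sizeof(PacketT);
            msgs[i].msg_hdr = {};
            msgs[i].msg_hdr.msg_iov = &iov[i];
            msgs[i].msg_hdr.msg_iovlen = 1;
        }
        auto n = receiver.receive_many(msgs, space);
        if (n > 0) m_size += static_cast<size_t>(n);
    }

private:
    PacketT m_data[N];
    size_t m_head = 0;
    size_t m_size = 0;
};
```

Filling into the slots directly via `iovec` avoids a per-packet copy after `recvmmsg`, which is the point of batching in the first place.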
@@ -0,0 +1,170 @@
#include <netinet/in.h>
#include <jungfrau.hpp>
#include "gtest/gtest.h"
#include "mock/udp.hpp"
#include "PacketUdpReceiver.hpp"
#include <thread>
#include <chrono>
using namespace std;
TEST(PacketUdpReceiver, simple_recv)
{
uint16_t udp_port = MOCK_UDP_PORT;
auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0);
ASSERT_TRUE(send_socket_fd >= 0);
PacketUdpReceiver udp_receiver;
udp_receiver.bind(udp_port);
jungfrau_packet send_udp_buffer;
send_udp_buffer.packetnum = 91;
send_udp_buffer.framenum = 92;
send_udp_buffer.bunchid = 93;
send_udp_buffer.debug = 94;
auto server_address = get_server_address(udp_port);
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
this_thread::sleep_for(chrono::milliseconds(100));
jungfrau_packet recv_udp_buffer;
ASSERT_TRUE(udp_receiver.receive(
&recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET));
EXPECT_EQ(send_udp_buffer.packetnum, recv_udp_buffer.packetnum);
EXPECT_EQ(send_udp_buffer.framenum, recv_udp_buffer.framenum);
EXPECT_EQ(send_udp_buffer.bunchid, recv_udp_buffer.bunchid);
EXPECT_EQ(send_udp_buffer.debug, recv_udp_buffer.debug);
ASSERT_FALSE(udp_receiver.receive(
&recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET));
udp_receiver.disconnect();
::close(send_socket_fd);
}
TEST(PacketUdpReceiver, false_recv)
{
uint16_t udp_port = MOCK_UDP_PORT;
auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0);
ASSERT_TRUE(send_socket_fd >= 0);
PacketUdpReceiver udp_receiver;
udp_receiver.bind(udp_port);
jungfrau_packet send_udp_buffer;
jungfrau_packet recv_udp_buffer;
auto server_address = get_server_address(udp_port);
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET-1,
0,
(sockaddr*) &server_address,
sizeof(server_address));
ASSERT_FALSE(udp_receiver.receive(
&recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET));
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
ASSERT_TRUE(udp_receiver.receive(
&recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET));
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET-1,
0,
(sockaddr*) &server_address,
sizeof(server_address));
ASSERT_TRUE(udp_receiver.receive(
&recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET-1));
udp_receiver.disconnect();
::close(send_socket_fd);
}
TEST(PacketUdpReceiver, receive_many)
{
constexpr int n_msg_buffer = JF_N_PACKETS_PER_FRAME;
jungfrau_packet recv_buffer[n_msg_buffer];
iovec recv_buff_ptr[n_msg_buffer];
struct mmsghdr msgs[n_msg_buffer];
struct sockaddr_in sockFrom[n_msg_buffer];
for (int i = 0; i < n_msg_buffer; i++) {
recv_buff_ptr[i].iov_base = (void*) &(recv_buffer[i]);
recv_buff_ptr[i].iov_len = sizeof(jungfrau_packet);
msgs[i].msg_hdr.msg_iov = &recv_buff_ptr[i];
msgs[i].msg_hdr.msg_iovlen = 1;
msgs[i].msg_hdr.msg_name = &sockFrom[i];
msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in);
}
uint16_t udp_port = MOCK_UDP_PORT;
auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0);
ASSERT_TRUE(send_socket_fd >= 0);
PacketUdpReceiver udp_receiver;
udp_receiver.bind(udp_port);
jungfrau_packet send_udp_buffer;
auto server_address = get_server_address(udp_port);
send_udp_buffer.bunchid = 0;
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
send_udp_buffer.bunchid = 1;
::sendto(
send_socket_fd,
&send_udp_buffer,
JUNGFRAU_BYTES_PER_PACKET,
0,
(sockaddr*) &server_address,
sizeof(server_address));
this_thread::sleep_for(chrono::milliseconds(10));
auto n_msgs = udp_receiver.receive_many(msgs, JF_N_PACKETS_PER_FRAME);
ASSERT_EQ(n_msgs, 2);
for (int i=0; i<n_msgs; i++) {
ASSERT_EQ(msgs[i].msg_len, JUNGFRAU_BYTES_PER_PACKET);
ASSERT_EQ(recv_buffer[i].bunchid, i);
}
n_msgs = udp_receiver.receive_many(msgs, JF_N_PACKETS_PER_FRAME);
ASSERT_EQ(n_msgs, -1);
udp_receiver.disconnect();
::close(send_socket_fd);
}
@@ -0,0 +1,51 @@
#include "gtest/gtest.h"
#include "Watchdog.hpp"
#include <thread>
#include <chrono>
#include <future>
using namespace std;
uint64_t tick_counter = 0;
// Dummy callback to increase tick counter
void mock_callback(){
tick_counter++;
}
TEST(WatchdogTimer, timer_test){
Watchdog wDog(100, &mock_callback);
// Free running
wDog.Start();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
ASSERT_EQ(tick_counter, 0);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
ASSERT_EQ(tick_counter, 5);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
ASSERT_EQ(tick_counter, 10);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
ASSERT_EQ(tick_counter, 15);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
ASSERT_EQ(tick_counter, 20);
// Test Stop()
wDog.Stop();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
ASSERT_EQ(tick_counter, 20);
// Test Kick()
tick_counter = 0;
wDog.Start();
std::this_thread::sleep_for(std::chrono::milliseconds(250));
ASSERT_EQ(tick_counter, 2);
for(int ii=0; ii<20; ii++){
wDog.Kick();
ASSERT_EQ(tick_counter, 2);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
std::this_thread::sleep_for(std::chrono::milliseconds(150));
ASSERT_EQ(tick_counter, 4);
}
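The `Watchdog` class itself is not in this chunk. A minimal sketch consistent with the `Start`/`Stop`/`Kick` interface the test exercises might look like the following; the member names, the reset-on-`Kick` semantics, and the condition-variable approach are assumptions, not the real implementation:

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>

// Fires 'callback' every 'interval_ms' unless Kick() restarts the countdown.
class Watchdog {
public:
    Watchdog(uint64_t interval_ms, std::function<void()> callback)
        : m_interval(interval_ms), m_callback(std::move(callback)) {}
    ~Watchdog() { Stop(); }

    void Start() {
        Stop();
        m_running = true;
        m_thread = std::thread([this]() {
            std::unique_lock<std::mutex> lock(m_mutex);
            while (m_running) {
                // A Kick() (or Stop()) wakes the wait early and restarts it;
                // only an undisturbed full interval fires the callback.
                if (m_cv.wait_for(lock, m_interval) == std::cv_status::timeout
                        && m_running) {
                    m_callback();
                }
            }
        });
    }

    void Kick() { m_cv.notify_all(); }

    void Stop() {
        m_running = false;
        m_cv.notify_all();
        if (m_thread.joinable()) m_thread.join();
    }

private:
    std::chrono::milliseconds m_interval;
    std::function<void()> m_callback;
    std::atomic<bool> m_running{false};
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::thread m_thread;
};
```

Note that a sketch like this treats spurious wakeups the same as a `Kick` (both restart the countdown); a production version would track a deadline timestamp instead.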