This commit is contained in:
2021-06-30 14:29:09 +02:00
parent 395fff5da8
commit 99bbb270b7
7 changed files with 123 additions and 150 deletions
+32 -25
View File
@@ -1,24 +1,23 @@
#ifndef FRAME_CACHE_HPP
#define FRAME_CACHE_HPP
#ifndef SF_DAQ_FRAME_CACHE_HPP
#define SF_DAQ_FRAME_CACHE_HPP
#include <cstddef>
#include <cstring>
#include <stdexcept>
// #include <cstddef>
// #include <cstring>
// #include <stdexcept>
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <vector>
#include <atomic>
#include <functional>
#include <thread>
#include <mutex>
#include <shared_mutex>
// #include <atomic>
// #include <thread>
// #include <mutex>
#include "../../core-buffer/include/formats.hpp"
/** Frame cache
Reimplemented RamBuffer for better concurrency.
Reimplemented RamBuffer that handles concurrency internally via mutexes.
The class operates on in-memory arrays via pointer/reference access. It uses a
linearly increasing pulseID index to provide some headroom for collecting frames
from multiple detectors.
@@ -28,24 +27,26 @@ public:
FrameCache(uint64_t _C, uint64_t N_MOD, std::function<void(ImageBinaryFormat&)> callback):
m_CAP(_C), m_M(N_MOD),
m_buffer(_C, ImageBinaryFormat(512*N_MOD, 1024, sizeof(uint16_t))),
f_send(callback), m_lock(_C), m_valid(_C) {
f_send(callback), m_lock(_C), m_valid(_C, 0) {
// Initialize buffer metadata
for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); }
// Initialize Mutexes
for(auto& it: m_valid){ it = 0; }
//for(auto& it: m_valid){ it = 0; }
};
/** Emplace
Place a recorded frame to it's corresponding module location.
This simultaneously handles buffering and assembly. **/
This simultaneously handles buffering, assembly and flushing.
Also handles concurrency (shared and unique mutexes). **/
void emplace(uint64_t pulseID, uint64_t moduleIDX, BufferBinaryFormat& inc_frame){
uint64_t idx = pulseID % m_CAP;
// A new frame is starting
if(inc_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){
// Unique lock to flush and start a new one
std::unique_lock<std::shared_mutex> p_guard(m_lock[idx]);
// Check if condition persists after getting the mutex
if(inc_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){
@@ -56,34 +57,38 @@ public:
// Shared lock for concurrent PUT operations
std::shared_lock<std::shared_mutex> s_guard(m_lock[idx]);
// Calculate destination pointer (easier to debug)
// Calculate destination pointer and copy data
char* ptr_dest = m_buffer[idx].data.data() + moduleIDX * m_blocksize;
std::memcpy((void*)ptr_dest, (void*)&inc_frame.data, m_blocksize);
}
void flush_all(){
for(int64_t idx=0; idx< m_CAP; idx++){
std::unique_lock<std::shared_mutex> p_guard(m_lock[idx]);
flush_line(idx);
}
}
// Flush and invalidate a line (incl. lock)
/** Flush and invalidate a line
Flushes a valid cache line and invalidates the associated buffer.
NOTE : It does not lock, that must be done externally! **/
void flush_line(uint64_t idx){
std::unique_lock<std::shared_mutex> guard(m_lock[idx]);
if(m_valid[idx]){
f_send(m_buffer[idx]);
m_valid[idx] = 0;
}
}
// Flush and start a new line (incl. lock)
/** Flush and start a new line
Flushes a valid cache line and starts another one from the provided metadata.
NOTE : It does not lock, that must be done externally! **/
void start_line(uint64_t idx, ModuleFrame& inc_frame){
// 0. Guard
// 1. Flush
if(m_valid[idx]){
f_send(m_buffer[idx]);
}
// 2. Init
if(m_valid[idx]){ f_send(m_buffer[idx]); }
// 2. Init new frame
m_buffer[idx].meta.pulse_id = inc_frame.pulse_id;
m_buffer[idx].meta.frame_index = inc_frame.frame_index;
m_buffer[idx].meta.daq_rec = inc_frame.daq_rec;
@@ -95,12 +100,14 @@ private:
const uint64_t m_CAP;
const uint64_t m_M;
const uint64_t m_blocksize = 1024*512*sizeof(uint16_t);
/** Flush function **/
std::function<void(ImageBinaryFormat&)> f_send;
/** Main container and mutex guard **/
std::vector<uint32_t> m_valid;
std::vector<std::shared_mutex> m_lock;
std::vector<std::atomic<uint32_t>> m_valid;
std::vector<ImageBinaryFormat> m_buffer;
};
#endif // FRAME_CACHE_HPP
#endif // SF_DAQ_FRAME_CACHE_HPP
+2 -3
View File
@@ -7,6 +7,7 @@
class FrameStats {
private:
const std::string detector_name_;
const int module_id_;
size_t stats_time_;
@@ -21,9 +22,7 @@ class FrameStats {
void print_stats();
public:
FrameStats(const std::string &detector_name,
const int module_id,
const size_t stats_time);
FrameStats(const std::string &detector_name, const int module_id, const size_t stats_time);
void record_stats(const ModuleFrame &meta, const bool bad_pulse_id);
};
+12 -11
View File
@@ -9,32 +9,33 @@
/** JungfrauJoch UDP receiver
Wrapper class to capture frames from the UDP stream of the JungfrauJoch FPGA card.
Capture UDP data stream from Jungfrau(Joch) FPGA card.
NOTE: This design will not scale well for higher frame rates...
TODO: Direct copy into FrameCache buffer (saves a memcopy)
**/
class JfjFrameWorker {
std::string m_state = "INIT";
PacketUdpReceiver m_udp_receiver;
bool in_progress = false;
uint64_t m_frame_index = 0;
const std::string m_moduleName;
const uint64_t m_moduleID;
// UDP and statistics interfaces
FrameStats m_moduleStats;
PacketUdpReceiver m_udp_receiver;
// Buffer and helper structures
bool in_progress = false;
uint64_t m_current_index = 0;
PacketBuffer<jfjoch_packet_t, 64> m_buffer;
// Buffer processing
inline uint64_t process_packets(BufferBinaryFormat& buffer);
uint64_t get_frame(BufferBinaryFormat& buffer);
std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> f_push_callback;
public:
JfjFrameWorker(const uint16_t port, const uint32_t moduleID,
JfjFrameWorker(const uint16_t port, std::string moduleName, const uint32_t moduleID,
std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> callback);
virtual ~JfjFrameWorker();
std::string print() const;
void run();
};
std::ostream& operator<<(std::ostream& os, const JfjFrameWorker& worker);
#endif //SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP
+5 -6
View File
@@ -23,11 +23,11 @@ template <typename T, size_t CAPACITY>
class PacketBuffer{
public:
PacketBuffer() {
// Initialize C-structures as expected by <sockets.h>
for (int i = 0; i < CAPACITY; i++) {
m_recv_buff_ptr[i].iov_base = (void*) &(m_container[i]);
m_recv_buff_ptr[i].iov_len = sizeof(T);
// C-structure as expected by <sockets.h>
m_msgs[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i];
m_msgs[i].msg_hdr.msg_iovlen = 1;
m_msgs[i].msg_hdr.msg_name = &m_sock_from[i];
@@ -52,12 +52,11 @@ public:
const T& peek_front(); //Non-destructive read
void push_back(T item); //Write new element to buffer
/**Fill from UDP receiver**/
/**Fill from UDP receiver (threadsafe)**/
template <typename TY>
void fill_from(TY& recv){
std::lock_guard<std::mutex> g_guard(m_mutex);
this->idx_write = recv.receive_many(m_msgs, this->capacity());
// std::cout << "Received " << this->idx_write << " frames" << std::endl;
// Returns -1 with errno=11 if no data received
if(idx_write==-1){ idx_write = 0; }
this->idx_read = 0;
@@ -90,7 +89,7 @@ private:
template <typename T, size_t CAPACITY>
T& PacketBuffer<T, CAPACITY>::pop_front(){
std::lock_guard<std::mutex> g_guard(m_mutex);
if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); }
if(this->is_empty()) [[unlikely]] { throw std::out_of_range("Attempted to read empty queue!"); }
idx_read++;
return m_container[idx_read-1];
}
@@ -101,7 +100,7 @@ T& PacketBuffer<T, CAPACITY>::pop_front(){
template <typename T, size_t CAPACITY>
const T& PacketBuffer<T, CAPACITY>::peek_front(){
std::lock_guard<std::mutex> g_guard(m_mutex);
if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); }
if(this->is_empty()) [[unlikely]] { throw std::out_of_range("Attempted to read empty queue!"); }
return m_container[idx_read];
}
@@ -110,7 +109,7 @@ const T& PacketBuffer<T, CAPACITY>::peek_front(){
template <typename T, size_t CAPACITY>
void PacketBuffer<T, CAPACITY>::push_back(T item){
std::lock_guard<std::mutex> g_guard(m_mutex);
if(this->is_full()){ throw std::out_of_range("Attempted to write a full buffer!"); }
if(this->is_full()) [[unlikely]] { throw std::out_of_range("Attempted to write a full buffer!"); }
m_container[idx_write] = item;
idx_write++;
}
+10 -10
View File
@@ -7,7 +7,7 @@
#define ASSERT_FALSE(expr, msg) \
if(bool(expr)){ \
if(bool(expr)) [[unlikely]] { \
std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \
text = text + "Reason: " + std::to_string(expr) + "\n"; \
text = text + "Message:" + msg + "\nErrno: " + std::to_string(errno); \
@@ -15,18 +15,18 @@
} \
#define ASSERT_TRUE(expr, msg) \
if(!bool(expr)){ \
if(!bool(expr)) [[unlikely]] { \
std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \
text = text + "Reason: " + std::to_string(expr) + "\n"; \
text = text + "Message:" + msg + "\nErrno: " + std::to_string(errno); \
throw std::runtime_error(text); \
}
}
/** ZMQ Publisher
Lightweight wrapper base class to initialize a ZMQ Publisher.
Nothing data specific, but everything is only 'protected'.
It also has an internal mutex that can be used for threadsafe
It also has an internal mutex that can be used for threadsafe
access to the undelying connection;
**/
class ZmqPublisher {
@@ -36,22 +36,22 @@ class ZmqPublisher {
zmq::context_t m_ctx;
zmq::socket_t m_socket;
std::mutex g_zmq_socket;
public:
public:
ZmqPublisher(std::string ip, uint16_t port) :
m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(1), m_socket(m_ctx, ZMQ_PUB) {
// Bind the socket
m_socket.bind(m_address.c_str());
std::cout << "Initialized ZMQ publisher at " << m_address << std::endl;
};
~ZmqPublisher(){};
};
/** ZMQ Image Publisher
Specialized publisher to send 'ImageBinaryFormat' data format as
Specialized publisher to send 'ImageBinaryFormat' data format as
multipart message. It also takes care of thread safety.
**/
class ZmqImagePublisher: public ZmqPublisher {
@@ -62,9 +62,9 @@ class ZmqImagePublisher: public ZmqPublisher {
void sendImage(ImageBinaryFormat& image){
std::lock_guard<std::mutex> guard(g_zmq_socket);
int len;
len = m_socket.send(topic.c_str(), topic.size(), ZMQ_SNDMORE);
ASSERT_TRUE( len >=0, "Failed to send topic data" )
ASSERT_TRUE( len >=0, "Failed to send topic data" )
len = m_socket.send(&image.meta, sizeof(image.meta), ZMQ_SNDMORE);
ASSERT_TRUE( len >=0, "Failed to send meta data" )
// std::cout << "\tPT1 Sent " << len << "\n";
+21 -25
View File
@@ -1,4 +1,4 @@
#include <iostream>
#include <sstream>
#include "JfjFrameStats.hpp"
using namespace std;
@@ -15,8 +15,7 @@ FrameStats::FrameStats(
reset_counters();
}
void FrameStats::reset_counters()
{
void FrameStats::reset_counters(){
frames_counter_ = 0;
n_missed_packets_ = 0;
n_corrupted_frames_ = 0;
@@ -24,12 +23,11 @@ void FrameStats::reset_counters()
stats_interval_start_ = steady_clock::now();
}
void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id)
{
void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id){
if (bad_pulse_id) {
n_corrupted_pulse_id_++;
}
}
if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) {
n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets;
@@ -38,8 +36,7 @@ void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id)
frames_counter_++;
auto time_passed = duration_cast<milliseconds>(
steady_clock::now()-stats_interval_start_).count();
auto time_passed = duration_cast<milliseconds>(steady_clock::now()-stats_interval_start_).count();
if (time_passed >= stats_time_*1000) {
print_stats();
@@ -47,25 +44,24 @@ void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id)
}
}
void FrameStats::print_stats()
{
auto interval_ms_duration = duration_cast<milliseconds>(
steady_clock::now()-stats_interval_start_).count();
void FrameStats::print_stats(){
auto interval_ms_duration = duration_cast<milliseconds>(steady_clock::now()-stats_interval_start_).count();
// * 1000 because milliseconds, + 250 because of truncation.
int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration;
uint64_t timestamp = time_point_cast<nanoseconds>(
system_clock::now()).time_since_epoch().count();
uint64_t timestamp = time_point_cast<nanoseconds>(system_clock::now()).time_since_epoch().count();
// Output in InfluxDB line protocol
cout << "jf_udp_recv";
cout << ",detector_name=" << detector_name_;
cout << ",module_name=M" << module_id_;
cout << " ";
cout << "n_missed_packets=" << n_missed_packets_ << "i";
cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i";
cout << ",repetition_rate=" << rep_rate << "i";
cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i";
cout << " ";
cout << timestamp;
cout << endl;
stringstream ss;
ss << "jf_udp_recv";
ss << ",detector_name=" << detector_name_;
ss << ",module_name=M" << module_id_;
ss << " ";
ss << "n_missed_packets=" << n_missed_packets_ << "i";
ss << ",n_corrupted_frames=" << n_corrupted_frames_ << "i";
ss << ",repetition_rate=" << rep_rate << "i";
ss << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i";
ss << " ";
ss << timestamp;
ss << endl;
std::cout << ss.str();
}
+41 -70
View File
@@ -1,16 +1,11 @@
#include <cstring>
#include <sstream>
#include "JfjFrameWorker.hpp"
using namespace std;
using namespace buffer_config;
JfjFrameWorker::JfjFrameWorker(const uint16_t port, const uint32_t moduleID,
std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> callback):
m_moduleID(moduleID), f_push_callback(callback) {
JfjFrameWorker::JfjFrameWorker(const uint16_t port, std::string moduleName, const uint32_t moduleID, std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> callback):
m_moduleName(moduleName), m_moduleID(moduleID), m_moduleStats(modulename, moduleID, 10.0), f_push_callback(callback) {
m_udp_receiver.bind(port);
m_state = "ON";
}
@@ -18,14 +13,17 @@ JfjFrameWorker::~JfjFrameWorker() {
m_udp_receiver.disconnect();
}
/** Process Packets
Drains the buffer either until it's empty or the current frame is finished.
Has some optimizations and safety checks before segfaulting right away...
TODO: Direct memcopy into FrameCache for more speed! **/
inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){
//std::cout << " Called process_packets()" << std::endl;
while(!m_buffer.is_empty()){
// Happens if the last packet from the previous frame gets lost.
if (m_frame_index != m_buffer.peek_front().framenum) {
m_frame_index = m_buffer.peek_front().framenum;
if (m_current_index != m_buffer.peek_front().bunchid) [[unlikely]] {
m_current_index = m_buffer.peek_front().bunchid;
if(this->in_progress){
this->in_progress = false;
return buffer.meta.pulse_id;
@@ -34,30 +32,32 @@ inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){
// Otherwise pop the queue (and set current frame index)
jfjoch_packet_t& c_packet = m_buffer.pop_front();
m_frame_index = c_packet.framenum;
this->in_progress = true;
//std::cout << " ff: " << c_packet.framenum << std::endl;
//std::cout << " ex: " << c_packet.exptime << std::endl;
//std::cout << " pp: " << c_packet.packetnum << std::endl;
// Always copy metadata (otherwise problem when 0th packet gets lost)
buffer.meta.pulse_id = c_packet.bunchid;
buffer.meta.frame_index = c_packet.framenum;
buffer.meta.daq_rec = c_packet.debug;
buffer.meta.module_id = m_moduleID;
// Sanity check: rather throw than segfault...
if(c_packet.packetnum >= JF_N_PACKETS_PER_FRAME) [[unlikely]] {
std::stringstream ss << "Packet index '" << c_packet.packetnum "' is out of range of " << JF_N_PACKETS_PER_FRAME << std::endl;
throw std::range_error(ss.str());
}
// Start new frame
if(!this->in_progress) [[unlikely]] {
m_current_index = c_packet.bunchid;
this->in_progress = true;
// Always copy metadata (otherwise problem when 0th packet gets lost)
buffer.meta.pulse_id = c_packet.bunchid;
buffer.meta.frame_index = c_packet.framenum;
buffer.meta.daq_rec = c_packet.debug;
buffer.meta.module_id = m_moduleID;
}
// Copy data to frame buffer
if(c_packet.packetnum >= JF_N_PACKETS_PER_FRAME){
std::cout << "Too high packet index: " << c_packet.packetnum << std::endl;
return 0;
}
size_t offset = JUNGFRAU_DATA_BYTES_PER_PACKET * c_packet.packetnum;
memcpy( (void*) (buffer.data + offset), c_packet.data, JUNGFRAU_DATA_BYTES_PER_PACKET);
buffer.meta.n_recv_packets++;
// Last frame packet received. Frame finished.
if (c_packet.packetnum == JF_N_PACKETS_PER_FRAME - 1){
// std::cout << "Finished pulse: " << buffer.meta.pulse_id << std::endl;
if (c_packet.packetnum == JF_N_PACKETS_PER_FRAME - 1) [[unlikely]] {
this->in_progress = false;
return buffer.meta.pulse_id;
}
@@ -68,68 +68,39 @@ inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){
}
uint64_t JfjFrameWorker::get_frame(BufferBinaryFormat& buffer){
//std::cout << "Called get_frame()" << std::endl;
// std::cout << "Called get_frame()" << std::endl;
// Reset the metadata and frame buffer for the next frame. (really needed?)
// Reset the metadata and frame buffer for the next frame
memset(&buffer, 0, sizeof(buffer));
uint64_t pulse_id = 0;
// Process leftover packages in the buffer
if (!m_buffer.is_empty()) {
//std::cout << "Leftovers" << std::endl;
auto pulse_id = process_packets(buffer);
if (pulse_id != 0) { return pulse_id; }
}
// Hehehehe... do-while loop!
do {
// First make sure the buffer is drained of leftovers
pulse_id = process_packets(buffer);
if (pulse_id != 0) [[likely]] { return pulse_id; }
while (true) {
// Receive new packages (pass if none)...
//std::cout << "New packages" << std::endl;
// Then try to refill buffer...
m_buffer.fill_from(m_udp_receiver);
if (m_buffer.is_empty()) { continue; }
// std::cout << "\tGot " << m_buffer.size() << std::endl;
// ... and process them
auto pulse_id = process_packets(buffer);
if (pulse_id != 0) { return pulse_id; }
}
} while (true);
}
void JfjFrameWorker::run(){
std::cout << "Running worker loop" << std::endl;
// Might be better creating a structure for double buffering
BufferBinaryFormat buffer;
uint64_t pulse_id_previous = 0;
uint64_t frame_index_previous = 0;
try{
m_state = "RUNNING";
while (true) {
// NOTE: Needs to be pipelined for really high frame rates
auto pulse_id = get_frame(buffer);
m_stats.record_stats(buffer.meta, true);
if(pulse_id>10){
if(pulse_id>10) [[likely]] {
f_push_callback(pulse_id, m_moduleID, buffer);
}
}
} catch (const std::exception& ex) {
std::cout << "Exception in worker loop: " << ex.what() << std::endl;
m_state = "ERROR";
throw;
};
}
std::string JfjFrameWorker::print() const {
std::string msg = "JungfrauFrameWorker #" + std::to_string(m_moduleID) + "\n"+
"State:\t" + m_state + "\n";
return msg;
}
std::ostream& operator<<(std::ostream& os, const JfjFrameWorker& worker){
os << worker.print() << std::endl;
return os;
}