It compiles

This commit is contained in:
Mohacsi Istvan
2021-06-01 13:49:30 +02:00
parent 5be091eee8
commit 53abc8e867
5 changed files with 108 additions and 83 deletions
+8 -5
View File
@@ -2,12 +2,13 @@
#include <formats.hpp>
#include <chrono>
#ifndef SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP
#define SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP
#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP
#define SF_DAQ_BUFFER_FRAMESTATS_HPP
class FrameStats {
const std::string detector_name_;
const int module_id_;
size_t stats_time_;
int frames_counter_;
@@ -20,9 +21,11 @@ class FrameStats {
void print_stats();
public:
FrameStats(const std::string &detector_name, const size_t stats_time);
void record_stats(const ImageMetadata &meta, const bool bad_pulse_id);
FrameStats(const std::string &detector_name,
const int module_id,
const size_t stats_time);
void record_stats(const ModuleFrame &meta, const bool bad_pulse_id);
};
#endif //SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP
#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP
+3 -3
View File
@@ -19,13 +19,13 @@ class JfjFrameUdpReceiver {
PacketBuffer<jfjoch_packet_t, buffer_config::BUFFER_UDP_N_RECV_MSG> m_buffer;
inline void init_frame(ImageMetadata& frame_metadata, const jfjoch_packet_t& c_packet);
inline uint64_t process_packets(ImageMetadata& metadata, char* frame_buffer);
inline void init_frame(ModuleFrame& frame_metadata, const jfjoch_packet_t& c_packet);
inline uint64_t process_packets(ModuleFrame& metadata, char* frame_buffer);
public:
JfjFrameUdpReceiver(const uint16_t port);
virtual ~JfjFrameUdpReceiver();
uint64_t get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer);
uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer);
};
+71 -54
View File
@@ -1,54 +1,71 @@
#include <iostream>
#include "JfjFrameStats.hpp"
using namespace std;
using namespace chrono;
FrameStats::FrameStats(const std::string &detector_name, const size_t stats_time) :
detector_name_(detector_name), stats_time_(stats_time) {
reset_counters();
}
void FrameStats::reset_counters()
{
frames_counter_ = 0;
n_corrupted_frames_ = 0;
n_corrupted_pulse_id_ = 0;
stats_interval_start_ = steady_clock::now();
}
void FrameStats::record_stats(const ImageMetadata &meta, const bool bad_pulse_id)
{
if (bad_pulse_id) {
n_corrupted_pulse_id_++;
n_corrupted_frames_++;
}
frames_counter_++;
auto time_passed = duration_cast<milliseconds>(steady_clock::now()-stats_interval_start_).count();
if (time_passed >= stats_time_*1000) {
print_stats();
reset_counters();
}
}
void FrameStats::print_stats(){
auto interval_ms_duration = duration_cast<milliseconds>(steady_clock::now()-stats_interval_start_).count();
// * 1000 because milliseconds, + 250 because of truncation.
int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration;
uint64_t timestamp = time_point_cast<nanoseconds>(system_clock::now()).time_since_epoch().count();
// Output in InfluxDB line protocol
cout << "jfj_udp_recv";
cout << ",detector_name=" << detector_name_;
cout << " ";
cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i";
cout << ",repetition_rate=" << rep_rate << "i";
cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i";
cout << " ";
cout << timestamp;
cout << endl;
}
#include <iostream>
#include "JfjFrameStats.hpp"
using namespace std;
using namespace chrono;
FrameStats::FrameStats(
const std::string &detector_name,
const int module_id,
const size_t stats_time) :
detector_name_(detector_name),
module_id_(module_id),
stats_time_(stats_time)
{
reset_counters();
}
void FrameStats::reset_counters()
{
frames_counter_ = 0;
n_missed_packets_ = 0;
n_corrupted_frames_ = 0;
n_corrupted_pulse_id_ = 0;
stats_interval_start_ = steady_clock::now();
}
void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id)
{
if (bad_pulse_id) {
n_corrupted_pulse_id_++;
}
if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) {
n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets;
n_corrupted_frames_++;
}
frames_counter_++;
auto time_passed = duration_cast<milliseconds>(
steady_clock::now()-stats_interval_start_).count();
if (time_passed >= stats_time_*1000) {
print_stats();
reset_counters();
}
}
void FrameStats::print_stats()
{
auto interval_ms_duration = duration_cast<milliseconds>(
steady_clock::now()-stats_interval_start_).count();
// * 1000 because milliseconds, + 250 because of truncation.
int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration;
uint64_t timestamp = time_point_cast<nanoseconds>(
system_clock::now()).time_since_epoch().count();
// Output in InfluxDB line protocol
cout << "jf_udp_recv";
cout << ",detector_name=" << detector_name_;
cout << ",module_name=M" << module_id_;
cout << " ";
cout << "n_missed_packets=" << n_missed_packets_ << "i";
cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i";
cout << ",repetition_rate=" << rep_rate << "i";
cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i";
cout << " ";
cout << timestamp;
cout << endl;
}
+9 -8
View File
@@ -13,20 +13,19 @@ JfjFrameUdpReceiver::~JfjFrameUdpReceiver() {
m_udp_receiver.disconnect();
}
inline void JfjFrameUdpReceiver::init_frame(ImageMetadata& metadata, const jfjoch_packet_t& c_packet) {
metadata.pulse_id = c_packet.timestamp;
metadata.frame_index = c_packet.framenum;
metadata.daq_rec = (uint32_t) c_packet.debug;
metadata.is_good_image = (int32_t) true;
inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) {
metadata.pulse_id = c_packet.bunchid;
metadata.frame_index = c_packet.framenum;
metadata.daq_rec = (uint64_t) c_packet.debug;
metadata.module_id = (int64_t) 0;
}
inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, char* frame_buffer){
inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char* frame_buffer){
while(!m_buffer.is_empty()){
// Happens if the last packet from the previous frame gets lost.
if (m_frame_index != m_buffer.peek_front().framenum) {
m_frame_index = m_buffer.peek_front().framenum;
metadata.is_good_image = (int32_t) false;
return metadata.pulse_id;
}
@@ -40,6 +39,7 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch
// Copy data to frame buffer
size_t offset = JFJOCH_DATA_BYTES_PER_PACKET * c_packet.packetnum;
memcpy( (void*) (frame_buffer + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET);
metadata.n_recv_packets++;
// Last frame packet received. Frame finished.
if (c_packet.packetnum == JFJOCH_N_PACKETS_PER_FRAME - 1){
@@ -52,9 +52,10 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch
return 0;
}
uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){
uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){
// Reset the metadata and frame buffer for the next frame. (really needed?)
metadata.pulse_id = 0;
metadata.n_recv_packets = 0;
memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_PACKET);
// Process leftover packages in the buffer
+17 -13
View File
@@ -18,30 +18,29 @@ int main (int argc, char *argv[]) {
if (argc != 3) {
cout << endl;
cout << "Usage: jfj_udp_recv [detector_json_filename] [module_id]";
cout << "Usage: jfj_udp_recv [detector_json_filename]";
cout << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << "\tmodule_id: id of the module for this process." << endl;
cout << endl;
exit(-1);
}
const auto config = read_json_config(string(argv[1]));
const int module_id = atoi(argv[2]);
const auto udp_port = config.start_udp_port;
JfjFrameUdpReceiver receiver(udp_port);
RamBuffer buffer(config.detector_name, config.n_modules);
FrameStats stats(config.detector_name, STATS_TIME);
FrameStats stats(config.detector_name, 0, STATS_TIME);
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, ZMQ_IO_THREADS);
auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "jungfraujoch");
// Might be better creating a structure for double buffering
ImageMetadata metaBufferA;
char* dataBufferA = new char[JFJOCH_DATA_BYTES_PER_FRAME];
ModuleFrame frameMeta;
ImageMetadata imageMeta;
char* dataBuffer = new char[JFJOCH_DATA_BYTES_PER_FRAME];
uint64_t pulse_id_previous = 0;
uint64_t frame_index_previous = 0;
@@ -49,23 +48,28 @@ int main (int argc, char *argv[]) {
while (true) {
// NOTE: Needs to be pipelined for really high frame rates
auto pulse_id = receiver.get_frame_from_udp(metaBufferA, dataBufferA);
auto pulse_id = receiver.get_frame_from_udp(frameMeta, dataBuffer);
bool bad_pulse_id = false;
if ( ( metaBufferA.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) {
if ( ( frameMeta.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) {
bad_pulse_id = true;
} else {
buffer.write_frame(metaBufferA, dataBufferA);
zmq_send(sender, &metaBufferA, sizeof(metaBufferA), 0);
imageMeta.pulse_id = frameMeta.pulse_id;
imageMeta.frame_index = frameMeta.frame_index;
imageMeta.daq_rec = frameMeta.daq_rec;
imageMeta.is_good_image = true;
buffer.write_frame(frameMeta, dataBuffer);
zmq_send(sender, &imageMeta, sizeof(imageMeta), 0);
}
stats.record_stats(metaBufferA, bad_pulse_id);
stats.record_stats(frameMeta, bad_pulse_id);
pulse_id_previous = pulse_id;
frame_index_previous = metaBufferA.frame_index;
frame_index_previous = frameMeta.frame_index;
}
delete[] dataBufferA;
delete[] dataBuffer;
}