diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index 293308b..b2e68ca 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -20,8 +20,10 @@ namespace buffer_config { const size_t FOLDER_MOD = 100000; // Extension of our file format. const std::string FILE_EXTENSION = ".bin"; - // Number of pulses between each statistics print out. - const size_t STATS_MODULO = 100; + // Number of pulses between each statistics print out (buffer_writer, stream2vis...) + const size_t STATS_MODULO = 1000; + // Number of seconds after which statistics is print out (udp_recv) + const size_t STATS_TIME = 10; // If the RB is empty, how much time to wait before trying to read it again. const size_t RB_READ_RETRY_INTERVAL_MS = 5; // How many frames to read at once from file. diff --git a/jf-udp-recv/include/FrameStats.hpp b/jf-udp-recv/include/FrameStats.hpp index dd4ef95..7839a38 100644 --- a/jf-udp-recv/include/FrameStats.hpp +++ b/jf-udp-recv/include/FrameStats.hpp @@ -9,11 +9,12 @@ class FrameStats { const std::string detector_name_; const int module_id_; - size_t stats_modulo_; + size_t stats_time_; int frames_counter_; int n_missed_packets_; int n_corrupted_frames_; + int n_corrupted_pulse_id_; std::chrono::time_point stats_interval_start_; void reset_counters(); @@ -22,8 +23,8 @@ class FrameStats { public: FrameStats(const std::string &detector_name, const int module_id, - const size_t stats_modulo); - void record_stats(const ModuleFrame &meta); + const size_t stats_time); + void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); }; diff --git a/jf-udp-recv/src/FrameStats.cpp b/jf-udp-recv/src/FrameStats.cpp index 103e4fe..28161c7 100644 --- a/jf-udp-recv/src/FrameStats.cpp +++ b/jf-udp-recv/src/FrameStats.cpp @@ -7,10 +7,10 @@ using namespace chrono; FrameStats::FrameStats( const std::string &detector_name, const int module_id, - const size_t stats_modulo) : + const size_t stats_time) : detector_name_(detector_name), module_id_(module_id), - stats_modulo_(stats_modulo) + stats_time_(stats_time) { reset_counters(); } @@ -20,11 +20,17 @@ void FrameStats::reset_counters() frames_counter_ = 0; n_missed_packets_ = 0; n_corrupted_frames_ = 0; + n_corrupted_pulse_id_ = 0; stats_interval_start_ = steady_clock::now(); } -void FrameStats::record_stats(const ModuleFrame &meta) +void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) { + + if (bad_pulse_id) { + n_corrupted_pulse_id_++; + } + if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; n_corrupted_frames_++; @@ -32,7 +38,10 @@ void FrameStats::record_stats(const ModuleFrame &meta) frames_counter_++; - if (frames_counter_ == stats_modulo_) { + auto time_passed = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { print_stats(); reset_counters(); } @@ -55,6 +64,7 @@ void FrameStats::print_stats() cout << "n_missed_packets=" << n_missed_packets_ << "i"; cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; cout << ",repetition_rate=" << rep_rate << "i"; + cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; cout << " "; cout << timestamp; cout << endl; diff --git a/jf-udp-recv/src/FrameUdpReceiver.cpp b/jf-udp-recv/src/FrameUdpReceiver.cpp index 1c0833b..cb78a7a 100644 --- a/jf-udp-recv/src/FrameUdpReceiver.cpp +++ b/jf-udp-recv/src/FrameUdpReceiver.cpp @@ -63,8 +63,8 @@ inline uint64_t FrameUdpReceiver::process_packets( init_frame(metadata, i_packet); // Happens if the last packet from the previous frame gets lost. - // In the jungfrau_packet, pulse_id is called bunchid. - } else if (metadata.pulse_id != packet_buffer_[i_packet].bunchid) { + // In the jungfrau_packet, framenum is the trigger number (how many triggers from detector power-on) happened + } else if (metadata.frame_index != packet_buffer_[i_packet].framenum) { packet_buffer_loaded_ = true; // Continue on this packet. packet_buffer_offset_ = i_packet; @@ -134,4 +134,4 @@ uint64_t FrameUdpReceiver::get_frame_from_udp( return pulse_id; } } -} \ No newline at end of file +} diff --git a/jf-udp-recv/src/main.cpp b/jf-udp-recv/src/main.cpp index 2194b80..bca57e2 100644 --- a/jf-udp-recv/src/main.cpp +++ b/jf-udp-recv/src/main.cpp @@ -33,7 +33,7 @@ int main (int argc, char *argv[]) { const auto udp_port = config.start_udp_port + module_id; FrameUdpReceiver receiver(udp_port, module_id); RamBuffer buffer(config.detector_name, config.n_modules); - FrameStats stats(config.detector_name, module_id, STATS_MODULO); + FrameStats stats(config.detector_name, module_id, STATS_TIME); auto ctx = zmq_ctx_new(); auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); @@ -41,14 +41,34 @@ int main (int argc, char *argv[]) { ModuleFrame meta; char* data = new char[MODULE_N_BYTES]; + uint64_t pulse_id_previous = 0; + uint64_t frame_index_previous = 0; + while (true) { + auto pulse_id = receiver.get_frame_from_udp(meta, data); - buffer.write_frame(meta, data); + bool bad_pulse_id = false; - zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); + if ( ( meta.frame_index != (frame_index_previous+1) ) || + ( (pulse_id-pulse_id_previous) < 0 ) || + ( (pulse_id-pulse_id_previous) > 1000 ) ) { + + bad_pulse_id = true; + + } else { + + buffer.write_frame(meta, data); + + zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); + + } + + stats.record_stats(meta, bad_pulse_id); + + pulse_id_previous = pulse_id; + frame_index_previous = meta.frame_index; - stats.record_stats(meta); } delete[] data;