diff --git a/jfj-udp-recv/include/JfjFrameStats.hpp b/jfj-udp-recv/include/JfjFrameStats.hpp index 7d53b6c..7ab6636 100644 --- a/jfj-udp-recv/include/JfjFrameStats.hpp +++ b/jfj-udp-recv/include/JfjFrameStats.hpp @@ -2,12 +2,13 @@ #include #include -#ifndef SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP -#define SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP +#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_FRAMESTATS_HPP class FrameStats { const std::string detector_name_; + const int module_id_; size_t stats_time_; int frames_counter_; @@ -20,9 +21,11 @@ class FrameStats { void print_stats(); public: - FrameStats(const std::string &detector_name, const size_t stats_time); - void record_stats(const ImageMetadata &meta, const bool bad_pulse_id); + FrameStats(const std::string &detector_name, + const int module_id, + const size_t stats_time); + void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); }; -#endif //SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP +#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp index fc9a284..aad5962 100644 --- a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp +++ b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp @@ -19,13 +19,13 @@ class JfjFrameUdpReceiver { PacketBuffer m_buffer; - inline void init_frame(ImageMetadata& frame_metadata, const jfjoch_packet_t& c_packet); - inline uint64_t process_packets(ImageMetadata& metadata, char* frame_buffer); + inline void init_frame(ModuleFrame& frame_metadata, const jfjoch_packet_t& c_packet); + inline uint64_t process_packets(ModuleFrame& metadata, char* frame_buffer); public: JfjFrameUdpReceiver(const uint16_t port); virtual ~JfjFrameUdpReceiver(); - uint64_t get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer); + uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); }; diff --git a/jfj-udp-recv/src/JfjFrameStats.cpp b/jfj-udp-recv/src/JfjFrameStats.cpp index 36bb614..ee48423 100644 --- a/jfj-udp-recv/src/JfjFrameStats.cpp +++ b/jfj-udp-recv/src/JfjFrameStats.cpp @@ -1,54 +1,71 @@ -#include -#include "JfjFrameStats.hpp" - -using namespace std; -using namespace chrono; - -FrameStats::FrameStats(const std::string &detector_name, const size_t stats_time) : - detector_name_(detector_name), stats_time_(stats_time) { - reset_counters(); -} - -void FrameStats::reset_counters() -{ - frames_counter_ = 0; - n_corrupted_frames_ = 0; - n_corrupted_pulse_id_ = 0; - stats_interval_start_ = steady_clock::now(); -} - -void FrameStats::record_stats(const ImageMetadata &meta, const bool bad_pulse_id) -{ - - if (bad_pulse_id) { - n_corrupted_pulse_id_++; - n_corrupted_frames_++; - } - - frames_counter_++; - - auto time_passed = duration_cast(steady_clock::now()-stats_interval_start_).count(); - - if (time_passed >= stats_time_*1000) { - print_stats(); - reset_counters(); - } -} - -void FrameStats::print_stats(){ - auto interval_ms_duration = duration_cast(steady_clock::now()-stats_interval_start_).count(); - // * 1000 because milliseconds, + 250 because of truncation. - int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; - uint64_t timestamp = time_point_cast(system_clock::now()).time_since_epoch().count(); - - // Output in InfluxDB line protocol - cout << "jfj_udp_recv"; - cout << ",detector_name=" << detector_name_; - cout << " "; - cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; - cout << ",repetition_rate=" << rep_rate << "i"; - cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; - cout << " "; - cout << timestamp; - cout << endl; -} +#include +#include "JfjFrameStats.hpp" + +using namespace std; +using namespace chrono; + +FrameStats::FrameStats( + const std::string &detector_name, + const int module_id, + const size_t stats_time) : + detector_name_(detector_name), + module_id_(module_id), + stats_time_(stats_time) +{ + reset_counters(); +} + +void FrameStats::reset_counters() +{ + frames_counter_ = 0; + n_missed_packets_ = 0; + n_corrupted_frames_ = 0; + n_corrupted_pulse_id_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) +{ + + if (bad_pulse_id) { + n_corrupted_pulse_id_++; + } + + if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { + n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; + n_corrupted_frames_++; + } + + frames_counter_++; + + auto time_passed = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { + print_stats(); + reset_counters(); + } +} + +void FrameStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_udp_recv"; + cout << ",detector_name=" << detector_name_; + cout << ",module_name=M" << module_id_; + cout << " "; + cout << "n_missed_packets=" << n_missed_packets_ << "i"; + cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp index 88b9ec0..c7c6724 100644 --- a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp +++ b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp @@ -13,20 +13,19 @@ JfjFrameUdpReceiver::~JfjFrameUdpReceiver() { m_udp_receiver.disconnect(); } -inline void JfjFrameUdpReceiver::init_frame(ImageMetadata& metadata, const jfjoch_packet_t& c_packet) { - metadata.pulse_id = c_packet.timestamp; - metadata.frame_index = c_packet.framenum; - metadata.daq_rec = (uint32_t) c_packet.debug; - metadata.is_good_image = (int32_t) true; +inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) { + metadata.pulse_id = c_packet.bunchid; + metadata.frame_index = c_packet.framenum; + metadata.daq_rec = (uint64_t) c_packet.debug; + metadata.module_id = (int64_t) 0; } -inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, char* frame_buffer){ +inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char* frame_buffer){ while(!m_buffer.is_empty()){ // Happens if the last packet from the previous frame gets lost. if (m_frame_index != m_buffer.peek_front().framenum) { m_frame_index = m_buffer.peek_front().framenum; - metadata.is_good_image = (int32_t) false; return metadata.pulse_id; } @@ -40,6 +39,7 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch // Copy data to frame buffer size_t offset = JFJOCH_DATA_BYTES_PER_PACKET * c_packet.packetnum; memcpy( (void*) (frame_buffer + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET); + metadata.n_recv_packets++; // Last frame packet received. Frame finished. if (c_packet.packetnum == JFJOCH_N_PACKETS_PER_FRAME - 1){ @@ -52,9 +52,10 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch return 0; } -uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){ +uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){ // Reset the metadata and frame buffer for the next frame. (really needed?) metadata.pulse_id = 0; + metadata.n_recv_packets = 0; memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_PACKET); // Process leftover packages in the buffer diff --git a/jfj-udp-recv/src/main.cpp b/jfj-udp-recv/src/main.cpp index df9d0fa..2c52f25 100644 --- a/jfj-udp-recv/src/main.cpp +++ b/jfj-udp-recv/src/main.cpp @@ -18,30 +18,29 @@ int main (int argc, char *argv[]) { if (argc != 3) { cout << endl; - cout << "Usage: jfj_udp_recv [detector_json_filename] [module_id]"; + cout << "Usage: jfj_udp_recv [detector_json_filename]"; cout << endl; cout << "\tdetector_json_filename: detector config file path." << endl; - cout << "\tmodule_id: id of the module for this process." << endl; cout << endl; exit(-1); } const auto config = read_json_config(string(argv[1])); - const int module_id = atoi(argv[2]); const auto udp_port = config.start_udp_port; JfjFrameUdpReceiver receiver(udp_port); RamBuffer buffer(config.detector_name, config.n_modules); - FrameStats stats(config.detector_name, STATS_TIME); + FrameStats stats(config.detector_name, 0, STATS_TIME); auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, ZMQ_IO_THREADS); auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "jungfraujoch"); // Might be better creating a structure for double buffering - ImageMetadata metaBufferA; - char* dataBufferA = new char[JFJOCH_DATA_BYTES_PER_FRAME]; + ModuleFrame frameMeta; + ImageMetadata imageMeta; + char* dataBuffer = new char[JFJOCH_DATA_BYTES_PER_FRAME]; uint64_t pulse_id_previous = 0; uint64_t frame_index_previous = 0; @@ -49,23 +48,28 @@ int main (int argc, char *argv[]) { while (true) { // NOTE: Needs to be pipelined for really high frame rates - auto pulse_id = receiver.get_frame_from_udp(metaBufferA, dataBufferA); + auto pulse_id = receiver.get_frame_from_udp(frameMeta, dataBuffer); bool bad_pulse_id = false; - if ( ( metaBufferA.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) { + if ( ( frameMeta.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) { bad_pulse_id = true; } else { - buffer.write_frame(metaBufferA, dataBufferA); - zmq_send(sender, &metaBufferA, sizeof(metaBufferA), 0); + imageMeta.pulse_id = frameMeta.pulse_id; + imageMeta.frame_index = frameMeta.frame_index; + imageMeta.daq_rec = frameMeta.daq_rec; + imageMeta.is_good_image = true; + + buffer.write_frame(frameMeta, dataBuffer); + zmq_send(sender, &imageMeta, sizeof(imageMeta), 0); } - stats.record_stats(metaBufferA, bad_pulse_id); + stats.record_stats(frameMeta, bad_pulse_id); pulse_id_previous = pulse_id; - frame_index_previous = metaBufferA.frame_index; + frame_index_previous = frameMeta.frame_index; } - delete[] dataBufferA; + delete[] dataBuffer; }