From adc7071f679002f7bf172059ab9d5fd0de5aade3 Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Mon, 21 Jun 2021 14:07:33 +0200 Subject: [PATCH] Clanup depends --- core-buffer/include/formats.hpp | 1 + jfj-combined/CMakeLists.txt | 2 +- jfj-combined/include/JfjFrameCache.hpp | 14 ++--- jfj-combined/include/JfjFrameStats.hpp | 2 +- jfj-combined/include/JfjFrameWorker.hpp | 32 +++++++----- jfj-combined/include/PacketBuffer.hpp | 9 +++- jfj-combined/src/JfjFrameWorker.cpp | 69 +++++++++++++------------ jfj-combined/src/main.cpp | 27 ++++------ 8 files changed, 80 insertions(+), 76 deletions(-) diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index dfa3589..cb32b96 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -3,6 +3,7 @@ #include "buffer_config.hpp" #include "jungfrau.hpp" +#include "jungfraujoch.hpp" #pragma pack(push) #pragma pack(1) diff --git a/jfj-combined/CMakeLists.txt b/jfj-combined/CMakeLists.txt index 4932c5d..b3a123c 100644 --- a/jfj-combined/CMakeLists.txt +++ b/jfj-combined/CMakeLists.txt @@ -2,7 +2,7 @@ file(GLOB SOURCES src/*.cpp) add_library(jfj-combined-lib STATIC ${SOURCES}) target_include_directories(jfj-combined-lib PUBLIC include/) -target_link_libraries(jfj-combined-lib external core-buffer-lib) +target_link_libraries(jfj-combined-lib external) add_executable(jfj-combined src/main.cpp) set_target_properties(jfj-combined PROPERTIES OUTPUT_NAME jfj_combined) diff --git a/jfj-combined/include/JfjFrameCache.hpp b/jfj-combined/include/JfjFrameCache.hpp index 4d45aa7..99c4009 100644 --- a/jfj-combined/include/JfjFrameCache.hpp +++ b/jfj-combined/include/JfjFrameCache.hpp @@ -11,7 +11,7 @@ #include #include -#include "jungfraujoch.hpp" +#include "../../core-buffer/include/formats.hpp" /** Frame cache @@ -32,25 +32,25 @@ public: /** Emplace a specific frame and module **/ - void emplace(uint64_t pulseID, uint64_t moduleID, char* ptr_source, ModuleFrame& ref_meta){ + void emplace(uint64_t pulseID, uint64_t moduleID, BufferBinaryFormat& ref_frame){ uint64_t idx = pulseID % m_capacity; // Wait for unlocking block while(m_vlock[idx]){ std::this_thread::yield(); } // Invalid cache line: Just start a new line - if(m_valid[idx]){ start_line(idx, ref_meta); } + if(m_valid[idx]){ start_line(idx, ref_frame.meta); } // A new frame is starting - if(ref_meta.frame_index != m_meta[idx].frame_index){ + if(ref_frame.meta.frame_index != m_meta[idx].frame_index){ flush_line(idx); - start_line(idx, ref_meta); + start_line(idx, ref_frame.meta); } m_fill[idx]++; char* ptr_dest = m_data[idx].data() + moduleID * m_blocksize; - std::memcpy(ptr_dest, (void*)ptr_source, m_blocksize); - std::memcpy(&m_meta[idx], (void*)&ref_meta, sizeof(ModuleFrame)); + std::memcpy(ptr_dest, (void*)&ref_frame.data, m_blocksize); + std::memcpy(&m_meta[idx], (void*)&ref_frame.meta, sizeof(ModuleFrame)); } diff --git a/jfj-combined/include/JfjFrameStats.hpp b/jfj-combined/include/JfjFrameStats.hpp index 7839a38..e96d3f5 100644 --- a/jfj-combined/include/JfjFrameStats.hpp +++ b/jfj-combined/include/JfjFrameStats.hpp @@ -1,5 +1,5 @@ #include -#include +#include "../../core-buffer/include/formats.hpp" #include #ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jfj-combined/include/JfjFrameWorker.hpp b/jfj-combined/include/JfjFrameWorker.hpp index c4299cb..dfd49d0 100644 --- a/jfj-combined/include/JfjFrameWorker.hpp +++ b/jfj-combined/include/JfjFrameWorker.hpp @@ -1,12 +1,11 @@ -#ifndef SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP -#define SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP +#ifndef SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP +#define SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP +#include #include +#include "../../core-buffer/include/formats.hpp" #include "PacketUdpReceiver.hpp" -#include "formats.hpp" -#include "buffer_config.hpp" #include "PacketBuffer.hpp" -#include "jungfraujoch.hpp" /** JungfrauJoch UDP receiver @@ -14,6 +13,7 @@ NOTE: This design will not scale well for higher frame rates... **/ class JfjFrameWorker { + std::string m_state = "INIT"; PacketUdpReceiver m_udp_receiver; bool in_progress = false; uint64_t m_frame_index = 0; @@ -21,19 +21,25 @@ class JfjFrameWorker { const uint64_t m_num_packets; const uint64_t m_num_data_bytes; - // PacketBuffer m_buffer; PacketBuffer m_buffer; - inline void init_frame(ModuleFrame& frame_metadata, const jfjoch_packet_t& c_packet); - inline uint64_t process_packets(ModuleFrame& metadata, char* frame_buffer); + inline uint64_t process_packets(BufferBinaryFormat& buffer); + uint64_t get_frame(BufferBinaryFormat& buffer); - std::function f_push_callback; + std::function f_push_callback; public: - JfjFrameWorker(const uint16_t port, const uint32_t moduleID, - std::function callback); + JfjFrameWorker(const uint16_t port, const uint32_t moduleID, + std::function callback); virtual ~JfjFrameWorker(); - uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); + std::string print() const; void run(); }; -#endif //SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP + +std::ostream& operator<<(std::ostream& os, const JfjFrameWorker& worker){ + os << worker.print() << std::endl; + return os; +} + + +#endif //SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP diff --git a/jfj-combined/include/PacketBuffer.hpp b/jfj-combined/include/PacketBuffer.hpp index abd08bb..f571094 100644 --- a/jfj-combined/include/PacketBuffer.hpp +++ b/jfj-combined/include/PacketBuffer.hpp @@ -5,8 +5,13 @@ #include #include #include -#include -#include +#include +#if defined(WIN32) || defined(_WIN32) || defined(MINGW32) + #include +#else + #include + #include +#endif // defined /** Linear data buffer (NOT FIFO) diff --git a/jfj-combined/src/JfjFrameWorker.cpp b/jfj-combined/src/JfjFrameWorker.cpp index 344f162..2cfee63 100644 --- a/jfj-combined/src/JfjFrameWorker.cpp +++ b/jfj-combined/src/JfjFrameWorker.cpp @@ -7,24 +7,19 @@ using namespace buffer_config; JfjFrameWorker::JfjFrameWorker(const uint16_t port, const uint32_t moduleID, - std::function callback): + std::function callback): m_moduleID(moduleID), m_num_packets(JFJOCH_N_PACKETS_PER_MODULE), m_num_data_bytes(JFJOCH_DATA_BYTES_PER_MODULE), f_push_callback(callback) { m_udp_receiver.bind(port); + m_state = "ON"; } JfjFrameWorker::~JfjFrameWorker() { m_udp_receiver.disconnect(); } -inline void JfjFrameWorker::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) { - metadata.pulse_id = c_packet.bunchid; - metadata.frame_index = c_packet.framenum; - metadata.daq_rec = (uint64_t) c_packet.debug; - metadata.module_id = (int64_t) 0; -} -inline uint64_t JfjFrameWorker::process_packets(ModuleFrame& metadata, char* frame_buffer){ +inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){ while(!m_buffer.is_empty()){ // Happens if the last packet from the previous frame gets lost. @@ -32,7 +27,7 @@ inline uint64_t JfjFrameWorker::process_packets(ModuleFrame& metadata, char* fra m_frame_index = m_buffer.peek_front().framenum; if(this->in_progress){ this->in_progress = false; - return metadata.pulse_id; + return buffer.meta.pulse_id; } } @@ -41,36 +36,36 @@ inline uint64_t JfjFrameWorker::process_packets(ModuleFrame& metadata, char* fra m_frame_index = c_packet.framenum; this->in_progress = true; + // Always copy metadata (otherwise problem when 0th packet gets lost) - this->init_frame(metadata, c_packet); + buffer.meta.pulse_id = c_packet.bunchid; + buffer.meta.frame_index = c_packet.framenum; + buffer.meta.daq_rec = c_packet.debug; + buffer.meta.module_id = m_moduleID; // Copy data to frame buffer size_t offset = JFJOCH_DATA_BYTES_PER_PACKET * c_packet.packetnum; - memcpy( (void*) (frame_buffer + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET); - metadata.n_recv_packets++; + memcpy( (void*) (&buffer.data + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET); + buffer.meta.n_recv_packets++; // Last frame packet received. Frame finished. if (c_packet.packetnum == m_num_packets - 1){ this->in_progress = false; - return metadata.pulse_id; + return buffer.meta.pulse_id; } } // We emptied the buffer. - // m_buffer.reset(); return 0; } -uint64_t JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){ +uint64_t JfjFrameWorker::get_frame(BufferBinaryFormat& buffer){ // Reset the metadata and frame buffer for the next frame. (really needed?) - metadata.pulse_id = 0; - metadata.n_recv_packets = 0; - memset(frame_buffer, 0, m_num_data_bytes); - + memset(&buffer, 0, sizeof(buffer)); // Process leftover packages in the buffer if (!m_buffer.is_empty()) { - auto pulse_id = process_packets(metadata, frame_buffer); + auto pulse_id = process_packets(buffer); if (pulse_id != 0) { return pulse_id; } } @@ -81,32 +76,40 @@ uint64_t JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadata, char* frame_b if (m_buffer.is_empty()) { continue; } // ... and process them - auto pulse_id = process_packets(metadata, frame_buffer); + auto pulse_id = process_packets(buffer); if (pulse_id != 0) { return pulse_id; } } } void JfjFrameWorker::run(){ std::cout << "Running worker loop" << std::endl; - // Might be better creating a structure for double buffering - ModuleFrame frameMeta; - char* dataBuffer = new char[JFJOCH_DATA_BYTES_PER_MODULE]; + BufferBinaryFormat buffer; uint64_t pulse_id_previous = 0; uint64_t frame_index_previous = 0; + try{ + m_state = "RUNNING"; + while (true) { + // NOTE: Needs to be pipelined for really high frame rates + auto pulse_id = get_frame(buffer); - while (true) { - // NOTE: Needs to be pipelined for really high frame rates - auto pulse_id = get_frame_from_udp(frameMeta, dataBuffer); - - if(pulse_id>1000){ - f_push_callback(pulse_id, m_moduleID, dataBuffer, frameMeta); + if(pulse_id>10){ + f_push_callback(pulse_id, m_moduleID, buffer); + } } - } + } catch (const std::exception& ex) { + std::cout << "Exception in worker loop: " << ex.what() << std::endl; + throw; + }; - delete[] dataBuffer; +} + +std::string JfjFrameWorker::print() const { + std::string msg = "JungfrauFrameWorker #" + std::to_string(m_moduleID) + "\n"+ + "State:\t" + m_state + "\n"; + return msg; } @@ -119,5 +122,3 @@ void JfjFrameWorker::run(){ - - diff --git a/jfj-combined/src/main.cpp b/jfj-combined/src/main.cpp index 7e98b51..891ea83 100644 --- a/jfj-combined/src/main.cpp +++ b/jfj-combined/src/main.cpp @@ -2,7 +2,7 @@ #include #include -#include "formats.hpp" +#include "../../core-buffer/include/formats.hpp" #include "../include/JfjFrameCache.hpp" #include "../include/JfjFrameWorker.hpp" @@ -14,29 +14,20 @@ void dummy_sender(ImageMetadata* meta, std::vector* data){ int main (int argc, char *argv[]) { - - - - std::cout << "Creating frame cache..." << std::endl; FrameCache cache(32, 3, JFJOCH_DATA_BYTES_PER_MODULE, &dummy_sender); - - - - - std::function push_cb = - std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4); - - - + + std::function push_cb = + std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + std::cout << "Creating workers..." << std::endl; JfjFrameWorker W0(5005, 0, push_cb); JfjFrameWorker W1(5006, 1, push_cb); JfjFrameWorker W2(5007, 2, push_cb); - - std::thread T0(&JfjFrameWorker::run, &W0); - - + + std::thread T0(&JfjFrameWorker::run, &W0); + + T0.join(); std::cout << "Exiting program..." << std::endl; return 0;