diff --git a/jfj-combined/CMakeLists.txt b/jfj-combined/CMakeLists.txt index 0085b99..bbe23ed 100644 --- a/jfj-combined/CMakeLists.txt +++ b/jfj-combined/CMakeLists.txt @@ -4,7 +4,7 @@ add_library(jfj-combined-lib STATIC ${SOURCES}) target_include_directories(jfj-combined-lib PUBLIC include/) target_link_libraries(jfj-combined-lib external core-buffer-lib) -add_executable(jfj-combined src/dummyMain.cpp) +add_executable(jfj-combined src/main.cpp) set_target_properties(jfj-combined PROPERTIES OUTPUT_NAME jf_combined) target_link_libraries(jfj-combined jfj-combined-lib zmq rt) diff --git a/jfj-combined/include/JfjFrameCache.hpp b/jfj-combined/include/JfjFrameCache.hpp index 0742b0a..4d45aa7 100644 --- a/jfj-combined/include/JfjFrameCache.hpp +++ b/jfj-combined/include/JfjFrameCache.hpp @@ -2,6 +2,7 @@ #define FRAME_CACHE_HPP #include +#include #include #include #include @@ -31,25 +32,25 @@ public: /** Emplace a specific frame and module **/ - void emplace(uint64_t pulseID, uint32_t moduleID, char* ptr_source, ModuleFrame* ptr_meta){ + void emplace(uint64_t pulseID, uint64_t moduleID, char* ptr_source, ModuleFrame& ref_meta){ uint64_t idx = pulseID % m_capacity; // Wait for unlocking block while(m_vlock[idx]){ std::this_thread::yield(); } // Invalid cache line: Just start a new line - if(m_valid[idx]){ start_line(idx, ptr_meta); } + if(m_valid[idx]){ start_line(idx, ref_meta); } // A new frame is starting - if(ptr_meta->frame_index != m_meta[idx].frame_index){ + if(ref_meta.frame_index != m_meta[idx].frame_index){ flush_line(idx); - start_line(idx, ptr_meta); + start_line(idx, ref_meta); } m_fill[idx]++; char* ptr_dest = m_data[idx].data() + moduleID * m_blocksize; - memcpy(ptr_dest, (void*)ptr_source, m_blocksize); - memcpy(&m_meta[idx], (void*)ptr_meta, sizeof(ModuleFrame)); + std::memcpy(ptr_dest, (void*)ptr_source, m_blocksize); + std::memcpy(&m_meta[idx], (void*)&ref_meta, sizeof(ModuleFrame)); } @@ -71,11 +72,11 @@ public: } } - void start_line(uint64_t idx, ModuleFrame* ptr_meta){ + void start_line(uint64_t idx, ModuleFrame& ref_meta){ m_vlock[idx] = 1; - m_meta[idx].pulse_id = ptr_meta->pulse_id; - m_meta[idx].frame_index = ptr_meta->frame_index; - m_meta[idx].daq_rec = ptr_meta->daq_rec; + m_meta[idx].pulse_id = ref_meta.pulse_id; + m_meta[idx].frame_index = ref_meta.frame_index; + m_meta[idx].daq_rec = ref_meta.daq_rec; m_meta[idx].is_good_image = true; m_valid[idx] = 1; m_fill[idx] = 0; diff --git a/jfj-combined/include/JfjFrameWorker.hpp b/jfj-combined/include/JfjFrameWorker.hpp index 37874b8..c4299cb 100644 --- a/jfj-combined/include/JfjFrameWorker.hpp +++ b/jfj-combined/include/JfjFrameWorker.hpp @@ -1,6 +1,7 @@ #ifndef SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP #define SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP +#include #include "PacketUdpReceiver.hpp" #include "formats.hpp" #include "buffer_config.hpp" @@ -16,7 +17,7 @@ class JfjFrameWorker { PacketUdpReceiver m_udp_receiver; bool in_progress = false; uint64_t m_frame_index = 0; - const uint64_t m_num_modules; + const uint64_t m_moduleID; const uint64_t m_num_packets; const uint64_t m_num_data_bytes; @@ -26,11 +27,12 @@ class JfjFrameWorker { inline void init_frame(ModuleFrame& frame_metadata, const jfjoch_packet_t& c_packet); inline uint64_t process_packets(ModuleFrame& metadata, char* frame_buffer); - std::function f_push_callback; + std::function f_push_callback; public: - JfjFrameUdpReceiver(const uint16_t port, std::function callback); - virtual ~JfjFrameUdpReceiver(); - std::generator get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); + JfjFrameWorker(const uint16_t port, const uint32_t moduleID, + std::function callback); + virtual ~JfjFrameWorker(); + uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); void run(); }; diff --git a/jfj-combined/include/PacketUdpReceiver.hpp b/jfj-combined/include/PacketUdpReceiver.hpp index 8fa9176..4999ec7 100644 --- a/jfj-combined/include/PacketUdpReceiver.hpp +++ b/jfj-combined/include/PacketUdpReceiver.hpp @@ -1,6 +1,7 @@ #ifndef UDPRECEIVER_H #define UDPRECEIVER_H +#include #if defined(WIN32) || defined(_WIN32) || defined(MINGW32) #include #else diff --git a/jfj-combined/src/JfjFrameWorker.cpp b/jfj-combined/src/JfjFrameWorker.cpp index 63dc6fb..71f91f7 100644 --- a/jfj-combined/src/JfjFrameWorker.cpp +++ b/jfj-combined/src/JfjFrameWorker.cpp @@ -6,9 +6,10 @@ using namespace buffer_config; -JfjFrameWorker::JfjFrameWorker(const uint16_t port, std::function callback): - m_num_modules(n_modules), m_num_packets(n_modules*JFJOCH_N_PACKETS_PER_MODULE), - m_num_data_bytes(n_modules*JFJOCH_DATA_BYTES_PER_MODULE), f_push_callback(callback) { +JfjFrameWorker::JfjFrameWorker(const uint16_t port, const uint32_t moduleID, + std::function callback): + m_moduleID(moduleID), m_num_packets(JFJOCH_N_PACKETS_PER_MODULE), + m_num_data_bytes(JFJOCH_DATA_BYTES_PER_MODULE), f_push_callback(callback) { m_udp_receiver.bind(port); } @@ -60,36 +61,7 @@ inline uint64_t JfjFrameWorker::process_packets(ModuleFrame& metadata, char* fra return 0; } -//uint64_t JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){ -// // Reset the metadata and frame buffer for the next frame. (really needed?) -// metadata.pulse_id = 0; -// metadata.n_recv_packets = 0; -// memset(frame_buffer, 0, m_num_data_bytes); -// -// -// // Process leftover packages in the buffer -// if (!m_buffer.is_empty()) { -// auto pulse_id = process_packets(metadata, frame_buffer); -// if (pulse_id != 0) { return pulse_id; } -// } -// -// -// while (true) { -// // Receive new packages (pass if none)... -// m_buffer.fill_from(m_udp_receiver); -// if (m_buffer.is_empty()) { continue; } -// -// // ... and process them -// auto pulse_id = process_packets(metadata, frame_buffer); -// if (pulse_id != 0) { return pulse_id; } -// } -//} -// - - - - -std::generator JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){ +uint64_t JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){ // Reset the metadata and frame buffer for the next frame. (really needed?) metadata.pulse_id = 0; metadata.n_recv_packets = 0; @@ -99,7 +71,7 @@ std::generator JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadat // Process leftover packages in the buffer if (!m_buffer.is_empty()) { auto pulse_id = process_packets(metadata, frame_buffer); - if (pulse_id != 0) { co_yield pulse_id; } + if (pulse_id != 0) { return pulse_id; } } @@ -110,7 +82,7 @@ std::generator JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadat // ... and process them auto pulse_id = process_packets(metadata, frame_buffer); - if (pulse_id != 0) { co_yield pulse_id; } + if (pulse_id != 0) { return pulse_id; } } } @@ -127,7 +99,7 @@ void JfjFrameWorker::run(){ while (true) { // NOTE: Needs to be pipelined for really high frame rates - auto pulse_id = co_await get_frame_from_udp(&frameMeta, dataBuffer); + auto pulse_id = get_frame_from_udp(frameMeta, dataBuffer); if(pulse_id>1000){ f_push_callback(pulse_id, m_moduleID, dataBuffer, frameMeta); diff --git a/jfj-combined/src/dummyMain.cpp b/jfj-combined/src/dummyMain.cpp deleted file mode 100644 index 0ecea91..0000000 --- a/jfj-combined/src/dummyMain.cpp +++ /dev/null @@ -1,27 +0,0 @@ -#include -#include -#include - -#include "formats.hpp" -#include "../include/JfjFrameCache.hpp" -#include "../include/JfjFrameWorker.hpp" - - -void dummy_sender(ImageMetadata* meta, std::vector* data){ - std::cout << "Sending " << meta->frame_index << std::endl; -} - - - -int main (int argc, char *argv[]) { - - - - - - FrameCache cache(32, 3, JFJOCH_DATA_BYTES_PER_MODULE, &dummy_sender); - JfjFrameWorker W0(5005, 0, cache.emplace); - JfjFrameWorker W1(5006, 1, cache.emplace); - JfjFrameWorker W2(5007, 2, cache.emplace); - -} diff --git a/jfj-combined/src/main.cpp b/jfj-combined/src/main.cpp index fb2c3ce..304e535 100644 --- a/jfj-combined/src/main.cpp +++ b/jfj-combined/src/main.cpp @@ -1,74 +1,37 @@ #include #include #include -#include #include "formats.hpp" -#include "buffer_config.hpp" -#include "JfjFrameUdpReceiver.hpp" -#include "BufferUtils.hpp" -#include "JfjFrameStats.hpp" +#include "../include/JfjFrameCache.hpp" +#include "../include/JfjFrameWorker.hpp" + + +void dummy_sender(ImageMetadata* meta, std::vector* data){ + std::cout << "Sending " << meta->frame_index << std::endl; +} + -using namespace std; -using namespace chrono; -using namespace buffer_config; -using namespace BufferUtils; int main (int argc, char *argv[]) { - if (argc != 3) { - cout << endl; - cout << "Usage: jfj_udp_recv [detector_json_filename]" << endl; - cout << "\tdetector_json_filename: detector config file path." << endl; - cout << endl; - - exit(-1); - } - - const auto config = read_json_config(string(argv[1])); - - const auto udp_port = config.start_udp_port; - JfjFrameUdpReceiver receiver(udp_port, 8); - RamBuffer buffer(config.detector_name, config.n_modules); - FrameStats stats(config.detector_name, 0, STATS_TIME); - - auto ctx = zmq_ctx_new(); - zmq_ctx_set(ctx, ZMQ_IO_THREADS, ZMQ_IO_THREADS); - auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "jungfraujoch"); - - // Might be better creating a structure for double buffering - ModuleFrame frameMeta; - ImageMetadata imageMeta; - char* dataBuffer = new char[8 * JFJOCH_DATA_BYTES_PER_MODULE]; - - uint64_t pulse_id_previous = 0; - uint64_t frame_index_previous = 0; - while (true) { - // NOTE: Needs to be pipelined for really high frame rates - auto pulse_id = receiver.get_frame_from_udp(frameMeta, dataBuffer); - bool bad_pulse_id = false; - if ( ( frameMeta.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) { - bad_pulse_id = true; - } else { - imageMeta.pulse_id = frameMeta.pulse_id; - imageMeta.frame_index = frameMeta.frame_index; - imageMeta.daq_rec = frameMeta.daq_rec; - imageMeta.is_good_image = true; - - buffer.write_frame(frameMeta, dataBuffer); - zmq_send(sender, &imageMeta, sizeof(imageMeta), 0); - } + FrameCache cache(32, 3, JFJOCH_DATA_BYTES_PER_MODULE, &dummy_sender); + + + + + std::function push_cb = + std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4); + + + + + JfjFrameWorker W0(5005, 0, push_cb); + JfjFrameWorker W1(5006, 1, push_cb); + JfjFrameWorker W2(5007, 2, push_cb); - stats.record_stats(frameMeta, bad_pulse_id); - - pulse_id_previous = pulse_id; - frame_index_previous = frameMeta.frame_index; - - } - - delete[] dataBuffer; }