This commit is contained in:
Mohacsi Istvan
2021-06-21 11:40:06 +02:00
parent 3fc87e1740
commit 9c6eb376bd
7 changed files with 50 additions and 138 deletions
+1 -1
View File
@@ -4,7 +4,7 @@ add_library(jfj-combined-lib STATIC ${SOURCES})
target_include_directories(jfj-combined-lib PUBLIC include/)
target_link_libraries(jfj-combined-lib external core-buffer-lib)
add_executable(jfj-combined src/dummyMain.cpp)
add_executable(jfj-combined src/main.cpp)
set_target_properties(jfj-combined PROPERTIES OUTPUT_NAME jf_combined)
target_link_libraries(jfj-combined jfj-combined-lib zmq rt)
+11 -10
View File
@@ -2,6 +2,7 @@
#define FRAME_CACHE_HPP
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <iostream>
#include <mutex>
@@ -31,25 +32,25 @@ public:
/** Emplace a specific frame and module **/
void emplace(uint64_t pulseID, uint32_t moduleID, char* ptr_source, ModuleFrame* ptr_meta){
void emplace(uint64_t pulseID, uint64_t moduleID, char* ptr_source, ModuleFrame& ref_meta){
uint64_t idx = pulseID % m_capacity;
// Wait for unlocking block
while(m_vlock[idx]){ std::this_thread::yield(); }
// Invalid cache line: Just start a new line
if(m_valid[idx]){ start_line(idx, ptr_meta); }
if(m_valid[idx]){ start_line(idx, ref_meta); }
// A new frame is starting
if(ptr_meta->frame_index != m_meta[idx].frame_index){
if(ref_meta.frame_index != m_meta[idx].frame_index){
flush_line(idx);
start_line(idx, ptr_meta);
start_line(idx, ref_meta);
}
m_fill[idx]++;
char* ptr_dest = m_data[idx].data() + moduleID * m_blocksize;
memcpy(ptr_dest, (void*)ptr_source, m_blocksize);
memcpy(&m_meta[idx], (void*)ptr_meta, sizeof(ModuleFrame));
std::memcpy(ptr_dest, (void*)ptr_source, m_blocksize);
std::memcpy(&m_meta[idx], (void*)&ref_meta, sizeof(ModuleFrame));
}
@@ -71,11 +72,11 @@ public:
}
}
void start_line(uint64_t idx, ModuleFrame* ptr_meta){
void start_line(uint64_t idx, ModuleFrame& ref_meta){
m_vlock[idx] = 1;
m_meta[idx].pulse_id = ptr_meta->pulse_id;
m_meta[idx].frame_index = ptr_meta->frame_index;
m_meta[idx].daq_rec = ptr_meta->daq_rec;
m_meta[idx].pulse_id = ref_meta.pulse_id;
m_meta[idx].frame_index = ref_meta.frame_index;
m_meta[idx].daq_rec = ref_meta.daq_rec;
m_meta[idx].is_good_image = true;
m_valid[idx] = 1;
m_fill[idx] = 0;
+7 -5
View File
@@ -1,6 +1,7 @@
#ifndef SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP
#define SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP
#include <functional>
#include "PacketUdpReceiver.hpp"
#include "formats.hpp"
#include "buffer_config.hpp"
@@ -16,7 +17,7 @@ class JfjFrameWorker {
PacketUdpReceiver m_udp_receiver;
bool in_progress = false;
uint64_t m_frame_index = 0;
const uint64_t m_num_modules;
const uint64_t m_moduleID;
const uint64_t m_num_packets;
const uint64_t m_num_data_bytes;
@@ -26,11 +27,12 @@ class JfjFrameWorker {
inline void init_frame(ModuleFrame& frame_metadata, const jfjoch_packet_t& c_packet);
inline uint64_t process_packets(ModuleFrame& metadata, char* frame_buffer);
std::function<void(uint64_t index, uint32_t module, char* ptr_data, ModuleFrame* ptr_meta)> f_push_callback;
std::function<void(uint64_t, uint64_t, char*, ModuleFrame&)> f_push_callback;
public:
JfjFrameUdpReceiver(const uint16_t port, std::function<void(uint64_t index, uint32_t module, char* ptr_data, ModuleFrame* ptr_meta)> callback);
virtual ~JfjFrameUdpReceiver();
std::generator<uint64_t> get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer);
JfjFrameWorker(const uint16_t port, const uint32_t moduleID,
std::function<void(uint64_t, uint64_t, char*, ModuleFrame&)> callback);
virtual ~JfjFrameWorker();
uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer);
void run();
};
@@ -1,6 +1,7 @@
#ifndef UDPRECEIVER_H
#define UDPRECEIVER_H
#include <cstdint>
#if defined(WIN32) || defined(_WIN32) || defined(MINGW32)
#include <winsock2.h>
#else
+8 -36
View File
@@ -6,9 +6,10 @@ using namespace buffer_config;
JfjFrameWorker::JfjFrameWorker(const uint16_t port, std::function<void(uint64_t index, uint32_t module, char* ptr_data, ModuleFrame* ptr_meta)> callback):
m_num_modules(n_modules), m_num_packets(n_modules*JFJOCH_N_PACKETS_PER_MODULE),
m_num_data_bytes(n_modules*JFJOCH_DATA_BYTES_PER_MODULE), f_push_callback(callback) {
JfjFrameWorker::JfjFrameWorker(const uint16_t port, const uint32_t moduleID,
std::function<void(uint64_t, uint64_t, char*, ModuleFrame&)> callback):
m_moduleID(moduleID), m_num_packets(JFJOCH_N_PACKETS_PER_MODULE),
m_num_data_bytes(JFJOCH_DATA_BYTES_PER_MODULE), f_push_callback(callback) {
m_udp_receiver.bind(port);
}
@@ -60,36 +61,7 @@ inline uint64_t JfjFrameWorker::process_packets(ModuleFrame& metadata, char* fra
return 0;
}
//uint64_t JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){
// // Reset the metadata and frame buffer for the next frame. (really needed?)
// metadata.pulse_id = 0;
// metadata.n_recv_packets = 0;
// memset(frame_buffer, 0, m_num_data_bytes);
//
//
// // Process leftover packages in the buffer
// if (!m_buffer.is_empty()) {
// auto pulse_id = process_packets(metadata, frame_buffer);
// if (pulse_id != 0) { return pulse_id; }
// }
//
//
// while (true) {
// // Receive new packages (pass if none)...
// m_buffer.fill_from(m_udp_receiver);
// if (m_buffer.is_empty()) { continue; }
//
// // ... and process them
// auto pulse_id = process_packets(metadata, frame_buffer);
// if (pulse_id != 0) { return pulse_id; }
// }
//}
//
std::generator<uint64_t> JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){
uint64_t JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){
// Reset the metadata and frame buffer for the next frame. (really needed?)
metadata.pulse_id = 0;
metadata.n_recv_packets = 0;
@@ -99,7 +71,7 @@ std::generator<uint64_t> JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadat
// Process leftover packages in the buffer
if (!m_buffer.is_empty()) {
auto pulse_id = process_packets(metadata, frame_buffer);
if (pulse_id != 0) { co_yield pulse_id; }
if (pulse_id != 0) { return pulse_id; }
}
@@ -110,7 +82,7 @@ std::generator<uint64_t> JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadat
// ... and process them
auto pulse_id = process_packets(metadata, frame_buffer);
if (pulse_id != 0) { co_yield pulse_id; }
if (pulse_id != 0) { return pulse_id; }
}
}
@@ -127,7 +99,7 @@ void JfjFrameWorker::run(){
while (true) {
// NOTE: Needs to be pipelined for really high frame rates
auto pulse_id = co_await get_frame_from_udp(&frameMeta, dataBuffer);
auto pulse_id = get_frame_from_udp(frameMeta, dataBuffer);
if(pulse_id>1000){
f_push_callback(pulse_id, m_moduleID, dataBuffer, frameMeta);
-27
View File
@@ -1,27 +0,0 @@
#include <iostream>
#include <stdexcept>
#include <zmq.h>
#include "formats.hpp"
#include "../include/JfjFrameCache.hpp"
#include "../include/JfjFrameWorker.hpp"
void dummy_sender(ImageMetadata* meta, std::vector<char>* data){
std::cout << "Sending " << meta->frame_index << std::endl;
}
int main (int argc, char *argv[]) {
FrameCache cache(32, 3, JFJOCH_DATA_BYTES_PER_MODULE, &dummy_sender);
JfjFrameWorker W0(5005, 0, cache.emplace);
JfjFrameWorker W1(5006, 1, cache.emplace);
JfjFrameWorker W2(5007, 2, cache.emplace);
}
+22 -59
View File
@@ -1,74 +1,37 @@
#include <iostream>
#include <stdexcept>
#include <zmq.h>
#include <RamBuffer.hpp>
#include "formats.hpp"
#include "buffer_config.hpp"
#include "JfjFrameUdpReceiver.hpp"
#include "BufferUtils.hpp"
#include "JfjFrameStats.hpp"
#include "../include/JfjFrameCache.hpp"
#include "../include/JfjFrameWorker.hpp"
void dummy_sender(ImageMetadata* meta, std::vector<char>* data){
std::cout << "Sending " << meta->frame_index << std::endl;
}
using namespace std;
using namespace chrono;
using namespace buffer_config;
using namespace BufferUtils;
int main (int argc, char *argv[]) {
if (argc != 3) {
cout << endl;
cout << "Usage: jfj_udp_recv [detector_json_filename]" << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << endl;
exit(-1);
}
const auto config = read_json_config(string(argv[1]));
const auto udp_port = config.start_udp_port;
JfjFrameUdpReceiver receiver(udp_port, 8);
RamBuffer buffer(config.detector_name, config.n_modules);
FrameStats stats(config.detector_name, 0, STATS_TIME);
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, ZMQ_IO_THREADS);
auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "jungfraujoch");
// Might be better creating a structure for double buffering
ModuleFrame frameMeta;
ImageMetadata imageMeta;
char* dataBuffer = new char[8 * JFJOCH_DATA_BYTES_PER_MODULE];
uint64_t pulse_id_previous = 0;
uint64_t frame_index_previous = 0;
while (true) {
// NOTE: Needs to be pipelined for really high frame rates
auto pulse_id = receiver.get_frame_from_udp(frameMeta, dataBuffer);
bool bad_pulse_id = false;
if ( ( frameMeta.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) {
bad_pulse_id = true;
} else {
imageMeta.pulse_id = frameMeta.pulse_id;
imageMeta.frame_index = frameMeta.frame_index;
imageMeta.daq_rec = frameMeta.daq_rec;
imageMeta.is_good_image = true;
buffer.write_frame(frameMeta, dataBuffer);
zmq_send(sender, &imageMeta, sizeof(imageMeta), 0);
}
FrameCache cache(32, 3, JFJOCH_DATA_BYTES_PER_MODULE, &dummy_sender);
std::function<void(uint64_t, uint64_t, char*, ModuleFrame&)> push_cb =
std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
JfjFrameWorker W0(5005, 0, push_cb);
JfjFrameWorker W1(5006, 1, push_cb);
JfjFrameWorker W2(5007, 2, push_cb);
stats.record_stats(frameMeta, bad_pulse_id);
pulse_id_previous = pulse_id;
frame_index_previous = frameMeta.frame_index;
}
delete[] dataBuffer;
}