Clanup depends

This commit is contained in:
2021-06-21 14:07:33 +02:00
parent 0600cfa583
commit adc7071f67
8 changed files with 80 additions and 76 deletions
+1
View File
@@ -3,6 +3,7 @@
#include "buffer_config.hpp"
#include "jungfrau.hpp"
#include "jungfraujoch.hpp"
#pragma pack(push)
#pragma pack(1)
+1 -1
View File
@@ -2,7 +2,7 @@ file(GLOB SOURCES src/*.cpp)
add_library(jfj-combined-lib STATIC ${SOURCES})
target_include_directories(jfj-combined-lib PUBLIC include/)
target_link_libraries(jfj-combined-lib external core-buffer-lib)
target_link_libraries(jfj-combined-lib external)
add_executable(jfj-combined src/main.cpp)
set_target_properties(jfj-combined PROPERTIES OUTPUT_NAME jfj_combined)
+7 -7
View File
@@ -11,7 +11,7 @@
#include <functional>
#include <thread>
#include "jungfraujoch.hpp"
#include "../../core-buffer/include/formats.hpp"
/** Frame cache
@@ -32,25 +32,25 @@ public:
/** Emplace a specific frame and module **/
void emplace(uint64_t pulseID, uint64_t moduleID, char* ptr_source, ModuleFrame& ref_meta){
void emplace(uint64_t pulseID, uint64_t moduleID, BufferBinaryFormat& ref_frame){
uint64_t idx = pulseID % m_capacity;
// Wait for unlocking block
while(m_vlock[idx]){ std::this_thread::yield(); }
// Invalid cache line: Just start a new line
if(m_valid[idx]){ start_line(idx, ref_meta); }
if(m_valid[idx]){ start_line(idx, ref_frame.meta); }
// A new frame is starting
if(ref_meta.frame_index != m_meta[idx].frame_index){
if(ref_frame.meta.frame_index != m_meta[idx].frame_index){
flush_line(idx);
start_line(idx, ref_meta);
start_line(idx, ref_frame.meta);
}
m_fill[idx]++;
char* ptr_dest = m_data[idx].data() + moduleID * m_blocksize;
std::memcpy(ptr_dest, (void*)ptr_source, m_blocksize);
std::memcpy(&m_meta[idx], (void*)&ref_meta, sizeof(ModuleFrame));
std::memcpy(ptr_dest, (void*)&ref_frame.data, m_blocksize);
std::memcpy(&m_meta[idx], (void*)&ref_frame.meta, sizeof(ModuleFrame));
}
+1 -1
View File
@@ -1,5 +1,5 @@
#include <cstddef>
#include <formats.hpp>
#include "../../core-buffer/include/formats.hpp"
#include <chrono>
#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP
+19 -13
View File
@@ -1,12 +1,11 @@
#ifndef SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP
#define SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP
#ifndef SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP
#define SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP
#include <iostream>
#include <functional>
#include "../../core-buffer/include/formats.hpp"
#include "PacketUdpReceiver.hpp"
#include "formats.hpp"
#include "buffer_config.hpp"
#include "PacketBuffer.hpp"
#include "jungfraujoch.hpp"
/** JungfrauJoch UDP receiver
@@ -14,6 +13,7 @@
NOTE: This design will not scale well for higher frame rates...
**/
class JfjFrameWorker {
std::string m_state = "INIT";
PacketUdpReceiver m_udp_receiver;
bool in_progress = false;
uint64_t m_frame_index = 0;
@@ -21,19 +21,25 @@ class JfjFrameWorker {
const uint64_t m_num_packets;
const uint64_t m_num_data_bytes;
// PacketBuffer<jfjoch_packet_t, buffer_config::BUFFER_UDP_N_RECV_MSG> m_buffer;
PacketBuffer<jfjoch_packet_t, 64> m_buffer;
inline void init_frame(ModuleFrame& frame_metadata, const jfjoch_packet_t& c_packet);
inline uint64_t process_packets(ModuleFrame& metadata, char* frame_buffer);
inline uint64_t process_packets(BufferBinaryFormat& buffer);
uint64_t get_frame(BufferBinaryFormat& buffer);
std::function<void(uint64_t, uint64_t, char*, ModuleFrame&)> f_push_callback;
std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> f_push_callback;
public:
JfjFrameWorker(const uint16_t port, const uint32_t moduleID,
std::function<void(uint64_t, uint64_t, char*, ModuleFrame&)> callback);
JfjFrameWorker(const uint16_t port, const uint32_t moduleID,
std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> callback);
virtual ~JfjFrameWorker();
uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer);
std::string print() const;
void run();
};
#endif //SF_DAQ_BUFFER_JFJ_UDPRECEIVER_HPP
std::ostream& operator<<(std::ostream& os, const JfjFrameWorker& worker){
os << worker.print() << std::endl;
return os;
}
#endif //SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP
+7 -2
View File
@@ -5,8 +5,13 @@
#include <stdexcept>
#include <iostream>
#include <mutex>
#include <sys/socket.h>
#include <netinet/in.h>
#include <cstdint>
#if defined(WIN32) || defined(_WIN32) || defined(MINGW32)
#include <winsock2.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#endif // defined
/** Linear data buffer (NOT FIFO)
+35 -34
View File
@@ -7,24 +7,19 @@ using namespace buffer_config;
JfjFrameWorker::JfjFrameWorker(const uint16_t port, const uint32_t moduleID,
std::function<void(uint64_t, uint64_t, char*, ModuleFrame&)> callback):
std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> callback):
m_moduleID(moduleID), m_num_packets(JFJOCH_N_PACKETS_PER_MODULE),
m_num_data_bytes(JFJOCH_DATA_BYTES_PER_MODULE), f_push_callback(callback) {
m_udp_receiver.bind(port);
m_state = "ON";
}
JfjFrameWorker::~JfjFrameWorker() {
m_udp_receiver.disconnect();
}
inline void JfjFrameWorker::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) {
metadata.pulse_id = c_packet.bunchid;
metadata.frame_index = c_packet.framenum;
metadata.daq_rec = (uint64_t) c_packet.debug;
metadata.module_id = (int64_t) 0;
}
inline uint64_t JfjFrameWorker::process_packets(ModuleFrame& metadata, char* frame_buffer){
inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){
while(!m_buffer.is_empty()){
// Happens if the last packet from the previous frame gets lost.
@@ -32,7 +27,7 @@ inline uint64_t JfjFrameWorker::process_packets(ModuleFrame& metadata, char* fra
m_frame_index = m_buffer.peek_front().framenum;
if(this->in_progress){
this->in_progress = false;
return metadata.pulse_id;
return buffer.meta.pulse_id;
}
}
@@ -41,36 +36,36 @@ inline uint64_t JfjFrameWorker::process_packets(ModuleFrame& metadata, char* fra
m_frame_index = c_packet.framenum;
this->in_progress = true;
// Always copy metadata (otherwise problem when 0th packet gets lost)
this->init_frame(metadata, c_packet);
buffer.meta.pulse_id = c_packet.bunchid;
buffer.meta.frame_index = c_packet.framenum;
buffer.meta.daq_rec = c_packet.debug;
buffer.meta.module_id = m_moduleID;
// Copy data to frame buffer
size_t offset = JFJOCH_DATA_BYTES_PER_PACKET * c_packet.packetnum;
memcpy( (void*) (frame_buffer + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET);
metadata.n_recv_packets++;
memcpy( (void*) (&buffer.data + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET);
buffer.meta.n_recv_packets++;
// Last frame packet received. Frame finished.
if (c_packet.packetnum == m_num_packets - 1){
this->in_progress = false;
return metadata.pulse_id;
return buffer.meta.pulse_id;
}
}
// We emptied the buffer.
// m_buffer.reset();
return 0;
}
uint64_t JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){
uint64_t JfjFrameWorker::get_frame(BufferBinaryFormat& buffer){
// Reset the metadata and frame buffer for the next frame. (really needed?)
metadata.pulse_id = 0;
metadata.n_recv_packets = 0;
memset(frame_buffer, 0, m_num_data_bytes);
memset(&buffer, 0, sizeof(buffer));
// Process leftover packages in the buffer
if (!m_buffer.is_empty()) {
auto pulse_id = process_packets(metadata, frame_buffer);
auto pulse_id = process_packets(buffer);
if (pulse_id != 0) { return pulse_id; }
}
@@ -81,32 +76,40 @@ uint64_t JfjFrameWorker::get_frame_from_udp(ModuleFrame& metadata, char* frame_b
if (m_buffer.is_empty()) { continue; }
// ... and process them
auto pulse_id = process_packets(metadata, frame_buffer);
auto pulse_id = process_packets(buffer);
if (pulse_id != 0) { return pulse_id; }
}
}
void JfjFrameWorker::run(){
std::cout << "Running worker loop" << std::endl;
// Might be better creating a structure for double buffering
ModuleFrame frameMeta;
char* dataBuffer = new char[JFJOCH_DATA_BYTES_PER_MODULE];
BufferBinaryFormat buffer;
uint64_t pulse_id_previous = 0;
uint64_t frame_index_previous = 0;
try{
m_state = "RUNNING";
while (true) {
// NOTE: Needs to be pipelined for really high frame rates
auto pulse_id = get_frame(buffer);
while (true) {
// NOTE: Needs to be pipelined for really high frame rates
auto pulse_id = get_frame_from_udp(frameMeta, dataBuffer);
if(pulse_id>1000){
f_push_callback(pulse_id, m_moduleID, dataBuffer, frameMeta);
if(pulse_id>10){
f_push_callback(pulse_id, m_moduleID, buffer);
}
}
}
} catch (const std::exception& ex) {
std::cout << "Exception in worker loop: " << ex.what() << std::endl;
throw;
};
delete[] dataBuffer;
}
std::string JfjFrameWorker::print() const {
std::string msg = "JungfrauFrameWorker #" + std::to_string(m_moduleID) + "\n"+
"State:\t" + m_state + "\n";
return msg;
}
@@ -119,5 +122,3 @@ void JfjFrameWorker::run(){
+9 -18
View File
@@ -2,7 +2,7 @@
#include <stdexcept>
#include <zmq.h>
#include "formats.hpp"
#include "../../core-buffer/include/formats.hpp"
#include "../include/JfjFrameCache.hpp"
#include "../include/JfjFrameWorker.hpp"
@@ -14,29 +14,20 @@ void dummy_sender(ImageMetadata* meta, std::vector<char>* data){
int main (int argc, char *argv[]) {
std::cout << "Creating frame cache..." << std::endl;
FrameCache cache(32, 3, JFJOCH_DATA_BYTES_PER_MODULE, &dummy_sender);
std::function<void(uint64_t, uint64_t, char*, ModuleFrame&)> push_cb =
std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> push_cb =
std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
std::cout << "Creating workers..." << std::endl;
JfjFrameWorker W0(5005, 0, push_cb);
JfjFrameWorker W1(5006, 1, push_cb);
JfjFrameWorker W2(5007, 2, push_cb);
std::thread T0(&JfjFrameWorker::run, &W0);
std::thread T0(&JfjFrameWorker::run, &W0);
T0.join();
std::cout << "Exiting program..." << std::endl;
return 0;