mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-08 08:42:03 +02:00
Reworked frame start mutex
This commit is contained in:
@@ -31,7 +31,7 @@ public:
|
||||
f_send(callback), m_lock(_C), m_valid(_C) {
|
||||
// Initialize buffer metadata
|
||||
for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); }
|
||||
|
||||
|
||||
// Initialize Mutexes
|
||||
for(auto& it: m_valid){ it = 0; }
|
||||
};
|
||||
@@ -41,25 +41,24 @@ public:
|
||||
|
||||
Place a recorded frame to it's corresponding module location.
|
||||
This simultaneously handles buffering and assembly. **/
|
||||
void emplace(uint64_t pulseID, uint64_t moduleIDX, BufferBinaryFormat& ref_frame){
|
||||
void emplace(uint64_t pulseID, uint64_t moduleIDX, BufferBinaryFormat& inc_frame){
|
||||
uint64_t idx = pulseID % m_CAP;
|
||||
|
||||
// A new frame is starting
|
||||
if(ref_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){
|
||||
start_line(idx, ref_frame.meta);
|
||||
}
|
||||
|
||||
// Shared lock for concurrent PUT operations
|
||||
std::shared_lock<std::shared_mutex> guard(m_lock[idx]);
|
||||
|
||||
// Copy metadata (manually for now...)
|
||||
m_buffer[idx].meta.pulse_id = ref_frame.meta.pulse_id;
|
||||
m_buffer[idx].meta.frame_index = ref_frame.meta.frame_index;
|
||||
m_buffer[idx].meta.daq_rec = ref_frame.meta.daq_rec;
|
||||
|
||||
// A new frame is starting
|
||||
if(inc_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){
|
||||
std::unique_lock<std::shared_mutex> p_guard(m_lock[idx]);
|
||||
// Check if condition persists after getting the mutex
|
||||
if(inc_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){
|
||||
start_line(idx, inc_frame.meta);
|
||||
}
|
||||
}
|
||||
|
||||
// Shared lock for concurrent PUT operations
|
||||
std::shared_lock<std::shared_mutex> s_guard(m_lock[idx]);
|
||||
|
||||
// Calculate destination pointer (easier to debug)
|
||||
char* ptr_dest = m_buffer[idx].data.data() + moduleIDX * m_blocksize;
|
||||
std::memcpy((void*)ptr_dest, (void*)&ref_frame.data, m_blocksize);
|
||||
std::memcpy((void*)ptr_dest, (void*)&inc_frame.data, m_blocksize);
|
||||
}
|
||||
|
||||
void flush_all(){
|
||||
@@ -78,17 +77,16 @@ public:
|
||||
}
|
||||
|
||||
// Flush and start a new line (incl. lock)
|
||||
void start_line(uint64_t idx, ModuleFrame& ref_meta){
|
||||
void start_line(uint64_t idx, ModuleFrame& inc_frame){
|
||||
// 0. Guard
|
||||
std::unique_lock<std::shared_mutex> guard(m_lock[idx]);
|
||||
// 1. Flush
|
||||
if(m_valid[idx]){
|
||||
f_send(m_buffer[idx]);
|
||||
}
|
||||
// 2. Init
|
||||
m_buffer[idx].meta.pulse_id = ref_meta.pulse_id;
|
||||
m_buffer[idx].meta.frame_index = ref_meta.frame_index;
|
||||
m_buffer[idx].meta.daq_rec = ref_meta.daq_rec;
|
||||
m_buffer[idx].meta.pulse_id = inc_frame.pulse_id;
|
||||
m_buffer[idx].meta.frame_index = inc_frame.frame_index;
|
||||
m_buffer[idx].meta.daq_rec = inc_frame.daq_rec;
|
||||
m_buffer[idx].meta.is_good_image = true;
|
||||
m_valid[idx] = 1;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user