mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-06 12:24:13 +02:00
Conversion to block logic
This commit is contained in:
+16
-16
@@ -4,7 +4,7 @@
|
||||
#include <cstring>
|
||||
|
||||
#include "buffer_config.hpp"
|
||||
#include "ReplayH5Reader.hpp"
|
||||
#include "BufferBinaryReader.hpp"
|
||||
#include "ReplayZmqSender.hpp"
|
||||
|
||||
using namespace std;
|
||||
@@ -19,12 +19,15 @@ void sf_replay (
|
||||
const uint64_t stop_pulse_id
|
||||
)
|
||||
{
|
||||
ReplayH5Reader file_reader(device, channel_name);
|
||||
BufferBinaryReader block_reader(device, channel_name);
|
||||
|
||||
// "<= stop_pulse_id" because we include the stop_pulse_id in the file.
|
||||
for (uint64_t curr_pulse_id=start_pulse_id;
|
||||
curr_pulse_id <= stop_pulse_id;
|
||||
curr_pulse_id++) {
|
||||
uint64_t start_block = start_pulse_id / BUFFER_BLOCK_SIZE;
|
||||
uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE;
|
||||
|
||||
// "<= stop_block" because we include the stop_block in the transfer.
|
||||
for (uint64_t curr_block=start_block;
|
||||
curr_block <= stop_block;
|
||||
curr_block++) {
|
||||
|
||||
int slot_id;
|
||||
while((slot_id = queue.reserve()) == -1) {
|
||||
@@ -35,13 +38,12 @@ void sf_replay (
|
||||
auto start_time = steady_clock::now();
|
||||
|
||||
auto metadata = queue.get_metadata_buffer(slot_id);
|
||||
auto buffer = queue.get_data_buffer(slot_id);
|
||||
|
||||
file_reader.get_buffer(curr_pulse_id, metadata, buffer);
|
||||
block_reader.get_block(curr_block, metadata);
|
||||
|
||||
auto end_time = steady_clock::now();
|
||||
uint64_t read_us_duration =
|
||||
duration_cast<microseconds>(end_time-start_time).count();
|
||||
uint64_t read_us_duration = duration_cast<microseconds>(
|
||||
end_time-start_time).count();
|
||||
|
||||
queue.commit();
|
||||
|
||||
@@ -75,9 +77,8 @@ int main (int argc, char *argv[]) {
|
||||
const auto start_pulse_id = (uint64_t) atoll(argv[5]);
|
||||
const auto stop_pulse_id = (uint64_t) atoll(argv[6]);
|
||||
|
||||
FastQueue<BufferBinaryBlock> queue(
|
||||
MODULE_N_BYTES * BUFFER_BLOCK_SIZE,
|
||||
REPLAY_FASTQUEUE_N_SLOTS);
|
||||
// 0 bytes for data since everything is in the header.
|
||||
FastQueue<BufferBinaryBlock> queue(0, REPLAY_FASTQUEUE_N_SLOTS);
|
||||
|
||||
thread file_read_thread(sf_replay,
|
||||
device, channel_name, ref(queue),
|
||||
@@ -103,12 +104,11 @@ int main (int argc, char *argv[]) {
|
||||
RB_READ_RETRY_INTERVAL_MS));
|
||||
}
|
||||
|
||||
auto m_buffer = queue.get_metadata_buffer(slot_id);
|
||||
auto f_buffer = queue.get_data_buffer(slot_id);
|
||||
auto block_buffer = queue.get_metadata_buffer(slot_id);
|
||||
|
||||
auto start_time = steady_clock::now();
|
||||
|
||||
sender.send(m_buffer, f_buffer);
|
||||
sender.send(block_buffer);
|
||||
|
||||
auto end_time = steady_clock::now();
|
||||
uint64_t send_us_duration =
|
||||
|
||||
Reference in New Issue
Block a user