mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-04-29 14:32:23 +02:00
Improve ImageAssembler workflow
This commit is contained in:
+27
-17
@@ -1,5 +1,4 @@
|
||||
#include <iostream>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
@@ -7,12 +6,11 @@
|
||||
|
||||
#include "date.h"
|
||||
#include "zmq.h"
|
||||
#include "jungfrau.hpp"
|
||||
#include "buffer_config.hpp"
|
||||
#include "bitshuffle/bitshuffle.h"
|
||||
#include "WriterZmqReceiver.hpp"
|
||||
#include "JFH5Writer.hpp"
|
||||
#include "BufferBinaryReader.hpp"
|
||||
#include "ImageAssembler.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace core_buffer;
|
||||
@@ -21,12 +19,21 @@ using namespace chrono;
|
||||
void read_buffer(
|
||||
const string device,
|
||||
const string channel_name,
|
||||
const vector<uint64_t>& buffer_blocks)
|
||||
const int i_module,
|
||||
const vector<uint64_t>& buffer_blocks,
|
||||
ImageAssembler& image_assembler)
|
||||
{
|
||||
BufferBinaryReader block_reader(device, channel_name);
|
||||
auto block_buffer = new BufferBinaryBlock();
|
||||
|
||||
for (uint64_t block_number:buffer_blocks) {
|
||||
|
||||
int slot_id;
|
||||
while((slot_id = image_assembler.get_free_slot()) == -1) {
|
||||
this_thread::sleep_for(chrono::milliseconds(
|
||||
RB_READ_RETRY_INTERVAL_MS));
|
||||
}
|
||||
|
||||
auto start_time = steady_clock::now();
|
||||
|
||||
block_reader.get_block(block_number, block_buffer);
|
||||
@@ -37,16 +44,15 @@ void read_buffer(
|
||||
|
||||
start_time = steady_clock::now();
|
||||
|
||||
// TODO: Send to composition.
|
||||
image_assembler.process(slot_id, i_module, block_buffer);
|
||||
|
||||
end_time = steady_clock::now();
|
||||
uint64_t compose_us_duration = duration_cast<microseconds>(
|
||||
end_time-start_time).count();
|
||||
|
||||
// TODO: Proper statistics
|
||||
cout << "sf_replay:avg_read_us ";
|
||||
cout << read_us_duration / BUFFER_BLOCK_SIZE << endl;
|
||||
cout << "sf_replay:avg_compose_us ";
|
||||
cout << "sf_replay:avg_assemble_us ";
|
||||
cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl;
|
||||
}
|
||||
|
||||
@@ -79,6 +85,8 @@ int main (int argc, char *argv[])
|
||||
uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE;
|
||||
auto n_blocks = stop_block - start_block + 1;
|
||||
|
||||
ImageAssembler image_assembler;
|
||||
|
||||
// Generate list of buffer blocks that need to be loaded.
|
||||
std::vector<uint64_t> buffer_blocks(n_blocks);
|
||||
for (uint64_t curr_block=start_block;
|
||||
@@ -91,24 +99,26 @@ int main (int argc, char *argv[])
|
||||
for (size_t i_module=0; i_module<n_modules; i_module++) {
|
||||
string channel_name = "M" + to_string(i_module);
|
||||
reading_threads.emplace_back(
|
||||
read_buffer, device, channel_name, ref(buffer_blocks));
|
||||
read_buffer,
|
||||
device,
|
||||
channel_name,
|
||||
i_module,
|
||||
ref(buffer_blocks),
|
||||
ref(image_assembler));
|
||||
}
|
||||
|
||||
size_t n_frames = stop_pulse_id - start_pulse_id + 1;
|
||||
JFH5Writer writer(output_file, n_frames, n_modules);
|
||||
JFH5Writer writer(output_file, start_pulse_id, stop_pulse_id, n_modules);
|
||||
|
||||
auto current_pulse_id = start_pulse_id;
|
||||
// "<= stop_pulse_id" because we include the last pulse_id.
|
||||
while (current_pulse_id <= stop_pulse_id) {
|
||||
for (uint64_t block_number:buffer_blocks) {
|
||||
|
||||
int slot_id;
|
||||
while((slot_id = queue.read()) == -1) {
|
||||
while((slot_id = image_assembler.get_full_slot()) == -1) {
|
||||
this_thread::sleep_for(chrono::milliseconds(
|
||||
RB_READ_RETRY_INTERVAL_MS));
|
||||
}
|
||||
|
||||
auto metadata = queue.get_metadata_buffer(slot_id);
|
||||
auto data = queue.get_data_buffer(slot_id);
|
||||
auto metadata = image_assembler.get_metadata_buffer(slot_id);
|
||||
auto data = image_assembler.get_data_buffer(slot_id);
|
||||
|
||||
auto start_time = steady_clock::now();
|
||||
|
||||
@@ -118,7 +128,7 @@ int main (int argc, char *argv[])
|
||||
auto write_us_duration = chrono::duration_cast<chrono::microseconds>(
|
||||
end_time-start_time).count();
|
||||
|
||||
queue.release();
|
||||
image_assembler.free_slot(slot_id);
|
||||
|
||||
cout << "sf_writer:avg_write_us ";
|
||||
cout << write_us_duration / BUFFER_BLOCK_SIZE << endl;
|
||||
|
||||
Reference in New Issue
Block a user