added docs for ClusterFinderMT

This commit is contained in:
froejdh_e
2025-01-10 10:22:04 +01:00
parent cc95561eda
commit caf7b4ecdb
12 changed files with 309 additions and 105 deletions

View File

@ -5,9 +5,9 @@
#include <thread>
#include <vector>
#include "aare/ClusterFinder.hpp"
#include "aare/NDArray.hpp"
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/ClusterFinder.hpp"
namespace aare {
@ -22,6 +22,14 @@ struct FrameWrapper {
NDArray<uint16_t, 2> data;
};
/**
* @brief ClusterFinderMT is a multi-threaded version of ClusterFinder. It uses
* a producer-consumer queue to distribute the frames to the threads. The
* clusters are collected in a single output queue.
* @tparam FRAME_TYPE type of the frame data
* @tparam PEDESTAL_TYPE type of the pedestal data
* @tparam CT type of the cluster data
*/
template <typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double,
typename CT = int32_t>
class ClusterFinderMT {
@ -43,31 +51,28 @@ class ClusterFinderMT {
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_processing_threads_stopped{true};
/**
* @brief Function called by the processing threads. It reads the frames
* from the input queue and processes them.
*/
void process(int thread_id) {
auto cf = m_cluster_finders[thread_id].get();
auto q = m_input_queues[thread_id].get();
// TODO! Avoid indexing into the vector every time
fmt::print("Thread {} started\n", thread_id);
// TODO! is this check enough to make sure we process all the frames?
bool realloc_same_capacity = true;
while (!m_stop_requested || !q->isEmpty()) {
if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) {
// fmt::print("Thread {} got frame {}, type: {}\n", thread_id,
// frame->frame_number, static_cast<int>(frame->type));
switch (frame->type) {
case FrameType::DATA:
cf->find_clusters(frame->data.view(), frame->frame_number);
m_output_queues[thread_id]->write(cf->steal_clusters());
m_output_queues[thread_id]->write(cf->steal_clusters(realloc_same_capacity));
break;
case FrameType::PEDESTAL:
m_cluster_finders[thread_id]->push_pedestal_frame(
frame->data.view());
break;
default:
break;
}
// frame is processed now discard it
@ -76,7 +81,6 @@ class ClusterFinderMT {
std::this_thread::sleep_for(m_default_wait);
}
}
fmt::print("Thread {} stopped\n", thread_id);
}
/**
@ -101,11 +105,19 @@ class ClusterFinderMT {
}
public:
/**
* @brief Construct a new ClusterFinderMT object
* @param image_size size of the image
* @param cluster_size size of the cluster
* @param nSigma number of sigma above the pedestal to consider a photon
* @param capacity initial capacity of the cluster vector. Should match
* expected number of clusters in a frame per frame.
* @param n_threads number of threads to use
*/
ClusterFinderMT(Shape<2> image_size, Shape<2> cluster_size,
PEDESTAL_TYPE nSigma = 5.0, size_t capacity = 2000,
size_t n_threads = 3)
: m_n_threads(n_threads) {
fmt::print("ClusterFinderMT: using {} threads\n", n_threads);
for (size_t i = 0; i < n_threads; i++) {
m_cluster_finders.push_back(
std::make_unique<ClusterFinder<FRAME_TYPE, PEDESTAL_TYPE, CT>>(
@ -115,39 +127,48 @@ class ClusterFinderMT {
m_input_queues.emplace_back(std::make_unique<InputQueue>(200));
m_output_queues.emplace_back(std::make_unique<OutputQueue>(200));
}
//TODO! Should we start automatically?
start();
}
/**
* @brief Return the sink queue where all the clusters are collected
* @warning You need to empty this queue otherwise the cluster finder will wait forever
*/
ProducerConsumerQueue<ClusterVector<int>> *sink() { return &m_sink; }
/**
* @brief Start all threads
* @brief Start all processing threads
*/
void start() {
m_processing_threads_stopped = false;
m_stop_requested = false;
for (size_t i = 0; i < m_n_threads; i++) {
m_threads.push_back(
std::thread(&ClusterFinderMT::process, this, i));
}
m_processing_threads_stopped = false;
m_collect_thread = std::thread(&ClusterFinderMT::collect, this);
}
/**
* @brief Stop all threads
* @brief Stop all processing threads
*/
void stop() {
m_stop_requested = true;
for (auto &thread : m_threads) {
thread.join();
}
m_threads.clear();
m_processing_threads_stopped = true;
m_collect_thread.join();
}
/**
* @brief Wait for all the queues to be empty
* @brief Wait for all the queues to be empty. Mostly used for timing tests.
*/
void sync() {
for (auto &q : m_input_queues) {
@ -194,24 +215,38 @@ class ClusterFinderMT {
m_current_thread++;
}
auto pedestal() {
/**
* @brief Return the pedestal currently used by the cluster finder
* @param thread_index index of the thread
*/
auto pedestal(size_t thread_index = 0) {
if (m_cluster_finders.empty()) {
throw std::runtime_error("No cluster finders available");
}
if(!m_processing_threads_stopped){
if (!m_processing_threads_stopped) {
throw std::runtime_error("ClusterFinderMT is still running");
}
return m_cluster_finders[0]->pedestal();
if (thread_index >= m_cluster_finders.size()) {
throw std::runtime_error("Thread index out of range");
}
return m_cluster_finders[thread_index]->pedestal();
}
auto noise() {
/**
* @brief Return the noise currently used by the cluster finder
* @param thread_index index of the thread
*/
auto noise(size_t thread_index = 0) {
if (m_cluster_finders.empty()) {
throw std::runtime_error("No cluster finders available");
}
if(!m_processing_threads_stopped){
if (!m_processing_threads_stopped) {
throw std::runtime_error("ClusterFinderMT is still running");
}
return m_cluster_finders[0]->noise();
if (thread_index >= m_cluster_finders.size()) {
throw std::runtime_error("Thread index out of range");
}
return m_cluster_finders[thread_index]->noise();
}
// void push(FrameWrapper&& frame) {

View File

@ -9,20 +9,24 @@
namespace aare {
/**
* @brief ClusterVector is a container for clusters of various sizes. It uses a
* contiguous memory buffer to store the clusters.
* @brief ClusterVector is a container for clusters of various sizes. It uses a
* contiguous memory buffer to store the clusters. It is templated on the data
* type and the coordinate type of the clusters.
* @note push_back can invalidate pointers to elements in the container
* @warning ClusterVector is currently move only to catch unintended copies, but
* this might change since there are probably use cases where copying is needed.
* @tparam T data type of the pixels in the cluster
* @tparam CoordType data type of the x and y coordinates of the cluster (normally int16_t)
* @tparam CoordType data type of the x and y coordinates of the cluster
* (normally int16_t)
*/
template <typename T, typename CoordType=int16_t> class ClusterVector {
template <typename T, typename CoordType = int16_t> class ClusterVector {
using value_type = T;
size_t m_cluster_size_x;
size_t m_cluster_size_y;
std::byte *m_data{};
size_t m_size{0};
size_t m_capacity;
uint64_t m_frame_number{0}; //TODO! Check frame number size and type
uint64_t m_frame_number{0}; // TODO! Check frame number size and type
/*
Format string used in the python bindings to create a numpy
array from the buffer
@ -31,7 +35,7 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
d - double
i - int
*/
constexpr static char m_fmt_base[] = "=h:x:\nh:y:\n({},{}){}:data:" ;
constexpr static char m_fmt_base[] = "=h:x:\nh:y:\n({},{}){}:data:";
public:
/**
@ -39,6 +43,8 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
* @param cluster_size_x size of the cluster in x direction
* @param cluster_size_y size of the cluster in y direction
* @param capacity initial capacity of the buffer in number of clusters
* @param frame_number frame number of the clusters. Default is 0, which is
* also used to indicate that the clusters come from many frames
*/
ClusterVector(size_t cluster_size_x = 3, size_t cluster_size_y = 3,
size_t capacity = 1024, uint64_t frame_number = 0)
@ -46,23 +52,22 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
m_capacity(capacity), m_frame_number(frame_number) {
allocate_buffer(capacity);
}
~ClusterVector() {
delete[] m_data;
}
//Move constructor
~ClusterVector() { delete[] m_data; }
// Move constructor
ClusterVector(ClusterVector &&other) noexcept
: m_cluster_size_x(other.m_cluster_size_x),
m_cluster_size_y(other.m_cluster_size_y), m_data(other.m_data),
m_size(other.m_size), m_capacity(other.m_capacity), m_frame_number(other.m_frame_number) {
m_size(other.m_size), m_capacity(other.m_capacity),
m_frame_number(other.m_frame_number) {
other.m_data = nullptr;
other.m_size = 0;
other.m_capacity = 0;
}
//Move assignment operator
ClusterVector& operator=(ClusterVector &&other) noexcept {
// Move assignment operator
ClusterVector &operator=(ClusterVector &&other) noexcept {
if (this != &other) {
delete[] m_data;
m_cluster_size_x = other.m_cluster_size_x;
@ -82,7 +87,8 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
/**
* @brief Reserve space for at least capacity clusters
* @param capacity number of clusters to reserve space for
* @note If capacity is less than the current capacity, the function does nothing.
* @note If capacity is less than the current capacity, the function does
* nothing.
*/
void reserve(size_t capacity) {
if (capacity > m_capacity) {
@ -95,7 +101,8 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
* @param x x-coordinate of the cluster
* @param y y-coordinate of the cluster
* @param data pointer to the data of the cluster
* @warning The data pointer must point to a buffer of size cluster_size_x * cluster_size_y * sizeof(T)
* @warning The data pointer must point to a buffer of size cluster_size_x *
* cluster_size_y * sizeof(T)
*/
void push_back(CoordType x, CoordType y, const std::byte *data) {
if (m_size == m_capacity) {
@ -111,11 +118,12 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
ptr);
m_size++;
}
ClusterVector& operator+=(const ClusterVector& other){
ClusterVector &operator+=(const ClusterVector &other) {
if (m_size + other.m_size > m_capacity) {
allocate_buffer(m_capacity + other.m_size);
}
std::copy(other.m_data, other.m_data + other.m_size * element_offset(), m_data + m_size * element_offset());
std::copy(other.m_data, other.m_data + other.m_size * item_size(),
m_data + m_size * item_size());
m_size += other.m_size;
return *this;
}
@ -126,7 +134,7 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
*/
std::vector<T> sum() {
std::vector<T> sums(m_size);
const size_t stride = element_offset();
const size_t stride = item_size();
const size_t n_pixels = m_cluster_size_x * m_cluster_size_y;
std::byte *ptr = m_data + 2 * sizeof(CoordType); // skip x and y
@ -139,32 +147,41 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
return sums;
}
size_t size() const { return m_size; }
size_t capacity() const { return m_capacity; }
/**
* @brief Return the offset in bytes for a single cluster
* @brief Return the number of clusters in the vector
*/
size_t element_offset() const {
return 2*sizeof(CoordType) +
m_cluster_size_x * m_cluster_size_y * sizeof(T);
}
size_t size() const { return m_size; }
/**
* @brief Return the capacity of the buffer in number of clusters. This is
* the number of clusters that can be stored in the current buffer without reallocation.
*/
size_t capacity() const { return m_capacity; }
/**
* @brief Return the size in bytes of a single cluster
*/
size_t item_size() const { return element_offset(); }
size_t item_size() const {
return 2 * sizeof(CoordType) +
m_cluster_size_x * m_cluster_size_y * sizeof(T);
}
/**
* @brief Return the offset in bytes for the i-th cluster
*/
size_t element_offset(size_t i) const { return element_offset() * i; }
size_t element_offset(size_t i) const { return item_size() * i; }
/**
* @brief Return a pointer to the i-th cluster
*/
std::byte *element_ptr(size_t i) { return m_data + element_offset(i); }
const std::byte * element_ptr(size_t i) const { return m_data + element_offset(i); }
/**
* @brief Return a pointer to the i-th cluster
*/
const std::byte *element_ptr(size_t i) const {
return m_data + element_offset(i);
}
size_t cluster_size_x() const { return m_cluster_size_x; }
size_t cluster_size_y() const { return m_cluster_size_y; }
@ -172,19 +189,37 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
std::byte *data() { return m_data; }
std::byte const *data() const { return m_data; }
template<typename V>
V& at(size_t i) {
return *reinterpret_cast<V*>(element_ptr(i));
/**
* @brief Return a reference to the i-th cluster casted to type V
* @tparam V type of the cluster
*/
template <typename V> V &at(size_t i) {
return *reinterpret_cast<V *>(element_ptr(i));
}
const std::string_view fmt_base() const {
//TODO! how do we match on coord_t?
// TODO! how do we match on coord_t?
return m_fmt_base;
}
/**
* @brief Return the frame number of the clusters. 0 is used to indicate that
* the clusters come from many frames
*/
uint64_t frame_number() const { return m_frame_number; }
void set_frame_number(uint64_t frame_number) { m_frame_number = frame_number; }
void set_frame_number(uint64_t frame_number) {
m_frame_number = frame_number;
}
/**
* @brief Resize the vector to contain new_size clusters. If new_size is greater than the current capacity, a new buffer is allocated.
* If the size is smaller no memory is freed, size is just updated.
* @param new_size new size of the vector
* @warning The additional clusters are not initialized
*/
void resize(size_t new_size) {
//TODO! Should we initialize the new clusters?
if (new_size > m_capacity) {
allocate_buffer(new_size);
}
@ -193,9 +228,9 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
private:
void allocate_buffer(size_t new_capacity) {
size_t num_bytes = element_offset() * new_capacity;
size_t num_bytes = item_size() * new_capacity;
std::byte *new_data = new std::byte[num_bytes]{};
std::copy(m_data, m_data + element_offset() * m_size, new_data);
std::copy(m_data, m_data + item_size() * m_size, new_data);
delete[] m_data;
m_data = new_data;
m_capacity = new_capacity;