Multi threaded cluster finder (#117)

This commit is contained in:
Erik Fröjdh 2025-01-14 21:36:25 +01:00 committed by GitHub
commit e1cc774d6c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 443 additions and 198 deletions

View File

@ -12,28 +12,7 @@ set(SPHINX_BUILD ${CMAKE_CURRENT_BINARY_DIR})
file(GLOB SPHINX_SOURCE_FILES CONFIGURE_DEPENDS "src/*.rst")
# set(SPHINX_SOURCE_FILES
# src/index.rst
# src/Installation.rst
# src/Requirements.rst
# src/NDArray.rst
# src/NDView.rst
# src/File.rst
# src/Frame.rst
# src/Dtype.rst
# src/ClusterFinder.rst
# src/ClusterFile.rst
# src/Pedestal.rst
# src/RawFile.rst
# src/RawSubFile.rst
# src/RawMasterFile.rst
# src/VarClusterFinder.rst
# src/pyVarClusterFinder.rst
# src/pyFile.rst
# src/pyCtbRawFile.rst
# src/pyRawFile.rst
# src/pyRawMasterFile.rst
# )
foreach(filename ${SPHINX_SOURCE_FILES})

View File

@ -0,0 +1,7 @@
ClusterFinderMT
==================
.. doxygenclass:: aare::ClusterFinderMT
:members:
:undoc-members:

View File

@ -30,6 +30,7 @@ AARE
pyFile
pyCtbRawFile
pyClusterFile
pyClusterVector
pyRawFile
pyRawMasterFile
pyVarClusterFinder
@ -45,6 +46,7 @@ AARE
File
Dtype
ClusterFinder
ClusterFinderMT
ClusterFile
ClusterVector
Pedestal

View File

@ -0,0 +1,33 @@
ClusterVector
================
The ClusterVector, holds clusters from the ClusterFinder. Since it is templated
in C++ we use a suffix indicating the data type in python. The suffix is
``_i`` for integer, ``_f`` for float, and ``_d`` for double.
At the moment the functionality from python is limited and it is not supported
to push_back clusters to the vector. The intended use case is to pass it to
C++ functions that support the ClusterVector or to view it as a numpy array.
**View ClusterVector as numpy array**
.. code:: python
from aare import ClusterFile
with ClusterFile("path/to/file") as f:
cluster_vector = f.read_frame()
# Create a copy of the cluster data in a numpy array
clusters = np.array(cluster_vector)
# Avoid copying the data by passing copy=False
clusters = np.array(cluster_vector, copy = False)
.. py:currentmodule:: aare
.. autoclass:: ClusterVector_i
:members:
:undoc-members:
:show-inheritance:
:inherited-members:

View File

@ -33,6 +33,12 @@ typedef enum {
pTopRight = 8
} pixel;
struct Eta2 {
double x;
double y;
corner c;
};
struct ClusterAnalysis {
uint32_t c;
int32_t tot;
@ -74,7 +80,8 @@ int analyze_data(int32_t *data, int32_t *t2, int32_t *t3, char *quad,
int analyze_cluster(Cluster3x3& cl, int32_t *t2, int32_t *t3, char *quad,
double *eta2x, double *eta2y, double *eta3x, double *eta3y);
NDArray<double, 2> calculate_eta2( ClusterVector<int>& clusters);
std::array<double,2> calculate_eta2( Cluster3x3& cl);
Eta2 calculate_eta2( Cluster3x3& cl);
} // namespace aare

View File

@ -47,6 +47,7 @@ class ClusterFinder {
NDArray<PEDESTAL_TYPE, 2> pedestal() { return m_pedestal.mean(); }
NDArray<PEDESTAL_TYPE, 2> noise() { return m_pedestal.std(); }
void clear_pedestal() { m_pedestal.clear(); }
/**
* @brief Move the clusters from the ClusterVector in the ClusterFinder to a

View File

@ -5,9 +5,9 @@
#include <thread>
#include <vector>
#include "aare/ClusterFinder.hpp"
#include "aare/NDArray.hpp"
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/ClusterFinder.hpp"
namespace aare {
@ -22,6 +22,14 @@ struct FrameWrapper {
NDArray<uint16_t, 2> data;
};
/**
* @brief ClusterFinderMT is a multi-threaded version of ClusterFinder. It uses
* a producer-consumer queue to distribute the frames to the threads. The
* clusters are collected in a single output queue.
* @tparam FRAME_TYPE type of the frame data
* @tparam PEDESTAL_TYPE type of the pedestal data
* @tparam CT type of the cluster data
*/
template <typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double,
typename CT = int32_t>
class ClusterFinderMT {
@ -43,31 +51,28 @@ class ClusterFinderMT {
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_processing_threads_stopped{true};
/**
* @brief Function called by the processing threads. It reads the frames
* from the input queue and processes them.
*/
void process(int thread_id) {
auto cf = m_cluster_finders[thread_id].get();
auto q = m_input_queues[thread_id].get();
// TODO! Avoid indexing into the vector every time
fmt::print("Thread {} started\n", thread_id);
// TODO! is this check enough to make sure we process all the frames?
bool realloc_same_capacity = true;
while (!m_stop_requested || !q->isEmpty()) {
if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) {
// fmt::print("Thread {} got frame {}, type: {}\n", thread_id,
// frame->frame_number, static_cast<int>(frame->type));
switch (frame->type) {
case FrameType::DATA:
cf->find_clusters(frame->data.view(), frame->frame_number);
m_output_queues[thread_id]->write(cf->steal_clusters());
m_output_queues[thread_id]->write(cf->steal_clusters(realloc_same_capacity));
break;
case FrameType::PEDESTAL:
m_cluster_finders[thread_id]->push_pedestal_frame(
frame->data.view());
break;
default:
break;
}
// frame is processed now discard it
@ -76,7 +81,6 @@ class ClusterFinderMT {
std::this_thread::sleep_for(m_default_wait);
}
}
fmt::print("Thread {} stopped\n", thread_id);
}
/**
@ -101,11 +105,19 @@ class ClusterFinderMT {
}
public:
/**
* @brief Construct a new ClusterFinderMT object
* @param image_size size of the image
* @param cluster_size size of the cluster
* @param nSigma number of sigma above the pedestal to consider a photon
* @param capacity initial capacity of the cluster vector. Should match
* expected number of clusters in a frame per frame.
* @param n_threads number of threads to use
*/
ClusterFinderMT(Shape<2> image_size, Shape<2> cluster_size,
PEDESTAL_TYPE nSigma = 5.0, size_t capacity = 2000,
size_t n_threads = 3)
: m_n_threads(n_threads) {
fmt::print("ClusterFinderMT: using {} threads\n", n_threads);
for (size_t i = 0; i < n_threads; i++) {
m_cluster_finders.push_back(
std::make_unique<ClusterFinder<FRAME_TYPE, PEDESTAL_TYPE, CT>>(
@ -115,39 +127,48 @@ class ClusterFinderMT {
m_input_queues.emplace_back(std::make_unique<InputQueue>(200));
m_output_queues.emplace_back(std::make_unique<OutputQueue>(200));
}
//TODO! Should we start automatically?
start();
}
/**
* @brief Return the sink queue where all the clusters are collected
* @warning You need to empty this queue otherwise the cluster finder will wait forever
*/
ProducerConsumerQueue<ClusterVector<int>> *sink() { return &m_sink; }
/**
* @brief Start all threads
* @brief Start all processing threads
*/
void start() {
m_processing_threads_stopped = false;
m_stop_requested = false;
for (size_t i = 0; i < m_n_threads; i++) {
m_threads.push_back(
std::thread(&ClusterFinderMT::process, this, i));
}
m_processing_threads_stopped = false;
m_collect_thread = std::thread(&ClusterFinderMT::collect, this);
}
/**
* @brief Stop all threads
* @brief Stop all processing threads
*/
void stop() {
m_stop_requested = true;
for (auto &thread : m_threads) {
thread.join();
}
m_threads.clear();
m_processing_threads_stopped = true;
m_collect_thread.join();
}
/**
* @brief Wait for all the queues to be empty
* @brief Wait for all the queues to be empty. Mostly used for timing tests.
*/
void sync() {
for (auto &q : m_input_queues) {
@ -194,24 +215,47 @@ class ClusterFinderMT {
m_current_thread++;
}
auto pedestal() {
if (m_cluster_finders.empty()) {
throw std::runtime_error("No cluster finders available");
}
if(!m_processing_threads_stopped){
void clear_pedestal() {
if (!m_processing_threads_stopped) {
throw std::runtime_error("ClusterFinderMT is still running");
}
return m_cluster_finders[0]->pedestal();
for (auto &cf : m_cluster_finders) {
cf->clear_pedestal();
}
}
auto noise() {
/**
* @brief Return the pedestal currently used by the cluster finder
* @param thread_index index of the thread
*/
auto pedestal(size_t thread_index = 0) {
if (m_cluster_finders.empty()) {
throw std::runtime_error("No cluster finders available");
}
if(!m_processing_threads_stopped){
if (!m_processing_threads_stopped) {
throw std::runtime_error("ClusterFinderMT is still running");
}
return m_cluster_finders[0]->noise();
if (thread_index >= m_cluster_finders.size()) {
throw std::runtime_error("Thread index out of range");
}
return m_cluster_finders[thread_index]->pedestal();
}
/**
* @brief Return the noise currently used by the cluster finder
* @param thread_index index of the thread
*/
auto noise(size_t thread_index = 0) {
if (m_cluster_finders.empty()) {
throw std::runtime_error("No cluster finders available");
}
if (!m_processing_threads_stopped) {
throw std::runtime_error("ClusterFinderMT is still running");
}
if (thread_index >= m_cluster_finders.size()) {
throw std::runtime_error("Thread index out of range");
}
return m_cluster_finders[thread_index]->noise();
}
// void push(FrameWrapper&& frame) {

View File

@ -9,20 +9,24 @@
namespace aare {
/**
* @brief ClusterVector is a container for clusters of various sizes. It uses a
* contiguous memory buffer to store the clusters.
* @brief ClusterVector is a container for clusters of various sizes. It uses a
* contiguous memory buffer to store the clusters. It is templated on the data
* type and the coordinate type of the clusters.
* @note push_back can invalidate pointers to elements in the container
* @warning ClusterVector is currently move only to catch unintended copies, but
* this might change since there are probably use cases where copying is needed.
* @tparam T data type of the pixels in the cluster
* @tparam CoordType data type of the x and y coordinates of the cluster (normally int16_t)
* @tparam CoordType data type of the x and y coordinates of the cluster
* (normally int16_t)
*/
template <typename T, typename CoordType=int16_t> class ClusterVector {
template <typename T, typename CoordType = int16_t> class ClusterVector {
using value_type = T;
size_t m_cluster_size_x;
size_t m_cluster_size_y;
std::byte *m_data{};
size_t m_size{0};
size_t m_capacity;
uint64_t m_frame_number{0}; //TODO! Check frame number size and type
uint64_t m_frame_number{0}; // TODO! Check frame number size and type
/*
Format string used in the python bindings to create a numpy
array from the buffer
@ -31,7 +35,7 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
d - double
i - int
*/
constexpr static char m_fmt_base[] = "=h:x:\nh:y:\n({},{}){}:data:" ;
constexpr static char m_fmt_base[] = "=h:x:\nh:y:\n({},{}){}:data:";
public:
/**
@ -39,6 +43,8 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
* @param cluster_size_x size of the cluster in x direction
* @param cluster_size_y size of the cluster in y direction
* @param capacity initial capacity of the buffer in number of clusters
* @param frame_number frame number of the clusters. Default is 0, which is
* also used to indicate that the clusters come from many frames
*/
ClusterVector(size_t cluster_size_x = 3, size_t cluster_size_y = 3,
size_t capacity = 1024, uint64_t frame_number = 0)
@ -46,23 +52,22 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
m_capacity(capacity), m_frame_number(frame_number) {
allocate_buffer(capacity);
}
~ClusterVector() {
delete[] m_data;
}
//Move constructor
~ClusterVector() { delete[] m_data; }
// Move constructor
ClusterVector(ClusterVector &&other) noexcept
: m_cluster_size_x(other.m_cluster_size_x),
m_cluster_size_y(other.m_cluster_size_y), m_data(other.m_data),
m_size(other.m_size), m_capacity(other.m_capacity), m_frame_number(other.m_frame_number) {
m_size(other.m_size), m_capacity(other.m_capacity),
m_frame_number(other.m_frame_number) {
other.m_data = nullptr;
other.m_size = 0;
other.m_capacity = 0;
}
//Move assignment operator
ClusterVector& operator=(ClusterVector &&other) noexcept {
// Move assignment operator
ClusterVector &operator=(ClusterVector &&other) noexcept {
if (this != &other) {
delete[] m_data;
m_cluster_size_x = other.m_cluster_size_x;
@ -82,7 +87,8 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
/**
* @brief Reserve space for at least capacity clusters
* @param capacity number of clusters to reserve space for
* @note If capacity is less than the current capacity, the function does nothing.
* @note If capacity is less than the current capacity, the function does
* nothing.
*/
void reserve(size_t capacity) {
if (capacity > m_capacity) {
@ -95,7 +101,8 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
* @param x x-coordinate of the cluster
* @param y y-coordinate of the cluster
* @param data pointer to the data of the cluster
* @warning The data pointer must point to a buffer of size cluster_size_x * cluster_size_y * sizeof(T)
* @warning The data pointer must point to a buffer of size cluster_size_x *
* cluster_size_y * sizeof(T)
*/
void push_back(CoordType x, CoordType y, const std::byte *data) {
if (m_size == m_capacity) {
@ -111,11 +118,12 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
ptr);
m_size++;
}
ClusterVector& operator+=(const ClusterVector& other){
ClusterVector &operator+=(const ClusterVector &other) {
if (m_size + other.m_size > m_capacity) {
allocate_buffer(m_capacity + other.m_size);
}
std::copy(other.m_data, other.m_data + other.m_size * element_offset(), m_data + m_size * element_offset());
std::copy(other.m_data, other.m_data + other.m_size * item_size(),
m_data + m_size * item_size());
m_size += other.m_size;
return *this;
}
@ -126,7 +134,7 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
*/
std::vector<T> sum() {
std::vector<T> sums(m_size);
const size_t stride = element_offset();
const size_t stride = item_size();
const size_t n_pixels = m_cluster_size_x * m_cluster_size_y;
std::byte *ptr = m_data + 2 * sizeof(CoordType); // skip x and y
@ -139,32 +147,41 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
return sums;
}
size_t size() const { return m_size; }
size_t capacity() const { return m_capacity; }
/**
* @brief Return the offset in bytes for a single cluster
* @brief Return the number of clusters in the vector
*/
size_t element_offset() const {
return 2*sizeof(CoordType) +
m_cluster_size_x * m_cluster_size_y * sizeof(T);
}
size_t size() const { return m_size; }
/**
* @brief Return the capacity of the buffer in number of clusters. This is
* the number of clusters that can be stored in the current buffer without reallocation.
*/
size_t capacity() const { return m_capacity; }
/**
* @brief Return the size in bytes of a single cluster
*/
size_t item_size() const { return element_offset(); }
size_t item_size() const {
return 2 * sizeof(CoordType) +
m_cluster_size_x * m_cluster_size_y * sizeof(T);
}
/**
* @brief Return the offset in bytes for the i-th cluster
*/
size_t element_offset(size_t i) const { return element_offset() * i; }
size_t element_offset(size_t i) const { return item_size() * i; }
/**
* @brief Return a pointer to the i-th cluster
*/
std::byte *element_ptr(size_t i) { return m_data + element_offset(i); }
const std::byte * element_ptr(size_t i) const { return m_data + element_offset(i); }
/**
* @brief Return a pointer to the i-th cluster
*/
const std::byte *element_ptr(size_t i) const {
return m_data + element_offset(i);
}
size_t cluster_size_x() const { return m_cluster_size_x; }
size_t cluster_size_y() const { return m_cluster_size_y; }
@ -172,19 +189,37 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
std::byte *data() { return m_data; }
std::byte const *data() const { return m_data; }
template<typename V>
V& at(size_t i) {
return *reinterpret_cast<V*>(element_ptr(i));
/**
* @brief Return a reference to the i-th cluster casted to type V
* @tparam V type of the cluster
*/
template <typename V> V &at(size_t i) {
return *reinterpret_cast<V *>(element_ptr(i));
}
const std::string_view fmt_base() const {
//TODO! how do we match on coord_t?
// TODO! how do we match on coord_t?
return m_fmt_base;
}
/**
* @brief Return the frame number of the clusters. 0 is used to indicate that
* the clusters come from many frames
*/
uint64_t frame_number() const { return m_frame_number; }
void set_frame_number(uint64_t frame_number) { m_frame_number = frame_number; }
void set_frame_number(uint64_t frame_number) {
m_frame_number = frame_number;
}
/**
* @brief Resize the vector to contain new_size clusters. If new_size is greater than the current capacity, a new buffer is allocated.
* If the size is smaller no memory is freed, size is just updated.
* @param new_size new size of the vector
* @warning The additional clusters are not initialized
*/
void resize(size_t new_size) {
//TODO! Should we initialize the new clusters?
if (new_size > m_capacity) {
allocate_buffer(new_size);
}
@ -193,9 +228,9 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
private:
void allocate_buffer(size_t new_capacity) {
size_t num_bytes = element_offset() * new_capacity;
size_t num_bytes = item_size() * new_capacity;
std::byte *new_data = new std::byte[num_bytes]{};
std::copy(m_data, m_data + element_offset() * m_size, new_data);
std::copy(m_data, m_data + item_size() * m_size, new_data);
delete[] m_data;
m_data = new_data;
m_capacity = new_capacity;

View File

@ -89,6 +89,7 @@ template <typename SUM_TYPE = double> class Pedestal {
m_sum = 0;
m_sum2 = 0;
m_cur_samples = 0;
m_mean = 0;
}
@ -97,6 +98,7 @@ template <typename SUM_TYPE = double> class Pedestal {
m_sum(row, col) = 0;
m_sum2(row, col) = 0;
m_cur_samples(row, col) = 0;
m_mean(row, col) = 0;
}
@ -119,7 +121,7 @@ template <typename SUM_TYPE = double> class Pedestal {
/**
* Push but don't update the cached mean. Speeds up the process
* when intitializing the pedestal.
* when initializing the pedestal.
*
*/
template <typename T> void push_no_update(NDView<T, 2> frame) {

View File

@ -8,7 +8,7 @@ from ._aare import DetectorType
from ._aare import ClusterFile
from ._aare import hitmap
from ._aare import ClusterFinderMT, ClusterCollector, ClusterFileSink
from ._aare import ClusterFinderMT, ClusterCollector, ClusterFileSink, ClusterVector_i
from .CtbRawFile import CtbRawFile
from .RawFile import RawFile

View File

@ -14,76 +14,103 @@ base = Path('/mnt/sls_det_storage/matterhorn_data/aare_test_data/')
f = File(base/'Moench03new/cu_half_speed_master_4.json')
for i, frame in enumerate(f):
print(f'{i}', end='\r')
print()
from aare._aare import ClusterFinderMT, ClusterCollector, ClusterFileSink
cf = ClusterFinderMT((400,400), (3,3), n_threads = 3)
# collector = ClusterCollector(cf)
out_file = ClusterFileSink(cf, "test.clust")
# cf = ClusterFinderMT((400,400), (3,3), n_threads = 3)
# # collector = ClusterCollector(cf)
# out_file = ClusterFileSink(cf, "test.clust")
for i in range(1000):
img = f.read_frame()
cf.push_pedestal_frame(img)
print('Pedestal done')
cf.sync()
for i in range(100):
img = f.read_frame()
cf.find_clusters(img)
# time.sleep(1)
cf.stop()
out_file.stop()
print('Done')
cfile = ClusterFile("test.clust")
# cf = ClusterFinder((400,400), (3,3))
# for i in range(1000):
# cf.push_pedestal_frame(f.read_frame())
# img = f.read_frame()
# cf.push_pedestal_frame(img)
# print('Pedestal done')
# cf.sync()
# fig, ax = plt.subplots()
# im = ax.imshow(cf.pedestal())
# cf.pedestal()
# cf.noise()
# for i in range(100):
# img = f.read_frame()
# cf.find_clusters(img)
# # time.sleep(1)
# cf.stop()
# time.sleep(1)
# print('Second run')
# cf.start()
# for i in range(100):
# img = f.read_frame()
# cf.find_clusters(img)
# cf.stop()
# print('Third run')
# cf.start()
# for i in range(129):
# img = f.read_frame()
# cf.find_clusters(img)
# cf.stop()
# out_file.stop()
# print('Done')
# cfile = ClusterFile("test.clust")
# i = 0
# while True:
# try:
# cv = cfile.read_frame()
# i+=1
# except RuntimeError:
# break
# print(f'Read {i} frames')
# N = 500
# t0 = time.perf_counter()
# hist1 = bh.Histogram(bh.axis.Regular(40, -2, 4000))
# f.seek(0)
# t0 = time.perf_counter()
# data = f.read_n(N)
# t_elapsed = time.perf_counter()-t0
# # cf = ClusterFinder((400,400), (3,3))
# # for i in range(1000):
# # cf.push_pedestal_frame(f.read_frame())
# # fig, ax = plt.subplots()
# # im = ax.imshow(cf.pedestal())
# # cf.pedestal()
# # cf.noise()
# n_bytes = data.itemsize*data.size
# print(f'Reading {N} frames took {t_elapsed:.3f}s {N/t_elapsed:.0f} FPS, {n_bytes/1024**2:.4f} GB/s')
# # N = 500
# # t0 = time.perf_counter()
# # hist1 = bh.Histogram(bh.axis.Regular(40, -2, 4000))
# # f.seek(0)
# # t0 = time.perf_counter()
# # data = f.read_n(N)
# # t_elapsed = time.perf_counter()-t0
# for frame in data:
# a = cf.find_clusters(frame)
# # n_bytes = data.itemsize*data.size
# clusters = cf.steal_clusters()
# t_elapsed = time.perf_counter()-t0
# print(f'Clustering {N} frames took {t_elapsed:.2f}s {N/t_elapsed:.0f} FPS')
# # print(f'Reading {N} frames took {t_elapsed:.3f}s {N/t_elapsed:.0f} FPS, {n_bytes/1024**2:.4f} GB/s')
# t0 = time.perf_counter()
# total_clusters = clusters.size
# # for frame in data:
# # a = cf.find_clusters(frame)
# hist1.fill(clusters.sum())
# # clusters = cf.steal_clusters()
# t_elapsed = time.perf_counter()-t0
# print(f'Filling histogram with the sum of {total_clusters} clusters took: {t_elapsed:.3f}s, {total_clusters/t_elapsed:.3g} clust/s')
# print(f'Average number of clusters per frame {total_clusters/N:.3f}')
# # t_elapsed = time.perf_counter()-t0
# # print(f'Clustering {N} frames took {t_elapsed:.2f}s {N/t_elapsed:.0f} FPS')
# # t0 = time.perf_counter()
# # total_clusters = clusters.size
# # hist1.fill(clusters.sum())
# # t_elapsed = time.perf_counter()-t0
# # print(f'Filling histogram with the sum of {total_clusters} clusters took: {t_elapsed:.3f}s, {total_clusters/t_elapsed:.3g} clust/s')
# # print(f'Average number of clusters per frame {total_clusters/N:.3f}')

View File

@ -22,8 +22,7 @@ void define_cluster_vector(py::module &m, const std::string &typestr) {
py::class_<ClusterVector<T>>(m, class_name.c_str(), py::buffer_protocol())
.def(py::init<int, int>())
.def_property_readonly("size", &ClusterVector<T>::size)
.def("element_offset",
py::overload_cast<>(&ClusterVector<T>::element_offset, py::const_))
.def("item_size", &ClusterVector<T>::item_size)
.def_property_readonly("fmt",
[typestr](ClusterVector<T> &self) {
return fmt::format(
@ -40,14 +39,14 @@ void define_cluster_vector(py::module &m, const std::string &typestr) {
&ClusterVector<T>::set_frame_number)
.def_buffer([typestr](ClusterVector<T> &self) -> py::buffer_info {
return py::buffer_info(
self.data(), /* Pointer to buffer */
self.element_offset(), /* Size of one scalar */
self.data(), /* Pointer to buffer */
self.item_size(), /* Size of one scalar */
fmt::format(self.fmt_base(), self.cluster_size_x(),
self.cluster_size_y(),
typestr), /* Format descriptor */
1, /* Number of dimensions */
{self.size()}, /* Buffer dimensions */
{self.element_offset()} /* Strides (in bytes) for each index */
typestr), /* Format descriptor */
1, /* Number of dimensions */
{self.size()}, /* Buffer dimensions */
{self.item_size()} /* Strides (in bytes) for each index */
);
});
}
@ -56,7 +55,7 @@ void define_cluster_finder_mt_bindings(py::module &m) {
py::class_<ClusterFinderMT<uint16_t, pd_type>>(m, "ClusterFinderMT")
.def(py::init<Shape<2>, Shape<2>, pd_type, size_t, size_t>(),
py::arg("image_size"), py::arg("cluster_size"),
py::arg("n_sigma") = 5.0, py::arg("capacity") = 1000,
py::arg("n_sigma") = 5.0, py::arg("capacity") = 2048,
py::arg("n_threads") = 3)
.def("push_pedestal_frame",
[](ClusterFinderMT<uint16_t, pd_type> &self,
@ -73,20 +72,22 @@ void define_cluster_finder_mt_bindings(py::module &m) {
return;
},
py::arg(), py::arg("frame_number") = 0)
.def("clear_pedestal", &ClusterFinderMT<uint16_t, pd_type>::clear_pedestal)
.def("sync", &ClusterFinderMT<uint16_t, pd_type>::sync)
.def("stop", &ClusterFinderMT<uint16_t, pd_type>::stop)
.def_property_readonly("pedestal",
[](ClusterFinderMT<uint16_t, pd_type> &self) {
auto pd = new NDArray<pd_type, 2>{};
*pd = self.pedestal();
return return_image_data(pd);
})
.def_property_readonly("noise",
[](ClusterFinderMT<uint16_t, pd_type> &self) {
auto arr = new NDArray<pd_type, 2>{};
*arr = self.noise();
return return_image_data(arr);
});
.def("start", &ClusterFinderMT<uint16_t, pd_type>::start)
.def("pedestal",
[](ClusterFinderMT<uint16_t, pd_type> &self, size_t thread_index) {
auto pd = new NDArray<pd_type, 2>{};
*pd = self.pedestal(thread_index);
return return_image_data(pd);
},py::arg("thread_index") = 0)
.def("noise",
[](ClusterFinderMT<uint16_t, pd_type> &self, size_t thread_index) {
auto arr = new NDArray<pd_type, 2>{};
*arr = self.noise(thread_index);
return return_image_data(arr);
},py::arg("thread_index") = 0);
}
void define_cluster_collector_bindings(py::module &m) {
@ -121,6 +122,7 @@ void define_cluster_finder_bindings(py::module &m) {
auto view = make_view_2d(frame);
self.push_pedestal_frame(view);
})
.def("clear_pedestal", &ClusterFinder<uint16_t, pd_type>::clear_pedestal)
.def_property_readonly("pedestal",
[](ClusterFinder<uint16_t, pd_type> &self) {
auto pd = new NDArray<pd_type, 2>{};
@ -162,7 +164,7 @@ void define_cluster_finder_bindings(py::module &m) {
for (py::ssize_t j = 0; j < r.shape(1); j++)
r(i, j) = 0;
size_t stride = cv.element_offset();
size_t stride = cv.item_size();
auto ptr = cv.data();
for (size_t i = 0; i < cv.size(); i++) {
auto x = *reinterpret_cast<int16_t *>(ptr);

View File

@ -30,7 +30,7 @@ void define_cluster_file_io_bindings(py::module &m) {
[](ClusterFile &self, size_t n_clusters) {
auto v = new ClusterVector<int32_t>(self.read_clusters(n_clusters));
return v;
})
},py::return_value_policy::take_ownership)
.def("read_frame",
[](ClusterFile &self) {
auto v = new ClusterVector<int32_t>(self.read_frame());

View File

@ -46,7 +46,7 @@ void ClusterFile::write_frame(const ClusterVector<int32_t> &clusters) {
fwrite(&frame_number, sizeof(frame_number), 1, fp);
uint32_t n_clusters = clusters.size();
fwrite(&n_clusters, sizeof(n_clusters), 1, fp);
fwrite(clusters.data(), clusters.element_offset(), clusters.size(), fp);
fwrite(clusters.data(), clusters.item_size(), clusters.size(), fp);
}
ClusterVector<int32_t> ClusterFile::read_clusters(size_t n_clusters) {
@ -72,7 +72,7 @@ ClusterVector<int32_t> ClusterFile::read_clusters(size_t n_clusters) {
} else {
nn = nph;
}
nph_read += fread(reinterpret_cast<void *>(buf + nph_read),
nph_read += fread((buf + nph_read*clusters.item_size()),
clusters.item_size(), nn, fp);
m_num_left = nph - nn; // write back the number of photons left
}
@ -87,7 +87,7 @@ ClusterVector<int32_t> ClusterFile::read_clusters(size_t n_clusters) {
else
nn = nph;
nph_read += fread(reinterpret_cast<void *>(buf + nph_read),
nph_read += fread((buf + nph_read*clusters.item_size()),
clusters.item_size(), nn, fp);
m_num_left = nph - nn;
}
@ -262,19 +262,19 @@ std::vector<Cluster3x3> ClusterFile::read_cluster_with_cut(size_t n_clusters,
NDArray<double, 2> calculate_eta2(ClusterVector<int> &clusters) {
NDArray<double, 2> eta2({static_cast<int64_t>(clusters.size()), 2});
for (size_t i = 0; i < clusters.size(); i++) {
// int32_t t2;
// auto* ptr = reinterpret_cast<int32_t*> (clusters.element_ptr(i) + 2 *
// sizeof(int16_t)); analyze_cluster(clusters.at<Cluster3x3>(i), &t2,
// nullptr, nullptr, &eta2(i,0), &eta2(i,1) , nullptr, nullptr);
auto [x, y] = calculate_eta2(clusters.at<Cluster3x3>(i));
eta2(i, 0) = x;
eta2(i, 1) = y;
auto e = calculate_eta2(clusters.at<Cluster3x3>(i));
eta2(i, 0) = e.x;
eta2(i, 1) = e.y;
}
return eta2;
}
std::array<double, 2> calculate_eta2(Cluster3x3 &cl) {
std::array<double, 2> eta2{};
/**
* @brief Calculate the eta2 values for a 3x3 cluster and return them in a Eta2 struct
* containing etay, etax and the corner of the cluster.
*/
Eta2 calculate_eta2(Cluster3x3 &cl) {
Eta2 eta{};
std::array<int32_t, 4> tot2;
tot2[0] = cl.data[0] + cl.data[1] + cl.data[3] + cl.data[4];
@ -287,39 +287,43 @@ std::array<double, 2> calculate_eta2(Cluster3x3 &cl) {
switch (c) {
case cBottomLeft:
if ((cl.data[3] + cl.data[4]) != 0)
eta2[0] =
eta.x =
static_cast<double>(cl.data[4]) / (cl.data[3] + cl.data[4]);
if ((cl.data[1] + cl.data[4]) != 0)
eta2[1] =
eta.y =
static_cast<double>(cl.data[4]) / (cl.data[1] + cl.data[4]);
eta.c = cBottomLeft;
break;
case cBottomRight:
if ((cl.data[2] + cl.data[5]) != 0)
eta2[0] =
eta.x =
static_cast<double>(cl.data[5]) / (cl.data[4] + cl.data[5]);
if ((cl.data[1] + cl.data[4]) != 0)
eta2[1] =
eta.y =
static_cast<double>(cl.data[4]) / (cl.data[1] + cl.data[4]);
eta.c = cBottomRight;
break;
case cTopLeft:
if ((cl.data[7] + cl.data[4]) != 0)
eta2[0] =
eta.x =
static_cast<double>(cl.data[4]) / (cl.data[3] + cl.data[4]);
if ((cl.data[7] + cl.data[4]) != 0)
eta2[1] =
eta.y =
static_cast<double>(cl.data[7]) / (cl.data[7] + cl.data[4]);
eta.c = cTopLeft;
break;
case cTopRight:
if ((cl.data[5] + cl.data[4]) != 0)
eta2[0] =
eta.x =
static_cast<double>(cl.data[5]) / (cl.data[5] + cl.data[4]);
if ((cl.data[7] + cl.data[4]) != 0)
eta2[1] =
eta.y =
static_cast<double>(cl.data[7]) / (cl.data[7] + cl.data[4]);
eta.c = cTopRight;
break;
// default:;
// no default to allow compiler to warn about missing cases
}
return eta2;
return eta;
}
int analyze_cluster(Cluster3x3 &cl, int32_t *t2, int32_t *t3, char *quad,

View File

@ -6,12 +6,14 @@
using aare::ClusterVector;
struct Cluster_i2x2 {
int16_t x;
int16_t y;
int32_t data[4];
};
TEST_CASE("ClusterVector 2x2 int32_t capacity 4, push back then read") {
struct Cluster_i2x2 {
int16_t x;
int16_t y;
int32_t data[4];
};
ClusterVector<int32_t> cv(2, 2, 4);
REQUIRE(cv.capacity() == 4);
@ -19,7 +21,7 @@ TEST_CASE("ClusterVector 2x2 int32_t capacity 4, push back then read") {
REQUIRE(cv.cluster_size_x() == 2);
REQUIRE(cv.cluster_size_y() == 2);
// int16_t, int16_t, 2x2 int32_t = 20 bytes
REQUIRE(cv.element_offset() == 20);
REQUIRE(cv.item_size() == 20);
//Create a cluster and push back into the vector
Cluster_i2x2 c1 = {1, 2, {3, 4, 5, 6}};
@ -30,7 +32,7 @@ TEST_CASE("ClusterVector 2x2 int32_t capacity 4, push back then read") {
//Read the cluster back out using copy. TODO! Can we improve the API?
Cluster_i2x2 c2;
std::byte *ptr = cv.element_ptr(0);
std::copy(ptr, ptr + cv.element_offset(), reinterpret_cast<std::byte*>(&c2));
std::copy(ptr, ptr + cv.item_size(), reinterpret_cast<std::byte*>(&c2));
//Check that the data is the same
REQUIRE(c1.x == c2.x);
@ -83,8 +85,8 @@ TEST_CASE("Storing floats"){
float data[8];
};
ClusterVector<float> cv(2, 4, 2);
REQUIRE(cv.capacity() == 2);
ClusterVector<float> cv(2, 4, 10);
REQUIRE(cv.capacity() == 10);
REQUIRE(cv.size() == 0);
REQUIRE(cv.cluster_size_x() == 2);
REQUIRE(cv.cluster_size_y() == 4);
@ -92,17 +94,105 @@ TEST_CASE("Storing floats"){
//Create a cluster and push back into the vector
Cluster_f4x2 c1 = {1, 2, {3.0, 4.0, 5.0, 6.0,3.0, 4.0, 5.0, 6.0}};
cv.push_back(c1.x, c1.y, reinterpret_cast<std::byte*>(&c1.data[0]));
REQUIRE(cv.capacity() == 2);
REQUIRE(cv.capacity() == 10);
REQUIRE(cv.size() == 1);
Cluster_f4x2 c2 = {6, 7, {8.0, 9.0, 10.0, 11.0,8.0, 9.0, 10.0, 11.0}};
cv.push_back(c2.x, c2.y, reinterpret_cast<std::byte*>(&c2.data[0]));
REQUIRE(cv.capacity() == 2);
REQUIRE(cv.capacity() == 10);
REQUIRE(cv.size() == 2);
auto sums = cv.sum();
REQUIRE(sums.size() == 2);
REQUIRE_THAT(sums[0], Catch::Matchers::WithinAbs(36.0, 1e-6));
REQUIRE_THAT(sums[1], Catch::Matchers::WithinAbs(76.0, 1e-6));
}
TEST_CASE("Push back more than initial capacity"){
ClusterVector<int32_t> cv(2, 2, 2);
auto initial_data = cv.data();
Cluster_i2x2 c1 = {1, 2, {3, 4, 5, 6}};
cv.push_back(c1.x, c1.y, reinterpret_cast<std::byte*>(&c1.data[0]));
REQUIRE(cv.size() == 1);
REQUIRE(cv.capacity() == 2);
Cluster_i2x2 c2 = {6, 7, {8, 9, 10, 11}};
cv.push_back(c2.x, c2.y, reinterpret_cast<std::byte*>(&c2.data[0]));
REQUIRE(cv.size() == 2);
REQUIRE(cv.capacity() == 2);
Cluster_i2x2 c3 = {11, 12, {13, 14, 15, 16}};
cv.push_back(c3.x, c3.y, reinterpret_cast<std::byte*>(&c3.data[0]));
REQUIRE(cv.size() == 3);
REQUIRE(cv.capacity() == 4);
Cluster_i2x2* ptr = reinterpret_cast<Cluster_i2x2*>(cv.data());
REQUIRE(ptr[0].x == 1);
REQUIRE(ptr[0].y == 2);
REQUIRE(ptr[1].x == 6);
REQUIRE(ptr[1].y == 7);
REQUIRE(ptr[2].x == 11);
REQUIRE(ptr[2].y == 12);
//We should have allocated a new buffer, since we outgrew the initial capacity
REQUIRE(initial_data != cv.data());
}
TEST_CASE("Concatenate two cluster vectors where the first has enough capacity"){
ClusterVector<int32_t> cv1(2, 2, 12);
Cluster_i2x2 c1 = {1, 2, {3, 4, 5, 6}};
cv1.push_back(c1.x, c1.y, reinterpret_cast<std::byte*>(&c1.data[0]));
Cluster_i2x2 c2 = {6, 7, {8, 9, 10, 11}};
cv1.push_back(c2.x, c2.y, reinterpret_cast<std::byte*>(&c2.data[0]));
ClusterVector<int32_t> cv2(2, 2, 2);
Cluster_i2x2 c3 = {11, 12, {13, 14, 15, 16}};
cv2.push_back(c3.x, c3.y, reinterpret_cast<std::byte*>(&c3.data[0]));
Cluster_i2x2 c4 = {16, 17, {18, 19, 20, 21}};
cv2.push_back(c4.x, c4.y, reinterpret_cast<std::byte*>(&c4.data[0]));
cv1 += cv2;
REQUIRE(cv1.size() == 4);
REQUIRE(cv1.capacity() == 12);
Cluster_i2x2* ptr = reinterpret_cast<Cluster_i2x2*>(cv1.data());
REQUIRE(ptr[0].x == 1);
REQUIRE(ptr[0].y == 2);
REQUIRE(ptr[1].x == 6);
REQUIRE(ptr[1].y == 7);
REQUIRE(ptr[2].x == 11);
REQUIRE(ptr[2].y == 12);
REQUIRE(ptr[3].x == 16);
REQUIRE(ptr[3].y == 17);
}
TEST_CASE("Concatenate two cluster vectors where we need to allocate"){
ClusterVector<int32_t> cv1(2, 2, 2);
Cluster_i2x2 c1 = {1, 2, {3, 4, 5, 6}};
cv1.push_back(c1.x, c1.y, reinterpret_cast<std::byte*>(&c1.data[0]));
Cluster_i2x2 c2 = {6, 7, {8, 9, 10, 11}};
cv1.push_back(c2.x, c2.y, reinterpret_cast<std::byte*>(&c2.data[0]));
ClusterVector<int32_t> cv2(2, 2, 2);
Cluster_i2x2 c3 = {11, 12, {13, 14, 15, 16}};
cv2.push_back(c3.x, c3.y, reinterpret_cast<std::byte*>(&c3.data[0]));
Cluster_i2x2 c4 = {16, 17, {18, 19, 20, 21}};
cv2.push_back(c4.x, c4.y, reinterpret_cast<std::byte*>(&c4.data[0]));
cv1 += cv2;
REQUIRE(cv1.size() == 4);
REQUIRE(cv1.capacity() == 4);
Cluster_i2x2* ptr = reinterpret_cast<Cluster_i2x2*>(cv1.data());
REQUIRE(ptr[0].x == 1);
REQUIRE(ptr[0].y == 2);
REQUIRE(ptr[1].x == 6);
REQUIRE(ptr[1].y == 7);
REQUIRE(ptr[2].x == 11);
REQUIRE(ptr[2].y == 12);
REQUIRE(ptr[3].x == 16);
REQUIRE(ptr[3].y == 17);
}

View File

@ -278,6 +278,10 @@ void RawFile::get_frame_into(size_t frame_index, std::byte *frame_buffer, Detect
if (n_subfile_parts != 1) {
for (size_t part_idx = 0; part_idx != n_subfile_parts; ++part_idx) {
auto subfile_id = frame_index / m_master.max_frames_per_file();
if (subfile_id >= subfiles.size()) {
throw std::runtime_error(LOCATION +
" Subfile out of range. Possible missing data.");
}
frame_numbers[part_idx] =
subfiles[subfile_id][part_idx]->frame_number(
frame_index % m_master.max_frames_per_file());
@ -311,6 +315,10 @@ void RawFile::get_frame_into(size_t frame_index, std::byte *frame_buffer, Detect
for (size_t part_idx = 0; part_idx != n_subfile_parts; ++part_idx) {
auto corrected_idx = frame_indices[part_idx];
auto subfile_id = corrected_idx / m_master.max_frames_per_file();
if (subfile_id >= subfiles.size()) {
throw std::runtime_error(LOCATION +
" Subfile out of range. Possible missing data.");
}
// This is where we start writing
auto offset = (m_module_pixel_0[part_idx].y * m_cols +
@ -343,6 +351,10 @@ void RawFile::get_frame_into(size_t frame_index, std::byte *frame_buffer, Detect
auto pos = m_module_pixel_0[part_idx];
auto corrected_idx = frame_indices[part_idx];
auto subfile_id = corrected_idx / m_master.max_frames_per_file();
if (subfile_id >= subfiles.size()) {
throw std::runtime_error(LOCATION +
" Subfile out of range. Possible missing data.");
}
subfiles[subfile_id][part_idx]->seek(corrected_idx % m_master.max_frames_per_file());
subfiles[subfile_id][part_idx]->read_into(part_buffer, header);