Multi threaded cluster finder. (#115)

Added a prototype for the multi threaded cluster finder including python
bindings
This commit is contained in:
Erik Fröjdh 2025-01-09 16:55:35 +01:00 committed by GitHub
commit 72d10b7735
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 861 additions and 77 deletions

View File

@ -1,6 +1,6 @@
package: package:
name: aare name: aare
version: 2025.1.7.dev0 #TODO! how to not duplicate this? version: 2025.1.9.dev0 #TODO! how to not duplicate this?
source: source:

View File

@ -0,0 +1,97 @@
#pragma once
#include <chrono>
#include <fmt/color.h>
#include <fmt/format.h>
#include <memory>
#include <thread>
#include "aare/ProducerConsumerQueue.hpp"
namespace aare {
template <class ItemType> class CircularFifo {
uint32_t fifo_size;
aare::ProducerConsumerQueue<ItemType> free_slots;
aare::ProducerConsumerQueue<ItemType> filled_slots;
public:
CircularFifo() : CircularFifo(100){};
CircularFifo(uint32_t size) : fifo_size(size), free_slots(size + 1), filled_slots(size + 1) {
// TODO! how do we deal with alignment for writing? alignas???
// Do we give the user a chance to provide memory locations?
// Templated allocator?
for (size_t i = 0; i < fifo_size; ++i) {
free_slots.write(ItemType{});
}
}
bool next() {
// TODO! avoid default constructing ItemType
ItemType it;
if (!filled_slots.read(it))
return false;
if (!free_slots.write(std::move(it)))
return false;
return true;
}
~CircularFifo() {}
using value_type = ItemType;
auto numFilledSlots() const noexcept { return filled_slots.sizeGuess(); }
auto numFreeSlots() const noexcept { return free_slots.sizeGuess(); }
auto isFull() const noexcept { return filled_slots.isFull(); }
ItemType pop_free() {
ItemType v;
while (!free_slots.read(v))
;
return std::move(v);
// return v;
}
bool try_pop_free(ItemType &v) { return free_slots.read(v); }
ItemType pop_value(std::chrono::nanoseconds wait, std::atomic<bool> &stopped) {
ItemType v;
while (!filled_slots.read(v) && !stopped) {
std::this_thread::sleep_for(wait);
}
return std::move(v);
}
ItemType pop_value() {
ItemType v;
while (!filled_slots.read(v))
;
return std::move(v);
}
ItemType *frontPtr() { return filled_slots.frontPtr(); }
// TODO! Add function to move item from filled to free to be used
// with the frontPtr function
template <class... Args> void push_value(Args &&...recordArgs) {
while (!filled_slots.write(std::forward<Args>(recordArgs)...))
;
}
template <class... Args> bool try_push_value(Args &&...recordArgs) {
return filled_slots.write(std::forward<Args>(recordArgs)...);
}
template <class... Args> void push_free(Args &&...recordArgs) {
while (!free_slots.write(std::forward<Args>(recordArgs)...))
;
}
template <class... Args> bool try_push_free(Args &&...recordArgs) {
return free_slots.write(std::forward<Args>(recordArgs)...);
}
};
} // namespace aare

View File

@ -0,0 +1,52 @@
#pragma once
#include <atomic>
#include <thread>
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ClusterFinderMT.hpp"
namespace aare {
class ClusterCollector{
ProducerConsumerQueue<ClusterVector<int>>* m_source;
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_stopped{true};
std::chrono::milliseconds m_default_wait{1};
std::thread m_thread;
std::vector<ClusterVector<int>> m_clusters;
void process(){
m_stopped = false;
fmt::print("ClusterCollector started\n");
while (!m_stop_requested || !m_source->isEmpty()) {
if (ClusterVector<int> *clusters = m_source->frontPtr();
clusters != nullptr) {
m_clusters.push_back(std::move(*clusters));
m_source->popFront();
}else{
std::this_thread::sleep_for(m_default_wait);
}
}
fmt::print("ClusterCollector stopped\n");
m_stopped = true;
}
public:
ClusterCollector(ClusterFinderMT<uint16_t, double, int32_t>* source){
m_source = source->sink();
m_thread = std::thread(&ClusterCollector::process, this);
}
void stop(){
m_stop_requested = true;
m_thread.join();
}
std::vector<ClusterVector<int>> steal_clusters(){
if(!m_stopped){
throw std::runtime_error("ClusterCollector is still running");
}
return std::move(m_clusters);
}
};
} // namespace aare

View File

@ -59,10 +59,9 @@ class ClusterFile {
ClusterFile(const std::filesystem::path &fname, size_t chunk_size = 1000, ClusterFile(const std::filesystem::path &fname, size_t chunk_size = 1000,
const std::string &mode = "r"); const std::string &mode = "r");
~ClusterFile(); ~ClusterFile();
std::vector<Cluster3x3> read_clusters(size_t n_clusters); ClusterVector<int32_t> read_clusters(size_t n_clusters);
std::vector<Cluster3x3> read_frame(int32_t &out_fnum); ClusterVector<int32_t> read_frame();
void write_frame(int32_t frame_number, void write_frame(const ClusterVector<int32_t> &clusters);
const ClusterVector<int32_t> &clusters);
std::vector<Cluster3x3> std::vector<Cluster3x3>
read_cluster_with_cut(size_t n_clusters, double *noise_map, int nx, int ny); read_cluster_with_cut(size_t n_clusters, double *noise_map, int nx, int ny);

View File

@ -0,0 +1,56 @@
#pragma once
#include <atomic>
#include <filesystem>
#include <thread>
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ClusterFinderMT.hpp"
namespace aare{
class ClusterFileSink{
ProducerConsumerQueue<ClusterVector<int>>* m_source;
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_stopped{true};
std::chrono::milliseconds m_default_wait{1};
std::thread m_thread;
std::ofstream m_file;
void process(){
m_stopped = false;
fmt::print("ClusterFileSink started\n");
while (!m_stop_requested || !m_source->isEmpty()) {
if (ClusterVector<int> *clusters = m_source->frontPtr();
clusters != nullptr) {
// Write clusters to file
int32_t frame_number = clusters->frame_number(); //TODO! Should we store frame number already as int?
uint32_t num_clusters = clusters->size();
m_file.write(reinterpret_cast<const char*>(&frame_number), sizeof(frame_number));
m_file.write(reinterpret_cast<const char*>(&num_clusters), sizeof(num_clusters));
m_file.write(reinterpret_cast<const char*>(clusters->data()), clusters->size() * clusters->item_size());
m_source->popFront();
}else{
std::this_thread::sleep_for(m_default_wait);
}
}
fmt::print("ClusterFileSink stopped\n");
m_stopped = true;
}
public:
ClusterFileSink(ClusterFinderMT<uint16_t, double, int32_t>* source, const std::filesystem::path& fname){
m_source = source->sink();
m_thread = std::thread(&ClusterFileSink::process, this);
m_file.open(fname, std::ios::binary);
}
void stop(){
m_stop_requested = true;
m_thread.join();
m_file.close();
}
};
} // namespace aare

View File

@ -10,26 +10,12 @@
namespace aare { namespace aare {
/** enum to define the event types */
enum class eventType {
PEDESTAL, /** pedestal */
NEIGHBOUR, /** neighbour i.e. below threshold, but in the cluster of a
photon */
PHOTON, /** photon i.e. above threshold */
PHOTON_MAX, /** maximum of a cluster satisfying the photon conditions */
NEGATIVE_PEDESTAL, /** negative value, will not be accounted for as pedestal
in order to avoid drift of the pedestal towards
negative values */
UNDEFINED_EVENT = -1 /** undefined */
};
template <typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double, template <typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double,
typename CT = int32_t> typename CT = int32_t>
class ClusterFinder { class ClusterFinder {
Shape<2> m_image_size; Shape<2> m_image_size;
const int m_cluster_sizeX; const int m_cluster_sizeX;
const int m_cluster_sizeY; const int m_cluster_sizeY;
// const PEDESTAL_TYPE m_threshold;
const PEDESTAL_TYPE m_nSigma; const PEDESTAL_TYPE m_nSigma;
const PEDESTAL_TYPE c2; const PEDESTAL_TYPE c2;
const PEDESTAL_TYPE c3; const PEDESTAL_TYPE c3;
@ -78,13 +64,13 @@ class ClusterFinder {
m_clusters = ClusterVector<CT>(m_cluster_sizeX, m_cluster_sizeY); m_clusters = ClusterVector<CT>(m_cluster_sizeX, m_cluster_sizeY);
return tmp; return tmp;
} }
void find_clusters(NDView<FRAME_TYPE, 2> frame) { void find_clusters(NDView<FRAME_TYPE, 2> frame, uint64_t frame_number = 0) {
// // TODO! deal with even size clusters // // TODO! deal with even size clusters
// // currently 3,3 -> +/- 1 // // currently 3,3 -> +/- 1
// // 4,4 -> +/- 2 // // 4,4 -> +/- 2
int dy = m_cluster_sizeY / 2; int dy = m_cluster_sizeY / 2;
int dx = m_cluster_sizeX / 2; int dx = m_cluster_sizeX / 2;
m_clusters.set_frame_number(frame_number);
std::vector<CT> cluster_data(m_cluster_sizeX * m_cluster_sizeY); std::vector<CT> cluster_data(m_cluster_sizeX * m_cluster_sizeY);
for (int iy = 0; iy < frame.shape(0); iy++) { for (int iy = 0; iy < frame.shape(0); iy++) {
for (int ix = 0; ix < frame.shape(1); ix++) { for (int ix = 0; ix < frame.shape(1); ix++) {

View File

@ -0,0 +1,224 @@
#pragma once
#include <atomic>
#include <cstdint>
#include <memory>
#include <thread>
#include <vector>
#include "aare/NDArray.hpp"
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/ClusterFinder.hpp"
namespace aare {
enum class FrameType {
DATA,
PEDESTAL,
};
struct FrameWrapper {
FrameType type;
uint64_t frame_number;
NDArray<uint16_t, 2> data;
};
template <typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double,
typename CT = int32_t>
class ClusterFinderMT {
size_t m_current_thread{0};
size_t m_n_threads{0};
using Finder = ClusterFinder<FRAME_TYPE, PEDESTAL_TYPE, CT>;
using InputQueue = ProducerConsumerQueue<FrameWrapper>;
using OutputQueue = ProducerConsumerQueue<ClusterVector<int>>;
std::vector<std::unique_ptr<InputQueue>> m_input_queues;
std::vector<std::unique_ptr<OutputQueue>> m_output_queues;
OutputQueue m_sink{1000}; // All clusters go into this queue
std::vector<std::unique_ptr<Finder>> m_cluster_finders;
std::vector<std::thread> m_threads;
std::thread m_collect_thread;
std::chrono::milliseconds m_default_wait{1};
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_processing_threads_stopped{true};
void process(int thread_id) {
auto cf = m_cluster_finders[thread_id].get();
auto q = m_input_queues[thread_id].get();
// TODO! Avoid indexing into the vector every time
fmt::print("Thread {} started\n", thread_id);
// TODO! is this check enough to make sure we process all the frames?
while (!m_stop_requested || !q->isEmpty()) {
if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) {
// fmt::print("Thread {} got frame {}, type: {}\n", thread_id,
// frame->frame_number, static_cast<int>(frame->type));
switch (frame->type) {
case FrameType::DATA:
cf->find_clusters(frame->data.view(), frame->frame_number);
m_output_queues[thread_id]->write(cf->steal_clusters());
break;
case FrameType::PEDESTAL:
m_cluster_finders[thread_id]->push_pedestal_frame(
frame->data.view());
break;
default:
break;
}
// frame is processed now discard it
m_input_queues[thread_id]->popFront();
} else {
std::this_thread::sleep_for(m_default_wait);
}
}
fmt::print("Thread {} stopped\n", thread_id);
}
/**
* @brief Collect all the clusters from the output queues and write them to
* the sink
*/
void collect() {
bool empty = true;
while (!m_stop_requested || !empty || !m_processing_threads_stopped) {
empty = true;
for (auto &queue : m_output_queues) {
if (!queue->isEmpty()) {
while (!m_sink.write(std::move(*queue->frontPtr()))) {
std::this_thread::sleep_for(m_default_wait);
}
queue->popFront();
empty = false;
}
}
}
}
public:
ClusterFinderMT(Shape<2> image_size, Shape<2> cluster_size,
PEDESTAL_TYPE nSigma = 5.0, size_t capacity = 2000,
size_t n_threads = 3)
: m_n_threads(n_threads) {
fmt::print("ClusterFinderMT: using {} threads\n", n_threads);
for (size_t i = 0; i < n_threads; i++) {
m_cluster_finders.push_back(
std::make_unique<ClusterFinder<FRAME_TYPE, PEDESTAL_TYPE, CT>>(
image_size, cluster_size, nSigma, capacity));
}
for (size_t i = 0; i < n_threads; i++) {
m_input_queues.emplace_back(std::make_unique<InputQueue>(200));
m_output_queues.emplace_back(std::make_unique<OutputQueue>(200));
}
start();
}
ProducerConsumerQueue<ClusterVector<int>> *sink() { return &m_sink; }
/**
* @brief Start all threads
*/
void start() {
for (size_t i = 0; i < m_n_threads; i++) {
m_threads.push_back(
std::thread(&ClusterFinderMT::process, this, i));
}
m_processing_threads_stopped = false;
m_collect_thread = std::thread(&ClusterFinderMT::collect, this);
}
/**
* @brief Stop all threads
*/
void stop() {
m_stop_requested = true;
for (auto &thread : m_threads) {
thread.join();
}
m_processing_threads_stopped = true;
m_collect_thread.join();
}
/**
* @brief Wait for all the queues to be empty
*/
void sync() {
for (auto &q : m_input_queues) {
while (!q->isEmpty()) {
std::this_thread::sleep_for(m_default_wait);
}
}
for (auto &q : m_output_queues) {
while (!q->isEmpty()) {
std::this_thread::sleep_for(m_default_wait);
}
}
while (!m_sink.isEmpty()) {
std::this_thread::sleep_for(m_default_wait);
}
}
/**
* @brief Push a pedestal frame to all the cluster finders. The frames is
* expected to be dark. No photon finding is done. Just pedestal update.
*/
void push_pedestal_frame(NDView<FRAME_TYPE, 2> frame) {
FrameWrapper fw{FrameType::PEDESTAL, 0,
NDArray(frame)}; // TODO! copies the data!
for (auto &queue : m_input_queues) {
while (!queue->write(fw)) {
std::this_thread::sleep_for(m_default_wait);
}
}
}
/**
* @brief Push the frame to the queue of the next available thread. Function
* returns once the frame is in a queue.
* @note Spin locks with a default wait if the queue is full.
*/
void find_clusters(NDView<FRAME_TYPE, 2> frame, uint64_t frame_number = 0) {
FrameWrapper fw{FrameType::DATA, frame_number,
NDArray(frame)}; // TODO! copies the data!
while (!m_input_queues[m_current_thread % m_n_threads]->write(fw)) {
std::this_thread::sleep_for(m_default_wait);
}
m_current_thread++;
}
auto pedestal() {
if (m_cluster_finders.empty()) {
throw std::runtime_error("No cluster finders available");
}
if(!m_processing_threads_stopped){
throw std::runtime_error("ClusterFinderMT is still running");
}
return m_cluster_finders[0]->pedestal();
}
auto noise() {
if (m_cluster_finders.empty()) {
throw std::runtime_error("No cluster finders available");
}
if(!m_processing_threads_stopped){
throw std::runtime_error("ClusterFinderMT is still running");
}
return m_cluster_finders[0]->noise();
}
// void push(FrameWrapper&& frame) {
// //TODO! need to loop until we are successful
// auto rc = m_input_queue.write(std::move(frame));
// fmt::print("pushed frame {}\n", rc);
// }
};
} // namespace aare

View File

@ -22,6 +22,7 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
std::byte *m_data{}; std::byte *m_data{};
size_t m_size{0}; size_t m_size{0};
size_t m_capacity; size_t m_capacity;
uint64_t m_frame_number{0}; //TODO! Check frame number size and type
/* /*
Format string used in the python bindings to create a numpy Format string used in the python bindings to create a numpy
array from the buffer array from the buffer
@ -39,10 +40,10 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
* @param cluster_size_y size of the cluster in y direction * @param cluster_size_y size of the cluster in y direction
* @param capacity initial capacity of the buffer in number of clusters * @param capacity initial capacity of the buffer in number of clusters
*/ */
ClusterVector(size_t cluster_size_x, size_t cluster_size_y, ClusterVector(size_t cluster_size_x = 3, size_t cluster_size_y = 3,
size_t capacity = 1024) size_t capacity = 1024, uint64_t frame_number = 0)
: m_cluster_size_x(cluster_size_x), m_cluster_size_y(cluster_size_y), : m_cluster_size_x(cluster_size_x), m_cluster_size_y(cluster_size_y),
m_capacity(capacity) { m_capacity(capacity), m_frame_number(frame_number) {
allocate_buffer(capacity); allocate_buffer(capacity);
} }
~ClusterVector() { ~ClusterVector() {
@ -54,7 +55,7 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
ClusterVector(ClusterVector &&other) noexcept ClusterVector(ClusterVector &&other) noexcept
: m_cluster_size_x(other.m_cluster_size_x), : m_cluster_size_x(other.m_cluster_size_x),
m_cluster_size_y(other.m_cluster_size_y), m_data(other.m_data), m_cluster_size_y(other.m_cluster_size_y), m_data(other.m_data),
m_size(other.m_size), m_capacity(other.m_capacity) { m_size(other.m_size), m_capacity(other.m_capacity), m_frame_number(other.m_frame_number) {
other.m_data = nullptr; other.m_data = nullptr;
other.m_size = 0; other.m_size = 0;
other.m_capacity = 0; other.m_capacity = 0;
@ -69,9 +70,11 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
m_data = other.m_data; m_data = other.m_data;
m_size = other.m_size; m_size = other.m_size;
m_capacity = other.m_capacity; m_capacity = other.m_capacity;
m_frame_number = other.m_frame_number;
other.m_data = nullptr; other.m_data = nullptr;
other.m_size = 0; other.m_size = 0;
other.m_capacity = 0; other.m_capacity = 0;
other.m_frame_number = 0;
} }
return *this; return *this;
} }
@ -108,7 +111,14 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
ptr); ptr);
m_size++; m_size++;
} }
ClusterVector& operator+=(const ClusterVector& other){
if (m_size + other.m_size > m_capacity) {
allocate_buffer(m_capacity + other.m_size);
}
std::copy(other.m_data, other.m_data + other.m_size * element_offset(), m_data + m_size * element_offset());
m_size += other.m_size;
return *this;
}
/** /**
* @brief Sum the pixels in each cluster * @brief Sum the pixels in each cluster
@ -139,6 +149,12 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
return 2*sizeof(CoordType) + return 2*sizeof(CoordType) +
m_cluster_size_x * m_cluster_size_y * sizeof(T); m_cluster_size_x * m_cluster_size_y * sizeof(T);
} }
/**
* @brief Return the size in bytes of a single cluster
*/
size_t item_size() const { return element_offset(); }
/** /**
* @brief Return the offset in bytes for the i-th cluster * @brief Return the offset in bytes for the i-th cluster
*/ */
@ -166,6 +182,15 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
return m_fmt_base; return m_fmt_base;
} }
uint64_t frame_number() const { return m_frame_number; }
void set_frame_number(uint64_t frame_number) { m_frame_number = frame_number; }
void resize(size_t new_size) {
if (new_size > m_capacity) {
allocate_buffer(new_size);
}
m_size = new_size;
}
private: private:
void allocate_buffer(size_t new_capacity) { void allocate_buffer(size_t new_capacity) {
size_t num_bytes = element_offset() * new_capacity; size_t num_bytes = element_offset() * new_capacity;

View File

@ -37,6 +37,8 @@ class File {
File& operator=(File &&other) noexcept; File& operator=(File &&other) noexcept;
~File() = default; ~File() = default;
// void close(); //!< close the file
Frame read_frame(); //!< read one frame from the file at the current position Frame read_frame(); //!< read one frame from the file at the current position
Frame read_frame(size_t frame_index); //!< read one frame at the position given by frame number Frame read_frame(size_t frame_index); //!< read one frame at the position given by frame number
std::vector<Frame> read_n(size_t n_frames); //!< read n_frames from the file at the current position std::vector<Frame> read_n(size_t n_frames); //!< read n_frames from the file at the current position

View File

@ -0,0 +1,203 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)
// Changes made by PSD Detector Group:
// Copied: Line 34 constexpr std::size_t hardware_destructive_interference_size = 128; from folly/lang/Align.h
// Changed extension to .hpp
// Changed namespace to aare
#pragma once
#include <atomic>
#include <cassert>
#include <cstdlib>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>
constexpr std::size_t hardware_destructive_interference_size = 128;
namespace aare {
/*
* ProducerConsumerQueue is a one producer and one consumer queue
* without locks.
*/
template <class T> struct ProducerConsumerQueue {
typedef T value_type;
ProducerConsumerQueue(const ProducerConsumerQueue &) = delete;
ProducerConsumerQueue &operator=(const ProducerConsumerQueue &) = delete;
ProducerConsumerQueue(ProducerConsumerQueue &&other){
size_ = other.size_;
records_ = other.records_;
other.records_ = nullptr;
readIndex_ = other.readIndex_.load(std::memory_order_acquire);
writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
}
ProducerConsumerQueue &operator=(ProducerConsumerQueue &&other){
size_ = other.size_;
records_ = other.records_;
other.records_ = nullptr;
readIndex_ = other.readIndex_.load(std::memory_order_acquire);
writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
return *this;
}
ProducerConsumerQueue():ProducerConsumerQueue(2){};
// size must be >= 2.
//
// Also, note that the number of usable slots in the queue at any
// given time is actually (size-1), so if you start with an empty queue,
// isFull() will return true after size-1 insertions.
explicit ProducerConsumerQueue(uint32_t size)
: size_(size), records_(static_cast<T *>(std::malloc(sizeof(T) * size))), readIndex_(0), writeIndex_(0) {
assert(size >= 2);
if (!records_) {
throw std::bad_alloc();
}
}
~ProducerConsumerQueue() {
// We need to destruct anything that may still exist in our queue.
// (No real synchronization needed at destructor time: only one
// thread can be doing this.)
if (!std::is_trivially_destructible<T>::value) {
size_t readIndex = readIndex_;
size_t endIndex = writeIndex_;
while (readIndex != endIndex) {
records_[readIndex].~T();
if (++readIndex == size_) {
readIndex = 0;
}
}
}
std::free(records_);
}
template <class... Args> bool write(Args &&...recordArgs) {
auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
auto nextRecord = currentWrite + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
writeIndex_.store(nextRecord, std::memory_order_release);
return true;
}
// queue is full
return false;
}
// move (or copy) the value at the front of the queue to given variable
bool read(T &record) {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
// queue is empty
return false;
}
auto nextRecord = currentRead + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
record = std::move(records_[currentRead]);
records_[currentRead].~T();
readIndex_.store(nextRecord, std::memory_order_release);
return true;
}
// pointer to the value at the front of the queue (for use in-place) or
// nullptr if empty.
T *frontPtr() {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
// queue is empty
return nullptr;
}
return &records_[currentRead];
}
// queue must not be empty
void popFront() {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
assert(currentRead != writeIndex_.load(std::memory_order_acquire));
auto nextRecord = currentRead + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
records_[currentRead].~T();
readIndex_.store(nextRecord, std::memory_order_release);
}
bool isEmpty() const {
return readIndex_.load(std::memory_order_acquire) == writeIndex_.load(std::memory_order_acquire);
}
bool isFull() const {
auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
return false;
}
// queue is full
return true;
}
// * If called by consumer, then true size may be more (because producer may
// be adding items concurrently).
// * If called by producer, then true size may be less (because consumer may
// be removing items concurrently).
// * It is undefined to call this from any other thread.
size_t sizeGuess() const {
int ret = writeIndex_.load(std::memory_order_acquire) - readIndex_.load(std::memory_order_acquire);
if (ret < 0) {
ret += size_;
}
return ret;
}
// maximum number of items in the queue.
size_t capacity() const { return size_ - 1; }
private:
using AtomicIndex = std::atomic<unsigned int>;
char pad0_[hardware_destructive_interference_size];
// const uint32_t size_;
uint32_t size_;
// T *const records_;
T* records_;
alignas(hardware_destructive_interference_size) AtomicIndex readIndex_;
alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_;
char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
};
} // namespace aare

View File

@ -4,7 +4,7 @@ build-backend = "scikit_build_core.build"
[project] [project]
name = "aare" name = "aare"
version = "2025.1.7.dev0" version = "2025.1.9.dev0"
[tool.scikit-build] [tool.scikit-build]
cmake.verbose = true cmake.verbose = true

View File

@ -8,6 +8,8 @@ from ._aare import DetectorType
from ._aare import ClusterFile from ._aare import ClusterFile
from ._aare import hitmap from ._aare import hitmap
from ._aare import ClusterFinderMT, ClusterCollector, ClusterFileSink
from .CtbRawFile import CtbRawFile from .CtbRawFile import CtbRawFile
from .RawFile import RawFile from .RawFile import RawFile
from .ScanParameters import ScanParameters from .ScanParameters import ScanParameters

View File

@ -8,41 +8,72 @@ import numpy as np
import boost_histogram as bh import boost_histogram as bh
import time import time
from aare import File, ClusterFinder, VarClusterFinder from aare import File, ClusterFinder, VarClusterFinder, ClusterFile
base = Path('/mnt/sls_det_storage/matterhorn_data/aare_test_data/') base = Path('/mnt/sls_det_storage/matterhorn_data/aare_test_data/')
f = File(base/'Moench03new/cu_half_speed_master_4.json') f = File(base/'Moench03new/cu_half_speed_master_4.json')
cf = ClusterFinder((400,400), (3,3))
from aare._aare import ClusterFinderMT, ClusterCollector, ClusterFileSink
cf = ClusterFinderMT((400,400), (3,3), n_threads = 3)
# collector = ClusterCollector(cf)
out_file = ClusterFileSink(cf, "test.clust")
for i in range(1000): for i in range(1000):
cf.push_pedestal_frame(f.read_frame()) img = f.read_frame()
cf.push_pedestal_frame(img)
print('Pedestal done')
cf.sync()
fig, ax = plt.subplots() for i in range(100):
im = ax.imshow(cf.pedestal()) img = f.read_frame()
cf.pedestal() cf.find_clusters(img)
cf.noise()
# time.sleep(1)
cf.stop()
out_file.stop()
print('Done')
cfile = ClusterFile("test.clust")
N = 500
t0 = time.perf_counter()
hist1 = bh.Histogram(bh.axis.Regular(40, -2, 4000))
f.seek(0)
t0 = time.perf_counter() # cf = ClusterFinder((400,400), (3,3))
data = f.read_n(N) # for i in range(1000):
t_elapsed = time.perf_counter()-t0 # cf.push_pedestal_frame(f.read_frame())
# fig, ax = plt.subplots()
# im = ax.imshow(cf.pedestal())
# cf.pedestal()
# cf.noise()
n_bytes = data.itemsize*data.size
print(f'Reading {N} frames took {t_elapsed:.3f}s {N/t_elapsed:.0f} FPS, {n_bytes/1024**2:.4f} GB/s') # N = 500
# t0 = time.perf_counter()
# hist1 = bh.Histogram(bh.axis.Regular(40, -2, 4000))
# f.seek(0)
# t0 = time.perf_counter()
# data = f.read_n(N)
# t_elapsed = time.perf_counter()-t0
for frame in data: # n_bytes = data.itemsize*data.size
a = cf.find_clusters(frame)
clusters = cf.steal_clusters() # print(f'Reading {N} frames took {t_elapsed:.3f}s {N/t_elapsed:.0f} FPS, {n_bytes/1024**2:.4f} GB/s')
# for frame in data:
# a = cf.find_clusters(frame)
# clusters = cf.steal_clusters()
# t_elapsed = time.perf_counter()-t0 # t_elapsed = time.perf_counter()-t0
# print(f'Clustering {N} frames took {t_elapsed:.2f}s {N/t_elapsed:.0f} FPS') # print(f'Clustering {N} frames took {t_elapsed:.2f}s {N/t_elapsed:.0f} FPS')

View File

@ -1,4 +1,7 @@
#include "aare/ClusterCollector.hpp"
#include "aare/ClusterFileSink.hpp"
#include "aare/ClusterFinder.hpp" #include "aare/ClusterFinder.hpp"
#include "aare/ClusterFinderMT.hpp"
#include "aare/ClusterVector.hpp" #include "aare/ClusterVector.hpp"
#include "aare/NDView.hpp" #include "aare/NDView.hpp"
#include "aare/Pedestal.hpp" #include "aare/Pedestal.hpp"
@ -8,6 +11,7 @@
#include <filesystem> #include <filesystem>
#include <pybind11/pybind11.h> #include <pybind11/pybind11.h>
#include <pybind11/stl.h> #include <pybind11/stl.h>
#include <pybind11/stl_bind.h>
namespace py = pybind11; namespace py = pybind11;
using pd_type = double; using pd_type = double;
@ -31,6 +35,9 @@ void define_cluster_vector(py::module &m, const std::string &typestr) {
auto *vec = new std::vector<T>(self.sum()); auto *vec = new std::vector<T>(self.sum());
return return_vector(vec); return return_vector(vec);
}) })
.def_property_readonly("capacity", &ClusterVector<T>::capacity)
.def_property("frame_number", &ClusterVector<T>::frame_number,
&ClusterVector<T>::set_frame_number)
.def_buffer([typestr](ClusterVector<T> &self) -> py::buffer_info { .def_buffer([typestr](ClusterVector<T> &self) -> py::buffer_info {
return py::buffer_info( return py::buffer_info(
self.data(), /* Pointer to buffer */ self.data(), /* Pointer to buffer */
@ -45,6 +52,64 @@ void define_cluster_vector(py::module &m, const std::string &typestr) {
}); });
} }
void define_cluster_finder_mt_bindings(py::module &m) {
py::class_<ClusterFinderMT<uint16_t, pd_type>>(m, "ClusterFinderMT")
.def(py::init<Shape<2>, Shape<2>, pd_type, size_t, size_t>(),
py::arg("image_size"), py::arg("cluster_size"),
py::arg("n_sigma") = 5.0, py::arg("capacity") = 1000,
py::arg("n_threads") = 3)
.def("push_pedestal_frame",
[](ClusterFinderMT<uint16_t, pd_type> &self,
py::array_t<uint16_t> frame) {
auto view = make_view_2d(frame);
self.push_pedestal_frame(view);
})
.def(
"find_clusters",
[](ClusterFinderMT<uint16_t, pd_type> &self,
py::array_t<uint16_t> frame, uint64_t frame_number) {
auto view = make_view_2d(frame);
self.find_clusters(view, frame_number);
return;
},
py::arg(), py::arg("frame_number") = 0)
.def("sync", &ClusterFinderMT<uint16_t, pd_type>::sync)
.def("stop", &ClusterFinderMT<uint16_t, pd_type>::stop)
.def_property_readonly("pedestal",
[](ClusterFinderMT<uint16_t, pd_type> &self) {
auto pd = new NDArray<pd_type, 2>{};
*pd = self.pedestal();
return return_image_data(pd);
})
.def_property_readonly("noise",
[](ClusterFinderMT<uint16_t, pd_type> &self) {
auto arr = new NDArray<pd_type, 2>{};
*arr = self.noise();
return return_image_data(arr);
});
}
void define_cluster_collector_bindings(py::module &m) {
py::class_<ClusterCollector>(m, "ClusterCollector")
.def(py::init<ClusterFinderMT<uint16_t, double, int32_t> *>())
.def("stop", &ClusterCollector::stop)
.def(
"steal_clusters",
[](ClusterCollector &self) {
auto v =
new std::vector<ClusterVector<int>>(self.steal_clusters());
return v;
},
py::return_value_policy::take_ownership);
}
void define_cluster_file_sink_bindings(py::module &m) {
py::class_<ClusterFileSink>(m, "ClusterFileSink")
.def(py::init<ClusterFinderMT<uint16_t, double, int32_t> *,
const std::filesystem::path &>())
.def("stop", &ClusterFileSink::stop);
}
void define_cluster_finder_bindings(py::module &m) { void define_cluster_finder_bindings(py::module &m) {
py::class_<ClusterFinder<uint16_t, pd_type>>(m, "ClusterFinder") py::class_<ClusterFinder<uint16_t, pd_type>>(m, "ClusterFinder")
.def(py::init<Shape<2>, Shape<2>, pd_type, size_t>(), .def(py::init<Shape<2>, Shape<2>, pd_type, size_t>(),
@ -77,12 +142,15 @@ void define_cluster_finder_bindings(py::module &m) {
return v; return v;
}, },
py::arg("realloc_same_capacity") = false) py::arg("realloc_same_capacity") = false)
.def("find_clusters", [](ClusterFinder<uint16_t, pd_type> &self, .def(
py::array_t<uint16_t> frame) { "find_clusters",
auto view = make_view_2d(frame); [](ClusterFinder<uint16_t, pd_type> &self,
self.find_clusters(view); py::array_t<uint16_t> frame, uint64_t frame_number) {
return; auto view = make_view_2d(frame);
}); self.find_clusters(view);
return;
},
py::arg(), py::arg("frame_number") = 0);
m.def("hitmap", m.def("hitmap",
[](std::array<size_t, 2> image_size, ClusterVector<int32_t> &cv) { [](std::array<size_t, 2> image_size, ClusterVector<int32_t> &cv) {

View File

@ -28,16 +28,13 @@ void define_cluster_file_io_bindings(py::module &m) {
py::arg(), py::arg("chunk_size") = 1000, py::arg("mode") = "r") py::arg(), py::arg("chunk_size") = 1000, py::arg("mode") = "r")
.def("read_clusters", .def("read_clusters",
[](ClusterFile &self, size_t n_clusters) { [](ClusterFile &self, size_t n_clusters) {
auto *vec = auto v = new ClusterVector<int32_t>(self.read_clusters(n_clusters));
new std::vector<Cluster3x3>(self.read_clusters(n_clusters)); return v;
return return_vector(vec);
}) })
.def("read_frame", .def("read_frame",
[](ClusterFile &self) { [](ClusterFile &self) {
int32_t frame_number; auto v = new ClusterVector<int32_t>(self.read_frame());
auto *vec = return v;
new std::vector<Cluster3x3>(self.read_frame(frame_number));
return py::make_tuple(frame_number, return_vector(vec));
}) })
.def("write_frame", &ClusterFile::write_frame) .def("write_frame", &ClusterFile::write_frame)
.def("read_cluster_with_cut", .def("read_cluster_with_cut",
@ -59,12 +56,11 @@ void define_cluster_file_io_bindings(py::module &m) {
}) })
.def("__iter__", [](ClusterFile &self) { return &self; }) .def("__iter__", [](ClusterFile &self) { return &self; })
.def("__next__", [](ClusterFile &self) { .def("__next__", [](ClusterFile &self) {
auto vec = auto v = new ClusterVector<int32_t>(self.read_clusters(self.chunk_size()));
new std::vector<Cluster3x3>(self.read_clusters(self.chunk_size())); if (v->size() == 0) {
if (vec->size() == 0) {
throw py::stop_iteration(); throw py::stop_iteration();
} }
return return_vector(vec); return v;
}); });
m.def("calculate_eta2", []( aare::ClusterVector<int32_t> &clusters) { m.def("calculate_eta2", []( aare::ClusterVector<int32_t> &clusters) {

View File

@ -124,8 +124,41 @@ void define_file_io_bindings(py::module &m) {
self.read_into(reinterpret_cast<std::byte *>(image.mutable_data()), self.read_into(reinterpret_cast<std::byte *>(image.mutable_data()),
n_frames); n_frames);
return image; return image;
})
.def("__enter__", [](File &self) { return &self; })
.def("__exit__",
[](File &self,
const std::optional<pybind11::type> &exc_type,
const std::optional<pybind11::object> &exc_value,
const std::optional<pybind11::object> &traceback) {
// self.close();
})
.def("__iter__", [](File &self) { return &self; })
.def("__next__", [](File &self) {
try{
const uint8_t item_size = self.bytes_per_pixel();
py::array image;
std::vector<ssize_t> shape;
shape.reserve(2);
shape.push_back(self.rows());
shape.push_back(self.cols());
if (item_size == 1) {
image = py::array_t<uint8_t>(shape);
} else if (item_size == 2) {
image = py::array_t<uint16_t>(shape);
} else if (item_size == 4) {
image = py::array_t<uint32_t>(shape);
}
self.read_into(
reinterpret_cast<std::byte *>(image.mutable_data()));
return image;
}catch(std::runtime_error &e){
throw py::stop_iteration();
}
}); });
py::class_<FileConfig>(m, "FileConfig") py::class_<FileConfig>(m, "FileConfig")
.def(py::init<>()) .def(py::init<>())
.def_readwrite("rows", &FileConfig::rows) .def_readwrite("rows", &FileConfig::rows)

View File

@ -25,5 +25,9 @@ PYBIND11_MODULE(_aare, m) {
define_pedestal_bindings<double>(m, "Pedestal_d"); define_pedestal_bindings<double>(m, "Pedestal_d");
define_pedestal_bindings<float>(m, "Pedestal_f"); define_pedestal_bindings<float>(m, "Pedestal_f");
define_cluster_finder_bindings(m); define_cluster_finder_bindings(m);
define_cluster_finder_mt_bindings(m);
define_cluster_file_io_bindings(m); define_cluster_file_io_bindings(m);
define_cluster_collector_bindings(m);
define_cluster_file_sink_bindings(m);
} }

View File

@ -34,8 +34,7 @@ void ClusterFile::close() {
} }
} }
void ClusterFile::write_frame(int32_t frame_number, void ClusterFile::write_frame(const ClusterVector<int32_t> &clusters) {
const ClusterVector<int32_t> &clusters) {
if (m_mode != "w") { if (m_mode != "w") {
throw std::runtime_error("File not opened for writing"); throw std::runtime_error("File not opened for writing");
} }
@ -43,26 +42,27 @@ void ClusterFile::write_frame(int32_t frame_number,
!(clusters.cluster_size_y() == 3)) { !(clusters.cluster_size_y() == 3)) {
throw std::runtime_error("Only 3x3 clusters are supported"); throw std::runtime_error("Only 3x3 clusters are supported");
} }
int32_t frame_number = clusters.frame_number();
fwrite(&frame_number, sizeof(frame_number), 1, fp); fwrite(&frame_number, sizeof(frame_number), 1, fp);
uint32_t n_clusters = clusters.size(); uint32_t n_clusters = clusters.size();
fwrite(&n_clusters, sizeof(n_clusters), 1, fp); fwrite(&n_clusters, sizeof(n_clusters), 1, fp);
fwrite(clusters.data(), clusters.element_offset(), clusters.size(), fp); fwrite(clusters.data(), clusters.element_offset(), clusters.size(), fp);
// write clusters
// fwrite(clusters.data(), sizeof(Cluster), clusters.size(), fp);
} }
std::vector<Cluster3x3> ClusterFile::read_clusters(size_t n_clusters) { ClusterVector<int32_t> ClusterFile::read_clusters(size_t n_clusters) {
if (m_mode != "r") { if (m_mode != "r") {
throw std::runtime_error("File not opened for reading"); throw std::runtime_error("File not opened for reading");
} }
std::vector<Cluster3x3> clusters(n_clusters);
ClusterVector<int32_t> clusters(3,3, n_clusters);
int32_t iframe = 0; // frame number needs to be 4 bytes! int32_t iframe = 0; // frame number needs to be 4 bytes!
size_t nph_read = 0; size_t nph_read = 0;
uint32_t nn = m_num_left; uint32_t nn = m_num_left;
uint32_t nph = m_num_left; // number of clusters in frame needs to be 4 uint32_t nph = m_num_left; // number of clusters in frame needs to be 4
auto buf = reinterpret_cast<Cluster3x3 *>(clusters.data()); // auto buf = reinterpret_cast<Cluster3x3 *>(clusters.data());
auto buf = clusters.data();
// if there are photons left from previous frame read them first // if there are photons left from previous frame read them first
if (nph) { if (nph) {
if (nph > n_clusters) { if (nph > n_clusters) {
@ -73,7 +73,7 @@ std::vector<Cluster3x3> ClusterFile::read_clusters(size_t n_clusters) {
nn = nph; nn = nph;
} }
nph_read += fread(reinterpret_cast<void *>(buf + nph_read), nph_read += fread(reinterpret_cast<void *>(buf + nph_read),
sizeof(Cluster3x3), nn, fp); clusters.item_size(), nn, fp);
m_num_left = nph - nn; // write back the number of photons left m_num_left = nph - nn; // write back the number of photons left
} }
@ -88,7 +88,7 @@ std::vector<Cluster3x3> ClusterFile::read_clusters(size_t n_clusters) {
nn = nph; nn = nph;
nph_read += fread(reinterpret_cast<void *>(buf + nph_read), nph_read += fread(reinterpret_cast<void *>(buf + nph_read),
sizeof(Cluster3x3), nn, fp); clusters.item_size(), nn, fp);
m_num_left = nph - nn; m_num_left = nph - nn;
} }
if (nph_read >= n_clusters) if (nph_read >= n_clusters)
@ -102,7 +102,7 @@ std::vector<Cluster3x3> ClusterFile::read_clusters(size_t n_clusters) {
return clusters; return clusters;
} }
std::vector<Cluster3x3> ClusterFile::read_frame(int32_t &out_fnum) { ClusterVector<int32_t> ClusterFile::read_frame() {
if (m_mode != "r") { if (m_mode != "r") {
throw std::runtime_error("File not opened for reading"); throw std::runtime_error("File not opened for reading");
} }
@ -110,8 +110,8 @@ std::vector<Cluster3x3> ClusterFile::read_frame(int32_t &out_fnum) {
throw std::runtime_error( throw std::runtime_error(
"There are still photons left in the last frame"); "There are still photons left in the last frame");
} }
int32_t frame_number;
if (fread(&out_fnum, sizeof(out_fnum), 1, fp) != 1) { if (fread(&frame_number, sizeof(frame_number), 1, fp) != 1) {
throw std::runtime_error("Could not read frame number"); throw std::runtime_error("Could not read frame number");
} }
@ -119,15 +119,19 @@ std::vector<Cluster3x3> ClusterFile::read_frame(int32_t &out_fnum) {
if (fread(&n_clusters, sizeof(n_clusters), 1, fp) != 1) { if (fread(&n_clusters, sizeof(n_clusters), 1, fp) != 1) {
throw std::runtime_error("Could not read number of clusters"); throw std::runtime_error("Could not read number of clusters");
} }
std::vector<Cluster3x3> clusters(n_clusters); // std::vector<Cluster3x3> clusters(n_clusters);
ClusterVector<int32_t> clusters(3, 3, n_clusters);
clusters.set_frame_number(frame_number);
if (fread(clusters.data(), sizeof(Cluster3x3), n_clusters, fp) != if (fread(clusters.data(), clusters.item_size(), n_clusters, fp) !=
static_cast<size_t>(n_clusters)) { static_cast<size_t>(n_clusters)) {
throw std::runtime_error("Could not read clusters"); throw std::runtime_error("Could not read clusters");
} }
clusters.resize(n_clusters);
return clusters; return clusters;
} }
std::vector<Cluster3x3> ClusterFile::read_cluster_with_cut(size_t n_clusters, std::vector<Cluster3x3> ClusterFile::read_cluster_with_cut(size_t n_clusters,
double *noise_map, double *noise_map,
int nx, int ny) { int nx, int ny) {

View File

@ -45,6 +45,8 @@ File& File::operator=(File &&other) noexcept {
return *this; return *this;
} }
// void File::close() { file_impl->close(); }
Frame File::read_frame() { return file_impl->read_frame(); } Frame File::read_frame() { return file_impl->read_frame(); }
Frame File::read_frame(size_t frame_index) { Frame File::read_frame(size_t frame_index) {
return file_impl->read_frame(frame_index); return file_impl->read_frame(frame_index);