froejdh_e 2025-01-08 16:45:24 +01:00
parent 21ce7a3efa
commit dc9e10016d
9 changed files with 661 additions and 34 deletions

View File aare/CircularFifo.hpp

@ -0,0 +1,97 @@
#pragma once
#include <atomic>
#include <chrono>
#include <cstdint>
#include <fmt/color.h>
#include <fmt/format.h>
#include <memory>
#include <thread>
#include "aare/ProducerConsumerQueue.hpp"
namespace aare {
template <class ItemType> class CircularFifo {
uint32_t fifo_size;
aare::ProducerConsumerQueue<ItemType> free_slots;
aare::ProducerConsumerQueue<ItemType> filled_slots;
public:
CircularFifo() : CircularFifo(100) {}
explicit CircularFifo(uint32_t size) : fifo_size(size), free_slots(size + 1), filled_slots(size + 1) {
// TODO! how do we deal with alignment for writing? alignas???
// Do we give the user a chance to provide memory locations?
// Templated allocator?
for (size_t i = 0; i < fifo_size; ++i) {
free_slots.write(ItemType{});
}
}
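// Recycle one item: take it from the filled queue and return it to the
// free queue so its storage can be reused. Returns false if no filled
// slot was available or the free queue was full.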
bool next() {
// TODO! avoid default constructing ItemType
ItemType it;
if (!filled_slots.read(it))
return false;
if (!free_slots.write(std::move(it)))
return false;
return true;
}
~CircularFifo() {}
using value_type = ItemType;
auto numFilledSlots() const noexcept { return filled_slots.sizeGuess(); }
auto numFreeSlots() const noexcept { return free_slots.sizeGuess(); }
auto isFull() const noexcept { return filled_slots.isFull(); }
ItemType pop_free() {
ItemType v;
while (!free_slots.read(v))
;
return v; // NRVO applies; std::move here would inhibit it
}
bool try_pop_free(ItemType &v) { return free_slots.read(v); }
ItemType pop_value(std::chrono::nanoseconds wait, std::atomic<bool> &stopped) {
ItemType v;
while (!filled_slots.read(v) && !stopped) {
std::this_thread::sleep_for(wait);
}
return v;
}
ItemType pop_value() {
ItemType v;
while (!filled_slots.read(v))
;
return v;
}
ItemType *frontPtr() { return filled_slots.frontPtr(); }
// TODO! Add function to move item from filled to free to be used
// with the frontPtr function
template <class... Args> void push_value(Args &&...recordArgs) {
while (!filled_slots.write(std::forward<Args>(recordArgs)...))
;
}
template <class... Args> bool try_push_value(Args &&...recordArgs) {
return filled_slots.write(std::forward<Args>(recordArgs)...);
}
template <class... Args> void push_free(Args &&...recordArgs) {
while (!free_slots.write(std::forward<Args>(recordArgs)...))
;
}
template <class... Args> bool try_push_free(Args &&...recordArgs) {
return free_slots.write(std::forward<Args>(recordArgs)...);
}
};
} // namespace aare
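For orientation, a minimal usage sketch of the recycling pattern (not part of the commit; the Frame struct, buffer size, and include path are illustrative):

// Producer takes pre-allocated items from the free queue, fills them and
// publishes them; the consumer processes them in place and recycles the
// storage with next().
#include <cstdint>
#include <vector>
#include "aare/CircularFifo.hpp"

struct Frame {
    std::vector<uint8_t> buffer; // allocation is reused as the item cycles
};

int main() {
    aare::CircularFifo<Frame> fifo(10); // pre-fills 10 default Frames

    // producer side
    Frame f = fifo.pop_free();
    f.buffer.assign(1024 * 1024, 0); // pretend this is detector data
    fifo.push_value(std::move(f));

    // consumer side
    if (Frame *front = fifo.frontPtr()) {
        // ... process front->buffer ...
    }
    fifo.next(); // recycle the processed item back to the free queue
}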

View File aare/ClusterCollector.hpp

@ -0,0 +1,52 @@
#pragma once
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
#include <fmt/format.h>
#include "aare/ClusterFinderMT.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ProducerConsumerQueue.hpp"
namespace aare {
class ClusterCollector{
ProducerConsumerQueue<ClusterVector<int>>* m_source;
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_stopped{true};
std::chrono::milliseconds m_default_wait{1};
std::thread m_thread;
std::vector<ClusterVector<int>> m_clusters;
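// Collector loop, runs on m_thread: drains the sink queue of the
// ClusterFinderMT until stop() has been requested AND the queue is empty,
// so clusters produced before the stop request are not lost.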
void process(){
m_stopped = false;
fmt::print("ClusterCollector started\n");
while (!m_stop_requested || !m_source->isEmpty()) {
if (ClusterVector<int> *clusters = m_source->frontPtr();
clusters != nullptr) {
m_clusters.push_back(std::move(*clusters));
m_source->popFront();
} else {
std::this_thread::sleep_for(m_default_wait);
}
}
fmt::print("ClusterCollector stopped\n");
m_stopped = true;
}
public:
ClusterCollector(ClusterFinderMT<uint16_t, double, int32_t>* source){
m_source = source->sink();
m_thread = std::thread(&ClusterCollector::process, this);
}
void stop(){
m_stop_requested = true;
m_thread.join();
}
std::vector<ClusterVector<int>> steal_clusters(){
if(!m_stopped){
throw std::runtime_error("ClusterCollector is still running");
}
return std::move(m_clusters);
}
};
} // namespace aare

View File aare/ClusterFinder.hpp

@ -10,26 +10,12 @@
namespace aare {
/** enum to define the event types */
enum class eventType {
PEDESTAL,            /** pedestal */
NEIGHBOUR,           /** neighbour, i.e. below threshold but part of the cluster of a photon */
PHOTON,              /** photon, i.e. above threshold */
PHOTON_MAX,          /** maximum of a cluster satisfying the photon conditions */
NEGATIVE_PEDESTAL,   /** negative value; not accounted for as pedestal, to avoid drift of the pedestal towards negative values */
UNDEFINED_EVENT = -1 /** undefined */
};
template <typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double,
typename CT = int32_t>
class ClusterFinder {
Shape<2> m_image_size;
const int m_cluster_sizeX;
const int m_cluster_sizeY;
// const PEDESTAL_TYPE m_threshold;
const PEDESTAL_TYPE m_nSigma;
const PEDESTAL_TYPE c2;
const PEDESTAL_TYPE c3;

View File aare/ClusterFinderMT.hpp

@ -0,0 +1,189 @@
#pragma once
#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <thread>
#include <vector>
#include <fmt/format.h>
#include "aare/ClusterFinder.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/NDArray.hpp"
#include "aare/ProducerConsumerQueue.hpp"
namespace aare {
enum class FrameType {
DATA,
PEDESTAL,
};
struct FrameWrapper {
FrameType type;
uint64_t frame_number;
NDArray<uint16_t, 2> data;
};
template <typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double,
typename CT = int32_t>
class ClusterFinderMT {
size_t m_current_thread{0};
size_t m_n_threads{0};
using Finder = ClusterFinder<FRAME_TYPE, PEDESTAL_TYPE, CT>;
using InputQueue = ProducerConsumerQueue<FrameWrapper>;
using OutputQueue = ProducerConsumerQueue<ClusterVector<int>>;
std::vector<std::unique_ptr<InputQueue>> m_input_queues;
std::vector<std::unique_ptr<OutputQueue>> m_output_queues;
OutputQueue m_sink{1000}; //All clusters go into this queue
std::vector<std::unique_ptr<Finder>> m_cluster_finders;
std::vector<std::thread> m_threads;
std::thread m_collect_thread;
std::chrono::milliseconds m_default_wait{1};
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_processing_threads_stopped{false};
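/**
 * @brief Worker loop for one processing thread: pops frames from its own
 * input queue, runs the single threaded ClusterFinder on them and pushes
 * the resulting clusters to the matching output queue.
 */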
void process(int thread_id) {
auto cf = m_cluster_finders[thread_id].get();
auto q = m_input_queues[thread_id].get();
// TODO! Avoid indexing into the vector every time
fmt::print("Thread {} started\n", thread_id);
//TODO! is this check enough to make sure we process all the frames?
while (!m_stop_requested || !q->isEmpty()) {
if (FrameWrapper *frame = q->frontPtr();
frame != nullptr) {
// fmt::print("Thread {} got frame {}, type: {}\n", thread_id,
// frame->frame_number, static_cast<int>(frame->type));
switch (frame->type) {
case FrameType::DATA: {
cf->find_clusters(frame->data.view());
// retry instead of silently dropping clusters when the output queue is full
auto clusters = cf->steal_clusters();
while (!m_output_queues[thread_id]->write(std::move(clusters))) {
std::this_thread::sleep_for(m_default_wait);
}
break;
}
case FrameType::PEDESTAL:
cf->push_pedestal_frame(frame->data.view());
break;
default:
break;
}
// frame is processed, now discard it
q->popFront();
} else {
std::this_thread::sleep_for(m_default_wait);
}
}
fmt::print("Thread {} stopped\n", thread_id);
}
/**
* @brief Collect all the clusters from the output queues and write them to the sink
*/
void collect() {
bool empty = true;
while (!m_stop_requested || !empty || !m_processing_threads_stopped) {
empty = true;
for (auto &queue : m_output_queues) {
if (!queue->isEmpty()) {
while (!m_sink.write(std::move(*queue->frontPtr()))) {
std::this_thread::sleep_for(m_default_wait);
}
queue->popFront();
empty = false;
}
}
if (empty) {
// nothing to forward, back off instead of busy-waiting
std::this_thread::sleep_for(m_default_wait);
}
}
}
public:
ClusterFinderMT(Shape<2> image_size, Shape<2> cluster_size,
PEDESTAL_TYPE nSigma = 5.0, size_t capacity = 2000,
size_t n_threads = 3)
: m_n_threads(n_threads) {
fmt::print("ClusterFinderMT: using {} threads\n", n_threads);
for (size_t i = 0; i < n_threads; i++) {
m_cluster_finders.push_back(
std::make_unique<ClusterFinder<FRAME_TYPE, PEDESTAL_TYPE, CT>>(
image_size, cluster_size, nSigma, capacity));
}
for (size_t i = 0; i < n_threads; i++) {
m_input_queues.emplace_back(std::make_unique<InputQueue>(200));
m_output_queues.emplace_back(std::make_unique<OutputQueue>(200));
}
start();
}
ProducerConsumerQueue<ClusterVector<int>> *sink() { return &m_sink; }
void start(){
for (size_t i = 0; i < m_n_threads; i++) {
m_threads.push_back(
std::thread(&ClusterFinderMT::process, this, i));
}
m_collect_thread = std::thread(&ClusterFinderMT::collect, this);
}
void stop() {
m_stop_requested = true;
for (auto &thread : m_threads) {
thread.join();
}
m_processing_threads_stopped = true;
m_collect_thread.join();
}
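/**
 * @brief Block until all input queues are empty. Note that frames already
 * popped by a worker may still be in flight inside its ClusterFinder.
 */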
void sync(){
for (auto &q : m_input_queues) {
while(!q->isEmpty()){
std::this_thread::sleep_for(m_default_wait);
}
}
}
void push_pedestal_frame(NDView<FRAME_TYPE, 2> frame) {
FrameWrapper fw{FrameType::PEDESTAL, 0, NDArray(frame)};
for (auto &queue : m_input_queues) {
while (!queue->write(fw)) {
// fmt::print("push_pedestal_frame: queue full\n");
std::this_thread::sleep_for(m_default_wait);
}
}
}
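/**
 * @brief Push a data frame to one of the worker threads, distributing
 * frames round robin over the input queues. Blocks while the queue is full.
 */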
void find_clusters(NDView<FRAME_TYPE, 2> frame) {
FrameWrapper fw{FrameType::DATA, 0, NDArray(frame)};
while (!m_input_queues[m_current_thread%m_n_threads]->write(fw)) {
std::this_thread::sleep_for(m_default_wait);
}
m_current_thread++;
}
ClusterVector<int> steal_clusters(bool realloc_same_capacity = false) {
ClusterVector<int> clusters(3, 3); // TODO! propagate cluster size and realloc_same_capacity
for (auto &finder : m_cluster_finders) {
clusters += finder->steal_clusters();
}
return clusters;
}
// void push(FrameWrapper&& frame) {
// //TODO! need to loop until we are successful
// auto rc = m_input_queue.write(std::move(frame));
// fmt::print("pushed frame {}\n", rc);
// }
};
} // namespace aare
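A minimal end-to-end sketch of the intended wiring (not part of the commit; next_frame is a hypothetical frame source):

#include "aare/ClusterCollector.hpp"
#include "aare/ClusterFinderMT.hpp"
using namespace aare;

NDView<uint16_t, 2> next_frame(); // hypothetical frame source

void run() {
    ClusterFinderMT<uint16_t, double, int32_t> cf({400, 400}, {3, 3});
    ClusterCollector collector(&cf); // starts draining cf.sink() immediately

    for (int i = 0; i < 1000; ++i)
        cf.push_pedestal_frame(next_frame()); // broadcast to every worker
    cf.sync(); // wait until the input queues are drained

    for (int i = 0; i < 100; ++i)
        cf.find_clusters(next_frame()); // round robin over the workers

    cf.stop();        // join the workers first so the sink stops growing
    collector.stop(); // then join the collector
    auto per_frame = collector.steal_clusters(); // one ClusterVector per data frame
}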

View File aare/ClusterVector.hpp

@ -22,6 +22,7 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
std::byte *m_data{};
size_t m_size{0};
size_t m_capacity;
uint64_t m_frame_number{0}; //TODO! Check frame number size and type
/*
Format string used in the python bindings to create a numpy
array from the buffer
@ -39,7 +40,7 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
* @param cluster_size_y size of the cluster in y direction
* @param capacity initial capacity of the buffer in number of clusters
*/
ClusterVector(size_t cluster_size_x, size_t cluster_size_y,
ClusterVector(size_t cluster_size_x = 3, size_t cluster_size_y = 3,
size_t capacity = 1024)
: m_cluster_size_x(cluster_size_x), m_cluster_size_y(cluster_size_y),
m_capacity(capacity) {
@ -108,7 +109,14 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
ptr);
m_size++;
}
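/**
 * @brief Append the clusters of another ClusterVector to this one
 * @warning The two vectors must hold clusters of the same size (not checked)
 */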
ClusterVector& operator+=(const ClusterVector& other){
if (m_size + other.m_size > m_capacity) {
allocate_buffer(m_capacity + other.m_size);
}
std::copy(other.m_data, other.m_data + other.m_size * element_offset(), m_data + m_size * element_offset());
m_size += other.m_size;
return *this;
}
/**
* @brief Sum the pixels in each cluster
@ -166,6 +174,9 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
return m_fmt_base;
}
uint64_t frame_number() const { return m_frame_number; }
void set_frame_number(uint64_t frame_number) { m_frame_number = frame_number; }
private:
void allocate_buffer(size_t new_capacity) {
size_t num_bytes = element_offset() * new_capacity;

View File aare/ProducerConsumerQueue.hpp

@ -0,0 +1,203 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)
// Changes made by PSD Detector Group:
// Copied: Line 34 constexpr std::size_t hardware_destructive_interference_size = 128; from folly/lang/Align.h
// Changed extension to .hpp
// Changed namespace to aare
#pragma once
#include <atomic>
#include <cassert>
#include <cstdlib>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>
constexpr std::size_t hardware_destructive_interference_size = 128;
namespace aare {
/*
* ProducerConsumerQueue is a one producer and one consumer queue
* without locks.
*/
template <class T> struct ProducerConsumerQueue {
typedef T value_type;
ProducerConsumerQueue(const ProducerConsumerQueue &) = delete;
ProducerConsumerQueue &operator=(const ProducerConsumerQueue &) = delete;
ProducerConsumerQueue(ProducerConsumerQueue &&other) {
size_ = other.size_;
records_ = other.records_;
other.records_ = nullptr;
readIndex_ = other.readIndex_.load(std::memory_order_acquire);
writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
// leave the moved-from queue valid and empty so its destructor is a no-op
other.readIndex_ = 0;
other.writeIndex_ = 0;
}
ProducerConsumerQueue &operator=(ProducerConsumerQueue &&other) {
if (this != &other) {
// destroy anything we still hold and release our own storage first
if (records_) {
while (!isEmpty())
popFront();
std::free(records_);
}
size_ = other.size_;
records_ = other.records_;
other.records_ = nullptr;
readIndex_ = other.readIndex_.load(std::memory_order_acquire);
writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
other.readIndex_ = 0;
other.writeIndex_ = 0;
}
return *this;
}
ProducerConsumerQueue() : ProducerConsumerQueue(2) {}
// size must be >= 2.
//
// Also, note that the number of usable slots in the queue at any
// given time is actually (size-1), so if you start with an empty queue,
// isFull() will return true after size-1 insertions.
explicit ProducerConsumerQueue(uint32_t size)
: size_(size), records_(static_cast<T *>(std::malloc(sizeof(T) * size))), readIndex_(0), writeIndex_(0) {
assert(size >= 2);
if (!records_) {
throw std::bad_alloc();
}
}
~ProducerConsumerQueue() {
// We need to destruct anything that may still exist in our queue.
// (No real synchronization needed at destructor time: only one
// thread can be doing this.)
if (!std::is_trivially_destructible<T>::value) {
size_t readIndex = readIndex_;
size_t endIndex = writeIndex_;
while (readIndex != endIndex) {
records_[readIndex].~T();
if (++readIndex == size_) {
readIndex = 0;
}
}
}
std::free(records_);
}
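// construct a new record in the next free slot from the given args;
// returns false if the queue is full (must be called from the producer
// thread only)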
template <class... Args> bool write(Args &&...recordArgs) {
auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
auto nextRecord = currentWrite + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
writeIndex_.store(nextRecord, std::memory_order_release);
return true;
}
// queue is full
return false;
}
// move (or copy) the value at the front of the queue to given variable
bool read(T &record) {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
// queue is empty
return false;
}
auto nextRecord = currentRead + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
record = std::move(records_[currentRead]);
records_[currentRead].~T();
readIndex_.store(nextRecord, std::memory_order_release);
return true;
}
// pointer to the value at the front of the queue (for use in-place) or
// nullptr if empty.
T *frontPtr() {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
// queue is empty
return nullptr;
}
return &records_[currentRead];
}
// queue must not be empty
void popFront() {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
assert(currentRead != writeIndex_.load(std::memory_order_acquire));
auto nextRecord = currentRead + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
records_[currentRead].~T();
readIndex_.store(nextRecord, std::memory_order_release);
}
bool isEmpty() const {
return readIndex_.load(std::memory_order_acquire) == writeIndex_.load(std::memory_order_acquire);
}
bool isFull() const {
auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
return false;
}
// queue is full
return true;
}
// * If called by consumer, then true size may be more (because producer may
// be adding items concurrently).
// * If called by producer, then true size may be less (because consumer may
// be removing items concurrently).
// * It is undefined to call this from any other thread.
size_t sizeGuess() const {
int ret = writeIndex_.load(std::memory_order_acquire) - readIndex_.load(std::memory_order_acquire);
if (ret < 0) {
ret += size_;
}
return ret;
}
// maximum number of items in the queue.
size_t capacity() const { return size_ - 1; }
private:
using AtomicIndex = std::atomic<unsigned int>;
char pad0_[hardware_destructive_interference_size];
// const uint32_t size_;
uint32_t size_;
// T *const records_;
T* records_;
alignas(hardware_destructive_interference_size) AtomicIndex readIndex_;
alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_;
char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
};
} // namespace aare
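A minimal single-producer/single-consumer sketch (not part of the commit; the integer payload is illustrative):

#include <thread>
#include "aare/ProducerConsumerQueue.hpp"

int main() {
    aare::ProducerConsumerQueue<int> q(128); // 127 usable slots

    std::thread producer([&] {
        for (int i = 0; i < 1000; ++i)
            while (!q.write(i)) // spins while the queue is full
                ;
    });
    std::thread consumer([&] {
        int v = 0;
        for (int n = 0; n < 1000;)
            if (q.read(v)) // false when the queue is empty
                ++n;
    });

    producer.join();
    consumer.join();
}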

View File (Python example script)

@ -13,36 +13,66 @@ from aare import File, ClusterFinder, VarClusterFinder
base = Path('/mnt/sls_det_storage/matterhorn_data/aare_test_data/')
f = File(base/'Moench03new/cu_half_speed_master_4.json')
from aare._aare import ClusterFinderMT, ClusterCollector

cf = ClusterFinderMT((400,400), (3,3), n_threads = 3)
collector = ClusterCollector(cf)

for i in range(1000):
    img = f.read_frame()
    cf.push_pedestal_frame(img)
print('Pedestal done')
cf.sync()

for i in range(100):
    img = f.read_frame()
    cf.find_clusters(img)

# time.sleep(1)
cf.stop()
collector.stop()
cv = collector.steal_clusters()
print(f'Processed {len(cv)} frames')
print('Done')

# previous single threaded version, kept for reference:
# cf = ClusterFinder((400,400), (3,3))
# for i in range(1000):
#     cf.push_pedestal_frame(f.read_frame())
# fig, ax = plt.subplots()
# im = ax.imshow(cf.pedestal())
# cf.pedestal()
# cf.noise()

# N = 500
# t0 = time.perf_counter()
# hist1 = bh.Histogram(bh.axis.Regular(40, -2, 4000))
# f.seek(0)
# t0 = time.perf_counter()
# data = f.read_n(N)
# t_elapsed = time.perf_counter()-t0
# n_bytes = data.itemsize*data.size
# print(f'Reading {N} frames took {t_elapsed:.3f}s {N/t_elapsed:.0f} FPS, {n_bytes/1024**2:.4f} GB/s')

# for frame in data:
#     a = cf.find_clusters(frame)
# clusters = cf.steal_clusters()
# t_elapsed = time.perf_counter()-t0
# print(f'Clustering {N} frames took {t_elapsed:.2f}s {N/t_elapsed:.0f} FPS')

View File

@ -1,5 +1,7 @@
#include "aare/ClusterFinder.hpp"
#include "aare/ClusterFinderMT.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ClusterCollector.hpp"
#include "aare/NDView.hpp"
#include "aare/Pedestal.hpp"
#include "np_helper.hpp"
@ -7,11 +9,13 @@
#include <cstdint>
#include <filesystem>
#include <pybind11/pybind11.h>
#include <pybind11/stl_bind.h>
#include <pybind11/stl.h>
namespace py = pybind11;
using pd_type = double;
template <typename T>
void define_cluster_vector(py::module &m, const std::string &typestr) {
auto class_name = fmt::format("ClusterVector_{}", typestr);
@ -31,6 +35,9 @@ void define_cluster_vector(py::module &m, const std::string &typestr) {
auto *vec = new std::vector<T>(self.sum());
return return_vector(vec);
})
.def_property_readonly("capacity", &ClusterVector<T>::capacity)
.def_property("frame_number", &ClusterVector<T>::frame_number,
&ClusterVector<T>::set_frame_number)
.def_buffer([typestr](ClusterVector<T> &self) -> py::buffer_info {
return py::buffer_info(
self.data(), /* Pointer to buffer */
@ -45,6 +52,55 @@ void define_cluster_vector(py::module &m, const std::string &typestr) {
});
}
void define_cluster_finder_mt_bindings(py::module &m) {
py::class_<ClusterFinderMT<uint16_t, pd_type>>(m, "ClusterFinderMT")
.def(py::init<Shape<2>, Shape<2>, pd_type, size_t, size_t>(),
py::arg("image_size"), py::arg("cluster_size"),
py::arg("n_sigma") = 5.0, py::arg("capacity") = 1000,
py::arg("n_threads") = 3)
.def("push_pedestal_frame",
[](ClusterFinderMT<uint16_t, pd_type> &self,
py::array_t<uint16_t> frame) {
auto view = make_view_2d(frame);
self.push_pedestal_frame(view);
})
.def("find_clusters",
[](ClusterFinderMT<uint16_t, pd_type> &self,
py::array_t<uint16_t> frame) {
auto view = make_view_2d(frame);
self.find_clusters(view);
return;
})
.def("sync", &ClusterFinderMT<uint16_t, pd_type>::sync)
.def(
"steal_clusters",
[](ClusterFinderMT<uint16_t, pd_type> &self,
bool realloc_same_capacity) {
auto v = new ClusterVector<int>(
self.steal_clusters(realloc_same_capacity));
return v;
},
py::arg("realloc_same_capacity") = false)
.def("stop", &ClusterFinderMT<uint16_t, pd_type>::stop);
}
void define_cluster_collector_bindings(py::module &m) {
py::class_<ClusterCollector>(m, "ClusterCollector")
.def(py::init<ClusterFinderMT<uint16_t, double, int32_t>*>())
.def("stop", &ClusterCollector::stop)
.def("steal_clusters",
[](ClusterCollector &self) {
auto v = new std::vector<ClusterVector<int>>(
self.steal_clusters());
return v;
}, py::return_value_policy::take_ownership);
}
void define_cluster_finder_bindings(py::module &m) {
py::class_<ClusterFinder<uint16_t, pd_type>>(m, "ClusterFinder")
.def(py::init<Shape<2>, Shape<2>, pd_type, size_t>(),

View File

@ -25,5 +25,8 @@ PYBIND11_MODULE(_aare, m) {
define_pedestal_bindings<double>(m, "Pedestal_d");
define_pedestal_bindings<float>(m, "Pedestal_f");
define_cluster_finder_bindings(m);
define_cluster_finder_mt_bindings(m);
define_cluster_file_io_bindings(m);
define_cluster_collector_bindings(m);
}