MultiThreaded Cluster finder

This commit is contained in:
froejdh_e 2025-01-09 16:53:22 +01:00
parent dc9e10016d
commit cc95561eda
16 changed files with 268 additions and 111 deletions

View File

@ -1,6 +1,6 @@
package: package:
name: aare name: aare
version: 2025.1.7.dev0 #TODO! how to not duplicate this? version: 2025.1.9.dev0 #TODO! how to not duplicate this?
source: source:

View File

@ -59,10 +59,9 @@ class ClusterFile {
ClusterFile(const std::filesystem::path &fname, size_t chunk_size = 1000, ClusterFile(const std::filesystem::path &fname, size_t chunk_size = 1000,
const std::string &mode = "r"); const std::string &mode = "r");
~ClusterFile(); ~ClusterFile();
std::vector<Cluster3x3> read_clusters(size_t n_clusters); ClusterVector<int32_t> read_clusters(size_t n_clusters);
std::vector<Cluster3x3> read_frame(int32_t &out_fnum); ClusterVector<int32_t> read_frame();
void write_frame(int32_t frame_number, void write_frame(const ClusterVector<int32_t> &clusters);
const ClusterVector<int32_t> &clusters);
std::vector<Cluster3x3> std::vector<Cluster3x3>
read_cluster_with_cut(size_t n_clusters, double *noise_map, int nx, int ny); read_cluster_with_cut(size_t n_clusters, double *noise_map, int nx, int ny);

View File

@ -0,0 +1,56 @@
#pragma once
#include <atomic>
#include <filesystem>
#include <thread>
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/ClusterVector.hpp"
#include "aare/ClusterFinderMT.hpp"
namespace aare{
class ClusterFileSink{
ProducerConsumerQueue<ClusterVector<int>>* m_source;
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_stopped{true};
std::chrono::milliseconds m_default_wait{1};
std::thread m_thread;
std::ofstream m_file;
void process(){
m_stopped = false;
fmt::print("ClusterFileSink started\n");
while (!m_stop_requested || !m_source->isEmpty()) {
if (ClusterVector<int> *clusters = m_source->frontPtr();
clusters != nullptr) {
// Write clusters to file
int32_t frame_number = clusters->frame_number(); //TODO! Should we store frame number already as int?
uint32_t num_clusters = clusters->size();
m_file.write(reinterpret_cast<const char*>(&frame_number), sizeof(frame_number));
m_file.write(reinterpret_cast<const char*>(&num_clusters), sizeof(num_clusters));
m_file.write(reinterpret_cast<const char*>(clusters->data()), clusters->size() * clusters->item_size());
m_source->popFront();
}else{
std::this_thread::sleep_for(m_default_wait);
}
}
fmt::print("ClusterFileSink stopped\n");
m_stopped = true;
}
public:
ClusterFileSink(ClusterFinderMT<uint16_t, double, int32_t>* source, const std::filesystem::path& fname){
m_source = source->sink();
m_thread = std::thread(&ClusterFileSink::process, this);
m_file.open(fname, std::ios::binary);
}
void stop(){
m_stop_requested = true;
m_thread.join();
m_file.close();
}
};
} // namespace aare

View File

@ -64,13 +64,13 @@ class ClusterFinder {
m_clusters = ClusterVector<CT>(m_cluster_sizeX, m_cluster_sizeY); m_clusters = ClusterVector<CT>(m_cluster_sizeX, m_cluster_sizeY);
return tmp; return tmp;
} }
void find_clusters(NDView<FRAME_TYPE, 2> frame) { void find_clusters(NDView<FRAME_TYPE, 2> frame, uint64_t frame_number = 0) {
// // TODO! deal with even size clusters // // TODO! deal with even size clusters
// // currently 3,3 -> +/- 1 // // currently 3,3 -> +/- 1
// // 4,4 -> +/- 2 // // 4,4 -> +/- 2
int dy = m_cluster_sizeY / 2; int dy = m_cluster_sizeY / 2;
int dx = m_cluster_sizeX / 2; int dx = m_cluster_sizeX / 2;
m_clusters.set_frame_number(frame_number);
std::vector<CT> cluster_data(m_cluster_sizeX * m_cluster_sizeY); std::vector<CT> cluster_data(m_cluster_sizeX * m_cluster_sizeY);
for (int iy = 0; iy < frame.shape(0); iy++) { for (int iy = 0; iy < frame.shape(0); iy++) {
for (int ix = 0; ix < frame.shape(1); ix++) { for (int ix = 0; ix < frame.shape(1); ix++) {

View File

@ -7,6 +7,7 @@
#include "aare/NDArray.hpp" #include "aare/NDArray.hpp"
#include "aare/ProducerConsumerQueue.hpp" #include "aare/ProducerConsumerQueue.hpp"
#include "aare/ClusterFinder.hpp"
namespace aare { namespace aare {
@ -21,7 +22,6 @@ struct FrameWrapper {
NDArray<uint16_t, 2> data; NDArray<uint16_t, 2> data;
}; };
template <typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double, template <typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double,
typename CT = int32_t> typename CT = int32_t>
class ClusterFinderMT { class ClusterFinderMT {
@ -32,10 +32,8 @@ class ClusterFinderMT {
using OutputQueue = ProducerConsumerQueue<ClusterVector<int>>; using OutputQueue = ProducerConsumerQueue<ClusterVector<int>>;
std::vector<std::unique_ptr<InputQueue>> m_input_queues; std::vector<std::unique_ptr<InputQueue>> m_input_queues;
std::vector<std::unique_ptr<OutputQueue>> m_output_queues; std::vector<std::unique_ptr<OutputQueue>> m_output_queues;
OutputQueue m_sink{1000}; //All clusters go into this queue
OutputQueue m_sink{1000}; // All clusters go into this queue
std::vector<std::unique_ptr<Finder>> m_cluster_finders; std::vector<std::unique_ptr<Finder>> m_cluster_finders;
std::vector<std::thread> m_threads; std::vector<std::thread> m_threads;
@ -43,26 +41,24 @@ class ClusterFinderMT {
std::chrono::milliseconds m_default_wait{1}; std::chrono::milliseconds m_default_wait{1};
std::atomic<bool> m_stop_requested{false}; std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_processing_threads_stopped{false}; std::atomic<bool> m_processing_threads_stopped{true};
void process(int thread_id) { void process(int thread_id) {
auto cf = m_cluster_finders[thread_id].get(); auto cf = m_cluster_finders[thread_id].get();
auto q = m_input_queues[thread_id].get(); auto q = m_input_queues[thread_id].get();
// TODO! Avoid indexing into the vector every time // TODO! Avoid indexing into the vector every time
fmt::print("Thread {} started\n", thread_id); fmt::print("Thread {} started\n", thread_id);
//TODO! is this check enough to make sure we process all the frames? // TODO! is this check enough to make sure we process all the frames?
while (!m_stop_requested || !q->isEmpty()) { while (!m_stop_requested || !q->isEmpty()) {
if (FrameWrapper *frame = q->frontPtr(); if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) {
frame != nullptr) {
// fmt::print("Thread {} got frame {}, type: {}\n", thread_id, // fmt::print("Thread {} got frame {}, type: {}\n", thread_id,
// frame->frame_number, static_cast<int>(frame->type)); // frame->frame_number, static_cast<int>(frame->type));
switch (frame->type) { switch (frame->type) {
case FrameType::DATA: case FrameType::DATA:
cf->find_clusters( cf->find_clusters(frame->data.view(), frame->frame_number);
frame->data.view());
m_output_queues[thread_id]->write(cf->steal_clusters()); m_output_queues[thread_id]->write(cf->steal_clusters());
break; break;
case FrameType::PEDESTAL: case FrameType::PEDESTAL:
@ -79,22 +75,22 @@ class ClusterFinderMT {
} else { } else {
std::this_thread::sleep_for(m_default_wait); std::this_thread::sleep_for(m_default_wait);
} }
} }
fmt::print("Thread {} stopped\n", thread_id); fmt::print("Thread {} stopped\n", thread_id);
} }
/** /**
* @brief Collect all the clusters from the output queues and write them to the sink * @brief Collect all the clusters from the output queues and write them to
* the sink
*/ */
void collect(){ void collect() {
bool empty = true; bool empty = true;
while(!m_stop_requested || !empty || !m_processing_threads_stopped){ while (!m_stop_requested || !empty || !m_processing_threads_stopped) {
empty = true; empty = true;
for (auto &queue : m_output_queues) { for (auto &queue : m_output_queues) {
if (!queue->isEmpty()) { if (!queue->isEmpty()) {
while(!m_sink.write(std::move(*queue->frontPtr()))){ while (!m_sink.write(std::move(*queue->frontPtr()))) {
std::this_thread::sleep_for(m_default_wait); std::this_thread::sleep_for(m_default_wait);
} }
queue->popFront(); queue->popFront();
@ -118,7 +114,6 @@ class ClusterFinderMT {
for (size_t i = 0; i < n_threads; i++) { for (size_t i = 0; i < n_threads; i++) {
m_input_queues.emplace_back(std::make_unique<InputQueue>(200)); m_input_queues.emplace_back(std::make_unique<InputQueue>(200));
m_output_queues.emplace_back(std::make_unique<OutputQueue>(200)); m_output_queues.emplace_back(std::make_unique<OutputQueue>(200));
} }
start(); start();
@ -126,14 +121,22 @@ class ClusterFinderMT {
ProducerConsumerQueue<ClusterVector<int>> *sink() { return &m_sink; } ProducerConsumerQueue<ClusterVector<int>> *sink() { return &m_sink; }
void start(){ /**
* @brief Start all threads
*/
void start() {
for (size_t i = 0; i < m_n_threads; i++) { for (size_t i = 0; i < m_n_threads; i++) {
m_threads.push_back( m_threads.push_back(
std::thread(&ClusterFinderMT::process, this, i)); std::thread(&ClusterFinderMT::process, this, i));
} }
m_processing_threads_stopped = false;
m_collect_thread = std::thread(&ClusterFinderMT::collect, this); m_collect_thread = std::thread(&ClusterFinderMT::collect, this);
} }
/**
* @brief Stop all threads
*/
void stop() { void stop() {
m_stop_requested = true; m_stop_requested = true;
for (auto &thread : m_threads) { for (auto &thread : m_threads) {
@ -143,42 +146,74 @@ class ClusterFinderMT {
m_collect_thread.join(); m_collect_thread.join();
} }
void sync(){ /**
* @brief Wait for all the queues to be empty
*/
void sync() {
for (auto &q : m_input_queues) { for (auto &q : m_input_queues) {
while(!q->isEmpty()){ while (!q->isEmpty()) {
std::this_thread::sleep_for(m_default_wait); std::this_thread::sleep_for(m_default_wait);
} }
} }
for (auto &q : m_output_queues) {
while (!q->isEmpty()) {
std::this_thread::sleep_for(m_default_wait);
}
}
while (!m_sink.isEmpty()) {
std::this_thread::sleep_for(m_default_wait);
}
} }
/**
* @brief Push a pedestal frame to all the cluster finders. The frames is
* expected to be dark. No photon finding is done. Just pedestal update.
*/
void push_pedestal_frame(NDView<FRAME_TYPE, 2> frame) { void push_pedestal_frame(NDView<FRAME_TYPE, 2> frame) {
FrameWrapper fw{FrameType::PEDESTAL, 0, NDArray(frame)}; FrameWrapper fw{FrameType::PEDESTAL, 0,
NDArray(frame)}; // TODO! copies the data!
for (auto &queue : m_input_queues) { for (auto &queue : m_input_queues) {
while (!queue->write(fw)) { while (!queue->write(fw)) {
// fmt::print("push_pedestal_frame: queue full\n");
std::this_thread::sleep_for(m_default_wait); std::this_thread::sleep_for(m_default_wait);
} }
} }
} }
void find_clusters(NDView<FRAME_TYPE, 2> frame) { /**
FrameWrapper fw{FrameType::DATA, 0, NDArray(frame)}; * @brief Push the frame to the queue of the next available thread. Function
while (!m_input_queues[m_current_thread%m_n_threads]->write(fw)) { * returns once the frame is in a queue.
* @note Spin locks with a default wait if the queue is full.
*/
void find_clusters(NDView<FRAME_TYPE, 2> frame, uint64_t frame_number = 0) {
FrameWrapper fw{FrameType::DATA, frame_number,
NDArray(frame)}; // TODO! copies the data!
while (!m_input_queues[m_current_thread % m_n_threads]->write(fw)) {
std::this_thread::sleep_for(m_default_wait); std::this_thread::sleep_for(m_default_wait);
} }
m_current_thread++; m_current_thread++;
} }
ClusterVector<int> steal_clusters(bool realloc_same_capacity = false) { auto pedestal() {
ClusterVector<int> clusters(3,3); if (m_cluster_finders.empty()) {
for (auto &finder : m_cluster_finders) { throw std::runtime_error("No cluster finders available");
clusters += finder->steal_clusters();
} }
return clusters; if(!m_processing_threads_stopped){
throw std::runtime_error("ClusterFinderMT is still running");
}
return m_cluster_finders[0]->pedestal();
}
auto noise() {
if (m_cluster_finders.empty()) {
throw std::runtime_error("No cluster finders available");
}
if(!m_processing_threads_stopped){
throw std::runtime_error("ClusterFinderMT is still running");
}
return m_cluster_finders[0]->noise();
} }
// void push(FrameWrapper&& frame) { // void push(FrameWrapper&& frame) {
// //TODO! need to loop until we are successful // //TODO! need to loop until we are successful
// auto rc = m_input_queue.write(std::move(frame)); // auto rc = m_input_queue.write(std::move(frame));

View File

@ -41,9 +41,9 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
* @param capacity initial capacity of the buffer in number of clusters * @param capacity initial capacity of the buffer in number of clusters
*/ */
ClusterVector(size_t cluster_size_x = 3, size_t cluster_size_y = 3, ClusterVector(size_t cluster_size_x = 3, size_t cluster_size_y = 3,
size_t capacity = 1024) size_t capacity = 1024, uint64_t frame_number = 0)
: m_cluster_size_x(cluster_size_x), m_cluster_size_y(cluster_size_y), : m_cluster_size_x(cluster_size_x), m_cluster_size_y(cluster_size_y),
m_capacity(capacity) { m_capacity(capacity), m_frame_number(frame_number) {
allocate_buffer(capacity); allocate_buffer(capacity);
} }
~ClusterVector() { ~ClusterVector() {
@ -55,7 +55,7 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
ClusterVector(ClusterVector &&other) noexcept ClusterVector(ClusterVector &&other) noexcept
: m_cluster_size_x(other.m_cluster_size_x), : m_cluster_size_x(other.m_cluster_size_x),
m_cluster_size_y(other.m_cluster_size_y), m_data(other.m_data), m_cluster_size_y(other.m_cluster_size_y), m_data(other.m_data),
m_size(other.m_size), m_capacity(other.m_capacity) { m_size(other.m_size), m_capacity(other.m_capacity), m_frame_number(other.m_frame_number) {
other.m_data = nullptr; other.m_data = nullptr;
other.m_size = 0; other.m_size = 0;
other.m_capacity = 0; other.m_capacity = 0;
@ -70,9 +70,11 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
m_data = other.m_data; m_data = other.m_data;
m_size = other.m_size; m_size = other.m_size;
m_capacity = other.m_capacity; m_capacity = other.m_capacity;
m_frame_number = other.m_frame_number;
other.m_data = nullptr; other.m_data = nullptr;
other.m_size = 0; other.m_size = 0;
other.m_capacity = 0; other.m_capacity = 0;
other.m_frame_number = 0;
} }
return *this; return *this;
} }
@ -147,6 +149,12 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
return 2*sizeof(CoordType) + return 2*sizeof(CoordType) +
m_cluster_size_x * m_cluster_size_y * sizeof(T); m_cluster_size_x * m_cluster_size_y * sizeof(T);
} }
/**
* @brief Return the size in bytes of a single cluster
*/
size_t item_size() const { return element_offset(); }
/** /**
* @brief Return the offset in bytes for the i-th cluster * @brief Return the offset in bytes for the i-th cluster
*/ */
@ -176,6 +184,12 @@ template <typename T, typename CoordType=int16_t> class ClusterVector {
uint64_t frame_number() const { return m_frame_number; } uint64_t frame_number() const { return m_frame_number; }
void set_frame_number(uint64_t frame_number) { m_frame_number = frame_number; } void set_frame_number(uint64_t frame_number) { m_frame_number = frame_number; }
void resize(size_t new_size) {
if (new_size > m_capacity) {
allocate_buffer(new_size);
}
m_size = new_size;
}
private: private:
void allocate_buffer(size_t new_capacity) { void allocate_buffer(size_t new_capacity) {

View File

@ -36,6 +36,8 @@ class File {
File(File &&other) noexcept; File(File &&other) noexcept;
File& operator=(File &&other) noexcept; File& operator=(File &&other) noexcept;
~File() = default; ~File() = default;
// void close(); //!< close the file
Frame read_frame(); //!< read one frame from the file at the current position Frame read_frame(); //!< read one frame from the file at the current position
Frame read_frame(size_t frame_index); //!< read one frame at the position given by frame number Frame read_frame(size_t frame_index); //!< read one frame at the position given by frame number

View File

@ -4,7 +4,7 @@ build-backend = "scikit_build_core.build"
[project] [project]
name = "aare" name = "aare"
version = "2025.1.7.dev0" version = "2025.1.9.dev0"
[tool.scikit-build] [tool.scikit-build]
cmake.verbose = true cmake.verbose = true

View File

@ -8,6 +8,8 @@ from ._aare import DetectorType
from ._aare import ClusterFile from ._aare import ClusterFile
from ._aare import hitmap from ._aare import hitmap
from ._aare import ClusterFinderMT, ClusterCollector, ClusterFileSink
from .CtbRawFile import CtbRawFile from .CtbRawFile import CtbRawFile
from .RawFile import RawFile from .RawFile import RawFile
from .ScanParameters import ScanParameters from .ScanParameters import ScanParameters

View File

@ -8,18 +8,19 @@ import numpy as np
import boost_histogram as bh import boost_histogram as bh
import time import time
from aare import File, ClusterFinder, VarClusterFinder from aare import File, ClusterFinder, VarClusterFinder, ClusterFile
base = Path('/mnt/sls_det_storage/matterhorn_data/aare_test_data/') base = Path('/mnt/sls_det_storage/matterhorn_data/aare_test_data/')
f = File(base/'Moench03new/cu_half_speed_master_4.json') f = File(base/'Moench03new/cu_half_speed_master_4.json')
from aare._aare import ClusterFinderMT, ClusterCollector from aare._aare import ClusterFinderMT, ClusterCollector, ClusterFileSink
cf = ClusterFinderMT((400,400), (3,3), n_threads = 3) cf = ClusterFinderMT((400,400), (3,3), n_threads = 3)
collector = ClusterCollector(cf) # collector = ClusterCollector(cf)
out_file = ClusterFileSink(cf, "test.clust")
for i in range(1000): for i in range(1000):
img = f.read_frame() img = f.read_frame()
@ -34,13 +35,13 @@ for i in range(100):
# time.sleep(1) # time.sleep(1)
cf.stop() cf.stop()
collector.stop() out_file.stop()
cv = collector.steal_clusters()
print(f'Processed {len(cv)} frames')
print('Done') print('Done')
cfile = ClusterFile("test.clust")
# cf = ClusterFinder((400,400), (3,3)) # cf = ClusterFinder((400,400), (3,3))

View File

@ -1,7 +1,8 @@
#include "aare/ClusterCollector.hpp"
#include "aare/ClusterFileSink.hpp"
#include "aare/ClusterFinder.hpp" #include "aare/ClusterFinder.hpp"
#include "aare/ClusterFinderMT.hpp" #include "aare/ClusterFinderMT.hpp"
#include "aare/ClusterVector.hpp" #include "aare/ClusterVector.hpp"
#include "aare/ClusterCollector.hpp"
#include "aare/NDView.hpp" #include "aare/NDView.hpp"
#include "aare/Pedestal.hpp" #include "aare/Pedestal.hpp"
#include "np_helper.hpp" #include "np_helper.hpp"
@ -9,13 +10,12 @@
#include <cstdint> #include <cstdint>
#include <filesystem> #include <filesystem>
#include <pybind11/pybind11.h> #include <pybind11/pybind11.h>
#include <pybind11/stl_bind.h>
#include <pybind11/stl.h> #include <pybind11/stl.h>
#include <pybind11/stl_bind.h>
namespace py = pybind11; namespace py = pybind11;
using pd_type = double; using pd_type = double;
template <typename T> template <typename T>
void define_cluster_vector(py::module &m, const std::string &typestr) { void define_cluster_vector(py::module &m, const std::string &typestr) {
auto class_name = fmt::format("ClusterVector_{}", typestr); auto class_name = fmt::format("ClusterVector_{}", typestr);
@ -64,42 +64,51 @@ void define_cluster_finder_mt_bindings(py::module &m) {
auto view = make_view_2d(frame); auto view = make_view_2d(frame);
self.push_pedestal_frame(view); self.push_pedestal_frame(view);
}) })
.def("find_clusters",
[](ClusterFinderMT<uint16_t, pd_type> &self,
py::array_t<uint16_t> frame) {
auto view = make_view_2d(frame);
self.find_clusters(view);
return;
})
.def("sync", &ClusterFinderMT<uint16_t, pd_type>::sync)
.def( .def(
"steal_clusters", "find_clusters",
[](ClusterFinderMT<uint16_t, pd_type> &self, [](ClusterFinderMT<uint16_t, pd_type> &self,
bool realloc_same_capacity) { py::array_t<uint16_t> frame, uint64_t frame_number) {
auto v = new ClusterVector<int>( auto view = make_view_2d(frame);
self.steal_clusters(realloc_same_capacity)); self.find_clusters(view, frame_number);
return v; return;
}, },
py::arg("realloc_same_capacity") = false) py::arg(), py::arg("frame_number") = 0)
.def("stop", &ClusterFinderMT<uint16_t, pd_type>::stop); .def("sync", &ClusterFinderMT<uint16_t, pd_type>::sync)
.def("stop", &ClusterFinderMT<uint16_t, pd_type>::stop)
.def_property_readonly("pedestal",
[](ClusterFinderMT<uint16_t, pd_type> &self) {
auto pd = new NDArray<pd_type, 2>{};
*pd = self.pedestal();
return return_image_data(pd);
})
.def_property_readonly("noise",
[](ClusterFinderMT<uint16_t, pd_type> &self) {
auto arr = new NDArray<pd_type, 2>{};
*arr = self.noise();
return return_image_data(arr);
});
} }
void define_cluster_collector_bindings(py::module &m) { void define_cluster_collector_bindings(py::module &m) {
py::class_<ClusterCollector>(m, "ClusterCollector") py::class_<ClusterCollector>(m, "ClusterCollector")
.def(py::init<ClusterFinderMT<uint16_t, double, int32_t>*>()) .def(py::init<ClusterFinderMT<uint16_t, double, int32_t> *>())
.def("stop", &ClusterCollector::stop) .def("stop", &ClusterCollector::stop)
.def("steal_clusters", .def(
[](ClusterCollector &self) { "steal_clusters",
auto v = new std::vector<ClusterVector<int>>( [](ClusterCollector &self) {
self.steal_clusters()); auto v =
return v; new std::vector<ClusterVector<int>>(self.steal_clusters());
}, py::return_value_policy::take_ownership); return v;
},
py::return_value_policy::take_ownership);
} }
void define_cluster_file_sink_bindings(py::module &m) {
py::class_<ClusterFileSink>(m, "ClusterFileSink")
.def(py::init<ClusterFinderMT<uint16_t, double, int32_t> *,
const std::filesystem::path &>())
.def("stop", &ClusterFileSink::stop);
}
void define_cluster_finder_bindings(py::module &m) { void define_cluster_finder_bindings(py::module &m) {
py::class_<ClusterFinder<uint16_t, pd_type>>(m, "ClusterFinder") py::class_<ClusterFinder<uint16_t, pd_type>>(m, "ClusterFinder")
@ -133,12 +142,15 @@ void define_cluster_finder_bindings(py::module &m) {
return v; return v;
}, },
py::arg("realloc_same_capacity") = false) py::arg("realloc_same_capacity") = false)
.def("find_clusters", [](ClusterFinder<uint16_t, pd_type> &self, .def(
py::array_t<uint16_t> frame) { "find_clusters",
auto view = make_view_2d(frame); [](ClusterFinder<uint16_t, pd_type> &self,
self.find_clusters(view); py::array_t<uint16_t> frame, uint64_t frame_number) {
return; auto view = make_view_2d(frame);
}); self.find_clusters(view);
return;
},
py::arg(), py::arg("frame_number") = 0);
m.def("hitmap", m.def("hitmap",
[](std::array<size_t, 2> image_size, ClusterVector<int32_t> &cv) { [](std::array<size_t, 2> image_size, ClusterVector<int32_t> &cv) {

View File

@ -28,16 +28,13 @@ void define_cluster_file_io_bindings(py::module &m) {
py::arg(), py::arg("chunk_size") = 1000, py::arg("mode") = "r") py::arg(), py::arg("chunk_size") = 1000, py::arg("mode") = "r")
.def("read_clusters", .def("read_clusters",
[](ClusterFile &self, size_t n_clusters) { [](ClusterFile &self, size_t n_clusters) {
auto *vec = auto v = new ClusterVector<int32_t>(self.read_clusters(n_clusters));
new std::vector<Cluster3x3>(self.read_clusters(n_clusters)); return v;
return return_vector(vec);
}) })
.def("read_frame", .def("read_frame",
[](ClusterFile &self) { [](ClusterFile &self) {
int32_t frame_number; auto v = new ClusterVector<int32_t>(self.read_frame());
auto *vec = return v;
new std::vector<Cluster3x3>(self.read_frame(frame_number));
return py::make_tuple(frame_number, return_vector(vec));
}) })
.def("write_frame", &ClusterFile::write_frame) .def("write_frame", &ClusterFile::write_frame)
.def("read_cluster_with_cut", .def("read_cluster_with_cut",
@ -59,12 +56,11 @@ void define_cluster_file_io_bindings(py::module &m) {
}) })
.def("__iter__", [](ClusterFile &self) { return &self; }) .def("__iter__", [](ClusterFile &self) { return &self; })
.def("__next__", [](ClusterFile &self) { .def("__next__", [](ClusterFile &self) {
auto vec = auto v = new ClusterVector<int32_t>(self.read_clusters(self.chunk_size()));
new std::vector<Cluster3x3>(self.read_clusters(self.chunk_size())); if (v->size() == 0) {
if (vec->size() == 0) {
throw py::stop_iteration(); throw py::stop_iteration();
} }
return return_vector(vec); return v;
}); });
m.def("calculate_eta2", []( aare::ClusterVector<int32_t> &clusters) { m.def("calculate_eta2", []( aare::ClusterVector<int32_t> &clusters) {

View File

@ -124,8 +124,41 @@ void define_file_io_bindings(py::module &m) {
self.read_into(reinterpret_cast<std::byte *>(image.mutable_data()), self.read_into(reinterpret_cast<std::byte *>(image.mutable_data()),
n_frames); n_frames);
return image; return image;
})
.def("__enter__", [](File &self) { return &self; })
.def("__exit__",
[](File &self,
const std::optional<pybind11::type> &exc_type,
const std::optional<pybind11::object> &exc_value,
const std::optional<pybind11::object> &traceback) {
// self.close();
})
.def("__iter__", [](File &self) { return &self; })
.def("__next__", [](File &self) {
try{
const uint8_t item_size = self.bytes_per_pixel();
py::array image;
std::vector<ssize_t> shape;
shape.reserve(2);
shape.push_back(self.rows());
shape.push_back(self.cols());
if (item_size == 1) {
image = py::array_t<uint8_t>(shape);
} else if (item_size == 2) {
image = py::array_t<uint16_t>(shape);
} else if (item_size == 4) {
image = py::array_t<uint32_t>(shape);
}
self.read_into(
reinterpret_cast<std::byte *>(image.mutable_data()));
return image;
}catch(std::runtime_error &e){
throw py::stop_iteration();
}
}); });
py::class_<FileConfig>(m, "FileConfig") py::class_<FileConfig>(m, "FileConfig")
.def(py::init<>()) .def(py::init<>())
.def_readwrite("rows", &FileConfig::rows) .def_readwrite("rows", &FileConfig::rows)

View File

@ -28,5 +28,6 @@ PYBIND11_MODULE(_aare, m) {
define_cluster_finder_mt_bindings(m); define_cluster_finder_mt_bindings(m);
define_cluster_file_io_bindings(m); define_cluster_file_io_bindings(m);
define_cluster_collector_bindings(m); define_cluster_collector_bindings(m);
define_cluster_file_sink_bindings(m);
} }

View File

@ -34,8 +34,7 @@ void ClusterFile::close() {
} }
} }
void ClusterFile::write_frame(int32_t frame_number, void ClusterFile::write_frame(const ClusterVector<int32_t> &clusters) {
const ClusterVector<int32_t> &clusters) {
if (m_mode != "w") { if (m_mode != "w") {
throw std::runtime_error("File not opened for writing"); throw std::runtime_error("File not opened for writing");
} }
@ -43,26 +42,27 @@ void ClusterFile::write_frame(int32_t frame_number,
!(clusters.cluster_size_y() == 3)) { !(clusters.cluster_size_y() == 3)) {
throw std::runtime_error("Only 3x3 clusters are supported"); throw std::runtime_error("Only 3x3 clusters are supported");
} }
int32_t frame_number = clusters.frame_number();
fwrite(&frame_number, sizeof(frame_number), 1, fp); fwrite(&frame_number, sizeof(frame_number), 1, fp);
uint32_t n_clusters = clusters.size(); uint32_t n_clusters = clusters.size();
fwrite(&n_clusters, sizeof(n_clusters), 1, fp); fwrite(&n_clusters, sizeof(n_clusters), 1, fp);
fwrite(clusters.data(), clusters.element_offset(), clusters.size(), fp); fwrite(clusters.data(), clusters.element_offset(), clusters.size(), fp);
// write clusters
// fwrite(clusters.data(), sizeof(Cluster), clusters.size(), fp);
} }
std::vector<Cluster3x3> ClusterFile::read_clusters(size_t n_clusters) { ClusterVector<int32_t> ClusterFile::read_clusters(size_t n_clusters) {
if (m_mode != "r") { if (m_mode != "r") {
throw std::runtime_error("File not opened for reading"); throw std::runtime_error("File not opened for reading");
} }
std::vector<Cluster3x3> clusters(n_clusters);
ClusterVector<int32_t> clusters(3,3, n_clusters);
int32_t iframe = 0; // frame number needs to be 4 bytes! int32_t iframe = 0; // frame number needs to be 4 bytes!
size_t nph_read = 0; size_t nph_read = 0;
uint32_t nn = m_num_left; uint32_t nn = m_num_left;
uint32_t nph = m_num_left; // number of clusters in frame needs to be 4 uint32_t nph = m_num_left; // number of clusters in frame needs to be 4
auto buf = reinterpret_cast<Cluster3x3 *>(clusters.data()); // auto buf = reinterpret_cast<Cluster3x3 *>(clusters.data());
auto buf = clusters.data();
// if there are photons left from previous frame read them first // if there are photons left from previous frame read them first
if (nph) { if (nph) {
if (nph > n_clusters) { if (nph > n_clusters) {
@ -73,7 +73,7 @@ std::vector<Cluster3x3> ClusterFile::read_clusters(size_t n_clusters) {
nn = nph; nn = nph;
} }
nph_read += fread(reinterpret_cast<void *>(buf + nph_read), nph_read += fread(reinterpret_cast<void *>(buf + nph_read),
sizeof(Cluster3x3), nn, fp); clusters.item_size(), nn, fp);
m_num_left = nph - nn; // write back the number of photons left m_num_left = nph - nn; // write back the number of photons left
} }
@ -88,7 +88,7 @@ std::vector<Cluster3x3> ClusterFile::read_clusters(size_t n_clusters) {
nn = nph; nn = nph;
nph_read += fread(reinterpret_cast<void *>(buf + nph_read), nph_read += fread(reinterpret_cast<void *>(buf + nph_read),
sizeof(Cluster3x3), nn, fp); clusters.item_size(), nn, fp);
m_num_left = nph - nn; m_num_left = nph - nn;
} }
if (nph_read >= n_clusters) if (nph_read >= n_clusters)
@ -102,7 +102,7 @@ std::vector<Cluster3x3> ClusterFile::read_clusters(size_t n_clusters) {
return clusters; return clusters;
} }
std::vector<Cluster3x3> ClusterFile::read_frame(int32_t &out_fnum) { ClusterVector<int32_t> ClusterFile::read_frame() {
if (m_mode != "r") { if (m_mode != "r") {
throw std::runtime_error("File not opened for reading"); throw std::runtime_error("File not opened for reading");
} }
@ -110,8 +110,8 @@ std::vector<Cluster3x3> ClusterFile::read_frame(int32_t &out_fnum) {
throw std::runtime_error( throw std::runtime_error(
"There are still photons left in the last frame"); "There are still photons left in the last frame");
} }
int32_t frame_number;
if (fread(&out_fnum, sizeof(out_fnum), 1, fp) != 1) { if (fread(&frame_number, sizeof(frame_number), 1, fp) != 1) {
throw std::runtime_error("Could not read frame number"); throw std::runtime_error("Could not read frame number");
} }
@ -119,15 +119,19 @@ std::vector<Cluster3x3> ClusterFile::read_frame(int32_t &out_fnum) {
if (fread(&n_clusters, sizeof(n_clusters), 1, fp) != 1) { if (fread(&n_clusters, sizeof(n_clusters), 1, fp) != 1) {
throw std::runtime_error("Could not read number of clusters"); throw std::runtime_error("Could not read number of clusters");
} }
std::vector<Cluster3x3> clusters(n_clusters); // std::vector<Cluster3x3> clusters(n_clusters);
ClusterVector<int32_t> clusters(3, 3, n_clusters);
clusters.set_frame_number(frame_number);
if (fread(clusters.data(), sizeof(Cluster3x3), n_clusters, fp) != if (fread(clusters.data(), clusters.item_size(), n_clusters, fp) !=
static_cast<size_t>(n_clusters)) { static_cast<size_t>(n_clusters)) {
throw std::runtime_error("Could not read clusters"); throw std::runtime_error("Could not read clusters");
} }
clusters.resize(n_clusters);
return clusters; return clusters;
} }
std::vector<Cluster3x3> ClusterFile::read_cluster_with_cut(size_t n_clusters, std::vector<Cluster3x3> ClusterFile::read_cluster_with_cut(size_t n_clusters,
double *noise_map, double *noise_map,
int nx, int ny) { int nx, int ny) {

View File

@ -45,6 +45,8 @@ File& File::operator=(File &&other) noexcept {
return *this; return *this;
} }
// void File::close() { file_impl->close(); }
Frame File::read_frame() { return file_impl->read_frame(); } Frame File::read_frame() { return file_impl->read_frame(); }
Frame File::read_frame(size_t frame_index) { Frame File::read_frame(size_t frame_index) {
return file_impl->read_frame(frame_index); return file_impl->read_frame(frame_index);