Inital implementation of multithreading for chunked pedestal

This commit is contained in:
2025-08-12 11:58:21 +02:00
parent 29a2374446
commit 7ea20c6b9d
4 changed files with 50 additions and 10 deletions

View File

@@ -20,9 +20,15 @@ enum class FrameType {
struct FrameWrapper { struct FrameWrapper {
FrameType type; FrameType type;
uint64_t frame_number; uint64_t frame_number;
// NDArray<T, 2> data;
NDArray<uint16_t, 2> data; NDArray<uint16_t, 2> data;
// NDArray<double, 2> data;
// void* data_ptr;
// std::type_index data_type;
uint32_t chunk_number;
}; };
/** /**
* @brief ClusterFinderMT is a multi-threaded version of ClusterFinder. It uses * @brief ClusterFinderMT is a multi-threaded version of ClusterFinder. It uses
* a producer-consumer queue to distribute the frames to the threads. The * a producer-consumer queue to distribute the frames to the threads. The
@@ -68,6 +74,7 @@ class ClusterFinderMT {
while (!m_stop_requested || !q->isEmpty()) { while (!m_stop_requested || !q->isEmpty()) {
if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) { if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) {
switch (frame->type) { switch (frame->type) {
case FrameType::DATA: case FrameType::DATA:
cf->find_clusters(frame->data.view(), frame->frame_number); cf->find_clusters(frame->data.view(), frame->frame_number);
@@ -121,7 +128,8 @@ class ClusterFinderMT {
* @param n_threads number of threads to use * @param n_threads number of threads to use
*/ */
ClusterFinderMT(Shape<2> image_size, PEDESTAL_TYPE nSigma = 5.0, ClusterFinderMT(Shape<2> image_size, PEDESTAL_TYPE nSigma = 5.0,
size_t capacity = 2000, size_t n_threads = 3) size_t capacity = 2000, size_t n_threads = 3,
uint32_t chunk_size = 50000, uint32_t n_chunks = 10)
: m_n_threads(n_threads) { : m_n_threads(n_threads) {
LOG(logDEBUG1) << "ClusterFinderMT: " LOG(logDEBUG1) << "ClusterFinderMT: "
@@ -134,7 +142,7 @@ class ClusterFinderMT {
m_cluster_finders.push_back( m_cluster_finders.push_back(
std::make_unique< std::make_unique<
ClusterFinder<ClusterType, FRAME_TYPE, PEDESTAL_TYPE>>( ClusterFinder<ClusterType, FRAME_TYPE, PEDESTAL_TYPE>>(
image_size, nSigma, capacity)); image_size, nSigma, capacity, chunk_size, n_chunks));
} }
for (size_t i = 0; i < n_threads; i++) { for (size_t i = 0; i < n_threads; i++) {
m_input_queues.emplace_back(std::make_unique<InputQueue>(200)); m_input_queues.emplace_back(std::make_unique<InputQueue>(200));
@@ -208,7 +216,7 @@ class ClusterFinderMT {
*/ */
void push_pedestal_frame(NDView<FRAME_TYPE, 2> frame) { void push_pedestal_frame(NDView<FRAME_TYPE, 2> frame) {
FrameWrapper fw{FrameType::PEDESTAL, 0, FrameWrapper fw{FrameType::PEDESTAL, 0,
NDArray(frame)}; // TODO! copies the data! NDArray(frame), 0}; // TODO! copies the data!
for (auto &queue : m_input_queues) { for (auto &queue : m_input_queues) {
while (!queue->write(fw)) { while (!queue->write(fw)) {
@@ -217,6 +225,23 @@ class ClusterFinderMT {
} }
} }
void push_pedestal_mean(NDView<PEDESTAL_TYPE, 2> frame, uint32_t chunk_number) {
if (!m_processing_threads_stopped) {
throw std::runtime_error("ClusterFinderMT is still running");
}
for (auto &cf : m_cluster_finders) {
cf->push_pedestal_mean(frame, chunk_number);
}
}
void push_pedestal_std(NDView<PEDESTAL_TYPE, 2> frame, uint32_t chunk_number) {
if (!m_processing_threads_stopped) {
throw std::runtime_error("ClusterFinderMT is still running");
}
for (auto &cf : m_cluster_finders) {
cf->push_pedestal_std(frame, chunk_number);
}
}
/** /**
* @brief Push the frame to the queue of the next available thread. Function * @brief Push the frame to the queue of the next available thread. Function
* returns once the frame is in a queue. * returns once the frame is in a queue.
@@ -224,7 +249,7 @@ class ClusterFinderMT {
*/ */
void find_clusters(NDView<FRAME_TYPE, 2> frame, uint64_t frame_number = 0) { void find_clusters(NDView<FRAME_TYPE, 2> frame, uint64_t frame_number = 0) {
FrameWrapper fw{FrameType::DATA, frame_number, FrameWrapper fw{FrameType::DATA, frame_number,
NDArray(frame)}; // TODO! copies the data! NDArray(frame), 0}; // TODO! copies the data!
while (!m_input_queues[m_current_thread % m_n_threads]->write(fw)) { while (!m_input_queues[m_current_thread % m_n_threads]->write(fw)) {
std::this_thread::sleep_for(m_default_wait); std::this_thread::sleep_for(m_default_wait);
} }

View File

@@ -36,14 +36,14 @@ def ClusterFinder(image_size, cluster_size, n_sigma=5, dtype = np.int32, capacit
def ClusterFinderMT(image_size, cluster_size = (3,3), dtype=np.int32, n_sigma=5, capacity = 1024, n_threads = 3): def ClusterFinderMT(image_size, cluster_size = (3,3), dtype=np.int32, n_sigma=5, capacity = 1024, n_threads = 3, chunk_size=50000, n_chunks = 10):
""" """
Factory function to create a ClusterFinderMT object. Provides a cleaner syntax for Factory function to create a ClusterFinderMT object. Provides a cleaner syntax for
the templated ClusterFinderMT in C++. the templated ClusterFinderMT in C++.
""" """
cls = _get_class("ClusterFinderMT", cluster_size, dtype) cls = _get_class("ClusterFinderMT", cluster_size, dtype)
return cls(image_size, n_sigma=n_sigma, capacity=capacity, n_threads=n_threads) return cls(image_size, n_sigma=n_sigma, capacity=capacity, n_threads=n_threads, chunk_size=chunk_size, n_chunks=n_chunks)

View File

@@ -30,15 +30,30 @@ void define_ClusterFinderMT(py::module &m, const std::string &typestr) {
py::class_<ClusterFinderMT<ClusterType, uint16_t, pd_type>>( py::class_<ClusterFinderMT<ClusterType, uint16_t, pd_type>>(
m, class_name.c_str()) m, class_name.c_str())
.def(py::init<Shape<2>, pd_type, size_t, size_t>(), .def(py::init<Shape<2>, pd_type, size_t, size_t, uint32_t, uint32_t>(),
py::arg("image_size"), py::arg("n_sigma") = 5.0, py::arg("image_size"), py::arg("n_sigma") = 5.0,
py::arg("capacity") = 2048, py::arg("n_threads") = 3) py::arg("capacity") = 2048, py::arg("n_threads") = 3,
py::arg("chunk_size") = 50'000, py::arg("n_chunks") = 10)
.def("push_pedestal_frame", .def("push_pedestal_frame",
[](ClusterFinderMT<ClusterType, uint16_t, pd_type> &self, [](ClusterFinderMT<ClusterType, uint16_t, pd_type> &self,
py::array_t<uint16_t> frame) { py::array_t<uint16_t> frame) {
auto view = make_view_2d(frame); auto view = make_view_2d(frame);
self.push_pedestal_frame(view); self.push_pedestal_frame(view);
}) })
.def("push_pedestal_mean",
[](ClusterFinderMT<ClusterType, uint16_t, pd_type> &self,
py::array_t<double> frame, uint32_t chunk_number) {
auto view = make_view_2d(frame);
self.push_pedestal_mean(view, chunk_number);
})
.def("push_pedestal_std",
[](ClusterFinderMT<ClusterType, uint16_t, pd_type> &self,
py::array_t<double> frame, uint32_t chunk_number) {
auto view = make_view_2d(frame);
self.push_pedestal_std(view, chunk_number);
})
.def( .def(
"find_clusters", "find_clusters",
[](ClusterFinderMT<ClusterType, uint16_t, pd_type> &self, [](ClusterFinderMT<ClusterType, uint16_t, pd_type> &self,