From 403d10b668cd561e5d709e430335ef43cda4ee1f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Fr=C3=B6jdh?=
Date: Tue, 16 Jul 2024 18:43:23 +0200
Subject: [PATCH] Cpp multi example (#84)

- Small modifications to the ProducerConsumerQueue to allow for storage in a vector
- Example showing multithreading in C++ using queues
- Fixes for Python bindings

---------

Co-authored-by: Bechir
Co-authored-by: Bechir Braham
---
 examples/CMakeLists.txt                     |   2 +-
 examples/clustering.cpp                     | 151 ++++++++++++++++++
 src/core/include/aare/core/MultiThread.hpp  |  30 ++++
 .../aare/core/ProducerConsumerQueue.hpp     |  25 ++-
 src/python/cpp/core.hpp                     |  19 +++
 src/python/cpp/file_io.hpp                  |   2 +-
 src/python/cpp/processing.hpp               |  40 ++++-
 src/python/example/mt_clusterFinder.py      |  40 +++++
 8 files changed, 300 insertions(+), 9 deletions(-)
 create mode 100644 examples/clustering.cpp
 create mode 100644 src/core/include/aare/core/MultiThread.hpp
 create mode 100644 src/python/example/mt_clusterFinder.py

diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 8007bc3..c7514dd 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -3,7 +3,7 @@ set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_examp
 set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example;zmq_sender_example;")
 set(EXAMPLE_LIST "${EXAMPLE_LIST};cluster_example;zmq_multi_receiver;zmq_worker;zmq_sink")
 set(EXAMPLE_LIST "${EXAMPLE_LIST};zmq_task_ventilator;zmq_restream_example;zmq_receiver_example")
-set(EXAMPLE_LIST "${EXAMPLE_LIST};testing_pedestal;cluster_finder_example;reorder_moench_example")
+set(EXAMPLE_LIST "${EXAMPLE_LIST};testing_pedestal;cluster_finder_example;reorder_moench_example;clustering")
 
 foreach(example ${EXAMPLE_LIST})
     add_executable(${example} ${example}.cpp)
diff --git a/examples/clustering.cpp b/examples/clustering.cpp
new file mode 100644
index 0000000..68c2d41
--- /dev/null
+++ b/examples/clustering.cpp
@@ -0,0 +1,151 @@
+
+#include "aare/file_io.hpp"
+#include "aare/processing/ClusterFinder.hpp"
+#include "aare/processing/Pedestal.hpp"
+
+#include <chrono>
+#include <fmt/format.h>
+using namespace std::chrono;
+#include "aare/core/ProducerConsumerQueue.hpp"
+#include <filesystem>
+#include <thread>
+
+using Queue = folly::ProducerConsumerQueue<aare::Frame>;
+
+// Global constants, for easy tweaking
+constexpr size_t print_interval = 100;
+constexpr std::chrono::milliseconds default_wait(1);
+constexpr uint32_t queue_size = 1000;
+constexpr int n_frames = 8000;
+
+// Small wrapper to do cluster finding in a thread,
+// helps with keeping track of stopping tokens etc.
+class ThreadedClusterFinder {
+    std::atomic<bool> m_stop_requested = false;
+    std::atomic<size_t> m_frames_processed = 0;
+    Queue *m_queue = nullptr;
+    aare::Pedestal m_pedestal;
+    int m_object_id;
+
+  public:
+    ThreadedClusterFinder(Queue &q, aare::Pedestal pd, int id) : m_queue(&q), m_pedestal(pd), m_object_id(id) {}
+
+    size_t frames_processed() const { return m_frames_processed; }
+    void request_stop() {
+        fmt::print("{}:Stop requested\n", m_object_id);
+        m_stop_requested = true;
+    }
+
+    void find_clusters() {
+        aare::ClusterFinder cf(3, 3, 5, 0);
+        while (!m_stop_requested) {
+            aare::Frame frame(1, 1, aare::Dtype("u4"));
+            if (m_queue->read(frame)) {
+                auto clusters = cf.find_clusters_without_threshold(frame.view<uint32_t>(), m_pedestal, false);
+                m_frames_processed++;
+                if (m_frames_processed % print_interval == 0) {
+                    fmt::print("{}:Found {} clusters\n", m_object_id, clusters.size());
+                }
+            } else {
+                // fmt::print("{}:Queue empty\n", m_object_id);
+                std::this_thread::sleep_for(default_wait);
+            }
+        }
+        fmt::print("{}:Done\n", m_object_id);
+    }
+};
+
+int main(int argc, char **argv) {
+    // Rudimentary argument parsing
+    if (argc != 3) {
+        fmt::print("Usage: {} <filename> <n_threads>\n", argv[0]);
+        return 1;
+    }
+
+    std::filesystem::path fname(argv[1]);
+    fmt::print("Loading {}\n", fname.string());
+    const int n_threads = std::stoi(argv[2]);
+
+    aare::Pedestal pd(400, 400, 1000);
+    aare::File f(fname, "r");
+
+    // Use the first 1000 frames to calculate the pedestal,
+    // we can then copy this pedestal to each thread
+    auto t0 = high_resolution_clock::now();
+    for (int i = 0; i < 1000; ++i) {
+        aare::Frame frame = f.iread(i);
+        pd.push(frame);
+    }
+    auto t1 = high_resolution_clock::now();
+    fmt::print("Pedestal run took: {}s\n", duration_cast<microseconds>(t1 - t0).count() / 1e6);
+
+    //---------------------------------------------------------------------------------------------
+    //---------------------- Now let's start with the setup for the threaded cluster finding
+
+    // We need one queue per thread...
+    std::vector<Queue> queues;
+    for (int i = 0; i < n_threads; ++i) {
+        queues.emplace_back(queue_size);
+    }
+
+    // and also one cluster finder per thread
+    std::vector<std::unique_ptr<ThreadedClusterFinder>> cluster_finders;
+    for (int i = 0; i < n_threads; ++i) {
+        cluster_finders.push_back(std::make_unique<ThreadedClusterFinder>(queues[i], pd, i));
+    }
+
+    // next we start the threads
+    std::vector<std::thread> threads;
+    for (int i = 0; i < n_threads; ++i) {
+        threads.emplace_back(&ThreadedClusterFinder::find_clusters, cluster_finders[i].get());
+    }
+
+    // Push frames to the queues
+    for (int i = 0; i < n_frames; ++i) {
+        // if the Queue is full, wait, there are better ways to do this =)
+        while (queues[i % n_threads].isFull()) {
+            // fmt::print("Queue {} is full, waiting\n", i % n_threads);
+            std::this_thread::sleep_for(default_wait);
+        }
+        queues[i % n_threads].write(f.iread(i + 1000));
+        if (i % 100 == 0) {
+            fmt::print("Pushed frame {}\n", i);
+        }
+    }
+
+    // wait for all queues to be empty
+    for (auto &q : queues) {
+        while (!q.isEmpty()) {
+            // fmt::print("Finish Queue not empty, waiting\n");
+            std::this_thread::sleep_for(default_wait);
+        }
+    }
+
+    // and once empty we stop the cluster finders
+    for (auto &cf : cluster_finders) {
+        cf->request_stop();
+    }
+    for (auto &t : threads) {
+        t.join();
+    }
+
+    size_t total_frames = 0;
+    for (auto &cf : cluster_finders) {
+        total_frames += cf->frames_processed();
+    }
+    auto t2 = high_resolution_clock::now();
+    fmt::print("Processed {} frames in {}s\n", total_frames, duration_cast<microseconds>(t2 - t1).count() / 1e6);
+
+    // auto start = high_resolution_clock::now();
+    // aare::ClusterFinder cf(3, 3, 5, 0);
+    // for (int i = 1000; i < 2000; ++i) {
+    //     aare::Frame frame = f.iread(i);
+    //     auto clusters = cf.find_clusters_without_threshold(frame.view(), pd, true);
+    // }
+
+    // auto stop = high_resolution_clock::now();
+    // auto duration = duration_cast<microseconds>(stop - start);
+    // fmt::print("Run took: {}s\n", duration.count() / 1e6);
+}
\ No newline at end of file
diff --git a/src/core/include/aare/core/MultiThread.hpp b/src/core/include/aare/core/MultiThread.hpp
new file mode 100644
index 0000000..26bbba6
--- /dev/null
+++ b/src/core/include/aare/core/MultiThread.hpp
@@ -0,0 +1,30 @@
+// class that takes a vector of std::function as a constructor argument
+// and runs each of them in a different thread
+
+#pragma once
+
+#include <functional>
+#include <thread>
+#include <vector>
+
+namespace aare {
+
+class MultiThread {
+  public:
+    explicit MultiThread(std::vector<std::function<void()>> const &functions) : functions_(functions) {}
+
+    void run() {
+        std::vector<std::thread> threads;
+        for (auto const &f : functions_) {
+            threads.emplace_back(f);
+        }
+        for (auto &t : threads) {
+            t.join();
+        }
+    }
+
+  private:
+    std::vector<std::function<void()>> functions_;
+};
+
+} // namespace aare
\ No newline at end of file
diff --git a/src/core/include/aare/core/ProducerConsumerQueue.hpp b/src/core/include/aare/core/ProducerConsumerQueue.hpp
index 0ac71df..6ba3d23 100644
--- a/src/core/include/aare/core/ProducerConsumerQueue.hpp
+++ b/src/core/include/aare/core/ProducerConsumerQueue.hpp
@@ -44,6 +44,25 @@ template <class T> struct ProducerConsumerQueue {
     ProducerConsumerQueue(const ProducerConsumerQueue &) = delete;
     ProducerConsumerQueue &operator=(const ProducerConsumerQueue &) = delete;
+
+    ProducerConsumerQueue(ProducerConsumerQueue &&other) {
+        size_ = other.size_;
+        records_ = other.records_;
+        other.records_ = nullptr;
+        readIndex_ = other.readIndex_.load(std::memory_order_acquire);
+        writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
+    }
+    ProducerConsumerQueue &operator=(ProducerConsumerQueue &&other) {
+        size_ = other.size_;
+        records_ = other.records_;
+        other.records_ = nullptr;
+        readIndex_ = other.readIndex_.load(std::memory_order_acquire);
+        writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
+        return *this;
+    }
+
+
+    ProducerConsumerQueue() : ProducerConsumerQueue(2){};
     // size must be >= 2.
     //
     // Also, note that the number of usable slots in the queue at any
@@ -169,8 +188,10 @@ template <class T> struct ProducerConsumerQueue {
     using AtomicIndex = std::atomic<unsigned int>;
 
     char pad0_[hardware_destructive_interference_size];
-    const uint32_t size_;
-    T *const records_;
+    // const uint32_t size_;
+    uint32_t size_;
+    // T *const records_;
+    T *records_;
 
     alignas(hardware_destructive_interference_size) AtomicIndex readIndex_;
     alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_;
diff --git a/src/python/cpp/core.hpp b/src/python/cpp/core.hpp
index d5ae746..332cce7 100644
--- a/src/python/cpp/core.hpp
+++ b/src/python/cpp/core.hpp
@@ -10,6 +10,7 @@
 #include "aare/core/Frame.hpp"
 #include "aare/core/Transforms.hpp"
 #include "aare/core/defs.hpp"
+#include "aare/core/MultiThread.hpp"
 
 template <typename T> void define_to_frame(py::module &m) {
     m.def("to_frame", [](py::array_t<T> &np_array) {
@@ -156,4 +157,22 @@ void define_core_bindings(py::module &m) {
     define_to_frame(m);
     define_to_frame(m);
     define_to_frame(m);
+
+    py::class_<MultiThread>(m, "MultiThread")
+        // .def(py::init([](std::vector<std::function<void()>> const &python_functions) {
+        //     std::vector<std::function<void()>> functions;
+        //     for (auto const &python_function : python_functions) {
+        //         std::function<void()> tmp = [python_function]() { python_function(); };
+        //         functions.emplace_back(tmp);
+        //     }
+        //     return std::unique_ptr<MultiThread>(new MultiThread(functions));
+        // }
+        // ))
+        .def(py::init<std::vector<std::function<void()>> const &>())
+        .def("run", [](MultiThread &self) {
+            py::gil_scoped_release release;
+            return self.run();
+        });
 }
\ No newline at end of file
diff --git a/src/python/cpp/file_io.hpp b/src/python/cpp/file_io.hpp
index 6c10888..406a397 100644
--- a/src/python/cpp/file_io.hpp
+++ b/src/python/cpp/file_io.hpp
@@ -20,7 +20,7 @@ void define_file_io_bindings(py::module &m) {
         .def(py::init())
         .def("read", py::overload_cast<>(&File::read))
         .def("read", py::overload_cast<size_t>(&File::read))
-        .def("iread", py::overload_cast<size_t>(&File::iread))
+        .def("iread", py::overload_cast<size_t>(&File::iread), py::call_guard<py::gil_scoped_release>())
         .def("frame_number", &File::frame_number)
         .def_property_readonly("bytes_per_frame", &File::bytes_per_frame)
         .def_property_readonly("pixels_per_frame", &File::pixels_per_frame)
diff --git a/src/python/cpp/processing.hpp b/src/python/cpp/processing.hpp
index dc8119f..d04215c 100644
--- a/src/python/cpp/processing.hpp
+++ b/src/python/cpp/processing.hpp
@@ -43,10 +43,11 @@ template <typename SUM_TYPE> void define_pedestal_bindings(py::module &m) {
         .def(py::init())
         .def("set_freeze", &Pedestal::set_freeze)
         .def("mean", py::overload_cast<>(&Pedestal::mean))
-        .def("mean", [](Pedestal &pedestal, const uint32_t row, const uint32_t col) { return pedestal.mean(row, col); })
+        .def("mean", [](Pedestal &pedestal, const uint32_t row,
+                        const uint32_t col) { return pedestal.mean(row, col); })
         .def("variance", py::overload_cast<>(&Pedestal::variance))
-        .def("variance",
-             [](Pedestal &pedestal, const uint32_t row, const uint32_t col) { return pedestal.variance(row, col); })
+        .def("variance", [](Pedestal &pedestal, const uint32_t row,
+                            const uint32_t col) { return pedestal.variance(row, col); })
         .def("standard_deviation", py::overload_cast<>(&Pedestal::standard_deviation))
         .def("standard_deviation", [](Pedestal &pedestal, const int row, const int col) { return pedestal.standard_deviation(row, col); })
@@ -57,7 +58,10 @@ template <typename SUM_TYPE> void define_pedestal_bindings(py::module &m) {
         .def_property_readonly("n_samples", &Pedestal::n_samples)
         .def_property_readonly("index", &Pedestal::index)
         .def_property_readonly("sum", &Pedestal::get_sum)
-        .def_property_readonly("sum2", &Pedestal::get_sum2);
+        .def_property_readonly("sum2", &Pedestal::get_sum2)
+        .def("copy", [&](Pedestal &pedestal) {
+            return Pedestal(pedestal);
+        });
 
     p.def("push", [](Pedestal &pedestal, Frame &f) {
         if (f.bitdepth() == 8) {
             pedestal.template push<uint8_t>(f);
@@ -138,11 +142,37 @@ void define_cluster_finder_bindings(py::module &m) {
     define_cluster_finder_template_bindings(cf);
     define_cluster_finder_template_bindings(cf);
     define_cluster_finder_template_bindings(cf);
+    cf.def("find_clusters_without_threshold",
+           [](ClusterFinder &self, Frame &f, Pedestal &pedestal, bool late_update) {
+               if (f.dtype() == Dtype::INT8) {
+                   return self.find_clusters_without_threshold(f.view<int8_t>(), pedestal, late_update);
+               } else if (f.dtype() == Dtype::INT16) {
+                   return self.find_clusters_without_threshold(f.view<int16_t>(), pedestal, late_update);
+               } else if (f.dtype() == Dtype::INT32) {
+                   return self.find_clusters_without_threshold(f.view<int32_t>(), pedestal, late_update);
+               } else if (f.dtype() == Dtype::INT64) {
+                   return self.find_clusters_without_threshold(f.view<int64_t>(), pedestal, late_update);
+               } else if (f.dtype() == Dtype::UINT8) {
+                   return self.find_clusters_without_threshold(f.view<uint8_t>(), pedestal, late_update);
+               } else if (f.dtype() == Dtype::UINT16) {
+                   return self.find_clusters_without_threshold(f.view<uint16_t>(), pedestal, late_update);
+               } else if (f.dtype() == Dtype::UINT32) {
+                   return self.find_clusters_without_threshold(f.view<uint32_t>(), pedestal, late_update);
+               } else if (f.dtype() == Dtype::UINT64) {
+                   return self.find_clusters_without_threshold(f.view<uint64_t>(), pedestal, late_update);
+               } else if (f.dtype() == Dtype::FLOAT) {
+                   return self.find_clusters_without_threshold(f.view<float>(), pedestal, late_update);
+               } else if (f.dtype() == Dtype::DOUBLE) {
+                   return self.find_clusters_without_threshold(f.view<double>(), pedestal, late_update);
+               } else {
+                   throw std::runtime_error("Unsupported dtype");
+               }
+           }, py::call_guard<py::gil_scoped_release>());
 }
 
 void define_processing_bindings(py::module &m) {
     define_pedestal_bindings(m);
-    py::class_<Cluster>(m, "Cluster",py::buffer_protocol())
+    py::class_<Cluster>(m, "Cluster", py::buffer_protocol())
         .def(py::init())
         .def("size", &Cluster::size)
         .def("begin", &Cluster::begin)
diff --git a/src/python/example/mt_clusterFinder.py b/src/python/example/mt_clusterFinder.py
new file mode 100644
index 0000000..9c0bff8
--- /dev/null
+++ b/src/python/example/mt_clusterFinder.py
@@ -0,0 +1,40 @@
+from pathlib import Path
+import sys
+PROJECT_ROOT_DIR = (Path(__file__) / "../../../../").resolve()
+print(PROJECT_ROOT_DIR)
+sys.path.append(str((PROJECT_ROOT_DIR / 'build').resolve()))
+from threading import Thread
+
+from _aare import *
+
+file_path = None
+N_THREADS = None
+if len(sys.argv) <= 2:
+    raise Exception("Usage: mt_clusterFinder.py <file_path> <n_threads>")
+else:
+    file_path = sys.argv[1]
+    N_THREADS = int(sys.argv[2])
+
+file = File(file_path)
+pedestal = Pedestal(400, 400, 1000)
+for i in range(1000):
+    frame = file.iread(i)
+    pedestal.push(frame)
+print("Pedestal done")
+
+def f(idx, n):
+    def g():
+        print("Hello from thread", idx)
+        f = File(file_path)
+        p = pedestal.copy()
+        cf = ClusterFinder(3, 3, 5.0, 0)
+        for i in range(idx, 1000, n):
+            frame = f.iread(i)
+            clusters = cf.find_clusters_without_threshold(frame, p, False)
+
+        print("Goodbye from thread", idx)
+    return g
+
+
+mt = MultiThread([f(i, N_THREADS) for i in range(N_THREADS)])
+mt.run()
\ No newline at end of file