Cpp multi example (#84)

- Small modifications to the ProducerConsumerQueue to allow for storage
in a vector
- Example showing multi threading in C++ using queues
- fixes for python bindings

---------

Co-authored-by: Bechir <bechir.brahem420@gmail.com>
Co-authored-by: Bechir Braham <bachbrahem@gmail.com>
This commit is contained in:
Erik Fröjdh 2024-07-16 18:43:23 +02:00 committed by GitHub
parent 246ac90f71
commit 403d10b668
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 300 additions and 9 deletions

View File

@ -3,7 +3,7 @@ set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_examp
set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example;zmq_sender_example;") set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example;zmq_sender_example;")
set(EXAMPLE_LIST "${EXAMPLE_LIST};cluster_example;zmq_multi_receiver;zmq_worker;zmq_sink") set(EXAMPLE_LIST "${EXAMPLE_LIST};cluster_example;zmq_multi_receiver;zmq_worker;zmq_sink")
set(EXAMPLE_LIST "${EXAMPLE_LIST};zmq_task_ventilator;zmq_restream_example;zmq_receiver_example") set(EXAMPLE_LIST "${EXAMPLE_LIST};zmq_task_ventilator;zmq_restream_example;zmq_receiver_example")
set(EXAMPLE_LIST "${EXAMPLE_LIST};testing_pedestal;cluster_finder_example;reorder_moench_example") set(EXAMPLE_LIST "${EXAMPLE_LIST};testing_pedestal;cluster_finder_example;reorder_moench_example;clustering")
foreach(example ${EXAMPLE_LIST}) foreach(example ${EXAMPLE_LIST})
add_executable(${example} ${example}.cpp) add_executable(${example} ${example}.cpp)

151
examples/clustering.cpp Normal file
View File

@ -0,0 +1,151 @@
#include "aare/file_io.hpp"
#include "aare/processing/ClusterFinder.hpp"
#include "aare/processing/Pedestal.hpp"
#include <chrono>
#include <fmt/format.h>
using namespace std::chrono;
#include "aare/core/ProducerConsumerQueue.hpp"
#include <memory>
#include <thread>
using Queue = folly::ProducerConsumerQueue<aare::Frame>;
//Global constants, for easy tweaking
constexpr size_t print_interval = 100;
constexpr std::chrono::milliseconds default_wait(1);
constexpr uint32_t queue_size = 1000;
constexpr int n_frames = 8000;
// Small wrapper to do cluster finding in a thread
// helps with keeping track of stopping tokens etc.
class ThreadedClusterFinder {
std::atomic<bool> m_stop_requested = false;
std::atomic<size_t> m_frames_processed = 0;
Queue *m_queue = nullptr;
aare::Pedestal<double> m_pedestal;
int m_object_id;
public:
ThreadedClusterFinder(Queue &q, aare::Pedestal<double> pd, int id) : m_queue(&q), m_pedestal(pd), m_object_id(id) {}
size_t frames_processed() const { return m_frames_processed; }
void request_stop() {
fmt::print("{}:Stop requested\n", m_object_id);
m_stop_requested = true;
}
void find_clusters() {
aare::ClusterFinder cf(3, 3, 5, 0);
while (!m_stop_requested) {
aare::Frame frame(1, 1, aare::Dtype("u4"));
if (m_queue->read(frame)) {
auto clusters = cf.find_clusters_without_threshold(frame.view<uint16_t>(), m_pedestal, false);
m_frames_processed++;
if (m_frames_processed % print_interval == 0) {
fmt::print("{}:Found {} clusters\n", m_object_id, clusters.size());
}
} else {
// fmt::print("{}:Queue empty\n", m_object_id);
std::this_thread::sleep_for(default_wait);
}
}
fmt::print("{}:Done\n", m_object_id);
}
};
int main(int argc, char **argv) {
//Rudimentary argument parsing
if (argc != 3) {
fmt::print("Usage: {} <file> <n_threads>\n", argv[0]);
return 1;
}
std::filesystem::path fname(argv[1]);
fmt::print("Loading {}\n", fname.string());
const int n_threads = std::stoi(argv[2]);
aare::Pedestal<double> pd(400, 400, 1000);
aare::File f(fname, "r");
// Use the first 1000 frames to calculate the pedestal
// we can then copy this pedestal to each thread
auto t0 = high_resolution_clock::now();
for (int i = 0; i < 1000; ++i) {
aare::Frame frame = f.iread(i);
pd.push<uint16_t>(frame);
}
auto t1 = high_resolution_clock::now();
fmt::print("Pedestal run took: {}s\n", duration_cast<microseconds>(t1 - t0).count() / 1e6);
//---------------------------------------------------------------------------------------------
//---------------------- Now lets start with the setup for the threaded cluster finding
// We need one queue per thread...
std::vector<Queue> queues;
for (int i = 0; i < n_threads; ++i) {
queues.emplace_back(queue_size);
}
// and also one cluster finder per thread
std::vector<std::unique_ptr<ThreadedClusterFinder>> cluster_finders;
for (int i = 0; i < n_threads; ++i) {
cluster_finders.push_back(std::make_unique<ThreadedClusterFinder>(queues[i], pd, i));
}
// next we start the threads
std::vector<std::thread> threads;
for (int i = 0; i < n_threads; ++i) {
threads.emplace_back(&ThreadedClusterFinder::find_clusters, cluster_finders[i].get());
}
// Push frames to the queues
for (int i = 0; i < n_frames; ++i) {
// if the Queue is full, wait, there are better ways to do this =)
while (queues[i % n_threads].isFull()) {
// fmt::print("Queue {} is full, waiting\n", i % n_threads);
std::this_thread::sleep_for(default_wait);
}
queues[i % n_threads].write(f.iread(i+1000));
if (i % 100 == 0) {
fmt::print("Pushed frame {}\n", i);
}
}
// wait for all queues to be empty
for (auto &q : queues) {
while (!q.isEmpty()) {
// fmt::print("Finish Queue not empty, waiting\n");
std::this_thread::sleep_for(default_wait);
}
}
// and once empty we stop the cluster finders
for (auto &cf : cluster_finders) {
cf->request_stop();
}
for (auto &t : threads) {
t.join();
}
size_t total_frames = 0;
for (auto &cf : cluster_finders) {
total_frames += cf->frames_processed();
}
auto t2 = high_resolution_clock::now();
fmt::print("Processed {} frames in {}s\n", total_frames, duration_cast<microseconds>(t2 - t1).count() / 1e6);
// auto start = high_resolution_clock::now();
// aare::ClusterFinder cf(3, 3, 5, 0);
// for (int i = 1000; i<2000; ++i){
// aare::Frame frame = f.iread(i);
// auto clusters = cf.find_clusters_without_threshold(frame.view<uint16_t>(), pd, true);
// }
// auto stop = high_resolution_clock::now();
// auto duration = duration_cast<microseconds>(stop - start);
// fmt::print("Run took: {}s\n", duration.count()/1e6);
}

View File

@ -0,0 +1,30 @@
// class that takes std::function as a constructor argument
// and run each of them in different threads
#pragma once
#include <functional>
#include <thread>
#include <vector>
namespace aare {
class MultiThread {
public:
explicit MultiThread(std::vector<std::function<void()>> const &functions) : functions_(functions) {}
void run() {
std::vector<std::thread> threads;
for (auto const &f : functions_) {
threads.emplace_back(f);
}
for (auto &t : threads) {
t.join();
}
}
private:
std::vector<std::function<void()>> functions_;
};
} // namespace aare

View File

@ -44,6 +44,25 @@ template <class T> struct ProducerConsumerQueue {
ProducerConsumerQueue(const ProducerConsumerQueue &) = delete; ProducerConsumerQueue(const ProducerConsumerQueue &) = delete;
ProducerConsumerQueue &operator=(const ProducerConsumerQueue &) = delete; ProducerConsumerQueue &operator=(const ProducerConsumerQueue &) = delete;
ProducerConsumerQueue(ProducerConsumerQueue &&other){
size_ = other.size_;
records_ = other.records_;
other.records_ = nullptr;
readIndex_ = other.readIndex_.load(std::memory_order_acquire);
writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
}
ProducerConsumerQueue &operator=(ProducerConsumerQueue &&other){
size_ = other.size_;
records_ = other.records_;
other.records_ = nullptr;
readIndex_ = other.readIndex_.load(std::memory_order_acquire);
writeIndex_ = other.writeIndex_.load(std::memory_order_acquire);
return *this;
}
ProducerConsumerQueue():ProducerConsumerQueue(2){};
// size must be >= 2. // size must be >= 2.
// //
// Also, note that the number of usable slots in the queue at any // Also, note that the number of usable slots in the queue at any
@ -169,8 +188,10 @@ template <class T> struct ProducerConsumerQueue {
using AtomicIndex = std::atomic<unsigned int>; using AtomicIndex = std::atomic<unsigned int>;
char pad0_[hardware_destructive_interference_size]; char pad0_[hardware_destructive_interference_size];
const uint32_t size_; // const uint32_t size_;
T *const records_; uint32_t size_;
// T *const records_;
T* records_;
alignas(hardware_destructive_interference_size) AtomicIndex readIndex_; alignas(hardware_destructive_interference_size) AtomicIndex readIndex_;
alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_; alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_;

View File

@ -10,6 +10,7 @@
#include "aare/core/Frame.hpp" #include "aare/core/Frame.hpp"
#include "aare/core/Transforms.hpp" #include "aare/core/Transforms.hpp"
#include "aare/core/defs.hpp" #include "aare/core/defs.hpp"
#include "aare/core/MultiThread.hpp"
template <typename T> void define_to_frame(py::module &m) { template <typename T> void define_to_frame(py::module &m) {
m.def("to_frame", [](py::array_t<T> &np_array) { m.def("to_frame", [](py::array_t<T> &np_array) {
@ -156,4 +157,22 @@ void define_core_bindings(py::module &m) {
define_to_frame<int64_t>(m); define_to_frame<int64_t>(m);
define_to_frame<float>(m); define_to_frame<float>(m);
define_to_frame<double>(m); define_to_frame<double>(m);
py::class_<MultiThread>(m, "MultiThread")
// .def(py::init([](std::vector<std::function<void()>> const &python_functions) {
// std::vector<std::function<void()>> functions;
// for (auto const &python_function : python_functions) {
// std::function<void()> tmp = [python_function]() { python_function(); };
// functions.emplace_back(tmp);
// }
// return std::unique_ptr<MultiThread>(new MultiThread(functions));
// }
// ))
.def(py::init<std::vector<std::function<void()>> const &>())
.def("run", [](MultiThread &self) {
py::gil_scoped_release release;
return self.run();
});
} }

View File

@ -20,7 +20,7 @@ void define_file_io_bindings(py::module &m) {
.def(py::init<const std::filesystem::path &, const std::string &, const FileConfig &>()) .def(py::init<const std::filesystem::path &, const std::string &, const FileConfig &>())
.def("read", py::overload_cast<>(&File::read)) .def("read", py::overload_cast<>(&File::read))
.def("read", py::overload_cast<size_t>(&File::read)) .def("read", py::overload_cast<size_t>(&File::read))
.def("iread", py::overload_cast<size_t>(&File::iread)) .def("iread", py::overload_cast<size_t>(&File::iread),py::call_guard<py::gil_scoped_release>())
.def("frame_number", &File::frame_number) .def("frame_number", &File::frame_number)
.def_property_readonly("bytes_per_frame", &File::bytes_per_frame) .def_property_readonly("bytes_per_frame", &File::bytes_per_frame)
.def_property_readonly("pixels_per_frame", &File::pixels_per_frame) .def_property_readonly("pixels_per_frame", &File::pixels_per_frame)

View File

@ -43,10 +43,11 @@ template <typename SUM_TYPE> void define_pedestal_bindings(py::module &m) {
.def(py::init<int, int>()) .def(py::init<int, int>())
.def("set_freeze", &Pedestal<SUM_TYPE>::set_freeze) .def("set_freeze", &Pedestal<SUM_TYPE>::set_freeze)
.def("mean", py::overload_cast<>(&Pedestal<SUM_TYPE>::mean)) .def("mean", py::overload_cast<>(&Pedestal<SUM_TYPE>::mean))
.def("mean", [](Pedestal<SUM_TYPE> &pedestal, const uint32_t row, const uint32_t col) { return pedestal.mean(row, col); }) .def("mean", [](Pedestal<SUM_TYPE> &pedestal, const uint32_t row,
const uint32_t col) { return pedestal.mean(row, col); })
.def("variance", py::overload_cast<>(&Pedestal<SUM_TYPE>::variance)) .def("variance", py::overload_cast<>(&Pedestal<SUM_TYPE>::variance))
.def("variance", .def("variance", [](Pedestal<SUM_TYPE> &pedestal, const uint32_t row,
[](Pedestal<SUM_TYPE> &pedestal, const uint32_t row, const uint32_t col) { return pedestal.variance(row, col); }) const uint32_t col) { return pedestal.variance(row, col); })
.def("standard_deviation", py::overload_cast<>(&Pedestal<SUM_TYPE>::standard_deviation)) .def("standard_deviation", py::overload_cast<>(&Pedestal<SUM_TYPE>::standard_deviation))
.def("standard_deviation", [](Pedestal<SUM_TYPE> &pedestal, const int row, .def("standard_deviation", [](Pedestal<SUM_TYPE> &pedestal, const int row,
const int col) { return pedestal.standard_deviation(row, col); }) const int col) { return pedestal.standard_deviation(row, col); })
@ -57,7 +58,10 @@ template <typename SUM_TYPE> void define_pedestal_bindings(py::module &m) {
.def_property_readonly("n_samples", &Pedestal<SUM_TYPE>::n_samples) .def_property_readonly("n_samples", &Pedestal<SUM_TYPE>::n_samples)
.def_property_readonly("index", &Pedestal<SUM_TYPE>::index) .def_property_readonly("index", &Pedestal<SUM_TYPE>::index)
.def_property_readonly("sum", &Pedestal<SUM_TYPE>::get_sum) .def_property_readonly("sum", &Pedestal<SUM_TYPE>::get_sum)
.def_property_readonly("sum2", &Pedestal<SUM_TYPE>::get_sum2); .def_property_readonly("sum2", &Pedestal<SUM_TYPE>::get_sum2)
.def("copy",[&](Pedestal<SUM_TYPE> &pedestal) {
return Pedestal<SUM_TYPE>(pedestal);
});
p.def("push", [](Pedestal<SUM_TYPE> &pedestal, Frame &f) { p.def("push", [](Pedestal<SUM_TYPE> &pedestal, Frame &f) {
if (f.bitdepth() == 8) { if (f.bitdepth() == 8) {
pedestal.template push<uint8_t>(f); pedestal.template push<uint8_t>(f);
@ -138,11 +142,37 @@ void define_cluster_finder_bindings(py::module &m) {
define_cluster_finder_template_bindings<int64_t>(cf); define_cluster_finder_template_bindings<int64_t>(cf);
define_cluster_finder_template_bindings<float>(cf); define_cluster_finder_template_bindings<float>(cf);
define_cluster_finder_template_bindings<double>(cf); define_cluster_finder_template_bindings<double>(cf);
cf.def("find_clusters_without_threshold",
[](ClusterFinder &self, Frame &f, Pedestal<double> &pedestal, bool late_update) {
if (f.dtype() == Dtype::INT8) {
return self.find_clusters_without_threshold(f.view<int8_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::INT16) {
return self.find_clusters_without_threshold(f.view<int16_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::INT32) {
return self.find_clusters_without_threshold(f.view<int32_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::INT64) {
return self.find_clusters_without_threshold(f.view<int64_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::UINT8) {
return self.find_clusters_without_threshold(f.view<uint8_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::UINT16) {
return self.find_clusters_without_threshold(f.view<uint16_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::UINT32) {
return self.find_clusters_without_threshold(f.view<uint32_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::UINT64) {
return self.find_clusters_without_threshold(f.view<uint64_t>(), pedestal, late_update);
} else if (f.dtype() == Dtype::FLOAT) {
return self.find_clusters_without_threshold(f.view<float>(), pedestal, late_update);
} else if (f.dtype() == Dtype::DOUBLE) {
return self.find_clusters_without_threshold(f.view<double>(), pedestal, late_update);
} else {
throw std::runtime_error("Unsupported dtype");
}
},py::call_guard<py::gil_scoped_release>());
} }
void define_processing_bindings(py::module &m) { void define_processing_bindings(py::module &m) {
define_pedestal_bindings<double>(m); define_pedestal_bindings<double>(m);
py::class_<Cluster>(m, "Cluster",py::buffer_protocol()) py::class_<Cluster>(m, "Cluster", py::buffer_protocol())
.def(py::init<int, int, Dtype>()) .def(py::init<int, int, Dtype>())
.def("size", &Cluster::size) .def("size", &Cluster::size)
.def("begin", &Cluster::begin) .def("begin", &Cluster::begin)

View File

@ -0,0 +1,40 @@
from pathlib import Path
import sys
PROJECT_ROOT_DIR=(Path(__file__) / "../../../../").resolve()
print(PROJECT_ROOT_DIR)
sys.path.append(str((PROJECT_ROOT_DIR / 'build').resolve()))
from threading import Thread
from _aare import *
file_path = None
N_THREADS = None
if len(sys.argv) <= 2:
raise Exception("Usage: mt_clusterFinder.py <file> <n_threads>")
else:
file_path = sys.argv[1]
N_THREADS = int(sys.argv[2])
file = File(file_path)
pedestal=Pedestal(400,400,1000)
for i in range(1000):
frame = file.iread(i)
pedestal.push(frame)
print("Pedestal done")
def f(idx,n):
def g():
print("Hello from thread",idx)
f = File(file_path)
p = pedestal.copy()
cf = ClusterFinder(3,3,5.0,0)
for i in range(idx,1000,n):
frame = f.iread(i)
clusters=cf.find_clusters_without_threshold(frame,p,False)
print("Goodbye from thread",idx)
return g
mt = MultiThread([f(i,N_THREADS) for i in range(N_THREADS)])
mt.run()