diff --git a/include/aare/CircularFifo.hpp b/include/aare/CircularFifo.hpp new file mode 100644 index 0000000..8098082 --- /dev/null +++ b/include/aare/CircularFifo.hpp @@ -0,0 +1,97 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "aare/ProducerConsumerQueue.hpp" + +namespace aare { + +template class CircularFifo { + uint32_t fifo_size; + aare::ProducerConsumerQueue free_slots; + aare::ProducerConsumerQueue filled_slots; + + public: + CircularFifo() : CircularFifo(100){}; + CircularFifo(uint32_t size) : fifo_size(size), free_slots(size + 1), filled_slots(size + 1) { + + // TODO! how do we deal with alignment for writing? alignas??? + // Do we give the user a chance to provide memory locations? + // Templated allocator? + for (size_t i = 0; i < fifo_size; ++i) { + free_slots.write(ItemType{}); + } + } + + bool next() { + // TODO! avoid default constructing ItemType + ItemType it; + if (!filled_slots.read(it)) + return false; + if (!free_slots.write(std::move(it))) + return false; + return true; + } + + ~CircularFifo() {} + + using value_type = ItemType; + + auto numFilledSlots() const noexcept { return filled_slots.sizeGuess(); } + auto numFreeSlots() const noexcept { return free_slots.sizeGuess(); } + auto isFull() const noexcept { return filled_slots.isFull(); } + + ItemType pop_free() { + ItemType v; + while (!free_slots.read(v)) + ; + return std::move(v); + // return v; + } + + bool try_pop_free(ItemType &v) { return free_slots.read(v); } + + ItemType pop_value(std::chrono::nanoseconds wait, std::atomic &stopped) { + ItemType v; + while (!filled_slots.read(v) && !stopped) { + std::this_thread::sleep_for(wait); + } + return std::move(v); + } + + ItemType pop_value() { + ItemType v; + while (!filled_slots.read(v)) + ; + return std::move(v); + } + + ItemType *frontPtr() { return filled_slots.frontPtr(); } + + // TODO! Add function to move item from filled to free to be used + // with the frontPtr function + + template void push_value(Args &&...recordArgs) { + while (!filled_slots.write(std::forward(recordArgs)...)) + ; + } + + template bool try_push_value(Args &&...recordArgs) { + return filled_slots.write(std::forward(recordArgs)...); + } + + template void push_free(Args &&...recordArgs) { + while (!free_slots.write(std::forward(recordArgs)...)) + ; + } + + template bool try_push_free(Args &&...recordArgs) { + return free_slots.write(std::forward(recordArgs)...); + } +}; + +} // namespace aare \ No newline at end of file diff --git a/include/aare/ClusterCollector.hpp b/include/aare/ClusterCollector.hpp new file mode 100644 index 0000000..0738062 --- /dev/null +++ b/include/aare/ClusterCollector.hpp @@ -0,0 +1,52 @@ +#pragma once +#include +#include + +#include "aare/ProducerConsumerQueue.hpp" +#include "aare/ClusterVector.hpp" +#include "aare/ClusterFinderMT.hpp" + +namespace aare { + +class ClusterCollector{ + ProducerConsumerQueue>* m_source; + std::atomic m_stop_requested{false}; + std::atomic m_stopped{true}; + std::chrono::milliseconds m_default_wait{1}; + std::thread m_thread; + std::vector> m_clusters; + + void process(){ + m_stopped = false; + fmt::print("ClusterCollector started\n"); + while (!m_stop_requested || !m_source->isEmpty()) { + if (ClusterVector *clusters = m_source->frontPtr(); + clusters != nullptr) { + m_clusters.push_back(std::move(*clusters)); + m_source->popFront(); + }else{ + std::this_thread::sleep_for(m_default_wait); + } + } + fmt::print("ClusterCollector stopped\n"); + m_stopped = true; + } + + public: + ClusterCollector(ClusterFinderMT* source){ + m_source = source->sink(); + m_thread = std::thread(&ClusterCollector::process, this); + } + void stop(){ + m_stop_requested = true; + m_thread.join(); + } + std::vector> steal_clusters(){ + if(!m_stopped){ + throw std::runtime_error("ClusterCollector is still running"); + } + return std::move(m_clusters); + } +}; + +} // namespace aare \ No newline at end of file diff --git a/include/aare/ClusterFinder.hpp b/include/aare/ClusterFinder.hpp index aa17d19..2fe33a7 100644 --- a/include/aare/ClusterFinder.hpp +++ b/include/aare/ClusterFinder.hpp @@ -10,26 +10,12 @@ namespace aare { -/** enum to define the event types */ -enum class eventType { - PEDESTAL, /** pedestal */ - NEIGHBOUR, /** neighbour i.e. below threshold, but in the cluster of a - photon */ - PHOTON, /** photon i.e. above threshold */ - PHOTON_MAX, /** maximum of a cluster satisfying the photon conditions */ - NEGATIVE_PEDESTAL, /** negative value, will not be accounted for as pedestal - in order to avoid drift of the pedestal towards - negative values */ - UNDEFINED_EVENT = -1 /** undefined */ -}; - template class ClusterFinder { Shape<2> m_image_size; const int m_cluster_sizeX; const int m_cluster_sizeY; - // const PEDESTAL_TYPE m_threshold; const PEDESTAL_TYPE m_nSigma; const PEDESTAL_TYPE c2; const PEDESTAL_TYPE c3; diff --git a/include/aare/ClusterFinderMT.hpp b/include/aare/ClusterFinderMT.hpp new file mode 100644 index 0000000..d3cbaf6 --- /dev/null +++ b/include/aare/ClusterFinderMT.hpp @@ -0,0 +1,189 @@ +#pragma once +#include +#include +#include +#include +#include + +#include "aare/NDArray.hpp" +#include "aare/ProducerConsumerQueue.hpp" + +namespace aare { + +enum class FrameType { + DATA, + PEDESTAL, +}; + +struct FrameWrapper { + FrameType type; + uint64_t frame_number; + NDArray data; +}; + + +template +class ClusterFinderMT { + size_t m_current_thread{0}; + size_t m_n_threads{0}; + using Finder = ClusterFinder; + using InputQueue = ProducerConsumerQueue; + using OutputQueue = ProducerConsumerQueue>; + std::vector> m_input_queues; + std::vector> m_output_queues; + + + OutputQueue m_sink{1000}; //All clusters go into this queue + + + std::vector> m_cluster_finders; + std::vector m_threads; + std::thread m_collect_thread; + std::chrono::milliseconds m_default_wait{1}; + + std::atomic m_stop_requested{false}; + std::atomic m_processing_threads_stopped{false}; + + void process(int thread_id) { + auto cf = m_cluster_finders[thread_id].get(); + auto q = m_input_queues[thread_id].get(); + // TODO! Avoid indexing into the vector every time + fmt::print("Thread {} started\n", thread_id); + //TODO! is this check enough to make sure we process all the frames? + while (!m_stop_requested || !q->isEmpty()) { + if (FrameWrapper *frame = q->frontPtr(); + frame != nullptr) { + // fmt::print("Thread {} got frame {}, type: {}\n", thread_id, + // frame->frame_number, static_cast(frame->type)); + + switch (frame->type) { + case FrameType::DATA: + cf->find_clusters( + frame->data.view()); + m_output_queues[thread_id]->write(cf->steal_clusters()); + + break; + + case FrameType::PEDESTAL: + m_cluster_finders[thread_id]->push_pedestal_frame( + frame->data.view()); + break; + + default: + break; + } + + // frame is processed now discard it + m_input_queues[thread_id]->popFront(); + } else { + std::this_thread::sleep_for(m_default_wait); + } + + } + fmt::print("Thread {} stopped\n", thread_id); + } + + /** + * @brief Collect all the clusters from the output queues and write them to the sink + */ + void collect(){ + bool empty = true; + while(!m_stop_requested || !empty || !m_processing_threads_stopped){ + empty = true; + for (auto &queue : m_output_queues) { + if (!queue->isEmpty()) { + + while(!m_sink.write(std::move(*queue->frontPtr()))){ + std::this_thread::sleep_for(m_default_wait); + } + queue->popFront(); + empty = false; + } + } + } + } + + public: + ClusterFinderMT(Shape<2> image_size, Shape<2> cluster_size, + PEDESTAL_TYPE nSigma = 5.0, size_t capacity = 2000, + size_t n_threads = 3) + : m_n_threads(n_threads) { + fmt::print("ClusterFinderMT: using {} threads\n", n_threads); + for (size_t i = 0; i < n_threads; i++) { + m_cluster_finders.push_back( + std::make_unique>( + image_size, cluster_size, nSigma, capacity)); + } + for (size_t i = 0; i < n_threads; i++) { + m_input_queues.emplace_back(std::make_unique(200)); + m_output_queues.emplace_back(std::make_unique(200)); + + } + + start(); + } + + ProducerConsumerQueue> *sink() { return &m_sink; } + + void start(){ + for (size_t i = 0; i < m_n_threads; i++) { + m_threads.push_back( + std::thread(&ClusterFinderMT::process, this, i)); + } + m_collect_thread = std::thread(&ClusterFinderMT::collect, this); + } + + void stop() { + m_stop_requested = true; + for (auto &thread : m_threads) { + thread.join(); + } + m_processing_threads_stopped = true; + m_collect_thread.join(); + } + + void sync(){ + for (auto &q : m_input_queues) { + while(!q->isEmpty()){ + std::this_thread::sleep_for(m_default_wait); + } + } + } + + void push_pedestal_frame(NDView frame) { + FrameWrapper fw{FrameType::PEDESTAL, 0, NDArray(frame)}; + + for (auto &queue : m_input_queues) { + while (!queue->write(fw)) { + // fmt::print("push_pedestal_frame: queue full\n"); + std::this_thread::sleep_for(m_default_wait); + } + } + } + + void find_clusters(NDView frame) { + FrameWrapper fw{FrameType::DATA, 0, NDArray(frame)}; + while (!m_input_queues[m_current_thread%m_n_threads]->write(fw)) { + std::this_thread::sleep_for(m_default_wait); + } + m_current_thread++; + } + + ClusterVector steal_clusters(bool realloc_same_capacity = false) { + ClusterVector clusters(3,3); + for (auto &finder : m_cluster_finders) { + clusters += finder->steal_clusters(); + } + return clusters; + } + + + // void push(FrameWrapper&& frame) { + // //TODO! need to loop until we are successful + // auto rc = m_input_queue.write(std::move(frame)); + // fmt::print("pushed frame {}\n", rc); + // } +}; + +} // namespace aare \ No newline at end of file diff --git a/include/aare/ClusterVector.hpp b/include/aare/ClusterVector.hpp index 98d4b37..a1b3a62 100644 --- a/include/aare/ClusterVector.hpp +++ b/include/aare/ClusterVector.hpp @@ -22,6 +22,7 @@ template class ClusterVector { std::byte *m_data{}; size_t m_size{0}; size_t m_capacity; + uint64_t m_frame_number{0}; //TODO! Check frame number size and type /* Format string used in the python bindings to create a numpy array from the buffer @@ -39,7 +40,7 @@ template class ClusterVector { * @param cluster_size_y size of the cluster in y direction * @param capacity initial capacity of the buffer in number of clusters */ - ClusterVector(size_t cluster_size_x, size_t cluster_size_y, + ClusterVector(size_t cluster_size_x = 3, size_t cluster_size_y = 3, size_t capacity = 1024) : m_cluster_size_x(cluster_size_x), m_cluster_size_y(cluster_size_y), m_capacity(capacity) { @@ -108,7 +109,14 @@ template class ClusterVector { ptr); m_size++; } - + ClusterVector& operator+=(const ClusterVector& other){ + if (m_size + other.m_size > m_capacity) { + allocate_buffer(m_capacity + other.m_size); + } + std::copy(other.m_data, other.m_data + other.m_size * element_offset(), m_data + m_size * element_offset()); + m_size += other.m_size; + return *this; + } /** * @brief Sum the pixels in each cluster @@ -166,6 +174,9 @@ template class ClusterVector { return m_fmt_base; } + uint64_t frame_number() const { return m_frame_number; } + void set_frame_number(uint64_t frame_number) { m_frame_number = frame_number; } + private: void allocate_buffer(size_t new_capacity) { size_t num_bytes = element_offset() * new_capacity; diff --git a/include/aare/ProducerConsumerQueue.hpp b/include/aare/ProducerConsumerQueue.hpp new file mode 100644 index 0000000..426b9e2 --- /dev/null +++ b/include/aare/ProducerConsumerQueue.hpp @@ -0,0 +1,203 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// @author Bo Hu (bhu@fb.com) +// @author Jordan DeLong (delong.j@fb.com) + +// Changes made by PSD Detector Group: +// Copied: Line 34 constexpr std::size_t hardware_destructive_interference_size = 128; from folly/lang/Align.h +// Changed extension to .hpp +// Changed namespace to aare + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +constexpr std::size_t hardware_destructive_interference_size = 128; +namespace aare { + +/* + * ProducerConsumerQueue is a one producer and one consumer queue + * without locks. + */ +template struct ProducerConsumerQueue { + typedef T value_type; + + ProducerConsumerQueue(const ProducerConsumerQueue &) = delete; + ProducerConsumerQueue &operator=(const ProducerConsumerQueue &) = delete; + + + ProducerConsumerQueue(ProducerConsumerQueue &&other){ + size_ = other.size_; + records_ = other.records_; + other.records_ = nullptr; + readIndex_ = other.readIndex_.load(std::memory_order_acquire); + writeIndex_ = other.writeIndex_.load(std::memory_order_acquire); + } + ProducerConsumerQueue &operator=(ProducerConsumerQueue &&other){ + size_ = other.size_; + records_ = other.records_; + other.records_ = nullptr; + readIndex_ = other.readIndex_.load(std::memory_order_acquire); + writeIndex_ = other.writeIndex_.load(std::memory_order_acquire); + return *this; + } + + + ProducerConsumerQueue():ProducerConsumerQueue(2){}; + // size must be >= 2. + // + // Also, note that the number of usable slots in the queue at any + // given time is actually (size-1), so if you start with an empty queue, + // isFull() will return true after size-1 insertions. + explicit ProducerConsumerQueue(uint32_t size) + : size_(size), records_(static_cast(std::malloc(sizeof(T) * size))), readIndex_(0), writeIndex_(0) { + assert(size >= 2); + if (!records_) { + throw std::bad_alloc(); + } + } + + ~ProducerConsumerQueue() { + // We need to destruct anything that may still exist in our queue. + // (No real synchronization needed at destructor time: only one + // thread can be doing this.) + if (!std::is_trivially_destructible::value) { + size_t readIndex = readIndex_; + size_t endIndex = writeIndex_; + while (readIndex != endIndex) { + records_[readIndex].~T(); + if (++readIndex == size_) { + readIndex = 0; + } + } + } + + std::free(records_); + } + + template bool write(Args &&...recordArgs) { + auto const currentWrite = writeIndex_.load(std::memory_order_relaxed); + auto nextRecord = currentWrite + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + if (nextRecord != readIndex_.load(std::memory_order_acquire)) { + new (&records_[currentWrite]) T(std::forward(recordArgs)...); + writeIndex_.store(nextRecord, std::memory_order_release); + return true; + } + + // queue is full + return false; + } + + // move (or copy) the value at the front of the queue to given variable + bool read(T &record) { + auto const currentRead = readIndex_.load(std::memory_order_relaxed); + if (currentRead == writeIndex_.load(std::memory_order_acquire)) { + // queue is empty + return false; + } + + auto nextRecord = currentRead + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + record = std::move(records_[currentRead]); + records_[currentRead].~T(); + readIndex_.store(nextRecord, std::memory_order_release); + return true; + } + + // pointer to the value at the front of the queue (for use in-place) or + // nullptr if empty. + T *frontPtr() { + auto const currentRead = readIndex_.load(std::memory_order_relaxed); + if (currentRead == writeIndex_.load(std::memory_order_acquire)) { + // queue is empty + return nullptr; + } + return &records_[currentRead]; + } + + // queue must not be empty + void popFront() { + auto const currentRead = readIndex_.load(std::memory_order_relaxed); + assert(currentRead != writeIndex_.load(std::memory_order_acquire)); + + auto nextRecord = currentRead + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + records_[currentRead].~T(); + readIndex_.store(nextRecord, std::memory_order_release); + } + + bool isEmpty() const { + return readIndex_.load(std::memory_order_acquire) == writeIndex_.load(std::memory_order_acquire); + } + + bool isFull() const { + auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + if (nextRecord != readIndex_.load(std::memory_order_acquire)) { + return false; + } + // queue is full + return true; + } + + // * If called by consumer, then true size may be more (because producer may + // be adding items concurrently). + // * If called by producer, then true size may be less (because consumer may + // be removing items concurrently). + // * It is undefined to call this from any other thread. + size_t sizeGuess() const { + int ret = writeIndex_.load(std::memory_order_acquire) - readIndex_.load(std::memory_order_acquire); + if (ret < 0) { + ret += size_; + } + return ret; + } + + // maximum number of items in the queue. + size_t capacity() const { return size_ - 1; } + + private: + using AtomicIndex = std::atomic; + + char pad0_[hardware_destructive_interference_size]; + // const uint32_t size_; + uint32_t size_; + // T *const records_; + T* records_; + + alignas(hardware_destructive_interference_size) AtomicIndex readIndex_; + alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_; + + char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)]; +}; + +} // namespace aare \ No newline at end of file diff --git a/python/examples/play.py b/python/examples/play.py index 986b718..9c07c99 100644 --- a/python/examples/play.py +++ b/python/examples/play.py @@ -13,36 +13,66 @@ from aare import File, ClusterFinder, VarClusterFinder base = Path('/mnt/sls_det_storage/matterhorn_data/aare_test_data/') f = File(base/'Moench03new/cu_half_speed_master_4.json') -cf = ClusterFinder((400,400), (3,3)) + + +from aare._aare import ClusterFinderMT, ClusterCollector + + +cf = ClusterFinderMT((400,400), (3,3), n_threads = 3) +collector = ClusterCollector(cf) + for i in range(1000): - cf.push_pedestal_frame(f.read_frame()) + img = f.read_frame() + cf.push_pedestal_frame(img) +print('Pedestal done') +cf.sync() -fig, ax = plt.subplots() -im = ax.imshow(cf.pedestal()) -cf.pedestal() -cf.noise() +for i in range(100): + img = f.read_frame() + cf.find_clusters(img) + + +# time.sleep(1) +cf.stop() +collector.stop() +cv = collector.steal_clusters() +print(f'Processed {len(cv)} frames') + +print('Done') -N = 500 -t0 = time.perf_counter() -hist1 = bh.Histogram(bh.axis.Regular(40, -2, 4000)) -f.seek(0) -t0 = time.perf_counter() -data = f.read_n(N) -t_elapsed = time.perf_counter()-t0 +# cf = ClusterFinder((400,400), (3,3)) +# for i in range(1000): +# cf.push_pedestal_frame(f.read_frame()) + +# fig, ax = plt.subplots() +# im = ax.imshow(cf.pedestal()) +# cf.pedestal() +# cf.noise() -n_bytes = data.itemsize*data.size -print(f'Reading {N} frames took {t_elapsed:.3f}s {N/t_elapsed:.0f} FPS, {n_bytes/1024**2:.4f} GB/s') +# N = 500 +# t0 = time.perf_counter() +# hist1 = bh.Histogram(bh.axis.Regular(40, -2, 4000)) +# f.seek(0) + +# t0 = time.perf_counter() +# data = f.read_n(N) +# t_elapsed = time.perf_counter()-t0 -for frame in data: - a = cf.find_clusters(frame) +# n_bytes = data.itemsize*data.size -clusters = cf.steal_clusters() +# print(f'Reading {N} frames took {t_elapsed:.3f}s {N/t_elapsed:.0f} FPS, {n_bytes/1024**2:.4f} GB/s') + + +# for frame in data: +# a = cf.find_clusters(frame) + +# clusters = cf.steal_clusters() # t_elapsed = time.perf_counter()-t0 # print(f'Clustering {N} frames took {t_elapsed:.2f}s {N/t_elapsed:.0f} FPS') diff --git a/python/src/cluster.hpp b/python/src/cluster.hpp index 5d22091..90c9f21 100644 --- a/python/src/cluster.hpp +++ b/python/src/cluster.hpp @@ -1,5 +1,7 @@ #include "aare/ClusterFinder.hpp" +#include "aare/ClusterFinderMT.hpp" #include "aare/ClusterVector.hpp" +#include "aare/ClusterCollector.hpp" #include "aare/NDView.hpp" #include "aare/Pedestal.hpp" #include "np_helper.hpp" @@ -7,11 +9,13 @@ #include #include #include +#include #include namespace py = pybind11; using pd_type = double; + template void define_cluster_vector(py::module &m, const std::string &typestr) { auto class_name = fmt::format("ClusterVector_{}", typestr); @@ -31,6 +35,9 @@ void define_cluster_vector(py::module &m, const std::string &typestr) { auto *vec = new std::vector(self.sum()); return return_vector(vec); }) + .def_property_readonly("capacity", &ClusterVector::capacity) + .def_property("frame_number", &ClusterVector::frame_number, + &ClusterVector::set_frame_number) .def_buffer([typestr](ClusterVector &self) -> py::buffer_info { return py::buffer_info( self.data(), /* Pointer to buffer */ @@ -45,6 +52,55 @@ void define_cluster_vector(py::module &m, const std::string &typestr) { }); } +void define_cluster_finder_mt_bindings(py::module &m) { + py::class_>(m, "ClusterFinderMT") + .def(py::init, Shape<2>, pd_type, size_t, size_t>(), + py::arg("image_size"), py::arg("cluster_size"), + py::arg("n_sigma") = 5.0, py::arg("capacity") = 1000, + py::arg("n_threads") = 3) + .def("push_pedestal_frame", + [](ClusterFinderMT &self, + py::array_t frame) { + auto view = make_view_2d(frame); + self.push_pedestal_frame(view); + }) + .def("find_clusters", + [](ClusterFinderMT &self, + py::array_t frame) { + auto view = make_view_2d(frame); + self.find_clusters(view); + return; + }) + .def("sync", &ClusterFinderMT::sync) + .def( + "steal_clusters", + [](ClusterFinderMT &self, + bool realloc_same_capacity) { + auto v = new ClusterVector( + self.steal_clusters(realloc_same_capacity)); + return v; + }, + py::arg("realloc_same_capacity") = false) + .def("stop", &ClusterFinderMT::stop); +} + + +void define_cluster_collector_bindings(py::module &m) { + py::class_(m, "ClusterCollector") + .def(py::init*>()) + .def("stop", &ClusterCollector::stop) + .def("steal_clusters", + [](ClusterCollector &self) { + auto v = new std::vector>( + self.steal_clusters()); + return v; + }, py::return_value_policy::take_ownership); + + + +} + + void define_cluster_finder_bindings(py::module &m) { py::class_>(m, "ClusterFinder") .def(py::init, Shape<2>, pd_type, size_t>(), diff --git a/python/src/module.cpp b/python/src/module.cpp index 14a686a..0ef868b 100644 --- a/python/src/module.cpp +++ b/python/src/module.cpp @@ -25,5 +25,8 @@ PYBIND11_MODULE(_aare, m) { define_pedestal_bindings(m, "Pedestal_d"); define_pedestal_bindings(m, "Pedestal_f"); define_cluster_finder_bindings(m); + define_cluster_finder_mt_bindings(m); define_cluster_file_io_bindings(m); + define_cluster_collector_bindings(m); + } \ No newline at end of file