// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "ImageBuffer.h" #include "JFJochException.h" #include "ZeroCopyReturnValue.h" #include #include #include #include #include namespace { // Zero the buffer in parallel so that each page is first-touched by whichever NUMA node the // scheduler placed the zeroing thread on. With RAM headroom this approximates an interleaved // placement for the random-access ring buffer, while also slashing the one-time cost of // faulting in a 150-200 GB allocation. Replaces numa_alloc_interleaved + a single-threaded // memset, so this file no longer needs libnuma. void parallel_first_touch(uint8_t *buffer, size_t buffer_size) { const unsigned n = std::max(1u, std::thread::hardware_concurrency()); const size_t chunk = (buffer_size + n - 1) / n; std::vector threads; for (size_t begin = 0; begin < buffer_size; begin += chunk) { size_t len = std::min(chunk, buffer_size - begin); threads.emplace_back([=] { memset(buffer + begin, 0, len); }); } for (auto &t : threads) t.join(); } } ImageBuffer::ImageBuffer(size_t buffer_size_bytes) : buffer_size(buffer_size_bytes) { buffer = static_cast(std::malloc(buffer_size)); if (buffer == nullptr) throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Failed to allocate image buffer"); parallel_first_touch(buffer, buffer_size); } ImageBuffer::~ImageBuffer() { // Wait max. 5 seconds std::unique_lock ul(m); FinalizeInternal(ul); std::free(buffer); } void ImageBuffer::StartMeasurement(size_t in_location_size) { std::unique_lock ul(m); // Ensure there is nothing running for now if (!FinalizeInternal(ul)) throw JFJochException(JFJochExceptionCategory::WrongDAQState, "There are unfinished preview/sending jobs in the buffer"); // Setup buffer if (buffer_size < in_location_size) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Buffer is to small to hold even 1 image"); slot_size = in_location_size; slot_count = buffer_size / in_location_size; // Ensure that requests are allowed and clear active process counters disable_requests = false; slots_sending = 0; slots_preparation = 0; slots_in_preview = 0; slots_idle = 0; // Clear queue and vectors while (!free_slots.empty()) free_slots.pop(); v.clear(); send_buffer_zero_copy_ret_val.clear(); // Fill queue and vectors for (int i = 0; i < slot_count; i++) { free_slots.push(i); v.emplace_back(); send_buffer_zero_copy_ret_val.emplace_back(GetBufferLocation(i),*this, i); } ++counter; } void ImageBuffer::StartMeasurement(const DiffractionExperiment &experiment) { StartMeasurement(experiment.GetImageBufferLocationSize()); } bool ImageBuffer::CheckIfBufferReturned(std::chrono::microseconds timeout) { std::unique_lock ul(m); auto time_point = std::chrono::steady_clock::now() + timeout; if (slots_preparation > 0) cv_preparation_done.wait_until(ul, time_point, [this] { return slots_preparation == 0; }); if (slots_sending > 0) cv_send_done.wait_until(ul, time_point, [this] { return slots_sending == 0; }); return (slots_preparation == 0) && (slots_sending == 0); } ZeroCopyReturnValue *ImageBuffer::GetImageSlot() { std::unique_lock ul(m); if (!disable_requests && !free_slots.empty()) { auto i = free_slots.front(); free_slots.pop(); if (v[i].status == ImageBufferEntryStatus::Idle) --slots_idle; v[i].status = ImageBufferEntryStatus::InPreparation; v[i].image_number = INT64_MIN; v[i].image_size = 0; v[i].indexed = false; ++slots_preparation; // Wait for any preview thread currently reading this one // All has to be ready, as waiting on cv_preview_done will release mutex if (v[i].readers > 0) cv_preview_done.wait(ul, [i, this] {return v[i].readers == 0;}); return &send_buffer_zero_copy_ret_val[i]; } return nullptr; } int64_t ImageBuffer::GetAvailSlots() const { std::unique_lock ul(m); return slot_count - slots_preparation - slots_sending; } void ImageBuffer::ReadyToSend(uint32_t location, int64_t image_number, size_t image_size, bool indexed) { std::unique_lock ul(m); // Technically below should cause an exception, // but this will run in a multithreaded context, // where exceptions might not be caught, // so for now - just ignore if (location >= v.size()) return; if (v[location].status != ImageBufferEntryStatus::InPreparation) return; ++counter; ++slots_sending; --slots_preparation; v[location].status = ImageBufferEntryStatus::Sending; v[location].image_number = image_number; v[location].image_size = image_size; v[location].indexed = indexed; cv_preparation_done.notify_all(); } void ImageBuffer::ReleaseSlot(uint32_t location) { std::unique_lock ul(m); // Technically below should cause an exception, // but this will run in a multithreaded context, // where exceptions might not be caught, // so for now - just ignore if (location >= v.size()) return; if (v[location].status != ImageBufferEntryStatus::Sending) return; ++slots_idle; --slots_sending; v[location].status = ImageBufferEntryStatus::Idle; free_slots.push(location); cv_send_done.notify_all(); } bool ImageBuffer::Finalize(std::chrono::microseconds timeout) { std::unique_lock ul(m); bool ret = FinalizeInternal(ul, timeout); return ret; } bool ImageBuffer::GetImage(std::vector &out_v, int64_t image_number) { std::unique_lock ul(m); if (disable_requests) return false; std::optional val; if (image_number == MaxImage) val = getHandleMaxImage(); else if (image_number == MaxIndexedImage) val = getHandleMaxIndexedImage(); else val = getHandle(image_number); if (!val.has_value()) return false; uint32_t i = val.value(); if (v[i].status == ImageBufferEntryStatus::InPreparation || v[i].status == ImageBufferEntryStatus::Empty) return false; // Unlock mutex to allow memory copy to happen outside the critical section ++slots_in_preview; ++v[i].readers; size_t image_size = v[i].image_size; ul.unlock(); out_v.resize(image_size); memcpy(out_v.data(), GetBufferLocation(i), image_size); ul.lock(); --slots_in_preview; --v[i].readers; cv_preview_done.notify_all(); return true; } bool ImageBuffer::FinalizeInternal(std::unique_lock &ul, std::chrono::microseconds timeout) { disable_requests = true; auto time_point = std::chrono::steady_clock::now() + timeout; if (slots_in_preview > 0) cv_preview_done.wait_until(ul, time_point, [this] {return slots_in_preview == 0;} ); if (slots_preparation > 0) cv_preparation_done.wait_until(ul, time_point, [this] {return slots_preparation == 0;} ); if (slots_sending > 0) cv_send_done.wait_until(ul, time_point, [this] {return slots_sending == 0; }); ++counter; return (slots_preparation == 0) && (slots_in_preview == 0) && (slots_sending == 0); } std::optional ImageBuffer::getHandle(int64_t image_number) const { for (int i = 0; i < slot_count; i++) { if (v[i].status != ImageBufferEntryStatus::Empty && v[i].status != ImageBufferEntryStatus::InPreparation && v[i].image_number == image_number) return i; } return {}; } std::optional ImageBuffer::getHandleMaxImage() const { int64_t max_image_number = INT64_MIN; std::optional ret; for (int i = 0; i < slot_count; i++) { // if this is never true, then optional will remain empty if (v[i].status != ImageBufferEntryStatus::Empty && v[i].status != ImageBufferEntryStatus::InPreparation && v[i].image_number > max_image_number && v[i].image_number >= 0) { max_image_number = v[i].image_number; ret = i; } } return ret; } std::optional ImageBuffer::getHandleMaxIndexedImage() const { int64_t max_image_number = INT64_MIN; std::optional ret; for (int i = 0; i < slot_count; i++) { // if this is never true, then optional will remain empty if (v[i].status != ImageBufferEntryStatus::Empty && v[i].status != ImageBufferEntryStatus::InPreparation && v[i].image_number > max_image_number && v[i].image_number >= 0 && v[i].indexed) { max_image_number = v[i].image_number; ret = i; } } return ret; } uint8_t *ImageBuffer::GetBufferLocation(size_t id) { if (slot_size == 0) throw JFJochException(JFJochExceptionCategory::WrongNumber, "Buffer not initialized"); if (id >= slot_count) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Image entry out of buffer bounds"); return buffer + slot_size * id; } void ImageBuffer::GetStartMessage(std::vector &out_v) { std::unique_lock ul(start_message_mutex); out_v = start_message; } void ImageBuffer::SaveStartMessage(const std::vector &msg) { std::unique_lock ul(start_message_mutex); start_message = msg; } ImageBufferStatus ImageBuffer::GetStatus() const { ImageBufferStatus ret; { std::unique_lock ul(m); ret.total_slots = slot_count; ret.available_slots = slot_count - slots_sending - slots_preparation; ret.preparation_slots = slots_preparation; ret.sending_slots = slots_sending; ret.min_image_number = INT64_MAX; ret.max_image_number = 0; ret.current_counter = counter; for (int i = 0; i < slot_count; i++) { if (v[i].image_number >= 0) { if (ret.max_image_number < v[i].image_number) ret.max_image_number = v[i].image_number; if (ret.min_image_number > v[i].image_number) ret.min_image_number = v[i].image_number; ret.images_in_the_buffer.push_back(v[i].image_number); } } if (ret.images_in_the_buffer.empty()) { ret.min_image_number = 0; ret.max_image_number = 0; } } std::sort(ret.images_in_the_buffer.begin(), ret.images_in_the_buffer.end()); return ret; }