// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "ImageBuffer.h" #include "JFJochException.h" #ifdef JFJOCH_USE_NUMA #include #endif #include ImageBuffer::ImageBuffer(size_t buffer_size_bytes) : buffer_size(buffer_size_bytes) { #ifdef JFJOCH_USE_NUMA buffer = (uint8_t *) numa_alloc_interleaved(buffer_size); #else buffer = (uint8_t *) mmap (nullptr, buffer_size, PROT_READ | PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) ; #endif if (buffer == nullptr) throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Failed to allocate image buffer"); memset(buffer, 0, buffer_size); } ImageBuffer::~ImageBuffer() { // Wait max. 5 seconds std::unique_lock ul(m); FinalizeInternal(ul); #ifdef JFJOCH_USE_NUMA numa_free(buffer, buffer_size); #else munmap(buffer, buffer_size); #endif } void ImageBuffer::StartMeasurement(size_t in_location_size) { std::unique_lock ul(m); // Ensure there is nothing running for now if (!FinalizeInternal(ul)) throw JFJochException(JFJochExceptionCategory::WrongDAQState, "There are unfinished preview/sending jobs in the buffer"); // Setup buffer if (buffer_size < in_location_size) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Buffer is to small to hold even 1 image"); slot_size = in_location_size; slot_count = buffer_size / in_location_size; // Ensure that requests are allowed and clear active process counters disable_requests = false; zeromq_in_process = 0; preview_in_process = 0; // Clear queue and vectors while (!free_slots.empty()) free_slots.pop(); v.clear(); send_buffer_zero_copy_ret_val.clear(); // Fill queue and vectors for (int i = 0; i < slot_count; i++) { free_slots.push(i); v.emplace_back(); send_buffer_zero_copy_ret_val.emplace_back(GetBufferLocation(i),*this, i); } } void ImageBuffer::StartMeasurement(const DiffractionExperiment &experiment) { StartMeasurement(experiment.GetImageBufferLocationSize()); } bool ImageBuffer::CheckIfBufferReturned(std::chrono::microseconds timeout) { std::unique_lock ul(m); if (zeromq_in_process > 0) cv_zeromq_done.wait_for(ul, timeout, [this] { return zeromq_in_process == 0; }); return zeromq_in_process == 0; } ZeroCopyReturnValue *ImageBuffer::GetImageSlot() { std::unique_lock ul(m); if (!disable_requests && !free_slots.empty()) { auto i = free_slots.front(); free_slots.pop(); // Wait for already existing preview threads and don't allow new ones v[i].zeromq_processing = true; // Clear image number/size v[i].image_number = INT64_MIN; v[i].image_size = 0; v[i].indexed = false; ++zeromq_in_process; // Wait for any preview thread currently reading this one // All has to be ready, as waiting on cv_preview_done will release mutex if (v[i].readers > 0) cv_preview_done.wait(ul, [i, this] {return v[i].readers == 0;}); return &send_buffer_zero_copy_ret_val[i]; } return nullptr; } int64_t ImageBuffer::GetAvailSlots() const { std::unique_lock ul(m); return slot_count - zeromq_in_process; } void ImageBuffer::ReleaseSlot(uint32_t location, int64_t image_number, size_t image_size, bool indexed) { std::unique_lock ul(m); zeromq_in_process--; v[location].zeromq_processing = false; v[location].image_number = image_number; v[location].image_size = image_size; v[location].indexed = indexed; free_slots.push(location); cv_zeromq_done.notify_all(); } bool ImageBuffer::Finalize(std::chrono::microseconds timeout) { std::unique_lock ul(m); bool ret = FinalizeInternal(ul, timeout); return ret; } bool ImageBuffer::GetImage(std::vector &out_v, int64_t image_number) { std::unique_lock ul(m); if (disable_requests) return false; std::optional val; if (image_number == MaxImage) val = getHandleMaxImage(); else if (image_number == MaxIndexedImage) val = getHandleMaxIndexedImage(); else val = getHandle(image_number); if (!val.has_value()) return false; uint32_t i = val.value(); if (v[i].zeromq_processing) return false; // Unlock mutex to allow memory copy happen outside the critical section ++preview_in_process; ++v[i].readers; ul.unlock(); out_v.resize(v[i].image_size); memcpy(out_v.data(), GetBufferLocation(i), v[i].image_size); ul.lock(); --preview_in_process; --v[i].readers; cv_preview_done.notify_all(); return true; } bool ImageBuffer::FinalizeInternal(std::unique_lock &ul, std::chrono::microseconds timeout) { disable_requests = true; if (preview_in_process > 0) cv_preview_done.wait_for(ul, timeout, [this] {return preview_in_process == 0;} ); if (zeromq_in_process > 0) cv_zeromq_done.wait_for(ul, timeout, [this] {return zeromq_in_process == 0; }); return (preview_in_process == 0) && (zeromq_in_process == 0); } std::optional ImageBuffer::getHandle(int64_t image_number) const { for (int i = 0; i < slot_count; i++) { if (!v[i].zeromq_processing && v[i].image_number == image_number) return i; } return {}; } std::optional ImageBuffer::getHandleMaxImage() const { int64_t max_image_number = INT64_MIN; std::optional ret; for (int i = 0; i < slot_count; i++) { // if this is never true, then optional will remain empty if (!v[i].zeromq_processing && v[i].image_number > max_image_number && v[i].image_number >= 0) { max_image_number = v[i].image_number; ret = i; } } return ret; } std::optional ImageBuffer::getHandleMaxIndexedImage() const { int64_t max_image_number = INT64_MIN; std::optional ret; for (int i = 0; i < slot_count; i++) { // if this is never true, then optional will remain empty if (!v[i].zeromq_processing && v[i].image_number > max_image_number && v[i].image_number >= 0 && v[i].indexed) { max_image_number = v[i].image_number; ret = i; } } return ret; } uint8_t *ImageBuffer::GetBufferLocation(size_t id) { if (slot_size == 0) throw JFJochException(JFJochExceptionCategory::WrongNumber, "Buffer not initialized"); if (id >= slot_count) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Image entry out of buffer bounds"); return buffer + slot_size * id; } void ImageBuffer::GetStartMessage(std::vector &out_v) { std::unique_lock ul(start_message_mutex); out_v = start_message; } void ImageBuffer::SaveStartMessage(const std::vector &msg) { start_message = msg; } ImageBufferStatus ImageBuffer::GetStatus() const { ImageBufferStatus ret; { std::unique_lock ul(m); ret.total_slots = slot_count; ret.available_slots = slot_count - zeromq_in_process; ret.min_image_number = INT64_MAX; ret.max_image_number = 0; for (int i = 0; i < slot_count; i++) { if (v[i].image_number >= 0) { if (ret.max_image_number < v[i].image_number) ret.max_image_number = v[i].image_number; if (ret.min_image_number > v[i].image_number) ret.min_image_number = v[i].image_number; ret.images_in_the_buffer.push_back(v[i].image_number); } } if (ret.images_in_the_buffer.empty()) { ret.min_image_number = 0; ret.max_image_number = 0; } } std::sort(ret.images_in_the_buffer.begin(), ret.images_in_the_buffer.end()); return ret; }