// Copyright (2019-2022) Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-or-later #ifdef JFJOCH_USE_NUMA #include #endif #include #include #include #include #include #include "../common/JFJochException.h" #include "AcquisitionDevice.h" #include "../common/NetworkAddressConvert.h" void *mmap_acquisition_buffer(size_t size, int16_t numa_node) { void *ret = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); if (ret == nullptr) { throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "frame_buffer"); } #ifdef JFJOCH_USE_NUMA if (numa_node >= 0) { unsigned long nodemask = 1L << numa_node;; if (numa_node > sizeof(nodemask)*8) throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Mask too small for NUMA node"); if (mbind(ret, size, MPOL_BIND, &nodemask, sizeof(nodemask)*8, MPOL_MF_STRICT) == -1) throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Cannot apply NUMA policy"); } #endif memset(ret, 0, size); return ret; } AcquisitionDevice::AcquisitionDevice(uint16_t in_data_stream) : buffer_err(RAW_MODULE_SIZE) { logger = nullptr; data_stream = in_data_stream; } void AcquisitionDevice::PrepareAction(const DiffractionExperiment &experiment) { if (experiment.GetModulesNum(data_stream) > max_modules) throw(JFJochException(JFJochExceptionCategory::InputParameterAboveMax, "Number of modules exceeds max possible for FPGA")); counters.Reset(experiment, data_stream); } void AcquisitionDevice::StartAction(const DiffractionExperiment &experiment) { Cancel(); if (experiment.GetModulesNum(data_stream) > max_modules) throw(JFJochException(JFJochExceptionCategory::InputParameterAboveMax, "Number of modules exceeds max possible for FPGA")); for (int i = 0; i < RAW_MODULE_SIZE; i++) { if (experiment.GetDetectorMode() == DetectorMode::Conversion) buffer_err[i] = PIXEL_OUT_LOST; else buffer_err[i] = -1; } counters.Reset(experiment, data_stream); expected_frames = experiment.GetFrameNum(); // Ensure internal WR queue is empty work_request_queue.Clear(); Start(experiment); for (uint32_t i = 0; i < buffer_device.size(); i++) SendWorkRequest(i); auto c = work_completion_queue.GetBlocking(); if (c.type != Completion::Type::Start) throw JFJochException(JFJochExceptionCategory::AcquisitionDeviceError, "Mismatch in work completions"); StartSendingWorkRequests(); start_time = std::chrono::system_clock::now(); if (logger) logger->Info("Started"); } void AcquisitionDevice::WaitForActionComplete() { auto c = work_completion_queue.GetBlocking(); while (c.type != Completion::Type::End) { if (c.frame_number >= expected_frames) { Cancel(); // this frame is not of any interest, therefore its location can be immediately released SendWorkRequest(c.handle); } else if (c.module_number >= max_modules) { // Module number out of bounds, don't process if (logger != nullptr) logger->Error("Completion with wrong module number data stream {} completion frame number {} module {} handle {} timestamp {} status {}", data_stream, c.frame_number, c.module_number, c.handle, c.timestamp, c.status); SendWorkRequest(c.handle); } else counters.UpdateCounters(&c); if (logger != nullptr) logger->Debug("Data stream {} completion frame number {} module {} handle {} timestamp {} status {}", data_stream, c.frame_number, c.module_number, c.handle, c.timestamp, c.status); c = work_completion_queue.GetBlocking(); } counters.SetAcquisitionFinished(); end_time = std::chrono::system_clock::now(); Cancel(); Finalize(); } void AcquisitionDevice::SendWorkRequest(uint32_t handle) { work_request_queue.Put(WorkRequest{ .ptr = buffer_device.at(handle), .handle = handle }); } uint64_t AcquisitionDevice::GetBytesReceived() const { return counters.GetTotalPackets() * 8192LU; } void AcquisitionDevice::SaveStatistics(const DiffractionExperiment &experiment, JFJochProtoBuf::AcquisitionDeviceStatistics &statistics) const { statistics.set_bytes_received(GetBytesReceived()); statistics.set_start_timestamp(start_time.time_since_epoch().count()); statistics.set_end_timestamp(end_time.time_since_epoch().count()); auto nmodules = experiment.GetModulesNum(data_stream); auto expected_packets = counters.GetExpectedPackets(); auto total_packets = counters.GetTotalPackets(); statistics.set_nmodules(nmodules); statistics.set_packets_expected(expected_packets); statistics.set_good_packets(total_packets); for (int i = 0; i < nmodules; i++) statistics.add_packets_received_per_module(counters.GetTotalPackets(i)); if ((expected_packets == 0) || (total_packets == expected_packets)) statistics.set_efficiency(1.0); else statistics.set_efficiency(static_cast(total_packets) / static_cast(expected_packets)); *statistics.mutable_fpga_status() = GetStatus(); } const int16_t *AcquisitionDevice::GetFrameBuffer(size_t frame_number, uint16_t module_number) const { auto handle = counters.GetBufferHandle(frame_number, module_number); if (handle != HandleNotValid) return (int16_t *) buffer_device.at(handle); else return GetErrorFrameBuffer(); } const int16_t *AcquisitionDevice::GetErrorFrameBuffer() const { return buffer_err.data(); } int16_t *AcquisitionDevice::GetDeviceBuffer(size_t handle) { if (handle >= buffer_device.size()) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Handle outside of range"); else return (int16_t *) buffer_device.at(handle); } void AcquisitionDevice::InitializeCalibration(const DiffractionExperiment &experiment, const JFCalibration &calib) {} void AcquisitionDevice::MapBuffersStandard(size_t c2h_buffer_count, size_t h2c_buffer_count, int16_t numa_node) { try { for (int i = 0; i < std::max(c2h_buffer_count, h2c_buffer_count); i++) buffer_device.emplace_back((uint16_t *) mmap_acquisition_buffer(FPGA_BUFFER_LOCATION_SIZE, numa_node)); } catch (const JFJochException &e) { UnmapBuffers(); throw; } } void AcquisitionDevice::UnmapBuffers() { for (auto &i: buffer_device) if (i != nullptr) munmap(i, FPGA_BUFFER_LOCATION_SIZE); } void AcquisitionDevice::FrameBufferRelease(size_t frame_number, uint16_t module_number) { auto handle = counters.GetBufferHandleAndClear(frame_number, module_number); if (handle != AcquisitionCounters::HandleNotFound) SendWorkRequest(handle); } void AcquisitionDevice::EnableLogging(Logger *in_logger) { logger = in_logger; } int32_t AcquisitionDevice::GetNUMANode() const { return -1; } uint16_t AcquisitionDevice::GetUDPPort() const { return 1234; } const AcquisitionCounters &AcquisitionDevice::Counters() const { return counters; } std::string AcquisitionDevice::GetIPv4Address() const { return IPv4AddressToStr(ipv4_addr); } std::string AcquisitionDevice::GetMACAddress() const { return MacAddressToStr(mac_addr); }