// Source listing: Jungfraujoch/receiver/AcquisitionDevice.cpp (C++, 218 lines, 7.4 KiB)

// Copyright (2019-2023) Paul Scherrer Institute
#ifdef JFJOCH_USE_NUMA
#include <numaif.h>
#endif
#include <sys/mman.h>
#include <thread>
#include <fstream>
#include <cmath>
#include <sstream>
#include "../common/JFJochException.h"
#include "AcquisitionDevice.h"
#include "../common/NetworkAddressConvert.h"
// Maps an anonymous, private, read/write memory region of `size` bytes and zeroes it.
//
// When built with JFJOCH_USE_NUMA and `numa_node` is non-negative, the region is bound
// (MPOL_BIND with MPOL_MF_STRICT) to that NUMA node before it is first written.
// A negative `numa_node` leaves the default memory policy untouched.
//
// Returns the start address of the mapping.
// Throws JFJochException (MemAllocFailed) if the mapping cannot be created, if the node
// index does not fit into the single-word node mask, or if the NUMA policy cannot be
// applied. On every failure path the mapping is released before throwing.
void *mmap_acquisition_buffer(size_t size, int16_t numa_node) {
    void *ret = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    // mmap() reports failure with MAP_FAILED ((void *) -1), never with a null pointer
    if (ret == MAP_FAILED) {
        throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "frame_buffer");
    }
#ifdef JFJOCH_USE_NUMA
    if (numa_node >= 0) {
        unsigned long nodemask = 0;
        const size_t nodemask_bits = sizeof(nodemask) * 8;

        // Validate before shifting: shifting by the word width or more is undefined
        // behaviour, and the highest representable node is nodemask_bits - 1.
        if (static_cast<size_t>(numa_node) >= nodemask_bits) {
            munmap(ret, size);
            throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Mask too small for NUMA node");
        }
        nodemask = 1UL << numa_node;

        if (mbind(ret, size, MPOL_BIND, &nodemask, nodemask_bits, MPOL_MF_STRICT) == -1) {
            // Do not leak the mapping when the policy cannot be applied
            munmap(ret, size);
            throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Cannot apply NUMA policy");
        }
    }
#endif
    // Anonymous mappings start out zero-filled; writing to the region additionally
    // touches every page (presumably so pages are allocated up front - TODO confirm).
    memset(ret, 0, size);
    return ret;
}
// Creates a device bound to data stream `in_data_stream`.
// The error frame buffer is sized to RAW_MODULE_SIZE elements and logging is
// disabled until EnableLogging() is called.
AcquisitionDevice::AcquisitionDevice(uint16_t in_data_stream)
        : buffer_err(RAW_MODULE_SIZE) {
    data_stream = in_data_stream;
    logger = nullptr;
}
// Checks that the experiment's module count for this data stream does not exceed
// max_modules, then resets the acquisition counters for the experiment.
// Throws JFJochException (InputParameterAboveMax) when the module count is too large;
// in that case the counters are left untouched.
void AcquisitionDevice::PrepareAction(const DiffractionExperiment &experiment) {
    const auto module_count = experiment.GetModulesNum(data_stream);
    if (module_count > max_modules) {
        throw JFJochException(JFJochExceptionCategory::InputParameterAboveMax,
                              "Number of modules exceeds max possible for FPGA");
    }

    counters.Reset(experiment, data_stream);
}
// Starts an acquisition for `experiment`.
//
// Sequence: cancel whatever is running, validate the module count, refill the error
// frame buffer, reset counters, clear the work request queue, call Start(), queue one
// work request per device buffer, wait for the Start completion and finally call
// StartSendingWorkRequests().
//
// Throws JFJochException (InputParameterAboveMax) if the module count exceeds
// max_modules, and JFJochException (AcquisitionDeviceError) if the first completion
// received is not of type Start.
void AcquisitionDevice::StartAction(const DiffractionExperiment &experiment) {
    Cancel();

    const auto module_count = experiment.GetModulesNum(data_stream);
    if (module_count > max_modules) {
        throw JFJochException(JFJochExceptionCategory::InputParameterAboveMax,
                              "Number of modules exceeds max possible for FPGA");
    }

    // The error frame is filled with PIXEL_OUT_LOST in conversion mode and with -1 otherwise
    const bool conversion_mode = (experiment.GetDetectorMode() == DetectorMode::Conversion);
    for (int pixel = 0; pixel < RAW_MODULE_SIZE; pixel++) {
        if (conversion_mode)
            buffer_err[pixel] = PIXEL_OUT_LOST;
        else
            buffer_err[pixel] = -1;
    }

    counters.Reset(experiment, data_stream);
    expected_frames = experiment.GetFrameNum();

    // Ensure internal WR queue is empty
    work_request_queue.Clear();

    Start(experiment);

    // Offer every device buffer through a work request
    for (uint32_t handle = 0; handle < buffer_device.size(); handle++) {
        SendWorkRequest(handle);
    }

    // The first completion has to be the Start marker
    const auto first_completion = work_completion_queue.GetBlocking();
    if (first_completion.type != Completion::Type::Start) {
        throw JFJochException(JFJochExceptionCategory::AcquisitionDeviceError, "Mismatch in work completions");
    }

    StartSendingWorkRequests();
    start_time = std::chrono::system_clock::now();

    if (logger != nullptr) {
        logger->Info("Started");
    }
}
// Consumes work completions until one of type End is received, then marks the
// acquisition as finished, records the end time and calls Cancel() and Finalize().
//
// For every completion before End:
//  - frame number at or beyond expected_frames: Cancel() is called and the buffer
//    is handed back right away;
//  - module number at or beyond max_modules: the completion is logged as an error
//    (if a logger is set) and the buffer is handed back without being processed;
//  - otherwise the completion is passed to the counters.
// Each handled completion is also written to the debug log when a logger is set.
void AcquisitionDevice::WaitForActionComplete() {
    for (auto completion = work_completion_queue.GetBlocking();
         completion.type != Completion::Type::End;
         completion = work_completion_queue.GetBlocking()) {

        if (completion.frame_number >= expected_frames) {
            Cancel();
            // this frame is not of any interest, therefore its location can be immediately released
            SendWorkRequest(completion.handle);
        } else if (completion.module_number >= max_modules) {
            // Module number out of bounds, don't process
            if (logger != nullptr) {
                logger->Error("Completion with wrong module number data stream {} completion frame number {} module {} handle {} timestamp {} status {}",
                              data_stream, completion.frame_number, completion.module_number,
                              completion.handle, completion.timestamp, completion.status);
            }
            SendWorkRequest(completion.handle);
        } else {
            counters.UpdateCounters(&completion);
        }

        if (logger != nullptr) {
            logger->Debug("Data stream {} completion frame number {} module {} handle {} timestamp {} status {}",
                          data_stream, completion.frame_number, completion.module_number,
                          completion.handle, completion.timestamp, completion.status);
        }
    }

    counters.SetAcquisitionFinished();
    end_time = std::chrono::system_clock::now();

    Cancel();
    Finalize();
}
void AcquisitionDevice::SendWorkRequest(uint32_t handle) {
work_request_queue.Put(WorkRequest{
.ptr = buffer_device.at(handle),
.handle = handle
});
}
// Returns the total packet count reported by the counters multiplied by 8192
// (the code treats every packet as 8192 bytes).
uint64_t AcquisitionDevice::GetBytesReceived() const {
    constexpr unsigned long bytes_per_packet = 8192LU;
    return counters.GetTotalPackets() * bytes_per_packet;
}
// Fills `statistics` with the results of the acquisition: bytes received, start and
// end timestamps (raw tick counts since the clock's epoch), module count, expected
// and received packet counts, per-module packet counts, efficiency and device status.
//
// Efficiency is received / expected packets, computed in single precision; it is
// reported as 1.0 when no packets were expected or when the two counts are equal.
void AcquisitionDevice::SaveStatistics(const DiffractionExperiment &experiment,
                                       JFJochProtoBuf::AcquisitionDeviceStatistics &statistics) const {
    statistics.set_bytes_received(GetBytesReceived());
    statistics.set_start_timestamp(start_time.time_since_epoch().count());
    statistics.set_end_timestamp(end_time.time_since_epoch().count());

    auto module_count = experiment.GetModulesNum(data_stream);
    auto packets_expected = counters.GetExpectedPackets();
    auto packets_received = counters.GetTotalPackets();

    statistics.set_nmodules(module_count);
    statistics.set_packets_expected(packets_expected);
    statistics.set_good_packets(packets_received);

    for (int module = 0; module < module_count; module++) {
        statistics.add_packets_received_per_module(counters.GetTotalPackets(module));
    }

    // A ratio is only computed when packets were expected and some are missing or extra
    if ((packets_expected != 0) && (packets_received != packets_expected)) {
        statistics.set_efficiency(static_cast<float>(packets_received) / static_cast<float>(packets_expected));
    } else {
        statistics.set_efficiency(1.0);
    }

    *statistics.mutable_fpga_status() = GetStatus();
}
// Returns the device buffer recorded by the counters for (`frame_number`, `module_number`).
// When the counters report HandleNotValid, the error frame buffer is returned instead.
const int16_t *AcquisitionDevice::GetFrameBuffer(size_t frame_number, uint16_t module_number) const {
    const auto handle = counters.GetBufferHandle(frame_number, module_number);

    if (handle == HandleNotValid) {
        return GetErrorFrameBuffer();
    }
    return reinterpret_cast<int16_t *>(buffer_device.at(handle));
}
// Returns a read-only pointer to the error frame buffer, i.e. the buffer that
// GetFrameBuffer() hands out when no valid handle exists for a frame.
const int16_t *AcquisitionDevice::GetErrorFrameBuffer() const {
    const int16_t *error_frame = buffer_err.data();
    return error_frame;
}
// Returns the device buffer for `handle`, viewed as signed 16-bit values.
// Throws JFJochException (ArrayOutOfBounds) when `handle` is not a valid index
// into the list of device buffers.
int16_t *AcquisitionDevice::GetDeviceBuffer(size_t handle) {
    const bool handle_in_range = (handle < buffer_device.size());
    if (!handle_in_range) {
        throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Handle outside of range");
    }

    return reinterpret_cast<int16_t *>(buffer_device.at(handle));
}
// Intentionally does nothing in this implementation; both arguments are ignored.
void AcquisitionDevice::InitializeCalibration(const DiffractionExperiment &experiment,
                                              const JFCalibration &calib) {
}
void AcquisitionDevice::MapBuffersStandard(size_t c2h_buffer_count, size_t h2c_buffer_count, int16_t numa_node) {
try {
for (int i = 0; i < std::max(c2h_buffer_count, h2c_buffer_count); i++)
buffer_device.emplace_back((uint16_t *) mmap_acquisition_buffer(FPGA_BUFFER_LOCATION_SIZE, numa_node));
} catch (const JFJochException &e) {
UnmapBuffers();
throw;
}
}
// Unmaps every non-null device buffer (each FPGA_BUFFER_LOCATION_SIZE bytes long).
// The number of entries in buffer_device is left unchanged, but each released entry
// is reset to nullptr so that no dangling address remains and calling this function
// again does not unmap the same address twice.
void AcquisitionDevice::UnmapBuffers() {
    for (auto &buffer: buffer_device) {
        if (buffer != nullptr) {
            munmap(buffer, FPGA_BUFFER_LOCATION_SIZE);
            buffer = nullptr;
        }
    }
}
// Clears the buffer handle that the counters hold for (`frame_number`, `module_number`)
// and, if a handle was found, hands that buffer back through a new work request.
void AcquisitionDevice::FrameBufferRelease(size_t frame_number, uint16_t module_number) {
    const auto handle = counters.GetBufferHandleAndClear(frame_number, module_number);
    if (handle == AcquisitionCounters::HandleNotFound) {
        return;
    }

    SendWorkRequest(handle);
}
// Stores `in_logger` as the logger used by this device.
// Passing nullptr turns logging off, since every log call is guarded by a null check.
void AcquisitionDevice::EnableLogging(Logger *in_logger) {
    this->logger = in_logger;
}
// Always returns -1 in this implementation.
// NOTE(review): -1 presumably means "no specific NUMA node"; confirm against callers.
int32_t AcquisitionDevice::GetNUMANode() const {
    constexpr int32_t unspecified_node = -1;
    return unspecified_node;
}
// Always returns the fixed port number 1234 in this implementation.
uint16_t AcquisitionDevice::GetUDPPort() const {
    constexpr uint16_t fixed_udp_port = 1234;
    return fixed_udp_port;
}
// Gives read-only access to the acquisition counters owned by this device.
const AcquisitionCounters &AcquisitionDevice::Counters() const {
    return this->counters;
}
// Returns the device's IPv4 address (ipv4_addr) formatted by IPv4AddressToStr().
std::string AcquisitionDevice::GetIPv4Address() const {
    std::string address_text = IPv4AddressToStr(ipv4_addr);
    return address_text;
}
// Returns the device's MAC address (mac_addr) formatted by MacAddressToStr().
std::string AcquisitionDevice::GetMACAddress() const {
    std::string address_text = MacAddressToStr(mac_addr);
    return address_text;
}