// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "JFJochReceiverFPGA.h" #include #include "ImageMetadata.h" #include "../common/CUDAWrapper.h" JFJochReceiverFPGA::JFJochReceiverFPGA(const DiffractionExperiment &in_experiment, const PixelMask &in_pixel_mask, const JFCalibration *in_calibration, AcquisitionDeviceGroup &in_aq_device, ImagePusher &in_image_sender, Logger &in_logger, int64_t in_forward_and_sum_nthreads, const NUMAHWPolicy &in_numa_policy, const SpotFindingSettings &in_spot_finding_settings, PreviewImage &in_preview_image, JFJochReceiverCurrentStatus &in_current_status, JFJochReceiverPlots &in_plots, ImageBuffer &in_send_buf_ctrl, ZMQPreviewSocket *in_zmq_preview_socket, ZMQMetadataSocket *in_zmq_metadata_socket, IndexerThreadPool *indexing_thread_pool) : JFJochReceiver(in_experiment, in_send_buf_ctrl, in_image_sender, in_preview_image, in_current_status, in_plots, in_spot_finding_settings, in_logger, in_numa_policy, in_pixel_mask, in_zmq_preview_socket, in_zmq_metadata_socket, indexing_thread_pool), calibration(nullptr), acquisition_device(in_aq_device), ndatastreams(experiment.GetDataStreamsNum()), frame_transformation_nthreads( in_forward_and_sum_nthreads), pedestal_nthreads((experiment.GetStorageCellNumber() > 2) ? 1 : 4), summation_nthreads(2), frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0), data_acquisition_ready(ndatastreams) { for (int m = 0; m < experiment.GetModulesNum(); m++) adu_histogram_module.emplace_back(std::make_unique()); roi_map = experiment.ExportROIMap(); if (experiment.GetDetectorSetup().GetDetectorType() == DetectorType::JUNGFRAU) calibration = in_calibration; if (acquisition_device.size() < ndatastreams) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Number of acquisition devices has to match data streams"); if (frame_transformation_nthreads <= 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Number of threads must be more than zero"); logger.Info("NUMA policy: {}", numa_policy.GetName()); expected_packets_per_image = 0; for (int d = 0; d < ndatastreams; d++) { acquisition_device[d].PrepareAction(experiment); acquisition_device[d].SetSpotFinderParameters(spot_finding_settings); expected_packets_per_image += acquisition_device[d].Counters().GetExpectedPacketsPerImage(); logger.Debug("Acquisition device {} prepared", d); } if (experiment.IsCPUSummation()) expected_packets_per_image *= experiment.GetSummation(); logger.Info("Data acquisition devices ready"); if ((experiment.GetDetectorMode() == DetectorMode::PedestalG0) || (experiment.GetDetectorMode() == DetectorMode::PedestalG1) || (experiment.GetDetectorMode() == DetectorMode::PedestalG2)) { if (experiment.GetImageNum() > 0) { throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Saving and calculating pedestal is not supported for the time being"); } switch (experiment.GetDetectorMode()) { case DetectorMode::PedestalG1: only_2nd_sc_pedestal = !experiment.IsFixedGainG1() && (experiment.GetStorageCellNumber() == 2); break; case DetectorMode::PedestalG2: only_2nd_sc_pedestal = (experiment.GetStorageCellNumber() == 2); break; default: only_2nd_sc_pedestal = false; break; } int64_t pedestal_count = (only_2nd_sc_pedestal) ? experiment.GetModulesNum() : experiment.GetModulesNum() * experiment.GetStorageCellNumber(); for (int i = 0; i < pedestal_count; i++) pedestal.emplace_back(std::make_unique(experiment)); for (int s = 0; s < experiment.GetStorageCellNumber(); s++) { bool ignore = only_2nd_sc_pedestal && (s == 0); for (int d = 0; d < ndatastreams; d++) for (int m = 0; m < experiment.GetModulesNum(d); m++) for (int n = 0; n < pedestal_nthreads; n++) frame_transformation_futures.emplace_back(std::async(std::launch::async, &JFJochReceiverFPGA::MeasurePedestalThread, this, d, m, s, n, ignore)); } logger.Info("Pedestal threads ready ({} threads/module*SC)", pedestal_nthreads); } else if (experiment.GetImageNum() > 0) { SendStartMessage(); SendCalibration(); for (int i = 0; i < experiment.GetImageNum(); i++) images_to_go.Put(i); // Setup frames summation and forwarding for (uint32_t i = 0; i < frame_transformation_nthreads; i++) { auto handle = std::async(std::launch::async, &JFJochReceiverFPGA::FrameTransformationThread, this, i); frame_transformation_futures.emplace_back(std::move(handle)); } logger.Info("Image compression/forwarding threads started"); frame_transformation_ready.wait(); logger.Info("Image compression/forwarding threads ready"); } for (int d = 0; d < ndatastreams; d++) data_acquisition_futures.emplace_back(std::async(std::launch::async, &JFJochReceiverFPGA::AcquireThread, this, d)); data_acquisition_ready.wait(); start_time = std::chrono::system_clock::now(); measurement = std::async(std::launch::async, &JFJochReceiverFPGA::FinalizeMeasurement, this); logger.Info("Receiving data started"); } void JFJochReceiverFPGA::SendPedestal(const std::string &prefix, const std::vector &v, int gain, int sc) { size_t xpixel = RAW_MODULE_COLS; size_t ypixel = experiment.GetModulesNum() * RAW_MODULE_LINES; std::string channel; if (experiment.GetStorageCellNumber() > 1) channel = fmt::format("{:s}_g{:d}_sc{:d}", prefix, gain, sc); else channel = fmt::format("{:s}_g{:d}", prefix, gain); CompressedImage image(v.data(), v.size(), xpixel, ypixel, CompressedImageMode::Uint32, CompressionAlgorithm::BSHUF_LZ4, channel); image_pusher.SendCalibration(image); } void JFJochReceiverFPGA::SendCalibration() { if ((calibration == nullptr) || !experiment.GetSaveCalibration() || !push_images_to_writer) return; JFJochBitShuffleCompressor compressor(CompressionAlgorithm::BSHUF_LZ4); for (int sc = 0; sc < experiment.GetStorageCellNumber(); sc++) { for (int gain = 0; gain < 3; gain++) { if (experiment.IsFixedGainG1() && (gain != 1)) continue; SendPedestal("pedestal", compressor.Compress(calibration->GetPedestal(gain, sc)), gain, sc); SendPedestal("pedestal_rms", compressor.Compress(calibration->GetPedestalRMS(gain, sc)), gain, sc); } } } void JFJochReceiverFPGA::AcquireThread(uint16_t data_stream) { try { NUMAHWPolicy::RunOnNode(acquisition_device[data_stream].GetNUMANode()); } catch (const JFJochException &e) { logger.Warning("NUMA bind error {} for device thread {} - continuing without binding", e.what(), data_stream); } try { LoadCalibrationToFPGA(data_stream); frame_transformation_ready.wait(); logger.Debug("Device thread {} start FPGA action", data_stream); acquisition_device[data_stream].StartAction(experiment); } catch (const JFJochException &e) { Cancel(e); data_acquisition_ready.count_down(); logger.ErrorException(e); logger.Warning("Device thread {} done due to an error", data_stream); return; } data_acquisition_ready.count_down(); try { logger.Debug("Device thread {} wait for FPGA action complete", data_stream); acquisition_device[data_stream].WaitForActionComplete(); } catch (const JFJochException &e) { logger.ErrorException(e); Cancel(e); logger.ErrorException(e); logger.Warning("Device thread {} done due to an error", data_stream); return; } logger.Info("Device thread {} done", data_stream); } void JFJochReceiverFPGA::MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell, uint32_t threadid, bool ignore) { try { NUMAHWPolicy::RunOnNode(acquisition_device[data_stream].GetNUMANode()); } catch (const JFJochException &e) { logger.Error("HW bind error {}", e.what()); } JFPedestalCalc pedestal_calc(experiment); uint64_t starting_frame = storage_cell + threadid * experiment.GetStorageCellNumber(); uint64_t frame_stride = experiment.GetStorageCellNumber() * pedestal_nthreads; uint32_t storage_cell_header = UINT32_MAX; try { for (size_t frame = starting_frame; frame < experiment.GetFrameNum(); frame += frame_stride) { // Frame will be processed only if one already collects frame+2 acquisition_device[data_stream].Counters().WaitForFrame(frame + 2, module_number); if (acquisition_device[data_stream].Counters().IsFullModuleCollected(frame, module_number) && !ignore) { auto output = acquisition_device[data_stream].GetDeviceOutput(frame, module_number); // Partial packets will bring more problems, than benefit pedestal_calc.AnalyzeImage((uint16_t *) output->pixels); storage_cell_header = (output->module_statistics.debug >> 8) & 0xF; } acquisition_device[data_stream].FrameBufferRelease(frame, module_number); UpdateMaxDelay(acquisition_device[data_stream].Counters().CalculateDelay(frame, module_number)); current_status.SetProgress(GetProgress()); current_status.SetStatus(GetStatus()); } uint64_t offset = experiment.GetFirstModuleOfDataStream(data_stream) + module_number; if (!only_2nd_sc_pedestal) offset += experiment.GetModulesNum() * storage_cell; if (!ignore) *pedestal[offset] += pedestal_calc; } catch (const JFJochException &e) { Cancel(e); } logger.Debug("Pedestal calculation thread for data stream {} module {} storage cell {} -> header {} done", data_stream, module_number, storage_cell, storage_cell_header); } int64_t JFJochReceiverFPGA::SummationThread(uint16_t data_stream, int64_t image_number, uint16_t module_number, uint32_t threadid, ModuleSummation &summation) { try { NUMAHWPolicy::RunOnNode(acquisition_device[data_stream].GetNUMANode()); } catch (const JFJochException &e) { logger.Error("HW bind error {}", e.what()); } ModuleSummation local_summation(experiment); int64_t starting_frame = image_number * experiment.GetSummation(); for (int64_t i = threadid; i < experiment.GetSummation(); i += summation_nthreads) { const int64_t frame = starting_frame + i; // Frame will be processed only if one already collects frame+2 acquisition_device[data_stream].Counters().WaitForFrame(frame + 2, module_number); if (acquisition_device[data_stream].Counters().IsAnyPacketCollected(frame, module_number)) { const auto output = acquisition_device[data_stream].GetDeviceOutput(frame, module_number); local_summation.AddFPGAOutput(*output); } else local_summation.AddEmptyOutput(); acquisition_device[data_stream].FrameBufferRelease(frame, module_number); UpdateMaxDelay(acquisition_device[data_stream].Counters().CalculateDelay(frame, module_number)); current_status.SetProgress(GetProgress()); current_status.SetStatus(GetStatus()); } if (!summation.empty()) summation.AddFPGAOutput(local_summation.GetOutput(), 4); return 0; } void JFJochReceiverFPGA::FrameTransformationThread(uint32_t threadid) { std::unique_ptr analyzer; try { numa_policy.Bind(threadid); analyzer = std::make_unique(experiment, indexer); } catch (const JFJochException &e) { frame_transformation_ready.count_down(); logger.Error("Thread setup error {}", e.what()); Cancel(e); return; } FrameTransformation transformation(experiment); frame_transformation_ready.count_down(); uint16_t az_int_min_bin = std::floor(az_int_mapping.QToBin(experiment.GetLowQForBkgEstimate_recipA())); uint16_t az_int_max_bin = std::ceil(az_int_mapping.QToBin(experiment.GetHighQForBkgEstimate_recipA())); int64_t image_number; while (images_to_go.Get(image_number) != 0) { try { int64_t expected_frame = image_number; if (experiment.IsCPUSummation()) expected_frame *= experiment.GetSummation(); logger.Debug("Frame transformation thread - trying to get image {}", expected_frame); // If data acquisition is finished and fastest frame for the first device is behind acquisition_device[0].Counters().WaitForFrame(expected_frame); logger.Debug("Frame transformation thread - frame arrived {}", expected_frame); if (acquisition_device[0].Counters().IsAcquisitionFinished() && (acquisition_device[0].Counters().GetFastestFrameNumber() < expected_frame)) { logger.Debug("Frame transformation thread - skipping image {}", expected_frame); continue; } DataMessage message{}; message.number = image_number; message.original_number = image_number; message.user_data = experiment.GetImageAppendix(); message.run_number = experiment.GetRunNumber(); message.run_name = experiment.GetRunName(); ImageMetadata metadata(experiment); AzimuthalIntegrationProfile az_int_profile_image(az_int_mapping); auto local_spot_finding_settings = GetSpotFindingSettings(); if (experiment.IsCPUSummation()) { std::vector> summation; for (int i = 0; i < experiment.GetModulesNum(); i++) summation.emplace_back(std::make_unique(experiment)); std::vector > futures; for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < experiment.GetModulesNum(d); m++) { size_t module_abs_number = experiment.GetFirstModuleOfDataStream(d) + m; for (int i = 0; i < summation_nthreads; i++) { futures.emplace_back( std::async(std::launch::async, &JFJochReceiverFPGA::SummationThread, this, d, image_number, m, i, std::ref(*summation[module_abs_number])) ); } } } for (auto &f: futures) f.get(); for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < experiment.GetModulesNum(d); m++) { size_t i = experiment.GetFirstModuleOfDataStream(d) + m; if (!summation[i]->empty()) { adu_histogram_module[i]->Add(summation[i]->GetOutput()); transformation.ProcessModule(&summation[i]->GetOutput(), d); metadata.Process(&summation[i]->GetOutput()); az_int_profile_image.Add(summation[i]->GetOutput()); analyzer->ReadFromCPU(&summation[i]->GetOutput(), GetSpotFindingSettings(), i); } else transformation.FillNotCollectedModule(m, d); } } } else { logger.Debug("Frame transformation thread - processing image from FPGA {}", image_number); for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < experiment.GetModulesNum(d); m++) { acquisition_device[d].Counters().WaitForFrame(image_number + 2, m); if (acquisition_device[d].Counters().IsAnyPacketCollected(image_number, m)) { const DeviceOutput *output = acquisition_device[d].GetDeviceOutput(image_number, m); metadata.Process(output); size_t module_abs_number = experiment.GetFirstModuleOfDataStream(d) + m; adu_histogram_module[module_abs_number]->Add(*output); az_int_profile_image.Add(*output); analyzer->ReadFromFPGA(output, local_spot_finding_settings, module_abs_number); transformation.ProcessModule(output, d); } else transformation.FillNotCollectedModule(m, d); acquisition_device[d].FrameBufferRelease(image_number, m); } auto delay = acquisition_device[d].Counters().CalculateDelay(image_number); UpdateMaxDelay(delay); if (delay > message.receiver_aq_dev_delay) message.receiver_aq_dev_delay = delay; } } auto image_start_time = std::chrono::high_resolution_clock::now(); metadata.Export(message, expected_packets_per_image); if (message.image_collection_efficiency == 0.0f) { plots.AddEmptyImage(message); continue; } message.image = CompressedImage(transformation.GetImage(), experiment.GetPixelsNum() * experiment.GetByteDepthImage(), experiment.GetXPixelsNum(), experiment.GetYPixelsNum(), experiment.GetImageMode(), CompressionAlgorithm::NO_COMPRESSION); analyzer->Process(message, local_spot_finding_settings); message.receiver_free_send_buf = image_buffer.GetAvailSlots(); message.az_int_profile = az_int_profile_image.GetResult(); message.bkg_estimate = az_int_profile_image.GetBkgEstimate(experiment.GetAzimuthalIntegrationSettings()); plots.Add(message, az_int_profile_image); scan_result.Add(message); auto image_end_time = std::chrono::high_resolution_clock::now(); std::chrono::duration duration = image_end_time - image_start_time; message.processing_time_s = duration.count(); // Store overload/error pixel count if (message.image_collection_efficiency == 1.0f) { saturated_pixels.Add(message.saturated_pixel_count); error_pixels.Add(message.error_pixel_count); if (message.roi.contains("beam")) { roi_beam_npixel.Add(message.roi["beam"].pixels); roi_beam_sum.Add(message.roi["beam"].sum); } } ++images_collected; uncompressed_size += experiment.GetModulesNum() * RAW_MODULE_SIZE * experiment.GetByteDepthImage(); if (!serialmx_filter.ApplyFilter(message)) ++images_skipped; else { auto loc = image_buffer.GetImageSlot(); if (loc == nullptr) { // No free buffer locations - continue writer_queue_full = true; continue; } auto writer_buffer = (uint8_t *) loc->GetImage(); CBORStream2Serializer serializer(writer_buffer, experiment.GetImageBufferLocationSize()); message.image = CompressedImage(nullptr, 0, experiment.GetXPixelsNum(), experiment.GetYPixelsNum(), experiment.GetImageMode(), experiment.GetCompressionAlgorithm()); serializer.SerializeImage(message); if (experiment.GetImageBufferLocationSize() - serializer.GetImageAppendOffset() < experiment.GetMaxCompressedSize()) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Not enough memory to save image"); size_t image_size = transformation.CompressImage(writer_buffer + serializer.GetImageAppendOffset()); serializer.AppendImage(image_size); compressed_size += image_size; loc->SetImageNumber(image_number); loc->SetImageSize(serializer.GetBufferSize()); loc->SetIndexed(message.indexing_result.value_or(false)); if (zmq_preview_socket != nullptr) zmq_preview_socket->SendImage(writer_buffer, serializer.GetBufferSize()); if (zmq_metadata_socket != nullptr) zmq_metadata_socket->AddDataMessage(message); if (push_images_to_writer) { image_pusher.SendImage(*loc); ++images_sent; // Handle case when image not sent properly } else loc->release(); UpdateMaxImageSent(message.number); } logger.Debug("Frame transformation thread - done sending image {} / {}", image_number, message.number); current_status.SetProgress(GetProgress()); current_status.SetStatus(GetStatus()); } catch (const JFJochException &e) { logger.ErrorException(e); Cancel(e); } } logger.Debug("Sum&compression thread done"); } float JFJochReceiverFPGA::GetEfficiency() const { uint64_t expected_packets; if (experiment.GetImageNum() == 0) expected_packets = expected_packets_per_image * experiment.GetFrameNum(); else expected_packets = expected_packets_per_image * experiment.GetImageNum(); uint64_t received_packets = 0; for (int d = 0; d < ndatastreams; d++) { received_packets += acquisition_device[d].Counters().GetTotalPackets(); } if ((expected_packets == received_packets) || (expected_packets == 0)) return 1.0; return received_packets / static_cast(expected_packets); } void JFJochReceiverFPGA::Cancel(bool silent) { JFJochReceiver::Cancel(silent); for (int d = 0; d < ndatastreams; d++) acquisition_device[d].Cancel(); } void JFJochReceiverFPGA::Cancel(const JFJochException &e) { JFJochReceiver::Cancel(e); for (int d = 0; d < ndatastreams; d++) acquisition_device[d].Cancel(); } float JFJochReceiverFPGA::GetProgress() const { int64_t frames = experiment.GetImageNum(); if (experiment.IsCPUSummation()) frames *= experiment.GetSummation(); if (frames == 0) frames = experiment.GetFrameNum(); if ((frames == 0) || (acquisition_device[0].Counters().IsAcquisitionFinished())) return 1.0; return static_cast(acquisition_device[0].Counters().GetSlowestFrameNumber()) / static_cast(frames); } void JFJochReceiverFPGA::FinalizeMeasurement() { if (!frame_transformation_futures.empty()) { for (auto &future: frame_transformation_futures) future.get(); logger.Info("All processing threads done"); } current_status.SetProgress(1.0); current_status.SetStatus(GetStatus()); SendEndMessage(); if (experiment.GetImageNum() > 0) { for (int d = 0; d < ndatastreams; d++) acquisition_device[d].Cancel(); } end_time = std::chrono::system_clock::now(); for (auto &future: data_acquisition_futures) future.get(); logger.Info("Devices stopped"); if (!image_buffer.CheckIfBufferReturned(std::chrono::seconds(10))) { logger.Error("Send commands not finalized in 10 seconds"); throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Send commands not finalized in 10 seconds"); } logger.Info("Receiving data done"); if (push_images_to_writer) writer_error = image_pusher.Finalize(); current_status.SetProgress({}); current_status.SetStatus(GetStatus()); logger.Info("Writing process finalized"); } void JFJochReceiverFPGA::SetSpotFindingSettings(const SpotFindingSettings &in_spot_finding_settings) { std::unique_lock ul(spot_finding_settings_mutex); DiffractionExperiment::CheckDataProcessingSettings(in_spot_finding_settings); spot_finding_settings = in_spot_finding_settings; for (int i = 0; i < ndatastreams; i++) acquisition_device[i].SetSpotFinderParameters(spot_finding_settings); } void JFJochReceiverFPGA::StopReceiver() { if (measurement.valid()) { measurement.get(); logger.Info("Receiver stopped"); } } JFJochReceiverFPGA::~JFJochReceiverFPGA() { if (measurement.valid()) measurement.get(); } JFJochReceiverOutput JFJochReceiverFPGA::GetFinalStatistics() const { JFJochReceiverOutput ret = JFJochReceiver::GetFinalStatistics(); for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < acquisition_device[d].Counters().GetModuleNumber(); m++) { if (experiment.IsCPUSummation()) ret.expected_packets.push_back(acquisition_device[d].Counters().GetTotalExpectedPacketsPerModule() * experiment.GetSummation()); else ret.expected_packets.push_back(acquisition_device[d].Counters().GetTotalExpectedPacketsPerModule()); ret.received_packets.push_back(acquisition_device[d].Counters().GetTotalPackets(m)); } } RetrievePedestal(ret.pedestal_result); return ret; } void JFJochReceiverFPGA::RetrievePedestal(std::vector &output) const { time_t curr_time = std::chrono::system_clock::to_time_t(start_time); for (const auto &pc: pedestal) { JFModulePedestal mp; if (experiment.GetDetectorMode() == DetectorMode::PedestalG0) pc->Export(mp, PEDESTAL_G0_WRONG_GAIN_ALLOWED_COUNT); else pc->Export(mp); mp.SetCollectionTime(curr_time); output.emplace_back(std::move(mp)); } } void JFJochReceiverFPGA::LoadCalibrationToFPGA(uint16_t data_stream) { if (experiment.IsPedestalRun()) { acquisition_device[data_stream].InitializeEmptyPixelMask(experiment); return; // No calibration loaded for pedestal } if (calibration != nullptr) acquisition_device[data_stream].InitializeCalibration(experiment, *calibration); // Initialize pixel_mask acquisition_device[data_stream].InitializePixelMask(experiment, pixel_mask); // Initialize roi_map acquisition_device[data_stream].InitializeROIMap(experiment, roi_map); // Initialize data processing acquisition_device[data_stream].InitializeDataProcessing(experiment, az_int_mapping); }