// Copyright (2019-2024) Paul Scherrer Institute #include "JFJochReceiver.h" #include #include "../image_analysis/MXAnalyzer.h" #include "../common/DiffractionGeometry.h" #include "ImageMetadata.h" #include "../common/time_utc.h" #include "../common/PixelMask.h" JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment, const JFCalibration *in_calibration, AcquisitionDeviceGroup &in_aq_device, ImagePusher &in_image_sender, Logger &in_logger, int64_t in_forward_and_sum_nthreads, const NUMAHWPolicy &in_numa_policy, const SpotFindingSettings &in_spot_finding_settings, SendBuffer &buf) : experiment(in_experiment), calibration(nullptr), send_buf_ctrl(experiment, buf), acquisition_device(in_aq_device), logger(in_logger), image_pusher(in_image_sender), frame_transformation_nthreads(in_forward_and_sum_nthreads), ndatastreams(experiment.GetDataStreamsNum()), data_acquisition_ready(ndatastreams), frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0), numa_policy(in_numa_policy), adu_histogram_module(experiment.GetModulesNum()), az_int_mapping(experiment), plots(experiment, az_int_mapping), spot_finding_settings(in_spot_finding_settings), preview_image(experiment), preview_image_indexed(experiment), serialmx_filter(experiment) { if (experiment.GetDetectorSetup().GetDetectorType() == DetectorType::JUNGFRAU) calibration = in_calibration; push_images_to_writer = (experiment.GetImageNum() > 0) && (!experiment.GetFilePrefix().empty()); if (acquisition_device.size() < ndatastreams) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Number of acquisition devices has to match data streams"); if (frame_transformation_nthreads <= 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Number of threads must be more than zero"); logger.Info("NUMA policy: {}", numa_policy.GetName()); for (int d = 0; d < ndatastreams; d++) { acquisition_device[d].PrepareAction(experiment); acquisition_device[d].SetSpotFinderParameters(spot_finding_settings); logger.Debug("Acquisition device {} prepared", d); } logger.Info("Data acquisition devices ready"); if (experiment.GetImageNum() > 0) { SendStartMessage(); SendCalibration(); for (int i = 0; i < experiment.GetImageNum(); i++) images_to_go.Put(i); // Setup frames summation and forwarding for (uint32_t i = 0; i < frame_transformation_nthreads; i++) { auto handle = std::async(std::launch::async, &JFJochReceiver::FrameTransformationThread, this, i); frame_transformation_futures.emplace_back(std::move(handle)); } logger.Info("Image compression/forwarding threads started"); frame_transformation_ready.wait(); logger.Info("Image compression/forwarding threads ready"); } for (int d = 0; d < ndatastreams; d++) data_acquisition_futures.emplace_back(std::async(std::launch::async, &JFJochReceiver::AcquireThread, this, d)); data_acquisition_ready.wait(); start_time = std::chrono::system_clock::now(); measurement = std::async(std::launch::async, &JFJochReceiver::FinalizeMeasurement, this); logger.Info("Receiving data started"); } void JFJochReceiver::SendStartMessage() { if (!push_images_to_writer) return; StartMessage message{}; experiment.FillMessage(message); message.arm_date = time_UTC(std::chrono::system_clock::now()); message.az_int_bin_number = az_int_mapping.GetBinNumber(); message.az_int_bin_to_q = az_int_mapping.GetBinToQ(); message.write_master_file = true; PixelMask pixel_mask(experiment); if (calibration) pixel_mask.LoadDetectorBadPixelMask(calibration->CalculateMask(), 1); std::vector nexus_mask; nexus_mask = pixel_mask.GetMask(experiment); size_t xpixel = experiment.GetXPixelsNum(); size_t ypixel = experiment.GetYPixelsNum(); CompressedImage image{ .data = (uint8_t *) nexus_mask.data(), .size = nexus_mask.size() * sizeof(nexus_mask[0]), .xpixel = xpixel, .ypixel = ypixel, .pixel_depth_bytes = sizeof(nexus_mask[0]), .pixel_is_signed = false, .pixel_is_float = false, .algorithm = CompressionAlgorithm::NO_COMPRESSION, .channel = "default" }; message.AddPixelMask(image); image_pusher.StartDataCollection(message); } void JFJochReceiver::SendCalibration() { if ((calibration == nullptr) || !experiment.GetSaveCalibration() || !push_images_to_writer) return; JFJochBitShuffleCompressor compressor(CompressionAlgorithm::BSHUF_LZ4); size_t xpixel = RAW_MODULE_COLS; size_t ypixel = experiment.GetModulesNum() * RAW_MODULE_LINES; for (int sc = 0; sc < experiment.GetStorageCellNumber(); sc++) { for (int gain = 0; gain < 3; gain++) { if (experiment.IsFixedGainG1() && (gain != 1)) continue; auto v = compressor.Compress(calibration->GetPedestal(gain, sc)); std::string channel = "pedestal_G" + std::to_string(gain); if (experiment.GetStorageCellNumber() > 1) channel += "_sc" + std::to_string(sc); CompressedImage image{ .data = v.data(), .size = v.size(), .xpixel = (size_t) xpixel, .ypixel = (size_t) ypixel, .pixel_depth_bytes = 2, .pixel_is_signed = false, .pixel_is_float = false, .algorithm = CompressionAlgorithm::BSHUF_LZ4, .channel = channel }; image_pusher.SendCalibration(image); } } } void JFJochReceiver::AcquireThread(uint16_t data_stream) { try { NUMAHWPolicy::RunOnNode(acquisition_device[data_stream].GetNUMANode()); } catch (const JFJochException &e) { logger.Warning("NUMA bind error {} for device thread {} - continuing without binding", e.what(), data_stream); } try { if (calibration != nullptr) acquisition_device[data_stream].InitializeCalibration(experiment, *calibration); size_t m_offset = experiment.GetFirstModuleOfDataStream(data_stream); acquisition_device[data_stream].InitializeROIMap(experiment); std::vector tmp(RAW_MODULE_SIZE); for (int m = 0; m < experiment.GetModulesNum(data_stream); m++) { CalcSpotFinderResolutionMap(tmp.data(), experiment, m_offset + m); acquisition_device[data_stream].InitializeSpotFinderResolutionMap(tmp.data(), m); CalcAzIntCorrRawCoord(tmp.data(), experiment, m_offset + m); acquisition_device[data_stream].InitializeIntegrationMap(az_int_mapping.GetPixelToBinMappingRaw().data() + (m_offset + m) * RAW_MODULE_SIZE, tmp.data(), m); } frame_transformation_ready.wait(); logger.Debug("Device thread {} start FPGA action", data_stream); acquisition_device[data_stream].StartAction(experiment); } catch (const JFJochException &e) { Cancel(e); data_acquisition_ready.count_down(); logger.ErrorException(e); logger.Warning("Device thread {} done due to an error", data_stream); return; } data_acquisition_ready.count_down(); try { bool send_wr_to_fpga_immediately = (experiment.GetImageNum() == 0); logger.Debug("Device thread {} wait for FPGA action complete", data_stream); acquisition_device[data_stream].WaitForActionComplete(send_wr_to_fpga_immediately); } catch (const JFJochException &e) { logger.ErrorException(e); Cancel(e); logger.ErrorException(e); logger.Warning("Device thread {} done due to an error", data_stream); return; } logger.Info("Device thread {} done", data_stream); } void JFJochReceiver::RetrievePedestal() { try { if ((experiment.GetDetectorMode() == DetectorMode::PedestalG0) || ((experiment.GetDetectorMode() == DetectorMode::PedestalG1) && experiment.IsFixedGainG1())) { pedestal_result.resize(experiment.GetModulesNum() * experiment.GetStorageCellNumber()); for (int s = 0; s < experiment.GetStorageCellNumber(); s++) { for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < experiment.GetModulesNum(d); m++) { size_t offset = experiment.GetModulesNum() * s + experiment.GetFirstModuleOfDataStream(d) + m; JFModulePedestal pedestal(16384); try { pedestal.ImportFPGAPedestal(acquisition_device[d].GetDeviceOutputPedestal(s, m)); } catch (const JFJochException& e) { logger.Warning("Pedestal not collected for module {} storage cell {}", s, m); } pedestal_result[offset] = pedestal; pedestal_result[offset].SetCollectionTime(start_time.time_since_epoch().count() / 1e9); } } } } else if ((experiment.GetDetectorMode() == DetectorMode::PedestalG1) || (experiment.GetDetectorMode() == DetectorMode::PedestalG2)) { pedestal_result.resize(experiment.GetModulesNum()); for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < experiment.GetModulesNum(d); m++) { size_t offset = experiment.GetFirstModuleOfDataStream(d) + m; JFModulePedestal pedestal(16384); try { pedestal.ImportFPGAPedestal(acquisition_device[d].GetDeviceOutputPedestal( (experiment.GetStorageCellNumber() == 2) ? 1 : 0, m)); } catch (const JFJochException& e) { logger.Warning("Pedestal not collected for module {}", m); } pedestal_result[offset] = pedestal; pedestal_result[offset].SetCollectionTime(start_time.time_since_epoch().count() / 1e9); } } } } catch (const JFJochException& e) { logger.ErrorException(e); } } void JFJochReceiver::FrameTransformationThread(uint32_t threadid) { std::unique_ptr analyzer; try { numa_policy.Bind(threadid); analyzer = std::make_unique(experiment); } catch (const JFJochException &e) { frame_transformation_ready.count_down(); logger.Error("Thread setup error {}", e.what()); Cancel(e); return; } FrameTransformation transformation(experiment); frame_transformation_ready.count_down(); uint16_t az_int_min_bin = std::floor(az_int_mapping.QToBin(experiment.GetLowQForBkgEstimate_recipA())); uint16_t az_int_max_bin = std::ceil(az_int_mapping.QToBin(experiment.GetHighQForBkgEstimate_recipA())); uint64_t image_number; while (images_to_go.Get(image_number) != 0) { try { logger.Debug("Frame transformation thread - trying to get image {}", image_number); // If data acquisition is finished and fastest frame for the first device is behind acquisition_device[0].Counters().WaitForFrame(image_number + 1); logger.Debug("Frame transformation thread - frame arrived {}", image_number + 1); if (acquisition_device[0].Counters().IsAcquisitionFinished() && (acquisition_device[0].Counters().GetFastestFrameNumber() < image_number)) { logger.Debug("Frame transformation thread - skipping image {}", image_number); continue; } DataMessage message{}; message.number = image_number; message.original_number = image_number; message.user_data = experiment.GetImageAppendix(); message.series_id = experiment.GetSeriesID(); message.series_unique_id = experiment.GetSeriesIDString(); ImageMetadata metadata(experiment); AzimuthalIntegrationProfile az_int_profile_image(az_int_mapping); auto local_spot_finding_settings = GetSpotFindingSettings(); logger.Debug("Frame transformation thread - processing image from FPGA {}", image_number); for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < experiment.GetModulesNum(d); m++) { acquisition_device[d].Counters().WaitForFrame(image_number + 1, m); if (acquisition_device[d].Counters().IsAnyPacketCollected(image_number, m)) { const DeviceOutput* output = acquisition_device[d].GetDeviceOutput(image_number, m); metadata.Process(output); size_t module_abs_number = experiment.GetFirstModuleOfDataStream(d) + m; adu_histogram_module[module_abs_number].Add(*output); az_int_profile_image.Add(*output); analyzer->ReadFromFPGA(output, local_spot_finding_settings, module_abs_number); transformation.ProcessModule(output, d); } else transformation.FillNotCollectedModule(m, d); acquisition_device[d].FrameBufferRelease(image_number, m); } auto delay = acquisition_device[d].Counters().CalculateDelay(image_number); UpdateMaxDelay(delay); if (delay > message.receiver_aq_dev_delay) message.receiver_aq_dev_delay = delay; } metadata.Export(message, 256 * experiment.GetModulesNum() * experiment.GetSummation()); if (message.image_collection_efficiency == 0.0f) { plots.AddEmptyImage(message); continue; } analyzer->Process(message, local_spot_finding_settings); message.receiver_free_send_buf = send_buf_ctrl.GetAvailBufLocations(); message.az_int_profile = az_int_profile_image.GetResult(); message.bkg_estimate = az_int_profile_image.GetMeanValueOfBins(az_int_min_bin, az_int_max_bin); plots.Add(message, image_number % experiment.GetTimePointNumber(), az_int_profile_image); preview_image.UpdateImage(transformation.GetImage(), message.spots); if (message.indexing_result) preview_image_indexed.UpdateImage(transformation.GetImage(), message.spots); images_collected++; if (!push_images_to_writer) continue; if (!serialmx_filter.ApplyFilter(message)) { images_skipped++; continue; } auto loc = send_buf_ctrl.GetBufLocation(); if (loc == nullptr) continue; auto writer_buffer = (uint8_t *)loc->get_ptr(); CBORStream2Serializer serializer(writer_buffer, experiment.GetSendBufferLocationSize()); PrepareCBORImage(message, experiment, nullptr, 0); serializer.SerializeImage(message); if (experiment.GetSendBufferLocationSize() - serializer.GetImageAppendOffset() < experiment.GetMaxCompressedSize()) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Not enough memory to save image"); size_t image_size = transformation.CompressImage(writer_buffer + serializer.GetImageAppendOffset()); serializer.AppendImage(image_size); compressed_size += image_size; image_pusher.SendImage(writer_buffer, serializer.GetBufferSize(), image_number, loc); images_sent++; // Handle case when image not sent properly UpdateMaxImage(image_number); logger.Debug("Frame transformation thread - done sending image {}", image_number); } catch (const JFJochException &e) { logger.ErrorException(e); Cancel(e); } } logger.Debug("Sum&compression thread done"); } float JFJochReceiver::GetEfficiency() const { uint64_t expected_packets = 0; uint64_t received_packets = 0; for (int d = 0; d < ndatastreams; d++) { expected_packets += acquisition_device[d].Counters().GetExpectedPackets(); received_packets += acquisition_device[d].Counters().GetTotalPackets(); } if ((expected_packets == received_packets) || (expected_packets == 0)) return 1.0; else return received_packets / static_cast(expected_packets); } void JFJochReceiver::Cancel(bool silent) { if (!silent) { // Remote abort: This tells FPGAs to stop, but doesn't do anything to CPU code logger.Warning("Cancelling on request"); cancelled = true; } for (int d = 0; d < ndatastreams; d++) acquisition_device[d].Cancel(); } void JFJochReceiver::Cancel(const JFJochException &e) { logger.Error("Cancelling data collection due to exception"); logger.ErrorException(e); // Error abort: This tells FPGAs to stop and also prevents deadlock in CPU code, by setting abort to 1 cancelled = true; for (int d = 0; d < ndatastreams; d++) acquisition_device[d].Cancel(); } float JFJochReceiver::GetProgress() const { int64_t frames = experiment.GetImageNum(); if (frames == 0) frames = experiment.GetFrameNum(); if ((frames == 0) || (acquisition_device[0].Counters().IsAcquisitionFinished())) return 1.0; else return static_cast(acquisition_device[0].Counters().GetSlowestFrameNumber()) / static_cast(frames); } void JFJochReceiver::FinalizeMeasurement() { if (!frame_transformation_futures.empty()) { for (auto &future: frame_transformation_futures) future.get(); logger.Info("All processing threads done"); } if (push_images_to_writer) { EndMessage message{}; message.max_image_number = max_image_number_sent; message.images_collected_count = images_collected; message.images_sent_to_write_count = images_sent; message.max_receiver_delay = max_delay; message.efficiency = GetEfficiency(); message.end_date = time_UTC(std::chrono::system_clock::now()); message.series_id = experiment.GetSeriesID(); message.series_unique_id = experiment.GetSeriesIDString(); message.az_int_result["dataset"] = plots.GetAzIntProfile(); for (int i = 0; i < experiment.GetTimePointNumber(); i++) message.az_int_result["file" + std::to_string(i)] = plots.GetAzIntProfilePerFile(i); for (int i = 0; i < adu_histogram_module.size(); i++) message.adu_histogram["module" + std::to_string(i)] = adu_histogram_module[i].GetHistogram(); if (!image_pusher.EndDataCollection(message)) logger.Error("End message not sent via ZeroMQ (time-out)"); logger.Info("Disconnected from writers"); } if (experiment.GetImageNum() > 0) { for (int d = 0; d < ndatastreams; d++) acquisition_device[d].Cancel(); } end_time = std::chrono::system_clock::now(); for (auto &future : data_acquisition_futures) future.get(); RetrievePedestal(); logger.Info("Devices stopped"); if (!send_buf_ctrl.CheckIfBufferReturned(std::chrono::seconds(10))) { logger.Error("Send commands not finalized in 10 seconds"); throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Send commands not finalized in 10 seconds"); } logger.Info("Receiving data done"); } void JFJochReceiver::SetSpotFindingSettings(const SpotFindingSettings &in_spot_finding_settings) { std::unique_lock ul(spot_finding_settings_mutex); DiffractionExperiment::CheckDataProcessingSettings(in_spot_finding_settings); spot_finding_settings = in_spot_finding_settings; for (int i = 0; i < ndatastreams; i++) acquisition_device[i].SetSpotFinderParameters(spot_finding_settings); } void JFJochReceiver::StopReceiver() { if (measurement.valid()) { measurement.get(); logger.Info("Receiver stopped"); } } JFJochReceiver::~JFJochReceiver() { if (measurement.valid()) measurement.get(); } SpotFindingSettings JFJochReceiver::GetSpotFindingSettings() { std::unique_lock ul(spot_finding_settings_mutex); return spot_finding_settings; } MultiLinePlot JFJochReceiver::GetPlots(const PlotRequest &request) { return plots.GetPlots(request); } void JFJochReceiver::UpdateMaxImage(uint64_t image_number) { std::unique_lock ul(max_image_number_sent_mutex); if (image_number + 1 > max_image_number_sent) max_image_number_sent = image_number + 1; } void JFJochReceiver::UpdateMaxDelay(uint64_t delay) { std::unique_lock ul(max_delay_mutex); if (delay > max_delay) max_delay = delay; } JFJochReceiverOutput JFJochReceiver::GetStatistics() const { JFJochReceiverOutput ret; for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < acquisition_device[d].Counters().GetModuleNumber(); m++) { ret.expected_packets.push_back(acquisition_device[d].Counters().GetExpectedPacketsPerModule()); ret.received_packets.push_back(acquisition_device[d].Counters().GetTotalPackets(m)); } } ret.efficiency = GetEfficiency(); ret.start_time_ms = std::chrono::duration_cast(start_time.time_since_epoch()).count(); ret.end_time_ms = std::chrono::duration_cast(end_time.time_since_epoch()).count(); ret.pedestal_result = pedestal_result; ret.status = GetStatus(); return ret; } JFJochReceiverStatus JFJochReceiver::GetStatus() const { JFJochReceiverStatus ret; ret.indexing_rate = plots.GetIndexingRate(); ret.bkg_estimate = plots.GetBkgEstimate(); if ((experiment.GetImageNum() > 0) && (compressed_size > 0)) { ret.compressed_ratio = static_cast ((images_sent + images_skipped) * experiment.GetPixelDepth() * experiment.GetModulesNum() * RAW_MODULE_SIZE) / static_cast (compressed_size); } ret.progress = GetProgress(); ret.compressed_size = compressed_size; ret.max_receive_delay = max_delay; ret.max_image_number_sent = max_image_number_sent; ret.images_collected = images_collected; ret.images_sent = images_sent; ret.images_skipped = images_skipped; ret.cancelled = cancelled; return ret; } std::string JFJochReceiver::GetTIFF(bool calibration_run) const { if (calibration_run) return preview_image.GenerateTIFFDioptas(); else return preview_image.GenerateTIFF(); } std::string JFJochReceiver::GetJPEG(const PreviewJPEGSettings &settings) const { if (settings.show_indexed) return preview_image_indexed.GenerateJPEG(settings); else return preview_image.GenerateJPEG(settings); } void JFJochReceiver::GetXFELEventCode(std::vector &v) const { if (experiment.IsPulsedSource()) plots.GetXFELEventCode(v); } void JFJochReceiver::GetXFELPulseID(std::vector &v) const { if (experiment.IsPulsedSource()) plots.GetXFELPulseID(v); }