// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "JFJochReceiverLite.h" #include "../image_analysis/indexing/IndexerFactory.h" using namespace std::chrono_literals; int64_t JFJochReceiverLite::NumberOfDataAnalysisThreads(int64_t requested_thread_number, const DiffractionExperiment& in_experiment) { int64_t number_of_images = in_experiment.GetImageNum(); auto image_time = in_experiment.GetImageTime(); if (requested_thread_number <= 0) return 1; // For very small datasets no need to go multithreaded if (number_of_images < 4) return 1; if (number_of_images < 16) return std::min(2, requested_thread_number); // For 100 Hz, there is no reason to go for a very large number of threads if (number_of_images < 256 || (image_time >= 10ms)) return std::min(16, requested_thread_number); return requested_thread_number; } JFJochReceiverLite::JFJochReceiverLite(const DiffractionExperiment &in_experiment, const PixelMask &in_pixel_mask, ImagePuller &in_image_puller, ImagePusher &in_image_pusher, Logger &in_logger, int64_t forward_and_sum_nthreads, const NUMAHWPolicy &in_numa_policy, const SpotFindingSettings &in_spot_finding_settings, PreviewImage &in_preview_image, JFJochReceiverCurrentStatus &in_current_status, JFJochReceiverPlots &in_plots, ImageBuffer &in_image_buffer, ZMQPreviewSocket *in_zmq_preview_socket, ZMQMetadataSocket *in_zmq_metadata_socket, IndexerThreadPool *indexing_thread_pool) : JFJochReceiver(in_experiment, in_image_buffer, in_image_pusher, in_preview_image, in_current_status, in_plots, in_spot_finding_settings, in_logger, in_numa_policy, in_pixel_mask, in_zmq_preview_socket, in_zmq_metadata_socket, indexing_thread_pool), image_puller(in_image_puller), data_analysis_nthreads(NumberOfDataAnalysisThreads(forward_and_sum_nthreads, in_experiment)), data_analysis_started(data_analysis_nthreads), measurement_started(1), dark_mask_analysis(in_experiment.GetDarkMaskSettings(), in_experiment.GetPixelsNum()) { logger.Info("Starting {} data analysis threads", data_analysis_nthreads); if (experiment.GetDetectorMode() == DetectorMode::DarkMask) { for (int i = 0; i < data_analysis_nthreads; i++) data_analysis_futures.emplace_back( std::async(std::launch::async, &JFJochReceiverLite::MaskThread, this, i) ); } else { // Start frame transformation threads for (int i = 0; i < data_analysis_nthreads; i++) data_analysis_futures.emplace_back( std::async(std::launch::async, &JFJochReceiverLite::DataAnalysisThread, this, i) ); } measurement = std::async(std::launch::async, &JFJochReceiverLite::MeasurementThread, this); data_analysis_started.wait(); logger.Info("Data analysis threads ready"); } JFJochReceiverLite::~JFJochReceiverLite() { cancelled = true; if (measurement.valid()) measurement.get(); } void JFJochReceiverLite::MeasurementThread() { try { // Wait for start message to arrive auto msg = image_puller.PollImage(); while (!cancelled && (!msg.has_value() || !msg->cbor || !msg->cbor->start_message)) msg = image_puller.PollImage(); if (cancelled) { image_puller.Disconnect(); current_status.SetProgress({}); current_status.SetStatus(GetStatus()); measurement_started.count_down(); return; // just quit the function - no measurement, no results } Configure(msg->cbor->start_message.value()); start_time = std::chrono::system_clock::now(); // Send new start message out SendStartMessage(); measurement_started.count_down(); } catch (const JFJochException &e) { logger.ErrorException(e); image_puller.Disconnect(); Cancel(e); measurement_started.count_down(); throw; } logger.Info("Receiving started"); // Analysis is running // ... // Till it is done. // Combine frame transformation threads for (auto &f: data_analysis_futures) f.get(); logger.Info("Data analysis threads finished"); current_status.SetProgress(1.0); current_status.SetStatus(GetStatus()); // Send end message out SendEndMessage(); if (!image_buffer.CheckIfBufferReturned(std::chrono::seconds(10))) { logger.Error("Send commands not finalized in 10 seconds"); throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Send commands not finalized in 10 seconds"); } logger.Info("All images sent through ZeroMQ"); if (push_images_to_writer) writer_error = image_pusher.Finalize(); image_puller.Disconnect(); logger.Info("Writing process finalized"); end_time = std::chrono::system_clock::now(); current_status.SetProgress({}); current_status.SetStatus(GetStatus()); } void JFJochReceiverLite::Configure(const StartMessage &msg) { if ((experiment.GetXPixelsNum() != msg.image_size_x) || (experiment.GetYPixelsNum() != msg.image_size_y)) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Mismatch in detector size"); if (fabs(experiment.GetPixelSize_mm() - msg.pixel_size_x * 1e3) > 1e-7) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Mismatch in pixel size"); experiment.Detector().Description(msg.detector_description); experiment.Detector().SerialNumber(msg.detector_serial_number); experiment.Detector().SensorMaterial(msg.sensor_material); experiment.Detector().SensorThickness_um(msg.sensor_thickness * 1e6); experiment.Detector().SaturationLimit(msg.saturation_value); experiment.Detector().BitDepthImage(msg.bit_depth_image); if (msg.bit_depth_readout) experiment.Detector().BitDepthReadout(msg.bit_depth_readout.value()); } void JFJochReceiverLite::MaskThread(uint32_t id) { std::vector buffer; data_analysis_started.count_down(); measurement_started.wait(); while (!cancelled && !end_message_received) { try { auto msg = image_puller.PollImage(std::chrono::milliseconds(5)); if (!msg.has_value() || !msg->cbor) { // Message not received or not parsed continue; } else if (msg->cbor->end_message) { end_message_received = true; logger.Debug("Thread {} end message received in JFJochReceiverLite", id); } else if (msg->cbor->data_message) { ++images_collected; DataMessage data_msg = msg->cbor->data_message.value(); dark_mask_analysis.AnalyzeImage(data_msg, buffer); compressed_size += data_msg.image.GetCompressedSize(); uncompressed_size += data_msg.image.GetUncompressedSize(); UpdateMaxImageReceived(data_msg.number); auto loc = image_buffer.GetImageSlot(); if (loc == nullptr) writer_queue_full = true; else { auto writer_buffer = (uint8_t *) loc->GetImage(); CBORStream2Serializer serializer(writer_buffer, experiment.GetImageBufferLocationSize()); serializer.SerializeImage(data_msg); loc->SetImageNumber(data_msg.number); loc->SetImageSize(serializer.GetBufferSize()); loc->SetIndexed(false); loc->release(); } } current_status.SetProgress(GetProgress()); current_status.SetStatus(GetStatus()); } catch (const JFJochException &e) { logger.ErrorException(e); Cancel(e); } } } void JFJochReceiverLite::DataAnalysisThread(uint32_t id) { std::vector buffer; std::unique_ptr analysis; logger.Debug("Thread {} started", id); try { numa_policy.Bind(id); data_analysis_started.count_down(); } catch (const JFJochException &e) { data_analysis_started.count_down(); Cancel(e); return; } measurement_started.wait(); try { analysis = std::make_unique(experiment, az_int_mapping, pixel_mask, indexer); } catch (const JFJochException &e) { Cancel(e); return; } while (!cancelled && !end_message_received) { try { auto msg = image_puller.PollImage(std::chrono::milliseconds(5)); if (!msg.has_value() || !msg->cbor) { // Message not received or not parsed continue; } else if (msg->cbor->end_message) { // Message is end message end_message_received = true; logger.Debug("Thread {} End message received in JFJochReceiverLite", id); } else if (msg->cbor->calibration) { // Calibration messages are just forwarded if (push_images_to_writer) image_pusher.SendCalibration(msg->cbor->calibration.value()); } else if (msg->cbor->data_message) { auto start = std::chrono::high_resolution_clock::now(); DataMessage data_msg = msg->cbor->data_message.value(); ++images_collected; compressed_size += data_msg.image.GetCompressedSize(); uncompressed_size += data_msg.image.GetUncompressedSize(); UpdateMaxImageReceived(data_msg.number); auto image_start_time = std::chrono::high_resolution_clock::now(); AzimuthalIntegrationProfile profile(az_int_mapping); analysis->Analyze(data_msg, buffer, profile, GetSpotFindingSettings()); auto image_end_time = std::chrono::high_resolution_clock::now(); std::chrono::duration image_duration = image_end_time - image_start_time; data_msg.processing_time_s = image_duration.count(); data_msg.original_number = data_msg.number; data_msg.user_data = experiment.GetImageAppendix(); data_msg.run_number = experiment.GetRunNumber(); data_msg.run_name = experiment.GetRunName(); data_msg.receiver_free_send_buf = image_buffer.GetAvailSlots(); data_msg.receiver_aq_dev_delay = image_puller.GetCurrentFifoUtilization(); saturated_pixels.Add(data_msg.saturated_pixel_count); error_pixels.Add(data_msg.error_pixel_count); if (data_msg.roi.contains("beam")) { roi_beam_npixel.Add(data_msg.roi["beam"].pixels); roi_beam_sum.Add(data_msg.roi["beam"].sum); } plots.Add(data_msg, profile); scan_result.Add(data_msg); if (!serialmx_filter.ApplyFilter(data_msg)) ++images_skipped; else { auto loc = image_buffer.GetImageSlot(); if (loc == nullptr) writer_queue_full = true; else { auto writer_buffer = (uint8_t *) loc->GetImage(); CBORStream2Serializer serializer(writer_buffer, experiment.GetImageBufferLocationSize()); serializer.SerializeImage(data_msg); loc->SetImageNumber(data_msg.number); loc->SetImageSize(serializer.GetBufferSize()); loc->SetIndexed(data_msg.indexing_result.value_or(false)); if (zmq_preview_socket != nullptr) zmq_preview_socket->SendImage(writer_buffer, serializer.GetBufferSize()); if (zmq_metadata_socket != nullptr) zmq_metadata_socket->AddDataMessage(data_msg); if (push_images_to_writer) { image_pusher.SendImage(*loc); ++images_sent; // Handle case when image not sent properly } else loc->release(); UpdateMaxImageSent(data_msg.number); } } auto end = std::chrono::high_resolution_clock::now(); std::chrono::duration duration_total = end - start; logger.Debug("Thread {} Image {:6d} processing {:8.04f} s analysis {:8.04f} s", id, data_msg.number, duration_total.count(), image_duration.count()); } UpdateMaxDelay(image_puller.GetCurrentFifoUtilization()); current_status.SetProgress(GetProgress()); current_status.SetStatus(GetStatus()); } catch (const JFJochException &e) { logger.ErrorException(e); Cancel(e); } } logger.Debug("Thread {} finished", id); } void JFJochReceiverLite::StopReceiver() { if (measurement.valid()) { measurement.get(); logger.Info("Receiver stopped"); } } float JFJochReceiverLite::GetEfficiency() const { if (experiment.GetFrameNum() == 0) return 0; return static_cast(images_collected) / static_cast(experiment.GetFrameNum()); } float JFJochReceiverLite::GetProgress() const { if (experiment.GetFrameNum() == 0) return 0.0; return static_cast(max_image_number_received) / static_cast(experiment.GetFrameNum()); } void JFJochReceiverLite::SetSpotFindingSettings(const SpotFindingSettings &in_spot_finding_settings) { std::unique_lock ul(spot_finding_settings_mutex); DiffractionExperiment::CheckDataProcessingSettings(in_spot_finding_settings); spot_finding_settings = in_spot_finding_settings; } JFJochReceiverOutput JFJochReceiverLite::GetFinalStatistics() const { JFJochReceiverOutput ret = JFJochReceiver::GetFinalStatistics(); if (experiment.GetDetectorMode() == DetectorMode::DarkMask) ret.dark_mask_result = dark_mask_analysis.GetMask(); return ret; }