// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "JFJochReceiverLite.h" JFJochReceiverLite::JFJochReceiverLite(const DiffractionExperiment &in_experiment, const PixelMask &in_pixel_mask, ImagePuller &in_image_puller, ImagePusher &in_image_pusher, Logger &in_logger, int64_t forward_and_sum_nthreads, const NUMAHWPolicy &in_numa_policy, const SpotFindingSettings &in_spot_finding_settings, PreviewImage &in_preview_image, JFJochReceiverCurrentStatus &in_current_status, JFJochReceiverPlots &in_plots, ImageBuffer &in_image_buffer, ZMQPreviewSocket *in_zmq_preview_socket, ZMQMetadataSocket *in_zmq_metadata_socket) : JFJochReceiver(in_experiment, in_image_buffer, in_image_pusher, in_preview_image, in_current_status, in_plots, in_spot_finding_settings, in_logger, in_numa_policy, in_pixel_mask, in_zmq_preview_socket, in_zmq_metadata_socket), image_puller(in_image_puller), frame_transformation_nthreads(forward_and_sum_nthreads) { measurement = std::async(std::launch::async, &JFJochReceiverLite::MeasurementThread, this); } JFJochReceiverLite::~JFJochReceiverLite() { cancelled = true; if (measurement.valid()) measurement.get(); } void JFJochReceiverLite::MeasurementThread() { // Wait for start message to arrive auto msg = image_puller.PollImage(); while (!cancelled && (!msg.has_value() || !msg->cbor || !msg->cbor->start_message)) msg = image_puller.PollImage(); if (cancelled) { image_puller.Disconnect(); current_status.SetProgress({}); current_status.SetStatus(GetStatus()); return; // just quit the function - no measurement, no results } try { Configure(msg->cbor->start_message.value()); start_time = std::chrono::system_clock::now(); // Send new start message out SendStartMessage(); } catch (const JFJochException &e) { logger.ErrorException(e); image_puller.Disconnect(); throw; } // Start frame transformation threads for (int i = 0; i < frame_transformation_nthreads; i++) frame_transformation_futures.emplace_back( std::async(std::launch::async, &JFJochReceiverLite::DataAnalysisThread, this, i) ); // Analysis is running // ... // Till it is done. // Combine frame transformation threads for (auto &f: frame_transformation_futures) f.get(); if (!image_buffer.CheckIfBufferReturned(std::chrono::seconds(10))) { logger.Error("Send commands not finalized in 10 seconds"); throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Send commands not finalized in 10 seconds"); } if (push_images_to_writer) writer_error = image_pusher.Finalize(); image_puller.Disconnect(); logger.Info("Writing process finalized"); end_time = std::chrono::system_clock::now(); // Send end message out SendEndMessage(); current_status.SetProgress({}); current_status.SetStatus(GetStatus()); } void JFJochReceiverLite::Configure(const StartMessage &msg) { if ((experiment.GetXPixelsNum() != msg.image_size_x) || (experiment.GetYPixelsNum() != msg.image_size_y)) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Mismatch in detector size"); if (fabs(experiment.GetPixelSize_mm() - msg.pixel_size_x * 1e3) > 1e-7) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Mismatch in pixel size"); experiment.Detector().Description(msg.detector_description); experiment.Detector().SerialNumber(msg.detector_serial_number); experiment.Detector().SensorMaterial(msg.sensor_material); experiment.Detector().SensorThickness_um(msg.sensor_thickness * 1e6); experiment.Detector().SaturationLimit(msg.saturation_value); experiment.Detector().BitDepthImage(msg.bit_depth_image); if (msg.bit_depth_readout) experiment.Detector().BitDepthReadout(msg.bit_depth_readout.value()); } void JFJochReceiverLite::DataAnalysisThread(uint32_t id) { std::vector buffer; std::unique_ptr analysis; try { numa_policy.Bind(id); analysis = std::make_unique(experiment, az_int_mapping, pixel_mask); analysis->NeuralNetInference(&inference_client); } catch (const JFJochException &e) { Cancel(e); return; } while (!cancelled && !end_message_received) { try { auto msg = image_puller.PollImage(); UpdateMaxDelay(image_puller.GetCurrentFifoUtilization()); // Message not received or not parsed if (!msg.has_value() || !msg->cbor) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } // Message is end message if (msg->cbor->end_message) { end_message_received = true; } else if (msg->cbor->calibration) { // Calibration messages are just forwarded if (push_images_to_writer) image_pusher.SendCalibration(msg->cbor->calibration.value()); } else if (msg->cbor->data_message) { DataMessage data_msg = msg->cbor->data_message.value(); ++images_collected; compressed_size += data_msg.image.GetCompressedSize(); uncompressed_size += data_msg.image.GetUncompressedSize(); UpdateMaxImageReceived(data_msg.number); AzimuthalIntegrationProfile profile(az_int_mapping); analysis->Analyze(data_msg, buffer, profile, GetSpotFindingSettings()); data_msg.original_number = data_msg.number; data_msg.user_data = experiment.GetImageAppendix(); data_msg.run_number = experiment.GetRunNumber(); data_msg.run_name = experiment.GetRunName(); data_msg.receiver_free_send_buf = image_buffer.GetAvailSlots(); data_msg.receiver_aq_dev_delay = image_puller.GetCurrentFifoUtilization(); saturated_pixels.Add(data_msg.saturated_pixel_count); error_pixels.Add(data_msg.error_pixel_count); if (data_msg.roi.contains("beam")) { roi_beam_npixel.Add(data_msg.roi["beam"].pixels); roi_beam_sum.Add(data_msg.roi["beam"].sum); } plots.Add(data_msg, profile); scan_result.Add(data_msg); if (!serialmx_filter.ApplyFilter(data_msg)) ++images_skipped; else { auto loc = image_buffer.GetImageSlot(); if (loc == nullptr) writer_queue_full = true; else { auto writer_buffer = (uint8_t *) loc->GetImage(); CBORStream2Serializer serializer(writer_buffer, experiment.GetImageBufferLocationSize()); serializer.SerializeImage(data_msg); loc->SetImageNumber(data_msg.number); loc->SetImageSize(serializer.GetBufferSize()); loc->SetIndexed(data_msg.indexing_result.value_or(false)); if (zmq_preview_socket != nullptr) zmq_preview_socket->SendImage(writer_buffer, serializer.GetBufferSize()); if (zmq_metadata_socket != nullptr) zmq_metadata_socket->AddDataMessage(data_msg); if (push_images_to_writer) { image_pusher.SendImage(*loc); ++images_sent; // Handle case when image not sent properly } else loc->release(); UpdateMaxImageSent(data_msg.number); } } } current_status.SetProgress(GetProgress()); current_status.SetStatus(GetStatus()); } catch (const JFJochException &e) { logger.ErrorException(e); Cancel(e); } } } void JFJochReceiverLite::StopReceiver() { if (measurement.valid()) { measurement.get(); logger.Info("Receiver stopped"); } } float JFJochReceiverLite::GetEfficiency() const { if (experiment.GetImageNum() == 0) return 0; return static_cast(images_collected) / static_cast(experiment.GetImageNum()); } float JFJochReceiverLite::GetProgress() const { if (experiment.GetImageNum() == 0) return 0.0; return static_cast(max_image_number_received) / static_cast(experiment.GetImageNum()); } void JFJochReceiverLite::SetSpotFindingSettings(const SpotFindingSettings &in_spot_finding_settings) { std::unique_lock ul(spot_finding_settings_mutex); DiffractionExperiment::CheckDataProcessingSettings(in_spot_finding_settings); spot_finding_settings = in_spot_finding_settings; }