// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "JFJochReceiverService.h" #include "../preview/ZMQMetadataSocket.h" JFJochReceiverService::JFJochReceiverService(AcquisitionDeviceGroup &in_aq_devices, Logger &in_logger, ImagePusher &pusher, size_t send_buffer_size_MiB) : logger(in_logger), aq_devices(in_aq_devices), image_pusher(pusher), spot_finding_settings(DiffractionExperiment::DefaultDataProcessingSettings()), buffer(send_buffer_size_MiB * 1024 * 1024) {} JFJochReceiverService& JFJochReceiverService::NumThreads(int64_t input) { if (input <= 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Thread number must be above zero"); nthreads = input; return *this; } JFJochReceiverService &JFJochReceiverService::NUMAPolicy(const NUMAHWPolicy &policy) { numa_policy = policy; return *this; } JFJochReceiverService &JFJochReceiverService::NUMAPolicy(const std::string &policy) { numa_policy = NUMAHWPolicy(policy); return *this; } void JFJochReceiverService::FinalizeMeasurementChangeState() { std::unique_lock ul(state_mutex); state = ReceiverState::Idle; measurement_done.notify_all(); } void JFJochReceiverService::FinalizeMeasurement() { try { receiver->StopReceiver(); } catch (...) { FinalizeMeasurementChangeState(); throw; } FinalizeMeasurementChangeState(); } std::optional JFJochReceiverService::GetStatus() { return receiver_status.GetStatus(); } void JFJochReceiverService::Start(const DiffractionExperiment &experiment, const PixelMask &pixel_mask, const JFCalibration *calibration) { std::unique_lock ul_state(state_mutex); // unique lock, as it will destroy and create receiver object if (state != ReceiverState::Idle) throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver not idle, cannot start"); try { preview_image.Configure(experiment, pixel_mask); preview_image_indexed.Configure(experiment, pixel_mask); auto nthreads_local = nthreads; if (experiment.IsCPUSummation()) nthreads_local = 4; // Thanks to properties of unique_ptr, starting new measurement will call destructor of JFJochReceiver, which will // ensure that everything was rolled back receiver = std::make_unique(experiment, pixel_mask, calibration, aq_devices, image_pusher, logger, nthreads_local, numa_policy, spot_finding_settings, preview_image, preview_image_indexed, receiver_status, plots, buffer, zmq_preview_socket.get(), zmq_metadata_socket.get()); try { // Don't want to stop receiver->SetSpotFindingSettings(spot_finding_settings); } catch (...) {} measurement = std::async(std::launch::async, &JFJochReceiverService::FinalizeMeasurement, this); state = ReceiverState::Running; } catch (const JFJochException &e) { logger.ErrorException(e); throw; } } void JFJochReceiverService::Cancel(bool silent) { std::unique_lock ul(state_mutex); if (state == ReceiverState::Running) receiver->Cancel(silent); } JFJochReceiverOutput JFJochReceiverService::Stop() { std::unique_lock ul(state_mutex); measurement_done.wait(ul, [this] { return (state != ReceiverState::Running);}); if (state != ReceiverState::Idle) throw JFJochException(JFJochExceptionCategory::WrongReceiverState, "Receiver in weird state"); try { if (measurement.valid()) measurement.get(); } catch (JFJochException &e) { logger.ErrorException(e); throw; } if (!receiver) { logger.Warning("Request to stop while receiver not running"); throw JFJochException(JFJochExceptionCategory::WrongReceiverState, "Receiver idle, cannot stop"); } return receiver->GetFinalStatistics(); } void JFJochReceiverService::SetSpotFindingSettings(const SpotFindingSettings &settings) { try { std::unique_lock ul(state_mutex); DiffractionExperiment::CheckDataProcessingSettings(settings); spot_finding_settings = settings; if (state != ReceiverState::Idle) receiver->SetSpotFindingSettings(settings); } catch (std::exception &e) { logger.ErrorException(e); throw; } } MultiLinePlot JFJochReceiverService::GetDataProcessingPlot(const PlotRequest &request) { return plots.GetPlots(request); } std::vector JFJochReceiverService::GetNetworkConfig() { return aq_devices.GetNetworkConfig(); } std::string JFJochReceiverService::GetTIFF(bool calibration) const { if (calibration) return preview_image.GenerateTIFFDioptas(); else return preview_image.GenerateTIFF(); } std::string JFJochReceiverService::GetJPEG(const PreviewJPEGSettings &settings) const { if (settings.show_indexed) return preview_image_indexed.GenerateJPEG(settings); else return preview_image.GenerateJPEG(settings); } void JFJochReceiverService::LoadInternalGeneratorImage(const DiffractionExperiment &experiment, const std::vector &image, uint64_t image_number) { std::vector raw_geom, eiger_geom; const uint16_t *frame; if (image.size() == RAW_MODULE_SIZE * experiment.GetModulesNum()) { frame = image.data(); } else if (image.size() == experiment.GetPixelsNum()) { raw_geom.resize(RAW_MODULE_SIZE * experiment.GetModulesNum()); ConvertedToRawGeometry(experiment, raw_geom.data(), image.data()); frame = raw_geom.data(); } else throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Size of input array with raw expected image is wrong"); for (int i = 0; i < experiment.GetDataStreamsNum(); i++) { uint32_t module0 = experiment.GetFirstModuleOfDataStream(i); switch (experiment.GetDetectorSetup().GetDetectorType()) { case DetectorType::EIGER: eiger_geom.resize(RAW_MODULE_SIZE); for (int m = 0; m < experiment.GetModulesNum(i); m++) { RawToEigerInput(eiger_geom.data(), frame + (module0 + m) * RAW_MODULE_SIZE); aq_devices[i].SetInternalGeneratorFrame(eiger_geom.data(), m + experiment.GetModulesNum(i) * image_number); } break; case DetectorType::JUNGFRAU: for (int m = 0; m < experiment.GetModulesNum(i); m++) aq_devices[i].SetInternalGeneratorFrame(frame + (module0 + m) * RAW_MODULE_SIZE, m + experiment.GetModulesNum(i) * image_number); break; default: throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Detector not supported"); } } } void JFJochReceiverService::GetXFELEventCode(std::vector &v) const { plots.GetXFELEventCode(v); } void JFJochReceiverService::GetXFELPulseID(std::vector &v) const { plots.GetXFELPulseID(v); } std::vector JFJochReceiverService::GetDeviceStatus() const { return aq_devices.GetDeviceStatus(); } std::optional JFJochReceiverService::GetProgress() const { return receiver_status.GetProgress(); } JFJochReceiverService &JFJochReceiverService::PreviewSocket(const std::string &addr) { if (!addr.empty()) { logger.Info("ZeroMQ preview socket available at {}", addr); zmq_preview_socket = std::make_unique(addr); } return *this; } JFJochReceiverService & JFJochReceiverService::MetadataSocket(const std::string &addr) { if (!addr.empty()) { logger.Info("ZeroMQ metadata socket available at {}", addr); zmq_metadata_socket = std::make_unique(addr); } return *this; } std::string JFJochReceiverService::GetPreviewSocketAddress() const { if (zmq_preview_socket) return zmq_preview_socket->GetAddress(); return ""; } std::string JFJochReceiverService::GetMetadataSocketAddress() const { if (zmq_metadata_socket) return zmq_metadata_socket->GetAddress(); return ""; } JFJochReceiverService &JFJochReceiverService::PreviewSocketSettings(const ZMQPreviewSettings &input) { if (zmq_preview_socket) zmq_preview_socket->ImportSettings(input); return *this; } JFJochReceiverService & JFJochReceiverService::MetadataSocketSettings(const ZMQMetadataSettings &input) { if (zmq_metadata_socket) zmq_metadata_socket->ImportSettings(input); return *this; } ZMQPreviewSettings JFJochReceiverService::GetPreviewSocketSettings() const { if (zmq_preview_socket) return zmq_preview_socket->GetSettings(); return {}; } ZMQMetadataSettings JFJochReceiverService::GetMetadataSocketSettings() const { if (zmq_metadata_socket) return zmq_metadata_socket->GetSettings(); return {}; }