// Copyright (2019-2023) Paul Scherrer Institute #include "JFJochReceiverService.h" inline PlotRequest Convert(const JFJochProtoBuf::PlotRequest& request) { PlotRequest ret; ret.binning = request.binning(); switch (request.type()) { case JFJochProtoBuf::BKG_ESTIMATE: ret.type = PlotType::BkgEstimate; break; case JFJochProtoBuf::RAD_INT: ret.type = PlotType::RadInt; break; case JFJochProtoBuf::SPOT_COUNT: ret.type = PlotType::SpotCount; break; case JFJochProtoBuf::INDEXING_RATE: ret.type = PlotType::IndexingRate; break; case JFJochProtoBuf::INDEXING_RATE_PER_FILE: ret.type = PlotType::IndexingRatePerFile; break; default: case JFJochProtoBuf::ADU_HISTOGRAM: ret.type = PlotType::ADUHistorgram; break; } return ret; } inline void Convert(const Plot& input, JFJochProtoBuf::Plot &output) { if (!input.x.empty()) *output.mutable_x() = {input.x.begin(), input.x.end()}; if (!input.y.empty()) *output.mutable_y() = {input.y.begin(), input.y.end()}; } inline DataProcessingSettings Convert(const JFJochProtoBuf::DataProcessingSettings &input) { DataProcessingSettings ret; ret.signal_to_noise_threshold = input.signal_to_noise_threshold(); ret.photon_count_threshold = input.photon_count_threshold(); ret.min_pix_per_spot = input.min_pix_per_spot(); ret.max_pix_per_spot = input.max_pix_per_spot(); ret.local_bkg_size = input.local_bkg_size(); ret.high_resolution_limit = input.high_resolution_limit(); ret.low_resolution_limit = input.low_resolution_limit(); ret.bkg_estimate_low_q = input.bkg_estimate_low_q(); ret.bkg_estimate_high_q = input.bkg_estimate_high_q(); ret.preview_indexed_only = input.preview_indexed_only(); return ret; } void Convert(const RadialIntegrationProfiles& input, JFJochProtoBuf::RadialIntegrationProfiles& output) { for (const auto &i: input.profiles) { auto tmp = output.add_profiles(); tmp->set_title(i.title); Convert(i.plot, *tmp->mutable_plot()); } } JFJochReceiverService::JFJochReceiverService(std::vector &open_capi_device, Logger &in_logger, ImagePusher &pusher) : logger(in_logger), aq_devices(open_capi_device), image_pusher(pusher), data_processing_settings(DiffractionExperiment::DefaultDataProcessingSettings()) { } JFJochReceiverService& JFJochReceiverService::NumThreads(int64_t input) { if (input <= 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Thread number must be above zero"); nthreads = input; return *this; } JFJochReceiverService &JFJochReceiverService::SendBufferCount(int64_t input) { if (input <= 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Send buffer count must be above zero"); send_buffer_count = input; return *this; } JFJochReceiverService& JFJochReceiverService::PreviewPublisher(ZMQPreviewPublisher *in_preview_writer) { preview_publisher = in_preview_writer; return *this; } JFJochReceiverService &JFJochReceiverService::NUMAPolicy(const NUMAHWPolicy &policy) { numa_policy = policy; return *this; } JFJochReceiverService &JFJochReceiverService::NUMAPolicy(const std::string &policy) { numa_policy = NUMAHWPolicy(policy); return *this; } grpc::Status JFJochReceiverService::Start(grpc::ServerContext *context, const JFJochProtoBuf::ReceiverInput *request, JFJochProtoBuf::Empty *response) { experiment = std::make_unique(request->jungfraujoch_settings()); if (request->has_calibration()) calibration = std::make_unique(request->calibration()); else calibration.reset(); try { Start(*experiment, calibration.get()); return grpc::Status::OK; } catch (const JFJochException &e) { logger.ErrorException(e); return {grpc::StatusCode::ABORTED, e.what()}; } } void JFJochReceiverService::FinalizeMeasurement() { receiver->StopReceiver(); { std::unique_lock ul(state_mutex); state = ReceiverState::Idle; measurement_done.notify_all(); } } grpc::Status JFJochReceiverService::Stop(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, JFJochProtoBuf::ReceiverOutput *response) { try { auto output = Stop(); response->set_max_receive_delay(output.max_receive_delay); response->set_compressed_size(output.compressed_size); response->set_compressed_ratio(output.compressed_ratio); response->set_images_sent(output.images_sent); response->set_start_time_ms(output.start_time_ms); response->set_end_time_ms(output.end_time_ms); response->set_efficiency(output.efficiency); response->set_max_image_number_sent(output.max_image_number_sent); response->set_cancelled(output.cancelled); response->set_master_file_name(output.master_file_name); if (!output.pedestal_result.empty()) *response->mutable_pedestal_result() = {output.pedestal_result.begin(), output.pedestal_result.end()}; response->set_indexing_rate(output.indexing_rate); response->set_bkg_estimate(output.bkg_estimate); for (const auto &i: output.received_packets) response->add_received_packets(i); for (const auto &i: output.expected_packets) response->add_expected_packets(i); return grpc::Status::OK; } catch (JFJochException &e) { logger.ErrorException(e); return {grpc::StatusCode::ABORTED, e.what()}; } } grpc::Status JFJochReceiverService::Cancel(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, JFJochProtoBuf::Empty *response) { try { Cancel(); return grpc::Status::OK; } catch (std::exception &e) { return {grpc::StatusCode::ABORTED, e.what()}; } } grpc::Status JFJochReceiverService::Abort(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, JFJochProtoBuf::Empty *response) { try { Abort(); return grpc::Status::OK; } catch (std::exception &e) { return {grpc::StatusCode::ABORTED, e.what()}; } } grpc::Status JFJochReceiverService::GetStatus(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, JFJochProtoBuf::ReceiverStatus *response) { // Need to hold mutex, as receiver might not exist here, if state is idle std::unique_lock ul(state_mutex); response->set_progress(-1); response->set_send_buffers_avail(-1); response->set_indexing_rate(-1); if (state == ReceiverState::Running) { response->set_progress(receiver->GetProgress()); response->set_send_buffers_avail(receiver->GetAvailableSendBuffers()); } if (receiver) { double indexing_rate = receiver->GetIndexingRate(); if (!std::isnan(indexing_rate)) response->set_indexing_rate(indexing_rate); } return grpc::Status::OK; } grpc::Status JFJochReceiverService::GetDataProcessingPlots(grpc::ServerContext *context, const JFJochProtoBuf::PlotRequest *request, JFJochProtoBuf::Plot *response) { // Need to hold mutex, as receiver might not exist here, if state is idle try { Convert(GetDataProcessingPlot(Convert(*request)), *response); return grpc::Status::OK; } catch (std::exception &e) { return {grpc::StatusCode::ABORTED, e.what()}; } } grpc::Status JFJochReceiverService::GetRadialIntegrationProfiles(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, JFJochProtoBuf::RadialIntegrationProfiles *response) { // Need to hold mutex, as receiver might not exist here, if state is idle try { Convert(GetRadialIntegrationProfiles(), *response); return grpc::Status::OK; } catch (std::exception &e) { return {grpc::StatusCode::ABORTED, e.what()}; } } grpc::Status JFJochReceiverService::SetDataProcessingSettings(grpc::ServerContext *context, const JFJochProtoBuf::DataProcessingSettings *request, JFJochProtoBuf::Empty *response) { try { SetDataProcessingSettings(Convert(*request)); return grpc::Status::OK; } catch (std::exception &e) { return {grpc::StatusCode::ABORTED, e.what()}; } } grpc::Status JFJochReceiverService::GetNetworkConfig(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, JFJochProtoBuf::ReceiverNetworkConfig *response) { for (const auto &aq: aq_devices) { auto dev_net_cfg = response->add_device(); dev_net_cfg->set_mac_addr(aq->GetMACAddress()); dev_net_cfg->set_ipv4_addr(aq->GetIPv4Address()); dev_net_cfg->set_udp_port(aq->GetUDPPort()); } return grpc::Status::OK; } void JFJochReceiverService::Start(const DiffractionExperiment &experiment, const JFCalibration *calibration) { std::unique_lock ul_state(state_mutex); if (state != ReceiverState::Idle) throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver not idle, cannot start"); try { // Thanks to properties of unique_ptr, starting new measurement will call destructor of JFJochReceiver, which will // ensure that everything was rolled back // But it is important to do it in correct order - first abort old receiver, close it, than start new one receiver = std::make_unique(experiment, calibration, aq_devices, image_pusher, logger, nthreads, send_buffer_count, preview_publisher, numa_policy); try { // Don't want to stop receiver->SetDataProcessingSettings(data_processing_settings); } catch (...) {} measurement = std::async(std::launch::async, &JFJochReceiverService::FinalizeMeasurement, this); state = ReceiverState::Running; } catch (const JFJochException &e) { logger.ErrorException(e); throw; } } void JFJochReceiverService::Abort() { std::unique_lock ul(state_mutex); if (state == ReceiverState::Idle) throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver not idle, cannot start"); else { receiver->Cancel(); } } void JFJochReceiverService::Cancel() { std::unique_lock ul(state_mutex); if (state == ReceiverState::Idle) throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver not idle, cannot start"); else { receiver->Cancel(); } } JFJochReceiverOutput JFJochReceiverService::Stop() { std::unique_lock ul(state_mutex); measurement_done.wait(ul, [this] { return (state != ReceiverState::Running);}); if (state != ReceiverState::Idle) throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver in weird state"); try { if (measurement.valid()) measurement.get(); } catch (JFJochException &e) { logger.ErrorException(e); throw; } if (!receiver) { logger.Warning("Request via gRPC, while receiver not running"); throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver not idle, cannot start"); } return receiver->GetStatistics(); } void JFJochReceiverService::SetDataProcessingSettings(const DataProcessingSettings &settings) { try { std::unique_lock ul(state_mutex); data_processing_settings = settings; if (state != ReceiverState::Idle) receiver->SetDataProcessingSettings(settings); } catch (std::exception &e) { throw; } } Plot JFJochReceiverService::GetDataProcessingPlot(const PlotRequest &request) { // Need to hold mutex, as receiver might not exist here, if state is idle std::unique_lock ul(state_mutex); if (receiver) return receiver->GetPlots(request); else return {}; } RadialIntegrationProfiles JFJochReceiverService::GetRadialIntegrationProfiles() { // Need to hold mutex, as receiver might not exist here, if state is idle std::unique_lock ul(state_mutex); if (receiver) return receiver->GetRadialIntegrationProfiles(); else return {}; }