Files
Jungfraujoch/receiver/JFJochReceiverService.cpp

339 lines
13 KiB
C++

// Copyright (2019-2023) Paul Scherrer Institute
#include "JFJochReceiverService.h"
inline PlotRequest Convert(const JFJochProtoBuf::PlotRequest& request) {
PlotRequest ret;
ret.binning = request.binning();
switch (request.type()) {
case JFJochProtoBuf::BKG_ESTIMATE:
ret.type = PlotType::BkgEstimate;
break;
case JFJochProtoBuf::RAD_INT:
ret.type = PlotType::RadInt;
break;
case JFJochProtoBuf::SPOT_COUNT:
ret.type = PlotType::SpotCount;
break;
case JFJochProtoBuf::INDEXING_RATE:
ret.type = PlotType::IndexingRate;
break;
case JFJochProtoBuf::INDEXING_RATE_PER_FILE:
ret.type = PlotType::IndexingRatePerFile;
break;
default:
case JFJochProtoBuf::ADU_HISTOGRAM:
ret.type = PlotType::ADUHistorgram;
break;
}
return ret;
}
inline void Convert(const Plot& input, JFJochProtoBuf::Plot &output) {
if (!input.x.empty())
*output.mutable_x() = {input.x.begin(), input.x.end()};
if (!input.y.empty())
*output.mutable_y() = {input.y.begin(), input.y.end()};
}
inline DataProcessingSettings Convert(const JFJochProtoBuf::DataProcessingSettings &input) {
DataProcessingSettings ret;
ret.signal_to_noise_threshold = input.signal_to_noise_threshold();
ret.photon_count_threshold = input.photon_count_threshold();
ret.min_pix_per_spot = input.min_pix_per_spot();
ret.max_pix_per_spot = input.max_pix_per_spot();
ret.local_bkg_size = input.local_bkg_size();
ret.high_resolution_limit = input.high_resolution_limit();
ret.low_resolution_limit = input.low_resolution_limit();
ret.bkg_estimate_low_q = input.bkg_estimate_low_q();
ret.bkg_estimate_high_q = input.bkg_estimate_high_q();
ret.preview_indexed_only = input.preview_indexed_only();
return ret;
}
void Convert(const RadialIntegrationProfiles& input, JFJochProtoBuf::RadialIntegrationProfiles& output) {
for (const auto &i: input.profiles) {
auto tmp = output.add_profiles();
tmp->set_title(i.title);
Convert(i.plot, *tmp->mutable_plot());
}
}
JFJochReceiverService::JFJochReceiverService(AcquisitionDeviceGroup &in_aq_devices,
Logger &in_logger, ImagePusher &pusher) :
logger(in_logger), aq_devices(in_aq_devices),
image_pusher(pusher), data_processing_settings(DiffractionExperiment::DefaultDataProcessingSettings()) {
}
JFJochReceiverService& JFJochReceiverService::NumThreads(int64_t input) {
if (input <= 0)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Thread number must be above zero");
nthreads = input;
return *this;
}
JFJochReceiverService &JFJochReceiverService::SendBufferCount(int64_t input) {
if (input <= 0)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Send buffer count must be above zero");
send_buffer_count = input;
return *this;
}
JFJochReceiverService& JFJochReceiverService::PreviewPublisher(ZMQPreviewPublisher *in_preview_writer) {
preview_publisher = in_preview_writer;
return *this;
}
JFJochReceiverService &JFJochReceiverService::NUMAPolicy(const NUMAHWPolicy &policy) {
numa_policy = policy;
return *this;
}
JFJochReceiverService &JFJochReceiverService::NUMAPolicy(const std::string &policy) {
numa_policy = NUMAHWPolicy(policy);
return *this;
}
grpc::Status JFJochReceiverService::Start(grpc::ServerContext *context, const JFJochProtoBuf::ReceiverInput *request,
JFJochProtoBuf::Empty *response) {
experiment = std::make_unique<DiffractionExperiment>(request->jungfraujoch_settings());
if (request->has_calibration())
calibration = std::make_unique<JFCalibration>(request->calibration());
else
calibration.reset();
try {
Start(*experiment, calibration.get());
return grpc::Status::OK;
} catch (const JFJochException &e) {
logger.ErrorException(e);
return {grpc::StatusCode::ABORTED, e.what()};
}
}
void JFJochReceiverService::FinalizeMeasurement() {
receiver->StopReceiver();
{
std::unique_lock<std::mutex> ul(state_mutex);
state = ReceiverState::Idle;
measurement_done.notify_all();
}
}
grpc::Status JFJochReceiverService::Stop(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::ReceiverOutput *response) {
try {
auto output = Stop();
response->set_max_receive_delay(output.max_receive_delay);
response->set_compressed_size(output.compressed_size);
response->set_compressed_ratio(output.compressed_ratio);
response->set_images_sent(output.images_sent);
response->set_start_time_ms(output.start_time_ms);
response->set_end_time_ms(output.end_time_ms);
response->set_efficiency(output.efficiency);
response->set_max_image_number_sent(output.max_image_number_sent);
response->set_cancelled(output.cancelled);
response->set_master_file_name(output.master_file_name);
if (!output.pedestal_result.empty())
*response->mutable_pedestal_result() = {output.pedestal_result.begin(), output.pedestal_result.end()};
response->set_indexing_rate(output.indexing_rate);
response->set_bkg_estimate(output.bkg_estimate);
for (const auto &i: output.received_packets)
response->add_received_packets(i);
for (const auto &i: output.expected_packets)
response->add_expected_packets(i);
return grpc::Status::OK;
} catch (JFJochException &e) {
logger.ErrorException(e);
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochReceiverService::Cancel(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::Empty *response) {
try {
Cancel();
return grpc::Status::OK;
} catch (std::exception &e) {
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochReceiverService::Abort(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::Empty *response) {
try {
Abort();
return grpc::Status::OK;
} catch (std::exception &e) {
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochReceiverService::GetStatus(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::ReceiverStatus *response) {
// Need to hold mutex, as receiver might not exist here, if state is idle
std::unique_lock<std::mutex> ul(state_mutex);
response->set_progress(-1);
response->set_send_buffers_avail(-1);
response->set_indexing_rate(-1);
if (state == ReceiverState::Running) {
response->set_progress(receiver->GetProgress());
response->set_send_buffers_avail(receiver->GetAvailableSendBuffers());
}
if (receiver) {
double indexing_rate = receiver->GetIndexingRate();
if (!std::isnan(indexing_rate))
response->set_indexing_rate(indexing_rate);
}
return grpc::Status::OK;
}
grpc::Status JFJochReceiverService::GetDataProcessingPlots(grpc::ServerContext *context,
const JFJochProtoBuf::PlotRequest *request,
JFJochProtoBuf::Plot *response) {
// Need to hold mutex, as receiver might not exist here, if state is idle
try {
Convert(GetDataProcessingPlot(Convert(*request)), *response);
return grpc::Status::OK;
} catch (std::exception &e) {
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochReceiverService::GetRadialIntegrationProfiles(grpc::ServerContext *context,
const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::RadialIntegrationProfiles *response) {
// Need to hold mutex, as receiver might not exist here, if state is idle
try {
Convert(GetRadialIntegrationProfiles(), *response);
return grpc::Status::OK;
} catch (std::exception &e) {
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochReceiverService::SetDataProcessingSettings(grpc::ServerContext *context,
const JFJochProtoBuf::DataProcessingSettings *request,
JFJochProtoBuf::Empty *response) {
try {
SetDataProcessingSettings(Convert(*request));
return grpc::Status::OK;
} catch (std::exception &e) {
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochReceiverService::GetNetworkConfig(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::ReceiverNetworkConfig *response) {
for (int i = 0; i < aq_devices.size(); i++) {
auto dev_net_cfg = response->add_device();
dev_net_cfg->set_mac_addr(aq_devices[i].GetMACAddress());
dev_net_cfg->set_ipv4_addr(aq_devices[i].GetIPv4Address());
dev_net_cfg->set_udp_port(aq_devices[i].GetUDPPort());
}
return grpc::Status::OK;
}
void JFJochReceiverService::Start(const DiffractionExperiment &experiment, const JFCalibration *calibration) {
std::unique_lock<std::mutex> ul_state(state_mutex);
if (state != ReceiverState::Idle)
throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver not idle, cannot start");
try {
// Thanks to properties of unique_ptr, starting new measurement will call destructor of JFJochReceiver, which will
// ensure that everything was rolled back
// But it is important to do it in correct order - first abort old receiver, close it, than start new one
receiver = std::make_unique<JFJochReceiver>(experiment, calibration,
aq_devices, image_pusher,
logger, nthreads, send_buffer_count,
preview_publisher, numa_policy);
try {
// Don't want to stop
receiver->SetDataProcessingSettings(data_processing_settings);
} catch (...) {}
measurement = std::async(std::launch::async, &JFJochReceiverService::FinalizeMeasurement, this);
state = ReceiverState::Running;
} catch (const JFJochException &e) {
logger.ErrorException(e);
throw;
}
}
void JFJochReceiverService::Abort() {
std::unique_lock<std::mutex> ul(state_mutex);
if (state == ReceiverState::Idle)
throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver not idle, cannot start");
else {
receiver->Cancel();
}
}
void JFJochReceiverService::Cancel() {
std::unique_lock<std::mutex> ul(state_mutex);
if (state == ReceiverState::Idle)
throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver not idle, cannot start");
else {
receiver->Cancel();
}
}
JFJochReceiverOutput JFJochReceiverService::Stop() {
std::unique_lock<std::mutex> ul(state_mutex);
measurement_done.wait(ul, [this] { return (state != ReceiverState::Running);});
if (state != ReceiverState::Idle)
throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver in weird state");
try {
if (measurement.valid())
measurement.get();
} catch (JFJochException &e) {
logger.ErrorException(e);
throw;
}
if (!receiver) {
logger.Warning("Request via gRPC, while receiver not running");
throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver not idle, cannot start");
}
return receiver->GetStatistics();
}
void JFJochReceiverService::SetDataProcessingSettings(const DataProcessingSettings &settings) {
try {
std::unique_lock<std::mutex> ul(state_mutex);
data_processing_settings = settings;
if (state != ReceiverState::Idle)
receiver->SetDataProcessingSettings(settings);
} catch (std::exception &e) {
throw;
}
}
Plot JFJochReceiverService::GetDataProcessingPlot(const PlotRequest &request) {
// Need to hold mutex, as receiver might not exist here, if state is idle
std::unique_lock<std::mutex> ul(state_mutex);
if (receiver)
return receiver->GetPlots(request);
else
return {};
}
RadialIntegrationProfiles JFJochReceiverService::GetRadialIntegrationProfiles() {
// Need to hold mutex, as receiver might not exist here, if state is idle
std::unique_lock<std::mutex> ul(state_mutex);
if (receiver)
return receiver->GetRadialIntegrationProfiles();
else
return {};
}