Files
Jungfraujoch/receiver/JFJochReceiverService.cpp

163 lines
5.6 KiB
C++

// Copyright (2019-2023) Paul Scherrer Institute
#include "JFJochReceiverService.h"
/// Builds a receiver service bound to the given acquisition devices, logger and image pusher.
///
/// Data processing settings start from DiffractionExperiment::DefaultDataProcessingSettings().
/// No receiver object is created here; that happens in Start().
///
/// @param in_aq_devices  acquisition device group handed to every receiver created by Start()
/// @param in_logger      logger used for error/warning reporting
/// @param pusher         image pusher handed to every receiver created by Start()
JFJochReceiverService::JFJochReceiverService(AcquisitionDeviceGroup &in_aq_devices,
                                             Logger &in_logger,
                                             ImagePusher &pusher)
        : logger(in_logger),
          aq_devices(in_aq_devices),
          image_pusher(pusher),
          data_processing_settings(DiffractionExperiment::DefaultDataProcessingSettings()) {}
/// Sets the thread count that Start() passes to each newly created JFJochReceiver.
///
/// @param input  requested number of threads; must be strictly positive
/// @return reference to this object, so setters can be chained
/// @throws JFJochException (InputParameterInvalid) if input is zero or negative
JFJochReceiverService& JFJochReceiverService::NumThreads(int64_t input) {
    const bool is_positive = (input > 0);
    if (is_positive) {
        nthreads = input;
        return *this;
    }
    throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Thread number must be above zero");
}
/// Sets the send buffer count that Start() passes to each newly created JFJochReceiver.
///
/// @param input  requested number of send buffers; must be strictly positive
/// @return reference to this object, so setters can be chained
/// @throws JFJochException (InputParameterInvalid) if input is zero or negative
JFJochReceiverService &JFJochReceiverService::SendBufferCount(int64_t input) {
    const bool is_positive = (input > 0);
    if (is_positive) {
        send_buffer_count = input;
        return *this;
    }
    throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Send buffer count must be above zero");
}
/// Stores the preview publisher pointer that Start() forwards to each new JFJochReceiver.
///
/// The pointer is stored as-is; no validation is performed here.
/// NOTE(review): the pointer is presumably non-owning - confirm lifetime with the caller.
///
/// @param in_preview_writer  preview publisher to use for subsequent measurements
/// @return reference to this object, so setters can be chained
JFJochReceiverService& JFJochReceiverService::PreviewPublisher(ZMQPreviewPublisher *in_preview_writer) {
    this->preview_publisher = in_preview_writer;
    return *this;
}
/// Stores a copy of the NUMA policy that Start() forwards to each new JFJochReceiver.
///
/// @param policy  NUMA policy object to copy
/// @return reference to this object, so setters can be chained
JFJochReceiverService &JFJochReceiverService::NUMAPolicy(const NUMAHWPolicy &policy) {
    this->numa_policy = policy;
    return *this;
}
/// Builds a NUMAHWPolicy from its string form and stores it for use by Start().
///
/// @param policy  string passed to the NUMAHWPolicy constructor
/// @return reference to this object, so setters can be chained
JFJochReceiverService &JFJochReceiverService::NUMAPolicy(const std::string &policy) {
    // Construct first, then assign: if construction fails the stored policy is untouched.
    NUMAHWPolicy parsed_policy(policy);
    numa_policy = parsed_policy;
    return *this;
}
/// Body of the asynchronous task launched by Start().
///
/// Calls StopReceiver() on the current receiver and, once that returns, marks the
/// service as idle and wakes every thread waiting on measurement_done (see Stop()).
void JFJochReceiverService::FinalizeMeasurement() {
    // Called without holding state_mutex, so other calls that take the mutex
    // can run while StopReceiver() is in progress.
    receiver->StopReceiver();

    // State change and notification are both done with state_mutex held.
    std::unique_lock<std::mutex> state_lock(state_mutex);
    state = ReceiverState::Idle;
    measurement_done.notify_all();
}
/// Reports the status of the running measurement.
///
/// @return the receiver's own status while the state is Running; otherwise a
///         placeholder status with progress, indexing_rate and send_buffers_avail set to -1
JFJochReceiverStatus JFJochReceiverService::GetStatus() {
    // The receiver object may not exist while idle, so the state check and the
    // call into the receiver are both performed with state_mutex held.
    std::unique_lock<std::mutex> state_lock(state_mutex);

    if (state != ReceiverState::Running)
        return {
            .progress = -1,
            .indexing_rate = -1,
            .send_buffers_avail = -1
        };

    return receiver->GetStatus();
}
void JFJochReceiverService::Start(const DiffractionExperiment &experiment, const JFCalibration *calibration) {
std::unique_lock<std::mutex> ul_state(state_mutex);
if (state != ReceiverState::Idle)
throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver not idle, cannot start");
try {
// Thanks to properties of unique_ptr, starting new measurement will call destructor of JFJochReceiver, which will
// ensure that everything was rolled back
// But it is important to do it in correct order - first abort old receiver, close it, than start new one
receiver = std::make_unique<JFJochReceiver>(experiment, calibration,
aq_devices, image_pusher,
logger, nthreads, send_buffer_count,
preview_publisher, numa_policy);
try {
// Don't want to stop
receiver->SetDataProcessingSettings(data_processing_settings);
} catch (...) {}
measurement = std::async(std::launch::async, &JFJochReceiverService::FinalizeMeasurement, this);
state = ReceiverState::Running;
} catch (const JFJochException &e) {
logger.ErrorException(e);
throw;
}
}
/// Requests that the running measurement be aborted.
///
/// NOTE(review): this currently calls receiver->Cancel(), exactly like Cancel();
/// confirm whether a distinct abort path on the receiver was intended.
///
/// @throws JFJochException (WrongDAQState) if the service is idle
void JFJochReceiverService::Abort() {
    std::unique_lock<std::mutex> ul(state_mutex);

    // Message previously read "Receiver not idle, cannot start", which described the
    // opposite condition and the wrong operation.
    if (state == ReceiverState::Idle)
        throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver is idle, nothing to abort");

    receiver->Cancel();
}
/// Requests cancellation of the running measurement by calling Cancel() on the receiver.
///
/// @throws JFJochException (WrongDAQState) if the service is idle
void JFJochReceiverService::Cancel() {
    std::unique_lock<std::mutex> ul(state_mutex);

    // Message previously read "Receiver not idle, cannot start", which described the
    // opposite condition and the wrong operation.
    if (state == ReceiverState::Idle)
        throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver is idle, nothing to cancel");

    receiver->Cancel();
}
/// Waits for the current measurement to finish and returns its statistics.
///
/// Blocks on measurement_done until the state is no longer Running, then collects the
/// result of the asynchronous FinalizeMeasurement() task (which rethrows anything that
/// task threw) and finally returns the receiver's statistics.
///
/// @return statistics reported by the receiver
/// @throws JFJochException (WrongDAQState) if the state after waiting is not Idle, or
///         if no receiver object exists
/// @throws JFJochException propagated from the finalization task (logged, then rethrown)
JFJochReceiverOutput JFJochReceiverService::Stop() {
    std::unique_lock<std::mutex> ul(state_mutex);
    measurement_done.wait(ul, [this] { return (state != ReceiverState::Running); });

    if (state != ReceiverState::Idle)
        throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Receiver in weird state");

    try {
        // valid() is false if no task was launched or its result was already collected.
        if (measurement.valid())
            measurement.get();
    } catch (const JFJochException &e) {
        logger.ErrorException(e);
        throw;
    }

    if (!receiver) {
        logger.Warning("Request via gRPC, while receiver not running");
        // Message previously read "Receiver not idle, cannot start", which did not
        // describe this situation (idle service with no receiver object).
        throw JFJochException(JFJochExceptionCategory::WrongDAQState,
                              "Receiver was never started, nothing to stop");
    }

    return receiver->GetStatistics();
}
/// Updates the data processing settings.
///
/// The settings are stored for receivers created by later Start() calls and, when the
/// service is not idle, also forwarded to the current receiver. Both steps happen with
/// state_mutex held. Exceptions from either step propagate to the caller unchanged.
///
/// @param settings  new data processing settings
void JFJochReceiverService::SetDataProcessingSettings(const DataProcessingSettings &settings) {
    // A surrounding try/catch that only rethrew has been dropped: it had no effect on
    // which exceptions reach the caller.
    std::unique_lock<std::mutex> ul(state_mutex);

    data_processing_settings = settings;
    if (state != ReceiverState::Idle)
        receiver->SetDataProcessingSettings(settings);
}
/// Returns the plot requested from the current receiver.
///
/// @param request  plot request forwarded to the receiver's GetPlots()
/// @return the receiver's plot, or a default-constructed Plot if no receiver exists
Plot JFJochReceiverService::GetDataProcessingPlot(const PlotRequest &request) {
    // The receiver object may not exist yet, so the check and the call are both
    // performed with state_mutex held.
    std::unique_lock<std::mutex> state_lock(state_mutex);

    if (!receiver)
        return {};

    return receiver->GetPlots(request);
}
/// Returns the radial integration profiles of the current receiver.
///
/// @return the receiver's profiles, or a default-constructed RadialIntegrationProfiles
///         if no receiver exists
RadialIntegrationProfiles JFJochReceiverService::GetRadialIntegrationProfiles() {
    // The receiver object may not exist yet, so the check and the call are both
    // performed with state_mutex held.
    std::unique_lock<std::mutex> state_lock(state_mutex);

    if (!receiver)
        return {};

    return receiver->GetRadialIntegrationProfiles();
}
/// Returns the network configuration reported by the acquisition device group.
///
/// This is a direct pass-through to aq_devices.GetNetworkConfig(); state_mutex is not taken.
std::vector<AcquisitionDeviceNetConfig> JFJochReceiverService::GetNetworkConfig() {
    std::vector<AcquisitionDeviceNetConfig> net_config = aq_devices.GetNetworkConfig();
    return net_config;
}