JFJochReceiver is directly invoked by the broker

This commit is contained in:
2023-11-13 16:08:28 +01:00
parent 3725ec5a73
commit dbef6a3b17
23 changed files with 300 additions and 1171 deletions

View File

@@ -55,12 +55,6 @@ struct AcquisitionDeviceStatistics {
std::vector<uint64_t> packets_received_per_module;
};
struct AcquisitionDeviceNetConfig {
std::string mac_addr;
std::string ipv4_addr;
uint64_t udp_port;
};
class AcquisitionDevice {
std::vector<int16_t> buffer_err;

View File

@@ -233,8 +233,6 @@ void ParseFacilityConfiguration(const nlohmann::json &input, const std::string&
void ParseBrokerConfiguration(const nlohmann::json &input, const std::string& tag, JFJochBroker& broker) {
if (CHECK_OBJECT(input, tag)) {
auto j = input[tag];
if (j.contains("receiver_addr"))
broker.Services().Receiver(GET_STR(j, "receiver_addr"));
if (CHECK_ARRAY(j, "writer")) {
for (const auto &iter: j["writer"]) {

View File

@@ -15,11 +15,13 @@ void JFJochServices::Start(const DiffractionExperiment& experiment, const JFCali
} else
writer_running = false;
logger.Info(" ... receiver start");
if (experiment.GetDetectorMode() == DetectorMode::Conversion)
receiver.Start(experiment, &calibration);
else
receiver.Start(experiment, nullptr);
if (receiver != nullptr) {
logger.Info(" ... receiver start");
if (experiment.GetDetectorMode() == DetectorMode::Conversion)
receiver->Start(experiment, &calibration);
else
receiver->Start(experiment, nullptr);
}
if (!experiment.IsUsingInternalPacketGen()) {
logger.Info(" ... detector start");
@@ -35,9 +37,10 @@ void JFJochServices::Off() {
void JFJochServices::On(const DiffractionExperiment &x) {
logger.Info("Detector on");
JFJochProtoBuf::DetectorConfig config = x.DetectorConfig(receiver.GetNetworkConfig());
detector.On(config);
if (receiver != nullptr) {
JFJochProtoBuf::DetectorConfig config = x.DetectorConfig(receiver->GetNetworkConfig());
detector.On(config);
}
logger.Info(" ... done");
}
@@ -46,26 +49,28 @@ JFJochServicesOutput JFJochServices::Stop(const JFCalibration &calibration) {
std::unique_ptr<JFJochException> exception;
try {
logger.Info("Wait for receiver done");
ret.receiver_output = receiver.Stop();
if (receiver != nullptr) {
try {
logger.Info("Wait for receiver done");
ret.receiver_output = receiver->Stop();
logger.Info(" ... Receiver efficiency: {} % Max delay: {} Compression ratio {}x",
static_cast<int>(ret.receiver_output.efficiency()*100.0),
ret.receiver_output.max_receive_delay(),
static_cast<int>(std::round(ret.receiver_output.compressed_ratio())));
if (ret.receiver_output.efficiency() < 1.0) {
for (int i = 0; i < ret.receiver_output.received_packets_size(); i++) {
if (ret.receiver_output.received_packets(i) != ret.receiver_output.expected_packets(i))
logger.Info(" ... Module: {} Packets received: {} out of {}", i,
ret.receiver_output.received_packets(i), ret.receiver_output.expected_packets(i));
logger.Info(" ... Receiver efficiency: {} % Max delay: {} Compression ratio {}x",
static_cast<int>(ret.receiver_output.efficiency * 100.0),
ret.receiver_output.max_receive_delay,
static_cast<int>(std::round(ret.receiver_output.compressed_ratio)));
if (ret.receiver_output.efficiency < 1.0) {
for (int i = 0; i < ret.receiver_output.received_packets.size(); i++) {
if (ret.receiver_output.received_packets[i] != ret.receiver_output.expected_packets[i])
logger.Info(" ... Module: {} Packets received: {} out of {}", i,
ret.receiver_output.received_packets[i], ret.receiver_output.expected_packets[i]);
}
}
} catch (const JFJochException &e) {
logger.Error(" ... finished with error {}", e.what());
exception = std::make_unique<JFJochException>(e);
}
} catch (const JFJochException &e) {
logger.Error(" ... finished with error {}",e.what());
exception = std::make_unique<JFJochException>(e);
logger.Info("Receiver finished with success");
}
logger.Info("Receiver finished with success");
if (writer_running) {
logger.Info("Stopping writer");
@@ -100,7 +105,8 @@ void JFJochServices::Abort() {
// Abort should try to achieve the best outcome possible
// but it OK if things fail (for example lost connection)
try {
receiver.Abort();
if (receiver != nullptr)
receiver->Abort();
} catch (const std::exception &e) {
logger.Error(e.what());
}
@@ -108,12 +114,12 @@ void JFJochServices::Abort() {
void JFJochServices::Cancel() {
detector.Stop();
receiver.Cancel();
if (receiver != nullptr)
receiver->Cancel();
}
JFJochServices &JFJochServices::Receiver(const std::string &addr) {
receiver.Connect(addr);
logger.Info("Using receiver service with gRPC " + addr);
JFJochServices &JFJochServices::Receiver(JFJochReceiverService *input) {
receiver = input;
return *this;
}
@@ -130,23 +136,31 @@ JFJochServices &JFJochServices::Detector(const std::string &addr) {
return *this;
}
JFJochProtoBuf::ReceiverStatus JFJochServices::GetReceiverStatus() {
return receiver.GetStatus();
JFJochReceiverStatus JFJochServices::GetReceiverStatus() {
if (receiver == nullptr)
return {};
return receiver->GetStatus();
}
JFJochProtoBuf::Plot JFJochServices::GetPlots(const JFJochProtoBuf::PlotRequest &request) {
Plot JFJochServices::GetPlots(const PlotRequest &request) {
if (receiver == nullptr)
return {};
try {
return receiver.GetPlots(request);
return receiver->GetDataProcessingPlot(request);
} catch (...) {
return JFJochProtoBuf::Plot();
return {};
}
}
JFJochProtoBuf::RadialIntegrationProfiles JFJochServices::GetRadialIntegrationProfiles() {
RadialIntegrationProfiles JFJochServices::GetRadialIntegrationProfiles() {
if (receiver == nullptr)
return {};
try {
return receiver.GetRadialIntegrationProfiles();
return receiver->GetRadialIntegrationProfiles();
} catch (...) {
return JFJochProtoBuf::RadialIntegrationProfiles();
return {};
}
}
@@ -165,8 +179,10 @@ inline JFJochProtoBuf::DataProcessingSettings Convert(const DataProcessingSettin
return ret;
}
void JFJochServices::SetDataProcessingSettings(const DataProcessingSettings &settings) {
receiver.SetDataProcessingSettings(Convert(settings));
if (receiver)
receiver->SetDataProcessingSettings(settings);
}
void JFJochServices::Trigger() {

View File

@@ -6,16 +6,16 @@
#include "../common/DiffractionExperiment.h"
#include "../jungfrau/JFCalibration.h"
#include "../common/Logger.h"
#include "../grpc/JFJochReceiverClient.h"
#include "../receiver/JFJochReceiverService.h"
#include "../grpc/JFJochWriterGroupClient.h"
#include "../grpc/JFJochDetectorClient.h"
struct JFJochServicesOutput {
JFJochProtoBuf::ReceiverOutput receiver_output;
JFJochReceiverOutput receiver_output;
};
class JFJochServices {
JFJochReceiverClient receiver;
JFJochReceiverService *receiver = nullptr;
JFJochWriterGroupClient writer;
JFJochDetectorClient detector;
@@ -32,12 +32,12 @@ public:
void Cancel();
void Trigger();
JFJochProtoBuf::ReceiverStatus GetReceiverStatus();
JFJochProtoBuf::Plot GetPlots(const JFJochProtoBuf::PlotRequest &request);
JFJochProtoBuf::RadialIntegrationProfiles GetRadialIntegrationProfiles();
JFJochReceiverStatus GetReceiverStatus();
Plot GetPlots(const PlotRequest &request);
RadialIntegrationProfiles GetRadialIntegrationProfiles();
void SetDataProcessingSettings(const DataProcessingSettings &settings);
JFJochServices& Receiver(const std::string &addr);
JFJochServices& Receiver(JFJochReceiverService *input);
JFJochServices& Writer(const std::string &addr, const std::string &zmq_push_addr);
JFJochServices& Detector(const std::string &addr);

View File

@@ -35,28 +35,74 @@ inline JFJochProtoBuf::DataProcessingSettings Convert(const DataProcessingSettin
return ret;
}
inline PlotRequest Convert(const JFJochProtoBuf::PlotRequest& request) {
PlotRequest ret;
ret.binning = request.binning();
switch (request.type()) {
case JFJochProtoBuf::BKG_ESTIMATE:
ret.type = PlotType::BkgEstimate;
break;
case JFJochProtoBuf::RAD_INT:
ret.type = PlotType::RadInt;
break;
case JFJochProtoBuf::SPOT_COUNT:
ret.type = PlotType::SpotCount;
break;
case JFJochProtoBuf::INDEXING_RATE:
ret.type = PlotType::IndexingRate;
break;
case JFJochProtoBuf::INDEXING_RATE_PER_FILE:
ret.type = PlotType::IndexingRatePerFile;
break;
default:
case JFJochProtoBuf::ADU_HISTOGRAM:
ret.type = PlotType::ADUHistorgram;
break;
}
return ret;
}
inline JFJochProtoBuf::Plot Convert(const Plot& input) {
JFJochProtoBuf::Plot output;
if (!input.x.empty())
*output.mutable_x() = {input.x.begin(), input.x.end()};
if (!input.y.empty())
*output.mutable_y() = {input.y.begin(), input.y.end()};
return output;
}
inline JFJochProtoBuf::RadialIntegrationProfiles Convert(const RadialIntegrationProfiles& input) {
JFJochProtoBuf::RadialIntegrationProfiles output;
for (const auto &i: input.profiles) {
auto tmp = output.add_profiles();
tmp->set_title(i.title);
*tmp->mutable_plot() = Convert(i.plot);
}
return output;
}
JFJochStateMachine::JFJochStateMachine(JFJochServices &in_services, Logger &in_logger)
: services(in_services), logger(in_logger),
data_processing_settings(DiffractionExperiment::DefaultDataProcessingSettings()) {
}
void JFJochStateMachine::ImportPedestalG0(const JFJochProtoBuf::ReceiverOutput &receiver_output) {
if (receiver_output.pedestal_result_size() != experiment.GetModulesNum() * experiment.GetStorageCellNumber())
void JFJochStateMachine::ImportPedestalG0(const JFJochReceiverOutput &receiver_output) {
if (receiver_output.pedestal_result.size() != experiment.GetModulesNum() * experiment.GetStorageCellNumber())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Mismatch in pedestal output");
for (int s = 0; s < experiment.GetStorageCellNumber(); s++) {
for (int module = 0; module < experiment.GetModulesNum(); module++)
calibration->Pedestal(module, 0, s)
= receiver_output.pedestal_result(module + s * experiment.GetModulesNum());
= receiver_output.pedestal_result[module + s * experiment.GetModulesNum()];
}
SetCalibrationStatistics(calibration->GetModuleStatistics());
}
void JFJochStateMachine::ImportPedestal(const JFJochProtoBuf::ReceiverOutput &receiver_output, size_t gain_level,
void JFJochStateMachine::ImportPedestal(const JFJochReceiverOutput &receiver_output, size_t gain_level,
size_t storage_cell) {
for (int i = 0; i < receiver_output.pedestal_result_size(); i++)
calibration->Pedestal(i, gain_level, storage_cell) = receiver_output.pedestal_result(i);
for (int i = 0; i < receiver_output.pedestal_result.size(); i++)
calibration->Pedestal(i, gain_level, storage_cell) = receiver_output.pedestal_result[i];
SetCalibrationStatistics(calibration->GetModuleStatistics());
}
@@ -339,14 +385,14 @@ void JFJochStateMachine::SetFullMeasurementOutput(const JFJochServicesOutput &ou
tmp.set_detector_height(experiment.GetYPixelsNum());
tmp.set_detector_pixel_depth(experiment.GetPixelDepth());
tmp.set_compression_ratio(output.receiver_output.compressed_ratio());
tmp.set_collection_efficiency(output.receiver_output.efficiency());
tmp.set_images_collected(output.receiver_output.images_sent());
tmp.set_cancelled(output.receiver_output.cancelled());
tmp.set_max_image_number_sent(output.receiver_output.max_image_number_sent());
tmp.set_max_receive_delay(output.receiver_output.max_receive_delay());
tmp.set_indexing_rate(output.receiver_output.indexing_rate());
tmp.set_bkg_estimate(output.receiver_output.bkg_estimate());
tmp.set_compression_ratio(output.receiver_output.compressed_ratio);
tmp.set_collection_efficiency(output.receiver_output.efficiency);
tmp.set_images_collected(output.receiver_output.images_sent);
tmp.set_cancelled(output.receiver_output.cancelled);
tmp.set_max_image_number_sent(output.receiver_output.max_image_number_sent);
tmp.set_max_receive_delay(output.receiver_output.max_receive_delay);
tmp.set_indexing_rate(output.receiver_output.indexing_rate);
tmp.set_bkg_estimate(output.receiver_output.bkg_estimate);
measurement_statistics = tmp;
}
@@ -514,20 +560,20 @@ JFJochProtoBuf::BrokerStatus JFJochStateMachine::GetStatus() const {
try {
auto rcv_status = services.GetReceiverStatus();
ret.set_progress(rcv_status.progress());
ret.set_indexing_rate(rcv_status.indexing_rate());
ret.set_receiver_send_buffers_avail(rcv_status.send_buffers_avail());
ret.set_progress(rcv_status.progress);
ret.set_indexing_rate(rcv_status.indexing_rate);
ret.set_receiver_send_buffers_avail(rcv_status.send_buffers_avail);
} catch (JFJochException &e) {} // ignore exception in getting receiver status (don't really care, e.g. if receiver is down)
return ret;
}
JFJochProtoBuf::Plot JFJochStateMachine::GetPlots(const JFJochProtoBuf::PlotRequest &request) const {
return services.GetPlots(request);
return Convert(services.GetPlots(Convert(request)));
}
JFJochProtoBuf::RadialIntegrationProfiles JFJochStateMachine::GetRadialIntegrationProfiles() const {
return services.GetRadialIntegrationProfiles();
return Convert(services.GetRadialIntegrationProfiles());
}
void JFJochStateMachine::SetDataProcessingSettings(const JFJochProtoBuf::DataProcessingSettings &settings) {

View File

@@ -49,8 +49,8 @@ class JFJochStateMachine {
// Private functions assume that lock m is acquired
void SetDatasetDefaults(JFJochProtoBuf::DatasetSettings& settings);
void WaitTillMeasurementDone();
void ImportPedestal(const JFJochProtoBuf::ReceiverOutput &receiver_output, size_t gain_level, size_t storage_cell = 0);
void ImportPedestalG0(const JFJochProtoBuf::ReceiverOutput &receiver_output);
void ImportPedestal(const JFJochReceiverOutput &receiver_output, size_t gain_level, size_t storage_cell = 0);
void ImportPedestalG0(const JFJochReceiverOutput &receiver_output);
void TakePedestalInternalAll(std::unique_lock<std::mutex> &ul);
void TakePedestalInternalG0(std::unique_lock<std::mutex> &ul);

View File

@@ -12,22 +12,6 @@
#define check_max(param, val, max) if ((val) > (max)) throw JFJochException(JFJochExceptionCategory::InputParameterAboveMax, param)
#define check_min(param, val, min) if ((val) < (min)) throw JFJochException(JFJochExceptionCategory::InputParameterBelowMin, param)
DiffractionExperiment::DiffractionExperiment(const JFJochProtoBuf::JungfraujochSettings &settings) : DiffractionExperiment() {
Import(settings);
}
DiffractionExperiment& DiffractionExperiment::Import(const JFJochProtoBuf::JungfraujochSettings &settings) {
internal = settings.internal();
dataset = settings.dataset();
return *this;
}
DiffractionExperiment::operator JFJochProtoBuf::JungfraujochSettings() const {
JFJochProtoBuf::JungfraujochSettings settings;
*settings.mutable_dataset() = dataset;
*settings.mutable_internal() = internal;
return settings;
}
DiffractionExperiment::DiffractionExperiment() : DiffractionExperiment(DetectorGeometry(8, 2))
{}
@@ -798,9 +782,9 @@ DiffractionExperiment::operator JFJochProtoBuf::DetectorInput() const {
return ret;
}
JFJochProtoBuf::DetectorConfig DiffractionExperiment::DetectorConfig(const JFJochProtoBuf::ReceiverNetworkConfig &net_config) const {
JFJochProtoBuf::DetectorConfig DiffractionExperiment::DetectorConfig(const std::vector<AcquisitionDeviceNetConfig> &net_config) const {
JFJochProtoBuf::DetectorConfig ret;
if (net_config.device_size() < GetDataStreamsNum())
if (net_config.size() < GetDataStreamsNum())
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds,
"Number of FPGA boards in the receiver is less then necessary");
@@ -816,14 +800,14 @@ JFJochProtoBuf::DetectorConfig DiffractionExperiment::DetectorConfig(const JFJoc
for (int d = 0; d < GetDataStreamsNum(); d++) {
for (int m = 0; m < GetModulesNum(d); m++) {
auto mod_cfg = ret.add_modules();
mod_cfg->set_udp_dest_port_1(net_config.device(d).udp_port());
mod_cfg->set_udp_dest_port_2(net_config.device(d).udp_port());
mod_cfg->set_udp_dest_port_1(net_config[d].udp_port);
mod_cfg->set_udp_dest_port_2(net_config[d].udp_port);
mod_cfg->set_ipv4_src_addr_1(IPv4AddressToStr(GetSrcIPv4Address(d, 2 * m)));
mod_cfg->set_ipv4_src_addr_2(IPv4AddressToStr(GetSrcIPv4Address(d, 2 * m + 1)));
mod_cfg->set_ipv4_dest_addr_1(net_config.device(d).ipv4_addr());
mod_cfg->set_ipv4_dest_addr_2(net_config.device(d).ipv4_addr());
mod_cfg->set_mac_addr_dest_1(net_config.device(d).mac_addr());
mod_cfg->set_mac_addr_dest_2(net_config.device(d).mac_addr());
mod_cfg->set_ipv4_dest_addr_1(net_config[d].ipv4_addr);
mod_cfg->set_ipv4_dest_addr_2(net_config[d].ipv4_addr);
mod_cfg->set_mac_addr_dest_1(net_config[d].mac_addr);
mod_cfg->set_mac_addr_dest_2(net_config[d].mac_addr);
mod_cfg->set_module_id_in_data_stream(m);
}
}

View File

@@ -22,6 +22,12 @@ enum class DetectorMode : int {
Conversion, Raw, PedestalG0, PedestalG1, PedestalG2
};
struct AcquisitionDeviceNetConfig {
std::string mac_addr;
std::string ipv4_addr;
uint64_t udp_port;
};
class DiffractionExperiment {
JFJochProtoBuf::DatasetSettings dataset;
JFJochProtoBuf::InternalSettings internal;
@@ -32,15 +38,12 @@ public:
// Public methods are atomic
DiffractionExperiment();
DiffractionExperiment(const DetectorSetup& geom);
explicit DiffractionExperiment(const JFJochProtoBuf::JungfraujochSettings &settings);
// Methods below can be chained together
DiffractionExperiment& Detector(const DetectorSetup& input);
DiffractionExperiment& Mode(DetectorMode input);
DiffractionExperiment& DataStreams(int64_t input);
DiffractionExperiment& Import(const JFJochProtoBuf::JungfraujochSettings &settings);
DiffractionExperiment& ImagesPerTrigger(int64_t input);
DiffractionExperiment& NumTriggers(int64_t triggers);
@@ -85,12 +88,11 @@ public:
DiffractionExperiment& InstrumentNameShort(std::string input);
DiffractionExperiment& ApplyPixelMaskInFPGA(bool input);
operator JFJochProtoBuf::JungfraujochSettings() const;
operator JFJochProtoBuf::DetectorInput() const;
void FillMessage(StartMessage &message) const;
JFJochProtoBuf::DetectorConfig DetectorConfig(const JFJochProtoBuf::ReceiverNetworkConfig& net_config) const;
JFJochProtoBuf::DetectorConfig DetectorConfig(const std::vector<AcquisitionDeviceNetConfig>& net_config) const;
void LoadDatasetSettings(const JFJochProtoBuf::DatasetSettings &settings);
void LoadDetectorSettings(const JFJochProtoBuf::DetectorSettings &settings);
JFJochProtoBuf::DetectorSettings GetDetectorSettings() const;

View File

@@ -47,7 +47,6 @@ TARGET_INCLUDE_DIRECTORIES(JFJochProtoBuf PUBLIC ${CMAKE_CURRENT_BINARY_DIR})
TARGET_LINK_LIBRARIES(JFJochProtoBuf ${_GRPC_GRPCPP})
ADD_LIBRARY(gRPCClients STATIC
JFJochReceiverClient.cpp JFJochReceiverClient.h
JFJochDetectorClient.cpp JFJochDetectorClient.h
JFJochWriterClient.cpp JFJochWriterClient.h
JFJochWriterGroupClient.cpp JFJochWriterGroupClient.h)

View File

@@ -1,184 +0,0 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include <grpcpp/grpcpp.h>
#include "JFJochReceiverClient.h"
#include "../common/JFJochException.h"
void JFJochReceiverClient::Connect(const std::string& addr)
{
if (addr.empty()) _stub.reset();
else {
grpc::ChannelArguments ch_args;
ch_args.SetMaxReceiveMessageSize(GRPC_MAX_MESSAGE_SIZE);
auto ch = grpc::CreateCustomChannel(addr, grpc::InsecureChannelCredentials(), ch_args);
_stub = std::make_unique<JFJochProtoBuf::gRPC_JFJochReceiver::Stub>(ch);
}
};
void JFJochReceiverClient::Start(const DiffractionExperiment &experiment, const JFCalibration *calibration) {
JFJochProtoBuf::ReceiverInput receiver_input;
*receiver_input.mutable_jungfraujoch_settings() = experiment;
if (calibration != nullptr)
*receiver_input.mutable_calibration() = *calibration;
if (_stub) {
grpc::ClientContext context;
JFJochProtoBuf::Empty empty;
auto status = _stub->Start(&context, receiver_input, &empty);
if (!status.ok()) throw JFJochException(JFJochExceptionCategory::gRPCError,
"JFJochReceiver: " + status.error_message());
}
};
void JFJochReceiverClient::Cancel() {
if (_stub) {
grpc::ClientContext context;
JFJochProtoBuf::Empty empty;
auto status = _stub->Cancel(&context, empty, &empty);
if (!status.ok()) throw JFJochException(JFJochExceptionCategory::gRPCError,
"JFJochReceiver: " + status.error_message());
}
};
void JFJochReceiverClient::Abort() {
if (_stub) {
grpc::ClientContext context;
JFJochProtoBuf::Empty empty;
auto status = _stub->Abort(&context, empty, &empty);
if (!status.ok()) throw JFJochException(JFJochExceptionCategory::gRPCError,
"JFJochReceiver: " + status.error_message());
}
};
JFJochProtoBuf::ReceiverOutput JFJochReceiverClient::Stop() {
JFJochProtoBuf::ReceiverOutput ret;
if (_stub) {
grpc::ClientContext context;
JFJochProtoBuf::Empty empty;
auto status = _stub->Stop(&context, empty, &ret);
if (!status.ok()) throw JFJochException(JFJochExceptionCategory::gRPCError,
"JFJochReceiver: " + status.error_message());
}
return ret;
};
JFJochProtoBuf::ReceiverStatus JFJochReceiverClient::GetStatus() {
JFJochProtoBuf::ReceiverStatus ret;
if (_stub) {
grpc::ClientContext context;
JFJochProtoBuf::Empty empty;
auto status = _stub->GetStatus(&context, empty, &ret);
if (!status.ok()) throw JFJochException(JFJochExceptionCategory::gRPCError,
"JFJochReceiver: " + status.error_message());
}
return ret;
}
void JFJochReceiverClient::SetDataProcessingSettings(const JFJochProtoBuf::DataProcessingSettings &settings) {
JFJochProtoBuf::Empty ret;
if (_stub) {
grpc::ClientContext context;
auto status = _stub->SetDataProcessingSettings(&context, settings, &ret);
if (!status.ok()) throw JFJochException(JFJochExceptionCategory::gRPCError,
"JFJochReceiver: " + status.error_message());
}
}
JFJochProtoBuf::ReceiverNetworkConfig JFJochReceiverClient::GetNetworkConfig() {
JFJochProtoBuf::ReceiverNetworkConfig ret;
if (_stub) {
grpc::ClientContext context;
JFJochProtoBuf::Empty empty;
auto status = _stub->GetNetworkConfig(&context, empty, &ret);
if (!status.ok()) throw JFJochException(JFJochExceptionCategory::gRPCError,
"JFJochReceiver: " + status.error_message());
} else {
// For tests to work, dummy receiver needs to replay with nonsense MAC addresses
auto d1 = ret.add_device();
d1->set_mac_addr("00:00:00:00:00:00");
d1->set_ipv4_addr("10.10.50.1");
d1->set_udp_port(1234);
auto d2 = ret.add_device();
d2->set_mac_addr("00:00:00:00:00:01");
d2->set_ipv4_addr("10.10.50.2");
d2->set_udp_port(1234);
}
return ret;
}
JFJochProtoBuf::Plot JFJochReceiverClient::GetPlots(const JFJochProtoBuf::PlotRequest &request) {
JFJochProtoBuf::Plot ret;
if (_stub) {
grpc::ClientContext context;
JFJochProtoBuf::Empty empty;
auto status = _stub->GetDataProcessingPlots(&context, request, &ret);
if (!status.ok()) throw JFJochException(JFJochExceptionCategory::gRPCError,
"JFJochReceiver: " + status.error_message());
} else {
// TODO: Write some dummy plots
}
return ret;
}
JFJochProtoBuf::RadialIntegrationProfiles JFJochReceiverClient::GetRadialIntegrationProfiles() {
JFJochProtoBuf::RadialIntegrationProfiles ret;
if (_stub) {
grpc::ClientContext context;
JFJochProtoBuf::Empty empty;
auto status = _stub->GetRadialIntegrationProfiles(&context, empty, &ret);
if (!status.ok()) throw JFJochException(JFJochExceptionCategory::gRPCError,
"JFJochReceiver: " + status.error_message());
} else {
auto p = ret.add_profiles();
p->set_title("dataset");
*p->mutable_plot() = GenerateGaussianPlot(50, 0.1, 2.5, 0.2);
p = ret.add_profiles();
p->set_title("file0");
*p->mutable_plot() = GenerateGaussianPlot(50, 0.1, 3.0, 0.2);
p = ret.add_profiles();
p->set_title("file1");
*p->mutable_plot() = GenerateGaussianPlot(50, 0.1, 2.0, 0.2);
p = ret.add_profiles();
p->set_title("file2");
*p->mutable_plot() = GenerateGaussianPlot(50, 0.1, 2.5, 0.1);
p = ret.add_profiles();
p->set_title("file3");
*p->mutable_plot() = GenerateGaussianPlot(50, 0.1, 2.5, 0.5);
for (int i = 4; i < 16; i++) {
p = ret.add_profiles();
p->set_title("file" + std::to_string(i));
*p->mutable_plot() = GenerateGaussianPlot(50, 0.1, 2.3, 0.1 + 0.02 * i);
}
}
return ret;
}
JFJochProtoBuf::Plot JFJochReceiverClient::GenerateGaussianPlot(uint64_t n_elements, float spacing, float mean, float std) {
std::vector<float> x(n_elements);
std::vector<float> y(n_elements);
constexpr float inv_sqrt_2pi = 0.3989422804;
for (int i = 0; i < n_elements; i++) {
x[i] = spacing * i;
float a = (x[i] - mean) / std;
y[i] = inv_sqrt_2pi / std * expf(-0.5f * a * a);;
}
JFJochProtoBuf::Plot ret;
if (n_elements > 0) {
*ret.mutable_x() = {x.begin(), x.end()};
*ret.mutable_y() = {y.begin(), y.end()};
}
return ret;
}

View File

@@ -1,30 +0,0 @@
// Copyright (2019-2023) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_JFJOCHRECEIVERCLIENT_H
#define JUNGFRAUJOCH_JFJOCHRECEIVERCLIENT_H
#include <string>
#include <jfjoch.grpc.pb.h>
#include "../common/DiffractionExperiment.h"
#include "../jungfrau/JFCalibration.h"
class JFJochReceiverClient {
std::unique_ptr<JFJochProtoBuf::gRPC_JFJochReceiver::Stub> _stub;
static JFJochProtoBuf::Plot GenerateGaussianPlot(uint64_t n_elements, float spacing, float max, float std);
public:
void Connect(const std::string& addr);
void Start(const DiffractionExperiment &experiment, const JFCalibration *calibration);
void Abort();
void Cancel();
void SetDataProcessingSettings(const JFJochProtoBuf::DataProcessingSettings& settings);
JFJochProtoBuf::ReceiverOutput Stop();
JFJochProtoBuf::ReceiverStatus GetStatus();
JFJochProtoBuf::ReceiverNetworkConfig GetNetworkConfig();
JFJochProtoBuf::Plot GetPlots(const JFJochProtoBuf::PlotRequest& request);
JFJochProtoBuf::RadialIntegrationProfiles GetRadialIntegrationProfiles();
};
#endif //JUNGFRAUJOCH_JFJOCHRECEIVERCLIENT_H

View File

@@ -208,46 +208,6 @@ message JFCalibrationStatistics {
}
// Receiver
message ReceiverInput {
JungfraujochSettings jungfraujoch_settings = 1;
JFCalibration calibration = 2;
}
message ReceiverOutput {
uint64 max_receive_delay = 2;
uint64 compressed_size = 3;
float compressed_ratio = 4;
uint64 images_sent = 5;
uint64 start_time_ms = 6;
uint64 end_time_ms = 7;
float efficiency = 8;
uint64 max_image_number_sent = 9;
bool cancelled = 10;
string master_file_name = 11;
repeated JFPedestal pedestal_result = 12;
float indexing_rate = 13;
float bkg_estimate = 14;
repeated uint64 received_packets = 15;
repeated uint64 expected_packets = 16;
}
message ReceiverNetworkConfigDevice {
string mac_addr = 1;
string ipv4_addr = 2;
uint64 udp_port = 3;
}
message ReceiverNetworkConfig {
repeated ReceiverNetworkConfigDevice device = 1;
}
message ReceiverStatus {
float progress = 1;
float indexing_rate = 3;
float send_buffers_avail = 4;
}
enum PlotType {
BKG_ESTIMATE = 0;
RAD_INT = 1;
@@ -420,18 +380,6 @@ message DetectorSelection {
int64 id = 1;
}
service gRPC_JFJochReceiver {
rpc Start (ReceiverInput) returns (Empty) {}
rpc Abort (Empty) returns (Empty) {}
rpc Cancel (Empty) returns (Empty) {}
rpc Stop (Empty) returns (ReceiverOutput) {}
rpc GetStatus (Empty) returns (ReceiverStatus) {}
rpc SetDataProcessingSettings(DataProcessingSettings) returns (Empty) {}
rpc GetDataProcessingPlots(PlotRequest) returns (Plot) {}
rpc GetRadialIntegrationProfiles(Empty) returns (RadialIntegrationProfiles) {}
rpc GetNetworkConfig (Empty) returns (ReceiverNetworkConfig) {}
}
service gRPC_JFJochWriter {
rpc Start (WriterInput) returns (Empty) {}
rpc Abort (Empty) returns (Empty) {}

File diff suppressed because one or more lines are too long

View File

@@ -5,331 +5,6 @@ import grpc
import jfjoch_pb2 as jfjoch__pb2
class gRPC_JFJochReceiverStub(object):
"""Missing associated documentation comment in .proto file."""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.Start = channel.unary_unary(
'/JFJochProtoBuf.gRPC_JFJochReceiver/Start',
request_serializer=jfjoch__pb2.ReceiverInput.SerializeToString,
response_deserializer=jfjoch__pb2.Empty.FromString,
)
self.Abort = channel.unary_unary(
'/JFJochProtoBuf.gRPC_JFJochReceiver/Abort',
request_serializer=jfjoch__pb2.Empty.SerializeToString,
response_deserializer=jfjoch__pb2.Empty.FromString,
)
self.Cancel = channel.unary_unary(
'/JFJochProtoBuf.gRPC_JFJochReceiver/Cancel',
request_serializer=jfjoch__pb2.Empty.SerializeToString,
response_deserializer=jfjoch__pb2.Empty.FromString,
)
self.Stop = channel.unary_unary(
'/JFJochProtoBuf.gRPC_JFJochReceiver/Stop',
request_serializer=jfjoch__pb2.Empty.SerializeToString,
response_deserializer=jfjoch__pb2.ReceiverOutput.FromString,
)
self.GetStatus = channel.unary_unary(
'/JFJochProtoBuf.gRPC_JFJochReceiver/GetStatus',
request_serializer=jfjoch__pb2.Empty.SerializeToString,
response_deserializer=jfjoch__pb2.ReceiverStatus.FromString,
)
self.SetDataProcessingSettings = channel.unary_unary(
'/JFJochProtoBuf.gRPC_JFJochReceiver/SetDataProcessingSettings',
request_serializer=jfjoch__pb2.DataProcessingSettings.SerializeToString,
response_deserializer=jfjoch__pb2.Empty.FromString,
)
self.GetDataProcessingPlots = channel.unary_unary(
'/JFJochProtoBuf.gRPC_JFJochReceiver/GetDataProcessingPlots',
request_serializer=jfjoch__pb2.PlotRequest.SerializeToString,
response_deserializer=jfjoch__pb2.Plot.FromString,
)
self.GetRadialIntegrationProfiles = channel.unary_unary(
'/JFJochProtoBuf.gRPC_JFJochReceiver/GetRadialIntegrationProfiles',
request_serializer=jfjoch__pb2.Empty.SerializeToString,
response_deserializer=jfjoch__pb2.RadialIntegrationProfiles.FromString,
)
self.GetNetworkConfig = channel.unary_unary(
'/JFJochProtoBuf.gRPC_JFJochReceiver/GetNetworkConfig',
request_serializer=jfjoch__pb2.Empty.SerializeToString,
response_deserializer=jfjoch__pb2.ReceiverNetworkConfig.FromString,
)
class gRPC_JFJochReceiverServicer(object):
"""Missing associated documentation comment in .proto file."""
def Start(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Abort(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Cancel(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Stop(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def GetStatus(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def SetDataProcessingSettings(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def GetDataProcessingPlots(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def GetRadialIntegrationProfiles(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def GetNetworkConfig(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_gRPC_JFJochReceiverServicer_to_server(servicer, server):
rpc_method_handlers = {
'Start': grpc.unary_unary_rpc_method_handler(
servicer.Start,
request_deserializer=jfjoch__pb2.ReceiverInput.FromString,
response_serializer=jfjoch__pb2.Empty.SerializeToString,
),
'Abort': grpc.unary_unary_rpc_method_handler(
servicer.Abort,
request_deserializer=jfjoch__pb2.Empty.FromString,
response_serializer=jfjoch__pb2.Empty.SerializeToString,
),
'Cancel': grpc.unary_unary_rpc_method_handler(
servicer.Cancel,
request_deserializer=jfjoch__pb2.Empty.FromString,
response_serializer=jfjoch__pb2.Empty.SerializeToString,
),
'Stop': grpc.unary_unary_rpc_method_handler(
servicer.Stop,
request_deserializer=jfjoch__pb2.Empty.FromString,
response_serializer=jfjoch__pb2.ReceiverOutput.SerializeToString,
),
'GetStatus': grpc.unary_unary_rpc_method_handler(
servicer.GetStatus,
request_deserializer=jfjoch__pb2.Empty.FromString,
response_serializer=jfjoch__pb2.ReceiverStatus.SerializeToString,
),
'SetDataProcessingSettings': grpc.unary_unary_rpc_method_handler(
servicer.SetDataProcessingSettings,
request_deserializer=jfjoch__pb2.DataProcessingSettings.FromString,
response_serializer=jfjoch__pb2.Empty.SerializeToString,
),
'GetDataProcessingPlots': grpc.unary_unary_rpc_method_handler(
servicer.GetDataProcessingPlots,
request_deserializer=jfjoch__pb2.PlotRequest.FromString,
response_serializer=jfjoch__pb2.Plot.SerializeToString,
),
'GetRadialIntegrationProfiles': grpc.unary_unary_rpc_method_handler(
servicer.GetRadialIntegrationProfiles,
request_deserializer=jfjoch__pb2.Empty.FromString,
response_serializer=jfjoch__pb2.RadialIntegrationProfiles.SerializeToString,
),
'GetNetworkConfig': grpc.unary_unary_rpc_method_handler(
servicer.GetNetworkConfig,
request_deserializer=jfjoch__pb2.Empty.FromString,
response_serializer=jfjoch__pb2.ReceiverNetworkConfig.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'JFJochProtoBuf.gRPC_JFJochReceiver', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class gRPC_JFJochReceiver(object):
"""Missing associated documentation comment in .proto file."""
@staticmethod
def Start(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochReceiver/Start',
jfjoch__pb2.ReceiverInput.SerializeToString,
jfjoch__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def Abort(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochReceiver/Abort',
jfjoch__pb2.Empty.SerializeToString,
jfjoch__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def Cancel(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochReceiver/Cancel',
jfjoch__pb2.Empty.SerializeToString,
jfjoch__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def Stop(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochReceiver/Stop',
jfjoch__pb2.Empty.SerializeToString,
jfjoch__pb2.ReceiverOutput.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def GetStatus(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochReceiver/GetStatus',
jfjoch__pb2.Empty.SerializeToString,
jfjoch__pb2.ReceiverStatus.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def SetDataProcessingSettings(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochReceiver/SetDataProcessingSettings',
jfjoch__pb2.DataProcessingSettings.SerializeToString,
jfjoch__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def GetDataProcessingPlots(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochReceiver/GetDataProcessingPlots',
jfjoch__pb2.PlotRequest.SerializeToString,
jfjoch__pb2.Plot.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def GetRadialIntegrationProfiles(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochReceiver/GetRadialIntegrationProfiles',
jfjoch__pb2.Empty.SerializeToString,
jfjoch__pb2.RadialIntegrationProfiles.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def GetNetworkConfig(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/JFJochProtoBuf.gRPC_JFJochReceiver/GetNetworkConfig',
jfjoch__pb2.Empty.SerializeToString,
jfjoch__pb2.ReceiverNetworkConfig.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
class gRPC_JFJochWriterStub(object):
"""Missing associated documentation comment in .proto file."""

View File

@@ -5,10 +5,6 @@ ADD_LIBRARY(JFJochReceiver STATIC
TARGET_LINK_LIBRARIES(JFJochReceiver ImageAnalysis JungfraujochAcqusitionDevice CommonFunctions HLSSimulation)
ADD_EXECUTABLE(jfjoch_receiver jfjoch_receiver.cpp)
TARGET_LINK_LIBRARIES(jfjoch_receiver JFJochReceiver)
INSTALL(TARGETS jfjoch_receiver RUNTIME)
ADD_EXECUTABLE(jfjoch_action_test jfjoch_action_test.cpp)
TARGET_LINK_LIBRARIES(jfjoch_action_test JungfraujochHost JFJochReceiver)
INSTALL(TARGETS jfjoch_action_test RUNTIME)

View File

@@ -2,71 +2,12 @@
#include "JFJochReceiverService.h"
inline PlotRequest Convert(const JFJochProtoBuf::PlotRequest& request) {
PlotRequest ret;
ret.binning = request.binning();
switch (request.type()) {
case JFJochProtoBuf::BKG_ESTIMATE:
ret.type = PlotType::BkgEstimate;
break;
case JFJochProtoBuf::RAD_INT:
ret.type = PlotType::RadInt;
break;
case JFJochProtoBuf::SPOT_COUNT:
ret.type = PlotType::SpotCount;
break;
case JFJochProtoBuf::INDEXING_RATE:
ret.type = PlotType::IndexingRate;
break;
case JFJochProtoBuf::INDEXING_RATE_PER_FILE:
ret.type = PlotType::IndexingRatePerFile;
break;
default:
case JFJochProtoBuf::ADU_HISTOGRAM:
ret.type = PlotType::ADUHistorgram;
break;
}
return ret;
}
inline void Convert(const Plot& input, JFJochProtoBuf::Plot &output) {
if (!input.x.empty())
*output.mutable_x() = {input.x.begin(), input.x.end()};
if (!input.y.empty())
*output.mutable_y() = {input.y.begin(), input.y.end()};
}
inline DataProcessingSettings Convert(const JFJochProtoBuf::DataProcessingSettings &input) {
DataProcessingSettings ret;
ret.signal_to_noise_threshold = input.signal_to_noise_threshold();
ret.photon_count_threshold = input.photon_count_threshold();
ret.min_pix_per_spot = input.min_pix_per_spot();
ret.max_pix_per_spot = input.max_pix_per_spot();
ret.local_bkg_size = input.local_bkg_size();
ret.high_resolution_limit = input.high_resolution_limit();
ret.low_resolution_limit = input.low_resolution_limit();
ret.bkg_estimate_low_q = input.bkg_estimate_low_q();
ret.bkg_estimate_high_q = input.bkg_estimate_high_q();
ret.preview_indexed_only = input.preview_indexed_only();
return ret;
}
void Convert(const RadialIntegrationProfiles& input, JFJochProtoBuf::RadialIntegrationProfiles& output) {
for (const auto &i: input.profiles) {
auto tmp = output.add_profiles();
tmp->set_title(i.title);
Convert(i.plot, *tmp->mutable_plot());
}
}
JFJochReceiverService::JFJochReceiverService(AcquisitionDeviceGroup &in_aq_devices,
Logger &in_logger, ImagePusher &pusher) :
logger(in_logger), aq_devices(in_aq_devices),
image_pusher(pusher), data_processing_settings(DiffractionExperiment::DefaultDataProcessingSettings()) {
}
JFJochReceiverService& JFJochReceiverService::NumThreads(int64_t input) {
if (input <= 0)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Thread number must be above zero");
@@ -96,23 +37,6 @@ JFJochReceiverService &JFJochReceiverService::NUMAPolicy(const std::string &poli
return *this;
}
grpc::Status JFJochReceiverService::Start(grpc::ServerContext *context, const JFJochProtoBuf::ReceiverInput *request,
JFJochProtoBuf::Empty *response) {
experiment = std::make_unique<DiffractionExperiment>(request->jungfraujoch_settings());
if (request->has_calibration())
calibration = std::make_unique<JFCalibration>(request->calibration());
else
calibration.reset();
try {
Start(*experiment, calibration.get());
return grpc::Status::OK;
} catch (const JFJochException &e) {
logger.ErrorException(e);
return {grpc::StatusCode::ABORTED, e.what()};
}
}
void JFJochReceiverService::FinalizeMeasurement() {
receiver->StopReceiver();
{
@@ -122,55 +46,6 @@ void JFJochReceiverService::FinalizeMeasurement() {
}
}
grpc::Status JFJochReceiverService::Stop(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::ReceiverOutput *response) {
try {
auto output = Stop();
response->set_max_receive_delay(output.max_receive_delay);
response->set_compressed_size(output.compressed_size);
response->set_compressed_ratio(output.compressed_ratio);
response->set_images_sent(output.images_sent);
response->set_start_time_ms(output.start_time_ms);
response->set_end_time_ms(output.end_time_ms);
response->set_efficiency(output.efficiency);
response->set_max_image_number_sent(output.max_image_number_sent);
response->set_cancelled(output.cancelled);
response->set_master_file_name(output.master_file_name);
if (!output.pedestal_result.empty())
*response->mutable_pedestal_result() = {output.pedestal_result.begin(), output.pedestal_result.end()};
response->set_indexing_rate(output.indexing_rate);
response->set_bkg_estimate(output.bkg_estimate);
for (const auto &i: output.received_packets)
response->add_received_packets(i);
for (const auto &i: output.expected_packets)
response->add_expected_packets(i);
return grpc::Status::OK;
} catch (JFJochException &e) {
logger.ErrorException(e);
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochReceiverService::Cancel(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::Empty *response) {
try {
Cancel();
return grpc::Status::OK;
} catch (std::exception &e) {
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochReceiverService::Abort(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::Empty *response) {
try {
Abort();
return grpc::Status::OK;
} catch (std::exception &e) {
return {grpc::StatusCode::ABORTED, e.what()};
}
}
JFJochReceiverStatus JFJochReceiverService::GetStatus() {
// Need to hold mutex, as receiver might not exist here, if state is idle
std::unique_lock<std::mutex> ul(state_mutex);
@@ -185,63 +60,6 @@ JFJochReceiverStatus JFJochReceiverService::GetStatus() {
};
}
grpc::Status JFJochReceiverService::GetStatus(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::ReceiverStatus *response) {
auto tmp = GetStatus();
response->set_send_buffers_avail(tmp.send_buffers_avail);
response->set_indexing_rate(tmp.indexing_rate);
response->set_progress(tmp.progress);
return grpc::Status::OK;
}
grpc::Status JFJochReceiverService::GetDataProcessingPlots(grpc::ServerContext *context,
const JFJochProtoBuf::PlotRequest *request,
JFJochProtoBuf::Plot *response) {
// Need to hold mutex, as receiver might not exist here, if state is idle
try {
Convert(GetDataProcessingPlot(Convert(*request)), *response);
return grpc::Status::OK;
} catch (std::exception &e) {
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochReceiverService::GetRadialIntegrationProfiles(grpc::ServerContext *context,
const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::RadialIntegrationProfiles *response) {
// Need to hold mutex, as receiver might not exist here, if state is idle
try {
Convert(GetRadialIntegrationProfiles(), *response);
return grpc::Status::OK;
} catch (std::exception &e) {
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochReceiverService::SetDataProcessingSettings(grpc::ServerContext *context,
const JFJochProtoBuf::DataProcessingSettings *request,
JFJochProtoBuf::Empty *response) {
try {
SetDataProcessingSettings(Convert(*request));
return grpc::Status::OK;
} catch (std::exception &e) {
return {grpc::StatusCode::ABORTED, e.what()};
}
}
grpc::Status JFJochReceiverService::GetNetworkConfig(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::ReceiverNetworkConfig *response) {
auto v = GetNetworkConfig();
for (const auto &i: v) {
auto dev_net_cfg = response->add_device();
dev_net_cfg->set_mac_addr(i.mac_addr);
dev_net_cfg->set_ipv4_addr(i.ipv4_addr);
dev_net_cfg->set_udp_port(i.udp_port);
}
return grpc::Status::OK;
}
void JFJochReceiverService::Start(const DiffractionExperiment &experiment, const JFCalibration *calibration) {
std::unique_lock<std::mutex> ul_state(state_mutex);
if (state != ReceiverState::Idle)

View File

@@ -3,19 +3,15 @@
#ifndef JUNGFRAUJOCH_JFJOCHRECEIVERSERVICE_H
#define JUNGFRAUJOCH_JFJOCHRECEIVERSERVICE_H
#include "JFJochReceiver.h"
#include "jfjoch.grpc.pb.h"
#include <mutex>
#include "JFJochReceiver.h"
#include "../common/NUMAHWPolicy.h"
class JFJochReceiverService final : public JFJochProtoBuf::gRPC_JFJochReceiver::Service {
class JFJochReceiverService {
NUMAHWPolicy numa_policy;
std::unique_ptr<JFJochReceiver> receiver;
AcquisitionDeviceGroup &aq_devices;
std::unique_ptr<JFCalibration> calibration;
std::unique_ptr<DiffractionExperiment> experiment;
Logger &logger;
ImagePusher &image_pusher;
@@ -49,25 +45,6 @@ public:
RadialIntegrationProfiles GetRadialIntegrationProfiles();
JFJochReceiverStatus GetStatus();
std::vector<AcquisitionDeviceNetConfig> GetNetworkConfig();
grpc::Status Start(grpc::ServerContext* context, const JFJochProtoBuf::ReceiverInput* request,
JFJochProtoBuf::Empty* response) override;
grpc::Status Abort(grpc::ServerContext* context, const JFJochProtoBuf::Empty* request,
JFJochProtoBuf::Empty* response) override;
grpc::Status Cancel(grpc::ServerContext* context, const JFJochProtoBuf::Empty* request,
JFJochProtoBuf::Empty* response) override;
grpc::Status Stop(grpc::ServerContext* context, const JFJochProtoBuf::Empty* request,
JFJochProtoBuf::ReceiverOutput* response) override;
grpc::Status GetStatus(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::ReceiverStatus *response) override;
grpc::Status SetDataProcessingSettings(grpc::ServerContext *context, const JFJochProtoBuf::DataProcessingSettings *request,
JFJochProtoBuf::Empty *response) override;
grpc::Status GetNetworkConfig(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::ReceiverNetworkConfig *response) override;
grpc::Status GetDataProcessingPlots(grpc::ServerContext *context, const JFJochProtoBuf::PlotRequest *request,
JFJochProtoBuf::Plot *response) override;
grpc::Status GetRadialIntegrationProfiles(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request,
JFJochProtoBuf::RadialIntegrationProfiles *response) override;
};

View File

@@ -27,7 +27,6 @@ JFJochReceiverOutput RunJFJochReceiverTest(AcquisitionDeviceGroup &aq_devices, I
settings.local_bkg_size = 5;
service.SetDataProcessingSettings(settings);
grpc::ServerContext grpc_context;
service.Start(x, &calib);
if (x.GetImageNum() > 0) {

View File

@@ -11,7 +11,7 @@ ADD_EXECUTABLE(CatchTest
FrameTransformationTest.cpp
HDF5WritingTest.cpp PedestalCalcTest.cpp
ZMQImagePusherTest.cpp StreamWriterTest.cpp
CoordTest.cpp JFJochBrokerTest.cpp gRPCServerTest.cpp
CoordTest.cpp JFJochBrokerTest.cpp
JFJochReceiverIntegrationTest.cpp
AcquisitionCountersTest.cpp
JFJochFullIntegrationTest.cpp

View File

@@ -158,17 +158,6 @@ TEST_CASE("DiffractionExperiment_UnitCell","[DiffractionExperiment]") {
REQUIRE_NOTHROW(x.SetUnitCell());
REQUIRE(!x.HasUnitCell());
REQUIRE_NOTHROW(x.SetUnitCell(cell));
JFJochProtoBuf::JungfraujochSettings settings = x;
DiffractionExperiment y(settings);
REQUIRE(y.GetUnitCell().a == x.GetUnitCell().a);
REQUIRE(y.GetUnitCell().b == x.GetUnitCell().b);
REQUIRE(y.GetUnitCell().c == x.GetUnitCell().c);
REQUIRE(y.GetUnitCell().alpha == x.GetUnitCell().alpha);
REQUIRE(y.GetUnitCell().beta == x.GetUnitCell().beta);
REQUIRE(y.GetUnitCell().gamma == x.GetUnitCell().gamma);
}
TEST_CASE("DiffractionExperiment_IPv4Address","[DiffractionExperiment]") {
@@ -453,47 +442,6 @@ TEST_CASE("DiffractionExperiment_FrameCountTime","[DiffractionExperiment]") {
REQUIRE(x.GetImageCountTime() == std::chrono::microseconds(7*567));
} */
TEST_CASE("DiffractionExperiment_ExportProtobuf","[DiffractionExperiment]") {
DiffractionExperiment x(DetectorGeometry(16, 4, 0, 0, false)),y;
x.DataStreams(3);
std::vector<DetectorMode> v = {DetectorMode::Raw, DetectorMode::Conversion,
DetectorMode::PedestalG0, DetectorMode::PedestalG1, DetectorMode::PedestalG2};
for (auto &i : v) {
x.Mode(i).FilePrefix("z").ImagesPerTrigger(20).NumTriggers(5).PedestalG0Frames(1345)
.PedestalG1Frames(1876).PedestalG2Frames(654)
.PhotonEnergy_keV(16.0).BeamX_pxl(566).BeamY_pxl(1234).DetectorDistance_mm(145)
.FrameTime(std::chrono::microseconds(765), std::chrono::microseconds(10))
.PedestalG1G2FrameTime(std::chrono::milliseconds(10))
.IPv4BaseAddr("2.2.2.2").MaskModuleEdges(true);
JFJochProtoBuf::JungfraujochSettings settings_in_protobuf = x;
REQUIRE_NOTHROW(y.Import(settings_in_protobuf));
REQUIRE(y.GetFilePrefix() == x.GetFilePrefix());
REQUIRE(x.GetDataStreamsNum() == y.GetDataStreamsNum());
REQUIRE(x.GetXPixelsNum() == y.GetXPixelsNum());
REQUIRE(x.GetModulesNum(2) == y.GetModulesNum(2));
REQUIRE(x.GetDetectorMode() == y.GetDetectorMode());
REQUIRE(y.GetPedestalG1Frames() == 1876);
REQUIRE(y.GetPedestalG2Frames() == 654);
REQUIRE(y.GetPhotonEnergy_keV() == 16.0);
REQUIRE(y.GetBeamX_pxl() == 566);
REQUIRE(y.GetBeamY_pxl() == 1234);
REQUIRE(y.GetDetectorDistance_mm() == 145);
REQUIRE(x.GetFrameNum() == y.GetFrameNum());
REQUIRE(x.GetImageTime() == y.GetImageTime());
REQUIRE(y.GetFrameCountTime().count() == x.GetFrameCountTime().count());
REQUIRE(y.GetSrcIPv4Address(0, 0) == 0x02020202);
REQUIRE(y.GetPedestalG0Frames() == x.GetPedestalG0Frames());
REQUIRE(y.GetMaskModuleEdges() == x.GetMaskModuleEdges());
REQUIRE(y.GetMaskChipEdges() == x.GetMaskChipEdges());
}
}
TEST_CASE("DiffractionExperiment_InternalPacketGenerator", "[DiffractionExperiment]") {
DiffractionExperiment x;
@@ -871,17 +819,17 @@ TEST_CASE("DiffractionExperiment_DetectorModuleHostname","[DiffractionExperiment
DiffractionExperiment x(DetectorSetup(3, "X", h));
JFJochProtoBuf::DetectorConfig det_cfg;
JFJochProtoBuf::ReceiverNetworkConfig net_cfg;
std::vector<AcquisitionDeviceNetConfig> net_cfg;
auto d1 = net_cfg.add_device();
d1->set_mac_addr("00:00:00:00:00:00");
d1->set_ipv4_addr("10.10.50.1");
d1->set_udp_port(1234);
net_cfg.push_back(AcquisitionDeviceNetConfig{
.mac_addr = "00:00:00:00:00:00",
.ipv4_addr = "10.10.50.1",
.udp_port = 1234});
auto d2 = net_cfg.add_device();
d2->set_mac_addr("00:00:00:00:00:01");
d2->set_ipv4_addr("10.10.50.2");
d2->set_udp_port(1234);
net_cfg.push_back(AcquisitionDeviceNetConfig{
.mac_addr = "00:00:00:00:00:01",
.ipv4_addr = "10.10.50.2",
.udp_port = 1234});
std::vector<std::string> h_out;
REQUIRE_NOTHROW(x.GetDetectorModuleHostname(h_out));

View File

@@ -22,21 +22,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ", "[JFJochReceiver]") {
int64_t ndatastream = 2;
int64_t nmodules = 4;
JFJochServices services(logger);
JFJochStateMachine state_machine(services, logger);
REQUIRE(!state_machine.GetMeasurementStatistics().has_value());
state_machine.AddDetectorSetup(DetectorGeometry(ndatastream * nmodules, 2, 8, 36));
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test");
logger.Verbose(true);
std::vector<uint16_t> image(RAW_MODULE_SIZE);
AcquisitionDeviceGroup aq_devices;
AcquisitionDeviceGroup aq_devices;
for (int i = 0; i < ndatastream; i++) {
auto test = std::make_unique<MockAcquisitionDevice>(i, 256);
@@ -45,10 +31,23 @@ TEST_CASE("JFJochIntegrationTest_ZMQ", "[JFJochReceiver]") {
ZMQImagePusher pusher(zmq_context, {"inproc://#1"});
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
JFJochServices services(logger);
JFJochStateMachine state_machine(services, logger);
REQUIRE(!state_machine.GetMeasurementStatistics().has_value());
state_machine.AddDetectorSetup(DetectorGeometry(ndatastream * nmodules, 2, 8, 36));
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services.Writer("unix:writer_test", "inproc://#1").Receiver(&fpga_receiver);
logger.Verbose(true);
std::vector<uint16_t> image(RAW_MODULE_SIZE);
JFJochWriterService writer(zmq_context, logger);
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
auto writer_server = gRPCServer("unix:writer_test", writer);
REQUIRE_NOTHROW(state_machine.Initialize());
@@ -96,7 +95,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ", "[JFJochReceiver]") {
REQUIRE(statistics.detector_width() == 2068);
REQUIRE(statistics.detector_height() == 2164);
REQUIRE(statistics.detector_pixel_depth() == 2);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}
@@ -110,34 +108,33 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_save_calibration", "[JFJochReceiver]") {
int64_t nimages = 5;
int64_t ndatastream = 2;
int64_t nmodules = 4;
JFJochServices services(logger);
JFJochStateMachine state_machine(services, logger);
REQUIRE(!state_machine.GetMeasurementStatistics().has_value());
state_machine.AddDetectorSetup(DetectorGeometry(ndatastream * nmodules, 2, 8, 36));
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test");
logger.Verbose(true);
std::vector<uint16_t> image(RAW_MODULE_SIZE);
AcquisitionDeviceGroup aq_devices;
AcquisitionDeviceGroup aq_devices;
for (int i = 0; i < ndatastream; i++) {
auto test = std::make_unique<MockAcquisitionDevice>(i, 256);
aq_devices.Add(std::move(test));
}
ZMQImagePusher pusher(zmq_context, {"inproc://#1"});
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
JFJochServices services(logger);
JFJochStateMachine state_machine(services, logger);
REQUIRE(!state_machine.GetMeasurementStatistics().has_value());
state_machine.AddDetectorSetup(DetectorGeometry(ndatastream * nmodules, 2, 8, 36));
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services.Writer("unix:writer_test", "inproc://#1").Receiver(&fpga_receiver);
logger.Verbose(true);
std::vector<uint16_t> image(RAW_MODULE_SIZE);;
JFJochWriterService writer(zmq_context, logger);
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
auto writer_server = gRPCServer("unix:writer_test", writer);
REQUIRE_NOTHROW(state_machine.Initialize());
@@ -186,7 +183,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_save_calibration", "[JFJochReceiver]") {
REQUIRE(statistics.detector_width() == 2068);
REQUIRE(statistics.detector_height() == 2164);
REQUIRE(statistics.detector_pixel_depth() == 2);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}
@@ -200,6 +196,16 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]")
int64_t ndatastream = 2;
int64_t nmodules = 4;
AcquisitionDeviceGroup aq_devices;
for (int i = 0; i < 4; i++) {
auto test = std::make_unique<MockAcquisitionDevice>(i, 256);
aq_devices.Add(std::move(test));
}
ZMQImagePusher pusher(zmq_context, {"inproc://#1"});
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
JFJochServices services(logger);
JFJochStateMachine state_machine(services, logger);
@@ -209,25 +215,14 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]")
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test");
services.Writer("unix:writer_test", "inproc://#1").Receiver(&fpga_receiver);
logger.Verbose(true);
std::vector<uint16_t> image(RAW_MODULE_SIZE);
AcquisitionDeviceGroup aq_devices;
for (int i = 0; i < 4; i++) {
auto test = std::make_unique<MockAcquisitionDevice>(i, 256);
aq_devices.Add(std::move(test));
}
ZMQImagePusher pusher(zmq_context, {"inproc://#1"});
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
JFJochWriterService writer(zmq_context, logger);
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
auto writer_server = gRPCServer("unix:writer_test", writer);
REQUIRE_NOTHROW(state_machine.Initialize());
@@ -276,7 +271,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]")
REQUIRE(statistics.detector_height() == 2164);
REQUIRE(statistics.detector_pixel_depth() == 2);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}
@@ -298,7 +292,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") {
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().Mode(DetectorMode::Conversion);
services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test");
services.Writer("unix:writer_test", "inproc://#1");
logger.Verbose(true);
@@ -313,9 +307,10 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") {
ZMQImagePusher pusher(zmq_context, {"inproc://#1"});
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer(zmq_context, logger);
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
auto writer_server = gRPCServer("unix:writer_test", writer);
JFJochProtoBuf::DetectorSettings detector_settings;
@@ -367,7 +362,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") {
REQUIRE(statistics.detector_height() == 8 * 512);
REQUIRE(statistics.detector_pixel_depth() == 2);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}
@@ -391,8 +386,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") {
services
.Writer("unix:writer_test_0", "inproc://#0")
.Writer("unix:writer_test_1", "inproc://#1")
.Writer("unix:writer_test_2", "inproc://#2")
.Receiver("unix:fpga_receiver_test");
.Writer("unix:writer_test_2", "inproc://#2");
logger.Verbose(true);
@@ -407,12 +401,12 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") {
ZMQImagePusher pusher(zmq_context, {"inproc://#0", "inproc://#1", "inproc://#2"});
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer_0(zmq_context, logger);
JFJochWriterService writer_1(zmq_context, logger);
JFJochWriterService writer_2(zmq_context, logger);
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
auto writer_server_0 = gRPCServer("unix:writer_test_0", writer_0);
auto writer_server_1 = gRPCServer("unix:writer_test_1", writer_1);
auto writer_server_2 = gRPCServer("unix:writer_test_2", writer_2);
@@ -457,7 +451,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == nimages);
fpga_receiver_server->Shutdown();
writer_server_0->Shutdown();
writer_server_1->Shutdown();
writer_server_2->Shutdown();
@@ -481,7 +475,7 @@ TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") {
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test");
services.Writer("unix:writer_test", "inproc://#1");
logger.Verbose(true);
@@ -501,9 +495,10 @@ TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") {
ZMQImagePusher pusher(zmq_context, {"inproc://#1"});
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer(zmq_context, logger);
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
auto writer_server = gRPCServer("unix:writer_test", writer);
REQUIRE_NOTHROW(state_machine.Initialize());
@@ -534,7 +529,7 @@ TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") {
REQUIRE(statistics.max_image_number_sent() == 4);
REQUIRE(statistics.cancelled());
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}
@@ -554,7 +549,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0).PreviewPeriod(
5ms);
services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test");
services.Writer("unix:writer_test", "inproc://#1");
logger.Verbose(true);
@@ -576,6 +571,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
ZMQImagePusher pusher(zmq_context, {"inproc://#1"});
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer(zmq_context, logger);
ZMQPreviewPublisher preview(zmq_context, "inproc://#2");
@@ -585,7 +582,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
REQUIRE_NOTHROW(rcv_preview_socket.Connect("inproc://#2"));
rcv_preview_socket.SubscribeAll();
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
auto writer_server = gRPCServer("unix:writer_test", writer);
REQUIRE_NOTHROW(state_machine.Initialize());
@@ -637,7 +633,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == 5);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}
@@ -657,7 +653,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]"
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0).PreviewPeriod(
5ms);
services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test");
services.Writer("unix:writer_test", "inproc://#1");
logger.Verbose(true);
@@ -678,6 +674,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]"
ZMQImagePusher pusher(zmq_context, {"inproc://#1"});
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer(zmq_context, logger);
ZMQPreviewPublisher preview(zmq_context, "inproc://#2");
@@ -687,7 +685,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]"
REQUIRE_NOTHROW(rcv_preview_socket.Connect("inproc://#2"));
rcv_preview_socket.SubscribeAll();
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
auto writer_server = gRPCServer("unix:writer_test", writer);
REQUIRE_NOTHROW(state_machine.Initialize());
@@ -737,7 +734,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]"
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == 5);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}
@@ -756,7 +753,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot", "[JFJochReceiver]") {
state_machine.AddDetectorSetup(DetectorGeometry(ndatastream * nmodules, 2, 8, 36));
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test");
services.Writer("unix:writer_test", "inproc://#1");
logger.Verbose(true);
@@ -797,9 +794,10 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot", "[JFJochReceiver]") {
ZMQImagePusher pusher(zmq_context, {"inproc://#1"});
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
services.Receiver(&fpga_receiver);
JFJochWriterService writer(zmq_context, logger);
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
auto writer_server = gRPCServer("unix:writer_test", writer);
REQUIRE_NOTHROW(state_machine.Initialize());
@@ -844,8 +842,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == 1);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}
/*
@@ -865,7 +862,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index", "[JFJochReceiver]
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0)
.SpotFindingPeriod(10ms);
services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test");
services.Writer("unix:writer_test", "inproc://#1").Receiver(&fpga_receiver);
logger.Verbose(true);
@@ -913,7 +910,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index", "[JFJochReceiver]
JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher);
JFJochWriterService writer(zmq_context, logger);;
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
auto writer_server = gRPCServer("unix:writer_test", writer);
REQUIRE_NOTHROW(state_machine.Initialize());
@@ -961,7 +957,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index", "[JFJochReceiver]
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == nimages);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}*/
@@ -983,7 +979,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_rad_int", "[JFJochReceiver]") {
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
state_machine.NotThreadSafe_Experiment().LowQForRadialInt_recipA(0.5).HighQForRadialInt_recipA(3.5)
.QSpacingForRadialInt_recipA(1.0);
services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test");
services.Writer("unix:writer_test", "inproc://#1");
logger.Verbose(true);
@@ -1029,9 +1025,9 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_rad_int", "[JFJochReceiver]") {
ZMQImagePusher pusher(zmq_context, {"inproc://#1"});
JFJochReceiverService fpga_receiver(aq_devices, logger, pusher);
JFJochWriterService writer(zmq_context, logger);;
services.Receiver(&fpga_receiver);
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
JFJochWriterService writer(zmq_context, logger);
auto writer_server = gRPCServer("unix:writer_test", writer);
REQUIRE_NOTHROW(state_machine.Initialize());
@@ -1074,10 +1070,10 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_rad_int", "[JFJochReceiver]") {
CHECK(plot_map[0].title() == "dataset");
CHECK(plot_map[0].plot().x_size() == 3);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}
/*
TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index_sum", "[JFJochReceiver]") {
Logger logger("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index_sum");
@@ -1095,7 +1091,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index_sum", "[JFJochRecei
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0)
.SpotFindingPeriod(10ms);
services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test");
services.Writer("unix:writer_test", "inproc://#1").Receiver(&fpga_receiver);
logger.Verbose(true);
@@ -1145,7 +1141,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index_sum", "[JFJochRecei
JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher);
JFJochWriterService writer(zmq_context, logger);
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
auto writer_server = gRPCServer("unix:writer_test", writer);
REQUIRE_NOTHROW(state_machine.Initialize());
@@ -1194,7 +1189,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index_sum", "[JFJochRecei
REQUIRE(statistics.has_indexing_rate());
REQUIRE(statistics.indexing_rate() == 1.0);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}*/
//#endif

View File

@@ -24,9 +24,6 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
x.FilePrefix("subdir/JFJochWriterTest").NumTriggers(1).ImagesPerTrigger(5)
.UseInternalPacketGenerator(true).Mode(DetectorMode::Raw).PedestalG0Frames(0);
JFJochProtoBuf::ReceiverInput receiver_input;
*receiver_input.mutable_jungfraujoch_settings() = x;
JFModuleGainCalibration gain;
AcquisitionDeviceGroup aq_devices;
for (int i = 0; i < x.GetDataStreamsNum(); i++)
@@ -34,19 +31,19 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
ZMQImagePusher pusher (context, {zmq_addr});
JFJochReceiverService fpga_receiver_service(aq_devices, logger, pusher);
;
JFJochProtoBuf::ReceiverOutput receiver_output;
JFJochReceiverOutput receiver_output;
std::unique_ptr<StreamWriter> writer;
REQUIRE(x.GetImageNum() == 5);
REQUIRE_NOTHROW(writer = std::make_unique<StreamWriter>(context, logger, zmq_addr));
REQUIRE(fpga_receiver_service.Start(&grpc_context, &receiver_input, &empty).ok());
REQUIRE_NOTHROW(fpga_receiver_service.Start(x, nullptr));
REQUIRE_NOTHROW(writer->Run());
REQUIRE(fpga_receiver_service.Stop(&grpc_context, &empty, &receiver_output).ok());
CHECK(receiver_output.images_sent() == 5);
REQUIRE_NOTHROW(receiver_output = fpga_receiver_service.Stop());
CHECK(receiver_output.images_sent == 5);
// HDF5 file can be opened
std::unique_ptr<HDF5ReadOnlyFile> file;
@@ -84,9 +81,6 @@ TEST_CASE("JFJochWriterServiceTest_ZMQ","[JFJochWriter]") {
.UseInternalPacketGenerator(true)
.Mode(DetectorMode::Raw).PedestalG0Frames(0);
JFJochProtoBuf::ReceiverInput receiver_input;
*receiver_input.mutable_jungfraujoch_settings() = x;
JFModuleGainCalibration empty_gain;
AcquisitionDeviceGroup aq_devices;
@@ -99,14 +93,14 @@ TEST_CASE("JFJochWriterServiceTest_ZMQ","[JFJochWriter]") {
JFJochProtoBuf::WriterInput writer_input;
writer_input.set_zmq_receiver_address(zmq_addr);
JFJochProtoBuf::ReceiverOutput receiver_output;
JFJochReceiverOutput receiver_output;
JFJochProtoBuf::WriterOutput writer_output;
REQUIRE(x.GetImageNum() == 5);
REQUIRE(writer.Start(&grpc_context, &writer_input, &empty).ok());
REQUIRE(fpga_receiver_service.Start(&grpc_context, &receiver_input, &empty).ok());
REQUIRE(fpga_receiver_service.Stop(&grpc_context, &empty, &receiver_output).ok());
REQUIRE_NOTHROW(fpga_receiver_service.Start(x, nullptr));
REQUIRE_NOTHROW(receiver_output = fpga_receiver_service.Stop());
REQUIRE(writer.Stop(&grpc_context, &empty, &writer_output).ok());
REQUIRE(writer_output.nimages() == 5);

View File

@@ -1,34 +0,0 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include <catch2/catch.hpp>
#include "../grpc/gRPCServer_Template.h"
#include "../common/Logger.h"
#include "../receiver/JFJochReceiverService.h"
#include "../grpc/JFJochReceiverClient.h"
#include "../acquisition_device/HLSSimulatedDevice.h"
#include <grpcpp/grpcpp.h>
#include "../../common/ZMQImagePusher.h"
TEST_CASE("JFJochReceiver_gRPC_server", "[gRPC]") {
DiffractionExperiment x(DetectorGeometry(4, 2));
AcquisitionDeviceGroup aq_devices;
aq_devices.AddHLSDevice(64);
ZMQContext zmq_context;
Logger logger("receiver");
ZMQImagePusher pusher(zmq_context, {"inproc://1"});
JFJochReceiverService service(aq_devices, logger, pusher);
auto server = gRPCServer("unix:receiver_test", service);
{
JFJochReceiverClient client;
REQUIRE_NOTHROW(client.Connect("unix:receiver_test"));
}
server->Shutdown();
}