JFJochServices: ProtoBuf JFJochFullBrokerStatus becomes C++ struct + remove writer stats

This commit is contained in:
2023-11-13 14:26:14 +01:00
parent 13d22493c1
commit 3725ec5a73
7 changed files with 60 additions and 110 deletions
+12 -19
View File
@@ -3,11 +3,6 @@
#include "JFJochServices.h"
#include "../common/JFJochException.h"
uint64_t current_time_ms() {
auto curr_time = std::chrono::system_clock::now();
return std::chrono::duration_cast<std::chrono::milliseconds>(curr_time.time_since_epoch()).count();
}
JFJochServices::JFJochServices(Logger &in_logger) : logger(in_logger) {}
void JFJochServices::Start(const DiffractionExperiment& experiment, const JFCalibration &calibration) {
@@ -46,24 +41,24 @@ void JFJochServices::On(const DiffractionExperiment &x) {
logger.Info(" ... done");
}
JFJochProtoBuf::BrokerFullStatus JFJochServices::Stop(const JFCalibration &calibration) {
JFJochProtoBuf::BrokerFullStatus ret;
JFJochServicesOutput JFJochServices::Stop(const JFCalibration &calibration) {
JFJochServicesOutput ret;
std::unique_ptr<JFJochException> exception;
try {
logger.Info("Wait for receiver done");
*ret.mutable_receiver() = receiver.Stop();
ret.receiver_output = receiver.Stop();
logger.Info(" ... Receiver efficiency: {} % Max delay: {} Compression ratio {}x",
static_cast<int>(ret.receiver().efficiency()*100.0),
ret.receiver().max_receive_delay(),
static_cast<int>(std::round(ret.receiver().compressed_ratio())));
if (ret.receiver().efficiency() < 1.0) {
for (int i = 0; i < ret.receiver().received_packets_size(); i++) {
if (ret.receiver().received_packets(i) != ret.receiver().expected_packets(i))
static_cast<int>(ret.receiver_output.efficiency()*100.0),
ret.receiver_output.max_receive_delay(),
static_cast<int>(std::round(ret.receiver_output.compressed_ratio())));
if (ret.receiver_output.efficiency() < 1.0) {
for (int i = 0; i < ret.receiver_output.received_packets_size(); i++) {
if (ret.receiver_output.received_packets(i) != ret.receiver_output.expected_packets(i))
logger.Info(" ... Module: {} Packets received: {} out of {}", i,
ret.receiver().received_packets(i), ret.receiver().expected_packets(i));
ret.receiver_output.received_packets(i), ret.receiver_output.expected_packets(i));
}
}
} catch (const JFJochException &e) {
@@ -77,11 +72,9 @@ JFJochProtoBuf::BrokerFullStatus JFJochServices::Stop(const JFCalibration &calib
try {
auto stats = writer.Stop();
logger.Info(" ... finished with success");
for (int i = 0; i < stats.size(); i++) {
*ret.add_writer() = stats[i];
logger.Info("Writer {}: Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz",
for (int i = 0; i < stats.size(); i++)
logger.Info("Writer {}: Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz",
i, stats[i].nimages(), stats[i].performance_mbs(), stats[i].performance_hz());
}
} catch (JFJochException &e) {
logger.Error(" ... finished with error {}",e.what());
exception = std::make_unique<JFJochException>(e);
+5 -1
View File
@@ -10,6 +10,10 @@
#include "../grpc/JFJochWriterGroupClient.h"
#include "../grpc/JFJochDetectorClient.h"
struct JFJochServicesOutput {
JFJochProtoBuf::ReceiverOutput receiver_output;
};
class JFJochServices {
JFJochReceiverClient receiver;
JFJochWriterGroupClient writer;
@@ -23,7 +27,7 @@ public:
void On(const DiffractionExperiment& experiment);
void Off();
void Start(const DiffractionExperiment& experiment, const JFCalibration &calibration);
JFJochProtoBuf::BrokerFullStatus Stop(const JFCalibration &calibration);
JFJochServicesOutput Stop(const JFCalibration &calibration);
void Abort();
void Cancel();
void Trigger();
+12 -33
View File
@@ -109,7 +109,7 @@ void JFJochStateMachine::TakePedestalInternalG0(std::unique_lock<std::mutex> &ul
ul.lock();
// SetFullMeasurementOutput(pedestal_output);
ImportPedestalG0(pedestal_output.receiver());
ImportPedestalG0(pedestal_output.receiver_output);
}
state = JFJochState::Idle;
}
@@ -134,7 +134,7 @@ void JFJochStateMachine::TakePedestalInternalG1(std::unique_lock<std::mutex> &ul
ul.lock();
// SetFullMeasurementOutput(pedestal_output);
ImportPedestal(pedestal_output.receiver(), 1, storage_cell);
ImportPedestal(pedestal_output.receiver_output, 1, storage_cell);
}
state = JFJochState::Idle;
}
@@ -159,7 +159,7 @@ void JFJochStateMachine::TakePedestalInternalG2(std::unique_lock<std::mutex> &ul
ul.lock();
// SetFullMeasurementOutput(pedestal_output);
ImportPedestal(pedestal_output.receiver(), 2, storage_cell);
ImportPedestal(pedestal_output.receiver_output, 2, storage_cell);
}
state = JFJochState::Idle;
}
@@ -329,14 +329,8 @@ JFJochStateMachine::~JFJochStateMachine() {
} catch (...) {}
}
JFJochProtoBuf::BrokerFullStatus JFJochStateMachine::GetFullMeasurementOutput() const {
void JFJochStateMachine::SetFullMeasurementOutput(const JFJochServicesOutput &output) {
std::unique_lock<std::mutex> ul(last_receiver_output_mutex);
return last_receiver_output;
}
void JFJochStateMachine::SetFullMeasurementOutput(JFJochProtoBuf::BrokerFullStatus &output) {
std::unique_lock<std::mutex> ul(last_receiver_output_mutex);
last_receiver_output = output;
auto tmp = JFJochProtoBuf::MeasurementStatistics(); // reset last measurement statistics
@@ -345,29 +339,14 @@ void JFJochStateMachine::SetFullMeasurementOutput(JFJochProtoBuf::BrokerFullStat
tmp.set_detector_height(experiment.GetYPixelsNum());
tmp.set_detector_pixel_depth(experiment.GetPixelDepth());
if (last_receiver_output.has_receiver()) {
tmp.set_compression_ratio(output.receiver().compressed_ratio());
tmp.set_collection_efficiency(output.receiver().efficiency());
tmp.set_images_collected(output.receiver().images_sent());
tmp.set_cancelled(output.receiver().cancelled());
tmp.set_max_image_number_sent(output.receiver().max_image_number_sent());
tmp.set_max_receive_delay(output.receiver().max_receive_delay());
tmp.set_indexing_rate(output.receiver().indexing_rate());
tmp.set_bkg_estimate(output.receiver().bkg_estimate());
}
double writer_perf = 0.0;
int64_t images_written = 0;
if (last_receiver_output.writer_size() > 0) {
for (const auto &i: output.writer()) {
writer_perf += i.performance_mbs();
images_written += i.nimages();
for (const auto &f: i.file_statistics())
*tmp.add_file_statistics() = f;
}
}
tmp.set_writer_performance_mbs(writer_perf);
tmp.set_images_written(images_written);
tmp.set_compression_ratio(output.receiver_output.compressed_ratio());
tmp.set_collection_efficiency(output.receiver_output.efficiency());
tmp.set_images_collected(output.receiver_output.images_sent());
tmp.set_cancelled(output.receiver_output.cancelled());
tmp.set_max_image_number_sent(output.receiver_output.max_image_number_sent());
tmp.set_max_receive_delay(output.receiver_output.max_receive_delay());
tmp.set_indexing_rate(output.receiver_output.indexing_rate());
tmp.set_bkg_estimate(output.receiver_output.bkg_estimate());
measurement_statistics = tmp;
}
+1 -3
View File
@@ -38,12 +38,10 @@ class JFJochStateMachine {
JFJochProtoBuf::JFCalibrationStatistics calibration_statistics;
mutable std::mutex last_receiver_output_mutex;
JFJochProtoBuf::BrokerFullStatus last_receiver_output;
std::optional<JFJochProtoBuf::MeasurementStatistics> measurement_statistics;
void SetFullMeasurementOutput(JFJochProtoBuf::BrokerFullStatus &output);
void SetFullMeasurementOutput(const JFJochServicesOutput &output);
void ClearMeasurementStatistics();
void ClearAndSetMeasurementStatistics();
JFJochProtoBuf::BrokerFullStatus GetFullMeasurementOutput() const;
mutable std::mutex data_processing_settings_mutex;
DataProcessingSettings data_processing_settings;
-11
View File
@@ -388,17 +388,12 @@ message MeasurementStatistics {
bool cancelled = 6;
int64 max_receive_delay = 7;
float writer_performance_MBs = 8;
int64 images_written = 9;
float indexing_rate = 10;
int64 detector_width = 12;
int64 detector_height = 13;
int64 detector_pixel_depth = 14;
repeated DataFileStatistics file_statistics = 15;
float bkg_estimate = 16;
}
@@ -409,12 +404,6 @@ message BrokerStatus {
float receiver_send_buffers_avail = 4;
}
message BrokerFullStatus {
ReceiverOutput receiver = 1;
DetectorOutput detector = 2;
repeated WriterOutput writer = 3;
}
message DetectorListElement {
string description = 1;
int64 nmodules = 2;
+30 -32
View File
File diff suppressed because one or more lines are too long
-11
View File
@@ -90,14 +90,12 @@ TEST_CASE("JFJochIntegrationTest_ZMQ", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == 5);
REQUIRE(statistics.images_written() == 5);
REQUIRE(statistics.max_image_number_sent() == 4);
REQUIRE(!statistics.cancelled());
REQUIRE(statistics.file_prefix() == "integration_test");
REQUIRE(statistics.detector_width() == 2068);
REQUIRE(statistics.detector_height() == 2164);
REQUIRE(statistics.detector_pixel_depth() == 2);
REQUIRE(statistics.file_statistics_size() == 2);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}
@@ -182,14 +180,12 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_save_calibration", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == 5);
REQUIRE(statistics.images_written() == 5);
REQUIRE(statistics.max_image_number_sent() == 4);
REQUIRE(!statistics.cancelled());
REQUIRE(statistics.file_prefix() == "integration_test_with_calibration");
REQUIRE(statistics.detector_width() == 2068);
REQUIRE(statistics.detector_height() == 2164);
REQUIRE(statistics.detector_pixel_depth() == 2);
REQUIRE(statistics.file_statistics_size() == 2);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}
@@ -273,7 +269,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]")
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == 5);
REQUIRE(statistics.images_written() == 5);
REQUIRE(statistics.max_image_number_sent() == 4);
REQUIRE(!statistics.cancelled());
REQUIRE(statistics.file_prefix() == "integration_test");
@@ -367,7 +362,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == 5);
REQUIRE(statistics.images_written() == 5);
REQUIRE(statistics.file_prefix() == "integration_raw_test");
REQUIRE(statistics.detector_width() == 1024);
REQUIRE(statistics.detector_height() == 8 * 512);
@@ -462,8 +456,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_3Writers", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == nimages);
REQUIRE(statistics.images_written() == nimages);
REQUIRE(statistics.file_statistics_size() == 5);
fpga_receiver_server->Shutdown();
writer_server_0->Shutdown();
@@ -539,7 +531,6 @@ TEST_CASE("JFJochIntegrationTest_Cancel", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 0.5);
REQUIRE(statistics.images_collected() == 5);
REQUIRE(statistics.images_written() == 5);
REQUIRE(statistics.max_image_number_sent() == 4);
REQUIRE(statistics.cancelled());
@@ -645,7 +636,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == 5);
REQUIRE(statistics.images_written() == 5);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
@@ -747,7 +737,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]"
REQUIRE(statistics.collection_efficiency() == 1.0);
REQUIRE(statistics.images_collected() == 5);
REQUIRE(statistics.images_written() == 0);
fpga_receiver_server->Shutdown();
writer_server->Shutdown();
}