JFJochReceiver: Save receiver_available_send_buffers and receiver_aq_dev_delay in HDF5 file

This commit is contained in:
2023-05-18 12:18:38 +02:00
parent e7f012dd90
commit 1df16b1a1e
8 changed files with 60 additions and 46 deletions
+3
View File
@@ -31,6 +31,9 @@ struct DataMessage {
uint64_t bunch_id;
uint32_t jf_info;
float receiver_available_send_buffers;
int64_t receiver_aq_dev_delay;
uint64_t timestamp;
uint32_t timestamp_base;
@@ -367,6 +367,10 @@ void JFJochFrameDeserializer::ProcessImageMessageUserDataElement(CborValue &valu
GetCBORFloatArray(map_value, data_message.indexing_lattice);
else if (key == "jf_info")
data_message.jf_info = GetCBORUInt(map_value) & UINT32_MAX;
else if (key == "receiver_available_send_buffers")
data_message.receiver_available_send_buffers = GetCBORFloat(map_value);
else if (key == "receiver_aq_dev_delay")
data_message.receiver_aq_dev_delay = GetCBORInt(map_value);
else if (key == "storage_cell")
data_message.storage_cell = GetCBORUInt(map_value) & UINT32_MAX;
else if (key == "bunch_id")
+3 -1
View File
@@ -429,7 +429,7 @@ void JFJochFrameSerializer::SerializeImage(const DataMessage& message) {
message.timestamp_base);
cborErr(cbor_encode_text_stringz(&mapEncoder, "user_data"));
cborErr(cbor_encoder_create_map(&mapEncoder, &userDataMapEncoder, 7));
cborErr(cbor_encoder_create_map(&mapEncoder, &userDataMapEncoder, 9));
CBOR_ENC(userDataMapEncoder, "spots", message.spots);
CBOR_ENC(userDataMapEncoder, "rad_int_profile", message.rad_int_profile);
@@ -437,6 +437,8 @@ void JFJochFrameSerializer::SerializeImage(const DataMessage& message) {
CBOR_ENC(userDataMapEncoder, "indexing_lattice", message.indexing_lattice);
CBOR_ENC(userDataMapEncoder, "bunch_id", message.bunch_id);
CBOR_ENC(userDataMapEncoder, "jf_info", (uint64_t) message.jf_info);
CBOR_ENC(userDataMapEncoder, "receiver_available_send_buffers", message.receiver_available_send_buffers);
CBOR_ENC(userDataMapEncoder, "receiver_aq_dev_delay", message.receiver_aq_dev_delay);
CBOR_ENC(userDataMapEncoder, "storage_cell", (uint64_t) message.storage_cell);
cborErr(cbor_encoder_close_container(&mapEncoder, &userDataMapEncoder));
+25 -37
View File
@@ -185,7 +185,7 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
measurement = std::async(std::launch::async, &JFJochReceiver::FinalizeMeasurement, this);
}
int64_t JFJochReceiver::AcquireThread(uint16_t data_stream) {
void JFJochReceiver::AcquireThread(uint16_t data_stream) {
PinThreadToDevice(data_stream);
try {
@@ -195,7 +195,6 @@ int64_t JFJochReceiver::AcquireThread(uint16_t data_stream) {
} catch (const JFJochException &e) {
Cancel(e);
data_acquisition_ready.count_down();
return -1;
}
data_acquisition_ready.count_down();
@@ -208,10 +207,9 @@ int64_t JFJochReceiver::AcquireThread(uint16_t data_stream) {
}
logger.Info("Device thread {} done", data_stream);
return -1;
}
int64_t JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell) {
void JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell) {
PinThreadToDevice(data_stream);
JFPedestalCalc pedestal_calc(experiment);
@@ -219,9 +217,6 @@ int64_t JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t mod
bool storage_cell_G1G2 = (experiment.GetStorageCellNumber() > 1)
&& ((experiment.GetDetectorMode() == DetectorMode::PedestalG1)
|| (experiment.GetDetectorMode() == DetectorMode::PedestalG2));
int64_t delay = 0;
size_t staring_frame;
size_t frame_stride;
@@ -253,7 +248,7 @@ int64_t JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t mod
acquisition_device[data_stream]->FrameBufferRelease(frame, module_number);
delay = std::max(delay, acquisition_device[data_stream]->CalculateDelay(frame, module_number));
UpdateMaxDelay(acquisition_device[data_stream]->CalculateDelay(frame, module_number));
UpdateMaxImage(frame);
}
@@ -266,13 +261,10 @@ int64_t JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t mod
} catch (const JFJochException &e) { Cancel(e); }
logger.Info("Pedestal calculation thread for data stream {} module {} storage cell {} -> header {} done",
data_stream, module_number, storage_cell, storage_cell_header);
return delay;
}
int64_t JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, bool &send_image,
void JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, bool &send_image,
FrameTransformation &transformation, DataMessage &message) {
int64_t max_thread_delay = 0;
for (int j = 0; j < experiment.GetSummation(); j++) {
size_t frame_number = image_number * experiment.GetSummation() + j;
acquisition_device[d]->WaitForFrame(frame_number + 2);
@@ -301,13 +293,11 @@ int64_t JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, b
transformation.ProcessModule(src, m, d);
acquisition_device[d]->FrameBufferRelease(frame_number, m);
max_thread_delay = std::max(max_thread_delay, acquisition_device[d]->CalculateDelay(frame_number));
UpdateMaxDelay(acquisition_device[d]->CalculateDelay(frame_number, m));
}
return max_thread_delay;
}
int64_t JFJochReceiver::FrameTransformationThread() {
void JFJochReceiver::FrameTransformationThread() {
FrameTransformation transformation(experiment);
std::unique_ptr<GPUImageAnalysis> spot_finder;
@@ -327,13 +317,11 @@ int64_t JFJochReceiver::FrameTransformationThread() {
frame_transformation_ready.count_down();
logger.Error("Error creating GPU spot finder");
Cancel(e);
return -1;
}
std::vector<char> writer_buffer(experiment.GetMaxCompressedSize());
std::vector<int16_t> conversion_buffer(RAW_MODULE_SIZE);
int64_t max_thread_delay = 0;
uint64_t image_number;
frame_transformation_ready.count_down();
@@ -368,15 +356,14 @@ int64_t JFJochReceiver::FrameTransformationThread() {
}
if (experiment.GetSummation() >= threaded_summation_threshold) {
std::vector<std::future<int64_t>> mini_summation_threads;
std::vector<std::future<void>> mini_summation_threads;
for (int d = 0; d < ndatastreams; d++)
for (int m = 0; m < experiment.GetModulesNum(d); m++)
mini_summation_threads.emplace_back(std::async(std::launch::async, &JFJochReceiver::MiniSummationThread,
this, d, m, image_number,
std::ref(send_image), std::ref(transformation),
std::ref(message)));
for (auto &i: mini_summation_threads)
max_thread_delay = std::max(max_thread_delay, i.get());
message.receiver_aq_dev_delay = max_delay;
} else {
for (int j = 0; j < experiment.GetSummation(); j++) {
size_t frame_number = image_number * experiment.GetSummation() + j;
@@ -410,8 +397,10 @@ int64_t JFJochReceiver::FrameTransformationThread() {
acquisition_device[d]->FrameBufferRelease(frame_number, m);
}
max_thread_delay = std::max(max_thread_delay,
acquisition_device[d]->CalculateDelay(frame_number));
auto delay = acquisition_device[d]->CalculateDelay(frame_number);
UpdateMaxDelay(delay);
if (delay > message.receiver_aq_dev_delay)
message.receiver_aq_dev_delay = delay;
}
}
}
@@ -484,6 +473,8 @@ int64_t JFJochReceiver::FrameTransformationThread() {
}
if (push_images_to_writer) {
message.receiver_available_send_buffers = GetAvailableSendBuffers();
auto send_buffer_handle = send_buffer_avail.GetBlocking();
auto ptr = send_buffer.data() + send_buffer_size * send_buffer_handle;
JFJochFrameSerializer serializer(ptr, send_buffer_size);
@@ -508,8 +499,6 @@ int64_t JFJochReceiver::FrameTransformationThread() {
spot_finder->UnregisterBuffer();
logger.Debug("Sum&compression thread done");
return max_thread_delay;
}
void JFJochReceiver::GetStatistics(JFJochProtoBuf::ReceiverOutput &ret) const {
@@ -539,10 +528,7 @@ void JFJochReceiver::GetStatistics(JFJochProtoBuf::ReceiverOutput &ret) const {
} else
ret.set_compressed_ratio(0);
if (!max_delay.empty())
ret.set_max_receive_delay(*std::max_element(max_delay.begin(), max_delay.end()));
else
ret.set_max_receive_delay(0);
ret.set_max_receive_delay(max_delay);
ret.set_images_sent(images_sent);
ret.set_start_time_ms(std::chrono::duration_cast<std::chrono::milliseconds>(start_time.time_since_epoch()).count());
@@ -598,10 +584,8 @@ float JFJochReceiver::GetProgress() const {
void JFJochReceiver::FinalizeMeasurement() {
if (!frame_transformation_futures.empty()) {
for (auto &future: frame_transformation_futures) {
auto val = future.get();
if (val >= 0) max_delay.push_back(val);
}
for (auto &future: frame_transformation_futures)
future.get();
logger.Info("All processing threads done");
}
@@ -635,10 +619,8 @@ void JFJochReceiver::FinalizeMeasurement() {
end_time = std::chrono::system_clock::now();
for (auto &future : data_acquisition_futures) {
auto val = future.get();
if (val >= 0) max_delay.push_back(val);
}
for (auto &future : data_acquisition_futures)
future.get();
for (int i = 0; i < send_buffer_count; i++)
send_buffer_avail.GetBlocking();
@@ -750,6 +732,12 @@ void JFJochReceiver::UpdateMaxImage(int64_t image_number) {
max_image_number_sent = image_number;
}
void JFJochReceiver::UpdateMaxDelay(int64_t delay) {
std::unique_lock<std::mutex> ul(max_delay_mutex);
if (delay > max_delay)
max_delay = delay;
}
float JFJochReceiver::GetAvailableSendBuffers() const {
return static_cast<float>(send_buffer_avail.Size()) / static_cast<float>(send_buffer_count);
}
+10 -7
View File
@@ -39,8 +39,8 @@ class JFJochReceiver {
std::vector<uint8_t> one_byte_mask;
Logger &logger;
std::vector<std::future<int64_t>> frame_transformation_futures;
std::vector<std::future<int64_t>> data_acquisition_futures;
std::vector<std::future<void>> frame_transformation_futures;
std::vector<std::future<void>> data_acquisition_futures;
std::unique_ptr<RadialIntegrationMapping> rad_int_mapping;
std::unique_ptr<RadialIntegrationProfile> rad_int_profile;
@@ -62,7 +62,9 @@ class JFJochReceiver {
std::vector<AcquisitionDevice *> &acquisition_device;
uint16_t ndatastreams{0};
std::vector<size_t> max_delay;
int64_t max_delay = 0;
std::mutex max_delay_mutex;
std::atomic<size_t> compressed_size{0};
std::atomic<size_t> images_sent{0};
@@ -99,16 +101,17 @@ class JFJochReceiver {
void PinThreadToDevice(uint16_t data_stream);
void PrepareConversionOnCPU();
int64_t AcquireThread(uint16_t data_stream);
int64_t FrameTransformationThread();
int64_t MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell);
int64_t MiniSummationThread(int d, int m, size_t image_number, bool &send_image,
void AcquireThread(uint16_t data_stream);
void FrameTransformationThread();
void MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell);
void MiniSummationThread(int d, int m, size_t image_number, bool &send_image,
FrameTransformation &transformation, DataMessage &message);
void Cancel(const JFJochException &e);
void FinalizeMeasurement();
JFJochProtoBuf::DataProcessingSettings GetDataProcessingSettings();
void UpdateMaxImage(int64_t image_number);
void UpdateMaxDelay(int64_t delay);
public:
constexpr static const int64_t threaded_summation_threshold = 50;
+5
View File
@@ -219,6 +219,8 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
.indexing_result = 1,
.bunch_id = UINT64_MAX,
.jf_info = UINT32_MAX,
.receiver_available_send_buffers = 786.546,
.receiver_aq_dev_delay = 2323,
.timestamp = 1ul<<27 | 1ul <<35,
.storage_cell = 0xF,
.exptime = 1000,
@@ -251,6 +253,8 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
REQUIRE(image_array.timestamp == message.timestamp);
REQUIRE(image_array.storage_cell == message.storage_cell);
REQUIRE(image_array.exptime == message.exptime);
REQUIRE(image_array.receiver_available_send_buffers == image_array.receiver_available_send_buffers);
REQUIRE(image_array.receiver_aq_dev_delay == image_array.receiver_aq_dev_delay);
}
TEST_CASE("CBORSerialize_Image_2", "[CBOR]") {
@@ -296,6 +300,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
REQUIRE(image_array.image.channel == "default");
REQUIRE(image_array.image.size == test.size());
REQUIRE(image_array.indexing_result == message.indexing_result);
REQUIRE(image_array.receiver_available_send_buffers == message.receiver_available_send_buffers);
REQUIRE(image_array.number == 480);
REQUIRE(memcmp(image_array.image.data, test.data(), test.size()) == 0);
}
+8 -1
View File
@@ -45,7 +45,8 @@ HDF5DataFile::~HDF5DataFile() {
group_exp.SaveVector("timestamp", timestamp);
group_exp.SaveVector("storage_cell", storage_cell);
group_exp.SaveVector("exptime", exptime);
group_exp.SaveVector("receiver_available_send_buffers", receiver_available_send_buffers);
group_exp.SaveVector("receiver_aq_dev_delay", receiver_aq_dev_delay);
}
rad_int_group.reset();
result_group.reset();
@@ -85,6 +86,8 @@ void HDF5DataFile::CreateFile() {
indexing_result.resize(1);
bunch_id.resize(1);
jf_info.resize(1);
receiver_available_send_buffers.resize(1);
receiver_aq_dev_delay.resize(1);
timestamp.resize(1);
storage_cell.resize(1);
exptime.resize(1);
@@ -108,6 +111,8 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) {
indexing_result.resize(max_image_number + 1);
bunch_id.resize(max_image_number + 1);
jf_info.resize(max_image_number + 1);
receiver_available_send_buffers.resize(max_image_number + 1);
receiver_aq_dev_delay.resize(max_image_number + 1);
timestamp.resize(max_image_number + 1);
exptime.resize(max_image_number + 1);
storage_cell.resize(max_image_number + 1);
@@ -137,6 +142,8 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) {
indexing_result[image_number] = msg.indexing_result;
bunch_id[image_number] = msg.bunch_id;
jf_info[image_number] = msg.jf_info;
receiver_available_send_buffers[image_number] = msg.receiver_available_send_buffers;
receiver_aq_dev_delay[image_number] = msg.receiver_aq_dev_delay;
timestamp[image_number] = msg.timestamp;
storage_cell[image_number] = msg.storage_cell;
exptime[image_number] = msg.exptime;
+2
View File
@@ -45,6 +45,8 @@ class HDF5DataFile {
std::vector<uint64_t> timestamp;
std::vector<uint32_t> storage_cell;
std::vector<uint32_t> exptime;
std::vector<float> receiver_available_send_buffers;
std::vector<int64_t> receiver_aq_dev_delay;
std::vector<float> rad_int_bin_to_q;
std::vector<float> rad_int_file_avg;