From 1df16b1a1e6a32d80875783ea8d2631e438edfd9 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Thu, 18 May 2023 12:18:38 +0200 Subject: [PATCH] JFJochReceiver: Save receiver_available_send_buffers and receiver_aq_dev_delay in HDF5 file --- frame_serialize/ImageMessage.h | 3 + frame_serialize/JFJochFrameDeserializer.cpp | 4 ++ frame_serialize/JFJochFrameSerializer.cpp | 4 +- receiver/JFJochReceiver.cpp | 62 +++++++++------------ receiver/JFJochReceiver.h | 17 +++--- tests/CBORTest.cpp | 5 ++ writer/HDF5DataFile.cpp | 9 ++- writer/HDF5DataFile.h | 2 + 8 files changed, 60 insertions(+), 46 deletions(-) diff --git a/frame_serialize/ImageMessage.h b/frame_serialize/ImageMessage.h index 407d3fdf..abf6812e 100644 --- a/frame_serialize/ImageMessage.h +++ b/frame_serialize/ImageMessage.h @@ -31,6 +31,9 @@ struct DataMessage { uint64_t bunch_id; uint32_t jf_info; + float receiver_available_send_buffers; + int64_t receiver_aq_dev_delay; + uint64_t timestamp; uint32_t timestamp_base; diff --git a/frame_serialize/JFJochFrameDeserializer.cpp b/frame_serialize/JFJochFrameDeserializer.cpp index 3bd4737b..d1a5acf2 100644 --- a/frame_serialize/JFJochFrameDeserializer.cpp +++ b/frame_serialize/JFJochFrameDeserializer.cpp @@ -367,6 +367,10 @@ void JFJochFrameDeserializer::ProcessImageMessageUserDataElement(CborValue &valu GetCBORFloatArray(map_value, data_message.indexing_lattice); else if (key == "jf_info") data_message.jf_info = GetCBORUInt(map_value) & UINT32_MAX; + else if (key == "receiver_available_send_buffers") + data_message.receiver_available_send_buffers = GetCBORFloat(map_value); + else if (key == "receiver_aq_dev_delay") + data_message.receiver_aq_dev_delay = GetCBORInt(map_value); else if (key == "storage_cell") data_message.storage_cell = GetCBORUInt(map_value) & UINT32_MAX; else if (key == "bunch_id") diff --git a/frame_serialize/JFJochFrameSerializer.cpp b/frame_serialize/JFJochFrameSerializer.cpp index ecaa7dfa..5db632fd 100644 --- a/frame_serialize/JFJochFrameSerializer.cpp +++ b/frame_serialize/JFJochFrameSerializer.cpp @@ -429,7 +429,7 @@ void JFJochFrameSerializer::SerializeImage(const DataMessage& message) { message.timestamp_base); cborErr(cbor_encode_text_stringz(&mapEncoder, "user_data")); - cborErr(cbor_encoder_create_map(&mapEncoder, &userDataMapEncoder, 7)); + cborErr(cbor_encoder_create_map(&mapEncoder, &userDataMapEncoder, 9)); CBOR_ENC(userDataMapEncoder, "spots", message.spots); CBOR_ENC(userDataMapEncoder, "rad_int_profile", message.rad_int_profile); @@ -437,6 +437,8 @@ void JFJochFrameSerializer::SerializeImage(const DataMessage& message) { CBOR_ENC(userDataMapEncoder, "indexing_lattice", message.indexing_lattice); CBOR_ENC(userDataMapEncoder, "bunch_id", message.bunch_id); CBOR_ENC(userDataMapEncoder, "jf_info", (uint64_t) message.jf_info); + CBOR_ENC(userDataMapEncoder, "receiver_available_send_buffers", message.receiver_available_send_buffers); + CBOR_ENC(userDataMapEncoder, "receiver_aq_dev_delay", message.receiver_aq_dev_delay); CBOR_ENC(userDataMapEncoder, "storage_cell", (uint64_t) message.storage_cell); cborErr(cbor_encoder_close_container(&mapEncoder, &userDataMapEncoder)); diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index 8a35c7ea..eec3e332 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -185,7 +185,7 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, measurement = std::async(std::launch::async, &JFJochReceiver::FinalizeMeasurement, this); } -int64_t JFJochReceiver::AcquireThread(uint16_t data_stream) { +void JFJochReceiver::AcquireThread(uint16_t data_stream) { PinThreadToDevice(data_stream); try { @@ -195,7 +195,6 @@ int64_t JFJochReceiver::AcquireThread(uint16_t data_stream) { } catch (const JFJochException &e) { Cancel(e); data_acquisition_ready.count_down(); - return -1; } data_acquisition_ready.count_down(); @@ -208,10 +207,9 @@ int64_t JFJochReceiver::AcquireThread(uint16_t data_stream) { } logger.Info("Device thread {} done", data_stream); - return -1; } -int64_t JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell) { +void JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell) { PinThreadToDevice(data_stream); JFPedestalCalc pedestal_calc(experiment); @@ -219,9 +217,6 @@ int64_t JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t mod bool storage_cell_G1G2 = (experiment.GetStorageCellNumber() > 1) && ((experiment.GetDetectorMode() == DetectorMode::PedestalG1) || (experiment.GetDetectorMode() == DetectorMode::PedestalG2)); - - int64_t delay = 0; - size_t staring_frame; size_t frame_stride; @@ -253,7 +248,7 @@ int64_t JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t mod acquisition_device[data_stream]->FrameBufferRelease(frame, module_number); - delay = std::max(delay, acquisition_device[data_stream]->CalculateDelay(frame, module_number)); + UpdateMaxDelay(acquisition_device[data_stream]->CalculateDelay(frame, module_number)); UpdateMaxImage(frame); } @@ -266,13 +261,10 @@ int64_t JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t mod } catch (const JFJochException &e) { Cancel(e); } logger.Info("Pedestal calculation thread for data stream {} module {} storage cell {} -> header {} done", data_stream, module_number, storage_cell, storage_cell_header); - return delay; } -int64_t JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, bool &send_image, +void JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, bool &send_image, FrameTransformation &transformation, DataMessage &message) { - int64_t max_thread_delay = 0; - for (int j = 0; j < experiment.GetSummation(); j++) { size_t frame_number = image_number * experiment.GetSummation() + j; acquisition_device[d]->WaitForFrame(frame_number + 2); @@ -301,13 +293,11 @@ int64_t JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, b transformation.ProcessModule(src, m, d); acquisition_device[d]->FrameBufferRelease(frame_number, m); - - max_thread_delay = std::max(max_thread_delay, acquisition_device[d]->CalculateDelay(frame_number)); + UpdateMaxDelay(acquisition_device[d]->CalculateDelay(frame_number, m)); } - return max_thread_delay; } -int64_t JFJochReceiver::FrameTransformationThread() { +void JFJochReceiver::FrameTransformationThread() { FrameTransformation transformation(experiment); std::unique_ptr spot_finder; @@ -327,13 +317,11 @@ int64_t JFJochReceiver::FrameTransformationThread() { frame_transformation_ready.count_down(); logger.Error("Error creating GPU spot finder"); Cancel(e); - return -1; } std::vector writer_buffer(experiment.GetMaxCompressedSize()); std::vector conversion_buffer(RAW_MODULE_SIZE); - int64_t max_thread_delay = 0; uint64_t image_number; frame_transformation_ready.count_down(); @@ -368,15 +356,14 @@ int64_t JFJochReceiver::FrameTransformationThread() { } if (experiment.GetSummation() >= threaded_summation_threshold) { - std::vector> mini_summation_threads; + std::vector> mini_summation_threads; for (int d = 0; d < ndatastreams; d++) for (int m = 0; m < experiment.GetModulesNum(d); m++) mini_summation_threads.emplace_back(std::async(std::launch::async, &JFJochReceiver::MiniSummationThread, this, d, m, image_number, std::ref(send_image), std::ref(transformation), std::ref(message))); - for (auto &i: mini_summation_threads) - max_thread_delay = std::max(max_thread_delay, i.get()); + message.receiver_aq_dev_delay = max_delay; } else { for (int j = 0; j < experiment.GetSummation(); j++) { size_t frame_number = image_number * experiment.GetSummation() + j; @@ -410,8 +397,10 @@ int64_t JFJochReceiver::FrameTransformationThread() { acquisition_device[d]->FrameBufferRelease(frame_number, m); } - max_thread_delay = std::max(max_thread_delay, - acquisition_device[d]->CalculateDelay(frame_number)); + auto delay = acquisition_device[d]->CalculateDelay(frame_number); + UpdateMaxDelay(delay); + if (delay > message.receiver_aq_dev_delay) + message.receiver_aq_dev_delay = delay; } } } @@ -484,6 +473,8 @@ int64_t JFJochReceiver::FrameTransformationThread() { } if (push_images_to_writer) { + message.receiver_available_send_buffers = GetAvailableSendBuffers(); + auto send_buffer_handle = send_buffer_avail.GetBlocking(); auto ptr = send_buffer.data() + send_buffer_size * send_buffer_handle; JFJochFrameSerializer serializer(ptr, send_buffer_size); @@ -508,8 +499,6 @@ int64_t JFJochReceiver::FrameTransformationThread() { spot_finder->UnregisterBuffer(); logger.Debug("Sum&compression thread done"); - - return max_thread_delay; } void JFJochReceiver::GetStatistics(JFJochProtoBuf::ReceiverOutput &ret) const { @@ -539,10 +528,7 @@ void JFJochReceiver::GetStatistics(JFJochProtoBuf::ReceiverOutput &ret) const { } else ret.set_compressed_ratio(0); - if (!max_delay.empty()) - ret.set_max_receive_delay(*std::max_element(max_delay.begin(), max_delay.end())); - else - ret.set_max_receive_delay(0); + ret.set_max_receive_delay(max_delay); ret.set_images_sent(images_sent); ret.set_start_time_ms(std::chrono::duration_cast(start_time.time_since_epoch()).count()); @@ -598,10 +584,8 @@ float JFJochReceiver::GetProgress() const { void JFJochReceiver::FinalizeMeasurement() { if (!frame_transformation_futures.empty()) { - for (auto &future: frame_transformation_futures) { - auto val = future.get(); - if (val >= 0) max_delay.push_back(val); - } + for (auto &future: frame_transformation_futures) + future.get(); logger.Info("All processing threads done"); } @@ -635,10 +619,8 @@ void JFJochReceiver::FinalizeMeasurement() { end_time = std::chrono::system_clock::now(); - for (auto &future : data_acquisition_futures) { - auto val = future.get(); - if (val >= 0) max_delay.push_back(val); - } + for (auto &future : data_acquisition_futures) + future.get(); for (int i = 0; i < send_buffer_count; i++) send_buffer_avail.GetBlocking(); @@ -750,6 +732,12 @@ void JFJochReceiver::UpdateMaxImage(int64_t image_number) { max_image_number_sent = image_number; } +void JFJochReceiver::UpdateMaxDelay(int64_t delay) { + std::unique_lock ul(max_delay_mutex); + if (delay > max_delay) + max_delay = delay; +} + float JFJochReceiver::GetAvailableSendBuffers() const { return static_cast(send_buffer_avail.Size()) / static_cast(send_buffer_count); } \ No newline at end of file diff --git a/receiver/JFJochReceiver.h b/receiver/JFJochReceiver.h index ffddbdb9..39a06eeb 100644 --- a/receiver/JFJochReceiver.h +++ b/receiver/JFJochReceiver.h @@ -39,8 +39,8 @@ class JFJochReceiver { std::vector one_byte_mask; Logger &logger; - std::vector> frame_transformation_futures; - std::vector> data_acquisition_futures; + std::vector> frame_transformation_futures; + std::vector> data_acquisition_futures; std::unique_ptr rad_int_mapping; std::unique_ptr rad_int_profile; @@ -62,7 +62,9 @@ class JFJochReceiver { std::vector &acquisition_device; uint16_t ndatastreams{0}; - std::vector max_delay; + int64_t max_delay = 0; + std::mutex max_delay_mutex; + std::atomic compressed_size{0}; std::atomic images_sent{0}; @@ -99,16 +101,17 @@ class JFJochReceiver { void PinThreadToDevice(uint16_t data_stream); void PrepareConversionOnCPU(); - int64_t AcquireThread(uint16_t data_stream); - int64_t FrameTransformationThread(); - int64_t MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell); - int64_t MiniSummationThread(int d, int m, size_t image_number, bool &send_image, + void AcquireThread(uint16_t data_stream); + void FrameTransformationThread(); + void MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell); + void MiniSummationThread(int d, int m, size_t image_number, bool &send_image, FrameTransformation &transformation, DataMessage &message); void Cancel(const JFJochException &e); void FinalizeMeasurement(); JFJochProtoBuf::DataProcessingSettings GetDataProcessingSettings(); void UpdateMaxImage(int64_t image_number); + void UpdateMaxDelay(int64_t delay); public: constexpr static const int64_t threaded_summation_threshold = 50; diff --git a/tests/CBORTest.cpp b/tests/CBORTest.cpp index c6b1f392..2be10aae 100644 --- a/tests/CBORTest.cpp +++ b/tests/CBORTest.cpp @@ -219,6 +219,8 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); .indexing_result = 1, .bunch_id = UINT64_MAX, .jf_info = UINT32_MAX, + .receiver_available_send_buffers = 786.546, + .receiver_aq_dev_delay = 2323, .timestamp = 1ul<<27 | 1ul <<35, .storage_cell = 0xF, .exptime = 1000, @@ -251,6 +253,8 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); REQUIRE(image_array.timestamp == message.timestamp); REQUIRE(image_array.storage_cell == message.storage_cell); REQUIRE(image_array.exptime == message.exptime); + REQUIRE(image_array.receiver_available_send_buffers == image_array.receiver_available_send_buffers); + REQUIRE(image_array.receiver_aq_dev_delay == image_array.receiver_aq_dev_delay); } TEST_CASE("CBORSerialize_Image_2", "[CBOR]") { @@ -296,6 +300,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size()); REQUIRE(image_array.image.channel == "default"); REQUIRE(image_array.image.size == test.size()); REQUIRE(image_array.indexing_result == message.indexing_result); + REQUIRE(image_array.receiver_available_send_buffers == message.receiver_available_send_buffers); REQUIRE(image_array.number == 480); REQUIRE(memcmp(image_array.image.data, test.data(), test.size()) == 0); } diff --git a/writer/HDF5DataFile.cpp b/writer/HDF5DataFile.cpp index 5856756b..a4241faa 100644 --- a/writer/HDF5DataFile.cpp +++ b/writer/HDF5DataFile.cpp @@ -45,7 +45,8 @@ HDF5DataFile::~HDF5DataFile() { group_exp.SaveVector("timestamp", timestamp); group_exp.SaveVector("storage_cell", storage_cell); group_exp.SaveVector("exptime", exptime); - + group_exp.SaveVector("receiver_available_send_buffers", receiver_available_send_buffers); + group_exp.SaveVector("receiver_aq_dev_delay", receiver_aq_dev_delay); } rad_int_group.reset(); result_group.reset(); @@ -85,6 +86,8 @@ void HDF5DataFile::CreateFile() { indexing_result.resize(1); bunch_id.resize(1); jf_info.resize(1); + receiver_available_send_buffers.resize(1); + receiver_aq_dev_delay.resize(1); timestamp.resize(1); storage_cell.resize(1); exptime.resize(1); @@ -108,6 +111,8 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) { indexing_result.resize(max_image_number + 1); bunch_id.resize(max_image_number + 1); jf_info.resize(max_image_number + 1); + receiver_available_send_buffers.resize(max_image_number + 1); + receiver_aq_dev_delay.resize(max_image_number + 1); timestamp.resize(max_image_number + 1); exptime.resize(max_image_number + 1); storage_cell.resize(max_image_number + 1); @@ -137,6 +142,8 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) { indexing_result[image_number] = msg.indexing_result; bunch_id[image_number] = msg.bunch_id; jf_info[image_number] = msg.jf_info; + receiver_available_send_buffers[image_number] = msg.receiver_available_send_buffers; + receiver_aq_dev_delay[image_number] = msg.receiver_aq_dev_delay; timestamp[image_number] = msg.timestamp; storage_cell[image_number] = msg.storage_cell; exptime[image_number] = msg.exptime; diff --git a/writer/HDF5DataFile.h b/writer/HDF5DataFile.h index 8fa4066a..bd43a00e 100644 --- a/writer/HDF5DataFile.h +++ b/writer/HDF5DataFile.h @@ -45,6 +45,8 @@ class HDF5DataFile { std::vector timestamp; std::vector storage_cell; std::vector exptime; + std::vector receiver_available_send_buffers; + std::vector receiver_aq_dev_delay; std::vector rad_int_bin_to_q; std::vector rad_int_file_avg;