From 71960d5496b2e819bded086de6c8a9fc40d2b090 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Fri, 3 Nov 2023 11:25:14 +0100 Subject: [PATCH] JFJochReceiver: Remove MiniSummationThread (as summation is anyway handled on FPGA) --- receiver/JFJochReceiver.cpp | 118 ++++++++++-------------------------- receiver/JFJochReceiver.h | 5 -- 2 files changed, 33 insertions(+), 90 deletions(-) diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index 82e51c6c..704af169 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -31,8 +31,7 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, acquisition_device(in_aq_device), logger(in_logger), image_pusher(in_image_sender), - frame_transformation_nthreads((experiment.GetSummation() >= threaded_summation_threshold) ? - 2 : in_forward_and_sum_nthreads), + frame_transformation_nthreads(in_forward_and_sum_nthreads), preview_publisher(in_preview_publisher), ndatastreams(experiment.GetDataStreamsNum()), data_acquisition_ready(ndatastreams), @@ -58,7 +57,7 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, send_buffer_zero_copy_ret_val.emplace_back(send_buffer_avail, i); } - if (!experiment.CheckGitSha1Consistent()) + if (!experiment.CheckGitSha1Consistent()) logger.Warning(experiment.CheckGitSha1Msg()); push_images_to_writer = (experiment.GetImageNum() > 0) && (!experiment.GetFilePrefix().empty()); @@ -335,46 +334,6 @@ void JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t module data_stream, module_number, storage_cell, storage_cell_header); } -void JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, bool &send_image, - FrameTransformation &transformation, DataMessage &message, - RadialIntegrationProfile *profile) { - size_t module_abs_number = experiment.GetFirstModuleOfDataStream(d) + m; - - for (int j = 0; j < experiment.GetSummation(); j++) { - size_t frame_number = image_number * experiment.GetSummation() + j; - acquisition_device[d]->Counters().WaitForFrame(frame_number + 2); - - const int16_t *src; - - if (acquisition_device[d]->Counters().IsAnyPacketCollected(frame_number, m)) { - src = acquisition_device[d]->GetDeviceOutput(frame_number, m)->pixels; - - if (!send_image) { - Completion c = acquisition_device[d]->Counters().GetCompletion(frame_number, m); - // the information is for first module/frame that was collected in full - message.bunch_id = c.bunchid; - message.jf_info = c.debug; - message.storage_cell = (message.jf_info >> 8) & 0xF; - message.timestamp = c.timestamp; - message.exptime = c.exptime; - } - send_image = true; - - if (profile) - profile->Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m)); - - adu_histogram_total.Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m)); - adu_histogram_module[module_abs_number].Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m)); - } else - src = acquisition_device[d]->GetErrorFrameBuffer(); - - transformation.ProcessModule(src, m, d); - - acquisition_device[d]->FrameBufferRelease(frame_number, m); - UpdateMaxDelay(acquisition_device[d]->Counters().CalculateDelay(frame_number, m)); - } -} - void JFJochReceiver::FrameTransformationThread() { try { @@ -441,56 +400,45 @@ void JFJochReceiver::FrameTransformationThread() { } } - if (experiment.GetSummation() >= threaded_summation_threshold) { - std::vector> mini_summation_threads; - for (int d = 0; d < ndatastreams; d++) - for (int m = 0; m < experiment.GetModulesNum(d); m++) - mini_summation_threads.emplace_back(std::async(std::launch::async, &JFJochReceiver::MiniSummationThread, - this, d, m, image_number, - std::ref(send_image), std::ref(transformation), - std::ref(message), rad_int_profile_image.get())); - message.receiver_aq_dev_delay = max_delay; - } else { - for (int j = 0; j < experiment.GetSummation(); j++) { - frame_number = image_number * experiment.GetSummation() + j; + for (int j = 0; j < experiment.GetSummation(); j++) { + frame_number = image_number * experiment.GetSummation() + j; - for (int d = 0; d < ndatastreams; d++) { - acquisition_device[d]->Counters().WaitForFrame(frame_number + 2); + for (int d = 0; d < ndatastreams; d++) { + acquisition_device[d]->Counters().WaitForFrame(frame_number + 2); - for (int m = 0; m < experiment.GetModulesNum(d); m++) { - const int16_t *src; - if (acquisition_device[d]->Counters().IsAnyPacketCollected(frame_number, m)) { - src = acquisition_device[d]->GetDeviceOutput(frame_number, m)->pixels; + for (int m = 0; m < experiment.GetModulesNum(d); m++) { + const int16_t *src; + if (acquisition_device[d]->Counters().IsAnyPacketCollected(frame_number, m)) { + src = acquisition_device[d]->GetDeviceOutput(frame_number, m)->pixels; - if (!send_image) { - Completion c = acquisition_device[d]->Counters().GetCompletion(frame_number, m); - // the information is for first module/frame that was collected in full - message.bunch_id = c.bunchid; - message.jf_info = c.debug; - message.storage_cell = (message.jf_info >> 8) & 0xF; - message.timestamp = c.timestamp; - message.exptime = c.exptime; - } - send_image = true; + if (!send_image) { + Completion c = acquisition_device[d]->Counters().GetCompletion(frame_number, m); + // the information is for first module/frame that was collected in full + message.bunch_id = c.bunchid; + message.jf_info = c.debug; + message.storage_cell = (message.jf_info >> 8) & 0xF; + message.timestamp = c.timestamp; + message.exptime = c.exptime; + } + send_image = true; - size_t module_abs_number = experiment.GetFirstModuleOfDataStream(d) + m; - adu_histogram_module[module_abs_number].Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m)); - adu_histogram_total.Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m)); + size_t module_abs_number = experiment.GetFirstModuleOfDataStream(d) + m; + adu_histogram_module[module_abs_number].Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m)); + adu_histogram_total.Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m)); - if (rad_int_profile_image) - rad_int_profile_image->Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m)); - } else - src = acquisition_device[d]->GetErrorFrameBuffer(); + if (rad_int_profile_image) + rad_int_profile_image->Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m)); + } else + src = acquisition_device[d]->GetErrorFrameBuffer(); - transformation.ProcessModule(src, m, d); + transformation.ProcessModule(src, m, d); - acquisition_device[d]->FrameBufferRelease(frame_number, m); - } - auto delay = acquisition_device[d]->Counters().CalculateDelay(frame_number); - UpdateMaxDelay(delay); - if (delay > message.receiver_aq_dev_delay) - message.receiver_aq_dev_delay = delay; + acquisition_device[d]->FrameBufferRelease(frame_number, m); } + auto delay = acquisition_device[d]->Counters().CalculateDelay(frame_number); + UpdateMaxDelay(delay); + if (delay > message.receiver_aq_dev_delay) + message.receiver_aq_dev_delay = delay; } } diff --git a/receiver/JFJochReceiver.h b/receiver/JFJochReceiver.h index 575632a8..747f58d1 100644 --- a/receiver/JFJochReceiver.h +++ b/receiver/JFJochReceiver.h @@ -108,9 +108,6 @@ class JFJochReceiver { void AcquireThread(uint16_t data_stream); void FrameTransformationThread(); void MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell); - void MiniSummationThread(int d, int m, size_t image_number, bool &send_image, - FrameTransformation &transformation, DataMessage &message, - RadialIntegrationProfile *profile); void Cancel(const JFJochException &e); void FinalizeMeasurement(); JFJochProtoBuf::DataProcessingSettings GetDataProcessingSettings(); @@ -118,8 +115,6 @@ class JFJochReceiver { void UpdateMaxImage(int64_t image_number); void UpdateMaxDelay(int64_t delay); public: - constexpr static const int64_t threaded_summation_threshold = 50; - JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, std::vector &open_capi_device, ImagePusher &image_pusher,