JFJochReceiver: Remove MiniSummationThread (as summation is anyway handled on FPGA)

This commit is contained in:
2023-11-03 11:25:14 +01:00
parent 3d7c7b0779
commit 71960d5496
2 changed files with 33 additions and 90 deletions

View File

@@ -31,8 +31,7 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
acquisition_device(in_aq_device),
logger(in_logger),
image_pusher(in_image_sender),
frame_transformation_nthreads((experiment.GetSummation() >= threaded_summation_threshold) ?
2 : in_forward_and_sum_nthreads),
frame_transformation_nthreads(in_forward_and_sum_nthreads),
preview_publisher(in_preview_publisher),
ndatastreams(experiment.GetDataStreamsNum()),
data_acquisition_ready(ndatastreams),
@@ -58,7 +57,7 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
send_buffer_zero_copy_ret_val.emplace_back(send_buffer_avail, i);
}
if (!experiment.CheckGitSha1Consistent())
if (!experiment.CheckGitSha1Consistent())
logger.Warning(experiment.CheckGitSha1Msg());
push_images_to_writer = (experiment.GetImageNum() > 0) && (!experiment.GetFilePrefix().empty());
@@ -335,46 +334,6 @@ void JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t module
data_stream, module_number, storage_cell, storage_cell_header);
}
void JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, bool &send_image,
FrameTransformation &transformation, DataMessage &message,
RadialIntegrationProfile *profile) {
size_t module_abs_number = experiment.GetFirstModuleOfDataStream(d) + m;
for (int j = 0; j < experiment.GetSummation(); j++) {
size_t frame_number = image_number * experiment.GetSummation() + j;
acquisition_device[d]->Counters().WaitForFrame(frame_number + 2);
const int16_t *src;
if (acquisition_device[d]->Counters().IsAnyPacketCollected(frame_number, m)) {
src = acquisition_device[d]->GetDeviceOutput(frame_number, m)->pixels;
if (!send_image) {
Completion c = acquisition_device[d]->Counters().GetCompletion(frame_number, m);
// the information is for first module/frame that was collected in full
message.bunch_id = c.bunchid;
message.jf_info = c.debug;
message.storage_cell = (message.jf_info >> 8) & 0xF;
message.timestamp = c.timestamp;
message.exptime = c.exptime;
}
send_image = true;
if (profile)
profile->Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m));
adu_histogram_total.Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m));
adu_histogram_module[module_abs_number].Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m));
} else
src = acquisition_device[d]->GetErrorFrameBuffer();
transformation.ProcessModule(src, m, d);
acquisition_device[d]->FrameBufferRelease(frame_number, m);
UpdateMaxDelay(acquisition_device[d]->Counters().CalculateDelay(frame_number, m));
}
}
void JFJochReceiver::FrameTransformationThread() {
try {
@@ -441,56 +400,45 @@ void JFJochReceiver::FrameTransformationThread() {
}
}
if (experiment.GetSummation() >= threaded_summation_threshold) {
std::vector<std::future<void>> mini_summation_threads;
for (int d = 0; d < ndatastreams; d++)
for (int m = 0; m < experiment.GetModulesNum(d); m++)
mini_summation_threads.emplace_back(std::async(std::launch::async, &JFJochReceiver::MiniSummationThread,
this, d, m, image_number,
std::ref(send_image), std::ref(transformation),
std::ref(message), rad_int_profile_image.get()));
message.receiver_aq_dev_delay = max_delay;
} else {
for (int j = 0; j < experiment.GetSummation(); j++) {
frame_number = image_number * experiment.GetSummation() + j;
for (int j = 0; j < experiment.GetSummation(); j++) {
frame_number = image_number * experiment.GetSummation() + j;
for (int d = 0; d < ndatastreams; d++) {
acquisition_device[d]->Counters().WaitForFrame(frame_number + 2);
for (int d = 0; d < ndatastreams; d++) {
acquisition_device[d]->Counters().WaitForFrame(frame_number + 2);
for (int m = 0; m < experiment.GetModulesNum(d); m++) {
const int16_t *src;
if (acquisition_device[d]->Counters().IsAnyPacketCollected(frame_number, m)) {
src = acquisition_device[d]->GetDeviceOutput(frame_number, m)->pixels;
for (int m = 0; m < experiment.GetModulesNum(d); m++) {
const int16_t *src;
if (acquisition_device[d]->Counters().IsAnyPacketCollected(frame_number, m)) {
src = acquisition_device[d]->GetDeviceOutput(frame_number, m)->pixels;
if (!send_image) {
Completion c = acquisition_device[d]->Counters().GetCompletion(frame_number, m);
// the information is for first module/frame that was collected in full
message.bunch_id = c.bunchid;
message.jf_info = c.debug;
message.storage_cell = (message.jf_info >> 8) & 0xF;
message.timestamp = c.timestamp;
message.exptime = c.exptime;
}
send_image = true;
if (!send_image) {
Completion c = acquisition_device[d]->Counters().GetCompletion(frame_number, m);
// the information is for first module/frame that was collected in full
message.bunch_id = c.bunchid;
message.jf_info = c.debug;
message.storage_cell = (message.jf_info >> 8) & 0xF;
message.timestamp = c.timestamp;
message.exptime = c.exptime;
}
send_image = true;
size_t module_abs_number = experiment.GetFirstModuleOfDataStream(d) + m;
adu_histogram_module[module_abs_number].Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m));
adu_histogram_total.Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m));
size_t module_abs_number = experiment.GetFirstModuleOfDataStream(d) + m;
adu_histogram_module[module_abs_number].Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m));
adu_histogram_total.Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m));
if (rad_int_profile_image)
rad_int_profile_image->Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m));
} else
src = acquisition_device[d]->GetErrorFrameBuffer();
if (rad_int_profile_image)
rad_int_profile_image->Add(*acquisition_device[d]->GetDeviceOutput(frame_number, m));
} else
src = acquisition_device[d]->GetErrorFrameBuffer();
transformation.ProcessModule(src, m, d);
transformation.ProcessModule(src, m, d);
acquisition_device[d]->FrameBufferRelease(frame_number, m);
}
auto delay = acquisition_device[d]->Counters().CalculateDelay(frame_number);
UpdateMaxDelay(delay);
if (delay > message.receiver_aq_dev_delay)
message.receiver_aq_dev_delay = delay;
acquisition_device[d]->FrameBufferRelease(frame_number, m);
}
auto delay = acquisition_device[d]->Counters().CalculateDelay(frame_number);
UpdateMaxDelay(delay);
if (delay > message.receiver_aq_dev_delay)
message.receiver_aq_dev_delay = delay;
}
}

View File

@@ -108,9 +108,6 @@ class JFJochReceiver {
void AcquireThread(uint16_t data_stream);
void FrameTransformationThread();
void MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell);
void MiniSummationThread(int d, int m, size_t image_number, bool &send_image,
FrameTransformation &transformation, DataMessage &message,
RadialIntegrationProfile *profile);
void Cancel(const JFJochException &e);
void FinalizeMeasurement();
JFJochProtoBuf::DataProcessingSettings GetDataProcessingSettings();
@@ -118,8 +115,6 @@ class JFJochReceiver {
void UpdateMaxImage(int64_t image_number);
void UpdateMaxDelay(int64_t delay);
public:
constexpr static const int64_t threaded_summation_threshold = 50;
JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
std::vector<AcquisitionDevice *> &open_capi_device,
ImagePusher &image_pusher,