AcquisiotnCounters: Add more synchronization and avoid busy waiting

This commit is contained in:
2026-02-28 17:00:27 +01:00
parent 64067f7430
commit a9e88a9b6e

View File

@@ -7,8 +7,8 @@
#include "../common/JFJochException.h"
AcquisitionCounters::AcquisitionCounters()
: curr_frame_number(max_modules, 0), slowest_frame_number(0), fastest_frame_number(0),
total_packets(0), expected_frames(0), acquisition_finished(false) {}
: total_packets(0), slowest_frame_number(0), fastest_frame_number(0),
curr_frame_number(max_modules, 0), acquisition_finished(false), expected_frames(0) {}
void AcquisitionCounters::Reset(const DiffractionExperiment &experiment, uint16_t data_stream) {
std::unique_lock ul(m);
@@ -56,32 +56,31 @@ void AcquisitionCounters::UpdateCounters(const Completion *c) {
if (c->frame_number >= expected_frames)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"UpdateCounters frame number is out of bounds");
else {
if (curr_frame_number.at(c->module_number) < c->frame_number)
curr_frame_number.at(c->module_number) = c->frame_number;
if (c->frame_number > slowest_frame_number) {
slowest_frame_number = curr_frame_number[0];
for (int i = 1; i < nmodules; i++)
if (curr_frame_number[i] < slowest_frame_number)
slowest_frame_number = curr_frame_number[i];
}
if (curr_frame_number.at(c->module_number) < c->frame_number)
curr_frame_number.at(c->module_number) = c->frame_number;
if (c->frame_number > fastest_frame_number) {
fastest_frame_number = c->frame_number;
}
if (fastest_frame_number - slowest_frame_number > ThresholdFramesLost)
slowest_frame_number = fastest_frame_number - ThresholdFramesLost;
packets_collected.at(c->frame_number * nmodules + c->module_number) = c->packet_count;
handle_for_frame.at(c->frame_number * nmodules + c->module_number) = c->handle;
saved_completions.at(c->frame_number * nmodules + c->module_number) = *c;
total_packets += c->packet_count;
packets_per_module[c->module_number] += c->packet_count;
if (c->frame_number > slowest_frame_number) {
slowest_frame_number = curr_frame_number[0];
for (int i = 1; i < nmodules; i++)
if (curr_frame_number[i] < slowest_frame_number)
slowest_frame_number = curr_frame_number[i];
}
if (c->frame_number > fastest_frame_number) {
fastest_frame_number = c->frame_number;
}
if (fastest_frame_number - slowest_frame_number > ThresholdFramesLost)
slowest_frame_number = fastest_frame_number - ThresholdFramesLost;
packets_collected.at(c->frame_number * nmodules + c->module_number) = c->packet_count;
handle_for_frame.at(c->frame_number * nmodules + c->module_number) = c->handle;
saved_completions.at(c->frame_number * nmodules + c->module_number) = *c;
total_packets += c->packet_count;
packets_per_module[c->module_number] += c->packet_count;
data_updated.notify_all();
}
@@ -132,44 +131,50 @@ uint64_t AcquisitionCounters::GetCurrFrameNumber(uint16_t module_number) const {
if (module_number >= max_modules)
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds,
"GetCurrFrameNumber Wrong module number: " + std::to_string(module_number));
std::shared_lock sl(m);
return curr_frame_number[module_number];
}
uint64_t AcquisitionCounters::GetSlowestFrameNumber() const {
std::shared_lock sl(m);
return slowest_frame_number;
}
uint64_t AcquisitionCounters::GetFastestFrameNumber() const {
std::shared_lock sl(m);
return fastest_frame_number;
}
void AcquisitionCounters::WaitForFrame(size_t curr_frame, uint16_t module_number) const {
uint64_t slowest_head_tmp = (module_number == UINT16_MAX) ? GetSlowestFrameNumber() : GetCurrFrameNumber(module_number);
if (curr_frame == 0)
curr_frame = 1; // Cannot wait for frame 0, as this is initial value of slowest frame number, so waiting for frame 1 in this case
while (!acquisition_finished && (slowest_head_tmp < curr_frame)) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
slowest_head_tmp = (module_number == UINT16_MAX) ? GetSlowestFrameNumber() : GetCurrFrameNumber(module_number);
}
curr_frame = 1; // Cannot wait for frame 0, as this is initial value
std::shared_lock sl(m);
data_updated.wait(sl, [&] {
const uint64_t head = (module_number == UINT16_MAX)
? slowest_frame_number
: curr_frame_number.at(module_number);
return acquisition_finished || (head >= curr_frame);
});
}
int64_t AcquisitionCounters::CalculateDelay(size_t curr_frame, uint16_t module_number) const {
uint64_t slowest_head_tmp;
if (module_number == UINT16_MAX)
slowest_head_tmp = GetSlowestFrameNumber();
else
slowest_head_tmp = GetCurrFrameNumber(module_number);
if (slowest_head_tmp < curr_frame)
std::shared_lock sl(m);
const uint64_t head = (module_number == UINT16_MAX)
? slowest_frame_number
: curr_frame_number.at(module_number);
if (head < curr_frame)
return 0;
return slowest_head_tmp - curr_frame;
return static_cast<int64_t>(head - curr_frame);
}
bool AcquisitionCounters::IsAcquisitionFinished() const {
std::shared_lock sl(m);
return acquisition_finished;
}
bool AcquisitionCounters::IsFullModuleCollected(size_t frame, uint16_t module_number) const {
std::shared_lock sl(m);
if (frame >= expected_frames)
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds,
"IsFullModuleCollected Wrong frame number: " + std::to_string(frame));
@@ -183,6 +188,7 @@ bool AcquisitionCounters::IsFullModuleCollected(size_t frame, uint16_t module_nu
}
bool AcquisitionCounters::IsAnyPacketCollected(size_t frame, uint16_t module_number) const {
std::shared_lock sl(m);
if (frame >= expected_frames)
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds,
"IsFullModuleCollected Wrong frame number: " + std::to_string(frame));
@@ -196,19 +202,10 @@ bool AcquisitionCounters::IsAnyPacketCollected(size_t frame, uint16_t module_num
}
uint64_t AcquisitionCounters::GetTotalPackets() const {
std::shared_lock sl(m);
return total_packets;
}
uint64_t AcquisitionCounters::GetTotalPackets(uint16_t module_number) const {
std::unique_lock ul(m);
if (module_number >= nmodules)
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds,
"GetTotalPackets Wrong module number: " + std::to_string(module_number));
return packets_per_module[module_number];
}
uint64_t AcquisitionCounters::GetExpectedPacketsPerImage() const {
return expected_packets_per_module * nmodules;
}
@@ -218,13 +215,16 @@ uint64_t AcquisitionCounters::GetTotalExpectedPackets() const {
}
uint64_t AcquisitionCounters::GetTotalExpectedPacketsPerModule() const {
std::shared_lock sl(m);
return expected_frames * expected_packets_per_module;
}
uint64_t AcquisitionCounters::GetModuleNumber() const {
std::shared_lock sl(m);
return nmodules;
}
uint64_t AcquisitionCounters::GetBytesReceived() const {
return GetTotalPackets() * bytes_per_packet;
std::shared_lock sl(m);
return total_packets * bytes_per_packet;
}