JFJochReceiver: Use multithreaded implementation for summation of 50+ (assuming 100 microsecond to span thread, it is 0.5% overhead at 25 ms)
This commit is contained in:
+75
-26
@@ -32,12 +32,12 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
|
||||
acquisition_device(in_aq_device),
|
||||
logger(in_logger),
|
||||
image_pusher(in_image_sender),
|
||||
frame_transformation_nthreads(in_forward_and_sum_nthreads),
|
||||
frame_transformation_nthreads((experiment.GetSummation() >= threaded_summation_threshold) ?
|
||||
2 : in_forward_and_sum_nthreads),
|
||||
preview_publisher(in_preview_publisher),
|
||||
rad_int_profile_window(experiment.GetSpotFindingBin())
|
||||
{
|
||||
ndatastreams = experiment.GetDataStreamsNum();
|
||||
|
||||
if (settings.has_calibration()) {
|
||||
calib.emplace(settings.calibration());
|
||||
one_byte_mask = calib->CalculateOneByteMask(experiment);
|
||||
@@ -254,6 +254,42 @@ int64_t JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t mod
|
||||
return delay;
|
||||
}
|
||||
|
||||
int64_t JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, bool &send_image,
|
||||
FrameTransformation &transformation, DataMessage &message) {
|
||||
int64_t max_thread_delay = 0;
|
||||
|
||||
for (int j = 0; j < experiment.GetSummation(); j++) {
|
||||
size_t frame_number = image_number * experiment.GetSummation() + j;
|
||||
acquisition_device[d]->WaitForFrame(frame_number + 2);
|
||||
|
||||
const int16_t *src;
|
||||
|
||||
if (acquisition_device[d]->IsFullModuleCollected(frame_number, m)) {
|
||||
src = acquisition_device[d]->GetFrameBuffer(frame_number, m);
|
||||
|
||||
if (!send_image) {
|
||||
// the information is for first module/frame that was collected in full
|
||||
message.bunch_id = acquisition_device[d]->GetBunchID(frame_number, m);
|
||||
message.jf_info = acquisition_device[d]->GetJFInfo(frame_number, m);
|
||||
message.timestamp = acquisition_device[d]->GetTimestamp(frame_number, m);
|
||||
}
|
||||
send_image = true;
|
||||
} else
|
||||
src = acquisition_device[d]->GetErrorFrameBuffer();
|
||||
|
||||
if (experiment.GetConversionOnCPU()) {
|
||||
auto &conv = fixed_point_conversion.at(experiment.GetFirstModuleOfDataStream(d) + m);
|
||||
transformation.ProcessModule(conv, src, m, d);
|
||||
} else
|
||||
transformation.ProcessModule(src, m, d);
|
||||
|
||||
acquisition_device[d]->FrameBufferRelease(frame_number, m);
|
||||
|
||||
max_thread_delay = std::max(max_thread_delay, acquisition_device[d]->CalculateDelay(frame_number));
|
||||
}
|
||||
return max_thread_delay;
|
||||
}
|
||||
|
||||
int64_t JFJochReceiver::FrameTransformationThread() {
|
||||
|
||||
FrameTransformation transformation(experiment);
|
||||
@@ -319,37 +355,50 @@ int64_t JFJochReceiver::FrameTransformationThread() {
|
||||
index = true;
|
||||
}
|
||||
|
||||
for (int j = 0; j < experiment.GetSummation(); j++) {
|
||||
size_t frame_number = image_number * experiment.GetSummation() + j;
|
||||
if (experiment.GetSummation() >= threaded_summation_threshold) {
|
||||
std::vector<std::future<int64_t>> mini_summation_threads;
|
||||
for (int d = 0; d < ndatastreams; d++)
|
||||
for (int m = 0; m < experiment.GetModulesNum(d); m++)
|
||||
mini_summation_threads.emplace_back(std::async(std::launch::async, &JFJochReceiver::MiniSummationThread,
|
||||
this, d, m, image_number,
|
||||
std::ref(send_image), std::ref(transformation),
|
||||
std::ref(message)));
|
||||
for (auto &i: mini_summation_threads)
|
||||
max_thread_delay = std::max(max_thread_delay, i.get());
|
||||
} else {
|
||||
for (int j = 0; j < experiment.GetSummation(); j++) {
|
||||
size_t frame_number = image_number * experiment.GetSummation() + j;
|
||||
|
||||
for (int d = 0; d < ndatastreams; d++) {
|
||||
acquisition_device[d]->WaitForFrame(frame_number + 2);
|
||||
for (int d = 0; d < ndatastreams; d++) {
|
||||
acquisition_device[d]->WaitForFrame(frame_number + 2);
|
||||
|
||||
for (int m = 0; m < experiment.GetModulesNum(d); m++) {
|
||||
const int16_t *src;
|
||||
for (int m = 0; m < experiment.GetModulesNum(d); m++) {
|
||||
const int16_t *src;
|
||||
|
||||
if (acquisition_device[d]->IsFullModuleCollected(frame_number, m)) {
|
||||
src = acquisition_device[d]->GetFrameBuffer(frame_number, m);
|
||||
if (acquisition_device[d]->IsFullModuleCollected(frame_number, m)) {
|
||||
src = acquisition_device[d]->GetFrameBuffer(frame_number, m);
|
||||
|
||||
if (!send_image) {
|
||||
// the information is for first module/frame that was collected in full
|
||||
message.bunch_id = acquisition_device[d]->GetBunchID(frame_number, m);
|
||||
message.jf_info = acquisition_device[d]->GetJFInfo(frame_number, m);
|
||||
message.timestamp = acquisition_device[d]->GetTimestamp(frame_number, m);
|
||||
}
|
||||
send_image = true;
|
||||
} else
|
||||
src = acquisition_device[d]->GetErrorFrameBuffer();
|
||||
if (!send_image) {
|
||||
// the information is for first module/frame that was collected in full
|
||||
message.bunch_id = acquisition_device[d]->GetBunchID(frame_number, m);
|
||||
message.jf_info = acquisition_device[d]->GetJFInfo(frame_number, m);
|
||||
message.timestamp = acquisition_device[d]->GetTimestamp(frame_number, m);
|
||||
}
|
||||
send_image = true;
|
||||
} else
|
||||
src = acquisition_device[d]->GetErrorFrameBuffer();
|
||||
|
||||
if (experiment.GetConversionOnCPU()) {
|
||||
auto &conv = fixed_point_conversion.at(experiment.GetFirstModuleOfDataStream(d) + m);
|
||||
transformation.ProcessModule(conv, src, m, d);
|
||||
} else
|
||||
transformation.ProcessModule(src, m, d);
|
||||
if (experiment.GetConversionOnCPU()) {
|
||||
auto &conv = fixed_point_conversion.at(experiment.GetFirstModuleOfDataStream(d) + m);
|
||||
transformation.ProcessModule(conv, src, m, d);
|
||||
} else
|
||||
transformation.ProcessModule(src, m, d);
|
||||
|
||||
acquisition_device[d]->FrameBufferRelease(frame_number, m);
|
||||
acquisition_device[d]->FrameBufferRelease(frame_number, m);
|
||||
}
|
||||
max_thread_delay = std::max(max_thread_delay,
|
||||
acquisition_device[d]->CalculateDelay(frame_number));
|
||||
}
|
||||
max_thread_delay = std::max(max_thread_delay, acquisition_device[d]->CalculateDelay(frame_number));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -94,7 +94,8 @@ class JFJochReceiver {
|
||||
int64_t AcquireThread(uint16_t data_stream);
|
||||
int64_t FrameTransformationThread();
|
||||
int64_t MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell);
|
||||
|
||||
int64_t MiniSummationThread(int d, int m, size_t image_number, bool &send_image,
|
||||
FrameTransformation &transformation, DataMessage &message);
|
||||
void Abort(const JFJochException &e);
|
||||
void FinalizeMeasurement();
|
||||
JFJochProtoBuf::DataProcessingSettings GetDataProcessingSettings();
|
||||
@@ -104,6 +105,8 @@ class JFJochReceiver {
|
||||
|
||||
void UpdateMaxImage(int64_t image_number);
|
||||
public:
|
||||
constexpr static const int64_t threaded_summation_threshold = 50;
|
||||
|
||||
JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
|
||||
std::vector<AcquisitionDevice *> &open_capi_device,
|
||||
ImagePusher &image_pusher,
|
||||
|
||||
@@ -206,6 +206,120 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]")
|
||||
writer_server->Shutdown();
|
||||
}
|
||||
|
||||
|
||||
TEST_CASE("JFJochIntegrationTest_ZMQ_2Devices_Summation100", "[JFJochReceiver]") {
|
||||
Logger logger("JFJochIntegrationTest_ZMQ_2Devices_Summation100");
|
||||
ZMQContext zmq_context;
|
||||
|
||||
RegisterHDF5Filter();
|
||||
|
||||
int64_t nimages = 1;
|
||||
int64_t ndatastream = 2;
|
||||
int64_t nmodules = 2;
|
||||
int64_t summation = 100;
|
||||
|
||||
JFJochServices services(logger);
|
||||
JFJochStateMachine state_machine(services, logger);
|
||||
|
||||
REQUIRE(!state_machine.GetMeasurementStatistics().has_value());
|
||||
|
||||
state_machine.AddDetectorSetup(DetectorGeometry(ndatastream * nmodules, 1, 0, 0, false));
|
||||
|
||||
state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream);
|
||||
state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0);
|
||||
services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test");
|
||||
|
||||
logger.Verbose(true);
|
||||
|
||||
std::vector<uint16_t> image(RAW_MODULE_SIZE);
|
||||
for (int i = 0; i < image.size(); i++)
|
||||
image[i] = (i*7+i*i*3) % 29;
|
||||
|
||||
std::vector<std::unique_ptr<MockAcquisitionDevice>> aq_devices;
|
||||
|
||||
for (int i = 0; i < 2; i++) {
|
||||
auto *test = new MockAcquisitionDevice(i, 512);
|
||||
aq_devices.emplace_back(test);
|
||||
}
|
||||
|
||||
std::vector<AcquisitionDevice *> tmp_devices;
|
||||
for (const auto &i: aq_devices)
|
||||
tmp_devices.emplace_back(i.get());
|
||||
|
||||
ZMQImagePusher pusher(zmq_context, {"inproc://#1"});
|
||||
JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher);
|
||||
|
||||
ZMQPreviewPublisher preview(zmq_context, "inproc://#2");
|
||||
fpga_receiver.PreviewPublisher(&preview);
|
||||
|
||||
JFJochWriterService writer(zmq_context, logger);
|
||||
|
||||
auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver);
|
||||
auto writer_server = gRPCServer("unix:writer_test", writer);
|
||||
|
||||
REQUIRE_NOTHROW(state_machine.Initialize());
|
||||
logger.Info("Initialized");
|
||||
|
||||
JFJochProtoBuf::DatasetSettings setup;
|
||||
setup.set_ntrigger(1);
|
||||
setup.set_images_per_trigger(1);
|
||||
setup.set_detector_distance_mm(100);
|
||||
setup.set_file_prefix("integration_test_summation");
|
||||
setup.set_photon_energy_kev(12.4);
|
||||
setup.set_data_file_count(2);
|
||||
setup.set_summation(summation);
|
||||
|
||||
REQUIRE_NOTHROW(state_machine.Start(setup));
|
||||
logger.Info("Started measurement");
|
||||
|
||||
JFJochProtoBuf::BrokerStatus status;
|
||||
status = state_machine.GetStatus();
|
||||
REQUIRE(status.progress() == Approx(0.0));
|
||||
REQUIRE(status.broker_state() == JFJochProtoBuf::DATA_COLLECTION);
|
||||
|
||||
for (int i = 0; i < ndatastream; i++) {
|
||||
for (int m = 0; m < nmodules; m++) {
|
||||
for (int image_num = 1; image_num <= nimages*summation; image_num++)
|
||||
aq_devices[i]->AddModule(image_num, m, image.data());
|
||||
}
|
||||
aq_devices[i]->Terminate();
|
||||
}
|
||||
|
||||
REQUIRE_NOTHROW(state_machine.Stop());
|
||||
logger.Info("Stopped measurement");
|
||||
|
||||
status = state_machine.GetStatus();
|
||||
REQUIRE(status.broker_state() == JFJochProtoBuf::IDLE);
|
||||
|
||||
auto tmp = state_machine.GetMeasurementStatistics();
|
||||
REQUIRE(tmp.has_value());
|
||||
auto statistics = tmp.value();
|
||||
|
||||
REQUIRE(statistics.collection_efficiency() == 1.0);
|
||||
REQUIRE(statistics.images_collected() == 1);
|
||||
REQUIRE(statistics.images_written() == 1);
|
||||
REQUIRE(statistics.max_image_number_sent() == 0);
|
||||
REQUIRE(!statistics.cancelled());
|
||||
REQUIRE(statistics.file_prefix() == "integration_test_summation");
|
||||
REQUIRE(statistics.detector_width() == 1030);
|
||||
REQUIRE(statistics.detector_height() == 514*4);
|
||||
REQUIRE(statistics.detector_pixel_depth() == 4);
|
||||
|
||||
auto preview_image = services.GetPreviewFrame();
|
||||
REQUIRE(preview_image.pixel_depth() == 2);
|
||||
REQUIRE(preview_image.data().size() == 514*1030*ndatastream*nmodules*2);
|
||||
auto preview_image_content = (int16_t *) preview_image.data().data();
|
||||
REQUIRE(preview_image_content[0] == image[0] * summation);
|
||||
REQUIRE(preview_image_content[5] == image[5] * summation);
|
||||
REQUIRE(preview_image_content[36+302*1030] == image[36+300*1024] * summation);
|
||||
|
||||
REQUIRE(preview_image_content[514*1030*2+7] == image[7] * summation);
|
||||
|
||||
REQUIRE(preview_image_content[514*1030*3+1030*200+7] == image[7+1024*200] * summation);
|
||||
fpga_receiver_server->Shutdown();
|
||||
writer_server->Shutdown();
|
||||
}
|
||||
|
||||
TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") {
|
||||
Logger logger("JFJochIntegrationTest_ZMQ");
|
||||
ZMQContext zmq_context;
|
||||
|
||||
Reference in New Issue
Block a user