diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index 88c4d658..94e8fede 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -32,12 +32,12 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, acquisition_device(in_aq_device), logger(in_logger), image_pusher(in_image_sender), - frame_transformation_nthreads(in_forward_and_sum_nthreads), + frame_transformation_nthreads((experiment.GetSummation() >= threaded_summation_threshold) ? + 2 : in_forward_and_sum_nthreads), preview_publisher(in_preview_publisher), rad_int_profile_window(experiment.GetSpotFindingBin()) { ndatastreams = experiment.GetDataStreamsNum(); - if (settings.has_calibration()) { calib.emplace(settings.calibration()); one_byte_mask = calib->CalculateOneByteMask(experiment); @@ -254,6 +254,42 @@ int64_t JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t mod return delay; } +int64_t JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, bool &send_image, + FrameTransformation &transformation, DataMessage &message) { + int64_t max_thread_delay = 0; + + for (int j = 0; j < experiment.GetSummation(); j++) { + size_t frame_number = image_number * experiment.GetSummation() + j; + acquisition_device[d]->WaitForFrame(frame_number + 2); + + const int16_t *src; + + if (acquisition_device[d]->IsFullModuleCollected(frame_number, m)) { + src = acquisition_device[d]->GetFrameBuffer(frame_number, m); + + if (!send_image) { + // the information is for first module/frame that was collected in full + message.bunch_id = acquisition_device[d]->GetBunchID(frame_number, m); + message.jf_info = acquisition_device[d]->GetJFInfo(frame_number, m); + message.timestamp = acquisition_device[d]->GetTimestamp(frame_number, m); + } + send_image = true; + } else + src = acquisition_device[d]->GetErrorFrameBuffer(); + + if (experiment.GetConversionOnCPU()) { + auto &conv = fixed_point_conversion.at(experiment.GetFirstModuleOfDataStream(d) + m); + transformation.ProcessModule(conv, src, m, d); + } else + transformation.ProcessModule(src, m, d); + + acquisition_device[d]->FrameBufferRelease(frame_number, m); + + max_thread_delay = std::max(max_thread_delay, acquisition_device[d]->CalculateDelay(frame_number)); + } + return max_thread_delay; +} + int64_t JFJochReceiver::FrameTransformationThread() { FrameTransformation transformation(experiment); @@ -319,37 +355,50 @@ int64_t JFJochReceiver::FrameTransformationThread() { index = true; } - for (int j = 0; j < experiment.GetSummation(); j++) { - size_t frame_number = image_number * experiment.GetSummation() + j; + if (experiment.GetSummation() >= threaded_summation_threshold) { + std::vector> mini_summation_threads; + for (int d = 0; d < ndatastreams; d++) + for (int m = 0; m < experiment.GetModulesNum(d); m++) + mini_summation_threads.emplace_back(std::async(std::launch::async, &JFJochReceiver::MiniSummationThread, + this, d, m, image_number, + std::ref(send_image), std::ref(transformation), + std::ref(message))); + for (auto &i: mini_summation_threads) + max_thread_delay = std::max(max_thread_delay, i.get()); + } else { + for (int j = 0; j < experiment.GetSummation(); j++) { + size_t frame_number = image_number * experiment.GetSummation() + j; - for (int d = 0; d < ndatastreams; d++) { - acquisition_device[d]->WaitForFrame(frame_number + 2); + for (int d = 0; d < ndatastreams; d++) { + acquisition_device[d]->WaitForFrame(frame_number + 2); - for (int m = 0; m < experiment.GetModulesNum(d); m++) { - const int16_t *src; + for (int m = 0; m < experiment.GetModulesNum(d); m++) { + const int16_t *src; - if (acquisition_device[d]->IsFullModuleCollected(frame_number, m)) { - src = acquisition_device[d]->GetFrameBuffer(frame_number, m); + if (acquisition_device[d]->IsFullModuleCollected(frame_number, m)) { + src = acquisition_device[d]->GetFrameBuffer(frame_number, m); - if (!send_image) { - // the information is for first module/frame that was collected in full - message.bunch_id = acquisition_device[d]->GetBunchID(frame_number, m); - message.jf_info = acquisition_device[d]->GetJFInfo(frame_number, m); - message.timestamp = acquisition_device[d]->GetTimestamp(frame_number, m); - } - send_image = true; - } else - src = acquisition_device[d]->GetErrorFrameBuffer(); + if (!send_image) { + // the information is for first module/frame that was collected in full + message.bunch_id = acquisition_device[d]->GetBunchID(frame_number, m); + message.jf_info = acquisition_device[d]->GetJFInfo(frame_number, m); + message.timestamp = acquisition_device[d]->GetTimestamp(frame_number, m); + } + send_image = true; + } else + src = acquisition_device[d]->GetErrorFrameBuffer(); - if (experiment.GetConversionOnCPU()) { - auto &conv = fixed_point_conversion.at(experiment.GetFirstModuleOfDataStream(d) + m); - transformation.ProcessModule(conv, src, m, d); - } else - transformation.ProcessModule(src, m, d); + if (experiment.GetConversionOnCPU()) { + auto &conv = fixed_point_conversion.at(experiment.GetFirstModuleOfDataStream(d) + m); + transformation.ProcessModule(conv, src, m, d); + } else + transformation.ProcessModule(src, m, d); - acquisition_device[d]->FrameBufferRelease(frame_number, m); + acquisition_device[d]->FrameBufferRelease(frame_number, m); + } + max_thread_delay = std::max(max_thread_delay, + acquisition_device[d]->CalculateDelay(frame_number)); } - max_thread_delay = std::max(max_thread_delay, acquisition_device[d]->CalculateDelay(frame_number)); } } diff --git a/receiver/JFJochReceiver.h b/receiver/JFJochReceiver.h index e27fe77f..e3487882 100644 --- a/receiver/JFJochReceiver.h +++ b/receiver/JFJochReceiver.h @@ -94,7 +94,8 @@ class JFJochReceiver { int64_t AcquireThread(uint16_t data_stream); int64_t FrameTransformationThread(); int64_t MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell); - + int64_t MiniSummationThread(int d, int m, size_t image_number, bool &send_image, + FrameTransformation &transformation, DataMessage &message); void Abort(const JFJochException &e); void FinalizeMeasurement(); JFJochProtoBuf::DataProcessingSettings GetDataProcessingSettings(); @@ -104,6 +105,8 @@ class JFJochReceiver { void UpdateMaxImage(int64_t image_number); public: + constexpr static const int64_t threaded_summation_threshold = 50; + JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, std::vector &open_capi_device, ImagePusher &image_pusher, diff --git a/tests/JFJochFullIntegrationTest.cpp b/tests/JFJochFullIntegrationTest.cpp index 2073caf2..1a04e144 100644 --- a/tests/JFJochFullIntegrationTest.cpp +++ b/tests/JFJochFullIntegrationTest.cpp @@ -206,6 +206,120 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_2DataStreams_4Devices", "[JFJochReceiver]") writer_server->Shutdown(); } + +TEST_CASE("JFJochIntegrationTest_ZMQ_2Devices_Summation100", "[JFJochReceiver]") { + Logger logger("JFJochIntegrationTest_ZMQ_2Devices_Summation100"); + ZMQContext zmq_context; + + RegisterHDF5Filter(); + + int64_t nimages = 1; + int64_t ndatastream = 2; + int64_t nmodules = 2; + int64_t summation = 100; + + JFJochServices services(logger); + JFJochStateMachine state_machine(services, logger); + + REQUIRE(!state_machine.GetMeasurementStatistics().has_value()); + + state_machine.AddDetectorSetup(DetectorGeometry(ndatastream * nmodules, 1, 0, 0, false)); + + state_machine.NotThreadSafe_Experiment().DataStreams(ndatastream); + state_machine.NotThreadSafe_Experiment().PedestalG0Frames(0).PedestalG1Frames(0).PedestalG2Frames(0); + services.Writer("unix:writer_test", "inproc://#1").Receiver("unix:fpga_receiver_test"); + + logger.Verbose(true); + + std::vector image(RAW_MODULE_SIZE); + for (int i = 0; i < image.size(); i++) + image[i] = (i*7+i*i*3) % 29; + + std::vector> aq_devices; + + for (int i = 0; i < 2; i++) { + auto *test = new MockAcquisitionDevice(i, 512); + aq_devices.emplace_back(test); + } + + std::vector tmp_devices; + for (const auto &i: aq_devices) + tmp_devices.emplace_back(i.get()); + + ZMQImagePusher pusher(zmq_context, {"inproc://#1"}); + JFJochReceiverService fpga_receiver(tmp_devices, logger, pusher); + + ZMQPreviewPublisher preview(zmq_context, "inproc://#2"); + fpga_receiver.PreviewPublisher(&preview); + + JFJochWriterService writer(zmq_context, logger); + + auto fpga_receiver_server = gRPCServer("unix:fpga_receiver_test", fpga_receiver); + auto writer_server = gRPCServer("unix:writer_test", writer); + + REQUIRE_NOTHROW(state_machine.Initialize()); + logger.Info("Initialized"); + + JFJochProtoBuf::DatasetSettings setup; + setup.set_ntrigger(1); + setup.set_images_per_trigger(1); + setup.set_detector_distance_mm(100); + setup.set_file_prefix("integration_test_summation"); + setup.set_photon_energy_kev(12.4); + setup.set_data_file_count(2); + setup.set_summation(summation); + + REQUIRE_NOTHROW(state_machine.Start(setup)); + logger.Info("Started measurement"); + + JFJochProtoBuf::BrokerStatus status; + status = state_machine.GetStatus(); + REQUIRE(status.progress() == Approx(0.0)); + REQUIRE(status.broker_state() == JFJochProtoBuf::DATA_COLLECTION); + + for (int i = 0; i < ndatastream; i++) { + for (int m = 0; m < nmodules; m++) { + for (int image_num = 1; image_num <= nimages*summation; image_num++) + aq_devices[i]->AddModule(image_num, m, image.data()); + } + aq_devices[i]->Terminate(); + } + + REQUIRE_NOTHROW(state_machine.Stop()); + logger.Info("Stopped measurement"); + + status = state_machine.GetStatus(); + REQUIRE(status.broker_state() == JFJochProtoBuf::IDLE); + + auto tmp = state_machine.GetMeasurementStatistics(); + REQUIRE(tmp.has_value()); + auto statistics = tmp.value(); + + REQUIRE(statistics.collection_efficiency() == 1.0); + REQUIRE(statistics.images_collected() == 1); + REQUIRE(statistics.images_written() == 1); + REQUIRE(statistics.max_image_number_sent() == 0); + REQUIRE(!statistics.cancelled()); + REQUIRE(statistics.file_prefix() == "integration_test_summation"); + REQUIRE(statistics.detector_width() == 1030); + REQUIRE(statistics.detector_height() == 514*4); + REQUIRE(statistics.detector_pixel_depth() == 4); + + auto preview_image = services.GetPreviewFrame(); + REQUIRE(preview_image.pixel_depth() == 2); + REQUIRE(preview_image.data().size() == 514*1030*ndatastream*nmodules*2); + auto preview_image_content = (int16_t *) preview_image.data().data(); + REQUIRE(preview_image_content[0] == image[0] * summation); + REQUIRE(preview_image_content[5] == image[5] * summation); + REQUIRE(preview_image_content[36+302*1030] == image[36+300*1024] * summation); + + REQUIRE(preview_image_content[514*1030*2+7] == image[7] * summation); + + REQUIRE(preview_image_content[514*1030*3+1030*200+7] == image[7+1024*200] * summation); + fpga_receiver_server->Shutdown(); + writer_server->Shutdown(); +} + TEST_CASE("JFJochIntegrationTest_ZMQ_RAW", "[JFJochReceiver]") { Logger logger("JFJochIntegrationTest_ZMQ"); ZMQContext zmq_context;