From 7d945c8195098c2b2a444f0655a9402accef2ca0 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Fri, 4 Aug 2023 12:46:58 +0200 Subject: [PATCH] JFJochReceiver: send buffer is pointer with more flexibility --- receiver/JFJochReceiver.cpp | 340 +++++++++++++++++++----------------- receiver/JFJochReceiver.h | 2 +- 2 files changed, 176 insertions(+), 166 deletions(-) diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index 2c5b4cff..da3d989b 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -40,200 +40,209 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, data_acquisition_ready(ndatastreams), frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0), send_buffer_count(in_send_buffer_count), - send_buffer(send_buffer_size * send_buffer_count), indexing_solution_per_file(experiment.GetDataFileCount()), numa_policy(in_numa_policy) { - if (settings.has_calibration()) { - calib.emplace(settings.calibration()); - one_byte_mask = calib->CalculateOneByteMask(experiment); - } else { - one_byte_mask.resize(experiment.GetPixelsNum()); - for (auto &i: one_byte_mask) i = 1; - } + send_buffer = (uint8_t *) calloc(send_buffer_size, send_buffer_count); - for (uint32_t i = 0; i < send_buffer_count; i++) { - send_buffer_avail.Put(i); - send_buffer_zero_copy_ret_val.emplace_back(send_buffer_avail, i); - } - - if (experiment.GetConversionOnCPU()) - PrepareConversionOnCPU(); - - if (!experiment.CheckGitSha1Consistent()) - logger.Warning(experiment.CheckGitSha1Msg()); - - push_images_to_writer = (experiment.GetImageNum() > 0) && (!experiment.GetFilePrefix().empty()); - - if (acquisition_device.size() < ndatastreams) - throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, - "Number of acquisition devices has to match data streams"); - if (frame_transformation_nthreads <= 0) - throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, - "Number of threads must be more than zero"); - - preview_stride = experiment.GetPreviewStride(); - spotfinder_stride = experiment.GetSpotFindingStride(); - - logger.Info("NUMA policy: {}", numa_policy.GetName()); - logger.Info("Image stride for data analysis: preview {}, spot finding/radial integration {}", - preview_stride, spotfinder_stride); - - if (experiment.GetDetectorMode() == DetectorMode::Conversion) { - if (preview_publisher != nullptr) - preview_publisher->Start(experiment, calib.value()); - if (preview_publisher_indexed != nullptr) - preview_publisher_indexed->Start(experiment, calib.value()); - - if (!GPUImageAnalysis::GPUPresent()) - logger.Info("GPU support missing"); - - rad_int_mapping = std::make_unique(experiment, one_byte_mask.data()); - rad_int_profile = std::make_unique(*rad_int_mapping, experiment); - rad_int_corr = CalcRadIntCorr(experiment); - - for (int i = 0; i < experiment.GetDataFileCount(); i++) - rad_int_profile_per_file.emplace_back(std::make_unique(*rad_int_mapping, experiment)); - - spot_finder_mask = calib->CalculateOneByteMask(experiment); - } - - for (int d = 0; d < ndatastreams; d++) { - if (calib) - acquisition_device[d]->InitializeCalibration(experiment, calib.value()); - acquisition_device[d]->PrepareAction(experiment); - - logger.Debug("Acquisition device {} prepared", d); - } - - for (int d = 0; d < ndatastreams; d++) - data_acquisition_futures.emplace_back(std::async(std::launch::async, &JFJochReceiver::AcquireThread, - this, d)); - - logger.Info("Data acquisition devices ready"); - - if ((experiment.GetDetectorMode() == DetectorMode::PedestalG0) - || (experiment.GetDetectorMode() == DetectorMode::PedestalG1) - || (experiment.GetDetectorMode() == DetectorMode::PedestalG2)) { - - if (experiment.GetImageNum() > 0) { - throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, - "Saving and calculating pedestal is not supported for the time being"); + try { + if (settings.has_calibration()) { + calib.emplace(settings.calibration()); + one_byte_mask = calib->CalculateOneByteMask(experiment); + } else { + one_byte_mask.resize(experiment.GetPixelsNum()); + for (auto &i: one_byte_mask) i = 1; } - if (experiment.GetDetectorMode() == DetectorMode::PedestalG0) { - pedestal_result.resize(experiment.GetModulesNum() * experiment.GetStorageCellNumber()); - for (int s = 0; s < experiment.GetStorageCellNumber(); s++) { + for (uint32_t i = 0; i < send_buffer_count; i++) { + send_buffer_avail.Put(i); + send_buffer_zero_copy_ret_val.emplace_back(send_buffer_avail, i); + } + + if (experiment.GetConversionOnCPU()) + PrepareConversionOnCPU(); + + if (!experiment.CheckGitSha1Consistent()) + logger.Warning(experiment.CheckGitSha1Msg()); + + push_images_to_writer = (experiment.GetImageNum() > 0) && (!experiment.GetFilePrefix().empty()); + + if (acquisition_device.size() < ndatastreams) + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, + "Number of acquisition devices has to match data streams"); + if (frame_transformation_nthreads <= 0) + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, + "Number of threads must be more than zero"); + + preview_stride = experiment.GetPreviewStride(); + spotfinder_stride = experiment.GetSpotFindingStride(); + + logger.Info("NUMA policy: {}", numa_policy.GetName()); + logger.Info("Image stride for data analysis: preview {}, spot finding/radial integration {}", + preview_stride, spotfinder_stride); + + if (experiment.GetDetectorMode() == DetectorMode::Conversion) { + if (preview_publisher != nullptr) + preview_publisher->Start(experiment, calib.value()); + if (preview_publisher_indexed != nullptr) + preview_publisher_indexed->Start(experiment, calib.value()); + + if (!GPUImageAnalysis::GPUPresent()) + logger.Info("GPU support missing"); + + rad_int_mapping = std::make_unique(experiment, one_byte_mask.data()); + rad_int_profile = std::make_unique(*rad_int_mapping, experiment); + rad_int_corr = CalcRadIntCorr(experiment); + + for (int i = 0; i < experiment.GetDataFileCount(); i++) + rad_int_profile_per_file.emplace_back( + std::make_unique(*rad_int_mapping, experiment)); + + spot_finder_mask = calib->CalculateOneByteMask(experiment); + } + + for (int d = 0; d < ndatastreams; d++) { + if (calib) + acquisition_device[d]->InitializeCalibration(experiment, calib.value()); + acquisition_device[d]->PrepareAction(experiment); + + logger.Debug("Acquisition device {} prepared", d); + } + + for (int d = 0; d < ndatastreams; d++) + data_acquisition_futures.emplace_back(std::async(std::launch::async, &JFJochReceiver::AcquireThread, + this, d)); + + logger.Info("Data acquisition devices ready"); + + if ((experiment.GetDetectorMode() == DetectorMode::PedestalG0) + || (experiment.GetDetectorMode() == DetectorMode::PedestalG1) + || (experiment.GetDetectorMode() == DetectorMode::PedestalG2)) { + + if (experiment.GetImageNum() > 0) { + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, + "Saving and calculating pedestal is not supported for the time being"); + } + + if (experiment.GetDetectorMode() == DetectorMode::PedestalG0) { + pedestal_result.resize(experiment.GetModulesNum() * experiment.GetStorageCellNumber()); + for (int s = 0; s < experiment.GetStorageCellNumber(); s++) { + for (int d = 0; d < ndatastreams; d++) { + for (int m = 0; m < experiment.GetModulesNum(d); m++) { + auto handle = std::async(std::launch::async, &JFJochReceiver::MeasurePedestalThread, this, + d, m, + s); + frame_transformation_futures.emplace_back(std::move(handle)); + } + } + } + } else { + pedestal_result.resize(experiment.GetModulesNum()); for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < experiment.GetModulesNum(d); m++) { auto handle = std::async(std::launch::async, &JFJochReceiver::MeasurePedestalThread, this, d, m, - s); + 0); frame_transformation_futures.emplace_back(std::move(handle)); } } } - } else { - pedestal_result.resize(experiment.GetModulesNum()); - for (int d = 0; d < ndatastreams; d++) { - for (int m = 0; m < experiment.GetModulesNum(d); m++) { - auto handle = std::async(std::launch::async, &JFJochReceiver::MeasurePedestalThread, this, d, m, 0); - frame_transformation_futures.emplace_back(std::move(handle)); - } - } + + logger.Info("Pedestal threads ready"); } - logger.Info("Pedestal threads ready"); - } + if (experiment.GetImageNum() > 0) { + logger.Info("Data file count {}", experiment.GetDataFileCount()); - if (experiment.GetImageNum() > 0) { - logger.Info("Data file count {}", experiment.GetDataFileCount()); + if (push_images_to_writer) { + StartMessage message{}; + experiment.FillMessage(message); + message.arm_date = time_UTC(std::chrono::system_clock::now()); - if (push_images_to_writer) { - StartMessage message{}; - experiment.FillMessage(message); - message.arm_date = time_UTC(std::chrono::system_clock::now()); + JFJochBitShuffleCompressor compressor(CompressionAlgorithm::BSHUF_LZ4); + std::vector pixel_mask; + std::vector > pedestal; - JFJochBitShuffleCompressor compressor(CompressionAlgorithm::BSHUF_LZ4); - std::vector pixel_mask; - std::vector > pedestal; + if (calib) { + size_t xpixel = experiment.GetXPixelsNum(); + size_t ypixel = experiment.GetYPixelsNum(); - if (calib) { - size_t xpixel = experiment.GetXPixelsNum(); - size_t ypixel = experiment.GetYPixelsNum(); + pixel_mask = compressor.Compress(calib->CalculateNexusMask(experiment, 0)); + message.AddPixelMask(CBORImage{ + .data = pixel_mask.data(), + .size = pixel_mask.size(), + .xpixel = (size_t) xpixel, + .ypixel = (size_t) ypixel, + .pixel_depth_bytes = 4, + .pixel_is_signed = false, + .pixel_is_float = false, + .algorithm = CompressionAlgorithm::BSHUF_LZ4, + .channel = "sc0" + }); + if (experiment.GetSaveCalibration()) { + for (int sc = 0; sc < experiment.GetStorageCellNumber(); sc++) { + for (int gain = 0; gain < 3; gain++) { + auto tmp = compressor.Compress(calib->GetPedestal(gain, sc)); + pedestal.emplace_back(tmp); + std::string channel = "pedestal_G" + std::to_string(gain); - pixel_mask = compressor.Compress(calib->CalculateNexusMask(experiment, 0)); - message.AddPixelMask(CBORImage{ - .data = pixel_mask.data(), - .size = pixel_mask.size(), - .xpixel = (size_t) xpixel, - .ypixel = (size_t) ypixel, - .pixel_depth_bytes = 4, - .pixel_is_signed = false, - .pixel_is_float = false, - .algorithm = CompressionAlgorithm::BSHUF_LZ4, - .channel = "sc0" - }); - if (experiment.GetSaveCalibration()) { - for (int sc = 0; sc < experiment.GetStorageCellNumber(); sc++) { - for (int gain = 0; gain < 3; gain++) { - auto tmp = compressor.Compress(calib->GetPedestal(gain, sc)); - pedestal.emplace_back(tmp); - std::string channel = "pedestal_G" + std::to_string(gain); + if (experiment.GetStorageCellNumber() > 1) + channel += "_sc" + std::to_string(sc); - if (experiment.GetStorageCellNumber() > 1) - channel += "_sc" + std::to_string(sc); + CBORImage image{ + .data = pedestal.at(pedestal.size() - 1).data(), + .size = pedestal.at(pedestal.size() - 1).size(), + .xpixel = (size_t) xpixel, + .ypixel = (size_t) ypixel, + .pixel_depth_bytes = 2, + .pixel_is_signed = false, + .pixel_is_float = false, + .algorithm = CompressionAlgorithm::BSHUF_LZ4, + .channel = channel + }; - CBORImage image{ - .data = pedestal.at(pedestal.size() - 1).data(), - .size = pedestal.at(pedestal.size() - 1).size(), - .xpixel = (size_t) xpixel, - .ypixel = (size_t) ypixel, - .pixel_depth_bytes = 2, - .pixel_is_signed = false, - .pixel_is_float = false, - .algorithm = CompressionAlgorithm::BSHUF_LZ4, - .channel = channel - }; - - message.AddCalibration(image); + message.AddCalibration(image); + } } } } + + if (rad_int_mapping) { + message.rad_int_bin_number = rad_int_mapping->GetBinNumber(); + message.rad_int_bin_to_q = rad_int_mapping->GetBinToQ(); + message.rad_int_solid_angle_corr = rad_int_mapping->GetSolidAngleCorr(); + } else + message.rad_int_bin_number = 0; + + image_pusher.StartDataCollection(message); } - if (rad_int_mapping) { - message.rad_int_bin_number = rad_int_mapping->GetBinNumber(); - message.rad_int_bin_to_q = rad_int_mapping->GetBinToQ(); - message.rad_int_solid_angle_corr = rad_int_mapping->GetSolidAngleCorr(); - } else - message.rad_int_bin_number = 0; + for (int i = 0; i < experiment.GetImageNum(); i++) + images_to_go.Put(i); - image_pusher.StartDataCollection(message); + // Setup frames summation and forwarding + for (int i = 0; i < frame_transformation_nthreads; i++) { + auto handle = std::async(std::launch::async, &JFJochReceiver::FrameTransformationThread, this); + frame_transformation_futures.emplace_back(std::move(handle)); + } + + logger.Info("Image compression/forwarding threads started"); + + frame_transformation_ready.wait(); + logger.Info("Image compression/forwarding threads ready"); } - for (int i = 0; i < experiment.GetImageNum(); i++) - images_to_go.Put(i); + data_acquisition_ready.wait(); - // Setup frames summation and forwarding - for (int i = 0; i < frame_transformation_nthreads; i++) { - auto handle = std::async(std::launch::async, &JFJochReceiver::FrameTransformationThread, this); - frame_transformation_futures.emplace_back(std::move(handle)); - } + logger.Info("Acquisition devices ready"); - logger.Info("Image compression/forwarding threads started"); + start_time = std::chrono::system_clock::now(); + logger.Info("Receiving data started"); - frame_transformation_ready.wait(); - logger.Info("Image compression/forwarding threads ready"); + measurement = std::async(std::launch::async, &JFJochReceiver::FinalizeMeasurement, this); + } catch (...) { + free(send_buffer); + throw; } - - data_acquisition_ready.wait(); - - logger.Info("Acquisition devices ready"); - - start_time = std::chrono::system_clock::now(); - logger.Info("Receiving data started"); - - measurement = std::async(std::launch::async, &JFJochReceiver::FinalizeMeasurement, this); } void JFJochReceiver::AcquireThread(uint16_t data_stream) { @@ -323,11 +332,11 @@ void JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t module pedestal_result[offset].SetCollectionTime(start_time.time_since_epoch().count() / 1e9); } catch (const JFJochException &e) { Cancel(e); } logger.Info("Pedestal calculation thread for data stream {} module {} storage cell {} -> header {} done", - data_stream, module_number, storage_cell, storage_cell_header); + data_stream, module_number, storage_cell, storage_cell_header); } void JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, bool &send_image, - FrameTransformation &transformation, DataMessage &message) { + FrameTransformation &transformation, DataMessage &message) { for (int j = 0; j < experiment.GetSummation(); j++) { size_t frame_number = image_number * experiment.GetSummation() + j; acquisition_device[d]->Counters().WaitForFrame(frame_number + 2); @@ -370,7 +379,7 @@ void JFJochReceiver::FrameTransformationThread() { FrameTransformation transformation(experiment); - std::unique_ptr spot_finder; + std::unique_ptr spot_finder; try { if (rad_int_mapping) { @@ -542,7 +551,7 @@ void JFJochReceiver::FrameTransformationThread() { if (image_number % experiment.GetDataFileCount() < rad_int_profile_per_file.size()) rad_int_profile_per_file[image_number % experiment.GetDataFileCount()] ->Add(spot_finder->GetRadialIntegrationSum(), - spot_finder->GetRadialIntegrationCount()); + spot_finder->GetRadialIntegrationCount()); } if (send_preview) @@ -554,7 +563,7 @@ void JFJochReceiver::FrameTransformationThread() { message.receiver_available_send_buffers = GetAvailableSendBuffers(); auto send_buffer_handle = send_buffer_avail.GetBlocking(); - auto ptr = send_buffer.data() + send_buffer_size * send_buffer_handle; + auto ptr = send_buffer + send_buffer_size * send_buffer_handle; JFJochFrameSerializer serializer(ptr, send_buffer_size); PrepareCBORImage(message, experiment, nullptr, 0); serializer.SerializeImage(message); @@ -726,6 +735,7 @@ void JFJochReceiver::StopReceiver() { JFJochReceiver::~JFJochReceiver() { if (measurement.valid()) measurement.get(); + free(send_buffer); } JFJochProtoBuf::DataProcessingSettings JFJochReceiver::GetDataProcessingSettings() { diff --git a/receiver/JFJochReceiver.h b/receiver/JFJochReceiver.h index 438ad71c..4f1ce8c1 100644 --- a/receiver/JFJochReceiver.h +++ b/receiver/JFJochReceiver.h @@ -99,7 +99,7 @@ class JFJochReceiver { const size_t send_buffer_count; ThreadSafeFIFO send_buffer_avail; - std::vector send_buffer; + uint8_t *send_buffer; std::vector send_buffer_zero_copy_ret_val; NUMAHWPolicy numa_policy;