JFJochReceiver: send buffer is pointer with more flexibility

This commit is contained in:
2023-08-04 12:46:58 +02:00
parent 37afbce802
commit 7d945c8195
2 changed files with 176 additions and 166 deletions
+175 -165
View File
@@ -40,200 +40,209 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
data_acquisition_ready(ndatastreams),
frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0),
send_buffer_count(in_send_buffer_count),
send_buffer(send_buffer_size * send_buffer_count),
indexing_solution_per_file(experiment.GetDataFileCount()),
numa_policy(in_numa_policy)
{
if (settings.has_calibration()) {
calib.emplace(settings.calibration());
one_byte_mask = calib->CalculateOneByteMask(experiment);
} else {
one_byte_mask.resize(experiment.GetPixelsNum());
for (auto &i: one_byte_mask) i = 1;
}
send_buffer = (uint8_t *) calloc(send_buffer_size, send_buffer_count);
for (uint32_t i = 0; i < send_buffer_count; i++) {
send_buffer_avail.Put(i);
send_buffer_zero_copy_ret_val.emplace_back(send_buffer_avail, i);
}
if (experiment.GetConversionOnCPU())
PrepareConversionOnCPU();
if (!experiment.CheckGitSha1Consistent())
logger.Warning(experiment.CheckGitSha1Msg());
push_images_to_writer = (experiment.GetImageNum() > 0) && (!experiment.GetFilePrefix().empty());
if (acquisition_device.size() < ndatastreams)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Number of acquisition devices has to match data streams");
if (frame_transformation_nthreads <= 0)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Number of threads must be more than zero");
preview_stride = experiment.GetPreviewStride();
spotfinder_stride = experiment.GetSpotFindingStride();
logger.Info("NUMA policy: {}", numa_policy.GetName());
logger.Info("Image stride for data analysis: preview {}, spot finding/radial integration {}",
preview_stride, spotfinder_stride);
if (experiment.GetDetectorMode() == DetectorMode::Conversion) {
if (preview_publisher != nullptr)
preview_publisher->Start(experiment, calib.value());
if (preview_publisher_indexed != nullptr)
preview_publisher_indexed->Start(experiment, calib.value());
if (!GPUImageAnalysis::GPUPresent())
logger.Info("GPU support missing");
rad_int_mapping = std::make_unique<RadialIntegrationMapping>(experiment, one_byte_mask.data());
rad_int_profile = std::make_unique<RadialIntegrationProfile>(*rad_int_mapping, experiment);
rad_int_corr = CalcRadIntCorr(experiment);
for (int i = 0; i < experiment.GetDataFileCount(); i++)
rad_int_profile_per_file.emplace_back(std::make_unique<RadialIntegrationProfile>(*rad_int_mapping, experiment));
spot_finder_mask = calib->CalculateOneByteMask(experiment);
}
for (int d = 0; d < ndatastreams; d++) {
if (calib)
acquisition_device[d]->InitializeCalibration(experiment, calib.value());
acquisition_device[d]->PrepareAction(experiment);
logger.Debug("Acquisition device {} prepared", d);
}
for (int d = 0; d < ndatastreams; d++)
data_acquisition_futures.emplace_back(std::async(std::launch::async, &JFJochReceiver::AcquireThread,
this, d));
logger.Info("Data acquisition devices ready");
if ((experiment.GetDetectorMode() == DetectorMode::PedestalG0)
|| (experiment.GetDetectorMode() == DetectorMode::PedestalG1)
|| (experiment.GetDetectorMode() == DetectorMode::PedestalG2)) {
if (experiment.GetImageNum() > 0) {
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Saving and calculating pedestal is not supported for the time being");
try {
if (settings.has_calibration()) {
calib.emplace(settings.calibration());
one_byte_mask = calib->CalculateOneByteMask(experiment);
} else {
one_byte_mask.resize(experiment.GetPixelsNum());
for (auto &i: one_byte_mask) i = 1;
}
if (experiment.GetDetectorMode() == DetectorMode::PedestalG0) {
pedestal_result.resize(experiment.GetModulesNum() * experiment.GetStorageCellNumber());
for (int s = 0; s < experiment.GetStorageCellNumber(); s++) {
for (uint32_t i = 0; i < send_buffer_count; i++) {
send_buffer_avail.Put(i);
send_buffer_zero_copy_ret_val.emplace_back(send_buffer_avail, i);
}
if (experiment.GetConversionOnCPU())
PrepareConversionOnCPU();
if (!experiment.CheckGitSha1Consistent())
logger.Warning(experiment.CheckGitSha1Msg());
push_images_to_writer = (experiment.GetImageNum() > 0) && (!experiment.GetFilePrefix().empty());
if (acquisition_device.size() < ndatastreams)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Number of acquisition devices has to match data streams");
if (frame_transformation_nthreads <= 0)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Number of threads must be more than zero");
preview_stride = experiment.GetPreviewStride();
spotfinder_stride = experiment.GetSpotFindingStride();
logger.Info("NUMA policy: {}", numa_policy.GetName());
logger.Info("Image stride for data analysis: preview {}, spot finding/radial integration {}",
preview_stride, spotfinder_stride);
if (experiment.GetDetectorMode() == DetectorMode::Conversion) {
if (preview_publisher != nullptr)
preview_publisher->Start(experiment, calib.value());
if (preview_publisher_indexed != nullptr)
preview_publisher_indexed->Start(experiment, calib.value());
if (!GPUImageAnalysis::GPUPresent())
logger.Info("GPU support missing");
rad_int_mapping = std::make_unique<RadialIntegrationMapping>(experiment, one_byte_mask.data());
rad_int_profile = std::make_unique<RadialIntegrationProfile>(*rad_int_mapping, experiment);
rad_int_corr = CalcRadIntCorr(experiment);
for (int i = 0; i < experiment.GetDataFileCount(); i++)
rad_int_profile_per_file.emplace_back(
std::make_unique<RadialIntegrationProfile>(*rad_int_mapping, experiment));
spot_finder_mask = calib->CalculateOneByteMask(experiment);
}
for (int d = 0; d < ndatastreams; d++) {
if (calib)
acquisition_device[d]->InitializeCalibration(experiment, calib.value());
acquisition_device[d]->PrepareAction(experiment);
logger.Debug("Acquisition device {} prepared", d);
}
for (int d = 0; d < ndatastreams; d++)
data_acquisition_futures.emplace_back(std::async(std::launch::async, &JFJochReceiver::AcquireThread,
this, d));
logger.Info("Data acquisition devices ready");
if ((experiment.GetDetectorMode() == DetectorMode::PedestalG0)
|| (experiment.GetDetectorMode() == DetectorMode::PedestalG1)
|| (experiment.GetDetectorMode() == DetectorMode::PedestalG2)) {
if (experiment.GetImageNum() > 0) {
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Saving and calculating pedestal is not supported for the time being");
}
if (experiment.GetDetectorMode() == DetectorMode::PedestalG0) {
pedestal_result.resize(experiment.GetModulesNum() * experiment.GetStorageCellNumber());
for (int s = 0; s < experiment.GetStorageCellNumber(); s++) {
for (int d = 0; d < ndatastreams; d++) {
for (int m = 0; m < experiment.GetModulesNum(d); m++) {
auto handle = std::async(std::launch::async, &JFJochReceiver::MeasurePedestalThread, this,
d, m,
s);
frame_transformation_futures.emplace_back(std::move(handle));
}
}
}
} else {
pedestal_result.resize(experiment.GetModulesNum());
for (int d = 0; d < ndatastreams; d++) {
for (int m = 0; m < experiment.GetModulesNum(d); m++) {
auto handle = std::async(std::launch::async, &JFJochReceiver::MeasurePedestalThread, this, d, m,
s);
0);
frame_transformation_futures.emplace_back(std::move(handle));
}
}
}
} else {
pedestal_result.resize(experiment.GetModulesNum());
for (int d = 0; d < ndatastreams; d++) {
for (int m = 0; m < experiment.GetModulesNum(d); m++) {
auto handle = std::async(std::launch::async, &JFJochReceiver::MeasurePedestalThread, this, d, m, 0);
frame_transformation_futures.emplace_back(std::move(handle));
}
}
logger.Info("Pedestal threads ready");
}
logger.Info("Pedestal threads ready");
}
if (experiment.GetImageNum() > 0) {
logger.Info("Data file count {}", experiment.GetDataFileCount());
if (experiment.GetImageNum() > 0) {
logger.Info("Data file count {}", experiment.GetDataFileCount());
if (push_images_to_writer) {
StartMessage message{};
experiment.FillMessage(message);
message.arm_date = time_UTC(std::chrono::system_clock::now());
if (push_images_to_writer) {
StartMessage message{};
experiment.FillMessage(message);
message.arm_date = time_UTC(std::chrono::system_clock::now());
JFJochBitShuffleCompressor compressor(CompressionAlgorithm::BSHUF_LZ4);
std::vector<uint8_t> pixel_mask;
std::vector<std::vector<uint8_t> > pedestal;
JFJochBitShuffleCompressor compressor(CompressionAlgorithm::BSHUF_LZ4);
std::vector<uint8_t> pixel_mask;
std::vector<std::vector<uint8_t> > pedestal;
if (calib) {
size_t xpixel = experiment.GetXPixelsNum();
size_t ypixel = experiment.GetYPixelsNum();
if (calib) {
size_t xpixel = experiment.GetXPixelsNum();
size_t ypixel = experiment.GetYPixelsNum();
pixel_mask = compressor.Compress(calib->CalculateNexusMask(experiment, 0));
message.AddPixelMask(CBORImage{
.data = pixel_mask.data(),
.size = pixel_mask.size(),
.xpixel = (size_t) xpixel,
.ypixel = (size_t) ypixel,
.pixel_depth_bytes = 4,
.pixel_is_signed = false,
.pixel_is_float = false,
.algorithm = CompressionAlgorithm::BSHUF_LZ4,
.channel = "sc0"
});
if (experiment.GetSaveCalibration()) {
for (int sc = 0; sc < experiment.GetStorageCellNumber(); sc++) {
for (int gain = 0; gain < 3; gain++) {
auto tmp = compressor.Compress(calib->GetPedestal(gain, sc));
pedestal.emplace_back(tmp);
std::string channel = "pedestal_G" + std::to_string(gain);
pixel_mask = compressor.Compress(calib->CalculateNexusMask(experiment, 0));
message.AddPixelMask(CBORImage{
.data = pixel_mask.data(),
.size = pixel_mask.size(),
.xpixel = (size_t) xpixel,
.ypixel = (size_t) ypixel,
.pixel_depth_bytes = 4,
.pixel_is_signed = false,
.pixel_is_float = false,
.algorithm = CompressionAlgorithm::BSHUF_LZ4,
.channel = "sc0"
});
if (experiment.GetSaveCalibration()) {
for (int sc = 0; sc < experiment.GetStorageCellNumber(); sc++) {
for (int gain = 0; gain < 3; gain++) {
auto tmp = compressor.Compress(calib->GetPedestal(gain, sc));
pedestal.emplace_back(tmp);
std::string channel = "pedestal_G" + std::to_string(gain);
if (experiment.GetStorageCellNumber() > 1)
channel += "_sc" + std::to_string(sc);
if (experiment.GetStorageCellNumber() > 1)
channel += "_sc" + std::to_string(sc);
CBORImage image{
.data = pedestal.at(pedestal.size() - 1).data(),
.size = pedestal.at(pedestal.size() - 1).size(),
.xpixel = (size_t) xpixel,
.ypixel = (size_t) ypixel,
.pixel_depth_bytes = 2,
.pixel_is_signed = false,
.pixel_is_float = false,
.algorithm = CompressionAlgorithm::BSHUF_LZ4,
.channel = channel
};
CBORImage image{
.data = pedestal.at(pedestal.size() - 1).data(),
.size = pedestal.at(pedestal.size() - 1).size(),
.xpixel = (size_t) xpixel,
.ypixel = (size_t) ypixel,
.pixel_depth_bytes = 2,
.pixel_is_signed = false,
.pixel_is_float = false,
.algorithm = CompressionAlgorithm::BSHUF_LZ4,
.channel = channel
};
message.AddCalibration(image);
message.AddCalibration(image);
}
}
}
}
if (rad_int_mapping) {
message.rad_int_bin_number = rad_int_mapping->GetBinNumber();
message.rad_int_bin_to_q = rad_int_mapping->GetBinToQ();
message.rad_int_solid_angle_corr = rad_int_mapping->GetSolidAngleCorr();
} else
message.rad_int_bin_number = 0;
image_pusher.StartDataCollection(message);
}
if (rad_int_mapping) {
message.rad_int_bin_number = rad_int_mapping->GetBinNumber();
message.rad_int_bin_to_q = rad_int_mapping->GetBinToQ();
message.rad_int_solid_angle_corr = rad_int_mapping->GetSolidAngleCorr();
} else
message.rad_int_bin_number = 0;
for (int i = 0; i < experiment.GetImageNum(); i++)
images_to_go.Put(i);
image_pusher.StartDataCollection(message);
// Setup frames summation and forwarding
for (int i = 0; i < frame_transformation_nthreads; i++) {
auto handle = std::async(std::launch::async, &JFJochReceiver::FrameTransformationThread, this);
frame_transformation_futures.emplace_back(std::move(handle));
}
logger.Info("Image compression/forwarding threads started");
frame_transformation_ready.wait();
logger.Info("Image compression/forwarding threads ready");
}
for (int i = 0; i < experiment.GetImageNum(); i++)
images_to_go.Put(i);
data_acquisition_ready.wait();
// Setup frames summation and forwarding
for (int i = 0; i < frame_transformation_nthreads; i++) {
auto handle = std::async(std::launch::async, &JFJochReceiver::FrameTransformationThread, this);
frame_transformation_futures.emplace_back(std::move(handle));
}
logger.Info("Acquisition devices ready");
logger.Info("Image compression/forwarding threads started");
start_time = std::chrono::system_clock::now();
logger.Info("Receiving data started");
frame_transformation_ready.wait();
logger.Info("Image compression/forwarding threads ready");
measurement = std::async(std::launch::async, &JFJochReceiver::FinalizeMeasurement, this);
} catch (...) {
free(send_buffer);
throw;
}
data_acquisition_ready.wait();
logger.Info("Acquisition devices ready");
start_time = std::chrono::system_clock::now();
logger.Info("Receiving data started");
measurement = std::async(std::launch::async, &JFJochReceiver::FinalizeMeasurement, this);
}
void JFJochReceiver::AcquireThread(uint16_t data_stream) {
@@ -323,11 +332,11 @@ void JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t module
pedestal_result[offset].SetCollectionTime(start_time.time_since_epoch().count() / 1e9);
} catch (const JFJochException &e) { Cancel(e); }
logger.Info("Pedestal calculation thread for data stream {} module {} storage cell {} -> header {} done",
data_stream, module_number, storage_cell, storage_cell_header);
data_stream, module_number, storage_cell, storage_cell_header);
}
void JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, bool &send_image,
FrameTransformation &transformation, DataMessage &message) {
FrameTransformation &transformation, DataMessage &message) {
for (int j = 0; j < experiment.GetSummation(); j++) {
size_t frame_number = image_number * experiment.GetSummation() + j;
acquisition_device[d]->Counters().WaitForFrame(frame_number + 2);
@@ -370,7 +379,7 @@ void JFJochReceiver::FrameTransformationThread() {
FrameTransformation transformation(experiment);
std::unique_ptr<GPUImageAnalysis> spot_finder;
std::unique_ptr<GPUImageAnalysis> spot_finder;
try {
if (rad_int_mapping) {
@@ -542,7 +551,7 @@ void JFJochReceiver::FrameTransformationThread() {
if (image_number % experiment.GetDataFileCount() < rad_int_profile_per_file.size())
rad_int_profile_per_file[image_number % experiment.GetDataFileCount()]
->Add(spot_finder->GetRadialIntegrationSum(),
spot_finder->GetRadialIntegrationCount());
spot_finder->GetRadialIntegrationCount());
}
if (send_preview)
@@ -554,7 +563,7 @@ void JFJochReceiver::FrameTransformationThread() {
message.receiver_available_send_buffers = GetAvailableSendBuffers();
auto send_buffer_handle = send_buffer_avail.GetBlocking();
auto ptr = send_buffer.data() + send_buffer_size * send_buffer_handle;
auto ptr = send_buffer + send_buffer_size * send_buffer_handle;
JFJochFrameSerializer serializer(ptr, send_buffer_size);
PrepareCBORImage(message, experiment, nullptr, 0);
serializer.SerializeImage(message);
@@ -726,6 +735,7 @@ void JFJochReceiver::StopReceiver() {
JFJochReceiver::~JFJochReceiver() {
if (measurement.valid())
measurement.get();
free(send_buffer);
}
JFJochProtoBuf::DataProcessingSettings JFJochReceiver::GetDataProcessingSettings() {
+1 -1
View File
@@ -99,7 +99,7 @@ class JFJochReceiver {
const size_t send_buffer_count;
ThreadSafeFIFO<uint32_t> send_buffer_avail;
std::vector<uint8_t> send_buffer;
uint8_t *send_buffer;
std::vector<ZeroCopyReturnValue> send_buffer_zero_copy_ret_val;
NUMAHWPolicy numa_policy;