// Copyright (2019-2023) Paul Scherrer Institute #include "JFJochReceiver.h" #include #include "../jungfrau/JFPedestalCalc.h" #include "../image_analysis/IndexerWrapper.h" #include "../common/DiffractionGeometry.h" inline std::string time_UTC(const std::chrono::time_point &input) { auto time_ms = std::chrono::duration_cast(input.time_since_epoch()).count(); char buf1[255], buf2[255]; time_t time = time_ms / (1000); strftime(buf1, sizeof(buf1), "%FT%T", gmtime(&time)); snprintf(buf2, sizeof(buf2), ".%06ld", time_ms%1000); return std::string(buf1) + std::string(buf2) + "Z"; } JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment, const JFCalibration *in_calibration, AcquisitionDeviceGroup &in_aq_device, ImagePusher &in_image_sender, Logger &in_logger, int64_t in_forward_and_sum_nthreads, int64_t in_send_buffer_count, ZMQPreviewPublisher* in_preview_publisher, const NUMAHWPolicy &in_numa_policy) : experiment(in_experiment), calibration(in_calibration), acquisition_device(in_aq_device), logger(in_logger), image_pusher(in_image_sender), frame_transformation_nthreads(in_forward_and_sum_nthreads), preview_publisher(in_preview_publisher), ndatastreams(experiment.GetDataStreamsNum()), data_acquisition_ready(ndatastreams), frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0), send_buffer_count(in_send_buffer_count), indexing_solution_per_file(experiment.GetDataFileCount()), numa_policy(in_numa_policy), adu_histogram_module(experiment.GetModulesNum()) { send_buffer = (uint8_t *) malloc(send_buffer_size * send_buffer_count); try { if (calibration != nullptr) { one_byte_mask = calibration->CalculateOneByteMask(experiment); } else { one_byte_mask.resize(experiment.GetPixelsNum()); for (auto &i: one_byte_mask) i = 1; } for (uint32_t i = 0; i < send_buffer_count; i++) { send_buffer_avail.Put(i); send_buffer_zero_copy_ret_val.emplace_back(send_buffer_avail, i); } if (!experiment.CheckGitSha1Consistent()) logger.Warning(experiment.CheckGitSha1Msg()); push_images_to_writer = (experiment.GetImageNum() > 0) && (!experiment.GetFilePrefix().empty()); if (acquisition_device.size() < ndatastreams) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Number of acquisition devices has to match data streams"); if (frame_transformation_nthreads <= 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Number of threads must be more than zero"); logger.Info("NUMA policy: {}", numa_policy.GetName()); if (experiment.GetDetectorMode() == DetectorMode::Conversion) { rad_int_mapping = std::make_unique(experiment); rad_int_profile = std::make_unique(*rad_int_mapping, experiment); rad_int_corr = CalcRadIntCorr(experiment); rad_int_corr_raw = CalcRadIntCorrRawCoord(experiment); for (int i = 0; i < experiment.GetDataFileCount(); i++) rad_int_profile_per_file.emplace_back( std::make_unique(*rad_int_mapping, experiment)); find_spots = true; } for (int d = 0; d < ndatastreams; d++) { acquisition_device[d].PrepareAction(experiment); logger.Debug("Acquisition device {} prepared", d); } for (int d = 0; d < ndatastreams; d++) data_acquisition_futures.emplace_back(std::async(std::launch::async, &JFJochReceiver::AcquireThread, this, d)); logger.Info("Data acquisition devices ready"); if ((experiment.GetDetectorMode() == DetectorMode::PedestalG0) || (experiment.GetDetectorMode() == DetectorMode::PedestalG1) || (experiment.GetDetectorMode() == DetectorMode::PedestalG2)) { if (experiment.GetImageNum() > 0) { throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Saving and calculating pedestal is not supported for the time being"); } if (experiment.GetDetectorMode() == DetectorMode::PedestalG0) { pedestal_result.resize(experiment.GetModulesNum() * experiment.GetStorageCellNumber()); for (int s = 0; s < experiment.GetStorageCellNumber(); s++) { for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < experiment.GetModulesNum(d); m++) { auto handle = std::async(std::launch::async, &JFJochReceiver::MeasurePedestalThread, this, d, m, s); frame_transformation_futures.emplace_back(std::move(handle)); } } } } else { pedestal_result.resize(experiment.GetModulesNum()); for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < experiment.GetModulesNum(d); m++) { auto handle = std::async(std::launch::async, &JFJochReceiver::MeasurePedestalThread, this, d, m, 0); frame_transformation_futures.emplace_back(std::move(handle)); } } } logger.Info("Pedestal threads ready"); } if (experiment.GetImageNum() > 0) { logger.Info("Data file count {}", experiment.GetDataFileCount()); StartMessage message{}; experiment.FillMessage(message); message.arm_date = time_UTC(std::chrono::system_clock::now()); JFJochBitShuffleCompressor compressor(CompressionAlgorithm::BSHUF_LZ4); std::vector pixel_mask; std::vector > pedestal; if (calibration != nullptr) { size_t xpixel = experiment.GetXPixelsNum(); size_t ypixel = experiment.GetYPixelsNum(); pixel_mask = compressor.Compress(calibration->CalculateNexusMask(experiment, 0)); message.AddPixelMask(CBORImage{ .data = pixel_mask.data(), .size = pixel_mask.size(), .xpixel = (size_t) xpixel, .ypixel = (size_t) ypixel, .pixel_depth_bytes = 4, .pixel_is_signed = false, .pixel_is_float = false, .algorithm = CompressionAlgorithm::BSHUF_LZ4, .channel = "sc0" }); if (experiment.GetSaveCalibration()) { for (int sc = 0; sc < experiment.GetStorageCellNumber(); sc++) { for (int gain = 0; gain < 3; gain++) { auto tmp = compressor.Compress(calibration->GetPedestal(gain, sc)); pedestal.emplace_back(tmp); std::string channel = "pedestal_G" + std::to_string(gain); if (experiment.GetStorageCellNumber() > 1) channel += "_sc" + std::to_string(sc); CBORImage image{ .data = pedestal.at(pedestal.size() - 1).data(), .size = pedestal.at(pedestal.size() - 1).size(), .xpixel = (size_t) xpixel, .ypixel = (size_t) ypixel, .pixel_depth_bytes = 2, .pixel_is_signed = false, .pixel_is_float = false, .algorithm = CompressionAlgorithm::BSHUF_LZ4, .channel = channel }; message.AddCalibration(image); } } } } if (rad_int_mapping) { message.rad_int_bin_number = rad_int_mapping->GetBinNumber(); message.rad_int_bin_to_q = rad_int_mapping->GetBinToQ(); message.rad_int_solid_angle_corr = rad_int_mapping->GetSolidAngleCorr(); } else message.rad_int_bin_number = 0; std::vector serialization_buffer(message.approx_size); JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size()); serializer.SerializeSequenceStart(message); if (preview_publisher != nullptr) preview_publisher->StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(), experiment.GetPreviewStride()); if (push_images_to_writer) image_pusher.StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(), experiment.GetDataFileCount()); for (int i = 0; i < experiment.GetImageNum(); i++) images_to_go.Put(i); // Setup frames summation and forwarding for (int i = 0; i < frame_transformation_nthreads; i++) { auto handle = std::async(std::launch::async, &JFJochReceiver::FrameTransformationThread, this); frame_transformation_futures.emplace_back(std::move(handle)); } logger.Info("Image compression/forwarding threads started"); frame_transformation_ready.wait(); logger.Info("Image compression/forwarding threads ready"); } data_acquisition_ready.wait(); logger.Info("Acquisition devices ready"); start_time = std::chrono::system_clock::now(); logger.Info("Receiving data started"); measurement = std::async(std::launch::async, &JFJochReceiver::FinalizeMeasurement, this); } catch (...) { free(send_buffer); throw; } } void JFJochReceiver::AcquireThread(uint16_t data_stream) { try { NUMAHWPolicy::RunOnNode(acquisition_device[data_stream].GetNUMANode()); } catch (const JFJochException &e) { logger.Error("HW bind error {}", e.what()); } try { if (calibration != nullptr) acquisition_device[data_stream].InitializeCalibration(experiment, *calibration); if (rad_int_mapping) acquisition_device[data_stream].InitializeIntegrationMap(experiment, rad_int_mapping->GetPixelToBinMappingRaw(), rad_int_corr_raw); frame_transformation_ready.wait(); logger.Debug("Device thread {} start FPGA action", data_stream); acquisition_device[data_stream].StartAction(experiment); } catch (const JFJochException &e) { Cancel(e); data_acquisition_ready.count_down(); } data_acquisition_ready.count_down(); try { logger.Debug("Device thread {} wait for FPGA action complete", data_stream); acquisition_device[data_stream].WaitForActionComplete(); } catch (const JFJochException &e) { Cancel(e); } logger.Info("Device thread {} done", data_stream); } void JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell) { try { NUMAHWPolicy::RunOnNode(acquisition_device[data_stream].GetNUMANode()); } catch (const JFJochException &e) { logger.Error("HW bind error {}", e.what()); } JFPedestalCalc pedestal_calc(experiment); bool storage_cell_G1G2 = (experiment.GetStorageCellNumber() > 1) && ((experiment.GetDetectorMode() == DetectorMode::PedestalG1) || (experiment.GetDetectorMode() == DetectorMode::PedestalG2)); size_t staring_frame; size_t frame_stride; size_t offset = experiment.GetFirstModuleOfDataStream(data_stream) + module_number; if (experiment.GetDetectorMode() == DetectorMode::PedestalG0) { staring_frame = storage_cell; frame_stride = experiment.GetStorageCellNumber(); offset += experiment.GetModulesNum() * storage_cell; } else { staring_frame = 0; frame_stride = 1; } uint32_t storage_cell_header = UINT32_MAX; logger.Debug("Pedestal calculation thread for data stream {} module {} storage cell {} starting", data_stream, module_number, storage_cell); try { for (size_t frame = staring_frame; frame < experiment.GetFrameNum(); frame += frame_stride) { // Frame will be processed only if one already collects frame+2 acquisition_device[data_stream].Counters().WaitForFrame(frame + 2, module_number); if (!storage_cell_G1G2 || (frame % 2 == 1)) { // Partial packets will bring more problems, than benefit if (acquisition_device[data_stream].Counters().IsFullModuleCollected(frame, module_number)) { pedestal_calc.AnalyzeImage((uint16_t *) acquisition_device[data_stream].GetDeviceOutput(frame, module_number)->pixels); } auto tmp = acquisition_device[data_stream].Counters().GetCompletion(frame, module_number).debug; storage_cell_header = (tmp >> 8) & 0xF; } acquisition_device[data_stream].FrameBufferRelease(frame, module_number); UpdateMaxDelay(acquisition_device[data_stream].Counters().CalculateDelay(frame, module_number)); UpdateMaxImage(frame); } if (experiment.GetDetectorMode() == DetectorMode::PedestalG0) pedestal_calc.Export(pedestal_result[offset], 2); else pedestal_calc.Export(pedestal_result[offset]); pedestal_result[offset].SetFrameCount(experiment.GetFrameNum()); pedestal_result[offset].SetCollectionTime(start_time.time_since_epoch().count() / 1e9); } catch (const JFJochException &e) { Cancel(e); } logger.Info("Pedestal calculation thread for data stream {} module {} storage cell {} -> header {} done", data_stream, module_number, storage_cell, storage_cell_header); } void JFJochReceiver::FrameTransformationThread() { try { numa_policy.Bind(); } catch (const JFJochException &e) { frame_transformation_ready.count_down(); logger.Error("HW bind error {}", e.what()); Cancel(e); } FrameTransformation transformation(experiment); std::vector writer_buffer(experiment.GetMaxCompressedSize()); std::unique_ptr indexer; if (experiment.HasUnitCell()) { indexer = std::make_unique(); indexer->Setup(experiment.GetUnitCell()); } frame_transformation_ready.count_down(); uint64_t image_number; while (images_to_go.Get(image_number) != 0) { try { // If data acquisition is finished and fastest frame for the first device is behind acquisition_device[0].Counters().WaitForFrame(image_number + 1); if (acquisition_device[0].Counters().IsAcquisitionFinished() && (acquisition_device[0].Counters().GetFastestFrameNumber() < image_number)) continue; DataMessage message{}; message.number = image_number; message.timestamp_base = 10*1000*1000; message.exptime_base = 10*1000*1000; message.indexing_result = 0; bool indexed = false; bool send_image = false; // We send image if at least one module was collected in full std::unique_ptr rad_int_profile_image; if (rad_int_mapping) rad_int_profile_image = std::make_unique(*rad_int_mapping, experiment); StrongPixelSet strong_pixel_set(experiment); for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < experiment.GetModulesNum(d); m++) { acquisition_device[d].Counters().WaitForFrame(image_number + 1, m); const int16_t *src; if (acquisition_device[d].Counters().IsAnyPacketCollected(image_number, m)) { src = acquisition_device[d].GetDeviceOutput(image_number, m)->pixels; if (!send_image) { Completion c = acquisition_device[d].Counters().GetCompletion(image_number, m); // the information is for first module/frame that was collected in full message.bunch_id = c.bunchid; message.jf_info = c.debug; message.storage_cell = (message.jf_info >> 8) & 0xF; message.timestamp = c.timestamp; message.exptime = c.exptime; } send_image = true; size_t module_abs_number = experiment.GetFirstModuleOfDataStream(d) + m; adu_histogram_module[module_abs_number].Add(*acquisition_device[d].GetDeviceOutput(image_number, m)); adu_histogram_total.Add(*acquisition_device[d].GetDeviceOutput(image_number, m)); if (rad_int_profile_image) rad_int_profile_image->Add(*acquisition_device[d].GetDeviceOutput(image_number, m)); if (find_spots) strong_pixel_set.ReadFPGAOutput(experiment, *acquisition_device[d].GetDeviceOutput(image_number, m), m); } else src = acquisition_device[d].GetErrorFrameBuffer(); transformation.ProcessModule(src, m, d); acquisition_device[d].FrameBufferRelease(image_number, m); } auto delay = acquisition_device[d].Counters().CalculateDelay(image_number); UpdateMaxDelay(delay); if (delay > message.receiver_aq_dev_delay) message.receiver_aq_dev_delay = delay; } if (send_image) { std::vector spots; auto local_data_processing_settings = GetDataProcessingSettings(); if (find_spots) { for (const auto &spot: spots) message.spots.push_back(spot); spot_count.AddElement(image_number, spots.size()); if (indexer) { std::vector recip; for (const auto &i: spots) recip.push_back(i.ReciprocalCoord(experiment)); auto indexer_result = indexer->Run(recip); if (!indexer_result.empty()) { message.indexing_result = 2; indexing_solution.AddElement(image_number, 1); indexing_solution_per_file.Add(image_number % experiment.GetDataFileCount(), 1); for (int i = 0; i < recip.size(); i++) message.spots[i].indexed = indexer_result[0].indexed_spots[i]; indexer_result[0].l.Save(message.indexing_lattice); indexed = true; } else { message.indexing_result = 1; indexing_solution.AddElement(image_number, 0); indexing_solution_per_file.Add(image_number % experiment.GetDataFileCount(), 0); } } } if (rad_int_profile_image) { uint16_t rad_int_min_bin = std::floor(rad_int_mapping->QToBin(experiment.GetLowQForBkgEstimate_recipA())); uint16_t rad_int_max_bin = std::ceil(rad_int_mapping->QToBin(experiment.GetHighQForBkgEstimate_recipA())); float bkg_estimate_val = rad_int_profile_image->GetMeanValueOfBins(rad_int_min_bin, rad_int_max_bin); bkg_estimate.AddElement(image_number, bkg_estimate_val); message.rad_int_profile = rad_int_profile_image->GetResult(); *rad_int_profile += *rad_int_profile_image; if (image_number % experiment.GetDataFileCount() < rad_int_profile_per_file.size()) *rad_int_profile_per_file[image_number % experiment.GetDataFileCount()] += *rad_int_profile_image; } message.receiver_available_send_buffers = GetAvailableSendBuffers(); auto send_buffer_handle = send_buffer_avail.GetBlocking(); auto ptr = send_buffer + send_buffer_size * send_buffer_handle; JFJochFrameSerializer serializer(ptr, send_buffer_size); PrepareCBORImage(message, experiment, nullptr, 0); serializer.SerializeImage(message); if (serializer.GetRemainingBuffer() < experiment.GetMaxCompressedSize()) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Not enough memory to save image"); size_t image_size = transformation.SaveCompressedImage(ptr + serializer.GetImageAppendOffset()); serializer.AppendImage(image_size); if (preview_publisher && (!local_data_processing_settings.preview_indexed_only || indexed)) preview_publisher->SendImage(ptr, serializer.GetBufferSize(), image_number); if (push_images_to_writer) { image_pusher.SendImage(ptr, serializer.GetBufferSize(), image_number, &send_buffer_zero_copy_ret_val[send_buffer_handle]); compressed_size += image_size; } else send_buffer_avail.Put(send_buffer_handle); UpdateMaxImage(image_number); images_sent++; } } catch (const JFJochException &e) { Cancel(e); } } logger.Debug("Sum&compression thread done"); } float JFJochReceiver::GetEfficiency() const { uint64_t expected_packets = 0; uint64_t received_packets = 0; for (int d = 0; d < ndatastreams; d++) { expected_packets += acquisition_device[d].Counters().GetExpectedPackets(); received_packets += acquisition_device[d].Counters().GetTotalPackets(); } if ((expected_packets == received_packets) || (expected_packets == 0)) return 1.0; else return received_packets / static_cast(expected_packets); } JFJochReceiverOutput JFJochReceiver::GetStatistics() const { JFJochReceiverOutput ret; for (int d = 0; d < ndatastreams; d++) { for (int m = 0; m < acquisition_device[d].Counters().GetModuleNumber(); m++) { ret.expected_packets.push_back(acquisition_device[d].Counters().GetExpectedPacketsPerModule()); ret.received_packets.push_back(acquisition_device[d].Counters().GetTotalPackets(m)); } } ret.efficiency = GetEfficiency(); ret.compressed_size = compressed_size; ret.max_image_number_sent = max_image_number_sent; if ((experiment.GetImageNum() > 0) && (compressed_size > 0)) { ret.compressed_ratio = static_cast (images_sent * experiment.GetPixelDepth() * experiment.GetModulesNum() * RAW_MODULE_SIZE) / static_cast (compressed_size); } else ret.compressed_ratio = 0; ret.max_receive_delay = max_delay; ret.images_sent = images_sent; ret.start_time_ms = std::chrono::duration_cast(start_time.time_since_epoch()).count(); ret.end_time_ms = std::chrono::duration_cast(end_time.time_since_epoch()).count(); ret.pedestal_result = pedestal_result; ret.master_file_name = experiment.GetFilePrefix(); ret.cancelled = cancelled; auto tmp = indexing_solution.Mean(); if (!std::isnan(tmp)) ret.indexing_rate = tmp; else ret.indexing_rate = -1; tmp = bkg_estimate.Mean(); if (!std::isnan(tmp)) ret.bkg_estimate = tmp; else ret.bkg_estimate = -1; return ret; } void JFJochReceiver::Cancel() { // Remote abort: This tells FPGAs to stop, but doesn't do anything to CPU code logger.Warning("Cancelling on request"); cancelled = true; for (int d = 0; d < ndatastreams; d++) acquisition_device[d].Cancel(); } void JFJochReceiver::Cancel(const JFJochException &e) { logger.Error("Cancelling data collection due to exception"); logger.ErrorException(e); // Error abort: This tells FPGAs to stop and also prevents deadlock in CPU code, by setting abort to 1 cancelled = true; for (int d = 0; d < ndatastreams; d++) acquisition_device[d].Cancel(); } float JFJochReceiver::GetIndexingRate() const { return indexing_solution.Mean(); } float JFJochReceiver::GetProgress() const { if (experiment.GetImageNum() > 0) return static_cast(max_image_number_sent) / static_cast(experiment.GetImageNum()) * 100.0f; else if (experiment.GetFrameNum() > 0) // Pedestal return static_cast(max_image_number_sent) / static_cast(experiment.GetFrameNum()) * 100.0f; else return 100.0; } void JFJochReceiver::FinalizeMeasurement() { if (!frame_transformation_futures.empty()) { for (auto &future: frame_transformation_futures) future.get(); logger.Info("All processing threads done"); } if (push_images_to_writer) { EndMessage message{}; message.number_of_images = max_image_number_sent; message.max_receiver_delay = max_delay; message.efficiency = GetEfficiency(); message.end_date = time_UTC(std::chrono::system_clock::now()); message.write_master_file = true; if (rad_int_profile) message.rad_int_result["dataset"] = rad_int_profile->GetResult(); for (int i = 0; i < rad_int_profile_per_file.size(); i++) message.rad_int_result["file" + std::to_string(i)] = rad_int_profile_per_file[i]->GetResult(); message.adu_histogram["total"] = adu_histogram_total.GetHistogram(); for (int i = 0; i < adu_histogram_module.size(); i++) message.adu_histogram["module" + std::to_string(i)] = adu_histogram_module[i].GetHistogram(); image_pusher.EndDataCollection(message); logger.Info("Disconnected from writers"); } for (int d = 0; d < ndatastreams; d++) acquisition_device[d].Cancel(); end_time = std::chrono::system_clock::now(); for (auto &future : data_acquisition_futures) future.get(); for (int i = 0; i < send_buffer_count; i++) send_buffer_avail.GetBlocking(); logger.Info("Devices stopped"); logger.Info("Receiving data done"); } void JFJochReceiver::SetDataProcessingSettings(const DataProcessingSettings &in_data_processing_settings) { std::unique_lock ul(data_processing_settings_mutex); DiffractionExperiment::CheckDataProcessingSettings(in_data_processing_settings); data_processing_settings = in_data_processing_settings; for (int i = 0; i < ndatastreams; i++) { acquisition_device[i].SetSpotFinderParameters(data_processing_settings.photon_count_threshold, data_processing_settings.signal_to_noise_threshold); } } void JFJochReceiver::StopReceiver() { if (measurement.valid()) { measurement.get(); logger.Info("Receiver stopped"); } } JFJochReceiver::~JFJochReceiver() { if (measurement.valid()) measurement.get(); free(send_buffer); } DataProcessingSettings JFJochReceiver::GetDataProcessingSettings() { std::unique_lock ul(data_processing_settings_mutex); return data_processing_settings; } Plot JFJochReceiver::GetPlots(const PlotRequest &request) { Plot ret; auto nbins = experiment.GetSpotFindingBin(); if (request.binning > 0) nbins = request.binning; switch (request.type) { case PlotType::RadInt: if (rad_int_profile) return rad_int_profile->GetPlot(); else return {}; case PlotType::SpotCount: return spot_count.GetPlot(nbins); case PlotType::IndexingRate: return indexing_solution.GetPlot(nbins); case PlotType::BkgEstimate: return bkg_estimate.GetPlot(nbins); case PlotType::IndexingRatePerFile: return indexing_solution_per_file.GetPlot(); case PlotType::ADUHistorgram: return adu_histogram_total.GetPlot(); break; default: // Do nothing break; } return ret; } RadialIntegrationProfiles JFJochReceiver::GetRadialIntegrationProfiles() { RadialIntegrationProfiles ret; if (rad_int_profile) { ret.profiles.emplace_back(RadialIntegrationProfileStruct{ .title = "dataset" , .plot = rad_int_profile->GetPlot()}); } for (int i = 0; i < rad_int_profile_per_file.size(); i++) { ret.profiles.emplace_back(RadialIntegrationProfileStruct{ .title = "file" + std::to_string(i), .plot = rad_int_profile_per_file[i]->GetPlot()}); } return ret; } void JFJochReceiver::UpdateMaxImage(int64_t image_number) { std::unique_lock ul(max_image_number_sent_mutex); if (image_number > max_image_number_sent) max_image_number_sent = image_number; } void JFJochReceiver::UpdateMaxDelay(int64_t delay) { std::unique_lock ul(max_delay_mutex); if (delay > max_delay) max_delay = delay; } float JFJochReceiver::GetAvailableSendBuffers() const { return static_cast(send_buffer_avail.Size()) / static_cast(send_buffer_count); } JFJochReceiverStatus JFJochReceiver::GetStatus() const { return { .progress = GetProgress(), .indexing_rate = GetIndexingRate(), .send_buffers_avail = GetAvailableSendBuffers() }; }