From 9d978a41f766760c2a5dec351a02ea84267cea39 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Fri, 28 Jul 2023 11:00:42 +0200 Subject: [PATCH] NUMAHWPolicy: Added --- common/CMakeLists.txt | 11 ++- common/CUDAWrapper.cpp | 2 +- common/CUDAWrapper.cu | 2 +- common/CUDAWrapper.h | 2 +- common/NUMAHWPolicy.cpp | 119 +++++++++++++++++++++++++++++ common/NUMAHWPolicy.h | 40 ++++++++++ receiver/CMakeLists.txt | 14 ---- receiver/JFJochReceiver.cpp | 37 +++++---- receiver/JFJochReceiver.h | 19 +++-- receiver/JFJochReceiverService.cpp | 16 +++- receiver/JFJochReceiverService.h | 6 ++ receiver/JFJochReceiverTest.cpp | 16 ++-- receiver/JFJochReceiverTest.h | 9 ++- receiver/jfjoch_action_test.cpp | 9 ++- receiver/jfjoch_receiver.cpp | 2 +- 15 files changed, 246 insertions(+), 58 deletions(-) create mode 100644 common/NUMAHWPolicy.cpp create mode 100644 common/NUMAHWPolicy.h diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index b2e7e530..d7d84cfd 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -46,7 +46,9 @@ ADD_LIBRARY( CommonFunctions STATIC DetectorSetup.h DetectorSetup.cpp ZeroCopyReturnValue.h Histogram.h DiffractionGeometry.h ROIFilter.h CUDAWrapper.cpp - CUDAWrapper.h) + CUDAWrapper.h + NUMAHWPolicy.cpp + NUMAHWPolicy.h) TARGET_LINK_LIBRARIES(CommonFunctions Compression FrameSerialize libzmq JFCalibration JFJochProtoBuf -lrt) @@ -55,7 +57,10 @@ IF (CMAKE_CUDA_COMPILER) TARGET_LINK_LIBRARIES(CommonFunctions ${CUDART_LIBRARY} ${CMAKE_DL_LIBS} rt) ENDIF() -IF(HAS_NUMAIF AND NUMA_LIBRARY) - TARGET_COMPILE_DEFINITIONS(CommonFunctions PRIVATE -DJFJOCH_USE_NUMA) +IF(HAS_NUMAIF AND HAS_NUMA_H AND NUMA_LIBRARY) + TARGET_COMPILE_DEFINITIONS(CommonFunctions PUBLIC -DJFJOCH_USE_NUMA) TARGET_LINK_LIBRARIES(CommonFunctions ${NUMA_LIBRARY}) + MESSAGE(STATUS "NUMA memory/CPU pinning enabled") +ELSE() + MESSAGE(WARNING "NUMA memory/CPU pinning disabled") ENDIF() diff --git a/common/CUDAWrapper.cpp b/common/CUDAWrapper.cpp index ead14de6..9b5ff35d 100644 
--- a/common/CUDAWrapper.cpp
+++ b/common/CUDAWrapper.cpp
@@ -9,6 +9,6 @@ int32_t get_gpu_count() {
     return 0;
 }
 
-void set_gpu_count(int32_t dev_id) {}
+void set_gpu(int32_t dev_id) {}
 
 #endif
diff --git a/common/CUDAWrapper.cu b/common/CUDAWrapper.cu
index df4108fb..8a47b847 100644
--- a/common/CUDAWrapper.cu
+++ b/common/CUDAWrapper.cu
@@ -15,7 +15,7 @@ int32_t get_gpu_count() {
     return device_count;
 }
 
-void set_gpu_count(int32_t dev_id) {
+void set_gpu(int32_t dev_id) {
     auto dev_count = get_gpu_count();
 
     if ((dev_id < 0) || (dev_id >= dev_count))
diff --git a/common/CUDAWrapper.h b/common/CUDAWrapper.h
index 61865da5..4297ed1f 100644
--- a/common/CUDAWrapper.h
+++ b/common/CUDAWrapper.h
@@ -7,6 +7,6 @@
 #include <cstdint>
 
 int32_t get_gpu_count();
-void set_gpu_count(int32_t dev_id);
+void set_gpu(int32_t dev_id);
 
 #endif //JUNGFRAUJOCH_CUDAWRAPPER_H
diff --git a/common/NUMAHWPolicy.cpp b/common/NUMAHWPolicy.cpp
new file mode 100644
index 00000000..42043e95
--- /dev/null
+++ b/common/NUMAHWPolicy.cpp
@@ -0,0 +1,137 @@
+// Copyright (2019-2023) Paul Scherrer Institute
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "NUMAHWPolicy.h"
+
+#include "../common/CUDAWrapper.h"
+#include "JFJochException.h"
+
+#ifdef JFJOCH_USE_NUMA
+#include <numa.h>
+#endif
+
+// Builds the binding table for a named policy; -1 in a binding means "leave
+// this resource unbound". An empty name is stored as "none" (no bindings).
+// Throws JFJochException (InputParameterInvalid) for an unrecognized name.
+NUMAHWPolicy::NUMAHWPolicy(const std::string &policy) : name(policy) {
+    if ((policy.empty()) || (policy == "none")) {
+        name = "none";
+    } else if (policy == "n2g2") {
+        bindings.emplace_back(NUMABinding{.cpu_node = 0, .mem_node = 0, .gpu = 0});
+        bindings.emplace_back(NUMABinding{.cpu_node = 1, .mem_node = 1, .gpu = 1});
+    } else if (policy == "n8g4") {
+        for (int32_t i = 0; i < 8; i++)
+            bindings.emplace_back(NUMABinding{.cpu_node = i, .mem_node = i, .gpu = i/2});
+    } else if (policy == "n8g4_hbm") {
+        // Same CPU/GPU layout as n8g4, but memory comes from nodes 8..15
+        for (int32_t i = 0; i < 8; i++)
+            bindings.emplace_back(NUMABinding{.cpu_node = i, .mem_node = i + 8, .gpu = i / 2});
+    } else if (policy == "g2") {
+        bindings.emplace_back(NUMABinding{.cpu_node = -1, .mem_node = -1, .gpu = 0});
+        bindings.emplace_back(NUMABinding{.cpu_node = -1, .mem_node = -1, .gpu = 1});
+    } else if (policy == "g4") {
+        bindings.emplace_back(NUMABinding{.cpu_node = -1, .mem_node = -1, .gpu = 0});
+        bindings.emplace_back(NUMABinding{.cpu_node = -1, .mem_node = -1, .gpu = 1});
+        bindings.emplace_back(NUMABinding{.cpu_node = -1, .mem_node = -1, .gpu = 2});
+        bindings.emplace_back(NUMABinding{.cpu_node = -1, .mem_node = -1, .gpu = 3});
+    } else
+        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Unknown NUMA policy");
+}
+
+// A copy keeps the policy but restarts round-robin assignment at index 0.
+// Initializers follow the member declaration order (name, bindings, curr_thread).
+NUMAHWPolicy::NUMAHWPolicy(const NUMAHWPolicy &other) : name(other.name), bindings(other.bindings), curr_thread(0) {}
+
+// Assignment likewise resets the round-robin counter.
+NUMAHWPolicy &NUMAHWPolicy::operator=(const NUMAHWPolicy &other) {
+    bindings = other.bindings;
+    name = other.name;
+    curr_thread = 0;
+    return *this;
+}
+
+// Binding for a given thread index; indices wrap around the binding table.
+// A policy without bindings yields {-1, -1, -1}, i.e. nothing gets bound.
+NUMABinding NUMAHWPolicy::GetBinding(uint32_t thread) {
+    if (bindings.empty())
+        return NUMABinding{.cpu_node = -1, .mem_node = -1, .gpu = -1};
+    else
+        return bindings.at(thread % bindings.size());
+}
+
+// Round-robin variant: every call consumes the next thread index.
+NUMABinding NUMAHWPolicy::GetBinding() {
+    return GetBinding(curr_thread++);
+}
+
+void NUMAHWPolicy::Bind() {
+    Bind(GetBinding());
+}
+
+void NUMAHWPolicy::Bind(uint32_t thread) {
+    Bind(GetBinding(thread));
+}
+
+// Applies the CPU, memory and GPU parts of a binding, in that order.
+void NUMAHWPolicy::Bind(const NUMABinding &binding) {
+    RunOnNode(binding.cpu_node);
+    MemOnNode(binding.mem_node);
+    SelectGPU(binding.gpu);
+}
+
+// No-op if built without JFJOCH_USE_NUMA, if libnuma reports NUMA as
+// unavailable, or if cpu_node is negative. Throws JFJochException
+// (InputParameterInvalid) if cpu_node is not below the configured node count.
+void NUMAHWPolicy::RunOnNode(int32_t cpu_node) {
+#ifdef JFJOCH_USE_NUMA
+    if (numa_available() != -1) {
+        auto max_nodes = numa_num_configured_nodes();
+
+        if (cpu_node >= 0) {
+            if (cpu_node < max_nodes)
+                numa_run_on_node(cpu_node);
+            else
+                throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "CPU NUMA node out of bounds");
+        }
+    }
+#endif
+}
+
+// Same rules as RunOnNode(), but sets the memory binding mask instead.
+void NUMAHWPolicy::MemOnNode(int32_t mem_node) {
+#ifdef JFJOCH_USE_NUMA
+    if (numa_available() != -1) {
+        auto max_nodes = numa_num_configured_nodes();
+
+        if (mem_node >= 0) {
+            if (mem_node < max_nodes) {
+                struct bitmask *mask = numa_allocate_nodemask();
+                numa_bitmask_setbit(mask, mem_node);
+                numa_set_membind(mask);
+                numa_bitmask_free(mask);
+            } else
+                throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Memory NUMA node out of bounds");
+        }
+    }
+#endif
+}
+
+// No-op if built without JFJOCH_USE_CUDA or if gpu is negative. Throws
+// JFJochException (InputParameterInvalid) if gpu is not below get_gpu_count().
+void NUMAHWPolicy::SelectGPU(int32_t gpu) {
+#ifdef JFJOCH_USE_CUDA
+    // Device 0 is a valid target; only negative values mean "no GPU binding"
+    if (gpu >= 0) {
+        if (gpu < get_gpu_count())
+            set_gpu(gpu);
+        else
+            throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "GPU device out of bounds");
+    }
+#endif
+}
+
+const std::string &NUMAHWPolicy::GetName() const {
+    return name;
+}
+
+
diff --git a/common/NUMAHWPolicy.h b/common/NUMAHWPolicy.h
new file mode 100644
index 00000000..446b66a1
--- /dev/null
+++ b/common/NUMAHWPolicy.h
@@ -0,0 +1,43 @@
+// Copyright (2019-2023) Paul Scherrer Institute
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef JUNGFRAUJOCH_NUMAHWPOLICY_H
+#define JUNGFRAUJOCH_NUMAHWPOLICY_H
+
+#include <atomic>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+// One hardware assignment; -1 in any field means "do not bind this resource".
+struct NUMABinding {
+    int32_t cpu_node;
+    int32_t mem_node;
+    int32_t gpu;
+};
+
+// Named table of NUMABinding entries, looked up either by an explicit thread
+// index or round-robin through an internal atomic counter.
+class NUMAHWPolicy {
+    std::string name;
+    std::vector<NUMABinding> bindings;
+    std::atomic<uint32_t> curr_thread = 0;
+public:
+    NUMAHWPolicy() = default;
+    explicit NUMAHWPolicy(const std::string& policy);
+    NUMAHWPolicy(const NUMAHWPolicy& other);
+    NUMAHWPolicy& operator=(const NUMAHWPolicy& other);
+    NUMABinding GetBinding(uint32_t thread);
+    NUMABinding GetBinding(); // round-robin
+
+    const std::string &GetName() const;
+
+    void Bind(uint32_t thread);
+    void Bind(); // round-robin
+    static void Bind(const NUMABinding &binding);
+    static void RunOnNode(int32_t cpu_node);
+    static void MemOnNode(int32_t mem_node);
+    static void SelectGPU(int32_t gpu);
+};
+
+#endif //JUNGFRAUJOCH_NUMAHWPOLICY_H
diff --git a/receiver/CMakeLists.txt b/receiver/CMakeLists.txt
index acf4ecea..90cae64f 100644
--- a/receiver/CMakeLists.txt
+++ b/receiver/CMakeLists.txt
@@ -29,14 +29,6 @@ ADD_EXECUTABLE(jfjoch_lxsocket_test jfjoch_lxsocket_test.cpp)
TARGET_LINK_LIBRARIES(jfjoch_lxsocket_test JungfraujochHost) INSTALL(TARGETS jfjoch_lxsocket_test RUNTIME) -IF(HAS_NUMAIF AND HAS_NUMA_H AND NUMA_LIBRARY) - TARGET_COMPILE_DEFINITIONS(JungfraujochHost PUBLIC -DJFJOCH_USE_NUMA) - TARGET_LINK_LIBRARIES(JungfraujochHost ${NUMA_LIBRARY}) - MESSAGE(STATUS "NUMA memory/CPU pinning enabled") -ELSE() - MESSAGE(WARNING "NUMA memory/CPU pinning disabled") -ENDIF() - ADD_EXECUTABLE(jfjoch_pcie_status jfjoch_pcie_status.cpp) TARGET_LINK_LIBRARIES(jfjoch_pcie_status JungfraujochHost) INSTALL(TARGETS jfjoch_pcie_status RUNTIME) @@ -60,12 +52,6 @@ ADD_LIBRARY(JFJochReceiver STATIC TARGET_LINK_LIBRARIES(JFJochReceiver ImageAnalysis JungfraujochHost CommonFunctions HLSSimulation) -IF(HAS_NUMA_H AND NUMA_LIBRARY) - TARGET_COMPILE_DEFINITIONS(JFJochReceiver PRIVATE -DJFJOCH_USE_NUMA_H) - TARGET_LINK_LIBRARIES(JFJochReceiver ${NUMA_LIBRARY}) - MESSAGE(STATUS "JFJochReceiver compiled with NUMA thread pinning") -ENDIF() - ADD_EXECUTABLE(jfjoch_receiver jfjoch_receiver.cpp) TARGET_LINK_LIBRARIES(jfjoch_receiver JFJochReceiver) INSTALL(TARGETS jfjoch_receiver RUNTIME) diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index d42da4b5..2c5b4cff 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -10,10 +10,6 @@ #include "../image_analysis/IndexerWrapper.h" #include "../common/DiffractionGeometry.h" -#ifdef JFJOCH_USE_NUMA -#include -#endif - inline std::string time_UTC(const std::chrono::time_point &input) { auto time_ms = std::chrono::duration_cast(input.time_since_epoch()).count(); @@ -30,7 +26,8 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, Logger &in_logger, int64_t in_forward_and_sum_nthreads, int64_t in_send_buffer_count, ZMQPreviewPublisher* in_preview_publisher, - ZMQPreviewPublisher* in_preview_publisher_indexed) : + ZMQPreviewPublisher* in_preview_publisher_indexed, + const NUMAHWPolicy &in_numa_policy) : experiment(settings.jungfraujoch_settings()), 
acquisition_device(in_aq_device), logger(in_logger), @@ -44,7 +41,8 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0), send_buffer_count(in_send_buffer_count), send_buffer(send_buffer_size * send_buffer_count), - indexing_solution_per_file(experiment.GetDataFileCount()) + indexing_solution_per_file(experiment.GetDataFileCount()), + numa_policy(in_numa_policy) { if (settings.has_calibration()) { calib.emplace(settings.calibration()); @@ -77,8 +75,10 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, preview_stride = experiment.GetPreviewStride(); spotfinder_stride = experiment.GetSpotFindingStride(); + logger.Info("NUMA policy: {}", numa_policy.GetName()); logger.Info("Image stride for data analysis: preview {}, spot finding/radial integration {}", preview_stride, spotfinder_stride); + if (experiment.GetDetectorMode() == DetectorMode::Conversion) { if (preview_publisher != nullptr) preview_publisher->Start(experiment, calib.value()); @@ -237,7 +237,11 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings, } void JFJochReceiver::AcquireThread(uint16_t data_stream) { - PinThreadToDevice(data_stream); + try { + NUMAHWPolicy::RunOnNode(acquisition_device[data_stream]->GetNUMANode()); + } catch (const JFJochException &e) { + logger.Error("HW bind error {}", e.what()); + } try { frame_transformation_ready.wait(); @@ -265,7 +269,11 @@ void JFJochReceiver::AcquireThread(uint16_t data_stream) { } void JFJochReceiver::MeasurePedestalThread(uint16_t data_stream, uint16_t module_number, uint16_t storage_cell) { - PinThreadToDevice(data_stream); + try { + NUMAHWPolicy::RunOnNode(acquisition_device[data_stream]->GetNUMANode()); + } catch (const JFJochException &e) { + logger.Error("HW bind error {}", e.what()); + } JFPedestalCalc pedestal_calc(experiment); @@ -354,6 +362,12 @@ void 
JFJochReceiver::MiniSummationThread(int d, int m, size_t image_number, bool } void JFJochReceiver::FrameTransformationThread() { + try { + numa_policy.Bind(); + } catch (const JFJochException &e) { + logger.Error("HW bind error {}", e.what()); + } + FrameTransformation transformation(experiment); std::unique_ptr spot_finder; @@ -785,13 +799,6 @@ void JFJochReceiver::PrepareConversionOnCPU() { experiment.GetPhotonEnergy_keV()); } -void JFJochReceiver::PinThreadToDevice(uint16_t data_stream) { -#ifdef JFJOCH_USE_NUMA - if (numa_available() != -1) - numa_run_on_node(acquisition_device[data_stream]->GetNUMANode()); -#endif -} - void JFJochReceiver::UpdateMaxImage(int64_t image_number) { std::unique_lock ul(max_image_number_sent_mutex); if (image_number > max_image_number_sent) diff --git a/receiver/JFJochReceiver.h b/receiver/JFJochReceiver.h index 2744d05b..438ad71c 100644 --- a/receiver/JFJochReceiver.h +++ b/receiver/JFJochReceiver.h @@ -11,23 +11,24 @@ #include #include +#include "AcquisitionDevice.h" + #include "../common/DiffractionExperiment.h" #include "../common/JFJochException.h" #include "../common/FrameTransformation.h" -#include "../image_analysis/StrongPixelSet.h" -#include "../jungfrau/JFCalibration.h" #include "../common/ImagePusher.h" - -#include "AcquisitionDevice.h" #include "../common/Logger.h" #include "../common/ThreadSafeFIFO.h" #include "../common/ZMQPreviewPublisher.h" +#include "../common/NUMAHWPolicy.h" +#include "../common/StatusVector.h" +#include "../common/Histogram.h" +#include "../image_analysis/StrongPixelSet.h" #include "../image_analysis/RadialIntegrationMapping.h" #include "../image_analysis/RadialIntegrationProfile.h" -#include "../common/StatusVector.h" -#include "../common/Histogram.h" +#include "../jungfrau/JFCalibration.h" #include "../jungfrau/JFConversionFixedPoint.h" class JFJochReceiver { @@ -101,7 +102,8 @@ class JFJochReceiver { std::vector send_buffer; std::vector send_buffer_zero_copy_ret_val; - void 
PinThreadToDevice(uint16_t data_stream); + NUMAHWPolicy numa_policy; + void PrepareConversionOnCPU(); void AcquireThread(uint16_t data_stream); void FrameTransformationThread(); @@ -123,7 +125,8 @@ public: Logger &logger, int64_t forward_and_sum_nthreads, int64_t send_buffer_count, ZMQPreviewPublisher* preview_publisher, - ZMQPreviewPublisher* preview_publisher_indexed); + ZMQPreviewPublisher* preview_publisher_indexed, + const NUMAHWPolicy &numa_policy); ~JFJochReceiver(); JFJochReceiver(const JFJochReceiver &other) = delete; JFJochReceiver& operator=(const JFJochReceiver &other) = delete; diff --git a/receiver/JFJochReceiverService.cpp b/receiver/JFJochReceiverService.cpp index b7263eb5..2116605a 100644 --- a/receiver/JFJochReceiverService.cpp +++ b/receiver/JFJochReceiverService.cpp @@ -22,7 +22,8 @@ grpc::Status JFJochReceiverService::Start(grpc::ServerContext *context, const JF receiver.reset(); receiver = std::make_unique(*request, aq_devices, image_pusher, logger, nthreads, send_buffer_count, - preview_publisher, preview_publisher_indexed); + preview_publisher, preview_publisher_indexed, + numa_policy); try { // Don't want to stop receiver->SetDataProcessingSettings(data_processing_settings); @@ -119,6 +120,16 @@ JFJochReceiverService& JFJochReceiverService::PreviewPublisherIndexed(ZMQPreview return *this; } +JFJochReceiverService &JFJochReceiverService::NUMAPolicy(const NUMAHWPolicy &policy) { + numa_policy = policy; + return *this; +} + +JFJochReceiverService &JFJochReceiverService::NUMAPolicy(const std::string &policy) { + numa_policy = NUMAHWPolicy(policy); + return *this; +} + grpc::Status JFJochReceiverService::GetStatus(grpc::ServerContext *context, const JFJochProtoBuf::Empty *request, JFJochProtoBuf::ReceiverStatus *response) { // FPGA status can be polled outside the state mutex @@ -196,4 +207,5 @@ grpc::Status JFJochReceiverService::GetNetworkConfig(grpc::ServerContext *contex dev_net_cfg->set_udp_port(aq->GetUDPPort()); } return grpc::Status::OK; 
-} \ No newline at end of file +} + diff --git a/receiver/JFJochReceiverService.h b/receiver/JFJochReceiverService.h index abef96ac..6b5f6092 100644 --- a/receiver/JFJochReceiverService.h +++ b/receiver/JFJochReceiverService.h @@ -6,9 +6,13 @@ #include "JFJochReceiver.h" #include "jfjoch.grpc.pb.h" + #include +#include "../common/NUMAHWPolicy.h" + class JFJochReceiverService final : public JFJochProtoBuf::gRPC_JFJochReceiver::Service { + NUMAHWPolicy numa_policy; std::unique_ptr receiver; std::vector &aq_devices; @@ -35,6 +39,8 @@ public: JFJochReceiverService& PreviewPublisherIndexed(ZMQPreviewPublisher *in_preview_writer); JFJochReceiverService& NumThreads(int64_t input); JFJochReceiverService& SendBufferCount(int64_t input); + JFJochReceiverService& NUMAPolicy(const NUMAHWPolicy& policy); + JFJochReceiverService& NUMAPolicy(const std::string& policy); grpc::Status Start(grpc::ServerContext* context, const JFJochProtoBuf::ReceiverInput* request, JFJochProtoBuf::Empty* response) override; diff --git a/receiver/JFJochReceiverTest.cpp b/receiver/JFJochReceiverTest.cpp index 39f44fa9..62f6659f 100644 --- a/receiver/JFJochReceiverTest.cpp +++ b/receiver/JFJochReceiverTest.cpp @@ -12,7 +12,8 @@ JFJochProtoBuf::ReceiverOutput RunJFJochReceiverTest(std::vector> &aq_devices, const DiffractionExperiment &x, uint16_t nthreads, bool abort, - bool verbose, ZMQPreviewPublisher *in_preview_writer) { + bool verbose, ZMQPreviewPublisher *in_preview_writer, + const std::string &numa_policy) { std::vector tmp_aq_devices; for (const auto &i: aq_devices) tmp_aq_devices.emplace_back(i.get()); return JFJochReceiverTest(output, logger, tmp_aq_devices, x, nthreads, - abort, verbose, in_preview_writer); + abort, verbose, in_preview_writer, numa_policy); } static JFCalibration GeneratePedestalCalibration(const DiffractionExperiment &x) { @@ -86,7 +87,8 @@ static JFCalibration GeneratePedestalCalibration(const DiffractionExperiment &x) bool JFJochReceiverTest(JFJochProtoBuf::ReceiverOutput 
&output, Logger &logger, std::vector &aq_devices, const DiffractionExperiment &x, uint16_t nthreads, bool abort, - bool verbose, ZMQPreviewPublisher *in_preview_writer) { + bool verbose, ZMQPreviewPublisher *in_preview_writer, + const std::string &numa_policy) { std::vector raw_expected_image(RAW_MODULE_SIZE * x.GetModulesNum()); for (int i = 0; i < x.GetDataStreamsNum(); i++) { @@ -112,7 +114,7 @@ bool JFJochReceiverTest(JFJochProtoBuf::ReceiverOutput &output, Logger &logger, TestImagePusher pusher(image_number); output = RunJFJochReceiverTest(aq_devices, pusher, x, logger, calib, nthreads, abort, - in_preview_writer); + in_preview_writer, numa_policy); bool no_errors = true; diff --git a/receiver/JFJochReceiverTest.h b/receiver/JFJochReceiverTest.h index 90aaf6de..f36aaa5b 100644 --- a/receiver/JFJochReceiverTest.h +++ b/receiver/JFJochReceiverTest.h @@ -9,16 +9,19 @@ JFJochProtoBuf::ReceiverOutput RunJFJochReceiverTest(std::vector &aq_devices, ImagePusher &pusher, const DiffractionExperiment &x, Logger &logger, JFCalibration &calib, uint16_t nthreads, bool abort = false, - ZMQPreviewPublisher *in_preview_writer = nullptr); + ZMQPreviewPublisher *in_preview_writer = nullptr, + const std::string &numa_policy = ""); bool JFJochReceiverTest(JFJochProtoBuf::ReceiverOutput &output, Logger &logger, std::vector &aq_devices, const DiffractionExperiment &x, uint16_t nthreads, bool abort = false, - bool verbose = true, ZMQPreviewPublisher *in_preview_writer = nullptr); + bool verbose = true, ZMQPreviewPublisher *in_preview_writer = nullptr, + const std::string &numa_policy = ""); bool JFJochReceiverTest(JFJochProtoBuf::ReceiverOutput &output, Logger &logger, std::vector> &aq_devices, const DiffractionExperiment &x, uint16_t nthreads, bool abort = false, - bool verbose = true, ZMQPreviewPublisher *in_preview_writer = nullptr); + bool verbose = true, ZMQPreviewPublisher *in_preview_writer = nullptr, + const std::string &numa_policy = ""); #endif 
//JUNGFRAUJOCH_JFJOCHRECEIVERTEST_H diff --git a/receiver/jfjoch_action_test.cpp b/receiver/jfjoch_action_test.cpp index 9e76402d..9c7244bf 100644 --- a/receiver/jfjoch_action_test.cpp +++ b/receiver/jfjoch_action_test.cpp @@ -20,6 +20,7 @@ void print_usage(Logger &logger) { logger.Info(" -i number of images"); logger.Info(" -p data processing period"); logger.Info(" -N number of image processing threads"); + logger.Info(" -P NUMA Policy (none|n2g2|n8g4|n8g4_hbm), none is default"); } int main(int argc, char **argv) { @@ -36,12 +37,13 @@ int main(int argc, char **argv) { bool use_mock_device = false; bool nonblocking_mode = true; bool verbose = false; + std::string numa_policy_name = ""; if (argc == 1) print_usage(logger); int opt; - while ((opt = getopt(argc, argv, "s:i:m:p:N:CMBv")) != -1) { + while ((opt = getopt(argc, argv, "s:i:m:p:N:P:CMBv")) != -1) { switch (opt) { case 'C': conversion_on_cpu = true; @@ -70,6 +72,9 @@ int main(int argc, char **argv) { case 'v': verbose = true; break; + case 'P': + numa_policy_name = std::string(optarg); + break; default: /* '?' 
*/ print_usage(logger); exit(EXIT_FAILURE); @@ -142,7 +147,7 @@ int main(int argc, char **argv) { bool ret; std::thread run_thread([&] { try { - ret = JFJochReceiverTest(output, logger, aq_devices, x, nthreads, false, verbose); + ret = JFJochReceiverTest(output, logger, aq_devices, x, nthreads, false, verbose, nullptr, numa_policy_name); } catch (std::exception &e) { logger.Error(e.what()); ret = false; diff --git a/receiver/jfjoch_receiver.cpp b/receiver/jfjoch_receiver.cpp index 1e1acd20..a20994f8 100644 --- a/receiver/jfjoch_receiver.cpp +++ b/receiver/jfjoch_receiver.cpp @@ -154,7 +154,7 @@ int main(int argc, char **argv) { if (input.contains("preview_indexed_zmq_addr")) { preview_indexed = std::make_unique(context, input["preview_indexed_zmq_addr"]); - service.PreviewPublisher(preview_indexed.get()); + service.PreviewPublisherIndexed(preview_indexed.get()); logger.Info("Preview available for indexed frames on ZMQ addr " + input["preview_indexed_zmq_addr"].get()); }