diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index 0cf8392f..7ba94784 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -633,7 +633,8 @@ void JFJochReceiver::PrepareConversionOnCPU() { void JFJochReceiver::PinThreadToDevice(uint16_t data_stream) { #ifdef JFJOCH_USE_NUMA_H - numa_run_on_node(acquisition_device[data_stream]->GetNUMANode()); + if (numa_available() != -1) + numa_run_on_node(acquisition_device[data_stream]->GetNUMANode()); #endif } diff --git a/receiver/host/CMakeLists.txt b/receiver/host/CMakeLists.txt index 908635f1..f25f28d9 100644 --- a/receiver/host/CMakeLists.txt +++ b/receiver/host/CMakeLists.txt @@ -18,6 +18,12 @@ TARGET_LINK_LIBRARIES(JungfraujochHost CommonFunctions HLSSimulation ${IBVERBS} TARGET_INCLUDE_DIRECTORIES(JungfraujochHost PUBLIC ../../include) +IF(HAS_NUMA_H AND NUMA_LIBRARY) + TARGET_COMPILE_DEFINITIONS(JungfraujochHost PRIVATE -DJFJOCH_USE_NUMA_H) + TARGET_LINK_LIBRARIES(JungfraujochHost ${NUMA_LIBRARY}) + MESSAGE(STATUS "JFJochReceiver compiled with NUMA thread pinning") +ENDIF() + ADD_EXECUTABLE(jfjoch_pcie_status jfjoch_pcie_status.cpp) TARGET_LINK_LIBRARIES(jfjoch_pcie_status JungfraujochHost) INSTALL(TARGETS jfjoch_pcie_status RUNTIME) diff --git a/receiver/host/IBReceiver.cpp b/receiver/host/IBReceiver.cpp index ed21fa26..b3c28bfd 100644 --- a/receiver/host/IBReceiver.cpp +++ b/receiver/host/IBReceiver.cpp @@ -9,6 +9,10 @@ #include #endif +#ifdef JFJOCH_USE_NUMA_H +#include +#endif + #include "../common/JFJochException.h" #include "RawJFUDPPacket.h" @@ -75,11 +79,16 @@ IBReceiver::IBReceiver(IBContext &context, ProcessJFPacket &process, uint64_t ma qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE); for (int i = 0; i < nthreads; i++) - futures.emplace_back(std::async(std::launch::async, &IBReceiver::Run, this, &process)); + futures.emplace_back(std::async(std::launch::async, &IBReceiver::Run, this, &process, numa_node)); futures.emplace_back(std::async(std::launch::async, &IBReceiver::Arp, this, mac_addr, ipv4)); } -void IBReceiver::Run(ProcessJFPacket *process) { +void IBReceiver::Run(ProcessJFPacket *process, uint8_t numa_node) { +#ifdef JFJOCH_USE_NUMA_H + if (numa_available() != -1) + numa_run_on_node(numa_node); +#endif + while (!cancel) { int64_t i; size_t size; diff --git a/receiver/host/IBReceiver.h b/receiver/host/IBReceiver.h index 8a29e4b7..510ada65 100644 --- a/receiver/host/IBReceiver.h +++ b/receiver/host/IBReceiver.h @@ -31,10 +31,11 @@ class IBReceiver { std::vector> futures; bool cancel = false; - void Run(ProcessJFPacket *process); + void Run(ProcessJFPacket *process, uint8_t numa_node); void Arp(uint64_t mac_addr, uint32_t ipv4_addr); public: - IBReceiver(IBContext &context, ProcessJFPacket &process, uint64_t mac_addr, uint32_t ipv4, uint32_t nthreads = 1, uint8_t numa_node = -1); + IBReceiver(IBContext &context, ProcessJFPacket &process, uint64_t mac_addr, uint32_t ipv4, uint32_t nthreads = 1, + uint8_t numa_node = -1); ~IBReceiver(); };