diff --git a/receiver/host/IBReceiver.cpp b/receiver/host/IBReceiver.cpp index 98580122..baf3d8ad 100644 --- a/receiver/host/IBReceiver.cpp +++ b/receiver/host/IBReceiver.cpp @@ -2,3 +2,104 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "IBReceiver.h" + +#include + +#ifdef JFJOCH_USE_NUMA +#include +#endif + +#include "../common/JFJochException.h" +#include "RawJFUDPPacket.h" + +#define BUFFER_SIZE 16384 +#define BUFFER_COUNT 4096 + +IBReceiverBuffer::IBReceiverBuffer(uint8_t numa_node) { + buffer = (uint8_t *) mmap(nullptr, BUFFER_SIZE * BUFFER_COUNT, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + if (buffer == nullptr) + throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "frame_buffer"); + +#ifdef JFJOCH_USE_NUMA + if (numa_node >= 0) { + unsigned long nodemask = 1L << numa_node;; + if (numa_node > sizeof(nodemask)*8) { + Unmap(); + throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Mask too small for NUMA node"); + } + if (mbind(buffer, BUFFER_SIZE * BUFFER_COUNT, MPOL_BIND, &nodemask, sizeof(nodemask)*8, MPOL_MF_STRICT) == -1) { + Unmap(); + throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Cannot apply NUMA policy"); + } + } +#endif + memset(buffer, 0, BUFFER_SIZE * BUFFER_COUNT); +} + +void IBReceiverBuffer::Unmap() { + munmap(buffer, BUFFER_SIZE * BUFFER_COUNT); +} + +IBReceiverBuffer::~IBReceiverBuffer() { + mr.reset(); + Unmap(); +} + +void IBReceiverBuffer::Register(IBProtectionDomain &pd) { + mr = std::make_unique(pd, buffer, BUFFER_SIZE * BUFFER_COUNT); +} + +IBMemoryRegion *IBReceiverBuffer::GetMemoryRegion() { + return mr.get(); +} + +uint8_t *IBReceiverBuffer::GetLocation(uint64_t location) { + if (location < BUFFER_COUNT) + return buffer + BUFFER_SIZE * location; + else + throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Location out of bounds"); +} + + +IBReceiver::IBReceiver(IBContext &context, ProcessJFPacket &process, uint64_t mac_addr, uint32_t ipv4, uint8_t numa_node) + : pd(context), + cq(context, BUFFER_COUNT), + qp(pd, cq, 16, BUFFER_COUNT), + buffer(numa_node) { + buffer.Register(pd); + + qp.FlowSteering(mac_addr, ipv4); + + for (int i = 0; i < BUFFER_COUNT - 1; i++) + qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE); + receiver = std::async(std::launch::async, &IBReceiver::Run, this, &process); +} + +uint64_t IBReceiver::Run(ProcessJFPacket *process) { + while (!cancel) { + int64_t i; + size_t size; + if (cq.Poll(i, size) > 0) { + if (size == sizeof(RawJFUDPacket)) { + auto ptr = (RawJFUDPacket *) buffer.GetLocation(i); + process->ProcessPacket(&ptr->jf, ptr->ipv4_header_sour_ip); + qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE); + } + } else + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + return 0; +} + +IBReceiver::~IBReceiver() { + if (receiver.valid()) + receiver.get(); +} + +uint64_t IBReceiver::Finalize() { + cancel = true; + if (receiver.valid()) + return receiver.get(); + else + return 0; +} diff --git a/receiver/host/IBReceiver.h b/receiver/host/IBReceiver.h index a6803707..c83f3e37 100644 --- a/receiver/host/IBReceiver.h +++ b/receiver/host/IBReceiver.h @@ -4,9 +4,38 @@ #ifndef JUNGFRAUJOCH_IBRECEIVER_H #define JUNGFRAUJOCH_IBRECEIVER_H +#include "IBWrappers.h" + +#include + +#include "ProcessJFPacket.h" + +class IBReceiverBuffer { + std::unique_ptr mr; + uint8_t *buffer; + void Unmap(); +public: + IBReceiverBuffer(uint8_t numa_node = 1); + ~IBReceiverBuffer(); + void Register(IBProtectionDomain &pd); + uint8_t *GetLocation(uint64_t location); + IBMemoryRegion *GetMemoryRegion(); +}; class IBReceiver { + IBProtectionDomain pd; + IBCompletionQueue cq; + IBQueuePair qp; + IBReceiverBuffer buffer; + + std::future receiver; + bool cancel = false; + uint64_t Run(ProcessJFPacket *process); +public: + IBReceiver(IBContext &context, ProcessJFPacket &process, uint64_t mac_addr, uint32_t ipv4, uint8_t numa_node = -1); + ~IBReceiver(); + uint64_t Finalize(); }; diff --git a/receiver/host/IBWrappers.cpp b/receiver/host/IBWrappers.cpp index 701cb1c9..bdaf23f9 100644 --- a/receiver/host/IBWrappers.cpp +++ b/receiver/host/IBWrappers.cpp @@ -217,22 +217,22 @@ void IBQueuePair::PostInlineSendWR(void *pointer, size_t size) { } } -void IBQueuePair::FlowSteering(uint64_t mac_addr, uint32_t ipv4, uint16_t udp_dest_port) { - +void IBQueuePair::FlowSteering(uint64_t mac_addr, uint32_t ipv4) { uint8_t dst_mac_addr[6]; + for (int i = 0; i < 6; i++) + dst_mac_addr[i] = (mac_addr & (0xFF << (i*8))) >> (i * 8); struct raw_eth_flow_attr { struct ibv_flow_attr attr; struct ibv_flow_spec_eth spec_eth; struct ibv_flow_spec_ipv4 spec_ipv4; - struct ibv_flow_spec_tcp_udp spec_udp; } __attribute__((packed)) flow_attr = { .attr = { .comp_mask = 0, .type = IBV_FLOW_ATTR_NORMAL, .size = sizeof(flow_attr), .priority = 0, - .num_of_specs = 3, + .num_of_specs = 2, .port = 1, .flags = 0, }, @@ -270,18 +270,6 @@ void IBQueuePair::FlowSteering(uint64_t mac_addr, uint32_t ipv4, uint16_t udp_de .src_ip = 0, .dst_ip = 0xFFFFFFFF } - }, - .spec_udp = { - .type = IBV_FLOW_SPEC_UDP, - .size = sizeof(struct ibv_flow_spec_tcp_udp), - .val = { - .dst_port = udp_dest_port, - .src_port = 0 - }, - .mask = { - .dst_port = 0xFFFE, - .src_port = 0 - } } }; diff --git a/receiver/host/IBWrappers.h b/receiver/host/IBWrappers.h index 0cc2f138..56f6b6ed 100644 --- a/receiver/host/IBWrappers.h +++ b/receiver/host/IBWrappers.h @@ -60,7 +60,7 @@ public: void PostReceiveWR(IBMemoryRegion &mr, uint32_t location, void *pointer, size_t size); void PostInlineSendWR(void *pointer, size_t size); - void FlowSteering(uint64_t mac_addr, uint32_t ipv4, uint16_t udp_dest_port); + void FlowSteering(uint64_t mac_addr, uint32_t ipv4); ~IBQueuePair(); }; diff --git a/receiver/host/UdpReceiver.cpp b/receiver/host/UdpReceiver.cpp index 0113bc26..964db1ec 100644 --- a/receiver/host/UdpReceiver.cpp +++ b/receiver/host/UdpReceiver.cpp @@ -63,4 +63,5 @@ uint64_t UdpReceiver::Run(uint16_t udp_port_number, ProcessJFPacket *process) { } close(fd); + return 0; } \ No newline at end of file diff --git a/tests/ProcessRawPacketTest.cpp b/tests/ProcessRawPacketTest.cpp index 2ab2d152..6f520768 100644 --- a/tests/ProcessRawPacketTest.cpp +++ b/tests/ProcessRawPacketTest.cpp @@ -37,20 +37,19 @@ TEST_CASE("ProcessRawPacketTest") { datagram.framenum = 1; datagram.bunchid = 84; datagram.data[0] = 6789; - process.ProcessPacket(datagram, experiment.GetSrcIPv4Address(0, 4)); + process.ProcessPacket(&datagram, experiment.GetSrcIPv4Address(0, 4)); datagram.packetnum = 36; datagram.framenum = 2; datagram.bunchid = 84; datagram.data[0] = 6345; - process.ProcessPacket(datagram, experiment.GetSrcIPv4Address(0, 5)); + process.ProcessPacket(&datagram, experiment.GetSrcIPv4Address(0, 5)); datagram.packetnum = 16; datagram.framenum = 3; datagram.bunchid = 84; datagram.data[0] = 6346; - process.ProcessPacket(datagram, experiment.GetSrcIPv4Address(1,7)); - + process.ProcessPacket(&datagram, experiment.GetSrcIPv4Address(1,7)); } REQUIRE(c_fifo.Size() == 3);