Files
Jungfraujoch/receiver/host/IBReceiver.cpp

161 lines
5.1 KiB
C++

// Copyright (2019-2022) Paul Scherrer Institute
// SPDX-License-Identifier: GPL-3.0-or-later
#include "IBReceiver.h"
#include <sys/mman.h>
#ifdef JFJOCH_USE_NUMA
#include <numaif.h>
#endif
#ifdef JFJOCH_USE_NUMA_H
#include <numa.h>
#endif
#include "../common/JFJochException.h"
#include "RawJFUDPPacket.h"
// Size in bytes of one receive buffer slot (stride used by IBReceiverBuffer::GetLocation and
// length passed with every posted receive work request).
#define BUFFER_SIZE 16384
// Number of slots in the receive buffer pool; also handed to the completion queue and
// queue pair constructors in IBReceiver.
#define BUFFER_COUNT 4096
/// Allocates the receive buffer pool (BUFFER_COUNT slots of BUFFER_SIZE bytes) as a single
/// anonymous private mapping, binds it to a NUMA node when built with JFJOCH_USE_NUMA,
/// and zero-fills it.
///
/// @param numa_node NUMA node the mapping is bound to (ignored without JFJOCH_USE_NUMA).
/// @throws JFJochException (MemAllocFailed) if the mapping cannot be created, if the node
///         does not fit into the single-word node mask, or if mbind() rejects the policy.
IBReceiverBuffer::IBReceiverBuffer(uint8_t numa_node) {
    void *mapping = mmap(nullptr, BUFFER_SIZE * BUFFER_COUNT, PROT_READ | PROT_WRITE,
                         MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    // mmap() signals failure with MAP_FAILED, never with a null pointer.
    if (mapping == MAP_FAILED)
        throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "frame_buffer");
    buffer = static_cast<uint8_t *>(mapping);
#ifdef JFJOCH_USE_NUMA
    {
        // numa_node is unsigned, so the policy is always applied when NUMA support is compiled in.
        // Validate the node BEFORE building the mask: shifting by >= the word width is undefined.
        if (numa_node >= sizeof(unsigned long) * 8) {
            Unmap();
            throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Mask too small for NUMA node");
        }
        unsigned long nodemask = 1UL << numa_node;
        if (mbind(buffer, BUFFER_SIZE * BUFFER_COUNT, MPOL_BIND, &nodemask, sizeof(nodemask) * 8,
                  MPOL_MF_STRICT) == -1) {
            Unmap();
            throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Cannot apply NUMA policy");
        }
    }
#endif
    memset(buffer, 0, BUFFER_SIZE * BUFFER_COUNT);
}
/// Returns the whole buffer pool mapping to the operating system.
/// The result of munmap() is not checked.
void IBReceiverBuffer::Unmap() {
    const size_t mapping_bytes = BUFFER_SIZE * BUFFER_COUNT;
    munmap(buffer, mapping_bytes);
}
/// Drops the memory region object first and only then unmaps the underlying buffer.
IBReceiverBuffer::~IBReceiverBuffer() {
    // Order matters: the region refers to `buffer`, so it is released before the mapping goes away.
    mr = nullptr;
    Unmap();
}
/// Creates the memory region covering the complete buffer pool within the given protection
/// domain. Any previously held region is replaced.
///
/// @param pd Protection domain handed to the IBMemoryRegion constructor.
void IBReceiverBuffer::Register(IBProtectionDomain &pd) {
    mr = std::make_unique<IBMemoryRegion>(
            pd,
            buffer,
            BUFFER_SIZE * BUFFER_COUNT);
}
/// @return Non-owning pointer to the memory region created by Register(),
///         or nullptr when no region is currently held.
IBMemoryRegion *IBReceiverBuffer::GetMemoryRegion() {
    IBMemoryRegion *const region = mr.get();
    return region;
}
/// Translates a slot index into the address of that slot inside the buffer pool.
///
/// @param location Slot index; valid values are 0 .. BUFFER_COUNT - 1.
/// @return Pointer to the first byte of the slot (buffer + BUFFER_SIZE * location).
/// @throws JFJochException (ArrayOutOfBounds) when the index is not below BUFFER_COUNT.
uint8_t *IBReceiverBuffer::GetLocation(uint64_t location) {
    if (location >= BUFFER_COUNT)
        throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Location out of bounds");

    return buffer + BUFFER_SIZE * location;
}
/// Sets up the verbs objects (protection domain, completion queue, queue pair, buffer pool),
/// installs flow steering for the given MAC/IPv4 address, posts the receive buffers and
/// starts the worker tasks: `nthreads` instances of Run() plus one instance of Arp().
///
/// @param context   Device context used to create the protection domain and completion queue.
/// @param process   Packet consumer; its address is handed to every Run() task, so it must
///                  stay alive for as long as this object exists.
/// @param mac_addr  MAC address used for flow steering and by the Arp() task.
/// @param ipv4      IPv4 address used for flow steering and by the Arp() task.
/// @param nthreads  Number of Run() tasks to start.
/// @param numa_node NUMA node passed to the buffer pool and to every Run() task.
IBReceiver::IBReceiver(IBContext &context, ProcessJFPacket &process, uint64_t mac_addr, uint32_t ipv4, uint32_t nthreads,
                       uint8_t numa_node)
        : pd(context),
          cq(context, BUFFER_COUNT),
          qp(pd, cq, 16, BUFFER_COUNT),
          buffer(numa_node) {
    buffer.Register(pd);
    qp.FlowSteering(mac_addr, ipv4);

    // The slot index is passed along with each buffer; Run() uses the value returned by
    // cq.Poll() to find the slot again.
    // NOTE(review): only BUFFER_COUNT - 1 of the BUFFER_COUNT slots are posted - presumably one
    // receive queue entry is kept free on purpose; confirm.
    for (int i = 0; i < BUFFER_COUNT - 1; i++)
        qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE);

    try {
        for (uint32_t i = 0; i < nthreads; i++)
            futures.emplace_back(std::async(std::launch::async, &IBReceiver::Run, this, &process, numa_node));
        futures.emplace_back(std::async(std::launch::async, &IBReceiver::Arp, this, mac_addr, ipv4));
    } catch (...) {
        // The destructor does not run for a partially constructed object. Without this, the
        // futures of already running workers would block forever in their own destructors
        // (cancel never set) while the workers keep using members that are being torn down.
        cancel = true;
        for (auto &started: futures) {
            if (started.valid())
                started.wait();
        }
        throw;
    }
}
/// Worker loop: polls the completion queue until `cancel` is set and forwards every completion
/// whose size equals sizeof(RawJFUDPacket) to the packet processor, then posts the same
/// buffer slot again.
///
/// @param process   Consumer receiving the packet payload and the source IPv4 header field.
/// @param numa_node Node the calling thread is restricted to (only with JFJOCH_USE_NUMA_H and
///                  when libnuma reports NUMA as available).
void IBReceiver::Run(ProcessJFPacket *process, uint8_t numa_node) {
#ifdef JFJOCH_USE_NUMA_H
    if (numa_available() != -1)
        numa_run_on_node(numa_node);
#endif
    while (!cancel) {
        int64_t slot;
        size_t bytes;

        if (cq.Poll(slot, bytes) <= 0) {
            // Nothing completed - back off briefly instead of spinning.
            std::this_thread::sleep_for(std::chrono::microseconds(10));
            continue;
        }

        // NOTE(review): completions of any other size are dropped WITHOUT reposting the slot.
        // Confirm these can only be non-receive completions; otherwise the pool slowly drains.
        if (bytes != sizeof(RawJFUDPacket))
            continue;

        auto *packet = reinterpret_cast<RawJFUDPacket *>(buffer.GetLocation(slot));
        process->ProcessPacket(&packet->jf, packet->ipv4_header_sour_ip);

        // Hand the slot back to the receive queue.
        qp.PostReceiveWR(*buffer.GetMemoryRegion(), slot, buffer.GetLocation(slot), BUFFER_SIZE);
    }
}
// ARP packet - from if_arp.h
// Complete Ethernet frame (14 byte Ethernet header + 28 byte ARP payload for Ethernet/IPv4).
// The struct is copied to the wire as-is, so all multi-byte fields use fixed-width types and
// 2-byte packing removes the padding that would otherwise precede the 32-bit IP fields.
// Multi-byte values have to be stored in network byte order by whoever fills the struct.
#pragma pack(push)
#pragma pack(2)
struct RAW_ARP_Packet
{
    unsigned char dest_mac[6];   /* Ethernet destination address. */
    unsigned char sour_mac[6];   /* Ethernet source address. */
    uint16_t ether_type;         /* Ethernet frame type. */
    uint16_t ar_hrd;             /* Format of hardware address. */
    uint16_t ar_pro;             /* Format of protocol address. */
    unsigned char ar_hln;        /* Length of hardware address. */
    unsigned char ar_pln;        /* Length of protocol address. */
    uint16_t ar_op;              /* ARP opcode (command). */
    unsigned char __ar_sha[6];   /* Sender hardware address. */
    uint32_t __ar_sip;           /* Sender IP address. */
    unsigned char __ar_tha[6];   /* Target hardware address. */
    uint32_t __ar_tip;           /* Target IP address. */
};
#pragma pack(pop)
// Guard the wire layout: any padding or width change would corrupt the frame.
static_assert(sizeof(RAW_ARP_Packet) == 42, "RAW_ARP_Packet must be exactly 42 bytes (14 Ethernet + 28 ARP)");
/// Broadcasts an ARP request every 200 ms until `cancel` is set. Sender and target protocol
/// address are both `ipv4_addr`, and the frame is sent to the Ethernet broadcast address.
///
/// @param mac_addr  Source MAC address; byte i of the address is taken from bits
///                  [8*i, 8*i+7] of this value (least significant byte first).
/// @param ipv4_addr IPv4 address copied unchanged into the sender/target IP fields
///                  (presumably already in network byte order - confirm against caller).
void IBReceiver::Arp(uint64_t mac_addr, uint32_t ipv4_addr) {
    uint8_t src_mac[6];
    // Shift the 64-bit value, not the int literal 0xFF: shifting an int by 24 overflows and
    // by 32/40 bits is undefined, which corrupted the upper MAC bytes.
    for (int i = 0; i < 6; i++)
        src_mac[i] = static_cast<uint8_t>((mac_addr >> (i * 8)) & 0xFF);

    // 16-bit fields are written as byte-swapped literals, i.e. network byte order on a
    // little-endian host (same convention for all four fields).
    RAW_ARP_Packet arp_packet{
            .dest_mac = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF},
            .sour_mac = {src_mac[0], src_mac[1], src_mac[2], src_mac[3], src_mac[4], src_mac[5]},
            .ether_type = 0x0608, // ether type for ARP (0x0806)
            .ar_hrd = 0x0100, // Ethernet hardware type (1)
            .ar_pro = 0x0008, // IPv4 protocol type (0x0800)
            .ar_hln = 0x6,
            .ar_pln = 0x4,
            .ar_op = 0x0100, // request (1)
            .__ar_sha = {src_mac[0], src_mac[1], src_mac[2], src_mac[3], src_mac[4], src_mac[5]},
            .__ar_sip = ipv4_addr,
            .__ar_tha = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF},
            .__ar_tip = ipv4_addr
    };

    while (!cancel) {
        qp.PostInlineSendWR(&arp_packet, sizeof(RAW_ARP_Packet));
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}
/// Requests all worker tasks to stop and blocks until every one of them has finished.
IBReceiver::~IBReceiver() {
    cancel = true;
    for (auto &iter: futures) {
        if (!iter.valid())
            continue;
        try {
            iter.get();
        } catch (...) {
            // get() rethrows whatever a worker threw. Letting that escape a destructor would
            // call std::terminate, so the error is dropped here; teardown must still complete.
        }
    }
}