JFJochReceiver: Remove host subdirectory
This commit is contained in:
346
receiver/IBWrappers.cpp
Normal file
346
receiver/IBWrappers.cpp
Normal file
@@ -0,0 +1,346 @@
|
||||
// Copyright (2019-2022) Paul Scherrer Institute
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#ifdef JFJOCH_USE_IBVERBS
|
||||
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <sys/mman.h>
|
||||
|
||||
#ifdef JFJOCH_USE_NUMA
|
||||
#include <numaif.h>
|
||||
#include <numa.h>
|
||||
#endif
|
||||
|
||||
#include "IBWrappers.h"
|
||||
#include "../common/JFJochException.h"
|
||||
|
||||
IBContext::IBContext(const std::string &dev_name) {
|
||||
struct ibv_device **dev_list;
|
||||
int num_devices;
|
||||
|
||||
dev_list = ibv_get_device_list(&num_devices);
|
||||
|
||||
if (dev_list == nullptr) throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed to get IB devices list.");
|
||||
|
||||
int selected = -1;
|
||||
for (int i = 0; i < num_devices; i++) {
|
||||
std::string tmp = std::string(ibv_get_device_name(dev_list[i]));
|
||||
if (dev_name == tmp) selected = i;
|
||||
}
|
||||
|
||||
if (selected == -1) throw JFJochException(JFJochExceptionCategory::IBVerbs, "IB device not found.");
|
||||
|
||||
context = ibv_open_device(dev_list[selected]);
|
||||
if (context == nullptr)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed to open context for IB device.");
|
||||
|
||||
ibv_free_device_list(dev_list);
|
||||
|
||||
if (GetState() != IBV_PORT_ACTIVE) {
|
||||
ibv_close_device(context);
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "IB port is not open");
|
||||
}
|
||||
}
|
||||
|
||||
IBContext::~IBContext() {
|
||||
ibv_close_device(context);
|
||||
}
|
||||
|
||||
ibv_port_state IBContext::GetState() {
|
||||
ibv_port_attr port_attr{};
|
||||
if(ibv_query_port(context, 1, &port_attr) != 0)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Cannot query port");
|
||||
return port_attr.state;
|
||||
}
|
||||
|
||||
uint16_t IBContext::GetLid() {
|
||||
ibv_port_attr port_attr{};
|
||||
if(ibv_query_port(context, 1, &port_attr) != 0)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Cannot query port");
|
||||
return port_attr.lid;
|
||||
}
|
||||
|
||||
ibv_context *IBContext::GetPointer() {
|
||||
return context;
|
||||
}
|
||||
|
||||
IBCompletionQueue::IBCompletionQueue(IBContext &context, int minimum_size) {
|
||||
cq = ibv_create_cq(context.GetPointer(), minimum_size, nullptr, nullptr, 0);
|
||||
if (cq == nullptr)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed to create completion queue.");
|
||||
}
|
||||
|
||||
int IBCompletionQueue::Poll(int64_t &id, size_t &compl_size) {
|
||||
ibv_wc wc{};
|
||||
int num_comp = ibv_poll_cq(cq, 1, &wc);
|
||||
|
||||
// Error in CQ polling
|
||||
if (num_comp < 0)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed polling IB Verbs completion queue");
|
||||
else if (num_comp > 0) {
|
||||
// Check for error in work completion
|
||||
if (wc.status == IBV_WC_RETRY_EXC_ERR)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Remote party can be disconnected or failed (transport retry counter exceeded status in CQ)");
|
||||
else if (wc.status != IBV_WC_SUCCESS)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed status of IB Verbs send request " + std::to_string(wc.status), wc.status);
|
||||
else if (!(wc.wc_flags & IBV_WC_IP_CSUM_OK))
|
||||
compl_size = 0;
|
||||
else
|
||||
// Frame length in bytes
|
||||
compl_size = wc.byte_len;
|
||||
|
||||
// Location in buffer is based on work request ID
|
||||
id = wc.wr_id;
|
||||
}
|
||||
return num_comp;
|
||||
}
|
||||
|
||||
ibv_cq *IBCompletionQueue::GetPointer() {
|
||||
return cq;
|
||||
}
|
||||
|
||||
IBCompletionQueue::~IBCompletionQueue() {
|
||||
ibv_destroy_cq(cq);
|
||||
}
|
||||
|
||||
IBQueuePair::IBQueuePair(IBProtectionDomain &pd, IBCompletionQueue &cq, size_t send_queue_size,
|
||||
size_t receive_queue_size) {
|
||||
ibv_qp_init_attr qp_init_attr{};
|
||||
memset(&qp_init_attr, 0, sizeof (qp_init_attr));
|
||||
qp_init_attr.send_cq = cq.GetPointer();
|
||||
qp_init_attr.recv_cq = cq.GetPointer();
|
||||
qp_init_attr.qp_type = IBV_QPT_RAW_PACKET;
|
||||
qp_init_attr.cap.max_send_wr = send_queue_size;
|
||||
qp_init_attr.cap.max_recv_wr = receive_queue_size;
|
||||
qp_init_attr.cap.max_send_sge = 1; // No scatter/gather operations
|
||||
|
||||
qp = ibv_create_qp(pd.GetPointer(), &qp_init_attr);
|
||||
if (qp == nullptr)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed to create IB queue pair.");
|
||||
}
|
||||
|
||||
uint32_t IBQueuePair::GetNumber() {
|
||||
return qp->qp_num;
|
||||
}
|
||||
|
||||
void IBQueuePair::Init() {
|
||||
ibv_qp_attr qp_attr{};
|
||||
memset(&qp_attr, 0, sizeof(qp_attr));
|
||||
|
||||
int qp_flags = IBV_QP_STATE | IBV_QP_PORT;
|
||||
qp_attr.qp_state = IBV_QPS_INIT;
|
||||
qp_attr.port_num = 1;
|
||||
|
||||
if (ibv_modify_qp(qp, &qp_attr, qp_flags))
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed modify IB queue pair to init.");
|
||||
}
|
||||
|
||||
void IBQueuePair::ReadyToReceive() {
|
||||
int qp_flags = IBV_QP_STATE ;
|
||||
|
||||
ibv_qp_attr qp_attr{};
|
||||
memset(&qp_attr, 0, sizeof(qp_attr));
|
||||
|
||||
qp_attr.qp_state = IBV_QPS_RTR;
|
||||
|
||||
if (ibv_modify_qp(qp, &qp_attr, qp_flags))
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed to set IB queue pair to ready to receive ");
|
||||
}
|
||||
|
||||
void IBQueuePair::ReadyToSend() {
|
||||
int qp_flags = IBV_QP_STATE ;
|
||||
|
||||
ibv_qp_attr qp_attr{};
|
||||
memset(&qp_attr, 0, sizeof(qp_attr));
|
||||
|
||||
qp_attr.qp_state = IBV_QPS_RTS;
|
||||
|
||||
if (ibv_modify_qp(qp, &qp_attr, qp_flags))
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed to set IB queue pair to ready to send");
|
||||
}
|
||||
|
||||
void IBQueuePair::Reset() {
|
||||
int qp_flags = IBV_QP_STATE;
|
||||
|
||||
ibv_qp_attr qp_attr{};
|
||||
memset(&qp_attr, 0, sizeof(qp_attr));
|
||||
|
||||
qp_attr.qp_state = IBV_QPS_RESET;
|
||||
|
||||
if (ibv_modify_qp(qp, &qp_attr, qp_flags))
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed modify IB queue pair to reset.");
|
||||
}
|
||||
|
||||
void IBQueuePair::PostReceiveWR(IBMemoryRegion &mr, uint32_t location, void *pointer, size_t size) {
|
||||
|
||||
ibv_sge ib_sg_entry{};
|
||||
ibv_recv_wr ib_wr{};
|
||||
ibv_recv_wr *ib_bad_recv_wr = nullptr;
|
||||
|
||||
// pointer to packet buffer size and memory key of each packet buffer
|
||||
ib_sg_entry.length = size;
|
||||
ib_sg_entry.lkey = mr.GetLocalKey();
|
||||
ib_sg_entry.addr = (uintptr_t)(pointer);
|
||||
|
||||
ib_wr.num_sge = 1;
|
||||
ib_wr.sg_list = &ib_sg_entry;
|
||||
ib_wr.next = nullptr;
|
||||
ib_wr.wr_id = location;
|
||||
|
||||
int ret;
|
||||
while ((ret = ibv_post_recv(qp, &ib_wr, &ib_bad_recv_wr))) {
|
||||
if (ret != ENOMEM)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Receiving with IB Verbs failed ", ret);
|
||||
// ENONEM is not enough location in receive queue; just wait
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(1));
|
||||
}
|
||||
}
|
||||
|
||||
void IBQueuePair::PostSendWR(IBMemoryRegion &mr, void *pointer, size_t size) {
|
||||
// Send the frame via RDMA
|
||||
ibv_sge ib_sg{};
|
||||
ibv_send_wr ib_wr{};
|
||||
ibv_send_wr *ib_bad_wr = nullptr;
|
||||
|
||||
memset(&ib_sg, 0, sizeof(ib_sg));
|
||||
memset(&ib_wr, 0, sizeof(ib_wr));
|
||||
|
||||
ib_sg.addr = (uintptr_t)(pointer);
|
||||
ib_sg.length = size;
|
||||
ib_sg.lkey = mr.GetLocalKey();
|
||||
|
||||
ib_wr.wr_id = UINT64_MAX;
|
||||
ib_wr.sg_list = &ib_sg;
|
||||
ib_wr.num_sge = 1;
|
||||
|
||||
ib_wr.opcode = IBV_WR_SEND;
|
||||
ib_wr.send_flags = IBV_SEND_SIGNALED;
|
||||
|
||||
int ret;
|
||||
while ((ret = ibv_post_send(qp, &ib_wr, &ib_bad_wr))) {
|
||||
if (ret != ENOMEM)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Sending with IB Verbs failed", ret);
|
||||
// ENONEM is not enough location in send queue; just wait
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(1));
|
||||
}
|
||||
}
|
||||
|
||||
void IBQueuePair::FlowSteeringIPv4(uint32_t ipv4) {
|
||||
struct raw_eth_flow_attr {
|
||||
struct ibv_flow_attr attr;
|
||||
struct ibv_flow_spec_ipv4 spec_ipv4;
|
||||
} __attribute__((packed)) flow_attr = {
|
||||
.attr = {
|
||||
.comp_mask = 0,
|
||||
.type = IBV_FLOW_ATTR_NORMAL,
|
||||
.size = sizeof(flow_attr),
|
||||
.priority = 0,
|
||||
.num_of_specs = 1,
|
||||
.port = 1,
|
||||
.flags = 0,
|
||||
},
|
||||
.spec_ipv4 = {
|
||||
.type = IBV_FLOW_SPEC_IPV4,
|
||||
.size = sizeof(struct ibv_flow_spec_ipv4),
|
||||
.val = {
|
||||
.src_ip = 0,
|
||||
.dst_ip = ipv4
|
||||
},
|
||||
.mask = {
|
||||
.src_ip = 0,
|
||||
.dst_ip = 0xFFFFFFFF
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
flow = ibv_create_flow(qp, &flow_attr.attr);
|
||||
if (flow == nullptr)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed to set IB flow steering");
|
||||
}
|
||||
|
||||
IBQueuePair::~IBQueuePair() {
|
||||
if (flow)
|
||||
ibv_destroy_flow(flow);
|
||||
ibv_destroy_qp(qp);
|
||||
}
|
||||
|
||||
IBProtectionDomain::IBProtectionDomain(IBContext &context) {
|
||||
pd = ibv_alloc_pd(context.GetPointer());
|
||||
if (pd == nullptr)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed to allocate IB protection domain.");
|
||||
}
|
||||
|
||||
IBProtectionDomain::~IBProtectionDomain() {
|
||||
ibv_dealloc_pd(pd);
|
||||
}
|
||||
|
||||
ibv_pd *IBProtectionDomain::GetPointer() {
|
||||
return pd;
|
||||
}
|
||||
|
||||
IBMemoryRegion::IBMemoryRegion(IBProtectionDomain &pd, void *pointer, size_t size) {
|
||||
buffer_mr = ibv_reg_mr(pd.GetPointer(), pointer, size, IBV_ACCESS_LOCAL_WRITE);
|
||||
|
||||
if (buffer_mr == nullptr)
|
||||
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed to register IB memory region.");
|
||||
}
|
||||
|
||||
uint32_t IBMemoryRegion::GetLocalKey() {
|
||||
return buffer_mr->lkey;
|
||||
}
|
||||
|
||||
uint32_t IBMemoryRegion::GetRemoteKey() {
|
||||
return buffer_mr->rkey;
|
||||
}
|
||||
|
||||
IBMemoryRegion::~IBMemoryRegion() {
|
||||
ibv_dereg_mr(buffer_mr);
|
||||
}
|
||||
|
||||
IBRegBuffer::IBRegBuffer(IBProtectionDomain &pd, size_t in_loc_count, size_t in_loc_size, int16_t numa_node) :
|
||||
loc_count(in_loc_count), loc_size(in_loc_size) {
|
||||
buffer = (uint8_t *) mmap(nullptr, loc_size * loc_count, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
|
||||
if (buffer == nullptr)
|
||||
throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "frame_buffer");
|
||||
|
||||
#ifdef JFJOCH_USE_NUMA
|
||||
if (numa_node >= 0) {
|
||||
unsigned long nodemask = 1L << numa_node;;
|
||||
if (numa_node > sizeof(nodemask)*8) {
|
||||
Unmap();
|
||||
throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Mask too small for NUMA node");
|
||||
}
|
||||
if (mbind(buffer, loc_size * loc_count, MPOL_BIND, &nodemask, sizeof(nodemask)*8, MPOL_MF_STRICT) == -1) {
|
||||
Unmap();
|
||||
throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Cannot apply NUMA policy");
|
||||
}
|
||||
}
|
||||
#endif
|
||||
memset(buffer, 0, loc_size * loc_count);
|
||||
|
||||
mr = std::make_unique<IBMemoryRegion>(pd, buffer, loc_size * loc_count);
|
||||
}
|
||||
|
||||
void IBRegBuffer::Unmap() {
|
||||
munmap(buffer, loc_size * loc_count);
|
||||
}
|
||||
|
||||
IBRegBuffer::~IBRegBuffer() {
|
||||
mr.reset();
|
||||
Unmap();
|
||||
}
|
||||
|
||||
IBMemoryRegion *IBRegBuffer::GetMemoryRegion() {
|
||||
return mr.get();
|
||||
}
|
||||
|
||||
uint8_t *IBRegBuffer::GetLocation(uint64_t location) {
|
||||
if (location < loc_count)
|
||||
return buffer + loc_size * location;
|
||||
else
|
||||
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Location out of bounds");
|
||||
}
|
||||
|
||||
#endif //JFJOCH_USE_IBVERBS
|
||||
Reference in New Issue
Block a user