Merge branch 'raw_eth' into 'main'

Raw Ethernet: working in tests

See merge request jungfraujoch/nextgendcu!1
This commit is contained in:
2023-04-11 14:21:29 +00:00
10 changed files with 285 additions and 290 deletions

View File

@@ -791,10 +791,13 @@ uint32_t DiffractionExperiment::GetSrcIPv4Address(uint32_t data_stream, uint32_t
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Non existing data stream");
if (half_module >= 2 * GetModulesNum())
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Non existing module");
if ((data_stream+1) >= 256/32)
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds,
"Cannot handle more than 7 data stream in the current model");
if (half_module >= 32)
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds,
"Cannot handle more than 16 modules / data stream in the current model");
uint32_t host = ((data_stream * 32) | half_module) << 24;
uint32_t host = (((data_stream+1) * 32) | half_module) << 24;
return internal.ipv4_subnet() | host;
}
@@ -802,7 +805,7 @@ uint32_t DiffractionExperiment::GetDestIPv4Address(uint32_t data_stream) const {
if (data_stream >= GetDataStreamsNum())
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Non existing data stream");
uint32_t host = (0xFFlu - data_stream) << 24;
uint32_t host = (data_stream + 1) << 24;
return internal.ipv4_subnet() | host;
}

View File

@@ -8,7 +8,7 @@ ADD_LIBRARY(JungfraujochHost STATIC
AcquisitionOfflineCounters.cpp AcquisitionOfflineCounters.h
IBWrappers.cpp IBWrappers.h
MlxRawEthDevice.cpp MlxRawEthDevice.h
../../jungfrau/jf_packet.h UdpReceiver.cpp UdpReceiver.h IBReceiver.cpp IBReceiver.h)
../../jungfrau/jf_packet.h UdpReceiver.cpp UdpReceiver.h)
TARGET_LINK_LIBRARIES(JungfraujochHost CommonFunctions HLSSimulation ${IBVERBS} JFCalibration)
@@ -19,6 +19,9 @@ IF(IBVERBS)
TARGET_COMPILE_DEFINITIONS(JungfraujochHost PUBLIC -DJFJOCH_USE_IBVERBS)
TARGET_LINK_LIBRARIES(JungfraujochHost ${IBVERBS})
MESSAGE(STATUS "JFJochReceiver compiled with IBVerbs support")
ADD_EXECUTABLE(MlxRawEthRcv MlxRawEthRcv.cpp)
TARGET_LINK_LIBRARIES(MlxRawEthRcv JungfraujochHost)
ENDIF()
IF(HAS_NUMAIF AND HAS_NUMA_H AND NUMA_LIBRARY)
@@ -39,4 +42,4 @@ INSTALL(TARGETS jfjoch_pcie_set_network RUNTIME)
ADD_EXECUTABLE(jfjoch_pcie_clear_net_counters jfjoch_pcie_clear_net_counters.cpp)
TARGET_LINK_LIBRARIES(jfjoch_pcie_clear_net_counters JungfraujochHost)
INSTALL(TARGETS jfjoch_pcie_clear_net_counters RUNTIME)
INSTALL(TARGETS jfjoch_pcie_clear_net_counters RUNTIME)

View File

@@ -1,171 +0,0 @@
// Copyright (2019-2022) Paul Scherrer Institute
// SPDX-License-Identifier: GPL-3.0-or-later
#ifdef JFJOCH_USE_IBVERBS
#include "IBReceiver.h"
#include <sys/mman.h>
#include <netinet/in.h>
#ifdef JFJOCH_USE_NUMA
#include <numaif.h>
#include <numa.h>
#endif
#include "../common/JFJochException.h"
#define BUFFER_SIZE (9000)
#define BUFFER_COUNT 4096
IBReceiverBuffer::IBReceiverBuffer(uint8_t numa_node) {
buffer = (uint8_t *) mmap(nullptr, BUFFER_SIZE * BUFFER_COUNT, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (buffer == nullptr)
throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "frame_buffer");
#ifdef JFJOCH_USE_NUMA
if (numa_node >= 0) {
unsigned long nodemask = 1L << numa_node;;
if (numa_node > sizeof(nodemask)*8) {
Unmap();
throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Mask too small for NUMA node");
}
if (mbind(buffer, BUFFER_SIZE * BUFFER_COUNT, MPOL_BIND, &nodemask, sizeof(nodemask)*8, MPOL_MF_STRICT) == -1) {
Unmap();
throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Cannot apply NUMA policy");
}
}
#endif
memset(buffer, 0, BUFFER_SIZE * BUFFER_COUNT);
}
void IBReceiverBuffer::Unmap() {
munmap(buffer, BUFFER_SIZE * BUFFER_COUNT);
}
IBReceiverBuffer::~IBReceiverBuffer() {
mr.reset();
Unmap();
}
void IBReceiverBuffer::Register(IBProtectionDomain &pd) {
mr = std::make_unique<IBMemoryRegion>(pd, buffer, BUFFER_SIZE * BUFFER_COUNT);
}
IBMemoryRegion *IBReceiverBuffer::GetMemoryRegion() {
return mr.get();
}
uint8_t *IBReceiverBuffer::GetLocation(uint64_t location) {
if (location < BUFFER_COUNT)
return buffer + BUFFER_SIZE * location;
else
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Location out of bounds");
}
IBReceiver::IBReceiver(IBContext &context,
ThreadSafeFIFO<Completion> &completion_queue,
ThreadSafeFIFO<ProcessWorkRequest> &wr_queue,
uint32_t nmodules,
uint64_t mac_addr,
uint32_t ipv4,
volatile bool &in_cancel,
uint32_t nthreads,
uint8_t numa_node)
: pd(context),
cq(context, BUFFER_COUNT),
qp(pd, cq, 16, BUFFER_COUNT),
buffer(numa_node),
process(completion_queue, wr_queue, nmodules),
cancel(in_cancel) {
buffer.Register(pd);
qp.FlowSteering(mac_addr, ipv4);
for (int i = 0; i < BUFFER_COUNT - 1; i++)
qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE);
for (int i = 0; i < nthreads; i++)
futures.emplace_back(std::async(std::launch::async, &IBReceiver::Run, this, numa_node));
futures.emplace_back(std::async(std::launch::async, &IBReceiver::Arp, this, mac_addr, ipv4));
}
void IBReceiver::Run(uint8_t numa_node) {
#ifdef JFJOCH_USE_NUMA
if (numa_available() != -1)
numa_run_on_node(numa_node);
#endif
while (!cancel) {
int64_t i;
size_t size;
if (cq.Poll(i, size) > 0) {
if (size == sizeof(jf_raw_packet)) {
auto ptr = (jf_raw_packet *) buffer.GetLocation(i);
process.ProcessPacket(&ptr->jf, ptr->ipv4_header_sour_ip);
qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE);
}
} else
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
}
// ARP packet - from if_arp.h
#pragma pack(push)
#pragma pack(2)
struct RAW_ARP_Packet
{
unsigned char dest_mac[6];
unsigned char sour_mac[6];
uint16_t ether_type;
unsigned short int ar_hrd; /* Format of hardware address. */
unsigned short int ar_pro; /* Format of protocol address. */
unsigned char ar_hln; /* Length of hardware address. */
unsigned char ar_pln; /* Length of protocol address. */
unsigned short int ar_op; /* ARP opcode (command). */
unsigned char ar_sha[6]; /* Sender hardware address. */
unsigned ar_sip; /* Sender IP address. */
unsigned char ar_tha[6]; /* Target hardware address. */
unsigned ar_tip; /* Target IP address. */
};
#pragma pack(pop)
void IBReceiver::Arp(uint64_t mac_addr, uint32_t ipv4_addr) {
uint8_t src_mac[6];
for (int i = 0; i < 6; i++)
src_mac[i] = (mac_addr & (0xFF << (i * 8))) >> (i * 8);
RAW_ARP_Packet arp_packet{
.dest_mac = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF},
.sour_mac = {src_mac[5], src_mac[4], src_mac[3], src_mac[2], src_mac[1], src_mac[0]},
.ether_type = htons(0x0806), // ether type for ARP
.ar_hrd = htons(1), // MAC addr
.ar_pro = htons(0x0800), // IPv4
.ar_hln = 0x6, // 6 bytes
.ar_pln = 0x4, // 4 bytes
.ar_op = htons(1), // ARP request
.ar_sha = {src_mac[5], src_mac[4], src_mac[3], src_mac[2], src_mac[1], src_mac[0]},
.ar_sip = ipv4_addr,
.ar_tha = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF},
.ar_tip = ipv4_addr
};
while (!cancel) {
qp.PostInlineSendWR(&arp_packet, sizeof(RAW_ARP_Packet));
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
IBReceiver::~IBReceiver() {
cancel = true;
for (auto &iter: futures) {
if (iter.valid())
iter.get();
}
}
#endif //JFJOCH_USE_IBVERBS

View File

@@ -1,52 +0,0 @@
// Copyright (2019-2022) Paul Scherrer Institute
// SPDX-License-Identifier: GPL-3.0-or-later
#ifdef JFJOCH_USE_IBVERBS
#ifndef JUNGFRAUJOCH_IBRECEIVER_H
#define JUNGFRAUJOCH_IBRECEIVER_H
#include "IBWrappers.h"
#include <future>
#include "../../jungfrau/ProcessJFPacket.h"
class IBReceiverBuffer {
std::unique_ptr<IBMemoryRegion> mr;
uint8_t *buffer;
void Unmap();
public:
explicit IBReceiverBuffer(uint8_t numa_node = 1);
~IBReceiverBuffer();
void Register(IBProtectionDomain &pd);
uint8_t *GetLocation(uint64_t location);
IBMemoryRegion *GetMemoryRegion();
};
class IBReceiver {
IBProtectionDomain pd;
IBCompletionQueue cq;
IBQueuePair qp;
IBReceiverBuffer buffer;
ProcessJFPacket process;
volatile bool &cancel;
std::vector<std::future<void>> futures;
void Run(uint8_t numa_node);
void Arp(uint64_t mac_addr, uint32_t ipv4_addr);
public:
IBReceiver(IBContext &context,
ThreadSafeFIFO<Completion> &completion_queue,
ThreadSafeFIFO<ProcessWorkRequest> &wr_queue,
uint32_t nmodules,
uint64_t mac_addr,
uint32_t ipv4,
volatile bool &cancel,
uint32_t nthreads = 1,
uint8_t numa_node = -1);
~IBReceiver();
};
#endif //JUNGFRAUJOCH_IBRECEIVER_H
#endif //JFJOCH_USE_IBVERBS

View File

@@ -5,6 +5,12 @@
#include <thread>
#include <chrono>
#include <sys/mman.h>
#ifdef JFJOCH_USE_NUMA
#include <numaif.h>
#include <numa.h>
#endif
#include "IBWrappers.h"
#include "../../common/JFJochException.h"
@@ -122,11 +128,9 @@ void IBQueuePair::Init() {
ibv_qp_attr qp_attr{};
memset(&qp_attr, 0, sizeof(qp_attr));
int qp_flags = IBV_QP_STATE | IBV_QP_PORT |IBV_QP_PKEY_INDEX | IBV_QP_ACCESS_FLAGS;
int qp_flags = IBV_QP_STATE | IBV_QP_PORT;
qp_attr.qp_state = IBV_QPS_INIT;
qp_attr.port_num = 1;
qp_attr.qp_access_flags = 0;
qp_attr.pkey_index = 0;
if (ibv_modify_qp(qp, &qp_attr, qp_flags))
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed modify IB queue pair to init.");
@@ -177,11 +181,11 @@ void IBQueuePair::PostReceiveWR(IBMemoryRegion &mr, uint32_t location, void *poi
// pointer to packet buffer size and memory key of each packet buffer
ib_sg_entry.length = size;
ib_sg_entry.lkey = mr.GetLocalKey();
ib_sg_entry.addr = (uintptr_t)(pointer);
ib_wr.num_sge = 1;
ib_wr.sg_list = &ib_sg_entry;
ib_wr.next = nullptr;
ib_sg_entry.addr = (uintptr_t)(pointer);
ib_wr.wr_id = location;
int ret;
@@ -193,7 +197,7 @@ void IBQueuePair::PostReceiveWR(IBMemoryRegion &mr, uint32_t location, void *poi
}
}
void IBQueuePair::PostInlineSendWR(void *pointer, size_t size) {
void IBQueuePair::PostSendWR(IBMemoryRegion &mr, void *pointer, size_t size) {
// Send the frame via RDMA
ibv_sge ib_sg{};
ibv_send_wr ib_wr{};
@@ -204,13 +208,14 @@ void IBQueuePair::PostInlineSendWR(void *pointer, size_t size) {
ib_sg.addr = (uintptr_t)(pointer);
ib_sg.length = size;
ib_sg.lkey = mr.GetLocalKey();
ib_wr.wr_id = UINT64_MAX;
ib_wr.sg_list = &ib_sg;
ib_wr.num_sge = 1;
ib_wr.opcode = IBV_WR_SEND;
ib_wr.send_flags = IBV_SEND_INLINE;
ib_wr.send_flags = IBV_SEND_SIGNALED;
int ret;
while ((ret = ibv_post_send(qp, &ib_wr, &ib_bad_wr))) {
@@ -221,14 +226,9 @@ void IBQueuePair::PostInlineSendWR(void *pointer, size_t size) {
}
}
void IBQueuePair::FlowSteering(uint64_t mac_addr, uint32_t ipv4) {
uint8_t dst_mac_addr[6];
for (int i = 0; i < 6; i++)
dst_mac_addr[i] = (mac_addr & (0xFF << (i*8))) >> (i * 8);
void IBQueuePair::FlowSteeringIPv4(uint32_t ipv4) {
struct raw_eth_flow_attr {
struct ibv_flow_attr attr;
struct ibv_flow_spec_eth spec_eth;
struct ibv_flow_spec_ipv4 spec_ipv4;
} __attribute__((packed)) flow_attr = {
.attr = {
@@ -236,33 +236,10 @@ void IBQueuePair::FlowSteering(uint64_t mac_addr, uint32_t ipv4) {
.type = IBV_FLOW_ATTR_NORMAL,
.size = sizeof(flow_attr),
.priority = 0,
.num_of_specs = 2,
.num_of_specs = 1,
.port = 1,
.flags = 0,
},
.spec_eth = {
.type = IBV_FLOW_SPEC_ETH,
.size = sizeof(struct ibv_flow_spec_eth),
.val = {
.dst_mac = {
dst_mac_addr[0],
dst_mac_addr[1],
dst_mac_addr[2],
dst_mac_addr[3],
dst_mac_addr[4],
dst_mac_addr[5]
},
.src_mac = {0,0,0,0,0,0},
.ether_type = 0x0800,
.vlan_tag = 0x0
},
.mask = {
.dst_mac = {0xFF,0xFF,0xFF,0xFF,0xFF,0xFF},
.src_mac = {0,0,0,0,0,0},
.ether_type = 0xFFFF,
.vlan_tag = 0x0
}
},
.spec_ipv4 = {
.type = IBV_FLOW_SPEC_IPV4,
.size = sizeof(struct ibv_flow_spec_ipv4),
@@ -277,6 +254,7 @@ void IBQueuePair::FlowSteering(uint64_t mac_addr, uint32_t ipv4) {
}
};
flow = ibv_create_flow(qp, &flow_attr.attr);
if (flow == nullptr)
throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed to set IB flow steering");
@@ -321,4 +299,48 @@ IBMemoryRegion::~IBMemoryRegion() {
ibv_dereg_mr(buffer_mr);
}
IBRegBuffer::IBRegBuffer(IBProtectionDomain &pd, size_t in_loc_count, size_t in_loc_size, int16_t numa_node) :
loc_count(in_loc_count), loc_size(in_loc_size) {
buffer = (uint8_t *) mmap(nullptr, loc_size * loc_count, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (buffer == nullptr)
throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "frame_buffer");
#ifdef JFJOCH_USE_NUMA
if (numa_node >= 0) {
unsigned long nodemask = 1L << numa_node;;
if (numa_node > sizeof(nodemask)*8) {
Unmap();
throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Mask too small for NUMA node");
}
if (mbind(buffer, loc_size * loc_count, MPOL_BIND, &nodemask, sizeof(nodemask)*8, MPOL_MF_STRICT) == -1) {
Unmap();
throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Cannot apply NUMA policy");
}
}
#endif
memset(buffer, 0, loc_size * loc_count);
mr = std::make_unique<IBMemoryRegion>(pd, buffer, loc_size * loc_count);
}
void IBRegBuffer::Unmap() {
munmap(buffer, loc_size * loc_count);
}
IBRegBuffer::~IBRegBuffer() {
mr.reset();
Unmap();
}
IBMemoryRegion *IBRegBuffer::GetMemoryRegion() {
return mr.get();
}
uint8_t *IBRegBuffer::GetLocation(uint64_t location) {
if (location < loc_count)
return buffer + loc_size * location;
else
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Location out of bounds");
}
#endif //JFJOCH_USE_IBVERBS

View File

@@ -60,11 +60,24 @@ public:
void ReadyToSend();
void PostReceiveWR(IBMemoryRegion &mr, uint32_t location, void *pointer, size_t size);
void PostInlineSendWR(void *pointer, size_t size);
void FlowSteering(uint64_t mac_addr, uint32_t ipv4);
void PostSendWR(IBMemoryRegion &mr, void *pointer, size_t size);
void FlowSteeringIPv4(uint32_t ipv4);
~IBQueuePair();
};
class IBRegBuffer {
std::unique_ptr<IBMemoryRegion> mr;
uint8_t *buffer;
const size_t loc_count;
const size_t loc_size;
void Unmap();
public:
explicit IBRegBuffer(IBProtectionDomain &pd, size_t loc_count, size_t loc_size, int16_t numa_node = -1);
~IBRegBuffer();
uint8_t *GetLocation(uint64_t location);
IBMemoryRegion *GetMemoryRegion();
};
#endif //JUNGFRAUJOCH_IBWRAPPERS_H
#endif // JFJOCH_USE_IBVERBS

View File

@@ -4,6 +4,12 @@
#ifdef JFJOCH_USE_IBVERBS
#include "MlxRawEthDevice.h"
#include <netinet/in.h>
#ifdef JFJOCH_USE_NUMA
#include <numa.h>
#endif
MlxRawEthDevice::MlxRawEthDevice(uint16_t dev_id, uint16_t data_stream, size_t in_frame_buffer_size_modules,
int16_t in_numa_node)
: AcquisitionDevice(data_stream),
@@ -35,21 +41,67 @@ void MlxRawEthDevice::HW_ReadActionRegister(ActionConfig *job) {
memcpy(job, &cfg, sizeof(ActionConfig));
}
void MlxRawEthDevice::MeasureThread() {
try {
IBProtectionDomain pd(context);
IBCompletionQueue cq(context, BUFFER_COUNT+2);
IBQueuePair qp(pd, cq, 16, BUFFER_COUNT);
IBRegBuffer buffer(pd, BUFFER_COUNT, BUFFER_SIZE, numa_node);
ProcessJFPacket process(completion_queue, wr_queue, cfg.nmodules);
qp.Init();
qp.ReadyToReceive();
qp.ReadyToSend();
qp.FlowSteeringIPv4(cfg.fpga_ipv4_addr);
for (int i = 0; i < BUFFER_COUNT-1; i++)
qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE);
completion_queue.Put(Completion{
.type = Completion::Type::Start
});
auto cq_poll_future = std::async(std::launch::async, &MlxRawEthDevice::PollCQ,
this,
std::ref(buffer),
std::ref(qp),
std::ref(cq),
std::ref(process));
auto arp_future = std::async(std::launch::async, &MlxRawEthDevice::SendARP,
this,
std::ref(buffer),
std::ref(qp));
uint64_t packet_count = cq_poll_future.get();
arp_future.get();
completion_queue.Put(Completion{
.type = Completion::Type::End,
.frame_number = packet_count
});
} catch (const JFJochException &e) {
cancel = true;
throw e;
}
idle = true;
}
void MlxRawEthDevice::HW_StartAction() {
if (cfg.mode & MODE_CONV)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Conversion on CPU flag has to be enabled for Raw Ethernet device");
cancel = false;
receiver = std::make_unique<IBReceiver>(context,
completion_queue,
wr_queue,
cfg.nmodules,
mac_addr,
cfg.fpga_ipv4_addr,
cancel,
1,
numa_node);
idle = false;
measure = std::async(std::launch::async, &MlxRawEthDevice::MeasureThread, this);
}
bool MlxRawEthDevice::HW_IsIdle() const {
return (receiver == nullptr);
return idle;
}
void MlxRawEthDevice::HW_SetCancelDataCollectionBit() {
@@ -76,7 +128,8 @@ void MlxRawEthDevice::HW_GetStatus(ActionStatus *status) const {
}
void MlxRawEthDevice::HW_EndAction() {
receiver.reset();
if (measure.valid())
measure.get();
}
void MlxRawEthDevice::CopyInternalPacketGenFrameToDeviceBuffer() {
@@ -92,4 +145,79 @@ uint64_t MlxRawEthDevice::HW_GetMACAddress() const {
return mac_addr;
}
uint64_t MlxRawEthDevice::PollCQ(IBRegBuffer &buffer, IBQueuePair &qp, IBCompletionQueue &cq, ProcessJFPacket &process) {
#ifdef JFJOCH_USE_NUMA
if (numa_available() != -1)
numa_run_on_node(numa_node);
#endif
uint64_t packet_counter = 0;
while (!cancel) {
int64_t i;
size_t size;
if (cq.Poll(i, size) > 0) {
if ((i < BUFFER_COUNT) && (size == sizeof(jf_raw_packet))) {
auto ptr = (jf_raw_packet *) buffer.GetLocation(i);
process.ProcessPacket(&ptr->jf, ptr->ipv4_header_sour_ip);
qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE);
packet_counter++;
}
} else
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
return packet_counter;
}
// ARP packet - from if_arp.h
#pragma pack(push)
#pragma pack(2)
struct RAW_ARP_Packet
{
unsigned char dest_mac[6];
unsigned char sour_mac[6];
uint16_t ether_type;
unsigned short int ar_hrd; /* Format of hardware address. */
unsigned short int ar_pro; /* Format of protocol address. */
unsigned char ar_hln; /* Length of hardware address. */
unsigned char ar_pln; /* Length of protocol address. */
unsigned short int ar_op; /* ARP opcode (command). */
unsigned char ar_sha[6]; /* Sender hardware address. */
unsigned ar_sip; /* Sender IP address. */
unsigned char ar_tha[6]; /* Target hardware address. */
unsigned ar_tip; /* Target IP address. */
};
#pragma pack(pop)
void MlxRawEthDevice::SendARP(IBRegBuffer &buffer, IBQueuePair &qp) {
uint8_t src_mac[6];
for (int i = 0; i < 6; i++)
src_mac[i] = (mac_addr & (0xFFlu << (i * 8))) >> (i * 8);
RAW_ARP_Packet arp_packet{
.dest_mac = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF},
.sour_mac = {src_mac[0], src_mac[1], src_mac[2], src_mac[3], src_mac[4], src_mac[5]},
.ether_type = htons(0x0806), // ether type for ARP
.ar_hrd = htons(1), // MAC addr
.ar_pro = htons(0x0800), // IPv4
.ar_hln = 0x6, // 6 bytes
.ar_pln = 0x4, // 4 bytes
.ar_op = htons(1), // ARP request
.ar_sha = {src_mac[0], src_mac[1], src_mac[2], src_mac[3], src_mac[4], src_mac[5]},
.ar_sip = cfg.fpga_ipv4_addr,
.ar_tha = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF},
.ar_tip = cfg.fpga_ipv4_addr
};
memcpy(buffer.GetLocation(BUFFER_COUNT-1), &arp_packet, sizeof(RAW_ARP_Packet));
while (!cancel) {
qp.PostSendWR(*buffer.GetMemoryRegion(), buffer.GetLocation(BUFFER_COUNT-1), sizeof(RAW_ARP_Packet));
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
#endif //JFJOCH_USE_IBVERBS

View File

@@ -2,14 +2,21 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifdef JFJOCH_USE_IBVERBS
#ifndef JUNGFRAUJOCH_RAWETHDEVICE_H
#define JUNGFRAUJOCH_RAWETHDEVICE_H
#include <future>
#include "AcquisitionDevice.h"
#include "IBWrappers.h"
#include "IBReceiver.h"
#include "../../jungfrau/ProcessJFPacket.h"
class MlxRawEthDevice : public AcquisitionDevice {
constexpr const static size_t BUFFER_SIZE = 9000;
constexpr const static size_t BUFFER_COUNT = 4096;
std::mutex m;
IBContext context;
ThreadSafeFIFO<Completion> completion_queue;
@@ -18,9 +25,19 @@ class MlxRawEthDevice : public AcquisitionDevice {
ActionConfig cfg;
const int16_t numa_node;
std::unique_ptr<IBReceiver> receiver;
volatile bool cancel = false;
std::future<void> measure;
volatile bool cancel = false;
volatile bool idle = true;
void SendARP(IBRegBuffer &buffer, IBQueuePair &qp);
uint64_t PollCQ(IBRegBuffer &buffer,
IBQueuePair &qp,
IBCompletionQueue &cq,
ProcessJFPacket &process);
void MeasureThread();
Completion ReadCompletion() override;
void HW_WriteActionRegister(const ActionConfig *job) override;
void HW_ReadActionRegister(ActionConfig *job) override;

View File

@@ -0,0 +1,32 @@
// Copyright (2019-2022) Paul Scherrer Institute
// SPDX-License-Identifier: GPL-3.0-or-later
#include "MlxRawEthDevice.h"
#include "../common/DiffractionExperiment.h"
int main(int argc, char **argv) {
DiffractionExperiment experiment(2, {4,4}, 8, 36);
experiment.Mode(DetectorMode::Raw);
experiment.ImagesPerTrigger(1000);
Logger logger("MlxRawEthRcv");
logger.Verbose(true);
try {
MlxRawEthDevice mlx(1, 0, 1024);
logger.Info("Mac addr {}", mlx.GetMACAddress());
mlx.EnableLogging(&logger);
mlx.StartAction(experiment);
mlx.WaitForActionComplete();
logger.Info("Bytes received {}", mlx.GetBytesReceived());
JFJochProtoBuf::AcquisitionDeviceStatistics statistics;
mlx.SaveStatistics(experiment, statistics);
logger.Info("{} {}", statistics.efficiency(), statistics.packets_received_per_module(1));
} catch (std::exception &e) {
logger.ErrorException(e);
exit(EXIT_FAILURE);
}
}

View File

@@ -177,11 +177,11 @@ TEST_CASE("DiffractionExperiment_IPv4Address","[DiffractionExperiment]") {
uint32_t ndatastreams = 3;
REQUIRE(x.GetDestIPv4Address(0) == 0xFF32010a);
REQUIRE(x.GetDestIPv4Address(1) == 0xFE32010a);
REQUIRE(x.GetDestIPv4Address(2) == 0xFD32010a);
REQUIRE(x.GetSrcIPv4Address(0, 4) == (0x0032010a | ((0 * 32 + 4) << 24)));
REQUIRE(x.GetSrcIPv4Address(2, 23) == (0x0032010a | ((2 * 32 + 23) << 24)));
REQUIRE(x.GetDestIPv4Address(0) == 0x0132010a);
REQUIRE(x.GetDestIPv4Address(1) == 0x0232010a);
REQUIRE(x.GetDestIPv4Address(2) == 0x0332010a);
REQUIRE(x.GetSrcIPv4Address(0, 4) == (0x0032010a | ((1 * 32 + 4) << 24)));
REQUIRE(x.GetSrcIPv4Address(2, 23) == (0x0032010a | ((3 * 32 + 23) << 24)));
REQUIRE_THROWS(x.GetDestIPv4Address(3));
REQUIRE_THROWS(x.GetSrcIPv4Address(0, 24));
REQUIRE_THROWS(x.GetSrcIPv4Address(3, 5));
@@ -193,12 +193,12 @@ TEST_CASE("DiffractionExperiment_IPv4Address","[DiffractionExperiment]") {
REQUIRE_THROWS(x.IPv4Subnet("64.1.124.129"));
REQUIRE_NOTHROW(x.IPv4Subnet("64.1.124.0"));
REQUIRE(x.GetDestIPv4Address(0) == 0xFF7c0140u);
REQUIRE(x.GetDestIPv4Address(1) == 0xFE7c0140u);
REQUIRE(x.GetDestIPv4Address(2) == 0xFD7c0140u);
REQUIRE(x.GetSrcIPv4Address(2, 12) == 0x007c0140u + ((2 * 32 + 12) << 24));
REQUIRE(x.GetDestIPv4Address(0) == 0x017c0140u);
REQUIRE(x.GetDestIPv4Address(1) == 0x027c0140u);
REQUIRE(x.GetDestIPv4Address(2) == 0x037c0140u);
REQUIRE(x.GetSrcIPv4Address(2, 12) == 0x007c0140u + ((3 * 32 + 12) << 24));
REQUIRE(IPv4AddressToStr(x.GetDestIPv4Address(2)) == "64.1.124.253");
REQUIRE(IPv4AddressToStr(x.GetDestIPv4Address(2)) == "64.1.124.3");
}
TEST_CASE("IPv4AddressToStr","") {
@@ -535,7 +535,7 @@ TEST_CASE("DiffractionExperiment_ExportProtobuf","[DiffractionExperiment]") {
REQUIRE(x.GetImageTime() == y.GetImageTime());
REQUIRE(y.GetFrameCountTime().count() == x.GetFrameCountTime().count());
REQUIRE(y.GetDestUDPPort(0,0) == 64*76);
REQUIRE(y.GetDestIPv4Address(0) == 0xFF020202);
REQUIRE(y.GetDestIPv4Address(0) == 0x01020202);
REQUIRE(y.GetPedestalG0Frames() == x.GetPedestalG0Frames());
REQUIRE(y.GetMaskModuleEdges() == x.GetMaskModuleEdges());
REQUIRE(y.GetMaskChipEdges() == x.GetMaskChipEdges());