From c4e70ca038f0179e26e2c8fed760495767a6280f Mon Sep 17 00:00:00 2001 From: leonarski_f Date: Tue, 11 Apr 2023 14:21:28 +0000 Subject: [PATCH] Raw Ethernet: working in tests --- common/DiffractionExperiment.cpp | 7 +- receiver/host/CMakeLists.txt | 7 +- receiver/host/IBReceiver.cpp | 171 ---------------------------- receiver/host/IBReceiver.h | 52 --------- receiver/host/IBWrappers.cpp | 94 +++++++++------ receiver/host/IBWrappers.h | 17 ++- receiver/host/MlxRawEthDevice.cpp | 150 ++++++++++++++++++++++-- receiver/host/MlxRawEthDevice.h | 23 +++- receiver/host/MlxRawEthRcv.cpp | 32 ++++++ tests/DiffractionExperimentTest.cpp | 22 ++-- 10 files changed, 285 insertions(+), 290 deletions(-) delete mode 100644 receiver/host/IBReceiver.cpp delete mode 100644 receiver/host/IBReceiver.h create mode 100644 receiver/host/MlxRawEthRcv.cpp diff --git a/common/DiffractionExperiment.cpp b/common/DiffractionExperiment.cpp index 2096bf5d..5a1eb264 100644 --- a/common/DiffractionExperiment.cpp +++ b/common/DiffractionExperiment.cpp @@ -791,10 +791,13 @@ uint32_t DiffractionExperiment::GetSrcIPv4Address(uint32_t data_stream, uint32_t throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Non existing data stream"); if (half_module >= 2 * GetModulesNum()) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Non existing module"); + if ((data_stream+1) >= 256/32) + throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, + "Cannot handle more than 7 data stream in the current model"); if (half_module >= 32) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Cannot handle more than 16 modules / data stream in the current model"); - uint32_t host = ((data_stream * 32) | half_module) << 24; + uint32_t host = (((data_stream+1) * 32) | half_module) << 24; return internal.ipv4_subnet() | host; } @@ -802,7 +805,7 @@ uint32_t DiffractionExperiment::GetDestIPv4Address(uint32_t data_stream) const { if (data_stream >= GetDataStreamsNum()) throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Non existing data stream"); - uint32_t host = (0xFFlu - data_stream) << 24; + uint32_t host = (data_stream + 1) << 24; return internal.ipv4_subnet() | host; } diff --git a/receiver/host/CMakeLists.txt b/receiver/host/CMakeLists.txt index e4f17fad..e0f88829 100644 --- a/receiver/host/CMakeLists.txt +++ b/receiver/host/CMakeLists.txt @@ -8,7 +8,7 @@ ADD_LIBRARY(JungfraujochHost STATIC AcquisitionOfflineCounters.cpp AcquisitionOfflineCounters.h IBWrappers.cpp IBWrappers.h MlxRawEthDevice.cpp MlxRawEthDevice.h - ../../jungfrau/jf_packet.h UdpReceiver.cpp UdpReceiver.h IBReceiver.cpp IBReceiver.h) + ../../jungfrau/jf_packet.h UdpReceiver.cpp UdpReceiver.h) TARGET_LINK_LIBRARIES(JungfraujochHost CommonFunctions HLSSimulation ${IBVERBS} JFCalibration) @@ -19,6 +19,9 @@ IF(IBVERBS) TARGET_COMPILE_DEFINITIONS(JungfraujochHost PUBLIC -DJFJOCH_USE_IBVERBS) TARGET_LINK_LIBRARIES(JungfraujochHost ${IBVERBS}) MESSAGE(STATUS "JFJochReceiver compiled with IBVerbs support") + + ADD_EXECUTABLE(MlxRawEthRcv MlxRawEthRcv.cpp) + TARGET_LINK_LIBRARIES(MlxRawEthRcv JungfraujochHost) ENDIF() IF(HAS_NUMAIF AND HAS_NUMA_H AND NUMA_LIBRARY) @@ -39,4 +42,4 @@ INSTALL(TARGETS jfjoch_pcie_set_network RUNTIME) ADD_EXECUTABLE(jfjoch_pcie_clear_net_counters jfjoch_pcie_clear_net_counters.cpp) TARGET_LINK_LIBRARIES(jfjoch_pcie_clear_net_counters JungfraujochHost) -INSTALL(TARGETS jfjoch_pcie_clear_net_counters RUNTIME) +INSTALL(TARGETS jfjoch_pcie_clear_net_counters RUNTIME) \ No newline at end of file diff --git a/receiver/host/IBReceiver.cpp b/receiver/host/IBReceiver.cpp deleted file mode 100644 index 3b921848..00000000 --- a/receiver/host/IBReceiver.cpp +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright (2019-2022) Paul Scherrer Institute -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifdef JFJOCH_USE_IBVERBS - -#include "IBReceiver.h" - -#include -#include - -#ifdef JFJOCH_USE_NUMA -#include -#include -#endif - -#include "../common/JFJochException.h" - -#define BUFFER_SIZE (9000) -#define BUFFER_COUNT 4096 - -IBReceiverBuffer::IBReceiverBuffer(uint8_t numa_node) { - buffer = (uint8_t *) mmap(nullptr, BUFFER_SIZE * BUFFER_COUNT, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - if (buffer == nullptr) - throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "frame_buffer"); - -#ifdef JFJOCH_USE_NUMA - if (numa_node >= 0) { - unsigned long nodemask = 1L << numa_node;; - if (numa_node > sizeof(nodemask)*8) { - Unmap(); - throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Mask too small for NUMA node"); - } - if (mbind(buffer, BUFFER_SIZE * BUFFER_COUNT, MPOL_BIND, &nodemask, sizeof(nodemask)*8, MPOL_MF_STRICT) == -1) { - Unmap(); - throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Cannot apply NUMA policy"); - } - } -#endif - memset(buffer, 0, BUFFER_SIZE * BUFFER_COUNT); -} - -void IBReceiverBuffer::Unmap() { - munmap(buffer, BUFFER_SIZE * BUFFER_COUNT); -} - -IBReceiverBuffer::~IBReceiverBuffer() { - mr.reset(); - Unmap(); -} - -void IBReceiverBuffer::Register(IBProtectionDomain &pd) { - mr = std::make_unique(pd, buffer, BUFFER_SIZE * BUFFER_COUNT); -} - -IBMemoryRegion *IBReceiverBuffer::GetMemoryRegion() { - return mr.get(); -} - -uint8_t *IBReceiverBuffer::GetLocation(uint64_t location) { - if (location < BUFFER_COUNT) - return buffer + BUFFER_SIZE * location; - else - throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Location out of bounds"); -} - - -IBReceiver::IBReceiver(IBContext &context, - ThreadSafeFIFO &completion_queue, - ThreadSafeFIFO &wr_queue, - uint32_t nmodules, - uint64_t mac_addr, - uint32_t ipv4, - volatile bool &in_cancel, - uint32_t nthreads, - uint8_t numa_node) - : pd(context), - cq(context, BUFFER_COUNT), - qp(pd, cq, 16, BUFFER_COUNT), - buffer(numa_node), - process(completion_queue, wr_queue, nmodules), - cancel(in_cancel) { - buffer.Register(pd); - - qp.FlowSteering(mac_addr, ipv4); - - for (int i = 0; i < BUFFER_COUNT - 1; i++) - qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE); - - for (int i = 0; i < nthreads; i++) - futures.emplace_back(std::async(std::launch::async, &IBReceiver::Run, this, numa_node)); - futures.emplace_back(std::async(std::launch::async, &IBReceiver::Arp, this, mac_addr, ipv4)); -} - -void IBReceiver::Run(uint8_t numa_node) { -#ifdef JFJOCH_USE_NUMA - if (numa_available() != -1) - numa_run_on_node(numa_node); -#endif - - while (!cancel) { - int64_t i; - size_t size; - if (cq.Poll(i, size) > 0) { - if (size == sizeof(jf_raw_packet)) { - auto ptr = (jf_raw_packet *) buffer.GetLocation(i); - process.ProcessPacket(&ptr->jf, ptr->ipv4_header_sour_ip); - qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE); - } - } else - std::this_thread::sleep_for(std::chrono::microseconds(10)); - } -} - -// ARP packet - from if_arp.h -#pragma pack(push) -#pragma pack(2) -struct RAW_ARP_Packet -{ - unsigned char dest_mac[6]; - unsigned char sour_mac[6]; - uint16_t ether_type; - - unsigned short int ar_hrd; /* Format of hardware address. */ - unsigned short int ar_pro; /* Format of protocol address. */ - unsigned char ar_hln; /* Length of hardware address. */ - unsigned char ar_pln; /* Length of protocol address. */ - unsigned short int ar_op; /* ARP opcode (command). */ - - unsigned char ar_sha[6]; /* Sender hardware address. */ - unsigned ar_sip; /* Sender IP address. */ - unsigned char ar_tha[6]; /* Target hardware address. */ - unsigned ar_tip; /* Target IP address. */ -}; -#pragma pack(pop) - -void IBReceiver::Arp(uint64_t mac_addr, uint32_t ipv4_addr) { - - uint8_t src_mac[6]; - for (int i = 0; i < 6; i++) - src_mac[i] = (mac_addr & (0xFF << (i * 8))) >> (i * 8); - - RAW_ARP_Packet arp_packet{ - .dest_mac = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, - .sour_mac = {src_mac[5], src_mac[4], src_mac[3], src_mac[2], src_mac[1], src_mac[0]}, - .ether_type = htons(0x0806), // ether type for ARP - .ar_hrd = htons(1), // MAC addr - .ar_pro = htons(0x0800), // IPv4 - .ar_hln = 0x6, // 6 bytes - .ar_pln = 0x4, // 4 bytes - .ar_op = htons(1), // ARP request - .ar_sha = {src_mac[5], src_mac[4], src_mac[3], src_mac[2], src_mac[1], src_mac[0]}, - .ar_sip = ipv4_addr, - .ar_tha = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, - .ar_tip = ipv4_addr - }; - - while (!cancel) { - qp.PostInlineSendWR(&arp_packet, sizeof(RAW_ARP_Packet)); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - } -} - -IBReceiver::~IBReceiver() { - cancel = true; - for (auto &iter: futures) { - if (iter.valid()) - iter.get(); - } -} - -#endif //JFJOCH_USE_IBVERBS \ No newline at end of file diff --git a/receiver/host/IBReceiver.h b/receiver/host/IBReceiver.h deleted file mode 100644 index 8460ea37..00000000 --- a/receiver/host/IBReceiver.h +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright (2019-2022) Paul Scherrer Institute -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifdef JFJOCH_USE_IBVERBS - -#ifndef JUNGFRAUJOCH_IBRECEIVER_H -#define JUNGFRAUJOCH_IBRECEIVER_H - -#include "IBWrappers.h" - -#include - -#include "../../jungfrau/ProcessJFPacket.h" - -class IBReceiverBuffer { - std::unique_ptr mr; - uint8_t *buffer; - void Unmap(); -public: - explicit IBReceiverBuffer(uint8_t numa_node = 1); - ~IBReceiverBuffer(); - void Register(IBProtectionDomain &pd); - uint8_t *GetLocation(uint64_t location); - IBMemoryRegion *GetMemoryRegion(); -}; - -class IBReceiver { - IBProtectionDomain pd; - IBCompletionQueue cq; - IBQueuePair qp; - IBReceiverBuffer buffer; - ProcessJFPacket process; - volatile bool &cancel; - - std::vector> futures; - void Run(uint8_t numa_node); - void Arp(uint64_t mac_addr, uint32_t ipv4_addr); -public: - IBReceiver(IBContext &context, - ThreadSafeFIFO &completion_queue, - ThreadSafeFIFO &wr_queue, - uint32_t nmodules, - uint64_t mac_addr, - uint32_t ipv4, - volatile bool &cancel, - uint32_t nthreads = 1, - uint8_t numa_node = -1); - ~IBReceiver(); -}; - -#endif //JUNGFRAUJOCH_IBRECEIVER_H -#endif //JFJOCH_USE_IBVERBS \ No newline at end of file diff --git a/receiver/host/IBWrappers.cpp b/receiver/host/IBWrappers.cpp index 5f4a4c82..b09e35a6 100644 --- a/receiver/host/IBWrappers.cpp +++ b/receiver/host/IBWrappers.cpp @@ -5,6 +5,12 @@ #include #include +#include + +#ifdef JFJOCH_USE_NUMA +#include +#include +#endif #include "IBWrappers.h" #include "../../common/JFJochException.h" @@ -122,11 +128,9 @@ void IBQueuePair::Init() { ibv_qp_attr qp_attr{}; memset(&qp_attr, 0, sizeof(qp_attr)); - int qp_flags = IBV_QP_STATE | IBV_QP_PORT |IBV_QP_PKEY_INDEX | IBV_QP_ACCESS_FLAGS; + int qp_flags = IBV_QP_STATE | IBV_QP_PORT; qp_attr.qp_state = IBV_QPS_INIT; qp_attr.port_num = 1; - qp_attr.qp_access_flags = 0; - qp_attr.pkey_index = 0; if (ibv_modify_qp(qp, &qp_attr, qp_flags)) throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed modify IB queue pair to init."); @@ -177,11 +181,11 @@ void IBQueuePair::PostReceiveWR(IBMemoryRegion &mr, uint32_t location, void *poi // pointer to packet buffer size and memory key of each packet buffer ib_sg_entry.length = size; ib_sg_entry.lkey = mr.GetLocalKey(); + ib_sg_entry.addr = (uintptr_t)(pointer); ib_wr.num_sge = 1; ib_wr.sg_list = &ib_sg_entry; ib_wr.next = nullptr; - ib_sg_entry.addr = (uintptr_t)(pointer); ib_wr.wr_id = location; int ret; @@ -193,7 +197,7 @@ void IBQueuePair::PostReceiveWR(IBMemoryRegion &mr, uint32_t location, void *poi } } -void IBQueuePair::PostInlineSendWR(void *pointer, size_t size) { +void IBQueuePair::PostSendWR(IBMemoryRegion &mr, void *pointer, size_t size) { // Send the frame via RDMA ibv_sge ib_sg{}; ibv_send_wr ib_wr{}; @@ -204,13 +208,14 @@ void IBQueuePair::PostInlineSendWR(void *pointer, size_t size) { ib_sg.addr = (uintptr_t)(pointer); ib_sg.length = size; + ib_sg.lkey = mr.GetLocalKey(); ib_wr.wr_id = UINT64_MAX; ib_wr.sg_list = &ib_sg; ib_wr.num_sge = 1; ib_wr.opcode = IBV_WR_SEND; - ib_wr.send_flags = IBV_SEND_INLINE; + ib_wr.send_flags = IBV_SEND_SIGNALED; int ret; while ((ret = ibv_post_send(qp, &ib_wr, &ib_bad_wr))) { @@ -221,14 +226,9 @@ void IBQueuePair::PostInlineSendWR(void *pointer, size_t size) { } } -void IBQueuePair::FlowSteering(uint64_t mac_addr, uint32_t ipv4) { - uint8_t dst_mac_addr[6]; - for (int i = 0; i < 6; i++) - dst_mac_addr[i] = (mac_addr & (0xFF << (i*8))) >> (i * 8); - +void IBQueuePair::FlowSteeringIPv4(uint32_t ipv4) { struct raw_eth_flow_attr { struct ibv_flow_attr attr; - struct ibv_flow_spec_eth spec_eth; struct ibv_flow_spec_ipv4 spec_ipv4; } __attribute__((packed)) flow_attr = { .attr = { @@ -236,33 +236,10 @@ void IBQueuePair::FlowSteering(uint64_t mac_addr, uint32_t ipv4) { .type = IBV_FLOW_ATTR_NORMAL, .size = sizeof(flow_attr), .priority = 0, - .num_of_specs = 2, + .num_of_specs = 1, .port = 1, .flags = 0, }, - .spec_eth = { - .type = IBV_FLOW_SPEC_ETH, - .size = sizeof(struct ibv_flow_spec_eth), - .val = { - .dst_mac = { - dst_mac_addr[0], - dst_mac_addr[1], - dst_mac_addr[2], - dst_mac_addr[3], - dst_mac_addr[4], - dst_mac_addr[5] - }, - .src_mac = {0,0,0,0,0,0}, - .ether_type = 0x0800, - .vlan_tag = 0x0 - }, - .mask = { - .dst_mac = {0xFF,0xFF,0xFF,0xFF,0xFF,0xFF}, - .src_mac = {0,0,0,0,0,0}, - .ether_type = 0xFFFF, - .vlan_tag = 0x0 - } - }, .spec_ipv4 = { .type = IBV_FLOW_SPEC_IPV4, .size = sizeof(struct ibv_flow_spec_ipv4), @@ -277,6 +254,7 @@ void IBQueuePair::FlowSteering(uint64_t mac_addr, uint32_t ipv4) { } }; + flow = ibv_create_flow(qp, &flow_attr.attr); if (flow == nullptr) throw JFJochException(JFJochExceptionCategory::IBVerbs, "Failed to set IB flow steering"); @@ -321,4 +299,48 @@ IBMemoryRegion::~IBMemoryRegion() { ibv_dereg_mr(buffer_mr); } +IBRegBuffer::IBRegBuffer(IBProtectionDomain &pd, size_t in_loc_count, size_t in_loc_size, int16_t numa_node) : + loc_count(in_loc_count), loc_size(in_loc_size) { + buffer = (uint8_t *) mmap(nullptr, loc_size * loc_count, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + if (buffer == nullptr) + throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "frame_buffer"); + +#ifdef JFJOCH_USE_NUMA + if (numa_node >= 0) { + unsigned long nodemask = 1L << numa_node;; + if (numa_node > sizeof(nodemask)*8) { + Unmap(); + throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Mask too small for NUMA node"); + } + if (mbind(buffer, loc_size * loc_count, MPOL_BIND, &nodemask, sizeof(nodemask)*8, MPOL_MF_STRICT) == -1) { + Unmap(); + throw JFJochException(JFJochExceptionCategory::MemAllocFailed, "Cannot apply NUMA policy"); + } + } +#endif + memset(buffer, 0, loc_size * loc_count); + + mr = std::make_unique(pd, buffer, loc_size * loc_count); +} + +void IBRegBuffer::Unmap() { + munmap(buffer, loc_size * loc_count); +} + +IBRegBuffer::~IBRegBuffer() { + mr.reset(); + Unmap(); +} + +IBMemoryRegion *IBRegBuffer::GetMemoryRegion() { + return mr.get(); +} + +uint8_t *IBRegBuffer::GetLocation(uint64_t location) { + if (location < loc_count) + return buffer + loc_size * location; + else + throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "Location out of bounds"); +} + #endif //JFJOCH_USE_IBVERBS diff --git a/receiver/host/IBWrappers.h b/receiver/host/IBWrappers.h index e7f2987b..902b0777 100644 --- a/receiver/host/IBWrappers.h +++ b/receiver/host/IBWrappers.h @@ -60,11 +60,24 @@ public: void ReadyToSend(); void PostReceiveWR(IBMemoryRegion &mr, uint32_t location, void *pointer, size_t size); - void PostInlineSendWR(void *pointer, size_t size); - void FlowSteering(uint64_t mac_addr, uint32_t ipv4); + void PostSendWR(IBMemoryRegion &mr, void *pointer, size_t size); + void FlowSteeringIPv4(uint32_t ipv4); ~IBQueuePair(); }; +class IBRegBuffer { + std::unique_ptr mr; + uint8_t *buffer; + const size_t loc_count; + const size_t loc_size; + void Unmap(); +public: + explicit IBRegBuffer(IBProtectionDomain &pd, size_t loc_count, size_t loc_size, int16_t numa_node = -1); + ~IBRegBuffer(); + uint8_t *GetLocation(uint64_t location); + IBMemoryRegion *GetMemoryRegion(); +}; + #endif //JUNGFRAUJOCH_IBWRAPPERS_H #endif // JFJOCH_USE_IBVERBS \ No newline at end of file diff --git a/receiver/host/MlxRawEthDevice.cpp b/receiver/host/MlxRawEthDevice.cpp index db229390..a06f03d0 100644 --- a/receiver/host/MlxRawEthDevice.cpp +++ b/receiver/host/MlxRawEthDevice.cpp @@ -4,6 +4,12 @@ #ifdef JFJOCH_USE_IBVERBS #include "MlxRawEthDevice.h" +#include + +#ifdef JFJOCH_USE_NUMA +#include +#endif + MlxRawEthDevice::MlxRawEthDevice(uint16_t dev_id, uint16_t data_stream, size_t in_frame_buffer_size_modules, int16_t in_numa_node) : AcquisitionDevice(data_stream), @@ -35,21 +41,67 @@ void MlxRawEthDevice::HW_ReadActionRegister(ActionConfig *job) { memcpy(job, &cfg, sizeof(ActionConfig)); } +void MlxRawEthDevice::MeasureThread() { + try { + IBProtectionDomain pd(context); + IBCompletionQueue cq(context, BUFFER_COUNT+2); + IBQueuePair qp(pd, cq, 16, BUFFER_COUNT); + IBRegBuffer buffer(pd, BUFFER_COUNT, BUFFER_SIZE, numa_node); + ProcessJFPacket process(completion_queue, wr_queue, cfg.nmodules); + + qp.Init(); + qp.ReadyToReceive(); + qp.ReadyToSend(); + + qp.FlowSteeringIPv4(cfg.fpga_ipv4_addr); + + for (int i = 0; i < BUFFER_COUNT-1; i++) + qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE); + + completion_queue.Put(Completion{ + .type = Completion::Type::Start + }); + + auto cq_poll_future = std::async(std::launch::async, &MlxRawEthDevice::PollCQ, + this, + std::ref(buffer), + std::ref(qp), + std::ref(cq), + std::ref(process)); + + auto arp_future = std::async(std::launch::async, &MlxRawEthDevice::SendARP, + this, + std::ref(buffer), + std::ref(qp)); + + uint64_t packet_count = cq_poll_future.get(); + + arp_future.get(); + + completion_queue.Put(Completion{ + .type = Completion::Type::End, + .frame_number = packet_count + }); + + } catch (const JFJochException &e) { + cancel = true; + throw e; + } + idle = true; +} + void MlxRawEthDevice::HW_StartAction() { + if (cfg.mode & MODE_CONV) + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, + "Conversion on CPU flag has to be enabled for Raw Ethernet device"); + cancel = false; - receiver = std::make_unique(context, - completion_queue, - wr_queue, - cfg.nmodules, - mac_addr, - cfg.fpga_ipv4_addr, - cancel, - 1, - numa_node); + idle = false; + measure = std::async(std::launch::async, &MlxRawEthDevice::MeasureThread, this); } bool MlxRawEthDevice::HW_IsIdle() const { - return (receiver == nullptr); + return idle; } void MlxRawEthDevice::HW_SetCancelDataCollectionBit() { @@ -76,7 +128,8 @@ void MlxRawEthDevice::HW_GetStatus(ActionStatus *status) const { } void MlxRawEthDevice::HW_EndAction() { - receiver.reset(); + if (measure.valid()) + measure.get(); } void MlxRawEthDevice::CopyInternalPacketGenFrameToDeviceBuffer() { @@ -92,4 +145,79 @@ uint64_t MlxRawEthDevice::HW_GetMACAddress() const { return mac_addr; } +uint64_t MlxRawEthDevice::PollCQ(IBRegBuffer &buffer, IBQueuePair &qp, IBCompletionQueue &cq, ProcessJFPacket &process) { +#ifdef JFJOCH_USE_NUMA + if (numa_available() != -1) + numa_run_on_node(numa_node); +#endif + + uint64_t packet_counter = 0; + + while (!cancel) { + int64_t i; + size_t size; + if (cq.Poll(i, size) > 0) { + if ((i < BUFFER_COUNT) && (size == sizeof(jf_raw_packet))) { + auto ptr = (jf_raw_packet *) buffer.GetLocation(i); + process.ProcessPacket(&ptr->jf, ptr->ipv4_header_sour_ip); + qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE); + packet_counter++; + } + } else + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + + return packet_counter; +} + +// ARP packet - from if_arp.h +#pragma pack(push) +#pragma pack(2) +struct RAW_ARP_Packet +{ + unsigned char dest_mac[6]; + unsigned char sour_mac[6]; + uint16_t ether_type; + + unsigned short int ar_hrd; /* Format of hardware address. */ + unsigned short int ar_pro; /* Format of protocol address. */ + unsigned char ar_hln; /* Length of hardware address. */ + unsigned char ar_pln; /* Length of protocol address. */ + unsigned short int ar_op; /* ARP opcode (command). */ + + unsigned char ar_sha[6]; /* Sender hardware address. */ + unsigned ar_sip; /* Sender IP address. */ + unsigned char ar_tha[6]; /* Target hardware address. */ + unsigned ar_tip; /* Target IP address. */ +}; +#pragma pack(pop) + +void MlxRawEthDevice::SendARP(IBRegBuffer &buffer, IBQueuePair &qp) { + + uint8_t src_mac[6]; + for (int i = 0; i < 6; i++) + src_mac[i] = (mac_addr & (0xFFlu << (i * 8))) >> (i * 8); + + RAW_ARP_Packet arp_packet{ + .dest_mac = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, + .sour_mac = {src_mac[0], src_mac[1], src_mac[2], src_mac[3], src_mac[4], src_mac[5]}, + .ether_type = htons(0x0806), // ether type for ARP + .ar_hrd = htons(1), // MAC addr + .ar_pro = htons(0x0800), // IPv4 + .ar_hln = 0x6, // 6 bytes + .ar_pln = 0x4, // 4 bytes + .ar_op = htons(1), // ARP request + .ar_sha = {src_mac[0], src_mac[1], src_mac[2], src_mac[3], src_mac[4], src_mac[5]}, + .ar_sip = cfg.fpga_ipv4_addr, + .ar_tha = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, + .ar_tip = cfg.fpga_ipv4_addr + }; + + memcpy(buffer.GetLocation(BUFFER_COUNT-1), &arp_packet, sizeof(RAW_ARP_Packet)); + while (!cancel) { + qp.PostSendWR(*buffer.GetMemoryRegion(), buffer.GetLocation(BUFFER_COUNT-1), sizeof(RAW_ARP_Packet)); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } +} + #endif //JFJOCH_USE_IBVERBS \ No newline at end of file diff --git a/receiver/host/MlxRawEthDevice.h b/receiver/host/MlxRawEthDevice.h index 16db6137..de012266 100644 --- a/receiver/host/MlxRawEthDevice.h +++ b/receiver/host/MlxRawEthDevice.h @@ -2,14 +2,21 @@ // SPDX-License-Identifier: GPL-3.0-or-later #ifdef JFJOCH_USE_IBVERBS + #ifndef JUNGFRAUJOCH_RAWETHDEVICE_H #define JUNGFRAUJOCH_RAWETHDEVICE_H +#include + #include "AcquisitionDevice.h" #include "IBWrappers.h" -#include "IBReceiver.h" +#include "../../jungfrau/ProcessJFPacket.h" class MlxRawEthDevice : public AcquisitionDevice { + + constexpr const static size_t BUFFER_SIZE = 9000; + constexpr const static size_t BUFFER_COUNT = 4096; + std::mutex m; IBContext context; ThreadSafeFIFO completion_queue; @@ -18,9 +25,19 @@ class MlxRawEthDevice : public AcquisitionDevice { ActionConfig cfg; const int16_t numa_node; - std::unique_ptr receiver; - volatile bool cancel = false; + std::future measure; + volatile bool cancel = false; + volatile bool idle = true; + + void SendARP(IBRegBuffer &buffer, IBQueuePair &qp); + + uint64_t PollCQ(IBRegBuffer &buffer, + IBQueuePair &qp, + IBCompletionQueue &cq, + ProcessJFPacket &process); + + void MeasureThread(); Completion ReadCompletion() override; void HW_WriteActionRegister(const ActionConfig *job) override; void HW_ReadActionRegister(ActionConfig *job) override; diff --git a/receiver/host/MlxRawEthRcv.cpp b/receiver/host/MlxRawEthRcv.cpp new file mode 100644 index 00000000..b77fd984 --- /dev/null +++ b/receiver/host/MlxRawEthRcv.cpp @@ -0,0 +1,32 @@ +// Copyright (2019-2022) Paul Scherrer Institute +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "MlxRawEthDevice.h" +#include "../common/DiffractionExperiment.h" + +int main(int argc, char **argv) { + DiffractionExperiment experiment(2, {4,4}, 8, 36); + experiment.Mode(DetectorMode::Raw); + experiment.ImagesPerTrigger(1000); + + Logger logger("MlxRawEthRcv"); + logger.Verbose(true); + + try { + MlxRawEthDevice mlx(1, 0, 1024); + logger.Info("Mac addr {}", mlx.GetMACAddress()); + mlx.EnableLogging(&logger); + mlx.StartAction(experiment); + mlx.WaitForActionComplete(); + logger.Info("Bytes received {}", mlx.GetBytesReceived()); + + JFJochProtoBuf::AcquisitionDeviceStatistics statistics; + + mlx.SaveStatistics(experiment, statistics); + logger.Info("{} {}", statistics.efficiency(), statistics.packets_received_per_module(1)); + + } catch (std::exception &e) { + logger.ErrorException(e); + exit(EXIT_FAILURE); + } +} \ No newline at end of file diff --git a/tests/DiffractionExperimentTest.cpp b/tests/DiffractionExperimentTest.cpp index f9106a79..9111ecc4 100644 --- a/tests/DiffractionExperimentTest.cpp +++ b/tests/DiffractionExperimentTest.cpp @@ -177,11 +177,11 @@ TEST_CASE("DiffractionExperiment_IPv4Address","[DiffractionExperiment]") { uint32_t ndatastreams = 3; - REQUIRE(x.GetDestIPv4Address(0) == 0xFF32010a); - REQUIRE(x.GetDestIPv4Address(1) == 0xFE32010a); - REQUIRE(x.GetDestIPv4Address(2) == 0xFD32010a); - REQUIRE(x.GetSrcIPv4Address(0, 4) == (0x0032010a | ((0 * 32 + 4) << 24))); - REQUIRE(x.GetSrcIPv4Address(2, 23) == (0x0032010a | ((2 * 32 + 23) << 24))); + REQUIRE(x.GetDestIPv4Address(0) == 0x0132010a); + REQUIRE(x.GetDestIPv4Address(1) == 0x0232010a); + REQUIRE(x.GetDestIPv4Address(2) == 0x0332010a); + REQUIRE(x.GetSrcIPv4Address(0, 4) == (0x0032010a | ((1 * 32 + 4) << 24))); + REQUIRE(x.GetSrcIPv4Address(2, 23) == (0x0032010a | ((3 * 32 + 23) << 24))); REQUIRE_THROWS(x.GetDestIPv4Address(3)); REQUIRE_THROWS(x.GetSrcIPv4Address(0, 24)); REQUIRE_THROWS(x.GetSrcIPv4Address(3, 5)); @@ -193,12 +193,12 @@ TEST_CASE("DiffractionExperiment_IPv4Address","[DiffractionExperiment]") { REQUIRE_THROWS(x.IPv4Subnet("64.1.124.129")); REQUIRE_NOTHROW(x.IPv4Subnet("64.1.124.0")); - REQUIRE(x.GetDestIPv4Address(0) == 0xFF7c0140u); - REQUIRE(x.GetDestIPv4Address(1) == 0xFE7c0140u); - REQUIRE(x.GetDestIPv4Address(2) == 0xFD7c0140u); - REQUIRE(x.GetSrcIPv4Address(2, 12) == 0x007c0140u + ((2 * 32 + 12) << 24)); + REQUIRE(x.GetDestIPv4Address(0) == 0x017c0140u); + REQUIRE(x.GetDestIPv4Address(1) == 0x027c0140u); + REQUIRE(x.GetDestIPv4Address(2) == 0x037c0140u); + REQUIRE(x.GetSrcIPv4Address(2, 12) == 0x007c0140u + ((3 * 32 + 12) << 24)); - REQUIRE(IPv4AddressToStr(x.GetDestIPv4Address(2)) == "64.1.124.253"); + REQUIRE(IPv4AddressToStr(x.GetDestIPv4Address(2)) == "64.1.124.3"); } TEST_CASE("IPv4AddressToStr","") { @@ -535,7 +535,7 @@ TEST_CASE("DiffractionExperiment_ExportProtobuf","[DiffractionExperiment]") { REQUIRE(x.GetImageTime() == y.GetImageTime()); REQUIRE(y.GetFrameCountTime().count() == x.GetFrameCountTime().count()); REQUIRE(y.GetDestUDPPort(0,0) == 64*76); - REQUIRE(y.GetDestIPv4Address(0) == 0xFF020202); + REQUIRE(y.GetDestIPv4Address(0) == 0x01020202); REQUIRE(y.GetPedestalG0Frames() == x.GetPedestalG0Frames()); REQUIRE(y.GetMaskModuleEdges() == x.GetMaskModuleEdges()); REQUIRE(y.GetMaskChipEdges() == x.GetMaskChipEdges());