// Copyright (2019-2023) Paul Scherrer Institute

#ifdef JFJOCH_USE_IBVERBS

#include "MlxRawEthDevice.h"
#include "../common/NetworkAddressConvert.h"

#include <arpa/inet.h>  // htons
#include <chrono>
#include <cstring>      // std::memcpy
#include <future>
#include <thread>

#ifdef JFJOCH_USE_NUMA
#include <numa.h>
#endif

// Raw-Ethernet acquisition device. The device context is created from the name "mlx5_<dev_id>".
//   dev_id                       - index used in the device name and folded into the default MAC
//   data_stream                  - forwarded to AcquisitionDevice
//   in_frame_buffer_size_modules - forwarded to MapBuffersStandard
//   in_numa_node                 - NUMA node used for buffers and for pinning the polling thread
MlxRawEthDevice::MlxRawEthDevice(uint16_t dev_id, uint16_t data_stream,
                                 size_t in_frame_buffer_size_modules, int16_t in_numa_node)
        : AcquisitionDevice(data_stream),
          context("mlx5_" + std::to_string(dev_id)),
          numa_node(in_numa_node) {
    max_modules = 16;
    MapBuffersStandard(in_frame_buffer_size_modules, 1, numa_node);
    // Default MAC 06:AA:BB:CC:DD:<dev_id>. The first octet is kept in the least significant byte,
    // which is the order SendARP() reads it back in; only the low byte of dev_id lands inside the
    // 48-bit address (SendARP() ignores bits above 47).
    mac_addr = (static_cast<uint64_t>(dev_id) << (8 * 5)) | 0x00DDCCBBAA06;
}

// Returns the NUMA node passed to the constructor.
int32_t MlxRawEthDevice::GetNUMANode() const {
    return numa_node;
}

// Body of the acquisition thread launched by Start().
// Queues a Start completion, then:
//   - creates the verbs objects and puts the queue pair into receive/send state;
//   - installs IPv4 flow steering for ipv4_addr and posts the receive ring;
//   - runs PollCQ() and SendARP() concurrently until `cancel` is raised.
// A JFJochException is logged (if a logger is set) and stops the acquisition. The End completion
// is queued in both the normal and the JFJochException path.
void MlxRawEthDevice::MeasureThread() {
    work_completion_queue.Put(Completion{ .type = Completion::Type::Start });

    try {
        IBProtectionDomain pd(context);
        IBCompletionQueue cq(context, BUFFER_COUNT + 2);
        IBQueuePair qp(pd, cq, 16, BUFFER_COUNT);
        IBRegBuffer buffer(pd, BUFFER_COUNT, BUFFER_SIZE, numa_node);
        ProcessJFPacket process(work_completion_queue, work_request_queue, max_modules);

        qp.Init();
        qp.ReadyToReceive();
        qp.ReadyToSend();
        qp.FlowSteeringIPv4(ipv4_addr);

        // Slots 0 .. BUFFER_COUNT-2 form the receive ring; the last slot is reserved for the
        // outgoing ARP frame written by SendARP().
        for (int i = 0; i < BUFFER_COUNT - 1; i++)
            qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE);

        // Both workers loop until `cancel` is set. If one of them fails, raise `cancel` so the
        // other one exits as well - otherwise the survivor's std::async future would block
        // forever in its destructor while this frame unwinds.
        auto poll_worker = [this, &buffer, &qp, &cq, &process] {
            try {
                PollCQ(buffer, qp, cq, process);
            } catch (...) {
                cancel = true;
                throw;
            }
        };
        auto arp_worker = [this, &buffer, &qp] {
            try {
                SendARP(buffer, qp);
            } catch (...) {
                cancel = true;
                throw;
            }
        };

        std::future<void> cq_poll_future;
        std::future<void> arp_future;
        try {
            cq_poll_future = std::async(std::launch::async, poll_worker);
            arp_future = std::async(std::launch::async, arp_worker);
            cq_poll_future.get();
            arp_future.get();
        } catch (...) {
            // Covers a failed thread launch as well as an exception rethrown by get(): any worker
            // that is still running must observe `cancel` before its future is destroyed.
            cancel = true;
            throw;
        }
    } catch (const JFJochException &e) {
        cancel = true;
        if (logger)
            logger->ErrorException(e);
    }

    work_completion_queue.Put(Completion{ .type = Completion::Type::End });
}

// Launches MeasureThread() asynchronously.
// Throws JFJochException (InputParameterInvalid) when the experiment requests conversion on the
// FPGA - this device only supports conversion on the CPU.
void MlxRawEthDevice::Start(const DiffractionExperiment& experiment) {
    if (experiment.GetConversionOnFPGA())
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
                              "Conversion on CPU flag has to be enabled for Raw Ethernet device");

    cancel = false;
    measure = std::async(std::launch::async, &MlxRawEthDevice::MeasureThread, this);
}

// Requests that the worker loops of the running acquisition stop.
void MlxRawEthDevice::Cancel() {
    cancel = true;
}

// Waits for MeasureThread() to finish (if it was started) and rethrows anything it let escape.
void MlxRawEthDevice::Finalize() {
    if (measure.valid())
        measure.get();
}

// Sets the MAC address used as ARP sender; SendARP() reads the first octet from the lowest byte.
void MlxRawEthDevice::SetMACAddress(uint64_t mac_addr_network_order) {
    mac_addr = mac_addr_network_order;
}

// Sets the IPv4 address used for flow steering and announced in the ARP frames.
void MlxRawEthDevice::SetIPv4Address(uint32_t ipv4_addr_network_order) {
    ipv4_addr = ipv4_addr_network_order;
}

// Receive loop, run until `cancel` is set.
// Each completion with slot index below BUFFER_COUNT and size equal to sizeof(jf_raw_packet) is
// handed to ProcessJFPacket, and the slot is then re-posted to the receive queue. When the queue
// is empty the thread sleeps for 10 us before polling again.
void MlxRawEthDevice::PollCQ(IBRegBuffer &buffer, IBQueuePair &qp, IBCompletionQueue &cq,
                             ProcessJFPacket &process) {
#ifdef JFJOCH_USE_NUMA
    if (numa_available() != -1)
        numa_run_on_node(numa_node);
#endif
    while (!cancel) {
        int64_t i = 0;
        size_t size = 0;
        if (cq.Poll(i, size) > 0) {
            // NOTE(review): a completion whose size does not match jf_raw_packet (e.g. some other
            // IPv4 packet to ipv4_addr) is never re-posted, so each such packet permanently removes
            // one slot from the receive ring. Re-posting it safely requires knowing which wr_id the
            // ARP send completions (same CQ) carry in IBQueuePair::PostSendWR - TODO confirm and fix.
            if ((i < BUFFER_COUNT) && (size == sizeof(jf_raw_packet))) {
                auto ptr = reinterpret_cast<jf_raw_packet *>(buffer.GetLocation(i));
                process.ProcessPacket(&ptr->jf);
                completed_descriptors = process.GetCompletedDescriptors();
                qp.PostReceiveWR(*buffer.GetMemoryRegion(), i, buffer.GetLocation(i), BUFFER_SIZE);
            }
        } else
            std::this_thread::sleep_for(std::chrono::microseconds(10));
    }
}

// Ethernet header + ARP payload laid out as on the wire (ARP field layout as in if_arp.h).
// Packed to 2 bytes so that no padding is inserted between fields.
#pragma pack(push)
#pragma pack(2)
struct RAW_ARP_Packet {
    unsigned char dest_mac[6];
    unsigned char sour_mac[6];
    uint16_t ether_type;
    unsigned short int ar_hrd;  /* Format of hardware address.  */
    unsigned short int ar_pro;  /* Format of protocol address.  */
    unsigned char ar_hln;       /* Length of hardware address.  */
    unsigned char ar_pln;       /* Length of protocol address.  */
    unsigned short int ar_op;   /* ARP opcode (command).  */
    unsigned char ar_sha[6];    /* Sender hardware address.  */
    uint32_t ar_sip;            /* Sender IP address.  */
    unsigned char ar_tha[6];    /* Target hardware address.  */
    uint32_t ar_tip;            /* Target IP address.  */
};
#pragma pack(pop)

// 14-byte Ethernet header + 28-byte ARP payload; guards the packing above.
static_assert(sizeof(RAW_ARP_Packet) == 42, "RAW_ARP_Packet must match the 42-byte wire layout");

// Broadcasts an ARP request whose sender and target IP are both ipv4_addr (a gratuitous ARP
// announcing ipv4_addr at mac_addr) every 200 ms until `cancel` is set. The frame is written into
// the last buffer slot (BUFFER_COUNT-1), which MeasureThread() keeps out of the receive ring.
void MlxRawEthDevice::SendARP(IBRegBuffer &buffer, IBQueuePair &qp) {
    // mac_addr keeps the first MAC octet in its least significant byte.
    uint8_t src_mac[6];
    for (int i = 0; i < 6; i++)
        src_mac[i] = static_cast<uint8_t>((mac_addr >> (8 * i)) & 0xFF);

    RAW_ARP_Packet arp_packet{
            .dest_mac = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF},
            .sour_mac = {src_mac[0], src_mac[1], src_mac[2], src_mac[3], src_mac[4], src_mac[5]},
            .ether_type = htons(0x0806), // ether type for ARP
            .ar_hrd = htons(1),          // MAC addr
            .ar_pro = htons(0x0800),     // IPv4
            .ar_hln = 0x6,               // 6 bytes
            .ar_pln = 0x4,               // 4 bytes
            .ar_op = htons(1),           // ARP request
            .ar_sha = {src_mac[0], src_mac[1], src_mac[2], src_mac[3], src_mac[4], src_mac[5]},
            .ar_sip = ipv4_addr,
            .ar_tha = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF},
            .ar_tip = ipv4_addr
    };

    std::memcpy(buffer.GetLocation(BUFFER_COUNT - 1), &arp_packet, sizeof(RAW_ARP_Packet));

    while (!cancel) {
        qp.PostSendWR(*buffer.GetMemoryRegion(), buffer.GetLocation(BUFFER_COUNT - 1),
                      sizeof(RAW_ARP_Packet));
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

// Returns the descriptor count last published by PollCQ().
uint32_t MlxRawEthDevice::GetCompletedDescriptors() const {
    return completed_descriptors;
}

#endif //JFJOCH_USE_IBVERBS