Files
Jungfraujoch/receiver/host/LinuxSocketDevice.cpp

197 lines
5.8 KiB
C++

// Copyright (2019-2022) Paul Scherrer Institute
// SPDX-License-Identifier: GPL-3.0-or-later
#include "LinuxSocketDevice.h"
#include <sys/socket.h>
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <linux/if_packet.h>
#include "../common/JFJochException.h"
#define MAX_MODULES 16
LinuxSocketDevice::LinuxSocketDevice(uint32_t in_ipv4_addr, uint16_t in_udp_port,
uint16_t data_stream, size_t in_frame_buffer_size_modules,
int32_t in_rcv_buf_size, int16_t in_numa_node) :
AcquisitionDevice(data_stream), ipv4_addr(in_ipv4_addr), udp_port(in_udp_port),
numa_node(in_numa_node), rcv_buf_size(in_rcv_buf_size) {
max_modules = 16;
MapBuffersStandard(in_frame_buffer_size_modules, 1, numa_node);
mac_addr = 0;
FindMACAddress();
}
void LinuxSocketDevice::MeasureThread(int fd) {
jf_udp_payload jf{};
uint64_t packet_count = 0;
completion_queue.Put(Completion{
.type = Completion::Type::Start
});
try {
ProcessJFPacket process(completion_queue, wr_queue, max_modules);
while (!cancel) {
auto count = recv(fd, &jf, sizeof(jf_udp_payload), 0);
if (count == sizeof(jf_udp_payload)) {
process.ProcessPacket(&jf);
packet_count++;
} else if ((count == -1) && (errno != EAGAIN) && (errno != EWOULDBLOCK))
throw JFJochException(JFJochExceptionCategory::UDPError, "Error in UDP receiving");
}
} catch (const JFJochException &e) {
if (logger)
logger->ErrorException(e);
}
// End message should be sent always
completion_queue.Put(Completion{
.type = Completion::Type::End,
.frame_number = packet_count
});
close(fd);
idle = true;
}
Completion LinuxSocketDevice::ReadCompletion() {
return completion_queue.GetBlocking();
}
void LinuxSocketDevice::HW_WriteActionRegister(const ActionConfig *job) {
memcpy(&cfg, job, sizeof(ActionConfig));
}
void LinuxSocketDevice::HW_ReadActionRegister(ActionConfig *job) {
memcpy(job, &cfg, sizeof(ActionConfig));
}
void LinuxSocketDevice::HW_StartAction() {
if (cfg.mode & MODE_CONV)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Conversion on CPU flag has to be enabled for Raw Ethernet device");
int fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd < 0)
throw JFJochException(JFJochExceptionCategory::UDPError, "Cannot create UDP socket");
sockaddr_in server_addr{
.sin_family = AF_INET,
.sin_port = htons(udp_port),
.sin_addr = {.s_addr = ipv4_addr}
};
timeval timeout{
.tv_sec = 0,
.tv_usec = 10000 // 10 ms
};
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) != 0)
throw JFJochException(JFJochExceptionCategory::UDPError, "Cannot set socket timeout");
if (rcv_buf_size > 0) {
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcv_buf_size, sizeof(rcv_buf_size)) != 0)
throw JFJochException(JFJochExceptionCategory::UDPError, "Cannot set receive buffer size");
}
if (bind(fd, (struct sockaddr *) &server_addr, sizeof(server_addr)) != 0)
throw JFJochException(JFJochExceptionCategory::UDPError, "Cannot bind to UDP port");
cancel = false;
idle = false;
measure = std::async(std::launch::async, &LinuxSocketDevice::MeasureThread, this, fd);
}
bool LinuxSocketDevice::HW_IsIdle() const {
return idle;
}
void LinuxSocketDevice::HW_SetCancelDataCollectionBit() {
cancel = true;
}
bool LinuxSocketDevice::HW_SendWorkRequest(uint32_t handle) {
if (handle == UINT32_MAX)
HW_SetCancelDataCollectionBit();
else
wr_queue.Put(ProcessWorkRequest{
.ptr = buffer_device.at(handle),
.handle = handle
});
return true;
}
void LinuxSocketDevice::HW_GetStatus(ActionStatus *status) const {
memset(status, 0, sizeof(ActionStatus));
status->modules_internal_packet_generator = 1;
status->max_modules = max_modules;
}
uint64_t LinuxSocketDevice::HW_GetMACAddress() const {
return mac_addr;
}
uint32_t LinuxSocketDevice::HW_GetIPv4Address() const {
return ipv4_addr;
}
void LinuxSocketDevice::HW_EndAction() {
if (measure.valid())
measure.get();
}
void LinuxSocketDevice::CopyInternalPacketGenFrameToDeviceBuffer() {
// Do nothing
}
void LinuxSocketDevice::InitializeCalibration(const DiffractionExperiment &experiment, const JFCalibration &calib) {
// Do nothing
}
int32_t LinuxSocketDevice::GetNUMANode() const {
return numa_node;
}
uint16_t LinuxSocketDevice::GetUDPPort() const {
return udp_port;
}
void LinuxSocketDevice::FindMACAddress() {
std::string ifname;
bool found = false;
ifaddrs *ifaddr;
if (getifaddrs(&ifaddr) == -1)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "getifaddrs error");
for (ifaddrs *ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) {
if (ifa->ifa_addr->sa_family == AF_INET) {
if (((sockaddr_in *) ifa->ifa_addr)->sin_addr.s_addr == ipv4_addr) {
ifname = std::string(ifa->ifa_name);
found = true;
}
}
}
if (!found)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "IPv4 address not found");
found = false;
for (ifaddrs *ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) {
if ((std::string(ifa->ifa_name) == ifname) && ((ifa->ifa_addr->sa_family == AF_PACKET))) {
memcpy(&mac_addr, ((sockaddr_ll *) ifa->ifa_addr)->sll_addr, sizeof(uint64_t));
found = true;
}
}
if (!found)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "MAC address not found");
freeifaddrs(ifaddr);
}