#pragma once
/*
 * UdpRxSocket provides socket control to receive data on a UDP socket.
 *
 * It provides a drop-in replacement for genericSocket. But please be careful,
 * since this might be deprecated in the future.
 */

#include "container_utils.h"
#include "genericSocket.h"
#include "network_utils.h"
#include "sls_detector_exceptions.h"

// NOTE(review): the names of the system headers were not recoverable from the
// reviewed copy of this file; the list below is what the code in this file
// uses. Reconcile against the original include list before merging.
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <string>

#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

namespace sls {

/**
 * Owns a bound UDP socket and a buffer of packet_size bytes into which
 * packets can be received.
 *
 * The socket is closed in the destructor (or earlier via Close()).
 * The class is neither copyable nor movable.
 */
class UdpRxSocket {
    const ssize_t packet_size;    // expected size in bytes of one packet
    std::unique_ptr<char[]> buff; // internal buffer of packet_size bytes
    int sockfd = -1;              // socket descriptor, -1 when closed

  public:
    /**
     * Create a UDP socket and bind it.
     *
     * @param port port to bind to
     * @param packet_size number of bytes requested in every receive call and
     * size of the internal buffer
     * @param hostname address to bind to, nullptr -> wildcard
     * @param kernel_rbuffer_size requested kernel receive buffer size, 0 means
     * leave the current size untouched
     * @throws RuntimeError if address resolution, socket creation, bind or
     * setting the buffer size fails
     */
    UdpRxSocket(int port, ssize_t packet_size, const char *hostname = nullptr,
                ssize_t kernel_rbuffer_size = 0)
        : packet_size(packet_size) {
        /* hostname = nullptr -> wildcard */
        struct addrinfo hints = {};
        hints.ai_family = AF_UNSPEC;
        hints.ai_socktype = SOCK_DGRAM;
        hints.ai_protocol = 0;
        hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

        struct addrinfo *res = nullptr;
        const std::string portname = std::to_string(port);
        if (getaddrinfo(hostname, portname.c_str(), &hints, &res)) {
            // hostname may legitimately be nullptr, never build a
            // std::string from it directly
            throw RuntimeError("Failed at getaddrinfo with " +
                               std::string(hostname ? hostname : "(wildcard)"));
        }
        // Release the address list on every exit path, including exceptions
        std::unique_ptr<struct addrinfo, decltype(&freeaddrinfo)> res_owner(
            res, &freeaddrinfo);

        // The destructor does not run when a constructor throws, so the
        // descriptor has to be closed here before the exception propagates
        try {
            sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
            if (sockfd == -1) {
                throw RuntimeError("Failed to create UDP RX socket");
            }
            if (bind(sockfd, res->ai_addr, res->ai_addrlen) == -1) {
                throw RuntimeError("Failed to bind UDP RX socket");
            }

            // If we get a specified buffer size that is larger than the set
            // one we set it. Otherwise we leave it there since it could have
            // been set by the rx_udpsocksize command.
            // The reported size is halved because Linux reports twice the
            // value that was requested (see socket(7), SO_RCVBUF).
            if (kernel_rbuffer_size) {
                auto current = getBufferSize() / 2;
                if (current < kernel_rbuffer_size) {
                    setBufferSize(kernel_rbuffer_size);
                    if (getBufferSize() / 2 < kernel_rbuffer_size) {
                        FILE_LOG(logWARNING)
                            << "Could not set buffer size. Got: "
                            << getBufferSize() / 2 << " instead of "
                            << kernel_rbuffer_size;
                    }
                }
            }
            buff = sls::make_unique<char[]>(packet_size);
        } catch (...) {
            Close();
            throw;
        }
    }

    // Delegating constructor to allow drop in replacement for old socket class
    // This one might be removed in the future
    // The arguments p and hsize are accepted but not used.
    // NOTE(review): eth defaults to NULL and is handed to InterfaceNameToIp
    // unchecked; confirm that InterfaceNameToIp accepts a null pointer.
    UdpRxSocket(unsigned short int const port_number,
                genericSocket::communicationProtocol p,
                int ps = DEFAULT_PACKET_SIZE, const char *eth = NULL,
                int hsize = 0, uint64_t buf_size = SOCKET_BUFFER_SIZE)
        : UdpRxSocket(port_number, ps, InterfaceNameToIp(eth).str().c_str(),
                      buf_size) {}

    ~UdpRxSocket() { Close(); }

    UdpRxSocket(const UdpRxSocket &) = delete;
    UdpRxSocket(UdpRxSocket &&) = delete;
    UdpRxSocket &operator=(const UdpRxSocket &) = delete;
    UdpRxSocket &operator=(UdpRxSocket &&) = delete;

    /** @return pointer to the internal buffer filled by ReceivePacket() */
    const char *LastPacket() const noexcept { return buff.get(); }

    /** @return the packet size in bytes given at construction */
    constexpr ssize_t getPacketSize() const noexcept { return packet_size; }

    /**
     * Receive one packet into the internal buffer.
     * @return true if exactly packet_size bytes were received
     */
    bool ReceivePacket() noexcept { return ReceivePacket(buff.get()); }

    /**
     * Receive one packet into dst.
     * @param dst destination with room for at least packet_size bytes
     * @return true if exactly packet_size bytes were received
     */
    bool ReceivePacket(char *dst) noexcept {
        auto bytes_received =
            recvfrom(sockfd, dst, packet_size, 0, nullptr, nullptr);
        return bytes_received == packet_size;
    }

    // Only for backwards compatibility this function will be removed during
    // refactoring of the receiver
    /**
     * Receive one packet into dst. If the received packet is exactly 40 bytes
     * (eiger header packet) it is logged and one more receive is done.
     * @param dst destination with room for at least packet_size bytes
     * @return the return value of the last recvfrom call
     */
    ssize_t ReceiveDataOnly(char *dst) {
        auto r = recvfrom(sockfd, dst, packet_size, 0, nullptr, nullptr);
        constexpr ssize_t eiger_header_packet =
            40; // only detector that has this
        if (r == eiger_header_packet) {
            FILE_LOG(logWARNING) << "Got header pkg";
            r = recvfrom(sockfd, dst, packet_size, 0, nullptr, nullptr);
        }
        return r;
    }

    /**
     * @return the receive buffer size (SO_RCVBUF) reported for the socket,
     * or -1 if it could not be read
     */
    ssize_t getBufferSize() const {
        // SO_RCVBUF is an int option; reading it into a wider object only
        // gives the right value by accident on little-endian hosts
        int ret_size = 0;
        socklen_t optlen = sizeof(ret_size);
        if (getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) ==
            -1)
            return -1;
        return ret_size;
    }

    // Only for backwards compatibility will be removed
    ssize_t getActualUDPSocketBufferSize() const { return getBufferSize(); }

    // Only for backwards compatibility will be removed
    void ShutDownSocket() { Close(); }

    /**
     * Request a new receive buffer size (SO_RCVBUF).
     * @param size requested size in bytes, values above INT_MAX are clamped
     * @throws RuntimeError if size is negative or the option cannot be set
     */
    void setBufferSize(ssize_t size) {
        if (size < 0) {
            throw RuntimeError("Could not set socket buffer size");
        }
        // SO_RCVBUF takes an int, clamp instead of silently truncating
        constexpr ssize_t max_size = std::numeric_limits<int>::max();
        const int requested = static_cast<int>(size > max_size ? max_size : size);
        if (setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &requested,
                       sizeof(requested))) {
            throw RuntimeError("Could not set socket buffer size");
        }
    }

    // Do we need this function or can we rely on scope?
    /** Close the socket if it is open. Safe to call more than once. */
    void Close() {
        if (sockfd >= 0) {
            ::close(sockfd);
            sockfd = -1;
        }
    }
};

} // namespace sls