diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index 6c0b054ff..0416e90d0 100755 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -12,6 +12,7 @@ #include "container_utils.h" // For sls::make_unique<> #include "sls_detector_exceptions.h" #include "UdpRxSocket.h" +#include "network_utils.h" #include #include @@ -177,7 +178,7 @@ void Listener::CreateUDPSockets() { sem_init(&semaphore_socket,1,0); // doubled due to kernel bookkeeping (could also be less due to permissions) - *actualUDPSocketBufferSize = udpSocket->getActualUDPSocketBufferSize(); + *actualUDPSocketBufferSize = udpSocket->getBufferSize(); } @@ -185,7 +186,7 @@ void Listener::CreateUDPSockets() { void Listener::ShutDownUDPSocket() { if(udpSocket){ udpSocketAlive = false; - udpSocket->ShutDownSocket(); + udpSocket->Shutdown(); LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber; fflush(stdout); // wait only if the threads have started as it is the threads that @@ -220,7 +221,7 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int64_t s) { *udpSocketBufferSize); // doubled due to kernel bookkeeping (could also be less due to permissions) - *actualUDPSocketBufferSize = g.getActualUDPSocketBufferSize(); + *actualUDPSocketBufferSize = g.getBufferSize(); if (*actualUDPSocketBufferSize == -1) { *udpSocketBufferSize = temp; } else { diff --git a/slsSupportLib/CMakeLists.txt b/slsSupportLib/CMakeLists.txt index 98904031e..67cc82d19 100755 --- a/slsSupportLib/CMakeLists.txt +++ b/slsSupportLib/CMakeLists.txt @@ -8,6 +8,7 @@ set(SOURCES src/ToString.cpp src/network_utils.cpp src/ZmqSocket.cpp + src/UdpRxSocket.cpp ) set(HEADERS diff --git a/slsSupportLib/include/UdpRxSocket.h b/slsSupportLib/include/UdpRxSocket.h index a6ffffbb3..3a4ee5685 100644 --- a/slsSupportLib/include/UdpRxSocket.h +++ b/slsSupportLib/include/UdpRxSocket.h @@ -1,141 +1,30 @@ +#pragma once /* -UdpRxSocket provies socket control to receive -data on a udp socket. - -It provides a drop in replacement for -genericSocket. But please be careful since -this might be deprecated in the future - +UDP socket class to receive data. The intended use is in the +receiver listener loop. Should be used RAII style... */ -#include "network_utils.h" -#include "sls_detector_exceptions.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include - +#include //ssize_t namespace sls { class UdpRxSocket { - const ssize_t packet_size; - char *buff; - int fd = -1; + const ssize_t packet_size_; + int sockfd_{-1}; public: UdpRxSocket(int port, ssize_t packet_size, const char *hostname = nullptr, - ssize_t buffer_size = 0) - : packet_size(packet_size) { - /* hostname = nullptr -> wildcard */ + size_t kernel_buffer_size = 0); + ~UdpRxSocket(); + bool ReceivePacket(char *dst) noexcept; + size_t getBufferSize() const; + void setBufferSize(ssize_t size); + ssize_t getPacketSize() const noexcept; + void Shutdown(); - struct addrinfo hints; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol = 0; - hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; - struct addrinfo *res = 0; - - const std::string portname = std::to_string(port); - if (getaddrinfo(hostname, portname.c_str(), &hints, &res)) { - throw RuntimeError("Failed at getaddrinfo with " + - std::string(hostname)); - } - fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); - if (fd == -1) { - throw RuntimeError("Failed to create UDP RX socket"); - } - if (bind(fd, res->ai_addr, res->ai_addrlen) == -1) { - throw RuntimeError("Failed to bind UDP RX socket"); - } - freeaddrinfo(res); - - // If we get a specified buffer size that is larger than the set one - // we set it. Otherwise we leave it there since it could have been - // set by the rx_udpsocksize command - if (buffer_size) { - auto current = getBufferSize() / 2; - if (current < buffer_size) { - setBufferSize(buffer_size); - if (getBufferSize() / 2 < buffer_size) { - LOG(logWARNING) - << "Could not set buffer size. Got: " - << getBufferSize() / 2 << " instead of " << buffer_size; - } - } - } - // Allocate at the end to avoid memory leak if we throw - buff = new char[packet_size]; - } - - ~UdpRxSocket() { - delete[] buff; - Shutdown(); - } - - const char *LastPacket() const noexcept { return buff; } - ssize_t getPacketSize() const noexcept { return packet_size; } - - bool ReceivePacket() noexcept { return ReceivePacket(buff); } - - bool ReceivePacket(char *dst, int flags = 0) noexcept { - auto bytes_received = - recvfrom(fd, dst, packet_size, flags, nullptr, nullptr); - return bytes_received == packet_size; - } - - bool PeekPacket() noexcept{ - return ReceivePacket(buff, MSG_PEEK); - } - - // Only for backwards compatibility this function will be removed during - // refactoring of the receiver - ssize_t ReceiveDataOnly(char *dst) { - auto r = recvfrom(fd, dst, packet_size, 0, nullptr, nullptr); - constexpr ssize_t eiger_header_packet = - 40; // only detector that has this - if (r == eiger_header_packet) { - LOG(logWARNING) << "Got header pkg"; - r = recvfrom(fd, dst, packet_size, 0, nullptr, nullptr); - } - return r; - } - - ssize_t getBufferSize() const { - uint64_t ret_size = 0; - socklen_t optlen = sizeof(uint64_t); - if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1) - return -1; - else - return ret_size; - } - - // Only for backwards compatibility will be removed - ssize_t getActualUDPSocketBufferSize() const { return getBufferSize(); } - - // Only for backwards compatibility will be removed - void ShutDownSocket() { Shutdown(); } - - void setBufferSize(ssize_t size) { - socklen_t optlen = sizeof(size); - if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, optlen)) { - throw RuntimeError("Could not set socket buffer size"); - } - } - - void Shutdown() { - shutdown(fd, SHUT_RDWR); - if (fd >= 0) { - close(fd); - fd = -1; - } - } + // Only for backwards compatibility, this drops the EIGER small pkt, may be + // removed + ssize_t ReceiveDataOnly(char *dst) noexcept; }; } // namespace sls diff --git a/slsSupportLib/src/UdpRxSocket.cpp b/slsSupportLib/src/UdpRxSocket.cpp new file mode 100644 index 000000000..558d9ee7e --- /dev/null +++ b/slsSupportLib/src/UdpRxSocket.cpp @@ -0,0 +1,96 @@ +#include "UdpRxSocket.h" +#include "network_utils.h" +#include "sls_detector_exceptions.h" +#include +#include +#include +#include +#include +#include +#include +#include + +namespace sls { + +UdpRxSocket::UdpRxSocket(int port, ssize_t packet_size, const char *hostname, + size_t kernel_buffer_size) + : packet_size_(packet_size) { + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; + struct addrinfo *res = 0; + + const std::string portname = std::to_string(port); + if (getaddrinfo(hostname, portname.c_str(), &hints, &res)) { + throw RuntimeError("Failed at getaddrinfo with " + + std::string(hostname)); + } + sockfd_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (sockfd_ == -1) { + throw RuntimeError("Failed to create UDP RX socket"); + } + if (bind(sockfd_, res->ai_addr, res->ai_addrlen) == -1) { + throw RuntimeError("Failed to bind UDP RX socket"); + } + freeaddrinfo(res); + + // If we get a specified buffer size that is larger than the set one + // we set it. Otherwise we leave it there since it could have been + // set by the rx_udpsocksize command + if (kernel_buffer_size) { + auto current = getBufferSize() / 2; + if (current < kernel_buffer_size) { + setBufferSize(kernel_buffer_size); + if (getBufferSize() / 2 < kernel_buffer_size) { + LOG(logWARNING) + << "Could not set buffer size. Got: " << getBufferSize() / 2 + << " instead of " << kernel_buffer_size; + } + } + } +} + +UdpRxSocket::~UdpRxSocket() { Shutdown(); } +ssize_t UdpRxSocket::getPacketSize() const noexcept { return packet_size_; } + +bool UdpRxSocket::ReceivePacket(char *dst) noexcept{ + auto bytes_received = + recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr); + return bytes_received == packet_size_; +} + +ssize_t UdpRxSocket::ReceiveDataOnly(char *dst) noexcept { + auto r = recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr); + constexpr ssize_t eiger_header_packet = + 40; // only detector that has this + if (r == eiger_header_packet) { + LOG(logWARNING) << "Got header pkg"; + r = recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr); + } + return r; + } + +size_t UdpRxSocket::getBufferSize() const { + size_t ret = 0; + socklen_t optlen = sizeof(ret); + if (getsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &ret, &optlen) == -1) + throw RuntimeError("Could not get socket buffer size"); + return ret; +} + +void UdpRxSocket::setBufferSize(ssize_t size) { + if (setsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size))) + throw RuntimeError("Could not set socket buffer size"); +} + +void UdpRxSocket::Shutdown() { + shutdown(sockfd_, SHUT_RDWR); + if (sockfd_ >= 0) { + close(sockfd_); + sockfd_ = -1; + } + } +} // namespace sls \ No newline at end of file diff --git a/slsSupportLib/tests/test-UdpRxSocket.cpp b/slsSupportLib/tests/test-UdpRxSocket.cpp index d922b5e5c..07c7e115c 100644 --- a/slsSupportLib/tests/test-UdpRxSocket.cpp +++ b/slsSupportLib/tests/test-UdpRxSocket.cpp @@ -4,6 +4,14 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include constexpr int default_port = 50001; @@ -29,46 +37,49 @@ int open_socket(int port) { throw sls::RuntimeError("Failed to create UDP RX socket"); } - if (connect(fd, res->ai_addr, res->ai_addrlen)){ + if (connect(fd, res->ai_addr, res->ai_addrlen)) { throw sls::RuntimeError("Failed to connect socket"); } freeaddrinfo(res); return fd; } -TEST_CASE("Receive data on localhost") { +TEST_CASE("Get packet size returns the packet size we set in the constructor"){ + constexpr int port = 50001; + constexpr ssize_t packet_size = 8000; + sls::UdpRxSocket s{port, packet_size}; + CHECK(s.getPacketSize() == packet_size); +} + +TEST_CASE("Receive data from a vector") { constexpr int port = 50001; std::vector data_to_send{4, 5, 3, 2, 5, 7, 2, 3}; + std::vector data_received(data_to_send.size()); ssize_t packet_size = sizeof(decltype(data_to_send)::value_type) * data_to_send.size(); + sls::UdpRxSocket udpsock{port, packet_size}; - int fd = open_socket(port); auto n = write(fd, data_to_send.data(), packet_size); CHECK(n == packet_size); - CHECK(udpsock.ReceivePacket()); + + CHECK(udpsock.ReceivePacket((char*)data_received.data())); close(fd); - // Copy data from buffer and compare values - std::vector data_received(data_to_send.size()); - memcpy(data_received.data(), udpsock.LastPacket(), udpsock.getPacketSize()); - CHECK(data_received.size() == data_to_send.size()); // sanity check - for (size_t i = 0; i != data_to_send.size(); ++i) { - CHECK(data_to_send[i] == data_received[i]); - } + CHECK(data_to_send == data_received); + } TEST_CASE("Shutdown socket without hanging when waiting for data") { constexpr int port = 50001; constexpr ssize_t packet_size = 8000; sls::UdpRxSocket s{port, packet_size}; + char buff[packet_size]; // Start a thread and wait for package // if the socket is left open we would block std::future ret = - std::async(static_cast( - &sls::UdpRxSocket::ReceivePacket), - &s); + std::async(&sls::UdpRxSocket::ReceivePacket, &s, (char *)&buff); s.Shutdown(); auto r = ret.get(); @@ -76,60 +87,23 @@ TEST_CASE("Shutdown socket without hanging when waiting for data") { CHECK(r == false); // since we didn't get the packet } -TEST_CASE("Too small packet"){ +TEST_CASE("Too small packet") { constexpr int port = 50001; - sls::UdpRxSocket s(port, 2*sizeof(uint32_t)); + sls::UdpRxSocket s(port, 2 * sizeof(uint32_t)); auto fd = open_socket(port); uint32_t val = 10; write(fd, &val, sizeof(val)); - CHECK(s.ReceivePacket() == false); + uint32_t buff[2]; + CHECK(s.ReceivePacket((char *)&buff) == false); close(fd); } - -TEST_CASE("Receive an int to internal buffer"){ +TEST_CASE("Receive an int to an external buffer") { int to_send = 5; int received = -1; auto fd = open_socket(default_port); sls::UdpRxSocket s(default_port, sizeof(int)); write(fd, &to_send, sizeof(to_send)); - CHECK(s.ReceivePacket()); - memcpy(&received, s.LastPacket(), sizeof(int)); + CHECK(s.ReceivePacket(reinterpret_cast(&received))); CHECK(received == to_send); } - -TEST_CASE("Receive an int to an external buffer"){ - int to_send = 5; - int received = -1; - auto fd = open_socket(default_port); - sls::UdpRxSocket s(default_port, sizeof(int)); - write(fd, &to_send, sizeof(to_send)); - CHECK(s.ReceivePacket(reinterpret_cast(&received))); - CHECK(received == to_send); -} - - -TEST_CASE("PEEK data"){ - int to_send = 5; - int to_send2 = 12; - int received = -1; - auto fd = open_socket(default_port); - sls::UdpRxSocket s(default_port, sizeof(int)); - write(fd, &to_send, sizeof(to_send)); - write(fd, &to_send2, sizeof(to_send)); - CHECK(s.PeekPacket()); - memcpy(&received, s.LastPacket(), sizeof(int)); - CHECK(received == to_send); - - CHECK(s.PeekPacket()); - memcpy(&received, s.LastPacket(), sizeof(int)); - CHECK(received == to_send); - - CHECK(s.ReceivePacket()); - memcpy(&received, s.LastPacket(), sizeof(int)); - CHECK(received == to_send); - - CHECK(s.ReceivePacket()); - memcpy(&received, s.LastPacket(), sizeof(int)); - CHECK(received == to_send2); -}