diff --git a/slsSupportLib/include/UdpRxSocket.h b/slsSupportLib/include/UdpRxSocket.h index 0e6b004a6..7610ad150 100644 --- a/slsSupportLib/include/UdpRxSocket.h +++ b/slsSupportLib/include/UdpRxSocket.h @@ -9,12 +9,14 @@ this might be deprecated in the future */ +#include "container_utils.h" #include "genericSocket.h" #include "network_utils.h" #include "sls_detector_exceptions.h" #include #include #include +#include #include #include #include @@ -26,12 +28,12 @@ namespace sls { class UdpRxSocket { const ssize_t packet_size; - char *buff; - int fd = -1; + std::unique_ptr buff; + int sockfd = -1; public: UdpRxSocket(int port, ssize_t packet_size, const char *hostname = nullptr, - ssize_t buffer_size = 0) + ssize_t kernel_rbuffer_size = 0) : packet_size(packet_size) { /* hostname = nullptr -> wildcard */ @@ -48,11 +50,11 @@ class UdpRxSocket { throw RuntimeError("Failed at getaddrinfo with " + std::string(hostname)); } - fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); - if (fd == -1) { + sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (sockfd == -1) { throw RuntimeError("Failed to create UDP RX socket"); } - if (bind(fd, res->ai_addr, res->ai_addrlen) == -1) { + if (bind(sockfd, res->ai_addr, res->ai_addrlen) == -1) { throw RuntimeError("Failed to bind UDP RX socket"); } freeaddrinfo(res); @@ -60,19 +62,19 @@ class UdpRxSocket { // If we get a specified buffer size that is larger than the set one // we set it. Otherwise we leave it there since it could have been // set by the rx_udpsocksize command - if (buffer_size) { + if (kernel_rbuffer_size) { auto current = getBufferSize() / 2; - if (current < buffer_size) { - setBufferSize(buffer_size); - if (getBufferSize() / 2 < buffer_size) { + if (current < kernel_rbuffer_size) { + setBufferSize(kernel_rbuffer_size); + if (getBufferSize() / 2 < kernel_rbuffer_size) { FILE_LOG(logWARNING) << "Could not set buffer size. Got: " - << getBufferSize() / 2 << " instead of " << buffer_size; + << getBufferSize() / 2 << " instead of " + << kernel_rbuffer_size; } } } - // Allocate at the end to avoid memory leak if we throw - buff = new char[packet_size]; + buff = sls::make_unique(packet_size); } // Delegating constructor to allow drop in replacement for old socket class @@ -85,34 +87,38 @@ class UdpRxSocket { buf_size) {} ~UdpRxSocket() { - delete[] buff; - Shutdown(); + if (sockfd >= 0) + close(sockfd); } - const char *LastPacket() const noexcept { return buff; } + UdpRxSocket(const UdpRxSocket &) = delete; + UdpRxSocket(UdpRxSocket &&) = delete; + + const char *LastPacket() const noexcept { return buff.get(); } constexpr ssize_t getPacketSize() const noexcept { return packet_size; } bool ReceivePacket() noexcept { - auto bytes_received = - recvfrom(fd, buff, packet_size, 0, nullptr, nullptr); - return bytes_received == packet_size; + return ReceivePacket(buff.get()); + // auto bytes_received = + // recvfrom(sockfd, buff.get(), packet_size, 0, nullptr, nullptr); + // return bytes_received == packet_size; } bool ReceivePacket(char *dst) noexcept { auto bytes_received = - recvfrom(fd, buff, packet_size, 0, nullptr, nullptr); + recvfrom(sockfd, dst, packet_size, 0, nullptr, nullptr); return bytes_received == packet_size; } // Only for backwards compatibility this function will be removed during // refactoring of the receiver ssize_t ReceiveDataOnly(char *dst) { - auto r = recvfrom(fd, dst, packet_size, 0, nullptr, nullptr); + auto r = recvfrom(sockfd, dst, packet_size, 0, nullptr, nullptr); constexpr ssize_t eiger_header_packet = 40; // only detector that has this if (r == eiger_header_packet) { FILE_LOG(logWARNING) << "Got header pkg"; - r = recvfrom(fd, dst, packet_size, 0, nullptr, nullptr); + r = recvfrom(sockfd, dst, packet_size, 0, nullptr, nullptr); } return r; } @@ -120,7 +126,7 @@ class UdpRxSocket { ssize_t getBufferSize() const { uint64_t ret_size = 0; socklen_t optlen = sizeof(uint64_t); - if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1) + if (getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1) return -1; else return ret_size; @@ -130,20 +136,20 @@ class UdpRxSocket { ssize_t getActualUDPSocketBufferSize() const { return getBufferSize(); } // Only for backwards compatibility will be removed - void ShutDownSocket() { Shutdown(); } + void ShutDownSocket() { Close(); } void setBufferSize(ssize_t size) { socklen_t optlen = sizeof(size); - if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, optlen)) { + if (setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &size, optlen)) { throw RuntimeError("Could not set socket buffer size"); } } - void Shutdown() { - shutdown(fd, SHUT_RDWR); - if (fd >= 0) { - close(fd); - fd = -1; + // Do we need this function or can we rely on scope? + void Close() { + if (sockfd >= 0) { + close(sockfd); + sockfd = -1; } } }; diff --git a/slsSupportLib/tests/test-UdpRxSocket.cpp b/slsSupportLib/tests/test-UdpRxSocket.cpp index 452fa9e72..ff8268369 100644 --- a/slsSupportLib/tests/test-UdpRxSocket.cpp +++ b/slsSupportLib/tests/test-UdpRxSocket.cpp @@ -5,6 +5,8 @@ #include #include +constexpr int default_port = 50001; + int open_socket(int port) { const char *host = nullptr; // localhost @@ -34,9 +36,8 @@ int open_socket(int port) { return fd; } -TEST_CASE("Receive a packet on localhost") { +TEST_CASE("Receive data on localhost") { constexpr int port = 50001; - std::vector data_to_send{4, 5, 3, 2, 5, 7, 2, 3}; ssize_t packet_size = sizeof(decltype(data_to_send)::value_type) * data_to_send.size(); @@ -44,9 +45,6 @@ TEST_CASE("Receive a packet on localhost") { int fd = open_socket(port); - // int n = sendto(fd, data_to_send.data(), packet_size, 0, res->ai_addr, - // res->ai_addrlen); - auto n = write(fd, data_to_send.data(), packet_size); CHECK(n == packet_size); CHECK(udpsock.ReceivePacket()); @@ -60,7 +58,7 @@ TEST_CASE("Receive a packet on localhost") { } } -TEST_CASE("Shutdown socket without hanging") { +TEST_CASE("Shutdown socket without hanging when waiting for data") { constexpr int port = 50001; constexpr ssize_t packet_size = 8000; sls::UdpRxSocket s{port, packet_size}; @@ -72,7 +70,7 @@ TEST_CASE("Shutdown socket without hanging") { &sls::UdpRxSocket::ReceivePacket), &s); - s.Shutdown(); + s.Close(); auto r = ret.get(); CHECK(r == false); // since we didn't get the packet @@ -86,4 +84,26 @@ TEST_CASE("Too small packet"){ write(fd, &val, sizeof(val)); CHECK(s.ReceivePacket() == false); close(fd); +} + + +TEST_CASE("Receive an int to internal buffer"){ + int to_send = 5; + int received = -1; + auto fd = open_socket(default_port); + sls::UdpRxSocket s(default_port, sizeof(int)); + write(fd, &to_send, sizeof(to_send)); + CHECK(s.ReceivePacket()); + memcpy(&received, s.LastPacket(), sizeof(int)); + CHECK(received == to_send); +} + +TEST_CASE("Receive an int to an external buffer"){ + int to_send = 5; + int received = -1; + auto fd = open_socket(default_port); + sls::UdpRxSocket s(default_port, sizeof(int)); + write(fd, &to_send, sizeof(to_send)); + CHECK(s.ReceivePacket(reinterpret_cast(&received))); + CHECK(received == to_send); } \ No newline at end of file