diff --git a/sample/CMakeLists.txt b/sample/CMakeLists.txt index 86481eadc..a1d2e21c1 100644 --- a/sample/CMakeLists.txt +++ b/sample/CMakeLists.txt @@ -26,6 +26,7 @@ target_link_libraries(udp slsSupportLib pthread rt + fmt ) set_target_properties(udp PROPERTIES diff --git a/sample/udp.cpp b/sample/udp.cpp index fbcb7fbb7..769c6cec7 100644 --- a/sample/udp.cpp +++ b/sample/udp.cpp @@ -1,13 +1,61 @@ -#include "UdpSocket.h" -#include +#include "UdpRxSocket.h" +#include "sls_detector_defs.h" #include +#include +#include #include -int main(){ - std::cout << "HEJ\n"; - sls::UdpSocket s(50010, 1024); +#include "network_utils.h" - while(true){ - std::cout << "Got: " << s.ReceivePacket() << " bytes\n"; - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - } +// Assume packages arrive in order + +// Assume frame nr starts from 0 + +using header_t = slsDetectorDefs::sls_detector_header; + +int main() { + fmt::print("Hej!\n"); + + // constexpr ssize_t expected_packages = 128; + // constexpr ssize_t n_pixels = 512 * 1024; + constexpr ssize_t packet_size = 8240; + constexpr ssize_t payload_size = 8240 - sizeof(header_t); + int port = 50020; + // fmt::print("header size: {}\n", sizeof(header_t)); + + sls::UdpRxSocket s(port, packet_size, nullptr, 212992*2); + fmt::print("buffer: {}\n", s.getBufferSize()); + s.setBufferSize(212992*4); + fmt::print("buffer: {}\n", s.getBufferSize()); + // auto header = reinterpret_cast(s.buffer()); + // char *data = s.buffer() + sizeof(header_t); + // fmt::print("buffer start: {}\nheader: {}\ndata: {}\n", fmt::ptr(s.buffer()), + // fmt::ptr(header), fmt::ptr(data)); + + // int n = 0; + + // fmt::print("Buffer size: {}\n", s.buffer_size()); + // std::vector image(n_pixels); + // char *image_data = (char *)image.data(); + // uint64_t frame_nr = 0; + // while (true) { + + // if (s.ReceivePacket()) { + + // // fmt::print("frame: {} pkt: {} dst: {}\n", header->frameNumber, + // // header->packetNumber, header->packetNumber*payload_size); + // if (header->frameNumber != frame_nr) { + // // dispatch frame + // fmt::print("frame {} done! got: {} pkgs\n", frame_nr, n); + // frame_nr = header->frameNumber; + // n = 0; + // std::this_thread::sleep_for(std::chrono::milliseconds(1)); + // } + // ++n; + // memcpy(image_data + header->packetNumber * payload_size, data, + // payload_size); + + // } else { + // std::cout << "timeout\n"; + // } + // } } \ No newline at end of file diff --git a/slsDetectorServers/jungfrauDetectorServer/CMakeLists.txt b/slsDetectorServers/jungfrauDetectorServer/CMakeLists.txt index e96070408..2d1b5e493 100644 --- a/slsDetectorServers/jungfrauDetectorServer/CMakeLists.txt +++ b/slsDetectorServers/jungfrauDetectorServer/CMakeLists.txt @@ -28,7 +28,7 @@ target_compile_definitions(jungfrauDetectorServer_virtual ) target_link_libraries(jungfrauDetectorServer_virtual - PUBLIC pthread rt + PUBLIC pthread rt slsProjectOptions slsProjectWarnings ) set_target_properties(jungfrauDetectorServer_virtual PROPERTIES diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index 2347f3da1..17c689af0 100755 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -12,6 +12,7 @@ #include "container_utils.h" // For sls::make_unique<> #include "genericSocket.h" #include "sls_detector_exceptions.h" +#include "UdpRxSocket.h" #include #include @@ -147,7 +148,6 @@ void Listener::SetGeneralData(GeneralData* g) { void Listener::CreateUDPSockets() { - if (!(*activated)) { return; } @@ -163,7 +163,7 @@ void Listener::CreateUDPSockets() { ShutDownUDPSocket(); try{ - udpSocket = sls::make_unique(*udpPortNumber, genericSocket::UDP, + udpSocket = sls::make_unique(*udpPortNumber, genericSocket::UDP, generalData->packetSize, ((*eth).length() ? (*eth).c_str() : nullptr), generalData->headerPacketSize, *udpSocketBufferSize); FILE_LOG(logINFO) << index << ": UDP port opened at port " << *udpPortNumber; @@ -213,7 +213,7 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int64_t s) { //create dummy socket try { - genericSocket g(*udpPortNumber, genericSocket::UDP, + SELECTED_SOCKET g(*udpPortNumber, genericSocket::UDP, generalData->packetSize, ((*eth).length() ? (*eth).c_str() : nullptr), generalData->headerPacketSize, *udpSocketBufferSize); diff --git a/slsReceiverSoftware/src/Listener.h b/slsReceiverSoftware/src/Listener.h index 423df0fa2..56da69171 100755 --- a/slsReceiverSoftware/src/Listener.h +++ b/slsReceiverSoftware/src/Listener.h @@ -16,6 +16,13 @@ class GeneralData; class Fifo; class genericSocket; +namespace sls{ + class UdpRxSocket; +} + + +// #define SELECTED_SOCKET genericSocket +#define SELECTED_SOCKET sls::UdpRxSocket class Listener : private virtual slsDetectorDefs, public ThreadObject { @@ -187,7 +194,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { std::atomic* status; /** UDP Socket - Detector to Receiver */ - std::unique_ptr udpSocket; + std::unique_ptr udpSocket; /** UDP Port Number */ uint32_t* udpPortNumber; diff --git a/slsSupportLib/include/UdpRxSocket.h b/slsSupportLib/include/UdpRxSocket.h new file mode 100644 index 000000000..88e0df0e3 --- /dev/null +++ b/slsSupportLib/include/UdpRxSocket.h @@ -0,0 +1,149 @@ + +/* +UdpRxSocket provies socket control to receive +data on a udp socket. + +It provides a drop in replacement for +genericSocket. But please be careful since +this might be deprecated in the future. + + +*/ + +#include "genericSocket.h" +#include "network_utils.h" +#include "sls_detector_exceptions.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace sls { + +class UdpRxSocket { + int port; + const ssize_t packet_size; + ssize_t buffer_size; + char *buff; + int fd = -1; + + // If possible we could listen to only one source but for our setup + // we should have only our data on this port network??? + // recvfrom(fd, buff, packet_size, 0, (struct sockaddr *)&src_addr, + // &src_addr_len); + + // struct sockaddr_storage src_addr; + // socklen_t src_addr_len = sizeof(src_addr); + + public: + UdpRxSocket(int port, ssize_t packet_size, const char *hostname = nullptr, + ssize_t buffer_size = 0) + : port(port), packet_size(packet_size), buffer_size(buffer_size) { + /* hostname = nullptr -> wildcard */ + const std::string portname = std::to_string(port); + struct addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; + struct addrinfo *res = 0; + if (getaddrinfo(hostname, portname.c_str(), &hints, &res)) { + throw RuntimeError("Failed getaddinfo"); + } + fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (fd == -1) { + throw RuntimeError("Failed creating socket"); + } + if (bind(fd, res->ai_addr, res->ai_addrlen) == -1) { + throw RuntimeError("Failed to bind socket"); + } + freeaddrinfo(res); + + // If we get a specified buffer size that is larger than the set one + // we set it. Otherwise we leave it there since it could have been + // set by the rx_udpsocksize command + if (buffer_size) { + auto current = getBufferSize() / 2; + if (current < buffer_size) { + setBufferSize(buffer_size); + if (getBufferSize() / 2 < buffer_size) { + FILE_LOG(logWARNING) << "Could not set buffer size. Got: " << getBufferSize()/2 << " instead of " << buffer_size; + } + } + } + // Allocate at the end to avoid memory leak if we throw + buff = new char[packet_size]; + } + + // Delegating constructor to allow drop in replacement for old socket class + // This one might be removed in the future + UdpRxSocket(unsigned short int const port_number, + genericSocket::communicationProtocol p, + int ps = DEFAULT_PACKET_SIZE, const char *eth = NULL, + int hsize = 0, uint64_t buf_size = SOCKET_BUFFER_SIZE) + : UdpRxSocket(port_number, ps, InterfaceNameToIp(eth).str().c_str(), + buf_size) {} + + const char *LastPacket() const { return buff; } + + ~UdpRxSocket() { + delete[] buff; + Shutdown(); + } + + // Receive one packet to the internal buffer of the socket class, preferred + // method? + bool ReceivePacket() { + ssize_t count = recvfrom(fd, buff, packet_size, 0, nullptr, nullptr); + return count == packet_size; + } + + // Not sure we keep this + bool ReceivePacket(char *dst) { + ssize_t count = recvfrom(fd, buff, packet_size, 0, nullptr, nullptr); + return count == packet_size; + } + + // Only for backwards compatibility will be removed + ssize_t ReceiveDataOnly(char *dst) { + return recvfrom(fd, dst, packet_size, 0, nullptr, nullptr); + } + + ssize_t getBufferSize() const { + uint64_t ret_size = 0; + socklen_t optlen = sizeof(uint64_t); + if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1) + return -1; + else + return ret_size; + } + + // Only for backwards compatibility will be removed + ssize_t getActualUDPSocketBufferSize() const { return getBufferSize(); } + + // Only for backwards compatibility will be removed + void ShutDownSocket() { Shutdown(); } + + void setBufferSize(ssize_t size) { + socklen_t optlen = sizeof(size); + if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, optlen)) { + throw RuntimeError("Could not set socket buffer size"); + } + } + + void Shutdown() { + shutdown(fd, SHUT_RDWR); + if (fd >= 0) { + close(fd); + fd = -1; + } + } +}; + +} // namespace sls \ No newline at end of file diff --git a/slsSupportLib/include/UdpSocket.h b/slsSupportLib/include/UdpSocket.h deleted file mode 100644 index d1a8dd8b3..000000000 --- a/slsSupportLib/include/UdpSocket.h +++ /dev/null @@ -1,56 +0,0 @@ - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "sls_detector_exceptions.h" - -namespace sls { - -class UdpSocket { - int port; - size_t packet_size; - size_t buffer_size; - int fd = -1; - - public: - UdpSocket(int port, size_t packet_size) - : port(port), packet_size(packet_size) { - const char *hostname = 0; /* wildcard */ - const std::string portname = std::to_string(port); - struct addrinfo hints; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol = 0; - hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; - struct addrinfo *res = 0; - if (getaddrinfo(hostname, portname.c_str(), &hints, &res)){ - throw RuntimeError("Failed getaddinfo"); - } - fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); - if(fd==-1){ - throw RuntimeError("Failed creating socket"); - } - if (bind(fd, res->ai_addr, res->ai_addrlen) == -1) { - throw RuntimeError("Failed to bind socket"); - } - freeaddrinfo(res); - } - - int ReceivePacket() { - char buffer[549]; - struct sockaddr_storage src_addr; - socklen_t src_addr_len = sizeof(src_addr); - ssize_t count = recvfrom(fd, buffer, sizeof(buffer), 0, - (struct sockaddr *)&src_addr, &src_addr_len); - return count; - } -}; - -} // namespace sls \ No newline at end of file diff --git a/slsSupportLib/include/network_utils.h b/slsSupportLib/include/network_utils.h index ca90002bc..b010451f8 100755 --- a/slsSupportLib/include/network_utils.h +++ b/slsSupportLib/include/network_utils.h @@ -62,7 +62,7 @@ class MacAddr { IpAddr HostnameToIp(const char *hostname); std::string IpToInterfaceName(const std::string& ip); MacAddr InterfaceNameToMac(const std::string& inf); - +IpAddr InterfaceNameToIp(const std::string& ifn); std::ostream &operator<<(std::ostream &out, const IpAddr &addr); std::ostream &operator<<(std::ostream &out, const MacAddr &addr); diff --git a/slsSupportLib/src/network_utils.cpp b/slsSupportLib/src/network_utils.cpp index 2a55ab8ed..c19bf62d5 100755 --- a/slsSupportLib/src/network_utils.cpp +++ b/slsSupportLib/src/network_utils.cpp @@ -1,36 +1,33 @@ #include "sls_detector_exceptions.h" #include +#include #include #include #include +#include #include -#include -#include -#include +#include #include +#include +#include +#include #include #include -#include -#include -#include #include "network_utils.h" namespace sls { - IpAddr::IpAddr(const std::string &address) { inet_pton(AF_INET, address.c_str(), &addr_); } IpAddr::IpAddr(const char *address) { inet_pton(AF_INET, address, &addr_); } -std::string IpAddr::str() const { - return arr().data(); -} +std::string IpAddr::str() const { return arr().data(); } -std::array IpAddr::arr() const{ +std::array IpAddr::arr() const { std::array ipstring{}; inet_ntop(AF_INET, &addr_, ipstring.data(), INET_ADDRSTRLEN); return ipstring; @@ -96,7 +93,7 @@ IpAddr HostnameToIp(const char *hostname) { } std::string IpToInterfaceName(const std::string &ip) { - //TODO! Copied from genericSocket needs to be refactored! + // TODO! Copied from genericSocket needs to be refactored! struct ifaddrs *addrs, *iap; struct sockaddr_in *sa; @@ -122,33 +119,58 @@ std::string IpToInterfaceName(const std::string &ip) { return std::string(buf); } -MacAddr InterfaceNameToMac(const std::string& inf) { - //TODO! Copied from genericSocket needs to be refactored! - struct ifreq ifr; - char mac[32]; - const int mac_len = sizeof(mac); - memset(mac,0,mac_len); +IpAddr InterfaceNameToIp(const std::string &ifn) { + struct ifaddrs *ifaddr, *ifa; + int family, s; + char host[NI_MAXHOST]; - int sock=socket(PF_INET, SOCK_STREAM, 0); - strncpy(ifr.ifr_name,inf.c_str(),sizeof(ifr.ifr_name)-1); - ifr.ifr_name[sizeof(ifr.ifr_name)-1]='\0'; + if (getifaddrs(&ifaddr) == -1) { + perror("getifaddrs"); + exit(EXIT_FAILURE); + } + for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { + if (ifa->ifa_addr == NULL) + continue; - if (-1==ioctl(sock, SIOCGIFHWADDR, &ifr)) { - perror("ioctl(SIOCGIFHWADDR) "); - return MacAddr{}; - } - for (int j=0, k=0; j<6; j++) { - k+=snprintf(mac+k, mac_len-k-1, j ? ":%02X" : "%02X", - (int)(unsigned int)(unsigned char)ifr.ifr_hwaddr.sa_data[j]); - } - mac[mac_len-1]='\0'; + s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, + NI_MAXHOST, NULL, 0, NI_NUMERICHOST); - if(sock!=1){ - close(sock); - } - return MacAddr(mac); + if ((strcmp(ifa->ifa_name, ifn.c_str()) == 0) && + (ifa->ifa_addr->sa_family == AF_INET)) { + } + } - } + freeifaddrs(ifaddr); + return IpAddr{host}; +} + +MacAddr InterfaceNameToMac(const std::string &inf) { + // TODO! Copied from genericSocket needs to be refactored! + struct ifreq ifr; + char mac[32]; + const int mac_len = sizeof(mac); + memset(mac, 0, mac_len); + + int sock = socket(PF_INET, SOCK_STREAM, 0); + strncpy(ifr.ifr_name, inf.c_str(), sizeof(ifr.ifr_name) - 1); + ifr.ifr_name[sizeof(ifr.ifr_name) - 1] = '\0'; + + if (-1 == ioctl(sock, SIOCGIFHWADDR, &ifr)) { + perror("ioctl(SIOCGIFHWADDR) "); + return MacAddr{}; + } + for (int j = 0, k = 0; j < 6; j++) { + k += snprintf( + mac + k, mac_len - k - 1, j ? ":%02X" : "%02X", + (int)(unsigned int)(unsigned char)ifr.ifr_hwaddr.sa_data[j]); + } + mac[mac_len - 1] = '\0'; + + if (sock != 1) { + close(sock); + } + return MacAddr(mac); +} } // namespace sls