rxr: udp socket size max of INT_MAX/2 (#191)

This commit is contained in:
Dhanya Thattil 2020-09-25 10:15:39 +02:00 committed by GitHub
parent f950e32893
commit fe81963873
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 74 additions and 61 deletions

View File

@ -1298,7 +1298,7 @@ class Detector(CppDetectorApi):
@property @property
@element @element
def rx_udpsocksize(self): def rx_udpsocksize(self):
"""UDP socket buffer size in receiver. Tune rmem_default and rmem_max accordingly.""" """UDP socket buffer size in receiver. Tune rmem_default and rmem_max accordingly. Max size: INT_MAX/2."""
return self.getRxUDPSocketBufferSize() return self.getRxUDPSocketBufferSize()
@rx_udpsocksize.setter @rx_udpsocksize.setter

View File

@ -717,17 +717,17 @@ class Detector {
/** Default: padding enabled. Disabling padding is the fastest */ /** Default: padding enabled. Disabling padding is the fastest */
void setPartialFramesPadding(bool value, Positions pos = {}); void setPartialFramesPadding(bool value, Positions pos = {});
Result<int64_t> getRxUDPSocketBufferSize(Positions pos = {}) const; Result<int> getRxUDPSocketBufferSize(Positions pos = {}) const;
/** UDP socket buffer size in receiver. Tune rmem_default and rmem_max /** UDP socket buffer size in receiver. Tune rmem_default and rmem_max
* accordingly */ * accordingly. Max value is INT_MAX/2. */
void setRxUDPSocketBufferSize(int64_t udpsockbufsize, Positions pos = {}); void setRxUDPSocketBufferSize(int udpsockbufsize, Positions pos = {});
/** TODO: /** TODO:
* Gets actual udp socket buffer size. Double the size of rx_udpsocksize due * Gets actual udp socket buffer size. Double the size of rx_udpsocksize due
* to kernel bookkeeping. * to kernel bookkeeping.
*/ */
Result<int64_t> getRxRealUDPSocketBufferSize(Positions pos = {}) const; Result<int> getRxRealUDPSocketBufferSize(Positions pos = {}) const;
Result<bool> getRxLock(Positions pos = {}); Result<bool> getRxLock(Positions pos = {});

View File

@ -1568,9 +1568,9 @@ class CmdProxy {
INTEGER_COMMAND_VEC_ID( INTEGER_COMMAND_VEC_ID(
rx_udpsocksize, getRxUDPSocketBufferSize, setRxUDPSocketBufferSize, rx_udpsocksize, getRxUDPSocketBufferSize, setRxUDPSocketBufferSize,
StringTo<int64_t>, StringTo<int>,
"[n_size]\n\tUDP socket buffer size in receiver. Tune rmem_default and " "[n_size]\n\tUDP socket buffer size in receiver. Tune rmem_default and "
"rmem_max accordingly."); "rmem_max accordingly. Max value is INT_MAX/2.");
GET_COMMAND(rx_realudpsocksize, getRxRealUDPSocketBufferSize, GET_COMMAND(rx_realudpsocksize, getRxRealUDPSocketBufferSize,
"\n\tActual udp socket buffer size. Double the size of " "\n\tActual udp socket buffer size. Double the size of "

View File

@ -907,16 +907,16 @@ void Detector::setPartialFramesPadding(bool value, Positions pos) {
pimpl->Parallel(&Module::setPartialFramesPadding, pos, value); pimpl->Parallel(&Module::setPartialFramesPadding, pos, value);
} }
Result<int64_t> Detector::getRxUDPSocketBufferSize(Positions pos) const { Result<int> Detector::getRxUDPSocketBufferSize(Positions pos) const {
return pimpl->Parallel(&Module::getReceiverUDPSocketBufferSize, pos); return pimpl->Parallel(&Module::getReceiverUDPSocketBufferSize, pos);
} }
void Detector::setRxUDPSocketBufferSize(int64_t udpsockbufsize, Positions pos) { void Detector::setRxUDPSocketBufferSize(int udpsockbufsize, Positions pos) {
pimpl->Parallel(&Module::setReceiverUDPSocketBufferSize, pos, pimpl->Parallel(&Module::setReceiverUDPSocketBufferSize, pos,
udpsockbufsize); udpsockbufsize);
} }
Result<int64_t> Detector::getRxRealUDPSocketBufferSize(Positions pos) const { Result<int> Detector::getRxRealUDPSocketBufferSize(Positions pos) const {
return pimpl->Parallel(&Module::getReceiverRealUDPSocketBufferSize, pos); return pimpl->Parallel(&Module::getReceiverRealUDPSocketBufferSize, pos);
} }

View File

@ -844,17 +844,17 @@ void Module::setPartialFramesPadding(bool padding) {
sendToReceiver(F_SET_RECEIVER_PADDING, static_cast<int>(padding), nullptr); sendToReceiver(F_SET_RECEIVER_PADDING, static_cast<int>(padding), nullptr);
} }
int64_t Module::getReceiverUDPSocketBufferSize() const { int Module::getReceiverUDPSocketBufferSize() const {
int64_t arg = GET_FLAG; int arg = GET_FLAG;
return sendToReceiver<int64_t>(F_RECEIVER_UDP_SOCK_BUF_SIZE, arg); return sendToReceiver<int>(F_RECEIVER_UDP_SOCK_BUF_SIZE, arg);
} }
int64_t Module::getReceiverRealUDPSocketBufferSize() const { int Module::getReceiverRealUDPSocketBufferSize() const {
return sendToReceiver<int64_t>(F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE); return sendToReceiver<int>(F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE);
} }
void Module::setReceiverUDPSocketBufferSize(int64_t udpsockbufsize) { void Module::setReceiverUDPSocketBufferSize(int udpsockbufsize) {
sendToReceiver<int64_t>(F_RECEIVER_UDP_SOCK_BUF_SIZE, udpsockbufsize); sendToReceiver<int>(F_RECEIVER_UDP_SOCK_BUF_SIZE, udpsockbufsize);
} }
bool Module::getReceiverLock() const { bool Module::getReceiverLock() const {

View File

@ -236,9 +236,9 @@ class Module : public virtual slsDetectorDefs {
void setReceiverFramesDiscardPolicy(frameDiscardPolicy f); void setReceiverFramesDiscardPolicy(frameDiscardPolicy f);
bool getPartialFramesPadding() const; bool getPartialFramesPadding() const;
void setPartialFramesPadding(bool padding); void setPartialFramesPadding(bool padding);
int64_t getReceiverUDPSocketBufferSize() const; int getReceiverUDPSocketBufferSize() const;
int64_t getReceiverRealUDPSocketBufferSize() const; int getReceiverRealUDPSocketBufferSize() const;
void setReceiverUDPSocketBufferSize(int64_t udpsockbufsize); void setReceiverUDPSocketBufferSize(int udpsockbufsize);
bool getReceiverLock() const; bool getReceiverLock() const;
void setReceiverLock(bool lock); void setReceiverLock(bool lock);
sls::IpAddr getReceiverLastClientIP() const; sls::IpAddr getReceiverLastClientIP() const;

View File

@ -12,6 +12,7 @@
#include <cstdlib> #include <cstdlib>
#include <fstream> #include <fstream>
#include <iostream> #include <iostream>
#include <limits.h>
#include <map> #include <map>
#include <memory> #include <memory>
#include <sstream> #include <sstream>
@ -1138,15 +1139,22 @@ int ClientInterface::get_additional_json_header(Interface &socket) {
} }
int ClientInterface::set_udp_socket_buffer_size(Interface &socket) { int ClientInterface::set_udp_socket_buffer_size(Interface &socket) {
auto index = socket.Receive<int64_t>(); auto size = socket.Receive<int>();
if (index >= 0) { if (size == 0) {
verifyIdle(socket); throw RuntimeError("Receiver socket buffer size must be > 0.");
LOG(logDEBUG1) << "Setting UDP Socket Buffer size: " << index;
impl()->setUDPSocketBufferSize(index);
} }
int64_t retval = impl()->getUDPSocketBufferSize(); if (size > 0) {
if (index != 0) verifyIdle(socket);
validate(index, retval, if (size > INT_MAX / 2) {
throw RuntimeError(
"Receiver socket buffer size exceeded max (INT_MAX/2)");
}
LOG(logDEBUG1) << "Setting UDP Socket Buffer size: " << size;
impl()->setUDPSocketBufferSize(size);
}
int retval = impl()->getUDPSocketBufferSize();
if (size != 0)
validate(size, retval,
"set udp socket buffer size (No CAP_NET_ADMIN privileges?)", "set udp socket buffer size (No CAP_NET_ADMIN privileges?)",
DEC); DEC);
LOG(logDEBUG1) << "UDP Socket Buffer Size:" << retval; LOG(logDEBUG1) << "UDP Socket Buffer Size:" << retval;

View File

@ -946,12 +946,14 @@ void Implementation::setUDPPortNumber2(const uint32_t i) {
LOG(logINFO) << "UDP Port Number[1]: " << udpPortNum[1]; LOG(logINFO) << "UDP Port Number[1]: " << udpPortNum[1];
} }
int64_t Implementation::getUDPSocketBufferSize() const { int Implementation::getUDPSocketBufferSize() const {
return udpSocketBufferSize; return udpSocketBufferSize;
} }
void Implementation::setUDPSocketBufferSize(const int64_t s) { void Implementation::setUDPSocketBufferSize(const int s) {
int64_t size = (s == 0) ? udpSocketBufferSize : s; // custom setup is not 0 (must complain if set up didnt work)
// testing default setup at startup, argument is 0 to use default values
int size = (s == 0) ? udpSocketBufferSize : s;
size_t listSize = listener.size(); size_t listSize = listener.size();
if (myDetectorType == JUNGFRAU && (int)listSize != numUDPInterfaces) { if (myDetectorType == JUNGFRAU && (int)listSize != numUDPInterfaces) {
throw sls::RuntimeError( throw sls::RuntimeError(
@ -959,12 +961,17 @@ void Implementation::setUDPSocketBufferSize(const int64_t s) {
" do not match listener size " + std::to_string(listSize)); " do not match listener size " + std::to_string(listSize));
} }
for (unsigned int i = 0; i < listSize; ++i) { for (auto &l : listener) {
listener[i]->CreateDummySocketForUDPSocketBufferSize(size); l->CreateDummySocketForUDPSocketBufferSize(size);
}
// custom and didnt set, throw error
if (s != 0 && udpSocketBufferSize != s) {
throw sls::RuntimeError("Could not set udp socket buffer size. (No "
"CAP_NET_ADMIN privileges?)");
} }
} }
int64_t Implementation::getActualUDPSocketBufferSize() const { int Implementation::getActualUDPSocketBufferSize() const {
return actualUDPSocketBufferSize; return actualUDPSocketBufferSize;
} }

View File

@ -108,9 +108,9 @@ class Implementation : private virtual slsDetectorDefs {
uint32_t getUDPPortNumber2() const; uint32_t getUDPPortNumber2() const;
/* [Eiger][Jungfrau] */ /* [Eiger][Jungfrau] */
void setUDPPortNumber2(const uint32_t i); void setUDPPortNumber2(const uint32_t i);
int64_t getUDPSocketBufferSize() const; int getUDPSocketBufferSize() const;
void setUDPSocketBufferSize(const int64_t s); void setUDPSocketBufferSize(const int s);
int64_t getActualUDPSocketBufferSize() const; int getActualUDPSocketBufferSize() const;
/************************************************** /**************************************************
* * * *
@ -302,8 +302,8 @@ class Implementation : private virtual slsDetectorDefs {
std::array<std::string, MAX_NUMBER_OF_LISTENING_THREADS> eth; std::array<std::string, MAX_NUMBER_OF_LISTENING_THREADS> eth;
std::array<uint32_t, MAX_NUMBER_OF_LISTENING_THREADS> udpPortNum{ std::array<uint32_t, MAX_NUMBER_OF_LISTENING_THREADS> udpPortNum{
{DEFAULT_UDP_PORTNO, DEFAULT_UDP_PORTNO + 1}}; {DEFAULT_UDP_PORTNO, DEFAULT_UDP_PORTNO + 1}};
int64_t udpSocketBufferSize{0}; int udpSocketBufferSize{0};
int64_t actualUDPSocketBufferSize{0}; int actualUDPSocketBufferSize{0};
// zmq parameters // zmq parameters
bool dataStreamEnable{false}; bool dataStreamEnable{false};

View File

@ -21,14 +21,12 @@ const std::string Listener::TypeName = "Listener";
Listener::Listener(int ind, detectorType dtype, Fifo *f, Listener::Listener(int ind, detectorType dtype, Fifo *f,
std::atomic<runStatus> *s, uint32_t *portno, std::string *e, std::atomic<runStatus> *s, uint32_t *portno, std::string *e,
uint64_t *nf, int64_t *us, int64_t *as, uint64_t *nf, int *us, int *as, uint32_t *fpf,
uint32_t *fpf, frameDiscardPolicy *fdp, bool *act, frameDiscardPolicy *fdp, bool *act, bool *depaden, bool *sm)
bool *depaden, bool *sm)
: ThreadObject(ind, TypeName), fifo(f), myDetectorType(dtype), status(s), : ThreadObject(ind, TypeName), fifo(f), myDetectorType(dtype), status(s),
udpPortNumber(portno), eth(e), numImages(nf), udpPortNumber(portno), eth(e), numImages(nf), udpSocketBufferSize(us),
udpSocketBufferSize(us), actualUDPSocketBufferSize(as), actualUDPSocketBufferSize(as), framesPerFile(fpf), frameDiscardMode(fdp),
framesPerFile(fpf), frameDiscardMode(fdp), activated(act), activated(act), deactivatedPaddingEnable(depaden), silentMode(sm) {
deactivatedPaddingEnable(depaden), silentMode(sm) {
LOG(logDEBUG) << "Listener " << ind << " created"; LOG(logDEBUG) << "Listener " << ind << " created";
} }
@ -142,7 +140,7 @@ void Listener::ShutDownUDPSocket() {
} }
} }
void Listener::CreateDummySocketForUDPSocketBufferSize(int64_t s) { void Listener::CreateDummySocketForUDPSocketBufferSize(int s) {
LOG(logINFO) << "Testing UDP Socket Buffer size " << s << " with test port " LOG(logINFO) << "Testing UDP Socket Buffer size " << s << " with test port "
<< *udpPortNumber; << *udpPortNumber;
@ -151,7 +149,7 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int64_t s) {
return; return;
} }
int64_t temp = *udpSocketBufferSize; int temp = *udpSocketBufferSize;
*udpSocketBufferSize = s; *udpSocketBufferSize = s;
// if eth is mistaken with ip address // if eth is mistaken with ip address

View File

@ -41,9 +41,9 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
* @param sm pointer to silent mode * @param sm pointer to silent mode
*/ */
Listener(int ind, detectorType dtype, Fifo *f, std::atomic<runStatus> *s, Listener(int ind, detectorType dtype, Fifo *f, std::atomic<runStatus> *s,
uint32_t *portno, std::string *e, uint64_t *nf, uint32_t *portno, std::string *e, uint64_t *nf, int *us, int *as,
int64_t *us, int64_t *as, uint32_t *fpf, frameDiscardPolicy *fdp, uint32_t *fpf, frameDiscardPolicy *fdp, bool *act, bool *depaden,
bool *act, bool *depaden, bool *sm); bool *sm);
/** /**
* Destructor * Destructor
@ -98,7 +98,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
* to set & get actual buffer size * to set & get actual buffer size
* @param s UDP socket buffer size to be set * @param s UDP socket buffer size to be set
*/ */
void CreateDummySocketForUDPSocketBufferSize(int64_t s); void CreateDummySocketForUDPSocketBufferSize(int s);
/** /**
* Set hard coded (calculated but not from detector) row and column * Set hard coded (calculated but not from detector) row and column
@ -173,10 +173,10 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
uint64_t *numImages; uint64_t *numImages;
/** UDP Socket Buffer Size */ /** UDP Socket Buffer Size */
int64_t *udpSocketBufferSize; int *udpSocketBufferSize;
/** actual UDP Socket Buffer Size (double due to kernel bookkeeping) */ /** actual UDP Socket Buffer Size (double due to kernel bookkeeping) */
int64_t *actualUDPSocketBufferSize; int *actualUDPSocketBufferSize;
/** frames per file */ /** frames per file */
uint32_t *framesPerFile; uint32_t *framesPerFile;

View File

@ -14,11 +14,11 @@ class UdpRxSocket {
public: public:
UdpRxSocket(int port, ssize_t packet_size, const char *hostname = nullptr, UdpRxSocket(int port, ssize_t packet_size, const char *hostname = nullptr,
size_t kernel_buffer_size = 0); int kernel_buffer_size = 0);
~UdpRxSocket(); ~UdpRxSocket();
bool ReceivePacket(char *dst) noexcept; bool ReceivePacket(char *dst) noexcept;
size_t getBufferSize() const; int getBufferSize() const;
void setBufferSize(ssize_t size); void setBufferSize(int size);
ssize_t getPacketSize() const noexcept; ssize_t getPacketSize() const noexcept;
void Shutdown(); void Shutdown();

View File

@ -14,7 +14,7 @@
namespace sls { namespace sls {
UdpRxSocket::UdpRxSocket(int port, ssize_t packet_size, const char *hostname, UdpRxSocket::UdpRxSocket(int port, ssize_t packet_size, const char *hostname,
size_t kernel_buffer_size) int kernel_buffer_size)
: packet_size_(packet_size) { : packet_size_(packet_size) {
struct addrinfo hints; struct addrinfo hints;
memset(&hints, 0, sizeof(hints)); memset(&hints, 0, sizeof(hints));
@ -73,15 +73,15 @@ ssize_t UdpRxSocket::ReceiveDataOnly(char *dst) noexcept {
return r; return r;
} }
size_t UdpRxSocket::getBufferSize() const { int UdpRxSocket::getBufferSize() const {
size_t ret = 0; int ret = 0;
socklen_t optlen = sizeof(ret); socklen_t optlen = sizeof(ret);
if (getsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &ret, &optlen) == -1) if (getsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &ret, &optlen) == -1)
throw RuntimeError("Could not get socket buffer size"); throw RuntimeError("Could not get socket buffer size");
return ret; return ret;
} }
void UdpRxSocket::setBufferSize(ssize_t size) { void UdpRxSocket::setBufferSize(int size) {
if (setsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size))) if (setsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)))
throw RuntimeError("Could not set socket buffer size"); throw RuntimeError("Could not set socket buffer size");
} }