diff --git a/slsDetectorSoftware/commonFiles/sls_detector_defs.h b/slsDetectorSoftware/commonFiles/sls_detector_defs.h index 46c21b67e..6ab8ca615 100755 --- a/slsDetectorSoftware/commonFiles/sls_detector_defs.h +++ b/slsDetectorSoftware/commonFiles/sls_detector_defs.h @@ -197,7 +197,9 @@ enum networkParameter { CLIENT_STREAMING_PORT, /**< client streaming TCP(ZMQ) port */ RECEIVER_STREAMING_SRC_IP,/**< receiever streaming TCP(ZMQ) ip */ CLIENT_STREAMING_SRC_IP, /**< client streaming TCP(ZMQ) ip */ - ADDITIONAL_JSON_HEADER /**< additional json header (ZMQ) */ + ADDITIONAL_JSON_HEADER, /**< additional json header (ZMQ) */ + RECEIVER_UDP_SCKT_BUF_SIZE, /**< UDP socket buffer size */ + RECEIVER_REAL_UDP_SCKT_BUF_SIZE /**< real UDP socket buffer size */ }; /** diff --git a/slsDetectorSoftware/slsDetector/slsDetector.cpp b/slsDetectorSoftware/slsDetector/slsDetector.cpp index 85906539c..fc0220810 100644 --- a/slsDetectorSoftware/slsDetector/slsDetector.cpp +++ b/slsDetectorSoftware/slsDetector/slsDetector.cpp @@ -6257,6 +6257,10 @@ string slsDetector::setNetworkParameter(networkParameter index, string value) { return setReceiverStreamingIP(value); case ADDITIONAL_JSON_HEADER: return setAdditionalJsonHeader(value); + case RECEIVER_UDP_SCKT_BUF_SIZE: + sscanf(value.c_str(),"%d",&i); + setReceiverUDPSocketBufferSize(i); + return getReceiverUDPSocketBufferSize(); default: return (char*)("unknown network parameter"); @@ -6298,6 +6302,11 @@ string slsDetector::getNetworkParameter(networkParameter index) { return getReceiverStreamingIP(); case ADDITIONAL_JSON_HEADER: return getAdditionalJsonHeader(); + case RECEIVER_UDP_SCKT_BUF_SIZE: + return getReceiverUDPSocketBufferSize(); + case RECEIVER_REAL_UDP_SCKT_BUF_SIZE: + return getReceiverRealUDPSocketBufferSize(); + default: return (char*)("unknown network parameter"); } @@ -6724,6 +6733,64 @@ string slsDetector::setAdditionalJsonHeader(string jsonheader) { } +string slsDetector::setReceiverUDPSocketBufferSize(int udpsockbufsize) { + + int fnum=F_RECEIVER_UDP_SOCK_BUF_SIZE; + int ret = FAIL; + int retval = -1; + int arg = udpsockbufsize; + + if(thisDetector->receiverOnlineFlag == ONLINE_FLAG){ +#ifdef VERBOSE + std::cout << "Sending UDP Socket Buffer size to receiver " << arg << std::endl; +#endif + if (connectData() == OK){ + ret=thisReceiver->sendInt(fnum,retval,arg); + disconnectData(); + } + if(ret==FAIL) { + setErrorMask((getErrorMask())|(COULDNOT_SET_NETWORK_PARAMETER)); + std::cout << "Warning: Could not set udp socket buffer size" << std::endl; + } + if(ret==FORCE_UPDATE) + updateReceiver(); + } + + ostringstream ss; + ss << retval; + string s = ss.str(); + return s; +} + + +string slsDetector::getReceiverRealUDPSocketBufferSize() { + + int fnum=F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE; + int ret = FAIL; + int retval = -1; + + if(thisDetector->receiverOnlineFlag == ONLINE_FLAG){ +#ifdef VERBOSE + std::cout << "Getting real UDP Socket Buffer size to receiver " << std::endl; +#endif + if (connectData() == OK){ + ret=thisReceiver->getInt(fnum,retval); + disconnectData(); + } + if(ret==FAIL) { + setErrorMask((getErrorMask())|(COULDNOT_SET_NETWORK_PARAMETER)); + std::cout << "Warning: Could not get real socket buffer size" << std::endl; + } + if(ret==FORCE_UPDATE) + updateReceiver(); + } + + ostringstream ss; + ss << retval; + string s = ss.str(); + return s; +} + string slsDetector::setDetectorNetworkParameter(networkParameter index, int delay){ int fnum = F_SET_NETWORK_PARAMETER; diff --git a/slsDetectorSoftware/slsDetector/slsDetector.h b/slsDetectorSoftware/slsDetector/slsDetector.h index b319f5719..9fd02e008 100644 --- a/slsDetectorSoftware/slsDetector/slsDetector.h +++ b/slsDetectorSoftware/slsDetector/slsDetector.h @@ -1833,6 +1833,11 @@ class slsDetector : public slsDetectorUtils, public energyConversion { string getReceiverStreamingIP(){return string(thisDetector->receiver_zmqip);}; /** gets the additional json header, returns "none" if default setting and no custom set*/ string getAdditionalJsonHeader(){return string(thisDetector->receiver_additionalJsonHeader);}; + /** returns the receiver UDP socket buffer size */ + string getReceiverUDPSocketBufferSize() {return setReceiverUDPSocketBufferSize();}; + /** returns the real receiver UDP socket buffer size */ + string getReceiverRealUDPSocketBufferSize(); + /** validates the format of detector MAC address and sets it \sa sharedSlsDetector */ string setDetectorMAC(string detectorMAC); @@ -1858,6 +1863,8 @@ class slsDetector : public slsDetectorUtils, public energyConversion { string setReceiverStreamingIP(string sourceIP); /** additional json header, returns "none" if default setting and no custom set */ string setAdditionalJsonHeader(string jsonheader); + /** sets the receiver UDP socket buffer size */ + string setReceiverUDPSocketBufferSize(int udpsockbufsize=-1); /** sets the transmission delay for left or right port or for an entire frame*/ string setDetectorNetworkParameter(networkParameter index, int delay); diff --git a/slsDetectorSoftware/slsDetector/slsDetectorCommand.cpp b/slsDetectorSoftware/slsDetector/slsDetectorCommand.cpp index 60e8f9ffa..4ab729023 100644 --- a/slsDetectorSoftware/slsDetector/slsDetectorCommand.cpp +++ b/slsDetectorSoftware/slsDetector/slsDetectorCommand.cpp @@ -1978,6 +1978,21 @@ slsDetectorCommand::slsDetectorCommand(slsDetectorUtils *det) { descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdNetworkParameter; ++i; + /*! \page network + - rx_udpsocksize [size] sets/gets the UDP socket buffer size. Already trying to set by default to 100mb, 2gb for Jungfrau. Does not remember in client shared memory, so must be initialized each time after setting receiver hostname in config file.\c Returns \c (int) + */ + descrToFuncMap[i].m_pFuncName="rx_udpsocksize"; // + descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdNetworkParameter; + ++i; + + /*! \page network + - rx_realudpsocksize [size] gets the actual UDP socket buffer size. Usually double the set udp socket buffer size due to kernel bookkeeping. Get only. \c Returns \c (int) + */ + descrToFuncMap[i].m_pFuncName="rx_realudpsocksize"; // + descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdNetworkParameter; + ++i; + + /*! \page network - detectormac [mac] sets/gets the mac address of the detector UDP interface from where the detector will stream data. Use single-detector command. Normally unused. \c Returns \c (string) */ @@ -4071,7 +4086,18 @@ string slsDetectorCommand::cmdNetworkParameter(int narg, char *args[], int actio if (!(sscanf(args[1],"%d",&i))) return ("cannot parse argument") + string(args[1]); } - } else if (cmd=="txndelay_left") { + } else if (cmd=="rx_udpsocksize") { + t=RECEIVER_UDP_SCKT_BUF_SIZE; + if (action==PUT_ACTION){ + if (!(sscanf(args[1],"%d",&i))) + return ("cannot parse argument") + string(args[1]); + } + } else if (cmd=="rx_realudpsocksize") { + t=RECEIVER_REAL_UDP_SCKT_BUF_SIZE; + if (action==PUT_ACTION){ + return ("cannot put!"); + } + } else if (cmd=="txndelay_left") { t=DETECTOR_TXN_DELAY_LEFT; if (action==PUT_ACTION){ if (!(sscanf(args[1],"%d",&i))) @@ -4165,6 +4191,10 @@ string slsDetectorCommand::helpNetworkParameter(int narg, char *args[], int acti os << "rx_jsonaddheader [t]\n sets additional json header to be streamed " "out with the zmq from receiver. Default is empty. t must be in the format '\"label1\":\"value1\",\"label2\":\"value2\"' etc." "Use only if it needs to be processed by an intermediate process." << std::endl; + os << "rx_udpsocksize [t]\n sets the UDP socket buffer size. Different defaults for Jungfrau. " + "Does not remember in client shared memory, " + "so must be initialized each time after setting receiver " + "hostname in config file." << std::endl; } if (action==GET_ACTION || action==HELP_ACTION) { os << "detectormac \n gets detector mac "<< std::endl; @@ -4183,6 +4213,9 @@ string slsDetectorCommand::helpNetworkParameter(int narg, char *args[], int acti os << "rx_zmqip \n gets/gets the 0MQ (TCP) ip of the receiver from where data is streamed from. If no custom ip, empty until first time connect to receiver" << std::endl; os << "rx_jsonaddheader \n gets additional json header to be streamed " "out with the zmq from receiver." << std::endl; + os << "rx_udpsocksize \n gets the UDP socket buffer size." << std::endl; + os << "rx_realudpsocksize \n gets the actual UDP socket buffer size. Usually double the set udp socket buffer size due to kernel bookkeeping." << std::endl; + } return os.str(); diff --git a/slsDetectorSoftware/slsDetectorAnalysis/energyConversion.cpp b/slsDetectorSoftware/slsDetectorAnalysis/energyConversion.cpp index 301679b3a..91a555caa 100644 --- a/slsDetectorSoftware/slsDetectorAnalysis/energyConversion.cpp +++ b/slsDetectorSoftware/slsDetectorAnalysis/energyConversion.cpp @@ -149,8 +149,8 @@ slsDetectorDefs::sls_detector_module* energyConversion::interpolateTrim(detector enum eiger_DacIndex{SVP,VTR,VRF,VRS,SVN,VTGSTV,VCMP_LL,VCMP_LR,CAL,VCMP_RL,RXB_RB,RXB_LB,VCMP_RR,VCP,VCN,VIS}; //Copy other dacs - int num_dacs_to_copy = 9; int dacs_to_copy[] = {SVP,VTR,SVN,VTGSTV,RXB_RB,RXB_LB,VCN,VIS}; + int num_dacs_to_copy = sizeof(dacs_to_copy) / sizeof(dacs_to_copy[0]); for (int i = 0; i < num_dacs_to_copy; ++i) { if(a->dacs[dacs_to_copy[i]] != b->dacs[dacs_to_copy[i]]) { deleteModule(myMod); @@ -170,8 +170,8 @@ slsDetectorDefs::sls_detector_module* energyConversion::interpolateTrim(detector //Interpolate vrf, vcmp, vcp - int num_dacs_to_interpolate = 7; int dacs_to_interpolate[] = {VRF,VCMP_LL,VCMP_LR,VCMP_RL,VCMP_RR,VCP, VRS}; + int num_dacs_to_interpolate = sizeof(dacs_to_interpolate) / sizeof(dacs_to_interpolate[0]); for (int i = 0; i < num_dacs_to_interpolate; ++i) { myMod->dacs[dacs_to_interpolate[i]] = linearInterpolation(energy, e1, e2, a->dacs[dacs_to_interpolate[i]], b->dacs[dacs_to_interpolate[i]]); diff --git a/slsReceiverSoftware/include/GeneralData.h b/slsReceiverSoftware/include/GeneralData.h index 6809b156c..83acda63c 100644 --- a/slsReceiverSoftware/include/GeneralData.h +++ b/slsReceiverSoftware/include/GeneralData.h @@ -83,6 +83,9 @@ public: /** if standard header implemented in firmware */ bool standardheader; + /** default udp socket buffer size */ + uint32_t defaultUdpSocketBufferSize; + @@ -109,7 +112,8 @@ public: nPixelsXComplete(0), nPixelsYComplete(0), imageSizeComplete(0), - standardheader(false) + standardheader(false), + defaultUdpSocketBufferSize(RECEIVE_SOCKET_BUFFER_SIZE) {}; /** Destructor */ @@ -214,6 +218,7 @@ public: FILE_LOG(logDEBUG) << "Complete Pixels Y: " << nPixelsYComplete; FILE_LOG(logDEBUG) << "Complete Image Size: " << imageSizeComplete; FILE_LOG(logDEBUG) << "Standard Header: " << standardheader; + FILE_LOG(logDEBUG) << "UDP Socket Buffer Size: " << defaultUdpSocketBufferSize; }; }; @@ -494,6 +499,7 @@ class JungfrauData : public GeneralData { fifoBufferHeaderSize= FIFO_HEADER_NUMBYTES + sizeof(slsReceiverDefs::sls_detector_header); defaultFifoDepth = 2500; standardheader = true; + defaultUdpSocketBufferSize = (2000 * 1024 * 1024); }; }; diff --git a/slsReceiverSoftware/include/Listener.h b/slsReceiverSoftware/include/Listener.h index 0f8994bca..1db34af9e 100644 --- a/slsReceiverSoftware/include/Listener.h +++ b/slsReceiverSoftware/include/Listener.h @@ -31,9 +31,12 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { * @param act pointer to activated * @param nf pointer to number of images to catch * @param dr pointer to dynamic range + * @param us pointer to udp socket buffer size + * @param as pointer to actual udp socket buffer size */ Listener(int& ret, int ind, detectorType dtype, Fifo*& f, runStatus* s, - uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr); + uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr, + uint32_t* us, uint32_t* as); /** * Destructor @@ -131,7 +134,13 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { */ void SetSilentMode(bool mode); - + /** + * Create & closes a dummy UDP socket + * to set & get actual buffer size + * @param s UDP socket buffer size to be set + * @return OK or FAIL of dummy socket creation + */ + int CreateDummySocketForUDPSocketBufferSize(uint32_t s); private: @@ -223,6 +232,12 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject { /** Dynamic Range */ uint32_t* dynamicRange; + /** UDP Socket Buffer Size */ + uint32_t* udpSocketBufferSize; + + /** actual UDP Socket Buffer Size (double due to kernel bookkeeping) */ + uint32_t* actualUDPSocketBufferSize; + // acquisition start /** Aquisition Started flag */ diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index a549f9f87..4eb725e3a 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -277,6 +277,18 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter */ char *getAdditionalJsonHeader() const; + /** (not saved in client shared memory) + * Get UDP Socket Buffer Size + * @return UDP Socket Buffer Size + */ + uint32_t getUDPSocketBufferSize() const; + + + /** (not saved in client shared memory) + * Get actual UDP Socket Buffer Size + * @return actual UDP Socket Buffer Size + */ + uint32_t getActualUDPSocketBufferSize() const; /************************************************************************* * Setters *************************************************************** @@ -566,6 +578,13 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter */ void setAdditionalJsonHeader(const char* c); + /** (not saved in client shared memory) + * Set UDP Socket Buffer Size + * @param s UDP Socket Buffer Size + * @return OK or FAIL if dummy socket could be created + */ + int setUDPSocketBufferSize(const uint32_t s); + /* * Restream stop dummy packet from receiver * @return OK or FAIL @@ -691,6 +710,10 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter char eth[MAX_STR_LENGTH]; /** Server UDP Port Number*/ uint32_t udpPortNum[MAX_NUMBER_OF_LISTENING_THREADS]; + /** udp socket buffer size */ + uint32_t udpSocketBufferSize; + /** actual UDP Socket Buffer Size (halved due to kernel bookkeeping) */ + uint32_t actualUDPSocketBufferSize; //***file parameters*** /** File format */ diff --git a/slsReceiverSoftware/include/UDPInterface.h b/slsReceiverSoftware/include/UDPInterface.h index 776fc8dd1..763c1b5ba 100644 --- a/slsReceiverSoftware/include/UDPInterface.h +++ b/slsReceiverSoftware/include/UDPInterface.h @@ -332,7 +332,7 @@ class UDPInterface { */ virtual slsReceiverDefs::runStatus getStatus() const = 0; - /** + /** (not saved in client shared memory) * Get Silent Mode * @return silent mode */ @@ -365,6 +365,18 @@ class UDPInterface { virtual char *getAdditionalJsonHeader() const = 0; + /** (not saved in client shared memory) + * Get UDP Socket Buffer Size + * @return UDP Socket Buffer Size + */ + virtual uint32_t getUDPSocketBufferSize() const = 0; + + /** (not saved in client shared memory) + * Get actual UDP Socket Buffer Size + * @return actual UDP Socket Buffer Size + */ + virtual uint32_t getActualUDPSocketBufferSize() const = 0; + /************************************************************************* * Setters *************************************************************** * They modify the local cache of configuration or detector parameters *** @@ -655,6 +667,13 @@ class UDPInterface { */ virtual void setAdditionalJsonHeader(const char* c) = 0; + /** (not saved in client shared memory) + * Set UDP Socket Buffer Size + * @param s UDP Socket Buffer Size + * @return OK or FAIL if dummy socket could be created + */ + virtual int setUDPSocketBufferSize(const uint32_t s) = 0; + /* * Restream stop dummy packet from receiver * @return OK or FAIL diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 6f0f71a70..c71d2ebe0 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -184,6 +184,13 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase */ void closeFiles(); + /** (not saved in client shared memory) + * Set UDP Socket Buffer Size + * @param s UDP Socket Buffer Size + * @return OK or FAIL if dummy socket could be created + */ + int setUDPSocketBufferSize(const uint32_t s); + /** * Restream stop dummy packet from receiver * @return OK or FAIL diff --git a/slsReceiverSoftware/include/genericSocket.h b/slsReceiverSoftware/include/genericSocket.h index db20bcaeb..633ed8068 100644 --- a/slsReceiverSoftware/include/genericSocket.h +++ b/slsReceiverSoftware/include/genericSocket.h @@ -49,6 +49,8 @@ class sockaddr_in; #include #include #include +#include // capabilities +#include #endif @@ -63,7 +65,7 @@ class sockaddr_in; using namespace std; #define DEFAULT_PACKET_SIZE 1286 -#define SOCKET_BUFFER_SIZE (2000*1024*1024) //2GB, previously 100MB +#define SOCKET_BUFFER_SIZE (100*1024*1024) //100 MB #define DEFAULT_BACKLOG 5 @@ -92,7 +94,8 @@ enum communicationProtocol{ nsending(0), nsent(0), total_sent(0),// sender (client): where to? ip - header_packet_size(0) + header_packet_size(0), + actual_udp_socket_buffer_size(0) { memset(&serverAddress, 0, sizeof(serverAddress)); memset(&clientAddress, 0, sizeof(clientAddress)); @@ -143,7 +146,7 @@ enum communicationProtocol{ genericSocket(unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE, const char *eth=NULL, int hsize=0, - int buf_size=SOCKET_BUFFER_SIZE): + uint32_t buf_size=SOCKET_BUFFER_SIZE): portno(port_number), protocol(p), is_a_server(1), @@ -153,7 +156,8 @@ enum communicationProtocol{ nsending(0), nsent(0), total_sent(0), - header_packet_size(hsize) + header_packet_size(hsize), + actual_udp_socket_buffer_size(0) { @@ -209,49 +213,69 @@ enum communicationProtocol{ // reuse port - int val=1; - if (setsockopt(socketDescriptor,SOL_SOCKET,SO_REUSEADDR,&val,sizeof(int)) == -1) { - cprintf(RED, "setsockopt REUSEADDR failed\n"); - socketDescriptor=-1; - return; + { + int val=1; + if (setsockopt(socketDescriptor,SOL_SOCKET,SO_REUSEADDR,&val,sizeof(int)) == -1) { + cprintf(RED, "setsockopt REUSEADDR failed\n"); + socketDescriptor=-1; + return; + } } - //increase buffer size if its udp if (p == UDP) { - val = buf_size; - int real_val = -1; + uint32_t desired_size = buf_size; + uint32_t real_size = desired_size * 2; // kernel doubles this value for bookkeeping overhead + uint32_t ret_size = -1; socklen_t optlen = sizeof(int); - // set buffer size (could not set) - if (setsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &val, optlen) == -1) { - FILE_LOG(logWARNING) << "Could not set socket receive buffer size: " - << val << " : no root privileges?"; + + // confirm if sufficient + if (getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1) { + FILE_LOG(logWARNING) << "[Port " << port_number << "] Could not get Socket Receive Buffer Size"; + } else if (ret_size >= real_size) { + actual_udp_socket_buffer_size = ret_size; +#ifdef VEBOSE + FILE_LOG(logINFO) << "[Port " << port_number << "] UDP Socket Buffer Size is sufficient (" << ret_size << ")"; +#endif } - // confirm size (could not get) - else if (getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &real_val, &optlen) == -1) { - FILE_LOG(logWARNING) << "Could not get socket receive buffer size"; - } - // set buffer size worked if real val is twice the requested value - else if (real_val == val * 2) { - cprintf(GREEN, "UDP Socket buffer size modified to %d\n", real_val); - } - // buffer size too large + + // not sufficient, enhance size else { - // force a value larger than system limit (if run in a privileged context (capability CAP_NET_ADMIN set)) - int ret = setsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUFFORCE, &val, optlen); - getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &real_val, &optlen); - if (ret == -1) { - FILE_LOG(logWARNING) << "Could not force socket receive buffer size to " - << val << ", real size is " << real_val << - " : no root privileges?"; - } else { - cprintf(GREEN, "UDP socket buffer size modified to %d\n", real_val); + // set buffer size (could not set) + if (setsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &desired_size, optlen) == -1) { + FILE_LOG(logWARNING) << "[Port " << port_number << "] Could not set Socket Receive Buffer Size to " + << desired_size << ". No Root Privileges?"; + } + // confirm size + else if (getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1) { + FILE_LOG(logWARNING) << "[Port " << port_number << "] Could not get Socket Receive Buffer Size"; + } + else if (ret_size >= real_size) { + actual_udp_socket_buffer_size = ret_size; + FILE_LOG(logINFO) << "[Port " << port_number << "] UDP Socket Buffer Size modified to " << ret_size; + } + // buffer size too large + else { + actual_udp_socket_buffer_size = ret_size; + cprintf(BLUE, "[Port %u] wanted : %u, actualsize: %u\n", port_number, real_size, ret_size); + // force a value larger than system limit (if run in a privileged context (capability CAP_NET_ADMIN set)) + int ret = setsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUFFORCE, &desired_size, optlen); + getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen); + if (ret == -1) { + FILE_LOG(logWARNING) << "[Port " << port_number << "] " + "Could not force Socket Receive Buffer Size to " + << desired_size << ". Real size is " << ret_size << + ". (No Root Privileges?)\n" + "Set rx_udpsocksize from the client to <= " << + (ret_size/2) << " (Real size:" << ret_size << ") to remove this warning.\n"; + } else { + FILE_LOG(logINFO) << "[Port " << port_number << "] UDP socket buffer size modified to " << ret_size; + } } } } - if(bind(socketDescriptor,(struct sockaddr *) &serverAddress,sizeof(serverAddress))<0){ cprintf(RED, "Can not bind socket\n"); socketDescriptor=-1; @@ -267,8 +291,13 @@ enum communicationProtocol{ - - + /** + * Returns actual udp socket buffer size/2. + * Halving is because of kernel book keeping + */ + int getActualUDPSocketBufferSize(){ + return actual_udp_socket_buffer_size; + } @@ -464,7 +493,7 @@ enum communicationProtocol{ void ShutDownSocket(){ - while(!shutdown(socketDescriptor, SHUT_RDWR)); + shutdown(socketDescriptor, SHUT_RDWR); Disconnect(); }; @@ -801,6 +830,7 @@ enum communicationProtocol{ int nsent; int total_sent; int header_packet_size; + int actual_udp_socket_buffer_size; // pthread_mutex_t mp; }; diff --git a/slsReceiverSoftware/include/receiver_defs.h b/slsReceiverSoftware/include/receiver_defs.h index 5b7b924c9..5f3bd6c48 100755 --- a/slsReceiverSoftware/include/receiver_defs.h +++ b/slsReceiverSoftware/include/receiver_defs.h @@ -7,6 +7,7 @@ //socket #define GOODBYE -200 #define RECEIVE_SOCKET_BUFFER_SIZE (100*1024*1024) + #define MAX_SOCKET_INPUT_PACKET_QUEUE 250000 diff --git a/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h b/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h index d6e797f0f..c7b4d1be0 100644 --- a/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h +++ b/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h @@ -300,6 +300,12 @@ class slsReceiverTCPIPInterface : private virtual slsReceiverDefs { /** set additional json header */ int set_additional_json_header(); + /** set udp socket buffer size */ + int set_udp_socket_buffer_size(); + + /** get real udp socket buffer size */ + int get_real_udp_socket_buffer_size(); + /** detector type */ diff --git a/slsReceiverSoftware/include/sls_receiver_funcs.h b/slsReceiverSoftware/include/sls_receiver_funcs.h index 9d6caeb48..5237140ec 100644 --- a/slsReceiverSoftware/include/sls_receiver_funcs.h +++ b/slsReceiverSoftware/include/sls_receiver_funcs.h @@ -64,7 +64,8 @@ enum recFuncs{ F_ENABLE_GAPPIXELS_IN_RECEIVER, /** < sets gap pixels in the receiver */ F_RESTREAM_STOP_FROM_RECEIVER, /** < restream stop from receiver */ F_ADDITIONAL_JSON_HEADER, /** < additional json header */ - + F_RECEIVER_UDP_SOCK_BUF_SIZE, /** < UDP socket buffer size */ + F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE, /** < real UDP socket buffer size */ /* Always append functions hereafter!!! */ diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index 16fc1ab4a..07c6325f6 100644 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -20,7 +20,8 @@ const string Listener::TypeName = "Listener"; Listener::Listener(int& ret, int ind, detectorType dtype, Fifo*& f, runStatus* s, - uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr) : + uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr, + uint32_t* us, uint32_t* as) : ThreadObject(ind), runningFlag(0), generalData(0), @@ -44,7 +45,9 @@ Listener::Listener(int& ret, int ind, detectorType dtype, Fifo*& f, runStatus* s carryOverPacket(0), listeningPacket(0), udpSocketAlive(0), - silentMode(false) + silentMode(false), + udpSocketBufferSize(us), + actualUDPSocketBufferSize(as) { ret = FAIL; if(ThreadObject::CreateThread() == OK) @@ -182,15 +185,17 @@ int Listener::CreateUDPSockets() { //if eth is mistaken with ip address if (strchr(eth,'.') != NULL){ - strncpy(eth,"", MAX_STR_LENGTH); + memset(eth, 0, MAX_STR_LENGTH); } if(!strlen(eth)){ FILE_LOG(logWARNING) << "eth is empty. Listening to all"; } ShutDownUDPSocket(); - udpSocket = new genericSocket(*udpPortNumber, genericSocket::UDP, - generalData->packetSize, (strlen(eth)?eth:NULL), generalData->headerPacketSize); + + udpSocket = new genericSocket(*udpPortNumber, genericSocket::UDP, + generalData->packetSize, (strlen(eth)?eth:NULL), generalData->headerPacketSize, + *udpSocketBufferSize); int iret = udpSocket->getErrorStatus(); if(!iret){ FILE_LOG(logINFO) << index << ": UDP port opened at port " << *udpPortNumber; @@ -200,6 +205,10 @@ int Listener::CreateUDPSockets() { } udpSocketAlive = true; sem_init(&semaphore_socket,1,0); + + // doubled due to kernel bookkeeping (could also be less due to permissions) + *actualUDPSocketBufferSize = udpSocket->getActualUDPSocketBufferSize(); + return OK; } @@ -225,6 +234,54 @@ void Listener::SetSilentMode(bool mode) { } +int Listener::CreateDummySocketForUDPSocketBufferSize(uint32_t s) { + uint32_t temp = *udpSocketBufferSize; + *udpSocketBufferSize = s; + + if (!(*activated)) + return OK; + + + + //if eth is mistaken with ip address + if (strchr(eth,'.') != NULL){ + memset(eth, 0, MAX_STR_LENGTH); + } + + // shutdown if any open + if(udpSocket){ + udpSocket->ShutDownSocket(); + delete udpSocket; + } + + //create dummy socket + udpSocket = new genericSocket(*udpPortNumber, genericSocket::UDP, + generalData->packetSize, (strlen(eth)?eth:NULL), generalData->headerPacketSize, + *udpSocketBufferSize); + int iret = udpSocket->getErrorStatus(); + if (iret){ + FILE_LOG(logERROR) << "Could not create a test UDP socket on port " << *udpPortNumber << " error: " << iret; + return FAIL; + } + + // doubled due to kernel bookkeeping (could also be less due to permissions) + *actualUDPSocketBufferSize = udpSocket->getActualUDPSocketBufferSize(); + if (*actualUDPSocketBufferSize != (s*2)) + *udpSocketBufferSize = temp; + + + // shutdown socket + if(udpSocket){ + udpSocketAlive = false; + udpSocket->ShutDownSocket(); + delete udpSocket; + udpSocket = 0; + } + + return OK; +} + + void Listener::ThreadExecution() { char* buffer; int rc = 0; diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index 7a2e87641..d1ab17173 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -65,6 +65,8 @@ void UDPBaseImplementation::initializeMembers(){ for(int i=0;ithreadsPerReceiver; fifoDepth = generalData->defaultFifoDepth; + udpSocketBufferSize = generalData->defaultUdpSocketBufferSize; //local network parameters SetLocalNetworkParameters(); @@ -375,11 +376,14 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) { for ( int i = 0; i < numThreads; ++i ) { int ret = FAIL; - Listener* l = new Listener(ret, i, myDetectorType, fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange); + Listener* l = new Listener(ret, i, myDetectorType, fifo[i], &status, + &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange, + &udpSocketBufferSize, &actualUDPSocketBufferSize); DataProcessor* p = NULL; if (ret == OK) p = new DataProcessor(ret, i, fifo[i], &fileFormatType, - fileWriteEnable, &dataStreamEnable, &gapPixelsEnable, &dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS, + fileWriteEnable, &dataStreamEnable, &gapPixelsEnable, + &dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS, rawDataReadyCallBack, rawDataModifyReadyCallBack, pRawDataReady); // error in creating threads @@ -406,6 +410,9 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) { SetThreadPriorities(); + // check udp socket buffer size + setUDPSocketBufferSize(udpSocketBufferSize); + FILE_LOG(logDEBUG) << " Detector type set to " << getDetectorType(d); return OK; } @@ -629,6 +636,10 @@ void UDPStandardImplementation::closeFiles() { dataProcessor[0]->EndofAcquisition(maxIndexCaught); } +int UDPStandardImplementation::setUDPSocketBufferSize(const uint32_t s) { + if (listener.size()) + return listener[0]->CreateDummySocketForUDPSocketBufferSize(s); +} int UDPStandardImplementation::restreamStop() { bool ret = OK; @@ -666,8 +677,8 @@ void UDPStandardImplementation::SetLocalNetworkParameters() { MAX_SOCKET_INPUT_PACKET_QUEUE); } else { const char *msg = "Could not change max length of" - "input packet queue (net.core.netdev_max_backlog): no root privileges?"; - cprintf(RED, "WARNING: %s\n", msg); + "input packet queue (net.core.netdev_max_backlog). No Root Privileges?"; + FILE_LOG(logWARNING) << msg; } } } @@ -678,7 +689,7 @@ void UDPStandardImplementation::SetThreadPriorities() { for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it){ if ((*it)->SetThreadPriority(LISTENER_PRIORITY) == FAIL) { - FILE_LOG(logWARNING) << "No root privileges to prioritize listener threads"; + FILE_LOG(logWARNING) << "Could not prioritize listener threads. No Root Privileges?"; return; } } diff --git a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp index 4e255f2b5..0df07bbe9 100644 --- a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp +++ b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp @@ -294,6 +294,9 @@ const char* slsReceiverTCPIPInterface::getFunctionName(enum recFuncs func) { case F_ENABLE_GAPPIXELS_IN_RECEIVER:return "F_ENABLE_GAPPIXELS_IN_RECEIVER"; case F_RESTREAM_STOP_FROM_RECEIVER: return "F_RESTREAM_STOP_FROM_RECEIVER"; case F_ADDITIONAL_JSON_HEADER: return "F_ADDITIONAL_JSON_HEADER"; + case F_RECEIVER_UDP_SOCK_BUF_SIZE: return "F_RECEIVER_UDP_SOCK_BUF_SIZE"; + case F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE: return "F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE"; + default: return "Unknown Function"; } } @@ -343,6 +346,8 @@ int slsReceiverTCPIPInterface::function_table(){ flist[F_ENABLE_GAPPIXELS_IN_RECEIVER] = &slsReceiverTCPIPInterface::enable_gap_pixels; flist[F_RESTREAM_STOP_FROM_RECEIVER] = &slsReceiverTCPIPInterface::restream_stop; flist[F_ADDITIONAL_JSON_HEADER] = &slsReceiverTCPIPInterface::set_additional_json_header; + flist[F_RECEIVER_UDP_SOCK_BUF_SIZE] = &slsReceiverTCPIPInterface::set_udp_socket_buffer_size; + flist[F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE]= &slsReceiverTCPIPInterface::get_real_udp_socket_buffer_size; #ifdef VERYVERBOSE for (int i = 0; i < NUM_REC_FUNCTIONS ; i++) { @@ -2610,3 +2615,86 @@ int slsReceiverTCPIPInterface::set_additional_json_header() { // return ok/fail return ret; } + + + +int slsReceiverTCPIPInterface::set_udp_socket_buffer_size() { + ret = OK; + memset(mess, 0, sizeof(mess)); + int index = -1; + int retval = -1; + + // receive arguments + if (mySock->ReceiveDataOnly(&index,sizeof(index)) < 0 ) + return printSocketReadError(); + + // execute action +#ifdef SLS_RECEIVER_UDP_FUNCTIONS + if (receiverBase == NULL) + invalidReceiverObject(); + else { + // set + if(index >= 0) { + if (mySock->differentClients && lockStatus) + receiverlocked(); + else if (receiverBase->getStatus() != IDLE) + receiverNotIdle(); + else { + if (receiverBase->setUDPSocketBufferSize(index) == FAIL) { + ret = FAIL; + strcpy(mess, "Could not create dummy UDP Socket to test buffer size\n"); + FILE_LOG(logERROR) << mess; + } + } + } + //get + retval=receiverBase->getUDPSocketBufferSize(); + if(index >= 0 && retval != index) { + ret = FAIL; + strcpy(mess, "Could not set UDP Socket buffer size\n"); + FILE_LOG(logERROR) << mess; + } + } +#endif +#ifdef VERYVERBOSE + FILE_LOG(logDEBUG1) << "UDP Socket Buffer Size:" << retval; +#endif + + if (ret == OK && mySock->differentClients) + ret = FORCE_UPDATE; + + // send answer + mySock->SendDataOnly(&ret,sizeof(ret)); + if (ret == FAIL) + mySock->SendDataOnly(mess,sizeof(mess)); + mySock->SendDataOnly(&retval,sizeof(retval)); + + // return ok/fail + return ret; +} + + + +int slsReceiverTCPIPInterface::get_real_udp_socket_buffer_size(){ + ret = OK; + int retval = -1; + + // execute action +#ifdef SLS_RECEIVER_UDP_FUNCTIONS + if (receiverBase == NULL) + invalidReceiverObject(); + else retval = receiverBase->getActualUDPSocketBufferSize(); +#endif + + if (ret == OK && mySock->differentClients) + ret = FORCE_UPDATE; + + // send answer + mySock->SendDataOnly(&ret,sizeof(ret)); + mySock->SendDataOnly(&retval,sizeof(retval)); + + // return ok/fail + return ret; +} + +