esrf changes: rx_udpsocksize sets/gets udp socket buffer size to be set, rx_realudpsocksize gets the real udp sock size buffer. At receiver config and at rx_udpsocksize command, dummy udp sockets created to know if set udp sock size fails (if fail, set to previous value), and also to get the real udp sock buffer size

This commit is contained in:
2018-05-01 11:55:48 +02:00
parent 99281e2690
commit 1152555663
18 changed files with 537 additions and 86 deletions

View File

@ -197,7 +197,9 @@ enum networkParameter {
CLIENT_STREAMING_PORT, /**< client streaming TCP(ZMQ) port */
RECEIVER_STREAMING_SRC_IP,/**< receiever streaming TCP(ZMQ) ip */
CLIENT_STREAMING_SRC_IP, /**< client streaming TCP(ZMQ) ip */
ADDITIONAL_JSON_HEADER /**< additional json header (ZMQ) */
ADDITIONAL_JSON_HEADER, /**< additional json header (ZMQ) */
RECEIVER_UDP_SCKT_BUF_SIZE, /**< UDP socket buffer size */
RECEIVER_REAL_UDP_SCKT_BUF_SIZE /**< real UDP socket buffer size */
};
/**

View File

@ -6257,6 +6257,10 @@ string slsDetector::setNetworkParameter(networkParameter index, string value) {
return setReceiverStreamingIP(value);
case ADDITIONAL_JSON_HEADER:
return setAdditionalJsonHeader(value);
case RECEIVER_UDP_SCKT_BUF_SIZE:
sscanf(value.c_str(),"%d",&i);
setReceiverUDPSocketBufferSize(i);
return getReceiverUDPSocketBufferSize();
default:
return (char*)("unknown network parameter");
@ -6298,6 +6302,11 @@ string slsDetector::getNetworkParameter(networkParameter index) {
return getReceiverStreamingIP();
case ADDITIONAL_JSON_HEADER:
return getAdditionalJsonHeader();
case RECEIVER_UDP_SCKT_BUF_SIZE:
return getReceiverUDPSocketBufferSize();
case RECEIVER_REAL_UDP_SCKT_BUF_SIZE:
return getReceiverRealUDPSocketBufferSize();
default:
return (char*)("unknown network parameter");
}
@ -6724,6 +6733,64 @@ string slsDetector::setAdditionalJsonHeader(string jsonheader) {
}
string slsDetector::setReceiverUDPSocketBufferSize(int udpsockbufsize) {
int fnum=F_RECEIVER_UDP_SOCK_BUF_SIZE;
int ret = FAIL;
int retval = -1;
int arg = udpsockbufsize;
if(thisDetector->receiverOnlineFlag == ONLINE_FLAG){
#ifdef VERBOSE
std::cout << "Sending UDP Socket Buffer size to receiver " << arg << std::endl;
#endif
if (connectData() == OK){
ret=thisReceiver->sendInt(fnum,retval,arg);
disconnectData();
}
if(ret==FAIL) {
setErrorMask((getErrorMask())|(COULDNOT_SET_NETWORK_PARAMETER));
std::cout << "Warning: Could not set udp socket buffer size" << std::endl;
}
if(ret==FORCE_UPDATE)
updateReceiver();
}
ostringstream ss;
ss << retval;
string s = ss.str();
return s;
}
string slsDetector::getReceiverRealUDPSocketBufferSize() {
int fnum=F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE;
int ret = FAIL;
int retval = -1;
if(thisDetector->receiverOnlineFlag == ONLINE_FLAG){
#ifdef VERBOSE
std::cout << "Getting real UDP Socket Buffer size to receiver " << std::endl;
#endif
if (connectData() == OK){
ret=thisReceiver->getInt(fnum,retval);
disconnectData();
}
if(ret==FAIL) {
setErrorMask((getErrorMask())|(COULDNOT_SET_NETWORK_PARAMETER));
std::cout << "Warning: Could not get real socket buffer size" << std::endl;
}
if(ret==FORCE_UPDATE)
updateReceiver();
}
ostringstream ss;
ss << retval;
string s = ss.str();
return s;
}
string slsDetector::setDetectorNetworkParameter(networkParameter index, int delay){
int fnum = F_SET_NETWORK_PARAMETER;

View File

@ -1833,6 +1833,11 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
string getReceiverStreamingIP(){return string(thisDetector->receiver_zmqip);};
/** gets the additional json header, returns "none" if default setting and no custom set*/
string getAdditionalJsonHeader(){return string(thisDetector->receiver_additionalJsonHeader);};
/** returns the receiver UDP socket buffer size */
string getReceiverUDPSocketBufferSize() {return setReceiverUDPSocketBufferSize();};
/** returns the real receiver UDP socket buffer size */
string getReceiverRealUDPSocketBufferSize();
/** validates the format of detector MAC address and sets it \sa sharedSlsDetector */
string setDetectorMAC(string detectorMAC);
@ -1858,6 +1863,8 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
string setReceiverStreamingIP(string sourceIP);
/** additional json header, returns "none" if default setting and no custom set */
string setAdditionalJsonHeader(string jsonheader);
/** sets the receiver UDP socket buffer size */
string setReceiverUDPSocketBufferSize(int udpsockbufsize=-1);
/** sets the transmission delay for left or right port or for an entire frame*/
string setDetectorNetworkParameter(networkParameter index, int delay);

View File

@ -1978,6 +1978,21 @@ slsDetectorCommand::slsDetectorCommand(slsDetectorUtils *det) {
descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdNetworkParameter;
++i;
/*! \page network
- <b>rx_udpsocksize [size]</b> sets/gets the UDP socket buffer size. Already trying to set by default to 100mb, 2gb for Jungfrau. Does not remember in client shared memory, so must be initialized each time after setting receiver hostname in config file.\c Returns \c (int)
*/
descrToFuncMap[i].m_pFuncName="rx_udpsocksize"; //
descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdNetworkParameter;
++i;
/*! \page network
- <b>rx_realudpsocksize [size]</b> gets the actual UDP socket buffer size. Usually double the set udp socket buffer size due to kernel bookkeeping. Get only. \c Returns \c (int)
*/
descrToFuncMap[i].m_pFuncName="rx_realudpsocksize"; //
descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdNetworkParameter;
++i;
/*! \page network
- <b>detectormac [mac]</b> sets/gets the mac address of the detector UDP interface from where the detector will stream data. Use single-detector command. Normally unused. \c Returns \c (string)
*/
@ -4071,7 +4086,18 @@ string slsDetectorCommand::cmdNetworkParameter(int narg, char *args[], int actio
if (!(sscanf(args[1],"%d",&i)))
return ("cannot parse argument") + string(args[1]);
}
} else if (cmd=="txndelay_left") {
} else if (cmd=="rx_udpsocksize") {
t=RECEIVER_UDP_SCKT_BUF_SIZE;
if (action==PUT_ACTION){
if (!(sscanf(args[1],"%d",&i)))
return ("cannot parse argument") + string(args[1]);
}
} else if (cmd=="rx_realudpsocksize") {
t=RECEIVER_REAL_UDP_SCKT_BUF_SIZE;
if (action==PUT_ACTION){
return ("cannot put!");
}
} else if (cmd=="txndelay_left") {
t=DETECTOR_TXN_DELAY_LEFT;
if (action==PUT_ACTION){
if (!(sscanf(args[1],"%d",&i)))
@ -4165,6 +4191,10 @@ string slsDetectorCommand::helpNetworkParameter(int narg, char *args[], int acti
os << "rx_jsonaddheader [t]\n sets additional json header to be streamed "
"out with the zmq from receiver. Default is empty. t must be in the format '\"label1\":\"value1\",\"label2\":\"value2\"' etc."
"Use only if it needs to be processed by an intermediate process." << std::endl;
os << "rx_udpsocksize [t]\n sets the UDP socket buffer size. Different defaults for Jungfrau. "
"Does not remember in client shared memory, "
"so must be initialized each time after setting receiver "
"hostname in config file." << std::endl;
}
if (action==GET_ACTION || action==HELP_ACTION) {
os << "detectormac \n gets detector mac "<< std::endl;
@ -4183,6 +4213,9 @@ string slsDetectorCommand::helpNetworkParameter(int narg, char *args[], int acti
os << "rx_zmqip \n gets/gets the 0MQ (TCP) ip of the receiver from where data is streamed from. If no custom ip, empty until first time connect to receiver" << std::endl;
os << "rx_jsonaddheader \n gets additional json header to be streamed "
"out with the zmq from receiver." << std::endl;
os << "rx_udpsocksize \n gets the UDP socket buffer size." << std::endl;
os << "rx_realudpsocksize \n gets the actual UDP socket buffer size. Usually double the set udp socket buffer size due to kernel bookkeeping." << std::endl;
}
return os.str();

View File

@ -149,8 +149,8 @@ slsDetectorDefs::sls_detector_module* energyConversion::interpolateTrim(detector
enum eiger_DacIndex{SVP,VTR,VRF,VRS,SVN,VTGSTV,VCMP_LL,VCMP_LR,CAL,VCMP_RL,RXB_RB,RXB_LB,VCMP_RR,VCP,VCN,VIS};
//Copy other dacs
int num_dacs_to_copy = 9;
int dacs_to_copy[] = {SVP,VTR,SVN,VTGSTV,RXB_RB,RXB_LB,VCN,VIS};
int num_dacs_to_copy = sizeof(dacs_to_copy) / sizeof(dacs_to_copy[0]);
for (int i = 0; i < num_dacs_to_copy; ++i) {
if(a->dacs[dacs_to_copy[i]] != b->dacs[dacs_to_copy[i]]) {
deleteModule(myMod);
@ -170,8 +170,8 @@ slsDetectorDefs::sls_detector_module* energyConversion::interpolateTrim(detector
//Interpolate vrf, vcmp, vcp
int num_dacs_to_interpolate = 7;
int dacs_to_interpolate[] = {VRF,VCMP_LL,VCMP_LR,VCMP_RL,VCMP_RR,VCP, VRS};
int num_dacs_to_interpolate = sizeof(dacs_to_interpolate) / sizeof(dacs_to_interpolate[0]);
for (int i = 0; i < num_dacs_to_interpolate; ++i) {
myMod->dacs[dacs_to_interpolate[i]] = linearInterpolation(energy, e1, e2,
a->dacs[dacs_to_interpolate[i]], b->dacs[dacs_to_interpolate[i]]);

View File

@ -83,6 +83,9 @@ public:
/** if standard header implemented in firmware */
bool standardheader;
/** default udp socket buffer size */
uint32_t defaultUdpSocketBufferSize;
@ -109,7 +112,8 @@ public:
nPixelsXComplete(0),
nPixelsYComplete(0),
imageSizeComplete(0),
standardheader(false)
standardheader(false),
defaultUdpSocketBufferSize(RECEIVE_SOCKET_BUFFER_SIZE)
{};
/** Destructor */
@ -214,6 +218,7 @@ public:
FILE_LOG(logDEBUG) << "Complete Pixels Y: " << nPixelsYComplete;
FILE_LOG(logDEBUG) << "Complete Image Size: " << imageSizeComplete;
FILE_LOG(logDEBUG) << "Standard Header: " << standardheader;
FILE_LOG(logDEBUG) << "UDP Socket Buffer Size: " << defaultUdpSocketBufferSize;
};
};
@ -494,6 +499,7 @@ class JungfrauData : public GeneralData {
fifoBufferHeaderSize= FIFO_HEADER_NUMBYTES + sizeof(slsReceiverDefs::sls_detector_header);
defaultFifoDepth = 2500;
standardheader = true;
defaultUdpSocketBufferSize = (2000 * 1024 * 1024);
};
};

View File

@ -31,9 +31,12 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject {
* @param act pointer to activated
* @param nf pointer to number of images to catch
* @param dr pointer to dynamic range
* @param us pointer to udp socket buffer size
* @param as pointer to actual udp socket buffer size
*/
Listener(int& ret, int ind, detectorType dtype, Fifo*& f, runStatus* s,
uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr);
uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr,
uint32_t* us, uint32_t* as);
/**
* Destructor
@ -131,7 +134,13 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject {
*/
void SetSilentMode(bool mode);
/**
* Create & closes a dummy UDP socket
* to set & get actual buffer size
* @param s UDP socket buffer size to be set
* @return OK or FAIL of dummy socket creation
*/
int CreateDummySocketForUDPSocketBufferSize(uint32_t s);
private:
@ -223,6 +232,12 @@ class Listener : private virtual slsReceiverDefs, public ThreadObject {
/** Dynamic Range */
uint32_t* dynamicRange;
/** UDP Socket Buffer Size */
uint32_t* udpSocketBufferSize;
/** actual UDP Socket Buffer Size (double due to kernel bookkeeping) */
uint32_t* actualUDPSocketBufferSize;
// acquisition start
/** Aquisition Started flag */

View File

@ -277,6 +277,18 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
*/
char *getAdditionalJsonHeader() const;
/** (not saved in client shared memory)
* Get UDP Socket Buffer Size
* @return UDP Socket Buffer Size
*/
uint32_t getUDPSocketBufferSize() const;
/** (not saved in client shared memory)
* Get actual UDP Socket Buffer Size
* @return actual UDP Socket Buffer Size
*/
uint32_t getActualUDPSocketBufferSize() const;
/*************************************************************************
* Setters ***************************************************************
@ -566,6 +578,13 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
*/
void setAdditionalJsonHeader(const char* c);
/** (not saved in client shared memory)
* Set UDP Socket Buffer Size
* @param s UDP Socket Buffer Size
* @return OK or FAIL if dummy socket could be created
*/
int setUDPSocketBufferSize(const uint32_t s);
/*
* Restream stop dummy packet from receiver
* @return OK or FAIL
@ -691,6 +710,10 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
char eth[MAX_STR_LENGTH];
/** Server UDP Port Number*/
uint32_t udpPortNum[MAX_NUMBER_OF_LISTENING_THREADS];
/** udp socket buffer size */
uint32_t udpSocketBufferSize;
/** actual UDP Socket Buffer Size (halved due to kernel bookkeeping) */
uint32_t actualUDPSocketBufferSize;
//***file parameters***
/** File format */

View File

@ -332,7 +332,7 @@ class UDPInterface {
*/
virtual slsReceiverDefs::runStatus getStatus() const = 0;
/**
/** (not saved in client shared memory)
* Get Silent Mode
* @return silent mode
*/
@ -365,6 +365,18 @@ class UDPInterface {
virtual char *getAdditionalJsonHeader() const = 0;
/** (not saved in client shared memory)
* Get UDP Socket Buffer Size
* @return UDP Socket Buffer Size
*/
virtual uint32_t getUDPSocketBufferSize() const = 0;
/** (not saved in client shared memory)
* Get actual UDP Socket Buffer Size
* @return actual UDP Socket Buffer Size
*/
virtual uint32_t getActualUDPSocketBufferSize() const = 0;
/*************************************************************************
* Setters ***************************************************************
* They modify the local cache of configuration or detector parameters ***
@ -655,6 +667,13 @@ class UDPInterface {
*/
virtual void setAdditionalJsonHeader(const char* c) = 0;
/** (not saved in client shared memory)
* Set UDP Socket Buffer Size
* @param s UDP Socket Buffer Size
* @return OK or FAIL if dummy socket could be created
*/
virtual int setUDPSocketBufferSize(const uint32_t s) = 0;
/*
* Restream stop dummy packet from receiver
* @return OK or FAIL

View File

@ -184,6 +184,13 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
*/
void closeFiles();
/** (not saved in client shared memory)
* Set UDP Socket Buffer Size
* @param s UDP Socket Buffer Size
* @return OK or FAIL if dummy socket could be created
*/
int setUDPSocketBufferSize(const uint32_t s);
/**
* Restream stop dummy packet from receiver
* @return OK or FAIL

View File

@ -49,6 +49,8 @@ class sockaddr_in;
#include <sys/ioctl.h>
#include <net/if.h>
#include <ifaddrs.h>
#include <sys/prctl.h> // capabilities
#include <linux/capability.h>
#endif
@ -63,7 +65,7 @@ class sockaddr_in;
using namespace std;
#define DEFAULT_PACKET_SIZE 1286
#define SOCKET_BUFFER_SIZE (2000*1024*1024) //2GB, previously 100MB
#define SOCKET_BUFFER_SIZE (100*1024*1024) //100 MB
#define DEFAULT_BACKLOG 5
@ -92,7 +94,8 @@ enum communicationProtocol{
nsending(0),
nsent(0),
total_sent(0),// sender (client): where to? ip
header_packet_size(0)
header_packet_size(0),
actual_udp_socket_buffer_size(0)
{
memset(&serverAddress, 0, sizeof(serverAddress));
memset(&clientAddress, 0, sizeof(clientAddress));
@ -143,7 +146,7 @@ enum communicationProtocol{
genericSocket(unsigned short int const port_number, communicationProtocol p,
int ps = DEFAULT_PACKET_SIZE, const char *eth=NULL, int hsize=0,
int buf_size=SOCKET_BUFFER_SIZE):
uint32_t buf_size=SOCKET_BUFFER_SIZE):
portno(port_number),
protocol(p),
is_a_server(1),
@ -153,7 +156,8 @@ enum communicationProtocol{
nsending(0),
nsent(0),
total_sent(0),
header_packet_size(hsize)
header_packet_size(hsize),
actual_udp_socket_buffer_size(0)
{
@ -209,49 +213,69 @@ enum communicationProtocol{
// reuse port
int val=1;
if (setsockopt(socketDescriptor,SOL_SOCKET,SO_REUSEADDR,&val,sizeof(int)) == -1) {
cprintf(RED, "setsockopt REUSEADDR failed\n");
socketDescriptor=-1;
return;
{
int val=1;
if (setsockopt(socketDescriptor,SOL_SOCKET,SO_REUSEADDR,&val,sizeof(int)) == -1) {
cprintf(RED, "setsockopt REUSEADDR failed\n");
socketDescriptor=-1;
return;
}
}
//increase buffer size if its udp
if (p == UDP) {
val = buf_size;
int real_val = -1;
uint32_t desired_size = buf_size;
uint32_t real_size = desired_size * 2; // kernel doubles this value for bookkeeping overhead
uint32_t ret_size = -1;
socklen_t optlen = sizeof(int);
// set buffer size (could not set)
if (setsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &val, optlen) == -1) {
FILE_LOG(logWARNING) << "Could not set socket receive buffer size: "
<< val << " : no root privileges?";
// confirm if sufficient
if (getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1) {
FILE_LOG(logWARNING) << "[Port " << port_number << "] Could not get Socket Receive Buffer Size";
} else if (ret_size >= real_size) {
actual_udp_socket_buffer_size = ret_size;
#ifdef VEBOSE
FILE_LOG(logINFO) << "[Port " << port_number << "] UDP Socket Buffer Size is sufficient (" << ret_size << ")";
#endif
}
// confirm size (could not get)
else if (getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &real_val, &optlen) == -1) {
FILE_LOG(logWARNING) << "Could not get socket receive buffer size";
}
// set buffer size worked if real val is twice the requested value
else if (real_val == val * 2) {
cprintf(GREEN, "UDP Socket buffer size modified to %d\n", real_val);
}
// buffer size too large
// not sufficient, enhance size
else {
// force a value larger than system limit (if run in a privileged context (capability CAP_NET_ADMIN set))
int ret = setsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUFFORCE, &val, optlen);
getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &real_val, &optlen);
if (ret == -1) {
FILE_LOG(logWARNING) << "Could not force socket receive buffer size to "
<< val << ", real size is " << real_val <<
" : no root privileges?";
} else {
cprintf(GREEN, "UDP socket buffer size modified to %d\n", real_val);
// set buffer size (could not set)
if (setsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &desired_size, optlen) == -1) {
FILE_LOG(logWARNING) << "[Port " << port_number << "] Could not set Socket Receive Buffer Size to "
<< desired_size << ". No Root Privileges?";
}
// confirm size
else if (getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1) {
FILE_LOG(logWARNING) << "[Port " << port_number << "] Could not get Socket Receive Buffer Size";
}
else if (ret_size >= real_size) {
actual_udp_socket_buffer_size = ret_size;
FILE_LOG(logINFO) << "[Port " << port_number << "] UDP Socket Buffer Size modified to " << ret_size;
}
// buffer size too large
else {
actual_udp_socket_buffer_size = ret_size;
cprintf(BLUE, "[Port %u] wanted : %u, actualsize: %u\n", port_number, real_size, ret_size);
// force a value larger than system limit (if run in a privileged context (capability CAP_NET_ADMIN set))
int ret = setsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUFFORCE, &desired_size, optlen);
getsockopt(socketDescriptor, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen);
if (ret == -1) {
FILE_LOG(logWARNING) << "[Port " << port_number << "] "
"Could not force Socket Receive Buffer Size to "
<< desired_size << ". Real size is " << ret_size <<
". (No Root Privileges?)\n"
"Set rx_udpsocksize from the client to <= " <<
(ret_size/2) << " (Real size:" << ret_size << ") to remove this warning.\n";
} else {
FILE_LOG(logINFO) << "[Port " << port_number << "] UDP socket buffer size modified to " << ret_size;
}
}
}
}
if(bind(socketDescriptor,(struct sockaddr *) &serverAddress,sizeof(serverAddress))<0){
cprintf(RED, "Can not bind socket\n");
socketDescriptor=-1;
@ -267,8 +291,13 @@ enum communicationProtocol{
/**
* Returns actual udp socket buffer size/2.
* Halving is because of kernel book keeping
*/
int getActualUDPSocketBufferSize(){
return actual_udp_socket_buffer_size;
}
@ -464,7 +493,7 @@ enum communicationProtocol{
void ShutDownSocket(){
while(!shutdown(socketDescriptor, SHUT_RDWR));
shutdown(socketDescriptor, SHUT_RDWR);
Disconnect();
};
@ -801,6 +830,7 @@ enum communicationProtocol{
int nsent;
int total_sent;
int header_packet_size;
int actual_udp_socket_buffer_size;
// pthread_mutex_t mp;
};

View File

@ -7,6 +7,7 @@
//socket
#define GOODBYE -200
#define RECEIVE_SOCKET_BUFFER_SIZE (100*1024*1024)
#define MAX_SOCKET_INPUT_PACKET_QUEUE 250000

View File

@ -300,6 +300,12 @@ class slsReceiverTCPIPInterface : private virtual slsReceiverDefs {
/** set additional json header */
int set_additional_json_header();
/** set udp socket buffer size */
int set_udp_socket_buffer_size();
/** get real udp socket buffer size */
int get_real_udp_socket_buffer_size();
/** detector type */

View File

@ -64,7 +64,8 @@ enum recFuncs{
F_ENABLE_GAPPIXELS_IN_RECEIVER, /** < sets gap pixels in the receiver */
F_RESTREAM_STOP_FROM_RECEIVER, /** < restream stop from receiver */
F_ADDITIONAL_JSON_HEADER, /** < additional json header */
F_RECEIVER_UDP_SOCK_BUF_SIZE, /** < UDP socket buffer size */
F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE, /** < real UDP socket buffer size */
/* Always append functions hereafter!!! */

View File

@ -20,7 +20,8 @@ const string Listener::TypeName = "Listener";
Listener::Listener(int& ret, int ind, detectorType dtype, Fifo*& f, runStatus* s,
uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr) :
uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr,
uint32_t* us, uint32_t* as) :
ThreadObject(ind),
runningFlag(0),
generalData(0),
@ -44,7 +45,9 @@ Listener::Listener(int& ret, int ind, detectorType dtype, Fifo*& f, runStatus* s
carryOverPacket(0),
listeningPacket(0),
udpSocketAlive(0),
silentMode(false)
silentMode(false),
udpSocketBufferSize(us),
actualUDPSocketBufferSize(as)
{
ret = FAIL;
if(ThreadObject::CreateThread() == OK)
@ -182,15 +185,17 @@ int Listener::CreateUDPSockets() {
//if eth is mistaken with ip address
if (strchr(eth,'.') != NULL){
strncpy(eth,"", MAX_STR_LENGTH);
memset(eth, 0, MAX_STR_LENGTH);
}
if(!strlen(eth)){
FILE_LOG(logWARNING) << "eth is empty. Listening to all";
}
ShutDownUDPSocket();
udpSocket = new genericSocket(*udpPortNumber, genericSocket::UDP,
generalData->packetSize, (strlen(eth)?eth:NULL), generalData->headerPacketSize);
udpSocket = new genericSocket(*udpPortNumber, genericSocket::UDP,
generalData->packetSize, (strlen(eth)?eth:NULL), generalData->headerPacketSize,
*udpSocketBufferSize);
int iret = udpSocket->getErrorStatus();
if(!iret){
FILE_LOG(logINFO) << index << ": UDP port opened at port " << *udpPortNumber;
@ -200,6 +205,10 @@ int Listener::CreateUDPSockets() {
}
udpSocketAlive = true;
sem_init(&semaphore_socket,1,0);
// doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = udpSocket->getActualUDPSocketBufferSize();
return OK;
}
@ -225,6 +234,54 @@ void Listener::SetSilentMode(bool mode) {
}
int Listener::CreateDummySocketForUDPSocketBufferSize(uint32_t s) {
uint32_t temp = *udpSocketBufferSize;
*udpSocketBufferSize = s;
if (!(*activated))
return OK;
//if eth is mistaken with ip address
if (strchr(eth,'.') != NULL){
memset(eth, 0, MAX_STR_LENGTH);
}
// shutdown if any open
if(udpSocket){
udpSocket->ShutDownSocket();
delete udpSocket;
}
//create dummy socket
udpSocket = new genericSocket(*udpPortNumber, genericSocket::UDP,
generalData->packetSize, (strlen(eth)?eth:NULL), generalData->headerPacketSize,
*udpSocketBufferSize);
int iret = udpSocket->getErrorStatus();
if (iret){
FILE_LOG(logERROR) << "Could not create a test UDP socket on port " << *udpPortNumber << " error: " << iret;
return FAIL;
}
// doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = udpSocket->getActualUDPSocketBufferSize();
if (*actualUDPSocketBufferSize != (s*2))
*udpSocketBufferSize = temp;
// shutdown socket
if(udpSocket){
udpSocketAlive = false;
udpSocket->ShutDownSocket();
delete udpSocket;
udpSocket = 0;
}
return OK;
}
void Listener::ThreadExecution() {
char* buffer;
int rc = 0;

View File

@ -65,6 +65,8 @@ void UDPBaseImplementation::initializeMembers(){
for(int i=0;i<MAX_NUMBER_OF_LISTENING_THREADS;i++){
udpPortNum[i] = DEFAULT_UDP_PORTNO + i;
}
udpSocketBufferSize = 0;
actualUDPSocketBufferSize = 0;
//***file parameters***
fileFormatType = BINARY;
@ -99,9 +101,13 @@ UDPBaseImplementation::~UDPBaseImplementation(){}
*************************************************************************/
/**initial parameters***/
int* UDPBaseImplementation::getMultiDetectorSize() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return (int*) numDet;}
int* UDPBaseImplementation::getMultiDetectorSize() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return (int*) numDet;}
int UDPBaseImplementation::getDetectorPositionId() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return detID;}
int UDPBaseImplementation::getDetectorPositionId() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return detID;}
char *UDPBaseImplementation::getDetectorHostname() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
@ -128,7 +134,9 @@ bool UDPBaseImplementation::getGapPixelsEnable() const {
}
/***file parameters***/
slsReceiverDefs::fileFormat UDPBaseImplementation::getFileFormat() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return fileFormatType;}
slsReceiverDefs::fileFormat UDPBaseImplementation::getFileFormat() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return fileFormatType;}
char *UDPBaseImplementation::getFileName() const{
@ -157,28 +165,48 @@ char *UDPBaseImplementation::getFilePath() const{
return output;
}
uint64_t UDPBaseImplementation::getFileIndex() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return fileIndex;}
uint64_t UDPBaseImplementation::getFileIndex() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return fileIndex;}
int UDPBaseImplementation::getScanTag() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return scanTag;}
int UDPBaseImplementation::getScanTag() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return scanTag;}
bool UDPBaseImplementation::getFileWriteEnable() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return fileWriteEnable;}
bool UDPBaseImplementation::getFileWriteEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return fileWriteEnable;}
bool UDPBaseImplementation::getOverwriteEnable() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return overwriteEnable;}
bool UDPBaseImplementation::getOverwriteEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return overwriteEnable;}
bool UDPBaseImplementation::getDataCompressionEnable() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return dataCompressionEnable;}
bool UDPBaseImplementation::getDataCompressionEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return dataCompressionEnable;}
/***acquisition count parameters***/
uint64_t UDPBaseImplementation::getTotalFramesCaught() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return 0;}
uint64_t UDPBaseImplementation::getTotalFramesCaught() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return 0;}
uint64_t UDPBaseImplementation::getFramesCaught() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return 0;}
uint64_t UDPBaseImplementation::getFramesCaught() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return 0;}
int64_t UDPBaseImplementation::getAcquisitionIndex() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return -1;}
int64_t UDPBaseImplementation::getAcquisitionIndex() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return -1;}
/***connection parameters***/
uint32_t UDPBaseImplementation::getUDPPortNumber() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return udpPortNum[0];}
uint32_t UDPBaseImplementation::getUDPPortNumber() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return udpPortNum[0];}
uint32_t UDPBaseImplementation::getUDPPortNumber2() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return udpPortNum[1];}
uint32_t UDPBaseImplementation::getUDPPortNumber2() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return udpPortNum[1];}
char *UDPBaseImplementation::getEthernetInterface() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
@ -191,38 +219,70 @@ char *UDPBaseImplementation::getEthernetInterface() const{
/***acquisition parameters***/
int UDPBaseImplementation::getShortFrameEnable() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return shortFrameEnable;}
int UDPBaseImplementation::getShortFrameEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return shortFrameEnable;}
uint32_t UDPBaseImplementation::getFrameToGuiFrequency() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return frameToGuiFrequency;}
uint32_t UDPBaseImplementation::getFrameToGuiFrequency() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return frameToGuiFrequency;}
uint32_t UDPBaseImplementation::getFrameToGuiTimer() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return frameToGuiTimerinMS;}
uint32_t UDPBaseImplementation::getFrameToGuiTimer() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return frameToGuiTimerinMS;}
bool UDPBaseImplementation::getDataStreamEnable() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return dataStreamEnable;}
bool UDPBaseImplementation::getDataStreamEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return dataStreamEnable;}
uint64_t UDPBaseImplementation::getAcquisitionPeriod() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return acquisitionPeriod;}
uint64_t UDPBaseImplementation::getAcquisitionPeriod() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return acquisitionPeriod;}
uint64_t UDPBaseImplementation::getAcquisitionTime() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return acquisitionTime;}
uint64_t UDPBaseImplementation::getAcquisitionTime() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return acquisitionTime;}
uint64_t UDPBaseImplementation::getSubExpTime() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return subExpTime;}
uint64_t UDPBaseImplementation::getSubExpTime() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return subExpTime;}
uint64_t UDPBaseImplementation::getNumberOfFrames() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return numberOfFrames;}
uint64_t UDPBaseImplementation::getNumberOfFrames() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return numberOfFrames;}
uint64_t UDPBaseImplementation::getNumberofSamples() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return numberOfSamples;}
uint64_t UDPBaseImplementation::getNumberofSamples() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return numberOfSamples;}
uint32_t UDPBaseImplementation::getDynamicRange() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return dynamicRange;}
uint32_t UDPBaseImplementation::getDynamicRange() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return dynamicRange;}
bool UDPBaseImplementation::getTenGigaEnable() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return tengigaEnable;}
bool UDPBaseImplementation::getTenGigaEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return tengigaEnable;}
uint32_t UDPBaseImplementation::getFifoDepth() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return fifoDepth;}
uint32_t UDPBaseImplementation::getFifoDepth() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return fifoDepth;}
/***receiver status***/
slsReceiverDefs::runStatus UDPBaseImplementation::getStatus() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return status;}
slsReceiverDefs::runStatus UDPBaseImplementation::getStatus() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return status;}
uint32_t UDPBaseImplementation::getSilentMode() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return silentMode;}
uint32_t UDPBaseImplementation::getSilentMode() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return silentMode;}
int UDPBaseImplementation::getActivate() const{FILE_LOG(logDEBUG) << __AT__ << " starting"; return activated;}
int UDPBaseImplementation::getActivate() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return activated;}
uint32_t UDPBaseImplementation::getStreamingPort() const{FILE_LOG(logDEBUG) << __AT__ << " starting"; return streamingPort;}
uint32_t UDPBaseImplementation::getStreamingPort() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return streamingPort;}
char *UDPBaseImplementation::getStreamingSourceIP() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
@ -243,6 +303,16 @@ char *UDPBaseImplementation::getAdditionalJsonHeader() const{
return output;
}
uint32_t UDPBaseImplementation::getUDPSocketBufferSize() const {
FILE_LOG(logDEBUG) << __AT__ << " starting";
return udpSocketBufferSize;
}
uint32_t UDPBaseImplementation::getActualUDPSocketBufferSize() const {
FILE_LOG(logDEBUG) << __AT__ << " starting";
return actualUDPSocketBufferSize;
}
/*************************************************************************
* Setters ***************************************************************
* They modify the local cache of configuration or detector parameters ***
@ -614,6 +684,14 @@ void UDPBaseImplementation::setAdditionalJsonHeader(const char c[]){
FILE_LOG(logINFO) << "Additional JSON Header: " << additionalJsonHeader;
}
int UDPBaseImplementation::setUDPSocketBufferSize(const uint32_t s) {
FILE_LOG(logDEBUG) << __AT__ << " starting";
udpSocketBufferSize = s;
return OK;
}
int UDPBaseImplementation::restreamStop() {
FILE_LOG(logERROR) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";

View File

@ -360,6 +360,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
}
numThreads = generalData->threadsPerReceiver;
fifoDepth = generalData->defaultFifoDepth;
udpSocketBufferSize = generalData->defaultUdpSocketBufferSize;
//local network parameters
SetLocalNetworkParameters();
@ -375,11 +376,14 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
for ( int i = 0; i < numThreads; ++i ) {
int ret = FAIL;
Listener* l = new Listener(ret, i, myDetectorType, fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange);
Listener* l = new Listener(ret, i, myDetectorType, fifo[i], &status,
&udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange,
&udpSocketBufferSize, &actualUDPSocketBufferSize);
DataProcessor* p = NULL;
if (ret == OK)
p = new DataProcessor(ret, i, fifo[i], &fileFormatType,
fileWriteEnable, &dataStreamEnable, &gapPixelsEnable, &dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS,
fileWriteEnable, &dataStreamEnable, &gapPixelsEnable,
&dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS,
rawDataReadyCallBack, rawDataModifyReadyCallBack, pRawDataReady);
// error in creating threads
@ -406,6 +410,9 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
SetThreadPriorities();
// check udp socket buffer size
setUDPSocketBufferSize(udpSocketBufferSize);
FILE_LOG(logDEBUG) << " Detector type set to " << getDetectorType(d);
return OK;
}
@ -629,6 +636,10 @@ void UDPStandardImplementation::closeFiles() {
dataProcessor[0]->EndofAcquisition(maxIndexCaught);
}
int UDPStandardImplementation::setUDPSocketBufferSize(const uint32_t s) {
if (listener.size())
return listener[0]->CreateDummySocketForUDPSocketBufferSize(s);
}
int UDPStandardImplementation::restreamStop() {
bool ret = OK;
@ -666,8 +677,8 @@ void UDPStandardImplementation::SetLocalNetworkParameters() {
MAX_SOCKET_INPUT_PACKET_QUEUE);
} else {
const char *msg = "Could not change max length of"
"input packet queue (net.core.netdev_max_backlog): no root privileges?";
cprintf(RED, "WARNING: %s\n", msg);
"input packet queue (net.core.netdev_max_backlog). No Root Privileges?";
FILE_LOG(logWARNING) << msg;
}
}
}
@ -678,7 +689,7 @@ void UDPStandardImplementation::SetThreadPriorities() {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it){
if ((*it)->SetThreadPriority(LISTENER_PRIORITY) == FAIL) {
FILE_LOG(logWARNING) << "No root privileges to prioritize listener threads";
FILE_LOG(logWARNING) << "Could not prioritize listener threads. No Root Privileges?";
return;
}
}

View File

@ -294,6 +294,9 @@ const char* slsReceiverTCPIPInterface::getFunctionName(enum recFuncs func) {
case F_ENABLE_GAPPIXELS_IN_RECEIVER:return "F_ENABLE_GAPPIXELS_IN_RECEIVER";
case F_RESTREAM_STOP_FROM_RECEIVER: return "F_RESTREAM_STOP_FROM_RECEIVER";
case F_ADDITIONAL_JSON_HEADER: return "F_ADDITIONAL_JSON_HEADER";
case F_RECEIVER_UDP_SOCK_BUF_SIZE: return "F_RECEIVER_UDP_SOCK_BUF_SIZE";
case F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE: return "F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE";
default: return "Unknown Function";
}
}
@ -343,6 +346,8 @@ int slsReceiverTCPIPInterface::function_table(){
flist[F_ENABLE_GAPPIXELS_IN_RECEIVER] = &slsReceiverTCPIPInterface::enable_gap_pixels;
flist[F_RESTREAM_STOP_FROM_RECEIVER] = &slsReceiverTCPIPInterface::restream_stop;
flist[F_ADDITIONAL_JSON_HEADER] = &slsReceiverTCPIPInterface::set_additional_json_header;
flist[F_RECEIVER_UDP_SOCK_BUF_SIZE] = &slsReceiverTCPIPInterface::set_udp_socket_buffer_size;
flist[F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE]= &slsReceiverTCPIPInterface::get_real_udp_socket_buffer_size;
#ifdef VERYVERBOSE
for (int i = 0; i < NUM_REC_FUNCTIONS ; i++) {
@ -2610,3 +2615,86 @@ int slsReceiverTCPIPInterface::set_additional_json_header() {
// return ok/fail
return ret;
}
int slsReceiverTCPIPInterface::set_udp_socket_buffer_size() {
ret = OK;
memset(mess, 0, sizeof(mess));
int index = -1;
int retval = -1;
// receive arguments
if (mySock->ReceiveDataOnly(&index,sizeof(index)) < 0 )
return printSocketReadError();
// execute action
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
if (receiverBase == NULL)
invalidReceiverObject();
else {
// set
if(index >= 0) {
if (mySock->differentClients && lockStatus)
receiverlocked();
else if (receiverBase->getStatus() != IDLE)
receiverNotIdle();
else {
if (receiverBase->setUDPSocketBufferSize(index) == FAIL) {
ret = FAIL;
strcpy(mess, "Could not create dummy UDP Socket to test buffer size\n");
FILE_LOG(logERROR) << mess;
}
}
}
//get
retval=receiverBase->getUDPSocketBufferSize();
if(index >= 0 && retval != index) {
ret = FAIL;
strcpy(mess, "Could not set UDP Socket buffer size\n");
FILE_LOG(logERROR) << mess;
}
}
#endif
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG1) << "UDP Socket Buffer Size:" << retval;
#endif
if (ret == OK && mySock->differentClients)
ret = FORCE_UPDATE;
// send answer
mySock->SendDataOnly(&ret,sizeof(ret));
if (ret == FAIL)
mySock->SendDataOnly(mess,sizeof(mess));
mySock->SendDataOnly(&retval,sizeof(retval));
// return ok/fail
return ret;
}
int slsReceiverTCPIPInterface::get_real_udp_socket_buffer_size(){
ret = OK;
int retval = -1;
// execute action
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
if (receiverBase == NULL)
invalidReceiverObject();
else retval = receiverBase->getActualUDPSocketBufferSize();
#endif
if (ret == OK && mySock->differentClients)
ret = FORCE_UPDATE;
// send answer
mySock->SendDataOnly(&ret,sizeof(ret));
mySock->SendDataOnly(&retval,sizeof(retval));
// return ok/fail
return ret;
}