works zmq separated for client and rx, ports and ip included in expert advanced mode

This commit is contained in:
Dhanya Maliakal 2017-11-17 17:39:04 +01:00
commit c5c23e73d5
4 changed files with 95 additions and 41 deletions

View File

@ -5784,7 +5784,7 @@ int multiSlsDetector::createReceivingDataSockets(const bool destroy){
sscanf(detectors[i/numSocketsPerDetector]->getClientStreamingPort().c_str(),"%d",&portnum);
portnum += (i%numSocketsPerDetector);
zmqSocket[i] = new ZmqSocket(detectors[i/numSocketsPerDetector]->getReceiver().c_str(), portnum);
zmqSocket[i] = new ZmqSocket(detectors[i/numSocketsPerDetector]->getClientStreamingIP().c_str(), portnum);
if (zmqSocket[i]->IsError()) {
cprintf(RED, "Error: Could not create Zmq socket on port %d\n", portnum);
createReceivingDataSockets(true);
@ -5903,8 +5903,6 @@ void multiSlsDetector::readFrameFromReceiver(){
if (jungfrau)
multiframegain = new char[multidatabytes]();
int nch;
bool runningList[numSockets];
int numRunning = 0;

View File

@ -559,8 +559,10 @@ int slsDetector::initializeDetectorSize(detectorType type) {
/** set detector ip address */
memset(thisDetector->detectorIP,0,MAX_STR_LENGTH);
strcpy(thisDetector->detectorIP,DEFAULT_DET_IP);
/** set zmq tcp src ip address */
memset(thisDetector->zmqsrcip,0,MAX_STR_LENGTH);
/** set zmq tcp src ip address in client */
memset(thisDetector->zmqip,0,MAX_STR_LENGTH);
/** set zmq tcp src ip address in receiver*/
memset(thisDetector->receiver_zmqip,0,MAX_STR_LENGTH);
/** sets onlineFlag to OFFLINE_FLAG */
@ -6028,15 +6030,13 @@ string slsDetector::setNetworkParameter(networkParameter index, string value) {
sscanf(value.c_str(),"%d",&i);
return setDetectorNetworkParameter(index, i);
case CLIENT_STREAMING_PORT:
setClientStreamingPort(value);
return getClientStreamingPort();
return setClientStreamingPort(value);
case RECEIVER_STREAMING_PORT:
setReceiverStreamingPort(value);
return getReceiverStreamingPort();
case RECEIVER_STREAMING_SRC_IP:
return setReceiverStreamingSourceIP(value);
return setReceiverStreamingPort(value);
case CLIENT_STREAMING_SRC_IP:
return setClientStreamingSourceIP(value);
return setClientStreamingIP(value);
case RECEIVER_STREAMING_SRC_IP:
return setReceiverStreamingIP(value);
default:
return (char*)("unknown network parameter");
}
@ -6071,10 +6071,10 @@ string slsDetector::getNetworkParameter(networkParameter index) {
return getClientStreamingPort();
case RECEIVER_STREAMING_PORT:
return getReceiverStreamingPort();
case RECEIVER_STREAMING_SRC_IP:
return getReceiverStreamingSourceIP();
case CLIENT_STREAMING_SRC_IP:
return getClientStreamingSourceIP();
case CLIENT_STREAMING_SRC_IP:
return getClientStreamingIP();
case RECEIVER_STREAMING_SRC_IP:
return getReceiverStreamingIP();
default:
return (char*)("unknown network parameter");
}
@ -6174,7 +6174,7 @@ string slsDetector::setReceiver(string receiverIP){
std::cout << "dynamic range:" << thisDetector->dynamicRange << endl << endl;
std::cout << "flippeddatax:" << thisDetector->flippedData[d] << endl;
std::cout << "10GbE:" << thisDetector->tenGigaEnable << endl << endl;
std::cout << "streaming source ip:" << thisDetector->zmqsrcip << endl;
std::cout << "rx streaming source ip:" << thisDetector->receiver_zmqip << endl;
std::cout << "enable gap pixels:" << thisDetector->gappixels << endl;
std::cout << "rx streaming port:" << thisDetector->receiver_zmqport << endl;
@ -6220,6 +6220,7 @@ string slsDetector::setReceiver(string receiverIP){
// data streaming
setReceiverStreamingPort(getReceiverStreamingPort());
setReceiverStreamingIP(getReceiverStreamingIP());
enableDataStreamingFromReceiver(enableDataStreamingFromReceiver(-1));
}
}
@ -6377,16 +6378,61 @@ string slsDetector::setReceiverStreamingPort(string port) {
return getReceiverStreamingPort();
}
string slsDetector::setReceiverStreamingSourceIP(string sourceIP) {
string slsDetector::setClientStreamingIP(string sourceIP) {
struct addrinfo *result;
// on failure to convert to a valid ip
if (dataSocket->ConvertHostnameToInternetAddress(sourceIP.c_str(), &result)) {
std::cout << "Warning: Could not convert zmqip into a valid IP" << sourceIP << std::endl;
setErrorMask((getErrorMask())|(COULDNOT_SET_NETWORK_PARAMETER));
return getClientStreamingIP();
}
// on success put IP as string into arg
else {
memset(thisDetector->zmqip, 0, MAX_STR_LENGTH);
dataSocket->ConvertInternetAddresstoIpString(result, thisDetector->zmqip, MAX_STR_LENGTH);
}
return getClientStreamingIP();
}
string slsDetector::setReceiverStreamingIP(string sourceIP) {
int fnum=F_RECEIVER_STREAMING_SRC_IP;
int ret = FAIL;
char arg[MAX_STR_LENGTH];
memset(arg,0, sizeof(arg));
strcpy(arg,sourceIP.c_str());
memset(arg,0,sizeof(arg));
char retval[MAX_STR_LENGTH];
memset(retval,0, sizeof(retval));
// if empty, give rx_hostname
if (sourceIP.empty()) {
if(!strcmp(thisDetector->receiver_hostname,"none")) {
std::cout << "Receiver hostname not set yet. Cannot create rx_zmqip from none\n" << endl;
setErrorMask((getErrorMask())|(COULDNOT_SET_NETWORK_PARAMETER));
return getReceiverStreamingIP();
}
sourceIP.assign(thisDetector->receiver_hostname);
}
// verify the ip
{
struct addrinfo *result;
// on failure to convert to a valid ip
if (dataSocket->ConvertHostnameToInternetAddress(sourceIP.c_str(), &result)) {
std::cout << "Warning: Could not convert rx_zmqip into a valid IP" << sourceIP << std::endl;
setErrorMask((getErrorMask())|(COULDNOT_SET_NETWORK_PARAMETER));
return getReceiverStreamingIP();
}
// on success put IP as string into arg
else {
memset(thisDetector->zmqip, 0, MAX_STR_LENGTH);
dataSocket->ConvertInternetAddresstoIpString(result, arg, MAX_STR_LENGTH);
}
}
if(thisDetector->receiverOnlineFlag==ONLINE_FLAG){
#ifdef VERBOSE
std::cout << "Sending receiver streaming source ip to receiver " << arg << std::endl;
@ -6396,16 +6442,21 @@ string slsDetector::setReceiverStreamingSourceIP(string sourceIP) {
disconnectData();
}
if(ret!=FAIL) {
memset(thisDetector->zmqsrcip, 0, MAX_STR_LENGTH);
strcpy(thisDetector->zmqsrcip, retval);
memset(thisDetector->receiver_zmqip, 0, MAX_STR_LENGTH);
strcpy(thisDetector->receiver_zmqip, retval);
// if zmqip is empty, update it
if (! strlen(thisDetector->zmqip))
strcpy(thisDetector->zmqip, retval);
}
if(ret==FORCE_UPDATE)
updateReceiver();
}
return getReceiverStreamingSourceIP();
return getReceiverStreamingIP();
}
string slsDetector::setDetectorNetworkParameter(networkParameter index, int delay){
int fnum = F_SET_NETWORK_PARAMETER;
int ret = FAIL;
@ -8433,7 +8484,7 @@ int slsDetector::updateReceiverNoWait() {
// streaming source ip
n += dataSocket->ReceiveDataOnly(path,MAX_STR_LENGTH);
strcpy(thisDetector->zmqsrcip, path);
strcpy(thisDetector->receiver_zmqip, path);
// gap pixels
n += dataSocket->ReceiveDataOnly(&ind,sizeof(ind));

View File

@ -275,9 +275,10 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
int receiver_zmqport;
/** data streaming (up stream) enable in receiver */
bool receiver_datastream;
/** zmq tcp src ip address between receiver and gui (only data) **/
char zmqsrcip[MAX_STR_LENGTH];
/** zmq tcp src ip address in client (only data) **/
char zmqip[MAX_STR_LENGTH];
/** zmq tcp src ip address in receiver (only data) **/
char receiver_zmqip[MAX_STR_LENGTH];
/** gap pixels enable */
int gappixels;
/** gap pixels in each direction */
@ -1765,8 +1766,10 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
string getClientStreamingPort() {ostringstream ss; ss << thisDetector->zmqport; string s = ss.str(); return s;};
/** returns the receiver zmq port \sa sharedSlsDetector */
string getReceiverStreamingPort() {ostringstream ss; ss << thisDetector->receiver_zmqport; string s = ss.str(); return s;};
/** gets the zmq source ip in client and receiver, returns "none" if default setting and no custom ip set*/
string getReceiverStreamingSourceIP(){return string(thisDetector->zmqsrcip);};
/** gets the zmq source ip in client, returns "none" if default setting and no custom ip set*/
string getClientStreamingIP(){return string(thisDetector->zmqip);};
/** gets the zmq source ip in receiver, returns "none" if default setting and no custom ip set*/
string getReceiverStreamingIP(){return string(thisDetector->receiver_zmqip);};
/** validates the format of detector MAC address and sets it \sa sharedSlsDetector */
string setDetectorMAC(string detectorMAC);
@ -1786,8 +1789,10 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
string setClientStreamingPort(string port);
/** sets the zmq port in receiver (includes "multi" at the end if it should calculate individual ports \sa sharedSlsDetector */
string setReceiverStreamingPort(string port);
/** sets the zmq source ip in client and receiver */
string setReceiverStreamingSourceIP(string sourceIP);
/** sets the zmq source ip in client */
string setClientStreamingIP(string sourceIP);
/** sets the zmq source ip in receiver. if empty, uses rx_hostname*/
string setReceiverStreamingIP(string sourceIP);
/** sets the transmission delay for left or right port or for an entire frame*/
string setDetectorNetworkParameter(networkParameter index, int delay);

View File

@ -1916,16 +1916,16 @@ slsDetectorCommand::slsDetectorCommand(slsDetectorUtils *det) {
++i;
/*! \page network
- <b>zmqsrcip [ip]</b> sets/gets the 0MQ (TCP) ip of the client to where final data is streamed to (eg. for GUI). Default is ip of rx_hostname and works for GUI. This command to change from default can be used from command line when sockets are not already open as the command line is not aware/create the 0mq sockets in the client side. This is usually used to stream in from an external process. \c Returns \c (string)
- <b>zmqip [ip]</b> sets/gets the 0MQ (TCP) ip of the client to where final data is streamed to (eg. for GUI). For Experts only! Default is ip of rx_hostname and works for GUI. This command to change from default can be used from command line when sockets are not already open as the command line is not aware/create the 0mq sockets in the client side. This is usually used to stream in from an external process. . If no custom ip, empty until first time connect to receiver. \c Returns \c (string)
*/
descrToFuncMap[i].m_pFuncName="zmqsrcip"; //
descrToFuncMap[i].m_pFuncName="zmqip"; //
descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdNetworkParameter;
i++;
/*! \page network
- <b>rx_zmqsrcip [ip]</b> sets/gets the 0MQ (TCP) ip of the receiver from where data is streamed from (eg. to GUI or another process for further processing). Default is ip of rx_hostname and works for GUI. This is usually used to stream out to an external process for further processing. \c Returns \c (string)
- <b>rx_zmqip [ip]</b> sets/gets the 0MQ (TCP) ip of the receiver from where data is streamed from (eg. to GUI or another process for further processing). For Experts only! Default is ip of rx_hostname and works for GUI. This is usually used to stream out to an external process for further processing. . If no custom ip, empty until first time connect to receiver. \c Returns \c (string)
*/
descrToFuncMap[i].m_pFuncName="rx_zmqsrcip"; //
descrToFuncMap[i].m_pFuncName="rx_zmqip"; //
descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdNetworkParameter;
i++;
@ -3969,9 +3969,9 @@ string slsDetectorCommand::cmdNetworkParameter(int narg, char *args[], int actio
if (!(sscanf(args[1],"%d",&i)))
return ("cannot parse argument") + string(args[1]);
}
}else if (cmd=="zmqsrcip") {
}else if (cmd=="zmqip") {
t=CLIENT_STREAMING_SRC_IP;
}else if (cmd=="rx_zmqsrcip") {
}else if (cmd=="rx_zmqip") {
t=RECEIVER_STREAMING_SRC_IP;
}
else return ("unknown network parameter")+cmd;
@ -4004,10 +4004,10 @@ string slsDetectorCommand::helpNetworkParameter(int narg, char *args[], int acti
"Use single-detector command to set individually or multi-detector command to calculate based on port for the rest."<< std::endl;
os << "rx_zmqport port \n sets the 0MQ (TCP) port of the receiver from where data is streamed from (eg. to GUI or another process for further processing). "
"Use single-detector command to set individually or multi-detector command to calculate based on port for the rest."<< std::endl;
os << "zmqsrcip ip \n sets the 0MQ (TCP) ip of the client to where final data is streamed to (eg. for GUI). Default is ip of rx_hostname and works for GUI. "
os << "zmqip ip \n sets the 0MQ (TCP) ip of the client to where final data is streamed to (eg. for GUI). Default is ip of rx_hostname and works for GUI. "
"This command to change from default can be used from command line when sockets are not already open as the command line is not aware/create the 0mq sockets in the client side. "
"This is usually used to stream in from an external process." << std::endl;
os << "rx_zmqsrcip ip \n sets/gets the 0MQ (TCP) ip of the receiver from where data is streamed from (eg. to GUI or another process for further processing). "
os << "rx_zmqip ip \n sets/gets the 0MQ (TCP) ip of the receiver from where data is streamed from (eg. to GUI or another process for further processing). "
"Default is ip of rx_hostname and works for GUI. This is usually used to stream out to an external process for further processing." << std::endl;
}
if (action==GET_ACTION || action==HELP_ACTION) {
@ -4023,8 +4023,8 @@ string slsDetectorCommand::helpNetworkParameter(int narg, char *args[], int acti
os << "flowcontrol_10g \n gets flow control for 10g for eiger"<< std::endl;
os << "zmqport \n gets the 0MQ (TCP) port of the client to where final data is streamed to"<< std::endl;
os << "rx_zmqport \n gets the 0MQ (TCP) port of the receiver from where data is streamed from"<< std::endl;
os << "zmqsrcip \n gets the 0MQ (TCP) ip of the client to where final data is streamed to, none if default setting and no custom ip" << std::endl;
os << "rx_zmqsrcip \n gets/gets the 0MQ (TCP) ip of the receiver from where data is streamed from, none if default setting and no custom ip" << std::endl;
os << "zmqip \n gets the 0MQ (TCP) ip of the client to where final data is streamed to.If no custom ip, empty until first time connect to receiver" << std::endl;
os << "rx_zmqip \n gets/gets the 0MQ (TCP) ip of the receiver from where data is streamed from. If no custom ip, empty until first time connect to receiver" << std::endl;
}
return os.str();