merge conflict from 3.0.1

This commit is contained in:
Dhanya Maliakal 2017-11-17 11:35:02 +01:00
commit 51499fa503
11 changed files with 271 additions and 175 deletions

View File

@ -242,7 +242,7 @@ public:
retval.append("Could not activate/deactivate receiver\n"); retval.append("Could not activate/deactivate receiver\n");
if(slsErrorMask&DATA_STREAMING) if(slsErrorMask&DATA_STREAMING)
retval.append("Could not set/reset Data Streaming\n"); retval.append("Could not enable/disable Data Streaming\n");
if(slsErrorMask&RESET_ERROR) if(slsErrorMask&RESET_ERROR)
retval.append("Could not reset the FPGA\n"); retval.append("Could not reset the FPGA\n");

View File

@ -194,6 +194,7 @@ enum networkParameter {
FLOW_CONTROL_WR_PTR, /**< memory write pointer for flow control */ FLOW_CONTROL_WR_PTR, /**< memory write pointer for flow control */
FLOW_CONTROL_RD_PTR, /**< memory read pointer for flow control */ FLOW_CONTROL_RD_PTR, /**< memory read pointer for flow control */
RECEIVER_STREAMING_PORT, /**< receiever streaming TCP(ZMQ) port */ RECEIVER_STREAMING_PORT, /**< receiever streaming TCP(ZMQ) port */
CLIENT_STREAMING_PORT, /**< client streaming TCP(ZMQ) port */
RECEIVER_STREAMING_SRC_IP /**< receiever streaming TCP(ZMQ) ip */ RECEIVER_STREAMING_SRC_IP /**< receiever streaming TCP(ZMQ) ip */
}; };

View File

@ -216,6 +216,7 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1)
thisMultiDetector->receiver_read_freq = 0; thisMultiDetector->receiver_read_freq = 0;
thisMultiDetector->acquiringFlag = false; thisMultiDetector->acquiringFlag = false;
thisMultiDetector->externalgui = false; thisMultiDetector->externalgui = false;
thisMultiDetector->receiver_datastream = false;
thisMultiDetector->alreadyExisting=1; thisMultiDetector->alreadyExisting=1;
} }
@ -281,7 +282,7 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1)
getNMods(); getNMods();
getMaxMods(); getMaxMods();
dataSocketsStarted = false; client_datastream = false;
for(int i=0;i<MAXDET;++i) for(int i=0;i<MAXDET;++i)
zmqSocket[i] = 0; zmqSocket[i] = 0;
threadpool = 0; threadpool = 0;
@ -3728,10 +3729,21 @@ string multiSlsDetector::setNetworkParameter(networkParameter p, string s){
// disable data streaming before changing zmq port (but only if they were on) // disable data streaming before changing zmq port (but only if they were on)
int prev_streaming = 0; int prev_streaming = 0;
if (p == RECEIVER_STREAMING_PORT || p == RECEIVER_STREAMING_SRC_IP) { switch (p) {
prev_streaming = getStreamingSocketsCreatedInClient(); case RECEIVER_STREAMING_PORT:
enableDataStreamingFromReceiver(0); prev_streaming = enableDataStreamingFromReceiver();
} enableDataStreamingFromReceiver(0);
break;
case CLIENT_STREAMING_PORT:
prev_streaming = enableDataStreamingToClient();
enableDataStreamingToClient(0);
break;
case RECEIVER_STREAMING_SRC_IP:
prev_streaming = enableDataStreamingFromReceiver();
enableDataStreamingFromReceiver(0);
default: break;
}
if (s.find('+')==string::npos) { if (s.find('+')==string::npos) {
@ -3742,7 +3754,7 @@ string multiSlsDetector::setNetworkParameter(networkParameter p, string s){
string* sret[thisMultiDetector->numberOfDetectors]; string* sret[thisMultiDetector->numberOfDetectors];
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; ++idet){ for(int idet=0; idet<thisMultiDetector->numberOfDetectors; ++idet){
if(detectors[idet]){ if(detectors[idet]){
if (p == RECEIVER_STREAMING_PORT) if (p == RECEIVER_STREAMING_PORT || p == CLIENT_STREAMING_PORT)
s.append("multi\0"); s.append("multi\0");
sret[idet]=new string("error"); sret[idet]=new string("error");
Task* task = new Task(new func2_t<string,networkParameter,string>(&slsDetector::setNetworkParameter, Task* task = new Task(new func2_t<string,networkParameter,string>(&slsDetector::setNetworkParameter,
@ -3786,8 +3798,20 @@ string multiSlsDetector::setNetworkParameter(networkParameter p, string s){
} }
//enable data streaming if it was on //enable data streaming if it was on
if ((p == RECEIVER_STREAMING_PORT || p == RECEIVER_STREAMING_SRC_IP) && prev_streaming) if (prev_streaming) {
enableDataStreamingFromReceiver(1); switch (p) {
case RECEIVER_STREAMING_PORT:
enableDataStreamingFromReceiver(1);
break;
case CLIENT_STREAMING_PORT:
enableDataStreamingToClient(1);
break;
case RECEIVER_STREAMING_SRC_IP:
enableDataStreamingFromReceiver(1);
break;
default: break;
}
}
return getNetworkParameter(p); return getNetworkParameter(p);
@ -5740,7 +5764,7 @@ int multiSlsDetector::createReceivingDataSockets(const bool destroy){
zmqSocket[i] = 0; zmqSocket[i] = 0;
} }
} }
dataSocketsStarted = false; client_datastream = false;
cout << "Destroyed Receiving Data Socket(s)" << endl; cout << "Destroyed Receiving Data Socket(s)" << endl;
return OK; return OK;
} }
@ -5748,14 +5772,9 @@ int multiSlsDetector::createReceivingDataSockets(const bool destroy){
cprintf(MAGENTA,"Going to create data sockets\n"); cprintf(MAGENTA,"Going to create data sockets\n");
for(int i=0;i<numSockets; ++i){ for(int i=0;i<numSockets; ++i){
uint32_t portnum; uint32_t portnum = 0;
sscanf(detectors[i/numSocketsPerDetector]->getReceiverStreamingPort().c_str(),"%d",&portnum); sscanf(detectors[i/numSocketsPerDetector]->getClientStreamingPort().c_str(),"%d",&portnum);
if (portnum == 0) { portnum += (i%numSocketsPerDetector);
portnum = DEFAULT_ZMQ_PORTNO +
(i/numSocketsPerDetector)*numSocketsPerDetector + (i%numSocketsPerDetector); // *num and /num is not same cuz its integers
}else{
portnum += (i%numSocketsPerDetector);
}
zmqSocket[i] = new ZmqSocket(detectors[i/numSocketsPerDetector]->getReceiver().c_str(), portnum); zmqSocket[i] = new ZmqSocket(detectors[i/numSocketsPerDetector]->getReceiver().c_str(), portnum);
if (zmqSocket[i]->IsError()) { if (zmqSocket[i]->IsError()) {
@ -5766,7 +5785,7 @@ int multiSlsDetector::createReceivingDataSockets(const bool destroy){
printf("Zmq Client[%d] at %s\n",i, zmqSocket[i]->GetZmqServerAddress()); printf("Zmq Client[%d] at %s\n",i, zmqSocket[i]->GetZmqServerAddress());
} }
dataSocketsStarted = true; client_datastream = true;
cout << "Receiving Data Socket(s) created" << endl; cout << "Receiving Data Socket(s) created" << endl;
return OK; return OK;
} }
@ -6340,71 +6359,63 @@ int multiSlsDetector::setReceiverReadTimer(int time_in_ms){
return ret; return ret;
} }
int multiSlsDetector::getStreamingSocketsCreatedInClient() { int multiSlsDetector::enableDataStreamingToClient(int enable) {
return dataSocketsStarted; if(enable >= 0){
//destroy data threads
if (!enable)
createReceivingDataSockets(true);
//create data threads
else {
if(createReceivingDataSockets() == FAIL){
std::cout << "Could not create data threads in client." << std::endl;
//only for the first det as theres no general one
setErrorMask(getErrorMask()|(1<<0));
detectors[0]->setErrorMask((detectors[0]->getErrorMask())|(DATA_STREAMING));
}
}
}
return client_datastream;
} }
int multiSlsDetector::enableDataStreamingFromReceiver(int enable){//cannot parrallize due to serReceiver calling parentdet->enabledatastremain int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
if(enable >= 0){
//create client sockets only if no external gui int ret=-100;
if (!thisMultiDetector->externalgui) { if(!threadpool){
if(enable >= 0){ cout << "Error in creating threadpool. Exiting" << endl;
return -1;
//destroy data threads }else{
if(dataSocketsStarted) //return storage values
createReceivingDataSockets(true); int* iret[thisMultiDetector->numberOfDetectors];
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; ++idet){
//create data threads if(detectors[idet]){
if(enable > 0){ iret[idet]= new int(-1);
if(createReceivingDataSockets() == FAIL){ Task* task = new Task(new func1_t<int,int>(&slsDetector::enableDataStreamingFromReceiver,
std::cout << "Could not create data threads in client. Aborting creating data sockets in receiver" << std::endl; detectors[idet],enable,iret[idet]));
//only for the first det as theres no general one threadpool->add_task(task);
setErrorMask(getErrorMask()|(1<<0)); }
detectors[0]->setErrorMask((detectors[0]->getErrorMask())|(DATA_STREAMING)); }
return -1; threadpool->startExecuting();
threadpool->wait_for_tasks_to_complete();
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; ++idet){
if(detectors[idet]){
if(iret[idet] != NULL){
if (ret==-100)
ret=*iret[idet];
else if (ret!=*iret[idet])
ret=-1;
delete iret[idet];
}else ret=-1;
if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet));
} }
} }
} }
thisMultiDetector->receiver_datastream = ret;
} }
return thisMultiDetector->receiver_datastream;
int ret=-100;
if(!threadpool){
cout << "Error in creating threadpool. Exiting" << endl;
return -1;
}else{
//return storage values
int* iret[thisMultiDetector->numberOfDetectors];
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; ++idet){
if(detectors[idet]){
iret[idet]= new int(-1);
Task* task = new Task(new func1_t<int,int>(&slsDetector::enableDataStreamingFromReceiver,
detectors[idet],enable,iret[idet]));
threadpool->add_task(task);
}
}
threadpool->startExecuting();
threadpool->wait_for_tasks_to_complete();
for(int idet=0; idet<thisMultiDetector->numberOfDetectors; ++idet){
if(detectors[idet]){
if(iret[idet] != NULL){
if (ret==-100)
ret=*iret[idet];
else if (ret!=*iret[idet])
ret=-1;
delete iret[idet];
}else ret=-1;
if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet));
}
}
}
if (!thisMultiDetector->externalgui) {
if (ret != dataSocketsStarted)
ret = -1;
}
return ret;
} }
int multiSlsDetector::enableReceiverCompression(int i){ int multiSlsDetector::enableReceiverCompression(int i){

View File

@ -219,6 +219,9 @@ class multiSlsDetector : public slsDetectorUtils {
/** receiver online flag - is set if the receiver is connected, unset if socket connection is not possible */ /** receiver online flag - is set if the receiver is connected, unset if socket connection is not possible */
int receiverOnlineFlag; int receiverOnlineFlag;
/** data streaming (up stream) enable in receiver */
bool receiver_datastream;
} sharedMultiSlsDetector; } sharedMultiSlsDetector;
@ -1367,17 +1370,19 @@ class multiSlsDetector : public slsDetectorUtils {
/** /**
* Get Streaming sockets created in client from reciever * Enable data streaming to client
/returns 1 if sockets created, else 0 * @param enable 0 to disable, 1 to enable, -1 to get the value
* @returns data streaming to client enable
*/ */
int getStreamingSocketsCreatedInClient(); int enableDataStreamingToClient(int enable=-1);
/** Enable or disable streaming data from receiver to client /** Enable or disable streaming data from receiver to client
* @param enable 0 to disable 1 to enable -1 to only get the value * @param enable 0 to disable 1 to enable -1 to only get the value
* @returns data streaming * @returns data streaming from receiver enable
*/ */
int enableDataStreamingFromReceiver(int enable=-1); int enableDataStreamingFromReceiver(int enable=-1);
/** updates the multidetector offsets */ /** updates the multidetector offsets */
void updateOffsets(); void updateOffsets();
@ -1523,8 +1528,8 @@ private:
/** Ensures if sockets created successfully */ /** data streaming (down stream) enabled in client (zmq sckets created) */
bool dataSocketsStarted; bool client_datastream;
/** ZMQ Socket - Receiver to Client */ /** ZMQ Socket - Receiver to Client */
ZmqSocket* zmqSocket[MAXDET]; ZmqSocket* zmqSocket[MAXDET];

View File

@ -788,6 +788,8 @@ int slsDetector::initializeDetectorSize(detectorType type) {
thisDetector->flippedData[0] = 0; thisDetector->flippedData[0] = 0;
thisDetector->flippedData[1] = 0; thisDetector->flippedData[1] = 0;
thisDetector->zmqport = 0; thisDetector->zmqport = 0;
thisDetector->receiver_zmqport = 0;
thisDetector->receiver_datastream = false;
for (int ia=0; ia<MAX_ACTIONS; ++ia) { for (int ia=0; ia<MAX_ACTIONS; ++ia) {
strcpy(thisDetector->actionScript[ia],"none"); strcpy(thisDetector->actionScript[ia],"none");
@ -932,6 +934,14 @@ int slsDetector::initializeDetectorSize(detectorType type) {
delete thisReceiver; delete thisReceiver;
thisReceiver = new receiverInterface(dataSocket); thisReceiver = new receiverInterface(dataSocket);
// zmq ports
if (posId != -1) {
if (thisDetector->zmqport == 0)
thisDetector->zmqport = DEFAULT_ZMQ_CL_PORTNO + (posId * ((thisDetector->myDetectorType == EIGER) ? 2 : 1));
if (thisDetector->receiver_zmqport == 0)
thisDetector->receiver_zmqport = DEFAULT_ZMQ_RX_PORTNO + (posId * ((thisDetector->myDetectorType == EIGER) ? 2 : 1));
}
// setAngularConversionPointer(thisDetector->angOff,&thisDetector->nMods, thisDetector->nChans*thisDetector->nChips); // setAngularConversionPointer(thisDetector->angOff,&thisDetector->nMods, thisDetector->nChans*thisDetector->nChips);
#ifdef VERBOSE #ifdef VERBOSE
@ -6017,6 +6027,9 @@ string slsDetector::setNetworkParameter(networkParameter index, string value) {
case FLOW_CONTROL_10G: case FLOW_CONTROL_10G:
sscanf(value.c_str(),"%d",&i); sscanf(value.c_str(),"%d",&i);
return setDetectorNetworkParameter(index, i); return setDetectorNetworkParameter(index, i);
case CLIENT_STREAMING_PORT:
setClientStreamingPort(value);
return getClientStreamingPort();
case RECEIVER_STREAMING_PORT: case RECEIVER_STREAMING_PORT:
setReceiverStreamingPort(value); setReceiverStreamingPort(value);
return getReceiverStreamingPort(); return getReceiverStreamingPort();
@ -6052,6 +6065,8 @@ string slsDetector::getNetworkParameter(networkParameter index) {
case DETECTOR_TXN_DELAY_FRAME: case DETECTOR_TXN_DELAY_FRAME:
case FLOW_CONTROL_10G: case FLOW_CONTROL_10G:
return setDetectorNetworkParameter(index, -1); return setDetectorNetworkParameter(index, -1);
case CLIENT_STREAMING_PORT:
return getClientStreamingPort();
case RECEIVER_STREAMING_PORT: case RECEIVER_STREAMING_PORT:
return getReceiverStreamingPort(); return getReceiverStreamingPort();
case RECEIVER_STREAMING_SRC_IP: case RECEIVER_STREAMING_SRC_IP:
@ -6155,9 +6170,10 @@ string slsDetector::setReceiver(string receiverIP){
std::cout << "dynamic range:" << thisDetector->dynamicRange << endl << endl; std::cout << "dynamic range:" << thisDetector->dynamicRange << endl << endl;
std::cout << "flippeddatax:" << thisDetector->flippedData[d] << endl; std::cout << "flippeddatax:" << thisDetector->flippedData[d] << endl;
std::cout << "10GbE:" << thisDetector->tenGigaEnable << endl << endl; std::cout << "10GbE:" << thisDetector->tenGigaEnable << endl << endl;
std::cout << "streaming port:" << thisDetector->zmqport << endl;
std::cout << "streaming source ip:" << thisDetector->zmqsrcip << endl; std::cout << "streaming source ip:" << thisDetector->zmqsrcip << endl;
std::cout << "enable gap pixels:" << thisDetector->gappixels << endl; std::cout << "enable gap pixels:" << thisDetector->gappixels << endl;
std::cout << "rx streaming port:" << thisDetector->receiver_zmqport << endl;
//std::cout << "dataStreaming:" << enableDataStreamingFromReceiver(-1) << endl << endl; //std::cout << "dataStreaming:" << enableDataStreamingFromReceiver(-1) << endl << endl;
/** enable compresison, */ /** enable compresison, */
#endif #endif
@ -6200,20 +6216,7 @@ string slsDetector::setReceiver(string receiverIP){
// data streaming // data streaming
setReceiverStreamingPort(getReceiverStreamingPort()); setReceiverStreamingPort(getReceiverStreamingPort());
setReceiverStreamingSourceIP(getReceiverStreamingSourceIP()); enableDataStreamingFromReceiver(enableDataStreamingFromReceiver(-1));
int clientSockets = parentDet->getStreamingSocketsCreatedInClient();
int recSockets = enableDataStreamingFromReceiver(-1);
if(clientSockets != recSockets) {
pthread_mutex_lock(&ms);
if(clientSockets)
printf("Enabling Data Streaming\n");
else
printf("Disabling Data Streaming\n");
// push client state to receiver
/*parentDet->enableDataStreamingFromReceiver(clientSockets);*/
enableDataStreamingFromReceiver(clientSockets);
pthread_mutex_unlock(&ms);
}
} }
} }
@ -6312,7 +6315,29 @@ int slsDetector::setReceiverUDPPort2(int udpport){
} }
int slsDetector::setReceiverStreamingPort(string port) { string slsDetector::setClientStreamingPort(string port) {
int defaultport = 0;
int numsockets = (thisDetector->myDetectorType == EIGER) ? 2:1;
int arg = 0;
//multi command, calculate individual ports
size_t found = port.find("multi");
if(found != string::npos) {
port.erase(found,5);
sscanf(port.c_str(),"%d",&defaultport);
arg = defaultport + (posId * numsockets);
}
else
sscanf(port.c_str(),"%d",&arg);
thisDetector->zmqport = arg;
return getClientStreamingPort();
}
string slsDetector::setReceiverStreamingPort(string port) {
int defaultport = 0; int defaultport = 0;
int numsockets = (thisDetector->myDetectorType == EIGER) ? 2:1; int numsockets = (thisDetector->myDetectorType == EIGER) ? 2:1;
int arg = 0; int arg = 0;
@ -6340,12 +6365,12 @@ int slsDetector::setReceiverStreamingPort(string port) {
disconnectData(); disconnectData();
} }
if(ret!=FAIL) if(ret!=FAIL)
thisDetector->zmqport = retval; thisDetector->receiver_zmqport = retval;
if(ret==FORCE_UPDATE) if(ret==FORCE_UPDATE)
updateReceiver(); updateReceiver();
} }
return thisDetector->zmqport; return getReceiverStreamingPort();
} }
string slsDetector::setReceiverStreamingSourceIP(string sourceIP) { string slsDetector::setReceiverStreamingSourceIP(string sourceIP) {
@ -8394,9 +8419,13 @@ int slsDetector::updateReceiverNoWait() {
parentDet->enableOverwriteMask(ind); parentDet->enableOverwriteMask(ind);
pthread_mutex_unlock(&ms); pthread_mutex_unlock(&ms);
// streaming port // receiver streaming port
n += dataSocket->ReceiveDataOnly(&ind,sizeof(ind)); n += dataSocket->ReceiveDataOnly(&ind,sizeof(ind));
thisDetector->zmqport = ind; thisDetector->receiver_zmqport = ind;
// receiver streaming enable
n += dataSocket->ReceiveDataOnly(&ind,sizeof(ind));
thisDetector->receiver_datastream = ind;
// streaming source ip // streaming source ip
n += dataSocket->ReceiveDataOnly(path,MAX_STR_LENGTH); n += dataSocket->ReceiveDataOnly(path,MAX_STR_LENGTH);

View File

@ -269,8 +269,12 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
bool acquiringFlag; bool acquiringFlag;
/** flipped data across x or y axis */ /** flipped data across x or y axis */
int flippedData[2]; int flippedData[2];
/** tcp port between receiver and gui (only data) */ /** tcp port from gui/different process to receiver (only data) */
int zmqport; int zmqport;
/** tcp port from receiver to gui/different process (only data) */
int receiver_zmqport;
/** data streaming (up stream) enable in receiver */
bool receiver_datastream;
/** zmq tcp src ip address between receiver and gui (only data) **/ /** zmq tcp src ip address between receiver and gui (only data) **/
char zmqsrcip[MAX_STR_LENGTH]; char zmqsrcip[MAX_STR_LENGTH];
@ -1670,7 +1674,7 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
/** gets the number of frames caught by any one receiver (to avoid using threadpool) /** gets the number of frames caught by any one receiver (to avoid using threadpool)
\returns number of frames caught by any one receiver (master receiver if exists) \returns number of frames caught by any one receiver (master receiver if exists)
*/ */
int getFramesCaughtByAnyReceiver() {getFramesCaughtByReceiver();}; int getFramesCaughtByAnyReceiver() {return getFramesCaughtByReceiver();};
/** gets the current frame index of receiver /** gets the current frame index of receiver
\returns current frame index of receiver \returns current frame index of receiver
@ -1757,8 +1761,10 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
string getReceiverUDPPort() {ostringstream ss; ss << thisDetector->receiverUDPPort; string s = ss.str(); return s;}; string getReceiverUDPPort() {ostringstream ss; ss << thisDetector->receiverUDPPort; string s = ss.str(); return s;};
/** returns the receiver UDP2 for Eiger IP address \sa sharedSlsDetector */ /** returns the receiver UDP2 for Eiger IP address \sa sharedSlsDetector */
string getReceiverUDPPort2() {ostringstream ss; ss << thisDetector->receiverUDPPort2; string s = ss.str(); return s;}; string getReceiverUDPPort2() {ostringstream ss; ss << thisDetector->receiverUDPPort2; string s = ss.str(); return s;};
/** returns the zmq port \sa sharedSlsDetector */ /** returns the client zmq port \sa sharedSlsDetector */
string getReceiverStreamingPort() {ostringstream ss; ss << thisDetector->zmqport; string s = ss.str(); return s;}; string getClientStreamingPort() {ostringstream ss; ss << thisDetector->zmqport; string s = ss.str(); return s;};
/** returns the receiver zmq port \sa sharedSlsDetector */
string getReceiverStreamingPort() {ostringstream ss; ss << thisDetector->receiver_zmqport; string s = ss.str(); return s;};
/** gets the zmq source ip in client and receiver, returns "none" if default setting and no custom ip set*/ /** gets the zmq source ip in client and receiver, returns "none" if default setting and no custom ip set*/
string getReceiverStreamingSourceIP(){return string(thisDetector->zmqsrcip);}; string getReceiverStreamingSourceIP(){return string(thisDetector->zmqsrcip);};
@ -1776,10 +1782,13 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
int setReceiverUDPPort(int udpport); int setReceiverUDPPort(int udpport);
/** sets the receiver udp port2 for Eiger \sa sharedSlsDetector */ /** sets the receiver udp port2 for Eiger \sa sharedSlsDetector */
int setReceiverUDPPort2(int udpport); int setReceiverUDPPort2(int udpport);
/** sets the zmq port in client and receiver (includes "multi" at the end if it should calculate individual ports \sa sharedSlsDetector */ /** sets the zmq port in client (includes "multi" at the end if it should calculate individual ports \sa sharedSlsDetector */
int setReceiverStreamingPort(string port); string setClientStreamingPort(string port);
/** sets the zmq port in receiver (includes "multi" at the end if it should calculate individual ports \sa sharedSlsDetector */
string setReceiverStreamingPort(string port);
/** sets the zmq source ip in client and receiver */ /** sets the zmq source ip in client and receiver */
string setReceiverStreamingSourceIP(string sourceIP); string setReceiverStreamingSourceIP(string sourceIP);
/** sets the transmission delay for left or right port or for an entire frame*/ /** sets the transmission delay for left or right port or for an entire frame*/
string setDetectorNetworkParameter(networkParameter index, int delay); string setDetectorNetworkParameter(networkParameter index, int delay);
@ -1802,8 +1811,8 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
int setReceiverReadTimer(int time_in_ms=500); int setReceiverReadTimer(int time_in_ms=500);
/** Enable or disable streaming data from receiver to client /** Enable or disable streaming data from receiver to client
* @param enable 0 to disable 1 to enable -1 to only get the value * @param enable 0 to disable 1 to enable -1 to only get the value
* @returns data streaming * @returns data streaming from receiver enable
*/ */
int enableDataStreamingFromReceiver(int enable=-1); int enableDataStreamingFromReceiver(int enable=-1);

View File

@ -425,7 +425,6 @@ class slsDetectorBase : public virtual slsDetectorDefs, public virtual errorDef
*/ */
int setHighVoltage(int val){return setDAC(val, HV_NEW, 0, -1);} \ int setHighVoltage(int val){return setDAC(val, HV_NEW, 0, -1);} \
/** /**
set dacs value set dacs value
\param val value \param val value

View File

@ -255,13 +255,6 @@ slsDetectorCommand::slsDetectorCommand(slsDetectorUtils *det) {
commands to configure detector data structure commands to configure detector data structure
*/ */
/*! \page config
- <b> externalgui </b>sets/gets external gui flag. 1 sets and enables the 0MQ data stream (0MQ threads created) from receiver to client, while 0 unsets and disables. \c Returns \c (int)
*/
descrToFuncMap[i].m_pFuncName="externalgui"; //
descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdDataStream;
++i;
/*! \page config /*! \page config
- \b free Free shared memory on the control PC - \b free Free shared memory on the control PC
*/ */
@ -1902,12 +1895,26 @@ slsDetectorCommand::slsDetectorCommand(slsDetectorUtils *det) {
++i; ++i;
/*! \page network /*! \page network
- <b>zmqport [port]</b> sets/gets the 0MQ (TCP) port of the receiver from where data is streamed to the client. Use single-detector command to set individually or multi-detector command to calculate based on \c port for the rest. \c Returns \c (int) - <b>zmqport [port]</b> sets/gets the 0MQ (TCP) port of the client to where final data is streamed to (eg. for GUI). Use single-detector command to set individually or multi-detector command to calculate based on \c port for the rest. \c Returns \c (int)
*/ */
descrToFuncMap[i].m_pFuncName="zmqport"; // descrToFuncMap[i].m_pFuncName="zmqport"; //
descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdNetworkParameter; descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdNetworkParameter;
++i; ++i;
/*! \page network
- <b>rx_zmqport [port]</b> sets/gets the 0MQ (TCP) port of the receiver from where data is streamed from (eg. to GUI or another process for further processing). Use single-detector command to set individually or multi-detector command to calculate based on \c port for the rest. \c Returns \c (int)
*/
descrToFuncMap[i].m_pFuncName="rx_zmqport"; //
descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdNetworkParameter;
++i;
/*! \page network
- <b> rx_datastream </b>enables/disables data streaming from receiver. 1 enables 0MQ data stream from receiver (creates streamer threads), while 0 disables (destroys streamer threads). \c Returns \c (int)
*/
descrToFuncMap[i].m_pFuncName="rx_datastream"; //
descrToFuncMap[i].m_pFuncPtr=&slsDetectorCommand::cmdDataStream;
++i;
/*! \page network /*! \page network
- <b>zmqsrcip [ip]</b> sets/gets the 0MQ (TCP) ip of the receiver from where data is streamed to the client. Default is ip of rx_hostname. Use this only with external gui. \c Returns \c (string) - <b>zmqsrcip [ip]</b> sets/gets the 0MQ (TCP) ip of the receiver from where data is streamed to the client. Default is ip of rx_hostname. Use this only with external gui. \c Returns \c (string)
*/ */
@ -2318,19 +2325,13 @@ string slsDetectorCommand::cmdAcquire(int narg, char *args[], int action) {
#endif #endif
if (action==HELP_ACTION) {
return helpAcquire(narg,args,HELP_ACTION);
}
myDet->setOnline(ONLINE_FLAG); myDet->setOnline(ONLINE_FLAG);
int r_online = myDet->setReceiverOnline(ONLINE_FLAG); int r_online = myDet->setReceiverOnline(ONLINE_FLAG);
if ((!myDet->getExternalGuiFlag()) && (r_online == ONLINE_FLAG)) {
// command line: must be off, if receiver on or there was -1, then
if (myDet->enableDataStreamingFromReceiver(-1) != 0){
//switch it off, if error
if (myDet->enableDataStreamingFromReceiver(0) != 0) {
return string("could not disable data streaming in receiver\n");
}
}
}
if(myDet->acquire() == FAIL) if(myDet->acquire() == FAIL)
return string("acquire unsuccessful"); return string("acquire unsuccessful");
if(r_online){ if(r_online){
@ -2495,18 +2496,11 @@ string slsDetectorCommand::cmdDataStream(int narg, char *args[], int action) {
if (action==PUT_ACTION) { if (action==PUT_ACTION) {
if (!sscanf(args[1],"%d",&ival)) if (!sscanf(args[1],"%d",&ival))
return string ("cannot scan externalgui mode"); return string ("cannot scan rx_datastream mode");
myDet->setExternalGuiFlag(ival>0?true:false);
myDet->enableDataStreamingFromReceiver(ival); myDet->enableDataStreamingFromReceiver(ival);
} }
int retval = myDet->getExternalGuiFlag(); sprintf(ans,"%d",myDet->enableDataStreamingFromReceiver());
//if external gui on and datastreaming off
if (retval && !myDet->enableDataStreamingFromReceiver()) {
retval=-1;
printf("Error: data streaming in receiver is switched off while external gui flag in shared memory is off.\n");
}
sprintf(ans,"%d",myDet->getExternalGuiFlag());
return string(ans); return string(ans);
} }
@ -2515,9 +2509,9 @@ string slsDetectorCommand::helpDataStream(int narg, char *args[], int action) {
ostringstream os; ostringstream os;
if (action==GET_ACTION || action==HELP_ACTION) if (action==GET_ACTION || action==HELP_ACTION)
os << string("externalgui \t gets external gui flag. 1/0 means the 0MQ data stream (0MQ threads created) from receiver to client is enabled/disabled. -1 for inconsistency. \n"); os << string("rx_datastream \t enables/disables data streaming from receiver. 1 is 0MQ data stream from receiver enabled, while 0 is 0MQ disabled. -1 for inconsistency between multiple receivers. \n");
if (action==PUT_ACTION || action==HELP_ACTION) if (action==PUT_ACTION || action==HELP_ACTION)
os << string("externalgui i\t sets external gui flag. 1/0 means the 0MQ data stream (0MQ threads created) from receiver to client is enabled/disabled. \n"); os << string("rx_datastream i\t enables/disables data streaming from receiver. i is 1 enables 0MQ data stream from receiver (creates streamer threads), while 0 disables (destroys streamer threads). \n");
return os.str(); return os.str();
} }
@ -3958,6 +3952,12 @@ string slsDetectorCommand::cmdNetworkParameter(int narg, char *args[], int actio
return ("cannot parse argument") + string(args[1]); return ("cannot parse argument") + string(args[1]);
} }
}else if (cmd=="zmqport") { }else if (cmd=="zmqport") {
t=CLIENT_STREAMING_PORT;
if (action==PUT_ACTION){
if (!(sscanf(args[1],"%d",&i)))
return ("cannot parse argument") + string(args[1]);
}
}else if (cmd=="rx_zmqport") {
t=RECEIVER_STREAMING_PORT; t=RECEIVER_STREAMING_PORT;
if (action==PUT_ACTION){ if (action==PUT_ACTION){
if (!(sscanf(args[1],"%d",&i))) if (!(sscanf(args[1],"%d",&i)))
@ -3991,7 +3991,8 @@ string slsDetectorCommand::helpNetworkParameter(int narg, char *args[], int acti
os << "txndelay_right port \n sets detector transmission delay of the right port"<< std::endl; os << "txndelay_right port \n sets detector transmission delay of the right port"<< std::endl;
os << "txndelay_frame port \n sets detector transmission delay of the entire frame"<< std::endl; os << "txndelay_frame port \n sets detector transmission delay of the entire frame"<< std::endl;
os << "flowcontrol_10g port \n sets flow control for 10g for eiger"<< std::endl; os << "flowcontrol_10g port \n sets flow control for 10g for eiger"<< std::endl;
os << "zmqport port \n sets zmq port (data from receiver to client); setting via multidetector command calculates port for individual detectors"<< std::endl; os << "zmqport port \n sets zmq port (data to client from receiver/different process); setting via multidetector command calculates port for individual detectors"<< std::endl;
os << "rx_zmqport port \n sets zmq port (data from receiver to client/different process); setting via multidetector command calculates port for individual detectors"<< std::endl;
os << "zmqsrcip ip \n sets/gets the 0MQ (TCP) ip of the receiver from where data is streamed to the client. Default is ip of rx_hostname. Use this only with external gui." << std::endl; os << "zmqsrcip ip \n sets/gets the 0MQ (TCP) ip of the receiver from where data is streamed to the client. Default is ip of rx_hostname. Use this only with external gui." << std::endl;
} }
if (action==GET_ACTION || action==HELP_ACTION) { if (action==GET_ACTION || action==HELP_ACTION) {
@ -4005,7 +4006,8 @@ string slsDetectorCommand::helpNetworkParameter(int narg, char *args[], int acti
os << "txndelay_right \n gets detector transmission delay of the right port"<< std::endl; os << "txndelay_right \n gets detector transmission delay of the right port"<< std::endl;
os << "txndelay_frame \n gets detector transmission delay of the entire frame"<< std::endl; os << "txndelay_frame \n gets detector transmission delay of the entire frame"<< std::endl;
os << "flowcontrol_10g \n gets flow control for 10g for eiger"<< std::endl; os << "flowcontrol_10g \n gets flow control for 10g for eiger"<< std::endl;
os << "zmqport \n gets zmq port (data from receiver to client)"<< std::endl; os << "zmqport \n gets zmq port (data to client from receiver/different process)"<< std::endl;
os << "rx_zmqport \n gets zmq port (data from receiver to client/different process)"<< std::endl;
os << "zmqsrcip \n gets zmq source ip (data from receiver to client), none if default setting and no custom ip" << std::endl; os << "zmqsrcip \n gets zmq source ip (data from receiver to client), none if default setting and no custom ip" << std::endl;
} }
return os.str(); return os.str();
@ -5916,33 +5918,14 @@ string slsDetectorCommand::cmdReceiver(int narg, char *args[], int action) {
myDet->setOnline(ONLINE_FLAG); myDet->setOnline(ONLINE_FLAG);
int receivers = myDet->setReceiverOnline(ONLINE_FLAG); myDet->setReceiverOnline(ONLINE_FLAG);
if(cmd=="receiver"){ if(cmd=="receiver"){
if (action==PUT_ACTION) { if (action==PUT_ACTION) {
if(!strcasecmp(args[1],"start")) { if(!strcasecmp(args[1],"start"))
//to ensure data streaming enable is the same across client and receiver
if ((!myDet->getExternalGuiFlag()) && (receivers == ONLINE_FLAG)) {
//if it was not off
if (myDet->enableDataStreamingFromReceiver(-1) != 0){
//switch it off, if error
if (myDet->enableDataStreamingFromReceiver(0) != 0) {
return string("could not disable data streaming in receiver\n");
}
}
}
myDet->startReceiver(); myDet->startReceiver();
} else if(!strcasecmp(args[1],"stop"))
else if(!strcasecmp(args[1],"stop")){
//myDet->stopReceiver();
// myDet->startReceiverReadout();
/*runStatus s = myDet->getReceiverStatus();
while(s != RUN_FINISHED){
usleep(50000);
s = myDet->getReceiverStatus();
}*/
myDet->stopReceiver(); myDet->stopReceiver();
}
else else
return helpReceiver(narg, args, action); return helpReceiver(narg, args, action);
} }

View File

@ -227,6 +227,18 @@ int slsDetectorUsers::enableDataStreamingFromReceiver(int i){
return myDetector->enableDataStreamingFromReceiver(i); return myDetector->enableDataStreamingFromReceiver(i);
} }
int slsDetectorUsers::enableDataStreamingToClient(int i){
return myDetector->enableDataStreamingToClient(i);
}
int slsDetectorUsers::setReceiverDataStreamingOutPort(int i){
return myDetector->setReceiverDataStreamingOutPort(i);
}
int slsDetectorUsers::setClientDataStreamingInPort(int i){
return myDetector->setClientDataStreamingInPort(i);
}
int64_t slsDetectorUsers::getModuleFirmwareVersion(){ int64_t slsDetectorUsers::getModuleFirmwareVersion(){
return myDetector->getModuleFirmwareVersion(); return myDetector->getModuleFirmwareVersion();
} }

View File

@ -439,13 +439,34 @@ class slsDetectorUsers
virtual void finalizeDataset(double *a, double *v, double *e, int &np); virtual void finalizeDataset(double *a, double *v, double *e, int &np);
/** /** Enable or disable streaming data from receiver (creates transmitting sockets)
Enable data streaming from receiver (zmq) * @param enable 0 to disable 1 to enable -1 to only get the value
\param i 1 to set, 0 to reset and -1 to get * @returns data streaming from receiver enable
\returns data streaming enable */
*/
int enableDataStreamingFromReceiver(int i=-1); int enableDataStreamingFromReceiver(int i=-1);
/**
* Enable data streaming to client (creates receiving sockets)
* @param i 0 to disable, 1 to enable, -1 to get the value
* @returns data streaming to client enable
*/
int enableDataStreamingToClient(int i=-1);
/**
* Set/Get receiver streaming out ZMQ port
* For multi modules, it calculates (increments) and sets the ports
* @param i sets, -1 gets
* @returns receiver streaming out ZMQ port ()
*/
int setReceiverDataStreamingOutPort(int i=-1);
/**
* Set/Get client streaming in ZMQ port
* For multi modules, it calculates (increments) and sets the ports
* @param i sets, -1 gets
* @returns client streaming in ZMQ port
*/
int setClientDataStreamingInPort(int i=-1);
/** /**
get get Module Firmware Version get get Module Firmware Version

View File

@ -30,6 +30,7 @@ extern "C" {
#include <queue> #include <queue>
#include <math.h> #include <math.h>
#include <semaphore.h> #include <semaphore.h>
#include <cstdlib>
using namespace std; using namespace std;
@ -99,7 +100,32 @@ class slsDetectorUtils : public slsDetectorActions, public postProcessing {
int enableFlatFieldCorrection(int i=-1) {if (i>0) setFlatFieldCorrectionFile("default"); else if (i==0) setFlatFieldCorrectionFile(""); return getFlatFieldCorrection();}; int enableFlatFieldCorrection(int i=-1) {if (i>0) setFlatFieldCorrectionFile("default"); else if (i==0) setFlatFieldCorrectionFile(""); return getFlatFieldCorrection();};
int enablePixelMaskCorrection(int i=-1) {if (i>0) setBadChannelCorrection("default"); else if (i==0) setBadChannelCorrection(""); return getBadChannelCorrection();}; int enablePixelMaskCorrection(int i=-1) {if (i>0) setBadChannelCorrection("default"); else if (i==0) setBadChannelCorrection(""); return getBadChannelCorrection();};
int enableCountRateCorrection(int i=-1) {if (i>0) setRateCorrection(i); else if (i==0) setRateCorrection(0); return getRateCorrection();}; int enableCountRateCorrection(int i=-1) {if (i>0) setRateCorrection(i); else if (i==0) setRateCorrection(0); return getRateCorrection();};
// string getFilePath(){return fileIO::getFilePath();};;
/**
* Set/Get receiver streaming out ZMQ port
* For multi modules, it calculates (increments) and sets the ports
* @param i sets, -1 gets
* @returns receiver streaming out ZMQ port
*/
int setReceiverDataStreamingOutPort(int i) { \
if (i >= 0) { ostringstream ss; ss << i; string s = ss.str(); \
setNetworkParameter(RECEIVER_STREAMING_PORT, s);} \
return atoi(getNetworkParameter(RECEIVER_STREAMING_PORT).c_str());}; \
/**
* Set/Get client streaming in ZMQ port
* For multi modules, it calculates (increments) and sets the ports
* @param i sets, -1 gets
* @returns client streaming in ZMQ port
*/
int setClientDataStreamingInPort(int i){ \
if (i >= 0) { ostringstream ss; ss << i; string s = ss.str(); \
setNetworkParameter(CLIENT_STREAMING_PORT, s);} \
return atoi(getNetworkParameter(CLIENT_STREAMING_PORT).c_str());}; \
// string getFilePath(){return fileIO::getFilePath();};;
// string setFilePath(string s){return fileIO::setFilePath(s);}; // string setFilePath(string s){return fileIO::setFilePath(s);};
// string getFileName(){return fileIO::getFileName();}; // string getFileName(){return fileIO::getFileName();};
@ -760,9 +786,9 @@ virtual ROI* getROI(int &n)=0;
*/ */
virtual int setReadReceiverFrequency(int getFromReceiver, int freq=-1)=0; virtual int setReadReceiverFrequency(int getFromReceiver, int freq=-1)=0;
/** Enable or disable streaming of data from receiver to client /** Enable or disable streaming data from receiver to client
* @param enable 0 to disable 1 to enable -1 to only get the value * @param enable 0 to disable 1 to enable -1 to only get the value
* @returns data streaming * @returns data streaming from receiver enable
*/ */
virtual int enableDataStreamingFromReceiver(int enable=-1)=0; virtual int enableDataStreamingFromReceiver(int enable=-1)=0;