jungfrau: switching between 2 and 1 interface, implementation for server required when firmware done

This commit is contained in:
2019-03-26 15:00:19 +01:00
parent cd5aea895b
commit 7cd5bc8b2d
26 changed files with 1316 additions and 301 deletions

View File

@ -81,8 +81,9 @@ void slsReceiverImplementation::InitializeMembers() {
silentMode = false;
//***connection parameters***
strcpy(eth,"");
numUDPInterfaces = 1;
for(int i=0;i<MAX_NUMBER_OF_LISTENING_THREADS;i++) {
strcpy(eth[i], "");
udpPortNum[i] = DEFAULT_UDP_PORTNO + i;
}
udpSocketBufferSize = 0;
@ -268,7 +269,17 @@ uint32_t slsReceiverImplementation::getUDPPortNumber2() const{
std::string slsReceiverImplementation::getEthernetInterface() const{
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
return std::string(eth);
return std::string(eth[0]);
}
std::string slsReceiverImplementation::getEthernetInterface2() const{
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
return std::string(eth[1]);
}
int slsReceiverImplementation::getNumberofUDPInterfaces() const{
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
return numUDPInterfaces;
}
@ -402,10 +413,14 @@ void slsReceiverImplementation::setDetectorHostname(const char *c) {
void slsReceiverImplementation::setMultiDetectorSize(const int* size) {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
std::string log_message = "Detector Size: (";
std::string log_message = "Detector Size (ports): (";
for (int i = 0; i < MAX_DIMENSIONS; ++i) {
if (myDetectorType == EIGER && (!i))
numDet[i] = size[i]*2;
// x dir (colums) each udp port
if (myDetectorType == EIGER && i == X)
numDet[i] = size[i] * 2;
// y dir (rows) each udp port
else if (numUDPInterfaces == 2 && i == Y)
numDet[i] = size[i] * 2;
else
numDet[i] = size[i];
log_message += std::to_string(numDet[i]);
@ -606,14 +621,116 @@ void slsReceiverImplementation::setUDPPortNumber2(const uint32_t i) {
void slsReceiverImplementation::setEthernetInterface(const char* c) {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
strcpy(eth, c);
FILE_LOG(logINFO) << "Ethernet Interface: " << eth;
strcpy(eth[0], c);
FILE_LOG(logINFO) << "Ethernet Interface: " << eth[0];
}
void slsReceiverImplementation::setEthernetInterface2(const char* c) {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
strcpy(eth[1], c);
FILE_LOG(logINFO) << "Ethernet Interface 2: " << eth[1];
}
int slsReceiverImplementation::setNumberofUDPInterfaces(const int n) {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
if (numUDPInterfaces != n) {
// reduce number of detectors in y dir (rows) if it had 2 interfaces before
if (numUDPInterfaces == 2)
numDet[Y] /= 2;
numUDPInterfaces = n;
// clear all threads and fifos
listener.clear();
dataProcessor.clear();
dataStreamer.clear();
fifo.clear();
// set local variables
generalData->SetNumberofInterfaces(n);
numThreads = generalData->threadsPerReceiver;
udpSocketBufferSize = generalData->defaultUdpSocketBufferSize;
// fifo
if (SetupFifoStructure() == FAIL)
return FAIL;
//create threads
for ( int i = 0; i < numThreads; ++i ) {
// listener and dataprocessor threads
try {
auto fifo_ptr = fifo[i].get();
listener.push_back(sls::make_unique<Listener>(i, myDetectorType, fifo_ptr, &status,
&udpPortNum[i], eth[i], &numberOfFrames, &dynamicRange,
&udpSocketBufferSize, &actualUDPSocketBufferSize, &framesPerFile,
&frameDiscardMode, &activated, &deactivatedPaddingEnable, &silentMode));
listener[i]->SetGeneralData(generalData);
dataProcessor.push_back(sls::make_unique<DataProcessor>(i, myDetectorType, fifo_ptr, &fileFormatType,
fileWriteEnable, &dataStreamEnable, &gapPixelsEnable,
&dynamicRange, &streamingFrequency, &streamingTimerInMs,
&framePadding, &activated, &deactivatedPaddingEnable, &silentMode));
dataProcessor[i]->SetGeneralData(generalData);
}
catch (...) {
FILE_LOG(logERROR) << "Could not create listener/dataprocessor threads (index:" << i << ")";
listener.clear();
dataProcessor.clear();
return FAIL;
}
// streamer threads
if (dataStreamEnable) {
try {
dataStreamer.push_back(sls::make_unique<DataStreamer>(i, fifo[i].get(), &dynamicRange,
&roi, &fileIndex, flippedData, additionalJsonHeader));
dataStreamer[i]->SetGeneralData(generalData);
dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort, streamingSrcIP);
} catch(...) {
FILE_LOG(logERROR) << "Could not create datastreamer threads (index:" << i << ")";
if (dataStreamEnable) {
dataStreamer.clear();
dataStreamEnable = false;
}
return FAIL;
}
}
}
SetThreadPriorities();
// update (from 1 to 2 interface) & also for printout
setMultiDetectorSize(numDet);
// update row and column in dataprocessor
setDetectorPositionId(detID);
// test socket buffer size with current set up
if (setUDPSocketBufferSize(0) == FAIL) {
return FAIL;
}
}
FILE_LOG(logINFO) << "Number of Interfaces: " << numUDPInterfaces;
return OK;
}
int slsReceiverImplementation::setUDPSocketBufferSize(const int64_t s) {
if (listener.size())
return listener[0]->CreateDummySocketForUDPSocketBufferSize(s);
return FAIL;
int64_t size = (s == 0) ? udpSocketBufferSize : s;
size_t listSize = listener.size();
if ((int)listSize != numUDPInterfaces)
return FAIL;
for (unsigned int i = 0; i < listSize; ++i) {
if (listener[i]->CreateDummySocketForUDPSocketBufferSize(size) == FAIL)
return FAIL;
}
return OK;
}
@ -940,7 +1057,7 @@ int slsReceiverImplementation::setDetectorType(const detectorType d) {
try {
auto fifo_ptr = fifo[i].get();
listener.push_back(sls::make_unique<Listener>(i, myDetectorType, fifo_ptr, &status,
&udpPortNum[i], eth, &numberOfFrames, &dynamicRange,
&udpPortNum[i], eth[i], &numberOfFrames, &dynamicRange,
&udpSocketBufferSize, &actualUDPSocketBufferSize, &framesPerFile,
&frameDiscardMode, &activated, &deactivatedPaddingEnable, &silentMode));
dataProcessor.push_back(sls::make_unique<DataProcessor>(i, myDetectorType, fifo_ptr, &fileFormatType,
@ -964,9 +1081,6 @@ int slsReceiverImplementation::setDetectorType(const detectorType d) {
it->SetGeneralData(generalData);
SetThreadPriorities();
// check udp socket buffer size
setUDPSocketBufferSize(udpSocketBufferSize);
FILE_LOG(logDEBUG) << " Detector type set to " << detectorTypeToString(d);
return OK;
}
@ -974,9 +1088,9 @@ int slsReceiverImplementation::setDetectorType(const detectorType d) {
void slsReceiverImplementation::setDetectorPositionId(const int i) {
void slsReceiverImplementation::setDetectorPositionId(const int id) {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
detID = i;
detID = id;
FILE_LOG(logINFO) << "Detector Position Id:" << detID;
for (unsigned int i = 0; i < dataProcessor.size(); ++i) {
dataProcessor[i]->SetupFileWriter(fileWriteEnable, (int*)numDet,
@ -987,7 +1101,7 @@ void slsReceiverImplementation::setDetectorPositionId(const int i) {
for (unsigned int i = 0; i < listener.size(); ++i) {
uint16_t row = 0, col = 0;
row = detID % numDet[1]; // row
row = (detID % numDet[1]) * ((numUDPInterfaces == 2) ? 2 : 1); // row
col = (detID / numDet[1]) * ((myDetectorType == EIGER) ? 2 : 1) + i; // col for horiz. udp ports
listener[i]->SetHardCodedPosition(row, col);
}
@ -1112,6 +1226,7 @@ void slsReceiverImplementation::stopReceiver() {
int64_t missingpackets = numberOfFrames*generalData->packetsPerFrame-listener[i]->GetPacketsCaught();
TLogLevel lev = (((int64_t)missingpackets) > 0) ? logINFORED : logINFOGREEN;
FILE_LOG(lev) <<
// udp port number could be the second if selected interface is 2 for jungfrau
"Summary of Port " << udpPortNum[i] <<
"\n\tMissing Packets\t\t: " << missingpackets <<
"\n\tComplete Frames\t\t: " << dataProcessor[i]->GetNumFramesCaught() <<
@ -1328,11 +1443,15 @@ void slsReceiverImplementation::ResetParametersforNewMeasurement() {
int slsReceiverImplementation::CreateUDPSockets() {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
bool error = false;
for (unsigned int i = 0; i < listener.size(); ++i)
for (unsigned int i = 0; i < listener.size(); ++i) {
if (listener[i]->CreateUDPSockets() == FAIL) {
error = true;
break;
}
}
if (error) {
shutDownUDPSockets();
return FAIL;

View File

@ -710,8 +710,8 @@ int slsReceiverTCPIPInterface::set_roi() {
int slsReceiverTCPIPInterface::setup_udp(){
ret = OK;
memset(mess, 0, sizeof(mess));
char args[3][MAX_STR_LENGTH] = {{""}, {""}, {""}};
char retvals[MAX_STR_LENGTH] = {""};
char args[6][MAX_STR_LENGTH] = {{""}, {""}, {""}, {""}, {""}, {""}};
char retvals[2][MAX_STR_LENGTH] = {{""}, {""}};
// get args, return if socket crashed, ret is fail if receiver is not null
if (interface->Server_ReceiveArg(ret, mess, args, sizeof(args), true, receiver) == FAIL)
@ -722,48 +722,108 @@ int slsReceiverTCPIPInterface::setup_udp(){
// only set
// verify if receiver is unlocked and idle
if (interface->Server_VerifyLockAndIdle(ret, mess, lockStatus, receiver->getStatus(), fnum) == OK) {
//set up udp port
int udpport=-1,udpport2=-1;
sscanf(args[1],"%d",&udpport);
sscanf(args[2],"%d",&udpport2);
receiver->setUDPPortNumber(udpport);
if (myDetectorType == EIGER)
receiver->setUDPPortNumber2(udpport2);
//setup udpip
//get ethernet interface or IP to listen to
FILE_LOG(logINFO) << "Receiver UDP IP: " << args[0];
std::string temp = genericSocket::ipToName(args[0]);
//setup interfaces count
int numInterfaces = atoi(args[0]) > 1 ? 2 : 1;
int selInterface = atoi(args[1]) > 1 ? 2 : 1;
char* ip1 = args[2];
char* ip2 = args[3];
uint32_t port1 = atoi(args[4]);
uint32_t port2 = atoi(args[5]);
// using the 2nd interface only
if (numInterfaces == 1 && selInterface == 2) {
ip1 = ip2;
port1 = port2;
}
// 1st interface
receiver->setUDPPortNumber(port1);
FILE_LOG(logINFO) << "Receiver UDP IP: " << ip1;
// get eth
std::string temp = genericSocket::ipToName(ip1);
if (temp == "none"){
ret = FAIL;
strcpy(mess, "Failed to get ethernet interface or IP\n");
strcpy(mess, "Failed to get ethernet interface or IP \n");
FILE_LOG(logERROR) << mess;
}
else {
char eth[MAX_STR_LENGTH];
memset(eth,0,sizeof(eth));
strcpy(eth,temp.c_str());
if (strchr(eth,'.') != nullptr) {
strcpy(eth,"");
} else {
char eth[MAX_STR_LENGTH] = {""};
memset(eth, 0, MAX_STR_LENGTH);
strcpy(eth, temp.c_str());
// if there is a dot in eth name
if (strchr(eth, '.') != nullptr) {
strcpy(eth, "");
ret = FAIL;
strcpy(mess, "Failed to get ethernet interface\n");
sprintf(mess, "Failed to get ethernet interface from IP. Got %s\n", temp.c_str());
FILE_LOG(logERROR) << mess;
}
receiver->setEthernetInterface(eth);
//get mac address from ethernet interface
if (ret != FAIL)
//get mac address
if (ret != FAIL) {
temp = genericSocket::nameToMac(eth);
if (temp=="00:00:00:00:00:00") {
ret = FAIL;
strcpy(mess,"failed to get mac adddress to listen to\n");
FILE_LOG(logERROR) << mess;
} else {
// using the 2nd interface only
if (numInterfaces == 1 && selInterface == 2) {
strcpy(retvals[1],temp.c_str());
FILE_LOG(logINFO) << "Receiver MAC Address: " << retvals[1];
}
else {
strcpy(retvals[0],temp.c_str());
FILE_LOG(logINFO) << "Receiver MAC Address: " << retvals[0];
}
}
}
}
if ((temp=="00:00:00:00:00:00") || (ret == FAIL)){
// 2nd interface
if (numInterfaces == 2) {
receiver->setUDPPortNumber2(port2);
FILE_LOG(logINFO) << "Receiver UDP IP 2: " << ip2;
// get eth
std::string temp = genericSocket::ipToName(ip2);
if (temp == "none"){
ret = FAIL;
strcpy(mess,"failed to get mac adddress to listen to\n");
strcpy(mess, "Failed to get 2nd ethernet interface or IP \n");
FILE_LOG(logERROR) << mess;
} else {
char eth[MAX_STR_LENGTH] = {""};
memset(eth, 0, MAX_STR_LENGTH);
strcpy(eth, temp.c_str());
// if there is a dot in eth name
if (strchr(eth, '.') != nullptr) {
strcpy(eth, "");
ret = FAIL;
sprintf(mess, "Failed to get 2nd ethernet interface from IP. Got %s\n", temp.c_str());
FILE_LOG(logERROR) << mess;
}
receiver->setEthernetInterface2(eth);
//get mac address
if (ret != FAIL) {
temp = genericSocket::nameToMac(eth);
if (temp=="00:00:00:00:00:00") {
ret = FAIL;
strcpy(mess,"failed to get 2nd mac adddress to listen to\n");
FILE_LOG(logERROR) << mess;
} else {
strcpy(retvals[1],temp.c_str());
FILE_LOG(logINFO) << "Receiver MAC Address 2: " << retvals[1];
}
}
}
else {
strcpy(retvals,temp.c_str());
FILE_LOG(logINFO) << "Receiver MAC Address: " << retvals;
}
}
// set the number of udp interfaces (changes number of threads and many others)
if (receiver->setNumberofUDPInterfaces(numInterfaces) == FAIL) {
ret = FAIL;
sprintf(mess, "Failed to set number of interfaces\n");
FILE_LOG(logERROR) << mess;
}
}
}
@ -1752,7 +1812,8 @@ int slsReceiverTCPIPInterface::set_udp_socket_buffer_size() {
}
// get
retval = receiver->getUDPSocketBufferSize();
validate(index, retval, std::string("set udp socket buffer size (No CAP_NET_ADMIN privileges?)"), DEC);
if (index != 0)
validate(index, retval, std::string("set udp socket buffer size (No CAP_NET_ADMIN privileges?)"), DEC);
FILE_LOG(logDEBUG1) << "UDP Socket Buffer Size:" << retval;
}
return interface->Server_SendResult(true, ret, &retval, sizeof(retval), mess);