Compare commits

...

27 Commits

Author SHA1 Message Date
337e56d9bf cleaned up LTO detection 2020-04-23 08:23:36 +02:00
eb257154c6 added triggers 2020-04-22 09:15:31 +02:00
c1ae67ac46 Small refactor on ThreadObject and listener (#93)
* removed pointer, slight cleaning of loop

* removed semaphore, use getters

* removed redundant log msg

* removed comment

* added const

* removed comment

* changed header
2020-04-21 09:45:29 +02:00
68f76e5356 more like UdpRxSocket 2020-04-20 17:24:53 +02:00
8afa11ed33 removed pointer to server socket 2020-04-20 17:20:33 +02:00
bc389f4825 moved data members to top 2020-04-20 14:51:48 +02:00
095ced153c removed need for pointer 2020-04-20 14:31:10 +02:00
a1a5a20845 from thread sanitizer 2020-04-17 09:35:38 +02:00
c725a05ef8 fix RH7 2020-04-16 15:51:28 +02:00
815b6a37aa moved flag to base class 2020-04-16 13:22:51 +02:00
655a410d43 cleaned up UdpRxSocket 2020-04-16 09:45:44 +02:00
97ba81d923 fixed test 2020-04-14 16:58:37 +02:00
3d00eed0f0 Change SetTrimbits() and SaveAllTrimbits() to rely on top/bottom signal instead of TopAddressIsValid() for further cleanup. 2020-04-14 16:13:26 +02:00
a7f5300455 Merge pull request #92 from slsdetectorgroup/setrxhostname
Setrxhostname
2020-04-09 11:52:46 +02:00
2f33a1a479 updated binaries 2020-04-09 09:35:46 +02:00
39fa5e0185 client recieve rx parameters as a struct 2020-04-09 09:34:20 +02:00
ba4985ed4d Merge branch 'developer' of github.com:slsdetectorgroup/slsDetectorPackage into developer 2020-04-09 09:32:09 +02:00
f811c065d1 corrected a delete [] in multiThreadedAnalogDetector 2020-04-09 09:31:43 +02:00
3a1d87728c updated client api 2020-04-09 08:38:37 +02:00
0652ff6b5a updated binaries 2020-04-09 08:37:27 +02:00
373e177274 WIP 2020-04-09 08:35:30 +02:00
6dd6685e7d minor 2020-04-09 08:24:16 +02:00
38c31fdada WIP 2020-04-08 20:27:10 +02:00
b3fe0e79bc WIP 2020-04-08 16:43:28 +02:00
215e4a56fd Merge branch 'developer' into setrxhostname 2020-04-08 12:00:21 +02:00
71a68c2022 eiger transmitting fix in 10g mode (stop servers informed about 10g mode) 2020-04-08 11:58:59 +02:00
55f8497eac WIP 2020-04-08 11:05:06 +02:00
44 changed files with 1125 additions and 1057 deletions

View File

@ -46,6 +46,11 @@ option(SLS_BUILD_DOCS "docs" OFF)
option(SLS_BUILD_EXAMPLES "examples" OFF) option(SLS_BUILD_EXAMPLES "examples" OFF)
option(SLS_TUNE_LOCAL "tune to local machine" OFF) option(SLS_TUNE_LOCAL "tune to local machine" OFF)
#Enable LTO if available
check_ipo_supported(RESULT SLS_LTO_AVAILABLE)
# Use ld.gold if it is available and isn't disabled explicitly # Use ld.gold if it is available and isn't disabled explicitly
option(SLS_USE_LD_GOLD "Use GNU gold linker" ON) option(SLS_USE_LD_GOLD "Use GNU gold linker" ON)
if (SLS_USE_LD_GOLD) if (SLS_USE_LD_GOLD)

View File

@ -160,6 +160,13 @@ class Detector(CppDetectorApi):
def frames(self, n_frames): def frames(self, n_frames):
self.setNumberOfFrames(n_frames) self.setNumberOfFrames(n_frames)
@property
def triggers(self):
return element_if_equal(self.getNumberOfTriggers())
@triggers.setter
def triggers(self, n_triggers):
self.setNumberOfTriggers(n_triggers)
@property @property
def exptime(self): def exptime(self):

View File

@ -320,7 +320,7 @@ public:
// int nnx, nny, ns; // int nnx, nny, ns;
int nn=dets[0]->getImageSize(nnx, nny,ns, nsy); int nn=dets[0]->getImageSize(nnx, nny,ns, nsy);
if (image) { if (image) {
delete image; delete [] image;
image=NULL; image=NULL;
} }
image=new int[nn]; image=new int[nn];

View File

@ -429,7 +429,7 @@ int *getClusters(char *data, int *ph=NULL) {
max=*v; max=*v;
} }
} }
} }

View File

@ -898,7 +898,7 @@ int Feb_Control_SendDACValue(unsigned int dst_num, unsigned int ch, unsigned int
int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int *trimbits) { int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int *trimbits, int top) {
LOG(logINFO, ("Setting Trimbits\n")); LOG(logINFO, ("Setting Trimbits\n"));
//for (int iy=10000;iy<20020;++iy)//263681 //for (int iy=10000;iy<20020;++iy)//263681
@ -963,7 +963,7 @@ int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int *trimbits) {
int i; int i;
for(i=0;i<8;i++) { // column loop i for(i=0;i<8;i++) { // column loop i
if (Module_TopAddressIsValid(&modules[1])) { if (top==1) {
trimbits_to_load_l[offset+chip_sc] |= ( 0x7 & trimbits[row_set*16480+super_column_start_position_l+i])<<((7-i)*4);//low trimbits_to_load_l[offset+chip_sc] |= ( 0x7 & trimbits[row_set*16480+super_column_start_position_l+i])<<((7-i)*4);//low
trimbits_to_load_l[offset+chip_sc+32] |= ((0x38 & trimbits[row_set*16480+super_column_start_position_l+i])>>3)<<((7-i)*4);//upper trimbits_to_load_l[offset+chip_sc+32] |= ((0x38 & trimbits[row_set*16480+super_column_start_position_l+i])>>3)<<((7-i)*4);//upper
trimbits_to_load_r[offset+chip_sc] |= ( 0x7 & trimbits[row_set*16480+super_column_start_position_r+i])<<((7-i)*4);//low trimbits_to_load_r[offset+chip_sc] |= ( 0x7 & trimbits[row_set*16480+super_column_start_position_r+i])<<((7-i)*4);//low
@ -1572,12 +1572,12 @@ int Feb_Control_StopAcquisition() {
int Feb_Control_SaveAllTrimbitsTo(int value) { int Feb_Control_SaveAllTrimbitsTo(int value, int top) {
unsigned int chanregs[Feb_Control_trimbit_size]; unsigned int chanregs[Feb_Control_trimbit_size];
int i; int i;
for(i=0;i<Feb_Control_trimbit_size;i++) for(i=0;i<Feb_Control_trimbit_size;i++)
chanregs[i] = value; chanregs[i] = value;
return Feb_Control_SetTrimbits(0,chanregs); return Feb_Control_SetTrimbits(0,chanregs, top);
} }

View File

@ -93,9 +93,9 @@ int Feb_Control_SetDAC(char* s, int value, int is_a_voltage_mv);
int Feb_Control_GetDAC(char* s, int* ret_value, int voltage_mv); int Feb_Control_GetDAC(char* s, int* ret_value, int voltage_mv);
int Feb_Control_GetDACName(unsigned int dac_num,char* s); int Feb_Control_GetDACName(unsigned int dac_num,char* s);
int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int* trimbits); int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int* trimbits, int top);
unsigned int* Feb_Control_GetTrimbits(); unsigned int* Feb_Control_GetTrimbits();
int Feb_Control_SaveAllTrimbitsTo(int value); int Feb_Control_SaveAllTrimbitsTo(int value, int top);
int Feb_Control_Reset(); int Feb_Control_Reset();
int Feb_Control_PrepareForAcquisition(); int Feb_Control_PrepareForAcquisition();

View File

@ -759,7 +759,7 @@ int64_t getSubExpTime() {
#endif #endif
} }
int setDeadTime(int64_t val) { int setSubDeadTime(int64_t val) {
LOG(logINFO, ("Setting subdeadtime %lld ns\n", (long long int)val)); LOG(logINFO, ("Setting subdeadtime %lld ns\n", (long long int)val));
#ifndef VIRTUAL #ifndef VIRTUAL
// get subexptime // get subexptime
@ -781,7 +781,7 @@ int setDeadTime(int64_t val) {
return OK; return OK;
} }
int64_t getDeadTime() { int64_t getSubDeadTime() {
#ifndef VIRTUAL #ifndef VIRTUAL
// get subexptime // get subexptime
int64_t subexptime = Feb_Control_GetSubFrameExposureTime(); int64_t subexptime = Feb_Control_GetSubFrameExposureTime();
@ -890,7 +890,7 @@ int setModule(sls_detector_module myMod, char* mess) {
} }
//set trimbits //set trimbits
if (!Feb_Control_SetTrimbits(Feb_Control_GetModuleNumber(), tt)) { if (!Feb_Control_SetTrimbits(Feb_Control_GetModuleNumber(), tt,top)) {
sprintf(mess, "Could not set module. Could not set trimbits\n"); sprintf(mess, "Could not set module. Could not set trimbits\n");
LOG(logERROR, (mess)); LOG(logERROR, (mess));
setSettings(UNDEFINED); setSettings(UNDEFINED);
@ -1671,7 +1671,7 @@ void setExternalGating(int enable[]) {
int setAllTrimbits(int val) { int setAllTrimbits(int val) {
#ifndef VIRTUAL #ifndef VIRTUAL
if (!Feb_Control_SaveAllTrimbitsTo(val)) { if (!Feb_Control_SaveAllTrimbitsTo(val,top)) {
LOG(logERROR, ("Could not set all trimbits\n")); LOG(logERROR, ("Could not set all trimbits\n"));
return FAIL; return FAIL;
} }

View File

@ -52,12 +52,11 @@ int Server_VerifyLock();
* Server sends result to client (also set ret to force_update if different clients) * Server sends result to client (also set ret to force_update if different clients)
* @param fileDes file descriptor for the socket * @param fileDes file descriptor for the socket
* @param itype 32 or 64 or others to determine to swap data from big endian to little endian * @param itype 32 or 64 or others to determine to swap data from big endian to little endian
* @param update 1 if one must update if different clients, else 0
* @param retval pointer to result * @param retval pointer to result
* @param retvalSize size of result * @param retvalSize size of result
* @returns result of operation * @returns result of operation
*/ */
int Server_SendResult(int fileDes, intType itype, int update, void* retval, int retvalSize); int Server_SendResult(int fileDes, intType itype, void* retval, int retvalSize);
/** /**
* Convert mac address from integer to char array * Convert mac address from integer to char array

View File

@ -206,8 +206,8 @@ int64_t getBurstPeriod();
#ifdef EIGERD #ifdef EIGERD
int setSubExpTime(int64_t val); int setSubExpTime(int64_t val);
int64_t getSubExpTime(); int64_t getSubExpTime();
int setDeadTime(int64_t val); int setSubDeadTime(int64_t val);
int64_t getDeadTime(); int64_t getSubDeadTime();
int64_t getMeasuredPeriod(); int64_t getMeasuredPeriod();
int64_t getMeasuredSubPeriod(); int64_t getMeasuredSubPeriod();
#endif #endif

View File

@ -216,4 +216,5 @@ int get_timing_source(int);
int set_timing_source(int); int set_timing_source(int);
int get_num_channels(int); int get_num_channels(int);
int update_rate_correction(int); int update_rate_correction(int);
int get_receiver_parameters(int);

View File

@ -577,7 +577,7 @@ int Server_VerifyLock() {
} }
int Server_SendResult(int fileDes, intType itype, int update, void* retval, int retvalSize) { int Server_SendResult(int fileDes, intType itype, void* retval, int retvalSize) {
// send success of operation // send success of operation
int ret1 = ret; int ret1 = ret;

View File

@ -15,9 +15,8 @@ add_library(slsDetectorShared SHARED
${HEADERS} ${HEADERS}
) )
# Do we have link time optimization?
check_ipo_supported(RESULT LTO_AVAILABLE) if(SLS_LTO_AVAILABLE)
if(LTO_AVAILABLE)
set_property(TARGET slsDetectorShared PROPERTY INTERPROCEDURAL_OPTIMIZATION True) set_property(TARGET slsDetectorShared PROPERTY INTERPROCEDURAL_OPTIMIZATION True)
endif() endif()

View File

@ -526,27 +526,6 @@ Module::getTypeFromDetector(const std::string &hostname, int cport) {
return retval; return retval;
} }
int Module::setDetectorType(detectorType const type) {
int fnum = F_GET_DETECTOR_TYPE;
detectorType retval = GENERIC;
LOG(logDEBUG1) << "Setting detector type to " << type;
// if unspecified, then get from detector
if (type == GET_DETECTOR_TYPE) {
sendToDetector(fnum, nullptr, retval);
shm()->myDetectorType = static_cast<detectorType>(retval);
LOG(logDEBUG1) << "Detector Type: " << retval;
}
if (shm()->useReceiverFlag) {
auto arg = static_cast<int>(shm()->myDetectorType);
retval = GENERIC;
LOG(logDEBUG1) << "Sending detector type to Receiver: " << arg;
sendToReceiver(F_GET_RECEIVER_TYPE, arg, retval);
LOG(logDEBUG1) << "Receiver Type: " << retval;
}
return retval;
}
slsDetectorDefs::detectorType Module::getDetectorType() const { slsDetectorDefs::detectorType Module::getDetectorType() const {
return shm()->myDetectorType; return shm()->myDetectorType;
} }
@ -1510,14 +1489,13 @@ uint32_t Module::clearBit(uint32_t addr, int n) {
} }
} }
std::string Module::setReceiverHostname(const std::string &receiverIP) { void Module::setReceiverHostname(const std::string &receiverIP) {
LOG(logDEBUG1) << "Setting up Receiver with " << receiverIP; LOG(logDEBUG1) << "Setting up Receiver with " << receiverIP;
// recieverIP is none // recieverIP is none
if (receiverIP == "none") { if (receiverIP == "none") {
memset(shm()->rxHostname, 0, MAX_STR_LENGTH); memset(shm()->rxHostname, 0, MAX_STR_LENGTH);
sls::strcpy_safe(shm()->rxHostname, "none"); sls::strcpy_safe(shm()->rxHostname, "none");
shm()->useReceiverFlag = false; shm()->useReceiverFlag = false;
return std::string(shm()->rxHostname);
} }
// stop acquisition if running // stop acquisition if running
@ -1537,81 +1515,78 @@ std::string Module::setReceiverHostname(const std::string &receiverIP) {
shm()->useReceiverFlag = true; shm()->useReceiverFlag = true;
checkReceiverVersionCompatibility(); checkReceiverVersionCompatibility();
if (setDetectorType(shm()->myDetectorType) != GENERIC) { // populate parameters from detector
sendMultiDetectorSize(); rxParameters retval;
setDetectorId(); sendToDetector(F_GET_RECEIVER_PARAMETERS, nullptr, retval);
setDetectorHostname();
// setup udp
updateRxDestinationUDPIP();
setDestinationUDPPort(getDestinationUDPPort());
if (shm()->myDetectorType == JUNGFRAU || shm()->myDetectorType == EIGER ) {
setDestinationUDPPort2(getDestinationUDPPort2());
}
if (shm()->myDetectorType == JUNGFRAU) {
updateRxDestinationUDPIP2();
setNumberofUDPInterfaces(getNumberofUDPInterfaces());
}
LOG(logDEBUG1) << printReceiverConfiguration();
setReceiverUDPSocketBufferSize(0); // populate from shared memory
setNumberOfFrames(getNumberOfFrames()); retval.detType = shm()->myDetectorType;
setNumberOfTriggers(getNumberOfTriggers()); retval.multiSize.x = shm()->multiSize.x;
setTimingMode(getTimingMode()); retval.multiSize.y = shm()->multiSize.y;
setExptime(getExptime()); retval.detId = detId;
setPeriod(getPeriod()); memset(retval.hostname, 0, sizeof(retval.hostname));
strcpy_safe(retval.hostname, shm()->hostname);
// detector specific LOG(logDEBUG1)
switch (shm()->myDetectorType) { << "detType:" << retval.detType << std::endl
<< "multiSize.x:" << retval.multiSize.x << std::endl
<< "multiSize.y:" << retval.multiSize.y << std::endl
<< "detId:" << retval.detId << std::endl
<< "hostname:" << retval.hostname << std::endl
<< "udpInterfaces:" << retval.udpInterfaces << std::endl
<< "udp_dstport:" << retval.udp_dstport << std::endl
<< "udp_dstip:" << sls::IpAddr(retval.udp_dstip) << std::endl
<< "udp_dstmac:" << sls::MacAddr(retval.udp_dstmac) << std::endl
<< "udp_dstport2:" << retval.udp_dstport2 << std::endl
<< "udp_dstip2:" << sls::IpAddr(retval.udp_dstip2) << std::endl
<< "udp_dstmac2:" << sls::MacAddr(retval.udp_dstmac2) << std::endl
<< "frames:" << retval.frames << std::endl
<< "triggers:" << retval.triggers << std::endl
<< "bursts:" << retval.bursts << std::endl
<< "analogSamples:" << retval.analogSamples << std::endl
<< "digitalSamples:" << retval.digitalSamples << std::endl
<< "expTimeNs:" << retval.expTimeNs << std::endl
<< "periodNs:" << retval.periodNs << std::endl
<< "subExpTimeNs:" << retval.subExpTimeNs << std::endl
<< "subDeadTimeNs:" << retval.subDeadTimeNs << std::endl
<< "activate:" << retval.activate << std::endl
<< "quad:" << retval.quad << std::endl
<< "dynamicRange:" << retval.dynamicRange << std::endl
<< "timMode:" << retval.timMode << std::endl
<< "tenGiga:" << retval.tenGiga << std::endl
<< "roMode:" << retval.roMode << std::endl
<< "adcMask:" << retval.adcMask << std::endl
<< "adc10gMask:" << retval.adc10gMask << std::endl
<< "roi.xmin:" << retval.roi.xmin << std::endl
<< "roi.xmax:" << retval.roi.xmax << std::endl
<< "countermask:" << retval.countermask << std::endl
<< "burstType:" << retval.burstType << std::endl;
case EIGER:
setSubExptime(getSubExptime());
setSubDeadTime(getSubDeadTime());
setDynamicRange(getDynamicRange());
activate(-1);
enableTenGigabitEthernet(-1);
setQuad(getQuad());
break;
case CHIPTESTBOARD: sls::MacAddr retvals[2];
setNumberOfAnalogSamples(getNumberOfAnalogSamples()); sendToReceiver(F_SETUP_RECEIVER, retval, retvals);
setNumberOfDigitalSamples(getNumberOfDigitalSamples()); // update detectors with dest mac
enableTenGigabitEthernet(-1); if (retval.udp_dstmac == 0 && retvals[0] != 0) {
setReadoutMode(getReadoutMode()); LOG(logINFO) << "Setting destination udp mac of "
setADCEnableMask(getADCEnableMask()); "detector " << detId << " to " << retvals[0];
setTenGigaADCEnableMask(getTenGigaADCEnableMask()); sendToDetector(F_SET_DEST_UDP_MAC, retvals[0], nullptr);
break; }
if (retval.udp_dstmac2 == 0 && retvals[1] != 0) {
case MOENCH: LOG(logINFO) << "Setting destination udp mac2 of "
setNumberOfAnalogSamples(getNumberOfAnalogSamples()); "detector " << detId << " to " << retvals[1];
enableTenGigabitEthernet(-1); sendToDetector(F_SET_DEST_UDP_MAC2, retvals[1], nullptr);
setADCEnableMask(getADCEnableMask());
setTenGigaADCEnableMask(getTenGigaADCEnableMask());
break;
case GOTTHARD:
setROI(getROI());
break;
case MYTHEN3:
sendNumberofCounterstoReceiver(getCounterMask());
setDynamicRange(getDynamicRange());
break;
case GOTTHARD2:
setNumberOfBursts(getNumberOfBursts());
setBurstMode(getBurstMode());
break;
default:
break;
}
// to use rx_hostname if empty and also update client zmqip
updateReceiverStreamingIP();
} }
return std::string(shm()->rxHostname); // update numinterfaces if different
shm()->numUDPInterfaces = retval.udpInterfaces;
if (shm()->myDetectorType == MOENCH) {
setAdditionalJsonParameter("adcmask_1g", std::to_string(retval.adcMask));
setAdditionalJsonParameter("adcmask_10g", std::to_string(retval.adc10gMask));
}
// to use rx_hostname if empty and also update client zmqip
updateReceiverStreamingIP();
} }
std::string Module::getReceiverHostname() const { std::string Module::getReceiverHostname() const {
@ -1706,13 +1681,6 @@ sls::IpAddr Module::getDestinationUDPIP() {
return retval; return retval;
} }
void Module::updateRxDestinationUDPIP() {
auto ip = getDestinationUDPIP();
if (ip != 0) {
setDestinationUDPIP(ip);
}
}
void Module::setDestinationUDPIP2(const IpAddr ip) { void Module::setDestinationUDPIP2(const IpAddr ip) {
LOG(logDEBUG1) << "Setting destination udp ip2 to " << ip; LOG(logDEBUG1) << "Setting destination udp ip2 to " << ip;
if (ip == 0) { if (ip == 0) {
@ -1736,13 +1704,6 @@ sls::IpAddr Module::getDestinationUDPIP2() {
return retval; return retval;
} }
void Module::updateRxDestinationUDPIP2() {
auto ip = getDestinationUDPIP2();
if (ip != 0) {
setDestinationUDPIP2(ip);
}
}
void Module::setDestinationUDPMAC(const MacAddr mac) { void Module::setDestinationUDPMAC(const MacAddr mac) {
LOG(logDEBUG1) << "Setting destination udp mac to " << mac; LOG(logDEBUG1) << "Setting destination udp mac to " << mac;
if (mac == 0) { if (mac == 0) {
@ -2887,38 +2848,6 @@ void Module::execReceiverCommand(const std::string &cmd) {
} }
} }
void Module::sendMultiDetectorSize() {
int args[]{shm()->multiSize.x, shm()->multiSize.y};
int retval = -1;
LOG(logDEBUG1) << "Sending multi detector size to receiver: ("
<< shm()->multiSize.x << "," << shm()->multiSize.y
<< ")";
if (shm()->useReceiverFlag) {
sendToReceiver(F_SEND_RECEIVER_MULTIDETSIZE, args, retval);
LOG(logDEBUG1) << "Receiver multi size returned: " << retval;
}
}
void Module::setDetectorId() {
LOG(logDEBUG1) << "Sending detector pos id to receiver: " << detId;
if (shm()->useReceiverFlag) {
int retval = -1;
sendToReceiver(F_SEND_RECEIVER_DETPOSID, detId, retval);
LOG(logDEBUG1) << "Receiver Position Id returned: " << retval;
}
}
void Module::setDetectorHostname() {
char args[MAX_STR_LENGTH]{};
char retvals[MAX_STR_LENGTH]{};
sls::strcpy_safe(args, shm()->hostname);
LOG(logDEBUG1) << "Sending detector hostname to receiver: " << args;
if (shm()->useReceiverFlag) {
sendToReceiver(F_SEND_RECEIVER_DETHOSTNAME, args, retvals);
LOG(logDEBUG1) << "Receiver set detector hostname: " << retvals;
}
}
std::string Module::getFilePath() { std::string Module::getFilePath() {
if (!shm()->useReceiverFlag) { if (!shm()->useReceiverFlag) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file path)"); throw RuntimeError("Set rx_hostname first to use receiver parameters (file path)");
@ -3216,6 +3145,10 @@ bool Module::enableTenGigabitEthernet(int value) {
int retval = -1; int retval = -1;
LOG(logDEBUG1) << "Enabling / Disabling 10Gbe: " << value; LOG(logDEBUG1) << "Enabling / Disabling 10Gbe: " << value;
sendToDetector(F_ENABLE_TEN_GIGA, value, retval); sendToDetector(F_ENABLE_TEN_GIGA, value, retval);
if (value != -1) {
int stopRetval = -1;
sendToDetectorStop(F_ENABLE_TEN_GIGA, value, stopRetval);
}
LOG(logDEBUG1) << "10Gbe: " << retval; LOG(logDEBUG1) << "10Gbe: " << retval;
value = retval; value = retval;
if (shm()->useReceiverFlag && value != -1) { if (shm()->useReceiverFlag && value != -1) {
@ -3459,10 +3392,6 @@ void Module::setPipeline(int clkIndex, int value) {
void Module::setCounterMask(uint32_t countermask) { void Module::setCounterMask(uint32_t countermask) {
LOG(logDEBUG1) << "Setting Counter mask to " << countermask; LOG(logDEBUG1) << "Setting Counter mask to " << countermask;
sendToDetector(F_SET_COUNTER_MASK, countermask, nullptr); sendToDetector(F_SET_COUNTER_MASK, countermask, nullptr);
sendNumberofCounterstoReceiver(countermask);
}
void Module::sendNumberofCounterstoReceiver(uint32_t countermask) {
if (shm()->useReceiverFlag) { if (shm()->useReceiverFlag) {
int ncounters = __builtin_popcount(countermask); int ncounters = __builtin_popcount(countermask);
LOG(logDEBUG1) << "Sending Reciver #counters: " << ncounters; LOG(logDEBUG1) << "Sending Reciver #counters: " << ncounters;

View File

@ -176,13 +176,6 @@ class Module : public virtual slsDetectorDefs {
*/ */
detectorType getDetectorType() const; detectorType getDetectorType() const;
/**
* Gets detector type from detector and set it in receiver
* @param type the detector type
* @returns detector type in receiver
*/
int setDetectorType(detectorType type = GET_DETECTOR_TYPE);
/** /**
* Update total number of channels (chiptestboard or moench) * Update total number of channels (chiptestboard or moench)
* from the detector server * from the detector server
@ -647,10 +640,10 @@ class Module : public virtual slsDetectorDefs {
* significant for the receiver Also configures the detector to the receiver * significant for the receiver Also configures the detector to the receiver
* as UDP destination * as UDP destination
* @param receiver receiver hostname or IP address * @param receiver receiver hostname or IP address
* @returns the receiver IP address from shared memory
*/ */
std::string setReceiverHostname(const std::string &receiver); void setReceiverHostname(const std::string &receiver);
void test();
/** /**
* Returns the receiver IP address\sa sharedSlsDetector * Returns the receiver IP address\sa sharedSlsDetector
* @returns the receiver IP address * @returns the receiver IP address
@ -721,13 +714,6 @@ class Module : public virtual slsDetectorDefs {
*/ */
sls::IpAddr getDestinationUDPIP(); sls::IpAddr getDestinationUDPIP();
/**
* Gets destination udp ip from detector,
* if 0, it converts rx_hostname to ip and
* updates both detector and receiver
*/
void updateRxDestinationUDPIP();
/** /**
* Validates the format of the receiver UDP IP address (bottom half) and * Validates the format of the receiver UDP IP address (bottom half) and
* sets it(Jungfrau only) * sets it(Jungfrau only)
@ -743,13 +729,6 @@ class Module : public virtual slsDetectorDefs {
*/ */
sls::IpAddr getDestinationUDPIP2(); sls::IpAddr getDestinationUDPIP2();
/**
* Gets destination udp ip2 from detector,
* if 0, it converts rx_hostname to ip and
* updates both detector and receiver
*/
void updateRxDestinationUDPIP2();
/** /**
* Validates the format of the receiver UDP MAC address and sets it * Validates the format of the receiver UDP MAC address and sets it
* @param mac receiver UDP MAC address * @param mac receiver UDP MAC address
@ -1344,26 +1323,6 @@ class Module : public virtual slsDetectorDefs {
*/ */
void execReceiverCommand(const std::string &cmd); void execReceiverCommand(const std::string &cmd);
/**
* Send the multi detector size to the detector
* @param detx number of detectors in x dir
* @param dety number of detectors in y dir
*/
void sendMultiDetectorSize();
/**
* Send the detector pos id to the receiver
* for various file naming conventions for multi detectors in receiver
*/
void setDetectorId();
/**
* Send the detector host name to the receiver
* for various handshaking required with the detector
*/
void setDetectorHostname();
std::string getFilePath(); std::string getFilePath();
void setFilePath(const std::string &path); void setFilePath(const std::string &path);
std::string getFileName(); std::string getFileName();
@ -1606,9 +1565,6 @@ class Module : public virtual slsDetectorDefs {
/** [Mythen3] */ /** [Mythen3] */
void setCounterMask(uint32_t countermask); void setCounterMask(uint32_t countermask);
/** [Mythen3] */
void sendNumberofCounterstoReceiver(uint32_t countermask);
/** [Mythen3] */ /** [Mythen3] */
uint32_t getCounterMask(); uint32_t getCounterMask();

View File

@ -29,8 +29,8 @@ add_library(slsReceiverShared SHARED
${HEADERS} ${HEADERS}
) )
check_ipo_supported(RESULT result)
if(result) if(SLS_LTO_AVAILABLE)
set_property(TARGET slsReceiverShared PROPERTY INTERPROCEDURAL_OPTIMIZATION True) set_property(TARGET slsReceiverShared PROPERTY INTERPROCEDURAL_OPTIMIZATION True)
endif() endif()

View File

@ -24,19 +24,16 @@ using Interface = sls::ServerInterface;
ClientInterface::~ClientInterface() { ClientInterface::~ClientInterface() {
killTcpThread = true; killTcpThread = true;
// shut down tcp sockets LOG(logINFO) << "Shutting down TCP Socket on port " << portNumber;
if (server.get() != nullptr) { server.shutdown();
LOG(logINFO) << "Shutting down TCP Socket on port " << portNumber; LOG(logDEBUG) << "TCP Socket closed on port " << portNumber;
server->shutDownSocket();
LOG(logDEBUG) << "TCP Socket closed on port " << portNumber;
}
// shut down tcp thread
tcpThread->join(); tcpThread->join();
} }
ClientInterface::ClientInterface(int portNumber) ClientInterface::ClientInterface(int portNumber)
: myDetectorType(GOTTHARD), : myDetectorType(GOTTHARD),
portNumber(portNumber > 0 ? portNumber : DEFAULT_PORTNO + 2) { portNumber(portNumber > 0 ? portNumber : DEFAULT_PORTNO + 2),
server(portNumber) {
functionTable(); functionTable();
// start up tcp thread // start up tcp thread
tcpThread = sls::make_unique<std::thread>(&ClientInterface::startTCPServer, this); tcpThread = sls::make_unique<std::thread>(&ClientInterface::startTCPServer, this);
@ -73,11 +70,11 @@ void ClientInterface::startTCPServer() {
LOG(logINFOBLUE) << "Created [ TCP server Tid: " << syscall(SYS_gettid) << "]"; LOG(logINFOBLUE) << "Created [ TCP server Tid: " << syscall(SYS_gettid) << "]";
LOG(logINFO) << "SLS Receiver starting TCP Server on port " LOG(logINFO) << "SLS Receiver starting TCP Server on port "
<< portNumber << '\n'; << portNumber << '\n';
server = sls::make_unique<sls::ServerSocket>(portNumber); // server = sls::make_unique<sls::ServerSocket>(portNumber);
while (true) { while (!killTcpThread) {
LOG(logDEBUG1) << "Start accept loop"; LOG(logDEBUG1) << "Start accept loop";
try { try {
auto socket = server->accept(); auto socket = server.accept();
try { try {
verifyLock(); verifyLock();
ret = decodeFunction(socket); ret = decodeFunction(socket);
@ -95,10 +92,6 @@ void ClientInterface::startTCPServer() {
} catch (const RuntimeError &e) { } catch (const RuntimeError &e) {
LOG(logERROR) << "Accept failed"; LOG(logERROR) << "Accept failed";
} }
// destructor to kill this thread
if (killTcpThread) {
break;
}
} }
if (receiver) { if (receiver) {
@ -115,8 +108,7 @@ int ClientInterface::functionTable(){
flist[F_GET_LAST_RECEIVER_CLIENT_IP] = &ClientInterface::get_last_client_ip; flist[F_GET_LAST_RECEIVER_CLIENT_IP] = &ClientInterface::get_last_client_ip;
flist[F_SET_RECEIVER_PORT] = &ClientInterface::set_port; flist[F_SET_RECEIVER_PORT] = &ClientInterface::set_port;
flist[F_GET_RECEIVER_VERSION] = &ClientInterface::get_version; flist[F_GET_RECEIVER_VERSION] = &ClientInterface::get_version;
flist[F_GET_RECEIVER_TYPE] = &ClientInterface::set_detector_type; flist[F_SETUP_RECEIVER] = &ClientInterface::setup_receiver;
flist[F_SEND_RECEIVER_DETHOSTNAME] = &ClientInterface::set_detector_hostname;
flist[F_RECEIVER_SET_ROI] = &ClientInterface::set_roi; flist[F_RECEIVER_SET_ROI] = &ClientInterface::set_roi;
flist[F_RECEIVER_SET_NUM_FRAMES] = &ClientInterface::set_num_frames; flist[F_RECEIVER_SET_NUM_FRAMES] = &ClientInterface::set_num_frames;
flist[F_SET_RECEIVER_NUM_TRIGGERS] = &ClientInterface::set_num_triggers; flist[F_SET_RECEIVER_NUM_TRIGGERS] = &ClientInterface::set_num_triggers;
@ -160,8 +152,6 @@ int ClientInterface::functionTable(){
flist[F_SET_FLIPPED_DATA_RECEIVER] = &ClientInterface::set_flipped_data; flist[F_SET_FLIPPED_DATA_RECEIVER] = &ClientInterface::set_flipped_data;
flist[F_SET_RECEIVER_FILE_FORMAT] = &ClientInterface::set_file_format; flist[F_SET_RECEIVER_FILE_FORMAT] = &ClientInterface::set_file_format;
flist[F_GET_RECEIVER_FILE_FORMAT] = &ClientInterface::get_file_format; flist[F_GET_RECEIVER_FILE_FORMAT] = &ClientInterface::get_file_format;
flist[F_SEND_RECEIVER_DETPOSID] = &ClientInterface::set_detector_posid;
flist[F_SEND_RECEIVER_MULTIDETSIZE] = &ClientInterface::set_multi_detector_size;
flist[F_SET_RECEIVER_STREAMING_PORT] = &ClientInterface::set_streaming_port; flist[F_SET_RECEIVER_STREAMING_PORT] = &ClientInterface::set_streaming_port;
flist[F_GET_RECEIVER_STREAMING_PORT] = &ClientInterface::get_streaming_port; flist[F_GET_RECEIVER_STREAMING_PORT] = &ClientInterface::get_streaming_port;
flist[F_SET_RECEIVER_STREAMING_SRC_IP] = &ClientInterface::set_streaming_source_ip; flist[F_SET_RECEIVER_STREAMING_SRC_IP] = &ClientInterface::set_streaming_source_ip;
@ -256,7 +246,7 @@ void ClientInterface::validate(T arg, T retval, const std::string& modename,
} }
void ClientInterface::verifyLock() { void ClientInterface::verifyLock() {
if (lockedByClient && server->getThisClient() != server->getLockedBy()) { if (lockedByClient && server.getThisClient() != server.getLockedBy()) {
throw sls::SocketError("Receiver locked\n"); throw sls::SocketError("Receiver locked\n");
} }
} }
@ -302,10 +292,10 @@ int ClientInterface::lock_receiver(Interface &socket) {
auto lock = socket.Receive<int>(); auto lock = socket.Receive<int>();
LOG(logDEBUG1) << "Locking Server to " << lock; LOG(logDEBUG1) << "Locking Server to " << lock;
if (lock >= 0) { if (lock >= 0) {
if (!lockedByClient || (server->getLockedBy() == server->getThisClient())) { if (!lockedByClient || (server.getLockedBy() == server.getThisClient())) {
lockedByClient = lock; lockedByClient = lock;
lock ? server->setLockedBy(server->getThisClient()) lock ? server.setLockedBy(server.getThisClient())
: server->setLockedBy(sls::IpAddr{}); : server.setLockedBy(sls::IpAddr{});
} else { } else {
throw RuntimeError("Receiver locked\n"); throw RuntimeError("Receiver locked\n");
} }
@ -314,7 +304,7 @@ int ClientInterface::lock_receiver(Interface &socket) {
} }
int ClientInterface::get_last_client_ip(Interface &socket) { int ClientInterface::get_last_client_ip(Interface &socket) {
return socket.sendResult(server->getLastClient()); return socket.sendResult(server.getLastClient());
} }
int ClientInterface::set_port(Interface &socket) { int ClientInterface::set_port(Interface &socket) {
@ -324,9 +314,9 @@ int ClientInterface::set_port(Interface &socket) {
" is too low (<1024)"); " is too low (<1024)");
LOG(logINFO) << "TCP port set to " << p_number << std::endl; LOG(logINFO) << "TCP port set to " << p_number << std::endl;
auto new_server = sls::make_unique<sls::ServerSocket>(p_number); sls::ServerSocket new_server(p_number);
new_server->setLockedBy(server->getLockedBy()); new_server.setLockedBy(server.getLockedBy());
new_server->setLastClient(server->getThisClient()); new_server.setLastClient(server.getThisClient());
server = std::move(new_server); server = std::move(new_server);
socket.sendResult(p_number); socket.sendResult(p_number);
return OK; return OK;
@ -336,69 +326,210 @@ int ClientInterface::get_version(Interface &socket) {
return socket.sendResult(getReceiverVersion()); return socket.sendResult(getReceiverVersion());
} }
int ClientInterface::set_detector_type(Interface &socket) { int ClientInterface::setup_receiver(Interface &socket) {
auto arg = socket.Receive<detectorType>(); auto arg = socket.Receive<rxParameters>();
// set LOG(logDEBUG1)
if (arg >= 0) { << "detType:" << arg.detType << std::endl
// if object exists, verify unlocked and idle, else only verify lock << "multiSize.x:" << arg.multiSize.x << std::endl
// (connecting first time) << "multiSize.y:" << arg.multiSize.y << std::endl
if (receiver != nullptr) { << "detId:" << arg.detId << std::endl
verifyIdle(socket); << "hostname:" << arg.hostname << std::endl
} << "udpInterfaces:" << arg.udpInterfaces << std::endl
switch (arg) { << "udp_dstport:" << arg.udp_dstport << std::endl
case GOTTHARD: << "udp_dstip:" << sls::IpAddr(arg.udp_dstip) << std::endl
case EIGER: << "udp_dstmac:" << sls::MacAddr(arg.udp_dstmac) << std::endl
case CHIPTESTBOARD: << "udp_dstport2:" << arg.udp_dstport2 << std::endl
case MOENCH: << "udp_dstip2:" << sls::IpAddr(arg.udp_dstip2) << std::endl
case JUNGFRAU: << "udp_dstmac2:" << sls::MacAddr(arg.udp_dstmac2) << std::endl
case MYTHEN3: << "frames:" << arg.frames << std::endl
case GOTTHARD2: << "triggers:" << arg.triggers << std::endl
break; << "bursts:" << arg.bursts << std::endl
default: << "analogSamples:" << arg.analogSamples << std::endl
throw RuntimeError("Unknown detector type: " + std::to_string(arg)); << "digitalSamples:" << arg.digitalSamples << std::endl
break; << "expTimeNs:" << arg.expTimeNs << std::endl
} << "periodNs:" << arg.periodNs << std::endl
<< "subExpTimeNs:" << arg.subExpTimeNs << std::endl
<< "subDeadTimeNs:" << arg.subDeadTimeNs << std::endl
<< "activate:" << arg.activate << std::endl
<< "quad:" << arg.quad << std::endl
<< "dynamicRange:" << arg.dynamicRange << std::endl
<< "timMode:" << arg.timMode << std::endl
<< "tenGiga:" << arg.tenGiga << std::endl
<< "roMode:" << arg.roMode << std::endl
<< "adcMask:" << arg.adcMask << std::endl
<< "adc10gMask:" << arg.adc10gMask << std::endl
<< "roi.xmin:" << arg.roi.xmin << std::endl
<< "roi.xmax:" << arg.roi.xmax << std::endl
<< "countermask:" << arg.countermask << std::endl
<< "burstType:" << arg.burstType << std::endl;
try {
myDetectorType = GENERIC;
receiver = sls::make_unique<Implementation>(arg);
myDetectorType = arg;
} catch (...) {
throw RuntimeError("Could not set detector type");
}
// callbacks after (in setdetectortype, the object is reinitialized) // if object exists, verify unlocked and idle, else only verify lock
if (startAcquisitionCallBack != nullptr) // (connecting first time)
impl()->registerCallBackStartAcquisition(startAcquisitionCallBack, if (receiver != nullptr) {
pStartAcquisition); verifyIdle(socket);
if (acquisitionFinishedCallBack != nullptr)
impl()->registerCallBackAcquisitionFinished(
acquisitionFinishedCallBack, pAcquisitionFinished);
if (rawDataReadyCallBack != nullptr)
impl()->registerCallBackRawDataReady(rawDataReadyCallBack,
pRawDataReady);
if (rawDataModifyReadyCallBack != nullptr)
impl()->registerCallBackRawDataModifyReady(
rawDataModifyReadyCallBack, pRawDataReady);
} }
return socket.sendResult(myDetectorType);
// basic setup
setDetectorType(arg.detType);
{
int msize[2] = {arg.multiSize.x, arg.multiSize.y};
impl()->setMultiDetectorSize(msize);
}
impl()->setDetectorPositionId(arg.detId);
impl()->setDetectorHostname(arg.hostname);
// udp setup
sls::MacAddr retvals[2];
if (arg.udp_dstmac == 0 && arg.udp_dstip != 0) {
retvals[0] = setUdpIp(sls::IpAddr(arg.udp_dstip));
}
if (arg.udp_dstmac2 == 0 && arg.udp_dstip2 != 0) {
retvals[1] = setUdpIp2(sls::IpAddr(arg.udp_dstip2));
}
impl()->setUDPPortNumber(arg.udp_dstport);
impl()->setUDPPortNumber2(arg.udp_dstport2);
if (myDetectorType == JUNGFRAU) {
try {
impl()->setNumberofUDPInterfaces(arg.udpInterfaces);
} catch(const RuntimeError &e) {
throw RuntimeError("Failed to set number of interfaces to " +
std::to_string(arg.udpInterfaces));
}
}
impl()->setUDPSocketBufferSize(0);
// acquisition parameters
impl()->setNumberOfFrames(arg.frames);
impl()->setNumberOfTriggers(arg.triggers);
if (myDetectorType == GOTTHARD) {
impl()->setNumberOfBursts(arg.bursts);
}
if (myDetectorType == MOENCH || myDetectorType == CHIPTESTBOARD) {
try {
impl()->setNumberofAnalogSamples(arg.analogSamples);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set num analog samples to " +
std::to_string(arg.analogSamples) +
" due to fifo structure memory allocation.");
}
}
if (myDetectorType == CHIPTESTBOARD) {
try {
impl()->setNumberofDigitalSamples(arg.digitalSamples);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set num digital samples to "
+ std::to_string(arg.analogSamples) +
" due to fifo structure memory allocation.");
}
}
impl()->setAcquisitionTime(arg.expTimeNs);
impl()->setAcquisitionPeriod(arg.periodNs);
if (myDetectorType == EIGER) {
impl()->setSubExpTime(arg.subExpTimeNs);
impl()->setSubPeriod(arg.subExpTimeNs + arg.subDeadTimeNs);
impl()->setActivate(static_cast<bool>(arg.activate));
try {
impl()->setQuad(arg.quad == 0 ? false : true);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set quad to " +
std::to_string(arg.quad) +
" due to fifo strucutre memory allocation");
}
}
if (myDetectorType == EIGER || myDetectorType == MYTHEN3) {
try {
impl()->setDynamicRange(arg.dynamicRange);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set dynamic range. Could not allocate "
"memory for fifo or could not start listening/writing threads");
}
}
impl()->setTimingMode(arg.timMode);
if (myDetectorType == EIGER || myDetectorType == MOENCH ||
myDetectorType == CHIPTESTBOARD) {
try {
impl()->setTenGigaEnable(arg.tenGiga);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set 10GbE.");
}
}
if (myDetectorType == CHIPTESTBOARD) {
try {
impl()->setReadoutMode(arg.roMode);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set read out mode "
"due to fifo memory allocation.");
}
}
if (myDetectorType == CHIPTESTBOARD || myDetectorType == MOENCH) {
try {
impl()->setADCEnableMask(arg.adcMask);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set adc enable mask "
"due to fifo memory allcoation");
}
try {
impl()->setTenGigaADCEnableMask(arg.adc10gMask);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set 10Gb adc enable mask "
"due to fifo memory allcoation");
}
}
if (myDetectorType == GOTTHARD) {
try {
impl()->setROI(arg.roi);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set ROI");
}
}
if (myDetectorType == MYTHEN3) {
int ncounters = __builtin_popcount(arg.countermask);
impl()->setNumberofCounters(ncounters);
}
if (myDetectorType == GOTTHARD) {
impl()->setBurstMode(arg.burstType);
}
return socket.sendResult(retvals);
} }
int ClientInterface::set_detector_hostname(Interface &socket) { void ClientInterface::setDetectorType(detectorType arg) {
char hostname[MAX_STR_LENGTH]{}; switch (arg) {
char retval[MAX_STR_LENGTH]{}; case GOTTHARD:
socket.Receive(hostname); case EIGER:
case CHIPTESTBOARD:
case MOENCH:
case JUNGFRAU:
case MYTHEN3:
case GOTTHARD2:
break;
default:
throw RuntimeError("Unknown detector type: " + std::to_string(arg));
break;
}
if (strlen(hostname) != 0) { try {
verifyIdle(socket); myDetectorType = GENERIC;
impl()->setDetectorHostname(hostname); receiver = sls::make_unique<Implementation>(arg);
myDetectorType = arg;
} catch (...) {
throw RuntimeError("Could not set detector type");
} }
auto s = impl()->getDetectorHostname();
sls::strcpy_safe(retval, s.c_str()); // callbacks after (in setdetectortype, the object is reinitialized)
if (s.empty()) { if (startAcquisitionCallBack != nullptr)
throw RuntimeError("Hostname not set"); impl()->registerCallBackStartAcquisition(startAcquisitionCallBack,
} pStartAcquisition);
return socket.sendResult(retval); if (acquisitionFinishedCallBack != nullptr)
impl()->registerCallBackAcquisitionFinished(
acquisitionFinishedCallBack, pAcquisitionFinished);
if (rawDataReadyCallBack != nullptr)
impl()->registerCallBackRawDataReady(rawDataReadyCallBack,
pRawDataReady);
if (rawDataModifyReadyCallBack != nullptr)
impl()->registerCallBackRawDataModifyReady(
rawDataModifyReadyCallBack, pRawDataReady);
} }
int ClientInterface::set_roi(Interface &socket) { int ClientInterface::set_roi(Interface &socket) {
@ -514,7 +645,6 @@ int ClientInterface::set_num_analog_samples(Interface &socket) {
} catch(const RuntimeError &e) { } catch(const RuntimeError &e) {
throw RuntimeError("Could not set num analog samples to " + std::to_string(value) + " due to fifo structure memory allocation."); throw RuntimeError("Could not set num analog samples to " + std::to_string(value) + " due to fifo structure memory allocation.");
} }
return socket.Send(OK); return socket.Send(OK);
} }
@ -831,7 +961,7 @@ int ClientInterface::enable_tengiga(Interface &socket) {
try { try {
impl()->setTenGigaEnable(val); impl()->setTenGigaEnable(val);
} catch(const RuntimeError &e) { } catch(const RuntimeError &e) {
throw RuntimeError("Could not set 10GbE."); throw RuntimeError("Could not set 10GbE." );
} }
} }
int retval = impl()->getTenGigaEnable(); int retval = impl()->getTenGigaEnable();
@ -952,34 +1082,6 @@ int ClientInterface::get_file_format(Interface &socket) {
return socket.sendResult(retval); return socket.sendResult(retval);
} }
int ClientInterface::set_detector_posid(Interface &socket) {
auto arg = socket.Receive<int>();
if (arg >= 0) {
verifyIdle(socket);
LOG(logDEBUG1) << "Setting detector position id:" << arg;
impl()->setDetectorPositionId(arg);
}
auto retval = impl()->getDetectorPositionId();
validate(arg, retval, "set detector position id", DEC);
LOG(logDEBUG1) << "Position Id:" << retval;
return socket.sendResult(retval);
}
int ClientInterface::set_multi_detector_size(Interface &socket) {
int arg[]{-1, -1};
socket.Receive(arg);
if ((arg[0] > 0) && (arg[1] > 0)) {
verifyIdle(socket);
LOG(logDEBUG1)
<< "Setting multi detector size:" << arg[0] << "," << arg[1];
impl()->setMultiDetectorSize(arg);
}
int *temp = impl()->getMultiDetectorSize(); // TODO! return by value!
int retval = temp[0] * temp[1];
LOG(logDEBUG1) << "Multi Detector Size:" << retval;
return socket.sendResult(retval);
}
int ClientInterface::set_streaming_port(Interface &socket) { int ClientInterface::set_streaming_port(Interface &socket) {
auto port = socket.Receive<int>(); auto port = socket.Receive<int>();
if (port < 0) { if (port < 0) {
@ -1363,11 +1465,7 @@ int ClientInterface::set_read_n_lines(Interface &socket) {
return socket.Send(OK); return socket.Send(OK);
} }
sls::MacAddr ClientInterface::setUdpIp(sls::IpAddr arg) {
int ClientInterface::set_udp_ip(Interface &socket) {
auto arg = socket.Receive<sls::IpAddr>();
verifyIdle(socket);
LOG(logINFO) << "Received UDP IP: " << arg;
// getting eth // getting eth
std::string eth = sls::IpToInterfaceName(arg.str()); std::string eth = sls::IpToInterfaceName(arg.str());
if (eth == "none") { if (eth == "none") {
@ -1386,18 +1484,19 @@ int ClientInterface::set_udp_ip(Interface &socket) {
if (retval == 0) { if (retval == 0) {
throw RuntimeError("Failed to get udp mac adddress to listen to (eth:" + eth + ", ip:" + arg.str() + ")\n"); throw RuntimeError("Failed to get udp mac adddress to listen to (eth:" + eth + ", ip:" + arg.str() + ")\n");
} }
return retval;
}
int ClientInterface::set_udp_ip(Interface &socket) {
auto arg = socket.Receive<sls::IpAddr>();
verifyIdle(socket);
LOG(logINFO) << "Received UDP IP: " << arg;
auto retval = setUdpIp(arg);
LOG(logINFO) << "Receiver MAC Address: " << retval; LOG(logINFO) << "Receiver MAC Address: " << retval;
return socket.sendResult(retval); return socket.sendResult(retval);
} }
sls::MacAddr ClientInterface::setUdpIp2(sls::IpAddr arg) {
int ClientInterface::set_udp_ip2(Interface &socket) {
auto arg = socket.Receive<sls::IpAddr>();
verifyIdle(socket);
if (myDetectorType != JUNGFRAU) {
throw RuntimeError("UDP Destination IP2 not implemented for this detector");
}
LOG(logINFO) << "Received UDP IP2: " << arg;
// getting eth // getting eth
std::string eth = sls::IpToInterfaceName(arg.str()); std::string eth = sls::IpToInterfaceName(arg.str());
if (eth == "none") { if (eth == "none") {
@ -1414,6 +1513,17 @@ int ClientInterface::set_udp_ip2(Interface &socket) {
if (retval == 0) { if (retval == 0) {
throw RuntimeError("Failed to get udp mac adddress2 to listen to (eth:" + eth + ", ip:" + arg.str() + ")\n"); throw RuntimeError("Failed to get udp mac adddress2 to listen to (eth:" + eth + ", ip:" + arg.str() + ")\n");
} }
return retval;
}
int ClientInterface::set_udp_ip2(Interface &socket) {
auto arg = socket.Receive<sls::IpAddr>();
verifyIdle(socket);
if (myDetectorType != JUNGFRAU) {
throw RuntimeError("UDP Destination IP2 not implemented for this detector");
}
LOG(logINFO) << "Received UDP IP2: " << arg;
auto retval = setUdpIp2(arg);
LOG(logINFO) << "Receiver MAC Address2: " << retval; LOG(logINFO) << "Receiver MAC Address2: " << retval;
return socket.sendResult(retval); return socket.sendResult(retval);
} }

View File

@ -10,8 +10,18 @@ class ServerInterface;
#include <future> #include <future>
class ClientInterface : private virtual slsDetectorDefs { class ClientInterface : private virtual slsDetectorDefs {
private:
enum numberMode { DEC, HEX }; enum numberMode { DEC, HEX };
detectorType myDetectorType;
int portNumber{0};
sls::ServerSocket server;
std::unique_ptr<Implementation> receiver;
std::unique_ptr<std::thread> tcpThread;
int ret{OK};
int fnum{-1};
int lockedByClient{0};
std::atomic<bool> killTcpThread{false};
public: public:
virtual ~ClientInterface(); virtual ~ClientInterface();
@ -49,15 +59,14 @@ class ClientInterface : private virtual slsDetectorDefs {
void verifyLock(); void verifyLock();
void verifyIdle(sls::ServerInterface &socket); void verifyIdle(sls::ServerInterface &socket);
int exec_command(sls::ServerInterface &socket); int exec_command(sls::ServerInterface &socket);
int exit_server(sls::ServerInterface &socket); int exit_server(sls::ServerInterface &socket);
int lock_receiver(sls::ServerInterface &socket); int lock_receiver(sls::ServerInterface &socket);
int get_last_client_ip(sls::ServerInterface &socket); int get_last_client_ip(sls::ServerInterface &socket);
int set_port(sls::ServerInterface &socket); int set_port(sls::ServerInterface &socket);
int get_version(sls::ServerInterface &socket); int get_version(sls::ServerInterface &socket);
int set_detector_type(sls::ServerInterface &socket); int setup_receiver(sls::ServerInterface &socket);
int set_detector_hostname(sls::ServerInterface &socket); void setDetectorType(detectorType arg);
int set_roi(sls::ServerInterface &socket); int set_roi(sls::ServerInterface &socket);
int set_num_frames(sls::ServerInterface &socket); int set_num_frames(sls::ServerInterface &socket);
int set_num_triggers(sls::ServerInterface &socket); int set_num_triggers(sls::ServerInterface &socket);
@ -102,8 +111,6 @@ class ClientInterface : private virtual slsDetectorDefs {
int set_flipped_data(sls::ServerInterface &socket); int set_flipped_data(sls::ServerInterface &socket);
int set_file_format(sls::ServerInterface &socket); int set_file_format(sls::ServerInterface &socket);
int get_file_format(sls::ServerInterface &socket); int get_file_format(sls::ServerInterface &socket);
int set_detector_posid(sls::ServerInterface &socket);
int set_multi_detector_size(sls::ServerInterface &socket);
int set_streaming_port(sls::ServerInterface &socket); int set_streaming_port(sls::ServerInterface &socket);
int get_streaming_port(sls::ServerInterface &socket); int get_streaming_port(sls::ServerInterface &socket);
int set_streaming_source_ip(sls::ServerInterface &socket); int set_streaming_source_ip(sls::ServerInterface &socket);
@ -132,7 +139,9 @@ class ClientInterface : private virtual slsDetectorDefs {
int get_dbit_offset(sls::ServerInterface &socket); int get_dbit_offset(sls::ServerInterface &socket);
int set_quad_type(sls::ServerInterface &socket); int set_quad_type(sls::ServerInterface &socket);
int set_read_n_lines(sls::ServerInterface &socket); int set_read_n_lines(sls::ServerInterface &socket);
sls::MacAddr setUdpIp(sls::IpAddr arg);
int set_udp_ip(sls::ServerInterface &socket); int set_udp_ip(sls::ServerInterface &socket);
sls::MacAddr setUdpIp2(sls::IpAddr arg);
int set_udp_ip2(sls::ServerInterface &socket); int set_udp_ip2(sls::ServerInterface &socket);
int set_udp_port(sls::ServerInterface &socket); int set_udp_port(sls::ServerInterface &socket);
int set_udp_port2(sls::ServerInterface &socket); int set_udp_port2(sls::ServerInterface &socket);
@ -144,7 +153,6 @@ class ClientInterface : private virtual slsDetectorDefs {
int get_additional_json_parameter(sls::ServerInterface &socket); int get_additional_json_parameter(sls::ServerInterface &socket);
int get_progress(sls::ServerInterface &socket); int get_progress(sls::ServerInterface &socket);
Implementation *impl() { Implementation *impl() {
if (receiver != nullptr) { if (receiver != nullptr) {
return receiver.get(); return receiver.get();
@ -154,19 +162,9 @@ class ClientInterface : private virtual slsDetectorDefs {
} }
} }
detectorType myDetectorType;
std::unique_ptr<Implementation> receiver{nullptr};
int (ClientInterface::*flist[NUM_REC_FUNCTIONS])( int (ClientInterface::*flist[NUM_REC_FUNCTIONS])(
sls::ServerInterface &socket); sls::ServerInterface &socket);
int ret{OK};
int fnum{-1};
int lockedByClient{0};
int portNumber{0};
std::atomic<bool> killTcpThread{false};
std::unique_ptr<std::thread> tcpThread;
//***callback parameters*** //***callback parameters***
int (*startAcquisitionCallBack)(std::string, std::string, uint64_t, uint32_t, int (*startAcquisitionCallBack)(std::string, std::string, uint64_t, uint32_t,
@ -179,6 +177,6 @@ class ClientInterface : private virtual slsDetectorDefs {
void *) = nullptr; void *) = nullptr;
void *pRawDataReady{nullptr}; void *pRawDataReady{nullptr};
protected:
std::unique_ptr<sls::ServerSocket> server{nullptr};
}; };

View File

@ -29,13 +29,9 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
uint32_t* freq, uint32_t* timer, uint32_t* freq, uint32_t* timer,
bool* fp, bool* act, bool* depaden, bool* sm, bool* qe, bool* fp, bool* act, bool* depaden, bool* sm, bool* qe,
std::vector <int> * cdl, int* cdo, int* cad) : std::vector <int> * cdl, int* cdo, int* cad) :
ThreadObject(ind, TypeName), ThreadObject(ind, TypeName),
runningFlag(false),
generalData(nullptr),
fifo(f), fifo(f),
myDetectorType(dtype), myDetectorType(dtype),
file(nullptr),
dataStreamEnable(dsEnable), dataStreamEnable(dsEnable),
fileFormatType(ftype), fileFormatType(ftype),
fileWriteEnable(fwenable), fileWriteEnable(fwenable),
@ -43,7 +39,6 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
dynamicRange(dr), dynamicRange(dr),
streamingFrequency(freq), streamingFrequency(freq),
streamingTimerInMs(timer), streamingTimerInMs(timer),
currentFreqCount(0),
activated(act), activated(act),
deactivatedPaddingEnable(depaden), deactivatedPaddingEnable(depaden),
silentMode(sm), silentMode(sm),
@ -51,14 +46,7 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
framePadding(fp), framePadding(fp),
ctbDbitList(cdl), ctbDbitList(cdl),
ctbDbitOffset(cdo), ctbDbitOffset(cdo),
ctbAnalogDataBytes(cad), ctbAnalogDataBytes(cad)
startedFlag(false),
firstIndex(0),
numFramesCaught(0),
currentFrameIndex(0),
rawDataReadyCallBack(nullptr),
rawDataModifyReadyCallBack(nullptr),
pRawDataReady(nullptr)
{ {
LOG(logDEBUG) << "DataProcessor " << ind << " created"; LOG(logDEBUG) << "DataProcessor " << ind << " created";
memset((void*)&timerBegin, 0, sizeof(timespec)); memset((void*)&timerBegin, 0, sizeof(timespec));
@ -71,10 +59,6 @@ DataProcessor::~DataProcessor() {
/** getters */ /** getters */
bool DataProcessor::IsRunning() {
return runningFlag;
}
bool DataProcessor::GetStartedFlag(){ bool DataProcessor::GetStartedFlag(){
return startedFlag; return startedFlag;
} }
@ -91,24 +75,12 @@ uint64_t DataProcessor::GetProcessedIndex() {
return currentFrameIndex - firstIndex; return currentFrameIndex - firstIndex;
} }
/** setters */
void DataProcessor::StartRunning() {
runningFlag = true;
}
void DataProcessor::StopRunning() {
runningFlag = false;
}
void DataProcessor::SetFifo(Fifo* f) { void DataProcessor::SetFifo(Fifo* f) {
fifo = f; fifo = f;
} }
void DataProcessor::ResetParametersforNewAcquisition(){ void DataProcessor::ResetParametersforNewAcquisition(){
runningFlag = false; StopRunning();
startedFlag = false; startedFlag = false;
numFramesCaught = 0; numFramesCaught = 0;
firstIndex = 0; firstIndex = 0;

View File

@ -59,11 +59,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
//*** getters *** //*** getters ***
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
bool IsRunning() override;
/** /**
* Get acquisition started flag * Get acquisition started flag
@ -89,17 +84,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
*/ */
uint64_t GetProcessedIndex(); uint64_t GetProcessedIndex();
//*** setters ***
/**
* Set bit in RunningMask to allow thread to run
*/
void StartRunning();
/**
* Reset bit in RunningMask to prevent thread from running
*/
void StopRunning();
/** /**
* Set Fifo pointer to the one given * Set Fifo pointer to the one given
* @param f address of Fifo pointer * @param f address of Fifo pointer
@ -254,11 +238,8 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
/** type of thread */ /** type of thread */
static const std::string TypeName; static const std::string TypeName;
/** Object running status */
std::atomic<bool> runningFlag;
/** GeneralData (Detector Data) object */ /** GeneralData (Detector Data) object */
const GeneralData* generalData; const GeneralData* generalData{nullptr};
/** Fifo structure */ /** Fifo structure */
Fifo* fifo; Fifo* fifo;
@ -269,7 +250,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
detectorType myDetectorType; detectorType myDetectorType;
/** File writer implemented as binary or hdf5 File */ /** File writer implemented as binary or hdf5 File */
File* file; File* file{nullptr};
/** Data Stream Enable */ /** Data Stream Enable */
bool* dataStreamEnable; bool* dataStreamEnable;
@ -293,7 +274,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
uint32_t* streamingTimerInMs; uint32_t* streamingTimerInMs;
/** Current frequency count */ /** Current frequency count */
uint32_t currentFreqCount; uint32_t currentFreqCount{0};
/** timer beginning stamp for random streaming */ /** timer beginning stamp for random streaming */
struct timespec timerBegin; struct timespec timerBegin;
@ -324,21 +305,18 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
//acquisition start //acquisition start
/** Aquisition Started flag */ /** Aquisition Started flag */
bool startedFlag; std::atomic<bool> startedFlag{false};
/** Frame Number of First Frame */ /** Frame Number of First Frame */
uint64_t firstIndex; std::atomic<uint64_t> firstIndex{0};
//for statistics //for statistics
/** Number of complete frames caught */ /** Number of complete frames caught */
uint64_t numFramesCaught; uint64_t numFramesCaught{0};
/** Frame Number of latest processed frame number */ /** Frame Number of latest processed frame number */
uint64_t currentFrameIndex; std::atomic<uint64_t> currentFrameIndex{0};
//call back //call back
/** /**
@ -349,7 +327,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
* dataSize in bytes is the size of the data in bytes. * dataSize in bytes is the size of the data in bytes.
*/ */
void (*rawDataReadyCallBack)(char*, void (*rawDataReadyCallBack)(char*,
char*, uint32_t, void*); char*, uint32_t, void*) = nullptr;
/** /**
* Call back for raw data (modified) * Call back for raw data (modified)
@ -359,9 +337,9 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
* revDatasize is the reference of data size in bytes. Can be modified to the new size to be written/streamed. (only smaller value). * revDatasize is the reference of data size in bytes. Can be modified to the new size to be written/streamed. (only smaller value).
*/ */
void (*rawDataModifyReadyCallBack)(char*, void (*rawDataModifyReadyCallBack)(char*,
char*, uint32_t &, void*); char*, uint32_t &, void*) = nullptr;
void *pRawDataReady; void *pRawDataReady{nullptr};

View File

@ -19,18 +19,11 @@ const std::string DataStreamer::TypeName = "DataStreamer";
DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r, DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r,
uint64_t* fi, int fd, int* nd, bool* qe, uint64_t* tot) : uint64_t* fi, int fd, int* nd, bool* qe, uint64_t* tot) :
ThreadObject(ind, TypeName), ThreadObject(ind, TypeName),
runningFlag(0),
generalData(nullptr),
fifo(f), fifo(f),
zmqSocket(nullptr),
dynamicRange(dr), dynamicRange(dr),
roi(r), roi(r),
adcConfigured(-1),
fileIndex(fi), fileIndex(fi),
flippedDataX(fd), flippedDataX(fd),
startedFlag(false),
firstIndex(0),
completeBuffer(nullptr),
quadEnable(qe), quadEnable(qe),
totalNumFrames(tot) totalNumFrames(tot)
{ {
@ -46,29 +39,12 @@ DataStreamer::~DataStreamer() {
delete [] completeBuffer; delete [] completeBuffer;
} }
/** getters */
bool DataStreamer::IsRunning() {
return runningFlag;
}
/** setters */
void DataStreamer::StartRunning() {
runningFlag = true;
}
void DataStreamer::StopRunning() {
runningFlag = false;
}
void DataStreamer::SetFifo(Fifo* f) { void DataStreamer::SetFifo(Fifo* f) {
fifo = f; fifo = f;
} }
void DataStreamer::ResetParametersforNewAcquisition(const std::string& fname){ void DataStreamer::ResetParametersforNewAcquisition(const std::string& fname){
runningFlag = false; StopRunning();
startedFlag = false; startedFlag = false;
firstIndex = 0; firstIndex = 0;

View File

@ -42,25 +42,6 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
*/ */
~DataStreamer(); ~DataStreamer();
//*** getters ***
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
bool IsRunning();
//*** setters ***
/**
* Set bit in RunningMask to allow thread to run
*/
void StartRunning();
/**
* Reset bit in RunningMask to prevent thread from running
*/
void StopRunning();
/** /**
* Set Fifo pointer to the one given * Set Fifo pointer to the one given
* @param f address of Fifo pointer * @param f address of Fifo pointer
@ -158,19 +139,14 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
/** type of thread */ /** type of thread */
static const std::string TypeName; static const std::string TypeName;
/** Object running status */
bool runningFlag;
/** GeneralData (Detector Data) object */ /** GeneralData (Detector Data) object */
const GeneralData* generalData; const GeneralData* generalData{nullptr};
/** Fifo structure */ /** Fifo structure */
Fifo* fifo; Fifo* fifo;
/** ZMQ Socket - Receiver to Client */ /** ZMQ Socket - Receiver to Client */
ZmqSocket* zmqSocket; ZmqSocket* zmqSocket{nullptr};
/** Pointer to dynamic range */ /** Pointer to dynamic range */
uint32_t* dynamicRange; uint32_t* dynamicRange;
@ -179,7 +155,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
ROI* roi; ROI* roi;
/** adc Configured */ /** adc Configured */
int adcConfigured; int adcConfigured{-1};
/** Pointer to file index */ /** Pointer to file index */
uint64_t* fileIndex; uint64_t* fileIndex;
@ -191,16 +167,16 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
std::map<std::string, std::string> additionJsonHeader; std::map<std::string, std::string> additionJsonHeader;
/** Aquisition Started flag */ /** Aquisition Started flag */
bool startedFlag; bool startedFlag{nullptr};
/** Frame Number of First Frame */ /** Frame Number of First Frame */
uint64_t firstIndex; uint64_t firstIndex{0};
/* File name to stream */ /* File name to stream */
std::string fileNametoStream; std::string fileNametoStream;
/** Complete buffer used for roi, eg. shortGotthard */ /** Complete buffer used for roi, eg. shortGotthard */
char* completeBuffer; char* completeBuffer{nullptr};
/** Number of Detectors in X and Y dimension */ /** Number of Detectors in X and Y dimension */
int numDet[2]; int numDet[2];

View File

@ -12,6 +12,7 @@
#include "container_utils.h" // For sls::make_unique<> #include "container_utils.h" // For sls::make_unique<>
#include "sls_detector_exceptions.h" #include "sls_detector_exceptions.h"
#include "UdpRxSocket.h" #include "UdpRxSocket.h"
#include "network_utils.h"
#include <cerrno> #include <cerrno>
#include <cstring> #include <cstring>
@ -25,12 +26,9 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic<runStatus>*
int64_t* us, int64_t* as, uint32_t* fpf, int64_t* us, int64_t* as, uint32_t* fpf,
frameDiscardPolicy* fdp, bool* act, bool* depaden, bool* sm) : frameDiscardPolicy* fdp, bool* act, bool* depaden, bool* sm) :
ThreadObject(ind, TypeName), ThreadObject(ind, TypeName),
runningFlag(0),
generalData(nullptr),
fifo(f), fifo(f),
myDetectorType(dtype), myDetectorType(dtype),
status(s), status(s),
udpSocket(nullptr),
udpPortNumber(portno), udpPortNumber(portno),
eth(e), eth(e),
numImages(nf), numImages(nf),
@ -41,45 +39,22 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic<runStatus>*
frameDiscardMode(fdp), frameDiscardMode(fdp),
activated(act), activated(act),
deactivatedPaddingEnable(depaden), deactivatedPaddingEnable(depaden),
silentMode(sm), silentMode(sm)
row(0),
column(0),
startedFlag(false),
firstIndex(0),
numPacketsCaught(0),
lastCaughtFrameIndex(0),
currentFrameIndex(0),
carryOverFlag(0),
udpSocketAlive(0),
numPacketsStatistic(0),
numFramesStatistic(0),
oddStartingPacket(true)
{ {
LOG(logDEBUG) << "Listener " << ind << " created"; LOG(logDEBUG) << "Listener " << ind << " created";
} }
Listener::~Listener() = default;
Listener::~Listener() { uint64_t Listener::GetPacketsCaught() const {
if (udpSocket){
sem_post(&semaphore_socket);
sem_destroy(&semaphore_socket);
}
}
/** getters */
bool Listener::IsRunning() {
return runningFlag;
}
uint64_t Listener::GetPacketsCaught() {
return numPacketsCaught; return numPacketsCaught;
} }
uint64_t Listener::GetLastFrameIndexCaught() { uint64_t Listener::GetLastFrameIndexCaught() const {
return lastCaughtFrameIndex; return lastCaughtFrameIndex;
} }
uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) { uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const {
if (!stoppedFlag) { if (!stoppedFlag) {
return (numPackets - numPacketsCaught); return (numPackets - numPacketsCaught);
} }
@ -89,24 +64,12 @@ uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) {
return (lastCaughtFrameIndex - firstIndex + 1) * generalData->packetsPerFrame - numPacketsCaught; return (lastCaughtFrameIndex - firstIndex + 1) * generalData->packetsPerFrame - numPacketsCaught;
} }
/** setters */
void Listener::StartRunning() {
runningFlag = true;
}
void Listener::StopRunning() {
runningFlag = false;
}
void Listener::SetFifo(Fifo* f) { void Listener::SetFifo(Fifo* f) {
fifo = f; fifo = f;
} }
void Listener::ResetParametersforNewAcquisition() { void Listener::ResetParametersforNewAcquisition() {
runningFlag = false; StopRunning();
startedFlag = false; startedFlag = false;
numPacketsCaught = 0; numPacketsCaught = 0;
firstIndex = 0; firstIndex = 0;
@ -174,10 +137,9 @@ void Listener::CreateUDPSockets() {
} }
udpSocketAlive = true; udpSocketAlive = true;
sem_init(&semaphore_socket,1,0);
// doubled due to kernel bookkeeping (could also be less due to permissions) // doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = udpSocket->getActualUDPSocketBufferSize(); *actualUDPSocketBufferSize = udpSocket->getBufferSize();
} }
@ -185,14 +147,8 @@ void Listener::CreateUDPSockets() {
void Listener::ShutDownUDPSocket() { void Listener::ShutDownUDPSocket() {
if(udpSocket){ if(udpSocket){
udpSocketAlive = false; udpSocketAlive = false;
udpSocket->ShutDownSocket(); udpSocket->Shutdown();
LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber; LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber;
fflush(stdout);
// wait only if the threads have started as it is the threads that
//give a post to semaphore(at stopListening)
if (runningFlag)
sem_wait(&semaphore_socket);
sem_destroy(&semaphore_socket);
} }
} }
@ -220,7 +176,7 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int64_t s) {
*udpSocketBufferSize); *udpSocketBufferSize);
// doubled due to kernel bookkeeping (could also be less due to permissions) // doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = g.getActualUDPSocketBufferSize(); *actualUDPSocketBufferSize = g.getBufferSize();
if (*actualUDPSocketBufferSize == -1) { if (*actualUDPSocketBufferSize == -1) {
*udpSocketBufferSize = temp; *udpSocketBufferSize = temp;
} else { } else {
@ -300,10 +256,8 @@ void Listener::StopListening(char* buf) {
(*((uint32_t*)buf)) = DUMMY_PACKET_VALUE; (*((uint32_t*)buf)) = DUMMY_PACKET_VALUE;
fifo->PushAddress(buf); fifo->PushAddress(buf);
StopRunning(); StopRunning();
LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber << ") : " << numPacketsCaught;
sem_post(&semaphore_socket); LOG(logDEBUG1) << index << ": Listening Completed";
LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber << ") : " << numPacketsCaught;
LOG(logDEBUG1) << index << ": Listening Completed";
} }

View File

@ -12,13 +12,10 @@
#include <memory> #include <memory>
#include <atomic> #include <atomic>
#include "ThreadObject.h" #include "ThreadObject.h"
#include "UdpRxSocket.h"
class GeneralData; class GeneralData;
class Fifo; class Fifo;
namespace sls{
class UdpRxSocket;
}
class Listener : private virtual slsDetectorDefs, public ThreadObject { class Listener : private virtual slsDetectorDefs, public ThreadObject {
@ -53,40 +50,20 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
*/ */
~Listener(); ~Listener();
//*** getters ***
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
bool IsRunning() override;
/** /**
* Get Packets caught * Get Packets caught
* @return Packets caught * @return Packets caught
*/ */
uint64_t GetPacketsCaught(); uint64_t GetPacketsCaught() const;
/** /**
* Get Last Frame index caught * Get Last Frame index caught
* @return last frame index caught * @return last frame index caught
*/ */
uint64_t GetLastFrameIndexCaught(); uint64_t GetLastFrameIndexCaught() const;
/** Get number of missing packets */ /** Get number of missing packets */
uint64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets); uint64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const;
//*** setters ***
/**
* Set bit in RunningMask to allow thread to run
*/
void StartRunning();
/**
* Reset bit in RunningMask to prevent thread from running
*/
void StopRunning();
/** /**
* Set Fifo pointer to the one given * Set Fifo pointer to the one given
@ -140,7 +117,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
void RecordFirstIndex(uint64_t fnum); void RecordFirstIndex(uint64_t fnum);
/** /**
* Thread Exeution for Listener Class * Thread Execution for Listener Class
* Pop free addresses, listen to udp socket, * Pop free addresses, listen to udp socket,
* write to memory & push the address into fifo * write to memory & push the address into fifo
*/ */
@ -168,16 +145,11 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
*/ */
void PrintFifoStatistics(); void PrintFifoStatistics();
/** type of thread */ /** type of thread */
static const std::string TypeName; static const std::string TypeName;
/** Object running status */
std::atomic<bool> runningFlag;
/** GeneralData (Detector Data) object */ /** GeneralData (Detector Data) object */
GeneralData* generalData; GeneralData* generalData{nullptr};
/** Fifo structure */ /** Fifo structure */
Fifo* fifo; Fifo* fifo;
@ -190,7 +162,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
std::atomic<runStatus>* status; std::atomic<runStatus>* status;
/** UDP Socket - Detector to Receiver */ /** UDP Socket - Detector to Receiver */
std::unique_ptr<sls::UdpRxSocket> udpSocket; std::unique_ptr<sls::UdpRxSocket> udpSocket{nullptr};
/** UDP Port Number */ /** UDP Port Number */
uint32_t* udpPortNumber; uint32_t* udpPortNumber;
@ -228,36 +200,34 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
/** row hardcoded as 1D or 2d, /** row hardcoded as 1D or 2d,
* if detector does not send them yet or * if detector does not send them yet or
* missing packets/deactivated (eiger/jungfrau sends 2d pos) **/ * missing packets/deactivated (eiger/jungfrau sends 2d pos) **/
uint16_t row; uint16_t row{0};
/** column hardcoded as 2D, /** column hardcoded as 2D,
* deactivated eiger/missing packets (eiger/jungfrau sends 2d pos) **/ * deactivated eiger/missing packets (eiger/jungfrau sends 2d pos) **/
uint16_t column; uint16_t column{0};
// acquisition start // acquisition start
/** Aquisition Started flag */ /** Aquisition Started flag */
std::atomic<bool> startedFlag; std::atomic<bool> startedFlag{false};
/** Frame Number of First Frame */ /** Frame Number of First Frame */
uint64_t firstIndex; uint64_t firstIndex{0};
// for acquisition summary // for acquisition summary
/** Number of complete Packets caught */ /** Number of complete Packets caught */
std::atomic<uint64_t> numPacketsCaught; std::atomic<uint64_t> numPacketsCaught{0};
/** Last Frame Index caught from udp network */ /** Last Frame Index caught from udp network */
std::atomic<uint64_t> lastCaughtFrameIndex; std::atomic<uint64_t> lastCaughtFrameIndex{0};
// parameters to acquire image // parameters to acquire image
/** Current Frame Index, default value is 0 /** Current Frame Index, default value is 0
* ( always check startedFlag for validity first) * ( always check startedFlag for validity first)
*/ */
uint64_t currentFrameIndex; uint64_t currentFrameIndex{0};
/** True if there is a packet carry over from previous Image */ /** True if there is a packet carry over from previous Image */
bool carryOverFlag; bool carryOverFlag{false};
/** Carry over packet buffer */ /** Carry over packet buffer */
std::unique_ptr<char []> carryOverPacket; std::unique_ptr<char []> carryOverPacket;
@ -266,22 +236,19 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
std::unique_ptr<char []> listeningPacket; std::unique_ptr<char []> listeningPacket;
/** if the udp socket is connected */ /** if the udp socket is connected */
std::atomic<bool> udpSocketAlive; std::atomic<bool> udpSocketAlive{false};
/** Semaphore to synchonize deleting udp socket */ // for print progress during acquisition
sem_t semaphore_socket;
// for print progress during acqusition
/** number of packets for statistic */ /** number of packets for statistic */
uint32_t numPacketsStatistic; uint32_t numPacketsStatistic{0};
/** number of images for statistic */ /** number of images for statistic */
uint32_t numFramesStatistic; uint32_t numFramesStatistic{0};
/** /**
* starting packet number is odd or evern, accordingly increment frame number * starting packet number is odd or even, accordingly increment frame number
* to get first packet number as 0 * to get first packet number as 0
* (pecific to gotthard, can vary between modules, hence defined here) */ * (pecific to gotthard, can vary between modules, hence defined here) */
bool oddStartingPacket; bool oddStartingPacket{true};
}; };

View File

@ -42,17 +42,14 @@ int main(int argc, char *argv[]) {
LOG(logERROR) << "Could not set handler function for SIGPIPE"; LOG(logERROR) << "Could not set handler function for SIGPIPE";
} }
std::unique_ptr<Receiver> receiver = nullptr;
try { try {
receiver = sls::make_unique<Receiver>(argc, argv); Receiver r(argc, argv);
LOG(logINFO) << "[ Press \'Ctrl+c\' to exit ]";
sem_wait(&semaphore);
sem_destroy(&semaphore);
} catch (...) { } catch (...) {
LOG(logINFOBLUE) << "Exiting [ Tid: " << syscall(SYS_gettid) << " ]"; //pass
throw;
} }
LOG(logINFO) << "[ Press \'Ctrl+c\' to exit ]";
sem_wait(&semaphore);
sem_destroy(&semaphore);
LOG(logINFOBLUE) << "Exiting [ Tid: " << syscall(SYS_gettid) << " ]"; LOG(logINFOBLUE) << "Exiting [ Tid: " << syscall(SYS_gettid) << " ]";
LOG(logINFO) << "Exiting Receiver"; LOG(logINFO) << "Exiting Receiver";
return 0; return 0;

View File

@ -3,56 +3,51 @@
* @short creates/destroys a thread * @short creates/destroys a thread
***********************************************/ ***********************************************/
#include "ThreadObject.h" #include "ThreadObject.h"
#include "container_utils.h" #include "container_utils.h"
#include <iostream> #include <iostream>
#include <sys/syscall.h> #include <sys/syscall.h>
#include <unistd.h> #include <unistd.h>
ThreadObject::ThreadObject(int threadIndex, std::string threadType) ThreadObject::ThreadObject(int threadIndex, std::string threadType)
: index(threadIndex), type(threadType) { : index(threadIndex), type(threadType) {
LOG(logDEBUG) << type << " thread created: " << index; LOG(logDEBUG) << type << " thread created: " << index;
sem_init(&semaphore,1,0); sem_init(&semaphore,1,0);
try { try {
threadObject = sls::make_unique<std::thread>(&ThreadObject::RunningThread, this); threadObject = std::thread(&ThreadObject::RunningThread, this);
} catch (...) { } catch (...) {
throw sls::RuntimeError("Could not create " + type + " thread with index " + std::to_string(index)); throw sls::RuntimeError("Could not create " + type + " thread with index " + std::to_string(index));
} }
} }
ThreadObject::~ThreadObject() { ThreadObject::~ThreadObject() {
killThread = true; killThread = true;
sem_post(&semaphore); sem_post(&semaphore);
threadObject.join();
threadObject->join();
sem_destroy(&semaphore); sem_destroy(&semaphore);
} }
bool ThreadObject::IsRunning() const{
return runningFlag;
}
void ThreadObject::StartRunning() {
runningFlag = true;
}
void ThreadObject::StopRunning() {
runningFlag = false;
}
void ThreadObject::RunningThread() { void ThreadObject::RunningThread() {
LOG(logINFOBLUE) << "Created [ " << type << "Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]"; LOG(logINFOBLUE) << "Created [ " << type << "Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";
LOG(logDEBUG) << type << " thread " << index << " created successfully."; while(!killThread) {
while(true) {
while(IsRunning()) { while(IsRunning()) {
ThreadExecution(); ThreadExecution();
} }
//wait till the next acquisition //wait till the next acquisition
sem_wait(&semaphore); sem_wait(&semaphore);
if(killThread) {
break;
}
} }
LOG(logDEBUG) << type << " thread with index " << index << " destroyed successfully.";
LOG(logINFOBLUE) << "Exiting [ " << type << " Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]"; LOG(logINFOBLUE) << "Exiting [ " << type << " Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";
} }
@ -61,11 +56,10 @@ void ThreadObject::Continue() {
sem_post(&semaphore); sem_post(&semaphore);
} }
void ThreadObject::SetThreadPriority(int priority) { void ThreadObject::SetThreadPriority(int priority) {
struct sched_param param; struct sched_param param;
param.sched_priority = priority; param.sched_priority = priority;
if (pthread_setschedparam(threadObject->native_handle(), SCHED_FIFO, &param) == EPERM) { if (pthread_setschedparam(threadObject.native_handle(), SCHED_FIFO, &param) == EPERM) {
if (index == 0) { if (index == 0) {
LOG(logWARNING) << "Could not prioritize " << type << " thread. " LOG(logWARNING) << "Could not prioritize " << type << " thread. "
"(No Root Privileges?)"; "(No Root Privileges?)";

View File

@ -7,41 +7,40 @@
*@short creates/destroys a thread *@short creates/destroys a thread
*/ */
#include "sls_detector_defs.h"
#include "logger.h" #include "logger.h"
#include "sls_detector_defs.h"
#include <atomic>
#include <thread>
#include <semaphore.h> #include <semaphore.h>
#include <string> #include <string>
#include <atomic>
#include <future>
class ThreadObject : private virtual slsDetectorDefs { class ThreadObject : private virtual slsDetectorDefs {
protected:
public: const int index{0};
ThreadObject(int threadIndex, std::string threadType);
virtual ~ThreadObject();
virtual bool IsRunning() = 0;
void Continue();
void SetThreadPriority(int priority);
protected: private:
virtual void ThreadExecution() = 0; std::atomic<bool> killThread{false};
std::atomic<bool> runningFlag{false};
std::thread threadObject;
sem_t semaphore;
const std::string type;
private: public:
/** ThreadObject(int threadIndex, std::string threadType);
* Thread called: An infinite while loop in which, virtual ~ThreadObject();
* semaphore starts executing its contents as long RunningMask is satisfied bool IsRunning() const;
* Then it exits the thread on its own if killThread is true void StartRunning();
*/ void StopRunning();
void RunningThread(); void Continue();
void SetThreadPriority(int priority);
private:
protected: virtual void ThreadExecution() = 0;
int index{0}; /**
std::string type; * Thread called: An infinite while loop in which,
std::atomic<bool> killThread{false}; * semaphore starts executing its contents as long RunningMask is satisfied
std::unique_ptr<std::thread> threadObject; * Then it exits the thread on its own if killThread is true
sem_t semaphore; */
void RunningThread();
}; };

View File

@ -8,6 +8,7 @@ set(SOURCES
src/ToString.cpp src/ToString.cpp
src/network_utils.cpp src/network_utils.cpp
src/ZmqSocket.cpp src/ZmqSocket.cpp
src/UdpRxSocket.cpp
) )
set(HEADERS set(HEADERS
@ -41,11 +42,13 @@ add_library(slsSupportLib SHARED
${HEADERS} ${HEADERS}
) )
check_ipo_supported(RESULT result)
if(result) if(SLS_LTO_AVAILABLE)
set_property(TARGET slsSupportLib PROPERTY INTERPROCEDURAL_OPTIMIZATION True) set_property(TARGET slsSupportLib PROPERTY INTERPROCEDURAL_OPTIMIZATION True)
endif() endif()
target_include_directories(slsSupportLib PUBLIC target_include_directories(slsSupportLib PUBLIC
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>" "$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>"
"$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>" "$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>"

View File

@ -21,7 +21,7 @@ class DataSocket {
//No copy since the class manage the underlying socket //No copy since the class manage the underlying socket
DataSocket(const DataSocket &) = delete; DataSocket(const DataSocket &) = delete;
DataSocket &operator=(DataSocket const &) = delete; DataSocket &operator=(DataSocket const &) = delete;
int getSocketId() const { return socketId_; } int getSocketId() const { return sockfd_; }
int Send(const void *buffer, size_t size); int Send(const void *buffer, size_t size);
@ -51,9 +51,10 @@ class DataSocket {
int setReceiveTimeout(int us); int setReceiveTimeout(int us);
void close(); void close();
void shutDownSocket(); void shutDownSocket();
void shutdown();
private: private:
int socketId_ = -1; int sockfd_ = -1;
}; };
}; // namespace sls }; // namespace sls

View File

@ -1,141 +1,30 @@
#pragma once
/* /*
UdpRxSocket provies socket control to receive UDP socket class to receive data. The intended use is in the
data on a udp socket. receiver listener loop. Should be used RAII style...
It provides a drop in replacement for
genericSocket. But please be careful since
this might be deprecated in the future
*/ */
#include "network_utils.h" #include <sys/types.h> //ssize_t
#include "sls_detector_exceptions.h"
#include <cstdint>
#include <errno.h>
#include <iostream>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>
namespace sls { namespace sls {
class UdpRxSocket { class UdpRxSocket {
const ssize_t packet_size; const ssize_t packet_size_;
char *buff; int sockfd_{-1};
int fd = -1;
public: public:
UdpRxSocket(int port, ssize_t packet_size, const char *hostname = nullptr, UdpRxSocket(int port, ssize_t packet_size, const char *hostname = nullptr,
ssize_t buffer_size = 0) size_t kernel_buffer_size = 0);
: packet_size(packet_size) { ~UdpRxSocket();
/* hostname = nullptr -> wildcard */ bool ReceivePacket(char *dst) noexcept;
size_t getBufferSize() const;
void setBufferSize(ssize_t size);
ssize_t getPacketSize() const noexcept;
void Shutdown();
struct addrinfo hints; // Only for backwards compatibility, this drops the EIGER small pkt, may be
memset(&hints, 0, sizeof(hints)); // removed
hints.ai_family = AF_UNSPEC; ssize_t ReceiveDataOnly(char *dst) noexcept;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = 0;
hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
struct addrinfo *res = 0;
const std::string portname = std::to_string(port);
if (getaddrinfo(hostname, portname.c_str(), &hints, &res)) {
throw RuntimeError("Failed at getaddrinfo with " +
std::string(hostname));
}
fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (fd == -1) {
throw RuntimeError("Failed to create UDP RX socket");
}
if (bind(fd, res->ai_addr, res->ai_addrlen) == -1) {
throw RuntimeError("Failed to bind UDP RX socket");
}
freeaddrinfo(res);
// If we get a specified buffer size that is larger than the set one
// we set it. Otherwise we leave it there since it could have been
// set by the rx_udpsocksize command
if (buffer_size) {
auto current = getBufferSize() / 2;
if (current < buffer_size) {
setBufferSize(buffer_size);
if (getBufferSize() / 2 < buffer_size) {
LOG(logWARNING)
<< "Could not set buffer size. Got: "
<< getBufferSize() / 2 << " instead of " << buffer_size;
}
}
}
// Allocate at the end to avoid memory leak if we throw
buff = new char[packet_size];
}
~UdpRxSocket() {
delete[] buff;
Shutdown();
}
const char *LastPacket() const noexcept { return buff; }
ssize_t getPacketSize() const noexcept { return packet_size; }
bool ReceivePacket() noexcept { return ReceivePacket(buff); }
bool ReceivePacket(char *dst, int flags = 0) noexcept {
auto bytes_received =
recvfrom(fd, dst, packet_size, flags, nullptr, nullptr);
return bytes_received == packet_size;
}
bool PeekPacket() noexcept{
return ReceivePacket(buff, MSG_PEEK);
}
// Only for backwards compatibility this function will be removed during
// refactoring of the receiver
ssize_t ReceiveDataOnly(char *dst) {
auto r = recvfrom(fd, dst, packet_size, 0, nullptr, nullptr);
constexpr ssize_t eiger_header_packet =
40; // only detector that has this
if (r == eiger_header_packet) {
LOG(logWARNING) << "Got header pkg";
r = recvfrom(fd, dst, packet_size, 0, nullptr, nullptr);
}
return r;
}
ssize_t getBufferSize() const {
uint64_t ret_size = 0;
socklen_t optlen = sizeof(uint64_t);
if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1)
return -1;
else
return ret_size;
}
// Only for backwards compatibility will be removed
ssize_t getActualUDPSocketBufferSize() const { return getBufferSize(); }
// Only for backwards compatibility will be removed
void ShutDownSocket() { Shutdown(); }
void setBufferSize(ssize_t size) {
socklen_t optlen = sizeof(size);
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, optlen)) {
throw RuntimeError("Could not set socket buffer size");
}
}
void Shutdown() {
shutdown(fd, SHUT_RDWR);
if (fd >= 0) {
close(fd);
fd = -1;
}
}
}; };
} // namespace sls } // namespace sls

View File

@ -175,7 +175,7 @@ class slsDetectorDefs {
struct ROI { struct ROI {
int xmin{-1}; /**< is the roi xmin (in channel number) */ int xmin{-1}; /**< is the roi xmin (in channel number) */
int xmax{-1}; /**< is the roi xmax (in channel number)*/ int xmax{-1}; /**< is the roi xmax (in channel number)*/
}; }__attribute__((packed));
#else #else
typedef struct { typedef struct {
int xmin; /**< is the roi xmin (in channel number) */ int xmin; /**< is the roi xmin (in channel number) */
@ -203,7 +203,7 @@ class slsDetectorDefs {
int y{0}; int y{0};
xy() = default; xy() = default;
xy(int x, int y):x(x),y(y){}; xy(int x, int y):x(x),y(y){};
}; }__attribute__((packed));
#endif #endif
@ -460,6 +460,44 @@ class slsDetectorDefs {
TIMING_EXTERNAL TIMING_EXTERNAL
}; };
#ifdef __cplusplus
/**
* structure to udpate receiver
*/
struct rxParameters {
detectorType detType{GENERIC};
xy multiSize;
int detId{0};
char hostname[MAX_STR_LENGTH];
int udpInterfaces{1};
int udp_dstport{0};
uint32_t udp_dstip{0U};
uint64_t udp_dstmac{0LU};
int udp_dstport2{0};
uint32_t udp_dstip2{0U};
uint64_t udp_dstmac2{0LU};
int64_t frames{0};
int64_t triggers{0};
int64_t bursts{0};
int analogSamples{0};
int digitalSamples{0};
int64_t expTimeNs{0};
int64_t periodNs{0};
int64_t subExpTimeNs{0};
int64_t subDeadTimeNs{0};
int activate{0};
int quad{0};
int dynamicRange{16};
timingMode timMode{AUTO_TIMING};
int tenGiga{0};
readoutMode roMode{ANALOG_ONLY};
uint32_t adcMask{0};
uint32_t adc10gMask{0};
ROI roi;
uint32_t countermask{0};
burstMode burstType{BURST_OFF};
}__attribute__((packed));
#endif
#ifdef __cplusplus #ifdef __cplusplus
protected: protected:

View File

@ -197,6 +197,7 @@ enum detFuncs{
F_SET_TIMING_SOURCE, F_SET_TIMING_SOURCE,
F_GET_NUM_CHANNELS, F_GET_NUM_CHANNELS,
F_UPDATE_RATE_CORRECTION, F_UPDATE_RATE_CORRECTION,
F_GET_RECEIVER_PARAMETERS,
NUM_DET_FUNCTIONS, NUM_DET_FUNCTIONS,
RECEIVER_ENUM_START = 256, /**< detector function should not exceed this (detector server should not compile anyway) */ RECEIVER_ENUM_START = 256, /**< detector function should not exceed this (detector server should not compile anyway) */
@ -207,8 +208,6 @@ enum detFuncs{
F_GET_LAST_RECEIVER_CLIENT_IP, F_GET_LAST_RECEIVER_CLIENT_IP,
F_SET_RECEIVER_PORT, F_SET_RECEIVER_PORT,
F_GET_RECEIVER_VERSION, F_GET_RECEIVER_VERSION,
F_GET_RECEIVER_TYPE,
F_SEND_RECEIVER_DETHOSTNAME,
F_RECEIVER_SET_ROI, F_RECEIVER_SET_ROI,
F_RECEIVER_SET_NUM_FRAMES, F_RECEIVER_SET_NUM_FRAMES,
F_SET_RECEIVER_NUM_TRIGGERS, F_SET_RECEIVER_NUM_TRIGGERS,
@ -252,8 +251,6 @@ enum detFuncs{
F_SET_FLIPPED_DATA_RECEIVER, F_SET_FLIPPED_DATA_RECEIVER,
F_SET_RECEIVER_FILE_FORMAT, F_SET_RECEIVER_FILE_FORMAT,
F_GET_RECEIVER_FILE_FORMAT, F_GET_RECEIVER_FILE_FORMAT,
F_SEND_RECEIVER_DETPOSID,
F_SEND_RECEIVER_MULTIDETSIZE,
F_SET_RECEIVER_STREAMING_PORT, F_SET_RECEIVER_STREAMING_PORT,
F_GET_RECEIVER_STREAMING_PORT, F_GET_RECEIVER_STREAMING_PORT,
F_SET_RECEIVER_STREAMING_SRC_IP, F_SET_RECEIVER_STREAMING_SRC_IP,
@ -293,6 +290,7 @@ enum detFuncs{
F_SET_ADDITIONAL_JSON_PARAMETER, F_SET_ADDITIONAL_JSON_PARAMETER,
F_GET_ADDITIONAL_JSON_PARAMETER, F_GET_ADDITIONAL_JSON_PARAMETER,
F_GET_RECEIVER_PROGRESS, F_GET_RECEIVER_PROGRESS,
F_SETUP_RECEIVER,
NUM_REC_FUNCTIONS NUM_REC_FUNCTIONS
}; };
@ -487,6 +485,7 @@ static const char* getFunctionNameFromEnum(enum detFuncs func) {
case F_SET_TIMING_SOURCE: return "F_SET_TIMING_SOURCE"; case F_SET_TIMING_SOURCE: return "F_SET_TIMING_SOURCE";
case F_GET_NUM_CHANNELS: return "F_GET_NUM_CHANNELS"; case F_GET_NUM_CHANNELS: return "F_GET_NUM_CHANNELS";
case F_UPDATE_RATE_CORRECTION: return "F_UPDATE_RATE_CORRECTION"; case F_UPDATE_RATE_CORRECTION: return "F_UPDATE_RATE_CORRECTION";
case F_GET_RECEIVER_PARAMETERS: return "F_GET_RECEIVER_PARAMETERS";
case NUM_DET_FUNCTIONS: return "NUM_DET_FUNCTIONS"; case NUM_DET_FUNCTIONS: return "NUM_DET_FUNCTIONS";
case RECEIVER_ENUM_START: return "RECEIVER_ENUM_START"; case RECEIVER_ENUM_START: return "RECEIVER_ENUM_START";
@ -497,8 +496,7 @@ static const char* getFunctionNameFromEnum(enum detFuncs func) {
case F_GET_LAST_RECEIVER_CLIENT_IP: return "F_GET_LAST_RECEIVER_CLIENT_IP"; case F_GET_LAST_RECEIVER_CLIENT_IP: return "F_GET_LAST_RECEIVER_CLIENT_IP";
case F_SET_RECEIVER_PORT: return "F_SET_RECEIVER_PORT"; case F_SET_RECEIVER_PORT: return "F_SET_RECEIVER_PORT";
case F_GET_RECEIVER_VERSION: return "F_GET_RECEIVER_VERSION"; case F_GET_RECEIVER_VERSION: return "F_GET_RECEIVER_VERSION";
case F_GET_RECEIVER_TYPE: return "F_GET_RECEIVER_TYPE"; case F_SETUP_RECEIVER: return "F_SETUP_RECEIVER";
case F_SEND_RECEIVER_DETHOSTNAME: return "F_SEND_RECEIVER_DETHOSTNAME";
case F_RECEIVER_SET_ROI: return "F_RECEIVER_SET_ROI"; case F_RECEIVER_SET_ROI: return "F_RECEIVER_SET_ROI";
case F_RECEIVER_SET_NUM_FRAMES: return "F_RECEIVER_SET_NUM_FRAMES"; case F_RECEIVER_SET_NUM_FRAMES: return "F_RECEIVER_SET_NUM_FRAMES";
case F_SET_RECEIVER_NUM_TRIGGERS: return "F_SET_RECEIVER_NUM_TRIGGERS"; case F_SET_RECEIVER_NUM_TRIGGERS: return "F_SET_RECEIVER_NUM_TRIGGERS";
@ -542,8 +540,6 @@ static const char* getFunctionNameFromEnum(enum detFuncs func) {
case F_SET_FLIPPED_DATA_RECEIVER: return "F_SET_FLIPPED_DATA_RECEIVER"; case F_SET_FLIPPED_DATA_RECEIVER: return "F_SET_FLIPPED_DATA_RECEIVER";
case F_SET_RECEIVER_FILE_FORMAT: return "F_SET_RECEIVER_FILE_FORMAT"; case F_SET_RECEIVER_FILE_FORMAT: return "F_SET_RECEIVER_FILE_FORMAT";
case F_GET_RECEIVER_FILE_FORMAT: return "F_GET_RECEIVER_FILE_FORMAT"; case F_GET_RECEIVER_FILE_FORMAT: return "F_GET_RECEIVER_FILE_FORMAT";
case F_SEND_RECEIVER_DETPOSID: return "F_SEND_RECEIVER_DETPOSID";
case F_SEND_RECEIVER_MULTIDETSIZE: return "F_SEND_RECEIVER_MULTIDETSIZE";
case F_SET_RECEIVER_STREAMING_PORT: return "F_SET_RECEIVER_STREAMING_PORT"; case F_SET_RECEIVER_STREAMING_PORT: return "F_SET_RECEIVER_STREAMING_PORT";
case F_GET_RECEIVER_STREAMING_PORT: return "F_GET_RECEIVER_STREAMING_PORT"; case F_GET_RECEIVER_STREAMING_PORT: return "F_GET_RECEIVER_STREAMING_PORT";
case F_SET_RECEIVER_STREAMING_SRC_IP: return "F_SET_RECEIVER_STREAMING_SRC_IP"; case F_SET_RECEIVER_STREAMING_SRC_IP: return "F_SET_RECEIVER_STREAMING_SRC_IP";

View File

@ -1,12 +1,12 @@
/** API versions */ /** API versions */
#define GITBRANCH "removeshm" #define GITBRANCH "setrxhostname"
#define APILIB 0x200402 #define APILIB 0x200409
#define APIRECEIVER 0x200402 #define APIRECEIVER 0x200409
#define APIGUI 0x200331 #define APIGUI 0x200409
#define APICTB 0x200407 #define APIEIGER 0x200409
#define APIGOTTHARD 0x200407 #define APICTB 0x200409
#define APIGOTTHARD2 0x200407 #define APIGOTTHARD 0x200409
#define APIJUNGFRAU 0x200407 #define APIGOTTHARD2 0x200409
#define APIMYTHEN3 0x200407 #define APIJUNGFRAU 0x200409
#define APIMOENCH 0x200407 #define APIMYTHEN3 0x200409
#define APIEIGER 0x200408 #define APIMOENCH 0x200409

View File

@ -15,13 +15,13 @@
namespace sls { namespace sls {
DataSocket::DataSocket(int socketId) : socketId_(socketId) { DataSocket::DataSocket(int socketId) : sockfd_(socketId) {
int value = 1; int value = 1;
setsockopt(socketId_, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)); setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
} }
DataSocket::~DataSocket() { DataSocket::~DataSocket() {
if (socketId_ <= 0) { if (sockfd_ <= 0) {
return; return;
} else { } else {
try { try {
@ -32,7 +32,7 @@ DataSocket::~DataSocket() {
} }
void DataSocket::swap(DataSocket &other) noexcept { void DataSocket::swap(DataSocket &other) noexcept {
std::swap(socketId_, other.socketId_); std::swap(sockfd_, other.sockfd_);
} }
DataSocket::DataSocket(DataSocket &&move) noexcept { move.swap(*this); } DataSocket::DataSocket(DataSocket &&move) noexcept { move.swap(*this); }
@ -121,19 +121,23 @@ int DataSocket::setTimeOut(int t_seconds) {
} }
void DataSocket::close() { void DataSocket::close() {
if (socketId_ > 0) { if (sockfd_ > 0) {
if (::close(socketId_)) { if (::close(sockfd_)) {
throw SocketError("could not close socket"); throw SocketError("could not close socket");
} }
socketId_ = -1; sockfd_ = -1;
} else { } else {
throw std::runtime_error("Socket ERROR: close called on bad socket\n"); throw std::runtime_error("Socket ERROR: close called on bad socket\n");
} }
} }
void DataSocket::shutDownSocket() { void DataSocket::shutDownSocket() {
shutdown(getSocketId(), SHUT_RDWR); ::shutdown(getSocketId(), SHUT_RDWR);
close(); close();
} }
void DataSocket::shutdown(){
::shutdown(sockfd_, SHUT_RDWR);
}
} // namespace sls } // namespace sls

View File

@ -0,0 +1,96 @@
#include "UdpRxSocket.h"
#include "network_utils.h"
#include "sls_detector_exceptions.h"
#include <cstdint>
#include <errno.h>
#include <iostream>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
namespace sls {
UdpRxSocket::UdpRxSocket(int port, ssize_t packet_size, const char *hostname,
size_t kernel_buffer_size)
: packet_size_(packet_size) {
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = 0;
hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
struct addrinfo *res = 0;
const std::string portname = std::to_string(port);
if (getaddrinfo(hostname, portname.c_str(), &hints, &res)) {
throw RuntimeError("Failed at getaddrinfo with " +
std::string(hostname));
}
sockfd_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (sockfd_ == -1) {
throw RuntimeError("Failed to create UDP RX socket");
}
if (bind(sockfd_, res->ai_addr, res->ai_addrlen) == -1) {
throw RuntimeError("Failed to bind UDP RX socket");
}
freeaddrinfo(res);
// If we get a specified buffer size that is larger than the set one
// we set it. Otherwise we leave it there since it could have been
// set by the rx_udpsocksize command
if (kernel_buffer_size) {
auto current = getBufferSize() / 2;
if (current < kernel_buffer_size) {
setBufferSize(kernel_buffer_size);
if (getBufferSize() / 2 < kernel_buffer_size) {
LOG(logWARNING)
<< "Could not set buffer size. Got: " << getBufferSize() / 2
<< " instead of " << kernel_buffer_size;
}
}
}
}
UdpRxSocket::~UdpRxSocket() { Shutdown(); }
ssize_t UdpRxSocket::getPacketSize() const noexcept { return packet_size_; }
bool UdpRxSocket::ReceivePacket(char *dst) noexcept{
auto bytes_received =
recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr);
return bytes_received == packet_size_;
}
ssize_t UdpRxSocket::ReceiveDataOnly(char *dst) noexcept {
auto r = recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr);
constexpr ssize_t eiger_header_packet =
40; // only detector that has this
if (r == eiger_header_packet) {
LOG(logWARNING) << "Got header pkg";
r = recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr);
}
return r;
}
size_t UdpRxSocket::getBufferSize() const {
size_t ret = 0;
socklen_t optlen = sizeof(ret);
if (getsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &ret, &optlen) == -1)
throw RuntimeError("Could not get socket buffer size");
return ret;
}
void UdpRxSocket::setBufferSize(ssize_t size) {
if (setsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)))
throw RuntimeError("Could not set socket buffer size");
}
void UdpRxSocket::Shutdown() {
shutdown(sockfd_, SHUT_RDWR);
if (sockfd_ >= 0) {
close(sockfd_);
sockfd_ = -1;
}
}
} // namespace sls

View File

@ -109,7 +109,7 @@ TEST_CASE("run status"){
using defs = slsDetectorDefs; using defs = slsDetectorDefs;
REQUIRE(ToString(defs::runStatus::ERROR) == "error"); REQUIRE(ToString(defs::runStatus::ERROR) == "error");
REQUIRE(ToString(defs::runStatus::WAITING) == "waiting"); REQUIRE(ToString(defs::runStatus::WAITING) == "waiting");
REQUIRE(ToString(defs::runStatus::TRANSMITTING) == "data"); //?? REQUIRE(ToString(defs::runStatus::TRANSMITTING) == "transmitting");
REQUIRE(ToString(defs::runStatus::RUN_FINISHED) == "finished"); REQUIRE(ToString(defs::runStatus::RUN_FINISHED) == "finished");
REQUIRE(ToString(defs::runStatus::STOPPED) == "stopped"); REQUIRE(ToString(defs::runStatus::STOPPED) == "stopped");
REQUIRE(ToString(defs::runStatus::IDLE) == "idle"); REQUIRE(ToString(defs::runStatus::IDLE) == "idle");

View File

@ -4,6 +4,14 @@
#include <future> #include <future>
#include <thread> #include <thread>
#include <vector> #include <vector>
#include <cstdint>
#include <errno.h>
#include <iostream>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
constexpr int default_port = 50001; constexpr int default_port = 50001;
@ -29,46 +37,49 @@ int open_socket(int port) {
throw sls::RuntimeError("Failed to create UDP RX socket"); throw sls::RuntimeError("Failed to create UDP RX socket");
} }
if (connect(fd, res->ai_addr, res->ai_addrlen)){ if (connect(fd, res->ai_addr, res->ai_addrlen)) {
throw sls::RuntimeError("Failed to connect socket"); throw sls::RuntimeError("Failed to connect socket");
} }
freeaddrinfo(res); freeaddrinfo(res);
return fd; return fd;
} }
TEST_CASE("Receive data on localhost") { TEST_CASE("Get packet size returns the packet size we set in the constructor"){
constexpr int port = 50001;
constexpr ssize_t packet_size = 8000;
sls::UdpRxSocket s{port, packet_size};
CHECK(s.getPacketSize() == packet_size);
}
TEST_CASE("Receive data from a vector") {
constexpr int port = 50001; constexpr int port = 50001;
std::vector<int> data_to_send{4, 5, 3, 2, 5, 7, 2, 3}; std::vector<int> data_to_send{4, 5, 3, 2, 5, 7, 2, 3};
std::vector<int> data_received(data_to_send.size());
ssize_t packet_size = ssize_t packet_size =
sizeof(decltype(data_to_send)::value_type) * data_to_send.size(); sizeof(decltype(data_to_send)::value_type) * data_to_send.size();
sls::UdpRxSocket udpsock{port, packet_size}; sls::UdpRxSocket udpsock{port, packet_size};
int fd = open_socket(port); int fd = open_socket(port);
auto n = write(fd, data_to_send.data(), packet_size); auto n = write(fd, data_to_send.data(), packet_size);
CHECK(n == packet_size); CHECK(n == packet_size);
CHECK(udpsock.ReceivePacket());
CHECK(udpsock.ReceivePacket((char*)data_received.data()));
close(fd); close(fd);
// Copy data from buffer and compare values CHECK(data_to_send == data_received);
std::vector<int> data_received(data_to_send.size());
memcpy(data_received.data(), udpsock.LastPacket(), udpsock.getPacketSize());
CHECK(data_received.size() == data_to_send.size()); // sanity check
for (size_t i = 0; i != data_to_send.size(); ++i) {
CHECK(data_to_send[i] == data_received[i]);
}
} }
TEST_CASE("Shutdown socket without hanging when waiting for data") { TEST_CASE("Shutdown socket without hanging when waiting for data") {
constexpr int port = 50001; constexpr int port = 50001;
constexpr ssize_t packet_size = 8000; constexpr ssize_t packet_size = 8000;
sls::UdpRxSocket s{port, packet_size}; sls::UdpRxSocket s{port, packet_size};
char buff[packet_size];
// Start a thread and wait for package // Start a thread and wait for package
// if the socket is left open we would block // if the socket is left open we would block
std::future<bool> ret = std::future<bool> ret =
std::async(static_cast<bool (sls::UdpRxSocket::*)()>( std::async(&sls::UdpRxSocket::ReceivePacket, &s, (char *)&buff);
&sls::UdpRxSocket::ReceivePacket),
&s);
s.Shutdown(); s.Shutdown();
auto r = ret.get(); auto r = ret.get();
@ -76,60 +87,23 @@ TEST_CASE("Shutdown socket without hanging when waiting for data") {
CHECK(r == false); // since we didn't get the packet CHECK(r == false); // since we didn't get the packet
} }
TEST_CASE("Too small packet"){ TEST_CASE("Too small packet") {
constexpr int port = 50001; constexpr int port = 50001;
sls::UdpRxSocket s(port, 2*sizeof(uint32_t)); sls::UdpRxSocket s(port, 2 * sizeof(uint32_t));
auto fd = open_socket(port); auto fd = open_socket(port);
uint32_t val = 10; uint32_t val = 10;
write(fd, &val, sizeof(val)); write(fd, &val, sizeof(val));
CHECK(s.ReceivePacket() == false); uint32_t buff[2];
CHECK(s.ReceivePacket((char *)&buff) == false);
close(fd); close(fd);
} }
TEST_CASE("Receive an int to an external buffer") {
TEST_CASE("Receive an int to internal buffer"){
int to_send = 5; int to_send = 5;
int received = -1; int received = -1;
auto fd = open_socket(default_port); auto fd = open_socket(default_port);
sls::UdpRxSocket s(default_port, sizeof(int)); sls::UdpRxSocket s(default_port, sizeof(int));
write(fd, &to_send, sizeof(to_send)); write(fd, &to_send, sizeof(to_send));
CHECK(s.ReceivePacket()); CHECK(s.ReceivePacket(reinterpret_cast<char *>(&received)));
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send); CHECK(received == to_send);
} }
TEST_CASE("Receive an int to an external buffer"){
int to_send = 5;
int received = -1;
auto fd = open_socket(default_port);
sls::UdpRxSocket s(default_port, sizeof(int));
write(fd, &to_send, sizeof(to_send));
CHECK(s.ReceivePacket(reinterpret_cast<char*>(&received)));
CHECK(received == to_send);
}
TEST_CASE("PEEK data"){
int to_send = 5;
int to_send2 = 12;
int received = -1;
auto fd = open_socket(default_port);
sls::UdpRxSocket s(default_port, sizeof(int));
write(fd, &to_send, sizeof(to_send));
write(fd, &to_send2, sizeof(to_send));
CHECK(s.PeekPacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send);
CHECK(s.PeekPacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send);
CHECK(s.ReceivePacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send);
CHECK(s.ReceivePacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send2);
}