Compare commits

...

27 Commits

Author SHA1 Message Date
337e56d9bf cleaned up LTO detection 2020-04-23 08:23:36 +02:00
eb257154c6 added triggers 2020-04-22 09:15:31 +02:00
c1ae67ac46 Small refactor on ThreadObject and listener (#93)
* removed pointer, slight cleaning of loop

* removed semaphore, use getters

* removed redundant log msg

* removed comment

* added const

* removed comment

* changed header
2020-04-21 09:45:29 +02:00
68f76e5356 more like UdpRxSocket 2020-04-20 17:24:53 +02:00
8afa11ed33 removed pointer to server socket 2020-04-20 17:20:33 +02:00
bc389f4825 moved data members to top 2020-04-20 14:51:48 +02:00
095ced153c removed need for pointer 2020-04-20 14:31:10 +02:00
a1a5a20845 from thread sanitizer 2020-04-17 09:35:38 +02:00
c725a05ef8 fix RH7 2020-04-16 15:51:28 +02:00
815b6a37aa moved flag to base class 2020-04-16 13:22:51 +02:00
655a410d43 cleaned up UdpRxSocket 2020-04-16 09:45:44 +02:00
97ba81d923 fixed test 2020-04-14 16:58:37 +02:00
3d00eed0f0 Change SetTrimbits() and SaveAllTrimbits() to rely on top/bottom signal instead of TopAddressIsValid() for further cleanup. 2020-04-14 16:13:26 +02:00
a7f5300455 Merge pull request #92 from slsdetectorgroup/setrxhostname
Setrxhostname
2020-04-09 11:52:46 +02:00
2f33a1a479 updated binaries 2020-04-09 09:35:46 +02:00
39fa5e0185 client recieve rx parameters as a struct 2020-04-09 09:34:20 +02:00
ba4985ed4d Merge branch 'developer' of github.com:slsdetectorgroup/slsDetectorPackage into developer 2020-04-09 09:32:09 +02:00
f811c065d1 corrected a delete [] in multiThreadedAnalogDetector 2020-04-09 09:31:43 +02:00
3a1d87728c updated client api 2020-04-09 08:38:37 +02:00
0652ff6b5a updated binaries 2020-04-09 08:37:27 +02:00
373e177274 WIP 2020-04-09 08:35:30 +02:00
6dd6685e7d minor 2020-04-09 08:24:16 +02:00
38c31fdada WIP 2020-04-08 20:27:10 +02:00
b3fe0e79bc WIP 2020-04-08 16:43:28 +02:00
215e4a56fd Merge branch 'developer' into setrxhostname 2020-04-08 12:00:21 +02:00
71a68c2022 eiger transmitting fix in 10g mode (stop servers informed about 10g mode) 2020-04-08 11:58:59 +02:00
55f8497eac WIP 2020-04-08 11:05:06 +02:00
44 changed files with 1125 additions and 1057 deletions

View File

@ -46,6 +46,11 @@ option(SLS_BUILD_DOCS "docs" OFF)
option(SLS_BUILD_EXAMPLES "examples" OFF)
option(SLS_TUNE_LOCAL "tune to local machine" OFF)
#Enable LTO if available
check_ipo_supported(RESULT SLS_LTO_AVAILABLE)
# Use ld.gold if it is available and isn't disabled explicitly
option(SLS_USE_LD_GOLD "Use GNU gold linker" ON)
if (SLS_USE_LD_GOLD)

View File

@ -160,6 +160,13 @@ class Detector(CppDetectorApi):
def frames(self, n_frames):
self.setNumberOfFrames(n_frames)
@property
def triggers(self):
return element_if_equal(self.getNumberOfTriggers())
@triggers.setter
def triggers(self, n_triggers):
self.setNumberOfTriggers(n_triggers)
@property
def exptime(self):

View File

@ -320,7 +320,7 @@ public:
// int nnx, nny, ns;
int nn=dets[0]->getImageSize(nnx, nny,ns, nsy);
if (image) {
delete image;
delete [] image;
image=NULL;
}
image=new int[nn];

View File

@ -898,7 +898,7 @@ int Feb_Control_SendDACValue(unsigned int dst_num, unsigned int ch, unsigned int
int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int *trimbits) {
int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int *trimbits, int top) {
LOG(logINFO, ("Setting Trimbits\n"));
//for (int iy=10000;iy<20020;++iy)//263681
@ -963,7 +963,7 @@ int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int *trimbits) {
int i;
for(i=0;i<8;i++) { // column loop i
if (Module_TopAddressIsValid(&modules[1])) {
if (top==1) {
trimbits_to_load_l[offset+chip_sc] |= ( 0x7 & trimbits[row_set*16480+super_column_start_position_l+i])<<((7-i)*4);//low
trimbits_to_load_l[offset+chip_sc+32] |= ((0x38 & trimbits[row_set*16480+super_column_start_position_l+i])>>3)<<((7-i)*4);//upper
trimbits_to_load_r[offset+chip_sc] |= ( 0x7 & trimbits[row_set*16480+super_column_start_position_r+i])<<((7-i)*4);//low
@ -1572,12 +1572,12 @@ int Feb_Control_StopAcquisition() {
int Feb_Control_SaveAllTrimbitsTo(int value) {
int Feb_Control_SaveAllTrimbitsTo(int value, int top) {
unsigned int chanregs[Feb_Control_trimbit_size];
int i;
for(i=0;i<Feb_Control_trimbit_size;i++)
chanregs[i] = value;
return Feb_Control_SetTrimbits(0,chanregs);
return Feb_Control_SetTrimbits(0,chanregs, top);
}

View File

@ -93,9 +93,9 @@ int Feb_Control_SetDAC(char* s, int value, int is_a_voltage_mv);
int Feb_Control_GetDAC(char* s, int* ret_value, int voltage_mv);
int Feb_Control_GetDACName(unsigned int dac_num,char* s);
int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int* trimbits);
int Feb_Control_SetTrimbits(unsigned int module_num, unsigned int* trimbits, int top);
unsigned int* Feb_Control_GetTrimbits();
int Feb_Control_SaveAllTrimbitsTo(int value);
int Feb_Control_SaveAllTrimbitsTo(int value, int top);
int Feb_Control_Reset();
int Feb_Control_PrepareForAcquisition();

View File

@ -759,7 +759,7 @@ int64_t getSubExpTime() {
#endif
}
int setDeadTime(int64_t val) {
int setSubDeadTime(int64_t val) {
LOG(logINFO, ("Setting subdeadtime %lld ns\n", (long long int)val));
#ifndef VIRTUAL
// get subexptime
@ -781,7 +781,7 @@ int setDeadTime(int64_t val) {
return OK;
}
int64_t getDeadTime() {
int64_t getSubDeadTime() {
#ifndef VIRTUAL
// get subexptime
int64_t subexptime = Feb_Control_GetSubFrameExposureTime();
@ -890,7 +890,7 @@ int setModule(sls_detector_module myMod, char* mess) {
}
//set trimbits
if (!Feb_Control_SetTrimbits(Feb_Control_GetModuleNumber(), tt)) {
if (!Feb_Control_SetTrimbits(Feb_Control_GetModuleNumber(), tt,top)) {
sprintf(mess, "Could not set module. Could not set trimbits\n");
LOG(logERROR, (mess));
setSettings(UNDEFINED);
@ -1671,7 +1671,7 @@ void setExternalGating(int enable[]) {
int setAllTrimbits(int val) {
#ifndef VIRTUAL
if (!Feb_Control_SaveAllTrimbitsTo(val)) {
if (!Feb_Control_SaveAllTrimbitsTo(val,top)) {
LOG(logERROR, ("Could not set all trimbits\n"));
return FAIL;
}

View File

@ -52,12 +52,11 @@ int Server_VerifyLock();
* Server sends result to client (also set ret to force_update if different clients)
* @param fileDes file descriptor for the socket
* @param itype 32 or 64 or others to determine to swap data from big endian to little endian
* @param update 1 if one must update if different clients, else 0
* @param retval pointer to result
* @param retvalSize size of result
* @returns result of operation
*/
int Server_SendResult(int fileDes, intType itype, int update, void* retval, int retvalSize);
int Server_SendResult(int fileDes, intType itype, void* retval, int retvalSize);
/**
* Convert mac address from integer to char array

View File

@ -206,8 +206,8 @@ int64_t getBurstPeriod();
#ifdef EIGERD
int setSubExpTime(int64_t val);
int64_t getSubExpTime();
int setDeadTime(int64_t val);
int64_t getDeadTime();
int setSubDeadTime(int64_t val);
int64_t getSubDeadTime();
int64_t getMeasuredPeriod();
int64_t getMeasuredSubPeriod();
#endif

View File

@ -216,4 +216,5 @@ int get_timing_source(int);
int set_timing_source(int);
int get_num_channels(int);
int update_rate_correction(int);
int get_receiver_parameters(int);

View File

@ -577,7 +577,7 @@ int Server_VerifyLock() {
}
int Server_SendResult(int fileDes, intType itype, int update, void* retval, int retvalSize) {
int Server_SendResult(int fileDes, intType itype, void* retval, int retvalSize) {
// send success of operation
int ret1 = ret;

View File

@ -15,9 +15,8 @@ add_library(slsDetectorShared SHARED
${HEADERS}
)
# Do we have link time optimization?
check_ipo_supported(RESULT LTO_AVAILABLE)
if(LTO_AVAILABLE)
if(SLS_LTO_AVAILABLE)
set_property(TARGET slsDetectorShared PROPERTY INTERPROCEDURAL_OPTIMIZATION True)
endif()

View File

@ -526,27 +526,6 @@ Module::getTypeFromDetector(const std::string &hostname, int cport) {
return retval;
}
int Module::setDetectorType(detectorType const type) {
int fnum = F_GET_DETECTOR_TYPE;
detectorType retval = GENERIC;
LOG(logDEBUG1) << "Setting detector type to " << type;
// if unspecified, then get from detector
if (type == GET_DETECTOR_TYPE) {
sendToDetector(fnum, nullptr, retval);
shm()->myDetectorType = static_cast<detectorType>(retval);
LOG(logDEBUG1) << "Detector Type: " << retval;
}
if (shm()->useReceiverFlag) {
auto arg = static_cast<int>(shm()->myDetectorType);
retval = GENERIC;
LOG(logDEBUG1) << "Sending detector type to Receiver: " << arg;
sendToReceiver(F_GET_RECEIVER_TYPE, arg, retval);
LOG(logDEBUG1) << "Receiver Type: " << retval;
}
return retval;
}
slsDetectorDefs::detectorType Module::getDetectorType() const {
return shm()->myDetectorType;
}
@ -1510,14 +1489,13 @@ uint32_t Module::clearBit(uint32_t addr, int n) {
}
}
std::string Module::setReceiverHostname(const std::string &receiverIP) {
void Module::setReceiverHostname(const std::string &receiverIP) {
LOG(logDEBUG1) << "Setting up Receiver with " << receiverIP;
// recieverIP is none
if (receiverIP == "none") {
memset(shm()->rxHostname, 0, MAX_STR_LENGTH);
sls::strcpy_safe(shm()->rxHostname, "none");
shm()->useReceiverFlag = false;
return std::string(shm()->rxHostname);
}
// stop acquisition if running
@ -1537,81 +1515,78 @@ std::string Module::setReceiverHostname(const std::string &receiverIP) {
shm()->useReceiverFlag = true;
checkReceiverVersionCompatibility();
if (setDetectorType(shm()->myDetectorType) != GENERIC) {
sendMultiDetectorSize();
setDetectorId();
setDetectorHostname();
// populate parameters from detector
rxParameters retval;
sendToDetector(F_GET_RECEIVER_PARAMETERS, nullptr, retval);
// setup udp
updateRxDestinationUDPIP();
setDestinationUDPPort(getDestinationUDPPort());
if (shm()->myDetectorType == JUNGFRAU || shm()->myDetectorType == EIGER ) {
setDestinationUDPPort2(getDestinationUDPPort2());
// populate from shared memory
retval.detType = shm()->myDetectorType;
retval.multiSize.x = shm()->multiSize.x;
retval.multiSize.y = shm()->multiSize.y;
retval.detId = detId;
memset(retval.hostname, 0, sizeof(retval.hostname));
strcpy_safe(retval.hostname, shm()->hostname);
LOG(logDEBUG1)
<< "detType:" << retval.detType << std::endl
<< "multiSize.x:" << retval.multiSize.x << std::endl
<< "multiSize.y:" << retval.multiSize.y << std::endl
<< "detId:" << retval.detId << std::endl
<< "hostname:" << retval.hostname << std::endl
<< "udpInterfaces:" << retval.udpInterfaces << std::endl
<< "udp_dstport:" << retval.udp_dstport << std::endl
<< "udp_dstip:" << sls::IpAddr(retval.udp_dstip) << std::endl
<< "udp_dstmac:" << sls::MacAddr(retval.udp_dstmac) << std::endl
<< "udp_dstport2:" << retval.udp_dstport2 << std::endl
<< "udp_dstip2:" << sls::IpAddr(retval.udp_dstip2) << std::endl
<< "udp_dstmac2:" << sls::MacAddr(retval.udp_dstmac2) << std::endl
<< "frames:" << retval.frames << std::endl
<< "triggers:" << retval.triggers << std::endl
<< "bursts:" << retval.bursts << std::endl
<< "analogSamples:" << retval.analogSamples << std::endl
<< "digitalSamples:" << retval.digitalSamples << std::endl
<< "expTimeNs:" << retval.expTimeNs << std::endl
<< "periodNs:" << retval.periodNs << std::endl
<< "subExpTimeNs:" << retval.subExpTimeNs << std::endl
<< "subDeadTimeNs:" << retval.subDeadTimeNs << std::endl
<< "activate:" << retval.activate << std::endl
<< "quad:" << retval.quad << std::endl
<< "dynamicRange:" << retval.dynamicRange << std::endl
<< "timMode:" << retval.timMode << std::endl
<< "tenGiga:" << retval.tenGiga << std::endl
<< "roMode:" << retval.roMode << std::endl
<< "adcMask:" << retval.adcMask << std::endl
<< "adc10gMask:" << retval.adc10gMask << std::endl
<< "roi.xmin:" << retval.roi.xmin << std::endl
<< "roi.xmax:" << retval.roi.xmax << std::endl
<< "countermask:" << retval.countermask << std::endl
<< "burstType:" << retval.burstType << std::endl;
sls::MacAddr retvals[2];
sendToReceiver(F_SETUP_RECEIVER, retval, retvals);
// update detectors with dest mac
if (retval.udp_dstmac == 0 && retvals[0] != 0) {
LOG(logINFO) << "Setting destination udp mac of "
"detector " << detId << " to " << retvals[0];
sendToDetector(F_SET_DEST_UDP_MAC, retvals[0], nullptr);
}
if (shm()->myDetectorType == JUNGFRAU) {
updateRxDestinationUDPIP2();
setNumberofUDPInterfaces(getNumberofUDPInterfaces());
if (retval.udp_dstmac2 == 0 && retvals[1] != 0) {
LOG(logINFO) << "Setting destination udp mac2 of "
"detector " << detId << " to " << retvals[1];
sendToDetector(F_SET_DEST_UDP_MAC2, retvals[1], nullptr);
}
LOG(logDEBUG1) << printReceiverConfiguration();
setReceiverUDPSocketBufferSize(0);
setNumberOfFrames(getNumberOfFrames());
setNumberOfTriggers(getNumberOfTriggers());
setTimingMode(getTimingMode());
setExptime(getExptime());
setPeriod(getPeriod());
// update numinterfaces if different
shm()->numUDPInterfaces = retval.udpInterfaces;
// detector specific
switch (shm()->myDetectorType) {
case EIGER:
setSubExptime(getSubExptime());
setSubDeadTime(getSubDeadTime());
setDynamicRange(getDynamicRange());
activate(-1);
enableTenGigabitEthernet(-1);
setQuad(getQuad());
break;
case CHIPTESTBOARD:
setNumberOfAnalogSamples(getNumberOfAnalogSamples());
setNumberOfDigitalSamples(getNumberOfDigitalSamples());
enableTenGigabitEthernet(-1);
setReadoutMode(getReadoutMode());
setADCEnableMask(getADCEnableMask());
setTenGigaADCEnableMask(getTenGigaADCEnableMask());
break;
case MOENCH:
setNumberOfAnalogSamples(getNumberOfAnalogSamples());
enableTenGigabitEthernet(-1);
setADCEnableMask(getADCEnableMask());
setTenGigaADCEnableMask(getTenGigaADCEnableMask());
break;
case GOTTHARD:
setROI(getROI());
break;
case MYTHEN3:
sendNumberofCounterstoReceiver(getCounterMask());
setDynamicRange(getDynamicRange());
break;
case GOTTHARD2:
setNumberOfBursts(getNumberOfBursts());
setBurstMode(getBurstMode());
break;
default:
break;
if (shm()->myDetectorType == MOENCH) {
setAdditionalJsonParameter("adcmask_1g", std::to_string(retval.adcMask));
setAdditionalJsonParameter("adcmask_10g", std::to_string(retval.adc10gMask));
}
// to use rx_hostname if empty and also update client zmqip
updateReceiverStreamingIP();
}
return std::string(shm()->rxHostname);
}
std::string Module::getReceiverHostname() const {
@ -1706,13 +1681,6 @@ sls::IpAddr Module::getDestinationUDPIP() {
return retval;
}
void Module::updateRxDestinationUDPIP() {
auto ip = getDestinationUDPIP();
if (ip != 0) {
setDestinationUDPIP(ip);
}
}
void Module::setDestinationUDPIP2(const IpAddr ip) {
LOG(logDEBUG1) << "Setting destination udp ip2 to " << ip;
if (ip == 0) {
@ -1736,13 +1704,6 @@ sls::IpAddr Module::getDestinationUDPIP2() {
return retval;
}
void Module::updateRxDestinationUDPIP2() {
auto ip = getDestinationUDPIP2();
if (ip != 0) {
setDestinationUDPIP2(ip);
}
}
void Module::setDestinationUDPMAC(const MacAddr mac) {
LOG(logDEBUG1) << "Setting destination udp mac to " << mac;
if (mac == 0) {
@ -2887,38 +2848,6 @@ void Module::execReceiverCommand(const std::string &cmd) {
}
}
void Module::sendMultiDetectorSize() {
int args[]{shm()->multiSize.x, shm()->multiSize.y};
int retval = -1;
LOG(logDEBUG1) << "Sending multi detector size to receiver: ("
<< shm()->multiSize.x << "," << shm()->multiSize.y
<< ")";
if (shm()->useReceiverFlag) {
sendToReceiver(F_SEND_RECEIVER_MULTIDETSIZE, args, retval);
LOG(logDEBUG1) << "Receiver multi size returned: " << retval;
}
}
void Module::setDetectorId() {
LOG(logDEBUG1) << "Sending detector pos id to receiver: " << detId;
if (shm()->useReceiverFlag) {
int retval = -1;
sendToReceiver(F_SEND_RECEIVER_DETPOSID, detId, retval);
LOG(logDEBUG1) << "Receiver Position Id returned: " << retval;
}
}
void Module::setDetectorHostname() {
char args[MAX_STR_LENGTH]{};
char retvals[MAX_STR_LENGTH]{};
sls::strcpy_safe(args, shm()->hostname);
LOG(logDEBUG1) << "Sending detector hostname to receiver: " << args;
if (shm()->useReceiverFlag) {
sendToReceiver(F_SEND_RECEIVER_DETHOSTNAME, args, retvals);
LOG(logDEBUG1) << "Receiver set detector hostname: " << retvals;
}
}
std::string Module::getFilePath() {
if (!shm()->useReceiverFlag) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file path)");
@ -3216,6 +3145,10 @@ bool Module::enableTenGigabitEthernet(int value) {
int retval = -1;
LOG(logDEBUG1) << "Enabling / Disabling 10Gbe: " << value;
sendToDetector(F_ENABLE_TEN_GIGA, value, retval);
if (value != -1) {
int stopRetval = -1;
sendToDetectorStop(F_ENABLE_TEN_GIGA, value, stopRetval);
}
LOG(logDEBUG1) << "10Gbe: " << retval;
value = retval;
if (shm()->useReceiverFlag && value != -1) {
@ -3459,10 +3392,6 @@ void Module::setPipeline(int clkIndex, int value) {
void Module::setCounterMask(uint32_t countermask) {
LOG(logDEBUG1) << "Setting Counter mask to " << countermask;
sendToDetector(F_SET_COUNTER_MASK, countermask, nullptr);
sendNumberofCounterstoReceiver(countermask);
}
void Module::sendNumberofCounterstoReceiver(uint32_t countermask) {
if (shm()->useReceiverFlag) {
int ncounters = __builtin_popcount(countermask);
LOG(logDEBUG1) << "Sending Reciver #counters: " << ncounters;

View File

@ -176,13 +176,6 @@ class Module : public virtual slsDetectorDefs {
*/
detectorType getDetectorType() const;
/**
* Gets detector type from detector and set it in receiver
* @param type the detector type
* @returns detector type in receiver
*/
int setDetectorType(detectorType type = GET_DETECTOR_TYPE);
/**
* Update total number of channels (chiptestboard or moench)
* from the detector server
@ -647,10 +640,10 @@ class Module : public virtual slsDetectorDefs {
* significant for the receiver Also configures the detector to the receiver
* as UDP destination
* @param receiver receiver hostname or IP address
* @returns the receiver IP address from shared memory
*/
std::string setReceiverHostname(const std::string &receiver);
void setReceiverHostname(const std::string &receiver);
void test();
/**
* Returns the receiver IP address\sa sharedSlsDetector
* @returns the receiver IP address
@ -721,13 +714,6 @@ class Module : public virtual slsDetectorDefs {
*/
sls::IpAddr getDestinationUDPIP();
/**
* Gets destination udp ip from detector,
* if 0, it converts rx_hostname to ip and
* updates both detector and receiver
*/
void updateRxDestinationUDPIP();
/**
* Validates the format of the receiver UDP IP address (bottom half) and
* sets it(Jungfrau only)
@ -743,13 +729,6 @@ class Module : public virtual slsDetectorDefs {
*/
sls::IpAddr getDestinationUDPIP2();
/**
* Gets destination udp ip2 from detector,
* if 0, it converts rx_hostname to ip and
* updates both detector and receiver
*/
void updateRxDestinationUDPIP2();
/**
* Validates the format of the receiver UDP MAC address and sets it
* @param mac receiver UDP MAC address
@ -1344,26 +1323,6 @@ class Module : public virtual slsDetectorDefs {
*/
void execReceiverCommand(const std::string &cmd);
/**
* Send the multi detector size to the detector
* @param detx number of detectors in x dir
* @param dety number of detectors in y dir
*/
void sendMultiDetectorSize();
/**
* Send the detector pos id to the receiver
* for various file naming conventions for multi detectors in receiver
*/
void setDetectorId();
/**
* Send the detector host name to the receiver
* for various handshaking required with the detector
*/
void setDetectorHostname();
std::string getFilePath();
void setFilePath(const std::string &path);
std::string getFileName();
@ -1606,9 +1565,6 @@ class Module : public virtual slsDetectorDefs {
/** [Mythen3] */
void setCounterMask(uint32_t countermask);
/** [Mythen3] */
void sendNumberofCounterstoReceiver(uint32_t countermask);
/** [Mythen3] */
uint32_t getCounterMask();

View File

@ -29,8 +29,8 @@ add_library(slsReceiverShared SHARED
${HEADERS}
)
check_ipo_supported(RESULT result)
if(result)
if(SLS_LTO_AVAILABLE)
set_property(TARGET slsReceiverShared PROPERTY INTERPROCEDURAL_OPTIMIZATION True)
endif()

View File

@ -24,19 +24,16 @@ using Interface = sls::ServerInterface;
ClientInterface::~ClientInterface() {
killTcpThread = true;
// shut down tcp sockets
if (server.get() != nullptr) {
LOG(logINFO) << "Shutting down TCP Socket on port " << portNumber;
server->shutDownSocket();
server.shutdown();
LOG(logDEBUG) << "TCP Socket closed on port " << portNumber;
}
// shut down tcp thread
tcpThread->join();
}
ClientInterface::ClientInterface(int portNumber)
: myDetectorType(GOTTHARD),
portNumber(portNumber > 0 ? portNumber : DEFAULT_PORTNO + 2) {
portNumber(portNumber > 0 ? portNumber : DEFAULT_PORTNO + 2),
server(portNumber) {
functionTable();
// start up tcp thread
tcpThread = sls::make_unique<std::thread>(&ClientInterface::startTCPServer, this);
@ -73,11 +70,11 @@ void ClientInterface::startTCPServer() {
LOG(logINFOBLUE) << "Created [ TCP server Tid: " << syscall(SYS_gettid) << "]";
LOG(logINFO) << "SLS Receiver starting TCP Server on port "
<< portNumber << '\n';
server = sls::make_unique<sls::ServerSocket>(portNumber);
while (true) {
// server = sls::make_unique<sls::ServerSocket>(portNumber);
while (!killTcpThread) {
LOG(logDEBUG1) << "Start accept loop";
try {
auto socket = server->accept();
auto socket = server.accept();
try {
verifyLock();
ret = decodeFunction(socket);
@ -95,10 +92,6 @@ void ClientInterface::startTCPServer() {
} catch (const RuntimeError &e) {
LOG(logERROR) << "Accept failed";
}
// destructor to kill this thread
if (killTcpThread) {
break;
}
}
if (receiver) {
@ -115,8 +108,7 @@ int ClientInterface::functionTable(){
flist[F_GET_LAST_RECEIVER_CLIENT_IP] = &ClientInterface::get_last_client_ip;
flist[F_SET_RECEIVER_PORT] = &ClientInterface::set_port;
flist[F_GET_RECEIVER_VERSION] = &ClientInterface::get_version;
flist[F_GET_RECEIVER_TYPE] = &ClientInterface::set_detector_type;
flist[F_SEND_RECEIVER_DETHOSTNAME] = &ClientInterface::set_detector_hostname;
flist[F_SETUP_RECEIVER] = &ClientInterface::setup_receiver;
flist[F_RECEIVER_SET_ROI] = &ClientInterface::set_roi;
flist[F_RECEIVER_SET_NUM_FRAMES] = &ClientInterface::set_num_frames;
flist[F_SET_RECEIVER_NUM_TRIGGERS] = &ClientInterface::set_num_triggers;
@ -160,8 +152,6 @@ int ClientInterface::functionTable(){
flist[F_SET_FLIPPED_DATA_RECEIVER] = &ClientInterface::set_flipped_data;
flist[F_SET_RECEIVER_FILE_FORMAT] = &ClientInterface::set_file_format;
flist[F_GET_RECEIVER_FILE_FORMAT] = &ClientInterface::get_file_format;
flist[F_SEND_RECEIVER_DETPOSID] = &ClientInterface::set_detector_posid;
flist[F_SEND_RECEIVER_MULTIDETSIZE] = &ClientInterface::set_multi_detector_size;
flist[F_SET_RECEIVER_STREAMING_PORT] = &ClientInterface::set_streaming_port;
flist[F_GET_RECEIVER_STREAMING_PORT] = &ClientInterface::get_streaming_port;
flist[F_SET_RECEIVER_STREAMING_SRC_IP] = &ClientInterface::set_streaming_source_ip;
@ -256,7 +246,7 @@ void ClientInterface::validate(T arg, T retval, const std::string& modename,
}
void ClientInterface::verifyLock() {
if (lockedByClient && server->getThisClient() != server->getLockedBy()) {
if (lockedByClient && server.getThisClient() != server.getLockedBy()) {
throw sls::SocketError("Receiver locked\n");
}
}
@ -302,10 +292,10 @@ int ClientInterface::lock_receiver(Interface &socket) {
auto lock = socket.Receive<int>();
LOG(logDEBUG1) << "Locking Server to " << lock;
if (lock >= 0) {
if (!lockedByClient || (server->getLockedBy() == server->getThisClient())) {
if (!lockedByClient || (server.getLockedBy() == server.getThisClient())) {
lockedByClient = lock;
lock ? server->setLockedBy(server->getThisClient())
: server->setLockedBy(sls::IpAddr{});
lock ? server.setLockedBy(server.getThisClient())
: server.setLockedBy(sls::IpAddr{});
} else {
throw RuntimeError("Receiver locked\n");
}
@ -314,7 +304,7 @@ int ClientInterface::lock_receiver(Interface &socket) {
}
int ClientInterface::get_last_client_ip(Interface &socket) {
return socket.sendResult(server->getLastClient());
return socket.sendResult(server.getLastClient());
}
int ClientInterface::set_port(Interface &socket) {
@ -324,9 +314,9 @@ int ClientInterface::set_port(Interface &socket) {
" is too low (<1024)");
LOG(logINFO) << "TCP port set to " << p_number << std::endl;
auto new_server = sls::make_unique<sls::ServerSocket>(p_number);
new_server->setLockedBy(server->getLockedBy());
new_server->setLastClient(server->getThisClient());
sls::ServerSocket new_server(p_number);
new_server.setLockedBy(server.getLockedBy());
new_server.setLastClient(server.getThisClient());
server = std::move(new_server);
socket.sendResult(p_number);
return OK;
@ -336,15 +326,175 @@ int ClientInterface::get_version(Interface &socket) {
return socket.sendResult(getReceiverVersion());
}
int ClientInterface::set_detector_type(Interface &socket) {
auto arg = socket.Receive<detectorType>();
// set
if (arg >= 0) {
int ClientInterface::setup_receiver(Interface &socket) {
auto arg = socket.Receive<rxParameters>();
LOG(logDEBUG1)
<< "detType:" << arg.detType << std::endl
<< "multiSize.x:" << arg.multiSize.x << std::endl
<< "multiSize.y:" << arg.multiSize.y << std::endl
<< "detId:" << arg.detId << std::endl
<< "hostname:" << arg.hostname << std::endl
<< "udpInterfaces:" << arg.udpInterfaces << std::endl
<< "udp_dstport:" << arg.udp_dstport << std::endl
<< "udp_dstip:" << sls::IpAddr(arg.udp_dstip) << std::endl
<< "udp_dstmac:" << sls::MacAddr(arg.udp_dstmac) << std::endl
<< "udp_dstport2:" << arg.udp_dstport2 << std::endl
<< "udp_dstip2:" << sls::IpAddr(arg.udp_dstip2) << std::endl
<< "udp_dstmac2:" << sls::MacAddr(arg.udp_dstmac2) << std::endl
<< "frames:" << arg.frames << std::endl
<< "triggers:" << arg.triggers << std::endl
<< "bursts:" << arg.bursts << std::endl
<< "analogSamples:" << arg.analogSamples << std::endl
<< "digitalSamples:" << arg.digitalSamples << std::endl
<< "expTimeNs:" << arg.expTimeNs << std::endl
<< "periodNs:" << arg.periodNs << std::endl
<< "subExpTimeNs:" << arg.subExpTimeNs << std::endl
<< "subDeadTimeNs:" << arg.subDeadTimeNs << std::endl
<< "activate:" << arg.activate << std::endl
<< "quad:" << arg.quad << std::endl
<< "dynamicRange:" << arg.dynamicRange << std::endl
<< "timMode:" << arg.timMode << std::endl
<< "tenGiga:" << arg.tenGiga << std::endl
<< "roMode:" << arg.roMode << std::endl
<< "adcMask:" << arg.adcMask << std::endl
<< "adc10gMask:" << arg.adc10gMask << std::endl
<< "roi.xmin:" << arg.roi.xmin << std::endl
<< "roi.xmax:" << arg.roi.xmax << std::endl
<< "countermask:" << arg.countermask << std::endl
<< "burstType:" << arg.burstType << std::endl;
// if object exists, verify unlocked and idle, else only verify lock
// (connecting first time)
if (receiver != nullptr) {
verifyIdle(socket);
}
// basic setup
setDetectorType(arg.detType);
{
int msize[2] = {arg.multiSize.x, arg.multiSize.y};
impl()->setMultiDetectorSize(msize);
}
impl()->setDetectorPositionId(arg.detId);
impl()->setDetectorHostname(arg.hostname);
// udp setup
sls::MacAddr retvals[2];
if (arg.udp_dstmac == 0 && arg.udp_dstip != 0) {
retvals[0] = setUdpIp(sls::IpAddr(arg.udp_dstip));
}
if (arg.udp_dstmac2 == 0 && arg.udp_dstip2 != 0) {
retvals[1] = setUdpIp2(sls::IpAddr(arg.udp_dstip2));
}
impl()->setUDPPortNumber(arg.udp_dstport);
impl()->setUDPPortNumber2(arg.udp_dstport2);
if (myDetectorType == JUNGFRAU) {
try {
impl()->setNumberofUDPInterfaces(arg.udpInterfaces);
} catch(const RuntimeError &e) {
throw RuntimeError("Failed to set number of interfaces to " +
std::to_string(arg.udpInterfaces));
}
}
impl()->setUDPSocketBufferSize(0);
// acquisition parameters
impl()->setNumberOfFrames(arg.frames);
impl()->setNumberOfTriggers(arg.triggers);
if (myDetectorType == GOTTHARD) {
impl()->setNumberOfBursts(arg.bursts);
}
if (myDetectorType == MOENCH || myDetectorType == CHIPTESTBOARD) {
try {
impl()->setNumberofAnalogSamples(arg.analogSamples);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set num analog samples to " +
std::to_string(arg.analogSamples) +
" due to fifo structure memory allocation.");
}
}
if (myDetectorType == CHIPTESTBOARD) {
try {
impl()->setNumberofDigitalSamples(arg.digitalSamples);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set num digital samples to "
+ std::to_string(arg.analogSamples) +
" due to fifo structure memory allocation.");
}
}
impl()->setAcquisitionTime(arg.expTimeNs);
impl()->setAcquisitionPeriod(arg.periodNs);
if (myDetectorType == EIGER) {
impl()->setSubExpTime(arg.subExpTimeNs);
impl()->setSubPeriod(arg.subExpTimeNs + arg.subDeadTimeNs);
impl()->setActivate(static_cast<bool>(arg.activate));
try {
impl()->setQuad(arg.quad == 0 ? false : true);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set quad to " +
std::to_string(arg.quad) +
" due to fifo strucutre memory allocation");
}
}
if (myDetectorType == EIGER || myDetectorType == MYTHEN3) {
try {
impl()->setDynamicRange(arg.dynamicRange);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set dynamic range. Could not allocate "
"memory for fifo or could not start listening/writing threads");
}
}
impl()->setTimingMode(arg.timMode);
if (myDetectorType == EIGER || myDetectorType == MOENCH ||
myDetectorType == CHIPTESTBOARD) {
try {
impl()->setTenGigaEnable(arg.tenGiga);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set 10GbE.");
}
}
if (myDetectorType == CHIPTESTBOARD) {
try {
impl()->setReadoutMode(arg.roMode);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set read out mode "
"due to fifo memory allocation.");
}
}
if (myDetectorType == CHIPTESTBOARD || myDetectorType == MOENCH) {
try {
impl()->setADCEnableMask(arg.adcMask);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set adc enable mask "
"due to fifo memory allcoation");
}
try {
impl()->setTenGigaADCEnableMask(arg.adc10gMask);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set 10Gb adc enable mask "
"due to fifo memory allcoation");
}
}
if (myDetectorType == GOTTHARD) {
try {
impl()->setROI(arg.roi);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set ROI");
}
}
if (myDetectorType == MYTHEN3) {
int ncounters = __builtin_popcount(arg.countermask);
impl()->setNumberofCounters(ncounters);
}
if (myDetectorType == GOTTHARD) {
impl()->setBurstMode(arg.burstType);
}
return socket.sendResult(retvals);
}
void ClientInterface::setDetectorType(detectorType arg) {
switch (arg) {
case GOTTHARD:
case EIGER:
@ -380,25 +530,6 @@ int ClientInterface::set_detector_type(Interface &socket) {
if (rawDataModifyReadyCallBack != nullptr)
impl()->registerCallBackRawDataModifyReady(
rawDataModifyReadyCallBack, pRawDataReady);
}
return socket.sendResult(myDetectorType);
}
int ClientInterface::set_detector_hostname(Interface &socket) {
char hostname[MAX_STR_LENGTH]{};
char retval[MAX_STR_LENGTH]{};
socket.Receive(hostname);
if (strlen(hostname) != 0) {
verifyIdle(socket);
impl()->setDetectorHostname(hostname);
}
auto s = impl()->getDetectorHostname();
sls::strcpy_safe(retval, s.c_str());
if (s.empty()) {
throw RuntimeError("Hostname not set");
}
return socket.sendResult(retval);
}
int ClientInterface::set_roi(Interface &socket) {
@ -514,7 +645,6 @@ int ClientInterface::set_num_analog_samples(Interface &socket) {
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set num analog samples to " + std::to_string(value) + " due to fifo structure memory allocation.");
}
return socket.Send(OK);
}
@ -831,7 +961,7 @@ int ClientInterface::enable_tengiga(Interface &socket) {
try {
impl()->setTenGigaEnable(val);
} catch(const RuntimeError &e) {
throw RuntimeError("Could not set 10GbE.");
throw RuntimeError("Could not set 10GbE." );
}
}
int retval = impl()->getTenGigaEnable();
@ -952,34 +1082,6 @@ int ClientInterface::get_file_format(Interface &socket) {
return socket.sendResult(retval);
}
int ClientInterface::set_detector_posid(Interface &socket) {
auto arg = socket.Receive<int>();
if (arg >= 0) {
verifyIdle(socket);
LOG(logDEBUG1) << "Setting detector position id:" << arg;
impl()->setDetectorPositionId(arg);
}
auto retval = impl()->getDetectorPositionId();
validate(arg, retval, "set detector position id", DEC);
LOG(logDEBUG1) << "Position Id:" << retval;
return socket.sendResult(retval);
}
int ClientInterface::set_multi_detector_size(Interface &socket) {
int arg[]{-1, -1};
socket.Receive(arg);
if ((arg[0] > 0) && (arg[1] > 0)) {
verifyIdle(socket);
LOG(logDEBUG1)
<< "Setting multi detector size:" << arg[0] << "," << arg[1];
impl()->setMultiDetectorSize(arg);
}
int *temp = impl()->getMultiDetectorSize(); // TODO! return by value!
int retval = temp[0] * temp[1];
LOG(logDEBUG1) << "Multi Detector Size:" << retval;
return socket.sendResult(retval);
}
int ClientInterface::set_streaming_port(Interface &socket) {
auto port = socket.Receive<int>();
if (port < 0) {
@ -1363,11 +1465,7 @@ int ClientInterface::set_read_n_lines(Interface &socket) {
return socket.Send(OK);
}
int ClientInterface::set_udp_ip(Interface &socket) {
auto arg = socket.Receive<sls::IpAddr>();
verifyIdle(socket);
LOG(logINFO) << "Received UDP IP: " << arg;
sls::MacAddr ClientInterface::setUdpIp(sls::IpAddr arg) {
// getting eth
std::string eth = sls::IpToInterfaceName(arg.str());
if (eth == "none") {
@ -1386,18 +1484,19 @@ int ClientInterface::set_udp_ip(Interface &socket) {
if (retval == 0) {
throw RuntimeError("Failed to get udp mac adddress to listen to (eth:" + eth + ", ip:" + arg.str() + ")\n");
}
return retval;
}
int ClientInterface::set_udp_ip(Interface &socket) {
auto arg = socket.Receive<sls::IpAddr>();
verifyIdle(socket);
LOG(logINFO) << "Received UDP IP: " << arg;
auto retval = setUdpIp(arg);
LOG(logINFO) << "Receiver MAC Address: " << retval;
return socket.sendResult(retval);
}
int ClientInterface::set_udp_ip2(Interface &socket) {
auto arg = socket.Receive<sls::IpAddr>();
verifyIdle(socket);
if (myDetectorType != JUNGFRAU) {
throw RuntimeError("UDP Destination IP2 not implemented for this detector");
}
LOG(logINFO) << "Received UDP IP2: " << arg;
sls::MacAddr ClientInterface::setUdpIp2(sls::IpAddr arg) {
// getting eth
std::string eth = sls::IpToInterfaceName(arg.str());
if (eth == "none") {
@ -1414,6 +1513,17 @@ int ClientInterface::set_udp_ip2(Interface &socket) {
if (retval == 0) {
throw RuntimeError("Failed to get udp mac adddress2 to listen to (eth:" + eth + ", ip:" + arg.str() + ")\n");
}
return retval;
}
int ClientInterface::set_udp_ip2(Interface &socket) {
auto arg = socket.Receive<sls::IpAddr>();
verifyIdle(socket);
if (myDetectorType != JUNGFRAU) {
throw RuntimeError("UDP Destination IP2 not implemented for this detector");
}
LOG(logINFO) << "Received UDP IP2: " << arg;
auto retval = setUdpIp2(arg);
LOG(logINFO) << "Receiver MAC Address2: " << retval;
return socket.sendResult(retval);
}

View File

@ -10,8 +10,18 @@ class ServerInterface;
#include <future>
class ClientInterface : private virtual slsDetectorDefs {
private:
enum numberMode { DEC, HEX };
detectorType myDetectorType;
int portNumber{0};
sls::ServerSocket server;
std::unique_ptr<Implementation> receiver;
std::unique_ptr<std::thread> tcpThread;
int ret{OK};
int fnum{-1};
int lockedByClient{0};
std::atomic<bool> killTcpThread{false};
public:
virtual ~ClientInterface();
@ -49,15 +59,14 @@ class ClientInterface : private virtual slsDetectorDefs {
void verifyLock();
void verifyIdle(sls::ServerInterface &socket);
int exec_command(sls::ServerInterface &socket);
int exit_server(sls::ServerInterface &socket);
int lock_receiver(sls::ServerInterface &socket);
int get_last_client_ip(sls::ServerInterface &socket);
int set_port(sls::ServerInterface &socket);
int get_version(sls::ServerInterface &socket);
int set_detector_type(sls::ServerInterface &socket);
int set_detector_hostname(sls::ServerInterface &socket);
int setup_receiver(sls::ServerInterface &socket);
void setDetectorType(detectorType arg);
int set_roi(sls::ServerInterface &socket);
int set_num_frames(sls::ServerInterface &socket);
int set_num_triggers(sls::ServerInterface &socket);
@ -102,8 +111,6 @@ class ClientInterface : private virtual slsDetectorDefs {
int set_flipped_data(sls::ServerInterface &socket);
int set_file_format(sls::ServerInterface &socket);
int get_file_format(sls::ServerInterface &socket);
int set_detector_posid(sls::ServerInterface &socket);
int set_multi_detector_size(sls::ServerInterface &socket);
int set_streaming_port(sls::ServerInterface &socket);
int get_streaming_port(sls::ServerInterface &socket);
int set_streaming_source_ip(sls::ServerInterface &socket);
@ -132,7 +139,9 @@ class ClientInterface : private virtual slsDetectorDefs {
int get_dbit_offset(sls::ServerInterface &socket);
int set_quad_type(sls::ServerInterface &socket);
int set_read_n_lines(sls::ServerInterface &socket);
sls::MacAddr setUdpIp(sls::IpAddr arg);
int set_udp_ip(sls::ServerInterface &socket);
sls::MacAddr setUdpIp2(sls::IpAddr arg);
int set_udp_ip2(sls::ServerInterface &socket);
int set_udp_port(sls::ServerInterface &socket);
int set_udp_port2(sls::ServerInterface &socket);
@ -144,7 +153,6 @@ class ClientInterface : private virtual slsDetectorDefs {
int get_additional_json_parameter(sls::ServerInterface &socket);
int get_progress(sls::ServerInterface &socket);
Implementation *impl() {
if (receiver != nullptr) {
return receiver.get();
@ -154,18 +162,8 @@ class ClientInterface : private virtual slsDetectorDefs {
}
}
detectorType myDetectorType;
std::unique_ptr<Implementation> receiver{nullptr};
int (ClientInterface::*flist[NUM_REC_FUNCTIONS])(
sls::ServerInterface &socket);
int ret{OK};
int fnum{-1};
int lockedByClient{0};
int portNumber{0};
std::atomic<bool> killTcpThread{false};
std::unique_ptr<std::thread> tcpThread;
//***callback parameters***
@ -179,6 +177,6 @@ class ClientInterface : private virtual slsDetectorDefs {
void *) = nullptr;
void *pRawDataReady{nullptr};
protected:
std::unique_ptr<sls::ServerSocket> server{nullptr};
};

View File

@ -29,13 +29,9 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
uint32_t* freq, uint32_t* timer,
bool* fp, bool* act, bool* depaden, bool* sm, bool* qe,
std::vector <int> * cdl, int* cdo, int* cad) :
ThreadObject(ind, TypeName),
runningFlag(false),
generalData(nullptr),
fifo(f),
myDetectorType(dtype),
file(nullptr),
dataStreamEnable(dsEnable),
fileFormatType(ftype),
fileWriteEnable(fwenable),
@ -43,7 +39,6 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
dynamicRange(dr),
streamingFrequency(freq),
streamingTimerInMs(timer),
currentFreqCount(0),
activated(act),
deactivatedPaddingEnable(depaden),
silentMode(sm),
@ -51,14 +46,7 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
framePadding(fp),
ctbDbitList(cdl),
ctbDbitOffset(cdo),
ctbAnalogDataBytes(cad),
startedFlag(false),
firstIndex(0),
numFramesCaught(0),
currentFrameIndex(0),
rawDataReadyCallBack(nullptr),
rawDataModifyReadyCallBack(nullptr),
pRawDataReady(nullptr)
ctbAnalogDataBytes(cad)
{
LOG(logDEBUG) << "DataProcessor " << ind << " created";
memset((void*)&timerBegin, 0, sizeof(timespec));
@ -71,10 +59,6 @@ DataProcessor::~DataProcessor() {
/** getters */
bool DataProcessor::IsRunning() {
return runningFlag;
}
bool DataProcessor::GetStartedFlag(){
return startedFlag;
}
@ -91,24 +75,12 @@ uint64_t DataProcessor::GetProcessedIndex() {
return currentFrameIndex - firstIndex;
}
/** setters */
void DataProcessor::StartRunning() {
runningFlag = true;
}
void DataProcessor::StopRunning() {
runningFlag = false;
}
void DataProcessor::SetFifo(Fifo* f) {
fifo = f;
}
void DataProcessor::ResetParametersforNewAcquisition(){
runningFlag = false;
StopRunning();
startedFlag = false;
numFramesCaught = 0;
firstIndex = 0;

View File

@ -59,11 +59,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
//*** getters ***
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
bool IsRunning() override;
/**
* Get acquisition started flag
@ -89,17 +84,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
*/
uint64_t GetProcessedIndex();
//*** setters ***
/**
* Set bit in RunningMask to allow thread to run
*/
void StartRunning();
/**
* Reset bit in RunningMask to prevent thread from running
*/
void StopRunning();
/**
* Set Fifo pointer to the one given
* @param f address of Fifo pointer
@ -254,11 +238,8 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
/** type of thread */
static const std::string TypeName;
/** Object running status */
std::atomic<bool> runningFlag;
/** GeneralData (Detector Data) object */
const GeneralData* generalData;
const GeneralData* generalData{nullptr};
/** Fifo structure */
Fifo* fifo;
@ -269,7 +250,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
detectorType myDetectorType;
/** File writer implemented as binary or hdf5 File */
File* file;
File* file{nullptr};
/** Data Stream Enable */
bool* dataStreamEnable;
@ -293,7 +274,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
uint32_t* streamingTimerInMs;
/** Current frequency count */
uint32_t currentFreqCount;
uint32_t currentFreqCount{0};
/** timer beginning stamp for random streaming */
struct timespec timerBegin;
@ -324,21 +305,18 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
//acquisition start
/** Aquisition Started flag */
bool startedFlag;
std::atomic<bool> startedFlag{false};
/** Frame Number of First Frame */
uint64_t firstIndex;
std::atomic<uint64_t> firstIndex{0};
//for statistics
/** Number of complete frames caught */
uint64_t numFramesCaught;
uint64_t numFramesCaught{0};
/** Frame Number of latest processed frame number */
uint64_t currentFrameIndex;
std::atomic<uint64_t> currentFrameIndex{0};
//call back
/**
@ -349,7 +327,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
* dataSize in bytes is the size of the data in bytes.
*/
void (*rawDataReadyCallBack)(char*,
char*, uint32_t, void*);
char*, uint32_t, void*) = nullptr;
/**
* Call back for raw data (modified)
@ -359,9 +337,9 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
* revDatasize is the reference of data size in bytes. Can be modified to the new size to be written/streamed. (only smaller value).
*/
void (*rawDataModifyReadyCallBack)(char*,
char*, uint32_t &, void*);
char*, uint32_t &, void*) = nullptr;
void *pRawDataReady;
void *pRawDataReady{nullptr};

View File

@ -19,18 +19,11 @@ const std::string DataStreamer::TypeName = "DataStreamer";
DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r,
uint64_t* fi, int fd, int* nd, bool* qe, uint64_t* tot) :
ThreadObject(ind, TypeName),
runningFlag(0),
generalData(nullptr),
fifo(f),
zmqSocket(nullptr),
dynamicRange(dr),
roi(r),
adcConfigured(-1),
fileIndex(fi),
flippedDataX(fd),
startedFlag(false),
firstIndex(0),
completeBuffer(nullptr),
quadEnable(qe),
totalNumFrames(tot)
{
@ -46,29 +39,12 @@ DataStreamer::~DataStreamer() {
delete [] completeBuffer;
}
/** getters */
bool DataStreamer::IsRunning() {
return runningFlag;
}
/** setters */
void DataStreamer::StartRunning() {
runningFlag = true;
}
void DataStreamer::StopRunning() {
runningFlag = false;
}
void DataStreamer::SetFifo(Fifo* f) {
fifo = f;
}
void DataStreamer::ResetParametersforNewAcquisition(const std::string& fname){
runningFlag = false;
StopRunning();
startedFlag = false;
firstIndex = 0;

View File

@ -42,25 +42,6 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
*/
~DataStreamer();
//*** getters ***
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
bool IsRunning();
//*** setters ***
/**
* Set bit in RunningMask to allow thread to run
*/
void StartRunning();
/**
* Reset bit in RunningMask to prevent thread from running
*/
void StopRunning();
/**
* Set Fifo pointer to the one given
* @param f address of Fifo pointer
@ -158,19 +139,14 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
/** type of thread */
static const std::string TypeName;
/** Object running status */
bool runningFlag;
/** GeneralData (Detector Data) object */
const GeneralData* generalData;
const GeneralData* generalData{nullptr};
/** Fifo structure */
Fifo* fifo;
/** ZMQ Socket - Receiver to Client */
ZmqSocket* zmqSocket;
ZmqSocket* zmqSocket{nullptr};
/** Pointer to dynamic range */
uint32_t* dynamicRange;
@ -179,7 +155,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
ROI* roi;
/** adc Configured */
int adcConfigured;
int adcConfigured{-1};
/** Pointer to file index */
uint64_t* fileIndex;
@ -191,16 +167,16 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
std::map<std::string, std::string> additionJsonHeader;
/** Aquisition Started flag */
bool startedFlag;
bool startedFlag{nullptr};
/** Frame Number of First Frame */
uint64_t firstIndex;
uint64_t firstIndex{0};
/* File name to stream */
std::string fileNametoStream;
/** Complete buffer used for roi, eg. shortGotthard */
char* completeBuffer;
char* completeBuffer{nullptr};
/** Number of Detectors in X and Y dimension */
int numDet[2];

View File

@ -12,6 +12,7 @@
#include "container_utils.h" // For sls::make_unique<>
#include "sls_detector_exceptions.h"
#include "UdpRxSocket.h"
#include "network_utils.h"
#include <cerrno>
#include <cstring>
@ -25,12 +26,9 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic<runStatus>*
int64_t* us, int64_t* as, uint32_t* fpf,
frameDiscardPolicy* fdp, bool* act, bool* depaden, bool* sm) :
ThreadObject(ind, TypeName),
runningFlag(0),
generalData(nullptr),
fifo(f),
myDetectorType(dtype),
status(s),
udpSocket(nullptr),
udpPortNumber(portno),
eth(e),
numImages(nf),
@ -41,45 +39,22 @@ Listener::Listener(int ind, detectorType dtype, Fifo* f, std::atomic<runStatus>*
frameDiscardMode(fdp),
activated(act),
deactivatedPaddingEnable(depaden),
silentMode(sm),
row(0),
column(0),
startedFlag(false),
firstIndex(0),
numPacketsCaught(0),
lastCaughtFrameIndex(0),
currentFrameIndex(0),
carryOverFlag(0),
udpSocketAlive(0),
numPacketsStatistic(0),
numFramesStatistic(0),
oddStartingPacket(true)
silentMode(sm)
{
LOG(logDEBUG) << "Listener " << ind << " created";
}
Listener::~Listener() = default;
Listener::~Listener() {
if (udpSocket){
sem_post(&semaphore_socket);
sem_destroy(&semaphore_socket);
}
}
/** getters */
bool Listener::IsRunning() {
return runningFlag;
}
uint64_t Listener::GetPacketsCaught() {
uint64_t Listener::GetPacketsCaught() const {
return numPacketsCaught;
}
uint64_t Listener::GetLastFrameIndexCaught() {
uint64_t Listener::GetLastFrameIndexCaught() const {
return lastCaughtFrameIndex;
}
uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) {
uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const {
if (!stoppedFlag) {
return (numPackets - numPacketsCaught);
}
@ -89,24 +64,12 @@ uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) {
return (lastCaughtFrameIndex - firstIndex + 1) * generalData->packetsPerFrame - numPacketsCaught;
}
/** setters */
void Listener::StartRunning() {
runningFlag = true;
}
void Listener::StopRunning() {
runningFlag = false;
}
void Listener::SetFifo(Fifo* f) {
fifo = f;
}
void Listener::ResetParametersforNewAcquisition() {
runningFlag = false;
StopRunning();
startedFlag = false;
numPacketsCaught = 0;
firstIndex = 0;
@ -174,10 +137,9 @@ void Listener::CreateUDPSockets() {
}
udpSocketAlive = true;
sem_init(&semaphore_socket,1,0);
// doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = udpSocket->getActualUDPSocketBufferSize();
*actualUDPSocketBufferSize = udpSocket->getBufferSize();
}
@ -185,14 +147,8 @@ void Listener::CreateUDPSockets() {
void Listener::ShutDownUDPSocket() {
if(udpSocket){
udpSocketAlive = false;
udpSocket->ShutDownSocket();
udpSocket->Shutdown();
LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber;
fflush(stdout);
// wait only if the threads have started as it is the threads that
//give a post to semaphore(at stopListening)
if (runningFlag)
sem_wait(&semaphore_socket);
sem_destroy(&semaphore_socket);
}
}
@ -220,7 +176,7 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int64_t s) {
*udpSocketBufferSize);
// doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = g.getActualUDPSocketBufferSize();
*actualUDPSocketBufferSize = g.getBufferSize();
if (*actualUDPSocketBufferSize == -1) {
*udpSocketBufferSize = temp;
} else {
@ -300,8 +256,6 @@ void Listener::StopListening(char* buf) {
(*((uint32_t*)buf)) = DUMMY_PACKET_VALUE;
fifo->PushAddress(buf);
StopRunning();
sem_post(&semaphore_socket);
LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber << ") : " << numPacketsCaught;
LOG(logDEBUG1) << index << ": Listening Completed";
}

View File

@ -12,13 +12,10 @@
#include <memory>
#include <atomic>
#include "ThreadObject.h"
#include "UdpRxSocket.h"
class GeneralData;
class Fifo;
namespace sls{
class UdpRxSocket;
}
class Listener : private virtual slsDetectorDefs, public ThreadObject {
@ -53,40 +50,20 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
*/
~Listener();
//*** getters ***
/**
* Returns if the thread is currently running
* @returns true if thread is running, else false
*/
bool IsRunning() override;
/**
* Get Packets caught
* @return Packets caught
*/
uint64_t GetPacketsCaught();
uint64_t GetPacketsCaught() const;
/**
* Get Last Frame index caught
* @return last frame index caught
*/
uint64_t GetLastFrameIndexCaught();
uint64_t GetLastFrameIndexCaught() const;
/** Get number of missing packets */
uint64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets);
//*** setters ***
/**
* Set bit in RunningMask to allow thread to run
*/
void StartRunning();
/**
* Reset bit in RunningMask to prevent thread from running
*/
void StopRunning();
uint64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const;
/**
* Set Fifo pointer to the one given
@ -140,7 +117,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
void RecordFirstIndex(uint64_t fnum);
/**
* Thread Exeution for Listener Class
* Thread Execution for Listener Class
* Pop free addresses, listen to udp socket,
* write to memory & push the address into fifo
*/
@ -168,16 +145,11 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
*/
void PrintFifoStatistics();
/** type of thread */
static const std::string TypeName;
/** Object running status */
std::atomic<bool> runningFlag;
/** GeneralData (Detector Data) object */
GeneralData* generalData;
GeneralData* generalData{nullptr};
/** Fifo structure */
Fifo* fifo;
@ -190,7 +162,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
std::atomic<runStatus>* status;
/** UDP Socket - Detector to Receiver */
std::unique_ptr<sls::UdpRxSocket> udpSocket;
std::unique_ptr<sls::UdpRxSocket> udpSocket{nullptr};
/** UDP Port Number */
uint32_t* udpPortNumber;
@ -228,36 +200,34 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
/** row hardcoded as 1D or 2d,
* if detector does not send them yet or
* missing packets/deactivated (eiger/jungfrau sends 2d pos) **/
uint16_t row;
uint16_t row{0};
/** column hardcoded as 2D,
* deactivated eiger/missing packets (eiger/jungfrau sends 2d pos) **/
uint16_t column;
uint16_t column{0};
// acquisition start
/** Aquisition Started flag */
std::atomic<bool> startedFlag;
std::atomic<bool> startedFlag{false};
/** Frame Number of First Frame */
uint64_t firstIndex;
uint64_t firstIndex{0};
// for acquisition summary
/** Number of complete Packets caught */
std::atomic<uint64_t> numPacketsCaught;
std::atomic<uint64_t> numPacketsCaught{0};
/** Last Frame Index caught from udp network */
std::atomic<uint64_t> lastCaughtFrameIndex;
std::atomic<uint64_t> lastCaughtFrameIndex{0};
// parameters to acquire image
/** Current Frame Index, default value is 0
* ( always check startedFlag for validity first)
*/
uint64_t currentFrameIndex;
uint64_t currentFrameIndex{0};
/** True if there is a packet carry over from previous Image */
bool carryOverFlag;
bool carryOverFlag{false};
/** Carry over packet buffer */
std::unique_ptr<char []> carryOverPacket;
@ -266,22 +236,19 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
std::unique_ptr<char []> listeningPacket;
/** if the udp socket is connected */
std::atomic<bool> udpSocketAlive;
std::atomic<bool> udpSocketAlive{false};
/** Semaphore to synchonize deleting udp socket */
sem_t semaphore_socket;
// for print progress during acqusition
// for print progress during acquisition
/** number of packets for statistic */
uint32_t numPacketsStatistic;
uint32_t numPacketsStatistic{0};
/** number of images for statistic */
uint32_t numFramesStatistic;
uint32_t numFramesStatistic{0};
/**
* starting packet number is odd or evern, accordingly increment frame number
* starting packet number is odd or even, accordingly increment frame number
* to get first packet number as 0
* (pecific to gotthard, can vary between modules, hence defined here) */
bool oddStartingPacket;
bool oddStartingPacket{true};
};

View File

@ -42,17 +42,14 @@ int main(int argc, char *argv[]) {
LOG(logERROR) << "Could not set handler function for SIGPIPE";
}
std::unique_ptr<Receiver> receiver = nullptr;
try {
receiver = sls::make_unique<Receiver>(argc, argv);
} catch (...) {
LOG(logINFOBLUE) << "Exiting [ Tid: " << syscall(SYS_gettid) << " ]";
throw;
}
Receiver r(argc, argv);
LOG(logINFO) << "[ Press \'Ctrl+c\' to exit ]";
sem_wait(&semaphore);
sem_destroy(&semaphore);
} catch (...) {
//pass
}
LOG(logINFOBLUE) << "Exiting [ Tid: " << syscall(SYS_gettid) << " ]";
LOG(logINFO) << "Exiting Receiver";
return 0;

View File

@ -3,56 +3,51 @@
* @short creates/destroys a thread
***********************************************/
#include "ThreadObject.h"
#include "container_utils.h"
#include <iostream>
#include <sys/syscall.h>
#include <unistd.h>
ThreadObject::ThreadObject(int threadIndex, std::string threadType)
: index(threadIndex), type(threadType) {
LOG(logDEBUG) << type << " thread created: " << index;
sem_init(&semaphore,1,0);
try {
threadObject = sls::make_unique<std::thread>(&ThreadObject::RunningThread, this);
threadObject = std::thread(&ThreadObject::RunningThread, this);
} catch (...) {
throw sls::RuntimeError("Could not create " + type + " thread with index " + std::to_string(index));
}
}
ThreadObject::~ThreadObject() {
killThread = true;
sem_post(&semaphore);
threadObject->join();
threadObject.join();
sem_destroy(&semaphore);
}
bool ThreadObject::IsRunning() const{
return runningFlag;
}
void ThreadObject::StartRunning() {
runningFlag = true;
}
void ThreadObject::StopRunning() {
runningFlag = false;
}
void ThreadObject::RunningThread() {
LOG(logINFOBLUE) << "Created [ " << type << "Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";
LOG(logDEBUG) << type << " thread " << index << " created successfully.";
while(true) {
while(!killThread) {
while(IsRunning()) {
ThreadExecution();
}
//wait till the next acquisition
sem_wait(&semaphore);
if(killThread) {
break;
}
}
LOG(logDEBUG) << type << " thread with index " << index << " destroyed successfully.";
LOG(logINFOBLUE) << "Exiting [ " << type << " Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";
}
@ -61,11 +56,10 @@ void ThreadObject::Continue() {
sem_post(&semaphore);
}
void ThreadObject::SetThreadPriority(int priority) {
struct sched_param param;
param.sched_priority = priority;
if (pthread_setschedparam(threadObject->native_handle(), SCHED_FIFO, &param) == EPERM) {
if (pthread_setschedparam(threadObject.native_handle(), SCHED_FIFO, &param) == EPERM) {
if (index == 0) {
LOG(logWARNING) << "Could not prioritize " << type << " thread. "
"(No Root Privileges?)";

View File

@ -7,41 +7,40 @@
*@short creates/destroys a thread
*/
#include "sls_detector_defs.h"
#include "logger.h"
#include "sls_detector_defs.h"
#include <atomic>
#include <thread>
#include <semaphore.h>
#include <string>
#include <atomic>
#include <future>
class ThreadObject : private virtual slsDetectorDefs {
protected:
const int index{0};
private:
std::atomic<bool> killThread{false};
std::atomic<bool> runningFlag{false};
std::thread threadObject;
sem_t semaphore;
const std::string type;
public:
ThreadObject(int threadIndex, std::string threadType);
virtual ~ThreadObject();
virtual bool IsRunning() = 0;
bool IsRunning() const;
void StartRunning();
void StopRunning();
void Continue();
void SetThreadPriority(int priority);
protected:
virtual void ThreadExecution() = 0;
private:
virtual void ThreadExecution() = 0;
/**
* Thread called: An infinite while loop in which,
* semaphore starts executing its contents as long RunningMask is satisfied
* Then it exits the thread on its own if killThread is true
*/
void RunningThread();
protected:
int index{0};
std::string type;
std::atomic<bool> killThread{false};
std::unique_ptr<std::thread> threadObject;
sem_t semaphore;
};

View File

@ -8,6 +8,7 @@ set(SOURCES
src/ToString.cpp
src/network_utils.cpp
src/ZmqSocket.cpp
src/UdpRxSocket.cpp
)
set(HEADERS
@ -41,11 +42,13 @@ add_library(slsSupportLib SHARED
${HEADERS}
)
check_ipo_supported(RESULT result)
if(result)
if(SLS_LTO_AVAILABLE)
set_property(TARGET slsSupportLib PROPERTY INTERPROCEDURAL_OPTIMIZATION True)
endif()
target_include_directories(slsSupportLib PUBLIC
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>"
"$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>"

View File

@ -21,7 +21,7 @@ class DataSocket {
//No copy since the class manage the underlying socket
DataSocket(const DataSocket &) = delete;
DataSocket &operator=(DataSocket const &) = delete;
int getSocketId() const { return socketId_; }
int getSocketId() const { return sockfd_; }
int Send(const void *buffer, size_t size);
@ -51,9 +51,10 @@ class DataSocket {
int setReceiveTimeout(int us);
void close();
void shutDownSocket();
void shutdown();
private:
int socketId_ = -1;
int sockfd_ = -1;
};
}; // namespace sls

View File

@ -1,141 +1,30 @@
#pragma once
/*
UdpRxSocket provies socket control to receive
data on a udp socket.
It provides a drop in replacement for
genericSocket. But please be careful since
this might be deprecated in the future
UDP socket class to receive data. The intended use is in the
receiver listener loop. Should be used RAII style...
*/
#include "network_utils.h"
#include "sls_detector_exceptions.h"
#include <cstdint>
#include <errno.h>
#include <iostream>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>
#include <sys/types.h> //ssize_t
namespace sls {
class UdpRxSocket {
const ssize_t packet_size;
char *buff;
int fd = -1;
const ssize_t packet_size_;
int sockfd_{-1};
public:
UdpRxSocket(int port, ssize_t packet_size, const char *hostname = nullptr,
ssize_t buffer_size = 0)
: packet_size(packet_size) {
/* hostname = nullptr -> wildcard */
size_t kernel_buffer_size = 0);
~UdpRxSocket();
bool ReceivePacket(char *dst) noexcept;
size_t getBufferSize() const;
void setBufferSize(ssize_t size);
ssize_t getPacketSize() const noexcept;
void Shutdown();
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = 0;
hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
struct addrinfo *res = 0;
const std::string portname = std::to_string(port);
if (getaddrinfo(hostname, portname.c_str(), &hints, &res)) {
throw RuntimeError("Failed at getaddrinfo with " +
std::string(hostname));
}
fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (fd == -1) {
throw RuntimeError("Failed to create UDP RX socket");
}
if (bind(fd, res->ai_addr, res->ai_addrlen) == -1) {
throw RuntimeError("Failed to bind UDP RX socket");
}
freeaddrinfo(res);
// If we get a specified buffer size that is larger than the set one
// we set it. Otherwise we leave it there since it could have been
// set by the rx_udpsocksize command
if (buffer_size) {
auto current = getBufferSize() / 2;
if (current < buffer_size) {
setBufferSize(buffer_size);
if (getBufferSize() / 2 < buffer_size) {
LOG(logWARNING)
<< "Could not set buffer size. Got: "
<< getBufferSize() / 2 << " instead of " << buffer_size;
}
}
}
// Allocate at the end to avoid memory leak if we throw
buff = new char[packet_size];
}
~UdpRxSocket() {
delete[] buff;
Shutdown();
}
const char *LastPacket() const noexcept { return buff; }
ssize_t getPacketSize() const noexcept { return packet_size; }
bool ReceivePacket() noexcept { return ReceivePacket(buff); }
bool ReceivePacket(char *dst, int flags = 0) noexcept {
auto bytes_received =
recvfrom(fd, dst, packet_size, flags, nullptr, nullptr);
return bytes_received == packet_size;
}
bool PeekPacket() noexcept{
return ReceivePacket(buff, MSG_PEEK);
}
// Only for backwards compatibility this function will be removed during
// refactoring of the receiver
ssize_t ReceiveDataOnly(char *dst) {
auto r = recvfrom(fd, dst, packet_size, 0, nullptr, nullptr);
constexpr ssize_t eiger_header_packet =
40; // only detector that has this
if (r == eiger_header_packet) {
LOG(logWARNING) << "Got header pkg";
r = recvfrom(fd, dst, packet_size, 0, nullptr, nullptr);
}
return r;
}
ssize_t getBufferSize() const {
uint64_t ret_size = 0;
socklen_t optlen = sizeof(uint64_t);
if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &ret_size, &optlen) == -1)
return -1;
else
return ret_size;
}
// Only for backwards compatibility will be removed
ssize_t getActualUDPSocketBufferSize() const { return getBufferSize(); }
// Only for backwards compatibility will be removed
void ShutDownSocket() { Shutdown(); }
void setBufferSize(ssize_t size) {
socklen_t optlen = sizeof(size);
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, optlen)) {
throw RuntimeError("Could not set socket buffer size");
}
}
void Shutdown() {
shutdown(fd, SHUT_RDWR);
if (fd >= 0) {
close(fd);
fd = -1;
}
}
// Only for backwards compatibility, this drops the EIGER small pkt, may be
// removed
ssize_t ReceiveDataOnly(char *dst) noexcept;
};
} // namespace sls

View File

@ -175,7 +175,7 @@ class slsDetectorDefs {
struct ROI {
int xmin{-1}; /**< is the roi xmin (in channel number) */
int xmax{-1}; /**< is the roi xmax (in channel number)*/
};
}__attribute__((packed));
#else
typedef struct {
int xmin; /**< is the roi xmin (in channel number) */
@ -203,7 +203,7 @@ class slsDetectorDefs {
int y{0};
xy() = default;
xy(int x, int y):x(x),y(y){};
};
}__attribute__((packed));
#endif
@ -460,6 +460,44 @@ class slsDetectorDefs {
TIMING_EXTERNAL
};
#ifdef __cplusplus
/**
* structure to udpate receiver
*/
struct rxParameters {
detectorType detType{GENERIC};
xy multiSize;
int detId{0};
char hostname[MAX_STR_LENGTH];
int udpInterfaces{1};
int udp_dstport{0};
uint32_t udp_dstip{0U};
uint64_t udp_dstmac{0LU};
int udp_dstport2{0};
uint32_t udp_dstip2{0U};
uint64_t udp_dstmac2{0LU};
int64_t frames{0};
int64_t triggers{0};
int64_t bursts{0};
int analogSamples{0};
int digitalSamples{0};
int64_t expTimeNs{0};
int64_t periodNs{0};
int64_t subExpTimeNs{0};
int64_t subDeadTimeNs{0};
int activate{0};
int quad{0};
int dynamicRange{16};
timingMode timMode{AUTO_TIMING};
int tenGiga{0};
readoutMode roMode{ANALOG_ONLY};
uint32_t adcMask{0};
uint32_t adc10gMask{0};
ROI roi;
uint32_t countermask{0};
burstMode burstType{BURST_OFF};
}__attribute__((packed));
#endif
#ifdef __cplusplus
protected:

View File

@ -197,6 +197,7 @@ enum detFuncs{
F_SET_TIMING_SOURCE,
F_GET_NUM_CHANNELS,
F_UPDATE_RATE_CORRECTION,
F_GET_RECEIVER_PARAMETERS,
NUM_DET_FUNCTIONS,
RECEIVER_ENUM_START = 256, /**< detector function should not exceed this (detector server should not compile anyway) */
@ -207,8 +208,6 @@ enum detFuncs{
F_GET_LAST_RECEIVER_CLIENT_IP,
F_SET_RECEIVER_PORT,
F_GET_RECEIVER_VERSION,
F_GET_RECEIVER_TYPE,
F_SEND_RECEIVER_DETHOSTNAME,
F_RECEIVER_SET_ROI,
F_RECEIVER_SET_NUM_FRAMES,
F_SET_RECEIVER_NUM_TRIGGERS,
@ -252,8 +251,6 @@ enum detFuncs{
F_SET_FLIPPED_DATA_RECEIVER,
F_SET_RECEIVER_FILE_FORMAT,
F_GET_RECEIVER_FILE_FORMAT,
F_SEND_RECEIVER_DETPOSID,
F_SEND_RECEIVER_MULTIDETSIZE,
F_SET_RECEIVER_STREAMING_PORT,
F_GET_RECEIVER_STREAMING_PORT,
F_SET_RECEIVER_STREAMING_SRC_IP,
@ -293,6 +290,7 @@ enum detFuncs{
F_SET_ADDITIONAL_JSON_PARAMETER,
F_GET_ADDITIONAL_JSON_PARAMETER,
F_GET_RECEIVER_PROGRESS,
F_SETUP_RECEIVER,
NUM_REC_FUNCTIONS
};
@ -487,6 +485,7 @@ static const char* getFunctionNameFromEnum(enum detFuncs func) {
case F_SET_TIMING_SOURCE: return "F_SET_TIMING_SOURCE";
case F_GET_NUM_CHANNELS: return "F_GET_NUM_CHANNELS";
case F_UPDATE_RATE_CORRECTION: return "F_UPDATE_RATE_CORRECTION";
case F_GET_RECEIVER_PARAMETERS: return "F_GET_RECEIVER_PARAMETERS";
case NUM_DET_FUNCTIONS: return "NUM_DET_FUNCTIONS";
case RECEIVER_ENUM_START: return "RECEIVER_ENUM_START";
@ -497,8 +496,7 @@ static const char* getFunctionNameFromEnum(enum detFuncs func) {
case F_GET_LAST_RECEIVER_CLIENT_IP: return "F_GET_LAST_RECEIVER_CLIENT_IP";
case F_SET_RECEIVER_PORT: return "F_SET_RECEIVER_PORT";
case F_GET_RECEIVER_VERSION: return "F_GET_RECEIVER_VERSION";
case F_GET_RECEIVER_TYPE: return "F_GET_RECEIVER_TYPE";
case F_SEND_RECEIVER_DETHOSTNAME: return "F_SEND_RECEIVER_DETHOSTNAME";
case F_SETUP_RECEIVER: return "F_SETUP_RECEIVER";
case F_RECEIVER_SET_ROI: return "F_RECEIVER_SET_ROI";
case F_RECEIVER_SET_NUM_FRAMES: return "F_RECEIVER_SET_NUM_FRAMES";
case F_SET_RECEIVER_NUM_TRIGGERS: return "F_SET_RECEIVER_NUM_TRIGGERS";
@ -542,8 +540,6 @@ static const char* getFunctionNameFromEnum(enum detFuncs func) {
case F_SET_FLIPPED_DATA_RECEIVER: return "F_SET_FLIPPED_DATA_RECEIVER";
case F_SET_RECEIVER_FILE_FORMAT: return "F_SET_RECEIVER_FILE_FORMAT";
case F_GET_RECEIVER_FILE_FORMAT: return "F_GET_RECEIVER_FILE_FORMAT";
case F_SEND_RECEIVER_DETPOSID: return "F_SEND_RECEIVER_DETPOSID";
case F_SEND_RECEIVER_MULTIDETSIZE: return "F_SEND_RECEIVER_MULTIDETSIZE";
case F_SET_RECEIVER_STREAMING_PORT: return "F_SET_RECEIVER_STREAMING_PORT";
case F_GET_RECEIVER_STREAMING_PORT: return "F_GET_RECEIVER_STREAMING_PORT";
case F_SET_RECEIVER_STREAMING_SRC_IP: return "F_SET_RECEIVER_STREAMING_SRC_IP";

View File

@ -1,12 +1,12 @@
/** API versions */
#define GITBRANCH "removeshm"
#define APILIB 0x200402
#define APIRECEIVER 0x200402
#define APIGUI 0x200331
#define APICTB 0x200407
#define APIGOTTHARD 0x200407
#define APIGOTTHARD2 0x200407
#define APIJUNGFRAU 0x200407
#define APIMYTHEN3 0x200407
#define APIMOENCH 0x200407
#define APIEIGER 0x200408
#define GITBRANCH "setrxhostname"
#define APILIB 0x200409
#define APIRECEIVER 0x200409
#define APIGUI 0x200409
#define APIEIGER 0x200409
#define APICTB 0x200409
#define APIGOTTHARD 0x200409
#define APIGOTTHARD2 0x200409
#define APIJUNGFRAU 0x200409
#define APIMYTHEN3 0x200409
#define APIMOENCH 0x200409

View File

@ -15,13 +15,13 @@
namespace sls {
DataSocket::DataSocket(int socketId) : socketId_(socketId) {
DataSocket::DataSocket(int socketId) : sockfd_(socketId) {
int value = 1;
setsockopt(socketId_, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
}
DataSocket::~DataSocket() {
if (socketId_ <= 0) {
if (sockfd_ <= 0) {
return;
} else {
try {
@ -32,7 +32,7 @@ DataSocket::~DataSocket() {
}
void DataSocket::swap(DataSocket &other) noexcept {
std::swap(socketId_, other.socketId_);
std::swap(sockfd_, other.sockfd_);
}
DataSocket::DataSocket(DataSocket &&move) noexcept { move.swap(*this); }
@ -121,19 +121,23 @@ int DataSocket::setTimeOut(int t_seconds) {
}
void DataSocket::close() {
if (socketId_ > 0) {
if (::close(socketId_)) {
if (sockfd_ > 0) {
if (::close(sockfd_)) {
throw SocketError("could not close socket");
}
socketId_ = -1;
sockfd_ = -1;
} else {
throw std::runtime_error("Socket ERROR: close called on bad socket\n");
}
}
void DataSocket::shutDownSocket() {
shutdown(getSocketId(), SHUT_RDWR);
::shutdown(getSocketId(), SHUT_RDWR);
close();
}
void DataSocket::shutdown(){
::shutdown(sockfd_, SHUT_RDWR);
}
} // namespace sls

View File

@ -0,0 +1,96 @@
#include "UdpRxSocket.h"
#include "network_utils.h"
#include "sls_detector_exceptions.h"
#include <cstdint>
#include <errno.h>
#include <iostream>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
namespace sls {
UdpRxSocket::UdpRxSocket(int port, ssize_t packet_size, const char *hostname,
size_t kernel_buffer_size)
: packet_size_(packet_size) {
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = 0;
hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
struct addrinfo *res = 0;
const std::string portname = std::to_string(port);
if (getaddrinfo(hostname, portname.c_str(), &hints, &res)) {
throw RuntimeError("Failed at getaddrinfo with " +
std::string(hostname));
}
sockfd_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (sockfd_ == -1) {
throw RuntimeError("Failed to create UDP RX socket");
}
if (bind(sockfd_, res->ai_addr, res->ai_addrlen) == -1) {
throw RuntimeError("Failed to bind UDP RX socket");
}
freeaddrinfo(res);
// If we get a specified buffer size that is larger than the set one
// we set it. Otherwise we leave it there since it could have been
// set by the rx_udpsocksize command
if (kernel_buffer_size) {
auto current = getBufferSize() / 2;
if (current < kernel_buffer_size) {
setBufferSize(kernel_buffer_size);
if (getBufferSize() / 2 < kernel_buffer_size) {
LOG(logWARNING)
<< "Could not set buffer size. Got: " << getBufferSize() / 2
<< " instead of " << kernel_buffer_size;
}
}
}
}
UdpRxSocket::~UdpRxSocket() { Shutdown(); }
ssize_t UdpRxSocket::getPacketSize() const noexcept { return packet_size_; }
bool UdpRxSocket::ReceivePacket(char *dst) noexcept{
auto bytes_received =
recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr);
return bytes_received == packet_size_;
}
ssize_t UdpRxSocket::ReceiveDataOnly(char *dst) noexcept {
auto r = recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr);
constexpr ssize_t eiger_header_packet =
40; // only detector that has this
if (r == eiger_header_packet) {
LOG(logWARNING) << "Got header pkg";
r = recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr);
}
return r;
}
size_t UdpRxSocket::getBufferSize() const {
size_t ret = 0;
socklen_t optlen = sizeof(ret);
if (getsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &ret, &optlen) == -1)
throw RuntimeError("Could not get socket buffer size");
return ret;
}
void UdpRxSocket::setBufferSize(ssize_t size) {
if (setsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)))
throw RuntimeError("Could not set socket buffer size");
}
void UdpRxSocket::Shutdown() {
shutdown(sockfd_, SHUT_RDWR);
if (sockfd_ >= 0) {
close(sockfd_);
sockfd_ = -1;
}
}
} // namespace sls

View File

@ -109,7 +109,7 @@ TEST_CASE("run status"){
using defs = slsDetectorDefs;
REQUIRE(ToString(defs::runStatus::ERROR) == "error");
REQUIRE(ToString(defs::runStatus::WAITING) == "waiting");
REQUIRE(ToString(defs::runStatus::TRANSMITTING) == "data"); //??
REQUIRE(ToString(defs::runStatus::TRANSMITTING) == "transmitting");
REQUIRE(ToString(defs::runStatus::RUN_FINISHED) == "finished");
REQUIRE(ToString(defs::runStatus::STOPPED) == "stopped");
REQUIRE(ToString(defs::runStatus::IDLE) == "idle");

View File

@ -4,6 +4,14 @@
#include <future>
#include <thread>
#include <vector>
#include <cstdint>
#include <errno.h>
#include <iostream>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
constexpr int default_port = 50001;
@ -29,46 +37,49 @@ int open_socket(int port) {
throw sls::RuntimeError("Failed to create UDP RX socket");
}
if (connect(fd, res->ai_addr, res->ai_addrlen)){
if (connect(fd, res->ai_addr, res->ai_addrlen)) {
throw sls::RuntimeError("Failed to connect socket");
}
freeaddrinfo(res);
return fd;
}
TEST_CASE("Receive data on localhost") {
TEST_CASE("Get packet size returns the packet size we set in the constructor"){
constexpr int port = 50001;
constexpr ssize_t packet_size = 8000;
sls::UdpRxSocket s{port, packet_size};
CHECK(s.getPacketSize() == packet_size);
}
TEST_CASE("Receive data from a vector") {
constexpr int port = 50001;
std::vector<int> data_to_send{4, 5, 3, 2, 5, 7, 2, 3};
std::vector<int> data_received(data_to_send.size());
ssize_t packet_size =
sizeof(decltype(data_to_send)::value_type) * data_to_send.size();
sls::UdpRxSocket udpsock{port, packet_size};
sls::UdpRxSocket udpsock{port, packet_size};
int fd = open_socket(port);
auto n = write(fd, data_to_send.data(), packet_size);
CHECK(n == packet_size);
CHECK(udpsock.ReceivePacket());
CHECK(udpsock.ReceivePacket((char*)data_received.data()));
close(fd);
// Copy data from buffer and compare values
std::vector<int> data_received(data_to_send.size());
memcpy(data_received.data(), udpsock.LastPacket(), udpsock.getPacketSize());
CHECK(data_received.size() == data_to_send.size()); // sanity check
for (size_t i = 0; i != data_to_send.size(); ++i) {
CHECK(data_to_send[i] == data_received[i]);
}
CHECK(data_to_send == data_received);
}
TEST_CASE("Shutdown socket without hanging when waiting for data") {
constexpr int port = 50001;
constexpr ssize_t packet_size = 8000;
sls::UdpRxSocket s{port, packet_size};
char buff[packet_size];
// Start a thread and wait for package
// if the socket is left open we would block
std::future<bool> ret =
std::async(static_cast<bool (sls::UdpRxSocket::*)()>(
&sls::UdpRxSocket::ReceivePacket),
&s);
std::async(&sls::UdpRxSocket::ReceivePacket, &s, (char *)&buff);
s.Shutdown();
auto r = ret.get();
@ -76,60 +87,23 @@ TEST_CASE("Shutdown socket without hanging when waiting for data") {
CHECK(r == false); // since we didn't get the packet
}
TEST_CASE("Too small packet"){
TEST_CASE("Too small packet") {
constexpr int port = 50001;
sls::UdpRxSocket s(port, 2*sizeof(uint32_t));
sls::UdpRxSocket s(port, 2 * sizeof(uint32_t));
auto fd = open_socket(port);
uint32_t val = 10;
write(fd, &val, sizeof(val));
CHECK(s.ReceivePacket() == false);
uint32_t buff[2];
CHECK(s.ReceivePacket((char *)&buff) == false);
close(fd);
}
TEST_CASE("Receive an int to internal buffer"){
TEST_CASE("Receive an int to an external buffer") {
int to_send = 5;
int received = -1;
auto fd = open_socket(default_port);
sls::UdpRxSocket s(default_port, sizeof(int));
write(fd, &to_send, sizeof(to_send));
CHECK(s.ReceivePacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(s.ReceivePacket(reinterpret_cast<char *>(&received)));
CHECK(received == to_send);
}
TEST_CASE("Receive an int to an external buffer"){
int to_send = 5;
int received = -1;
auto fd = open_socket(default_port);
sls::UdpRxSocket s(default_port, sizeof(int));
write(fd, &to_send, sizeof(to_send));
CHECK(s.ReceivePacket(reinterpret_cast<char*>(&received)));
CHECK(received == to_send);
}
TEST_CASE("PEEK data"){
int to_send = 5;
int to_send2 = 12;
int received = -1;
auto fd = open_socket(default_port);
sls::UdpRxSocket s(default_port, sizeof(int));
write(fd, &to_send, sizeof(to_send));
write(fd, &to_send2, sizeof(to_send));
CHECK(s.PeekPacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send);
CHECK(s.PeekPacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send);
CHECK(s.ReceivePacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send);
CHECK(s.ReceivePacket());
memcpy(&received, s.LastPacket(), sizeof(int));
CHECK(received == to_send2);
}