WIP, rxr constr done

This commit is contained in:
maliakal_d 2020-04-16 13:58:59 +02:00
parent 2921cbfac8
commit d536ad2b5b
11 changed files with 943 additions and 277 deletions

View File

@ -19,7 +19,7 @@ class IpAddr;
//Free function to avoid dependence on class
//and avoid the option to free another objects
//shm by mistake
void freeSharedMemory(int detectorId, int detPos = -1);
void freeSharedMemory(int detectorId, int moduleId = -1);
/**
@ -57,6 +57,8 @@ class Detector {
/* Frees shared memory, adds detectors to the list
* and updates local detector cache */
void setHostname(const std::vector<std::string> &hostname);
void setHostname(const std::vector<std::string> &hostname,
const std::vector<int> &port);
/** connects to n servers at local host starting at specific control port */
void setVirtualDetectorServers(int numServers, int startingPort);
@ -507,24 +509,30 @@ class Detector {
Result<bool> getUseReceiverFlag(Positions pos = {}) const;
Result<std::string> getRxHostname(Positions pos = {}) const;
Result<std::string> getRxHostname2(Positions pos = {}) const;
/**
* Validates and sets the receiver.
* Updates local receiver cache parameters
* Configures the detector to the receiver as UDP destination
* @param receiver receiver hostname or IP address, can include tcp port eg. hostname:port
*/
void setRxHostname(const std::string &receiver, Positions pos = {});
/** multiple rx hostnames (same as setRxHostname) */
void setRxHostname(const std::vector<std::string> &name);
void setRxHostname(const std::string &hostname, Positions pos = {});
/** receiver hostname for udp port 2 */
void setRxHostname2(const std::string &hostname, Positions pos = {});
/** cannot be for multiple detectors as port is unique*/
void setRxHostname(const std::string &hostname, const int port,
int module_id);
void setRxHostname2(const std::string &hostname, const int port,
int module_id);
Result<int> getRxPort(Positions pos = {}) const;
/** for 2nd udp port receiver */
Result<int> getRxPort2(Positions pos = {}) const;
/** Receiver TCP port (for client communication with Receiver)
* module_id is -1 for all detectors, ports for each module is calculated
* (increments) */
void setRxPort(int port, int module_id = -1);
void setRxPort(const int port, int module_id = -1);
void setRxPort2(const int port, int module_id = -1);
Result<int> getRxFifoDepth(Positions pos = {}) const;

View File

@ -115,13 +115,27 @@ std::string CmdProxy::ListCommands(int action) {
}
/* configuration */
std::pair<std::string, int>
CmdProxy::parseHostnameAndPort(std::string name) {
std::string host = name;
std::string hostname;
int port = 0;
auto res = sls::split(host, ':');
if (res.size() > 1) {
hostname = res[0];
port = StringTo<int>(res[1]);
}
return std::make_pair(host, port);
}
std::string CmdProxy::Hostname(int action) {
std::ostringstream os;
os << cmd << ' ';
if (action == defs::HELP_ACTION) {
os << "\n\tFrees shared memory and sets hostname (or IP address) of "
"all modules concatenated by +."
"all modules concatenated by +.\n\t"
"[hostname or ip address]:[tcp port] Use this for virtual servers\n\t"
<< '\n';
} else if (action == defs::GET_ACTION) {
if (!args.empty()) {
@ -136,17 +150,35 @@ std::string CmdProxy::Hostname(int action) {
if (det_id != -1) {
throw sls::RuntimeError("Cannot execute this at module level");
}
std::vector<std::string> arguments;
// only args[0], but many hostames concatenated with +
if (args[0].find('+') != std::string::npos) {
auto t = sls::split(args[0], '+');
det->setHostname(t);
os << ToString(t) << '\n';
if (args.size() > 1) {
throw sls::RuntimeError("Cannot have concatenated hostnames and"
"multiple arguments");
}
arguments = sls::split(args[0], '+');
}
// either hostnames separated by space, or single hostname
else {
det->setHostname(args);
os << ToString(args) << '\n';
arguments.assign(args.begin(), args.end());
}
// separate hostname and port
std::vector<std::string> hostnames;
std::vector<int> ports;
for (size_t i = 0; i < arguments.size(); ++i) {
std::pair<std::string, int> res = parseHostnameAndPort(arguments[i]);
hostnames.push_back(res.first);
if (res.second == 0) {
ports.push_back(DEFAULT_PORTNO);
} else {
ports.push_back(res.second);
}
}
det->setHostname(hostnames, ports);
auto t = det->getHostname({det_id});
os << OutString(t) << '\n';
} else {
throw sls::RuntimeError("Unknown action");
}
@ -905,16 +937,19 @@ std::string CmdProxy::UDPDestinationIP2(int action) {
}
/* Receiver Config */
std::string CmdProxy::ReceiveHostname(int action) {
std::string CmdProxy::ReceiverHostname(int action) {
std::ostringstream os;
os << cmd << ' ';
if (action == defs::HELP_ACTION) {
os << "[hostname or ip address]\n\t"
"[hostname or ip address]:[tcp port]\n\t"
"[hostname1]:[tcp_port1]+[hostname2]:[tcp_port2]+\n\t"
"Receiver hostname or IP. If port included, then the receiver tcp port.\n\t"
"Receiver hostname or IP. Port is the receiver tcp port (optional).\n\t"
"Used for TCP control communication between client and receiver "
"to configure receiver. Also updates receiver with detector parameters."
"to configure receiver. Also updates receiver with detector parameters.\n\t"
"TCP port must be unique, if included.\n\t"
"If port not included and not set earlier, then it takes default port 1954"
" and calculates from there. \n\t"
"[Eiger][Jungfrau] For the 2nd udp interface, use rx_hostname2."
<< '\n';
} else if (action == defs::GET_ACTION) {
if (!args.empty()) {
@ -923,38 +958,69 @@ std::string CmdProxy::ReceiveHostname(int action) {
auto t = det->getRxHostname({det_id});
os << OutString(t) << '\n';
} else if (action == defs::PUT_ACTION) {
if (args.size() < 1) {
if (args.size() != 1) {
WrongNumberOfParameters(1);
}
// multiple arguments
if (args.size() > 1) {
// multiple in mulitple
if (args[0].find('+') != std::string::npos) {
throw sls::RuntimeError("Cannot add multiple receivers at module level");
if (args[0].find('+') != std::string::npos) {
throw sls::RuntimeError("Cannot concatenate receiver hostnames");
}
std::pair<std::string, int> res = parseHostnameAndPort(args[0]);
std::string hostname = res.first;
int port = res.second;
if (port == 0) {
det->setRxHostname(hostname, {det_id});
} else {
if (det_id == -1) {
throw sls::RuntimeError("Cannot set same tcp port "
"for all receiver hostnames");
}
if (det_id != -1) {
throw sls::RuntimeError("Cannot add multiple receivers at module level");
}
det->setRxHostname(args);
os << ToString(args) << '\n';
det->setRxHostname(hostname, port, det_id);
}
// single argument
else {
// multiple receivers concatenated with +
if (args[0].find('+') != std::string::npos) {
if (det_id != -1) {
throw sls::RuntimeError("Cannot add multiple receivers at module level");
}
auto t = sls::split(args[0], '+');
det->setRxHostname(t);
os << ToString(t) << '\n';
}
// single receiver
else {
det->setRxHostname(args[0], {det_id});
os << ToString(args) << '\n';
}
auto t = det->getRxHostname({det_id});
os << OutString(t) << '\n';
} else {
throw sls::RuntimeError("Unknown action");
}
return os.str();
}
std::string CmdProxy::ReceiverHostname2(int action) {
std::ostringstream os;
os << cmd << ' ';
if (action == defs::HELP_ACTION) {
os << "[hostname or ip address]\n\t"
"[hostname or ip address]:[tcp port]\n\t"
"[Eiger][Jungfrau] Receiver hostname or IP for the second udp port. "
"Port is the receiver tcp port (optional).\n\t"
"Refer rx_hostname help for details"
<< '\n';
} else if (action == defs::GET_ACTION) {
if (!args.empty()) {
WrongNumberOfParameters(0);
}
auto t = det->getRxHostname2({det_id});
os << OutString(t) << '\n';
} else if (action == defs::PUT_ACTION) {
if (args.size() != 1) {
WrongNumberOfParameters(1);
}
if (args[0].find('+') != std::string::npos) {
throw sls::RuntimeError("Cannot concatenate receiver hostnames");
}
std::pair<std::string, int> res = parseHostnameAndPort(args[0]);
std::string hostname = res.first;
int port = res.second;
if (port == 0) {
det->setRxHostname2(hostname, {det_id});
} else {
if (det_id == -1) {
throw sls::RuntimeError("Cannot set same tcp port "
"for all receiver hostnames");
}
det->setRxHostname2(hostname, port, det_id);
}
auto t = det->getRxHostname2({det_id});
os << OutString(t) << '\n';
} else {
throw sls::RuntimeError("Unknown action");
}

View File

@ -719,8 +719,10 @@ class CmdProxy {
{"txndelay_right", &CmdProxy::txndelay_right},
/* Receiver Config */
{"rx_hostname", &CmdProxy::ReceiveHostname},
{"rx_hostname", &CmdProxy::ReceiverHostname},
{"rx_hostname2", &CmdProxy::ReceiverHostname2},
{"rx_tcpport", &CmdProxy::rx_tcpport},
{"rx_tcpport2", &CmdProxy::rx_tcpport2},
{"rx_fifodepth", &CmdProxy::rx_fifodepth},
{"rx_silent", &CmdProxy::rx_silent},
{"rx_discardpolicy", &CmdProxy::rx_discardpolicy},
@ -911,6 +913,7 @@ class CmdProxy {
/* configuration */
std::string free(int action);
// std::string config2(int action);
std::pair<std::string, int> parseHostnameAndPort(std::string name);
std::string Hostname(int action);
std::string VirtualServer(int action);
std::string FirmwareVersion(int action);
@ -940,7 +943,8 @@ class CmdProxy {
std::string UDPDestinationIP(int action);
std::string UDPDestinationIP2(int action);
/* Receiver Config */
std::string ReceiveHostname(int action);
std::string ReceiverHostname(int action);
std::string ReceiverHostname2(int action);
/* File */
/* ZMQ Streaming Parameters (Receiver<->Client) */
/* Eiger Specific */
@ -1433,6 +1437,9 @@ class CmdProxy {
INTEGER_COMMAND(rx_tcpport, getRxPort, setRxPort, StringTo<int>,
"[port]\n\tTCP port for client-receiver communication. Default is 1954. Must be different if multiple receivers on same pc. Must be first command to set a receiver parameter. Multi command will automatically increment for individual modules.");
INTEGER_COMMAND(rx_tcpport2, getRxPort2, setRxPort2, StringTo<int>,
"[port]\n\t[Eiger][Jungfrau] TCP port for client-receiver communication for 2nd udp port. For details, refer rx_tcpport.");
INTEGER_COMMAND(rx_fifodepth, getRxFifoDepth, setRxFifoDepth, StringTo<int>,
"[n_frames]\n\tSet the number of frames in the receiver fifo (buffer between listener and writer threads).");

View File

@ -6,6 +6,7 @@
#include "logger.h"
#include "DetectorImpl.h"
#include "Module.h"
#include "Receiver.h"
#include "sls_detector_defs.h"
#include "versionAPI.h"
@ -13,13 +14,25 @@
namespace sls {
void freeSharedMemory(int detectorId, int detPos) {
void freeSharedMemory(int detectorId, int moduleId) {
// single
if (detPos >= 0) {
SharedMemory<sharedModule> moduleShm(detectorId, detPos);
if (moduleId >= 0) {
SharedMemory<sharedModule> moduleShm(detectorId, moduleId);
int numReceivers = 0, numReceivers2 = 0;
if (moduleShm.IsExisting()) {
moduleShm.OpenSharedMemory();
if (Module::hasSharedMemoryReceiverList(moduleShm()->shmversion)) {
numReceivers = moduleShm()->numberOfReceivers;
numReceivers2 = moduleShm()->numberOfReceivers2;
}
moduleShm.RemoveSharedMemory();
}
for (int iReceiver = 0; iReceiver < numReceivers + numReceivers2; ++iReceiver) {
SharedMemory<sharedModule> receiverShm(detectorId, moduleId, iReceiver);
if (receiverShm.IsExisting()) {
receiverShm.RemoveSharedMemory();
}
}
return;
}
@ -29,13 +42,27 @@ void freeSharedMemory(int detectorId, int detPos) {
if (detectorShm.IsExisting()) {
detectorShm.OpenSharedMemory();
numDetectors = detectorShm()->numberOfDetectors;
numDetectors = detectorShm()->numberOfModules;
detectorShm.RemoveSharedMemory();
}
for (int i = 0; i < numDetectors; ++i) {
SharedMemory<sharedModule> moduleShm(detectorId, i);
moduleShm.RemoveSharedMemory();
for (int iModule = 0; iModule < numDetectors; ++iModule) {
SharedMemory<sharedModule> moduleShm(detectorId, iModule);
int numReceivers = 0, numReceivers2 = 0;
if (moduleShm.IsExisting()) {
moduleShm.OpenSharedMemory();
if (Module::hasSharedMemoryReceiverList(moduleShm()->shmversion)) {
numReceivers = moduleShm()->numberOfReceivers;
numReceivers2 = moduleShm()->numberOfReceivers2;
}
moduleShm.RemoveSharedMemory();
}
for (int iReceiver = 0; iReceiver < numReceivers + numReceivers2; ++iReceiver) {
SharedMemory<sharedModule> receiverShm(detectorId, iModule, iReceiver);
if (receiverShm.IsExisting()) {
receiverShm.RemoveSharedMemory();
}
}
}
}
@ -91,6 +118,11 @@ void Detector::setHostname(const std::vector<std::string> &hostname) {
pimpl->setHostname(hostname);
}
void Detector::setHostname(const std::vector<std::string> &hostname,
const std::vector<int> &port) {
pimpl->setHostname(hostname, port);
}
void Detector::setVirtualDetectorServers(int numServers, int startingPort) {
pimpl->setVirtualDetectorServers(numServers, startingPort);
}
@ -664,46 +696,80 @@ Result<bool> Detector::getUseReceiverFlag(Positions pos) const {
}
Result<std::string> Detector::getRxHostname(Positions pos) const {
return pimpl->Parallel(&Module::getReceiverHostname, pos);
return pimpl->Parallel1(&Receiver::getHostname, pos, {0});
}
void Detector::setRxHostname(const std::string &receiver, Positions pos) {
pimpl->Parallel(&Module::setReceiverHostname, pos, receiver);
Result<std::string> Detector::getRxHostname2(Positions pos) const {
return pimpl->Parallel2(&Receiver::getHostname, pos, {0});
}
void Detector::setRxHostname(const std::vector<std::string> &name) {
// set all to same rx_hostname
if (name.size() == 1) {
pimpl->Parallel(&Module::setReceiverHostname, {}, name[0]);
} else {
if ((int)name.size() != size()) {
throw RuntimeError("Receiver hostnames size " +
std::to_string(name.size()) + " does not match detector size " +
std::to_string(size()));
}
// set each rx_hostname
for (int idet = 0; idet < size(); ++idet) {
pimpl->Parallel(&Module::setReceiverHostname, {idet}, name[idet]);
}
void Detector::setRxHostname(const std::string &hostname, Positions pos) {
if (!pimpl->isReceiverInitialized()) {
pimpl->initReceiver();
}
pimpl->Parallel1(&Receiver::setHostname, pos, {0}, hostname);
}
void Detector::setRxHostname2(const std::string &hostname, Positions pos) {
if (!pimpl->isReceiver2Initialized()) {
pimpl->initReceiver2();
}
pimpl->Parallel2(&Receiver::setHostname, pos, {0}, hostname);
}
void Detector::setRxHostname(const std::string &hostname, const int port,
int module_id) {
if (!pimpl->isReceiverInitialized()) {
pimpl->initReceiver();
}
pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {0}, port);
pimpl->Parallel1(&Receiver::setHostname, {module_id}, {0}, hostname);
}
void Detector::setRxHostname2(const std::string &hostname, const int port,
int module_id) {
if (!pimpl->isReceiver2Initialized()) {
pimpl->initReceiver2();
}
pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {0}, port);
pimpl->Parallel2(&Receiver::setHostname, {module_id}, {0}, hostname);
}
Result<int> Detector::getRxPort(Positions pos) const {
return pimpl->Parallel(&Module::getReceiverPort, pos);
return pimpl->Parallel1(&Receiver::getTCPPort, pos, {0});
}
Result<int> Detector::getRxPort2(Positions pos) const {
return pimpl->Parallel2(&Receiver::getTCPPort, pos, {0});
}
void Detector::setRxPort(int port, int module_id) {
if (!pimpl->isReceiverInitialized()) {
pimpl->initReceiver();
}
if (module_id == -1) {
std::vector<int> port_list(size());
for (auto &it : port_list) {
it = port++;
}
std::vector<int> port_list = getPortNumbers(port);
for (int idet = 0; idet < size(); ++idet) {
pimpl->Parallel(&Module::setReceiverPort, {idet},
pimpl->Parallel1(&Receiver::setTCPPort, {idet}, {0},
port_list[idet]);
}
} else {
pimpl->Parallel(&Module::setReceiverPort, {module_id}, port);
pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {0}, port);
}
}
void Detector::setRxPort2(int port, int module_id) {
if (!pimpl->isReceiver2Initialized()) {
pimpl->initReceiver2();
}
if (module_id == -1) {
std::vector<int> port_list = getPortNumbers(port);
for (int idet = 0; idet < size(); ++idet) {
pimpl->Parallel2(&Receiver::setTCPPort, {idet}, {0},
port_list[idet]);
}
} else {
pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {0}, port);
}
}
@ -1822,23 +1888,10 @@ Result<uint64_t> Detector::getRxCurrentFrameIndex(Positions pos) const {
}
std::vector<int> Detector::getPortNumbers(int start_port) {
int num_sockets_per_detector = 1;
switch (getDetectorType().squash()) {
case defs::EIGER:
num_sockets_per_detector *= 2;
break;
case defs::JUNGFRAU:
if (getNumberofUDPInterfaces().squash() == 2) {
num_sockets_per_detector *= 2;
}
break;
default:
break;
}
std::vector<int> res;
res.reserve(size());
for (int idet = 0; idet < size(); ++idet) {
res.push_back(start_port + (idet * num_sockets_per_detector));
res.push_back(start_port + idet);
}
return res;
}

View File

@ -5,6 +5,7 @@
#include "file_utils.h"
#include "logger.h"
#include "Module.h"
#include "Receiver.h"
#include "sls_detector_exceptions.h"
#include "versionAPI.h"
@ -49,12 +50,24 @@ void DetectorImpl::setAcquiringFlag(bool flag) {
int DetectorImpl::getDetectorId() const { return detectorId; }
void DetectorImpl::freeSharedMemory(int detectorId, int detPos) {
void DetectorImpl::freeSharedMemory(int detectorId, int moduleId) {
// single
if (detPos >= 0) {
SharedMemory<sharedModule> module_shm(detectorId, detPos);
if (module_shm.IsExisting()) {
module_shm.RemoveSharedMemory();
if (moduleId >= 0) {
SharedMemory<sharedModule> moduleShm(detectorId, moduleId);
int numReceivers = 0, numReceivers2 = 0;
if (moduleShm.IsExisting()) {
moduleShm.OpenSharedMemory();
if (Module::hasSharedMemoryReceiverList(moduleShm()->shmversion)) {
numReceivers = moduleShm()->numberOfReceivers;
numReceivers2 = moduleShm()->numberOfReceivers2;
}
moduleShm.RemoveSharedMemory();
}
for (int iReceiver = 0; iReceiver < numReceivers + numReceivers2; ++iReceiver) {
SharedMemory<sharedModule> receiverShm(detectorId, moduleId, iReceiver);
if (receiverShm.IsExisting()) {
receiverShm.RemoveSharedMemory();
}
}
return;
}
@ -65,13 +78,27 @@ void DetectorImpl::freeSharedMemory(int detectorId, int detPos) {
if (detectorShm.IsExisting()) {
detectorShm.OpenSharedMemory();
numDetectors = detectorShm()->numberOfDetectors;
numDetectors = detectorShm()->numberOfModules;
detectorShm.RemoveSharedMemory();
}
for (int i = 0; i < numDetectors; ++i) {
SharedMemory<sharedModule> module_shm(detectorId, i);
module_shm.RemoveSharedMemory();
for (int iModule = 0; iModule < numDetectors; ++iModule) {
SharedMemory<sharedModule> moduleShm(detectorId, iModule);
int numReceivers = 0, numReceivers2 = 0;
if (moduleShm.IsExisting()) {
moduleShm.OpenSharedMemory();
if (Module::hasSharedMemoryReceiverList(moduleShm()->shmversion)) {
numReceivers = moduleShm()->numberOfReceivers;
numReceivers2 = moduleShm()->numberOfReceivers2;
}
moduleShm.RemoveSharedMemory();
}
for (int iReceiver = 0; iReceiver < numReceivers + numReceivers2; ++iReceiver) {
SharedMemory<sharedModule> receiverShm(detectorId, iModule, iReceiver);
if (receiverShm.IsExisting()) {
receiverShm.RemoveSharedMemory();
}
}
}
}
@ -81,7 +108,18 @@ void DetectorImpl::freeSharedMemory() {
d->freeSharedMemory();
}
detectors.clear();
for (auto &dr : receivers) {
for (auto &r : dr) {
r->freeSharedMemory();
}
}
receivers.clear();
for (auto &dr : receivers2) {
for (auto &r : dr) {
r->freeSharedMemory();
}
}
receivers2.clear();
// clear multi detector shm
detector_shm.RemoveSharedMemory();
client_downstream = false;
@ -149,7 +187,7 @@ void DetectorImpl::initSharedMemory(bool verify) {
void DetectorImpl::initializeDetectorStructure() {
detector_shm()->shmversion = DETECTOR_SHMVERSION;
detector_shm()->numberOfDetectors = 0;
detector_shm()->numberOfModules = 0;
detector_shm()->multiDetectorType = GENERIC;
detector_shm()->numberOfDetector.x = 0;
detector_shm()->numberOfDetector.y = 0;
@ -163,16 +201,37 @@ void DetectorImpl::initializeDetectorStructure() {
void DetectorImpl::initializeMembers(bool verify) {
// DetectorImpl
zmqSocket.clear();
int numModules = detector_shm()->numberOfModules;
// get objects from single det shared memory (open)
for (int i = 0; i < detector_shm()->numberOfDetectors; i++) {
try {
try {
for (int iModule = 0; iModule < numModules; ++iModule) {
detectors.push_back(
sls::make_unique<Module>(detectorId, i, verify));
} catch (...) {
detectors.clear();
throw;
sls::make_unique<Module>(detectorId, iModule, verify));
int numReceivers = detectors[iModule]->getNumberOfReceivers();
if (numReceivers != 0) {
receivers.resize(numModules);
for (int iReceiver = 0; iReceiver < numReceivers; ++iReceiver) {
receivers[iModule].push_back(
sls::make_unique<Receiver>(detectorId, iModule, iReceiver,
true, verify));
}
}
int numReceivers2 = detectors[iModule]->getNumberOfReceivers2();
if (numReceivers2 != 0) {
receivers2.resize(numModules);
for (int iReceiver = 0; iReceiver < numReceivers2; ++iReceiver) {
receivers2[iModule].push_back(
sls::make_unique<Receiver>(detectorId, iModule, iReceiver,
false, verify));
}
}
}
} catch (...) {
detectors.clear();
receivers.clear();
receivers2.clear();
throw;
}
}
@ -222,17 +281,18 @@ std::string DetectorImpl::exec(const char *cmd) {
void DetectorImpl::setVirtualDetectorServers(const int numdet, const int port) {
std::vector<std::string> hostnames;
std::vector<int> ports;
for (int i = 0; i < numdet; ++i) {
hostnames.push_back(std::string("localhost"));
// * 2 is for control and stop port
hostnames.push_back(std::string("localhost:") +
std::to_string(port + i * 2));
ports.push_back(port + i * 2);
}
setHostname(hostnames);
setHostname(hostnames, ports);
}
void DetectorImpl::setHostname(const std::vector<std::string> &name) {
// this check is there only to allow the previous detsizechan command
if (detector_shm()->numberOfDetectors != 0) {
if (detector_shm()->numberOfModules != 0) {
LOG(logWARNING)
<< "There are already detector(s) in shared memory."
"Freeing Shared memory now.";
@ -242,27 +302,41 @@ void DetectorImpl::setHostname(const std::vector<std::string> &name) {
detector_shm()->initialChecks = initialChecks;
}
for (const auto &hostname : name) {
addSlsDetector(hostname);
addModule(hostname, DEFAULT_PORTNO);
}
updateDetectorSize();
}
void DetectorImpl::addSlsDetector(const std::string &hostname) {
void DetectorImpl::setHostname(const std::vector<std::string> &name,
const std::vector<int> &port) {
if (name.size() != port.size()) {
throw RuntimeError("hostname vector size and port vector size do not match");
}
// this check is there only to allow the previous detsizechan command
if (detector_shm()->numberOfModules != 0) {
LOG(logWARNING)
<< "There are already detector(s) in shared memory."
"Freeing Shared memory now.";
bool initialChecks = detector_shm()->initialChecks;
freeSharedMemory();
setupDetector();
detector_shm()->initialChecks = initialChecks;
}
for (size_t i = 0; i < name.size(); ++i) {
addModule(name[i], port[i]);
}
updateDetectorSize();
}
void DetectorImpl::addModule(const std::string &hostname,
const int port) {
LOG(logINFO) << "Adding detector " << hostname;
int port = DEFAULT_PORTNO;
std::string host = hostname;
auto res = sls::split(hostname, ':');
if (res.size() > 1) {
host = res[0];
port = StringTo<int>(res[1]);
}
if (host != "localhost") {
if (hostname != "localhost") {
for (auto &d : detectors) {
if (d->getHostname() == host) {
if (d->getHostname() == hostname) {
LOG(logWARNING)
<< "Detector " << host
<< "Detector " << hostname
<< "already part of the Detector!" << std::endl
<< "Remove it before adding it back in a new position!";
return;
@ -271,14 +345,14 @@ void DetectorImpl::addSlsDetector(const std::string &hostname) {
}
// get type by connecting
detectorType type = Module::getTypeFromDetector(host, port);
detectorType type = Module::getTypeFromDetector(hostname, port);
auto pos = detectors.size();
detectors.emplace_back(
sls::make_unique<Module>(type, detectorId, pos, false));
detector_shm()->numberOfDetectors = detectors.size();
detector_shm()->numberOfModules = detectors.size();
detectors[pos]->setControlPort(port);
detectors[pos]->setStopPort(port + 1);
detectors[pos]->setHostname(host, detector_shm()->initialChecks);
detectors[pos]->setHostname(hostname, detector_shm()->initialChecks);
// detector type updated by now
detector_shm()->multiDetectorType =
Parallel(&Module::getDetectorType, {})
@ -287,6 +361,54 @@ void DetectorImpl::addSlsDetector(const std::string &hostname) {
detectors[pos]->updateNumberOfChannels();
}
void DetectorImpl::initReceiver() {
if (receivers.size() != 0) {
throw RuntimeError("receiver vector already initialized");
}
int tcpPort = DEFAULT_RX_PORTNO;
int zmqPort = DEFAULT_ZMQ_CL_PORTNO;
try {
for (int iModule = 0; iModule < size(); ++iModule) {
receivers.resize(detectors.size());
receivers[iModule].push_back(
sls::make_unique<Receiver>(detectorId, iModule, 0,
true, tcpPort++, "", zmqPort++));
detectors[iModule]->setNumberOfReceivers(1);
}
} catch (...) {
receivers.clear();
throw;
}
}
bool DetectorImpl::isReceiverInitialized() {
return (receivers.size() > 0);
}
void DetectorImpl::initReceiver2() {
if (receivers2.size() != 0) {
throw RuntimeError("receiver2 vector already initialized");
}
int tcpPort = DEFAULT_RX_PORTNO + size();
int zmqPort = DEFAULT_ZMQ_CL_PORTNO + size();
try {
for (int iModule = 0; iModule < size(); ++iModule) {
receivers2.resize(detectors.size());
receivers2[iModule].push_back(
sls::make_unique<Receiver>(detectorId, iModule, 0,
false, tcpPort++, "", zmqPort++));
detectors[iModule]->setNumberOfReceivers2(1);
}
} catch (...) {
receivers.clear();
throw;
}
}
bool DetectorImpl::isReceiver2Initialized() {
return (receivers2.size() > 0);
}
void DetectorImpl::updateDetectorSize() {
LOG(logDEBUG) << "Updating Detector Size: " << size();
@ -429,7 +551,7 @@ void DetectorImpl::readFrameFromReceiver() {
int nDetPixelsY = 0;
bool quadEnable = false;
bool eiger = false;
bool numInterfaces =
int numInterfaces =
Parallel(&Module::getNumberofUDPInterfacesFromShm, {})
.squash(); // cannot pick up from zmq

View File

@ -25,6 +25,7 @@ class detectorData;
namespace sls{
class Module;
class Receiver;
/**
* @short structure allocated in shared memory to store detector settings
@ -47,7 +48,7 @@ struct sharedDetector {
/** last time stamp when accessing the shared memory */
char lastDate[SHORT_STRING_LENGTH];
int numberOfDetectors;
int numberOfModules;
slsDetectorDefs::detectorType multiDetectorType;
/** END OF FIXED PATTERN
@ -189,6 +190,320 @@ class DetectorImpl : public virtual slsDetectorDefs {
}
template <typename RT, typename... CT>
sls::Result<RT> Parallel1(RT (sls::Receiver::*somefunc)(CT...),
std::vector<int> dPositions,
std::vector<int> rxPositions,
typename NonDeduced<CT>::type... Args) {
if (receivers.size() == 0)
throw sls::RuntimeError("No receivers added");
if (dPositions.empty() ||
(dPositions.size() == 1 && dPositions[0] == -1)) {
dPositions.resize(receivers.size());
std::iota(begin(dPositions), end(dPositions), 0);
}
std::vector<std::future<RT>> futures;
futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping
for (size_t i : dPositions) {
if (i >= receivers.size())
throw sls::RuntimeError("Detector out of range");
// each entry
std::vector<int> rxPos(rxPositions);
if (rxPositions.empty() ||
(rxPositions.size() == 1 && rxPositions[0] == -1)) {
rxPos.resize(receivers[i].size());
std::iota(begin(rxPos), end(rxPos), 0);
}
for (size_t j : rxPos) {
futures.push_back(std::async(std::launch::async, somefunc,
receivers[i][j].get(), Args...));
}
}
sls::Result<RT> result;
result.reserve(dPositions.size());// cannot know rxPositions.size() without looping
for (auto &i : futures) {
result.push_back(i.get());
}
return result;
}
template <typename RT, typename... CT>
sls::Result<RT> Parallel1(RT (sls::Receiver::*somefunc)(CT...) const,
std::vector<int> dPositions,
std::vector<int> rxPositions,
typename NonDeduced<CT>::type... Args) const {
if (receivers.size() == 0)
throw sls::RuntimeError("No receivers added");
if (dPositions.empty() ||
(dPositions.size() == 1 && dPositions[0] == -1)) {
dPositions.resize(receivers.size());
std::iota(begin(dPositions), end(dPositions), 0);
}
std::vector<std::future<RT>> futures;
futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping
for (size_t i : dPositions) {
if (i >= receivers.size())
throw sls::RuntimeError("Detector out of range");
// each entry
std::vector<int> rxPos(rxPositions);
if (rxPositions.empty() ||
(rxPositions.size() == 1 && rxPositions[0] == -1)) {
rxPos.resize(receivers[i].size());
std::iota(begin(rxPos), end(rxPos), 0);
}
for (size_t j : rxPos) {
futures.push_back(std::async(std::launch::async, somefunc,
receivers[i][j].get(), Args...));
}
}
sls::Result<RT> result;
result.reserve(dPositions.size());// cannot know rxPositions.size() without looping
for (auto &i : futures) {
result.push_back(i.get());
}
return result;
}
template <typename... CT>
void Parallel1(void (sls::Receiver::*somefunc)(CT...),
std::vector<int> dPositions,
std::vector<int> rxPositions,
typename NonDeduced<CT>::type... Args) {
if (receivers.size() == 0)
throw sls::RuntimeError("No receivers added");
if (dPositions.empty() ||
(dPositions.size() == 1 && dPositions[0] == -1)) {
dPositions.resize(receivers.size());
std::iota(begin(dPositions), end(dPositions), 0);
}
std::vector<std::future<void>> futures;
futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping
for (size_t i : dPositions) {
if (i >= receivers.size())
throw sls::RuntimeError("Detector out of range");
// each entry
std::vector<int> rxPos(rxPositions);
if (rxPositions.empty() ||
(rxPositions.size() == 1 && rxPositions[0] == -1)) {
rxPos.resize(receivers[i].size());
std::iota(begin(rxPos), end(rxPos), 0);
}
for (size_t j : rxPos) {
futures.push_back(std::async(std::launch::async, somefunc,
receivers[i][j].get(), Args...));
}
}
for (auto &i : futures) {
i.get();
}
}
template <typename... CT>
void Parallel1(void (sls::Receiver::*somefunc)(CT...) const,
std::vector<int> dPositions,
std::vector<int> rxPositions,
typename NonDeduced<CT>::type... Args) const {
if (receivers.size() == 0)
throw sls::RuntimeError("No receivers added");
if (dPositions.empty() ||
(dPositions.size() == 1 && dPositions[0] == -1)) {
dPositions.resize(receivers.size());
std::iota(begin(dPositions), end(dPositions), 0);
}
std::vector<std::future<void>> futures;
futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping
for (size_t i : dPositions) {
if (i >= receivers.size())
throw sls::RuntimeError("Detector out of range");
// each entry
std::vector<int> rxPos(rxPositions);
if (rxPositions.empty() ||
(rxPositions.size() == 1 && rxPositions[0] == -1)) {
rxPos.resize(receivers[i].size());
std::iota(begin(rxPos), end(rxPos), 0);
}
for (size_t j : rxPos) {
futures.push_back(std::async(std::launch::async, somefunc,
receivers[i][j].get(), Args...));
}
}
for (auto &i : futures) {
i.get();
}
}
template <typename RT, typename... CT>
sls::Result<RT> Parallel2(RT (sls::Receiver::*somefunc)(CT...),
std::vector<int> dPositions,
std::vector<int> rxPositions,
typename NonDeduced<CT>::type... Args) {
if (receivers2.size() == 0)
throw sls::RuntimeError("No receivers2 added");
if (dPositions.empty() ||
(dPositions.size() == 1 && dPositions[0] == -1)) {
dPositions.resize(receivers2.size());
std::iota(begin(dPositions), end(dPositions), 0);
}
std::vector<std::future<RT>> futures;
futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping
for (size_t i : dPositions) {
if (i >= receivers2.size())
throw sls::RuntimeError("Detector out of range");
// each entry
std::vector<int> rxPos(rxPositions);
if (rxPositions.empty() ||
(rxPositions.size() == 1 && rxPositions[0] == -1)) {
rxPos.resize(receivers2[i].size());
std::iota(begin(rxPos), end(rxPos), 0);
}
for (size_t j : rxPos) {
futures.push_back(std::async(std::launch::async, somefunc,
receivers2[i][j].get(), Args...));
}
}
sls::Result<RT> result;
result.reserve(dPositions.size());// cannot know rxPositions.size() without looping
for (auto &i : futures) {
result.push_back(i.get());
}
return result;
}
template <typename RT, typename... CT>
sls::Result<RT> Parallel2(RT (sls::Receiver::*somefunc)(CT...) const,
std::vector<int> dPositions,
std::vector<int> rxPositions,
typename NonDeduced<CT>::type... Args) const {
if (receivers2.size() == 0)
throw sls::RuntimeError("No receivers2 added");
if (dPositions.empty() ||
(dPositions.size() == 1 && dPositions[0] == -1)) {
dPositions.resize(receivers2.size());
std::iota(begin(dPositions), end(dPositions), 0);
}
std::vector<std::future<RT>> futures;
futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping
for (size_t i : dPositions) {
if (i >= receivers2.size())
throw sls::RuntimeError("Detector out of range");
// each entry
std::vector<int> rxPos(rxPositions);
if (rxPositions.empty() ||
(rxPositions.size() == 1 && rxPositions[0] == -1)) {
rxPos.resize(receivers2[i].size());
std::iota(begin(rxPos), end(rxPos), 0);
}
for (size_t j : rxPos) {
futures.push_back(std::async(std::launch::async, somefunc,
receivers2[i][j].get(), Args...));
}
}
sls::Result<RT> result;
result.reserve(dPositions.size());// cannot know rxPositions.size() without looping
for (auto &i : futures) {
result.push_back(i.get());
}
return result;
}
template <typename... CT>
void Parallel2(void (sls::Receiver::*somefunc)(CT...),
std::vector<int> dPositions,
std::vector<int> rxPositions,
typename NonDeduced<CT>::type... Args) {
if (receivers2.size() == 0)
throw sls::RuntimeError("No receivers2 added");
if (dPositions.empty() ||
(dPositions.size() == 1 && dPositions[0] == -1)) {
dPositions.resize(receivers2.size());
std::iota(begin(dPositions), end(dPositions), 0);
}
std::vector<std::future<void>> futures;
futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping
for (size_t i : dPositions) {
if (i >= receivers2.size())
throw sls::RuntimeError("Detector out of range");
// each entry
std::vector<int> rxPos(rxPositions);
if (rxPositions.empty() ||
(rxPositions.size() == 1 && rxPositions[0] == -1)) {
rxPos.resize(receivers2[i].size());
std::iota(begin(rxPos), end(rxPos), 0);
}
for (size_t j : rxPos) {
futures.push_back(std::async(std::launch::async, somefunc,
receivers2[i][j].get(), Args...));
}
}
for (auto &i : futures) {
i.get();
}
}
template <typename... CT>
void Parallel2(void (sls::Receiver::*somefunc)(CT...) const,
std::vector<int> dPositions,
std::vector<int> rxPositions,
typename NonDeduced<CT>::type... Args) const {
if (receivers2.size() == 0)
throw sls::RuntimeError("No receivers2 added");
if (dPositions.empty() ||
(dPositions.size() == 1 && dPositions[0] == -1)) {
dPositions.resize(receivers2.size());
std::iota(begin(dPositions), end(dPositions), 0);
}
std::vector<std::future<void>> futures;
futures.reserve(dPositions.size());// cannot know rxPositions.size() without looping
for (size_t i : dPositions) {
if (i >= receivers2.size())
throw sls::RuntimeError("Detector out of range");
// each entry
std::vector<int> rxPos(rxPositions);
if (rxPositions.empty() ||
(rxPositions.size() == 1 && rxPositions[0] == -1)) {
rxPos.resize(receivers2[i].size());
std::iota(begin(rxPos), end(rxPos), 0);
}
for (size_t j : rxPos) {
futures.push_back(std::async(std::launch::async, somefunc,
receivers2[i][j].get(), Args...));
}
}
for (auto &i : futures) {
i.get();
}
}
/** set acquiring flag in shared memory */
void setAcquiringFlag(bool flag);
@ -196,7 +511,7 @@ class DetectorImpl : public virtual slsDetectorDefs {
int getDetectorId() const;
/** Free specific shared memory from the command line without creating object */
static void freeSharedMemory(int detectorId, int detPos = -1);
static void freeSharedMemory(int detectorId, int moduleId = -1);
/** Free all modules from current multi Id shared memory and delete members */
void freeSharedMemory();
@ -217,8 +532,14 @@ class DetectorImpl : public virtual slsDetectorDefs {
*/
void setVirtualDetectorServers(const int numdet, const int port);
/** Sets the hostname of all sls detectors in shared memory and updates local cache */
void setHostname(const std::vector<std::string> &name);
void setHostname(const std::vector<std::string> &name,
const std::vector<int> &port);
void initReceiver();
bool isReceiverInitialized();
void initReceiver2();
bool isReceiver2Initialized();
/** Gets the total number of detectors */
int size() const;
@ -324,7 +645,7 @@ class DetectorImpl : public virtual slsDetectorDefs {
/** Execute command in terminal and return result */
std::string exec(const char *cmd);
void addSlsDetector(const std::string &hostname);
void addModule(const std::string &hostname, const int port);
void updateDetectorSize();
@ -385,6 +706,12 @@ class DetectorImpl : public virtual slsDetectorDefs {
/** pointers to the Module structures */
std::vector<std::unique_ptr<sls::Module>> detectors;
/** pointers to the Receiver structures, each row for a module */
std::vector<std::vector<std::unique_ptr<sls::Receiver>>> receivers;
/** for the second udp port [Eiger][Jungfrau] */
std::vector<std::vector<std::unique_ptr<sls::Receiver>>> receivers2;
/** data streaming (down stream) enabled in client (zmq sckets created) */
bool client_downstream{false};

View File

@ -55,6 +55,27 @@ bool Module::isFixedPatternSharedMemoryCompatible() {
return (shm()->shmversion >= MODULE_SHMAPIVERSION);
}
bool Module::hasSharedMemoryReceiverList(int version) {
return (version >= MODULE_SHMRXVERSION);
}
int Module::getNumberOfReceivers() const {
return shm()->numberOfReceivers;
}
void Module::setNumberOfReceivers(const int num) {
shm()->numberOfReceivers = num;
}
int Module::getNumberOfReceivers2() const {
return shm()->numberOfReceivers2;
}
void Module::setNumberOfReceivers2(const int num) {
shm()->numberOfReceivers2 = num;
}
void Module::checkDetectorVersionCompatibility() {
int fnum = F_CHECK_VERSION;
int64_t arg = 0;
@ -128,7 +149,7 @@ int64_t Module::getSerialNumber() {
int64_t Module::getReceiverSoftwareVersion() const {
LOG(logDEBUG1) << "Getting receiver software version";
int64_t retval = -1;
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_GET_RECEIVER_VERSION, nullptr, retval);
}
return retval;
@ -386,6 +407,8 @@ void Module::initializeDetectorStructure(detectorType type) {
shm()->shmversion = MODULE_SHMVERSION;
memset(shm()->hostname, 0, MAX_STR_LENGTH);
shm()->myDetectorType = type;
shm()->numberOfReceivers = 0;
shm()->numberOfReceivers2 = 0;
shm()->detectorSize.x = 0;
shm()->detectorSize.y = 0;
shm()->controlPort = DEFAULT_PORTNO;
@ -393,7 +416,7 @@ void Module::initializeDetectorStructure(detectorType type) {
sls::strcpy_safe(shm()->settingsDir, getenv("HOME"));
sls::strcpy_safe(shm()->rxHostname, "none");
shm()->rxTCPPort = DEFAULT_PORTNO + 2;
shm()->useReceiverFlag = false;
shm()->useReceiver = false;
shm()->zmqport = DEFAULT_ZMQ_CL_PORTNO +
(moduleId * ((shm()->myDetectorType == EIGER) ? 2 : 1));
shm()->zmqip = IpAddr{};
@ -562,7 +585,7 @@ void Module::setQuad(const bool enable) {
LOG(logDEBUG1) << "Setting Quad type to " << value;
sendToDetector(F_SET_QUAD, value, nullptr);
LOG(logDEBUG1) << "Setting Quad type to " << value << " in Receiver";
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_SET_RECEIVER_QUAD, value, nullptr);
}
}
@ -572,7 +595,7 @@ void Module::setReadNLines(const int value) {
sendToDetector(F_SET_READ_N_LINES, value, nullptr);
LOG(logDEBUG1) << "Setting read n lines to " << value
<< " in Receiver";
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_SET_RECEIVER_READ_N_LINES, value, nullptr);
}
}
@ -625,7 +648,7 @@ int Module::setStopPort(int port_number) {
int Module::setReceiverPort(int port_number) {
LOG(logDEBUG1) << "Setting reciever port to " << port_number;
if (port_number >= 0 && port_number != shm()->rxTCPPort) {
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
int retval = -1;
sendToReceiver(F_SET_RECEIVER_PORT, port_number, retval);
shm()->rxTCPPort = retval;
@ -971,7 +994,7 @@ void Module::stopAcquisition() {
// get status before stopping acquisition
runStatus s = ERROR, r = ERROR;
bool zmqstreaming = false;
if (shm()->useReceiverFlag && getReceiverStreaming()) {
if (shm()->useReceiver && getReceiverStreaming()) {
zmqstreaming = true;
s = getRunStatus();
r = getReceiverStatus();
@ -1034,7 +1057,7 @@ int64_t Module::getNumberOfFrames() {
void Module::setNumberOfFrames(int64_t value) {
LOG(logDEBUG1) << "Setting number of frames to " << value;
sendToDetector(F_SET_NUM_FRAMES, value, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
LOG(logDEBUG1) << "Sending number of frames to Receiver: " << value;
sendToReceiver(F_RECEIVER_SET_NUM_FRAMES, value, nullptr);
}
@ -1050,7 +1073,7 @@ int64_t Module::getNumberOfTriggers() {
void Module::setNumberOfTriggers(int64_t value) {
LOG(logDEBUG1) << "Setting number of triggers to " << value;
sendToDetector(F_SET_NUM_TRIGGERS, value, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
LOG(logDEBUG1) << "Sending number of triggers to Receiver: " << value;
sendToReceiver(F_SET_RECEIVER_NUM_TRIGGERS, value, nullptr);
}
@ -1066,7 +1089,7 @@ int64_t Module::getNumberOfBursts() {
void Module::setNumberOfBursts(int64_t value) {
LOG(logDEBUG1) << "Setting number of bursts to " << value;
sendToDetector(F_SET_NUM_BURSTS, value, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
LOG(logDEBUG1) << "Sending number of bursts to Receiver: " << value;
sendToReceiver(F_SET_RECEIVER_NUM_BURSTS, value, nullptr);
}
@ -1096,7 +1119,7 @@ void Module::setNumberOfAnalogSamples(int value) {
sendToDetector(F_SET_NUM_ANALOG_SAMPLES, value, nullptr);
// update #nchan, as it depends on #samples, adcmask
updateNumberOfChannels();
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
LOG(logDEBUG1) << "Sending number of analog samples to Receiver: " << value;
sendToReceiver(F_RECEIVER_SET_NUM_ANALOG_SAMPLES, value, nullptr);
}
@ -1114,7 +1137,7 @@ void Module::setNumberOfDigitalSamples(int value) {
sendToDetector(F_SET_NUM_DIGITAL_SAMPLES, value, nullptr);
// update #nchan, as it depends on #samples, adcmask
updateNumberOfChannels();
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
LOG(logDEBUG1) << "Sending number of digital samples to Receiver: " << value;
sendToReceiver(F_RECEIVER_SET_NUM_DIGITAL_SAMPLES, value, nullptr);
}
@ -1131,7 +1154,7 @@ void Module::setExptime(int64_t value) {
}
LOG(logDEBUG1) << "Setting exptime to " << value << "ns";
sendToDetector(F_SET_EXPTIME, value, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
LOG(logDEBUG1) << "Sending exptime to Receiver: " << value;
sendToReceiver(F_RECEIVER_SET_EXPTIME, value, nullptr);
}
@ -1147,7 +1170,7 @@ int64_t Module::getPeriod() {
void Module::setPeriod(int64_t value) {
LOG(logDEBUG1) << "Setting period to " << value << "ns";
sendToDetector(F_SET_PERIOD, value, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
LOG(logDEBUG1) << "Sending period to Receiver: " << value;
sendToReceiver(F_RECEIVER_SET_PERIOD, value, nullptr);
}
@ -1182,7 +1205,7 @@ void Module::setSubExptime(int64_t value) {
}
LOG(logDEBUG1) << "Setting sub exptime to " << value << "ns";
sendToDetector(F_SET_SUB_EXPTIME, value, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
LOG(logDEBUG1) << "Sending sub exptime to Receiver: " << value;
sendToReceiver(F_RECEIVER_SET_SUB_EXPTIME, value, nullptr);
}
@ -1198,7 +1221,7 @@ int64_t Module::getSubDeadTime() {
void Module::setSubDeadTime(int64_t value) {
LOG(logDEBUG1) << "Setting sub deadtime to " << value << "ns";
sendToDetector(F_SET_SUB_DEADTIME, value, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
LOG(logDEBUG1) << "Sending sub deadtime to Receiver: " << value;
sendToReceiver(F_RECEIVER_SET_SUB_DEADTIME, value, nullptr);
}
@ -1298,7 +1321,7 @@ void Module::setTimingMode(timingMode value) {
timingMode retval = GET_TIMING_MODE;
LOG(logDEBUG1) << "Setting timing mode to " << value;
sendToDetector(F_SET_TIMING_MODE, static_cast<int>(value), retval);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
LOG(logDEBUG1) << "Sending timing mode to Receiver: " << value;
sendToReceiver(F_SET_RECEIVER_TIMING_MODE, value, nullptr);
}
@ -1323,7 +1346,7 @@ void Module::setDynamicRange(int n) {
sendToDetector(F_SET_DYNAMIC_RANGE, n, retval);
LOG(logDEBUG1) << "Dynamic Range: " << retval;
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
int arg = retval;
retval = -1;
LOG(logDEBUG1) << "Sending dynamic range to receiver: " << arg;
@ -1438,7 +1461,7 @@ void Module::setReadoutMode(const slsDetectorDefs::readoutMode mode) {
if (shm()->myDetectorType == CHIPTESTBOARD) {
updateNumberOfChannels();
}
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_RECEIVER_SET_READOUT_MODE, mode, nullptr);
}
}
@ -1495,7 +1518,7 @@ void Module::setReceiverHostname(const std::string &receiverIP) {
if (receiverIP == "none") {
memset(shm()->rxHostname, 0, MAX_STR_LENGTH);
sls::strcpy_safe(shm()->rxHostname, "none");
shm()->useReceiverFlag = false;
shm()->useReceiver = false;
}
// stop acquisition if running
@ -1512,7 +1535,7 @@ void Module::setReceiverHostname(const std::string &receiverIP) {
shm()->rxTCPPort = std::stoi(res[1]);
}
sls::strcpy_safe(shm()->rxHostname, host.c_str());
shm()->useReceiverFlag = true;
shm()->useReceiver = true;
checkReceiverVersionCompatibility();
// populate parameters from detector
@ -1665,7 +1688,7 @@ void Module::setDestinationUDPIP(const IpAddr ip) {
throw RuntimeError("Invalid destination udp ip address");
}
sendToDetector(F_SET_DEST_UDP_IP, ip, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sls::MacAddr retval(0LU);
sendToReceiver(F_SET_RECEIVER_UDP_IP, ip, retval);
LOG(logINFO) << "Setting destination udp mac of detector " << moduleId << " to " << retval;
@ -1688,7 +1711,7 @@ void Module::setDestinationUDPIP2(const IpAddr ip) {
}
sendToDetector(F_SET_DEST_UDP_IP2, ip, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sls::MacAddr retval(0LU);
sendToReceiver(F_SET_RECEIVER_UDP_IP2, ip, retval);
LOG(logINFO) << "Setting destination udp mac2 of detector " << moduleId << " to " << retval;
@ -1741,7 +1764,7 @@ sls::MacAddr Module::getDestinationUDPMAC2() {
void Module::setDestinationUDPPort(const int port) {
LOG(logDEBUG1) << "Setting destination udp port to " << port;
sendToDetector(F_SET_DEST_UDP_PORT, port, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_SET_RECEIVER_UDP_PORT, port, nullptr);
}
}
@ -1758,7 +1781,7 @@ int Module::getDestinationUDPPort() {
void Module::setDestinationUDPPort2(const int port) {
LOG(logDEBUG1) << "Setting destination udp port2 to " << port;
sendToDetector(F_SET_DEST_UDP_PORT2, port, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_SET_RECEIVER_UDP_PORT2, port, nullptr);
}
}
@ -1775,7 +1798,7 @@ void Module::setNumberofUDPInterfaces(int n) {
LOG(logDEBUG1) << "Setting number of udp interfaces to " << n;
sendToDetector(F_SET_NUM_INTERFACES, n, nullptr);
shm()->numUDPInterfaces = n;
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_SET_RECEIVER_NUM_INTERFACES, n, nullptr);
}
}
@ -1812,14 +1835,14 @@ void Module::setClientStreamingPort(int port) { shm()->zmqport = port; }
int Module::getClientStreamingPort() { return shm()->zmqport; }
void Module::setReceiverStreamingPort(int port) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq port)");
}
sendToReceiver(F_SET_RECEIVER_STREAMING_PORT, port, nullptr);
}
int Module::getReceiverStreamingPort() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to get receiver parameters (zmq port)");
}
return sendToReceiver<int>(F_GET_RECEIVER_STREAMING_PORT);
@ -1836,7 +1859,7 @@ void Module::setClientStreamingIP(const sls::IpAddr ip) {
sls::IpAddr Module::getClientStreamingIP() { return shm()->zmqip; }
void Module::setReceiverStreamingIP(const sls::IpAddr ip) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (streaming ip)");
}
if (ip == 0) {
@ -1851,7 +1874,7 @@ void Module::setReceiverStreamingIP(const sls::IpAddr ip) {
}
sls::IpAddr Module::getReceiverStreamingIP() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (streaming ip)");
}
return sendToReceiver<sls::IpAddr>(F_GET_RECEIVER_STREAMING_SRC_IP);
@ -1921,7 +1944,7 @@ void Module::setTransmissionDelayRight(int value) {
void Module::setAdditionalJsonHeader(const std::map<std::string, std::string> &jsonHeader) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq json header)");
}
for (auto &it : jsonHeader) {
@ -1959,7 +1982,7 @@ void Module::setAdditionalJsonHeader(const std::map<std::string, std::string> &j
}
std::map<std::string, std::string> Module::getAdditionalJsonHeader() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq json header)");
}
int fnum = F_GET_ADDITIONAL_JSON_HEADER;
@ -1990,7 +2013,7 @@ std::map<std::string, std::string> Module::getAdditionalJsonHeader() {
}
void Module::setAdditionalJsonParameter(const std::string &key, const std::string &value) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq json parameter)");
}
if (key.empty() || key.length() > SHORT_STR_LENGTH ||
@ -2005,7 +2028,7 @@ void Module::setAdditionalJsonParameter(const std::string &key, const std::strin
}
std::string Module::getAdditionalJsonParameter(const std::string &key) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq json parameter)");
}
char arg[SHORT_STR_LENGTH]{};
@ -2019,7 +2042,7 @@ int64_t Module::setReceiverUDPSocketBufferSize(int64_t udpsockbufsize) {
LOG(logDEBUG1) << "Sending UDP Socket Buffer size to receiver: "
<< udpsockbufsize;
int64_t retval = -1;
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_RECEIVER_UDP_SOCK_BUF_SIZE, udpsockbufsize, retval);
LOG(logDEBUG1) << "Receiver UDP Socket Buffer size: " << retval;
}
@ -2196,7 +2219,7 @@ void Module::setBurstMode(slsDetectorDefs::burstMode value) {
int arg = static_cast<int>(value);
LOG(logDEBUG1) << "Setting burst mode to " << arg;
sendToDetector(F_SET_BURST_MODE, arg, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
LOG(logDEBUG1) << "Sending burst mode to Receiver: " << value;
sendToReceiver(F_SET_RECEIVER_BURST_MODE, value, nullptr);
}
@ -2253,7 +2276,7 @@ void Module::setROI(slsDetectorDefs::ROI arg) {
LOG(logDEBUG) << "Sending ROI to detector [" << arg.xmin << ", "
<< arg.xmax << "]";
sendToDetector(F_SET_ROI, args, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
LOG(logDEBUG1) << "Sending ROI to receiver";
sendToReceiver(F_RECEIVER_SET_ROI, arg, nullptr);
}
@ -2283,7 +2306,7 @@ void Module::setADCEnableMask(uint32_t mask) {
if (shm()->myDetectorType == MOENCH)
setAdditionalJsonParameter("adcmask_1g", std::to_string(mask));
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
int fnum = F_RECEIVER_SET_ADC_MASK;
int retval = -1;
LOG(logDEBUG1) << "Setting ADC Enable mask to 0x" << std::hex
@ -2313,7 +2336,7 @@ void Module::setTenGigaADCEnableMask(uint32_t mask) {
if (shm()->myDetectorType == MOENCH)
setAdditionalJsonParameter("adcmask_10g", std::to_string(mask));
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
int fnum = F_RECEIVER_SET_ADC_MASK_10G;
int retval = -1;
LOG(logDEBUG1) << "Setting 10Gb ADC Enable mask to 0x" << std::hex
@ -2369,7 +2392,7 @@ int Module::setExternalSampling(int value) {
int Module::getExternalSampling() { return setExternalSampling(-1); }
void Module::setReceiverDbitList(const std::vector<int>& list) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (dbit list)");
}
@ -2389,7 +2412,7 @@ void Module::setReceiverDbitList(const std::vector<int>& list) {
}
std::vector<int> Module::getReceiverDbitList() const {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (dbit list)");
}
sls::FixedCapacityContainer<int, MAX_RX_DBIT> retval;
@ -2398,14 +2421,14 @@ std::vector<int> Module::getReceiverDbitList() const {
}
void Module::setReceiverDbitOffset(int value) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (dbit offset)");
}
sendToReceiver(F_SET_RECEIVER_DBIT_OFFSET, value, nullptr);
}
int Module::getReceiverDbitOffset() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (dbit offset)");
}
return sendToReceiver<int>(F_GET_RECEIVER_DBIT_OFFSET);
@ -2424,21 +2447,21 @@ int Module::activate(int enable) {
sendToDetector(F_ACTIVATE, enable, retval);
sendToDetectorStop(F_ACTIVATE, enable, retval);
LOG(logDEBUG1) << "Activate: " << retval;
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_RECEIVER_ACTIVATE, retval, nullptr);
}
return retval;
}
bool Module::getDeactivatedRxrPaddingMode() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (deactivated padding)");
}
return sendToReceiver<int>(F_GET_RECEIVER_DEACTIVATED_PADDING);
}
void Module::setDeactivatedRxrPaddingMode(bool padding) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (deactivated padding)");
}
int arg = static_cast<int>(padding);
@ -2446,7 +2469,7 @@ void Module::setDeactivatedRxrPaddingMode(bool padding) {
}
bool Module::getFlippedDataX() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (flipped data x)");
}
int retval = -1;
@ -2457,7 +2480,7 @@ bool Module::getFlippedDataX() {
}
void Module::setFlippedDataX(bool value) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (flipped data x)");
}
int retval = -1;
@ -2808,12 +2831,12 @@ std::string Module::printReceiverConfiguration() {
return os.str();
}
bool Module::getUseReceiverFlag() const { return shm()->useReceiverFlag; }
bool Module::getUseReceiverFlag() const { return shm()->useReceiver; }
int Module::lockReceiver(int lock) {
LOG(logDEBUG1) << "Setting receiver server lock to " << lock;
int retval = -1;
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_LOCK_RECEIVER, lock, retval);
LOG(logDEBUG1) << "Receiver Lock: " << retval;
}
@ -2823,7 +2846,7 @@ int Module::lockReceiver(int lock) {
sls::IpAddr Module::getReceiverLastClientIP() const {
sls::IpAddr retval;
LOG(logDEBUG1) << "Getting last client ip to receiver server";
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_GET_LAST_RECEIVER_CLIENT_IP, nullptr, retval);
LOG(logDEBUG1) << "Last client IP from receiver: " << retval;
}
@ -2832,7 +2855,7 @@ sls::IpAddr Module::getReceiverLastClientIP() const {
void Module::exitReceiver() {
LOG(logDEBUG1) << "Sending exit command to receiver server";
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_EXIT_RECEIVER, nullptr, nullptr);
}
}
@ -2842,14 +2865,14 @@ void Module::execReceiverCommand(const std::string &cmd) {
char retval[MAX_STR_LENGTH]{};
sls::strcpy_safe(arg, cmd.c_str());
LOG(logDEBUG1) << "Sending command to receiver: " << arg;
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_EXEC_RECEIVER_COMMAND, arg, retval);
LOG(logINFO) << "Receiver " << moduleId << " returned:\n" << retval;
}
}
std::string Module::getFilePath() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file path)");
}
char retvals[MAX_STR_LENGTH]{};
@ -2858,7 +2881,7 @@ std::string Module::getFilePath() {
}
void Module::setFilePath(const std::string &path) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file path)");
}
if (path.empty()) {
@ -2870,7 +2893,7 @@ void Module::setFilePath(const std::string &path) {
}
std::string Module::getFileName() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file name prefix)");
}
char retvals[MAX_STR_LENGTH]{};
@ -2879,7 +2902,7 @@ std::string Module::getFileName() {
}
void Module::setFileName(const std::string &fname) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file name prefix)");
}
if (fname.empty()) {
@ -2891,28 +2914,28 @@ void Module::setFileName(const std::string &fname) {
}
int64_t Module::getFileIndex() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file index)");
}
return sendToReceiver<int64_t>(F_GET_RECEIVER_FILE_INDEX);
}
void Module::setFileIndex(int64_t file_index) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file index)");
}
sendToReceiver(F_SET_RECEIVER_FILE_INDEX, file_index, nullptr);
}
void Module::incrementFileIndex() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (increment file index)");
}
sendToReceiver(F_INCREMENT_FILE_INDEX, nullptr, nullptr);
}
slsDetectorDefs::fileFormat Module::getFileFormat() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file format)");
}
return static_cast<fileFormat>(
@ -2920,7 +2943,7 @@ slsDetectorDefs::fileFormat Module::getFileFormat() {
}
void Module::setFileFormat(fileFormat f) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file format)");
}
int arg = static_cast<int>(f);
@ -2928,21 +2951,21 @@ void Module::setFileFormat(fileFormat f) {
}
int Module::getFramesPerFile() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (frames per file)");
}
return sendToReceiver<int>(F_GET_RECEIVER_FRAMES_PER_FILE);
}
void Module::setFramesPerFile(int n_frames) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (frames per file)");
}
sendToReceiver(F_SET_RECEIVER_FRAMES_PER_FILE, n_frames, nullptr);
}
slsDetectorDefs::frameDiscardPolicy Module::getReceiverFramesDiscardPolicy() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (frame discard policy)");
}
return static_cast<frameDiscardPolicy>(
@ -2950,7 +2973,7 @@ slsDetectorDefs::frameDiscardPolicy Module::getReceiverFramesDiscardPolicy() {
}
void Module::setReceiverFramesDiscardPolicy(frameDiscardPolicy f) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (frame discard policy)");
}
int arg = static_cast<int>(f);
@ -2958,14 +2981,14 @@ void Module::setReceiverFramesDiscardPolicy(frameDiscardPolicy f) {
}
bool Module::getPartialFramesPadding() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (frame padding)");
}
return sendToReceiver<int>(F_GET_RECEIVER_PADDING);
}
void Module::setPartialFramesPadding(bool padding) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (frame padding)");
}
int arg = static_cast<int>(padding);
@ -2975,14 +2998,14 @@ void Module::setPartialFramesPadding(bool padding) {
void Module::startReceiver() {
LOG(logDEBUG1) << "Starting Receiver";
shm()->stoppedFlag = false;
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_START_RECEIVER, nullptr, nullptr);
}
}
void Module::stopReceiver() {
LOG(logDEBUG1) << "Stopping Receiver";
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
int arg = static_cast<int>(shm()->stoppedFlag);
sendToReceiver(F_STOP_RECEIVER, arg, nullptr);
}
@ -2991,7 +3014,7 @@ void Module::stopReceiver() {
slsDetectorDefs::runStatus Module::getReceiverStatus() const {
runStatus retval = ERROR;
LOG(logDEBUG1) << "Getting Receiver Status";
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_GET_RECEIVER_STATUS, nullptr, retval);
LOG(logDEBUG1) << "Receiver Status: " << ToString(retval);
}
@ -3001,7 +3024,7 @@ slsDetectorDefs::runStatus Module::getReceiverStatus() const {
int64_t Module::getFramesCaughtByReceiver() const {
int64_t retval = -1;
LOG(logDEBUG1) << "Getting Frames Caught by Receiver";
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_GET_RECEIVER_FRAMES_CAUGHT, nullptr, retval);
LOG(logDEBUG1) << "Frames Caught by Receiver: " << retval;
}
@ -3010,7 +3033,7 @@ int64_t Module::getFramesCaughtByReceiver() const {
std::vector<uint64_t> Module::getNumMissingPackets() const {
LOG(logDEBUG1) << "Getting num missing packets";
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
int fnum = F_GET_NUM_MISSING_PACKETS;
int ret = FAIL;
auto client = ReceiverSocket(shm()->rxHostname, shm()->rxTCPPort);
@ -3038,7 +3061,7 @@ std::vector<uint64_t> Module::getNumMissingPackets() const {
uint64_t Module::getReceiverCurrentFrameIndex() const {
uint64_t retval = -1;
LOG(logDEBUG1) << "Getting Current Frame Index of Receiver";
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_GET_RECEIVER_FRAME_INDEX, nullptr, retval);
LOG(logDEBUG1) << "Current Frame Index of Receiver: " << retval;
}
@ -3047,7 +3070,7 @@ uint64_t Module::getReceiverCurrentFrameIndex() const {
int Module::getReceiverProgress() const {
int retval = -1;
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_GET_RECEIVER_PROGRESS, nullptr, retval);
LOG(logDEBUG1) << "Current Progress of Receiver: " << retval;
}
@ -3055,7 +3078,7 @@ int Module::getReceiverProgress() const {
}
void Module::setFileWrite(bool value) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file write enable)");
}
int arg = static_cast<int>(value);
@ -3063,14 +3086,14 @@ void Module::setFileWrite(bool value) {
}
bool Module::getFileWrite() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file_write enable)");
}
return sendToReceiver<int>(F_GET_RECEIVER_FILE_WRITE);
}
void Module::setMasterFileWrite(bool value) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (master file write enable)");
}
int arg = static_cast<int>(value);
@ -3078,14 +3101,14 @@ void Module::setMasterFileWrite(bool value) {
}
bool Module::getMasterFileWrite() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (master file write enable)");
}
return sendToReceiver<int>(F_GET_RECEIVER_MASTER_FILE_WRITE);
}
void Module::setFileOverWrite(bool value) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file overwrite enable)");
}
int arg = static_cast<int>(value);
@ -3093,21 +3116,21 @@ void Module::setFileOverWrite(bool value) {
}
bool Module::getFileOverWrite() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file overwrite enable)");
}
return sendToReceiver<int>(F_GET_RECEIVER_OVERWRITE);
}
int Module::getReceiverStreamingFrequency() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (streaming/read frequency)");
}
return sendToReceiver<int>(F_GET_RECEIVER_STREAMING_FREQUENCY);
}
void Module::setReceiverStreamingFrequency(int freq) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (streaming/read frequency)");
}
if (freq < 0) {
@ -3119,7 +3142,7 @@ void Module::setReceiverStreamingFrequency(int freq) {
int Module::setReceiverStreamingTimer(int time_in_ms) {
int retval = -1;
LOG(logDEBUG1) << "Sending read timer to receiver: " << time_in_ms;
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_RECEIVER_STREAMING_TIMER, time_in_ms, retval);
LOG(logDEBUG1) << "Receiver read timer: " << retval;
}
@ -3127,14 +3150,14 @@ int Module::setReceiverStreamingTimer(int time_in_ms) {
}
bool Module::getReceiverStreaming() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to get receiver parameters (zmq enable)");
}
return sendToReceiver<int>(F_GET_RECEIVER_STREAMING);
}
void Module::setReceiverStreaming(bool enable) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (zmq enable)");
}
int arg = static_cast<int>(enable);
@ -3151,7 +3174,7 @@ bool Module::enableTenGigabitEthernet(int value) {
}
LOG(logDEBUG1) << "10Gbe: " << retval;
value = retval;
if (shm()->useReceiverFlag && value != -1) {
if (shm()->useReceiver && value != -1) {
int retval = -1;
LOG(logDEBUG1) << "Sending 10Gbe enable to receiver: " << value;
sendToReceiver(F_ENABLE_RECEIVER_TEN_GIGA, value, retval);
@ -3163,7 +3186,7 @@ bool Module::enableTenGigabitEthernet(int value) {
int Module::setReceiverFifoDepth(int n_frames) {
int retval = -1;
LOG(logDEBUG1) << "Sending Receiver Fifo Depth: " << n_frames;
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_SET_RECEIVER_FIFO_DEPTH, n_frames, retval);
LOG(logDEBUG1) << "Receiver Fifo Depth: " << retval;
}
@ -3171,14 +3194,14 @@ int Module::setReceiverFifoDepth(int n_frames) {
}
bool Module::getReceiverSilentMode() {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (silent mode)");
}
return sendToReceiver<int>(F_GET_RECEIVER_SILENT_MODE);
}
void Module::setReceiverSilentMode(bool enable) {
if (!shm()->useReceiverFlag) {
if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (silent mode)");
}
int arg = static_cast<int>(enable);
@ -3187,7 +3210,7 @@ void Module::setReceiverSilentMode(bool enable) {
void Module::restreamStopFromReceiver() {
LOG(logDEBUG1) << "Restream stop dummy from Receiver via zmq";
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
sendToReceiver(F_RESTREAM_STOP_FROM_RECEIVER, nullptr, nullptr);
}
}
@ -3392,7 +3415,7 @@ void Module::setPipeline(int clkIndex, int value) {
void Module::setCounterMask(uint32_t countermask) {
LOG(logDEBUG1) << "Setting Counter mask to " << countermask;
sendToDetector(F_SET_COUNTER_MASK, countermask, nullptr);
if (shm()->useReceiverFlag) {
if (shm()->useReceiver) {
int ncounters = __builtin_popcount(countermask);
LOG(logDEBUG1) << "Sending Reciver #counters: " << ncounters;
sendToReceiver(F_RECEIVER_SET_NUM_COUNTERS, ncounters, nullptr);

View File

@ -13,8 +13,9 @@
class ServerInterface;
#define MODULE_SHMRXVERSION 0x200415
#define MODULE_SHMAPIVERSION 0x190726
#define MODULE_SHMVERSION 0x200402
#define MODULE_SHMVERSION 0x200415
namespace sls{
@ -36,6 +37,9 @@ struct sharedModule {
/** detector type \ see :: detectorType*/
slsDetectorDefs::detectorType myDetectorType;
int numberOfReceivers;
int numberOfReceivers2;
/** END OF FIXED PATTERN -----------------------------------------------*/
/** Number of detectors in multi list in x dir and y dir */
@ -70,7 +74,7 @@ struct sharedModule {
/** is set if the receiver hostname given and is connected,
* unset if socket connection is not possible */
bool useReceiverFlag;
bool useReceiver;
/** tcp port from gui/different process to receiver (only data) */
int zmqport;
@ -118,6 +122,12 @@ class Module : public virtual slsDetectorDefs {
*/
bool isFixedPatternSharedMemoryCompatible();
static bool hasSharedMemoryReceiverList(int version);
int getNumberOfReceivers() const;
void setNumberOfReceivers(const int num);
int getNumberOfReceivers2() const;
void setNumberOfReceivers2(const int num);
/**
* Check version compatibility with receiver software
*/

View File

@ -3,17 +3,13 @@
namespace sls {
size_t Receiver::NUM_RECEIVERS{0};
size_t Receiver::getNumReceivers() {
return NUM_RECEIVERS;
}
// create shm
Receiver::Receiver(int detector_id, int module_id, int receiver_id,
int tcp_port, std::string hostname)
: receiverId(receiver_id), shm(detector_id, module_id, receiver_id) {
bool primaryInterface, int tcp_port, std::string hostname,
int zmq_port) :
receiverId(receiver_id), moduleId(module_id),
shm(detector_id, module_id, receiver_id, primaryInterface) {
// ensure shared memory was not created before
if (shm.IsExisting()) {
@ -22,19 +18,24 @@ Receiver::Receiver(int detector_id, int module_id, int receiver_id,
<< shm.GetName() << ". Freeing it again";
shm.RemoveSharedMemory();
}
shm = SharedMemory<sharedReceiver>(detector_id, module_id, receiver_id);
++NUM_RECEIVERS;
shm = SharedMemory<sharedReceiver>(detector_id, module_id, receiver_id,
primaryInterface);
shm.CreateSharedMemory();
// initalize receiver structure
shm()->shmversion = RECEIVER_SHMVERSION;
memset(shm()->hostname, 0, MAX_STR_LENGTH);
shm()->tcpPort = DEFAULT_RX_PORTNO + NUM_RECEIVERS - 1;
shm()->zmqPort = DEFAULT_ZMQ_RX_PORTNO + NUM_RECEIVERS - 1;
shm()->tcpPort = DEFAULT_RX_PORTNO + receiver_id;
shm()->valid = false;
shm()->zmqPort = DEFAULT_ZMQ_RX_PORTNO + receiver_id;
shm()->zmqIp = IpAddr{};
// copy port, hostname if given
if (tcp_port != 0) {
shm()->tcpPort = tcp_port;
setTCPPort(tcp_port);
}
if (zmq_port != 0) {
shm()->zmqPort = zmq_port;
}
if (!hostname.empty()) {
setHostname(hostname);
@ -43,42 +44,60 @@ Receiver::Receiver(int detector_id, int module_id, int receiver_id,
// open shm
Receiver::Receiver(int detector_id, int module_id, int receiver_id,
bool verify)
: receiverId(receiver_id), shm(detector_id, module_id, receiver_id) {
bool primaryInterface, bool verify) :
receiverId(receiver_id), moduleId(module_id),
shm(detector_id, module_id, receiver_id, primaryInterface) {
shm.OpenSharedMemory();
if (verify && shm()->shmversion != RECEIVER_SHMVERSION) {
std::ostringstream ss;
ss << "Receiver shared memory (" << detector_id << "-" << receiverId
<< ":) version mismatch (expected 0x" << std::hex
ss << "Receiver shared memory (" << detector_id << "-" << moduleId
<< ":" << receiverId << ") version mismatch (expected 0x" << std::hex
<< RECEIVER_SHMVERSION << " but got 0x" << shm()->shmversion << ")"
<< std::dec << ". Clear Shared memory to continue.";
throw SharedMemoryError(ss.str());
}
}
Receiver::~Receiver() {
--NUM_RECEIVERS;
Receiver::~Receiver() = default;
void Receiver::freeSharedMemory() {
if (shm.IsExisting()) {
shm.RemoveSharedMemory();
}
}
void Receiver::setHostname(const std::string &ip_port) {
if (ip_port.empty()) {
std::string Receiver::getHostname() const {
return shm()->hostname;
}
void Receiver::setHostname(const std::string &hostname) {
if (hostname.empty()) {
throw RuntimeError("Invalid receiver hostname. Cannot be empty.");
}
// parse tcp port from this hostname:port string
std::string host = ip_port;
auto res = sls::split(host, ':');
if (res.size() > 1) {
host = res[0];
shm()->tcpPort = std::stoi(res[1]);
}
sls::strcpy_safe(shm()->hostname, host.c_str());
updateReceiver();
sls::strcpy_safe(shm()->hostname, hostname.c_str());
configure();
}
void Receiver::updateReceiver() {
void Receiver::configure() {
shm()->valid = false;
LOG(logINFOBLUE) << receiverId << " configured!";
//checkReceiverVersionCompatibility();
shm()->valid = true;
}
int Receiver::getTCPPort() const {
return shm()->tcpPort;
}
void Receiver::setTCPPort(const int port) {
if (port >= 0 && port != shm()->tcpPort) {
if (shm()->valid) {
// send to receiver to change tcpp port
shm()->tcpPort = port; // for now
} else {
shm()->tcpPort = port;
}
}
}
} // namespace sls

View File

@ -14,6 +14,7 @@ namespace sls {
int shmversion;
char hostname[MAX_STR_LENGTH];
int tcpPort;
bool valid;
/** END OF FIXED PATTERN -----------------------------------------------*/
@ -27,20 +28,33 @@ namespace sls {
static size_t getNumReceivers();
// create shm
explicit Receiver(int detector_id, int module_id, int receiver_id,
int tcp_port = 0, std::string hostname = "");
bool primaryInterface, int tcp_port = 0, std::string hostname = "",
int zmq_port = 0);
// open shm
explicit Receiver(int detector_id, int module_id, int receiver_id,
bool verify);
bool primaryInterface, bool verify);
virtual ~Receiver();
void setHostname(const std::string &ip_port);
void updateReceiver();
/**
* Free shared memory and delete shared memory structure
* occupied by the sharedReceiver structure
* Is only safe to call if one deletes the Receiver object afterward
* and frees multi shared memory/updates
* thisMultiDetector->numberOfReceivers
*/
void freeSharedMemory();
std::string getHostname() const;
void setHostname(const std::string &hostname);
void configure();
int getTCPPort() const;
void setTCPPort(const int port);
private:
static size_t NUM_RECEIVERS;
const int receiverId{0};
mutable sls::SharedMemory<sharedReceiver> shm{0, 0, 0};
const int moduleId{0};
mutable sls::SharedMemory<sharedReceiver> shm{0, 0, 0, true};
};
} // sls

View File

@ -25,6 +25,7 @@
#define SHM_MULTI_PREFIX "/slsDetectorPackage_multi_"
#define SHM_MODULE_PREFIX "_module_"
#define SHM_RECEIVER_PREFIX "_receiver_"
#define SHM_RECEIVER2_PREFIX "_receiver2_"
#define SHM_ENV_NAME "SLSDETNAME"
#include <iostream>
@ -42,9 +43,10 @@ class SharedMemory {
* @param multiId multi detector id
* @param moduleId module detector id, -1 if a multi detector shared memory
* @param receiverId receiver id, -1 if a multi detector or module shared memory
* @param primaryInterface is false, for the second udp port receiver
*/
SharedMemory(int multiId, int moduleId, int receiverId = -1) {
name = ConstructSharedMemoryName(multiId, moduleId, receiverId);
SharedMemory(int multiId, int moduleId, int receiverId = -1, bool primaryInterface = true) {
name = ConstructSharedMemoryName(multiId, moduleId, receiverId, primaryInterface);
}
/**
@ -186,7 +188,8 @@ class SharedMemory {
// silent exit if shm did not exist anyway
if (errno == ENOENT)
return;
std::string msg = "Free Shared Memory " + name + " Failed: " + strerror(errno);
std::string msg = "Free Shared Memory " + name + " Failed: " +
strerror(errno);
LOG(logERROR) << msg;
throw SharedMemoryError(msg);
}
@ -219,9 +222,11 @@ class SharedMemory {
* @param multiId multi detector id
* @param moduleId module detector id, -1 if a multi detector shared memory
* @param receiverId receiver id, -1 if a multi detector or module shared memory
* @param primaryInterface false, if second udp port receiver
* @returns shared memory name
*/
std::string ConstructSharedMemoryName(int multiId, int moduleId, int receiverId) {
std::string ConstructSharedMemoryName(int multiId, int moduleId,
int receiverId, bool primaryInterface) {
// using environment path
std::string sEnvPath = "";
@ -232,13 +237,20 @@ class SharedMemory {
}
std::stringstream ss;
if (moduleId < 0)
if (moduleId < 0 && receiverId < 0)
ss << SHM_MULTI_PREFIX << multiId << sEnvPath;
else if (receiverId < 0)
ss << SHM_MULTI_PREFIX << multiId << SHM_MODULE_PREFIX << moduleId << sEnvPath;
else
ss << SHM_MULTI_PREFIX << multiId << SHM_MODULE_PREFIX << moduleId <<
ss << SHM_MULTI_PREFIX << multiId <<
SHM_MODULE_PREFIX << moduleId << sEnvPath;
else if (primaryInterface)
ss << SHM_MULTI_PREFIX << multiId <<
SHM_MODULE_PREFIX << moduleId <<
SHM_RECEIVER_PREFIX << receiverId << sEnvPath;
else
ss << SHM_MULTI_PREFIX << multiId <<
SHM_MODULE_PREFIX << moduleId <<
SHM_RECEIVER2_PREFIX << receiverId << sEnvPath;
std::string temp = ss.str();
if (temp.length() > NAME_MAX_LENGTH) {
@ -259,9 +271,11 @@ class SharedMemory {
*/
T *MapSharedMemory() {
void *addr = mmap(nullptr, sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
void *addr = mmap(nullptr, sizeof(T), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (addr == MAP_FAILED) {
std::string msg = "Mapping shared memory " + name + " failed: " + strerror(errno);
std::string msg = "Mapping shared memory " + name + " failed: " +
strerror(errno);
LOG(logERROR) << msg;
close(fd);
throw SharedMemoryError(msg);
@ -280,7 +294,8 @@ class SharedMemory {
struct stat sb;
// could not fstat
if (fstat(fd, &sb) < 0) {
std::string msg = "Could not verify existing shared memory " + name + " size match " + "(could not fstat): " + strerror(errno);
std::string msg = "Could not verify existing shared memory " +
name + " size match " + "(could not fstat): " + strerror(errno);
LOG(logERROR) << msg;
close(fd);
throw SharedMemoryError(msg);
@ -289,7 +304,9 @@ class SharedMemory {
//size does not match
auto sz = static_cast<size_t>(sb.st_size);
if (sz != expectedSize) {
std::string msg = "Existing shared memory " + name + " size does not match" + "Expected " + std::to_string(expectedSize) + ", found " + std::to_string(sz);
std::string msg = "Existing shared memory " + name +
" size does not match" + "Expected " +
std::to_string(expectedSize) + ", found " + std::to_string(sz);
LOG(logERROR) << msg;
throw SharedMemoryError(msg);
return 1;