merge from 7.0.0

This commit is contained in:
2023-02-24 10:39:51 +01:00
101 changed files with 4009 additions and 2128 deletions

View File

@ -253,14 +253,15 @@ void DetectorImpl::setVirtualDetectorServers(const int numdet, const int port) {
}
void DetectorImpl::setHostname(const std::vector<std::string> &name) {
// this check is there only to allow the previous detsizechan command
if (shm()->totalNumberOfModules != 0) {
// do not free always to allow the previous detsize/ initialchecks command
if (shm.exists() && shm()->totalNumberOfModules != 0) {
LOG(logWARNING) << "There are already module(s) in shared memory."
"Freeing Shared memory now.";
bool initialChecks = shm()->initialChecks;
freeSharedMemory();
}
// could be called after freeing shm from API
if (!shm.exists()) {
setupDetector();
shm()->initialChecks = initialChecks;
}
for (const auto &hostname : name) {
addModule(hostname);
@ -342,28 +343,32 @@ void DetectorImpl::updateDetectorSize() {
"updating detector size. ");
}
int maxx = shm()->numberOfChannels.x;
int maxy = shm()->numberOfChannels.y;
int nModx = 0, nMody = 0;
// 1d, add modules along x axis
if (modSize.y == 1) {
if (maxx == 0) {
maxx = modSize.x * size();
int detSizeX = shm()->numberOfChannels.x;
int maxChanX = modSize.x * size();
// user given detsizex used only within max value
if (detSizeX > 1 && detSizeX <= maxChanX) {
maxChanX = detSizeX;
}
nModx = maxx / modSize.x;
nModx = maxChanX / modSize.x;
nMody = size() / nModx;
if ((maxx % modSize.x) > 0) {
if ((maxChanX % modSize.x) > 0) {
++nMody;
}
}
// 2d, add modules along y axis (due to eiger top/bottom)
else {
if (maxy == 0) {
maxy = modSize.y * size();
int detSizeY = shm()->numberOfChannels.y;
int maxChanY = modSize.y * size();
// user given detsizey used only within max value
if (detSizeY > 1 && detSizeY <= maxChanY) {
maxChanY = detSizeY;
}
nMody = maxy / modSize.y;
nMody = maxChanY / modSize.y;
nModx = size() / nMody;
if ((maxy % modSize.y) > 0) {
if ((maxChanY % modSize.y) > 0) {
++nModx;
}
}
@ -518,19 +523,18 @@ void DetectorImpl::setTransmissionDelay(int step) {
f.get();
}
int DetectorImpl::destroyReceivingDataSockets() {
void DetectorImpl::destroyReceivingDataSockets() {
LOG(logINFO) << "Going to destroy data sockets";
// close socket
zmqSocket.clear();
client_downstream = false;
LOG(logINFO) << "Destroyed Receiving Data Socket(s)";
return OK;
}
int DetectorImpl::createReceivingDataSockets() {
void DetectorImpl::createReceivingDataSockets() {
if (client_downstream) {
return OK;
return;
}
LOG(logINFO) << "Going to create data sockets";
@ -557,24 +561,22 @@ int DetectorImpl::createReceivingDataSockets() {
int hwm = shm()->zmqHwm;
if (hwm >= 0) {
zmqSocket[iSocket]->SetReceiveHighWaterMark(hwm);
if (zmqSocket[iSocket]->GetReceiveHighWaterMark() != hwm) {
throw ZmqSocketError("Could not set zmq rcv hwm to " +
std::to_string(hwm));
}
// need not reconnect. cannot be connected (detector idle)
}
LOG(logINFO) << "Zmq Client[" << iSocket << "] at "
<< zmqSocket.back()->GetZmqServerAddress() << "[hwm: "
<< zmqSocket.back()->GetReceiveHighWaterMark() << "]";
} catch (...) {
LOG(logERROR) << "Could not create Zmq socket on port " << portnum;
} catch (std::exception &e) {
destroyReceivingDataSockets();
return FAIL;
std::ostringstream oss;
oss << "Could not create zmq sub socket on port " << portnum;
oss << " [" << e.what() << ']';
throw RuntimeError(oss.str());
}
}
client_downstream = true;
LOG(logINFO) << "Receiving Data Socket(s) created";
return OK;
}
void DetectorImpl::readFrameFromReceiver() {
@ -1114,9 +1116,7 @@ void DetectorImpl::setDataStreamingToClient(bool enable) {
destroyReceivingDataSockets();
// create data threads
} else {
if (createReceivingDataSockets() == FAIL) {
throw RuntimeError("Could not create data threads in client.");
}
createReceivingDataSockets();
}
}
@ -1149,11 +1149,7 @@ void DetectorImpl::setClientStreamingHwm(const int limit) {
if (limit >= 0) {
for (auto &it : zmqSocket) {
it->SetReceiveHighWaterMark(limit);
if (it->GetReceiveHighWaterMark() != limit) {
shm()->zmqHwm = -1;
throw ZmqSocketError("Could not set zmq rcv hwm to " +
std::to_string(limit));
}
// need not reconnect. cannot be connected (detector idle)
}
LOG(logINFO) << "Setting Client Zmq socket rcv hwm to " << limit;
}
@ -1231,8 +1227,7 @@ int DetectorImpl::acquire() {
// let the progress thread (no callback) know acquisition is done
if (dataReady == nullptr) {
setJoinThreadFlag(true);
}
if (receiver) {
} else if (receiver) {
while (numZmqRunning != 0) {
Parallel(&Module::restreamStopFromReceiver, {});
std::this_thread::sleep_for(std::chrono::milliseconds(200));
@ -1322,6 +1317,7 @@ void DetectorImpl::processData(bool receiver) {
}
// only update progress
else {
LOG(logINFO) << "Type 'q' and hit enter to stop acquisition";
double progress = 0;
printProgress(progress);
@ -1778,6 +1774,10 @@ void DetectorImpl::setBadChannels(const std::string &fname, Positions pos) {
if (list.empty()) {
throw RuntimeError("Bad channel file is empty.");
}
setBadChannels(list, pos);
}
void DetectorImpl::setBadChannels(const std::vector<int> list, Positions pos) {
// update to multi values if multi modules
if (isAllPositions(pos)) {
@ -1794,20 +1794,24 @@ void DetectorImpl::setBadChannels(const std::string &fname, Positions pos) {
" out of bounds.");
}
int ch = badchannel % nchan;
int imod = badchannel / nchan;
if (imod >= (int)modules.size()) {
size_t imod = badchannel / nchan;
if (imod >= modules.size()) {
throw RuntimeError("Invalid bad channel list. " +
std::to_string(badchannel) +
" out of bounds.");
}
if ((int)badchannels.size() != imod + 1) {
if (badchannels.size() != imod + 1) {
badchannels.push_back(std::vector<int>{});
}
badchannels[imod].push_back(ch);
}
for (int imod = 0; imod != (int)modules.size(); ++imod) {
Parallel(&Module::setBadChannels, {imod}, badchannels[imod]);
for (size_t imod = 0; imod != modules.size(); ++imod) {
// add empty vector if no bad channels in this module
if (badchannels.size() != imod + 1) {
badchannels.push_back(std::vector<int>{});
}
Parallel(&Module::setBadChannels, {static_cast<int>(imod)},
badchannels[imod]);
}
} else if (pos.size() != 1) {