zmq hwm are specified to 2 for gui and restreaming of receiver if all zmq not closed at end of acquiistion

This commit is contained in:
2020-10-08 13:01:01 +02:00
parent c9bba6fbdc
commit 6c1035aa99
21 changed files with 614 additions and 190 deletions

View File

@ -158,6 +158,8 @@ void DetectorImpl::initializeDetectorStructure() {
multi_shm()->acquiringFlag = false;
multi_shm()->initialChecks = true;
multi_shm()->gapPixels = false;
// zmqlib default
multi_shm()->zmqHwm = -1;
}
void DetectorImpl::initializeMembers(bool verify) {
@ -377,16 +379,17 @@ void DetectorImpl::setGapPixelsinCallback(const bool enable) {
multi_shm()->gapPixels = enable;
}
int DetectorImpl::createReceivingDataSockets(const bool destroy) {
if (destroy) {
LOG(logINFO) << "Going to destroy data sockets";
// close socket
zmqSocket.clear();
int DetectorImpl::destroyReceivingDataSockets() {
LOG(logINFO) << "Going to destroy data sockets";
// close socket
zmqSocket.clear();
client_downstream = false;
LOG(logINFO) << "Destroyed Receiving Data Socket(s)";
return OK;
}
client_downstream = false;
LOG(logINFO) << "Destroyed Receiving Data Socket(s)";
return OK;
}
int DetectorImpl::createReceivingDataSockets() {
if (client_downstream) {
return OK;
}
@ -417,11 +420,21 @@ int DetectorImpl::createReceivingDataSockets(const bool destroy) {
.str()
.c_str(),
portnum));
// set high water mark
int hwm = multi_shm()->zmqHwm;
if (hwm >= 0) {
zmqSocket[iSocket]->SetReceiveHighWaterMark(hwm);
if (zmqSocket[iSocket]->GetReceiveHighWaterMark() != hwm) {
throw sls::ZmqSocketError("Could not set zmq rcv hwm to " +
std::to_string(hwm));
}
}
LOG(logINFO) << "Zmq Client[" << iSocket << "] at "
<< zmqSocket.back()->GetZmqServerAddress();
<< zmqSocket.back()->GetZmqServerAddress() << "[hwm: "
<< zmqSocket.back()->GetReceiveHighWaterMark() << "]";
} catch (...) {
LOG(logERROR) << "Could not create Zmq socket on port " << portnum;
createReceivingDataSockets(true);
destroyReceivingDataSockets();
return FAIL;
}
}
@ -449,12 +462,12 @@ void DetectorImpl::readFrameFromReceiver() {
}
std::vector<bool> runningList(zmqSocket.size());
std::vector<bool> connectList(zmqSocket.size());
int numRunning = 0;
numZmqRunning = 0;
for (size_t i = 0; i < zmqSocket.size(); ++i) {
if (zmqSocket[i]->Connect() == 0) {
connectList[i] = true;
runningList[i] = true;
++numRunning;
++numZmqRunning;
} else {
// to remember the list it connected to, to disconnect later
connectList[i] = false;
@ -480,14 +493,14 @@ void DetectorImpl::readFrameFromReceiver() {
uint32_t currentSubFrameIndex = -1, coordX = -1, coordY = -1,
flippedDataX = -1;
while (numRunning != 0) {
while (numZmqRunning != 0) {
// reset data
data = false;
if (multiframe != nullptr) {
memset(multiframe.get(), 0xFF, multisize);
}
completeImage = (numRunning == (int)zmqSocket.size());
completeImage = (numZmqRunning == (int)zmqSocket.size());
// get each frame
for (unsigned int isocket = 0; isocket < zmqSocket.size(); ++isocket) {
@ -505,7 +518,7 @@ void DetectorImpl::readFrameFromReceiver() {
// socket
runningList[isocket] = false;
completeImage = false;
--numRunning;
--numZmqRunning;
continue;
}
@ -967,7 +980,7 @@ bool DetectorImpl::getDataStreamingToClient() { return client_downstream; }
void DetectorImpl::setDataStreamingToClient(bool enable) {
// destroy data threads
if (!enable) {
createReceivingDataSockets(true);
destroyReceivingDataSockets();
// create data threads
} else {
if (createReceivingDataSockets() == FAIL) {
@ -976,6 +989,51 @@ void DetectorImpl::setDataStreamingToClient(bool enable) {
}
}
int DetectorImpl::getClientStreamingHwm() const {
// disabled
if (!client_downstream) {
return multi_shm()->zmqHwm;
}
// enabled
sls::Result<int> result;
result.reserve(zmqSocket.size());
for (auto &it : zmqSocket) {
result.push_back(it->GetReceiveHighWaterMark());
}
int res = result.tsquash("Inconsistent zmq receive hwm values");
return res;
}
void DetectorImpl::setClientStreamingHwm(const int limit) {
if (limit < -1) {
throw sls::RuntimeError(
"Cannot set hwm to less than -1 (-1 is lib default).");
}
// update shm
multi_shm()->zmqHwm = limit;
// streaming enabled
if (client_downstream) {
// custom limit, set it directly
if (limit >= 0) {
for (auto &it : zmqSocket) {
it->SetReceiveHighWaterMark(limit);
if (it->GetReceiveHighWaterMark() != limit) {
multi_shm()->zmqHwm = -1;
throw sls::ZmqSocketError("Could not set zmq rcv hwm to " +
std::to_string(limit));
}
}
LOG(logINFO) << "Setting Client Zmq socket rcv hwm to " << limit;
}
// default, disable and enable to get default
else {
setDataStreamingToClient(false);
setDataStreamingToClient(true);
}
}
}
void DetectorImpl::registerAcquisitionFinishedCallback(void (*func)(double, int,
void *),
void *pArg) {
@ -1042,6 +1100,12 @@ int DetectorImpl::acquire() {
if (dataReady == nullptr) {
setJoinThreadFlag(true);
}
if (receiver) {
while (numZmqRunning != 0) {
Parallel(&Module::restreamStopFromReceiver, {});
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
dataProcessingThread.join();
if (acquisition_finished != nullptr) {