zmq fixed WIP

This commit is contained in:
2020-04-27 18:43:41 +02:00
parent 56bc9c4e08
commit 92be88ee19
6 changed files with 105 additions and 121 deletions

View File

@ -674,6 +674,9 @@ class Detector {
void setRxZmqIP(const IpAddr ip, Positions pos = {});
Result<bool> getClientZmq(Positions pos = {}) const;
void setClientZmq(const bool enable, Positions pos = {});
Result<int> getClientZmqPort(Positions pos = {}) const;
/**

View File

@ -209,6 +209,7 @@ void Detector::registerDataCallback(void (*func)(detectorData *, uint64_t,
uint32_t, void *),
void *pArg) {
pimpl->registerDataCallback(func, pArg);
setClientZmq(true);
}
bool Detector::getGapPixelsinCallback() const {
@ -577,11 +578,12 @@ Result<int> Detector::getNumberofUDPInterfaces(Positions pos) const {
}
void Detector::setNumberofUDPInterfaces(int n, Positions pos) {
int previouslyClientStreaming = pimpl->enableDataStreamingToClient();
bool prevClientZmq = getClientZmq().tsquash(
"Inconsistent number client zmq sockets");
bool useReceiver = getUseReceiverFlag().squash(false);
bool previouslyReceiverStreaming = false;
bool prevRxZmq = false;
if (useReceiver) {
previouslyReceiverStreaming = getRxZmqDataStream(pos).squash(true);
prevRxZmq = getRxZmqDataStream(pos).squash(true);
}
pimpl->Parallel(&Module::setNumberofUDPInterfaces, pos, n);
// ensure receiver zmq socket ports are multiplied by 2 (2 interfaces)
@ -590,11 +592,11 @@ void Detector::setNumberofUDPInterfaces(int n, Positions pos) {
setRxZmqPort(startingPort, -1);
}
// redo the zmq sockets if enabled
if (previouslyClientStreaming != 0) {
pimpl->enableDataStreamingToClient(0);
pimpl->enableDataStreamingToClient(1);
if (prevClientZmq) {
setClientZmq(false);
setClientZmq(true);
}
if (previouslyReceiverStreaming) {
if (prevRxZmq) {
setRxZmqDataStream(false, pos);
setRxZmqDataStream(true, pos);
}
@ -987,6 +989,8 @@ Result<int> Detector::getRxZmqPort(Positions pos) const {
}
void Detector::setRxZmqPort(int port, int module_id) {
bool previouslyReceiverStreaming =
getRxZmqDataStream({module_id}).squash(false);
if (module_id == -1) {
for (int idet = 0; idet < size(); ++idet) {
pimpl->Parallel1(&Receiver::setZmqPort, {idet},
@ -996,6 +1000,10 @@ void Detector::setRxZmqPort(int port, int module_id) {
pimpl->Parallel1(&Receiver::setZmqPort, {module_id},
{}, port++);
}
if (previouslyReceiverStreaming) {
setRxZmqDataStream(false, {module_id});
setRxZmqDataStream(true, {module_id});
}
}
Result<IpAddr> Detector::getRxZmqIP(Positions pos) const {
@ -1011,11 +1019,25 @@ void Detector::setRxZmqIP(const IpAddr ip, Positions pos) {
}
}
Result<bool> Detector::getClientZmq(Positions pos) const {
return pimpl->Parallel3(&Receiver::getClientZmq);
}
void Detector::setClientZmq(const bool enable, Positions pos) {
try {
pimpl->Parallel3(&Receiver::setClientZmq, enable);
} catch (...) {
throw;
}
}
Result<int> Detector::getClientZmqPort(Positions pos) const {
return pimpl->Parallel1(&Receiver::getClientZmqPort, pos, {});
}
void Detector::setClientZmqPort(int port, int module_id) {
bool prevClientZmq = getClientZmq().tsquash(
"Inconsistent number of client zmq sockets");
if (module_id == -1) {
for (int idet = 0; idet < size(); ++idet) {
pimpl->Parallel1(&Receiver::setClientZmqPort, {idet},
@ -1025,6 +1047,10 @@ void Detector::setClientZmqPort(int port, int module_id) {
pimpl->Parallel1(&Receiver::setClientZmqPort, {module_id},
{}, port);// FIXME: Needs a clientzmqport2
}
if (prevClientZmq) {
setClientZmq(false);
setClientZmq(true);
}
}
Result<IpAddr> Detector::getClientZmqIp(Positions pos) const {
@ -1032,11 +1058,12 @@ Result<IpAddr> Detector::getClientZmqIp(Positions pos) const {
}
void Detector::setClientZmqIp(const IpAddr ip, Positions pos) {
int previouslyClientStreaming = pimpl->enableDataStreamingToClient(-1);
bool prevClientZmq = getClientZmq().tsquash(
"Inconsistent number of client zmq sockets");
pimpl->Parallel3(&Receiver::setClientZmqIP, ip);
if (previouslyClientStreaming != 0) {
pimpl->enableDataStreamingToClient(0);
pimpl->enableDataStreamingToClient(1);
if (prevClientZmq) {
setClientZmq(false);
setClientZmq(true);
}
}

View File

@ -115,7 +115,6 @@ void DetectorImpl::freeSharedMemory(int detectorId, int moduleId) {
}
void DetectorImpl::freeSharedMemory() {
zmqSocket.clear();
for (auto &d : detectors) {
d->freeSharedMemory();
}
@ -134,7 +133,6 @@ void DetectorImpl::freeSharedMemory() {
receivers2.clear();
// clear multi detector shm
detector_shm.RemoveSharedMemory();
client_downstream = false;
}
std::string DetectorImpl::getUserDetails() {
@ -212,7 +210,6 @@ void DetectorImpl::initializeDetectorStructure() {
void DetectorImpl::initializeMembers(bool verify) {
// DetectorImpl
zmqSocket.clear();
int numModules = detector_shm()->numberOfModules;
// get objects from single det shared memory (open)
@ -600,58 +597,6 @@ void DetectorImpl::setGapPixelsinCallback(const bool enable) {
detector_shm()->gapPixels = enable;
}
int DetectorImpl::createReceivingDataSockets(const bool destroy) {
if (destroy) {
LOG(logINFO) << "Going to destroy data sockets";
// close socket
zmqSocket.clear();
client_downstream = false;
LOG(logINFO) << "Destroyed Receiving Data Socket(s)";
return OK;
}
if (client_downstream) {
return OK;
}
LOG(logINFO) << "Going to create data sockets";
size_t numSockets = detectors.size();
size_t numSocketsPerDetector = 1;
if (detector_shm()->multiDetectorType == EIGER) {
numSocketsPerDetector = 2;
}
if (Parallel(&Module::getNumberofUDPInterfacesFromShm, {}).squash() ==
2) {
numSocketsPerDetector = 2;
}
numSockets *= numSocketsPerDetector;
for (size_t iSocket = 0; iSocket < numSockets; ++iSocket) {
uint32_t portnum = (receivers[iSocket / numSocketsPerDetector][0]
->getClientZmqPort());//FIXME 2 receivers
portnum += (iSocket % numSocketsPerDetector);
try {
zmqSocket.push_back(sls::make_unique<ZmqSocket>(
receivers[iSocket / numSocketsPerDetector][0]
->getClientZmqIP()
.str()
.c_str(),
portnum));
LOG(logINFO) << "Zmq Client[" << iSocket << "] at "
<< zmqSocket.back()->GetZmqServerAddress();
} catch (...) {
LOG(logERROR)
<< "Could not create Zmq socket on port " << portnum;
createReceivingDataSockets(true);
return FAIL;
}
}
client_downstream = true;
LOG(logINFO) << "Receiving Data Socket(s) created";
return OK;
}
void DetectorImpl::readFrameFromReceiver() {
bool gapPixels = detector_shm()->gapPixels;
@ -667,19 +612,28 @@ void DetectorImpl::readFrameFromReceiver() {
Parallel(&Module::getNumberofUDPInterfacesFromShm, {})
.squash(); // cannot pick up from zmq
bool runningList[zmqSocket.size()], connectList[zmqSocket.size()];
size_t nZmq = receivers.size() + receivers2.size();
std::vector<ZmqSocket*> zmqSockets;
for (size_t i = 0; i < receivers.size(); ++i) {
zmqSockets.push_back(receivers[i][0]->getZmqSocket());
if (receivers2.size()) {
zmqSockets.push_back(receivers2[i][0]->getZmqSocket());
}
}
bool runningList[nZmq], connectList[nZmq];
int numRunning = 0;
for (size_t i = 0; i < zmqSocket.size(); ++i) {
if (zmqSocket[i]->Connect() == 0) {
for (size_t i = 0; i < nZmq; ++i) {
if (zmqSockets[i]->Connect() == 0) {
connectList[i] = true;
runningList[i] = true;
++numRunning;
} else {
// to remember the list it connected to, to disconnect later
connectList[i] = false;
LOG(logERROR) << "Could not connect to socket "
<< zmqSocket[i]->GetZmqServerAddress();
runningList[i] = false;
LOG(logERROR) << "Could not connect to socket "
<< zmqSockets[i]->GetZmqServerAddress();
}
}
int numConnected = numRunning;
@ -716,7 +670,7 @@ void DetectorImpl::readFrameFromReceiver() {
completeImage = true;
// get each frame
for (unsigned int isocket = 0; isocket < zmqSocket.size(); ++isocket) {
for (unsigned int isocket = 0; isocket < nZmq; ++isocket) {
// if running
if (runningList[isocket]) {
@ -724,7 +678,7 @@ void DetectorImpl::readFrameFromReceiver() {
// HEADER
{
zmqHeader zHeader;
if (zmqSocket[isocket]->ReceiveHeader(
if (zmqSockets[isocket]->ReceiveHeader(
isocket, zHeader, SLS_DETECTOR_JSON_HEADER_VERSION) ==
0) {
// parse error, version error or end of acquisition for
@ -738,7 +692,7 @@ void DetectorImpl::readFrameFromReceiver() {
if (image == nullptr) {
// allocate
size = zHeader.imageSize;
multisize = size * zmqSocket.size();
multisize = size * nZmq;
image = new char[size];
multiframe = new char[multisize];
memset(multiframe, 0xFF, multisize);
@ -803,7 +757,7 @@ void DetectorImpl::readFrameFromReceiver() {
// DATA
data = true;
zmqSocket[isocket]->ReceiveData(isocket, image, size);
zmqSockets[isocket]->ReceiveData(isocket, image, size);
// creating multi image
{
@ -890,7 +844,7 @@ void DetectorImpl::readFrameFromReceiver() {
running = false;
} else {
// starting a new scan/measurement (got dummy data)
for (size_t i = 0; i < zmqSocket.size(); ++i) {
for (size_t i = 0; i < zmqSockets.size(); ++i) {
runningList[i] = connectList[i];
}
numRunning = numConnected;
@ -899,9 +853,9 @@ void DetectorImpl::readFrameFromReceiver() {
}
// Disconnect resources
for (size_t i = 0; i < zmqSocket.size(); ++i) {
for (size_t i = 0; i < zmqSockets.size(); ++i) {
if (connectList[i]) {
zmqSocket[i]->Disconnect();
zmqSockets[i]->Disconnect();
}
}
@ -1207,23 +1161,6 @@ int DetectorImpl::InsertGapPixels(char *image, char *&gpImage,
return imagesize;
}
bool DetectorImpl::enableDataStreamingToClient(int enable) {
if (enable >= 0) {
// destroy data threads
if (enable == 0) {
createReceivingDataSockets(true);
// create data threads
} else {
if (createReceivingDataSockets() == FAIL) {
throw RuntimeError("Could not create data threads in client.");
}
}
}
return client_downstream;
}
void DetectorImpl::registerAcquisitionFinishedCallback(void (*func)(double, int,
void *),
void *pArg) {
@ -1237,7 +1174,6 @@ void DetectorImpl::registerDataCallback(void (*userCallback)(detectorData *,
void *pArg) {
dataReady = userCallback;
pCallbackArg = pArg;
enableDataStreamingToClient(dataReady == nullptr ? 0 : 1);
}
int DetectorImpl::acquire() {

View File

@ -5,7 +5,6 @@
#include "logger.h"
#include "sls_detector_defs.h"
class ZmqSocket;
class detectorData;
#include <memory>
@ -693,13 +692,6 @@ class DetectorImpl : public virtual slsDetectorDefs {
/** [Eiger][Jungfrau] */
void setGapPixelsinCallback(const bool enable);
/**
* Enable data streaming to client
* @param enable 0 to disable, 1 to enable, -1 to get the value
* @returns data streaming to client enable
*/
bool enableDataStreamingToClient(int enable = -1);
/**
* register callback for accessing acquisition final data
* @param func function to be called at the end of the acquisition.
@ -785,13 +777,6 @@ class DetectorImpl : public virtual slsDetectorDefs {
void updateDetectorSize();
/**
* Create Receiving Data Sockets
* @param destroy is true to destroy all the sockets
* @returns OK or FAIL
*/
int createReceivingDataSockets(const bool destroy = false);
/**
* Reads frames from receiver through a constant socket
* Called during acquire() when call back registered or when using gui
@ -847,13 +832,6 @@ class DetectorImpl : public virtual slsDetectorDefs {
/** for the second udp port [Eiger][Jungfrau] */
std::vector<std::vector<std::unique_ptr<sls::Receiver>>> receivers2;
/** data streaming (down stream) enabled in client (zmq sckets created) */
bool client_downstream{false};
/** ZMQ Socket - Receiver to Client */
std::vector<std::unique_ptr<ZmqSocket>> zmqSocket;
/** semaphore to let postprocessing thread continue for next
* scan/measurement */
sem_t sem_newRTAcquisition;

View File

@ -1,10 +1,13 @@
#include "Receiver.h"
#include "ClientSocket.h"
#include "ZmqSocket.h"
#include "FixedCapacityContainer.h"
#include "string_utils.h"
#include "versionAPI.h"
#include "ToString.h"
#include "container_utils.h"
namespace sls {
// create shm
@ -144,7 +147,7 @@ sls::MacAddr Receiver::configure(slsDetectorDefs::rxParameters arg) {
arg.udpInterfaces = 2;
}
LOG(logINFO)
LOG(logDEBUG)
<< "detType:" << arg.detType << std::endl
<< "detectorSize.x:" << arg.detectorSize.x << std::endl
<< "detectorSize.y:" << arg.detectorSize.y << std::endl
@ -389,6 +392,38 @@ void Receiver::setClientZmqIP(const sls::IpAddr ip) {
shm()->zmqIp = ip;
}
bool Receiver::getClientZmq() const {
return (zmqSocket != nullptr);
}
void Receiver::setClientZmq(const bool enable) {
// destroy
if (!enable) {
if (zmqSocket != nullptr) {
zmqSocket.reset();
}
}
// create
else {
if (zmqSocket == nullptr) {
try {
zmqSocket = sls::make_unique<ZmqSocket>(
shm()->zmqIp.str().c_str(), shm()->zmqPort);
LOG(logINFO) << "Zmq Client[" << indexString << "] at "
<< zmqSocket->GetZmqServerAddress();
} catch(...) {
throw RuntimeError(
"Could not create Zmq socket [" + indexString
+ " on port " + std::to_string(shm()->zmqPort));
}
}
}
}
ZmqSocket* Receiver::getZmqSocket() {
return zmqSocket.get();
}
/** Receiver Parameters */
bool Receiver::getLock() const {

View File

@ -5,9 +5,12 @@
#include "network_utils.h"
#include <map>
#include <memory>
#define RECEIVER_SHMVERSION 0x200421
class ZmqSocket;
namespace sls {
struct sharedReceiver {
@ -105,8 +108,9 @@ class Receiver : public virtual slsDetectorDefs {
void setClientZmqPort(const int port);
sls::IpAddr getClientZmqIP() const;
void setClientZmqIP(const sls::IpAddr ip);
bool getClientZmq() const;
void setClientZmq(const bool enable);
ZmqSocket* getZmqSocket();
/**************************************************
* *
@ -248,6 +252,7 @@ class Receiver : public virtual slsDetectorDefs {
const int moduleId{0};
std::string indexString;
mutable sls::SharedMemory<sharedReceiver> shm{0, 0, 0, 0};
std::unique_ptr<ZmqSocket> zmqSocket;
};
} // sls