rx_statusL unknown, but others good WIP

This commit is contained in:
maliakal_d 2020-04-17 12:09:23 +02:00
parent cfa9049ed3
commit df63a6dffe
9 changed files with 303 additions and 162 deletions

View File

@ -346,7 +346,11 @@ class Detector {
Result<defs::runStatus> getDetectorStatus(Positions pos = {}) const; Result<defs::runStatus> getDetectorStatus(Positions pos = {}) const;
Result<defs::runStatus> getReceiverStatus(Positions pos = {}) const; Result<defs::runStatus> getReceiverStatus() const;
/** interface is by 1 (primary udp interface),
* 2 for second udp interface [Eiger][Jungfrau] */
Result<defs::runStatus> getReceiverStatus(const int udpInterface,
Positions pos = {}) const;
Result<int64_t> getFramesCaught(Positions pos = {}) const; Result<int64_t> getFramesCaught(Positions pos = {}) const;

View File

@ -821,16 +821,29 @@ std::vector<std::string> CmdProxy::DacCommands() {
/* acquisition */ /* acquisition */
std::string CmdProxy::ReceiverStatus(int action) { std::string CmdProxy::ReceiverStatus(int action) {
int udpInterface = 1;
if (cmd == "rx_status") {
udpInterface = 1;
} else if (cmd == "rx_status2") {
udpInterface = 2;
} else {
throw sls::RuntimeError("Unknown command, use list to list all commands");
}
std::ostringstream os; std::ostringstream os;
os << cmd << ' '; os << cmd << ' ';
if (action == defs::HELP_ACTION) { if (action == defs::HELP_ACTION) {
if (cmd == "rx_status") {
os << "running, idle]\n\tReceiver listener status." os << "running, idle]\n\tReceiver listener status."
<< '\n'; << '\n';
} else {
os << "running, idle]\n\tReceiver listener status for second udp port."
<< '\n';
}
} else if (action == defs::GET_ACTION) { } else if (action == defs::GET_ACTION) {
if (args.size() != 0) { if (args.size() != 0) {
WrongNumberOfParameters(0); WrongNumberOfParameters(0);
} }
auto t = det->getReceiverStatus({det_id}); auto t = det->getReceiverStatus(udpInterface, {det_id});
os << OutString(t) << '\n'; os << OutString(t) << '\n';
} else if (action == defs::PUT_ACTION) { } else if (action == defs::PUT_ACTION) {
throw sls::RuntimeError("Cannot put. Did you mean to use command 'rx_start' or 'rx_stop'?"); throw sls::RuntimeError("Cannot put. Did you mean to use command 'rx_start' or 'rx_stop'?");

View File

@ -463,11 +463,11 @@ void Detector::acquire() { pimpl->acquire(); }
void Detector::clearAcquiringFlag() { pimpl->setAcquiringFlag(0); } void Detector::clearAcquiringFlag() { pimpl->setAcquiringFlag(0); }
void Detector::startReceiver() { void Detector::startReceiver() {
pimpl->Parallel(&Module::startReceiver, {}); pimpl->Parallel3(&Receiver::start);
} }
void Detector::stopReceiver() { void Detector::stopReceiver() {
pimpl->Parallel(&Module::stopReceiver, {}); pimpl->Parallel3(&Receiver::stop);
} }
void Detector::startDetector() { void Detector::startDetector() {
@ -478,15 +478,49 @@ void Detector::startDetector() {
} }
void Detector::stopDetector() { void Detector::stopDetector() {
// get status before stopping acquisition
defs::runStatus s = defs::ERROR, r = defs::ERROR;
bool restreamStop = false;
if (pimpl->isReceiverInitialized(1) && getRxZmqDataStream().squash(true)) {
s = getDetectorStatus().squash(defs::ERROR);
r = getReceiverStatus().squash(defs::ERROR);
// if rxr streaming and acquisition finished, restream dummy stop packet
if (s == defs::IDLE && r == defs::IDLE) {
restreamStop = true;
}
}
pimpl->Parallel(&Module::stopAcquisition, {}); pimpl->Parallel(&Module::stopAcquisition, {});
if (pimpl->isReceiverInitialized(1)) {
pimpl->Parallel1(&Receiver::setStoppedFlag, {}, {});
if (restreamStop) {
pimpl->Parallel1(&Receiver::restreamStop, {}, {});
}
} else if (pimpl->isReceiverInitialized(2)) {
pimpl->Parallel2(&Receiver::setStoppedFlag, {}, {});
if (restreamStop) {
pimpl->Parallel2(&Receiver::restreamStop, {}, {});
}
}
} }
Result<defs::runStatus> Detector::getDetectorStatus(Positions pos) const { Result<defs::runStatus> Detector::getDetectorStatus(Positions pos) const {
return pimpl->Parallel(&Module::getRunStatus, pos); return pimpl->Parallel(&Module::getRunStatus, pos);
} }
Result<defs::runStatus> Detector::getReceiverStatus(Positions pos) const { Result<defs::runStatus> Detector::getReceiverStatus() const {
return pimpl->Parallel(&Module::getReceiverStatus, pos); return pimpl->Parallel3(&Receiver::getStatus);
}
Result<defs::runStatus> Detector::getReceiverStatus(const int udpInterface, Positions pos) const {
switch (udpInterface) {
case 1:
return pimpl->Parallel1(&Receiver::getStatus, pos, {});
case 2:
return pimpl->Parallel2(&Receiver::getStatus, pos, {});
default:
throw RuntimeError("Invalid udp interface number " +
std::to_string(udpInterface));
}
} }
Result<int64_t> Detector::getFramesCaught(Positions pos) const { Result<int64_t> Detector::getFramesCaught(Positions pos) const {
@ -703,9 +737,9 @@ Result<bool> Detector::getUseReceiverFlag(Positions pos) const {
Result<std::string> Detector::getRxHostname(const int udpInterface, Positions pos) const { Result<std::string> Detector::getRxHostname(const int udpInterface, Positions pos) const {
switch (udpInterface) { switch (udpInterface) {
case 1: case 1:
return pimpl->Parallel1(&Receiver::getHostname, pos, {0}); return pimpl->Parallel1(&Receiver::getHostname, pos, {});
case 2: case 2:
return pimpl->Parallel2(&Receiver::getHostname, pos, {0}); return pimpl->Parallel2(&Receiver::getHostname, pos, {});
default: default:
throw RuntimeError("Invalid udp interface number " + throw RuntimeError("Invalid udp interface number " +
std::to_string(udpInterface)); std::to_string(udpInterface));
@ -713,13 +747,17 @@ Result<std::string> Detector::getRxHostname(const int udpInterface, Positions po
} }
void Detector::setRxHostname(const int udpInterface, const std::string &hostname, Positions pos) { void Detector::setRxHostname(const int udpInterface, const std::string &hostname, Positions pos) {
if (getDetectorStatus(pos).squash(defs::ERROR) == defs::RUNNING) {
LOG(logWARNING) << "Acquisition already running, Stopping it.";
stopDetector();
}
if (!pimpl->isReceiverInitialized(udpInterface)) { if (!pimpl->isReceiverInitialized(udpInterface)) {
pimpl->initReceiver(udpInterface); pimpl->initReceiver(udpInterface);
} }
if (udpInterface == 1) { if (udpInterface == 1) {
pimpl->Parallel1(&Receiver::setHostname, pos, {0}, hostname); pimpl->Parallel1(&Receiver::setHostname, pos, {}, hostname);
} else { } else {
pimpl->Parallel2(&Receiver::setHostname, pos, {0}, hostname); pimpl->Parallel2(&Receiver::setHostname, pos, {}, hostname);
} }
} }
@ -729,20 +767,20 @@ void Detector::setRxHostname(const int udpInterface, const std::string &hostname
pimpl->initReceiver(udpInterface); pimpl->initReceiver(udpInterface);
} }
if (udpInterface == 1) { if (udpInterface == 1) {
pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {0}, port); pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {}, port);
pimpl->Parallel1(&Receiver::setHostname, {module_id}, {0}, hostname); pimpl->Parallel1(&Receiver::setHostname, {module_id}, {}, hostname);
} else { } else {
pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {0}, port); pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {}, port);
pimpl->Parallel2(&Receiver::setHostname, {module_id}, {0}, hostname); pimpl->Parallel2(&Receiver::setHostname, {module_id}, {}, hostname);
} }
} }
Result<int> Detector::getRxPort(const int udpInterface, Positions pos) const { Result<int> Detector::getRxPort(const int udpInterface, Positions pos) const {
switch (udpInterface) { switch (udpInterface) {
case 1: case 1:
return pimpl->Parallel1(&Receiver::getTCPPort, pos, {0}); return pimpl->Parallel1(&Receiver::getTCPPort, pos, {});
case 2: case 2:
return pimpl->Parallel2(&Receiver::getTCPPort, pos, {0}); return pimpl->Parallel2(&Receiver::getTCPPort, pos, {});
default: default:
throw RuntimeError("Invalid udp interface number " + throw RuntimeError("Invalid udp interface number " +
std::to_string(udpInterface)); std::to_string(udpInterface));
@ -757,21 +795,21 @@ void Detector::setRxPort(const int udpInterface, int port, int module_id) {
if (module_id == -1) { if (module_id == -1) {
std::vector<int> port_list = getPortNumbers(port); std::vector<int> port_list = getPortNumbers(port);
for (int idet = 0; idet < size(); ++idet) { for (int idet = 0; idet < size(); ++idet) {
pimpl->Parallel1(&Receiver::setTCPPort, {idet}, {0}, pimpl->Parallel1(&Receiver::setTCPPort, {idet}, {},
port_list[idet]); port_list[idet]);
} }
} else { } else {
pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {0}, port); pimpl->Parallel1(&Receiver::setTCPPort, {module_id}, {}, port);
} }
} else { } else {
if (module_id == -1) { if (module_id == -1) {
std::vector<int> port_list = getPortNumbers(port); std::vector<int> port_list = getPortNumbers(port);
for (int idet = 0; idet < size(); ++idet) { for (int idet = 0; idet < size(); ++idet) {
pimpl->Parallel2(&Receiver::setTCPPort, {idet}, {0}, pimpl->Parallel2(&Receiver::setTCPPort, {idet}, {},
port_list[idet]); port_list[idet]);
} }
} else { } else {
pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {0}, port); pimpl->Parallel2(&Receiver::setTCPPort, {module_id}, {}, port);
} }
} }
} }

View File

@ -1174,16 +1174,16 @@ int DetectorImpl::acquire() {
// receiver/ext process) // receiver/ext process)
sem_init(&sem_endRTAcquisition, 1, 0); sem_init(&sem_endRTAcquisition, 1, 0);
bool receiver = bool receiver1 = isReceiverInitialized(1);
Parallel(&Module::getUseReceiverFlag, {}).squash(false); bool receiver2 = isReceiverInitialized(2);
bool receiver = receiver1 || receiver2;
setJoinThreadFlag(false); setJoinThreadFlag(false);
// verify receiver is idle // verify receiver is idle
if (receiver) { if (receiver) {
if (Parallel(&Module::getReceiverStatus, {}).squash(ERROR) != if (Parallel3(&Receiver::getStatus).squash(ERROR) != IDLE) {
IDLE) { Parallel3(&Receiver::stop);
Parallel(&Module::stopReceiver, {});
} }
} }
@ -1191,7 +1191,7 @@ int DetectorImpl::acquire() {
// start receiver // start receiver
if (receiver) { if (receiver) {
Parallel(&Module::startReceiver, {}); Parallel3(&Receiver::start);
// let processing thread listen to these packets // let processing thread listen to these packets
sem_post(&sem_newRTAcquisition); sem_post(&sem_newRTAcquisition);
} }
@ -1203,13 +1203,13 @@ int DetectorImpl::acquire() {
} }
Parallel(&Module::startAndReadAll, {}); Parallel(&Module::startAndReadAll, {});
} catch (...) { } catch (...) {
Parallel(&Module::stopReceiver, {}); Parallel3(&Receiver::stop);
throw; throw;
} }
// stop receiver // stop receiver
if (receiver) { if (receiver) {
Parallel(&Module::stopReceiver, {}); Parallel3(&Receiver::stop);
if (dataReady != nullptr) { if (dataReady != nullptr) {
sem_wait(&sem_endRTAcquisition); // waits for receiver's sem_wait(&sem_endRTAcquisition); // waits for receiver's
} }
@ -1226,7 +1226,7 @@ int DetectorImpl::acquire() {
if (acquisition_finished != nullptr) { if (acquisition_finished != nullptr) {
int status = Parallel(&Module::getRunStatus, {}).squash(ERROR); int status = Parallel(&Module::getRunStatus, {}).squash(ERROR);
auto a = Parallel(&Module::getReceiverProgress, {}); auto a = Parallel3(&Receiver::getProgress);
int progress = (*std::min_element (a.begin(), a.end())); int progress = (*std::min_element (a.begin(), a.end()));
acquisition_finished((double)progress, status, acqFinished_p); acquisition_finished((double)progress, status, acqFinished_p);
} }
@ -1278,7 +1278,7 @@ void DetectorImpl::processData() {
} }
} }
// get and print progress // get and print progress
double temp = (double)Parallel(&Module::getReceiverProgress, {0}).squash(); double temp = (double)Parallel1(&Receiver::getProgress, {0}, {0}).squash();
if (temp != progress) { if (temp != progress) {
printProgress(progress); printProgress(progress);
progress = temp; progress = temp;
@ -1287,7 +1287,7 @@ void DetectorImpl::processData() {
// exiting loop // exiting loop
if (getJoinThreadFlag()) { if (getJoinThreadFlag()) {
// print progress one final time before exiting // print progress one final time before exiting
progress = (double)Parallel(&Module::getReceiverProgress, {0}).squash(); progress = (double)Parallel1(&Receiver::getProgress, {0}, {0}).squash();
printProgress(progress); printProgress(progress);
break; break;
} }

View File

@ -496,6 +496,138 @@ class DetectorImpl : public virtual slsDetectorDefs {
// for all , but dont complain if receiver2 doesnt exist
template <typename RT, typename... CT>
sls::Result<RT> Parallel3(RT (sls::Receiver::*somefunc)(CT...),
typename NonDeduced<CT>::type... Args) {
if (receivers.size() == 0)
throw sls::RuntimeError("No receivers added");
std::vector<int> dPositions;
dPositions.resize(receivers.size());
std::iota(begin(dPositions), end(dPositions), 0);
std::vector<int> rxPositions;
rxPositions.resize(receivers[0].size());
std::iota(begin(rxPositions), end(rxPositions), 0);
// multiply by 2 if receivers2 exists
size_t futureSize = dPositions.size() * rxPositions.size() *
(receivers2.size() > 0 ? 2 : 1);
std::vector<std::future<RT>> futures;
futures.reserve(futureSize);
for (size_t i : dPositions) {
// each entry
for (size_t j : rxPositions) {
futures.push_back(std::async(std::launch::async, somefunc,
receivers[i][j].get(), Args...));
futures.push_back(std::async(std::launch::async, somefunc,
receivers2[i][j].get(), Args...));
}
}
sls::Result<RT> result;
result.reserve(futureSize);
for (auto &i : futures) {
result.push_back(i.get());
}
return result;
}
template <typename RT, typename... CT>
sls::Result<RT> Parallel3(RT (sls::Receiver::*somefunc)(CT...) const,
typename NonDeduced<CT>::type... Args) const {
if (receivers.size() == 0)
throw sls::RuntimeError("No receivers added");
std::vector<int> dPositions;
dPositions.resize(receivers.size());
std::iota(begin(dPositions), end(dPositions), 0);
std::vector<int> rxPositions;
rxPositions.resize(receivers[0].size());
std::iota(begin(rxPositions), end(rxPositions), 0);
// multiply by 2 if receivers2 exists
size_t futureSize = dPositions.size() * rxPositions.size() *
(receivers2.size() > 0 ? 2 : 1);
std::vector<std::future<RT>> futures;
futures.reserve(futureSize);
for (size_t i : dPositions) {
// each entry
for (size_t j : rxPositions) {
futures.push_back(std::async(std::launch::async, somefunc,
receivers[i][j].get(), Args...));
futures.push_back(std::async(std::launch::async, somefunc,
receivers2[i][j].get(), Args...));
}
}
sls::Result<RT> result;
result.reserve(futureSize);
for (auto &i : futures) {
result.push_back(i.get());
}
return result;
}
template <typename... CT>
void Parallel3(void (sls::Receiver::*somefunc)(CT...),
typename NonDeduced<CT>::type... Args) {
if (receivers.size() == 0)
throw sls::RuntimeError("No receivers added");
std::vector<int> dPositions;
dPositions.resize(receivers.size());
std::iota(begin(dPositions), end(dPositions), 0);
std::vector<int> rxPositions;
rxPositions.resize(receivers[0].size());
std::iota(begin(rxPositions), end(rxPositions), 0);
// multiply by 2 if receivers2 exists
size_t futureSize = dPositions.size() * rxPositions.size() *
(receivers2.size() > 0 ? 2 : 1);
std::vector<std::future<void>> futures;
futures.reserve(futureSize);
for (size_t i : dPositions) {
// each entry
for (size_t j : rxPositions) {
futures.push_back(std::async(std::launch::async, somefunc,
receivers[i][j].get(), Args...));
futures.push_back(std::async(std::launch::async, somefunc,
receivers2[i][j].get(), Args...));
}
}
for (auto &i : futures) {
i.get();
}
}
template <typename... CT>
void Parallel3(void (sls::Receiver::*somefunc)(CT...) const,
typename NonDeduced<CT>::type... Args) const {
if (receivers.size() == 0)
throw sls::RuntimeError("No receivers added");
std::vector<int> dPositions;
dPositions.resize(receivers.size());
std::iota(begin(dPositions), end(dPositions), 0);
std::vector<int> rxPositions;
rxPositions.resize(receivers[0].size());
std::iota(begin(rxPositions), end(rxPositions), 0);
// multiply by 2 if receivers2 exists
size_t futureSize = dPositions.size() * rxPositions.size() *
(receivers2.size() > 0 ? 2 : 1);
std::vector<std::future<void>> futures;
futures.reserve(futureSize);
for (size_t i : dPositions) {
// each entry
for (size_t j : rxPositions) {
futures.push_back(std::async(std::launch::async, somefunc,
receivers[i][j].get(), Args...));
futures.push_back(std::async(std::launch::async, somefunc,
receivers2[i][j].get(), Args...));
}
}
for (auto &i : futures) {
i.get();
}
}
/** set acquiring flag in shared memory */ /** set acquiring flag in shared memory */
void setAcquiringFlag(bool flag); void setAcquiringFlag(bool flag);

View File

@ -421,7 +421,6 @@ void Module::initializeDetectorStructure(detectorType type) {
(moduleId * ((shm()->myDetectorType == EIGER) ? 2 : 1)); (moduleId * ((shm()->myDetectorType == EIGER) ? 2 : 1));
shm()->zmqip = IpAddr{}; shm()->zmqip = IpAddr{};
shm()->numUDPInterfaces = 1; shm()->numUDPInterfaces = 1;
shm()->stoppedFlag = false;
// get the detector parameters based on type // get the detector parameters based on type
detParameters parameters{type}; detParameters parameters{type};
@ -985,28 +984,14 @@ void Module::prepareAcquisition() {
void Module::startAcquisition() { void Module::startAcquisition() {
LOG(logDEBUG1) << "Starting Acquisition"; LOG(logDEBUG1) << "Starting Acquisition";
shm()->stoppedFlag = false;
sendToDetector(F_START_ACQUISITION); sendToDetector(F_START_ACQUISITION);
LOG(logDEBUG1) << "Starting Acquisition successful"; LOG(logDEBUG1) << "Starting Acquisition successful";
} }
void Module::stopAcquisition() { void Module::stopAcquisition() {
// get status before stopping acquisition
runStatus s = ERROR, r = ERROR;
bool zmqstreaming = false;
if (shm()->useReceiver && getReceiverStreaming()) {
zmqstreaming = true;
s = getRunStatus();
r = getReceiverStatus();
}
LOG(logDEBUG1) << "Stopping Acquisition"; LOG(logDEBUG1) << "Stopping Acquisition";
sendToDetectorStop(F_STOP_ACQUISITION); sendToDetectorStop(F_STOP_ACQUISITION);
shm()->stoppedFlag = true;
LOG(logDEBUG1) << "Stopping Acquisition successful"; LOG(logDEBUG1) << "Stopping Acquisition successful";
// if rxr streaming and acquisition finished, restream dummy stop packet
if (zmqstreaming && (s == IDLE) && (r == IDLE)) {
restreamStopFromReceiver();
}
} }
void Module::sendSoftwareTrigger() { void Module::sendSoftwareTrigger() {
@ -1017,7 +1002,6 @@ void Module::sendSoftwareTrigger() {
void Module::startAndReadAll() { void Module::startAndReadAll() {
LOG(logDEBUG1) << "Starting and reading all frames"; LOG(logDEBUG1) << "Starting and reading all frames";
shm()->stoppedFlag = false;
sendToDetector(F_START_AND_READ_ALL); sendToDetector(F_START_AND_READ_ALL);
LOG(logDEBUG1) << "Detector successfully finished acquisition"; LOG(logDEBUG1) << "Detector successfully finished acquisition";
} }
@ -2995,32 +2979,6 @@ void Module::setPartialFramesPadding(bool padding) {
sendToReceiver(F_SET_RECEIVER_PADDING, arg, nullptr); sendToReceiver(F_SET_RECEIVER_PADDING, arg, nullptr);
} }
void Module::startReceiver() {
LOG(logDEBUG1) << "Starting Receiver";
shm()->stoppedFlag = false;
if (shm()->useReceiver) {
sendToReceiver(F_START_RECEIVER, nullptr, nullptr);
}
}
void Module::stopReceiver() {
LOG(logDEBUG1) << "Stopping Receiver";
if (shm()->useReceiver) {
int arg = static_cast<int>(shm()->stoppedFlag);
sendToReceiver(F_STOP_RECEIVER, arg, nullptr);
}
}
slsDetectorDefs::runStatus Module::getReceiverStatus() const {
runStatus retval = ERROR;
LOG(logDEBUG1) << "Getting Receiver Status";
if (shm()->useReceiver) {
sendToReceiver(F_GET_RECEIVER_STATUS, nullptr, retval);
LOG(logDEBUG1) << "Receiver Status: " << ToString(retval);
}
return retval;
}
int64_t Module::getFramesCaughtByReceiver() const { int64_t Module::getFramesCaughtByReceiver() const {
int64_t retval = -1; int64_t retval = -1;
LOG(logDEBUG1) << "Getting Frames Caught by Receiver"; LOG(logDEBUG1) << "Getting Frames Caught by Receiver";
@ -3068,15 +3026,6 @@ uint64_t Module::getReceiverCurrentFrameIndex() const {
return retval; return retval;
} }
int Module::getReceiverProgress() const {
int retval = -1;
if (shm()->useReceiver) {
sendToReceiver(F_GET_RECEIVER_PROGRESS, nullptr, retval);
LOG(logDEBUG1) << "Current Progress of Receiver: " << retval;
}
return retval;
}
void Module::setFileWrite(bool value) { void Module::setFileWrite(bool value) {
if (!shm()->useReceiver) { if (!shm()->useReceiver) {
throw RuntimeError("Set rx_hostname first to use receiver parameters (file write enable)"); throw RuntimeError("Set rx_hostname first to use receiver parameters (file write enable)");
@ -3208,13 +3157,6 @@ void Module::setReceiverSilentMode(bool enable) {
sendToReceiver(F_SET_RECEIVER_SILENT_MODE, arg, nullptr); sendToReceiver(F_SET_RECEIVER_SILENT_MODE, arg, nullptr);
} }
void Module::restreamStopFromReceiver() {
LOG(logDEBUG1) << "Restream stop dummy from Receiver via zmq";
if (shm()->useReceiver) {
sendToReceiver(F_RESTREAM_STOP_FROM_RECEIVER, nullptr, nullptr);
}
}
void Module::setPattern(const std::string &fname) { void Module::setPattern(const std::string &fname) {
uint64_t word; uint64_t word;
uint64_t addr = 0; uint64_t addr = 0;

View File

@ -84,9 +84,6 @@ struct sharedModule {
/** num udp interfaces */ /** num udp interfaces */
int numUDPInterfaces; int numUDPInterfaces;
/** stopped flag to inform rxr */
bool stoppedFlag;
}; };
class Module : public virtual slsDetectorDefs { class Module : public virtual slsDetectorDefs {
@ -1349,22 +1346,6 @@ class Module : public virtual slsDetectorDefs {
void setPartialFramesPadding(bool padding); void setPartialFramesPadding(bool padding);
/**
* Receiver starts listening to packets
*/
void startReceiver();
/**
* Stops the listening mode of receiver
*/
void stopReceiver();
/**
* Gets the status of the listening mode of receiver
* @returns status
*/
runStatus getReceiverStatus() const;
/** /**
* Gets the number of frames caught by receiver * Gets the number of frames caught by receiver
* @returns number of frames caught by receiver * @returns number of frames caught by receiver
@ -1379,8 +1360,6 @@ class Module : public virtual slsDetectorDefs {
* @returns current frame index of receiver * @returns current frame index of receiver
*/ */
uint64_t getReceiverCurrentFrameIndex() const; uint64_t getReceiverCurrentFrameIndex() const;
int getReceiverProgress() const;
void setFileWrite(bool value); void setFileWrite(bool value);
bool getFileWrite(); bool getFileWrite();
@ -1431,14 +1410,6 @@ class Module : public virtual slsDetectorDefs {
bool getReceiverSilentMode(); bool getReceiverSilentMode();
void setReceiverSilentMode(bool enable); void setReceiverSilentMode(bool enable);
/**
* If data streaming in receiver is enabled,
* restream the stop dummy packet from receiver
* Used usually for Moench,
* in case it is lost in network due to high data rate
*/
void restreamStopFromReceiver();
/** /**
* Opens pattern file and sends pattern to CTB * Opens pattern file and sends pattern to CTB
* @param fname pattern file to open * @param fname pattern file to open

View File

@ -2,6 +2,7 @@
#include "ClientSocket.h" #include "ClientSocket.h"
#include "string_utils.h" #include "string_utils.h"
#include "versionAPI.h" #include "versionAPI.h"
#include "ToString.h"
namespace sls { namespace sls {
@ -116,7 +117,7 @@ Receiver::Receiver(int detector_id, int module_id, int receiver_id,
shm()->shmversion = RECEIVER_SHMVERSION; shm()->shmversion = RECEIVER_SHMVERSION;
memset(shm()->hostname, 0, MAX_STR_LENGTH); memset(shm()->hostname, 0, MAX_STR_LENGTH);
shm()->tcpPort = DEFAULT_RX_PORTNO + receiver_id; shm()->tcpPort = DEFAULT_RX_PORTNO + receiver_id;
shm()->valid = false; shm()-> stoppedFlag = false;
shm()->zmqPort = DEFAULT_ZMQ_RX_PORTNO + receiver_id; shm()->zmqPort = DEFAULT_ZMQ_RX_PORTNO + receiver_id;
shm()->zmqIp = IpAddr{}; shm()->zmqIp = IpAddr{};
@ -174,7 +175,7 @@ int Receiver::getTCPPort() const {
void Receiver::setTCPPort(const int port) { void Receiver::setTCPPort(const int port) {
if (port >= 0 && port != shm()->tcpPort) { if (port >= 0 && port != shm()->tcpPort) {
if (shm()->valid) { if (strlen(shm()->hostname) != 0) {
// send to receiver to change tcpp port // send to receiver to change tcpp port
shm()->tcpPort = port; // for now shm()->tcpPort = port; // for now
} else { } else {
@ -184,10 +185,8 @@ void Receiver::setTCPPort(const int port) {
} }
void Receiver::configure() { void Receiver::configure() {
shm()->valid = false;
LOG(logINFOBLUE) << receiverId << " configured!"; LOG(logINFOBLUE) << receiverId << " configured!";
checkVersionCompatibility(); checkVersionCompatibility();
shm()->valid = true;
} }
void Receiver::checkVersionCompatibility() { void Receiver::checkVersionCompatibility() {
@ -198,4 +197,42 @@ void Receiver::checkVersionCompatibility() {
sendToReceiver(F_RECEIVER_CHECK_VERSION, arg, nullptr); sendToReceiver(F_RECEIVER_CHECK_VERSION, arg, nullptr);
} }
void Receiver::start() {
LOG(logDEBUG1) << "Starting Receiver";
shm()->stoppedFlag = false;
sendToReceiver(F_START_RECEIVER, nullptr, nullptr);
}
void Receiver::stop() {
LOG(logDEBUG1) << "Stopping Receiver";
int arg = static_cast<int>(shm()->stoppedFlag);
sendToReceiver(F_STOP_RECEIVER, arg, nullptr);
}
slsDetectorDefs::runStatus Receiver::getStatus() const {
runStatus retval = ERROR;
LOG(logDEBUG1) << "Getting Receiver Status";
sendToReceiver(F_GET_RECEIVER_STATUS, nullptr, retval);
LOG(logDEBUG1) << "Receiver Status: " << ToString(retval);
return retval;
}
int Receiver::getProgress() const {
int retval = -1;
sendToReceiver(F_GET_RECEIVER_PROGRESS, nullptr, retval);
LOG(logDEBUG1) << "Current Progress of Receiver: " << retval;
return retval;
}
void Receiver::setStoppedFlag() {
shm()->stoppedFlag = true;
}
void Receiver::restreamStop() {
LOG(logDEBUG1) << "Restream stop dummy from Receiver via zmq";
sendToReceiver(F_RESTREAM_STOP_FROM_RECEIVER, nullptr, nullptr);
}
} // namespace sls } // namespace sls

View File

@ -4,20 +4,18 @@
#include "sls_detector_defs.h" #include "sls_detector_defs.h"
#include "network_utils.h" #include "network_utils.h"
#define RECEIVER_SHMVERSION 0x200414 #define RECEIVER_SHMVERSION 0x200417
namespace sls { namespace sls {
struct sharedReceiver { struct sharedReceiver {
/* FIXED PATTERN FOR STATIC FUNCTIONS. DO NOT CHANGE, ONLY APPEND ------*/ /* FIXED PATTERN FOR STATIC FUNCTIONS. DO NOT CHANGE, ONLY APPEND ------*/
int shmversion; int shmversion;
char hostname[MAX_STR_LENGTH]; char hostname[MAX_STR_LENGTH];
int tcpPort; int tcpPort;
bool valid;
/** END OF FIXED PATTERN -----------------------------------------------*/ /** END OF FIXED PATTERN -----------------------------------------------*/
int stoppedFlag;
int zmqPort; int zmqPort;
sls::IpAddr zmqIp; sls::IpAddr zmqIp;
@ -49,8 +47,13 @@ namespace sls {
void setHostname(const std::string &hostname); void setHostname(const std::string &hostname);
int getTCPPort() const; int getTCPPort() const;
void setTCPPort(const int port); void setTCPPort(const int port);
void configure();
void checkVersionCompatibility(); void start();
void stop();
slsDetectorDefs::runStatus getStatus() const;
int getProgress() const;
void setStoppedFlag();
void restreamStop();
private: private:
/** /**
@ -97,7 +100,8 @@ namespace sls {
template <typename Ret, typename Arg> template <typename Ret, typename Arg>
Ret sendToReceiver(int fnum, const Arg &args) const; Ret sendToReceiver(int fnum, const Arg &args) const;
void configure();
void checkVersionCompatibility();
const int receiverId{0}; const int receiverId{0};
const int moduleId{0}; const int moduleId{0};
mutable sls::SharedMemory<sharedReceiver> shm{0, 0, 0, true}; mutable sls::SharedMemory<sharedReceiver> shm{0, 0, 0, true};