From 398f3734eca97d8a797317cf5f43f0d5297ea1de Mon Sep 17 00:00:00 2001 From: Dhanya Thattil Date: Wed, 20 Nov 2019 16:16:14 +0100 Subject: [PATCH] rxr: missing packets, stopacquistion lets rxr know to calculate missing packets from last frame caught --- slsDetectorSoftware/include/CmdProxy.h | 4 ++ slsDetectorSoftware/include/Detector.h | 2 + slsDetectorSoftware/include/slsDetector.h | 6 +++ slsDetectorSoftware/src/Detector.cpp | 4 ++ slsDetectorSoftware/src/slsDetector.cpp | 34 ++++++++++++++- .../tests/test-multiSlsDetectorClient.cpp | 6 +++ slsReceiverSoftware/include/ClientInterface.h | 1 + slsReceiverSoftware/include/Implementation.h | 3 ++ slsReceiverSoftware/include/Listener.h | 3 ++ slsReceiverSoftware/src/ClientInterface.cpp | 17 +++++++- slsReceiverSoftware/src/Implementation.cpp | 42 ++++++++++++------- slsReceiverSoftware/src/Listener.cpp | 7 ++++ slsSupportLib/include/sls_detector_funcs.h | 2 + 13 files changed, 114 insertions(+), 17 deletions(-) diff --git a/slsDetectorSoftware/include/CmdProxy.h b/slsDetectorSoftware/include/CmdProxy.h index dba132f93..ff56339c4 100644 --- a/slsDetectorSoftware/include/CmdProxy.h +++ b/slsDetectorSoftware/include/CmdProxy.h @@ -679,6 +679,7 @@ class CmdProxy { {"rx_status", &CmdProxy::rx_status}, {"status", &CmdProxy::status}, {"rx_framescaught", &CmdProxy::rx_framescaught}, + {"rx_missingpackets", &CmdProxy::rx_missingpackets}, {"startingfnum", &CmdProxy::startingfnum}, {"trigger", &CmdProxy::trigger}, @@ -1288,6 +1289,9 @@ class CmdProxy { GET_COMMAND(rx_framescaught, getFramesCaught, "\n\tNumber of frames caught by receiver."); + GET_COMMAND(rx_missingpackets, getNumMissingPackets, + "\n\tNumber of missing packets for each port in receiver."); + INTEGER_COMMAND(startingfnum, getStartingFrameNumber, setStartingFrameNumber, std::stoull, "[n_value]\n\t[Eiger[Jungfrau] Starting frame number for next acquisition."); diff --git a/slsDetectorSoftware/include/Detector.h b/slsDetectorSoftware/include/Detector.h index 6512b825e..dbb077d9e 100644 --- a/slsDetectorSoftware/include/Detector.h +++ b/slsDetectorSoftware/include/Detector.h @@ -296,6 +296,8 @@ class Detector { Result getFramesCaught(Positions pos = {}) const; + Result> getNumMissingPackets(Positions pos = {}) const; + /** [Eiger][Jungfrau] */ Result getStartingFrameNumber(Positions pos = {}) const; diff --git a/slsDetectorSoftware/include/slsDetector.h b/slsDetectorSoftware/include/slsDetector.h index 5d91b4a79..ad9d00d5d 100755 --- a/slsDetectorSoftware/include/slsDetector.h +++ b/slsDetectorSoftware/include/slsDetector.h @@ -178,6 +178,9 @@ struct sharedSlsDetector { /** num udp interfaces */ int numUDPInterfaces; + + /** stopped flag to inform rxr */ + bool stoppedFlag; }; class slsDetector : public virtual slsDetectorDefs { @@ -1611,6 +1614,9 @@ class slsDetector : public virtual slsDetectorDefs { */ int64_t getFramesCaughtByReceiver() const; + /** Gets number of missing packets */ + std::vector getNumMissingPackets() const; + /** * Gets the current frame index of receiver * @returns current frame index of receiver diff --git a/slsDetectorSoftware/src/Detector.cpp b/slsDetectorSoftware/src/Detector.cpp index da1206050..22562b36f 100644 --- a/slsDetectorSoftware/src/Detector.cpp +++ b/slsDetectorSoftware/src/Detector.cpp @@ -405,6 +405,10 @@ Result Detector::getFramesCaught(Positions pos) const { return pimpl->Parallel(&slsDetector::getFramesCaughtByReceiver, pos); } +Result> Detector::getNumMissingPackets(Positions pos) const { + return pimpl->Parallel(&slsDetector::getNumMissingPackets, pos); +} + Result Detector::getStartingFrameNumber(Positions pos) const { return pimpl->Parallel(&slsDetector::getStartingFrameNumber, pos); } diff --git a/slsDetectorSoftware/src/slsDetector.cpp b/slsDetectorSoftware/src/slsDetector.cpp index 97dfb8ef1..41cdbd032 100755 --- a/slsDetectorSoftware/src/slsDetector.cpp +++ b/slsDetectorSoftware/src/slsDetector.cpp @@ -385,6 +385,7 @@ void slsDetector::initializeDetectorStructure(detectorType type) { shm()->rxFileOverWrite = true; shm()->rxDbitOffset = 0; shm()->numUDPInterfaces = 1; + shm()->stoppedFlag = false; // get the detector parameters based on type detParameters parameters{type}; @@ -1117,6 +1118,7 @@ void slsDetector::prepareAcquisition() { void slsDetector::startAcquisition() { FILE_LOG(logDEBUG1) << "Starting Acquisition"; + shm()->stoppedFlag = false; sendToDetector(F_START_ACQUISITION); FILE_LOG(logDEBUG1) << "Starting Acquisition successful"; } @@ -1130,6 +1132,7 @@ void slsDetector::stopAcquisition() { } FILE_LOG(logDEBUG1) << "Stopping Acquisition"; sendToDetectorStop(F_STOP_ACQUISITION); + shm()->stoppedFlag = true; FILE_LOG(logDEBUG1) << "Stopping Acquisition successful"; // if rxr streaming and acquisition finished, restream dummy stop packet if ((shm()->rxUpstream) && (s == IDLE) && (r == IDLE)) { @@ -1145,6 +1148,7 @@ void slsDetector::sendSoftwareTrigger() { void slsDetector::startAndReadAll() { FILE_LOG(logDEBUG1) << "Starting and reading all frames"; + shm()->stoppedFlag = false; sendToDetector(F_START_AND_READ_ALL); FILE_LOG(logDEBUG1) << "Detector successfully finished acquisition"; } @@ -3435,7 +3439,8 @@ void slsDetector::startReceiver() { void slsDetector::stopReceiver() { FILE_LOG(logDEBUG1) << "Stopping Receiver"; if (shm()->useReceiverFlag) { - sendToReceiver(F_STOP_RECEIVER); + int arg = static_cast(shm()->stoppedFlag); + sendToReceiver(F_STOP_RECEIVER, arg, nullptr); } } @@ -3459,6 +3464,33 @@ int64_t slsDetector::getFramesCaughtByReceiver() const { return retval; } +std::vector slsDetector::getNumMissingPackets() const { + FILE_LOG(logDEBUG1) << "Getting num missing packets"; + if (shm()->useReceiverFlag) { + int fnum = F_GET_NUM_MISSING_PACKETS; + int ret = FAIL; + auto client = ReceiverSocket(shm()->rxHostname, shm()->rxTCPPort); + client.Send(&fnum, sizeof(fnum)); + client.Receive(&ret, sizeof(ret)); + if (ret == FAIL) { + char mess[MAX_STR_LENGTH]{}; + client.Receive(mess, MAX_STR_LENGTH); + throw RuntimeError("Receiver " + std::to_string(detId) + + " returned error: " + std::string(mess)); + } else { + int nports = -1; + client.Receive(&nports, sizeof(nports)); + uint64_t mp[nports]; + memset(mp, 0, sizeof(mp)); + client.Receive(mp, sizeof(mp)); + std::vector retval(mp, mp + nports); + FILE_LOG(logDEBUG1) << "Missing packets of Receiver" << detId << ": " << sls::ToString(retval); + return retval; + } + } + throw RuntimeError("No receiver to get missing packets."); +} + uint64_t slsDetector::getReceiverCurrentFrameIndex() const { uint64_t retval = -1; FILE_LOG(logDEBUG1) << "Getting Current Frame Index of Receiver"; diff --git a/slsDetectorSoftware/tests/test-multiSlsDetectorClient.cpp b/slsDetectorSoftware/tests/test-multiSlsDetectorClient.cpp index 139c38066..f2236876a 100644 --- a/slsDetectorSoftware/tests/test-multiSlsDetectorClient.cpp +++ b/slsDetectorSoftware/tests/test-multiSlsDetectorClient.cpp @@ -9,6 +9,12 @@ auto GET = slsDetectorDefs::GET_ACTION; auto PUT = slsDetectorDefs::PUT_ACTION; + + +TEST_CASE("rx_missingpackets", "[.cmd]") { + REQUIRE_NOTHROW(multiSlsDetectorClient("rx_missingpackets", GET)); +} + TEST_CASE("burstmode", "[.cmd][.gotthard2]") { if (test::type == slsDetectorDefs::GOTTHARD2) { REQUIRE_NOTHROW(multiSlsDetectorClient("burstmode 0", PUT)); diff --git a/slsReceiverSoftware/include/ClientInterface.h b/slsReceiverSoftware/include/ClientInterface.h index 39ace4847..09a3e38f2 100755 --- a/slsReceiverSoftware/include/ClientInterface.h +++ b/slsReceiverSoftware/include/ClientInterface.h @@ -77,6 +77,7 @@ class ClientInterface : private virtual slsDetectorDefs { int set_file_name(sls::ServerInterface2 &socket); int set_file_index(sls::ServerInterface2 &socket); int get_frame_index(sls::ServerInterface2 &socket); + int get_missing_packets(sls::ServerInterface2 &socket); int get_frames_caught(sls::ServerInterface2 &socket); int enable_file_write(sls::ServerInterface2 &socket); int enable_master_file_write(sls::ServerInterface2 &socket); diff --git a/slsReceiverSoftware/include/Implementation.h b/slsReceiverSoftware/include/Implementation.h index 8a485ebbc..f08c90c03 100755 --- a/slsReceiverSoftware/include/Implementation.h +++ b/slsReceiverSoftware/include/Implementation.h @@ -58,6 +58,7 @@ class Implementation : private virtual slsDetectorDefs { //***acquisition count parameters*** uint64_t getFramesCaught() const; uint64_t getAcquisitionIndex() const; + std::vector getNumMissingPackets() const; //***connection parameters*** uint32_t getUDPPortNumber() const; @@ -198,6 +199,7 @@ class Implementation : private virtual slsDetectorDefs { //***acquisition functions*** int startReceiver(std::string& err); + void setStoppedFlag(bool stopped); void stopReceiver(); void startReadout(); void shutDownUDPSockets(); @@ -291,6 +293,7 @@ class Implementation : private virtual slsDetectorDefs { uint32_t streamingPort; sls::IpAddr streamingSrcIP; std::string additionalJsonHeader; + bool stoppedFlag; //** class objects *** GeneralData *generalData; diff --git a/slsReceiverSoftware/include/Listener.h b/slsReceiverSoftware/include/Listener.h index 3ac61a0ee..67371fc1d 100755 --- a/slsReceiverSoftware/include/Listener.h +++ b/slsReceiverSoftware/include/Listener.h @@ -70,6 +70,9 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { */ uint64_t GetLastFrameIndexCaught(); + /** Get number of missing packets */ + uint64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets); + //*** setters *** /** diff --git a/slsReceiverSoftware/src/ClientInterface.cpp b/slsReceiverSoftware/src/ClientInterface.cpp index 890f3d2b5..03629ce24 100755 --- a/slsReceiverSoftware/src/ClientInterface.cpp +++ b/slsReceiverSoftware/src/ClientInterface.cpp @@ -162,8 +162,9 @@ int ClientInterface::function_table(){ flist[F_SET_RECEIVER_FILE_PATH] = &ClientInterface::set_file_dir; flist[F_SET_RECEIVER_FILE_NAME] = &ClientInterface::set_file_name; flist[F_SET_RECEIVER_FILE_INDEX] = &ClientInterface::set_file_index; - flist[F_GET_RECEIVER_FRAME_INDEX] = &ClientInterface::get_frame_index; + flist[F_GET_RECEIVER_FRAME_INDEX] = &ClientInterface::get_frame_index; flist[F_GET_RECEIVER_FRAMES_CAUGHT] = &ClientInterface::get_frames_caught; + flist[F_GET_NUM_MISSING_PACKETS] = &ClientInterface::get_missing_packets; flist[F_ENABLE_RECEIVER_FILE_WRITE] = &ClientInterface::enable_file_write; flist[F_ENABLE_RECEIVER_MASTER_FILE_WRITE] = &ClientInterface::enable_master_file_write; flist[F_ENABLE_RECEIVER_OVERWRITE] = &ClientInterface::enable_overwrite; @@ -657,8 +658,10 @@ int ClientInterface::start_receiver(Interface &socket) { } int ClientInterface::stop_receiver(Interface &socket) { + auto arg = socket.Receive(); if (impl()->getStatus() == RUNNING) { FILE_LOG(logDEBUG1) << "Stopping Receiver"; + impl()->setStoppedFlag(static_cast(arg)); impl()->stopReceiver(); } auto s = impl()->getStatus(); @@ -726,6 +729,18 @@ int ClientInterface::get_frame_index(Interface &socket) { return socket.sendResult(retval); } +int ClientInterface::get_missing_packets(Interface &socket) { + std::vector m = impl()->getNumMissingPackets(); + FILE_LOG(logDEBUG1) << "missing packets:" << sls::ToString(m); + int retvalsize = m.size(); + uint64_t retval[retvalsize]; + std::copy(std::begin(m), std::end(m), retval); + socket.Send(OK); + socket.Send(&retvalsize, sizeof(retvalsize)); + socket.Send(retval, sizeof(retval)); + return OK; +} + int ClientInterface::get_frames_caught(Interface &socket) { int64_t retval = impl()->getFramesCaught(); FILE_LOG(logDEBUG1) << "frames caught:" << retval; diff --git a/slsReceiverSoftware/src/Implementation.cpp b/slsReceiverSoftware/src/Implementation.cpp index aaec89199..aa54c2397 100755 --- a/slsReceiverSoftware/src/Implementation.cpp +++ b/slsReceiverSoftware/src/Implementation.cpp @@ -103,6 +103,7 @@ void Implementation::InitializeMembers() { dataStreamEnable = false; streamingPort = 0; streamingSrcIP = 0u; + stoppedFlag = false; //** class objects *** generalData = nullptr; @@ -249,6 +250,21 @@ uint64_t Implementation::getAcquisitionIndex() const { return min; } +std::vector Implementation::getNumMissingPackets() const { + std::vector mp(numThreads); + for (int i = 0; i < numThreads; i++) { + int np = generalData->packetsPerFrame; + uint64_t totnp = np; + // partial readout + if (numLinesReadout != MAX_EIGER_ROWS_PER_READOUT) { + totnp = ((numLinesReadout * np) / MAX_EIGER_ROWS_PER_READOUT); + } + totnp *= numberOfFrames; + mp[i] = listener[i]->GetNumMissingPacket(stoppedFlag, totnp); + } + return mp; +} + /***connection parameters***/ uint32_t Implementation::getUDPPortNumber() const { FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called"; @@ -1204,6 +1220,7 @@ void Implementation::setDetectorPositionId(const int id) { int Implementation::startReceiver(std::string& err) { FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called"; FILE_LOG(logINFO) << "Starting Receiver"; + stoppedFlag = false; ResetParametersforNewAcquisition(); // listener @@ -1247,6 +1264,10 @@ int Implementation::startReceiver(std::string& err) { return OK; } +void Implementation::setStoppedFlag(bool stopped) { + stoppedFlag = stopped; +} + void Implementation::stopReceiver() { FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called"; FILE_LOG(logINFO) << "Stopping Receiver"; @@ -1297,29 +1318,20 @@ void Implementation::stopReceiver() { FILE_LOG(logINFO) << "Status: " << sls::ToString(status); { // statistics + std::vector mp = getNumMissingPackets(); uint64_t tot = 0; for (int i = 0; i < numThreads; i++) { - tot += dataProcessor[i]->GetNumFramesCaught(); - int64_t missingpackets = - numberOfFrames * generalData->packetsPerFrame - - listener[i]->GetPacketsCaught(); - - // partial readout - if (numLinesReadout != MAX_EIGER_ROWS_PER_READOUT) { - int maxnp = generalData->packetsPerFrame; - int np = ((numLinesReadout * maxnp) / MAX_EIGER_ROWS_PER_READOUT); - missingpackets = numberOfFrames * np - listener[i]->GetPacketsCaught(); - } + int nf = dataProcessor[i]->GetNumFramesCaught(); + tot += nf; TLogLevel lev = - (((int64_t)missingpackets) > 0) ? logINFORED : logINFOGREEN; + (((int64_t)mp[i]) > 0) ? logINFORED : logINFOGREEN; FILE_LOG(lev) << // udp port number could be the second if selected interface is // 2 for jungfrau "Summary of Port " << udpPortNum[i] - << "\n\tMissing Packets\t\t: " << missingpackets - << "\n\tComplete Frames\t\t: " - << dataProcessor[i]->GetNumFramesCaught() + << "\n\tMissing Packets\t\t: " << mp[i] + << "\n\tComplete Frames\t\t: " << nf << "\n\tLast Frame Caught\t: " << listener[i]->GetLastFrameIndexCaught(); } diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index 2fd6eed83..8c0ae5764 100755 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -88,6 +88,13 @@ uint64_t Listener::GetLastFrameIndexCaught() { return lastCaughtFrameIndex; } +uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) { + if (!stoppedFlag) { + return (numPackets - numPacketsCaught); + } + return (lastCaughtFrameIndex - firstIndex + 1) * generalData->packetsPerFrame - numPacketsCaught; +} + /** setters */ void Listener::StartRunning() { runningFlag = true; diff --git a/slsSupportLib/include/sls_detector_funcs.h b/slsSupportLib/include/sls_detector_funcs.h index 7162fefa8..5f6d020e4 100755 --- a/slsSupportLib/include/sls_detector_funcs.h +++ b/slsSupportLib/include/sls_detector_funcs.h @@ -214,6 +214,7 @@ enum detFuncs{ F_SET_RECEIVER_FILE_INDEX, F_GET_RECEIVER_FRAME_INDEX, F_GET_RECEIVER_FRAMES_CAUGHT, + F_GET_NUM_MISSING_PACKETS, F_ENABLE_RECEIVER_FILE_WRITE, F_ENABLE_RECEIVER_MASTER_FILE_WRITE, F_ENABLE_RECEIVER_OVERWRITE, @@ -463,6 +464,7 @@ static const char* getFunctionNameFromEnum(enum detFuncs func) { case F_SET_RECEIVER_FILE_INDEX: return "F_SET_RECEIVER_FILE_INDEX"; case F_GET_RECEIVER_FRAME_INDEX: return "F_GET_RECEIVER_FRAME_INDEX"; case F_GET_RECEIVER_FRAMES_CAUGHT: return "F_GET_RECEIVER_FRAMES_CAUGHT"; + case F_GET_NUM_MISSING_PACKETS: return "F_GET_NUM_MISSING_PACKETS"; case F_ENABLE_RECEIVER_FILE_WRITE: return "F_ENABLE_RECEIVER_FILE_WRITE"; case F_ENABLE_RECEIVER_MASTER_FILE_WRITE: return "F_ENABLE_RECEIVER_MASTER_FILE_WRITE"; case F_ENABLE_RECEIVER_OVERWRITE: return "F_ENABLE_RECEIVER_OVERWRITE";