rxr: missing packets, stopacquistion lets rxr know to calculate missing packets from last frame caught

This commit is contained in:
2019-11-20 16:16:14 +01:00
parent f96f716f39
commit 398f3734ec
13 changed files with 114 additions and 17 deletions

View File

@ -162,8 +162,9 @@ int ClientInterface::function_table(){
flist[F_SET_RECEIVER_FILE_PATH] = &ClientInterface::set_file_dir;
flist[F_SET_RECEIVER_FILE_NAME] = &ClientInterface::set_file_name;
flist[F_SET_RECEIVER_FILE_INDEX] = &ClientInterface::set_file_index;
flist[F_GET_RECEIVER_FRAME_INDEX] = &ClientInterface::get_frame_index;
flist[F_GET_RECEIVER_FRAME_INDEX] = &ClientInterface::get_frame_index;
flist[F_GET_RECEIVER_FRAMES_CAUGHT] = &ClientInterface::get_frames_caught;
flist[F_GET_NUM_MISSING_PACKETS] = &ClientInterface::get_missing_packets;
flist[F_ENABLE_RECEIVER_FILE_WRITE] = &ClientInterface::enable_file_write;
flist[F_ENABLE_RECEIVER_MASTER_FILE_WRITE] = &ClientInterface::enable_master_file_write;
flist[F_ENABLE_RECEIVER_OVERWRITE] = &ClientInterface::enable_overwrite;
@ -657,8 +658,10 @@ int ClientInterface::start_receiver(Interface &socket) {
}
int ClientInterface::stop_receiver(Interface &socket) {
auto arg = socket.Receive<int>();
if (impl()->getStatus() == RUNNING) {
FILE_LOG(logDEBUG1) << "Stopping Receiver";
impl()->setStoppedFlag(static_cast<bool>(arg));
impl()->stopReceiver();
}
auto s = impl()->getStatus();
@ -726,6 +729,18 @@ int ClientInterface::get_frame_index(Interface &socket) {
return socket.sendResult(retval);
}
int ClientInterface::get_missing_packets(Interface &socket) {
std::vector<uint64_t> m = impl()->getNumMissingPackets();
FILE_LOG(logDEBUG1) << "missing packets:" << sls::ToString(m);
int retvalsize = m.size();
uint64_t retval[retvalsize];
std::copy(std::begin(m), std::end(m), retval);
socket.Send(OK);
socket.Send(&retvalsize, sizeof(retvalsize));
socket.Send(retval, sizeof(retval));
return OK;
}
int ClientInterface::get_frames_caught(Interface &socket) {
int64_t retval = impl()->getFramesCaught();
FILE_LOG(logDEBUG1) << "frames caught:" << retval;

View File

@ -103,6 +103,7 @@ void Implementation::InitializeMembers() {
dataStreamEnable = false;
streamingPort = 0;
streamingSrcIP = 0u;
stoppedFlag = false;
//** class objects ***
generalData = nullptr;
@ -249,6 +250,21 @@ uint64_t Implementation::getAcquisitionIndex() const {
return min;
}
std::vector<uint64_t> Implementation::getNumMissingPackets() const {
std::vector<uint64_t> mp(numThreads);
for (int i = 0; i < numThreads; i++) {
int np = generalData->packetsPerFrame;
uint64_t totnp = np;
// partial readout
if (numLinesReadout != MAX_EIGER_ROWS_PER_READOUT) {
totnp = ((numLinesReadout * np) / MAX_EIGER_ROWS_PER_READOUT);
}
totnp *= numberOfFrames;
mp[i] = listener[i]->GetNumMissingPacket(stoppedFlag, totnp);
}
return mp;
}
/***connection parameters***/
uint32_t Implementation::getUDPPortNumber() const {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
@ -1204,6 +1220,7 @@ void Implementation::setDetectorPositionId(const int id) {
int Implementation::startReceiver(std::string& err) {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
FILE_LOG(logINFO) << "Starting Receiver";
stoppedFlag = false;
ResetParametersforNewAcquisition();
// listener
@ -1247,6 +1264,10 @@ int Implementation::startReceiver(std::string& err) {
return OK;
}
void Implementation::setStoppedFlag(bool stopped) {
stoppedFlag = stopped;
}
void Implementation::stopReceiver() {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
FILE_LOG(logINFO) << "Stopping Receiver";
@ -1297,29 +1318,20 @@ void Implementation::stopReceiver() {
FILE_LOG(logINFO) << "Status: " << sls::ToString(status);
{ // statistics
std::vector<uint64_t> mp = getNumMissingPackets();
uint64_t tot = 0;
for (int i = 0; i < numThreads; i++) {
tot += dataProcessor[i]->GetNumFramesCaught();
int64_t missingpackets =
numberOfFrames * generalData->packetsPerFrame -
listener[i]->GetPacketsCaught();
// partial readout
if (numLinesReadout != MAX_EIGER_ROWS_PER_READOUT) {
int maxnp = generalData->packetsPerFrame;
int np = ((numLinesReadout * maxnp) / MAX_EIGER_ROWS_PER_READOUT);
missingpackets = numberOfFrames * np - listener[i]->GetPacketsCaught();
}
int nf = dataProcessor[i]->GetNumFramesCaught();
tot += nf;
TLogLevel lev =
(((int64_t)missingpackets) > 0) ? logINFORED : logINFOGREEN;
(((int64_t)mp[i]) > 0) ? logINFORED : logINFOGREEN;
FILE_LOG(lev) <<
// udp port number could be the second if selected interface is
// 2 for jungfrau
"Summary of Port " << udpPortNum[i]
<< "\n\tMissing Packets\t\t: " << missingpackets
<< "\n\tComplete Frames\t\t: "
<< dataProcessor[i]->GetNumFramesCaught()
<< "\n\tMissing Packets\t\t: " << mp[i]
<< "\n\tComplete Frames\t\t: " << nf
<< "\n\tLast Frame Caught\t: "
<< listener[i]->GetLastFrameIndexCaught();
}

View File

@ -88,6 +88,13 @@ uint64_t Listener::GetLastFrameIndexCaught() {
return lastCaughtFrameIndex;
}
uint64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) {
if (!stoppedFlag) {
return (numPackets - numPacketsCaught);
}
return (lastCaughtFrameIndex - firstIndex + 1) * generalData->packetsPerFrame - numPacketsCaught;
}
/** setters */
void Listener::StartRunning() {
runningFlag = true;