From 70bf6eb4cbb9ebac5fd6b39b1c3bcdd854c07664 Mon Sep 17 00:00:00 2001 From: Dhanya Maliakal Date: Thu, 30 Nov 2017 18:39:08 +0100 Subject: [PATCH] restream stop from receiver --- slsReceiverSoftware/include/DataStreamer.h | 6 +++ .../include/UDPBaseImplementation.h | 6 +++ slsReceiverSoftware/include/UDPInterface.h | 6 +++ .../include/UDPStandardImplementation.h | 6 +++ .../include/slsReceiverTCPIPInterface.h | 3 ++ .../include/sls_receiver_funcs.h | 1 + slsReceiverSoftware/src/DataStreamer.cpp | 12 +++++ .../src/UDPBaseImplementation.cpp | 5 +++ .../src/UDPStandardImplementation.cpp | 15 +++++++ .../src/slsReceiverTCPIPInterface.cpp | 45 +++++++++++++++++++ 10 files changed, 105 insertions(+) diff --git a/slsReceiverSoftware/include/DataStreamer.h b/slsReceiverSoftware/include/DataStreamer.h index cd30f68d3..cd316d55d 100644 --- a/slsReceiverSoftware/include/DataStreamer.h +++ b/slsReceiverSoftware/include/DataStreamer.h @@ -116,6 +116,12 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { */ void CloseZmqSocket(); + /** + * Restream stop dummy packet + * @return OK or FAIL + */ + int restreamStop(); + private: /** diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index 906e18fe4..1dcb8b66a 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -541,6 +541,12 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter */ void setStreamingPort(const uint32_t i); + /** + * Restream stop dummy packet from receiver + * @return OK or FAIL + */ + int restreamStop(); + //***callback functions*** /** * Call back for start acquisition diff --git a/slsReceiverSoftware/include/UDPInterface.h b/slsReceiverSoftware/include/UDPInterface.h index bea127706..3f8d1b45c 100644 --- a/slsReceiverSoftware/include/UDPInterface.h +++ b/slsReceiverSoftware/include/UDPInterface.h @@ -624,6 +624,12 @@ class UDPInterface { */ virtual void setStreamingPort(const uint32_t i) = 0; + /** + * Restream stop dummy packet from receiver + * @return OK or FAIL + */ + virtual int restreamStop() = 0; + //***callback functions*** /** diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index 43b4ff5e6..77357c733 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -170,6 +170,12 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase */ void closeFiles(); + /** + * Restream stop dummy packet from receiver + * @return OK or FAIL + */ + int restreamStop(); + private: diff --git a/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h b/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h index 8780ab1c9..db792c02e 100644 --- a/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h +++ b/slsReceiverSoftware/include/slsReceiverTCPIPInterface.h @@ -267,6 +267,9 @@ class slsReceiverTCPIPInterface : private virtual slsReceiverDefs { /** set silent mode */ int set_silent_mode(); + /** restream stop packet */ + int restream_stop(); + /** detector type */ diff --git a/slsReceiverSoftware/include/sls_receiver_funcs.h b/slsReceiverSoftware/include/sls_receiver_funcs.h index 5b4be98b6..c7cc0cc0f 100644 --- a/slsReceiverSoftware/include/sls_receiver_funcs.h +++ b/slsReceiverSoftware/include/sls_receiver_funcs.h @@ -63,6 +63,7 @@ enum recFuncs{ F_SET_RECEIVER_STREAMING_PORT, /** < sets the receiver streaming port */ F_SET_RECEIVER_SILENT_MODE, /** < sets the receiver silent mode */ + F_RESTREAM_STOP_FROM_RECEIVER, /** < restream stop from receiver */ /* Always append functions hereafter!!! */ diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index 5c0fa0aaf..8335cd52d 100644 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -280,3 +280,15 @@ int DataStreamer::SendHeader(sls_detector_header* header, bool dummy) { header->detType, header->version ); } + + + +int DataStreamer::restreamStop() { + //send dummy header + int ret = zmqSocket->SendHeaderData(index, true, SLS_DETECTOR_JSON_HEADER_VERSION); + if (!ret) { + FILE_LOG(logERROR) << "Could not Restream Dummy Header via ZMQ for port " << zmqSocket->GetPortNumber(); + return FAIL; + } + return OK; +} diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index 810e9c5ad..b922cffc4 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -569,6 +569,11 @@ void UDPBaseImplementation::setStreamingPort(const uint32_t i) { } +int UDPBaseImplementation::restreamStop() { + FILE_LOG(logWARNING) << __AT__ << " doing nothing..."; + FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes"; +} + /***callback functions***/ void UDPBaseImplementation::registerCallBackStartAcquisition(int (*func)(char*, char*, uint64_t, uint32_t, void*),void *arg){ startAcquisitionCallBack=func; diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 3d49d7edc..a09bca237 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -571,6 +571,21 @@ void UDPStandardImplementation::closeFiles() { } +int UDPStandardImplementation::restreamStop() { + bool ret = OK; + for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) { + if ((*it)->restreamStop() == FAIL) + ret = FAIL; + } + + // if fail, prints in datastreamer + if (ret == OK) { + FILE_LOG(logINFO) << "Restreaming Dummy Header via ZMQ successful"; + } + + return ret; +} + void UDPStandardImplementation::SetLocalNetworkParameters() { //to increase socket receiver buffer size and max length of input queue by changing kernel settings diff --git a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp index 563a7540f..48e64346c 100644 --- a/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp +++ b/slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp @@ -283,6 +283,7 @@ const char* slsReceiverTCPIPInterface::getFunctionName(enum recFuncs func) { case F_SEND_RECEIVER_MULTIDETSIZE: return "F_SEND_RECEIVER_MULTIDETSIZE"; case F_SET_RECEIVER_STREAMING_PORT: return "F_SET_RECEIVER_STREAMING_PORT"; case F_SET_RECEIVER_SILENT_MODE: return "F_SET_RECEIVER_SILENT_MODE"; + case F_RESTREAM_STOP_FROM_RECEIVER: return "F_RESTREAM_STOP_FROM_RECEIVER"; default: return "Unknown Function"; } } @@ -330,6 +331,7 @@ int slsReceiverTCPIPInterface::function_table(){ flist[F_SEND_RECEIVER_MULTIDETSIZE] = &slsReceiverTCPIPInterface::set_multi_detector_size; flist[F_SET_RECEIVER_STREAMING_PORT] = &slsReceiverTCPIPInterface::set_streaming_port; flist[F_SET_RECEIVER_SILENT_MODE] = &slsReceiverTCPIPInterface::set_silent_mode; + flist[F_RESTREAM_STOP_FROM_RECEIVER] = &slsReceiverTCPIPInterface::restream_stop; #ifdef VERYVERBOSE for (int i = 0; i < NUM_REC_FUNCTIONS ; i++) { FILE_LOG(logINFO) << "function fnum: " << i << " (" << getFunctionName((enum recFuncs)i) << ") located at " << (unsigned int)flist[i]; @@ -2425,3 +2427,46 @@ int slsReceiverTCPIPInterface::set_silent_mode() { // return ok/fail return ret; } + + + + +int slsReceiverTCPIPInterface::restream_stop(){ + ret = OK; + memset(mess, 0, sizeof(mess)); + + // execute action + // only a set, not a get +#ifdef SLS_RECEIVER_UDP_FUNCTIONS + if (receiverBase == NULL) + invalidReceiverObject(); + else if (mySock->differentClients && lockStatus) + receiverlocked(); + else if (receiverBase->getStatus() != IDLE) + receiverNotIdle(); + else if (receiverBase->getDataStreamEnable() == false) { + ret = FAIL; + sprintf(mess,"Could not restream stop packet as data Streaming is disabled.\n"); + FILE_LOG(logERROR) << "Warning: " << mess; + } else { + ret = receiverBase->restreamStop(); + if (ret == FAIL) { + sprintf(mess,"Could not restream stop packet.\n"); + FILE_LOG(logERROR) << "Warning: " << mess; + } + } +#endif + + if (ret == OK && mySock->differentClients) + ret = FORCE_UPDATE; + + // send answer + mySock->SendDataOnly(&ret,sizeof(ret)); + if (ret == FAIL) + mySock->SendDataOnly(mess,sizeof(mess)); + + // return ok/fail + return ret; + + +}