From d19cf15a0ec481e52433a9ad7a6980bb952a224c Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Mon, 9 Jun 2014 08:25:47 +0200 Subject: [PATCH] started protocol changes --- pvAccessApp/pva/pvaConstants.h | 2 +- pvAccessApp/remote/beaconHandler.cpp | 54 +++++++++++-------- pvAccessApp/remote/beaconHandler.h | 41 +++++++++----- pvAccessApp/remote/remote.h | 5 ++ .../remoteClient/clientContextImpl.cpp | 22 +++++--- pvAccessApp/remoteClient/clientContextImpl.h | 2 +- pvAccessApp/server/beaconEmitter.cpp | 32 ++++------- pvAccessApp/server/beaconEmitter.h | 22 ++++---- pvAccessApp/server/serverContext.cpp | 19 ++++++- pvAccessApp/server/serverContext.h | 19 +++++++ 10 files changed, 141 insertions(+), 77 deletions(-) diff --git a/pvAccessApp/pva/pvaConstants.h b/pvAccessApp/pva/pvaConstants.h index d95f088..2b95c24 100644 --- a/pvAccessApp/pva/pvaConstants.h +++ b/pvAccessApp/pva/pvaConstants.h @@ -26,7 +26,7 @@ namespace pvAccess { const epics::pvData::int8 PVA_MAGIC = static_cast(0xCA); /** PVA protocol revision (implemented by this library). */ - const epics::pvData::int8 PVA_PROTOCOL_REVISION = 0; + const epics::pvData::int8 PVA_PROTOCOL_REVISION = 1; /** PVA version signature used to report this implementation version in header. */ const epics::pvData::int8 PVA_VERSION = PVA_PROTOCOL_REVISION; diff --git a/pvAccessApp/remote/beaconHandler.cpp b/pvAccessApp/remote/beaconHandler.cpp index 5bcaacc..8fafdd8 100644 --- a/pvAccessApp/remote/beaconHandler.cpp +++ b/pvAccessApp/remote/beaconHandler.cpp @@ -15,45 +15,43 @@ namespace epics { namespace pvAccess { BeaconHandler::BeaconHandler(Context::shared_pointer const & context, + std::string const & protocol, const osiSockAddr* responseFrom) : _context(Context::weak_pointer(context)), + _protocol(protocol), _responseFrom(*responseFrom), _mutex(), - _serverStartupTime(0) -{ - -} - -BeaconHandler::BeaconHandler(const osiSockAddr* responseFrom) : - _responseFrom(*responseFrom), - _mutex(), - _serverStartupTime(0) + _serverGUID(), + _serverChangeCount(-1), + _first(true) { } BeaconHandler::~BeaconHandler() { - } void BeaconHandler::beaconNotify(osiSockAddr* /*from*/, int8 remoteTransportRevision, - TimeStamp* timestamp, TimeStamp* startupTime, int16 sequentalID, + TimeStamp* timestamp, GUID const & guid, int16 sequentalID, + int16 changeCount, PVFieldPtr /*data*/) { - bool networkChanged = updateBeacon(remoteTransportRevision, timestamp, startupTime, sequentalID); + bool networkChanged = updateBeacon(remoteTransportRevision, timestamp, guid, sequentalID, changeCount); if (networkChanged) changedTransport(); } bool BeaconHandler::updateBeacon(int8 /*remoteTransportRevision*/, TimeStamp* /*timestamp*/, - TimeStamp* startupTime, int16 /*sequentalID*/) + GUID const & guid, int16 /*sequentalID*/, int16 changeCount) { - Lock guard(_mutex); - // first beacon notification check - if (_serverStartupTime.getSecondsPastEpoch() == 0) - { - _serverStartupTime = *startupTime; + Lock guard(_mutex); + // first beacon notification check + if (_first) + { + _first = false; + _serverGUID = guid; + _serverChangeCount = changeCount; // new server up.. _context.lock()->newServerDetected(); @@ -61,25 +59,35 @@ bool BeaconHandler::updateBeacon(int8 /*remoteTransportRevision*/, TimeStamp* /* return false; } - bool networkChange = !(_serverStartupTime == *startupTime); + bool networkChange = (memcmp(_serverGUID.value, guid.value, sizeof(guid.value)) != 0); if (networkChange) { - // update startup time - _serverStartupTime = *startupTime; + // update startup time and change count + _serverGUID = guid; + _serverChangeCount = changeCount; _context.lock()->newServerDetected(); return true; } + else if (_serverChangeCount != changeCount) + { + // update change count + _serverChangeCount = changeCount; + + // TODO be more specific (possible optimizations) + _context.lock()->newServerDetected(); + + return true; + } return false; } void BeaconHandler::changedTransport() { - // TODO why only TCP, actually TCP does not need this auto_ptr transports = - _context.lock()->getTransportRegistry()->get("TCP", &_responseFrom); + _context.lock()->getTransportRegistry()->get(_protocol, &_responseFrom); if (!transports.get()) return; diff --git a/pvAccessApp/remote/beaconHandler.h b/pvAccessApp/remote/beaconHandler.h index d4fdc37..941114a 100644 --- a/pvAccessApp/remote/beaconHandler.h +++ b/pvAccessApp/remote/beaconHandler.h @@ -38,36 +38,37 @@ namespace pvAccess { /** * Constructor. - * @param transport transport to be used to send beacons. - * @param context PVA context. */ - BeaconHandler(Context::shared_pointer const & context, const osiSockAddr* responseFrom); - /** - * Test Constructor (for testing) - * @param transport transport to be used to send beacons. - */ - BeaconHandler(const osiSockAddr* responseFrom); + BeaconHandler(Context::shared_pointer const & context, std::string const & protocol, + const osiSockAddr* responseFrom); + virtual ~BeaconHandler(); + /** * Update beacon period and do analitical checks (server restared, routing problems, etc.) * @param from who is notifying. * @param remoteTransportRevision encoded (major, minor) revision. - * @param timestamp time when beacon was received. - * @param startupTime server (reported) startup time. + * @param guid server GUID. * @param sequentalID sequential ID. + * @param changeCount change count. * @param data server status data, can be NULL. */ void beaconNotify(osiSockAddr* from, epics::pvData::int8 remoteTransportRevision, epics::pvData::TimeStamp* timestamp, - epics::pvData::TimeStamp* startupTime, + GUID const &guid, epics::pvData::int16 sequentalID, + epics::pvData::int16 changeCount, epics::pvData::PVFieldPtr data); private: /** * Context instance. */ Context::weak_pointer _context; + /** + * The procotol (transport), "tcp" for pvAccess TCP/IP. + */ + std::string _protocol; /** * Remote address. */ @@ -76,21 +77,33 @@ namespace pvAccess { * Mutex */ epics::pvData::Mutex _mutex; + /** + * Server GUID. + */ + GUID _serverGUID; /** * Server startup timestamp. */ - epics::pvData::TimeStamp _serverStartupTime; + epics::pvData::int16 _serverChangeCount; + /** + * First beacon flag. + */ + bool _first; + /** * Update beacon. * @param remoteTransportRevision encoded (major, minor) revision. * @param timestamp time when beacon was received. + * @param guid server GUID. * @param sequentalID sequential ID. + * @param changeCount change count. * @return network change (server restarted) detected. */ bool updateBeacon(epics::pvData::int8 remoteTransportRevision, epics::pvData::TimeStamp* timestamp, - epics::pvData::TimeStamp* startupTime, - epics::pvData::int16 sequentalID); + GUID const &guid, + epics::pvData::int16 sequentalID, + epics::pvData::int16 changeCount); /** * Changed transport (server restarted) notify. */ diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 5a53b3a..af2e0ad 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -41,6 +41,11 @@ namespace epics { #define PVACCESS_REFCOUNT_MONITOR_DESTRUCT(name) class TransportRegistry; + + /** + * Globally unique ID. + */ + typedef struct { char value[12]; } GUID; enum QoS { /** diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index 80b601e..d778059 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -2640,10 +2640,13 @@ namespace epics { AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); - transport->ensureData((2*sizeof(int16)+2*sizeof(int32)+128)/sizeof(int8)); + transport->ensureData(12+2+2+16+2); + + GUID guid; + payloadBuffer->get(guid.value, 0, sizeof(guid.value)); int16 sequentalID = payloadBuffer->getShort(); - TimeStamp startupTimestamp(payloadBuffer->getLong(),payloadBuffer->getInt()); + int16 changeCount = payloadBuffer->getShort(); osiSockAddr serverAddress; serverAddress.ia.sin_family = AF_INET; @@ -2667,13 +2670,15 @@ namespace epics { serverAddress.ia.sin_addr = responseFrom->ia.sin_addr; serverAddress.ia.sin_port = htons(payloadBuffer->getShort()); + + std::string protocol = SerializeHelper::deserializeString(payloadBuffer, transport.get()); // TODO optimize ClientContextImpl::shared_pointer context = _context.lock(); if (!context) return; - std::tr1::shared_ptr beaconHandler = context->getBeaconHandler(responseFrom); + std::tr1::shared_ptr beaconHandler = context->getBeaconHandler(protocol, responseFrom); // currently we care only for servers used by this context if (beaconHandler == 0) return; @@ -2688,7 +2693,7 @@ namespace epics { } // notify beacon handler - beaconHandler->beaconNotify(responseFrom, version, ×tamp, &startupTimestamp, sequentalID, data); + beaconHandler->beaconNotify(responseFrom, version, ×tamp, guid, sequentalID, changeCount, data); } }; @@ -4342,17 +4347,22 @@ TODO /** * Get (and if necessary create) beacon handler. + * @param protocol the protocol. * @param responseFrom remote source address of received beacon. * @return beacon handler for particular server. */ - BeaconHandler::shared_pointer getBeaconHandler(osiSockAddr* responseFrom) + BeaconHandler::shared_pointer getBeaconHandler(std::string const & protocol, osiSockAddr* responseFrom) { + // TODO !!! protocol !!! + if (protocol != "tcp") + return BeaconHandler::shared_pointer(); + Lock guard(m_beaconMapMutex); AddressBeaconHandlerMap::iterator it = m_beaconHandlers.find(*responseFrom); BeaconHandler::shared_pointer handler; if (it == m_beaconHandlers.end()) { - handler.reset(new BeaconHandler(shared_from_this(), responseFrom)); + handler.reset(new BeaconHandler(shared_from_this(), protocol, responseFrom)); m_beaconHandlers[*responseFrom] = handler; } else diff --git a/pvAccessApp/remoteClient/clientContextImpl.h b/pvAccessApp/remoteClient/clientContextImpl.h index 3eb48d3..817aa92 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.h +++ b/pvAccessApp/remoteClient/clientContextImpl.h @@ -121,7 +121,7 @@ namespace epics { virtual void newServerDetected() = 0; - virtual std::tr1::shared_ptr getBeaconHandler(osiSockAddr* responseFrom) = 0; + virtual std::tr1::shared_ptr getBeaconHandler(std::string const & protocol, osiSockAddr* responseFrom) = 0; virtual void configure(epics::pvData::PVStructure::shared_pointer configuration) = 0; virtual void flush() = 0; diff --git a/pvAccessApp/server/beaconEmitter.cpp b/pvAccessApp/server/beaconEmitter.cpp index 7208ce8..2cfef19 100644 --- a/pvAccessApp/server/beaconEmitter.cpp +++ b/pvAccessApp/server/beaconEmitter.cpp @@ -25,10 +25,12 @@ const float BeaconEmitter::EPICS_PVA_MIN_BEACON_PERIOD = 1.0; const float BeaconEmitter::EPICS_PVA_MIN_BEACON_COUNT_LIMIT = 3.0; //BeaconEmitter::BeaconEmitter(Transport::shared_pointer const & transport, ServerContextImpl::shared_pointer const & context) : -BeaconEmitter::BeaconEmitter(Transport::shared_pointer const & transport, std::tr1::shared_ptr& context) : +BeaconEmitter::BeaconEmitter(std::string const & protocol, + Transport::shared_pointer const & transport, std::tr1::shared_ptr& context) : + _protocol(protocol), _transport(transport), _beaconSequenceID(0), - _startupTime(), + _guid(context->getGUID()), _fastBeaconPeriod(std::max(context->getBeaconPeriod(), EPICS_PVA_MIN_BEACON_PERIOD)), _slowBeaconPeriod(std::max(180.0, _fastBeaconPeriod)), // TODO configurable _beaconCountLimit((int16)std::max(10.0f, EPICS_PVA_MIN_BEACON_COUNT_LIMIT)), // TODO configurable @@ -37,22 +39,6 @@ BeaconEmitter::BeaconEmitter(Transport::shared_pointer const & transport, std::t _serverStatusProvider(context->getBeaconServerStatusProvider()), _timer(context->getTimer()) { - _startupTime.getCurrent(); -} - -BeaconEmitter::BeaconEmitter(Transport::shared_pointer const & transport, const osiSockAddr& serverAddress) : - _transport(transport), - _beaconSequenceID(0), - _startupTime(), - _fastBeaconPeriod(EPICS_PVA_MIN_BEACON_PERIOD), - _slowBeaconPeriod(180.0), - _beaconCountLimit(10), - _serverAddress(serverAddress), - _serverPort(serverAddress.ia.sin_port), - _serverStatusProvider(), - _timer(new Timer("pvAccess-server timer", lowPriority)) -{ - _startupTime.getCurrent(); } BeaconEmitter::~BeaconEmitter() @@ -89,16 +75,20 @@ void BeaconEmitter::send(ByteBuffer* buffer, TransportSendControl* control) } // send beacon - control->startMessage((int8)0, (sizeof(int16)+2*sizeof(int32)+128+sizeof(int16))/sizeof(int8)); + control->startMessage((int8)0, 12+2+2+16+2); + buffer->put(_guid.value, 0, sizeof(_guid.value)); buffer->putShort(_beaconSequenceID); - buffer->putLong((int64)_startupTime.getSecondsPastEpoch()); - buffer->putInt((int32)_startupTime.getNanoSeconds()); + + // TODO for now fixed changeCount + buffer->putShort(0); // NOTE: is it possible (very likely) that address is any local address ::ffff:0.0.0.0 encodeAsIPv6Address(buffer, &_serverAddress); buffer->putShort((int16)_serverPort); + SerializeHelper::serializeString(_protocol, buffer, control); + if (serverStatus) { // introspection interface + data diff --git a/pvAccessApp/server/beaconEmitter.h b/pvAccessApp/server/beaconEmitter.h index d23d93d..0c62c65 100644 --- a/pvAccessApp/server/beaconEmitter.h +++ b/pvAccessApp/server/beaconEmitter.h @@ -47,18 +47,15 @@ namespace epics { namespace pvAccess { /** * Constructor. + * @param protocol a protocol (transport) name to report. * @param transport transport to be used to send beacons. * @param context PVA context. */ -// BeaconEmitter(Transport::shared_pointer const & transport, ServerContextImpl::shared_pointer const & context); - BeaconEmitter(Transport::shared_pointer const & transport, std::tr1::shared_ptr& context); +// BeaconEmitter(std::sting const & protocol, +// Transport::shared_pointer const & transport, ServerContextImpl::shared_pointer const & context); + BeaconEmitter(std::string const & protocol, + Transport::shared_pointer const & transport, std::tr1::shared_ptr& context); - /** - * Test Constructor (ohne context) - * @param transport transport to be used to send beacons. - */ - BeaconEmitter(Transport::shared_pointer const & transport, const osiSockAddr& serverAddress); - virtual ~BeaconEmitter(); void lock(); @@ -97,6 +94,11 @@ namespace epics { namespace pvAccess { */ static const float EPICS_PVA_MIN_BEACON_COUNT_LIMIT; + /** + * Protocol. + */ + std::string _protocol; + /** * Transport. */ @@ -108,9 +110,9 @@ namespace epics { namespace pvAccess { epics::pvData::int16 _beaconSequenceID; /** - * Startup timestamp (when clients detect a change, they will consider server restarted). + * Server GUID. */ - epics::pvData::TimeStamp _startupTime; + GUID _guid; /** * Fast (at startup) beacon period (in sec). diff --git a/pvAccessApp/server/serverContext.cpp b/pvAccessApp/server/serverContext.cpp index 67995f0..b0bcc57 100644 --- a/pvAccessApp/server/serverContext.cpp +++ b/pvAccessApp/server/serverContext.cpp @@ -46,6 +46,7 @@ ServerContextImpl::ServerContextImpl(): epicsSignalInstallSigAlarmIgnore (); epicsSignalInstallSigPipeIgnore (); + generateGUID(); initializeLogger(); loadConfiguration(); } @@ -61,11 +62,26 @@ ServerContextImpl::~ServerContextImpl() dispose(); } +const GUID& ServerContextImpl::getGUID() +{ + return _guid; +} + const Version& ServerContextImpl::getVersion() { return ServerContextImpl::VERSION; } +void ServerContextImpl::generateGUID() +{ + epics::pvData::TimeStamp startupTime; + startupTime.getCurrent(); + + ByteBuffer buffer(_guid.value, sizeof(_guid.value)); + buffer.putLong(startupTime.getSecondsPastEpoch()); + buffer.putInt(startupTime.getNanoSeconds()); +} + void ServerContextImpl::initializeLogger() { //createFileLogger("serverContextImpl.log"); @@ -217,7 +233,8 @@ void ServerContextImpl::internalInitialize() // setup broadcast UDP transport initializeBroadcastTransport(); - _beaconEmitter.reset(new BeaconEmitter(_broadcastTransport, thisServerContext)); + // TODO introduce a constant + _beaconEmitter.reset(new BeaconEmitter("tcp", _broadcastTransport, thisServerContext)); } void ServerContextImpl::initializeBroadcastTransport() diff --git a/pvAccessApp/server/serverContext.h b/pvAccessApp/server/serverContext.h index bb9afd0..c7ad31a 100644 --- a/pvAccessApp/server/serverContext.h +++ b/pvAccessApp/server/serverContext.h @@ -35,6 +35,13 @@ public: * Destructor */ virtual ~ServerContext() {}; + + /** + * Returns GUID (12-byte array). + * @return GUID. + */ + virtual const GUID& getGUID() = 0; + /** * Get context implementation version. * @return version of the context implementation. @@ -115,6 +122,7 @@ public: virtual ~ServerContextImpl(); //**************** derived from ServerContext ****************// + const GUID& getGUID(); const Version& getVersion(); void initialize(ChannelProviderRegistry::shared_pointer const & channelProviderRegistry); void run(epics::pvData::int32 seconds); @@ -283,6 +291,12 @@ public: bool isChannelProviderNamePreconfigured(); private: + + /** + * Server GUID. + */ + GUID _guid; + /** * Initialization status. */ @@ -381,6 +395,11 @@ private: */ BeaconServerStatusProvider::shared_pointer _beaconServerStatusProvider; + /** + * Generate GUID. + */ + void generateGUID(); + /** * Initialize logger. */