diff --git a/pvAccessApp/remote/beaconHandler.cpp b/pvAccessApp/remote/beaconHandler.cpp index 596ace2..47613ba 100644 --- a/pvAccessApp/remote/beaconHandler.cpp +++ b/pvAccessApp/remote/beaconHandler.cpp @@ -5,15 +5,18 @@ #include "beaconHandler.h" using namespace std; +using namespace epics::pvData; +using namespace epics::pvAccess; namespace epics { namespace pvAccess { -BeaconHandler::BeaconHandler(const ClientContextImpl* context, const osiSockAddr* responseFrom): _context(context), _responseFrom(responseFrom), _mutex(Mutex()) +BeaconHandler::BeaconHandler(ClientContextImpl* context, const osiSockAddr* responseFrom): _context(context), _responseFrom(*responseFrom), _mutex(Mutex()), _serverStartupTime(TimeStamp(0)) { } -BeaconHandler::BeaconHandler(const osiSockAddr* responseFrom): _responseFrom(responseFrom), _mutex(Mutex()) +BeaconHandler::BeaconHandler(const osiSockAddr* responseFrom): +_responseFrom(*responseFrom), _mutex(Mutex()), _serverStartupTime(TimeStamp(0)) { } @@ -24,7 +27,7 @@ BeaconHandler::~BeaconHandler() } void BeaconHandler::beaconNotify(osiSockAddr* from, int8 remoteTransportRevision, - int64 timestamp, TimeStamp* startupTime, int32 sequentalID, + TimeStamp* timestamp, TimeStamp* startupTime, int16 sequentalID, PVFieldPtr data) { bool networkChanged = updateBeacon(remoteTransportRevision, timestamp, startupTime, sequentalID); @@ -34,18 +37,17 @@ void BeaconHandler::beaconNotify(osiSockAddr* from, int8 remoteTransportRevision } } -bool BeaconHandler::updateBeacon(int8 remoteTransportRevision, int64 timestamp, - TimeStamp* startupTime, int32 sequentalID) +bool BeaconHandler::updateBeacon(int8 remoteTransportRevision, TimeStamp* timestamp, + TimeStamp* startupTime, int16 sequentalID) { Lock guard(&_mutex); // first beacon notification check - if (_serverStartupTime == NULL) + if (_serverStartupTime.getSecondsPastEpoch() == 0) { - _serverStartupTime = startupTime; + _serverStartupTime = *startupTime; // new server up.. - //TODO - //_context->beaconAnomalyNotify(); + _context->beaconAnomalyNotify(); // notify corresponding transport(s) beaconArrivalNotify(); @@ -53,11 +55,10 @@ bool BeaconHandler::updateBeacon(int8 remoteTransportRevision, int64 timestamp, return false; } - bool networkChange = !(*_serverStartupTime == *startupTime); + bool networkChange = !(_serverStartupTime == *startupTime); if (networkChange) { - //TODO - //_context->beaconAnomalyNotify(); + _context->beaconAnomalyNotify(); } else { @@ -71,8 +72,7 @@ void BeaconHandler::beaconArrivalNotify() { int32 size = 0; //TODO TCP name must be get from somewhere not hardcoded - //TODO - Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size); + Transport** transports = _context->getTransportRegistry()->get("TCP", &_responseFrom, size); if (transports == NULL) { return; @@ -90,8 +90,7 @@ void BeaconHandler::changedTransport() { int32 size = 0; //TODO TCP name must be get from somewhere not hardcoded - //TODO - Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size); + Transport** transports = _context->getTransportRegistry()->get("TCP", &_responseFrom, size); if (transports == NULL) { return; diff --git a/pvAccessApp/remote/beaconHandler.h b/pvAccessApp/remote/beaconHandler.h index ed9b93a..88664d6 100644 --- a/pvAccessApp/remote/beaconHandler.h +++ b/pvAccessApp/remote/beaconHandler.h @@ -6,21 +6,15 @@ #define BEACONHANDLER_H #include "remote.h" -#include "inetAddressUtil.h" -#include "pvAccess.h" +#include +#include "clientContextImpl.h" #include #include #include -#include - -using namespace epics::pvData; - namespace epics { namespace pvAccess { - class ClientContextImpl; - /** * BeaconHandler */ @@ -32,7 +26,7 @@ namespace epics { namespace pvAccess { * @param transport transport to be used to send beacons. * @param context CA context. */ - BeaconHandler(const ClientContextImpl* context, const osiSockAddr* responseFrom); + BeaconHandler(ClientContextImpl* context, const osiSockAddr* responseFrom); /** * Test Constructor (for testing) * @param transport transport to be used to send beacons. @@ -48,26 +42,29 @@ namespace epics { namespace pvAccess { * @param sequentalID sequential ID. * @param data server status data, can be NULL. */ - void beaconNotify(osiSockAddr* from, int8 remoteTransportRevision, - int64 timestamp, TimeStamp* startupTime, int32 sequentalID, - PVFieldPtr data); + void beaconNotify(osiSockAddr* from, + epics::pvData::int8 remoteTransportRevision, + epics::pvData::TimeStamp* timestamp, + epics::pvData::TimeStamp* startupTime, + epics::pvData::int16 sequentalID, + epics::pvData::PVFieldPtr data); private: /** * Context instance. */ - const ClientContextImpl* _context; + ClientContextImpl* _context; /** * Remote address. */ - const osiSockAddr* _responseFrom; - /** - * Server startup timestamp. - */ - TimeStamp* _serverStartupTime; + const osiSockAddr _responseFrom; /** * Mutex */ - Mutex _mutex; + epics::pvData::Mutex _mutex; + /** + * Server startup timestamp. + */ + epics::pvData::TimeStamp _serverStartupTime; /** * Update beacon. * @param remoteTransportRevision encoded (major, minor) revision. @@ -75,8 +72,10 @@ namespace epics { namespace pvAccess { * @param sequentalID sequential ID. * @return network change (server restarted) detected. */ - bool updateBeacon(int8 remoteTransportRevision, int64 timestamp, - TimeStamp* startupTime, int32 sequentalID); + bool updateBeacon(epics::pvData::int8 remoteTransportRevision, + epics::pvData::TimeStamp* timestamp, + epics::pvData::TimeStamp* startupTime, + epics::pvData::int16 sequentalID); /** * Notify transport about beacon arrival. */ diff --git a/pvAccessApp/remoteClient/clientContextImpl.h b/pvAccessApp/remoteClient/clientContextImpl.h index 439c84c..7724ffa 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.h +++ b/pvAccessApp/remoteClient/clientContextImpl.h @@ -14,7 +14,9 @@ namespace epics { namespace pvAccess { - + + class BeaconHandler; + class ChannelImpl : public Channel , public TransportClient, @@ -48,7 +50,10 @@ namespace epics { virtual Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int16 minorRevision, int16 priority) = 0; - + + virtual void beaconAnomalyNotify() = 0; + + virtual BeaconHandler* getBeaconHandler(osiSockAddr* responseFrom) = 0; }; diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 640935d..665d1e0 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -22,7 +22,7 @@ #include #include #include - +#include #include using namespace epics::pvData; @@ -544,6 +544,74 @@ typedef std::map IOIDResponseRequestMap; }; + class BeaconResponseHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { + public: + BeaconResponseHandler(ClientContextImpl* context) : + AbstractClientResponseHandler(context, "Beacon") + { + } + + virtual ~BeaconResponseHandler() { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) + { + // reception timestamp + TimeStamp timestamp; + timestamp.getCurrent(); + + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + + transport->ensureData((2*sizeof(int16)+2*sizeof(int32)+128)/sizeof(int8)); + + int16 sequentalID = payloadBuffer->getShort(); + TimeStamp startupTimestamp(payloadBuffer->getInt(),payloadBuffer->getInt()); + + osiSockAddr serverAddress; + serverAddress.ia.sin_family = AF_INET; + + // 128-bit IPv6 address + /* + int8* byteAddress = new int8[16]; + for (int i = 0; i < 16; i++) + byteAddress[i] = payloadBuffer->getByte(); }; + */ + + // IPv4 compatible IPv6 address expected + // first 80-bit are 0 + if (payloadBuffer->getLong() != 0) return; + if (payloadBuffer->getShort() != 0) return; + if (payloadBuffer->getShort() != (int16)0xFFFF) return; + + // accept given address if explicitly specified by sender + serverAddress.ia.sin_addr.s_addr = htonl(payloadBuffer->getInt()); + if (serverAddress.ia.sin_addr.s_addr == INADDR_ANY) + serverAddress.ia.sin_addr = responseFrom->ia.sin_addr; + + serverAddress.ia.sin_port = htons(payloadBuffer->getShort()); + + BeaconHandler* beaconHandler = _context->getBeaconHandler(responseFrom); + // currently we care only for servers used by this context + if (beaconHandler == NULL) + return; + + // extra data + PVFieldPtr data = NULL; + const FieldConstPtr field = IntrospectionRegistry::deserializeFull(payloadBuffer, transport); + if (field != NULL) + { + data = getPVDataCreate()->createPVField(NULL, field); + data->deserialize(payloadBuffer, transport); + } + + // notify beacon handler + beaconHandler->beaconNotify(responseFrom, version, ×tamp, &startupTimestamp, sequentalID, data); + + } + }; + class ConnectionValidationHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { public: ConnectionValidationHandler(ClientContextImpl* context) : @@ -1854,13 +1922,9 @@ class TestChannelImpl : public ChannelImpl { */ ResponseRequest* getResponseRequest(pvAccessID ioid) { - /* - synchronized (pendingResponseRequests) - { - return (ResponseRequest)pendingResponseRequests.get(ioid); - } - */ - return 0; + Lock guard(&m_ioidMapMutex); + IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid); + return (it == m_pendingResponseRequests.end() ? 0 : it->second); } /** @@ -1870,15 +1934,10 @@ class TestChannelImpl : public ChannelImpl { */ pvAccessID registerResponseRequest(ResponseRequest* request) { - /* - synchronized (pendingResponseRequests) - { - int ioid = generateIOID(); - pendingResponseRequests.put(ioid, request); - return ioid; - } - */ - return 0; + Lock guard(&m_ioidMapMutex); + pvAccessID ioid = generateIOID(); + m_pendingResponseRequests[ioid] = request; + return ioid; } /** @@ -1888,13 +1947,14 @@ class TestChannelImpl : public ChannelImpl { */ ResponseRequest* unregisterResponseRequest(ResponseRequest* request) { - /* - synchronized (pendingResponseRequests) - { - return (ResponseRequest)pendingResponseRequests.remove(request.getIOID()); - } - */ - return 0; + Lock guard(&m_ioidMapMutex); + IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(request->getIOID()); + if (it == m_pendingResponseRequests.end()) + return 0; + + ResponseRequest* retVal = it->second; + m_pendingResponseRequests.erase(it); + return retVal; } /** @@ -1902,15 +1962,42 @@ class TestChannelImpl : public ChannelImpl { * @return IOID. */ pvAccessID generateIOID() + { + Lock guard(&m_ioidMapMutex); + + + // search first free (theoretically possible loop of death) + while (m_pendingResponseRequests.find(++m_lastIOID) != m_pendingResponseRequests.end()); + // reserve IOID + m_pendingResponseRequests[m_lastIOID] = 0; + return m_lastIOID; + } + + /** + * Called each time beacon anomaly is detected. + */ + void beaconAnomalyNotify() + { + if (m_channelSearchManager) + m_channelSearchManager->beaconAnomalyNotify(); + } + + /** + * Get (and if necessary create) beacon handler. + * @param responseFrom remote source address of received beacon. + * @return beacon handler for particular server. + */ + BeaconHandler* getBeaconHandler(osiSockAddr* responseFrom) { /* - synchronized (pendingResponseRequests) - { - // search first free (theoretically possible loop of death) - while (pendingResponseRequests.get(++lastIOID) != null || lastIOID == CAConstants.CAJ_INVALID_IOID); - // reserve IOID - pendingResponseRequests.put(lastIOID, null); - return lastIOID; + synchronized (beaconHandlers) { + BeaconHandlerImpl handler = beaconHandlers.get(responseFrom); + if (handler == null) + { + handler = new BeaconHandlerImpl(this, responseFrom); + beaconHandlers.put(responseFrom, handler); + } + return handler; } */ return 0; @@ -2105,6 +2192,11 @@ class TestChannelImpl : public ChannelImpl { typedef std::map IOIDResponseRequestMap; IOIDResponseRequestMap m_pendingResponseRequests; + /** + * IOIDResponseRequestMap mutex. + */ + Mutex m_ioidMapMutex; + /** * Last IOID cache. */