From 0a43e1104c85389e51cf2f85cf7f38ccd86bb7a3 Mon Sep 17 00:00:00 2001 From: Gasper Jansa Date: Mon, 3 Jan 2011 09:58:35 +0100 Subject: [PATCH] beaconEmitter and handler (not finished) --- pvAccessApp/Makefile | 9 +- pvAccessApp/remote/beaconEmitter.cpp | 147 +++++++++++++++++ pvAccessApp/remote/beaconEmitter.h | 153 ++++++++++++++++++ pvAccessApp/remote/beaconHandler.cpp | 109 +++++++++++++ pvAccessApp/remote/beaconHandler.h | 92 +++++++++++ .../remote/beaconServerStatusProvider.cpp | 50 ++++++ .../remote/beaconServerStatusProvider.h | 52 ++++++ testApp/remote/Makefile | 5 + testApp/remote/testBeaconEmitter.cpp | 71 ++++++++ testApp/remote/testBeaconHandler.cpp | 53 ++++++ testApp/utils/introspectionRegistryTest.cpp | 3 +- testApp/utils/transportRegistryTest.cpp | 10 ++ 12 files changed, 750 insertions(+), 4 deletions(-) create mode 100644 pvAccessApp/remote/beaconEmitter.cpp create mode 100644 pvAccessApp/remote/beaconEmitter.h create mode 100644 pvAccessApp/remote/beaconHandler.cpp create mode 100644 pvAccessApp/remote/beaconHandler.h create mode 100644 pvAccessApp/remote/beaconServerStatusProvider.cpp create mode 100644 pvAccessApp/remote/beaconServerStatusProvider.h create mode 100644 testApp/remote/testBeaconEmitter.cpp create mode 100644 testApp/remote/testBeaconHandler.cpp diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index 19a13a8..368e322 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -40,11 +40,14 @@ LIBSRCS += CreateRequestFactory.cpp SRC_DIRS += $(PVACCESS)/remote INC += remote.h INC += blockingUDP.h -INC += blockingTCP.h +INC += beaconEmitter.h +INC += beaconServerStatusProvider.h +INC += beaconHandler.h LIBSRCS += blockingUDPTransport.cpp LIBSRCS += blockingUDPConnector.cpp -LIBSRCS += blockingTCPTransport.cpp - +LIBSRCS += beaconEmitter.cpp +LIBSRCS += beaconServerStatusProvider.cpp +LIBSRCS += beaconHandler.cpp LIBRARY = pvAccess pvAccess_LIBS += Com diff --git a/pvAccessApp/remote/beaconEmitter.cpp b/pvAccessApp/remote/beaconEmitter.cpp new file mode 100644 index 0000000..8b3c1e5 --- /dev/null +++ b/pvAccessApp/remote/beaconEmitter.cpp @@ -0,0 +1,147 @@ +/* + * beaconEmitter.cpp + */ + +#include "beaconEmitter.h" + +using namespace std; + +namespace epics { namespace pvAccess { + +const float BeaconEmitter::EPICS_CA_MIN_BEACON_PERIOD = 1.0; +const float BeaconEmitter::EPICS_CA_MIN_BEACON_COUNT_LIMIT = 3.0; + +BeaconEmitter::BeaconEmitter(Transport* transport, ServerContext* context): _transport(transport) +{ + if(transport == NULL || context == NULL) + { + throw EpicsException("null transport or context"); + } + +/* _timer = context->getTimer(); + _logger = context->getLogger(); + _beaconSequenceID = 0; + _serverAddress = context->getServerInetAddres(); + _serverPort = context->getServerPort(); + _serverStatusProvider = context->getBeaconServerStatusProvider(); + _fastBeaconPeriod = std::max(context->getBeaconPeriod(), EPICS_CA_MIN_BEACON_PERIOD); + _slowBeaconPeriod = std::max(180.0, _fastBeaconPeriod); // TODO configurable + _beaconCountLimit = (int16)std::max(10, EPICS_CA_MIN_BEACON_COUNT_LIMIT); // TODO configurable + _startupTime = TimeStampFactory.create(System.currentTimeMillis()); + _timerNode = TimerFactory.createNode(this);*/ +} + +BeaconEmitter::BeaconEmitter(Transport* transport,const osiSockAddr* serverAddress): _transport(transport) +{ + if(transport == NULL) + { + throw EpicsException("null transport"); + } + + _timer = new Timer("pvAccess-server timer", lowPriority); + //_logger = new Loger(); + _beaconSequenceID = 0; + _serverAddress = serverAddress; + _serverPort = serverAddress->ia.sin_port; + _serverStatusProvider = NULL;//new BeaconServerStatusProvider(); + _fastBeaconPeriod = EPICS_CA_MIN_BEACON_PERIOD; + _slowBeaconPeriod = 180.0; + _beaconCountLimit = 10; + _startupTime = new TimeStamp(); + _timerNode = new TimerNode(this); +} + +BeaconEmitter::~BeaconEmitter() +{ + if(_timer) delete _timer; + if(_serverStatusProvider) delete _serverStatusProvider; + if(_startupTime) delete _startupTime; + if(_timerNode) delete _timerNode; +} + +void BeaconEmitter::lock() +{ + //noop +} + +void BeaconEmitter::unlock() +{ + //noop +} + +void BeaconEmitter::send(ByteBuffer* buffer, TransportSendControl* control) +{ + // get server status + PVFieldPtr serverStatus = NULL; + if(_serverStatusProvider != NULL) + { + try + { + serverStatus = _serverStatusProvider->getServerStatusData(); + } + catch (...) { + // we have to proctect internal code from external implementation... + //logger->log(Level.WARNING, "BeaconServerStatusProvider implementation thrown an exception.", th); + } + } + + // send beacon + control->startMessage((int8)0, (sizeof(int16)+2*sizeof(int32)+128+sizeof(int16))/sizeof(int8)); + + buffer->putShort(_beaconSequenceID); + buffer->putInt((int32)_startupTime->getSecondsPastEpoch()); + buffer->putInt((int32)_startupTime->getNanoSeconds()); + + // NOTE: is it possible (very likely) that address is any local address ::ffff:0.0.0.0 + encodeAsIPv6Address(buffer, _serverAddress); + buffer->putShort((int16)_serverPort); + + if (serverStatus != NULL) + { + // introspection interface + data + IntrospectionRegistry::serializeFull(serverStatus->getField(), buffer, control); + serverStatus->serialize(buffer, control); + } + else + { + IntrospectionRegistry::serializeFull(NULL, buffer, control); + } + control->flush(true); + + // increment beacon sequence ID + _beaconSequenceID++; + + reschedule(); +} + +void BeaconEmitter::timerStopped() +{ + //noop +} + +void BeaconEmitter::destroy() +{ + _timerNode->cancel(); +} + +void BeaconEmitter::start() +{ + _timer->scheduleAfterDelay(_timerNode, 0.0); +} + +void BeaconEmitter::reschedule() +{ + const double period = (_beaconSequenceID >= _beaconCountLimit) ? _slowBeaconPeriod : _fastBeaconPeriod; + if (period > 0) + { + _timer->scheduleAfterDelay(_timerNode, period); + } +} + +void BeaconEmitter::callback() +{ + _transport->enqueueSendRequest(this); +} + +}} + diff --git a/pvAccessApp/remote/beaconEmitter.h b/pvAccessApp/remote/beaconEmitter.h new file mode 100644 index 0000000..a3789c4 --- /dev/null +++ b/pvAccessApp/remote/beaconEmitter.h @@ -0,0 +1,153 @@ +/* + * beaconEmitter.h + */ + +#ifndef BEACONEMITTER_H +#define BEACONEMITTER_H + +#include "timer.h" +#include "remote.h" +#include "beaconServerStatusProvider.h" +#include "inetAddressUtil.h" +#include "introspectionRegistry.h" + +#include +#include + +#include +#include + +using namespace epics::pvData; + +namespace epics { namespace pvAccess { + + class ServerContext; + class Logger; + + /** + * BeaconEmitter + * + * @author gjansa + */ + class BeaconEmitter: public TransportSender, public TimerCallback + { + public: + /** + * Constructor. + * @param transport transport to be used to send beacons. + * @param context CA context. + */ + BeaconEmitter(Transport* transport, ServerContext* context); + /** + * Test Constructor (ohne context) + * @param transport transport to be used to send beacons. + */ + BeaconEmitter(Transport* transport,const osiSockAddr* serverAddress); + virtual ~BeaconEmitter(); + + /* + * @see TransportSender#lock() + */ + void lock(); + /* + * @see TransportSender#unlock() + */ + void unlock(); + + void send(ByteBuffer* buffer, TransportSendControl* control); + /** + * noop + */ + void timerStopped(); + /** + * noop + */ + void destroy(); + /** + * Start emitting. + */ + void start(); + /** + * Reschedule timer. + */ + void reschedule(); + /** + * Timer callback. + */ + void callback(); + + private: + /** + * Minimal (initial) CA beacon period (in seconds). + */ + static const float EPICS_CA_MIN_BEACON_PERIOD; + + /** + * Minimal CA beacon count limit. + */ + static const float EPICS_CA_MIN_BEACON_COUNT_LIMIT; + + /** + * Timer. + */ + Timer* _timer; + + /** + * Logger. + */ + Logger* _logger; + + /** + * Transport. + */ + Transport* _transport; + + /** + * Beacon sequence ID. + */ + int16 _beaconSequenceID; + + /** + * Startup timestamp (when clients detect a change, they will consider server restarted). + */ + TimeStamp* _startupTime; + + /** + * Fast (at startup) beacon period (in sec). + */ + double _fastBeaconPeriod; + + /** + * Slow (after beaconCountLimit is reached) beacon period (in sec). + */ + double _slowBeaconPeriod; + + /** + * Limit on number of beacons issued. + */ + int16 _beaconCountLimit; + + /** + * Server address. + */ + const osiSockAddr* _serverAddress; + + /** + * Server port. + */ + int32 _serverPort; + + /** + * Server status provider implementation (optional). + */ + BeaconServerStatusProvider* _serverStatusProvider; + + /** + * Timer task node. + */ + TimerNode* _timerNode; + }; + +}} + +#endif /* INTROSPECTIONREGISTRY_H */ diff --git a/pvAccessApp/remote/beaconHandler.cpp b/pvAccessApp/remote/beaconHandler.cpp new file mode 100644 index 0000000..88a4392 --- /dev/null +++ b/pvAccessApp/remote/beaconHandler.cpp @@ -0,0 +1,109 @@ +/* + * beaconHandler.cpp + */ + +#include "beaconHandler.h" + +using namespace std; + +namespace epics { namespace pvAccess { + +BeaconHandler::BeaconHandler(const ClientContextImpl* context, const osiSockAddr* responseFrom): _context(context), _responseFrom(responseFrom), _mutex(Mutex()) +{ + +} + +BeaconHandler::BeaconHandler(const osiSockAddr* responseFrom): _responseFrom(responseFrom), _mutex(Mutex()) +{ + +} + +BeaconHandler::~BeaconHandler() +{ + +} + +void BeaconHandler::beaconNotify(osiSockAddr* from, int8 remoteTransportRevision, + int64 timestamp, TimeStamp* startupTime, int32 sequentalID, + PVFieldPtr data) +{ + bool networkChanged = updateBeacon(remoteTransportRevision, timestamp, startupTime, sequentalID); + if(networkChanged) + { + changedTransport(); + } +} + +bool BeaconHandler::updateBeacon(int8 remoteTransportRevision, int64 timestamp, + TimeStamp* startupTime, int32 sequentalID) +{ + Lock guard(&_mutex); + // first beacon notification check + if (_serverStartupTime == NULL) + { + _serverStartupTime = startupTime; + + // new server up.. + //TODO + //_context->beaconAnomalyNotify(); + + // notify corresponding transport(s) + beaconArrivalNotify(); + + return false; + } + + bool networkChange = !(*_serverStartupTime == *startupTime); + if (networkChange) + { + //TODO + //_context->beaconAnomalyNotify(); + } + else + { + beaconArrivalNotify(); + } + + return networkChange; +} + +void BeaconHandler::beaconArrivalNotify() +{ + int32 size; + //TODO TCP name must be get from somewhere not hardcoded + //TODO + Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size); + if (transports == NULL) + { + return; + } + + // notify all + for (int i = 0; i < size; i++) + { + transports[i]->aliveNotification(); + } + delete transports; +} + +void BeaconHandler::changedTransport() +{ + int32 size; + //TODO TCP name must be get from somewhere not hardcoded + //TODO + Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size); + if (transports == NULL) + { + return; + } + + // notify all + for (int i = 0; i < size; i++) + { + transports[i]->changedTransport(); + } + delete transports; +} + +}} + diff --git a/pvAccessApp/remote/beaconHandler.h b/pvAccessApp/remote/beaconHandler.h new file mode 100644 index 0000000..73fa8fa --- /dev/null +++ b/pvAccessApp/remote/beaconHandler.h @@ -0,0 +1,92 @@ +/* + * beaconHandler.h + */ + +#ifndef BEACONHANDLER_H +#define BEACONHANDLER_H + +#include "remote.h" +#include "inetAddressUtil.h" +#include "pvAccess.h" + +#include +#include +#include + +#include + +using namespace epics::pvData; + +namespace epics { namespace pvAccess { + //TODO delete this + class ClientContextImpl; + /** + * BeaconHandler + */ + class BeaconHandler + { + public: + /** + * Constructor. + * @param transport transport to be used to send beacons. + * @param context CA context. + */ + BeaconHandler(const ClientContextImpl* context, const osiSockAddr* responseFrom); + /** + * Test Constructor (for testing) + * @param transport transport to be used to send beacons. + */ + BeaconHandler(const osiSockAddr* responseFrom); + virtual ~BeaconHandler(); + /** + * Update beacon period and do analitical checks (server restared, routing problems, etc.) + * @param from who is notifying. + * @param remoteTransportRevision encoded (major, minor) revision. + * @param timestamp time when beacon was received. + * @param startupTime server (reported) startup time. + * @param sequentalID sequential ID. + * @param data server status data, can be NULL. + */ + void beaconNotify(osiSockAddr* from, int8 remoteTransportRevision, + int64 timestamp, TimeStamp* startupTime, int32 sequentalID, + PVFieldPtr data); + private: + /** + * Context instance. + */ + const ClientContextImpl* _context; + /** + * Remote address. + */ + const osiSockAddr* _responseFrom; + /** + * Server startup timestamp. + */ + TimeStamp* _serverStartupTime; + /** + * Mutex + */ + Mutex _mutex; + + /** + * Update beacon. + * @param remoteTransportRevision encoded (major, minor) revision. + * @param timestamp time when beacon was received. + * @param sequentalID sequential ID. + * @return network change (server restarted) detected. + */ + bool updateBeacon(int8 remoteTransportRevision, int64 timestamp, + TimeStamp* startupTime, int32 sequentalID); + /** + * Notify transport about beacon arrival. + */ + void beaconArrivalNotify(); + /** + * Changed transport (server restarted) notify. + */ + void changedTransport(); + }; + +}} + +#endif /* INTROSPECTIONREGISTRY_H */ diff --git a/pvAccessApp/remote/beaconServerStatusProvider.cpp b/pvAccessApp/remote/beaconServerStatusProvider.cpp new file mode 100644 index 0000000..dac0026 --- /dev/null +++ b/pvAccessApp/remote/beaconServerStatusProvider.cpp @@ -0,0 +1,50 @@ +/* + * beaconServerStatusProvider.cpp + */ + +#include "beaconServerStatusProvider.h" + +namespace epics { namespace pvAccess { + +BeaconServerStatusProvider::BeaconServerStatusProvider( ServerContext* context): _context(context) +{ + if(context == NULL) + { + throw EpicsException("null context"); + } + initialize(); +} + +BeaconServerStatusProvider::BeaconServerStatusProvider() +{ + initialize(); +} + +BeaconServerStatusProvider::~BeaconServerStatusProvider() +{ +} + +void BeaconServerStatusProvider::initialize() +{ + PVDataCreate* pvDataCreate = getPVDataCreate(); + FieldCreate* fieldCreate = getFieldCreate(); + FieldConstPtrArray fields = new FieldConstPtr[6]; + // TODO hierarchy can be used... + fields[0] = fieldCreate->createScalar("connections",pvInt); + fields[1] = fieldCreate->createScalar("allocatedMemory",pvLong); + fields[2] = fieldCreate->createScalar("freeMemory",pvLong); + fields[3] = fieldCreate->createScalar("threads",pvInt); + fields[4] = fieldCreate->createScalar("deadlocks",pvInt); + fields[5] = fieldCreate->createScalar("averageSystemLoad",pvDouble); + + _status = pvDataCreate->createPVStructure(NULL,"status",6,fields); +} + +PVFieldPtr BeaconServerStatusProvider::getServerStatusData() +{ + //TODO implement + return static_cast(_status); +} + +}} + diff --git a/pvAccessApp/remote/beaconServerStatusProvider.h b/pvAccessApp/remote/beaconServerStatusProvider.h new file mode 100644 index 0000000..e6711da --- /dev/null +++ b/pvAccessApp/remote/beaconServerStatusProvider.h @@ -0,0 +1,52 @@ +/* + * beaconServerStatusProvider.h + */ + +#ifndef BEACONSERVERSTATUSPROVIDER_H +#define BEACONSERVERSTATUSPROVIDER_H + +#include "pvData.h" + +using namespace epics::pvData; + +namespace epics { namespace pvAccess { + + class ServerContext; + /** + * BeaconServerStatusProvider + */ + class BeaconServerStatusProvider + { + public: + /** + * Constructor. + * @param context CA context. + */ + BeaconServerStatusProvider(ServerContext* context); + /** + * Test Constructor (ohne context) + */ + BeaconServerStatusProvider(); + /** + * Destructor. + */ + virtual ~BeaconServerStatusProvider(); + /** + * Gets server status data. + */ + PVFieldPtr getServerStatusData(); + private: + /** + * Initialize + */ + void initialize(); + + + private: + PVStructurePtr _status; + ServerContext* _context; + }; + +}} + +#endif /* INTROSPECTIONREGISTRY_H */ diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 6721b4e..2228d16 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -14,6 +14,11 @@ PROD_HOST += testRemoteClientImpl testRemoteClientImpl_SRCS += testRemoteClientImpl.cpp testRemoteClientImpl_LIBS += pvData pvAccess Com +PROD_HOST += testBeaconEmitter +testBeaconEmitter_SRCS += testBeaconEmitter.cpp +testBeaconEmitter_LIBS += pvData pvAccess Com + + include $(TOP)/configure/RULES #---------------------------------------- # ADD RULES AFTER THIS LINE diff --git a/testApp/remote/testBeaconEmitter.cpp b/testApp/remote/testBeaconEmitter.cpp new file mode 100644 index 0000000..ac1ec33 --- /dev/null +++ b/testApp/remote/testBeaconEmitter.cpp @@ -0,0 +1,71 @@ +/* + * testBeaconEmitter.cpp + */ + +#include "remote.h" +#include "blockingUDP.h" +#include "beaconEmitter.h" +#include "inetAddressUtil.h" + +#include + +#include +#include + +using namespace epics::pvAccess; +using namespace epics::pvData; + +class DummyResponseHandler : public ResponseHandler +{ +public: + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, int payloadSize, + ByteBuffer* payloadBuffer) + { + cout << "DummyResponseHandler::handleResponse" << endl; + } +}; + + +void testBeaconEmitter() +{ + DummyResponseHandler drh; +/* SOCKET mysocket; + if ((mysocket = socket (AF_INET, SOCK_DGRAM, 0)) == -1) + { + assert(false); + } + InetAddrVector* broadcastAddresses = getBroadcastAddresses(mysocket);*/ + + + InetAddrVector* broadcastAddresses = new InetAddrVector; + osiSockAddr* addr = new osiSockAddr; + addr->ia.sin_family = AF_INET; + addr->ia.sin_port = htons(5067); + if(inet_aton("92.50.75.255",&addr->ia.sin_addr)==0) { + cout<<"error in inet_aton()"<push_back(addr); + BlockingUDPConnector connector(true, broadcastAddresses, true); + + osiSockAddr bindAddr; + bindAddr.ia.sin_family = AF_INET; + bindAddr.ia.sin_port = htons(5066); + bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); + Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50); + + cout<<"Sending beacons"<getRemoteAddress()); + beaconEmitter.start(); + + while(1) sleep(1); + + delete transport; +} + +int main(int argc, char *argv[]) +{ + testBeaconEmitter(); + return (0); +} diff --git a/testApp/remote/testBeaconHandler.cpp b/testApp/remote/testBeaconHandler.cpp new file mode 100644 index 0000000..435acac --- /dev/null +++ b/testApp/remote/testBeaconHandler.cpp @@ -0,0 +1,53 @@ +/* + * testBeaconEmitter.cpp + */ + +#include "remote.h" +#include "blockingUDP.h" +#include "beaconHandler.h" +#include "inetAddressUtil.h" + +#include + +#include +#include + +using namespace epics::pvAccess; +using namespace epics::pvData; + +class BeaconResponseHandler : public ResponseHandler +{ +public: + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, int payloadSize, + ByteBuffer* payloadBuffer) + { + cout << "DummyResponseHandler::handleResponse" << endl; + } +}; + + +void testBeaconHandler() +{ + BeacondResponseHandler brh; + BlockingUDPConnector connector(false, NULL, true); + DummyClientContext context; + + osiSockAddr bindAddr; + bindAddr.ia.sin_family = AF_INET; + bindAddr.ia.sin_port = htons(5067); + bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); + Transport* transport = connector.connect(NULL, &brh, &bindAddr, 1, 50); + + ((BlockingUDPTransport*)transport)->start(); + + while(1) sleep(1); + + delete transport; +} + +int main(int argc, char *argv[]) +{ + testBeaconHandler(); + return (0); +} diff --git a/testApp/utils/introspectionRegistryTest.cpp b/testApp/utils/introspectionRegistryTest.cpp index bdc9e86..ccef093 100644 --- a/testApp/utils/introspectionRegistryTest.cpp +++ b/testApp/utils/introspectionRegistryTest.cpp @@ -412,7 +412,8 @@ void testSerializeStatus() //TODO why are in and out on the same address? } -int main(int argc, char *argv[]) { +int main(int argc, char *argv[]) +{ pvDataCreate = getPVDataCreate(); statusCreate = getStatusCreate(); fieldCreate = getFieldCreate(); diff --git a/testApp/utils/transportRegistryTest.cpp b/testApp/utils/transportRegistryTest.cpp index 4d3f004..872c42f 100644 --- a/testApp/utils/transportRegistryTest.cpp +++ b/testApp/utils/transportRegistryTest.cpp @@ -118,6 +118,16 @@ int main(int argc, char *argv[]) } assert(registry->numberOfActiveTransports() == 0); + for(int32 i = 0; i < address_max; i++) + { + for(int16 j = 0; j < priority_max; j++) + { + registry->put(static_cast(transportArrayIn[i * priority_max + j]));; + } + } + assert(registry->numberOfActiveTransports() == (priority_max * address_max)); + registry->clear(); + assert(registry->numberOfActiveTransports() == 0); for(int32 i = 0; i < address_max; i++) {