beaconEmitter and handler (not finished)

This commit is contained in:
Gasper Jansa
2011-01-03 09:58:35 +01:00
parent 95148e2d50
commit 0a43e1104c
12 changed files with 750 additions and 4 deletions

View File

@@ -40,11 +40,14 @@ LIBSRCS += CreateRequestFactory.cpp
SRC_DIRS += $(PVACCESS)/remote
INC += remote.h
INC += blockingUDP.h
INC += blockingTCP.h
INC += beaconEmitter.h
INC += beaconServerStatusProvider.h
INC += beaconHandler.h
LIBSRCS += blockingUDPTransport.cpp
LIBSRCS += blockingUDPConnector.cpp
LIBSRCS += blockingTCPTransport.cpp
LIBSRCS += beaconEmitter.cpp
LIBSRCS += beaconServerStatusProvider.cpp
LIBSRCS += beaconHandler.cpp
LIBRARY = pvAccess
pvAccess_LIBS += Com

View File

@@ -0,0 +1,147 @@
/*
* beaconEmitter.cpp
*/
#include "beaconEmitter.h"
using namespace std;
namespace epics { namespace pvAccess {
const float BeaconEmitter::EPICS_CA_MIN_BEACON_PERIOD = 1.0;
const float BeaconEmitter::EPICS_CA_MIN_BEACON_COUNT_LIMIT = 3.0;
BeaconEmitter::BeaconEmitter(Transport* transport, ServerContext* context): _transport(transport)
{
if(transport == NULL || context == NULL)
{
throw EpicsException("null transport or context");
}
/* _timer = context->getTimer();
_logger = context->getLogger();
_beaconSequenceID = 0;
_serverAddress = context->getServerInetAddres();
_serverPort = context->getServerPort();
_serverStatusProvider = context->getBeaconServerStatusProvider();
_fastBeaconPeriod = std::max(context->getBeaconPeriod(), EPICS_CA_MIN_BEACON_PERIOD);
_slowBeaconPeriod = std::max(180.0, _fastBeaconPeriod); // TODO configurable
_beaconCountLimit = (int16)std::max(10, EPICS_CA_MIN_BEACON_COUNT_LIMIT); // TODO configurable
_startupTime = TimeStampFactory.create(System.currentTimeMillis());
_timerNode = TimerFactory.createNode(this);*/
}
BeaconEmitter::BeaconEmitter(Transport* transport,const osiSockAddr* serverAddress): _transport(transport)
{
if(transport == NULL)
{
throw EpicsException("null transport");
}
_timer = new Timer("pvAccess-server timer", lowPriority);
//_logger = new Loger();
_beaconSequenceID = 0;
_serverAddress = serverAddress;
_serverPort = serverAddress->ia.sin_port;
_serverStatusProvider = NULL;//new BeaconServerStatusProvider();
_fastBeaconPeriod = EPICS_CA_MIN_BEACON_PERIOD;
_slowBeaconPeriod = 180.0;
_beaconCountLimit = 10;
_startupTime = new TimeStamp();
_timerNode = new TimerNode(this);
}
BeaconEmitter::~BeaconEmitter()
{
if(_timer) delete _timer;
if(_serverStatusProvider) delete _serverStatusProvider;
if(_startupTime) delete _startupTime;
if(_timerNode) delete _timerNode;
}
void BeaconEmitter::lock()
{
//noop
}
void BeaconEmitter::unlock()
{
//noop
}
void BeaconEmitter::send(ByteBuffer* buffer, TransportSendControl* control)
{
// get server status
PVFieldPtr serverStatus = NULL;
if(_serverStatusProvider != NULL)
{
try
{
serverStatus = _serverStatusProvider->getServerStatusData();
}
catch (...) {
// we have to proctect internal code from external implementation...
//logger->log(Level.WARNING, "BeaconServerStatusProvider implementation thrown an exception.", th);
}
}
// send beacon
control->startMessage((int8)0, (sizeof(int16)+2*sizeof(int32)+128+sizeof(int16))/sizeof(int8));
buffer->putShort(_beaconSequenceID);
buffer->putInt((int32)_startupTime->getSecondsPastEpoch());
buffer->putInt((int32)_startupTime->getNanoSeconds());
// NOTE: is it possible (very likely) that address is any local address ::ffff:0.0.0.0
encodeAsIPv6Address(buffer, _serverAddress);
buffer->putShort((int16)_serverPort);
if (serverStatus != NULL)
{
// introspection interface + data
IntrospectionRegistry::serializeFull(serverStatus->getField(), buffer, control);
serverStatus->serialize(buffer, control);
}
else
{
IntrospectionRegistry::serializeFull(NULL, buffer, control);
}
control->flush(true);
// increment beacon sequence ID
_beaconSequenceID++;
reschedule();
}
void BeaconEmitter::timerStopped()
{
//noop
}
void BeaconEmitter::destroy()
{
_timerNode->cancel();
}
void BeaconEmitter::start()
{
_timer->scheduleAfterDelay(_timerNode, 0.0);
}
void BeaconEmitter::reschedule()
{
const double period = (_beaconSequenceID >= _beaconCountLimit) ? _slowBeaconPeriod : _fastBeaconPeriod;
if (period > 0)
{
_timer->scheduleAfterDelay(_timerNode, period);
}
}
void BeaconEmitter::callback()
{
_transport->enqueueSendRequest(this);
}
}}

View File

@@ -0,0 +1,153 @@
/*
* beaconEmitter.h
*/
#ifndef BEACONEMITTER_H
#define BEACONEMITTER_H
#include "timer.h"
#include "remote.h"
#include "beaconServerStatusProvider.h"
#include "inetAddressUtil.h"
#include "introspectionRegistry.h"
#include <timeStamp.h>
#include <osiSock.h>
#include <algorithm>
#include <iostream>
using namespace epics::pvData;
namespace epics { namespace pvAccess {
class ServerContext;
class Logger;
/**
* BeaconEmitter
*
* @author gjansa
*/
class BeaconEmitter: public TransportSender, public TimerCallback
{
public:
/**
* Constructor.
* @param transport transport to be used to send beacons.
* @param context CA context.
*/
BeaconEmitter(Transport* transport, ServerContext* context);
/**
* Test Constructor (ohne context)
* @param transport transport to be used to send beacons.
*/
BeaconEmitter(Transport* transport,const osiSockAddr* serverAddress);
virtual ~BeaconEmitter();
/*
* @see TransportSender#lock()
*/
void lock();
/*
* @see TransportSender#unlock()
*/
void unlock();
void send(ByteBuffer* buffer, TransportSendControl* control);
/**
* noop
*/
void timerStopped();
/**
* noop
*/
void destroy();
/**
* Start emitting.
*/
void start();
/**
* Reschedule timer.
*/
void reschedule();
/**
* Timer callback.
*/
void callback();
private:
/**
* Minimal (initial) CA beacon period (in seconds).
*/
static const float EPICS_CA_MIN_BEACON_PERIOD;
/**
* Minimal CA beacon count limit.
*/
static const float EPICS_CA_MIN_BEACON_COUNT_LIMIT;
/**
* Timer.
*/
Timer* _timer;
/**
* Logger.
*/
Logger* _logger;
/**
* Transport.
*/
Transport* _transport;
/**
* Beacon sequence ID.
*/
int16 _beaconSequenceID;
/**
* Startup timestamp (when clients detect a change, they will consider server restarted).
*/
TimeStamp* _startupTime;
/**
* Fast (at startup) beacon period (in sec).
*/
double _fastBeaconPeriod;
/**
* Slow (after beaconCountLimit is reached) beacon period (in sec).
*/
double _slowBeaconPeriod;
/**
* Limit on number of beacons issued.
*/
int16 _beaconCountLimit;
/**
* Server address.
*/
const osiSockAddr* _serverAddress;
/**
* Server port.
*/
int32 _serverPort;
/**
* Server status provider implementation (optional).
*/
BeaconServerStatusProvider* _serverStatusProvider;
/**
* Timer task node.
*/
TimerNode* _timerNode;
};
}}
#endif /* INTROSPECTIONREGISTRY_H */

View File

@@ -0,0 +1,109 @@
/*
* beaconHandler.cpp
*/
#include "beaconHandler.h"
using namespace std;
namespace epics { namespace pvAccess {
BeaconHandler::BeaconHandler(const ClientContextImpl* context, const osiSockAddr* responseFrom): _context(context), _responseFrom(responseFrom), _mutex(Mutex())
{
}
BeaconHandler::BeaconHandler(const osiSockAddr* responseFrom): _responseFrom(responseFrom), _mutex(Mutex())
{
}
BeaconHandler::~BeaconHandler()
{
}
void BeaconHandler::beaconNotify(osiSockAddr* from, int8 remoteTransportRevision,
int64 timestamp, TimeStamp* startupTime, int32 sequentalID,
PVFieldPtr data)
{
bool networkChanged = updateBeacon(remoteTransportRevision, timestamp, startupTime, sequentalID);
if(networkChanged)
{
changedTransport();
}
}
bool BeaconHandler::updateBeacon(int8 remoteTransportRevision, int64 timestamp,
TimeStamp* startupTime, int32 sequentalID)
{
Lock guard(&_mutex);
// first beacon notification check
if (_serverStartupTime == NULL)
{
_serverStartupTime = startupTime;
// new server up..
//TODO
//_context->beaconAnomalyNotify();
// notify corresponding transport(s)
beaconArrivalNotify();
return false;
}
bool networkChange = !(*_serverStartupTime == *startupTime);
if (networkChange)
{
//TODO
//_context->beaconAnomalyNotify();
}
else
{
beaconArrivalNotify();
}
return networkChange;
}
void BeaconHandler::beaconArrivalNotify()
{
int32 size;
//TODO TCP name must be get from somewhere not hardcoded
//TODO
Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size);
if (transports == NULL)
{
return;
}
// notify all
for (int i = 0; i < size; i++)
{
transports[i]->aliveNotification();
}
delete transports;
}
void BeaconHandler::changedTransport()
{
int32 size;
//TODO TCP name must be get from somewhere not hardcoded
//TODO
Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size);
if (transports == NULL)
{
return;
}
// notify all
for (int i = 0; i < size; i++)
{
transports[i]->changedTransport();
}
delete transports;
}
}}

View File

@@ -0,0 +1,92 @@
/*
* beaconHandler.h
*/
#ifndef BEACONHANDLER_H
#define BEACONHANDLER_H
#include "remote.h"
#include "inetAddressUtil.h"
#include "pvAccess.h"
#include <timeStamp.h>
#include <osiSock.h>
#include <lock.h>
#include <iostream>
using namespace epics::pvData;
namespace epics { namespace pvAccess {
//TODO delete this
class ClientContextImpl;
/**
* BeaconHandler
*/
class BeaconHandler
{
public:
/**
* Constructor.
* @param transport transport to be used to send beacons.
* @param context CA context.
*/
BeaconHandler(const ClientContextImpl* context, const osiSockAddr* responseFrom);
/**
* Test Constructor (for testing)
* @param transport transport to be used to send beacons.
*/
BeaconHandler(const osiSockAddr* responseFrom);
virtual ~BeaconHandler();
/**
* Update beacon period and do analitical checks (server restared, routing problems, etc.)
* @param from who is notifying.
* @param remoteTransportRevision encoded (major, minor) revision.
* @param timestamp time when beacon was received.
* @param startupTime server (reported) startup time.
* @param sequentalID sequential ID.
* @param data server status data, can be <code>NULL</code>.
*/
void beaconNotify(osiSockAddr* from, int8 remoteTransportRevision,
int64 timestamp, TimeStamp* startupTime, int32 sequentalID,
PVFieldPtr data);
private:
/**
* Context instance.
*/
const ClientContextImpl* _context;
/**
* Remote address.
*/
const osiSockAddr* _responseFrom;
/**
* Server startup timestamp.
*/
TimeStamp* _serverStartupTime;
/**
* Mutex
*/
Mutex _mutex;
/**
* Update beacon.
* @param remoteTransportRevision encoded (major, minor) revision.
* @param timestamp time when beacon was received.
* @param sequentalID sequential ID.
* @return network change (server restarted) detected.
*/
bool updateBeacon(int8 remoteTransportRevision, int64 timestamp,
TimeStamp* startupTime, int32 sequentalID);
/**
* Notify transport about beacon arrival.
*/
void beaconArrivalNotify();
/**
* Changed transport (server restarted) notify.
*/
void changedTransport();
};
}}
#endif /* INTROSPECTIONREGISTRY_H */

View File

@@ -0,0 +1,50 @@
/*
* beaconServerStatusProvider.cpp
*/
#include "beaconServerStatusProvider.h"
namespace epics { namespace pvAccess {
BeaconServerStatusProvider::BeaconServerStatusProvider( ServerContext* context): _context(context)
{
if(context == NULL)
{
throw EpicsException("null context");
}
initialize();
}
BeaconServerStatusProvider::BeaconServerStatusProvider()
{
initialize();
}
BeaconServerStatusProvider::~BeaconServerStatusProvider()
{
}
void BeaconServerStatusProvider::initialize()
{
PVDataCreate* pvDataCreate = getPVDataCreate();
FieldCreate* fieldCreate = getFieldCreate();
FieldConstPtrArray fields = new FieldConstPtr[6];
// TODO hierarchy can be used...
fields[0] = fieldCreate->createScalar("connections",pvInt);
fields[1] = fieldCreate->createScalar("allocatedMemory",pvLong);
fields[2] = fieldCreate->createScalar("freeMemory",pvLong);
fields[3] = fieldCreate->createScalar("threads",pvInt);
fields[4] = fieldCreate->createScalar("deadlocks",pvInt);
fields[5] = fieldCreate->createScalar("averageSystemLoad",pvDouble);
_status = pvDataCreate->createPVStructure(NULL,"status",6,fields);
}
PVFieldPtr BeaconServerStatusProvider::getServerStatusData()
{
//TODO implement
return static_cast<PVFieldPtr>(_status);
}
}}

View File

@@ -0,0 +1,52 @@
/*
* beaconServerStatusProvider.h
*/
#ifndef BEACONSERVERSTATUSPROVIDER_H
#define BEACONSERVERSTATUSPROVIDER_H
#include "pvData.h"
using namespace epics::pvData;
namespace epics { namespace pvAccess {
class ServerContext;
/**
* BeaconServerStatusProvider
*/
class BeaconServerStatusProvider
{
public:
/**
* Constructor.
* @param context CA context.
*/
BeaconServerStatusProvider(ServerContext* context);
/**
* Test Constructor (ohne context)
*/
BeaconServerStatusProvider();
/**
* Destructor.
*/
virtual ~BeaconServerStatusProvider();
/**
* Gets server status data.
*/
PVFieldPtr getServerStatusData();
private:
/**
* Initialize
*/
void initialize();
private:
PVStructurePtr _status;
ServerContext* _context;
};
}}
#endif /* INTROSPECTIONREGISTRY_H */

View File

@@ -14,6 +14,11 @@ PROD_HOST += testRemoteClientImpl
testRemoteClientImpl_SRCS += testRemoteClientImpl.cpp
testRemoteClientImpl_LIBS += pvData pvAccess Com
PROD_HOST += testBeaconEmitter
testBeaconEmitter_SRCS += testBeaconEmitter.cpp
testBeaconEmitter_LIBS += pvData pvAccess Com
include $(TOP)/configure/RULES
#----------------------------------------
# ADD RULES AFTER THIS LINE

View File

@@ -0,0 +1,71 @@
/*
* testBeaconEmitter.cpp
*/
#include "remote.h"
#include "blockingUDP.h"
#include "beaconEmitter.h"
#include "inetAddressUtil.h"
#include <osiSock.h>
#include <iostream>
#include <cstdio>
using namespace epics::pvAccess;
using namespace epics::pvData;
class DummyResponseHandler : public ResponseHandler
{
public:
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command, int payloadSize,
ByteBuffer* payloadBuffer)
{
cout << "DummyResponseHandler::handleResponse" << endl;
}
};
void testBeaconEmitter()
{
DummyResponseHandler drh;
/* SOCKET mysocket;
if ((mysocket = socket (AF_INET, SOCK_DGRAM, 0)) == -1)
{
assert(false);
}
InetAddrVector* broadcastAddresses = getBroadcastAddresses(mysocket);*/
InetAddrVector* broadcastAddresses = new InetAddrVector;
osiSockAddr* addr = new osiSockAddr;
addr->ia.sin_family = AF_INET;
addr->ia.sin_port = htons(5067);
if(inet_aton("92.50.75.255",&addr->ia.sin_addr)==0) {
cout<<"error in inet_aton()"<<endl;
return;
}
broadcastAddresses->push_back(addr);
BlockingUDPConnector connector(true, broadcastAddresses, true);
osiSockAddr bindAddr;
bindAddr.ia.sin_family = AF_INET;
bindAddr.ia.sin_port = htons(5066);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50);
cout<<"Sending beacons"<<endl;
BeaconEmitter beaconEmitter(transport, transport->getRemoteAddress());
beaconEmitter.start();
while(1) sleep(1);
delete transport;
}
int main(int argc, char *argv[])
{
testBeaconEmitter();
return (0);
}

View File

@@ -0,0 +1,53 @@
/*
* testBeaconEmitter.cpp
*/
#include "remote.h"
#include "blockingUDP.h"
#include "beaconHandler.h"
#include "inetAddressUtil.h"
#include <osiSock.h>
#include <iostream>
#include <cstdio>
using namespace epics::pvAccess;
using namespace epics::pvData;
class BeaconResponseHandler : public ResponseHandler
{
public:
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command, int payloadSize,
ByteBuffer* payloadBuffer)
{
cout << "DummyResponseHandler::handleResponse" << endl;
}
};
void testBeaconHandler()
{
BeacondResponseHandler brh;
BlockingUDPConnector connector(false, NULL, true);
DummyClientContext context;
osiSockAddr bindAddr;
bindAddr.ia.sin_family = AF_INET;
bindAddr.ia.sin_port = htons(5067);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
Transport* transport = connector.connect(NULL, &brh, &bindAddr, 1, 50);
((BlockingUDPTransport*)transport)->start();
while(1) sleep(1);
delete transport;
}
int main(int argc, char *argv[])
{
testBeaconHandler();
return (0);
}

View File

@@ -412,7 +412,8 @@ void testSerializeStatus()
//TODO why are in and out on the same address?
}
int main(int argc, char *argv[]) {
int main(int argc, char *argv[])
{
pvDataCreate = getPVDataCreate();
statusCreate = getStatusCreate();
fieldCreate = getFieldCreate();

View File

@@ -118,6 +118,16 @@ int main(int argc, char *argv[])
}
assert(registry->numberOfActiveTransports() == 0);
for(int32 i = 0; i < address_max; i++)
{
for(int16 j = 0; j < priority_max; j++)
{
registry->put(static_cast<Transport*>(transportArrayIn[i * priority_max + j]));;
}
}
assert(registry->numberOfActiveTransports() == (priority_max * address_max));
registry->clear();
assert(registry->numberOfActiveTransports() == 0);
for(int32 i = 0; i < address_max; i++)
{