started protocol changes

This commit is contained in:
Matej Sekoranja
2014-06-09 08:25:47 +02:00
parent 05d4c3ea55
commit d19cf15a0e
10 changed files with 141 additions and 77 deletions
+1 -1
View File
@@ -26,7 +26,7 @@ namespace pvAccess {
const epics::pvData::int8 PVA_MAGIC = static_cast<epics::pvData::int8>(0xCA);
/** PVA protocol revision (implemented by this library). */
const epics::pvData::int8 PVA_PROTOCOL_REVISION = 0;
const epics::pvData::int8 PVA_PROTOCOL_REVISION = 1;
/** PVA version signature used to report this implementation version in header. */
const epics::pvData::int8 PVA_VERSION = PVA_PROTOCOL_REVISION;
+31 -23
View File
@@ -15,45 +15,43 @@ namespace epics {
namespace pvAccess {
BeaconHandler::BeaconHandler(Context::shared_pointer const & context,
std::string const & protocol,
const osiSockAddr* responseFrom) :
_context(Context::weak_pointer(context)),
_protocol(protocol),
_responseFrom(*responseFrom),
_mutex(),
_serverStartupTime(0)
{
}
BeaconHandler::BeaconHandler(const osiSockAddr* responseFrom) :
_responseFrom(*responseFrom),
_mutex(),
_serverStartupTime(0)
_serverGUID(),
_serverChangeCount(-1),
_first(true)
{
}
BeaconHandler::~BeaconHandler()
{
}
void BeaconHandler::beaconNotify(osiSockAddr* /*from*/, int8 remoteTransportRevision,
TimeStamp* timestamp, TimeStamp* startupTime, int16 sequentalID,
TimeStamp* timestamp, GUID const & guid, int16 sequentalID,
int16 changeCount,
PVFieldPtr /*data*/)
{
bool networkChanged = updateBeacon(remoteTransportRevision, timestamp, startupTime, sequentalID);
bool networkChanged = updateBeacon(remoteTransportRevision, timestamp, guid, sequentalID, changeCount);
if (networkChanged)
changedTransport();
}
bool BeaconHandler::updateBeacon(int8 /*remoteTransportRevision*/, TimeStamp* /*timestamp*/,
TimeStamp* startupTime, int16 /*sequentalID*/)
GUID const & guid, int16 /*sequentalID*/, int16 changeCount)
{
Lock guard(_mutex);
// first beacon notification check
if (_serverStartupTime.getSecondsPastEpoch() == 0)
{
_serverStartupTime = *startupTime;
Lock guard(_mutex);
// first beacon notification check
if (_first)
{
_first = false;
_serverGUID = guid;
_serverChangeCount = changeCount;
// new server up..
_context.lock()->newServerDetected();
@@ -61,25 +59,35 @@ bool BeaconHandler::updateBeacon(int8 /*remoteTransportRevision*/, TimeStamp* /*
return false;
}
bool networkChange = !(_serverStartupTime == *startupTime);
bool networkChange = (memcmp(_serverGUID.value, guid.value, sizeof(guid.value)) != 0);
if (networkChange)
{
// update startup time
_serverStartupTime = *startupTime;
// update startup time and change count
_serverGUID = guid;
_serverChangeCount = changeCount;
_context.lock()->newServerDetected();
return true;
}
else if (_serverChangeCount != changeCount)
{
// update change count
_serverChangeCount = changeCount;
// TODO be more specific (possible optimizations)
_context.lock()->newServerDetected();
return true;
}
return false;
}
void BeaconHandler::changedTransport()
{
// TODO why only TCP, actually TCP does not need this
auto_ptr<TransportRegistry::transportVector_t> transports =
_context.lock()->getTransportRegistry()->get("TCP", &_responseFrom);
_context.lock()->getTransportRegistry()->get(_protocol, &_responseFrom);
if (!transports.get())
return;
+27 -14
View File
@@ -38,36 +38,37 @@ namespace pvAccess {
/**
* Constructor.
* @param transport transport to be used to send beacons.
* @param context PVA context.
*/
BeaconHandler(Context::shared_pointer const & context, const osiSockAddr* responseFrom);
/**
* Test Constructor (for testing)
* @param transport transport to be used to send beacons.
*/
BeaconHandler(const osiSockAddr* responseFrom);
BeaconHandler(Context::shared_pointer const & context, std::string const & protocol,
const osiSockAddr* responseFrom);
virtual ~BeaconHandler();
/**
* Update beacon period and do analitical checks (server restared, routing problems, etc.)
* @param from who is notifying.
* @param remoteTransportRevision encoded (major, minor) revision.
* @param timestamp time when beacon was received.
* @param startupTime server (reported) startup time.
* @param guid server GUID.
* @param sequentalID sequential ID.
* @param changeCount change count.
* @param data server status data, can be <code>NULL</code>.
*/
void beaconNotify(osiSockAddr* from,
epics::pvData::int8 remoteTransportRevision,
epics::pvData::TimeStamp* timestamp,
epics::pvData::TimeStamp* startupTime,
GUID const &guid,
epics::pvData::int16 sequentalID,
epics::pvData::int16 changeCount,
epics::pvData::PVFieldPtr data);
private:
/**
* Context instance.
*/
Context::weak_pointer _context;
/**
* The procotol (transport), "tcp" for pvAccess TCP/IP.
*/
std::string _protocol;
/**
* Remote address.
*/
@@ -76,21 +77,33 @@ namespace pvAccess {
* Mutex
*/
epics::pvData::Mutex _mutex;
/**
* Server GUID.
*/
GUID _serverGUID;
/**
* Server startup timestamp.
*/
epics::pvData::TimeStamp _serverStartupTime;
epics::pvData::int16 _serverChangeCount;
/**
* First beacon flag.
*/
bool _first;
/**
* Update beacon.
* @param remoteTransportRevision encoded (major, minor) revision.
* @param timestamp time when beacon was received.
* @param guid server GUID.
* @param sequentalID sequential ID.
* @param changeCount change count.
* @return network change (server restarted) detected.
*/
bool updateBeacon(epics::pvData::int8 remoteTransportRevision,
epics::pvData::TimeStamp* timestamp,
epics::pvData::TimeStamp* startupTime,
epics::pvData::int16 sequentalID);
GUID const &guid,
epics::pvData::int16 sequentalID,
epics::pvData::int16 changeCount);
/**
* Changed transport (server restarted) notify.
*/
+5
View File
@@ -41,6 +41,11 @@ namespace epics {
#define PVACCESS_REFCOUNT_MONITOR_DESTRUCT(name)
class TransportRegistry;
/**
* Globally unique ID.
*/
typedef struct { char value[12]; } GUID;
enum QoS {
/**
+16 -6
View File
@@ -2640,10 +2640,13 @@ namespace epics {
AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
transport->ensureData((2*sizeof(int16)+2*sizeof(int32)+128)/sizeof(int8));
transport->ensureData(12+2+2+16+2);
GUID guid;
payloadBuffer->get(guid.value, 0, sizeof(guid.value));
int16 sequentalID = payloadBuffer->getShort();
TimeStamp startupTimestamp(payloadBuffer->getLong(),payloadBuffer->getInt());
int16 changeCount = payloadBuffer->getShort();
osiSockAddr serverAddress;
serverAddress.ia.sin_family = AF_INET;
@@ -2667,13 +2670,15 @@ namespace epics {
serverAddress.ia.sin_addr = responseFrom->ia.sin_addr;
serverAddress.ia.sin_port = htons(payloadBuffer->getShort());
std::string protocol = SerializeHelper::deserializeString(payloadBuffer, transport.get());
// TODO optimize
ClientContextImpl::shared_pointer context = _context.lock();
if (!context)
return;
std::tr1::shared_ptr<epics::pvAccess::BeaconHandler> beaconHandler = context->getBeaconHandler(responseFrom);
std::tr1::shared_ptr<epics::pvAccess::BeaconHandler> beaconHandler = context->getBeaconHandler(protocol, responseFrom);
// currently we care only for servers used by this context
if (beaconHandler == 0)
return;
@@ -2688,7 +2693,7 @@ namespace epics {
}
// notify beacon handler
beaconHandler->beaconNotify(responseFrom, version, &timestamp, &startupTimestamp, sequentalID, data);
beaconHandler->beaconNotify(responseFrom, version, &timestamp, guid, sequentalID, changeCount, data);
}
};
@@ -4342,17 +4347,22 @@ TODO
/**
* Get (and if necessary create) beacon handler.
* @param protocol the protocol.
* @param responseFrom remote source address of received beacon.
* @return beacon handler for particular server.
*/
BeaconHandler::shared_pointer getBeaconHandler(osiSockAddr* responseFrom)
BeaconHandler::shared_pointer getBeaconHandler(std::string const & protocol, osiSockAddr* responseFrom)
{
// TODO !!! protocol !!!
if (protocol != "tcp")
return BeaconHandler::shared_pointer();
Lock guard(m_beaconMapMutex);
AddressBeaconHandlerMap::iterator it = m_beaconHandlers.find(*responseFrom);
BeaconHandler::shared_pointer handler;
if (it == m_beaconHandlers.end())
{
handler.reset(new BeaconHandler(shared_from_this(), responseFrom));
handler.reset(new BeaconHandler(shared_from_this(), protocol, responseFrom));
m_beaconHandlers[*responseFrom] = handler;
}
else
+1 -1
View File
@@ -121,7 +121,7 @@ namespace epics {
virtual void newServerDetected() = 0;
virtual std::tr1::shared_ptr<BeaconHandler> getBeaconHandler(osiSockAddr* responseFrom) = 0;
virtual std::tr1::shared_ptr<BeaconHandler> getBeaconHandler(std::string const & protocol, osiSockAddr* responseFrom) = 0;
virtual void configure(epics::pvData::PVStructure::shared_pointer configuration) = 0;
virtual void flush() = 0;
+11 -21
View File
@@ -25,10 +25,12 @@ const float BeaconEmitter::EPICS_PVA_MIN_BEACON_PERIOD = 1.0;
const float BeaconEmitter::EPICS_PVA_MIN_BEACON_COUNT_LIMIT = 3.0;
//BeaconEmitter::BeaconEmitter(Transport::shared_pointer const & transport, ServerContextImpl::shared_pointer const & context) :
BeaconEmitter::BeaconEmitter(Transport::shared_pointer const & transport, std::tr1::shared_ptr<ServerContextImpl>& context) :
BeaconEmitter::BeaconEmitter(std::string const & protocol,
Transport::shared_pointer const & transport, std::tr1::shared_ptr<ServerContextImpl>& context) :
_protocol(protocol),
_transport(transport),
_beaconSequenceID(0),
_startupTime(),
_guid(context->getGUID()),
_fastBeaconPeriod(std::max(context->getBeaconPeriod(), EPICS_PVA_MIN_BEACON_PERIOD)),
_slowBeaconPeriod(std::max(180.0, _fastBeaconPeriod)), // TODO configurable
_beaconCountLimit((int16)std::max(10.0f, EPICS_PVA_MIN_BEACON_COUNT_LIMIT)), // TODO configurable
@@ -37,22 +39,6 @@ BeaconEmitter::BeaconEmitter(Transport::shared_pointer const & transport, std::t
_serverStatusProvider(context->getBeaconServerStatusProvider()),
_timer(context->getTimer())
{
_startupTime.getCurrent();
}
BeaconEmitter::BeaconEmitter(Transport::shared_pointer const & transport, const osiSockAddr& serverAddress) :
_transport(transport),
_beaconSequenceID(0),
_startupTime(),
_fastBeaconPeriod(EPICS_PVA_MIN_BEACON_PERIOD),
_slowBeaconPeriod(180.0),
_beaconCountLimit(10),
_serverAddress(serverAddress),
_serverPort(serverAddress.ia.sin_port),
_serverStatusProvider(),
_timer(new Timer("pvAccess-server timer", lowPriority))
{
_startupTime.getCurrent();
}
BeaconEmitter::~BeaconEmitter()
@@ -89,16 +75,20 @@ void BeaconEmitter::send(ByteBuffer* buffer, TransportSendControl* control)
}
// send beacon
control->startMessage((int8)0, (sizeof(int16)+2*sizeof(int32)+128+sizeof(int16))/sizeof(int8));
control->startMessage((int8)0, 12+2+2+16+2);
buffer->put(_guid.value, 0, sizeof(_guid.value));
buffer->putShort(_beaconSequenceID);
buffer->putLong((int64)_startupTime.getSecondsPastEpoch());
buffer->putInt((int32)_startupTime.getNanoSeconds());
// TODO for now fixed changeCount
buffer->putShort(0);
// NOTE: is it possible (very likely) that address is any local address ::ffff:0.0.0.0
encodeAsIPv6Address(buffer, &_serverAddress);
buffer->putShort((int16)_serverPort);
SerializeHelper::serializeString(_protocol, buffer, control);
if (serverStatus)
{
// introspection interface + data
+12 -10
View File
@@ -47,18 +47,15 @@ namespace epics { namespace pvAccess {
/**
* Constructor.
* @param protocol a protocol (transport) name to report.
* @param transport transport to be used to send beacons.
* @param context PVA context.
*/
// BeaconEmitter(Transport::shared_pointer const & transport, ServerContextImpl::shared_pointer const & context);
BeaconEmitter(Transport::shared_pointer const & transport, std::tr1::shared_ptr<ServerContextImpl>& context);
// BeaconEmitter(std::sting const & protocol,
// Transport::shared_pointer const & transport, ServerContextImpl::shared_pointer const & context);
BeaconEmitter(std::string const & protocol,
Transport::shared_pointer const & transport, std::tr1::shared_ptr<ServerContextImpl>& context);
/**
* Test Constructor (ohne context)
* @param transport transport to be used to send beacons.
*/
BeaconEmitter(Transport::shared_pointer const & transport, const osiSockAddr& serverAddress);
virtual ~BeaconEmitter();
void lock();
@@ -97,6 +94,11 @@ namespace epics { namespace pvAccess {
*/
static const float EPICS_PVA_MIN_BEACON_COUNT_LIMIT;
/**
* Protocol.
*/
std::string _protocol;
/**
* Transport.
*/
@@ -108,9 +110,9 @@ namespace epics { namespace pvAccess {
epics::pvData::int16 _beaconSequenceID;
/**
* Startup timestamp (when clients detect a change, they will consider server restarted).
* Server GUID.
*/
epics::pvData::TimeStamp _startupTime;
GUID _guid;
/**
* Fast (at startup) beacon period (in sec).
+18 -1
View File
@@ -46,6 +46,7 @@ ServerContextImpl::ServerContextImpl():
epicsSignalInstallSigAlarmIgnore ();
epicsSignalInstallSigPipeIgnore ();
generateGUID();
initializeLogger();
loadConfiguration();
}
@@ -61,11 +62,26 @@ ServerContextImpl::~ServerContextImpl()
dispose();
}
const GUID& ServerContextImpl::getGUID()
{
return _guid;
}
const Version& ServerContextImpl::getVersion()
{
return ServerContextImpl::VERSION;
}
void ServerContextImpl::generateGUID()
{
epics::pvData::TimeStamp startupTime;
startupTime.getCurrent();
ByteBuffer buffer(_guid.value, sizeof(_guid.value));
buffer.putLong(startupTime.getSecondsPastEpoch());
buffer.putInt(startupTime.getNanoSeconds());
}
void ServerContextImpl::initializeLogger()
{
//createFileLogger("serverContextImpl.log");
@@ -217,7 +233,8 @@ void ServerContextImpl::internalInitialize()
// setup broadcast UDP transport
initializeBroadcastTransport();
_beaconEmitter.reset(new BeaconEmitter(_broadcastTransport, thisServerContext));
// TODO introduce a constant
_beaconEmitter.reset(new BeaconEmitter("tcp", _broadcastTransport, thisServerContext));
}
void ServerContextImpl::initializeBroadcastTransport()
+19
View File
@@ -35,6 +35,13 @@ public:
* Destructor
*/
virtual ~ServerContext() {};
/**
* Returns GUID (12-byte array).
* @return GUID.
*/
virtual const GUID& getGUID() = 0;
/**
* Get context implementation version.
* @return version of the context implementation.
@@ -115,6 +122,7 @@ public:
virtual ~ServerContextImpl();
//**************** derived from ServerContext ****************//
const GUID& getGUID();
const Version& getVersion();
void initialize(ChannelProviderRegistry::shared_pointer const & channelProviderRegistry);
void run(epics::pvData::int32 seconds);
@@ -283,6 +291,12 @@ public:
bool isChannelProviderNamePreconfigured();
private:
/**
* Server GUID.
*/
GUID _guid;
/**
* Initialization status.
*/
@@ -381,6 +395,11 @@ private:
*/
BeaconServerStatusProvider::shared_pointer _beaconServerStatusProvider;
/**
* Generate GUID.
*/
void generateGUID();
/**
* Initialize logger.
*/