This commit is contained in:
miha_vitorovic
2011-01-12 14:34:39 +01:00
4 changed files with 165 additions and 70 deletions

View File

@@ -5,15 +5,18 @@
#include "beaconHandler.h"
using namespace std;
using namespace epics::pvData;
using namespace epics::pvAccess;
namespace epics { namespace pvAccess {
BeaconHandler::BeaconHandler(const ClientContextImpl* context, const osiSockAddr* responseFrom): _context(context), _responseFrom(responseFrom), _mutex(Mutex())
BeaconHandler::BeaconHandler(ClientContextImpl* context, const osiSockAddr* responseFrom): _context(context), _responseFrom(*responseFrom), _mutex(Mutex()), _serverStartupTime(TimeStamp(0))
{
}
BeaconHandler::BeaconHandler(const osiSockAddr* responseFrom): _responseFrom(responseFrom), _mutex(Mutex())
BeaconHandler::BeaconHandler(const osiSockAddr* responseFrom):
_responseFrom(*responseFrom), _mutex(Mutex()), _serverStartupTime(TimeStamp(0))
{
}
@@ -24,7 +27,7 @@ BeaconHandler::~BeaconHandler()
}
void BeaconHandler::beaconNotify(osiSockAddr* from, int8 remoteTransportRevision,
int64 timestamp, TimeStamp* startupTime, int32 sequentalID,
TimeStamp* timestamp, TimeStamp* startupTime, int16 sequentalID,
PVFieldPtr data)
{
bool networkChanged = updateBeacon(remoteTransportRevision, timestamp, startupTime, sequentalID);
@@ -34,18 +37,17 @@ void BeaconHandler::beaconNotify(osiSockAddr* from, int8 remoteTransportRevision
}
}
bool BeaconHandler::updateBeacon(int8 remoteTransportRevision, int64 timestamp,
TimeStamp* startupTime, int32 sequentalID)
bool BeaconHandler::updateBeacon(int8 remoteTransportRevision, TimeStamp* timestamp,
TimeStamp* startupTime, int16 sequentalID)
{
Lock guard(&_mutex);
// first beacon notification check
if (_serverStartupTime == NULL)
if (_serverStartupTime.getSecondsPastEpoch() == 0)
{
_serverStartupTime = startupTime;
_serverStartupTime = *startupTime;
// new server up..
//TODO
//_context->beaconAnomalyNotify();
_context->beaconAnomalyNotify();
// notify corresponding transport(s)
beaconArrivalNotify();
@@ -53,11 +55,10 @@ bool BeaconHandler::updateBeacon(int8 remoteTransportRevision, int64 timestamp,
return false;
}
bool networkChange = !(*_serverStartupTime == *startupTime);
bool networkChange = !(_serverStartupTime == *startupTime);
if (networkChange)
{
//TODO
//_context->beaconAnomalyNotify();
_context->beaconAnomalyNotify();
}
else
{
@@ -71,8 +72,7 @@ void BeaconHandler::beaconArrivalNotify()
{
int32 size = 0;
//TODO TCP name must be get from somewhere not hardcoded
//TODO
Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size);
Transport** transports = _context->getTransportRegistry()->get("TCP", &_responseFrom, size);
if (transports == NULL)
{
return;
@@ -90,8 +90,7 @@ void BeaconHandler::changedTransport()
{
int32 size = 0;
//TODO TCP name must be get from somewhere not hardcoded
//TODO
Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size);
Transport** transports = _context->getTransportRegistry()->get("TCP", &_responseFrom, size);
if (transports == NULL)
{
return;

View File

@@ -6,21 +6,15 @@
#define BEACONHANDLER_H
#include "remote.h"
#include "inetAddressUtil.h"
#include "pvAccess.h"
#include <pvAccess.h>
#include "clientContextImpl.h"
#include <timeStamp.h>
#include <osiSock.h>
#include <lock.h>
#include <iostream>
using namespace epics::pvData;
namespace epics { namespace pvAccess {
class ClientContextImpl;
/**
* BeaconHandler
*/
@@ -32,7 +26,7 @@ namespace epics { namespace pvAccess {
* @param transport transport to be used to send beacons.
* @param context CA context.
*/
BeaconHandler(const ClientContextImpl* context, const osiSockAddr* responseFrom);
BeaconHandler(ClientContextImpl* context, const osiSockAddr* responseFrom);
/**
* Test Constructor (for testing)
* @param transport transport to be used to send beacons.
@@ -48,26 +42,29 @@ namespace epics { namespace pvAccess {
* @param sequentalID sequential ID.
* @param data server status data, can be <code>NULL</code>.
*/
void beaconNotify(osiSockAddr* from, int8 remoteTransportRevision,
int64 timestamp, TimeStamp* startupTime, int32 sequentalID,
PVFieldPtr data);
void beaconNotify(osiSockAddr* from,
epics::pvData::int8 remoteTransportRevision,
epics::pvData::TimeStamp* timestamp,
epics::pvData::TimeStamp* startupTime,
epics::pvData::int16 sequentalID,
epics::pvData::PVFieldPtr data);
private:
/**
* Context instance.
*/
const ClientContextImpl* _context;
ClientContextImpl* _context;
/**
* Remote address.
*/
const osiSockAddr* _responseFrom;
/**
* Server startup timestamp.
*/
TimeStamp* _serverStartupTime;
const osiSockAddr _responseFrom;
/**
* Mutex
*/
Mutex _mutex;
epics::pvData::Mutex _mutex;
/**
* Server startup timestamp.
*/
epics::pvData::TimeStamp _serverStartupTime;
/**
* Update beacon.
* @param remoteTransportRevision encoded (major, minor) revision.
@@ -75,8 +72,10 @@ namespace epics { namespace pvAccess {
* @param sequentalID sequential ID.
* @return network change (server restarted) detected.
*/
bool updateBeacon(int8 remoteTransportRevision, int64 timestamp,
TimeStamp* startupTime, int32 sequentalID);
bool updateBeacon(epics::pvData::int8 remoteTransportRevision,
epics::pvData::TimeStamp* timestamp,
epics::pvData::TimeStamp* startupTime,
epics::pvData::int16 sequentalID);
/**
* Notify transport about beacon arrival.
*/

View File

@@ -14,7 +14,9 @@
namespace epics {
namespace pvAccess {
class BeaconHandler;
class ChannelImpl :
public Channel ,
public TransportClient,
@@ -48,7 +50,10 @@ namespace epics {
virtual Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int16 minorRevision, int16 priority) = 0;
virtual void beaconAnomalyNotify() = 0;
virtual BeaconHandler* getBeaconHandler(osiSockAddr* responseFrom) = 0;
};

View File

@@ -22,7 +22,7 @@
#include <channelSearchManager.h>
#include <clientContextImpl.h>
#include <configuration.h>
#include <beaconHandler.h>
#include <errlog.h>
using namespace epics::pvData;
@@ -544,6 +544,74 @@ typedef std::map<pvAccessID, ResponseRequest*> IOIDResponseRequestMap;
};
class BeaconResponseHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
BeaconResponseHandler(ClientContextImpl* context) :
AbstractClientResponseHandler(context, "Beacon")
{
}
virtual ~BeaconResponseHandler() {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer)
{
// reception timestamp
TimeStamp timestamp;
timestamp.getCurrent();
AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
transport->ensureData((2*sizeof(int16)+2*sizeof(int32)+128)/sizeof(int8));
int16 sequentalID = payloadBuffer->getShort();
TimeStamp startupTimestamp(payloadBuffer->getInt(),payloadBuffer->getInt());
osiSockAddr serverAddress;
serverAddress.ia.sin_family = AF_INET;
// 128-bit IPv6 address
/*
int8* byteAddress = new int8[16];
for (int i = 0; i < 16; i++)
byteAddress[i] = payloadBuffer->getByte(); };
*/
// IPv4 compatible IPv6 address expected
// first 80-bit are 0
if (payloadBuffer->getLong() != 0) return;
if (payloadBuffer->getShort() != 0) return;
if (payloadBuffer->getShort() != (int16)0xFFFF) return;
// accept given address if explicitly specified by sender
serverAddress.ia.sin_addr.s_addr = htonl(payloadBuffer->getInt());
if (serverAddress.ia.sin_addr.s_addr == INADDR_ANY)
serverAddress.ia.sin_addr = responseFrom->ia.sin_addr;
serverAddress.ia.sin_port = htons(payloadBuffer->getShort());
BeaconHandler* beaconHandler = _context->getBeaconHandler(responseFrom);
// currently we care only for servers used by this context
if (beaconHandler == NULL)
return;
// extra data
PVFieldPtr data = NULL;
const FieldConstPtr field = IntrospectionRegistry::deserializeFull(payloadBuffer, transport);
if (field != NULL)
{
data = getPVDataCreate()->createPVField(NULL, field);
data->deserialize(payloadBuffer, transport);
}
// notify beacon handler
beaconHandler->beaconNotify(responseFrom, version, &timestamp, &startupTimestamp, sequentalID, data);
}
};
class ConnectionValidationHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
ConnectionValidationHandler(ClientContextImpl* context) :
@@ -1854,13 +1922,9 @@ class TestChannelImpl : public ChannelImpl {
*/
ResponseRequest* getResponseRequest(pvAccessID ioid)
{
/*
synchronized (pendingResponseRequests)
{
return (ResponseRequest)pendingResponseRequests.get(ioid);
}
*/
return 0;
Lock guard(&m_ioidMapMutex);
IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid);
return (it == m_pendingResponseRequests.end() ? 0 : it->second);
}
/**
@@ -1870,15 +1934,10 @@ class TestChannelImpl : public ChannelImpl {
*/
pvAccessID registerResponseRequest(ResponseRequest* request)
{
/*
synchronized (pendingResponseRequests)
{
int ioid = generateIOID();
pendingResponseRequests.put(ioid, request);
return ioid;
}
*/
return 0;
Lock guard(&m_ioidMapMutex);
pvAccessID ioid = generateIOID();
m_pendingResponseRequests[ioid] = request;
return ioid;
}
/**
@@ -1888,13 +1947,14 @@ class TestChannelImpl : public ChannelImpl {
*/
ResponseRequest* unregisterResponseRequest(ResponseRequest* request)
{
/*
synchronized (pendingResponseRequests)
{
return (ResponseRequest)pendingResponseRequests.remove(request.getIOID());
}
*/
return 0;
Lock guard(&m_ioidMapMutex);
IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(request->getIOID());
if (it == m_pendingResponseRequests.end())
return 0;
ResponseRequest* retVal = it->second;
m_pendingResponseRequests.erase(it);
return retVal;
}
/**
@@ -1902,15 +1962,42 @@ class TestChannelImpl : public ChannelImpl {
* @return IOID.
*/
pvAccessID generateIOID()
{
Lock guard(&m_ioidMapMutex);
// search first free (theoretically possible loop of death)
while (m_pendingResponseRequests.find(++m_lastIOID) != m_pendingResponseRequests.end());
// reserve IOID
m_pendingResponseRequests[m_lastIOID] = 0;
return m_lastIOID;
}
/**
* Called each time beacon anomaly is detected.
*/
void beaconAnomalyNotify()
{
if (m_channelSearchManager)
m_channelSearchManager->beaconAnomalyNotify();
}
/**
* Get (and if necessary create) beacon handler.
* @param responseFrom remote source address of received beacon.
* @return beacon handler for particular server.
*/
BeaconHandler* getBeaconHandler(osiSockAddr* responseFrom)
{
/*
synchronized (pendingResponseRequests)
{
// search first free (theoretically possible loop of death)
while (pendingResponseRequests.get(++lastIOID) != null || lastIOID == CAConstants.CAJ_INVALID_IOID);
// reserve IOID
pendingResponseRequests.put(lastIOID, null);
return lastIOID;
synchronized (beaconHandlers) {
BeaconHandlerImpl handler = beaconHandlers.get(responseFrom);
if (handler == null)
{
handler = new BeaconHandlerImpl(this, responseFrom);
beaconHandlers.put(responseFrom, handler);
}
return handler;
}
*/
return 0;
@@ -2105,6 +2192,11 @@ class TestChannelImpl : public ChannelImpl {
typedef std::map<pvAccessID, ResponseRequest*> IOIDResponseRequestMap;
IOIDResponseRequestMap m_pendingResponseRequests;
/**
* IOIDResponseRequestMap mutex.
*/
Mutex m_ioidMapMutex;
/**
* Last IOID cache.
*/