more responses

This commit is contained in:
Matej Sekoranja
2011-01-12 14:30:05 +01:00
parent f72e495a99
commit 9416f887ad
4 changed files with 165 additions and 70 deletions

View File

@@ -22,7 +22,7 @@
#include <channelSearchManager.h>
#include <clientContextImpl.h>
#include <configuration.h>
#include <beaconHandler.h>
#include <errlog.h>
using namespace epics::pvData;
@@ -544,6 +544,74 @@ typedef std::map<pvAccessID, ResponseRequest*> IOIDResponseRequestMap;
};
class BeaconResponseHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
BeaconResponseHandler(ClientContextImpl* context) :
AbstractClientResponseHandler(context, "Beacon")
{
}
virtual ~BeaconResponseHandler() {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, epics::pvData::ByteBuffer* payloadBuffer)
{
// reception timestamp
TimeStamp timestamp;
timestamp.getCurrent();
AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
transport->ensureData((2*sizeof(int16)+2*sizeof(int32)+128)/sizeof(int8));
int16 sequentalID = payloadBuffer->getShort();
TimeStamp startupTimestamp(payloadBuffer->getInt(),payloadBuffer->getInt());
osiSockAddr serverAddress;
serverAddress.ia.sin_family = AF_INET;
// 128-bit IPv6 address
/*
int8* byteAddress = new int8[16];
for (int i = 0; i < 16; i++)
byteAddress[i] = payloadBuffer->getByte(); };
*/
// IPv4 compatible IPv6 address expected
// first 80-bit are 0
if (payloadBuffer->getLong() != 0) return;
if (payloadBuffer->getShort() != 0) return;
if (payloadBuffer->getShort() != (int16)0xFFFF) return;
// accept given address if explicitly specified by sender
serverAddress.ia.sin_addr.s_addr = htonl(payloadBuffer->getInt());
if (serverAddress.ia.sin_addr.s_addr == INADDR_ANY)
serverAddress.ia.sin_addr = responseFrom->ia.sin_addr;
serverAddress.ia.sin_port = htons(payloadBuffer->getShort());
BeaconHandler* beaconHandler = _context->getBeaconHandler(responseFrom);
// currently we care only for servers used by this context
if (beaconHandler == NULL)
return;
// extra data
PVFieldPtr data = NULL;
const FieldConstPtr field = IntrospectionRegistry::deserializeFull(payloadBuffer, transport);
if (field != NULL)
{
data = getPVDataCreate()->createPVField(NULL, field);
data->deserialize(payloadBuffer, transport);
}
// notify beacon handler
beaconHandler->beaconNotify(responseFrom, version, &timestamp, &startupTimestamp, sequentalID, data);
}
};
class ConnectionValidationHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
ConnectionValidationHandler(ClientContextImpl* context) :
@@ -1854,13 +1922,9 @@ class TestChannelImpl : public ChannelImpl {
*/
ResponseRequest* getResponseRequest(pvAccessID ioid)
{
/*
synchronized (pendingResponseRequests)
{
return (ResponseRequest)pendingResponseRequests.get(ioid);
}
*/
return 0;
Lock guard(&m_ioidMapMutex);
IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid);
return (it == m_pendingResponseRequests.end() ? 0 : it->second);
}
/**
@@ -1870,15 +1934,10 @@ class TestChannelImpl : public ChannelImpl {
*/
pvAccessID registerResponseRequest(ResponseRequest* request)
{
/*
synchronized (pendingResponseRequests)
{
int ioid = generateIOID();
pendingResponseRequests.put(ioid, request);
return ioid;
}
*/
return 0;
Lock guard(&m_ioidMapMutex);
pvAccessID ioid = generateIOID();
m_pendingResponseRequests[ioid] = request;
return ioid;
}
/**
@@ -1888,13 +1947,14 @@ class TestChannelImpl : public ChannelImpl {
*/
ResponseRequest* unregisterResponseRequest(ResponseRequest* request)
{
/*
synchronized (pendingResponseRequests)
{
return (ResponseRequest)pendingResponseRequests.remove(request.getIOID());
}
*/
return 0;
Lock guard(&m_ioidMapMutex);
IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(request->getIOID());
if (it == m_pendingResponseRequests.end())
return 0;
ResponseRequest* retVal = it->second;
m_pendingResponseRequests.erase(it);
return retVal;
}
/**
@@ -1902,15 +1962,42 @@ class TestChannelImpl : public ChannelImpl {
* @return IOID.
*/
pvAccessID generateIOID()
{
Lock guard(&m_ioidMapMutex);
// search first free (theoretically possible loop of death)
while (m_pendingResponseRequests.find(++m_lastIOID) != m_pendingResponseRequests.end());
// reserve IOID
m_pendingResponseRequests[m_lastIOID] = 0;
return m_lastIOID;
}
/**
* Called each time beacon anomaly is detected.
*/
void beaconAnomalyNotify()
{
if (m_channelSearchManager)
m_channelSearchManager->beaconAnomalyNotify();
}
/**
* Get (and if necessary create) beacon handler.
* @param responseFrom remote source address of received beacon.
* @return beacon handler for particular server.
*/
BeaconHandler* getBeaconHandler(osiSockAddr* responseFrom)
{
/*
synchronized (pendingResponseRequests)
{
// search first free (theoretically possible loop of death)
while (pendingResponseRequests.get(++lastIOID) != null || lastIOID == CAConstants.CAJ_INVALID_IOID);
// reserve IOID
pendingResponseRequests.put(lastIOID, null);
return lastIOID;
synchronized (beaconHandlers) {
BeaconHandlerImpl handler = beaconHandlers.get(responseFrom);
if (handler == null)
{
handler = new BeaconHandlerImpl(this, responseFrom);
beaconHandlers.put(responseFrom, handler);
}
return handler;
}
*/
return 0;
@@ -2105,6 +2192,11 @@ class TestChannelImpl : public ChannelImpl {
typedef std::map<pvAccessID, ResponseRequest*> IOIDResponseRequestMap;
IOIDResponseRequestMap m_pendingResponseRequests;
/**
* IOIDResponseRequestMap mutex.
*/
Mutex m_ioidMapMutex;
/**
* Last IOID cache.
*/