ChannelSearchManager mocks

This commit is contained in:
Matej Sekoranja
2011-01-05 10:14:18 +01:00
parent 45ec248380
commit bf5e3f3e19

View File

@@ -13,6 +13,8 @@
#include <timer.h>
#include <blockingUDP.h>
#include <inetAddressUtil.h>
#include <hexDump.h>
#include <remote.h>
using namespace epics::pvData;
using namespace epics::pvAccess;
@@ -370,24 +372,200 @@ class MockMonitor : public Monitor, public MonitorElement
};
class ChannelSearchManager;
class BlockingTCPConnector;
class NamedLockPattern;
class ResponseRequest;
class BeaconHandlerImpl;
class ClientContextImpl;
typedef int pvAccessID;
/**
* A request that expects an response.
* Responses identified by its I/O ID.
* This interface needs to be extended (to provide method called on response).
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
*/
class ResponseRequest {
public:
/**
* Get I/O ID.
* @return ioid
*/
virtual pvAccessID getIOID() = 0;
/**
* Timeout notification.
*/
virtual void timeout() = 0;
/**
* Cancel response request (always to be called to complete/destroy).
*/
virtual void cancel() = 0;
/**
* Report status to clients (e.g. disconnected).
* @param status to report.
*/
virtual void reportStatus(Status* status) = 0;
/**
* Get request requester.
* @return request requester.
*/
virtual Requester* getRequester() = 0;
};
// TODO consider std::unordered_map
typedef std::map<pvAccessID, ResponseRequest*> IOIDResponseRequestMap;
// TODO log
#define CALLBACK_GUARD(code) try { code } catch(...) { }
class ClientContextImpl;
/**
* CA response handler - main handler which dispatches responses to appripriate handlers.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: ClientResponseHandler.java,v 1.1 2010/05/03 14:45:40 mrkraimer Exp $
*/
class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoDefaultMethods {
private:
/**
* Table of response handlers for each command ID.
*/
ResponseHandler** m_handlerTable;
/**
* Context instance.
*/
ClientContextImpl* m_context;
public:
~ClientResponseHandler() {
delete[] m_handlerTable;
}
/**
* @param context
*/
ClientResponseHandler(ClientContextImpl* context) : m_context(context) {
static ResponseHandler* badResponse = 0; //new BadResponse(context);
static ResponseHandler* dataResponse = 0; //new DataResponseHandler(context);
#define HANDLER_COUNT 28
m_handlerTable = new ResponseHandler*[HANDLER_COUNT];
m_handlerTable[ 0] = badResponse; // TODO new BeaconHandler(context), /* 0 */
m_handlerTable[ 1] = badResponse; // TODO new ConnectionValidationHandler(context), /* 1 */
m_handlerTable[ 2] = badResponse; // TODO new NoopResponse(context, "Echo"), /* 2 */
m_handlerTable[ 3] = badResponse; // TODO new NoopResponse(context, "Search"), /* 3 */
m_handlerTable[ 4] = badResponse; // TODO new SearchResponseHandler(context), /* 4 */
m_handlerTable[ 5] = badResponse; // TODO new NoopResponse(context, "Introspection search"), /* 5 */
m_handlerTable[ 6] = dataResponse; /* 6 - introspection search */
m_handlerTable[ 7] = badResponse; // TODO new CreateChannelHandler(context), /* 7 */
m_handlerTable[ 8] = badResponse; // TODO new NoopResponse(context, "Destroy channel"), /* 8 */ // TODO it might be useful to implement this...
m_handlerTable[ 9] = badResponse; /* 9 */
m_handlerTable[10] = dataResponse; /* 10 - get response */
m_handlerTable[11] = dataResponse; /* 11 - put response */
m_handlerTable[12] = dataResponse; /* 12 - put-get response */
m_handlerTable[13] = dataResponse; /* 13 - monitor response */
m_handlerTable[14] = dataResponse; /* 14 - array response */
m_handlerTable[15] = badResponse; /* 15 - cancel request */
m_handlerTable[16] = dataResponse; /* 16 - process response */
m_handlerTable[17] = dataResponse; /* 17 - get field response */
m_handlerTable[18] = badResponse; // TODO new MessageHandler(context), /* 18 - message to Requester */
m_handlerTable[19] = badResponse; // TODO new MultipleDataResponseHandler(context), /* 19 - grouped monitors */
m_handlerTable[20] = dataResponse; /* 20 - RPC response */
m_handlerTable[21] = badResponse; /* 21 */
m_handlerTable[22] = badResponse; /* 22 */
m_handlerTable[23] = badResponse; /* 23 */
m_handlerTable[24] = badResponse; /* 24 */
m_handlerTable[25] = badResponse; /* 25 */
m_handlerTable[26] = badResponse; /* 26 */
m_handlerTable[27] = badResponse; /* 27 */
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport* transport, int8 version, int8 command,
int payloadSize, ByteBuffer* payloadBuffer)
{
if (command < 0 || command >= HANDLER_COUNT)
{
// TODO context.getLogger().fine("Invalid (or unsupported) command: " + command + ".");
std::cout << "Invalid (or unsupported) command: " << command << "." << std::endl;
// TODO remove debug output
char buf[100];
sprintf(buf, "Invalid CA header %d its payload buffer", command);
hexDump(buf, (const int8*)(payloadBuffer->getArray()), payloadBuffer->getPosition(), payloadSize);
return;
}
// delegate
m_handlerTable[command]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
}
};
#include <arrayFIFO.h>
class SearchInstance {
public:
virtual pvAccessID getChannelID() = 0;
virtual String getChannelName() = 0;
virtual void unsetListOwnership() = 0;
virtual void addAndSetListOwnership(ArrayFIFO<SearchInstance>* newOwner, int index) = 0;
virtual void removeAndUnsetListOwnership() = 0;
virtual int getOwnerIndex() = 0;
virtual bool generateSearchRequestMessage(ByteBuffer* buffer, TransportSendControl* control) = 0;
/**
* Search response from server (channel found).
* @param minorRevision server minor CA revision.
* @param serverAddress server address.
*/
virtual void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) = 0;
};
// TODO (only to make to make it compile)
class BaseSearchInstance : public SearchInstance
{
public:
virtual pvAccessID getChannelID() { return 0; }
virtual String getChannelName() { return ""; }
virtual void unsetListOwnership() {}
virtual void addAndSetListOwnership(ArrayFIFO<SearchInstance>* newOwner, int index) {}
virtual void removeAndUnsetListOwnership() {}
virtual int getOwnerIndex() { return 0; }
virtual bool generateSearchRequestMessage(ByteBuffer* buffer, TransportSendControl* control) { return false; };
};
class ChannelSearchManager { // tODO no default, etc.
public:
virtual void registerChannel(SearchInstance* channel) = 0;
virtual void unregisterChannel(SearchInstance* channel) = 0;
};
class BlockingTCPConnector;
class NamedLockPattern;
class BeaconHandlerImpl;
PVDATA_REFCOUNT_MONITOR_DEFINE(channel);
@@ -420,10 +598,10 @@ class ClientContextImpl : public ClientContext
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
*/
class ChannelImpl :
public Channel /*,
public Channel ,
public TransportClient,
public TransportSender,
public BaseSearchInstance */ {
public BaseSearchInstance {
private:
/**
@@ -469,13 +647,13 @@ class ChannelImpl :
/**
* Allow reconnection flag.
*/
bool m_allowCreation; // = true;
bool m_allowCreation;
/**
* Reference counting.
* NOTE: synced on <code>this</code>.
* NOTE: synced on <code>m_channelMutex</code>.
*/
int m_references; // = 1;
int m_references;
/* ****************** */
/* CA protocol fields */
@@ -491,6 +669,16 @@ class ChannelImpl :
*/
pvAccessID m_serverChannelID;
/**
* Context sync. mutex.
*/
Mutex m_channelMutex;
/**
* Flag indicting what message to send.
*/
bool m_issueCreateMessage;
// TODO mock
PVStructure* m_pvStructure;
@@ -525,17 +713,21 @@ class ChannelImpl :
m_addresses(addresses),
m_connectionState(NEVER_CONNECTED),
m_allowCreation(true),
m_serverChannelID(0xFFFFFFFF)
m_references(1),
m_transport(0),
m_serverChannelID(0xFFFFFFFF),
m_issueCreateMessage(true)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channel);
/*
// register before issuing search request
m_context->registerChannel(this);
// connect
connect();
*/
//
// mock
//
@@ -577,10 +769,19 @@ class ChannelImpl :
return m_context->getProvider();
}
// NOTE: synchronization guarantees that <code>transport</code> is non-<code>null</code> and <code>state == CONNECTED</code>.
virtual epics::pvData::String getRemoteAddress()
{
return "TODO";
}
Lock guard(&m_channelMutex);
if (m_connectionState != CONNECTED) {
static String emptyString;
return emptyString;
}
else
{
return inetAddressToString(m_transport->getRemoteAddress());
}
}
virtual epics::pvData::String getChannelName()
{
@@ -594,7 +795,8 @@ class ChannelImpl :
virtual ConnectionState getConnectionState()
{
return CONNECTED;
Lock guard(&m_channelMutex);
return m_connectionState;
}
virtual bool isConnected()
@@ -607,7 +809,394 @@ class ChannelImpl :
return readWrite;
}
virtual void getField(GetFieldRequester *requester,epics::pvData::String subField)
/**
* Get client channel ID.
* @return client channel ID.
*/
pvAccessID getChannelID() const {
return m_channelID;
}
void connect() {
Lock guard(&m_channelMutex);
// if not destroyed...
if (m_connectionState == DESTROYED)
throw std::runtime_error("Channel destroyed.");
else if (m_connectionState != CONNECTED)
initiateSearch();
}
void disconnect() {
Lock guard(&m_channelMutex);
// if not destroyed...
if (m_connectionState == DESTROYED)
throw std::runtime_error("Channel destroyed.");
else if (m_connectionState == CONNECTED)
disconnect(false, true);
}
/**
* Create a channel, i.e. submit create channel request to the server.
* This method is called after search is complete.
* @param transport
*/
void createChannel(Transport* transport)
{
Lock guard(&m_channelMutex);
// do not allow duplicate creation to the same transport
if (!m_allowCreation)
return;
m_allowCreation = false;
// check existing transport
if (m_transport && m_transport != transport)
{
disconnectPendingIO(false);
ReferenceCountingTransport* rct = dynamic_cast<ReferenceCountingTransport*>(m_transport);
if (rct) rct->release(this);
}
else if (m_transport == transport)
{
// request to sent create request to same transport, ignore
// this happens when server is slower (processing search requests) than client generating it
return;
}
m_transport = transport;
m_transport->enqueueSendRequest(this);
}
virtual void cancel() {
// noop
}
virtual void timeout() {
createChannelFailed();
}
/**
* Create channel failed.
*/
virtual void createChannelFailed()
{
Lock guard(&m_channelMutex);
cancel();
// ... and search again
initiateSearch();
}
/**
* Called when channel created succeeded on the server.
* <code>sid</code> might not be valid, this depends on protocol revision.
* @param sid
*/
void connectionCompleted(pvAccessID sid/*, rights*/)
{
Lock guard(&m_channelMutex);
bool allOK = false;
try
{
// do this silently
if (m_connectionState == DESTROYED)
return;
// store data
m_serverChannelID = sid;
//setAccessRights(rights);
// user might create monitors in listeners, so this has to be done before this can happen
// however, it would not be nice if events would come before connection event is fired
// but this cannot happen since transport (TCP) is serving in this thread
resubscribeSubscriptions();
setConnectionState(CONNECTED);
allOK = true;
}
catch (...) {
// noop
// TODO at least log something??
}
if (!allOK)
{
// end connection request
cancel();
}
}
/**
* @param force force destruction regardless of reference count
*/
void destroy(bool force) {
Lock guard(&m_channelMutex);
if (m_connectionState == DESTROYED)
throw std::runtime_error("Channel already destroyed.");
// do destruction via context
m_context->destroyChannel(this, force);
}
/**
* Increment reference.
*/
void acquire() {
Lock guard(&m_channelMutex);
m_references++;
}
/**
* Actual destroy method, to be called <code>CAJContext</code>.
* @param force force destruction regardless of reference count
* @throws CAException
* @throws IllegalStateException
* @throws IOException
*/
void destroyChannel(bool force) {
Lock guard(&m_channelMutex);
if (m_connectionState == DESTROYED)
throw std::runtime_error("Channel already destroyed.");
m_references--;
if (m_references > 0 && !force)
return;
// stop searching...
m_context->getChannelSearchManager()->unregisterChannel(this);
cancel();
disconnectPendingIO(true);
if (m_connectionState == CONNECTED)
{
disconnect(false, true);
}
else if (m_transport)
{
// unresponsive state, do not forget to release transport
ReferenceCountingTransport* rct = dynamic_cast<ReferenceCountingTransport*>(m_transport);
if (rct) rct->release(this);
m_transport = 0;
}
setConnectionState(DESTROYED);
// unregister
m_context->unregisterChannel(this);
}
/**
* Disconnected notification.
* @param initiateSearch flag to indicate if searching (connect) procedure should be initiated
* @param remoteDestroy issue channel destroy request.
*/
void disconnect(bool initiateSearch, bool remoteDestroy) {
Lock guard(&m_channelMutex);
if (m_connectionState != CONNECTED && !m_transport)
return;
if (!initiateSearch) {
// stop searching...
m_context->getChannelSearchManager()->unregisterChannel(this);
cancel();
}
setConnectionState(DISCONNECTED);
disconnectPendingIO(false);
// release transport
if (m_transport)
{
if (remoteDestroy) {
m_issueCreateMessage = false;
m_transport->enqueueSendRequest(this);
}
ReferenceCountingTransport* rct = dynamic_cast<ReferenceCountingTransport*>(m_transport);
if (rct) rct->release(this);
m_transport = 0;
}
if (initiateSearch)
this->initiateSearch();
}
/**
* Initiate search (connect) procedure.
*/
void initiateSearch()
{
Lock guard(&m_channelMutex);
m_allowCreation = true;
if (!m_addresses)
m_context->getChannelSearchManager()->registerChannel(this);
/* TODO
else
// TODO not only first
// TODO minor version
// TODO what to do if there is no channel, do not search in a loop!!! do this in other thread...!
searchResponse(CAConstants.CA_MINOR_PROTOCOL_REVISION, addresses[0]);
*/
}
virtual void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) {
Lock guard(&m_channelMutex);
Transport* transport = m_transport;
if (transport)
{
// multiple defined PV or reconnect request (same server address)
// TOD !!!! if (!(*(transport->getRemoteAddress()) == *serverAddress))
if (false)
{
m_requester->message("More than one channel with name '" + m_name +
"' detected, additional response from: " + inetAddressToString(serverAddress), warningMessage);
return;
}
}
transport = m_context->getTransport(this, serverAddress, minorRevision, m_priority);
if (!transport)
{
createChannelFailed();
return;
}
// create channel
createChannel(transport);
}
virtual void transportClosed() {
disconnect(true, false);
}
virtual void transportChanged() {
initiateSearch();
}
virtual void transportResponsive(Transport* transport) {
Lock guard(&m_channelMutex);
if (m_connectionState == DISCONNECTED)
{
updateSubscriptions();
// reconnect using existing IDs, data
connectionCompleted(m_serverChannelID/*, accessRights*/);
}
}
void transportUnresponsive() {
Lock guard(&m_channelMutex);
if (m_connectionState == CONNECTED)
{
// NOTE: 2 types of disconnected state - distinguish them
setConnectionState(DISCONNECTED);
// ... CA notifies also w/ no access rights callback, although access right are not changed
}
}
/**
* Set connection state and if changed, notifies listeners.
* @param newState state to set.
*/
void setConnectionState(ConnectionState connectionState)
{
Lock guard(&m_channelMutex);
if (m_connectionState != connectionState)
{
m_connectionState = connectionState;
//bool connectionStatusToReport = (connectionState == CONNECTED);
//if (connectionStatusToReport != lastReportedConnectionState)
{
//lastReportedConnectionState = connectionStatusToReport;
// TODO via dispatcher ?!!!
m_requester->channelStateChange(this, connectionState);
}
}
}
virtual void lock() {
// noop
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
m_channelMutex.lock();
bool issueCreateMessage = m_issueCreateMessage;
m_channelMutex.unlock();
if (issueCreateMessage)
{
control->startMessage((int8)7, 2+4);
// count
buffer->putShort((int16)1);
// array of CIDs and names
buffer->putInt(m_channelID);
SerializeHelper::serializeString(m_name, buffer, control);
// send immediately
// TODO
control->flush(true);
}
else
{
control->startMessage((int8)8, 4+4);
// SID
m_channelMutex.lock();
pvAccessID sid = m_serverChannelID;
m_channelMutex.unlock();
buffer->putInt(sid);
// CID
buffer->putInt(m_channelID);
// send immediately
// TODO
control->flush(true);
}
}
virtual void unlock() {
// noop
}
/**
* Disconnects (destroys) all channels pending IO.
* @param destroy <code>true</code> if channel is being destroyed.
*/
void disconnectPendingIO(bool destroy)
{
// TODO
}
/**
* Resubscribe subscriptions.
*/
// TODO to be called from non-transport thread !!!!!!
void resubscribeSubscriptions()
{
// TODO
}
/**
* Update subscriptions.
*/
// TODO to be called from non-transport thread !!!!!!
void updateSubscriptions()
{
// TODO
}
virtual void getField(GetFieldRequester *requester,epics::pvData::String subField)
{
requester->getDone(getStatusCreate()->getStatusOK(),m_pvStructure->getSubField(subField)->getField());
}
@@ -663,6 +1252,8 @@ class ChannelImpl :
return 0;
}
virtual void printInfo() {
String info;
printInfo(&info);
@@ -969,6 +1560,26 @@ class ChannelImpl :
initialize();
}
/**
* Register channel.
* @param channel
*/
void registerChannel(ChannelImpl* channel)
{
Lock guard(&m_cidMapMutex);
m_channelsByCID[channel->getChannelID()] = channel;
}
/**
* Unregister channel.
* @param channel
*/
void unregisterChannel(ChannelImpl* channel)
{
Lock guard(&m_cidMapMutex);
m_channelsByCID.erase(channel->getChannelID());
}
/**
* Searches for a channel with given channel ID.
* @param channelID CID.
@@ -980,7 +1591,6 @@ class ChannelImpl :
CIDChannelMap::iterator it = m_channelsByCID.find(channelID);
return (it == m_channelsByCID.end() ? 0 : it->second);
}
/**
* Generate Client channel ID (CID).
@@ -1006,7 +1616,31 @@ class ChannelImpl :
m_channelsByCID.erase(cid);
}
/**
* Get, or create if necessary, transport of given server address.
* @param serverAddress required transport address
* @param priority process priority.
* @return transport for given address
*/
Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int16 minorRevision, int16 priority)
{
// TODO !!!
/*
try
{
return connector->connect(client, new ClientResponseHandler(this), serverAddress, minorRevision, priority);
}
catch (ConnectionException cex)
{
logger.log(Level.SEVERE, "Failed to create transport for: " + serverAddress, cex);
}
*/
return 0;
}
/**
* Internal create channel.
*/
// TODO no minor version with the addresses
@@ -1039,10 +1673,48 @@ class ChannelImpl :
}
else
{
// TODO is this OK?
throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock.");
}
}
/**
* Destroy channel.
* @param channel
* @param force
* @throws CAException
* @throws IllegalStateException
*/
void destroyChannel(ChannelImpl* channel, bool force) {
String name = channel->getChannelName();
bool lockAcquired = true; //namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT);
if (lockAcquired)
{
try
{
channel->destroyChannel(force);
}
catch(...) {
// TODO
}
// TODO namedLocker->releaseSynchronizationObject(channel.getChannelName());
}
else
{
// TODO is this OK?
throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock.");
}
}
/**
* Get channel search manager.
* @return channel search manager.
*/
ChannelSearchManager* getChannelSearchManager() {
return m_channelSearchManager;
}
/**
* A space-separated list of broadcast address for process variable name resolution.
* Each address must be of the form: ip.number:port or host.name:port
@@ -1174,7 +1846,7 @@ class ChannelImpl :
* Context sync. mutex.
*/
Mutex m_contextMutex;
friend class ChannelProviderImpl;
};