From bf5e3f3e1969321df6f9a8bbb024c330f3bb9285 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 5 Jan 2011 10:14:18 +0100 Subject: [PATCH] ChannelSearchManager mocks --- testApp/remote/testRemoteClientImpl.cpp | 714 +++++++++++++++++++++++- 1 file changed, 693 insertions(+), 21 deletions(-) diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 12135f8..11893a8 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -13,6 +13,8 @@ #include #include #include +#include +#include using namespace epics::pvData; using namespace epics::pvAccess; @@ -370,24 +372,200 @@ class MockMonitor : public Monitor, public MonitorElement }; -class ChannelSearchManager; -class BlockingTCPConnector; -class NamedLockPattern; -class ResponseRequest; -class BeaconHandlerImpl; - -class ClientContextImpl; typedef int pvAccessID; + + +/** + * A request that expects an response. + * Responses identified by its I/O ID. + * This interface needs to be extended (to provide method called on response). + * @author Matej Sekoranja + */ +class ResponseRequest { + public: + + /** + * Get I/O ID. + * @return ioid + */ + virtual pvAccessID getIOID() = 0; + + /** + * Timeout notification. + */ + virtual void timeout() = 0; + + /** + * Cancel response request (always to be called to complete/destroy). + */ + virtual void cancel() = 0; + + /** + * Report status to clients (e.g. disconnected). + * @param status to report. + */ + virtual void reportStatus(Status* status) = 0; + + /** + * Get request requester. + * @return request requester. + */ + virtual Requester* getRequester() = 0; +}; + + // TODO consider std::unordered_map typedef std::map IOIDResponseRequestMap; + // TODO log #define CALLBACK_GUARD(code) try { code } catch(...) { } +class ClientContextImpl; + + + +/** + * CA response handler - main handler which dispatches responses to appripriate handlers. + * @author Matej Sekoranja + * @version $Id: ClientResponseHandler.java,v 1.1 2010/05/03 14:45:40 mrkraimer Exp $ + */ +class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoDefaultMethods { + private: + + /** + * Table of response handlers for each command ID. + */ + ResponseHandler** m_handlerTable; + + /** + * Context instance. + */ + ClientContextImpl* m_context; + + public: + + ~ClientResponseHandler() { + delete[] m_handlerTable; + } + + /** + * @param context + */ + ClientResponseHandler(ClientContextImpl* context) : m_context(context) { + static ResponseHandler* badResponse = 0; //new BadResponse(context); + static ResponseHandler* dataResponse = 0; //new DataResponseHandler(context); + + #define HANDLER_COUNT 28 + m_handlerTable = new ResponseHandler*[HANDLER_COUNT]; + m_handlerTable[ 0] = badResponse; // TODO new BeaconHandler(context), /* 0 */ + m_handlerTable[ 1] = badResponse; // TODO new ConnectionValidationHandler(context), /* 1 */ + m_handlerTable[ 2] = badResponse; // TODO new NoopResponse(context, "Echo"), /* 2 */ + m_handlerTable[ 3] = badResponse; // TODO new NoopResponse(context, "Search"), /* 3 */ + m_handlerTable[ 4] = badResponse; // TODO new SearchResponseHandler(context), /* 4 */ + m_handlerTable[ 5] = badResponse; // TODO new NoopResponse(context, "Introspection search"), /* 5 */ + m_handlerTable[ 6] = dataResponse; /* 6 - introspection search */ + m_handlerTable[ 7] = badResponse; // TODO new CreateChannelHandler(context), /* 7 */ + m_handlerTable[ 8] = badResponse; // TODO new NoopResponse(context, "Destroy channel"), /* 8 */ // TODO it might be useful to implement this... + m_handlerTable[ 9] = badResponse; /* 9 */ + m_handlerTable[10] = dataResponse; /* 10 - get response */ + m_handlerTable[11] = dataResponse; /* 11 - put response */ + m_handlerTable[12] = dataResponse; /* 12 - put-get response */ + m_handlerTable[13] = dataResponse; /* 13 - monitor response */ + m_handlerTable[14] = dataResponse; /* 14 - array response */ + m_handlerTable[15] = badResponse; /* 15 - cancel request */ + m_handlerTable[16] = dataResponse; /* 16 - process response */ + m_handlerTable[17] = dataResponse; /* 17 - get field response */ + m_handlerTable[18] = badResponse; // TODO new MessageHandler(context), /* 18 - message to Requester */ + m_handlerTable[19] = badResponse; // TODO new MultipleDataResponseHandler(context), /* 19 - grouped monitors */ + m_handlerTable[20] = dataResponse; /* 20 - RPC response */ + m_handlerTable[21] = badResponse; /* 21 */ + m_handlerTable[22] = badResponse; /* 22 */ + m_handlerTable[23] = badResponse; /* 23 */ + m_handlerTable[24] = badResponse; /* 24 */ + m_handlerTable[25] = badResponse; /* 25 */ + m_handlerTable[26] = badResponse; /* 26 */ + m_handlerTable[27] = badResponse; /* 27 */ + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) + { + if (command < 0 || command >= HANDLER_COUNT) + { + // TODO context.getLogger().fine("Invalid (or unsupported) command: " + command + "."); + std::cout << "Invalid (or unsupported) command: " << command << "." << std::endl; + // TODO remove debug output + char buf[100]; + sprintf(buf, "Invalid CA header %d its payload buffer", command); + hexDump(buf, (const int8*)(payloadBuffer->getArray()), payloadBuffer->getPosition(), payloadSize); + return; + } + + // delegate + m_handlerTable[command]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + } +}; + + + + +#include + +class SearchInstance { + public: + + virtual pvAccessID getChannelID() = 0; + virtual String getChannelName() = 0; + virtual void unsetListOwnership() = 0; + virtual void addAndSetListOwnership(ArrayFIFO* newOwner, int index) = 0; + virtual void removeAndUnsetListOwnership() = 0; + virtual int getOwnerIndex() = 0; + virtual bool generateSearchRequestMessage(ByteBuffer* buffer, TransportSendControl* control) = 0; + + /** + * Search response from server (channel found). + * @param minorRevision server minor CA revision. + * @param serverAddress server address. + */ + virtual void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) = 0; +}; + +// TODO (only to make to make it compile) +class BaseSearchInstance : public SearchInstance +{ + public: + virtual pvAccessID getChannelID() { return 0; } + virtual String getChannelName() { return ""; } + virtual void unsetListOwnership() {} + virtual void addAndSetListOwnership(ArrayFIFO* newOwner, int index) {} + virtual void removeAndUnsetListOwnership() {} + virtual int getOwnerIndex() { return 0; } + + virtual bool generateSearchRequestMessage(ByteBuffer* buffer, TransportSendControl* control) { return false; }; +}; + +class ChannelSearchManager { // tODO no default, etc. + public: + virtual void registerChannel(SearchInstance* channel) = 0; + virtual void unregisterChannel(SearchInstance* channel) = 0; +}; + + + +class BlockingTCPConnector; +class NamedLockPattern; +class BeaconHandlerImpl; + + + + + PVDATA_REFCOUNT_MONITOR_DEFINE(channel); @@ -420,10 +598,10 @@ class ClientContextImpl : public ClientContext * @author Matej Sekoranja */ class ChannelImpl : - public Channel /*, + public Channel , public TransportClient, public TransportSender, - public BaseSearchInstance */ { + public BaseSearchInstance { private: /** @@ -469,13 +647,13 @@ class ChannelImpl : /** * Allow reconnection flag. */ - bool m_allowCreation; // = true; + bool m_allowCreation; /** * Reference counting. - * NOTE: synced on this. + * NOTE: synced on m_channelMutex. */ - int m_references; // = 1; + int m_references; /* ****************** */ /* CA protocol fields */ @@ -491,6 +669,16 @@ class ChannelImpl : */ pvAccessID m_serverChannelID; + + /** + * Context sync. mutex. + */ + Mutex m_channelMutex; + + /** + * Flag indicting what message to send. + */ + bool m_issueCreateMessage; // TODO mock PVStructure* m_pvStructure; @@ -525,17 +713,21 @@ class ChannelImpl : m_addresses(addresses), m_connectionState(NEVER_CONNECTED), m_allowCreation(true), - m_serverChannelID(0xFFFFFFFF) + m_references(1), + m_transport(0), + m_serverChannelID(0xFFFFFFFF), + m_issueCreateMessage(true) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channel); - /* // register before issuing search request m_context->registerChannel(this); // connect connect(); - */ + + + // // mock // @@ -577,10 +769,19 @@ class ChannelImpl : return m_context->getProvider(); } + // NOTE: synchronization guarantees that transport is non-null and state == CONNECTED. virtual epics::pvData::String getRemoteAddress() { - return "TODO"; - } + Lock guard(&m_channelMutex); + if (m_connectionState != CONNECTED) { + static String emptyString; + return emptyString; + } + else + { + return inetAddressToString(m_transport->getRemoteAddress()); + } + } virtual epics::pvData::String getChannelName() { @@ -594,7 +795,8 @@ class ChannelImpl : virtual ConnectionState getConnectionState() { - return CONNECTED; + Lock guard(&m_channelMutex); + return m_connectionState; } virtual bool isConnected() @@ -607,7 +809,394 @@ class ChannelImpl : return readWrite; } - virtual void getField(GetFieldRequester *requester,epics::pvData::String subField) + /** + * Get client channel ID. + * @return client channel ID. + */ + pvAccessID getChannelID() const { + return m_channelID; + } + + void connect() { + Lock guard(&m_channelMutex); + // if not destroyed... + if (m_connectionState == DESTROYED) + throw std::runtime_error("Channel destroyed."); + else if (m_connectionState != CONNECTED) + initiateSearch(); + } + + void disconnect() { + Lock guard(&m_channelMutex); + // if not destroyed... + if (m_connectionState == DESTROYED) + throw std::runtime_error("Channel destroyed."); + else if (m_connectionState == CONNECTED) + disconnect(false, true); + } + + /** + * Create a channel, i.e. submit create channel request to the server. + * This method is called after search is complete. + * @param transport + */ + void createChannel(Transport* transport) + { + Lock guard(&m_channelMutex); + + // do not allow duplicate creation to the same transport + if (!m_allowCreation) + return; + m_allowCreation = false; + + // check existing transport + if (m_transport && m_transport != transport) + { + disconnectPendingIO(false); + + ReferenceCountingTransport* rct = dynamic_cast(m_transport); + if (rct) rct->release(this); + } + else if (m_transport == transport) + { + // request to sent create request to same transport, ignore + // this happens when server is slower (processing search requests) than client generating it + return; + } + + m_transport = transport; + m_transport->enqueueSendRequest(this); + } + + virtual void cancel() { + // noop + } + + virtual void timeout() { + createChannelFailed(); + } + + /** + * Create channel failed. + */ + virtual void createChannelFailed() + { + Lock guard(&m_channelMutex); + + cancel(); + // ... and search again + initiateSearch(); + } + + /** + * Called when channel created succeeded on the server. + * sid might not be valid, this depends on protocol revision. + * @param sid + */ + void connectionCompleted(pvAccessID sid/*, rights*/) + { + Lock guard(&m_channelMutex); + + bool allOK = false; + try + { + // do this silently + if (m_connectionState == DESTROYED) + return; + + // store data + m_serverChannelID = sid; + //setAccessRights(rights); + + // user might create monitors in listeners, so this has to be done before this can happen + // however, it would not be nice if events would come before connection event is fired + // but this cannot happen since transport (TCP) is serving in this thread + resubscribeSubscriptions(); + setConnectionState(CONNECTED); + allOK = true; + } + catch (...) { + // noop + // TODO at least log something?? + } + + if (!allOK) + { + // end connection request + cancel(); + } + } + + /** + * @param force force destruction regardless of reference count + */ + void destroy(bool force) { + Lock guard(&m_channelMutex); + if (m_connectionState == DESTROYED) + throw std::runtime_error("Channel already destroyed."); + + // do destruction via context + m_context->destroyChannel(this, force); + + } + + /** + * Increment reference. + */ + void acquire() { + Lock guard(&m_channelMutex); + m_references++; + } + + /** + * Actual destroy method, to be called CAJContext. + * @param force force destruction regardless of reference count + * @throws CAException + * @throws IllegalStateException + * @throws IOException + */ + void destroyChannel(bool force) { + Lock guard(&m_channelMutex); + + if (m_connectionState == DESTROYED) + throw std::runtime_error("Channel already destroyed."); + + m_references--; + if (m_references > 0 && !force) + return; + + // stop searching... + m_context->getChannelSearchManager()->unregisterChannel(this); + cancel(); + + disconnectPendingIO(true); + + if (m_connectionState == CONNECTED) + { + disconnect(false, true); + } + else if (m_transport) + { + // unresponsive state, do not forget to release transport + ReferenceCountingTransport* rct = dynamic_cast(m_transport); + if (rct) rct->release(this); + m_transport = 0; + } + + setConnectionState(DESTROYED); + + // unregister + m_context->unregisterChannel(this); + } + + /** + * Disconnected notification. + * @param initiateSearch flag to indicate if searching (connect) procedure should be initiated + * @param remoteDestroy issue channel destroy request. + */ + void disconnect(bool initiateSearch, bool remoteDestroy) { + Lock guard(&m_channelMutex); + + if (m_connectionState != CONNECTED && !m_transport) + return; + + if (!initiateSearch) { + // stop searching... + m_context->getChannelSearchManager()->unregisterChannel(this); + cancel(); + } + setConnectionState(DISCONNECTED); + + disconnectPendingIO(false); + + // release transport + if (m_transport) + { + if (remoteDestroy) { + m_issueCreateMessage = false; + m_transport->enqueueSendRequest(this); + } + + ReferenceCountingTransport* rct = dynamic_cast(m_transport); + if (rct) rct->release(this); + m_transport = 0; + } + + if (initiateSearch) + this->initiateSearch(); + + } + + /** + * Initiate search (connect) procedure. + */ + void initiateSearch() + { + Lock guard(&m_channelMutex); + + m_allowCreation = true; + + if (!m_addresses) + m_context->getChannelSearchManager()->registerChannel(this); + /* TODO + else + // TODO not only first + // TODO minor version + // TODO what to do if there is no channel, do not search in a loop!!! do this in other thread...! + searchResponse(CAConstants.CA_MINOR_PROTOCOL_REVISION, addresses[0]); + */ + } + + virtual void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) { + Lock guard(&m_channelMutex); + Transport* transport = m_transport; + if (transport) + { + // multiple defined PV or reconnect request (same server address) + // TOD !!!! if (!(*(transport->getRemoteAddress()) == *serverAddress)) + if (false) + { + m_requester->message("More than one channel with name '" + m_name + + "' detected, additional response from: " + inetAddressToString(serverAddress), warningMessage); + return; + } + } + + transport = m_context->getTransport(this, serverAddress, minorRevision, m_priority); + if (!transport) + { + createChannelFailed(); + return; + } + + // create channel + createChannel(transport); + } + + virtual void transportClosed() { + disconnect(true, false); + } + + virtual void transportChanged() { + initiateSearch(); + } + + virtual void transportResponsive(Transport* transport) { + Lock guard(&m_channelMutex); + if (m_connectionState == DISCONNECTED) + { + updateSubscriptions(); + + // reconnect using existing IDs, data + connectionCompleted(m_serverChannelID/*, accessRights*/); + } + } + + void transportUnresponsive() { + Lock guard(&m_channelMutex); + if (m_connectionState == CONNECTED) + { + // NOTE: 2 types of disconnected state - distinguish them + setConnectionState(DISCONNECTED); + + // ... CA notifies also w/ no access rights callback, although access right are not changed + } + } + + /** + * Set connection state and if changed, notifies listeners. + * @param newState state to set. + */ + void setConnectionState(ConnectionState connectionState) + { + Lock guard(&m_channelMutex); + if (m_connectionState != connectionState) + { + m_connectionState = connectionState; + + //bool connectionStatusToReport = (connectionState == CONNECTED); + //if (connectionStatusToReport != lastReportedConnectionState) + { + //lastReportedConnectionState = connectionStatusToReport; + // TODO via dispatcher ?!!! + m_requester->channelStateChange(this, connectionState); + } + } + } + + virtual void lock() { + // noop + } + + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + m_channelMutex.lock(); + bool issueCreateMessage = m_issueCreateMessage; + m_channelMutex.unlock(); + + if (issueCreateMessage) + { + control->startMessage((int8)7, 2+4); + + // count + buffer->putShort((int16)1); + // array of CIDs and names + buffer->putInt(m_channelID); + SerializeHelper::serializeString(m_name, buffer, control); + // send immediately + // TODO + control->flush(true); + } + else + { + control->startMessage((int8)8, 4+4); + // SID + m_channelMutex.lock(); + pvAccessID sid = m_serverChannelID; + m_channelMutex.unlock(); + buffer->putInt(sid); + // CID + buffer->putInt(m_channelID); + // send immediately + // TODO + control->flush(true); + } + } + + virtual void unlock() { + // noop + } + + + /** + * Disconnects (destroys) all channels pending IO. + * @param destroy true if channel is being destroyed. + */ + void disconnectPendingIO(bool destroy) + { +// TODO + } + + /** + * Resubscribe subscriptions. + */ + // TODO to be called from non-transport thread !!!!!! + void resubscribeSubscriptions() + { +// TODO + } + + /** + * Update subscriptions. + */ + // TODO to be called from non-transport thread !!!!!! + void updateSubscriptions() + { +// TODO + } + + + virtual void getField(GetFieldRequester *requester,epics::pvData::String subField) { requester->getDone(getStatusCreate()->getStatusOK(),m_pvStructure->getSubField(subField)->getField()); } @@ -663,6 +1252,8 @@ class ChannelImpl : return 0; } + + virtual void printInfo() { String info; printInfo(&info); @@ -969,6 +1560,26 @@ class ChannelImpl : initialize(); } + /** + * Register channel. + * @param channel + */ + void registerChannel(ChannelImpl* channel) + { + Lock guard(&m_cidMapMutex); + m_channelsByCID[channel->getChannelID()] = channel; + } + + /** + * Unregister channel. + * @param channel + */ + void unregisterChannel(ChannelImpl* channel) + { + Lock guard(&m_cidMapMutex); + m_channelsByCID.erase(channel->getChannelID()); + } + /** * Searches for a channel with given channel ID. * @param channelID CID. @@ -980,7 +1591,6 @@ class ChannelImpl : CIDChannelMap::iterator it = m_channelsByCID.find(channelID); return (it == m_channelsByCID.end() ? 0 : it->second); } - /** * Generate Client channel ID (CID). @@ -1006,7 +1616,31 @@ class ChannelImpl : m_channelsByCID.erase(cid); } + /** + * Get, or create if necessary, transport of given server address. + * @param serverAddress required transport address + * @param priority process priority. + * @return transport for given address + */ + Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int16 minorRevision, int16 priority) + { + // TODO !!! + /* + try + { + return connector->connect(client, new ClientResponseHandler(this), serverAddress, minorRevision, priority); + } + catch (ConnectionException cex) + { + logger.log(Level.SEVERE, "Failed to create transport for: " + serverAddress, cex); + } + */ + return 0; + + } + + /** * Internal create channel. */ // TODO no minor version with the addresses @@ -1039,10 +1673,48 @@ class ChannelImpl : } else { + // TODO is this OK? throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock."); } } + /** + * Destroy channel. + * @param channel + * @param force + * @throws CAException + * @throws IllegalStateException + */ + void destroyChannel(ChannelImpl* channel, bool force) { + + String name = channel->getChannelName(); + bool lockAcquired = true; //namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT); + if (lockAcquired) + { + try + { + channel->destroyChannel(force); + } + catch(...) { + // TODO + } + // TODO namedLocker->releaseSynchronizationObject(channel.getChannelName()); + } + else + { + // TODO is this OK? + throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock."); + } + } + + /** + * Get channel search manager. + * @return channel search manager. + */ + ChannelSearchManager* getChannelSearchManager() { + return m_channelSearchManager; + } + /** * A space-separated list of broadcast address for process variable name resolution. * Each address must be of the form: ip.number:port or host.name:port @@ -1174,7 +1846,7 @@ class ChannelImpl : * Context sync. mutex. */ Mutex m_contextMutex; - + friend class ChannelProviderImpl; };