diff --git a/.hgignore b/.hgignore index 2090ed3..5be0514 100644 --- a/.hgignore +++ b/.hgignore @@ -1,10 +1,10 @@ QtC-pvAccess.creator.user syntax: glob -O.Common -O.linux-x86 +O.* +.DS_Store syntax: regexp ^bin ^include - +^lib diff --git a/pvAccessApp/ca/caConstants.h b/pvAccessApp/ca/caConstants.h index b3ef88b..999d5f2 100644 --- a/pvAccessApp/ca/caConstants.h +++ b/pvAccessApp/ca/caConstants.h @@ -68,7 +68,7 @@ namespace epics { const int16 CA_DEFAULT_PRIORITY = 0; /** Unreasonable channel name length. */ - const int32 UNREASONABLE_CHANNEL_NAME_LENGTH = 500; + const uint32 UNREASONABLE_CHANNEL_NAME_LENGTH = 500; /** Invalid data type. */ const int16 INVALID_DATA_TYPE = (int16)0xFFFF; diff --git a/pvAccessApp/client/pvAccess.h b/pvAccessApp/client/pvAccess.h index da230c3..ac9ea13 100644 --- a/pvAccessApp/client/pvAccess.h +++ b/pvAccessApp/client/pvAccess.h @@ -583,6 +583,17 @@ namespace epics { namespace pvAccess { virtual ChannelArray* createChannelArray( ChannelArrayRequester *channelArrayRequester, epics::pvData::PVStructure *pvRequest) = 0; + + /** + * Prints detailed information about the context to the standard output stream. + */ + virtual void printInfo() = 0; + + /** + * Prints detailed information about the context to the specified output stream. + * @param out the output stream. + */ + virtual void printInfo(epics::pvData::StringBuilder out) = 0; }; diff --git a/testApp/client/MockClientImpl.cpp b/testApp/client/MockClientImpl.cpp index bb17553..44f76bb 100644 --- a/testApp/client/MockClientImpl.cpp +++ b/testApp/client/MockClientImpl.cpp @@ -520,6 +520,26 @@ class MockChannel : public Channel { // TODO return 0; } + + virtual void printInfo() { + String info; + printInfo(&info); + std::cout << info.c_str() << std::endl; + } + + virtual void printInfo(epics::pvData::StringBuilder out) { + //std::ostringstream ostr; + //static String emptyString; + + out->append( "CHANNEL : "); out->append(m_name); + out->append("\nSTATE : "); out->append(ConnectionStateNames[getConnectionState()]); + if (isConnected()) + { + out->append("\nADDRESS : "); out->append(getRemoteAddress()); + //out->append("\nRIGHTS : "); out->append(getAccessRights()); + } + out->append("\n"); + } }; class MockChannelProvider; @@ -905,7 +925,7 @@ int main(int argc,char *argv[]) context->getProvider()->createChannel("test", &channelRequester, ChannelProvider::PRIORITY_DEFAULT, "over the rainbow"); Channel* channel = context->getProvider()->createChannel("test", &channelRequester); - std::cout << channel->getChannelName() << std::endl; + channel->printInfo(); GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch"); diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 237e0f1..dd6d10d 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -11,6 +12,7 @@ #include #include #include +#include using namespace epics::pvData; using namespace epics::pvAccess; @@ -369,42 +371,176 @@ class MockMonitor : public Monitor, public MonitorElement }; +class TransportRegistry; +class ChannelSearchManager; +class BlockingTCPConnector; +class NamedLockPattern; +class ResponseRequest; +class BeaconHandlerImpl; + +class ClientContextImpl; +typedef int pvAccessID; + // TODO consider std::unordered_map +typedef std::map IOIDResponseRequestMap; -PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannel); +// TODO log +#define CALLBACK_GUARD(code) try { code } catch(...) { } -class ChannelImpl : public Channel { + +PVDATA_REFCOUNT_MONITOR_DEFINE(channel); + + +/** + * Context state enum. + */ +enum ContextState { + /** + * State value of non-initialized context. + */ + CONTEXT_NOT_INITIALIZED, + + /** + * State value of initialized context. + */ + CONTEXT_INITIALIZED, + + /** + * State value of destroyed context. + */ + CONTEXT_DESTROYED +}; + + +class ClientContextImpl : public ClientContext +{ + +/** + * Implementation of CAJ JCA Channel. + * @author Matej Sekoranja + */ +class ChannelImpl : + public Channel /*, + public TransportClient, + public TransportSender, + public BaseSearchInstance */ { private: - ChannelProvider* m_provider; - ChannelRequester* m_requester; - String m_name; - String m_remoteAddress; + /** + * Context. + */ + ClientContextImpl* m_context; + + /** + * Client channel ID. + */ + pvAccessID m_channelID; + + /** + * Channel name. + */ + String m_name; + + /** + * Channel requester. + */ + ChannelRequester* m_requester; + + /** + * Process priority. + */ + short m_priority; + + /** + * List of fixed addresses, if name resolution will be used. + */ + InetAddrVector* m_addresses; + + /** + * Connection status. + */ + ConnectionState m_connectionState; + + /** + * List of all channel's pending requests (keys are subscription IDs). + */ + IOIDResponseRequestMap m_responseRequests; + + /** + * Allow reconnection flag. + */ + bool m_allowCreation; // = true; + + /** + * Reference counting. + * NOTE: synced on this. + */ + int m_references; // = 1; + + /* ****************** */ + /* CA protocol fields */ + /* ****************** */ + + /** + * Server transport. + */ + Transport* m_transport; + + /** + * Server channel ID. + */ + pvAccessID m_serverChannelID; + + + // TODO mock PVStructure* m_pvStructure; private: - ~ChannelImpl() + ~ChannelImpl() { - PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockChannel); + PVDATA_REFCOUNT_MONITOR_DESTRUCT(channel); } public: + /** + * Constructor. + * @param context + * @param name + * @param listener + * @throws CAException + */ ChannelImpl( - ChannelProvider* provider, - ChannelRequester* requester, - String name, - String remoteAddress) : - m_provider(provider), - m_requester(requester), + ClientContextImpl* context, + pvAccessID channelID, + String name, + ChannelRequester* requester, + short priority, + InetAddrVector* addresses) : + m_context(context), + m_channelID(channelID), m_name(name), - m_remoteAddress(remoteAddress) + m_requester(requester), + m_priority(priority), + m_addresses(addresses), + m_connectionState(NEVER_CONNECTED), + m_allowCreation(true), + m_serverChannelID(0xFFFFFFFF) { - PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannel); - + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channel); + /* + // register before issuing search request + m_context->registerChannel(this); + + // connect + connect(); + */ + // + // mock + // ScalarType stype = pvDouble; String allProperties("alarm,timeStamp,display,control,valueAlarm"); @@ -416,10 +552,14 @@ class ChannelImpl : public Channel { // already connected, report state m_requester->channelStateChange(this, CONNECTED); + + + } virtual void destroy() { + if (m_addresses) delete m_addresses; delete m_pvStructure; delete this; }; @@ -436,12 +576,12 @@ class ChannelImpl : public Channel { virtual ChannelProvider* getProvider() { - return m_provider; + return m_context->getProvider(); } virtual epics::pvData::String getRemoteAddress() { - return m_remoteAddress; + return "TODO"; } virtual epics::pvData::String getChannelName() @@ -524,112 +664,124 @@ class ChannelImpl : public Channel { // TODO return 0; } + + virtual void printInfo() { + String info; + printInfo(&info); + std::cout << info.c_str() << std::endl; + } + + virtual void printInfo(epics::pvData::StringBuilder out) { + //Lock lock(&m_channelMutex); + //std::ostringstream ostr; + //static String emptyString; + + out->append( "CHANNEL : "); out->append(m_name); + out->append("\nSTATE : "); out->append(ConnectionStateNames[m_connectionState]); + if (m_connectionState == CONNECTED) + { + out->append("\nADDRESS : "); out->append(getRemoteAddress()); + //out->append("\nRIGHTS : "); out->append(getAccessRights()); + } + out->append("\n"); + } }; - -class ChannelProviderImpl; - -class ChannelImplFind : public ChannelFind -{ - public: - ChannelImplFind(ChannelProvider* provider) : m_provider(provider) - { - } - - virtual void destroy() - { - // one instance for all, do not delete at all - } - - virtual ChannelProvider* getChannelProvider() - { - return m_provider; - }; - virtual void cancelChannelFind() - { - throw std::runtime_error("not supported"); - } - - private: - - // only to be destroyed by it - friend class ChannelProviderImpl; - virtual ~ChannelImplFind() {} - - ChannelProvider* m_provider; -}; -class ChannelProviderImpl : public ChannelProvider { - public: - - ChannelProviderImpl() : m_mockChannelFind(new ChannelImplFind(this)) { - } - - virtual epics::pvData::String getProviderName() - { - return "ChannelProviderImpl"; - } + class ChannelProviderImpl; - virtual void destroy() + class ChannelImplFind : public ChannelFind { - delete m_mockChannelFind; - delete this; - } - - virtual ChannelFind* channelFind( - epics::pvData::String channelName, - ChannelFindRequester *channelFindRequester) - { - channelFindRequester->channelFindResult(getStatusCreate()->getStatusOK(), m_mockChannelFind, true); - return m_mockChannelFind; - } - - virtual Channel* createChannel( - epics::pvData::String channelName, - ChannelRequester *channelRequester, - short priority) - { - return createChannel(channelName, channelRequester, priority, "local"); - } - - virtual Channel* createChannel( - epics::pvData::String channelName, - ChannelRequester *channelRequester, - short priority, - epics::pvData::String address) - { - if (address == "local") + public: + ChannelImplFind(ChannelProvider* provider) : m_provider(provider) { - Channel* channel = new ChannelImpl(this, channelRequester, channelName, address); - channelRequester->channelCreated(getStatusCreate()->getStatusOK(), channel); - return channel; } - else - { - Status* errorStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "only local supported", 0); - channelRequester->channelCreated(errorStatus, 0); - delete errorStatus; // TODO guard from CB + + virtual void destroy() + { + // one instance for all, do not delete at all + } + + virtual ChannelProvider* getChannelProvider() + { + return m_provider; + }; + + virtual void cancelChannelFind() + { + throw std::runtime_error("not supported"); + } + + private: + + // only to be destroyed by it + friend class ChannelProviderImpl; + virtual ~ChannelImplFind() {} + + ChannelProvider* m_provider; + }; + + class ChannelProviderImpl : public ChannelProvider { + public: + + ChannelProviderImpl(ClientContextImpl* context) : + m_context(context) { + } + + virtual epics::pvData::String getProviderName() + { + return "ChannelProviderImpl"; + } + + virtual void destroy() + { + delete this; + } + + virtual ChannelFind* channelFind( + epics::pvData::String channelName, + ChannelFindRequester *channelFindRequester) + { + m_context->checkChannelName(channelName); + + if (!channelFindRequester) + throw std::runtime_error("null requester"); + + std::auto_ptr errorStatus(getStatusCreate()->createStatus(STATUSTYPE_ERROR, "not implemented", 0)); + channelFindRequester->channelFindResult(errorStatus.get(), 0, false); return 0; } - } - private: - ~ChannelProviderImpl() {}; + virtual Channel* createChannel( + epics::pvData::String channelName, + ChannelRequester *channelRequester, + short priority) + { + return createChannel(channelName, channelRequester, priority, emptyString); + } - ChannelImplFind* m_mockChannelFind; - -}; + virtual Channel* createChannel( + epics::pvData::String channelName, + ChannelRequester *channelRequester, + short priority, + epics::pvData::String address) + { + // TODO support addressList + Channel* channel = m_context->createChannelInternal(channelName, channelRequester, priority, 0); + if (channel) + channelRequester->channelCreated(getStatusCreate()->getStatusOK(), channel); + return channel; + + // NOTE it's up to internal code to respond w/ error to requester and return 0 in case of errors + } + + private: + ~ChannelProviderImpl() {}; + + /* TODO static*/ String emptyString; + ClientContextImpl* m_context; + }; - -class TransportRegistry; -class ChannelSearchManager; -class BlockingTCPConnector; -class NamedLockPattern; -class ResponseRequest; -class BeaconHandlerImpl; - -class ClientContextImpl : public ClientContext -{ public: ClientContextImpl() : @@ -637,9 +789,11 @@ class ClientContextImpl : public ClientContext m_broadcastPort(CA_BROADCAST_PORT), m_receiveBufferSize(MAX_TCP_RECV), m_timer(0), m_broadcastTransport(0), m_searchTransport(0), m_connector(0), m_transportRegistry(0), m_namedLocker(0), m_lastCID(0), m_lastIOID(0), m_channelSearchManager(0), - m_version(new Version("CA Client", "cpp", 0, 0, 0, 1)) + m_version(new Version("CA Client", "cpp", 0, 0, 0, 1)), + m_provider(new ChannelProviderImpl(this)), + m_contextState(CONTEXT_NOT_INITIALIZED) { - initialize(); + loadConfiguration(); } virtual Version* getVersion() { @@ -647,11 +801,21 @@ class ClientContextImpl : public ClientContext } virtual ChannelProvider* getProvider() { + Lock lock(&m_contextMutex); return m_provider; } virtual void initialize() { - m_provider = new ChannelProviderImpl(); + Lock lock(&m_contextMutex); + + if (m_contextState == CONTEXT_DESTROYED) + throw std::runtime_error("Context destroyed."); + else if (m_contextState == CONTEXT_INITIALIZED) + throw std::runtime_error("Context already initialized."); + + internalInitialize(); + + m_contextState = CONTEXT_INITIALIZED; } virtual void printInfo() { @@ -661,14 +825,50 @@ class ClientContextImpl : public ClientContext } virtual void printInfo(epics::pvData::StringBuilder out) { - out->append(m_version->getVersionString()); + Lock lock(&m_contextMutex); + std::ostringstream ostr; + static String emptyString; + + out->append( "CLASS : ::epics::pvAccess::ClientContextImpl"); + out->append("\nVERSION : "); out->append(m_version->getVersionString()); + out->append("\nADDR_LIST : "); ostr << m_addressList; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nAUTO_ADDR_LIST : "); out->append(m_autoAddressList ? "true" : "false"); + out->append("\nCONNECTION_TIMEOUT : "); ostr << m_connectionTimeout; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nBEACON_PERIOD : "); ostr << m_beaconPeriod; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nBROADCAST_PORT : "); ostr << m_broadcastPort; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nRCV_BUFFER_SIZE : "); ostr << m_receiveBufferSize; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nSTATE : "); + switch (m_contextState) + { + case CONTEXT_NOT_INITIALIZED: + out->append("CONTEXT_NOT_INITIALIZED"); + break; + case CONTEXT_INITIALIZED: + out->append("CONTEXT_INITIALIZED"); + break; + case CONTEXT_DESTROYED: + out->append("CONTEXT_DESTROYED"); + break; + default: + out->append("UNKNOWN"); + } + out->append("\n"); } virtual void destroy() { - m_provider->destroy(); - delete m_version; - delete this; + m_contextMutex.lock(); + + if (m_contextState == CONTEXT_DESTROYED) + { + m_contextMutex.unlock(); + throw std::runtime_error("Context already destroyed."); + } + + // go into destroyed state ASAP + m_contextState = CONTEXT_DESTROYED; + + internalDestroy(); } virtual void dispose() @@ -678,6 +878,172 @@ class ClientContextImpl : public ClientContext private: ~ClientContextImpl() {}; + + void loadConfiguration() { + // TODO + /* + m_addressList = config->getPropertyAsString("EPICS4_CA_ADDR_LIST", m_addressList); + m_autoAddressList = config->getPropertyAsBoolean("EPICS4_CA_AUTO_ADDR_LIST", m_autoAddressList); + m_connectionTimeout = config->getPropertyAsFloat("EPICS4_CA_CONN_TMO", m_connectionTimeout); + m_beaconPeriod = config->getPropertyAsFloat("EPICS4_CA_BEACON_PERIOD", m_beaconPeriod); + m_broadcastPort = config->getPropertyAsInteger("EPICS4_CA_BROADCAST_PORT", m_broadcastPort); + m_receiveBufferSize = config->getPropertyAsInteger("EPICS4_CA_MAX_ARRAY_BYTES", m_receiveBufferSize); + */ + } + + void internalInitialize() { + + m_timer = new Timer("pvAccess-client timer", lowPriority); + /* TODO + connector = new BlockingTCPConnector(this, receiveBufferSize, beaconPeriod); + transportRegistry = new TransportRegistry(); + namedLocker = new NamedLockPattern(); + */ + + // setup UDP transport + initializeUDPTransport(); + + // TODO + // setup search manager + //channelSearchManager = new ChannelSearchManager(this); + } + + void initializeUDPTransport() { + // TODO + } + + void internalDestroy() { + + // stop searching + /* TODO + if (m_channelSearchManager) + channelSearchManager->destroy(); + */ + + // stop timer + if (m_timer) + delete m_timer; + + // + // cleanup + // + + // this will also close all CA transports + destroyAllChannels(); + + // close broadcast transport + /* TODO + if (m_broadcastTransport) + m_broadcastTransport->destroy(true); + if (m_searchTransport != null) + m_searchTransport->destroy(true); + */ + + m_provider->destroy(); + delete m_version; + m_contextMutex.unlock(); + delete this; + } + + void destroyAllChannels() { + // TODO + } + + /** + * Check channel name. + */ + void checkChannelName(String& name) { + if (name.empty()) + throw std::runtime_error("null or empty channel name"); + else if (name.length() > UNREASONABLE_CHANNEL_NAME_LENGTH) + throw std::runtime_error("name too long"); + } + + /** + * Check context state and tries to establish necessary state. + */ + void checkState() { + Lock lock(&m_contextMutex); // TODO check double-lock?!!! + + if (m_contextState == CONTEXT_DESTROYED) + throw std::runtime_error("Context destroyed."); + else if (m_contextState == CONTEXT_NOT_INITIALIZED) + initialize(); + } + + /** + * Searches for a channel with given channel ID. + * @param channelID CID. + * @return channel with given CID, 0 if non-existent. + */ + ChannelImpl* getChannel(pvAccessID channelID) + { + Lock guard(&m_cidMapMutex); + CIDChannelMap::iterator it = m_channelsByCID.find(channelID); + return (it == m_channelsByCID.end() ? 0 : it->second); + } + + + /** + * Generate Client channel ID (CID). + * @return Client channel ID (CID). + */ + pvAccessID generateCID() + { + Lock guard(&m_cidMapMutex); + + // search first free (theoretically possible loop of death) + while (m_channelsByCID.find(++m_lastCID) != m_channelsByCID.end()); + // reserve CID + m_channelsByCID[m_lastCID] = 0; + return m_lastCID; + } + + /** + * Free generated channel ID (CID). + */ + void freeCID(int cid) + { + Lock guard(&m_cidMapMutex); + m_channelsByCID.erase(cid); + } + + /** + * Internal create channel. + */ + // TODO no minor version with the addresses + // TODO what if there is an channel with the same name, but on different host! + Channel* createChannelInternal(String name, ChannelRequester* requester, short priority, + InetAddrVector* addresses) { // TODO addresses + + checkState(); + checkChannelName(name); + + if (requester == 0) + throw std::runtime_error("null requester"); + + if (priority < ChannelProvider::PRIORITY_MIN || priority > ChannelProvider::PRIORITY_MAX) + throw std::range_error("priority out of bounds"); + + bool lockAcquired = true; // TODO namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT); + if (lockAcquired) + { + try + { + pvAccessID cid = generateCID(); + return new ChannelImpl(this, cid, name, requester, priority, addresses); + } + catch(...) { + // TODO + return 0; + } + // TODO namedLocker.releaseSynchronizationObject(name); + } + else + { + throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock."); + } + } /** * A space-separated list of broadcast address for process variable name resolution. @@ -753,10 +1119,14 @@ class ClientContextImpl : public ClientContext * Map of channels (keys are CIDs). */ // TODO consider std::unordered_map - typedef std::map IntChannelMap; - IntChannelMap m_channelsByCID; + typedef std::map CIDChannelMap; + CIDChannelMap m_channelsByCID; + + /** + * CIDChannelMap mutex. + */ + Mutex m_cidMapMutex; -typedef int pvAccessID; /** * Last CID cache. */ @@ -766,8 +1136,8 @@ typedef int pvAccessID; * Map of pending response requests (keys are IOID). */ // TODO consider std::unordered_map - typedef std::map IntResponseRequestMap; - IntResponseRequestMap m_pendingResponseRequests; + typedef std::map IOIDResponseRequestMap; + IOIDResponseRequestMap m_pendingResponseRequests; /** * Last IOID cache. @@ -796,6 +1166,18 @@ typedef int pvAccessID; * Provider implementation. */ ChannelProviderImpl* m_provider; + + /** + * Context state. + */ + ContextState m_contextState; + + /** + * Context sync. mutex. + */ + Mutex m_contextMutex; + + friend class ChannelProviderImpl; }; @@ -1030,11 +1412,8 @@ int main(int argc,char *argv[]) context->getProvider()->channelFind("something", &findRequester); ChannelRequesterImpl channelRequester; - //Channel* noChannel - context->getProvider()->createChannel("test", &channelRequester, ChannelProvider::PRIORITY_DEFAULT, "over the rainbow"); - Channel* channel = context->getProvider()->createChannel("test", &channelRequester); - std::cout << channel->getChannelName() << std::endl; + channel->printInfo(); /* GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch");