diff --git a/.hgignore b/.hgignore index 2090ed3..5be0514 100644 --- a/.hgignore +++ b/.hgignore @@ -1,10 +1,10 @@ QtC-pvAccess.creator.user syntax: glob -O.Common -O.linux-x86 +O.* +.DS_Store syntax: regexp ^bin ^include - +^lib diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index c1c15a4..a6ebef8 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -19,12 +19,16 @@ INC += inetAddressUtil.h INC += logger.h INC += introspectionRegistry.h INC += transportRegistry.h +INC += namedLockPattern.h +INC += referenceCountingLock.h LIBSRCS += hexDump.cpp LIBSRCS += wildcharMatcher.cpp LIBSRCS += inetAddressUtil.cpp LIBSRCS += logger.cpp LIBSRCS += introspectionRegistry.cpp LIBSRCS += transportRegistry.cpp +LIBSRCS += namedLockPattern.cpp +LIBSRCS += referenceCountingLock.cpp SRC_DIRS += $(PVACCESS)/client diff --git a/pvAccessApp/ca/caConstants.h b/pvAccessApp/ca/caConstants.h index b3ef88b..999d5f2 100644 --- a/pvAccessApp/ca/caConstants.h +++ b/pvAccessApp/ca/caConstants.h @@ -68,7 +68,7 @@ namespace epics { const int16 CA_DEFAULT_PRIORITY = 0; /** Unreasonable channel name length. */ - const int32 UNREASONABLE_CHANNEL_NAME_LENGTH = 500; + const uint32 UNREASONABLE_CHANNEL_NAME_LENGTH = 500; /** Invalid data type. */ const int16 INVALID_DATA_TYPE = (int16)0xFFFF; diff --git a/pvAccessApp/client/pvAccess.h b/pvAccessApp/client/pvAccess.h index da230c3..ac9ea13 100644 --- a/pvAccessApp/client/pvAccess.h +++ b/pvAccessApp/client/pvAccess.h @@ -583,6 +583,17 @@ namespace epics { namespace pvAccess { virtual ChannelArray* createChannelArray( ChannelArrayRequester *channelArrayRequester, epics::pvData::PVStructure *pvRequest) = 0; + + /** + * Prints detailed information about the context to the standard output stream. + */ + virtual void printInfo() = 0; + + /** + * Prints detailed information about the context to the specified output stream. + * @param out the output stream. + */ + virtual void printInfo(epics::pvData::StringBuilder out) = 0; }; diff --git a/pvAccessApp/remote/beaconHandler.cpp b/pvAccessApp/remote/beaconHandler.cpp index 88a4392..596ace2 100644 --- a/pvAccessApp/remote/beaconHandler.cpp +++ b/pvAccessApp/remote/beaconHandler.cpp @@ -69,7 +69,7 @@ bool BeaconHandler::updateBeacon(int8 remoteTransportRevision, int64 timestamp, void BeaconHandler::beaconArrivalNotify() { - int32 size; + int32 size = 0; //TODO TCP name must be get from somewhere not hardcoded //TODO Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size); @@ -83,12 +83,12 @@ void BeaconHandler::beaconArrivalNotify() { transports[i]->aliveNotification(); } - delete transports; + delete[] transports; } void BeaconHandler::changedTransport() { - int32 size; + int32 size = 0; //TODO TCP name must be get from somewhere not hardcoded //TODO Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size); @@ -102,7 +102,7 @@ void BeaconHandler::changedTransport() { transports[i]->changedTransport(); } - delete transports; + delete[] transports; } }} diff --git a/pvAccessApp/remote/beaconHandler.h b/pvAccessApp/remote/beaconHandler.h index 73fa8fa..2a033a6 100644 --- a/pvAccessApp/remote/beaconHandler.h +++ b/pvAccessApp/remote/beaconHandler.h @@ -67,7 +67,6 @@ namespace epics { namespace pvAccess { * Mutex */ Mutex _mutex; - /** * Update beacon. * @param remoteTransportRevision encoded (major, minor) revision. diff --git a/pvAccessApp/remote/beaconServerStatusProvider.h b/pvAccessApp/remote/beaconServerStatusProvider.h index e6711da..5b65494 100644 --- a/pvAccessApp/remote/beaconServerStatusProvider.h +++ b/pvAccessApp/remote/beaconServerStatusProvider.h @@ -24,7 +24,7 @@ namespace epics { namespace pvAccess { */ BeaconServerStatusProvider(ServerContext* context); /** - * Test Constructor (ohne context) + * Test Constructor (without context) */ BeaconServerStatusProvider(); /** diff --git a/pvAccessApp/utils/namedLockPattern.cpp b/pvAccessApp/utils/namedLockPattern.cpp new file mode 100644 index 0000000..538d502 --- /dev/null +++ b/pvAccessApp/utils/namedLockPattern.cpp @@ -0,0 +1,7 @@ +/* + * namedLockPattern.cpp + */ + +#include "namedLockPattern.h" + +//NOTE NamedLockPattern is template so implementation is in header file diff --git a/pvAccessApp/utils/namedLockPattern.h b/pvAccessApp/utils/namedLockPattern.h new file mode 100644 index 0000000..d57455a --- /dev/null +++ b/pvAccessApp/utils/namedLockPattern.h @@ -0,0 +1,144 @@ +/* + * namedLockPattern.h + */ + +#ifndef NAMEDLOCKPATTERN_H +#define NAMEDLOCKPATTERN_H + +#include +#include + +#include +#include + +#include "referenceCountingLock.h" + +using namespace std; +using namespace epics::pvData; + +namespace epics { namespace pvAccess { +/** + * NamedLockPattern + */ +template > +class NamedLockPattern +{ +public: + /** + * Constructor. + */ + NamedLockPattern() {}; + /** + * Destructor. + */ + virtual ~NamedLockPattern() {}; + /** + * Acquire synchronization lock for named object. + * @param name name of the object whose lock to acquire. + * @param msec the number of milleseconds to wait. + * An argument less than or equal to zero means not to wait at all. + * @return true if acquired, false othwerwise. + */ + bool acquireSynchronizationObject(const Key name, const int64 msec); + /** + * Release synchronization lock for named object. + * @param name name of the object whose lock to release. + */ + void releaseSynchronizationObject(const Key name); +private: + Mutex _mutex; + std::map _namedLocks; + typename std::map::iterator _namedLocksIter; + + /** + * Release synchronization lock for named object. + * @param name name of the object whose lock to release. + * @param release set to false if there is no need to call release + * on synchronization lock. + */ + void releaseSynchronizationObject(const Key name,const bool release); +}; + +template +bool NamedLockPattern::acquireSynchronizationObject(const Key name, const int64 msec) +{ + ReferenceCountingLock* lock; + { //due to guard + Lock guard(&_mutex); + + _namedLocksIter = _namedLocks.find(name); + // get synchronization object + + // none is found, create and return new one + // increment references + if(_namedLocksIter == _namedLocks.end()) + { + lock = new ReferenceCountingLock(); + _namedLocks[name] = lock; + } + else + { + lock = _namedLocksIter->second; + lock->increment(); + } + } // end of guarded area + + bool success = lock->acquire(msec); + + if(!success) + { + releaseSynchronizationObject(name, false); + } + + return success; +} + +template +void NamedLockPattern::releaseSynchronizationObject(const Key name) +{ + releaseSynchronizationObject(name, true); +} + +template +void NamedLockPattern::releaseSynchronizationObject(const Key name,const bool release) +{ + Lock guard(&_mutex); + ReferenceCountingLock* lock; + _namedLocksIter = _namedLocks.find(name); + + // release lock + if (_namedLocksIter != _namedLocks.end()) + { + lock = _namedLocksIter->second; + + // release the lock + if (release) + { + lock->release(); + } + + // if there only one current lock exists + // remove it from the map + if (lock->decrement() <= 0) + { + _namedLocks.erase(_namedLocksIter); + delete lock; + } + } +} + +template +class NamedLock : private NoDefaultMethods +{ +public: + NamedLock(NamedLockPattern* namedLockPattern): _namedLockPattern(namedLockPattern) {} + bool acquireSynchronizationObject(const Key name, const int64 msec) {_name = name; return _namedLockPattern->acquireSynchronizationObject(name,msec);} + ~NamedLock(){_namedLockPattern->releaseSynchronizationObject(_name);} +private: + Key _name; + NamedLockPattern* _namedLockPattern; +}; + +}} + +#endif /* NAMEDLOCKPATTERN_H */ diff --git a/pvAccessApp/utils/referenceCountingLock.cpp b/pvAccessApp/utils/referenceCountingLock.cpp new file mode 100644 index 0000000..4a8b3fb --- /dev/null +++ b/pvAccessApp/utils/referenceCountingLock.cpp @@ -0,0 +1,81 @@ +/* + * namedLockPattern.cpp + */ + +#include "referenceCountingLock.h" + +namespace epics { namespace pvAccess { + +ReferenceCountingLock::ReferenceCountingLock(): _references(1) +{ + pthread_mutexattr_t mutexAttribute; + int32 retval = pthread_mutexattr_init(&mutexAttribute); + if(retval != 0) + { + //string errMsg = "Error: pthread_mutexattr_init failed: " + string(strerror(retval)); + assert(true); + } + retval = pthread_mutexattr_settype(&mutexAttribute, PTHREAD_MUTEX_RECURSIVE); + if(retval == 0) + { + retval = pthread_mutex_init(&_mutex, &mutexAttribute); + if(retval != 0) + { + //string errMsg = "Error: pthread_mutex_init failed: " + string(strerror(retval)); + assert(true); + } + } + else + { + //string errMsg = "Error: pthread_mutexattr_settype failed: " + string(strerror(retval)); + assert(true); + } + + pthread_mutexattr_destroy(&mutexAttribute); +} + +ReferenceCountingLock::~ReferenceCountingLock() +{ + pthread_mutex_destroy(&_mutex); +} + +bool ReferenceCountingLock::acquire(int64 msecs) +{ + struct timespec deltatime; + deltatime.tv_sec = msecs / 1000; + deltatime.tv_nsec = (msecs % 1000) * 1000; + + int32 retval = pthread_mutex_timedlock(&_mutex, &deltatime); + if(retval == 0) + { + return true; + } + return false; +} + +void ReferenceCountingLock::release() +{ + int retval = pthread_mutex_unlock(&_mutex); + if(retval != 0) + { + string errMsg = "Error: pthread_mutex_unlock failed: " + string(strerror(retval)); + //TODO do something? + } +} + +int ReferenceCountingLock::increment() +{ + //TODO does it really has to be atomic? + //return ++_references; + return __sync_add_and_fetch(&_references,1); +} + +int ReferenceCountingLock::decrement() +{ + //TODO does it really has to be atomic? + //return --_references; + return __sync_sub_and_fetch(&_references,1); +} + +}} + diff --git a/pvAccessApp/utils/referenceCountingLock.h b/pvAccessApp/utils/referenceCountingLock.h new file mode 100644 index 0000000..2825cae --- /dev/null +++ b/pvAccessApp/utils/referenceCountingLock.h @@ -0,0 +1,74 @@ +/* + * referenceCountingLock.h + */ + +#ifndef REFERENCECOUNTINGLOCK_H +#define REFERENCECOUNTINGLOCK_H + +#include +#include +#include +#include +#include + +#include +#include + +using namespace std; +using namespace epics::pvData; + +namespace epics { namespace pvAccess { + +/** + * Reference counting mutex implementation w/ deadlock detection. + * Synchronization helper class used (intended for use) for activation/deactivation synchronization. + * This class enforces attempt method of acquiring the locks to prevent deadlocks. + * Class also offers reference counting. + * (NOTE: automatic lock counting was not implemented due to imperfect usage.) + * + */ +class ReferenceCountingLock +{ +public: + /** + * Constructor of ReferenceCountingLock. + * After construction lock is free and reference count equals 1. + */ + ReferenceCountingLock(); + /** + * Destructor of ReferenceCountingLock. + */ + virtual ~ReferenceCountingLock(); + /** + * Attempt to acquire lock. + * + * @param msecs the number of milleseconds to wait. + * An argument less than or equal to zero means not to wait at all. + * @return true if acquired, false otherwise. + */ + bool acquire(int64 msecs); + /** + * Release previously acquired lock. + */ + void release(); + /** + * Increment number of references. + * + * @return number of references. + */ + int increment(); + /** + * Decrement number of references. + * + * @return number of references. + */ + int decrement(); +private: + int _references; + pthread_mutex_t _mutex; + +}; + +}} + +#endif /* NAMEDLOCKPATTERN_H */ diff --git a/testApp/client/MockClientImpl.cpp b/testApp/client/MockClientImpl.cpp index bb17553..44f76bb 100644 --- a/testApp/client/MockClientImpl.cpp +++ b/testApp/client/MockClientImpl.cpp @@ -520,6 +520,26 @@ class MockChannel : public Channel { // TODO return 0; } + + virtual void printInfo() { + String info; + printInfo(&info); + std::cout << info.c_str() << std::endl; + } + + virtual void printInfo(epics::pvData::StringBuilder out) { + //std::ostringstream ostr; + //static String emptyString; + + out->append( "CHANNEL : "); out->append(m_name); + out->append("\nSTATE : "); out->append(ConnectionStateNames[getConnectionState()]); + if (isConnected()) + { + out->append("\nADDRESS : "); out->append(getRemoteAddress()); + //out->append("\nRIGHTS : "); out->append(getAccessRights()); + } + out->append("\n"); + } }; class MockChannelProvider; @@ -905,7 +925,7 @@ int main(int argc,char *argv[]) context->getProvider()->createChannel("test", &channelRequester, ChannelProvider::PRIORITY_DEFAULT, "over the rainbow"); Channel* channel = context->getProvider()->createChannel("test", &channelRequester); - std::cout << channel->getChannelName() << std::endl; + channel->printInfo(); GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch"); diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 2228d16..0ddd4f4 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -18,6 +18,9 @@ PROD_HOST += testBeaconEmitter testBeaconEmitter_SRCS += testBeaconEmitter.cpp testBeaconEmitter_LIBS += pvData pvAccess Com +PROD_HOST += testBeaconHandler +testBeaconHandler_SRCS += testBeaconHandler.cpp +testBeaconHandler_LIBS += pvData pvAccess Com include $(TOP)/configure/RULES #---------------------------------------- diff --git a/testApp/remote/testBeaconEmitter.cpp b/testApp/remote/testBeaconEmitter.cpp index ac1ec33..9ddd828 100644 --- a/testApp/remote/testBeaconEmitter.cpp +++ b/testApp/remote/testBeaconEmitter.cpp @@ -42,9 +42,9 @@ void testBeaconEmitter() osiSockAddr* addr = new osiSockAddr; addr->ia.sin_family = AF_INET; addr->ia.sin_port = htons(5067); - if(inet_aton("92.50.75.255",&addr->ia.sin_addr)==0) { - cout<<"error in inet_aton()"<ia.sin_addr)==0) + { + assert(false); } broadcastAddresses->push_back(addr); BlockingUDPConnector connector(true, broadcastAddresses, true); diff --git a/testApp/remote/testBeaconHandler.cpp b/testApp/remote/testBeaconHandler.cpp index 435acac..30a51e2 100644 --- a/testApp/remote/testBeaconHandler.cpp +++ b/testApp/remote/testBeaconHandler.cpp @@ -6,6 +6,7 @@ #include "blockingUDP.h" #include "beaconHandler.h" #include "inetAddressUtil.h" +#include "introspectionRegistry.h" #include @@ -14,32 +15,105 @@ using namespace epics::pvAccess; using namespace epics::pvData; +using namespace std; + +void decodeFromIPv6Address(ByteBuffer* buffer, osiSockAddr* address) +{ + // IPv4 compatible IPv6 address + // first 80-bit are 0 + buffer->getLong(); + buffer->getShort(); + // next 16-bits are 1 + buffer->getShort(); + // following IPv4 address in big-endian (network) byte order + in_addr_t ipv4Addr = 0; + ipv4Addr |= (uint32)buffer->getByte() << 24; + ipv4Addr |= (uint32)buffer->getByte() << 16; + ipv4Addr |= (uint32)buffer->getByte() << 8; + ipv4Addr |= (uint32)buffer->getByte() << 0; + address->ia.sin_addr.s_addr = ipv4Addr; +} class BeaconResponseHandler : public ResponseHandler { public: + BeaconResponseHandler() + { + _pvDataCreate = getPVDataCreate(); + } + virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { - cout << "DummyResponseHandler::handleResponse" << endl; + cout << "BeaconResponseHandler::handleResponse" << endl; + + // reception timestamp + TimeStamp timestamp; + timestamp.getCurrent(); + + //TODO + //super.handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + + transport->ensureData((2*sizeof(int16)+2*sizeof(int32)+128)/sizeof(int8)); + + const int32 sequentalID = payloadBuffer->getShort() & 0x0000FFFF; + const TimeStamp startupTimestamp(payloadBuffer->getInt() & 0x00000000FFFFFFFFL,(int32)(payloadBuffer->getInt() & 0x00000000FFFFFFFFL)); + + // 128-bit IPv6 address + osiSockAddr address; + decodeFromIPv6Address(payloadBuffer, &address); + + // get port + const int32 port = payloadBuffer->getShort() & 0xFFFF; + address.ia.sin_port = ntohs(port); + + // accept given address if explicitly specified by sender + if (!ipv4AddressToInt(address)) + { + responseFrom->ia.sin_port = port; + } + else + { + responseFrom->ia.sin_port = port; + responseFrom->ia.sin_addr.s_addr = address.ia.sin_addr.s_addr; + } + + //org.epics.ca.client.impl.remote.BeaconHandler beaconHandler = context.getBeaconHandler(responseFrom); + // currently we care only for servers used by this context + //if (beaconHandler == null) + // return; + + // extra data + PVFieldPtr data = NULL; + const FieldConstPtr field = IntrospectionRegistry::deserializeFull(payloadBuffer, transport); + if (field != NULL) + { + data = _pvDataCreate->createPVField(NULL, field); + data->deserialize(payloadBuffer, transport); + } + + // notify beacon handler + //beaconHandler.beaconNotify(responseFrom, version, timestamp, startupTimestamp, sequentalID, data); } + +private: + PVDataCreate* _pvDataCreate; + BeaconHandler* _beaconHandler; }; void testBeaconHandler() { - BeacondResponseHandler brh; + BeaconResponseHandler brh; BlockingUDPConnector connector(false, NULL, true); - DummyClientContext context; osiSockAddr bindAddr; bindAddr.ia.sin_family = AF_INET; bindAddr.ia.sin_port = htons(5067); bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); Transport* transport = connector.connect(NULL, &brh, &bindAddr, 1, 50); - - ((BlockingUDPTransport*)transport)->start(); + (static_cast(transport))->start(); while(1) sleep(1); diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 7db24eb..12135f8 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -1,9 +1,10 @@ /* testRemoteClientImpl.cpp */ /* Author: Matej Sekoranja Date: 2011.1.1 */ -#include + #include #include +#include #include #include #include @@ -11,6 +12,7 @@ #include #include #include +#include using namespace epics::pvData; using namespace epics::pvAccess; @@ -25,7 +27,7 @@ class ChannelImplProcess : public ChannelProcess ChannelProcessRequester* m_channelProcessRequester; PVStructure* m_pvStructure; PVScalar* m_valueField; - + private: ~ChannelImplProcess() { @@ -44,29 +46,29 @@ class ChannelImplProcess : public ChannelProcess Status* noValueFieldStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "no 'value' field"); m_channelProcessRequester->channelProcessConnect(noValueFieldStatus, this); delete noValueFieldStatus; - + // NOTE client must destroy this instance... // do not access any fields and return ASAP return; } - + if (field->getField()->getType() != scalar) { Status* notAScalarStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "'value' field not scalar type"); m_channelProcessRequester->channelProcessConnect(notAScalarStatus, this); delete notAScalarStatus; - + // NOTE client must destroy this instance…. // do not access any fields and return ASAP return; } - + m_valueField = static_cast(field); - - // TODO pvRequest + + // TODO pvRequest m_channelProcessRequester->channelProcessConnect(getStatusCreate()->getStatusOK(), this); } - + virtual void process(bool lastRequest) { switch (m_valueField->getScalar()->getScalarType()) @@ -138,19 +140,19 @@ class ChannelImplProcess : public ChannelProcess default: // noop break; - - } + + } m_channelProcessRequester->processDone(getStatusCreate()->getStatusOK()); - + if (lastRequest) destroy(); } - + virtual void destroy() { delete this; } - + }; @@ -167,7 +169,7 @@ class ChannelImplGet : public ChannelGet PVStructure* m_pvStructure; BitSet* m_bitSet; volatile bool m_first; - + private: ~ChannelImplGet() { @@ -181,10 +183,10 @@ class ChannelImplGet : public ChannelGet { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelGet); - // TODO pvRequest + // TODO pvRequest m_channelGetRequester->channelGetConnect(getStatusCreate()->getStatusOK(), this, m_pvStructure, m_bitSet); } - + virtual void get(bool lastRequest) { m_channelGetRequester->getDone(getStatusCreate()->getStatusOK()); @@ -193,17 +195,17 @@ class ChannelImplGet : public ChannelGet m_first = false; m_bitSet->set(0); // TODO } - + if (lastRequest) destroy(); } - + virtual void destroy() { delete m_bitSet; delete this; } - + }; @@ -222,7 +224,7 @@ class ChannelImplPut : public ChannelPut PVStructure* m_pvStructure; BitSet* m_bitSet; volatile bool m_first; - + private: ~ChannelImplPut() { @@ -236,17 +238,17 @@ class ChannelImplPut : public ChannelPut { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelPut); - // TODO pvRequest + // TODO pvRequest m_channelPutRequester->channelPutConnect(getStatusCreate()->getStatusOK(), this, m_pvStructure, m_bitSet); } - + virtual void put(bool lastRequest) { m_channelPutRequester->putDone(getStatusCreate()->getStatusOK()); if (lastRequest) destroy(); } - + virtual void get() { m_channelPutRequester->getDone(getStatusCreate()->getStatusOK()); @@ -257,7 +259,7 @@ class ChannelImplPut : public ChannelPut delete m_bitSet; delete this; } - + }; @@ -278,7 +280,7 @@ class MockMonitor : public Monitor, public MonitorElement volatile bool m_first; Mutex* m_lock; volatile int m_count; - + private: ~MockMonitor() { @@ -297,16 +299,16 @@ class MockMonitor : public Monitor, public MonitorElement PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockMonitor); m_changedBitSet->set(0); - - // TODO pvRequest + + // TODO pvRequest m_monitorRequester->monitorConnect(getStatusCreate()->getStatusOK(), this, const_cast(m_pvStructure->getStructure())); } - + virtual Status* start() { // fist monitor m_monitorRequester->monitorEvent(this); - + // client needs to delete status, so passing shared OK instance is not right thing to do return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor started."); } @@ -337,24 +339,24 @@ class MockMonitor : public Monitor, public MonitorElement if (m_count) m_count--; } - + virtual void destroy() { delete stop(); - + delete m_lock; delete m_overrunBitSet; delete m_changedBitSet; delete this; } - + // ============ MonitorElement ============ - + virtual PVStructure* getPVStructure() { return m_pvStructure; } - + virtual BitSet* getChangedBitSet() { return m_changedBitSet; @@ -364,47 +366,179 @@ class MockMonitor : public Monitor, public MonitorElement { return m_overrunBitSet; } - - + + }; +class ChannelSearchManager; +class BlockingTCPConnector; +class NamedLockPattern; +class ResponseRequest; +class BeaconHandlerImpl; + +class ClientContextImpl; +typedef int pvAccessID; + // TODO consider std::unordered_map +typedef std::map IOIDResponseRequestMap; +// TODO log +#define CALLBACK_GUARD(code) try { code } catch(...) { } -PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannel); -class ChannelImpl : public Channel { +PVDATA_REFCOUNT_MONITOR_DEFINE(channel); + + +/** + * Context state enum. + */ +enum ContextState { + /** + * State value of non-initialized context. + */ + CONTEXT_NOT_INITIALIZED, + + /** + * State value of initialized context. + */ + CONTEXT_INITIALIZED, + + /** + * State value of destroyed context. + */ + CONTEXT_DESTROYED +}; + + +class ClientContextImpl : public ClientContext +{ + +/** + * Implementation of CAJ JCA Channel. + * @author Matej Sekoranja + */ +class ChannelImpl : + public Channel /*, + public TransportClient, + public TransportSender, + public BaseSearchInstance */ { private: - ChannelProvider* m_provider; - ChannelRequester* m_requester; + + /** + * Context. + */ + ClientContextImpl* m_context; + + /** + * Client channel ID. + */ + pvAccessID m_channelID; + + /** + * Channel name. + */ String m_name; - String m_remoteAddress; + + /** + * Channel requester. + */ + ChannelRequester* m_requester; + /** + * Process priority. + */ + short m_priority; + + /** + * List of fixed addresses, if name resolution will be used. + */ + InetAddrVector* m_addresses; + + /** + * Connection status. + */ + ConnectionState m_connectionState; + + /** + * List of all channel's pending requests (keys are subscription IDs). + */ + IOIDResponseRequestMap m_responseRequests; + + /** + * Allow reconnection flag. + */ + bool m_allowCreation; // = true; + + /** + * Reference counting. + * NOTE: synced on this. + */ + int m_references; // = 1; + + /* ****************** */ + /* CA protocol fields */ + /* ****************** */ + + /** + * Server transport. + */ + Transport* m_transport; + + /** + * Server channel ID. + */ + pvAccessID m_serverChannelID; + + + // TODO mock PVStructure* m_pvStructure; - + private: - ~ChannelImpl() + ~ChannelImpl() { - PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockChannel); + PVDATA_REFCOUNT_MONITOR_DESTRUCT(channel); } - + public: - + + /** + * Constructor. + * @param context + * @param name + * @param listener + * @throws CAException + */ ChannelImpl( - ChannelProvider* provider, - ChannelRequester* requester, - String name, - String remoteAddress) : - m_provider(provider), - m_requester(requester), + ClientContextImpl* context, + pvAccessID channelID, + String name, + ChannelRequester* requester, + short priority, + InetAddrVector* addresses) : + m_context(context), + m_channelID(channelID), m_name(name), - m_remoteAddress(remoteAddress) + m_requester(requester), + m_priority(priority), + m_addresses(addresses), + m_connectionState(NEVER_CONNECTED), + m_allowCreation(true), + m_serverChannelID(0xFFFFFFFF) { - PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannel); - - + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channel); + + /* + // register before issuing search request + m_context->registerChannel(this); + + // connect + connect(); + */ + // + // mock + // ScalarType stype = pvDouble; String allProperties("alarm,timeStamp,display,control,valueAlarm"); @@ -413,13 +547,17 @@ class ChannelImpl : public Channel { PVDouble *pvField = m_pvStructure->getDoubleField(String("value")); pvField->put(1.123); - + // already connected, report state m_requester->channelStateChange(this, CONNECTED); + + + } - + virtual void destroy() { + if (m_addresses) delete m_addresses; delete m_pvStructure; delete this; }; @@ -428,20 +566,20 @@ class ChannelImpl : public Channel { { return getChannelName(); }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } - virtual ChannelProvider* getProvider() + virtual ChannelProvider* getProvider() { - return m_provider; + return m_context->getProvider(); } virtual epics::pvData::String getRemoteAddress() { - return m_remoteAddress; + return "TODO"; } virtual epics::pvData::String getChannelName() @@ -502,7 +640,7 @@ class ChannelImpl : public Channel { // TODO return 0; } - + virtual ChannelRPC* createChannelRPC(ChannelRPCRequester *channelRPCRequester, epics::pvData::PVStructure *pvRequest) { @@ -524,168 +662,395 @@ class ChannelImpl : public Channel { // TODO return 0; } -}; - -class ChannelProviderImpl; - -class ChannelImplFind : public ChannelFind -{ - public: - ChannelImplFind(ChannelProvider* provider) : m_provider(provider) - { - } - - virtual void destroy() - { - // one instance for all, do not delete at all - } - - virtual ChannelProvider* getChannelProvider() - { - return m_provider; - }; - - virtual void cancelChannelFind() - { - throw std::runtime_error("not supported"); - } - - private: - - // only to be destroyed by it - friend class ChannelProviderImpl; - virtual ~ChannelImplFind() {} - - ChannelProvider* m_provider; -}; - -class ChannelProviderImpl : public ChannelProvider { - public: - - ChannelProviderImpl() : m_mockChannelFind(new ChannelImplFind(this)) { - } - - virtual epics::pvData::String getProviderName() - { - return "ChannelProviderImpl"; - } - - virtual void destroy() - { - delete m_mockChannelFind; - delete this; - } - - virtual ChannelFind* channelFind( - epics::pvData::String channelName, - ChannelFindRequester *channelFindRequester) - { - channelFindRequester->channelFindResult(getStatusCreate()->getStatusOK(), m_mockChannelFind, true); - return m_mockChannelFind; - } - - virtual Channel* createChannel( - epics::pvData::String channelName, - ChannelRequester *channelRequester, - short priority) - { - return createChannel(channelName, channelRequester, priority, "local"); - } - - virtual Channel* createChannel( - epics::pvData::String channelName, - ChannelRequester *channelRequester, - short priority, - epics::pvData::String address) - { - if (address == "local") - { - Channel* channel = new ChannelImpl(this, channelRequester, channelName, address); - channelRequester->channelCreated(getStatusCreate()->getStatusOK(), channel); - return channel; - } - else - { - Status* errorStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "only local supported", 0); - channelRequester->channelCreated(errorStatus, 0); - delete errorStatus; // TODO guard from CB - return 0; - } - } - - private: - ~ChannelProviderImpl() {}; - - ChannelImplFind* m_mockChannelFind; - -}; - - -class ChannelSearchManager; -class BlockingTCPConnector; -class NamedLockPattern; -class ResponseRequest; -class BeaconHandlerImpl; - -class ClientContextImpl : public ClientContext -{ - public: - - ClientContextImpl() : - m_addressList(""), m_autoAddressList(true), m_connectionTimeout(30.0f), m_beaconPeriod(15.0f), - m_broadcastPort(CA_BROADCAST_PORT), m_receiveBufferSize(MAX_TCP_RECV), m_timer(0), - m_broadcastTransport(0), m_searchTransport(0), m_connector(0), m_transportRegistry(0), - m_namedLocker(0), m_lastCID(0), m_lastIOID(0), m_channelSearchManager(0), - m_version(new Version("CA Client", "cpp", 0, 0, 0, 1)) - { - initialize(); - } - - virtual Version* getVersion() { - return m_version; - } - - virtual ChannelProvider* getProvider() { - return m_provider; - } - - virtual void initialize() { - m_provider = new ChannelProviderImpl(); - } - + virtual void printInfo() { String info; printInfo(&info); std::cout << info.c_str() << std::endl; } - + virtual void printInfo(epics::pvData::StringBuilder out) { - out->append(m_version->getVersionString()); + //Lock lock(&m_channelMutex); + //std::ostringstream ostr; + //static String emptyString; + + out->append( "CHANNEL : "); out->append(m_name); + out->append("\nSTATE : "); out->append(ConnectionStateNames[m_connectionState]); + if (m_connectionState == CONNECTED) + { + out->append("\nADDRESS : "); out->append(getRemoteAddress()); + //out->append("\nRIGHTS : "); out->append(getAccessRights()); + } + out->append("\n"); + } +}; + + + class ChannelProviderImpl; + + class ChannelImplFind : public ChannelFind + { + public: + ChannelImplFind(ChannelProvider* provider) : m_provider(provider) + { + } + + virtual void destroy() + { + // one instance for all, do not delete at all + } + + virtual ChannelProvider* getChannelProvider() + { + return m_provider; + }; + + virtual void cancelChannelFind() + { + throw std::runtime_error("not supported"); + } + + private: + + // only to be destroyed by it + friend class ChannelProviderImpl; + virtual ~ChannelImplFind() {} + + ChannelProvider* m_provider; + }; + + class ChannelProviderImpl : public ChannelProvider { + public: + + ChannelProviderImpl(ClientContextImpl* context) : + m_context(context) { + } + + virtual epics::pvData::String getProviderName() + { + return "ChannelProviderImpl"; + } + + virtual void destroy() + { + delete this; + } + + virtual ChannelFind* channelFind( + epics::pvData::String channelName, + ChannelFindRequester *channelFindRequester) + { + m_context->checkChannelName(channelName); + + if (!channelFindRequester) + throw std::runtime_error("null requester"); + + std::auto_ptr errorStatus(getStatusCreate()->createStatus(STATUSTYPE_ERROR, "not implemented", 0)); + channelFindRequester->channelFindResult(errorStatus.get(), 0, false); + return 0; + } + + virtual Channel* createChannel( + epics::pvData::String channelName, + ChannelRequester *channelRequester, + short priority) + { + return createChannel(channelName, channelRequester, priority, emptyString); + } + + virtual Channel* createChannel( + epics::pvData::String channelName, + ChannelRequester *channelRequester, + short priority, + epics::pvData::String address) + { + // TODO support addressList + Channel* channel = m_context->createChannelInternal(channelName, channelRequester, priority, 0); + if (channel) + channelRequester->channelCreated(getStatusCreate()->getStatusOK(), channel); + return channel; + + // NOTE it's up to internal code to respond w/ error to requester and return 0 in case of errors + } + + private: + ~ChannelProviderImpl() {}; + + /* TODO static*/ String emptyString; + ClientContextImpl* m_context; + }; + + public: + + ClientContextImpl() : + m_addressList(""), m_autoAddressList(true), m_connectionTimeout(30.0f), m_beaconPeriod(15.0f), + m_broadcastPort(CA_BROADCAST_PORT), m_receiveBufferSize(MAX_TCP_RECV), m_timer(0), + m_broadcastTransport(0), m_searchTransport(0), m_connector(0), m_transportRegistry(0), + m_namedLocker(0), m_lastCID(0), m_lastIOID(0), m_channelSearchManager(0), + m_version(new Version("CA Client", "cpp", 0, 0, 0, 1)), + m_provider(new ChannelProviderImpl(this)), + m_contextState(CONTEXT_NOT_INITIALIZED) + { + loadConfiguration(); + } + + virtual Version* getVersion() { + return m_version; } + virtual ChannelProvider* getProvider() { + Lock lock(&m_contextMutex); + return m_provider; + } + + virtual void initialize() { + Lock lock(&m_contextMutex); + + if (m_contextState == CONTEXT_DESTROYED) + throw std::runtime_error("Context destroyed."); + else if (m_contextState == CONTEXT_INITIALIZED) + throw std::runtime_error("Context already initialized."); + + internalInitialize(); + + m_contextState = CONTEXT_INITIALIZED; + } + + virtual void printInfo() { + String info; + printInfo(&info); + std::cout << info.c_str() << std::endl; + } + + virtual void printInfo(epics::pvData::StringBuilder out) { + Lock lock(&m_contextMutex); + std::ostringstream ostr; + static String emptyString; + + out->append( "CLASS : ::epics::pvAccess::ClientContextImpl"); + out->append("\nVERSION : "); out->append(m_version->getVersionString()); + out->append("\nADDR_LIST : "); ostr << m_addressList; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nAUTO_ADDR_LIST : "); out->append(m_autoAddressList ? "true" : "false"); + out->append("\nCONNECTION_TIMEOUT : "); ostr << m_connectionTimeout; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nBEACON_PERIOD : "); ostr << m_beaconPeriod; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nBROADCAST_PORT : "); ostr << m_broadcastPort; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nRCV_BUFFER_SIZE : "); ostr << m_receiveBufferSize; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nSTATE : "); + switch (m_contextState) + { + case CONTEXT_NOT_INITIALIZED: + out->append("CONTEXT_NOT_INITIALIZED"); + break; + case CONTEXT_INITIALIZED: + out->append("CONTEXT_INITIALIZED"); + break; + case CONTEXT_DESTROYED: + out->append("CONTEXT_DESTROYED"); + break; + default: + out->append("UNKNOWN"); + } + out->append("\n"); + } + virtual void destroy() { - m_provider->destroy(); - delete m_version; - delete this; - } + m_contextMutex.lock(); + if (m_contextState == CONTEXT_DESTROYED) + { + m_contextMutex.unlock(); + throw std::runtime_error("Context already destroyed."); + } + + // go into destroyed state ASAP + m_contextState = CONTEXT_DESTROYED; + + internalDestroy(); + } + virtual void dispose() { destroy(); - } - + } + private: ~ClientContextImpl() {}; + + void loadConfiguration() { + // TODO + /* + m_addressList = config->getPropertyAsString("EPICS4_CA_ADDR_LIST", m_addressList); + m_autoAddressList = config->getPropertyAsBoolean("EPICS4_CA_AUTO_ADDR_LIST", m_autoAddressList); + m_connectionTimeout = config->getPropertyAsFloat("EPICS4_CA_CONN_TMO", m_connectionTimeout); + m_beaconPeriod = config->getPropertyAsFloat("EPICS4_CA_BEACON_PERIOD", m_beaconPeriod); + m_broadcastPort = config->getPropertyAsInteger("EPICS4_CA_BROADCAST_PORT", m_broadcastPort); + m_receiveBufferSize = config->getPropertyAsInteger("EPICS4_CA_MAX_ARRAY_BYTES", m_receiveBufferSize); + */ + } + + void internalInitialize() { + + m_timer = new Timer("pvAccess-client timer", lowPriority); + /* TODO + connector = new BlockingTCPConnector(this, receiveBufferSize, beaconPeriod); + transportRegistry = new TransportRegistry(); + namedLocker = new NamedLockPattern(); + */ + + // setup UDP transport + initializeUDPTransport(); + + // TODO + // setup search manager + //channelSearchManager = new ChannelSearchManager(this); + } + + void initializeUDPTransport() { + // TODO + } + + void internalDestroy() { + + // stop searching + /* TODO + if (m_channelSearchManager) + channelSearchManager->destroy(); + */ + + // stop timer + if (m_timer) + delete m_timer; + + // + // cleanup + // + + // this will also close all CA transports + destroyAllChannels(); + + // close broadcast transport + /* TODO + if (m_broadcastTransport) + m_broadcastTransport->destroy(true); + if (m_searchTransport != null) + m_searchTransport->destroy(true); + */ + + m_provider->destroy(); + delete m_version; + m_contextMutex.unlock(); + delete this; + } + + void destroyAllChannels() { + // TODO + } + + /** + * Check channel name. + */ + void checkChannelName(String& name) { + if (name.empty()) + throw std::runtime_error("null or empty channel name"); + else if (name.length() > UNREASONABLE_CHANNEL_NAME_LENGTH) + throw std::runtime_error("name too long"); + } + + /** + * Check context state and tries to establish necessary state. + */ + void checkState() { + Lock lock(&m_contextMutex); // TODO check double-lock?!!! + + if (m_contextState == CONTEXT_DESTROYED) + throw std::runtime_error("Context destroyed."); + else if (m_contextState == CONTEXT_NOT_INITIALIZED) + initialize(); + } + + /** + * Searches for a channel with given channel ID. + * @param channelID CID. + * @return channel with given CID, 0 if non-existent. + */ + ChannelImpl* getChannel(pvAccessID channelID) + { + Lock guard(&m_cidMapMutex); + CIDChannelMap::iterator it = m_channelsByCID.find(channelID); + return (it == m_channelsByCID.end() ? 0 : it->second); + } + + + /** + * Generate Client channel ID (CID). + * @return Client channel ID (CID). + */ + pvAccessID generateCID() + { + Lock guard(&m_cidMapMutex); + + // search first free (theoretically possible loop of death) + while (m_channelsByCID.find(++m_lastCID) != m_channelsByCID.end()); + // reserve CID + m_channelsByCID[m_lastCID] = 0; + return m_lastCID; + } + + /** + * Free generated channel ID (CID). + */ + void freeCID(int cid) + { + Lock guard(&m_cidMapMutex); + m_channelsByCID.erase(cid); + } + + /** + * Internal create channel. + */ + // TODO no minor version with the addresses + // TODO what if there is an channel with the same name, but on different host! + Channel* createChannelInternal(String name, ChannelRequester* requester, short priority, + InetAddrVector* addresses) { // TODO addresses + + checkState(); + checkChannelName(name); + + if (requester == 0) + throw std::runtime_error("null requester"); + + if (priority < ChannelProvider::PRIORITY_MIN || priority > ChannelProvider::PRIORITY_MAX) + throw std::range_error("priority out of bounds"); + + bool lockAcquired = true; // TODO namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT); + if (lockAcquired) + { + try + { + pvAccessID cid = generateCID(); + return new ChannelImpl(this, cid, name, requester, priority, addresses); + } + catch(...) { + // TODO + return 0; + } + // TODO namedLocker.releaseSynchronizationObject(name); + } + else + { + throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock."); + } + } /** * A space-separated list of broadcast address for process variable name resolution. * Each address must be of the form: ip.number:port or host.name:port */ String m_addressList; - + /** - * Define whether or not the network interfaces should be discovered at runtime. + * Define whether or not the network interfaces should be discovered at runtime. */ bool m_autoAddressList; @@ -696,22 +1061,22 @@ class ClientContextImpl : public ClientContext * the server is no longer present on the network and disconnect. */ float m_connectionTimeout; - + /** * Period in second between two beacon signals. */ float m_beaconPeriod; - + /** * Broadcast (beacon, search) port number to listen to. */ int m_broadcastPort; - + /** * Receive buffer size (max size of payload). */ int m_receiveBufferSize; - + /** * Timer. */ @@ -721,7 +1086,7 @@ class ClientContextImpl : public ClientContext * Broadcast transport needed to listen for broadcasts. */ BlockingUDPTransport* m_broadcastTransport; - + /** * UDP transport needed for channel searches. */ @@ -734,7 +1099,7 @@ class ClientContextImpl : public ClientContext /** * CA transport (virtual circuit) registry. - * This registry contains all active transports - connections to CA servers. + * This registry contains all active transports - connections to CA servers. */ TransportRegistry* m_transportRegistry; @@ -752,12 +1117,16 @@ class ClientContextImpl : public ClientContext * Map of channels (keys are CIDs). */ // TODO consider std::unordered_map - typedef std::map IntChannelMap; - IntChannelMap m_channelsByCID; + typedef std::map CIDChannelMap; + CIDChannelMap m_channelsByCID; + + /** + * CIDChannelMap mutex. + */ + Mutex m_cidMapMutex; -typedef int pvAccessID; /** - * Last CID cache. + * Last CID cache. */ pvAccessID m_lastCID; @@ -765,11 +1134,11 @@ typedef int pvAccessID; * Map of pending response requests (keys are IOID). */ // TODO consider std::unordered_map - typedef std::map IntResponseRequestMap; - IntResponseRequestMap m_pendingResponseRequests; + typedef std::map IOIDResponseRequestMap; + IOIDResponseRequestMap m_pendingResponseRequests; /** - * Last IOID cache. + * Last IOID cache. */ pvAccessID m_lastIOID; @@ -785,7 +1154,7 @@ typedef int pvAccessID; // TODO consider std::unordered_map typedef std::map AddressBeaconHandlerMap; AddressBeaconHandlerMap m_beaconHandlers; - + /** * Version. */ @@ -795,6 +1164,18 @@ typedef int pvAccessID; * Provider implementation. */ ChannelProviderImpl* m_provider; + + /** + * Context state. + */ + ContextState m_contextState; + + /** + * Context sync. mutex. + */ + Mutex m_contextMutex; + + friend class ChannelProviderImpl; }; @@ -803,7 +1184,7 @@ class ChannelFindRequesterImpl : public ChannelFindRequester virtual void channelFindResult(epics::pvData::Status *status,ChannelFind *channelFind,bool wasFound) { std::cout << "[ChannelFindRequesterImpl] channelFindResult(" - << status->toString() << ", ..., " << wasFound << ")" << std::endl; + << status->toString() << ", ..., " << wasFound << ")" << std::endl; } }; @@ -813,10 +1194,10 @@ class ChannelRequesterImpl : public ChannelRequester { return "ChannelRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelCreated(epics::pvData::Status* status, Channel *channel) @@ -824,7 +1205,7 @@ class ChannelRequesterImpl : public ChannelRequester std::cout << "channelCreated(" << status->toString() << ", " << (channel ? channel->getChannelName() : "(null)") << ")" << std::endl; } - + virtual void channelStateChange(Channel *c, ConnectionState connectionState) { std::cout << "channelStateChange(" << c->getChannelName() << ", " << ConnectionStateNames[connectionState] << ")" << std::endl; @@ -837,10 +1218,10 @@ class GetFieldRequesterImpl : public GetFieldRequester { return "GetFieldRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void getDone(epics::pvData::Status *status,epics::pvData::FieldConstPtr field) @@ -863,22 +1244,22 @@ class ChannelGetRequesterImpl : public ChannelGetRequester ChannelGet *m_channelGet; epics::pvData::PVStructure *m_pvStructure; epics::pvData::BitSet *m_bitSet; - + virtual String getRequesterName() { return "ChannelGetRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelGetConnect(epics::pvData::Status *status,ChannelGet *channelGet, epics::pvData::PVStructure *pvStructure,epics::pvData::BitSet *bitSet) { std::cout << "channelGetConnect(" << status->toString() << ")" << std::endl; - + // TODO sync m_channelGet = channelGet; m_pvStructure = pvStructure; @@ -900,22 +1281,22 @@ class ChannelPutRequesterImpl : public ChannelPutRequester ChannelPut *m_channelPut; epics::pvData::PVStructure *m_pvStructure; epics::pvData::BitSet *m_bitSet; - + virtual String getRequesterName() { return "ChannelPutRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelPutConnect(epics::pvData::Status *status,ChannelPut *channelPut, epics::pvData::PVStructure *pvStructure,epics::pvData::BitSet *bitSet) { std::cout << "channelPutConnect(" << status->toString() << ")" << std::endl; - + // TODO sync m_channelPut = channelPut; m_pvStructure = pvStructure; @@ -941,20 +1322,20 @@ class ChannelPutRequesterImpl : public ChannelPutRequester } }; - - + + class MonitorRequesterImpl : public MonitorRequester { virtual String getRequesterName() { return "MonitorRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } - + virtual void monitorConnect(Status* status, Monitor* monitor, Structure* structure) { std::cout << "monitorConnect(" << status->toString() << ")" << std::endl; @@ -965,13 +1346,13 @@ class MonitorRequesterImpl : public MonitorRequester std::cout << str << std::endl; } } - + virtual void monitorEvent(Monitor* monitor) { std::cout << "monitorEvent" << std::endl; MonitorElement* element = monitor->poll(); - + String str("changed/overrun "); element->getChangedBitSet()->toString(&str); str += '/'; @@ -979,35 +1360,35 @@ class MonitorRequesterImpl : public MonitorRequester str += '\n'; element->getPVStructure()->toString(&str); std::cout << str << std::endl; - + monitor->release(element); } - + virtual void unlisten(Monitor* monitor) { std::cout << "unlisten" << std::endl; } -}; +}; class ChannelProcessRequesterImpl : public ChannelProcessRequester { ChannelProcess *m_channelProcess; - + virtual String getRequesterName() { return "ProcessRequesterImpl"; }; - - virtual void message(String message,MessageType messageType) + + virtual void message(String message,MessageType messageType) { - std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } virtual void channelProcessConnect(epics::pvData::Status *status,ChannelProcess *channelProcess) { std::cout << "channelProcessConnect(" << status->toString() << ")" << std::endl; - + // TODO sync m_channelProcess = channelProcess; } @@ -1023,33 +1404,30 @@ int main(int argc,char *argv[]) { ClientContextImpl* context = new ClientContextImpl(); context->printInfo(); - - + + ChannelFindRequesterImpl findRequester; context->getProvider()->channelFind("something", &findRequester); - + ChannelRequesterImpl channelRequester; - //Channel* noChannel - context->getProvider()->createChannel("test", &channelRequester, ChannelProvider::PRIORITY_DEFAULT, "over the rainbow"); - Channel* channel = context->getProvider()->createChannel("test", &channelRequester); - std::cout << channel->getChannelName() << std::endl; + channel->printInfo(); /* GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch"); - + ChannelGetRequesterImpl channelGetRequesterImpl; ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, 0); channelGet->get(false); channelGet->destroy(); - + ChannelPutRequesterImpl channelPutRequesterImpl; ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, 0); channelPut->get(); channelPut->put(false); channelPut->destroy(); - - + + MonitorRequesterImpl monitorRequesterImpl; Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0); @@ -1062,19 +1440,19 @@ int main(int argc,char *argv[]) ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); channelProcess->process(false); channelProcess->destroy(); - + status = monitor->stop(); std::cout << "monitor->stop() = " << status->toString() << std::endl; delete status; - - + + monitor->destroy(); */ channel->destroy(); - + context->destroy(); - + std::cout << "-----------------------------------------------------------------------" << std::endl; getShowConstructDestruct()->constuctDestructTotals(stdout); return(0); diff --git a/testApp/utils/Makefile b/testApp/utils/Makefile index 8e364c2..ba2628e 100644 --- a/testApp/utils/Makefile +++ b/testApp/utils/Makefile @@ -34,6 +34,10 @@ PROD_HOST += transportRegisterTest transportRegisterTest_SRCS += transportRegistryTest.cpp transportRegisterTest_LIBS += pvAccess Com pvData +PROD_HOST += namedLockPatternTest +namedLockPatternTest_SRCS += namedLockPatternTest.cpp +namedLockPatternTest_LIBS += pvAccess Com pvData + include $(TOP)/configure/RULES #---------------------------------------- # ADD RULES AFTER THIS LINE diff --git a/testApp/utils/namedLockPatternTest.cpp b/testApp/utils/namedLockPatternTest.cpp new file mode 100644 index 0000000..9224800 --- /dev/null +++ b/testApp/utils/namedLockPatternTest.cpp @@ -0,0 +1,216 @@ +/* + * namedLockPatternTest.cpp + * + */ + +#include "namedLockPattern.h" +#include "showConstructDestruct.h" + +#include +#include + +#include + +using namespace epics::pvAccess; +using namespace std; + +void testIntLockPattern() +{ + int64 timeout = 100; + NamedLockPattern namedLockPattern; + int name1 = 1; + assert(namedLockPattern.acquireSynchronizationObject(name1,timeout)); + assert(namedLockPattern.acquireSynchronizationObject(name1,timeout)); + namedLockPattern.releaseSynchronizationObject(name1); + namedLockPattern.releaseSynchronizationObject(name1); + int name2 = 2; + assert(namedLockPattern.acquireSynchronizationObject(name2,timeout)); + namedLockPattern.releaseSynchronizationObject(name2); +} + +void testIntPtrLockPattern() +{ + int64 timeout = 100; + NamedLockPattern namedLockPattern; + int name1 = 1; + assert(namedLockPattern.acquireSynchronizationObject(&name1,timeout)); + assert(namedLockPattern.acquireSynchronizationObject(&name1,timeout)); + namedLockPattern.releaseSynchronizationObject(&name1); + namedLockPattern.releaseSynchronizationObject(&name1); + int name2 = 2; + assert(namedLockPattern.acquireSynchronizationObject(&name2,timeout)); + namedLockPattern.releaseSynchronizationObject(&name2); +} + +struct cmp_str +{ + bool operator()(char const *a, char const *b) + { + return strcmp(a, b) < 0; + } +}; + +void testCharPtrLockPattern() +{ + int64 timeout = 100; + NamedLockPattern namedLockPattern; + string name1 = "lojze"; + assert(namedLockPattern.acquireSynchronizationObject(name1.c_str(),timeout)); + assert(namedLockPattern.acquireSynchronizationObject(name1.c_str(),timeout)); + namedLockPattern.releaseSynchronizationObject(name1.c_str()); + namedLockPattern.releaseSynchronizationObject(name1.c_str()); + string name2 = "francka"; + assert(namedLockPattern.acquireSynchronizationObject(name2.c_str(),timeout)); + namedLockPattern.releaseSynchronizationObject(name2.c_str()); +} + +struct comp_osiSockAddrPtr +{ + bool operator()(osiSockAddr const *a, osiSockAddr const *b) + { + if (a->sa.sa_family < b->sa.sa_family) return true; + if ((a->sa.sa_family == b->sa.sa_family) && (a->ia.sin_addr.s_addr < b->ia.sin_addr.s_addr )) return true; + if ((a->sa.sa_family == b->sa.sa_family) && (a->ia.sin_addr.s_addr == b->ia.sin_addr.s_addr ) && ( a->ia.sin_port < b->ia.sin_port )) return true; + return false; + } +}; + +void testOsiSockAddrLockPattern() +{ + int64 timeout = 10000; + NamedLockPattern namedLockPattern; + osiSockAddr name1; + name1.ia.sin_addr.s_addr = 1; + name1.ia.sin_port = 1; + name1.ia.sin_family = AF_INET; + + assert(namedLockPattern.acquireSynchronizationObject(&name1,timeout)); + assert(namedLockPattern.acquireSynchronizationObject(&name1,timeout)); + namedLockPattern.releaseSynchronizationObject(&name1); + namedLockPattern.releaseSynchronizationObject(&name1); + + osiSockAddr name2; + name2.ia.sin_addr.s_addr = 1; + name2.ia.sin_port = 1; + name2.ia.sin_family = AF_INET; + NamedLock namedGuard(&namedLockPattern); + assert(namedGuard.acquireSynchronizationObject(&name1,timeout)); +} + +struct comp_osiSockAddr +{ + bool operator()(osiSockAddr const a, osiSockAddr const b) + { + if (a.sa.sa_family < b.sa.sa_family) return true; + if ((a.sa.sa_family == b.sa.sa_family) && (a.ia.sin_addr.s_addr < b.ia.sin_addr.s_addr )) return true; + if ((a.sa.sa_family == b.sa.sa_family) && (a.ia.sin_addr.s_addr == b.ia.sin_addr.s_addr ) && ( a.ia.sin_port < b.ia.sin_port )) return true; + return false; + } +}; + +void* testWorker1(void* p) +{ + int32 timeout = 1000; + const int32 max = 1000; + NamedLockPattern* namedLockPattern = (NamedLockPattern*)p; + + for(int32 i = 0 ; i < max; i = i +2) + { + osiSockAddr addr; + addr.ia.sin_addr.s_addr = i; + addr.ia.sin_port = i; + addr.ia.sin_family = AF_INET; + NamedLock namedGuard(namedLockPattern); + assert(namedGuard.acquireSynchronizationObject(addr,timeout)); + usleep(1); + } + + //this one takes a lock, thread 2 will be slower and will get timeout + { //due to namedGuard + osiSockAddr addr; + addr.ia.sin_addr.s_addr = 1; + addr.ia.sin_port = 1; + addr.ia.sin_family = AF_INET; + NamedLock namedGuard(namedLockPattern); + assert(namedGuard.acquireSynchronizationObject(addr,timeout)); + sleep(5); + } + + return NULL; +} + + +void* testWorker2(void* p) +{ + int32 timeout = 1000; + const int32 max = 1000; + NamedLockPattern* namedLockPattern = (NamedLockPattern*)p; + + for(int32 i = 1 ; i < max; i = i + 2) + { + osiSockAddr addr; + addr.ia.sin_addr.s_addr = i; + addr.ia.sin_port = i; + addr.ia.sin_family = AF_INET; + NamedLock namedGuard(namedLockPattern); + assert(namedGuard.acquireSynchronizationObject(addr,timeout)); + usleep(1); + } + + //this thread sleeps a while and gets timeout on lock + { + sleep(1); + osiSockAddr addr; + addr.ia.sin_addr.s_addr = 1; + addr.ia.sin_port = 1; + addr.ia.sin_family = AF_INET; + NamedLock namedGuard(namedLockPattern); + assert(!namedGuard.acquireSynchronizationObject(addr,timeout)); + } + + return NULL; +} + +int main(int argc, char *argv[]) +{ + testIntLockPattern(); + testIntPtrLockPattern(); + testCharPtrLockPattern(); + testOsiSockAddrLockPattern(); + + pthread_t _worker1Id; + pthread_t _worker2Id; + + NamedLockPattern namedLockPattern; + + //create two threads + int32 retval = pthread_create(&_worker1Id, NULL, testWorker1, &namedLockPattern); + if(retval != 0) + { + assert(true); + } + + retval = pthread_create(&_worker2Id, NULL, testWorker2, &namedLockPattern); + if(retval != 0) + { + assert(true); + } + + //wait for threads + retval = pthread_join(_worker1Id, NULL); + if(retval != 0) + { + assert(true); + } + + retval = pthread_join(_worker2Id, NULL); + if(retval != 0) + { + assert(true); + } + + getShowConstructDestruct()->constuctDestructTotals(stdout); + return 0; +} + +