diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index 8cd62cc..a6ebef8 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -19,18 +19,25 @@ INC += inetAddressUtil.h INC += logger.h INC += introspectionRegistry.h INC += transportRegistry.h +INC += namedLockPattern.h +INC += referenceCountingLock.h LIBSRCS += hexDump.cpp LIBSRCS += wildcharMatcher.cpp LIBSRCS += inetAddressUtil.cpp LIBSRCS += logger.cpp LIBSRCS += introspectionRegistry.cpp LIBSRCS += transportRegistry.cpp +LIBSRCS += namedLockPattern.cpp +LIBSRCS += referenceCountingLock.cpp SRC_DIRS += $(PVACCESS)/client INC += pvAccess.h LIBSRCS += pvAccess.cpp +SRC_DIRS += $(PVACCESS)/server +INC += serverContext.h + SRC_DIRS += $(PVACCESS)/factory LIBSRCS += ChannelAccessFactory.cpp @@ -52,6 +59,9 @@ LIBSRCS += beaconHandler.cpp LIBSRCS += blockingTCPTransport.cpp LIBSRCS += blockingClientTCPTransport.cpp LIBSRCS += blockingTCPConnector.cpp +LIBSRCS += blockingServerTCPTransport.cpp +LIBSRCS += blockingTCPAcceptor.cpp +LIBSRCS += responseHandlers.cpp LIBRARY = pvAccess pvAccess_LIBS += Com diff --git a/pvAccessApp/remote/beaconHandler.cpp b/pvAccessApp/remote/beaconHandler.cpp index 88a4392..596ace2 100644 --- a/pvAccessApp/remote/beaconHandler.cpp +++ b/pvAccessApp/remote/beaconHandler.cpp @@ -69,7 +69,7 @@ bool BeaconHandler::updateBeacon(int8 remoteTransportRevision, int64 timestamp, void BeaconHandler::beaconArrivalNotify() { - int32 size; + int32 size = 0; //TODO TCP name must be get from somewhere not hardcoded //TODO Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size); @@ -83,12 +83,12 @@ void BeaconHandler::beaconArrivalNotify() { transports[i]->aliveNotification(); } - delete transports; + delete[] transports; } void BeaconHandler::changedTransport() { - int32 size; + int32 size = 0; //TODO TCP name must be get from somewhere not hardcoded //TODO Transport** transports = NULL;//_context->getTransportRegistry().get("TCP", _responseFrom, size); @@ -102,7 +102,7 @@ void BeaconHandler::changedTransport() { transports[i]->changedTransport(); } - delete transports; + delete[] transports; } }} diff --git a/pvAccessApp/remote/beaconHandler.h b/pvAccessApp/remote/beaconHandler.h index 73fa8fa..2a033a6 100644 --- a/pvAccessApp/remote/beaconHandler.h +++ b/pvAccessApp/remote/beaconHandler.h @@ -67,7 +67,6 @@ namespace epics { namespace pvAccess { * Mutex */ Mutex _mutex; - /** * Update beacon. * @param remoteTransportRevision encoded (major, minor) revision. diff --git a/pvAccessApp/remote/beaconServerStatusProvider.h b/pvAccessApp/remote/beaconServerStatusProvider.h index e6711da..5b65494 100644 --- a/pvAccessApp/remote/beaconServerStatusProvider.h +++ b/pvAccessApp/remote/beaconServerStatusProvider.h @@ -24,7 +24,7 @@ namespace epics { namespace pvAccess { */ BeaconServerStatusProvider(ServerContext* context); /** - * Test Constructor (ohne context) + * Test Constructor (without context) */ BeaconServerStatusProvider(); /** diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp new file mode 100644 index 0000000..29272c5 --- /dev/null +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -0,0 +1,126 @@ +/* + * blockingServerTCPTransport.cpp + * + * Created on: Jan 4, 2011 + * Author: Miha Vitorovic + */ + +/* pvAccess */ +#include "blockingTCP.h" +#include "remote.h" + +/* pvData */ +#include +#include + +/* EPICSv3 */ +#include + +/* standard */ +#include + +using namespace epics::pvData; +using std::map; + +namespace epics { + namespace pvAccess { + + BlockingServerTCPTransport::BlockingServerTCPTransport( + Context* context, SOCKET channel, + ResponseHandler* responseHandler, int receiveBufferSize) : + BlockingTCPTransport(context, channel, responseHandler, + receiveBufferSize, CA_DEFAULT_PRIORITY), + _introspectionRegistry(new IntrospectionRegistry(true)), + _lastChannelSID(0), _channels( + new map ()), _channelsMutex( + new Mutex()) { + // NOTE: priority not yet known, default priority is used to register/unregister + // TODO implement priorities in Reactor... not that user will + // change it.. still getPriority() must return "registered" priority! + + start(); + } + + BlockingServerTCPTransport::~BlockingServerTCPTransport() { + delete _introspectionRegistry; + delete _channels; + delete _channelsMutex; + } + + void BlockingServerTCPTransport::destroyAllChannels() { + Lock lock(_channelsMutex); + if(_channels->size()==0) return; + + char ipAddrStr[64]; + ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + + errlogSevPrintf( + errlogInfo, + "Transport to %s still has %d channel(s) active and closing...", + ipAddrStr, _channels->size()); + + map::iterator it = _channels->begin(); + for(; it!=_channels->end(); it++) + it->second->destroy(); + + _channels->clear(); + } + + void BlockingServerTCPTransport::internalClose(bool force) { + BlockingTCPTransport::internalClose(force); + destroyAllChannels(); + } + + int BlockingServerTCPTransport::preallocateChannelSID() { + Lock lock(_channelsMutex); + // search first free (theoretically possible loop of death) + int sid = ++_lastChannelSID; + while(_channels->find(sid)!=_channels->end()) + sid = ++_lastChannelSID; + return sid; + } + + void BlockingServerTCPTransport::registerChannel(int sid, + ServerChannel* channel) { + Lock lock(_channelsMutex); + (*_channels)[sid] = channel; + } + + void BlockingServerTCPTransport::unregisterChannel(int sid) { + Lock lock(_channelsMutex); + _channels->erase(sid); + } + + ServerChannel* BlockingServerTCPTransport::getChannel(int sid) { + Lock lock(_channelsMutex); + + map::iterator it = _channels->find(sid); + if(it!=_channels->end()) return it->second; + + return NULL; + } + + int BlockingServerTCPTransport::getChannelCount() { + Lock lock(_channelsMutex); + return _channels->size(); + } + + void BlockingServerTCPTransport::send(ByteBuffer* buffer, + TransportSendControl* control) { + // + // send verification message + // + control->startMessage(1, 2*sizeof(int32)); + + // receive buffer size + buffer->putInt(getReceiveBufferSize()); + + // socket receive buffer size + buffer->putInt(getSocketReceiveBufferSize()); + + // send immediately + control->flush(true); + } + + } +} diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 8bb1c8c..8e11405 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -14,21 +14,23 @@ #include "growingCircularBuffer.h" #include "transportRegistry.h" #include "introspectionRegistry.h" +#include "namedLockPattern.h" /* pvData */ #include #include #include -#include #include /* EPICSv3 */ #include #include #include +#include /* standard */ #include +#include namespace epics { namespace pvAccess { @@ -472,6 +474,20 @@ namespace epics { void responsiveTransport(); }; + /** + * comparator for osiSockAddr* + */ + struct comp_osiSockAddrPtr + { + bool operator()(osiSockAddr const *a, osiSockAddr const *b) + { + if (a->sa.sa_family < b->sa.sa_family) return true; + if ((a->sa.sa_family == b->sa.sa_family) && (a->ia.sin_addr.s_addr < b->ia.sin_addr.s_addr )) return true; + if ((a->sa.sa_family == b->sa.sa_family) && (a->ia.sin_addr.s_addr == b->ia.sin_addr.s_addr ) && ( a->ia.sin_port < b->ia.sin_port )) return true; + return false; + } + }; + /** * Channel Access TCP connector. * @author Matej Sekoranja @@ -485,8 +501,8 @@ namespace epics { virtual ~BlockingTCPConnector(); virtual Transport* connect(TransportClient* client, - ResponseHandler* responseHandler, osiSockAddr* address, - short transportRevision, int16 priority); + ResponseHandler* responseHandler, osiSockAddr* address, + short transportRevision, int16 priority); private: /** * Lock timeout @@ -499,9 +515,9 @@ namespace epics { Context* _context; /** - * Context instance. + * named lock */ - //NamedLockPattern* _namedLocker; + NamedLockPattern* _namedLocker; /** * Receive buffer size. @@ -524,6 +540,207 @@ namespace epics { }; + class BlockingServerTCPTransport : public BlockingTCPTransport, + public ChannelHostingTransport, + public TransportSender { + public: + BlockingServerTCPTransport(Context* context, SOCKET channel, + ResponseHandler* responseHandler, int receiveBufferSize); + + virtual ~BlockingServerTCPTransport(); + + virtual IntrospectionRegistry* getIntrospectionRegistry() { + return _introspectionRegistry; + } + + /** + * Preallocate new channel SID. + * @return new channel server id (SID). + */ + virtual int preallocateChannelSID(); + + /** + * De-preallocate new channel SID. + * @param sid preallocated channel SID. + */ + virtual void depreallocateChannelSID(int sid) { + // noop + } + + /** + * Register a new channel. + * @param sid preallocated channel SID. + * @param channel channel to register. + */ + virtual void registerChannel(int sid, ServerChannel* channel); + + /** + * Unregister a new channel (and deallocates its handle). + * @param sid SID + */ + virtual void unregisterChannel(int sid); + + /** + * Get channel by its SID. + * @param sid channel SID + * @return channel with given SID, NULL otherwise + */ + virtual ServerChannel* getChannel(int sid); + + /** + * Get channel count. + * @return channel count. + */ + virtual int getChannelCount(); + + virtual epics::pvData::PVField* getSecurityToken() { + return NULL; + } + + virtual void lock() { + // noop + } + + virtual void unlock() { + // noop + } + + /** + * Verify transport. Server side is self-verified. + */ + void verify() { + enqueueSendRequest(this); + verified(); + } + + /** + * CA connection validation request. + * A server sends a validate connection message when it receives a new connection. + * The message indicates that the server is ready to receive requests; the client must + * not send any messages on the connection until it has received the validate connection message + * from the server. No reply to the message is expected by the server. + * The purpose of the validate connection message is two-fold: + * It informs the client of the protocol version supported by the server. + * It prevents the client from writing a request message to its local transport + * buffers until after the server has acknowledged that it can actually process the + * request. This avoids a race condition caused by the server's TCP/IP stack + * accepting connections in its backlog while the server is in the process of shutting down: + * if the client were to send a request in this situation, the request + * would be lost but the client could not safely re-issue the request because that + * might violate at-most-once semantics. + * The validate connection message guarantees that a server is not in the middle + * of shutting down when the server's TCP/IP stack accepts an incoming connection + * and so avoids the race condition. + * @see org.epics.ca.impl.remote.TransportSender#send(java.nio.ByteBuffer, org.epics.ca.impl.remote.TransportSendControl) + */ + virtual void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control); + + protected: + /** + * Introspection registry. + */ + IntrospectionRegistry* _introspectionRegistry; + + virtual void internalClose(bool force); + + private: + /** + * Last SID cache. + */ + volatile int _lastChannelSID; + + /** + * Channel table (SID -> channel mapping). + */ + std::map* _channels; + + Mutex* _channelsMutex; + + /** + * Destroy all channels. + */ + void destroyAllChannels(); + }; + + /** + * Channel Access Server TCP acceptor. + * @author Matej Sekoranja + * @version $Id: BlockingTCPAcceptor.java,v 1.1 2010/05/03 14:45:42 mrkraimer Exp $ + */ + class BlockingTCPAcceptor { + public: + + /** + * @param context + * @param port + * @param receiveBufferSize + * @throws CAException + */ + BlockingTCPAcceptor(Context* context, int port, + int receiveBufferSize); + + ~BlockingTCPAcceptor(); + + void handleEvents(); + + /** + * Bind socket address. + * @return bind socket address, null if not binded. + */ + osiSockAddr* getBindAddress() { + return _bindAddress; + } + + /** + * Destroy acceptor (stop listening). + */ + void destroy(); + + private: + /** + * Context instance. + */ + Context* _context; + + /** + * Bind server socket address. + */ + osiSockAddr* _bindAddress; + + /** + * Server socket channel. + */ + SOCKET _serverSocketChannel; + + /** + * Receive buffer size. + */ + int _receiveBufferSize; + + /** + * Destroyed flag. + */ + volatile bool _destroyed; + + epicsThreadId _threadId; + + /** + * Initialize connection acception. + * @return port where server is listening + */ + int initialize(in_port_t port); + + /** + * Validate connection by sending a validation message request. + * @return true on success. + */ + bool validateConnection(BlockingServerTCPTransport* transport, + const char* address); + + static void handleEventsRunner(void* param); + }; + } } diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp new file mode 100644 index 0000000..edb93c0 --- /dev/null +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -0,0 +1,280 @@ +/* + * blockingTCPAcceptor.cpp + * + * Created on: Jan 4, 2011 + * Author: Miha Vitorovic + */ + +/* pvAccess */ +#include "blockingTCP.h" +#include "remote.h" +#include "serverContext.h" + +/* pvData */ +#include + +/* EPICSv3 */ +#include +#include +#include + +/* standard */ +#include +#include + +using std::ostringstream; + +namespace epics { + namespace pvAccess { + + BlockingTCPAcceptor::BlockingTCPAcceptor(Context* context, int port, + int receiveBufferSize) : + _context(context), _bindAddress(NULL), _serverSocketChannel( + INVALID_SOCKET), _receiveBufferSize(receiveBufferSize), + _destroyed(false), _threadId(NULL) { + initialize(port); + } + + BlockingTCPAcceptor::~BlockingTCPAcceptor() { + if(_bindAddress!=NULL) delete _bindAddress; + } + + int BlockingTCPAcceptor::initialize(in_port_t port) { + // specified bind address + _bindAddress = new osiSockAddr; + _bindAddress->ia.sin_family = AF_INET; + _bindAddress->ia.sin_port = htons(port); + _bindAddress->ia.sin_addr.s_addr = htonl(INADDR_ANY); + + char strBuffer[64]; + char ipAddrStr[48]; + ipAddrToA(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + + int tryCount = 0; + while(tryCount<2) { + + errlogSevPrintf(errlogInfo, "Creating acceptor to %s.", + ipAddrStr); + + _serverSocketChannel = epicsSocketCreate(AF_INET, SOCK_STREAM, + IPPROTO_TCP); + if(_serverSocketChannel==INVALID_SOCKET) { + epicsSocketConvertErrnoToString(strBuffer, + sizeof(strBuffer)); + ostringstream temp; + temp<<"Socket create error: "<sa, sizeof(sockaddr)); + if(retval<0) { + epicsSocketConvertErrnoToString(strBuffer, + sizeof(strBuffer)); + errlogSevPrintf(errlogMinor, "Socket bind error: %s", + strBuffer); + if(_bindAddress->ia.sin_port!=0) { + // failed to bind to specified bind address, + // try to get port dynamically, but only once + errlogSevPrintf( + errlogMinor, + "Configured TCP port %d is unavailable, trying to assign it dynamically.", + port); + _bindAddress->ia.sin_port = htons(0); + } + else { + ::close(_serverSocketChannel); + break; // exit while loop + } + } + else { // if(retval<0) + // bind succeeded + + // update bind address, if dynamically port selection was used + if(ntohs(_bindAddress->ia.sin_port)==0) { + socklen_t sockLen = sizeof(sockaddr); + // read the actual socket info + retval = ::getsockname(_serverSocketChannel, + &_bindAddress->sa, &sockLen); + if(retval<0) { + // error obtaining port number + epicsSocketConvertErrnoToString(strBuffer, + sizeof(strBuffer)); + errlogSevPrintf(errlogMinor, + "getsockname error: %s", strBuffer); + } + else { + errlogSevPrintf( + errlogInfo, + "Using dynamically assigned TCP port %d.", + ntohs(_bindAddress->ia.sin_port)); + } + } + + retval = ::listen(_serverSocketChannel, 5); + if(retval<0) { + epicsSocketConvertErrnoToString(strBuffer, + sizeof(strBuffer)); + ostringstream temp; + temp<<"Socket listen error: "<ia.sin_port); + } // successful bind + } // successfully obtained socket + tryCount++; + } // while + + ostringstream temp; + temp<<"Failed to create acceptor to "<ia, ipAddrStr, sizeof(ipAddrStr)); + errlogSevPrintf(errlogInfo, "Accepting connections at %s.", + ipAddrStr); + + bool socketOpen = true; + char strBuffer[64]; + + pollfd sockets[1]; + sockets[0].fd = _serverSocketChannel; + sockets[0].events = POLLIN; + + while(!_destroyed&&socketOpen) { + int retval = ::poll(sockets, 1, 50); + if(retval<0) { + // error in poll + epicsSocketConvertErrnoToString(strBuffer, + sizeof(strBuffer)); + errlogSevPrintf(errlogMajor, "socket poll error: %s", + strBuffer); + socketOpen = false; + } + else if(retval>0) { + // some event on a socket + if(sockets[0].revents&POLLIN!=0) { + // connection waiting + + osiSockAddr address; + osiSocklen_t len = sizeof(sockaddr); + + SOCKET newClient = epicsSocketAccept( + _serverSocketChannel, &address.sa, &len); + if(newClient!=INVALID_SOCKET) { + // accept succeeded + ipAddrToA(&address.ia, ipAddrStr, sizeof(ipAddrStr)); + errlogSevPrintf(errlogInfo, + "Accepted connection from CA client: %s", + ipAddrStr); + + // enable TCP_NODELAY (disable Nagle's algorithm) + int optval = 1; // true + retval = ::setsockopt(newClient, IPPROTO_TCP, + TCP_NODELAY, &optval, sizeof(int)); + if(retval<0) { + epicsSocketConvertErrnoToString(strBuffer, + sizeof(strBuffer)); + errlogSevPrintf(errlogMinor, + "Error setting TCP_NODELAY: %s", + strBuffer); + } + + // enable TCP_KEEPALIVE + retval = ::setsockopt(newClient, SOL_SOCKET, + SO_KEEPALIVE, &optval, sizeof(int)); + if(retval<0) { + epicsSocketConvertErrnoToString(strBuffer, + sizeof(strBuffer)); + errlogSevPrintf(errlogMinor, + "Error setting SO_KEEPALIVE: %s", + strBuffer); + } + + // TODO tune buffer sizes?! + //socket.socket().setReceiveBufferSize(); + //socket.socket().setSendBufferSize(); + + // create transport + // each transport should have its own response handler since it is not "shareable" + BlockingServerTCPTransport + * transport = + new BlockingServerTCPTransport( + _context, + newClient, + new ServerResponseHandler( + (ServerContextImpl*)_context), + _receiveBufferSize); + + // validate connection + if(!validateConnection(transport, ipAddrStr)) { + transport->close(true); + errlogSevPrintf( + errlogInfo, + "Connection to CA client %s failed to be validated, closing it.", + ipAddrStr); + return; + } + + errlogSevPrintf(errlogInfo, + "Serving to CA client: %s", ipAddrStr); + + }// accept succeeded + } // connection waiting + if(sockets[0].revents&(POLLERR|POLLHUP|POLLNVAL)!=0) { + errlogSevPrintf(errlogMajor, + "error on a socket: POLLERR|POLLHUP|POLLNVAL"); + socketOpen = false; + } + } // some event on a socket + } // while + } + + bool BlockingTCPAcceptor::validateConnection( + BlockingServerTCPTransport* transport, const char* address) { + try { + transport->verify(); + return true; + } catch(...) { + errlogSevPrintf(errlogInfo, "Validation of %s failed.", address); + return false; + } + } + + void BlockingTCPAcceptor::handleEventsRunner(void* param) { + ((BlockingTCPAcceptor*)param)->handleEvents(); + } + + void BlockingTCPAcceptor::destroy() { + if(_destroyed) return; + _destroyed = true; + + if(_serverSocketChannel!=INVALID_SOCKET) { + char ipAddrStr[48]; + ipAddrToA(&_bindAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + errlogSevPrintf(errlogInfo, + "Stopped accepting connections at %s.", ipAddrStr); + + epicsSocketDestroy(_serverSocketChannel); + } + } + + } +} diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp index 232cebc..0c7e7f1 100644 --- a/pvAccessApp/remote/blockingTCPConnector.cpp +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -7,6 +7,7 @@ #include "blockingTCP.h" #include "remote.h" +#include "namedLockPattern.h" #include #include @@ -21,14 +22,14 @@ namespace epics { BlockingTCPConnector::BlockingTCPConnector(Context* context, int receiveBufferSize, float beaconInterval) : - _context(context), _receiveBufferSize(receiveBufferSize), - _beaconInterval(beaconInterval) - //TODO , _namedLocker(new NamedLockPattern()) - { + _context(context), _namedLocker(new NamedLockPattern< + const osiSockAddr*, comp_osiSockAddrPtr> ()), + _receiveBufferSize(receiveBufferSize), _beaconInterval( + beaconInterval) { } BlockingTCPConnector::~BlockingTCPConnector() { - // TODO delete _namedLocker; + delete _namedLocker; } SOCKET BlockingTCPConnector::tryConnect(osiSockAddr* address, int tries) { @@ -86,10 +87,8 @@ namespace epics { if(transport->acquire(client)) return transport; } - bool lockAcquired = true; - // TODO comment out - //bool lockAcquired = _namedLocker->acquireSynchronizationObject( - // address, LOCK_TIMEOUT); + bool lockAcquired = _namedLocker->acquireSynchronizationObject( + address, LOCK_TIMEOUT); if(lockAcquired) { try { // ... transport created during waiting in lock @@ -115,13 +114,13 @@ namespace epics { // enable TCP_NODELAY (disable Nagle's algorithm) int optval = 1; // true int retval = ::setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, - &optval, sizeof(optval)); + &optval, sizeof(int)); if(retval<0) errlogSevPrintf(errlogMajor, "Error setting TCP_NODELAY: %s", strerror(errno)); // enable TCP_KEEPALIVE retval = ::setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, - &optval, sizeof(optval)); + &optval, sizeof(int)); if(retval<0) errlogSevPrintf(errlogMinor, "Error setting SO_KEEPALIVE: %s", strerror(errno)); @@ -130,9 +129,10 @@ namespace epics { //socket.socket().setSendBufferSize(); // create transport - transport = new BlockingClientTCPTransport(_context, socket, - responseHandler, _receiveBufferSize, client, - transportRevision, _beaconInterval, priority); + transport = new BlockingClientTCPTransport(_context, + socket, responseHandler, _receiveBufferSize, + client, transportRevision, _beaconInterval, + priority); // verify if(!transport->waitUntilVerified(3.0)) { @@ -156,11 +156,10 @@ namespace epics { } catch(...) { // close socket, if open if(socket!=INVALID_SOCKET) epicsSocketDestroy(socket); - - // TODO namedLocker.releaseSynchronizationObject(address); - + _namedLocker->releaseSynchronizationObject(address); throw; } + _namedLocker->releaseSynchronizationObject(address); } else { ostringstream temp; diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 91232a2..3dd3720 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -11,11 +11,13 @@ #include "caConstants.h" #include "transportRegistry.h" #include "introspectionRegistry.h" +#include "serverContext.h" #include #include #include #include +#include #include #include @@ -29,12 +31,24 @@ namespace epics { TCP, UDP, SSL }; + enum MessageCommands { + CMD_BEACON = 0, CMD_CONNECTION_VALIDATION = 1, CMD_ECHO = 2, + CMD_SEARCH = 3, CMD_INTROSPECTION_SEARCH = 5, + CMD_CREATE_CHANNEL = 7, CMD_DESTROY_CHANNEL = 8, CMD_GET = 10, + CMD_PUT = 11, CMD_PUT_GET = 12, CMD_MONITOR = 13, CMD_ARRAY = 14, + CMD_CANCEL_REQUEST = 15, CMD_PROCESS = 16, CMD_GET_FIELD = 17, + CMD_RPC = 20, + }; + /** * Interface defining transport send control. * @author Matej Sekoranja */ class TransportSendControl : public epics::pvData::SerializableControl { public: + virtual ~TransportSendControl() { + } + virtual void startMessage(int8 command, int ensureCapacity) =0; virtual void endMessage() =0; @@ -50,6 +64,8 @@ namespace epics { */ class TransportSender { public: + virtual ~TransportSender() {} + /** * Called by transport. * By this call transport gives callee ownership over the buffer. @@ -58,8 +74,7 @@ namespace epics { * of this method. * NOTE: these limitations allows efficient implementation. */ - virtual void - send(epics::pvData::ByteBuffer* buffer, + virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) =0; virtual void lock() =0; @@ -202,6 +217,8 @@ namespace epics { */ class ResponseHandler { public: + virtual ~ResponseHandler() {} + /** * Handle response. * @param[in] responseFrom remote address of the responder, null if unknown. @@ -213,9 +230,115 @@ namespace epics { * Code must not manipulate buffer. */ virtual void - handleResponse(osiSockAddr* responseFrom, Transport* transport, - int8 version, int8 command, int payloadSize, - epics::pvData::ByteBuffer* payloadBuffer) =0; + handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, + epics::pvData::ByteBuffer* payloadBuffer) =0; + }; + + /** + * Base (abstract) channel access response handler. + * @author Matej Sekoranja + * @version $Id: AbstractResponseHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class AbstractResponseHandler : public ResponseHandler { + public: + /** + * @param description + */ + AbstractResponseHandler(String description) : + _description(description), _debug(true) { + //debug = System.getProperties().containsKey(CAConstants.CAJ_DEBUG); + } + + virtual ~AbstractResponseHandler() { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + + protected: + /** + * Response hanlder description. + */ + String _description; + + /** + * Debug flag. + */ + bool _debug; + }; + + /** + * @author Matej Sekoranja + * @version $Id: AbstractServerResponseHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class AbstractServerResponseHandler : public AbstractResponseHandler { + public: + /** + * @param context + * @param description + */ + AbstractServerResponseHandler(ServerContextImpl* context, + String description) : + AbstractResponseHandler(description), _context(context) { + } + + virtual ~AbstractServerResponseHandler() { + } + protected: + ServerContextImpl* _context; + }; + + /** + * Bad request handler. + * @author Matej Sekoranja + * @version $Id: BadResponse.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class BadResponse : public AbstractServerResponseHandler { + public: + /** + * @param context + */ + BadResponse(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Bad request") { + } + + virtual ~BadResponse() { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + }; + + /** + * CAS request handler - main handler which dispatches requests to appropriate handlers. + * @author Matej Sekoranja + * @version $Id: ServerResponseHandler.java,v 1.1 2010/05/03 14:45:48 mrkraimer Exp $ + */ + class ServerResponseHandler : public ResponseHandler { + public: + ServerResponseHandler(ServerContextImpl* context); + + virtual ~ServerResponseHandler(); + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + private: + static const int HANDLER_TABLE_LENGTH = 28; + /** + * Table of response handlers for each command ID. + */ + ResponseHandler** _handlerTable; + + /** + * Context instance. + */ + ServerContextImpl* _context; + }; /** @@ -225,6 +348,8 @@ namespace epics { */ class TransportClient { public: + virtual ~TransportClient(); + /** * Notification of unresponsive transport (e.g. no heartbeat detected) . */ @@ -256,6 +381,9 @@ namespace epics { */ class Connector { public: + virtual ~Connector() { + } + /** * Connect. * @param[in] client client requesting connection (transport). @@ -274,6 +402,8 @@ namespace epics { class Context { public: + virtual ~Context() { + } /** * Get timer. * @return timer. @@ -294,6 +424,8 @@ namespace epics { */ class ReferenceCountingTransport { public: + virtual ~ReferenceCountingTransport() {} + /** * Acquires transport. * @param client client (channel) acquiring the transport @@ -308,6 +440,78 @@ namespace epics { virtual void release(TransportClient* client) =0; }; + class ServerChannel { + public: + virtual ~ServerChannel() { + } + /** + * Get channel SID. + * @return channel SID. + */ + virtual int getSID() =0; + + /** + * Destroy server channel. + * This method MUST BE called if overriden. + */ + virtual void destroy() =0; + }; + + /** + * Interface defining a transport that hosts channels. + * @author Matej Sekoranja + * @version $Id: ChannelHostingTransport.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class ChannelHostingTransport { + public: + virtual ~ChannelHostingTransport() { + } + + /** + * Get security token. + * @return security token, can be null. + */ + virtual epics::pvData::PVField* getSecurityToken() =0; + + /** + * Preallocate new channel SID. + * @return new channel server id (SID). + */ + virtual int preallocateChannelSID() =0; + + /** + * De-preallocate new channel SID. + * @param sid preallocated channel SID. + */ + virtual void depreallocateChannelSID(int sid) =0; + + /** + * Register a new channel. + * @param sid preallocated channel SID. + * @param channel channel to register. + */ + virtual void registerChannel(int sid, ServerChannel* channel) =0; + + /** + * Unregister a new channel (and deallocates its handle). + * @param sid SID + */ + virtual void unregisterChannel(int sid) =0; + + /** + * Get channel by its SID. + * @param sid channel SID + * @return channel with given SID, null otherwise + */ + virtual ServerChannel* getChannel(int sid) =0; + + /** + * Get channel count. + * @return channel count. + */ + virtual int getChannelCount() =0; + }; + } } diff --git a/pvAccessApp/remote/responseHandlers.cpp b/pvAccessApp/remote/responseHandlers.cpp new file mode 100644 index 0000000..b1820da --- /dev/null +++ b/pvAccessApp/remote/responseHandlers.cpp @@ -0,0 +1,93 @@ +/* + * responseHandlers.cpp + * + * Created on: Jan 4, 2011 + * Author: Miha Vitorovic + */ + +#include "remote.h" +#include "hexDump.h" + +#include + +#include +#include + +#include + +using std::ostringstream; +using std::hex; + +using namespace epics::pvData; + +namespace epics { + namespace pvAccess { + + void AbstractResponseHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + if(_debug) { + char ipAddrStr[48]; + ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + + ostringstream prologue; + prologue<<"Message ["<getArray(), + payloadBuffer->getPosition(), payloadSize); + } + } + + void BadResponse::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + char ipAddrStr[48]; + ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + + errlogSevPrintf(errlogInfo, + "Undecipherable message (bad response type %d) from %s.", + command, ipAddrStr); + + } + + ServerResponseHandler::ServerResponseHandler(ServerContextImpl* context) : + _context(context) { + + BadResponse* badResponse = new BadResponse(context); + + _handlerTable = new ResponseHandler*[HANDLER_TABLE_LENGTH]; + _handlerTable[0] = badResponse; + } + + ServerResponseHandler::~ServerResponseHandler() { + delete[] _handlerTable; + } + + void ServerResponseHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + if(command<0||command>=HANDLER_TABLE_LENGTH) { + errlogSevPrintf(errlogMinor, + "Invalid (or unsupported) command: %d.", command); + // TODO remove debug output + ostringstream name; + name<<"Invalid CA header "<getArray(), + payloadBuffer->getPosition(), payloadSize); + return; + } + + // delegate + _handlerTable[command]->handleResponse(responseFrom, transport, + version, command, payloadSize, payloadBuffer); + } + + } +} diff --git a/pvAccessApp/server/serverContext.h b/pvAccessApp/server/serverContext.h new file mode 100644 index 0000000..d535616 --- /dev/null +++ b/pvAccessApp/server/serverContext.h @@ -0,0 +1,23 @@ +/* + * serverContext.h + * + * Created on: Jan 4, 2011 + * Author: Miha Vitorovic + */ + +#ifndef SERVERCONTEXT_H_ +#define SERVERCONTEXT_H_ + +namespace epics { + namespace pvAccess { + + + class ServerContextImpl { + + }; + + } +} + + +#endif /* SERVERCONTEXT_H_ */ diff --git a/pvAccessApp/utils/growingCircularBuffer.h b/pvAccessApp/utils/growingCircularBuffer.h index 2d8c28a..6a993a1 100644 --- a/pvAccessApp/utils/growingCircularBuffer.h +++ b/pvAccessApp/utils/growingCircularBuffer.h @@ -134,7 +134,6 @@ namespace epics { _elements[_putPointer] = x; if (++_putPointer >= _size) _putPointer = 0; return _count == 1; - } template diff --git a/pvAccessApp/utils/namedLockPattern.cpp b/pvAccessApp/utils/namedLockPattern.cpp new file mode 100644 index 0000000..538d502 --- /dev/null +++ b/pvAccessApp/utils/namedLockPattern.cpp @@ -0,0 +1,7 @@ +/* + * namedLockPattern.cpp + */ + +#include "namedLockPattern.h" + +//NOTE NamedLockPattern is template so implementation is in header file diff --git a/pvAccessApp/utils/namedLockPattern.h b/pvAccessApp/utils/namedLockPattern.h new file mode 100644 index 0000000..d57455a --- /dev/null +++ b/pvAccessApp/utils/namedLockPattern.h @@ -0,0 +1,144 @@ +/* + * namedLockPattern.h + */ + +#ifndef NAMEDLOCKPATTERN_H +#define NAMEDLOCKPATTERN_H + +#include +#include + +#include +#include + +#include "referenceCountingLock.h" + +using namespace std; +using namespace epics::pvData; + +namespace epics { namespace pvAccess { +/** + * NamedLockPattern + */ +template > +class NamedLockPattern +{ +public: + /** + * Constructor. + */ + NamedLockPattern() {}; + /** + * Destructor. + */ + virtual ~NamedLockPattern() {}; + /** + * Acquire synchronization lock for named object. + * @param name name of the object whose lock to acquire. + * @param msec the number of milleseconds to wait. + * An argument less than or equal to zero means not to wait at all. + * @return true if acquired, false othwerwise. + */ + bool acquireSynchronizationObject(const Key name, const int64 msec); + /** + * Release synchronization lock for named object. + * @param name name of the object whose lock to release. + */ + void releaseSynchronizationObject(const Key name); +private: + Mutex _mutex; + std::map _namedLocks; + typename std::map::iterator _namedLocksIter; + + /** + * Release synchronization lock for named object. + * @param name name of the object whose lock to release. + * @param release set to false if there is no need to call release + * on synchronization lock. + */ + void releaseSynchronizationObject(const Key name,const bool release); +}; + +template +bool NamedLockPattern::acquireSynchronizationObject(const Key name, const int64 msec) +{ + ReferenceCountingLock* lock; + { //due to guard + Lock guard(&_mutex); + + _namedLocksIter = _namedLocks.find(name); + // get synchronization object + + // none is found, create and return new one + // increment references + if(_namedLocksIter == _namedLocks.end()) + { + lock = new ReferenceCountingLock(); + _namedLocks[name] = lock; + } + else + { + lock = _namedLocksIter->second; + lock->increment(); + } + } // end of guarded area + + bool success = lock->acquire(msec); + + if(!success) + { + releaseSynchronizationObject(name, false); + } + + return success; +} + +template +void NamedLockPattern::releaseSynchronizationObject(const Key name) +{ + releaseSynchronizationObject(name, true); +} + +template +void NamedLockPattern::releaseSynchronizationObject(const Key name,const bool release) +{ + Lock guard(&_mutex); + ReferenceCountingLock* lock; + _namedLocksIter = _namedLocks.find(name); + + // release lock + if (_namedLocksIter != _namedLocks.end()) + { + lock = _namedLocksIter->second; + + // release the lock + if (release) + { + lock->release(); + } + + // if there only one current lock exists + // remove it from the map + if (lock->decrement() <= 0) + { + _namedLocks.erase(_namedLocksIter); + delete lock; + } + } +} + +template +class NamedLock : private NoDefaultMethods +{ +public: + NamedLock(NamedLockPattern* namedLockPattern): _namedLockPattern(namedLockPattern) {} + bool acquireSynchronizationObject(const Key name, const int64 msec) {_name = name; return _namedLockPattern->acquireSynchronizationObject(name,msec);} + ~NamedLock(){_namedLockPattern->releaseSynchronizationObject(_name);} +private: + Key _name; + NamedLockPattern* _namedLockPattern; +}; + +}} + +#endif /* NAMEDLOCKPATTERN_H */ diff --git a/pvAccessApp/utils/referenceCountingLock.cpp b/pvAccessApp/utils/referenceCountingLock.cpp new file mode 100644 index 0000000..abd2250 --- /dev/null +++ b/pvAccessApp/utils/referenceCountingLock.cpp @@ -0,0 +1,85 @@ +/* + * namedLockPattern.cpp + */ + +#include "referenceCountingLock.h" + +namespace epics { namespace pvAccess { + +ReferenceCountingLock::ReferenceCountingLock(): _references(1) +{ + pthread_mutexattr_t mutexAttribute; + int32 retval = pthread_mutexattr_init(&mutexAttribute); + if(retval != 0) + { + //string errMsg = "Error: pthread_mutexattr_init failed: " + string(strerror(retval)); + assert(true); + } + retval = pthread_mutexattr_settype(&mutexAttribute, PTHREAD_MUTEX_RECURSIVE); + if(retval == 0) + { + retval = pthread_mutex_init(&_mutex, &mutexAttribute); + if(retval != 0) + { + //string errMsg = "Error: pthread_mutex_init failed: " + string(strerror(retval)); + assert(true); + } + } + else + { + //string errMsg = "Error: pthread_mutexattr_settype failed: " + string(strerror(retval)); + assert(true); + } + + pthread_mutexattr_destroy(&mutexAttribute); +} + +ReferenceCountingLock::~ReferenceCountingLock() +{ + pthread_mutex_destroy(&_mutex); +} + +bool ReferenceCountingLock::acquire(int64 msecs) +{ + struct timespec deltatime; + deltatime.tv_sec = msecs / 1000; + deltatime.tv_nsec = (msecs % 1000) * 1000; + + int32 retval = pthread_mutex_timedlock(&_mutex, &deltatime); + if(retval == 0) + { + return true; + } + return false; +} + +void ReferenceCountingLock::release() +{ + int retval = pthread_mutex_unlock(&_mutex); + if(retval != 0) + { + string errMsg = "Error: pthread_mutex_unlock failed: " + string(strerror(retval)); + //TODO do something? + } +} + +int ReferenceCountingLock::increment() +{ + //TODO does it really has to be atomic? + return ++_references; + // commented because linking depends on specific version of glibc library + // on i386 target + //return __sync_add_and_fetch(&_references,1); +} + +int ReferenceCountingLock::decrement() +{ + //TODO does it really has to be atomic? + return --_references; + // commented because linking depends on specific version of glibc library + // on i386 target + //return __sync_sub_and_fetch(&_references,1); +} + +}} + diff --git a/pvAccessApp/utils/referenceCountingLock.h b/pvAccessApp/utils/referenceCountingLock.h new file mode 100644 index 0000000..2825cae --- /dev/null +++ b/pvAccessApp/utils/referenceCountingLock.h @@ -0,0 +1,74 @@ +/* + * referenceCountingLock.h + */ + +#ifndef REFERENCECOUNTINGLOCK_H +#define REFERENCECOUNTINGLOCK_H + +#include +#include +#include +#include +#include + +#include +#include + +using namespace std; +using namespace epics::pvData; + +namespace epics { namespace pvAccess { + +/** + * Reference counting mutex implementation w/ deadlock detection. + * Synchronization helper class used (intended for use) for activation/deactivation synchronization. + * This class enforces attempt method of acquiring the locks to prevent deadlocks. + * Class also offers reference counting. + * (NOTE: automatic lock counting was not implemented due to imperfect usage.) + * + */ +class ReferenceCountingLock +{ +public: + /** + * Constructor of ReferenceCountingLock. + * After construction lock is free and reference count equals 1. + */ + ReferenceCountingLock(); + /** + * Destructor of ReferenceCountingLock. + */ + virtual ~ReferenceCountingLock(); + /** + * Attempt to acquire lock. + * + * @param msecs the number of milleseconds to wait. + * An argument less than or equal to zero means not to wait at all. + * @return true if acquired, false otherwise. + */ + bool acquire(int64 msecs); + /** + * Release previously acquired lock. + */ + void release(); + /** + * Increment number of references. + * + * @return number of references. + */ + int increment(); + /** + * Decrement number of references. + * + * @return number of references. + */ + int decrement(); +private: + int _references; + pthread_mutex_t _mutex; + +}; + +}} + +#endif /* NAMEDLOCKPATTERN_H */ diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 2228d16..0ddd4f4 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -18,6 +18,9 @@ PROD_HOST += testBeaconEmitter testBeaconEmitter_SRCS += testBeaconEmitter.cpp testBeaconEmitter_LIBS += pvData pvAccess Com +PROD_HOST += testBeaconHandler +testBeaconHandler_SRCS += testBeaconHandler.cpp +testBeaconHandler_LIBS += pvData pvAccess Com include $(TOP)/configure/RULES #---------------------------------------- diff --git a/testApp/remote/testBeaconEmitter.cpp b/testApp/remote/testBeaconEmitter.cpp index ac1ec33..9ddd828 100644 --- a/testApp/remote/testBeaconEmitter.cpp +++ b/testApp/remote/testBeaconEmitter.cpp @@ -42,9 +42,9 @@ void testBeaconEmitter() osiSockAddr* addr = new osiSockAddr; addr->ia.sin_family = AF_INET; addr->ia.sin_port = htons(5067); - if(inet_aton("92.50.75.255",&addr->ia.sin_addr)==0) { - cout<<"error in inet_aton()"<ia.sin_addr)==0) + { + assert(false); } broadcastAddresses->push_back(addr); BlockingUDPConnector connector(true, broadcastAddresses, true); diff --git a/testApp/remote/testBeaconHandler.cpp b/testApp/remote/testBeaconHandler.cpp index 435acac..30a51e2 100644 --- a/testApp/remote/testBeaconHandler.cpp +++ b/testApp/remote/testBeaconHandler.cpp @@ -6,6 +6,7 @@ #include "blockingUDP.h" #include "beaconHandler.h" #include "inetAddressUtil.h" +#include "introspectionRegistry.h" #include @@ -14,32 +15,105 @@ using namespace epics::pvAccess; using namespace epics::pvData; +using namespace std; + +void decodeFromIPv6Address(ByteBuffer* buffer, osiSockAddr* address) +{ + // IPv4 compatible IPv6 address + // first 80-bit are 0 + buffer->getLong(); + buffer->getShort(); + // next 16-bits are 1 + buffer->getShort(); + // following IPv4 address in big-endian (network) byte order + in_addr_t ipv4Addr = 0; + ipv4Addr |= (uint32)buffer->getByte() << 24; + ipv4Addr |= (uint32)buffer->getByte() << 16; + ipv4Addr |= (uint32)buffer->getByte() << 8; + ipv4Addr |= (uint32)buffer->getByte() << 0; + address->ia.sin_addr.s_addr = ipv4Addr; +} class BeaconResponseHandler : public ResponseHandler { public: + BeaconResponseHandler() + { + _pvDataCreate = getPVDataCreate(); + } + virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { - cout << "DummyResponseHandler::handleResponse" << endl; + cout << "BeaconResponseHandler::handleResponse" << endl; + + // reception timestamp + TimeStamp timestamp; + timestamp.getCurrent(); + + //TODO + //super.handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + + transport->ensureData((2*sizeof(int16)+2*sizeof(int32)+128)/sizeof(int8)); + + const int32 sequentalID = payloadBuffer->getShort() & 0x0000FFFF; + const TimeStamp startupTimestamp(payloadBuffer->getInt() & 0x00000000FFFFFFFFL,(int32)(payloadBuffer->getInt() & 0x00000000FFFFFFFFL)); + + // 128-bit IPv6 address + osiSockAddr address; + decodeFromIPv6Address(payloadBuffer, &address); + + // get port + const int32 port = payloadBuffer->getShort() & 0xFFFF; + address.ia.sin_port = ntohs(port); + + // accept given address if explicitly specified by sender + if (!ipv4AddressToInt(address)) + { + responseFrom->ia.sin_port = port; + } + else + { + responseFrom->ia.sin_port = port; + responseFrom->ia.sin_addr.s_addr = address.ia.sin_addr.s_addr; + } + + //org.epics.ca.client.impl.remote.BeaconHandler beaconHandler = context.getBeaconHandler(responseFrom); + // currently we care only for servers used by this context + //if (beaconHandler == null) + // return; + + // extra data + PVFieldPtr data = NULL; + const FieldConstPtr field = IntrospectionRegistry::deserializeFull(payloadBuffer, transport); + if (field != NULL) + { + data = _pvDataCreate->createPVField(NULL, field); + data->deserialize(payloadBuffer, transport); + } + + // notify beacon handler + //beaconHandler.beaconNotify(responseFrom, version, timestamp, startupTimestamp, sequentalID, data); } + +private: + PVDataCreate* _pvDataCreate; + BeaconHandler* _beaconHandler; }; void testBeaconHandler() { - BeacondResponseHandler brh; + BeaconResponseHandler brh; BlockingUDPConnector connector(false, NULL, true); - DummyClientContext context; osiSockAddr bindAddr; bindAddr.ia.sin_family = AF_INET; bindAddr.ia.sin_port = htons(5067); bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); Transport* transport = connector.connect(NULL, &brh, &bindAddr, 1, 50); - - ((BlockingUDPTransport*)transport)->start(); + (static_cast(transport))->start(); while(1) sleep(1); diff --git a/testApp/utils/Makefile b/testApp/utils/Makefile index 8e364c2..ba2628e 100644 --- a/testApp/utils/Makefile +++ b/testApp/utils/Makefile @@ -34,6 +34,10 @@ PROD_HOST += transportRegisterTest transportRegisterTest_SRCS += transportRegistryTest.cpp transportRegisterTest_LIBS += pvAccess Com pvData +PROD_HOST += namedLockPatternTest +namedLockPatternTest_SRCS += namedLockPatternTest.cpp +namedLockPatternTest_LIBS += pvAccess Com pvData + include $(TOP)/configure/RULES #---------------------------------------- # ADD RULES AFTER THIS LINE diff --git a/testApp/utils/namedLockPatternTest.cpp b/testApp/utils/namedLockPatternTest.cpp new file mode 100644 index 0000000..9224800 --- /dev/null +++ b/testApp/utils/namedLockPatternTest.cpp @@ -0,0 +1,216 @@ +/* + * namedLockPatternTest.cpp + * + */ + +#include "namedLockPattern.h" +#include "showConstructDestruct.h" + +#include +#include + +#include + +using namespace epics::pvAccess; +using namespace std; + +void testIntLockPattern() +{ + int64 timeout = 100; + NamedLockPattern namedLockPattern; + int name1 = 1; + assert(namedLockPattern.acquireSynchronizationObject(name1,timeout)); + assert(namedLockPattern.acquireSynchronizationObject(name1,timeout)); + namedLockPattern.releaseSynchronizationObject(name1); + namedLockPattern.releaseSynchronizationObject(name1); + int name2 = 2; + assert(namedLockPattern.acquireSynchronizationObject(name2,timeout)); + namedLockPattern.releaseSynchronizationObject(name2); +} + +void testIntPtrLockPattern() +{ + int64 timeout = 100; + NamedLockPattern namedLockPattern; + int name1 = 1; + assert(namedLockPattern.acquireSynchronizationObject(&name1,timeout)); + assert(namedLockPattern.acquireSynchronizationObject(&name1,timeout)); + namedLockPattern.releaseSynchronizationObject(&name1); + namedLockPattern.releaseSynchronizationObject(&name1); + int name2 = 2; + assert(namedLockPattern.acquireSynchronizationObject(&name2,timeout)); + namedLockPattern.releaseSynchronizationObject(&name2); +} + +struct cmp_str +{ + bool operator()(char const *a, char const *b) + { + return strcmp(a, b) < 0; + } +}; + +void testCharPtrLockPattern() +{ + int64 timeout = 100; + NamedLockPattern namedLockPattern; + string name1 = "lojze"; + assert(namedLockPattern.acquireSynchronizationObject(name1.c_str(),timeout)); + assert(namedLockPattern.acquireSynchronizationObject(name1.c_str(),timeout)); + namedLockPattern.releaseSynchronizationObject(name1.c_str()); + namedLockPattern.releaseSynchronizationObject(name1.c_str()); + string name2 = "francka"; + assert(namedLockPattern.acquireSynchronizationObject(name2.c_str(),timeout)); + namedLockPattern.releaseSynchronizationObject(name2.c_str()); +} + +struct comp_osiSockAddrPtr +{ + bool operator()(osiSockAddr const *a, osiSockAddr const *b) + { + if (a->sa.sa_family < b->sa.sa_family) return true; + if ((a->sa.sa_family == b->sa.sa_family) && (a->ia.sin_addr.s_addr < b->ia.sin_addr.s_addr )) return true; + if ((a->sa.sa_family == b->sa.sa_family) && (a->ia.sin_addr.s_addr == b->ia.sin_addr.s_addr ) && ( a->ia.sin_port < b->ia.sin_port )) return true; + return false; + } +}; + +void testOsiSockAddrLockPattern() +{ + int64 timeout = 10000; + NamedLockPattern namedLockPattern; + osiSockAddr name1; + name1.ia.sin_addr.s_addr = 1; + name1.ia.sin_port = 1; + name1.ia.sin_family = AF_INET; + + assert(namedLockPattern.acquireSynchronizationObject(&name1,timeout)); + assert(namedLockPattern.acquireSynchronizationObject(&name1,timeout)); + namedLockPattern.releaseSynchronizationObject(&name1); + namedLockPattern.releaseSynchronizationObject(&name1); + + osiSockAddr name2; + name2.ia.sin_addr.s_addr = 1; + name2.ia.sin_port = 1; + name2.ia.sin_family = AF_INET; + NamedLock namedGuard(&namedLockPattern); + assert(namedGuard.acquireSynchronizationObject(&name1,timeout)); +} + +struct comp_osiSockAddr +{ + bool operator()(osiSockAddr const a, osiSockAddr const b) + { + if (a.sa.sa_family < b.sa.sa_family) return true; + if ((a.sa.sa_family == b.sa.sa_family) && (a.ia.sin_addr.s_addr < b.ia.sin_addr.s_addr )) return true; + if ((a.sa.sa_family == b.sa.sa_family) && (a.ia.sin_addr.s_addr == b.ia.sin_addr.s_addr ) && ( a.ia.sin_port < b.ia.sin_port )) return true; + return false; + } +}; + +void* testWorker1(void* p) +{ + int32 timeout = 1000; + const int32 max = 1000; + NamedLockPattern* namedLockPattern = (NamedLockPattern*)p; + + for(int32 i = 0 ; i < max; i = i +2) + { + osiSockAddr addr; + addr.ia.sin_addr.s_addr = i; + addr.ia.sin_port = i; + addr.ia.sin_family = AF_INET; + NamedLock namedGuard(namedLockPattern); + assert(namedGuard.acquireSynchronizationObject(addr,timeout)); + usleep(1); + } + + //this one takes a lock, thread 2 will be slower and will get timeout + { //due to namedGuard + osiSockAddr addr; + addr.ia.sin_addr.s_addr = 1; + addr.ia.sin_port = 1; + addr.ia.sin_family = AF_INET; + NamedLock namedGuard(namedLockPattern); + assert(namedGuard.acquireSynchronizationObject(addr,timeout)); + sleep(5); + } + + return NULL; +} + + +void* testWorker2(void* p) +{ + int32 timeout = 1000; + const int32 max = 1000; + NamedLockPattern* namedLockPattern = (NamedLockPattern*)p; + + for(int32 i = 1 ; i < max; i = i + 2) + { + osiSockAddr addr; + addr.ia.sin_addr.s_addr = i; + addr.ia.sin_port = i; + addr.ia.sin_family = AF_INET; + NamedLock namedGuard(namedLockPattern); + assert(namedGuard.acquireSynchronizationObject(addr,timeout)); + usleep(1); + } + + //this thread sleeps a while and gets timeout on lock + { + sleep(1); + osiSockAddr addr; + addr.ia.sin_addr.s_addr = 1; + addr.ia.sin_port = 1; + addr.ia.sin_family = AF_INET; + NamedLock namedGuard(namedLockPattern); + assert(!namedGuard.acquireSynchronizationObject(addr,timeout)); + } + + return NULL; +} + +int main(int argc, char *argv[]) +{ + testIntLockPattern(); + testIntPtrLockPattern(); + testCharPtrLockPattern(); + testOsiSockAddrLockPattern(); + + pthread_t _worker1Id; + pthread_t _worker2Id; + + NamedLockPattern namedLockPattern; + + //create two threads + int32 retval = pthread_create(&_worker1Id, NULL, testWorker1, &namedLockPattern); + if(retval != 0) + { + assert(true); + } + + retval = pthread_create(&_worker2Id, NULL, testWorker2, &namedLockPattern); + if(retval != 0) + { + assert(true); + } + + //wait for threads + retval = pthread_join(_worker1Id, NULL); + if(retval != 0) + { + assert(true); + } + + retval = pthread_join(_worker2Id, NULL); + if(retval != 0) + { + assert(true); + } + + getShowConstructDestruct()->constuctDestructTotals(stdout); + return 0; +} + +