diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index 8cd62cc..6982657 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -52,6 +52,7 @@ LIBSRCS += beaconHandler.cpp LIBSRCS += blockingTCPTransport.cpp LIBSRCS += blockingClientTCPTransport.cpp LIBSRCS += blockingTCPConnector.cpp +LIBSRCS += blockingServerTCPTransport.cpp LIBRARY = pvAccess pvAccess_LIBS += Com diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp new file mode 100644 index 0000000..29272c5 --- /dev/null +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -0,0 +1,126 @@ +/* + * blockingServerTCPTransport.cpp + * + * Created on: Jan 4, 2011 + * Author: Miha Vitorovic + */ + +/* pvAccess */ +#include "blockingTCP.h" +#include "remote.h" + +/* pvData */ +#include +#include + +/* EPICSv3 */ +#include + +/* standard */ +#include + +using namespace epics::pvData; +using std::map; + +namespace epics { + namespace pvAccess { + + BlockingServerTCPTransport::BlockingServerTCPTransport( + Context* context, SOCKET channel, + ResponseHandler* responseHandler, int receiveBufferSize) : + BlockingTCPTransport(context, channel, responseHandler, + receiveBufferSize, CA_DEFAULT_PRIORITY), + _introspectionRegistry(new IntrospectionRegistry(true)), + _lastChannelSID(0), _channels( + new map ()), _channelsMutex( + new Mutex()) { + // NOTE: priority not yet known, default priority is used to register/unregister + // TODO implement priorities in Reactor... not that user will + // change it.. still getPriority() must return "registered" priority! + + start(); + } + + BlockingServerTCPTransport::~BlockingServerTCPTransport() { + delete _introspectionRegistry; + delete _channels; + delete _channelsMutex; + } + + void BlockingServerTCPTransport::destroyAllChannels() { + Lock lock(_channelsMutex); + if(_channels->size()==0) return; + + char ipAddrStr[64]; + ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + + errlogSevPrintf( + errlogInfo, + "Transport to %s still has %d channel(s) active and closing...", + ipAddrStr, _channels->size()); + + map::iterator it = _channels->begin(); + for(; it!=_channels->end(); it++) + it->second->destroy(); + + _channels->clear(); + } + + void BlockingServerTCPTransport::internalClose(bool force) { + BlockingTCPTransport::internalClose(force); + destroyAllChannels(); + } + + int BlockingServerTCPTransport::preallocateChannelSID() { + Lock lock(_channelsMutex); + // search first free (theoretically possible loop of death) + int sid = ++_lastChannelSID; + while(_channels->find(sid)!=_channels->end()) + sid = ++_lastChannelSID; + return sid; + } + + void BlockingServerTCPTransport::registerChannel(int sid, + ServerChannel* channel) { + Lock lock(_channelsMutex); + (*_channels)[sid] = channel; + } + + void BlockingServerTCPTransport::unregisterChannel(int sid) { + Lock lock(_channelsMutex); + _channels->erase(sid); + } + + ServerChannel* BlockingServerTCPTransport::getChannel(int sid) { + Lock lock(_channelsMutex); + + map::iterator it = _channels->find(sid); + if(it!=_channels->end()) return it->second; + + return NULL; + } + + int BlockingServerTCPTransport::getChannelCount() { + Lock lock(_channelsMutex); + return _channels->size(); + } + + void BlockingServerTCPTransport::send(ByteBuffer* buffer, + TransportSendControl* control) { + // + // send verification message + // + control->startMessage(1, 2*sizeof(int32)); + + // receive buffer size + buffer->putInt(getReceiveBufferSize()); + + // socket receive buffer size + buffer->putInt(getSocketReceiveBufferSize()); + + // send immediately + control->flush(true); + } + + } +} diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 8bb1c8c..7b22d92 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -19,16 +19,17 @@ #include #include #include -#include #include /* EPICSv3 */ #include #include #include +#include /* standard */ #include +#include namespace epics { namespace pvAccess { @@ -485,8 +486,8 @@ namespace epics { virtual ~BlockingTCPConnector(); virtual Transport* connect(TransportClient* client, - ResponseHandler* responseHandler, osiSockAddr* address, - short transportRevision, int16 priority); + ResponseHandler* responseHandler, osiSockAddr* address, + short transportRevision, int16 priority); private: /** * Lock timeout @@ -524,6 +525,130 @@ namespace epics { }; + class BlockingServerTCPTransport : public BlockingTCPTransport, + public ChannelHostingTransport, + public TransportSender { + + BlockingServerTCPTransport(Context* context, SOCKET channel, + ResponseHandler* responseHandler, int receiveBufferSize); + + virtual ~BlockingServerTCPTransport(); + + virtual IntrospectionRegistry* getIntrospectionRegistry() { + return _introspectionRegistry; + } + + /** + * Preallocate new channel SID. + * @return new channel server id (SID). + */ + virtual int preallocateChannelSID(); + + /** + * De-preallocate new channel SID. + * @param sid preallocated channel SID. + */ + virtual void depreallocateChannelSID(int sid) { + // noop + } + + /** + * Register a new channel. + * @param sid preallocated channel SID. + * @param channel channel to register. + */ + virtual void registerChannel(int sid, ServerChannel* channel); + + /** + * Unregister a new channel (and deallocates its handle). + * @param sid SID + */ + virtual void unregisterChannel(int sid); + + /** + * Get channel by its SID. + * @param sid channel SID + * @return channel with given SID, NULL otherwise + */ + virtual ServerChannel* getChannel(int sid); + + /** + * Get channel count. + * @return channel count. + */ + virtual int getChannelCount(); + + virtual epics::pvData::PVField* getSecurityToken() { + return NULL; + } + + virtual void lock() { + // noop + } + + virtual void unlock() { + // noop + } + + /** + * Verify transport. Server side is self-verified. + */ + void verify() { + enqueueSendRequest(this); + verified(); + } + + /** + * CA connection validation request. + * A server sends a validate connection message when it receives a new connection. + * The message indicates that the server is ready to receive requests; the client must + * not send any messages on the connection until it has received the validate connection message + * from the server. No reply to the message is expected by the server. + * The purpose of the validate connection message is two-fold: + * It informs the client of the protocol version supported by the server. + * It prevents the client from writing a request message to its local transport + * buffers until after the server has acknowledged that it can actually process the + * request. This avoids a race condition caused by the server's TCP/IP stack + * accepting connections in its backlog while the server is in the process of shutting down: + * if the client were to send a request in this situation, the request + * would be lost but the client could not safely re-issue the request because that + * might violate at-most-once semantics. + * The validate connection message guarantees that a server is not in the middle + * of shutting down when the server's TCP/IP stack accepts an incoming connection + * and so avoids the race condition. + * @see org.epics.ca.impl.remote.TransportSender#send(java.nio.ByteBuffer, org.epics.ca.impl.remote.TransportSendControl) + */ + virtual void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control); + + protected: + /** + * Introspection registry. + */ + IntrospectionRegistry* _introspectionRegistry; + + virtual void internalClose(bool force); + + private: + /** + * Last SID cache. + */ + volatile int _lastChannelSID; + + /** + * Channel table (SID -> channel mapping). + */ + std::map* _channels; + + Mutex* _channelsMutex; + + /** + * Destroy all channels. + */ + void destroyAllChannels(); + + }; + } } diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 91232a2..ae3a9ee 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -308,6 +309,73 @@ namespace epics { virtual void release(TransportClient* client) =0; }; + class ServerChannel { + public: + /** + * Get channel SID. + * @return channel SID. + */ + virtual int getSID() =0; + + /** + * Destroy server channel. + * This method MUST BE called if overriden. + */ + virtual void destroy() =0; + }; + + /** + * Interface defining a transport that hosts channels. + * @author Matej Sekoranja + * @version $Id: ChannelHostingTransport.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class ChannelHostingTransport { + public: + /** + * Get security token. + * @return security token, can be null. + */ + virtual epics::pvData::PVField* getSecurityToken() =0; + + /** + * Preallocate new channel SID. + * @return new channel server id (SID). + */ + virtual int preallocateChannelSID() =0; + + /** + * De-preallocate new channel SID. + * @param sid preallocated channel SID. + */ + virtual void depreallocateChannelSID(int sid) =0; + + /** + * Register a new channel. + * @param sid preallocated channel SID. + * @param channel channel to register. + */ + virtual void registerChannel(int sid, ServerChannel* channel) =0; + + /** + * Unregister a new channel (and deallocates its handle). + * @param sid SID + */ + virtual void unregisterChannel(int sid) =0; + + /** + * Get channel by its SID. + * @param sid channel SID + * @return channel with given SID, null otherwise + */ + virtual ServerChannel* getChannel(int sid) =0; + + /** + * Get channel count. + * @return channel count. + */ + virtual int getChannelCount() =0; + }; + } }