diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index fc2a966..df20643 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -49,10 +49,7 @@ INC += codec.h LIBSRCS += blockingUDPTransport.cpp LIBSRCS += blockingUDPConnector.cpp LIBSRCS += beaconHandler.cpp -LIBSRCS += blockingTCPTransport.cpp -LIBSRCS += blockingClientTCPTransport.cpp LIBSRCS += blockingTCPConnector.cpp -LIBSRCS += blockingServerTCPTransport.cpp LIBSRCS += simpleChannelSearchManagerImpl.cpp LIBSRCS += abstractResponseHandler.cpp LIBSRCS += blockingTCPAcceptor.cpp diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp deleted file mode 100644 index ff6f2c6..0000000 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ /dev/null @@ -1,240 +0,0 @@ -/** - * Copyright - See the COPYRIGHT that is included with this distribution. - * pvAccessCPP is distributed subject to a Software License Agreement found - * in file LICENSE that is included with this distribution. - */ - -#include -#include -#include - -#include - -#include -#include -#include - -using namespace std; -using namespace epics::pvData; - -namespace epics { - namespace pvAccess { - -#define EXCEPTION_GUARD(code) try { code; } \ - catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \ - catch (...) { LOG(logLevelError, "Unhandled exception caught from code at %s:%d.", __FILE__, __LINE__); } - - BlockingClientTCPTransport::BlockingClientTCPTransport( - Context::shared_pointer const & context, SOCKET channel, - auto_ptr& responseHandler, int receiveBufferSize, - TransportClient::shared_pointer client, int8 /*remoteTransportRevision*/, - float beaconInterval, int16 priority) : - BlockingTCPTransport(context, channel, responseHandler, receiveBufferSize, priority), - _connectionTimeout(beaconInterval*1000), - _unresponsiveTransport(false), - _verifyOrEcho(true) - { -// _autoDelete = false; - - // initialize owners list, send queue - acquire(client); - - // use immediate for clients - setFlushStrategy(DELAYED); - - // setup connection timeout timer (watchdog) - epicsTimeGetCurrent(&_aliveTimestamp); - } - - void BlockingClientTCPTransport::start() - { - TimerCallbackPtr tcb = std::tr1::dynamic_pointer_cast(shared_from_this()); - _context->getTimer()->schedulePeriodic(tcb, _connectionTimeout, _connectionTimeout); - BlockingTCPTransport::start(); - } - - BlockingClientTCPTransport::~BlockingClientTCPTransport() { - } - - void BlockingClientTCPTransport::callback() { - epicsTimeStamp currentTime; - epicsTimeGetCurrent(¤tTime); - - _mutex.lock(); - // no exception expected here - double diff = epicsTimeDiffInSeconds(¤tTime, &_aliveTimestamp); - _mutex.unlock(); - - if(diff>2*_connectionTimeout) { - unresponsiveTransport(); - } - // use some k (3/4) to handle "jitter" - else if(diff>=((3*_connectionTimeout)/4)) { - // send echo - TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast(shared_from_this()); - enqueueSendRequest(transportSender); - } - } - - void BlockingClientTCPTransport::unresponsiveTransport() { - Lock lock(_mutex); - if(!_unresponsiveTransport) { - _unresponsiveTransport = true; - - TransportClientMap_t::iterator it = _owners.begin(); - for(; it!=_owners.end(); it++) { - TransportClient::shared_pointer client = it->second.lock(); - if (client) - { - EXCEPTION_GUARD(client->transportUnresponsive()); - } - } - } - } - - bool BlockingClientTCPTransport::acquire(TransportClient::shared_pointer const & client) { - Lock lock(_mutex); - if(_closed.get()) return false; - - char ipAddrStr[48]; - ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); - LOG(logLevelDebug, "Acquiring transport to %s.", ipAddrStr); - - _owners[client->getID()] = TransportClient::weak_pointer(client); - //_owners.insert(TransportClient::weak_pointer(client)); - - return true; - } - - // _mutex is held when this method is called - void BlockingClientTCPTransport::internalClose(bool forced) { - BlockingTCPTransport::internalClose(forced); - - TimerCallbackPtr tcb = std::tr1::dynamic_pointer_cast(shared_from_this()); - _context->getTimer()->cancel(tcb); - } - - void BlockingClientTCPTransport::internalPostClose(bool forced) { - BlockingTCPTransport::internalPostClose(forced); - - // _owners cannot change when transport is closed - closedNotifyClients(); - } - - /** - * Notifies clients about disconnect. - */ - void BlockingClientTCPTransport::closedNotifyClients() { - - // check if still acquired - size_t refs = _owners.size(); - if(refs>0) { - char ipAddrStr[48]; - ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); - LOG( - logLevelDebug, - "Transport to %s still has %d client(s) active and closing...", - ipAddrStr, refs); - - TransportClientMap_t::iterator it = _owners.begin(); - for(; it!=_owners.end(); it++) { - TransportClient::shared_pointer client = it->second.lock(); - if (client) - { - EXCEPTION_GUARD(client->transportClosed()); - } - } - - } - - _owners.clear(); - } - - //void BlockingClientTCPTransport::release(TransportClient::shared_pointer const & client) { - void BlockingClientTCPTransport::release(pvAccessID clientID) { - Lock lock(_mutex); - if(_closed.get()) return; - - char ipAddrStr[48]; - ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); - - LOG(logLevelDebug, "Releasing transport to %s.", ipAddrStr); - - _owners.erase(clientID); - //_owners.erase(TransportClient::weak_pointer(client)); - - // not used anymore, close it - // TODO consider delayed destruction (can improve performance!!!) - if(_owners.size()==0) close(); // TODO close(false) - } - - void BlockingClientTCPTransport::aliveNotification() { - Lock guard(_mutex); - epicsTimeGetCurrent(&_aliveTimestamp); - if(_unresponsiveTransport) responsiveTransport(); - } - - void BlockingClientTCPTransport::responsiveTransport() { - Lock lock(_mutex); - if(_unresponsiveTransport) { - _unresponsiveTransport = false; - - Transport::shared_pointer thisSharedPtr = shared_from_this(); - TransportClientMap_t::iterator it = _owners.begin(); - for(; it!=_owners.end(); it++) { - TransportClient::shared_pointer client = it->second.lock(); - if (client) - { - EXCEPTION_GUARD(client->transportResponsive(thisSharedPtr)); - } - } - } - } - - void BlockingClientTCPTransport::changedTransport() { - outgoingIR.reset(); - - Lock lock(_mutex); - TransportClientMap_t::iterator it = _owners.begin(); - for(; it!=_owners.end(); it++) { - TransportClient::shared_pointer client = it->second.lock(); - if (client) - { - EXCEPTION_GUARD(client->transportChanged()); - } - } - } - - void BlockingClientTCPTransport::send(ByteBuffer* buffer, - TransportSendControl* control) { - if(_verifyOrEcho) { - /* - * send verification response message - */ - - control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)+sizeof(int16)); - - // receive buffer size - buffer->putInt(static_cast(getReceiveBufferSize())); - - // socket receive buffer size - buffer->putInt(static_cast(getSocketReceiveBufferSize())); - - // connection priority - buffer->putShort(getPriority()); - - // send immediately - control->flush(true); - - _verifyOrEcho = false; - } - else { - control->startMessage(CMD_ECHO, 0); - // send immediately - control->flush(true); - } - - } - - } -} diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp deleted file mode 100644 index 9a7bc94..0000000 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Copyright - See the COPYRIGHT that is included with this distribution. - * pvAccessCPP is distributed subject to a Software License Agreement found - * in file LICENSE that is included with this distribution. - */ - -#include -#include -#include - -#include -#include - -/* standard */ -#include - -using namespace epics::pvData; -using namespace std; - -namespace epics { -namespace pvAccess { - - BlockingServerTCPTransport::BlockingServerTCPTransport( - Context::shared_pointer const & context, SOCKET channel, - auto_ptr& responseHandler, int receiveBufferSize) : - BlockingTCPTransport(context, channel, responseHandler, receiveBufferSize, PVA_DEFAULT_PRIORITY), - _lastChannelSID(0) - { - // for performance testing - setFlushStrategy(DELAYED); - _delay = 0.000; - - // NOTE: priority not yet known, default priority is used to register/unregister - // TODO implement priorities in Reactor... not that user will - // change it.. still getPriority() must return "registered" priority! - - //start(); - } - - BlockingServerTCPTransport::~BlockingServerTCPTransport() { - } - - void BlockingServerTCPTransport::destroyAllChannels() { - Lock lock(_channelsMutex); - if(_channels.size()==0) return; - - char ipAddrStr[64]; - ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); - - LOG( - logLevelDebug, - "Transport to %s still has %u channel(s) active and closing...", - ipAddrStr, (unsigned int)_channels.size()); - - map::iterator it = _channels.begin(); - for(; it!=_channels.end(); it++) - it->second->destroy(); - - _channels.clear(); - } - - void BlockingServerTCPTransport::internalClose(bool force) { - Transport::shared_pointer thisSharedPtr = shared_from_this(); - BlockingTCPTransport::internalClose(force); - destroyAllChannels(); - } - - void BlockingServerTCPTransport::internalPostClose(bool forced) { - BlockingTCPTransport::internalPostClose(forced); - } - - pvAccessID BlockingServerTCPTransport::preallocateChannelSID() { - Lock lock(_channelsMutex); - // search first free (theoretically possible loop of death) - pvAccessID sid = ++_lastChannelSID; - while(_channels.find(sid)!=_channels.end()) - sid = ++_lastChannelSID; - return sid; - } - - void BlockingServerTCPTransport::registerChannel(pvAccessID sid, ServerChannel::shared_pointer const & channel) { - Lock lock(_channelsMutex); - _channels[sid] = channel; - } - - void BlockingServerTCPTransport::unregisterChannel(pvAccessID sid) { - Lock lock(_channelsMutex); - _channels.erase(sid); - } - - ServerChannel::shared_pointer BlockingServerTCPTransport::getChannel(pvAccessID sid) { - Lock lock(_channelsMutex); - - map::iterator it = _channels.find(sid); - if(it!=_channels.end()) return it->second; - - return ServerChannel::shared_pointer(); - } - - int BlockingServerTCPTransport::getChannelCount() { - Lock lock(_channelsMutex); - return static_cast(_channels.size()); - } - - void BlockingServerTCPTransport::send(ByteBuffer* buffer, - TransportSendControl* control) { - - // - // set byte order control message - // - - control->ensureBuffer(PVA_MESSAGE_HEADER_SIZE); - buffer->putByte(PVA_MAGIC); - buffer->putByte(PVA_VERSION); - buffer->putByte(0x01 | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00)); // control + big endian - buffer->putByte(2); // set byte order - buffer->putInt(0); - - - // - // send verification message - // - control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)); - - // receive buffer size - buffer->putInt(static_cast(getReceiveBufferSize())); - - // socket receive buffer size - buffer->putInt(static_cast(getSocketReceiveBufferSize())); - - // send immediately - control->flush(true); - } - - } -} diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 35d9945..b70c758 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -40,584 +40,9 @@ #include #include -// not implemented anyway -#define FLOW_CONTROL 0 - namespace epics { namespace pvAccess { - //class MonitorSender; - - enum ReceiveStage { - READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, UNDEFINED_STAGE - }; - - class BlockingTCPTransport : - public Transport, - public TransportSendControl, - public std::tr1::enable_shared_from_this - { - protected: - BlockingTCPTransport(Context::shared_pointer const & context, SOCKET channel, - std::auto_ptr& responseHandler, int receiveBufferSize, - epics::pvData::int16 priority); - - public: - virtual bool isClosed() { - return _closed.get(); - } - - virtual epics::pvData::int8 getRevision() const { - return PVA_PROTOCOL_REVISION; - } - - virtual void setRemoteRevision(epics::pvData::int8 revision) { - _remoteTransportRevision = revision; - } - - virtual void setRemoteTransportReceiveBufferSize(std::size_t remoteTransportReceiveBufferSize) { - _remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize; - } - - virtual void setRemoteTransportSocketReceiveBufferSize(std::size_t socketReceiveBufferSize) { - _remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize; - } - - virtual epics::pvData::String getType() const { - return epics::pvData::String("TCP"); - } - - virtual void aliveNotification() { - // noop - } - - virtual void changedTransport() { - // noop - } - - virtual const osiSockAddr* getRemoteAddress() const { - return &_socketAddress; - } - - virtual epics::pvData::int16 getPriority() const { - return _priority; - } - - virtual std::size_t getReceiveBufferSize() const { - return _socketBuffer->getSize(); - } - - /** - * Get remote transport receive buffer size (in bytes). - * @return remote transport receive buffer size - */ - virtual std::size_t getRemoteTransportReceiveBufferSize() const { - return _remoteTransportReceiveBufferSize; - } - - virtual std::size_t getSocketReceiveBufferSize() const; - - virtual bool verify(epics::pvData::int32 timeoutMs) { - return _verifiedEvent.wait(timeoutMs/1000.0); - - //epics::pvData::Lock lock(_verifiedMutex); - //return _verified; - } - - virtual void verified() { - epics::pvData::Lock lock(_verifiedMutex); - _verified = true; - _verifiedEvent.signal(); - } - - virtual void setRecipient(const osiSockAddr& /*sendTo*/) { - // noop - } - - virtual void flush(bool lastMessageCompleted); - virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity); - virtual void endMessage(); - - virtual void flushSerializeBuffer() { - flush(false); - } - - virtual void ensureBuffer(std::size_t size); - - virtual void alignBuffer(std::size_t alignment); - - virtual void ensureData(std::size_t size); - - virtual void alignData(std::size_t alignment); - - virtual bool directSerialize(epics::pvData::ByteBuffer *existingBuffer, const char* toSerialize, - std::size_t elementCount, std::size_t elementSize); - - virtual bool directDeserialize(epics::pvData::ByteBuffer *existingBuffer, char* deserializeTo, - std::size_t elementCount, std::size_t elementSize); - - void processReadIntoDirectBuffer(std::size_t bytesToRead); - - virtual void close(); - - virtual void setByteOrder(int /*byteOrder*/) - { - // not used this this implementation - } - - FlushStrategy getFlushStrategy() { - return _flushStrategy; - } - - void setFlushStrategy(FlushStrategy flushStrategy) { - _flushStrategy = flushStrategy; - } - - //void requestFlush(); - - /** - * Close and free connection resources. - */ - void freeConnectionResorces(); - - /** - * Starts the receive and send threads - */ - virtual void start(); - - virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender); - - //void enqueueMonitorSendRequest(TransportSender::shared_pointer const & sender); - - virtual void enqueueOnlySendRequest(TransportSender::shared_pointer const & sender); - - virtual void flushSendQueue(); - - virtual void cachedSerialize( - const std::tr1::shared_ptr& field, epics::pvData::ByteBuffer* buffer) - { - outgoingIR.serialize(field, buffer, this); - } - - virtual std::tr1::shared_ptr - cachedDeserialize(epics::pvData::ByteBuffer* buffer) - { - return incomingIR.deserialize(buffer, this); - } - - protected: - - virtual void processReadCached(bool nestedCall, - ReceiveStage inStage, std::size_t requiredBytes); - - /** - * Called to any resources just before closing transport - * @param[in] force flag indicating if forced (e.g. forced - * disconnect) is required - */ - virtual void internalClose(bool force); - - /** - * Called to any resources just after closing transport and without any locks held on transport - * @param[in] force flag indicating if forced (e.g. forced - * disconnect) is required - */ - virtual void internalPostClose(bool force); - - /** - * Send a buffer through the transport. - * NOTE: TCP sent buffer/sending has to be synchronized (not done by this method). - * @param buffer[in] buffer to be sent - * @return success indicator - */ - virtual bool send(epics::pvData::ByteBuffer* buffer); - - virtual ~BlockingTCPTransport(); - - -#if FLOW_CONTROL - /** - * Default marker period. - */ - static const std::size_t MARKER_PERIOD = 1024; -#endif - static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE = 1024; - -// TODO - double _delay; - - /****** finally initialized at construction time and after start (called by the same thread) ********/ - - /** - * Corresponding channel. - */ - SOCKET _channel; - - /** - * Cached socket address. - */ - osiSockAddr _socketAddress; - - /** - * Priority. - * NOTE: Priority cannot just be changed, since it is registered - * in transport registry with given priority. - */ - epics::pvData::int16 _priority; - // TODO to be implemeneted - - /** - * PVAS response handler. - */ - std::auto_ptr _responseHandler; - - // TODO review int vs std::size_t - - /** - * Send buffer size. - */ - std::size_t _maxPayloadSize; - - /** - * Send buffer size. - */ - int _socketSendBufferSize; - - /** - * Marker "period" in bytes (every X bytes marker should be set). - */ - epics::pvData::int64 _markerPeriodBytes; - - - FlushStrategy _flushStrategy; - - - epicsThreadId _rcvThreadId; - - epicsThreadId _sendThreadId; - - // TODO - //MonitorSender* _monitorSender; - - Context::shared_pointer _context; - - bool _autoDelete; - - - - /**** after verification ****/ - - /** - * Remote side transport revision (minor). - */ - epics::pvData::int8 _remoteTransportRevision; - - /** - * Remote side transport receive buffer size. - */ - size_t _remoteTransportReceiveBufferSize; - - /** - * Remote side transport socket receive buffer size. - */ - size_t _remoteTransportSocketReceiveBufferSize; - - - - /*** send thread only - no need to sync ***/ - // NOTE: now all send-related external calls are TransportSender IF - // and its reference is only valid when called from send thread - - // initialized at construction time - std::deque _sendQueue; - epics::pvData::Mutex _sendQueueMutex; - - // initialized at construction time -// std::deque _monitorSendQueue; -// epics::pvData::Mutex _monitorMutex; - - /** - * Send buffer. - */ - epics::pvData::ByteBuffer* _sendBuffer; - -#if FLOW_CONTROL - /** - * Next planned marker position. - */ - epics::pvData::int64 _nextMarkerPosition; -#endif - - /** - * Send pending flag. - */ - bool _sendPending; - - /** - * Last message start position. - */ - int _lastMessageStartPosition; - - epics::pvData::int8 _lastSegmentedMessageType; - epics::pvData::int8 _lastSegmentedMessageCommand; - - bool _flushRequested; - - int _sendBufferSentPosition; - - epics::pvData::int8 _byteOrderFlag; - - /** - * Outgoing (codes generated by this party) introspection registry. - */ - IntrospectionRegistry outgoingIR; - - - - - - - /*** receive thread only - no need to sync ***/ - - // initialized at construction time - epics::pvData::ByteBuffer* _socketBuffer; - - std::size_t _startPosition; - - std::size_t _storedPayloadSize; - std::size_t _storedPosition; - std::size_t _storedLimit; - - epics::pvData::int8 _version; - epics::pvData::int8 _packetType; - epics::pvData::int8 _command; - std::size_t _payloadSize; - - ReceiveStage _stage; - - std::size_t _directPayloadRead; - char * _directBuffer; - -#if FLOW_CONTROL - - /** - * Total bytes received. - */ - epics::pvData::int64 _totalBytesReceived; -#endif - - /** - * Incoming (codes generated by other party) introspection registry. - */ - IntrospectionRegistry incomingIR; - - - - /*** send/receive thread shared ***/ - - /** - * Connection status - * NOTE: synced by _mutex - */ - AtomicBoolean _closed; - - // NOTE: synced by _mutex - bool _sendThreadExited; - - epics::pvData::Mutex _mutex; - - - bool _verified; - epics::pvData::Mutex _verifiedMutex; - - - - - epics::pvData::Event _sendQueueEvent; - - epics::pvData::Event _verifiedEvent; - - - - - -#if FLOW_CONTROL - /** - * Marker to send. - * NOTE: synced by _flowControlMutex - */ - int _markerToSend; - - /** - * Total bytes sent. - * NOTE: synced by _flowControlMutex - */ - epics::pvData::int64 _totalBytesSent; - - /** - * Calculated remote free buffer size. - * NOTE: synced by _flowControlMutex - */ - epics::pvData::int64 _remoteBufferFreeSpace; - - epics::pvData::Mutex _flowControlMutex; -#endif - - private: - - /** - * Internal method that clears and releases buffer. - * sendLock and sendBufferLock must be hold while calling this method. - */ - void clearAndReleaseBuffer(); - - void endMessage(bool hasMoreSegments); - - bool flush(); - - void processSendQueue(); - - static void rcvThreadRunner(void* param); - - static void sendThreadRunner(void* param); - - /** - * Free all send buffers (return them to the cached buffer allocator). - */ - void freeSendBuffers(); - }; - - - class BlockingClientTCPTransport : public BlockingTCPTransport, - public TransportSender, - public epics::pvData::TimerCallback { - - public: - POINTER_DEFINITIONS(BlockingClientTCPTransport); - - private: - BlockingClientTCPTransport(Context::shared_pointer const & context, SOCKET channel, - std::auto_ptr& responseHandler, int receiveBufferSize, - TransportClient::shared_pointer client, epics::pvData::int8 remoteTransportRevision, - float beaconInterval, epics::pvData::int16 priority); - - public: - static shared_pointer create(Context::shared_pointer const & context, SOCKET channel, - std::auto_ptr& responseHandler, int receiveBufferSize, - TransportClient::shared_pointer client, epics::pvData::int8 remoteTransportRevision, - float beaconInterval, epics::pvData::int16 priority) - { - shared_pointer thisPointer( - new BlockingClientTCPTransport(context, channel, responseHandler, receiveBufferSize, - client, remoteTransportRevision, beaconInterval, priority) - ); - thisPointer->start(); - return thisPointer; - } - - virtual void start(); - - virtual ~BlockingClientTCPTransport(); - - virtual void timerStopped() { - // noop - } - - virtual void callback(); - - /** - * Acquires transport. - * @param client client (channel) acquiring the transport - * @return true if transport was granted, false otherwise. - */ - virtual bool acquire(TransportClient::shared_pointer const & client); - - /** - * Releases transport. - * @param client client (channel) releasing the transport - */ - virtual void release(pvAccessID clientId); - //virtual void release(TransportClient::shared_pointer const & client); - - /** - * Alive notification. - * This method needs to be called (by newly received data or beacon) - * at least once in this period, if not echo will be issued - * and if there is not response to it, transport will be considered as unresponsive. - */ - virtual void aliveNotification(); - - /** - * Changed transport (server restared) notify. - */ - virtual void changedTransport(); - - virtual void lock() { - // noop - } - - virtual void unlock() { - // noop - } - - virtual void acquire() { - // noop, since does not make sence on itself - } - - virtual void release() { - // noop, since does not make sence on itself - } - - virtual void send(epics::pvData::ByteBuffer* buffer, - TransportSendControl* control); - - protected: - - virtual void internalClose(bool force); - virtual void internalPostClose(bool force); - - private: - - /** - * Owners (users) of the transport. - */ - // TODO consider using TR1 hash map - typedef std::map TransportClientMap_t; - TransportClientMap_t _owners; - - /** - * Connection timeout (no-traffic) flag. - */ - double _connectionTimeout; - - /** - * Unresponsive transport flag. - */ - bool _unresponsiveTransport; - - /** - * Timestamp of last "live" event on this transport. - */ - epicsTimeStamp _aliveTimestamp; - - bool _verifyOrEcho; - - /** - * Unresponsive transport notify. - */ - void unresponsiveTransport(); - - /** - * Notifies clients about disconnect. - */ - void closedNotifyClients(); - - /** - * Responsive transport notify. - */ - void responsiveTransport(); - }; - /** * Channel Access TCP connector. * @author Matej Sekoranja @@ -672,152 +97,6 @@ namespace epics { }; - class BlockingServerTCPTransport : public BlockingTCPTransport, - public ChannelHostingTransport, - public TransportSender { - public: - POINTER_DEFINITIONS(BlockingServerTCPTransport); - - private: - BlockingServerTCPTransport(Context::shared_pointer const & context, SOCKET channel, - std::auto_ptr& responseHandler, int receiveBufferSize); - public: - static shared_pointer create(Context::shared_pointer const & context, SOCKET channel, - std::auto_ptr& responseHandler, int receiveBufferSize) - { - shared_pointer thisPointer( - new BlockingServerTCPTransport(context, channel, responseHandler, receiveBufferSize) - ); - thisPointer->start(); - return thisPointer; - } - - virtual bool acquire(std::tr1::shared_ptr const & /*client*/) - { - return false; - } - - virtual void release(pvAccessID /*clientId*/) {} - - /** - * Preallocate new channel SID. - * @return new channel server id (SID). - */ - virtual pvAccessID preallocateChannelSID(); - - /** - * De-preallocate new channel SID. - * @param sid preallocated channel SID. - */ - virtual void depreallocateChannelSID(pvAccessID /*sid*/) { - // noop - } - - /** - * Register a new channel. - * @param sid preallocated channel SID. - * @param channel channel to register. - */ - virtual void registerChannel(pvAccessID sid, ServerChannel::shared_pointer const & channel); - - /** - * Unregister a new channel (and deallocates its handle). - * @param sid SID - */ - virtual void unregisterChannel(pvAccessID sid); - - /** - * Get channel by its SID. - * @param sid channel SID - * @return channel with given SID, NULL otherwise - */ - virtual ServerChannel::shared_pointer getChannel(pvAccessID sid); - - /** - * Get channel count. - * @return channel count. - */ - virtual int getChannelCount(); - - virtual epics::pvData::PVField::shared_pointer getSecurityToken() { - return epics::pvData::PVField::shared_pointer(); - } - - virtual void lock() { - // noop - } - - virtual void unlock() { - // noop - } - - virtual void acquire() { - // noop, since does not make sence on itself - } - - virtual void release() { - // noop, since does not make sence on itself - } - - /** - * Verify transport. Server side is self-verified. - */ - virtual bool verify(epics::pvData::int32 /*timeoutMs*/) { - TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast(shared_from_this()); - enqueueSendRequest(transportSender); - verified(); - return true; - } - - /** - * PVA connection validation request. - * A server sends a validate connection message when it receives a new connection. - * The message indicates that the server is ready to receive requests; the client must - * not send any messages on the connection until it has received the validate connection message - * from the server. No reply to the message is expected by the server. - * The purpose of the validate connection message is two-fold: - * It informs the client of the protocol version supported by the server. - * It prevents the client from writing a request message to its local transport - * buffers until after the server has acknowledged that it can actually process the - * request. This avoids a race condition caused by the server's TCP/IP stack - * accepting connections in its backlog while the server is in the process of shutting down: - * if the client were to send a request in this situation, the request - * would be lost but the client could not safely re-issue the request because that - * might violate at-most-once semantics. - * The validate connection message guarantees that a server is not in the middle - * of shutting down when the server's TCP/IP stack accepts an incoming connection - * and so avoids the race condition. - * @see org.epics.ca.impl.remote.TransportSender#send(java.nio.ByteBuffer, org.epics.ca.impl.remote.TransportSendControl) - */ - virtual void send(epics::pvData::ByteBuffer* buffer, - TransportSendControl* control); - - virtual ~BlockingServerTCPTransport(); - - protected: - - virtual void internalClose(bool force); - virtual void internalPostClose(bool force); - - private: - /** - * Last SID cache. - */ - pvAccessID _lastChannelSID; - - /** - * Channel table (SID -> channel mapping). - */ - std::map _channels; - - epics::pvData::Mutex _channelsMutex; - - /** - * Destroy all channels. - */ - void destroyAllChannels(); - }; - class ResponseHandlerFactory { public: diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index 0f7b381..bd99b9a 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -191,15 +191,12 @@ namespace pvAccess { LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer); } - /** * Create transport, it registers itself to the registry. - * Each transport should have its own response handler since it is not "shareable" */ - // TODO it is shareable?!!! but code is not adopted to it... std::auto_ptr responseHandler = _responseHandlerFactory->createResponseHandler(); - BlockingServerTCPTransportCodec::shared_pointer transport = - BlockingServerTCPTransportCodec::create( + detail::BlockingServerTCPTransportCodec::shared_pointer transport = + detail::BlockingServerTCPTransportCodec::create( _context, newClient, responseHandler, diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp index 1871e9d..2e12404 100644 --- a/pvAccessApp/remote/blockingTCPConnector.cpp +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -150,7 +150,7 @@ namespace epics { LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer); } - transport = BlockingClientTCPTransportCodec::create( + transport = detail::BlockingClientTCPTransportCodec::create( context, socket, responseHandler, _receiveBufferSize, _socketSendBufferSize, client, transportRevision, _beaconInterval, priority); diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp deleted file mode 100644 index a99c655..0000000 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ /dev/null @@ -1,1342 +0,0 @@ -/** - * Copyright - See the COPYRIGHT that is included with this distribution. - * pvAccessCPP is distributed subject to a Software License Agreement found - * in file LICENSE that is included with this distribution. - */ - -#ifdef _WIN32 -#define NOMINMAX -#include -typedef SSIZE_T ssize_t; -#endif - -#define __STDC_LIMIT_MACROS 1 -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include -#include -#include - -#include -#include -#include - -using namespace epics::pvData; - -using std::max; -using std::min; -using std::ostringstream; - -// TODO to be completely replaced by codec based implementation (see Java) -namespace epics { -namespace pvAccess { - - /* - class MonitorSender : public TransportSender, public NoDefaultMethods { - public: - MonitorSender(Mutex* monitorMutex, GrowingCircularBuffer< - TransportSender*>* monitorSendQueue) : - _monitorMutex(monitorMutex), - _monitorSendQueue(monitorSendQueue) { - } - - virtual ~MonitorSender() { - } - - virtual void lock() { - } - - virtual void unlock() { - } - - virtual void acquire() { - } - - virtual void release() { - } - - virtual void - send(ByteBuffer* buffer, TransportSendControl* control); - - private: - Mutex* _monitorMutex; - GrowingCircularBuffer* _monitorSendQueue; - }; - */ - - PVACCESS_REFCOUNT_MONITOR_DEFINE(blockingTCPTransport); - - //const double BlockingTCPTransport::_delay = 0.000; - - BlockingTCPTransport::BlockingTCPTransport(Context::shared_pointer const & context, - SOCKET channel, std::auto_ptr& responseHandler, - int receiveBufferSize, int16 priority) : - _delay(0.0), - _channel(channel), - _priority(priority), - _responseHandler(responseHandler), -#if FLOW_CONTROL - _markerPeriodBytes(MARKER_PERIOD), -#endif - _flushStrategy(DELAYED), - _rcvThreadId(0), - _sendThreadId(0), - //_monitorSender(new MonitorSender(&_monitorMutex,_monitorSendQueue)), - _context(context), - _autoDelete(true), - _remoteTransportRevision(0), - _remoteTransportReceiveBufferSize(MAX_TCP_RECV), - _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), - _sendQueue(), - //_monitorSendQueue(), -#if FLOW_CONTROL - _nextMarkerPosition(_markerPeriodBytes), -#endif - _sendPending(false), - _lastMessageStartPosition(0), - _lastSegmentedMessageType(0), - _lastSegmentedMessageCommand(0), - _flushRequested(false), - _sendBufferSentPosition(0), - _byteOrderFlag((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00), - _storedPayloadSize(0), - _storedPosition(0), - _storedLimit(0), - _version(0), - _packetType(0), - _command(0), - _payloadSize(0), - _stage(READ_FROM_SOCKET), - _directPayloadRead(0), - _directBuffer(0), -#if FLOW_CONTROL - _totalBytesReceived(0), -#endif - _closed(), - _sendThreadExited(false), - _verified(false) -#if FLOW_CONTROL - , - _markerToSend(0), - _totalBytesSent(0), - _remoteBufferFreeSpace(INT64_MAX) -#endif - { - PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(blockingTCPTransport); - - // TODO minor tweak: deque size is not preallocated... - - unsigned int bufferSize = max((int)(MAX_TCP_RECV+MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize); - // size must be "aligned" - bufferSize = (bufferSize + (PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)); - - _socketBuffer = new ByteBuffer(bufferSize); - _socketBuffer->setPosition(_socketBuffer->getLimit()); - _startPosition = _socketBuffer->getPosition(); - - // allocate buffer - _sendBuffer = new ByteBuffer(bufferSize); - _maxPayloadSize = _sendBuffer->getSize() - 2*PVA_MESSAGE_HEADER_SIZE; // one for header, one for flow control - - // get TCP send buffer size - osiSocklen_t intLen = sizeof(int); - int retval = getsockopt(_channel, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen); - if(unlikely(retval<0)) { - _socketSendBufferSize = MAX_TCP_RECV; - char errStr[64]; - epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - LOG(logLevelDebug, - "Unable to retrieve socket send buffer size: %s", - errStr); - } - - // get remote address - osiSocklen_t saSize = sizeof(sockaddr); - retval = getpeername(_channel, &(_socketAddress.sa), &saSize); - if(unlikely(retval<0)) { - char errStr[64]; - epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - LOG(logLevelError, - "Error fetching socket remote address: %s", - errStr); - } - - // set receive timeout so that we do not have problems at shutdown (recvfrom would block) - struct timeval timeout; - memset(&timeout, 0, sizeof(struct timeval)); - timeout.tv_sec = 1; - timeout.tv_usec = 0; - - if (unlikely(::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0)) - { - char errStr[64]; - epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - LOG(logLevelError, - "Failed to set SO_RCVTIMEO for TDP socket %s: %s.", - inetAddressToString(_socketAddress).c_str(), errStr); - } - - // TODO this will create marker with invalid endian flag - // prepare buffer - clearAndReleaseBuffer(); - } - - BlockingTCPTransport::~BlockingTCPTransport() { - PVACCESS_REFCOUNT_MONITOR_DESTRUCT(blockingTCPTransport); - - close(); - - // TODO use auto_ptr class members - - delete _socketBuffer; - delete _sendBuffer; - } - - // TODO consider epics::pvData::Thread - void BlockingTCPTransport::start() { - - // TODO this was in constructor - // add to registry - Transport::shared_pointer thisSharedPtr = shared_from_this(); - _context->getTransportRegistry()->put(thisSharedPtr); - - - String socketAddressString = inetAddressToString(_socketAddress); - - // - // start receive thread - // - - String threadName = "TCP-receive " + socketAddressString; - LOG(logLevelDebug, "Starting thread: %s", threadName.c_str()); - - _rcvThreadId = epicsThreadCreate(threadName.c_str(), - epicsThreadPriorityMedium, - epicsThreadGetStackSize(epicsThreadStackBig), - BlockingTCPTransport::rcvThreadRunner, this); - - // - // start send thread - // - - threadName = "TCP-send " + socketAddressString; - LOG(logLevelDebug, "Starting thread: %s",threadName.c_str()); - - _sendThreadId = epicsThreadCreate(threadName.c_str(), - epicsThreadPriorityMedium, - epicsThreadGetStackSize(epicsThreadStackSmall), - BlockingTCPTransport::sendThreadRunner, this); - } - - void BlockingTCPTransport::clearAndReleaseBuffer() { -#if FLOW_CONTROL - // NOTE: take care that nextMarkerPosition is set right - // fix position to be correct when buffer is cleared - // do not include pre-buffered flow control message; not 100% correct, but OK - _nextMarkerPosition -= _sendBuffer->getPosition() - PVA_MESSAGE_HEADER_SIZE; -#endif - - _sendQueueMutex.lock(); - _flushRequested = false; - _sendQueueMutex.unlock(); - - _sendBuffer->clear(); - - _sendPending = false; - -#if FLOW_CONTROL - // prepare ACK marker - _sendBuffer->putByte(PVA_MAGIC); - _sendBuffer->putByte(PVA_VERSION); - _sendBuffer->putByte(0x01 | _byteOrderFlag); // control data - _sendBuffer->putByte(1); // marker ACK - _sendBuffer->putInt(0); -#endif - } - - void BlockingTCPTransport::close() { - Lock lock(_mutex); - - // already closed check - if(_closed.get()) return; - _closed.set(); - - // remove from registry - Transport::shared_pointer thisSharedPtr = shared_from_this(); - _context->getTransportRegistry()->remove(thisSharedPtr).get(); - - // TODO !!! - bool force = true; - - // clean resources - internalClose(force); - - // notify send queue - _sendQueueEvent.signal(); - - lock.unlock(); - - // post close without a lock - internalPostClose(force); - } - - void BlockingTCPTransport::internalClose(bool /*force*/) { - // close the socket - if(_channel!=INVALID_SOCKET) { - epicsSocketDestroy(_channel); - } - } - - void BlockingTCPTransport::internalPostClose(bool /*force*/) { - } - - size_t BlockingTCPTransport::getSocketReceiveBufferSize() const { - // Get value of the SO_RCVBUF option for this DatagramSocket, - // that is the buffer size used by the platform for input on - // this DatagramSocket. - - int sockBufSize; - osiSocklen_t intLen = sizeof(int); - - int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, (char *)&sockBufSize, &intLen); - if(unlikely(retval<0)) - { - char errStr[64]; - epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - LOG(logLevelError, - "Socket getsockopt SO_RCVBUF error: %s", - errStr); - } - - return (size_t)sockBufSize; - } - - void BlockingTCPTransport::flush(bool lastMessageCompleted) { - - // automatic end - endMessage(!lastMessageCompleted); - - bool moreToSend = true; - while(moreToSend) { - moreToSend = !flush(); - - // all sent, exit - if(!moreToSend) break; - // TODO check if this is OK - else if (_closed.get()) THROW_BASE_EXCEPTION("transport closed"); - - // TODO solve this sleep in a better way - epicsThreadSleep(0.01); - } - - _lastMessageStartPosition = _sendBuffer->getPosition(); - - // start with last header - if (unlikely(!lastMessageCompleted && _lastSegmentedMessageType!=0)) - startMessage(_lastSegmentedMessageCommand, 0); - } - - void BlockingTCPTransport::startMessage(int8 command, size_t ensureCapacity) { - _lastMessageStartPosition = -1; - ensureBuffer(PVA_MESSAGE_HEADER_SIZE+ensureCapacity); - _lastMessageStartPosition = _sendBuffer->getPosition(); - _sendBuffer->putByte(PVA_MAGIC); - _sendBuffer->putByte(PVA_VERSION); - _sendBuffer->putByte(_lastSegmentedMessageType | _byteOrderFlag); // data + endianess - _sendBuffer->putByte(command); // command - _sendBuffer->putInt(0); // temporary zero payload - - } - - void BlockingTCPTransport::endMessage() { - endMessage(false); - } - - void BlockingTCPTransport::ensureBuffer(size_t size) { - if(likely(_sendBuffer->getRemaining()>=size)) return; - - // too large for buffer... - if(unlikely(_maxPayloadSizegetRemaining()getRemaining()<(alignment-1))) - ensureBuffer(alignment-1); - - _sendBuffer->align(alignment); - } - - void BlockingTCPTransport::endMessage(bool hasMoreSegments) { - if(likely(_lastMessageStartPosition>=0)) { - - // align - // alignBuffer(PVA_ALIGNMENT); - - // set paylaod size - const size_t payloadSize = _sendBuffer->getPosition()-_lastMessageStartPosition-PVA_MESSAGE_HEADER_SIZE; - - // TODO by spec? - // ignore empty segmented messages - if (payloadSize == 0 && _lastSegmentedMessageType != 0) - { - _sendBuffer->setPosition(_lastMessageStartPosition); - if (!hasMoreSegments) - _lastSegmentedMessageType = 0; - return; - - } - - _sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2, (int32)payloadSize); - - int flagsPosition = _lastMessageStartPosition+sizeof(int16); - // set segmented bit - if(likely(hasMoreSegments)) { - // first segment - if(unlikely(_lastSegmentedMessageType==0)) { - int8 type = _sendBuffer->getByte(flagsPosition); - - // set first segment bit - _sendBuffer->putByte(flagsPosition, (int8)(type|0x10)); - - // first + last segment bit == in-between segment - _lastSegmentedMessageType = (int8)(type|0x30); - _lastSegmentedMessageCommand = _sendBuffer->getByte( - flagsPosition+1); - } - } - else { - // last segment - if(unlikely(_lastSegmentedMessageType!=0)) { - // set last segment bit (by clearing first segment bit) - _sendBuffer->putByte(flagsPosition, - (int8)(_lastSegmentedMessageType&0xEF)); - _lastSegmentedMessageType = 0; - } - } - -#if FLOW_CONTROL - // manage markers - int position = _sendBuffer->getPosition(); - int bytesLeft = _sendBuffer->getRemaining(); - - if(unlikely(position>=_nextMarkerPosition && - bytesLeft>=PVA_MESSAGE_HEADER_SIZE)) { - _sendBuffer->putByte(PVA_MAGIC); - _sendBuffer->putByte(PVA_VERSION); - _sendBuffer->putByte(0x01 | _byteOrderFlag); // control data - _sendBuffer->putByte(0); // marker - s_sendBuffer->putInt((int)(_totalBytesSent+position+PVA_MESSAGE_HEADER_SIZE)); - _nextMarkerPosition = position+_markerPeriodBytes; - } -#endif - } - } - - void BlockingTCPTransport::ensureData(size_t size) { - // enough of data? - const size_t remainingBytes = _socketBuffer->getRemaining(); - if (likely(remainingBytes>=size)) return; - - // too large for buffer... - if (unlikely(MAX_ENSURE_DATA_BUFFER_SIZEgetPosition()-_storedPosition; - - // no more data and we have some payload left => read buffer - if (likely(_storedPayloadSize>=size)) - { - //LOG(logLevelInfo, - // "storedPayloadSize >= size, remaining: %d", - // _socketBuffer->getRemaining()); - - // just read up remaining payload, move current (getPosition(); - _storedLimit = _socketBuffer->getLimit(); - _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, - _storedLimit)); - } - else - { - // copy remaining bytes to safe region of buffer, if any - for(size_t i = 0; iputByte(i, _socketBuffer->getByte()); - - // extend limit to what was read - _socketBuffer->setLimit(_storedLimit); - - _stage = PROCESS_HEADER; - processReadCached(true, UNDEFINED_STAGE, size-remainingBytes); - - if (unlikely(remainingBytes > 0)) - { - // copy saved back to before position - for(int i = static_cast(remainingBytes-1), j = _socketBuffer->getPosition()-1; - i>=0; - i--, j--) - _socketBuffer->putByte(j, _socketBuffer->getByte(i)); - _startPosition = _socketBuffer->getPosition()-remainingBytes; - _socketBuffer->setPosition(_startPosition); - _storedPosition = _startPosition; - } - else - { - _storedPosition = _socketBuffer->getPosition(); - } - - _storedLimit = _socketBuffer->getLimit(); - _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, - _storedLimit)); - - // add if missing, since UNDEFINED_STAGE and return less... - if(unlikely(!_closed.get()&&(_socketBuffer->getRemaining()getRemaining()<(alignment-1))) - ensureData(alignment-1); - - _socketBuffer->align(alignment); - } - - bool BlockingTCPTransport::directSerialize(ByteBuffer* /*existingBuffer*/, const char* toSerialize, - std::size_t elementCount, std::size_t elementSize) - { - // TODO overflow check, size_t type, other is int32 for payloadSize header field !!! - // TODO do not ignore or new field in max message size in connection validation - std::size_t count = elementCount * elementSize; - - // TODO find smart limit - // check if direct mode actually pays off - if (count < 1024) - return false; - - // first end current message indicating the we will segment - endMessage(true); - - // append segmented message header - startMessage(_lastSegmentedMessageCommand, 0); - // set segmented message size - _sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2, (int32)count); - - // flush (TODO this is code is duplicated) - bool moreToSend = true; - while (moreToSend) { - moreToSend = !flush(); - - // all sent, exit - if(!moreToSend) break; - // TODO check if this is OK - else if (_closed.get()) THROW_BASE_EXCEPTION("transport closed"); - - // TODO solve this sleep in a better way - epicsThreadSleep(0.01); - } - _lastMessageStartPosition = _sendBuffer->getPosition(); - - // TODO think if alignment is preserved after... - - try { - //LOG(logLevelInfo, - // "Sending (direct) %d bytes in the packet to %s.", - // count, - // inetAddressToString(_socketAddress).c_str()); - const char* ptr = toSerialize; - while(count>0) { - ssize_t bytesSent = ::send(_channel, - ptr, - count, 0); - - if(unlikely(bytesSent<0)) { - - int socketError = SOCKERRNO; - - // spurious EINTR check - if (socketError==SOCK_EINTR) - continue; - - // TODO check this (copy below)... consolidate!!! - if (socketError==SOCK_ENOBUFS) { - // TODO improve this - epicsThreadSleep(0.01); - continue; - } - - // connection lost - - char errStr[64]; - epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - ostringstream temp; - temp<<"error in sending TCP data: "<getRemaining()); - existingBuffer->getArray(_directBuffer, availableBytes); - _directPayloadRead -= availableBytes; - - if (_directPayloadRead == 0) - return true; - - _directBuffer += availableBytes; - - // subtract what was already processed - size_t pos = _socketBuffer->getPosition(); - _storedPayloadSize -= pos -_storedPosition; - _storedPosition = pos; - - // no more data and we have some payload left => read buffer - if (likely(_storedPayloadSize > 0)) - { - size_t bytesToRead = std::min(_directPayloadRead, _storedPayloadSize); - processReadIntoDirectBuffer(bytesToRead); - // std::cout << "d: " << bytesToRead << std::endl; - _storedPayloadSize -= bytesToRead; - _directPayloadRead -= bytesToRead; - } - - if (_directPayloadRead == 0) - return true; - - _stage = PROCESS_HEADER; - processReadCached(true, UNDEFINED_STAGE, _directPayloadRead); - - _storedPosition = _socketBuffer->getPosition(); - _storedLimit = _socketBuffer->getLimit(); - _socketBuffer->setLimit( - min(_storedPosition + _storedPayloadSize, _storedLimit) - ); - - } - - return true; - } - - void BlockingTCPTransport::processReadIntoDirectBuffer(size_t bytesToRead) - { - while (bytesToRead > 0) - { - ssize_t bytesRead = recv(_channel, _directBuffer, bytesToRead, 0); - - // std::cout << "d: " << bytesRead << std::endl; - - if(unlikely(bytesRead<=0)) - { - - if (bytesRead<0) - { - int socketError = SOCKERRNO; - - // interrupted or timeout - if (socketError == EINTR || - socketError == EAGAIN || - socketError == EWOULDBLOCK) - continue; - } - - // error (disconnect, end-of-stream) detected - close(); - - THROW_BASE_EXCEPTION("bytesRead < 0"); - - return; - } - - bytesToRead -= bytesRead; - _directBuffer += bytesRead; - - } - } - - void BlockingTCPTransport::processReadCached(bool nestedCall, - ReceiveStage inStage, size_t requiredBytes) { - try { - // TODO we need to throw exception in nextedCall not just bail out!!!! - while(likely(!_closed.get())) { - if(_stage==READ_FROM_SOCKET||inStage!=UNDEFINED_STAGE) { - - // add to bytes read -#if FLOW_CONTROL - int currentPosition = _socketBuffer->getPosition(); - _totalBytesReceived += (currentPosition - _startPosition); -#endif - // preserve alignment - int currentStartPosition = _startPosition = - MAX_ENSURE_DATA_BUFFER_SIZE; // "TODO uncomment align" + (unsigned int)currentPosition % PVA_ALIGNMENT; - - // copy remaining bytes, if any - int remainingBytes = _socketBuffer->getRemaining(); - - int endPosition = currentStartPosition + remainingBytes; - // TODO memmove - for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; iputByte(i, _socketBuffer->getByte()); - - _socketBuffer->setPosition(endPosition); - _socketBuffer->setLimit(_socketBuffer->getSize()); - - // read at least requiredBytes bytes - - size_t requiredPosition = (currentStartPosition+requiredBytes); - while(_socketBuffer->getPosition()getPosition(); - - ssize_t bytesRead = recv(_channel, (char*)(_socketBuffer->getArray()+pos), -// _socketBuffer->getRemaining(), 0); -// TODO we assume that caller is smart and requiredBytes > remainingBytes -// if in direct read mode, try to read only header so that rest can be read directly to direct buffers -(_directPayloadRead > 0 && inStage == PROCESS_HEADER) ? (requiredBytes-remainingBytes) : _socketBuffer->getRemaining(), 0); -//std::cout << "i: " << bytesRead << std::endl; - - if(unlikely(bytesRead<=0)) { - - if (bytesRead<0) - { - int socketError = SOCKERRNO; - - // interrupted or timeout - if (socketError == EINTR || - socketError == EAGAIN || - socketError == EWOULDBLOCK) - continue; - } - - // error (disconnect, end-of-stream) detected - close(); - - if(nestedCall) - THROW_BASE_EXCEPTION("bytesRead < 0"); - - return; - } - - _socketBuffer->setPosition(pos+bytesRead); - } - - std::size_t pos = _socketBuffer->getPosition(); - _storedLimit = pos; - _socketBuffer->setLimit(pos); - _socketBuffer->setPosition(currentStartPosition); - - /* - hexDump("\n\n\n", "READ", - (const int8*)_socketBuffer->getArray(), - _socketBuffer->getPosition(), _socketBuffer->getRemaining()); - */ - - // notify liveness - aliveNotification(); - - // exit - if(inStage!=UNDEFINED_STAGE) return; - - _stage = PROCESS_HEADER; - } - - if(likely(_stage==PROCESS_HEADER)) { - - // reveal what's already in buffer - _socketBuffer->setLimit(_storedLimit); - - // ensure PVAConstants.PVA_MESSAGE_HEADER_SIZE bytes of data - if(unlikely(((int)_socketBuffer->getRemaining())getByte(); - _version = _socketBuffer->getByte(); - if(unlikely(magic != PVA_MAGIC)) - { - // error... disconnect - LOG( - logLevelError, - "Invalid header received from client %s, disconnecting...", - inetAddressToString(_socketAddress).c_str()); - close(); - return; - } - - // data vs. control packet - _packetType = _socketBuffer->getByte(); - - // command - _command = _socketBuffer->getByte(); - - // read payload size - _payloadSize = _socketBuffer->getInt(); - - int8 type = (int8)(_packetType&0x0F); - if(likely(type==0)) - { - // data - _stage = PROCESS_PAYLOAD; - } - else if(unlikely(type==1)) - { - // control - // marker request sent - if (_command == CMD_SET_MARKER) { -#if FLOW_CONTROL - _flowControlMutex.lock(); - if(_markerToSend==0) - _markerToSend = _payloadSize; - // TODO send back response - _flowControlMutex.unlock(); -#endif - } - - // marker received back - else if (_command == CMD_ACK_MARKER) - { -#if FLOW_CONTROL - _flowControlMutex.lock(); - int difference = (int)_totalBytesSent-_payloadSize+PVA_MESSAGE_HEADER_SIZE; - // overrun check - if(difference<0) difference += INT_MAX; - _remoteBufferFreeSpace - = _remoteTransportReceiveBufferSize - +_remoteTransportSocketReceiveBufferSize - -difference; - // TODO if this is calculated wrong, this can be critical !!! - _flowControlMutex.unlock(); -#endif - } - // set byte order - else if (_command == CMD_SET_ENDIANESS) - { - // check 7-th bit - - int endianess = (_packetType < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE); - _socketBuffer->setEndianess(endianess); - - // TODO register as TransportSender and add to the queue - // current implementation is OK, but not nice - _sendQueueMutex.lock(); - _sendBuffer->setEndianess(endianess); - _byteOrderFlag = (endianess == EPICS_ENDIAN_BIG) ? 0x80 : 0x00; - _sendQueueMutex.unlock(); - } - - // no payload - //stage = ReceiveStage.PROCESS_HEADER; - continue; - } - else { - LOG( - logLevelError, - "Unknown packet type %d, received from client %s, disconnecting...", - type, - inetAddressToString(_socketAddress).c_str()); - close(); - return; - } - } - - if(likely(_stage==PROCESS_PAYLOAD)) { - // read header - - // last segment bit set (means in-between segment or last segment) - bool notFirstSegment = (_packetType&0x20)!=0; - - _storedPayloadSize = _payloadSize; - - // if segmented, exit reading code - if(nestedCall&¬FirstSegment) return; - - // ignore segmented messages with no payload - if (likely(!notFirstSegment || _payloadSize > 0)) - { - - // NOTE: nested data (w/ payload) messages between segmented messages are not supported - _storedPosition = _socketBuffer->getPosition(); - _storedLimit = _socketBuffer->getLimit(); - _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, _storedLimit)); - try { - // handle response - Transport::shared_pointer thisPointer = shared_from_this(); - _responseHandler->handleResponse(&_socketAddress, - thisPointer, _version, _command, _payloadSize, - _socketBuffer); - } catch (std::exception& ex) { - - LOG( - logLevelDebug, - "Unexpected exception in responseHandler: %s", - ex.what() - ); - - } catch(...) { - - LOG( - logLevelDebug, - "Unexpected exception in responseHandler!" - ); - - } - - _socketBuffer->setLimit(_storedLimit); - size_t newPosition = _storedPosition+_storedPayloadSize; - if(unlikely(newPosition>_storedLimit)) { - newPosition -= _storedLimit; - _socketBuffer->setPosition(_storedLimit); - processReadCached(true, PROCESS_PAYLOAD,newPosition); - newPosition += _startPosition; - } - _socketBuffer->setPosition(newPosition); - // TODO discard all possible segments?!!! - - } - - _stage = PROCESS_HEADER; - - continue; - } - - } - } catch(...) { - // close connection - close(); - - if(nestedCall) throw; - } - } - - bool BlockingTCPTransport::flush() { - // request issues, has not sent anything yet (per partes) - if(likely(!_sendPending)) { - _sendPending = true; - - // start sending from the start - _sendBufferSentPosition = 0; - -#if FLOW_CONTROL - // if not set skip marker otherwise set it - _flowControlMutex.lock(); - int markerValue = _markerToSend; - _markerToSend = 0; - _flowControlMutex.unlock(); - if(markerValue==0) - _sendBufferSentPosition = PVA_MESSAGE_HEADER_SIZE; - else - _sendBuffer->putInt(4, markerValue); -#endif - } - - bool success = false; - try { - // remember current position - int currentPos = _sendBuffer->getPosition(); - - // set to send position - _sendBuffer->setPosition(_sendBufferSentPosition); - _sendBuffer->setLimit(currentPos); - - success = send(_sendBuffer); - - // all sent? - if(likely(success)) - clearAndReleaseBuffer(); - else { - // remember position - _sendBufferSentPosition = _sendBuffer->getPosition(); - - // .. reset to previous state - _sendBuffer->setPosition(currentPos); - _sendBuffer->setLimit(_sendBuffer->getSize()); - } - //} catch(std::exception& e) { - // LOG(logLevelError, "%s", e.what()); - // // error, release lock - // clearAndReleaseBuffer(); - } catch(...) { - clearAndReleaseBuffer(); - } - return success; - } - - bool BlockingTCPTransport::send(ByteBuffer* buffer) { - try { - // TODO simply use value from marker???!!! - // On Windows, limiting the buffer size is important to prevent - // poor throughput performances when transferring large amount of - // data. See Microsoft KB article KB823764. - // We do it also for other systems just to be safe. - int maxBytesToSend = std::min(_socketSendBufferSize, - static_cast(_remoteTransportSocketReceiveBufferSize))/2; - - int limit = buffer->getLimit(); - int bytesToSend = limit-buffer->getPosition(); - - //LOG(logLevelInfo,"Total bytes to send: %d", bytesToSend); - //printf("Total bytes to send: %d\n", bytesToSend); - - // limit sending - if(bytesToSend>maxBytesToSend) { - bytesToSend = maxBytesToSend; - buffer->setLimit(buffer->getPosition()+bytesToSend); - } - - //LOG(logLevelInfo, - // "Sending %d of total %d bytes in the packet to %s.", - // bytesToSend, limit, - // inetAddressToString(_socketAddress).c_str()); - - while(buffer->getRemaining()>0) { - ssize_t bytesSent = ::send(_channel, - &buffer->getArray()[buffer->getPosition()], - buffer->getRemaining(), 0); - - if(unlikely(bytesSent<0)) { - - int socketError = SOCKERRNO; - - // spurious EINTR check - if (socketError==SOCK_EINTR) - continue; - - // TODO check this (copy below)... consolidate!!! - if (socketError==SOCK_ENOBUFS) { - /* buffers full, reset the limit and indicate that there - * is more data to be sent - */ - if(bytesSent==maxBytesToSend) buffer->setLimit(limit); - return false; - } - - // connection lost - - char errStr[64]; - epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - ostringstream temp; - temp<<"error in sending TCP data: "<getPosition(), limit); - - /* buffers full, reset the limit and indicate that there - * is more data to be sent - */ - if(bytesSent==maxBytesToSend) buffer->setLimit(limit); - - //LOG(logLevelInfo, - // "Send buffer full for %s, waiting...", - // inetAddressToString(_socketAddress)); - return false; - } - - buffer->setPosition(buffer->getPosition()+bytesSent); - -#if FLOW_CONTROL - _flowControlMutex.lock(); - _totalBytesSent += bytesSent; - _flowControlMutex.unlock(); -#endif - - // readjust limit - if(bytesToSend==maxBytesToSend) { - bytesToSend = limit-buffer->getPosition(); - if(bytesToSend>maxBytesToSend) bytesToSend - = maxBytesToSend; - buffer->setLimit(buffer->getPosition()+bytesToSend); - } - - //LOG(logLevelInfo, - // "Sent, position %d of total %d bytes.", - // buffer->getPosition(), limit); - } // while - } catch(...) { - close(); - throw; - } - - // all sent - return true; - } - - void BlockingTCPTransport::processSendQueue() { - while(unlikely(!_closed.get())) { - - _sendQueueMutex.lock(); - // TODO optimize - TransportSender::shared_pointer sender; - if (likely(!_sendQueue.empty())) - { - sender = _sendQueue.front(); - _sendQueue.pop_front(); - } - _sendQueueMutex.unlock(); - - // wait for new message - while(likely(sender.get()==0&&!_flushRequested&&!_closed.get())) { - if(_flushStrategy==DELAYED) { - if(_delay>0) epicsThreadSleep(_delay); - if(unlikely(_sendQueue.empty())) { - // if (hasMonitors || sendBuffer.position() > PVAConstants.PVA_MESSAGE_HEADER_SIZE) -#if FLOW_CONTROL - if(((int)_sendBuffer->getPosition())>PVA_MESSAGE_HEADER_SIZE) -#else - if(((int)_sendBuffer->getPosition())>0) -#endif - _flushRequested = true; - else - _sendQueueEvent.wait(); - } - } - else - _sendQueueEvent.wait(); - - _sendQueueMutex.lock(); - if (likely(!_sendQueue.empty())) - { - sender = _sendQueue.front(); - _sendQueue.pop_front(); - } - else - sender.reset(); - _sendQueueMutex.unlock(); - } - - // always do flush from this thread - if(unlikely(_flushRequested)) { - /* - if (hasMonitors) - { - monitorSender.send(sendBuffer, this); - } - */ - - flush(); - } - - if(likely(sender.get() != 0)) { - sender->lock(); - try { - _lastMessageStartPosition = _sendBuffer->getPosition(); - sender->send(_sendBuffer, this); - - if(_flushStrategy==IMMEDIATE) - flush(true); - else - endMessage(false);// automatic end (to set payload) - } catch(std::exception &) { - //LOG(logLevelError, "%s", e.what()); - _sendBuffer->setPosition(_lastMessageStartPosition); - } catch(...) { - _sendBuffer->setPosition(_lastMessageStartPosition); - } - sender->unlock(); - } // if(sender!=NULL) - } // while(!_closed.get()) - } - - void BlockingTCPTransport::freeSendBuffers() { - // TODO ? - } - - void BlockingTCPTransport::freeConnectionResorces() { - freeSendBuffers(); - - LOG(logLevelDebug, "Connection to %s closed.", - inetAddressToString(_socketAddress).c_str()); -/* - if(_channel!=INVALID_SOCKET) { - epicsSocketDestroy(_channel); - _channel = INVALID_SOCKET; - } -*/ - } - - void BlockingTCPTransport::rcvThreadRunner(void* param) { - BlockingTCPTransport* obj = (BlockingTCPTransport*)param; - Transport::shared_pointer ptr = obj->shared_from_this(); // hold reference - -try{ - obj->processReadCached(false, UNDEFINED_STAGE, PVA_MESSAGE_HEADER_SIZE); -} catch (...) { -printf("rcvThreadRunnner exception\n"); -} - - /* - if(obj->_autoDelete) { - while(true) - { - bool exited; - obj->_mutex.lock(); - exited = obj->_sendThreadExited; - obj->_mutex.unlock(); - if (exited) - break; - epicsThreadSleep(0.1); - } - delete obj; - } - */ - } - - void BlockingTCPTransport::sendThreadRunner(void* param) { - BlockingTCPTransport* obj = (BlockingTCPTransport*)param; - Transport::shared_pointer ptr = obj->shared_from_this(); // hold reference -try { - obj->processSendQueue(); -} catch (std::exception& ex) { - printf("sendThreadRunnner exception %s\n", ex.what()); // TODO -} catch (...) { -printf("sendThreadRunnner exception\n"); -} - - obj->freeConnectionResorces(); - - // TODO possible crash on unlock - obj->_mutex.lock(); - obj->_sendThreadExited = true; - obj->_mutex.unlock(); - } - - void BlockingTCPTransport::enqueueSendRequest(TransportSender::shared_pointer const & sender) { - Lock lock(_sendQueueMutex); - if(unlikely(_closed.get())) return; - _sendQueue.push_back(sender); - _sendQueueEvent.signal(); - } - - void BlockingTCPTransport::enqueueOnlySendRequest(TransportSender::shared_pointer const & sender) { - Lock lock(_sendQueueMutex); - if(unlikely(_closed.get())) return; - _sendQueue.push_back(sender); - } - - class FlushTransportSender : public TransportSender { - public: - virtual void send(epics::pvData::ByteBuffer*, TransportSendControl* control) - { - control->flush(true); - } - - virtual void lock() {} - virtual void unlock() {} - }; - - static TransportSender::shared_pointer flushTransportSender(new FlushTransportSender()); - - void BlockingTCPTransport::flushSendQueue() { - enqueueSendRequest(flushTransportSender); - } - - /* - void BlockingTCPTransport::enqueueMonitorSendRequest(TransportSender::shared_pointer sender) { - Lock lock(_monitorMutex); - if(unlikely(_closed.get())) return; - _monitorSendQueue.insert(sender); - if(_monitorSendQueue.size()==1) enqueueSendRequest(_monitorSender); - } - - - void MonitorSender::send(ByteBuffer* buffer, TransportSendControl* control) { - control->startMessage(19, 0); - - while(true) { - TransportSender* sender; - _monitorMutex->lock(); - if(_monitorSendQueue->size()>0) - sender = _monitorSendQueue->extract(); - else - sender = NULL; - _monitorMutex->unlock(); - - if(sender==NULL) { - control->ensureBuffer(sizeof(int32)); - buffer->putInt(INVALID_IOID); - break; - } - sender->send(buffer, control); - sender->release(); - } - } -*/ - } -} diff --git a/pvAccessApp/remote/codec.cpp b/pvAccessApp/remote/codec.cpp index 8a95a0f..df3bb2c 100644 --- a/pvAccessApp/remote/codec.cpp +++ b/pvAccessApp/remote/codec.cpp @@ -21,7 +21,6 @@ #include #include - #include using namespace epics::pvData; @@ -30,6 +29,7 @@ using namespace epics::pvAccess; namespace epics { namespace pvAccess { + namespace detail { const std::size_t AbstractCodec::MAX_MESSAGE_PROCESS = 100; const std::size_t AbstractCodec::MAX_MESSAGE_SEND = 100; @@ -91,16 +91,10 @@ namespace epics { _sendBuffer->getSize() - 2*PVA_MESSAGE_HEADER_SIZE; _socketSendBufferSize = socketSendBufferSize; _blockingProcessQueue = blockingProcessQueue; - LOG(logLevelTrace, "AbstractCodec constructed (threadId: %u)", - epicsThreadGetIdSelf()); } void AbstractCodec::processRead() { - - LOG(logLevelTrace, "AbstractCodec::processRead: enter (threadId: %u)", - epicsThreadGetIdSelf()); - switch (_readMode) { case NORMAL: @@ -118,10 +112,6 @@ namespace epics { void AbstractCodec::processHeader() { - LOG(logLevelTrace, "AbstractCodec::processHeader enter (threadId: %u)", - epicsThreadGetIdSelf()); - - // magic code int8_t magicCode = _socketBuffer->getByte(); @@ -153,10 +143,6 @@ namespace epics { void AbstractCodec::processReadNormal() { - LOG(logLevelTrace, - "AbstractCodec::processReadNormal enter (threadId: %u)", - epicsThreadGetIdSelf()); - try { std::size_t messageProcessCount = 0; @@ -286,10 +272,6 @@ namespace epics { void AbstractCodec::processReadSegmented() { - LOG(logLevelTrace, - "AbstractCodec::processReadSegmented enter (threadId: %u)", - epicsThreadGetIdSelf()); - while (true) { // read as much as available, but at least for a header @@ -335,18 +317,9 @@ namespace epics { std::size_t requiredBytes, bool persistent) { - LOG(logLevelTrace, - "AbstractCodec::readToBuffer enter requiredBytes: %u," - " persistant: %d (threadId: %u)", - requiredBytes, persistent, epicsThreadGetIdSelf()); - // do we already have requiredBytes available? std::size_t remainingBytes = _socketBuffer->getRemaining(); if (remainingBytes >= requiredBytes) { - LOG(logLevelTrace, - "AbstractCodec::readToBuffer requiredBytes: %u" - " <= remainingBytes: %d (threadId: %u)", - requiredBytes, remainingBytes); return true; } @@ -378,17 +351,8 @@ namespace epics { { int bytesRead = read(_socketBuffer.get()); - LOG(logLevelTrace, - "AbstractCodec::readToBuffer READ BYTES: %d (threadId: %u)", - bytesRead, epicsThreadGetIdSelf()); - if (bytesRead < 0) { - - LOG(logLevelTrace, - "AbstractCodec::before close on bytesRead < 0 condition (threadId: %u)", - epicsThreadGetIdSelf()); - close(); throw connection_closed_exception("bytesRead < 0"); } @@ -418,11 +382,6 @@ namespace epics { void AbstractCodec::ensureData(std::size_t size) { - LOG(logLevelTrace, - "AbstractCodec::ensureData enter: size: %u (threadId: %u)", - size, epicsThreadGetIdSelf()); - - // enough of data? if (_socketBuffer->getRemaining() >= size) return; @@ -553,11 +512,6 @@ namespace epics { std::size_t value, std::size_t alignment) { - LOG(logLevelTrace, - "AbstractCodec::alignedValue enter: value: %u, alignment:%u" - " (threadId: %u)", - value, alignment, epicsThreadGetIdSelf()); - std::size_t k = (alignment - 1); return (value + k) & (~k); } @@ -565,10 +519,6 @@ namespace epics { void AbstractCodec::alignData(std::size_t alignment) { - LOG(logLevelTrace, - "AbstractCodec::alignData enter: alignment:%u (threadId: %u)", - alignment, epicsThreadGetIdSelf()); - std::size_t k = (alignment - 1); std::size_t pos = _socketBuffer->getPosition(); std::size_t newpos = (pos + k) & (~k); @@ -592,10 +542,6 @@ namespace epics { void AbstractCodec::alignBuffer(std::size_t alignment) { - LOG(logLevelTrace, "AbstractCodec::alignBuffer enter:" - " alignment:%u (threadId: %u)", - alignment, epicsThreadGetIdSelf()); - std::size_t k = (alignment - 1); std::size_t pos = _sendBuffer->getPosition(); std::size_t newpos = (pos + k) & (~k); @@ -612,11 +558,6 @@ namespace epics { epics::pvData::int8 command, std::size_t ensureCapacity) { - LOG(logLevelTrace, - "AbstractCodec::startMessage enter: command:%x " - " ensureCapacity:%u (threadId: %u)", - command, ensureCapacity, epicsThreadGetIdSelf()); - _lastMessageStartPosition = std::numeric_limits::max(); // TODO revise this ensureBuffer( @@ -640,11 +581,6 @@ namespace epics { epics::pvData::int8 command, epics::pvData::int32 data) { - LOG(logLevelTrace, - "AbstractCodec::putControlMessage enter: command:%x " - "data:%d (threadId: %u)", - command, data, epicsThreadGetIdSelf()); - _lastMessageStartPosition = std::numeric_limits::max(); // TODO revise this ensureBuffer(PVA_MESSAGE_HEADER_SIZE); @@ -657,20 +593,12 @@ namespace epics { void AbstractCodec::endMessage() { - - LOG(logLevelTrace, "AbstractCodec::endMessage enter: (threadId: %u)", - epicsThreadGetIdSelf()); - endMessage(false); } void AbstractCodec::endMessage(bool hasMoreSegments) { - LOG(logLevelTrace, - "AbstractCodec::endMessage enter: hasMoreSegments:%d (threadId: %u)", - hasMoreSegments, epicsThreadGetIdSelf()); - if (_lastMessageStartPosition != std::numeric_limits::max()) { std::size_t lastPayloadBytePosition = _sendBuffer->getPosition(); @@ -739,10 +667,6 @@ namespace epics { void AbstractCodec::ensureBuffer(std::size_t size) { - LOG(logLevelTrace, - "AbstractCodec::ensureBuffer enter: size:%u (threadId: %u)", - size, epicsThreadGetIdSelf()); - if (_sendBuffer->getRemaining() >= size) return; @@ -763,21 +687,12 @@ namespace epics { // assumes startMessage was called (or header is in place), because endMessage(true) is later called that peeks and sets _lastSegmentedMessageType void AbstractCodec::flushSerializeBuffer() { - - LOG(logLevelTrace, - "AbstractCodec::flushSerializeBuffer enter: (threadId: %u)", - epicsThreadGetIdSelf()); - flush(false); } void AbstractCodec::flush(bool lastMessageCompleted) { - LOG(logLevelTrace, - "AbstractCodec::flush enter: lastMessageCompleted:%d (threadId: %u)", - lastMessageCompleted, epicsThreadGetIdSelf()); - // automatic end endMessage(!lastMessageCompleted); @@ -807,10 +722,6 @@ namespace epics { void AbstractCodec::processWrite() { - LOG(logLevelTrace, - "AbstractCodec::processWrite enter: (threadId: %u)", - epicsThreadGetIdSelf()); - // TODO catch ConnectionClosedException, InvalidStreamException? switch (_writeMode) { @@ -827,10 +738,6 @@ namespace epics { void AbstractCodec::send(ByteBuffer *buffer) { - LOG(logLevelTrace, "AbstractCodec::send enter: (threadId: %u)", - epicsThreadGetIdSelf()); - - // On Windows, limiting the buffer size is important to prevent // poor throughput performances when transferring large amount of // data. See Microsoft KB article KB823764. @@ -856,11 +763,13 @@ namespace epics { //int p = buffer.position(); int bytesSent = write(buffer); + /* if (IS_LOGGABLE(logLevelTrace)) { hexDump(std::string("AbstractCodec::send WRITE"), (const int8 *)buffer->getArray(), buffer->getPosition(), buffer->getRemaining()); } + */ if (bytesSent < 0) { @@ -894,10 +803,6 @@ namespace epics { void AbstractCodec::processSendQueue() { - LOG(logLevelTrace, - "AbstractCodec::processSendQueue enter: (threadId: %u)", - epicsThreadGetIdSelf()); - { std::size_t senderProcessed = 0; while (senderProcessed++ < MAX_MESSAGE_SEND) @@ -935,22 +840,12 @@ namespace epics { void AbstractCodec::clearSendQueue() { - LOG(logLevelTrace, - "AbstractCodec::clearSendQueue enter: (threadId: %u)", - epicsThreadGetIdSelf()); - _sendQueue.clean(); } void AbstractCodec::enqueueSendRequest( TransportSender::shared_pointer const & sender) { - - LOG(logLevelTrace, - "AbstractCodec::enqueueSendRequest enter: sender is set:%d" - " (threadId: %u)", - (sender.get() != 0), epicsThreadGetIdSelf()); - _sendQueue.put(sender); scheduleSend(); } @@ -958,10 +853,6 @@ namespace epics { void AbstractCodec::setSenderThread() { - LOG(logLevelTrace, - "AbstractCodec::setSenderThread enter: (threadId: %u)", - epicsThreadGetIdSelf()); - _senderThread = epicsThreadGetIdSelf(); } @@ -970,10 +861,6 @@ namespace epics { TransportSender::shared_pointer const & sender) { - LOG(logLevelTrace, - "AbstractCodec::processSender enter: sender is set:%d (threadId: %u)", - (sender.get() != 0), epicsThreadGetIdSelf); - ScopedLock lock(sender); try { @@ -1007,11 +894,6 @@ namespace epics { TransportSender::shared_pointer const & sender, std::size_t requiredBufferSize) { - LOG(logLevelTrace, - "AbstractCodec::enqueueSendRequest enter: sender is set:%d " - "requiredBufferSize:%u (threadId: %u)", - (sender.get() != 0), requiredBufferSize, epicsThreadGetIdSelf); - if (_senderThread == epicsThreadGetIdSelf() && _sendQueue.empty() && _sendBuffer->getRemaining() >= requiredBufferSize) @@ -1031,22 +913,12 @@ namespace epics { void AbstractCodec::setRecipient(osiSockAddr const & sendTo) { - - LOG(logLevelTrace, - "AbstractCodec::setRecipient enter: (threadId: %u)", - epicsThreadGetIdSelf); - _sendTo = sendTo; } void AbstractCodec::setByteOrder(int byteOrder) { - - LOG(logLevelTrace, - "AbstractCodec::setByteOrder enter: byteOrder:%x (threadId: %u)", - byteOrder, epicsThreadGetIdSelf()); - _socketBuffer->setEndianess(byteOrder); // TODO sync _sendBuffer->setEndianess(byteOrder); @@ -1063,40 +935,21 @@ namespace epics { // void BlockingAbstractCodec::readPollOne() { - - LOG(logLevelTrace, - "BlockingAbstractCodec::readPollOne enter: (threadId: %u)", - epicsThreadGetIdSelf()); - throw std::logic_error("should not be called for blocking IO"); } void BlockingAbstractCodec::writePollOne() { - - LOG(logLevelTrace, - "BlockingAbstractCodec::writePollOne enter: (threadId: %u)", - epicsThreadGetIdSelf()); - throw std::logic_error("should not be called for blocking IO"); } void BlockingAbstractCodec::close() { - LOG(logLevelTrace, - "BlockingAbstractCodec::close enter: (threadId: %u)", - epicsThreadGetIdSelf()); - - if (_isOpen.getAndSet(false)) { // always close in the same thread, same way, etc. // wakeup processSendQueue - LOG(logLevelTrace, - "BlockingAbstractCodec::close _sendQueue.waaaaakeup: " - " (threadId: %u)", - epicsThreadGetIdSelf()); // clean resources internalClose(true); @@ -1106,12 +959,6 @@ namespace epics { // post close internalPostClose(true); } - else { - LOG(logLevelTrace, - "BlockingAbstractCodec::close NOT WAKING UP _sendQueue: " - " (threadId: %u)", - epicsThreadGetIdSelf()); - } } void BlockingAbstractCodec::internalClose(bool /*force*/) { @@ -1121,20 +968,11 @@ namespace epics { } bool BlockingAbstractCodec::terminated() { - - LOG(logLevelTrace, - "BlockingAbstractCodec::terminated enter: (threadId: %u)", - epicsThreadGetIdSelf()); - return !isOpen(); } bool BlockingAbstractCodec::isOpen() { - - LOG(logLevelTrace, "BlockingAbstractCodec::isOpen %d (threadId: %u)", _isOpen.get(), - epicsThreadGetIdSelf()); - return _isOpen.get(); } @@ -1142,9 +980,6 @@ namespace epics { // NOTE: must not be called from constructor (e.g. needs shared_from_this()) void BlockingAbstractCodec::start() { - LOG(logLevelTrace, "BlockingAbstractCodec::start enter: (threadId: %u)", - epicsThreadGetIdSelf()); - _readThread = epicsThreadCreate( "BlockingAbstractCodec-readThread", epicsThreadPriorityMedium, @@ -1161,21 +996,12 @@ namespace epics { BlockingAbstractCodec::sendThread, this); - LOG(logLevelTrace, - "BlockingAbstractCodec::start exit WITH readThread: %u," - " sendThread:%u (threadId: %u)", - _readThread, _sendThread, epicsThreadGetIdSelf()); - } void BlockingAbstractCodec::receiveThread(void *param) { - LOG(logLevelTrace, - "BlockingAbstractCodec::receiveThread enter: (threadId: %u)", - epicsThreadGetIdSelf()); - BlockingAbstractCodec *bac = static_cast(param); Transport::shared_pointer ptr = bac->shared_from_this(); @@ -1190,23 +1016,14 @@ namespace epics { } } - LOG(logLevelTrace, "BlockingAbstractCodec::receiveThread" - " EXIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIT: (threadId: %u)", - epicsThreadGetIdSelf()); - bac->_shutdownEvent.signal(); - } void BlockingAbstractCodec::sendThread(void *param) { - LOG(logLevelTrace, - "BlockingAbstractCodec::sendThread enter: (threadId: %u)", - epicsThreadGetIdSelf()); - - BlockingAbstractCodec *bac = static_cast(param); + BlockingAbstractCodec *bac = static_cast(param); Transport::shared_pointer ptr = bac->shared_from_this(); bac->setSenderThread(); @@ -1222,38 +1039,16 @@ namespace epics { } } - LOG(logLevelTrace, - "BlockingAbstractCodec::sendThread EXIIIIIIIIIIIIIIT" - " while(bac->isOpen): (threadId: %u)", - epicsThreadGetIdSelf()); - - // wait read thread to die bac->_shutdownEvent.wait(); // call internal destroy - LOG(logLevelTrace, "XXXXXXXXXXXXXXXXXXXXXXXXXXXX" - "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX (threadId: %u)", - epicsThreadGetIdSelf()); bac->internalDestroy(); - LOG(logLevelTrace, "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY" - "YYYYYYYYYYYYYYYYYYYYYYYYYYYY (threadId: %u)", - epicsThreadGetIdSelf()); - - LOG(logLevelTrace, - "BlockingAbstractCodec::sendThread EXIIIIT (threadId: %u)", - epicsThreadGetIdSelf()); } void BlockingAbstractCodec::sendBufferFull(int tries) { - - LOG(logLevelTrace, - "BlockingAbstractCodec::sendBufferFull enter: tries: %d " - "(threadId: %u)", - tries, epicsThreadGetIdSelf()); - // TODO constants epicsThreadSleep(std::max(tries * 0.1, 1)); } @@ -1309,18 +1104,11 @@ namespace epics { inetAddressToString(_socketAddress).c_str(), errStr); } - LOG(logLevelTrace, - "BlockingSocketAbstractCodec constructed (threadId: %u)", - epicsThreadGetIdSelf()); } // must be called only once, when there will be no operation on socket (e.g. just before tx/rx thread exists) void BlockingSocketAbstractCodec::internalDestroy() { - LOG(logLevelTrace, - "BlockingSocketAbstractCodec::internalDestroy enter: (threadId: %u)", - epicsThreadGetIdSelf()); - if(_channel != INVALID_SOCKET) { epicsSocketDestroy(_channel); _channel = INVALID_SOCKET; @@ -1330,12 +1118,6 @@ namespace epics { void BlockingSocketAbstractCodec::invalidDataStreamHandler() { - - LOG(logLevelTrace, - "BlockingSocketAbstractCodec::invalidDataStreamHandler enter:" - " (threadId: %u)", - epicsThreadGetIdSelf()); - close(); } @@ -1343,29 +1125,14 @@ namespace epics { int BlockingSocketAbstractCodec::write( epics::pvData::ByteBuffer *src) { - LOG(logLevelTrace, - "BlockingSocketAbstractCodec::write enter: position:%u, " - "remaining:%u (threadId: %u)", - src->getPosition(), src->getRemaining(), epicsThreadGetIdSelf()); - std::size_t remaining; while((remaining=src->getRemaining()) > 0) { - LOG(logLevelTrace, - "BlockingSocketAbstractCodec::write before send" - " (threadId: %u)", - epicsThreadGetIdSelf()); - - int bytesSent = ::send(_channel, &src->getArray()[src->getPosition()], remaining, 0); - LOG(logLevelTrace, - "BlockingSocketAbstractCodec::write afer send, read:%d" - " (threadId: %u)", - bytesSent, epicsThreadGetIdSelf()); - + // NOTE: do not log here, you might override SOCKERRNO relevant to recv() operation above if(unlikely(bytesSent<0)) { @@ -1384,18 +1151,13 @@ namespace epics { } - //TODO check what to return - return -1; + return 0; } std::size_t BlockingSocketAbstractCodec::getSocketReceiveBufferSize() const { - LOG(logLevelTrace, - "BlockingSocketAbstractCodec::getSocketReceiveBufferSize" - " enter (threadId: %u)", epicsThreadGetIdSelf()); - osiSocklen_t intLen = sizeof(int); char strBuffer[64]; int socketRecvBufferSize; @@ -1407,46 +1169,24 @@ namespace epics { //LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer); } - LOG(logLevelTrace, - "BlockingSocketAbstractCodec::getSocketReceiveBufferSize" - " returning:%u (threadId: %u)", socketRecvBufferSize, - epicsThreadGetIdSelf()); - return socketRecvBufferSize; } int BlockingSocketAbstractCodec::read(epics::pvData::ByteBuffer* dst) { - LOG(logLevelTrace, - "BlockingSocketAbstractCodec::read enter: " - "read bytes:%u (threadId: %u)", - dst->getRemaining(), epicsThreadGetIdSelf()); - std::size_t remaining; while((remaining=dst->getRemaining()) > 0) { // read std::size_t pos = dst->getPosition(); - LOG(logLevelTrace, - "BlockingSocketAbstractCodec::read before recv" - " (threadId: %u)", - epicsThreadGetIdSelf()); - - int bytesRead = recv(_channel, (char*)(dst->getArray()+pos), remaining, 0); // NOTE: do not log here, you might override SOCKERRNO relevant to recv() operation above /* - LOG(logLevelTrace, - "BlockingSocketAbstractCodec::read after recv, read: %d", - bytesRead," (threadId: %u)", - epicsThreadGetIdSelf()); - - if (IS_LOGGABLE(logLevelTrace)) { hexDump(std::string("READ"), (const int8 *)(dst->getArray()+pos), bytesRead); @@ -1459,11 +1199,6 @@ namespace epics { { int socketError = SOCKERRNO; - LOG(logLevelTrace, - "BlockingSocketAbstractCodec::read SOCKERRNO %d", socketError, - " (threadId: %u)", - epicsThreadGetIdSelf()); - // interrupted or timeout if (socketError == EINTR || socketError == EAGAIN || @@ -1497,28 +1232,15 @@ namespace epics { //register/unregister // TODO implement priorities in Reactor... not that user will // change it.. still getPriority() must return "registered" priority! - - LOG(logLevelTrace, - "BlockingServerTCPTransportCodec constructed (threadId: %u)", - epicsThreadGetIdSelf()); - } BlockingServerTCPTransportCodec::~BlockingServerTCPTransportCodec() { - LOG(logLevelTrace, - "BlockingServerTCPTransportCodec DESTRUCTED (threadId: %u)", - epicsThreadGetIdSelf()); } pvAccessID BlockingServerTCPTransportCodec::preallocateChannelSID() { - LOG(logLevelTrace, - "BlockingServerTCPTransportCodec::preallocateChannelSID enter:" - " (threadId: %u)", - epicsThreadGetIdSelf()); - Lock lock(_channelsMutex); // search first free (theoretically possible loop of death) pvAccessID sid = ++_lastChannelSID; @@ -1532,11 +1254,6 @@ namespace epics { pvAccessID sid, ServerChannel::shared_pointer const & channel) { - LOG(logLevelTrace, - "BlockingServerTCPTransportCodec::registerChannel enter: sid:%d " - " (threadId: %u)", - channel->getSID(), epicsThreadGetIdSelf()); - Lock lock(_channelsMutex); _channels[sid] = channel; @@ -1545,11 +1262,6 @@ namespace epics { void BlockingServerTCPTransportCodec::unregisterChannel(pvAccessID sid) { - LOG(logLevelTrace, - "BlockingServerTCPTransportCodec::unregisterChannel enter:" - " sid:%d (threadId: %u)", - sid, epicsThreadGetIdSelf()); - Lock lock(_channelsMutex); _channels.erase(sid); } @@ -1558,11 +1270,6 @@ namespace epics { ServerChannel::shared_pointer BlockingServerTCPTransportCodec::getChannel(pvAccessID sid) { - LOG(logLevelTrace, - "BlockingServerTCPTransportCodec::getChannel enter:" - " sid:%d (threadId: %u)", - sid, epicsThreadGetIdSelf()); - Lock lock(_channelsMutex); std::map::iterator it = @@ -1576,11 +1283,6 @@ namespace epics { int BlockingServerTCPTransportCodec::getChannelCount() { - LOG(logLevelTrace, - "BlockingServerTCPTransportCodec::getChannelCount enter: " - "(threadId: %u)", - epicsThreadGetIdSelf()); - Lock lock(_channelsMutex); return static_cast(_channels.size()); } @@ -1589,10 +1291,6 @@ namespace epics { void BlockingServerTCPTransportCodec::send(ByteBuffer* buffer, TransportSendControl* control) { - LOG(logLevelTrace, - "BlockingServerTCPTransportCodec::send enter: (threadId: %u)", - epicsThreadGetIdSelf()); - // // set byte order control message // @@ -1893,7 +1591,6 @@ namespace epics { } - + } } - } diff --git a/pvAccessApp/remote/codec.h b/pvAccessApp/remote/codec.h index f5b5b17..a752acd 100644 --- a/pvAccessApp/remote/codec.h +++ b/pvAccessApp/remote/codec.h @@ -28,7 +28,6 @@ #include #include #include -#include #ifdef abstractCodecEpicsExportSharedSymbols # define epicsExportSharedSymbols @@ -44,6 +43,7 @@ namespace epics { namespace pvAccess { + namespace detail { // TODO replace mutex with atomic (CAS) operations template @@ -80,24 +80,17 @@ namespace epics { */ ~queue(void) { - LOG(logLevelTrace, - "queue::~queue DESTROY (threadId: %u)", epicsThreadGetIdSelf()); } bool empty(void) { - LOG(logLevelTrace, - "queue::empty enter: (threadId: %u)", epicsThreadGetIdSelf()); epics::pvData::Lock lock(_queueMutex); return _queue.empty(); } void clean() { - LOG(logLevelTrace, "queue::clean enter: (threadId: %u)", - epicsThreadGetIdSelf()); - epics::pvData::Lock lock(_queueMutex); _queue.clear(); } @@ -105,15 +98,8 @@ namespace epics { void wakeup() { - - LOG(logLevelTrace, "queue::wakeup enter: (threadId: %u)", - epicsThreadGetIdSelf()); - if (!_wakeup.getAndSet(true)) { - LOG(logLevelTrace, - "queue::wakeup signaling on _queueEvent: (threadId: %u)", - epicsThreadGetIdSelf()); _queueEvent.signal(); } } @@ -121,9 +107,6 @@ namespace epics { void put(T const & elem) { - LOG(logLevelTrace, - "queue::put enter (threadId: %u)", epicsThreadGetIdSelf()); - { epics::pvData::Lock lock(_queueMutex); _queue.push_back(elem); @@ -135,11 +118,6 @@ namespace epics { T take(int timeOut) { - - LOG(logLevelTrace, - "queue::take enter timeOut:%d (threadId: %u)", - timeOut, epicsThreadGetIdSelf()); - while (true) { @@ -149,10 +127,6 @@ namespace epics { { if (timeOut < 0) { - epics::pvAccess::LOG(logLevelTrace, - "queue::take exit timeOut:%d (threadId: %u)", - timeOut, epicsThreadGetIdSelf()); - return T(); } @@ -160,45 +134,21 @@ namespace epics { { if (timeOut == 0) { - - LOG(logLevelTrace, - "queue::take going to wait timeOut:%d (threadId: %u)", - timeOut, epicsThreadGetIdSelf()); - _queueEvent.wait(); } else { - - LOG(logLevelTrace, - "queue::take going to wait timeOut:%d (threadId: %u)", - timeOut, epicsThreadGetIdSelf()); - _queueEvent.wait(timeOut); } - LOG(logLevelTrace, - "queue::take waking up timeOut:%d (threadId: %u)", - timeOut, epicsThreadGetIdSelf()); - isEmpty = empty(); if (isEmpty) { if (timeOut > 0) { // TODO spurious wakeup, but not critical - LOG(logLevelTrace, - "queue::take exit after being woken up timeOut:%d" - " (threadId: %u)", - timeOut, epicsThreadGetIdSelf()); return T(); } else // if (timeout == 0) cannot be negative { if (_wakeup.getAndSet(false)) { - - LOG(logLevelTrace, - "queue::take exit after being woken up timeOut:%d" - " (threadId: %u)", - timeOut, epicsThreadGetIdSelf()); - return T(); } } @@ -207,20 +157,9 @@ namespace epics { } else { - - LOG(logLevelTrace, - "queue::take obtaining lock for front element timeOut:%d" - " (threadId: %u)", - timeOut, epicsThreadGetIdSelf()); - epics::pvData::Lock lock(_queueMutex); T sender = _queue.front(); _queue.pop_front(); - - LOG(logLevelTrace, - "queue::take exit with sender timeOut:%d (threadId: %u)", - timeOut, epicsThreadGetIdSelf()); - return sender; } } @@ -296,9 +235,6 @@ namespace epics { virtual ~AbstractCodec() { - LOG(logLevelTrace, - "AbstractCodec::~AbstractCodec DESTROY (threadId: %u)", - epicsThreadGetIdSelf()); } void alignBuffer(std::size_t alignment); @@ -351,7 +287,7 @@ namespace epics { std::tr1::shared_ptr _socketBuffer; std::tr1::shared_ptr _sendBuffer; - epics::pvAccess::queue _sendQueue; + queue _sendQueue; private: @@ -473,11 +409,6 @@ namespace epics { void internalDestroy() { - - LOG(logLevelTrace, - "BlockingTCPTransportCodec::internalDestroy() enter: (threadId: %u)", - epicsThreadGetIdSelf()); - BlockingSocketAbstractCodec::internalDestroy(); Transport::shared_pointer thisSharedPtr = this->shared_from_this(); _context->getTransportRegistry()->remove(thisSharedPtr); @@ -488,12 +419,6 @@ namespace epics { void processControlMessage() { - - LOG(logLevelTrace, - "BlockingTCPTransportCodec::processControlMessage()" - "enter: (threadId: %u)", - epicsThreadGetIdSelf()); - if (_command == 2) { // check 7-th bit @@ -503,12 +428,6 @@ namespace epics { void processApplicationMessage() { - - LOG(logLevelTrace, - "BlockingTCPTransportCodec::processApplicationMessage() enter:" - " (threadId: %u)", - epicsThreadGetIdSelf()); - _responseHandler->handleResponse(&_socketAddress, shared_from_this(), _version, _command, _payloadSize, _socketBuffer.get()); } @@ -535,37 +454,18 @@ namespace epics { void setRemoteRevision(epics::pvData::int8 revision) { - - LOG(logLevelTrace, - "BlockingTCPTransportCodec::setRemoteRevision() enter:" - " revision: %d (threadId: %u)", - revision, epicsThreadGetIdSelf()); - _remoteTransportRevision = revision; } void setRemoteTransportReceiveBufferSize( std::size_t remoteTransportReceiveBufferSize) { - - LOG(logLevelTrace, - "BlockingTCPTransportCodec::setRemoteTransportReceiveBufferSize()" - " enter: remoteTransportReceiveBufferSize:%u (threadId: %u)", - remoteTransportReceiveBufferSize, epicsThreadGetIdSelf()); - _remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize; } void setRemoteTransportSocketReceiveBufferSize( std::size_t socketReceiveBufferSize) { - - LOG(logLevelTrace, - "BlockingTCPTransportCodec::" - "setRemoteTransportSocketReceiveBufferSize()" - "enter: socketReceiveBufferSize:%u (threadId: %u)", - socketReceiveBufferSize, epicsThreadGetIdSelf()); - _remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize; } @@ -573,12 +473,6 @@ namespace epics { std::tr1::shared_ptr cachedDeserialize(epics::pvData::ByteBuffer* buffer) { - - LOG(logLevelTrace, - "BlockingTCPTransportCodec::cachedDeserialize() enter:" - " (threadId: %u)", - epicsThreadGetIdSelf()); - return _incomingIR.deserialize(buffer, this); } @@ -587,12 +481,6 @@ namespace epics { const std::tr1::shared_ptr& field, epics::pvData::ByteBuffer* buffer) { - - LOG(logLevelTrace, - "BlockingTCPTransportCodec::cachedSerialize() enter:" - " (threadId: %u)", - epicsThreadGetIdSelf()); - _outgoingIR.serialize(field, buffer, this); } @@ -602,11 +490,7 @@ namespace epics { const char* /*toSerialize*/, std::size_t /*elementCount*/, std::size_t /*elementSize*/) { - - LOG(logLevelTrace, - "BlockingTCPTransportCodec::directSerialize() enter: (threadId: %u)", - epicsThreadGetIdSelf()); - + // TODO !!!! return false; } @@ -614,12 +498,7 @@ namespace epics { bool directDeserialize(epics::pvData::ByteBuffer * /*existingBuffer*/, char* /*deserializeTo*/, std::size_t /*elementCount*/, std::size_t /*elementSize*/) { - - LOG(logLevelTrace, - "BlockingTCPTransportCodec::directDeserialize() enter:" - " (threadId: %u)", - epicsThreadGetIdSelf()); - + // TODO !!! return false; } @@ -628,21 +507,11 @@ namespace epics { bool isClosed() { - - LOG(logLevelTrace, - "BlockingTCPTransportCodec::isClosed() enter: (threadId: %u)", - epicsThreadGetIdSelf()); - return !isOpen(); } void activate() { - - LOG(logLevelTrace, - "BlockingTCPTransportCodec::activate() enter: (threadId: %u)", - epicsThreadGetIdSelf()); - Transport::shared_pointer thisSharedPtr = shared_from_this(); _context->getTransportRegistry()->put(thisSharedPtr); @@ -664,9 +533,6 @@ namespace epics { _remoteTransportReceiveBufferSize(MAX_TCP_RECV), _remoteTransportRevision(0), _priority(priority) { - LOG(logLevelTrace, - "BlockingTCPTransportCodec constructed: (threadId: %u)", - epicsThreadGetIdSelf()); } Context::shared_pointer _context; @@ -720,12 +586,6 @@ namespace epics { bool acquire(std::tr1::shared_ptr const & client) { - - LOG(logLevelTrace, - "BlockingServerTCPTransportCodec::acquire() enter:" - " client is set: %d (threadId: %u)", - (client.get() != 0), epicsThreadGetIdSelf()); - return false; } @@ -748,12 +608,6 @@ namespace epics { int getChannelCount(); epics::pvData::PVField::shared_pointer getSecurityToken() { - - LOG(logLevelTrace, - "BlockingServerTCPTransportCodec::getSecurityToken() enter:" - " (threadId: %u)", - epicsThreadGetIdSelf()); - return epics::pvData::PVField::shared_pointer(); } @@ -766,12 +620,6 @@ namespace epics { } bool verify(epics::pvData::int32 timeoutMs) { - - LOG(logLevelTrace, - "BlockingServerTCPTransportCodec::verify() enter: " - "timeoutMs:%d (threadId: %u)", - timeoutMs, epicsThreadGetIdSelf()); - TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast(shared_from_this()); enqueueSendRequest(transportSender); @@ -944,7 +792,8 @@ namespace epics { epics::pvData::Event _verifiedEvent; }; - + + } } } diff --git a/pvAccessCPP.files b/pvAccessCPP.files index 82a19d7..a0e4c1e 100644 --- a/pvAccessCPP.files +++ b/pvAccessCPP.files @@ -23,12 +23,9 @@ pvAccessApp/mb/pvAccessMB.h pvAccessApp/remote/abstractResponseHandler.cpp pvAccessApp/remote/beaconHandler.cpp pvAccessApp/remote/beaconHandler.h -pvAccessApp/remote/blockingClientTCPTransport.cpp -pvAccessApp/remote/blockingServerTCPTransport.cpp pvAccessApp/remote/blockingTCP.h pvAccessApp/remote/blockingTCPAcceptor.cpp pvAccessApp/remote/blockingTCPConnector.cpp -pvAccessApp/remote/blockingTCPTransport.cpp pvAccessApp/remote/blockingUDP.h pvAccessApp/remote/blockingUDPConnector.cpp pvAccessApp/remote/blockingUDPTransport.cpp diff --git a/testApp/remote/testCodec.cpp b/testApp/remote/testCodec.cpp index 0897354..b400b7f 100644 --- a/testApp/remote/testCodec.cpp +++ b/testApp/remote/testCodec.cpp @@ -16,6 +16,7 @@ #include using namespace epics::pvData; +using namespace epics::pvAccess::detail; namespace epics {