diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index 2fe61e3..a7b91eb 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -101,7 +101,7 @@ namespace epics { bool BlockingClientTCPTransport::acquire(TransportClient::shared_pointer const & client) { Lock lock(_mutex); - if(_closed) return false; + if(_closed.get()) return false; char ipAddrStr[48]; ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); @@ -159,7 +159,7 @@ namespace epics { //void BlockingClientTCPTransport::release(TransportClient::shared_pointer const & client) { void BlockingClientTCPTransport::release(pvAccessID clientID) { Lock lock(_mutex); - if(_closed) return; + if(_closed.get()) return; char ipAddrStr[48]; ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 8cd6c3e..69d2bfd 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -39,6 +39,26 @@ using epics::pvData::int64; namespace epics { namespace pvAccess { +struct null_deleter +{ + void operator()(void const *) const {} +}; + +class AtomicBoolean +{ + public: + AtomicBoolean() {}; + + void set() { counter.reset(static_cast(0), null_deleter()); } + void clear() { counter.reset(); } + + bool get() { return counter.use_count(); } + + private: + std::tr1::shared_ptr counter; +}; + + //class MonitorSender; enum ReceiveStage { @@ -61,8 +81,7 @@ namespace epics { public: virtual bool isClosed() { - epics::pvData::Lock guard(_mutex); - return _closed; + return _closed.get(); } virtual void setRemoteMinorRevision(int8 minorRevision) { @@ -370,7 +389,7 @@ namespace epics { * Connection status * NOTE: synced by _mutex */ - bool _closed; + AtomicBoolean _closed; // NOTE: synced by _mutex bool _sendThreadExited; diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index c98ead1..b61c465 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -123,7 +123,7 @@ namespace epics { _payloadSize(0), _stage(READ_FROM_SOCKET), _totalBytesReceived(0), - _closed(false), + _closed(), _sendThreadExited(false), _verified(false), _markerToSend(0), @@ -258,9 +258,8 @@ namespace epics { Lock lock(_mutex); // already closed check - if(_closed) return; - - _closed = true; + if(_closed.get()) return; + _closed.set(); // remove from registry Transport::shared_pointer thisSharedPtr = shared_from_this(); @@ -364,13 +363,11 @@ namespace epics { temp<<_maxPayloadSize<<" available."; THROW_BASE_EXCEPTION(temp.str().c_str()); } - - // TODO sync _closed - while(((int)_sendBuffer->getRemaining())getRemaining())setLimit(min(_storedPosition+_storedPayloadSize, _storedLimit)); - // TODO sync _closed - // add if missing... - if(unlikely(!_closed&&((int)_socketBuffer->getRemaining())getRemaining())0) epicsThreadSleep(_delay); if(unlikely(_sendQueue.empty())) { @@ -952,7 +945,7 @@ namespace epics { } sender->unlock(); } // if(sender!=NULL) - } // while(!_closed) + } // while(!_closed.get()) } void BlockingTCPTransport::freeSendBuffers() { @@ -1019,7 +1012,7 @@ printf("sendThreadRunnner exception\n"); void BlockingTCPTransport::enqueueSendRequest(TransportSender::shared_pointer const & sender) { Lock lock(_sendQueueMutex); - if(_closed) return; + if(unlikely(_closed.get())) return; _sendQueue.push_back(sender); _sendQueueEvent.signal(); } @@ -1027,7 +1020,7 @@ printf("sendThreadRunnner exception\n"); /* void BlockingTCPTransport::enqueueMonitorSendRequest(TransportSender::shared_pointer sender) { Lock lock(_monitorMutex); - if(_closed) return; + if(unlikely(_closed.get())) return; _monitorSendQueue.insert(sender); if(_monitorSendQueue.size()==1) enqueueSendRequest(_monitorSender); }