testing AtomicBoolean for _closed flag

This commit is contained in:
Matej Sekoranja
2011-10-06 13:53:43 +02:00
parent 39a99aa5ce
commit e79de9d46d
3 changed files with 37 additions and 25 deletions
@@ -101,7 +101,7 @@ namespace epics {
bool BlockingClientTCPTransport::acquire(TransportClient::shared_pointer const & client) {
Lock lock(_mutex);
if(_closed) return false;
if(_closed.get()) return false;
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
@@ -159,7 +159,7 @@ namespace epics {
//void BlockingClientTCPTransport::release(TransportClient::shared_pointer const & client) {
void BlockingClientTCPTransport::release(pvAccessID clientID) {
Lock lock(_mutex);
if(_closed) return;
if(_closed.get()) return;
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
+22 -3
View File
@@ -39,6 +39,26 @@ using epics::pvData::int64;
namespace epics {
namespace pvAccess {
struct null_deleter
{
void operator()(void const *) const {}
};
class AtomicBoolean
{
public:
AtomicBoolean() {};
void set() { counter.reset(static_cast<void*>(0), null_deleter()); }
void clear() { counter.reset(); }
bool get() { return counter.use_count(); }
private:
std::tr1::shared_ptr<void> counter;
};
//class MonitorSender;
enum ReceiveStage {
@@ -61,8 +81,7 @@ namespace epics {
public:
virtual bool isClosed() {
epics::pvData::Lock guard(_mutex);
return _closed;
return _closed.get();
}
virtual void setRemoteMinorRevision(int8 minorRevision) {
@@ -370,7 +389,7 @@ namespace epics {
* Connection status
* NOTE: synced by _mutex
*/
bool _closed;
AtomicBoolean _closed;
// NOTE: synced by _mutex
bool _sendThreadExited;
+13 -20
View File
@@ -123,7 +123,7 @@ namespace epics {
_payloadSize(0),
_stage(READ_FROM_SOCKET),
_totalBytesReceived(0),
_closed(false),
_closed(),
_sendThreadExited(false),
_verified(false),
_markerToSend(0),
@@ -258,9 +258,8 @@ namespace epics {
Lock lock(_mutex);
// already closed check
if(_closed) return;
_closed = true;
if(_closed.get()) return;
_closed.set();
// remove from registry
Transport::shared_pointer thisSharedPtr = shared_from_this();
@@ -364,13 +363,11 @@ namespace epics {
temp<<_maxPayloadSize<<" available.";
THROW_BASE_EXCEPTION(temp.str().c_str());
}
// TODO sync _closed
while(((int)_sendBuffer->getRemaining())<size && !_closed)
while(((int)_sendBuffer->getRemaining())<size && !_closed.get())
flush(false);
if (unlikely(_closed)) THROW_BASE_EXCEPTION("transport closed");
if (unlikely(_closed.get())) THROW_BASE_EXCEPTION("transport closed");
}
void BlockingTCPTransport::alignBuffer(int alignment) {
@@ -488,14 +485,12 @@ namespace epics {
_socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize,
_storedLimit));
// TODO sync _closed
// add if missing...
if(unlikely(!_closed&&((int)_socketBuffer->getRemaining())<size))
if(unlikely(!_closed.get()&&((int)_socketBuffer->getRemaining())<size))
ensureData(size);
}
if(unlikely(_closed)) THROW_BASE_EXCEPTION("transport closed");
if(unlikely(_closed.get())) THROW_BASE_EXCEPTION("transport closed");
}
void BlockingTCPTransport::alignData(int alignment) {
@@ -510,8 +505,7 @@ namespace epics {
void BlockingTCPTransport::processReadCached(bool nestedCall,
ReceiveStage inStage, int requiredBytes) {
try {
// TODO sync _closed
while(likely(!_closed)) {
while(likely(!_closed.get())) {
if(_stage==READ_FROM_SOCKET||inStage!=NONE) {
// add to bytes read
@@ -882,8 +876,7 @@ namespace epics {
}
void BlockingTCPTransport::processSendQueue() {
// TODO sync _closed
while(unlikely(!_closed)) {
while(unlikely(!_closed.get())) {
_sendQueueMutex.lock();
// TODO optimize
@@ -896,7 +889,7 @@ namespace epics {
_sendQueueMutex.unlock();
// wait for new message
while(likely(sender.get()==0&&!_flushRequested&&!_closed)) {
while(likely(sender.get()==0&&!_flushRequested&&!_closed.get())) {
if(_flushStrategy==DELAYED) {
if(_delay>0) epicsThreadSleep(_delay);
if(unlikely(_sendQueue.empty())) {
@@ -952,7 +945,7 @@ namespace epics {
}
sender->unlock();
} // if(sender!=NULL)
} // while(!_closed)
} // while(!_closed.get())
}
void BlockingTCPTransport::freeSendBuffers() {
@@ -1019,7 +1012,7 @@ printf("sendThreadRunnner exception\n");
void BlockingTCPTransport::enqueueSendRequest(TransportSender::shared_pointer const & sender) {
Lock lock(_sendQueueMutex);
if(_closed) return;
if(unlikely(_closed.get())) return;
_sendQueue.push_back(sender);
_sendQueueEvent.signal();
}
@@ -1027,7 +1020,7 @@ printf("sendThreadRunnner exception\n");
/*
void BlockingTCPTransport::enqueueMonitorSendRequest(TransportSender::shared_pointer sender) {
Lock lock(_monitorMutex);
if(_closed) return;
if(unlikely(_closed.get())) return;
_monitorSendQueue.insert(sender);
if(_monitorSendQueue.size()==1) enqueueSendRequest(_monitorSender);
}