mm of TransportSender

This commit is contained in:
Matej Sekoranja
2011-02-01 00:38:11 +01:00
parent 5ee480beee
commit d0db4588ee
11 changed files with 163 additions and 74 deletions
+10
View File
@@ -69,6 +69,16 @@ void BeaconEmitter::unlock()
//noop
}
void BeaconEmitter::acquire()
{
//noop
}
void BeaconEmitter::release()
{
//noop
}
void BeaconEmitter::send(ByteBuffer* buffer, TransportSendControl* control)
{
// get server status
+3
View File
@@ -53,6 +53,9 @@ namespace epics { namespace pvAccess {
* @see TransportSender#unlock()
*/
void unlock();
void acquire();
void release();
void send(ByteBuffer* buffer, TransportSendControl* control);
/**
+16
View File
@@ -477,6 +477,14 @@ namespace epics {
// noop
}
virtual void acquire() {
// noop, since does not make sence on itself
}
virtual void release() {
// noop, since does not make sence on itself
}
virtual void send(epics::pvData::ByteBuffer* buffer,
TransportSendControl* control);
@@ -651,6 +659,14 @@ namespace epics {
// noop
}
virtual void acquire() {
// noop, since does not make sence on itself
}
virtual void release() {
// noop, since does not make sence on itself
}
/**
* Verify transport. Server side is self-verified.
*/
+2 -2
View File
@@ -65,7 +65,7 @@ namespace epics {
sizeof(strBuffer));
ostringstream temp;
temp<<"Socket create error: "<<strBuffer;
errlogSevPrintf(errlogMajor, temp.str().c_str());
errlogSevPrintf(errlogMajor, "%s", temp.str().c_str());
THROW_BASE_EXCEPTION(temp.str().c_str());
}
else {
@@ -121,7 +121,7 @@ namespace epics {
sizeof(strBuffer));
ostringstream temp;
temp<<"Socket listen error: "<<strBuffer;
errlogSevPrintf(errlogMajor, temp.str().c_str());
errlogSevPrintf(errlogMajor, "%s", temp.str().c_str());
THROW_BASE_EXCEPTION(temp.str().c_str());
}
@@ -54,6 +54,12 @@ namespace epics {
virtual void unlock() {
}
virtual void acquire() {
}
virtual void release() {
}
virtual void
send(ByteBuffer* buffer, TransportSendControl* control);
@@ -129,7 +135,18 @@ namespace epics {
BlockingTCPTransport::~BlockingTCPTransport() {
close(true);
TransportSender* sender;
while (sender = _monitorSendQueue->extract())
sender->release();
delete _monitorSendQueue;
while (sender = _sendQueue->extract())
sender->release();
delete _sendQueue;
delete _monitorSender;
delete _socketBuffer;
delete _sendBuffer;
@@ -244,6 +261,7 @@ namespace epics {
internalVerified = _verified;
_verifiedMutex.unlock();
}
return internalVerified;
}
@@ -806,6 +824,7 @@ namespace epics {
_sendBuffer->setPosition(_lastMessageStartPosition);
}
sender->unlock();
sender->release();
} // if(sender!=NULL)
} // while(!_closed)
}
@@ -869,6 +888,7 @@ printf("sendThreadRunnner exception\n");
void BlockingTCPTransport::enqueueSendRequest(TransportSender* sender) {
Lock lock(&_sendQueueMutex);
if(_closed) return;
sender->acquire();
_sendQueue->insert(sender);
_sendQueueEvent.signal();
}
@@ -876,6 +896,7 @@ printf("sendThreadRunnner exception\n");
void BlockingTCPTransport::enqueueMonitorSendRequest(TransportSender* sender) {
Lock lock(&_monitorMutex);
if(_closed) return;
sender->acquire();
_monitorSendQueue->insert(sender);
if(_monitorSendQueue->size()==1) enqueueSendRequest(_monitorSender);
}
@@ -898,6 +919,7 @@ printf("sendThreadRunnner exception\n");
break;
}
sender->send(buffer, control);
sender->release();
}
}
+2 -2
View File
@@ -123,8 +123,8 @@ namespace epics {
virtual void lock() =0;
virtual void unlock() =0;
//virtual void acquire() =0;
//virtual void release() =0;
virtual void acquire() =0;
virtual void release() =0;
};
/**
+90 -67
View File
@@ -50,7 +50,8 @@ namespace epics {
class BaseRequestImpl :
public DataResponse,
public SubscriptionRequest,
public TransportSender {
public TransportSender,
public Destroyable {
protected:
ChannelImpl* m_channel;
@@ -70,6 +71,10 @@ namespace epics {
int32 m_pendingRequest;
Mutex m_mutex;
int m_refCount;
virtual ~BaseRequestImpl() {};
public:
@@ -84,7 +89,7 @@ namespace epics {
BaseRequestImpl(ChannelImpl* channel, Requester* requester) :
m_channel(channel), m_context(channel->getContext()),
m_requester(requester), m_destroyed(false), m_remotelyDestroyed(false),
m_pendingRequest(NULL_REQUEST)
m_pendingRequest(NULL_REQUEST), m_refCount(1)
{
// register response request
m_ioid = m_context->registerResponseRequest(this);
@@ -182,10 +187,11 @@ namespace epics {
// destroy remote instance
if (!m_remotelyDestroyed)
{
// TODO !!! startRequest(PURE_DESTROY_REQUEST);
/// TODO !!! causes crash m_channel->checkAndGetTransport()->enqueueSendRequest(this);
startRequest(PURE_DESTROY_REQUEST);
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
}
release();
}
virtual void timeout() {
@@ -227,6 +233,20 @@ namespace epics {
// noop
}
virtual void acquire() {
Lock guard(&m_mutex);
m_refCount++;
}
virtual void release() {
m_mutex.lock();
m_refCount--;
m_mutex.unlock();
if (m_refCount == 0)
delete this;
}
};
@@ -257,6 +277,7 @@ namespace epics {
~ChannelProcessRequestImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelProcess);
if (m_pvRequest) delete m_pvRequest;
}
public:
@@ -343,14 +364,11 @@ namespace epics {
startRequest(QOS_INIT);
transport->enqueueSendRequest(this);
}
virtual void destroy()
{
BaseRequestImpl::destroy();
if (m_pvRequest) delete m_pvRequest;
delete this;
}
};
@@ -375,6 +393,10 @@ namespace epics {
~ChannelGetImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelGet);
// synced by code calling this
if (m_data) delete m_data;
if (m_bitSet) delete m_bitSet;
if (m_pvRequest) delete m_pvRequest;
}
public:
@@ -484,17 +506,10 @@ namespace epics {
transport->enqueueSendRequest(this);
}
virtual void destroy()
{
BaseRequestImpl::destroy();
// synced by code above
if (m_data) delete m_data;
if (m_bitSet) delete m_bitSet;
if (m_pvRequest) delete m_pvRequest;
delete this;
}
};
@@ -523,6 +538,10 @@ namespace epics {
~ChannelPutImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelPut);
// synced by code calling this
if (m_data) delete m_data;
if (m_bitSet) delete m_bitSet;
if (m_pvRequest) delete m_pvRequest;
}
public:
@@ -662,17 +681,10 @@ namespace epics {
transport->enqueueSendRequest(this);
}
virtual void destroy()
{
BaseRequestImpl::destroy();
// TODO sync
if (m_data) delete m_data;
if (m_bitSet) delete m_bitSet;
if (m_pvRequest) delete m_pvRequest;
delete this;
}
};
@@ -698,6 +710,10 @@ namespace epics {
~ChannelPutGetImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelPutGet);
// synced by code calling this
if (m_putData) delete m_putData;
if (m_getData) delete m_getData;
if (m_pvRequest) delete m_pvRequest;
}
public:
@@ -878,17 +894,10 @@ namespace epics {
transport->enqueueSendRequest(this);
}
virtual void destroy()
{
BaseRequestImpl::destroy();
// TODO sync
if (m_putData) delete m_putData;
if (m_getData) delete m_getData;
if (m_pvRequest) delete m_pvRequest;
delete this;
}
};
@@ -916,6 +925,10 @@ namespace epics {
~ChannelRPCImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelRPC);
// synced by code calling this
if (m_data) delete m_data;
if (m_bitSet) delete m_bitSet;
if (m_pvRequest) delete m_pvRequest;
}
public:
@@ -1026,17 +1039,10 @@ namespace epics {
transport->enqueueSendRequest(this);
}
virtual void destroy()
{
BaseRequestImpl::destroy();
// TODO sync
if (m_data) delete m_data;
if (m_bitSet) delete m_bitSet;
if (m_pvRequest) delete m_pvRequest;
delete this;
}
};
@@ -1068,6 +1074,9 @@ namespace epics {
~ChannelArrayImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelArray);
// synced by code calling this
if (m_data) delete m_data;
if (m_pvRequest) delete m_pvRequest;
}
public:
@@ -1247,16 +1256,10 @@ namespace epics {
transport->enqueueSendRequest(this);
}
virtual void destroy()
{
BaseRequestImpl::destroy();
// TODO sync
if (m_data) delete m_data;
if (m_pvRequest) delete m_pvRequest;
delete this;
}
};
@@ -1280,7 +1283,8 @@ namespace epics {
String m_subField;
Mutex m_mutex;
bool m_destroyed;
int m_refCount;
private:
~ChannelGetFieldRequestImpl()
{
@@ -1292,7 +1296,7 @@ namespace epics {
ChannelGetFieldRequestImpl(ChannelImpl* channel, GetFieldRequester* callback, String subField) :
m_channel(channel), m_context(channel->getContext()),
m_callback(callback), m_subField(subField),
m_destroyed(false)
m_destroyed(false), m_refCount(1)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGetField);
@@ -1361,7 +1365,20 @@ namespace epics {
m_context->unregisterResponseRequest(this);
m_channel->unregisterResponseRequest(this);
delete this;
release();
}
virtual void acquire() {
Lock guard(&m_mutex);
m_refCount++;
}
virtual void release() {
m_mutex.lock();
m_refCount--;
m_mutex.unlock();
if (m_refCount == 0)
delete this;
}
virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) {
@@ -1436,6 +1453,19 @@ namespace epics {
~ChannelMonitorImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelMonitor);
// synced by code calling this
if (m_pvRequest) delete m_pvRequest;
// uncomment when m_pvStructure not destroyed if (m_structure) m_structure->decReferenceCount();
// TODO temp
if (m_pvStructure)
{
delete m_pvStructure;
delete m_overrunBitSet;
delete m_changedBitSet;
}
}
public:
@@ -1535,27 +1565,6 @@ namespace epics {
transport->enqueueSendRequest(this);
}
virtual void destroy()
{
BaseRequestImpl::destroy();
// TODO sync
if (m_pvRequest) delete m_pvRequest;
// uncomment when m_pvStructure not destroyed if (m_structure) m_structure->decReferenceCount();
// TODO temp
if (m_pvStructure)
{
delete m_pvStructure;
delete m_overrunBitSet;
delete m_changedBitSet;
}
delete this;
}
// override, since we optimize status
virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) {
// TODO?
@@ -1644,6 +1653,11 @@ namespace epics {
}
virtual void destroy()
{
BaseRequestImpl::destroy();
}
// ============ temp ============
virtual MonitorElement* poll()
@@ -2508,6 +2522,15 @@ namespace epics {
Lock guard(&m_channelMutex);
m_references++;
}
virtual void release() {
m_channelMutex.lock();
m_references--;
m_channelMutex.unlock();
// if (m_references == 0)
// delete this;
}
// TTTOOOOOOODOOOOOO !!!
/**
* Actual destroy method, to be called <code>CAJContext</code>.
+7
View File
@@ -136,8 +136,15 @@ namespace epics {
}
virtual void unlock() {
}
virtual void acquire() {
}
virtual void release() {
delete this;
}
private:
osiSockAddr _echoFrom;
+4
View File
@@ -115,6 +115,10 @@ public:
}
virtual void unlock() {
}
virtual void acquire() {
}
virtual void release() {
}
private:
char data[20];
int count;
+4
View File
@@ -87,6 +87,10 @@ public:
}
virtual void unlock() {
}
virtual void acquire() {
}
virtual void release() {
}
private:
char data[20];
int count;
+3 -3
View File
@@ -451,7 +451,7 @@ int main(int argc,char *argv[])
channel->printInfo();
PVStructure* pvRequest;
/*
GetFieldRequesterImpl getFieldRequesterImpl;
channel->getField(&getFieldRequesterImpl, "");
epicsThreadSleep ( 1.0 );
@@ -521,7 +521,7 @@ int main(int argc,char *argv[])
channelArray->setLength(false,3,4);
epicsThreadSleep ( 1.0 );
channelArray->destroy();
*/
MonitorRequesterImpl monitorRequesterImpl;
pvRequest = getCreateRequest()->createRequest("field()",&monitorRequesterImpl);
Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, pvRequest);
@@ -532,7 +532,7 @@ int main(int argc,char *argv[])
std::cout << "monitor->start() = " << status->toString() << std::endl;
delete status;
epicsThreadSleep( 30.0 );
epicsThreadSleep( 3.0 );
status = monitor->stop();
std::cout << "monitor->stop() = " << status->toString() << std::endl;