clientContext cleanup

This commit is contained in:
Matej Sekoranja
2012-07-30 11:24:38 +02:00
parent 835ce323f1
commit 878f44ffb8
3 changed files with 86 additions and 61 deletions

22
TODO
View File

@@ -4,4 +4,26 @@ opt) connection validation message sends max paylaod size
readSize checks if size is in limits of size_t?
1. protected void destroy(boolean createRequestFailed) {
2. reuse on reconnect !!!
3. initResponse failed: should I give non-null instance or thisPtr?
4. channelArray need to lock offset, conut, ..
// should be called without any lock hold
// TODO in Java as in C++ ? reportChannelStateChange();
else
// TODO not only first
// TODO minor version
// TODO what to do if there is no channel, do not search in a loop!!! do this in other thread...!
searchResponse(CAConstants.CA_MINOR_PROTOCOL_REVISION, addresses[0]);
void transportUnresponsive() { not implemented (also in Java)

View File

@@ -2439,7 +2439,7 @@ namespace epics {
{
transport->ensureData(4);
pvAccessID cid = payloadBuffer->getInt();
csm->searchResponse(cid, searchSequenceId, version & 0x0F, &serverAddress);
csm->searchResponse(cid, searchSequenceId, version, &serverAddress);
}
@@ -2505,7 +2505,6 @@ namespace epics {
if (beaconHandler == 0)
return;
// TODO smart pointers
// extra data
PVFieldPtr data;
const FieldConstPtr field = getFieldCreate()->deserialize(payloadBuffer, transport.get());
@@ -2686,7 +2685,7 @@ namespace epics {
{
// TODO remove debug output
char buf[100];
sprintf(buf, "Invalid CA header %d, its payload", command);
sprintf(buf, "Invalid (or unsupported) command %d, its payload", command);
hexDump(buf, (const int8*)(payloadBuffer->getArray()), payloadBuffer->getPosition(), payloadSize);
return;
}
@@ -2738,7 +2737,7 @@ namespace epics {
class ChannelImplFind : public ChannelFind
{
public:
ChannelImplFind(ChannelProvider::shared_pointer provider) : m_provider(provider)
ChannelImplFind(ChannelProvider::shared_pointer const & provider) : m_provider(provider)
{
}
@@ -2769,7 +2768,7 @@ namespace epics {
class ChannelProviderImpl : public ChannelProvider {
public:
ChannelProviderImpl(ClientContextImpl* context) :
ChannelProviderImpl(std::tr1::shared_ptr<ClientContextImpl> const & context) :
m_context(context)
{
}
@@ -2787,8 +2786,11 @@ namespace epics {
epics::pvData::String channelName,
ChannelFindRequester::shared_pointer const & channelFindRequester)
{
// TODO
m_context->checkChannelName(channelName);
// TODO not implemented
std::tr1::shared_ptr<ClientContextImpl> context = m_context.lock();
if (context.get())
context->checkChannelName(channelName);
if (!channelFindRequester.get())
throw std::runtime_error("null requester");
@@ -2813,9 +2815,18 @@ namespace epics {
short priority,
epics::pvData::String /*address*/)
{
// TODO support addressList
std::tr1::shared_ptr<ClientContextImpl> context = m_context.lock();
if (!context.get())
{
Status errorStatus(Status::STATUSTYPE_ERROR, "context already destroyed");
Channel::shared_pointer nullChannel;
EXCEPTION_GUARD(channelRequester->channelCreated(errorStatus, nullChannel));
return nullChannel;
}
// TODO support addressList
auto_ptr<InetAddrVector> addresses;
Channel::shared_pointer channel = m_context->createChannelInternal(channelName, channelRequester, priority, addresses);
Channel::shared_pointer channel = context->createChannelInternal(channelName, channelRequester, priority, addresses);
if (channel.get())
channelRequester->channelCreated(Status::Ok, channel);
return channel;
@@ -2827,7 +2838,7 @@ namespace epics {
private:
ClientContextImpl* m_context;
std::tr1::weak_ptr<ClientContextImpl> m_context;
};
@@ -3131,7 +3142,7 @@ namespace epics {
m_transport->release(getID());
}
else if (m_transport == transport)
else if (m_transport.get() == transport.get())
{
// request to sent create request to same transport, ignore
// this happens when server is slower (processing search requests) than client generating it
@@ -3139,8 +3150,7 @@ namespace epics {
}
m_transport = transport;
TransportSender::shared_pointer thisSender = shared_from_this();
m_transport->enqueueSendRequest(thisSender);
m_transport->enqueueSendRequest(shared_from_this());
}
virtual void cancel() {
@@ -3173,12 +3183,15 @@ namespace epics {
{
Lock guard(m_channelMutex);
bool allOK = false;
try
{
// do this silently
if (m_connectionState == DESTROYED)
{
// end connection request
cancel();
return;
}
// store data
m_serverChannelID = sid;
@@ -3189,18 +3202,15 @@ namespace epics {
// but this cannot happen since transport (TCP) is serving in this thread
resubscribeSubscriptions();
setConnectionState(CONNECTED);
allOK = true;
}
catch (...) {
// noop
// TODO at least log something??
}
if (!allOK)
{
// end connection request
cancel();
}
// NOTE: always call cancel
// end connection request
cancel();
}
// should be called without any lock hold
@@ -3219,8 +3229,7 @@ namespace epics {
}
// do destruction via context
ChannelImpl::shared_pointer thisPointer = shared_from_this();
m_context->destroyChannel(thisPointer, force);
m_context->destroyChannel(shared_from_this(), force);
}
@@ -3260,8 +3269,7 @@ namespace epics {
setConnectionState(DESTROYED);
// unregister
ChannelImpl::shared_pointer thisPointer = shared_from_this();
m_context->unregisterChannel(thisPointer);
m_context->unregisterChannel(shared_from_this());
}
// should be called without any lock hold
@@ -3281,8 +3289,7 @@ namespace epics {
if (!initiateSearch) {
// stop searching...
SearchInstance::shared_pointer thisChannelPointer = shared_from_this();
m_context->getChannelSearchManager()->unregisterSearchInstance(thisChannelPointer);
m_context->getChannelSearchManager()->unregisterSearchInstance(shared_from_this());
cancel();
}
setConnectionState(DISCONNECTED);
@@ -3294,8 +3301,7 @@ namespace epics {
{
if (remoteDestroy) {
m_issueCreateMessage = false;
TransportSender::shared_pointer thisSender = shared_from_this();
m_transport->enqueueSendRequest(thisSender);
m_transport->enqueueSendRequest(shared_from_this());
}
m_transport->release(getID());
@@ -3318,8 +3324,7 @@ namespace epics {
if (!m_addresses.get())
{
SearchInstance::shared_pointer thisChannelPointer = shared_from_this();
m_context->getChannelSearchManager()->registerSearchInstance(thisChannelPointer);
m_context->getChannelSearchManager()->registerSearchInstance(shared_from_this());
}
/* TODO
else
@@ -3344,8 +3349,7 @@ namespace epics {
}
}
TransportClient::shared_pointer thisPointer = shared_from_this();
transport = m_context->getTransport(thisPointer, serverAddress, minorRevision, m_priority);
transport = m_context->getTransport(shared_from_this(), serverAddress, minorRevision, m_priority);
if (!transport.get())
{
createChannelFailed();
@@ -3364,7 +3368,10 @@ namespace epics {
}
virtual void transportChanged() {
initiateSearch();
// initiateSearch();
// TODO
// this will be called immediately after reconnect... bad...
}
virtual Transport::shared_pointer checkAndGetTransport()
@@ -3396,11 +3403,12 @@ namespace epics {
}
void transportUnresponsive() {
/*
{
Lock guard(m_channelMutex);
if (m_connectionState == CONNECTED)
{
// NOTE: 2 types of disconnected state - distinguish them
// TODO 2 types of disconnected state - distinguish them otherwise disconnect will handle connection loss right
setConnectionState(DISCONNECTED);
// ... CA notifies also w/ no access rights callback, although access right are not changed
@@ -3409,6 +3417,7 @@ namespace epics {
// should be called without any lock hold
reportChannelStateChange();
*/
}
/**
@@ -3437,7 +3446,6 @@ namespace epics {
std::queue<ConnectionState> channelStateChangeQueue;
// TODO very suboptimal, usually there is only one element
void reportChannelStateChange()
{
Channel::shared_pointer thisPointer = shared_from_this();
@@ -3510,7 +3518,7 @@ namespace epics {
*/
void disconnectPendingIO(bool destroy)
{
Status* status = destroy ? &channelDestroyed : &channelDisconnected;
Status& status = destroy ? channelDestroyed : channelDisconnected;
Lock guard(m_responseRequestsMutex);
@@ -3530,7 +3538,7 @@ namespace epics {
{
if(ptr = rrs[i].lock())
{
EXCEPTION_GUARD(ptr->reportStatus(*status));
EXCEPTION_GUARD(ptr->reportStatus(status));
}
}
}
@@ -3590,64 +3598,56 @@ namespace epics {
virtual void getField(GetFieldRequester::shared_pointer const & requester,epics::pvData::String subField)
{
ChannelImpl::shared_pointer thisPointer = shared_from_this();
ChannelGetFieldRequestImpl::create(thisPointer, requester, subField);
ChannelGetFieldRequestImpl::create(shared_from_this(), requester, subField);
}
virtual ChannelProcess::shared_pointer createChannelProcess(
ChannelProcessRequester::shared_pointer const & channelProcessRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
ChannelImpl::shared_pointer thisPointer = shared_from_this();
return ChannelProcessRequestImpl::create(thisPointer, channelProcessRequester, pvRequest);
return ChannelProcessRequestImpl::create(shared_from_this(), channelProcessRequester, pvRequest);
}
virtual ChannelGet::shared_pointer createChannelGet(
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
ChannelImpl::shared_pointer thisPointer = shared_from_this();
return ChannelGetImpl::create(thisPointer, channelGetRequester, pvRequest);
return ChannelGetImpl::create(shared_from_this(), channelGetRequester, pvRequest);
}
virtual ChannelPut::shared_pointer createChannelPut(
ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
ChannelImpl::shared_pointer thisPointer = shared_from_this();
return ChannelPutImpl::create(thisPointer, channelPutRequester, pvRequest);
return ChannelPutImpl::create(shared_from_this(), channelPutRequester, pvRequest);
}
virtual ChannelPutGet::shared_pointer createChannelPutGet(
ChannelPutGetRequester::shared_pointer const & channelPutGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
ChannelImpl::shared_pointer thisPointer = shared_from_this();
return ChannelPutGetImpl::create(thisPointer, channelPutGetRequester, pvRequest);
return ChannelPutGetImpl::create(shared_from_this(), channelPutGetRequester, pvRequest);
}
virtual ChannelRPC::shared_pointer createChannelRPC(
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
ChannelImpl::shared_pointer thisPointer = shared_from_this();
return ChannelRPCImpl::create(thisPointer, channelRPCRequester, pvRequest);
return ChannelRPCImpl::create(shared_from_this(), channelRPCRequester, pvRequest);
}
virtual epics::pvData::Monitor::shared_pointer createMonitor(
epics::pvData::MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
ChannelImpl::shared_pointer thisPointer = shared_from_this();
return ChannelMonitorImpl::create(thisPointer, monitorRequester, pvRequest);
return ChannelMonitorImpl::create(shared_from_this(), monitorRequester, pvRequest);
}
virtual ChannelArray::shared_pointer createChannelArray(
ChannelArrayRequester::shared_pointer const & channelArrayRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
ChannelImpl::shared_pointer thisPointer = shared_from_this();
return ChannelArrayImpl::create(thisPointer, channelArrayRequester, pvRequest);
return ChannelArrayImpl::create(shared_from_this(), channelArrayRequester, pvRequest);
}
@@ -3690,7 +3690,6 @@ namespace epics {
m_configuration(new SystemConfigurationImpl())
{
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(remoteClientContext);
m_provider.reset(new ChannelProviderImpl(this));
loadConfiguration();
}
@@ -3699,9 +3698,15 @@ namespace epics {
static InternalClientContextImpl::shared_pointer create()
{
InternalClientContextImpl::shared_pointer thisPointer(new InternalClientContextImpl(), delayed_destroyable_deleter());
static_cast<InternalClientContextImpl*>(thisPointer.get())->activate();
return thisPointer;
}
void activate()
{
m_provider.reset(new ChannelProviderImpl(shared_from_this()));
}
virtual Configuration::shared_pointer getConfiguration() {
/*
TODO
@@ -3714,7 +3719,7 @@ TODO
return m_configuration;
}
virtual Version& getVersion() {
virtual const Version& getVersion() {
return m_version;
}
@@ -3837,7 +3842,7 @@ TODO
// setup search manager
m_channelSearchManager = SimpleChannelSearchManagerImpl::create(thisPointer);
// TODO put memory barrier here...
// TODO put memory barrier here... (if not already called withing a lock?)
// setup UDP transport
initializeUDPTransport();
@@ -4140,8 +4145,7 @@ TODO
try
{
// TODO we are creating a new response handler even-though we might not need a new transprot !!!
ClientContextImpl::shared_pointer thisPointer = shared_from_this();
auto_ptr<ResponseHandler> handler(new ClientResponseHandler(thisPointer));
auto_ptr<ResponseHandler> handler(new ClientResponseHandler(shared_from_this()));
return m_connector->connect(client, handler, *serverAddress, minorRevision, priority);
}
catch (...)
@@ -4175,8 +4179,7 @@ TODO
try
{
pvAccessID cid = generateCID();
ClientContextImpl::shared_pointer thisPointer = shared_from_this();
return InternalChannelImpl::create(thisPointer, cid, name, requester, priority, addresses);
return InternalChannelImpl::create(shared_from_this(), cid, name, requester, priority, addresses);
}
catch(...) {
// TODO

View File

@@ -56,7 +56,7 @@ namespace epics {
* Get context implementation version.
* @return version of the context implementation.
*/
virtual Version& getVersion() = 0;
virtual const Version& getVersion() = 0;
/**
* Initialize client context. This method is called immediately after instance construction (call of constructor).