client.h guard concurrent callbacks on cancel
cancel() blocks until concurrent callbacks have completed. Also serializes any callbacks for each Operation/Monitor.
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
|
||||
#include <epicsMutex.h>
|
||||
#include <epicsGuard.h>
|
||||
#include <epicsEvent.h>
|
||||
|
||||
#include <pv/pvData.h>
|
||||
#include <pv/bitSet.h>
|
||||
@@ -16,13 +17,14 @@
|
||||
#include "pv/pvAccess.h"
|
||||
|
||||
namespace {
|
||||
using pvac::detail::CallbackGuard;
|
||||
using pvac::detail::CallbackUse;
|
||||
|
||||
struct RPCer : public pva::ChannelRPCRequester,
|
||||
struct RPCer : public pvac::detail::CallbackStorage,
|
||||
public pva::ChannelRPCRequester,
|
||||
public pvac::Operation::Impl,
|
||||
public pvac::detail::wrapped_shared_from_this<RPCer>
|
||||
{
|
||||
mutable epicsMutex mutex;
|
||||
|
||||
bool started;
|
||||
operation_type::shared_pointer op;
|
||||
|
||||
@@ -36,9 +38,14 @@ struct RPCer : public pva::ChannelRPCRequester,
|
||||
RPCer(pvac::ClientChannel::GetCallback* cb,
|
||||
const pvd::PVStructure::const_shared_pointer& args) :started(false), cb(cb), args(args)
|
||||
{REFTRACE_INCREMENT(num_instances);}
|
||||
virtual ~RPCer() {REFTRACE_DECREMENT(num_instances);}
|
||||
virtual ~RPCer() {
|
||||
CallbackGuard G(*this);
|
||||
cb = 0;
|
||||
G.wait(); // paranoia
|
||||
REFTRACE_DECREMENT(num_instances);
|
||||
}
|
||||
|
||||
void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail)
|
||||
void callEvent(CallbackGuard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail)
|
||||
{
|
||||
pvac::ClientChannel::GetCallback *cb=this->cb;
|
||||
if(!cb) return;
|
||||
@@ -48,24 +55,11 @@ struct RPCer : public pva::ChannelRPCRequester,
|
||||
this->cb = 0;
|
||||
|
||||
try {
|
||||
UnGuard U(G);
|
||||
CallbackUse U(G);
|
||||
cb->getDone(event);
|
||||
return;
|
||||
}catch(std::exception& e){
|
||||
if(!this->cb || evt==pvac::GetEvent::Fail) {
|
||||
LOG(pva::logLevelError, "Unhandled exception in ClientChannel::GetCallback::getDone(): %s", e.what());
|
||||
} else {
|
||||
event.event = pvac::GetEvent::Fail;
|
||||
event.message = e.what();
|
||||
}
|
||||
}
|
||||
// continues error handling
|
||||
try {
|
||||
UnGuard U(G);
|
||||
cb->getDone(event);
|
||||
return;
|
||||
}catch(std::exception& e){
|
||||
LOG(pva::logLevelError, "Unhandled exception following exception in ClientChannel::GetCallback::monitorEvent(): %s", e.what());
|
||||
LOG(pva::logLevelError, "Unhandled exception in ClientChannel::RPCCallback::requestDone(): %s", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,7 +73,7 @@ struct RPCer : public pva::ChannelRPCRequester,
|
||||
virtual void cancel()
|
||||
{
|
||||
std::tr1::shared_ptr<RPCer> keepalive(internal_shared_from_this());
|
||||
Guard G(mutex);
|
||||
CallbackGuard G(*this);
|
||||
if(started && op) op->cancel();
|
||||
callEvent(G, pvac::GetEvent::Cancel);
|
||||
}
|
||||
@@ -95,7 +89,7 @@ struct RPCer : public pva::ChannelRPCRequester,
|
||||
pva::ChannelRPC::shared_pointer const & operation)
|
||||
{
|
||||
std::tr1::shared_ptr<RPCer> keepalive(internal_shared_from_this());
|
||||
Guard G(mutex);
|
||||
CallbackGuard G(*this);
|
||||
if(!cb || started) return;
|
||||
|
||||
if(!status.isOK()) {
|
||||
@@ -115,7 +109,7 @@ struct RPCer : public pva::ChannelRPCRequester,
|
||||
virtual void channelDisconnect(bool destroy) OVERRIDE FINAL
|
||||
{
|
||||
std::tr1::shared_ptr<RPCer> keepalive(internal_shared_from_this());
|
||||
Guard G(mutex);
|
||||
CallbackGuard G(*this);
|
||||
event.message = "Disconnect";
|
||||
|
||||
callEvent(G);
|
||||
@@ -127,7 +121,7 @@ struct RPCer : public pva::ChannelRPCRequester,
|
||||
epics::pvData::PVStructure::shared_pointer const & pvResponse)
|
||||
{
|
||||
std::tr1::shared_ptr<RPCer> keepalive(internal_shared_from_this());
|
||||
Guard G(mutex);
|
||||
CallbackGuard G(*this);
|
||||
if(!cb) return;
|
||||
|
||||
if(!status.isOK()) {
|
||||
|
||||
Reference in New Issue
Block a user