Merge commit '731767b086e8146f31c1f9038568e3c852c609f7'

* commit '731767b086e8146f31c1f9038568e3c852c609f7':
  SharedPV fix onFirstConnect() on monitor creation
  SharedPV allow use of Channel::destroy() to trigger onLastDisconnect()
  SharedPV defer onFirstConnect() to first getField/Put/Monitor
  client.h add some exception guards
  more SharedPV rpc state tracking
  more UDP debugging
  fix global string constants
  fix SharedPV allow rpc() while close()'d
  RPC troubleshooting
  minor
  SharedPV too many notifiedConn
  client.h minor
This commit is contained in:
Michael Davidsaver
2018-10-08 09:56:37 -07:00
14 changed files with 211 additions and 84 deletions

View File

@@ -50,7 +50,11 @@ struct Getter : public pvac::detail::CallbackStorage,
pvac::ClientChannel::GetCallback *C=cb;
cb = 0;
CallbackUse U(G);
C->getDone(event);
try {
C->getDone(event);
} catch(std::exception& e) {
LOG(pva::logLevelInfo, "Lost exception during getDone(): %s", e.what());
}
}
virtual std::string name() const OVERRIDE FINAL

View File

@@ -54,7 +54,11 @@ struct Putter : public pvac::detail::CallbackStorage,
pvac::ClientChannel::PutCallback *C=cb;
cb = 0;
CallbackUse U(G);
C->putDone(event);
try {
C->putDone(event);
} catch(std::exception& e) {
LOG(pva::logLevelInfo, "Lost exception during putDone(): %s", e.what());
}
}
virtual std::string name() const OVERRIDE FINAL
@@ -205,13 +209,13 @@ namespace pvac {
Operation
ClientChannel::put(PutCallback* cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest,
bool getcurrent)
bool getprevious)
{
if(!impl) throw std::logic_error("Dead Channel");
if(!pvRequest)
pvRequest = pvd::createRequest("field()");
std::tr1::shared_ptr<Putter> ret(Putter::build(cb, getcurrent));
std::tr1::shared_ptr<Putter> ret(Putter::build(cb, getprevious));
{
Guard G(ret->mutex);

View File

@@ -29,6 +29,7 @@ struct RPCer : public pvac::detail::CallbackStorage,
operation_type::shared_pointer op;
pvac::ClientChannel::GetCallback *cb;
// 'event' may be modified as long as cb!=NULL
pvac::GetEvent event;
pvd::PVStructure::const_shared_pointer args;
@@ -70,7 +71,7 @@ struct RPCer : public pvac::detail::CallbackStorage,
}
// called automatically via wrapped_shared_from_this
virtual void cancel()
virtual void cancel() OVERRIDE FINAL
{
std::tr1::shared_ptr<RPCer> keepalive(internal_shared_from_this());
CallbackGuard G(*this);
@@ -86,7 +87,7 @@ struct RPCer : public pvac::detail::CallbackStorage,
virtual void channelRPCConnect(
const epics::pvData::Status& status,
pva::ChannelRPC::shared_pointer const & operation)
pva::ChannelRPC::shared_pointer const & operation) OVERRIDE FINAL
{
std::tr1::shared_ptr<RPCer> keepalive(internal_shared_from_this());
CallbackGuard G(*this);
@@ -110,6 +111,7 @@ struct RPCer : public pvac::detail::CallbackStorage,
{
std::tr1::shared_ptr<RPCer> keepalive(internal_shared_from_this());
CallbackGuard G(*this);
if(!cb) return;
event.message = "Disconnect";
callEvent(G);
@@ -118,7 +120,7 @@ struct RPCer : public pvac::detail::CallbackStorage,
virtual void requestDone(
const epics::pvData::Status& status,
pva::ChannelRPC::shared_pointer const & operation,
epics::pvData::PVStructure::shared_pointer const & pvResponse)
epics::pvData::PVStructure::shared_pointer const & pvResponse) OVERRIDE FINAL
{
std::tr1::shared_ptr<RPCer> keepalive(internal_shared_from_this());
CallbackGuard G(*this);

View File

@@ -385,7 +385,7 @@ public:
//! If false, then previous=NULL
Operation put(PutCallback* cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer(),
bool getcurrent = false);
bool getprevious = false);
//! Synchronious put operation
inline

View File

@@ -18,6 +18,7 @@
# define epicsExportSharedSymbols
# undef pvaConstantsepicsExportSharedSymbols
#endif
#include <shareLib.h>
namespace epics {
namespace pvAccess {
@@ -73,13 +74,13 @@ const epics::pvData::int16 INVALID_DATA_TYPE = 0xFFFF;
const epics::pvData::int32 INVALID_IOID = 0;
/** Default PVA provider name. */
const std::string PVACCESS_DEFAULT_PROVIDER;
epicsShareExtern const std::string PVACCESS_DEFAULT_PROVIDER;
/** "All-providers registered" PVA provider name. */
const std::string PVACCESS_ALL_PROVIDERS;
epicsShareExtern const std::string PVACCESS_ALL_PROVIDERS;
/** Name of the system env. variable to turn on debugging. */
const std::string PVACCESS_DEBUG;
epicsShareExtern const std::string PVACCESS_DEBUG;
}
}

View File

@@ -8,6 +8,7 @@
#define epicsExportSharedSymbols
#include <pv/pvaVersion.h>
#include <pv/pvaConstants.h>
using std::stringstream;
using std::string;
@@ -15,9 +16,9 @@ using std::string;
namespace epics {
namespace pvAccess {
const std::string PVACCESS_DEFAULT_PROVIDER = "local";
const std::string PVACCESS_ALL_PROVIDERS = "<all>";
const std::string PVACCESS_DEBUG = "EPICS_PVA_DEBUG";
const std::string PVACCESS_DEFAULT_PROVIDER("local");
const std::string PVACCESS_ALL_PROVIDERS("<all>");
const std::string PVACCESS_DEBUG("EPICS_PVA_DEBUG");
Version::Version(std::string const & productName,
std::string const & implementationLangugage,

View File

@@ -225,7 +225,7 @@ void BlockingUDPTransport::run() {
0, (sockaddr*)&fromAddress,
&addrStructSize);
if(likely(bytesRead>0)) {
if(likely(bytesRead>=0)) {
// successfully got datagram
bool ignore = false;
for(size_t i = 0; i <_ignoredAddresses.size(); i++)
@@ -233,11 +233,22 @@ void BlockingUDPTransport::run() {
if(_ignoredAddresses[i].ia.sin_addr.s_addr==fromAddress.ia.sin_addr.s_addr)
{
ignore = true;
if(pvAccessIsLoggable(logLevelDebug)) {
char strBuffer[64];
sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer));
LOG(logLevelDebug, "UDP Ignore (%d) %s x- %s", bytesRead, _remoteName.c_str(), strBuffer);
}
break;
}
}
if(likely(!ignore)) {
if(pvAccessIsLoggable(logLevelDebug)) {
char strBuffer[64];
sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer));
LOG(logLevelDebug, "UDP Rx (%d) %s <- %s", bytesRead, _remoteName.c_str(), strBuffer);
}
_receiveBuffer.setPosition(RECEIVE_BUFFER_PRE_RESERVE);
_receiveBuffer.setLimit(RECEIVE_BUFFER_PRE_RESERVE+bytesRead);
@@ -253,8 +264,7 @@ void BlockingUDPTransport::run() {
__FILE__, __LINE__);
}
}
}
else if (unlikely(bytesRead == -1)) {
} else {
int socketError = SOCKERRNO;
@@ -400,7 +410,7 @@ bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSock
{
if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
LOG(logLevelDebug, "UDP Tx (%zu) %s -> %s.",
length, _remoteName.c_str(), inetAddressToString(address).c_str());
}

View File

@@ -206,7 +206,10 @@ private:
//! Used for initial Monitor update and Get operations.
epics::pvData::BitSet valid;
bool notifiedConn; // whether onFirstConnect() has been, or is being, called
// whether onFirstConnect() has been, or is being, called.
// Set when the first getField, Put, or Monitor (but not RPC) is created.
// Cleared when the last Channel is destroyed.
bool notifiedConn;
int debugLvl;

View File

@@ -2988,6 +2988,7 @@ ServerChannelRPCRequesterImpl::ServerChannelRPCRequesterImpl(
const pvAccessID ioid, Transport::shared_pointer const & transport):
BaseChannelRequester(context, channel, ioid, transport),
_channelRPC(), _pvResponse()
,_status(Status::fatal("Invalid State"))
{
}
@@ -3090,6 +3091,7 @@ void ServerChannelRPCRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
SerializationHelper::serializeStructureFull(buffer, control, _pvResponse);
}
}
_status = Status::fatal("Stale state");
}
stopRequest();

View File

@@ -35,6 +35,7 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr<SharedPV> &owner,
,channelName(channelName)
,requester(requester)
,provider(provider)
,dead(false)
{
REFTRACE_INCREMENT(num_instances);
@@ -45,24 +46,27 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr<SharedPV> &owner,
this);
}
SharedPV::Handler::shared_pointer handler;
{
Guard G(owner->mutex);
if(owner->channels.empty())
handler = owner->handler;
owner->channels.push_back(this);
owner->notifiedConn = true;
}
if(handler) {
handler->onFirstConnect(owner);
}
}
SharedChannel::~SharedChannel()
{
destroy();
REFTRACE_DECREMENT(num_instances);
}
void SharedChannel::destroy()
{
std::tr1::shared_ptr<SharedPV::Handler> handler;
{
Guard G(owner->mutex);
if(dead) return;
dead = true;
bool wasempty = owner->channels.empty();
owner->channels.remove(this);
if(!wasempty && owner->channels.empty() && owner->notifiedConn) {
@@ -81,12 +85,8 @@ SharedChannel::~SharedChannel()
channelName.c_str(),
this);
}
REFTRACE_DECREMENT(num_instances);
}
void SharedChannel::destroy() {}
std::tr1::shared_ptr<pva::ChannelProvider> SharedChannel::getProvider()
{
return provider.lock();
@@ -109,16 +109,32 @@ std::tr1::shared_ptr<pva::ChannelRequester> SharedChannel::getChannelRequester()
void SharedChannel::getField(pva::GetFieldRequester::shared_pointer const & requester,std::string const & subField)
{
epics::pvData::FieldConstPtr desc;
pvd::FieldConstPtr desc;
pvd::Status sts;
SharedPV::Handler::shared_pointer handler;
{
Guard G(owner->mutex);
if(owner->type)
desc = owner->type;
else
if(dead) {
sts = pvd::Status::error("Dead Channel");
} else {
if(owner->type) {
desc = owner->type;
}
if(!owner->channels.empty() && !owner->notifiedConn) {
handler = owner->handler;
owner->notifiedConn = true;
}
owner->getfields.push_back(requester);
}
}
if(desc || !sts.isOK()) {
requester->getDone(sts, desc);
}
if(handler) {
handler->onFirstConnect(owner);
}
if(desc)
requester->getDone(pvd::Status(), desc);
}
pva::ChannelPut::shared_pointer SharedChannel::createChannelPut(
@@ -128,27 +144,42 @@ pva::ChannelPut::shared_pointer SharedChannel::createChannelPut(
std::tr1::shared_ptr<SharedPut> ret(new SharedPut(shared_from_this(), requester, pvRequest));
pvd::StructureConstPtr type;
pvd::Status sts;
std::string warning;
SharedPV::Handler::shared_pointer handler;
try {
{
Guard G(owner->mutex);
// ~SharedPut removes
owner->puts.push_back(ret.get());
if(owner->current) {
ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode);
type = ret->mapper.requested();
warning = ret->mapper.warnings();
if(dead) {
sts = pvd::Status::error("Dead Channel");
} else {
// ~SharedPut removes
owner->puts.push_back(ret.get());
if(owner->current) {
ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode);
type = ret->mapper.requested();
warning = ret->mapper.warnings();
}
if(!owner->channels.empty() && !owner->notifiedConn) {
handler = owner->handler;
owner->notifiedConn = true;
}
}
}
if(!warning.empty())
requester->message(warning, pvd::warningMessage);
if(type)
requester->channelPutConnect(pvd::Status(), ret, type);
if(type || !sts.isOK())
requester->channelPutConnect(sts, ret, type);
}catch(std::runtime_error& e){
ret.reset();
type.reset();
requester->channelPutConnect(pvd::Status::error(e.what()), ret, type);
}
if(handler) {
handler->onFirstConnect(owner);
}
return ret;
}
@@ -157,11 +188,19 @@ pva::ChannelRPC::shared_pointer SharedChannel::createChannelRPC(
pvd::PVStructure::shared_pointer const & pvRequest)
{
std::tr1::shared_ptr<SharedRPC> ret(new SharedRPC(shared_from_this(), requester, pvRequest));
ret->connected = true;
pvd::Status sts;
{
Guard G(owner->mutex);
owner->rpcs.push_back(ret.get());
if(dead) {
sts = pvd::Status::error("Dead Channel");
} else {
owner->rpcs.push_back(ret.get());
}
}
requester->channelRPCConnect(pvd::Status(), ret);
requester->channelRPCConnect(sts, ret);
return ret;
}
@@ -170,22 +209,47 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor(
pvd::PVStructure::shared_pointer const & pvRequest)
{
SharedMonitorFIFO::Config mconf;
SharedPV::Handler::shared_pointer handler;
mconf.dropEmptyUpdates = owner->config.dropEmptyUpdates;
mconf.mapperMode = owner->config.mapperMode;
std::tr1::shared_ptr<SharedMonitorFIFO> ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest, &mconf));
bool notify;
pvd::Status sts;
{
Guard G(owner->mutex);
owner->monitors.push_back(ret.get());
notify = !!owner->type;
if(notify) {
ret->open(owner->type);
// post initial update
ret->post(*owner->current, owner->valid);
if(dead) {
sts = pvd::Status::error("Dead Channel");
notify = false;
} else {
owner->monitors.push_back(ret.get());
notify = !!owner->type;
if(notify) {
ret->open(owner->type);
// post initial update
ret->post(*owner->current, owner->valid);
}
if(!owner->channels.empty() && !owner->notifiedConn) {
handler = owner->handler;
owner->notifiedConn = true;
}
}
}
if(!sts.isOK()) {
requester->monitorConnect(sts, pvd::MonitorPtr(), pvd::StructureConstPtr());
ret.reset();
} else {
if(notify) {
ret->notify();
}
if(handler) {
handler->onFirstConnect(owner);
}
}
if(notify)
ret->notify();
return ret;
}

View File

@@ -107,43 +107,55 @@ void SharedPut::put(
std::tr1::shared_ptr<SharedPV::Handler> handler;
pvd::PVStructure::shared_pointer realval;
pvd::BitSet changed;
pvd::Status sts;
{
Guard G(channel->owner->mutex);
if(pvPutStructure->getStructure()!=mapper.requested()) {
if(channel->dead) {
sts = pvd::Status::error("Dead Channel");
} else if(pvPutStructure->getStructure()!=mapper.requested()) {
requester_type::shared_pointer req(requester.lock());
if(req)
req->putDone(pvd::Status::error("Type changed"), shared_from_this());
return;
sts = pvd::Status::error("Type changed");
} else {
handler = channel->owner->handler;
realval = mapper.buildBase();
mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet);
}
handler = channel->owner->handler;
realval = mapper.buildBase();
mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet);
}
std::tr1::shared_ptr<PutOP> impl(new PutOP(shared_from_this(), pvRequest, realval, changed),
Operation::Impl::Cleanup());
if(!sts.isOK()) {
requester_type::shared_pointer req(requester.lock());
if(req)
req->putDone(sts, pva::ChannelPut::shared_pointer());
if(handler) {
Operation op(impl);
handler->onPut(channel->owner, op);
} else {
std::tr1::shared_ptr<PutOP> impl(new PutOP(shared_from_this(), pvRequest, realval, changed),
Operation::Impl::Cleanup());
if(handler) {
Operation op(impl);
handler->onPut(channel->owner, op);
}
}
}
void SharedPut::get()
{
pvd::Status sts;
pvd::PVStructurePtr current;
pvd::BitSetPtr changed;
bool emptyselect = false;
{
Guard G(channel->owner->mutex);
if(channel->owner->current) {
if(channel->dead) {
sts = pvd::Status::error("Dead Channel");
} else if(channel->owner->current) {
assert(!!mapper.requested());
current = mapper.buildRequested();
@@ -157,10 +169,10 @@ void SharedPut::get()
requester_type::shared_pointer req(requester.lock());
if(!req) return;
if(!current) {
if(!sts.isOK()) {
// no-op
} else if(!current) {
sts = pvd::Status::error("Get not possible, cache disabled");
} else if(emptyselect) {
sts = pvd::Status::warn("pvRequest with empty field mask");
}
req->getDone(sts, shared_from_this(), current, changed);

View File

@@ -152,6 +152,7 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet&
this->valid = valid;
FOR_EACH(puts_t::const_iterator, it, end, puts) {
if((*it)->channel->dead) continue;
try {
try {
(*it)->mapper.compute(*current, *(*it)->pvRequest, config.mapperMode);
@@ -165,11 +166,13 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet&
}
}
FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) {
if((*it)->connected || (*it)->channel->dead) continue;
try {
p_rpc.push_back((*it)->shared_from_this());
}catch(std::tr1::bad_weak_ptr&) {}
}
FOR_EACH(monitors_t::const_iterator, it, end, monitors) {
if((*it)->channel->dead) continue;
try {
(*it)->open(newtype);
// post initial update
@@ -179,6 +182,7 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet&
}
// consume getField
FOR_EACH(getfields_t::iterator, it, end, getfields) {
// TODO: this may be on a dead Channel
p_getfield.push_back(it->lock());
}
getfields.clear(); // consume
@@ -231,6 +235,11 @@ void SharedPV::close(bool destroy)
{
Guard I(mutex);
FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) {
if(!(*it)->connected) continue;
p_rpc.push_back((*it)->requester.lock());
}
if(type) {
p_put.reserve(puts.size());
@@ -242,9 +251,6 @@ void SharedPV::close(bool destroy)
(*it)->mapper.reset();
p_put.push_back((*it)->requester.lock());
}
FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) {
p_rpc.push_back((*it)->requester.lock());
}
FOR_EACH(monitors_t::const_iterator, it, end, monitors) {
(*it)->close();
try {

View File

@@ -84,6 +84,7 @@ SharedRPC::SharedRPC(const std::tr1::shared_ptr<SharedChannel>& channel,
:channel(channel)
,requester(requester)
,pvRequest(pvRequest)
,connected(false)
{
REFTRACE_INCREMENT(num_instances);
}
@@ -108,17 +109,30 @@ void SharedRPC::lastRequest() {}
void SharedRPC::request(epics::pvData::PVStructure::shared_pointer const & pvArgument)
{
std::tr1::shared_ptr<SharedPV::Handler> handler;
pvd::Status sts;
{
Guard G(channel->owner->mutex);
handler = channel->owner->handler;
if(channel->dead) {
sts = pvd::Status::error("Dead Channel");
} else {
handler = channel->owner->handler;
}
}
std::tr1::shared_ptr<RPCOP> impl(new RPCOP(shared_from_this(), pvRequest, pvArgument),
Operation::Impl::Cleanup());
if(!sts.isOK()) {
requester_type::shared_pointer req(requester.lock());
if(req)
req->requestDone(sts, shared_from_this(), pvd::PVStructurePtr());
if(handler) {
Operation op(impl);
handler->onRPC(channel->owner, op);
} else {
std::tr1::shared_ptr<RPCOP> impl(new RPCOP(shared_from_this(), pvRequest, pvArgument),
Operation::Impl::Cleanup());
if(handler) {
Operation op(impl);
handler->onRPC(channel->owner, op);
}
}
}

View File

@@ -31,6 +31,8 @@ struct SharedChannel : public pva::Channel,
const requester_type::weak_pointer requester;
const pva::ChannelProvider::weak_pointer provider;
bool dead; // has destroy() been called?
SharedChannel(const std::tr1::shared_ptr<SharedPV>& owner,
const pva::ChannelProvider::shared_pointer provider,
const std::string& channelName,
@@ -107,6 +109,8 @@ struct SharedRPC : public pva::ChannelRPC,
static size_t num_instances;
bool connected; // have I called requester->channelRPCConnect(Ok) ?
SharedRPC(const std::tr1::shared_ptr<SharedChannel>& channel,
const requester_type::shared_pointer& requester,
const pvd::PVStructure::const_shared_pointer &pvRequest);