This commit is contained in:
mrkraimer
2018-10-03 05:54:04 -04:00
22 changed files with 686 additions and 220 deletions

View File

@@ -42,7 +42,7 @@ namespace {
bool debugFlag = false;
string request("field(value)");
string request("field()");
string defaultProvider("pva");
enum PrintMode { ValueOnlyMode, StructureMode, TerseMode };
@@ -80,7 +80,7 @@ void printValue(std::string const & channelName, PVStructure::shared_pointer con
PVField::shared_pointer value = pv->getSubField("value");
if (value.get() == 0)
{
std::cerr << "no 'value' field\n";
//std::cerr << "no 'value' field\n";
pvutil_ostream myos(std::cout);
myos << channelName << "\n" << *(pv.get()) << "\n\n";
}

View File

@@ -11,5 +11,7 @@ pvAccess_SRCS += monitor.cpp
pvAccess_SRCS += client.cpp
pvAccess_SRCS += clientSync.cpp
pvAccess_SRCS += clientGet.cpp
pvAccess_SRCS += clientPut.cpp
pvAccess_SRCS += clientRPC.cpp
pvAccess_SRCS += clientMonitor.cpp
pvAccess_SRCS += clientInfo.cpp

View File

@@ -219,8 +219,10 @@ void register_reftrack()
// done is an optimization, duplicate calls to registerRef* are no-ops
pvac::detail::registerRefTrack();
pvac::detail::registerRefTrackGet();
pvac::detail::registerRefTrackPut();
pvac::detail::registerRefTrackMonitor();
pvac::detail::registerRefTrackRPC();
pvac::detail::registerRefTrackInfo();
}
std::tr1::shared_ptr<epics::pvAccess::Channel>

View File

@@ -17,51 +17,40 @@
#include "clientpvt.h"
#include "pv/pvAccess.h"
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
namespace {
using pvac::detail::CallbackGuard;
using pvac::detail::CallbackUse;
struct GetPutter : public pva::ChannelPutRequester,
public pvac::Operation::Impl,
public pvac::detail::wrapped_shared_from_this<GetPutter>
struct Getter : public pvac::detail::CallbackStorage,
public pva::ChannelGetRequester,
public pvac::Operation::Impl,
public pvac::detail::wrapped_shared_from_this<Getter>
{
mutable epicsMutex mutex;
bool started;
operation_type::shared_pointer op;
pvac::ClientChannel::GetCallback *getcb;
pvac::ClientChannel::PutCallback *putcb;
pvac::ClientChannel::GetCallback *cb;
pvac::GetEvent event;
static size_t num_instances;
explicit GetPutter(pvac::ClientChannel::GetCallback* cb) :started(false), getcb(cb), putcb(0)
explicit Getter(pvac::ClientChannel::GetCallback* cb) :cb(cb)
{REFTRACE_INCREMENT(num_instances);}
explicit GetPutter(pvac::ClientChannel::PutCallback* cb) :started(false), getcb(0), putcb(cb)
{REFTRACE_INCREMENT(num_instances);}
virtual ~GetPutter() {REFTRACE_DECREMENT(num_instances);}
virtual ~Getter() {
CallbackGuard G(*this);
cb = 0;
G.wait(); // paranoia
REFTRACE_DECREMENT(num_instances);
}
void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail)
void callEvent(CallbackGuard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail)
{
if(!putcb && !getcb) return;
if(!cb) return;
event.event = evt;
if(putcb) {
pvac::ClientChannel::PutCallback *cb=putcb;
putcb = 0;
UnGuard U(G);
cb->putDone(event);
}
if(getcb) {
pvac::ClientChannel::GetCallback *cb=getcb;
getcb = 0;
UnGuard U(G);
cb->getDone(event);
}
pvac::ClientChannel::GetCallback *C=cb;
cb = 0;
CallbackUse U(G);
C->getDone(event);
}
virtual std::string name() const OVERRIDE FINAL
@@ -74,10 +63,11 @@ struct GetPutter : public pva::ChannelPutRequester,
virtual void cancel() OVERRIDE FINAL
{
// keepalive for safety in case callback wants to destroy us
std::tr1::shared_ptr<GetPutter> keepalive(internal_shared_from_this());
Guard G(mutex);
if(started && op) op->cancel();
std::tr1::shared_ptr<Getter> keepalive(internal_shared_from_this());
CallbackGuard G(*this);
if(op) op->cancel();
callEvent(G, pvac::GetEvent::Cancel);
G.wait();
}
virtual std::string getRequesterName() OVERRIDE FINAL
@@ -86,15 +76,14 @@ struct GetPutter : public pva::ChannelPutRequester,
return op ? op->getChannel()->getRequesterName() : "<dead>";
}
virtual void channelPutConnect(
virtual void channelGetConnect(
const epics::pvData::Status& status,
pva::ChannelPut::shared_pointer const & channelPut,
pva::ChannelGet::shared_pointer const & channelGet,
epics::pvData::Structure::const_shared_pointer const & structure) OVERRIDE FINAL
{
std::tr1::shared_ptr<GetPutter> keepalive(internal_shared_from_this());
Guard G(mutex);
if(started) return;
if(!putcb && !getcb) return;
std::tr1::shared_ptr<Getter> keepalive(internal_shared_from_this());
CallbackGuard G(*this);
if(!cb) return;
if(!status.isOK()) {
event.message = status.getMessage();
@@ -104,71 +93,29 @@ struct GetPutter : public pva::ChannelPutRequester,
if(!status.isSuccess()) {
callEvent(G);
} else if(getcb){
channelPut->get();
started = true;
} else {
channelGet->get();
} else if(putcb){
pvac::ClientChannel::PutCallback *cb(putcb);
pvd::BitSet::shared_pointer tosend(new pvd::BitSet);
pvac::ClientChannel::PutCallback::Args args(*tosend);
try {
UnGuard U(G);
cb->putBuild(structure, args);
if(!args.root)
throw std::logic_error("No put value provided");
else if(*args.root->getStructure()!=*structure)
throw std::logic_error("Provided put value with wrong type");
}catch(std::exception& e){
if(putcb) {
event.message = e.what();
callEvent(G);
} else {
LOG(pva::logLevelInfo, "Lost exception in %s: %s", CURRENT_FUNCTION, e.what());
}
}
// check putcb again after UnGuard
if(putcb) {
channelPut->put(std::tr1::const_pointer_cast<pvd::PVStructure>(args.root), tosend);
started = true;
}
}
}
virtual void channelDisconnect(bool destroy) OVERRIDE FINAL
{
Guard G(mutex);
CallbackGuard G(*this);
event.message = "Disconnect";
callEvent(G);
}
virtual void putDone(
const epics::pvData::Status& status,
pva::ChannelPut::shared_pointer const & channelPut) OVERRIDE FINAL
{
std::tr1::shared_ptr<GetPutter> keepalive(internal_shared_from_this());
Guard G(mutex);
if(!putcb) return;
if(!status.isOK()) {
event.message = status.getMessage();
} else {
event.message.clear();
}
callEvent(G, status.isSuccess()? pvac::GetEvent::Success : pvac::GetEvent::Fail);
}
virtual void getDone(
const epics::pvData::Status& status,
pva::ChannelPut::shared_pointer const & channelPut,
pva::ChannelGet::shared_pointer const & channelGet,
epics::pvData::PVStructure::shared_pointer const & pvStructure,
epics::pvData::BitSet::shared_pointer const & bitSet) OVERRIDE FINAL
{
std::tr1::shared_ptr<GetPutter> keepalive(internal_shared_from_this());
Guard G(mutex);
if(!getcb) return;
std::tr1::shared_ptr<Getter> keepalive(internal_shared_from_this());
CallbackGuard G(*this);
if(!cb) return;
if(!status.isOK()) {
event.message = status.getMessage();
@@ -183,13 +130,13 @@ struct GetPutter : public pva::ChannelPutRequester,
virtual void show(std::ostream &strm) const
{
strm << "Operation(Get/Put"
strm << "Operation(Get"
"\"" << name() <<"\""
")";
}
};
size_t GetPutter::num_instances;
size_t Getter::num_instances;
} //namespace
@@ -203,42 +150,22 @@ ClientChannel::get(ClientChannel::GetCallback* cb,
if(!pvRequest)
pvRequest = pvd::createRequest("field()");
std::tr1::shared_ptr<GetPutter> ret(GetPutter::build(cb));
std::tr1::shared_ptr<Getter> ret(Getter::build(cb));
{
Guard G(ret->mutex);
ret->op = getChannel()->createChannelPut(ret->internal_shared_from_this(),
ret->op = getChannel()->createChannelGet(ret->internal_shared_from_this(),
std::tr1::const_pointer_cast<pvd::PVStructure>(pvRequest));
}
return Operation(ret);
}
Operation
ClientChannel::put(PutCallback* cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest)
{
if(!impl) throw std::logic_error("Dead Channel");
if(!pvRequest)
pvRequest = pvd::createRequest("field()");
std::tr1::shared_ptr<GetPutter> ret(GetPutter::build(cb));
{
Guard G(ret->mutex);
ret->op = getChannel()->createChannelPut(ret->internal_shared_from_this(),
std::tr1::const_pointer_cast<pvd::PVStructure>(pvRequest));
}
return Operation(ret);
}
namespace detail {
void registerRefTrackGet()
{
epics::registerRefCounter("pvac::GetPutter", &GetPutter::num_instances);
epics::registerRefCounter("pvac::Getter", &Getter::num_instances);
}
}

123
src/client/clientInfo.cpp Normal file
View File

@@ -0,0 +1,123 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <epicsEvent.h>
#include <pv/current_function.h>
#include <pv/pvData.h>
#include <pv/bitSet.h>
#include <pv/reftrack.h>
#define epicsExportSharedSymbols
#include "pv/logger.h"
#include "clientpvt.h"
#include "pv/pvAccess.h"
namespace {
using pvac::detail::CallbackGuard;
using pvac::detail::CallbackUse;
/* Implements ClientChannel::info(): issues a pva getField() request and
 * delivers the resulting type description (or a failure/cancel event) to
 * the user's InfoCallback exactly once.
 * Uses CallbackStorage/CallbackGuard/CallbackUse so that cancel() and the
 * destructor can block until an in-progress user callback has returned.
 */
struct Infoer : public pvac::detail::CallbackStorage,
                public pva::GetFieldRequester,
                public pvac::Operation::Impl,
                public pvac::detail::wrapped_shared_from_this<Infoer>
{
    // User completion callback.  Cleared (under lock) before delivery or on
    // cancel()/destruction, which guarantees at most one infoDone() call.
    pvac::ClientChannel::InfoCallback *cb;
    const pva::Channel::shared_pointer channel;
    static size_t num_instances; // reftrack counter

    explicit Infoer(pvac::ClientChannel::InfoCallback *cb, const pva::Channel::shared_pointer& channel)
        :cb(cb), channel(channel)
    {REFTRACE_INCREMENT(num_instances);}

    virtual ~Infoer() {
        CallbackGuard G(*this);
        cb = 0;
        G.wait(); // paranoia
        REFTRACE_DECREMENT(num_instances);
    }

    virtual std::string getRequesterName() OVERRIDE FINAL
    {
        Guard G(mutex);
        return channel->getChannelName();
    }

    // pva::GetFieldRequester completion: translate Status into an InfoEvent
    // and hand it to the user callback while unlocked.
    virtual void getDone(
        const pvd::Status& status,
        pvd::FieldConstPtr const & field) OVERRIDE FINAL
    {
        CallbackGuard G(*this);
        pvac::ClientChannel::InfoCallback *C(cb);
        cb = 0; // claim delivery before unlocking
        if(C) {
            pvac::InfoEvent evt;
            evt.event = status.isSuccess() ? pvac::InfoEvent::Success : pvac::InfoEvent::Fail;
            evt.message = status.getMessage();
            evt.type = field;
            CallbackUse U(G); // unlock while running user code
            C->infoDone(evt);
        }
        // (removed a stray, unused local InfoEvent that followed the if() block)
    }

    virtual std::string name() const OVERRIDE FINAL { return channel->getChannelName(); }

    virtual void cancel() OVERRIDE FINAL {
        CallbackGuard G(*this);
        // we can't actually cancel a getField, only suppress result delivery
        pvac::ClientChannel::InfoCallback *C(cb);
        cb = 0;
        if(C) {
            pvac::InfoEvent evt;
            evt.event = pvac::InfoEvent::Cancel;
            CallbackUse U(G);
            C->infoDone(evt);
        }
        G.wait(); // don't return until concurrent callbacks have finished
    }

    virtual void show(std::ostream& strm) const OVERRIDE FINAL {
        strm << "Operation(Info"
                "\"" << name() <<"\""
                ")";
    }
};
size_t Infoer::num_instances;
} // namespace
namespace pvac {
// Begin an asynchronous getField (type info) request.
// Completion, failure, or cancel is reported through cb->infoDone().
// Throws std::logic_error if the channel has already been destroyed.
Operation ClientChannel::info(InfoCallback *cb, const std::string& subfld)
{
if(!impl) throw std::logic_error("Dead Channel");
std::tr1::shared_ptr<Infoer> ret(Infoer::build(cb, getChannel()));
{
Guard G(ret->mutex);
getChannel()->getField(ret, subfld);
// getField is an oddity as it doesn't have an associated Operation class,
// and is thus largely out of our control. (eg. can't cancel)
}
return Operation(ret);
}
namespace detail {
// Register the Infoer instance counter with the reftrack facility
// (enables leak diagnostics via epics::registerRefCounter).
void registerRefTrackInfo()
{
epics::registerRefCounter("pvac::Infoer", &Infoer::num_instances);
}
}
} // namespace pvac

View File

@@ -5,6 +5,7 @@
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <epicsEvent.h>
#include <pv/current_function.h>
#include <pv/pvData.h>
@@ -16,17 +17,14 @@
#include "clientpvt.h"
#include "pv/pvAccess.h"
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
namespace pvac {
using pvac::detail::CallbackGuard;
using pvac::detail::CallbackUse;
struct Monitor::Impl : public pva::MonitorRequester,
struct Monitor::Impl : public pvac::detail::CallbackStorage,
public pva::MonitorRequester,
public pvac::detail::wrapped_shared_from_this<Monitor::Impl>
{
mutable epicsMutex mutex;
pva::Channel::shared_pointer chan;
operation_type::shared_pointer op;
bool started, done, seenEmpty;
@@ -44,9 +42,14 @@ struct Monitor::Impl : public pva::MonitorRequester,
,seenEmpty(false)
,cb(cb)
{REFTRACE_INCREMENT(num_instances);}
virtual ~Impl() {REFTRACE_DECREMENT(num_instances);}
virtual ~Impl() {
CallbackGuard G(*this);
cb = 0;
G.wait(); // paranoia
REFTRACE_DECREMENT(num_instances);
}
void callEvent(Guard& G, MonitorEvent::event_t evt = MonitorEvent::Fail)
void callEvent(CallbackGuard& G, MonitorEvent::event_t evt = MonitorEvent::Fail)
{
ClientChannel::MonitorCallback *cb=this->cb;
if(!cb) return;
@@ -57,7 +60,7 @@ struct Monitor::Impl : public pva::MonitorRequester,
this->cb = 0; // last event
try {
UnGuard U(G);
CallbackUse U(G);
cb->monitorEvent(event);
return;
}catch(std::exception& e){
@@ -71,7 +74,7 @@ struct Monitor::Impl : public pva::MonitorRequester,
}
// continues error handling
try {
UnGuard U(G);
CallbackUse U(G);
cb->monitorEvent(event);
return;
}catch(std::exception& e){
@@ -87,7 +90,7 @@ struct Monitor::Impl : public pva::MonitorRequester,
// keepalive for safety in case callback wants to destroy us
std::tr1::shared_ptr<Monitor::Impl> keepalive(internal_shared_from_this());
Guard G(mutex);
CallbackGuard G(*this);
last.reset();
@@ -98,6 +101,7 @@ struct Monitor::Impl : public pva::MonitorRequester,
temp.swap(op);
callEvent(G, MonitorEvent::Cancel);
G.wait();
}
if(temp)
temp->destroy();
@@ -115,7 +119,7 @@ struct Monitor::Impl : public pva::MonitorRequester,
pvd::StructureConstPtr const & structure) OVERRIDE FINAL
{
std::tr1::shared_ptr<Monitor::Impl> keepalive(internal_shared_from_this());
Guard G(mutex);
CallbackGuard G(*this);
if(!cb || started || done) return;
if(!status.isOK()) {
@@ -145,7 +149,7 @@ struct Monitor::Impl : public pva::MonitorRequester,
virtual void channelDisconnect(bool destroy) OVERRIDE FINAL
{
std::tr1::shared_ptr<Monitor::Impl> keepalive(internal_shared_from_this());
Guard G(mutex);
CallbackGuard G(*this);
if(!cb || done) return;
event.message = "Disconnect";
started = false;
@@ -155,7 +159,7 @@ struct Monitor::Impl : public pva::MonitorRequester,
virtual void monitorEvent(pva::MonitorPtr const & monitor) OVERRIDE FINAL
{
std::tr1::shared_ptr<Monitor::Impl> keepalive(internal_shared_from_this());
Guard G(mutex);
CallbackGuard G(*this);
if(!cb || done) return;
event.message.clear();
@@ -165,7 +169,7 @@ struct Monitor::Impl : public pva::MonitorRequester,
virtual void unlisten(pva::MonitorPtr const & monitor) OVERRIDE FINAL
{
std::tr1::shared_ptr<Monitor::Impl> keepalive(internal_shared_from_this());
Guard G(mutex);
CallbackGuard G(*this);
if(!cb || done) return;
done = true;

235
src/client/clientPut.cpp Normal file
View File

@@ -0,0 +1,235 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <epicsEvent.h>
#include <pv/current_function.h>
#include <pv/pvData.h>
#include <pv/bitSet.h>
#include <pv/reftrack.h>
#define epicsExportSharedSymbols
#include "pv/logger.h"
#include "clientpvt.h"
#include "pv/pvAccess.h"
namespace {
using pvac::detail::CallbackGuard;
using pvac::detail::CallbackUse;
// Implements ClientChannel::put().  Sequence:
//  1. channelPutConnect() fires once the server-side ChannelPut exists.
//  2. Optionally (getcurrent) a get() fetches the present value first (getDone()).
//  3. doPut() runs the user's putBuild() while unlocked, then sends the put().
//  4. putDone() reports the final result to the user via callEvent().
// Uses CallbackStorage/CallbackGuard/CallbackUse so that cancel() and the
// dtor can block until any in-progress user callback has returned.
struct Putter : public pvac::detail::CallbackStorage,
public pva::ChannelPutRequester,
public pvac::Operation::Impl,
public pvac::detail::wrapped_shared_from_this<Putter>
{
// when true, fetch the present value before asking the user to build the new one
const bool getcurrent;
bool started; // whether the put() has actually been sent. After which point we can't safely re-try.
operation_type::shared_pointer op;
// type description from channelPutConnect(); putBuild() must produce this exact type
pvd::StructureConstPtr puttype;
// user callback.  Cleared once a final event has been delivered (at most one putDone())
pvac::ClientChannel::PutCallback *cb;
pvac::GetEvent event; // completion info handed to cb->putDone()
static size_t num_instances; // reftrack counter
Putter(pvac::ClientChannel::PutCallback* cb, bool getcurrent) :getcurrent(getcurrent), started(false), cb(cb)
{REFTRACE_INCREMENT(num_instances);}
virtual ~Putter() {
CallbackGuard G(*this);
cb = 0;
G.wait(); // paranoia
REFTRACE_DECREMENT(num_instances);
}
// Deliver the final event to the user callback (at most once), unlocked.
void callEvent(CallbackGuard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail)
{
if(!cb) return;
event.event = evt;
pvac::ClientChannel::PutCallback *C=cb;
cb = 0; // clear before unlocking; guarantees single delivery
CallbackUse U(G); // unlock while running user code
C->putDone(event);
}
virtual std::string name() const OVERRIDE FINAL
{
Guard G(mutex);
return op ? op->getChannel()->getChannelName() : "<dead>";
}
// called automatically via wrapped_shared_from_this
virtual void cancel() OVERRIDE FINAL
{
// keepalive for safety in case callback wants to destroy us
std::tr1::shared_ptr<Putter> keepalive(internal_shared_from_this());
CallbackGuard G(*this);
if(started && op) op->cancel();
callEvent(G, pvac::GetEvent::Cancel);
G.wait(); // don't return until concurrent callbacks have finished
}
virtual std::string getRequesterName() OVERRIDE FINAL
{
Guard G(mutex);
return op ? op->getChannel()->getRequesterName() : "<dead>";
}
// Server-side ChannelPut is ready (or failed to set up).
virtual void channelPutConnect(
const epics::pvData::Status& status,
pva::ChannelPut::shared_pointer const & channelPut,
epics::pvData::Structure::const_shared_pointer const & structure) OVERRIDE FINAL
{
std::tr1::shared_ptr<Putter> keepalive(internal_shared_from_this());
CallbackGuard G(*this);
op = channelPut; // we may be called before createChannelPut() has returned.
puttype = structure;
if(started || !cb) return; // already sent, or already completed/cancelled
if(!status.isOK()) {
event.message = status.getMessage();
} else {
event.message.clear();
}
if(!status.isSuccess()) {
callEvent(G);
} else if(getcurrent) {
// fetch a previous value first
op->get();
} else {
// build Put value immediately
pvd::BitSet empty;
pvd::BitSet::shared_pointer tosend(new pvd::BitSet);
pvac::ClientChannel::PutCallback::Args args(*tosend, empty);
// args.previous = 0; // implied
doPut(G, args, channelPut, tosend);
}
}
virtual void channelDisconnect(bool destroy) OVERRIDE FINAL
{
CallbackGuard G(*this);
event.message = "Disconnect";
callEvent(G);
}
// Run the user's putBuild() (unlocked) and, if a valid value was produced, send the put.
void doPut(CallbackGuard& G,
pvac::ClientChannel::PutCallback::Args& args,
pva::ChannelPut::shared_pointer const & channelPut,
const pvd::BitSet::shared_pointer& tosend)
{
try {
pvac::ClientChannel::PutCallback *C(cb);
CallbackUse U(G); // unlock while running user code
C->putBuild(puttype, args);
if(!args.root)
throw std::logic_error("No put value provided");
else if(*args.root->getStructure()!=*puttype)
throw std::logic_error("Provided put value with wrong type");
}catch(std::exception& e){
if(cb) {
event.message = e.what();
callEvent(G);
} else {
// completion already delivered while unlocked; nowhere left to report it
LOG(pva::logLevelInfo, "Lost exception in %s: %s", CURRENT_FUNCTION, e.what());
}
}
// check cb again: user code ran while unlocked and may have completed/cancelled us
if(cb) {
started = true;
channelPut->put(std::tr1::const_pointer_cast<pvd::PVStructure>(args.root), tosend);
}
}
// Completion of the optional initial get() (getcurrent==true path).
virtual void getDone(
const epics::pvData::Status& status,
pva::ChannelPut::shared_pointer const & channelPut,
epics::pvData::PVStructure::shared_pointer const & pvStructure,
epics::pvData::BitSet::shared_pointer const & bitSet) OVERRIDE FINAL
{
std::tr1::shared_ptr<Putter> keepalive(internal_shared_from_this());
CallbackGuard G(*this);
if(!cb) return;
if(!status.isOK()) {
event.message = status.getMessage();
callEvent(G, pvac::GetEvent::Fail);
} else {
pvd::BitSet::shared_pointer tosend(new pvd::BitSet);
pvac::ClientChannel::PutCallback::Args args(*tosend, *bitSet);
args.previous = pvStructure; // expose the fetched value to putBuild()
doPut(G, args, channelPut, tosend);
}
}
// Final completion of the put itself.
virtual void putDone(
const epics::pvData::Status& status,
pva::ChannelPut::shared_pointer const & channelPut) OVERRIDE FINAL
{
std::tr1::shared_ptr<Putter> keepalive(internal_shared_from_this());
CallbackGuard G(*this);
if(!cb) return;
if(!status.isOK()) {
event.message = status.getMessage();
} else {
event.message.clear();
}
callEvent(G, status.isSuccess()? pvac::GetEvent::Success : pvac::GetEvent::Fail);
}
virtual void show(std::ostream &strm) const
{
strm << "Operation(Put"
"\"" << name() <<"\""
")";
}
};
size_t Putter::num_instances;
} //namespace
namespace pvac {
// Begin an asynchronous put operation.  The Putter requester owns all
// further state; the returned Operation handle keeps it alive and allows
// the caller to cancel().
// Throws std::logic_error if the channel has already been destroyed.
Operation
ClientChannel::put(PutCallback* cb,
                   epics::pvData::PVStructure::const_shared_pointer pvRequest,
                   bool getcurrent)
{
    if(!impl)
        throw std::logic_error("Dead Channel");

    if(!pvRequest)
        pvRequest = pvd::createRequest("field()");

    std::tr1::shared_ptr<Putter> putter(Putter::build(cb, getcurrent));
    {
        // hold the requester lock: channelPutConnect() may run before
        // createChannelPut() returns, and must see consistent state
        Guard G(putter->mutex);
        putter->op = getChannel()->createChannelPut(putter->internal_shared_from_this(),
                                                    std::tr1::const_pointer_cast<pvd::PVStructure>(pvRequest));
    }
    return Operation(putter);
}
namespace detail {
// Register the Putter instance counter with the reftrack facility
// (enables leak diagnostics via epics::registerRefCounter).
void registerRefTrackPut()
{
epics::registerRefCounter("pvac::Putter", &Putter::num_instances);
}
}
}//namespace pvac

View File

@@ -5,6 +5,7 @@
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <epicsEvent.h>
#include <pv/pvData.h>
#include <pv/bitSet.h>
@@ -15,19 +16,15 @@
#include "clientpvt.h"
#include "pv/pvAccess.h"
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
namespace {
using pvac::detail::CallbackGuard;
using pvac::detail::CallbackUse;
struct RPCer : public pva::ChannelRPCRequester,
struct RPCer : public pvac::detail::CallbackStorage,
public pva::ChannelRPCRequester,
public pvac::Operation::Impl,
public pvac::detail::wrapped_shared_from_this<RPCer>
{
mutable epicsMutex mutex;
bool started;
operation_type::shared_pointer op;
@@ -41,9 +38,14 @@ struct RPCer : public pva::ChannelRPCRequester,
RPCer(pvac::ClientChannel::GetCallback* cb,
const pvd::PVStructure::const_shared_pointer& args) :started(false), cb(cb), args(args)
{REFTRACE_INCREMENT(num_instances);}
virtual ~RPCer() {REFTRACE_DECREMENT(num_instances);}
virtual ~RPCer() {
CallbackGuard G(*this);
cb = 0;
G.wait(); // paranoia
REFTRACE_DECREMENT(num_instances);
}
void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail)
void callEvent(CallbackGuard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail)
{
pvac::ClientChannel::GetCallback *cb=this->cb;
if(!cb) return;
@@ -53,24 +55,11 @@ struct RPCer : public pva::ChannelRPCRequester,
this->cb = 0;
try {
UnGuard U(G);
CallbackUse U(G);
cb->getDone(event);
return;
}catch(std::exception& e){
if(!this->cb || evt==pvac::GetEvent::Fail) {
LOG(pva::logLevelError, "Unhandled exception in ClientChannel::GetCallback::getDone(): %s", e.what());
} else {
event.event = pvac::GetEvent::Fail;
event.message = e.what();
}
}
// continues error handling
try {
UnGuard U(G);
cb->getDone(event);
return;
}catch(std::exception& e){
LOG(pva::logLevelError, "Unhandled exception following exception in ClientChannel::GetCallback::monitorEvent(): %s", e.what());
LOG(pva::logLevelError, "Unhandled exception in ClientChannel::RPCCallback::requestDone(): %s", e.what());
}
}
@@ -84,7 +73,7 @@ struct RPCer : public pva::ChannelRPCRequester,
virtual void cancel()
{
std::tr1::shared_ptr<RPCer> keepalive(internal_shared_from_this());
Guard G(mutex);
CallbackGuard G(*this);
if(started && op) op->cancel();
callEvent(G, pvac::GetEvent::Cancel);
}
@@ -100,7 +89,7 @@ struct RPCer : public pva::ChannelRPCRequester,
pva::ChannelRPC::shared_pointer const & operation)
{
std::tr1::shared_ptr<RPCer> keepalive(internal_shared_from_this());
Guard G(mutex);
CallbackGuard G(*this);
if(!cb || started) return;
if(!status.isOK()) {
@@ -120,7 +109,7 @@ struct RPCer : public pva::ChannelRPCRequester,
virtual void channelDisconnect(bool destroy) OVERRIDE FINAL
{
std::tr1::shared_ptr<RPCer> keepalive(internal_shared_from_this());
Guard G(mutex);
CallbackGuard G(*this);
event.message = "Disconnect";
callEvent(G);
@@ -132,7 +121,7 @@ struct RPCer : public pva::ChannelRPCRequester,
epics::pvData::PVStructure::shared_pointer const & pvResponse)
{
std::tr1::shared_ptr<RPCer> keepalive(internal_shared_from_this());
Guard G(mutex);
CallbackGuard G(*this);
if(!cb) return;
if(!status.isOK()) {

View File

@@ -376,5 +376,51 @@ ClientChannel::monitor(const epics::pvData::PVStructure::const_shared_pointer &p
return MonitorSync(mon, simpl);
}
namespace {
// InfoCallback adapter used by the synchronous ClientChannel::info():
// stores the first completion event and signals the waiting thread.
struct InfoWait : public pvac::ClientChannel::InfoCallback,
public WaitCommon
{
pvac::InfoEvent result; // valid once WaitCommon::done is set
InfoWait() {}
virtual ~InfoWait() {}
virtual void infoDone(const pvac::InfoEvent& evt) OVERRIDE FINAL
{
{
Guard G(mutex);
if(done) {
// keep only the first completion; later ones indicate a requester bug
LOG(pva::logLevelWarn, "oops, double event to InfoCallback");
} else {
result = evt;
done = true;
}
}
event.signal(); // wake the waiter outside the lock
}
};
} // namespace
// Synchronous wrapper around the callback-based info().
// Blocks until the getField completes, fails, or the timeout expires.
// Returns the type description on success; throws std::runtime_error on
// failure (carrying the server's message).
epics::pvData::FieldConstPtr
ClientChannel::info(double timeout, const std::string& subfld)
{
    InfoWait waiter;
    {
        // Operation handle goes out of scope (cancelling) if wait() throws
        Operation op(info(&waiter, subfld));
        waiter.wait(timeout);
    }

    const InfoEvent& outcome = waiter.result;
    if(outcome.event == InfoEvent::Success)
        return outcome.type;
    if(outcome.event == InfoEvent::Fail)
        throw std::runtime_error(outcome.message);
    // Cancel (or anything else): cancel implies timeout, which wait() should
    // already have thrown, so reaching here is a logic error.
    THROW_EXCEPTION2(std::logic_error, "Cancelled!?!?");
}
}//namespace pvac

View File

@@ -1,10 +1,18 @@
#ifndef CLIENTPVT_H
#define CLIENTPVT_H
#include <epicsEvent.h>
#include <epicsThread.h>
#include <pv/sharedPtr.h>
#include <pva/client.h>
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
namespace pvac{namespace detail{
/* Like std::tr1::enable_shared_from_this
* with the notion of internal vs. external references.
@@ -65,10 +73,95 @@ public:
}
};
/** Safe use of raw callback pointer while unlocked.
* clear the callback pointer and then call CallbackGuard::wait() to ensure that
* any concurrent callbacks have completed.
*
* Prototype usage
@code
* struct mycb : public CallbackStorage {
* void (*ptr)();
* };
* // make a callback
* void docb(mycb& cb) {
* CallbackGuard G(cb); // lock
* // decide whether to make CB
* if(P){
* void (*P)() = ptr; // copy for use while unlocked
* CallbackUse U(G); // unlock
* (*P)();
* // automatic re-lock
* }
* // automatic final unlock
* }
* void cancelop(mycb& cb) {
* CallbackGuard G(cb);
* ptr = 0; // prevent further callbacks from starting
G.wait(); // wait for in-progress callbacks to complete
* }
@endcode
*/
// Shared state used by CallbackGuard/CallbackUse to track which thread is
// currently executing a user callback and how many threads are waiting for
// callbacks to complete.
struct CallbackStorage {
mutable epicsMutex mutex; // guards all members
epicsEvent wakeup; // signaled so waiters re-check incb (see CallbackGuard dtor)
size_t nwaitcb; // number of threads blocked in CallbackGuard::wait()
epicsThreadId incb; // thread currently running a callback, or 0 if none
CallbackStorage() :nwaitcb(0u), incb(0) {}
};
// analogous to epicsGuard
// Holds CallbackStorage::mutex for its lifetime.  On destruction, wakes any
// threads blocked in wait() so they can re-check the callback state.
struct CallbackGuard {
CallbackStorage& store;
epicsThreadId self; // lazily cached thread id (0 until ensureself())
explicit CallbackGuard(CallbackStorage& store) :store(store), self(0) {
store.mutex.lock();
}
~CallbackGuard() {
bool notify = store.nwaitcb!=0;
store.mutex.unlock();
if(notify)
store.wakeup.signal(); // wake waiters after unlocking
}
void ensureself() {
if(!self)
self = epicsThreadGetIdSelf();
}
// unlock and block until no in-progress callbacks
// (returns immediately if the in-progress callback is on this same thread,
// avoiding self-deadlock)
void wait() {
if(!store.incb) return;
ensureself();
store.nwaitcb++;
while(store.incb && store.incb!=self) {
store.mutex.unlock();
store.wakeup.wait();
store.mutex.lock();
}
store.nwaitcb--;
}
};
// analogous to epicsGuardRelease
// Marks the current thread as "in callback" and unlocks while user code
// runs; re-locks and clears the marker on destruction.
struct CallbackUse {
CallbackGuard& G;
explicit CallbackUse(CallbackGuard& G) :G(G) {
G.wait(); // serialize callbacks: let any other thread's callback finish first
G.ensureself();
G.store.incb=G.self;
G.store.mutex.unlock();
}
~CallbackUse() {
G.store.mutex.lock();
G.store.incb=0;
}
};
void registerRefTrack();
void registerRefTrackGet();
void registerRefTrackPut();
void registerRefTrackMonitor();
void registerRefTrackRPC();
void registerRefTrackInfo();
}} // namespace pvac::detail

View File

@@ -85,7 +85,7 @@ protected:
};
//! Information on put completion
struct PutEvent
struct epicsShareClass PutEvent
{
enum event_t {
Fail, //!< request ends in failure. Check message
@@ -105,6 +105,12 @@ struct epicsShareClass GetEvent : public PutEvent
epics::pvData::BitSet::const_shared_pointer valid;
};
struct epicsShareClass InfoEvent : public PutEvent
{
//! Type description resulting from getField operation. NULL unless event==Success
epics::pvData::FieldConstPtr type;
};
struct MonitorSync;
//! Handle for monitor subscription
@@ -344,9 +350,20 @@ public:
struct PutCallback {
virtual ~PutCallback() {}
struct Args {
Args(epics::pvData::BitSet& tosend) :tosend(tosend) {}
Args(epics::pvData::BitSet& tosend, epics::pvData::BitSet& previousmask) :tosend(tosend), previousmask(previousmask) {}
//! Callee must fill this in with an instance of the Structure passed as the 'build' argument.
epics::pvData::PVStructure::const_shared_pointer root;
//! Callee must set bits corresponding to the fields of 'root' which will actually be sent.
epics::pvData::BitSet& tosend;
//! A previous value of the PV being "put" when put(..., getprevious=true). eg. use to find enumeration value.
//! Otherwise NULL.
//! @note The value of the PV may change between the point where "previous" is fetched,
//! and when this Put operation completes.
//! @since 6.1.0 Added after 6.0.0
epics::pvData::PVStructure::const_shared_pointer previous;
//! Bit mask indicating those fields of 'previous' which have been set by the server. (others have local defaults)
//! Unused if previous==NULL.
const epics::pvData::BitSet& previousmask;
};
/** Server provides expected structure.
*
@@ -362,8 +379,13 @@ public:
//! Initiate request to change PV
//! @param cb Completion notification callback. Must outlive Operation (call Operation::cancel() to force release)
//! @param pvRequest if NULL defaults to "field()".
//! @param getprevious If true, fetch a previous value of the PV and make
//! this available as PutCallback::Args::previous and previousmask.
//! If false, then previous=NULL
Operation put(PutCallback* cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer());
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer(),
bool getcurrent = false);
//! Synchronous put operation
inline
@@ -400,6 +422,20 @@ public:
MonitorSync monitor(const epics::pvData::PVStructure::const_shared_pointer& pvRequest = epics::pvData::PVStructure::const_shared_pointer(),
epicsEvent *event =0);
struct InfoCallback {
virtual ~InfoCallback() {}
//! getField operation is complete
virtual void infoDone(const InfoEvent& evt) =0;
};
//! Request PV type info.
//! @note This type may not be the same as the types used in the get/put/monitor operations.
Operation info(InfoCallback *cb, const std::string& subfld = std::string());
//! Synchronous getField operation
epics::pvData::FieldConstPtr info(double timeout = 3.0,
const std::string& subfld = std::string());
//! Connection state change CB
struct ConnectCallback {
virtual ~ConnectCallback() {}

View File

@@ -39,9 +39,6 @@ BlockingUDPTransport::shared_pointer BlockingUDPConnector::connect(std::tr1::sha
ResponseHandler::shared_pointer const & responseHandler, osiSockAddr& bindAddress,
int8 transportRevision, int16 /*priority*/) {
LOG(logLevelDebug, "Creating datagram socket to: %s.",
inetAddressToString(bindAddress).c_str());
SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if(socket==INVALID_SOCKET) {
char errStr[64];

View File

@@ -81,6 +81,8 @@ BlockingUDPTransport::BlockingUDPTransport(bool serverFlag,
char strBuffer[64];
sockAddrToDottedIP(&_remoteAddress.sa, strBuffer, sizeof(strBuffer));
_remoteName = strBuffer;
LOG(logLevelDebug, "Creating datagram socket from: %s.",
_remoteName.c_str());
}
REFTRACE_INCREMENT(num_instances);
@@ -398,8 +400,8 @@ bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSock
{
if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug, "Sending %zu bytes to %s.",
length, inetAddressToString(address).c_str());
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
length, _remoteName.c_str(), inetAddressToString(address).c_str());
}
int retval = sendto(_channel, buffer,
@@ -422,8 +424,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr& address)
if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug, "Sending %zu bytes to %s.",
buffer->getRemaining(), inetAddressToString(address).c_str());
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(address).c_str());
}
int retval = sendto(_channel, buffer->getArray(),
@@ -459,8 +461,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, InetAddressType target) {
if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug, "Sending %zu bytes to %s.",
buffer->getRemaining(), inetAddressToString(_sendAddresses[i]).c_str());
LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
}
int retval = sendto(_channel, buffer->getArray(),

View File

@@ -1398,7 +1398,7 @@ public:
}
void send(ByteBuffer* buffer, TransportSendControl* control) {
control->startMessage((int8)5, 0);
control->startMessage(CMD_AUTHNZ, 0);
SerializationHelper::serializeFull(buffer, control, _data);
// send immediately
control->flush(true);

View File

@@ -339,7 +339,7 @@ public:
}
virtual void processControlMessage() OVERRIDE FINAL {
if (_command == 2)
if (_command == CMD_SET_ENDIANESS)
{
// check 7-th bit
setByteOrder(_flags < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE);

View File

@@ -62,7 +62,7 @@ void BeaconEmitter::send(ByteBuffer* buffer, TransportSendControl* control)
}
// send beacon
control->startMessage((int8)0, 12+2+2+16+2);
control->startMessage((int8)CMD_BEACON, 12+2+2+16+2);
buffer->put(_guid.value, 0, sizeof(_guid.value));

View File

@@ -47,7 +47,7 @@ struct Operation;
/** A Shared State Process Variable (PV)
*
* "Shared" in the sense that all clients/subscribers interact with the
* same PVStructure.
* same PVStructure (excluding the RPC operation).
*
* @warning For the purposes of locking, this class is an Operation (see @ref provider_roles_requester_locking).
* eg. no locks may be held when calling post(), open(), close(), or connect().
@@ -62,7 +62,7 @@ struct Operation;
* Calling close() will close all currently opened client channels.
*
* Client channels, and operations on them, may be initiated at any time (via connect()).
* However, operations will not be fully created until open() is called.
* However, operations other than RPC will not proceed until open() is called.
*
* @note A SharedPV does not have a name. Name(s) are associated with a SharedPV
* By a Provider (StaticProvider, DynamicProvider, or any epics::pvAccess::ChannelProvider).
@@ -143,6 +143,10 @@ public:
//! If destory=true, the internal client list is cleared.
//! @post In the closed state
//! @note Provider locking rules apply (@see provider_roles_requester_locking).
//!
//! close() is not final, even with destroy=true new clients may begin connecting, and open() may be called again.
//! A final close() should be performed after the removal from StaticProvider/DynamicProvider
//! which will prevent new clients.
virtual void close(bool destroy=false);
//! Create a new container which may be used to prepare to call post().

View File

@@ -443,7 +443,7 @@ void ServerChannelFindRequesterImpl::channelFindResult(const Status& /*status*/,
void ServerChannelFindRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
{
control->startMessage((int8)4, 12+4+16+2);
control->startMessage(CMD_SEARCH_RESPONSE, 12+4+16+2);
Lock guard(_mutex);
buffer->put(_guid.value, 0, sizeof(_guid.value));
@@ -1802,7 +1802,7 @@ void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendCon
return;
}
control->startMessage((int32)12, sizeof(int32)/sizeof(int8) + 1);
control->startMessage(CMD_PUT_GET, sizeof(int32)/sizeof(int8) + 1);
buffer->putInt(_ioid);
buffer->putByte((int8)request);
{

View File

@@ -87,12 +87,17 @@ struct StaticProvider::Impl : public pva::ChannelProvider
pva::Channel::shared_pointer ret;
pvd::Status sts;
builders_t::mapped_type builder;
{
Guard G(mutex);
builders_t::const_iterator it(builders.find(name));
if(it!=builders.end())
ret = it->second->connect(Impl::shared_pointer(internal_self), name, requester);
if(it!=builders.end()) {
UnGuard U(G);
builder = it->second;
}
}
if(builder)
ret = builder->connect(Impl::shared_pointer(internal_self), name, requester);
if(!ret) {
sts = pvd::Status::error("No such channel");

View File

@@ -51,7 +51,7 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr<SharedPV> &owner,
if(owner->channels.empty())
handler = owner->handler;
owner->channels.push_back(this);
owner->notifiedConn = !!handler;
owner->notifiedConn = true;
}
if(handler) {
handler->onFirstConnect(owner);
@@ -157,14 +157,11 @@ pva::ChannelRPC::shared_pointer SharedChannel::createChannelRPC(
pvd::PVStructure::shared_pointer const & pvRequest)
{
std::tr1::shared_ptr<SharedRPC> ret(new SharedRPC(shared_from_this(), requester, pvRequest));
bool opened;
{
Guard G(owner->mutex);
owner->rpcs.push_back(ret.get());
opened = !!owner->type;
}
if(opened)
requester->channelRPCConnect(pvd::Status(), ret);
requester->channelRPCConnect(pvd::Status(), ret);
return ret;
}

View File

@@ -129,6 +129,8 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet&
typedef std::vector<std::tr1::shared_ptr<pva::GetFieldRequester> > xgetfields_t;
const pvd::StructureConstPtr newtype(value.getStructure());
pvd::PVStructurePtr newvalue(pvd::getPVDataCreate()->createPVStructure(newtype));
newvalue->copyUnchecked(value, valid);
xputs_t p_put;
xrpcs_t p_rpc;
@@ -145,9 +147,8 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet&
p_monitor.reserve(monitors.size());
p_getfield.reserve(getfields.size());
type = value.getStructure();
current = pvd::getPVDataCreate()->createPVStructure(newtype);
current->copyUnchecked(value);
type = newtype;
current = newvalue;
this->valid = valid;
FOR_EACH(puts_t::const_iterator, it, end, puts) {
@@ -230,35 +231,36 @@ void SharedPV::close(bool destroy)
{
Guard I(mutex);
if(!type)
return;
if(type) {
p_put.reserve(puts.size());
p_rpc.reserve(rpcs.size());
p_monitor.reserve(monitors.size());
p_channel.reserve(channels.size());
p_put.reserve(puts.size());
p_rpc.reserve(rpcs.size());
p_monitor.reserve(monitors.size());
p_channel.reserve(channels.size());
FOR_EACH(puts_t::const_iterator, it, end, puts) {
(*it)->mapper.reset();
p_put.push_back((*it)->requester.lock());
}
FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) {
p_rpc.push_back((*it)->requester.lock());
}
FOR_EACH(monitors_t::const_iterator, it, end, monitors) {
(*it)->close();
try {
p_monitor.push_back((*it)->shared_from_this());
}catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ }
}
FOR_EACH(channels_t::const_iterator, it, end, channels) {
try {
p_channel.push_back((*it)->shared_from_this());
}catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ }
FOR_EACH(puts_t::const_iterator, it, end, puts) {
(*it)->mapper.reset();
p_put.push_back((*it)->requester.lock());
}
FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) {
p_rpc.push_back((*it)->requester.lock());
}
FOR_EACH(monitors_t::const_iterator, it, end, monitors) {
(*it)->close();
try {
p_monitor.push_back((*it)->shared_from_this());
}catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ }
}
FOR_EACH(channels_t::const_iterator, it, end, channels) {
try {
p_channel.push_back((*it)->shared_from_this());
}catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ }
}
type.reset();
current.reset();
}
type.reset();
current.reset();
if(destroy) {
// forget about all clients, to prevent the possibility of our
// sending a second destroy notification.

View File

@@ -57,11 +57,13 @@ bool decodeAsIPv6Address(ByteBuffer* buffer, osiSockAddr* address) {
// allow all zeros address
//if (ffff != (int16)0xFFFF) return false;
uint32_t ipv4Addr =
((uint32_t)(buffer->getByte()&0xFF))<<24 |
((uint32_t)(buffer->getByte()&0xFF))<<16 |
((uint32_t)(buffer->getByte()&0xFF))<<8 |
((uint32_t)(buffer->getByte()&0xFF));
uint32 ipv4Addr = uint8(buffer->getByte());
ipv4Addr <<= 8;
ipv4Addr |= uint8(buffer->getByte());
ipv4Addr <<= 8;
ipv4Addr |= uint8(buffer->getByte());
ipv4Addr <<= 8;
ipv4Addr |= uint8(buffer->getByte());
if (ffff != (int16)0xFFFF && ipv4Addr != (uint32_t)0)
return false;