update RPCClient

Addition ctor to use specific Provider (w/ custom config).
Start connect immediately.
Remove need to issueConnect()/waitConnect().
This commit is contained in:
Michael Davidsaver
2017-06-08 20:35:43 +02:00
parent fabb85c5e3
commit 3e37781d85
3 changed files with 266 additions and 276 deletions

View File

@@ -7,8 +7,10 @@
#include <iostream>
#include <string>
#include <epicsEvent.h>
#include <pv/pvData.h>
#include <pv/event.h>
#include <pv/current_function.h>
#define epicsExportSharedSymbols
#include <pv/pvAccess.h>
@@ -18,10 +20,39 @@
#include "pv/rpcClient.h"
#if 0
# define TRACE(msg) std::cerr<<"TRACE: "<<CURRENT_FUNCTION<<" : "<< msg <<"\n"
#else
# define TRACE(msg)
#endif
using namespace epics::pvData;
namespace TR1 = std::tr1;
using std::string;
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
namespace {
struct DummyChannelRequester : public pva::ChannelRequester
{
POINTER_DEFINITIONS(DummyChannelRequester);
pvd::Mutex mutex;
pvd::Status status;
DummyChannelRequester()
:status(pvd::Status::error("Never created"))
{}
virtual ~DummyChannelRequester() {}
virtual std::string getRequesterName() { return "DummyChannelRequester"; }
virtual void channelCreated(const pvd::Status& status, pva::Channel::shared_pointer const & channel) {
TRACE("status="<<status);
pvd::Lock L(mutex);
this->status = status;
}
virtual void channelStateChange(pva::Channel::shared_pointer const & channel, pva::Channel::ConnectionState connectionState) {}
};
}// namespace
namespace epics
{
@@ -30,181 +61,86 @@ namespace pvAccess
{
class ChannelAndRPCRequesterImpl :
public TR1::enable_shared_from_this<ChannelAndRPCRequesterImpl>,
public virtual epics::pvAccess::ChannelRequester,
public virtual epics::pvAccess::ChannelRPCRequester
struct RPCClient::RPCRequester : public pva::ChannelRPCRequester
{
private:
Mutex m_mutex;
Event m_event;
Event m_connectionEvent;
POINTER_DEFINITIONS(RPCRequester);
Status m_status;
PVStructure::shared_pointer m_response;
ChannelRPC::shared_pointer m_channelRPC;
PVStructure::shared_pointer m_pvRequest;
pvd::Mutex mutex;
ChannelRPC::weak_pointer op;
pvd::Status conn_status, resp_status;
epics::pvData::PVStructure::shared_pointer next_args, last_data;
epicsEvent event;
bool inprogress, last;
public:
RPCRequester()
:conn_status(pvd::Status::error("Never connected"))
,resp_status(pvd::Status::error("Never connected"))
,inprogress(false)
,last(false)
{}
virtual ~RPCRequester() {}
ChannelAndRPCRequesterImpl(PVStructure::shared_pointer const & pvRequest)
: m_pvRequest(pvRequest)
{
}
virtual string getRequesterName()
{
return "ChannelAndRPCRequesterImpl";
}
virtual void message(std::string const & message, MessageType messageType)
{
std::cerr << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl;
}
void channelCreated(
const epics::pvData::Status& status,
Channel::shared_pointer const & channel)
{
if (status.isSuccess())
{
// show warning
if (!status.isOK())
{
std::cerr << "[" << channel->getChannelName() << "] channel create: " << status << std::endl;
}
}
else
{
std::cerr << "[" << channel->getChannelName() << "] failed to create a channel: " << status << std::endl;
{
Lock lock(m_mutex);
m_status = status;
}
m_connectionEvent.signal();
}
}
void channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
if (connectionState == Channel::CONNECTED)
{
bool rpcAlreadyConnectedOnce = false;
{
Lock lock(m_mutex);
rpcAlreadyConnectedOnce = (m_channelRPC.get() != 0);
}
if (!rpcAlreadyConnectedOnce)
{
channel->createChannelRPC(shared_from_this(), m_pvRequest);
}
}
/*
else if (connectionState != Channel::DESTROYED)
{
std::cerr << "[" << channel->getChannelName() << "] channel state change: " << Channel::ConnectionStateNames[connectionState] << std::endl;
}
*/
}
virtual std::string getRequesterName() { return "RPCClient::RPCRequester"; }
virtual void channelRPCConnect(
const epics::pvData::Status & status,
ChannelRPC::shared_pointer const & channelRPC)
const pvd::Status& status,
ChannelRPC::shared_pointer const & operation)
{
if (status.isSuccess())
bool lastreq, inprog;
pvd::PVStructure::shared_pointer args;
{
if (!status.isOK())
std::cerr << "[" << channelRPC->getChannel()->getChannelName() << "] channel RPC create: " << status << std::endl;
pvd::Lock L(mutex);
TRACE("status="<<status);
op = operation;
conn_status = status;
args.swap(next_args);
lastreq = last;
inprog = inprogress;
if(inprog && args)
TRACE("request deferred: "<<args);
}
else
{
std::cerr << "[" << channelRPC->getChannel()->getChannelName() << "] failed to create channel RPC: " << status << std::endl;
if(inprog && args) {
TRACE("request deferred: "<<args);
if(lastreq)
operation->lastRequest();
operation->request(args);
}
{
Lock lock(m_mutex);
m_status = status;
m_channelRPC = channelRPC;
}
m_connectionEvent.signal();
event.signal();
}
virtual void requestDone(
const epics::pvData::Status & status,
ChannelRPC::shared_pointer const & channelRPC,
epics::pvData::PVStructure::shared_pointer const & pvResponse)
const pvd::Status& status,
ChannelRPC::shared_pointer const & operation,
pvd::PVStructure::shared_pointer const & pvResponse)
{
if (status.isSuccess())
TRACE("status="<<status<<" response:\n"<<pvResponse<<"\n");
{
if (!status.isOK())
std::cerr << "[" << channelRPC->getChannel()->getChannelName() << "] channel RPC: " << status << std::endl;
pvd::Lock L(mutex);
if(!inprogress) {
std::cerr<<"pva provider give RPC requestDone() when no request in progress\n";
} else {
resp_status = status;
last_data = pvResponse;
if(resp_status.isSuccess() && !last_data) {
resp_status = pvd::Status::error("No reply data");
}
inprogress = false;
}
}
else
event.signal();
}
virtual void channelDisconnect(bool destroy)
{
TRACE("destroy="<<destroy);
{
std::cerr << "[" << channelRPC->getChannel()->getChannelName() << "] failed to RPC: " << status << std::endl;
pvd::Lock L(mutex);
resp_status = conn_status = pvd::Status::error("Connection lost");
last_data.reset();
next_args.reset();
inprogress = false;
}
{
Lock lock(m_mutex);
m_status = status;
m_response = pvResponse;
}
m_event.signal();
}
bool waitForResponse(double timeOut)
{
return m_event.wait(timeOut);
}
bool waitUntilRPCConnected(double timeOut)
{
if (isRPCConnected())
return true;
return m_connectionEvent.wait(timeOut);
}
bool isRPCConnected()
{
Lock lock(m_mutex);
return (m_channelRPC.get() != 0);
}
PVStructure::shared_pointer & getResponse()
{
Lock lock(m_mutex);
return m_response;
}
Status & getStatus()
{
Lock lock(m_mutex);
return m_status;
}
void request(PVStructure::shared_pointer const & pvArgument, bool lastRequest)
{
ChannelRPC::shared_pointer rpc;
{
Lock lock(m_mutex);
rpc = m_channelRPC;
}
if (!rpc)
throw std::runtime_error("channel RPC not connected");
if (lastRequest)
rpc->lastRequest();
rpc->request(pvArgument);
event.signal();
}
};
@@ -213,18 +149,47 @@ public:
RPCClient::RPCClient(const std::string & serviceName,
PVStructure::shared_pointer const & pvRequest)
: m_serviceName(serviceName), m_pvRequest(pvRequest)
pvd::PVStructure::shared_pointer const & pvRequest)
: m_serviceName(serviceName), m_pvRequest(pvRequest ? pvRequest : pvd::createRequest(""))
{
ClientFactory::start();
ChannelProvider::shared_pointer provider(getChannelProviderRegistry()->getProvider("pva"));
if(!provider)
throw std::logic_error("Unknown Provider");
construct(provider, serviceName, pvRequest);
}
RPCClient::RPCClient(const ChannelProvider::shared_pointer& provider,
const std::string & serviceName,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
construct(provider, serviceName, pvRequest);
}
void RPCClient::construct(const ChannelProvider::shared_pointer& provider,
const std::string & serviceName,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
assert(provider);
DummyChannelRequester::shared_pointer dummy(new DummyChannelRequester);
m_channel = provider->createChannel(serviceName, dummy);
{
pvd::Lock L(dummy->mutex);
if(!dummy->status.isSuccess())
throw std::runtime_error(dummy->status.getMessage());
}
if(!m_channel)
throw std::logic_error("provider createChannel() succeeds w/ NULL Channel");
m_rpc_requester.reset(new RPCRequester);
m_rpc = m_channel->createChannelRPC(m_rpc_requester, m_pvRequest);
if(!m_rpc)
throw std::logic_error("channel createChannelRPC() NULL");
}
void RPCClient::destroy()
{
if (m_channel)
@@ -232,43 +197,43 @@ void RPCClient::destroy()
m_channel->destroy();
m_channel.reset();
}
if (m_rpc)
{
m_rpc->destroy();
m_rpc.reset();
}
}
bool RPCClient::connect(double timeout)
{
if (m_channel &&
TR1::dynamic_pointer_cast<ChannelAndRPCRequesterImpl>(m_channel->getChannelRequester())->isRPCConnected())
return true;
issueConnect();
return waitConnect(timeout);
}
void RPCClient::issueConnect()
{
ChannelProvider::shared_pointer provider = getChannelProviderRegistry()->getProvider("pva");
// TODO try to reuse ChannelRequesterImpl instance (i.e. create only once)
TR1::shared_ptr<ChannelAndRPCRequesterImpl> channelRequesterImpl(new ChannelAndRPCRequesterImpl(m_pvRequest));
m_channel = provider->createChannel(m_serviceName, channelRequesterImpl);
}
bool RPCClient::waitConnect(double timeout)
{
if (!m_channel)
throw std::runtime_error("issueConnect() must be called before waitConnect()");
TR1::shared_ptr<ChannelAndRPCRequesterImpl> channelRequesterImpl =
TR1::dynamic_pointer_cast<ChannelAndRPCRequesterImpl>(m_channel->getChannelRequester());
return channelRequesterImpl->waitUntilRPCConnected(timeout) &&
channelRequesterImpl->isRPCConnected();
pvd::Lock L(m_rpc_requester->mutex);
TRACE("timeout="<<timeout);
while(!m_rpc_requester->conn_status.isSuccess()) {
L.unlock();
if(!m_rpc_requester->event.wait(timeout)) {
TRACE("TIMEOUT");
return false;
}
L.lock();
}
TRACE("Connected");
return true;
}
PVStructure::shared_pointer RPCClient::request(
PVStructure::shared_pointer const & pvArgument,
pvd::PVStructure::shared_pointer RPCClient::request(
pvd::PVStructure::shared_pointer const & pvArgument,
double timeout,
bool lastRequest)
{
@@ -278,65 +243,88 @@ PVStructure::shared_pointer RPCClient::request(
return waitResponse(timeout); // TODO reduce timeout for a time spent on connect
}
else
throw epics::pvAccess::RPCRequestException(Status::STATUSTYPE_ERROR, "connection timeout");
throw epics::pvAccess::RPCRequestException(pvd::Status::STATUSTYPE_ERROR, "connection timeout");
}
void RPCClient::issueRequest(
PVStructure::shared_pointer const & pvArgument,
pvd::PVStructure::shared_pointer const & pvArgument,
bool lastRequest)
{
if (!m_channel)
throw std::runtime_error("channel not connected");
TR1::shared_ptr<ChannelAndRPCRequesterImpl> channelRequesterImpl =
TR1::dynamic_pointer_cast<ChannelAndRPCRequesterImpl>(m_channel->getChannelRequester());
channelRequesterImpl->request(pvArgument, lastRequest);
}
PVStructure::shared_pointer RPCClient::waitResponse(double timeout)
{
TR1::shared_ptr<ChannelAndRPCRequesterImpl> channelRequesterImpl =
TR1::dynamic_pointer_cast<ChannelAndRPCRequesterImpl>(m_channel->getChannelRequester());
if (channelRequesterImpl->waitForResponse(timeout))
{
Status & status = channelRequesterImpl->getStatus();
if (status.isSuccess())
{
// release response structure
PVStructure::shared_pointer & response = channelRequesterImpl->getResponse();
PVStructure::shared_pointer retVal = response;
response.reset();
return retVal;
pvd::Lock L(m_rpc_requester->mutex);
TRACE("conn_status="<<m_rpc_requester->conn_status
<<" resp_status="<<m_rpc_requester->resp_status
<<" args:\n"<<pvArgument);
if(m_rpc_requester->inprogress)
throw std::logic_error("Request already in progress");
m_rpc_requester->inprogress = true;
m_rpc_requester->resp_status = pvd::Status::error("No Data");
if(!m_rpc_requester->conn_status.isSuccess()) {
TRACE("defer");
m_rpc_requester->last = lastRequest;
m_rpc_requester->next_args = pvArgument;
return;
}
else
throw epics::pvAccess::RPCRequestException(status.getType(), status.getMessage());
TRACE("request args: "<<pvArgument);
}
else
throw epics::pvAccess::RPCRequestException(Status::STATUSTYPE_ERROR, "RPC timeout");
if(lastRequest)
m_rpc->lastRequest();
m_rpc->request(pvArgument);
}
RPCClient::shared_pointer RPCClient::create(const std::string & serviceName)
pvd::PVStructure::shared_pointer RPCClient::waitResponse(double timeout)
{
PVStructure::shared_pointer pvRequest =
CreateRequest::create()->createRequest("");
return create(serviceName, pvRequest);
pvd::Lock L(m_rpc_requester->mutex);
TRACE("timeout="<<timeout);
if(!m_rpc_requester->inprogress)
throw std::logic_error("No request in progress");
while(m_rpc_requester->inprogress)
{
L.unlock();
if(!m_rpc_requester->event.wait(timeout)) {
TRACE("TIMEOUT");
throw RPCRequestException(pvd::Status::STATUSTYPE_ERROR, "RPC timeout");
}
L.lock();
}
TRACE("Complete: conn_status="<<m_rpc_requester->conn_status
<<" resp_status="<<m_rpc_requester->resp_status
<<" data:\n"<<m_rpc_requester->last_data);
if(!m_rpc_requester->conn_status.isSuccess())
throw RPCRequestException(pvd::Status::STATUSTYPE_ERROR, m_rpc_requester->conn_status.getMessage());
if(!m_rpc_requester->resp_status.isSuccess())
throw RPCRequestException(pvd::Status::STATUSTYPE_ERROR, m_rpc_requester->resp_status.getMessage());
// consume last_data so that we can't possibly return it twice
pvd::PVStructure::shared_pointer data;
data.swap(m_rpc_requester->last_data);
if(!data)
throw std::logic_error("No reply data?!?");
// copy it so that the caller need not worry about whether it will overwritten
// when the next request is issued
pvd::PVStructure::shared_pointer ret(pvd::getPVDataCreate()->createPVStructure(data->getStructure()));
ret->copyUnchecked(*data);
return ret;
}
RPCClient::shared_pointer RPCClient::create(const std::string & serviceName,
PVStructure::shared_pointer const & pvRequest)
pvd::PVStructure::shared_pointer const & pvRequest)
{
ClientFactory::start();
return RPCClient::shared_pointer(new RPCClient(serviceName, pvRequest));
}
PVStructure::shared_pointer RPCClient::sendRequest(const std::string & serviceName,
PVStructure::shared_pointer const & queryRequest,
pvd::PVStructure::shared_pointer RPCClient::sendRequest(const std::string & serviceName,
pvd::PVStructure::shared_pointer const & queryRequest,
double timeOut)
{
RPCClient::shared_pointer client = RPCClient::create(serviceName);
return client->request(queryRequest, timeOut);
RPCClient client(serviceName, queryRequest);
return client.request(queryRequest, timeOut);
}