RPCClient: reimplemented as in Java, backported 3.0.4

This commit is contained in:
Matej Sekoranja
2014-09-01 11:48:25 +02:00
parent c157a25490
commit c1115437e3
4 changed files with 343 additions and 233 deletions

View File

@@ -21,6 +21,7 @@
using namespace epics::pvData;
using namespace std::tr1;
using std::string;
namespace epics
@@ -29,43 +30,28 @@ namespace epics
namespace pvAccess
{
namespace
{
// copied from eget/pvutils. This is a lot of boilerplate and needs refactoring to common code.
class ChannelRequesterImpl :
public epics::pvAccess::ChannelRequester
class ChannelAndRPCRequesterImpl :
public enable_shared_from_this<ChannelAndRPCRequesterImpl>,
public virtual epics::pvAccess::ChannelRequester,
public virtual epics::pvAccess::ChannelRPCRequester
{
private:
epics::pvData::Event m_event;
public:
virtual std::string getRequesterName();
virtual void message(std::string const & message, epics::pvData::MessageType messageType);
virtual void channelCreated(const epics::pvData::Status& status, epics::pvAccess::Channel::shared_pointer const & channel);
virtual void channelStateChange(epics::pvAccess::Channel::shared_pointer const & channel, epics::pvAccess::Channel::ConnectionState connectionState);
bool waitUntilConnected(double timeOut);
};
class ChannelRPCRequesterImpl : public ChannelRPCRequester
{
private:
ChannelRPC::shared_pointer m_channelRPC;
Mutex m_pointerMutex;
Mutex m_mutex;
Event m_event;
Event m_connectionEvent;
string m_channelName;
Status m_status;
PVStructure::shared_pointer m_response;
ChannelRPC::shared_pointer m_channelRPC;
public:
epics::pvData::PVStructure::shared_pointer response;
ChannelRPCRequesterImpl(std::string channelName) : m_channelName(channelName) {}
ChannelAndRPCRequesterImpl() {}
virtual string getRequesterName()
{
return "ChannelRPCRequesterImpl";
return "ChannelAndRPCRequesterImpl";
}
virtual void message(std::string const & message, MessageType messageType)
@@ -73,231 +59,274 @@ class ChannelRPCRequesterImpl : public ChannelRPCRequester
std::cerr << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl;
}
virtual void channelRPCConnect(const epics::pvData::Status& status, ChannelRPC::shared_pointer const & channelRPC)
void channelCreated(
const epics::pvData::Status& status,
Channel::shared_pointer const & channel)
{
if (status.isSuccess())
{
// show warning
if (!status.isOK())
{
std::cerr << "[" << m_channelName << "] channel RPC create: " << status << std::endl;
std::cerr << "[" << channel->getChannelName() << "] channel create: " << status << std::endl;
}
// assign smart pointers
{
Lock lock(m_pointerMutex);
m_channelRPC = channelRPC;
}
m_connectionEvent.signal();
}
else
{
std::cerr << "[" << m_channelName << "] failed to create channel get: " << status << std::endl;
std::cerr << "[" << channel->getChannelName() << "] failed to create a channel: " << status << std::endl;
{
Lock lock(m_mutex);
m_status = status;
}
m_connectionEvent.signal();
}
}
virtual void requestDone(const epics::pvData::Status &status, ChannelRPC::shared_pointer const & /*channelRPC*/,
epics::pvData::PVStructure::shared_pointer const &pvResponse)
void channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
if (connectionState == Channel::CONNECTED)
{
bool rpcAlreadyConnectedOnce = false;
{
Lock lock(m_mutex);
rpcAlreadyConnectedOnce = (m_channelRPC.get() != 0);
}
if (!rpcAlreadyConnectedOnce)
{
PVStructure::shared_pointer pvRequest =
CreateRequest::create()->createRequest("");
channel->createChannelRPC(shared_from_this(), pvRequest);
}
}
/*
else if (connectionState != Channel::DESTROYED)
{
std::cerr << "[" << channel->getChannelName() << "] channel state change: " << Channel::ConnectionStateNames[connectionState] << std::endl;
}
*/
}
virtual void channelRPCConnect(
const epics::pvData::Status & status,
ChannelRPC::shared_pointer const & channelRPC)
{
if (status.isSuccess())
{
// show warning
if (!status.isOK())
{
std::cerr << "[" << m_channelName << "] channel RPC: " << status << std::endl;
}
// access smart pointers
{
Lock lock(m_pointerMutex);
response = pvResponse;
// this is OK since calle holds reference to it
m_channelRPC.reset();
}
std::cerr << "[" << channelRPC->getChannel()->getChannelName() << "] channel RPC create: " << status << std::endl;
}
else
{
std::cerr << "[" << m_channelName << "] failed to RPC: " << status << std::endl;
{
Lock lock(m_pointerMutex);
// this is OK since calle holds reference to it
m_channelRPC.reset();
}
std::cerr << "[" << channelRPC->getChannel()->getChannelName() << "] failed to create channel RPC: " << status << std::endl;
}
{
Lock lock(m_mutex);
m_status = status;
m_channelRPC = channelRPC;
}
m_connectionEvent.signal();
}
virtual void requestDone(
const epics::pvData::Status & status,
ChannelRPC::shared_pointer const & channelRPC,
epics::pvData::PVStructure::shared_pointer const & pvResponse)
{
if (status.isSuccess())
{
if (!status.isOK())
std::cerr << "[" << channelRPC->getChannel()->getChannelName() << "] channel RPC: " << status << std::endl;
}
else
{
std::cerr << "[" << channelRPC->getChannel()->getChannelName() << "] failed to RPC: " << status << std::endl;
}
{
Lock lock(m_mutex);
m_status = status;
m_response = pvResponse;
}
m_event.signal();
}
/*
void request(epics::pvData::PVStructure::shared_pointer const &pvRequest)
{
Lock lock(m_pointerMutex);
m_channelRPC->request(pvRequest, false);
}
*/
bool waitUntilRPC(double timeOut)
bool waitForResponse(double timeOut)
{
return m_event.wait(timeOut);
}
bool waitUntilConnected(double timeOut)
bool waitUntilRPCConnected(double timeOut)
{
if (isRPCConnected())
return true;
return m_connectionEvent.wait(timeOut);
}
bool isRPCConnected()
{
Lock lock(m_mutex);
return (m_channelRPC.get() != 0);
}
PVStructure::shared_pointer & getResponse()
{
Lock lock(m_mutex);
return m_response;
}
Status & getStatus()
{
Lock lock(m_mutex);
return m_status;
}
void request(PVStructure::shared_pointer const & pvArgument, bool lastRequest)
{
ChannelRPC::shared_pointer rpc;
{
Lock lock(m_mutex);
rpc = m_channelRPC;
}
if (!rpc)
throw std::runtime_error("channel RPC not connected");
if (lastRequest)
rpc->lastRequest();
rpc->request(pvArgument);
}
};
string ChannelRequesterImpl::getRequesterName()
RPCClient::RPCClient(const std::string & serviceName)
: m_serviceName(serviceName)
{
return "ChannelRequesterImpl";
}
void ChannelRequesterImpl::message(std::string const & message, MessageType messageType)
void RPCClient::destroy()
{
std::cerr << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl;
}
void ChannelRequesterImpl::channelCreated(const epics::pvData::Status& status, Channel::shared_pointer const & channel)
{
if (status.isSuccess())
{
// show warning
if (!status.isOK())
{
std::cerr << "[" << channel->getChannelName() << "] channel create: " << status << std::endl;
}
}
else
{
std::cerr << "[" << channel->getChannelName() << "] failed to create a channel: " << status << std::endl;
}
}
void ChannelRequesterImpl::channelStateChange(Channel::shared_pointer const & /*channel*/, Channel::ConnectionState connectionState)
{
if (connectionState == Channel::CONNECTED)
{
m_event.signal();
}
/*
else if (connectionState != Channel::DESTROYED)
{
std::cerr << "[" << channel->getChannelName() << "] channel state change: " << Channel::ConnectionStateNames[connectionState] << std::endl;
}
*/
}
bool ChannelRequesterImpl::waitUntilConnected(double timeOut)
{
return m_event.wait(timeOut);
}
class RPCClientImpl: public RPCClient
{
public:
POINTER_DEFINITIONS(RPCClientImpl);
RPCClientImpl(const std::string & serviceName)
: m_serviceName(serviceName), m_connected(false)
if (m_channel)
{
m_channel->destroy();
m_channel.reset();
}
}
virtual PVStructure::shared_pointer request(PVStructure::shared_pointer pvRequest, double timeOut);
private:
void init()
{
using namespace std::tr1;
m_provider = getChannelProviderRegistry()->getProvider("pva");
shared_ptr<ChannelRequesterImpl> channelRequesterImpl(new ChannelRequesterImpl());
m_channelRequesterImpl = channelRequesterImpl;
m_channel = m_provider->createChannel(m_serviceName, channelRequesterImpl);
}
bool connect(double timeOut)
{
init();
m_connected = m_channelRequesterImpl->waitUntilConnected(timeOut);
return m_connected;
}
std::string m_serviceName;
ChannelProvider::shared_pointer m_provider;
std::tr1::shared_ptr<ChannelRequesterImpl> m_channelRequesterImpl;
Channel::shared_pointer m_channel;
bool m_connected;
};
PVStructure::shared_pointer RPCClientImpl::request(PVStructure::shared_pointer pvRequest, double timeOut)
bool RPCClient::connect(double timeout)
{
using namespace std::tr1;
if (m_channel &&
dynamic_pointer_cast<ChannelAndRPCRequesterImpl>(m_channel->getChannelRequester())->isRPCConnected())
return true;
PVStructure::shared_pointer response;
issueConnect();
return waitConnect(timeout);
}
bool allOK = true;
void RPCClient::issueConnect()
{
ChannelProvider::shared_pointer provider = getChannelProviderRegistry()->getProvider("pva");
if (m_connected || connect(timeOut))
// TODO try to reuse ChannelRequesterImpl instance (i.e. create only once)
shared_ptr<ChannelAndRPCRequesterImpl> channelRequesterImpl(new ChannelAndRPCRequesterImpl());
m_channel = provider->createChannel(m_serviceName, channelRequesterImpl);
}
bool RPCClient::waitConnect(double timeout)
{
if (!m_channel)
throw std::runtime_error("issueConnect() must be called before waitConnect()");
shared_ptr<ChannelAndRPCRequesterImpl> channelRequesterImpl =
dynamic_pointer_cast<ChannelAndRPCRequesterImpl>(m_channel->getChannelRequester());
return channelRequesterImpl->waitUntilRPCConnected(timeout) &&
channelRequesterImpl->isRPCConnected();
}
PVStructure::shared_pointer RPCClient::request(
PVStructure::shared_pointer const & pvArgument,
double timeout,
bool lastRequest)
{
if (connect(timeout))
{
shared_ptr<ChannelRPCRequesterImpl> rpcRequesterImpl(new ChannelRPCRequesterImpl(m_channel->getChannelName()));
ChannelRPC::shared_pointer channelRPC = m_channel->createChannelRPC(rpcRequesterImpl, pvRequest);
if (rpcRequesterImpl->waitUntilConnected(timeOut))
{
channelRPC->lastRequest();
channelRPC->request(pvRequest);
allOK &= rpcRequesterImpl->waitUntilRPC(timeOut);
response = rpcRequesterImpl->response;
}
else
{
allOK = false;
m_channel->destroy();
m_connected = false;
std::string errMsg = "[" + m_channel->getChannelName() + "] RPC create timeout";
throw epics::pvAccess::RPCRequestException(Status::STATUSTYPE_ERROR, errMsg);
}
issueRequest(pvArgument, lastRequest);
return waitResponse(timeout); // TODO reduce timeout for a time spent on connect
}
else
{
allOK = false;
m_channel->destroy();
m_connected = false;
std::string errMsg = "[" + m_channel->getChannelName() + "] connection timeout";
throw epics::pvAccess::RPCRequestException(Status::STATUSTYPE_ERROR, errMsg);
}
if (!allOK)
{
throw epics::pvAccess::RPCRequestException(Status::STATUSTYPE_ERROR, "RPC request failed");
}
return response;
throw epics::pvAccess::RPCRequestException(Status::STATUSTYPE_ERROR, "connection timeout");
}
void RPCClient::issueRequest(
PVStructure::shared_pointer const & pvArgument,
bool lastRequest)
{
if (!m_channel)
throw std::runtime_error("channel not connected");
shared_ptr<ChannelAndRPCRequesterImpl> channelRequesterImpl =
dynamic_pointer_cast<ChannelAndRPCRequesterImpl>(m_channel->getChannelRequester());
channelRequesterImpl->request(pvArgument, lastRequest);
}
RPCClient::shared_pointer RPCClientFactory::create(const std::string & serviceName)
PVStructure::shared_pointer RPCClient::waitResponse(double timeout)
{
shared_ptr<ChannelAndRPCRequesterImpl> channelRequesterImpl =
dynamic_pointer_cast<ChannelAndRPCRequesterImpl>(m_channel->getChannelRequester());
if (channelRequesterImpl->waitForResponse(timeout))
{
Status & status = channelRequesterImpl->getStatus();
if (status.isSuccess())
{
// release response structure
PVStructure::shared_pointer & response = channelRequesterImpl->getResponse();
PVStructure::shared_pointer retVal = response;
response.reset();
return retVal;
}
else
throw epics::pvAccess::RPCRequestException(status.getType(), status.getMessage());
}
else
throw epics::pvAccess::RPCRequestException(Status::STATUSTYPE_ERROR, "RPC timeout");
}
RPCClient::shared_pointer RPCClient::create(const std::string & serviceName)
{
ClientFactory::start();
return RPCClient::shared_pointer(new RPCClientImpl(serviceName));
return RPCClient::shared_pointer(new RPCClient(serviceName));
}
PVStructure::shared_pointer sendRequest(const std::string & serviceName,
PVStructure::shared_pointer RPCClient::sendRequest(const std::string & serviceName,
PVStructure::shared_pointer queryRequest,
double timeOut)
{
RPCClient::shared_pointer client = RPCClientFactory::create(serviceName);
RPCClient::shared_pointer client = RPCClient::create(serviceName);
return client->request(queryRequest, timeOut);
}