add more support for no block; removed extra create methods for put and get

This commit is contained in:
mrkraimer
2017-09-08 14:22:50 -04:00
parent 97d9dc2034
commit 4606d84185
8 changed files with 297 additions and 154 deletions

View File

@@ -58,6 +58,9 @@ class PvaClientChannel;
typedef std::tr1::shared_ptr<PvaClientChannel> PvaClientChannelPtr;
class PvaClientField;
typedef std::tr1::shared_ptr<PvaClientField> PvaClientFieldPtr;
class PvaClientProcessRequester;
typedef std::tr1::shared_ptr<PvaClientProcessRequester> PvaClientProcessRequesterPtr;
typedef std::tr1::weak_ptr<PvaClientProcessRequester> PvaClientProcessRequesterWPtr;
class PvaClientProcess;
typedef std::tr1::shared_ptr<PvaClientProcess> PvaClientProcessPtr;
class PvaClientGetRequester;
@@ -920,6 +923,35 @@ private:
friend class PvaClientMonitor;
};
/**
* @brief Optional client callback.
*
* <a href = "../htmldoxygen/pvaClientProcessRequester.html">Overview of PvaClientProcessRequester</a>
*/
class epicsShareClass PvaClientProcessRequester
{
public:
POINTER_DEFINITIONS(PvaClientProcessRequester);
virtual ~PvaClientProcessRequester() {}
/** @brief A channelProcess has connected.
*
* @param status The status returned by the server.
* @param clientProcess The PvaClientProcess that issued the request to create a ChannelProcess.
*/
virtual void channelProcessConnect(
const epics::pvData::Status& status,
PvaClientProcessPtr const & clientProcess)
{
}
/** @brief A process request is complete.
*
* @param status The status returned by the server.
* @param clientProcess The PvaClientProcess that issued the request to create a ChannelProcess.
*/
virtual void processDone(
const epics::pvData::Status& status,
PvaClientProcessPtr const & clientProcess) = 0;
};
// NOTE: must use separate class that implements ChannelProcessRequester,
// because pvAccess holds a shared_ptr to ChannelProcessRequester instead of weak_pointer
@@ -931,7 +963,9 @@ typedef std::tr1::shared_ptr<ChannelProcessRequesterImpl> ChannelProcessRequeste
*
* <a href = "../htmldoxygen/pvaClientProcess.html">Overview of PvaClientProcess</a>
*/
class epicsShareClass PvaClientProcess
class epicsShareClass PvaClientProcess :
public PvaClientChannelStateChangeRequester,
public std::tr1::enable_shared_from_this<PvaClientProcess>
{
public:
POINTER_DEFINITIONS(PvaClientProcess);
@@ -943,13 +977,17 @@ public:
*/
static PvaClientProcessPtr create(
PvaClientPtr const &pvaClient,
epics::pvAccess::Channel::shared_pointer const & channel,
PvaClientChannelPtr const & pvaClientChannel,
epics::pvData::PVStructurePtr const &pvRequest
);
/** @brief Destructor
*/
~PvaClientProcess();
/** @brief Set a user callback.
* @param pvaClientProcessRequester The requester which must be implemented by the caller.
*/
void setRequester(PvaClientProcessRequesterPtr const & pvaClientProcessRequester);
/** @brief Call issueConnect and then waitConnect.
*
* An exception is thrown if connect fails.
@@ -967,7 +1005,7 @@ public:
epics::pvData::Status waitConnect();
/** @brief Call issueProcess and then waitProcess.
*
* An exception is thrown if get fails.
* An exception is thrown if process fails.
*/
void process();
/** @brief Issue a process request and return immediately.
@@ -977,6 +1015,11 @@ public:
* @return status.
*/
epics::pvData::Status waitProcess();
/** @brief Get the PvaClientChannel;
*
* @return The interface.
*/
PvaClientChannelPtr getPvaClientChannel();
private:
std::string getRequesterName();
void message(std::string const & message,epics::pvData::MessageType messageType);
@@ -989,13 +1032,14 @@ private:
PvaClientProcess(
PvaClientPtr const &pvaClient,
epics::pvAccess::Channel::shared_pointer const & channel,
PvaClientChannelPtr const & pvaClientChannel,
epics::pvData::PVStructurePtr const &pvRequest);
void checkProcessState();
enum ProcessConnectState {connectIdle,connectActive,connected};
PvaClient::weak_pointer pvaClient;
epics::pvAccess::Channel::shared_pointer channel;
PvaClientChannelPtr pvaClientChannel;
epics::pvData::PVStructurePtr pvRequest;
epics::pvData::Mutex mutex;
epics::pvData::Event waitForConnect;
@@ -1007,9 +1051,13 @@ private:
ProcessConnectState connectState;
PvaClientChannelStateChangeRequesterWPtr pvaClientChannelStateChangeRequester;
PvaClientProcessRequesterWPtr pvaClientProcessRequester;
enum ProcessState {processIdle,processActive,processComplete};
ProcessState processState;
ChannelProcessRequesterImplPtr channelProcessRequester;
public:
void channelStateChange(PvaClientChannelPtr const & pvaClientChannel, bool isConnected);
friend class ChannelProcessRequesterImpl;
};
@@ -1069,25 +1117,7 @@ public:
PvaClientChannelPtr const & pvaClientChannel,
epics::pvData::PVStructurePtr const &pvRequest
);
/** @brief Create a PvaClientGet.
* @param pvaClient Interface to PvaClient
* @param channelName channel name
* @param providerName provider name
* @param request The request.
* @param stateChangeRequester The state change requester. Can be null.
* @param GetRequester The Get requester. Can be null;
* @return The new instance.
*/
static PvaClientGetPtr create(
PvaClientPtr const &pvaClient,
std::string const & channelName,
std::string const & providerName,
std::string const & request,
PvaClientChannelStateChangeRequesterPtr const & stateChangeRequester
= PvaClientChannelStateChangeRequesterPtr(),
PvaClientGetRequesterPtr const & getRequester
= PvaClientGetRequesterPtr()
);
/** @brief Destructor
*/
~PvaClientGet();
@@ -1160,7 +1190,6 @@ private:
epics::pvData::Event waitForConnect;
epics::pvData::Event waitForGet;
PvaClientGetDataPtr pvaClientData;
std::string messagePrefix;
epics::pvData::Status channelGetConnectStatus;
epics::pvData::Status channelGetStatus;
@@ -1247,25 +1276,6 @@ public:
PvaClientChannelPtr const & pvaClientChannel,
epics::pvData::PVStructurePtr const &pvRequest
);
/** @brief Create a PvaClientPut.
* @param pvaClient Interface to PvaClient
* @param channelName channel name
* @param providerName provider name
* @param request The request.
* @param stateChangeRequester The state change requester. Can be null.
* @param PutRequester The Put requester. Can be null;
* @return The new instance.
*/
static PvaClientPutPtr create(
PvaClientPtr const &pvaClient,
std::string const & channelName,
std::string const & providerName,
std::string const & request,
PvaClientChannelStateChangeRequesterPtr const & stateChangeRequester
= PvaClientChannelStateChangeRequesterPtr(),
PvaClientPutRequesterPtr const & putRequester
= PvaClientPutRequesterPtr()
);
/** @brief Destructor
*/
~PvaClientPut();
@@ -1358,7 +1368,7 @@ private :
epics::pvAccess::ChannelPut::shared_pointer channelPut;
PutConnectState connectState;
enum PutState {putIdle,getActive,putActive};
enum PutState {putIdle,getActive,putActive,putComplete};
PutState putState;
ChannelPutRequesterImplPtr channelPutRequester;
PvaClientChannelStateChangeRequesterWPtr pvaClientChannelStateChangeRequester;

View File

@@ -341,17 +341,19 @@ PvaClientProcessPtr PvaClientChannel::createProcess(PVStructurePtr const & pvRe
if(connectState!=connected) connect(5.0);
PvaClientPtr yyy = pvaClient.lock();
if(!yyy) throw std::runtime_error("PvaClient was destroyed");
return PvaClientProcess::create(yyy,channel,pvRequest);
return PvaClientProcess::create(yyy,shared_from_this(),pvRequest);
}
PvaClientGetPtr PvaClientChannel::get(string const & request)
{
PvaClientGetPtr pvaClientGet = pvaClientGetCache->getGet(request);
if(pvaClientGet) return pvaClientGet;
pvaClientGet = createGet(request);
pvaClientGet->connect();
pvaClientGetCache->addGet(request,pvaClientGet);
if(!pvaClientGet) {
pvaClientGet = createGet(request);
pvaClientGet->connect();
pvaClientGetCache->addGet(request,pvaClientGet);
}
pvaClientGet->get();
return pvaClientGet;
}
@@ -381,10 +383,12 @@ PvaClientPutPtr PvaClientChannel::put(string const & request)
{
PvaClientPutPtr pvaClientPut = pvaClientPutCache->getPut(request);
if(pvaClientPut) return pvaClientPut;
pvaClientPut = createPut(request);
pvaClientPut->connect();
pvaClientPut->get();
pvaClientPutCache->addPut(request,pvaClientPut);
if(!pvaClientPut) {
pvaClientPut = createPut(request);
pvaClientPut->connect();
pvaClientPut->get();
pvaClientPutCache->addPut(request,pvaClientPut);
}
return pvaClientPut;
}

View File

@@ -77,7 +77,7 @@ PvaClientGetPtr PvaClientGet::create(
PVStructurePtr const &pvRequest)
{
if(PvaClient::getDebug()) {
cout<< "PvaClientGet::create(pvaClient,channelName,providerName,pvRequest)\n"
cout<< "PvaClientGet::create(pvaClient,channelName,pvRequest)\n"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " pvRequest " << pvRequest
<< endl;
@@ -88,35 +88,6 @@ PvaClientGetPtr PvaClientGet::create(
return clientGet;
}
PvaClientGetPtr PvaClientGet::create(
PvaClientPtr const &pvaClient,
std::string const & channelName,
std::string const & providerName,
std::string const & request,
PvaClientChannelStateChangeRequesterPtr const & stateChangeRequester,
PvaClientGetRequesterPtr const & getRequester)
{
if(PvaClient::getDebug()) {
cout<< "PvaClientGet::create(pvaClient,channelName,providerName,request,stateChangeRequester,getRequester)\n"
<< " channelName " << channelName
<< " providerName " << providerName
<< " request " << request
<< endl;
}
CreateRequest::shared_pointer createRequest(CreateRequest::create());
PVStructurePtr pvRequest(createRequest->createRequest(request));
if(!pvRequest) throw std::runtime_error(createRequest->getMessage());
PvaClientChannelPtr pvaClientChannel = pvaClient->createChannel(channelName,providerName);
PvaClientGetPtr clientGet(new PvaClientGet(pvaClient,pvaClientChannel,pvRequest));
clientGet->channelGetRequester = ChannelGetRequesterImplPtr(
new ChannelGetRequesterImpl(clientGet,pvaClient));
if(stateChangeRequester) clientGet->pvaClientChannelStateChangeRequester = stateChangeRequester;
if(getRequester) clientGet->pvaClientGetRequester = getRequester;
pvaClientChannel->setStateChangeRequester(clientGet);
pvaClientChannel->issueConnect();
return clientGet;
}
PvaClientGet::PvaClientGet(
PvaClientPtr const &pvaClient,
PvaClientChannelPtr const & pvaClientChannel,
@@ -141,7 +112,6 @@ PvaClientGet::~PvaClientGet()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(channelGet) channelGet->destroy();
}
void PvaClientGet::channelStateChange(PvaClientChannelPtr const & pvaClientChannel, bool isConnected)
@@ -184,7 +154,6 @@ void PvaClientGet::checkGetState()
+ channelGetConnectStatus.getMessage();
throw std::runtime_error(message);
}
if(getState==getIdle) get();
}
// from ChannelGetRequester
@@ -253,6 +222,7 @@ void PvaClientGet::getDone(
{
Lock xx(mutex);
channelGetStatus = status;
getState = getComplete;
if(status.isOK()) {
pvaClientData->setData(pvStructure,bitSet);
}
@@ -362,6 +332,7 @@ Status PvaClientGet::waitGet()
{
Lock xx(mutex);
if(getState==getComplete) {
getState = getIdle;
return channelGetStatus;
}
if(getState!=getActive){
@@ -376,7 +347,13 @@ Status PvaClientGet::waitGet()
}
PvaClientGetDataPtr PvaClientGet::getData()
{
if(PvaClient::getDebug()) {
cout<< "PvaClientGet::getData"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
checkGetState();
if(getState==getIdle) get();
return pvaClientData;
}

View File

@@ -42,6 +42,10 @@ static string noTimeStamp("no timeStamp");
PvaClientGetDataPtr PvaClientGetData::create(StructureConstPtr const & structure)
{
if(PvaClient::getDebug()) {
cout << "PvaClientGetData::create"
<< endl;
}
PvaClientGetDataPtr epv(new PvaClientGetData(structure));
return epv;
}
@@ -54,6 +58,10 @@ PvaClientGetData::PvaClientGetData(StructureConstPtr const & structure)
void PvaClientGetData::checkValue()
{
if(PvaClient::getDebug()) {
cout << "PvaClientGetData::checkValue"
<< endl;
}
if(pvValue) return;
throw std::runtime_error(messagePrefix + noValue);
}
@@ -100,6 +108,10 @@ void PvaClientGetData::setData(
PVStructurePtr const & pvStructureFrom,
BitSetPtr const & bitSetFrom)
{
if(PvaClient::getDebug()) {
cout << "PvaClientGetData::setData"
<< endl;
}
pvStructure = pvStructureFrom;
bitSet = bitSetFrom;
pvValue = pvStructure->getSubField("value");

View File

@@ -150,7 +150,6 @@ PvaClientMonitor::~PvaClientMonitor()
}
if(monitor) {
if(isStarted) monitor->stop();
monitor->destroy();
}
}
@@ -400,7 +399,6 @@ void PvaClientMonitor::start(string const & request)
if(!pvr) throw std::runtime_error(createRequest->getMessage());
if(monitor) {
if(isStarted) monitor->stop();
monitor->destroy();
}
monitorRequester.reset();
monitor.reset();

View File

@@ -70,35 +70,92 @@ public:
PvaClientProcessPtr PvaClientProcess::create(
PvaClientPtr const &pvaClient,
Channel::shared_pointer const & channel,
PvaClientChannelPtr const & pvaClientChannel,
PVStructurePtr const &pvRequest)
{
PvaClientProcessPtr epv(new PvaClientProcess(pvaClient,channel,pvRequest));
epv->channelProcessRequester = ChannelProcessRequesterImplPtr(
new ChannelProcessRequesterImpl(epv,pvaClient));
return epv;
if(PvaClient::getDebug()) {
cout<< "PvaClientProcess::create(pvaClient,channelName,pvRequest)\n"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " pvRequest " << pvRequest
<< endl;
}
PvaClientProcessPtr channelProcess(new PvaClientProcess(pvaClient,pvaClientChannel,pvRequest));
channelProcess->channelProcessRequester = ChannelProcessRequesterImplPtr(
new ChannelProcessRequesterImpl(channelProcess,pvaClient));
return channelProcess;
}
PvaClientProcess::PvaClientProcess(
PvaClientPtr const &pvaClient,
Channel::shared_pointer const & channel,
PvaClientChannelPtr const & pvaClientChannel,
PVStructurePtr const &pvRequest)
: pvaClient(pvaClient),
channel(channel),
pvaClientChannel(pvaClientChannel),
pvRequest(pvRequest),
connectState(connectIdle),
processState(processIdle)
{
if(PvaClient::getDebug()) cout<< "PvaClientProcess::PvaClientProcess()\n";
if(PvaClient::getDebug()) {
cout<< "PvaClientProcess::PvaClientProcess()"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
}
PvaClientProcess::~PvaClientProcess()
{
if(PvaClient::getDebug()) cout<< "PvaClientProcess::~PvaClientProcess()\n";
channelProcess->destroy();
if(PvaClient::getDebug()) {
cout<< "PvaClientProcess::~PvaClientProcess()"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
}
void PvaClientProcess::channelStateChange(PvaClientChannelPtr const & pvaClientChannel, bool isConnected)
{
if(PvaClient::getDebug()) {
cout<< "PvaClientProcess::channelStateChange"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " isConnected " << (isConnected ? "true" : "false")
<< endl;
}
if(isConnected)
{
connectState = connectActive;
channelProcess = pvaClientChannel->getChannel()->createChannelProcess(channelProcessRequester,pvRequest);
}
PvaClientChannelStateChangeRequesterPtr req(pvaClientChannelStateChangeRequester.lock());
if(req) {
req->channelStateChange(pvaClientChannel,isConnected);
}
}
void PvaClientProcess::checkProcessState()
{
if(PvaClient::getDebug()) {
cout << "PvaClientProcess::checkProcessState"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(!pvaClientChannel->getChannel()->isConnected()) {
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientProcess::checkProcessState channel not connected ";
throw std::runtime_error(message);
}
if(connectState==connectIdle) {
connect();
}
if(connectState==connectActive){
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " "
+ channelProcessConnectStatus.getMessage();
throw std::runtime_error(message);
}
if(processState==processIdle) process();
}
// from ChannelProcessRequester
string PvaClientProcess::getRequesterName()
{
@@ -118,9 +175,31 @@ void PvaClientProcess::channelProcessConnect(
const Status& status,
ChannelProcess::shared_pointer const & channelProcess)
{
channelProcessConnectStatus = status;
connectState = connected;
this->channelProcess = channelProcess;
if(PvaClient::getDebug()) {
cout << "PvaClientProcess::channelProcessConnect"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " status.isOK " << (status.isOK() ? "true" : "false")
<< endl;
}
{
Lock xx(mutex);
this->channelProcess = channelProcess;
if(status.isOK()) {
channelProcessConnectStatus = status;
connectState = connected;
} else {
stringstream ss;
ss << pvRequest;
string message = string("PvaClientProcess::channelProcessConnect")
+ "\npvRequest\n" + ss.str()
+ "\nerror\n" + status.getMessage();
channelProcessConnectStatus = Status(Status::STATUSTYPE_ERROR,message);
}
}
PvaClientProcessRequesterPtr req(pvaClientProcessRequester.lock());
if(req) {
req->channelProcessConnect(status,shared_from_this());
}
waitForConnect.signal();
}
@@ -129,40 +208,69 @@ void PvaClientProcess::processDone(
const Status& status,
ChannelProcess::shared_pointer const & channelProcess)
{
channelProcessStatus = status;
processState = processComplete;
if(PvaClient::getDebug()) {
cout << "PvaClientProcess::processDone"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " status.isOK " << (status.isOK() ? "true" : "false")
<< endl;
}
{
Lock xx(mutex);
channelProcessStatus = status;
processState = processComplete;
}
PvaClientProcessRequesterPtr req(pvaClientProcessRequester.lock());
if(req) {
req->processDone(status,shared_from_this());
}
waitForProcess.signal();
}
void PvaClientProcess::connect()
{
if(PvaClient::getDebug()) {
cout << "PvaClientProcess::connect"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
issueConnect();
Status status = waitConnect();
if(status.isOK()) return;
string message = string("channel ") + channel->getChannelName()
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientProcess::connect " + status.getMessage();
throw std::runtime_error(message);
}
void PvaClientProcess::issueConnect()
{
if(PvaClient::getDebug()) {
cout << "PvaClientProcess::issueConnect"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(connectState!=connectIdle) {
string message = string("channel ") + channel->getChannelName()
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " pvaClientProcess already connected ";
throw std::runtime_error(message);
}
connectState = connectActive;
channelProcess = channel->createChannelProcess(channelProcessRequester,pvRequest);
channelProcess = pvaClientChannel->getChannel()->createChannelProcess(channelProcessRequester,pvRequest);
}
Status PvaClientProcess::waitConnect()
{
if(PvaClient::getDebug()) {
cout << "PvaClientProcess::waitConnect"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(connectState==connected) {
if(!channelProcessConnectStatus.isOK()) connectState = connectIdle;
return channelProcessConnectStatus;
}
if(connectState!=connectActive) {
string message = string("channel ") + channel->getChannelName()
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " pvaClientProcess illegal connect state ";
throw std::runtime_error(message);
}
@@ -173,19 +281,29 @@ Status PvaClientProcess::waitConnect()
void PvaClientProcess::process()
{
if(PvaClient::getDebug()) {
cout << "PvaClientProcess::process"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
issueProcess();
Status status = waitProcess();
if(status.isOK()) return;
string message = string("channel ") + channel->getChannelName()
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientProcess::process" + status.getMessage();
throw std::runtime_error(message);
}
void PvaClientProcess::issueProcess()
{
if(PvaClient::getDebug()) {
cout << "PvaClientProcess::issueProcess"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(connectState==connectIdle) connect();
if(processState!=processIdle) {
string message = string("channel ") + channel->getChannelName()
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientProcess::issueProcess process aleady active ";
throw std::runtime_error(message);
}
@@ -195,18 +313,42 @@ void PvaClientProcess::issueProcess()
Status PvaClientProcess::waitProcess()
{
if(processState==processComplete) {
processState = processIdle;
return channelProcessStatus;
if(PvaClient::getDebug()) {
cout << "PvaClientProcess::waitProcess"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(processState!=processActive){
string message = string("channel ") + channel->getChannelName()
+ " PvaClientProcess::waitProcess llegal process state";
throw std::runtime_error(message);
{
Lock xx(mutex);
if(processState==processComplete) {
processState = processIdle;
return channelProcessStatus;
}
if(processState!=processActive){
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientProcess::waitProcess llegal process state";
throw std::runtime_error(message);
}
}
waitForProcess.wait();
processState = processIdle;
processState = processComplete;
return channelProcessStatus;
}
void PvaClientProcess::setRequester(PvaClientProcessRequesterPtr const & pvaClientProcessRequester)
{
if(PvaClient::getDebug()) {
cout << "PvaClientProcess::setRequester"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
this->pvaClientProcessRequester = pvaClientProcessRequester;
}
PvaClientChannelPtr PvaClientProcess::getPvaClientChannel()
{
return pvaClientChannel;
}
}}

View File

@@ -91,35 +91,6 @@ PvaClientPutPtr PvaClientPut::create(
return clientPut;
}
PvaClientPutPtr PvaClientPut::create(
PvaClientPtr const &pvaClient,
std::string const & channelName,
std::string const & providerName,
std::string const & request,
PvaClientChannelStateChangeRequesterPtr const & stateChangeRequester,
PvaClientPutRequesterPtr const & putRequester)
{
if(PvaClient::getDebug()) {
cout<< "PvaClientPut::create(pvaClient,channelName,providerName,request,stateChangeRequester,putRequester)\n"
<< " channelName " << channelName
<< " providerName " << providerName
<< " request " << request
<< endl;
}
CreateRequest::shared_pointer createRequest(CreateRequest::create());
PVStructurePtr pvRequest(createRequest->createRequest(request));
if(!pvRequest) throw std::runtime_error(createRequest->getMessage());
PvaClientChannelPtr pvaClientChannel = pvaClient->createChannel(channelName,providerName);
PvaClientPutPtr clientPut(new PvaClientPut(pvaClient,pvaClientChannel,pvRequest));
clientPut->channelPutRequester = ChannelPutRequesterImplPtr(
new ChannelPutRequesterImpl(clientPut,pvaClient));
if(stateChangeRequester) clientPut->pvaClientChannelStateChangeRequester = stateChangeRequester;
if(putRequester) clientPut->pvaClientPutRequester = putRequester;
pvaClientChannel->setStateChangeRequester(clientPut);
pvaClientChannel->issueConnect();
return clientPut;
}
PvaClientPut::PvaClientPut(
PvaClientPtr const &pvaClient,
@@ -145,7 +116,6 @@ PvaClientPut::~PvaClientPut()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(channelPut) channelPut->destroy();
}
void PvaClientPut::channelStateChange(PvaClientChannelPtr const & pvaClientChannel, bool isConnected)
@@ -176,8 +146,6 @@ void PvaClientPut::checkPutState()
}
if(connectState==connectIdle){
connect();
get();
return;
}
if(connectState==connectActive){
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
@@ -343,6 +311,11 @@ Status PvaClientPut::waitConnect()
void PvaClientPut::get()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPut::get"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
issueGet();
Status status = waitGet();
if(status.isOK()) return;
@@ -355,8 +328,13 @@ void PvaClientPut::get()
void PvaClientPut::issueGet()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPut::issueGet"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(connectState==connectIdle) connect();
if(putState!=putIdle) {
if(putState==getActive || putState==putActive) {
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ "PvaClientPut::issueGet get or put aleady active ";
@@ -368,6 +346,11 @@ void PvaClientPut::issueGet()
Status PvaClientPut::waitGet()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPut::waitGet"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(putState!=getActive){
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
@@ -375,12 +358,17 @@ Status PvaClientPut::waitGet()
throw std::runtime_error(message);
}
waitForGetPut.wait();
putState = putIdle;
putState = putComplete;
return channelGetPutStatus;
}
void PvaClientPut::put()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPut::put"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
issuePut();
Status status = waitPut();
if(status.isOK()) return;
@@ -393,8 +381,15 @@ void PvaClientPut::put()
void PvaClientPut::issuePut()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPut::issuePut"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " pvStructure\n" << pvaClientData->getPVStructure()
<< " bitSet " << *pvaClientData->getChangedBitSet() << endl
<< endl;
}
if(connectState==connectIdle) connect();
if(putState!=putIdle) {
if(putState==getActive || putState==putActive) {
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ "PvaClientPut::issueGet get or put aleady active ";
@@ -406,6 +401,11 @@ void PvaClientPut::issuePut()
Status PvaClientPut::waitPut()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPut::waitPut"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(putState!=putActive){
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
@@ -413,7 +413,7 @@ Status PvaClientPut::waitPut()
throw std::runtime_error(message);
}
waitForGetPut.wait();
putState = putIdle;
putState = putComplete;
if(channelGetPutStatus.isOK()) pvaClientData->getChangedBitSet()->clear();
return channelGetPutStatus;
}
@@ -426,6 +426,7 @@ PvaClientPutDataPtr PvaClientPut::getData()
<< endl;
}
checkPutState();
if(putState==putIdle) get();
return pvaClientData;
}

View File

@@ -132,7 +132,6 @@ PvaClientPutGet::~PvaClientPutGet()
<< " channelName " << channelName
<< endl;
}
channelPutGet->destroy();
}
void PvaClientPutGet::checkPutGetState()