The changes:

1) Fix issues #66 and #70
2) All tests in exampleCPP/testMultiplePutGet now work successfully.

I still want to do more testing, especially on connection management.
Also relese notes and documentation needs work.
This commit is contained in:
mrkraimer
2021-02-26 12:32:15 -05:00
parent b665dac669
commit 1211d01800
5 changed files with 44 additions and 204 deletions

View File

@ -162,28 +162,19 @@ void PvaClientGet::channelGetConnect(
}
{
Lock xx(mutex);
this->channelGet = channelGet;
channelGetConnectStatus = status;
if(status.isOK()) {
channelGetConnectStatus = status;
this->channelGet = channelGet;
connectState = connected;
pvaClientData = PvaClientGetData::create(structure);
pvaClientData->setMessagePrefix(channelGet->getChannel()->getChannelName());
} else {
stringstream ss;
ss << pvRequest;
string message = string("\nPvaClientGet::channelGetConnect)")
+ "\nchannelName=" + pvaClientChannel->getChannel()->getChannelName()
+ "\npvRequest=" + ss.str()
+ "\nerror\n" + status.getMessage();
channelGetConnectStatus = Status(Status::STATUSTYPE_ERROR,message);
}
}
waitForConnect.signal();
}
PvaClientGetRequesterPtr req(pvaClientGetRequester.lock());
if(req) {
req->channelGetConnect(status,shared_from_this());
}
waitForConnect.signal();
}
}
void PvaClientGet::getDone(
@ -201,16 +192,16 @@ void PvaClientGet::getDone(
{
Lock xx(mutex);
channelGetStatus = status;
getState = getComplete;
if(status.isOK()) {
pvaClientData->setData(pvStructure,bitSet);
}
getState = getComplete;
waitForGet.signal();
}
PvaClientGetRequesterPtr req(pvaClientGetRequester.lock());
if(req) {
req->getDone(status,shared_from_this());
}
waitForGet.signal();
}
void PvaClientGet::connect()
@ -249,20 +240,7 @@ Status PvaClientGet::waitConnect()
cout << "PvaClientGet::waitConnect channelName "
<< pvaClientChannel->getChannel()->getChannelName() << "\n";
}
{
Lock xx(mutex);
if(connectState==connected) {
if(!channelGetConnectStatus.isOK()) connectState = connectIdle;
return channelGetConnectStatus;
}
if(connectState!=connectActive) {
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientGet::waitConnect illegal connect state ";
throw std::runtime_error(message);
}
}
waitForConnect.wait();
if(!channelGetConnectStatus.isOK()) connectState = connectIdle;
return channelGetConnectStatus;
}
@ -302,20 +280,7 @@ Status PvaClientGet::waitGet()
cout << "PvaClientGet::waitGet channelName "
<< pvaClientChannel->getChannel()->getChannelName() << "\n";
}
{
Lock xx(mutex);
if(getState==getComplete) {
getState = getIdle;
return channelGetStatus;
}
if(getState!=getActive){
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientGet::waitGet llegal get state";
throw std::runtime_error(message);
}
}
waitForGet.wait();
getState = getComplete;
return channelGetStatus;
}
PvaClientGetDataPtr PvaClientGet::getData()

View File

@ -226,8 +226,10 @@ void PvaClientMonitor::monitorConnect(
}
{
Lock xx(mutex);
this->monitor = monitor;
if(!status.isOK()) {
monitorConnectStatus = status;
if(status.isOK()) {
this->monitor = monitor;
} else {
stringstream ss;
ss << pvRequest;
string message = string("\nPvaClientMonitor::monitorConnect)")
@ -236,11 +238,12 @@ void PvaClientMonitor::monitorConnect(
+ "\nerror\n" + status.getMessage();
monitorConnectStatus = Status(Status::STATUSTYPE_ERROR,message);
waitForConnect.signal();
PvaClientMonitorRequesterPtr req(pvaClientMonitorRequester.lock());
if(req) req->monitorConnect(status,shared_from_this(),structure);
return;
}
}
bool signal = (connectState==connectWait) ? true : false;
monitorConnectStatus = status;
connectState = connected;
if(isStarted) {
if(PvaClient::getDebug()) {
@ -249,6 +252,9 @@ void PvaClientMonitor::monitorConnect(
<< " is already started "
<< endl;
}
waitForConnect.signal();
PvaClientMonitorRequesterPtr req(pvaClientMonitorRequester.lock());
if(req) req->monitorConnect(status,shared_from_this(),structure);
return;
}
pvaClientData = PvaClientMonitorData::create(structure);
@ -329,20 +335,7 @@ Status PvaClientMonitor::waitConnect()
<< pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
{
Lock xx(mutex);
if(connectState==connected) {
if(!monitorConnectStatus.isOK()) connectState = connectIdle;
return monitorConnectStatus;
}
if(connectState!=connectWait) {
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientMonitor::waitConnect illegal connect state ";
throw std::runtime_error(message);
}
}
waitForConnect.wait();
connectState = monitorConnectStatus.isOK() ? connected : connectIdle;
if(PvaClient::getDebug()) {
cout << "PvaClientMonitor::waitConnect"
<< " monitorConnectStatus " << (monitorConnectStatus.isOK() ? "connected" : "not connected")

View File

@ -138,25 +138,17 @@ void PvaClientProcess::channelProcessConnect(
}
{
Lock xx(mutex);
this->channelProcess = channelProcess;
channelProcessConnectStatus = status;
if(status.isOK()) {
channelProcessConnectStatus = status;
this->channelProcess = channelProcess;
connectState = connected;
} else {
stringstream ss;
ss << pvRequest;
string message = string("PvaClientProcess::channelProcessConnect")
+ "\npvRequest\n" + ss.str()
+ "\nerror\n" + status.getMessage();
channelProcessConnectStatus = Status(Status::STATUSTYPE_ERROR,message);
}
}
waitForConnect.signal();
}
PvaClientProcessRequesterPtr req(pvaClientProcessRequester.lock());
if(req) {
req->channelProcessConnect(status,shared_from_this());
}
waitForConnect.signal();
}
void PvaClientProcess::processDone(
@ -173,13 +165,12 @@ void PvaClientProcess::processDone(
Lock xx(mutex);
channelProcessStatus = status;
processState = processComplete;
waitForProcess.signal();
}
PvaClientProcessRequesterPtr req(pvaClientProcessRequester.lock());
if(req) {
req->processDone(status,shared_from_this());
}
waitForProcess.signal();
}
void PvaClientProcess::connect()
@ -221,17 +212,7 @@ Status PvaClientProcess::waitConnect()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(connectState==connected) {
if(!channelProcessConnectStatus.isOK()) connectState = connectIdle;
return channelProcessConnectStatus;
}
if(connectState!=connectActive) {
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " pvaClientProcess illegal connect state ";
throw std::runtime_error(message);
}
waitForConnect.wait();
if(!channelProcessConnectStatus.isOK()) connectState = connectIdle;
return channelProcessConnectStatus;
}
@ -274,18 +255,6 @@ Status PvaClientProcess::waitProcess()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
{
Lock xx(mutex);
if(processState==processComplete) {
processState = processIdle;
return channelProcessStatus;
}
if(processState!=processActive){
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientProcess::waitProcess llegal process state";
throw std::runtime_error(message);
}
}
waitForProcess.wait();
processState = processComplete;
return channelProcessStatus;

View File

@ -163,28 +163,19 @@ void PvaClientPut::channelPutConnect(
}
{
Lock xx(mutex);
this->channelPut = channelPut;
channelPutConnectStatus = status;
if(status.isOK()) {
channelPutConnectStatus = status;
this->channelPut = channelPut;
connectState = connected;
pvaClientData = PvaClientPutData::create(structure);
pvaClientData->setMessagePrefix(channelPut->getChannel()->getChannelName());
} else {
stringstream ss;
ss << pvRequest;
string message = string("\nPvaClientPut::channelPutConnect)")
+ "\nchannelName=" + pvaClientChannel->getChannel()->getChannelName()
+ "\npvRequest\n" + ss.str()
+ "\nerror\n" + status.getMessage();
channelPutConnectStatus = Status(Status::STATUSTYPE_ERROR,message);
}
}
waitForConnect.signal();
}
PvaClientPutRequesterPtr req(pvaClientPutRequester.lock());
if(req) {
req->channelPutConnect(status,shared_from_this());
}
waitForConnect.signal();
}
void PvaClientPut::getDone(
@ -208,14 +199,14 @@ void PvaClientPut::getDone(
BitSetPtr bs = pvaClientData->getChangedBitSet();
bs->clear();
*bs |= *bitSet;
putState = putComplete;
}
}
putState = putComplete;
waitForGetPut.signal();
}
PvaClientPutRequesterPtr req(pvaClientPutRequester.lock());
if(req) {
req->getDone(status,shared_from_this());
}
waitForGetPut.signal();
}
void PvaClientPut::putDone(
@ -232,12 +223,10 @@ void PvaClientPut::putDone(
Lock xx(mutex);
channelGetPutStatus = status;
putState = putComplete;
waitForGetPut.signal();
}
PvaClientPutRequesterPtr req(pvaClientPutRequester.lock());
if(req) {
req->putDone(status,shared_from_this());
}
waitForGetPut.signal();
if(req) { req->putDone(status,shared_from_this());}
}
void PvaClientPut::connect()
@ -282,20 +271,7 @@ Status PvaClientPut::waitConnect()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
{
Lock xx(mutex);
if(connectState==connected) {
if(!channelPutConnectStatus.isOK()) connectState = connectIdle;
return channelPutConnectStatus;
}
if(connectState!=connectActive) {
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPut::waitConnect illegal connect state ";
throw std::runtime_error(message);
}
}
waitForConnect.wait();
if(!channelPutConnectStatus.isOK()) connectState = connectIdle;
return channelPutConnectStatus;
}
@ -341,17 +317,8 @@ Status PvaClientPut::waitGet()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
{
Lock xx(mutex);
if(putState==putComplete) return channelGetPutStatus;
if(putState!=getActive){
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPut::waitGet illegal put state";
throw std::runtime_error(message);
}
}
waitForGetPut.wait();
putState = putComplete;
return channelGetPutStatus;
}
@ -380,12 +347,12 @@ void PvaClientPut::issuePut()
<< " pvStructure\n" << pvaClientData->getPVStructure()
<< " bitSet " << *pvaClientData->getChangedBitSet() << endl
<< endl;
}
}
if(connectState==connectIdle) connect();
if(putState==getActive || putState==putActive) {
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ "PvaClientPut::issuePut get or put aleady active ";
+ " PvaClientPut::issuePut get or put aleady active ";
throw std::runtime_error(message);
}
putState = putActive;
@ -399,17 +366,8 @@ Status PvaClientPut::waitPut()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
{
Lock xx(mutex);
if(putState==putComplete) return channelGetPutStatus;
if(putState!=putActive){
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPut::waitPut illegal put state";
throw std::runtime_error(message);
}
}
waitForGetPut.wait();
putState = putComplete;
if(channelGetPutStatus.isOK()) pvaClientData->getChangedBitSet()->clear();
return channelGetPutStatus;
}

View File

@ -176,31 +176,21 @@ void PvaClientPutGet::channelPutGetConnect(
}
{
Lock xx(mutex);
this->channelPutGet = channelPutGet;
channelPutGetConnectStatus = status;
if(status.isOK()) {
channelPutGetConnectStatus = status;
this->channelPutGet = channelPutGet;
connectState = connected;
pvaClientPutData = PvaClientPutData::create(putStructure);
pvaClientPutData->setMessagePrefix(channelPutGet->getChannel()->getChannelName());
pvaClientGetData = PvaClientGetData::create(getStructure);
pvaClientGetData->setMessagePrefix(channelPutGet->getChannel()->getChannelName());
} else {
stringstream ss;
ss << pvRequest;
string message = string("\nPvaClientPutGet::channelPutGetConnect)")
+ "\nchannelName=" + pvaClientChannel->getChannel()->getChannelName()
+ "\npvRequest\n" + ss.str()
+ "\nerror\n" + status.getMessage();
channelPutGetConnectStatus = Status(Status::STATUSTYPE_ERROR,message);
}
}
waitForConnect.signal();
}
PvaClientPutGetRequesterPtr req(pvaClientPutGetRequester.lock());
if(req) {
req->channelPutGetConnect(status,shared_from_this());
}
waitForConnect.signal();
}
void PvaClientPutGet::putGetDone(
@ -218,16 +208,16 @@ void PvaClientPutGet::putGetDone(
{
Lock xx(mutex);
channelPutGetStatus = status;
putGetState = putGetComplete;
if(status.isOK()) {
pvaClientGetData->setData(getPVStructure,getChangedBitSet);
}
putGetState = putGetComplete;
waitForPutGet.signal();
}
PvaClientPutGetRequesterPtr req(pvaClientPutGetRequester.lock());
if(req) {
req->putGetDone(status,shared_from_this());
}
waitForPutGet.signal();
}
void PvaClientPutGet::getPutDone(
@ -245,7 +235,6 @@ void PvaClientPutGet::getPutDone(
{
Lock xx(mutex);
channelPutGetStatus = status;
putGetState = putGetComplete;
if(status.isOK()) {
PVStructurePtr pvs = pvaClientPutData->getPVStructure();
pvs->copyUnchecked(*putPVStructure,*putBitSet);
@ -253,12 +242,13 @@ void PvaClientPutGet::getPutDone(
bs->clear();
*bs |= *putBitSet;
}
putGetState = putGetComplete;
waitForPutGet.signal();
}
PvaClientPutGetRequesterPtr req(pvaClientPutGetRequester.lock());
if(req) {
req->getPutDone(status,shared_from_this());
}
waitForPutGet.signal();
}
void PvaClientPutGet::getGetDone(
@ -276,16 +266,16 @@ void PvaClientPutGet::getGetDone(
{
Lock xx(mutex);
channelPutGetStatus = status;
putGetState = putGetComplete;
if(status.isOK()) {
pvaClientGetData->setData(getPVStructure,getChangedBitSet);
}
putGetState = putGetComplete;
waitForPutGet.signal();
}
PvaClientPutGetRequesterPtr req(pvaClientPutGetRequester.lock());
if(req) {
req->getGetDone(status,shared_from_this());
}
waitForPutGet.signal();
}
void PvaClientPutGet::connect()
@ -330,21 +320,7 @@ Status PvaClientPutGet::waitConnect()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
{
Lock xx(mutex);
if(connectState==connected) {
if(!channelPutGetConnectStatus.isOK()) connectState = connectIdle;
return channelPutGetConnectStatus;
}
if(connectState!=connectActive) {
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPutGet::waitConnect illegal connect state ";
throw std::runtime_error(message);
}
}
waitForConnect.wait();
if(!channelPutGetConnectStatus.isOK()) connectState = connectIdle;
return channelPutGetConnectStatus;
}
@ -392,13 +368,6 @@ Status PvaClientPutGet::waitPutGet()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(putGetState==putGetComplete) return channelPutGetStatus;
if(putGetState!=putGetActive){
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPutGet::waitPutGet get or put aleady active ";
throw std::runtime_error(message);
}
waitForPutGet.wait();
if(channelPutGetStatus.isOK()) pvaClientPutData->getChangedBitSet()->clear();
return channelPutGetStatus;
@ -446,13 +415,6 @@ Status PvaClientPutGet::waitGetGet()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(putGetState==putGetComplete) return channelPutGetStatus;
if(putGetState!=putGetActive){
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPutGet::waitGetGet get or put aleady active ";
throw std::runtime_error(message);
}
waitForPutGet.wait();
return channelPutGetStatus;
}
@ -499,13 +461,6 @@ Status PvaClientPutGet::waitGetPut()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(putGetState==putGetComplete) return channelPutGetStatus;
if(putGetState!=putGetActive){
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPutGet::waitGetPut get or put aleady active ";
throw std::runtime_error(message);
}
waitForPutGet.wait();
return channelPutGetStatus;
}