yet more support for noblock; add ci

This commit is contained in:
mrkraimer
2017-09-12 14:20:13 -04:00
parent 4606d84185
commit b5e9aa6a60
9 changed files with 446 additions and 111 deletions

View File

@@ -96,28 +96,28 @@ public:
PvaClientPutGetPtr PvaClientPutGet::create(
PvaClientPtr const &pvaClient,
Channel::shared_pointer const & channel,
PvaClientChannelPtr const & pvaClientChannel,
PVStructurePtr const &pvRequest)
{
PvaClientPutGetPtr epv(new PvaClientPutGet(pvaClient,channel,pvRequest));
epv->channelPutGetRequester = ChannelPutGetRequesterImplPtr(
new ChannelPutGetRequesterImpl(epv,pvaClient));
return epv;
PvaClientPutGetPtr clientPutGet(new PvaClientPutGet(pvaClient,pvaClientChannel,pvRequest));
clientPutGet->channelPutGetRequester = ChannelPutGetRequesterImplPtr(
new ChannelPutGetRequesterImpl(clientPutGet,pvaClient));
return clientPutGet;
}
PvaClientPutGet::PvaClientPutGet(
PvaClientPtr const &pvaClient,
Channel::shared_pointer const & channel,
PvaClientChannelPtr const & pvaClientChannel,
PVStructurePtr const &pvRequest)
: pvaClient(pvaClient),
channel(channel),
pvaClientChannel(pvaClientChannel),
pvRequest(pvRequest),
connectState(connectIdle),
putGetState(putGetIdle)
{
if(PvaClient::getDebug()) {
cout<< "PvaClientPutGet::PvaClientPutGet"
<< " channelName " << channel->getChannelName()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
}
@@ -125,20 +125,46 @@ PvaClientPutGet::PvaClientPutGet(
PvaClientPutGet::~PvaClientPutGet()
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout<< "PvaClientPutGet::~PvaClientPutGet"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
}
void PvaClientPutGet::channelStateChange(PvaClientChannelPtr const & pvaClientChannel, bool isConnected)
{
if(PvaClient::getDebug()) {
cout<< "PvaClientPutGet::channelStateChange"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " isConnected " << (isConnected ? "true" : "false")
<< endl;
}
if(isConnected&&!channelPutGet)
{
connectState = connectActive;
channelPutGet = pvaClientChannel->getChannel()->createChannelPutGet(channelPutGetRequester,pvRequest);
}
PvaClientChannelStateChangeRequesterPtr req(pvaClientChannelStateChangeRequester.lock());
if(req) {
req->channelStateChange(pvaClientChannel,isConnected);
}
}
void PvaClientPutGet::checkPutGetState()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::checkPutGetState"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(connectState==connectIdle){
connect();
getPut();
connect();
}
if(connectState==connectActive){
string message = string("channel ") + pvaClientChannel->getChannel()->getChannelName()
+ " "
+ channelPutGetConnectStatus.getMessage();
throw std::runtime_error(message);
}
}
@@ -164,17 +190,33 @@ void PvaClientPutGet::channelPutGetConnect(
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::channelPutGetConnect"
<< " channelName " << channelPutGet->getChannel()->getChannelName()
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " status.isOK " << (status.isOK() ? "true" : "false")
<< endl;
}
channelPutGetConnectStatus = status;
this->channelPutGet = channelPutGet;
if(status.isOK()) {
pvaClientPutData = PvaClientPutData::create(putStructure);
pvaClientPutData->setMessagePrefix(channelPutGet->getChannel()->getChannelName());
pvaClientGetData = PvaClientGetData::create(getStructure);
pvaClientGetData->setMessagePrefix(channelPutGet->getChannel()->getChannelName());
{
Lock xx(mutex);
this->channelPutGet = channelPutGet;
if(status.isOK()) {
channelPutGetConnectStatus = status;
connectState = connected;
pvaClientPutData = PvaClientPutData::create(putStructure);
pvaClientPutData->setMessagePrefix(channelPutGet->getChannel()->getChannelName());
pvaClientGetData = PvaClientGetData::create(getStructure);
pvaClientGetData->setMessagePrefix(channelPutGet->getChannel()->getChannelName());
} else {
stringstream ss;
ss << pvRequest;
string message = string("\nPvaClientPutGet::channelPutGetConnect)")
+ "\npvRequest\n" + ss.str()
+ "\nerror\n" + status.getMessage();
channelPutGetConnectStatus = Status(Status::STATUSTYPE_ERROR,message);
}
}
PvaClientPutGetRequesterPtr req(pvaClientPutGetRequester.lock());
if(req) {
req->channelPutGetConnect(status,shared_from_this());
}
waitForConnect.signal();
@@ -187,17 +229,22 @@ void PvaClientPutGet::putGetDone(
BitSetPtr const & getChangedBitSet)
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientPutGet::putGetDone"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " status.isOK " << (status.isOK() ? "true" : "false")
<< endl;
}
channelPutGetStatus = status;
if(status.isOK()) {
pvaClientGetData->setData(getPVStructure,getChangedBitSet);
PVStructurePtr pvs = pvaClientGetData->getPVStructure();
pvs->copyUnchecked(*getPVStructure,*getChangedBitSet);
BitSetPtr bs = pvaClientGetData->getChangedBitSet();
bs->clear();
*bs |= *getChangedBitSet;
}
PvaClientPutGetRequesterPtr req(pvaClientPutGetRequester.lock());
if(req) {
req->putGetDone(status,shared_from_this());
}
waitForPutGet.signal();
}
@@ -209,11 +256,8 @@ void PvaClientPutGet::getPutDone(
BitSetPtr const & putBitSet)
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientPutGet::getPutDone"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " status.isOK " << (status.isOK() ? "true" : "false")
<< endl;
}
@@ -225,6 +269,10 @@ void PvaClientPutGet::getPutDone(
bs->clear();
*bs |= *putBitSet;
}
PvaClientPutGetRequesterPtr req(pvaClientPutGetRequester.lock());
if(req) {
req->getPutDone(status,shared_from_this());
}
waitForPutGet.signal();
}
@@ -235,31 +283,38 @@ void PvaClientPutGet::getGetDone(
BitSetPtr const & getChangedBitSet)
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientPutGet::getGetDone"
<< " channelName " << channelName
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< " status.isOK " << (status.isOK() ? "true" : "false")
<< endl;
}
channelPutGetStatus = status;
if(status.isOK()) {
pvaClientGetData->setData(getPVStructure,getChangedBitSet);
PVStructurePtr pvs = pvaClientGetData->getPVStructure();
pvs->copyUnchecked(*getPVStructure,*getChangedBitSet);
BitSetPtr bs = pvaClientGetData->getChangedBitSet();
bs->clear();
*bs |= *getChangedBitSet;
}
PvaClientPutGetRequesterPtr req(pvaClientPutGetRequester.lock());
if(req) {
req->getGetDone(status,shared_from_this());
}
waitForPutGet.signal();
}
void PvaClientPutGet::connect()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::connect"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
issueConnect();
Status status = waitConnect();
if(status.isOK()) return;
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ")
+ channelName
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPutGet::connect "
+ status.getMessage();
throw std::runtime_error(message);
@@ -267,24 +322,29 @@ void PvaClientPutGet::connect()
void PvaClientPutGet::issueConnect()
{
Channel::shared_pointer chan(channel.lock());
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::issueConnect"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(connectState!=connectIdle) {
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " pvaClientPutGet already connected ";
throw std::runtime_error(message);
}
if(chan) {
connectState = connectActive;
channelPutGet = chan->createChannelPutGet(channelPutGetRequester,pvRequest);
return;
}
throw std::runtime_error("PvaClientPutGet::issueConnect() but channel disconnected");
connectState = connectActive;
channelPutGetConnectStatus = Status(Status::STATUSTYPE_ERROR, "connect active");
channelPutGet = pvaClientChannel->getChannel()->createChannelPutGet(channelPutGetRequester,pvRequest);
}
Status PvaClientPutGet::waitConnect()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::waitConnect"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
{
Lock xx(mutex);
if(connectState==connected) {
@@ -292,30 +352,30 @@ Status PvaClientPutGet::waitConnect()
return channelPutGetConnectStatus;
}
if(connectState!=connectActive) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPutGet::waitConnect illegal connect state ";
throw std::runtime_error(message);
}
}
waitForConnect.wait();
connectState = channelPutGetConnectStatus.isOK() ? connected : connectIdle;
if(!channelPutGetConnectStatus.isOK()) connectState = connectIdle;
return channelPutGetConnectStatus;
}
void PvaClientPutGet::putGet()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::putGet"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
issuePutGet();
Status status = waitPutGet();
if(status.isOK()) return;
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ")
+ channelName
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPut::putGet "
+ status.getMessage();
throw std::runtime_error(message);
@@ -323,12 +383,15 @@ void PvaClientPutGet::putGet()
void PvaClientPutGet::issuePutGet()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::issuePutGet"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(connectState==connectIdle) connect();
if(putGetState!=putGetIdle) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
if(putGetState==putGetActive) {
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPutGet::issuePutGet get or put aleady active ";
throw std::runtime_error(message);
}
@@ -339,29 +402,35 @@ void PvaClientPutGet::issuePutGet()
Status PvaClientPutGet::waitPutGet()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::waitPutGet"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(putGetState!=putGetActive){
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPutGet::waitPutGet get or put aleady active ";
throw std::runtime_error(message);
}
waitForPutGet.wait();
putGetState = putGetIdle;
putGetState = putGetComplete;
if(channelPutGetStatus.isOK()) pvaClientPutData->getChangedBitSet()->clear();
return channelPutGetStatus;
}
void PvaClientPutGet::getGet()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::getGet"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
issueGetGet();
Status status = waitGetGet();
if(status.isOK()) return;
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ")
+ channelName
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPut::getGet "
+ status.getMessage();
throw std::runtime_error(message);
@@ -369,12 +438,15 @@ void PvaClientPutGet::getGet()
void PvaClientPutGet::issueGetGet()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::issueGetGet"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(connectState==connectIdle) connect();
if(putGetState!=putGetIdle) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
if(putGetState==putGetActive) {
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPutGet::issueGetGet get or put aleady active ";
throw std::runtime_error(message);
}
@@ -384,29 +456,34 @@ void PvaClientPutGet::issueGetGet()
Status PvaClientPutGet::waitGetGet()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::waitGetGet"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(putGetState!=putGetActive){
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPutGet::waitGetGet get or put aleady active ";
throw std::runtime_error(message);
}
waitForPutGet.wait();
putGetState = putGetIdle;
putGetState = putGetComplete;
return channelPutGetStatus;
}
void PvaClientPutGet::getPut()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::getGetPut"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
issueGetPut();
Status status = waitGetPut();
if(status.isOK()) return;
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ")
+ channelName
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPut::getPut "
+ status.getMessage();
throw std::runtime_error(message);
@@ -414,12 +491,15 @@ void PvaClientPutGet::getPut()
void PvaClientPutGet::issueGetPut()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::issueGetPut"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(connectState==connectIdle) connect();
if(putGetState!=putGetIdle) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
if(putGetState==putGetActive) {
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPutGet::issueGetPut get or put aleady active ";
throw std::runtime_error(message);
}
@@ -429,29 +509,62 @@ void PvaClientPutGet::issueGetPut()
Status PvaClientPutGet::waitGetPut()
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::waitGetPut"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
if(putGetState!=putGetActive){
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ") + channelName
string message = string("channel ")
+ pvaClientChannel->getChannel()->getChannelName()
+ " PvaClientPutGet::waitGetPut get or put aleady active ";
throw std::runtime_error(message);
}
waitForPutGet.wait();
putGetState = putGetIdle;
putGetState = putGetComplete;
return channelPutGetStatus;
}
PvaClientGetDataPtr PvaClientPutGet::getGetData()
{
if(PvaClient::getDebug()) {
cout<< "PvaClientPutGet::getGetData"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
checkPutGetState();
if(putGetState==putGetIdle) getGet();
return pvaClientGetData;
}
PvaClientPutDataPtr PvaClientPutGet::getPutData()
{
if(PvaClient::getDebug()) {
cout<< "PvaClientPutGet::getPutData"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
checkPutGetState();
if(putGetState==putGetIdle) getPut();
return pvaClientPutData;
}
void PvaClientPutGet::setRequester(PvaClientPutGetRequesterPtr const & pvaClientPutGetRequester)
{
if(PvaClient::getDebug()) {
cout << "PvaClientPutGet::setRequester"
<< " channelName " << pvaClientChannel->getChannel()->getChannelName()
<< endl;
}
this->pvaClientPutGetRequester = pvaClientPutGetRequester;
}
PvaClientChannelPtr PvaClientPutGet::getPvaClientChannel()
{
return pvaClientChannel;
}
}}