Merge pull request #74 from mrkraimer/master

This fixes the problems raised in issue #71
This commit is contained in:
Andrew Johnson
2017-11-28 09:04:47 -06:00
committed by GitHub
2 changed files with 132 additions and 61 deletions

View File

@@ -238,6 +238,9 @@ void CAChannel::connected()
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::connected " << channelName << endl;
}
std::queue<CAChannelPutPtr> putQ;
std::queue<CAChannelGetPtr> getQ;
std::queue<CAChannelMonitorPtr> monitorQ;
{
Lock lock(requestsMutex);
// we assume array if element count > 1
@@ -258,24 +261,7 @@ void CAChannel::connected()
// TODO we need only Structure here
this->structure = structure;
}
while(!putQueue.empty()) {
putQueue.front()->activate();
putQueue.pop();
}
while(!getQueue.empty()) {
getQueue.front()->activate();
getQueue.pop();
}
while(!monitorQueue.empty()) {
monitorQueue.front()->activate();
monitorQueue.pop();
}
std::queue<CAChannelPutPtr> putQ;
std::queue<CAChannelGetPtr> getQ;
std::queue<CAChannelMonitorPtr> monitorQ;
{
Lock lock(requestsMutex);
std::vector<CAChannelGetWPtr>::const_iterator getiter;
for (getiter = getList.begin(); getiter != getList.end(); ++getiter) {
CAChannelGetPtr temp = (*getiter).lock();
@@ -296,8 +282,8 @@ void CAChannel::connected()
}
}
while(!putQ.empty()) {
putQ.front()->channelCreated(Status::Ok,shared_from_this());
putQ.pop();
putQ.front()->channelCreated(Status::Ok,shared_from_this());
putQ.pop();
}
while(!getQ.empty()) {
getQ.front()->channelCreated(Status::Ok,shared_from_this());
@@ -307,6 +293,22 @@ void CAChannel::connected()
monitorQ.front()->channelCreated(Status::Ok,shared_from_this());
monitorQ.pop();
}
while(!getFieldQueue.empty()) {
getFieldQueue.front()->callRequester(shared_from_this());
getFieldQueue.pop();
}
while(!putQueue.empty()) {
putQueue.front()->activate();
putQueue.pop();
}
while(!getQueue.empty()) {
getQueue.front()->activate();
getQueue.pop();
}
while(!monitorQueue.empty()) {
monitorQueue.front()->activate();
monitorQueue.pop();
}
ChannelRequester::shared_pointer req(channelRequester.lock());
if(req) {
EXCEPTION_GUARD(req->channelStateChange(
@@ -520,20 +522,18 @@ std::tr1::shared_ptr<ChannelRequester> CAChannel::getChannelRequester()
void CAChannel::getField(GetFieldRequester::shared_pointer const & requester,
std::string const & subField)
{
Field::const_shared_pointer field =
subField.empty() ?
std::tr1::static_pointer_cast<const Field>(structure) :
structure->getField(subField);
if (field)
{
EXCEPTION_GUARD(requester->getDone(Status::Ok, field));
if(DEBUG_LEVEL>0) {
cout << "CAChannel::getField " << channelName << endl;
}
else
CAChannelGetFieldPtr getField(new CAChannelGetField(requester,subField));
{
Status errorStatus(Status::STATUSTYPE_ERROR, "field '" + subField + "' not found");
EXCEPTION_GUARD(requester->getDone(errorStatus, FieldConstPtr()));
}
Lock lock(requestsMutex);
if(getConnectionState()!=Channel::CONNECTED) {
getFieldQueue.push(getField);
return;
}
}
getField->callRequester(shared_from_this());
}
@@ -555,12 +555,16 @@ ChannelGet::shared_pointer CAChannel::createChannelGet(
if(DEBUG_LEVEL>0) {
cout << "CAChannel::createChannelGet " << channelName << endl;
}
CAChannelGetPtr channelGet = CAChannelGet::create(shared_from_this(), channelGetRequester, pvRequest);\
if(getConnectionState()==Channel::CONNECTED) {
channelGet->activate();
} else {
getQueue.push(channelGet);
CAChannelGetPtr channelGet =
CAChannelGet::create(shared_from_this(), channelGetRequester, pvRequest);
{
Lock lock(requestsMutex);
if(getConnectionState()!=Channel::CONNECTED) {
getQueue.push(channelGet);
return channelGet;
}
}
channelGet->activate();
return channelGet;
}
@@ -572,12 +576,16 @@ ChannelPut::shared_pointer CAChannel::createChannelPut(
if(DEBUG_LEVEL>0) {
cout << "CAChannel::createChannelPut " << channelName << endl;
}
CAChannelPutPtr channelPut = CAChannelPut::create(shared_from_this(), channelPutRequester, pvRequest);\
if(getConnectionState()==Channel::CONNECTED) {
channelPut->activate();
} else {
putQueue.push(channelPut);
CAChannelPutPtr channelPut =
CAChannelPut::create(shared_from_this(), channelPutRequester, pvRequest);
{
Lock lock(requestsMutex);
if(getConnectionState()!=Channel::CONNECTED) {
putQueue.push(channelPut);
return channelPut;
}
}
channelPut->activate();
return channelPut;
}
@@ -589,12 +597,16 @@ Monitor::shared_pointer CAChannel::createMonitor(
if(DEBUG_LEVEL>0) {
cout << "CAChannel::createMonitor " << channelName << endl;
}
CAChannelMonitorPtr channelMonitor = CAChannelMonitor::create(shared_from_this(), monitorRequester, pvRequest);\
if(getConnectionState()==Channel::CONNECTED) {
channelMonitor->activate();
} else {
monitorQueue.push(channelMonitor);
CAChannelMonitorPtr channelMonitor =
CAChannelMonitor::create(shared_from_this(), monitorRequester, pvRequest);
{
Lock lock(requestsMutex);
if(getConnectionState()!=Channel::CONNECTED) {
monitorQueue.push(channelMonitor);
return channelMonitor;
}
}
channelMonitor->activate();
return channelMonitor;
}
@@ -613,6 +625,47 @@ void CAChannel::printInfo(std::ostream& out)
}
CAChannelGetField::CAChannelGetField(
GetFieldRequester::shared_pointer const & requester,std::string const & subField)
: getFieldRequester(requester),
subField(subField)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelGetField::CAChannelGetField()\n";
}
}
CAChannelGetField::~CAChannelGetField()
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelGetField::~CAChannelGetField()\n";
}
}
void CAChannelGetField::callRequester(CAChannelPtr const & caChannel)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelGetField::callRequester\n";
}
GetFieldRequester::shared_pointer requester(getFieldRequester.lock());
if(!requester) return;
epics::pvData::Structure::const_shared_pointer structure(caChannel->getStructure());
Field::const_shared_pointer field =
subField.empty() ?
std::tr1::static_pointer_cast<const Field>(structure) :
structure->getField(subField);
if (field)
{
EXCEPTION_GUARD(requester->getDone(Status::Ok, field));
}
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, "field '" + subField + "' not found");
EXCEPTION_GUARD(requester->getDone(errorStatus, FieldConstPtr()));
}
}
/* ---------------------------------------------------------- */
void CAChannel::threadAttach()
@@ -624,15 +677,6 @@ void CAChannel::threadAttach()
}
CAChannelGetPtr CAChannelGet::create(
CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return CAChannelGetPtr(new CAChannelGet(channel, channelGetRequester, pvRequest));
}
static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype nativeType)
{
// get "field" sub-structure
@@ -642,9 +686,13 @@ static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype n
fieldSubField = pvRequest;
Structure::const_shared_pointer fieldStructure = fieldSubField->getStructure();
// no fields or control -> DBR_CTRL_<type>
if (fieldStructure->getNumberFields() == 0 ||
fieldStructure->getField("control"))
// no fields
if (fieldStructure->getNumberFields() == 0)
{
return static_cast<chtype>(static_cast<int>(nativeType) + DBR_TIME_STRING);
}
// control -> DBR_CTRL_<type>
if (fieldStructure->getField("control"))
return static_cast<chtype>(static_cast<int>(nativeType) + DBR_CTRL_STRING);
// display/valueAlarm -> DBR_GR_<type>
@@ -666,6 +714,15 @@ static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype n
size_t CAChannelGet::num_instances;
CAChannelGetPtr CAChannelGet::create(
CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return CAChannelGetPtr(new CAChannelGet(channel, channelGetRequester, pvRequest));
}
CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)

View File

@@ -24,6 +24,9 @@ namespace ca {
class CAChannel;
typedef std::tr1::shared_ptr<CAChannel> CAChannelPtr;
class CAChannelGetField;
typedef std::tr1::shared_ptr<CAChannelGetField> CAChannelGetFieldPtr;
typedef std::tr1::weak_ptr<CAChannelGetField> CAChannelGetFieldWPtr;
class CAChannelPut;
typedef std::tr1::shared_ptr<CAChannelPut> CAChannelPutPtr;
typedef std::tr1::weak_ptr<CAChannelPut> CAChannelPutWPtr;
@@ -34,8 +37,18 @@ class CAChannelMonitor;
typedef std::tr1::shared_ptr<CAChannelMonitor> CAChannelMonitorPtr;
typedef std::tr1::weak_ptr<CAChannelMonitor> CAChannelMonitorWPtr;
typedef std::tr1::shared_ptr<ChannelBaseRequester> ChannelBaseRequesterPtr;
typedef std::tr1::weak_ptr<ChannelBaseRequester> ChannelBaseRequesterWPtr;
class CAChannelGetField :
public std::tr1::enable_shared_from_this<CAChannelGetField>
{
public:
POINTER_DEFINITIONS(CAChannelGetField);
CAChannelGetField(GetFieldRequester::shared_pointer const & requester,std::string const & subField);
~CAChannelGetField();
void callRequester(CAChannelPtr const & caChannel);
private:
GetFieldRequester::weak_pointer getFieldRequester;
std::string subField;
};
class CAChannel :
public Channel,
@@ -120,6 +133,7 @@ private:
epics::pvData::Mutex requestsMutex;
std::queue<CAChannelGetFieldPtr> getFieldQueue;
std::queue<CAChannelPutPtr> putQueue;
std::queue<CAChannelGetPtr> getQueue;
std::queue<CAChannelMonitorPtr> monitorQueue;