record[block=] record[queueSize=] createChannelGet, createChannelPut, and createMonitor before channel connects

This commit is contained in:
mrkraimer
2017-07-28 10:36:29 -04:00
parent f7874396b9
commit b519422df5
2 changed files with 323 additions and 233 deletions

View File

@@ -4,6 +4,7 @@
* in file LICENSE that is included with this distribution.
*/
#include <epicsVersion.h>
#include <pv/standardField.h>
@@ -15,11 +16,12 @@
#include <pv/caStatus.h>
using namespace epics::pvData;
using namespace epics::pvAccess;
using namespace epics::pvAccess::ca;
using std::string;
namespace epics {
namespace pvAccess {
namespace ca {
#define EXCEPTION_GUARD(code) try { code; } \
catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }
@@ -252,6 +254,18 @@ void CAChannel::connected()
// TODO call channelCreated if structure has changed
EXCEPTION_GUARD(channelRequester->channelStateChange(shared_from_this(), Channel::CONNECTED));
while(!putQueue.empty()) {
putQueue.front()->activate();
putQueue.pop();
}
while(!getQueue.empty()) {
getQueue.front()->activate();
getQueue.pop();
}
while(!monitorQueue.empty()) {
monitorQueue.front()->activate();
monitorQueue.pop();
}
}
void CAChannel::disconnected()
@@ -403,7 +417,13 @@ ChannelGet::shared_pointer CAChannel::createChannelGet(
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return CAChannelGet::create(shared_from_this(), channelGetRequester, pvRequest);
CAChannelGetPtr channelGet = CAChannelGet::create(shared_from_this(), channelGetRequester, pvRequest);\
if(getConnectionState()==Channel::CONNECTED) {
channelGet->activate();
} else {
getQueue.push(channelGet);
}
return channelGet;
}
@@ -411,7 +431,13 @@ ChannelPut::shared_pointer CAChannel::createChannelPut(
ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return CAChannelPut::create(shared_from_this(), channelPutRequester, pvRequest);
CAChannelPutPtr channelPut = CAChannelPut::create(shared_from_this(), channelPutRequester, pvRequest);\
if(getConnectionState()==Channel::CONNECTED) {
channelPut->activate();
} else {
putQueue.push(channelPut);
}
return channelPut;
}
@@ -419,7 +445,13 @@ Monitor::shared_pointer CAChannel::createMonitor(
MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return CAChannelMonitor::create(shared_from_this(), monitorRequester, pvRequest);
CAChannelMonitorPtr channelMonitor = CAChannelMonitor::create(shared_from_this(), monitorRequester, pvRequest);\
if(getConnectionState()==Channel::CONNECTED) {
channelMonitor->activate();
} else {
monitorQueue.push(channelMonitor);
}
return channelMonitor;
}
@@ -444,16 +476,8 @@ void CAChannel::destroy()
{
Lock lock(requestsMutex);
{
if (destroyed)
return;
if (destroyed) return;
destroyed = true;
while (!requests.empty())
{
ChannelRequest::shared_pointer request = requests.begin()->second.lock();
if (request)
request->destroy();
}
}
channelProvider->unregisterChannel(shared_from_this());
@@ -470,49 +494,18 @@ void CAChannel::threadAttach()
std::tr1::static_pointer_cast<CAChannelProvider>(channelProvider)->threadAttach();
}
void CAChannel::registerRequest(ChannelRequest::shared_pointer const & request)
{
Lock lock(requestsMutex);
requests[request.get()] = ChannelRequest::weak_pointer(request);
}
void CAChannel::unregisterRequest(ChannelRequest::shared_pointer const & request)
{
Lock lock(requestsMutex);
requests.erase(request.get());
}
ChannelGet::shared_pointer CAChannelGet::create(
CAChannelGetPtr CAChannelGet::create(
CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
// TODO use std::make_shared
std::tr1::shared_ptr<CAChannelGet> tp(
new CAChannelGet(channel, channelGetRequester, pvRequest)
);
ChannelGet::shared_pointer thisPtr = tp;
static_cast<CAChannelGet*>(thisPtr.get())->activate();
return thisPtr;
return CAChannelGetPtr(new CAChannelGet(channel, channelGetRequester, pvRequest));
}
CAChannelGet::~CAChannelGet()
{
// TODO
}
@@ -548,22 +541,25 @@ static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype n
}
CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & _channel,
ChannelGetRequester::shared_pointer const & _channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest) :
channel(_channel),
channelGetRequester(_channelGetRequester),
getType(getDBRType(pvRequest, _channel->getNativeType())),
pvStructure(createPVStructure(_channel, getType, pvRequest)),
bitSet(new BitSet(static_cast<uint32>(pvStructure->getStructure()->getNumberFields()))),
CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
:
channel(channel),
channelGetRequester(channelGetRequester),
pvRequest(pvRequest),
lastRequestFlag(false)
{
// TODO
bitSet->set(0);
}
void CAChannelGet::activate()
{
if(pvStructure) throw std::runtime_error("CAChannelGet::activate() was called twice");
getType = getDBRType(pvRequest, channel->getNativeType());
pvStructure = createPVStructure(channel, getType, pvRequest);
bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
bitSet->set(0);
EXCEPTION_GUARD(channelGetRequester->channelGetConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
@@ -974,12 +970,8 @@ void CAChannelGet::get()
*/
int result = ca_array_get_callback(getType,
#if (((EPICS_VERSION * 256 + EPICS_REVISION) * 256 + EPICS_MODIFICATION) >= ((3*256+14)*256+12))
0,
#else
channel->getElementCount(),
#endif
channel->getChannelID(), ca_get_handler, this);
0,
channel->getChannelID(), ca_get_handler, this);
if (result == ECA_NORMAL)
{
ca_flush_io();
@@ -1022,26 +1014,12 @@ void CAChannelGet::destroy()
}
ChannelPut::shared_pointer CAChannelPut::create(
CAChannelPutPtr CAChannelPut::create(
CAChannel::shared_pointer const & channel,
ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
// TODO use std::make_shared
std::tr1::shared_ptr<CAChannelPut> tp(
new CAChannelPut(channel, channelPutRequester, pvRequest)
);
ChannelPut::shared_pointer thisPtr = tp;
static_cast<CAChannelPut*>(thisPtr.get())->activate();
return thisPtr;
return CAChannelPutPtr(new CAChannelPut(channel, channelPutRequester, pvRequest));
}
@@ -1051,22 +1029,32 @@ CAChannelPut::~CAChannelPut()
}
CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & _channel,
ChannelPutRequester::shared_pointer const & _channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest) :
channel(_channel),
channelPutRequester(_channelPutRequester),
getType(getDBRType(pvRequest, _channel->getNativeType())),
pvStructure(createPVStructure(_channel, getType, pvRequest)),
bitSet(new BitSet(static_cast<uint32>(pvStructure->getStructure()->getNumberFields()))),
lastRequestFlag(false)
CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel,
ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
:
channel(channel),
channelPutRequester(channelPutRequester),
pvRequest(pvRequest),
lastRequestFlag(false),
block(false)
{
// NOTE: we require value type, we can only put value field
bitSet->set(pvStructure->getSubFieldT("value")->getFieldOffset());
}
void CAChannelPut::activate()
{
if(pvStructure) throw std::runtime_error("CAChannelPut::activate() was called twice");
getType = getDBRType(pvRequest,channel->getNativeType());
pvStructure = createPVStructure(channel, getType, pvRequest);
bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
// NOTE: we require value type, we can only put value field
PVStringPtr pvString = pvRequest->getSubField<PVString>("record._options.block");
if(pvString) {
std::string val = pvString->get();
if(val.compare("true")==0) block = true;
}
bitSet->set(pvStructure->getSubFieldT("value")->getFieldOffset());
EXCEPTION_GUARD(channelPutRequester->channelPutConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
@@ -1103,9 +1091,15 @@ int doPut_pvStructure(CAChannel::shared_pointer const & channel, void *usrArg, P
std::tr1::shared_ptr<sF> value = std::tr1::static_pointer_cast<sF>(pvStructure->getSubFieldT("value"));
pT val = value->get();
int result = ca_array_put_callback(channel->getNativeType(), 1,
channel->getChannelID(), &val,
ca_put_handler, usrArg);
int result = 0;
if(usrArg!=NULL) {
result = ca_array_put_callback(channel->getNativeType(), 1,
channel->getChannelID(), &val,
ca_put_handler, usrArg);
} else {
result = ca_array_put(channel->getNativeType(), 1,
channel->getChannelID(), &val);
}
if (result == ECA_NORMAL)
{
@@ -1119,10 +1113,17 @@ int doPut_pvStructure(CAChannel::shared_pointer const & channel, void *usrArg, P
std::tr1::shared_ptr<aF> value = pvStructure->getSubFieldT<aF>("value");
const pT* val = value->view().data();
int result = ca_array_put_callback(channel->getNativeType(), static_cast<unsigned long>(value->getLength()),
channel->getChannelID(), val,
ca_put_handler, usrArg);
int result = 0;
if(usrArg!=NULL) {
result = ca_array_put_callback(channel->getNativeType(),
static_cast<unsigned long>(value->getLength()),
channel->getChannelID(), val,
ca_put_handler, usrArg);
} else {
result = ca_array_put(channel->getNativeType(),
static_cast<unsigned long>(value->getLength()),
channel->getChannelID(), val);
}
if (result == ECA_NORMAL)
{
ca_flush_io();
@@ -1143,10 +1144,17 @@ int doPut_pvStructure<string, pvString, PVString, PVStringArray>(CAChannel::shar
std::tr1::shared_ptr<PVString> value = std::tr1::static_pointer_cast<PVString>(pvStructure->getSubFieldT("value"));
string val = value->get();
int result = ca_array_put_callback(channel->getNativeType(), 1,
channel->getChannelID(), val.c_str(),
ca_put_handler, usrArg);
int result = 0;
if(usrArg!=NULL) {
result = ca_array_put_callback(
channel->getNativeType(), 1,
channel->getChannelID(), val.c_str(),
ca_put_handler, usrArg);
} else {
result = ca_array_put(
channel->getNativeType(), 1,
channel->getChannelID(), val.c_str());
}
if (result == ECA_NORMAL)
{
ca_flush_io();
@@ -1176,10 +1184,17 @@ int doPut_pvStructure<string, pvString, PVString, PVStringArray>(CAChannel::shar
p += MAX_STRING_SIZE;
}
int result = ca_array_put_callback(channel->getNativeType(), arraySize,
channel->getChannelID(), ca_stringBuffer,
ca_put_handler, usrArg);
int result = 0;
if(usrArg!=NULL) {
result = ca_array_put_callback(
channel->getNativeType(), arraySize,
channel->getChannelID(), ca_stringBuffer,
ca_put_handler, usrArg);
} else {
result = ca_array_put(
channel->getNativeType(), arraySize,
channel->getChannelID(), ca_stringBuffer);
}
delete[] ca_stringBuffer;
if (result == ECA_NORMAL)
@@ -1202,10 +1217,17 @@ int doPut_pvStructure<dbr_enum_t, pvString, PVString, PVStringArray>(CAChannel::
std::tr1::shared_ptr<PVInt> value = std::tr1::static_pointer_cast<PVInt>(pvStructure->getSubFieldT("value.index"));
dbr_enum_t val = value->get();
int result = ca_array_put_callback(channel->getNativeType(), 1,
channel->getChannelID(), &val,
ca_put_handler, usrArg);
int result = 0;
if(usrArg!=NULL) {
result = ca_array_put_callback(
channel->getNativeType(), 1,
channel->getChannelID(), &val,
ca_put_handler, usrArg);
} else {
result = ca_array_put(
channel->getNativeType(), 1,
channel->getChannelID(), &val);
}
if (result == ECA_NORMAL)
{
ca_flush_io();
@@ -1254,16 +1276,29 @@ void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure,
BitSet::shared_pointer const & /*putBitSet*/)
{
channel->threadAttach();
doPut putFunc = doPutFuncTable[channel->getNativeType()];
if (putFunc)
{
// TODO now we always put all
int result = putFunc(channel, this, pvPutStructure);
if (result != ECA_NORMAL)
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
EXCEPTION_GUARD(channelPutRequester->putDone(errorStatus, shared_from_this()));
// TODO now we always put all
if(block) {
int result = putFunc(channel, this, pvPutStructure);
if (result != ECA_NORMAL)
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
EXCEPTION_GUARD(channelPutRequester->putDone(errorStatus, shared_from_this()));
}
} else {
int result = putFunc(channel,NULL, pvPutStructure);
if (result == ECA_NORMAL)
{
EXCEPTION_GUARD(channelPutRequester->putDone(Status::Ok, shared_from_this()));
}
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
EXCEPTION_GUARD(channelPutRequester->putDone(errorStatus, shared_from_this()));
}
}
}
else
@@ -1352,60 +1387,6 @@ void CAChannelPut::destroy()
// TODO
}
Monitor::shared_pointer CAChannelMonitor::create(
CAChannel::shared_pointer const & channel,
MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
// TODO use std::make_shared
std::tr1::shared_ptr<CAChannelMonitor> tp(
new CAChannelMonitor(channel, monitorRequester, pvRequest)
);
Monitor::shared_pointer thisPtr = tp;
static_cast<CAChannelMonitor*>(thisPtr.get())->activate();
return thisPtr;
}
CAChannelMonitor::~CAChannelMonitor()
{
// TODO
}
CAChannelMonitor::CAChannelMonitor(CAChannel::shared_pointer const & _channel,
MonitorRequester::shared_pointer const & _monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest) :
channel(_channel),
monitorRequester(_monitorRequester),
getType(getDBRType(pvRequest, _channel->getNativeType())),
pvStructure(createPVStructure(_channel, getType, pvRequest)),
count(0),
element(new MonitorElement(pvStructure))
{
// TODO
element->changedBitSet->set(0);
}
void CAChannelMonitor::activate()
{
// TODO remove
thisPointer = shared_from_this();
EXCEPTION_GUARD(monitorRequester->monitorConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
/* --------------- Monitor --------------- */
@@ -1415,27 +1396,144 @@ static void ca_subscription_handler(struct event_handler_args args)
channelMonitor->subscriptionEvent(args);
}
class CACMonitorQueue :
public std::tr1::enable_shared_from_this<CACMonitorQueue>
{
public:
POINTER_DEFINITIONS(CACMonitorQueue);
private:
size_t queueSize;
bool overrunInProgress;
bool isStarted;
Mutex mutex;
std::queue<MonitorElementPtr> monitorElementQueue;
public:
CACMonitorQueue(
int32 queueSize)
: queueSize(queueSize),
overrunInProgress(false),
isStarted(false)
{}
~CACMonitorQueue()
{
}
void start()
{
Lock guard(mutex);
while(!monitorElementQueue.empty()) monitorElementQueue.pop();
overrunInProgress = false;
isStarted = true;
}
void stop()
{
Lock guard(mutex);
while(!monitorElementQueue.empty()) monitorElementQueue.pop();
overrunInProgress = false;
isStarted = false;
}
// return true if added to queue
bool event(const PVStructurePtr &pvStructure)
{
Lock guard(mutex);
if(!isStarted) return false;
if(monitorElementQueue.size()==queueSize)
{
overrunInProgress = true;
return false;
} else {
PVStructure::shared_pointer pvs =
getPVDataCreate()->createPVStructure(pvStructure->getStructure());
pvs->copy(*pvStructure);
MonitorElementPtr monitorElement(new MonitorElement(pvs));
monitorElement->changedBitSet->set(0);
if(overrunInProgress) {
overrunInProgress = false;
monitorElement->overrunBitSet->set(0);
}
monitorElementQueue.push(monitorElement);
return true;
}
}
MonitorElementPtr poll()
{
Lock guard(mutex);
if(!isStarted) return MonitorElementPtr();
if(monitorElementQueue.empty()) return MonitorElementPtr();
MonitorElementPtr retval = monitorElementQueue.front();
return retval;
}
void release(MonitorElementPtr const & monitorElement)
{
Lock guard(mutex);
if(monitorElementQueue.empty()) {
throw std::runtime_error("client error calling release");
}
monitorElementQueue.pop();
}
};
CAChannelMonitorPtr CAChannelMonitor::create(
CAChannel::shared_pointer const & channel,
MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return CAChannelMonitorPtr(new CAChannelMonitor(channel, monitorRequester, pvRequest));
}
CAChannelMonitor::~CAChannelMonitor()
{
}
CAChannelMonitor::CAChannelMonitor(
CAChannel::shared_pointer const & channel,
MonitorRequester::shared_pointer const & monitorRequester,
PVStructurePtr const & pvRequest)
:
channel(channel),
monitorRequester(monitorRequester),
pvRequest(pvRequest)
{
}
void CAChannelMonitor::activate()
{
if(pvStructure) throw std::runtime_error("CAChannelMonitor::activate() was called twice");
getType = getDBRType(pvRequest, channel->getNativeType());
pvStructure = createPVStructure(channel, getType, pvRequest);
int32 queueSize = 2;
PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>("record._options");
if (pvOptions) {
PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
if (pvString) {
int size;
std::stringstream ss;
ss << pvString->get();
ss >> size;
if (size > 1) queueSize = size;
}
}
monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize));
EXCEPTION_GUARD(monitorRequester->monitorConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
{
if (args.status == ECA_NORMAL)
{
// TODO override indicator
copyDBRtoPVStructure copyFunc = copyFuncTable[getType];
if (copyFunc)
if (copyFunc) {
copyFunc(args.dbr, args.count, pvStructure);
else
{
// TODO remove
monitorQueue->event(pvStructure);
// call monitorRequester even if queue is full
monitorRequester->monitorEvent(shared_from_this());
} else {
std::cout << "no copy func implemented" << std::endl;
}
{
Lock lock(mutex);
count = 1;
}
monitorRequester->monitorEvent(shared_from_this());
}
else
{
@@ -1444,7 +1542,6 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
}
}
epics::pvData::Status CAChannelMonitor::start()
{
channel->threadAttach();
@@ -1462,12 +1559,13 @@ epics::pvData::Status CAChannelMonitor::start()
// TODO DBE_PROPERTY support
int result = ca_create_subscription(getType,
0 /*channel->getElementCount()*/,
channel->getChannelID(), DBE_VALUE,
ca_subscription_handler, this,
&eventID);
0,
channel->getChannelID(), DBE_VALUE,
ca_subscription_handler, this,
&eventID);
if (result == ECA_NORMAL)
{
monitorQueue->start();
ca_flush_io();
return Status::Ok;
}
@@ -1477,7 +1575,6 @@ epics::pvData::Status CAChannelMonitor::start()
}
}
epics::pvData::Status CAChannelMonitor::stop()
{
channel->threadAttach();
@@ -1486,6 +1583,7 @@ epics::pvData::Status CAChannelMonitor::stop()
if (result == ECA_NORMAL)
{
monitorQueue->stop();
return Status::Ok;
}
else
@@ -1497,22 +1595,13 @@ epics::pvData::Status CAChannelMonitor::stop()
MonitorElementPtr CAChannelMonitor::poll()
{
Lock lock(mutex);
if (count)
{
count--;
return element;
}
else
{
return nullElement;
}
return monitorQueue->poll();
}
void CAChannelMonitor::release(MonitorElementPtr const & /*monitorElement*/)
void CAChannelMonitor::release(MonitorElementPtr const & monitorElement)
{
// noop
monitorQueue->release(monitorElement);
}
/* --------------- epics::pvData::ChannelRequest --------------- */
@@ -1528,9 +1617,7 @@ void CAChannelMonitor::cancel()
void CAChannelMonitor::destroy()
{
channel->threadAttach();
ca_clear_subscription(eventID);
// TODO
thisPointer.reset();
}
}}}