From b519422df5d2c5ca7caf9e4b707c4c021e94c61c Mon Sep 17 00:00:00 2001 From: mrkraimer Date: Fri, 28 Jul 2017 10:36:29 -0400 Subject: [PATCH] record[block=] record[queueSize=] createChannelGet, createChannelPut, and createMonitor before channel connects --- src/ca/caChannel.cpp | 497 +++++++++++++++++++++++++----------------- src/ca/pv/caChannel.h | 59 ++--- 2 files changed, 323 insertions(+), 233 deletions(-) diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index b28ba1f..edbcf27 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -4,6 +4,7 @@ * in file LICENSE that is included with this distribution. */ + #include #include @@ -15,11 +16,12 @@ #include using namespace epics::pvData; -using namespace epics::pvAccess; -using namespace epics::pvAccess::ca; - using std::string; +namespace epics { +namespace pvAccess { +namespace ca { + #define EXCEPTION_GUARD(code) try { code; } \ catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \ catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); } @@ -252,6 +254,18 @@ void CAChannel::connected() // TODO call channelCreated if structure has changed EXCEPTION_GUARD(channelRequester->channelStateChange(shared_from_this(), Channel::CONNECTED)); + while(!putQueue.empty()) { + putQueue.front()->activate(); + putQueue.pop(); + } + while(!getQueue.empty()) { + getQueue.front()->activate(); + getQueue.pop(); + } + while(!monitorQueue.empty()) { + monitorQueue.front()->activate(); + monitorQueue.pop(); + } } void CAChannel::disconnected() @@ -403,7 +417,13 @@ ChannelGet::shared_pointer CAChannel::createChannelGet( ChannelGetRequester::shared_pointer const & channelGetRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { - return CAChannelGet::create(shared_from_this(), channelGetRequester, pvRequest); + CAChannelGetPtr channelGet = CAChannelGet::create(shared_from_this(), channelGetRequester, pvRequest);\ + if(getConnectionState()==Channel::CONNECTED) { + channelGet->activate(); + } else { + getQueue.push(channelGet); + } + return channelGet; } @@ -411,7 +431,13 @@ ChannelPut::shared_pointer CAChannel::createChannelPut( ChannelPutRequester::shared_pointer const & channelPutRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { - return CAChannelPut::create(shared_from_this(), channelPutRequester, pvRequest); + CAChannelPutPtr channelPut = CAChannelPut::create(shared_from_this(), channelPutRequester, pvRequest);\ + if(getConnectionState()==Channel::CONNECTED) { + channelPut->activate(); + } else { + putQueue.push(channelPut); + } + return channelPut; } @@ -419,7 +445,13 @@ Monitor::shared_pointer CAChannel::createMonitor( MonitorRequester::shared_pointer const & monitorRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { - return CAChannelMonitor::create(shared_from_this(), monitorRequester, pvRequest); + CAChannelMonitorPtr channelMonitor = CAChannelMonitor::create(shared_from_this(), monitorRequester, pvRequest);\ + if(getConnectionState()==Channel::CONNECTED) { + channelMonitor->activate(); + } else { + monitorQueue.push(channelMonitor); + } + return channelMonitor; } @@ -444,16 +476,8 @@ void CAChannel::destroy() { Lock lock(requestsMutex); { - if (destroyed) - return; + if (destroyed) return; destroyed = true; - - while (!requests.empty()) - { - ChannelRequest::shared_pointer request = requests.begin()->second.lock(); - if (request) - request->destroy(); - } } channelProvider->unregisterChannel(shared_from_this()); @@ -470,49 +494,18 @@ void CAChannel::threadAttach() std::tr1::static_pointer_cast(channelProvider)->threadAttach(); } -void CAChannel::registerRequest(ChannelRequest::shared_pointer const & request) -{ - Lock lock(requestsMutex); - requests[request.get()] = ChannelRequest::weak_pointer(request); -} -void CAChannel::unregisterRequest(ChannelRequest::shared_pointer const & request) -{ - Lock lock(requestsMutex); - requests.erase(request.get()); -} - - - - - - - - - - - - - - -ChannelGet::shared_pointer CAChannelGet::create( +CAChannelGetPtr CAChannelGet::create( CAChannel::shared_pointer const & channel, ChannelGetRequester::shared_pointer const & channelGetRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { - // TODO use std::make_shared - std::tr1::shared_ptr tp( - new CAChannelGet(channel, channelGetRequester, pvRequest) - ); - ChannelGet::shared_pointer thisPtr = tp; - static_cast(thisPtr.get())->activate(); - return thisPtr; + return CAChannelGetPtr(new CAChannelGet(channel, channelGetRequester, pvRequest)); } CAChannelGet::~CAChannelGet() { - // TODO } @@ -548,22 +541,25 @@ static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype n } -CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & _channel, - ChannelGetRequester::shared_pointer const & _channelGetRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) : - channel(_channel), - channelGetRequester(_channelGetRequester), - getType(getDBRType(pvRequest, _channel->getNativeType())), - pvStructure(createPVStructure(_channel, getType, pvRequest)), - bitSet(new BitSet(static_cast(pvStructure->getStructure()->getNumberFields()))), +CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel, + ChannelGetRequester::shared_pointer const & channelGetRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) + : + channel(channel), + channelGetRequester(channelGetRequester), + pvRequest(pvRequest), lastRequestFlag(false) { - // TODO - bitSet->set(0); + } void CAChannelGet::activate() { + if(pvStructure) throw std::runtime_error("CAChannelGet::activate() was called twice"); + getType = getDBRType(pvRequest, channel->getNativeType()); + pvStructure = createPVStructure(channel, getType, pvRequest); + bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields())); + bitSet->set(0); EXCEPTION_GUARD(channelGetRequester->channelGetConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); } @@ -974,12 +970,8 @@ void CAChannelGet::get() */ int result = ca_array_get_callback(getType, -#if (((EPICS_VERSION * 256 + EPICS_REVISION) * 256 + EPICS_MODIFICATION) >= ((3*256+14)*256+12)) - 0, -#else - channel->getElementCount(), -#endif - channel->getChannelID(), ca_get_handler, this); + 0, + channel->getChannelID(), ca_get_handler, this); if (result == ECA_NORMAL) { ca_flush_io(); @@ -1022,26 +1014,12 @@ void CAChannelGet::destroy() } - - - - - - - - -ChannelPut::shared_pointer CAChannelPut::create( +CAChannelPutPtr CAChannelPut::create( CAChannel::shared_pointer const & channel, ChannelPutRequester::shared_pointer const & channelPutRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest) { - // TODO use std::make_shared - std::tr1::shared_ptr tp( - new CAChannelPut(channel, channelPutRequester, pvRequest) - ); - ChannelPut::shared_pointer thisPtr = tp; - static_cast(thisPtr.get())->activate(); - return thisPtr; + return CAChannelPutPtr(new CAChannelPut(channel, channelPutRequester, pvRequest)); } @@ -1051,22 +1029,32 @@ CAChannelPut::~CAChannelPut() } -CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & _channel, - ChannelPutRequester::shared_pointer const & _channelPutRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) : - channel(_channel), - channelPutRequester(_channelPutRequester), - getType(getDBRType(pvRequest, _channel->getNativeType())), - pvStructure(createPVStructure(_channel, getType, pvRequest)), - bitSet(new BitSet(static_cast(pvStructure->getStructure()->getNumberFields()))), - lastRequestFlag(false) +CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel, + ChannelPutRequester::shared_pointer const & channelPutRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) +: + channel(channel), + channelPutRequester(channelPutRequester), + pvRequest(pvRequest), + lastRequestFlag(false), + block(false) { - // NOTE: we require value type, we can only put value field - bitSet->set(pvStructure->getSubFieldT("value")->getFieldOffset()); + } void CAChannelPut::activate() { + if(pvStructure) throw std::runtime_error("CAChannelPut::activate() was called twice"); + getType = getDBRType(pvRequest,channel->getNativeType()); + pvStructure = createPVStructure(channel, getType, pvRequest); + bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields())); + // NOTE: we require value type, we can only put value field + PVStringPtr pvString = pvRequest->getSubField("record._options.block"); + if(pvString) { + std::string val = pvString->get(); + if(val.compare("true")==0) block = true; + } + bitSet->set(pvStructure->getSubFieldT("value")->getFieldOffset()); EXCEPTION_GUARD(channelPutRequester->channelPutConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); } @@ -1103,9 +1091,15 @@ int doPut_pvStructure(CAChannel::shared_pointer const & channel, void *usrArg, P std::tr1::shared_ptr value = std::tr1::static_pointer_cast(pvStructure->getSubFieldT("value")); pT val = value->get(); - int result = ca_array_put_callback(channel->getNativeType(), 1, - channel->getChannelID(), &val, - ca_put_handler, usrArg); + int result = 0; + if(usrArg!=NULL) { + result = ca_array_put_callback(channel->getNativeType(), 1, + channel->getChannelID(), &val, + ca_put_handler, usrArg); + } else { + result = ca_array_put(channel->getNativeType(), 1, + channel->getChannelID(), &val); + } if (result == ECA_NORMAL) { @@ -1119,10 +1113,17 @@ int doPut_pvStructure(CAChannel::shared_pointer const & channel, void *usrArg, P std::tr1::shared_ptr value = pvStructure->getSubFieldT("value"); const pT* val = value->view().data(); - int result = ca_array_put_callback(channel->getNativeType(), static_cast(value->getLength()), - channel->getChannelID(), val, - ca_put_handler, usrArg); - + int result = 0; + if(usrArg!=NULL) { + result = ca_array_put_callback(channel->getNativeType(), + static_cast(value->getLength()), + channel->getChannelID(), val, + ca_put_handler, usrArg); + } else { + result = ca_array_put(channel->getNativeType(), + static_cast(value->getLength()), + channel->getChannelID(), val); + } if (result == ECA_NORMAL) { ca_flush_io(); @@ -1143,10 +1144,17 @@ int doPut_pvStructure(CAChannel::shar std::tr1::shared_ptr value = std::tr1::static_pointer_cast(pvStructure->getSubFieldT("value")); string val = value->get(); - int result = ca_array_put_callback(channel->getNativeType(), 1, - channel->getChannelID(), val.c_str(), - ca_put_handler, usrArg); - + int result = 0; + if(usrArg!=NULL) { + result = ca_array_put_callback( + channel->getNativeType(), 1, + channel->getChannelID(), val.c_str(), + ca_put_handler, usrArg); + } else { + result = ca_array_put( + channel->getNativeType(), 1, + channel->getChannelID(), val.c_str()); + } if (result == ECA_NORMAL) { ca_flush_io(); @@ -1176,10 +1184,17 @@ int doPut_pvStructure(CAChannel::shar p += MAX_STRING_SIZE; } - - int result = ca_array_put_callback(channel->getNativeType(), arraySize, - channel->getChannelID(), ca_stringBuffer, - ca_put_handler, usrArg); + int result = 0; + if(usrArg!=NULL) { + result = ca_array_put_callback( + channel->getNativeType(), arraySize, + channel->getChannelID(), ca_stringBuffer, + ca_put_handler, usrArg); + } else { + result = ca_array_put( + channel->getNativeType(), arraySize, + channel->getChannelID(), ca_stringBuffer); + } delete[] ca_stringBuffer; if (result == ECA_NORMAL) @@ -1202,10 +1217,17 @@ int doPut_pvStructure(CAChannel:: std::tr1::shared_ptr value = std::tr1::static_pointer_cast(pvStructure->getSubFieldT("value.index")); dbr_enum_t val = value->get(); - int result = ca_array_put_callback(channel->getNativeType(), 1, - channel->getChannelID(), &val, - ca_put_handler, usrArg); - + int result = 0; + if(usrArg!=NULL) { + result = ca_array_put_callback( + channel->getNativeType(), 1, + channel->getChannelID(), &val, + ca_put_handler, usrArg); + } else { + result = ca_array_put( + channel->getNativeType(), 1, + channel->getChannelID(), &val); + } if (result == ECA_NORMAL) { ca_flush_io(); @@ -1254,16 +1276,29 @@ void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure, BitSet::shared_pointer const & /*putBitSet*/) { channel->threadAttach(); - doPut putFunc = doPutFuncTable[channel->getNativeType()]; if (putFunc) { - // TODO now we always put all - int result = putFunc(channel, this, pvPutStructure); - if (result != ECA_NORMAL) - { - Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result))); - EXCEPTION_GUARD(channelPutRequester->putDone(errorStatus, shared_from_this())); + // TODO now we always put all + + if(block) { + int result = putFunc(channel, this, pvPutStructure); + if (result != ECA_NORMAL) + { + Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result))); + EXCEPTION_GUARD(channelPutRequester->putDone(errorStatus, shared_from_this())); + } + } else { + int result = putFunc(channel,NULL, pvPutStructure); + if (result == ECA_NORMAL) + { + EXCEPTION_GUARD(channelPutRequester->putDone(Status::Ok, shared_from_this())); + } + else + { + Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result))); + EXCEPTION_GUARD(channelPutRequester->putDone(errorStatus, shared_from_this())); + } } } else @@ -1352,60 +1387,6 @@ void CAChannelPut::destroy() // TODO } - - - - - - - - - -Monitor::shared_pointer CAChannelMonitor::create( - CAChannel::shared_pointer const & channel, - MonitorRequester::shared_pointer const & monitorRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) -{ - // TODO use std::make_shared - std::tr1::shared_ptr tp( - new CAChannelMonitor(channel, monitorRequester, pvRequest) - ); - Monitor::shared_pointer thisPtr = tp; - static_cast(thisPtr.get())->activate(); - return thisPtr; -} - - -CAChannelMonitor::~CAChannelMonitor() -{ - // TODO -} - - -CAChannelMonitor::CAChannelMonitor(CAChannel::shared_pointer const & _channel, - MonitorRequester::shared_pointer const & _monitorRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) : - channel(_channel), - monitorRequester(_monitorRequester), - getType(getDBRType(pvRequest, _channel->getNativeType())), - pvStructure(createPVStructure(_channel, getType, pvRequest)), - count(0), - element(new MonitorElement(pvStructure)) -{ - // TODO - element->changedBitSet->set(0); -} - -void CAChannelMonitor::activate() -{ - // TODO remove - thisPointer = shared_from_this(); - - EXCEPTION_GUARD(monitorRequester->monitorConnect(Status::Ok, shared_from_this(), - pvStructure->getStructure())); -} - - /* --------------- Monitor --------------- */ @@ -1415,27 +1396,144 @@ static void ca_subscription_handler(struct event_handler_args args) channelMonitor->subscriptionEvent(args); } +class CACMonitorQueue : + public std::tr1::enable_shared_from_this +{ +public: + POINTER_DEFINITIONS(CACMonitorQueue); +private: + size_t queueSize; + bool overrunInProgress; + bool isStarted; + Mutex mutex; + + std::queue monitorElementQueue; +public: + CACMonitorQueue( + int32 queueSize) + : queueSize(queueSize), + overrunInProgress(false), + isStarted(false) + {} + ~CACMonitorQueue() + { + } + void start() + { + Lock guard(mutex); + while(!monitorElementQueue.empty()) monitorElementQueue.pop(); + overrunInProgress = false; + isStarted = true; + } + void stop() + { + Lock guard(mutex); + while(!monitorElementQueue.empty()) monitorElementQueue.pop(); + overrunInProgress = false; + isStarted = false; + } + // return true if added to queue + bool event(const PVStructurePtr &pvStructure) + { + Lock guard(mutex); + if(!isStarted) return false; + if(monitorElementQueue.size()==queueSize) + { + overrunInProgress = true; + return false; + } else { + PVStructure::shared_pointer pvs = + getPVDataCreate()->createPVStructure(pvStructure->getStructure()); + pvs->copy(*pvStructure); + MonitorElementPtr monitorElement(new MonitorElement(pvs)); + monitorElement->changedBitSet->set(0); + if(overrunInProgress) { + overrunInProgress = false; + monitorElement->overrunBitSet->set(0); + } + monitorElementQueue.push(monitorElement); + return true; + } + + } + MonitorElementPtr poll() + { + Lock guard(mutex); + if(!isStarted) return MonitorElementPtr(); + if(monitorElementQueue.empty()) return MonitorElementPtr(); + MonitorElementPtr retval = monitorElementQueue.front(); + return retval; + } + void release(MonitorElementPtr const & monitorElement) + { + Lock guard(mutex); + if(monitorElementQueue.empty()) { + throw std::runtime_error("client error calling release"); + } + monitorElementQueue.pop(); + } +}; + +CAChannelMonitorPtr CAChannelMonitor::create( + CAChannel::shared_pointer const & channel, + MonitorRequester::shared_pointer const & monitorRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) +{ + return CAChannelMonitorPtr(new CAChannelMonitor(channel, monitorRequester, pvRequest)); +} + +CAChannelMonitor::~CAChannelMonitor() +{ +} + +CAChannelMonitor::CAChannelMonitor( + CAChannel::shared_pointer const & channel, + MonitorRequester::shared_pointer const & monitorRequester, + PVStructurePtr const & pvRequest) +: + channel(channel), + monitorRequester(monitorRequester), + pvRequest(pvRequest) + +{ +} + +void CAChannelMonitor::activate() +{ + if(pvStructure) throw std::runtime_error("CAChannelMonitor::activate() was called twice"); + getType = getDBRType(pvRequest, channel->getNativeType()); + pvStructure = createPVStructure(channel, getType, pvRequest); + int32 queueSize = 2; + PVStructurePtr pvOptions = pvRequest->getSubField("record._options"); + if (pvOptions) { + PVStringPtr pvString = pvOptions->getSubField("queueSize"); + if (pvString) { + int size; + std::stringstream ss; + ss << pvString->get(); + ss >> size; + if (size > 1) queueSize = size; + } + } + monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize)); + EXCEPTION_GUARD(monitorRequester->monitorConnect(Status::Ok, shared_from_this(), + pvStructure->getStructure())); +} void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args) { if (args.status == ECA_NORMAL) { - // TODO override indicator - copyDBRtoPVStructure copyFunc = copyFuncTable[getType]; - if (copyFunc) + if (copyFunc) { copyFunc(args.dbr, args.count, pvStructure); - else - { - // TODO remove + monitorQueue->event(pvStructure); + // call monitorRequester even if queue is full + monitorRequester->monitorEvent(shared_from_this()); + } else { std::cout << "no copy func implemented" << std::endl; + } - - { - Lock lock(mutex); - count = 1; - } - monitorRequester->monitorEvent(shared_from_this()); } else { @@ -1444,7 +1542,6 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args) } } - epics::pvData::Status CAChannelMonitor::start() { channel->threadAttach(); @@ -1462,12 +1559,13 @@ epics::pvData::Status CAChannelMonitor::start() // TODO DBE_PROPERTY support int result = ca_create_subscription(getType, - 0 /*channel->getElementCount()*/, - channel->getChannelID(), DBE_VALUE, - ca_subscription_handler, this, - &eventID); + 0, + channel->getChannelID(), DBE_VALUE, + ca_subscription_handler, this, + &eventID); if (result == ECA_NORMAL) { + monitorQueue->start(); ca_flush_io(); return Status::Ok; } @@ -1477,7 +1575,6 @@ epics::pvData::Status CAChannelMonitor::start() } } - epics::pvData::Status CAChannelMonitor::stop() { channel->threadAttach(); @@ -1486,6 +1583,7 @@ epics::pvData::Status CAChannelMonitor::stop() if (result == ECA_NORMAL) { + monitorQueue->stop(); return Status::Ok; } else @@ -1497,22 +1595,13 @@ epics::pvData::Status CAChannelMonitor::stop() MonitorElementPtr CAChannelMonitor::poll() { - Lock lock(mutex); - if (count) - { - count--; - return element; - } - else - { - return nullElement; - } + return monitorQueue->poll(); } -void CAChannelMonitor::release(MonitorElementPtr const & /*monitorElement*/) +void CAChannelMonitor::release(MonitorElementPtr const & monitorElement) { - // noop + monitorQueue->release(monitorElement); } /* --------------- epics::pvData::ChannelRequest --------------- */ @@ -1528,9 +1617,7 @@ void CAChannelMonitor::cancel() void CAChannelMonitor::destroy() { channel->threadAttach(); - ca_clear_subscription(eventID); - - // TODO - thisPointer.reset(); } + +}}} diff --git a/src/ca/pv/caChannel.h b/src/ca/pv/caChannel.h index 59f8632..21edf0c 100644 --- a/src/ca/pv/caChannel.h +++ b/src/ca/pv/caChannel.h @@ -7,8 +7,11 @@ #ifndef CACHANNEL_H #define CACHANNEL_H +#include + #include + /* for CA */ #include @@ -18,6 +21,13 @@ namespace epics { namespace pvAccess { namespace ca { +class CAChannelPut; +typedef std::tr1::shared_ptr CAChannelPutPtr; +class CAChannelGet; +typedef std::tr1::shared_ptr CAChannelGetPtr; +class CAChannelMonitor; +typedef std::tr1::shared_ptr CAChannelMonitorPtr; + class CAChannel : public Channel, public std::tr1::enable_shared_from_this @@ -74,9 +84,6 @@ public: void threadAttach(); - void registerRequest(ChannelRequest::shared_pointer const & request); - void unregisterRequest(ChannelRequest::shared_pointer const & request); - private: CAChannel(std::string const & channelName, @@ -95,17 +102,15 @@ private: epics::pvData::Structure::const_shared_pointer structure; epics::pvData::Mutex requestsMutex; - // TODO std::unordered_map - // void* is not the nicest thing, but there is no fast weak_ptr::operator== - typedef std::map RequestsList; - RequestsList requests; // synced on requestsMutex bool destroyed; + std::queue putQueue; + std::queue getQueue; + std::queue monitorQueue; }; - class CAChannelGet : public ChannelGet, public std::tr1::enable_shared_from_this @@ -114,7 +119,7 @@ class CAChannelGet : public: POINTER_DEFINITIONS(CAChannelGet); - static ChannelGet::shared_pointer create(CAChannel::shared_pointer const & channel, + static CAChannelGet::shared_pointer create(CAChannel::shared_pointer const & channel, ChannelGetRequester::shared_pointer const & channelGetRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest); @@ -136,15 +141,17 @@ public: virtual void destroy(); + void activate(); + private: CAChannelGet(CAChannel::shared_pointer const & _channel, ChannelGetRequester::shared_pointer const & _channelGetRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest); - void activate(); - + CAChannel::shared_pointer channel; ChannelGetRequester::shared_pointer channelGetRequester; + epics::pvData::PVStructure::shared_pointer pvRequest; chtype getType; epics::pvData::PVStructure::shared_pointer pvStructure; @@ -164,7 +171,7 @@ class CAChannelPut : public: POINTER_DEFINITIONS(CAChannelPut); - static ChannelPut::shared_pointer create(CAChannel::shared_pointer const & channel, + static CAChannelPut::shared_pointer create(CAChannel::shared_pointer const & channel, ChannelPutRequester::shared_pointer const & channelPutRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest); @@ -191,15 +198,17 @@ public: virtual void destroy(); + void activate(); + private: CAChannelPut(CAChannel::shared_pointer const & _channel, ChannelPutRequester::shared_pointer const & _channelPutRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest); - void activate(); - + CAChannel::shared_pointer channel; ChannelPutRequester::shared_pointer channelPutRequester; + epics::pvData::PVStructure::shared_pointer pvRequest; chtype getType; epics::pvData::PVStructure::shared_pointer pvStructure; @@ -207,8 +216,11 @@ private: // TODO AtomicBoolean !!! bool lastRequestFlag; + bool block; }; +class CACMonitorQueue; +typedef std::tr1::shared_ptr CACMonitorQueuePtr; class CAChannelMonitor : public Monitor, @@ -218,7 +230,7 @@ class CAChannelMonitor : public: POINTER_DEFINITIONS(CAChannelMonitor); - static Monitor::shared_pointer create(CAChannel::shared_pointer const & channel, + static CAChannelMonitor::shared_pointer create(CAChannel::shared_pointer const & channel, MonitorRequester::shared_pointer const & monitorRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest); @@ -240,31 +252,22 @@ public: /* --------------- Destroyable --------------- */ virtual void destroy(); - + void activate(); private: CAChannelMonitor(CAChannel::shared_pointer const & _channel, MonitorRequester::shared_pointer const & _monitorRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest); - void activate(); + CAChannel::shared_pointer channel; MonitorRequester::shared_pointer monitorRequester; + epics::pvData::PVStructure::shared_pointer pvRequest; chtype getType; epics::pvData::PVStructure::shared_pointer pvStructure; - epics::pvData::BitSet::shared_pointer changedBitSet; - epics::pvData::BitSet::shared_pointer overrunBitSet; evid eventID; - - epics::pvData::Mutex mutex; - int count; - - MonitorElement::shared_pointer element; - MonitorElement::shared_pointer nullElement; - - // TODO remove - Monitor::shared_pointer thisPointer; + CACMonitorQueuePtr monitorQueue; }; }