diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index a6c2541..fd92bfd 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -755,7 +755,8 @@ CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel, : channel(channel), channelGetRequester(channelGetRequester), - pvRequest(pvRequest) + pvRequest(pvRequest), + firstTime(true) { if(DEBUG_LEVEL>0) { cout << "CAChannelGet::CAChannelGet() " << channel->getChannelName() << endl; @@ -780,7 +781,9 @@ void CAChannelGet::activate() getType = getDBRType(pvRequest, channel->getNativeType()); pvStructure = createPVStructure(channel, getType, pvRequest); bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields())); - bitSet->set(0); + pvCopy = PVCopy::create( + createPVStructure(channel, getType, pvRequest), + pvRequest,""); channel->addChannelGet(shared_from_this()); if(channel->getConnectionState()==Channel::CONNECTED) { EXCEPTION_GUARD(getRequester->channelGetConnect(Status::Ok, shared_from_this(), @@ -793,6 +796,7 @@ void CAChannelGet::channelCreated(const Status& status,Channel::shared_pointer c if(DEBUG_LEVEL>0) { std::cout << "CAChannelGet::channelCreated " << channel->getChannelName() << endl; } + firstTime = true; ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock()); if(!getRequester) return; chtype newType = getDBRType(pvRequest, channel->getNativeType()); @@ -800,7 +804,7 @@ void CAChannelGet::channelCreated(const Status& status,Channel::shared_pointer c getType = getDBRType(pvRequest, channel->getNativeType()); pvStructure = createPVStructure(channel, getType, pvRequest); bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields())); - bitSet->set(0); + pvCopy = PVCopy::create(pvStructure,pvRequest,""); } EXCEPTION_GUARD(getRequester->channelGetConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); @@ -1195,7 +1199,10 @@ static copyDBRtoPVStructure copyFuncTable[] = void CAChannelGet::getDone(struct event_handler_args &args) { if(DEBUG_LEVEL>1) { - std::cout << "CAChannelGet::getDone " << channel->getChannelName() << endl; + std::cout << "CAChannelGet::getDone " + << channel->getChannelName() + << " firstTime " << (firstTime ? "true" : "false") + << endl; } ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock()); if(!getRequester) return; @@ -1208,6 +1215,12 @@ void CAChannelGet::getDone(struct event_handler_args &args) { throw std::runtime_error("CAChannelGet::getDone no copy func implemented"); } + pvCopy->updateMasterSetBitSet(pvStructure,bitSet); + if(firstTime) { + bitSet->clear(); + bitSet->set(0); + firstTime = false; + } EXCEPTION_GUARD(getRequester->getDone(Status::Ok, shared_from_this(), pvStructure, bitSet)); } else @@ -1235,7 +1248,7 @@ void CAChannelGet::get() Prior to R3.14.12 requesting zero elements in a ca_array_get_callback() call was illegal and would fail immediately. */ - + bitSet->clear(); int result = ca_array_get_callback(getType, 0, channel->getChannelID(), ca_get_handler, this); @@ -1720,7 +1733,6 @@ public: POINTER_DEFINITIONS(CACMonitorQueue); private: size_t queueSize; - bool overrunInProgress; bool isStarted; Mutex mutex; @@ -1729,7 +1741,6 @@ public: CACMonitorQueue( int32 queueSize) : queueSize(queueSize), - overrunInProgress(false), isStarted(false) {} ~CACMonitorQueue() @@ -1739,39 +1750,29 @@ public: { Lock guard(mutex); while(!monitorElementQueue.empty()) monitorElementQueue.pop(); - overrunInProgress = false; isStarted = true; } void stop() { Lock guard(mutex); while(!monitorElementQueue.empty()) monitorElementQueue.pop(); - overrunInProgress = false; isStarted = false; } - // return true if added to queue - bool event(const PVStructurePtr &pvStructure) + + bool event( + const PVStructurePtr &pvStructure, + const MonitorElementPtr & activeElement) { Lock guard(mutex); if(!isStarted) return false; - if(monitorElementQueue.size()==queueSize) - { - overrunInProgress = true; - return false; - } else { - PVStructure::shared_pointer pvs = - getPVDataCreate()->createPVStructure(pvStructure->getStructure()); - pvs->copy(*pvStructure); - MonitorElementPtr monitorElement(new MonitorElement(pvs)); - monitorElement->changedBitSet->set(0); - if(overrunInProgress) { - overrunInProgress = false; - monitorElement->overrunBitSet->set(0); - } - monitorElementQueue.push(monitorElement); - return true; - } - + if(monitorElementQueue.size()==queueSize) return false; + PVStructure::shared_pointer pvs = + getPVDataCreate()->createPVStructure(pvStructure); + MonitorElementPtr monitorElement(new MonitorElement(pvs)); + *(monitorElement->changedBitSet) = *(activeElement->changedBitSet); + *(monitorElement->overrunBitSet) = *(activeElement->overrunBitSet); + monitorElementQueue.push(monitorElement); + return true; } MonitorElementPtr poll() { @@ -1823,7 +1824,8 @@ CAChannelMonitor::CAChannelMonitor( channel(channel), monitorRequester(monitorRequester), pvRequest(pvRequest), - isStarted(false) + isStarted(false), + firstTime(true) { if(DEBUG_LEVEL>0) { cout << "CAChannelMonitor::CAChannelMonitor() " << channel->getChannelName() << endl; @@ -1840,6 +1842,10 @@ void CAChannelMonitor::activate() if(pvStructure) throw std::runtime_error("CAChannelMonitor::activate() was called twice"); getType = getDBRType(pvRequest, channel->getNativeType()); pvStructure = createPVStructure(channel, getType, pvRequest); + activeElement = MonitorElementPtr(new MonitorElement(pvStructure)); + pvCopy = PVCopy::create( + createPVStructure(channel, getType, pvRequest), + pvRequest,""); int32 queueSize = 2; PVStructurePtr pvOptions = pvRequest->getSubField("record._options"); if (pvOptions) { @@ -1865,12 +1871,17 @@ void CAChannelMonitor::channelCreated(const Status& status,Channel::shared_point if(DEBUG_LEVEL>0) { std::cout << "CAChannelMonitor::channelCreated " << channel->getChannelName() << endl; } + firstTime = true; MonitorRequester::shared_pointer requester(monitorRequester.lock()); if(!requester) return; chtype newType = getDBRType(pvRequest, channel->getNativeType()); if(newType!=getType) { getType = getDBRType(pvRequest, channel->getNativeType()); pvStructure = createPVStructure(channel, getType, pvRequest); + activeElement = MonitorElementPtr(new MonitorElement(pvStructure)); + pvCopy = PVCopy::create( + createPVStructure(channel, getType, pvRequest), + pvRequest,""); int32 queueSize = 2; PVStructurePtr pvOptions = pvRequest->getSubField("record._options"); if (pvOptions) { @@ -1917,7 +1928,10 @@ void CAChannelMonitor::channelDisconnect(bool destroy) void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args) { if(DEBUG_LEVEL>1) { - std::cout << "CAChannelMonitor::subscriptionEvent " << channel->getChannelName() << endl; + std::cout << "CAChannelMonitor::subscriptionEvent " + << channel->getChannelName() + << " firstTime " << (firstTime ? "true" : "false") + << endl; } MonitorRequester::shared_pointer requester(monitorRequester.lock()); if(!requester) return; @@ -1926,7 +1940,20 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args) copyDBRtoPVStructure copyFunc = copyFuncTable[getType]; if (copyFunc) { copyFunc(args.dbr, args.count, pvStructure); - monitorQueue->event(pvStructure); + pvCopy->updateMasterSetBitSet(pvStructure,activeElement->changedBitSet); + if(firstTime) { + activeElement->changedBitSet->clear(); + activeElement->overrunBitSet->clear(); + activeElement->changedBitSet->set(0); + firstTime = false; + } + if(monitorQueue->event(pvStructure,activeElement)) { + activeElement->changedBitSet->clear(); + activeElement->overrunBitSet->clear(); + } else { + *(activeElement->overrunBitSet) |= *(activeElement->changedBitSet); + } + // call monitorRequester even if queue is full requester->monitorEvent(shared_from_this()); } else { @@ -1966,6 +1993,7 @@ epics::pvData::Status CAChannelMonitor::start() */ // TODO DBE_PROPERTY support + monitorQueue->start(); int result = ca_create_subscription(getType, 0, channel->getChannelID(), DBE_VALUE, @@ -1974,7 +2002,6 @@ epics::pvData::Status CAChannelMonitor::start() if (result == ECA_NORMAL) { isStarted = true; - monitorQueue->start(); result = ca_flush_io(); } if (result == ECA_NORMAL) return status; diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index 4a2c40c..98e6b1b 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -11,6 +11,7 @@ #include #include +#include /* for CA */ @@ -196,8 +197,10 @@ private: CAChannelPtr channel; ChannelGetRequester::weak_pointer channelGetRequester; const epics::pvData::PVStructure::shared_pointer pvRequest; + volatile bool firstTime; chtype getType; + epics::pvData::PVCopyPtr pvCopy; epics::pvData::PVStructure::shared_pointer pvStructure; epics::pvData::BitSet::shared_pointer bitSet; }; @@ -329,7 +332,10 @@ private: bool isStarted; chtype getType; + volatile bool firstTime; + epics::pvData::PVCopyPtr pvCopy; epics::pvData::PVStructure::shared_pointer pvStructure; + epics::pvData::MonitorElementPtr activeElement; evid eventID; CACMonitorQueuePtr monitorQueue; }; diff --git a/src/ca/caProvider.cpp b/src/ca/caProvider.cpp index 8f380b6..6ec6fb9 100644 --- a/src/ca/caProvider.cpp +++ b/src/ca/caProvider.cpp @@ -169,7 +169,11 @@ void CAChannelProvider::poll() void CAChannelProvider::attachContext() { - if(ca_current_context()) return; + ca_client_context* thread_context = ca_current_context(); + if (thread_context == current_context) return; + if (thread_context != NULL) { + throw std::runtime_error("CAChannelProvider: Foreign CA context in use"); + } int result = ca_attach_context(current_context); if (result != ECA_NORMAL) { std::cout << diff --git a/src/ca/caProviderPvt.h b/src/ca/caProviderPvt.h index c5c8883..7f656b8 100644 --- a/src/ca/caProviderPvt.h +++ b/src/ca/caProviderPvt.h @@ -73,7 +73,7 @@ public: void attachContext(); private: - virtual void destroy() EPICS_DEPRECATED {}; + virtual void destroy() EPICS_DEPRECATED {} void initialize(); ca_client_context* current_context; epics::pvData::Mutex channelListMutex;