cancel(), done(), ackAny param

This commit is contained in:
Matej Sekoranja
2015-05-06 11:08:23 +02:00
parent 197f763452
commit b381d84e64
7 changed files with 245 additions and 74 deletions
+4
View File
@@ -0,0 +1,4 @@
bin
lib
include
dbd
+4 -2
View File
@@ -1422,7 +1422,9 @@ class MonitorRequesterImpl : public MonitorRequester
virtual void unlisten(Monitor::shared_pointer const & /*monitor*/)
{
std::cerr << "unlisten" << std::endl;
//std::cerr << "unlisten" << std::endl;
// TODO
epicsExit(0);
}
};
@@ -1747,7 +1749,7 @@ int main (int argc, char *argv[])
fprintf(stderr, "failed to parse request string\n");
return 1;
}
// register "pva" and "ca" providers
ClientFactory::start();
epics::pvAccess::ca::CAClientFactory::start();
+94 -32
View File
@@ -40,11 +40,17 @@ class ChannelPipelineMonitorImpl :
Mutex m_freeQueueLock;
Mutex m_monitorQueueLock;
AtomicBoolean m_active;
bool m_active;
MonitorElement::shared_pointer m_nullMonitorElement;
size_t m_requestedCount;
bool m_pipeline;
bool m_done;
bool m_unlistenReported;
public:
ChannelPipelineMonitorImpl(
Channel::shared_pointer const & channel,
@@ -56,28 +62,30 @@ class ChannelPipelineMonitorImpl :
m_queueSize(2),
m_freeQueueLock(),
m_monitorQueueLock(),
m_active(),
m_requestedCount(0)
m_active(false),
m_requestedCount(0),
m_pipeline(false),
m_done(false),
m_unlistenReported(false)
{
m_pipelineSession = pipelineService->createPipeline(pvRequest);
// extract queueSize parameter
PVFieldPtr pvField = pvRequest->getSubField("record._options");
if (pvField.get()) {
PVStructurePtr pvOptions = static_pointer_cast<PVStructure>(pvField);
pvField = pvOptions->getSubField("queueSize");
if (pvField.get()) {
PVStringPtr pvString = pvOptions->getStringField("queueSize");
if(pvString) {
int32 size;
std::stringstream ss;
ss << pvString->get();
ss >> size;
if (size < 2) size = 2;
// extract queueSize and pipeline parameter
PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>("record._options");
if (pvOptions) {
PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
if (pvString) {
int32 size;
std::stringstream ss;
ss << pvString->get();
ss >> size;
if (size > 1)
m_queueSize = static_cast<size_t>(size);
}
}
pvString = pvOptions->getSubField<PVString>("pipeline");
if (pvString)
m_pipeline = (pvString->get() == "true");
}
// server queue size must be >= client queue size
@@ -106,6 +114,10 @@ class ChannelPipelineMonitorImpl :
return m_pipelineSession;
}
bool isPipelineEnabled() const {
return m_pipeline;
}
virtual ~ChannelPipelineMonitorImpl()
{
destroy();
@@ -113,15 +125,17 @@ class ChannelPipelineMonitorImpl :
virtual Status start()
{
// already started
if (m_active.get())
return Status::Ok;
bool notify = false;
{
Lock guard(m_monitorQueueLock);
m_active.set();
// already started
if (m_active)
return Status::Ok;
m_active = true;
m_monitorQueueLock.lock();
bool notify = (m_monitorQueue.size() != 0);
m_monitorQueueLock.unlock();
notify = (m_monitorQueue.size() != 0);
}
if (notify)
{
@@ -134,7 +148,8 @@ class ChannelPipelineMonitorImpl :
virtual Status stop()
{
m_active.clear();
Lock guard(m_monitorQueueLock);
m_active = false;
return Status::Ok;
}
@@ -145,8 +160,19 @@ class ChannelPipelineMonitorImpl :
// do not give send more elements than m_requestedCount
// even if m_monitorQueue is not empty
if (m_monitorQueue.empty() || m_requestedCount == 0)
bool emptyQueue = m_monitorQueue.empty();
if (emptyQueue || m_requestedCount == 0 || !m_active)
{
// report "unlisten" event if queue empty and done, release lock first
if (!m_unlistenReported && m_done && emptyQueue)
{
m_unlistenReported = true;
guard.unlock();
m_monitorRequester->unlisten(shared_from_this());
}
return m_nullMonitorElement;
}
MonitorElement::shared_pointer element = m_monitorQueue.front();
m_monitorQueue.pop();
@@ -173,7 +199,7 @@ class ChannelPipelineMonitorImpl :
{
Lock guard(m_monitorQueueLock);
m_requestedCount += count;
notify = (m_monitorQueue.size() != 0);
notify = m_active && (m_monitorQueue.size() != 0);
}
// notify
@@ -189,7 +215,17 @@ class ChannelPipelineMonitorImpl :
virtual void destroy()
{
stop();
bool notifyCancel = false;
{
Lock guard(m_monitorQueueLock);
m_active = false;
notifyCancel = !m_done;
m_done = true;
}
if (notifyCancel)
m_pipelineSession->cancel();
}
virtual void lock()
@@ -229,6 +265,10 @@ class ChannelPipelineMonitorImpl :
bool notify = false;
{
Lock guard(m_monitorQueueLock);
if (m_done)
return;
// throw std::logic_error("putElement called after done");
m_monitorQueue.push(element);
// TODO there is way to much of notification, per each putElement
notify = (m_requestedCount != 0);
@@ -243,7 +283,17 @@ class ChannelPipelineMonitorImpl :
}
virtual void done() {
// TODO
Lock guard(m_monitorQueueLock);
m_done = true;
bool report = !m_unlistenReported && m_monitorQueue.empty();
if (report)
m_unlistenReported = true;
guard.unlock();
if (report)
m_monitorRequester->unlisten(shared_from_this());
}
};
@@ -398,9 +448,21 @@ public:
std::tr1::shared_ptr<ChannelPipelineMonitorImpl> tp(
new ChannelPipelineMonitorImpl(shared_from_this(), monitorRequester, pvRequest, m_pipelineService)
);
Monitor::shared_pointer ChannelPipelineMonitorImpl = tp;
monitorRequester->monitorConnect(Status::Ok, ChannelPipelineMonitorImpl, tp->getPipelineSession()->getStructure());
return ChannelPipelineMonitorImpl;
Monitor::shared_pointer channelPipelineMonitorImpl = tp;
if (tp->isPipelineEnabled())
{
monitorRequester->monitorConnect(Status::Ok, channelPipelineMonitorImpl, tp->getPipelineSession()->getStructure());
return channelPipelineMonitorImpl;
}
else
{
Monitor::shared_pointer nullPtr;
epics::pvData::Structure::const_shared_pointer nullStructure;
Status noPipelineEnabledStatus(Status::STATUSTYPE_ERROR, "pipeline option not enabled, use e.g. 'record[queueSize=16,pipeline=true]field(value)' pvRequest to enable pipelining");
monitorRequester->monitorConnect(noPipelineEnabledStatus, nullPtr, nullStructure);
return nullPtr;
}
}
virtual ChannelArray::shared_pointer createChannelArray(
+97 -36
View File
@@ -2063,6 +2063,7 @@ namespace epics {
virtual ~MonitorStrategy() {};
virtual void init(StructureConstPtr const & structure) = 0;
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0;
virtual void unlisten() = 0;
};
typedef vector<MonitorElement::shared_pointer> FreeElementQueue;
@@ -2103,10 +2104,18 @@ namespace epics {
// TODO check for cyclic-ref
ChannelImpl::shared_pointer m_channel;
pvAccessID m_ioid;
bool m_pipeline;
int32 m_ackAny;
bool m_unlisten;
public:
MonitorStrategyQueue(ChannelImpl::shared_pointer channel, pvAccessID ioid,
MonitorRequester::shared_pointer const & callback, int32 queueSize) :
MonitorRequester::shared_pointer const & callback,
int32 queueSize,
bool pipeline, int32 ackAny) :
m_queueSize(queueSize), m_lastStructure(),
m_freeQueue(),
m_monitorQueue(),
@@ -2115,7 +2124,9 @@ namespace epics {
m_nullMonitorElement(),
m_releasedCount(0),
m_reportQueueStateInProgress(false),
m_channel(channel), m_ioid(ioid)
m_channel(channel), m_ioid(ioid),
m_pipeline(pipeline), m_ackAny(ackAny),
m_unlisten(false)
{
if (queueSize <= 1)
throw std::invalid_argument("queueSize <= 1");
@@ -2318,12 +2329,33 @@ namespace epics {
}
}
virtual void unlisten()
{
bool notifyUnlisten = false;
{
Lock guard(m_mutex);
notifyUnlisten = m_monitorQueue.empty();
m_unlisten = !notifyUnlisten;
}
if (notifyUnlisten)
{
EXCEPTION_GUARD(m_callback->unlisten(shared_from_this()));
}
}
virtual MonitorElement::shared_pointer poll() {
Lock guard(m_mutex);
if (m_monitorQueue.empty())
if (m_monitorQueue.empty()) {
if (m_unlisten) {
m_unlisten = false;
guard.unlock();
EXCEPTION_GUARD(m_callback->unlisten(shared_from_this()));
}
return m_nullMonitorElement;
}
MonitorElement::shared_pointer retVal = m_monitorQueue.front();
m_monitorQueue.pop();
@@ -2351,25 +2383,27 @@ namespace epics {
m_overrunInProgress = false;
}
m_releasedCount++;
// TODO limit reporting back?
if (!m_reportQueueStateInProgress)
if (m_pipeline)
{
sendAck = true;
m_reportQueueStateInProgress = true;
}
}
if (sendAck)
{
try
{
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (...) {
// noop (do not complain if fails)
m_reportQueueStateInProgress = false;
m_releasedCount++;
if (!m_reportQueueStateInProgress && m_releasedCount >= m_ackAny)
{
sendAck = true;
m_reportQueueStateInProgress = true;
}
}
}
if (sendAck)
{
try
{
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (...) {
// noop (do not complain if fails)
m_reportQueueStateInProgress = false;
}
}
}
}
virtual void reportRemoteQueueStatus(int32 /*freeElements*/)
@@ -2447,6 +2481,7 @@ namespace epics {
int32 m_queueSize;
bool m_pipeline;
int32 m_ackAny;
ChannelMonitorImpl(
ChannelImpl::shared_pointer const & channel,
@@ -2457,8 +2492,9 @@ namespace epics {
m_monitorRequester(monitorRequester),
m_started(false),
m_pvRequest(pvRequest),
m_queueSize(0),
m_pipeline(false)
m_queueSize(2),
m_pipeline(false),
m_ackAny(0)
{
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelMonitor);
}
@@ -2472,30 +2508,45 @@ namespace epics {
return;
}
m_queueSize = 2;
PVFieldPtr pvField = m_pvRequest->getSubField("record._options");
if (pvField.get()) {
PVStructurePtr pvOptions = static_pointer_cast<PVStructure>(pvField);
pvField = pvOptions->getSubField("queueSize");
if (pvField.get()) {
PVStringPtr pvString = pvOptions->getStringField("queueSize");
if(pvString) {
PVStructurePtr pvOptions = m_pvRequest->getSubField<PVStructure>("record._options");
if (pvOptions) {
PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
if (pvString) {
int32 size;
std::stringstream ss;
ss << pvString->get();
ss >> size;
if (size > 1)
m_queueSize = size;
}
pvString = pvOptions->getSubField<PVString>("pipeline");
if (pvString)
m_pipeline = (pvString->get() == "true");
// pipeline options
if (m_pipeline)
{
// defaults to queueSize/2
m_ackAny = m_queueSize/2;
pvString = pvOptions->getSubField<PVString>("ackAny");
if (pvString) {
int32 size;
std::stringstream ss;
ss << pvString->get();
ss >> size;
m_queueSize = size;
if (size > 0)
m_ackAny = size;
}
}
PVStringPtr pvString = pvOptions->getSubField<PVString>("pipeline");
if (pvString)
m_pipeline = (pvString->get() == "true");
}
BaseRequestImpl::activate();
if (m_queueSize < 2) m_queueSize = 2;
std::tr1::shared_ptr<MonitorStrategyQueue> tp(new MonitorStrategyQueue(m_channel, m_ioid, m_monitorRequester, m_queueSize));
std::tr1::shared_ptr<MonitorStrategyQueue> tp(
new MonitorStrategyQueue(m_channel, m_ioid, m_monitorRequester, m_queueSize,
m_pipeline, m_ackAny)
);
m_monitorStrategy = tp;
// subscribe
@@ -2609,6 +2660,16 @@ namespace epics {
{
// TODO not supported by IF yet...
}
else if (qos & QOS_DESTROY)
{
// TODO for now status is ignored
if (payloadBuffer->getRemaining())
m_monitorStrategy->response(transport, payloadBuffer);
// unlisten will be called when all the elements in the queue gets processed
m_monitorStrategy->unlisten();
}
else
{
m_monitorStrategy->response(transport, payloadBuffer);
+24 -2
View File
@@ -1948,7 +1948,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom,
ServerMonitorRequesterImpl::ServerMonitorRequesterImpl(
ServerContextImpl::shared_pointer const & context, ServerChannelImpl::shared_pointer const & channel,
const pvAccessID ioid, Transport::shared_pointer const & transport):
BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure()
BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure(), _unlisten(false)
{
}
@@ -1992,7 +1992,12 @@ void ServerMonitorRequesterImpl::monitorConnect(const Status& status, Monitor::s
void ServerMonitorRequesterImpl::unlisten(Monitor::shared_pointer const & /*monitor*/)
{
//TODO
{
Lock guard(_mutex);
_unlisten = true;
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
void ServerMonitorRequesterImpl::monitorEvent(Monitor::shared_pointer const & /*monitor*/)
@@ -2110,6 +2115,23 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl*
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
else
{
// TODO CAS
bool unlisten;
Lock guard(_mutex);
unlisten = _unlisten;
_unlisten = false;
guard.unlock();
if (unlisten)
{
control->startMessage((int8)CMD_MONITOR, sizeof(int32)/sizeof(int8) + 1);
buffer->putInt(_ioid);
buffer->putByte((int8)QOS_DESTROY);
Status::Ok.serialize(buffer, control);
}
}
}
}
+1
View File
@@ -542,6 +542,7 @@ namespace pvAccess {
epics::pvData::Monitor::shared_pointer _channelMonitor;
epics::pvData::StructureConstPtr _structure;
epics::pvData::Status _status;
bool _unlisten;
};
+21 -2
View File
@@ -21,10 +21,21 @@ class PipelineSessionImpl :
public:
PipelineSessionImpl(
epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/
epics::pvData::PVStructure::shared_pointer const & pvRequest
) :
m_counter(0)
m_counter(0),
m_max(0)
{
PVStructure::shared_pointer pvOptions = pvRequest->getSubField<PVStructure>("record._options");
if (pvOptions) {
PVString::shared_pointer pvString = pvOptions->getSubField<PVString>("limit");
if (pvString)
{
// note: this throws an exception if conversion fails
m_max = pvString->getAs<int32>();
}
}
}
size_t getMinQueueSize() const {
@@ -44,6 +55,13 @@ public:
MonitorElement::shared_pointer element = control->getFreeElement();
element->pvStructurePtr->getSubField<PVInt>(1 /*"count"*/)->put(m_counter++);
control->putElement(element);
// we reached the limit, no more data
if (m_max != 0 && m_counter == m_max)
{
control->done();
break;
}
}
}
@@ -54,6 +72,7 @@ public:
private:
// NOTE: all the request calls will be made from the same thread, so we do not need sync m_counter
int32 m_counter;
int32 m_max;
};
class PipelineServiceImpl :