This commit is contained in:
Matej Sekoranja
2015-10-15 21:18:56 +02:00
14 changed files with 1359 additions and 32 deletions

View File

@@ -1875,20 +1875,46 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom,
// create...
ServerMonitorRequesterImpl::create(_context, channel, ioid, transport, pvRequest);
// pipelining monitor (i.e. w/ flow control)
const bool ack = (QOS_GET_PUT & qosCode) != 0;
if (ack)
{
int32 nfree = payloadBuffer->getInt();
ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast<ServerMonitorRequesterImpl>(channel->getRequest(ioid));
Monitor::shared_pointer mp = request->getChannelMonitor();
PipelineMonitor* pmp = dynamic_cast<PipelineMonitor*>(mp.get());
if (pmp)
pmp->reportRemoteQueueStatus(nfree);
}
}
else
{
const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
const bool get = (QOS_GET & qosCode) != 0;
const bool process = (QOS_PROCESS & qosCode) != 0;
const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
const bool get = (QOS_GET & qosCode) != 0;
const bool process = (QOS_PROCESS & qosCode) != 0;
const bool ack = (QOS_GET_PUT & qosCode) != 0;
ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast<ServerMonitorRequesterImpl>(channel->getRequest(ioid));
ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast<ServerMonitorRequesterImpl>(channel->getRequest(ioid));
if (!request.get())
{
BaseChannelRequester::sendFailureMessage((int8)CMD_MONITOR, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus);
return;
}
if (ack)
{
int32 nfree = payloadBuffer->getInt();
Monitor::shared_pointer mp = request->getChannelMonitor();
PipelineMonitor* pmp = dynamic_cast<PipelineMonitor*>(mp.get());
if (pmp)
pmp->reportRemoteQueueStatus(nfree);
return;
// note: not possible to ack and destroy
}
/*
if (!request->startRequest(qosCode))
{
@@ -1929,7 +1955,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom,
ServerMonitorRequesterImpl::ServerMonitorRequesterImpl(
ServerContextImpl::shared_pointer const & context, ServerChannelImpl::shared_pointer const & channel,
const pvAccessID ioid, Transport::shared_pointer const & transport):
BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure()
BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure(), _unlisten(false)
{
}
@@ -1946,7 +1972,7 @@ MonitorRequester::shared_pointer ServerMonitorRequesterImpl::create(
void ServerMonitorRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest)
{
startRequest(QOS_INIT);
startRequest(QOS_INIT);
MonitorRequester::shared_pointer thisPointer = shared_from_this();
Destroyable::shared_pointer thisDestroyable = shared_from_this();
_channel->registerRequest(_ioid, thisDestroyable);
@@ -1973,7 +1999,12 @@ void ServerMonitorRequesterImpl::monitorConnect(const Status& status, Monitor::s
void ServerMonitorRequesterImpl::unlisten(Monitor::shared_pointer const & /*monitor*/)
{
//TODO
{
Lock guard(_mutex);
_unlisten = true;
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
void ServerMonitorRequesterImpl::monitorEvent(Monitor::shared_pointer const & /*monitor*/)
@@ -2091,6 +2122,23 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl*
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
else
{
// TODO CAS
bool unlisten;
Lock guard(_mutex);
unlisten = _unlisten;
_unlisten = false;
guard.unlock();
if (unlisten)
{
control->startMessage((int8)CMD_MONITOR, sizeof(int32)/sizeof(int8) + 1);
buffer->putInt(_ioid);
buffer->putByte((int8)QOS_DESTROY);
Status::Ok.serialize(buffer, control);
}
}
}
}