ChannelRequest::cancel() impl.

This commit is contained in:
Matej Sekoranja
2014-04-07 12:22:30 +02:00
parent d2fb05ddd8
commit 4bbab422fe
11 changed files with 216 additions and 20 deletions
+23
View File
@@ -881,6 +881,13 @@ void CAChannelGet::get(bool lastRequest)
}
/* --------------- epics::pvData::ChannelRequest --------------- */
void CAChannelGet::cancel()
{
// noop
}
/* --------------- epics::pvData::Destroyable --------------- */
@@ -1176,6 +1183,14 @@ void CAChannelPut::get()
}
/* --------------- epics::pvData::ChannelRequest --------------- */
void CAChannelPut::cancel()
{
// noop
}
/* --------------- epics::pvData::Destroyable --------------- */
@@ -1350,6 +1365,14 @@ void CAChannelMonitor::release(epics::pvData::MonitorElementPtr const & /*monito
}
/* --------------- epics::pvData::ChannelRequest --------------- */
void CAChannelMonitor::cancel()
{
// noop
}
/* --------------- epics::pvData::Destroyable --------------- */
+12
View File
@@ -144,6 +144,10 @@ public:
virtual void get(bool lastRequest);
/* --------------- epics::pvData::ChannelRequest --------------- */
virtual void cancel();
/* --------------- epics::pvData::Destroyable --------------- */
virtual void destroy();
@@ -192,6 +196,10 @@ public:
virtual void put(bool lastRequest);
virtual void get();
/* --------------- epics::pvData::ChannelRequest --------------- */
virtual void cancel();
/* --------------- epics::pvData::Destroyable --------------- */
virtual void destroy();
@@ -240,6 +248,10 @@ public:
virtual epics::pvData::MonitorElementPtr poll();
virtual void release(epics::pvData::MonitorElementPtr const & monitorElement);
/* --------------- epics::pvData::ChannelRequest --------------- */
virtual void cancel();
/* --------------- epics::pvData::Destroyable --------------- */
virtual void destroy();
+6
View File
@@ -114,6 +114,12 @@ namespace pvAccess {
class epicsShareClass ChannelRequest : public epics::pvData::Destroyable, public Lockable, private epics::pvData::NoDefaultMethods {
public:
POINTER_DEFINITIONS(ChannelRequest);
/**
* Cancel any currently pending request.
* No response callback should be called after the request has been canceled.
*/
virtual void cancel() = 0;
};
/**
+3 -2
View File
@@ -99,12 +99,13 @@ namespace epics {
CMD_PUT_GET = 12,
CMD_MONITOR = 13,
CMD_ARRAY = 14,
CMD_CANCEL_REQUEST = 15,
CMD_DESTROY_REQUEST = 15,
CMD_PROCESS = 16,
CMD_GET_FIELD = 17,
CMD_MESSAGE = 18,
CMD_MULTIPLE_DATA = 19,
CMD_RPC = 20
CMD_RPC = 20,
CMD_CANCEL_REQUEST = 21
};
enum ControlCommands {
+61 -9
View File
@@ -120,6 +120,7 @@ namespace epics {
/* negative... */
static const int NULL_REQUEST = -1;
static const int PURE_DESTROY_REQUEST = -2;
static const int PURE_CANCEL_REQUEST = -3;
pvAccessID m_ioid;
@@ -160,7 +161,7 @@ namespace epics {
Lock guard(m_mutex);
// we allow pure destroy...
if (m_pendingRequest != NULL_REQUEST && qos != PURE_DESTROY_REQUEST)
if (m_pendingRequest != NULL_REQUEST && qos != PURE_DESTROY_REQUEST && qos != PURE_CANCEL_REQUEST)
return false;
m_pendingRequest = qos;
@@ -224,7 +225,7 @@ namespace epics {
m_mutex.unlock();
if (!destroyResponse(transport, version, payloadBuffer, qos, m_status))
cancel();
destroy();
}
else
{
@@ -239,7 +240,21 @@ namespace epics {
}
virtual void cancel() {
destroy();
{
Lock guard(m_mutex);
if (m_destroyed)
return;
}
try
{
startRequest(PURE_CANCEL_REQUEST);
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (...) {
// noop (do not complain if fails)
}
}
virtual void destroy() {
@@ -311,6 +326,12 @@ namespace epics {
if (qos == -1)
return;
else if (qos == PURE_DESTROY_REQUEST)
{
control->startMessage((int8)CMD_DESTROY_REQUEST, 8);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
}
else if (qos == PURE_CANCEL_REQUEST)
{
control->startMessage((int8)CMD_CANCEL_REQUEST, 8);
buffer->putInt(m_channel->getServerChannelID());
@@ -467,6 +488,11 @@ namespace epics {
}
}
virtual void cancel()
{
BaseRequestImpl::cancel();
}
virtual void destroy()
{
BaseRequestImpl::destroy();
@@ -669,6 +695,11 @@ namespace epics {
}
}
virtual void cancel()
{
BaseRequestImpl::cancel();
}
virtual void destroy()
{
BaseRequestImpl::destroy();
@@ -893,6 +924,11 @@ namespace epics {
}
}
virtual void cancel()
{
BaseRequestImpl::cancel();
}
virtual void destroy()
{
BaseRequestImpl::destroy();
@@ -1169,6 +1205,11 @@ namespace epics {
}
}
virtual void cancel()
{
BaseRequestImpl::cancel();
}
virtual void destroy()
{
BaseRequestImpl::destroy();
@@ -1351,6 +1392,11 @@ namespace epics {
}
}
virtual void cancel()
{
BaseRequestImpl::cancel();
}
virtual void destroy()
{
BaseRequestImpl::destroy();
@@ -1638,6 +1684,11 @@ namespace epics {
}
}
virtual void cancel()
{
BaseRequestImpl::cancel();
}
virtual void destroy()
{
BaseRequestImpl::destroy();
@@ -1746,8 +1797,8 @@ namespace epics {
virtual void cancel() {
destroy();
// TODO notify?
// TODO
// noop
}
virtual void timeout() {
@@ -1796,7 +1847,7 @@ namespace epics {
EXCEPTION_GUARD(m_callback->getDone(status, FieldConstPtr()));
}
cancel();
destroy();
}
@@ -2144,7 +2195,7 @@ namespace epics {
m_mutex.unlock();
if (!destroyResponse(transport, version, payloadBuffer, qos, status))
cancel();
destroy();
}
else
{
@@ -2580,7 +2631,7 @@ namespace epics {
ResponseHandler::shared_pointer badResponse(new BadResponse(context));
ResponseHandler::shared_pointer dataResponse(new DataResponseHandler(context));
m_handlerTable.resize(CMD_RPC+1);
m_handlerTable.resize(CMD_CANCEL_REQUEST+1);
m_handlerTable[CMD_BEACON].reset(new BeaconResponseHandler(context)); /* 0 */
m_handlerTable[CMD_CONNECTION_VALIDATION].reset(new ClientConnectionValidationHandler(context)); /* 1 */
@@ -2597,12 +2648,13 @@ namespace epics {
m_handlerTable[CMD_PUT_GET] = dataResponse; /* 12 - put-get response */
m_handlerTable[CMD_MONITOR] = dataResponse; /* 13 - monitor response */
m_handlerTable[CMD_ARRAY] = dataResponse; /* 14 - array response */
m_handlerTable[CMD_CANCEL_REQUEST] = badResponse; /* 15 - cancel request */
m_handlerTable[CMD_DESTROY_REQUEST] = badResponse; /* 15 - destroy request */
m_handlerTable[CMD_PROCESS] = dataResponse; /* 16 - process response */
m_handlerTable[CMD_GET_FIELD] = dataResponse; /* 17 - get field response */
m_handlerTable[CMD_MESSAGE].reset(new MessageHandler(context)); /* 18 - message to Requester */
m_handlerTable[CMD_MULTIPLE_DATA] = badResponse; // TODO new MultipleDataResponseHandler(context), /* 19 - grouped monitors */
m_handlerTable[CMD_RPC] = dataResponse; /* 20 - RPC response */
m_handlerTable[CMD_CANCEL_REQUEST] = badResponse; /* 21 - cancel request */
}
virtual void handleResponse(osiSockAddr* responseFrom,
+5
View File
@@ -78,6 +78,11 @@ class ChannelRPCServiceImpl : public ChannelRPC
processRequest(pvArgument, lastRequest);
}
virtual void cancel()
{
// noop
}
virtual void destroy()
{
// noop
@@ -18,6 +18,7 @@ const Status BaseChannelRequester::noReadACLStatus = Status(Status::STATUSTYPE_E
const Status BaseChannelRequester::noWriteACLStatus = Status(Status::STATUSTYPE_ERROR, "no write access");
const Status BaseChannelRequester::noProcessACLStatus = Status(Status::STATUSTYPE_ERROR, "no process access");
const Status BaseChannelRequester::otherRequestPendingStatus = Status(Status::STATUSTYPE_ERROR, "other request pending");
const Status BaseChannelRequester::notAChannelRequestStatus = Status(Status::STATUSTYPE_ERROR, "not a channel request");
const int32 BaseChannelRequester::NULL_REQUEST = -1;
@@ -48,6 +48,7 @@ public:
static const epics::pvData::Status noWriteACLStatus;
static const epics::pvData::Status noProcessACLStatus;
static const epics::pvData::Status otherRequestPendingStatus;
static const epics::pvData::Status notAChannelRequestStatus;
protected:
const pvAccessID _ioid;
Transport::shared_pointer _transport;
+50 -4
View File
@@ -50,7 +50,7 @@ ServerResponseHandler::ServerResponseHandler(ServerContextImpl::shared_pointer c
MB_INIT;
ResponseHandler::shared_pointer badResponse(new ServerBadResponse(context));
m_handlerTable.resize(CMD_RPC+1);
m_handlerTable.resize(CMD_CANCEL_REQUEST+1);
m_handlerTable[CMD_BEACON].reset(new ServerNoopResponse(context, "Beacon")); /* 0 */
m_handlerTable[CMD_CONNECTION_VALIDATION].reset(new ServerConnectionValidationHandler(context)); /* 1 */
@@ -68,12 +68,13 @@ ServerResponseHandler::ServerResponseHandler(ServerContextImpl::shared_pointer c
m_handlerTable[CMD_PUT_GET].reset(new ServerPutGetHandler(context)); /* 12 - put-get response */
m_handlerTable[CMD_MONITOR].reset(new ServerMonitorHandler(context)); /* 13 - monitor response */
m_handlerTable[CMD_ARRAY].reset(new ServerArrayHandler(context)); /* 14 - array response */
m_handlerTable[CMD_CANCEL_REQUEST].reset(new ServerCancelRequestHandler(context)); /* 15 - cancel request */
m_handlerTable[CMD_DESTROY_REQUEST].reset(new ServerDestroyRequestHandler(context)); /* 15 - destroy request */
m_handlerTable[CMD_PROCESS].reset(new ServerProcessHandler(context)); /* 16 - process response */
m_handlerTable[CMD_GET_FIELD].reset(new ServerGetFieldHandler(context)); /* 17 - get field response */
m_handlerTable[CMD_MESSAGE] = badResponse; /* 18 - message to Requester */
m_handlerTable[CMD_MULTIPLE_DATA] = badResponse; /* 19 - grouped monitors */
m_handlerTable[CMD_RPC].reset(new ServerRPCHandler(context)); /* 20 - RPC response */
m_handlerTable[CMD_CANCEL_REQUEST].reset(new ServerCancelRequestHandler(context)); /* 21 - cancel request */
}
void ServerResponseHandler::handleResponse(osiSockAddr* responseFrom,
@@ -1659,7 +1660,7 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont
}
/****************************************************************************************/
void ServerCancelRequestHandler::handleResponse(osiSockAddr* responseFrom,
void ServerDestroyRequestHandler::handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
size_t payloadSize, ByteBuffer* payloadBuffer) {
AbstractServerResponseHandler::handleResponse(responseFrom,
@@ -1693,11 +1694,56 @@ void ServerCancelRequestHandler::handleResponse(osiSockAddr* responseFrom,
channel->unregisterRequest(ioid);
}
void ServerCancelRequestHandler::failureResponse(Transport::shared_pointer const & transport, pvAccessID ioid, const Status& errorStatus)
void ServerDestroyRequestHandler::failureResponse(Transport::shared_pointer const & transport, pvAccessID ioid, const Status& errorStatus)
{
BaseChannelRequester::message(transport, ioid, errorStatus.getMessage(), warningMessage);
}
/****************************************************************************************/
void ServerCancelRequestHandler::handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
size_t payloadSize, ByteBuffer* payloadBuffer) {
AbstractServerResponseHandler::handleResponse(responseFrom,
transport, version, command, payloadSize, payloadBuffer);
// NOTE: we do not explicitly check if transport is OK
ChannelHostingTransport::shared_pointer casTransport = dynamic_pointer_cast<ChannelHostingTransport>(transport);
transport->ensureData(2*sizeof(int32)/sizeof(int8));
const pvAccessID sid = payloadBuffer->getInt();
const pvAccessID ioid = payloadBuffer->getInt();
ServerChannelImpl::shared_pointer channel = static_pointer_cast<ServerChannelImpl>(casTransport->getChannel(sid));
if (channel == NULL)
{
failureResponse(transport, ioid, BaseChannelRequester::badCIDStatus);
return;
}
Destroyable::shared_pointer request = channel->getRequest(ioid);
if (request == NULL)
{
failureResponse(transport, ioid, BaseChannelRequester::badIOIDStatus);
return;
}
ChannelRequest::shared_pointer cr = dynamic_pointer_cast<ChannelRequest>(request);
if (cr == NULL)
{
failureResponse(transport, ioid, BaseChannelRequester::notAChannelRequestStatus);
return;
}
// cancel
cr->cancel();
}
void ServerCancelRequestHandler::failureResponse(Transport::shared_pointer const & transport, pvAccessID ioid, const Status& errorStatus)
{
BaseChannelRequester::message(transport, ioid, errorStatus.getMessage(), warningMessage);
}
/****************************************************************************************/
void ServerProcessHandler::handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
+26 -5
View File
@@ -573,15 +573,15 @@ namespace pvAccess {
/****************************************************************************************/
/**
* Cancel request handler.
* Destroy request handler.
*/
class ServerCancelRequestHandler : public AbstractServerResponseHandler
class ServerDestroyRequestHandler : public AbstractServerResponseHandler
{
public:
ServerCancelRequestHandler(ServerContextImpl::shared_pointer const & context) :
AbstractServerResponseHandler(context, "Cancel request") {
ServerDestroyRequestHandler(ServerContextImpl::shared_pointer const & context) :
AbstractServerResponseHandler(context, "Destroy request") {
}
virtual ~ServerCancelRequestHandler() {}
virtual ~ServerDestroyRequestHandler() {}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, epics::pvData::int8 version, epics::pvData::int8 command,
@@ -592,6 +592,27 @@ namespace pvAccess {
};
/****************************************************************************************/
/**
* Cancel request handler.
*/
class ServerCancelRequestHandler : public AbstractServerResponseHandler
{
public:
ServerCancelRequestHandler(ServerContextImpl::shared_pointer const & context) :
AbstractServerResponseHandler(context, "Cancel request") {
}
virtual ~ServerCancelRequestHandler() {}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, epics::pvData::int8 version, epics::pvData::int8 command,
std::size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
private:
void failureResponse(Transport::shared_pointer const & transport, pvAccessID ioid, const epics::pvData::Status& errorStatus);
};
/****************************************************************************************/
/**
* Process request handler.
+28
View File
@@ -885,6 +885,10 @@ public:
destroy();
}
virtual void cancel()
{
}
virtual void destroy()
{
}
@@ -998,6 +1002,10 @@ public:
m_changed.set();
}
virtual void cancel()
{
}
virtual void destroy()
{
if (m_channelProcess)
@@ -1097,6 +1105,10 @@ public:
m_channelPutRequester->getDone(Status::Ok);
}
virtual void cancel()
{
}
virtual void destroy()
{
if (m_channelProcess)
@@ -1187,6 +1199,10 @@ public:
m_channelPutGetRequester->getPutDone(Status::Ok);
}
virtual void cancel()
{
}
virtual void destroy()
{
if (m_channelProcess)
@@ -1602,6 +1618,10 @@ public:
destroy();
}
virtual void cancel()
{
}
virtual void destroy()
{
}
@@ -1794,6 +1814,10 @@ public:
destroy();
}
virtual void cancel()
{
}
virtual void destroy()
{
}
@@ -1931,6 +1955,10 @@ public:
}
}
virtual void cancel()
{
}
virtual void destroy()
{
stop();