diff --git a/pvAccessApp/ca/caChannel.cpp b/pvAccessApp/ca/caChannel.cpp index 99b682a..bba7c38 100644 --- a/pvAccessApp/ca/caChannel.cpp +++ b/pvAccessApp/ca/caChannel.cpp @@ -881,6 +881,13 @@ void CAChannelGet::get(bool lastRequest) } +/* --------------- epics::pvData::ChannelRequest --------------- */ + +void CAChannelGet::cancel() +{ + // noop +} + /* --------------- epics::pvData::Destroyable --------------- */ @@ -1176,6 +1183,14 @@ void CAChannelPut::get() } + +/* --------------- epics::pvData::ChannelRequest --------------- */ + +void CAChannelPut::cancel() +{ + // noop +} + /* --------------- epics::pvData::Destroyable --------------- */ @@ -1350,6 +1365,14 @@ void CAChannelMonitor::release(epics::pvData::MonitorElementPtr const & /*monito } + +/* --------------- epics::pvData::ChannelRequest --------------- */ + +void CAChannelMonitor::cancel() +{ + // noop +} + /* --------------- epics::pvData::Destroyable --------------- */ diff --git a/pvAccessApp/ca/caChannel.h b/pvAccessApp/ca/caChannel.h index 74a4d2f..ee8b845 100644 --- a/pvAccessApp/ca/caChannel.h +++ b/pvAccessApp/ca/caChannel.h @@ -144,6 +144,10 @@ public: virtual void get(bool lastRequest); + /* --------------- epics::pvData::ChannelRequest --------------- */ + + virtual void cancel(); + /* --------------- epics::pvData::Destroyable --------------- */ virtual void destroy(); @@ -192,6 +196,10 @@ public: virtual void put(bool lastRequest); virtual void get(); + /* --------------- epics::pvData::ChannelRequest --------------- */ + + virtual void cancel(); + /* --------------- epics::pvData::Destroyable --------------- */ virtual void destroy(); @@ -240,6 +248,10 @@ public: virtual epics::pvData::MonitorElementPtr poll(); virtual void release(epics::pvData::MonitorElementPtr const & monitorElement); + /* --------------- epics::pvData::ChannelRequest --------------- */ + + virtual void cancel(); + /* --------------- epics::pvData::Destroyable --------------- */ virtual void destroy(); diff --git a/pvAccessApp/client/pvAccess.h b/pvAccessApp/client/pvAccess.h index 7f59f9b..5cc0982 100644 --- a/pvAccessApp/client/pvAccess.h +++ b/pvAccessApp/client/pvAccess.h @@ -114,6 +114,12 @@ namespace pvAccess { class epicsShareClass ChannelRequest : public epics::pvData::Destroyable, public Lockable, private epics::pvData::NoDefaultMethods { public: POINTER_DEFINITIONS(ChannelRequest); + + /** + * Cancel any currently pending request. + * No response callback should be called after the request has been canceled. + */ + virtual void cancel() = 0; }; /** diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 311c328..7999293 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -99,12 +99,13 @@ namespace epics { CMD_PUT_GET = 12, CMD_MONITOR = 13, CMD_ARRAY = 14, - CMD_CANCEL_REQUEST = 15, + CMD_DESTROY_REQUEST = 15, CMD_PROCESS = 16, CMD_GET_FIELD = 17, CMD_MESSAGE = 18, CMD_MULTIPLE_DATA = 19, - CMD_RPC = 20 + CMD_RPC = 20, + CMD_CANCEL_REQUEST = 21 }; enum ControlCommands { diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index c9cf0ee..e14ee2c 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -120,6 +120,7 @@ namespace epics { /* negative... */ static const int NULL_REQUEST = -1; static const int PURE_DESTROY_REQUEST = -2; + static const int PURE_CANCEL_REQUEST = -3; pvAccessID m_ioid; @@ -160,7 +161,7 @@ namespace epics { Lock guard(m_mutex); // we allow pure destroy... - if (m_pendingRequest != NULL_REQUEST && qos != PURE_DESTROY_REQUEST) + if (m_pendingRequest != NULL_REQUEST && qos != PURE_DESTROY_REQUEST && qos != PURE_CANCEL_REQUEST) return false; m_pendingRequest = qos; @@ -224,7 +225,7 @@ namespace epics { m_mutex.unlock(); if (!destroyResponse(transport, version, payloadBuffer, qos, m_status)) - cancel(); + destroy(); } else { @@ -239,7 +240,21 @@ namespace epics { } virtual void cancel() { - destroy(); + + { + Lock guard(m_mutex); + if (m_destroyed) + return; + } + + try + { + startRequest(PURE_CANCEL_REQUEST); + m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + } catch (...) { + // noop (do not complain if fails) + } + } virtual void destroy() { @@ -311,6 +326,12 @@ namespace epics { if (qos == -1) return; else if (qos == PURE_DESTROY_REQUEST) + { + control->startMessage((int8)CMD_DESTROY_REQUEST, 8); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + } + else if (qos == PURE_CANCEL_REQUEST) { control->startMessage((int8)CMD_CANCEL_REQUEST, 8); buffer->putInt(m_channel->getServerChannelID()); @@ -467,6 +488,11 @@ namespace epics { } } + virtual void cancel() + { + BaseRequestImpl::cancel(); + } + virtual void destroy() { BaseRequestImpl::destroy(); @@ -669,6 +695,11 @@ namespace epics { } } + virtual void cancel() + { + BaseRequestImpl::cancel(); + } + virtual void destroy() { BaseRequestImpl::destroy(); @@ -893,6 +924,11 @@ namespace epics { } } + virtual void cancel() + { + BaseRequestImpl::cancel(); + } + virtual void destroy() { BaseRequestImpl::destroy(); @@ -1169,6 +1205,11 @@ namespace epics { } } + virtual void cancel() + { + BaseRequestImpl::cancel(); + } + virtual void destroy() { BaseRequestImpl::destroy(); @@ -1351,6 +1392,11 @@ namespace epics { } } + virtual void cancel() + { + BaseRequestImpl::cancel(); + } + virtual void destroy() { BaseRequestImpl::destroy(); @@ -1638,6 +1684,11 @@ namespace epics { } } + virtual void cancel() + { + BaseRequestImpl::cancel(); + } + virtual void destroy() { BaseRequestImpl::destroy(); @@ -1746,8 +1797,8 @@ namespace epics { virtual void cancel() { - destroy(); - // TODO notify? + // TODO + // noop } virtual void timeout() { @@ -1796,7 +1847,7 @@ namespace epics { EXCEPTION_GUARD(m_callback->getDone(status, FieldConstPtr())); } - cancel(); + destroy(); } @@ -2144,7 +2195,7 @@ namespace epics { m_mutex.unlock(); if (!destroyResponse(transport, version, payloadBuffer, qos, status)) - cancel(); + destroy(); } else { @@ -2580,7 +2631,7 @@ namespace epics { ResponseHandler::shared_pointer badResponse(new BadResponse(context)); ResponseHandler::shared_pointer dataResponse(new DataResponseHandler(context)); - m_handlerTable.resize(CMD_RPC+1); + m_handlerTable.resize(CMD_CANCEL_REQUEST+1); m_handlerTable[CMD_BEACON].reset(new BeaconResponseHandler(context)); /* 0 */ m_handlerTable[CMD_CONNECTION_VALIDATION].reset(new ClientConnectionValidationHandler(context)); /* 1 */ @@ -2597,12 +2648,13 @@ namespace epics { m_handlerTable[CMD_PUT_GET] = dataResponse; /* 12 - put-get response */ m_handlerTable[CMD_MONITOR] = dataResponse; /* 13 - monitor response */ m_handlerTable[CMD_ARRAY] = dataResponse; /* 14 - array response */ - m_handlerTable[CMD_CANCEL_REQUEST] = badResponse; /* 15 - cancel request */ + m_handlerTable[CMD_DESTROY_REQUEST] = badResponse; /* 15 - destroy request */ m_handlerTable[CMD_PROCESS] = dataResponse; /* 16 - process response */ m_handlerTable[CMD_GET_FIELD] = dataResponse; /* 17 - get field response */ m_handlerTable[CMD_MESSAGE].reset(new MessageHandler(context)); /* 18 - message to Requester */ m_handlerTable[CMD_MULTIPLE_DATA] = badResponse; // TODO new MultipleDataResponseHandler(context), /* 19 - grouped monitors */ m_handlerTable[CMD_RPC] = dataResponse; /* 20 - RPC response */ + m_handlerTable[CMD_CANCEL_REQUEST] = badResponse; /* 21 - cancel request */ } virtual void handleResponse(osiSockAddr* responseFrom, diff --git a/pvAccessApp/rpcService/rpcServer.cpp b/pvAccessApp/rpcService/rpcServer.cpp index 7f2299b..8b50efa 100644 --- a/pvAccessApp/rpcService/rpcServer.cpp +++ b/pvAccessApp/rpcService/rpcServer.cpp @@ -78,6 +78,11 @@ class ChannelRPCServiceImpl : public ChannelRPC processRequest(pvArgument, lastRequest); } + virtual void cancel() + { + // noop + } + virtual void destroy() { // noop diff --git a/pvAccessApp/server/baseChannelRequester.cpp b/pvAccessApp/server/baseChannelRequester.cpp index 928fdb2..73416fc 100644 --- a/pvAccessApp/server/baseChannelRequester.cpp +++ b/pvAccessApp/server/baseChannelRequester.cpp @@ -18,6 +18,7 @@ const Status BaseChannelRequester::noReadACLStatus = Status(Status::STATUSTYPE_E const Status BaseChannelRequester::noWriteACLStatus = Status(Status::STATUSTYPE_ERROR, "no write access"); const Status BaseChannelRequester::noProcessACLStatus = Status(Status::STATUSTYPE_ERROR, "no process access"); const Status BaseChannelRequester::otherRequestPendingStatus = Status(Status::STATUSTYPE_ERROR, "other request pending"); +const Status BaseChannelRequester::notAChannelRequestStatus = Status(Status::STATUSTYPE_ERROR, "not a channel request"); const int32 BaseChannelRequester::NULL_REQUEST = -1; diff --git a/pvAccessApp/server/baseChannelRequester.h b/pvAccessApp/server/baseChannelRequester.h index c17af5c..ffb85f5 100644 --- a/pvAccessApp/server/baseChannelRequester.h +++ b/pvAccessApp/server/baseChannelRequester.h @@ -48,6 +48,7 @@ public: static const epics::pvData::Status noWriteACLStatus; static const epics::pvData::Status noProcessACLStatus; static const epics::pvData::Status otherRequestPendingStatus; + static const epics::pvData::Status notAChannelRequestStatus; protected: const pvAccessID _ioid; Transport::shared_pointer _transport; diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index a1699e0..4d8fb56 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -50,7 +50,7 @@ ServerResponseHandler::ServerResponseHandler(ServerContextImpl::shared_pointer c MB_INIT; ResponseHandler::shared_pointer badResponse(new ServerBadResponse(context)); - m_handlerTable.resize(CMD_RPC+1); + m_handlerTable.resize(CMD_CANCEL_REQUEST+1); m_handlerTable[CMD_BEACON].reset(new ServerNoopResponse(context, "Beacon")); /* 0 */ m_handlerTable[CMD_CONNECTION_VALIDATION].reset(new ServerConnectionValidationHandler(context)); /* 1 */ @@ -68,12 +68,13 @@ ServerResponseHandler::ServerResponseHandler(ServerContextImpl::shared_pointer c m_handlerTable[CMD_PUT_GET].reset(new ServerPutGetHandler(context)); /* 12 - put-get response */ m_handlerTable[CMD_MONITOR].reset(new ServerMonitorHandler(context)); /* 13 - monitor response */ m_handlerTable[CMD_ARRAY].reset(new ServerArrayHandler(context)); /* 14 - array response */ - m_handlerTable[CMD_CANCEL_REQUEST].reset(new ServerCancelRequestHandler(context)); /* 15 - cancel request */ + m_handlerTable[CMD_DESTROY_REQUEST].reset(new ServerDestroyRequestHandler(context)); /* 15 - destroy request */ m_handlerTable[CMD_PROCESS].reset(new ServerProcessHandler(context)); /* 16 - process response */ m_handlerTable[CMD_GET_FIELD].reset(new ServerGetFieldHandler(context)); /* 17 - get field response */ m_handlerTable[CMD_MESSAGE] = badResponse; /* 18 - message to Requester */ m_handlerTable[CMD_MULTIPLE_DATA] = badResponse; /* 19 - grouped monitors */ m_handlerTable[CMD_RPC].reset(new ServerRPCHandler(context)); /* 20 - RPC response */ + m_handlerTable[CMD_CANCEL_REQUEST].reset(new ServerCancelRequestHandler(context)); /* 21 - cancel request */ } void ServerResponseHandler::handleResponse(osiSockAddr* responseFrom, @@ -1659,7 +1660,7 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont } /****************************************************************************************/ -void ServerCancelRequestHandler::handleResponse(osiSockAddr* responseFrom, +void ServerDestroyRequestHandler::handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, size_t payloadSize, ByteBuffer* payloadBuffer) { AbstractServerResponseHandler::handleResponse(responseFrom, @@ -1693,11 +1694,56 @@ void ServerCancelRequestHandler::handleResponse(osiSockAddr* responseFrom, channel->unregisterRequest(ioid); } -void ServerCancelRequestHandler::failureResponse(Transport::shared_pointer const & transport, pvAccessID ioid, const Status& errorStatus) +void ServerDestroyRequestHandler::failureResponse(Transport::shared_pointer const & transport, pvAccessID ioid, const Status& errorStatus) { BaseChannelRequester::message(transport, ioid, errorStatus.getMessage(), warningMessage); } +/****************************************************************************************/ +void ServerCancelRequestHandler::handleResponse(osiSockAddr* responseFrom, + Transport::shared_pointer const & transport, int8 version, int8 command, + size_t payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // NOTE: we do not explicitly check if transport is OK + ChannelHostingTransport::shared_pointer casTransport = dynamic_pointer_cast(transport); + + transport->ensureData(2*sizeof(int32)/sizeof(int8)); + const pvAccessID sid = payloadBuffer->getInt(); + const pvAccessID ioid = payloadBuffer->getInt(); + + ServerChannelImpl::shared_pointer channel = static_pointer_cast(casTransport->getChannel(sid)); + if (channel == NULL) + { + failureResponse(transport, ioid, BaseChannelRequester::badCIDStatus); + return; + } + + Destroyable::shared_pointer request = channel->getRequest(ioid); + if (request == NULL) + { + failureResponse(transport, ioid, BaseChannelRequester::badIOIDStatus); + return; + } + + ChannelRequest::shared_pointer cr = dynamic_pointer_cast(request); + if (cr == NULL) + { + failureResponse(transport, ioid, BaseChannelRequester::notAChannelRequestStatus); + return; + } + + // cancel + cr->cancel(); + +} + +void ServerCancelRequestHandler::failureResponse(Transport::shared_pointer const & transport, pvAccessID ioid, const Status& errorStatus) +{ + BaseChannelRequester::message(transport, ioid, errorStatus.getMessage(), warningMessage); +} + /****************************************************************************************/ void ServerProcessHandler::handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index 89db7ce..49a2912 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -573,15 +573,15 @@ namespace pvAccess { /****************************************************************************************/ /** - * Cancel request handler. + * Destroy request handler. */ - class ServerCancelRequestHandler : public AbstractServerResponseHandler + class ServerDestroyRequestHandler : public AbstractServerResponseHandler { public: - ServerCancelRequestHandler(ServerContextImpl::shared_pointer const & context) : - AbstractServerResponseHandler(context, "Cancel request") { + ServerDestroyRequestHandler(ServerContextImpl::shared_pointer const & context) : + AbstractServerResponseHandler(context, "Destroy request") { } - virtual ~ServerCancelRequestHandler() {} + virtual ~ServerDestroyRequestHandler() {} virtual void handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, epics::pvData::int8 version, epics::pvData::int8 command, @@ -592,6 +592,27 @@ namespace pvAccess { }; + /****************************************************************************************/ + /** + * Cancel request handler. + */ + class ServerCancelRequestHandler : public AbstractServerResponseHandler + { + public: + ServerCancelRequestHandler(ServerContextImpl::shared_pointer const & context) : + AbstractServerResponseHandler(context, "Cancel request") { + } + virtual ~ServerCancelRequestHandler() {} + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport::shared_pointer const & transport, epics::pvData::int8 version, epics::pvData::int8 command, + std::size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + private: + + void failureResponse(Transport::shared_pointer const & transport, pvAccessID ioid, const epics::pvData::Status& errorStatus); + }; + + /****************************************************************************************/ /** * Process request handler. diff --git a/testApp/remote/testServer.cpp b/testApp/remote/testServer.cpp index a70f593..230e636 100644 --- a/testApp/remote/testServer.cpp +++ b/testApp/remote/testServer.cpp @@ -885,6 +885,10 @@ public: destroy(); } + virtual void cancel() + { + } + virtual void destroy() { } @@ -998,6 +1002,10 @@ public: m_changed.set(); } + virtual void cancel() + { + } + virtual void destroy() { if (m_channelProcess) @@ -1097,6 +1105,10 @@ public: m_channelPutRequester->getDone(Status::Ok); } + virtual void cancel() + { + } + virtual void destroy() { if (m_channelProcess) @@ -1187,6 +1199,10 @@ public: m_channelPutGetRequester->getPutDone(Status::Ok); } + virtual void cancel() + { + } + virtual void destroy() { if (m_channelProcess) @@ -1602,6 +1618,10 @@ public: destroy(); } + virtual void cancel() + { + } + virtual void destroy() { } @@ -1794,6 +1814,10 @@ public: destroy(); } + virtual void cancel() + { + } + virtual void destroy() { } @@ -1931,6 +1955,10 @@ public: } } + virtual void cancel() + { + } + virtual void destroy() { stop();