diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index ce2bf46..ba69edc 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -52,8 +52,8 @@ namespace epics { _handlerTable[4] = badResponse; _handlerTable[5] = new IntrospectionSearchHandler(context); _handlerTable[6] = badResponse; - _handlerTable[7] = badResponse; - _handlerTable[8] = badResponse; + _handlerTable[7] = new CreateChannelHandler(context); + _handlerTable[8] = new DestroyChannelHandler(context); _handlerTable[9] = badResponse; _handlerTable[10] = badResponse; _handlerTable[11] = badResponse; @@ -81,6 +81,8 @@ namespace epics { delete _handlerTable[2]; delete _handlerTable[3]; delete _handlerTable[5]; + delete _handlerTable[7]; + delete _handlerTable[8]; delete _handlerTable[27]; delete[] _handlerTable; } @@ -177,6 +179,8 @@ namespace epics { THROW_BASE_EXCEPTION("not implemented"); } + /****************************************************************************************/ + SearchHandler::SearchHandler(ServerContextImpl* context) : AbstractServerResponseHandler(context, "Introspection search request") { @@ -313,8 +317,9 @@ namespace epics { _elements.push_back(element); } + /****************************************************************************************/ - ChannelRequesterImpl::ChannelRequesterImpl(Transport* transport, const String channelName, const int32 cid) : + ChannelRequesterImpl::ChannelRequesterImpl(Transport* transport, const String channelName, const pvAccessID cid) : _transport(transport), _channelName(channelName), _cid(cid), @@ -329,6 +334,7 @@ namespace epics { Lock guard(_mutex); _status = status; _channel = channel; + _transport->enqueueSendRequest(this); } void ChannelRequesterImpl::channelStateChange(Channel* constc, const Channel::ConnectionState isConnected) @@ -338,14 +344,15 @@ namespace epics { String ChannelRequesterImpl::getRequesterName() { - //TODO - // return _transport-> + "/" + _cid; + stringstream name; + name << typeid(*_transport).name() << "/" << _cid; + return name.str(); } void ChannelRequesterImpl::message(const String message, const epics::pvData::MessageType messageType) { - // TODO - //System.err.println("[" + messageType + "] " + message); + // TODO review + errlogSevPrintf(errlogMinor, "[%s] %s", messageTypeName[messageType].c_str(), message.c_str()); } void ChannelRequesterImpl::lock() @@ -360,8 +367,8 @@ namespace epics { void ChannelRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) { - /* Channel* channel; - Status* status; + Channel* channel; + Status status; { Lock guard(_mutex); channel = _channel; @@ -376,19 +383,18 @@ namespace epics { // OK else { - ServerChannelImpl serverChannel = NULL; + ServerChannelImpl* serverChannel = NULL; try { // NOTE: we do not explicitly check if transport OK - ChannelHostingTransport* casTransport = static_cast(_transport); + ChannelHostingTransport* casTransport = dynamic_cast(_transport); // // create a new channel instance // - int sid = casTransport->preallocateChannelSID(); + pvAccessID sid = casTransport->preallocateChannelSID(); try { - //TODO serverChannel = new ServerChannelImpl(channel, _cid, sid, casTransport->getSecurityToken()); // ack allocation and register @@ -406,15 +412,76 @@ namespace epics { buffer->putInt(sid); _transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status); } + catch (std::exception& e) + { + errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str()); + //TODO implement BaseChannelRequester + //createChannelFailedResponse(buffer, control, + // BaseChannelRequester.statusCreate.createStatus(StatusType.FATAL, "failed to create channel", e)); + if (serverChannel != NULL) + { + serverChannel->destroy(); + } + } catch (...) { - errlogPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName); - createChannelFailedResponse(buffer, control, - //BaseChannelRequester.statusCreate.createStatus(StatusType.FATAL, "failed to create channel", th)); - // if (serverChannel != null) - // serverChannel.destroy(); + errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str()); + //TODO implement BaseChannelRequester + //createChannelFailedResponse(buffer, control, + // BaseChannelRequester.statusCreate.createStatus(StatusType.FATAL, "failed to create channel", e)); + if (serverChannel != NULL) + { + serverChannel->destroy(); + } } - }*/ + } + } + + void ChannelRequesterImpl::createChannelFailedResponse(ByteBuffer* buffer, TransportSendControl* control, const Status& status) + { + control->startMessage((int8)7, 2*sizeof(int32)/sizeof(int8)); + buffer->putInt(_cid); + buffer->putInt(-1); + _transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status); + } + + /****************************************************************************************/ + + void DestroyChannelHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // NOTE: we do not explicitly check if transport OK + ChannelHostingTransport* casTransport = dynamic_cast(transport); + + + transport->ensureData(2*sizeof(int32)/sizeof(int8)); + const pvAccessID sid = payloadBuffer->getInt(); + const pvAccessID cid = payloadBuffer->getInt(); + + // get channel by SID + ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); + if (channel == NULL) + { + if (!transport->isClosed()) + { + char host[100]; + sockAddrToA(&responseFrom->sa,host,100); + errlogSevPrintf(errlogMinor, "Trying to destroy a channel that no longer exists (SID: %d, CID %d, client: %s).", sid, cid, host); + } + return; + } + + // destroy + channel->destroy(); + + // .. and unregister + casTransport->unregisterChannel(sid); + + // send response back + transport->enqueueSendRequest( new DestroyChannelHandlerTransportSender(cid, sid)); } } } diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index ab5d97e..cda5e39 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -10,6 +10,7 @@ #include "serverContext.h" #include "remote.h" +#include "serverChannelImpl.h" namespace epics { namespace pvAccess { @@ -152,6 +153,7 @@ namespace epics { int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); }; + /****************************************************************************************/ /** * Introspection search request handler. */ @@ -210,7 +212,7 @@ namespace epics { epics::pvData::Mutex _mutex; ServerContextImpl* _context; }; - + /****************************************************************************************/ /** * Create channel request handler. */ @@ -232,7 +234,6 @@ namespace epics { void disconnect(Transport* transport); }; - class ServerChannelImpl; class ChannelRequesterImpl : public ChannelRequester, public TransportSender { public: @@ -248,14 +249,66 @@ namespace epics { private: Transport* _transport; const String _channelName; - const int32 _cid; + const pvAccessID _cid; Status _status; Channel* _channel; epics::pvData::Mutex _mutex; - void createChannelFailedResponse(ByteBuffer* buffer, TransportSendControl* control, Status* const status); + void createChannelFailedResponse(ByteBuffer* buffer, TransportSendControl* control, const Status& status); + }; + + /****************************************************************************************/ + /** + * Destroy channel request handler. + */ + class DestroyChannelHandler : public AbstractServerResponseHandler + { + public: + /** + * @param context + */ + DestroyChannelHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Destroy channel request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); }; + class DestroyChannelHandlerTransportSender : public TransportSender + { + public: + DestroyChannelHandlerTransportSender(pvAccessID cid, pvAccessID sid): _cid(cid), _sid(sid) { + + } + + void send(ByteBuffer* buffer, TransportSendControl* control) { + control->startMessage((int8)8, 2*sizeof(int32)/sizeof(int8)); + buffer->putInt(_sid); + buffer->putInt(_cid); + } + + void lock() { + // noop + } + + void unlock() { + // noop + } + + void release() { + delete this; + } + + void acquire() { + // noop + } + + private: + pvAccessID _cid; + pvAccessID _sid; + }; } }