From 1dfdaa4ec580fe2271d139cd3c75f7458e1bc915 Mon Sep 17 00:00:00 2001 From: Gasper Jansa Date: Tue, 22 Feb 2011 22:55:47 +0100 Subject: [PATCH] getHandler --- pvAccessApp/server/responseHandlers.cpp | 178 +++++++++++++++++------- pvAccessApp/server/responseHandlers.h | 19 ++- pvAccessApp/server/serverChannelImpl.h | 4 +- 3 files changed, 146 insertions(+), 55 deletions(-) diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index 7418652..c5e7941 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -55,7 +55,7 @@ namespace epics { _handlerTable[7] = new CreateChannelHandler(context); _handlerTable[8] = new DestroyChannelHandler(context); _handlerTable[9] = _badResponse; - //_handlerTable[10] = new GetHandler(context); + _handlerTable[10] = new GetHandler(context); _handlerTable[11] = _badResponse; _handlerTable[12] = _badResponse; _handlerTable[13] = _badResponse; @@ -296,6 +296,16 @@ namespace epics { _context(context) {} + ChannelFindRequesterImplObjectPool::~ChannelFindRequesterImplObjectPool() + { + for(std::vector::iterator iter = _elements.begin(); + iter != _elements.end(); iter++) + { + delete *iter; + } + _elements.erase(_elements.begin(), _elements.end()); + } + ChannelFindRequesterImpl* ChannelFindRequesterImplObjectPool::get() { Lock guard(_mutex); @@ -320,6 +330,49 @@ namespace epics { } /****************************************************************************************/ + void CreateChannelHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // TODO for not only one request at the time is supported, i.e. dataCount == 1 + + transport->ensureData((sizeof(int32)+sizeof(int16))/sizeof(int8)); + const int16 count = payloadBuffer->getShort(); + if (count != 1) + { + THROW_BASE_EXCEPTION("only 1 supported for now"); + } + const pvAccessID cid = payloadBuffer->getInt(); + + String channelName = SerializeHelper::deserializeString(payloadBuffer, transport); + if (channelName.size() == 0) + { + + char host[100]; + sockAddrToA(&transport->getRemoteAddress()->sa,host,100); + errlogSevPrintf(errlogMinor,"Zero length channel name, disconnecting client: %s", host); + disconnect(transport); + return; + } + else if (channelName.size() > UNREASONABLE_CHANNEL_NAME_LENGTH) + { + char host[100]; + sockAddrToA(&transport->getRemoteAddress()->sa,host,100); + errlogSevPrintf(errlogMinor,"Unreasonable channel name length, disconnecting client: %s", host); + disconnect(transport); + return; + } + + ChannelRequester* cr = new ChannelRequesterImpl(transport, channelName, cid); + _provider->createChannel(channelName, cr, transport->getPriority()); + } + + void CreateChannelHandler::disconnect(Transport* transport) + { + transport->close(true); + } ChannelRequesterImpl::ChannelRequesterImpl(Transport* transport, const String channelName, const pvAccessID cid) : _transport(transport), @@ -351,9 +404,8 @@ namespace epics { return name.str(); } - void ChannelRequesterImpl::message(const String message, const epics::pvData::MessageType messageType) + void ChannelRequesterImpl::message(const String message, const MessageType messageType) { - // TODO review errlogSevPrintf(errlogMinor, "[%s] %s", messageTypeName[messageType].c_str(), message.c_str()); } @@ -417,9 +469,7 @@ namespace epics { catch (std::exception& e) { errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str()); - //TODO implement BaseChannelRequester - //createChannelFailedResponse(buffer, control, - // BaseChannelRequester.statusCreate.createStatus(StatusType.FATAL, "failed to create channel", e)); + createChannelFailedResponse(buffer, control, Status(Status::STATUSTYPE_FATAL, "failed to create channel", e.what())); if (serverChannel != NULL) { serverChannel->destroy(); @@ -428,9 +478,7 @@ namespace epics { catch (...) { errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str()); - //TODO implement BaseChannelRequester - //createChannelFailedResponse(buffer, control, - // BaseChannelRequester.statusCreate.createStatus(StatusType.FATAL, "failed to create channel", e)); + createChannelFailedResponse(buffer, control, Status(Status::STATUSTYPE_FATAL, "failed to create channel")); if (serverChannel != NULL) { serverChannel->destroy(); @@ -439,6 +487,16 @@ namespace epics { } } + void ChannelRequesterImpl::release() + { + delete this; + } + + void ChannelRequesterImpl::acquire() + { + //noop + } + void ChannelRequesterImpl::createChannelFailedResponse(ByteBuffer* buffer, TransportSendControl* control, const Status& status) { control->startMessage((int8)7, 2*sizeof(int32)/sizeof(int8)); @@ -515,30 +573,30 @@ namespace epics { if (init) { // pvRequest - //PVStructurePtr pvRequest = transport->getIntrospectionRegistry()->deserializePVRequest(payloadBuffer, transport); + PVStructurePtr pvRequest = transport->getIntrospectionRegistry()->deserializePVRequest(payloadBuffer, transport); // create... - // new ChannelGetRequesterImpl(_context, channel, ioid, transport, pvRequest); + new ChannelGetRequesterImpl(_context, channel, ioid, transport, pvRequest); } - /* else + else { - final boolean lastRequest = QoS.DESTROY.isSet(qosCode); + const boolean lastRequest = (QOS_DESTROY & qosCode) != 0; - ChannelGetRequesterImpl request = (ChannelGetRequesterImpl)channel.getRequest(ioid); - if (request == null) { - BaseChannelRequester.sendFailureMessage((byte)10, transport, ioid, qosCode, BaseChannelRequester.badIOIDStatus); + ChannelGetRequesterImpl* request = static_cast(channel->getRequest(ioid)); + if (request == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)10, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); return; } - if (!request.startRequest(qosCode)) { - BaseChannelRequester.sendFailureMessage((byte)10, transport, ioid, qosCode, BaseChannelRequester.otherRequestPendingStatus); + if (!request->startRequest(qosCode)) + { + BaseChannelRequester::sendFailureMessage((int8)10, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus); return; } - request.getChannelGet().get(lastRequest); - } - */ - + request->getChannelGet()->get(lastRequest); + } } ChannelGetRequesterImpl::ChannelGetRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport, @@ -593,6 +651,13 @@ namespace epics { return _channelGet; } + String ChannelGetRequesterImpl::getRequesterName() + { + stringstream name; + name << typeid(*_transport).name(); + return name.str(); + } + void ChannelGetRequesterImpl::lock() { //TODO @@ -603,41 +668,56 @@ namespace epics { //TODO } + void ChannelGetRequesterImpl::release() + { + delete this; + } + + void ChannelGetRequesterImpl::acquire() + { + //noop + } + + void ChannelGetRequesterImpl::message(const String message, MessageType messageType) + { + errlogSevPrintf(errlogMinor, "[%s] %s", messageTypeName[messageType].c_str(), message.c_str()); + } + void ChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) { const int32 request = getPendingRequest(); - control->startMessage((int8)10, sizeof(int32)/sizeof(int8) + 1); - buffer->putInt(_ioid); - buffer->put((int8)request); - IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); - { - Lock guard(_mutex); - introspectionRegistry->serializeStatus(buffer, control, _status); - } + control->startMessage((int8)10, sizeof(int32)/sizeof(int8) + 1); + buffer->putInt(_ioid); + buffer->put((int8)request); + IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); + { + Lock guard(_mutex); + introspectionRegistry->serializeStatus(buffer, control, _status); + } - if (_status.isSuccess()) - { - if (request & QOS_INIT) - { - Lock guard(_mutex); - introspectionRegistry->serialize(_pvStructure != NULL ? _pvStructure->getField() : NULL, buffer, control); + if (_status.isSuccess()) + { + if (request & QOS_INIT) + { + Lock guard(_mutex); + introspectionRegistry->serialize(_pvStructure != NULL ? _pvStructure->getField() : NULL, buffer, control); - } - else - { - _bitSet->serialize(buffer, control); - _pvStructure->serialize(buffer, control, _bitSet); - } - } + } + else + { + _bitSet->serialize(buffer, control); + _pvStructure->serialize(buffer, control, _bitSet); + } + } - stopRequest(); + stopRequest(); - // lastRequest - if (request & QOS_DESTROY) - { - destroy(); - } + // lastRequest + if (request & QOS_DESTROY) + { + destroy(); + } } } } diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index 3c5c759..868c69c 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -160,7 +160,7 @@ namespace epics { /****************************************************************************************/ /** - * Introspection search request handler. + * Search channel request handler. */ class ChannelFindRequesterImplObjectPool; class SearchHandler : public AbstractServerResponseHandler @@ -209,6 +209,7 @@ namespace epics { { public: ChannelFindRequesterImplObjectPool(ServerContextImpl* context); + ~ChannelFindRequesterImplObjectPool(); ChannelFindRequesterImpl* get(); void put(ChannelFindRequesterImpl* element); @@ -229,15 +230,19 @@ namespace epics { */ CreateChannelHandler(ServerContextImpl* context) : AbstractServerResponseHandler(context, "Create channel request") { + _provider = context->getChannelProvider(); } - //TODO where is implementation??? virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); private: + /** + * Disconnect. + */ void disconnect(Transport* transport); + ChannelProvider* _provider; }; class ChannelRequesterImpl : public ChannelRequester, public TransportSender @@ -251,6 +256,8 @@ namespace epics { void lock(); void unlock(); void send(ByteBuffer* buffer, TransportSendControl* control); + void release(); + void acquire(); private: Transport* _transport; @@ -304,7 +311,7 @@ namespace epics { } void release() { - delete this; + delete this; } void acquire() { @@ -335,7 +342,7 @@ namespace epics { int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); }; - class ChannelGetRequesterImpl : private BaseChannelRequester, public ChannelGetRequester, public TransportSender + class ChannelGetRequesterImpl : public BaseChannelRequester, public ChannelGetRequester, public TransportSender { public: ChannelGetRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport, @@ -348,9 +355,13 @@ namespace epics { * @return the channelGet */ ChannelGet* getChannelGet(); + String getRequesterName(); void lock(); void unlock(); + void release(); + void acquire(); void send(ByteBuffer* buffer, TransportSendControl* control); + void message(const String message, const epics::pvData::MessageType messageType); private: ChannelGet* _channelGet; epics::pvData::BitSet* _bitSet; diff --git a/pvAccessApp/server/serverChannelImpl.h b/pvAccessApp/server/serverChannelImpl.h index dadbc54..8f966a8 100644 --- a/pvAccessApp/server/serverChannelImpl.h +++ b/pvAccessApp/server/serverChannelImpl.h @@ -59,7 +59,7 @@ public: * @param id request ID. * @param request request to be registered. */ - void registerRequest(pvAccessID id, Destroyable* request); + void registerRequest(pvAccessID id, epics::pvData::Destroyable* request); /** * Unregister request. @@ -72,7 +72,7 @@ public: * @param id request ID. * @return request with given ID, null if there is no request with such ID. */ - Destroyable* getRequest(pvAccessID id); + epics::pvData::Destroyable* getRequest(pvAccessID id); /** * Destroy server channel.