diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index fd2ce7b..03d6aa7 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -44,10 +44,12 @@ INC += serverContext.h INC += responseHandlers.h INC += serverChannelImpl.h INC += baseChannelRequester.h +INC += referencedTransportSender.h LIBSRCS += responseHandlers.cpp LIBSRCS += serverContext.cpp LIBSRCS += serverChannelImpl.cpp LIBSRCS += baseChannelRequester.cpp +LIBSRCS += referencedTransportSender.h SRC_DIRS += $(PVACCESS)/factory LIBSRCS += ChannelAccessFactory.cpp diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 856c353..89ba5f5 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -113,7 +113,7 @@ namespace epics { * @author Matej Sekoranja * @version $Id: TransportSender.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ */ - class TransportSender : public ReferenceCountingInstance { + class TransportSender : virtual public ReferenceCountingInstance { public: virtual ~TransportSender() { } diff --git a/pvAccessApp/server/baseChannelRequester.cpp b/pvAccessApp/server/baseChannelRequester.cpp index a602803..f549e4b 100644 --- a/pvAccessApp/server/baseChannelRequester.cpp +++ b/pvAccessApp/server/baseChannelRequester.cpp @@ -23,7 +23,8 @@ BaseChannelRequester::BaseChannelRequester(ServerContextImpl* context, ServerCha _transport(transport), _channel(channel), _context(context), - _pendingRequest(BaseChannelRequester::NULL_REQUEST) + _pendingRequest(BaseChannelRequester::NULL_REQUEST), + _refCount(1) { } @@ -35,7 +36,6 @@ boolean BaseChannelRequester::startRequest(int32 qos) { return false; } - _pendingRequest = qos; return true; } @@ -59,12 +59,12 @@ String BaseChannelRequester::getRequesterName() return name.str(); } -void BaseChannelRequester::message(const String& message, const epics::pvData::MessageType messageType) +void BaseChannelRequester::message(const String message, const epics::pvData::MessageType messageType) { BaseChannelRequester::message(_transport, _ioid, message, messageType); } -void BaseChannelRequester::message(Transport* transport, const pvAccessID ioid, const String& message, const MessageType messageType) +void BaseChannelRequester::message(Transport* transport, const pvAccessID ioid, const String message, const MessageType messageType) { transport->enqueueSendRequest( new BaseChannelRequesterMessageTransportSender(ioid, message, messageType)); } @@ -73,6 +73,23 @@ void BaseChannelRequester::sendFailureMessage(const int8 command, Transport* tra { transport->enqueueSendRequest( new BaseChannelRequesterFailureMessageTransportSender(command, transport, ioid, qos, status)); } +/* +void BaseChannelRequester::release() +{ + _mutex.lock(); + _refCount--; + _mutex.unlock(); + if (_refCount == 0) + { + delete this; + } +} + +void BaseChannelRequester::acquire() +{ + Lock guard(_mutex); + _refCount++; +}*/ BaseChannelRequesterMessageTransportSender::BaseChannelRequesterMessageTransportSender(const pvAccessID ioid, const String message,const epics::pvData::MessageType messageType): _ioid(ioid), @@ -139,7 +156,7 @@ void BaseChannelRequesterFailureMessageTransportSender::unlock() void BaseChannelRequesterFailureMessageTransportSender::release() { - delete this; + } void BaseChannelRequesterFailureMessageTransportSender::acquire() diff --git a/pvAccessApp/server/baseChannelRequester.h b/pvAccessApp/server/baseChannelRequester.h index 36eefa6..8bf5c6d 100644 --- a/pvAccessApp/server/baseChannelRequester.h +++ b/pvAccessApp/server/baseChannelRequester.h @@ -14,7 +14,7 @@ namespace epics { namespace pvAccess { -class BaseChannelRequester : public epics::pvData::Requester, public epics::pvData::Destroyable +class BaseChannelRequester : virtual public epics::pvData::Requester, public epics::pvData::Destroyable//, virtual public ReferenceCountingInstance { public: BaseChannelRequester(ServerContextImpl* context, ServerChannelImpl* channel,const pvAccessID ioid, Transport* transport); @@ -24,9 +24,11 @@ public: void stopRequest(); int32 getPendingRequest(); String getRequesterName(); - void message(const String& message, const epics::pvData::MessageType messageType); - static void message(Transport* transport, const pvAccessID ioid, const String& message, const epics::pvData::MessageType messageType); + void message(const String message, const epics::pvData::MessageType messageType); + static void message(Transport* transport, const pvAccessID ioid, const String message, const epics::pvData::MessageType messageType); static void sendFailureMessage(const int8 command, Transport* transport, const pvAccessID ioid, const int8 qos, const Status status); + //void release(); + //void acquire(); static const Status okStatus; static const Status badCIDStatus; @@ -39,11 +41,12 @@ protected: const pvAccessID _ioid; Transport* _transport; ServerChannelImpl* _channel; + epics::pvData::Mutex _mutex; private: ServerContextImpl* _context; static const int32 NULL_REQUEST; int32 _pendingRequest; - epics::pvData::Mutex _mutex; + int32 _refCount; }; class BaseChannelRequesterMessageTransportSender : public TransportSender diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index c5e7941..f61c37d 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -22,702 +22,1856 @@ using std::hex; using namespace epics::pvData; namespace epics { - namespace pvAccess { +namespace pvAccess { - void BadResponse::handleResponse(osiSockAddr* responseFrom, - Transport* transport, int8 version, int8 command, - int payloadSize, ByteBuffer* payloadBuffer) { - AbstractServerResponseHandler::handleResponse(responseFrom, - transport, version, command, payloadSize, payloadBuffer); +void ServerBadResponse::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); - char ipAddrStr[48]; - ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); + char ipAddrStr[48]; + ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); - errlogSevPrintf(errlogInfo, - "Undecipherable message (bad response type %d) from %s.", - command, ipAddrStr); + errlogSevPrintf(errlogInfo, + "Undecipherable message (bad response type %d) from %s.", + command, ipAddrStr); - } +} - ServerResponseHandler::ServerResponseHandler(ServerContextImpl* context) { +ServerResponseHandler::ServerResponseHandler(ServerContextImpl* context) { - _badResponse = new BadResponse(context); + _badResponse = new ServerBadResponse(context); - _handlerTable = new ResponseHandler*[HANDLER_TABLE_LENGTH]; - // TODO add real handlers, as they are developed - _handlerTable[0] = new NoopResponse(context, "Beacon"); - _handlerTable[1] = new ConnectionValidationHandler(context); - _handlerTable[2] = new EchoHandler(context); - _handlerTable[3] = new SearchHandler(context); - _handlerTable[4] = _badResponse; - _handlerTable[5] = new IntrospectionSearchHandler(context); - _handlerTable[6] = _badResponse; - _handlerTable[7] = new CreateChannelHandler(context); - _handlerTable[8] = new DestroyChannelHandler(context); - _handlerTable[9] = _badResponse; - _handlerTable[10] = new GetHandler(context); - _handlerTable[11] = _badResponse; - _handlerTable[12] = _badResponse; - _handlerTable[13] = _badResponse; - _handlerTable[14] = _badResponse; - _handlerTable[15] = _badResponse; - _handlerTable[16] = _badResponse; - _handlerTable[17] = _badResponse; - _handlerTable[18] = _badResponse; - _handlerTable[19] = _badResponse; - _handlerTable[20] = _badResponse; - _handlerTable[21] = _badResponse; - _handlerTable[22] = _badResponse; - _handlerTable[23] = _badResponse; - _handlerTable[24] = _badResponse; - _handlerTable[25] = _badResponse; - _handlerTable[26] = _badResponse; - _handlerTable[27] = _badResponse; - } + _handlerTable = new ResponseHandler*[HANDLER_TABLE_LENGTH]; + _handlerTable[0] = new ServerNoopResponse(context, "Beacon"); + _handlerTable[1] = new ServerConnectionValidationHandler(context); + _handlerTable[2] = new ServerEchoHandler(context); + _handlerTable[3] = new ServerSearchHandler(context); + _handlerTable[4] = _badResponse; + _handlerTable[5] = new ServerIntrospectionSearchHandler(context); + _handlerTable[6] = _badResponse; + _handlerTable[7] = new ServerCreateChannelHandler(context); + _handlerTable[8] = new ServerDestroyChannelHandler(context); + _handlerTable[9] = _badResponse; + _handlerTable[10] = new ServerGetHandler(context); + _handlerTable[11] = new ServerPutHandler(context); + _handlerTable[12] = new ServerPutGetHandler(context); + _handlerTable[13] = new ServerMonitorHandler(context); + _handlerTable[14] = new ServerArrayHandler(context); + _handlerTable[15] = new ServerCancelRequestHandler(context); + _handlerTable[16] = new ServerProcessHandler(context); + _handlerTable[17] = new ServerGetFieldHandler(context); + _handlerTable[18] = _badResponse; + _handlerTable[19] = _badResponse; + _handlerTable[20] = new ServerRPCHandler(context); + _handlerTable[21] = _badResponse; + _handlerTable[22] = _badResponse; + _handlerTable[23] = _badResponse; + _handlerTable[24] = _badResponse; + _handlerTable[25] = _badResponse; + _handlerTable[26] = _badResponse; + _handlerTable[27] = _badResponse; +} - ServerResponseHandler::~ServerResponseHandler() { - delete _badResponse; - delete _handlerTable[0]; - delete _handlerTable[1]; - delete _handlerTable[2]; - delete _handlerTable[3]; - delete _handlerTable[5]; - delete _handlerTable[7]; - delete _handlerTable[8]; - delete _handlerTable[10]; - delete _handlerTable[27]; - delete[] _handlerTable; - } +ServerResponseHandler::~ServerResponseHandler() { + delete _badResponse; + delete _handlerTable[0]; + delete _handlerTable[1]; + delete _handlerTable[2]; + delete _handlerTable[3]; + delete _handlerTable[5]; + delete _handlerTable[7]; + delete _handlerTable[8]; + delete _handlerTable[10]; + delete _handlerTable[11]; + delete _handlerTable[12]; + delete _handlerTable[13]; + delete _handlerTable[14]; + delete _handlerTable[15]; + delete _handlerTable[16]; + delete _handlerTable[17]; + delete _handlerTable[20]; + delete[] _handlerTable; +} - void ServerResponseHandler::handleResponse(osiSockAddr* responseFrom, - Transport* transport, int8 version, int8 command, - int payloadSize, ByteBuffer* payloadBuffer) { - if(command<0||command>=HANDLER_TABLE_LENGTH) { - errlogSevPrintf(errlogMinor, - "Invalid (or unsupported) command: %x.", (0xFF&command)); - // TODO remove debug output - ostringstream name; - name<<"Invalid CA header "<=HANDLER_TABLE_LENGTH) { + errlogSevPrintf(errlogMinor, + "Invalid (or unsupported) command: %x.", (0xFF&command)); + // TODO remove debug output + ostringstream name; + name<<"Invalid CA header "<getArray(), - payloadBuffer->getPosition(), payloadSize); - return; - } + hexDump(name.str(), (const int8*)payloadBuffer->getArray(), + payloadBuffer->getPosition(), payloadSize); + return; + } - // delegate - _handlerTable[command]->handleResponse(responseFrom, transport, - version, command, payloadSize, payloadBuffer); - } + // delegate + _handlerTable[command]->handleResponse(responseFrom, transport, + version, command, payloadSize, payloadBuffer); +} - void ConnectionValidationHandler::handleResponse( - osiSockAddr* responseFrom, Transport* transport, int8 version, - int8 command, int payloadSize, - epics::pvData::ByteBuffer* payloadBuffer) { - AbstractServerResponseHandler::handleResponse(responseFrom, - transport, version, command, payloadSize, payloadBuffer); +void ServerConnectionValidationHandler::handleResponse( + osiSockAddr* responseFrom, Transport* transport, int8 version, + int8 command, int payloadSize, + ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); - transport->ensureData(2*sizeof(int32)+sizeof(int16)); - transport->setRemoteTransportReceiveBufferSize( - payloadBuffer->getInt()); - transport->setRemoteTransportSocketReceiveBufferSize( - payloadBuffer->getInt()); - transport->setRemoteMinorRevision(version); - // TODO support priority !!! - //transport.setPriority(payloadBuffer.getShort()); - } + transport->ensureData(2*sizeof(int32)+sizeof(int16)); + transport->setRemoteTransportReceiveBufferSize( + payloadBuffer->getInt()); + transport->setRemoteTransportSocketReceiveBufferSize( + payloadBuffer->getInt()); + transport->setRemoteMinorRevision(version); + // TODO support priority !!! + //transport.setPriority(payloadBuffer.getShort()); +} - class EchoTransportSender : public TransportSender { - public: - EchoTransportSender(osiSockAddr* echoFrom) { - memcpy(&_echoFrom, echoFrom, sizeof(osiSockAddr)); - } +void ServerEchoHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); - virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - control->startMessage(CMD_ECHO, 0); - control->setRecipient(_echoFrom); - } + EchoTransportSender* echoReply = new EchoTransportSender( + responseFrom); - virtual void lock() { - } + // send back + transport->enqueueSendRequest(echoReply); +} - virtual void unlock() { - } - - virtual void acquire() { - } - - virtual void release() { - delete this; - } - - private: - osiSockAddr _echoFrom; +void ServerIntrospectionSearchHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); - virtual ~EchoTransportSender() { - } - }; + THROW_BASE_EXCEPTION("not implemented"); +} - void EchoHandler::handleResponse(osiSockAddr* responseFrom, - Transport* transport, int8 version, int8 command, - int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - AbstractServerResponseHandler::handleResponse(responseFrom, - transport, version, command, payloadSize, payloadBuffer); +/****************************************************************************************/ - EchoTransportSender* echoReply = new EchoTransportSender( - responseFrom); +ServerSearchHandler::ServerSearchHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Introspection search request") + { + _provider = context->getChannelProvider(); + _objectPool = new ServerChannelFindRequesterImplObjectPool(context); + } - // send back - transport->enqueueSendRequest(echoReply); - } +ServerSearchHandler::~ServerSearchHandler() +{ + if(_objectPool) delete _objectPool; +} +void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); - void IntrospectionSearchHandler::handleResponse(osiSockAddr* responseFrom, - Transport* transport, int8 version, int8 command, - int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - AbstractServerResponseHandler::handleResponse(responseFrom, - transport, version, command, payloadSize, payloadBuffer); + transport->ensureData((sizeof(int32)+sizeof(int16))/sizeof(int8)+1); + const int32 searchSequenceId = payloadBuffer->getInt(); + const int8 qosCode = payloadBuffer->getByte(); + const int32 count = payloadBuffer->getShort() & 0xFFFF; + const boolean responseRequired = (QOS_REPLY_REQUIRED & qosCode) != 0; - THROW_BASE_EXCEPTION("not implemented"); - } + for (int32 i = 0; i < count; i++) + { + transport->ensureData(sizeof(int32)/sizeof(int8)); + const int32 cid = payloadBuffer->getInt(); + const String name = SerializeHelper::deserializeString(payloadBuffer, transport); + // no name check here... - /****************************************************************************************/ + _provider->channelFind(name, _objectPool->get()->set(searchSequenceId, cid, responseFrom, responseRequired)); + } +} - SearchHandler::SearchHandler(ServerContextImpl* context) : - AbstractServerResponseHandler(context, "Introspection search request") - { - _provider = context->getChannelProvider(); - _objectPool = new ChannelFindRequesterImplObjectPool(context); - } +ServerChannelFindRequesterImpl::ServerChannelFindRequesterImpl(ServerContextImpl* context, ServerChannelFindRequesterImplObjectPool* objectPool) : + _sendTo(NULL), + _context(context), + _objectPool(objectPool) + {} - SearchHandler::~SearchHandler() - { - if(_objectPool) delete _objectPool; - } +void ServerChannelFindRequesterImpl::clear() +{ + Lock guard(_mutex); + _sendTo = NULL; +} - void SearchHandler::handleResponse(osiSockAddr* responseFrom, - Transport* transport, int8 version, int8 command, - int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - AbstractServerResponseHandler::handleResponse(responseFrom, - transport, version, command, payloadSize, payloadBuffer); +ServerChannelFindRequesterImpl* ServerChannelFindRequesterImpl::set(int32 searchSequenceId, int32 cid, osiSockAddr* sendTo, boolean responseRequired) +{ + Lock guard(_mutex); + _searchSequenceId = searchSequenceId; + _cid = cid; + _sendTo = sendTo; + _responseRequired = responseRequired; + return this; +} - transport->ensureData((sizeof(int32)+sizeof(int16))/sizeof(int8)+1); - const int32 searchSequenceId = payloadBuffer->getInt(); - const int8 qosCode = payloadBuffer->getByte(); - const int32 count = payloadBuffer->getShort() & 0xFFFF; - const boolean responseRequired = (QOS_REPLY_REQUIRED & qosCode) != 0; +void ServerChannelFindRequesterImpl::channelFindResult(const Status& status, ChannelFind* channelFind, boolean wasFound) +{ + // TODO status + Lock guard(_mutex); + if (wasFound || _responseRequired) + { + _wasFound = wasFound; + _context->getBroadcastTransport()->enqueueSendRequest(this); + } +} - for (int32 i = 0; i < count; i++) - { - transport->ensureData(sizeof(int32)/sizeof(int8)); - const int32 cid = payloadBuffer->getInt(); - const String name = SerializeHelper::deserializeString(payloadBuffer, transport); - // no name check here... +void ServerChannelFindRequesterImpl::lock() +{ + // noop +} - _provider->channelFind(name, _objectPool->get()->set(searchSequenceId, cid, responseFrom, responseRequired)); - } - } +void ServerChannelFindRequesterImpl::unlock() +{ + // noop +} - ChannelFindRequesterImpl::ChannelFindRequesterImpl(ServerContextImpl* context, ChannelFindRequesterImplObjectPool* objectPool) : - _sendTo(NULL), - _context(context), - _objectPool(objectPool) - {} +void ServerChannelFindRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) +{ + int32 count = 1; + control->startMessage((int8)4, (sizeof(int32)+sizeof(int8)+128+2*sizeof(int16)+count*sizeof(int32))/sizeof(8)); - void ChannelFindRequesterImpl::clear() - { - Lock guard(_mutex); - _sendTo = NULL; - } + Lock guard(_mutex); + buffer->putInt(_searchSequenceId); + buffer->putByte(_wasFound ? (int8)1 : (int8)0); - ChannelFindRequesterImpl* ChannelFindRequesterImpl::set(int32 searchSequenceId, int32 cid, osiSockAddr* sendTo, boolean responseRequired) + // NOTE: is it possible (very likely) that address is any local address ::ffff:0.0.0.0 + encodeAsIPv6Address(buffer, _context->getServerInetAddress()); + buffer->putShort((int16)_context->getServerPort()); + buffer->putShort((int16)count); + buffer->putInt(_cid); + + control->setRecipient(*_sendTo); + + // return this object to the pool + _objectPool->put(this); +} + +ServerChannelFindRequesterImplObjectPool::ServerChannelFindRequesterImplObjectPool(ServerContextImpl* context) : + _context(context) +{} + +ServerChannelFindRequesterImplObjectPool::~ServerChannelFindRequesterImplObjectPool() +{ + for(std::vector::iterator iter = _elements.begin(); + iter != _elements.end(); iter++) + { + delete *iter; + } + _elements.erase(_elements.begin(), _elements.end()); +} + +ServerChannelFindRequesterImpl* ServerChannelFindRequesterImplObjectPool::get() +{ + Lock guard(_mutex); + const int32 count = _elements.size(); + if (count == 0) + { + return new ServerChannelFindRequesterImpl(_context, this); + } + else + { + ServerChannelFindRequesterImpl* channelFindRequesterImpl = _elements.back(); + _elements.pop_back(); + return channelFindRequesterImpl; + } +} + +void ServerChannelFindRequesterImplObjectPool::put(ServerChannelFindRequesterImpl* element) +{ + Lock guard(_mutex); + element->clear(); + _elements.push_back(element); +} + +/****************************************************************************************/ +void ServerCreateChannelHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // TODO for not only one request at the time is supported, i.e. dataCount == 1 + transport->ensureData((sizeof(int32)+sizeof(int16))/sizeof(int8)); + const int16 count = payloadBuffer->getShort(); + if (count != 1) + { + THROW_BASE_EXCEPTION("only 1 supported for now"); + } + const pvAccessID cid = payloadBuffer->getInt(); + + String channelName = SerializeHelper::deserializeString(payloadBuffer, transport); + if (channelName.size() == 0) + { + + char host[100]; + sockAddrToA(&transport->getRemoteAddress()->sa,host,100); + errlogSevPrintf(errlogMinor,"Zero length channel name, disconnecting client: %s", host); + disconnect(transport); + return; + } + else if (channelName.size() > UNREASONABLE_CHANNEL_NAME_LENGTH) + { + char host[100]; + sockAddrToA(&transport->getRemoteAddress()->sa,host,100); + errlogSevPrintf(errlogMinor,"Unreasonable channel name length, disconnecting client: %s", host); + disconnect(transport); + return; + } + + ChannelRequester* cr = new ServerChannelRequesterImpl(transport, channelName, cid); + _provider->createChannel(channelName, cr, transport->getPriority()); +} + +void ServerCreateChannelHandler::disconnect(Transport* transport) +{ + transport->close(true); +} + +ServerChannelRequesterImpl::ServerChannelRequesterImpl(Transport* transport, const String channelName, const pvAccessID cid) : + _transport(transport), + _channelName(channelName), + _cid(cid), + _status(), + _channel(NULL) +{ + +} + +void ServerChannelRequesterImpl::channelCreated(const Status& status, Channel* channel) +{ + Lock guard(_mutex); + _status = status; + _channel = channel; + _transport->enqueueSendRequest(this); +} + +void ServerChannelRequesterImpl::channelStateChange(Channel* c, const Channel::ConnectionState isConnected) +{ + //noop +} + +String ServerChannelRequesterImpl::getRequesterName() +{ + stringstream name; + name << typeid(*_transport).name() << "/" << _cid; + return name.str(); +} + +void ServerChannelRequesterImpl::message(const String message, const MessageType messageType) +{ + errlogSevPrintf(errlogMinor, "[%s] %s", messageTypeName[messageType].c_str(), message.c_str()); +} + +void ServerChannelRequesterImpl::lock() +{ + //noop +} + +void ServerChannelRequesterImpl::unlock() +{ + //noop +} + +void ServerChannelRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) +{ + Channel* channel; + Status status; + { + Lock guard(_mutex); + channel = _channel; + status = _status; + } + + // error response + if (channel == NULL) + { + createChannelFailedResponse(buffer, control, status); + } + // OK + else + { + ServerChannelImpl* serverChannel = NULL; + try { - Lock guard(_mutex); - _searchSequenceId = searchSequenceId; - _cid = cid; - _sendTo = sendTo; - _responseRequired = responseRequired; - return this; - } + // NOTE: we do not explicitly check if transport OK + ChannelHostingTransport* casTransport = dynamic_cast(_transport); - void ChannelFindRequesterImpl::channelFindResult(const epics::pvData::Status& status, ChannelFind* channelFind, boolean wasFound) - { - // TODO status - Lock guard(_mutex); - if (wasFound || _responseRequired) - { - _wasFound = wasFound; - _context->getBroadcastTransport()->enqueueSendRequest(this); - } - } - - void ChannelFindRequesterImpl::lock() - { - // noop - } - - void ChannelFindRequesterImpl::unlock() - { - // noop - } - - void ChannelFindRequesterImpl::acquire() - { - // noop - } - - void ChannelFindRequesterImpl::release() - { - // noop - } - - void ChannelFindRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) - { - int32 count = 1; - control->startMessage((int8)4, (sizeof(int32)+sizeof(int8)+128+2*sizeof(int16)+count*sizeof(int32))/sizeof(8)); - - Lock guard(_mutex); - buffer->putInt(_searchSequenceId); - buffer->putByte(_wasFound ? (int8)1 : (int8)0); - - // NOTE: is it possible (very likely) that address is any local address ::ffff:0.0.0.0 - encodeAsIPv6Address(buffer, _context->getServerInetAddress()); - buffer->putShort((int16)_context->getServerPort()); - buffer->putShort((int16)count); - buffer->putInt(_cid); - - control->setRecipient(*_sendTo); - - // return this object to the pool - _objectPool->put(this); - } - - ChannelFindRequesterImplObjectPool::ChannelFindRequesterImplObjectPool(ServerContextImpl* context) : - _context(context) - {} - - ChannelFindRequesterImplObjectPool::~ChannelFindRequesterImplObjectPool() - { - for(std::vector::iterator iter = _elements.begin(); - iter != _elements.end(); iter++) - { - delete *iter; - } - _elements.erase(_elements.begin(), _elements.end()); - } - - ChannelFindRequesterImpl* ChannelFindRequesterImplObjectPool::get() - { - Lock guard(_mutex); - const int32 count = _elements.size(); - if (count == 0) - { - return new ChannelFindRequesterImpl(_context, this); - } - else - { - ChannelFindRequesterImpl* channelFindRequesterImpl = _elements.back(); - _elements.pop_back(); - return channelFindRequesterImpl; - } - } - - void ChannelFindRequesterImplObjectPool::put(ChannelFindRequesterImpl* element) - { - Lock guard(_mutex); - element->clear(); - _elements.push_back(element); - } - - /****************************************************************************************/ - void CreateChannelHandler::handleResponse(osiSockAddr* responseFrom, - Transport* transport, int8 version, int8 command, - int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - AbstractServerResponseHandler::handleResponse(responseFrom, - transport, version, command, payloadSize, payloadBuffer); - - // TODO for not only one request at the time is supported, i.e. dataCount == 1 - - transport->ensureData((sizeof(int32)+sizeof(int16))/sizeof(int8)); - const int16 count = payloadBuffer->getShort(); - if (count != 1) - { - THROW_BASE_EXCEPTION("only 1 supported for now"); - } - const pvAccessID cid = payloadBuffer->getInt(); - - String channelName = SerializeHelper::deserializeString(payloadBuffer, transport); - if (channelName.size() == 0) - { - - char host[100]; - sockAddrToA(&transport->getRemoteAddress()->sa,host,100); - errlogSevPrintf(errlogMinor,"Zero length channel name, disconnecting client: %s", host); - disconnect(transport); - return; - } - else if (channelName.size() > UNREASONABLE_CHANNEL_NAME_LENGTH) - { - char host[100]; - sockAddrToA(&transport->getRemoteAddress()->sa,host,100); - errlogSevPrintf(errlogMinor,"Unreasonable channel name length, disconnecting client: %s", host); - disconnect(transport); - return; - } - - ChannelRequester* cr = new ChannelRequesterImpl(transport, channelName, cid); - _provider->createChannel(channelName, cr, transport->getPriority()); - } - - void CreateChannelHandler::disconnect(Transport* transport) - { - transport->close(true); - } - - ChannelRequesterImpl::ChannelRequesterImpl(Transport* transport, const String channelName, const pvAccessID cid) : - _transport(transport), - _channelName(channelName), - _cid(cid), - _status(), - _channel(NULL) - { - - } - - void ChannelRequesterImpl::channelCreated(const Status& status, Channel* channel) - { - Lock guard(_mutex); - _status = status; - _channel = channel; - _transport->enqueueSendRequest(this); - } - - void ChannelRequesterImpl::channelStateChange(Channel* c, const Channel::ConnectionState isConnected) - { - //noop - } - - String ChannelRequesterImpl::getRequesterName() - { - stringstream name; - name << typeid(*_transport).name() << "/" << _cid; - return name.str(); - } - - void ChannelRequesterImpl::message(const String message, const MessageType messageType) - { - errlogSevPrintf(errlogMinor, "[%s] %s", messageTypeName[messageType].c_str(), message.c_str()); - } - - void ChannelRequesterImpl::lock() - { - //noop - } - - void ChannelRequesterImpl::unlock() - { - //noop - } - - void ChannelRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) - { - Channel* channel; - Status status; + // + // create a new channel instance + // + pvAccessID sid = casTransport->preallocateChannelSID(); + try { - Lock guard(_mutex); - channel = _channel; - status = _status; + serverChannel = new ServerChannelImpl(channel, _cid, sid, casTransport->getSecurityToken()); + + // ack allocation and register + casTransport->registerChannel(sid, serverChannel); + + } catch (...) + { + // depreallocate and rethrow + casTransport->depreallocateChannelSID(sid); + throw; } - // error response - if (channel == NULL) - { - createChannelFailedResponse(buffer, control, status); - } - // OK - else - { - ServerChannelImpl* serverChannel = NULL; - try - { - // NOTE: we do not explicitly check if transport OK - ChannelHostingTransport* casTransport = dynamic_cast(_transport); - - // - // create a new channel instance - // - pvAccessID sid = casTransport->preallocateChannelSID(); - try - { - serverChannel = new ServerChannelImpl(channel, _cid, sid, casTransport->getSecurityToken()); - - // ack allocation and register - casTransport->registerChannel(sid, serverChannel); - - } catch (...) - { - // depreallocate and rethrow - casTransport->depreallocateChannelSID(sid); - throw; - } - - control->startMessage((int8)7, 2*sizeof(int32)/sizeof(int8)); - buffer->putInt(_cid); - buffer->putInt(sid); - _transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status); - } - catch (std::exception& e) - { - errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str()); - createChannelFailedResponse(buffer, control, Status(Status::STATUSTYPE_FATAL, "failed to create channel", e.what())); - if (serverChannel != NULL) - { - serverChannel->destroy(); - } - } - catch (...) - { - errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str()); - createChannelFailedResponse(buffer, control, Status(Status::STATUSTYPE_FATAL, "failed to create channel")); - if (serverChannel != NULL) - { - serverChannel->destroy(); - } - } - } - } - - void ChannelRequesterImpl::release() - { - delete this; - } - - void ChannelRequesterImpl::acquire() - { - //noop - } - - void ChannelRequesterImpl::createChannelFailedResponse(ByteBuffer* buffer, TransportSendControl* control, const Status& status) - { control->startMessage((int8)7, 2*sizeof(int32)/sizeof(int8)); buffer->putInt(_cid); - buffer->putInt(-1); + buffer->putInt(sid); _transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status); - } - - /****************************************************************************************/ - - void DestroyChannelHandler::handleResponse(osiSockAddr* responseFrom, - Transport* transport, int8 version, int8 command, - int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - AbstractServerResponseHandler::handleResponse(responseFrom, - transport, version, command, payloadSize, payloadBuffer); - - // NOTE: we do not explicitly check if transport OK - ChannelHostingTransport* casTransport = dynamic_cast(transport); - - - transport->ensureData(2*sizeof(int32)/sizeof(int8)); - const pvAccessID sid = payloadBuffer->getInt(); - const pvAccessID cid = payloadBuffer->getInt(); - - // get channel by SID - ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); - if (channel == NULL) - { - if (!transport->isClosed()) - { - char host[100]; - sockAddrToA(&responseFrom->sa,host,100); - errlogSevPrintf(errlogMinor, "Trying to destroy a channel that no longer exists (SID: %d, CID %d, client: %s).", sid, cid, host); - } - return; - } - - // destroy - channel->destroy(); - - // .. and unregister - casTransport->unregisterChannel(sid); - - // send response back - transport->enqueueSendRequest(new DestroyChannelHandlerTransportSender(cid, sid)); - } - - /****************************************************************************************/ - - void GetHandler::handleResponse(osiSockAddr* responseFrom, - Transport* transport, int8 version, int8 command, - int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - AbstractServerResponseHandler::handleResponse(responseFrom, - transport, version, command, payloadSize, payloadBuffer); - - // NOTE: we do not explicitly check if transport is OK - ChannelHostingTransport* casTransport = dynamic_cast(transport); - - transport->ensureData(2*sizeof(int32)/sizeof(int8)+1); - const pvAccessID sid = payloadBuffer->getInt(); - const pvAccessID ioid = payloadBuffer->getInt(); - - // mode - const int8 qosCode = payloadBuffer->getByte(); - - ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); - if (channel == NULL) - { - BaseChannelRequester::sendFailureMessage((int8)10, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus); - return; - } - - const boolean init = (QOS_INIT & qosCode) != 0; - if (init) - { - // pvRequest - PVStructurePtr pvRequest = transport->getIntrospectionRegistry()->deserializePVRequest(payloadBuffer, transport); - - // create... - new ChannelGetRequesterImpl(_context, channel, ioid, transport, pvRequest); - } - else - { - const boolean lastRequest = (QOS_DESTROY & qosCode) != 0; - - ChannelGetRequesterImpl* request = static_cast(channel->getRequest(ioid)); - if (request == NULL) - { - BaseChannelRequester::sendFailureMessage((int8)10, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); - return; - } - - if (!request->startRequest(qosCode)) - { - BaseChannelRequester::sendFailureMessage((int8)10, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus); - return; - } - - request->getChannelGet()->get(lastRequest); - } - } - - ChannelGetRequesterImpl::ChannelGetRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport, - epics::pvData::PVStructurePtr pvRequest) : - BaseChannelRequester(context, channel, ioid, transport) - { - startRequest(QOS_INIT); - channel->registerRequest(ioid, this); - _channelGet = channel->getChannel()->createChannelGet(this, pvRequest); - // TODO what if last call fails... registration is still present - } - - void ChannelGetRequesterImpl::channelGetConnect(const epics::pvData::Status& status, ChannelGet* channelGet, epics::pvData::PVStructurePtr pvStructure, - epics::pvData::BitSet* bitSet) - { - { - Lock guard(_mutex); - _bitSet = bitSet; - _pvStructure = pvStructure; - _status = status; - _channelGet = channelGet; - } - _transport->enqueueSendRequest(this); - - // self-destruction - if (!status.isSuccess()) + } + catch (std::exception& e) + { + errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str()); + createChannelFailedResponse(buffer, control, Status(Status::STATUSTYPE_FATAL, "failed to create channel", e.what())); + if (serverChannel != NULL) { - destroy(); + serverChannel->destroy(); } - } - - void ChannelGetRequesterImpl::getDone(const epics::pvData::Status& status) - { - { - Lock guard(_mutex); - _status = status; - } - _transport->enqueueSendRequest(this); - } - - void ChannelGetRequesterImpl::destroy() - { - _channel->unregisterRequest(_ioid); - if (_channelGet != NULL) - { - _channelGet->destroy(); - } - } - - ChannelGet* ChannelGetRequesterImpl::getChannelGet() - { - return _channelGet; - } - - String ChannelGetRequesterImpl::getRequesterName() - { - stringstream name; - name << typeid(*_transport).name(); - return name.str(); - } - - void ChannelGetRequesterImpl::lock() - { - //TODO - } - - void ChannelGetRequesterImpl::unlock() - { - //TODO - } - - void ChannelGetRequesterImpl::release() - { - delete this; - } - - void ChannelGetRequesterImpl::acquire() - { - //noop - } - - void ChannelGetRequesterImpl::message(const String message, MessageType messageType) - { - errlogSevPrintf(errlogMinor, "[%s] %s", messageTypeName[messageType].c_str(), message.c_str()); - } - - void ChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) - { - const int32 request = getPendingRequest(); - - control->startMessage((int8)10, sizeof(int32)/sizeof(int8) + 1); - buffer->putInt(_ioid); - buffer->put((int8)request); - IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); - { - Lock guard(_mutex); - introspectionRegistry->serializeStatus(buffer, control, _status); - } - - if (_status.isSuccess()) - { - if (request & QOS_INIT) - { - Lock guard(_mutex); - introspectionRegistry->serialize(_pvStructure != NULL ? _pvStructure->getField() : NULL, buffer, control); - - } - else - { - _bitSet->serialize(buffer, control); - _pvStructure->serialize(buffer, control, _bitSet); - } - } - - stopRequest(); - - // lastRequest - if (request & QOS_DESTROY) - { - destroy(); - } - } - } + } + catch (...) + { + errlogSevPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName.c_str()); + createChannelFailedResponse(buffer, control, Status(Status::STATUSTYPE_FATAL, "failed to create channel")); + if (serverChannel != NULL) + { + serverChannel->destroy(); + } + } + } +} + + +void ServerChannelRequesterImpl::createChannelFailedResponse(ByteBuffer* buffer, TransportSendControl* control, const Status& status) +{ + control->startMessage((int8)7, 2*sizeof(int32)/sizeof(int8)); + buffer->putInt(_cid); + buffer->putInt(-1); + _transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status); +} + +/****************************************************************************************/ + +void ServerDestroyChannelHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // NOTE: we do not explicitly check if transport OK + ChannelHostingTransport* casTransport = dynamic_cast(transport); + + + transport->ensureData(2*sizeof(int32)/sizeof(int8)); + const pvAccessID sid = payloadBuffer->getInt(); + const pvAccessID cid = payloadBuffer->getInt(); + + // get channel by SID + ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); + if (channel == NULL) + { + if (!transport->isClosed()) + { + char host[100]; + sockAddrToA(&responseFrom->sa,host,100); + errlogSevPrintf(errlogMinor, "Trying to destroy a channel that no longer exists (SID: %d, CID %d, client: %s).", sid, cid, host); + } + return; + } + + // destroy + channel->destroy(); + + // .. and unregister + casTransport->unregisterChannel(sid); + + // send response back + transport->enqueueSendRequest(new ServerDestroyChannelHandlerTransportSender(cid, sid)); +} + +/****************************************************************************************/ + +void ServerGetHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // NOTE: we do not explicitly check if transport is OK + ChannelHostingTransport* casTransport = dynamic_cast(transport); + + transport->ensureData(2*sizeof(int32)/sizeof(int8)+1); + const pvAccessID sid = payloadBuffer->getInt(); + const pvAccessID ioid = payloadBuffer->getInt(); + + // mode + const int8 qosCode = payloadBuffer->getByte(); + + ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); + if (channel == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)10, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus); + return; + } + + const boolean init = (QOS_INIT & qosCode) != 0; + if (init) + { + // pvRequest + PVStructurePtr pvRequest = transport->getIntrospectionRegistry()->deserializePVRequest(payloadBuffer, transport); + + // create... + new ServerChannelGetRequesterImpl(_context, channel, ioid, transport, pvRequest); + } + else + { + const boolean lastRequest = (QOS_DESTROY & qosCode) != 0; + + ServerChannelGetRequesterImpl* request = static_cast(channel->getRequest(ioid)); + if (request == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)10, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); + return; + } + + if (!request->startRequest(qosCode)) + { + BaseChannelRequester::sendFailureMessage((int8)10, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus); + return; + } + + request->getChannelGet()->get(lastRequest); + } +} + +ServerChannelGetRequesterImpl::ServerChannelGetRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport, + PVStructurePtr pvRequest) : + BaseChannelRequester(context, channel, ioid, transport) +{ + startRequest(QOS_INIT); + channel->registerRequest(ioid, this); + _channelGet = channel->getChannel()->createChannelGet(this, pvRequest); + // TODO what if last call fails... registration is still present +} + +void ServerChannelGetRequesterImpl::channelGetConnect(const Status& status, ChannelGet* channelGet, PVStructurePtr pvStructure, + BitSet* bitSet) +{ + { + Lock guard(_mutex); + _bitSet = bitSet; + _pvStructure = pvStructure; + _status = status; + _channelGet = channelGet; + } + _transport->enqueueSendRequest(this); + + // self-destruction + if (!status.isSuccess()) + { + destroy(); + } +} + +void ServerChannelGetRequesterImpl::getDone(const Status& status) +{ + { + Lock guard(_mutex); + _status = status; + } + _transport->enqueueSendRequest(this); +} + +void ServerChannelGetRequesterImpl::destroy() +{ + { + _channel->unregisterRequest(_ioid); + if (_channelGet != NULL) + { + _channelGet->destroy(); + } + } + release(); +} + +ChannelGet* ServerChannelGetRequesterImpl::getChannelGet() +{ + return _channelGet; +} + +void ServerChannelGetRequesterImpl::lock() +{ + //noop +} + +void ServerChannelGetRequesterImpl::unlock() +{ + //noop +} + +void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) +{ + const int32 request = getPendingRequest(); + + control->startMessage((int8)10, sizeof(int32)/sizeof(int8) + 1); + buffer->putInt(_ioid); + buffer->put((int8)request); + IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); + { + Lock guard(_mutex); + introspectionRegistry->serializeStatus(buffer, control, _status); + } + + if (_status.isSuccess()) + { + if (request & QOS_INIT) + { + Lock guard(_mutex); + introspectionRegistry->serialize(_pvStructure != NULL ? _pvStructure->getField() : NULL, buffer, control); + + } + else + { + _bitSet->serialize(buffer, control); + _pvStructure->serialize(buffer, control, _bitSet); + } + } + + stopRequest(); + + // lastRequest + if (request & QOS_DESTROY) + { + destroy(); + } +} +/****************************************************************************************/ +void ServerPutHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + + // NOTE: we do not explicitly check if transport is OK + ChannelHostingTransport* casTransport = dynamic_cast(transport); + + transport->ensureData(2*sizeof(int32)/sizeof(int8)+1); + const pvAccessID sid = payloadBuffer->getInt(); + const pvAccessID ioid = payloadBuffer->getInt(); + + // mode + const int8 qosCode = payloadBuffer->getByte(); + + ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); + if (channel == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)11, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus); + return; + } + + const boolean init = (QOS_INIT & qosCode) != 0; + if (init) + { + // pvRequest + PVStructure* pvRequest = transport->getIntrospectionRegistry()->deserializePVRequest(payloadBuffer, transport); + + // create... + new ServerChannelPutRequesterImpl(_context, channel, ioid, transport, pvRequest); + } + else + { + const boolean lastRequest = (QOS_DESTROY & qosCode) != 0; + const boolean get = (QOS_GET & qosCode) != 0; + + ServerChannelPutRequesterImpl* request = static_cast(channel->getRequest(ioid)); + if (request == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)11, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); + return; + } + + if (!request->startRequest(qosCode)) + { + BaseChannelRequester::sendFailureMessage((int8)11, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus); + return; + } + + if (get) + { + // no destroy w/ get + request->getChannelPut()->get(); + } + else + { + // deserialize bitSet and do a put + BitSet* putBitSet = request->getBitSet(); + putBitSet->deserialize(payloadBuffer, transport); + request->getPVStructure()->deserialize(payloadBuffer, transport, putBitSet); + request->getChannelPut()->put(lastRequest); + } + } +} + +ServerChannelPutRequesterImpl::ServerChannelPutRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, + const pvAccessID ioid, Transport* transport,PVStructure* pvRequest): + BaseChannelRequester(context, channel, ioid, transport) +{ + startRequest(QOS_INIT); + channel->registerRequest(ioid, this); + _channelPut = channel->getChannel()->createChannelPut(this, pvRequest); + // TODO what if last call fails... registration is still present +} + +void ServerChannelPutRequesterImpl::channelPutConnect(const Status& status, ChannelPut* channelPut, PVStructure* pvStructure, BitSet* bitSet) +{ + { + Lock guard(_mutex); + _bitSet = bitSet; + _pvStructure = pvStructure; + _status = status; + _channelPut = channelPut; + } + + _transport->enqueueSendRequest(this); + + // self-destruction + if (!status.isSuccess()) + { + destroy(); + } +} + +void ServerChannelPutRequesterImpl::putDone(const Status& status) +{ + { + Lock guard(_mutex); + _status = status; + } + _transport->enqueueSendRequest(this); +} + +void ServerChannelPutRequesterImpl::getDone(const Status& status) +{ + { + Lock guard(_mutex); + _status = status; + } + _transport->enqueueSendRequest(this); +} + +void ServerChannelPutRequesterImpl::lock() +{ + //noop +} + +void ServerChannelPutRequesterImpl::unlock() +{ + //noop +} + +void ServerChannelPutRequesterImpl::destroy() +{ + { + Lock guard(_mutex); + _channel->unregisterRequest(_ioid); + if (_channelPut != NULL) + { + _channelPut->destroy(); + } + } + release(); +} + +ChannelPut* ServerChannelPutRequesterImpl::getChannelPut() +{ + Lock guard(_mutex); + return _channelPut; +} + +BitSet* ServerChannelPutRequesterImpl::getBitSet() +{ + Lock guard(_mutex); + return _bitSet; +} + +PVStructure* ServerChannelPutRequesterImpl::getPVStructure() +{ + Lock guard(_mutex); + return _pvStructure; +} + +void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) +{ + const int32 request = getPendingRequest(); + + control->startMessage((int32)11, sizeof(int32)/sizeof(int8) + 1); + buffer->putInt(_ioid); + buffer->putByte((int8)request); + IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); + { + Lock guard(_mutex); + introspectionRegistry->serializeStatus(buffer, control, _status); + } + + if (_status.isSuccess()) + { + if ((QOS_INIT & request) != 0) + { + Lock guard(_mutex); + introspectionRegistry->serialize(_pvStructure != NULL ? _pvStructure->getField() : NULL, buffer, control); + } + else if ((QOS_GET & request) != 0) + { + Lock guard(_mutex); + _pvStructure->serialize(buffer, control); + } + } + + stopRequest(); + + // lastRequest + if ((QOS_DESTROY & request) != 0) + destroy(); +} + + +/****************************************************************************************/ +void ServerPutGetHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // NOTE: we do not explicitly check if transport is OK + ChannelHostingTransport* casTransport = dynamic_cast(transport); + + transport->ensureData(2*sizeof(int32)/sizeof(int8)+1); + const pvAccessID sid = payloadBuffer->getInt(); + const pvAccessID ioid = payloadBuffer->getInt(); + + // mode + const int8 qosCode = payloadBuffer->getByte(); + + ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); + if (channel == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)12, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus); + return; + } + + const boolean init = (QOS_INIT & qosCode) != 0; + if (init) + { + // pvRequest + PVStructure* pvRequest = transport->getIntrospectionRegistry()->deserializePVRequest(payloadBuffer, transport); + + // create... + new ServerChannelPutGetRequesterImpl(_context, channel, ioid, transport, pvRequest); + } + else + { + const boolean lastRequest = (QOS_DESTROY & qosCode) != 0; + const boolean getGet = (QOS_GET & qosCode) != 0; + const boolean getPut = (QOS_GET_PUT & qosCode) != 0; + + ServerChannelPutGetRequesterImpl* request = static_cast(channel->getRequest(ioid)); + if (request == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)12, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); + return; + } + + if (!request->startRequest(qosCode)) + { + BaseChannelRequester::sendFailureMessage((int8)12, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus); + return; + } + + if (getGet) + { + request->getChannelPutGet()->getGet(); + } + else if(getPut) + { + request->getChannelPutGet()->getPut(); + } + else + { + // deserialize bitSet and do a put + request->getPVPutStructure()->deserialize(payloadBuffer, transport); + request->getChannelPutGet()->putGet(lastRequest); + } + } +} + +ServerChannelPutGetRequesterImpl::ServerChannelPutGetRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, + const pvAccessID ioid, Transport* transport,PVStructure* pvRequest): + BaseChannelRequester(context, channel, ioid, transport) +{ + startRequest(QOS_INIT); + channel->registerRequest(ioid, this); + _channelPutGet = channel->getChannel()->createChannelPutGet(this, pvRequest); + // TODO what if last call fails... registration is still present +} + +void ServerChannelPutGetRequesterImpl::channelPutGetConnect(const Status& status, ChannelPutGet* channelPutGet, + PVStructure* pvPutStructure, PVStructure* pvGetStructure) +{ + { + Lock guard(_mutex); + _pvPutStructure = pvPutStructure; + _pvGetStructure = pvGetStructure; + _status = status; + _channelPutGet = channelPutGet; + } + + _transport->enqueueSendRequest(this); + + // self-destruction + if (!status.isSuccess()) + { + destroy(); + } +} + +void ServerChannelPutGetRequesterImpl::getGetDone(const Status& status) +{ + { + Lock guard(_mutex); + _status = status; + } + _transport->enqueueSendRequest(this); +} + +void ServerChannelPutGetRequesterImpl::getPutDone(const Status& status) +{ + { + Lock guard(_mutex); + _status = status; + } + _transport->enqueueSendRequest(this); +} + +void ServerChannelPutGetRequesterImpl::putGetDone(const Status& status) +{ + { + Lock guard(_mutex); + _status = status; + } + _transport->enqueueSendRequest(this); +} + +void ServerChannelPutGetRequesterImpl::lock() +{ + //noop +} + +void ServerChannelPutGetRequesterImpl::unlock() +{ + //noop +} + +void ServerChannelPutGetRequesterImpl::destroy() +{ + { + Lock guard(_mutex); + _channel->unregisterRequest(_ioid); + if (_channelPutGet != NULL) + { + _channelPutGet->destroy(); + } + } + release(); +} + +ChannelPutGet* ServerChannelPutGetRequesterImpl::getChannelPutGet() +{ + Lock guard(_mutex); + return _channelPutGet; +} + +PVStructure* ServerChannelPutGetRequesterImpl::getPVPutStructure() +{ + Lock guard(_mutex); + return _pvPutStructure; +} + +void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) +{ + const int32 request = getPendingRequest(); + + control->startMessage((int32)12, sizeof(int32)/sizeof(int8) + 1); + buffer->putInt(_ioid); + buffer->putByte((int8)request); + IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); + { + Lock guard(_mutex); + introspectionRegistry->serializeStatus(buffer, control, _status); + } + + if (_status.isSuccess()) + { + if ((QOS_INIT & request) != 0) + { + Lock guard(_mutex); + introspectionRegistry->serialize(_pvPutStructure != NULL ? _pvPutStructure->getField() : NULL, buffer, control); + introspectionRegistry->serialize(_pvGetStructure != NULL ? _pvGetStructure->getField() : NULL, buffer, control); + } + else if ((QOS_GET & request) != 0) + { + Lock guard(_mutex); + _pvGetStructure->serialize(buffer, control); + } + else if ((QOS_GET_PUT & request) != 0) + { + Lock guard(_mutex); + _pvPutStructure->serialize(buffer, control); + } + else + { + Lock guard(_mutex); + _pvGetStructure->serialize(buffer, control); + } + } + + stopRequest(); + + // lastRequest + if ((QOS_DESTROY & request) != 0) + destroy(); +} + +/****************************************************************************************/ +void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // NOTE: we do not explicitly check if transport is OK + ChannelHostingTransport* casTransport = dynamic_cast(transport); + + transport->ensureData(2*sizeof(int32)/sizeof(int8)+1); + const pvAccessID sid = payloadBuffer->getInt(); + const pvAccessID ioid = payloadBuffer->getInt(); + + // mode + const int8 qosCode = payloadBuffer->getByte(); + + ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); + if (channel == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)12, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus); + return; + } + + const boolean init = (QOS_INIT & qosCode) != 0; + if (init) + { + // pvRequest + PVStructure* pvRequest = transport->getIntrospectionRegistry()->deserializePVRequest(payloadBuffer, transport); + + // create... + new ServerMonitorRequesterImpl(_context, channel, ioid, transport, pvRequest); + } + else + { + const boolean lastRequest = (QOS_DESTROY & qosCode) != 0; + const boolean get = (QOS_GET & qosCode) != 0; + const boolean process = (QOS_PROCESS & qosCode) != 0; + + ServerMonitorRequesterImpl* request = static_cast(channel->getRequest(ioid)); + if (request == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)13, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); + return; + } + + if (!request->startRequest(qosCode)) + { + BaseChannelRequester::sendFailureMessage((int8)13, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus); + return; + } + + + if (process) + { + if (get) + request->getChannelMonitor()->start(); + else + request->getChannelMonitor()->stop(); + //request.stopRequest(); + } + else if (get) + { + // not supported + } + + if (lastRequest) + request->destroy(); + } +} + +ServerMonitorRequesterImpl::ServerMonitorRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, + const pvAccessID ioid, Transport* transport,PVStructure* pvRequest): + BaseChannelRequester(context, channel, ioid, transport) +{ + startRequest(QOS_INIT); + channel->registerRequest(ioid, this); + _channelMonitor = channel->getChannel()->createMonitor(this, pvRequest); + // TODO what if last call fails... registration is still present +} + +void ServerMonitorRequesterImpl::monitorConnect(const Status& status, Monitor* monitor, Structure* structure) +{ + { + Lock guard(_mutex); + _status = status; + _monitor = monitor; + _structure = structure; + _monitor = monitor; + } + _transport->enqueueSendRequest(this); + + // self-destruction + if (!status.isSuccess()) + { + destroy(); + } +} + +void ServerMonitorRequesterImpl::unlisten(Monitor* monitor) +{ + //TODO +} + +void ServerMonitorRequesterImpl::monitorEvent(Monitor* monitor) +{ + // TODO !!! if queueSize==0, monitor.poll() has to be called and returned NOW (since there is no cache) + //sendEvent(transport); + + // TODO implement via TransportSender + /* + // initiate submit to dispatcher queue, if necessary + synchronized (register) { + if (register.getAndSet(true)) + eventConsumer.consumeEvents(this); + }*/ + // TODO + // multiple ((BlockingServerTCPTransport)transport).enqueueMonitorSendRequest(this); + _transport->enqueueSendRequest(this); +} + +void ServerMonitorRequesterImpl::lock() +{ + //noop +} + +void ServerMonitorRequesterImpl::unlock() +{ + //noop +} + +void ServerMonitorRequesterImpl::destroy() +{ + Lock guard(_mutex); + _channel->unregisterRequest(_ioid); + if (_channelMonitor != NULL) + { + _channelMonitor->destroy(); + } + release(); +} + +Monitor* ServerMonitorRequesterImpl::getChannelMonitor() +{ + Lock guard(_mutex); + return _channelMonitor; +} + +void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) +{ + const int32 request = getPendingRequest(); + + if ((QOS_INIT & request) != 0) + { + control->startMessage((int32)13, sizeof(int32)/sizeof(int8) + 1); + buffer->putInt(_ioid); + buffer->putByte((int8)request); + + IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); + { + Lock guard(_mutex); + introspectionRegistry->serializeStatus(buffer, control, _status); + } + + if (_status.isSuccess()) + { + introspectionRegistry->serialize(_structure, buffer, control); + } + stopRequest(); + startRequest(QOS_DEFAULT); + } + else + { + Monitor* monitor = _monitor; + MonitorElement* element = monitor->poll(); + if (element != NULL) + { + control->startMessage((int8)13, sizeof(int32)/sizeof(int8) + 1); + buffer->putInt(_ioid); + buffer->putByte((int8)request); + + // changedBitSet and data, if not notify only (i.e. queueSize == -1) + BitSet* changedBitSet = element->getChangedBitSet(); + if (changedBitSet != NULL) + { + changedBitSet->serialize(buffer, control); + element->getPVStructure()->serialize(buffer, control, changedBitSet); + + // overrunBitset + element->getOverrunBitSet()->serialize(buffer, control); + } + + monitor->release(element); + } + } +} + +/****************************************************************************************/ +void ServerArrayHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // NOTE: we do not explicitly check if transport is OK + ChannelHostingTransport* casTransport = dynamic_cast(transport); + + transport->ensureData(2*sizeof(int32)/sizeof(int8)+1); + const pvAccessID sid = payloadBuffer->getInt(); + const pvAccessID ioid = payloadBuffer->getInt(); + + // mode + const int8 qosCode = payloadBuffer->getByte(); + + ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); + if (channel == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)12, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus); + return; + } + + const boolean init = (QOS_INIT & qosCode) != 0; + if (init) + { + // pvRequest + PVStructure* pvRequest = transport->getIntrospectionRegistry()->deserializePVRequest(payloadBuffer, transport); + + // create... + new ServerChannelArrayRequesterImpl(_context, channel, ioid, transport, pvRequest); + } + else + { + const boolean lastRequest = (QOS_DESTROY & qosCode) != 0; + const boolean get = (QOS_GET & qosCode) != 0; + const boolean setLength = (QOS_GET_PUT & qosCode) != 0; + + ServerChannelArrayRequesterImpl* request = static_cast(channel->getRequest(ioid)); + if (request == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)14, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); + return; + } + + if (!request->startRequest(qosCode)) + { + BaseChannelRequester::sendFailureMessage((int8)14, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus); + return; + } + + + if (get) + { + const int32 offset = SerializeHelper::readSize(payloadBuffer, transport); + const int32 count = SerializeHelper::readSize(payloadBuffer, transport); + request->getChannelArray()->getArray(lastRequest, offset, count); + } + else if (setLength) + { + const int32 length = SerializeHelper::readSize(payloadBuffer, transport); + const int32 capacity = SerializeHelper::readSize(payloadBuffer, transport); + request->getChannelArray()->setLength(lastRequest, length, capacity); + } + else + { + // deserialize data to put + const int32 offset = SerializeHelper::readSize(payloadBuffer, transport); + PVArray* array = request->getPVArray(); + array->deserialize(payloadBuffer, transport); + request->getChannelArray()->putArray(lastRequest, offset, array->getLength()); + } + } +} + +ServerChannelArrayRequesterImpl::ServerChannelArrayRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, + const pvAccessID ioid, Transport* transport,PVStructure* pvRequest): + BaseChannelRequester(context, channel, ioid, transport) +{ + + startRequest(QOS_INIT); + channel->registerRequest(ioid, this); + _channelArray = channel->getChannel()->createChannelArray(this, pvRequest); + // TODO what if last call fails... registration is still present +} + +void ServerChannelArrayRequesterImpl::channelArrayConnect(const Status& status, ChannelArray* channelArray, PVArray* pvArray) +{ + { + Lock guard(_mutex); + _status = status; + _pvArray = pvArray; + _channelArray = channelArray; + } + + _transport->enqueueSendRequest(this); + + // self-destruction + if (!status.isSuccess()) + { + destroy(); + } +} + +void ServerChannelArrayRequesterImpl::getArrayDone(const Status& status) +{ + { + Lock guard(_mutex); + _status = status; + } + _transport->enqueueSendRequest(this); +} + +void ServerChannelArrayRequesterImpl::putArrayDone(const Status& status) +{ + { + Lock guard(_mutex); + _status = status; + } + _transport->enqueueSendRequest(this); +} + +void ServerChannelArrayRequesterImpl::setLengthDone(const Status& status) +{ + { + Lock guard(_mutex); + _status = status; + } + _transport->enqueueSendRequest(this); +} + +void ServerChannelArrayRequesterImpl::lock() +{ + //noop +} + +void ServerChannelArrayRequesterImpl::unlock() +{ + //noop +} + +void ServerChannelArrayRequesterImpl::destroy() +{ + { + Lock guard(_mutex); + _channel->unregisterRequest(_ioid); + if (_channelArray != NULL) + { + _channelArray->destroy(); + } + } + release(); +} + +ChannelArray* ServerChannelArrayRequesterImpl::getChannelArray() +{ + Lock guard(_mutex); + return _channelArray; +} + +PVArray* ServerChannelArrayRequesterImpl::getPVArray() +{ + Lock guard(_mutex); + return _pvArray; +} + +void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) +{ + const int32 request = getPendingRequest(); + + control->startMessage((int32)14, sizeof(int32)/sizeof(int8) + 1); + buffer->putInt(_ioid); + buffer->putByte((int8)request); + IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); + { + Lock guard(_mutex); + introspectionRegistry->serializeStatus(buffer, control, _status); + } + + if (_status.isSuccess()) + { + if ((QOS_GET & request) != 0) + { + Lock guard(_mutex); + _pvArray->serialize(buffer, control, 0, _pvArray->getLength()); + } + else if ((QOS_INIT & request) != 0) + { + Lock guard(_mutex); + introspectionRegistry->serialize(_pvArray != NULL ? _pvArray->getField() : NULL, buffer, control); + } + } + + stopRequest(); + + // lastRequest + if ((QOS_DESTROY & request) != 0) + destroy(); +} + +/****************************************************************************************/ +void ServerCancelRequestHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // NOTE: we do not explicitly check if transport is OK + ChannelHostingTransport* casTransport = dynamic_cast(transport); + + transport->ensureData(2*sizeof(int32)/sizeof(int8)+1); + const pvAccessID sid = payloadBuffer->getInt(); + const pvAccessID ioid = payloadBuffer->getInt(); + + ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); + if (channel == NULL) + { + failureResponse(transport, ioid, BaseChannelRequester::badCIDStatus); + return; + } + + Destroyable* request = channel->getRequest(ioid); + if (request == NULL) + { + failureResponse(transport, ioid, BaseChannelRequester::badIOIDStatus); + return; + } + + // destroy + request->destroy(); + + // ... and remove from channel + channel->unregisterRequest(ioid); +} + +void ServerCancelRequestHandler::failureResponse(Transport* transport, pvAccessID ioid, const Status& errorStatus) +{ + BaseChannelRequester::message(transport, ioid, errorStatus.getMessage(), warningMessage); +} + +/****************************************************************************************/ +void ServerProcessHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // NOTE: we do not explicitly check if transport is OK + ChannelHostingTransport* casTransport = dynamic_cast(transport); + + transport->ensureData(2*sizeof(int32)/sizeof(int8)+1); + const pvAccessID sid = payloadBuffer->getInt(); + const pvAccessID ioid = payloadBuffer->getInt(); + + // mode + const int8 qosCode = payloadBuffer->getByte(); + + ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); + if (channel == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)16, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus); + return; + } + + const boolean init = (QOS_INIT & qosCode) != 0; + if (init) + { + // pvRequest + PVStructure* pvRequest = transport->getIntrospectionRegistry()->deserializePVRequest(payloadBuffer, transport); + + // create... + new ServerChannelProcessRequesterImpl(_context, channel, ioid, transport, pvRequest); + } + else + { + const boolean lastRequest = (QOS_DESTROY & qosCode) != 0; + + ServerChannelProcessRequesterImpl* request = static_cast(channel->getRequest(ioid)); + if (request == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)16, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); + return; + } + + if (!request->startRequest(qosCode)) + { + BaseChannelRequester::sendFailureMessage((int8)16, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus); + return; + } + + request->getChannelProcess()->process(lastRequest); + } +} + +ServerChannelProcessRequesterImpl::ServerChannelProcessRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, + const pvAccessID ioid, Transport* transport,PVStructure* pvRequest): BaseChannelRequester(context, channel, ioid, transport), + _refCount(1) +{ + startRequest(QOS_INIT); + channel->registerRequest(ioid, this); + _channelProcess = channel->getChannel()->createChannelProcess(this, pvRequest); + // TODO what if last call fails... registration is still present +} + +void ServerChannelProcessRequesterImpl::channelProcessConnect(const Status& status, ChannelProcess* channelProcess) +{ + { + Lock guard(_mutex); + _status = status; + _channelProcess = channelProcess; + } + _transport->enqueueSendRequest(this); + + // self-destruction + if (!status.isSuccess()) + { + destroy(); + } +} + +void ServerChannelProcessRequesterImpl::processDone(const Status& status) +{ + { + Lock guard(_mutex); + _status = status; + } + _transport->enqueueSendRequest(this); +} + +void ServerChannelProcessRequesterImpl::lock() +{ + //noop +} + +void ServerChannelProcessRequesterImpl::unlock() +{ + //noop +} + +void ServerChannelProcessRequesterImpl::destroy() +{ + { + Lock guard(_mutex); + _channel->unregisterRequest(_ioid); + if (_channelProcess != NULL) + { + _channelProcess->destroy(); + } + } + release(); +} + +ChannelProcess* ServerChannelProcessRequesterImpl::getChannelProcess() +{ + Lock guard(_mutex); + return _channelProcess; +} + +void ServerChannelProcessRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) +{ + const int32 request = getPendingRequest(); + + control->startMessage((int32)16, sizeof(int32)/sizeof(int8) + 1); + buffer->putInt(_ioid); + buffer->putByte((int8)request); + IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); + { + Lock guard(_mutex); + introspectionRegistry->serializeStatus(buffer, control, _status); + } + + stopRequest(); + + // lastRequest + if ((QOS_DESTROY & request) != 0) + { + destroy(); + } +} + + +/****************************************************************************************/ +void ServerGetFieldHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // NOTE: we do not explicitly check if transport is OK + ChannelHostingTransport* casTransport = dynamic_cast(transport); + + transport->ensureData(2*sizeof(int32)/sizeof(int8)); + const pvAccessID sid = payloadBuffer->getInt(); + const pvAccessID ioid = payloadBuffer->getInt(); + + ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); + if (channel == NULL) + { + getFieldFailureResponse(transport, ioid, BaseChannelRequester::badCIDStatus); + return; + } + + String subField = SerializeHelper::deserializeString(payloadBuffer, transport); + + // issue request + channel->getChannel()->getField(new ServerGetFieldRequesterImpl(_context, channel, ioid, transport), subField); +} + +void ServerGetFieldHandler::getFieldFailureResponse(Transport* transport, const pvAccessID ioid, const Status& errorStatus) +{ + transport->enqueueSendRequest(new ServerGetFieldHandlerTransportSender(ioid,errorStatus,transport)); +} + +ServerGetFieldRequesterImpl::ServerGetFieldRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport) : + BaseChannelRequester(context, channel, ioid, transport) +{ +} + +void ServerGetFieldRequesterImpl::getDone(const Status& status, FieldConstPtr field) +{ + { + Lock guard(_mutex); + _status = status; + _field = field; + } + _transport->enqueueSendRequest(this); +} + +void ServerGetFieldRequesterImpl::lock() +{ + //noop +} + +void ServerGetFieldRequesterImpl::unlock() +{ + //noop +} + +void ServerGetFieldRequesterImpl::destroy() +{ + release(); +} + +void ServerGetFieldRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) +{ + control->startMessage((int8)17, sizeof(int32)/sizeof(int8)); + buffer->putInt(_ioid); + IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); + { + Lock guard(_mutex); + introspectionRegistry->serializeStatus(buffer, control, _status); + introspectionRegistry->serialize(_field, buffer, control); + } +} + +/****************************************************************************************/ +void ServerRPCHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + // NOTE: we do not explicitly check if transport is OK + ChannelHostingTransport* casTransport = dynamic_cast(transport); + + transport->ensureData(2*sizeof(int32)/sizeof(int8)+1); + const pvAccessID sid = payloadBuffer->getInt(); + const pvAccessID ioid = payloadBuffer->getInt(); + + // mode + const int8 qosCode = payloadBuffer->getByte(); + + ServerChannelImpl* channel = static_cast(casTransport->getChannel(sid)); + if (channel == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)16, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus); + return; + } + + const boolean init = (QOS_INIT & qosCode) != 0; + if (init) + { + // pvRequest + PVStructure* pvRequest = transport->getIntrospectionRegistry()->deserializePVRequest(payloadBuffer, transport); + + // create... + new ServerChannelRPCRequesterImpl(_context, channel, ioid, transport, pvRequest); + } + else + { + const boolean lastRequest = (QOS_DESTROY & qosCode) != 0; + + ServerChannelRPCRequesterImpl* request = static_cast(channel->getRequest(ioid)); + if (request == NULL) + { + BaseChannelRequester::sendFailureMessage((int8)20, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); + return; + } + + if (!request->startRequest(qosCode)) + { + BaseChannelRequester::sendFailureMessage((int8)20, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus); + return; + } + + // deserialize put data + BitSet* changedBitSet = request->getAgrumentsBitSet(); + changedBitSet->deserialize(payloadBuffer, transport); + request->getPvArguments()->deserialize(payloadBuffer, transport, changedBitSet); + request->getChannelRPC()->request(lastRequest); + } +} + +ServerChannelRPCRequesterImpl::ServerChannelRPCRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, + const pvAccessID ioid, Transport* transport,PVStructure* pvRequest): +BaseChannelRequester(context, channel, ioid, transport) +{ + startRequest(QOS_INIT); + channel->registerRequest(ioid, this); + _channelRPC = channel->getChannel()->createChannelRPC(this, pvRequest); + +} + +void ServerChannelRPCRequesterImpl::channelRPCConnect(const Status& status, ChannelRPC* channelRPC, PVStructure* arguments, BitSet* bitSet) +{ + { + Lock guard(_mutex); + _pvArguments = arguments; + _argumentsBitSet = bitSet; + _status = status; + } + _transport->enqueueSendRequest(this); + + // self-destruction + if (!status.isSuccess()) + { + destroy(); + } +} + +void ServerChannelRPCRequesterImpl::requestDone(const Status& status, PVStructure* pvResponse) +{ + { + Lock guard(_mutex); + _status = status; + _pvResponse = pvResponse; + } + _transport->enqueueSendRequest(this); +} + +void ServerChannelRPCRequesterImpl::lock() +{ + //noop +} + +void ServerChannelRPCRequesterImpl::unlock() +{ + //noop +} + +void ServerChannelRPCRequesterImpl::destroy() +{ + { + Lock guard(_mutex); + _channel->unregisterRequest(_ioid); + if (_channelRPC != NULL) + { + _channelRPC->destroy(); + } + } + release(); +} + +ChannelRPC* ServerChannelRPCRequesterImpl::getChannelRPC() +{ + Lock guard(_mutex); + return _channelRPC; +} + +PVStructure* ServerChannelRPCRequesterImpl::getPvArguments() +{ + Lock guard(_mutex); + return _pvArguments; +} + +BitSet* ServerChannelRPCRequesterImpl::getAgrumentsBitSet() +{ + Lock guard(_mutex); + return _argumentsBitSet; +} + +void ServerChannelRPCRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) +{ + const int32 request = getPendingRequest(); + + control->startMessage((int32)20, sizeof(int32)/sizeof(int8) + 1); + buffer->putInt(_ioid); + buffer->putByte((int8)request); + IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); + { + Lock guard(_mutex); + introspectionRegistry->serializeStatus(buffer, control, _status); + } + + if (_status.isSuccess()) + { + if ((QOS_INIT & request) != 0) + { + Lock guard(_mutex); + introspectionRegistry->serialize(_pvArguments != NULL ? _pvArguments->getField() : NULL, buffer, control); + } + else + { + introspectionRegistry->serializeStructure(buffer, control, _pvResponse); + } + } + + stopRequest(); + + // lastRequest + if ((QOS_DESTROY & request) != 0) + destroy(); +} + +} } diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index 868c69c..e72ff7c 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -12,6 +12,7 @@ #include "remote.h" #include "serverChannelImpl.h" #include "baseChannelRequester.h" +#include "referencedTransportSender.h" namespace epics { namespace pvAccess { @@ -41,16 +42,16 @@ namespace epics { * @author Matej Sekoranja * @version $Id: BadResponse.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ */ - class BadResponse : public AbstractServerResponseHandler { + class ServerBadResponse : public AbstractServerResponseHandler { public: /** * @param context */ - BadResponse(ServerContextImpl* context) : + ServerBadResponse(ServerContextImpl* context) : AbstractServerResponseHandler(context, "Bad request") { } - virtual ~BadResponse() { + virtual ~ServerBadResponse() { } virtual void handleResponse(osiSockAddr* responseFrom, @@ -77,7 +78,7 @@ namespace epics { /** * Bad response handlers. */ - BadResponse *_badResponse; + ServerBadResponse *_badResponse; /** * Table of response handlers for each command ID. */ @@ -90,12 +91,12 @@ namespace epics { * @author Matej Sekoranja * @version $Id: ConnectionValidationHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ */ - class ConnectionValidationHandler : public AbstractServerResponseHandler { + class ServerConnectionValidationHandler : public AbstractServerResponseHandler { public: /** * @param context */ - ConnectionValidationHandler(ServerContextImpl* context) : + ServerConnectionValidationHandler(ServerContextImpl* context) : AbstractServerResponseHandler(context, "Connection validation") { } @@ -109,13 +110,13 @@ namespace epics { * @author Matej Sekoranja * @version $Id: NoopResponse.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ */ - class NoopResponse : public AbstractServerResponseHandler { + class ServerNoopResponse : public AbstractServerResponseHandler { public: /** * @param context * @param description */ - NoopResponse(ServerContextImpl* context, String description) : + ServerNoopResponse(ServerContextImpl* context, String description) : AbstractServerResponseHandler(context, description) { } }; @@ -125,12 +126,12 @@ namespace epics { * @author Matej Sekoranja * @version $Id: EchoHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ */ - class EchoHandler : public AbstractServerResponseHandler { + class ServerEchoHandler : public AbstractServerResponseHandler { public: /** * @param context */ - EchoHandler(ServerContextImpl* context) : + ServerEchoHandler(ServerContextImpl* context) : AbstractServerResponseHandler(context, "Echo request") { } @@ -139,17 +140,40 @@ namespace epics { int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); }; + class EchoTransportSender : public ReferencedTransportSender { + public: + EchoTransportSender(osiSockAddr* echoFrom) { + memcpy(&_echoFrom, echoFrom, sizeof(osiSockAddr)); + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + control->startMessage(CMD_ECHO, 0); + control->setRecipient(_echoFrom); + } + + virtual void lock() { + } + + virtual void unlock() { + } + + private: + osiSockAddr _echoFrom; + + virtual ~EchoTransportSender() { + } + }; /** * Introspection search request handler. */ - class IntrospectionSearchHandler : public AbstractServerResponseHandler + class ServerIntrospectionSearchHandler : public AbstractServerResponseHandler { public: /** * @param context */ - IntrospectionSearchHandler(ServerContextImpl* context) : + ServerIntrospectionSearchHandler(ServerContextImpl* context) : AbstractServerResponseHandler(context, "Search request") { } @@ -162,15 +186,15 @@ namespace epics { /** * Search channel request handler. */ - class ChannelFindRequesterImplObjectPool; - class SearchHandler : public AbstractServerResponseHandler + class ServerChannelFindRequesterImplObjectPool; + class ServerSearchHandler : public AbstractServerResponseHandler { public: /** * @param context */ - SearchHandler(ServerContextImpl* context); - ~SearchHandler(); + ServerSearchHandler(ServerContextImpl* context); + ~ServerSearchHandler(); virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, @@ -178,57 +202,55 @@ namespace epics { private: ChannelProvider* _provider; - ChannelFindRequesterImplObjectPool* _objectPool; + ServerChannelFindRequesterImplObjectPool* _objectPool; }; - class ChannelFindRequesterImpl: public ChannelFindRequester, public TransportSender + class ServerChannelFindRequesterImpl: public ChannelFindRequester, public ReferencedTransportSender { public: - ChannelFindRequesterImpl(ServerContextImpl* context, ChannelFindRequesterImplObjectPool* objectPool); + ServerChannelFindRequesterImpl(ServerContextImpl* context, ServerChannelFindRequesterImplObjectPool* objectPool); void clear(); - ChannelFindRequesterImpl* set(int32 searchSequenceId, int32 cid, osiSockAddr* sendTo, boolean responseRequired); + ServerChannelFindRequesterImpl* set(int32 searchSequenceId, int32 cid, osiSockAddr* sendTo, boolean responseRequired); void channelFindResult(const epics::pvData::Status& status, ChannelFind* channelFind, boolean wasFound); void lock(); void unlock(); - void acquire(); - void release(); - void send(ByteBuffer* buffer, TransportSendControl* control); + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); private: int32 _searchSequenceId; int32 _cid; osiSockAddr* _sendTo; boolean _responseRequired; boolean _wasFound; - epics::pvData::Mutex _mutex; ServerContextImpl* _context; - ChannelFindRequesterImplObjectPool* _objectPool; + epics::pvData::Mutex _mutex; + ServerChannelFindRequesterImplObjectPool* _objectPool; }; - class ChannelFindRequesterImplObjectPool + class ServerChannelFindRequesterImplObjectPool { public: - ChannelFindRequesterImplObjectPool(ServerContextImpl* context); - ~ChannelFindRequesterImplObjectPool(); - ChannelFindRequesterImpl* get(); - void put(ChannelFindRequesterImpl* element); + ServerChannelFindRequesterImplObjectPool(ServerContextImpl* context); + ~ServerChannelFindRequesterImplObjectPool(); + ServerChannelFindRequesterImpl* get(); + void put(ServerChannelFindRequesterImpl* element); private: - std::vector _elements; - epics::pvData::Mutex _mutex; + std::vector _elements; ServerContextImpl* _context; + epics::pvData::Mutex _mutex; }; /****************************************************************************************/ /** * Create channel request handler. */ - class CreateChannelHandler : public AbstractServerResponseHandler + class ServerCreateChannelHandler : public AbstractServerResponseHandler { public: /** * @param context */ - CreateChannelHandler(ServerContextImpl* context) : + ServerCreateChannelHandler(ServerContextImpl* context) : AbstractServerResponseHandler(context, "Create channel request") { _provider = context->getChannelProvider(); } @@ -245,41 +267,38 @@ namespace epics { ChannelProvider* _provider; }; - class ChannelRequesterImpl : public ChannelRequester, public TransportSender + class ServerChannelRequesterImpl : public ChannelRequester, public ReferencedTransportSender { public: - ChannelRequesterImpl(Transport* transport, const String channelName, const pvAccessID cid); - void channelCreated(const Status& status, Channel* channel); + ServerChannelRequesterImpl(Transport* transport, const String channelName, const pvAccessID cid); + void channelCreated(const epics::pvData::Status& status, Channel* channel); void channelStateChange(Channel* c, const Channel::ConnectionState isConnected); String getRequesterName(); void message(const String message, const epics::pvData::MessageType messageType); void lock(); void unlock(); - void send(ByteBuffer* buffer, TransportSendControl* control); - void release(); - void acquire(); - + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); private: Transport* _transport; const String _channelName; const pvAccessID _cid; - Status _status; + epics::pvData::Status _status; Channel* _channel; epics::pvData::Mutex _mutex; - void createChannelFailedResponse(ByteBuffer* buffer, TransportSendControl* control, const Status& status); + void createChannelFailedResponse(epics::pvData::ByteBuffer* buffer, TransportSendControl* control, const epics::pvData::Status& status); }; /****************************************************************************************/ /** * Destroy channel request handler. */ - class DestroyChannelHandler : public AbstractServerResponseHandler + class ServerDestroyChannelHandler : public AbstractServerResponseHandler { public: /** * @param context */ - DestroyChannelHandler(ServerContextImpl* context) : + ServerDestroyChannelHandler(ServerContextImpl* context) : AbstractServerResponseHandler(context, "Destroy channel request") { } @@ -289,14 +308,14 @@ namespace epics { }; - class DestroyChannelHandlerTransportSender : public TransportSender + class ServerDestroyChannelHandlerTransportSender : public ReferencedTransportSender { public: - DestroyChannelHandlerTransportSender(pvAccessID cid, pvAccessID sid): _cid(cid), _sid(sid) { + ServerDestroyChannelHandlerTransportSender(pvAccessID cid, pvAccessID sid): _cid(cid), _sid(sid) { } - void send(ByteBuffer* buffer, TransportSendControl* control) { + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) { control->startMessage((int8)8, 2*sizeof(int32)/sizeof(int8)); buffer->putInt(_sid); buffer->putInt(_cid); @@ -310,14 +329,6 @@ namespace epics { // noop } - void release() { - delete this; - } - - void acquire() { - // noop - } - private: pvAccessID _cid; pvAccessID _sid; @@ -327,13 +338,13 @@ namespace epics { /** * Get request handler. */ - class GetHandler : public AbstractServerResponseHandler + class ServerGetHandler : public AbstractServerResponseHandler { public: /** * @param context */ - GetHandler(ServerContextImpl* context) : + ServerGetHandler(ServerContextImpl* context) : AbstractServerResponseHandler(context, "Get request") { } @@ -342,10 +353,10 @@ namespace epics { int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); }; - class ChannelGetRequesterImpl : public BaseChannelRequester, public ChannelGetRequester, public TransportSender + class ServerChannelGetRequesterImpl : public BaseChannelRequester, public ChannelGetRequester, public ReferencedTransportSender { public: - ChannelGetRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport, + ServerChannelGetRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport, epics::pvData::PVStructurePtr pvRequest); void channelGetConnect(const epics::pvData::Status& status, ChannelGet* channelGet, epics::pvData::PVStructurePtr pvStructure, epics::pvData::BitSet* bitSet); @@ -355,19 +366,379 @@ namespace epics { * @return the channelGet */ ChannelGet* getChannelGet(); - String getRequesterName(); void lock(); void unlock(); - void release(); - void acquire(); - void send(ByteBuffer* buffer, TransportSendControl* control); - void message(const String message, const epics::pvData::MessageType messageType); + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); private: ChannelGet* _channelGet; epics::pvData::BitSet* _bitSet; epics::pvData::PVStructurePtr _pvStructure; epics::pvData::Status _status; - epics::pvData::Mutex _mutex; + }; + + + /****************************************************************************************/ + /** + * Put request handler. + */ + class ServerPutHandler : public AbstractServerResponseHandler + { + public: + /** + * @param context + */ + ServerPutHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Put request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + }; + + class ServerChannelPutRequesterImpl : public BaseChannelRequester, public ChannelPutRequester, public ReferencedTransportSender + { + public: + ServerChannelPutRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport,epics::pvData::PVStructure* pvRequest); + void channelPutConnect(const epics::pvData::Status& status, ChannelPut* channelPut, epics::pvData::PVStructure* pvStructure, epics::pvData::BitSet* bitSet); + void putDone(const epics::pvData::Status& status); + void getDone(const epics::pvData::Status& status); + void lock(); + void unlock(); + void destroy(); + /** + * @return the channelPut + */ + ChannelPut* getChannelPut(); + /** + * @return the bitSet + */ + BitSet* getBitSet(); + /** + * @return the pvStructure + */ + PVStructure* getPVStructure(); + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); + private: + ChannelPut* _channelPut; + epics::pvData::BitSet* _bitSet; + epics::pvData::PVStructure* _pvStructure; + epics::pvData::Status _status; + }; + + /****************************************************************************************/ + /** + * Put request handler. + */ + class ServerPutGetHandler : public AbstractServerResponseHandler + { + public: + /** + * @param context + */ + ServerPutGetHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Put-get request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + }; + + class ServerChannelPutGetRequesterImpl : public BaseChannelRequester, public ChannelPutGetRequester, public ReferencedTransportSender + { + public: + ServerChannelPutGetRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport,epics::pvData::PVStructure* pvRequest); + void channelPutGetConnect(const epics::pvData::Status& status, ChannelPutGet* channelPutGet, epics::pvData::PVStructure* pvPutStructure, epics::pvData::PVStructure* pvGetStructure); + void getGetDone(const epics::pvData::Status& status); + void getPutDone(const epics::pvData::Status& status); + void putGetDone(const epics::pvData::Status& status); + void lock(); + void unlock(); + void destroy(); + /** + * @return the channelPutGet + */ + ChannelPutGet* getChannelPutGet(); + /** + * @return the pvPutStructure + */ + PVStructure* getPVPutStructure(); + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); + private: + ChannelPutGet* _channelPutGet; + epics::pvData::PVStructure* _pvPutStructure; + epics::pvData::PVStructure* _pvGetStructure; + epics::pvData::Status _status; + }; + + + /****************************************************************************************/ + /** + * Monitor request handler. + */ + class ServerMonitorHandler : public AbstractServerResponseHandler + { + public: + /** + * @param context + */ + ServerMonitorHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Monitor request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + }; + + + class ServerMonitorRequesterImpl : public BaseChannelRequester, public MonitorRequester, public ReferencedTransportSender + { + public: + ServerMonitorRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport,epics::pvData::PVStructure* pvRequest); + void monitorConnect(const epics::pvData::Status& status, epics::pvData::Monitor* monitor, epics::pvData::Structure* structure); + void unlisten(epics::pvData::Monitor* monitor); + void monitorEvent(epics::pvData::Monitor* monitor); + void lock(); + void unlock(); + void destroy(); + /** + * @return the channelMonitor + */ + epics::pvData::Monitor* getChannelMonitor(); + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); + private: + epics::pvData::Monitor* _monitor; + epics::pvData::Monitor* _channelMonitor; + epics::pvData::Structure* _structure; + epics::pvData::Status _status; + }; + + + /****************************************************************************************/ + /** + * Array request handler. + */ + class ServerArrayHandler : public AbstractServerResponseHandler + { + public: + /** + * @param context + */ + ServerArrayHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Array request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + }; + + class ServerChannelArrayRequesterImpl : public BaseChannelRequester, public ChannelArrayRequester, public ReferencedTransportSender + { + public: + ServerChannelArrayRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport,epics::pvData::PVStructure* pvRequest); + void channelArrayConnect(const epics::pvData::Status& status, ChannelArray* channelArray, epics::pvData::PVArray* pvArray); + void getArrayDone(const epics::pvData::Status& status); + void putArrayDone(const epics::pvData::Status& status); + void setLengthDone(const epics::pvData::Status& status); + void lock(); + void unlock(); + void destroy(); + /** + * @return the channelArray + */ + ChannelArray* getChannelArray(); + /** + * @return the pvArray + */ + epics::pvData::PVArray* getPVArray(); + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); + private: + ChannelArray* _channelArray; + epics::pvData::PVArray* _pvArray; + epics::pvData::Status _status; + }; + + /****************************************************************************************/ + /** + * Cancel request handler. + */ + class ServerCancelRequestHandler : public AbstractServerResponseHandler + { + public: + /** + * @param context + */ + ServerCancelRequestHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Cancel request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + private: + /** + * @param transport + * @param ioid + * @param errorStatus + */ + void failureResponse(Transport* transport, pvAccessID ioid, const epics::pvData::Status& errorStatus); + }; + + + /****************************************************************************************/ + /** + * Process request handler. + */ + class ServerProcessHandler : public AbstractServerResponseHandler + { + public: + /** + * @param context + */ + ServerProcessHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Process request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + }; + + class ServerChannelProcessRequesterImpl : public BaseChannelRequester, public ChannelProcessRequester, public ReferencedTransportSender + { + public: + ServerChannelProcessRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport,epics::pvData::PVStructure* pvRequest); + void channelProcessConnect(const epics::pvData::Status& status, ChannelProcess* channelProcess); + void processDone(const epics::pvData::Status& status); + void lock(); + void unlock(); + void destroy(); + /** + * @return the channelProcess + */ + ChannelProcess* getChannelProcess(); + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); + private: + ChannelProcess* _channelProcess; + epics::pvData::Status _status; + int32 _refCount; + }; + + /****************************************************************************************/ + /** + * Get field request handler. + */ + class ServerGetFieldHandler : public AbstractServerResponseHandler + { + public: + /** + * @param context + */ + ServerGetFieldHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Get field request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + private: + void getFieldFailureResponse(Transport* transport, const pvAccessID ioid, const epics::pvData::Status& errorStatus); + }; + + class ServerGetFieldRequesterImpl : public BaseChannelRequester, public GetFieldRequester, public ReferencedTransportSender + { + public: + ServerGetFieldRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport); + void getDone(const epics::pvData::Status& status, epics::pvData::FieldConstPtr field); + void lock(); + void unlock(); + void destroy(); + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); + private: + epics::pvData::Status _status; + epics::pvData::FieldConstPtr _field; + }; + + class ServerGetFieldHandlerTransportSender : public ReferencedTransportSender + { + public: + ServerGetFieldHandlerTransportSender(const pvAccessID ioid,const epics::pvData::Status& status, Transport* transport): + _ioid(ioid), _status(status), _transport(transport) { + + } + + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) { + control->startMessage((int8)17, sizeof(int32)/sizeof(int8)); + buffer->putInt(_ioid); + _transport->getIntrospectionRegistry()->serializeStatus(buffer, control, _status); + } + + void lock() { + // noop + } + + void unlock() { + // noop + } + + private: + const pvAccessID _ioid; + const epics::pvData::Status& _status; + Transport* _transport; + }; + + + + /****************************************************************************************/ + /** + * RPC handler. + */ + class ServerRPCHandler : public AbstractServerResponseHandler + { + public: + /** + * @param context + */ + ServerRPCHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "RPC request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + }; + + class ServerChannelRPCRequesterImpl : public BaseChannelRequester, public ChannelRPCRequester, public ReferencedTransportSender + { + public: + ServerChannelRPCRequesterImpl(ServerContextImpl* context, ServerChannelImpl* channel, const pvAccessID ioid, Transport* transport,epics::pvData::PVStructure* pvRequest); + void channelRPCConnect(const epics::pvData::Status& status, ChannelRPC* channelRPC, epics::pvData::PVStructure* arguments, epics::pvData::BitSet* bitSet); + void requestDone(const epics::pvData::Status& status, epics::pvData::PVStructure* pvResponse); + void lock(); + void unlock(); + void destroy(); + /** + * @return the channelRPC + */ + ChannelRPC* getChannelRPC(); + /** + * @return the pvArguments + */ + PVStructure* getPvArguments(); + /** + * @return the agrumentsBitSet + */ + BitSet* getAgrumentsBitSet(); + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); + private: + ChannelRPC* _channelRPC; + epics::pvData::PVStructure* _pvArguments; + epics::pvData::PVStructure* _pvResponse; + epics::pvData::BitSet* _argumentsBitSet; + epics::pvData::Status _status; }; } } diff --git a/pvAccessApp/server/serverContext.cpp b/pvAccessApp/server/serverContext.cpp index 7a66aaa..343b726 100644 --- a/pvAccessApp/server/serverContext.cpp +++ b/pvAccessApp/server/serverContext.cpp @@ -30,7 +30,6 @@ ServerContextImpl::ServerContextImpl(): _receiveBufferSize(MAX_TCP_RECV), _timer(NULL), _broadcastTransport(NULL), - _broadcastConnector(NULL), _beaconEmitter(NULL), _acceptor(NULL), _transportRegistry(NULL), @@ -105,8 +104,7 @@ void ServerContextImpl::loadConfiguration() void ServerContextImpl::initialize(ChannelAccess* channelAccess) { - //TODO uncomment - /*Lock guard(_mutex); + Lock guard(_mutex); if (channelAccess == NULL) { THROW_BASE_EXCEPTION("non null channelAccess expected"); @@ -128,7 +126,7 @@ void ServerContextImpl::initialize(ChannelAccess* channelAccess) { std::string msg = "Channel provider with name '" + _channelProviderName + "' not available."; THROW_BASE_EXCEPTION(msg.c_str()); - }*/ + } internalInitialize(); @@ -137,6 +135,7 @@ void ServerContextImpl::initialize(ChannelAccess* channelAccess) void ServerContextImpl::internalInitialize() { + //TODO should be allocated on stack _timer = new Timer("pvAccess-server timer",lowerPriority); _transportRegistry = new TransportRegistry(); @@ -144,14 +143,14 @@ void ServerContextImpl::internalInitialize() initializeBroadcastTransport(); _acceptor = new BlockingTCPAcceptor(this, _serverPort, _receiveBufferSize); - _serverPort = _acceptor->getBindAddress()->ia.sin_port; + //TODO fix this + //_serverPort = _acceptor->getBindAddress()->ia.sin_port; _beaconEmitter = new BeaconEmitter(_broadcastTransport, this); } void ServerContextImpl::initializeBroadcastTransport() { - // setup UDP transport try { @@ -161,7 +160,7 @@ void ServerContextImpl::initializeBroadcastTransport() listenLocalAddress.ia.sin_port = htons(_broadcastPort); listenLocalAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); - // where to send address + // where to send addresses SOCKET socket = epicsSocketCreate(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (socket == INVALID_SOCKET) { @@ -502,9 +501,15 @@ Transport* ServerContextImpl::getSearchTransport() //TODO return NULL; } -// TODO -void ServerContextImpl::acquire() {} -void ServerContextImpl::release() {} + +void ServerContextImpl::acquire() +{ + // TODO +} +void ServerContextImpl::release() +{ + // TODO +} } } diff --git a/pvAccessApp/server/serverContext.h b/pvAccessApp/server/serverContext.h index bfb21a4..6103b34 100644 --- a/pvAccessApp/server/serverContext.h +++ b/pvAccessApp/server/serverContext.h @@ -334,11 +334,6 @@ private: */ BlockingUDPTransport* _broadcastTransport; - /** - * Broadcast connector - */ - BlockingUDPConnector* _broadcastConnector; - /** * Beacon emitter. */