diff --git a/pvAccessApp/ca/caConstants.h b/pvAccessApp/ca/caConstants.h index 492a736..1dea910 100644 --- a/pvAccessApp/ca/caConstants.h +++ b/pvAccessApp/ca/caConstants.h @@ -75,6 +75,9 @@ namespace epics { /** Invalid IOID. */ const int32 CAJ_INVALID_IOID = 0; + + /** Default CA provider name. */ + const String CAJ_DEFAULT_PROVIDER = "local"; } } diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index c677177..da7b811 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -48,9 +48,9 @@ namespace epics { _handlerTable[0] = new NoopResponse(context, "Beacon"); _handlerTable[1] = new ConnectionValidationHandler(context); _handlerTable[2] = new EchoHandler(context); - _handlerTable[3] = badResponse; + _handlerTable[3] = new SearchHandler(context); _handlerTable[4] = badResponse; - _handlerTable[5] = badResponse; + _handlerTable[5] = new IntrospectionSearchHandler(context); _handlerTable[6] = badResponse; _handlerTable[7] = badResponse; _handlerTable[8] = badResponse; @@ -79,6 +79,8 @@ namespace epics { delete _handlerTable[0]; delete _handlerTable[1]; delete _handlerTable[2]; + delete _handlerTable[3]; + delete _handlerTable[5]; delete _handlerTable[27]; delete[] _handlerTable; } @@ -165,5 +167,254 @@ namespace epics { transport->enqueueSendRequest(echoReply); } + + void IntrospectionSearchHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + THROW_BASE_EXCEPTION("not implemented"); + } + + SearchHandler::SearchHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Introspection search request") + { + _provider = context->getChannelProvider(); + _objectPool = new ChannelFindRequesterImplObjectPool(context); + } + + SearchHandler::~SearchHandler() + { + if(_objectPool) delete _objectPool; + } + + void SearchHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + transport->ensureData((sizeof(int32)+sizeof(int16))/sizeof(int8)+1); + const int32 searchSequenceId = payloadBuffer->getInt(); + const int8 qosCode = payloadBuffer->getByte(); + const int32 count = payloadBuffer->getShort() & 0xFFFF; + const boolean responseRequired = (QOS_REPLY_REQUIRED & qosCode) != 0; + + for (int32 i = 0; i < count; i++) + { + transport->ensureData(sizeof(int32)/sizeof(int8)); + const int32 cid = payloadBuffer->getInt(); + const String name = SerializeHelper::deserializeString(payloadBuffer, transport); + // no name check here... + + _provider->channelFind(name, _objectPool->get()->set(searchSequenceId, cid, responseFrom, responseRequired)); + } + } + + ChannelFindRequesterImpl::ChannelFindRequesterImpl(ServerContextImpl* context, ChannelFindRequesterImplObjectPool* objectPool) : + _sendTo(NULL), + _context(context), + _objectPool(objectPool) + {} + + void ChannelFindRequesterImpl::clear() + { + Lock guard(_mutex); + _sendTo = NULL; + } + + ChannelFindRequesterImpl* ChannelFindRequesterImpl::set(int32 searchSequenceId, int32 cid, osiSockAddr* sendTo, boolean responseRequired) + { + Lock guard(_mutex); + _searchSequenceId = searchSequenceId; + _cid = cid; + _sendTo = sendTo; + _responseRequired = responseRequired; + return this; + } + + void ChannelFindRequesterImpl::channelFindResult(epics::pvData::Status* status, ChannelFind* channelFind, boolean wasFound) + { + // TODO status + Lock guard(_mutex); + if (wasFound || _responseRequired) + { + _wasFound = wasFound; + _context->getBroadcastTransport()->enqueueSendRequest(this); + } + } + + void ChannelFindRequesterImpl::lock() + { + // noop + } + + void ChannelFindRequesterImpl::unlock() + { + // noop + } + + void ChannelFindRequesterImpl::acquire() + { + // noop + } + + void ChannelFindRequesterImpl::release() + { + // noop + } + + void ChannelFindRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) + { + int32 count = 1; + control->startMessage((int8)4, (sizeof(int32)+sizeof(int8)+128+2*sizeof(int16)+count*sizeof(int32))/sizeof(8)); + + Lock guard(_mutex); + buffer->putInt(_searchSequenceId); + buffer->putByte(_wasFound ? (int8)1 : (int8)0); + + // NOTE: is it possible (very likely) that address is any local address ::ffff:0.0.0.0 + encodeAsIPv6Address(buffer, _context->getServerInetAddress()); + buffer->putShort((int16)_context->getServerPort()); + buffer->putShort((int16)count); + buffer->putInt(_cid); + + control->setRecipient(*_sendTo); + + // return this object to the pool + _objectPool->put(this); + } + + ChannelFindRequesterImplObjectPool::ChannelFindRequesterImplObjectPool(ServerContextImpl* context) : + _context(context) + {} + + ChannelFindRequesterImpl* ChannelFindRequesterImplObjectPool::get() + { + Lock guard(_mutex); + const int32 count = _elements.size(); + if (count == 0) + { + return new ChannelFindRequesterImpl(_context, this); + } + else + { + ChannelFindRequesterImpl* channelFindRequesterImpl = _elements.back(); + _elements.pop_back(); + return channelFindRequesterImpl; + } + } + + void ChannelFindRequesterImplObjectPool::put(ChannelFindRequesterImpl* element) + { + Lock guard(_mutex); + element->clear(); + _elements.push_back(element); + } + + + ChannelRequesterImpl::ChannelRequesterImpl(Transport* transport, const String channelName, const int32 cid) : + _transport(transport), + _channelName(channelName), + _cid(cid), + _status(NULL), + _channel(NULL) + { + + } + + void ChannelRequesterImpl::channelCreated(Status* const status, Channel* const channel) + { + Lock guard(_mutex); + _status = status; + _channel = channel; + } + + void ChannelRequesterImpl::channelStateChange(Channel* constc, const Channel::ConnectionState isConnected) + { + //noop + } + + String ChannelRequesterImpl::getRequesterName() + { + //TODO + // return _transport-> + "/" + _cid; + } + + void ChannelRequesterImpl::message(const String message, const epics::pvData::MessageType messageType) + { + // TODO + //System.err.println("[" + messageType + "] " + message); + } + + void ChannelRequesterImpl::lock() + { + //noop + } + + void ChannelRequesterImpl::unlock() + { + //noop + } + + void ChannelRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) + { + /* Channel* channel; + Status* status; + { + Lock guard(_mutex); + channel = _channel; + status = _status; + } + + // error response + if (channel == NULL) + { + createChannelFailedResponse(buffer, control, status); + } + // OK + else + { + ServerChannelImpl serverChannel = NULL; + try + { + // NOTE: we do not explicitly check if transport OK + ChannelHostingTransport* casTransport = static_cast(_transport); + + // + // create a new channel instance + // + int sid = casTransport->preallocateChannelSID(); + try + { + //TODO + serverChannel = new ServerChannelImpl(channel, _cid, sid, casTransport->getSecurityToken()); + + // ack allocation and register + casTransport->registerChannel(sid, serverChannel); + + } catch (...) + { + // depreallocate and rethrow + casTransport->depreallocateChannelSID(sid); + throw; + } + + control->startMessage((int8)7, 2*sizeof(int32)/sizeof(int8)); + buffer->putInt(_cid); + buffer->putInt(sid); + _transport->getIntrospectionRegistry()->serializeStatus(buffer, control, status); + } + catch (...) + { + errlogPrintf(errlogMinor, "Exception caught when creating channel: %s", _channelName); + createChannelFailedResponse(buffer, control, + //BaseChannelRequester.statusCreate.createStatus(StatusType.FATAL, "failed to create channel", th)); + // if (serverChannel != null) + // serverChannel.destroy(); + } + }*/ + } } } diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index ccc3849..2058367 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -133,6 +133,130 @@ namespace epics { int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); }; + + /** + * Introspection search request handler. + */ + class IntrospectionSearchHandler : public AbstractServerResponseHandler + { + public: + /** + * @param context + */ + IntrospectionSearchHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Search request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + }; + + /** + * Introspection search request handler. + */ + class ChannelFindRequesterImplObjectPool; + class SearchHandler : public AbstractServerResponseHandler + { + public: + /** + * @param context + */ + SearchHandler(ServerContextImpl* context); + ~SearchHandler(); + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + + private: + ChannelProvider* _provider; + ChannelFindRequesterImplObjectPool* _objectPool; + }; + + + class ChannelFindRequesterImpl: public ChannelFindRequester, public TransportSender + { + public: + ChannelFindRequesterImpl(ServerContextImpl* context, ChannelFindRequesterImplObjectPool* objectPool); + void clear(); + ChannelFindRequesterImpl* set(int32 searchSequenceId, int32 cid, osiSockAddr* sendTo, boolean responseRequired); + void channelFindResult(epics::pvData::Status* status, ChannelFind* channelFind, boolean wasFound); + void lock(); + void unlock(); + void acquire(); + void release(); + void send(ByteBuffer* buffer, TransportSendControl* control); + private: + int32 _searchSequenceId; + int32 _cid; + osiSockAddr* _sendTo; + boolean _responseRequired; + boolean _wasFound; + epics::pvData::Mutex _mutex; + ServerContextImpl* _context; + ChannelFindRequesterImplObjectPool* _objectPool; + }; + + class ChannelFindRequesterImplObjectPool + { + public: + ChannelFindRequesterImplObjectPool(ServerContextImpl* context); + ChannelFindRequesterImpl* get(); + void put(ChannelFindRequesterImpl* element); + + private: + std::vector _elements; + epics::pvData::Mutex _mutex; + ServerContextImpl* _context; + }; + + /** + * Create channel request handler. + */ + class CreateChannelHandler : public AbstractServerResponseHandler + { + public: + /** + * @param context + */ + CreateChannelHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Create channel request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + + private: + void disconnect(Transport* transport); + }; + + class ServerChannelImpl; + class ChannelRequesterImpl : public ChannelRequester, public TransportSender + { + public: + ChannelRequesterImpl(Transport* transport, const String channelName, const int32 cid); + void channelCreated(Status* const status, Channel* const channel); + void channelStateChange(Channel* const c, const Channel::ConnectionState isConnected); + String getRequesterName(); + void message(const String message, const epics::pvData::MessageType messageType); + void lock(); + void unlock(); + void send(ByteBuffer* buffer, TransportSendControl* control); + + private: + Transport* _transport; + const String _channelName; + const int32 _cid; + Status* _status; + Channel* _channel; + epics::pvData::Mutex _mutex; + void createChannelFailedResponse(ByteBuffer* buffer, TransportSendControl* control, Status* const status); + }; + + + } } diff --git a/pvAccessApp/server/serverContext.cpp b/pvAccessApp/server/serverContext.cpp index 7f59abd..e7b8aae 100644 --- a/pvAccessApp/server/serverContext.cpp +++ b/pvAccessApp/server/serverContext.cpp @@ -35,9 +35,7 @@ ServerContextImpl::ServerContextImpl(): _acceptor(NULL), _transportRegistry(NULL), _channelAccess(NULL), - //TODO CAJ_DEFAULT_PROVIDER is not defined - _channelProviderName("local"), - //_channelProviderName(CAJ_DEFAULT_PROVIDER), + _channelProviderName(CAJ_DEFAULT_PROVIDER), _channelProvider(NULL), _beaconServerStatusProvider(NULL) @@ -50,7 +48,6 @@ ServerContextImpl::~ServerContextImpl() { if(_beaconEmitter) delete _beaconEmitter; if(_broadcastTransport) delete _broadcastTransport; - if(_broadcastConnector) delete _broadcastConnector; if(_acceptor) delete _acceptor; if(_transportRegistry) delete _transportRegistry; if(_timer) delete _timer; @@ -166,34 +163,32 @@ void ServerContextImpl::initializeBroadcastTransport() // where to send address SOCKET socket = epicsSocketCreate(AF_INET, SOCK_STREAM, IPPROTO_TCP); - InetAddrVector* broadcasts = getBroadcastAddresses(socket,_broadcastPort); + if (socket == INVALID_SOCKET) + { + THROW_BASE_EXCEPTION("Failed to initialize broadcast UDP transport"); + } + auto_ptr broadcastAddresses(getBroadcastAddresses(socket,_broadcastPort)); epicsSocketDestroy(socket); - - _broadcastConnector = new BlockingUDPConnector(true, true); - - _broadcastTransport = static_cast(_broadcastConnector->connect( + auto_ptr broadcastConnector(new BlockingUDPConnector(true, true)); + _broadcastTransport = static_cast(broadcastConnector->connect( NULL, new ServerResponseHandler(this), listenLocalAddress, CA_MINOR_PROTOCOL_REVISION, CA_DEFAULT_PRIORITY)); - - _broadcastTransport->setBroadcastAddresses(broadcasts); - if(broadcasts) delete broadcasts; + _broadcastTransport->setBroadcastAddresses(broadcastAddresses.get()); // set ignore address list - if (_ignoreAddressList.length() > 0) + if (!_ignoreAddressList.empty()) { // we do not care about the port - InetAddrVector* list = getSocketAddressList(_ignoreAddressList, 0, NULL); - if (list != NULL && list->size() > 0) + auto_ptr list(getSocketAddressList(_ignoreAddressList, 0, NULL)); + if (list.get() != NULL && list->size() > 0) { - _broadcastTransport->setIgnoredAddresses(list); + _broadcastTransport->setIgnoredAddresses(list.get()); } - if(list) delete list; - } // set broadcast address list - if (_beaconAddressList.length() > 0) + if (!_beaconAddressList.empty()) { // if auto is true, add it to specified list InetAddrVector* appendList = NULL; @@ -202,16 +197,19 @@ void ServerContextImpl::initializeBroadcastTransport() appendList = _broadcastTransport->getSendAddresses(); } - InetAddrVector* list = getSocketAddressList(_beaconAddressList, _broadcastPort, appendList); - if (list != NULL && list->size() > 0) + auto_ptr list(getSocketAddressList(_beaconAddressList, _broadcastPort, appendList)); + if (list.get() != NULL && list->size() > 0) { - _broadcastTransport->setBroadcastAddresses(list); + _broadcastTransport->setBroadcastAddresses(list.get()); } - if(list) delete list; } _broadcastTransport->start(); } + catch (std::exception& e) + { + THROW_BASE_EXCEPTION_CAUSE("Failed to initialize broadcast UDP transport", e); + } catch (...) { THROW_BASE_EXCEPTION("Failed to initialize broadcast UDP transport"); @@ -251,6 +249,7 @@ void ServerContextImpl::run(int32 seconds) // run... _beaconEmitter->start(); + //TODO review this if(seconds == 0) { _runEvent.wait(); @@ -274,7 +273,6 @@ void ServerContextImpl::shutdown() THROW_BASE_EXCEPTION("Context already destroyed."); } - // notify to stop running... _runEvent.signal(); } @@ -317,13 +315,6 @@ void ServerContextImpl::internalDestroy() _beaconEmitter->destroy(); } - // stop timer - if (_timer != NULL) - { - //TODO there is no stop in Timer - // _timer->stop(); - } - // this will also destroy all channels destroyAllTransports(); } @@ -338,18 +329,18 @@ void ServerContextImpl::destroyAllTransports() } int32 size; - Transport** transports = _transportRegistry->toArray(size); + auto_ptr transports(_transportRegistry->toArray(size)); if (size == 0) { return; } - errlogSevPrintf(errlogMajor, "Server context still has %d transport(s) active and closing...", size); + errlogSevPrintf(errlogInfo, "Server context still has %d transport(s) active and closing...", size); for (int i = 0; i < size; i++) { - Transport* transport = transports[i]; + Transport* transport = transports.get()[i]; try { transport->close(true); @@ -365,8 +356,6 @@ void ServerContextImpl::destroyAllTransports() errlogSevPrintf(errlogMajor, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); } } - - delete[] transports; } void ServerContextImpl::printInfo() @@ -502,19 +491,17 @@ TransportRegistry* ServerContextImpl::getTransportRegistry() return _transportRegistry; } -//TODO what with this? Channel* ServerContextImpl::getChannel(pvAccessID id) { + //TODO return NULL; } -//TODO what with this? Transport* ServerContextImpl::getSearchTransport() { + //TODO return NULL; } - - } } diff --git a/pvAccessApp/server/serverContext.h b/pvAccessApp/server/serverContext.h index 9f06390..4ad0357 100644 --- a/pvAccessApp/server/serverContext.h +++ b/pvAccessApp/server/serverContext.h @@ -326,16 +326,6 @@ private: */ Timer* _timer; - /** - * Reactor. - */ - //Reactor _reactor; - - /** - * Leader/followers thread pool. - */ - //LeaderFollowersThreadPool _leaderFollowersThreadPool; - /** * Broadcast transport needed for channel searches. */ diff --git a/testApp/remote/testServerContext.cpp b/testApp/remote/testServerContext.cpp index 0e99aa2..49fbcda 100644 --- a/testApp/remote/testServerContext.cpp +++ b/testApp/remote/testServerContext.cpp @@ -3,6 +3,8 @@ */ #include "serverContext.h" +#include +#include using namespace epics::pvAccess; using namespace epics::pvData; @@ -27,5 +29,8 @@ int main(int argc, char *argv[]) testServerContext(); cout << "Done" << endl; + + epicsExitCallAtExits(); + CDRMonitor::get().show(stdout); return (0); }