diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index f778f6e..a63587e 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -180,7 +180,7 @@ namespace pvAccess { LOG(logLevelDebug, "Error setting SO_KEEPALIVE: %s.", strBuffer); } - // TODO tune buffer sizes?! + // do NOT tune socket buffer sizes, this will disable auto-tunning // get TCP send buffer size osiSocklen_t intLen = sizeof(int); @@ -223,7 +223,8 @@ namespace pvAccess { bool BlockingTCPAcceptor::validateConnection(Transport::shared_pointer const & transport, const char* address) { try { - transport->verify(0); + // TODO constant + transport->verify(5000); return true; } catch(...) { LOG(logLevelDebug, "Validation of %s failed.", address); diff --git a/pvAccessApp/remote/blockingUDP.h b/pvAccessApp/remote/blockingUDP.h index 02dc3da..4504cc6 100644 --- a/pvAccessApp/remote/blockingUDP.h +++ b/pvAccessApp/remote/blockingUDP.h @@ -115,7 +115,7 @@ namespace epics { return true; } - virtual void verified() { + virtual void verified(epics::pvData::Status const & /*status*/) { // noop } diff --git a/pvAccessApp/remote/codec.cpp b/pvAccessApp/remote/codec.cpp index 5cf50b0..2dba640 100644 --- a/pvAccessApp/remote/codec.cpp +++ b/pvAccessApp/remote/codec.cpp @@ -1269,6 +1269,35 @@ namespace epics { } } + + bool BlockingTCPTransportCodec::verify(epics::pvData::int32 timeoutMs) { + return _verifiedEvent.wait(timeoutMs/1000.0); + } + + void BlockingTCPTransportCodec::verified(epics::pvData::Status const & status) { + epics::pvData::Lock lock(_verifiedMutex); + + if (IS_LOGGABLE(logLevelDebug) && !status.isOK()) + { + char ipAddrStr[48]; + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); + LOG(logLevelDebug, "Failed to verify connection to %s: %s.", ipAddrStr, status.getMessage().c_str()); + // TODO stack dump + } + + _verified = status.isSuccess(); + _verifiedEvent.signal(); + } + + + + + + + + + + BlockingServerTCPTransportCodec::BlockingServerTCPTransportCodec( Context::shared_pointer const & context, SOCKET channel, @@ -1277,7 +1306,7 @@ namespace epics { int32_t receiveBufferSize) : BlockingTCPTransportCodec(context, channel, responseHandler, sendBufferSize, receiveBufferSize, PVA_DEFAULT_PRIORITY), - _lastChannelSID(0) + _lastChannelSID(0), _verifyOrVerified(false) { // NOTE: priority not yet known, default priority is used to @@ -1343,33 +1372,59 @@ namespace epics { void BlockingServerTCPTransportCodec::send(ByteBuffer* buffer, TransportSendControl* control) { - // - // set byte order control message - // + if (!_verifyOrVerified) + { + _verifyOrVerified = true; - ensureBuffer(PVA_MESSAGE_HEADER_SIZE); - buffer->putByte(PVA_MAGIC); - buffer->putByte(PVA_VERSION); - buffer->putByte( - 0x01 | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) - ? 0x80 : 0x00)); // control + big endian - buffer->putByte(2); // set byte order - buffer->putInt(0); + // + // set byte order control message + // + + ensureBuffer(PVA_MESSAGE_HEADER_SIZE); + buffer->putByte(PVA_MAGIC); + buffer->putByte(PVA_VERSION); + buffer->putByte( + 0x01 | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) + ? 0x80 : 0x00)); // control + big endian + buffer->putByte(2); // set byte order + buffer->putInt(0); - // - // send verification message - // - control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)); + // + // send verification message + // + control->startMessage(CMD_CONNECTION_VALIDATION, 4+2); - // receive buffer size - buffer->putInt(static_cast(getReceiveBufferSize())); + // receive buffer size + buffer->putInt(static_cast(getReceiveBufferSize())); - // socket receive buffer size - buffer->putInt(static_cast(getSocketReceiveBufferSize())); + // server introspection registy max size + // TODO + buffer->putShort(0x7FFF); - // send immediately - control->flush(true); + // list of authNZ plugin names + // TODO + buffer->putByte(0); + + // send immediately + control->flush(true); + } + else + { + // + // send verified message + // + control->startMessage(CMD_CONNECTION_VALIDATED, 0); + + { + Lock lock(_verificationStatusMutex); + _verificationStatus.serialize(buffer, control); + } + + // send immediately + control->flush(true); + + } } void BlockingServerTCPTransportCodec::destroyAllChannels() { @@ -1419,8 +1474,7 @@ namespace epics { sendBufferSize, receiveBufferSize, priority), _connectionTimeout(beaconInterval*1000), _unresponsiveTransport(false), - _verifyOrEcho(true), - _verified(false) + _verifyOrEcho(true) { // initialize owners list, send queue acquire(client); @@ -1581,16 +1635,6 @@ namespace epics { if(_unresponsiveTransport) responsiveTransport(); } - bool BlockingClientTCPTransportCodec::verify(epics::pvData::int32 timeoutMs) { - return _verifiedEvent.wait(timeoutMs/1000.0); - } - - void BlockingClientTCPTransportCodec::verified() { - epics::pvData::Lock lock(_verifiedMutex); - _verified = true; - _verifiedEvent.signal(); - } - void BlockingClientTCPTransportCodec::responsiveTransport() { Lock lock(_mutex); if(_unresponsiveTransport) { @@ -1625,25 +1669,30 @@ namespace epics { void BlockingClientTCPTransportCodec::send(ByteBuffer* buffer, TransportSendControl* control) { if(_verifyOrEcho) { + _verifyOrEcho = false; + /* * send verification response message */ - control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)+sizeof(int16)); + control->startMessage(CMD_CONNECTION_VALIDATION, 4+2+2); // receive buffer size buffer->putInt(static_cast(getReceiveBufferSize())); - // socket receive buffer size - buffer->putInt(static_cast(getSocketReceiveBufferSize())); + // max introspection registry size + // TODO + buffer->putShort(0x7FFF); - // connection priority + // QoS (aka connection priority) buffer->putShort(getPriority()); + // authNZ plugin name + // TODO + SerializeHelper::serializeString("", buffer, control); + // send immediately control->flush(true); - - _verifyOrEcho = false; } else { control->startMessage(CMD_ECHO, 0); diff --git a/pvAccessApp/remote/codec.h b/pvAccessApp/remote/codec.h index 91eca9e..69afd4c 100644 --- a/pvAccessApp/remote/codec.h +++ b/pvAccessApp/remote/codec.h @@ -518,6 +518,10 @@ namespace epics { start(); } + bool verify(epics::pvData::int32 timeoutMs); + + void verified(epics::pvData::Status const & status); + protected: BlockingTCPTransportCodec( @@ -531,7 +535,8 @@ namespace epics { BlockingSocketAbstractCodec(channel, sendBufferSize, receiveBufferSize), _context(context), _responseHandler(responseHandler), _remoteTransportReceiveBufferSize(MAX_TCP_RECV), - _remoteTransportRevision(0), _priority(priority) + _remoteTransportRevision(0), _priority(priority), + _verified(false) { } @@ -548,6 +553,10 @@ namespace epics { size_t _remoteTransportReceiveBufferSize; epics::pvData::int8 _remoteTransportRevision; epics::pvData::int16 _priority; + + bool _verified; + epics::pvData::Mutex _verifiedMutex; + epics::pvData::Event _verifiedEvent; }; @@ -622,14 +631,23 @@ namespace epics { } bool verify(epics::pvData::int32 timeoutMs) { - TransportSender::shared_pointer transportSender = + + TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast(shared_from_this()); enqueueSendRequest(transportSender); - verified(); - return true; + + bool verifiedStatus = BlockingTCPTransportCodec::verify(timeoutMs); + + enqueueSendRequest(transportSender); + + return verifiedStatus; } - void verified() { + void verified(epics::pvData::Status const & status) { + _verificationStatusMutex.lock(); + _verificationStatus = status; + _verificationStatusMutex.unlock(); + BlockingTCPTransportCodec::verified(status); } void aliveNotification() { @@ -660,6 +678,11 @@ namespace epics { epics::pvData::Mutex _channelsMutex; + epics::pvData::Status _verificationStatus; + epics::pvData::Mutex _verificationStatusMutex; + + bool _verifyOrVerified; + }; class BlockingClientTCPTransportCodec : @@ -731,10 +754,6 @@ namespace epics { // noop } - bool verify(epics::pvData::int32 timeoutMs); - - void verified(); - void aliveNotification(); void send(epics::pvData::ByteBuffer* buffer, @@ -786,13 +805,7 @@ namespace epics { */ void responsiveTransport(); - epics::pvData::Mutex _mutex; - - bool _verified; - epics::pvData::Mutex _verifiedMutex; - epics::pvData::Event _verifiedEvent; - }; } diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index af2e0ad..e1d9178 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -94,11 +94,11 @@ namespace epics { CMD_ECHO = 2, CMD_SEARCH = 3, CMD_SEARCH_RESPONSE = 4, - CMD_INTROSPECTION_SEARCH = 5, - CMD_INTROSPECTION_SEARCH_RESPONSE = 6, + CMD_AUTHNZ = 5, + CMD_ACL_CHANGE = 6, CMD_CREATE_CHANNEL = 7, CMD_DESTROY_CHANNEL = 8, - CMD_RESERVED0 = 9, + CMD_CONNECTION_VALIDATED = 9, CMD_GET = 10, CMD_PUT = 11, CMD_PUT_GET = 12, @@ -263,8 +263,9 @@ namespace epics { /** * Notify transport that it is has been verified. + * @param status vefification status; */ - virtual void verified() = 0; + virtual void verified(epics::pvData::Status const & status) = 0; /** * Waits (if needed) until transport is verified, i.e. verified() method is being called. diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index 69a1f92..1866185 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -228,14 +228,14 @@ namespace epics { transport->ensureData(1); int8 qos = payloadBuffer->getByte(); - Status m_status; - m_status.deserialize(payloadBuffer, transport.get()); + Status status; + status.deserialize(payloadBuffer, transport.get()); try { if (qos & QOS_INIT) { - if (m_status.isSuccess()) + if (status.isSuccess()) { // once created set destroy flag m_mutex.lock(); @@ -247,7 +247,7 @@ namespace epics { // this is safe since at least caller owns it m_thisPointer.reset(); - initResponse(transport, version, payloadBuffer, qos, m_status); + initResponse(transport, version, payloadBuffer, qos, status); } else { @@ -261,7 +261,7 @@ namespace epics { m_mutex.unlock(); } - normalResponse(transport, version, payloadBuffer, qos, m_status); + normalResponse(transport, version, payloadBuffer, qos, status); if (destroyReq) destroy(); @@ -2719,16 +2719,44 @@ namespace epics { { AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); - transport->ensureData(8); - transport->setRemoteTransportReceiveBufferSize(payloadBuffer->getInt()); - transport->setRemoteTransportSocketReceiveBufferSize(payloadBuffer->getInt()); - transport->setRemoteRevision(version); + + transport->ensureData(4+2); + + transport->setRemoteTransportReceiveBufferSize(payloadBuffer->getInt()); + // TODO + // TODO serverIntrospectionRegistryMaxSize + /*int serverIntrospectionRegistryMaxSize = */ payloadBuffer->getShort(); + // TODO authNZ + size_t size = SerializeHelper::readSize(payloadBuffer, transport.get()); + for (size_t i = 0; i < size; i++) + SerializeHelper::deserializeString(payloadBuffer, transport.get()); + TransportSender::shared_pointer sender = dynamic_pointer_cast(transport); - if (sender.get()) { + if (sender.get()) transport->enqueueSendRequest(sender); - } - transport->verified(); + } + }; + + class ClientConnectionValidatedHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { + public: + ClientConnectionValidatedHandler(ClientContextImpl::shared_pointer context) : + AbstractClientResponseHandler(context, "Connection validated") + { + } + + virtual ~ClientConnectionValidatedHandler() { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport::shared_pointer const & transport, int8 version, int8 command, + size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer) + { + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + + Status status; + status.deserialize(payloadBuffer, transport.get()); + transport->verified(status); } }; @@ -2843,11 +2871,11 @@ namespace epics { m_handlerTable[CMD_ECHO].reset(new NoopResponse(context, "Echo")); /* 2 */ m_handlerTable[CMD_SEARCH].reset(new NoopResponse(context, "Search")); /* 3 */ m_handlerTable[CMD_SEARCH_RESPONSE].reset(new SearchResponseHandler(context)); /* 4 */ - m_handlerTable[CMD_INTROSPECTION_SEARCH].reset(new NoopResponse(context, "Introspection search")); /* 5 */ - m_handlerTable[CMD_INTROSPECTION_SEARCH_RESPONSE] = dataResponse; /* 6 - introspection search */ + m_handlerTable[CMD_AUTHNZ].reset(new NoopResponse(context, "Introspection search")); /* 5 */ + m_handlerTable[CMD_ACL_CHANGE] = dataResponse; /* 6 */ m_handlerTable[CMD_CREATE_CHANNEL].reset(new CreateChannelHandler(context)); /* 7 */ m_handlerTable[CMD_DESTROY_CHANNEL].reset(new NoopResponse(context, "Destroy channel")); /* 8 */ // TODO it might be useful to implement this... - m_handlerTable[CMD_RESERVED0] = badResponse; /* 9 */ + m_handlerTable[CMD_CONNECTION_VALIDATED].reset(new ClientConnectionValidatedHandler(context)); /* 9 */ m_handlerTable[CMD_GET] = dataResponse; /* 10 - get response */ m_handlerTable[CMD_PUT] = dataResponse; /* 11 - put response */ m_handlerTable[CMD_PUT_GET] = dataResponse; /* 12 - put-get response */ diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index dbb7ec1..8595637 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -90,11 +90,11 @@ ServerResponseHandler::ServerResponseHandler(ServerContextImpl::shared_pointer c m_handlerTable[CMD_ECHO].reset(new ServerEchoHandler(context)); /* 2 */ m_handlerTable[CMD_SEARCH].reset(new ServerSearchHandler(context)); /* 3 */ m_handlerTable[CMD_SEARCH_RESPONSE] = badResponse; - m_handlerTable[CMD_INTROSPECTION_SEARCH].reset(new ServerIntrospectionSearchHandler(context)); /* 5 */ - m_handlerTable[CMD_INTROSPECTION_SEARCH_RESPONSE] = badResponse; /* 6 - introspection search */ + m_handlerTable[CMD_AUTHNZ] = badResponse; /* 5 */ + m_handlerTable[CMD_ACL_CHANGE] = badResponse; /* 6 - introspection search */ m_handlerTable[CMD_CREATE_CHANNEL].reset(new ServerCreateChannelHandler(context)); /* 7 */ m_handlerTable[CMD_DESTROY_CHANNEL].reset(new ServerDestroyChannelHandler(context)); /* 8 */ - m_handlerTable[CMD_RESERVED0] = badResponse; /* 9 */ + m_handlerTable[CMD_CONNECTION_VALIDATED] = badResponse; /* 9 */ m_handlerTable[CMD_GET].reset(new ServerGetHandler(context)); /* 10 - get response */ m_handlerTable[CMD_PUT].reset(new ServerPutHandler(context)); /* 11 - put response */ @@ -142,14 +142,19 @@ void ServerConnectionValidationHandler::handleResponse( AbstractServerResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); - transport->ensureData(2*sizeof(int32)+sizeof(int16)); - transport->setRemoteTransportReceiveBufferSize( - payloadBuffer->getInt()); - transport->setRemoteTransportSocketReceiveBufferSize( - payloadBuffer->getInt()); - transport->setRemoteRevision(version); - // TODO support priority !!! - //transport.setPriority(payloadBuffer.getShort()); + transport->setRemoteRevision(version); + + transport->ensureData(4+2+2); + transport->setRemoteTransportReceiveBufferSize(payloadBuffer->getInt()); + // TODO clientIntrospectionRegistryMaxSize + /* int clientIntrospectionRegistryMaxSize = */ payloadBuffer->getShort(); + // TODO connectionQoS + /* int16 connectionQoS = */ payloadBuffer->getShort(); + // TODO authNZ + /*std::string authNZ = */ SerializeHelper::deserializeString(payloadBuffer, transport.get()); + + // TODO call this after authNZ has done their work + transport->verified(Status::Ok); } void ServerEchoHandler::handleResponse(osiSockAddr* responseFrom, @@ -164,16 +169,6 @@ void ServerEchoHandler::handleResponse(osiSockAddr* responseFrom, transport->enqueueSendRequest(echoReply); } -void ServerIntrospectionSearchHandler::handleResponse(osiSockAddr* responseFrom, - Transport::shared_pointer const & transport, int8 version, int8 command, - size_t payloadSize, ByteBuffer* payloadBuffer) -{ - AbstractServerResponseHandler::handleResponse(responseFrom, - transport, version, command, payloadSize, payloadBuffer); - - THROW_BASE_EXCEPTION("not implemented"); -} - /****************************************************************************************/ std::string ServerSearchHandler::SUPPORTED_PROTOCOL = "tcp"; diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index a509069..552c0c4 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -133,22 +133,6 @@ namespace pvAccess { osiSockAddr _echoFrom; }; - /** - * Introspection search request handler. - */ - class ServerIntrospectionSearchHandler : public AbstractServerResponseHandler - { - public: - ServerIntrospectionSearchHandler(ServerContextImpl::shared_pointer const & context) : - AbstractServerResponseHandler(context, "Search request") { - } - virtual ~ServerIntrospectionSearchHandler() {} - - virtual void handleResponse(osiSockAddr* responseFrom, - Transport::shared_pointer const & transport, epics::pvData::int8 version, epics::pvData::int8 command, - std::size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer); - }; - /****************************************************************************************/ /** * Search channel request handler. diff --git a/testApp/remote/testCodec.cpp b/testApp/remote/testCodec.cpp index 52b0d49..f39846a 100644 --- a/testApp/remote/testCodec.cpp +++ b/testApp/remote/testCodec.cpp @@ -352,7 +352,7 @@ namespace epics { bool verify(epics::pvData::int32 timeoutMs) { return true;} - void verified() {} + void verified(epics::pvData::Status const &) {} void aliveNotification() {}