protocol change: connection validation/authNZ support

This commit is contained in:
Matej Sekoranja
2014-06-09 12:39:29 +02:00
parent 8862d29ea5
commit b101fa1e7a
9 changed files with 186 additions and 115 deletions

View File

@@ -180,7 +180,7 @@ namespace pvAccess {
LOG(logLevelDebug, "Error setting SO_KEEPALIVE: %s.", strBuffer);
}
// TODO tune buffer sizes?!
// do NOT tune socket buffer sizes, this will disable auto-tunning
// get TCP send buffer size
osiSocklen_t intLen = sizeof(int);
@@ -223,7 +223,8 @@ namespace pvAccess {
bool BlockingTCPAcceptor::validateConnection(Transport::shared_pointer const & transport, const char* address) {
try {
transport->verify(0);
// TODO constant
transport->verify(5000);
return true;
} catch(...) {
LOG(logLevelDebug, "Validation of %s failed.", address);

View File

@@ -115,7 +115,7 @@ namespace epics {
return true;
}
virtual void verified() {
virtual void verified(epics::pvData::Status const & /*status*/) {
// noop
}

View File

@@ -1269,6 +1269,35 @@ namespace epics {
}
}
bool BlockingTCPTransportCodec::verify(epics::pvData::int32 timeoutMs) {
return _verifiedEvent.wait(timeoutMs/1000.0);
}
void BlockingTCPTransportCodec::verified(epics::pvData::Status const & status) {
epics::pvData::Lock lock(_verifiedMutex);
if (IS_LOGGABLE(logLevelDebug) && !status.isOK())
{
char ipAddrStr[48];
ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
LOG(logLevelDebug, "Failed to verify connection to %s: %s.", ipAddrStr, status.getMessage().c_str());
// TODO stack dump
}
_verified = status.isSuccess();
_verifiedEvent.signal();
}
BlockingServerTCPTransportCodec::BlockingServerTCPTransportCodec(
Context::shared_pointer const & context,
SOCKET channel,
@@ -1277,7 +1306,7 @@ namespace epics {
int32_t receiveBufferSize) :
BlockingTCPTransportCodec(context, channel, responseHandler,
sendBufferSize, receiveBufferSize, PVA_DEFAULT_PRIORITY),
_lastChannelSID(0)
_lastChannelSID(0), _verifyOrVerified(false)
{
// NOTE: priority not yet known, default priority is used to
@@ -1343,33 +1372,59 @@ namespace epics {
void BlockingServerTCPTransportCodec::send(ByteBuffer* buffer,
TransportSendControl* control) {
//
// set byte order control message
//
if (!_verifyOrVerified)
{
_verifyOrVerified = true;
ensureBuffer(PVA_MESSAGE_HEADER_SIZE);
buffer->putByte(PVA_MAGIC);
buffer->putByte(PVA_VERSION);
buffer->putByte(
0x01 | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG)
? 0x80 : 0x00)); // control + big endian
buffer->putByte(2); // set byte order
buffer->putInt(0);
//
// set byte order control message
//
ensureBuffer(PVA_MESSAGE_HEADER_SIZE);
buffer->putByte(PVA_MAGIC);
buffer->putByte(PVA_VERSION);
buffer->putByte(
0x01 | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG)
? 0x80 : 0x00)); // control + big endian
buffer->putByte(2); // set byte order
buffer->putInt(0);
//
// send verification message
//
control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32));
//
// send verification message
//
control->startMessage(CMD_CONNECTION_VALIDATION, 4+2);
// receive buffer size
buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
// receive buffer size
buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
// socket receive buffer size
buffer->putInt(static_cast<int32>(getSocketReceiveBufferSize()));
// server introspection registy max size
// TODO
buffer->putShort(0x7FFF);
// send immediately
control->flush(true);
// list of authNZ plugin names
// TODO
buffer->putByte(0);
// send immediately
control->flush(true);
}
else
{
//
// send verified message
//
control->startMessage(CMD_CONNECTION_VALIDATED, 0);
{
Lock lock(_verificationStatusMutex);
_verificationStatus.serialize(buffer, control);
}
// send immediately
control->flush(true);
}
}
void BlockingServerTCPTransportCodec::destroyAllChannels() {
@@ -1419,8 +1474,7 @@ namespace epics {
sendBufferSize, receiveBufferSize, priority),
_connectionTimeout(beaconInterval*1000),
_unresponsiveTransport(false),
_verifyOrEcho(true),
_verified(false)
_verifyOrEcho(true)
{
// initialize owners list, send queue
acquire(client);
@@ -1581,16 +1635,6 @@ namespace epics {
if(_unresponsiveTransport) responsiveTransport();
}
bool BlockingClientTCPTransportCodec::verify(epics::pvData::int32 timeoutMs) {
return _verifiedEvent.wait(timeoutMs/1000.0);
}
void BlockingClientTCPTransportCodec::verified() {
epics::pvData::Lock lock(_verifiedMutex);
_verified = true;
_verifiedEvent.signal();
}
void BlockingClientTCPTransportCodec::responsiveTransport() {
Lock lock(_mutex);
if(_unresponsiveTransport) {
@@ -1625,25 +1669,30 @@ namespace epics {
void BlockingClientTCPTransportCodec::send(ByteBuffer* buffer,
TransportSendControl* control) {
if(_verifyOrEcho) {
_verifyOrEcho = false;
/*
* send verification response message
*/
control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)+sizeof(int16));
control->startMessage(CMD_CONNECTION_VALIDATION, 4+2+2);
// receive buffer size
buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
// socket receive buffer size
buffer->putInt(static_cast<int32>(getSocketReceiveBufferSize()));
// max introspection registry size
// TODO
buffer->putShort(0x7FFF);
// connection priority
// QoS (aka connection priority)
buffer->putShort(getPriority());
// authNZ plugin name
// TODO
SerializeHelper::serializeString("", buffer, control);
// send immediately
control->flush(true);
_verifyOrEcho = false;
}
else {
control->startMessage(CMD_ECHO, 0);

View File

@@ -518,6 +518,10 @@ namespace epics {
start();
}
bool verify(epics::pvData::int32 timeoutMs);
void verified(epics::pvData::Status const & status);
protected:
BlockingTCPTransportCodec(
@@ -531,7 +535,8 @@ namespace epics {
BlockingSocketAbstractCodec(channel, sendBufferSize, receiveBufferSize),
_context(context), _responseHandler(responseHandler),
_remoteTransportReceiveBufferSize(MAX_TCP_RECV),
_remoteTransportRevision(0), _priority(priority)
_remoteTransportRevision(0), _priority(priority),
_verified(false)
{
}
@@ -548,6 +553,10 @@ namespace epics {
size_t _remoteTransportReceiveBufferSize;
epics::pvData::int8 _remoteTransportRevision;
epics::pvData::int16 _priority;
bool _verified;
epics::pvData::Mutex _verifiedMutex;
epics::pvData::Event _verifiedEvent;
};
@@ -622,14 +631,23 @@ namespace epics {
}
bool verify(epics::pvData::int32 timeoutMs) {
TransportSender::shared_pointer transportSender =
TransportSender::shared_pointer transportSender =
std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
enqueueSendRequest(transportSender);
verified();
return true;
bool verifiedStatus = BlockingTCPTransportCodec::verify(timeoutMs);
enqueueSendRequest(transportSender);
return verifiedStatus;
}
void verified() {
void verified(epics::pvData::Status const & status) {
_verificationStatusMutex.lock();
_verificationStatus = status;
_verificationStatusMutex.unlock();
BlockingTCPTransportCodec::verified(status);
}
void aliveNotification() {
@@ -660,6 +678,11 @@ namespace epics {
epics::pvData::Mutex _channelsMutex;
epics::pvData::Status _verificationStatus;
epics::pvData::Mutex _verificationStatusMutex;
bool _verifyOrVerified;
};
class BlockingClientTCPTransportCodec :
@@ -731,10 +754,6 @@ namespace epics {
// noop
}
bool verify(epics::pvData::int32 timeoutMs);
void verified();
void aliveNotification();
void send(epics::pvData::ByteBuffer* buffer,
@@ -786,13 +805,7 @@ namespace epics {
*/
void responsiveTransport();
epics::pvData::Mutex _mutex;
bool _verified;
epics::pvData::Mutex _verifiedMutex;
epics::pvData::Event _verifiedEvent;
};
}

View File

@@ -94,11 +94,11 @@ namespace epics {
CMD_ECHO = 2,
CMD_SEARCH = 3,
CMD_SEARCH_RESPONSE = 4,
CMD_INTROSPECTION_SEARCH = 5,
CMD_INTROSPECTION_SEARCH_RESPONSE = 6,
CMD_AUTHNZ = 5,
CMD_ACL_CHANGE = 6,
CMD_CREATE_CHANNEL = 7,
CMD_DESTROY_CHANNEL = 8,
CMD_RESERVED0 = 9,
CMD_CONNECTION_VALIDATED = 9,
CMD_GET = 10,
CMD_PUT = 11,
CMD_PUT_GET = 12,
@@ -263,8 +263,9 @@ namespace epics {
/**
* Notify transport that it is has been verified.
* @param status vefification status;
*/
virtual void verified() = 0;
virtual void verified(epics::pvData::Status const & status) = 0;
/**
* Waits (if needed) until transport is verified, i.e. verified() method is being called.

View File

@@ -228,14 +228,14 @@ namespace epics {
transport->ensureData(1);
int8 qos = payloadBuffer->getByte();
Status m_status;
m_status.deserialize(payloadBuffer, transport.get());
Status status;
status.deserialize(payloadBuffer, transport.get());
try
{
if (qos & QOS_INIT)
{
if (m_status.isSuccess())
if (status.isSuccess())
{
// once created set destroy flag
m_mutex.lock();
@@ -247,7 +247,7 @@ namespace epics {
// this is safe since at least caller owns it
m_thisPointer.reset();
initResponse(transport, version, payloadBuffer, qos, m_status);
initResponse(transport, version, payloadBuffer, qos, status);
}
else
{
@@ -261,7 +261,7 @@ namespace epics {
m_mutex.unlock();
}
normalResponse(transport, version, payloadBuffer, qos, m_status);
normalResponse(transport, version, payloadBuffer, qos, status);
if (destroyReq)
destroy();
@@ -2719,16 +2719,44 @@ namespace epics {
{
AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
transport->ensureData(8);
transport->setRemoteTransportReceiveBufferSize(payloadBuffer->getInt());
transport->setRemoteTransportSocketReceiveBufferSize(payloadBuffer->getInt());
transport->setRemoteRevision(version);
transport->ensureData(4+2);
transport->setRemoteTransportReceiveBufferSize(payloadBuffer->getInt());
// TODO
// TODO serverIntrospectionRegistryMaxSize
/*int serverIntrospectionRegistryMaxSize = */ payloadBuffer->getShort();
// TODO authNZ
size_t size = SerializeHelper::readSize(payloadBuffer, transport.get());
for (size_t i = 0; i < size; i++)
SerializeHelper::deserializeString(payloadBuffer, transport.get());
TransportSender::shared_pointer sender = dynamic_pointer_cast<TransportSender>(transport);
if (sender.get()) {
if (sender.get())
transport->enqueueSendRequest(sender);
}
transport->verified();
}
};
class ClientConnectionValidatedHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods {
public:
ClientConnectionValidatedHandler(ClientContextImpl::shared_pointer context) :
AbstractClientResponseHandler(context, "Connection validated")
{
}
virtual ~ClientConnectionValidatedHandler() {
}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer)
{
AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
Status status;
status.deserialize(payloadBuffer, transport.get());
transport->verified(status);
}
};
@@ -2843,11 +2871,11 @@ namespace epics {
m_handlerTable[CMD_ECHO].reset(new NoopResponse(context, "Echo")); /* 2 */
m_handlerTable[CMD_SEARCH].reset(new NoopResponse(context, "Search")); /* 3 */
m_handlerTable[CMD_SEARCH_RESPONSE].reset(new SearchResponseHandler(context)); /* 4 */
m_handlerTable[CMD_INTROSPECTION_SEARCH].reset(new NoopResponse(context, "Introspection search")); /* 5 */
m_handlerTable[CMD_INTROSPECTION_SEARCH_RESPONSE] = dataResponse; /* 6 - introspection search */
m_handlerTable[CMD_AUTHNZ].reset(new NoopResponse(context, "Introspection search")); /* 5 */
m_handlerTable[CMD_ACL_CHANGE] = dataResponse; /* 6 */
m_handlerTable[CMD_CREATE_CHANNEL].reset(new CreateChannelHandler(context)); /* 7 */
m_handlerTable[CMD_DESTROY_CHANNEL].reset(new NoopResponse(context, "Destroy channel")); /* 8 */ // TODO it might be useful to implement this...
m_handlerTable[CMD_RESERVED0] = badResponse; /* 9 */
m_handlerTable[CMD_CONNECTION_VALIDATED].reset(new ClientConnectionValidatedHandler(context)); /* 9 */
m_handlerTable[CMD_GET] = dataResponse; /* 10 - get response */
m_handlerTable[CMD_PUT] = dataResponse; /* 11 - put response */
m_handlerTable[CMD_PUT_GET] = dataResponse; /* 12 - put-get response */

View File

@@ -90,11 +90,11 @@ ServerResponseHandler::ServerResponseHandler(ServerContextImpl::shared_pointer c
m_handlerTable[CMD_ECHO].reset(new ServerEchoHandler(context)); /* 2 */
m_handlerTable[CMD_SEARCH].reset(new ServerSearchHandler(context)); /* 3 */
m_handlerTable[CMD_SEARCH_RESPONSE] = badResponse;
m_handlerTable[CMD_INTROSPECTION_SEARCH].reset(new ServerIntrospectionSearchHandler(context)); /* 5 */
m_handlerTable[CMD_INTROSPECTION_SEARCH_RESPONSE] = badResponse; /* 6 - introspection search */
m_handlerTable[CMD_AUTHNZ] = badResponse; /* 5 */
m_handlerTable[CMD_ACL_CHANGE] = badResponse; /* 6 - introspection search */
m_handlerTable[CMD_CREATE_CHANNEL].reset(new ServerCreateChannelHandler(context)); /* 7 */
m_handlerTable[CMD_DESTROY_CHANNEL].reset(new ServerDestroyChannelHandler(context)); /* 8 */
m_handlerTable[CMD_RESERVED0] = badResponse; /* 9 */
m_handlerTable[CMD_CONNECTION_VALIDATED] = badResponse; /* 9 */
m_handlerTable[CMD_GET].reset(new ServerGetHandler(context)); /* 10 - get response */
m_handlerTable[CMD_PUT].reset(new ServerPutHandler(context)); /* 11 - put response */
@@ -142,14 +142,19 @@ void ServerConnectionValidationHandler::handleResponse(
AbstractServerResponseHandler::handleResponse(responseFrom,
transport, version, command, payloadSize, payloadBuffer);
transport->ensureData(2*sizeof(int32)+sizeof(int16));
transport->setRemoteTransportReceiveBufferSize(
payloadBuffer->getInt());
transport->setRemoteTransportSocketReceiveBufferSize(
payloadBuffer->getInt());
transport->setRemoteRevision(version);
// TODO support priority !!!
//transport.setPriority(payloadBuffer.getShort());
transport->setRemoteRevision(version);
transport->ensureData(4+2+2);
transport->setRemoteTransportReceiveBufferSize(payloadBuffer->getInt());
// TODO clientIntrospectionRegistryMaxSize
/* int clientIntrospectionRegistryMaxSize = */ payloadBuffer->getShort();
// TODO connectionQoS
/* int16 connectionQoS = */ payloadBuffer->getShort();
// TODO authNZ
/*std::string authNZ = */ SerializeHelper::deserializeString(payloadBuffer, transport.get());
// TODO call this after authNZ has done their work
transport->verified(Status::Ok);
}
void ServerEchoHandler::handleResponse(osiSockAddr* responseFrom,
@@ -164,16 +169,6 @@ void ServerEchoHandler::handleResponse(osiSockAddr* responseFrom,
transport->enqueueSendRequest(echoReply);
}
void ServerIntrospectionSearchHandler::handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
size_t payloadSize, ByteBuffer* payloadBuffer)
{
AbstractServerResponseHandler::handleResponse(responseFrom,
transport, version, command, payloadSize, payloadBuffer);
THROW_BASE_EXCEPTION("not implemented");
}
/****************************************************************************************/
std::string ServerSearchHandler::SUPPORTED_PROTOCOL = "tcp";

View File

@@ -133,22 +133,6 @@ namespace pvAccess {
osiSockAddr _echoFrom;
};
/**
* Introspection search request handler.
*/
class ServerIntrospectionSearchHandler : public AbstractServerResponseHandler
{
public:
ServerIntrospectionSearchHandler(ServerContextImpl::shared_pointer const & context) :
AbstractServerResponseHandler(context, "Search request") {
}
virtual ~ServerIntrospectionSearchHandler() {}
virtual void handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, epics::pvData::int8 version, epics::pvData::int8 command,
std::size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
};
/****************************************************************************************/
/**
* Search channel request handler.

View File

@@ -352,7 +352,7 @@ namespace epics {
bool verify(epics::pvData::int32 timeoutMs) { return true;}
void verified() {}
void verified(epics::pvData::Status const &) {}
void aliveNotification() {}