client Channel search bypass

Bypass the broadcast search phase and directly connect
to a specific server and issue CMD_CREATE_CHANNEL.

Note, due to pvAccessCPP buggyness, this can only be relied
upon to connect with the unadvertised magic 'server' PV.
This commit is contained in:
Michael Davidsaver
2021-04-17 16:22:20 -07:00
parent 55f8b91ede
commit 5d3a21f030
6 changed files with 88 additions and 33 deletions
+54 -22
View File
@@ -127,7 +127,6 @@ void Channel::disconnect(const std::shared_ptr<Channel>& self)
{
self->state = Channel::Searching;
self->sid = 0xdeadbeef; // spoil
context->searchBuckets[context->currentBucket].push_back(self);
auto conns(connectors); // copy list
@@ -137,10 +136,25 @@ void Channel::disconnect(const std::shared_ptr<Channel>& self)
conn->_onDis();
}
log_debug_printf(io, "Server %s detach channel '%s' to re-search\n",
conn ? conn->peerName.c_str() : "<disconnected>",
self->name.c_str());
if(forcedServer.family()==AF_UNSPEC) {
context->searchBuckets[context->currentBucket].push_back(self);
log_debug_printf(io, "Server %s detach channel '%s' to re-search\n",
conn ? conn->peerName.c_str() : "<disconnected>",
self->name.c_str());
} else { // reconnect to specific server
// TODO: holdoff to prevent fast reconnect loop
self->conn = Connection::build(context, self->forcedServer);
self->conn->pending.push_back(self);
self->state = Connecting;
self->conn->createChannels();
}
}
Connect::~Connect() {}
@@ -186,7 +200,7 @@ std::shared_ptr<Connect> ConnectBuilder::exec()
context->tcp_loop.dispatch([op, context]() {
// on worker
op->chan = Channel::build(context, op->_name);
op->chan = Channel::build(context, op->_name, std::string());
bool cur = op->_connected = op->chan->state==Channel::Active;
if(cur && op->_onConn)
@@ -258,12 +272,20 @@ RequestInfo::RequestInfo(uint32_t sid, uint32_t ioid, std::shared_ptr<OperationB
,handle(handle)
{}
std::shared_ptr<Channel> Channel::build(const std::shared_ptr<ContextImpl>& context, const std::string& name)
std::shared_ptr<Channel> Channel::build(const std::shared_ptr<ContextImpl>& context,
const std::string& name,
const std::string& server)
{
SockAddr forceServer;
decltype (context->chanByName)::key_type namekey(name, server);
if(!server.empty()) {
forceServer.setAddress(server.c_str(), context->effective.tcp_port);
}
std::shared_ptr<Channel> chan;
auto it = context->chanByName.find(name);
auto it = context->chanByName.find(namekey);
if(it!=context->chanByName.end()) {
chan = it->second;
chan->garbage = false;
@@ -274,12 +296,25 @@ std::shared_ptr<Channel> Channel::build(const std::shared_ptr<ContextImpl>& cont
context->nextCID++;
chan = std::make_shared<Channel>(context, name, context->nextCID);
context->chanByCID[chan->cid] = chan;
context->chanByName[chan->name] = chan;
context->chanByName[namekey] = chan;
context->searchBuckets[context->currentBucket].push_back(chan);
if(server.empty()) {
context->searchBuckets[context->currentBucket].push_back(chan);
context->poke(true);
context->poke(true);
} else { // bypass search and connect so a specific server
chan->forcedServer = forceServer;
chan->conn = Connection::build(context, forceServer);
chan->conn->pending.push_back(chan);
chan->state = Connecting;
chan->conn->createChannels();
}
}
return chan;
@@ -481,7 +516,7 @@ void ContextImpl::startNS()
// start connections to name servers
for(auto& ns : nameServers) {
const auto& serv = ns.first;
connByAddr[serv] = ns.second = std::make_shared<Connection>(shared_from_this(), serv);
ns.second = Connection::build(shared_from_this(), serv);
ns.second->nameserver = true;
log_debug_printf(io, "Connecting to nameserver %s\n", ns.second->peerName.c_str());
}
@@ -631,10 +666,7 @@ void procSearchReply(ContextImpl& self, const SockAddr& src, Buffer& M, bool ist
chan->guid = guid;
chan->replyAddr = serv;
auto it = self.connByAddr.find(serv);
if(it==self.connByAddr.end() || !(chan->conn = it->second.lock())) {
self.connByAddr[serv] = chan->conn = std::make_shared<Connection>(self.shared_from_this(), serv);
}
chan->conn = Connection::build(self.shared_from_this(), serv);
chan->conn->pending.push_back(chan);
chan->state = Channel::Connecting;
@@ -963,7 +995,7 @@ void ContextImpl::onNSCheck()
if(ns.second && ns.second->bev) // connecting or connected
continue;
connByAddr[ns.first] = ns.second = std::make_shared<Connection>(shared_from_this(), ns.first);
ns.second = Connection::build(shared_from_this(), ns.first);
ns.second->nameserver = true;
log_debug_printf(io, "Reconnecting nameserver %s\n", ns.second->peerName.c_str());
}
@@ -980,16 +1012,16 @@ void ContextImpl::onNSCheckS(evutil_socket_t fd, short evt, void *raw)
void ContextImpl::cacheClean(const std::string& name)
{
std::set<std::string> trash;
std::set<decltype (chanByName)::key_type> trash;
for(auto& pair : chanByName) {
if(!name.empty() && pair.first!=name)
if(!name.empty() && pair.first.first!=name)
continue; // skip
if(pair.second.use_count()<=1) {
if(!pair.second->garbage) {
// mark for next sweep
log_debug_printf(setup, "Chan GC mark '%s'\n", pair.first.c_str());
log_debug_printf(setup, "Chan GC mark '%s':'%s'\n", pair.first.first.c_str(), pair.first.second.c_str());
pair.second->garbage = true;
} else {
@@ -1000,9 +1032,9 @@ void ContextImpl::cacheClean(const std::string& name)
}
// explicitly break ref. loop of channel cache
for(auto& name : trash) {
chanByName.erase(name);
log_debug_printf(setup, "Chan GC sweep '%s'\n", name.c_str());
for(auto& key : trash) {
chanByName.erase(key);
log_debug_printf(setup, "Chan GC sweep '%s':'%s'\n", key.first.c_str(), key.second.c_str());
}
}