1212 lines
34 KiB
C++
1212 lines
34 KiB
C++
/**
|
|
* Copyright - See the COPYRIGHT that is included with this distribution.
|
|
* pvxs is distributed subject to a Software License Agreement found
|
|
* in file LICENSE that is included with this distribution.
|
|
*/
|
|
|
|
#include <algorithm>
|
|
#include <set>
|
|
#include <tuple>
|
|
|
|
#include <osiSock.h>
|
|
#include <dbDefs.h>
|
|
#include <epicsThread.h>
|
|
#include <epicsGuard.h>
|
|
|
|
#include <pvxs/log.h>
|
|
#include <clientimpl.h>
|
|
|
|
DEFINE_LOGGER(setup, "pvxs.client.setup");
|
|
DEFINE_LOGGER(io, "pvxs.client.io");
|
|
DEFINE_LOGGER(duppv, "pvxs.client.dup");
|
|
|
|
typedef epicsGuard<epicsMutex> Guard;
|
|
typedef epicsGuardRelease<epicsMutex> UnGuard;
|
|
|
|
namespace pvxs {
|
|
namespace client {
|
|
|
|
constexpr timeval bucketInterval{1,0};
|
|
constexpr size_t nBuckets = 30u;
|
|
|
|
// try not to fragment with usual MTU==1500
|
|
constexpr size_t maxSearchPayload = 1400;
|
|
|
|
constexpr timeval channelCacheCleanInterval{10,0};
|
|
|
|
constexpr timeval beaconCleanInterval{180, 0};
|
|
|
|
constexpr timeval tcpNSCheckInterval{10, 0};
|
|
|
|
// searchSequenceID in CMD_SEARCH is redundant.
|
|
// So we use a static value and instead rely on IDs for individual PVs
|
|
constexpr uint32_t search_seq{0x66696e64}; // "find"
|
|
|
|
Disconnect::Disconnect()
|
|
:std::runtime_error("Disconnected")
|
|
,time(epicsTime::getCurrent())
|
|
{}
|
|
|
|
Disconnect::~Disconnect() {}
|
|
|
|
RemoteError::RemoteError(const std::string& msg)
|
|
:std::runtime_error(msg)
|
|
{}
|
|
|
|
RemoteError::~RemoteError() {}
|
|
|
|
Finished::~Finished() {}
|
|
|
|
Connected::Connected(const std::string& peerName)
|
|
:std::runtime_error("Connected")
|
|
,peerName(peerName)
|
|
,time(epicsTime::getCurrent())
|
|
{}
|
|
|
|
Connected::~Connected() {}
|
|
|
|
Interrupted::Interrupted()
|
|
:std::runtime_error ("Interrupted")
|
|
{}
|
|
Interrupted::~Interrupted() {}
|
|
|
|
Timeout::Timeout()
|
|
:std::runtime_error ("Interrupted")
|
|
{}
|
|
Timeout::~Timeout() {}
|
|
|
|
Channel::Channel(const std::shared_ptr<ContextImpl>& context, const std::string& name, uint32_t cid)
|
|
:context(context)
|
|
,name(name)
|
|
,cid(cid)
|
|
{}
|
|
|
|
Channel::~Channel()
|
|
{
|
|
disconnect(nullptr);
|
|
}
|
|
|
|
void Channel::createOperations()
|
|
{
|
|
if(state!=Channel::Active)
|
|
return;
|
|
|
|
auto todo = std::move(pending);
|
|
|
|
for(auto& wop : todo) {
|
|
auto op = wop.lock();
|
|
if(!op)
|
|
continue;
|
|
|
|
uint32_t ioid;
|
|
do {
|
|
ioid = conn->nextIOID++;
|
|
} while(conn->opByIOID.find(ioid)!=conn->opByIOID.end());
|
|
|
|
//conn->opByIOID.insert(std::make_pair(ioid, RequestInfo(sid, ioid, op)));
|
|
auto pair = conn->opByIOID.emplace(std::piecewise_construct,
|
|
std::forward_as_tuple(ioid),
|
|
std::forward_as_tuple(sid, ioid, op));
|
|
opByIOID[ioid] = &pair.first->second;
|
|
|
|
op->ioid = ioid;
|
|
|
|
op->createOp();
|
|
}
|
|
}
|
|
|
|
// call on disconnect or CMD_DESTROY_CHANNEL
|
|
// detach from Connection and notify Connect and *Op
|
|
void Channel::disconnect(const std::shared_ptr<Channel>& self)
|
|
{
|
|
assert(!self || this==self.get());
|
|
auto current(std::move(conn));
|
|
|
|
switch(state) {
|
|
case Channel::Connecting:
|
|
current->pending.erase(cid);
|
|
break;
|
|
case Channel::Creating:
|
|
current->creatingByCID.erase(cid);
|
|
break;
|
|
case Channel::Active:
|
|
current->chanBySID.erase(sid);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
if((state==Creating || state==Active) && current && current->bev) {
|
|
{
|
|
(void)evbuffer_drain(current->txBody.get(), evbuffer_get_length(current->txBody.get()));
|
|
|
|
EvOutBuf R(current->sendBE, current->txBody.get());
|
|
|
|
to_wire(R, sid);
|
|
to_wire(R, cid);
|
|
}
|
|
statTx += current->enqueueTxBody(CMD_DESTROY_CHANNEL);
|
|
}
|
|
|
|
state = Channel::Searching;
|
|
sid = 0xdeadbeef; // spoil
|
|
|
|
auto conns(connectors); // copy list
|
|
|
|
for(auto& interested : conns) {
|
|
if(interested->_connected.exchange(false, std::memory_order_relaxed) && interested->_onDis)
|
|
interested->_onDis();
|
|
}
|
|
|
|
auto ops(std::move(opByIOID));
|
|
for(auto& pair : ops) {
|
|
auto op = pair.second->handle.lock();
|
|
current->opByIOID.erase(pair.first);
|
|
if(op)
|
|
op->disconnected(op);
|
|
}
|
|
|
|
if(!self) { // in ~Channel
|
|
// searchBuckets cleaned in tickSearch()
|
|
|
|
} else if(forcedServer.family()==AF_UNSPEC) { // begin search
|
|
|
|
context->searchBuckets[context->currentBucket].push_back(self);
|
|
|
|
log_debug_printf(io, "Server %s detach channel '%s' to re-search\n",
|
|
current ? current->peerName.c_str() : "<disconnected>",
|
|
name.c_str());
|
|
|
|
} else { // reconnect to specific server
|
|
// TODO: holdoff to prevent fast reconnect loop
|
|
|
|
conn = Connection::build(context, forcedServer);
|
|
|
|
conn->pending[cid] = self;
|
|
state = Connecting;
|
|
|
|
conn->createChannels();
|
|
|
|
}
|
|
}
|
|
|
|
Connect::~Connect() {}
|
|
|
|
ConnectImpl::~ConnectImpl() {}
|
|
|
|
const std::string& ConnectImpl::name() const
|
|
{
|
|
return _name;
|
|
}
|
|
bool ConnectImpl::connected() const
|
|
{
|
|
return _connected.load(std::memory_order_relaxed);
|
|
}
|
|
|
|
std::shared_ptr<Connect> ConnectBuilder::exec()
|
|
{
|
|
if(!ctx)
|
|
throw std::logic_error("NULL Builder");
|
|
|
|
auto syncCancel(_syncCancel);
|
|
auto context(ctx->impl->shared_from_this());
|
|
|
|
auto op(std::make_shared<ConnectImpl>(context->tcp_loop, _pvname));
|
|
op->_onConn = std::move(_onConn);
|
|
op->_onDis = std::move(_onDis);
|
|
|
|
std::shared_ptr<ConnectImpl> external(op.get(), [op, syncCancel](ConnectImpl*) mutable {
|
|
// from user thread
|
|
auto temp(std::move(op));
|
|
auto loop(temp->loop);
|
|
// std::bind for lack of c++14 generalized capture
|
|
// to move internal ref to worker for dtor
|
|
loop.tryInvoke(syncCancel, std::bind([](std::shared_ptr<ConnectImpl>& op) {
|
|
// on worker
|
|
|
|
// ordering of dispatch()/call() ensures creation before destruction
|
|
assert(op->chan);
|
|
op->chan->connectors.remove(op.get());
|
|
}, std::move(temp)));
|
|
});
|
|
|
|
context->tcp_loop.dispatch([op, context]() {
|
|
// on worker
|
|
|
|
op->chan = Channel::build(context, op->_name, std::string());
|
|
|
|
bool cur = op->_connected = op->chan->state==Channel::Active;
|
|
if(cur && op->_onConn)
|
|
op->_onConn();
|
|
else if(!cur && op->_onDis)
|
|
op->_onDis();
|
|
|
|
op->chan->connectors.push_back(op.get());
|
|
});
|
|
|
|
return external;
|
|
}
|
|
|
|
Value ResultWaiter::wait(double timeout)
|
|
{
|
|
Guard G(lock);
|
|
while(outcome==Busy) {
|
|
UnGuard U(G);
|
|
if(!notify.wait(timeout))
|
|
throw Timeout();
|
|
}
|
|
if(outcome==Done)
|
|
return result();
|
|
else
|
|
throw Interrupted();
|
|
}
|
|
|
|
void ResultWaiter::complete(Result&& result, bool interrupt)
|
|
{
|
|
{
|
|
Guard G(lock);
|
|
if(outcome!=Busy)
|
|
return;
|
|
this->result = std::move(result);
|
|
outcome = interrupt ? Abort : Done;
|
|
}
|
|
notify.signal();
|
|
}
|
|
|
|
OperationBase::OperationBase(operation_t op, const evbase& loop)
|
|
:Operation(op)
|
|
,loop(loop)
|
|
{}
|
|
|
|
OperationBase::~OperationBase() {}
|
|
|
|
const std::string& OperationBase::name()
|
|
{
|
|
return chan->name;
|
|
}
|
|
|
|
Value OperationBase::wait(double timeout)
|
|
{
|
|
if(!waiter)
|
|
throw std::logic_error("Operation has custom .result() callback");
|
|
return waiter->wait(timeout);
|
|
}
|
|
|
|
void OperationBase::interrupt()
|
|
{
|
|
if(waiter)
|
|
waiter->complete(Result(), true);
|
|
}
|
|
|
|
RequestInfo::RequestInfo(uint32_t sid, uint32_t ioid, std::shared_ptr<OperationBase>& handle)
|
|
:sid(sid)
|
|
,ioid(ioid)
|
|
,op(handle->op)
|
|
,handle(handle)
|
|
{}
|
|
|
|
std::shared_ptr<Channel> Channel::build(const std::shared_ptr<ContextImpl>& context,
|
|
const std::string& name,
|
|
const std::string& server)
|
|
{
|
|
SockAddr forceServer;
|
|
decltype (context->chanByName)::key_type namekey(name, server);
|
|
|
|
if(!server.empty()) {
|
|
forceServer.setAddress(server.c_str(), context->effective.tcp_port);
|
|
}
|
|
|
|
std::shared_ptr<Channel> chan;
|
|
|
|
auto it = context->chanByName.find(namekey);
|
|
if(it!=context->chanByName.end()) {
|
|
chan = it->second;
|
|
chan->garbage = false;
|
|
}
|
|
|
|
if(!chan) {
|
|
while(context->chanByCID.find(context->nextCID)!=context->chanByCID.end())
|
|
context->nextCID++;
|
|
|
|
chan = std::make_shared<Channel>(context, name, context->nextCID);
|
|
|
|
context->chanByCID[chan->cid] = chan;
|
|
context->chanByName[namekey] = chan;
|
|
|
|
if(server.empty()) {
|
|
context->searchBuckets[context->currentBucket].push_back(chan);
|
|
|
|
context->poke(true);
|
|
|
|
} else { // bypass search and connect so a specific server
|
|
chan->forcedServer = forceServer;
|
|
chan->conn = Connection::build(context, forceServer);
|
|
|
|
chan->conn->pending[chan->cid] = chan;
|
|
chan->state = Connecting;
|
|
|
|
chan->conn->createChannels();
|
|
|
|
}
|
|
}
|
|
|
|
return chan;
|
|
}
|
|
|
|
Operation::~Operation() {}
|
|
|
|
Subscription::~Subscription() {}
|
|
|
|
Context Context::fromEnv()
|
|
{
|
|
return Config::fromEnv().build();
|
|
}
|
|
|
|
Context::Context(const Config& conf)
|
|
:pvt(std::make_shared<Pvt>(conf))
|
|
{
|
|
pvt->impl->startNS();
|
|
}
|
|
|
|
Context::~Context() {}
|
|
|
|
const Config& Context::config() const
|
|
{
|
|
if(!pvt)
|
|
throw std::logic_error("NULL Context");
|
|
|
|
return pvt->impl->effective;
|
|
}
|
|
|
|
void Context::hurryUp()
|
|
{
|
|
if(!pvt)
|
|
throw std::logic_error("NULL Context");
|
|
|
|
pvt->impl->manager.loop().call([this](){
|
|
pvt->impl->poke(true);
|
|
});
|
|
}
|
|
|
|
void Context::cacheClear(const std::string& name, cacheAction action)
|
|
{
|
|
if(!pvt)
|
|
throw std::logic_error("NULL Context");
|
|
|
|
pvt->impl->tcp_loop.call([this, name, action](){
|
|
// run twice to ensure both mark and sweep of all unused channels
|
|
log_debug_printf(setup, "cacheClear('%s')\n", name.c_str());
|
|
pvt->impl->cacheClean(name, action);
|
|
pvt->impl->cacheClean(name, action);
|
|
});
|
|
}
|
|
|
|
void Context::ignoreServerGUIDs(const std::vector<ServerGUID>& guids)
|
|
{
|
|
if(!pvt)
|
|
throw std::logic_error("NULL Context");
|
|
|
|
pvt->impl->manager.loop().call([this, &guids](){
|
|
pvt->impl->ignoreServerGUIDs = guids;
|
|
});
|
|
}
|
|
|
|
Report Context::report(bool zero) const
|
|
{
|
|
Report ret;
|
|
|
|
pvt->impl->tcp_loop.call([this, &ret, zero](){
|
|
|
|
for(auto& pair : pvt->impl->connByAddr) {
|
|
auto conn = pair.second.lock();
|
|
if(!conn)
|
|
continue;
|
|
|
|
ret.connections.emplace_back();
|
|
auto& sconn = ret.connections.back();
|
|
sconn.peer = conn->peerName;
|
|
sconn.tx = conn->statTx;
|
|
sconn.rx = conn->statRx;
|
|
|
|
if(zero) {
|
|
conn->statTx = conn->statRx = 0u;
|
|
}
|
|
|
|
// omit stats for transitory conn->creatingByCID
|
|
|
|
for(auto& pair : conn->chanBySID) {
|
|
auto chan = pair.second.lock();
|
|
if(!chan)
|
|
continue;
|
|
|
|
sconn.channels.emplace_back();
|
|
auto& schan = sconn.channels.back();
|
|
schan.name = chan->name;
|
|
schan.tx = chan->statTx;
|
|
schan.rx = chan->statRx;
|
|
|
|
if(zero) {
|
|
chan->statTx = chan->statRx = 0u;
|
|
}
|
|
}
|
|
}
|
|
|
|
});
|
|
|
|
return ret;
|
|
}
|
|
|
|
static
|
|
Value buildCAMethod()
|
|
{
|
|
using namespace pvxs::members;
|
|
|
|
return TypeDef(TypeCode::Struct, {
|
|
String("user"),
|
|
String("host"),
|
|
}).create();
|
|
}
|
|
|
|
ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop)
|
|
:ifmap(IfaceMap::instance())
|
|
,effective(conf)
|
|
,caMethod(buildCAMethod())
|
|
,searchTx4(AF_INET, SOCK_DGRAM, 0)
|
|
,searchTx6(AF_INET6, SOCK_DGRAM, 0)
|
|
,tcp_loop(tcp_loop)
|
|
,searchRx4(event_new(tcp_loop.base, searchTx4.sock, EV_READ|EV_PERSIST, &ContextImpl::onSearchS, this))
|
|
,searchRx6(event_new(tcp_loop.base, searchTx6.sock, EV_READ|EV_PERSIST, &ContextImpl::onSearchS, this))
|
|
,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &ContextImpl::tickSearchS, this))
|
|
,manager(UDPManager::instance())
|
|
,beaconCleaner(event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::tickBeaconCleanS, this))
|
|
,cacheCleaner(event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::cacheCleanS, this))
|
|
,nsChecker(event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::onNSCheckS, this))
|
|
{
|
|
effective.expand();
|
|
|
|
searchBuckets.resize(nBuckets);
|
|
|
|
std::set<SockAddr, SockAddrOnlyLess> bcasts;
|
|
for(auto& addr : searchTx4.broadcasts()) {
|
|
addr.setPort(0u);
|
|
bcasts.insert(addr);
|
|
}
|
|
|
|
searchTx6.ipv6_only();
|
|
|
|
{
|
|
auto any(SockAddr::any(searchTx4.af));
|
|
if(bind(searchTx4.sock, &any->sa, any.size()))
|
|
throw std::runtime_error("Unable to bind random UDP port");
|
|
|
|
socklen_t alen = any.capacity();
|
|
if(getsockname(searchTx4.sock, &any->sa, &alen))
|
|
throw std::runtime_error("Unable to readback random UDP port");
|
|
|
|
searchRxPort = any.port();
|
|
|
|
log_debug_printf(setup, "Using UDP Rx port %u\n", searchRxPort);
|
|
}
|
|
{
|
|
auto any(SockAddr::any(searchTx6.af, searchRxPort));
|
|
if(bind(searchTx6.sock, &any->sa, any.size()))
|
|
throw std::runtime_error("Unable to bind random UDP6 port");
|
|
}
|
|
|
|
searchTx4.set_broadcast(true);
|
|
searchTx4.enable_SO_RXQ_OVFL();
|
|
searchTx6.enable_SO_RXQ_OVFL();
|
|
|
|
for(auto& addr : effective.addressList) {
|
|
SockEndpoint ep;
|
|
try {
|
|
ep = SockEndpoint(addr, effective.udp_port);
|
|
}catch(std::exception& e){
|
|
log_warn_printf(setup, "%s Ignoring malformed address %s\n", e.what(), addr.c_str());
|
|
continue;
|
|
}
|
|
assert(ep.addr.family()==AF_INET || ep.addr.family()==AF_INET6);
|
|
|
|
// if !bcast and !mcast
|
|
auto isucast = !ep.addr.isMCast();
|
|
|
|
if(isucast && ep.addr.family()==AF_INET && bcasts.find(ep.addr)!=bcasts.end())
|
|
isucast = false;
|
|
|
|
log_info_printf(io, "Searching to %s%s\n", std::string(SB()<<ep).c_str(), (isucast?" unicast":""));
|
|
searchDest.emplace_back(ep, isucast);
|
|
}
|
|
|
|
for(auto& addr : effective.nameServers) {
|
|
SockAddr saddr;
|
|
try {
|
|
saddr.setAddress(addr.c_str(), effective.tcp_port);
|
|
}catch(std::runtime_error& e) {
|
|
log_err_printf(setup, "%s Ignoring...\n", e.what());
|
|
}
|
|
|
|
log_info_printf(io, "Searching to TCP %s\n", saddr.tostring().c_str());
|
|
nameServers.emplace_back(saddr, nullptr);
|
|
}
|
|
|
|
const auto cb([this](const UDPManager::Beacon& msg) {
|
|
onBeacon(msg);
|
|
});
|
|
|
|
for(auto& iface : effective.interfaces) {
|
|
SockEndpoint addr(iface.c_str(), effective.udp_port);
|
|
beaconRx.push_back(manager.onBeacon(addr, cb));
|
|
log_info_printf(io, "Listening for beacons on %s\n", addr.addr.tostring().c_str());
|
|
|
|
if(addr.addr.family()==AF_INET && addr.addr.isAny()) {
|
|
// if listening on 0.0.0.0, also listen on [::]
|
|
auto any6(addr);
|
|
any6.addr = SockAddr::any(AF_INET6);
|
|
|
|
beaconRx.push_back(manager.onBeacon(any6, cb));
|
|
}
|
|
}
|
|
|
|
for(auto& listener : beaconRx) {
|
|
listener->start();
|
|
}
|
|
|
|
if(event_add(searchTimer.get(), &bucketInterval))
|
|
log_err_printf(setup, "Error enabling search timer\n%s", "");
|
|
if(event_add(searchRx4.get(), nullptr))
|
|
log_err_printf(setup, "Error enabling search RX4\n%s", "");
|
|
if(event_add(searchRx6.get(), nullptr))
|
|
log_err_printf(setup, "Error enabling search RX6\n%s", "");
|
|
if(event_add(beaconCleaner.get(), &beaconCleanInterval))
|
|
log_err_printf(setup, "Error enabling beacon clean timer on\n%s", "");
|
|
if(event_add(cacheCleaner.get(), &channelCacheCleanInterval))
|
|
log_err_printf(setup, "Error enabling channel cache clean timer on\n%s", "");
|
|
}
|
|
|
|
ContextImpl::~ContextImpl() {}
|
|
|
|
void ContextImpl::startNS()
|
|
{
|
|
if(nameServers.empty()) // vector size const after ctor, contents remain mutable
|
|
return;
|
|
|
|
tcp_loop.call([this]() {
|
|
// start connections to name servers
|
|
for(auto& ns : nameServers) {
|
|
const auto& serv = ns.first;
|
|
ns.second = Connection::build(shared_from_this(), serv);
|
|
ns.second->nameserver = true;
|
|
log_debug_printf(io, "Connecting to nameserver %s\n", ns.second->peerName.c_str());
|
|
}
|
|
|
|
if(event_add(nsChecker.get(), &tcpNSCheckInterval))
|
|
log_err_printf(setup, "Error enabling TCP search reconnect timer\n%s", "");
|
|
});
|
|
}
|
|
|
|
void ContextImpl::close()
|
|
{
|
|
// terminate all active connections
|
|
tcp_loop.call([this]() {
|
|
(void)event_del(searchTimer.get());
|
|
(void)event_del(searchRx4.get());
|
|
(void)event_del(searchRx6.get());
|
|
(void)event_del(beaconCleaner.get());
|
|
(void)event_del(cacheCleaner.get());
|
|
|
|
auto conns(std::move(connByAddr));
|
|
// explicitly break ref. loop of channel cache
|
|
auto chans(std::move(chanByName));
|
|
|
|
for(auto& pair : conns) {
|
|
auto conn = pair.second.lock();
|
|
if(!conn)
|
|
continue;
|
|
|
|
conn->cleanup();
|
|
}
|
|
|
|
conns.clear();
|
|
chans.clear();
|
|
|
|
// internal_self.use_count() may be >1 if
|
|
// we are orphaning some Operations
|
|
});
|
|
|
|
tcp_loop.sync();
|
|
|
|
// ensure any in-progress callbacks have completed
|
|
manager.sync();
|
|
}
|
|
|
|
void ContextImpl::poke(bool force)
|
|
{
|
|
{
|
|
Guard G(pokeLock);
|
|
if(poked)
|
|
return;
|
|
|
|
epicsTimeStamp now{};
|
|
|
|
double age = -1.0;
|
|
if(!force && (epicsTimeGetCurrent(&now) || (age=epicsTimeDiffInSeconds(&now, &lastPoke))<30.0)) {
|
|
log_debug_printf(setup, "Ignoring hurryUp() age=%.1f sec\n", age);
|
|
return;
|
|
}
|
|
lastPoke = now;
|
|
poked = true;
|
|
}
|
|
|
|
log_debug_printf(setup, "hurryUp()%s\n", "");
|
|
|
|
timeval immediate{0,0};
|
|
if(event_add(searchTimer.get(), &immediate))
|
|
throw std::runtime_error("Unable to schedule searchTimer");
|
|
}
|
|
|
|
void ContextImpl::onBeacon(const UDPManager::Beacon& msg)
|
|
{
|
|
const auto& guid = msg.guid;
|
|
|
|
epicsTimeStamp now;
|
|
epicsTimeGetCurrent(&now);
|
|
|
|
bool newserv;
|
|
{
|
|
Guard G(pokeLock);
|
|
|
|
auto& lastRx(beaconTrack[msg.guid][std::make_pair(msg.proto, msg.server)]);
|
|
if((newserv = !lastRx)) {
|
|
serverEvent(Discovered{Discovered::Online,
|
|
msg.peerVersion,
|
|
msg.src.tostring(),
|
|
msg.proto,
|
|
msg.server.tostring(),
|
|
msg.guid,
|
|
now
|
|
});
|
|
}
|
|
lastRx.peerVersion = msg.peerVersion;
|
|
lastRx.time = now;
|
|
}
|
|
|
|
if(newserv) {
|
|
log_debug_printf(io, "%s\n",
|
|
std::string(SB()<<msg.src<<" New server "<<guid<<' '<<msg.server).c_str());
|
|
|
|
poke(false);
|
|
}
|
|
}
|
|
|
|
static
|
|
void procSearchReply(ContextImpl& self, const SockAddr& src, uint8_t peerVersion, Buffer& M, bool istcp)
|
|
{
|
|
ServerGUID guid;
|
|
SockAddr serv;
|
|
uint16_t port = 0;
|
|
uint8_t found = 0u;
|
|
uint32_t seq = 0u;
|
|
|
|
_from_wire<12>(M, &guid[0], false, __FILE__, __LINE__);
|
|
// searchSequenceID
|
|
// we don't use this for normal search and instead rely on ID for individual PVs
|
|
from_wire(M, seq);
|
|
|
|
from_wire(M, serv);
|
|
if(serv.isAny())
|
|
serv = src;
|
|
from_wire(M, port);
|
|
if(istcp && port==0)
|
|
port = src.port();
|
|
serv.setPort(port);
|
|
|
|
std::string proto;
|
|
from_wire(M, proto);
|
|
from_wire(M, found);
|
|
|
|
uint16_t nSearch = 0u;
|
|
from_wire(M, nSearch);
|
|
|
|
if(M.good()) {
|
|
for(const ServerGUID& ignore : self.ignoreServerGUIDs) {
|
|
if(guid==ignore) {
|
|
log_info_printf(io, "Ignore reply from %s with %s\n",
|
|
src.tostring().c_str(), std::string(SB()<<guid).c_str());
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
if(M.good() && !istcp && seq==search_seq && nSearch==0u && !found && !self.discoverers.empty()) {
|
|
// a discovery pong, process this like a beacon
|
|
log_debug_printf(io, "Discover reply for %s\n", src.tostring().c_str());
|
|
|
|
UDPManager::Beacon fakebeacon{src};
|
|
fakebeacon.proto = proto;
|
|
fakebeacon.server = serv;
|
|
fakebeacon.guid = guid;
|
|
fakebeacon.peerVersion = peerVersion;
|
|
|
|
self.onBeacon(fakebeacon);
|
|
}
|
|
|
|
if(!found || proto!="tcp")
|
|
return;
|
|
|
|
for(auto n : range(nSearch)) {
|
|
(void)n;
|
|
|
|
uint32_t id=0u;
|
|
from_wire(M, id);
|
|
if(!M.good())
|
|
break;
|
|
|
|
std::shared_ptr<Channel> chan;
|
|
{
|
|
auto it = self.chanByCID.find(id);
|
|
if(it==self.chanByCID.end())
|
|
continue;
|
|
|
|
chan = it->second.lock();
|
|
if(!chan)
|
|
continue;
|
|
}
|
|
|
|
log_debug_printf(io, "Search reply for %s\n", chan->name.c_str());
|
|
|
|
if(chan->state==Channel::Searching) {
|
|
chan->guid = guid;
|
|
chan->replyAddr = serv;
|
|
|
|
chan->conn = Connection::build(self.shared_from_this(), serv);
|
|
|
|
chan->conn->pending[chan->cid] = chan;
|
|
chan->state = Channel::Connecting;
|
|
|
|
chan->conn->createChannels();
|
|
|
|
} else if(chan->guid!=guid) {
|
|
log_err_printf(duppv, "Duplicate PV name %s from %s and %s\n",
|
|
chan->name.c_str(),
|
|
chan->replyAddr.tostring().c_str(),
|
|
serv.tostring().c_str());
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
bool ContextImpl::onSearch(evutil_socket_t fd)
|
|
{
|
|
searchMsg.resize(0x10000);
|
|
SockAddr src;
|
|
|
|
recvfromx rx{fd, (char*)&searchMsg[0], searchMsg.size()-1, &src};
|
|
const int nrx = rx.call();
|
|
|
|
if(nrx>=0 && rx.ndrop!=0 && prevndrop!=rx.ndrop) {
|
|
log_debug_printf(io, "UDP search reply buffer overflow %u -> %u\n", unsigned(prevndrop), unsigned(rx.ndrop));
|
|
prevndrop = rx.ndrop;
|
|
}
|
|
|
|
if(nrx<0) {
|
|
int err = evutil_socket_geterror(fd);
|
|
if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) {
|
|
// nothing to do here
|
|
} else {
|
|
log_warn_printf(io, "UDP search RX Error on : %s\n",
|
|
evutil_socket_error_to_string(err));
|
|
}
|
|
return false; // wait for more I/O
|
|
|
|
}
|
|
|
|
FixedBuf M(true, searchMsg.data(), nrx);
|
|
Header head{};
|
|
from_wire(M, head); // overwrites M.be
|
|
|
|
if(!M.good() || (head.flags&(pva_flags::Control|pva_flags::SegMask))) {
|
|
// UDP packets can't contain control messages, or use segmentation
|
|
|
|
log_hex_printf(io, Level::Debug, &searchMsg[0], nrx, "Ignore UDP message from %s\n", src.tostring().c_str());
|
|
return true;
|
|
}
|
|
|
|
log_hex_printf(io, Level::Debug, &searchMsg[0], nrx, "UDP search Rx %d from %s\n", nrx, src.tostring().c_str());
|
|
|
|
if(head.len > M.size() && M.good()) {
|
|
log_info_printf(io, "UDP ignore header truncated%s", "\n");
|
|
return true;
|
|
}
|
|
|
|
if(head.cmd==CMD_SEARCH_RESPONSE) {
|
|
procSearchReply(*this, src, head.version, M, false);
|
|
|
|
} else {
|
|
M.fault(__FILE__, __LINE__);
|
|
}
|
|
|
|
if(!M.good()) {
|
|
log_hex_printf(io, Level::Err, &searchMsg[0], nrx,
|
|
"%s:%d Invalid search reply %d from %s\n",
|
|
M.file(), M.line(), nrx, src.tostring().c_str());
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void Connection::handle_SEARCH_RESPONSE()
|
|
{
|
|
EvInBuf M(peerBE, segBuf.get(), 16);
|
|
|
|
procSearchReply(*context, peerAddr, peerVersion, M, true);
|
|
|
|
if(!M.good()) {
|
|
log_crit_printf(io, "%s:%d Server %s sends invalid SEARCH_RESPONSE. Disconnecting...\n",
|
|
M.file(), M.line(), peerName.c_str());
|
|
bev.reset();
|
|
}
|
|
}
|
|
|
|
void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw)
|
|
{
|
|
try {
|
|
log_debug_printf(io, "UDP search Rx event %x\n", evt);
|
|
if(!(evt&EV_READ))
|
|
return;
|
|
|
|
// limit number of packets processed before going back to the reactor
|
|
unsigned i;
|
|
const unsigned limit = 40;
|
|
for(i=0; i<limit && static_cast<ContextImpl*>(raw)->onSearch(fd); i++) {}
|
|
log_debug_printf(io, "UDP search processed %u/%u\n", i, limit);
|
|
|
|
}catch(std::exception& e){
|
|
log_exc_printf(io, "Unhandled error in search Rx callback: %s\n", e.what());
|
|
}
|
|
}
|
|
|
|
void ContextImpl::tickSearch(bool discover)
|
|
{
|
|
// If !discover, then this is a discovery ping.
|
|
// these are really empty searches with must-reply set.
|
|
// So if !discover, then we should not be modifying any internal state
|
|
{
|
|
Guard G(pokeLock);
|
|
poked = false;
|
|
}
|
|
|
|
auto idx = currentBucket;
|
|
if(!discover)
|
|
currentBucket = (currentBucket+1u)%searchBuckets.size();
|
|
|
|
log_debug_printf(io, "Search tick %zu\n", idx);
|
|
|
|
decltype (searchBuckets)::value_type bucket;
|
|
if(!discover)
|
|
searchBuckets[idx].swap(bucket);
|
|
|
|
while(!bucket.empty() || discover) {
|
|
// when 'discover' we only loop once
|
|
|
|
searchMsg.resize(0x10000);
|
|
FixedBuf M(true, searchMsg.data(), searchMsg.size());
|
|
M.skip(8, __FILE__, __LINE__); // fill in header after body length known
|
|
|
|
// searchSequenceID
|
|
to_wire(M, search_seq);
|
|
|
|
// flags and reserved.
|
|
// initially flags[7] is cleared (bcast)
|
|
auto pflags = M.save();
|
|
to_wire(M, uint8_t(discover ? pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search
|
|
to_wire(M, uint8_t(0u));
|
|
to_wire(M, uint16_t(0u));
|
|
|
|
// IN6ADDR_ANY_INIT
|
|
to_wire(M, uint32_t(0u));
|
|
to_wire(M, uint32_t(0u));
|
|
to_wire(M, uint32_t(0u));
|
|
to_wire(M, uint32_t(0u));
|
|
|
|
auto pport = M.save();
|
|
to_wire(M, uint16_t(searchRxPort));
|
|
|
|
if(discover) {
|
|
to_wire(M, uint8_t(0u));
|
|
|
|
} else {
|
|
to_wire(M, uint8_t(1u));
|
|
to_wire(M, "tcp");
|
|
}
|
|
|
|
// placeholder for channel count;
|
|
auto pcount = M.save();
|
|
uint16_t count = 0u;
|
|
M.skip(2u, __FILE__, __LINE__);
|
|
|
|
bool payload = false;
|
|
while(!bucket.empty()) {
|
|
assert(!discover);
|
|
|
|
auto chan = bucket.front().lock();
|
|
if(!chan || chan->state!=Channel::Searching) {
|
|
bucket.pop_front();
|
|
continue;
|
|
}
|
|
|
|
auto save = M.save();
|
|
to_wire(M, uint32_t(chan->cid));
|
|
to_wire(M, chan->name);
|
|
|
|
if(!M.good()) {
|
|
// some absurdly long PV name?
|
|
log_err_printf(io, "PV name exceeds search buffer: '%s'\n", chan->name.c_str());
|
|
// drop it on the floor
|
|
bucket.pop_front();
|
|
continue;
|
|
|
|
} else if(size_t(M.save() - searchMsg.data()) > maxSearchPayload) {
|
|
assert(payload); // must have something
|
|
// too large, defer
|
|
M.restore(save);
|
|
break;
|
|
}
|
|
|
|
count++;
|
|
|
|
auto ninc = chan->nSearch = std::min(searchBuckets.size(), chan->nSearch+1u);
|
|
auto next = (idx + ninc)%searchBuckets.size();
|
|
auto nextnext = (next + 1u)%searchBuckets.size();
|
|
|
|
// try to smooth out UDP bcast load by waiting one extra tick
|
|
{
|
|
auto nextN = searchBuckets[next].size();
|
|
auto nextnextN = searchBuckets[nextnext].size();
|
|
|
|
if(nextN > nextnextN && (nextN-nextnextN > 100u))
|
|
next = nextnext;
|
|
}
|
|
|
|
auto& nextBucket = searchBuckets[next];
|
|
|
|
nextBucket.splice(nextBucket.end(),
|
|
bucket,
|
|
bucket.begin());
|
|
payload = true;
|
|
}
|
|
assert(M.good());
|
|
|
|
if(!payload && !discover)
|
|
break;
|
|
|
|
{
|
|
FixedBuf C(true, pcount, 2u);
|
|
to_wire(C, count);
|
|
}
|
|
size_t consumed = M.save() - searchMsg.data();
|
|
{
|
|
FixedBuf H(true, searchMsg.data(), 8);
|
|
to_wire(H, Header{CMD_SEARCH, 0, uint32_t(consumed-8u)});
|
|
}
|
|
for(auto& pair : searchDest) {
|
|
auto& dest = pair.first.addr.family()==AF_INET ? searchTx4 : searchTx6;
|
|
|
|
if(pair.second) {
|
|
*pflags |= pva_search_flags::Unicast;
|
|
|
|
} else {
|
|
*pflags &= ~pva_search_flags::Unicast;
|
|
|
|
dest.mcast_prep_sendto(pair.first);
|
|
}
|
|
|
|
int ntx = sendto(dest.sock, (char*)searchMsg.data(), consumed, 0,
|
|
&pair.first.addr->sa, pair.first.addr.size());
|
|
|
|
if(ntx<0) {
|
|
int err = evutil_socket_geterror(dest.sock);
|
|
auto lvl = Level::Warn;
|
|
if(err==EINTR || err==EPERM)
|
|
lvl = Level::Debug;
|
|
log_printf(io, lvl, "Search tx %s error (%d) %s\n",
|
|
pair.first.addr.tostring().c_str(), err, evutil_socket_error_to_string(err));
|
|
|
|
} else if(unsigned(ntx)<consumed) {
|
|
log_warn_printf(io, "Search truncated %u < %u",
|
|
unsigned(ntx), unsigned(consumed));
|
|
|
|
} else {
|
|
log_hex_printf(io, Level::Debug, (char*)searchMsg.data(), consumed,
|
|
"Search to %s %s\n",
|
|
std::string(SB()<<pair.first).c_str(),
|
|
pair.second ? "ucast" : "bcast");
|
|
}
|
|
}
|
|
*pflags |= 0x80; // TCP search is always "unicast"
|
|
// TCP search replies should always come back on the same connection,
|
|
// so zero out the meaningless response port.
|
|
pport[0] = pport[1] = 0;
|
|
|
|
for(auto& pair : nameServers) {
|
|
auto& serv = pair.second;
|
|
|
|
if(!serv->ready || !serv->bev)
|
|
continue;
|
|
|
|
auto tx = bufferevent_get_output(serv->bev.get());
|
|
|
|
// arbitrarily skip searching if TX buffer is too full
|
|
// TODO: configure limit?
|
|
if(evbuffer_get_length(tx) > 64*1024u)
|
|
continue;
|
|
|
|
(void)evbuffer_add(tx, (char*)searchMsg.data(), consumed);
|
|
// fail silently, will retry
|
|
}
|
|
|
|
if(discover)
|
|
break;
|
|
}
|
|
|
|
if(event_add(searchTimer.get(), &bucketInterval))
|
|
log_err_printf(setup, "Error re-enabling search timer on\n%s", "");
|
|
}
|
|
|
|
void ContextImpl::tickSearchS(evutil_socket_t fd, short evt, void *raw)
|
|
{
|
|
try {
|
|
static_cast<ContextImpl*>(raw)->tickSearch(false);
|
|
}catch(std::exception& e){
|
|
log_exc_printf(io, "Unhandled error in search timer callback: %s\n", e.what());
|
|
}
|
|
}
|
|
|
|
void ContextImpl::tickBeaconClean()
|
|
{
|
|
epicsTimeStamp now;
|
|
epicsTimeGetCurrent(&now);
|
|
|
|
Guard G(pokeLock);
|
|
|
|
auto it = beaconTrack.begin();
|
|
while(it!=beaconTrack.end()) {
|
|
auto cur = it++; // [GUID, {[proto, EP], lastRx}]
|
|
|
|
auto it2 = cur->second.begin();
|
|
while(it2!=cur->second.end()) {
|
|
auto cur2 = it2++; // [[proto, EP], lastRx]
|
|
double age = epicsTimeDiffInSeconds(&now, &cur2->second.time);
|
|
|
|
if(age < -15.0 || age > 2*beaconCleanInterval.tv_sec) {
|
|
log_debug_printf(io, "%s\n",
|
|
std::string(SB()<<" Lost server "<<cur->first<<' '<<cur->first).c_str());
|
|
|
|
serverEvent(Discovered{Discovered::Timeout,
|
|
cur2->second.peerVersion,
|
|
"", // no associated Beacon
|
|
cur2->first.first,
|
|
cur2->first.second.tostring(),
|
|
cur->first,
|
|
now
|
|
});
|
|
|
|
cur->second.erase(cur2);
|
|
}
|
|
}
|
|
|
|
if(cur->second.empty()) {
|
|
beaconTrack.erase(cur);
|
|
}
|
|
}
|
|
}
|
|
|
|
void ContextImpl::tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw)
|
|
{
|
|
try {
|
|
static_cast<ContextImpl*>(raw)->tickBeaconClean();
|
|
}catch(std::exception& e){
|
|
log_exc_printf(io, "Unhandled error in beacon cleaner timer callback: %s\n", e.what());
|
|
}
|
|
}
|
|
|
|
void ContextImpl::onNSCheck()
|
|
{
|
|
for(auto& ns : nameServers) {
|
|
if(ns.second && ns.second->bev) // connecting or connected
|
|
continue;
|
|
|
|
ns.second = Connection::build(shared_from_this(), ns.first);
|
|
ns.second->nameserver = true;
|
|
log_debug_printf(io, "Reconnecting nameserver %s\n", ns.second->peerName.c_str());
|
|
}
|
|
}
|
|
|
|
void ContextImpl::onNSCheckS(evutil_socket_t fd, short evt, void *raw)
|
|
{
|
|
try {
|
|
static_cast<ContextImpl*>(raw)->onNSCheck();
|
|
}catch(std::exception& e){
|
|
log_exc_printf(io, "Unhandled error in TCP nameserver timer callback: %s\n", e.what());
|
|
}
|
|
}
|
|
|
|
void ContextImpl::cacheClean(const std::string& name, Context::cacheAction action)
|
|
{
|
|
auto next(chanByName.begin()),
|
|
end(chanByName.end());
|
|
|
|
while(next!=end) {
|
|
auto cur(next++);
|
|
|
|
if(!name.empty() && cur->first.first!=name)
|
|
continue;
|
|
|
|
else if(action!=Context::Clean || cur->second.use_count()<=1) {
|
|
cur->second->garbage = true;
|
|
|
|
if(action==Context::Clean && !cur->second->garbage) {
|
|
// mark for next sweep
|
|
log_debug_printf(setup, "Chan GC mark '%s':'%s'\n",
|
|
cur->first.first.c_str(), cur->first.second.c_str());
|
|
|
|
} else {
|
|
log_debug_printf(setup, "Chan GC sweep '%s':'%s'\n",
|
|
cur->first.first.c_str(), cur->first.second.c_str());
|
|
|
|
auto trash(std::move(cur->second));
|
|
|
|
// explicitly break ref. loop of channel cache
|
|
chanByName.erase(cur);
|
|
|
|
if(action==Context::Disconnect) {
|
|
trash->disconnect(trash);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void ContextImpl::cacheCleanS(evutil_socket_t fd, short evt, void *raw)
|
|
{
|
|
try {
|
|
static_cast<ContextImpl*>(raw)->cacheClean(std::string(), Context::Clean);
|
|
static_cast<ContextImpl*>(raw)->tickBeaconClean();
|
|
}catch(std::exception& e){
|
|
log_exc_printf(io, "Unhandled error in beacon cleaner timer callback: %s\n", e.what());
|
|
}
|
|
}
|
|
|
|
Context::Pvt::Pvt(const Config& conf)
|
|
:loop("PVXCTCP", epicsThreadPriorityCAServerLow)
|
|
,impl(std::make_shared<ContextImpl>(conf, loop.internal()))
|
|
{}
|
|
|
|
Context::Pvt::~Pvt()
|
|
{
|
|
impl->close();
|
|
}
|
|
|
|
} // namespace client
|
|
|
|
} // namespace pvxs
|