start client

This commit is contained in:
Michael Davidsaver
2020-02-17 15:43:56 -08:00
parent 1aa1b56acb
commit 1edeab8a39
9 changed files with 1567 additions and 0 deletions
+4
View File
@@ -75,6 +75,10 @@ LIB_SRCS += servermon.cpp
LIB_SRCS += serversource.cpp
LIB_SRCS += sharedpv.cpp
LIB_SRCS += client.cpp
LIB_SRCS += clientconn.cpp
LIB_SRCS += clientintrospect.cpp
LIB_LIBS += Com
LIB_SYS_LIBS += event_core
+525
View File
@@ -0,0 +1,525 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <algorithm>
#include <set>
#include <osiSock.h>
#include <dbDefs.h>
#include <epicsThread.h>
#include <pvxs/log.h>
#include <clientimpl.h>
DEFINE_LOGGER(setup, "pvxs.client.setup");
DEFINE_LOGGER(io, "pvxs.client.io");
DEFINE_LOGGER(duppv, "pvxs.client.dup");
namespace pvxs {
namespace client {
constexpr timeval bucketInterval{1,0};
constexpr size_t nBuckets = 30u;
constexpr size_t maxSearchPayload = 0x4000;
Channel::Channel(const std::shared_ptr<Context::Pvt>& context, const std::string& name, uint32_t cid)
:context(context)
,name(name)
,cid(cid)
{}
Channel::~Channel()
{
context->chanByCID.erase(cid);
context->chanByName.erase(name);
// searchBuckets cleaned in tickSearch()
}
void Channel::createOperations()
{
if(state!=Channel::Active)
return;
auto todo = std::move(pending);
for(auto& wop : todo) {
auto op = wop.lock();
if(!op)
continue;
uint32_t ioid;
do {
ioid = conn->nextIOID++;
} while(conn->opByIOID.find(ioid)!=conn->opByIOID.end());
conn->opByIOID.insert(std::make_pair(ioid, RequestInfo(ioid, op)));
op->ioid = ioid;
op->createOp();
}
}
OperationBase::OperationBase(operation_t op, const std::shared_ptr<Channel>& chan)
:Operation(op)
,chan(chan)
{}
OperationBase::~OperationBase() {}
RequestInfo::RequestInfo(uint32_t ioid, std::shared_ptr<OperationBase>& handle)
:ioid(ioid)
,op(handle->op)
,handle(handle)
{}
std::shared_ptr<Channel> Channel::build(const std::shared_ptr<Context::Pvt>& context, const std::string& name)
{
std::shared_ptr<Channel> chan;
auto it = context->chanByName.find(name);
if(it!=context->chanByName.end()) {
chan = it->second.lock();
}
if(!chan) {
while(context->chanByCID.find(context->nextCID)!=context->chanByCID.end())
context->nextCID++;
chan = std::make_shared<Channel>(context, name, context->nextCID);
context->chanByCID[chan->cid] = chan;
context->chanByName[chan->name] = chan;
context->searchBuckets[context->currentBucket].push_back(chan);
}
return chan;
}
Operation::~Operation() {}
Subscription::~Subscription() {}
Context::Context(const Config& conf)
{
/* Here be dragons.
*
* We keep two different ref. counters.
* - "externel" counter which keeps a server running.
* - "internal" which only keeps server storage from being destroyed.
*
* External refs are held as Server::pvt. Internal refs are
* held by various in-progress operations (OpBase sub-classes)
* Which need to safely access server storage, but should not
* prevent a server from stopping.
*/
auto internal(std::make_shared<Pvt>(conf));
internal->internal_self = internal;
// external
pvt.reset(internal.get(), [internal](Pvt*) mutable {
internal->close();
internal.reset();
});
// we don't keep a weak_ptr to the external reference.
// Caller is entirely responsible for keeping this server running
}
Context::~Context() {}
const Config& Context::config() const
{
return pvt->effective;
}
void Context::poke()
{}
static
Value buildCAMethod()
{
using namespace pvxs::members;
return TypeDef(TypeCode::Struct, {
String("user"),
String("host"),
}).create();
}
Context::Pvt::Pvt(const Config& conf)
:effective(conf)
,caMethod(buildCAMethod())
,searchTx(AF_INET, SOCK_DGRAM, 0)
,tcp_loop("PVXCTCP", epicsThreadPriorityCAServerLow)
,searchRx(event_new(tcp_loop.base, searchTx.sock, EV_READ|EV_PERSIST, &Pvt::onSearchS, this))
,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &Pvt::tickSearchS, this))
{
effective.expand();
searchBuckets.resize(nBuckets);
if(effective.udp_port==0)
throw std::runtime_error("Client can't use UDP random port");
std::set<std::string> bcasts;
{
ELLLIST list = ELLLIST_INIT;
osiSockAddr any{};
osiSockDiscoverBroadcastAddresses(&list, searchTx.sock, &any);
while(ELLNODE *cur = ellGet(&list)) {
osiSockAddrNode *node = CONTAINER(cur, osiSockAddrNode, node);
SockAddr addr(&node->addr.sa, sizeof(node->addr));
addr.setPort(0u);
bcasts.insert(addr.tostring());
free(node);
}
}
{
osiSockAddr any{};
any.ia.sin_family = AF_INET;
if(bind(searchTx.sock, &any.sa, sizeof(any.ia)))
throw std::runtime_error("Unable to bind random UDP port");
socklen_t alen = sizeof(any);
if(getsockname(searchTx.sock, &any.sa, &alen))
throw std::runtime_error("Unable to readback random UDP port");
searchRxPort = ntohs(any.ia.sin_port);
log_debug_printf(setup, "Using UDP Rx port %u\n", searchRxPort);
}
{
int val = 1;
if(setsockopt(searchTx.sock, SOL_SOCKET, SO_BROADCAST, (char *)&val, sizeof(val)))
log_err_printf(setup, "Unable to setup beacon sender SO_BROADCAST: %d\n", SOCKERRNO);
}
for(auto& addr : effective.addressList) {
auto isbcast = bcasts.find(addr)!=bcasts.end();
SockAddr saddr(AF_INET);
try {
saddr.setAddress(addr.c_str());
}catch(std::runtime_error& e) {
log_err_printf(setup, "%s Ignoring...\n", e.what());
}
auto top = ntohl(saddr->in.sin_addr.s_addr)>>24u;
auto isucast = !isbcast && top<239 && top>224;
saddr.setPort(effective.udp_port);
searchDest.emplace_back(saddr, isucast);
}
// TODO: receive beacons
//auto manager = UDPManager::instance();
if(event_add(searchTimer.get(), &bucketInterval))
log_err_printf(setup, "Error enabling search timer\n%s", "");
if(event_add(searchRx.get(), nullptr))
log_err_printf(setup, "Error enabling search RX\n%s", "");
}
Context::Pvt::~Pvt() {}
void Context::Pvt::close()
{
// terminate all active connections
tcp_loop.call([this]() {
(void)event_del(searchTimer.get());
(void)event_del(searchRx.get());
decltype (connByAddr) conns(std::move(connByAddr));
for(auto& pair : conns) {
auto conn = pair.second.lock();
if(!conn)
continue;
conn->cleanup();
}
});
}
bool Context::Pvt::onSearch()
{
searchMsg.resize(0x10000);
SockAddr src;
osiSocklen_t alen = src.size();
const int nrx = recvfrom(searchTx.sock, (char*)&searchMsg[0], searchMsg.size()-1, 0, &src->sa, &alen);
if(nrx<0) {
int err = evutil_socket_geterror(searchTx.sock);
if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) {
// nothing to do here
} else {
log_warn_printf(io, "UDP search RX Error on : %s\n",
evutil_socket_error_to_string(err));
}
return false; // wait for more I/O
} else if(nrx<8) {
// maybe a zero (body) length packet?
// maybe an OS error?
log_info_printf(io, "UDP ignore runt%s\n", "");
return true;
} else if(searchMsg[0]!=0xca || searchMsg[1]==0 || (searchMsg[2]&(pva_flags::Control|pva_flags::SegMask))) {
// minimum header size is 8 bytes
// ID byte must by 0xCA (because PVA has some paternal envy)
// ignore incompatible version 0
// UDP packets can't contain control messages, or use segmentation
log_info_printf(io, "UDP ignore header%u %02x%02x%02x%02x\n",
unsigned(nrx), searchMsg[0], searchMsg[1], searchMsg[2], searchMsg[3]);
return true;
}
log_hex_printf(io, Level::Debug, &searchMsg[0], nrx, "UDP search Rx %d from %s\n", nrx, src.tostring().c_str());
bool be = searchMsg[2]&pva_flags::MSB;
FixedBuf M(be, searchMsg.data(), nrx);
const uint8_t cmd = M[3];
M.skip(4);
uint32_t len=0;
from_wire(M, len);
if(len > M.size() && M.good()) {
log_info_printf(io, "UDP ignore header%u %02x%02x%02x%02x\n",
unsigned(M.size()), M[0], M[1], M[2], M[3]);
return true;
}
if(cmd==CMD_SEARCH_RESPONSE) {
std::array<uint8_t, 12> guid;
SockAddr serv;
uint16_t port = 0;
uint8_t found = 0u;
_from_wire<12>(M, &guid[0], false);
// searchSequenceID
// we don't use this and instead rely on ID for individual PVs
M.skip(4u);
from_wire(M, serv);
if(serv.isAny())
serv = src;
from_wire(M, port);
serv.setPort(port);
if(M.size()<4u || M[0]!=3u || M[1]!='t' || M[2]!='c' || M[3]!='p')
return true;
M.skip(4u);
from_wire(M, found);
if(!found)
return true;
uint16_t nSearch = 0u;
from_wire(M, nSearch);
for(auto n : range(nSearch)) {
(void)n;
uint32_t id=0u;
from_wire(M, id);
if(!M.good())
break;
std::shared_ptr<Channel> chan;
{
auto it = chanByCID.find(id);
if(it==chanByCID.end())
continue;
chan = it->second.lock();
if(!chan)
continue;
}
log_debug_printf(io, "Search reply for %s\n", chan->name.c_str());
if(chan->state==Channel::Searching) {
chan->guid = guid;
chan->replyAddr = serv;
auto it = connByAddr.find(serv);
if(it==connByAddr.end() || !(chan->conn = it->second.lock())) {
connByAddr[serv] = chan->conn = std::make_shared<Connection>(internal_self.lock(), serv);
}
chan->conn->pending.push_back(chan);
chan->state = Channel::Connecting;
chan->conn->createChannels();
} else if(chan->guid!=guid) {
log_err_printf(duppv, "Duplicate PV name %s from %s and %s\n",
chan->name.c_str(),
chan->replyAddr.tostring().c_str(),
serv.tostring().c_str());
}
}
} else {
M.fault();
}
if(!M.good()) {
log_hex_printf(io, Level::Err, &searchMsg[0], nrx, "Invalid search reply %d from %s\n", nrx, src.tostring().c_str());
}
return true;
}
void Context::Pvt::onSearchS(evutil_socket_t fd, short evt, void *raw)
{
try {
log_debug_printf(io, "UDP search Rx event %x\n", evt);
if(!(evt&EV_READ))
return;
// handle up to 4 packets before going back to the reactor
for(unsigned i=0; i<4 && static_cast<Pvt*>(raw)->onSearch(); i++) {}
}catch(std::exception& e){
log_crit_printf(io, "Unhandled error in search Rx callback: %s\n", e.what());
}
}
void Context::Pvt::tickSearch()
{
auto idx = currentBucket;
currentBucket = (currentBucket+1u)%searchBuckets.size();
log_debug_printf(io, "Search tick %zu\n", idx);
decltype (searchBuckets)::value_type bucket;
searchBuckets[idx].swap(bucket);
while(!bucket.empty()) {
searchMsg.resize(0x10000);
FixedBuf M(true, searchMsg.data(), searchMsg.size());
M.skip(8); // fill in header after body length known
// searchSequenceID
// we don't use this and instead rely on IDs for individual PVs
to_wire(M, uint32_t(0x66696e64));
// flags and reserved.
// initially flags[7] is cleared (bcast)
auto pflags = M.save();
to_wire(M, uint32_t(0u));
// IN6ADDR_ANY_INIT
to_wire(M, uint32_t(0u));
to_wire(M, uint32_t(0u));
to_wire(M, uint32_t(0u));
to_wire(M, uint32_t(0u));
to_wire(M, uint16_t(searchRxPort));
to_wire(M, uint8_t(1u));
to_wire(M, "tcp");
// placeholder for channel count;
auto pcount = M.save();
uint16_t count = 0u;
M.skip(2u);
bool payload = false;
while(!bucket.empty()) {
auto chan = bucket.front().lock();
if(!chan || chan->state!=Channel::Searching) {
bucket.pop_front();
continue;
}
if(searchMsg.size()<=maxSearchPayload-(5+chan->name.size()))
break;
to_wire(M, uint32_t(chan->cid));
to_wire(M, chan->name);
count++;
auto ninc = chan->nSearch = std::min(searchBuckets.size(), chan->nSearch+1u);
auto next = (idx + ninc)%searchBuckets.size();
// TODO leveling with next+-1 buckets
auto& nextBucket = searchBuckets[next];
nextBucket.splice(nextBucket.end(),
bucket,
bucket.begin());
payload = true;
}
if(!payload)
break;
{
FixedBuf C(true, pcount, 2u);
to_wire(C, count);
}
auto consumed = M.save() - searchMsg.data();
{
FixedBuf H(true, searchMsg.data(), 8);
to_wire(H, Header{CMD_SEARCH, pva_flags::Server, uint32_t(consumed-8u)});
}
for(auto& pair : searchDest) {
// TODO: unicast/bcast
*pflags = pair.second ? 0x80 : 0x00;
int ntx = sendto(searchTx.sock, (char*)searchMsg.data(), consumed, 0, &pair.first->sa, pair.first.size());
if(ntx<0) {
int err = evutil_socket_geterror(searchTx.sock);
auto lvl = Level::Warn;
if(err==EINTR || err==EPERM)
lvl = Level::Debug;
log_printf(io, lvl, "Search tx error (%d) %s\n",
err, evutil_socket_error_to_string(err));
} else if(unsigned(ntx)<consumed) {
log_warn_printf(io, "Search truncated %u < %u",
unsigned(ntx), unsigned(consumed));
} else {
log_debug_printf(io, "Search to %s %s\n", pair.first.tostring().c_str(),
pair.second ? "ucast" : "bcast");
}
}
}
if(event_add(searchTimer.get(), &bucketInterval))
log_err_printf(setup, "Error re-enabling search timer on\n%s", "");
}
void Context::Pvt::tickSearchS(evutil_socket_t fd, short evt, void *raw)
{
try {
static_cast<Pvt*>(raw)->tickSearch();
}catch(std::exception& e){
log_crit_printf(io, "Unhandled error in search timer callback: %s\n", e.what());
}
}
} // namespace client
} // namespace pvxs
+371
View File
@@ -0,0 +1,371 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <osiProcess.h>
#include <pvxs/log.h>
#include "clientimpl.h"
namespace pvxs {
namespace client {
DEFINE_LOGGER(io, "pvxs.client.io");
Connection::Connection(const std::shared_ptr<Context::Pvt>& context, const SockAddr& peerAddr)
:ConnBase (true,
bufferevent_socket_new(context->tcp_loop.base, -1, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS),
peerAddr)
,context(context)
,echoTimer(event_new(context->tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &tickEchoS, this))
{
bufferevent_setcb(bev.get(), &bevReadS, nullptr, &bevEventS, this);
// shorter timeout until connect() ?
timeval timo = {30, 0};
bufferevent_set_timeouts(bev.get(), &timo, &timo);
if(bufferevent_socket_connect(bev.get(), &peerAddr->sa, peerAddr.size()))
throw std::runtime_error("Unable to begin connecting");
log_debug_printf(io, "Connecting to %s\n", peerName.c_str());
}
Connection::~Connection()
{
log_debug_printf(io, "Cleaning connection to %s\n", peerName.c_str());
cleanup();
}
void Connection::createChannels()
{
if(!ready)
return; // defer until CONNECTION_VALIDATED
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));
auto todo = std::move(pending);
for(auto& wchan : todo) {
auto chan = wchan.lock();
if(!chan)
continue;
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));
EvOutBuf R(hostBE, txBody.get());
to_wire(R, uint16_t(1u));
to_wire(R, chan->cid);
to_wire(R, chan->name);
}
enqueueTxBody(CMD_CREATE_CHANNEL);
creatingByCID[chan->cid] = chan;
chan->state = Channel::Creating;
log_debug_printf(io, "Server %s creating channel '%s' (%u)\n", peerName.c_str(),
chan->name.c_str(), unsigned(chan->cid));
}
}
void Connection::bevEvent(short events)
{
ConnBase::bevEvent(events);
if(bev && (events&BEV_EVENT_CONNECTED)) {
log_debug_printf(io, "Connected to %s\n", peerName.c_str());
auto tx = bufferevent_get_output(bev.get());
to_evbuf(tx, Header{pva_ctrl_msg::SetEndian,
pva_flags::Control,
0u},
hostBE);
if(bufferevent_enable(bev.get(), EV_READ|EV_WRITE))
throw std::logic_error("Unable to enable BEV");
// start echo timer
timeval interval{15, 0};
if(event_add(echoTimer.get(), &interval))
log_err_printf(io, "Server %s error starting echoTimer\n", peerName.c_str());
}
}
void Connection::cleanup()
{
// (maybe) keep myself alive
std::shared_ptr<Connection> self;
context->connByAddr.erase(peerAddr);
if(bev)
bev.reset();
if(event_del(echoTimer.get()))
log_err_printf(io, "Server %s error stopping echoTimer\n", peerName.c_str());
// return Channels to Searching state
for(auto& wchan : pending) {
auto chan = wchan.lock();
if(!chan)
continue;
chan->state = Channel::Searching;
chan->sid = 0xdeadbeef; // spoil
self = std::move(chan->conn);
context->searchBuckets[context->currentBucket].push_back(chan);
log_debug_printf(io, "Server %s detach channel '%s' to re-search\n", peerName.c_str(), chan->name.c_str());
}
// paranoia
pending.clear();
chanBySID.clear();
}
void Connection::handle_CONNECTION_VALIDATION()
{
log_debug_printf(io, "Server %s begins validation handshake\n", peerName.c_str());
EvInBuf M(peerBE, segBuf.get(), 16);
// unused
// serverReceiveBufferSize
// serverIntrospectionRegistryMaxSize
M.skip(4u + 2u);
Size nauth{};
from_wire(M, nauth);
std::string selected;
for(auto n : range(nauth.size)) {
(void)n;
std::string method;
from_wire(M, method);
if(method=="ca" || (method=="anonymous" && selected!="ca"))
selected = method;
}
if(!M.good()) {
log_err_printf(io, "Server %s sends invalid CONNECTION_VALIDATION. Disconnect...\n", peerName.c_str());
bev.reset();
return;
}
if(!selected.empty()) {
log_debug_printf(io, "Server %s selecting auth '%s'\n", peerName.c_str(), selected.c_str());
} else {
selected = "anonymous";
log_warn_printf(io, "Server %s no supported auth. try to force '%s'\n", peerName.c_str(), selected.c_str());
}
Value cred;
if(selected=="ca") {
cred = context->caMethod.cloneEmpty();
std::vector<char> buffer(256u);
if(osiGetUserName(&buffer[0], buffer.size()) == osiGetUserNameSuccess) {
buffer[buffer.size()-1] = '\0';
cred["user"] = buffer.data();
} else {
cred["user"] = "nobody";
}
if (gethostname(&buffer[0], buffer.size()) == 0) {
buffer[buffer.size()-1] = '\0';
cred["host"] = buffer.data();
} else {
cred["host"] = "invalidhost.";
}
log_info_printf(io, "Server %s 'ca' auth as %s@%s\n", peerName.c_str(),
cred["user"].as<std::string>().c_str(),
cred["host"].as<std::string>().c_str());
}
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));
EvOutBuf R(hostBE, txBody.get());
// serverReceiveBufferSize, not used
to_wire(R, uint32_t(0x10000));
// serverIntrospectionRegistryMaxSize, also not used
to_wire(R, uint16_t(0x7fff));
// QoS, not used (quality?)
to_wire(R, uint16_t(0));
to_wire(R, selected);
to_wire(R, Value::Helper::desc(cred));
if(cred)
to_wire_full(R, cred);
}
enqueueTxBody(CMD_CONNECTION_VALIDATION);
}
void Connection::handle_CONNECTION_VALIDATED()
{
EvInBuf M(peerBE, segBuf.get(), 16);
Status sts{};
from_wire(M, sts);
if(!M.good()) {
log_crit_printf(io, "Server %s sends invalid CONNECTION_VALIDATED. Disconnecting...\n", peerName.c_str());
bev.reset();
return;
} else if(!sts.isSuccess()) {
log_err_printf(io, "Server %s refuses auth. Trying to proceed w/o cred\n", peerName.c_str());
} else {
log_debug_printf(io, "Server %s accepts auth%s%s\n", peerName.c_str(),
sts.msg.empty() ? "" : " ", sts.msg.c_str());
}
ready = true;
createChannels();
}
void Connection::handle_CREATE_CHANNEL()
{
EvInBuf M(peerBE, segBuf.get(), 16);
uint32_t cid, sid;
Status sts{};
from_wire(M, cid);
from_wire(M, sid);
from_wire(M, sts);
// "spec" calls for uint16_t Access Rights here, but pvAccessCPP don't include this (it's useless anyway)
if(!M.good()) {
log_crit_printf(io, "Server %s sends invalid CREATE_CHANNEL. Disconnecting...\n", peerName.c_str());
bev.reset();
return;
}
std::shared_ptr<Channel> chan;
{
auto it = creatingByCID.find(cid);
if(it==creatingByCID.end() || !(chan = it->second.lock())) {
if(it!=creatingByCID.end())
creatingByCID.erase(it);
if(sts.isSuccess()) {
// we now have a channel which is no longer interesting.
log_debug_printf(io, "Server %s disposing of newly stale channel\n", peerName.c_str());
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));
EvOutBuf R(hostBE, txBody.get());
to_wire(R, sid);
to_wire(R, cid);
}
enqueueTxBody(CMD_DESTROY_CHANNEL);
}
return;
}
creatingByCID.erase(it);
}
if(!sts.isSuccess()) {
// server refuses to create a channel, but presumably responded positivly to search
chan->state = Channel::Searching;
context->searchBuckets[context->currentBucket].push_back(chan);
log_warn_printf(io, "Server %s refuses channel to '%s' : %s\n", peerName.c_str(),
chan->name.c_str(), sts.msg.c_str());
} else {
chan->state = Channel::Active;
chan->sid = sid;
chanBySID[sid] = chan;
log_debug_printf(io, "Server %s active channel to '%s' %u:%u\n", peerName.c_str(),
chan->name.c_str(), unsigned(chan->cid), unsigned(chan->sid));
chan->createOperations();
}
}
void Connection::handle_DESTROY_CHANNEL()
{
// (maybe) keep myself alive
std::shared_ptr<Connection> self;
EvInBuf M(peerBE, segBuf.get(), 16);
uint32_t cid, sid;
from_wire(M, sid);
from_wire(M, cid);
if(!M.good()) {
log_crit_printf(io, "Server %s sends invalid DESTROY_CHANNEL. Disconnecting...\n", peerName.c_str());
bev.reset();
return;
}
std::shared_ptr<Channel> chan;
{
auto it = chanBySID.find(sid);
if(it==chanBySID.end() || !(chan = it->second.lock())) {
log_debug_printf(io, "Server %s destroys non-existant channel %u:%u\n",
peerName.c_str(), unsigned(cid), unsigned(sid));
return;
}
}
chanBySID.erase(sid);
chan->state = Channel::Searching;
chan->sid = 0xdeadbeef; // spoil
self = std::move(chan->conn);
context->searchBuckets[context->currentBucket].push_back(chan);
log_debug_printf(io, "Server %s destroys channel '%s' %u:%u\n",
peerName.c_str(), chan->name.c_str(), unsigned(cid), unsigned(sid));
}
void Connection::tickEcho()
{
log_debug_printf(io, "Server %s ping\n", peerName.c_str());
if(!bev)
return;
auto tx = bufferevent_get_output(bev.get());
to_evbuf(tx, Header{CMD_ECHO, 0u, 0u}, hostBE);
// maybe help reduce latency
bufferevent_flush(bev.get(), EV_WRITE, BEV_FLUSH);
}
void Connection::tickEchoS(evutil_socket_t fd, short evt, void *raw)
{
try {
static_cast<Connection*>(raw)->tickEcho();
}catch(std::exception& e){
log_crit_printf(io, "Unhandled error in echo timer callback: %s\n", e.what());
}
}
} // namespace client
} // namespace pvxs
+170
View File
@@ -0,0 +1,170 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#ifndef CLIENTIMPL_H
#define CLIENTIMPL_H
#include <list>
#include <pvxs/client.h>
#include "evhelper.h"
#include "dataimpl.h"
#include "utilpvt.h"
#include "udp_collector.h"
#include "conn.h"
namespace pvxs {
namespace client {
struct Channel;
// internal actions on an Operation
struct OperationBase : public Operation
{
const std::shared_ptr<Channel> chan;
uint32_t ioid;
OperationBase(operation_t op, const std::shared_ptr<Channel>& chan);
virtual ~OperationBase();
virtual void createOp() =0;
};
struct RequestInfo {
const uint32_t ioid;
const Operation::operation_t op;
const std::weak_ptr<OperationBase> handle;
Value prototype;
RequestInfo(uint32_t ioid, std::shared_ptr<OperationBase>& handle);
};
struct Connection : public ConnBase {
const std::shared_ptr<Context::Pvt> context;
const evevent echoTimer;
bool ready = false;
// channels to be created on this Connection
std::list<std::weak_ptr<Channel>> pending;
std::map<uint32_t, std::weak_ptr<Channel>> creatingByCID,
chanBySID;
std::map<uint32_t, RequestInfo> opByIOID;
uint32_t nextIOID = 0u;
Connection(const std::shared_ptr<Context::Pvt>& context, const SockAddr &peerAddr);
~Connection();
void createChannels();
virtual void bevEvent(short events) override final;
virtual void cleanup() override final;
#define CASE(Op) virtual void handle_##Op() override final;
CASE(CONNECTION_VALIDATION);
CASE(CONNECTION_VALIDATED);
CASE(CREATE_CHANNEL);
CASE(DESTROY_CHANNEL);
CASE(GET_FIELD);
#undef CASE
protected:
void tickEcho();
static void tickEchoS(evutil_socket_t fd, short evt, void *raw);
};
struct Channel {
const std::shared_ptr<Context::Pvt> context;
const std::string name;
// Our choosen ID for this channel.
// used as persistent CID and searchID
const uint32_t cid;
enum state_t {
Searching, // waiting for a server to claim
Connecting, // waiting for Connection to become ready
Creating, // waiting for reply to CREATE_CHANNEL
Active,
} state = Searching;
std::shared_ptr<Connection> conn;
uint32_t sid = 0u;
// when state==Searching, number of repeatitions
size_t nSearch = 0u;
// GUID of last positive reply when state!=Searching
std::array<uint8_t, 12> guid;
SockAddr replyAddr;
std::list<std::weak_ptr<OperationBase>> pending;
Channel(const std::shared_ptr<Context::Pvt>& context, const std::string& name, uint32_t cid);
~Channel();
void createOperations();
static
std::shared_ptr<Channel> build(const std::shared_ptr<Context::Pvt>& context, const std::string &name);
};
struct Context::Pvt
{
std::weak_ptr<Pvt> internal_self;
// "const" after ctor
Config effective;
const Value caMethod;
uint32_t nextCID=0u;
evsocket searchTx;
uint16_t searchRxPort;
std::vector<uint8_t> searchMsg;
// search destination address and whether to set the unicast flag
std::vector<std::pair<SockAddr, bool>> searchDest;
size_t currentBucket = 0u;
std::vector<std::list<std::weak_ptr<Channel>>> searchBuckets;
std::list<std::unique_ptr<UDPListener> > beaconRx;
std::map<uint32_t, std::weak_ptr<Channel>> chanByCID;
std::map<std::string, std::weak_ptr<Channel>> chanByName;
std::map<SockAddr, std::weak_ptr<Connection>> connByAddr;
evbase tcp_loop;
const evevent searchRx;
const evevent searchTimer;
Pvt(const Config& conf);
~Pvt();
void close();
bool onSearch();
static void onSearchS(evutil_socket_t fd, short evt, void *raw);
void tickSearch();
static void tickSearchS(evutil_socket_t fd, short evt, void *raw);
};
} // namespace client
} // namespace pvxs
#endif // CLIENTIMPL_H
+140
View File
@@ -0,0 +1,140 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <epicsAssert.h>
#include <pvxs/log.h>
#include "clientimpl.h"
namespace pvxs {
namespace client {
DEFINE_LOGGER(io, "pvxs.client.io");
namespace {
struct InfoOp : public OperationBase
{
std::function<void(Value&&)> done;
Value result;
enum state_t {
Connecting,
Waiting,
Done,
} state = Connecting;
explicit InfoOp(const std::shared_ptr<Channel>& chan)
:OperationBase(Info, chan)
{}
virtual ~InfoOp()
{
cancel();
}
virtual void cancel() override final {}
virtual void createOp() override final
{
assert(state==Connecting);
auto& conn = chan->conn;
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));
EvOutBuf R(hostBE, conn->txBody.get());
to_wire(R, chan->sid);
to_wire(R, ioid);
// sub-field, which no one knows how to use...
to_wire(R, "");
}
conn->enqueueTxBody(CMD_GET_FIELD);
log_debug_printf(io, "Server %s channel '%s' GET_INFO\n", conn->peerName.c_str(), chan->name.c_str());
state = Waiting;
}
};
} // namespace
void Connection::handle_GET_FIELD()
{
EvInBuf M(peerBE, segBuf.get(), 16);
uint32_t ioid=0u;
Status sts;
Value prototype;
from_wire(M, ioid);
from_wire(M, sts);
from_wire_type(M, rxRegistry, prototype);
if(!M.good()) {
log_crit_printf(io, "Server %s sends invalid GET_FIELD. Disconnecting...\n", peerName.c_str());
bev.reset();
return;
}
std::shared_ptr<Operation> op;
{
auto it = opByIOID.find(ioid);
if(it==opByIOID.end()
|| !(op = it->second.handle.lock())
|| op->op!=Operation::Info) {
log_warn_printf(io, "Server %s sends stale GET_FIELD\n", peerName.c_str());
return;
}
opByIOID.erase(it);
}
auto info = static_cast<InfoOp*>(op.get());
if(info->state!=InfoOp::Waiting) {
log_warn_printf(io, "Server %s ignore second reply to GET_FIELD\n", peerName.c_str());
return;
}
log_debug_printf(io, "Server %s completes GET_FIELD.\n", peerName.c_str());
info->state = InfoOp::Done;
if(info->done) {
auto done = std::move(info->done);
done(std::move(prototype));
} else {
info->result = prototype;
}
}
std::shared_ptr<Operation> Context::GetBuilder::exec()
{
std::shared_ptr<Operation> ret;
if(_get)
throw std::runtime_error("Get Not Implemented");
pvt->tcp_loop.call([&ret, this]() {
auto chan = Channel::build(pvt, _name);
auto op = std::make_shared<InfoOp>(chan);
op->done = std::move(_result);
chan->pending.push_back(op);
chan->createOperations();
ret = op;
});
return ret;
}
} // namespace client
} // namespace pvxs
+70
View File
@@ -15,6 +15,7 @@
#include <pvxs/log.h>
#include "serverconn.h"
#include "clientimpl.h"
#include "evhelper.h"
DEFINE_LOGGER(serversetup, "pvxs.server.setup");
@@ -215,5 +216,74 @@ std::ostream& operator<<(std::ostream& strm, const Config& conf)
return strm;
}
} // namespace server
namespace client {
Config Config::from_env()
{
Config ret;
const char* name;
if(const char *env = pickenv(&name, {"EPICS_PVA_ADDR_LIST"})) {
split_addr_into(name, ret.addressList, env);
}
if(const char *env = pickenv(&name, {"EPICS_PVA_AUTO_ADDR_LIST"})) {
if(epicsStrCaseCmp(env, "YES")==0) {
ret.autoAddrList = true;
} else if(epicsStrCaseCmp(env, "NO")==0) {
ret.autoAddrList = false;
} else {
log_err_printf(serversetup, "%s invalid bool value (YES/NO)", name);
}
}
if(const char *env = pickenv(&name, {"EPICS_PVA_BROADCAST_PORT"})) {
try {
ret.udp_port = lexical_cast<unsigned short>(env);
}catch(std::exception& e) {
log_err_printf(serversetup, "%s invalid integer : %s", name, e.what());
}
}
return ret;
}
void Config::expand()
{
if(autoAddrList) {
std::vector<std::string> all({"0.0.0.0"});
expandAddrList(all, addressList);
autoAddrList = false;
}
removeDups(addressList);
}
std::ostream& operator<<(std::ostream& strm, const Config& conf)
{
bool first;
strm<<"EPICS_PVA_ADDR_LIST=\"";
first = true;
for(auto& iface : conf.addressList) {
if(first)
first = false;
else
strm<<' ';
strm<<iface;
}
strm<<"\"\n";
strm<<"EPICS_PVA_AUTO_ADDR_LIST="<<(conf.autoAddrList?"YES":"NO")<<'\n';
strm<<"EPICS_PVA_BROADCAST_PORT="<<conf.udp_port<<'\n';
return strm;
}
} // namespace client
} // namespace pvxs
+203
View File
@@ -0,0 +1,203 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#ifndef PVXS_CLIENT_H
#define PVXS_CLIENT_H
#include <string>
#include <vector>
#include <memory>
#include <functional>
#include <ostream>
#include <typeinfo>
#include <pvxs/version.h>
#include <pvxs/data.h>
namespace pvxs {
namespace client {
class Context;
struct Config;
//! builder for pvRequest blob
struct PVXS_API Request {
Request& parse(const std::string& expr);
Request& field(const std::string& name);
private:
void _record(const std::string& name, const std::type_info& info, const void* val);
public:
template<typename T>
Request& record(const std::string& name, const T& v) {
_record(name, typeid(T), static_cast<const void*>(&v));
return *this;
}
inline explicit operator bool() const { return pvt.operator bool(); }
struct Pvt;
private:
std::shared_ptr<Pvt> pvt;
};
//! Handle for in-progress operation
struct PVXS_API Operation {
const enum operation_t {
Info,
Get,
Put,
RPC,
Monitor,
} op;
explicit constexpr Operation(operation_t op) :op(op) {}
Operation(const Operation&) = delete;
Operation& operator=(const Operation&) = delete;
virtual ~Operation() =0;
virtual void cancel() =0;
};
//! Handle for monitor subscription
struct PVXS_API Subscription {
enum Event {
Error,
Disconnect,
NotEmpty,
};
virtual ~Subscription() =0;
virtual void cancel() =0;
};
class PVXS_API Context {
public:
struct Pvt;
//! An empty/dummy Server
constexpr Context() = default;
//! Create/allocate, but do not start, a new server with the provided config.
explicit Context(const Config &);
~Context();
//! effective config
const Config& config() const;
//! Request prompt search of any disconnected channels
void poke();
Request request() const;
template<typename SubBuilder>
class CommonBuilder {
protected:
std::shared_ptr<Pvt> pvt;
std::string _name;
Request _pvRequest;
std::string _server;
int _prio;
CommonBuilder(const std::shared_ptr<Pvt>& pvt, const std::string& name) : pvt(pvt), _name(name), _prio(0) {}
inline SubBuilder& _sb() { return static_cast<SubBuilder&>(*this); }
public:
SubBuilder& priority(int p) { _prio = p; return _sb(); }
SubBuilder& request(const Request& r) { _pvRequest = r; return _sb(); }
SubBuilder& server(const std::string& s) { _server = s; return _sb(); }
};
class GetBuilder : protected CommonBuilder<GetBuilder> {
std::function<void(Value&&)> _result;
bool _get;
public:
GetBuilder(const std::shared_ptr<Pvt>& pvt, const std::string& name, bool get) :CommonBuilder{pvt,name}, _get(get) {}
GetBuilder& result(decltype (_result)&& cb) { _result = std::move(cb); return *this; }
PVXS_API
std::shared_ptr<Operation> exec();
friend struct Context::Pvt;
};
GetBuilder get(const std::string& name) { return GetBuilder{pvt, name, true}; }
GetBuilder info(const std::string& name) { return GetBuilder{pvt, name, false}; }
struct PutBuilder : protected CommonBuilder<GetBuilder> {
bool _doGet = true;
std::function<Value(Value&&)> _builder;
std::function<void(Value&&)> _result;
public:
PutBuilder(const std::shared_ptr<Pvt>& pvt, const std::string& name) :CommonBuilder{pvt,name} {}
PVXS_API
std::shared_ptr<Operation> exec();
friend struct Context::Pvt;
};
PutBuilder put(const std::string& name) { return PutBuilder{pvt, name}; }
struct RPCBuilder : protected CommonBuilder<GetBuilder> {
Value _argument;
std::function<void(Value&&)> _result;
public:
RPCBuilder(const std::shared_ptr<Pvt>& pvt, const std::string& name, Value&& arg) :CommonBuilder{pvt,name}, _argument(std::move(arg)) {}
PVXS_API
std::shared_ptr<Operation> exec();
friend struct Context::Pvt;
};
RPCBuilder rpc(const std::string& name, Value&& arg) { return RPCBuilder{pvt, name, std::move(arg)}; }
struct MonitorBuilder : protected CommonBuilder<GetBuilder> {
std::function<void(const std::shared_ptr<Subscription>&, Subscription::Event)> _event;
public:
MonitorBuilder(const std::shared_ptr<Pvt>& pvt, const std::string& name) :CommonBuilder{pvt,name} {}
PVXS_API
std::shared_ptr<Subscription> exec();
friend struct Context::Pvt;
};
MonitorBuilder monitor(const std::string& name) { return MonitorBuilder{pvt, name}; }
explicit operator bool() const { return pvt.operator bool(); }
private:
std::shared_ptr<Pvt> pvt;
};
struct PVXS_API Config {
std::vector<std::string> addressList;
//! UDP port to bind. Default is 5076. May be zero, cf. Server::config() to find allocated port.
unsigned short udp_port;
//! Whether to populate the beacon address list automatically. (recommended)
bool autoAddrList;
//! Default configuration using process environment
static Config from_env();
//! Empty config
Config() :udp_port(5076), autoAddrList(true) {}
//! Apply rules to translate current requested configuration
//! into one which can actually be loaded.
//! @post autoAddrList==false
void expand();
inline
Context build() const {
Context ret(*this);
return ret;
}
};
PVXS_API
std::ostream& operator<<(std::ostream& strm, const Config& conf);
} // namespace client
} // namespace pvxs
#endif // PVXS_CLIENT_H
+3
View File
@@ -13,6 +13,9 @@ PROD_LIBS += pvxs Com
PROD += pvxvct
pvxvct_SRCS += pvxvct.cpp
PROD += pvxinfo
pvxinfo_SRCS += info.cpp
PROD_SYS_LIBS += event_core
PROD_SYS_LIBS_DEFAULT += event_pthreads
+81
View File
@@ -0,0 +1,81 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <iostream>
#include <list>
#include <epicsStdlib.h>
#include <epicsGetopt.h>
#include <epicsThread.h>
#include <pvxs/client.h>
#include <pvxs/log.h>
#include "utilpvt.h"
using namespace pvxs;
namespace {
void usage(const char* argv0)
{
std::cerr<<"Usage: "<<argv0<<" <opts> [pvname ...]\n";
}
}
int main(int argc, char *argv[])
{
logger_config_env(); // from $PVXS_LOG
double timeout = 5.0;
bool verbose = false;
{
int opt;
while ((opt = getopt(argc, argv, "hvdw:")) != -1) {
switch(opt) {
case 'h':
usage(argv[0]);
return 0;
case 'v':
verbose = true;
break;
case 'd':
logger_level_set("pvxs.*", Level::Debug);
break;
case 'w':
if(epicsParseDouble(optarg, &timeout, nullptr)) {
std::cerr<<"Invalid timeout value: "<<optarg<<"\n";
return 1;
}
break;
default:
usage(argv[0]);
std::cerr<<"\nUnknown argument: "<<char(opt)<<std::endl;
return 1;
}
}
}
auto ctxt = client::Config::from_env().build();
if(verbose)
std::cout<<"Effective config\n"<<ctxt.config();
std::list<std::shared_ptr<client::Operation>> ops;
for(auto n : range(optind, argc)) {
ops.push_back(ctxt.info(argv[n])
.result([&argv, n](Value&& prototype) {
std::cout<<argv[n]<<"\n"<<prototype;
})
.exec());
}
epicsThreadSleep(timeout);
return 0;
}