diff --git a/src/Makefile b/src/Makefile index 284c63f..eb0743e 100644 --- a/src/Makefile +++ b/src/Makefile @@ -75,6 +75,10 @@ LIB_SRCS += servermon.cpp LIB_SRCS += serversource.cpp LIB_SRCS += sharedpv.cpp +LIB_SRCS += client.cpp +LIB_SRCS += clientconn.cpp +LIB_SRCS += clientintrospect.cpp + LIB_LIBS += Com LIB_SYS_LIBS += event_core diff --git a/src/client.cpp b/src/client.cpp new file mode 100644 index 0000000..7f311c0 --- /dev/null +++ b/src/client.cpp @@ -0,0 +1,525 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +#include +#include +#include + +#include +#include + +DEFINE_LOGGER(setup, "pvxs.client.setup"); +DEFINE_LOGGER(io, "pvxs.client.io"); +DEFINE_LOGGER(duppv, "pvxs.client.dup"); + +namespace pvxs { +namespace client { + +constexpr timeval bucketInterval{1,0}; +constexpr size_t nBuckets = 30u; + +constexpr size_t maxSearchPayload = 0x4000; + +Channel::Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid) + :context(context) + ,name(name) + ,cid(cid) +{} + +Channel::~Channel() +{ + context->chanByCID.erase(cid); + context->chanByName.erase(name); + // searchBuckets cleaned in tickSearch() +} + +void Channel::createOperations() +{ + if(state!=Channel::Active) + return; + + auto todo = std::move(pending); + + for(auto& wop : todo) { + auto op = wop.lock(); + if(!op) + continue; + + uint32_t ioid; + do { + ioid = conn->nextIOID++; + } while(conn->opByIOID.find(ioid)!=conn->opByIOID.end()); + + conn->opByIOID.insert(std::make_pair(ioid, RequestInfo(ioid, op))); + op->ioid = ioid; + + op->createOp(); + } +} + + +OperationBase::OperationBase(operation_t op, const std::shared_ptr& chan) + :Operation(op) + ,chan(chan) +{} + +OperationBase::~OperationBase() {} + +RequestInfo::RequestInfo(uint32_t ioid, std::shared_ptr& handle) + :ioid(ioid) + ,op(handle->op) + ,handle(handle) +{} + +std::shared_ptr Channel::build(const std::shared_ptr& context, const std::string& name) +{ + + std::shared_ptr chan; + + auto it = context->chanByName.find(name); + if(it!=context->chanByName.end()) { + chan = it->second.lock(); + } + + if(!chan) { + while(context->chanByCID.find(context->nextCID)!=context->chanByCID.end()) + context->nextCID++; + + chan = std::make_shared(context, name, context->nextCID); + context->chanByCID[chan->cid] = chan; + context->chanByName[chan->name] = chan; + + context->searchBuckets[context->currentBucket].push_back(chan); + } + + return chan; +} + +Operation::~Operation() {} + +Subscription::~Subscription() {} + +Context::Context(const Config& conf) +{ + /* Here be dragons. + * + * We keep two different ref. counters. + * - "externel" counter which keeps a server running. + * - "internal" which only keeps server storage from being destroyed. + * + * External refs are held as Server::pvt. Internal refs are + * held by various in-progress operations (OpBase sub-classes) + * Which need to safely access server storage, but should not + * prevent a server from stopping. + */ + auto internal(std::make_shared(conf)); + internal->internal_self = internal; + + // external + pvt.reset(internal.get(), [internal](Pvt*) mutable { + internal->close(); + internal.reset(); + }); + // we don't keep a weak_ptr to the external reference. + // Caller is entirely responsible for keeping this server running +} + +Context::~Context() {} + +const Config& Context::config() const +{ + return pvt->effective; +} + +void Context::poke() +{} + +static +Value buildCAMethod() +{ + using namespace pvxs::members; + + return TypeDef(TypeCode::Struct, { + String("user"), + String("host"), + }).create(); +} + +Context::Pvt::Pvt(const Config& conf) + :effective(conf) + ,caMethod(buildCAMethod()) + ,searchTx(AF_INET, SOCK_DGRAM, 0) + ,tcp_loop("PVXCTCP", epicsThreadPriorityCAServerLow) + ,searchRx(event_new(tcp_loop.base, searchTx.sock, EV_READ|EV_PERSIST, &Pvt::onSearchS, this)) + ,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &Pvt::tickSearchS, this)) +{ + effective.expand(); + + searchBuckets.resize(nBuckets); + + if(effective.udp_port==0) + throw std::runtime_error("Client can't use UDP random port"); + + std::set bcasts; + { + ELLLIST list = ELLLIST_INIT; + osiSockAddr any{}; + + osiSockDiscoverBroadcastAddresses(&list, searchTx.sock, &any); + + while(ELLNODE *cur = ellGet(&list)) { + osiSockAddrNode *node = CONTAINER(cur, osiSockAddrNode, node); + + SockAddr addr(&node->addr.sa, sizeof(node->addr)); + addr.setPort(0u); + bcasts.insert(addr.tostring()); + + free(node); + } + } + + { + osiSockAddr any{}; + any.ia.sin_family = AF_INET; + if(bind(searchTx.sock, &any.sa, sizeof(any.ia))) + throw std::runtime_error("Unable to bind random UDP port"); + + socklen_t alen = sizeof(any); + if(getsockname(searchTx.sock, &any.sa, &alen)) + throw std::runtime_error("Unable to readback random UDP port"); + + searchRxPort = ntohs(any.ia.sin_port); + + log_debug_printf(setup, "Using UDP Rx port %u\n", searchRxPort); + } + + { + int val = 1; + if(setsockopt(searchTx.sock, SOL_SOCKET, SO_BROADCAST, (char *)&val, sizeof(val))) + log_err_printf(setup, "Unable to setup beacon sender SO_BROADCAST: %d\n", SOCKERRNO); + } + + for(auto& addr : effective.addressList) { + auto isbcast = bcasts.find(addr)!=bcasts.end(); + SockAddr saddr(AF_INET); + try { + saddr.setAddress(addr.c_str()); + }catch(std::runtime_error& e) { + log_err_printf(setup, "%s Ignoring...\n", e.what()); + } + auto top = ntohl(saddr->in.sin_addr.s_addr)>>24u; + auto isucast = !isbcast && top<239 && top>224; + + saddr.setPort(effective.udp_port); + searchDest.emplace_back(saddr, isucast); + } + + // TODO: receive beacons + //auto manager = UDPManager::instance(); + + + if(event_add(searchTimer.get(), &bucketInterval)) + log_err_printf(setup, "Error enabling search timer\n%s", ""); + if(event_add(searchRx.get(), nullptr)) + log_err_printf(setup, "Error enabling search RX\n%s", ""); +} + +Context::Pvt::~Pvt() {} + +void Context::Pvt::close() +{ + // terminate all active connections + tcp_loop.call([this]() { + (void)event_del(searchTimer.get()); + (void)event_del(searchRx.get()); + + decltype (connByAddr) conns(std::move(connByAddr)); + + for(auto& pair : conns) { + auto conn = pair.second.lock(); + if(!conn) + continue; + + conn->cleanup(); + } + }); +} + +bool Context::Pvt::onSearch() +{ + searchMsg.resize(0x10000); + SockAddr src; + + osiSocklen_t alen = src.size(); + const int nrx = recvfrom(searchTx.sock, (char*)&searchMsg[0], searchMsg.size()-1, 0, &src->sa, &alen); + + if(nrx<0) { + int err = evutil_socket_geterror(searchTx.sock); + if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) { + // nothing to do here + } else { + log_warn_printf(io, "UDP search RX Error on : %s\n", + evutil_socket_error_to_string(err)); + } + return false; // wait for more I/O + + } else if(nrx<8) { + // maybe a zero (body) length packet? + // maybe an OS error? + + log_info_printf(io, "UDP ignore runt%s\n", ""); + return true; + + } else if(searchMsg[0]!=0xca || searchMsg[1]==0 || (searchMsg[2]&(pva_flags::Control|pva_flags::SegMask))) { + // minimum header size is 8 bytes + // ID byte must by 0xCA (because PVA has some paternal envy) + // ignore incompatible version 0 + // UDP packets can't contain control messages, or use segmentation + + log_info_printf(io, "UDP ignore header%u %02x%02x%02x%02x\n", + unsigned(nrx), searchMsg[0], searchMsg[1], searchMsg[2], searchMsg[3]); + return true; + } + + log_hex_printf(io, Level::Debug, &searchMsg[0], nrx, "UDP search Rx %d from %s\n", nrx, src.tostring().c_str()); + + bool be = searchMsg[2]&pva_flags::MSB; + + FixedBuf M(be, searchMsg.data(), nrx); + + const uint8_t cmd = M[3]; + M.skip(4); + + uint32_t len=0; + from_wire(M, len); + + if(len > M.size() && M.good()) { + log_info_printf(io, "UDP ignore header%u %02x%02x%02x%02x\n", + unsigned(M.size()), M[0], M[1], M[2], M[3]); + return true; + } + + if(cmd==CMD_SEARCH_RESPONSE) { + std::array guid; + SockAddr serv; + uint16_t port = 0; + uint8_t found = 0u; + + _from_wire<12>(M, &guid[0], false); + // searchSequenceID + // we don't use this and instead rely on ID for individual PVs + M.skip(4u); + + from_wire(M, serv); + if(serv.isAny()) + serv = src; + from_wire(M, port); + serv.setPort(port); + + if(M.size()<4u || M[0]!=3u || M[1]!='t' || M[2]!='c' || M[3]!='p') + return true; + M.skip(4u); + + from_wire(M, found); + if(!found) + return true; + + uint16_t nSearch = 0u; + from_wire(M, nSearch); + + for(auto n : range(nSearch)) { + (void)n; + + uint32_t id=0u; + from_wire(M, id); + if(!M.good()) + break; + + std::shared_ptr chan; + { + auto it = chanByCID.find(id); + if(it==chanByCID.end()) + continue; + + chan = it->second.lock(); + if(!chan) + continue; + } + + log_debug_printf(io, "Search reply for %s\n", chan->name.c_str()); + + if(chan->state==Channel::Searching) { + chan->guid = guid; + chan->replyAddr = serv; + + auto it = connByAddr.find(serv); + if(it==connByAddr.end() || !(chan->conn = it->second.lock())) { + connByAddr[serv] = chan->conn = std::make_shared(internal_self.lock(), serv); + } + + chan->conn->pending.push_back(chan); + chan->state = Channel::Connecting; + + chan->conn->createChannels(); + + } else if(chan->guid!=guid) { + log_err_printf(duppv, "Duplicate PV name %s from %s and %s\n", + chan->name.c_str(), + chan->replyAddr.tostring().c_str(), + serv.tostring().c_str()); + } + } + + } else { + M.fault(); + } + + if(!M.good()) { + log_hex_printf(io, Level::Err, &searchMsg[0], nrx, "Invalid search reply %d from %s\n", nrx, src.tostring().c_str()); + } + + return true; +} + +void Context::Pvt::onSearchS(evutil_socket_t fd, short evt, void *raw) +{ + try { + log_debug_printf(io, "UDP search Rx event %x\n", evt); + if(!(evt&EV_READ)) + return; + + // handle up to 4 packets before going back to the reactor + for(unsigned i=0; i<4 && static_cast(raw)->onSearch(); i++) {} + + }catch(std::exception& e){ + log_crit_printf(io, "Unhandled error in search Rx callback: %s\n", e.what()); + } +} + +void Context::Pvt::tickSearch() +{ + auto idx = currentBucket; + currentBucket = (currentBucket+1u)%searchBuckets.size(); + + log_debug_printf(io, "Search tick %zu\n", idx); + + decltype (searchBuckets)::value_type bucket; + searchBuckets[idx].swap(bucket); + + while(!bucket.empty()) { + searchMsg.resize(0x10000); + FixedBuf M(true, searchMsg.data(), searchMsg.size()); + M.skip(8); // fill in header after body length known + + // searchSequenceID + // we don't use this and instead rely on IDs for individual PVs + to_wire(M, uint32_t(0x66696e64)); + + // flags and reserved. + // initially flags[7] is cleared (bcast) + auto pflags = M.save(); + to_wire(M, uint32_t(0u)); + + // IN6ADDR_ANY_INIT + to_wire(M, uint32_t(0u)); + to_wire(M, uint32_t(0u)); + to_wire(M, uint32_t(0u)); + to_wire(M, uint32_t(0u)); + + to_wire(M, uint16_t(searchRxPort)); + + to_wire(M, uint8_t(1u)); + to_wire(M, "tcp"); + + // placeholder for channel count; + auto pcount = M.save(); + uint16_t count = 0u; + M.skip(2u); + + bool payload = false; + while(!bucket.empty()) { + auto chan = bucket.front().lock(); + if(!chan || chan->state!=Channel::Searching) { + bucket.pop_front(); + continue; + } + + if(searchMsg.size()<=maxSearchPayload-(5+chan->name.size())) + break; + + to_wire(M, uint32_t(chan->cid)); + to_wire(M, chan->name); + count++; + + auto ninc = chan->nSearch = std::min(searchBuckets.size(), chan->nSearch+1u); + auto next = (idx + ninc)%searchBuckets.size(); + + // TODO leveling with next+-1 buckets + + auto& nextBucket = searchBuckets[next]; + + nextBucket.splice(nextBucket.end(), + bucket, + bucket.begin()); + payload = true; + } + + if(!payload) + break; + + { + FixedBuf C(true, pcount, 2u); + to_wire(C, count); + } + auto consumed = M.save() - searchMsg.data(); + { + FixedBuf H(true, searchMsg.data(), 8); + to_wire(H, Header{CMD_SEARCH, pva_flags::Server, uint32_t(consumed-8u)}); + } + for(auto& pair : searchDest) { + // TODO: unicast/bcast + *pflags = pair.second ? 0x80 : 0x00; + + int ntx = sendto(searchTx.sock, (char*)searchMsg.data(), consumed, 0, &pair.first->sa, pair.first.size()); + + if(ntx<0) { + int err = evutil_socket_geterror(searchTx.sock); + auto lvl = Level::Warn; + if(err==EINTR || err==EPERM) + lvl = Level::Debug; + log_printf(io, lvl, "Search tx error (%d) %s\n", + err, evutil_socket_error_to_string(err)); + + } else if(unsigned(ntx)(raw)->tickSearch(); + }catch(std::exception& e){ + log_crit_printf(io, "Unhandled error in search timer callback: %s\n", e.what()); + } +} + +} // namespace client + +} // namespace pvxs diff --git a/src/clientconn.cpp b/src/clientconn.cpp new file mode 100644 index 0000000..8923dac --- /dev/null +++ b/src/clientconn.cpp @@ -0,0 +1,371 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include +#include "clientimpl.h" + +namespace pvxs { +namespace client { + +DEFINE_LOGGER(io, "pvxs.client.io"); + +Connection::Connection(const std::shared_ptr& context, const SockAddr& peerAddr) + :ConnBase (true, + bufferevent_socket_new(context->tcp_loop.base, -1, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS), + peerAddr) + ,context(context) + ,echoTimer(event_new(context->tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &tickEchoS, this)) +{ + bufferevent_setcb(bev.get(), &bevReadS, nullptr, &bevEventS, this); + + // shorter timeout until connect() ? + timeval timo = {30, 0}; + bufferevent_set_timeouts(bev.get(), &timo, &timo); + + if(bufferevent_socket_connect(bev.get(), &peerAddr->sa, peerAddr.size())) + throw std::runtime_error("Unable to begin connecting"); + + log_debug_printf(io, "Connecting to %s\n", peerName.c_str()); +} + +Connection::~Connection() +{ + log_debug_printf(io, "Cleaning connection to %s\n", peerName.c_str()); + cleanup(); +} + + +void Connection::createChannels() +{ + if(!ready) + return; // defer until CONNECTION_VALIDATED + + (void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get())); + + auto todo = std::move(pending); + + for(auto& wchan : todo) { + auto chan = wchan.lock(); + if(!chan) + continue; + + { + (void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get())); + + EvOutBuf R(hostBE, txBody.get()); + + to_wire(R, uint16_t(1u)); + to_wire(R, chan->cid); + to_wire(R, chan->name); + } + enqueueTxBody(CMD_CREATE_CHANNEL); + + creatingByCID[chan->cid] = chan; + chan->state = Channel::Creating; + + log_debug_printf(io, "Server %s creating channel '%s' (%u)\n", peerName.c_str(), + chan->name.c_str(), unsigned(chan->cid)); + } +} + +void Connection::bevEvent(short events) +{ + ConnBase::bevEvent(events); + + if(bev && (events&BEV_EVENT_CONNECTED)) { + log_debug_printf(io, "Connected to %s\n", peerName.c_str()); + + auto tx = bufferevent_get_output(bev.get()); + to_evbuf(tx, Header{pva_ctrl_msg::SetEndian, + pva_flags::Control, + 0u}, + hostBE); + + if(bufferevent_enable(bev.get(), EV_READ|EV_WRITE)) + throw std::logic_error("Unable to enable BEV"); + + // start echo timer + timeval interval{15, 0}; + if(event_add(echoTimer.get(), &interval)) + log_err_printf(io, "Server %s error starting echoTimer\n", peerName.c_str()); + } +} + +void Connection::cleanup() +{ + // (maybe) keep myself alive + std::shared_ptr self; + + context->connByAddr.erase(peerAddr); + + if(bev) + bev.reset(); + + if(event_del(echoTimer.get())) + log_err_printf(io, "Server %s error stopping echoTimer\n", peerName.c_str()); + + // return Channels to Searching state + for(auto& wchan : pending) { + auto chan = wchan.lock(); + if(!chan) + continue; + + chan->state = Channel::Searching; + chan->sid = 0xdeadbeef; // spoil + self = std::move(chan->conn); + context->searchBuckets[context->currentBucket].push_back(chan); + + log_debug_printf(io, "Server %s detach channel '%s' to re-search\n", peerName.c_str(), chan->name.c_str()); + } + + // paranoia + pending.clear(); + chanBySID.clear(); +} + +void Connection::handle_CONNECTION_VALIDATION() +{ + log_debug_printf(io, "Server %s begins validation handshake\n", peerName.c_str()); + + EvInBuf M(peerBE, segBuf.get(), 16); + + // unused + // serverReceiveBufferSize + // serverIntrospectionRegistryMaxSize + M.skip(4u + 2u); + + Size nauth{}; + from_wire(M, nauth); + + std::string selected; + + for(auto n : range(nauth.size)) { + (void)n; + + std::string method; + from_wire(M, method); + + if(method=="ca" || (method=="anonymous" && selected!="ca")) + selected = method; + } + + if(!M.good()) { + log_err_printf(io, "Server %s sends invalid CONNECTION_VALIDATION. Disconnect...\n", peerName.c_str()); + bev.reset(); + return; + } + + if(!selected.empty()) { + log_debug_printf(io, "Server %s selecting auth '%s'\n", peerName.c_str(), selected.c_str()); + + } else { + selected = "anonymous"; + log_warn_printf(io, "Server %s no supported auth. try to force '%s'\n", peerName.c_str(), selected.c_str()); + } + + Value cred; + if(selected=="ca") { + cred = context->caMethod.cloneEmpty(); + + std::vector buffer(256u); + + if(osiGetUserName(&buffer[0], buffer.size()) == osiGetUserNameSuccess) { + buffer[buffer.size()-1] = '\0'; + cred["user"] = buffer.data(); + } else { + cred["user"] = "nobody"; + } + + if (gethostname(&buffer[0], buffer.size()) == 0) { + buffer[buffer.size()-1] = '\0'; + cred["host"] = buffer.data(); + } else { + cred["host"] = "invalidhost."; + } + + log_info_printf(io, "Server %s 'ca' auth as %s@%s\n", peerName.c_str(), + cred["user"].as().c_str(), + cred["host"].as().c_str()); + } + + { + (void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get())); + + EvOutBuf R(hostBE, txBody.get()); + + // serverReceiveBufferSize, not used + to_wire(R, uint32_t(0x10000)); + // serverIntrospectionRegistryMaxSize, also not used + to_wire(R, uint16_t(0x7fff)); + // QoS, not used (quality?) + to_wire(R, uint16_t(0)); + + to_wire(R, selected); + + to_wire(R, Value::Helper::desc(cred)); + if(cred) + to_wire_full(R, cred); + } + enqueueTxBody(CMD_CONNECTION_VALIDATION); +} + +void Connection::handle_CONNECTION_VALIDATED() +{ + EvInBuf M(peerBE, segBuf.get(), 16); + + Status sts{}; + from_wire(M, sts); + + if(!M.good()) { + log_crit_printf(io, "Server %s sends invalid CONNECTION_VALIDATED. Disconnecting...\n", peerName.c_str()); + bev.reset(); + return; + + } else if(!sts.isSuccess()) { + log_err_printf(io, "Server %s refuses auth. Trying to proceed w/o cred\n", peerName.c_str()); + + } else { + log_debug_printf(io, "Server %s accepts auth%s%s\n", peerName.c_str(), + sts.msg.empty() ? "" : " ", sts.msg.c_str()); + } + + ready = true; + + createChannels(); +} + +void Connection::handle_CREATE_CHANNEL() +{ + EvInBuf M(peerBE, segBuf.get(), 16); + + uint32_t cid, sid; + Status sts{}; + + from_wire(M, cid); + from_wire(M, sid); + from_wire(M, sts); + // "spec" calls for uint16_t Access Rights here, but pvAccessCPP don't include this (it's useless anyway) + + if(!M.good()) { + log_crit_printf(io, "Server %s sends invalid CREATE_CHANNEL. Disconnecting...\n", peerName.c_str()); + bev.reset(); + return; + } + + std::shared_ptr chan; + { + auto it = creatingByCID.find(cid); + if(it==creatingByCID.end() || !(chan = it->second.lock())) { + + if(it!=creatingByCID.end()) + creatingByCID.erase(it); + + if(sts.isSuccess()) { + // we now have a channel which is no longer interesting. + log_debug_printf(io, "Server %s disposing of newly stale channel\n", peerName.c_str()); + + { + (void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get())); + + EvOutBuf R(hostBE, txBody.get()); + to_wire(R, sid); + to_wire(R, cid); + } + enqueueTxBody(CMD_DESTROY_CHANNEL); + } + return; + } + creatingByCID.erase(it); + } + + if(!sts.isSuccess()) { + // server refuses to create a channel, but presumably responded positivly to search + + chan->state = Channel::Searching; + context->searchBuckets[context->currentBucket].push_back(chan); + + log_warn_printf(io, "Server %s refuses channel to '%s' : %s\n", peerName.c_str(), + chan->name.c_str(), sts.msg.c_str()); + + } else { + chan->state = Channel::Active; + chan->sid = sid; + + chanBySID[sid] = chan; + + log_debug_printf(io, "Server %s active channel to '%s' %u:%u\n", peerName.c_str(), + chan->name.c_str(), unsigned(chan->cid), unsigned(chan->sid)); + + chan->createOperations(); + } +} + +void Connection::handle_DESTROY_CHANNEL() +{ + // (maybe) keep myself alive + std::shared_ptr self; + + EvInBuf M(peerBE, segBuf.get(), 16); + + uint32_t cid, sid; + from_wire(M, sid); + from_wire(M, cid); + + if(!M.good()) { + log_crit_printf(io, "Server %s sends invalid DESTROY_CHANNEL. Disconnecting...\n", peerName.c_str()); + bev.reset(); + return; + } + + std::shared_ptr chan; + { + auto it = chanBySID.find(sid); + if(it==chanBySID.end() || !(chan = it->second.lock())) { + log_debug_printf(io, "Server %s destroys non-existant channel %u:%u\n", + peerName.c_str(), unsigned(cid), unsigned(sid)); + return; + } + } + + chanBySID.erase(sid); + + chan->state = Channel::Searching; + chan->sid = 0xdeadbeef; // spoil + self = std::move(chan->conn); + context->searchBuckets[context->currentBucket].push_back(chan); + + log_debug_printf(io, "Server %s destroys channel '%s' %u:%u\n", + peerName.c_str(), chan->name.c_str(), unsigned(cid), unsigned(sid)); +} + +void Connection::tickEcho() +{ + log_debug_printf(io, "Server %s ping\n", peerName.c_str()); + + if(!bev) + return; + + auto tx = bufferevent_get_output(bev.get()); + + to_evbuf(tx, Header{CMD_ECHO, 0u, 0u}, hostBE); + + // maybe help reduce latency + bufferevent_flush(bev.get(), EV_WRITE, BEV_FLUSH); +} + +void Connection::tickEchoS(evutil_socket_t fd, short evt, void *raw) +{ + try { + static_cast(raw)->tickEcho(); + }catch(std::exception& e){ + log_crit_printf(io, "Unhandled error in echo timer callback: %s\n", e.what()); + } +} + +} // namespace client +} // namespace pvxs diff --git a/src/clientimpl.h b/src/clientimpl.h new file mode 100644 index 0000000..2c53f1a --- /dev/null +++ b/src/clientimpl.h @@ -0,0 +1,170 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +#ifndef CLIENTIMPL_H +#define CLIENTIMPL_H + +#include + +#include + +#include "evhelper.h" +#include "dataimpl.h" +#include "utilpvt.h" +#include "udp_collector.h" +#include "conn.h" + +namespace pvxs { +namespace client { + +struct Channel; + +// internal actions on an Operation +struct OperationBase : public Operation +{ + const std::shared_ptr chan; + uint32_t ioid; + + OperationBase(operation_t op, const std::shared_ptr& chan); + virtual ~OperationBase(); + + virtual void createOp() =0; +}; + +struct RequestInfo { + const uint32_t ioid; + const Operation::operation_t op; + const std::weak_ptr handle; + + Value prototype; + + RequestInfo(uint32_t ioid, std::shared_ptr& handle); +}; + +struct Connection : public ConnBase { + const std::shared_ptr context; + + const evevent echoTimer; + + bool ready = false; + + // channels to be created on this Connection + std::list> pending; + + std::map> creatingByCID, + chanBySID; + + std::map opByIOID; + + uint32_t nextIOID = 0u; + + Connection(const std::shared_ptr& context, const SockAddr &peerAddr); + ~Connection(); + + void createChannels(); + + virtual void bevEvent(short events) override final; + + virtual void cleanup() override final; + +#define CASE(Op) virtual void handle_##Op() override final; + CASE(CONNECTION_VALIDATION); + CASE(CONNECTION_VALIDATED); + + CASE(CREATE_CHANNEL); + CASE(DESTROY_CHANNEL); + + CASE(GET_FIELD); +#undef CASE + +protected: + void tickEcho(); + static void tickEchoS(evutil_socket_t fd, short evt, void *raw); +}; + +struct Channel { + const std::shared_ptr context; + const std::string name; + // Our choosen ID for this channel. + // used as persistent CID and searchID + const uint32_t cid; + + enum state_t { + Searching, // waiting for a server to claim + Connecting, // waiting for Connection to become ready + Creating, // waiting for reply to CREATE_CHANNEL + Active, + } state = Searching; + + std::shared_ptr conn; + uint32_t sid = 0u; + + // when state==Searching, number of repeatitions + size_t nSearch = 0u; + + // GUID of last positive reply when state!=Searching + std::array guid; + SockAddr replyAddr; + + std::list> pending; + + Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid); + ~Channel(); + + void createOperations(); + + static + std::shared_ptr build(const std::shared_ptr& context, const std::string &name); +}; + +struct Context::Pvt +{ + std::weak_ptr internal_self; + + // "const" after ctor + Config effective; + + const Value caMethod; + + uint32_t nextCID=0u; + + evsocket searchTx; + uint16_t searchRxPort; + + std::vector searchMsg; + + // search destination address and whether to set the unicast flag + std::vector> searchDest; + + size_t currentBucket = 0u; + std::vector>> searchBuckets; + + std::list > beaconRx; + + std::map> chanByCID; + std::map> chanByName; + + std::map> connByAddr; + + evbase tcp_loop; + const evevent searchRx; + const evevent searchTimer; + + Pvt(const Config& conf); + ~Pvt(); + + void close(); + + bool onSearch(); + static void onSearchS(evutil_socket_t fd, short evt, void *raw); + void tickSearch(); + static void tickSearchS(evutil_socket_t fd, short evt, void *raw); +}; + +} // namespace client + +} // namespace pvxs + +#endif // CLIENTIMPL_H diff --git a/src/clientintrospect.cpp b/src/clientintrospect.cpp new file mode 100644 index 0000000..992fe5f --- /dev/null +++ b/src/clientintrospect.cpp @@ -0,0 +1,140 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include +#include "clientimpl.h" + +namespace pvxs { +namespace client { + +DEFINE_LOGGER(io, "pvxs.client.io"); + +namespace { + +struct InfoOp : public OperationBase +{ + std::function done; + Value result; + + enum state_t { + Connecting, + Waiting, + Done, + } state = Connecting; + + explicit InfoOp(const std::shared_ptr& chan) + :OperationBase(Info, chan) + {} + + virtual ~InfoOp() + { + cancel(); + } + + virtual void cancel() override final {} + + virtual void createOp() override final + { + assert(state==Connecting); + + auto& conn = chan->conn; + + { + (void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get())); + + EvOutBuf R(hostBE, conn->txBody.get()); + + to_wire(R, chan->sid); + to_wire(R, ioid); + // sub-field, which no one knows how to use... + to_wire(R, ""); + } + conn->enqueueTxBody(CMD_GET_FIELD); + + log_debug_printf(io, "Server %s channel '%s' GET_INFO\n", conn->peerName.c_str(), chan->name.c_str()); + + state = Waiting; + } +}; + +} // namespace + +void Connection::handle_GET_FIELD() +{ + EvInBuf M(peerBE, segBuf.get(), 16); + + uint32_t ioid=0u; + Status sts; + Value prototype; + + from_wire(M, ioid); + from_wire(M, sts); + from_wire_type(M, rxRegistry, prototype); + + if(!M.good()) { + log_crit_printf(io, "Server %s sends invalid GET_FIELD. Disconnecting...\n", peerName.c_str()); + bev.reset(); + return; + } + + std::shared_ptr op; + { + auto it = opByIOID.find(ioid); + if(it==opByIOID.end() + || !(op = it->second.handle.lock()) + || op->op!=Operation::Info) { + log_warn_printf(io, "Server %s sends stale GET_FIELD\n", peerName.c_str()); + return; + } + opByIOID.erase(it); + } + + auto info = static_cast(op.get()); + + if(info->state!=InfoOp::Waiting) { + log_warn_printf(io, "Server %s ignore second reply to GET_FIELD\n", peerName.c_str()); + return; + } + + log_debug_printf(io, "Server %s completes GET_FIELD.\n", peerName.c_str()); + + info->state = InfoOp::Done; + + if(info->done) { + auto done = std::move(info->done); + done(std::move(prototype)); + + } else { + info->result = prototype; + } +} + +std::shared_ptr Context::GetBuilder::exec() +{ + std::shared_ptr ret; + + if(_get) + throw std::runtime_error("Get Not Implemented"); + + pvt->tcp_loop.call([&ret, this]() { + auto chan = Channel::build(pvt, _name); + + auto op = std::make_shared(chan); + op->done = std::move(_result); + + chan->pending.push_back(op); + chan->createOperations(); + + ret = op; + }); + + return ret; +} + +} // namespace client +} // namespace pvxs diff --git a/src/config.cpp b/src/config.cpp index 1991b37..63b03e8 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -15,6 +15,7 @@ #include #include "serverconn.h" +#include "clientimpl.h" #include "evhelper.h" DEFINE_LOGGER(serversetup, "pvxs.server.setup"); @@ -215,5 +216,74 @@ std::ostream& operator<<(std::ostream& strm, const Config& conf) return strm; } +} // namespace server + +namespace client { + +Config Config::from_env() +{ + Config ret; + + const char* name; + + if(const char *env = pickenv(&name, {"EPICS_PVA_ADDR_LIST"})) { + split_addr_into(name, ret.addressList, env); + } + + if(const char *env = pickenv(&name, {"EPICS_PVA_AUTO_ADDR_LIST"})) { + if(epicsStrCaseCmp(env, "YES")==0) { + ret.autoAddrList = true; + } else if(epicsStrCaseCmp(env, "NO")==0) { + ret.autoAddrList = false; + } else { + log_err_printf(serversetup, "%s invalid bool value (YES/NO)", name); + } + } + + if(const char *env = pickenv(&name, {"EPICS_PVA_BROADCAST_PORT"})) { + try { + ret.udp_port = lexical_cast(env); + }catch(std::exception& e) { + log_err_printf(serversetup, "%s invalid integer : %s", name, e.what()); + } + } + + return ret; } + +void Config::expand() +{ + if(autoAddrList) { + std::vector all({"0.0.0.0"}); + expandAddrList(all, addressList); + autoAddrList = false; + } + + removeDups(addressList); } + +std::ostream& operator<<(std::ostream& strm, const Config& conf) +{ + bool first; + + strm<<"EPICS_PVA_ADDR_LIST=\""; + first = true; + for(auto& iface : conf.addressList) { + if(first) + first = false; + else + strm<<' '; + strm< [pvname ...]\n"; +} + +} + +int main(int argc, char *argv[]) +{ + logger_config_env(); // from $PVXS_LOG + double timeout = 5.0; + bool verbose = false; + + { + int opt; + while ((opt = getopt(argc, argv, "hvdw:")) != -1) { + switch(opt) { + case 'h': + usage(argv[0]); + return 0; + case 'v': + verbose = true; + break; + case 'd': + logger_level_set("pvxs.*", Level::Debug); + break; + case 'w': + if(epicsParseDouble(optarg, &timeout, nullptr)) { + std::cerr<<"Invalid timeout value: "<> ops; + + for(auto n : range(optind, argc)) { + + ops.push_back(ctxt.info(argv[n]) + .result([&argv, n](Value&& prototype) { + std::cout<