/** * Copyright - See the COPYRIGHT that is included with this distribution. * pvxs is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "evhelper.h" #include "serverconn.h" #include "utilpvt.h" #include "udp_collector.h" namespace pvxs { namespace server { using namespace impl; DEFINE_LOGGER(serversetup, "pvxs.server.setup"); DEFINE_LOGGER(serverio, "pvxs.server.io"); Server::Server(const Config& conf) { /* Here be dragons. * * We keep two different ref. counters. * - "externel" counter which keeps a server running. * - "internal" which only keeps server storage from being destroyed. * * External refs are held as Server::pvt. Internal refs are * held by various in-progress operations (OpBase sub-classes) * Which need to safely access server storage, but should not * prevent a server from stopping. */ auto internal(std::make_shared(conf)); internal->internal_self = internal; // external pvt.reset(internal.get(), [internal](Pvt*) mutable { auto trash(std::move(internal)); trash->stop(); }); // we don't keep a weak_ptr to the external reference. // Caller is entirely responsible for keeping this server running } Server::~Server() {} Server& Server::addSource(const std::string& name, const std::shared_ptr& src, int order) { if(!pvt) throw std::logic_error("NULL Server"); if(!src) throw std::logic_error(SB()<<"Attempt to add NULL Source "<sourcesLock.lockWriter()); auto& ent = pvt->sources[std::make_pair(order, name)]; if(ent) throw std::runtime_error(SB()<<"Source already registered : ("<beaconChange++; } return *this; } std::shared_ptr Server::removeSource(const std::string& name, int order) { if(!pvt) throw std::logic_error("NULL Server"); auto G(pvt->sourcesLock.lockWriter()); std::shared_ptr ret; auto it = pvt->sources.find(std::make_pair(order, name)); if(it!=pvt->sources.end()) { ret = it->second; pvt->sources.erase(it); } pvt->beaconChange++; return ret; } std::shared_ptr Server::getSource(const std::string& name, int order) { if(!pvt) throw std::logic_error("NULL Server"); auto G(pvt->sourcesLock.lockReader()); std::shared_ptr ret; auto it = pvt->sources.find(std::make_pair(order, name)); if(it!=pvt->sources.end()) { ret = it->second; } return ret; } std::vector > Server::listSource() { if(!pvt) throw std::logic_error("NULL Server"); std::vector > names; auto G(pvt->sourcesLock.lockReader()); names.reserve(pvt->sources.size()); for(auto& pair : pvt->sources) { names.emplace_back(pair.first.second, pair.first.first); } return names; } const Config& Server::config() const { if(!pvt) throw std::logic_error("NULL Server"); return pvt->effective; } client::Config Server::clientConfig() const { if(!pvt) throw std::logic_error("NULL Server"); client::Config ret; ret.udp_port = pvt->effective.udp_port; ret.interfaces = pvt->effective.interfaces; ret.addressList = pvt->effective.interfaces; ret.autoAddrList = false; return ret; } Server& Server::addPV(const std::string& name, const SharedPV& pv) { if(!pvt) throw std::logic_error("NULL Server"); pvt->builtinsrc.add(name, pv); pvt->beaconChange++; return *this; } Server& Server::removePV(const std::string& name) { if(!pvt) throw std::logic_error("NULL Server"); pvt->builtinsrc.remove(name); pvt->beaconChange++; return *this; } Server& Server::start() { if(!pvt) throw std::logic_error("NULL Server"); pvt->start(); return *this; } Server& Server::stop() { if(!pvt) throw std::logic_error("NULL Server"); pvt->stop(); return *this; } Server& Server::run() { if(!pvt) throw std::logic_error("NULL Server"); pvt->start(); { SigInt handler([this](){ pvt->done.signal(); }); pvt->done.wait(); } pvt->stop(); return *this; } Server& Server::interrupt() { if(!pvt) throw std::logic_error("NULL Server"); pvt->done.signal(); return *this; } std::ostream& operator<<(std::ostream& strm, const Server& serv) { auto detail = Detailed::level(strm); if(!serv.pvt) { strm<sourcesLock.lockReader()); for(auto& pair : serv.pvt->sources) { strm<0) { Indented I(strm); Detailed D(strm, detail-1); pair.second->show(strm); } strm<<"\n"; } } if(detail<2) return strm; serv.pvt->acceptor_loop.call([&serv, &strm, detail](){ strm<state) { #define CASE(STATE) case Server::Pvt::STATE: strm<< #STATE; break CASE(Stopped); CASE(Starting); CASE(Running); CASE(Stopping); #undef CASE } strm<<"\n"; Indented I(strm); for(auto& pair : serv.pvt->connections) { auto conn = pair.first; strm<peerName <<" backlog="<backlog.size() <<" auth="<autoMethod<<"\n"; if(detail>2) strm<credentials; if(detail<=2) continue; Indented I(strm); for(auto& pair : conn->chanBySID) { auto& chan = pair.second; if(chan->state==ServerChan::Creating) { strm<state==ServerChan::Destroy) { strm<opByIOID.empty()) { strm<opByIOID) { auto& op = pair.second; if(!op) { strm<state) { #define CASE(STATE) case ServerOp::STATE: strm<< #STATE; break CASE(Creating); CASE(Idle); CASE(Executing); CASE(Dead); #undef CASE } strm<<" ioid="<show(strm); } } } } }); } return strm; } Server::Pvt::Pvt(const Config &conf) :effective(conf) ,beaconMsg(128) ,acceptor_loop("PVXTCP", epicsThreadPriorityCAServerLow-2) ,beaconSender(AF_INET, SOCK_DGRAM, 0) ,beaconTimer(event_new(acceptor_loop.base, -1, EV_TIMEOUT, doBeaconsS, this)) ,searchReply(0x10000) ,builtinsrc(StaticSource::build()) ,state(Stopped) { effective.expand(); { int val = 1; if(setsockopt(beaconSender.sock, SOL_SOCKET, SO_BROADCAST, (char *)&val, sizeof(val))) log_err_printf(serversetup, "Unable to setup beacon sender SO_BROADCAST: %d\n", SOCKERRNO); } auto manager = UDPManager::instance(); for(const auto& iface : effective.interfaces) { SockAddr addr(AF_INET, iface.c_str()); addr.setPort(effective.udp_port); listeners.push_back(manager.onSearch(addr, std::bind(&Pvt::onSearch, this, std::placeholders::_1) )); // update to allow udp_port==0 effective.udp_port = addr.port(); } evsocket dummy(AF_INET, SOCK_DGRAM, 0); acceptor_loop.call([this](){ // from acceptor worker bool firstiface = true; for(const auto& addr : effective.interfaces) { interfaces.emplace_back(addr, effective.tcp_port, this, firstiface); if(firstiface || effective.tcp_port==0) effective.tcp_port = interfaces.back().bind_addr.port(); firstiface = false; } for(const auto& addr : effective.beaconDestinations) { beaconDest.emplace_back(AF_INET, addr.c_str(), effective.udp_port); } }); { // choose new GUID. // treat as 3x 32-bit unsigned. union { std::array i; std::array b; } pun; static_assert (sizeof(pun)==12, ""); // i[0] (start) time epicsTimeStamp now; epicsTimeGetCurrent(&now); pun.i[0] = now.secPastEpoch ^ now.nsec; // i[1] host // mix together first interface and all local bcast addresses pun.i[1] = osiLocalAddr(dummy.sock).ia.sin_addr.s_addr; { ELLLIST bcasts = ELLLIST_INIT; osiSockAddr match; match.ia.sin_family = AF_INET; match.ia.sin_addr.s_addr = htonl(INADDR_ANY); match.ia.sin_port = 0; osiSockDiscoverBroadcastAddresses(&bcasts, dummy.sock, &match); while(ELLNODE *cur = ellGet(&bcasts)) { osiSockAddrNode *node = CONTAINER(cur, osiSockAddrNode, node); if(node->addr.sa.sa_family==AF_INET) pun.i[1] ^= ntohl(node->addr.ia.sin_addr.s_addr); free(cur); } } // i[2] process on host #if defined(_WIN32) pun.i[2] = GetCurrentProcessId(); #elif !defined(__rtems__) && !defined(vxWorks) pun.i[2] = getpid(); #else pun.i[2] = 0xdeadbeef; #endif // and a bit of server instance within this process pun.i[2] ^= uint32_t(effective.tcp_port)<<16u; // maybe a little bit of randomness (eg. ASLR on Linux) pun.i[2] ^= size_t(this); if(sizeof(size_t)>4) pun.i[2] ^= size_t(this)>>32u; std::copy(pun.b.begin(), pun.b.end(), effective.guid.begin()); } // Add magic "server" PV { auto L = sourcesLock.lockWriter(); sources[std::make_pair(-1, "server")] = std::make_shared(this); sources[std::make_pair(-1, "builtin")] = builtinsrc.source(); } } Server::Pvt::~Pvt() { stop(); } void Server::Pvt::start() { log_debug_printf(serversetup, "Server Starting\n%s", ""); // begin accepting connections state_t prev_state; acceptor_loop.call([this, &prev_state]() { prev_state = state; if(state!=Stopped) { // already running log_debug_printf(serversetup, "Server not stopped %d\n", state); return; } state = Starting; log_debug_printf(serversetup, "Server starting\n%s", ""); for(auto& iface : interfaces) { if(evconnlistener_enable(iface.listener.get())) { log_err_printf(serversetup, "Error enabling listener on %s\n", iface.name.c_str()); } log_debug_printf(serversetup, "Server enabled listener on %s\n", iface.name.c_str()); } }); if(prev_state!=Stopped) return; // being processing Searches for(auto& L : listeners) { L->start(); } // begin sending beacons acceptor_loop.call([this]() { timeval immediate = {0,0}; // send first beacon immediately if(event_add(beaconTimer.get(), &immediate)) log_err_printf(serversetup, "Error enabling beacon timer on\n%s", ""); state = Running; }); } void Server::Pvt::stop() { log_debug_printf(serversetup, "Server Stopping\n%s", ""); // Stop sending Beacons state_t prev_state; acceptor_loop.call([this, &prev_state]() { prev_state = state; if(state!=Running) { log_debug_printf(serversetup, "Server not running %d\n", state); return; } state = Stopping; if(event_del(beaconTimer.get())) log_err_printf(serversetup, "Error disabling beacon timer on\n%s", ""); }); if(prev_state!=Running) return; // stop processing Search requests for(auto& L : listeners) { L->stop(); } acceptor_loop.call([this]() { // stop accepting new TCP connections for(auto& iface : interfaces) { if(evconnlistener_disable(iface.listener.get())) { log_err_printf(serversetup, "Error disabling listener on %s\n", iface.name.c_str()); } log_debug_printf(serversetup, "Server disabled listener on %s\n", iface.name.c_str()); } // close current TCP connections auto conns = std::move(connections); for(auto& pair : conns) { pair.second->bev.reset(); pair.second->cleanup(); } state = Stopped; }); } void Server::Pvt::onSearch(const UDPManager::Search& msg) { // on UDPManager worker log_debug_printf(serverio, "%s searching\n", msg.src.tostring().c_str()); searchOp._names.resize(msg.names.size()); for(auto i : range(msg.names.size())) { searchOp._names[i]._name = msg.names[i].name; searchOp._names[i]._claim = false; } ipAddrToDottedIP(&msg.server->in, searchOp._src, sizeof(searchOp._src)); { auto G(sourcesLock.lockReader()); for(const auto& pair : sources) { try { pair.second->onSearch(searchOp); }catch(std::exception& e){ log_exc_printf(serversetup, "Unhandled error in Source::onSearch for '%s' : %s\n", pair.first.second.c_str(), e.what()); } } } uint16_t nreply = 0; for(const auto& name : searchOp._names) { log_debug_printf(serverio, " %sclaim %s\n", name._claim ? "" : "dis", name._name); if(name._claim) nreply++; } // "pvlist" breaks unless we honor mustReply flag if(nreply==0 && !msg.mustReply) return; VectorOutBuf M(true, searchReply); M.skip(8); // fill in header after body length known _to_wire<12>(M, effective.guid.data(), false); to_wire(M, msg.searchID); to_wire(M, SockAddr::any(AF_INET)); to_wire(M, uint16_t(effective.tcp_port)); to_wire(M, "tcp"); // "found" flag to_wire(M, uint8_t(nreply!=0 ? 1 : 0)); to_wire(M, uint16_t(nreply)); for(auto i : range(msg.names.size())) { if(searchOp._names[i]._claim) to_wire(M, uint32_t(msg.names[i].id)); } auto pktlen = M.save()-searchReply.data(); // now going back to fill in header FixedBuf H(true, searchReply.data(), 8); to_wire(H, Header{CMD_SEARCH_RESPONSE, pva_flags::Server, uint32_t(pktlen-8)}); if(!M.good() || !H.good()) { log_crit_printf(serverio, "Logic error in Search buffer fill\n%s", ""); } else { (void)msg.reply(searchReply.data(), pktlen); } } void Server::Pvt::doBeacons(short evt) { log_debug_printf(serversetup, "Server beacon timer expires\n%s", ""); VectorOutBuf M(true, beaconMsg); M.skip(8); // fill in header after body length known _to_wire<12>(M, effective.guid.data(), false); to_wire(M, uint8_t(0u)); // flags (aka. QoS, aka. undefined) to_wire(M, uint8_t(beaconSeq++)); // sequence to_wire(M, uint16_t(beaconChange));// change count to_wire(M, SockAddr::any(AF_INET)); to_wire(M, uint16_t(effective.tcp_port)); to_wire(M, "tcp"); // "NULL" serverStatus to_wire(M, uint8_t(0xff)); auto pktlen = M.save()-beaconMsg.data(); // now going back to fill in header FixedBuf H(true, beaconMsg.data(), 8); to_wire(H, Header{CMD_BEACON, pva_flags::Server, uint32_t(pktlen-8)}); assert(M.good() && H.good()); for(const auto& dest : beaconDest) { int ntx = sendto(beaconSender.sock, (char*)beaconMsg.data(), pktlen, 0, &dest->sa, dest.size()); if(ntx<0) { int err = evutil_socket_geterror(beaconSender.sock); auto lvl = Level::Warn; if(err==EINTR || err==EPERM) lvl = Level::Debug; log_printf(serverio, lvl, "Beacon tx error (%d) %s\n", err, evutil_socket_error_to_string(err)); } else if(unsigned(ntx)(raw)->doBeacons(evt); }catch(std::exception& e){ log_exc_printf(serverio, "Unhandled error in beacon timer callback: %s\n", e.what()); } } Source::~Source() {} Source::List Source::onList() { return Source::List{}; } void Source::show(std::ostream& strm) { auto list(onList()); strm<<(list.dynamic ? "Dynamic":"")<<"Source"; Indented I(strm); if(list.names) { for(auto& name : *list.names) { strm<<"\n"<