/** * Copyright - See the COPYRIGHT that is included with this distribution. * pvxs is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. */ #include #include #include #include #include #include #include #include "serverconn.h" // limit on size of TX buffer above which we suspend RX static constexpr size_t tcp_tx_limit = 0x100000; namespace pvxs { namespace server { std::set ClientCredentials::roles() const { std::set ret; osdGetRoles(account, ret); return ret; } std::ostream& operator<<(std::ostream& strm, const ClientCredentials& cred) { strm<server->acceptor_loop.base, sock, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS), SockAddr(peer, socklen)) ,iface(iface) { log_debug_printf(connio, "Client %s connects\n", peerName.c_str()); { auto cred(std::make_shared()); cred->peer = peerName; cred->iface = iface->name; // paranoia placeholder prior to handle_CONNECTION_VALIDATION() cred->method = cred->account = "anonymous"; this->cred = std::move(cred); } bufferevent_setcb(bev.get(), &bevReadS, &bevWriteS, &bevEventS, this); timeval tmo(totv(iface->server->effective.tcpTimeout)); bufferevent_set_timeouts(bev.get(), &tmo, &tmo); auto tx = bufferevent_get_output(bev.get()); std::vector buf(128); // queue connection validation message { VectorOutBuf M(hostBE, buf); to_wire(M, Header{pva_ctrl_msg::SetEndian, pva_flags::Control|pva_flags::Server, 0}); auto save = M.save(); M.skip(8, __FILE__, __LINE__); // placeholder for header auto bstart = M.save(); // serverReceiveBufferSize, not used to_wire(M, uint32_t(0x10000)); // serverIntrospectionRegistryMaxSize, also not used to_wire(M, uint16_t(0x7fff)); to_wire(M, Size{2}); to_wire(M, "anonymous"); to_wire(M, "ca"); auto bend = M.save(); FixedBuf H(hostBE, save, 8); to_wire(H, Header{CMD_CONNECTION_VALIDATION, pva_flags::Server, uint32_t(bend-bstart)}); assert(M.good() && H.good()); if(evbuffer_add(tx, buf.data(), M.save()-buf.data())) throw std::bad_alloc(); statTx += M.save()-buf.data(); } if(bufferevent_enable(bev.get(), EV_READ|EV_WRITE)) throw std::logic_error("Unable to enable BEV"); } ServerConn::~ServerConn() {} const std::shared_ptr& ServerConn::lookupSID(uint32_t sid) { auto it = chanBySID.find(sid); if(it==chanBySID.end()) { static decltype (it->second) empty{}; return empty; //throw std::runtime_error(SB()<<"Client "<second; } void ServerConn::handle_ECHO() { // Client requests echo as a keep-alive check auto tx = bufferevent_get_output(bev.get()); uint32_t len = evbuffer_get_length(segBuf.get()); to_evbuf(tx, Header{CMD_ECHO, pva_flags::Server, len}, hostBE); auto err = evbuffer_add_buffer(tx, segBuf.get()); assert(!err); // maybe help reduce latency bufferevent_flush(bev.get(), EV_WRITE, BEV_FLUSH); statTx += 8u + len; } static void auth_complete(ServerConn *self, const Status& sts) { (void)evbuffer_drain(self->txBody.get(), evbuffer_get_length(self->txBody.get())); { EvOutBuf M(hostBE, self->txBody.get()); to_wire(M, sts); } self->enqueueTxBody(CMD_CONNECTION_VALIDATED); log_debug_printf(connsetup, "%s Auth complete with %d\n", self->peerName.c_str(), sts.code); } void ServerConn::handle_CONNECTION_VALIDATION() { // Client begins (restarts?) Auth handshake EvInBuf M(peerBE, segBuf.get(), 16); std::string selected; { M.skip(4+2+2, __FILE__, __LINE__); // ignore unused buffer, introspection size, and QoS from_wire(M, selected); Value auth; from_wire_type_value(M, rxRegistry, auth); if(!M.good()) { log_err_printf(connio, "%s:%d Client %s Truncated/Invalid ConnValid from client\n", M.file(), M.line(), peerName.c_str()); bev.reset(); return; } else { log_debug_printf(connsetup, "Client %s authenticates using %s and %s\n", peerName.c_str(), selected.c_str(), std::string(SB()<(*cred)); if(selected=="ca") { auth["user"].as([&C, &selected](const std::string& user) { C->method = selected; C->account = user; }); } if(C->method.empty()) { C->account = C->method = "anonymous"; } C->raw = auth; cred = std::move(C); } } if(selected!="ca" && selected!="anonymous") { log_debug_printf(connsetup, "Client %s selects unadvertised auth \"%s\"", peerName.c_str(), selected.c_str()); auth_complete(this, Status{Status::Error, "Client selects unadvertised auth"}); return; } else { log_debug_printf(connsetup, "Client %s selects auth \"%s\"\n", peerName.c_str(), selected.c_str()); } // remainder of segBuf is payload w/ credentials // No practical way to handle auth failure. // So we accept all credentials, but may not grant rights. auth_complete(this, Status{Status::Ok}); } void ServerConn::handle_AUTHNZ() { // ignored (so far no auth plugin actually uses) } void ServerConn::handle_PUT_GET() {} void ServerConn::handle_CANCEL_REQUEST() { EvInBuf M(peerBE, segBuf.get(), 16); uint32_t sid=0, ioid=0; from_wire(M, sid); from_wire(M, ioid); if(!M.good()) throw std::runtime_error(SB()<second; auto chan = op->chan.lock(); if(!chan || chan->sid!=sid) { log_err_printf(connsetup, "Client %s Cancel inconsistent Op\n", peerName.c_str()); return; } if(op->state==ServerOp::Executing) { op->state = ServerOp::Idle; if(op->onCancel) op->onCancel(); } else { // an allowed race log_debug_printf(connsetup, "Client %s Cancel of non-executing Op\n", peerName.c_str()); } } void ServerConn::handle_DESTROY_REQUEST() { EvInBuf M(peerBE, segBuf.get(), 16); uint32_t sid=0, ioid=0; from_wire(M, sid); from_wire(M, ioid); if(!M.good()) throw std::runtime_error(SB()<opByIOID.erase(ioid)) { log_debug_printf(connsetup, "Client %s can't destroy non-existent op %u:%u\n", peerName.c_str(), unsigned(sid), unsigned(ioid)); } if(it!=opByIOID.end()) { auto op = it->second; opByIOID.erase(it); op->state = ServerOp::Dead; if(op->onClose) op->onClose(""); } } void ServerConn::handle_MESSAGE() { EvInBuf M(peerBE, segBuf.get(), 16); uint32_t ioid = 0; uint8_t mtype = 0; std::string msg; from_wire(M, ioid); from_wire(M, mtype); from_wire(M, msg); if(!M.good()) throw std::runtime_error(SB()<second->chan.lock(); Level lvl; switch(mtype) { case 0: lvl = Level::Info; case 1: lvl = Level::Warn; case 2: lvl = Level::Err; default:lvl = Level::Crit; } if(remote.test(lvl)) errlogPrintf("Client %s Channel %s Remote message: %s\n", peerName.c_str(), chan ? "" : chan->name.c_str(), msg.c_str()); } std::shared_ptr ServerConn::self_from_this() { return shared_from_this(); } void ServerConn::cleanup() { log_debug_printf(connsetup, "Client %s Cleanup TCP Connection\n", peerName.c_str()); iface->server->connections.erase(this); for(auto& pair : opByIOID) { if(pair.second->onClose) pair.second->onClose(""); } for(auto& pair : chanBySID) { pair.second->state = ServerChan::Destroy; if(pair.second->onClose) { auto fn(std::move(pair.second->onClose)); fn(""); } } } void ServerConn::bevRead() { ConnBase::bevRead(); if(bev) { auto tx = bufferevent_get_output(bev.get()); if(evbuffer_get_length(tx)>=tcp_tx_limit) { // write buffer "full". stop reading until it drains // TODO configure (void)bufferevent_disable(bev.get(), EV_READ); bufferevent_setwatermark(bev.get(), EV_WRITE, tcp_tx_limit/2, 0); log_debug_printf(connio, "%s suspend READ\n", peerName.c_str()); } } } void ServerConn::bevWrite() { log_debug_printf(connio, "%s process backlog\n", peerName.c_str()); auto tx = bufferevent_get_output(bev.get()); // handle pending monitors while(!backlog.empty() && evbuffer_get_length(tx)acceptor_loop.assertInLoop(); auto orig_port = bind_addr.port(); if(evutil_make_listen_socket_reuseable(sock.sock)) log_warn_printf(connsetup, "Unable to make socket reusable%s", "\n"); // try to bind to requested port, then fallback to a random port while(true) { try { sock.bind(bind_addr); } catch(std::system_error& e) { if(fallback && e.code().value()==SOCK_EADDRINUSE) { bind_addr.setPort(0); continue; } throw; } break; } name = bind_addr.tostring(); if(orig_port && bind_addr.port() != orig_port) { log_warn_printf(connsetup, "Server unable to bind port %u, falling back to %s\n", orig_port, name.c_str()); } // added in libevent 2.1.1 #ifndef LEV_OPT_DISABLED # define LEV_OPT_DISABLED 0 #endif const int backlog = 4; listener = evlisten(evconnlistener_new(server->acceptor_loop.base, onConnS, this, LEV_OPT_DISABLED|LEV_OPT_CLOSE_ON_EXEC, backlog, sock.sock)); if(!LEV_OPT_DISABLED) evconnlistener_disable(listener.get()); } void ServIface::onConnS(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *peer, int socklen, void *raw) { auto self = static_cast(raw); try { if(peer->sa_family!=AF_INET) { log_crit_printf(connsetup, "Interface %s Rejecting !ipv4 client\n", self->name.c_str()); evutil_closesocket(sock); return; } auto conn(std::make_shared(self, sock, peer, socklen)); self->server->connections[conn.get()] = std::move(conn); }catch(std::exception& e){ log_exc_printf(connsetup, "Interface %s Unhandled error in accept callback: %s\n", self->name.c_str(), e.what()); evutil_closesocket(sock); } } ServerOp::~ServerOp() {} }} // namespace pvxs::impl