diff --git a/src/evhelper.cpp b/src/evhelper.cpp index 0cf8fae..d3f89d8 100644 --- a/src/evhelper.cpp +++ b/src/evhelper.cpp @@ -158,7 +158,10 @@ void call_action(evutil_socket_t _fd, short _ev, void *raw) void evbase::call(std::function&& fn) { - assert(!pvt->worker.isCurrentThread()); + if(pvt->worker.isCurrentThread()) { + fn(); + return; + } action_args args(std::move(fn)); diff --git a/src/pvxs/server.h b/src/pvxs/server.h index 237b5e0..341dae7 100644 --- a/src/pvxs/server.h +++ b/src/pvxs/server.h @@ -85,6 +85,9 @@ struct Source; * * Use a Server::Config to determine how this server will bind, listen, * and announce itself. + * + * In order to be useful, a Server will have one or more Source instances added + * to it with addSource(). */ class PVXS_API Server { @@ -113,10 +116,6 @@ public: Server(); //! Create/allocate, but do not start, a new server with the provided config. explicit Server(Config&&); - Server(Server&&) noexcept; - Server(const Server&) = delete; - Server& operator=(Server&&) noexcept; - Server& operator=(const Server&) = delete; ~Server(); //! Begin serving. Does not block. @@ -136,13 +135,21 @@ public: //! effective config const Config& config() const; + //! Add a Source to this server with an arbitrary source name. + //! + //! @param name Source name + //! @param src The Source. A strong reference to this Source which will be released by removeSource() or ~Server() + //! @param order Determines the order in which this Source::onCreate() will be called. + //! Lowest first. Server& addSource(const std::string& name, const std::shared_ptr& src, int order =0); + //! Disociate a Source using the name _and_ priority given to addSource() std::shared_ptr removeSource(const std::string& name, int order =0); + //! Fetch an std::shared_ptr getSource(const std::string& name, int order =0); @@ -152,20 +159,64 @@ public: struct Pvt; private: - std::unique_ptr pvt; + std::shared_ptr pvt; }; +struct OpBase { + //! The Client endpoint address in "X.X.X.X:Y" format. + const std::string peerName; + //! The local endpoint address in "X.X.X.X:Y" format. + const std::string ifaceName; + //! The Channel name + const std::string name; + // TODO credentials + OpBase(const std::string& peerName, + const std::string& iface, + const std::string& name); + virtual ~OpBase() =0; +}; +/** Manipulate an active Channel, and any in-progress Operations through it. + * + */ +struct PVXS_API ChannelControl : public OpBase { + ChannelControl(const std::string& peerName, + const std::string& iface, + const std::string& name) + :OpBase (peerName, iface, name) + {} + virtual ~ChannelControl() =0; + + //! Set/replace Handler associated with this Channel + //! If called from outside a Handler method, blocks until in-progress Handler methods have returned. + virtual std::shared_ptr setHandler(const std::shared_ptr& h) =0; + + //! Force disconnection + //! If called from outside a Handler method, blocks until in-progress Handler methods have returned. + //! Reference to currently attached Handler is released. + virtual void close() =0; + + // TODO: signal Rights? +}; + +/** Interface through which a Server discovers Channel names and + * associates with Handler instances. + * + * User code will sub-class. + */ struct PVXS_API Source { - virtual ~Source(); + virtual ~Source() =0; + //! An iteratable of names being sought struct Search { class Name { const char* _name; bool _claim; friend struct Server::Pvt; public: + //! The Channel name inline const char* name() const { return _name; } + //! The caller claims to be able to respond to an onCreate() inline void claim() { _claim = true; } }; private: @@ -177,20 +228,50 @@ struct PVXS_API Source { _names_t::iterator begin() { return _names.begin(); } _names_t::iterator end() { return _names.end(); } + //! The Client endpoint address in "X.X.X.X:Y" format. const SockAddr& source() const { return _src; } }; + /** Called each time a client polls for the existance of some Channel names. + * + * A Source may only Search::claim() a Channel name if it is prepared to + * immediately accept an onCreate() call for that Channel name. + * In other situations it should wait for the client to retry. + */ virtual void onSearch(Search& op) =0; - struct Create { - std::string& src; - std::string name; - // credentials - }; - virtual std::unique_ptr onCreate(const Create& op) =0; + /** A Client is attempting to open a connection to a certain Channel. + * + * This Channel name may not be one which seen or claimed by onSearch(). + * + * Callee with either do nothing, or std::move() the ChannelControl and call ChannelControl::setHandler() + */ + virtual void onCreate(std::unique_ptr&& op) =0; }; +//! Token for an in-progress request for Channel data type information. +struct PVXS_API Introspect : public OpBase +{ + //! Negative reply w/ error message + virtual void error(const std::string& msg) =0; + // void success(Data); + + virtual ~Introspect() =0; +}; + +/** Requests for a particular Channel are dispatched through me. + * + * User code will sub-class. + */ struct PVXS_API Handler { virtual ~Handler(); + + /** Request for Channel data type information + * + * Ownership of the Introspect instance is passed to the callee. + * The request will be implicitly errored if the callee allows + * the Introspect to be deleted prior to replying. + */ + virtual void onIntrospect(std::unique_ptr&& op); }; }} // namespace pvxs::server diff --git a/src/server.cpp b/src/server.cpp index d8662c0..4148ba0 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -91,19 +91,28 @@ Server Server::Config::build() Server::Server() {} Server::Server(Config&& conf) - :pvt(new Pvt(std::move(conf))) -{} - -Server::Server(Server&& other) noexcept - :pvt(std::move(other.pvt)) -{} - -Server& Server::operator=(Server&& other) noexcept { - if(this!=&other) { - pvt = std::move(other.pvt); - } - return *this; + /* Here be dragons. + * + * We keep two different ref. counters. + * - "externel" counter which keeps a server running. + * - "internal" which only keeps server storage from being destroyed. + * + * External refs are held as Server::pvt. Internal refs are + * held by various in-progress operations (OpBase sub-classes) + * Which need to safely access server storage, but should not + * prevent a server from stopping. + */ + std::shared_ptr internal(new Pvt(std::move(conf))); + internal->internal_self = internal; + + // external + pvt.reset(internal.get(), [internal](Pvt*) mutable { + internal->stop(); + internal.reset(); + }); + // we don't keep a weak_ptr to the external reference. + // Caller is entirely responsible for keeping this server running } Server::~Server() {} @@ -118,6 +127,7 @@ Server& Server::addSource(const std::string& name, throw std::logic_error(SB()<<"Attempt to add NULL Source "<sourcesLock.lockWriter()); + //epicsGuard G(pvt->sourcesLock.writer()); auto& ent = pvt->sources[std::make_pair(order, name)]; if(ent) @@ -392,8 +402,10 @@ void Server::Pvt::start() log_printf(serversetup, PLVL_DEBUG, "Server Starting\n"); // begin accepting connections - acceptor_loop.call([this]() + state_t prev_state; + acceptor_loop.call([this, &prev_state]() { + prev_state = state; if(state!=Stopped) { // already running log_printf(serversetup, PLVL_DEBUG, "Server not stopped %d\n", state); @@ -409,6 +421,8 @@ void Server::Pvt::start() log_printf(serversetup, PLVL_DEBUG, "Server enabled listener on %s\n", iface.name.c_str()); } }); + if(prev_state!=Stopped) + return; // being processing Searches for(auto& L : listeners) { @@ -433,8 +447,10 @@ void Server::Pvt::stop() log_printf(serversetup, PLVL_DEBUG, "Server Stopping\n"); // Stop sending Beacons - acceptor_loop.call([this]() + state_t prev_state; + acceptor_loop.call([this, &prev_state]() { + prev_state = state; if(state!=Running) { log_printf(serversetup, PLVL_DEBUG, "Server not running %d\n", state); return; @@ -444,6 +460,8 @@ void Server::Pvt::stop() if(event_del(beaconTimer.get())) log_printf(serversetup, PLVL_ERR, "Error disabling beacon timer on\n"); }); + if(prev_state!=Running) + return; // stop processing Search requests for(auto& L : listeners) { @@ -579,6 +597,23 @@ void Server::Pvt::doBeaconsS(evutil_socket_t fd, short evt, void *raw) Source::~Source() {} +OpBase::OpBase(const std::string& peerName, + const std::string& iface, + const std::string& name) + :peerName(peerName) + ,ifaceName(iface) + ,name(name) +{} + +OpBase::~OpBase() {} + +ChannelControl::~ChannelControl() {} + +Introspect::~Introspect() {} + Handler::~Handler() {} +void Handler::onIntrospect(std::unique_ptr&& op) +{} + }} // namespace pvxs::server diff --git a/src/serverchan.cpp b/src/serverchan.cpp index 0481130..6d839fb 100644 --- a/src/serverchan.cpp +++ b/src/serverchan.cpp @@ -4,22 +4,233 @@ * in file LICENSE that is included with this distribution. */ +#include +#include + +#include "pvxs/log.h" #include "serverconn.h" namespace pvxsimpl { -ServerChan::ServerChan(ServerConn* conn, +// message related to client state and errors +DEFINE_LOGGER(connsetup, "tcp.setup"); +// related to low level send/recv +DEFINE_LOGGER(connio, "tcp.io"); + +ServerChan::ServerChan(const std::shared_ptr &conn, uint32_t sid, uint32_t cid, - const std::string &name, - std::unique_ptr &&handler) + const std::string &name) :conn(conn) ,sid(sid) ,cid(cid) ,name(name) - ,handler(std::move(handler)) + ,state(Creating) {} ServerChan::~ServerChan() {} +ServerChannelControl::ServerChannelControl(const std::shared_ptr &conn, const std::shared_ptr& channel) + :server::ChannelControl(conn->peerName, conn->iface->name, channel->name) + ,server(conn->iface->server->internal_self) + ,chan(channel) +{} + +ServerChannelControl::~ServerChannelControl() {} + +std::shared_ptr ServerChannelControl::setHandler(const std::shared_ptr &h) +{ + std::shared_ptr ret(h); + std::shared_ptr serv(server); + + serv->acceptor_loop.call([this, &ret](){ + std::shared_ptr ch(chan); + + ch->handler.swap(ret); + }); + + return ret; +} + +void ServerChannelControl::close() +{ + // fail soft if server stopped, or channel/connection already closed + auto serv = server.lock(); + if(!serv) + return; + + serv->acceptor_loop.call([this](){ + auto ch = chan.lock(); + if(!ch) + return; + auto conn = ch->conn.lock(); + if(conn) { + if(ch->state==ServerChan::Active) { + // Send unsolicited Channel Destroy + + auto tx = bufferevent_get_output(conn->bev.get()); + constexpr bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; + EvOutBuf R(be, tx); + to_wire(R, Header{pva_app_msg::DestroyChan, pva_flags::Server, 8}); + to_wire(R, ch->sid); + to_wire(R, ch->cid); + } + ch->state = ServerChan::Destroy; + } + }); +} + +void ServerConn::handle_CreateChan() +{ + const bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; + const auto self = shared_from_this(); + + EvInBuf M(peerBE, segBuf.get(), 16); + + auto G(iface->server->sourcesLock.lockReader()); + + // one channel create request contains main channel names. + // each of which will received a seperate reply. + + uint16_t count = 0; + from_wire(M, count); + for(auto i : range(count)) { + (void)i; + uint32_t cid = -1, sid = -1; + std::string name; + from_wire(M, cid); + from_wire(M, name); + + if(!M.good() || name.empty()) + break; + + Status sts{Status::Ok}; + + bool claimed = false; + + if(chanByCID.size()==0xffffffff || chanBySID.size()==0xffffffff) { + sts.code = Status::Error; + sts.msg = "Too many Server channels"; + sts.trace = "pvx:serv:chanidoverflow:"; + + } else if(chanByCID.find(cid)!=chanByCID.end()) { + sts.code = Status::Fatal; + sts.msg = "Client reuses existing CID"; + sts.trace = "pvx:serv:dupcid:"; + + } else { + do { + sid = nextSID++; + } while(chanBySID.find(sid)!=chanBySID.end()); + + std::shared_ptr chan(new ServerChan(self, sid, cid, name)); + std::unique_ptr op(new ServerChannelControl(self, chan)); + + for(auto& pair : iface->server->sources) { + try { + pair.second->onCreate(std::move(op)); + if(!op || chan->handler || chan->state!=ServerChan::Creating) { + claimed = chan->state==ServerChan::Creating; + log_printf(connsetup, PLVL_DEBUG, "Client %s %s channel to %s through %s\n", peerName.c_str(), + claimed?"accepted":"rejected", name.c_str(), pair.first.second.c_str()); + break; + } + }catch(std::exception& e){ + log_printf(connsetup, PLVL_ERR, "Client %s Unhandled error in onCreate %s,%d %s : %s\n", peerName.c_str(), + pair.first.second.c_str(), pair.first.first, + typeid(&e).name(), e.what()); + } + } + + if(claimed && chan->state==ServerChan::Creating) { + chanByCID[cid] = chan; + chanBySID[sid] = chan; + chan->state = ServerChan::Active; + + } else { + sts.code = Status::Fatal; + sts.msg = "Refused to create Channel"; + sts.trace = "pvx:serv:refusechan:"; + + sid = -1; + } + + // ServerChannelControl destroyed it not saved by claiming Source + } + + + { + (void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get())); + + EvOutBuf R(be, txBody.get()); + to_wire(R, cid); + to_wire(R, sid); + to_wire(R, sts); + // "spec" calls for uint16_t Access Rights here, but pvAccessCPP don't include this (it's useless anyway) + if(!R.good()) { + M.fault(); + log_printf(connio, PLVL_ERR, "Client %s Encode error in CreateChan\n", peerName.c_str()); + break; + } + } + + auto tx = bufferevent_get_output(bev.get()); + to_evbuf(tx, Header{pva_app_msg::CreateChan, + pva_flags::Server, + uint32_t(evbuffer_get_length(txBody.get()))}, + be); + auto err = evbuffer_add_buffer(tx, txBody.get()); + assert(!err); + } + + if(!M.good()) { + log_printf(connio, PLVL_ERR, "Client %s Decode error in CreateChan\n", peerName.c_str()); + bev.reset(); + } +} + +void ServerConn::handle_DestroyChan() +{ + EvInBuf M(peerBE, segBuf.get(), 16); + + uint32_t sid=-1, cid=-1; + + from_wire(M, sid); + from_wire(M, cid); + + auto it = chanBySID.find(sid); + if(M.good() && it!=chanBySID.end()) { + { + auto& chan = it->second; + if(chan->cid!=cid) { + log_printf(connsetup, PLVL_DEBUG, "Client %s provides incorrect CID with DestroyChan sid=%d cid=%d!=%d '%s'\n", peerName.c_str(), + unsigned(sid), unsigned(chan->cid), unsigned(cid), chan->name.c_str()); + } + } + + auto n = chanByCID.erase(cid); + assert(n==1); + + chanBySID.erase(it); + assert(it->second.use_count()==1); // we only take transient refs on this thread + // ServerChannel is delete'd + + { + auto tx = bufferevent_get_output(bev.get()); + constexpr bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; + EvOutBuf R(be, tx); + to_wire(R, Header{pva_app_msg::DestroyChan, pva_flags::Server, 8}); + to_wire(R, sid); + to_wire(R, cid); + } + + } else { + log_printf(connsetup, PLVL_DEBUG, "Client %s DestroyChan non-existant sid=%d cid=%d\n", peerName.c_str(), + unsigned(sid), unsigned(cid)); + } + + if(!M.good()) + bev.reset(); +} + } // namespace pvxsimpl diff --git a/src/serverconn.cpp b/src/serverconn.cpp index 62369d6..f6cea15 100644 --- a/src/serverconn.cpp +++ b/src/serverconn.cpp @@ -39,7 +39,7 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr * ,txBody(evbuffer_new()) ,nextSID(0) { - log_printf(connsetup, PLVL_DEBUG, "Client %s connects\n", peerName.c_str()); + log_printf(connio, PLVL_DEBUG, "Client %s connects\n", peerName.c_str()); bufferevent_setcb(bev.get(), &bevReadS, &bevWriteS, &bevEventS, this); // initially wait for at least a header @@ -174,156 +174,6 @@ void ServerConn::handle_AuthZ() void ServerConn::handle_Search() {} -void ServerConn::handle_CreateChan() -{ - const bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; - - EvInBuf M(peerBE, segBuf.get(), 16); - - auto G(iface->server->sourcesLock.lockReader()); - - uint16_t count = 0; - from_wire(M, count); - for(auto i : range(count)) { - (void)i; - uint32_t cid = -1, sid = -1; - server::Source::Create op{peerName}; - from_wire(M, cid); - from_wire(M, op.name); - - if(!M.good()) - break; - - Status sts{Status::Ok}; - - bool claimed = false; - try { - if(chanByCID.size()==0xffffffff || chanBySID.size()==0xffffffff) { - sts.code = Status::Error; - sts.msg = "Too many Server channels"; - sts.trace = "pvx:serv:chanidoverflow:"; - } - - if(sts.isSuccess() && chanByCID.find(cid)!=chanByCID.end()) { - sts.code = Status::Fatal; - sts.msg = "Client reuses existing CID"; - sts.trace = "pvx:serv:dupcid:"; - } - - std::unique_ptr handler; - if(sts.isSuccess() && !op.name.empty()) { - for(auto& pair : iface->server->sources) { - try { - handler = pair.second->onCreate(op); - if(handler) - break; - }catch(std::exception& e){ - log_printf(connsetup, PLVL_ERR, "Client %s Unhandled error in onCreate %s,%d %s : %s\n", peerName.c_str(), - pair.first.second.c_str(), pair.first.first, - typeid(&e).name(), e.what()); - } - } - } - - if(sts.isSuccess() && handler) { - do { - sid = nextSID++; - } while(chanBySID.find(sid)!=chanBySID.end()); - - auto pair = chanBySID.emplace(std::piecewise_construct, - std::make_tuple(sid), - std::make_tuple(this, sid, cid, op.name, std::move(handler))); - auto pair2 = chanByCID.emplace(cid, &pair.first->second); - assert(!!pair.second && !!pair2.second); // we've already checked for a duplicate - claimed = true; - } - }catch(std::exception& e){ - log_printf(connsetup, PLVL_ERR, "Client %s Unhandled error in onCreate %s : %s\n", peerName.c_str(), - typeid(&e).name(), e.what()); - sts.code = Status::Fatal; - sts.msg = e.what(); - sts.trace = "pvx:serv:internal:"; - } - - { - if(sts.isSuccess() && !claimed) { - sts.code = Status::Fatal; - sts.msg = "Unable to create Channel"; - sts.trace = "pvx:serv:nosource:"; - } - - (void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get())); - - EvOutBuf R(be, txBody.get()); - to_wire(R, cid); - to_wire(R, sid); - to_wire(R, sts); - // "spec" calls for uint16_t Access Rights here, but pvAccessCPP don't include this (it's useless anyway) - if(!R.good()) { - M.fault(); - log_printf(connio, PLVL_ERR, "Client %s Encode error in CreateChan\n", peerName.c_str()); - break; - } - } - - auto tx = bufferevent_get_output(bev.get()); - to_evbuf(tx, Header{pva_app_msg::CreateChan, - pva_flags::Server, - uint32_t(evbuffer_get_length(txBody.get()))}, - be); - auto err = evbuffer_add_buffer(tx, txBody.get()); - assert(!err); - } - - if(!M.good()) { - log_printf(connio, PLVL_ERR, "Client %s Decode error in CreateChan\n", peerName.c_str()); - bev.reset(); - } -} - -void ServerConn::handle_DestroyChan() -{ - EvInBuf M(peerBE, segBuf.get(), 16); - - uint32_t sid=-1, cid=-1; - - from_wire(M, sid); - from_wire(M, cid); - - auto it = chanBySID.find(sid); - if(M.good() && it!=chanBySID.end()) { - auto& chan = it->second; - - if(chan.cid!=cid) { - log_printf(connsetup, PLVL_DEBUG, "Client %s provides incorrect CID with DestroyChan sid=%d cid=%d!=%d '%s'\n", peerName.c_str(), - unsigned(sid), unsigned(chan.cid), unsigned(cid), chan.name.c_str()); - } - - auto n = chanByCID.erase(chan.cid); - assert(n==1); - - chanBySID.erase(it); - // ServerChannel is delete'd - - { - auto tx = bufferevent_get_output(bev.get()); - constexpr bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG; - EvOutBuf R(be, tx); - to_wire(R, Header{pva_app_msg::DestroyChan, pva_flags::Server, 8}); - // yes, CID and SID really are reversed from from the Request - to_wire(R, cid); - to_wire(R, sid); - } - - } else { - log_printf(connsetup, PLVL_DEBUG, "Client %s DestroyChan non-existant sid=%d cid=%d\n", peerName.c_str(), - unsigned(sid), unsigned(cid)); - } - - if(!M.good()) - bev.reset(); -} - void ServerConn::handle_GetOp() {} @@ -343,7 +193,30 @@ void ServerConn::handle_DestroyOp() {} void ServerConn::handle_Introspect() -{} +{ + // aka. GetField + + EvInBuf M(peerBE, segBuf.get(), 16); + + uint32_t sid = -1, ioid = -1; + std::string subfield; + + from_wire(M, sid); + from_wire(M, ioid); + from_wire(M, subfield); + Status sts{Status::Ok}; + + auto it = chanBySID.find(sid); + + if(M.good() && it!=chanBySID.end() && opByIOID.find(ioid)==opByIOID.end()) { + + } else { + log_printf(connio, PLVL_DEBUG, "Client %s invalid GetField\n", peerName.c_str()); + } + + if(!M.good()) + bev.reset(); +} void ServerConn::handle_Message() {} @@ -353,16 +226,13 @@ void ServerConn::cleanup() { log_printf(connsetup, PLVL_DEBUG, "Client %s Cleanup TCP Connection\n", peerName.c_str()); - // remove myself from connections list - decltype (iface->connections) trash; - for (auto it = iface->connections.begin(), end = iface->connections.end(); it!=end; ++it) { - if((&*it)==this) { - trash.splice(trash.end(), iface->connections, it); - break; - } + auto it = iface->server->connections.find(this); + if(it!=iface->server->connections.end()) { + auto self = std::move(it->second); + iface->server->connections.erase(it); + + // delete this } - assert(!trash.empty()); - // delete this } void ServerConn::bevEvent(short events) @@ -582,11 +452,14 @@ void ServIface::onConnS(struct evconnlistener *listener, evutil_socket_t sock, s evutil_closesocket(sock); return; } - self->connections.emplace_back(self, sock, peer, socklen); + std::shared_ptr conn(new ServerConn(self, sock, peer, socklen)); + self->server->connections[conn.get()] = std::move(conn); }catch(std::exception& e){ log_printf(connsetup, PLVL_CRIT, "Interface %s Unhandled error in accept callback: %s\n", self->name.c_str(), e.what()); evutil_closesocket(sock); } } +ServerOp::~ServerOp() {} + } // namespace pvxsimpl diff --git a/src/serverconn.h b/src/serverconn.h index 5920c83..77ff551 100644 --- a/src/serverconn.h +++ b/src/serverconn.h @@ -9,6 +9,7 @@ #include #include +#include #include @@ -22,23 +23,58 @@ namespace pvxsimpl { struct ServIface; struct ServerConn; struct ServerChan; +struct ServerChan; + +struct ServerOp +{ + ServerChan* const chan; + + const uint32_t ioid; + + enum state_t { + Idle, + Active, + Dead, + } state; + + constexpr ServerOp(ServerChan *chan, uint32_t ioid) :chan(chan), ioid(ioid), state(Idle) {} + virtual ~ServerOp() =0; +}; + +struct ServerChannelControl : public server::ChannelControl +{ + explicit ServerChannelControl(const std::shared_ptr& conn, const std::shared_ptr& chan); + virtual ~ServerChannelControl(); + + virtual std::shared_ptr setHandler(const std::shared_ptr &h) override final; + virtual void close() override final; + + const std::weak_ptr server; + const std::weak_ptr chan; +}; struct ServerChan { - ServerConn* const conn; + const std::weak_ptr conn; const uint32_t sid, cid; const std::string name; - std::unique_ptr handler; + enum { + Creating, + Active, + Destroy, + } state; - ServerChan(ServerConn* conn, uint32_t sid, uint32_t cid, const std::string& name, std::unique_ptr&& handler); + std::shared_ptr handler; + + ServerChan(const std::shared_ptr& conn, uint32_t sid, uint32_t cid, const std::string& name); ServerChan(const ServerChan&) = delete; ServerChan& operator=(const ServerChan&) = delete; ~ServerChan(); }; -struct ServerConn +struct ServerConn : public std::enable_shared_from_this { ServIface* const iface; @@ -55,8 +91,9 @@ struct ServerConn evbuf segBuf, txBody; uint32_t nextSID; - std::map chanBySID; - std::map chanByCID; + std::map > chanBySID; + std::map > chanByCID; + std::map > opByIOID; ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen); ServerConn(const ServerConn&) = delete; @@ -65,23 +102,23 @@ struct ServerConn private: #define CASE(Op) void handle_##Op(); - CASE(Echo); - CASE(ConnValid); - CASE(Search); - CASE(AuthZ); + CASE(Echo); + CASE(ConnValid); + CASE(Search); + CASE(AuthZ); - CASE(CreateChan); - CASE(DestroyChan); + CASE(CreateChan); + CASE(DestroyChan); - CASE(GetOp); - CASE(PutOp); - CASE(PutGetOp); - CASE(RPCOp); - CASE(CancelOp); - CASE(DestroyOp); - CASE(Introspect); + CASE(GetOp); + CASE(PutOp); + CASE(PutGetOp); + CASE(RPCOp); + CASE(CancelOp); + CASE(DestroyOp); + CASE(Introspect); - CASE(Message); + CASE(Message); #undef CASE void cleanup(); @@ -103,8 +140,6 @@ struct ServIface evsocket sock; evlisten listener; - std::list connections; - ServIface(const std::string& addr, unsigned short port, server::Server::Pvt *server); static void onConnS(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *peer, int socklen, void *raw); @@ -118,6 +153,8 @@ using namespace pvxsimpl; struct Server::Pvt { + std::weak_ptr internal_self; + // "const" after ctor Config effective; @@ -126,13 +163,10 @@ struct Server::Pvt std::vector beaconMsg; std::list > listeners; - std::list interfaces; - std::vector beaconDest; - // handlers for active TCP connections, by priority. - // once added, these remain stable for the lifetime of the Server - std::map prio_loops; + std::list interfaces; + std::map > connections; // handle server "background" tasks. // accept new connections and send beacons @@ -148,7 +182,7 @@ struct Server::Pvt RWLock sourcesLock; std::map, std::shared_ptr > sources; - enum { + enum state_t { Stopped, Starting, Running, diff --git a/test/dummyserv.cpp b/test/dummyserv.cpp index 64fc892..93aebbc 100644 --- a/test/dummyserv.cpp +++ b/test/dummyserv.cpp @@ -43,10 +43,10 @@ public: } } } - virtual std::unique_ptr onCreate(const Create &op) override final + virtual void onCreate(std::unique_ptr&& op) override final { - log_printf(dummy, PLVL_INFO, "Create '%s'\n", op.name.c_str()); - return std::unique_ptr{new DummyHandler}; + log_printf(dummy, PLVL_INFO, "Create '%s'\n", op->name.c_str()); + op->setHandler(std::unique_ptr{new DummyHandler}); } };