client: reorg context Pvt

This commit is contained in:
Michael Davidsaver
2020-12-30 09:00:47 -08:00
parent bcc46f0389
commit c4d2cd4a10
8 changed files with 92 additions and 98 deletions
+48 -67
View File
@@ -69,7 +69,7 @@ Timeout::Timeout()
{}
Timeout::~Timeout() {}
Channel::Channel(const std::shared_ptr<Context::Pvt>& context, const std::string& name, uint32_t cid)
Channel::Channel(const std::shared_ptr<ContextImpl>& context, const std::string& name, uint32_t cid)
:context(context)
,name(name)
,cid(cid)
@@ -159,7 +159,9 @@ std::shared_ptr<Connect> ConnectBuilder::exec()
if(!ctx)
throw std::logic_error("NULL Builder");
auto op(std::make_shared<ConnectImpl>(ctx->tcp_loop, _pvname));
auto context(ctx->impl->shared_from_this());
auto op(std::make_shared<ConnectImpl>(context->tcp_loop, _pvname));
op->_onConn = std::move(_onConn);
op->_onDis = std::move(_onDis);
@@ -179,7 +181,6 @@ std::shared_ptr<Connect> ConnectBuilder::exec()
op.reset();
});
auto context(ctx->shared_from_this());
context->tcp_loop.dispatch([op, context]() {
// on worker
@@ -255,7 +256,7 @@ RequestInfo::RequestInfo(uint32_t sid, uint32_t ioid, std::shared_ptr<OperationB
,handle(handle)
{}
std::shared_ptr<Channel> Channel::build(const std::shared_ptr<Context::Pvt>& context, const std::string& name)
std::shared_ptr<Channel> Channel::build(const std::shared_ptr<ContextImpl>& context, const std::string& name)
{
std::shared_ptr<Channel> chan;
@@ -287,38 +288,8 @@ Operation::~Operation() {}
Subscription::~Subscription() {}
Context::Context(const Config& conf)
{
/* Here be dragons.
*
* We keep two different ref. counters.
* - "externel" counter which keeps a server running.
* - "internal" which only keeps server storage from being destroyed.
*
* External refs are held as Server::pvt. Internal refs are
* held by various in-progress operations (OpBase sub-classes)
* Which need to safely access server storage, but should not
* prevent a server from stopping.
*/
auto internal(std::make_shared<Pvt>(conf));
internal->internal_self = internal;
cnt_ClientPvtLive.fetch_add(1u);
// external
pvt.reset(internal.get(), [internal](Pvt*) mutable {
auto temp(std::move(internal));
try {
temp->close();
}catch(std::exception& e){
// called through ~shared_ptr and can't propagate exceptions.
// log and continue...
log_exc_printf(setup, "Error while closing Context (%s) : %s\n",
typeid(e).name(), e.what());
}
cnt_ClientPvtLive.fetch_sub(1u);
});
// we don't keep a weak_ptr to the external reference.
// Caller is entirely responsible for keeping this server running
}
:pvt(std::make_shared<Pvt>(conf))
{}
Context::~Context() {}
@@ -327,7 +298,7 @@ const Config& Context::config() const
if(!pvt)
throw std::logic_error("NULL Context");
return pvt->effective;
return pvt->impl->effective;
}
void Context::hurryUp()
@@ -335,8 +306,8 @@ void Context::hurryUp()
if(!pvt)
throw std::logic_error("NULL Context");
pvt->manager.loop().call([this](){
pvt->poke(true);
pvt->impl->manager.loop().call([this](){
pvt->impl->poke(true);
});
}
@@ -345,10 +316,10 @@ void Context::cacheClear()
if(!pvt)
throw std::logic_error("NULL Context");
pvt->tcp_loop.call([this](){
pvt->impl->tcp_loop.call([this](){
// run twice to ensure both mark and sweep of all unused channels
pvt->cacheClean();
pvt->cacheClean();
pvt->impl->cacheClean();
pvt->impl->cacheClean();
});
}
@@ -356,9 +327,9 @@ Report Context::report() const
{
Report ret;
pvt->tcp_loop.call([this, &ret](){
pvt->impl->tcp_loop.call([this, &ret](){
for(auto& pair : pvt->connByAddr) {
for(auto& pair : pvt->impl->connByAddr) {
auto conn = pair.second.lock();
if(!conn)
continue;
@@ -400,16 +371,16 @@ Value buildCAMethod()
}).create();
}
Context::Pvt::Pvt(const Config& conf)
ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop)
:effective(conf)
,caMethod(buildCAMethod())
,searchTx(AF_INET, SOCK_DGRAM, 0)
,tcp_loop("PVXCTCP", epicsThreadPriorityCAServerLow)
,searchRx(event_new(tcp_loop.base, searchTx.sock, EV_READ|EV_PERSIST, &Pvt::onSearchS, this))
,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &Pvt::tickSearchS, this))
,tcp_loop(tcp_loop)
,searchRx(event_new(tcp_loop.base, searchTx.sock, EV_READ|EV_PERSIST, &ContextImpl::onSearchS, this))
,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &ContextImpl::tickSearchS, this))
,manager(UDPManager::instance())
,beaconCleaner(event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &Pvt::tickBeaconCleanS, this))
,cacheCleaner(event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &Pvt::cacheCleanS, this))
,beaconCleaner(event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::tickBeaconCleanS, this))
,cacheCleaner(event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::cacheCleanS, this))
{
effective.expand();
@@ -480,9 +451,9 @@ Context::Pvt::Pvt(const Config& conf)
log_err_printf(setup, "Error enabling channel cache clean timer on\n%s", "");
}
Context::Pvt::~Pvt() {}
ContextImpl::~ContextImpl() {}
void Context::Pvt::close()
void ContextImpl::close()
{
// terminate all active connections
tcp_loop.call([this]() {
@@ -516,7 +487,7 @@ void Context::Pvt::close()
manager.sync();
}
void Context::Pvt::poke(bool force)
void ContextImpl::poke(bool force)
{
{
Guard G(pokeLock);
@@ -541,7 +512,7 @@ void Context::Pvt::poke(bool force)
throw std::runtime_error("Unable to schedule searchTimer");
}
void Context::Pvt::onBeacon(const UDPManager::Beacon& msg)
void ContextImpl::onBeacon(const UDPManager::Beacon& msg)
{
const auto& guid = msg.guid;
@@ -565,7 +536,7 @@ void Context::Pvt::onBeacon(const UDPManager::Beacon& msg)
poke(false);
}
bool Context::Pvt::onSearch()
bool ContextImpl::onSearch()
{
searchMsg.resize(0x10000);
SockAddr src;
@@ -680,7 +651,7 @@ bool Context::Pvt::onSearch()
auto it = connByAddr.find(serv);
if(it==connByAddr.end() || !(chan->conn = it->second.lock())) {
connByAddr[serv] = chan->conn = std::make_shared<Connection>(internal_self.lock(), serv);
connByAddr[serv] = chan->conn = std::make_shared<Connection>(shared_from_this(), serv);
}
chan->conn->pending.push_back(chan);
@@ -709,7 +680,7 @@ bool Context::Pvt::onSearch()
return true;
}
void Context::Pvt::onSearchS(evutil_socket_t fd, short evt, void *raw)
void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw)
{
try {
log_debug_printf(io, "UDP search Rx event %x\n", evt);
@@ -719,7 +690,7 @@ void Context::Pvt::onSearchS(evutil_socket_t fd, short evt, void *raw)
// limit number of packets processed before going back to the reactor
unsigned i;
const unsigned limit = 40;
for(i=0; i<limit && static_cast<Pvt*>(raw)->onSearch(); i++) {}
for(i=0; i<limit && static_cast<ContextImpl*>(raw)->onSearch(); i++) {}
log_debug_printf(io, "UDP search processed %u/%u\n", i, limit);
}catch(std::exception& e){
@@ -727,7 +698,7 @@ void Context::Pvt::onSearchS(evutil_socket_t fd, short evt, void *raw)
}
}
void Context::Pvt::tickSearch()
void ContextImpl::tickSearch()
{
{
Guard G(pokeLock);
@@ -862,16 +833,16 @@ void Context::Pvt::tickSearch()
log_err_printf(setup, "Error re-enabling search timer on\n%s", "");
}
void Context::Pvt::tickSearchS(evutil_socket_t fd, short evt, void *raw)
void ContextImpl::tickSearchS(evutil_socket_t fd, short evt, void *raw)
{
try {
static_cast<Pvt*>(raw)->tickSearch();
static_cast<ContextImpl*>(raw)->tickSearch();
}catch(std::exception& e){
log_exc_printf(io, "Unhandled error in search timer callback: %s\n", e.what());
}
}
void Context::Pvt::tickBeaconClean()
void ContextImpl::tickBeaconClean()
{
epicsTimeStamp now;
epicsTimeGetCurrent(&now);
@@ -894,16 +865,16 @@ void Context::Pvt::tickBeaconClean()
}
}
void Context::Pvt::tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw)
void ContextImpl::tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw)
{
try {
static_cast<Pvt*>(raw)->tickBeaconClean();
static_cast<ContextImpl*>(raw)->tickBeaconClean();
}catch(std::exception& e){
log_exc_printf(io, "Unhandled error in beacon cleaner timer callback: %s\n", e.what());
}
}
void Context::Pvt::cacheClean()
void ContextImpl::cacheClean()
{
std::set<std::string> trash;
@@ -928,15 +899,25 @@ void Context::Pvt::cacheClean()
}
}
void Context::Pvt::cacheCleanS(evutil_socket_t fd, short evt, void *raw)
void ContextImpl::cacheCleanS(evutil_socket_t fd, short evt, void *raw)
{
try {
static_cast<Pvt*>(raw)->tickBeaconClean();
static_cast<ContextImpl*>(raw)->tickBeaconClean();
}catch(std::exception& e){
log_exc_printf(io, "Unhandled error in beacon cleaner timer callback: %s\n", e.what());
}
}
Context::Pvt::Pvt(const Config& conf)
:loop("PVXCTCP", epicsThreadPriorityCAServerLow)
,impl(std::make_shared<ContextImpl>(conf, loop))
{}
Context::Pvt::~Pvt()
{
impl->close();
}
} // namespace client
} // namespace pvxs
+1 -1
View File
@@ -14,7 +14,7 @@ namespace client {
DEFINE_LOGGER(io, "pvxs.client.io");
Connection::Connection(const std::shared_ptr<Context::Pvt>& context, const SockAddr& peerAddr)
Connection::Connection(const std::shared_ptr<ContextImpl>& context, const SockAddr& peerAddr)
:ConnBase (true,
bufferevent_socket_new(context->tcp_loop.base, -1, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS),
peerAddr)
+13 -7
View File
@@ -528,7 +528,7 @@ void Connection::handle_PUT() { handle_GPR(CMD_PUT); }
void Connection::handle_RPC() { handle_GPR(CMD_RPC); }
static
std::shared_ptr<Operation> gpr_setup(const std::shared_ptr<Context::Pvt>& context,
std::shared_ptr<Operation> gpr_setup(const std::shared_ptr<ContextImpl>& context,
std::string name, // need to capture by value
const std::shared_ptr<GPROp>& op)
{
@@ -566,12 +566,14 @@ std::shared_ptr<Operation> GetBuilder::_exec_get()
if(!ctx)
throw std::logic_error("NULL Builder");
auto op(std::make_shared<GPROp>(Operation::Get, ctx->tcp_loop));
auto context(ctx->impl->shared_from_this());
auto op(std::make_shared<GPROp>(Operation::Get, context->tcp_loop));
op->setDone(std::move(_result), std::move(_onInit));
op->autoExec = _autoexec;
op->pvRequest = _buildReq();
return gpr_setup(ctx->shared_from_this(), _name, op);
return gpr_setup(context, _name, op);
}
std::shared_ptr<Operation> PutBuilder::exec()
@@ -579,7 +581,9 @@ std::shared_ptr<Operation> PutBuilder::exec()
if(!ctx)
throw std::logic_error("NULL Builder");
auto op(std::make_shared<GPROp>(Operation::Put, ctx->tcp_loop));
auto context(ctx->impl->shared_from_this());
auto op(std::make_shared<GPROp>(Operation::Put, context->tcp_loop));
op->setDone(std::move(_result), std::move(_onInit));
if(_builder) {
@@ -599,7 +603,7 @@ std::shared_ptr<Operation> PutBuilder::exec()
op->autoExec = _autoexec;
op->pvRequest = _buildReq();
return gpr_setup(ctx->shared_from_this(), _name, op);
return gpr_setup(context, _name, op);
}
std::shared_ptr<Operation> RPCBuilder::exec()
@@ -607,7 +611,9 @@ std::shared_ptr<Operation> RPCBuilder::exec()
if(!ctx)
throw std::logic_error("NULL Builder");
auto op(std::make_shared<GPROp>(Operation::RPC, ctx->tcp_loop));
auto context(ctx->impl->shared_from_this());
auto op(std::make_shared<GPROp>(Operation::RPC, context->tcp_loop));
op->setDone(std::move(_result), std::move(_onInit));
if(_argument) {
if(!_autoexec)
@@ -620,7 +626,7 @@ std::shared_ptr<Operation> RPCBuilder::exec()
op->autoExec = _autoexec;
op->pvRequest = _buildReq();
return gpr_setup(ctx->shared_from_this(), _name, op);
return gpr_setup(context, _name, op);
}
} // namespace client
+21 -16
View File
@@ -24,6 +24,7 @@ namespace pvxs {
namespace client {
struct Channel;
struct ContextImpl;
struct ResultWaiter {
epicsMutex lock;
@@ -72,7 +73,7 @@ struct RequestInfo {
};
struct Connection : public ConnBase, public std::enable_shared_from_this<Connection> {
const std::shared_ptr<Context::Pvt> context;
const std::shared_ptr<ContextImpl> context;
const evevent echoTimer;
@@ -91,7 +92,7 @@ struct Connection : public ConnBase, public std::enable_shared_from_this<Connect
INST_COUNTER(Connection);
Connection(const std::shared_ptr<Context::Pvt>& context, const SockAddr &peerAddr);
Connection(const std::shared_ptr<ContextImpl>& context, const SockAddr &peerAddr);
virtual ~Connection();
void createChannels();
@@ -145,7 +146,7 @@ struct ConnectImpl : public Connect
};
struct Channel {
const std::shared_ptr<Context::Pvt> context;
const std::shared_ptr<ContextImpl> context;
const std::string name;
// Our choosen ID for this channel.
// used as persistent CID and searchID
@@ -181,26 +182,20 @@ struct Channel {
INST_COUNTER(Channel);
Channel(const std::shared_ptr<Context::Pvt>& context, const std::string& name, uint32_t cid);
Channel(const std::shared_ptr<ContextImpl>& context, const std::string& name, uint32_t cid);
~Channel();
void createOperations();
void disconnect(const std::shared_ptr<Channel>& self);
static
std::shared_ptr<Channel> build(const std::shared_ptr<Context::Pvt>& context, const std::string &name);
std::shared_ptr<Channel> build(const std::shared_ptr<ContextImpl>& context, const std::string &name);
};
struct Context::Pvt
struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
{
SockAttach attach;
std::weak_ptr<Pvt> internal_self;
std::shared_ptr<Pvt> shared_from_this() {
std::shared_ptr<Pvt> ret(internal_self);
return ret;
}
// "const" after ctor
Config effective;
@@ -235,7 +230,7 @@ struct Context::Pvt
std::map<uint32_t, std::weak_ptr<Channel>> chanByCID;
// strong ref. loop through Channel::context
// explicitly broken by Context::close(), Context::cacheClear, or Context::Pvt::cacheClean()
// explicitly broken by Context::close(), Context::cacheClear, or ContextImpl::cacheClean()
std::map<std::string, std::shared_ptr<Channel>> chanByName;
std::map<SockAddr, std::weak_ptr<Connection>> connByAddr;
@@ -251,10 +246,10 @@ struct Context::Pvt
const evevent beaconCleaner;
const evevent cacheCleaner;
INST_COUNTER(ClientPvt);
INST_COUNTER(ClientContextImpl);
Pvt(const Config& conf);
~Pvt();
ContextImpl(const Config& conf, const evbase &tcp_loop);
~ContextImpl();
void close();
@@ -272,6 +267,16 @@ struct Context::Pvt
static void cacheCleanS(evutil_socket_t fd, short evt, void *raw);
};
struct Context::Pvt {
evbase loop;
std::shared_ptr<ContextImpl> impl;
INST_COUNTER(ClientPvt);
Pvt(const Config& conf);
~Pvt(); // I call ContextImpl::close()
};
} // namespace client
} // namespace pvxs
+3 -2
View File
@@ -180,7 +180,9 @@ std::shared_ptr<Operation> GetBuilder::_exec_info()
if(!ctx)
throw std::logic_error("NULL Builder");
auto op(std::make_shared<InfoOp>(ctx->tcp_loop));
auto context(ctx->impl->shared_from_this());
auto op(std::make_shared<InfoOp>(context->tcp_loop));
if(_result) {
op->done = std::move(_result);
} else {
@@ -207,7 +209,6 @@ std::shared_ptr<Operation> GetBuilder::_exec_info()
});
auto name(std::move(_name));
auto context(ctx->shared_from_this());
context->tcp_loop.dispatch([op, context, name]() {
// on worker
+3 -2
View File
@@ -552,7 +552,9 @@ std::shared_ptr<Subscription> MonitorBuilder::exec()
if(!ctx)
throw std::logic_error("NULL Builder");
auto op(std::make_shared<SubscriptionImpl>(ctx->tcp_loop));
auto context(ctx->impl->shared_from_this());
auto op(std::make_shared<SubscriptionImpl>(context->tcp_loop));
op->self = op;
op->event = std::move(_event);
op->onInit = std::move(_onInit);
@@ -619,7 +621,6 @@ std::shared_ptr<Subscription> MonitorBuilder::exec()
});
auto name(std::move(_name));
auto context(ctx->shared_from_this());
context->tcp_loop.dispatch([op, context, name]() {
// on worker
+2 -2
View File
@@ -67,7 +67,7 @@ CASE(GPROp);
CASE(Connection);
CASE(Channel);
CASE(ClientPvt);
CASE(ClientPvtLive);
CASE(ClientContextImpl);
CASE(InfoOp);
CASE(SubScriptionImpl);
@@ -104,7 +104,7 @@ CASE(GPROp);
CASE(Connection);
CASE(Channel);
CASE(ClientPvt);
CASE(ClientPvtLive);
CASE(ClientContextImpl);
CASE(InfoOp);
CASE(SubScriptionImpl);
+1 -1
View File
@@ -303,7 +303,7 @@ CASE(GPROp);
CASE(Connection);
CASE(Channel);
CASE(ClientPvt);
CASE(ClientPvtLive);
CASE(ClientContextImpl);
CASE(InfoOp);
CASE(SubScriptionImpl);