client monitor cleanup and logging
This commit is contained in:
+34
-16
@@ -19,7 +19,7 @@ namespace client {
|
||||
|
||||
typedef epicsGuard<epicsMutex> Guard;
|
||||
|
||||
DEFINE_LOGGER(setup, "pvxs.client.setup");
|
||||
DEFINE_LOGGER(monevt, "pvxs.client.monitor");
|
||||
DEFINE_LOGGER(io, "pvxs.client.io");
|
||||
|
||||
namespace {
|
||||
@@ -34,6 +34,9 @@ struct Entry {
|
||||
|
||||
struct SubscriptionImpl : public OperationBase, public Subscription
|
||||
{
|
||||
// for use in log messages, event after cancel()
|
||||
const std::string channelName;
|
||||
|
||||
evevent ackTick;
|
||||
|
||||
// const after exec()
|
||||
@@ -63,6 +66,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription
|
||||
|
||||
SubscriptionImpl(operation_t op, const std::shared_ptr<Channel>& chan)
|
||||
:OperationBase (op, chan)
|
||||
,channelName(chan->name)
|
||||
,ackTick(event_new(chan->context->tcp_loop.base, -1, EV_TIMEOUT, &tickAckS, this))
|
||||
{}
|
||||
virtual ~SubscriptionImpl() {
|
||||
@@ -71,7 +75,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription
|
||||
|
||||
void notify()
|
||||
{
|
||||
log_info_printf(io, "Server %s channel %s monitor notify\n",
|
||||
log_info_printf(monevt, "Server %s channel '%s' monitor notify\n",
|
||||
chan->conn ? chan->conn->peerName.c_str() : "<disconnected>",
|
||||
chan->name.c_str());
|
||||
if(event) {
|
||||
@@ -135,16 +139,24 @@ struct SubscriptionImpl : public OperationBase, public Subscription
|
||||
|
||||
if(unack==0u || unack>=ackAt) {
|
||||
if(event_add(ackTick.get(), &tick))
|
||||
log_err_printf(io, "Monitor '%s' unable to schedule ack\n", chan->name.c_str());
|
||||
log_err_printf(io, "Monitor '%s' unable to schedule ack\n", channelName.c_str());
|
||||
}
|
||||
|
||||
unack++;
|
||||
}
|
||||
|
||||
log_info_printf(monevt, "channel '%s' monitor pop() %s\n",
|
||||
channelName.c_str(),
|
||||
ent.exc ? "exception" : ent.val ? "data" : "null!");
|
||||
|
||||
if(ent.exc)
|
||||
std::rethrow_exception(ent.exc);
|
||||
else
|
||||
ret = std::move(ent.val);
|
||||
|
||||
} else {
|
||||
log_info_printf(monevt, "channel '%s' monitor pop() empty\n",
|
||||
channelName.c_str());
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@@ -217,17 +229,17 @@ struct SubscriptionImpl : public OperationBase, public Subscription
|
||||
state = Creating;
|
||||
|
||||
bool empty = false;
|
||||
if(!maskConn || pipeline) {
|
||||
Guard G(lock);
|
||||
if(!maskConn || pipeline) {
|
||||
Guard G(lock);
|
||||
|
||||
if(!maskConn) {
|
||||
empty = queue.empty();
|
||||
if(!maskConn) {
|
||||
empty = queue.empty();
|
||||
|
||||
queue.emplace_back(Entry(std::make_exception_ptr(Connected(conn->peerName))));
|
||||
queue.emplace_back(Entry(std::make_exception_ptr(Connected(conn->peerName))));
|
||||
}
|
||||
if(pipeline)
|
||||
window = queueSize;
|
||||
}
|
||||
if(pipeline)
|
||||
window = queueSize;
|
||||
}
|
||||
|
||||
if(empty)
|
||||
notify();
|
||||
@@ -251,12 +263,12 @@ struct SubscriptionImpl : public OperationBase, public Subscription
|
||||
// return to pending
|
||||
|
||||
bool empty = false;
|
||||
if(!maskDiscon) {
|
||||
Guard G(lock);
|
||||
empty = queue.empty();
|
||||
if(!maskDiscon) {
|
||||
Guard G(lock);
|
||||
empty = queue.empty();
|
||||
|
||||
queue.emplace_back(Entry(std::make_exception_ptr(Disconnect())));
|
||||
}
|
||||
queue.emplace_back(Entry(std::make_exception_ptr(Disconnect())));
|
||||
}
|
||||
|
||||
chan->pending.push_back(self);
|
||||
state = Connecting;
|
||||
@@ -465,6 +477,12 @@ void Connection::handle_MONITOR()
|
||||
|
||||
mon->queue.emplace_back(std::make_exception_ptr(Finished()));
|
||||
}
|
||||
|
||||
if(mon->queue.empty()) {
|
||||
log_err_printf(io, "Server %s channel '%s' MONITOR empty update!\n",
|
||||
peerName.c_str(), mon->chan->name.c_str());
|
||||
notify = false;
|
||||
}
|
||||
}
|
||||
|
||||
if(mon->state==SubscriptionImpl::Done || final) {
|
||||
|
||||
Reference in New Issue
Block a user