add server to client remote logging

This commit is contained in:
Michael Davidsaver
2025-01-31 20:38:30 -08:00
parent 839fc01bfd
commit a372d936e8
10 changed files with 125 additions and 17 deletions
+8 -13
View File
@@ -459,23 +459,18 @@ void Connection::handle_MESSAGE()
if(!M.good())
throw std::runtime_error(SB()<<M.file()<<':'<<M.line()<<" Decode error for Message");
auto it = opByIOID.find(ioid);
if(it==opByIOID.end()) {
log_debug_printf(connsetup, "Server %s Message on non-existent ioid\n", peerName.c_str());
return;
}
auto op = it->second.handle.lock();
auto lvl(mtype2level(mtype));
const char *chan = "<no channel>";
Level lvl;
switch(mtype) {
case 0: lvl = Level::Info; break;
case 1: lvl = Level::Warn; break;
case 2: lvl = Level::Err; break;
default: lvl = Level::Crit; break;
auto it = opByIOID.find(ioid);
if(it!=opByIOID.end()) {
if(auto op = it->second.handle.lock()) {
chan = op->chan->name.c_str();
}
}
log_printf(remote, lvl, "%s : %s\n",
op && op->chan ? op->chan->name.c_str() : "<dead>", msg.c_str());
chan, msg.c_str());
}
void Connection::tickEcho()
+27
View File
@@ -22,6 +22,7 @@
#include <event2/buffer.h>
#include <pvxs/version.h>
#include <pvxs/sharedArray.h>
#include <pvxs/log.h>
#include "utilpvt.h"
namespace pvxs {namespace impl {
@@ -697,6 +698,32 @@ void from_wire(Buf& buf, Header& H)
}
}
// mapping between CMD_MESSAGE mtype and logging Level
inline
Level mtype2level(uint8_t mtype)
{
switch(mtype) {
case 0: return Level::Info;
case 1: return Level::Warn;
case 2: return Level::Err;
default: return Level::Crit;
}
}
inline
uint8_t level2mtype(Level lvl)
{
switch(lvl) {
case Level::Debug:
case Level::Info: return 0;
case Level::Warn: return 1;
case Level::Err: return 2;
case Level::Crit: return 3;
}
return 3;
}
}} // namespace pvxs::impl
#endif // PVAPROTO_H
+3 -3
View File
@@ -20,7 +20,7 @@ struct ServerConn;
namespace server {
//! Handle when an operation is being setup
struct PVXS_API ConnectOp : public OpBase {
struct PVXS_API ConnectOp : public OpBase, public RemoteLogger {
protected:
Value _pvRequest;
public:
@@ -72,7 +72,7 @@ struct MonitorStat {
};
//! Handle for active subscription
struct PVXS_API MonitorControlOp : public OpBase {
struct PVXS_API MonitorControlOp : public OpBase, public RemoteLogger {
MonitorControlOp(const std::string& name,
const std::shared_ptr<const ClientCredentials>& cred, op_t op)
:OpBase(name, cred, op)
@@ -134,7 +134,7 @@ public:
};
//! Handle for subscription which is being setup
struct PVXS_API MonitorSetupOp : public OpBase {
struct PVXS_API MonitorSetupOp : public OpBase, public RemoteLogger {
protected:
Value _pvRequest;
public:
+9 -1
View File
@@ -20,6 +20,7 @@
#include <pvxs/data.h>
namespace pvxs {
enum struct Level;
namespace server {
/** Credentials presented by a client.
@@ -91,9 +92,16 @@ public:
OpBase& operator=(const OpBase&) = delete;
virtual ~OpBase() =0;
};
struct PVXS_API RemoteLogger {
virtual ~RemoteLogger() =0;
//! Request log message to peer
//! @since UNRELEASED
virtual void logRemote(Level lvl, const std::string& msg) =0;
};
//! Handle when an operation is being executed
struct PVXS_API ExecOp : public OpBase {
struct PVXS_API ExecOp : public OpBase, public RemoteLogger {
//! Issue a reply without data. (eg. to complete a PUT)
virtual void reply() =0;
//! Issue a reply with data. For a GET or RPC (or PUT/Get)
+1
View File
@@ -846,6 +846,7 @@ void Source::show(std::ostream& strm)
}
OpBase::~OpBase() {}
RemoteLogger::~RemoteLogger() {}
ChannelControl::~ChannelControl() {}
+17
View File
@@ -140,6 +140,23 @@ const std::shared_ptr<ServerChan>& ServerConn::lookupSID(uint32_t sid)
return it->second;
}
void ServerConn::logRemote(uint32_t ioid, Level lvl, const std::string& msg)
{
// TODO: respect TX throttle
if(!connection())
return;
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));
EvOutBuf R(sendBE, txBody.get());
to_wire(R, ioid);
to_wire(R, level2mtype(lvl));
to_wire(R, msg);
}
enqueueTxBody(CMD_MESSAGE);
}
void ServerConn::handle_ECHO()
{
// Client requests echo as a keep-alive check
+23
View File
@@ -57,6 +57,27 @@ struct ServerOp
virtual void show(std::ostream& strm) const =0;
};
// helper for implementing OpBase::logRemote
template<typename Control>
void doLogRemote(Control *self, Level lvl, const std::string& msg)
{
auto serv = self->server.lock();
if(!serv)
return;
std::string m(msg); // copy for bind
auto wop(self->op); // copy weak for bind
serv->acceptor_loop.dispatch([wop, lvl, m](){
// op, chan, or conn may be dead or dissociated by this point...
if(auto oper = wop.lock()) {
if(auto chan = oper->chan.lock()) {
if(auto conn = chan->conn.lock()) {
conn->logRemote(oper->ioid, lvl, m);
}
}
}
});
}
struct ServerChannelControl : public server::ChannelControl
{
ServerChannelControl(const std::shared_ptr<ServerConn>& conn, const std::shared_ptr<ServerChan>& chan);
@@ -132,6 +153,8 @@ struct ServerConn final : public ConnBase, public std::enable_shared_from_this<S
const std::shared_ptr<ServerChan>& lookupSID(uint32_t sid);
void logRemote(uint32_t ioid, Level lvl, const std::string& msg);
private:
#define CASE(Op) virtual void handle_##Op() override final;
CASE(ECHO);
+10
View File
@@ -250,6 +250,11 @@ struct ServerGPRConnect : public server::ConnectOp
});
}
virtual void logRemote(Level lvl, const std::string& msg) override final
{
doLogRemote(this, lvl, msg);
}
const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<ServerGPR> op;
@@ -324,6 +329,11 @@ struct ServerGPRExec : public server::ExecOp
return Timer::Pvt::buildOneShot(delay, serv->acceptor_loop.internal(), std::move(fn));
}
virtual void logRemote(Level lvl, const std::string& msg) override final
{
doLogRemote(this, lvl, msg);
}
const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<ServerGPR> op;
+17
View File
@@ -113,6 +113,23 @@ struct ServerIntrospectControl : public server::ConnectOp
virtual void onGet(std::function<void(std::unique_ptr<server::ExecOp>&& fn)>&& fn) override final {}
virtual void onPut(std::function<void(std::unique_ptr<server::ExecOp>&& fn, Value&&)>&& fn) override final {}
virtual void logRemote(Level lvl, const std::string& msg) override final
{
auto serv = server.lock();
if(!serv)
return;
std::string m(msg); // copy for bind
serv->acceptor_loop.dispatch([this, lvl, m](){
if(auto oper = op.lock()) {
if(auto chan = oper->chan.lock()) {
if(auto conn = chan->conn.lock()) {
conn->logRemote(oper->ioid, lvl, m);
}
}
}
});
}
const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<ServerIntrospect> op;
+10
View File
@@ -355,6 +355,11 @@ struct ServerMonitorControl : public server::MonitorControlOp
});
}
virtual void logRemote(Level lvl, const std::string& msg) override final
{
doLogRemote(this, lvl, msg);
}
const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<MonitorOp> op;
@@ -432,6 +437,11 @@ struct ServerMonitorSetup : public server::MonitorSetupOp
});
}
virtual void logRemote(Level lvl, const std::string& msg) override final
{
doLogRemote(this, lvl, msg);
}
const std::weak_ptr<server::Server::Pvt> server;
const std::weak_ptr<MonitorOp> op;