From c2a4224a2176b345e116bc3747d1a1e4a2ab6d08 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 16 Jan 2020 19:44:54 -0800 Subject: [PATCH] server monitor --- src/Makefile | 1 + src/evhelper.cpp | 23 ++- src/evhelper.h | 7 +- src/pvxs/server.h | 74 ++++++- src/server.cpp | 5 + src/serverchan.cpp | 15 ++ src/serverconn.cpp | 1 + src/serverconn.h | 11 +- src/serverget.cpp | 18 +- src/servermon.cpp | 476 +++++++++++++++++++++++++++++++++++++++++++++ test/Makefile | 4 + test/countdown.cpp | 139 +++++++++++++ 12 files changed, 761 insertions(+), 13 deletions(-) create mode 100644 src/servermon.cpp create mode 100644 test/countdown.cpp diff --git a/src/Makefile b/src/Makefile index 47f0092..29669a5 100644 --- a/src/Makefile +++ b/src/Makefile @@ -66,6 +66,7 @@ LIB_SRCS += serverconn.cpp LIB_SRCS += serverchan.cpp LIB_SRCS += serverintrospect.cpp LIB_SRCS += serverget.cpp +LIB_SRCS += servermon.cpp LIB_SRCS += serversource.cpp LIB_LIBS += Com diff --git a/src/evhelper.cpp b/src/evhelper.cpp index 477bc33..8865c45 100644 --- a/src/evhelper.cpp +++ b/src/evhelper.cpp @@ -64,7 +64,9 @@ struct evbase::Pvt : public epicsThreadRunable int ret = event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY); - if(logerr.test(int(ret ? Level::Crit : Level::Info))) + // TODO: cleanup after pending event_base_once() + + if(logerr.test(ret ? Level::Crit : Level::Info)) errlogPrintf("Exit loop worker: %d for %p\n", ret, base); } }; @@ -132,6 +134,25 @@ void evbase::dispatch(std::function&& fn) } } + +// queue request to execute in event loop after at least delay seconds have passed +// @param delay second in future. must be finite and >=0 +void evbase::later(double delay, std::function&& fn) +{ + timeval tv; + tv.tv_sec = unsigned(delay); + tv.tv_usec = unsigned(delay*1e6)%1000000; + + std::unique_ptr > action(new std::function(std::move(fn))); + + if(event_base_once(base, -1, EV_TIMEOUT, &dispatch_action, action.get(), &tv)==0) { + // successfully queued. No longer my responsibility + action.release(); + } else { + throw std::runtime_error("Unable to queue dispatch()"); + } +} + namespace { struct action_args { std::function fn; diff --git a/src/evhelper.h b/src/evhelper.h index e67214f..0dfa98b 100644 --- a/src/evhelper.h +++ b/src/evhelper.h @@ -64,7 +64,12 @@ struct PVXS_API evbase { // queue request to execute in event loop. return immediately. void dispatch(std::function&& fn); - // queue request to execute in event loop. return after executed + + // queue request to execute in event loop after at least delay seconds have passed + // @param delay second in future. must be finite and >=0 + void later(double delay, std::function&& fn); + + // queue request to execute in event loop. return after executed. void call(std::function&& fn); void assertInLoop(); diff --git a/src/pvxs/server.h b/src/pvxs/server.h index 8eb06e9..e4f26c0 100644 --- a/src/pvxs/server.h +++ b/src/pvxs/server.h @@ -178,6 +178,76 @@ struct PVXS_API ConnectOp : public OpBase { virtual void onClose(std::function&&) =0; }; +//! Handle for active subscription +struct PVXS_API MonitorControlOp : public OpBase { + virtual ~MonitorControlOp(); + +protected: + virtual bool doPost(Value&& val, bool maybe, bool force) =0; +public: + + //! Add a new entry to the monitor queue. + //! If nFree()<=0 the output queue will be over-filled with this element. + //! Returns @code nFree()>0u @endcode + bool forcePost(Value&& val) { + return doPost(std::move(val), false, true); + } + + //! Add a new entry to the monitor queue. + //! If nFree()<=0 this element will be "squshed" to the last element in the queue + //! Returns @code nFree()>0u @endcode + bool post(Value&& val) { + return doPost(std::move(val), false, false); + } + + //! Add a new entry to the monitor queue. + //! If nFree()<=0 return false and take no other action + //! Returns @code nFree()>0u @endcode + bool tryPost(Value&& val) { + return doPost(std::move(val), true, false); + } + + //! Signal to subscriber that this subscription will not yield any further events. + //! This is not an error. Client should not retry. + void finish() { + doPost(Value(), false, false); + } + + //! Number of "free" elements in the output flow window. + //! May be negative if local queue is over-filled + //! Returns @code numeric_limits::max() @endcode if flow control not enabled by client. + virtual int32_t nFree() const =0; + + //! Maximum which may be returned by nFree() + //! Returns @code numeric_limits::max() @endcode if flow control not enabled by client. + virtual unsigned long long maxFree() const =0; + + //! Set flow control levels. + //! onLowMark callback will be invoked when nFree()<=low becomes true, and not again until it has been false. + //! onHighMark callback will be invoked when nFree()>high becomes true, and not again until it has been false. + virtual void setWatermarks(size_t low, size_t high) =0; + + //! Callback when client resumes/pauses updates + virtual void onStart(std::function&&) =0; + virtual void onHighMark(std::function&&) =0; + virtual void onLowMark(std::function&&) =0; +}; + +//! Handle for subscription which is being setup +struct PVXS_API MonitorSetupOp : public OpBase { + Value pvRequest; + + //! Inform peer of our data-type and acquire control of subscription queue. + //! The queue is initially stopped. + virtual std::unique_ptr connect(const Value& prototype) =0; + //! Indicate that this operation can not be setup + virtual void error(const std::string& msg) =0; + + virtual ~MonitorSetupOp(); + + virtual void onClose(std::function&&) =0; +}; + /** Manipulate an active Channel, and any in-progress Operations through it. * */ @@ -186,8 +256,10 @@ struct PVXS_API ChannelControl : public OpBase { //! Invoked when a new GET, PUT, or RPC Operation is requested through this Channel virtual void onOp(std::function&&)>&& ) =0; - //! Invoked when the a peer executes an RPC + //! Invoked when the peer executes an RPC virtual void onRPC(std::function&&, Value&&)>&& fn)=0; + //! Invoked when the peer create a new subscription + virtual void onSubscribe(std::function&&)>&&)=0; //! Callback when the channel closes (eg. peer disconnect) virtual void onClose(std::function&&) =0; diff --git a/src/server.cpp b/src/server.cpp index 9ca7a0e..430e218 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -511,6 +511,8 @@ void Server::Pvt::onSearch(const UDPManager::Search& msg) { // on UDPManager worker + log_printf(serverio, Debug, "%s searching\n", msg.src.tostring().c_str()); + searchOp._names.resize(msg.names.size()); for(auto i : range(msg.names.size())) { searchOp._names[i]._name = msg.names[i].name; @@ -633,4 +635,7 @@ ChannelControl::~ChannelControl() {} ConnectOp::~ConnectOp() {} ExecOp::~ExecOp() {} +MonitorControlOp::~MonitorControlOp() {} +MonitorSetupOp::~MonitorSetupOp() {} + }} // namespace pvxs::server diff --git a/src/serverchan.cpp b/src/serverchan.cpp index 7e5d00b..11a4b0f 100644 --- a/src/serverchan.cpp +++ b/src/serverchan.cpp @@ -74,6 +74,21 @@ void ServerChannelControl::onRPC(std::function&&)>&& fn) +{ + auto serv = server.lock(); + if(!serv) + return; + + serv->acceptor_loop.call([this, &fn](){ + auto ch = chan.lock(); + if(!ch) + return; + + ch->onSubscribe = std::move(fn); + }); +} + void ServerChannelControl::onClose(std::function&& fn) { auto serv = server.lock(); diff --git a/src/serverconn.cpp b/src/serverconn.cpp index a2ae29a..4453f85 100644 --- a/src/serverconn.cpp +++ b/src/serverconn.cpp @@ -417,6 +417,7 @@ void ServerConn::bevRead() CASE(GET); CASE(PUT); CASE(PUT_GET); + CASE(MONITOR); CASE(RPC); CASE(CANCEL_REQUEST); CASE(DESTROY_REQUEST); diff --git a/src/serverconn.h b/src/serverconn.h index 54957de..9e3710a 100644 --- a/src/serverconn.h +++ b/src/serverconn.h @@ -56,6 +56,7 @@ struct ServerChannelControl : public server::ChannelControl virtual void onOp(std::function&&)>&& fn) override final; virtual void onRPC(std::function&&, Value&&)>&& fn) override final; + virtual void onSubscribe(std::function&&)>&& fn) override final; virtual void onClose(std::function&& fn) override final; virtual void close() override final; @@ -79,6 +80,7 @@ struct ServerChan std::function&&)> onOp; std::function&&, Value&&)> onRPC; + std::function&&)> onSubscribe; std::function onClose; std::map > opByIOID; // our subset of ServerConn::opByIOID @@ -133,6 +135,7 @@ private: CASE(GET); CASE(PUT); CASE(PUT_GET); + CASE(MONITOR); CASE(RPC); CASE(CANCEL_REQUEST); CASE(DESTROY_REQUEST); @@ -197,6 +200,10 @@ struct Server::Pvt epicsEvent done; + // handle server "background" tasks. + // accept new connections and send beacons + evbase acceptor_loop; + std::vector beaconMsg; std::list > listeners; @@ -205,10 +212,6 @@ struct Server::Pvt std::list interfaces; std::map > connections; - // handle server "background" tasks. - // accept new connections and send beacons - evbase acceptor_loop; - evsocket beaconSender; evevent beaconTimer; diff --git a/src/serverget.cpp b/src/serverget.cpp index f365cc4..67ea14e 100644 --- a/src/serverget.cpp +++ b/src/serverget.cpp @@ -100,9 +100,10 @@ struct ServerGPR : public ServerOp auto self(it->second); conn->opByIOID.erase(it); - conn->iface->server->acceptor_loop.dispatch([self](){ - self->onClose(""); - }); + if(self->onClose) + conn->iface->server->acceptor_loop.dispatch([self](){ + self->onClose(""); + }); } else { assert(false); // really shouldn't happen @@ -336,15 +337,20 @@ void ServerConn::handle_GPR(pva_app_msg_t cmd) } else if(chan->onOp) { // GET, PUT chan->onOp(std::move(ctrl)); + + } else { + ctrl->error("Get/Put/RPC not implemented for this PV"); } } else { // EXEC, maybe Get or Put std::shared_ptr op; auto it = opByIOID.find(ioid); - if(it==opByIOID.end() || !(op=std::dynamic_pointer_cast(it->second))) { - log_printf(connio, Err, "Client %s Gets %s IOID %u\n", peerName.c_str(), - it==opByIOID.end() ? "non-existant" : "invalid", unsigned(ioid)); + if(it==opByIOID.end() || !(op=std::dynamic_pointer_cast(it->second)) + || op->state==ServerOp::Dead || op->state==ServerOp::Creating) { + log_printf(connio, Err, "Client %s Gets %s IOID %u state=%d\n", peerName.c_str(), + it==opByIOID.end() ? "non-existant" : "invalid", unsigned(ioid), + op ? op->state : ServerOp::Dead); bev.reset(); return; } diff --git a/src/servermon.cpp b/src/servermon.cpp new file mode 100644 index 0000000..90e53a8 --- /dev/null +++ b/src/servermon.cpp @@ -0,0 +1,476 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include + +#include +#include + +#include +#include "dataimpl.h" +#include "serverconn.h" + +namespace pvxs { namespace impl { +DEFINE_LOGGER(connsetup, "pvxs.tcp.setup"); +DEFINE_LOGGER(connio, "pvxs.tcp.io"); + +namespace { + +typedef epicsGuard Guard; + +struct MonitorOp : public ServerOp, + public std::enable_shared_from_this +{ + MonitorOp(const std::shared_ptr& chan, uint32_t ioid) + :ServerOp(chan, ioid) + {} + virtual ~MonitorOp() {} + + // only access from acceptor worker thread + std::function onStart; + std::function onLowMark; + std::function onHighMark; + std::function onClose; + + // const after setup phase + std::shared_ptr type; + std::string msg; + + // Further members can only be changed from the accepter worker thread with this lock held. + // They may be read from the worker, or if this lock is held. + mutable epicsMutex lock; + + // is doReply() scheduled to run + bool scheduled=false; + bool pipeline=false; + bool finished=false; + size_t window=0u, limit=1u; + size_t low=0u, high=0u; + + std::deque queue; + + // caller must hold lock. + // only used after State==Idle + static + void maybeReply(server::Server::Pvt* server, const std::shared_ptr& op) + { + if(!op->scheduled && op->state==Executing && !op->queue.empty() && (!op->pipeline || op->window)) + { + server->acceptor_loop.dispatch([op](){ + op->doReply(); + }); + op->scheduled = true; + } + } + + void doReply() + { + auto ch = chan.lock(); + if(!ch) + return; + auto conn = ch->conn.lock(); + if(!conn || !conn->bev) + return; + + Guard G(lock); + scheduled = false; + + if(state==Dead) + return; + + uint8_t subcmd = 0u; + if(state==Creating) { + subcmd = 0x08; + state = type ? Idle : Dead; + + } else if(state==Executing) { + if(queue.empty()) { + return; // nothing to do + + } else if(!queue.front()) { + finished = true; + subcmd = 0x10; + state = Dead; + } + } + + { + (void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get())); + + EvOutBuf R(hostBE, conn->txBody.get()); + to_wire(R, uint32_t(ioid)); + to_wire(R, subcmd); + if(subcmd&0x08) { + if(!msg.empty() || !type) { + to_wire(R, Status::error(msg)); + + } else { + to_wire(R, Status{}); + to_wire(R, type.get()); + } + + } else if(!queue.empty()) { + auto& ent = queue.front(); + if(ent) { + to_wire_valid(R, ent); + // TODO: placeholder for overrun mask + to_wire(R, uint8_t(0u)); + + } else { // finish (could be used to send an error) + to_wire(R, Status{}); + } + + queue.pop_front(); + } + } + + conn->enqueueTxBody(pva_app_msg_t::CMD_MONITOR); + + if(state == ServerOp::Dead) { + ch->opByIOID.erase(ioid); + auto it = conn->opByIOID.find(ioid); + if(it!=conn->opByIOID.end()) { + auto self(it->second); + conn->opByIOID.erase(it); + + if(self->onClose) + conn->iface->server->acceptor_loop.dispatch([self](){ + self->onClose(""); + }); + + } else { + assert(false); // really shouldn't happen + } + conn->opByIOID.erase(ioid); + return; + } + + if(state==Executing) { + // TODO: look at queue size change and maybe dispatch low water mark + } + + if(state==Executing && !queue.empty()) { + // reshedule myself + assert(!scheduled); // we've been holding the lock, so this should not have changed + + auto self(shared_from_this()); + conn->iface->server->acceptor_loop.dispatch([self]() { + self->doReply(); + }); + scheduled = true; + } + } +}; + +struct ServerMonitorSetup; + +struct ServerMonitorControl : public server::MonitorControlOp +{ + ServerMonitorControl(ServerMonitorSetup* setup, + const std::weak_ptr& server, + const std::string& name, + const Value& request, + const std::weak_ptr& op); + virtual ~ServerMonitorControl() { + finish(); + } + + virtual bool doPost(Value&& val, bool maybe, bool force) override final + { + auto mon(op.lock()); + if(!mon) + return false; + + Guard G(mon->lock); + + if((mon->queue.size() < mon->limit) || force || !val) { + mon->queue.push_back(std::move(val)); + + } else if(!maybe) { + // squash + assert(mon->limit>0 && !mon->queue.empty()); + + mon->queue.back().assign(val); + // TODO track overrun + + } else { + // nope + } + + if(auto serv = server.lock()) + MonitorOp::maybeReply(serv.get(), mon); + + return mon->queue.size() < mon->limit; + } + + virtual int32_t nFree() const override final + { + return 0; // TODO + } + virtual unsigned long long maxFree() const override final + { + return 0; // TODO + } + virtual void setWatermarks(size_t low, size_t high) override final + { + } + virtual void onStart(std::function &&fn) override final + { + auto serv = server.lock(); + if(!serv) + return; + serv->acceptor_loop.call([this, &fn](){ + if(auto oper = op.lock()) + oper->onStart = std::move(fn); + }); + } + virtual void onHighMark(std::function &&fn) override final + { + } + virtual void onLowMark(std::function &&fn) override final + { + } + + const std::weak_ptr server; + const std::weak_ptr op; +}; + +struct ServerMonitorSetup : public server::MonitorSetupOp +{ + ServerMonitorSetup(ServerConn* conn, + const std::weak_ptr& server, + const std::string& name, + const Value& request, + const std::weak_ptr& op) + :server(server) + ,op(op) + { + _op = Info; + _name = name; + _peerName = conn->peerName; + _ifaceName = conn->iface->name; + } + virtual ~ServerMonitorSetup() { + error("Monitor Create implied error"); + } + + virtual std::unique_ptr connect(const Value &prototype) override final + { + if(!prototype) + throw std::invalid_argument("Must provide prototype"); + auto type = Value::Helper::type(prototype); + + std::unique_ptr ret; + + auto serv = server.lock(); + if(!serv) + return ret; + serv->acceptor_loop.call([this, &type, &ret](){ + if(auto oper = op.lock()) { + if(oper->state!=ServerOp::Creating) + return; + oper->type = type; + ret.reset(new ServerMonitorControl(this, server, _name, pvRequest, oper)); + oper->doReply(); + } + }); + if(!ret) + throw std::runtime_error("Dead Operation"); + + return ret; + } + virtual void error(const std::string &msg) override final + { + if(msg.empty()) + throw std::invalid_argument("Must provide error message"); + auto serv = server.lock(); + if(!serv) + return; + serv->acceptor_loop.call([this, &msg](){ + if(auto oper = op.lock()) { + if(oper->state==ServerOp::Creating) { + oper->msg = msg; + oper->doReply(); + } + } + }); + } + virtual void onClose(std::function &&fn) override final + { + auto serv = server.lock(); + if(!serv) + return; + serv->acceptor_loop.call([this, &fn](){ + if(auto oper = op.lock()) + oper->onClose = std::move(fn); + }); + } + + const std::weak_ptr server; + const std::weak_ptr op; +}; + + +ServerMonitorControl::ServerMonitorControl(ServerMonitorSetup* setup, + const std::weak_ptr& server, + const std::string& name, + const Value& request, + const std::weak_ptr& op) + :server(server) + ,op(op) +{ + _op = Info; + _name = name; + _peerName = setup->peerName(); + _ifaceName = setup->name(); +} + +} // namespace + +void ServerConn::handle_MONITOR() +{ + EvInBuf M(peerBE, segBuf.get(), 16); + + uint32_t sid = -1, ioid = -1; + uint8_t subcmd = 0; + uint32_t nack = 0; + + from_wire(M, sid); + from_wire(M, ioid); + from_wire(M, subcmd); + + if(subcmd&0x08) { // INIT + // type and full value + Value pvRequest; + from_wire_type_value(M, rxRegistry, pvRequest); + + if(subcmd&0x80) { + from_wire(M, nack); + } + + if(!M.good()) { + log_printf(connio, Debug, "Client %s\n Invalid MONITOR/%x INIT\n", + peerName.c_str(), subcmd); + bev.reset(); + return; + } + + auto& chan = lookupSID(sid); + + if(opByIOID.find(ioid)!=opByIOID.end()) { + log_printf(connsetup, Err, "Client %s reuses existing ioid %u\n", peerName.c_str(), unsigned(ioid)); + bev.reset(); + return; + } + + auto op(std::make_shared(chan, ioid)); + op->window = nack; + (void)pvRequest["record._options.pipeline"].as(op->pipeline); + + pvRequest["record._options.queueSize"].as([&op](size_t qSize){ + if(qSize>1) + op->limit = qSize; + }); + + std::unique_ptr ctrl(new ServerMonitorSetup(this, iface->server->internal_self, chan->name, pvRequest, op)); + + op->state = ServerOp::Creating; + + opByIOID[ioid] = op; + chan->opByIOID[ioid] = op; + + log_printf(connsetup, Debug, "Client %s Monitor INIT ioid=%u pvRequest=%s\n", + peerName.c_str(), unsigned(ioid), + std::string(SB()<onSubscribe) { + chan->onSubscribe(std::move(ctrl)); + } else { + ctrl->error("Monitor operation not implemented by this PV"); + } + + } else { // start, stop, ack, destroy + + if(subcmd&0x80) { + from_wire(M, nack); + } + + if(!M.good()) { + log_printf(connio, Debug, "Client %s\n Invalid MONITOR/%x CMD\n", + peerName.c_str(), subcmd); + bev.reset(); + return; + } + + std::shared_ptr op; + auto it = opByIOID.find(ioid); + if(it==opByIOID.end() || !(op=std::dynamic_pointer_cast(it->second)) + || op->state==ServerOp::Dead || op->state==ServerOp::Creating) { + log_printf(connio, Err, "Client %s MONITORs %s IOID %u state=%d\n", peerName.c_str(), + it==opByIOID.end() ? "non-existant" : "invalid", unsigned(ioid), + op ? op->state : ServerOp::Dead); + bev.reset(); + return; + } + + auto& chan = lookupSID(sid); + + // pvAccessCPP won't accept ack and start/stop in the same message, + // although it will accept destroy in any !INIT message. + // We do accept ack+start/stop as there is no reason not to. + if(subcmd&0x80 && op->pipeline) { // ack + log_printf(connio, Debug, "Client %s IOID %u acks %u\n", + peerName.c_str(), unsigned(ioid), unsigned(nack)); + + Guard G(op->lock); + + op->window += nack; + + // TODO: notify high level + } + + if(subcmd&0x04) { + bool start = subcmd&0x40; + + { + Guard G(op->lock); + op->state = start ? ServerOp::Executing : ServerOp::Idle; + } + + if(op->onStart) + op->onStart(start); + + { + Guard G(op->lock); + MonitorOp::maybeReply(iface->server, op); + } + } + + if(subcmd&0x10) { + // destroy + + chan->opByIOID.erase(ioid); + auto it = opByIOID.find(ioid); + if(it!=opByIOID.end()) { + auto self(it->second); + opByIOID.erase(it); + + if(self->onClose) + iface->server->acceptor_loop.dispatch([self](){ + self->onClose(""); + }); + + } else { + assert(false); // really shouldn't happen + } + opByIOID.erase(ioid); + } + } +} + +}} // namespace pvxs::impl diff --git a/test/Makefile b/test/Makefile index 8becaf3..430e2f5 100644 --- a/test/Makefile +++ b/test/Makefile @@ -46,6 +46,10 @@ TESTPROD += dummyserv dummyserv_SRCS += dummyserv.cpp # not a unittest +TESTPROD += countdown +countdown_SRCS += countdown.cpp +# not a unittest + PROD_SYS_LIBS += event_core PROD_SYS_LIBS_DEFAULT += event_pthreads diff --git a/test/countdown.cpp b/test/countdown.cpp new file mode 100644 index 0000000..cb034e1 --- /dev/null +++ b/test/countdown.cpp @@ -0,0 +1,139 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include + +#include +#include +#include +#include + +#include + +namespace { +using namespace pvxs; +using namespace pvxs::server; + +DEFINE_LOGGER(app, "countdown"); + +auto def = nt::NTScalar{TypeCode::UInt32}.build(); + +struct CountdownSrc : public Source +{ + const std::string name; + + pvxs::impl::evbase loop; + + CountdownSrc(const std::string& name) + :name(name) + ,loop("counter") + {} + + virtual void onSearch(Search &search) override final + { + for(auto& op : search) { + if(op.name()==name) { + log_printf(app, Info, "Claiming '%s'\n", op.name()); + op.claim(); + } else { + log_printf(app, Debug, "Ignoring '%s'\n", op.name()); + } + } + } + virtual void onCreate(std::unique_ptr &&op) override final + { + if(op->name()!=name) + return; + + std::shared_ptr chan(std::move(op)); + + log_printf(app, Info, "Create chan '%s'\n", chan->name().c_str()); + + chan->onSubscribe([this, chan](std::unique_ptr&& setup) { + + log_printf(app, Info, "Create mon '%s'\n", chan->name().c_str()); + + std::shared_ptr op(setup->connect(def.create())); // unique_ptr becomes shared_ptr + + loop.later(1.0, std::bind(&CountdownSrc::tick, this, op, 5u)); + }); + + // return a dummy value for info/get + chan->onOp([](std::unique_ptr&& conn) { + conn->connect(def.create()); + + conn->onGet([](std::unique_ptr&& op){ + auto val = def.create(); + val["value"] = 0u; + op->reply(val); + }); + }); + } + + virtual List onList() override final + { + auto names(std::make_shared>()); + names->insert(name); + return List{names}; + } + + void tick(const std::shared_ptr& op, uint32_t count) + { + log_printf(app, Info, "tick %u\n", unsigned(count)); + + auto val = def.create(); + val["value"].from(count); + { + epicsTimeStamp now; + epicsTimeGetCurrent(&now); + val["value"] = count; + val["timeStamp.secondsPastEpoch"] = now.secPastEpoch+POSIX_TIME_AT_EPICS_EPOCH; + val["timeStamp.nanoseconds"] = now.nsec; + } + + op->post(std::move(val)); + + if(count) + loop.later(1.0, std::bind(&CountdownSrc::tick, this, op, count-1u)); + else + op->finish(); + } +}; + +} // namespace + +int main(int argc, char *argv[]) +{ + int ret = 0; + try { + pvxs::logger_level_set(app.name, pvxs::Level::Info); + pvxs::logger_config_env(); + + auto src = std::make_shared("countdown"); + + auto serv = Server::Config::from_env() + .build() + .addSource("countdown", src); + + auto& conf = serv.config(); + + std::cout<<"Serving from :\n"; + for(auto& iface : conf.interfaces) { + std::cout<<" "<