diff --git a/src/Makefile b/src/Makefile index de8a2e0..ce97876 100644 --- a/src/Makefile +++ b/src/Makefile @@ -84,6 +84,7 @@ LIB_SRCS += clientreq.cpp LIB_SRCS += clientconn.cpp LIB_SRCS += clientintrospect.cpp LIB_SRCS += clientget.cpp +LIB_SRCS += clientmon.cpp LIB_LIBS += Com diff --git a/src/client.cpp b/src/client.cpp index 3d7509c..96f04a5 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -31,6 +31,7 @@ constexpr timeval beaconCleanInterval{2*180, 0}; Disconnect::Disconnect() :std::runtime_error("Disconnected") + ,time(epicsTime::getCurrent()) {} Disconnect::~Disconnect() {} @@ -41,6 +42,16 @@ RemoteError::RemoteError(const std::string& msg) RemoteError::~RemoteError() {} +Finished::~Finished() {} + +Connected::Connected(const std::string& peerName) + :std::runtime_error("Connected") + ,peerName(peerName) + ,time(epicsTime::getCurrent()) +{} + +Connected::~Connected() {} + Channel::Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid) :context(context) ,name(name) @@ -562,8 +573,16 @@ void Context::Pvt::tickSearch() auto ninc = chan->nSearch = std::min(searchBuckets.size(), chan->nSearch+1u); auto next = (idx + ninc)%searchBuckets.size(); + auto nextnext = (next + 1u)%searchBuckets.size(); - // TODO leveling with next+-1 buckets + // try to smooth out UDP bcast load by waiting one extra tick + { + auto nextN = searchBuckets[next].size(); + auto nextnextN = searchBuckets[nextnext].size(); + + if(nextN > nextnextN && (nextN-nextnextN > 100u)) + next = nextnext; + } auto& nextBucket = searchBuckets[next]; diff --git a/src/clientconn.cpp b/src/clientconn.cpp index a588b9e..955c888 100644 --- a/src/clientconn.cpp +++ b/src/clientconn.cpp @@ -75,6 +75,8 @@ void Connection::createChannels() void Connection::sendDestroyRequest(uint32_t sid, uint32_t ioid) { + if(!bev) + return; { (void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get())); diff --git a/src/clientimpl.h b/src/clientimpl.h index 79c54e9..e2a43a9 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -86,7 +86,7 @@ struct Connection : public ConnBase, public std::enable_shared_from_this +#include +#include +#include + +#include + +#include +#include "clientimpl.h" + +namespace pvxs { +namespace client { + +typedef epicsGuard Guard; + +DEFINE_LOGGER(setup, "pvxs.client.setup"); +DEFINE_LOGGER(io, "pvxs.client.io"); + +namespace { +struct Entry { + Value val; + std::exception_ptr exc; + Entry() = default; + Entry(Value&& v) :val(std::move(v)) {} + Entry(const std::exception_ptr& e) :exc(e) {} +}; +} + +struct SubscriptionImpl : public OperationBase, public Subscription +{ + evevent ackTick; + + // const after exec() + std::function event; + Value pvRequest; + bool pipeline = false; + bool autostart = true; + bool maskConn = false, maskDiscon = true; + uint32_t queueSize = 4u, ackAt=0u; + + // only access from tcp_loop + + enum state_t : uint8_t { + Connecting, // waiting for an active Channel + Creating, // waiting for reply to INIT + Idle, // waiting for start + Running, // waiting for stop + Done, // Finished or error + } state = Connecting; + + mutable epicsMutex lock; + + // guarded by lock + + std::deque queue; + uint32_t window =0u, unack =0u; + + SubscriptionImpl(operation_t op, const std::shared_ptr& chan) + :OperationBase (op, chan) + ,ackTick(event_new(chan->context->tcp_loop.base, -1, EV_TIMEOUT, &tickAckS, this)) + {} + virtual ~SubscriptionImpl() { + cancel(); + } + + void notify() + { + log_info_printf(io, "Server %s channel %s monitor notify\n", + chan->conn ? chan->conn->peerName.c_str() : "", + chan->name.c_str()); + if(event) { + try { + event(*this); + }catch(std::exception& e){ + log_err_printf(io, "Unhandled user exception in Monitor %s %s : %s\n", + __func__, typeid (e).name(), e.what()); + } + } + } + + virtual void pause(bool p) override final + { + if(!chan) + return; + chan->context->tcp_loop.call([this, p](){ + log_info_printf(io, "Server %s channel %s monitor %s\n", + chan->conn ? chan->conn->peerName.c_str() : "", + chan->name.c_str(), + p ? "pause" : "resume"); + + if((state==Idle && !p) || (state==Running && p)) { + auto& conn = chan->conn; + + { + uint8_t subcmd = p ? 0x04 : 0x44; // STOP | START + + (void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get())); + + EvOutBuf R(hostBE, conn->txBody.get()); + + to_wire(R, chan->sid); + to_wire(R, ioid); + to_wire(R, subcmd); + } + conn->enqueueTxBody(CMD_MONITOR); + + state = p ? Idle : Running; + } + }); + } + + virtual Value pop() override final + { + Value ret; + { + Guard G(lock); + + if(!queue.empty()) { + auto ent(queue.front()); + queue.pop_front(); + + if(pipeline) { + timeval tick{}; // immediate ACK + + // schedule delayed ack while below threshold. + // avoid overhead of re-scheduling when unack in range [1, ackAt) + if(unack==0u && ackAt!=1u) + tick = timeval{1,0}; + + if(unack==0u || unack>=ackAt) { + if(event_add(ackTick.get(), &tick)) + log_err_printf(io, "Monitor '%s' unable to schedule ack\n", chan->name.c_str()); + } + + unack++; + } + + if(ent.exc) + std::rethrow_exception(ent.exc); + else + ret = std::move(ent.val); + } + } + return ret; + } + + virtual void cancel() override final + { + auto context = chan->context; + decltype (event) junk; + context->tcp_loop.call([this, &junk](){ + log_info_printf(io, "Server %s channel %s monitor cancel\n", + chan->conn ? chan->conn->peerName.c_str() : "", + chan->name.c_str()); + + if(state==Idle || state==Running) { + chan->conn->sendDestroyRequest(chan->sid, ioid); + + // This opens up a race with an in-flight reply. + chan->conn->opByIOID.erase(ioid); + chan->opByIOID.erase(ioid); + + if(pipeline) + (void)event_del(ackTick.get()); + } + state = Done; + chan.reset(); + junk = std::move(event); + // leave opByIOID for GC + }); + } + + virtual void createOp() override final + { + if(state!=Connecting) { + return; + } + + auto& conn = chan->conn; + + { + uint8_t subcmd = 0x08; // INIT + if(pipeline) + subcmd |= 0x80; + + (void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get())); + + EvOutBuf R(hostBE, conn->txBody.get()); + + to_wire(R, chan->sid); + to_wire(R, ioid); + to_wire(R, subcmd); + to_wire(R, Value::Helper::desc(pvRequest)); + to_wire_full(R, pvRequest); + if(pipeline) + to_wire(R, queueSize); + } + conn->enqueueTxBody(CMD_MONITOR); + + log_debug_printf(io, "Server %s channel '%s' monitor INIT\n", + conn->peerName.c_str(), chan->name.c_str()); + + state = Creating; + + bool empty = false; + if(!maskConn || pipeline) { + Guard G(lock); + + if(!maskConn) { + empty = queue.empty(); + + queue.emplace_back(Entry(std::make_exception_ptr(Connected(conn->peerName)))); + } + if(pipeline) + window = queueSize; + } + + if(empty) + notify(); + } + + virtual void disconnected(const std::shared_ptr &self) override final + { + log_debug_printf(io, "Server %s channel %s monitor disconnected in %d\n", + chan->conn ? chan->conn->peerName.c_str() : "", + chan->name.c_str(), + state); + + switch (state) { + case Connecting: + case Done: + // noop + break; + case Creating: + case Idle: + case Running: + // return to pending + + bool empty = false; + if(!maskDiscon) { + Guard G(lock); + empty = queue.empty(); + + queue.emplace_back(Entry(std::make_exception_ptr(Disconnect()))); + } + + chan->pending.push_back(self); + state = Connecting; + + if(empty) + notify(); + + break; + } + } + + void tickAck() + { + if(((state==Idle) || (state==Running)) && pipeline && unack) { + log_debug_printf(io, "Server %s channel %s monitor ACK\n", + chan->conn ? chan->conn->peerName.c_str() : "", + chan->name.c_str()); + + auto& conn = chan->conn; + { + (void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get())); + + EvOutBuf R(hostBE, conn->txBody.get()); + + to_wire(R, chan->sid); + to_wire(R, ioid); + to_wire(R, uint8_t(0x80)); + to_wire(R, uint32_t(unack)); + } + conn->enqueueTxBody(CMD_MONITOR); + + window += unack; + unack = 0u; + } + } + static + void tickAckS(evutil_socket_t fd, short evt, void *raw) + { + try { + static_cast(raw)->tickAck(); + }catch(std::exception& e) { + log_crit_printf(io, "Unhandled exception in %s %s : %s\n", + __func__, typeid (e).name(), e.what()); + } + } +}; + +void Connection::handle_MONITOR() +{ + EvInBuf M(peerBE, segBuf.get(), 16); + + uint32_t ioid=0; + uint8_t subcmd=0; + Status sts{}; + Value data; // hold prototype (INIT) or reply data + + from_wire(M, ioid); + from_wire(M, subcmd); + bool init = subcmd&0x08; + bool final = subcmd&0x10; + + if(init || final) + from_wire(M, sts); + if(init) + from_wire_type(M, rxRegistry, data); + + RequestInfo* info=nullptr; + if(M.good()) { + auto it = opByIOID.find(ioid); + if(it!=opByIOID.end()) { + info = &it->second; + + } else { + auto lvl = Level::Debug; + if(!init) { + // We don't have enough information to decode the rest of the payload. + // This *may* leave rxRegistry out of sync (if it contains Variant Unions). + // We can't know whether this is the case. + // Failing soft here may lead to failures decoding future replies. + // We could force close the Connection here to be "safe". + // However, we assume the such usage of Variant is relatively rare + + lvl = Level::Err; + } + + log_printf(io, lvl, "Server %s uses non-existant IOID %u. Ignoring...\n", + peerName.c_str(), unsigned(ioid)); + return; + } + + if(!sts.isSuccess()) { + + } else if(init) { + info->prototype = std::move(data); + + } else if(!final || !M.empty()) { + + data = info->prototype.cloneEmpty(); + from_wire_valid(M, rxRegistry, data); + + BitMask overrun; + from_wire(M, overrun); + (void)overrun; // ignoring + } + } + + // validate received message against operation state + + std::shared_ptr op; + SubscriptionImpl* mon = nullptr; + if(M.good() && info) { + op = info->handle.lock(); + if(!op) { + // assume op has already sent CMD_DESTROY_REQUEST + log_debug_printf(io, "Server %s ignoring stake cmd%02x ioid %u\n", + peerName.c_str(), CMD_MONITOR, unsigned(ioid)); + return; + } + + if(uint8_t(op->op)!=CMD_MONITOR) { + // peer mixes up IOID and operation type + M.fault(); + + } else { + mon = static_cast(op.get()); + + // check that subcmd is as expected based on operation state + if((mon->state==SubscriptionImpl::Creating) && init) { + + } else if((mon->state==SubscriptionImpl::Idle) && !init) { + + } else if((mon->state==SubscriptionImpl::Running) && !init) { + + } else { + M.fault(); + } + } + } + + if(!M.good() || !mon) { + log_crit_printf(io, "Server %s sends invalid MONITOR. Disconnecting...\n", peerName.c_str()); + bev.reset(); + return; + } + + Entry update; + + if(!sts.isSuccess()) { + update.exc = std::make_exception_ptr(RemoteError(sts.msg)); + mon->state = SubscriptionImpl::Done; + + } else if(mon->state==SubscriptionImpl::Creating) { + log_debug_printf(io, "Server %s channel %s monitor Created\n", + peerName.c_str(), + mon->chan->name.c_str()); + + mon->state = SubscriptionImpl::Idle; + + if(mon->autostart) + mon->resume(); + + } else if(data) { // Idle or Running + update.val = std::move(data); + + } else { + // NULL update. can this happen? + log_debug_printf(io, "Server %s channel %s monitor RX NULL\n", + peerName.c_str(), + mon->chan->name.c_str()); + } + + bool notify = false; + if(!init) { + Guard G(mon->lock); + + if(mon->pipeline) { + if(mon->window) { + mon->window--; + } else { + log_err_printf(io, "Server %s channel '%s' MONITOR exceeds window size\n", + peerName.c_str(), mon->chan->name.c_str()); + } + } + + notify = mon->queue.empty(); + + if(update.exc || (mon->queue.size() < mon->queueSize) || mon->queue.back().exc) { + log_debug_printf(io, "Server %s channel %s monitor PUSH\n", + peerName.c_str(), + mon->chan->name.c_str()); + + mon->queue.emplace_back(std::move(update)); + + } else if(update.val) { + log_debug_printf(io, "Server %s channel %s monitor Squash\n", + peerName.c_str(), + mon->chan->name.c_str()); + + mon->queue.back().val.assign(update.val); + } + + if(final && !update.exc) { + log_debug_printf(io, "Server %s channel %s monitor FINISH\n", + peerName.c_str(), + mon->chan->name.c_str()); + + mon->queue.emplace_back(std::make_exception_ptr(Finished())); + } + } + + if(mon->state==SubscriptionImpl::Done || final) { + mon->state=SubscriptionImpl::Done; + + opByIOID.erase(ioid); + mon->chan->opByIOID.erase(ioid); + + if(mon->pipeline) + (void)event_del(mon->ackTick.get()); + + if(!final) + sendDestroyRequest(mon->chan->sid, ioid); + } + + if(notify) + mon->notify(); +} + + +std::shared_ptr MonitorBuilder::exec() +{ + std::shared_ptr ret; + + ctx->tcp_loop.call([&ret, this]() { + auto chan = Channel::build(ctx, _name); + + auto op = std::make_shared(Operation::Monitor, chan); + op->event = std::move(_event); + op->pvRequest = _build(); + op->maskConn = _maskConn; + op->maskDiscon = _maskDisconn; + + auto options = op->pvRequest["record._options"]; + + options["queueSize"].as([&op](uint32_t Q) { + if(Q>1) + op->queueSize = Q; + }); + + (void)options["pipeline"].as(op->pipeline); + + auto ackAny = options["ackAny"]; + + if(ackAny.type()==TypeCode::String) { + auto sval = ackAny.as(); + if(sval.size()>1 && sval.back()=='%') { + double percent=50.0; + char *units = nullptr; + if(epicsParseDouble(sval.c_str(), &percent, &units)==0 && units && units[0]=='%') { + if(percent>0.0 && percent<=100.0) + op->ackAt = uint32_t(percent * op->queueSize); + } + } + + } + + if(op->ackAt==0u){ + uint32_t count=0u; + + if(ackAny.as(count)) { + op->ackAt = count; + } + } + + if(op->ackAt==0u){ + op->ackAt = op->queueSize/2u; + } + + op->ackAt = std::max(1u, std::min(op->ackAt, op->queueSize)); + + chan->pending.push_back(op); + chan->createOperations(); + + ret = std::move(op); + }); + + return ret; +} + +} // namespace client +} // namespace pvxs diff --git a/src/pvxs/client.h b/src/pvxs/client.h index 6afb157..e3d760c 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -14,6 +14,8 @@ #include #include +#include + #include #include @@ -28,6 +30,9 @@ struct PVXS_API Disconnect : public std::runtime_error { Disconnect(); virtual ~Disconnect(); + + //! When loss of connection was noticed (when timeout expires). + const epicsTime time; }; //! Error condition signaled by server @@ -37,6 +42,24 @@ struct PVXS_API RemoteError : public std::runtime_error virtual ~RemoteError(); }; +//! For monitor only. Subscription has completed normally +//! and no more events will ever be received. +struct PVXS_API Finished : public Disconnect +{ + Finished() = default; + virtual ~Finished(); +}; + +//! For monitor only. Subscription has (re)connected. +struct PVXS_API Connected : public std::runtime_error +{ + Connected(const std::string& peerName); + virtual ~Connected(); + + const std::string peerName; + const epicsTime time; +}; + //! Holder for a Value or an exception class Result { Value _result; @@ -80,15 +103,15 @@ struct PVXS_API Operation { //! Handle for monitor subscription struct PVXS_API Subscription { - enum Event { - Error, - Disconnect, - NotEmpty, - }; virtual ~Subscription() =0; virtual void cancel() =0; + + virtual void pause(bool p=true) =0; + inline void resume() { pause(false); } + + virtual Value pop() =0; }; class GetBuilder; @@ -357,10 +380,18 @@ public: }; RPCBuilder Context::rpc(const std::string& name, Value&& arg) { return RPCBuilder{pvt, name, std::move(arg)}; } -class MonitorBuilder : protected detail::CommonBuilder { - std::function&, Subscription::Event)> _event; +class MonitorBuilder : public detail::CommonBuilder { + std::function _event; + bool _maskConn = true; + bool _maskDisconn = false; public: MonitorBuilder(const std::shared_ptr& ctx, const std::string& name) :CommonBuilder{ctx,name} {} + //! Install event callback + MonitorBuilder& event(decltype (_event)&& cb) { _event = std::move(cb); return *this; } + //! Include Connected exceptions in queue (default false). + MonitorBuilder& maskConnected(bool m = true) { _maskConn = m; return *this; } + //! Include Disconnected exceptiosn in queue (default true). + MonitorBuilder& maskDisconnected(bool m = true) { _maskDisconn = m; return *this; } PVXS_API std::shared_ptr exec(); diff --git a/test/Makefile b/test/Makefile index 40cf724..69666b6 100644 --- a/test/Makefile +++ b/test/Makefile @@ -58,6 +58,10 @@ TESTPROD += testget testget_SRCS += testget.cpp TESTS += testget +TESTPROD += testmon +testmon_SRCS += testmon.cpp +TESTS += testmon + TESTPROD += testput testput_SRCS += testput.cpp TESTS += testput diff --git a/test/testmon.cpp b/test/testmon.cpp new file mode 100644 index 0000000..510a4d0 --- /dev/null +++ b/test/testmon.cpp @@ -0,0 +1,203 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace { +using namespace pvxs; + +struct BasicTest { + Value initial; + server::SharedPV mbox; + server::Server serv; + client::Context cli; + + epicsEvent evt; + std::shared_ptr sub; + + BasicTest() + :initial(nt::NTScalar{TypeCode::Int32}.create()) + ,mbox(server::SharedPV::buildReadonly()) + ,serv(server::Config::isolated() + .build() + .addPV("mailbox", mbox)) + ,cli(serv.clientConfig().build()) + { + testShow()<<"Server:\n"<([this](){ + sub->pop(); + }); + } + + void phase1() + { + testShow()<<"begin "<<__func__; + + testDiag("Wait for Data update event"); + testOk1(!!evt.wait(5.0)); + + if(auto val = sub->pop()) { + testEq(val["value"].as(), 42); + } else { + testFail("Missing data update"); + } + + post(123); + + testDiag("Wait for Data update event 2"); + testOk1(!!evt.wait(5.0)); + + if(auto val = sub->pop()) { + testEq(val["value"].as(), 123); + } else { + testFail("Missing data update 2"); + } + + testShow()<<"end "<<__func__; + } + + void phase2(bool howdisconn) + { + testShow()<<"begin "<<__func__; + + if(howdisconn) { + testDiag("Stopping server"); + serv.stop(); + } else { + testDiag("close() mbox"); + mbox.close(); + } + + testDiag("Wait for Disconnected event"); + testOk1(!!evt.wait(5.0)); + + testThrows([this](){ + sub->pop(); + }); + + testShow()<<"end "<<__func__; + } + + void testBasic(bool howdisconn) + { + testShow()<<__func__<<" "<pop()) { + testEq(val["value"].as(), 42); + } else { + testFail("Missing data update"); + } + + phase2(false); + + // closing mbox should not disconnect mbox2. + + auto update(initial.cloneEmpty()); + update["value"] = 39; + mbox2.post(std::move(update)); + + testDiag("Wait for Data update event2 on mbox2"); + testOk1(!!evt2.wait(5.0)); + + if(auto val = sub2->pop()) { + testEq(val["value"].as(), 39); + } else { + testFail("Missing data update"); + } + } +}; + +} // namespace + +MAIN(testmon) +{ + testPlan(0); + logger_config_env(); + TestLifeCycle().testBasic(true); + TestLifeCycle().testBasic(false); + TestLifeCycle().testSecond(); + cleanup_for_valgrind(); + return testDone(); +} diff --git a/tools/Makefile b/tools/Makefile index 0851945..51a27b8 100644 --- a/tools/Makefile +++ b/tools/Makefile @@ -19,6 +19,9 @@ pvxinfo_SRCS += info.cpp PROD += pvxget pvxget_SRCS += get.cpp +PROD += pvxmonitor +pvxmonitor_SRCS += monitor.cpp + PROD += pvxput pvxput_SRCS += put.cpp diff --git a/tools/monitor.cpp b/tools/monitor.cpp new file mode 100644 index 0000000..6e32ed7 --- /dev/null +++ b/tools/monitor.cpp @@ -0,0 +1,130 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include "utilpvt.h" + +using namespace pvxs; + +namespace { + +DEFINE_LOGGER(app, "app"); + +void usage(const char* argv0) +{ + std::cerr<<"Usage: "< [pvname ...]\n" + "\n" + " -h Show this message.\n" + " -r pvRequest condition.\n" + " -v Make more noise.\n" + " -d Shorthand for $PVXS_LOG=\"pvxs.*=DEBUG\". Make a lot of noise.\n" + ; +} + +} + +int main(int argc, char *argv[]) +{ + logger_config_env(); // from $PVXS_LOG + bool verbose = false; + std::string request("field()"); + + { + int opt; + while ((opt = getopt(argc, argv, "hvdr:")) != -1) { + switch(opt) { + case 'h': + usage(argv[0]); + return 0; + case 'v': + verbose = true; + logger_level_set("app", Level::Debug); + break; + case 'd': + logger_level_set("pvxs.*", Level::Debug); + break; + case 'r': + request = optarg; + break; + default: + usage(argv[0]); + std::cerr<<"\nUnknown argument: "<> ops; + + std::atomic remaining{argc-optind}; + epicsEvent done; + + for(auto n : range(optind, argc)) { + + ops.push_back(ctxt.monitor(argv[n]) + .pvRequest(request) + .event([&argv, n, verbose, &remaining, &done](client::Subscription& mon) + { + + try { + while(auto update = mon.pop()) { + log_info_printf(app, "%s POP data\n", argv[n]); + std::cout<