From e4d2ba5c69689583355abfab05bb7e48baf2baf7 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Fri, 31 Jan 2020 14:28:36 -0800 Subject: [PATCH] mcat demo --- test/Makefile | 4 ++ test/mcat.cpp | 170 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 174 insertions(+) create mode 100644 test/mcat.cpp diff --git a/test/Makefile b/test/Makefile index 430e2f5..b8db69d 100644 --- a/test/Makefile +++ b/test/Makefile @@ -50,6 +50,10 @@ TESTPROD += countdown countdown_SRCS += countdown.cpp # not a unittest +TESTPROD += mcat +mcat_SRCS += mcat.cpp +# not a unittest + PROD_SYS_LIBS += event_core PROD_SYS_LIBS_DEFAULT += event_pthreads diff --git a/test/mcat.cpp b/test/mcat.cpp new file mode 100644 index 0000000..0a981d4 --- /dev/null +++ b/test/mcat.cpp @@ -0,0 +1,170 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace { +using namespace pvxs; + +DEFINE_LOGGER(app, "mcat"); + +auto def = nt::NTScalar{TypeCode::String}.build(); + +struct FileSource : public server::Source +{ + std::string name; + std::string fname; + + virtual void onSearch(Search &search) override final + { + for(auto& op : search) { + if(op.name()==name) { + log_printf(app, Info, "Claiming '%s'\n", op.name()); + op.claim(); + } else { + log_printf(app, Debug, "Ignoring '%s'\n", op.name()); + } + } + } + virtual void onCreate(std::unique_ptr &&op) override final + { + if(op->name()!=name) + return; + + std::shared_ptr chan(std::move(op)); + + log_printf(app, Info, "Create chan '%s'\n", chan->name().c_str()); + + chan->onSubscribe([this, chan](std::unique_ptr&& setup) { + + log_printf(app, Info, "Create mon '%s'\n", chan->name().c_str()); + + auto fstrm = std::make_shared(fname); + if(!fstrm->is_open()) { + setup->error("Unable to open file"); + return; + } + + std::shared_ptr op(setup->connect(def.create())); // unique_ptr becomes shared_ptr + + server::MonitorStat stats; + op->stats(stats); + log_printf(app, Info, "Queue size %u\n", unsigned(stats.limitQueue)); + + op->setWatermarks(0, 0); + + auto refill = [op, fstrm](){ + log_printf(app, Info, "fill mon '%s'\n", op->name().c_str()); + + std::string line; + while(std::getline(*fstrm, line)) { + auto val = def.create(); + val["value"] = line; + val["alarm.severity"] = 0; + + log_printf(app, Info, "push line '%s'\n", line.c_str()); + if(!op->forcePost(std::move(val))) + return; + } + + log_printf(app, Info, "finished %s\n", fstrm->eof() ? "EOF" : ""); + if(!fstrm->eof()) { + auto val = def.create(); + val["value"] = ""; + val["alarm.severity"] = 3; + op->forcePost(std::move(val)); + } + + op->finish(); + }; + + op->onHighMark([refill, op](){ + log_printf(app, Info, "mon now '%s'\n", op->name().c_str()); + refill(); + }); + + // initial fill + refill(); + }); + + // return a dummy value for info/get + chan->onOp([](std::unique_ptr&& conn) { + conn->connect(def.create()); + + conn->onGet([](std::unique_ptr&& op){ + auto val = def.create(); + val["value"] = "No current value to get"; + val["alarm.severity"] = 3; + op->reply(val); + }); + }); + } + + virtual List onList() override final + { + auto names(std::make_shared>()); + names->insert(name); + return List{names}; + } +}; + +void usage(const char *argv0) +{ + std::cerr<<"Usage: "< \n"; +} + +} // namespace + +int main(int argc, char* argv[]) +{ + int opt; + while ((opt = getopt(argc, argv, "h")) != -1) { + switch (opt) { + case 'h': /* Print usage */ + usage(argv[0]); + return 0; + } + } + + if(argc - optind !=2 ) { + usage(argv[0]); + std::cerr<<"\nError incorrect number of positional arguments\n"; + return 1; + } + + logger_level_set(app.name, pvxs::Level::Info); + logger_config_env(); + + auto src = std::make_shared(); + src->name = argv[optind]; + src->fname = argv[optind+1]; + + auto serv = server::Server::Config::from_env() + .build() + .addSource("mcat", src); + + auto& conf = serv.config(); + + std::cout<<"Serving from :\n"; + for(auto& iface : conf.interfaces) { + std::cout<<" "<