diff --git a/example/Makefile b/example/Makefile index ee93585..4478232 100644 --- a/example/Makefile +++ b/example/Makefile @@ -17,9 +17,6 @@ simplesrv_SRCS += simplesrv.cpp TESTPROD_HOST += mailbox mailbox_SRCS += mailbox.cpp -TESTPROD_HOST += spam -spam_SRCS += spam.cpp - TESTPROD_HOST += ticker ticker_SRCS += ticker.cpp diff --git a/example/spam.cpp b/example/spam.cpp deleted file mode 100644 index b895c94..0000000 --- a/example/spam.cpp +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Copyright - See the COPYRIGHT that is included with this distribution. - * pvxs is distributed subject to a Software License Agreement found - * in file LICENSE that is included with this distribution. - */ - -#include - -#include - -#include -#include -#include -#include - -using namespace pvxs; - -DEFINE_LOGGER(app, "spam"); - -namespace { - -struct SpamSource : public server::Source -{ - std::shared_ptr> names; - Value initial; - - SpamSource() - :names(std::make_shared()) - ,initial(nt::NTScalar{TypeCode::UInt32}.create()) - {} - - // Source interface -public: - virtual void onSearch(Search &op) override final - { - for(auto& pv :op) { - if(names->find(pv.name())!=names->end()) - pv.claim(); - } - } - virtual void onCreate(std::unique_ptr &&chan) override final - { - chan->onOp([this](std::unique_ptr&& cop) { - cop->onGet([](std::unique_ptr&& op) { - op->error("Only monitor implemented"); - }); - cop->connect(initial); - }); - - chan->onSubscribe([this](std::unique_ptr&& setup) { - - std::shared_ptr sub(setup->connect(initial)); - - auto counter = std::make_shared(0u); - - auto fill = [this, sub, counter]() mutable { - Value update; - do { - auto cnt = (*counter)++; - update = initial.cloneEmpty(); - update["value"] = cnt; - - log_debug_printf(app, "%s count %u\n", sub->peerName().c_str(), unsigned(cnt)); - - }while(sub->tryPost(update)); - }; - - sub->onHighMark(fill); - - sub->onStart([fill](bool start) mutable { - if(start) - fill(); - }); - - log_info_printf(app, "%s Subscribing\n", setup->peerName().c_str()); - }); - } - virtual List onList() override final - { - return List{names, false}; - } -}; - -} // namespace - -int main(int argc, char* argv[]) -{ - if(argc<=1) { - std::cerr<<"Usage: "<\n"; - return 1; - } - - // Read $PVXS_LOG from process environment and update - // logging configuration. eg. - // export PVXS_LOG=*=DEBUG - // makes a lot of noise. - logger_level_set(app.name, Level::Info); - logger_config_env(); - - auto src = std::make_shared(); - src->names->insert(argv[1]); - - // Build server which will serve this PV - // Configure using process environment. - server::Server serv = server::Server::fromEnv() - .addSource("spamsrc", src); - - // (optional) Print the configuration this server is using - // with any auto-address list expanded. - std::cout<<"Effective config\n"< +#include +#include +#include + +#include +#include +#include +#include + +#define PVXS_ENABLE_EXPERT_API + +#include +#include +#include + +#if EPICS_VERSION_INT < VERSION_INT(7,0,1,0) +#define epicsMonotonicGet epicsTime::getCurrent +#endif + +using namespace pvxs; + +DEFINE_LOGGER(app, "eatspam"); + +namespace { + +template +bool parse_as(T& out, const char *s) +{ + std::istringstream strm(s); + return (strm>>out).fail() || !strm.eof(); +} + +struct Counter { + std::string name; + std::vector scratch; + std::shared_ptr sub; + uint32_t prev; + size_t nwake = 0; + size_t nupdate = 0; + size_t nskip = 0; + bool first = true; +}; + +} // namespace + +int main(int argc, char* argv[]) +{ + logger_level_set(app.name, Level::Warn); + logger_config_env(); + size_t queueSize = 0; + int pipeline = 0; // tri-bool + + int opt; + { + while((opt = getopt(argc, argv, "hpPQ:")) != -1) { + switch (opt) { + case 'h': + std::cerr<<"Usage: "<] pvname..."<(queueSize, optarg)) { + std::cerr<<"Invalid queueSize: "<>()); + if(!aval.empty()) { + sval = aval[0]; + gotit = true; + } else { + continue; + } + + } else if(val.type().code <= TypeCode::Float64 && !val.type().isarray()) { + sval = val.as(); + gotit = true; + } + + if(gotit) { + if(!ctr.first) { + auto diff = sval-ctr.prev; + if(diff != 1) { + log_info_printf(app, "%s skip %u -> %u, %u\n", + ctr.name.c_str(), ctr.prev, sval, diff); + ctr.nskip++; + } + + } else { + ctr.first = false; + log_info_printf(app, "%s initial %u\n", ctr.name.c_str(), sval); + } + ctr.nupdate++; + ctr.prev = sval; + + } else { + std::cerr<cancel(); + } + } + ctr.scratch.clear(); + } + + // cancel subscriptions + for(auto& ctr : ctrs) { + ctr.sub->cancel(); + } + + // final stats and cleanup + for(auto& ctr : ctrs) { + client::SubscriptionStat stats; + ctr.sub->stats(stats); + std::cout<<' '< +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +using namespace pvxs; + +DEFINE_LOGGER(app, "spam"); + +namespace { + +struct SpamSource : public server::Source +{ + std::shared_ptr> names; + Value initial; + size_t nelem = 1u; + + SpamSource() + :names(std::make_shared()) + ,initial(nt::NTScalar{TypeCode::UInt32}.create()) + {} + + void set_nelem(size_t n) + { + initial = nt::NTScalar{TypeCode::UInt32A}.create(); + nelem = n; + } + + // Source interface + virtual void onSearch(Search &op) override final + { + for(auto& pv :op) { + if(names->find(pv.name())!=names->end()) + pv.claim(); + } + } + virtual void onCreate(std::unique_ptr &&chan) override final + { + chan->onOp([this](std::unique_ptr&& cop) { + cop->onGet([](std::unique_ptr&& op) { + op->error("Only monitor implemented"); + }); + cop->connect(initial); + }); + + chan->onSubscribe([this](std::unique_ptr&& setup) { + + std::shared_ptr sub(setup->connect(initial)); + + uint32_t counter = 0; + + auto fill = [this, sub, counter]() mutable { + Value update; + size_t nposted = 0; + server::MonitorStat stats{}; + + sub->stats(stats); + + do { + auto cnt = counter++; + update = initial.cloneEmpty(); + auto value = update["value"]; + if(value.type().isarray()) { + shared_array arr(nelem, cnt); + + value = arr.freeze(); + + } else { + value = cnt; + } + + nposted++; + + }while(sub->tryPost(update)); + + log_debug_printf(app, "%s %s counted %zu, %zu, %zu/%zu -> %u\n", + sub->peerName().c_str(), sub->name().c_str(), + nposted, stats.window, stats.nQueue, stats.limitQueue, + unsigned(counter)); + }; + + sub->onHighMark(fill); + + sub->onStart([fill](bool start) mutable { + if(start) + fill(); + }); + + log_info_printf(app, "%s Subscribing\n", setup->peerName().c_str()); + }); + } + virtual List onList() override final + { + return List{names, false}; + } +}; + +template +bool parse_as(T& out, const char *s) +{ + std::istringstream strm(s); + return (strm>>out).fail() || !strm.eof(); +} + +int help(int ret, const char* argv0) +{ + std::cerr<< + "Usage: "<] [-# ] [-S ] ... [-H ] ...\n" + "\n" + " -h \n" + ; + std::cerr.flush(); + return ret; +} + +} // namespace + +int main(int argc, char* argv[]) +{ + // Read $PVXS_LOG from process environment and update + // logging configuration. eg. + // export PVXS_LOG=*=DEBUG + // makes a lot of noise. + logger_level_set(app.name, Level::Info); + logger_config_env(); + + auto spamsrc = std::make_shared(); + auto hamsrc = server::StaticSource::build(); + + auto hampv(server::SharedPV::buildReadonly()); + auto hamval(nt::NTScalar{TypeCode::UInt32}.create()); + hampv.open(hamval); + + double ham_period = 1.0; + size_t nelem = 1; + + int opt; + { + while((opt = getopt(argc, argv, "hS:H:T:#:")) != -1) { + switch (opt) { + case 'h': + return help(0, argv[0]); + default: + std::cerr<<"Unknown argument -"<names->insert(optarg); + break; + case 'H': + hamsrc.add(optarg, hampv); + break; + case 'T': + if(parse_as(ham_period, optarg)) { + std::cerr<<"Unable to parse period: "<set_nelem(nelem); + + // Build server which will serve this PV + // Configure using process environment. + server::Server serv = server::Server::fromEnv() + .addSource("spamsrc", spamsrc) + .addSource("hamsrc", hamsrc.source()); + + // (optional) Print the configuration this server is using + // with any auto-address list expanded. + { + Detailed d(std::cout, 1); + std::cout<