diff --git a/src/evhelper.cpp b/src/evhelper.cpp index 177b178..a1786dd 100644 --- a/src/evhelper.cpp +++ b/src/evhelper.cpp @@ -10,6 +10,7 @@ #include #include +#include #include #include @@ -18,19 +19,39 @@ #include #include #include +#include +#include #include "evhelper.h" #include "pvaproto.h" #include "utilpvt.h" #include +typedef epicsGuard Guard; + namespace pvxs {namespace impl { DEFINE_LOGGER(logerr, "pvxs.loop"); +thread_local epicsEvent call_done; + struct evbase::Pvt : public epicsThreadRunable { - event_base* base; + struct Work { + std::function fn; + std::exception_ptr *result; + epicsEvent *notify; + Work(const std::function& fn, std::exception_ptr *result, epicsEvent *notify) + :fn(fn), result(result), notify(notify) + {} + }; + std::deque actions; + + std::unique_ptr base; + evevent dowork; + epicsEvent start_sync; + epicsMutex lock; + epicsThread worker; Pvt(const std::string& name, unsigned prio) @@ -49,131 +70,107 @@ struct evbase::Pvt : public epicsThreadRunable # error No threading support for this target // TODO fallback to libCom ? #endif + + worker.start(); + start_sync.wait(); + if(!base) { + throw std::runtime_error("event_base_new() fails"); + } } virtual ~Pvt() { - if(event_base_loopexit(base, nullptr)) - log_crit_printf(logerr, "evbase error while interrupting loop for %p\n", base); + if(event_base_loopexit(base.get(), nullptr)) + log_crit_printf(logerr, "evbase error while interrupting loop for %p\n", base.get()); worker.exitWait(); - event_base_free(base); } virtual void run() override final { - log_info_printf(logerr, "Enter loop worker for %p\n", base); + try { + decltype (base) tbase(event_base_new()); + if(evthread_make_base_notifiable(tbase.get())) { + throw std::runtime_error("evthread_make_base_notifiable"); + } - int ret = event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY); + evevent handle(event_new(tbase.get(), -1, EV_TIMEOUT, &doWorkS, this)); - // TODO: cleanup after pending event_base_once() + base = std::move(tbase); + dowork = std::move(handle); + start_sync.trigger(); - auto lvl = ret ? Level::Crit : Level::Info; - log_printf(logerr, lvl, "Exit loop worker: %d for %p\n", ret, base); + log_info_printf(logerr, "Enter loop worker for %p\n", base.get()); + + int ret = event_base_loop(base.get(), EVLOOP_NO_EXIT_ON_EMPTY); + + auto lvl = ret ? Level::Crit : Level::Info; + log_printf(logerr, lvl, "Exit loop worker: %d for %p\n", ret, base.get()); + + }catch(std::exception& e){ + log_crit_printf(logerr, "Unhandled exception in event_base run : %s\n", + e.what()); + start_sync.trigger(); + } + } + + void doWork() + { + decltype (actions) todo; + { + Guard G(lock); + todo = std::move(actions); + } + for(auto& work : todo) { + try { + work.fn(); + }catch(std::exception& e){ + if(work.result) { + Guard G(lock); + *work.result = std::current_exception(); + } else { + log_crit_printf(logerr, "Unhandled exception in event_base : %s : %s\n", + typeid(e).name(), e.what()); + } + } + if(work.notify) + work.notify->trigger(); + } + } + static + void doWorkS(evutil_socket_t sock, short evt, void *raw) + { + auto self =static_cast(raw); + try { + self->doWork(); + }catch(std::exception& e){ + log_crit_printf(logerr, "Unhandled error in doWorkS callback: %s\n", e.what()); + } } }; evbase::evbase(const std::string &name, unsigned prio) :pvt(new Pvt(name, prio)) - ,base(event_base_new()) -{ - if(!base) { - throw std::runtime_error("event_base_new() fails"); - } - if(evthread_make_base_notifiable(base)) { - event_base_free(base); - throw std::runtime_error("evthread_make_base_notifiable() fails"); - } - pvt->base = base; - log_info_printf(logerr, "Starting loop worker for %p\n", base); - pvt->worker.start(); -} + ,base(pvt->base.get()) +{} evbase::~evbase() {} -static void evhelper_sync_done(evutil_socket_t _fd, short _ev, void *raw) -{ - auto wait = static_cast(raw); - wait->signal(); -} - void evbase::sync() { - assert(!pvt->worker.isCurrentThread()); - - epicsEvent wait; - - if(event_base_once(base, (evutil_socket_t)-1, EV_TIMEOUT, &evhelper_sync_done, &wait, nullptr)!=0) - throw std::runtime_error("event_base_once fails"); - - wait.wait(); -} - -namespace { -void dispatch_action(evutil_socket_t _fd, short _ev, void *raw) -{ - try { - // take ownership of raw - std::unique_ptr > action(reinterpret_cast*>(raw)); - (*action)(); - }catch(std::exception& e){ - log_crit_printf(logerr, "evhelper::call unhandled error %s : %s\n", typeid(&e).name(), e.what()); - } -} + call([](){}); } void evbase::dispatch(std::function&& fn) { - std::unique_ptr > action(new std::function(std::move(fn))); - - if(event_base_once(base, -1, EV_TIMEOUT, &dispatch_action, action.get(), nullptr)==0) { - // successfully queued. No longer my responsibility - action.release(); - } else { - throw std::runtime_error("Unable to queue dispatch()"); + bool empty; + { + Guard G(pvt->lock); + empty = pvt->actions.empty(); + pvt->actions.emplace_back(std::move(fn), nullptr, nullptr); } -} - -// queue request to execute in event loop after at least delay seconds have passed -// @param delay second in future. must be finite and >=0 -void evbase::later(double delay, std::function&& fn) -{ - timeval tv; - tv.tv_sec = unsigned(delay); - tv.tv_usec = unsigned(delay*1e6)%1000000; - - std::unique_ptr > action(new std::function(std::move(fn))); - - if(event_base_once(base, -1, EV_TIMEOUT, &dispatch_action, action.get(), &tv)==0) { - // successfully queued. No longer my responsibility - action.release(); - } else { - throw std::runtime_error("Unable to queue dispatch()"); - } -} - -namespace { -struct action_args { - std::function fn; - epicsEvent wait; - std::exception_ptr err; - action_args(std::function&& fn) :fn(std::move(fn)) {} -}; - -void call_action(evutil_socket_t _fd, short _ev, void *raw) -{ - auto args(reinterpret_cast(raw)); - try { - try { - args->fn(); - }catch(std::exception& e){ - args->err = std::current_exception(); - } - args->wait.signal(); - }catch(std::exception& e){ - log_crit_printf(logerr, "evhelper::call unhandled error: %s\n", e.what()); - args->wait.signal(); - } -} + timeval now{}; + if(empty && event_add(pvt->dowork.get(), &now)) + throw std::runtime_error("Unable to wakeup dispatch()"); } void evbase::call(std::function&& fn) @@ -183,17 +180,23 @@ void evbase::call(std::function&& fn) return; } - action_args args(std::move(fn)); - - if(event_base_once(base, -1, EV_TIMEOUT, &call_action, &args, nullptr)==0) { - // successfully queued. - args.wait.wait(); - if(args.err) { - std::rethrow_exception(args.err); - } - } else { - throw std::runtime_error("Unable to queue call()"); + auto& done = call_done; + std::exception_ptr result; + bool empty; + { + Guard G(pvt->lock); + empty = pvt->actions.empty(); + pvt->actions.emplace_back(std::move(fn), &result, &done); } + + timeval now{}; + if(empty && event_add(pvt->dowork.get(), &now)) + throw std::runtime_error("Unable to wakeup call()"); + + done.wait(); + Guard G(pvt->lock); + if(result) + std::rethrow_exception(result); } void evbase::assertInLoop() @@ -264,7 +267,7 @@ void evsocket::mcast_join(const SockAddr& grp, const SockAddr& iface) const if(grp.family()!=iface.family() || grp.family()!=AF_INET) throw std::invalid_argument("Unsupported address family"); - ip_mreq req; + ip_mreq req{}; req.imr_multiaddr.s_addr = grp->in.sin_addr.s_addr; req.imr_interface.s_addr = iface->in.sin_addr.s_addr; diff --git a/src/evhelper.h b/src/evhelper.h index 0dfa98b..2c2a91a 100644 --- a/src/evhelper.h +++ b/src/evhelper.h @@ -25,6 +25,10 @@ // hooks for std::unique_ptr namespace std { template<> +struct default_delete { + inline void operator()(event_base* ev) { event_base_free(ev); } +}; +template<> struct default_delete { inline void operator()(event* ev) { event_free(ev); } }; @@ -65,10 +69,6 @@ struct PVXS_API evbase { // queue request to execute in event loop. return immediately. void dispatch(std::function&& fn); - // queue request to execute in event loop after at least delay seconds have passed - // @param delay second in future. must be finite and >=0 - void later(double delay, std::function&& fn); - // queue request to execute in event loop. return after executed. void call(std::function&& fn);