redo evhelper::dispatch()/call()

void races and
This commit is contained in:
Michael Davidsaver
2020-03-02 18:18:14 -08:00
parent 4f673cbbff
commit 44331decf9
2 changed files with 117 additions and 114 deletions
+113 -110
View File
@@ -10,6 +10,7 @@
#include <cstring>
#include <system_error>
#include <deque>
#include <event2/event.h>
#include <event2/thread.h>
@@ -18,19 +19,39 @@
#include <osiSock.h>
#include <epicsEvent.h>
#include <epicsThread.h>
#include <epicsMutex.h>
#include <epicsGuard.h>
#include "evhelper.h"
#include "pvaproto.h"
#include "utilpvt.h"
#include <pvxs/log.h>
typedef epicsGuard<epicsMutex> Guard;
namespace pvxs {namespace impl {
DEFINE_LOGGER(logerr, "pvxs.loop");
thread_local epicsEvent call_done;
struct evbase::Pvt : public epicsThreadRunable
{
event_base* base;
struct Work {
std::function<void()> fn;
std::exception_ptr *result;
epicsEvent *notify;
Work(const std::function<void()>& fn, std::exception_ptr *result, epicsEvent *notify)
:fn(fn), result(result), notify(notify)
{}
};
std::deque<Work> actions;
std::unique_ptr<event_base> base;
evevent dowork;
epicsEvent start_sync;
epicsMutex lock;
epicsThread worker;
Pvt(const std::string& name, unsigned prio)
@@ -49,131 +70,107 @@ struct evbase::Pvt : public epicsThreadRunable
# error No threading support for this target
// TODO fallback to libCom ?
#endif
worker.start();
start_sync.wait();
if(!base) {
throw std::runtime_error("event_base_new() fails");
}
}
virtual ~Pvt() {
if(event_base_loopexit(base, nullptr))
log_crit_printf(logerr, "evbase error while interrupting loop for %p\n", base);
if(event_base_loopexit(base.get(), nullptr))
log_crit_printf(logerr, "evbase error while interrupting loop for %p\n", base.get());
worker.exitWait();
event_base_free(base);
}
virtual void run() override final
{
log_info_printf(logerr, "Enter loop worker for %p\n", base);
try {
decltype (base) tbase(event_base_new());
if(evthread_make_base_notifiable(tbase.get())) {
throw std::runtime_error("evthread_make_base_notifiable");
}
int ret = event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY);
evevent handle(event_new(tbase.get(), -1, EV_TIMEOUT, &doWorkS, this));
// TODO: cleanup after pending event_base_once()
base = std::move(tbase);
dowork = std::move(handle);
start_sync.trigger();
auto lvl = ret ? Level::Crit : Level::Info;
log_printf(logerr, lvl, "Exit loop worker: %d for %p\n", ret, base);
log_info_printf(logerr, "Enter loop worker for %p\n", base.get());
int ret = event_base_loop(base.get(), EVLOOP_NO_EXIT_ON_EMPTY);
auto lvl = ret ? Level::Crit : Level::Info;
log_printf(logerr, lvl, "Exit loop worker: %d for %p\n", ret, base.get());
}catch(std::exception& e){
log_crit_printf(logerr, "Unhandled exception in event_base run : %s\n",
e.what());
start_sync.trigger();
}
}
void doWork()
{
decltype (actions) todo;
{
Guard G(lock);
todo = std::move(actions);
}
for(auto& work : todo) {
try {
work.fn();
}catch(std::exception& e){
if(work.result) {
Guard G(lock);
*work.result = std::current_exception();
} else {
log_crit_printf(logerr, "Unhandled exception in event_base : %s : %s\n",
typeid(e).name(), e.what());
}
}
if(work.notify)
work.notify->trigger();
}
}
static
void doWorkS(evutil_socket_t sock, short evt, void *raw)
{
auto self =static_cast<Pvt*>(raw);
try {
self->doWork();
}catch(std::exception& e){
log_crit_printf(logerr, "Unhandled error in doWorkS callback: %s\n", e.what());
}
}
};
evbase::evbase(const std::string &name, unsigned prio)
:pvt(new Pvt(name, prio))
,base(event_base_new())
{
if(!base) {
throw std::runtime_error("event_base_new() fails");
}
if(evthread_make_base_notifiable(base)) {
event_base_free(base);
throw std::runtime_error("evthread_make_base_notifiable() fails");
}
pvt->base = base;
log_info_printf(logerr, "Starting loop worker for %p\n", base);
pvt->worker.start();
}
,base(pvt->base.get())
{}
evbase::~evbase() {}
static void evhelper_sync_done(evutil_socket_t _fd, short _ev, void *raw)
{
auto wait = static_cast<epicsEvent*>(raw);
wait->signal();
}
void evbase::sync()
{
assert(!pvt->worker.isCurrentThread());
epicsEvent wait;
if(event_base_once(base, (evutil_socket_t)-1, EV_TIMEOUT, &evhelper_sync_done, &wait, nullptr)!=0)
throw std::runtime_error("event_base_once fails");
wait.wait();
}
namespace {
void dispatch_action(evutil_socket_t _fd, short _ev, void *raw)
{
try {
// take ownership of raw
std::unique_ptr<std::function<void()> > action(reinterpret_cast<std::function<void()>*>(raw));
(*action)();
}catch(std::exception& e){
log_crit_printf(logerr, "evhelper::call unhandled error %s : %s\n", typeid(&e).name(), e.what());
}
}
call([](){});
}
void evbase::dispatch(std::function<void()>&& fn)
{
std::unique_ptr<std::function<void()> > action(new std::function<void()>(std::move(fn)));
if(event_base_once(base, -1, EV_TIMEOUT, &dispatch_action, action.get(), nullptr)==0) {
// successfully queued. No longer my responsibility
action.release();
} else {
throw std::runtime_error("Unable to queue dispatch()");
bool empty;
{
Guard G(pvt->lock);
empty = pvt->actions.empty();
pvt->actions.emplace_back(std::move(fn), nullptr, nullptr);
}
}
// queue request to execute in event loop after at least delay seconds have passed
// @param delay second in future. must be finite and >=0
void evbase::later(double delay, std::function<void()>&& fn)
{
timeval tv;
tv.tv_sec = unsigned(delay);
tv.tv_usec = unsigned(delay*1e6)%1000000;
std::unique_ptr<std::function<void()> > action(new std::function<void()>(std::move(fn)));
if(event_base_once(base, -1, EV_TIMEOUT, &dispatch_action, action.get(), &tv)==0) {
// successfully queued. No longer my responsibility
action.release();
} else {
throw std::runtime_error("Unable to queue dispatch()");
}
}
namespace {
struct action_args {
std::function<void()> fn;
epicsEvent wait;
std::exception_ptr err;
action_args(std::function<void()>&& fn) :fn(std::move(fn)) {}
};
void call_action(evutil_socket_t _fd, short _ev, void *raw)
{
auto args(reinterpret_cast<action_args*>(raw));
try {
try {
args->fn();
}catch(std::exception& e){
args->err = std::current_exception();
}
args->wait.signal();
}catch(std::exception& e){
log_crit_printf(logerr, "evhelper::call unhandled error: %s\n", e.what());
args->wait.signal();
}
}
timeval now{};
if(empty && event_add(pvt->dowork.get(), &now))
throw std::runtime_error("Unable to wakeup dispatch()");
}
void evbase::call(std::function<void()>&& fn)
@@ -183,17 +180,23 @@ void evbase::call(std::function<void()>&& fn)
return;
}
action_args args(std::move(fn));
if(event_base_once(base, -1, EV_TIMEOUT, &call_action, &args, nullptr)==0) {
// successfully queued.
args.wait.wait();
if(args.err) {
std::rethrow_exception(args.err);
}
} else {
throw std::runtime_error("Unable to queue call()");
auto& done = call_done;
std::exception_ptr result;
bool empty;
{
Guard G(pvt->lock);
empty = pvt->actions.empty();
pvt->actions.emplace_back(std::move(fn), &result, &done);
}
timeval now{};
if(empty && event_add(pvt->dowork.get(), &now))
throw std::runtime_error("Unable to wakeup call()");
done.wait();
Guard G(pvt->lock);
if(result)
std::rethrow_exception(result);
}
void evbase::assertInLoop()
@@ -264,7 +267,7 @@ void evsocket::mcast_join(const SockAddr& grp, const SockAddr& iface) const
if(grp.family()!=iface.family() || grp.family()!=AF_INET)
throw std::invalid_argument("Unsupported address family");
ip_mreq req;
ip_mreq req{};
req.imr_multiaddr.s_addr = grp->in.sin_addr.s_addr;
req.imr_interface.s_addr = iface->in.sin_addr.s_addr;