client Add Operation::wait()

This commit is contained in:
Michael Davidsaver
2020-03-11 11:41:22 -07:00
parent 92228556d7
commit 25e712eb2a
6 changed files with 184 additions and 17 deletions
+56
View File
@@ -11,6 +11,7 @@
#include <osiSock.h>
#include <dbDefs.h>
#include <epicsThread.h>
#include <epicsGuard.h>
#include <pvxs/log.h>
#include <clientimpl.h>
@@ -19,6 +20,9 @@ DEFINE_LOGGER(setup, "pvxs.client.setup");
DEFINE_LOGGER(io, "pvxs.client.io");
DEFINE_LOGGER(duppv, "pvxs.client.dup");
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
namespace pvxs {
namespace client {
@@ -52,6 +56,16 @@ Connected::Connected(const std::string& peerName)
Connected::~Connected() {}
Interrupted::Interrupted()
:std::runtime_error ("Interrupted")
{}
Interrupted::~Interrupted() {}
Timeout::Timeout()
:std::runtime_error ("Interrupted")
{}
Timeout::~Timeout() {}
Channel::Channel(const std::shared_ptr<Context::Pvt>& context, const std::string& name, uint32_t cid)
:context(context)
,name(name)
@@ -117,6 +131,35 @@ void Channel::disconnect(const std::shared_ptr<Channel>& self)
}
Value ResultWaiter::wait(double timeout)
{
Guard G(lock);
while(outcome==Busy) {
UnGuard U(G);
if(!notify.wait(timeout))
throw Timeout();
}
if(outcome==Done)
return result();
else
throw Interrupted();
}
void ResultWaiter::complete(Result&& result, bool interrupt)
{
bool wakeup;
{
Guard G(lock);
wakeup = outcome==Busy;
if(wakeup) {
this->result = std::move(result);
outcome = interrupt ? Abort : Done;
}
}
if(wakeup)
notify.trigger();
}
OperationBase::OperationBase(operation_t op, const std::shared_ptr<Channel>& chan)
:Operation(op)
,chan(chan)
@@ -124,6 +167,19 @@ OperationBase::OperationBase(operation_t op, const std::shared_ptr<Channel>& cha
OperationBase::~OperationBase() {}
Value OperationBase::wait(double timeout)
{
if(!waiter)
throw std::logic_error("Operation has custom .result() callback");
return waiter->wait(timeout);
}
void OperationBase::interrupt()
{
if(waiter)
waiter->complete(Result(), true);
}
RequestInfo::RequestInfo(uint32_t sid, uint32_t ioid, std::shared_ptr<OperationBase>& handle)
:sid(sid)
,ioid(ioid)