This commit is contained in:
Michael Davidsaver
2017-07-10 14:03:39 +02:00
parent 8f4fafd668
commit eafd962d78
4 changed files with 45 additions and 6 deletions

View File

@ -58,6 +58,7 @@ struct Getter : public TestClientChannel::GetCallback,
virtual ~Getter()
{
channel.removeConnectListener(this);
op.cancel();
}
virtual void getDone(const TestGetEvent& event)

View File

@ -110,12 +110,15 @@ struct WorkQueue : public epicsThreadRunable {
};
WorkQueue monwork;
epicsMutex mutex;
epicsEvent done;
volatile size_t waitingFor;
#ifdef USE_SIGNAL
void sigdone(int num)
{
(void)num;
waitingFor = 0;
done.signal();
}
#endif
@ -127,13 +130,16 @@ struct MonTracker : public TestClientChannel::MonitorCallback,
POINTER_DEFINITIONS(MonTracker);
MonTracker(const std::string& name) :name(name) {}
virtual ~MonTracker() {}
virtual ~MonTracker() {mon.cancel();}
const std::string name;
TestMonitor mon;
virtual void monitorEvent(const TestMonitorEvent& evt) OVERRIDE FINAL
{
// shared_from_this() will fail as Cancel is delivered in our dtor.
if(evt.event==TestMonitorEvent::Cancel) return;
// running on internal provider worker thread
// minimize work here.
// TODO: bound queue size
@ -154,7 +160,9 @@ struct MonTracker : public TestClientChannel::MonitorCallback,
std::cout<<"Disconnect "<<name<<"\n";
break;
case TestMonitorEvent::Data:
while(mon.poll()) {
{
unsigned n;
for(n=0; n<2 && mon.poll(); n++) {
pvd::PVField::const_shared_pointer fld(mon.root->getSubField("value"));
if(!fld)
fld = mon.root;
@ -163,6 +171,11 @@ struct MonTracker : public TestClientChannel::MonitorCallback,
<<" Changed:"<<mon.changed
<<" overrun:"<<mon.overrun<<"\n";
}
if(n==2) {
// too many updates, re-queue to balance with others
monwork.push(shared_from_this(), evt);
}
}
break;
}
}
@ -235,6 +248,11 @@ int main(int argc, char *argv[]) {
std::vector<MonTracker::shared_pointer> monitors;
{
Guard G(mutex);
waitingFor = pvs.size();
}
for(pvs_t::const_iterator it=pvs.begin(); it!=pvs.end(); ++it) {
const std::string& pv = *it;
@ -247,10 +265,18 @@ int main(int argc, char *argv[]) {
monitors.push_back(mon);
}
if(waitTime<0.0)
done.wait();
else
done.wait(waitTime);
{
Guard G(mutex);
while(waitingFor) {
UnGuard U(G);
if(waitTime<0.0) {
done.wait();
} else if(!done.wait(waitTime)) {
std::cerr<<"Timeout\n";
break; // timeout
}
}
}
} catch(std::exception& e){
std::cout<<"Error: "<<e.what()<<"\n";

View File

@ -30,7 +30,10 @@ struct epicsShareClass TestOperation
TestOperation() {}
TestOperation(const std::tr1::shared_ptr<Impl>&);
~TestOperation();
//! Channel name
std::string name() const;
//! Immediate cancellation.
//! Does not wait for remote confirmation.
void cancel();
protected:
@ -64,7 +67,10 @@ struct epicsShareClass TestMonitor
TestMonitor(const std::tr1::shared_ptr<Impl>&);
~TestMonitor();
//! Channel name
std::string name() const;
//! Immediate cancellation.
//! Does not wait for remote confirmation.
void cancel();
//! updates root, changed, overrun
//! return true if root!=NULL
@ -123,10 +129,12 @@ public:
};
//! Issue request to retrieve current PV value
//! @param cb Completion notification callback. Must outlive TestOperation (call TestOperation::cancel() to force release)
TestOperation get(GetCallback* cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer());
//! Start an RPC call
//! @param cb Completion notification callback. Must outlive TestOperation (call TestOperation::cancel() to force release)
TestOperation rpc(GetCallback* cb,
const epics::pvData::PVStructure::const_shared_pointer& arguments,
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer());
@ -140,6 +148,7 @@ public:
};
//! Initiate request to change PV
//! @param cb Completion notification callback. Must outlive TestOperation (call TestOperation::cancel() to force release)
//! TODO: produce bitset to mask fields being set
TestOperation put(PutCallback* cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer());
@ -150,6 +159,7 @@ public:
};
//! Begin subscription
//! @param cb Completion notification callback. Must outlive TestOperation (call TestOperation::cancel() to force release)
TestMonitor monitor(MonitorCallback *cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer());
@ -159,6 +169,7 @@ public:
virtual void connectEvent(const TestConnectEvent& evt)=0;
};
//! Append to list of listeners
//! @param cb Channel dis/connect notification callback. Must outlive TestClientChannel or call to removeConnectListener()
void addConnectListener(ConnectCallback*);
//! Remove from list of listeners
void removeConnectListener(ConnectCallback*);

View File

@ -49,6 +49,7 @@ struct TestMonitor::Impl : public pva::MonitorRequester
if(evt==TestMonitorEvent::Fail || evt==TestMonitorEvent::Cancel)
this->cb = 0; // last event
try {
UnGuard U(G);
cb->monitorEvent(event);