rework Timer

I'm not sure how I broke it, but I know I don't have the patience
to fix it.  So replacing intrusive list and custom sorting with
std::list and std::list::merge().
This commit is contained in:
Michael Davidsaver
2018-04-04 20:51:23 -07:00
parent 7b8ef390ce
commit 1e1d94ed73
2 changed files with 87 additions and 112 deletions

View File

@@ -37,63 +37,44 @@ Timer::Timer(string threadName,ThreadPriority priority)
thread(threadName,priority,this)
{}
struct TimerCallback::IncreasingTime {
bool operator()(const TimerCallbackPtr& lhs, const TimerCallbackPtr& rhs) {
assert(lhs && rhs);
return lhs->timeToRun < rhs->timeToRun;
}
};
// call with mutex held
void Timer::addElement(TimerCallbackPtr const & timerCallback)
{
assert(!timerCallback->onList);
assert(!timerCallback->next);
queue_t temp;
temp.push_back(timerCallback);
timerCallback->onList = true;
if(!head) {
head = timerCallback;
timerCallback->next.reset();
return;
}
TimerCallbackPtr nextNode(head), prevNode;
while(true) {
if(timerCallback->timeToRun < nextNode->timeToRun) {
if(prevNode) {
prevNode->next = timerCallback;
} else {
head = timerCallback;
}
timerCallback->next = nextNode;
return;
}
if(nextNode->next.get()==NULL) {
nextNode->next = timerCallback;
timerCallback->next.reset();
return;
}
prevNode = nextNode;
nextNode = nextNode->next;
}
// merge sorted lists.
// for us effectively insertion sort.
queue.merge(temp, TimerCallback::IncreasingTime());
}
void Timer::cancel(TimerCallbackPtr const &timerCallback)
bool Timer::cancel(TimerCallbackPtr const &timerCallback)
{
Lock xx(mutex);
if(!timerCallback->onList) return;
TimerCallbackPtr nextNode(head);
TimerCallbackPtr prevNode;
while(true) {
if(nextNode.get()==timerCallback.get()) {
if(prevNode) {
prevNode->next = timerCallback->next;
} else {
head = timerCallback->next;
}
timerCallback->next.reset();
timerCallback->onList = false;
return;
if(!timerCallback->onList) return false;
for(queue_t::iterator it(queue.begin()), end(queue.end()); it != end; ++it)
{
TimerCallbackPtr& cur = *it;
if(cur.get() == timerCallback.get()) {
queue.erase(it);
cur->onList = false;
// iteration now invalid
return true;
}
prevNode = nextNode;
nextNode = nextNode->next;
}
throw std::logic_error(string(""));
throw std::logic_error("Timer::cancel() onList==true, but not found");
}
bool Timer::isScheduled(TimerCallbackPtr const &timerCallback) const
@@ -107,56 +88,47 @@ void Timer::run()
{
epicsGuard<epicsMutex> G(mutex);
epicsTime now(epicsTime::getCurrent());
while(alive) {
double waitfor;
TimerCallbackPtr next;
if(queue.empty()) {
// no jobs, just go to sleep
epicsGuardRelease<epicsMutex> U(G);
epicsTime currentTime(epicsTime::getCurrent());
waitForWork.wait();
now = epicsTime::getCurrent();
double delay = -1;
} else if((waitfor = queue.front()->timeToRun - now) <= 0) {
// execute first expired job
if(head) {
// there may be work to be done
TimerCallbackPtr work;
work.swap(queue.front());
work->onList = false;
queue.pop_front();
delay = head->timeToRun - currentTime;
{
epicsGuardRelease<epicsMutex> U(G);
if(delay <= 0.0) {
// head timer has expired
work->callback();
}
// we take head, move head = head->next
next.swap(head);
head.swap(next->next);
if(work->period > 0.0) {
work->timeToRun += work->period;
addElement(work);
}
next->onList = false;
// don't update 'now' until all expired jobs run
// re-schedule periodic
if(next->period > 0.0) {
next->timeToRun += next->period;
addElement(next);
}
} else {
// wait for first un-expired
epicsGuardRelease<epicsMutex> U(G);
if(head) {
delay = head->timeToRun - currentTime;
}
}
};
bool hasHead = !!head;
{
epicsGuardRelease<epicsMutex> U(G);
if(next) {
next->callback();
}
if(hasHead) {
waitForWork.wait(delay);
} else {
waitForWork.wait();
}
}
}
waitForWork.wait(waitfor);
now = epicsTime::getCurrent();
}
}
}
Timer::~Timer() {
@@ -172,14 +144,14 @@ void Timer::close() {
}
waitForWork.signal();
thread.exitWait();
TimerCallbackPtr timerCallback;
while(true) {
timerCallback = head;
if(head.get()==NULL) break;
queue_t temp;
temp.swap(queue);
for(;!temp.empty(); temp.pop_front()) {
TimerCallbackPtr& head = temp.front();
head->onList = false;
head->timerStopped();
head = timerCallback->next;
timerCallback->next.reset();
timerCallback->onList = false;
}
}
@@ -195,42 +167,39 @@ void Timer::schedulePeriodic(
double delay,
double period)
{
if(isScheduled(timerCallback)) {
throw std::logic_error(string("already queued"));
}
epicsTime now(epicsTime::getCurrent());
bool wasempty;
{
Lock xx(mutex);
if(timerCallback->onList) {
throw std::logic_error(string("already queued"));
}
if(!alive) {
timerCallback->timerStopped();
return;
}
}
TimeStamp timeStamp;
timeStamp.getCurrent();
timeStamp += delay;
timerCallback->timeToRun.getCurrent();
timerCallback->timeToRun += delay;
timerCallback->period = period;
bool isFirst = false;
{
Lock xx(mutex);
timerCallback->timeToRun = now + delay;
timerCallback->period = period;
wasempty = queue.empty();
addElement(timerCallback);
if(timerCallback.get()==head.get()) isFirst = true;
}
if(isFirst) waitForWork.signal();
if(wasempty) waitForWork.signal();
}
void Timer::dump(std::ostream& o) const
{
Lock xx(mutex);
if(!alive) return;
epicsTime currentTime(epicsTime::getCurrent());
TimerCallbackPtr nodeToCall(head);
epicsTime now(epicsTime::getCurrent());
while(nodeToCall) {
o << "timeToRun " << (nodeToCall->timeToRun - currentTime)
for(queue_t::const_iterator it(queue.begin()), end(queue.end()); it!=end; ++it) {
const TimerCallbackPtr& nodeToCall = *it;
o << "timeToRun " << (nodeToCall->timeToRun - now)
<< " period " << nodeToCall->period << "\n";
nodeToCall = nodeToCall->next;
}
}