Portable C++ Timer Queue

Introduction

If you read my previous posts, you will have noticed that I explored some of the nice things Boost Asio gives you, and how I implemented my own versions.

I covered what I called callstack markers, which have more uses than it seems at first glance. I also covered strands, and why they are such a nice concept.

This time around, I’m going to focus on the equivalent of boost::asio::deadline_timer. The implementation shown here is custom-made, self-contained, portable C++11.

If I had to roughly describe what deadline_timer is for while at the same time hinting at the implementation, I would say “deadline_timer allows you to group together a bunch of handlers to be called when a timer expires”.
I think the “deadline” in deadline_timer is a bit misleading. With the definition of “deadline” being “the latest time or date by which something should be completed”, the first thing that crosses my mind is that it is used to cancel some operation that did not complete by the specified time/date, such as cancelling a connect attempt. Certainly it can do that, but it can be used for a lot more.
For simplicity, I’ll be calling it “timer(s)”, and not “deadline timer(s)”.

A bit of brainstorming

So, how do we go about implementing timers?
At its core, you need a queue for the timers, and a worker thread that dequeues and executes timers according to the specified expiry times/dates. Let’s call it “Timer Queue”. The Win32 API uses the same name for such a thing: Timer Queues.
What I explore in this post is a Timer Queue implementation. It allows one-shot timers. Higher level things, such as grouping handlers in one timer and timers that can be reused (like Boost Asio’s own implementation), can be built on top.

The trickiest thing to solve is how to correctly notify the worker thread that something happened that might change the current wait time.

For example:

  • Worker thread is waiting for timers to process
  • Main thread adds a timer A with expiry time of 10 seconds.
  • Worker thread detects there is a timer in the queue, and starts a wait based on that timer expiry time.
  • Main thread adds a timer B with expiry time 1 second
  • Worker thread needs to rethink the waiting time, since timer B should be executed before A.
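The scenario above boils down to keeping the pending timers ordered by expiry time, so the worker thread always waits on the earliest one. Here is a minimal sketch of just that ordering. The names `PendingTimer`, `TimerHeap`, `earliestName` and `replayExample` are made up for this illustration and are not part of the final code.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <queue>
#include <string>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical, illustration-only type: a name plus an expiry time.
struct PendingTimer {
    std::string name;
    Clock::time_point end;
    // std::greater needs operator>, which turns priority_queue into a min-heap
    bool operator>(const PendingTimer& other) const { return end > other.end; }
};

using TimerHeap = std::priority_queue<PendingTimer, std::vector<PendingTimer>,
                                      std::greater<PendingTimer>>;

// Returns the name of the timer the worker thread should be waiting on.
std::string earliestName(const TimerHeap& heap) {
    return heap.empty() ? std::string() : heap.top().name;
}

// Replays the example: A (10 seconds) is added first, then B (1 second).
std::string replayExample() {
    TimerHeap heap;
    const auto now = Clock::now();
    heap.push(PendingTimer{"A", now + std::chrono::seconds(10)});
    // At this point the worker thread would be waiting on A.
    heap.push(PendingTimer{"B", now + std::chrono::seconds(1)});
    // B is now at the top, so the worker thread has to be woken up to
    // shorten its wait.
    return earliestName(heap);
}
```

The ordering itself is the easy part. The hard part, discussed next, is telling the worker thread that the top of the heap changed while it was already waiting.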

Now, if you throw in a couple more threads adding or cancelling timers, things might get confusing. Personally, I think it’s a bad sign if you have to think too hard to verify the correctness of your multithreaded code. Sure, performance is good, but first get it right. If performance is a problem, profile it, then think as hard as you need to get it right and fast.

My first pass at this was convoluted to say the least. There was a queue for the timers, then communication with the worker thread was done with another queue where I would pass commands such as “recalculate wait time”, or “execute this timer right now”, or “shutdown”.
To add insult to injury, those two queues were locked independently.
Sure enough, as I started to write this post to share the code, I spotted another bug. Ever had that feeling of “Surely there must be a cleaner way to do this”?

To be honest, in hindsight that first pass was quite bad. Maybe it just got that bad in small increments as I was focusing on the wrong problems.

What was sabotaging my efforts was that I was in the wrong mindset. I kept thinking of the thread-to-thread communication as “work to be done”, and thus the obvious choice there was to have a command queue.

After a moment of clarity, I changed my mindset from “work to be done” to “something has changed”, and a cleaner solution formed:

The thread wakes up when something has changed (e.g. a timer was added, cancelled, or expired), and checks for work (e.g. executes the handlers of expired timers). Extraneous notifications to wake up the thread have no side effects, since the thread wakes up, checks for work, recalculates the new wait time and goes back to waiting.
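To make the “something has changed” idea concrete before getting into the real code, here is a stripped-down sketch. It is not the TimerQueue: the hypothetical `ChangeDrivenWorker` just counts “work units” and has no wait time to recalculate. What it shows is that extra wake-ups are harmless, because the worker only acts on what it finds when it checks.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical, illustration-only class: it counts "work units" instead of
// running timers.
class ChangeDrivenWorker {
public:
    ChangeDrivenWorker() : m_th([this] { run(); }) {}

    ~ChangeDrivenWorker() {
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            m_stop = true;
        }
        signalChange();
        m_th.join();
    }

    void addWork(int units) {
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            m_pending += units;
        }
        signalChange();
    }

    // "Something has changed". Harmless when nothing actually did.
    void signalChange() {
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            m_changed = true;
        }
        m_cv.notify_one();
    }

    int processed() {
        std::lock_guard<std::mutex> lk(m_mtx);
        return m_processed;
    }

private:
    void run() {
        std::unique_lock<std::mutex> lk(m_mtx);
        while (!m_stop) {
            m_cv.wait(lk, [this] { return m_changed; });
            m_changed = false;
            // Check for work. Finding none is fine: we go back to waiting.
            m_processed += m_pending;
            m_pending = 0;
        }
    }

    std::mutex m_mtx;
    std::condition_variable m_cv;
    bool m_changed = false;
    bool m_stop = false;
    int m_pending = 0;
    int m_processed = 0;
    std::thread m_th;  // Declared last, so everything above exists before run()
};

// Mixes real changes with extraneous notifications and returns the total
// amount of work the worker thread ended up processing.
int demoExtraneousNotifications() {
    ChangeDrivenWorker worker;
    worker.addWork(3);
    worker.signalChange();  // Extraneous
    worker.signalChange();  // Extraneous
    worker.addWork(2);
    while (worker.processed() < 5)
        std::this_thread::yield();
    return worker.processed();
}
```

Note there is no command queue anywhere: the notification carries no information, and all the state lives behind a single mutex.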

Implementation

The only helper class needed is a semaphore, which can be implemented in portable C++11 with a mutex, a condition_variable and a counter.

#pragma once
#include <chrono>
#include <condition_variable>
#include <mutex>

class Semaphore {
public:
    Semaphore(unsigned int count = 0) : m_count(count) {}

    void notify() {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_count++;
        m_cv.notify_one();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cv.wait(lock, [this]() { return m_count > 0; });
        m_count--;
    }

    template <class Clock, class Duration>
    bool waitUntil(const std::chrono::time_point<Clock, Duration>& point) {
        std::unique_lock<std::mutex> lock(m_mtx);
        if (!m_cv.wait_until(lock, point, [this]() { return m_count > 0; }))
            return false;
        m_count--;
        return true;
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    unsigned int m_count;
};

A few more useful methods could be added to Semaphore, but were omitted to keep it short.

  • bool waitFor(duration) : To wait for specified duration before timeout
  • bool tryWait() : To check and decrement the semaphore without waiting.
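For reference, this is roughly what those two methods could look like. It is a sketch only: I’m using a separate, hypothetical `SemaphoreEx` class so the block stands on its own, and the signatures are my guess at a reasonable interface, not something the TimerQueue below depends on.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical variant of Semaphore, showing only the omitted methods
// (plus notify, so it can be exercised).
class SemaphoreEx {
public:
    explicit SemaphoreEx(unsigned int count = 0) : m_count(count) {}

    void notify() {
        std::lock_guard<std::mutex> lock(m_mtx);
        ++m_count;
        m_cv.notify_one();
    }

    // Waits up to the specified duration.
    // Returns false if it timed out without decrementing the semaphore.
    template <class Rep, class Period>
    bool waitFor(const std::chrono::duration<Rep, Period>& duration) {
        std::unique_lock<std::mutex> lock(m_mtx);
        if (!m_cv.wait_for(lock, duration, [this]() { return m_count > 0; }))
            return false;
        --m_count;
        return true;
    }

    // Checks and decrements the semaphore without waiting.
    // Returns false if the count was already zero.
    bool tryWait() {
        std::lock_guard<std::mutex> lock(m_mtx);
        if (m_count == 0)
            return false;
        --m_count;
        return true;
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    unsigned int m_count;
};
```

Both follow the same pattern as waitUntil: take the lock, check the counter through the predicate, and only decrement if the check succeeded.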

And the TimerQueue implementation, heavily documented to further explain the implementation…

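#pragma once
#include "Semaphore.h"
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <chrono>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>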

// Timer Queue
//
// Allows execution of handlers at a specified time in the future
// Guarantees:
//	- All handlers are executed ONCE, even if canceled (aborted parameter will
//	  be set to true)
//		- If TimerQueue is destroyed, it will cancel all handlers.
//	- Handlers are ALWAYS executed in the Timer Queue worker thread.
//	- Handlers execution order is NOT guaranteed
//
class TimerQueue {
public:
    TimerQueue() {
        m_th = std::thread([this] { run(); });
    }

    ~TimerQueue() {
        cancelAll();
        // Abusing the timer queue to trigger the shutdown.
        add(0, [this](bool) { m_finish = true; });
        m_th.join();
    }

    //! Adds a new timer
    // \return
    //	Returns the ID of the new timer. You can use this ID to cancel the
    // timer
    uint64_t add(int64_t milliseconds, std::function<void(bool)> handler) {
        WorkItem item;
        item.end = Clock::now() + std::chrono::milliseconds(milliseconds);
        item.handler = std::move(handler);

        std::unique_lock<std::mutex> lk(m_mtx);
        uint64_t id = ++m_idcounter;
        item.id = id;
        m_items.push(std::move(item));
        lk.unlock();

        // Something changed, so wake up timer thread
        m_checkWork.notify();
        return id;
    }

    //! Cancels the specified timer
    // \return
    //	1 if the timer was cancelled.
    //	0 if you were too late to cancel (or the timer ID was never valid to
    // start with)
    size_t cancel(uint64_t id) {
        // Instead of removing the item from the container (thus breaking the
        // heap integrity), we set the item as having no handler, and put
        // that handler on a new item at the top for immediate execution
        // The timer thread will then ignore the original item, since it has no
        // handler.
        std::unique_lock<std::mutex> lk(m_mtx);
        for (auto&& item : m_items.getContainer()) {
            if (item.id == id && item.handler) {
                WorkItem newItem;
                // Zero time, so it stays at the top for immediate execution
                newItem.end = Clock::time_point();
                newItem.id = 0;  // Means it is a canceled item
                // Move the handler from item to newitem.
                // Also, we need to manually set the handler to nullptr, since
                // the standard does not guarantee moving an std::function will
                // empty it. Some STL implementation will empty it, others will
                // not.
                newItem.handler = std::move(item.handler);
                item.handler = nullptr;
                m_items.push(std::move(newItem));

                lk.unlock();
                // Something changed, so wake up timer thread
                m_checkWork.notify();
                return 1;
            }
        }
        return 0;
    }

    //! Cancels all timers
    // \return
    //	The number of timers cancelled
    size_t cancelAll() {
        // Setting all "end" to 0 (for immediate execution) is ok,
        // since it maintains the heap integrity
        std::unique_lock<std::mutex> lk(m_mtx);
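        size_t ret = 0;
        for (auto&& item : m_items.getContainer()) {
            if (item.id) {
                item.end = Clock::time_point();
                item.id = 0;
                // Only count items that still have a handler. The leftovers
                // of a previous cancel() call have none.
                if (item.handler)
                    ++ret;
            }
        }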

        lk.unlock();
        m_checkWork.notify();
        return ret;
    }

private:
    using Clock = std::chrono::steady_clock;
    TimerQueue(const TimerQueue&) = delete;
    TimerQueue& operator=(const TimerQueue&) = delete;

    void run() {
        while (!m_finish) {
            auto end = calcWaitTime();
            if (end.first) {
                // Timers found, so wait until it expires (or something else
                // changes)
                m_checkWork.waitUntil(end.second);
            } else {
                // No timers exist, so wait forever until something changes
                m_checkWork.wait();
            }

            // Check and execute as much work as possible, such as, all expired
            // timers
            checkWork();
        }

        // If we are shutting down, we should not have any items left,
        // since the shutdown cancels all items
        assert(m_items.size() == 0);
    }

    std::pair<bool, Clock::time_point> calcWaitTime() {
        std::lock_guard<std::mutex> lk(m_mtx);
        while (m_items.size()) {
            if (m_items.top().handler) {
                // Item present, so return the new wait time
                return std::make_pair(true, m_items.top().end);
            } else {
                // Discard empty handlers (they were cancelled)
                m_items.pop();
            }
        }

        // No items found, so return no wait time (causes the thread to wait
        // indefinitely)
        return std::make_pair(false, Clock::time_point());
    }

    void checkWork() {
        std::unique_lock<std::mutex> lk(m_mtx);
        while (m_items.size() && m_items.top().end <= Clock::now()) {
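            // Note: top() returns a const reference, so this std::move ends
            // up copying the item (handler included) instead of moving it
            WorkItem item(std::move(m_items.top()));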
            m_items.pop();

            lk.unlock();
            if (item.handler)
                item.handler(item.id == 0);
            lk.lock();
        }
    }

    Semaphore m_checkWork;
    std::thread m_th;
    bool m_finish = false;
    uint64_t m_idcounter = 0;

    struct WorkItem {
        Clock::time_point end;
        uint64_t id;  // id==0 means it was cancelled
        std::function<void(bool)> handler;
        bool operator>(const WorkItem& other) const {
            return end > other.end;
        }
    };

    std::mutex m_mtx;
    // Inheriting from priority_queue, so we can access the internal container
    class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>,
                                             std::greater<WorkItem>> {
    public:
        std::vector<WorkItem>& getContainer() {
            return this->c;
        }
    } m_items;
};

And a small example just adding and cancelling timers…

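// Note: the lambdas below use init-captures, which require C++14.
// TimerQueue itself only needs C++11.
#include "TimerQueue.h"
#include <assert.h>
#include <stdio.h>
#include <chrono>
#include <thread>

namespace Timing {

using Clock = std::chrono::high_resolution_clock;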
double now() {
    static auto start = Clock::now();
    return std::chrono::duration<double, std::milli>(Clock::now() - start)
        .count();
}

void sleep(unsigned ms) {
    std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}

}  // namespace Timing

int main() {
    TimerQueue q;

    // Create timer with ID 1
    q.add(10000, [start = Timing::now()](bool aborted) mutable {
        printf("ID 1: aborted=%s, Elapsed %4.2fms\n",
               aborted ? "true " : "false", Timing::now() - start);
    });

    // Create Timer with ID 2
    q.add(10001, [start = Timing::now()](bool aborted) mutable {
        printf("ID 2: aborted=%s, Elapsed %4.2fms\n",
               aborted ? "true " : "false", Timing::now() - start);
    });

    // Should cancel timers with ID 1 and 2
    q.cancelAll();

    // Create timer with ID 3
    q.add(1000, [start = Timing::now()](bool aborted) mutable {
        printf("ID 3: aborted=%s, Elapsed %4.2fms\n",
               aborted ? "true " : "false", Timing::now() - start);
    });

    // Create timer with ID 4
    auto id = q.add(2000, [start = Timing::now()](bool aborted) mutable {
        printf("ID 4: aborted=%s, Elapsed %4.2fms\n",
               aborted ? "true " : "false", Timing::now() - start);
    });

    // Cancel timer with ID 4
    auto ret = q.cancel(id);
    assert(ret == 1);

    // Give just enough time to execute timer with ID 3 before destroying the
    // TimerQueue
    Timing::sleep(1500);

    // At this point, when destroying TimerQueue, the timer with ID 4 is still
    // pending and will be cancelled implicitly by the destructor
    return 0;
}

The output will be this (with varying “Elapsed” values)…

ID 1: aborted=true , Elapsed 0.01ms
ID 2: aborted=true , Elapsed 0.03ms
ID 4: aborted=true , Elapsed 0.03ms
ID 3: aborted=false, Elapsed 1000.92ms

Things to improve

The main objective of this implementation was to keep it small, self-contained and portable. Therefore, a few things can be improved, particularly if performance is a problem:

  • Use some kind of lockless queue
  • Avoid the linear search when cancelling. In the worst case, cancelling a timer requires iterating through all the timers
  • Add checks in TimerQueue::add to detect if we are shutting down
  • Try to get rid of unnecessary wake ups.
    • For example, for simplicity, adding or cancelling a timer makes no effort to detect if it’s really necessary to wake up the timer thread.
  • Using a uint64_t to identify timers works fine, but it’s error prone.
  • A fresh pair of eyes looking at the code, since this was a quick implementation with some basic tests.
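As an example of where the cancelling cost could go, here is one possible container layout. This is a sketch of an alternative, not what TimerQueue above does: timers live in a std::multimap ordered by expiry time, and a second map from timer ID to iterator lets cancel find its timer without scanning. The `IndexedTimers` class and all its members are hypothetical, and locking is left out to keep the idea visible.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <unordered_map>
#include <utility>

// Hypothetical container. Single-threaded on purpose: a real version would
// guard it with the TimerQueue mutex.
class IndexedTimers {
public:
    using Clock = std::chrono::steady_clock;
    using Handler = std::function<void(bool)>;

    std::uint64_t add(Clock::time_point end, Handler handler) {
        const std::uint64_t id = ++m_idcounter;
        auto pos = m_byTime.emplace(end, Entry{id, std::move(handler)});
        // multimap iterators stay valid until their own element is erased
        m_byId.emplace(id, pos);
        return id;
    }

    // Removes the timer and hands its handler back to the caller.
    // Returns an empty handler if the ID is unknown (or already gone).
    Handler cancel(std::uint64_t id) {
        auto found = m_byId.find(id);
        if (found == m_byId.end())
            return Handler();
        Handler handler = std::move(found->second->second.handler);
        m_byTime.erase(found->second);
        m_byId.erase(found);
        return handler;
    }

    // ID of the timer that expires first, or 0 if there are no timers
    std::uint64_t earliestId() const {
        return m_byTime.empty() ? 0 : m_byTime.begin()->second.id;
    }

    std::size_t size() const { return m_byTime.size(); }

private:
    struct Entry {
        std::uint64_t id;
        Handler handler;
    };
    using ByTime = std::multimap<Clock::time_point, Entry>;

    ByTime m_byTime;
    std::unordered_map<std::uint64_t, ByTime::iterator> m_byId;
    std::uint64_t m_idcounter = 0;
};
```

The trade-off is more memory and more allocations per timer than the vector-backed heap. Whether that is a win depends on how often timers get cancelled, so profile before switching.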

Conclusion

Although TimerQueue is useful as-is, if you are familiar with Boost Asio’s deadline_timer, you will spot a couple of differences:

  • TimerQueue is not in any way associated with something akin to Boost Asio’s io_service
    • Handlers are executed in the TimerQueue’s own thread.
  • Timers are one-shot
    • Boost Asio deadline_timer is reusable.

A close match to Boost Asio’s deadline_timer can be implemented on top of TimerQueue by having a Timer class that aggregates handlers, and puts a one-off timer on TimerQueue whenever we set an expiry time. That one-off timer will run any handlers the Timer instance owns. Even more, instead of executing the handlers in the TimerQueue thread, Timer can forward execution of the handlers to another object, thus giving the application control over when and where to execute the handlers.
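A rough sketch of such a Timer class follows. Everything in it is hypothetical (the names `Timer`, `expiresAfter`, `asyncWait`, and the `FakeQueue` stand-in), and it makes some simplifying assumptions: a Timer instance is only used from one thread, the expiry time has to be set before registering handlers, and a handler registered after expiry is rejected instead of being run. It is written as a template so it works with anything exposing the add and cancel methods TimerQueue has.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical sketch. QueueT needs TimerQueue's add(ms, handler) and cancel(id)
template <class QueueT>
class Timer {
public:
    using Handler = std::function<void(bool)>;

    explicit Timer(QueueT& queue) : m_queue(queue) {}
    ~Timer() { cancel(); }

    // Sets a new expiry time. Handlers still waiting on the previous expiry
    // get aborted, since their one-off timer is cancelled.
    void expiresAfter(int64_t milliseconds) {
        cancel();
        auto batch = std::make_shared<Batch>();
        m_batch = batch;
        // The one-off timer owns a reference to the batch, so it stays valid
        // even if this Timer is destroyed first.
        m_id = m_queue.add(milliseconds, [batch](bool aborted) {
            std::vector<Handler> handlers;
            {
                std::lock_guard<std::mutex> lk(batch->mtx);
                batch->done = true;
                handlers.swap(batch->handlers);
            }
            for (auto& handler : handlers)
                handler(aborted);
        });
    }

    // Adds a handler to the current expiry.
    // Returns false if no expiry is set, or if it already fired.
    bool asyncWait(Handler handler) {
        if (!m_batch)
            return false;
        std::lock_guard<std::mutex> lk(m_batch->mtx);
        if (m_batch->done)
            return false;
        m_batch->handlers.push_back(std::move(handler));
        return true;
    }

    void cancel() {
        if (m_id)
            m_queue.cancel(m_id);
        m_id = 0;
        m_batch.reset();
    }

private:
    struct Batch {
        std::mutex mtx;
        bool done = false;
        std::vector<Handler> handlers;
    };
    QueueT& m_queue;
    std::shared_ptr<Batch> m_batch;
    uint64_t m_id = 0;
};

// Thread-less stand-in for TimerQueue, only here to exercise the sketch
struct FakeQueue {
    std::vector<std::pair<uint64_t, std::function<void(bool)>>> items;
    uint64_t next = 0;
    uint64_t add(int64_t, std::function<void(bool)> handler) {
        items.emplace_back(++next, std::move(handler));
        return next;
    }
    size_t cancel(uint64_t id) {
        for (size_t i = 0; i < items.size(); ++i) {
            if (items[i].first != id) continue;
            auto handler = std::move(items[i].second);
            items.erase(items.begin() + i);
            handler(true);
            return 1;
        }
        return 0;
    }
    void fireAll() {
        auto pending = std::move(items);
        items.clear();
        for (auto& p : pending) p.second(false);
    }
};

// Returns the "aborted" flags the three handlers received, in order
std::vector<bool> demoTimer() {
    std::vector<bool> seen;
    FakeQueue queue;
    Timer<FakeQueue> timer(queue);
    timer.expiresAfter(1000);
    timer.asyncWait([&seen](bool aborted) { seen.push_back(aborted); });
    timer.asyncWait([&seen](bool aborted) { seen.push_back(aborted); });
    timer.expiresAfter(50);  // Reset: the two handlers above are aborted
    timer.asyncWait([&seen](bool aborted) { seen.push_back(aborted); });
    queue.fireAll();         // The new one-off timer expires normally
    return seen;
}
```

Each call to expiresAfter creates a fresh batch of handlers, which is what makes the timer reusable while the underlying TimerQueue timers remain one-shot.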

While writing this, I took a quick look at how Boost Asio implements deadline timers (on Windows), to see if it was significantly different. In particular, I was curious about how deadline_timer is tied to io_service: whether there is another thread involved for triggering timer related things, or whether everything is somehow controlled by Windows Completion Ports.
From my short study of the beast’s guts, it seems it uses a somewhat similar method:

  • There is a thread for timer related things (struct win_iocp_io_service::timer_thread_function )
    • This thread just loops until shutdown, waiting on a Win32 waitable timer object for something to happen.
    • When this waitable timer expires, it uses PostQueuedCompletionStatus on the associated Completion Port, to trigger any handler execution in the appropriate threads (where the user is calling io_service::run)
  • Changes to deadline_timer instances set that Win32 waitable timer

I said somewhat similar, since my implementation executes handlers in the timer queue thread itself when a timer expires. In Boost Asio, when a timer expires, the timer thread sends a signal to the respective io_service, and the handlers are executed there, as explained above.
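If you want something closer to that behaviour with TimerQueue, one option is to wrap each handler so that the timer thread only posts it somewhere else. Below is a minimal sketch of the idea. The `HandlerExecutor` class and the `forwardTo` helper are hypothetical names of mine. This is not how Boost Asio does it internally, just the same division of labour in miniature.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for "somewhere else to run handlers"
class HandlerExecutor {
public:
    // Can be called from any thread (e.g. the timer queue thread)
    void post(std::function<void()> task) {
        std::lock_guard<std::mutex> lk(m_mtx);
        m_tasks.push_back(std::move(task));
    }

    // Called by the thread that should run the handlers.
    // Returns how many tasks were executed.
    std::size_t runPending() {
        std::vector<std::function<void()>> tasks;
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            tasks.swap(m_tasks);
        }
        // Run outside the lock, so tasks are free to call post()
        for (auto& task : tasks)
            task();
        return tasks.size();
    }

private:
    std::mutex m_mtx;
    std::vector<std::function<void()>> m_tasks;
};

// Wraps a timer handler so that, when the timer thread calls it, the real
// handler is only queued on the executor instead of being executed.
// The executor must outlive the returned wrapper.
std::function<void(bool)> forwardTo(HandlerExecutor& executor,
                                    std::function<void(bool)> handler) {
    return [&executor, handler](bool aborted) {
        executor.post([handler, aborted] { handler(aborted); });
    };
}
```

With this, something like `q.add(1000, forwardTo(executor, myHandler))` would keep the timer thread free of user code, and the application decides when to call runPending.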

Feel free to comment with corrections, suggestions, or ask me to cover other areas.

License

The source code in this article is licensed under the CC0 license, so feel free to copy, modify, share, do whatever you want with it.
No attribution is required, but I’ll be happy if you do.

Updates

  • 2016-08-25 : Fixed a bug in cancel (Thanks to Daniel’s comment)
    • According to the standard, moving an std::function does not guarantee it will empty it. Visual Studio’s implementation empties it, and so I missed that bug.
  • 2016-09-01 : Fixed a bug in cancelAll (Thanks to Patrick’s comment)
    • Because calls to cancel don’t remove the original item, but set the handler to nullptr so that the worker thread then ignores those, cancelAll was then ignoring those items and not changing the expiry time (for immediate execution). This meant destroying a TimerQueue would assert since the container would not be empty by the time the worker thread finished.
    • The behaviour was still correct since the items left in the container at the time of destruction were already cancelled items (no handler to execute).

10 Comments on "Portable C++ Timer Queue"

Daniel

I tried out your timer queue implementation on OS-X and think I found a bug:

the cancelled timers will fire again later, because it seems the original handler is not set to nullptr automatically with

newItem.handler = std::move(item.handler);

If I set item.handler to nullptr, everything works fine for me. Btw, what is the exact license of your code above ?

Patrick

I think by solving this issue in the cancel, another bug was introduced. Since the handler of that item is a nullptr, its end won’t be reset by cancelAll, and the queue won’t be empty (leading to the assert at the end of run being false)

Pawel

Rui, I was looking for a modern implementation of something closest to a timer wheel. It seems to do the job for me, but I also wanted to see a periodic option, so I created one. Here is the patch.

--- TimerQueue.h.org 2017-01-10 21:12:38.359332810 -0800
+++ TimerQueue.h 2017-01-10 21:21:42.066201334 -0800
@@ -24,7 +24,7 @@
     ~TimerQueue() {
         cancelAll();
         // Abusing the timer queue to trigger the shutdown.
-        add(0, [this](bool) { m_finish = true; });
+        add(0, false,[this](bool) { m_finish = true; });
         m_th.join();
     }
@@ -32,7 +32,7 @@
     // return
     // Returns the ID of the new timer. You can use this…