Simple way to shut down multiple consumer threads

In my earlier post, I said I would be dissecting strands. Since I’m using strands with something like Boost Asio (mentioned here), it’s not feasible to share all those dependencies just to explain strands.

So, as I was thinking of a minimalistic multiple producer / multiple consumer queue that could give me an interface and behaviour similar to Boost Asio’s io_service, an apparently simple problem got me stuck for a while…

Given a multiple consumer queue (e.g. WorkQueue) that has a run method that blocks while executing work items, and a stop method that signals all consumers to stop, how do I implement stop in a very simple way, so that all currently executing calls to run exit?

By “very simple” I mean something that doesn’t require too much thinking to verify its correctness.

The first solution that crossed my mind was to have the consumers wait on two things: work items OR a signal to finish. But as far as I’m aware, C++ doesn’t allow waiting on multiple condition variables (at the time of writing, at least).

The simplest way I came up with was to use the work queue itself to signal the consumers to finish.

  • A call to stop enqueues a finish work item, which triggers the shutdown
  • When a consumer dequeues a finish, it enqueues another finish before shutting down
  • The finish work item enqueued by the last consumer to shut down, or by any extra calls to stop, has no side effects
#pragma once

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
// Callstack<T> is the helper class from the previous post; include it from
// wherever it lives in your project.

// Really simple Multiple producer / Multiple consumer work queue
class WorkQueue {
public:
    // Add a new work item
    template <typename F>
    void push(F w) {
        std::lock_guard<std::mutex> lock(m_mtx);
        m_q.push(std::move(w));
        m_cond.notify_all();
    }
 
    // Continuously waits for and executes any work items, until "stop" is
    // called
    void run() {
        Callstack<WorkQueue>::Context ctx(this);
        while (true) {
            std::function<void()> w;
            {
                std::unique_lock<std::mutex> lock(m_mtx);
                m_cond.wait(lock, [this] { return !m_q.empty(); });
                w = std::move(m_q.front());
                m_q.pop();
            }
 
            if (w) {
                w();
            } else {
                // An empty work item means we are shutting down, so enqueue
                // another empty work item. This will in turn shut down another
                // thread that is executing "run"
                push(nullptr);
                return;
            }
        }
    }
 
    // Causes any calls to "run" to exit.
    void stop() {
        push(nullptr);
    }
 
    // Tells if "run" is executing in the current thread
    bool canDispatch() {
        return Callstack<WorkQueue>::contains(this) != nullptr;
    }
 
private:
    std::condition_variable m_cond;
    std::mutex m_mtx;
    std::queue<std::function<void()>> m_q;
};

Note that WorkQueue makes use of the Callstack class I introduced in the previous post.
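
Since canDispatch reports whether the current thread is inside run, it can be used to build a dispatch-style helper in the spirit of Asio’s io_service::dispatch: execute the handler inline if we are already on a consumer thread, otherwise enqueue it. Here’s a minimal sketch, assuming the WorkQueue above; the free function dispatch is my own name for it and not part of the original class:

#include "WorkQueue.h"
 
#include <utility>
 
// Runs "handler" immediately if this thread is already executing
// WorkQueue::run, otherwise enqueues it for a consumer thread to pick up.
template <typename F>
void dispatch(WorkQueue& queue, F handler) {
    if (queue.canDispatch())
        handler();
    else
        queue.push(std::move(handler));
}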

And some sample code testing it…

#include "WorkQueue.h"
 
int main() {
    WorkQueue work;
 
    // Start a couple of consumer threads
    std::vector<std::thread> ths;
    for (int i = 0; i < 4; i++) {
        ths.push_back(std::thread([&work] { work.run(); }));
    }
 
    std::vector<int> res;
    res.resize(10000);
 
    // Enqueue work.
    for (int i = 0; i < static_cast<int>(res.size()); i++) {
        // These work items simply increment the element at index i.
        work.push([i, &res] { res[i]++; });
    }
 
    // Stop consumers, and wait for the threads to finish
    work.stop();
    for (auto&& t : ths) t.join();
 
    // Test if all work items were executed
    for (int i = 0; i < static_cast<int>(res.size()); i++) {
        if (res[i] != 1)
            printf("ERROR: Index %d set to %dn", i, res[i]);
    }
}

Of course, for work items as lightweight as these, this will have terrible performance. But for sufficiently complex work items, I don’t see any obvious problems with this approach.

License

The source code in this article is licensed under the CC0 license, so feel free to copy, modify, share, do whatever you want with it.
No attribution is required, but I’ll be happy if you do.
