{"id":407,"date":"2016-03-13T03:22:36","date_gmt":"2016-03-13T03:22:36","guid":{"rendered":"http:\/\/www.crazygaze.com\/blog\/?p=407"},"modified":"2020-02-26T23:29:13","modified_gmt":"2020-02-26T23:29:13","slug":"simple-way-to-shutdown-multiple-consumer-threads","status":"publish","type":"post","link":"https:\/\/www.crazygaze.com\/blog\/2016\/03\/13\/simple-way-to-shutdown-multiple-consumer-threads\/","title":{"rendered":"Simple way to shutdown multiple consumer threads"},"content":{"rendered":"\n<p>In my earlier post, I said I would be dissecting strands. Since I&#8217;m using strands with something like Boost Asio (mentioned <a href=\"http:\/\/www.crazygaze.com\/blog\/2016\/03\/04\/boost-asio-thread-safety-and-reinventing-the-wheel\/\">here<\/a>), it&#8217;s not feasible to share all those dependencies just to explain strands.<\/p>\n\n\n\n<p>So, as I though of a minimalistic <em>multiple producer \/ multiple consumer queue<\/em> that could give me a similar interface and behaviour to Boost Asio&#8217;s <em>io_service<\/em>, an apparently simple problem got me stuck for a while&#8230;<\/p>\n\n\n\n<p>Given a multiple consumer queue (e.g <em>WorkQueue<\/em>) that has a <em>run<\/em> method that blocks executing work items, and a <em>stop<\/em> method that signals all consumers to stop, how do I implement <em>stop<\/em> in a very simple way, so that all currently executing calls to <em>run<\/em> exit?<\/p>\n\n\n\n<p>By very simple way I mean without requiring too much thinking to verify correctness.<\/p>\n\n\n\n<p>First solution that crossed my mind was having the consumers waiting on two things. Waiting for work items OR a signal to finish. But as far as I&#8217;m aware, C++ doesn&#8217;t allow waiting on multiple condition variables (at the time of writing, at least).<\/p>\n\n\n\n<p>The simplest way I came up with was to use the work queue itself to signal the consumers to finish.<\/p>\n\n\n\n<figure class=\"wp-block-image size-large\"><img loading=\"lazy\" width=\"559\" height=\"282\" src=\"https:\/\/www.crazygaze.com\/blog\/wp-content\/uploads\/2016\/03\/img_56e4d05d363c1.png\" alt=\"\" class=\"wp-image-409\" srcset=\"https:\/\/www.crazygaze.com\/blog\/wp-content\/uploads\/2016\/03\/img_56e4d05d363c1.png 559w, https:\/\/www.crazygaze.com\/blog\/wp-content\/uploads\/2016\/03\/img_56e4d05d363c1-300x151.png 300w, https:\/\/www.crazygaze.com\/blog\/wp-content\/uploads\/2016\/03\/img_56e4d05d363c1-100x50.png 100w, https:\/\/www.crazygaze.com\/blog\/wp-content\/uploads\/2016\/03\/img_56e4d05d363c1-150x76.png 150w, https:\/\/www.crazygaze.com\/blog\/wp-content\/uploads\/2016\/03\/img_56e4d05d363c1-200x101.png 200w, https:\/\/www.crazygaze.com\/blog\/wp-content\/uploads\/2016\/03\/img_56e4d05d363c1-450x227.png 450w\" sizes=\"(max-width: 559px) 100vw, 559px\" \/><\/figure>\n\n\n\n<ul><li> A call to <em>stop<\/em> enqueues a <em>finish<\/em> work item, which triggers the shutdown<\/li><li> When a consumer dequeues a <em>finish<\/em>, it enqueues another <em>finish<\/em> before shutting down<\/li><li> The <em>finish<\/em> work item enqueued by the last consumer shutting down, or extra calls to <em>stop<\/em> has no side effects <\/li><\/ul>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: cpp; title: ; notranslate\" title=\"\">\n\/\/ Really simple Multiple producer \/ Multiple consumer work queue\nclass WorkQueue {\npublic:\n    \/\/ Add a new work item\n    template &lt;typename F&gt;\n    void push(F w) {\n        std::lock_guard&lt;std::mutex&gt; lock(m_mtx);\n        m_q.push(std::move(w));\n        m_cond.notify_all();\n    }\n \n    \/\/ Continuously waits for and executes any work items, until &quot;stop&quot; is\n    \/\/ called\n    void run() {\n        Callstack&lt;WorkQueue&gt;::Context ctx(this);\n        while (true) {\n            std::function&lt;void()&gt; w;\n            {\n                std::unique_lock&lt;std::mutex&gt; lock(m_mtx);\n                m_cond.wait(lock, &#91;this] { return !m_q.empty(); });\n                w = std::move(m_q.front());\n                m_q.pop();\n            }\n \n            if (w) {\n                w();\n            } else {\n                \/\/ An empty work item means we are shutting down, so enqueue\n                \/\/ another empty work item. This will in turn shut down another\n                \/\/ thread that is executing &quot;run&quot;\n                push(nullptr);\n                return;\n            }\n        };\n    }\n \n    \/\/ Causes any calls to &quot;run&quot; to exit.\n    void stop() {\n        push(nullptr);\n    }\n \n    \/\/ Tells if &quot;run&quot; is executing in the current thread\n    bool canDispatch() {\n        return Callstack&lt;WorkQueue&gt;::contains(this) != nullptr;\n    }\n \nprivate:\n    std::condition_variable m_cond;\n    std::mutex m_mtx;\n    std::queue&lt;std::function&lt;void()&gt;&gt; m_q;\n};\n<\/pre><\/div>\n\n\n<p>Note that <em>WorkQueue<\/em> makes use of the <em>CallStack<\/em> class I introduced in the <a href=\"http:\/\/www.crazygaze.com\/blog\/2016\/03\/11\/callstack-markers-boostasiodetailcall_stack\/\">previous post<\/a> .<\/p>\n\n\n\n<p>And some sample code testing it&#8230;<\/p>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: cpp; title: ; notranslate\" title=\"\">\n#include &quot;WorkQueue.h&quot;\n \nint main() {\n    WorkQueue work;\n \n    \/\/ Start a couple of consumer threads\n    std::vector&lt;std::thread&gt; ths;\n    for (int i = 0; i &lt; 4; i++) {\n        ths.push_back(std::thread(&#91;&amp;work] { work.run(); }));\n    }\n \n    std::vector&lt;int&gt; res;\n    res.resize(10000);\n \n    \/\/ Enqueue work.\n    for (int i = 0; i &lt; static_cast&lt;int&gt;(res.size()); i++) {\n        \/\/ These work items simply increment the element at index i.\n        work.push(&#91;i, &amp;res] { res&#91;i]++; });\n    }\n \n    \/\/ Stop consumers, and wait for the threads to finish\n    work.stop();\n    for (auto&amp;&amp; t : ths) t.join();\n \n    \/\/ Test if all work items were executed\n    for (int i = 0; i &lt; static_cast&lt;int&gt;(res.size()); i++) {\n        if (res&#91;i] != 1)\n            printf(&quot;ERROR: Index %d set to %dn&quot;, i, res&#91;i]);\n    }\n}\n<\/pre><\/div>\n\n\n<p>Of course for such lightweight work items such as these, this will have terrible performance. But for sufficiently complex work items, I don&#8217;t see any obvious problems with this approach.<\/p>\n\n\n\n<h3>License<\/h3>\n\n\n\n<p>The source code in this article is licensed under the <a href=\"https:\/\/creativecommons.org\/publicdomain\/zero\/1.0\/\">CC0 license<\/a>, so feel free to copy, modify, share, do whatever you want with it.<br>No attribution is required, but I&#8217;ll be happy if you do.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In my earlier post, I said I would be dissecting strands. Since I&#8217;m using strands with something like Boost Asio (mentioned here), it&#8217;s not feasible to share all those dependencies just to explain strands. So, as I though of a minimalistic multiple producer \/ multiple consumer queue that could give me a similar interface and [&hellip;]<\/p>\n","protected":false},"author":3,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_mi_skip_tracking":false,"spay_email":"","jetpack_publicize_message":"","jetpack_is_tweetstorm":false,"jetpack_publicize_feature_enabled":true},"categories":[50],"tags":[14,12,52,10],"jetpack_featured_media_url":"","jetpack_publicize_connections":[],"jetpack_sharing_enabled":true,"jetpack_shortlink":"https:\/\/wp.me\/p7jpe0-6z","_links":{"self":[{"href":"https:\/\/www.crazygaze.com\/blog\/wp-json\/wp\/v2\/posts\/407"}],"collection":[{"href":"https:\/\/www.crazygaze.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.crazygaze.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.crazygaze.com\/blog\/wp-json\/wp\/v2\/users\/3"}],"replies":[{"embeddable":true,"href":"https:\/\/www.crazygaze.com\/blog\/wp-json\/wp\/v2\/comments?post=407"}],"version-history":[{"count":1,"href":"https:\/\/www.crazygaze.com\/blog\/wp-json\/wp\/v2\/posts\/407\/revisions"}],"predecessor-version":[{"id":849,"href":"https:\/\/www.crazygaze.com\/blog\/wp-json\/wp\/v2\/posts\/407\/revisions\/849"}],"wp:attachment":[{"href":"https:\/\/www.crazygaze.com\/blog\/wp-json\/wp\/v2\/media?parent=407"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.crazygaze.com\/blog\/wp-json\/wp\/v2\/categories?post=407"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.crazygaze.com\/blog\/wp-json\/wp\/v2\/tags?post=407"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}