tlx
Loading...
Searching...
No Matches
thread_pool.hpp
Go to the documentation of this file.
1/*******************************************************************************
2 * tlx/thread_pool.hpp
3 *
4 * Part of tlx - http://panthema.net/tlx
5 *
6 * Copyright (C) 2015-2019 Timo Bingmann <tb@panthema.net>
7 *
8 * All rights reserved. Published under the Boost Software License, Version 1.0
9 ******************************************************************************/
10
11#ifndef TLX_THREAD_POOL_HEADER
12#define TLX_THREAD_POOL_HEADER
13
14#include <atomic>
15#include <cassert>
16#include <condition_variable>
17#include <deque>
18#include <mutex>
19#include <thread>
20
21#include <tlx/container/simple_vector.hpp>
22#include <tlx/delegate.hpp>
23
24namespace tlx {
25
26/*!
27 * ThreadPool starts a fixed number p of std::threads which process Jobs that
28 * are \ref enqueue "enqueued" into a concurrent job queue. The jobs
29 * themselves can enqueue more jobs that will be processed when a thread is
30 * ready.
31 *
32 * The ThreadPool can either run until
33 *
34 * 1. all jobs are done AND all threads are idle, when called with
35 * loop_until_empty(), or
36 *
37 * 2. terminate() is called, when run with loop_until_terminate().
38 *
39 * Jobs are plain tlx::Delegate<void()> objects, hence the pool user must pass
40 * in ALL CONTEXT himself. The best method to pass parameters to Jobs is to use
41 * lambda captures. Alternatively, old-school objects implementing operator(),
42 * or std::binds can be used.
43 *
44 * The ThreadPool uses a condition variable to wait for new jobs and does not
45 * remain busy waiting.
46 *
47 * Note that the threads in the pool start **before** the two loop functions are
48 * called. In case of loop_until_empty() the threads continue to be idle
49 * afterwards, and can be reused, until the ThreadPool is destroyed.
50
51\code
52ThreadPool pool(4); // pool with 4 threads
53
54int value = 0;
55pool.enqueue([&value]() {
56 // increment value in another thread.
57 ++value;
58});
59
60pool.loop_until_empty();
61\endcode
62
63 */
64class ThreadPool
65{
66public:
67 using Job = Delegate<void ()>;
68 using InitThread = Delegate<void (size_t)>;
69
70private:
71 //! Deque of scheduled jobs.
72 std::deque<Job> jobs_;
73
74 //! Mutex used to access the queue of scheduled jobs.
75 std::mutex mutex_;
76
77 //! threads in pool
78 simple_vector<std::thread> threads_;
79
80 //! Condition variable used to notify that a new job has been inserted in
81 //! the queue.
82 std::condition_variable cv_jobs_;
83 //! Condition variable to signal when a job finishes.
84 std::condition_variable cv_finished_;
85
86 //! Counter for number of threads busy.
87 std::atomic<size_t> busy_ = { 0 };
88 //! Counter for number of idle threads waiting for a job.
89 std::atomic<size_t> idle_ = { 0 };
90 //! Counter for total number of jobs executed
91 std::atomic<size_t> done_ = { 0 };
92
93 //! Flag whether to terminate
94 std::atomic<bool> terminate_ = { false };
95
96 //! Run once per worker thread
97 InitThread init_thread_;
98
99public:
100 //! Construct running thread pool of num_threads
101 ThreadPool(
102 size_t num_threads = std::thread::hardware_concurrency(),
103 InitThread&& init_thread = InitThread());
104
105 //! non-copyable: delete copy-constructor
106 ThreadPool(const ThreadPool&) = delete;
107 //! non-copyable: delete assignment operator
108 ThreadPool& operator=(const ThreadPool&) = delete;
109
110 //! Stop processing jobs, terminate threads.
111 ~ThreadPool();
112
113 //! enqueue a Job, the caller must pass in all context using captures.
114 void enqueue(Job&& job);
115
116 //! Loop until no more jobs are in the queue AND all threads are idle. When
117 //! this occurs, this method exits, however, the threads remain active.
118 void loop_until_empty();
119
120 //! Loop until terminate flag was set.
121 void loop_until_terminate();
122
123 //! Terminate thread pool gracefully, wait until currently running jobs
124 //! finish and then exit. This should be called from within one of the
125 //! enqueued jobs or from an outside thread.
126 void terminate();
127
128 //! Return number of jobs currently completed.
129 size_t done() const;
130
131 //! Return number of threads in pool
132 size_t size() const;
133
134 //! return number of idle threads in pool
135 size_t idle() const;
136
137 //! true if any thread is idle (= waiting for jobs)
138 bool has_idle() const;
139
140 //! Return thread handle to thread i
141 std::thread& thread(size_t i);
142
143private:
144 //! Worker function, one per thread is started.
145 void worker(size_t p);
146};
147
148} // namespace tlx
149
150#endif // !TLX_THREAD_POOL_HEADER
151
152/******************************************************************************/
Simpler non-growing vector without initialization.
ThreadPool starts a fixed number p of std::threads which process Jobs that are enqueued into a concur...
void loop_until_terminate()
Loop until terminate flag was set.
std::atomic< size_t > idle_
Counter for number of idle threads waiting for a job.
std::atomic< size_t > busy_
Counter for number of threads busy.
std::atomic< size_t > done_
Counter for total number of jobs executed.
std::thread & thread(size_t i)
Return thread handle to thread i.
size_t size() const
Return number of threads in pool.
std::mutex mutex_
Mutex used to access the queue of scheduled jobs.
void worker(size_t p)
Worker function, one per thread is started.
simple_vector< std::thread > threads_
threads in pool
Delegate< void(size_t)> InitThread
std::atomic< bool > terminate_
Flag whether to terminate.
std::condition_variable cv_jobs_
Condition variable used to notify that a new job has been inserted in the queue.
void terminate()
Terminate thread pool gracefully, wait until currently running jobs finish and then exit.
bool has_idle() const
true if any thread is idle (= waiting for jobs)
size_t done() const
Return number of jobs currently completed.
void loop_until_empty()
Loop until no more jobs are in the queue AND all threads are idle.
ThreadPool & operator=(const ThreadPool &)=delete
non-copyable: delete assignment operator
std::deque< Job > jobs_
Deque of scheduled jobs.
InitThread init_thread_
Run once per worker thread.
void enqueue(Job &&job)
enqueue a Job, the caller must pass in all context using captures.
~ThreadPool()
Stop processing jobs, terminate threads.
ThreadPool(const ThreadPool &)=delete
non-copyable: delete copy-constructor
std::condition_variable cv_finished_
Condition variable to signal when a jobs finishes.
size_t idle() const
return number of idle threads in pool