Hermes  0.9.5-beta
Hierarchical Distributed I/O Buffering System
thread_pool.h
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
2  * Distributed under BSD 3-Clause license. *
3  * Copyright by The HDF Group. *
4  * Copyright by the Illinois Institute of Technology. *
5  * All rights reserved. *
6  * *
7  * This file is part of Hermes. The full Hermes copyright notice, including *
8  * terms governing use, modification, and redistribution, is contained in *
9  * the COPYING file, which can be found at the top directory. If you do not *
10  * have access to the file, you may request a copy from help@hdfgroup.org. *
11  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
12 
13 #ifndef HERMES_THREAD_POOL_H_
14 #define HERMES_THREAD_POOL_H_
15 
16 #include <condition_variable>
17 #include <future>
18 #include <mutex>
19 #include <queue>
20 #include <thread>
21 
22 namespace hermes {
26 class ThreadPool {
27  public:
29  explicit ThreadPool(
30  unsigned num_threads = std::thread::hardware_concurrency()) {
31  while (num_threads--) {
32  threads.emplace_back([this] {
33  while (true) {
34  std::unique_lock<std::mutex> lock(mutex);
35  condvar.wait(lock, [this]() {
36  return !queue_high.empty() || !queue_low.empty();
37  });
38  bool high_priority = !queue_high.empty();
39  auto task = high_priority ? std::move(queue_high.front())
40  : std::move(queue_low.front());
41  if (task.valid()) {
42  if (high_priority) {
43  queue_high.pop();
44  } else {
45  queue_low.pop();
46  }
47  lock.unlock();
48  // run the task - this cannot throw; any exception
49  // will be stored in the corresponding future
50  task();
51  } else {
52  // an empty task is used to signal end of stream
53  // don't pop it off the top; all threads need to see it
54  break;
55  }
56  }
57  });
58  }
59  }
60 
62  template <typename F, typename R = std::result_of_t<F && ()>>
63  std::future<R> run(F&& f, bool high_priority = false) const {
64  auto task = std::packaged_task<R()>(std::forward<F>(f));
65  auto future = task.get_future();
66  {
67  std::lock_guard<std::mutex> lock(mutex);
68  // conversion to packaged_task<void()> erases the return type
69  // so it can be stored in the queue. the future will still
70  // contain the correct type
71  if (high_priority) {
72  queue_high.push(std::packaged_task<void()>(std::move(task)));
73  } else {
74  queue_low.push(std::packaged_task<void()>(std::move(task)));
75  }
76  }
77  condvar.notify_one();
78  return future;
79  }
80 
81  ~ThreadPool() {
82  // push a single empty task onto the queue and notify all threads,
83  // then wait for them to terminate
84  {
85  std::lock_guard<std::mutex> lock(mutex);
86  queue_low.push({});
87  }
88  condvar.notify_all();
89  for (auto& thread : threads) {
90  thread.join();
91  }
92  }
93 
94  private:
95  std::vector<std::thread> threads;
97  mutable std::queue<std::packaged_task<void()>> queue_low;
99  mutable std::queue<std::packaged_task<void()>> queue_high;
100  mutable std::mutex mutex;
101  mutable std::condition_variable condvar;
102 };
103 } // namespace hermes
104 #endif // HERMES_THREAD_POOL_H_
Definition: thread_pool.h:26
std::vector< std::thread > threads
Definition: thread_pool.h:95
ThreadPool(unsigned num_threads=std::thread::hardware_concurrency())
Definition: thread_pool.h:29
std::condition_variable condvar
Definition: thread_pool.h:101
std::mutex mutex
Definition: thread_pool.h:100
std::queue< std::packaged_task< void()> > queue_low
Definition: thread_pool.h:97
std::queue< std::packaged_task< void()> > queue_high
Definition: thread_pool.h:99
std::future< R > run(F &&f, bool high_priority=false) const
Definition: thread_pool.h:63
Definition: adapter_utils.cc:35