Libosmium  2.16.0
Fast and flexible C++ library for working with OpenStreetMap data
queue.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_THREAD_QUEUE_HPP
2 #define OSMIUM_THREAD_QUEUE_HPP
3 
4 /*
5 
6 This file is part of Osmium (https://osmcode.org/libosmium).
7 
8 Copyright 2013-2021 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
36 #include <chrono>
37 #include <condition_variable>
38 #include <cstddef>
39 #include <mutex>
40 #include <queue>
41 #include <string>
42 #include <utility> // IWYU pragma: keep
43 
44 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
45 # include <atomic>
46 # include <iostream>
47 #endif
48 
49 namespace osmium {
50 
51  namespace thread {
52 
56  template <typename T>
57  class Queue {
58 
61  const std::size_t m_max_size;
62 
64  const std::string m_name;
65 
66  mutable std::mutex m_mutex;
67 
68  std::queue<T> m_queue;
69 
71  std::condition_variable m_data_available;
72 
74  std::condition_variable m_space_available;
75 
76 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
78  std::size_t m_largest_size;
79 
81  std::atomic<int> m_push_counter;
82 
85  std::atomic<int> m_full_counter;
86 
91  std::atomic<int> m_pop_counter;
92 
95  std::atomic<int> m_empty_counter;
96 #endif
97 
98  public:
99 
107  explicit Queue(std::size_t max_size = 0, std::string name = "") :
108  m_max_size(max_size),
109  m_name(std::move(name)),
110  m_queue()
111 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
112  ,
113  m_largest_size(0),
114  m_push_counter(0),
115  m_full_counter(0),
116  m_pop_counter(0),
117  m_empty_counter(0)
118 #endif
119  {
120  }
121 
122  Queue(const Queue&) = delete;
123  Queue& operator=(const Queue&) = delete;
124 
125  Queue(Queue&&) = delete;
126  Queue& operator=(Queue&&) = delete;
127 
128 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
129  ~Queue() {
130  std::cerr << "queue '" << m_name
131  << "' with max_size=" << m_max_size
132  << " had largest size " << m_largest_size
133  << " and was full " << m_full_counter
134  << " times in " << m_push_counter
135  << " push() calls and was empty " << m_empty_counter
136  << " times in " << m_pop_counter
137  << " pop() calls\n";
138  }
139 #else
140  ~Queue() = default;
141 #endif
142 
147  void push(T value) {
148  constexpr const std::chrono::milliseconds max_wait{10};
149 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
150  ++m_push_counter;
151 #endif
152  if (m_max_size) {
153  while (size() >= m_max_size) {
154  std::unique_lock<std::mutex> lock{m_mutex};
155  m_space_available.wait_for(lock, max_wait, [this] {
156  return m_queue.size() < m_max_size;
157  });
158 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
159  ++m_full_counter;
160 #endif
161  }
162  }
163  std::lock_guard<std::mutex> lock{m_mutex};
164  m_queue.push(std::move(value));
165 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
166  if (m_largest_size < m_queue.size()) {
167  m_largest_size = m_queue.size();
168  }
169 #endif
170  m_data_available.notify_one();
171  }
172 
173  void wait_and_pop(T& value) {
174 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
175  ++m_pop_counter;
176 #endif
177  std::unique_lock<std::mutex> lock{m_mutex};
178 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
179  if (m_queue.empty()) {
180  ++m_empty_counter;
181  }
182 #endif
183  m_data_available.wait(lock, [this] {
184  return !m_queue.empty();
185  });
186  if (!m_queue.empty()) {
187  value = std::move(m_queue.front());
188  m_queue.pop();
189  lock.unlock();
190  if (m_max_size) {
191  m_space_available.notify_one();
192  }
193  }
194  }
195 
196  bool try_pop(T& value) {
197 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
198  ++m_pop_counter;
199 #endif
200  {
201  std::lock_guard<std::mutex> lock{m_mutex};
202  if (m_queue.empty()) {
203 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
204  ++m_empty_counter;
205 #endif
206  return false;
207  }
208  value = std::move(m_queue.front());
209  m_queue.pop();
210  }
211  if (m_max_size) {
212  m_space_available.notify_one();
213  }
214  return true;
215  }
216 
217  bool empty() const {
218  std::lock_guard<std::mutex> lock{m_mutex};
219  return m_queue.empty();
220  }
221 
222  std::size_t size() const {
223  std::lock_guard<std::mutex> lock{m_mutex};
224  return m_queue.size();
225  }
226 
227  }; // class Queue
228 
229  } // namespace thread
230 
231 } // namespace osmium
232 
233 #endif // OSMIUM_THREAD_QUEUE_HPP
Definition: queue.hpp:57
bool try_pop(T &value)
Definition: queue.hpp:196
std::mutex m_mutex
Definition: queue.hpp:66
Queue & operator=(const Queue &)=delete
bool empty() const
Definition: queue.hpp:217
void push(T value)
Definition: queue.hpp:147
void wait_and_pop(T &value)
Definition: queue.hpp:173
Queue(const Queue &)=delete
std::condition_variable m_space_available
Used to signal producers when queue is not full.
Definition: queue.hpp:74
Queue(std::size_t max_size=0, std::string name="")
Definition: queue.hpp:107
std::size_t size() const
Definition: queue.hpp:222
std::queue< T > m_queue
Definition: queue.hpp:68
Queue & operator=(Queue &&)=delete
Queue(Queue &&)=delete
const std::size_t m_max_size
Definition: queue.hpp:61
std::condition_variable m_data_available
Used to signal consumers when data is available in the queue.
Definition: queue.hpp:71
const std::string m_name
Name of this queue (for debugging only).
Definition: queue.hpp:64
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:551