A library for working with phylogenetic and population genetic data.
v0.32.0
threadsafe_queue.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_THREADING_THREADSAFE_QUEUE_H_
2 #define GENESIS_UTILS_THREADING_THREADSAFE_QUEUE_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2024 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lczech@carnegiescience.edu>
23  Department of Plant Biology, Carnegie Institution For Science
24  260 Panama Street, Stanford, CA 94305, USA
25 */
26 
27 /*
28  The code below is adapted from https://github.com/anthonywilliams/ccia_code_samples
29  and was published under the Boost Software License:
30 
31  Boost Software License - Version 1.0 - August 17th, 2003
32 
33  Permission is hereby granted, free of charge, to any person or organization
34  obtaining a copy of the software and accompanying documentation covered by
35  this license (the "Software") to use, reproduce, display, distribute,
36  execute, and transmit the Software, and to prepare derivative works of the
37  Software, and to permit third-parties to whom the Software is furnished to
38  do so, all subject to the following:
39 
40  The copyright notices in the Software and this entire statement, including
41  the above license grant, this restriction and the following disclaimer,
42  must be included in all copies of the Software, in whole or in part, and
43  all derivative works of the Software, unless such copies or derivative
44  works are solely in the form of machine-executable object code generated by
45  a source language processor.
46 
47  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
48  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
49  FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
50  SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
51  FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
52  ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
53  DEALINGS IN THE SOFTWARE.
54  */
55 
63 #include <atomic>
64 #include <condition_variable>
65 #include <future>
66 #include <memory>
67 #include <mutex>
68 #include <queue>
69 
70 namespace genesis {
71 namespace utils {
72 
73 // =================================================================================================
74 // Threadsafe Queue
75 // =================================================================================================
76 
88 template <typename T>
90 {
91 public:
92 
93  // -------------------------------------------------------------------------
94  // Constructor and Rule of Five
95  // -------------------------------------------------------------------------
96 
97  ThreadsafeQueue() = default;
98  ~ThreadsafeQueue() = default;
99 
101  {
102  std::lock_guard<std::mutex> lk( other.mutex_ );
103  data_queue_ = other.data_queue_;
104  }
105 
107  {
108  std::lock_guard<std::mutex> lk(other.mutex_);
109  data_queue_ = std::move( other.data_queue_ );
110  }
111 
113  {
114  if( &other == this ) {
115  return *this;
116  }
117 
118  // Lock both mutexes without deadlock, C++11.
119  // In C++17, the next three lines could simply be this one instead:
120  // std::scoped_lock lk( mutex_, other.mutex_ );
121  std::lock(mutex_, other.mutex_);
122  std::lock_guard<std::mutex> lhs_lk(mutex_, std::adopt_lock);
123  std::lock_guard<std::mutex> rhs_lk(other.mutex_, std::adopt_lock);
124 
125  data_queue_ = other.data_queue_;
126  return *this;
127  }
128 
130  {
131  if( &other == this ) {
132  return *this;
133  }
134 
135  // See above, copy assignment. Same here.
136  std::lock(mutex_, other.mutex_);
137  std::lock_guard<std::mutex> lhs_lk(mutex_, std::adopt_lock);
138  std::lock_guard<std::mutex> rhs_lk(other.mutex_, std::adopt_lock);
139  data_queue_ = std::move( other.data_queue_ );
140  return *this;
141  }
142 
143  // -------------------------------------------------------------------------
144  // Member Functions
145  // -------------------------------------------------------------------------
146 
147  void push( T new_value )
148  {
149  std::lock_guard<std::mutex> lk(mutex_);
150  data_queue_.push( std::move( new_value ));
151  data_cond_.notify_one();
152  }
153 
154  void wait_and_pop( T& value )
155  {
156  std::unique_lock<std::mutex> lk(mutex_);
157  data_cond_.wait(lk, [this] { return !data_queue_.empty(); });
158  value = std::move( data_queue_.front() );
159  data_queue_.pop();
160  }
161 
162  std::shared_ptr<T> wait_and_pop()
163  {
164  std::unique_lock<std::mutex> lk(mutex_);
165  data_cond_.wait(lk, [this] { return !data_queue_.empty(); });
166  std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
167  data_queue_.pop();
168  return res;
169  }
170 
171  bool try_pop( T& value )
172  {
173  std::lock_guard<std::mutex> lk(mutex_);
174  if( data_queue_.empty() ) {
175  return false;
176  }
177  value = std::move( data_queue_.front() );
178  data_queue_.pop();
179  return true;
180  }
181 
182  std::shared_ptr<T> try_pop()
183  {
184  std::lock_guard<std::mutex> lk(mutex_);
185  if( data_queue_.empty() ) {
186  return std::shared_ptr<T>();
187  }
188  std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
189  data_queue_.pop();
190  return res;
191  }
192 
193  bool empty() const
194  {
195  std::lock_guard<std::mutex> lk(mutex_);
196  return data_queue_.empty();
197  }
198 
199 private:
200 
201  mutable std::mutex mutex_;
202  std::queue<T> data_queue_;
203  std::condition_variable data_cond_;
204 };
205 
206 // =================================================================================================
207 // Threadsafe Queue
208 // =================================================================================================
209 
210 // The below code is another threadsafe queue implementation from the same book,
211 // but it seems broken, see https://github.com/anthonywilliams/ccia_code_samples/issues/53
212 // We hence do not use it here, but keep it here for later reference, should that issue get resolved.
213 
214 // /**
215 // * @brief Thread-safe queue with fine-grained locking.
216 // *
217 // * This implementation is from the excellent "C++ Concurrency in Action" book by Anthony Williams,
218 // * second edition, chapter 6, listings 6.7 to 6.10.
219 // * The code is published at https://github.com/anthonywilliams/ccia_code_samples under
220 // * the [Boost Software License](https://www.boost.org/LICENSE_1_0.txt).
221 // */
222 // template<typename T>
223 // class ThreadsafeQueue
224 // {
225 // private:
226 //
227 // // -------------------------------------------------------------------------
228 // // Member Types
229 // // -------------------------------------------------------------------------
230 //
231 // struct Node
232 // {
233 // std::shared_ptr<T> data;
234 // std::unique_ptr<Node> next;
235 // };
236 //
237 // // -------------------------------------------------------------------------
238 // // Constructor and Rule of Five
239 // // -------------------------------------------------------------------------
240 //
241 // public:
242 //
243 // ThreadsafeQueue()
244 // : head_( new Node )
245 // , tail_( head_.get() )
246 // {}
247 //
248 // ~ThreadsafeQueue() = default;
249 //
250 // ThreadsafeQueue( ThreadsafeQueue const& ) = delete;
251 // ThreadsafeQueue( ThreadsafeQueue&& ) = default;
252 //
253 // ThreadsafeQueue& operator=( ThreadsafeQueue const& ) = delete;
254 // ThreadsafeQueue& operator=( ThreadsafeQueue&& ) = default;
255 //
256 // // -------------------------------------------------------------------------
257 // // Member Functions
258 // // -------------------------------------------------------------------------
259 //
260 // bool empty()
261 // {
262 // std::lock_guard<std::mutex> head_lock( head_mutex_ );
263 // return ( head_.get() == get_tail_() );
264 // }
265 //
266 // void push( T new_value )
267 // {
268 // // Create the new elements
269 // std::shared_ptr<T> new_data(
270 // std::make_shared<T>( std::move( new_value ))
271 // );
272 // std::unique_ptr<Node> new_node( new Node );
273 //
274 // // Lock scope to do the push
275 // {
276 // std::lock_guard<std::mutex> tail_lock( tail_mutex_ );
277 // tail_->data = new_data;
278 // Node* const new_tail = new_node.get();
279 // tail_->next = std::move( new_node );
280 // tail_ = new_tail;
281 // }
282 //
283 // // Notify any waiting pop
284 // data_cond_.notify_one();
285 // }
286 //
287 // std::shared_ptr<T> try_pop()
288 // {
289 // std::unique_ptr<Node> const old_head = try_pop_head_();
290 // return old_head ? old_head->data : std::shared_ptr<T>();
291 // }
292 //
293 // bool try_pop( T& value )
294 // {
295 // std::unique_ptr<Node> const old_head = try_pop_head_( value );
296 // return old_head;
297 // }
298 //
299 // std::shared_ptr<T> wait_and_pop()
300 // {
301 // std::unique_ptr<Node> const old_head = wait_pop_head_();
302 // return old_head->data;
303 // }
304 //
305 // void wait_and_pop( T& value )
306 // {
307 // std::unique_ptr<Node> const old_head = wait_pop_head_( value );
308 // }
309 //
310 // private:
311 //
312 // // -------------------------------------------------------------------------
313 // // Members Functions
314 // // -------------------------------------------------------------------------
315 //
316 //
317 // Node* get_tail_()
318 // {
319 // std::lock_guard<std::mutex> tail_lock( tail_mutex_ );
320 // return tail_;
321 // }
322 //
323 // std::unique_ptr<Node> pop_head_()
324 // {
325 // std::unique_ptr<Node> old_head = std::move( head_ );
326 // head_ = std::move( old_head->next );
327 // return old_head;
328 // }
329 //
330 // std::unique_ptr<Node> try_pop_head_()
331 // {
332 // std::lock_guard<std::mutex> head_lock( head_mutex_ );
333 // if( head_.get() == get_tail_() ) {
334 // return std::unique_ptr<Node>();
335 // }
336 // return pop_head_();
337 // }
338 //
339 // std::unique_ptr<Node> try_pop_head_( T& value )
340 // {
341 // std::lock_guard<std::mutex> head_lock( head_mutex_ );
342 // if( head_.get() == get_tail_() ) {
343 // return std::unique_ptr<Node>();
344 // }
345 // value = std::move( *head_->data );
346 // return pop_head_();
347 // }
348 //
349 // std::unique_lock<std::mutex> wait_for_data_()
350 // {
351 // std::unique_lock<std::mutex> head_lock( head_mutex_ );
352 // // The following wait can block indefinitely...
353 // // See https://github.com/anthonywilliams/ccia_code_samples/issues/53
354 // data_cond_.wait(
355 // head_lock,
356 // [&]{
357 // return head_.get() != get_tail_();
358 // }
359 // );
360 //
361 // // This is a potential workaround, but of course makes the queue slower.
362 // // while( head.get() == get_tail() ) {
363 // // data_cond.wait_for(
364 // // head_lock,
365 // // std::chrono::milliseconds(1)
366 // // );
367 // // }
368 //
369 // // Bug in the listing and book,
370 // // see https://github.com/anthonywilliams/ccia_code_samples/issues/52
371 // // return std::move( head_lock );
372 // return head_lock;
373 // }
374 //
375 // std::unique_ptr<Node> wait_pop_head_()
376 // {
377 // std::unique_lock<std::mutex> head_lock( wait_for_data_() );
378 // return pop_head_();
379 // }
380 //
381 // std::unique_ptr<Node> wait_pop_head_(T& value)
382 // {
383 // std::unique_lock<std::mutex> head_lock( wait_for_data_() );
384 // value = std::move( *head_->data );
385 // return pop_head_();
386 // }
387 //
388 // // -------------------------------------------------------------------------
389 // // Member Variables
390 // // -------------------------------------------------------------------------
391 //
392 // // Access mutexes
393 // std::mutex head_mutex_;
394 // std::mutex tail_mutex_;
395 //
396 // // Queue, with ownership controlled by the head
397 // std::unique_ptr<Node> head_;
398 // Node* tail_;
399 //
400 // // Condition for wait_and_pop()
401 // std::condition_variable data_cond_;
402 // };
403 
404 } // namespace utils
405 } // namespace genesis
406 
407 #endif // include guard
genesis::utils::ThreadsafeQueue
Thread-safe queue.
Definition: threadsafe_queue.hpp:89
genesis::utils::ThreadsafeQueue::wait_and_pop
std::shared_ptr< T > wait_and_pop()
Definition: threadsafe_queue.hpp:162
genesis::utils::ThreadsafeQueue::ThreadsafeQueue
ThreadsafeQueue(ThreadsafeQueue const &other)
Definition: threadsafe_queue.hpp:100
genesis::utils::ThreadsafeQueue::ThreadsafeQueue
ThreadsafeQueue()=default
genesis::utils::ThreadsafeQueue::operator=
ThreadsafeQueue & operator=(ThreadsafeQueue const &other)
Definition: threadsafe_queue.hpp:112
genesis::utils::ThreadsafeQueue::operator=
ThreadsafeQueue & operator=(ThreadsafeQueue &&other) noexcept
Definition: threadsafe_queue.hpp:129
genesis::utils::ThreadsafeQueue::push
void push(T new_value)
Definition: threadsafe_queue.hpp:147
genesis::utils::ThreadsafeQueue::try_pop
bool try_pop(T &value)
Definition: threadsafe_queue.hpp:171
genesis::utils::ThreadsafeQueue::~ThreadsafeQueue
~ThreadsafeQueue()=default
genesis::utils::ThreadsafeQueue::try_pop
std::shared_ptr< T > try_pop()
Definition: threadsafe_queue.hpp:182
genesis::utils::ThreadsafeQueue::wait_and_pop
void wait_and_pop(T &value)
Definition: threadsafe_queue.hpp:154
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::utils::ThreadsafeQueue::empty
bool empty() const
Definition: threadsafe_queue.hpp:193
genesis::utils::ThreadsafeQueue::ThreadsafeQueue
ThreadsafeQueue(ThreadsafeQueue &&other) noexcept
Definition: threadsafe_queue.hpp:106