A library for working with phylogenetic and population genetic data.
v0.32.0
blocking_concurrent_queue.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_THREADING_BLOCKING_CONCURRENT_QUEUE_H_
2 #define GENESIS_UTILS_THREADING_BLOCKING_CONCURRENT_QUEUE_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2024 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@sund.ku.dk>
23  University of Copenhagen, Globe Institute, Section for GeoGenetics
24  Oster Voldgade 5-7, 1350 Copenhagen K, Denmark
25 */
26 
27 /*
28  This code below is adapted from the excellent moodycamel::ConcurrentQueue
29  (https://github.com/cameron314/concurrentqueue), using version v1.0.4,
30  which was published under a simplified BSD license, and also dual-licensed
31  under the Boost Software License. The full original license
32  (https://github.com/cameron314/concurrentqueue/blob/master/LICENSE.md), is copied
33  in our documentation as well, see `genesis/doc/manual/supplement/acknowledgements.md`, and
34  @link supplement_acknowledgements_code_reuse_concurrent_queue Acknowledgements@endlink.
35 
36  See also ConcurrentQueue and LightweightSemaphore for the other two classes
37  that we adapted from the original repository.
38 
39  We adapted the original code by (roughly) formatting it to our formatting standard, as well as
40  renaming the namespace from moodycamel to be contained within our namespace, to keep our
41  documentation and usage consistent. Other than that, all functionality is kept as-is.
42  It feels weird to rename the namespaces, as it might seem that we are trying to hide the fact
43  that this code is not ours. That is not the case, and is merely based on our compulsion
44  for orderly namespaces and documentation within genesis. Please note that the original code
45  is excellent work that we would like to acknowledge here! Also, if anyone wants to do the same
46  with genesis code, please feel free to do so ;-)
47 */
48 
58 
59 // =================================================================================================
60 // Blocking Concurrent Queue
61 // =================================================================================================
62 
63 // Provides an efficient blocking version of moodycamel::ConcurrentQueue.
64 // ©2015-2020 Cameron Desrochers. Distributed under the terms of the simplified
65 // BSD license, available at the top of concurrentqueue.h.
66 // Also dual-licensed under the Boost Software License (see LICENSE.md)
67 // Uses Jeff Preshing's semaphore implementation (under the terms of its
68 // separate zlib license, see lightweightsemaphore.h).
69 
70 #include <cerrno>
71 #include <chrono>
72 #include <ctime>
73 #include <memory>
74 #include <type_traits>
75 
76 namespace genesis {
77 namespace utils {
78 // This is a blocking version of the queue. It has an almost identical interface to
79 // the normal non-blocking version, with the addition of various wait_dequeue() methods
80 // and the removal of producer-specific dequeue methods.
81 template <typename T, typename Traits = ConcurrentQueueDefaultTraits>
83 private:
84  typedef ::genesis::utils::ConcurrentQueue<T, Traits> ConcurrentQueue;
85 
86 public:
89 
91  typedef typename ConcurrentQueue::size_t size_t;
92  typedef typename std::make_signed<size_t>::type ssize_t;
93 
94  static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
101 
102 public:
103  // Creates a queue with at least `capacity` element slots; note that the
104  // actual number of elements that can be inserted without additional memory
105  // allocation depends on the number of producers and the block size (e.g. if
106  // the block size is equal to `capacity`, only a single block will be allocated
107  // up-front, which means only a single producer will be able to enqueue elements
108  // without an extra allocation -- blocks aren't shared between producers).
109  // This method is not thread safe -- it is up to the user to ensure that the
110  // queue is fully constructed before it starts being used by other threads (this
111  // includes making the memory effects of construction visible, possibly with a
112  // memory barrier).
113  explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
114  : inner(capacity)
115  , sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
116  {
117  assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
118  if (!sema) {
119  MOODYCAMEL_THROW(std::bad_alloc());
120  }
121  }
122 
123  BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
124  : inner(minCapacity, maxExplicitProducers, maxImplicitProducers)
125  , sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
126  {
127  assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
128  if (!sema) {
129  MOODYCAMEL_THROW(std::bad_alloc());
130  }
131  }
132 
133  // Disable copying and copy assignment
136 
137  // Moving is supported, but note that it is *not* a thread-safe operation.
138  // Nobody can use the queue while it's being moved, and the memory effects
139  // of that move must be propagated to other threads before they can use it.
140  // Note: When a queue is moved, its tokens are still valid but can only be
141  // used with the destination queue (i.e. semantically they are moved along
142  // with the queue itself).
144  : inner(std::move(other.inner)),
145  sema(std::move(other.sema))
146  {
147  }
148 
150  {
151  return swap_internal(other);
152  }
153 
154  // Swaps this queue's state with the other's. Not thread-safe.
155  // Swapping two queues does not invalidate their tokens, however
156  // the tokens that were created for one queue must be used with
157  // only the swapped queue (i.e. the tokens are tied to the
158  // queue's movable state, not the object itself).
160  {
161  swap_internal(other);
162  }
163 
164 private:
165  BlockingConcurrentQueue& swap_internal(BlockingConcurrentQueue& other)
166  {
167  if (this == &other) {
168  return *this;
169  }
170 
171  inner.swap(other.inner);
172  sema.swap(other.sema);
173  return *this;
174  }
175 
176 public:
177  // Enqueues a single item (by copying it).
178  // Allocates memory if required. Only fails if memory allocation fails (or implicit
179  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
180  // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
181  // Thread-safe.
182  inline bool enqueue(T const& item)
183  {
184  if ((details::likely)(inner.enqueue(item))) {
185  sema->signal();
186  return true;
187  }
188  return false;
189  }
190 
191  // Enqueues a single item (by moving it, if possible).
192  // Allocates memory if required. Only fails if memory allocation fails (or implicit
193  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
194  // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
195  // Thread-safe.
196  inline bool enqueue(T&& item)
197  {
198  if ((details::likely)(inner.enqueue(std::move(item)))) {
199  sema->signal();
200  return true;
201  }
202  return false;
203  }
204 
205  // Enqueues a single item (by copying it) using an explicit producer token.
206  // Allocates memory if required. Only fails if memory allocation fails (or
207  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
208  // Thread-safe.
209  inline bool enqueue(producer_token_t const& token, T const& item)
210  {
211  if ((details::likely)(inner.enqueue(token, item))) {
212  sema->signal();
213  return true;
214  }
215  return false;
216  }
217 
218  // Enqueues a single item (by moving it, if possible) using an explicit producer token.
219  // Allocates memory if required. Only fails if memory allocation fails (or
220  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
221  // Thread-safe.
222  inline bool enqueue(producer_token_t const& token, T&& item)
223  {
224  if ((details::likely)(inner.enqueue(token, std::move(item)))) {
225  sema->signal();
226  return true;
227  }
228  return false;
229  }
230 
231  // Enqueues several items.
232  // Allocates memory if required. Only fails if memory allocation fails (or
233  // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
234  // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
235  // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
236  // Thread-safe.
237  template <typename It>
238  inline bool enqueue_bulk(It itemFirst, size_t count)
239  {
240  if ((details::likely)(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
241  sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
242  return true;
243  }
244  return false;
245  }
246 
247  // Enqueues several items using an explicit producer token.
248  // Allocates memory if required. Only fails if memory allocation fails
249  // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
250  // Note: Use std::make_move_iterator if the elements should be moved
251  // instead of copied.
252  // Thread-safe.
253  template <typename It>
254  inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
255  {
256  if ((details::likely)(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
257  sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
258  return true;
259  }
260  return false;
261  }
262 
263  // Enqueues a single item (by copying it).
264  // Does not allocate memory. Fails if not enough room to enqueue (or implicit
265  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
266  // is 0).
267  // Thread-safe.
268  inline bool try_enqueue(T const& item)
269  {
270  if (inner.try_enqueue(item)) {
271  sema->signal();
272  return true;
273  }
274  return false;
275  }
276 
277  // Enqueues a single item (by moving it, if possible).
278  // Does not allocate memory (except for one-time implicit producer).
279  // Fails if not enough room to enqueue (or implicit production is
280  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
281  // Thread-safe.
282  inline bool try_enqueue(T&& item)
283  {
284  if (inner.try_enqueue(std::move(item))) {
285  sema->signal();
286  return true;
287  }
288  return false;
289  }
290 
291  // Enqueues a single item (by copying it) using an explicit producer token.
292  // Does not allocate memory. Fails if not enough room to enqueue.
293  // Thread-safe.
294  inline bool try_enqueue(producer_token_t const& token, T const& item)
295  {
296  if (inner.try_enqueue(token, item)) {
297  sema->signal();
298  return true;
299  }
300  return false;
301  }
302 
303  // Enqueues a single item (by moving it, if possible) using an explicit producer token.
304  // Does not allocate memory. Fails if not enough room to enqueue.
305  // Thread-safe.
306  inline bool try_enqueue(producer_token_t const& token, T&& item)
307  {
308  if (inner.try_enqueue(token, std::move(item))) {
309  sema->signal();
310  return true;
311  }
312  return false;
313  }
314 
315  // Enqueues several items.
316  // Does not allocate memory (except for one-time implicit producer).
317  // Fails if not enough room to enqueue (or implicit production is
318  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
319  // Note: Use std::make_move_iterator if the elements should be moved
320  // instead of copied.
321  // Thread-safe.
322  template <typename It>
323  inline bool try_enqueue_bulk(It itemFirst, size_t count)
324  {
325  if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
326  sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
327  return true;
328  }
329  return false;
330  }
331 
332  // Enqueues several items using an explicit producer token.
333  // Does not allocate memory. Fails if not enough room to enqueue.
334  // Note: Use std::make_move_iterator if the elements should be moved
335  // instead of copied.
336  // Thread-safe.
337  template <typename It>
338  inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
339  {
340  if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
341  sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
342  return true;
343  }
344  return false;
345  }
346 
347  // Attempts to dequeue from the queue.
348  // Returns false if all producer streams appeared empty at the time they
349  // were checked (so, the queue is likely but not guaranteed to be empty).
350  // Never allocates. Thread-safe.
351  template <typename U>
352  inline bool try_dequeue(U& item)
353  {
354  if (sema->tryWait()) {
355  while (!inner.try_dequeue(item)) {
356  continue;
357  }
358  return true;
359  }
360  return false;
361  }
362 
363  // Attempts to dequeue from the queue using an explicit consumer token.
364  // Returns false if all producer streams appeared empty at the time they
365  // were checked (so, the queue is likely but not guaranteed to be empty).
366  // Never allocates. Thread-safe.
367  template <typename U>
368  inline bool try_dequeue(consumer_token_t& token, U& item)
369  {
370  if (sema->tryWait()) {
371  while (!inner.try_dequeue(token, item)) {
372  continue;
373  }
374  return true;
375  }
376  return false;
377  }
378 
379  // Attempts to dequeue several elements from the queue.
380  // Returns the number of items actually dequeued.
381  // Returns 0 if all producer streams appeared empty at the time they
382  // were checked (so, the queue is likely but not guaranteed to be empty).
383  // Never allocates. Thread-safe.
384  template <typename It>
385  inline size_t try_dequeue_bulk(It itemFirst, size_t max)
386  {
387  size_t count = 0;
388  max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
389  while (count != max) {
390  count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
391  }
392  return count;
393  }
394 
395  // Attempts to dequeue several elements from the queue using an explicit consumer token.
396  // Returns the number of items actually dequeued.
397  // Returns 0 if all producer streams appeared empty at the time they
398  // were checked (so, the queue is likely but not guaranteed to be empty).
399  // Never allocates. Thread-safe.
400  template <typename It>
401  inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
402  {
403  size_t count = 0;
404  max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
405  while (count != max) {
406  count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
407  }
408  return count;
409  }
410 
411  // Blocks the current thread until there's something to dequeue, then
412  // dequeues it.
413  // Never allocates. Thread-safe.
414  template <typename U>
415  inline void wait_dequeue(U& item)
416  {
417  while (!sema->wait()) {
418  continue;
419  }
420  while (!inner.try_dequeue(item)) {
421  continue;
422  }
423  }
424 
425  // Blocks the current thread until either there's something to dequeue
426  // or the timeout (specified in microseconds) expires. Returns false
427  // without setting `item` if the timeout expires, otherwise assigns
428  // to `item` and returns true.
429  // Using a negative timeout indicates an indefinite timeout,
430  // and is thus functionally equivalent to calling wait_dequeue.
431  // Never allocates. Thread-safe.
432  template <typename U>
433  inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
434  {
435  if (!sema->wait(timeout_usecs)) {
436  return false;
437  }
438  while (!inner.try_dequeue(item)) {
439  continue;
440  }
441  return true;
442  }
443 
444  // Blocks the current thread until either there's something to dequeue
445  // or the timeout expires. Returns false without setting `item` if the
446  // timeout expires, otherwise assigns to `item` and returns true.
447  // Never allocates. Thread-safe.
template <typename U, typename Rep, typename Period>
inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
{
    // Convert to whole microseconds and delegate to the integer-timeout overload.
    std::chrono::microseconds const usecs = std::chrono::duration_cast<std::chrono::microseconds>(timeout);
    return wait_dequeue_timed(item, usecs.count());
}
453 
454  // Blocks the current thread until there's something to dequeue, then
455  // dequeues it using an explicit consumer token.
456  // Never allocates. Thread-safe.
457  template <typename U>
458  inline void wait_dequeue(consumer_token_t& token, U& item)
459  {
460  while (!sema->wait()) {
461  continue;
462  }
463  while (!inner.try_dequeue(token, item)) {
464  continue;
465  }
466  }
467 
468  // Blocks the current thread until either there's something to dequeue
469  // or the timeout (specified in microseconds) expires. Returns false
470  // without setting `item` if the timeout expires, otherwise assigns
471  // to `item` and returns true.
472  // Using a negative timeout indicates an indefinite timeout,
473  // and is thus functionally equivalent to calling wait_dequeue.
474  // Never allocates. Thread-safe.
475  template <typename U>
476  inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::int64_t timeout_usecs)
477  {
478  if (!sema->wait(timeout_usecs)) {
479  return false;
480  }
481  while (!inner.try_dequeue(token, item)) {
482  continue;
483  }
484  return true;
485  }
486 
487  // Blocks the current thread until either there's something to dequeue
488  // or the timeout expires. Returns false without setting `item` if the
489  // timeout expires, otherwise assigns to `item` and returns true.
490  // Never allocates. Thread-safe.
491  template <typename U, typename Rep, typename Period>
492  inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::chrono::duration<Rep, Period> const& timeout)
493  {
494  return wait_dequeue_timed(token, item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
495  }
496 
497  // Attempts to dequeue several elements from the queue.
498  // Returns the number of items actually dequeued, which will
499  // always be at least one (this method blocks until the queue
500  // is non-empty) and at most max.
501  // Never allocates. Thread-safe.
502  template <typename It>
503  inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
504  {
505  size_t count = 0;
506  max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
507  while (count != max) {
508  count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
509  }
510  return count;
511  }
512 
513  // Attempts to dequeue several elements from the queue.
514  // Returns the number of items actually dequeued, which can
515  // be 0 if the timeout expires while waiting for elements,
516  // and at most max.
517  // Using a negative timeout indicates an indefinite timeout,
518  // and is thus functionally equivalent to calling wait_dequeue_bulk.
519  // Never allocates. Thread-safe.
520  template <typename It>
521  inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
522  {
523  size_t count = 0;
524  max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
525  while (count != max) {
526  count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
527  }
528  return count;
529  }
530 
531  // Attempts to dequeue several elements from the queue.
532  // Returns the number of items actually dequeued, which can
533  // be 0 if the timeout expires while waiting for elements,
534  // and at most max.
535  // Never allocates. Thread-safe.
template <typename It, typename Rep, typename Period>
inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
{
    // Convert to whole microseconds and delegate to the integer-timeout overload,
    // handing the iterator on by reference.
    std::chrono::microseconds const usecs = std::chrono::duration_cast<std::chrono::microseconds>(timeout);
    return wait_dequeue_bulk_timed<It&>(itemFirst, max, usecs.count());
}
541 
542  // Attempts to dequeue several elements from the queue using an explicit consumer token.
543  // Returns the number of items actually dequeued, which will
544  // always be at least one (this method blocks until the queue
545  // is non-empty) and at most max.
546  // Never allocates. Thread-safe.
547  template <typename It>
548  inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
549  {
550  size_t count = 0;
551  max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
552  while (count != max) {
553  count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
554  }
555  return count;
556  }
557 
558  // Attempts to dequeue several elements from the queue using an explicit consumer token.
559  // Returns the number of items actually dequeued, which can
560  // be 0 if the timeout expires while waiting for elements,
561  // and at most max.
562  // Using a negative timeout indicates an indefinite timeout,
563  // and is thus functionally equivalent to calling wait_dequeue_bulk.
564  // Never allocates. Thread-safe.
565  template <typename It>
566  inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
567  {
568  size_t count = 0;
569  max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
570  while (count != max) {
571  count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
572  }
573  return count;
574  }
575 
576  // Attempts to dequeue several elements from the queue using an explicit consumer token.
577  // Returns the number of items actually dequeued, which can
578  // be 0 if the timeout expires while waiting for elements,
579  // and at most max.
580  // Never allocates. Thread-safe.
581  template <typename It, typename Rep, typename Period>
582  inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
583  {
584  return wait_dequeue_bulk_timed<It&>(token, itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
585  }
586 
587  // Returns an estimate of the total number of elements currently in the queue. This
588  // estimate is only accurate if the queue has completely stabilized before it is called
589  // (i.e. all enqueue and dequeue operations have completed and their memory effects are
590  // visible on the calling thread, and no further operations start while this method is
591  // being called).
592  // Thread-safe.
593  inline size_t size_approx() const
594  {
595  return (size_t)sema->availableApprox();
596  }
597 
598  // Returns true if the underlying atomic variables used by
599  // the queue are lock-free (they should be on most platforms).
600  // Thread-safe.
601  static constexpr bool is_lock_free()
602  {
604  }
605 
606 private:
607  template <typename U, typename A1, typename A2>
608  static inline U* create(A1&& a1, A2&& a2)
609  {
610  void* p = (Traits::malloc)(sizeof(U));
611  return p != nullptr ? new (p) U(std::forward<A1>(a1), std::forward<A2>(a2)) : nullptr;
612  }
613 
614  template <typename U>
615  static inline void destroy(U* p)
616  {
617  if (p != nullptr) {
618  p->~U();
619  }
620  (Traits::free)(p);
621  }
622 
623 private:
624  ConcurrentQueue inner;
625  std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
626 };
627 
628 template <typename T, typename Traits>
630 {
631  a.swap(b);
632 }
633 
634 } // namespace utils
635 } // namespace genesis
636 
637 #endif // include guard
genesis::utils::BlockingConcurrentQueue::producer_token_t
ConcurrentQueue::producer_token_t producer_token_t
Definition: blocking_concurrent_queue.hpp:87
genesis::utils::BlockingConcurrentQueue::BLOCK_SIZE
static const size_t BLOCK_SIZE
Definition: blocking_concurrent_queue.hpp:94
MOODYCAMEL_NOEXCEPT
#define MOODYCAMEL_NOEXCEPT
Definition: concurrent_queue.hpp:298
genesis::utils::ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
Definition: concurrent_queue.hpp:948
genesis::utils::BlockingConcurrentQueue::wait_dequeue_bulk_timed
size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
Definition: blocking_concurrent_queue.hpp:521
genesis::utils::BlockingConcurrentQueue::wait_dequeue_timed
bool wait_dequeue_timed(consumer_token_t &token, U &item, std::chrono::duration< Rep, Period > const &timeout)
Definition: blocking_concurrent_queue.hpp:492
genesis::utils::BlockingConcurrentQueue::wait_dequeue
void wait_dequeue(U &item)
Definition: blocking_concurrent_queue.hpp:415
genesis::utils::ConcurrentQueue::try_enqueue_bulk
bool try_enqueue_bulk(It itemFirst, size_t count)
Definition: concurrent_queue.hpp:1278
genesis::utils::ConcurrentQueue::enqueue
bool enqueue(T const &item)
Definition: concurrent_queue.hpp:1167
genesis::utils::BlockingConcurrentQueue::wait_dequeue_bulk
size_t wait_dequeue_bulk(It itemFirst, size_t max)
Definition: blocking_concurrent_queue.hpp:503
genesis::utils::BlockingConcurrentQueue::enqueue
bool enqueue(producer_token_t const &token, T &&item)
Definition: blocking_concurrent_queue.hpp:222
genesis::utils::BlockingConcurrentQueue::enqueue_bulk
bool enqueue_bulk(It itemFirst, size_t count)
Definition: blocking_concurrent_queue.hpp:238
genesis::utils::BlockingConcurrentQueue::try_enqueue_bulk
bool try_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
Definition: blocking_concurrent_queue.hpp:338
genesis::utils::BlockingConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
Definition: blocking_concurrent_queue.hpp:99
genesis::utils::BlockingConcurrentQueue::try_dequeue
bool try_dequeue(U &item)
Definition: blocking_concurrent_queue.hpp:352
MOODYCAMEL_DELETE_FUNCTION
#define MOODYCAMEL_DELETE_FUNCTION
Definition: concurrent_queue.hpp:324
genesis::utils::BlockingConcurrentQueue::try_enqueue
bool try_enqueue(T const &item)
Definition: blocking_concurrent_queue.hpp:268
genesis::utils::ConsumerToken
Definition: concurrent_queue.hpp:885
genesis::utils::BlockingConcurrentQueue::wait_dequeue
void wait_dequeue(consumer_token_t &token, U &item)
Definition: blocking_concurrent_queue.hpp:458
concurrent_queue.hpp
genesis::utils::BlockingConcurrentQueue::try_enqueue_bulk
bool try_enqueue_bulk(It itemFirst, size_t count)
Definition: blocking_concurrent_queue.hpp:323
MOODYCAMEL_THROW
#define MOODYCAMEL_THROW(expr)
Definition: concurrent_queue.hpp:274
lightweight_semaphore.hpp
genesis::utils::BlockingConcurrentQueue::is_lock_free
static constexpr bool is_lock_free()
Definition: blocking_concurrent_queue.hpp:601
genesis::utils::LightweightSemaphore
Definition: lightweight_semaphore.hpp:323
genesis::utils::BlockingConcurrentQueue::size_t
ConcurrentQueue::size_t size_t
Definition: blocking_concurrent_queue.hpp:91
genesis::utils::ConcurrentQueue::is_lock_free
static constexpr bool is_lock_free()
Definition: concurrent_queue.hpp:1510
genesis::utils::BlockingConcurrentQueue::try_enqueue
bool try_enqueue(producer_token_t const &token, T const &item)
Definition: blocking_concurrent_queue.hpp:294
genesis::utils::BlockingConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
Definition: blocking_concurrent_queue.hpp:96
genesis::utils::details::likely
static bool() likely(bool x)
Definition: concurrent_queue.hpp:412
genesis::utils::BlockingConcurrentQueue
Definition: blocking_concurrent_queue.hpp:82
genesis::utils::BlockingConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
Definition: blocking_concurrent_queue.hpp:95
genesis::utils::ConcurrentQueue< WrappedTask, ConcurrentQueueDefaultTraits >::index_t
ConcurrentQueueDefaultTraits ::index_t index_t
Definition: concurrent_queue.hpp:944
genesis::utils::ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
Definition: concurrent_queue.hpp:951
genesis::utils::BlockingConcurrentQueue::consumer_token_t
ConcurrentQueue::consumer_token_t consumer_token_t
Definition: blocking_concurrent_queue.hpp:88
genesis::utils::ConcurrentQueue::enqueue_bulk
bool enqueue_bulk(It itemFirst, size_t count)
Definition: concurrent_queue.hpp:1211
genesis::utils::swap
void swap(Color &lhs, Color &rhs)
Definition: color.hpp:207
genesis::utils::BlockingConcurrentQueue::try_enqueue
bool try_enqueue(T &&item)
Definition: blocking_concurrent_queue.hpp:282
genesis::utils::BlockingConcurrentQueue::try_dequeue
bool try_dequeue(consumer_token_t &token, U &item)
Definition: blocking_concurrent_queue.hpp:368
genesis::utils::BlockingConcurrentQueue::operator=
BlockingConcurrentQueue & operator=(BlockingConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
Definition: blocking_concurrent_queue.hpp:149
genesis::utils::BlockingConcurrentQueue::wait_dequeue_bulk_timed
size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration< Rep, Period > const &timeout)
Definition: blocking_concurrent_queue.hpp:537
genesis::utils::ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
Definition: concurrent_queue.hpp:949
genesis::utils::BlockingConcurrentQueue::try_dequeue_bulk
size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max)
Definition: blocking_concurrent_queue.hpp:401
genesis::utils::BlockingConcurrentQueue::wait_dequeue_timed
bool wait_dequeue_timed(U &item, std::chrono::duration< Rep, Period > const &timeout)
Definition: blocking_concurrent_queue.hpp:449
genesis::utils::BlockingConcurrentQueue::wait_dequeue_bulk
size_t wait_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max)
Definition: blocking_concurrent_queue.hpp:548
genesis::utils::ConcurrentQueue::try_enqueue
bool try_enqueue(T const &item)
Definition: concurrent_queue.hpp:1235
genesis::utils::BlockingConcurrentQueue::ssize_t
std::make_signed< size_t >::type ssize_t
Definition: blocking_concurrent_queue.hpp:92
genesis::utils::ConcurrentQueue
Definition: concurrent_queue.hpp:562
genesis::utils::BlockingConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
Definition: blocking_concurrent_queue.hpp:98
genesis::utils::BlockingConcurrentQueue::MAX_SUBQUEUE_SIZE
static const size_t MAX_SUBQUEUE_SIZE
Definition: blocking_concurrent_queue.hpp:100
genesis::utils::ConcurrentQueue< WrappedTask, ConcurrentQueueDefaultTraits >::size_t
ConcurrentQueueDefaultTraits ::size_t size_t
Definition: concurrent_queue.hpp:945
genesis::utils::ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
Definition: concurrent_queue.hpp:952
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::utils::BlockingConcurrentQueue::wait_dequeue_bulk_timed
size_t wait_dequeue_bulk_timed(consumer_token_t &token, It itemFirst, size_t max, std::int64_t timeout_usecs)
Definition: blocking_concurrent_queue.hpp:566
genesis::utils::BlockingConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
Definition: blocking_concurrent_queue.hpp:97
genesis::utils::BlockingConcurrentQueue::wait_dequeue_timed
bool wait_dequeue_timed(U &item, std::int64_t timeout_usecs)
Definition: blocking_concurrent_queue.hpp:433
genesis::utils::ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
Definition: concurrent_queue.hpp:950
genesis::utils::ConcurrentQueue::BLOCK_SIZE
static const size_t BLOCK_SIZE
Definition: concurrent_queue.hpp:947
genesis::utils::BlockingConcurrentQueue::swap
void swap(BlockingConcurrentQueue &other) MOODYCAMEL_NOEXCEPT
Definition: blocking_concurrent_queue.hpp:159
genesis::utils::LightweightSemaphore::ssize_t
std::make_signed< std::size_t >::type ssize_t
Definition: lightweight_semaphore.hpp:325
genesis::utils::BlockingConcurrentQueue::BlockingConcurrentQueue
BlockingConcurrentQueue(size_t capacity=6 *BLOCK_SIZE)
Definition: blocking_concurrent_queue.hpp:113
genesis::utils::ConcurrentQueue::MAX_SUBQUEUE_SIZE
static const size_t MAX_SUBQUEUE_SIZE
Definition: concurrent_queue.hpp:958
genesis::utils::BlockingConcurrentQueue::try_enqueue
bool try_enqueue(producer_token_t const &token, T &&item)
Definition: blocking_concurrent_queue.hpp:306
genesis::utils::BlockingConcurrentQueue::enqueue
bool enqueue(T &&item)
Definition: blocking_concurrent_queue.hpp:196
genesis::utils::ConcurrentQueue::try_dequeue
bool try_dequeue(U &item)
Definition: concurrent_queue.hpp:1301
genesis::utils::BlockingConcurrentQueue::operator=
BlockingConcurrentQueue & operator=(BlockingConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION
genesis::utils::BlockingConcurrentQueue::enqueue
bool enqueue(T const &item)
Definition: blocking_concurrent_queue.hpp:182
genesis::utils::BlockingConcurrentQueue::wait_dequeue_bulk_timed
size_t wait_dequeue_bulk_timed(consumer_token_t &token, It itemFirst, size_t max, std::chrono::duration< Rep, Period > const &timeout)
Definition: blocking_concurrent_queue.hpp:582
genesis::utils::BlockingConcurrentQueue::enqueue
bool enqueue(producer_token_t const &token, T const &item)
Definition: blocking_concurrent_queue.hpp:209
genesis::utils::BlockingConcurrentQueue::enqueue_bulk
bool enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
Definition: blocking_concurrent_queue.hpp:254
genesis::utils::ConcurrentQueue::swap
void swap(ConcurrentQueue &other) noexcept
Definition: concurrent_queue.hpp:1127
genesis::utils::ProducerToken
Definition: concurrent_queue.hpp:821
genesis::utils::BlockingConcurrentQueue::try_dequeue_bulk
size_t try_dequeue_bulk(It itemFirst, size_t max)
Definition: blocking_concurrent_queue.hpp:385
genesis::utils::BlockingConcurrentQueue::wait_dequeue_timed
bool wait_dequeue_timed(consumer_token_t &token, U &item, std::int64_t timeout_usecs)
Definition: blocking_concurrent_queue.hpp:476
genesis::utils::BlockingConcurrentQueue::size_approx
size_t size_approx() const
Definition: blocking_concurrent_queue.hpp:593
genesis::utils::BlockingConcurrentQueue::BlockingConcurrentQueue
BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
Definition: blocking_concurrent_queue.hpp:123
genesis::utils::BlockingConcurrentQueue::BlockingConcurrentQueue
BlockingConcurrentQueue(BlockingConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
Definition: blocking_concurrent_queue.hpp:143
genesis::utils::BlockingConcurrentQueue::index_t
ConcurrentQueue::index_t index_t
Definition: blocking_concurrent_queue.hpp:90