A library for working with phylogenetic and population genetic data.
v0.32.0
concurrent_queue.hpp
1 #ifndef GENESIS_UTILS_THREADING_CONCURRENT_QUEUE_H_
2 #define GENESIS_UTILS_THREADING_CONCURRENT_QUEUE_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2024 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@sund.ku.dk>
23  University of Copenhagen, Globe Institute, Section for GeoGenetics
24  Oster Voldgade 5-7, 1350 Copenhagen K, Denmark
25 */
26 
27 /*
28  The code below is adapted from the excellent moodycamel::ConcurrentQueue
29  (https://github.com/cameron314/concurrentqueue), using version v1.0.4,
30  which was published under a simplified BSD license, and also dual-licensed
31  under the Boost Software License. The full original license
32  (https://github.com/cameron314/concurrentqueue/blob/master/LICENSE.md) is copied
33  in our documentation as well, see `genesis/doc/manual/supplement/acknowledgements.md`, and
34  @link supplement_acknowledgements_code_reuse_concurrent_queue Acknowledgements@endlink.
35 
36  See also BlockingConcurrentQueue and LightweightSemaphore for the other two classes
37  that we adapted from the original repository.
38 
39  We adapted the original code by (roughly) formatting it to our formatting standard, as well as
40  renaming the namespace from moodycamel to be contained within our namespace, to keep our
41  documentation and usage consistent. Other than that, all functionality is kept as-is.
42  It feels weird to rename the namespaces, as it might seem that we are trying to hide the fact
43  that this code is not ours. That is not the case; the renaming is merely based on our compulsion
44  for orderly namespaces and documentation within genesis. Please note that the original code
45  is excellent work that we would like to acknowledge here! Also, if anyone wants to do the same
46  with genesis code, please feel free to do so ;-)
47 */
48 
56 // =================================================================================================
57 // Concurrent Queue
58 // =================================================================================================
59 
60 // Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
61 // An overview, including benchmark results, is provided here:
62 // http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
63 // The full design is also described in excruciating detail at:
64 // http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
65 
66 // Simplified BSD license:
67 // Copyright (c) 2013-2020, Cameron Desrochers.
68 // All rights reserved.
69 //
70 // Redistribution and use in source and binary forms, with or without modification,
71 // are permitted provided that the following conditions are met:
72 //
73 // - Redistributions of source code must retain the above copyright notice, this list of
74 // conditions and the following disclaimer.
75 // - Redistributions in binary form must reproduce the above copyright notice, this list of
76 // conditions and the following disclaimer in the documentation and/or other materials
77 // provided with the distribution.
78 //
79 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
80 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
81 // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
82 // THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
83 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
84 // OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
85 // HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
86 // TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
87 // EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
88 
89 // Also dual-licensed under the Boost Software License (see LICENSE.md)
90 
91 #if defined(__GNUC__) && !defined(__INTEL_COMPILER)
92 // Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
93 // Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
94 // upon assigning any computed values)
95 #pragma GCC diagnostic push
96 #pragma GCC diagnostic ignored "-Wconversion"
97 
98 #ifdef MCDBGQ_USE_RELACY
99 #pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
100 #endif
101 #endif
102 
103 #if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
104 // VS2019 with /W4 warns about constant conditional expressions, but does not support
105 // `if constexpr` unless /std:c++17 or higher is used, so we have no choice but to simply disable the warning
106 #pragma warning(push)
107 #pragma warning(disable : 4127) // conditional expression is constant
108 #endif
109 
110 #if defined(__APPLE__)
111 #include "TargetConditionals.h"
112 #endif
113 
114 #ifdef MCDBGQ_USE_RELACY
115 #include "relacy/relacy_std.hpp"
116 #include "relacy_shims.h"
117 // We only use malloc/free anyway, and the delete macro messes up `= delete` method declarations.
118 // We'll override the default trait malloc ourselves without a macro.
119 #undef new
120 #undef delete
121 #undef malloc
122 #undef free
123 #else
124 #include <atomic> // Requires C++11. Sorry VS2010.
125 #include <cassert>
126 #endif
127 #include <algorithm>
128 #include <array>
129 #include <climits> // for CHAR_BIT
130 #include <cstddef> // for max_align_t
131 #include <cstdint>
132 #include <cstdlib>
133 #include <limits>
134 #include <mutex> // used for thread exit synchronization
135 #include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
136 #include <type_traits>
137 #include <utility>
138 
139 // Platform-specific definitions of a numeric thread ID type and an invalid value
140 namespace genesis {
141 namespace utils {
142 namespace details {
143  template <typename thread_id_t>
144  struct thread_id_converter {
145  typedef thread_id_t thread_id_numeric_size_t;
146  typedef thread_id_t thread_id_hash_t;
147  static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
148  };
149 } // namespace details
150 } // namespace utils
151 } // namespace genesis
152 #if defined(MCDBGQ_USE_RELACY)
153 namespace genesis {
154 namespace utils {
155 namespace details {
156  typedef std::uint32_t thread_id_t;
157  static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
158  static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
159  static inline thread_id_t thread_id() { return rl::thread_index(); }
160 } // namespace details
161 } // namespace utils
162 } // namespace genesis
163 #elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
164 // No sense pulling in windows.h in a header, we'll manually declare the function
165 // we use and rely on backwards-compatibility for this not to break
166 extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
167 namespace genesis {
168 namespace utils {
169 namespace details {
170  static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
171  typedef std::uint32_t thread_id_t;
172  static const thread_id_t invalid_thread_id = 0; // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
173  static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU; // Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread IDs are presently multiples of 4.
174  static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
175 } // namespace details
176 } // namespace utils
177 } // namespace genesis
178 #elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE) || defined(__MVS__) || defined(MOODYCAMEL_NO_THREAD_LOCAL)
179 namespace genesis {
180 namespace utils {
181 namespace details {
182  static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
183 
184  typedef std::thread::id thread_id_t;
185  static const thread_id_t invalid_thread_id; // Default ctor creates invalid ID
186 
187  // Note we don't define a invalid_thread_id2 since std::thread::id doesn't have one; it's
188  // only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't
189  // be.
190  static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
191 
192  template <std::size_t>
193  struct thread_id_size {
194  };
195  template <>
196  struct thread_id_size<4> {
197  typedef std::uint32_t numeric_t;
198  };
199  template <>
200  struct thread_id_size<8> {
201  typedef std::uint64_t numeric_t;
202  };
203 
204  template <>
205  struct thread_id_converter<thread_id_t> {
206  typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
207 #ifndef __APPLE__
208  typedef std::size_t thread_id_hash_t;
209 #else
210  typedef thread_id_numeric_size_t thread_id_hash_t;
211 #endif
212 
213  static thread_id_hash_t prehash(thread_id_t const& x)
214  {
215 #ifndef __APPLE__
216  return std::hash<std::thread::id>()(x);
217 #else
218  return *reinterpret_cast<thread_id_hash_t const*>(&x);
219 #endif
220  }
221  };
222 } // namespace details
223 } // namespace utils
224 } // namespace genesis
225 #else
226 // Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
227 // In order to get a numeric thread ID in a platform-independent way, we use a thread-local
228 // static variable's address as a thread identifier :-)
229 #if defined(__GNUC__) || defined(__INTEL_COMPILER)
230 #define MOODYCAMEL_THREADLOCAL __thread
231 #elif defined(_MSC_VER)
232 #define MOODYCAMEL_THREADLOCAL __declspec(thread)
233 #else
234 // Assume C++11 compliant compiler
235 #define MOODYCAMEL_THREADLOCAL thread_local
236 #endif
237 namespace genesis {
238 namespace utils {
239 namespace details {
240  typedef std::uintptr_t thread_id_t;
241  static const thread_id_t invalid_thread_id = 0; // Address can't be nullptr
242  static const thread_id_t invalid_thread_id2 = 1; // Member accesses off a null pointer are also generally invalid. Plus it's not aligned.
243  static inline thread_id_t thread_id()
244  {
245  static MOODYCAMEL_THREADLOCAL int x;
246  return reinterpret_cast<thread_id_t>(&x);
247  }
248 } // namespace details
249 } // namespace utils
250 } // namespace genesis
251 #endif
252 
253 // Constexpr if
254 #ifndef MOODYCAMEL_CONSTEXPR_IF
255 #if (defined(_MSC_VER) && defined(_HAS_CXX17) && _HAS_CXX17) || __cplusplus > 201402L
256 #define MOODYCAMEL_CONSTEXPR_IF if constexpr
257 #define MOODYCAMEL_MAYBE_UNUSED [[maybe_unused]]
258 #else
259 #define MOODYCAMEL_CONSTEXPR_IF if
260 #define MOODYCAMEL_MAYBE_UNUSED
261 #endif
262 #endif
263 
264 // Exceptions
265 #ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
266 #if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
267 #define MOODYCAMEL_EXCEPTIONS_ENABLED
268 #endif
269 #endif
270 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
271 #define MOODYCAMEL_TRY try
272 #define MOODYCAMEL_CATCH(...) catch (__VA_ARGS__)
273 #define MOODYCAMEL_RETHROW throw
274 #define MOODYCAMEL_THROW(expr) throw(expr)
275 #else
276 #define MOODYCAMEL_TRY MOODYCAMEL_CONSTEXPR_IF(true)
277 #define MOODYCAMEL_CATCH(...) else MOODYCAMEL_CONSTEXPR_IF(false)
278 #define MOODYCAMEL_RETHROW
279 #define MOODYCAMEL_THROW(expr)
280 #endif
281 
282 #ifndef MOODYCAMEL_NOEXCEPT
283 #if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
284 #define MOODYCAMEL_NOEXCEPT
285 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
286 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
287 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
288 // VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
289 // We have to assume *all* non-trivial constructors may throw on VS2012!
290 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
291 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
292 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
293 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
294 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
295 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
296 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
297 #else
298 #define MOODYCAMEL_NOEXCEPT noexcept
299 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
300 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
301 #endif
302 #endif
303 
304 #ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
305 #ifdef MCDBGQ_USE_RELACY
306 #define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
307 #else
308 // VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
309 // g++ <=4.7 doesn't support thread_local either.
310 // Finally, iOS/ARM doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work
311 #if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__) && !defined(__MVS__)
312 // Assume `thread_local` is fully supported in all other C++11 compilers/platforms
313 #define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // tentatively enabled for now; years ago several users reported having problems with it on
314 #endif
315 #endif
316 #endif
317 
318 // VS2012 doesn't support deleted functions.
319 // In this case, we declare the function normally but don't define it. A link error will be generated if the function is called.
320 #ifndef MOODYCAMEL_DELETE_FUNCTION
321 #if defined(_MSC_VER) && _MSC_VER < 1800
322 #define MOODYCAMEL_DELETE_FUNCTION
323 #else
324 #define MOODYCAMEL_DELETE_FUNCTION = delete
325 #endif
326 #endif
327 
328 namespace genesis {
329 namespace utils {
330 namespace details {
331 #ifndef MOODYCAMEL_ALIGNAS
332 // VS2013 doesn't support alignas or alignof, and align() requires a constant literal
333 #if defined(_MSC_VER) && _MSC_VER <= 1800
334 #define MOODYCAMEL_ALIGNAS(alignment) __declspec(align(alignment))
335 #define MOODYCAMEL_ALIGNOF(obj) __alignof(obj)
336 #define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) typename details::Vs2013Aligned<std::alignment_of<obj>::value, T>::type
337  template <int Align, typename T>
338  struct Vs2013Aligned {
339  }; // default, unsupported alignment
340  template <typename T>
341  struct Vs2013Aligned<1, T> {
342  typedef __declspec(align(1)) T type;
343  };
344  template <typename T>
345  struct Vs2013Aligned<2, T> {
346  typedef __declspec(align(2)) T type;
347  };
348  template <typename T>
349  struct Vs2013Aligned<4, T> {
350  typedef __declspec(align(4)) T type;
351  };
352  template <typename T>
353  struct Vs2013Aligned<8, T> {
354  typedef __declspec(align(8)) T type;
355  };
356  template <typename T>
357  struct Vs2013Aligned<16, T> {
358  typedef __declspec(align(16)) T type;
359  };
360  template <typename T>
361  struct Vs2013Aligned<32, T> {
362  typedef __declspec(align(32)) T type;
363  };
364  template <typename T>
365  struct Vs2013Aligned<64, T> {
366  typedef __declspec(align(64)) T type;
367  };
368  template <typename T>
369  struct Vs2013Aligned<128, T> {
370  typedef __declspec(align(128)) T type;
371  };
372  template <typename T>
373  struct Vs2013Aligned<256, T> {
374  typedef __declspec(align(256)) T type;
375  };
376 #else
377  template <typename T>
378  struct identity {
379  typedef T type;
380  };
381 #define MOODYCAMEL_ALIGNAS(alignment) alignas(alignment)
382 #define MOODYCAMEL_ALIGNOF(obj) alignof(obj)
383 #define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) alignas(alignof(obj)) typename details::identity<T>::type
384 #endif
385 #endif
386 } // namespace details
387 } // namespace utils
388 } // namespace genesis
389 
390 // TSAN can false report races in lock-free code. To enable TSAN to be used from projects that use this one,
391 // we can apply per-function compile-time suppression.
392 // See https://clang.llvm.org/docs/ThreadSanitizer.html#has-feature-thread-sanitizer
393 #define MOODYCAMEL_NO_TSAN
394 #if defined(__has_feature)
395 #if __has_feature(thread_sanitizer)
396 #undef MOODYCAMEL_NO_TSAN
397 #define MOODYCAMEL_NO_TSAN __attribute__((no_sanitize("thread")))
398 #endif // TSAN
399 #endif // TSAN
400 
401 // Compiler-specific likely/unlikely hints
402 namespace genesis {
403 namespace utils {
404 namespace details {
405 #if defined(__GNUC__)
406  static inline bool(likely)(bool x)
407  {
408  return __builtin_expect((x), true);
409  }
410  static inline bool(unlikely)(bool x) { return __builtin_expect((x), false); }
411 #else
412  static inline bool(likely)(bool x)
413  {
414  return x;
415  }
416  static inline bool(unlikely)(bool x) { return x; }
417 #endif
418 } // namespace details
419 } // namespace utils
420 } // namespace genesis
421 
422 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
423 #include "internal/concurrentqueue_internal_debug.h"
424 #endif
425 
426 namespace genesis {
427 namespace utils {
428 namespace details {
429  template <typename T>
430  struct const_numeric_max {
431  static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
432  static const T value = std::numeric_limits<T>::is_signed
433  ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
434  : static_cast<T>(-1);
435  };
436 
437 #if defined(__GLIBCXX__)
438  typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
439 #else
440  typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
441 #endif
442 
443  // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
444  // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
445  typedef union {
446  std_max_align_t x;
447  long long y;
448  void* z;
449  } max_align_t;
450 }
451 
452 // Default traits for the ConcurrentQueue. To change some of the
453 // traits without re-implementing all of them, inherit from this
454 // struct and shadow the declarations you wish to be different;
455 // since the traits are used as a template type parameter, the
456 // shadowed declarations will be used where defined, and the defaults
457 // otherwise.
458 struct ConcurrentQueueDefaultTraits {
459  // General-purpose size type. std::size_t is strongly recommended.
460  typedef std::size_t size_t;
461 
462  // The type used for the enqueue and dequeue indices. Must be at least as
463  // large as size_t. Should be significantly larger than the number of elements
464  // you expect to hold at once, especially if you have a high turnover rate;
465  // for example, on 32-bit x86, if you expect to have over a hundred million
466  // elements or pump several million elements through your queue in a very
467  // short space of time, using a 32-bit type *may* trigger a race condition.
468  // A 64-bit int type is recommended in that case, and in practice will
469  // prevent a race condition no matter the usage of the queue. Note that
470  // whether the queue is lock-free with a 64-bit type depends on whether
471  // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
472  typedef std::size_t index_t;
473 
474  // Internally, all elements are enqueued and dequeued from multi-element
475  // blocks; this is the smallest controllable unit. If you expect few elements
476  // but many producers, a smaller block size should be favoured. For few producers
477  // and/or many elements, a larger block size is preferred. A sane default
478  // is provided. Must be a power of 2.
479  static const size_t BLOCK_SIZE = 32;
480 
481  // For explicit producers (i.e. when using a producer token), the block is
482  // checked for being empty by iterating through a list of flags, one per element.
483  // For large block sizes, this is too inefficient, and switching to an atomic
484  // counter-based approach is faster. The switch is made for block sizes strictly
485  // larger than this threshold.
486  static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
487 
488  // How many full blocks can be expected for a single explicit producer? This should
489  // reflect that number's maximum for optimal performance. Must be a power of 2.
490  static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
491 
492  // How many full blocks can be expected for a single implicit producer? This should
493  // reflect that number's maximum for optimal performance. Must be a power of 2.
494  static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
495 
496  // The initial size of the hash table mapping thread IDs to implicit producers.
497  // Note that the hash is resized every time it becomes half full.
498  // Must be a power of two, and either 0 or at least 1. If 0, implicit production
499  // (using the enqueue methods without an explicit producer token) is disabled.
500  static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
501 
502  // Controls the number of items that an explicit consumer (i.e. one with a token)
503  // must consume before it causes all consumers to rotate and move on to the next
504  // internal queue.
505  static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
506 
507  // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
508  // Enqueue operations that would cause this limit to be surpassed will fail. Note
509  // that this limit is enforced at the block level (for performance reasons), i.e.
510  // it's rounded up to the nearest block size.
511  static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
512 
513  // The number of times to spin before sleeping when waiting on a semaphore.
514  // Recommended values are on the order of 1000-10000 unless the number of
515  // consumer threads exceeds the number of idle cores (in which case try 0-100).
516  // Only affects instances of the BlockingConcurrentQueue.
517  static const int MAX_SEMA_SPINS = 10000;
518 
519  // Whether to recycle dynamically-allocated blocks into an internal free list or
520  // not. If false, only pre-allocated blocks (controlled by the constructor
521  // arguments) will be recycled, and all others will be `free`d back to the heap.
522  // Note that blocks consumed by explicit producers are only freed on destruction
523  // of the queue (not following destruction of the token) regardless of this trait.
524  static const bool RECYCLE_ALLOCATED_BLOCKS = false;
525 
526 #ifndef MCDBGQ_USE_RELACY
527  // Memory allocation can be customized if needed.
528  // malloc should return nullptr on failure, and handle alignment like std::malloc.
529 #if defined(malloc) || defined(free)
530  // Gah, this is 2015, stop defining macros that break standard code already!
531  // Work around malloc/free being special macros:
532  static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
533  static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
534  static inline void*(malloc)(size_t size) { return WORKAROUND_malloc(size); }
535  static inline void(free)(void* ptr) { return WORKAROUND_free(ptr); }
536 #else
537  static inline void* malloc(size_t size)
538  {
539  return std::malloc(size);
540  }
541  static inline void free(void* ptr) { return std::free(ptr); }
542 #endif
543 #else
544  // Debug versions when running under the Relacy race detector (ignore
545  // these in user code)
546  static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
547  static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
548 #endif
549 };
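
// Illustrative sketch (not part of the original moodycamel code): to customize the queue,
// inherit from ConcurrentQueueDefaultTraits and shadow only the members you want to change,
// as described above. The values are arbitrary examples, and the genesis::utils namespace
// is assumed to be in scope.
//
//     struct MyQueueTraits : public ConcurrentQueueDefaultTraits
//     {
//         // All other traits keep their defaults; only the block size is shadowed here.
//         static const size_t BLOCK_SIZE = 256;
//     };
//
//     ConcurrentQueue<int, MyQueueTraits> queue;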
550 
551 // When producing or consuming many elements, the most efficient way is to:
552 // 1) Use one of the bulk-operation methods of the queue with a token
553 // 2) Failing that, use the bulk-operation methods without a token
554 // 3) Failing that, create a token and use that with the single-item methods
555 // 4) Failing that, use the single-parameter methods of the queue
556 // Having said that, don't create tokens willy-nilly -- ideally there should be
557 // a maximum of one token per thread (of each kind).
558 struct ProducerToken;
559 struct ConsumerToken;
560 
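// Illustrative sketch (not part of the original moodycamel code) of the recommended pattern
// above: one token per thread, combined with the bulk operations. Counts and element types
// are arbitrary examples, and the genesis::utils namespace is assumed to be in scope.
//
//     ConcurrentQueue<int> q;
//
//     // In a producer thread: reuse a single ProducerToken for all enqueues of that thread.
//     ProducerToken ptok(q);
//     int in[8] = { 0, 1, 2, 3, 4, 5, 6, 7 };
//     q.enqueue_bulk(ptok, in, 8);
//
//     // In a consumer thread: reuse a single ConsumerToken for all dequeues of that thread.
//     ConsumerToken ctok(q);
//     int out[8];
//     size_t got = q.try_dequeue_bulk(ctok, out, 8);
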
561 template <typename T, typename Traits>
562 class ConcurrentQueue;
563 template <typename T, typename Traits>
564 class BlockingConcurrentQueue;
565 class ConcurrentQueueTests;
566 
567 namespace details {
568  struct ConcurrentQueueProducerTypelessBase {
569  ConcurrentQueueProducerTypelessBase* next;
570  std::atomic<bool> inactive;
571  ProducerToken* token;
572 
573  ConcurrentQueueProducerTypelessBase()
574  : next(nullptr)
575  , inactive(false)
576  , token(nullptr)
577  {
578  }
579  };
580 
581  template <bool use32>
582  struct _hash_32_or_64 {
583  static inline std::uint32_t hash(std::uint32_t h)
584  {
585  // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
586  // Since the thread ID is already unique, all we really want to do is propagate that
587  // uniqueness evenly across all the bits, so that we can use a subset of the bits while
588  // reducing collisions significantly
589  h ^= h >> 16;
590  h *= 0x85ebca6b;
591  h ^= h >> 13;
592  h *= 0xc2b2ae35;
593  return h ^ (h >> 16);
594  }
595  };
596  template <>
597  struct _hash_32_or_64<1> {
598  static inline std::uint64_t hash(std::uint64_t h)
599  {
600  h ^= h >> 33;
601  h *= 0xff51afd7ed558ccd;
602  h ^= h >> 33;
603  h *= 0xc4ceb9fe1a85ec53;
604  return h ^ (h >> 33);
605  }
606  };
607  template <std::size_t size>
608  struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> {
609  };
610 
611  static inline size_t hash_thread_id(thread_id_t id)
612  {
613  static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
614  return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
615  thread_id_converter<thread_id_t>::prehash(id)));
616  }
617 
618  template <typename T>
619  static inline bool circular_less_than(T a, T b)
620  {
621  static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
622  return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << (static_cast<T>(sizeof(T) * CHAR_BIT - 1)));
623  // Note: extra parens around rhs of operator<< is MSVC bug: https://developercommunity2.visualstudio.com/t/C4554-triggers-when-both-lhs-and-rhs-is/10034931
624  // silencing the bug requires #pragma warning(disable: 4554) around the calling code and has no effect when done here.
625  }
626 
627  template <typename U>
628  static inline char* align_for(char* ptr)
629  {
630  const std::size_t alignment = std::alignment_of<U>::value;
631  return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
632  }
633 
634  template <typename T>
635  static inline T ceil_to_pow_2(T x)
636  {
637  static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
638 
639  // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
640  --x;
641  x |= x >> 1;
642  x |= x >> 2;
643  x |= x >> 4;
644  for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
645  x |= x >> (i << 3);
646  }
647  ++x;
648  return x;
649  }
650 
651  template <typename T>
652  static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
653  {
654  T temp = std::move(left.load(std::memory_order_relaxed));
655  left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
656  right.store(std::move(temp), std::memory_order_relaxed);
657  }
658 
659  template <typename T>
660  static inline T const& nomove(T const& x)
661  {
662  return x;
663  }
664 
665  template <bool Enable>
666  struct nomove_if {
667  template <typename T>
668  static inline T const& eval(T const& x)
669  {
670  return x;
671  }
672  };
673 
674  template <>
675  struct nomove_if<false> {
676  template <typename U>
677  static inline auto eval(U&& x)
678  -> decltype(std::forward<U>(x))
679  {
680  return std::forward<U>(x);
681  }
682  };
683 
684  template <typename It>
685  static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
686  {
687  return *it;
688  }
689 
690 #if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
691  template <typename T>
692  struct is_trivially_destructible : std::is_trivially_destructible<T> {
693  };
694 #else
695  template <typename T>
696  struct is_trivially_destructible : std::has_trivial_destructor<T> {
697  };
698 #endif
699 
700 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
701 #ifdef MCDBGQ_USE_RELACY
702  typedef RelacyThreadExitListener ThreadExitListener;
703  typedef RelacyThreadExitNotifier ThreadExitNotifier;
704 #else
705  class ThreadExitNotifier;
706 
707  struct ThreadExitListener {
708  typedef void (*callback_t)(void*);
709  callback_t callback;
710  void* userData;
711 
712  ThreadExitListener* next; // reserved for use by the ThreadExitNotifier
713  ThreadExitNotifier* chain; // reserved for use by the ThreadExitNotifier
714  };
715 
716  class ThreadExitNotifier {
717  public:
718  static void subscribe(ThreadExitListener* listener)
719  {
720  auto& tlsInst = instance();
721  std::lock_guard<std::mutex> guard(mutex());
722  listener->next = tlsInst.tail;
723  listener->chain = &tlsInst;
724  tlsInst.tail = listener;
725  }
726 
727  static void unsubscribe(ThreadExitListener* listener)
728  {
729  std::lock_guard<std::mutex> guard(mutex());
730  if (!listener->chain) {
731  return; // race with ~ThreadExitNotifier
732  }
733  auto& tlsInst = *listener->chain;
734  listener->chain = nullptr;
735  ThreadExitListener** prev = &tlsInst.tail;
736  for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
737  if (ptr == listener) {
738  *prev = ptr->next;
739  break;
740  }
741  prev = &ptr->next;
742  }
743  }
744 
745  private:
746  ThreadExitNotifier()
747  : tail(nullptr)
748  {
749  }
750  ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
751  ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
752 
753  ~ThreadExitNotifier()
754  {
755  // This thread is about to exit, let everyone know!
756  assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
757  std::lock_guard<std::mutex> guard(mutex());
758  for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
759  ptr->chain = nullptr;
760  ptr->callback(ptr->userData);
761  }
762  }
763 
764  // Thread-local
765  static inline ThreadExitNotifier& instance()
766  {
767  static thread_local ThreadExitNotifier notifier;
768  return notifier;
769  }
770 
771  static inline std::mutex& mutex()
772  {
773  // Must be static because the ThreadExitNotifier could be destroyed while unsubscribe is called
774  static std::mutex mutex;
775  return mutex;
776  }
777 
778  private:
779  ThreadExitListener* tail;
780  };
781 #endif
782 #endif
783 
784  template <typename T>
785  struct static_is_lock_free_num {
786  enum { value = 0 };
787  };
788  template <>
789  struct static_is_lock_free_num<signed char> {
790  enum { value = ATOMIC_CHAR_LOCK_FREE };
791  };
792  template <>
793  struct static_is_lock_free_num<short> {
794  enum { value = ATOMIC_SHORT_LOCK_FREE };
795  };
796  template <>
797  struct static_is_lock_free_num<int> {
798  enum { value = ATOMIC_INT_LOCK_FREE };
799  };
800  template <>
801  struct static_is_lock_free_num<long> {
802  enum { value = ATOMIC_LONG_LOCK_FREE };
803  };
804  template <>
805  struct static_is_lock_free_num<long long> {
806  enum { value = ATOMIC_LLONG_LOCK_FREE };
807  };
808  template <typename T>
809  struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> {
810  };
811  template <>
812  struct static_is_lock_free<bool> {
813  enum { value = ATOMIC_BOOL_LOCK_FREE };
814  };
815  template <typename U>
816  struct static_is_lock_free<U*> {
817  enum { value = ATOMIC_POINTER_LOCK_FREE };
818  };
819 }
820 
821 struct ProducerToken {
822  template <typename T, typename Traits>
823  explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
824 
825  template <typename T, typename Traits>
826  explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
827 
828  ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
829  : producer(other.producer)
830  {
831  other.producer = nullptr;
832  if (producer != nullptr) {
833  producer->token = this;
834  }
835  }
836 
837  inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
838  {
839  swap(other);
840  return *this;
841  }
842 
843  void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
844  {
845  std::swap(producer, other.producer);
846  if (producer != nullptr) {
847  producer->token = this;
848  }
849  if (other.producer != nullptr) {
850  other.producer->token = &other;
851  }
852  }
853 
854  // A token is always valid unless:
855  // 1) Memory allocation failed during construction
856  // 2) It was moved via the move constructor
857  // (Note: assignment does a swap, leaving both potentially valid)
858  // 3) The associated queue was destroyed
859  // Note that if valid() returns true, that only indicates
860  // that the token is valid for use with a specific queue,
861  // but not which one; that's up to the user to track.
862  inline bool valid() const { return producer != nullptr; }
863 
864  ~ProducerToken()
865  {
866  if (producer != nullptr) {
867  producer->token = nullptr;
868  producer->inactive.store(true, std::memory_order_release);
869  }
870  }
871 
872  // Disable copying and assignment
873  ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
874  ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
875 
876 private:
877  template <typename T, typename Traits>
878  friend class ConcurrentQueue;
879  friend class ConcurrentQueueTests;
880 
881 protected:
882  details::ConcurrentQueueProducerTypelessBase* producer;
883 };
884 
885 struct ConsumerToken {
886  template <typename T, typename Traits>
887  explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
888 
889  template <typename T, typename Traits>
890  explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
891 
892  ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
893  : initialOffset(other.initialOffset),
894  lastKnownGlobalOffset(other.lastKnownGlobalOffset),
895  itemsConsumedFromCurrent(other.itemsConsumedFromCurrent),
896  currentProducer(other.currentProducer),
897  desiredProducer(other.desiredProducer)
898  {
899  }
900 
901  inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
902  {
903  swap(other);
904  return *this;
905  }
906 
907  void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
908  {
909  std::swap(initialOffset, other.initialOffset);
910  std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
911  std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
912  std::swap(currentProducer, other.currentProducer);
913  std::swap(desiredProducer, other.desiredProducer);
914  }
915 
916  // Disable copying and assignment
917  ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
918  ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
919 
920 private:
921  template <typename T, typename Traits>
922  friend class ConcurrentQueue;
923  friend class ConcurrentQueueTests;
924 
925 private: // but shared with ConcurrentQueue
926  std::uint32_t initialOffset;
927  std::uint32_t lastKnownGlobalOffset;
928  std::uint32_t itemsConsumedFromCurrent;
929  details::ConcurrentQueueProducerTypelessBase* currentProducer;
930  details::ConcurrentQueueProducerTypelessBase* desiredProducer;
931 };
932 
933 // Need to forward-declare this swap because it's in a namespace.
934 // See http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
935 template <typename T, typename Traits>
936 inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;
937 
938 template <typename T, typename Traits = ConcurrentQueueDefaultTraits>
939 class ConcurrentQueue {
940 public:
941  typedef ::genesis::utils::ProducerToken producer_token_t;
942  typedef ::genesis::utils::ConsumerToken consumer_token_t;
943 
944  typedef typename Traits::index_t index_t;
945  typedef typename Traits::size_t size_t;
946 
947  static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
948  static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
949  static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
950  static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
951  static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
952  static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
953 #ifdef _MSC_VER
954 #pragma warning(push)
955 #pragma warning(disable : 4307) // + integral constant overflow (that's what the ternary expression is for!)
956 #pragma warning(disable : 4309) // static_cast: Truncation of constant value
957 #endif
958  static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
959 #ifdef _MSC_VER
960 #pragma warning(pop)
961 #endif
962 
963  static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
964  static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
965  static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
966  static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
967  static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
968  static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
969  static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
970  static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) || !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)), "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
971  static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1, "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
972 
973 public:
974  // Creates a queue with at least `capacity` element slots; note that the
975  // actual number of elements that can be inserted without additional memory
976  // allocation depends on the number of producers and the block size (e.g. if
977  // the block size is equal to `capacity`, only a single block will be allocated
978  // up-front, which means only a single producer will be able to enqueue elements
979  // without an extra allocation -- blocks aren't shared between producers).
980  // This method is not thread safe -- it is up to the user to ensure that the
981  // queue is fully constructed before it starts being used by other threads (this
982  // includes making the memory effects of construction visible, possibly with a
983  // memory barrier).
984  explicit ConcurrentQueue(size_t capacity = 32 * BLOCK_SIZE)
985  : producerListTail(nullptr)
986  , producerCount(0)
987  , initialBlockPoolIndex(0)
988  , nextExplicitConsumerId(0)
989  , globalExplicitConsumerOffset(0)
990  {
991  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
992  populate_initial_implicit_producer_hash();
993  populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
994 
995 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
996  // Track all the producers using a fully-resolved typed list for
997  // each kind; this makes it possible to debug them starting from
998  // the root queue object (otherwise wacky casts are needed that
999  // don't compile in the debugger's expression evaluator).
1000  explicitProducers.store(nullptr, std::memory_order_relaxed);
1001  implicitProducers.store(nullptr, std::memory_order_relaxed);
1002 #endif
1003  }
1004 
1005  // Computes the correct amount of pre-allocated blocks for you based
1006  // on the minimum number of elements you want available at any given
1007  // time, and the maximum concurrent number of each type of producer.
1008  ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
1009  : producerListTail(nullptr)
1010  , producerCount(0)
1011  , initialBlockPoolIndex(0)
1012  , nextExplicitConsumerId(0)
1013  , globalExplicitConsumerOffset(0)
1014  {
1015  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
1016  populate_initial_implicit_producer_hash();
1017  size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);
1018  populate_initial_block_list(blocks);
1019 
1020 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
1021  explicitProducers.store(nullptr, std::memory_order_relaxed);
1022  implicitProducers.store(nullptr, std::memory_order_relaxed);
1023 #endif
1024  }
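
// Illustrative sketch (not part of the original moodycamel code): pre-allocating blocks via
// the constructor above for a known setup, e.g. room for at least 1024 elements with up to
// 4 explicit (token-holding) producers and 8 implicit producers. The numbers are arbitrary
// examples, and the genesis::utils namespace is assumed to be in scope.
//
//     ConcurrentQueue<int> q(1024, 4, 8);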
1025 
1026  // Note: The queue should not be accessed concurrently while it's
1027  // being deleted. It's up to the user to synchronize this.
1028  // This method is not thread safe.
1029  ~ConcurrentQueue()
1030  {
1031  // Destroy producers
1032  auto ptr = producerListTail.load(std::memory_order_relaxed);
1033  while (ptr != nullptr) {
1034  auto next = ptr->next_prod();
1035  if (ptr->token != nullptr) {
1036  ptr->token->producer = nullptr;
1037  }
1038  destroy(ptr);
1039  ptr = next;
1040  }
1041 
1042  // Destroy implicit producer hash tables
1043  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0)
1044  {
1045  auto hash = implicitProducerHash.load(std::memory_order_relaxed);
1046  while (hash != nullptr) {
1047  auto prev = hash->prev;
1048  if (prev != nullptr) { // The last hash is part of this object and was not allocated dynamically
1049  for (size_t i = 0; i != hash->capacity; ++i) {
1050  hash->entries[i].~ImplicitProducerKVP();
1051  }
1052  hash->~ImplicitProducerHash();
1053  (Traits::free)(hash);
1054  }
1055  hash = prev;
1056  }
1057  }
1058 
1059  // Destroy global free list
1060  auto block = freeList.head_unsafe();
1061  while (block != nullptr) {
1062  auto next = block->freeListNext.load(std::memory_order_relaxed);
1063  if (block->dynamicallyAllocated) {
1064  destroy(block);
1065  }
1066  block = next;
1067  }
1068 
1069  // Destroy initial free list
1070  destroy_array(initialBlockPool, initialBlockPoolSize);
1071  }
1072 
1073  // Disable copying and copy assignment
1074  ConcurrentQueue(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
1075  ConcurrentQueue& operator=(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
1076 
1077  // Moving is supported, but note that it is *not* a thread-safe operation.
1078  // Nobody can use the queue while it's being moved, and the memory effects
1079  // of that move must be propagated to other threads before they can use it.
1080  // Note: When a queue is moved, its tokens are still valid but can only be
1081  // used with the destination queue (i.e. semantically they are moved along
1082  // with the queue itself).
1083  ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
1084  : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
1085  producerCount(other.producerCount.load(std::memory_order_relaxed)),
1086  initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
1087  initialBlockPool(other.initialBlockPool),
1088  initialBlockPoolSize(other.initialBlockPoolSize),
1089  freeList(std::move(other.freeList)),
1090  nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
1091  globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
1092  {
1093  // Move the other one into this, and leave the other one as an empty queue
1094  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
1095  populate_initial_implicit_producer_hash();
1096  swap_implicit_producer_hashes(other);
1097 
1098  other.producerListTail.store(nullptr, std::memory_order_relaxed);
1099  other.producerCount.store(0, std::memory_order_relaxed);
1100  other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
1101  other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
1102 
1103 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
1104  explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
1105  other.explicitProducers.store(nullptr, std::memory_order_relaxed);
1106  implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
1107  other.implicitProducers.store(nullptr, std::memory_order_relaxed);
1108 #endif
1109 
1110  other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
1111  other.initialBlockPoolSize = 0;
1112  other.initialBlockPool = nullptr;
1113 
1114  reown_producers();
1115  }
1116 
1117  inline ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
1118  {
1119  return swap_internal(other);
1120  }
1121 
1122  // Swaps this queue's state with the other's. Not thread-safe.
1123  // Swapping two queues does not invalidate their tokens, however
1124  // the tokens that were created for one queue must be used with
1125  // only the swapped queue (i.e. the tokens are tied to the
1126  // queue's movable state, not the object itself).
1127  inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
1128  {
1129  swap_internal(other);
1130  }
1131 
1132 private:
1133  ConcurrentQueue& swap_internal(ConcurrentQueue& other)
1134  {
1135  if (this == &other) {
1136  return *this;
1137  }
1138 
1139  details::swap_relaxed(producerListTail, other.producerListTail);
1140  details::swap_relaxed(producerCount, other.producerCount);
1141  details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
1142  std::swap(initialBlockPool, other.initialBlockPool);
1143  std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
1144  freeList.swap(other.freeList);
1145  details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
1146  details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
1147 
1148  swap_implicit_producer_hashes(other);
1149 
1150  reown_producers();
1151  other.reown_producers();
1152 
1153 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
1154  details::swap_relaxed(explicitProducers, other.explicitProducers);
1155  details::swap_relaxed(implicitProducers, other.implicitProducers);
1156 #endif
1157 
1158  return *this;
1159  }
1160 
1161 public:
1162  // Enqueues a single item (by copying it).
1163  // Allocates memory if required. Only fails if memory allocation fails (or implicit
1164  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
1165  // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1166  // Thread-safe.
1167  inline bool enqueue(T const& item)
1168  {
1169  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
1170  return false;
1171  else return inner_enqueue<CanAlloc>(item);
1172  }
1173 
1174  // Enqueues a single item (by moving it, if possible).
1175  // Allocates memory if required. Only fails if memory allocation fails (or implicit
1176  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
1177  // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1178  // Thread-safe.
1179  inline bool enqueue(T&& item)
1180  {
1181  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
1182  return false;
1183  else return inner_enqueue<CanAlloc>(std::move(item));
1184  }
1185 
1186  // Enqueues a single item (by copying it) using an explicit producer token.
1187  // Allocates memory if required. Only fails if memory allocation fails (or
1188  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1189  // Thread-safe.
1190  inline bool enqueue(producer_token_t const& token, T const& item)
1191  {
1192  return inner_enqueue<CanAlloc>(token, item);
1193  }
1194 
1195  // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1196  // Allocates memory if required. Only fails if memory allocation fails (or
1197  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1198  // Thread-safe.
1199  inline bool enqueue(producer_token_t const& token, T&& item)
1200  {
1201  return inner_enqueue<CanAlloc>(token, std::move(item));
1202  }
1203 
1204  // Enqueues several items.
1205  // Allocates memory if required. Only fails if memory allocation fails (or
1206  // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
1207  // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1208  // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
1209  // Thread-safe.
1210  template <typename It>
1211  bool enqueue_bulk(It itemFirst, size_t count)
1212  {
1213  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
1214  return false;
1215  else return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
1216  }
1217 
1218  // Enqueues several items using an explicit producer token.
1219  // Allocates memory if required. Only fails if memory allocation fails
1220  // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1221  // Note: Use std::make_move_iterator if the elements should be moved
1222  // instead of copied.
1223  // Thread-safe.
1224  template <typename It>
1225  bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1226  {
1227  return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
1228  }
1229 
1230  // Enqueues a single item (by copying it).
1231  // Does not allocate memory. Fails if not enough room to enqueue (or implicit
1232  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
1233  // is 0).
1234  // Thread-safe.
1235  inline bool try_enqueue(T const& item)
1236  {
1237  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
1238  return false;
1239  else return inner_enqueue<CannotAlloc>(item);
1240  }
1241 
1242  // Enqueues a single item (by moving it, if possible).
1243  // Does not allocate memory (except for one-time implicit producer).
1244  // Fails if not enough room to enqueue (or implicit production is
1245  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1246  // Thread-safe.
1247  inline bool try_enqueue(T&& item)
1248  {
1249  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
1250  return false;
1251  else return inner_enqueue<CannotAlloc>(std::move(item));
1252  }
1253 
1254  // Enqueues a single item (by copying it) using an explicit producer token.
1255  // Does not allocate memory. Fails if not enough room to enqueue.
1256  // Thread-safe.
1257  inline bool try_enqueue(producer_token_t const& token, T const& item)
1258  {
1259  return inner_enqueue<CannotAlloc>(token, item);
1260  }
1261 
1262  // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1263  // Does not allocate memory. Fails if not enough room to enqueue.
1264  // Thread-safe.
1265  inline bool try_enqueue(producer_token_t const& token, T&& item)
1266  {
1267  return inner_enqueue<CannotAlloc>(token, std::move(item));
1268  }
1269 
1270  // Enqueues several items.
1271  // Does not allocate memory (except for one-time implicit producer).
1272  // Fails if not enough room to enqueue (or implicit production is
1273  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1274  // Note: Use std::make_move_iterator if the elements should be moved
1275  // instead of copied.
1276  // Thread-safe.
1277  template <typename It>
1278  bool try_enqueue_bulk(It itemFirst, size_t count)
1279  {
1280  MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
1281  return false;
1282  else return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1283  }
1284 
1285  // Enqueues several items using an explicit producer token.
1286  // Does not allocate memory. Fails if not enough room to enqueue.
1287  // Note: Use std::make_move_iterator if the elements should be moved
1288  // instead of copied.
1289  // Thread-safe.
1290  template <typename It>
1291  bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1292  {
1293  return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1294  }
1295 
1296  // Attempts to dequeue from the queue.
1297  // Returns false if all producer streams appeared empty at the time they
1298  // were checked (so, the queue is likely but not guaranteed to be empty).
1299  // Never allocates. Thread-safe.
1300  template <typename U>
1301  bool try_dequeue(U& item)
1302  {
1303  // Instead of simply trying each producer in turn (which could cause needless contention on the first
1304  // producer), we score them heuristically.
1305  size_t nonEmptyCount = 0;
1306  ProducerBase* best = nullptr;
1307  size_t bestSize = 0;
1308  for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
1309  auto size = ptr->size_approx();
1310  if (size > 0) {
1311  if (size > bestSize) {
1312  bestSize = size;
1313  best = ptr;
1314  }
1315  ++nonEmptyCount;
1316  }
1317  }
1318 
1319  // If there was at least one non-empty queue but it appears empty at the time
1320  // we try to dequeue from it, we need to make sure every queue's been tried
1321  if (nonEmptyCount > 0) {
1322  if ((details::likely)(best->dequeue(item))) {
1323  return true;
1324  }
1325  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1326  if (ptr != best && ptr->dequeue(item)) {
1327  return true;
1328  }
1329  }
1330  }
1331  return false;
1332  }
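
// Illustrative sketch (not part of the original moodycamel code) of the token-less single-item
// API documented above: enqueue() may allocate a new block when needed, try_enqueue() never
// allocates and fails when there is no room, and try_dequeue() returns false when the queue
// appears to be empty. The values are arbitrary examples.
//
//     ConcurrentQueue<int> q;
//     q.enqueue(25);                // may allocate; fails only on allocation failure
//     bool ok = q.try_enqueue(26);  // never allocates; false if there is no room left
//     int item;
//     if (q.try_dequeue(item)) {
//         // item now holds the oldest value from this producer (25 in this single-threaded sketch)
//     }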
1333 
1334  // Attempts to dequeue from the queue.
1335  // Returns false if all producer streams appeared empty at the time they
1336  // were checked (so, the queue is likely but not guaranteed to be empty).
1337  // This differs from the try_dequeue(item) method in that this one does
1338  // not attempt to reduce contention by interleaving the order that producer
1339  // streams are dequeued from. So, using this method can reduce overall throughput
1340  // under contention, but will give more predictable results in single-threaded
1341  // consumer scenarios. This is mostly only useful for internal unit tests.
1342  // Never allocates. Thread-safe.
1343  template <typename U>
1344  bool try_dequeue_non_interleaved(U& item)
1345  {
1346  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1347  if (ptr->dequeue(item)) {
1348  return true;
1349  }
1350  }
1351  return false;
1352  }
1353 
1354  // Attempts to dequeue from the queue using an explicit consumer token.
1355  // Returns false if all producer streams appeared empty at the time they
1356  // were checked (so, the queue is likely but not guaranteed to be empty).
1357  // Never allocates. Thread-safe.
1358  template <typename U>
1359  bool try_dequeue(consumer_token_t& token, U& item)
1360  {
1361  // The idea is roughly as follows:
1362  // Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the highest efficiency consumer dictates the rotation speed of everyone else, more or less
1363  // If you see that the global offset has changed, you must reset your consumption counter and move to your designated place
1364  // If there are no items where you're supposed to be, keep moving until you find a producer with some items
1365  // If the global offset has not changed but you've run out of items to consume, move over from your current position until you find a producer with something in it
1366 
1367  if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1368  if (!update_current_producer_after_rotation(token)) {
1369  return false;
1370  }
1371  }
1372 
1373  // If there was at least one non-empty queue but it appears empty at the time
1374  // we try to dequeue from it, we need to make sure every queue's been tried
1375  if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
1376  if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1377  globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1378  }
1379  return true;
1380  }
1381 
1382  auto tail = producerListTail.load(std::memory_order_acquire);
1383  auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1384  if (ptr == nullptr) {
1385  ptr = tail;
1386  }
1387  while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1388  if (ptr->dequeue(item)) {
1389  token.currentProducer = ptr;
1390  token.itemsConsumedFromCurrent = 1;
1391  return true;
1392  }
1393  ptr = ptr->next_prod();
1394  if (ptr == nullptr) {
1395  ptr = tail;
1396  }
1397  }
1398  return false;
1399  }
1400 
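 // Rough usage sketch for the token-based overload above, assuming ConsumerToken
 // (see the friend declarations further below) is constructible from the queue as in
 // the upstream moodycamel API:
 //
 //     ConcurrentQueue<int> queue;
 //     ConsumerToken ctok( queue );
 //     int value;
 //     while( queue.try_dequeue( ctok, value )) {
 //         // process value
 //     }
 //
 // The token remembers which producer it last drew from; after
 // EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE items it bumps the global offset,
 // so that all token-holding consumers rotate to a different producer.
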
1401  // Attempts to dequeue several elements from the queue.
1402  // Returns the number of items actually dequeued.
1403  // Returns 0 if all producer streams appeared empty at the time they
1404  // were checked (so, the queue is likely but not guaranteed to be empty).
1405  // Never allocates. Thread-safe.
1406  template <typename It>
1407  size_t try_dequeue_bulk(It itemFirst, size_t max)
1408  {
1409  size_t count = 0;
1410  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1411  count += ptr->dequeue_bulk(itemFirst, max - count);
1412  if (count == max) {
1413  break;
1414  }
1415  }
1416  return count;
1417  }
1418 
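 // Illustrative bulk counterpart of the single-item sketch above; try_dequeue_bulk
 // fills at most `max` items and returns how many were actually obtained:
 //
 //     std::vector<int> buffer( 64 );
 //     size_t got = queue.try_dequeue_bulk( buffer.begin(), 64 );
 //     // buffer[0] .. buffer[got-1] now hold dequeued items (got may be 0)
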
1419  // Attempts to dequeue several elements from the queue using an explicit consumer token.
1420  // Returns the number of items actually dequeued.
1421  // Returns 0 if all producer streams appeared empty at the time they
1422  // were checked (so, the queue is likely but not guaranteed to be empty).
1423  // Never allocates. Thread-safe.
1424  template <typename It>
1425  size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
1426  {
1427  if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1428  if (!update_current_producer_after_rotation(token)) {
1429  return 0;
1430  }
1431  }
1432 
1433  size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1434  if (count == max) {
1435  if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1436  globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1437  }
1438  return max;
1439  }
1440  token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
1441  max -= count;
1442 
1443  auto tail = producerListTail.load(std::memory_order_acquire);
1444  auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1445  if (ptr == nullptr) {
1446  ptr = tail;
1447  }
1448  while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1449  auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1450  count += dequeued;
1451  if (dequeued != 0) {
1452  token.currentProducer = ptr;
1453  token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
1454  }
1455  if (dequeued == max) {
1456  break;
1457  }
1458  max -= dequeued;
1459  ptr = ptr->next_prod();
1460  if (ptr == nullptr) {
1461  ptr = tail;
1462  }
1463  }
1464  return count;
1465  }
1466 
1467  // Attempts to dequeue from a specific producer's inner queue.
1468  // If you happen to know which producer you want to dequeue from, this
1469  // is significantly faster than using the general-case try_dequeue methods.
1470  // Returns false if the producer's queue appeared empty at the time it
1471  // was checked (so, the queue is likely but not guaranteed to be empty).
1472  // Never allocates. Thread-safe.
1473  template <typename U>
1474  inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
1475  {
1476  return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
1477  }
1478 
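 // Illustrative sketch, assuming ProducerToken is constructible from the queue and that
 // the corresponding token-based enqueue overload exists as in the upstream API:
 //
 //     ConcurrentQueue<int> queue;
 //     ProducerToken ptok( queue );
 //     queue.enqueue( ptok, 1 );
 //     int value;
 //     bool got = queue.try_dequeue_from_producer( ptok, value );
 //
 // This skips the producer-selection logic of the general try_dequeue() overloads,
 // which is why it is faster when the producer is known.
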
1479  // Attempts to dequeue several elements from a specific producer's inner queue.
1480  // Returns the number of items actually dequeued.
1481  // If you happen to know which producer you want to dequeue from, this
1482  // is significantly faster than using the general-case try_dequeue methods.
1483  // Returns 0 if the producer's queue appeared empty at the time it
1484  // was checked (so, the queue is likely but not guaranteed to be empty).
1485  // Never allocates. Thread-safe.
1486  template <typename It>
1487  inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
1488  {
1489  return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
1490  }
1491 
1492  // Returns an estimate of the total number of elements currently in the queue. This
1493  // estimate is only accurate if the queue has completely stabilized before it is called
1494  // (i.e. all enqueue and dequeue operations have completed and their memory effects are
1495  // visible on the calling thread, and no further operations start while this method is
1496  // being called).
1497  // Thread-safe.
1498  size_t size_approx() const
1499  {
1500  size_t size = 0;
1501  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1502  size += ptr->size_approx();
1503  }
1504  return size;
1505  }
1506 
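 // Note that size_approx() merely sums the per-producer estimates, so under concurrent
 // enqueues and dequeues it is a hint only. In particular, this is not a reliable
 // emptiness check:
 //
 //     if( queue.size_approx() == 0 ) {
 //         // another thread may still successfully dequeue an item here
 //     }
 //
 // Use the return value of try_dequeue() itself to decide whether an item was obtained.
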
1507  // Returns true if the underlying atomic variables used by
1508  // the queue are lock-free (they should be on most platforms).
1509  // Thread-safe.
1510  static constexpr bool is_lock_free()
1511  {
1512  return details::static_is_lock_free<bool>::value == 2 && details::static_is_lock_free<size_t>::value == 2 && details::static_is_lock_free<std::uint32_t>::value == 2 && details::static_is_lock_free<index_t>::value == 2 && details::static_is_lock_free<void*>::value == 2 && details::static_is_lock_free<typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value == 2;
1513  }
1514 
1515 private:
1516  friend struct ProducerToken;
1517  friend struct ConsumerToken;
1518  struct ExplicitProducer;
1519  friend struct ExplicitProducer;
1520  struct ImplicitProducer;
1521  friend struct ImplicitProducer;
1522  friend class ConcurrentQueueTests;
1523 
1524  enum AllocationMode { CanAlloc,
1525  CannotAlloc };
1526 
1528  // Queue methods
1530 
1531  template <AllocationMode canAlloc, typename U>
1532  inline bool inner_enqueue(producer_token_t const& token, U&& element)
1533  {
1534  return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1535  }
1536 
1537  template <AllocationMode canAlloc, typename U>
1538  inline bool inner_enqueue(U&& element)
1539  {
1540  auto producer = get_or_add_implicit_producer();
1541  return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1542  }
1543 
1544  template <AllocationMode canAlloc, typename It>
1545  inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1546  {
1547  return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1548  }
1549 
1550  template <AllocationMode canAlloc, typename It>
1551  inline bool inner_enqueue_bulk(It itemFirst, size_t count)
1552  {
1553  auto producer = get_or_add_implicit_producer();
1554  return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1555  }
1556 
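 // The token-less inner_enqueue* overloads above route every call through
 // get_or_add_implicit_producer(), i.e. each enqueueing thread lazily gets its own
 // implicit producer, whereas the token overloads use the explicit producer stored in
 // the token. All of them return false instead of blocking when allocation is needed
 // but disallowed (CannotAlloc) or fails.
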
1557  inline bool update_current_producer_after_rotation(consumer_token_t& token)
1558  {
1559  // Ah, there's been a rotation, figure out where we should be!
1560  auto tail = producerListTail.load(std::memory_order_acquire);
1561  if (token.desiredProducer == nullptr && tail == nullptr) {
1562  return false;
1563  }
1564  auto prodCount = producerCount.load(std::memory_order_relaxed);
1565  auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1566  if ((details::unlikely)(token.desiredProducer == nullptr)) {
1567  // Aha, first time we're dequeueing anything.
1568  // Figure out our local position
1569  // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
1570  std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1571  token.desiredProducer = tail;
1572  for (std::uint32_t i = 0; i != offset; ++i) {
1573  token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1574  if (token.desiredProducer == nullptr) {
1575  token.desiredProducer = tail;
1576  }
1577  }
1578  }
1579 
1580  std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1581  if (delta >= prodCount) {
1582  delta = delta % prodCount;
1583  }
1584  for (std::uint32_t i = 0; i != delta; ++i) {
1585  token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1586  if (token.desiredProducer == nullptr) {
1587  token.desiredProducer = tail;
1588  }
1589  }
1590 
1591  token.lastKnownGlobalOffset = globalOffset;
1592  token.currentProducer = token.desiredProducer;
1593  token.itemsConsumedFromCurrent = 0;
1594  return true;
1595  }
1596 
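 // Worked example of the rotation arithmetic above, with illustrative numbers: for
 // prodCount == 4 and token.initialOffset == 6, the starting position is
 // offset = 4 - 1 - (6 % 4) = 1, i.e. one hop away from the list tail. A later global
 // offset increase of delta == 5 is first reduced to 5 % 4 == 1 extra hop, and the walk
 // wraps back to the tail whenever next_prod() returns nullptr.
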
1598  // Free list
1600 
1601  template <typename N>
1602  struct FreeListNode {
1603  FreeListNode()
1604  : freeListRefs(0)
1605  , freeListNext(nullptr)
1606  {
1607  }
1608 
1609  std::atomic<std::uint32_t> freeListRefs;
1610  std::atomic<N*> freeListNext;
1611  };
1612 
1613  // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
1614  // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
1615  // speedy under low contention.
1616  template <typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
1617  struct FreeList {
1618  FreeList()
1619  : freeListHead(nullptr)
1620  {
1621  }
1622  FreeList(FreeList&& other)
1623  : freeListHead(other.freeListHead.load(std::memory_order_relaxed))
1624  {
1625  other.freeListHead.store(nullptr, std::memory_order_relaxed);
1626  }
1627  void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }
1628 
1629  FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
1630  FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
1631 
1632  inline void add(N* node)
1633  {
1634 #ifdef MCDBGQ_NOLOCKFREE_FREELIST
1635  debug::DebugLock lock(mutex);
1636 #endif
1637  // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
1638  // set it using a fetch_add
1639  if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
1640  // Oh look! We were the last ones referencing this node, and we know
1641  // we want to add it to the free list, so let's do it!
1642  add_knowing_refcount_is_zero(node);
1643  }
1644  }
1645 
1646  inline N* try_get()
1647  {
1648 #ifdef MCDBGQ_NOLOCKFREE_FREELIST
1649  debug::DebugLock lock(mutex);
1650 #endif
1651  auto head = freeListHead.load(std::memory_order_acquire);
1652  while (head != nullptr) {
1653  auto prevHead = head;
1654  auto refs = head->freeListRefs.load(std::memory_order_relaxed);
1655  if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
1656  head = freeListHead.load(std::memory_order_acquire);
1657  continue;
1658  }
1659 
1660  // Good, reference count has been incremented (it wasn't at zero), which means we can read the
1661  // next and not worry about it changing between now and the time we do the CAS
1662  auto next = head->freeListNext.load(std::memory_order_relaxed);
1663  if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
1664  // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
1665  // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
1666  assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
1667 
1668  // Decrease refcount twice, once for our ref, and once for the list's ref
1669  head->freeListRefs.fetch_sub(2, std::memory_order_release);
1670  return head;
1671  }
1672 
1673  // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
1674  // Note that we don't need to release any memory effects, but we do need to ensure that the reference
1675  // count decrement happens-after the CAS on the head.
1676  refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1677  if (refs == SHOULD_BE_ON_FREELIST + 1) {
1678  add_knowing_refcount_is_zero(prevHead);
1679  }
1680  }
1681 
1682  return nullptr;
1683  }
1684 
1685  // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
1686  N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
1687 
1688  private:
1689  inline void add_knowing_refcount_is_zero(N* node)
1690  {
1691  // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
1692  // only one copy of this method per node at a time, i.e. the single thread case), then we know
1693  // we can safely change the next pointer of the node; however, once the refcount is back above
1694  // zero, then other threads could increase it (happens under heavy contention, when the refcount
1695  // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
1696  // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
1697  // to add the node to the actual list fails, decrease the refcount and leave the add operation to
1698  // the next thread who puts the refcount back at zero (which could be us, hence the loop).
1699  auto head = freeListHead.load(std::memory_order_relaxed);
1700  while (true) {
1701  node->freeListNext.store(head, std::memory_order_relaxed);
1702  node->freeListRefs.store(1, std::memory_order_release);
1703  if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
1704  // Hmm, the add failed, but we can only try again when the refcount goes back to zero
1705  if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
1706  continue;
1707  }
1708  }
1709  return;
1710  }
1711  }
1712 
1713  private:
1714  // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
1715  std::atomic<N*> freeListHead;
1716 
1717  static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1718  static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
1719 
1720 #ifdef MCDBGQ_NOLOCKFREE_FREELIST
1721  debug::DebugMutex mutex;
1722 #endif
1723  };
1724 
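 // The free list above packs two pieces of state into freeListRefs: the low 31 bits
 // (REFS_MASK) count threads currently holding a reference to the node, and the top bit
 // (SHOULD_BE_ON_FREELIST) records that the node wants to be re-added once that count
 // drops to zero. A minimal usage sketch (Block below provides the required fields):
 //
 //     FreeList<Block> pool;
 //     pool.add( block );              // lock-free push
 //     Block* b = pool.try_get();      // lock-free pop; nullptr if (apparently) empty
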
1726  // Block
1728 
1729  enum InnerQueueContext { implicit_context = 0,
1730  explicit_context = 1 };
1731 
1732  struct Block {
1733  Block()
1734  : next(nullptr)
1735  , elementsCompletelyDequeued(0)
1736  , freeListRefs(0)
1737  , freeListNext(nullptr)
1738  , dynamicallyAllocated(true)
1739  {
1740 #ifdef MCDBGQ_TRACKMEM
1741  owner = nullptr;
1742 #endif
1743  }
1744 
1745  template <InnerQueueContext context>
1746  inline bool is_empty() const
1747  {
1748  MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
1749  {
1750  // Check flags
1751  for (size_t i = 0; i < BLOCK_SIZE; ++i) {
1752  if (!emptyFlags[i].load(std::memory_order_relaxed)) {
1753  return false;
1754  }
1755  }
1756 
1757  // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
1758  std::atomic_thread_fence(std::memory_order_acquire);
1759  return true;
1760  }
1761  else
1762  {
1763  // Check counter
1764  if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
1765  std::atomic_thread_fence(std::memory_order_acquire);
1766  return true;
1767  }
1768  assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
1769  return false;
1770  }
1771  }
1772 
1773  // Returns true if the block is now empty (does not apply in explicit context)
1774  template <InnerQueueContext context>
1775  inline bool set_empty(MOODYCAMEL_MAYBE_UNUSED index_t i)
1776  {
1777  MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
1778  {
1779  // Set flag
1780  assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
1781  emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
1782  return false;
1783  }
1784  else
1785  {
1786  // Increment counter
1787  auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
1788  assert(prevVal < BLOCK_SIZE);
1789  return prevVal == BLOCK_SIZE - 1;
1790  }
1791  }
1792 
1793  // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
1794  // Returns true if the block is now empty (does not apply in explicit context).
1795  template <InnerQueueContext context>
1796  inline bool set_many_empty(MOODYCAMEL_MAYBE_UNUSED index_t i, size_t count)
1797  {
1798  MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
1799  {
1800  // Set flags
1801  std::atomic_thread_fence(std::memory_order_release);
1802  i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
1803  for (size_t j = 0; j != count; ++j) {
1804  assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1805  emptyFlags[i + j].store(true, std::memory_order_relaxed);
1806  }
1807  return false;
1808  }
1809  else
1810  {
1811  // Increment counter
1812  auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
1813  assert(prevVal + count <= BLOCK_SIZE);
1814  return prevVal + count == BLOCK_SIZE;
1815  }
1816  }
1817 
1818  template <InnerQueueContext context>
1819  inline void set_all_empty()
1820  {
1821  MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
1822  {
1823  // Set all flags
1824  for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1825  emptyFlags[i].store(true, std::memory_order_relaxed);
1826  }
1827  }
1828  else
1829  {
1830  // Reset counter
1831  elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
1832  }
1833  }
1834 
1835  template <InnerQueueContext context>
1836  inline void reset_empty()
1837  {
1838  MOODYCAMEL_CONSTEXPR_IF(context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD)
1839  {
1840  // Reset flags
1841  for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1842  emptyFlags[i].store(false, std::memory_order_relaxed);
1843  }
1844  }
1845  else
1846  {
1847  // Reset counter
1848  elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
1849  }
1850  }
1851 
1852  inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
1853  inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
1854 
1855  private:
1856  static_assert(std::alignment_of<T>::value <= sizeof(T), "The queue does not support types with an alignment greater than their size at this time");
1857  MOODYCAMEL_ALIGNED_TYPE_LIKE(char[sizeof(T) * BLOCK_SIZE], T)
1858  elements;
1859 
1860  public:
1861  Block* next;
1862  std::atomic<size_t> elementsCompletelyDequeued;
1863  std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
1864 
1865  public:
1866  std::atomic<std::uint32_t> freeListRefs;
1867  std::atomic<Block*> freeListNext;
1868  bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
1869 
1870 #ifdef MCDBGQ_TRACKMEM
1871  void* owner;
1872 #endif
1873  };
1874  static_assert(std::alignment_of<Block>::value >= std::alignment_of<T>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
1875 
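 // A block tracks emptiness in one of two ways, selected at compile time in the methods
 // above: small blocks in the explicit-producer context
 // (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) keep one flag per slot, which
 // lets the producer detect a fully-consumed block and re-use it in place; all other
 // cases increment the single elementsCompletelyDequeued counter and compare it against
 // BLOCK_SIZE.
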
1876 #ifdef MCDBGQ_TRACKMEM
1877 public:
1878  struct MemStats;
1879 
1880 private:
1881 #endif
1882 
1884  // Producer base
1886 
1887  struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase {
1888  ProducerBase(ConcurrentQueue* parent_, bool isExplicit_)
1889  : tailIndex(0)
1890  , headIndex(0)
1891  , dequeueOptimisticCount(0)
1892  , dequeueOvercommit(0)
1893  , tailBlock(nullptr)
1894  , isExplicit(isExplicit_)
1895  , parent(parent_)
1896  {
1897  }
1898 
1899  virtual ~ProducerBase() {}
1900 
1901  template <typename U>
1902  inline bool dequeue(U& element)
1903  {
1904  if (isExplicit) {
1905  return static_cast<ExplicitProducer*>(this)->dequeue(element);
1906  } else {
1907  return static_cast<ImplicitProducer*>(this)->dequeue(element);
1908  }
1909  }
1910 
1911  template <typename It>
1912  inline size_t dequeue_bulk(It& itemFirst, size_t max)
1913  {
1914  if (isExplicit) {
1915  return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1916  } else {
1917  return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1918  }
1919  }
1920 
1921  inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
1922 
1923  inline size_t size_approx() const
1924  {
1925  auto tail = tailIndex.load(std::memory_order_relaxed);
1926  auto head = headIndex.load(std::memory_order_relaxed);
1927  return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
1928  }
1929 
1930  inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
1931 
1932  protected:
1933  std::atomic<index_t> tailIndex; // Where to enqueue to next
1934  std::atomic<index_t> headIndex; // Where to dequeue from next
1935 
1936  std::atomic<index_t> dequeueOptimisticCount;
1937  std::atomic<index_t> dequeueOvercommit;
1938 
1939  Block* tailBlock;
1940 
1941  public:
1942  bool isExplicit;
1943  ConcurrentQueue* parent;
1944 
1945  protected:
1946 #ifdef MCDBGQ_TRACKMEM
1947  friend struct MemStats;
1948 #endif
1949  };
1950 
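 // Head and tail indices grow monotonically and are mapped into a block via
 // `index & (BLOCK_SIZE - 1)`, so they wrap around the full index_t range rather than
 // around the queue capacity. That is why comparisons use details::circular_less_than
 // instead of a plain `<`: with 32-bit indices, for example, a head of 0xFFFFFFF0 still
 // compares "less than" a tail of 0x00000010, and size_approx() above reports
 // tail - head == 0x20 across the wrap.
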
1952  // Explicit queue
1954 
1955  struct ExplicitProducer : public ProducerBase {
1956  explicit ExplicitProducer(ConcurrentQueue* parent_)
1957  : ProducerBase(parent_, true)
1958  , blockIndex(nullptr)
1959  , pr_blockIndexSlotsUsed(0)
1960  , pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1)
1961  , pr_blockIndexFront(0)
1962  , pr_blockIndexEntries(nullptr)
1963  , pr_blockIndexRaw(nullptr)
1964  {
1965  size_t poolBasedIndexSize = details::ceil_to_pow_2(parent_->initialBlockPoolSize) >> 1;
1966  if (poolBasedIndexSize > pr_blockIndexSize) {
1967  pr_blockIndexSize = poolBasedIndexSize;
1968  }
1969 
1970  new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
1971  }
1972 
1973  ~ExplicitProducer()
1974  {
1975  // Destruct any elements not yet dequeued.
1976  // Since we're in the destructor, we can assume all elements
1977  // are either completely dequeued or completely not (no halfways).
1978  if (this->tailBlock != nullptr) { // Note this means there must be a block index too
1979  // First find the block that's partially dequeued, if any
1980  Block* halfDequeuedBlock = nullptr;
1981  if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
1982  // The head's not on a block boundary, meaning a block somewhere is partially dequeued
1983  // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
1984  size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
1985  while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
1986  i = (i + 1) & (pr_blockIndexSize - 1);
1987  }
1988  assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
1989  halfDequeuedBlock = pr_blockIndexEntries[i].block;
1990  }
1991 
1992  // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
1993  auto block = this->tailBlock;
1994  do {
1995  block = block->next;
1996  if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1997  continue;
1998  }
1999 
2000  size_t i = 0; // Offset into block
2001  if (block == halfDequeuedBlock) {
2002  i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
2003  }
2004 
2005  // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
2006  auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
2007  while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
2008  (*block)[i++]->~T();
2009  }
2010  } while (block != this->tailBlock);
2011  }
2012 
2013  // Destroy all blocks that we own
2014  if (this->tailBlock != nullptr) {
2015  auto block = this->tailBlock;
2016  do {
2017  auto nextBlock = block->next;
2018  this->parent->add_block_to_free_list(block);
2019  block = nextBlock;
2020  } while (block != this->tailBlock);
2021  }
2022 
2023  // Destroy the block indices
2024  auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
2025  while (header != nullptr) {
2026  auto prev = static_cast<BlockIndexHeader*>(header->prev);
2027  header->~BlockIndexHeader();
2028  (Traits::free)(header);
2029  header = prev;
2030  }
2031  }
2032 
2033  template <AllocationMode allocMode, typename U>
2034  inline bool enqueue(U&& element)
2035  {
2036  index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2037  index_t newTailIndex = 1 + currentTailIndex;
2038  if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2039  // We reached the end of a block, start a new one
2040  auto startBlock = this->tailBlock;
2041  auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2042  if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2043  // We can re-use the block ahead of us, it's empty!
2044  this->tailBlock = this->tailBlock->next;
2045  this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2046 
2047  // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
2048  // last block from it first -- except instead of removing then adding, we can just overwrite).
2049  // Note that there must be a valid block index here, since even if allocation failed in the ctor,
2050  // it would have been re-attempted when adding the first block to the queue; since there is such
2051  // a block, a block index must have been successfully allocated.
2052  } else {
2053  // Whatever head value we see here is >= the last value we saw here (relatively),
2054  // and <= its current value. Since we have the most recent tail, the head must be
2055  // <= to it.
2056  auto head = this->headIndex.load(std::memory_order_relaxed);
2057  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2058  if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
2059  || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2060  // We can't enqueue in another block because there's not enough leeway -- the
2061  // tail could surpass the head by the time the block fills up! (Or we'll exceed
2062  // the size limit, if the second part of the condition was true.)
2063  return false;
2064  }
2065  // We're going to need a new block; check that the block index has room
2066  if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
2067  // Hmm, the circular block index is already full -- we'll need
2068  // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
2069  // the initial allocation failed in the constructor.
2070 
2071  MOODYCAMEL_CONSTEXPR_IF(allocMode == CannotAlloc)
2072  {
2073  return false;
2074  }
2075  else if (!new_block_index(pr_blockIndexSlotsUsed))
2076  {
2077  return false;
2078  }
2079  }
2080 
2081  // Insert a new block in the circular linked list
2082  auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2083  if (newBlock == nullptr) {
2084  return false;
2085  }
2086 #ifdef MCDBGQ_TRACKMEM
2087  newBlock->owner = this;
2088 #endif
2089  newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2090  if (this->tailBlock == nullptr) {
2091  newBlock->next = newBlock;
2092  } else {
2093  newBlock->next = this->tailBlock->next;
2094  this->tailBlock->next = newBlock;
2095  }
2096  this->tailBlock = newBlock;
2097  ++pr_blockIndexSlotsUsed;
2098  }
2099 
2100  MOODYCAMEL_CONSTEXPR_IF(!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element))))
2101  {
2102  // The constructor may throw. We want the element not to appear in the queue in
2103  // that case (without corrupting the queue):
2104  MOODYCAMEL_TRY
2105  {
2106  new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2107  }
2108  MOODYCAMEL_CATCH(...)
2109  {
2110  // Revert change to the current block, but leave the new block available
2111  // for next time
2112  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2113  this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
2114  MOODYCAMEL_RETHROW;
2115  }
2116  }
2117  else
2118  {
2119  (void)startBlock;
2120  (void)originalBlockIndexSlotsUsed;
2121  }
2122 
2123  // Add block to block index
2124  auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2125  entry.base = currentTailIndex;
2126  entry.block = this->tailBlock;
2127  blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
2128  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2129 
2130  MOODYCAMEL_CONSTEXPR_IF(!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element))))
2131  {
2132  this->tailIndex.store(newTailIndex, std::memory_order_release);
2133  return true;
2134  }
2135  }
2136 
2137  // Enqueue
2138  new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2139 
2140  this->tailIndex.store(newTailIndex, std::memory_order_release);
2141  return true;
2142  }
2143 
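 // Note that enqueue() above runs only on the single thread owning this explicit
 // producer (via its producer token), which is why the pr_* bookkeeping fields need no
 // synchronization; only tailIndex and blockIndex->front are published with release
 // stores for concurrent consumers.
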
2144  template <typename U>
2145  bool dequeue(U& element)
2146  {
2147  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2148  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2149  if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2150  // Might be something to dequeue, let's give it a try
2151 
2152  // Note that this if is purely for performance purposes in the common case when the queue is
2153  // empty and the values are eventually consistent -- we may enter here spuriously.
2154 
2155  // Note that whatever the values of overcommit and tail are, they are not going to change (unless we
2156  // change them) and must be the same value at this point (inside the if) as when the if condition was
2157  // evaluated.
2158 
2159  // We insert an acquire fence here to synchronize-with the release upon incrementing dequeueOvercommit below.
2160  // This ensures that whatever value we loaded into overcommit, the load of dequeueOptimisticCount in
2161  // the fetch_add below will result in a value at least as recent as that (and therefore at least as large).
2162  // Note that I believe a compiler (signal) fence here would be sufficient due to the nature of fetch_add (all
2163  // read-modify-write operations are guaranteed to work on the latest value in the modification order), but
2164  // unfortunately that can't be shown to be correct using only the C++11 standard.
2165  // See http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
2166  std::atomic_thread_fence(std::memory_order_acquire);
2167 
2168  // Increment optimistic counter, then check if it went over the boundary
2169  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2170 
2171  // Note that since dequeueOvercommit must be <= dequeueOptimisticCount (because dequeueOvercommit is only ever
2172  // incremented after dequeueOptimisticCount -- this is enforced in the `else` block below), and since we now
2173  // have a version of dequeueOptimisticCount that is at least as recent as overcommit (due to the release upon
2174  // incrementing dequeueOvercommit and the acquire above that synchronizes with it), overcommit <= myDequeueCount.
2175  // However, we can't assert this since both dequeueOptimisticCount and dequeueOvercommit may (independently)
2176  // overflow; in such a case, though, the logic still holds since the difference between the two is maintained.
2177 
2178  // Note that we reload tail here in case it changed; it will be the same value as before or greater, since
2179  // this load is sequenced after (happens after) the earlier load above. This is supported by read-read
2180  // coherency (as defined in the standard), explained here: http://en.cppreference.com/w/cpp/atomic/memory_order
2181  tail = this->tailIndex.load(std::memory_order_acquire);
2182  if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2183  // Guaranteed to be at least one element to dequeue!
2184 
2185  // Get the index. Note that since there's guaranteed to be at least one element, this
2186  // will never exceed tail. We need to do an acquire-release fence here since it's possible
2187  // that whatever condition got us to this point was for an earlier enqueued element (that
2188  // we already see the memory effects for), but that by the time we increment somebody else
2189  // has incremented it, and we need to see the memory effects for *that* element, which
2190  // in such a case is necessarily visible on the thread that incremented it in the first
2191  // place with the more current condition (they must have acquired a tail that is at least
2192  // as recent).
2193  auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2194 
2195  // Determine which block the element is in
2196 
2197  auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2198  auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2199 
2200  // We need to be careful here about subtracting and dividing because of index wrap-around.
2201  // When an index wraps, we need to preserve the sign of the offset when dividing it by the
2202  // block size (in order to get a correct signed block count offset in all cases):
2203  auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2204  auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
2205  auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
2206  auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
2207 
2208  // Dequeue
2209  auto& el = *((*block)[index]);
2210  if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
2211  // Make sure the element is still fully dequeued and destroyed even if the assignment
2212  // throws
2213  struct Guard {
2214  Block* block;
2215  index_t index;
2216 
2217  ~Guard()
2218  {
2219  (*block)[index]->~T();
2220  block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
2221  }
2222  } guard = { block, index };
2223 
2224  element = std::move(el); // NOLINT
2225  } else {
2226  element = std::move(el); // NOLINT
2227  el.~T(); // NOLINT
2228  block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
2229  }
2230 
2231  return true;
2232  } else {
2233  // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2234  this->dequeueOvercommit.fetch_add(1, std::memory_order_release); // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
2235  }
2236  }
2237 
2238  return false;
2239  }
2240 
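 // Numeric sketch of the optimistic accounting above, with illustrative values: suppose
 // tail == 10, dequeueOptimisticCount == 10 and dequeueOvercommit == 2, i.e. roughly
 // 10 - 2 == 8 dequeues have succeeded so far. A consumer sees 8 < 10, claims
 // myDequeueCount == 10 via the fetch_add, re-reads tail, and since 10 - 2 == 8 is still
 // less than tail, it advances headIndex. Had the re-read shown nothing left to claim,
 // the claim would be undone by bumping dequeueOvercommit, keeping
 // dequeueOptimisticCount - dequeueOvercommit eventually consistent with headIndex.
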
2241  template <AllocationMode allocMode, typename It>
2242  bool MOODYCAMEL_NO_TSAN enqueue_bulk(It itemFirst, size_t count)
2243  {
2244  // First, we need to make sure we have enough room to enqueue all of the elements;
2245  // this means pre-allocating blocks and putting them in the block index (but only if
2246  // all the allocations succeeded).
2247  index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2248  auto startBlock = this->tailBlock;
2249  auto originalBlockIndexFront = pr_blockIndexFront;
2250  auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2251 
2252  Block* firstAllocatedBlock = nullptr;
2253 
2254  // Figure out how many blocks we'll need to allocate, and do so
2255  size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2256  index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2257  if (blockBaseDiff > 0) {
2258  // Allocate as many blocks as possible from ahead
2259  while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2260  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2261  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2262 
2263  this->tailBlock = this->tailBlock->next;
2264  firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2265 
2266  auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2267  entry.base = currentTailIndex;
2268  entry.block = this->tailBlock;
2269  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2270  }
2271 
2272  // Now allocate as many blocks as necessary from the block pool
2273  while (blockBaseDiff > 0) {
2274  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2275  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2276 
2277  auto head = this->headIndex.load(std::memory_order_relaxed);
2278  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2279  bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2280  if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2281  MOODYCAMEL_CONSTEXPR_IF(allocMode == CannotAlloc)
2282  {
2283  // Failed to allocate, undo changes (but keep injected blocks)
2284  pr_blockIndexFront = originalBlockIndexFront;
2285  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2286  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2287  return false;
2288  }
2289  else if (full || !new_block_index(originalBlockIndexSlotsUsed))
2290  {
2291  // Failed to allocate, undo changes (but keep injected blocks)
2292  pr_blockIndexFront = originalBlockIndexFront;
2293  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2294  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2295  return false;
2296  }
2297 
2298  // pr_blockIndexFront is updated inside new_block_index, so we need to
2299  // update our fallback value too (since we keep the new index even if we
2300  // later fail)
2301  originalBlockIndexFront = originalBlockIndexSlotsUsed;
2302  }
2303 
2304  // Insert a new block in the circular linked list
2305  auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2306  if (newBlock == nullptr) {
2307  pr_blockIndexFront = originalBlockIndexFront;
2308  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2309  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2310  return false;
2311  }
2312 
2313 #ifdef MCDBGQ_TRACKMEM
2314  newBlock->owner = this;
2315 #endif
2316  newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
2317  if (this->tailBlock == nullptr) {
2318  newBlock->next = newBlock;
2319  } else {
2320  newBlock->next = this->tailBlock->next;
2321  this->tailBlock->next = newBlock;
2322  }
2323  this->tailBlock = newBlock;
2324  firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2325 
2326  ++pr_blockIndexSlotsUsed;
2327 
2328  auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2329  entry.base = currentTailIndex;
2330  entry.block = this->tailBlock;
2331  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2332  }
2333 
2334  // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
2335  // publish the new block index front
2336  auto block = firstAllocatedBlock;
2337  while (true) {
2338  block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2339  if (block == this->tailBlock) {
2340  break;
2341  }
2342  block = block->next;
2343  }
2344 
2345  MOODYCAMEL_CONSTEXPR_IF(MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst))))
2346  {
2347  blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2348  }
2349  }
2350 
2351  // Enqueue, one block at a time
2352  index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2353  currentTailIndex = startTailIndex;
2354  auto endBlock = this->tailBlock;
2355  this->tailBlock = startBlock;
2356  assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2357  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2358  this->tailBlock = firstAllocatedBlock;
2359  }
2360  while (true) {
2361  index_t stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2362  if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2363  stopIndex = newTailIndex;
2364  }
2365  MOODYCAMEL_CONSTEXPR_IF(MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst))))
2366  {
2367  while (currentTailIndex != stopIndex) {
2368  new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2369  }
2370  }
2371  else
2372  {
2373  MOODYCAMEL_TRY
2374  {
2375  while (currentTailIndex != stopIndex) {
2376  // Must use copy constructor even if move constructor is available
2377  // because we may have to revert if there's an exception.
2378  // Sorry about the horrible templated next line, but it was the only way
2379  // to disable moving *at compile time*, which is important because a type
2380  // may only define a (noexcept) move constructor, and so calls to the
2381  // cctor will not compile, even if they are in an if branch that will never
2382  // be executed
2383  new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2384  ++currentTailIndex;
2385  ++itemFirst;
2386  }
2387  }
2388  MOODYCAMEL_CATCH(...)
2389  {
2390  // Oh dear, an exception's been thrown -- destroy the elements that
2391  // were enqueued so far and revert the entire bulk operation (we'll keep
2392  // any allocated blocks in our linked list for later, though).
2393  auto constructedStopIndex = currentTailIndex;
2394  auto lastBlockEnqueued = this->tailBlock;
2395 
2396  pr_blockIndexFront = originalBlockIndexFront;
2397  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2398  this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2399 
2400  if (!details::is_trivially_destructible<T>::value) {
2401  auto block = startBlock;
2402  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2403  block = firstAllocatedBlock;
2404  }
2405  currentTailIndex = startTailIndex;
2406  while (true) {
2407  stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2408  if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2409  stopIndex = constructedStopIndex;
2410  }
2411  while (currentTailIndex != stopIndex) {
2412  (*block)[currentTailIndex++]->~T();
2413  }
2414  if (block == lastBlockEnqueued) {
2415  break;
2416  }
2417  block = block->next;
2418  }
2419  }
2420  MOODYCAMEL_RETHROW;
2421  }
2422  }
2423 
2424  if (this->tailBlock == endBlock) {
2425  assert(currentTailIndex == newTailIndex);
2426  break;
2427  }
2428  this->tailBlock = this->tailBlock->next;
2429  }
2430 
2431  MOODYCAMEL_CONSTEXPR_IF(!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst))))
2432  {
2433  if (firstAllocatedBlock != nullptr)
2434  blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2435  }
2436 
2437  this->tailIndex.store(newTailIndex, std::memory_order_release);
2438  return true;
2439  }
2440 
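 // Note on the bulk path above: tailIndex is only stored once all blocks are secured
 // and all elements are constructed (or the operation is fully reverted), so a bulk
 // enqueue is all-or-nothing from the consumers' point of view. An illustrative call,
 // assuming the usual public bulk API of this class:
 //
 //     int items[] = { 1, 2, 3, 4 };
 //     bool ok = queue.enqueue_bulk( items, 4 );
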
2441  template <typename It>
2442  size_t dequeue_bulk(It& itemFirst, size_t max)
2443  {
2444  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2445  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2446  auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2447  if (details::circular_less_than<size_t>(0, desiredCount)) {
2448  desiredCount = desiredCount < max ? desiredCount : max;
2449  std::atomic_thread_fence(std::memory_order_acquire);
2450 
2451  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2452 
2453  tail = this->tailIndex.load(std::memory_order_acquire);
2454  auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2455  if (details::circular_less_than<size_t>(0, actualCount)) {
2456  actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2457  if (actualCount < desiredCount) {
2458  this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2459  }
2460 
2461  // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2462  // will never exceed tail.
2463  auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2464 
2465  // Determine which block the first element is in
2466  auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2467  auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2468 
2469  auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2470  auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
2471  auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
2472  auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2473 
2474  // Iterate the blocks and dequeue
2475  auto index = firstIndex;
2476  do {
2477  auto firstIndexInBlock = index;
2478  index_t endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2479  endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2480  auto block = localBlockIndex->entries[indexIndex].block;
2481  if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2482  while (index != endIndex) {
2483  auto& el = *((*block)[index]);
2484  *itemFirst++ = std::move(el);
2485  el.~T();
2486  ++index;
2487  }
2488  } else {
2489  MOODYCAMEL_TRY
2490  {
2491  while (index != endIndex) {
2492  auto& el = *((*block)[index]);
2493  *itemFirst = std::move(el);
2494  ++itemFirst;
2495  el.~T();
2496  ++index;
2497  }
2498  }
2499  MOODYCAMEL_CATCH(...)
2500  {
2501  // It's too late to revert the dequeue, but we can make sure that all
2502  // the dequeued objects are properly destroyed and the block index
2503  // (and empty count) are properly updated before we propagate the exception
2504  do {
2505  block = localBlockIndex->entries[indexIndex].block;
2506  while (index != endIndex) {
2507  (*block)[index++]->~T();
2508  }
2509  block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2510  indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2511 
2512  firstIndexInBlock = index;
2513  endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2514  endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2515  } while (index != firstIndex + actualCount);
2516 
2516 
2517  MOODYCAMEL_RETHROW;
2518  }
2519  }
2520  block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2521  indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2522  } while (index != firstIndex + actualCount);
2523 
2524  return actualCount;
2525  } else {
2526  // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2527  this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2528  }
2529  }
2530 
2531  return 0;
2532  }
2533 
2534  private:
2535  struct BlockIndexEntry {
2536  index_t base;
2537  Block* block;
2538  };
2539 
2540  struct BlockIndexHeader {
2541  size_t size;
2542  std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
2543  BlockIndexEntry* entries;
2544  void* prev;
2545  };
2546 
2547  bool new_block_index(size_t numberOfFilledSlotsToExpose)
2548  {
2549  auto prevBlockSizeMask = pr_blockIndexSize - 1;
2550 
2551  // Create the new block
2552  pr_blockIndexSize <<= 1;
2553  auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
2554  if (newRawPtr == nullptr) {
2555  pr_blockIndexSize >>= 1; // Reset to allow graceful retry
2556  return false;
2557  }
2558 
2559  auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));
2560 
2561  // Copy in all the old indices, if any
2562  size_t j = 0;
2563  if (pr_blockIndexSlotsUsed != 0) {
2564  auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2565  do {
2566  newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2567  i = (i + 1) & prevBlockSizeMask;
2568  } while (i != pr_blockIndexFront);
2569  }
2570 
2571  // Update everything
2572  auto header = new (newRawPtr) BlockIndexHeader;
2573  header->size = pr_blockIndexSize;
2574  header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2575  header->entries = newBlockIndexEntries;
2576  header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later
2577 
2578  pr_blockIndexFront = j;
2579  pr_blockIndexEntries = newBlockIndexEntries;
2580  pr_blockIndexRaw = newRawPtr;
2581  blockIndex.store(header, std::memory_order_release);
2582 
2583  return true;
2584  }
2585 
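 // new_block_index() doubles pr_blockIndexSize on each call and copies the live entries
 // across, so growing the index has amortized constant cost per block. The superseded
 // allocations stay on the `prev` chain (see header->prev above) because concurrent
 // consumers may still hold a pointer into an older index; they are only freed in the
 // producer's destructor.
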
2586  private:
2587  std::atomic<BlockIndexHeader*> blockIndex;
2588 
2589  // To be used by producer only -- consumer must use the ones referenced by blockIndex
2590  size_t pr_blockIndexSlotsUsed;
2591  size_t pr_blockIndexSize;
2592  size_t pr_blockIndexFront; // Next slot (not current)
2593  BlockIndexEntry* pr_blockIndexEntries;
2594  void* pr_blockIndexRaw;
2595 
2596 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2597  public:
2598  ExplicitProducer* nextExplicitProducer;
2599 
2600  private:
2601 #endif
2602 
2603 #ifdef MCDBGQ_TRACKMEM
2604  friend struct MemStats;
2605 #endif
2606  };
2607 
2609  // Implicit queue
2611 
2612  struct ImplicitProducer : public ProducerBase {
2613  ImplicitProducer(ConcurrentQueue* parent_)
2614  : ProducerBase(parent_, false)
2615  , nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE)
2616  , blockIndex(nullptr)
2617  {
2618  new_block_index();
2619  }
2620 
2621  ~ImplicitProducer()
2622  {
2623  // Note that since we're in the destructor we can assume that all enqueue/dequeue operations
2624  // completed already; this means that all undequeued elements are placed contiguously across
2625  // contiguous blocks, and that only the first and last remaining blocks can be only partially
2626  // empty (all other remaining blocks must be completely full).
2627 
2628 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2629  // Unregister ourselves for thread termination notification
2630  if (!this->inactive.load(std::memory_order_relaxed)) {
2631  details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2632  }
2633 #endif
2634 
2635  // Destroy all remaining elements!
2636  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2637  auto index = this->headIndex.load(std::memory_order_relaxed);
2638  Block* block = nullptr;
2639  assert(index == tail || details::circular_less_than(index, tail));
2640  bool forceFreeLastBlock = index != tail; // If we enter the loop, then the last (tail) block will not be freed
2641  while (index != tail) {
2642  if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
2643  if (block != nullptr) {
2644  // Free the old block
2645  this->parent->add_block_to_free_list(block);
2646  }
2647 
2648  block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
2649  }
2650 
2651  ((*block)[index])->~T();
2652  ++index;
2653  }
2654  // Even if the queue is empty, there's still one block that's not on the free list
2655  // (unless the head index reached the end of it, in which case the tail will be poised
2656  // to create a new block).
2657  if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
2658  this->parent->add_block_to_free_list(this->tailBlock);
2659  }
2660 
2661  // Destroy block index
2662  auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2663  if (localBlockIndex != nullptr) {
2664  for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
2665  localBlockIndex->index[i]->~BlockIndexEntry();
2666  }
2667  do {
2668  auto prev = localBlockIndex->prev;
2669  localBlockIndex->~BlockIndexHeader();
2670  (Traits::free)(localBlockIndex);
2671  localBlockIndex = prev;
2672  } while (localBlockIndex != nullptr);
2673  }
2674  }
2675 
2676  template <AllocationMode allocMode, typename U>
2677  inline bool enqueue(U&& element)
2678  {
2679  index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2680  index_t newTailIndex = 1 + currentTailIndex;
2681  if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2682  // We reached the end of a block, start a new one
2683  auto head = this->headIndex.load(std::memory_order_relaxed);
2684  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2685  if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2686  return false;
2687  }
2688 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2689  debug::DebugLock lock(mutex);
2690 #endif
2691  // Find out where we'll be inserting this block in the block index
2692  BlockIndexEntry* idxEntry;
2693  if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2694  return false;
2695  }
2696 
2697  // Get ahold of a new block
2698  auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2699  if (newBlock == nullptr) {
2700  rewind_block_index_tail();
2701  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2702  return false;
2703  }
2704 #ifdef MCDBGQ_TRACKMEM
2705  newBlock->owner = this;
2706 #endif
2707  newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2708 
2709  MOODYCAMEL_CONSTEXPR_IF(!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element))))
2710  {
2711  // May throw, try to insert now before we publish the fact that we have this new block
2712  MOODYCAMEL_TRY
2713  {
2714  new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2715  }
2716  MOODYCAMEL_CATCH(...)
2717  {
2718  rewind_block_index_tail();
2719  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2720  this->parent->add_block_to_free_list(newBlock);
2721  MOODYCAMEL_RETHROW;
2722  }
2723  }
2724 
2725  // Insert the new block into the index
2726  idxEntry->value.store(newBlock, std::memory_order_relaxed);
2727 
2728  this->tailBlock = newBlock;
2729 
2730  MOODYCAMEL_CONSTEXPR_IF(!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element))))
2731  {
2732  this->tailIndex.store(newTailIndex, std::memory_order_release);
2733  return true;
2734  }
2735  }
2736 
2737  // Enqueue
2738  new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2739 
2740  this->tailIndex.store(newTailIndex, std::memory_order_release);
2741  return true;
2742  }
2743 
2744  template <typename U>
2745  bool dequeue(U& element)
2746  {
2747  // See ExplicitProducer::dequeue for rationale and explanation
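 // In short: dequeueOptimisticCount is incremented speculatively before we know whether an element
 // is really available for us; if the re-checked tail shows that we overcommitted, dequeueOvercommit
 // is bumped to compensate instead of rolling the counter back, keeping the fast path free of CAS loops.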
2748  index_t tail = this->tailIndex.load(std::memory_order_relaxed);
2749  index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2750  if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2751  std::atomic_thread_fence(std::memory_order_acquire);
2752 
2753  index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2754  tail = this->tailIndex.load(std::memory_order_acquire);
2755  if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2756  index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2757 
2758  // Determine which block the element is in
2759  auto entry = get_block_index_entry_for_index(index);
2760 
2761  // Dequeue
2762  auto block = entry->value.load(std::memory_order_relaxed);
2763  auto& el = *((*block)[index]);
2764 
2765  if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
2766 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2767  // Note: Acquiring the mutex with every dequeue instead of only when a block
2768  // is released is very sub-optimal, but it is, after all, purely debug code.
2769  debug::DebugLock lock(producer->mutex);
2770 #endif
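 // This RAII guard covers the case where the move assignment below may throw: it makes sure the
 // element's destructor still runs and, once the block is empty, that the block is unhooked from
 // the index and returned to the free list, even if an exception propagates out of the assignment.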
2771  struct Guard {
2772  Block* block;
2773  index_t index;
2774  BlockIndexEntry* entry;
2775  ConcurrentQueue* parent;
2776 
2777  ~Guard()
2778  {
2779  (*block)[index]->~T();
2780  if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2781  entry->value.store(nullptr, std::memory_order_relaxed);
2782  parent->add_block_to_free_list(block);
2783  }
2784  }
2785  } guard = { block, index, entry, this->parent };
2786 
2787  element = std::move(el); // NOLINT
2788  } else {
2789  element = std::move(el); // NOLINT
2790  el.~T(); // NOLINT
2791 
2792  if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2793  {
2794 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2795  debug::DebugLock lock(mutex);
2796 #endif
2797  // Add the block back into the global free pool (and remove from block index)
2798  entry->value.store(nullptr, std::memory_order_relaxed);
2799  }
2800  this->parent->add_block_to_free_list(block); // releases the above store
2801  }
2802  }
2803 
2804  return true;
2805  } else {
2806  this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2807  }
2808  }
2809 
2810  return false;
2811  }
2812 
2813 #ifdef _MSC_VER
2814 #pragma warning(push)
2815 #pragma warning(disable : 4706) // assignment within conditional expression
2816 #endif
2817  template <AllocationMode allocMode, typename It>
2818  bool enqueue_bulk(It itemFirst, size_t count)
2819  {
2820  // First, we need to make sure we have enough room to enqueue all of the elements;
2821  // this means pre-allocating blocks and putting them in the block index (but only if
2822  // all the allocations succeeded).
2823 
2824  // Note that the tailBlock we start off with may not be owned by us any more;
2825  // this happens if it was filled up exactly to the top (setting tailIndex to
2826  // the first index of the next block which is not yet allocated), then dequeued
2827  // completely (putting it on the free list) before we enqueue again.
2828 
2829  index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2830  auto startBlock = this->tailBlock;
2831  Block* firstAllocatedBlock = nullptr;
2832  auto endBlock = this->tailBlock;
2833 
2834  // Figure out how many blocks we'll need to allocate, and do so
2835  size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2836  index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
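 // Worked example (illustrative values only): with a block size of 32, startTailIndex == 30 and
 // count == 5, blockBaseDiff is 32, i.e. exactly one additional block is needed to hold the
 // elements at indices 32..34, while indices 30..31 still fit into the current tail block.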
2837  if (blockBaseDiff > 0) {
2838 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2839  debug::DebugLock lock(mutex);
2840 #endif
2841  do {
2842  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2843  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2844 
2845  // Find out where we'll be inserting this block in the block index
2846  BlockIndexEntry* idxEntry = nullptr; // initialization here unnecessary but compiler can't always tell
2847  Block* newBlock;
2848  bool indexInserted = false;
2849  auto head = this->headIndex.load(std::memory_order_relaxed);
2850  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2851  bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2852 
2853  if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
2854  // Index allocation or block allocation failed; revert any other allocations
2855  // and index insertions done so far for this operation
2856  if (indexInserted) {
2857  rewind_block_index_tail();
2858  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2859  }
2860  currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2861  for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2862  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2863  idxEntry = get_block_index_entry_for_index(currentTailIndex);
2864  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2865  rewind_block_index_tail();
2866  }
2867  this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2868  this->tailBlock = startBlock;
2869 
2870  return false;
2871  }
2872 
2873 #ifdef MCDBGQ_TRACKMEM
2874  newBlock->owner = this;
2875 #endif
2876  newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2877  newBlock->next = nullptr;
2878 
2879  // Insert the new block into the index
2880  idxEntry->value.store(newBlock, std::memory_order_relaxed);
2881 
2882  // Store the chain of blocks so that we can undo if later allocations fail,
2883  // and so that we can find the blocks when we do the actual enqueueing
2884  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
2885  assert(this->tailBlock != nullptr);
2886  this->tailBlock->next = newBlock;
2887  }
2888  this->tailBlock = newBlock;
2889  endBlock = newBlock;
2890  firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
2891  } while (blockBaseDiff > 0);
2892  }
2893 
2894  // Enqueue, one block at a time
2895  index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2896  currentTailIndex = startTailIndex;
2897  this->tailBlock = startBlock;
2898  assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2899  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2900  this->tailBlock = firstAllocatedBlock;
2901  }
2902  while (true) {
2903  index_t stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2904  if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2905  stopIndex = newTailIndex;
2906  }
2907  MOODYCAMEL_CONSTEXPR_IF(MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst))))
2908  {
2909  while (currentTailIndex != stopIndex) {
2910  new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2911  }
2912  }
2913  else
2914  {
2915  MOODYCAMEL_TRY
2916  {
2917  while (currentTailIndex != stopIndex) {
2918  new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2919  ++currentTailIndex;
2920  ++itemFirst;
2921  }
2922  }
2923  MOODYCAMEL_CATCH(...)
2924  {
2925  auto constructedStopIndex = currentTailIndex;
2926  auto lastBlockEnqueued = this->tailBlock;
2927 
2928  if (!details::is_trivially_destructible<T>::value) {
2929  auto block = startBlock;
2930  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2931  block = firstAllocatedBlock;
2932  }
2933  currentTailIndex = startTailIndex;
2934  while (true) {
2935  stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2936  if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2937  stopIndex = constructedStopIndex;
2938  }
2939  while (currentTailIndex != stopIndex) {
2940  (*block)[currentTailIndex++]->~T();
2941  }
2942  if (block == lastBlockEnqueued) {
2943  break;
2944  }
2945  block = block->next;
2946  }
2947  }
2948 
2949  currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2950  for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2951  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2952  auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2953  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2954  rewind_block_index_tail();
2955  }
2956  this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2957  this->tailBlock = startBlock;
2958  MOODYCAMEL_RETHROW;
2959  }
2960  }
2961 
2962  if (this->tailBlock == endBlock) {
2963  assert(currentTailIndex == newTailIndex);
2964  break;
2965  }
2966  this->tailBlock = this->tailBlock->next;
2967  }
2968  this->tailIndex.store(newTailIndex, std::memory_order_release);
2969  return true;
2970  }
2971 #ifdef _MSC_VER
2972 #pragma warning(pop)
2973 #endif
2974 
2975  template <typename It>
2976  size_t dequeue_bulk(It& itemFirst, size_t max)
2977  {
2978  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2979  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2980  auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2981  if (details::circular_less_than<size_t>(0, desiredCount)) {
2982  desiredCount = desiredCount < max ? desiredCount : max;
2983  std::atomic_thread_fence(std::memory_order_acquire);
2984 
2985  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2986 
2987  tail = this->tailIndex.load(std::memory_order_acquire);
2988  auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2989  if (details::circular_less_than<size_t>(0, actualCount)) {
2990  actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2991  if (actualCount < desiredCount) {
2992  this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2993  }
2994 
2995  // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2996  // will never exceed tail.
2997  auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2998 
2999  // Iterate the blocks and dequeue
3000  auto index = firstIndex;
3001  BlockIndexHeader* localBlockIndex;
3002  auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
3003  do {
3004  auto blockStartIndex = index;
3005  index_t endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
3006  endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
3007 
3008  auto entry = localBlockIndex->index[indexIndex];
3009  auto block = entry->value.load(std::memory_order_relaxed);
3010  if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
3011  while (index != endIndex) {
3012  auto& el = *((*block)[index]);
3013  *itemFirst++ = std::move(el);
3014  el.~T();
3015  ++index;
3016  }
3017  } else {
3018  MOODYCAMEL_TRY
3019  {
3020  while (index != endIndex) {
3021  auto& el = *((*block)[index]);
3022  *itemFirst = std::move(el);
3023  ++itemFirst;
3024  el.~T();
3025  ++index;
3026  }
3027  }
3028  MOODYCAMEL_CATCH(...)
3029  {
3030  do {
3031  entry = localBlockIndex->index[indexIndex];
3032  block = entry->value.load(std::memory_order_relaxed);
3033  while (index != endIndex) {
3034  (*block)[index++]->~T();
3035  }
3036 
3037  if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
3038 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3039  debug::DebugLock lock(mutex);
3040 #endif
3041  entry->value.store(nullptr, std::memory_order_relaxed);
3042  this->parent->add_block_to_free_list(block);
3043  }
3044  indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
3045 
3046  blockStartIndex = index;
3047  endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
3048  endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
3049  } while (index != firstIndex + actualCount);
3050 
3051  MOODYCAMEL_RETHROW;
3052  }
3053  }
3054  if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
3055  {
3056 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3057  debug::DebugLock lock(mutex);
3058 #endif
3059  // Note that the set_many_empty above did a release, meaning that anybody who acquires the block
3060  // we're about to free can use it safely since our writes (and reads!) will have happened-before then.
3061  entry->value.store(nullptr, std::memory_order_relaxed);
3062  }
3063  this->parent->add_block_to_free_list(block); // releases the above store
3064  }
3065  indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
3066  } while (index != firstIndex + actualCount);
3067 
3068  return actualCount;
3069  } else {
3070  this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
3071  }
3072  }
3073 
3074  return 0;
3075  }
3076 
3077  private:
3078  // The block size must be > 1, so any number with the low bit set is an invalid block base index
3079  static const index_t INVALID_BLOCK_BASE = 1;
3080 
3081  struct BlockIndexEntry {
3082  std::atomic<index_t> key;
3083  std::atomic<Block*> value;
3084  };
3085 
3086  struct BlockIndexHeader {
3087  size_t capacity;
3088  std::atomic<size_t> tail;
3089  BlockIndexEntry* entries;
3090  BlockIndexEntry** index;
3091  BlockIndexHeader* prev;
3092  };
3093 
3094  template <AllocationMode allocMode>
3095  inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
3096  {
3097  auto localBlockIndex = blockIndex.load(std::memory_order_relaxed); // We're the only writer thread, relaxed is OK
3098  if (localBlockIndex == nullptr) {
3099  return false; // this can happen if new_block_index failed in the constructor
3100  }
3101  size_t newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
3102  idxEntry = localBlockIndex->index[newTail];
3103  if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE || idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
3104 
3105  idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
3106  localBlockIndex->tail.store(newTail, std::memory_order_release);
3107  return true;
3108  }
3109 
3110  // No room in the old block index, try to allocate another one!
3111  MOODYCAMEL_CONSTEXPR_IF(allocMode == CannotAlloc)
3112  {
3113  return false;
3114  }
3115  else if (!new_block_index())
3116  {
3117  return false;
3118  }
3119  else
3120  {
3121  localBlockIndex = blockIndex.load(std::memory_order_relaxed);
3122  newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
3123  idxEntry = localBlockIndex->index[newTail];
3124  assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
3125  idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
3126  localBlockIndex->tail.store(newTail, std::memory_order_release);
3127  return true;
3128  }
3129  }
3130 
3131  inline void rewind_block_index_tail()
3132  {
3133  auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
3134  localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
3135  }
3136 
3137  inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const
3138  {
3139  BlockIndexHeader* localBlockIndex;
3140  auto idx = get_block_index_index_for_index(index, localBlockIndex);
3141  return localBlockIndex->index[idx];
3142  }
3143 
3144  inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
3145  {
3146 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3147  debug::DebugLock lock(mutex);
3148 #endif
3149  index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
3150  localBlockIndex = blockIndex.load(std::memory_order_acquire);
3151  auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
3152  auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
3153  assert(tailBase != INVALID_BLOCK_BASE);
3154  // Note: Must use division instead of shift because the index may wrap around, causing a negative
3155  // offset, whose negativity we want to preserve
3156  auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
3157  size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
3158  assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
3159  return idx;
3160  }
3161 
3162  bool new_block_index()
3163  {
3164  auto prev = blockIndex.load(std::memory_order_relaxed);
3165  size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
3166  auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
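 // Only entryCount fresh BlockIndexEntry objects are allocated below; the pointer array itself has
 // nextBlockIndexCapacity slots, of which the first prevCapacity are filled with pointers to the
 // previous index's entries, so blocks published through the old index stay reachable via the new one.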
3167  auto raw = static_cast<char*>((Traits::malloc)(
3168  sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount + std::alignment_of<BlockIndexEntry*>::value - 1 + sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
3169  if (raw == nullptr) {
3170  return false;
3171  }
3172 
3173  auto header = new (raw) BlockIndexHeader;
3174  auto entries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
3175  auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
3176  if (prev != nullptr) {
3177  auto prevTail = prev->tail.load(std::memory_order_relaxed);
3178  auto prevPos = prevTail;
3179  size_t i = 0;
3180  do {
3181  prevPos = (prevPos + 1) & (prev->capacity - 1);
3182  index[i++] = prev->index[prevPos];
3183  } while (prevPos != prevTail);
3184  assert(i == prevCapacity);
3185  }
3186  for (size_t i = 0; i != entryCount; ++i) {
3187  new (entries + i) BlockIndexEntry;
3188  entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
3189  index[prevCapacity + i] = entries + i;
3190  }
3191  header->prev = prev;
3192  header->entries = entries;
3193  header->index = index;
3194  header->capacity = nextBlockIndexCapacity;
3195  header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);
3196 
3197  blockIndex.store(header, std::memory_order_release);
3198 
3199  nextBlockIndexCapacity <<= 1;
3200 
3201  return true;
3202  }
3203 
3204  private:
3205  size_t nextBlockIndexCapacity;
3206  std::atomic<BlockIndexHeader*> blockIndex;
3207 
3208 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3209  public:
3210  details::ThreadExitListener threadExitListener;
3211 
3212  private:
3213 #endif
3214 
3215 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3216  public:
3217  ImplicitProducer* nextImplicitProducer;
3218 
3219  private:
3220 #endif
3221 
3222 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3223  mutable debug::DebugMutex mutex;
3224 #endif
3225 #ifdef MCDBGQ_TRACKMEM
3226  friend struct MemStats;
3227 #endif
3228  };
3229 
3231  // Block pool manipulation
3233 
3234  void populate_initial_block_list(size_t blockCount)
3235  {
3236  initialBlockPoolSize = blockCount;
3237  if (initialBlockPoolSize == 0) {
3238  initialBlockPool = nullptr;
3239  return;
3240  }
3241 
3242  initialBlockPool = create_array<Block>(blockCount);
3243  if (initialBlockPool == nullptr) {
3244  initialBlockPoolSize = 0;
3245  }
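 // Blocks of the initial pool live in one contiguous array and must never be destroyed individually;
 // clearing dynamicallyAllocated makes add_block_to_free_list() recycle them via the free list
 // instead of calling destroy() on them.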
3246  for (size_t i = 0; i < initialBlockPoolSize; ++i) {
3247  initialBlockPool[i].dynamicallyAllocated = false;
3248  }
3249  }
3250 
3251  inline Block* try_get_block_from_initial_pool()
3252  {
3253  if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
3254  return nullptr;
3255  }
3256 
3257  auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
3258 
3259  return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
3260  }
3261 
3262  inline void add_block_to_free_list(Block* block)
3263  {
3264 #ifdef MCDBGQ_TRACKMEM
3265  block->owner = nullptr;
3266 #endif
3267  if (!Traits::RECYCLE_ALLOCATED_BLOCKS && block->dynamicallyAllocated) {
3268  destroy(block);
3269  } else {
3270  freeList.add(block);
3271  }
3272  }
3273 
3274  inline void add_blocks_to_free_list(Block* block)
3275  {
3276  while (block != nullptr) {
3277  auto next = block->next;
3278  add_block_to_free_list(block);
3279  block = next;
3280  }
3281  }
3282 
3283  inline Block* try_get_block_from_free_list()
3284  {
3285  return freeList.try_get();
3286  }
3287 
3288  // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
3289  template <AllocationMode canAlloc>
3290  Block* requisition_block()
3291  {
3292  auto block = try_get_block_from_initial_pool();
3293  if (block != nullptr) {
3294  return block;
3295  }
3296 
3297  block = try_get_block_from_free_list();
3298  if (block != nullptr) {
3299  return block;
3300  }
3301 
3302  MOODYCAMEL_CONSTEXPR_IF(canAlloc == CanAlloc)
3303  {
3304  return create<Block>();
3305  }
3306  else
3307  {
3308  return nullptr;
3309  }
3310  }
3311 
3312 #ifdef MCDBGQ_TRACKMEM
3313 public:
3314  struct MemStats {
3315  size_t allocatedBlocks;
3316  size_t usedBlocks;
3317  size_t freeBlocks;
3318  size_t ownedBlocksExplicit;
3319  size_t ownedBlocksImplicit;
3320  size_t implicitProducers;
3321  size_t explicitProducers;
3322  size_t elementsEnqueued;
3323  size_t blockClassBytes;
3324  size_t queueClassBytes;
3325  size_t implicitBlockIndexBytes;
3326  size_t explicitBlockIndexBytes;
3327 
3328  friend class ConcurrentQueue;
3329 
3330  private:
3331  static MemStats getFor(ConcurrentQueue* q)
3332  {
3333  MemStats stats = { 0 };
3334 
3335  stats.elementsEnqueued = q->size_approx();
3336 
3337  auto block = q->freeList.head_unsafe();
3338  while (block != nullptr) {
3339  ++stats.allocatedBlocks;
3340  ++stats.freeBlocks;
3341  block = block->freeListNext.load(std::memory_order_relaxed);
3342  }
3343 
3344  for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3345  bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
3346  stats.implicitProducers += implicit ? 1 : 0;
3347  stats.explicitProducers += implicit ? 0 : 1;
3348 
3349  if (implicit) {
3350  auto prod = static_cast<ImplicitProducer*>(ptr);
3351  stats.queueClassBytes += sizeof(ImplicitProducer);
3352  auto head = prod->headIndex.load(std::memory_order_relaxed);
3353  auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3354  auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3355  if (hash != nullptr) {
3356  for (size_t i = 0; i != hash->capacity; ++i) {
3357  if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
3358  ++stats.allocatedBlocks;
3359  ++stats.ownedBlocksImplicit;
3360  }
3361  }
3362  stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
3363  for (; hash != nullptr; hash = hash->prev) {
3364  stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
3365  }
3366  }
3367  for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3368  //auto block = prod->get_block_index_entry_for_index(head);
3369  ++stats.usedBlocks;
3370  }
3371  } else {
3372  auto prod = static_cast<ExplicitProducer*>(ptr);
3373  stats.queueClassBytes += sizeof(ExplicitProducer);
3374  auto tailBlock = prod->tailBlock;
3375  bool wasNonEmpty = false;
3376  if (tailBlock != nullptr) {
3377  auto block = tailBlock;
3378  do {
3379  ++stats.allocatedBlocks;
3380  if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3381  ++stats.usedBlocks;
3382  wasNonEmpty = wasNonEmpty || block != tailBlock;
3383  }
3384  ++stats.ownedBlocksExplicit;
3385  block = block->next;
3386  } while (block != tailBlock);
3387  }
3388  auto index = prod->blockIndex.load(std::memory_order_relaxed);
3389  while (index != nullptr) {
3390  stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
3391  index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
3392  }
3393  }
3394  }
3395 
3396  auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3397  stats.allocatedBlocks += freeOnInitialPool;
3398  stats.freeBlocks += freeOnInitialPool;
3399 
3400  stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3401  stats.queueClassBytes += sizeof(ConcurrentQueue);
3402 
3403  return stats;
3404  }
3405  };
3406 
3407  // For debugging only. Not thread-safe.
3408  MemStats getMemStats()
3409  {
3410  return MemStats::getFor(this);
3411  }
3412 
3413 private:
3414  friend struct MemStats;
3415 #endif
3416 
3418  // Producer list manipulation
3420 
3421  ProducerBase* recycle_or_create_producer(bool isExplicit)
3422  {
3423 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3424  debug::DebugLock lock(implicitProdMutex);
3425 #endif
3426  // Try to re-use one first
3427  for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3428  if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3429  bool expected = true;
3430  if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
3431  // We caught one! It's been marked as activated, the caller can have it
3432  return ptr;
3433  }
3434  }
3435  }
3436 
3437  return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));
3438  }
3439 
3440  ProducerBase* add_producer(ProducerBase* producer)
3441  {
3442  // Handle failed memory allocation
3443  if (producer == nullptr) {
3444  return nullptr;
3445  }
3446 
3447  producerCount.fetch_add(1, std::memory_order_relaxed);
3448 
3449  // Add it to the lock-free list
3450  auto prevTail = producerListTail.load(std::memory_order_relaxed);
3451  do {
3452  producer->next = prevTail;
3453  } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));
3454 
3455 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3456  if (producer->isExplicit) {
3457  auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3458  do {
3459  static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
3460  } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3461  } else {
3462  auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3463  do {
3464  static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
3465  } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3466  }
3467 #endif
3468 
3469  return producer;
3470  }
3471 
3472  void reown_producers()
3473  {
3474  // After another instance is moved-into/swapped-with this one, all the
3475  // producers we stole still think their parents are the other queue.
3476  // So fix them up!
3477  for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
3478  ptr->parent = this;
3479  }
3480  }
3481 
3483  // Implicit producer hash
3485 
3486  struct ImplicitProducerKVP {
3487  std::atomic<details::thread_id_t> key;
3488  ImplicitProducer* value; // No need for atomicity since it's only read by the thread that sets it in the first place
3489 
3490  ImplicitProducerKVP()
3491  : value(nullptr)
3492  {
3493  }
3494 
3495  ImplicitProducerKVP(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
3496  {
3497  key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3498  value = other.value;
3499  }
3500 
3501  inline ImplicitProducerKVP& operator=(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
3502  {
3503  swap(other);
3504  return *this;
3505  }
3506 
3507  inline void swap(ImplicitProducerKVP& other) MOODYCAMEL_NOEXCEPT
3508  {
3509  if (this != &other) {
3510  details::swap_relaxed(key, other.key);
3511  std::swap(value, other.value);
3512  }
3513  }
3514  };
3515 
3516  template <typename XT, typename XTraits>
3517  friend void swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&, typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT;
3518 
3519  struct ImplicitProducerHash {
3520  size_t capacity;
3521  ImplicitProducerKVP* entries;
3522  ImplicitProducerHash* prev;
3523  };
3524 
3525  inline void populate_initial_implicit_producer_hash()
3526  {
3527  MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
3528  {
3529  return;
3530  }
3531  else
3532  {
3533  implicitProducerHashCount.store(0, std::memory_order_relaxed);
3534  auto hash = &initialImplicitProducerHash;
3535  hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
3536  hash->entries = &initialImplicitProducerHashEntries[0];
3537  for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3538  initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3539  }
3540  hash->prev = nullptr;
3541  implicitProducerHash.store(hash, std::memory_order_relaxed);
3542  }
3543  }
3544 
3545  void swap_implicit_producer_hashes(ConcurrentQueue& other)
3546  {
3547  MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
3548  {
3549  return;
3550  }
3551  else
3552  {
3553  // Swap (assumes our implicit producer hash is initialized)
3554  initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
3555  initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
3556  other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];
3557 
3558  details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
3559 
3560  details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
3561  if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
3562  implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
3563  } else {
3564  ImplicitProducerHash* hash;
3565  for (hash = implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) {
3566  continue;
3567  }
3568  hash->prev = &initialImplicitProducerHash;
3569  }
3570  if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
3571  other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
3572  } else {
3573  ImplicitProducerHash* hash;
3574  for (hash = other.implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
3575  continue;
3576  }
3577  hash->prev = &other.initialImplicitProducerHash;
3578  }
3579  }
3580  }
3581 
3582  // Only fails (returns nullptr) if memory allocation fails
3583  ImplicitProducer* get_or_add_implicit_producer()
3584  {
3585  // Note that since the data is essentially thread-local (key is thread ID),
3586  // there's a reduced need for fences (memory ordering is already consistent
3587  // for any individual thread), except for the current table itself.
3588 
3589  // Start by looking for the thread ID in the current and all previous hash tables.
3590  // If it's not found, it must not be in there yet, since this same thread would
3591  // have added it previously to one of the tables that we traversed.
3592 
3593  // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table
3594 
3595 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3596  debug::DebugLock lock(implicitProdMutex);
3597 #endif
3598 
3599  auto id = details::thread_id();
3600  auto hashedId = details::hash_thread_id(id);
3601 
3602  auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3603  assert(mainHash != nullptr); // silence clang-tidy and MSVC warnings (hash cannot be null)
3604  for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
3605  // Look for the id in this hash
3606  auto index = hashedId;
3607  while (true) { // Not an infinite loop because at least one slot is free in the hash table
3608  index &= hash->capacity - 1u;
3609 
3610  auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3611  if (probedKey == id) {
3612  // Found it! If we had to search several hashes deep, though, we should lazily add it
3613  // to the current main hash table to avoid the extended search next time.
3614  // Note there's guaranteed to be room in the current hash table since every subsequent
3615  // table implicitly reserves space for all previous tables (there's only one
3616  // implicitProducerHashCount).
3617  auto value = hash->entries[index].value;
3618  if (hash != mainHash) {
3619  index = hashedId;
3620  while (true) {
3621  index &= mainHash->capacity - 1u;
3622  auto empty = details::invalid_thread_id;
3623 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3624  auto reusable = details::invalid_thread_id2;
3625  if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed) || mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3626 #else
3627  if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3628 #endif
3629  mainHash->entries[index].value = value;
3630  break;
3631  }
3632  ++index;
3633  }
3634  }
3635 
3636  return value;
3637  }
3638  if (probedKey == details::invalid_thread_id) {
3639  break; // Not in this hash table
3640  }
3641  ++index;
3642  }
3643  }
3644 
3645  // Insert!
3646  auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
3647  while (true) {
3648  // NOLINTNEXTLINE(clang-analyzer-core.NullDereference)
3649  if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
3650  // We've acquired the resize lock, try to allocate a bigger hash table.
3651  // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
3652  // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
3653  // locked block).
3654  mainHash = implicitProducerHash.load(std::memory_order_acquire);
3655  if (newCount >= (mainHash->capacity >> 1)) {
3656  size_t newCapacity = mainHash->capacity << 1;
3657  while (newCount >= (newCapacity >> 1)) {
3658  newCapacity <<= 1;
3659  }
3660  auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
3661  if (raw == nullptr) {
3662  // Allocation failed
3663  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3664  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
3665  return nullptr;
3666  }
3667 
3668  auto newHash = new (raw) ImplicitProducerHash;
3669  newHash->capacity = static_cast<size_t>(newCapacity);
3670  newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
3671  for (size_t i = 0; i != newCapacity; ++i) {
3672  new (newHash->entries + i) ImplicitProducerKVP;
3673  newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3674  }
3675  newHash->prev = mainHash;
3676  implicitProducerHash.store(newHash, std::memory_order_release);
3677  implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3678  mainHash = newHash;
3679  } else {
3680  implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3681  }
3682  }
3683 
3684  // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
3685  // to finish being allocated by another thread (and if we just finished allocating above, the condition will
3686  // always be true)
3687  if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3688  auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false));
3689  if (producer == nullptr) {
3690  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3691  return nullptr;
3692  }
3693 
3694 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3695  producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3696  producer->threadExitListener.userData = producer;
3697  details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3698 #endif
3699 
3700  auto index = hashedId;
3701  while (true) {
3702  index &= mainHash->capacity - 1u;
3703  auto empty = details::invalid_thread_id;
3704 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3705  auto reusable = details::invalid_thread_id2;
3706  if (mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3707  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed); // already counted as a used slot
3708  mainHash->entries[index].value = producer;
3709  break;
3710  }
3711 #endif
3712  if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3713  mainHash->entries[index].value = producer;
3714  break;
3715  }
3716  ++index;
3717  }
3718  return producer;
3719  }
3720 
3721  // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
3722  // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
3723  // we try to allocate ourselves).
3724  mainHash = implicitProducerHash.load(std::memory_order_acquire);
3725  }
3726  }
3727 
3728 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3729  void implicit_producer_thread_exited(ImplicitProducer* producer)
3730  {
3731  // Remove from hash
3732 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3733  debug::DebugLock lock(implicitProdMutex);
3734 #endif
3735  auto hash = implicitProducerHash.load(std::memory_order_acquire);
3736  assert(hash != nullptr); // The thread exit listener is only registered if we were added to a hash in the first place
3737  auto id = details::thread_id();
3738  auto hashedId = details::hash_thread_id(id);
3739  details::thread_id_t probedKey;
3740 
3741  // We need to traverse all the hashes just in case other threads aren't on the current one yet and are
3742  // trying to add an entry thinking there's a free slot (because they reused a producer)
3743  for (; hash != nullptr; hash = hash->prev) {
3744  auto index = hashedId;
3745  do {
3746  index &= hash->capacity - 1u;
3747  probedKey = id;
3748  if (hash->entries[index].key.compare_exchange_strong(probedKey, details::invalid_thread_id2, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3749  break;
3750  }
3751  ++index;
3752  } while (probedKey != details::invalid_thread_id); // Can happen if the hash has changed but we weren't put back in it yet, or if we weren't added to this hash in the first place
3753  }
3754 
3755  // Mark the queue as being recyclable
3756  producer->inactive.store(true, std::memory_order_release);
3757  }
3758 
3759  static void implicit_producer_thread_exited_callback(void* userData)
3760  {
3761  auto producer = static_cast<ImplicitProducer*>(userData);
3762  auto queue = producer->parent;
3763  queue->implicit_producer_thread_exited(producer);
3764  }
3765 #endif
3766 
3768  // Utility functions
3770 
3771  template <typename TAlign>
3772  static inline void* aligned_malloc(size_t size)
3773  {
3774  MOODYCAMEL_CONSTEXPR_IF(std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
3775  return (Traits::malloc)(size);
3776  else
3777  {
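 // Over-aligned case: allocate enough extra space to align the returned pointer manually, and stash
 // the raw pointer from (Traits::malloc) in the word just before it, so that aligned_free() below
 // can recover and release the original allocation.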
3778  size_t alignment = std::alignment_of<TAlign>::value;
3779  void* raw = (Traits::malloc)(size + alignment - 1 + sizeof(void*));
3780  if (!raw)
3781  return nullptr;
3782  char* ptr = details::align_for<TAlign>(reinterpret_cast<char*>(raw) + sizeof(void*));
3783  *(reinterpret_cast<void**>(ptr) - 1) = raw;
3784  return ptr;
3785  }
3786  }
3787 
3788  template <typename TAlign>
3789  static inline void aligned_free(void* ptr)
3790  {
3791  MOODYCAMEL_CONSTEXPR_IF(std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
3792  return (Traits::free)(ptr);
3793  else(Traits::free)(ptr ? *(reinterpret_cast<void**>(ptr) - 1) : nullptr);
3794  }
3795 
3796  template <typename U>
3797  static inline U* create_array(size_t count)
3798  {
3799  assert(count > 0);
3800  U* p = static_cast<U*>(aligned_malloc<U>(sizeof(U) * count));
3801  if (p == nullptr)
3802  return nullptr;
3803 
3804  for (size_t i = 0; i != count; ++i)
3805  new (p + i) U();
3806  return p;
3807  }
3808 
3809  template <typename U>
3810  static inline void destroy_array(U* p, size_t count)
3811  {
3812  if (p != nullptr) {
3813  assert(count > 0);
3814  for (size_t i = count; i != 0;)
3815  (p + --i)->~U();
3816  }
3817  aligned_free<U>(p);
3818  }
3819 
3820  template <typename U>
3821  static inline U* create()
3822  {
3823  void* p = aligned_malloc<U>(sizeof(U));
3824  return p != nullptr ? new (p) U : nullptr;
3825  }
3826 
3827  template <typename U, typename A1>
3828  static inline U* create(A1&& a1)
3829  {
3830  void* p = aligned_malloc<U>(sizeof(U));
3831  return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
3832  }
3833 
3834  template <typename U>
3835  static inline void destroy(U* p)
3836  {
3837  if (p != nullptr)
3838  p->~U();
3839  aligned_free<U>(p);
3840  }
3841 
3842 private:
3843  std::atomic<ProducerBase*> producerListTail;
3844  std::atomic<std::uint32_t> producerCount;
3845 
3846  std::atomic<size_t> initialBlockPoolIndex;
3847  Block* initialBlockPool;
3848  size_t initialBlockPoolSize;
3849 
3850 #ifndef MCDBGQ_USEDEBUGFREELIST
3851  FreeList<Block> freeList;
3852 #else
3853  debug::DebugFreeList<Block> freeList;
3854 #endif
3855 
3856  std::atomic<ImplicitProducerHash*> implicitProducerHash;
3857  std::atomic<size_t> implicitProducerHashCount; // Number of slots logically used
3858  ImplicitProducerHash initialImplicitProducerHash;
3859  std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
3860  std::atomic_flag implicitProducerHashResizeInProgress;
3861 
3862  std::atomic<std::uint32_t> nextExplicitConsumerId;
3863  std::atomic<std::uint32_t> globalExplicitConsumerOffset;
3864 
3865 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3866  debug::DebugMutex implicitProdMutex;
3867 #endif
3868 
3869 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3870  std::atomic<ExplicitProducer*> explicitProducers;
3871  std::atomic<ImplicitProducer*> implicitProducers;
3872 #endif
3873 };
3874 
3875 template <typename T, typename Traits>
3876 ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
3877  : producer(queue.recycle_or_create_producer(true))
3878 {
3879  if (producer != nullptr) {
3880  producer->token = this;
3881  }
3882 }
3883 
3884 template <typename T, typename Traits>
3885 ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits>& queue)
3886  : producer(reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true))
3887 {
3888  if (producer != nullptr) {
3889  producer->token = this;
3890  }
3891 }
3892 
3893 template <typename T, typename Traits>
3894 ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
3895  : itemsConsumedFromCurrent(0)
3896  , currentProducer(nullptr)
3897  , desiredProducer(nullptr)
3898 {
3899  initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3900  lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
3901 }
3902 
3903 template <typename T, typename Traits>
3904 ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits>& queue)
3905  : itemsConsumedFromCurrent(0)
3906  , currentProducer(nullptr)
3907  , desiredProducer(nullptr)
3908 {
3909  initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3910  lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
3911 }
3912 
3913 template <typename T, typename Traits>
3914 inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
3915 {
3916  a.swap(b);
3917 }
3918 
3919 inline void swap(ProducerToken& a, ProducerToken& b) MOODYCAMEL_NOEXCEPT
3920 {
3921  a.swap(b);
3922 }
3923 
3924 inline void swap(ConsumerToken& a, ConsumerToken& b) MOODYCAMEL_NOEXCEPT
3925 {
3926  a.swap(b);
3927 }
3928 
3929 template <typename T, typename Traits>
3930 inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT
3931 {
3932  a.swap(b);
3933 }
3934 
3935 } // namespace utils
3936 } // namespace genesis
3937 
3938 #if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
3939 #pragma warning(pop)
3940 #endif
3941 
3942 #if defined(__GNUC__) && !defined(__INTEL_COMPILER)
3943 #pragma GCC diagnostic pop
3944 #endif
3945 
3946 #endif // include guard
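// -------------------------------------------------------------------------------------------------
//  Illustrative usage sketch (not part of the adapted moodycamel source; added for orientation).
//  It only relies on the public interface defined above: the default constructor, enqueue(), and
//  try_dequeue(). The include path is assumed from the include guard; adjust it to your setup.
//
//      #include "genesis/utils/threading/concurrent_queue.hpp"
//      #include <thread>
//
//      void concurrent_queue_example()
//      {
//          genesis::utils::ConcurrentQueue<int> queue;
//
//          // One producer thread pushes values into the queue.
//          std::thread producer( [&queue](){
//              for( int i = 0; i < 100; ++i ) {
//                  queue.enqueue( i );
//              }
//          });
//
//          // The consumer polls with try_dequeue(), which returns false while the queue
//          // appears empty, so we keep spinning until all values have been received.
//          int value = 0;
//          int received = 0;
//          while( received < 100 ) {
//              if( queue.try_dequeue( value )) {
//                  ++received;
//              }
//          }
//          producer.join();
//      }
// -------------------------------------------------------------------------------------------------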
genesis::utils::ConcurrentQueue::enqueue
bool enqueue(producer_token_t const &token, T const &item)
Definition: concurrent_queue.hpp:1190
genesis::utils::ConcurrentQueue::ConcurrentQueue
ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
Definition: concurrent_queue.hpp:1008
genesis::utils::details::max_align_t
Definition: concurrent_queue.hpp:445
genesis::utils::ConcurrentQueueDefaultTraits::BLOCK_SIZE
static const size_t BLOCK_SIZE
Definition: concurrent_queue.hpp:479
genesis::utils::ProducerToken::swap
void swap(ProducerToken &other) noexcept
Definition: concurrent_queue.hpp:843
genesis::utils::ProducerToken::ProducerToken
ProducerToken(ProducerToken &&other) noexcept
Definition: concurrent_queue.hpp:828
genesis::utils::details::swap_relaxed
static void swap_relaxed(std::atomic< T > &left, std::atomic< T > &right)
Definition: concurrent_queue.hpp:652
MOODYCAMEL_NOEXCEPT
#define MOODYCAMEL_NOEXCEPT
Definition: concurrent_queue.hpp:298
genesis::utils::details::align_for
static char * align_for(char *ptr)
Definition: concurrent_queue.hpp:628
genesis::utils::ConcurrentQueue::try_enqueue
bool try_enqueue(producer_token_t const &token, T &&item)
Definition: concurrent_queue.hpp:1265
genesis::utils::ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
Definition: concurrent_queue.hpp:948
genesis::utils::ConcurrentQueue::ConcurrentQueue
ConcurrentQueue(size_t capacity=32 *BLOCK_SIZE)
Definition: concurrent_queue.hpp:984
genesis::utils::details::const_numeric_max
Definition: concurrent_queue.hpp:430
genesis::utils::ConcurrentQueueDefaultTraits::MAX_SEMA_SPINS
static const int MAX_SEMA_SPINS
Definition: concurrent_queue.hpp:517
genesis::utils::ConcurrentQueue::try_enqueue_bulk
bool try_enqueue_bulk(It itemFirst, size_t count)
Definition: concurrent_queue.hpp:1278
genesis::utils::ConcurrentQueue::enqueue
bool enqueue(T const &item)
Definition: concurrent_queue.hpp:1167
genesis::utils::details::invalid_thread_id
static const thread_id_t invalid_thread_id
Definition: concurrent_queue.hpp:241
genesis::utils::tail
std::string tail(std::string const &text, size_t lines)
Return the last lines of the text.
Definition: string.cpp:505
MOODYCAMEL_RETHROW
#define MOODYCAMEL_RETHROW
Definition: concurrent_queue.hpp:273
genesis::utils::details::const_numeric_max::value
static const T value
Definition: concurrent_queue.hpp:432
genesis::utils::ProducerToken::ProducerToken
ProducerToken(ConcurrentQueue< T, Traits > &queue)
Definition: concurrent_queue.hpp:3876
genesis::utils::ConcurrentQueueDefaultTraits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
Definition: concurrent_queue.hpp:505
genesis::utils::ConcurrentQueueDefaultTraits::index_t
std::size_t index_t
Definition: concurrent_queue.hpp:472
genesis::utils::details::ConcurrentQueueProducerTypelessBase
Definition: concurrent_queue.hpp:568
genesis::utils::details::ConcurrentQueueProducerTypelessBase::next
ConcurrentQueueProducerTypelessBase * next
Definition: concurrent_queue.hpp:569
MOODYCAMEL_DELETE_FUNCTION
#define MOODYCAMEL_DELETE_FUNCTION
Definition: concurrent_queue.hpp:324
genesis::utils::details::thread_id_converter::thread_id_hash_t
thread_id_t thread_id_hash_t
Definition: concurrent_queue.hpp:146
genesis::utils::details::hash_thread_id
static size_t hash_thread_id(thread_id_t id)
Definition: concurrent_queue.hpp:611
genesis::utils::ConsumerToken
Definition: concurrent_queue.hpp:885
genesis::utils::ConcurrentQueue::ConcurrentQueue
ConcurrentQueue(ConcurrentQueue &&other) noexcept
Definition: concurrent_queue.hpp:1083
genesis::utils::ConsumerToken::operator=
ConsumerToken & operator=(ConsumerToken &&other) noexcept
Definition: concurrent_queue.hpp:901
genesis::utils::details::ThreadExitNotifier::subscribe
static void subscribe(ThreadExitListener *listener)
Definition: concurrent_queue.hpp:718
genesis::utils::details::ThreadExitListener::chain
ThreadExitNotifier * chain
Definition: concurrent_queue.hpp:713
genesis::utils::details::nomove_if
Definition: concurrent_queue.hpp:666
MOODYCAMEL_NO_TSAN
#define MOODYCAMEL_NO_TSAN
Definition: concurrent_queue.hpp:393
genesis::utils::ConsumerToken::swap
void swap(ConsumerToken &other) noexcept
Definition: concurrent_queue.hpp:907
genesis::utils::ConcurrentQueue::is_lock_free
static constexpr bool is_lock_free()
Definition: concurrent_queue.hpp:1510
genesis::utils::ConcurrentQueueDefaultTraits::IMPLICIT_INITIAL_INDEX_SIZE
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
Definition: concurrent_queue.hpp:494
genesis::utils::details::likely
static bool() likely(bool x)
Definition: concurrent_queue.hpp:412
genesis::utils::ConcurrentQueueDefaultTraits::malloc
static void * malloc(size_t size)
Definition: concurrent_queue.hpp:537
genesis::utils::BlockingConcurrentQueue
Definition: blocking_concurrent_queue.hpp:82
genesis::utils::ConcurrentQueue::ExplicitProducer
friend struct ExplicitProducer
Definition: concurrent_queue.hpp:1518
genesis::utils::ConcurrentQueueDefaultTraits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
Definition: concurrent_queue.hpp:486
genesis::utils::ConcurrentQueue::index_t
Traits::index_t index_t
Definition: concurrent_queue.hpp:944
genesis::utils::ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
Definition: concurrent_queue.hpp:951
genesis::utils::offset
void offset(Histogram &h, double value)
Definition: operations.cpp:47
genesis::utils::ConcurrentQueue::operator=
ConcurrentQueue & operator=(ConcurrentQueue const &)=delete
genesis::utils::ConcurrentQueue::enqueue_bulk
bool enqueue_bulk(It itemFirst, size_t count)
Definition: concurrent_queue.hpp:1211
genesis::utils::details::ThreadExitNotifier
Definition: concurrent_queue.hpp:716
genesis::utils::ConcurrentQueue::try_dequeue_from_producer
bool try_dequeue_from_producer(producer_token_t const &producer, U &item)
Definition: concurrent_queue.hpp:1474
genesis::utils::ConsumerToken::ConcurrentQueueTests
friend class ConcurrentQueueTests
Definition: concurrent_queue.hpp:923
genesis::utils::swap
void swap(Color &lhs, Color &rhs)
Definition: color.hpp:207
genesis::utils::details::ConcurrentQueueProducerTypelessBase::inactive
std::atomic< bool > inactive
Definition: concurrent_queue.hpp:570
MOODYCAMEL_ALIGNED_TYPE_LIKE
#define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj)
Definition: concurrent_queue.hpp:383
genesis::utils::details::identity::type
T type
Definition: concurrent_queue.hpp:379
genesis::utils::details::ThreadExitListener::callback_t
void(* callback_t)(void *)
Definition: concurrent_queue.hpp:708
genesis::utils::ProducerToken::valid
bool valid() const
Definition: concurrent_queue.hpp:862
genesis::utils::ConcurrentQueue::try_dequeue_bulk_from_producer
size_t try_dequeue_bulk_from_producer(producer_token_t const &producer, It itemFirst, size_t max)
Definition: concurrent_queue.hpp:1487
genesis::utils::details::ConcurrentQueueProducerTypelessBase::ConcurrentQueueProducerTypelessBase
ConcurrentQueueProducerTypelessBase()
Definition: concurrent_queue.hpp:573
genesis::utils::head
std::string head(std::string const &text, size_t lines)
Return the first lines of the text.
Definition: string.cpp:496
MOODYCAMEL_THREADLOCAL
#define MOODYCAMEL_THREADLOCAL
Definition: concurrent_queue.hpp:235
genesis::utils::details::unlikely
static bool() unlikely(bool x)
Definition: concurrent_queue.hpp:416
genesis::utils::ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
Definition: concurrent_queue.hpp:949
MOODYCAMEL_NOEXCEPT_ASSIGN
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr)
Definition: concurrent_queue.hpp:300
genesis::utils::ProducerToken::operator=
ProducerToken & operator=(ProducerToken &&other) noexcept
Definition: concurrent_queue.hpp:837
genesis::utils::ProducerToken::producer
details::ConcurrentQueueProducerTypelessBase * producer
Definition: concurrent_queue.hpp:882
genesis::utils::details::nomove_if::eval
static T const & eval(T const &x)
Definition: concurrent_queue.hpp:668
MOODYCAMEL_MAYBE_UNUSED
#define MOODYCAMEL_MAYBE_UNUSED
Definition: concurrent_queue.hpp:260
genesis::utils::ConcurrentQueueDefaultTraits::free
static void free(void *ptr)
Definition: concurrent_queue.hpp:541
genesis::utils::ConcurrentQueue::try_enqueue
bool try_enqueue(T const &item)
Definition: concurrent_queue.hpp:1235
genesis::utils::hash
Definition: hash_tuple.hpp:76
genesis::utils::ProducerToken::~ProducerToken
~ProducerToken()
Definition: concurrent_queue.hpp:864
MOODYCAMEL_NOEXCEPT_CTOR
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr)
Definition: concurrent_queue.hpp:299
genesis::utils::ConcurrentQueue::try_dequeue_non_interleaved
bool try_dequeue_non_interleaved(U &item)
Definition: concurrent_queue.hpp:1344
genesis::utils::ConcurrentQueue::consumer_token_t
ConsumerToken consumer_token_t
Definition: concurrent_queue.hpp:942
genesis::utils::ConcurrentQueue
Definition: concurrent_queue.hpp:562
genesis::utils::details::static_is_lock_free
Definition: concurrent_queue.hpp:809
genesis::utils::details::thread_id_converter::thread_id_numeric_size_t
thread_id_t thread_id_numeric_size_t
Definition: concurrent_queue.hpp:145
genesis::utils::details::identity
Definition: concurrent_queue.hpp:378
genesis::utils::details::invalid_thread_id2
static const thread_id_t invalid_thread_id2
Definition: concurrent_queue.hpp:242
MOODYCAMEL_TRY
#define MOODYCAMEL_TRY
Definition: concurrent_queue.hpp:271
genesis::utils::ConcurrentQueueDefaultTraits::EXPLICIT_INITIAL_INDEX_SIZE
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
Definition: concurrent_queue.hpp:490
genesis::utils::ConcurrentQueueDefaultTraits::MAX_SUBQUEUE_SIZE
static const size_t MAX_SUBQUEUE_SIZE
Definition: concurrent_queue.hpp:511
genesis::utils::details::ThreadExitListener::userData
void * userData
Definition: concurrent_queue.hpp:710
genesis::utils::ConcurrentQueue::producer_token_t
ProducerToken producer_token_t
Definition: concurrent_queue.hpp:941
genesis::utils::ConcurrentQueue::size_t
Traits::size_t size_t
Definition: concurrent_queue.hpp:945
genesis::utils::ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
Definition: concurrent_queue.hpp:952
genesis::utils::ConcurrentQueue::size_approx
size_t size_approx() const
Definition: concurrent_queue.hpp:1498
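A note on the size_approx() entry above: the returned count is only an estimate, as it is computed without stopping concurrent producers and consumers. A small, hypothetical monitoring snippet, under the same include-path assumption as the sketch above:
#include "genesis/utils/threading/concurrent_queue.hpp" // path assumed
#include <cstddef>

int main()
{
    genesis::utils::ConcurrentQueue<int> queue;
    queue.enqueue( 1 );
    queue.enqueue( 2 );

    // Exact here only because no other thread touches the queue;
    // under concurrency, treat the value as a hint, not a precise size.
    std::size_t const approx = queue.size_approx();
    return approx == 2 ? 0 : 1;
}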
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::utils::details::ThreadExitListener
Definition: concurrent_queue.hpp:707
genesis::utils::details::_hash_32_or_64< 1 >::hash
static std::uint64_t hash(std::uint64_t h)
Definition: concurrent_queue.hpp:598
genesis::utils::details::thread_id_converter
Definition: concurrent_queue.hpp:144
genesis::utils::details::max_align_t::x
std_max_align_t x
Definition: concurrent_queue.hpp:446
genesis::utils::ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
Definition: concurrent_queue.hpp:950
genesis::utils::ConcurrentQueueDefaultTraits
Definition: concurrent_queue.hpp:458
genesis::utils::swap
void swap(ConsumerToken &a, ConsumerToken &b) noexcept
Definition: concurrent_queue.hpp:3924
genesis::utils::details::nomove_if< false >::eval
static auto eval(U &&x) -> decltype(std::forward< U >(x))
Definition: concurrent_queue.hpp:677
genesis::utils::ConcurrentQueue::BLOCK_SIZE
static const size_t BLOCK_SIZE
Definition: concurrent_queue.hpp:947
genesis::utils::details::deref_noexcept
static auto deref_noexcept(It &it) noexcept -> decltype(*it)
Definition: concurrent_queue.hpp:685
genesis::utils::details::std_max_align_t
std::max_align_t std_max_align_t
Definition: concurrent_queue.hpp:440
genesis::utils::ConcurrentQueue::try_dequeue_bulk
size_t try_dequeue_bulk(It itemFirst, size_t max)
Definition: concurrent_queue.hpp:1407
genesis::utils::ConcurrentQueue::try_enqueue
bool try_enqueue(producer_token_t const &token, T const &item)
Definition: concurrent_queue.hpp:1257
genesis::utils::details::ConcurrentQueueProducerTypelessBase::token
ProducerToken * token
Definition: concurrent_queue.hpp:571
genesis::utils::details::max_align_t::y
long long y
Definition: concurrent_queue.hpp:447
genesis::utils::ConsumerToken::ConsumerToken
ConsumerToken(ConsumerToken &&other) noexcept
Definition: concurrent_queue.hpp:892
genesis::utils::details::ThreadExitNotifier::unsubscribe
static void unsubscribe(ThreadExitListener *listener)
Definition: concurrent_queue.hpp:727
genesis::utils::ConcurrentQueue::~ConcurrentQueue
~ConcurrentQueue()
Definition: concurrent_queue.hpp:1029
genesis::utils::details::_hash_32_or_64::hash
static std::uint32_t hash(std::uint32_t h)
Definition: concurrent_queue.hpp:583
genesis::utils::details::circular_less_than
static bool circular_less_than(T a, T b)
Definition: concurrent_queue.hpp:619
genesis::utils::details::is_trivially_destructible
Definition: concurrent_queue.hpp:692
genesis::utils::ConcurrentQueue::try_enqueue
bool try_enqueue(T &&item)
Definition: concurrent_queue.hpp:1247
MOODYCAMEL_CONSTEXPR_IF
#define MOODYCAMEL_CONSTEXPR_IF
Definition: concurrent_queue.hpp:259
genesis::utils::details::thread_id
thread_id_t thread_id()
Definition: concurrent_queue.hpp:243
genesis::utils::ConcurrentQueue::MAX_SUBQUEUE_SIZE
static const size_t MAX_SUBQUEUE_SIZE
Definition: concurrent_queue.hpp:958
genesis::utils::details::static_is_lock_free_num::value
value (enumerator of an unnamed enum)
Definition: concurrent_queue.hpp:786
genesis::utils::details::ThreadExitListener::next
ThreadExitListener * next
Definition: concurrent_queue.hpp:712
genesis::utils::details::max_align_t::z
void * z
Definition: concurrent_queue.hpp:448
genesis::utils::ConcurrentQueue::ConcurrentQueueTests
friend class ConcurrentQueueTests
Definition: concurrent_queue.hpp:1522
genesis::utils::ConcurrentQueue::try_dequeue
bool try_dequeue(U &item)
Definition: concurrent_queue.hpp:1301
genesis::utils::ConcurrentQueue::try_enqueue_bulk
bool try_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
Definition: concurrent_queue.hpp:1291
genesis::utils::details::hash_32_or_64
Definition: concurrent_queue.hpp:608
genesis::utils::ConsumerToken::ConsumerToken
ConsumerToken(ConcurrentQueue< T, Traits > &q)
Definition: concurrent_queue.hpp:3894
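The ConsumerToken constructor above, together with the ProducerToken entries and the token overloads of enqueue and try_dequeue listed elsewhere in this index, supports a faster path: a token caches which internal sub-queue a thread works with, so the per-call lookup can be skipped. A sketch, under the assumption that ProducerToken is constructed from a queue reference in the same way as ConsumerToken:
#include "genesis/utils/threading/concurrent_queue.hpp" // path assumed
#include <thread>

int main()
{
    using genesis::utils::ConcurrentQueue;
    using genesis::utils::ProducerToken;
    using genesis::utils::ConsumerToken;

    ConcurrentQueue<int> queue;

    std::thread producer( [&queue]() {
        ProducerToken ptok( queue );   // bind this thread to an explicit producer
        for( int i = 0; i < 1000; ++i ) {
            queue.enqueue( ptok, i );  // token overload, see the enqueue entries in this index
        }
    });

    std::thread consumer( [&queue]() {
        ConsumerToken ctok( queue );
        int item = 0;
        int received = 0;
        while( received < 1000 ) {
            // Busy-waits for brevity; a real consumer would back off or block.
            if( queue.try_dequeue( ctok, item )) {
                ++received;
            }
        }
    });

    producer.join();
    consumer.join();
    return 0;
}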
genesis::utils::details::thread_id_converter::prehash
static thread_id_hash_t prehash(thread_id_t const &x)
Definition: concurrent_queue.hpp:147
genesis::utils::ConcurrentQueue::enqueue
bool enqueue(producer_token_t const &token, T &&item)
Definition: concurrent_queue.hpp:1199
genesis::utils::details::thread_id_t
std::uintptr_t thread_id_t
Definition: concurrent_queue.hpp:240
genesis::utils::ConcurrentQueue::swap
void swap(ConcurrentQueue &other) noexcept
Definition: concurrent_queue.hpp:1127
genesis::utils::details::_hash_32_or_64
Definition: concurrent_queue.hpp:582
genesis::utils::details::ceil_to_pow_2
static T ceil_to_pow_2(T x)
Definition: concurrent_queue.hpp:635
genesis::utils::ConcurrentQueueDefaultTraits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
Definition: concurrent_queue.hpp:500
genesis::utils::ProducerToken::ConcurrentQueueTests
friend class ConcurrentQueueTests
Definition: concurrent_queue.hpp:879
genesis::utils::ConcurrentQueueDefaultTraits::RECYCLE_ALLOCATED_BLOCKS
static const bool RECYCLE_ALLOCATED_BLOCKS
Definition: concurrent_queue.hpp:524
genesis::utils::ProducerToken
Definition: concurrent_queue.hpp:821
genesis::utils::ConcurrentQueue::enqueue
bool enqueue(T &&item)
Definition: concurrent_queue.hpp:1179
genesis::utils::details::nomove
static T const & nomove(T const &x)
Definition: concurrent_queue.hpp:660
genesis::utils::ConcurrentQueueDefaultTraits::size_t
std::size_t size_t
Definition: concurrent_queue.hpp:460
genesis::utils::ConcurrentQueue::try_dequeue_bulk
size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max)
Definition: concurrent_queue.hpp:1425
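The bulk overloads (enqueue_bulk, try_enqueue_bulk, try_dequeue_bulk) transfer several items per call, which amortizes the synchronization cost over the whole batch. A sketch combining the token-based bulk enqueue with the token-free bulk dequeue, under the same include-path and ProducerToken assumptions as above:
#include "genesis/utils/threading/concurrent_queue.hpp" // path assumed
#include <array>
#include <cstddef>

int main()
{
    genesis::utils::ConcurrentQueue<int> queue;
    genesis::utils::ProducerToken ptok( queue );

    // Enqueue a whole batch with one call; the iterator range is consumed in order.
    std::array<int, 8> batch = { 0, 1, 2, 3, 4, 5, 6, 7 };
    queue.enqueue_bulk( ptok, batch.begin(), batch.size() );

    // Dequeue up to 8 items in one call; the return value is the number actually taken.
    std::array<int, 8> out;
    std::size_t const got = queue.try_dequeue_bulk( out.begin(), out.size() );
    return got == batch.size() ? 0 : 1;
}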
genesis::utils::ConcurrentQueue::operator=
ConcurrentQueue & operator=(ConcurrentQueue &&other) noexcept
Definition: concurrent_queue.hpp:1117
genesis::utils::details::ThreadExitListener::callback
callback_t callback
Definition: concurrent_queue.hpp:709
genesis::utils::details::static_is_lock_free_num
Definition: concurrent_queue.hpp:785
genesis::utils::ConcurrentQueue::enqueue_bulk
bool enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
Definition: concurrent_queue.hpp:1225
MOODYCAMEL_CATCH
#define MOODYCAMEL_CATCH(...)
Definition: concurrent_queue.hpp:272
genesis::utils::ConcurrentQueue::try_dequeue
bool try_dequeue(consumer_token_t &token, U &item)
Definition: concurrent_queue.hpp:1359
genesis::utils::ConcurrentQueue::ImplicitProducer
friend struct ImplicitProducer
Definition: concurrent_queue.hpp:1520