#ifndef GENESIS_UTILS_THREADING_CONCURRENT_QUEUE_H_
#define GENESIS_UTILS_THREADING_CONCURRENT_QUEUE_H_

#if defined(__GNUC__) && !defined(__INTEL_COMPILER)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wconversion"

#ifdef MCDBGQ_USE_RELACY
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#endif
#endif

#if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
#pragma warning(push)
#pragma warning(disable : 4127) // conditional expression is constant
#endif

#if defined(__APPLE__)
#include "TargetConditionals.h"
#endif

#ifdef MCDBGQ_USE_RELACY
#include "relacy/relacy_std.hpp"
#include "relacy_shims.h"

#include <type_traits>
template <typename thread_id_t>

#if defined(MCDBGQ_USE_RELACY)
#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);

static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE) || defined(__MVS__) || defined(MOODYCAMEL_NO_THREAD_LOCAL)
static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");

template <std::size_t>
struct thread_id_size {
};
template <>
struct thread_id_size<4> {
    typedef std::uint32_t numeric_t;
};
template <>
struct thread_id_size<8> {
    typedef std::uint64_t numeric_t;
};

        return std::hash<std::thread::id>()(x);
#if defined(__GNUC__) || defined(__INTEL_COMPILER)
#define MOODYCAMEL_THREADLOCAL __thread
#elif defined(_MSC_VER)
#define MOODYCAMEL_THREADLOCAL __declspec(thread)
#else
#define MOODYCAMEL_THREADLOCAL thread_local
#endif

#ifndef MOODYCAMEL_CONSTEXPR_IF
#if (defined(_MSC_VER) && defined(_HAS_CXX17) && _HAS_CXX17) || __cplusplus > 201402L
#define MOODYCAMEL_CONSTEXPR_IF if constexpr
#define MOODYCAMEL_MAYBE_UNUSED [[maybe_unused]]
#else
#define MOODYCAMEL_CONSTEXPR_IF if
#define MOODYCAMEL_MAYBE_UNUSED
#endif
#endif
#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
#define MOODYCAMEL_EXCEPTIONS_ENABLED
#endif
#endif

#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
#define MOODYCAMEL_TRY try
#define MOODYCAMEL_CATCH(...) catch (__VA_ARGS__)
#define MOODYCAMEL_RETHROW throw
#define MOODYCAMEL_THROW(expr) throw(expr)
#else
#define MOODYCAMEL_TRY MOODYCAMEL_CONSTEXPR_IF(true)
#define MOODYCAMEL_CATCH(...) else MOODYCAMEL_CONSTEXPR_IF(false)
#define MOODYCAMEL_RETHROW
#define MOODYCAMEL_THROW(expr)
#endif
282 #ifndef MOODYCAMEL_NOEXCEPT
283 #if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
284 #define MOODYCAMEL_NOEXCEPT
285 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
286 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
287 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
290 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
291 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
292 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
293 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
294 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
295 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
296 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
#else
#define MOODYCAMEL_NOEXCEPT noexcept
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
#endif
#endif
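// Illustrative note (added; not part of the original header): MOODYCAMEL_NOEXCEPT_CTOR and
// MOODYCAMEL_NOEXCEPT_ASSIGN are used as compile-time booleans that say whether constructing or
// assigning a T from a given expression can throw, so the queue only takes the strongly
// exception-safe code path when it is actually needed. A minimal sketch of the expansion:
//
//     MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (ptr) T(std::forward<U>(x)))
//     //   exceptions enabled  ->  noexcept(new (ptr) T(std::forward<U>(x)))
//     //   exceptions disabled ->  true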
#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#ifdef MCDBGQ_USE_RELACY
#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#else
#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__) && !defined(__MVS__)
#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // tentatively enabled for now; years ago several users reported having problems with it
#endif
#endif
#endif
#ifndef MOODYCAMEL_DELETE_FUNCTION
#if defined(_MSC_VER) && _MSC_VER < 1800
#define MOODYCAMEL_DELETE_FUNCTION
#else
#define MOODYCAMEL_DELETE_FUNCTION = delete
#endif
#endif

#ifndef MOODYCAMEL_ALIGNAS
#if defined(_MSC_VER) && _MSC_VER <= 1800
#define MOODYCAMEL_ALIGNAS(alignment) __declspec(align(alignment))
#define MOODYCAMEL_ALIGNOF(obj) __alignof(obj)
#define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) typename details::Vs2013Aligned<std::alignment_of<obj>::value, T>::type
template <int Align, typename T>
struct Vs2013Aligned { }; // default, unsupported alignment
template <typename T>
struct Vs2013Aligned<1, T> { typedef __declspec(align(1)) T type; };
template <typename T>
struct Vs2013Aligned<2, T> { typedef __declspec(align(2)) T type; };
template <typename T>
struct Vs2013Aligned<4, T> { typedef __declspec(align(4)) T type; };
template <typename T>
struct Vs2013Aligned<8, T> { typedef __declspec(align(8)) T type; };
template <typename T>
struct Vs2013Aligned<16, T> { typedef __declspec(align(16)) T type; };
template <typename T>
struct Vs2013Aligned<32, T> { typedef __declspec(align(32)) T type; };
template <typename T>
struct Vs2013Aligned<64, T> { typedef __declspec(align(64)) T type; };
template <typename T>
struct Vs2013Aligned<128, T> { typedef __declspec(align(128)) T type; };
template <typename T>
struct Vs2013Aligned<256, T> { typedef __declspec(align(256)) T type; };
#else
template <typename T>
struct identity { typedef T type; };
#define MOODYCAMEL_ALIGNAS(alignment) alignas(alignment)
#define MOODYCAMEL_ALIGNOF(obj) alignof(obj)
#define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) alignas(alignof(obj)) typename details::identity<T>::type
#endif
#endif
#define MOODYCAMEL_NO_TSAN
#if defined(__has_feature)
#if __has_feature(thread_sanitizer)
#undef MOODYCAMEL_NO_TSAN
#define MOODYCAMEL_NO_TSAN __attribute__((no_sanitize("thread")))
#endif
#endif
#if defined(__GNUC__)
static inline bool (likely)(bool x)
{
    return __builtin_expect((x), true);
}
static inline bool (unlikely)(bool x)
{
    return __builtin_expect((x), false);
}
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
#include "internal/concurrentqueue_internal_debug.h"
#endif

template <typename T>
struct const_numeric_max {
    static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
    static const T value = std::numeric_limits<T>::is_signed
        ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
        : static_cast<T>(-1);
};
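// Illustrative sketch (added; not part of the original header): const_numeric_max<T>::value is a
// compile-time stand-in for std::numeric_limits<T>::max(), usable in constant expressions even on
// standard libraries whose max() is not constexpr. Assuming the surrounding details namespace:
//
//     static_assert(details::const_numeric_max<std::uint8_t>::value == 255, "");
//     static_assert(details::const_numeric_max<std::int8_t>::value == 127, "");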
#if defined(__GLIBCXX__)
typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
#else
typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
#endif

#ifndef MCDBGQ_USE_RELACY
#if defined(malloc) || defined(free)
// Work around malloc/free being special macros:
static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
#else
static inline void* malloc(size_t size) { return std::malloc(size); }
static inline void free(void* ptr) { return std::free(ptr); }
#endif
#else
// Debug versions when running under the Relacy race detector (ignore these in user code)
static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
#endif
struct ProducerToken;
struct ConsumerToken;

template <typename T, typename Traits>
class ConcurrentQueue;
template <typename T, typename Traits>
class BlockingConcurrentQueue;
class ConcurrentQueueTests;

template <bool use32>
struct _hash_32_or_64 {
    static inline std::uint32_t hash(std::uint32_t h)
    {
        // MurmurHash3 finalizer: the thread id is already unique, this just spreads
        // that uniqueness evenly across all the bits.
        h ^= h >> 16;
        h *= 0x85ebca6b;
        h ^= h >> 13;
        h *= 0xc2b2ae35;
        return h ^ (h >> 16);
    }
};
template <>
struct _hash_32_or_64<1> {
    static inline std::uint64_t hash(std::uint64_t h)
    {
        h ^= h >> 33;
        h *= 0xff51afd7ed558ccd;
        h ^= h >> 33;
        h *= 0xc4ceb9fe1a85ec53;
        return h ^ (h >> 33);
    }
};
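// Illustrative note (added; not part of the original header): the two hash() overloads above are
// the 32- and 64-bit MurmurHash3 finalizers. Thread IDs are already unique; the mix only spreads
// that uniqueness across all bits so a power-of-two-sized table can index with the low bits. A
// minimal sketch of the intended use (hypothetical values, power-of-two capacity assumed):
//
//     std::size_t capacity = 32;                                 // hash table size, power of two
//     std::uint64_t raw = 0x7f1234u;                             // numeric thread id (hypothetical)
//     std::uint64_t mixed = _hash_32_or_64<true>::hash(raw);     // 64-bit finalizer above
//     std::size_t bucket = static_cast<std::size_t>(mixed) & (capacity - 1);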
template <std::size_t size>
struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> {
};

    static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");

template <typename T>
static inline bool circular_less_than(T a, T b)
{
    static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
        "circular_less_than is intended to be used only with unsigned integer types");
    return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << (static_cast<T>(sizeof(T) * CHAR_BIT - 1)));
}
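// Illustrative sketch (added; not part of the original header): circular_less_than compares
// indices modulo 2^N, so it still gives the right answer after the ever-increasing head/tail
// indices wrap around. With 8-bit indices, for example:
//
//     details::circular_less_than<std::uint8_t>(250, 5);  // true: 5 is just ahead of 250 after wrap-around
//     details::circular_less_than<std::uint8_t>(5, 250);  // false
//     details::circular_less_than<std::uint8_t>(3, 200);  // false: going forward from 200, 3 is only 59 steps away,
//                                                         //        so 3 is treated as being ahead of 200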
template <typename U>
static inline char* align_for(char* ptr)
{
    const std::size_t alignment = std::alignment_of<U>::value;
    return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
}

template <typename T>
static inline T ceil_to_pow_2(T x)
{
    static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
        "ceil_to_pow_2 is intended to be used only with unsigned integer types");

    // Round up to the next power of two (bit-smearing trick).
    --x;
    x |= x >> 1;
    x |= x >> 2;
    x |= x >> 4;
    for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
        x |= x >> (i << 3);
    }
    ++x;
    return x;
}
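// Illustrative sketch (added; not part of the original header): ceil_to_pow_2 rounds an unsigned
// integer up to the next power of two:
//
//     details::ceil_to_pow_2<std::size_t>(1);  // == 1
//     details::ceil_to_pow_2<std::size_t>(5);  // == 8
//     details::ceil_to_pow_2<std::size_t>(64); // == 64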
template <typename T>
static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
{
    T temp = std::move(left.load(std::memory_order_relaxed));
    left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
    right.store(std::move(temp), std::memory_order_relaxed);
}

template <typename T>
static inline T const& nomove(T const& x)
{
    return x;
}

template <bool Enable>
struct nomove_if {
    template <typename T>
    static inline T const& eval(T const& x)
    {
        return x;
    }
};

template <>
struct nomove_if<false> {
    template <typename U>
    static inline auto eval(U&& x)
        -> decltype(std::forward<U>(x))
    {
        return std::forward<U>(x);
    }
};

template <typename It>
#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
template <typename T>
struct is_trivially_destructible : std::is_trivially_destructible<T> {
};
#else
template <typename T>
struct is_trivially_destructible : std::has_trivial_destructor<T> {
};
#endif

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#ifdef MCDBGQ_USE_RELACY
typedef RelacyThreadExitListener ThreadExitListener;
typedef RelacyThreadExitNotifier ThreadExitNotifier;
#else
class ThreadExitNotifier;
    static void subscribe(ThreadExitListener* listener)
    {
        auto& tlsInst = instance();
        std::lock_guard<std::mutex> guard(mutex());
        listener->next = tlsInst.tail;
        listener->chain = &tlsInst;
        tlsInst.tail = listener;
    }

    static void unsubscribe(ThreadExitListener* listener)
    {
        std::lock_guard<std::mutex> guard(mutex());
        if (!listener->chain) {
            return; // race with ~ThreadExitNotifier
        }
        auto& tlsInst = *listener->chain;
        listener->chain = nullptr;
        ThreadExitListener** prev = &tlsInst.tail;
        for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
            if (ptr == listener) {
                *prev = ptr->next;
                break;
            }
            prev = &ptr->next;
        }
    }

    ~ThreadExitNotifier()
    {
        assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
        std::lock_guard<std::mutex> guard(mutex());
        for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
            ptr->chain = nullptr;
            ptr->callback(ptr->userData);
        }
    }

    static inline ThreadExitNotifier& instance()
    {
        static thread_local ThreadExitNotifier notifier;
        return notifier;
    }

    static inline std::mutex& mutex()
    {
        // Must be static because the ThreadExitNotifier could be destroyed while unsubscribe is called.
        static std::mutex mutex;
        return mutex;
    }

    ThreadExitListener* tail;
template <typename T>
struct static_is_lock_free_num { enum { value = 0 }; };
template <>
struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
template <>
struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
template <>
struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
template <>
struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
template <>
struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
template <typename T>
struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
template <>
struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
template <typename U>
struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
822 template <
typename T,
typename Traits>
825 template <
typename T,
typename Traits>
831 other.producer =
nullptr;
849 if (other.producer !=
nullptr) {
877 template <
typename T,
typename Traits>
886 template <
typename T,
typename Traits>
889 template <
typename T,
typename Traits>
893 : initialOffset(other.initialOffset),
894 lastKnownGlobalOffset(other.lastKnownGlobalOffset),
895 itemsConsumedFromCurrent(other.itemsConsumedFromCurrent),
896 currentProducer(other.currentProducer),
897 desiredProducer(other.desiredProducer)
909 std::swap(initialOffset, other.initialOffset);
910 std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
911 std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
912 std::swap(currentProducer, other.currentProducer);
913 std::swap(desiredProducer, other.desiredProducer);
921 template <
typename T,
typename Traits>
926 std::uint32_t initialOffset;
927 std::uint32_t lastKnownGlobalOffset;
928 std::uint32_t itemsConsumedFromCurrent;
template <typename T, typename Traits>

template <typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue {
public:

    static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);

#pragma warning(push)
#pragma warning(disable : 4307) // + integral constant overflow (that's what the ternary expression is for!)
#pragma warning(disable : 4309) // static_cast: Truncation of constant value

    static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
    static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
    static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
        : producerListTail(nullptr)
        , initialBlockPoolIndex(0)
        , nextExplicitConsumerId(0)
        , globalExplicitConsumerOffset(0)
    {
        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
        populate_initial_implicit_producer_hash();
        populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        explicitProducers.store(nullptr, std::memory_order_relaxed);
        implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif
    }
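// Illustrative usage sketch (added; not part of the original header): the single-argument
// constructor pre-allocates enough blocks for roughly `capacity` elements; enqueue() may still
// allocate further blocks later unless the Traits or the try_* methods forbid it. Namespace and
// include name below follow the upstream moodycamel header and may differ in this adaptation:
//
//     #include "concurrentqueue.h"
//
//     moodycamel::ConcurrentQueue<int> q(1024); // room for ~1024 items before any further allocation
//     q.enqueue(17);
//     int value;
//     bool ok = q.try_dequeue(value);           // ok == true, value == 17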
    ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
        : producerListTail(nullptr)
        , initialBlockPoolIndex(0)
        , nextExplicitConsumerId(0)
        , globalExplicitConsumerOffset(0)
    {
        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
        populate_initial_implicit_producer_hash();
        size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);
        populate_initial_block_list(blocks);

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        explicitProducers.store(nullptr, std::memory_order_relaxed);
        implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif
    }
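// Illustrative usage sketch (added; not part of the original header): this constructor sizes the
// initial block pool from a minimum capacity plus hard upper bounds on the number of explicit
// (token-based) and implicit producers, which matters when the Traits are configured so that no
// further blocks may ever be allocated. Names follow the upstream moodycamel header:
//
//     // At most 2 producers using ProducerTokens, 4 producing without tokens,
//     // and capacity for at least 4096 elements:
//     moodycamel::ConcurrentQueue<int> q(4096, 2, 4);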
        auto ptr = producerListTail.load(std::memory_order_relaxed);
        while (ptr != nullptr) {
            auto next = ptr->next_prod();
            if (ptr->token != nullptr) {
                ptr->token->producer = nullptr;

        auto hash = implicitProducerHash.load(std::memory_order_relaxed);
        while (hash != nullptr) {
            auto prev = hash->prev;
            if (prev != nullptr) {
                for (size_t i = 0; i != hash->capacity; ++i) {
                    hash->entries[i].~ImplicitProducerKVP();
                hash->~ImplicitProducerHash();
                (Traits::free)(hash);

        auto block = freeList.head_unsafe();
        while (block != nullptr) {
            auto next = block->freeListNext.load(std::memory_order_relaxed);
            if (block->dynamicallyAllocated) {

        destroy_array(initialBlockPool, initialBlockPoolSize);
1084 : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
1085 producerCount(other.producerCount.load(std::memory_order_relaxed)),
1086 initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
1087 initialBlockPool(other.initialBlockPool),
1088 initialBlockPoolSize(other.initialBlockPoolSize),
1089 freeList(std::move(other.freeList)),
1090 nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
1091 globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
1094 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
1095 populate_initial_implicit_producer_hash();
1096 swap_implicit_producer_hashes(other);
1098 other.producerListTail.store(
nullptr, std::memory_order_relaxed);
1099 other.producerCount.store(0, std::memory_order_relaxed);
1100 other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
1101 other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
1103 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
1104 explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
1105 other.explicitProducers.store(
nullptr, std::memory_order_relaxed);
1106 implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
1107 other.implicitProducers.store(
nullptr, std::memory_order_relaxed);
1110 other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
1111 other.initialBlockPoolSize = 0;
1112 other.initialBlockPool =
nullptr;
1119 return swap_internal(other);
1129 swap_internal(other);
1135 if (
this == &other) {
1142 std::swap(initialBlockPool, other.initialBlockPool);
1143 std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
1144 freeList.swap(other.freeList);
1148 swap_implicit_producer_hashes(other);
1151 other.reown_producers();
1153 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
1171 else return inner_enqueue<CanAlloc>(item);
1183 else return inner_enqueue<CanAlloc>(std::move(item));
1192 return inner_enqueue<CanAlloc>(token, item);
1201 return inner_enqueue<CanAlloc>(token, std::move(item));
1210 template <
typename It>
1215 else return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
1224 template <
typename It>
1227 return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
1239 else return inner_enqueue<CannotAlloc>(item);
1251 else return inner_enqueue<CannotAlloc>(std::move(item));
1259 return inner_enqueue<CannotAlloc>(token, item);
1267 return inner_enqueue<CannotAlloc>(token, std::move(item));
1277 template <
typename It>
1282 else return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1290 template <
typename It>
1293 return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
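// Illustrative usage sketch (added; not part of the original header): enqueue() may allocate a new
// block when the queue is out of space, whereas try_enqueue() fails instead of allocating; neither
// ever blocks. Names follow the upstream moodycamel header:
//
//     moodycamel::ConcurrentQueue<std::string> q;
//     q.enqueue("may allocate, fails only if memory is exhausted");
//     if (!q.try_enqueue("succeeds only if a free slot already exists")) {
//         // queue was full and no pre-allocated block was available
//     }
//
//     std::string out;
//     while (q.try_dequeue(out)) {
//         // consume `out`
//     }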
1300 template <
typename U>
1305 size_t nonEmptyCount = 0;
1306 ProducerBase* best =
nullptr;
1307 size_t bestSize = 0;
1308 for (
auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
1309 auto size = ptr->size_approx();
1311 if (size > bestSize) {
1321 if (nonEmptyCount > 0) {
1325 for (
auto ptr = producerListTail.load(std::memory_order_acquire); ptr !=
nullptr; ptr = ptr->next_prod()) {
1326 if (ptr != best && ptr->dequeue(item)) {
1343 template <
typename U>
1346 for (
auto ptr = producerListTail.load(std::memory_order_acquire); ptr !=
nullptr; ptr = ptr->next_prod()) {
1347 if (ptr->dequeue(item)) {
1358 template <
typename U>
1367 if (token.desiredProducer ==
nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1368 if (!update_current_producer_after_rotation(token)) {
1375 if (
static_cast<ProducerBase*
>(token.currentProducer)->dequeue(item)) {
1377 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1382 auto tail = producerListTail.load(std::memory_order_acquire);
1383 auto ptr =
static_cast<ProducerBase*
>(token.currentProducer)->next_prod();
1384 if (ptr ==
nullptr) {
1387 while (ptr !=
static_cast<ProducerBase*
>(token.currentProducer)) {
1388 if (ptr->dequeue(item)) {
1389 token.currentProducer = ptr;
1390 token.itemsConsumedFromCurrent = 1;
1393 ptr = ptr->next_prod();
1394 if (ptr ==
nullptr) {
1406 template <
typename It>
1410 for (
auto ptr = producerListTail.load(std::memory_order_acquire); ptr !=
nullptr; ptr = ptr->next_prod()) {
1411 count += ptr->dequeue_bulk(itemFirst, max - count);
1424 template <
typename It>
1427 if (token.desiredProducer ==
nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1428 if (!update_current_producer_after_rotation(token)) {
1433 size_t count =
static_cast<ProducerBase*
>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1436 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1440 token.itemsConsumedFromCurrent +=
static_cast<std::uint32_t
>(count);
1443 auto tail = producerListTail.load(std::memory_order_acquire);
1444 auto ptr =
static_cast<ProducerBase*
>(token.currentProducer)->next_prod();
1445 if (ptr ==
nullptr) {
1448 while (ptr !=
static_cast<ProducerBase*
>(token.currentProducer)) {
1449 auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1451 if (dequeued != 0) {
1452 token.currentProducer = ptr;
1453 token.itemsConsumedFromCurrent =
static_cast<std::uint32_t
>(dequeued);
1455 if (dequeued == max) {
1459 ptr = ptr->next_prod();
1460 if (ptr ==
nullptr) {
1473 template <
typename U>
1486 template <
typename It>
1501 for (
auto ptr = producerListTail.load(std::memory_order_acquire); ptr !=
nullptr; ptr = ptr->next_prod()) {
1502 size += ptr->size_approx();
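// Illustrative usage sketch (added; not part of the original header): tokens let the queue skip
// the per-thread implicit-producer lookup and give each consumer a stable position in the
// producer rotation implemented above; size_approx() is only an estimate while other threads are
// enqueueing or dequeueing. Names follow the upstream moodycamel header:
//
//     moodycamel::ConcurrentQueue<int> q;
//     moodycamel::ProducerToken ptok(q);
//     moodycamel::ConsumerToken ctok(q);
//
//     for (int i = 0; i < 100; ++i) {
//         q.enqueue(ptok, i);
//     }
//     int item;
//     while (q.try_dequeue(ctok, item)) {
//         // items enqueued with the same producer token come out in FIFO order
//     }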
1524 enum AllocationMode { CanAlloc,
1531 template <AllocationMode canAlloc,
typename U>
1534 return static_cast<ExplicitProducer*
>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1537 template <AllocationMode canAlloc,
typename U>
1538 inline bool inner_enqueue(U&& element)
1540 auto producer = get_or_add_implicit_producer();
1541 return producer ==
nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1544 template <AllocationMode canAlloc,
typename It>
1545 inline bool inner_enqueue_bulk(
producer_token_t const& token, It itemFirst,
size_t count)
1547 return static_cast<ExplicitProducer*
>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1550 template <AllocationMode canAlloc,
typename It>
1551 inline bool inner_enqueue_bulk(It itemFirst,
size_t count)
1553 auto producer = get_or_add_implicit_producer();
1554 return producer ==
nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1557 inline bool update_current_producer_after_rotation(
consumer_token_t& token)
1560 auto tail = producerListTail.load(std::memory_order_acquire);
1561 if (token.desiredProducer ==
nullptr &&
tail ==
nullptr) {
1564 auto prodCount = producerCount.load(std::memory_order_relaxed);
1565 auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1570 std::uint32_t
offset = prodCount - 1 - (token.initialOffset % prodCount);
1571 token.desiredProducer =
tail;
1572 for (std::uint32_t i = 0; i !=
offset; ++i) {
1573 token.desiredProducer =
static_cast<ProducerBase*
>(token.desiredProducer)->next_prod();
1574 if (token.desiredProducer ==
nullptr) {
1575 token.desiredProducer =
tail;
1580 std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1581 if (delta >= prodCount) {
1582 delta = delta % prodCount;
1584 for (std::uint32_t i = 0; i != delta; ++i) {
1585 token.desiredProducer =
static_cast<ProducerBase*
>(token.desiredProducer)->next_prod();
1586 if (token.desiredProducer ==
nullptr) {
1587 token.desiredProducer =
tail;
1591 token.lastKnownGlobalOffset = globalOffset;
1592 token.currentProducer = token.desiredProducer;
1593 token.itemsConsumedFromCurrent = 0;
1601 template <
typename N>
1602 struct FreeListNode {
1605 , freeListNext(nullptr)
1609 std::atomic<std::uint32_t> freeListRefs;
1610 std::atomic<N*> freeListNext;
1616 template <
typename N>
1619 : freeListHead(nullptr)
1622 FreeList(FreeList&& other)
1623 : freeListHead(other.freeListHead.load(std::memory_order_relaxed))
1625 other.freeListHead.store(
nullptr, std::memory_order_relaxed);
1632 inline void add(N* node)
1634 #ifdef MCDBGQ_NOLOCKFREE_FREELIST
1635 debug::DebugLock lock(mutex);
1639 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
1642 add_knowing_refcount_is_zero(node);
1648 #ifdef MCDBGQ_NOLOCKFREE_FREELIST
1649 debug::DebugLock lock(mutex);
1651 auto head = freeListHead.load(std::memory_order_acquire);
1652 while (
head !=
nullptr) {
1653 auto prevHead =
head;
1654 auto refs =
head->freeListRefs.load(std::memory_order_relaxed);
1655 if ((refs & REFS_MASK) == 0 || !
head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
1656 head = freeListHead.load(std::memory_order_acquire);
1662 auto next =
head->freeListNext.load(std::memory_order_relaxed);
1663 if (freeListHead.compare_exchange_strong(
head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
1666 assert((
head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
1669 head->freeListRefs.fetch_sub(2, std::memory_order_release);
1676 refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1677 if (refs == SHOULD_BE_ON_FREELIST + 1) {
1678 add_knowing_refcount_is_zero(prevHead);
        N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
1689 inline void add_knowing_refcount_is_zero(N* node)
1699 auto head = freeListHead.load(std::memory_order_relaxed);
1701 node->freeListNext.store(
head, std::memory_order_relaxed);
1702 node->freeListRefs.store(1, std::memory_order_release);
1703 if (!freeListHead.compare_exchange_strong(
head, node, std::memory_order_release, std::memory_order_relaxed)) {
1705 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
1715 std::atomic<N*> freeListHead;
1717 static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1718 static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
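// Explanatory note (added; not part of the original header): freeListRefs packs two things into
// one 32-bit atomic: the low 31 bits (REFS_MASK) count threads currently holding a reference to
// the node from within try_get(), and the top bit (SHOULD_BE_ON_FREELIST) records that add() ran
// while such references were still outstanding. A node is only physically linked into the list by
// the thread that observes the count reach zero with that flag set:
//
//     // inside add():     fetch_add(SHOULD_BE_ON_FREELIST) returned 0           -> link the node now
//     // inside try_get(): fetch_sub(1) returned SHOULD_BE_ON_FREELIST + 1       -> link it on behalf of add()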
1720 #ifdef MCDBGQ_NOLOCKFREE_FREELIST
1721 debug::DebugMutex mutex;
    enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };
1735 , elementsCompletelyDequeued(0)
1737 , freeListNext(nullptr)
1738 , dynamicallyAllocated(true)
1740 #ifdef MCDBGQ_TRACKMEM
1745 template <InnerQueueContext context>
1746 inline bool is_empty()
const
1752 if (!emptyFlags[i].load(std::memory_order_relaxed)) {
1758 std::atomic_thread_fence(std::memory_order_acquire);
1764 if (elementsCompletelyDequeued.load(std::memory_order_relaxed) ==
BLOCK_SIZE) {
1765 std::atomic_thread_fence(std::memory_order_acquire);
1768 assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <=
BLOCK_SIZE);
1774 template <InnerQueueContext context>
                assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
                emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
1787 auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
1795 template <InnerQueueContext context>
1801 std::atomic_thread_fence(std::memory_order_release);
1803 for (
size_t j = 0; j != count; ++j) {
1804 assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1805 emptyFlags[i + j].store(
true, std::memory_order_relaxed);
1812 auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
1818 template <InnerQueueContext context>
1819 inline void set_all_empty()
1825 emptyFlags[i].store(
true, std::memory_order_relaxed);
1831 elementsCompletelyDequeued.store(
BLOCK_SIZE, std::memory_order_relaxed);
1835 template <InnerQueueContext context>
1836 inline void reset_empty()
1842 emptyFlags[i].store(
false, std::memory_order_relaxed);
1848 elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
1856 static_assert(std::alignment_of<T>::value <=
sizeof(T),
"The queue does not support types with an alignment greater than their size at this time");
1862 std::atomic<size_t> elementsCompletelyDequeued;
1866 std::atomic<std::uint32_t> freeListRefs;
1867 std::atomic<Block*> freeListNext;
1868 bool dynamicallyAllocated;
1870 #ifdef MCDBGQ_TRACKMEM
1874 static_assert(std::alignment_of<Block>::value >= std::alignment_of<T>::value,
"Internal error: Blocks must be at least as aligned as the type they are wrapping");
1876 #ifdef MCDBGQ_TRACKMEM
1887 struct ProducerBase :
public details::ConcurrentQueueProducerTypelessBase {
1891 , dequeueOptimisticCount(0)
1892 , dequeueOvercommit(0)
1893 , tailBlock(nullptr)
1894 , isExplicit(isExplicit_)
1899 virtual ~ProducerBase() {}
1901 template <
typename U>
1902 inline bool dequeue(U& element)
1911 template <
typename It>
1912 inline size_t dequeue_bulk(It& itemFirst,
size_t max)
        inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
1925 auto tail = tailIndex.load(std::memory_order_relaxed);
1926 auto head = headIndex.load(std::memory_order_relaxed);
        inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
1933 std::atomic<index_t> tailIndex;
1934 std::atomic<index_t> headIndex;
1936 std::atomic<index_t> dequeueOptimisticCount;
1937 std::atomic<index_t> dequeueOvercommit;
1946 #ifdef MCDBGQ_TRACKMEM
1947 friend struct MemStats;
1957 : ProducerBase(parent_, true)
1958 , blockIndex(nullptr)
1959 , pr_blockIndexSlotsUsed(0)
1961 , pr_blockIndexFront(0)
1962 , pr_blockIndexEntries(nullptr)
1963 , pr_blockIndexRaw(nullptr)
1966 if (poolBasedIndexSize > pr_blockIndexSize) {
1967 pr_blockIndexSize = poolBasedIndexSize;
1978 if (this->tailBlock !=
nullptr) {
1980 Block* halfDequeuedBlock =
nullptr;
1981 if ((this->headIndex.load(std::memory_order_relaxed) &
static_cast<index_t>(
BLOCK_SIZE - 1)) != 0) {
1984 size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
1985 while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base +
BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
1986 i = (i + 1) & (pr_blockIndexSize - 1);
1988 assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
1989 halfDequeuedBlock = pr_blockIndexEntries[i].block;
1993 auto block = this->tailBlock;
1995 block = block->next;
1996 if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2001 if (block == halfDequeuedBlock) {
2002 i =
static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) &
static_cast<index_t>(
BLOCK_SIZE - 1));
2006 auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) &
static_cast<index_t>(
BLOCK_SIZE - 1)) == 0 ?
BLOCK_SIZE :
static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) &
static_cast<index_t>(
BLOCK_SIZE - 1));
2007 while (i !=
BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
2008 (*block)[i++]->~T();
2010 }
while (block != this->tailBlock);
2014 if (this->tailBlock !=
nullptr) {
2015 auto block = this->tailBlock;
2017 auto nextBlock = block->next;
2018 this->parent->add_block_to_free_list(block);
2020 }
while (block != this->tailBlock);
2024 auto header =
static_cast<BlockIndexHeader*
>(pr_blockIndexRaw);
2025 while (header !=
nullptr) {
2026 auto prev =
static_cast<BlockIndexHeader*
>(header->prev);
2027 header->~BlockIndexHeader();
2028 (Traits::free)(header);
2033 template <AllocationMode allocMode,
typename U>
2034 inline bool enqueue(U&& element)
2036 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2037 index_t newTailIndex = 1 + currentTailIndex;
2040 auto startBlock = this->tailBlock;
2041 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2042 if (this->tailBlock !=
nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2044 this->tailBlock = this->tailBlock->next;
2045 this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2056 auto head = this->headIndex.load(std::memory_order_relaxed);
2057 assert(!details::circular_less_than<index_t>(currentTailIndex,
head));
2058 if (!details::circular_less_than<index_t>(
head, currentTailIndex +
BLOCK_SIZE)
2066 if (pr_blockIndexRaw ==
nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
2075 else if (!new_block_index(pr_blockIndexSlotsUsed))
2082 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2083 if (newBlock ==
nullptr) {
2086 #ifdef MCDBGQ_TRACKMEM
2087 newBlock->owner =
this;
2089 newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2090 if (this->tailBlock ==
nullptr) {
2091 newBlock->next = newBlock;
2093 newBlock->next = this->tailBlock->next;
2094 this->tailBlock->next = newBlock;
2096 this->tailBlock = newBlock;
2097 ++pr_blockIndexSlotsUsed;
2106 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2112 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2113 this->tailBlock = startBlock ==
nullptr ? this->tailBlock : startBlock;
2120 (void)originalBlockIndexSlotsUsed;
2124 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2125 entry.base = currentTailIndex;
2126 entry.block = this->tailBlock;
2127 blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
2128 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2132 this->tailIndex.store(newTailIndex, std::memory_order_release);
2138 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2140 this->tailIndex.store(newTailIndex, std::memory_order_release);
2144 template <
typename U>
2145 bool dequeue(U& element)
2147 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2148 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2149 if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit,
tail)) {
2166 std::atomic_thread_fence(std::memory_order_acquire);
2169 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2181 tail = this->tailIndex.load(std::memory_order_acquire);
2182 if ((
details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit,
tail))) {
2193 auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2197 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2198 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2203 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
                auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
                auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
                auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
2209 auto& el = *((*block)[index]);
2219 (*block)[index]->~T();
2220 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
2222 } guard = { block, index };
2224 element = std::move(el);
2226 element = std::move(el);
2228 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
2234 this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2241 template <AllocationMode allocMode,
typename It>
2247 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2248 auto startBlock = this->tailBlock;
2249 auto originalBlockIndexFront = pr_blockIndexFront;
2250 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2252 Block* firstAllocatedBlock =
nullptr;
2255 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~
static_cast<index_t>(
BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(
BLOCK_SIZE - 1));
2257 if (blockBaseDiff > 0) {
2259 while (blockBaseDiff > 0 && this->tailBlock !=
nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2263 this->tailBlock = this->tailBlock->next;
2264 firstAllocatedBlock = firstAllocatedBlock ==
nullptr ? this->tailBlock : firstAllocatedBlock;
2266 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2267 entry.base = currentTailIndex;
2268 entry.block = this->tailBlock;
2269 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2273 while (blockBaseDiff > 0) {
2277 auto head = this->headIndex.load(std::memory_order_relaxed);
2278 assert(!details::circular_less_than<index_t>(currentTailIndex,
head));
2280 if (pr_blockIndexRaw ==
nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2284 pr_blockIndexFront = originalBlockIndexFront;
2285 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2286 this->tailBlock = startBlock ==
nullptr ? firstAllocatedBlock : startBlock;
2289 else if (full || !new_block_index(originalBlockIndexSlotsUsed))
2292 pr_blockIndexFront = originalBlockIndexFront;
2293 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2294 this->tailBlock = startBlock ==
nullptr ? firstAllocatedBlock : startBlock;
2301 originalBlockIndexFront = originalBlockIndexSlotsUsed;
2305 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2306 if (newBlock ==
nullptr) {
2307 pr_blockIndexFront = originalBlockIndexFront;
2308 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2309 this->tailBlock = startBlock ==
nullptr ? firstAllocatedBlock : startBlock;
2313 #ifdef MCDBGQ_TRACKMEM
2314 newBlock->owner =
this;
2316 newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
2317 if (this->tailBlock ==
nullptr) {
2318 newBlock->next = newBlock;
2320 newBlock->next = this->tailBlock->next;
2321 this->tailBlock->next = newBlock;
2323 this->tailBlock = newBlock;
2324 firstAllocatedBlock = firstAllocatedBlock ==
nullptr ? this->tailBlock : firstAllocatedBlock;
2326 ++pr_blockIndexSlotsUsed;
2328 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2329 entry.base = currentTailIndex;
2330 entry.block = this->tailBlock;
2331 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2336 auto block = firstAllocatedBlock;
2338 block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2339 if (block == this->tailBlock) {
2342 block = block->next;
2347 blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2352 index_t newTailIndex = startTailIndex +
static_cast<index_t>(count);
2353 currentTailIndex = startTailIndex;
2354 auto endBlock = this->tailBlock;
2355 this->tailBlock = startBlock;
2356 assert((startTailIndex &
static_cast<index_t>(
BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock !=
nullptr || count == 0);
2357 if ((startTailIndex &
static_cast<index_t>(
BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock !=
nullptr) {
2358 this->tailBlock = firstAllocatedBlock;
2362 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2363 stopIndex = newTailIndex;
2367 while (currentTailIndex != stopIndex) {
2368 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2375 while (currentTailIndex != stopIndex) {
2393 auto constructedStopIndex = currentTailIndex;
2394 auto lastBlockEnqueued = this->tailBlock;
2396 pr_blockIndexFront = originalBlockIndexFront;
2397 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2398 this->tailBlock = startBlock ==
nullptr ? firstAllocatedBlock : startBlock;
2400 if (!details::is_trivially_destructible<T>::value) {
2401 auto block = startBlock;
2403 block = firstAllocatedBlock;
2405 currentTailIndex = startTailIndex;
2408 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2409 stopIndex = constructedStopIndex;
2411 while (currentTailIndex != stopIndex) {
2412 (*block)[currentTailIndex++]->~T();
2414 if (block == lastBlockEnqueued) {
2417 block = block->next;
2424 if (this->tailBlock == endBlock) {
2425 assert(currentTailIndex == newTailIndex);
2428 this->tailBlock = this->tailBlock->next;
2433 if (firstAllocatedBlock !=
nullptr)
2434 blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2437 this->tailIndex.store(newTailIndex, std::memory_order_release);
2441 template <
typename It>
2442 size_t dequeue_bulk(It& itemFirst,
size_t max)
2444 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2445 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2446 auto desiredCount =
static_cast<size_t>(
tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2447 if (details::circular_less_than<size_t>(0, desiredCount)) {
2448 desiredCount = desiredCount < max ? desiredCount : max;
2449 std::atomic_thread_fence(std::memory_order_acquire);
2451 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2453 tail = this->tailIndex.load(std::memory_order_acquire);
2454 auto actualCount =
static_cast<size_t>(
tail - (myDequeueCount - overcommit));
2455 if (details::circular_less_than<size_t>(0, actualCount)) {
2456 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2457 if (actualCount < desiredCount) {
2458 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2463 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2466 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2467 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2469 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2470 auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(
BLOCK_SIZE - 1);
2471 auto offset =
static_cast<size_t>(
static_cast<typename std::make_signed<index_t>::type
>(firstBlockBaseIndex - headBase) /
static_cast<typename std::make_signed<index_t>::type
>(
BLOCK_SIZE));
2472 auto indexIndex = (localBlockIndexHead +
offset) & (localBlockIndex->size - 1);
2475 auto index = firstIndex;
2477 auto firstIndexInBlock = index;
2479 endIndex = details::circular_less_than<index_t>(firstIndex +
static_cast<index_t>(actualCount), endIndex) ? firstIndex +
static_cast<index_t>(actualCount) : endIndex;
2480 auto block = localBlockIndex->entries[indexIndex].block;
2482 while (index != endIndex) {
2483 auto& el = *((*block)[index]);
2484 *itemFirst++ = std::move(el);
2491 while (index != endIndex) {
2492 auto& el = *((*block)[index]);
2493 *itemFirst = std::move(el);
2505 block = localBlockIndex->entries[indexIndex].block;
2506 while (index != endIndex) {
2507 (*block)[index++]->~T();
2509 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock,
static_cast<size_t>(endIndex - firstIndexInBlock));
2510 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2512 firstIndexInBlock = index;
2514 endIndex = details::circular_less_than<index_t>(firstIndex +
static_cast<index_t>(actualCount), endIndex) ? firstIndex +
static_cast<index_t>(actualCount) : endIndex;
2515 }
while (index != firstIndex + actualCount);
2520 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock,
static_cast<size_t>(endIndex - firstIndexInBlock));
2521 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2522 }
while (index != firstIndex + actualCount);
2527 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2535 struct BlockIndexEntry {
2540 struct BlockIndexHeader {
2542 std::atomic<size_t> front;
2543 BlockIndexEntry* entries;
2547 bool new_block_index(
size_t numberOfFilledSlotsToExpose)
2549 auto prevBlockSizeMask = pr_blockIndexSize - 1;
2552 pr_blockIndexSize <<= 1;
2553 auto newRawPtr =
static_cast<char*
>((Traits::malloc)(
sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 +
sizeof(BlockIndexEntry) * pr_blockIndexSize));
2554 if (newRawPtr ==
nullptr) {
2555 pr_blockIndexSize >>= 1;
2559 auto newBlockIndexEntries =
reinterpret_cast<BlockIndexEntry*
>(details::align_for<BlockIndexEntry>(newRawPtr +
sizeof(BlockIndexHeader)));
2563 if (pr_blockIndexSlotsUsed != 0) {
2564 auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2566 newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2567 i = (i + 1) & prevBlockSizeMask;
2568 }
while (i != pr_blockIndexFront);
2572 auto header =
new (newRawPtr) BlockIndexHeader;
2573 header->size = pr_blockIndexSize;
2574 header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2575 header->entries = newBlockIndexEntries;
2576 header->prev = pr_blockIndexRaw;
2578 pr_blockIndexFront = j;
2579 pr_blockIndexEntries = newBlockIndexEntries;
2580 pr_blockIndexRaw = newRawPtr;
2581 blockIndex.store(header, std::memory_order_release);
2587 std::atomic<BlockIndexHeader*> blockIndex;
2590 size_t pr_blockIndexSlotsUsed;
2591 size_t pr_blockIndexSize;
2592 size_t pr_blockIndexFront;
2593 BlockIndexEntry* pr_blockIndexEntries;
2594 void* pr_blockIndexRaw;
2596 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2603 #ifdef MCDBGQ_TRACKMEM
2604 friend struct MemStats;
2614 : ProducerBase(parent_, false)
2616 , blockIndex(nullptr)
2628 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2630 if (!this->
inactive.load(std::memory_order_relaxed)) {
2636 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2637 auto index = this->headIndex.load(std::memory_order_relaxed);
2638 Block* block =
nullptr;
2640 bool forceFreeLastBlock = index !=
tail;
2641 while (index !=
tail) {
2643 if (block !=
nullptr) {
2645 this->parent->add_block_to_free_list(block);
2648 block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
2651 ((*block)[index])->~T();
2657 if (this->tailBlock !=
nullptr && (forceFreeLastBlock || (
tail &
static_cast<index_t>(
BLOCK_SIZE - 1)) != 0)) {
2658 this->parent->add_block_to_free_list(this->tailBlock);
2662 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2663 if (localBlockIndex !=
nullptr) {
2664 for (
size_t i = 0; i != localBlockIndex->capacity; ++i) {
2665 localBlockIndex->index[i]->~BlockIndexEntry();
2668 auto prev = localBlockIndex->prev;
2669 localBlockIndex->~BlockIndexHeader();
2670 (Traits::free)(localBlockIndex);
2671 localBlockIndex = prev;
2672 }
while (localBlockIndex !=
nullptr);
2676 template <AllocationMode allocMode,
typename U>
2677 inline bool enqueue(U&& element)
2679 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2680 index_t newTailIndex = 1 + currentTailIndex;
2683 auto head = this->headIndex.load(std::memory_order_relaxed);
2684 assert(!details::circular_less_than<index_t>(currentTailIndex,
head));
2688 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2689 debug::DebugLock lock(mutex);
2692 BlockIndexEntry* idxEntry;
2693 if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2698 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2699 if (newBlock ==
nullptr) {
2700 rewind_block_index_tail();
2701 idxEntry->value.store(
nullptr, std::memory_order_relaxed);
2704 #ifdef MCDBGQ_TRACKMEM
2705 newBlock->owner =
this;
2707 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2714 new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2718 rewind_block_index_tail();
2719 idxEntry->value.store(
nullptr, std::memory_order_relaxed);
2720 this->parent->add_block_to_free_list(newBlock);
2726 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2728 this->tailBlock = newBlock;
2732 this->tailIndex.store(newTailIndex, std::memory_order_release);
2738 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2740 this->tailIndex.store(newTailIndex, std::memory_order_release);
2744 template <
typename U>
2745 bool dequeue(U& element)
2748 index_t tail = this->tailIndex.load(std::memory_order_relaxed);
2749 index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2750 if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit,
tail)) {
2751 std::atomic_thread_fence(std::memory_order_acquire);
2753 index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2754 tail = this->tailIndex.load(std::memory_order_acquire);
2755 if ((
details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit,
tail))) {
2756 index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2759 auto entry = get_block_index_entry_for_index(index);
2762 auto block = entry->value.load(std::memory_order_relaxed);
2763 auto& el = *((*block)[index]);
2766 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2769 debug::DebugLock lock(producer->mutex);
2774 BlockIndexEntry* entry;
2779 (*block)[index]->~T();
2780 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2781 entry->value.store(
nullptr, std::memory_order_relaxed);
2782 parent->add_block_to_free_list(block);
2785 } guard = { block, index, entry, this->parent };
2787 element = std::move(el);
2789 element = std::move(el);
2792 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2794 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2795 debug::DebugLock lock(mutex);
2798 entry->value.store(
nullptr, std::memory_order_relaxed);
2800 this->parent->add_block_to_free_list(block);
2806 this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2814 #pragma warning(push)
2815 #pragma warning(disable : 4706) // assignment within conditional expression
2817 template <AllocationMode allocMode,
typename It>
2829 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2830 auto startBlock = this->tailBlock;
2831 Block* firstAllocatedBlock =
nullptr;
2832 auto endBlock = this->tailBlock;
2835 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~
static_cast<index_t>(
BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(
BLOCK_SIZE - 1));
2837 if (blockBaseDiff > 0) {
2838 #ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2839 debug::DebugLock lock(mutex);
2846 BlockIndexEntry* idxEntry =
nullptr;
2848 bool indexInserted =
false;
2849 auto head = this->headIndex.load(std::memory_order_relaxed);
2850 assert(!details::circular_less_than<index_t>(currentTailIndex,
head));
2853 if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) ==
nullptr) {
2856 if (indexInserted) {
2857 rewind_block_index_tail();
2858 idxEntry->value.store(
nullptr, std::memory_order_relaxed);
2860 currentTailIndex = (startTailIndex - 1) & ~
static_cast<index_t>(
BLOCK_SIZE - 1);
2861 for (
auto block = firstAllocatedBlock; block !=
nullptr; block = block->next) {
2863 idxEntry = get_block_index_entry_for_index(currentTailIndex);
2864 idxEntry->value.store(
nullptr, std::memory_order_relaxed);
2865 rewind_block_index_tail();
2867 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2868 this->tailBlock = startBlock;
2873 #ifdef MCDBGQ_TRACKMEM
2874 newBlock->owner =
this;
2876 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2877 newBlock->next =
nullptr;
2880 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2884 if ((startTailIndex &
static_cast<index_t>(
BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock !=
nullptr) {
2885 assert(this->tailBlock !=
nullptr);
2886 this->tailBlock->next = newBlock;
2888 this->tailBlock = newBlock;
2889 endBlock = newBlock;
2890 firstAllocatedBlock = firstAllocatedBlock ==
nullptr ? newBlock : firstAllocatedBlock;
2891 }
while (blockBaseDiff > 0);
2895 index_t newTailIndex = startTailIndex +
static_cast<index_t>(count);
2896 currentTailIndex = startTailIndex;
2897 this->tailBlock = startBlock;
2898 assert((startTailIndex &
static_cast<index_t>(
BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock !=
nullptr || count == 0);
2899 if ((startTailIndex &
static_cast<index_t>(
BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock !=
nullptr) {
2900 this->tailBlock = firstAllocatedBlock;
2904 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2905 stopIndex = newTailIndex;
2909 while (currentTailIndex != stopIndex) {
2910 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2917 while (currentTailIndex != stopIndex) {
2925 auto constructedStopIndex = currentTailIndex;
2926 auto lastBlockEnqueued = this->tailBlock;
2928 if (!details::is_trivially_destructible<T>::value) {
2929 auto block = startBlock;
2931 block = firstAllocatedBlock;
2933 currentTailIndex = startTailIndex;
2936 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2937 stopIndex = constructedStopIndex;
2939 while (currentTailIndex != stopIndex) {
2940 (*block)[currentTailIndex++]->~T();
2942 if (block == lastBlockEnqueued) {
2945 block = block->next;
2949 currentTailIndex = (startTailIndex - 1) & ~
static_cast<index_t>(
BLOCK_SIZE - 1);
2950 for (
auto block = firstAllocatedBlock; block !=
nullptr; block = block->next) {
2952 auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2953 idxEntry->value.store(
nullptr, std::memory_order_relaxed);
2954 rewind_block_index_tail();
2956 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2957 this->tailBlock = startBlock;
2962 if (this->tailBlock == endBlock) {
2963 assert(currentTailIndex == newTailIndex);
2966 this->tailBlock = this->tailBlock->next;
2968 this->tailIndex.store(newTailIndex, std::memory_order_release);
2972 #pragma warning(pop)
        template <typename It>
        size_t dequeue_bulk(It& itemFirst, size_t max)
        {
            auto tail = this->tailIndex.load(std::memory_order_relaxed);
            auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
            auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
            if (details::circular_less_than<size_t>(0, desiredCount)) {
                desiredCount = desiredCount < max ? desiredCount : max;
                std::atomic_thread_fence(std::memory_order_acquire);

                auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);

                tail = this->tailIndex.load(std::memory_order_acquire);
                auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
                if (details::circular_less_than<size_t>(0, actualCount)) {
                    actualCount = desiredCount < actualCount ? desiredCount : actualCount;
                    if (actualCount < desiredCount) {
                        this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
                    }

                    // Claim the first index; at least actualCount elements are guaranteed from here on
                    auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);

                    // Iterate the blocks and dequeue
                    auto index = firstIndex;
                    BlockIndexHeader* localBlockIndex;
                    auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
                    do {
                        auto blockStartIndex = index;
                        auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                        endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;

                        auto entry = localBlockIndex->index[indexIndex];
                        auto block = entry->value.load(std::memory_order_relaxed);
                        if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
                            while (index != endIndex) {
                                auto& el = *((*block)[index]);
                                *itemFirst++ = std::move(el);
                                el.~T();
                                ++index;
                            }
                        }
                        else {
                            MOODYCAMEL_TRY {
                                while (index != endIndex) {
                                    auto& el = *((*block)[index]);
                                    *itemFirst = std::move(el);
                                    ++itemFirst;
                                    el.~T();
                                    ++index;
                                }
                            }
                            MOODYCAMEL_CATCH (...) {
                                // Destroy the remaining claimed elements and release their blocks before rethrowing
                                do {
                                    entry = localBlockIndex->index[indexIndex];
                                    block = entry->value.load(std::memory_order_relaxed);
                                    while (index != endIndex) {
                                        (*block)[index++]->~T();
                                    }

                                    if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                                        debug::DebugLock lock(mutex);
#endif
                                        entry->value.store(nullptr, std::memory_order_relaxed);
                                        this->parent->add_block_to_free_list(block);
                                    }
                                    indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);

                                    blockStartIndex = index;
                                    endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                                    endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
                                } while (index != firstIndex + actualCount);

                                MOODYCAMEL_RETHROW;
                            }
                        }
                        if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
                            {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                                debug::DebugLock lock(mutex);
#endif
                                // set_many_empty did a release; anybody acquiring the block we are about to free
                                // sees our writes (and reads) as having happened-before
                                entry->value.store(nullptr, std::memory_order_relaxed);
                            }
                            this->parent->add_block_to_free_list(block);
                        }
                        indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
                    } while (index != firstIndex + actualCount);

                    return actualCount;
                }
                else {
                    this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
                }
            }

            return 0;
        }
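
        // The implicit producer tracks its blocks in a circular block index: `index`
        // is a ring of pointers to BlockIndexEntry slots, each mapping a block's base
        // index to the block itself, and `tail` points at the most recently inserted
        // entry. Superseded headers stay chained via `prev` (and are only freed with
        // the producer), so references obtained from an older header remain valid.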
        // BLOCK_SIZE is > 1, so any value with the low bit set is an invalid block base index
        static const index_t INVALID_BLOCK_BASE = 1;

        struct BlockIndexEntry {
            std::atomic<index_t> key;
            std::atomic<Block*> value;
        };

        struct BlockIndexHeader {
            size_t capacity;
            std::atomic<size_t> tail;
            BlockIndexEntry* entries;
            BlockIndexEntry** index;
            BlockIndexHeader* prev;
        };
        template <AllocationMode allocMode>
        inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
        {
            auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);  // We're the only writer thread, relaxed is OK
            if (localBlockIndex == nullptr) {
                return false;  // this can happen if new_block_index failed in the constructor
            }
            size_t newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
            idxEntry = localBlockIndex->index[newTail];
            if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
                idxEntry->value.load(std::memory_order_relaxed) == nullptr) {

                idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
                localBlockIndex->tail.store(newTail, std::memory_order_release);
                return true;
            }

            // No room in the old block index, try to allocate another one
            MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
                return false;
            }
            else if (!new_block_index()) {
                return false;
            }
            else {
                localBlockIndex = blockIndex.load(std::memory_order_relaxed);
                newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
                idxEntry = localBlockIndex->index[newTail];
                assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
                idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
                localBlockIndex->tail.store(newTail, std::memory_order_release);
                return true;
            }
        }
        inline void rewind_block_index_tail()
        {
            auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
            localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
        }
        inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const
        {
            BlockIndexHeader* localBlockIndex;
            auto idx = get_block_index_index_for_index(index, localBlockIndex);
            return localBlockIndex->index[idx];
        }
        inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
        {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
            debug::DebugLock lock(mutex);
#endif
            index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
            localBlockIndex = blockIndex.load(std::memory_order_acquire);
            auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
            auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
            assert(tailBase != INVALID_BLOCK_BASE);
            // Use division, not a shift: the index may have wrapped around, producing a
            // negative offset whose sign must be preserved.
            auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
            size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
            assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
            return idx;
        }
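
        // Grows the block index: allocates a new header with twice the slot count,
        // copies the still-live entry pointers from the previous header, and then
        // publishes the new header with a release store so that readers using
        // memory_order_acquire observe a fully constructed index.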
        bool new_block_index()
        {
            auto prev = blockIndex.load(std::memory_order_relaxed);
            size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
            auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
            auto raw = static_cast<char*>((Traits::malloc)(
                sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount
                + std::alignment_of<BlockIndexEntry*>::value - 1 + sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
            if (raw == nullptr) {
                return false;
            }

            auto header = new (raw) BlockIndexHeader;
            auto entries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
            auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
            if (prev != nullptr) {
                auto prevTail = prev->tail.load(std::memory_order_relaxed);
                auto prevPos = prevTail;
                size_t i = 0;
                do {
                    prevPos = (prevPos + 1) & (prev->capacity - 1);
                    index[i++] = prev->index[prevPos];
                } while (prevPos != prevTail);
                assert(i == prevCapacity);
            }
            for (size_t i = 0; i != entryCount; ++i) {
                new (entries + i) BlockIndexEntry;
                entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
                index[prevCapacity + i] = entries + i;
            }
            header->prev = prev;
            header->entries = entries;
            header->index = index;
            header->capacity = nextBlockIndexCapacity;
            header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);

            blockIndex.store(header, std::memory_order_release);

            nextBlockIndexCapacity <<= 1;

            return true;
        }
        size_t nextBlockIndexCapacity;
        std::atomic<BlockIndexHeader*> blockIndex;

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
        details::ThreadExitListener threadExitListener;
#endif
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        ImplicitProducer* nextImplicitProducer;
#endif
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
        mutable debug::DebugMutex mutex;
#endif
#ifdef MCDBGQ_TRACKMEM
        friend struct MemStats;
#endif
    };
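
    // Block pool helpers: the queue pre-allocates an initial array of blocks,
    // hands those out first, and afterwards recycles blocks through the
    // lock-free free list (or frees them, depending on the traits).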
    void populate_initial_block_list(size_t blockCount)
    {
        initialBlockPoolSize = blockCount;
        if (initialBlockPoolSize == 0) {
            initialBlockPool = nullptr;
            return;
        }

        initialBlockPool = create_array<Block>(blockCount);
        if (initialBlockPool == nullptr) {
            initialBlockPoolSize = 0;
        }
        for (size_t i = 0; i < initialBlockPoolSize; ++i) {
            initialBlockPool[i].dynamicallyAllocated = false;
        }
    }
    inline Block* try_get_block_from_initial_pool()
    {
        if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
            return nullptr;
        }

        auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);

        return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
    }
    inline void add_block_to_free_list(Block* block)
    {
#ifdef MCDBGQ_TRACKMEM
        block->owner = nullptr;
#endif
        if (!Traits::RECYCLE_ALLOCATED_BLOCKS && block->dynamicallyAllocated) {
            destroy(block);
        }
        else {
            freeList.add(block);
        }
    }
    inline void add_blocks_to_free_list(Block* block)
    {
        while (block != nullptr) {
            auto next = block->next;
            add_block_to_free_list(block);
            block = next;
        }
    }
    inline Block* try_get_block_from_free_list()
    {
        return freeList.try_get();
    }
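
    // Obtains a block for a producer, trying the cheapest source first: the
    // pre-allocated initial pool, then the free list, and only then a fresh
    // heap allocation (when the allocation mode permits it).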
    template <AllocationMode canAlloc>
    Block* requisition_block()
    {
        auto block = try_get_block_from_initial_pool();
        if (block != nullptr) {
            return block;
        }
        block = try_get_block_from_free_list();
        if (block != nullptr) {
            return block;
        }
        MOODYCAMEL_CONSTEXPR_IF (canAlloc == CanAlloc) {
            return create<Block>();
        }
        else {
            return nullptr;
        }
    }
#ifdef MCDBGQ_TRACKMEM
public:
    struct MemStats {
        size_t allocatedBlocks;
        size_t usedBlocks;
        size_t freeBlocks;
        size_t ownedBlocksExplicit;
        size_t ownedBlocksImplicit;
        size_t implicitProducers;
        size_t explicitProducers;
        size_t elementsEnqueued;
        size_t blockClassBytes;
        size_t queueClassBytes;
        size_t implicitBlockIndexBytes;
        size_t explicitBlockIndexBytes;

        friend class ConcurrentQueue;

    private:
        static MemStats getFor(ConcurrentQueue* q)
        {
            MemStats stats = { 0 };

            stats.elementsEnqueued = q->size_approx();

            auto block = q->freeList.head_unsafe();
            while (block != nullptr) {
                ++stats.allocatedBlocks;
                ++stats.freeBlocks;
                block = block->freeListNext.load(std::memory_order_relaxed);
            }

            for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
                bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
                stats.implicitProducers += implicit ? 1 : 0;
                stats.explicitProducers += implicit ? 0 : 1;

                if (implicit) {
                    auto prod = static_cast<ImplicitProducer*>(ptr);
                    stats.queueClassBytes += sizeof(ImplicitProducer);
                    auto head = prod->headIndex.load(std::memory_order_relaxed);
                    auto tail = prod->tailIndex.load(std::memory_order_relaxed);
                    auto hash = prod->blockIndex.load(std::memory_order_relaxed);
                    if (hash != nullptr) {
                        for (size_t i = 0; i != hash->capacity; ++i) {
                            if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
                                ++stats.allocatedBlocks;
                                ++stats.ownedBlocksImplicit;
                            }
                        }
                        stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
                        for (; hash != nullptr; hash = hash->prev) {
                            stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
                        }
                    }
                    for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
                        ++stats.usedBlocks;
                    }
                }
                else {
                    auto prod = static_cast<ExplicitProducer*>(ptr);
                    stats.queueClassBytes += sizeof(ExplicitProducer);
                    auto tailBlock = prod->tailBlock;
                    bool wasNonEmpty = false;
                    if (tailBlock != nullptr) {
                        auto block = tailBlock;
                        do {
                            ++stats.allocatedBlocks;
                            if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
                                ++stats.usedBlocks;
                                wasNonEmpty = wasNonEmpty || block != tailBlock;
                            }
                            ++stats.ownedBlocksExplicit;
                            block = block->next;
                        } while (block != tailBlock);
                    }
                    auto index = prod->blockIndex.load(std::memory_order_relaxed);
                    while (index != nullptr) {
                        stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
                        index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
                    }
                }
            }

            auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
            stats.allocatedBlocks += freeOnInitialPool;
            stats.freeBlocks += freeOnInitialPool;

            stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
            stats.queueClassBytes += sizeof(ConcurrentQueue);

            return stats;
        }
    };

    // For debugging only. Not thread-safe.
    MemStats getMemStats()
    {
        return MemStats::getFor(this);
    }

private:
    friend struct MemStats;
#endif
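
    // Producers are never removed from the producer list; when their owner goes
    // away they are merely marked inactive. This first scans the list for an
    // inactive producer of the requested kind and reactivates it with a CAS on
    // its `inactive` flag, and only creates a new producer if none is found.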
    ProducerBase* recycle_or_create_producer(bool isExplicit)
    {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
        debug::DebugLock lock(implicitProdMutex);
#endif
        // Try to re-use an inactive producer of the right type first
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
                bool expected = true;
                if (ptr->inactive.compare_exchange_strong(expected, false, std::memory_order_acquire, std::memory_order_relaxed)) {
                    // We caught one! It's been marked as activated, the caller can have it
                    return ptr;
                }
            }
        }

        return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));
    }
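
    // Registers a freshly created producer by prepending it to the lock-free,
    // singly linked producer list with a standard CAS loop on producerListTail.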
    ProducerBase* add_producer(ProducerBase* producer)
    {
        // Handle failed memory allocation
        if (producer == nullptr) {
            return nullptr;
        }

        producerCount.fetch_add(1, std::memory_order_relaxed);

        // Add it to the lock-free list
        auto prevTail = producerListTail.load(std::memory_order_relaxed);
        do {
            producer->next = prevTail;
        } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        if (producer->isExplicit) {
            auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
            do {
                static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
            } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
        }
        else {
            auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
            do {
                static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
            } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
        }
#endif

        return producer;
    }
    void reown_producers()
    {
        // After another instance is moved into or swapped with this one, the
        // producers we took over still point at the other queue; fix their parent.
        for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
            ptr->parent = this;
        }
    }
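
    // Implicit producer hash: maps a thread id to the ImplicitProducer that
    // serves enqueue operations performed without a producer token on that
    // thread. The table uses open addressing with linear probing; grown tables
    // are chained to their predecessors via `prev`.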
    struct ImplicitProducerKVP {
        std::atomic<details::thread_id_t> key;
        ImplicitProducer* value;  // no atomicity needed; only read by the thread that sets it

        ImplicitProducerKVP() : value(nullptr) { }

        ImplicitProducerKVP(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
        {
            key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
            value = other.value;
        }

        inline ImplicitProducerKVP& operator=(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
        {
            swap(other);
            return *this;
        }

        inline void swap(ImplicitProducerKVP& other) MOODYCAMEL_NOEXCEPT
        {
            if (this != &other) {
                details::swap_relaxed(key, other.key);
                std::swap(value, other.value);
            }
        }
    };

    template <typename XT, typename XTraits>
    friend void swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&, typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT;

    struct ImplicitProducerHash {
        size_t capacity;
        ImplicitProducerKVP* entries;
        ImplicitProducerHash* prev;
    };
    inline void populate_initial_implicit_producer_hash()
    {
        MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) { return; }
        else {
            implicitProducerHashCount.store(0, std::memory_order_relaxed);
            auto hash = &initialImplicitProducerHash;
            hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
            hash->entries = &initialImplicitProducerHashEntries[0];
            for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
                initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
            }
            hash->prev = nullptr;
            implicitProducerHash.store(hash, std::memory_order_relaxed);
        }
    }
    void swap_implicit_producer_hashes(ConcurrentQueue& other)
    {
        // Swap the initial hash entries (assumes both implicit producer hashes are initialized)
        initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
        initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
        other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];

        details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
        details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
        if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
            implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
        }
        else {
            ImplicitProducerHash* hash;
            for (hash = implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) { continue; }
            hash->prev = &initialImplicitProducerHash;
        }
        if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
            other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
        }
        else {
            ImplicitProducerHash* hash;
            for (hash = other.implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &initialImplicitProducerHash; hash = hash->prev) { continue; }
            hash->prev = &other.initialImplicitProducerHash;
        }
    }
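
    // Looks up the calling thread's implicit producer, creating (or recycling)
    // one on first use. The lookup probes the current hash and all previous
    // tables; entries found in an old table are migrated into the main one.
    // When the main table becomes about half full, one thread wins the
    // implicitProducerHashResizeInProgress flag and allocates a larger table.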
    ImplicitProducer* get_or_add_implicit_producer()
    {
        // The data is essentially thread-local (the key is the thread ID), so fences are only
        // really needed for the hash table itself.
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
        debug::DebugLock lock(implicitProdMutex);
#endif

        auto id = details::thread_id();
        auto hashedId = details::hash_thread_id(id);

        // Look for the thread ID in the current and all previous hash tables
        auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
        assert(mainHash != nullptr);  // populated in the constructor
        for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
            // Look for the id in this hash
            auto index = hashedId;
            while (true) {  // not an infinite loop: at least one slot is always free in the hash table
                index &= hash->capacity - 1u;

                auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
                if (probedKey == id) {
                    // Found it! If we're not in the main hash, migrate the entry there before returning it.
                    auto value = hash->entries[index].value;
                    if (hash != mainHash) {
                        index = hashedId;
                        while (true) {
                            index &= mainHash->capacity - 1u;
                            auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                            auto reusable = details::invalid_thread_id2;
                            if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed) ||
                                mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
#else
                            if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
#endif
                                mainHash->entries[index].value = value;
                                break;
                            }
                            ++index;
                        }
                    }

                    return value;
                }
                if (probedKey == details::invalid_thread_id) {
                    break;  // not in this hash table
                }
                ++index;
            }
        }

        // Not found; insert a new entry
        auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
        while (true) {
            if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
                // We acquired the resize lock; allocate a bigger hash table unless somebody beat us to it
                mainHash = implicitProducerHash.load(std::memory_order_acquire);
                if (newCount >= (mainHash->capacity >> 1)) {
                    size_t newCapacity = mainHash->capacity << 1;
                    while (newCount >= (newCapacity >> 1)) {
                        newCapacity <<= 1;
                    }
                    auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
                    if (raw == nullptr) {
                        // Allocation failed
                        implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
                        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
                        return nullptr;
                    }

                    auto newHash = new (raw) ImplicitProducerHash;
                    newHash->capacity = static_cast<size_t>(newCapacity);
                    newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
                    for (size_t i = 0; i != newCapacity; ++i) {
                        new (newHash->entries + i) ImplicitProducerKVP;
                        newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
                    }
                    newHash->prev = mainHash;
                    implicitProducerHash.store(newHash, std::memory_order_release);
                    implicitProducerHashResizeInProgress.clear(std::memory_order_release);
                    mainHash = newHash;
                }
                else {
                    implicitProducerHashResizeInProgress.clear(std::memory_order_release);
                }
            }

            // If the main hash is less than about three-quarters full, add to it directly
            // (if we just finished resizing above, this is always true)
            if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
                auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false));
                if (producer == nullptr) {
                    implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
                    return nullptr;
                }

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
                producer->threadExitListener.userData = producer;
                details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
#endif

                auto index = hashedId;
                while (true) {
                    index &= mainHash->capacity - 1u;
                    auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                    auto reusable = details::invalid_thread_id2;
                    if (mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
                        implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);  // slot was already counted
                        mainHash->entries[index].value = producer;
                        break;
                    }
#endif
                    if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
                        mainHash->entries[index].value = producer;
                        break;
                    }
                    ++index;
                }
                return producer;
            }

            // The main hash is too full and another thread is busy allocating a bigger one;
            // wait for it to finish and retry with the new table
            mainHash = implicitProducerHash.load(std::memory_order_acquire);
        }
    }
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
    void implicit_producer_thread_exited(ImplicitProducer* producer)
    {
        // Remove the exiting thread's entry from the hash
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
        debug::DebugLock lock(implicitProdMutex);
#endif
        auto hash = implicitProducerHash.load(std::memory_order_acquire);
        assert(hash != nullptr);  // the listener is only registered if we were added to a hash
        auto id = details::thread_id();
        auto hashedId = details::hash_thread_id(id);
        details::thread_id_t probedKey;
        // Traverse all hashes: the entry may still be present in older tables too
        for (; hash != nullptr; hash = hash->prev) {
            auto index = hashedId;
            do {
                index &= hash->capacity - 1u;
                probedKey = id;
                if (hash->entries[index].key.compare_exchange_strong(probedKey, details::invalid_thread_id2, std::memory_order_seq_cst, std::memory_order_relaxed)) {
                    break;
                }
                ++index;
            } while (probedKey != details::invalid_thread_id);
        }
        // Mark the producer as inactive so it can be recycled
        producer->inactive.store(true, std::memory_order_release);
    }
    static void implicit_producer_thread_exited_callback(void* userData)
    {
        auto producer = static_cast<ImplicitProducer*>(userData);
        auto queue = producer->parent;
        queue->implicit_producer_thread_exited(producer);
    }
#endif
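
    // Over-aligned allocation helpers: when TAlign needs more alignment than the
    // allocator guarantees, aligned_malloc over-allocates, aligns the returned
    // pointer, and stashes the original pointer immediately in front of it so
    // that aligned_free can recover and free the original allocation.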
    template <typename TAlign>
    static inline void* aligned_malloc(size_t size)
    {
        MOODYCAMEL_CONSTEXPR_IF (std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
            return (Traits::malloc)(size);
        else {
            size_t alignment = std::alignment_of<TAlign>::value;
            void* raw = (Traits::malloc)(size + alignment - 1 + sizeof(void*));
            if (!raw)
                return nullptr;
            char* ptr = details::align_for<TAlign>(reinterpret_cast<char*>(raw) + sizeof(void*));
            *(reinterpret_cast<void**>(ptr) - 1) = raw;
            return ptr;
        }
    }
    template <typename TAlign>
    static inline void aligned_free(void* ptr)
    {
        MOODYCAMEL_CONSTEXPR_IF (std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
            return (Traits::free)(ptr);
        else
            (Traits::free)(ptr ? *(reinterpret_cast<void**>(ptr) - 1) : nullptr);
    }
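
    // Small object helpers built on the aligned allocation functions above:
    // create/create_array placement-new objects into raw aligned storage, and
    // destroy/destroy_array run the destructors and release that storage.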
    template <typename U>
    static inline U* create_array(size_t count)
    {
        assert(count > 0);
        U* p = static_cast<U*>(aligned_malloc<U>(sizeof(U) * count));
        if (p == nullptr) {
            return nullptr;
        }
        for (size_t i = 0; i != count; ++i) {
            new (p + i) U();
        }
        return p;
    }
    template <typename U>
    static inline void destroy_array(U* p, size_t count)
    {
        if (p != nullptr) {
            assert(count > 0);
            for (size_t i = count; i != 0; ) {
                (p + --i)->~U();
            }
        }
        aligned_free<U>(p);
    }
    template <typename U>
    static inline U* create()
    {
        void* p = aligned_malloc<U>(sizeof(U));
        return p != nullptr ? new (p) U : nullptr;
    }
    template <typename U, typename A1>
    static inline U* create(A1&& a1)
    {
        void* p = aligned_malloc<U>(sizeof(U));
        return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
    }
    template <typename U>
    static inline void destroy(U* p)
    {
        if (p != nullptr)
            p->~U();
        aligned_free<U>(p);
    }
    std::atomic<ProducerBase*> producerListTail;
    std::atomic<std::uint32_t> producerCount;

    std::atomic<size_t> initialBlockPoolIndex;
    Block* initialBlockPool;
    size_t initialBlockPoolSize;

#ifndef MCDBGQ_USEDEBUGFREELIST
    FreeList<Block> freeList;
#else
    debug::DebugFreeList<Block> freeList;
#endif

    std::atomic<ImplicitProducerHash*> implicitProducerHash;
    std::atomic<size_t> implicitProducerHashCount;
    ImplicitProducerHash initialImplicitProducerHash;
    std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
    std::atomic_flag implicitProducerHashResizeInProgress;

    std::atomic<std::uint32_t> nextExplicitConsumerId;
    std::atomic<std::uint32_t> globalExplicitConsumerOffset;

#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
    debug::DebugMutex implicitProdMutex;
#endif

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
    std::atomic<ExplicitProducer*> explicitProducers;
    std::atomic<ImplicitProducer*> implicitProducers;
#endif
};
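
// Token constructors. A ProducerToken eagerly claims (or recycles) a dedicated
// explicit producer for the queue it is constructed with; a ConsumerToken just
// records a rotation offset used to spread consumers across producers.
// A rough usage sketch, assuming the usual moodycamel-style queue API exposed
// by this header (enqueue / try_dequeue overloads taking tokens):
//
//   ConcurrentQueue<int> q;
//   ProducerToken ptok(q);
//   ConsumerToken ctok(q);
//   q.enqueue(ptok, 42);
//   int item;
//   if (q.try_dequeue(ctok, item)) { /* item == 42 */ }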
template <typename T, typename Traits>
ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
    : producer(queue.recycle_or_create_producer(true))
{
    if (producer != nullptr) {
        producer->token = this;
    }
}

template <typename T, typename Traits>
ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits>& queue)
    : producer(reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true))
{
    if (producer != nullptr) {
        producer->token = this;
    }
}

template <typename T, typename Traits>
ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
    : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
{
    initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
    lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
}

template <typename T, typename Traits>
ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits>& queue)
    : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
{
    initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
    lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
}
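
// Namespace-level swap overloads so that an unqualified swap(a, b) found via
// ADL forwards to the corresponding member swap implementations.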
template <typename T, typename Traits>
inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
{
    a.swap(b);
}

template <typename T, typename Traits>
inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT
{
    a.swap(b);
}
#if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
#pragma warning(pop)
#endif

#if defined(__GNUC__) && !defined(__INTEL_COMPILER)
#pragma GCC diagnostic pop
#endif

#endif // include guard