A library for working with phylogenetic and population genetic data.
v0.32.0
lightweight_semaphore.hpp
Go to the documentation of this file.
1 #ifndef GENESIS_UTILS_THREADING_LIGHTWEIGHT_SEMAPHORE_H_
2 #define GENESIS_UTILS_THREADING_LIGHTWEIGHT_SEMAPHORE_H_
3 
4 /*
5  Genesis - A toolkit for working with phylogenetic data.
6  Copyright (C) 2014-2024 Lucas Czech
7 
8  This program is free software: you can redistribute it and/or modify
9  it under the terms of the GNU General Public License as published by
10  the Free Software Foundation, either version 3 of the License, or
11  (at your option) any later version.
12 
13  This program is distributed in the hope that it will be useful,
14  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  GNU General Public License for more details.
17 
18  You should have received a copy of the GNU General Public License
19  along with this program. If not, see <http://www.gnu.org/licenses/>.
20 
21  Contact:
22  Lucas Czech <lucas.czech@sund.ku.dk>
23  University of Copenhagen, Globe Institute, Section for GeoGenetics
24  Oster Voldgade 5-7, 1350 Copenhagen K, Denmark
25 */
26 
27 /*
28  This code below is adapted from the excellent moodycamel::ConcurrentQueue
29  (https://github.com/cameron314/concurrentqueue), using version v1.0.4,
30  which was published under a simplified BSD license, and also dual-licensed
31  under the Boost Software License. The full original license
32  (https://github.com/cameron314/concurrentqueue/blob/master/LICENSE.md), is copied
33  in our documentation as well, see `genesis/doc/manual/supplement/acknowledgements.md`, and
34  @link supplement_acknowledgements_code_reuse_concurrent_queue Acknowledgements@endlink.
35 
36  See also ConcurrentQueue and BlockingConcurrentQueue for the other two classes
37  that we adapted from the original repository.
38 
39  We adapted the original code by (roughly) formatting it to our formatting standard, as well as
40  renaming the namespace from moodycamel to be contained within our namespace, to keep our
41  documentation and usage consistent. Other than that, all functionality is kept as-is.
42  It feels weird to rename the namespaces, as it might seem that we are trying to hide the fact
43  that this code is not ours. That is not the case, and is merely based on our compulsion
44  for orderly namespaces and documentation within genesis. Please note that the original code
45  is excellent work that we would like to acknowledge here! Also, if anyone wants to do the same
46  with genesis code, please feel free to do so ;-)
47 */
48 
56 // =================================================================================================
57 // Lightweight Semaphore
58 // =================================================================================================
59 
60 // Provides an efficient implementation of a semaphore (LightweightSemaphore).
61 // This is an extension of Jeff Preshing's semaphore implementation (licensed
62 // under the terms of its separate zlib license) that has been adapted and
63 // extended by Cameron Desrochers.
64 
#include <atomic>
#include <cassert> // For assert
#include <cerrno> // For errno, EINTR
#include <cstddef> // For std::size_t
#include <cstdint> // For std::int64_t, std::uint64_t
#include <type_traits> // For std::make_signed<T>
68 
69 #if defined(_WIN32)
70 // Avoid including windows.h in a header; we only need a handful of
71 // items, so we'll redeclare them here (this is relatively safe since
72 // the API generally has to remain stable between Windows versions).
73 // I know this is an ugly hack but it still beats polluting the global
74 // namespace with thousands of generic names or adding a .cpp for nothing.
75 extern "C" {
76 struct _SECURITY_ATTRIBUTES;
77 __declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
78 __declspec(dllimport) int __stdcall CloseHandle(void* hObject);
79 __declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
80 __declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
81 }
82 #elif defined(__MACH__)
83 #include <mach/mach.h>
84 #elif defined(__MVS__)
85 #include <zos-semaphore.h>
86 #elif defined(__unix__)
87 #include <semaphore.h>
88 
89 #if defined(__GLIBC_PREREQ) && defined(_GNU_SOURCE)
90 #if __GLIBC_PREREQ(2, 30)
91 #define MOODYCAMEL_LIGHTWEIGHTSEMAPHORE_MONOTONIC
92 #endif
93 #endif
94 #endif
95 
96 namespace genesis {
97 namespace utils {
98 namespace details {
99 
100 // Code in the details namespace below is an adaptation of Jeff Preshing's
101 // portable + lightweight semaphore implementations, originally from
102 // https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
103 // LICENSE:
104 // Copyright (c) 2015 Jeff Preshing
105 //
106 // This software is provided 'as-is', without any express or implied
107 // warranty. In no event will the authors be held liable for any damages
108 // arising from the use of this software.
109 //
110 // Permission is granted to anyone to use this software for any purpose,
111 // including commercial applications, and to alter it and redistribute it
112 // freely, subject to the following restrictions:
113 //
114 // 1. The origin of this software must not be misrepresented; you must not
115 // claim that you wrote the original software. If you use this software
116 // in a product, an acknowledgement in the product documentation would be
117 // appreciated but is not required.
118 // 2. Altered source versions must be plainly marked as such, and must not be
119 // misrepresented as being the original software.
120 // 3. This notice may not be removed or altered from any source distribution.
121 #if defined(_WIN32)
122  class Semaphore {
123  private:
124  void* m_hSema;
125 
126  Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
127  Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
128 
129  public:
130  Semaphore(int initialCount = 0)
131  {
132  assert(initialCount >= 0);
133  const long maxLong = 0x7fffffff;
134  m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
135  assert(m_hSema);
136  }
137 
138  ~Semaphore()
139  {
140  CloseHandle(m_hSema);
141  }
142 
143  bool wait()
144  {
145  const unsigned long infinite = 0xffffffff;
146  return WaitForSingleObject(m_hSema, infinite) == 0;
147  }
148 
149  bool try_wait()
150  {
151  return WaitForSingleObject(m_hSema, 0) == 0;
152  }
153 
154  bool timed_wait(std::uint64_t usecs)
155  {
156  return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) == 0;
157  }
158 
159  void signal(int count = 1)
160  {
161  while (!ReleaseSemaphore(m_hSema, count, nullptr))
162  ;
163  }
164  };
165 #elif defined(__MACH__)
166  //---------------------------------------------------------
167  // Semaphore (Apple iOS and OSX)
168  // Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
169  //---------------------------------------------------------
170  class Semaphore {
171  private:
172  semaphore_t m_sema;
173 
174  Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
175  Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
176 
177  public:
178  Semaphore(int initialCount = 0)
179  {
180  assert(initialCount >= 0);
181  kern_return_t rc = semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
182  assert(rc == KERN_SUCCESS);
183  (void)rc;
184  }
185 
186  ~Semaphore()
187  {
188  semaphore_destroy(mach_task_self(), m_sema);
189  }
190 
191  bool wait()
192  {
193  return semaphore_wait(m_sema) == KERN_SUCCESS;
194  }
195 
196  bool try_wait()
197  {
198  return timed_wait(0);
199  }
200 
201  bool timed_wait(std::uint64_t timeout_usecs)
202  {
203  mach_timespec_t ts;
204  ts.tv_sec = static_cast<unsigned int>(timeout_usecs / 1000000);
205  ts.tv_nsec = static_cast<int>((timeout_usecs % 1000000) * 1000);
206 
207  // added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
208  kern_return_t rc = semaphore_timedwait(m_sema, ts);
209  return rc == KERN_SUCCESS;
210  }
211 
212  void signal()
213  {
214  while (semaphore_signal(m_sema) != KERN_SUCCESS)
215  ;
216  }
217 
218  void signal(int count)
219  {
220  while (count-- > 0) {
221  while (semaphore_signal(m_sema) != KERN_SUCCESS)
222  ;
223  }
224  }
225  };
226 #elif defined(__unix__) || defined(__MVS__)
227  //---------------------------------------------------------
228  // Semaphore (POSIX, Linux, zOS)
229  //---------------------------------------------------------
// Thin owning wrapper around an unnamed, process-private POSIX semaphore
// (sem_init with pshared == 0). The semaphore is destroyed in the destructor.
class Semaphore {
private:
    sem_t m_sema;

public:
    // Not copyable: the wrapped sem_t is initialised and destroyed in place.
    // Spelled with `= delete` so that this header does not depend on a
    // macro that is defined in a different header.
    Semaphore(const Semaphore& other) = delete;
    Semaphore& operator=(const Semaphore& other) = delete;

    // Creates the semaphore with the given initial count (must be >= 0).
    Semaphore(int initialCount = 0)
    {
        assert(initialCount >= 0);
        int rc = sem_init(&m_sema, 0, static_cast<unsigned int>(initialCount));
        assert(rc == 0);
        (void)rc; // Silence the unused-variable warning when asserts are compiled out.
    }

    ~Semaphore()
    {
        sem_destroy(&m_sema);
    }

    // Blocks without a timeout, restarting the wait whenever it is
    // interrupted by a signal. Returns true iff the semaphore was acquired.
    bool wait()
    {
        // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
        int rc;
        do {
            rc = sem_wait(&m_sema);
        } while (rc == -1 && errno == EINTR);
        return rc == 0;
    }

    // Non-blocking attempt. Returns true iff the semaphore was acquired.
    bool try_wait()
    {
        int rc;
        do {
            rc = sem_trywait(&m_sema);
        } while (rc == -1 && errno == EINTR);
        return rc == 0;
    }

    // Waits for at most `usecs` microseconds. Returns true iff the
    // semaphore was acquired before the deadline.
    bool timed_wait(std::uint64_t usecs)
    {
        struct timespec ts;
        const int usecs_in_1_sec = 1000000;
        const int nsecs_in_1_sec = 1000000000;

        // The wait functions take an absolute deadline, so start from "now"
        // on the clock that the wait call below is going to use.
#ifdef MOODYCAMEL_LIGHTWEIGHTSEMAPHORE_MONOTONIC
        clock_gettime(CLOCK_MONOTONIC, &ts);
#else
        clock_gettime(CLOCK_REALTIME, &ts);
#endif
        ts.tv_sec += static_cast<time_t>(usecs / usecs_in_1_sec);
        ts.tv_nsec += static_cast<long>(usecs % usecs_in_1_sec) * 1000;
        // sem_timedwait bombs if you have more than 1e9 in tv_nsec
        // so we have to clean things up before passing it in
        if (ts.tv_nsec >= nsecs_in_1_sec) {
            ts.tv_nsec -= nsecs_in_1_sec;
            ++ts.tv_sec;
        }

        // Restart the wait with the same absolute deadline if interrupted.
        int rc;
        do {
#ifdef MOODYCAMEL_LIGHTWEIGHTSEMAPHORE_MONOTONIC
            rc = sem_clockwait(&m_sema, CLOCK_MONOTONIC, &ts);
#else
            rc = sem_timedwait(&m_sema, &ts);
#endif
        } while (rc == -1 && errno == EINTR);
        return rc == 0;
    }

    // Posts the semaphore once, retrying until sem_post succeeds.
    void signal()
    {
        while (sem_post(&m_sema) == -1)
            ;
    }

    // Posts the semaphore `count` times; does nothing for count <= 0.
    void signal(int count)
    {
        for (; count > 0; --count) {
            signal();
        }
    }
};
314 #else
315 #error Unsupported platform! (No semaphore wrapper available)
316 #endif
317 
318 } // end namespace details
319 
320 //---------------------------------------------------------
321 // LightweightSemaphore
322 //---------------------------------------------------------
324 public:
325  typedef std::make_signed<std::size_t>::type ssize_t;
326 
327 private:
328  std::atomic<ssize_t> m_count;
329  details::Semaphore m_sema;
330  int m_maxSpins;
331 
332  bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
333  {
334  ssize_t oldCount;
335  int spin = m_maxSpins;
336  while (--spin >= 0) {
337  oldCount = m_count.load(std::memory_order_relaxed);
338  if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
339  return true;
340  std::atomic_signal_fence(std::memory_order_acquire); // Prevent the compiler from collapsing the loop.
341  }
342  oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
343  if (oldCount > 0)
344  return true;
345  if (timeout_usecs < 0) {
346  if (m_sema.wait())
347  return true;
348  }
349  if (timeout_usecs > 0 && m_sema.timed_wait((std::uint64_t)timeout_usecs))
350  return true;
351  // At this point, we've timed out waiting for the semaphore, but the
352  // count is still decremented indicating we may still be waiting on
353  // it. So we have to re-adjust the count, but only if the semaphore
354  // wasn't signaled enough times for us too since then. If it was, we
355  // need to release the semaphore too.
356  while (true) {
357  oldCount = m_count.load(std::memory_order_acquire);
358  if (oldCount >= 0 && m_sema.try_wait())
359  return true;
360  if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
361  return false;
362  }
363  }
364 
365  ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
366  {
367  assert(max > 0);
368  ssize_t oldCount;
369  int spin = m_maxSpins;
370  while (--spin >= 0) {
371  oldCount = m_count.load(std::memory_order_relaxed);
372  if (oldCount > 0) {
373  ssize_t newCount = oldCount > max ? oldCount - max : 0;
374  if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
375  return oldCount - newCount;
376  }
377  std::atomic_signal_fence(std::memory_order_acquire);
378  }
379  oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
380  if (oldCount <= 0) {
381  if ((timeout_usecs == 0) || (timeout_usecs < 0 && !m_sema.wait()) || (timeout_usecs > 0 && !m_sema.timed_wait((std::uint64_t)timeout_usecs))) {
382  while (true) {
383  oldCount = m_count.load(std::memory_order_acquire);
384  if (oldCount >= 0 && m_sema.try_wait())
385  break;
386  if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_relaxed))
387  return 0;
388  }
389  }
390  }
391  if (max > 1)
392  return 1 + tryWaitMany(max - 1);
393  return 1;
394  }
395 
396 public:
397  LightweightSemaphore(ssize_t initialCount = 0, int maxSpins = 10000)
398  : m_count(initialCount)
399  , m_maxSpins(maxSpins)
400  {
401  assert(initialCount >= 0);
402  assert(maxSpins >= 0);
403  }
404 
405  bool tryWait()
406  {
407  ssize_t oldCount = m_count.load(std::memory_order_relaxed);
408  while (oldCount > 0) {
409  if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
410  return true;
411  }
412  return false;
413  }
414 
415  bool wait()
416  {
417  return tryWait() || waitWithPartialSpinning();
418  }
419 
420  bool wait(std::int64_t timeout_usecs)
421  {
422  return tryWait() || waitWithPartialSpinning(timeout_usecs);
423  }
424 
425  // Acquires between 0 and (greedily) max, inclusive
427  {
428  assert(max >= 0);
429  ssize_t oldCount = m_count.load(std::memory_order_relaxed);
430  while (oldCount > 0) {
431  ssize_t newCount = oldCount > max ? oldCount - max : 0;
432  if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
433  return oldCount - newCount;
434  }
435  return 0;
436  }
437 
438  // Acquires at least one, and (greedily) at most max
439  ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
440  {
441  assert(max >= 0);
442  ssize_t result = tryWaitMany(max);
443  if (result == 0 && max > 0)
444  result = waitManyWithPartialSpinning(max, timeout_usecs);
445  return result;
446  }
447 
449  {
450  ssize_t result = waitMany(max, -1);
451  assert(result > 0);
452  return result;
453  }
454 
455  void signal(ssize_t count = 1)
456  {
457  assert(count >= 0);
458  ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
459  ssize_t toRelease = -oldCount < count ? -oldCount : count;
460  if (toRelease > 0) {
461  m_sema.signal((int)toRelease);
462  }
463  }
464 
465  std::size_t availableApprox() const
466  {
467  ssize_t count = m_count.load(std::memory_order_relaxed);
468  return count > 0 ? static_cast<std::size_t>(count) : 0;
469  }
470 };
471 
472 } // namespace utils
473 } // namespace genesis
474 
475 #endif // include guard
genesis::utils::LightweightSemaphore::waitMany
ssize_t waitMany(ssize_t max)
Definition: lightweight_semaphore.hpp:448
genesis::utils::LightweightSemaphore::signal
void signal(ssize_t count=1)
Definition: lightweight_semaphore.hpp:455
MOODYCAMEL_DELETE_FUNCTION
#define MOODYCAMEL_DELETE_FUNCTION
Definition: concurrent_queue.hpp:324
genesis::utils::LightweightSemaphore::tryWaitMany
ssize_t tryWaitMany(ssize_t max)
Definition: lightweight_semaphore.hpp:426
genesis::utils::LightweightSemaphore::availableApprox
std::size_t availableApprox() const
Definition: lightweight_semaphore.hpp:465
genesis::utils::LightweightSemaphore
Definition: lightweight_semaphore.hpp:323
genesis
Container namespace for all symbols of genesis in order to keep them separate when used as a library.
Definition: placement/formats/edge_color.cpp:42
genesis::utils::LightweightSemaphore::LightweightSemaphore
LightweightSemaphore(ssize_t initialCount=0, int maxSpins=10000)
Definition: lightweight_semaphore.hpp:397
genesis::utils::LightweightSemaphore::wait
bool wait()
Definition: lightweight_semaphore.hpp:415
genesis::utils::LightweightSemaphore::ssize_t
std::make_signed< std::size_t >::type ssize_t
Definition: lightweight_semaphore.hpp:325
genesis::utils::LightweightSemaphore::wait
bool wait(std::int64_t timeout_usecs)
Definition: lightweight_semaphore.hpp:420
genesis::utils::LightweightSemaphore::waitMany
ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
Definition: lightweight_semaphore.hpp:439
genesis::utils::LightweightSemaphore::tryWait
bool tryWait()
Definition: lightweight_semaphore.hpp:405