The OpenD Programming Language

1 /**
2  * The condition module provides a primitive for synchronized condition
3  * checking.
4  *
5  * Copyright: Copyright Sean Kelly 2005 - 2009.
6  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
7  * Authors:   Sean Kelly
8  * Source:    $(DRUNTIMESRC core/sync/_condition.d)
9  */
10 
11 /*          Copyright Sean Kelly 2005 - 2009.
12  * Distributed under the Boost Software License, Version 1.0.
13  *    (See accompanying file LICENSE or copy at
14  *          http://www.boost.org/LICENSE_1_0.txt)
15  */
16 module core.sync.condition;
17 
18 
19 public import core.sync.exception;
20 public import core.sync.mutex;
21 public import core.time;
22 
23 import core.exception : AssertError, staticError;
24 
25 
26 version (Windows)
27 {
28     import core.sync.semaphore;
29     import core.sys.windows.basetsd /+: HANDLE+/;
30     import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
31         DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection,
32         LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
33     import core.sys.windows.windef /+: BOOL, DWORD+/;
34     import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
35 }
36 else version (Posix)
37 {
38     import core.sync.config;
39     import core.stdc.errno;
40     import core.sys.posix.pthread;
41     import core.sys.posix.time;
42 }
43 else version (FreeStanding)
44 {
45 }
46 else
47 {
48     static assert(false, "Platform not supported");
49 }
50 
51 
52 ////////////////////////////////////////////////////////////////////////////////
53 // Condition
54 //
55 // void wait();
56 // void notify();
57 // void notifyAll();
58 ////////////////////////////////////////////////////////////////////////////////
59 
60 /**
61  * This class represents a condition variable as conceived by C.A.R. Hoare.  As
62  * per Mesa type monitors however, "signal" has been replaced with "notify" to
63  * indicate that control is not transferred to the waiter when a notification
64  * is sent.
65  */
66 class Condition
67 {
68     ////////////////////////////////////////////////////////////////////////////
69     // Initialization
70     ////////////////////////////////////////////////////////////////////////////
71 
72     /**
73      * Initializes a condition object which is associated with the supplied
74      * mutex object.
75      *
76      * Params:
77      *  m = The mutex with which this condition will be associated.
78      *
79      * Throws:
80      *  SyncError on error.
81      */
82     this( Mutex m ) nothrow @safe @nogc
83     {
84         this(m, true);
85     }
86 
87     /// ditto
88     this( shared Mutex m ) shared nothrow @safe @nogc
89     {
90         import core.atomic : atomicLoad;
91         this(atomicLoad(m), true);
92     }
93 
94     //
95     private this(this Q, M)( M m, bool _unused_ ) nothrow @trusted @nogc
96         if ((is(Q == Condition) && is(M == Mutex)) ||
97             (is(Q == shared Condition) && is(M == shared Mutex)))
98     {
99         version (Windows)
100         {
101             static if (is(Q == Condition))
102             {
103                 alias HANDLE_TYPE = void*;
104             }
105             else
106             {
107                 alias HANDLE_TYPE = shared(void*);
108             }
109             m_blockLock = cast(HANDLE_TYPE) CreateSemaphoreA( null, 1, 1, null );
110             if ( m_blockLock == m_blockLock.init )
111                 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
112             scope(failure) CloseHandle( cast(void*) m_blockLock );
113 
114             m_blockQueue = cast(HANDLE_TYPE) CreateSemaphoreA( null, 0, int.max, null );
115             if ( m_blockQueue == m_blockQueue.init )
116                 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
117             scope(failure) CloseHandle( cast(void*) m_blockQueue );
118 
119             InitializeCriticalSection( cast(RTL_CRITICAL_SECTION*) &m_unblockLock );
120             m_assocMutex = m;
121         }
122         else version (Posix)
123         {
124             static if (is(Q == shared))
125             {
126                 import core.atomic : atomicLoad;
127                 m_assocMutex = atomicLoad(m);
128             }
129             else
130             {
131                 m_assocMutex = m;
132             }
133             static if ( is( typeof( pthread_condattr_setclock ) ) )
134             {
135                 () @trusted
136                 {
137                     pthread_condattr_t attr = void;
138                     int rc  = pthread_condattr_init( &attr );
139                     if ( rc )
140                         throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
141                     rc = pthread_condattr_setclock( &attr, CLOCK_MONOTONIC );
142                     if ( rc )
143                         throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
144                     rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, &attr );
145                     if ( rc )
146                         throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
147                     rc = pthread_condattr_destroy( &attr );
148                     if ( rc )
149                         throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
150                 } ();
151             }
152             else
153             {
154                 int rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, null );
155                 if ( rc )
156                     throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
157             }
158         }
159     }
160 
161     ~this() @nogc
162     {
163         version (Windows)
164         {
165             BOOL rc = CloseHandle( m_blockLock );
166             assert( rc, "Unable to destroy condition" );
167             rc = CloseHandle( m_blockQueue );
168             assert( rc, "Unable to destroy condition" );
169             DeleteCriticalSection( &m_unblockLock );
170         }
171         else version (Posix)
172         {
173             int rc = pthread_cond_destroy( &m_hndl );
174             assert( !rc, "Unable to destroy condition" );
175         }
176     }
177 
178 
179     ////////////////////////////////////////////////////////////////////////////
180     // General Properties
181     ////////////////////////////////////////////////////////////////////////////
182 
183 
184     /**
185      * Gets the mutex associated with this condition.
186      *
187      * Returns:
188      *  The mutex associated with this condition.
189      */
190     @property Mutex mutex()
191     {
192         return m_assocMutex;
193     }
194 
195     /// ditto
196     @property shared(Mutex) mutex() shared
197     {
198         import core.atomic : atomicLoad;
199         return atomicLoad(m_assocMutex);
200     }
201 
202     // undocumented function for internal use
203     final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
204     {
205         return m_assocMutex;
206     }
207 
208     // ditto
209     final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc
210     {
211         import core.atomic : atomicLoad;
212         return atomicLoad(m_assocMutex);
213     }
214 
215     ////////////////////////////////////////////////////////////////////////////
216     // General Actions
217     ////////////////////////////////////////////////////////////////////////////
218 
219 
220     /**
221      * Wait until notified.
222      *
223      * Throws:
224      *  SyncError on error.
225      */
226     void wait()
227     {
228         wait!(typeof(this))(true);
229     }
230 
231     /// ditto
232     void wait() shared
233     {
234         wait!(typeof(this))(true);
235     }
236 
237     /// ditto
238     void wait(this Q)( bool _unused_ )
239         if (is(Q == Condition) || is(Q == shared Condition))
240     {
241         version (Windows)
242         {
243             timedWait( INFINITE );
244         }
245         else version (Posix)
246         {
247             int rc = pthread_cond_wait( cast(pthread_cond_t*) &m_hndl, (cast(Mutex) m_assocMutex).handleAddr() );
248             if ( rc )
249                 throw staticError!AssertError("Unable to wait for condition", __FILE__, __LINE__);
250         }
251     }
252 
253     /**
254      * Suspends the calling thread until a notification occurs or until the
255      * supplied time period has elapsed.
256      *
257      * Params:
258      *  val = The time to wait.
259      *
260      * In:
261      *  val must be non-negative.
262      *
263      * Throws:
264      *  SyncError on error.
265      *
266      * Returns:
267      *  true if notified before the timeout and false if not.
268      */
269     bool wait( Duration val )
270     {
271         return wait!(typeof(this))(val, true);
272     }
273 
274     /// ditto
275     bool wait( Duration val ) shared
276     {
277         return wait!(typeof(this))(val, true);
278     }
279 
280     /// ditto
281     bool wait(this Q)( Duration val, bool _unused_ )
282         if (is(Q == Condition) || is(Q == shared Condition))
283     in
284     {
285         assert( !val.isNegative );
286     }
287     do
288     {
289         version (Windows)
290         {
291             auto maxWaitMillis = dur!("msecs")( uint.max - 1 );
292 
293             while ( val > maxWaitMillis )
294             {
295                 if ( timedWait( cast(uint)
296                                maxWaitMillis.total!"msecs" ) )
297                     return true;
298                 val -= maxWaitMillis;
299             }
300             return timedWait( cast(uint) val.total!"msecs" );
301         }
302         else version (Posix)
303         {
304             timespec t = void;
305             mktspec( t, val );
306 
307             int rc = pthread_cond_timedwait( cast(pthread_cond_t*) &m_hndl,
308                                              (cast(Mutex) m_assocMutex).handleAddr(),
309                                              &t );
310             if ( !rc )
311                 return true;
312             if ( rc == ETIMEDOUT )
313                 return false;
314             throw staticError!AssertError("Unable to wait for condition", __FILE__, __LINE__);
315         }
316 	else version (FreeStanding) assert(0);
317     }
318 
319     /**
320      * Notifies one waiter.
321      *
322      * Throws:
323      *  SyncError on error.
324      */
325     void notify()
326     {
327         notify!(typeof(this))(true);
328     }
329 
330     /// ditto
331     void notify() shared
332     {
333         notify!(typeof(this))(true);
334     }
335 
336     /// ditto
337     void notify(this Q)( bool _unused_ )
338         if (is(Q == Condition) || is(Q == shared Condition))
339     {
340         version (Windows)
341         {
342             notify_( false );
343         }
344         else version (Posix)
345         {
346             // Since OS X 10.7 (Lion), pthread_cond_signal returns EAGAIN after retrying 8192 times,
347             // so need to retrying while it returns EAGAIN.
348             //
349             // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
350             // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
351             // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
352             // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
353             // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
354             // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
355             // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
356             // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c
357 
358             int rc;
359             do {
360                 rc = pthread_cond_signal( cast(pthread_cond_t*) &m_hndl );
361             } while ( rc == EAGAIN );
362             if ( rc )
363                 throw staticError!AssertError("Unable to notify condition", __FILE__, __LINE__);
364         }
365     }
366 
367     /**
368      * Notifies all waiters.
369      *
370      * Throws:
371      *  SyncError on error.
372      */
373     void notifyAll()
374     {
375         notifyAll!(typeof(this))(true);
376     }
377 
378     /// ditto
379     void notifyAll() shared
380     {
381         notifyAll!(typeof(this))(true);
382     }
383 
384     /// ditto
385     void notifyAll(this Q)( bool _unused_ )
386         if (is(Q == Condition) || is(Q == shared Condition))
387     {
388         version (Windows)
389         {
390             notify_( true );
391         }
392         else version (Posix)
393         {
394             // Since OS X 10.7 (Lion), pthread_cond_broadcast returns EAGAIN after retrying 8192 times,
395             // so need to retrying while it returns EAGAIN.
396             //
397             // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
398             // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
399             // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
400             // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
401             // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
402             // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
403             // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
404             // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c
405 
406             int rc;
407             do {
408                 rc = pthread_cond_broadcast( cast(pthread_cond_t*) &m_hndl );
409             } while ( rc == EAGAIN );
410             if ( rc )
411                 throw staticError!AssertError("Unable to notify condition", __FILE__, __LINE__);
412         }
413     }
414 
415 private:
416     version (Windows)
417     {
418         bool timedWait(this Q)( DWORD timeout )
419             if (is(Q == Condition) || is(Q == shared Condition))
420         {
421             static if (is(Q == Condition))
422             {
423                 auto op(string o, T, V1)(ref T val, V1 mod)
424                 {
425                     return mixin("val " ~ o ~ "mod");
426                 }
427             }
428             else
429             {
430                 auto op(string o, T, V1)(ref shared T val, V1 mod)
431                 {
432                     import core.atomic: atomicOp;
433                     return atomicOp!o(val, mod);
434                 }
435             }
436 
437             int   numSignalsLeft;
438             int   numWaitersGone;
439             DWORD rc;
440 
441             rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
442             assert( rc == WAIT_OBJECT_0 );
443 
444             op!"+="(m_numWaitersBlocked, 1);
445 
446             rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
447             assert( rc );
448 
449             m_assocMutex.unlock();
450             scope(failure) m_assocMutex.lock();
451 
452             rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, timeout );
453             assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
454             bool timedOut = (rc == WAIT_TIMEOUT);
455 
456             EnterCriticalSection( &m_unblockLock );
457             scope(failure) LeaveCriticalSection( &m_unblockLock );
458 
459             if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
460             {
461                 if ( timedOut )
462                 {
463                     // timeout (or canceled)
464                     if ( m_numWaitersBlocked != 0 )
465                     {
466                         op!"-="(m_numWaitersBlocked, 1);
467                         // do not unblock next waiter below (already unblocked)
468                         numSignalsLeft = 0;
469                     }
470                     else
471                     {
472                         // spurious wakeup pending!!
473                         m_numWaitersGone = 1;
474                     }
475                 }
476                 if ( op!"-="(m_numWaitersToUnblock, 1) == 0 )
477                 {
478                     if ( m_numWaitersBlocked != 0 )
479                     {
480                         // open the gate
481                         rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
482                         assert( rc );
483                         // do not open the gate below again
484                         numSignalsLeft = 0;
485                     }
486                     else if ( (numWaitersGone = m_numWaitersGone) != 0 )
487                     {
488                         m_numWaitersGone = 0;
489                     }
490                 }
491             }
492             else if ( op!"+="(m_numWaitersGone, 1) == int.max / 2 )
493             {
494                 // timeout/canceled or spurious event :-)
495                 rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
496                 assert( rc == WAIT_OBJECT_0 );
497                 // something is going on here - test of timeouts?
498                 op!"-="(m_numWaitersBlocked, m_numWaitersGone);
499                 rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
500                 assert( rc == WAIT_OBJECT_0 );
501                 m_numWaitersGone = 0;
502             }
503 
504             LeaveCriticalSection( &m_unblockLock );
505 
506             if ( numSignalsLeft == 1 )
507             {
508                 // better now than spurious later (same as ResetEvent)
509                 for ( ; numWaitersGone > 0; --numWaitersGone )
510                 {
511                     rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, INFINITE );
512                     assert( rc == WAIT_OBJECT_0 );
513                 }
514                 // open the gate
515                 rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
516                 assert( rc );
517             }
518             else if ( numSignalsLeft != 0 )
519             {
520                 // unblock next waiter
521                 rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
522                 assert( rc );
523             }
524             m_assocMutex.lock();
525             return !timedOut;
526         }
527 
528 
529         void notify_(this Q)( bool all )
530             if (is(Q == Condition) || is(Q == shared Condition))
531         {
532             static if (is(Q == Condition))
533             {
534                 auto op(string o, T, V1)(ref T val, V1 mod)
535                 {
536                     return mixin("val " ~ o ~ "mod");
537                 }
538             }
539             else
540             {
541                 auto op(string o, T, V1)(ref shared T val, V1 mod)
542                 {
543                     import core.atomic: atomicOp;
544                     return atomicOp!o(val, mod);
545                 }
546             }
547 
548             DWORD rc;
549 
550             EnterCriticalSection( &m_unblockLock );
551             scope(failure) LeaveCriticalSection( &m_unblockLock );
552 
553             if ( m_numWaitersToUnblock != 0 )
554             {
555                 if ( m_numWaitersBlocked == 0 )
556                 {
557                     LeaveCriticalSection( &m_unblockLock );
558                     return;
559                 }
560                 if ( all )
561                 {
562                     op!"+="(m_numWaitersToUnblock, m_numWaitersBlocked);
563                     m_numWaitersBlocked = 0;
564                 }
565                 else
566                 {
567                     op!"+="(m_numWaitersToUnblock, 1);
568                     op!"-="(m_numWaitersBlocked, 1);
569                 }
570                 LeaveCriticalSection( &m_unblockLock );
571             }
572             else if ( m_numWaitersBlocked > m_numWaitersGone )
573             {
574                 rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
575                 assert( rc == WAIT_OBJECT_0 );
576                 if ( 0 != m_numWaitersGone )
577                 {
578                     op!"-="(m_numWaitersBlocked, m_numWaitersGone);
579                     m_numWaitersGone = 0;
580                 }
581                 if ( all )
582                 {
583                     m_numWaitersToUnblock = m_numWaitersBlocked;
584                     m_numWaitersBlocked = 0;
585                 }
586                 else
587                 {
588                     m_numWaitersToUnblock = 1;
589                     op!"-="(m_numWaitersBlocked, 1);
590                 }
591                 LeaveCriticalSection( &m_unblockLock );
592                 rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
593                 assert( rc );
594             }
595             else
596             {
597                 LeaveCriticalSection( &m_unblockLock );
598             }
599         }
600 
601 
602         // NOTE: This implementation uses Algorithm 8c as described here:
603         //       http://groups.google.com/group/comp.programming.threads/
604         //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
605         HANDLE              m_blockLock;    // auto-reset event (now semaphore)
606         HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
607         Mutex               m_assocMutex;   // external mutex/CS
608         CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
609         int                 m_numWaitersGone        = 0;
610         int                 m_numWaitersBlocked     = 0;
611         int                 m_numWaitersToUnblock   = 0;
612     }
613     else version (Posix)
614     {
615         Mutex               m_assocMutex;
616         pthread_cond_t      m_hndl;
617     }
618     else version (FreeStanding)
619     {
620     	Mutex m_assocMutex;
621     }
622 }
623 
624 
625 ////////////////////////////////////////////////////////////////////////////////
626 // Unit Tests
627 ////////////////////////////////////////////////////////////////////////////////
628 
// Exercises notify, notifyAll and the timed wait on an unshared Condition.
unittest
{
    import core.thread;
    import core.sync.mutex;
    import core.sync.semaphore;


    // Several waiter threads each consume one "ready" token per round; every
    // token is announced with a single notify.
    void testNotify()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;
        int  numTotal   = 0;
        int  numDone    = 0;

        void waiter()
        {
            foreach ( _; 0 .. numTries )
            {
                synchronized( mutex )
                {
                    // Re-check the predicate after every wake-up.
                    while ( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        foreach ( _; 0 .. numWaiters )
            group.create( &waiter );

        foreach ( _; 0 .. numTries )
        {
            foreach ( __; 0 .. numWaiters )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            // numDone is cumulative and never reset, so this loop only
            // actually holds the main thread back during the first round.
            while ( true )
            {
                synchronized( synLoop )
                {
                    if ( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            foreach ( __; 0 .. numWaiters )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    // All waiters block until a single notifyAll releases them together.
    void testNotifyAll()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while ( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        foreach ( _; 0 .. numWaiters )
            group.create( &waiter );

        // Poll until every waiter has registered, then wake them all.
        while ( true )
        {
            synchronized( mutex )
            {
                if ( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    // The first timed wait must be ended by a notification, the second one
    // by its timeout.
    void testWaitTimeout()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait( dur!"seconds"(30) );
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait( dur!"msecs"(10) );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        while ( true )
        {
            synchronized( mutex )
            {
                if ( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting );
        assert( alertedOne );
        assert( !alertedTwo );
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}
794 
// Exercises notify, notifyAll and the timed wait on a shared Condition that
// is bound to a shared Mutex.
unittest
{
    import core.thread;
    import core.sync.mutex;
    import core.sync.semaphore;


    // Several waiter threads each consume one "ready" token per round; every
    // token is announced with a single notify.
    void testNotify()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;
        int  numTotal   = 0;
        int  numDone    = 0;

        void waiter()
        {
            foreach ( _; 0 .. numTries )
            {
                synchronized( mutex )
                {
                    // Re-check the predicate after every wake-up.
                    while ( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        foreach ( _; 0 .. numWaiters )
            group.create( &waiter );

        foreach ( _; 0 .. numTries )
        {
            foreach ( __; 0 .. numWaiters )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            // numDone is cumulative and never reset, so this loop only
            // actually holds the main thread back during the first round.
            while ( true )
            {
                synchronized( synLoop )
                {
                    if ( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            foreach ( __; 0 .. numWaiters )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    // All waiters block until a single notifyAll releases them together.
    void testNotifyAll()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while ( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        foreach ( _; 0 .. numWaiters )
            group.create( &waiter );

        // Poll until every waiter has registered, then wake them all.
        while ( true )
        {
            synchronized( mutex )
            {
                if ( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    // The first timed wait must be ended by a notification, the second one
    // by its timeout.
    void testWaitTimeout()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait( dur!"seconds"(30) );
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait( dur!"msecs"(10) );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        while ( true )
        {
            synchronized( mutex )
            {
                if ( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting );
        assert( alertedOne );
        assert( !alertedTwo );
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}