The OpenD Programming Language

1 /**
2  * The condition module provides a primitive for synchronized condition
3  * checking.
4  *
5  * Copyright: Copyright Sean Kelly 2005 - 2009.
6  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
7  * Authors:   Sean Kelly
8  * Source:    $(DRUNTIMESRC core/sync/_condition.d)
9  */
10 
11 /*          Copyright Sean Kelly 2005 - 2009.
12  * Distributed under the Boost Software License, Version 1.0.
13  *    (See accompanying file LICENSE or copy at
14  *          http://www.boost.org/LICENSE_1_0.txt)
15  */
16 module core.sync.condition;
17 
18 
19 public import core.sync.exception;
20 public import core.sync.mutex;
21 public import core.time;
22 
23 import core.exception : AssertError, staticError;
24 
25 import rt.sys.config;
26 
27 mixin("import " ~ osConditionImport ~ ";");
28 
29 
30 ////////////////////////////////////////////////////////////////////////////////
31 // Condition
32 //
33 // void wait();
34 // void notify();
35 // void notifyAll();
36 ////////////////////////////////////////////////////////////////////////////////
37 
/**
 * A condition variable in the sense described by C.A.R. Hoare.  Following the
 * convention of Mesa style monitors, the operation Hoare called "signal" is
 * named "notify" here, since sending a notification does not transfer control
 * to the waiting thread.
 *
 * All operations are forwarded to the private `OsCondition` member.
 */
class Condition
{
    // ------------------------------------------------------------------------
    // Construction / destruction
    // ------------------------------------------------------------------------

    /**
     * Creates a condition that is bound to the given mutex.
     *
     * Params:
     *  m = The mutex this condition is associated with.
     *
     * Throws:
     *  SyncError on error.
     */
    this(Mutex m) nothrow @safe @nogc
    {
        this(m, true);
    }

    /// ditto
    this(shared Mutex m) shared nothrow @safe @nogc
    {
        import core.atomic : atomicLoad;
        this(atomicLoad(m), true);
    }

    // Common constructor body for the unshared and the shared flavour.  The
    // trailing bool carries no information; it only distinguishes this
    // overload from the public constructors above.  The casts strip `shared`
    // so that a single OsCondition.create call serves both flavours.
    private this(this Q, M)(M assocMutex, bool overloadTag) nothrow @trusted @nogc
        if ((is(Q == Condition) && is(M == Mutex)) ||
            (is(Q == shared Condition) && is(M == shared Mutex)))
    {
        (cast(OsCondition) osCondition).create(cast(Mutex) assocMutex);
    }

    ~this() @nogc
    {
        (cast(OsCondition) osCondition).destroy();
    }


    // ------------------------------------------------------------------------
    // Properties
    // ------------------------------------------------------------------------

    /**
     * The mutex this condition was created with.
     *
     * Returns:
     *  The mutex associated with this condition.
     */
    @property Mutex mutex()
    {
        return osCondition.mutex();
    }

    /// ditto
    @property shared(Mutex) mutex() shared
    {
        return osCondition.mutex();
    }

    // undocumented function for internal use
    final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
    {
        return osCondition.mutex_nothrow();
    }

    // ditto
    final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc
    {
        return osCondition.mutex_nothrow();
    }


    // ------------------------------------------------------------------------
    // Actions
    // ------------------------------------------------------------------------

    /**
     * Blocks the calling thread until a notification arrives.
     *
     * Throws:
     *  SyncError on error.
     */
    void wait()
    {
        wait!(typeof(this))(true);
    }

    /// ditto
    void wait() shared
    {
        wait!(typeof(this))(true);
    }

    /// ditto
    void wait(this Q)(bool _unused_)
        if (is(Q == Condition) || is(Q == shared Condition))
    {
        // The cast is applied to the member expression itself so that the
        // call acts on the stored OsCondition for both qualifiers.
        (cast(OsCondition) osCondition).wait();
    }

    /**
     * Blocks the calling thread until either a notification arrives or the
     * given amount of time has passed.
     *
     * Params:
     *  val = The time to wait.
     *
     * In:
     *  val must be non-negative.
     *
     * Throws:
     *  SyncError on error.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     */
    bool wait(Duration val)
    {
        return wait!(typeof(this))(val, true);
    }

    /// ditto
    bool wait(Duration val) shared
    {
        return wait!(typeof(this))(val, true);
    }

    /// ditto
    bool wait(this Q)(Duration val, bool _unused_)
        if (is(Q == Condition) || is(Q == shared Condition))
    in (!val.isNegative)
    {
        return (cast(OsCondition) osCondition).wait(val);
    }

    /**
     * Wakes up one waiter.
     *
     * Throws:
     *  SyncError on error.
     */
    void notify()
    {
        notify!(typeof(this))(true);
    }

    /// ditto
    void notify() shared
    {
        notify!(typeof(this))(true);
    }

    /// ditto
    void notify(this Q)(bool _unused_)
        if (is(Q == Condition) || is(Q == shared Condition))
    {
        (cast(OsCondition) osCondition).notify();
    }

    /**
     * Wakes up all waiters.
     *
     * Throws:
     *  SyncError on error.
     */
    void notifyAll()
    {
        notifyAll!(typeof(this))(true);
    }

    /// ditto
    void notifyAll() shared
    {
        notifyAll!(typeof(this))(true);
    }

    /// ditto
    void notifyAll(this Q)(bool _unused_)
        if (is(Q == Condition) || is(Q == shared Condition))
    {
        (cast(OsCondition) osCondition).notifyAll();
    }

private:

    // Platform-specific implementation every operation above forwards to.
    OsCondition osCondition;
}
240 
241 
242 ////////////////////////////////////////////////////////////////////////////////
243 // Unit Tests
244 ////////////////////////////////////////////////////////////////////////////////
245 
// Exercises notify(), notifyAll() and the timed wait() using an unshared
// Mutex / Condition pair.
unittest
{
    import core.thread;
    import core.sync.mutex;
    import core.sync.semaphore;


    // Ten waiter threads each consume `numTries` "ready" tokens that the main
    // thread hands out one notify() at a time.
    void testNotify()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition(mutex);
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;   // tokens available, guarded by `mutex`
        int  numTotal   = 0;   // tokens consumed overall, guarded by `mutex`
        int  numDone    = 0;   // cumulative completions, guarded by `synLoop`

        void waiter()
        {
            foreach (attempt; 0 .. numTries)
            {
                synchronized (mutex)
                {
                    // Re-check the predicate after every wake-up.
                    while (numReady < 1)
                        condReady.wait();
                    --numReady;
                    ++numTotal;
                }

                synchronized (synLoop)
                {
                    ++numDone;
                }
                // Hold here until the main thread posts the semaphore.
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        foreach (spawned; 0 .. numWaiters)
            group.create(&waiter);

        foreach (tryIdx; 0 .. numTries)
        {
            // Hand out one token per waiter, notifying once per token.
            foreach (j; 0 .. numWaiters)
            {
                synchronized (mutex)
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            // NOTE: numDone is never reset, so once it has reached
            // numWaiters this loop exits on its first check.
            while (true)
            {
                synchronized (synLoop)
                {
                    if (numDone >= numWaiters)
                        break;
                }
                Thread.yield();
            }
            // One semaphore post per waiter.
            foreach (j; 0 .. numWaiters)
                semDone.notify();
        }

        group.joinAll();
        assert(numTotal == numWaiters * numTries);
    }


    // All waiters block on the same condition; a single notifyAll() issued
    // once every waiter has checked in must release all of them.
    void testNotifyAll()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition(mutex);
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized (mutex)
            {
                ++numReady;
                while (!alert)
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        foreach (spawned; 0 .. numWaiters)
            group.create(&waiter);

        while (true)
        {
            synchronized (mutex)
            {
                // Only raise the alert after every waiter has registered.
                if (numReady >= numWaiters)
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert(numReady == numWaiters);
        assert(numDone == numWaiters);
    }


    // The first timed wait is expected to be notified, the second one is
    // expected to time out.
    void testWaitTimeout()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition(mutex);
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized (mutex)
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait(dur!"seconds"(30));
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait(dur!"msecs"(10));
            }
        }

        auto thread = new Thread(&waiter);
        thread.start();

        while (true)
        {
            synchronized (mutex)
            {
                // `waiting` is set under the mutex right before the first
                // wait, so seeing it here means the waiter has reached it.
                if (waiting)
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert(waiting);
        assert(alertedOne);
        assert(!alertedTwo);
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}
411 
// Exercises notify(), notifyAll() and the timed wait() using a shared
// Mutex / Condition pair.
unittest
{
    import core.thread;
    import core.sync.mutex;
    import core.sync.semaphore;


    // Ten waiter threads each consume `numTries` "ready" tokens that the main
    // thread hands out one notify() at a time.
    void testNotify()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition(mutex);
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;   // tokens available, guarded by `mutex`
        int  numTotal   = 0;   // tokens consumed overall, guarded by `mutex`
        int  numDone    = 0;   // cumulative completions, guarded by `synLoop`

        void waiter()
        {
            foreach (attempt; 0 .. numTries)
            {
                synchronized (mutex)
                {
                    // Re-check the predicate after every wake-up.
                    while (numReady < 1)
                        condReady.wait();
                    --numReady;
                    ++numTotal;
                }

                synchronized (synLoop)
                {
                    ++numDone;
                }
                // Hold here until the main thread posts the semaphore.
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        foreach (spawned; 0 .. numWaiters)
            group.create(&waiter);

        foreach (tryIdx; 0 .. numTries)
        {
            // Hand out one token per waiter, notifying once per token.
            foreach (j; 0 .. numWaiters)
            {
                synchronized (mutex)
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            // NOTE: numDone is never reset, so once it has reached
            // numWaiters this loop exits on its first check.
            while (true)
            {
                synchronized (synLoop)
                {
                    if (numDone >= numWaiters)
                        break;
                }
                Thread.yield();
            }
            // One semaphore post per waiter.
            foreach (j; 0 .. numWaiters)
                semDone.notify();
        }

        group.joinAll();
        assert(numTotal == numWaiters * numTries);
    }


    // All waiters block on the same condition; a single notifyAll() issued
    // once every waiter has checked in must release all of them.
    void testNotifyAll()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition(mutex);
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized (mutex)
            {
                ++numReady;
                while (!alert)
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        foreach (spawned; 0 .. numWaiters)
            group.create(&waiter);

        while (true)
        {
            synchronized (mutex)
            {
                // Only raise the alert after every waiter has registered.
                if (numReady >= numWaiters)
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert(numReady == numWaiters);
        assert(numDone == numWaiters);
    }


    // The first timed wait is expected to be notified, the second one is
    // expected to time out.
    void testWaitTimeout()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition(mutex);
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized (mutex)
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait(dur!"seconds"(30));
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait(dur!"msecs"(10));
            }
        }

        auto thread = new Thread(&waiter);
        thread.start();

        while (true)
        {
            synchronized (mutex)
            {
                // `waiting` is set under the mutex right before the first
                // wait, so seeing it here means the waiter has reached it.
                if (waiting)
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert(waiting);
        assert(alertedOne);
        assert(!alertedTwo);
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}