// The OpenD Programming Language

/**
 * The read/write mutex module provides a primitive for maintaining shared read
 * access and mutually exclusive write access.
 *
 * Copyright: Copyright Sean Kelly 2005 - 2009.
 * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
 * Authors:   Sean Kelly
 * Source:    $(DRUNTIMESRC core/sync/_rwmutex.d)
 */

/*          Copyright Sean Kelly 2005 - 2009.
 * Distributed under the Boost Software License, Version 1.0.
 *    (See accompanying file LICENSE or copy at
 *          http://www.boost.org/LICENSE_1_0.txt)
 */
16 module core.sync.rwmutex;
17 
18 
19 public import core.sync.exception;
20 import core.sync.condition;
21 import core.sync.mutex;
22 import core.memory;
23 
24 
////////////////////////////////////////////////////////////////////////////////
// ReadWriteMutex
//
// Reader reader();
// Writer writer();
////////////////////////////////////////////////////////////////////////////////
31 
32 
/**
 * This class represents a mutex that allows any number of readers to enter,
 * but when a writer enters, all other readers and writers are blocked.
 *
 * Please note that this mutex is not recursive and is intended to guard access
 * to data only.  Also, no deadlock checking is in place because doing so would
 * require dynamic memory allocation, which would reduce performance by an
 * unacceptable amount.  As a result, any attempt to recursively acquire this
 * mutex may well deadlock the caller, particularly if a write lock is acquired
 * while holding a read lock, or vice-versa.  In practice, this should not be
 * an issue however, because it is uncommon to call deeply into unknown code
 * while holding a lock that simply protects data.
 */
class ReadWriteMutex
{
    /**
     * Defines the policy used by this mutex.  Currently, two policies are
     * defined.
     *
     * The first will queue writers until no readers hold the mutex, then
     * pass the writers through one at a time.  If a reader acquires the mutex
     * while there are still writers queued, the reader will take precedence.
     *
     * The second will queue readers if there are any writers queued.  Writers
     * are passed through one at a time, and once there are no writers present,
     * all queued readers will be alerted.
     *
     * Future policies may offer a more even balance between reader and writer
     * precedence.
     */
    enum Policy
    {
        PREFER_READERS, /// Readers get preference.  This may starve writers.
        PREFER_WRITERS  /// Writers get preference.  This may starve readers.
    }


    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a read/write mutex object with the supplied policy.
     *
     * Params:
     *  policy = The policy to use.
     *
     * Throws:
     *  SyncError on error.
     */
    this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
    {
        // NOTE: `new` cannot return null in D; these checks are defensive
        //       and kept for parity with the original implementation.
        m_commonMutex = new Mutex;
        if ( !m_commonMutex )
            throw new SyncError( "Unable to initialize mutex" );

        m_readerQueue = new Condition( m_commonMutex );
        if ( !m_readerQueue )
            throw new SyncError( "Unable to initialize mutex" );

        m_writerQueue = new Condition( m_commonMutex );
        if ( !m_writerQueue )
            throw new SyncError( "Unable to initialize mutex" );

        m_policy = policy;
        m_reader = new Reader;
        m_writer = new Writer;
    }

    /// ditto
    shared this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
    {
        m_commonMutex = new shared Mutex;
        if ( !m_commonMutex )
            throw new SyncError( "Unable to initialize mutex" );

        m_readerQueue = new shared Condition( m_commonMutex );
        if ( !m_readerQueue )
            throw new SyncError( "Unable to initialize mutex" );

        m_writerQueue = new shared Condition( m_commonMutex );
        if ( !m_writerQueue )
            throw new SyncError( "Unable to initialize mutex" );

        m_policy = policy;
        m_reader = new shared Reader;
        m_writer = new shared Writer;
    }

    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets the policy used by this mutex.
     *
     * Returns:
     *  The policy used by this mutex.
     */
    @property Policy policy() @safe nothrow
    {
        return m_policy;
    }

    ///ditto
    @property Policy policy() shared @safe nothrow
    {
        return m_policy;
    }

    ////////////////////////////////////////////////////////////////////////////
    // Reader/Writer Handles
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets an object representing the reader lock for the associated mutex.
     *
     * Returns:
     *  A reader sub-mutex.
     */
    @property Reader reader() @safe nothrow
    {
        return m_reader;
    }

    ///ditto
    @property shared(Reader) reader() shared @safe nothrow
    {
        return m_reader;
    }

    /**
     * Gets an object representing the writer lock for the associated mutex.
     *
     * Returns:
     *  A writer sub-mutex.
     */
    @property Writer writer() @safe nothrow
    {
        return m_writer;
    }

    ///ditto
    @property shared(Writer) writer() shared @safe nothrow
    {
        return m_writer;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader
    ////////////////////////////////////////////////////////////////////////////


    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a read lock for the enclosing mutex.
     */
    class Reader :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex reader proxy object.
         */
        this(this Q)() @trusted nothrow
            if (is(Q == Reader) || is(Q == shared Reader))
        {
            // Route `synchronized (mutex.reader)` through this object's
            // lock/unlock by installing a monitor proxy.
            m_proxy.link = this;
            this.__monitor = cast(void*) &m_proxy;
        }

        /**
         * Acquires a read lock on the enclosing mutex.
         */
        @trusted void lock()
        {
            synchronized( m_commonMutex )
            {
                ++m_numQueuedReaders;
                scope(exit) --m_numQueuedReaders;

                while ( shouldQueueReader )
                    m_readerQueue.wait();
                ++m_numActiveReaders;
            }
        }

        /// ditto
        @trusted void lock() shared
        {
            synchronized( m_commonMutex )
            {
                ++(cast()m_numQueuedReaders);
                scope(exit) --(cast()m_numQueuedReaders);

                while ( shouldQueueReader )
                    m_readerQueue.wait();
                ++(cast()m_numActiveReaders);
            }
        }

        /**
         * Releases a read lock on the enclosing mutex.
         */
        @trusted void unlock()
        {
            synchronized( m_commonMutex )
            {
                // Last reader out wakes a queued writer, if any.
                if ( --m_numActiveReaders < 1 )
                {
                    if ( m_numQueuedWriters > 0 )
                        m_writerQueue.notify();
                }
            }
        }

        /// ditto
        @trusted void unlock() shared
        {
            synchronized( m_commonMutex )
            {
                if ( --(cast()m_numActiveReaders) < 1 )
                {
                    if ( m_numQueuedWriters > 0 )
                        m_writerQueue.notify();
                }
            }
        }

        /**
         * Attempts to acquire a read lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        @trusted bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if ( shouldQueueReader )
                    return false;
                ++m_numActiveReaders;
                return true;
            }
        }

        /// ditto
        @trusted bool tryLock() shared
        {
            synchronized( m_commonMutex )
            {
                if ( shouldQueueReader )
                    return false;
                ++(cast()m_numActiveReaders);
                return true;
            }
        }

        /**
         * Attempts to acquire a read lock on the enclosing mutex. If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned. If not, the function blocks until either the lock can be
         * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
         * true if the lock was acquired and false if the function timed out.
         *
         * Params:
         *  timeout = maximum amount of time to wait for the lock
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        @trusted bool tryLock(Duration timeout)
        {
            synchronized( m_commonMutex )
            {
                if (!shouldQueueReader)
                {
                    ++m_numActiveReaders;
                    return true;
                }

                enum zero = Duration.zero();
                if (timeout <= zero)
                    return false;

                ++m_numQueuedReaders;
                scope(exit) --m_numQueuedReaders;

                enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
                const initialTime = MonoTime.currTime;
                m_readerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
                while (shouldQueueReader)
                {
                    const timeElapsed = MonoTime.currTime - initialTime;
                    if (timeElapsed >= timeout)
                        return false;
                    auto nextWait = timeout - timeElapsed;
                    m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
                }
                ++m_numActiveReaders;
                return true;
            }
        }

        /// ditto
        @trusted bool tryLock(Duration timeout) shared
        {
            const initialTime = MonoTime.currTime;
            synchronized( m_commonMutex )
            {
                ++(cast()m_numQueuedReaders);
                scope(exit) --(cast()m_numQueuedReaders);

                while (shouldQueueReader)
                {
                    const timeElapsed = MonoTime.currTime - initialTime;
                    if (timeElapsed >= timeout)
                        return false;
                    auto nextWait = timeout - timeElapsed;
                    // Avoid problems calling wait(Duration) with huge arguments.
                    enum maxWaitPerCall = dur!"hours"(24 * 365);
                    m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
                }
                ++(cast()m_numActiveReaders);
                return true;
            }
        }


    private:
        // True if an arriving reader must wait: a writer is active, or
        // (under PREFER_WRITERS) at least one writer is queued.
        @property bool shouldQueueReader(this Q)() nothrow @safe @nogc
            if (is(Q == Reader) || is(Q == shared Reader))
        {
            if ( m_numActiveWriters > 0 )
                return true;

            switch ( m_policy )
            {
            case Policy.PREFER_WRITERS:
                 return m_numQueuedWriters > 0;

            case Policy.PREFER_READERS:
            default:
                 break;
            }

            return false;
        }

        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Writer
    ////////////////////////////////////////////////////////////////////////////


    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a write lock for the enclosing mutex.
     */
    class Writer :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex writer proxy object.
         */
        this(this Q)() @trusted nothrow
            if (is(Q == Writer) || is(Q == shared Writer))
        {
            // Route `synchronized (mutex.writer)` through this object's
            // lock/unlock by installing a monitor proxy.
            m_proxy.link = this;
            this.__monitor = cast(void*) &m_proxy;
        }


        /**
         * Acquires a write lock on the enclosing mutex.
         */
        @trusted void lock()
        {
            synchronized( m_commonMutex )
            {
                ++m_numQueuedWriters;
                scope(exit) --m_numQueuedWriters;

                while ( shouldQueueWriter )
                    m_writerQueue.wait();
                ++m_numActiveWriters;
            }
        }

        /// ditto
        @trusted void lock() shared
        {
            synchronized( m_commonMutex )
            {
                ++(cast()m_numQueuedWriters);
                scope(exit) --(cast()m_numQueuedWriters);

                while ( shouldQueueWriter )
                    m_writerQueue.wait();
                ++(cast()m_numActiveWriters);
            }
        }


        /**
         * Releases a write lock on the enclosing mutex.
         */
        @trusted void unlock()
        {
            synchronized( m_commonMutex )
            {
                if ( --m_numActiveWriters < 1 )
                {
                    // The policy decides whether queued readers or the next
                    // queued writer get first crack at the released lock.
                    switch ( m_policy )
                    {
                    default:
                    case Policy.PREFER_READERS:
                        if ( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                        else if ( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        break;
                    case Policy.PREFER_WRITERS:
                        if ( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        else if ( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                    }
                }
            }
        }

        /// ditto
        @trusted void unlock() shared
        {
            synchronized( m_commonMutex )
            {
                if ( --(cast()m_numActiveWriters) < 1 )
                {
                    switch ( m_policy )
                    {
                    default:
                    case Policy.PREFER_READERS:
                        if ( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                        else if ( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        break;
                    case Policy.PREFER_WRITERS:
                        if ( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        else if ( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                    }
                }
            }
        }


        /**
         * Attempts to acquire a write lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        @trusted bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if ( shouldQueueWriter )
                    return false;
                ++m_numActiveWriters;
                return true;
            }
        }

        /// ditto
        @trusted bool tryLock() shared
        {
            synchronized( m_commonMutex )
            {
                if ( shouldQueueWriter )
                    return false;
                ++(cast()m_numActiveWriters);
                return true;
            }
        }

        /**
         * Attempts to acquire a write lock on the enclosing mutex. If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned. If not, the function blocks until either the lock can be
         * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
         * true if the lock was acquired and false if the function timed out.
         *
         * Params:
         *  timeout = maximum amount of time to wait for the lock
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        @trusted bool tryLock(Duration timeout)
        {
            synchronized( m_commonMutex )
            {
                if (!shouldQueueWriter)
                {
                    ++m_numActiveWriters;
                    return true;
                }

                enum zero = Duration.zero();
                if (timeout <= zero)
                    return false;

                ++m_numQueuedWriters;
                scope(exit) --m_numQueuedWriters;

                enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
                const initialTime = MonoTime.currTime;
                m_writerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
                while (shouldQueueWriter)
                {
                    const timeElapsed = MonoTime.currTime - initialTime;
                    if (timeElapsed >= timeout)
                        return false;
                    auto nextWait = timeout - timeElapsed;
                    m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
                }
                ++m_numActiveWriters;
                return true;
            }
        }

        /// ditto
        @trusted bool tryLock(Duration timeout) shared
        {
            const initialTime = MonoTime.currTime;
            synchronized( m_commonMutex )
            {
                ++(cast()m_numQueuedWriters);
                scope(exit) --(cast()m_numQueuedWriters);

                while (shouldQueueWriter)
                {
                    const timeElapsed = MonoTime.currTime - initialTime;
                    if (timeElapsed >= timeout)
                        return false;
                    auto nextWait = timeout - timeElapsed;
                    // Avoid problems calling wait(Duration) with huge arguments.
                    enum maxWaitPerCall = dur!"hours"(24 * 365);
                    m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
                }
                ++(cast()m_numActiveWriters);
                return true;
            }
        }

    private:
        // True if an arriving writer must wait: any writer or reader is
        // active, or (under PREFER_READERS) at least one reader is queued.
        @property bool shouldQueueWriter(this Q)()
            if (is(Q == Writer) || is(Q == shared Writer))
        {
            if ( m_numActiveWriters > 0 ||
                m_numActiveReaders > 0 )
                return true;
            switch ( m_policy )
            {
            case Policy.PREFER_READERS:
                return m_numQueuedReaders > 0;

            case Policy.PREFER_WRITERS:
            default:
                 break;
            }

            return false;
        }

        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


private:
    Policy      m_policy;
    Reader      m_reader;
    Writer      m_writer;

    // All state below is guarded by m_commonMutex.
    Mutex       m_commonMutex;
    Condition   m_readerQueue;
    Condition   m_writerQueue;

    int         m_numQueuedReaders;
    int         m_numActiveReaders;
    int         m_numQueuedWriters;
    int         m_numActiveWriters;
}
647 
648 
////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////
652 
653 
// Exercises the core rwmutex semantics with real threads: concurrent
// readers, writer exclusivity, reader/writer mutual exclusion, and the
// policy-dependent wake-up order for queued readers vs. writers.
unittest
{
    import core.atomic, core.thread, core.sync.semaphore;

    static void runTest(ReadWriteMutex.Policy policy)
    {
        scope mutex = new ReadWriteMutex(policy);
        scope rdSemA = new Semaphore, rdSemB = new Semaphore,
              wrSemA = new Semaphore, wrSemB = new Semaphore;
        shared size_t numReaders, numWriters;

        void readerFn()
        {
            synchronized (mutex.reader)
            {
                atomicOp!"+="(numReaders, 1);
                rdSemA.notify();
                rdSemB.wait();
                atomicOp!"-="(numReaders, 1);
            }
        }

        void writerFn()
        {
            synchronized (mutex.writer)
            {
                atomicOp!"+="(numWriters, 1);
                wrSemA.notify();
                wrSemB.wait();
                atomicOp!"-="(numWriters, 1);
            }
        }

        // Spins until exactly the given numbers of readers/writers are queued.
        void waitQueued(size_t queuedReaders, size_t queuedWriters)
        {
            for (;;)
            {
                synchronized (mutex.m_commonMutex)
                {
                    if (mutex.m_numQueuedReaders == queuedReaders &&
                        mutex.m_numQueuedWriters == queuedWriters)
                        break;
                }
                Thread.yield();
            }
        }

        scope group = new ThreadGroup;

        // 2 simultaneous readers
        group.create(&readerFn); group.create(&readerFn);
        rdSemA.wait(); rdSemA.wait();
        assert(numReaders == 2);
        rdSemB.notify(); rdSemB.notify();
        group.joinAll();
        assert(numReaders == 0);
        foreach (t; group) group.remove(t);

        // 1 writer at a time
        group.create(&writerFn); group.create(&writerFn);
        wrSemA.wait();
        assert(!wrSemA.tryWait());
        assert(numWriters == 1);
        wrSemB.notify();
        wrSemA.wait();
        assert(numWriters == 1);
        wrSemB.notify();
        group.joinAll();
        assert(numWriters == 0);
        foreach (t; group) group.remove(t);

        // reader and writer are mutually exclusive
        group.create(&readerFn);
        rdSemA.wait();
        group.create(&writerFn);
        waitQueued(0, 1);
        assert(!wrSemA.tryWait());
        assert(numReaders == 1 && numWriters == 0);
        rdSemB.notify();
        wrSemA.wait();
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);

        // writer and reader are mutually exclusive
        group.create(&writerFn);
        wrSemA.wait();
        group.create(&readerFn);
        waitQueued(1, 0);
        assert(!rdSemA.tryWait());
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        rdSemA.wait();
        assert(numReaders == 1 && numWriters == 0);
        rdSemB.notify();
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);

        // policy determines whether queued reader or writers progress first
        group.create(&writerFn);
        wrSemA.wait();
        group.create(&readerFn);
        group.create(&writerFn);
        waitQueued(1, 1);
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();

        if (policy == ReadWriteMutex.Policy.PREFER_READERS)
        {
            rdSemA.wait();
            assert(numReaders == 1 && numWriters == 0);
            rdSemB.notify();
            wrSemA.wait();
            assert(numReaders == 0 && numWriters == 1);
            wrSemB.notify();
        }
        else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
        {
            wrSemA.wait();
            assert(numReaders == 0 && numWriters == 1);
            wrSemB.notify();
            rdSemA.wait();
            assert(numReaders == 1 && numWriters == 0);
            rdSemB.notify();
        }
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);
    }
    runTest(ReadWriteMutex.Policy.PREFER_READERS);
    runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
}
789 
// Exercises the timed tryLock(Duration) overloads for both the reader and
// writer sub-mutexes: a Duration.min attempt must fail immediately while the
// opposing lock is held, and a Duration.max attempt must succeed once the
// opposing lock is released.
unittest
{
    import core.atomic, core.thread;
    __gshared ReadWriteMutex rwmutex;
    shared static bool threadTriedOnceToGetLock;
    shared static bool threadFinallyGotLock;

    rwmutex = new ReadWriteMutex();
    atomicFence;
    const maxTimeAllowedForTest = dur!"seconds"(20);
    // Test ReadWriteMutex.Reader.tryLock(Duration).
    {
        static void testReaderTryLock()
        {
            assert(!rwmutex.reader.tryLock(Duration.min));
            threadTriedOnceToGetLock.atomicStore(true);
            assert(rwmutex.reader.tryLock(Duration.max));
            threadFinallyGotLock.atomicStore(true);
            rwmutex.reader.unlock;
        }
        assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
        auto otherThread = new Thread(&testReaderTryLock).start;
        const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
        Thread.yield;
        // We started otherThread with the writer lock held so otherThread's
        // first rwlock.reader.tryLock with timeout Duration.min should fail.
        while (!threadTriedOnceToGetLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        rwmutex.writer.unlock;
        // Soon after we release the writer lock otherThread's second
        // rwlock.reader.tryLock with timeout Duration.max should succeed.
        while (!threadFinallyGotLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        otherThread.join;
    }
    threadTriedOnceToGetLock.atomicStore(false); // Reset.
    threadFinallyGotLock.atomicStore(false); // Reset.
    // Test ReadWriteMutex.Writer.tryLock(Duration).
    {
        static void testWriterTryLock()
        {
            assert(!rwmutex.writer.tryLock(Duration.min));
            threadTriedOnceToGetLock.atomicStore(true);
            assert(rwmutex.writer.tryLock(Duration.max));
            threadFinallyGotLock.atomicStore(true);
            rwmutex.writer.unlock;
        }
        assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
        auto otherThread = new Thread(&testWriterTryLock).start;
        const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
        Thread.yield;
        // We started otherThread with the reader lock held so otherThread's
        // first rwlock.writer.tryLock with timeout Duration.min should fail.
        while (!threadTriedOnceToGetLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        rwmutex.reader.unlock;
        // Soon after we release the reader lock otherThread's second
        // rwlock.writer.tryLock with timeout Duration.max should succeed.
        while (!threadFinallyGotLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        otherThread.join;
    }
}
865 
unittest
{
    import core.atomic, core.thread, core.sync.semaphore;

    // Validates reader/writer exclusion and both queueing policies by
    // rendezvousing with spawned reader/writer threads through semaphores.
    static void runTest(ReadWriteMutex.Policy policy)
    {
        shared scope mutex = new shared ReadWriteMutex(policy);
        // Semaphore pairs: *SemA signals "critical section entered" back to
        // the test driver; *SemB tells the lock holder it may release.
        scope rdSemA = new Semaphore, rdSemB = new Semaphore,
              wrSemA = new Semaphore, wrSemB = new Semaphore;
        // Number of threads currently inside a read/write critical section.
        shared size_t numReaders, numWriters;

        // Acquires the read lock, reports entry via rdSemA, then holds the
        // lock until released via rdSemB.
        void readerFn()
        {
            synchronized (mutex.reader)
            {
                atomicOp!"+="(numReaders, 1);
                rdSemA.notify();
                rdSemB.wait();
                atomicOp!"-="(numReaders, 1);
            }
        }

        // Acquires the write lock, reports entry via wrSemA, then holds the
        // lock until released via wrSemB.
        void writerFn()
        {
            synchronized (mutex.writer)
            {
                atomicOp!"+="(numWriters, 1);
                wrSemA.notify();
                wrSemB.wait();
                atomicOp!"-="(numWriters, 1);
            }
        }

        // Spins until exactly the given numbers of readers and writers are
        // queued on the mutex; inspects the internal counters under the
        // common monitor to read them consistently.
        void waitQueued(size_t queuedReaders, size_t queuedWriters)
        {
            for (;;)
            {
                synchronized (mutex.m_commonMutex)
                {
                    if (mutex.m_numQueuedReaders == queuedReaders &&
                        mutex.m_numQueuedWriters == queuedWriters)
                        break;
                }
                Thread.yield();
            }
        }

        scope group = new ThreadGroup;

        // 2 simultaneous readers
        group.create(&readerFn); group.create(&readerFn);
        rdSemA.wait(); rdSemA.wait();
        assert(numReaders == 2);
        rdSemB.notify(); rdSemB.notify();
        group.joinAll();
        assert(numReaders == 0);
        // Empty the group so it can be reused for the next scenario.
        foreach (t; group) group.remove(t);

        // 1 writer at a time
        group.create(&writerFn); group.create(&writerFn);
        wrSemA.wait();
        // The second writer must still be blocked outside its critical section.
        assert(!wrSemA.tryWait());
        assert(numWriters == 1);
        wrSemB.notify();
        wrSemA.wait();
        assert(numWriters == 1);
        wrSemB.notify();
        group.joinAll();
        assert(numWriters == 0);
        foreach (t; group) group.remove(t);

        // reader and writer are mutually exclusive
        group.create(&readerFn);
        rdSemA.wait();
        group.create(&writerFn);
        waitQueued(0, 1);
        // Writer must not have entered while the reader holds the lock.
        assert(!wrSemA.tryWait());
        assert(numReaders == 1 && numWriters == 0);
        rdSemB.notify();
        wrSemA.wait();
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);

        // writer and reader are mutually exclusive
        group.create(&writerFn);
        wrSemA.wait();
        group.create(&readerFn);
        waitQueued(1, 0);
        // Reader must not have entered while the writer holds the lock.
        assert(!rdSemA.tryWait());
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        rdSemA.wait();
        assert(numReaders == 1 && numWriters == 0);
        rdSemB.notify();
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);

        // policy determines whether queued reader or writers progress first
        group.create(&writerFn);
        wrSemA.wait();
        group.create(&readerFn);
        group.create(&writerFn);
        waitQueued(1, 1);
        assert(numReaders == 0 && numWriters == 1);
        // Release the first writer; which queued thread runs next depends
        // on the policy under test.
        wrSemB.notify();

        if (policy == ReadWriteMutex.Policy.PREFER_READERS)
        {
            rdSemA.wait();
            assert(numReaders == 1 && numWriters == 0);
            rdSemB.notify();
            wrSemA.wait();
            assert(numReaders == 0 && numWriters == 1);
            wrSemB.notify();
        }
        else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
        {
            wrSemA.wait();
            assert(numReaders == 0 && numWriters == 1);
            wrSemB.notify();
            rdSemA.wait();
            assert(numReaders == 1 && numWriters == 0);
            rdSemB.notify();
        }
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);
    }
    runTest(ReadWriteMutex.Policy.PREFER_READERS);
    runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
}
1001 
unittest
{
    import core.atomic, core.thread;
    // Static state shared between the test driver and the worker threads.
    shared static ReadWriteMutex rwmutex;
    shared static bool sawFailedAttempt;
    shared static bool sawSuccessfulAttempt;

    rwmutex = new shared ReadWriteMutex();
    atomicFence;
    const testTimeout = dur!"seconds"(20);
    // Exercise ReadWriteMutex.Reader.tryLock(Duration).
    {
        static void readerWorker()
        {
            // The main thread holds the writer lock, so an attempt that is
            // unwilling to wait must fail.
            assert(!rwmutex.reader.tryLock(Duration.min));
            sawFailedAttempt.atomicStore(true);
            // An effectively unbounded wait succeeds once the main thread
            // releases the writer lock.
            assert(rwmutex.reader.tryLock(Duration.max));
            sawSuccessfulAttempt.atomicStore(true);
            rwmutex.reader.unlock;
        }
        assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
        auto worker = new Thread(&readerWorker).start;
        const deadline = MonoTime.currTime + testTimeout;
        Thread.yield;
        // The writer lock is held here, so the worker's first
        // rwmutex.reader.tryLock with timeout Duration.min should fail.
        for (; !sawFailedAttempt.atomicLoad; Thread.yield())
            assert(MonoTime.currTime < deadline, "timed out");
        rwmutex.writer.unlock;
        // Soon after the writer lock is released, the worker's second
        // rwmutex.reader.tryLock with timeout Duration.max should succeed.
        for (; !sawSuccessfulAttempt.atomicLoad; Thread.yield())
            assert(MonoTime.currTime < deadline, "timed out");
        worker.join;
    }
    sawFailedAttempt.atomicStore(false); // Reset for the writer test.
    sawSuccessfulAttempt.atomicStore(false); // Reset for the writer test.
    // Exercise ReadWriteMutex.Writer.tryLock(Duration).
    {
        static void writerWorker()
        {
            // The main thread holds a reader lock, so an attempt that is
            // unwilling to wait must fail.
            assert(!rwmutex.writer.tryLock(Duration.min));
            sawFailedAttempt.atomicStore(true);
            // An effectively unbounded wait succeeds once the main thread
            // releases its reader lock.
            assert(rwmutex.writer.tryLock(Duration.max));
            sawSuccessfulAttempt.atomicStore(true);
            rwmutex.writer.unlock;
        }
        assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
        auto worker = new Thread(&writerWorker).start;
        const deadline = MonoTime.currTime + testTimeout;
        Thread.yield;
        // The reader lock is held here, so the worker's first
        // rwmutex.writer.tryLock with timeout Duration.min should fail.
        for (; !sawFailedAttempt.atomicLoad; Thread.yield())
            assert(MonoTime.currTime < deadline, "timed out");
        rwmutex.reader.unlock;
        // Soon after the reader lock is released, the worker's second
        // rwmutex.writer.tryLock with timeout Duration.max should succeed.
        for (; !sawSuccessfulAttempt.atomicLoad; Thread.yield())
            assert(MonoTime.currTime < deadline, "timed out");
        worker.join;
    }
}