/**
 * The read/write mutex module provides a primitive for maintaining shared read
 * access and mutually exclusive write access.
 *
 * Copyright: Copyright Sean Kelly 2005 - 2009.
 * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
 * Authors:   Sean Kelly
 * Source:    $(DRUNTIMESRC core/sync/_rwmutex.d)
 */

/*          Copyright Sean Kelly 2005 - 2009.
 * Distributed under the Boost Software License, Version 1.0.
 *    (See accompanying file LICENSE or copy at
 *          http://www.boost.org/LICENSE_1_0.txt)
 */
module core.sync.rwmutex;


public import core.sync.exception;
import core.sync.condition;
import core.sync.mutex;
import core.memory;


////////////////////////////////////////////////////////////////////////////////
// ReadWriteMutex
//
// Reader reader();
// Writer writer();
////////////////////////////////////////////////////////////////////////////////


/**
 * This class represents a mutex that allows any number of readers to enter,
 * but when a writer enters, all other readers and writers are blocked.
 *
 * Please note that this mutex is not recursive and is intended to guard access
 * to data only.  Also, no deadlock checking is in place because doing so would
 * require dynamic memory allocation, which would reduce performance by an
 * unacceptable amount.  As a result, any attempt to recursively acquire this
 * mutex may well deadlock the caller, particularly if a write lock is acquired
 * while holding a read lock, or vice-versa.  In practice, this should not be
 * an issue however, because it is uncommon to call deeply into unknown code
 * while holding a lock that simply protects data.
 */
class ReadWriteMutex
{
    /**
     * Defines the policy used by this mutex.  Currently, two policies are
     * defined.
     *
     * The first will queue writers until no readers hold the mutex, then
     * pass the writers through one at a time.  If a reader acquires the mutex
     * while there are still writers queued, the reader will take precedence.
     *
     * The second will queue readers if there are any writers queued.  Writers
     * are passed through one at a time, and once there are no writers present,
     * all queued readers will be alerted.
     *
     * Future policies may offer a more even balance between reader and writer
     * precedence.
     */
    enum Policy
    {
        PREFER_READERS, /// Readers get preference.  This may starve writers.
        PREFER_WRITERS  /// Writers get preference.  This may starve readers.
    }


    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a read/write mutex object with the supplied policy.
     *
     * Params:
     *  policy = The policy to use.
     *
     * Throws:
     *  SyncError on error.
     */
    this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
    {
        // The null checks below are purely defensive: a failed `new` raises
        // an Error instead of returning null.
        m_commonMutex = new Mutex;
        if ( !m_commonMutex )
            throw new SyncError( "Unable to initialize mutex" );

        // Both queues wait on the same mutex, so all counters below are
        // always read and written under m_commonMutex.
        m_readerQueue = new Condition( m_commonMutex );
        if ( !m_readerQueue )
            throw new SyncError( "Unable to initialize mutex" );

        m_writerQueue = new Condition( m_commonMutex );
        if ( !m_writerQueue )
            throw new SyncError( "Unable to initialize mutex" );

        m_policy = policy;
        m_reader = new Reader;
        m_writer = new Writer;
    }

    /// ditto
    shared this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
    {
        m_commonMutex = new shared Mutex;
        if ( !m_commonMutex )
            throw new SyncError( "Unable to initialize mutex" );

        m_readerQueue = new shared Condition( m_commonMutex );
        if ( !m_readerQueue )
            throw new SyncError( "Unable to initialize mutex" );

        m_writerQueue = new shared Condition( m_commonMutex );
        if ( !m_writerQueue )
            throw new SyncError( "Unable to initialize mutex" );

        m_policy = policy;
        m_reader = new shared Reader;
        m_writer = new shared Writer;
    }

    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets the policy used by this mutex.
     *
     * Returns:
     *  The policy used by this mutex.
     */
    @property Policy policy() @safe nothrow
    {
        return m_policy;
    }

    ///ditto
    @property Policy policy() shared @safe nothrow
    {
        return m_policy;
    }

    ////////////////////////////////////////////////////////////////////////////
    // Reader/Writer Handles
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets an object representing the reader lock for the associated mutex.
     *
     * Returns:
     *  A reader sub-mutex.
     */
    @property Reader reader() @safe nothrow
    {
        return m_reader;
    }

    ///ditto
    @property shared(Reader) reader() shared @safe nothrow
    {
        return m_reader;
    }

    /**
     * Gets an object representing the writer lock for the associated mutex.
     *
     * Returns:
     *  A writer sub-mutex.
     */
    @property Writer writer() @safe nothrow
    {
        return m_writer;
    }

    ///ditto
    @property shared(Writer) writer() shared @safe nothrow
    {
        return m_writer;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader
    ////////////////////////////////////////////////////////////////////////////


    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a read lock for the enclosing mutex.
     */
    class Reader :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex reader proxy object.
         */
        this(this Q)() @trusted nothrow
            if (is(Q == Reader) || is(Q == shared Reader))
        {
            // Install this object as its own monitor so that
            // `synchronized (mutex.reader)` goes through lock()/unlock().
            m_proxy.link = this;
            this.__monitor = cast(void*) &m_proxy;
        }

        /**
         * Acquires a read lock on the enclosing mutex.
         */
        @trusted void lock()
        {
            synchronized( m_commonMutex )
            {
                ++m_numQueuedReaders;
                scope(exit) --m_numQueuedReaders;

                while ( shouldQueueReader )
                    m_readerQueue.wait();
                ++m_numActiveReaders;
            }
        }

        /// ditto
        @trusted void lock() shared
        {
            synchronized( m_commonMutex )
            {
                // The counters are protected by m_commonMutex, so the shared
                // qualifier is cast away for the plain increment/decrement.
                ++(cast()m_numQueuedReaders);
                scope(exit) --(cast()m_numQueuedReaders);

                while ( shouldQueueReader )
                    m_readerQueue.wait();
                ++(cast()m_numActiveReaders);
            }
        }

        /**
         * Releases a read lock on the enclosing mutex.
         */
        @trusted void unlock()
        {
            synchronized( m_commonMutex )
            {
                // Only the last reader out hands the mutex to a writer.
                if ( --m_numActiveReaders < 1 )
                {
                    if ( m_numQueuedWriters > 0 )
                        m_writerQueue.notify();
                }
            }
        }

        /// ditto
        @trusted void unlock() shared
        {
            synchronized( m_commonMutex )
            {
                if ( --(cast()m_numActiveReaders) < 1 )
                {
                    if ( m_numQueuedWriters > 0 )
                        m_writerQueue.notify();
                }
            }
        }

        /**
         * Attempts to acquire a read lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        @trusted bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if ( shouldQueueReader )
                    return false;
                ++m_numActiveReaders;
                return true;
            }
        }

        /// ditto
        @trusted bool tryLock() shared
        {
            synchronized( m_commonMutex )
            {
                if ( shouldQueueReader )
                    return false;
                ++(cast()m_numActiveReaders);
                return true;
            }
        }

        /**
         * Attempts to acquire a read lock on the enclosing mutex. If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned. If not, the function blocks until either the lock can be
         * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
         * true if the lock was acquired and false if the function timed out.
         *
         * Params:
         *  timeout = maximum amount of time to wait for the lock
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        @trusted bool tryLock(Duration timeout)
        {
            synchronized( m_commonMutex )
            {
                if (!shouldQueueReader)
                {
                    ++m_numActiveReaders;
                    return true;
                }

                enum zero = Duration.zero();
                if (timeout <= zero)
                    return false;

                ++m_numQueuedReaders;
                scope(exit) --m_numQueuedReaders;

                enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
                const initialTime = MonoTime.currTime;
                m_readerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
                while (shouldQueueReader)
                {
                    const timeElapsed = MonoTime.currTime - initialTime;
                    if (timeElapsed >= timeout)
                        return false;
                    auto nextWait = timeout - timeElapsed;
                    m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
                }
                ++m_numActiveReaders;
                return true;
            }
        }

        /// ditto
        @trusted bool tryLock(Duration timeout) shared
        {
            const initialTime = MonoTime.currTime;
            synchronized( m_commonMutex )
            {
                ++(cast()m_numQueuedReaders);
                scope(exit) --(cast()m_numQueuedReaders);

                while (shouldQueueReader)
                {
                    const timeElapsed = MonoTime.currTime - initialTime;
                    if (timeElapsed >= timeout)
                        return false;
                    auto nextWait = timeout - timeElapsed;
                    // Avoid problems calling wait(Duration) with huge arguments.
                    enum maxWaitPerCall = dur!"hours"(24 * 365);
                    m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
                }
                ++(cast()m_numActiveReaders);
                return true;
            }
        }


    private:
        // True when a reader must wait: a writer is active, or the policy
        // prefers writers and at least one writer is queued.
        // Must be called with m_commonMutex held.
        @property bool shouldQueueReader(this Q)() nothrow @safe @nogc
            if (is(Q == Reader) || is(Q == shared Reader))
        {
            if ( m_numActiveWriters > 0 )
                return true;

            switch ( m_policy )
            {
            case Policy.PREFER_WRITERS:
                 return m_numQueuedWriters > 0;

            case Policy.PREFER_READERS:
            default:
                 break;
            }

        return false;
        }

        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Writer
    ////////////////////////////////////////////////////////////////////////////


    /**
     * This class can be considered a mutex in its own right, and is used to
     * negotiate a write lock for the enclosing mutex.
     */
    class Writer :
        Object.Monitor
    {
        /**
         * Initializes a read/write mutex writer proxy object.
         */
        this(this Q)() @trusted nothrow
            if (is(Q == Writer) || is(Q == shared Writer))
        {
            // Install this object as its own monitor so that
            // `synchronized (mutex.writer)` goes through lock()/unlock().
            m_proxy.link = this;
            this.__monitor = cast(void*) &m_proxy;
        }


        /**
         * Acquires a write lock on the enclosing mutex.
         */
        @trusted void lock()
        {
            synchronized( m_commonMutex )
            {
                ++m_numQueuedWriters;
                scope(exit) --m_numQueuedWriters;

                while ( shouldQueueWriter )
                    m_writerQueue.wait();
                ++m_numActiveWriters;
            }
        }

        /// ditto
        @trusted void lock() shared
        {
            synchronized( m_commonMutex )
            {
                ++(cast()m_numQueuedWriters);
                scope(exit) --(cast()m_numQueuedWriters);

                while ( shouldQueueWriter )
                    m_writerQueue.wait();
                ++(cast()m_numActiveWriters);
            }
        }


        /**
         * Releases a write lock on the enclosing mutex.
         */
        @trusted void unlock()
        {
            synchronized( m_commonMutex )
            {
                if ( --m_numActiveWriters < 1 )
                {
                    // The policy decides which queue is served first.
                    switch ( m_policy )
                    {
                    default:
                    case Policy.PREFER_READERS:
                        if ( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                        else if ( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        break;
                    case Policy.PREFER_WRITERS:
                        if ( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        else if ( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                    }
                }
            }
        }

        /// ditto
        @trusted void unlock() shared
        {
            synchronized( m_commonMutex )
            {
                if ( --(cast()m_numActiveWriters) < 1 )
                {
                    switch ( m_policy )
                    {
                    default:
                    case Policy.PREFER_READERS:
                        if ( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                        else if ( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        break;
                    case Policy.PREFER_WRITERS:
                        if ( m_numQueuedWriters > 0 )
                            m_writerQueue.notify();
                        else if ( m_numQueuedReaders > 0 )
                            m_readerQueue.notifyAll();
                    }
                }
            }
        }


        /**
         * Attempts to acquire a write lock on the enclosing mutex.  If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned.  If not, the lock is not acquired and false is returned.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        @trusted bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if ( shouldQueueWriter )
                    return false;
                ++m_numActiveWriters;
                return true;
            }
        }

        /// ditto
        @trusted bool tryLock() shared
        {
            synchronized( m_commonMutex )
            {
                if ( shouldQueueWriter )
                    return false;
                ++(cast()m_numActiveWriters);
                return true;
            }
        }

        /**
         * Attempts to acquire a write lock on the enclosing mutex. If one can
         * be obtained without blocking, the lock is acquired and true is
         * returned. If not, the function blocks until either the lock can be
         * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
         * true if the lock was acquired and false if the function timed out.
         *
         * If the call times out while readers are queued behind it (which
         * happens under Policy.PREFER_WRITERS), those readers are woken so
         * that they are not left waiting for a writer that has given up.
         *
         * Params:
         *  timeout = maximum amount of time to wait for the lock
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        @trusted bool tryLock(Duration timeout)
        {
            synchronized( m_commonMutex )
            {
                if (!shouldQueueWriter)
                {
                    ++m_numActiveWriters;
                    return true;
                }

                enum zero = Duration.zero();
                if (timeout <= zero)
                    return false;

                ++m_numQueuedWriters;
                scope(exit) --m_numQueuedWriters;

                enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
                const initialTime = MonoTime.currTime;
                m_writerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
                while (shouldQueueWriter)
                {
                    const timeElapsed = MonoTime.currTime - initialTime;
                    if (timeElapsed >= timeout)
                    {
                        // We are the last queued writer and are giving up.
                        // Readers that queued only because of us would never
                        // be signalled (Reader.unlock only notifies writers),
                        // so wake them now.  They re-check their condition
                        // once we leave the synchronized block, after the
                        // scope(exit) above has dropped our queue entry.
                        if (m_numQueuedWriters == 1 && m_numActiveWriters == 0 &&
                            m_numQueuedReaders > 0)
                            m_readerQueue.notifyAll();
                        return false;
                    }
                    auto nextWait = timeout - timeElapsed;
                    m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
                }
                ++m_numActiveWriters;
                return true;
            }
        }

        /// ditto
        @trusted bool tryLock(Duration timeout) shared
        {
            const initialTime = MonoTime.currTime;
            synchronized( m_commonMutex )
            {
                ++(cast()m_numQueuedWriters);
                scope(exit) --(cast()m_numQueuedWriters);

                while (shouldQueueWriter)
                {
                    const timeElapsed = MonoTime.currTime - initialTime;
                    if (timeElapsed >= timeout)
                    {
                        // Same hand-off as in the non-shared overload: the
                        // last queued writer to give up wakes the readers
                        // that were waiting behind it.
                        if (m_numQueuedWriters == 1 && m_numActiveWriters == 0 &&
                            m_numQueuedReaders > 0)
                            m_readerQueue.notifyAll();
                        return false;
                    }
                    auto nextWait = timeout - timeElapsed;
                    // Avoid problems calling wait(Duration) with huge arguments.
                    enum maxWaitPerCall = dur!"hours"(24 * 365);
                    m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
                }
                ++(cast()m_numActiveWriters);
                return true;
            }
        }

    private:
        // True when a writer must wait: the mutex is held by anyone, or the
        // policy prefers readers and at least one reader is queued.
        // Must be called with m_commonMutex held.
        @property bool shouldQueueWriter(this Q)() nothrow @safe @nogc
            if (is(Q == Writer) || is(Q == shared Writer))
        {
            if ( m_numActiveWriters > 0 ||
                m_numActiveReaders > 0 )
                return true;
            switch ( m_policy )
            {
            case Policy.PREFER_READERS:
                return m_numQueuedReaders > 0;

            case Policy.PREFER_WRITERS:
            default:
                 break;
            }

        return false;
        }

        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


private:
    Policy      m_policy;
    Reader      m_reader;
    Writer      m_writer;

    Mutex       m_commonMutex;
    Condition   m_readerQueue;
    Condition   m_writerQueue;

    int         m_numQueuedReaders;
    int         m_numActiveReaders;
    int         m_numQueuedWriters;
    int         m_numActiveWriters;
}


////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////


unittest
{
    import core.atomic, core.thread, core.sync.semaphore;

    static void runTest(ReadWriteMutex.Policy policy)
    {
        scope mutex = new ReadWriteMutex(policy);
        scope rdSemA = new Semaphore, rdSemB = new Semaphore,
              wrSemA = new Semaphore, wrSemB = new Semaphore;
        shared size_t numReaders, numWriters;

        void readerFn()
        {
            synchronized (mutex.reader)
            {
                atomicOp!"+="(numReaders, 1);
                rdSemA.notify();
                rdSemB.wait();
                atomicOp!"-="(numReaders, 1);
            }
        }

        void writerFn()
        {
            synchronized (mutex.writer)
            {
                atomicOp!"+="(numWriters, 1);
                wrSemA.notify();
                wrSemB.wait();
                atomicOp!"-="(numWriters, 1);
            }
        }

        void waitQueued(size_t queuedReaders, size_t queuedWriters)
        {
            for (;;)
            {
                synchronized (mutex.m_commonMutex)
                {
                    if (mutex.m_numQueuedReaders == queuedReaders &&
                        mutex.m_numQueuedWriters == queuedWriters)
                        break;
                }
                Thread.yield();
            }
        }

        scope group = new ThreadGroup;

        // 2 simultaneous readers
        group.create(&readerFn); group.create(&readerFn);
        rdSemA.wait(); rdSemA.wait();
        assert(numReaders == 2);
        rdSemB.notify(); rdSemB.notify();
        group.joinAll();
        assert(numReaders == 0);
        foreach (t; group) group.remove(t);

        // 1 writer at a time
        group.create(&writerFn); group.create(&writerFn);
        wrSemA.wait();
        assert(!wrSemA.tryWait());
        assert(numWriters == 1);
        wrSemB.notify();
        wrSemA.wait();
        assert(numWriters == 1);
        wrSemB.notify();
        group.joinAll();
        assert(numWriters == 0);
        foreach (t; group) group.remove(t);

        // reader and writer are mutually exclusive
        group.create(&readerFn);
        rdSemA.wait();
        group.create(&writerFn);
        waitQueued(0, 1);
        assert(!wrSemA.tryWait());
        assert(numReaders == 1 && numWriters == 0);
        rdSemB.notify();
        wrSemA.wait();
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);

        // writer and reader are mutually exclusive
        group.create(&writerFn);
        wrSemA.wait();
        group.create(&readerFn);
        waitQueued(1, 0);
        assert(!rdSemA.tryWait());
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        rdSemA.wait();
        assert(numReaders == 1 && numWriters == 0);
        rdSemB.notify();
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);

        // policy determines whether queued reader or writers progress first
        group.create(&writerFn);
        wrSemA.wait();
        group.create(&readerFn);
        group.create(&writerFn);
        waitQueued(1, 1);
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();

        if (policy == ReadWriteMutex.Policy.PREFER_READERS)
        {
            rdSemA.wait();
            assert(numReaders == 1 && numWriters == 0);
            rdSemB.notify();
            wrSemA.wait();
            assert(numReaders == 0 && numWriters == 1);
            wrSemB.notify();
        }
        else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
        {
            wrSemA.wait();
            assert(numReaders == 0 && numWriters == 1);
            wrSemB.notify();
            rdSemA.wait();
            assert(numReaders == 1 && numWriters == 0);
            rdSemB.notify();
        }
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);
    }
    runTest(ReadWriteMutex.Policy.PREFER_READERS);
    runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
}

unittest
{
    import core.atomic, core.thread;
    __gshared ReadWriteMutex rwmutex;
    shared static bool threadTriedOnceToGetLock;
    shared static bool threadFinallyGotLock;

    rwmutex = new ReadWriteMutex();
    atomicFence;
    const maxTimeAllowedForTest = dur!"seconds"(20);
    // Test ReadWriteMutex.Reader.tryLock(Duration).
    {
        static void testReaderTryLock()
        {
            assert(!rwmutex.reader.tryLock(Duration.min));
            threadTriedOnceToGetLock.atomicStore(true);
            assert(rwmutex.reader.tryLock(Duration.max));
            threadFinallyGotLock.atomicStore(true);
            rwmutex.reader.unlock;
        }
        assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
        auto otherThread = new Thread(&testReaderTryLock).start;
        const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
        Thread.yield;
        // We started otherThread with the writer lock held so otherThread's
        // first rwmutex.reader.tryLock with timeout Duration.min should fail.
        while (!threadTriedOnceToGetLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        rwmutex.writer.unlock;
        // Soon after we release the writer lock otherThread's second
        // rwmutex.reader.tryLock with timeout Duration.max should succeed.
        while (!threadFinallyGotLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        otherThread.join;
    }
    threadTriedOnceToGetLock.atomicStore(false); // Reset.
    threadFinallyGotLock.atomicStore(false); // Reset.
    // Test ReadWriteMutex.Writer.tryLock(Duration).
    {
        static void testWriterTryLock()
        {
            assert(!rwmutex.writer.tryLock(Duration.min));
            threadTriedOnceToGetLock.atomicStore(true);
            assert(rwmutex.writer.tryLock(Duration.max));
            threadFinallyGotLock.atomicStore(true);
            rwmutex.writer.unlock;
        }
        assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
        auto otherThread = new Thread(&testWriterTryLock).start;
        const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
        Thread.yield;
        // We started otherThread with the reader lock held so otherThread's
        // first rwmutex.writer.tryLock with timeout Duration.min should fail.
        while (!threadTriedOnceToGetLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        rwmutex.reader.unlock;
        // Soon after we release the reader lock otherThread's second
        // rwmutex.writer.tryLock with timeout Duration.max should succeed.
        while (!threadFinallyGotLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        otherThread.join;
    }
}

unittest
{
    import core.atomic, core.thread, core.sync.semaphore;

    static void runTest(ReadWriteMutex.Policy policy)
    {
        shared scope mutex = new shared ReadWriteMutex(policy);
        scope rdSemA = new Semaphore, rdSemB = new Semaphore,
              wrSemA = new Semaphore, wrSemB = new Semaphore;
        shared size_t numReaders, numWriters;

        void readerFn()
        {
            synchronized (mutex.reader)
            {
                atomicOp!"+="(numReaders, 1);
                rdSemA.notify();
                rdSemB.wait();
                atomicOp!"-="(numReaders, 1);
            }
        }

        void writerFn()
        {
            synchronized (mutex.writer)
            {
                atomicOp!"+="(numWriters, 1);
                wrSemA.notify();
                wrSemB.wait();
                atomicOp!"-="(numWriters, 1);
            }
        }

        void waitQueued(size_t queuedReaders, size_t queuedWriters)
        {
            for (;;)
            {
                synchronized (mutex.m_commonMutex)
                {
                    if (mutex.m_numQueuedReaders == queuedReaders &&
                        mutex.m_numQueuedWriters == queuedWriters)
                        break;
                }
                Thread.yield();
            }
        }

        scope group = new ThreadGroup;

        // 2 simultaneous readers
        group.create(&readerFn); group.create(&readerFn);
        rdSemA.wait(); rdSemA.wait();
        assert(numReaders == 2);
        rdSemB.notify(); rdSemB.notify();
        group.joinAll();
        assert(numReaders == 0);
        foreach (t; group) group.remove(t);

        // 1 writer at a time
        group.create(&writerFn); group.create(&writerFn);
        wrSemA.wait();
        assert(!wrSemA.tryWait());
        assert(numWriters == 1);
        wrSemB.notify();
        wrSemA.wait();
        assert(numWriters == 1);
        wrSemB.notify();
        group.joinAll();
        assert(numWriters == 0);
        foreach (t; group) group.remove(t);

        // reader and writer are mutually exclusive
        group.create(&readerFn);
        rdSemA.wait();
        group.create(&writerFn);
        waitQueued(0, 1);
        assert(!wrSemA.tryWait());
        assert(numReaders == 1 && numWriters == 0);
        rdSemB.notify();
        wrSemA.wait();
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);

        // writer and reader are mutually exclusive
        group.create(&writerFn);
        wrSemA.wait();
        group.create(&readerFn);
        waitQueued(1, 0);
        assert(!rdSemA.tryWait());
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();
        rdSemA.wait();
        assert(numReaders == 1 && numWriters == 0);
        rdSemB.notify();
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);

        // policy determines whether queued reader or writers progress first
        group.create(&writerFn);
        wrSemA.wait();
        group.create(&readerFn);
        group.create(&writerFn);
        waitQueued(1, 1);
        assert(numReaders == 0 && numWriters == 1);
        wrSemB.notify();

        if (policy == ReadWriteMutex.Policy.PREFER_READERS)
        {
            rdSemA.wait();
            assert(numReaders == 1 && numWriters == 0);
            rdSemB.notify();
            wrSemA.wait();
            assert(numReaders == 0 && numWriters == 1);
            wrSemB.notify();
        }
        else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
        {
            wrSemA.wait();
            assert(numReaders == 0 && numWriters == 1);
            wrSemB.notify();
            rdSemA.wait();
            assert(numReaders == 1 && numWriters == 0);
            rdSemB.notify();
        }
        group.joinAll();
        assert(numReaders == 0 && numWriters == 0);
        foreach (t; group) group.remove(t);
    }
    runTest(ReadWriteMutex.Policy.PREFER_READERS);
    runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
}

unittest
{
    import core.atomic, core.thread;
    shared static ReadWriteMutex rwmutex;
    shared static bool threadTriedOnceToGetLock;
    shared static bool threadFinallyGotLock;

    rwmutex = new shared ReadWriteMutex();
    atomicFence;
    const maxTimeAllowedForTest = dur!"seconds"(20);
    // Test ReadWriteMutex.Reader.tryLock(Duration).
    {
        static void testReaderTryLock()
        {
            assert(!rwmutex.reader.tryLock(Duration.min));
            threadTriedOnceToGetLock.atomicStore(true);
            assert(rwmutex.reader.tryLock(Duration.max));
            threadFinallyGotLock.atomicStore(true);
            rwmutex.reader.unlock;
        }
        assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
        auto otherThread = new Thread(&testReaderTryLock).start;
        const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
        Thread.yield;
        // We started otherThread with the writer lock held so otherThread's
        // first rwmutex.reader.tryLock with timeout Duration.min should fail.
        while (!threadTriedOnceToGetLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        rwmutex.writer.unlock;
        // Soon after we release the writer lock otherThread's second
        // rwmutex.reader.tryLock with timeout Duration.max should succeed.
        while (!threadFinallyGotLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        otherThread.join;
    }
    threadTriedOnceToGetLock.atomicStore(false); // Reset.
    threadFinallyGotLock.atomicStore(false); // Reset.
    // Test ReadWriteMutex.Writer.tryLock(Duration).
    {
        static void testWriterTryLock()
        {
            assert(!rwmutex.writer.tryLock(Duration.min));
            threadTriedOnceToGetLock.atomicStore(true);
            assert(rwmutex.writer.tryLock(Duration.max));
            threadFinallyGotLock.atomicStore(true);
            rwmutex.writer.unlock;
        }
        assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
        auto otherThread = new Thread(&testWriterTryLock).start;
        const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
        Thread.yield;
        // We started otherThread with the reader lock held so otherThread's
        // first rwmutex.writer.tryLock with timeout Duration.min should fail.
        while (!threadTriedOnceToGetLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        rwmutex.reader.unlock;
        // Soon after we release the reader lock otherThread's second
        // rwmutex.writer.tryLock with timeout Duration.max should succeed.
        while (!threadFinallyGotLock.atomicLoad)
        {
            assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
            Thread.yield;
        }
        otherThread.join;
    }
}

// Regression test: under Policy.PREFER_WRITERS a writer that times out in
// tryLock(Duration) must wake the readers that queued behind it.
unittest
{
    import core.atomic, core.thread;
    __gshared ReadWriteMutex rwmutex;
    shared static bool writerGaveUp;
    shared static bool readerGotLock;

    rwmutex = new ReadWriteMutex(ReadWriteMutex.Policy.PREFER_WRITERS);
    atomicFence;
    const maxTimeAllowedForTest = dur!"seconds"(20);

    static void timedWriter()
    {
        // The main thread keeps its read lock for the whole wait, so this
        // attempt can only end by timing out.
        assert(!rwmutex.writer.tryLock(dur!"msecs"(250)));
        writerGaveUp.atomicStore(true);
    }

    static void blockingReader()
    {
        rwmutex.reader.lock();
        readerGotLock.atomicStore(true);
        rwmutex.reader.unlock();
    }

    rwmutex.reader.lock();
    const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;

    auto writerThread = new Thread(&timedWriter).start;
    // Wait until the writer is queued.  On a very slow machine it may have
    // timed out already; the test then passes trivially.
    for (;;)
    {
        bool writerQueued;
        synchronized (rwmutex.m_commonMutex)
        {
            writerQueued = rwmutex.m_numQueuedWriters == 1;
        }
        if (writerQueued || writerGaveUp.atomicLoad)
            break;
        assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
        Thread.yield;
    }

    // This reader queues behind the waiting writer and must be released once
    // the writer gives up, even though the main thread still holds a read lock.
    auto readerThread = new Thread(&blockingReader).start;
    while (!readerGotLock.atomicLoad)
    {
        assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
        Thread.yield;
    }
    writerThread.join;
    readerThread.join;
    rwmutex.reader.unlock;
}