1 /** 2 * The condition module provides a primitive for synchronized condition 3 * checking. 4 * 5 * Copyright: Copyright Sean Kelly 2005 - 2009. 6 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 7 * Authors: Sean Kelly 8 * Source: $(DRUNTIMESRC core/sync/_condition.d) 9 */ 10 11 /* Copyright Sean Kelly 2005 - 2009. 12 * Distributed under the Boost Software License, Version 1.0. 13 * (See accompanying file LICENSE or copy at 14 * http://www.boost.org/LICENSE_1_0.txt) 15 */ 16 module core.sync.condition; 17 18 19 public import core.sync.exception; 20 public import core.sync.mutex; 21 public import core.time; 22 23 import core.exception : AssertError, staticError; 24 25 26 version (Windows) 27 { 28 import core.sync.semaphore; 29 import core.sys.windows.basetsd /+: HANDLE+/; 30 import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION, 31 DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection, 32 LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/; 33 import core.sys.windows.windef /+: BOOL, DWORD+/; 34 import core.sys.windows.winerror /+: WAIT_TIMEOUT+/; 35 } 36 else version (Posix) 37 { 38 import core.sync.config; 39 import core.stdc.errno; 40 import core.sys.posix.pthread; 41 import core.sys.posix.time; 42 } 43 else version (FreeStanding) 44 { 45 } 46 else 47 { 48 static assert(false, "Platform not supported"); 49 } 50 51 52 //////////////////////////////////////////////////////////////////////////////// 53 // Condition 54 // 55 // void wait(); 56 // void notify(); 57 // void notifyAll(); 58 //////////////////////////////////////////////////////////////////////////////// 59 60 /** 61 * This class represents a condition variable as conceived by C.A.R. Hoare. As 62 * per Mesa type monitors however, "signal" has been replaced with "notify" to 63 * indicate that control is not transferred to the waiter when a notification 64 * is sent. 65 */ 66 class Condition 67 { 68 //////////////////////////////////////////////////////////////////////////// 69 // Initialization 70 //////////////////////////////////////////////////////////////////////////// 71 72 /** 73 * Initializes a condition object which is associated with the supplied 74 * mutex object. 75 * 76 * Params: 77 * m = The mutex with which this condition will be associated. 78 * 79 * Throws: 80 * SyncError on error. 81 */ 82 this( Mutex m ) nothrow @safe @nogc 83 { 84 this(m, true); 85 } 86 87 /// ditto 88 this( shared Mutex m ) shared nothrow @safe @nogc 89 { 90 import core.atomic : atomicLoad; 91 this(atomicLoad(m), true); 92 } 93 94 // 95 private this(this Q, M)( M m, bool _unused_ ) nothrow @trusted @nogc 96 if ((is(Q == Condition) && is(M == Mutex)) || 97 (is(Q == shared Condition) && is(M == shared Mutex))) 98 { 99 version (Windows) 100 { 101 static if (is(Q == Condition)) 102 { 103 alias HANDLE_TYPE = void*; 104 } 105 else 106 { 107 alias HANDLE_TYPE = shared(void*); 108 } 109 m_blockLock = cast(HANDLE_TYPE) CreateSemaphoreA( null, 1, 1, null ); 110 if ( m_blockLock == m_blockLock.init ) 111 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 112 scope(failure) CloseHandle( cast(void*) m_blockLock ); 113 114 m_blockQueue = cast(HANDLE_TYPE) CreateSemaphoreA( null, 0, int.max, null ); 115 if ( m_blockQueue == m_blockQueue.init ) 116 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 117 scope(failure) CloseHandle( cast(void*) m_blockQueue ); 118 119 InitializeCriticalSection( cast(RTL_CRITICAL_SECTION*) &m_unblockLock ); 120 m_assocMutex = m; 121 } 122 else version (Posix) 123 { 124 static if (is(Q == shared)) 125 { 126 import core.atomic : atomicLoad; 127 m_assocMutex = atomicLoad(m); 128 } 129 else 130 { 131 m_assocMutex = m; 132 } 133 static if ( is( typeof( pthread_condattr_setclock ) ) ) 134 { 135 () @trusted 136 { 137 pthread_condattr_t attr = void; 138 int rc = pthread_condattr_init( &attr ); 139 if ( rc ) 140 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 141 rc = pthread_condattr_setclock( &attr, CLOCK_MONOTONIC ); 142 if ( rc ) 143 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 144 rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, &attr ); 145 if ( rc ) 146 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 147 rc = pthread_condattr_destroy( &attr ); 148 if ( rc ) 149 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 150 } (); 151 } 152 else 153 { 154 int rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, null ); 155 if ( rc ) 156 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 157 } 158 } 159 } 160 161 ~this() @nogc 162 { 163 version (Windows) 164 { 165 BOOL rc = CloseHandle( m_blockLock ); 166 assert( rc, "Unable to destroy condition" ); 167 rc = CloseHandle( m_blockQueue ); 168 assert( rc, "Unable to destroy condition" ); 169 DeleteCriticalSection( &m_unblockLock ); 170 } 171 else version (Posix) 172 { 173 int rc = pthread_cond_destroy( &m_hndl ); 174 assert( !rc, "Unable to destroy condition" ); 175 } 176 } 177 178 179 //////////////////////////////////////////////////////////////////////////// 180 // General Properties 181 //////////////////////////////////////////////////////////////////////////// 182 183 184 /** 185 * Gets the mutex associated with this condition. 186 * 187 * Returns: 188 * The mutex associated with this condition. 189 */ 190 @property Mutex mutex() 191 { 192 return m_assocMutex; 193 } 194 195 /// ditto 196 @property shared(Mutex) mutex() shared 197 { 198 import core.atomic : atomicLoad; 199 return atomicLoad(m_assocMutex); 200 } 201 202 // undocumented function for internal use 203 final @property Mutex mutex_nothrow() pure nothrow @safe @nogc 204 { 205 return m_assocMutex; 206 } 207 208 // ditto 209 final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc 210 { 211 import core.atomic : atomicLoad; 212 return atomicLoad(m_assocMutex); 213 } 214 215 //////////////////////////////////////////////////////////////////////////// 216 // General Actions 217 //////////////////////////////////////////////////////////////////////////// 218 219 220 /** 221 * Wait until notified. 222 * 223 * Throws: 224 * SyncError on error. 225 */ 226 void wait() 227 { 228 wait!(typeof(this))(true); 229 } 230 231 /// ditto 232 void wait() shared 233 { 234 wait!(typeof(this))(true); 235 } 236 237 /// ditto 238 void wait(this Q)( bool _unused_ ) 239 if (is(Q == Condition) || is(Q == shared Condition)) 240 { 241 version (Windows) 242 { 243 timedWait( INFINITE ); 244 } 245 else version (Posix) 246 { 247 int rc = pthread_cond_wait( cast(pthread_cond_t*) &m_hndl, (cast(Mutex) m_assocMutex).handleAddr() ); 248 if ( rc ) 249 throw staticError!AssertError("Unable to wait for condition", __FILE__, __LINE__); 250 } 251 } 252 253 /** 254 * Suspends the calling thread until a notification occurs or until the 255 * supplied time period has elapsed. 256 * 257 * Params: 258 * val = The time to wait. 259 * 260 * In: 261 * val must be non-negative. 262 * 263 * Throws: 264 * SyncError on error. 265 * 266 * Returns: 267 * true if notified before the timeout and false if not. 268 */ 269 bool wait( Duration val ) 270 { 271 return wait!(typeof(this))(val, true); 272 } 273 274 /// ditto 275 bool wait( Duration val ) shared 276 { 277 return wait!(typeof(this))(val, true); 278 } 279 280 /// ditto 281 bool wait(this Q)( Duration val, bool _unused_ ) 282 if (is(Q == Condition) || is(Q == shared Condition)) 283 in 284 { 285 assert( !val.isNegative ); 286 } 287 do 288 { 289 version (Windows) 290 { 291 auto maxWaitMillis = dur!("msecs")( uint.max - 1 ); 292 293 while ( val > maxWaitMillis ) 294 { 295 if ( timedWait( cast(uint) 296 maxWaitMillis.total!"msecs" ) ) 297 return true; 298 val -= maxWaitMillis; 299 } 300 return timedWait( cast(uint) val.total!"msecs" ); 301 } 302 else version (Posix) 303 { 304 timespec t = void; 305 mktspec( t, val ); 306 307 int rc = pthread_cond_timedwait( cast(pthread_cond_t*) &m_hndl, 308 (cast(Mutex) m_assocMutex).handleAddr(), 309 &t ); 310 if ( !rc ) 311 return true; 312 if ( rc == ETIMEDOUT ) 313 return false; 314 throw staticError!AssertError("Unable to wait for condition", __FILE__, __LINE__); 315 } 316 else version (FreeStanding) assert(0); 317 } 318 319 /** 320 * Notifies one waiter. 321 * 322 * Throws: 323 * SyncError on error. 324 */ 325 void notify() 326 { 327 notify!(typeof(this))(true); 328 } 329 330 /// ditto 331 void notify() shared 332 { 333 notify!(typeof(this))(true); 334 } 335 336 /// ditto 337 void notify(this Q)( bool _unused_ ) 338 if (is(Q == Condition) || is(Q == shared Condition)) 339 { 340 version (Windows) 341 { 342 notify_( false ); 343 } 344 else version (Posix) 345 { 346 // Since OS X 10.7 (Lion), pthread_cond_signal returns EAGAIN after retrying 8192 times, 347 // so need to retrying while it returns EAGAIN. 348 // 349 // 10.7.0 (Lion): http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c 350 // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c 351 // 10.10.0 (Yosemite): http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c 352 // 10.11.0 (El Capitan): http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c 353 // 10.12.0 (Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c 354 // 10.13.0 (High Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c 355 // 10.14.0 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c 356 // 10.14.1 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c 357 358 int rc; 359 do { 360 rc = pthread_cond_signal( cast(pthread_cond_t*) &m_hndl ); 361 } while ( rc == EAGAIN ); 362 if ( rc ) 363 throw staticError!AssertError("Unable to notify condition", __FILE__, __LINE__); 364 } 365 } 366 367 /** 368 * Notifies all waiters. 369 * 370 * Throws: 371 * SyncError on error. 372 */ 373 void notifyAll() 374 { 375 notifyAll!(typeof(this))(true); 376 } 377 378 /// ditto 379 void notifyAll() shared 380 { 381 notifyAll!(typeof(this))(true); 382 } 383 384 /// ditto 385 void notifyAll(this Q)( bool _unused_ ) 386 if (is(Q == Condition) || is(Q == shared Condition)) 387 { 388 version (Windows) 389 { 390 notify_( true ); 391 } 392 else version (Posix) 393 { 394 // Since OS X 10.7 (Lion), pthread_cond_broadcast returns EAGAIN after retrying 8192 times, 395 // so need to retrying while it returns EAGAIN. 396 // 397 // 10.7.0 (Lion): http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c 398 // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c 399 // 10.10.0 (Yosemite): http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c 400 // 10.11.0 (El Capitan): http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c 401 // 10.12.0 (Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c 402 // 10.13.0 (High Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c 403 // 10.14.0 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c 404 // 10.14.1 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c 405 406 int rc; 407 do { 408 rc = pthread_cond_broadcast( cast(pthread_cond_t*) &m_hndl ); 409 } while ( rc == EAGAIN ); 410 if ( rc ) 411 throw staticError!AssertError("Unable to notify condition", __FILE__, __LINE__); 412 } 413 } 414 415 private: 416 version (Windows) 417 { 418 bool timedWait(this Q)( DWORD timeout ) 419 if (is(Q == Condition) || is(Q == shared Condition)) 420 { 421 static if (is(Q == Condition)) 422 { 423 auto op(string o, T, V1)(ref T val, V1 mod) 424 { 425 return mixin("val " ~ o ~ "mod"); 426 } 427 } 428 else 429 { 430 auto op(string o, T, V1)(ref shared T val, V1 mod) 431 { 432 import core.atomic: atomicOp; 433 return atomicOp!o(val, mod); 434 } 435 } 436 437 int numSignalsLeft; 438 int numWaitersGone; 439 DWORD rc; 440 441 rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE ); 442 assert( rc == WAIT_OBJECT_0 ); 443 444 op!"+="(m_numWaitersBlocked, 1); 445 446 rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null ); 447 assert( rc ); 448 449 m_assocMutex.unlock(); 450 scope(failure) m_assocMutex.lock(); 451 452 rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, timeout ); 453 assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT ); 454 bool timedOut = (rc == WAIT_TIMEOUT); 455 456 EnterCriticalSection( &m_unblockLock ); 457 scope(failure) LeaveCriticalSection( &m_unblockLock ); 458 459 if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 ) 460 { 461 if ( timedOut ) 462 { 463 // timeout (or canceled) 464 if ( m_numWaitersBlocked != 0 ) 465 { 466 op!"-="(m_numWaitersBlocked, 1); 467 // do not unblock next waiter below (already unblocked) 468 numSignalsLeft = 0; 469 } 470 else 471 { 472 // spurious wakeup pending!! 473 m_numWaitersGone = 1; 474 } 475 } 476 if ( op!"-="(m_numWaitersToUnblock, 1) == 0 ) 477 { 478 if ( m_numWaitersBlocked != 0 ) 479 { 480 // open the gate 481 rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null ); 482 assert( rc ); 483 // do not open the gate below again 484 numSignalsLeft = 0; 485 } 486 else if ( (numWaitersGone = m_numWaitersGone) != 0 ) 487 { 488 m_numWaitersGone = 0; 489 } 490 } 491 } 492 else if ( op!"+="(m_numWaitersGone, 1) == int.max / 2 ) 493 { 494 // timeout/canceled or spurious event :-) 495 rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE ); 496 assert( rc == WAIT_OBJECT_0 ); 497 // something is going on here - test of timeouts? 498 op!"-="(m_numWaitersBlocked, m_numWaitersGone); 499 rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null ); 500 assert( rc == WAIT_OBJECT_0 ); 501 m_numWaitersGone = 0; 502 } 503 504 LeaveCriticalSection( &m_unblockLock ); 505 506 if ( numSignalsLeft == 1 ) 507 { 508 // better now than spurious later (same as ResetEvent) 509 for ( ; numWaitersGone > 0; --numWaitersGone ) 510 { 511 rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, INFINITE ); 512 assert( rc == WAIT_OBJECT_0 ); 513 } 514 // open the gate 515 rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null ); 516 assert( rc ); 517 } 518 else if ( numSignalsLeft != 0 ) 519 { 520 // unblock next waiter 521 rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null ); 522 assert( rc ); 523 } 524 m_assocMutex.lock(); 525 return !timedOut; 526 } 527 528 529 void notify_(this Q)( bool all ) 530 if (is(Q == Condition) || is(Q == shared Condition)) 531 { 532 static if (is(Q == Condition)) 533 { 534 auto op(string o, T, V1)(ref T val, V1 mod) 535 { 536 return mixin("val " ~ o ~ "mod"); 537 } 538 } 539 else 540 { 541 auto op(string o, T, V1)(ref shared T val, V1 mod) 542 { 543 import core.atomic: atomicOp; 544 return atomicOp!o(val, mod); 545 } 546 } 547 548 DWORD rc; 549 550 EnterCriticalSection( &m_unblockLock ); 551 scope(failure) LeaveCriticalSection( &m_unblockLock ); 552 553 if ( m_numWaitersToUnblock != 0 ) 554 { 555 if ( m_numWaitersBlocked == 0 ) 556 { 557 LeaveCriticalSection( &m_unblockLock ); 558 return; 559 } 560 if ( all ) 561 { 562 op!"+="(m_numWaitersToUnblock, m_numWaitersBlocked); 563 m_numWaitersBlocked = 0; 564 } 565 else 566 { 567 op!"+="(m_numWaitersToUnblock, 1); 568 op!"-="(m_numWaitersBlocked, 1); 569 } 570 LeaveCriticalSection( &m_unblockLock ); 571 } 572 else if ( m_numWaitersBlocked > m_numWaitersGone ) 573 { 574 rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE ); 575 assert( rc == WAIT_OBJECT_0 ); 576 if ( 0 != m_numWaitersGone ) 577 { 578 op!"-="(m_numWaitersBlocked, m_numWaitersGone); 579 m_numWaitersGone = 0; 580 } 581 if ( all ) 582 { 583 m_numWaitersToUnblock = m_numWaitersBlocked; 584 m_numWaitersBlocked = 0; 585 } 586 else 587 { 588 m_numWaitersToUnblock = 1; 589 op!"-="(m_numWaitersBlocked, 1); 590 } 591 LeaveCriticalSection( &m_unblockLock ); 592 rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null ); 593 assert( rc ); 594 } 595 else 596 { 597 LeaveCriticalSection( &m_unblockLock ); 598 } 599 } 600 601 602 // NOTE: This implementation uses Algorithm 8c as described here: 603 // http://groups.google.com/group/comp.programming.threads/ 604 // browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a 605 HANDLE m_blockLock; // auto-reset event (now semaphore) 606 HANDLE m_blockQueue; // auto-reset event (now semaphore) 607 Mutex m_assocMutex; // external mutex/CS 608 CRITICAL_SECTION m_unblockLock; // internal mutex/CS 609 int m_numWaitersGone = 0; 610 int m_numWaitersBlocked = 0; 611 int m_numWaitersToUnblock = 0; 612 } 613 else version (Posix) 614 { 615 Mutex m_assocMutex; 616 pthread_cond_t m_hndl; 617 } 618 else version (FreeStanding) 619 { 620 Mutex m_assocMutex; 621 } 622 } 623 624 625 //////////////////////////////////////////////////////////////////////////////// 626 // Unit Tests 627 //////////////////////////////////////////////////////////////////////////////// 628 629 unittest 630 { 631 import core.thread; 632 import core.sync.mutex; 633 import core.sync.semaphore; 634 635 636 void testNotify() 637 { 638 auto mutex = new Mutex; 639 auto condReady = new Condition( mutex ); 640 auto semDone = new Semaphore; 641 auto synLoop = new Object; 642 int numWaiters = 10; 643 int numTries = 10; 644 int numReady = 0; 645 int numTotal = 0; 646 int numDone = 0; 647 int numPost = 0; 648 649 void waiter() 650 { 651 for ( int i = 0; i < numTries; ++i ) 652 { 653 synchronized( mutex ) 654 { 655 while ( numReady < 1 ) 656 { 657 condReady.wait(); 658 } 659 --numReady; 660 ++numTotal; 661 } 662 663 synchronized( synLoop ) 664 { 665 ++numDone; 666 } 667 semDone.wait(); 668 } 669 } 670 671 auto group = new ThreadGroup; 672 673 for ( int i = 0; i < numWaiters; ++i ) 674 group.create( &waiter ); 675 676 for ( int i = 0; i < numTries; ++i ) 677 { 678 for ( int j = 0; j < numWaiters; ++j ) 679 { 680 synchronized( mutex ) 681 { 682 ++numReady; 683 condReady.notify(); 684 } 685 } 686 while ( true ) 687 { 688 synchronized( synLoop ) 689 { 690 if ( numDone >= numWaiters ) 691 break; 692 } 693 Thread.yield(); 694 } 695 for ( int j = 0; j < numWaiters; ++j ) 696 { 697 semDone.notify(); 698 } 699 } 700 701 group.joinAll(); 702 assert( numTotal == numWaiters * numTries ); 703 } 704 705 706 void testNotifyAll() 707 { 708 auto mutex = new Mutex; 709 auto condReady = new Condition( mutex ); 710 int numWaiters = 10; 711 int numReady = 0; 712 int numDone = 0; 713 bool alert = false; 714 715 void waiter() 716 { 717 synchronized( mutex ) 718 { 719 ++numReady; 720 while ( !alert ) 721 condReady.wait(); 722 ++numDone; 723 } 724 } 725 726 auto group = new ThreadGroup; 727 728 for ( int i = 0; i < numWaiters; ++i ) 729 group.create( &waiter ); 730 731 while ( true ) 732 { 733 synchronized( mutex ) 734 { 735 if ( numReady >= numWaiters ) 736 { 737 alert = true; 738 condReady.notifyAll(); 739 break; 740 } 741 } 742 Thread.yield(); 743 } 744 group.joinAll(); 745 assert( numReady == numWaiters && numDone == numWaiters ); 746 } 747 748 749 void testWaitTimeout() 750 { 751 auto mutex = new Mutex; 752 auto condReady = new Condition( mutex ); 753 bool waiting = false; 754 bool alertedOne = true; 755 bool alertedTwo = true; 756 757 void waiter() 758 { 759 synchronized( mutex ) 760 { 761 waiting = true; 762 // we never want to miss the notification (30s) 763 alertedOne = condReady.wait( dur!"seconds"(30) ); 764 // but we don't want to wait long for the timeout (10ms) 765 alertedTwo = condReady.wait( dur!"msecs"(10) ); 766 } 767 } 768 769 auto thread = new Thread( &waiter ); 770 thread.start(); 771 772 while ( true ) 773 { 774 synchronized( mutex ) 775 { 776 if ( waiting ) 777 { 778 condReady.notify(); 779 break; 780 } 781 } 782 Thread.yield(); 783 } 784 thread.join(); 785 assert( waiting ); 786 assert( alertedOne ); 787 assert( !alertedTwo ); 788 } 789 790 testNotify(); 791 testNotifyAll(); 792 testWaitTimeout(); 793 } 794 795 unittest 796 { 797 import core.thread; 798 import core.sync.mutex; 799 import core.sync.semaphore; 800 801 802 void testNotify() 803 { 804 auto mutex = new shared Mutex; 805 auto condReady = new shared Condition( mutex ); 806 auto semDone = new Semaphore; 807 auto synLoop = new Object; 808 int numWaiters = 10; 809 int numTries = 10; 810 int numReady = 0; 811 int numTotal = 0; 812 int numDone = 0; 813 int numPost = 0; 814 815 void waiter() 816 { 817 for ( int i = 0; i < numTries; ++i ) 818 { 819 synchronized( mutex ) 820 { 821 while ( numReady < 1 ) 822 { 823 condReady.wait(); 824 } 825 --numReady; 826 ++numTotal; 827 } 828 829 synchronized( synLoop ) 830 { 831 ++numDone; 832 } 833 semDone.wait(); 834 } 835 } 836 837 auto group = new ThreadGroup; 838 839 for ( int i = 0; i < numWaiters; ++i ) 840 group.create( &waiter ); 841 842 for ( int i = 0; i < numTries; ++i ) 843 { 844 for ( int j = 0; j < numWaiters; ++j ) 845 { 846 synchronized( mutex ) 847 { 848 ++numReady; 849 condReady.notify(); 850 } 851 } 852 while ( true ) 853 { 854 synchronized( synLoop ) 855 { 856 if ( numDone >= numWaiters ) 857 break; 858 } 859 Thread.yield(); 860 } 861 for ( int j = 0; j < numWaiters; ++j ) 862 { 863 semDone.notify(); 864 } 865 } 866 867 group.joinAll(); 868 assert( numTotal == numWaiters * numTries ); 869 } 870 871 872 void testNotifyAll() 873 { 874 auto mutex = new shared Mutex; 875 auto condReady = new shared Condition( mutex ); 876 int numWaiters = 10; 877 int numReady = 0; 878 int numDone = 0; 879 bool alert = false; 880 881 void waiter() 882 { 883 synchronized( mutex ) 884 { 885 ++numReady; 886 while ( !alert ) 887 condReady.wait(); 888 ++numDone; 889 } 890 } 891 892 auto group = new ThreadGroup; 893 894 for ( int i = 0; i < numWaiters; ++i ) 895 group.create( &waiter ); 896 897 while ( true ) 898 { 899 synchronized( mutex ) 900 { 901 if ( numReady >= numWaiters ) 902 { 903 alert = true; 904 condReady.notifyAll(); 905 break; 906 } 907 } 908 Thread.yield(); 909 } 910 group.joinAll(); 911 assert( numReady == numWaiters && numDone == numWaiters ); 912 } 913 914 915 void testWaitTimeout() 916 { 917 auto mutex = new shared Mutex; 918 auto condReady = new shared Condition( mutex ); 919 bool waiting = false; 920 bool alertedOne = true; 921 bool alertedTwo = true; 922 923 void waiter() 924 { 925 synchronized( mutex ) 926 { 927 waiting = true; 928 // we never want to miss the notification (30s) 929 alertedOne = condReady.wait( dur!"seconds"(30) ); 930 // but we don't want to wait long for the timeout (10ms) 931 alertedTwo = condReady.wait( dur!"msecs"(10) ); 932 } 933 } 934 935 auto thread = new Thread( &waiter ); 936 thread.start(); 937 938 while ( true ) 939 { 940 synchronized( mutex ) 941 { 942 if ( waiting ) 943 { 944 condReady.notify(); 945 break; 946 } 947 } 948 Thread.yield(); 949 } 950 thread.join(); 951 assert( waiting ); 952 assert( alertedOne ); 953 assert( !alertedTwo ); 954 } 955 956 testNotify(); 957 testNotifyAll(); 958 testWaitTimeout(); 959 }