1 /** 2 * The condition module provides a primitive for synchronized condition 3 * checking. 4 * 5 * Copyright: Copyright Sean Kelly 2005 - 2009. 6 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 7 * Authors: Sean Kelly 8 * Source: $(DRUNTIMESRC core/sync/_condition.d) 9 */ 10 11 /* Copyright Sean Kelly 2005 - 2009. 12 * Distributed under the Boost Software License, Version 1.0. 13 * (See accompanying file LICENSE or copy at 14 * http://www.boost.org/LICENSE_1_0.txt) 15 */ 16 module core.sync.condition; 17 18 19 public import core.sync.exception; 20 public import core.sync.mutex; 21 public import core.time; 22 23 import core.exception : AssertError, staticError; 24 25 import rt.sys.config; 26 27 mixin("import " ~ osConditionImport ~ ";"); 28 29 30 //////////////////////////////////////////////////////////////////////////////// 31 // Condition 32 // 33 // void wait(); 34 // void notify(); 35 // void notifyAll(); 36 //////////////////////////////////////////////////////////////////////////////// 37 38 /** 39 * This class represents a condition variable as conceived by C.A.R. Hoare. As 40 * per Mesa type monitors however, "signal" has been replaced with "notify" to 41 * indicate that control is not transferred to the waiter when a notification 42 * is sent. 43 */ 44 class Condition 45 { 46 //////////////////////////////////////////////////////////////////////////// 47 // Initialization 48 //////////////////////////////////////////////////////////////////////////// 49 50 /** 51 * Initializes a condition object which is associated with the supplied 52 * mutex object. 53 * 54 * Params: 55 * m = The mutex with which this condition will be associated. 56 * 57 * Throws: 58 * SyncError on error. 59 */ 60 this( Mutex m ) nothrow @safe @nogc 61 { 62 this(m, true); 63 } 64 65 /// ditto 66 this( shared Mutex m ) shared nothrow @safe @nogc 67 { 68 import core.atomic : atomicLoad; 69 this(atomicLoad(m), true); 70 } 71 72 // 73 private this(this Q, M)( M m, bool _unused_ ) nothrow @trusted @nogc 74 if ((is(Q == Condition) && is(M == Mutex)) || 75 (is(Q == shared Condition) && is(M == shared Mutex))) 76 { 77 (cast(OsCondition)osCondition).create(cast(Mutex)m); 78 } 79 80 ~this() @nogc 81 { 82 (cast(OsCondition)osCondition).destroy(); 83 } 84 85 86 //////////////////////////////////////////////////////////////////////////// 87 // General Properties 88 //////////////////////////////////////////////////////////////////////////// 89 90 91 /** 92 * Gets the mutex associated with this condition. 93 * 94 * Returns: 95 * The mutex associated with this condition. 96 */ 97 @property Mutex mutex() 98 { 99 return osCondition.mutex(); 100 } 101 102 /// ditto 103 @property shared(Mutex) mutex() shared 104 { 105 return osCondition.mutex(); 106 } 107 108 // undocumented function for internal use 109 final @property Mutex mutex_nothrow() pure nothrow @safe @nogc 110 { 111 return osCondition.mutex_nothrow(); 112 } 113 114 // ditto 115 final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc 116 { 117 return osCondition.mutex_nothrow(); 118 } 119 120 //////////////////////////////////////////////////////////////////////////// 121 // General Actions 122 //////////////////////////////////////////////////////////////////////////// 123 124 125 /** 126 * Wait until notified. 127 * 128 * Throws: 129 * SyncError on error. 130 */ 131 void wait() 132 { 133 wait!(typeof(this))(true); 134 } 135 136 /// ditto 137 void wait() shared 138 { 139 wait!(typeof(this))(true); 140 } 141 142 /// ditto 143 void wait(this Q)( bool _unused_ ) 144 if (is(Q == Condition) || is(Q == shared Condition)) 145 { 146 (cast(OsCondition)osCondition).wait(); 147 } 148 149 /** 150 * Suspends the calling thread until a notification occurs or until the 151 * supplied time period has elapsed. 152 * 153 * Params: 154 * val = The time to wait. 155 * 156 * In: 157 * val must be non-negative. 158 * 159 * Throws: 160 * SyncError on error. 161 * 162 * Returns: 163 * true if notified before the timeout and false if not. 164 */ 165 bool wait( Duration val ) 166 { 167 return wait!(typeof(this))(val, true); 168 } 169 170 /// ditto 171 bool wait( Duration val ) shared 172 { 173 return wait!(typeof(this))(val, true); 174 } 175 176 /// ditto 177 bool wait(this Q)( Duration val, bool _unused_ ) 178 if (is(Q == Condition) || is(Q == shared Condition)) 179 in 180 { 181 assert( !val.isNegative ); 182 } 183 do 184 { 185 return (cast(OsCondition)osCondition).wait(val); 186 } 187 188 /** 189 * Notifies one waiter. 190 * 191 * Throws: 192 * SyncError on error. 193 */ 194 void notify() 195 { 196 notify!(typeof(this))(true); 197 } 198 199 /// ditto 200 void notify() shared 201 { 202 notify!(typeof(this))(true); 203 } 204 205 /// ditto 206 void notify(this Q)( bool _unused_ ) 207 if (is(Q == Condition) || is(Q == shared Condition)) 208 { 209 (cast(OsCondition)osCondition).notify(); 210 } 211 212 /** 213 * Notifies all waiters. 214 * 215 * Throws: 216 * SyncError on error. 217 */ 218 void notifyAll() 219 { 220 notifyAll!(typeof(this))(true); 221 } 222 223 /// ditto 224 void notifyAll() shared 225 { 226 notifyAll!(typeof(this))(true); 227 } 228 229 /// ditto 230 void notifyAll(this Q)( bool _unused_ ) 231 if (is(Q == Condition) || is(Q == shared Condition)) 232 { 233 (cast(OsCondition)osCondition).notifyAll(); 234 } 235 236 private: 237 238 OsCondition osCondition; 239 } 240 241 242 //////////////////////////////////////////////////////////////////////////////// 243 // Unit Tests 244 //////////////////////////////////////////////////////////////////////////////// 245 246 unittest 247 { 248 import core.thread; 249 import core.sync.mutex; 250 import core.sync.semaphore; 251 252 253 void testNotify() 254 { 255 auto mutex = new Mutex; 256 auto condReady = new Condition( mutex ); 257 auto semDone = new Semaphore; 258 auto synLoop = new Object; 259 int numWaiters = 10; 260 int numTries = 10; 261 int numReady = 0; 262 int numTotal = 0; 263 int numDone = 0; 264 int numPost = 0; 265 266 void waiter() 267 { 268 for ( int i = 0; i < numTries; ++i ) 269 { 270 synchronized( mutex ) 271 { 272 while ( numReady < 1 ) 273 { 274 condReady.wait(); 275 } 276 --numReady; 277 ++numTotal; 278 } 279 280 synchronized( synLoop ) 281 { 282 ++numDone; 283 } 284 semDone.wait(); 285 } 286 } 287 288 auto group = new ThreadGroup; 289 290 for ( int i = 0; i < numWaiters; ++i ) 291 group.create( &waiter ); 292 293 for ( int i = 0; i < numTries; ++i ) 294 { 295 for ( int j = 0; j < numWaiters; ++j ) 296 { 297 synchronized( mutex ) 298 { 299 ++numReady; 300 condReady.notify(); 301 } 302 } 303 while ( true ) 304 { 305 synchronized( synLoop ) 306 { 307 if ( numDone >= numWaiters ) 308 break; 309 } 310 Thread.yield(); 311 } 312 for ( int j = 0; j < numWaiters; ++j ) 313 { 314 semDone.notify(); 315 } 316 } 317 318 group.joinAll(); 319 assert( numTotal == numWaiters * numTries ); 320 } 321 322 323 void testNotifyAll() 324 { 325 auto mutex = new Mutex; 326 auto condReady = new Condition( mutex ); 327 int numWaiters = 10; 328 int numReady = 0; 329 int numDone = 0; 330 bool alert = false; 331 332 void waiter() 333 { 334 synchronized( mutex ) 335 { 336 ++numReady; 337 while ( !alert ) 338 condReady.wait(); 339 ++numDone; 340 } 341 } 342 343 auto group = new ThreadGroup; 344 345 for ( int i = 0; i < numWaiters; ++i ) 346 group.create( &waiter ); 347 348 while ( true ) 349 { 350 synchronized( mutex ) 351 { 352 if ( numReady >= numWaiters ) 353 { 354 alert = true; 355 condReady.notifyAll(); 356 break; 357 } 358 } 359 Thread.yield(); 360 } 361 group.joinAll(); 362 assert( numReady == numWaiters && numDone == numWaiters ); 363 } 364 365 366 void testWaitTimeout() 367 { 368 auto mutex = new Mutex; 369 auto condReady = new Condition( mutex ); 370 bool waiting = false; 371 bool alertedOne = true; 372 bool alertedTwo = true; 373 374 void waiter() 375 { 376 synchronized( mutex ) 377 { 378 waiting = true; 379 // we never want to miss the notification (30s) 380 alertedOne = condReady.wait( dur!"seconds"(30) ); 381 // but we don't want to wait long for the timeout (10ms) 382 alertedTwo = condReady.wait( dur!"msecs"(10) ); 383 } 384 } 385 386 auto thread = new Thread( &waiter ); 387 thread.start(); 388 389 while ( true ) 390 { 391 synchronized( mutex ) 392 { 393 if ( waiting ) 394 { 395 condReady.notify(); 396 break; 397 } 398 } 399 Thread.yield(); 400 } 401 thread.join(); 402 assert( waiting ); 403 assert( alertedOne ); 404 assert( !alertedTwo ); 405 } 406 407 testNotify(); 408 testNotifyAll(); 409 testWaitTimeout(); 410 } 411 412 unittest 413 { 414 import core.thread; 415 import core.sync.mutex; 416 import core.sync.semaphore; 417 418 419 void testNotify() 420 { 421 auto mutex = new shared Mutex; 422 auto condReady = new shared Condition( mutex ); 423 auto semDone = new Semaphore; 424 auto synLoop = new Object; 425 int numWaiters = 10; 426 int numTries = 10; 427 int numReady = 0; 428 int numTotal = 0; 429 int numDone = 0; 430 int numPost = 0; 431 432 void waiter() 433 { 434 for ( int i = 0; i < numTries; ++i ) 435 { 436 synchronized( mutex ) 437 { 438 while ( numReady < 1 ) 439 { 440 condReady.wait(); 441 } 442 --numReady; 443 ++numTotal; 444 } 445 446 synchronized( synLoop ) 447 { 448 ++numDone; 449 } 450 semDone.wait(); 451 } 452 } 453 454 auto group = new ThreadGroup; 455 456 for ( int i = 0; i < numWaiters; ++i ) 457 group.create( &waiter ); 458 459 for ( int i = 0; i < numTries; ++i ) 460 { 461 for ( int j = 0; j < numWaiters; ++j ) 462 { 463 synchronized( mutex ) 464 { 465 ++numReady; 466 condReady.notify(); 467 } 468 } 469 while ( true ) 470 { 471 synchronized( synLoop ) 472 { 473 if ( numDone >= numWaiters ) 474 break; 475 } 476 Thread.yield(); 477 } 478 for ( int j = 0; j < numWaiters; ++j ) 479 { 480 semDone.notify(); 481 } 482 } 483 484 group.joinAll(); 485 assert( numTotal == numWaiters * numTries ); 486 } 487 488 489 void testNotifyAll() 490 { 491 auto mutex = new shared Mutex; 492 auto condReady = new shared Condition( mutex ); 493 int numWaiters = 10; 494 int numReady = 0; 495 int numDone = 0; 496 bool alert = false; 497 498 void waiter() 499 { 500 synchronized( mutex ) 501 { 502 ++numReady; 503 while ( !alert ) 504 condReady.wait(); 505 ++numDone; 506 } 507 } 508 509 auto group = new ThreadGroup; 510 511 for ( int i = 0; i < numWaiters; ++i ) 512 group.create( &waiter ); 513 514 while ( true ) 515 { 516 synchronized( mutex ) 517 { 518 if ( numReady >= numWaiters ) 519 { 520 alert = true; 521 condReady.notifyAll(); 522 break; 523 } 524 } 525 Thread.yield(); 526 } 527 group.joinAll(); 528 assert( numReady == numWaiters && numDone == numWaiters ); 529 } 530 531 532 void testWaitTimeout() 533 { 534 auto mutex = new shared Mutex; 535 auto condReady = new shared Condition( mutex ); 536 bool waiting = false; 537 bool alertedOne = true; 538 bool alertedTwo = true; 539 540 void waiter() 541 { 542 synchronized( mutex ) 543 { 544 waiting = true; 545 // we never want to miss the notification (30s) 546 alertedOne = condReady.wait( dur!"seconds"(30) ); 547 // but we don't want to wait long for the timeout (10ms) 548 alertedTwo = condReady.wait( dur!"msecs"(10) ); 549 } 550 } 551 552 auto thread = new Thread( &waiter ); 553 thread.start(); 554 555 while ( true ) 556 { 557 synchronized( mutex ) 558 { 559 if ( waiting ) 560 { 561 condReady.notify(); 562 break; 563 } 564 } 565 Thread.yield(); 566 } 567 thread.join(); 568 assert( waiting ); 569 assert( alertedOne ); 570 assert( !alertedTwo ); 571 } 572 573 testNotify(); 574 testNotifyAll(); 575 testWaitTimeout(); 576 }