1 /** 2 * $(SCRIPT inhibitQuickIndex = 1;) 3 * $(DIVC quickindex, 4 * $(BOOKTABLE, 5 * $(TR $(TH Category) $(TH Symbols)) 6 * $(TR $(TD Tid) $(TD 7 * $(MYREF locate) 8 * $(MYREF ownerTid) 9 * $(MYREF register) 10 * $(MYREF spawn) 11 * $(MYREF spawnLinked) 12 * $(MYREF thisTid) 13 * $(MYREF Tid) 14 * $(MYREF TidMissingException) 15 * $(MYREF unregister) 16 * )) 17 * $(TR $(TD Message passing) $(TD 18 * $(MYREF prioritySend) 19 * $(MYREF receive) 20 * $(MYREF receiveOnly) 21 * $(MYREF receiveTimeout) 22 * $(MYREF send) 23 * $(MYREF setMaxMailboxSize) 24 * )) 25 * $(TR $(TD Message-related types) $(TD 26 * $(MYREF LinkTerminated) 27 * $(MYREF MailboxFull) 28 * $(MYREF MessageMismatch) 29 * $(MYREF OnCrowding) 30 * $(MYREF OwnerTerminated) 31 * $(MYREF PriorityMessageException) 32 * )) 33 * $(TR $(TD Scheduler) $(TD 34 * $(MYREF FiberScheduler) 35 * $(MYREF Generator) 36 * $(MYREF Scheduler) 37 * $(MYREF scheduler) 38 * $(MYREF ThreadInfo) 39 * $(MYREF ThreadScheduler) 40 * $(MYREF yield) 41 * )) 42 * $(TR $(TD Misc) $(TD 43 * $(MYREF initOnce) 44 * )) 45 * )) 46 * 47 * This is a low-level messaging API upon which more structured or restrictive 48 * APIs may be built. The general idea is that every messageable entity is 49 * represented by a common handle type called a `Tid`, which allows messages to 50 * be sent to logical threads that are executing in both the current process 51 * and in external processes using the same interface. This is an important 52 * aspect of scalability because it allows the components of a program to be 53 * spread across available resources with few to no changes to the actual 54 * implementation. 55 * 56 * A logical thread is an execution context that has its own stack and which 57 * runs asynchronously to other logical threads. These may be preemptively 58 * scheduled kernel threads, $(MREF_ALTTEXT fibers, core, thread, fiber) 59 * (cooperative user-space threads), or some other concept with similar behavior. 
 *
 * The type of concurrency used when logical threads are created is determined
 * by the $(LREF Scheduler) selected at initialization time. The default behavior is
 * currently to create a new kernel thread per call to spawn, but other
 * schedulers are available that multiplex fibers across the main thread or
 * use some combination of the two approaches.
 *
 * Copyright: Copyright Sean Kelly 2009 - 2014.
 * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
 * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
 * Source:    $(PHOBOSSRC std/concurrency.d)
 */
/*          Copyright Sean Kelly 2009 - 2014.
 * Distributed under the Boost Software License, Version 1.0.
 *    (See accompanying file LICENSE_1_0.txt or copy at
 *          http://www.boost.org/LICENSE_1_0.txt)
 */
module std.concurrency;

public import std.variant;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.thread;
import std.range.primitives;
import std.range.interfaces : InputRange;
import std.traits;

///
@system unittest
{
    __gshared string received;
    static void spawnedFunc(Tid ownerTid)
    {
        import std.conv : text;
        // Receive a message from the owner thread.
        receive((int i){
            received = text("Received the number ", i);

            // Send a message back to the owner thread
            // indicating success.
            send(ownerTid, true);
        });
    }

    // Start spawnedFunc in a new thread.
    auto childTid = spawn(&spawnedFunc, thisTid);

    // Send the number 42 to this new thread.
    send(childTid, 42);

    // Receive the result code.
    auto wasSuccessful = receiveOnly!(bool);
    assert(wasSuccessful);
    assert(received == "Received the number 42");
}

private
{
    // Compile-time check used by spawn/send: yields true when at least one
    // of `Types` carries unshared aliasing.
    //  - `Tid` is always allowed.
    //  - `Rebindable!R` is judged by its wrapped type `R`.
    //  - Any other struct is judged field by field (recursively).
    //  - Everything else defers to std.traits.hasUnsharedAliasing.
    bool hasLocalAliasing(Types...)()
    {
        import std.typecons : Rebindable;

        // Works around "statement is not reachable"
        bool doesIt = false;
        static foreach (T; Types)
        {
            static if (is(T == Tid))
            { /* Allowed */ }
            else static if (is(T : Rebindable!R, R))
                doesIt |= hasLocalAliasing!R;
            else static if (is(T == struct))
                doesIt |= hasLocalAliasing!(typeof(T.tupleof));
            else
                doesIt |= std.traits.hasUnsharedAliasing!(T);
        }
        return doesIt;
    }

    // A struct that only wraps a Tid must be accepted as well.
    @safe unittest
    {
        static struct Container { Tid t; }
        static assert(!hasLocalAliasing!(Tid, Container, int));
    }

    // https://issues.dlang.org/show_bug.cgi?id=20097
    @safe unittest
    {
        import std.datetime.systime : SysTime;
        static struct Container { SysTime time; }
        static assert(!hasLocalAliasing!(SysTime, Container));
    }

    // Kind tag stored in every Message.
    enum MsgType
    {
        standard, // used by send()
        priority, // used by prioritySend()
        linkDead, // sent by ThreadInfo.cleanup() to links and owner
    }

    // Envelope for one message: a kind tag plus the payload in a Variant.
    struct Message
    {
        MsgType type;
        Variant data;

        // A single value is stored as-is; several values are packed into a
        // Tuple!(T) so that the payload is always exactly one Variant.
        this(T...)(MsgType t, T vals) if (T.length > 0)
        {
            static if (T.length == 1)
            {
                type = t;
                data = vals[0];
            }
            else
            {
                import std.typecons : Tuple;

                type = t;
                data = Tuple!(T)(vals);
            }
        }

        // True when the payload can be read as `T` (one type) or as
        // Tuple!(T) (several types). Asking for a single `Variant` always
        // matches.
        @property auto convertsTo(T...)()
        {
            static if (T.length == 1)
            {
                return is(T[0] == Variant) || data.convertsTo!(T);
            }
            else
            {
                import std.typecons : Tuple;
                return data.convertsTo!(Tuple!(T));
            }
        }

        // Extracts the payload as `T` (one type), as the raw Variant when
        // `T` is Variant, or as Tuple!(T) (several types).
        @property auto get(T...)()
        {
            static if (T.length == 1)
            {
                static if (is(T[0] == Variant))
                    return data;
                else
                    return data.get!(T);
            }
            else
            {
                import std.typecons : Tuple;
                return data.get!(Tuple!(T));
            }
        }

        // Calls `op` with the payload unpacked to match op's parameter list
        // and returns whatever `op` returns.
        auto map(Op)(Op op)
        {
            alias Args = Parameters!(Op);

            static if (Args.length == 1)
            {
                static if (is(Args[0] == Variant))
                    return op(data);
                else
                    return op(data.get!(Args));
            }
            else
            {
                import std.typecons : Tuple;
                // Multi-parameter handlers receive the tuple fields expanded.
                return op(data.get!(Tuple!(Args)).expand);
            }
        }
    }

    // Compile-time validation of a handler list passed to receive() and
    // receiveTimeout():
    //  - every entry must be a function pointer or a delegate;
    //  - a void-returning handler that is not the last one must not take a
    //    single Variant (it would match everything after it);
    //  - a void-returning handler must not have the same parameter list as
    //    any later handler.
    void checkops(T...)(T ops)
    {
        import std.format : format;

        foreach (i, t1; T)
        {
            static assert(isFunctionPointer!t1 || isDelegate!t1,
                    format!"T %d is not a function pointer or delegates"(i));
            alias a1 = Parameters!(t1);
            alias r1 = ReturnType!(t1);

            // Only void-returning handlers before the last position are
            // checked for occlusion.
            static if (i < T.length - 1 && is(r1 == void))
            {
                static assert(a1.length != 1 || !is(a1[0] == Variant),
                              "function with arguments " ~ a1.stringof ~
                              " occludes successive function");

                foreach (t2; T[i + 1 .. $])
                {
                    alias a2 = Parameters!(t2);

                    static assert(!is(a1 == a2),
                        "function with arguments " ~ a1.stringof ~ " occludes successive function");
                }
            }
        }
    }

    // Returns the ThreadInfo of the calling logical thread: the one supplied
    // by the installed scheduler when there is one, otherwise the default
    // instance from ThreadInfo.thisInfo.
    @property ref ThreadInfo thisInfo() nothrow
    {
        import core.atomic : atomicLoad;

        // Read the scheduler once so the null test and the call below use
        // the same value.
        auto localScheduler = atomicLoad(scheduler);
        if (localScheduler is null)
            return ThreadInfo.thisInfo;
        return localScheduler.thisInfo;
    }
}

// Thread-local module destructor: runs cleanup() on the ThreadInfo returned
// by thisInfo.
static ~this()
{
    thisInfo.cleanup();
}

// Exceptions

/**
 * Thrown on calls to $(LREF receiveOnly) if a message other than the type
 * the receiving thread expected is sent.
 */
class MessageMismatch : Exception
{
    ///
    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
    {
        super(msg);
    }
}

/**
 * Thrown on calls to $(LREF receive) if the thread that spawned the receiving
 * thread has terminated and no more messages exist.
 */
class OwnerTerminated : Exception
{
    ///
    this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }

    // The Tid that was passed to the constructor.
    Tid tid;
}

/**
 * Thrown if a linked thread has terminated.
 */
class LinkTerminated : Exception
{
    ///
    this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }

    // The Tid that was passed to the constructor.
    Tid tid;
}

/**
 * Thrown if a message was sent to a thread via
 * $(REF prioritySend, std,concurrency) and the receiver does not have a handler
 * for a message of this type.
 */
class PriorityMessageException : Exception
{
    ///
    this(Variant vals)
    {
        super("Priority message");
        message = vals;
    }

    /**
     * The message that was sent.
     */
    Variant message;
}

/**
 * Thrown on mailbox crowding if the mailbox is configured with
 * `OnCrowding.throwException`.
 */
class MailboxFull : Exception
{
    ///
    this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }

    // The Tid that was passed to the constructor.
    Tid tid;
}

/**
 * Thrown when a `Tid` is missing, e.g. when $(LREF ownerTid) doesn't
 * find an owner thread.
 */
class TidMissingException : Exception
{
    import std.exception : basicExceptionCtors;
    ///
    mixin basicExceptionCtors;
}


// Thread ID


/**
 * An opaque type used to represent a logical thread.
 */
struct Tid
{
private:
    // Only this module can build a Tid around a MessageBox.
    this(MessageBox m) @safe pure nothrow @nogc
    {
        mbox = m;
    }

    // The wrapped mailbox; null for `Tid.init`.
    MessageBox mbox;

public:

    /**
     * Generate a convenient string for identifying this `Tid`.  This is only
     * useful to see if `Tid`'s that are currently executing are the same or
     * different, e.g. for logging and debugging.  It is potentially possible
     * that a `Tid` executed in the future will have the same `toString()` output
     * as another `Tid` that has already terminated.
     */
    void toString(W)(ref W w) const
    {
        import std.format.write : formattedWrite;
        // The mailbox reference is only turned into an address for printing;
        // the cast is wrapped in a @trusted lambda for that reason.
        auto p = () @trusted { return cast(void*) mbox; }();
        formattedWrite(w, "Tid(%x)", p);
    }

}

// A default Tid prints as "Tid(0)"; copies of a live Tid print identically.
@safe unittest
{
    import std.conv : text;
    Tid tid;
    assert(text(tid) == "Tid(0)");
    auto tid2 = thisTid;
    assert(text(tid2) != "Tid(0)");
    auto tid3 = tid2;
    assert(text(tid2) == text(tid3));
}

// https://issues.dlang.org/show_bug.cgi?id=21512
@system unittest
{
    import std.format : format;

    const(Tid) b = spawn(() {});
    assert(format!"%s"(b)[0 .. 4] == "Tid(");
}

/**
 * Returns: The `Tid` of the caller's thread.
 */
@property Tid thisTid() @safe
{
    // TODO: remove when concurrency is safe
    static auto trus() @trusted
    {
        // Reuse the identity already stored for this thread; otherwise
        // create a mailbox on first use and remember it.
        if (thisInfo.ident != Tid.init)
            return thisInfo.ident;
        thisInfo.ident = Tid(new MessageBox);
        return thisInfo.ident;
    }

    return trus();
}

/**
 * Return the `Tid` of the thread which spawned the caller's thread.
 *
 * Throws: A `TidMissingException` exception if
 * there is no owner thread.
 */
@property Tid ownerTid()
{
    import std.exception : enforce;

    // An owner whose mailbox is null means no owner was ever recorded for
    // this thread.
    enforce!TidMissingException(thisInfo.owner.mbox !is null, "Error: Thread has no owner thread.");
    return thisInfo.owner;
}

// ownerTid throws on a thread without an owner and works inside a spawned
// thread.
@system unittest
{
    import std.exception : assertThrown;

    static void fun()
    {
        string res = receiveOnly!string();
        assert(res == "Main calling");
        ownerTid.send("Child responding");
    }

    assertThrown!TidMissingException(ownerTid);
    auto child = spawn(&fun);
    child.send("Main calling");
    string res = receiveOnly!string();
    assert(res == "Child responding");
}

// Thread Creation

// Template constraint shared by spawn, spawnLinked and _spawn. `F` is
// spawnable with argument types `T` when all of the following hold:
//  - F is callable and its return type converts to void;
//  - F takes as many parameters as there are types in T, and each T[i]
//    implicitly converts to the i-th parameter of F;
//  - F is a function pointer, or F has no unshared aliasing.
private template isSpawnable(F, T...)
{
    // Recursive helper: walks both parameter lists in lock-step starting at
    // index `i` and checks that each parameter of F2 converts to the
    // corresponding parameter of F1.
    template isParamsImplicitlyConvertible(F1, F2, int i = 0)
    {
        alias param1 = Parameters!F1;
        alias param2 = Parameters!F2;
        static if (param1.length != param2.length)
            enum isParamsImplicitlyConvertible = false;
        else static if (param1.length == i)
            enum isParamsImplicitlyConvertible = true; // every position checked
        else static if (is(param2[i] : param1[i]))
            enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1,
                    F2, i + 1);
        else
            enum isParamsImplicitlyConvertible = false;
    }

    // `void function(T)` is used only as a carrier for the argument types.
    enum isSpawnable = isCallable!F && is(ReturnType!F : void)
            && isParamsImplicitlyConvertible!(F, void function(T))
            && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}

/**
 * Starts `fn(args)` in a new logical thread.
 *
 * Executes the supplied function in a new logical thread represented by
 * `Tid`.  The calling thread is designated as the owner of the new thread.
 * When the owner thread terminates an `OwnerTerminated` message will be
 * sent to the new thread, causing an `OwnerTerminated` exception to be
 * thrown on `receive()`.
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A `Tid` representing the new logical thread.
 *
 * Notes:
 *  `args` must not have unshared aliasing.  In other words, all arguments
 *  to `fn` must either be `shared` or `immutable` or have no
 *  pointer indirection.  This is necessary for enforcing isolation among
 *  threads.
 *
 * Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning
 * `fn` must be either `shared` or `immutable`. */
Tid spawn(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    // Rejected at compile time: arguments that alias mutable thread-local data.
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    // `false`: the new thread is not linked to the caller.
    return _spawn(false, fn, args);
}

///
@system unittest
{
    static void f(string msg)
    {
        assert(msg == "Hello World");
    }

    auto tid = spawn(&f, "Hello World");
}

/// Fails: char[] has mutable aliasing.
@system unittest
{
    string msg = "Hello, World!";

    static void f1(string msg) {}
    static assert(!__traits(compiles, spawn(&f1, msg.dup)));
    static assert( __traits(compiles, spawn(&f1, msg.idup)));

    static void f2(char[] msg) {}
    static assert(!__traits(compiles, spawn(&f2, msg.dup)));
    static assert(!__traits(compiles, spawn(&f2, msg.idup)));
}

/// New thread with anonymous function
@system unittest
{
    spawn({
        ownerTid.send("This is so great!");
    });
    assert(receiveOnly!string == "This is so great!");
}

// The spawned function really runs and sees the argument it was given.
@system unittest
{
    import core.thread : thread_joinAll;

    __gshared string receivedMessage;
    static void f1(string msg)
    {
        receivedMessage = msg;
    }

    auto tid1 = spawn(&f1, "Hello World");
    thread_joinAll;
    assert(receivedMessage == "Hello World");
}

/**
 * Starts `fn(args)` in a logical thread and will receive a `LinkTerminated`
 * message when the operation terminates.
 *
 * Executes the supplied function in a new logical thread represented by
 * `Tid`.
 * This new thread is linked to the calling thread so that if either
 * it or the calling thread terminates a `LinkTerminated` message will be sent
 * to the other, causing a `LinkTerminated` exception to be thrown on `receive()`.
 * The owner relationship from `spawn()` is preserved as well, so if the link
 * between threads is broken, owner termination will still result in an
 * `OwnerTerminated` exception to be thrown on `receive()`.
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A Tid representing the new thread.
 */
Tid spawnLinked(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    // Rejected at compile time: arguments that alias mutable thread-local data.
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    // `true`: record the new thread as linked to the caller.
    return _spawn(true, fn, args);
}

/*
 * Shared implementation of spawn() and spawnLinked().
 *
 * Creates a Tid around a fresh MessageBox for the child, captures the
 * caller's Tid as its owner, and runs `fn(args)` either through the installed
 * scheduler or, when there is none, on a new Thread. The child is then
 * recorded in the caller's `links` table with the value `linked`.
 *
 * Params:
 *  linked = Value stored for the child in the caller's `links` table.
 *  fn     = The function to execute.
 *  args   = Arguments to the function.
 *
 * Returns:
 *  The Tid created for the child.
 */
private Tid _spawn(F, T...)(bool linked, F fn, T args)
if (isSpawnable!(F, T))
{
    // TODO: MessageList and &exec should be shared.
    auto spawnTid = Tid(new MessageBox);
    auto ownerTid = thisTid;

    // Runs in the new logical thread: install its identity and owner before
    // handing control to the user function.
    void exec()
    {
        thisInfo.ident = spawnTid;
        thisInfo.owner = ownerTid;
        fn(args);
    }

    // TODO: MessageList and &exec should be shared.
    if (scheduler !is null)
        scheduler.spawn(&exec);
    else
    {
        auto t = new Thread(&exec);
        t.start();
    }
    // Executed by the caller, so this updates the caller's ThreadInfo.
    thisInfo.links[spawnTid] = linked;
    return spawnTid;
}

// Compile-time matrix of what isSpawnable accepts: function pointers with
// matching arity, shared/immutable delegates, and callable objects whose
// qualifier is compatible with their opCall.
@system unittest
{
    void function() fn1;
    void function(int) fn2;
    static assert(__traits(compiles, spawn(fn1)));
    static assert(__traits(compiles, spawn(fn2, 2)));
    static assert(!__traits(compiles, spawn(fn1, 1)));
    static assert(!__traits(compiles, spawn(fn2)));

    void delegate(int) shared dg1;
    shared(void delegate(int)) dg2;
    shared(void delegate(long) shared) dg3;
    shared(void delegate(real, int, long) shared) dg4;
    void delegate(int) immutable dg5;
    void delegate(int) dg6;
    static assert(__traits(compiles, spawn(dg1, 1)));
    static assert(__traits(compiles, spawn(dg2, 2)));
    static assert(__traits(compiles, spawn(dg3, 3)));
    static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));
    static assert(__traits(compiles, spawn(dg5, 5)));
    static assert(!__traits(compiles, spawn(dg6, 6)));

    auto callable1  = new class{ void opCall(int) shared {} };
    auto callable2  = cast(shared) new class{ void opCall(int) shared {} };
    auto callable3  = new class{ void opCall(int) immutable {} };
    auto callable4  = cast(immutable) new class{ void opCall(int) immutable {} };
    auto callable5  = new class{ void opCall(int) {} };
    auto callable6  = cast(shared) new class{ void opCall(int) immutable {} };
    auto callable7  = cast(immutable) new class{ void opCall(int) shared {} };
    auto callable8  = cast(shared) new class{ void opCall(int) const shared {} };
    auto callable9  = cast(const shared) new class{ void opCall(int) shared {} };
    auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} };
    auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} };
    static assert(!__traits(compiles, spawn(callable1,  1)));
    static assert( __traits(compiles, spawn(callable2,  2)));
    static assert(!__traits(compiles, spawn(callable3,  3)));
    static assert( __traits(compiles, spawn(callable4,  4)));
    static assert(!__traits(compiles, spawn(callable5,  5)));
    static assert(!__traits(compiles, spawn(callable6,  6)));
    static assert(!__traits(compiles, spawn(callable7,  7)));
    static assert( __traits(compiles, spawn(callable8,  8)));
    static assert(!__traits(compiles, spawn(callable9,  9)));
    static assert( __traits(compiles, spawn(callable10, 10)));
    static assert( __traits(compiles, spawn(callable11, 11)));
}

/**
 * Places the values as a message at the back of tid's message queue.
 *
 * Sends the supplied value to the thread represented by tid.  As with
 * $(REF spawn, std,concurrency), `T` must not have unshared aliasing.
 */
void send(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    _send(tid, vals);
}

/**
 * Places the values as a message on the front of tid's message queue.
 *
 * Send a message to `tid` but place it at the front of `tid`'s message
 * queue instead of at the back.  This function is typically used for
 * out-of-band communication, to signal exceptional conditions, etc.
 */
void prioritySend(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    _send(MsgType.priority, tid, vals);
}

/*
 * Unchecked counterpart of send(): forwards `vals` as a MsgType.standard
 * message without the aliasing static assert.
 */
private void _send(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    _send(MsgType.standard, tid, vals);
}

/*
 * Implementation of send.  This allows parameter checking to be different for
 * both Tid.send() and .send().
 *
 * Params:
 *  type = Kind tag stored in the Message.
 *  tid  = Destination; its mailbox must not be null.
 *  vals = Payload; wrapped into a Message together with `type`.
 */
private void _send(T...)(MsgType type, Tid tid, T vals)
in (tid.mbox !is null)
{
    // NOTE(review): the message is built into a named local before being
    // handed to put(); presumably put() takes its argument by reference --
    // confirm against MessageBox before inlining this.
    auto msg = Message(type, vals);
    tid.mbox.put(msg);
}

/**
 * Receives a message from another thread.
 *
 * Receive a message from another thread, or block if no messages of the
 * specified types are available.  This function works by pattern matching
 * a message against a set of delegates and executing the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument to `receive`, it will match any message that was not
 * matched by an earlier delegate.  If more than one argument is sent,
 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
 * sent.
 *
 * Params:
 *  ops = Variadic list of function pointers and delegates. Entries
 *        in this list must not occlude later entries.
 *
 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
 */
void receive(T...)( T ops )
in
{
    assert(thisInfo.ident.mbox !is null,
           "Cannot receive a message until a thread was spawned "
           ~ "or thisTid was passed to a running thread.");
}
do
{
    // Compile-time validation of the handler list (types and occlusion).
    checkops( ops );

    thisInfo.ident.mbox.get( ops );
}

///
@system unittest
{
    import std.variant : Variant;

    auto process = ()
    {
        receive(
            (int i) { ownerTid.send(1); },
            (double f) { ownerTid.send(2); },
            (Variant v) { ownerTid.send(3); }
        );
    };

    {
        auto tid = spawn(process);
        send(tid, 42);
        assert(receiveOnly!int == 1);
    }

    {
        auto tid = spawn(process);
        send(tid, 3.14);
        assert(receiveOnly!int == 2);
    }

    {
        auto tid = spawn(process);
        send(tid, "something else");
        assert(receiveOnly!int == 3);
    }
}

// A Variant handler is accepted only in last position, and duplicate
// parameter lists are rejected.
@safe unittest
{
    static assert( __traits( compiles,
                      {
                          receive( (Variant x) {} );
                          receive( (int x) {}, (Variant x) {} );
                      } ) );

    static assert( !__traits( compiles,
                       {
                           receive( (Variant x) {}, (int x) {} );
                       } ) );

    static assert( !__traits( compiles,
                       {
                           receive( (int x) {}, (int x) {} );
                       } ) );
}

// Make sure receive() works with free functions as well.
version (StdUnittest)
{
    private void receiveFunction(int x) {}
}
@safe unittest
{
    static assert( __traits( compiles,
                      {
                          receive( &receiveFunction );
                          receive( &receiveFunction, (Variant x) {} );
                      } ) );
}


// Return type of receiveOnly: the type itself for a single `T`, otherwise a
// Tuple of all requested types.
private template receiveOnlyRet(T...)
{
    static if ( T.length == 1 )
    {
        alias receiveOnlyRet = T[0];
    }
    else
    {
        import std.typecons : Tuple;
        alias receiveOnlyRet = Tuple!(T);
    }
}

/**
 * Receives only messages with arguments of the specified types.
 *
 * Params:
 *  T = Variadic list of types to be received.
 *
 * Returns: The received message.
 * If `T` has more than one entry,
 * the message will be packed into a $(REF Tuple, std,typecons).
 *
 * Throws: $(LREF MessageMismatch) if a message of types other than `T`
 *         is received,
 *         $(LREF OwnerTerminated) when the sending thread was terminated.
 */
receiveOnlyRet!(T) receiveOnly(T...)()
in
{
    assert(thisInfo.ident.mbox !is null,
        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
do
{
    import std.format : format;
    import std.meta : allSatisfy;
    import std.typecons : Tuple;

    // Filled in by the first handler below; always a Tuple internally, and
    // unwrapped at the end when only one type was requested.
    Tuple!(T) ret;

    thisInfo.ident.mbox.get((T val) {
        static if (T.length)
        {
            static if (allSatisfy!(isAssignable, T))
            {
                ret.field = val;
            }
            else
            {
                // Types that cannot be assigned are constructed in place.
                import core.lifetime : emplace;
                emplace(&ret, val);
            }
        }
    },
    // Termination notices are rethrown to the caller unchanged.
    (LinkTerminated e) { throw e; },
    (OwnerTerminated e) { throw e; },
    // Anything else is a type mismatch; report expected vs. actual type.
    (Variant val) {
        static if (T.length > 1)
            string exp = T.stringof;
        else
            string exp = T[0].stringof;

        throw new MessageMismatch(
            format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString()));
    });
    static if (T.length == 1)
        return ret[0];
    else
        return ret;
}

///
@system unittest
{
    auto tid = spawn(
    {
        assert(receiveOnly!int == 42);
    });
    send(tid, 42);
}

///
@system unittest
{
    auto tid = spawn(
    {
        assert(receiveOnly!string == "text");
    });
    send(tid, "text");
}

///
@system unittest
{
    struct Record { string name; int age; }

    auto tid = spawn(
    {
        auto msg = receiveOnly!(double, Record);
        assert(msg[0] == 0.5);
        assert(msg[1].name == "Alice");
        assert(msg[1].age == 31);
    });

    send(tid, 0.5, Record("Alice", 31));
}

// A mismatching message produces MessageMismatch with the expected and the
// actual type names in its text.
@system unittest
{
    static void t1(Tid mainTid)
    {
        try
        {
            receiveOnly!string();
            mainTid.send("");
        }
        catch (Throwable th)
        {
            mainTid.send(th.msg);
        }
    }

    auto tid = spawn(&t1, thisTid);
    tid.send(1);
    string result = receiveOnly!string();
    assert(result == "Unexpected message type: expected 'string', got 'int'");
}

// https://issues.dlang.org/show_bug.cgi?id=21663
@safe unittest
{
    alias test = receiveOnly!(string, bool, bool);
}

/**
 * Receives a message from another thread and gives up if no match
 * arrives within a specified duration.
 *
 * Receive a message from another thread, or block until `duration` elapses,
 * if no messages of the specified types are available. This function works
 * by pattern matching a message against a set of delegates and executing
 * the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument, it will match any message that was not
 * matched by an earlier delegate.  If more than one argument is sent,
 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
 * sent.
 *
 * Params:
 *  duration = Duration, how long to wait. If `duration` is negative,
 *             won't wait at all.
 *  ops = Variadic list of function pointers and delegates. Entries
 *        in this list must not occlude later entries.
 *
 * Returns: `true` if it received a message and `false` if it timed out waiting
 *          for one.
 *
 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
 */
bool receiveTimeout(T...)(Duration duration, T ops)
in
{
    assert(thisInfo.ident.mbox !is null,
        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
do
{
    // Compile-time validation of the handler list (types and occlusion).
    checkops(ops);

    return thisInfo.ident.mbox.get(duration, ops);
}

// Same handler-list rules as receive(): Variant only last, no duplicates.
@safe unittest
{
    static assert(__traits(compiles, {
        receiveTimeout(msecs(0), (Variant x) {});
        receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
    }));

    static assert(!__traits(compiles, {
        receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
    }));

    static assert(!__traits(compiles, {
        receiveTimeout(msecs(0), (int x) {}, (int x) {});
    }));

    static assert(__traits(compiles, {
        receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
    }));
}

// MessageBox Limits

/**
 * These behaviors may be specified when a mailbox is full.
 */
enum OnCrowding
{
    block, /// Wait until room is available.
    throwException, /// Throw a $(LREF MailboxFull) exception.
    ignore /// Abort the send and return.
}

// Crowding handlers installed by setMaxMailboxSize(tid, messages, OnCrowding).
// NOTE(review): how the mailbox interprets the returned bool is decided by
// MessageBox, which is not shown here -- confirm before relying on it.
private
{
    // Handler for OnCrowding.block: always returns true.
    bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
    {
        return true;
    }

    // Handler for OnCrowding.throwException: never returns normally.
    bool onCrowdingThrow(Tid tid) @safe pure
    {
        throw new MailboxFull(tid);
    }

    // Handler for OnCrowding.ignore: always returns false.
    bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
    {
        return false;
    }
}

/**
 * Sets a maximum mailbox size.
 *
 * Sets a limit on the maximum number of user messages allowed in the mailbox.
 * If this limit is reached, the caller attempting to add a new message will
 * execute the behavior specified by doThis.  If messages is zero, the mailbox
 * is unbounded.
 *
 * Params:
 *  tid      = The Tid of the thread for which this limit should be set.
 *  messages = The maximum number of messages or zero if no limit.
1057 * doThis = The behavior executed when a message is sent to a full 1058 * mailbox. 1059 */ 1060 void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure 1061 in (tid.mbox !is null) 1062 { 1063 final switch (doThis) 1064 { 1065 case OnCrowding.block: 1066 return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock); 1067 case OnCrowding.throwException: 1068 return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow); 1069 case OnCrowding.ignore: 1070 return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore); 1071 } 1072 } 1073 1074 /** 1075 * Sets a maximum mailbox size. 1076 * 1077 * Sets a limit on the maximum number of user messages allowed in the mailbox. 1078 * If this limit is reached, the caller attempting to add a new message will 1079 * execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded. 1080 * 1081 * Params: 1082 * tid = The Tid of the thread for which this limit should be set. 1083 * messages = The maximum number of messages or zero if no limit. 1084 * onCrowdingDoThis = The routine called when a message is sent to a full 1085 * mailbox. 1086 */ 1087 void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis) 1088 in (tid.mbox !is null) 1089 { 1090 tid.mbox.setMaxMsgs(messages, onCrowdingDoThis); 1091 } 1092 1093 private 1094 { 1095 __gshared Tid[string] tidByName; 1096 __gshared string[][Tid] namesByTid; 1097 } 1098 1099 private @property Mutex registryLock() @system 1100 { 1101 __gshared Mutex impl; 1102 initOnce!impl(new Mutex); 1103 return impl; 1104 } 1105 1106 private void unregisterMe(ref ThreadInfo me) 1107 { 1108 if (me.ident != Tid.init) 1109 { 1110 synchronized (registryLock) 1111 { 1112 if (auto allNames = me.ident in namesByTid) 1113 { 1114 foreach (name; *allNames) 1115 tidByName.remove(name); 1116 namesByTid.remove(me.ident); 1117 } 1118 } 1119 } 1120 } 1121 1122 /** 1123 * Associates name with tid. 1124 * 1125 * Associates name with tid in a process-local map. 
When the thread 1126 * represented by tid terminates, any names associated with it will be 1127 * automatically unregistered. 1128 * 1129 * Params: 1130 * name = The name to associate with tid. 1131 * tid = The tid register by name. 1132 * 1133 * Returns: 1134 * true if the name is available and tid is not known to represent a 1135 * defunct thread. 1136 */ 1137 bool register(string name, Tid tid) 1138 in (tid.mbox !is null) 1139 { 1140 synchronized (registryLock) 1141 { 1142 if (name in tidByName) 1143 return false; 1144 if (tid.mbox.isClosed) 1145 return false; 1146 namesByTid[tid] ~= name; 1147 tidByName[name] = tid; 1148 return true; 1149 } 1150 } 1151 1152 /** 1153 * Removes the registered name associated with a tid. 1154 * 1155 * Params: 1156 * name = The name to unregister. 1157 * 1158 * Returns: 1159 * true if the name is registered, false if not. 1160 */ 1161 bool unregister(string name) 1162 { 1163 import std.algorithm.mutation : remove, SwapStrategy; 1164 import std.algorithm.searching : countUntil; 1165 1166 synchronized (registryLock) 1167 { 1168 if (auto tid = name in tidByName) 1169 { 1170 auto allNames = *tid in namesByTid; 1171 auto pos = countUntil(*allNames, name); 1172 remove!(SwapStrategy.unstable)(*allNames, pos); 1173 tidByName.remove(name); 1174 return true; 1175 } 1176 return false; 1177 } 1178 } 1179 1180 /** 1181 * Gets the `Tid` associated with name. 1182 * 1183 * Params: 1184 * name = The name to locate within the registry. 1185 * 1186 * Returns: 1187 * The associated `Tid` or `Tid.init` if name is not registered. 1188 */ 1189 Tid locate(string name) 1190 { 1191 synchronized (registryLock) 1192 { 1193 if (auto tid = name in tidByName) 1194 return *tid; 1195 return Tid.init; 1196 } 1197 } 1198 1199 /** 1200 * Encapsulates all implementation-level data needed for scheduling. 1201 * 1202 * When defining a $(LREF Scheduler), an instance of this struct must be associated 1203 * with each logical thread. 
It contains all implementation-level information
 * needed by the internal API.
 */
struct ThreadInfo
{
    /// Tid of this logical thread; its mailbox is closed by `cleanup`.
    Tid ident;
    /// Linked Tids. Each one is sent a linkDead message by `cleanup`.
    bool[Tid] links;
    /// Owner Tid. If set, it is sent a linkDead message by `cleanup`.
    Tid owner;

    /**
     * Gets a thread-local instance of `ThreadInfo`.
     *
     * Gets a thread-local instance of `ThreadInfo`, which should be used as the
     * default instance when info is requested for a thread not created by the
     * `Scheduler`.
     */
    static @property ref thisInfo() nothrow
    {
        static ThreadInfo val;
        return val;
    }

    /**
     * Cleans up this ThreadInfo.
     *
     * This must be called when a scheduled thread terminates.  It tears down
     * the messaging system for the thread and notifies interested parties of
     * the thread's termination.
     */
    void cleanup()
    {
        // Close the mailbox first, if this ThreadInfo ever had one.
        if (ident.mbox !is null)
            ident.mbox.close();
        // Iterate over a snapshot of the keys rather than the live table.
        foreach (tid; links.keys)
            _send(MsgType.linkDead, tid, ident);
        if (owner != Tid.init)
            _send(MsgType.linkDead, owner, ident);
        unregisterMe(this); // clean up registry entries
    }

    // https://issues.dlang.org/show_bug.cgi?id=20160
    // Cleaning up a ThreadInfo that has no Tid must not disturb names that
    // were registered by other threads.
    @system unittest
    {
        register("main_thread", thisTid());

        ThreadInfo t;
        t.cleanup();

        assert(locate("main_thread") == thisTid());
    }
}

/**
 * A `Scheduler` controls how threading is performed by spawn.
 *
 * Implementing a `Scheduler` allows the concurrency mechanism used by this
 * module to be customized according to different needs.  By default, a call
 * to spawn will create a new kernel thread that executes the supplied routine
 * and terminates when finished.  But it is possible to create `Scheduler`s that
 * reuse threads, that multiplex `Fiber`s (coroutines) across a single thread,
 * or any number of other approaches.
By making the choice of `Scheduler` a
 * user-level option, `std.concurrency` may be used for far more types of
 * application than if this behavior were predefined.
 *
 * Example:
 * ---
 * import std.concurrency;
 * import std.stdio;
 *
 * void main()
 * {
 *     scheduler = new FiberScheduler;
 *     scheduler.start(
 *     {
 *         writeln("the rest of main goes here");
 *     });
 * }
 * ---
 *
 * Some schedulers have a dispatching loop that must run if they are to work
 * properly, so for the sake of consistency, when using a scheduler, `start()`
 * must be called within `main()`.  This yields control to the scheduler and
 * will ensure that any spawned threads are executed in an expected manner.
 */
interface Scheduler
{
    /**
     * Spawns the supplied op and starts the `Scheduler`.
     *
     * This is intended to be called at the start of the program to yield all
     * scheduling to the active `Scheduler` instance.  This is necessary for
     * schedulers that explicitly dispatch threads rather than simply relying
     * on the operating system to do so, and so start should always be called
     * within `main()` to begin normal program execution.
     *
     * Params:
     *  op = A wrapper for whatever the main thread would have done in the
     *       absence of a custom scheduler.  It will be automatically executed
     *       via a call to spawn by the `Scheduler`.
     */
    void start(void delegate() op);

    /**
     * Assigns a logical thread to execute the supplied op.
     *
     * This routine is called by spawn.  It is expected to instantiate a new
     * logical thread and run the supplied operation.  This thread must call
     * `thisInfo.cleanup()` when the thread terminates if the scheduled thread
     * is not a kernel thread--all kernel threads will have their `ThreadInfo`
     * cleaned up automatically by a thread-local destructor.
     *
     * Params:
     *  op = The function to execute.  This may be the actual function passed
     *       by the user to spawn itself, or may be a wrapper function.
     */
    void spawn(void delegate() op);

    /**
     * Yields execution to another logical thread.
     *
     * This routine is called at various points within concurrency-aware APIs
     * to provide a scheduler a chance to yield execution when using some sort
     * of cooperative multithreading model.  If this is not appropriate, such
     * as when each logical thread is backed by a dedicated kernel thread,
     * this routine may be a no-op.
     */
    void yield() nothrow;

    /**
     * Returns an appropriate `ThreadInfo` instance.
     *
     * Returns an instance of `ThreadInfo` specific to the logical thread that
     * is calling this routine or, if the calling thread was not created by
     * this scheduler, returns `ThreadInfo.thisInfo` instead.
     */
    @property ref ThreadInfo thisInfo() nothrow;

    /**
     * Creates a `Condition` variable analog for signaling.
     *
     * Creates a new `Condition` variable analog which is used to check for and
     * to signal the addition of messages to a thread's message queue.  Like
     * yield, some schedulers may need to define custom behavior so that calls
     * to `Condition.wait()` yield to another thread when no new messages are
     * available instead of blocking.
     *
     * Params:
     *  m = The `Mutex` that will be associated with this condition.  It will be
     *      locked prior to any operation on the condition, and so in some
     *      cases a `Scheduler` may need to hold this reference and unlock the
     *      mutex before yielding execution to another logical thread.
     */
    Condition newCondition(Mutex m) nothrow;
}

/**
 * An example `Scheduler` using kernel threads.
*
 * This is an example `Scheduler` that mirrors the default scheduling behavior
 * of creating one kernel thread per call to spawn.  It is fully functional
 * and may be instantiated and used, but is not a necessary part of the
 * default functioning of this module.
 */
class ThreadScheduler : Scheduler
{
    /**
     * Runs op on the calling thread; kernel threads need no dispatcher.
     */
    void start(void delegate() op)
    {
        op();
    }

    /**
     * Starts a fresh kernel thread whose body is the supplied op.
     */
    void spawn(void delegate() op)
    {
        new Thread(op).start();
    }

    /**
     * Does nothing: this scheduler performs no explicit multiplexing.
     */
    void yield() nothrow
    {
        // no explicit yield needed
    }

    /**
     * Hands back `ThreadInfo.thisInfo`, the thread-local `ThreadInfo`
     * instance, which is what a one-thread-per-spawn scheduler needs.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        return ThreadInfo.thisInfo;
    }

    /**
     * Returns a plain `Condition` bound to m; no custom behavior is required.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new Condition(m);
    }
}

/**
 * An example `Scheduler` using $(MREF_ALTTEXT `Fiber`s, core, thread, fiber).
 *
 * This is an example scheduler that creates a new `Fiber` per call to spawn
 * and multiplexes the execution of all fibers within the main thread.
 */
class FiberScheduler : Scheduler
{
    /**
     * Wraps op in a new `Fiber`, then runs the dispatcher.
     */
    void start(void delegate() op)
    {
        create(op);
        dispatch();
    }

    /**
     * This creates a new `Fiber` for the supplied op and adds it to the
     * dispatch list.
*/
    void spawn(void delegate() op) nothrow
    {
        create(op);
        yield();
    }

    /**
     * Yields to another scheduled `Fiber` when the caller is itself running
     * inside a `Fiber`; otherwise does nothing.
     */
    void yield() nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any (non-Generator) fiber should yield here.
        if (Fiber.getThis() is null)
            return;
        Fiber.yield();
    }

    /**
     * Returns an appropriate `ThreadInfo` instance.
     *
     * When the calling `Fiber` is an `InfoFiber`, the `ThreadInfo` embedded in
     * it is returned; in every other case the result is
     * `ThreadInfo.thisInfo`.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        if (auto current = cast(InfoFiber) Fiber.getThis())
            return current.info;
        return ThreadInfo.thisInfo;
    }

    /**
     * Returns a `Condition` analog that yields when wait or notify is called.
     *
     * Bug:
     * For the default implementation, `notifyAll` will behave like `notify`.
     *
     * Params:
     *   m = A `Mutex` to use for locking if the condition needs to be waited on
     *       or notified from multiple `Thread`s.
     *       If `null`, no `Mutex` will be used and it is assumed that the
     *       `Condition` is only waited on/notified from one `Thread`.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new FiberCondition(m);
    }

protected:
    /**
     * Creates a new `Fiber` which calls the given delegate.
*
     * Params:
     *   op = The delegate the fiber should call
     */
    void create(void delegate() op) nothrow
    {
        // Wrapper that runs op and always calls thisInfo.cleanup() afterwards,
        // whether op returns normally or throws.
        void wrap()
        {
            scope (exit)
            {
                thisInfo.cleanup();
            }
            op();
        }

        m_fibers ~= new InfoFiber(&wrap);
    }

    /**
     * `Fiber` which embeds a `ThreadInfo`
     */
    static class InfoFiber : Fiber
    {
        ThreadInfo info;

        this(void delegate() op) nothrow
        {
            super(op);
        }

        this(void delegate() op, size_t sz) nothrow
        {
            super(op, sz);
        }
    }

private:
    /*
     * Condition analog for fibers: instead of blocking, waiting yields to the
     * scheduler until the `notified` flag has been set.
     */
    class FiberCondition : Condition
    {
        this(Mutex m) nothrow
        {
            super(m);
            notified = false;
        }

        // Yields repeatedly until notified; the flag is reset on the way out.
        override void wait() nothrow
        {
            scope (exit) notified = false;

            while (!notified)
                switchContext();
        }

        // Like wait(), but gives up once `period` has elapsed.
        // Returns: true if notified, false on timeout.
        override bool wait(Duration period) nothrow
        {
            import core.time : MonoTime;

            // The return value is evaluated before this reset runs.
            scope (exit) notified = false;

            // `period` is recomputed as the time remaining until `limit`.
            // NOTE(review): unlike wait(), this yields without going through
            // switchContext(), so the mutex is not released here -- confirm
            // this is intended.
            for (auto limit = MonoTime.currTime + period;
                 !notified && !period.isNegative;
                 period = limit - MonoTime.currTime)
            {
                this.outer.yield();
            }
            return notified;
        }

        override void notify() nothrow
        {
            notified = true;
            switchContext();
        }

        // There is a single flag, so this is identical to notify().
        override void notifyAll() nothrow
        {
            notified = true;
            switchContext();
        }

    private:
        // Releases the mutex (if there is one) around a yield to the
        // scheduler and re-acquires it before returning.
        void switchContext() nothrow
        {
            if (mutex_nothrow) mutex_nothrow.unlock_nothrow();
            scope (exit)
                if (mutex_nothrow)
                    mutex_nothrow.lock_nothrow();
            this.outer.yield();
        }

        bool notified;
    }

    /*
     * Runs the registered fibers round-robin until none remain.
     *
     * Throws: any Throwable returned by a fiber, except OwnerTerminated,
     * which is ignored.
     */
    void dispatch()
    {
        import std.algorithm.mutation : remove;

        while (m_fibers.length > 0)
        {
            // Rethrow.no: the fiber's Throwable is returned instead of thrown.
            auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
            if (t !is null && !(cast(OwnerTerminated) t))
            {
                throw t;
            }
            if (m_fibers[m_pos].state == Fiber.State.TERM)
            {
                // Drop the finished fiber; m_pos now designates its successor,
                // wrapping to 0 when the removed fiber was the last one.
                if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
                    m_pos = 0;
            }
            else if (m_pos++ >= m_fibers.length - 1)
            {
                // Advanced past the last fiber: wrap around.
                m_pos = 0;
            }
        }
    }

    Fiber[] m_fibers; // fibers still to be run by dispatch()
    size_t m_pos;     // index into m_fibers of the fiber to call next
}

// FiberCondition: wait() yields until notified, and a notification wakes the
// waiter exactly once.
@system unittest
{
    static void receive(Condition cond, ref size_t received)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                cond.wait();
                ++received;
            }
        }
    }

    static void send(Condition cond, ref size_t sent)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                ++sent;
                cond.notify();
            }
        }
    }

    auto fs = new FiberScheduler;
    auto mtx = new Mutex;
    auto cond = fs.newCondition(mtx);

    size_t received, sent;
    auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
    waiter.call();
    assert(received == 0);
    notifier.call();
    assert(sent == 1);
    assert(received == 0);
    waiter.call();
    assert(received == 1);
    waiter.call();
    assert(received == 1);
}

/**
 * Sets the `Scheduler` behavior within the program.
 *
 * This variable sets the `Scheduler` behavior within this program.  Typically,
 * when setting a `Scheduler`, `scheduler.start()` should be called in `main`.  This
 * routine will not return until program execution is complete.
 */
__gshared Scheduler scheduler;

// Generator

/**
 * If the caller is a `Fiber` and is not a $(LREF Generator), this function will call
 * `scheduler.yield()` or `Fiber.yield()`, as appropriate.
 */
void yield() nothrow
{
    auto fiber = Fiber.getThis();
    // Inside a Generator this is a no-op.
    if (!(cast(IsGenerator) fiber))
    {
        if (scheduler is null)
        {
            // No scheduler installed: yield the raw fiber, if any.
            if (fiber)
                return Fiber.yield();
        }
        else
            scheduler.yield();
    }
}

/// Used to determine whether a Generator is running.
private interface IsGenerator {}


/**
 * A Generator is a $(MREF_ALTTEXT Fiber, core, thread, fiber)
 * that periodically returns values of type `T` to the
 * caller via `yield`.  This is represented as an InputRange.
 */
class Generator(T) :
    Fiber, IsGenerator, InputRange!T
{
    /**
     * Initializes a generator object which is associated with a static
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn)
    {
        super(fn);
        // Run the fiber once so the range is prepared for iteration.
        call();
    }

    /**
     * Initializes a generator object which is associated with a static
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz)
    {
        super(fn, sz);
        // Run the fiber once so the range is prepared for iteration.
        call();
    }

    /**
     * Initializes a generator object which is associated with a static
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *  guardPageSize = size of the guard page to trap fiber's stack
     *                  overflows. Refer to $(REF Fiber, core,thread)'s
     *                  documentation for more details.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz, size_t guardPageSize)
    {
        super(fn, sz, guardPageSize);
        // Run the fiber once so the range is prepared for iteration.
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
*
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg)
    {
        super(dg);
        // Run the fiber once so the range is prepared for iteration.
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz)
    {
        super(dg, sz);
        // Run the fiber once so the range is prepared for iteration.
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *  guardPageSize = size of the guard page to trap fiber's stack
     *                  overflows. Refer to $(REF Fiber, core,thread)'s
     *                  documentation for more details.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz, size_t guardPageSize)
    {
        super(dg, sz, guardPageSize);
        // Run the fiber once so the range is prepared for iteration.
        call();
    }

    /**
     * Returns true if the generator is empty.
     *
     * The generator is empty when no value has been yielded yet or when the
     * fiber has terminated.
     */
    final bool empty() @property
    {
        return m_value is null || state == State.TERM;
    }

    /**
     * Obtains the next value from the underlying function.
     */
    final void popFront()
    {
        // Resume the fiber.
        call();
    }

    /**
     * Returns the most recently generated value by shallow copy.
     */
    final T front() @property
    {
        return *m_value;
    }

    /**
     * Returns the most recently generated value without executing a
     * copy constructor. Will not compile for element types defining a
     * postblit, because `Generator` does not return by reference.
*/
    final T moveFront()
    {
        static if (hasElaborateCopyConstructor!T)
        {
            static assert(0,
                    "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
        }
        else
        {
            return front;
        }
    }

    /// Supports `foreach (value; generator)`.
    final int opApply(scope int delegate(T) loopBody)
    {
        while (!empty)
        {
            // A non-zero result means the loop body asked to stop; hand it
            // back unchanged without advancing the generator.
            if (auto stop = loopBody(front))
                return stop;
            popFront();
        }
        return 0;
    }

    /// Supports `foreach (index, value; generator)`; index counts from zero.
    final int opApply(scope int delegate(size_t, T) loopBody)
    {
        size_t index;
        while (!empty)
        {
            if (auto stop = loopBody(index, front))
                return stop;
            ++index;
            popFront();
        }
        return 0;
    }
private:
    // Address of the value most recently passed to yield(T); null until the
    // first value has been yielded.
    T* m_value;
}

///
@system unittest
{
    auto tid = spawn({
        int i;
        while (i < 9)
            i = receiveOnly!int;

        ownerTid.send(i * 2);
    });

    auto r = new Generator!int({
        foreach (i; 1 .. 10)
            yield(i);
    });

    foreach (e; r)
        tid.send(e);

    assert(receiveOnly!int == 18);
}

/**
 * Yields a value of type T to the caller of the currently executing
 * generator.
 *
 * Params:
 *  value = The value to yield.
*/
void yield(T)(ref T value)
{
    // The cast yields null unless the running fiber is a Generator!T.
    Generator!T cur = cast(Generator!T) Fiber.getThis();
    if (cur !is null && cur.state == Fiber.State.EXEC)
    {
        // Publish the address of the caller's value, then suspend the fiber.
        cur.m_value = &value;
        return Fiber.yield();
    }
    throw new Exception("yield(T) called with no active generator for the supplied type");
}

/// ditto
void yield(T)(T value)
{
    // `value` is an lvalue here, so this forwards to the `ref` overload.
    yield(value);
}

// Exercises Generator and yield under both ThreadScheduler and FiberScheduler.
@system unittest
{
    import core.exception;
    import std.exception;

    auto mainTid = thisTid;
    alias testdg = () {
        auto tid = spawn(
            (Tid mainTid) {
                int i;
                scope (failure) mainTid.send(false);
                try
                {
                    for (i = 1; i < 10; i++)
                    {
                        if (receiveOnly!int() != i)
                        {
                            mainTid.send(false);
                            break;
                        }
                    }
                }
                catch (OwnerTerminated e)
                {
                    // i will advance 1 past the last value expected
                    mainTid.send(i == 4);
                }
            }, mainTid);
        auto r = new Generator!int(
            {
                // No Generator!double is running, so this must throw.
                assertThrown!Exception(yield(2.0));
                yield(); // ensure this is a no-op
                yield(1);
                yield(); // also once something has been yielded
                yield(2);
                yield(3);
            });

        foreach (e; r)
        {
            tid.send(e);
        }
    };

    scheduler = new ThreadScheduler;
    scheduler.spawn(testdg);
    assert(receiveOnly!bool());

    scheduler = new FiberScheduler;
    scheduler.start(testdg);
    assert(receiveOnly!bool());
    scheduler = null;
}
///
@system unittest
{
    import std.range;

    InputRange!int myIota = iota(10).inputRangeObject;

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    //can be assigned to std.range.interfaces.InputRange directly
    myIota = new Generator!int(
    {
        foreach (i; 0 .. 10) yield(i);
    });

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    size_t[2] counter = [0, 0];
    foreach (i, unused; myIota) counter[] += [1, i];

    assert(myIota.empty);
    assert(counter == [7, 21]);
}

private
{
    /*
     * A MessageBox is a message queue for one thread.  Other threads may send
     * messages to this owner by calling put(), and the owner receives them by
     * calling get().  The put() call is therefore effectively shared and the
     * get() call is effectively local.  setMaxMsgs may be used by any thread
     * to limit the size of the message queue.
     */
    class MessageBox
    {
        this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
        {
            m_lock = new Mutex;
            m_closed = false;

            // With a custom scheduler installed, it supplies the condition
            // objects; otherwise plain Conditions are used.
            if (scheduler is null)
            {
                m_putMsg = new Condition(m_lock);
                m_notFull = new Condition(m_lock);
            }
            else
            {
                m_putMsg = scheduler.newCondition(m_lock);
                m_notFull = scheduler.newCondition(m_lock);
            }
        }

        /// Returns whether close() has been called; read under the lock.
        final @property bool isClosed() @safe @nogc pure
        {
            synchronized (m_lock)
            {
                return m_closed;
            }
        }

        /*
         * Sets a limit on the maximum number of user messages allowed in the
         * mailbox.  If this limit is reached, the caller attempting to add
         * a new message will execute call.  If num is zero, there is no limit
         * on the message queue.
         *
         * Params:
         *  num  = The maximum size of the queue or zero if the queue is
         *         unbounded.
         *  call = The routine to call when the queue is full.
*/
        final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
        {
            synchronized (m_lock)
            {
                m_maxMsgs = num;
                m_onMaxMsgs = call;
            }
        }

        /*
         * If maxMsgs is not set, the message is added to the queue and the
         * owner is notified.  If the queue is full, the message will still be
         * accepted if it is a control message, otherwise onCrowdingDoThis is
         * called.  If the routine returns true, this call will block until
         * the owner has made space available in the queue.  If it returns
         * false, this call will abort.
         *
         * Messages put after the mailbox has been closed are silently
         * dropped.
         *
         * Params:
         *  msg = The message to put in the queue.
         *
         * Throws:
         *  An exception if the queue is full and onCrowdingDoThis throws.
         */
        final void put(ref Message msg)
        {
            synchronized (m_lock)
            {
                // TODO: Generate an error here if m_closed is true, or maybe
                //       put a message in the caller's queue?
                if (!m_closed)
                {
                    // Retried after every wake-up from m_notFull.
                    while (true)
                    {
                        // Priority messages bypass the size limit entirely.
                        if (isPriorityMsg(msg))
                        {
                            m_sharedPty.put(msg);
                            m_putMsg.notify();
                            return;
                        }
                        if (!mboxFull() || isControlMsg(msg))
                        {
                            m_sharedBox.put(msg);
                            m_putMsg.notify();
                            return;
                        }
                        // Full: a callback returning false drops the message.
                        if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
                        {
                            return;
                        }
                        // m_putQueue counts senders blocked here, so get()
                        // knows whether m_notFull needs to be signalled.
                        m_putQueue++;
                        m_notFull.wait();
                        m_putQueue--;
                    }
                }
            }
        }

        /*
         * Matches ops against each message in turn until a match is found.
         *
         * Params:
         *  ops = The operations to match.  Each may return a bool to indicate
         *        whether a message with a matching type is truly a match.
         *
         * Returns:
         *  true if a message was retrieved and false if not (such as if a
         *  timeout occurred).
         *
         * Throws:
         *  LinkTerminated if a linked thread terminated, or OwnerTerminated
         * if the owner thread terminates and no existing messages match the
         * supplied ops.
*/
        bool get(T...)(scope T vals)
        {
            import std.meta : AliasSeq;

            static assert(T.length, "T must not be empty");

            // An optional leading Duration selects the timed variant; the
            // remaining arguments are the message handlers.
            static if (is(T[0] : Duration))
            {
                alias Ops = AliasSeq!(T[1 .. $]);
                alias ops = vals[1 .. $];
                enum timedWait = true;
                Duration period = vals[0];
            }
            else
            {
                alias Ops = AliasSeq!(T);
                alias ops = vals[0 .. $];
                enum timedWait = false;
            }

            // Offers msg to the first handler whose parameters it converts
            // to. Returns true if the message was consumed.
            bool onStandardMsg(ref Message msg)
            {
                foreach (i, t; Ops)
                {
                    alias Args = Parameters!(t);
                    auto op = ops[i];

                    if (msg.convertsTo!(Args))
                    {
                        alias RT = ReturnType!(t);
                        // A bool-returning handler decides for itself whether
                        // the message really matched.
                        static if (is(RT == bool))
                        {
                            return msg.map(op);
                        }
                        else
                        {
                            msg.map(op);
                            static if (!is(immutable RT == immutable noreturn))
                                return true;
                        }
                    }
                }
                return false;
            }

            // Handles a linkDead control message: the matching
            // LinkTerminated or OwnerTerminated is first offered to the
            // handlers and thrown if none of them accepts it.
            bool onLinkDeadMsg(ref Message msg)
            {
                assert(msg.convertsTo!(Tid),
                        "Message could be converted to Tid");
                auto tid = msg.get!(Tid);

                if (bool* pDepends = tid in thisInfo.links)
                {
                    auto depends = *pDepends;
                    thisInfo.links.remove(tid);
                    // Give the owner relationship precedence.
                    if (depends && tid != thisInfo.owner)
                    {
                        auto e = new LinkTerminated(tid);
                        auto m = Message(MsgType.standard, e);
                        if (onStandardMsg(m))
                            return true;
                        throw e;
                    }
                }
                if (tid == thisInfo.owner)
                {
                    thisInfo.owner = Tid.init;
                    auto e = new OwnerTerminated(tid);
                    auto m = Message(MsgType.standard, e);
                    if (onStandardMsg(m))
                        return true;
                    throw e;
                }
                return false;
            }

            bool onControlMsg(ref Message msg)
            {
                switch (msg.type)
                {
                case MsgType.linkDead:
                    return onLinkDeadMsg(msg);
                default:
                    return false;
                }
            }

            // Walks list in order and removes the first message that gets
            // handled. Returns true if one was handled.
            bool scan(ref ListT list)
            {
                for (auto range = list[]; !range.empty;)
                {
                    // Only the message handler will throw, so if this occurs
                    // we can be certain that the message was handled.
                    scope (failure)
                        list.removeAt(range);

                    if (isControlMsg(range.front))
                    {
                        if (onControlMsg(range.front))
                        {
                            // Although the linkDead message is a control message,
                            // it can be handled by the user.  Since the linkDead
                            // message throws if not handled, if we get here then
                            // it has been handled and we can return from receive.
                            // This is a weird special case that will have to be
                            // handled in a more general way if more are added.
                            if (!isLinkDeadMsg(range.front))
                            {
                                list.removeAt(range);
                                continue;
                            }
                            list.removeAt(range);
                            return true;
                        }
                        range.popFront();
                        continue;
                    }
                    else
                    {
                        if (onStandardMsg(range.front))
                        {
                            list.removeAt(range);
                            return true;
                        }
                        range.popFront();
                        continue;
                    }
                }
                return false;
            }

            // Processes the first priority message, if any. If no handler
            // accepts it, its payload is thrown (wrapped in
            // PriorityMessageException unless it is a Throwable).
            bool pty(ref ListT list)
            {
                if (!list.empty)
                {
                    auto range = list[];

                    if (onStandardMsg(range.front))
                    {
                        list.removeAt(range);
                        return true;
                    }
                    if (range.front.convertsTo!(Throwable))
                        throw range.front.get!(Throwable);
                    else if (range.front.convertsTo!(shared(Throwable)))
                        /* Note: a shared type can be caught without the shared qualifier
                         * so throwing shared will be an error */
                        throw cast() range.front.get!(shared(Throwable));
                    else
                        throw new PriorityMessageException(range.front.data);
                }
                return false;
            }

            static if (timedWait)
            {
                import core.time : MonoTime;
                auto limit = MonoTime.currTime + period;
            }

            while (true)
            {
                ListT arrived;

                // First try the messages already moved to the local queues.
                if (pty(m_localPty) || scan(m_localBox))
                {
                    return true;
                }
                yield();
                synchronized (m_lock)
                {
                    updateMsgCount();
                    while (m_sharedPty.empty && m_sharedBox.empty)
                    {
                        // NOTE: We're notifying all waiters here instead of just
                        //       a few because the onCrowding behavior may have
                        //       changed and we don't want to block sender threads
                        //       unnecessarily if the new behavior is not to block.
                        //       This will admittedly result in spurious wakeups
                        //       in other situations, but what can you do?
                        if (m_putQueue && !mboxFull())
                            m_notFull.notifyAll();
                        static if (timedWait)
                        {
                            if (period <= Duration.zero || !m_putMsg.wait(period))
                                return false;
                        }
                        else
                        {
                            m_putMsg.wait();
                        }
                    }
                    // Move everything that arrived out of the shared queues
                    // while the lock is still held.
                    m_localPty.put(m_sharedPty);
                    arrived.put(m_sharedBox);
                }
                if (m_localPty.empty)
                {
                    // Whatever scan() leaves behind is appended to the local
                    // box, even when a handler throws.
                    scope (exit) m_localBox.put(arrived);
                    if (scan(arrived))
                    {
                        return true;
                    }
                    else
                    {
                        static if (timedWait)
                        {
                            period = limit - MonoTime.currTime;
                        }
                        continue;
                    }
                }
                m_localBox.put(arrived);
                pty(m_localPty);
                return true;
            }
        }

        /*
         * Called on thread termination.  This routine processes any remaining
         * control messages, clears out message queues, and sets a flag to
         * reject any future messages.
         */
        final void close()
        {
            // Forgets the link to, or ownership by, the Tid carried in msg.
            static void onLinkDeadMsg(ref Message msg)
            {
                assert(msg.convertsTo!(Tid),
                        "Message could be converted to Tid");
                auto tid = msg.get!(Tid);

                thisInfo.links.remove(tid);
                if (tid == thisInfo.owner)
                    thisInfo.owner = Tid.init;
            }

            // Applies onLinkDeadMsg to every linkDead message in list.
            static void sweep(ref ListT list)
            {
                for (auto range = list[]; !range.empty; range.popFront())
                {
                    if (range.front.type == MsgType.linkDead)
                        onLinkDeadMsg(range.front);
                }
            }

            ListT arrived;

            sweep(m_localBox);
            synchronized (m_lock)
            {
                // Take the pending shared messages and mark the box closed in
                // one critical section.
                arrived.put(m_sharedBox);
                m_closed = true;
            }
            m_localBox.clear();
            sweep(arrived);
        }

    private:
        // Routines involving local data only, no lock needed.

        // True when a limit is set and the counted local messages plus the
        // shared box have reached it.
        bool mboxFull() @safe @nogc pure nothrow
        {
            return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
        }

        // Refreshes the cached local message count used by mboxFull().
        void updateMsgCount() @safe @nogc pure nothrow
        {
            m_localMsgs = m_localBox.length;
        }

        // Any message that is neither standard nor priority is a control
        // message.
        bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type != MsgType.standard && msg.type != MsgType.priority;
        }

        bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type == MsgType.priority;
        }

        bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type == MsgType.linkDead;
        }

        alias OnMaxFn = bool function(Tid);
        alias ListT = List!(Message);

        ListT m_localBox;   // messages already moved over by get()
        ListT m_localPty;   // priority messages already moved over by get()

        Mutex m_lock;       // lock used by put(), get(), close(), setMaxMsgs()
        Condition m_putMsg;     // notified by put() when a message is added
        Condition m_notFull;    // waited on by put() while the box is full
        size_t m_putQueue;      // number of senders waiting on m_notFull
        ListT m_sharedBox;      // messages added by put(), not yet taken by get()
        ListT m_sharedPty;      // priority messages added by put()
        OnMaxFn m_onMaxMsgs;    // crowding callback; may be null
        size_t m_localMsgs;     // m_localBox.length as of updateMsgCount()
        size_t m_maxMsgs;       // size limit; zero means unbounded
        bool m_closed;          // set by close(); put() then drops messages
    }

    /*
     * A singly linked list of T.  Nodes released by removeAt() are kept on a
     * static free list (sm_head), guarded by a spin lock, and are reused by
     * newNode() before any new node is allocated.
     */
    struct List(T)
    {
        // Forward range over the list.  It stores the node *before* the
        // current element so that removeAt() can unlink the current one.
        struct Range
        {
            import std.exception : enforce;

            @property bool empty() const
            {
                return !m_prev.next;
            }

            @property ref T front()
            {
                enforce(m_prev.next, "invalid list node");
                return m_prev.next.val;
            }

            @property void front(T val)
            {
                enforce(m_prev.next, "invalid list node");
                m_prev.next.val = val;
            }

            void popFront()
            {
                enforce(m_prev.next, "invalid list node");
                m_prev = m_prev.next;
            }

            private this(Node* p)
            {
                m_prev = p;
            }

            private Node* m_prev;
        }

        // Appends a single value.
        void put(T val)
        {
            put(newNode(val));
        }

        // Appends all nodes of rhs to this list and leaves rhs empty.
        void put(ref List!(T) rhs)
        {
            if (!rhs.empty)
            {
                // Link in the first node of rhs (counted by put(Node*)),
                // then walk to the new tail, counting the remaining nodes.
                put(rhs.m_first);
                while (m_last.next !is null)
                {
                    m_last = m_last.next;
                    m_count++;
                }
                rhs.m_first = null;
                rhs.m_last = null;
                rhs.m_count = 0;
            }
        }

        Range opSlice()
        {
            // Treats the address of m_first as a pseudo node whose `next` is
            // m_first; this relies on `next` being the first member of Node.
            return Range(cast(Node*)&m_first);
        }

        // Unlinks and recycles the element that r.front refers to.
        void removeAt(Range r)
        {
            import std.exception : enforce;

            assert(m_count, "Can not remove from empty Range");
            Node* n = r.m_prev;
            enforce(n && n.next, "attempting to remove invalid list node");

            if (m_last is m_first)
                m_last = null;
            else if (m_last is n.next)
                m_last = n; // nocoverage
            Node* to_free = n.next;
            n.next = n.next.next;
            freeNode(to_free);
            m_count--;
        }

        @property size_t length()
        {
            return m_count;
        }

        // Drops all elements; the nodes are not returned to the free list.
        void clear()
        {
            m_first = m_last = null;
            m_count = 0;
        }

        @property bool empty()
        {
            return m_first is null;
        }

    private:
        struct Node
        {
            Node* next;
            T val;

            this(T v)
            {
                val = v;
            }
        }

        static shared struct SpinLock
        {
            void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
            void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
            bool locked;
        }

        static shared SpinLock sm_lock; // guards sm_head
        static shared Node* sm_head;    // head of the free list of nodes

        // Returns a node holding v, taken from the free list when possible.
        Node* newNode(T v)
        {
            Node* n;
            {
                sm_lock.lock();
                scope (exit) sm_lock.unlock();

                if (sm_head)
                {
                    n = cast(Node*) sm_head;
                    sm_head = sm_head.next;
                }
            }
            if (n)
            {
                // Re-initialize the recycled node in place.
                import core.lifetime : emplace;
                emplace!Node(n, v);
            }
            else
            {
                n = new Node(v);
            }
            return n;
        }

        // Pushes n onto the free list.
        void freeNode(Node* n)
        {
            // destroy val to free any owned GC memory
            destroy(n.val);

            sm_lock.lock();
            scope (exit) sm_lock.unlock();

            auto sn = cast(shared(Node)*) n;
            sn.next = sm_head;
            sm_head = sn;
        }

        // Links n at the tail and counts it (n itself only, not nodes
        // chained behind it).
        void put(Node* n)
        {
            m_count++;
            if (!empty)
            {
                m_last.next = n;
                m_last = n;
                return;
            }
            m_first = n;
            m_last = n;
        }

        Node* m_first;
        Node* m_last;
        size_t m_count;
    }
}

// Basic send/receive round trip, with and without a mailbox size limit, and
// with and without an explicit ThreadScheduler.
@system unittest
{
    import std.typecons : tuple, Tuple;

    static void testfn(Tid tid)
    {
        receive((float val) { assert(0); }, (int val, int val2) {
            assert(val == 42 && val2 == 86);
        });
        receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
        receive((Variant val) { });
        receive((string val) {
            if ("the quick brown fox" != val)
                return false;
            return true;
        }, (string val) { assert(false); });
        prioritySend(tid, "done");
    }

    static void runTest(Tid tid)
    {
        send(tid, 42, 86);
        send(tid, tuple(42, 86));
        send(tid, "hello", "there");
        send(tid, "the quick brown fox");
        receive((string val) { assert(val == "done"); });
    }

    static void simpleTest()
    {
        auto tid = spawn(&testfn, thisTid);
        runTest(tid);

        // Run the test again with a limited mailbox size.
        tid = spawn(&testfn, thisTid);
        setMaxMailboxSize(tid, 2, OnCrowding.block);
        runTest(tid);
    }

    simpleTest();

    scheduler = new ThreadScheduler;
    simpleTest();
    scheduler = null;
}

// Returns the mutex shared by all single-argument initOnce calls, creating
// it on first use.
private @property shared(Mutex) initOnceLock()
{
    static shared Mutex lock;
    if (auto mtx = atomicLoad!(MemoryOrder.acq)(lock))
        return mtx;
    auto mtx = new shared Mutex;
    // Only one thread's mutex is installed; a thread that loses the race
    // returns the installed one instead of its own.
    if (cas(&lock, cast(shared) null, mtx))
        return mtx;
    return atomicLoad!(MemoryOrder.acq)(lock);
}

/**
 * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
 * thread-safe manner.
 *
 * The implementation guarantees that all threads simultaneously calling
 * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
 * fully initialized. All side-effects of $(D_PARAM init) are globally visible
 * afterwards.
2674 * 2675 * Params: 2676 * var = The variable to initialize 2677 * init = The lazy initializer value 2678 * 2679 * Returns: 2680 * A reference to the initialized variable 2681 */ 2682 auto ref initOnce(alias var)(lazy typeof(var) init) 2683 { 2684 return initOnce!var(init, initOnceLock); 2685 } 2686 2687 /// A typical use-case is to perform lazy but thread-safe initialization. 2688 @system unittest 2689 { 2690 static class MySingleton 2691 { 2692 static MySingleton instance() 2693 { 2694 __gshared MySingleton inst; 2695 return initOnce!inst(new MySingleton); 2696 } 2697 } 2698 2699 assert(MySingleton.instance !is null); 2700 } 2701 2702 @system unittest 2703 { 2704 static class MySingleton 2705 { 2706 static MySingleton instance() 2707 { 2708 __gshared MySingleton inst; 2709 return initOnce!inst(new MySingleton); 2710 } 2711 2712 private: 2713 this() { val = ++cnt; } 2714 size_t val; 2715 __gshared size_t cnt; 2716 } 2717 2718 foreach (_; 0 .. 10) 2719 spawn({ ownerTid.send(MySingleton.instance.val); }); 2720 foreach (_; 0 .. 10) 2721 assert(receiveOnly!size_t == MySingleton.instance.val); 2722 assert(MySingleton.cnt == 1); 2723 } 2724 2725 /** 2726 * Same as above, but takes a separate mutex instead of sharing one among 2727 * all initOnce instances. 2728 * 2729 * This should be used to avoid dead-locks when the $(D_PARAM init) 2730 * expression waits for the result of another thread that might also 2731 * call initOnce. Use with care. 
2732 * 2733 * Params: 2734 * var = The variable to initialize 2735 * init = The lazy initializer value 2736 * mutex = A mutex to prevent race conditions 2737 * 2738 * Returns: 2739 * A reference to the initialized variable 2740 */ 2741 auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex) 2742 { 2743 // check that var is global, can't take address of a TLS variable 2744 static assert(is(typeof({ __gshared p = &var; })), 2745 "var must be 'static shared' or '__gshared'."); 2746 import core.atomic : atomicLoad, MemoryOrder, atomicStore; 2747 2748 static shared bool flag; 2749 if (!atomicLoad!(MemoryOrder.acq)(flag)) 2750 { 2751 synchronized (mutex) 2752 { 2753 if (!atomicLoad!(MemoryOrder.raw)(flag)) 2754 { 2755 var = init; 2756 static if (!is(immutable typeof(var) == immutable noreturn)) 2757 atomicStore!(MemoryOrder.rel)(flag, true); 2758 } 2759 } 2760 } 2761 return var; 2762 } 2763 2764 /// ditto 2765 auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex) 2766 { 2767 return initOnce!var(init, cast(shared) mutex); 2768 } 2769 2770 /// Use a separate mutex when init blocks on another thread that might also call initOnce. 
@system unittest
{
    import core.sync.mutex : Mutex;

    static shared bool varA, varB;
    static shared Mutex mutexB;
    mutexB = new shared Mutex;

    spawn({
        // use a different mutex for varB to avoid a dead-lock
        initOnce!varB(true, mutexB);
        ownerTid.send(true);
    });
    // init depends on the result of the spawned thread
    initOnce!varA(receiveOnly!bool);
    assert(varA == true);
    assert(varB == true);
}

// initOnce accepts shared and __gshared variables only
@system unittest
{
    static shared bool sharedFlag;
    __gshared bool gsharedFlag;
    static bool tlsFlag;
    bool stackFlag;

    initOnce!sharedFlag(true);
    initOnce!gsharedFlag(true);

    static assert(!__traits(compiles, initOnce!tlsFlag(true))); // TLS
    static assert(!__traits(compiles, initOnce!stackFlag(true))); // local variable
}

// test ability to send shared arrays
@system unittest
{
    static shared int[] cells = new shared(int)[1];

    auto writer = spawn({
        auto arr = receiveOnly!(shared(int)[]);
        arr[0] = 5;
        ownerTid.send(true);
    });

    writer.send(cells);
    // Wait for the acknowledgement before inspecting the array.
    receiveOnly!(bool);
    assert(cells[0] == 5);
}

// https://issues.dlang.org/show_bug.cgi?id=13930
@system unittest
{
    immutable table = ["0":0];
    thisTid.send(table);
    receiveOnly!(immutable int[string]); // compile error
}

// https://issues.dlang.org/show_bug.cgi?id=19345
@system unittest
{
    static struct Aggregate { const int a; const int[5] b; }

    static void sender(Tid mainTid)
    {
        const payload = Aggregate(42, [1, 2, 3, 4, 5]);
        mainTid.send(payload);
    }

    spawn(&sender, thisTid);

    immutable expected = Aggregate(42, [1, 2, 3, 4, 5]);
    auto received = receiveOnly!(const Aggregate)();
    assert(received == expected);
}

// Noreturn support
@system unittest
{
    static noreturn thrower(int) { throw new Exception(""); }

    // Never executed; these calls only have to compile.
    if (false) spawn(&thrower, 1);
    if (false) spawnLinked(&thrower, 1);

    if (false) receive(&thrower);
    if (false) receiveTimeout(Duration.init, &thrower);

    // Wrapped in __traits(compiles) to skip codegen which crashes dmd's backend
    static assert(__traits(compiles, receiveOnly!noreturn()));
    static assert(__traits(compiles, send(Tid.init, noreturn.init)));
    static assert(__traits(compiles, prioritySend(Tid.init, noreturn.init)));
    static assert(__traits(compiles, yield(noreturn.init)));

    static assert(__traits(compiles, {
        __gshared noreturn n;
        initOnce!n(noreturn.init);
    }));
}