1 /** 2 * $(SCRIPT inhibitQuickIndex = 1;) 3 * $(DIVC quickindex, 4 * $(BOOKTABLE, 5 * $(TR $(TH Category) $(TH Symbols)) 6 * $(TR $(TD Tid) $(TD 7 * $(MYREF locate) 8 * $(MYREF ownerTid) 9 * $(MYREF register) 10 * $(MYREF spawn) 11 * $(MYREF spawnLinked) 12 * $(MYREF thisTid) 13 * $(MYREF Tid) 14 * $(MYREF TidMissingException) 15 * $(MYREF unregister) 16 * )) 17 * $(TR $(TD Message passing) $(TD 18 * $(MYREF prioritySend) 19 * $(MYREF receive) 20 * $(MYREF receiveOnly) 21 * $(MYREF receiveTimeout) 22 * $(MYREF send) 23 * $(MYREF setMaxMailboxSize) 24 * )) 25 * $(TR $(TD Message-related types) $(TD 26 * $(MYREF LinkTerminated) 27 * $(MYREF MailboxFull) 28 * $(MYREF MessageMismatch) 29 * $(MYREF OnCrowding) 30 * $(MYREF OwnerTerminated) 31 * $(MYREF PriorityMessageException) 32 * )) 33 * $(TR $(TD Scheduler) $(TD 34 * $(MYREF FiberScheduler) 35 * $(MYREF Generator) 36 * $(MYREF Scheduler) 37 * $(MYREF scheduler) 38 * $(MYREF ThreadInfo) 39 * $(MYREF ThreadScheduler) 40 * $(MYREF yield) 41 * )) 42 * $(TR $(TD Misc) $(TD 43 * $(MYREF initOnce) 44 * )) 45 * )) 46 * 47 * This is a low-level messaging API upon which more structured or restrictive 48 * APIs may be built. The general idea is that every messageable entity is 49 * represented by a common handle type called a `Tid`, which allows messages to 50 * be sent to logical threads that are executing in both the current process 51 * and in external processes using the same interface. This is an important 52 * aspect of scalability because it allows the components of a program to be 53 * spread across available resources with few to no changes to the actual 54 * implementation. 55 * 56 * A logical thread is an execution context that has its own stack and which 57 * runs asynchronously to other logical threads. These may be preemptively 58 * scheduled kernel threads, $(MREF_ALTTEXT fibers, core, thread, fiber) 59 * (cooperative user-space threads), or some other concept with similar behavior. 
 *
 * The type of concurrency used when logical threads are created is determined
 * by the $(LREF Scheduler) selected at initialization time. The default behavior is
 * currently to create a new kernel thread per call to spawn, but other
 * schedulers are available that multiplex fibers across the main thread or
 * use some combination of the two approaches.
 *
 * Copyright: Copyright Sean Kelly 2009 - 2014.
 * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
 * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
 * Source:    $(PHOBOSSRC std/concurrency.d)
 */
/*          Copyright Sean Kelly 2009 - 2014.
 * Distributed under the Boost Software License, Version 1.0.
 *    (See accompanying file LICENSE_1_0.txt or copy at
 *          http://www.boost.org/LICENSE_1_0.txt)
 */
module std.concurrency;

// Minimal stand-ins for WebAssembly builds. Because of the `else:` below,
// everything after this block is compiled only for non-WebAssembly targets.
version(WebAssembly) {
    struct Tid {}
    auto thisTid() { return Tid.init; }
    auto ref initOnce(alias var)(lazy typeof(var) init) { return var = init; }
} else:

public import std.variant;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.thread;
import std.range.primitives;
import std.range.interfaces : InputRange;
import std.traits;

///
@system unittest
{
    __gshared string received;
    static void spawnedFunc(Tid ownerTid)
    {
        import std.conv : text;
        // Receive a message from the owner thread.
        receive((int i){
            received = text("Received the number ", i);

            // Send a message back to the owner thread
            // indicating success.
            send(ownerTid, true);
        });
    }

    // Start spawnedFunc in a new thread.
    auto childTid = spawn(&spawnedFunc, thisTid);

    // Send the number 42 to this new thread.
    send(childTid, 42);

    // Receive the result code.
    auto wasSuccessful = receiveOnly!(bool);
    assert(wasSuccessful);
    assert(received == "Received the number 42");
}

private
{
    // Compile-time check used by spawn/send: returns true if any type in
    // `Types` has unshared aliasing. `Tid` is always allowed, `Rebindable!R`
    // is judged by `R`, and structs are judged field by field.
    bool hasLocalAliasing(Types...)()
    {
        import std.typecons : Rebindable;

        // Works around "statement is not reachable"
        bool doesIt = false;
        static foreach (T; Types)
        {
            static if (is(T == Tid))
            { /* Allowed */ }
            else static if (is(T : Rebindable!R, R))
                doesIt |= hasLocalAliasing!R;
            else static if (is(T == struct))
                doesIt |= hasLocalAliasing!(typeof(T.tupleof));
            else
                doesIt |= std.traits.hasUnsharedAliasing!(T);
        }
        return doesIt;
    }

    // A struct that only wraps a Tid is still considered free of local aliasing.
    @safe unittest
    {
        static struct Container { Tid t; }
        static assert(!hasLocalAliasing!(Tid, Container, int));
    }

    // https://issues.dlang.org/show_bug.cgi?id=20097
    @safe unittest
    {
        import std.datetime.systime : SysTime;
        static struct Container { SysTime time; }
        static assert(!hasLocalAliasing!(SysTime, Container));
    }

    // Tag stored in every Message; `send` uses `standard`, `prioritySend`
    // uses `priority`, and `ThreadInfo.cleanup` uses `linkDead`.
    enum MsgType
    {
        standard,
        priority,
        linkDead,
    }

    // A message as placed in a mailbox: a type tag plus the payload in a Variant.
    struct Message
    {
        MsgType type;
        Variant data;

        // A single value is stored directly; several values are packed into
        // one Tuple before being stored.
        this(T...)(MsgType t, T vals) if (T.length > 0)
        {
            static if (T.length == 1)
            {
                type = t;
                data = vals[0];
            }
            else
            {
                import std.typecons : Tuple;

                type = t;
                data = Tuple!(T)(vals);
            }
        }

        // True if the payload can be read as `T` (as `Tuple!T` when several
        // types are given). Asking for a single `Variant` always succeeds.
        @property auto convertsTo(T...)()
        {
            static if (T.length == 1)
            {
                return is(T[0] == Variant) || data.convertsTo!(T);
            }
            else
            {
                import std.typecons : Tuple;
                return data.convertsTo!(Tuple!(T));
            }
        }

        // Extracts the payload as `T` (as `Tuple!T` when several types are
        // given). Asking for a single `Variant` returns the raw Variant.
        @property auto get(T...)()
        {
            static if (T.length == 1)
            {
                static if (is(T[0] == Variant))
                    return data;
                else
                    return data.get!(T);
            }
            else
            {
                import std.typecons : Tuple;
                return data.get!(Tuple!(T));
            }
        }

        // Calls `op` with the payload converted to op's parameter types and
        // returns whatever `op` returns. A multi-parameter op receives the
        // expanded Tuple.
        auto map(Op)(Op op)
        {
            alias Args = Parameters!(Op);

            static if (Args.length == 1)
            {
                static if (is(Args[0] == Variant))
                    return op(data);
                else
                    return op(data.get!(Args));
            }
            else
            {
                import std.typecons : Tuple;
                return op(data.get!(Tuple!(Args)).expand);
            }
        }
    }

    // Compile-time validation of a handler list passed to receive/receiveTimeout.
    // Each entry must be a function pointer or delegate. A void-returning handler
    // that is not last must neither take a lone Variant nor have the same
    // parameter list as any later handler, since it would occlude that handler.
    void checkops(T...)(T ops)
    {
        import std.format : format;

        foreach (i, t1; T)
        {
            static assert(isFunctionPointer!t1 || isDelegate!t1,
                    format!"T %d is not a function pointer or delegates"(i));
            alias a1 = Parameters!(t1);
            alias r1 = ReturnType!(t1);

            static if (i < T.length - 1 && is(r1 == void))
            {
                static assert(a1.length != 1 || !is(a1[0] == Variant),
                              "function with arguments " ~ a1.stringof ~
                              " occludes successive function");

                foreach (t2; T[i + 1 .. $])
                {
                    alias a2 = Parameters!(t2);

                    static assert(!is(a1 == a2),
                        "function with arguments " ~ a1.stringof ~ " occludes successive function");
                }
            }
        }
    }

    // Returns the ThreadInfo of the calling logical thread: the one supplied by
    // the installed scheduler if there is one, otherwise ThreadInfo.thisInfo.
    @property ref ThreadInfo thisInfo() nothrow
    {
        import core.atomic : atomicLoad;

        auto localScheduler = atomicLoad(scheduler);
        if (localScheduler is null)
            return ThreadInfo.thisInfo;
        return localScheduler.thisInfo;
    }
}

// Thread-local module destructor: runs ThreadInfo.cleanup for the terminating thread.
static ~this()
{
    thisInfo.cleanup();
}

// Exceptions

/**
 * Thrown on calls to $(LREF receiveOnly) if a message other than the type
 * the receiving thread expected is sent.
 */
class MessageMismatch : Exception
{
    ///
    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
    {
        super(msg);
    }
}

/**
 * Thrown on calls to $(LREF receive) if the thread that spawned the receiving
 * thread has terminated and no more messages exist.
298 */ 299 class OwnerTerminated : Exception 300 { 301 /// 302 this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc 303 { 304 super(msg); 305 tid = t; 306 } 307 308 Tid tid; 309 } 310 311 /** 312 * Thrown if a linked thread has terminated. 313 */ 314 class LinkTerminated : Exception 315 { 316 /// 317 this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc 318 { 319 super(msg); 320 tid = t; 321 } 322 323 Tid tid; 324 } 325 326 /** 327 * Thrown if a message was sent to a thread via 328 * $(REF prioritySend, std,concurrency) and the receiver does not have a handler 329 * for a message of this type. 330 */ 331 class PriorityMessageException : Exception 332 { 333 /// 334 this(Variant vals) 335 { 336 super("Priority message"); 337 message = vals; 338 } 339 340 /** 341 * The message that was sent. 342 */ 343 Variant message; 344 } 345 346 /** 347 * Thrown on mailbox crowding if the mailbox is configured with 348 * `OnCrowding.throwException`. 349 */ 350 class MailboxFull : Exception 351 { 352 /// 353 this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc 354 { 355 super(msg); 356 tid = t; 357 } 358 359 Tid tid; 360 } 361 362 /** 363 * Thrown when a `Tid` is missing, e.g. when $(LREF ownerTid) doesn't 364 * find an owner thread. 365 */ 366 class TidMissingException : Exception 367 { 368 import std.exception : basicExceptionCtors; 369 /// 370 mixin basicExceptionCtors; 371 } 372 373 374 // Thread ID 375 376 377 /** 378 * An opaque type used to represent a logical thread. 379 */ 380 struct Tid 381 { 382 private: 383 this(MessageBox m) @safe pure nothrow @nogc 384 { 385 mbox = m; 386 } 387 388 MessageBox mbox; 389 390 public: 391 392 /** 393 * Generate a convenient string for identifying this `Tid`. This is only 394 * useful to see if `Tid`'s that are currently executing are the same or 395 * different, e.g. for logging and debugging. 
It is potentially possible 396 * that a `Tid` executed in the future will have the same `toString()` output 397 * as another `Tid` that has already terminated. 398 */ 399 void toString(W)(ref W w) const 400 { 401 import std.format.write : formattedWrite; 402 auto p = () @trusted { return cast(void*) mbox; }(); 403 formattedWrite(w, "Tid(%x)", p); 404 } 405 406 } 407 408 @safe unittest 409 { 410 import std.conv : text; 411 Tid tid; 412 assert(text(tid) == "Tid(0)"); 413 auto tid2 = thisTid; 414 assert(text(tid2) != "Tid(0)"); 415 auto tid3 = tid2; 416 assert(text(tid2) == text(tid3)); 417 } 418 419 // https://issues.dlang.org/show_bug.cgi?id=21512 420 @system unittest 421 { 422 import std.format : format; 423 424 const(Tid) b = spawn(() {}); 425 assert(format!"%s"(b)[0 .. 4] == "Tid("); 426 } 427 428 /** 429 * Returns: The `Tid` of the caller's thread. 430 */ 431 @property Tid thisTid() @safe 432 { 433 // TODO: remove when concurrency is safe 434 static auto trus() @trusted 435 { 436 if (thisInfo.ident != Tid.init) 437 return thisInfo.ident; 438 thisInfo.ident = Tid(new MessageBox); 439 return thisInfo.ident; 440 } 441 442 return trus(); 443 } 444 445 /** 446 * Return the `Tid` of the thread which spawned the caller's thread. 447 * 448 * Throws: A `TidMissingException` exception if 449 * there is no owner thread. 
450 */ 451 @property Tid ownerTid() 452 { 453 import std.exception : enforce; 454 455 enforce!TidMissingException(thisInfo.owner.mbox !is null, "Error: Thread has no owner thread."); 456 return thisInfo.owner; 457 } 458 459 @system unittest 460 { 461 import std.exception : assertThrown; 462 463 static void fun() 464 { 465 string res = receiveOnly!string(); 466 assert(res == "Main calling"); 467 ownerTid.send("Child responding"); 468 } 469 470 assertThrown!TidMissingException(ownerTid); 471 auto child = spawn(&fun); 472 child.send("Main calling"); 473 string res = receiveOnly!string(); 474 assert(res == "Child responding"); 475 } 476 477 // Thread Creation 478 479 private template isSpawnable(F, T...) 480 { 481 template isParamsImplicitlyConvertible(F1, F2, int i = 0) 482 { 483 alias param1 = Parameters!F1; 484 alias param2 = Parameters!F2; 485 static if (param1.length != param2.length) 486 enum isParamsImplicitlyConvertible = false; 487 else static if (param1.length == i) 488 enum isParamsImplicitlyConvertible = true; 489 else static if (is(param2[i] : param1[i])) 490 enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1, 491 F2, i + 1); 492 else 493 enum isParamsImplicitlyConvertible = false; 494 } 495 496 enum isSpawnable = isCallable!F && is(ReturnType!F : void) 497 && isParamsImplicitlyConvertible!(F, void function(T)) 498 && (isFunctionPointer!F || !hasUnsharedAliasing!F); 499 } 500 501 /** 502 * Starts `fn(args)` in a new logical thread. 503 * 504 * Executes the supplied function in a new logical thread represented by 505 * `Tid`. The calling thread is designated as the owner of the new thread. 506 * When the owner thread terminates an `OwnerTerminated` message will be 507 * sent to the new thread, causing an `OwnerTerminated` exception to be 508 * thrown on `receive()`. 509 * 510 * Params: 511 * fn = The function to execute. 512 * args = Arguments to the function. 513 * 514 * Returns: 515 * A `Tid` representing the new logical thread. 
516 * 517 * Notes: 518 * `args` must not have unshared aliasing. In other words, all arguments 519 * to `fn` must either be `shared` or `immutable` or have no 520 * pointer indirection. This is necessary for enforcing isolation among 521 * threads. 522 * 523 * Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning 524 * `fn` must be either `shared` or `immutable`. */ 525 Tid spawn(F, T...)(F fn, T args) 526 if (isSpawnable!(F, T)) 527 { 528 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 529 return _spawn(false, fn, args); 530 } 531 532 /// 533 @system unittest 534 { 535 static void f(string msg) 536 { 537 assert(msg == "Hello World"); 538 } 539 540 auto tid = spawn(&f, "Hello World"); 541 } 542 543 /// Fails: char[] has mutable aliasing. 544 @system unittest 545 { 546 string msg = "Hello, World!"; 547 548 static void f1(string msg) {} 549 static assert(!__traits(compiles, spawn(&f1, msg.dup))); 550 static assert( __traits(compiles, spawn(&f1, msg.idup))); 551 552 static void f2(char[] msg) {} 553 static assert(!__traits(compiles, spawn(&f2, msg.dup))); 554 static assert(!__traits(compiles, spawn(&f2, msg.idup))); 555 } 556 557 /// New thread with anonymous function 558 @system unittest 559 { 560 spawn({ 561 ownerTid.send("This is so great!"); 562 }); 563 assert(receiveOnly!string == "This is so great!"); 564 } 565 566 @system unittest 567 { 568 import core.thread : thread_joinAll; 569 570 __gshared string receivedMessage; 571 static void f1(string msg) 572 { 573 receivedMessage = msg; 574 } 575 576 auto tid1 = spawn(&f1, "Hello World"); 577 thread_joinAll; 578 assert(receivedMessage == "Hello World"); 579 } 580 581 /** 582 * Starts `fn(args)` in a logical thread and will receive a `LinkTerminated` 583 * message when the operation terminates. 584 * 585 * Executes the supplied function in a new logical thread represented by 586 * `Tid`. 
This new thread is linked to the calling thread so that if either 587 * it or the calling thread terminates a `LinkTerminated` message will be sent 588 * to the other, causing a `LinkTerminated` exception to be thrown on `receive()`. 589 * The owner relationship from `spawn()` is preserved as well, so if the link 590 * between threads is broken, owner termination will still result in an 591 * `OwnerTerminated` exception to be thrown on `receive()`. 592 * 593 * Params: 594 * fn = The function to execute. 595 * args = Arguments to the function. 596 * 597 * Returns: 598 * A Tid representing the new thread. 599 */ 600 Tid spawnLinked(F, T...)(F fn, T args) 601 if (isSpawnable!(F, T)) 602 { 603 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 604 return _spawn(true, fn, args); 605 } 606 607 /* 608 * 609 */ 610 private Tid _spawn(F, T...)(bool linked, F fn, T args) 611 if (isSpawnable!(F, T)) 612 { 613 // TODO: MessageList and &exec should be shared. 614 auto spawnTid = Tid(new MessageBox); 615 auto ownerTid = thisTid; 616 617 void exec() 618 { 619 thisInfo.ident = spawnTid; 620 thisInfo.owner = ownerTid; 621 fn(args); 622 } 623 624 // TODO: MessageList and &exec should be shared. 
625 if (scheduler !is null) 626 scheduler.spawn(&exec); 627 else 628 { 629 auto t = new Thread(&exec); 630 t.start(); 631 } 632 thisInfo.links[spawnTid] = linked; 633 return spawnTid; 634 } 635 636 @system unittest 637 { 638 void function() fn1; 639 void function(int) fn2; 640 static assert(__traits(compiles, spawn(fn1))); 641 static assert(__traits(compiles, spawn(fn2, 2))); 642 static assert(!__traits(compiles, spawn(fn1, 1))); 643 static assert(!__traits(compiles, spawn(fn2))); 644 645 void delegate(int) shared dg1; 646 shared(void delegate(int)) dg2; 647 shared(void delegate(long) shared) dg3; 648 shared(void delegate(real, int, long) shared) dg4; 649 void delegate(int) immutable dg5; 650 void delegate(int) dg6; 651 static assert(__traits(compiles, spawn(dg1, 1))); 652 static assert(__traits(compiles, spawn(dg2, 2))); 653 static assert(__traits(compiles, spawn(dg3, 3))); 654 static assert(__traits(compiles, spawn(dg4, 4, 4, 4))); 655 static assert(__traits(compiles, spawn(dg5, 5))); 656 static assert(!__traits(compiles, spawn(dg6, 6))); 657 658 auto callable1 = new class{ void opCall(int) shared {} }; 659 auto callable2 = cast(shared) new class{ void opCall(int) shared {} }; 660 auto callable3 = new class{ void opCall(int) immutable {} }; 661 auto callable4 = cast(immutable) new class{ void opCall(int) immutable {} }; 662 auto callable5 = new class{ void opCall(int) {} }; 663 auto callable6 = cast(shared) new class{ void opCall(int) immutable {} }; 664 auto callable7 = cast(immutable) new class{ void opCall(int) shared {} }; 665 auto callable8 = cast(shared) new class{ void opCall(int) const shared {} }; 666 auto callable9 = cast(const shared) new class{ void opCall(int) shared {} }; 667 auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} }; 668 auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} }; 669 static assert(!__traits(compiles, spawn(callable1, 1))); 670 static assert( __traits(compiles, 
spawn(callable2, 2))); 671 static assert(!__traits(compiles, spawn(callable3, 3))); 672 static assert( __traits(compiles, spawn(callable4, 4))); 673 static assert(!__traits(compiles, spawn(callable5, 5))); 674 static assert(!__traits(compiles, spawn(callable6, 6))); 675 static assert(!__traits(compiles, spawn(callable7, 7))); 676 static assert( __traits(compiles, spawn(callable8, 8))); 677 static assert(!__traits(compiles, spawn(callable9, 9))); 678 static assert( __traits(compiles, spawn(callable10, 10))); 679 static assert( __traits(compiles, spawn(callable11, 11))); 680 } 681 682 /** 683 * Places the values as a message at the back of tid's message queue. 684 * 685 * Sends the supplied value to the thread represented by tid. As with 686 * $(REF spawn, std,concurrency), `T` must not have unshared aliasing. 687 */ 688 void send(T...)(Tid tid, T vals) 689 in (tid.mbox !is null) 690 { 691 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 692 _send(tid, vals); 693 } 694 695 /** 696 * Places the values as a message on the front of tid's message queue. 697 * 698 * Send a message to `tid` but place it at the front of `tid`'s message 699 * queue instead of at the back. This function is typically used for 700 * out-of-band communication, to signal exceptional conditions, etc. 701 */ 702 void prioritySend(T...)(Tid tid, T vals) 703 in (tid.mbox !is null) 704 { 705 static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); 706 _send(MsgType.priority, tid, vals); 707 } 708 709 /* 710 * ditto 711 */ 712 private void _send(T...)(Tid tid, T vals) 713 in (tid.mbox !is null) 714 { 715 _send(MsgType.standard, tid, vals); 716 } 717 718 /* 719 * Implementation of send. This allows parameter checking to be different for 720 * both Tid.send() and .send(). 
721 */ 722 private void _send(T...)(MsgType type, Tid tid, T vals) 723 in (tid.mbox !is null) 724 { 725 auto msg = Message(type, vals); 726 tid.mbox.put(msg); 727 } 728 729 /** 730 * Receives a message from another thread. 731 * 732 * Receive a message from another thread, or block if no messages of the 733 * specified types are available. This function works by pattern matching 734 * a message against a set of delegates and executing the first match found. 735 * 736 * If a delegate that accepts a $(REF Variant, std,variant) is included as 737 * the last argument to `receive`, it will match any message that was not 738 * matched by an earlier delegate. If more than one argument is sent, 739 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values 740 * sent. 741 * 742 * Params: 743 * ops = Variadic list of function pointers and delegates. Entries 744 * in this list must not occlude later entries. 745 * 746 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated. 
747 */ 748 void receive(T...)( T ops ) 749 in 750 { 751 assert(thisInfo.ident.mbox !is null, 752 "Cannot receive a message until a thread was spawned " 753 ~ "or thisTid was passed to a running thread."); 754 } 755 do 756 { 757 checkops( ops ); 758 759 thisInfo.ident.mbox.get( ops ); 760 } 761 762 /// 763 @system unittest 764 { 765 import std.variant : Variant; 766 767 auto process = () 768 { 769 receive( 770 (int i) { ownerTid.send(1); }, 771 (double f) { ownerTid.send(2); }, 772 (Variant v) { ownerTid.send(3); } 773 ); 774 }; 775 776 { 777 auto tid = spawn(process); 778 send(tid, 42); 779 assert(receiveOnly!int == 1); 780 } 781 782 { 783 auto tid = spawn(process); 784 send(tid, 3.14); 785 assert(receiveOnly!int == 2); 786 } 787 788 { 789 auto tid = spawn(process); 790 send(tid, "something else"); 791 assert(receiveOnly!int == 3); 792 } 793 } 794 795 @safe unittest 796 { 797 static assert( __traits( compiles, 798 { 799 receive( (Variant x) {} ); 800 receive( (int x) {}, (Variant x) {} ); 801 } ) ); 802 803 static assert( !__traits( compiles, 804 { 805 receive( (Variant x) {}, (int x) {} ); 806 } ) ); 807 808 static assert( !__traits( compiles, 809 { 810 receive( (int x) {}, (int x) {} ); 811 } ) ); 812 } 813 814 // Make sure receive() works with free functions as well. 815 version (StdUnittest) 816 { 817 private void receiveFunction(int x) {} 818 } 819 @safe unittest 820 { 821 static assert( __traits( compiles, 822 { 823 receive( &receiveFunction ); 824 receive( &receiveFunction, (Variant x) {} ); 825 } ) ); 826 } 827 828 829 private template receiveOnlyRet(T...) 830 { 831 static if ( T.length == 1 ) 832 { 833 alias receiveOnlyRet = T[0]; 834 } 835 else 836 { 837 import std.typecons : Tuple; 838 alias receiveOnlyRet = Tuple!(T); 839 } 840 } 841 842 /** 843 * Receives only messages with arguments of the specified types. 844 * 845 * Params: 846 * T = Variadic list of types to be received. 847 * 848 * Returns: The received message. 
If `T` has more than one entry, 849 * the message will be packed into a $(REF Tuple, std,typecons). 850 * 851 * Throws: $(LREF MessageMismatch) if a message of types other than `T` 852 * is received, 853 * $(LREF OwnerTerminated) when the sending thread was terminated. 854 */ 855 receiveOnlyRet!(T) receiveOnly(T...)() 856 in 857 { 858 assert(thisInfo.ident.mbox !is null, 859 "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); 860 } 861 do 862 { 863 import std.format : format; 864 import std.meta : allSatisfy; 865 import std.typecons : Tuple; 866 867 Tuple!(T) ret; 868 869 thisInfo.ident.mbox.get((T val) { 870 static if (T.length) 871 { 872 static if (allSatisfy!(isAssignable, T)) 873 { 874 ret.field = val; 875 } 876 else 877 { 878 import core.lifetime : emplace; 879 emplace(&ret, val); 880 } 881 } 882 }, 883 (LinkTerminated e) { throw e; }, 884 (OwnerTerminated e) { throw e; }, 885 (Variant val) { 886 static if (T.length > 1) 887 string exp = T.stringof; 888 else 889 string exp = T[0].stringof; 890 891 throw new MessageMismatch( 892 format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString())); 893 }); 894 static if (T.length == 1) 895 return ret[0]; 896 else 897 return ret; 898 } 899 900 /// 901 @system unittest 902 { 903 auto tid = spawn( 904 { 905 assert(receiveOnly!int == 42); 906 }); 907 send(tid, 42); 908 } 909 910 /// 911 @system unittest 912 { 913 auto tid = spawn( 914 { 915 assert(receiveOnly!string == "text"); 916 }); 917 send(tid, "text"); 918 } 919 920 /// 921 @system unittest 922 { 923 struct Record { string name; int age; } 924 925 auto tid = spawn( 926 { 927 auto msg = receiveOnly!(double, Record); 928 assert(msg[0] == 0.5); 929 assert(msg[1].name == "Alice"); 930 assert(msg[1].age == 31); 931 }); 932 933 send(tid, 0.5, Record("Alice", 31)); 934 } 935 936 @system unittest 937 { 938 static void t1(Tid mainTid) 939 { 940 try 941 { 942 receiveOnly!string(); 943 mainTid.send(""); 944 
} 945 catch (Throwable th) 946 { 947 mainTid.send(th.msg); 948 } 949 } 950 951 auto tid = spawn(&t1, thisTid); 952 tid.send(1); 953 string result = receiveOnly!string(); 954 assert(result == "Unexpected message type: expected 'string', got 'int'"); 955 } 956 957 // https://issues.dlang.org/show_bug.cgi?id=21663 958 @safe unittest 959 { 960 alias test = receiveOnly!(string, bool, bool); 961 } 962 963 /** 964 * Receives a message from another thread and gives up if no match 965 * arrives within a specified duration. 966 * 967 * Receive a message from another thread, or block until `duration` exceeds, 968 * if no messages of the specified types are available. This function works 969 * by pattern matching a message against a set of delegates and executing 970 * the first match found. 971 * 972 * If a delegate that accepts a $(REF Variant, std,variant) is included as 973 * the last argument, it will match any message that was not 974 * matched by an earlier delegate. If more than one argument is sent, 975 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values 976 * sent. 977 * 978 * Params: 979 * duration = Duration, how long to wait. If `duration` is negative, 980 * won't wait at all. 981 * ops = Variadic list of function pointers and delegates. Entries 982 * in this list must not occlude later entries. 983 * 984 * Returns: `true` if it received a message and `false` if it timed out waiting 985 * for one. 986 * 987 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated. 
988 */ 989 bool receiveTimeout(T...)(Duration duration, T ops) 990 in 991 { 992 assert(thisInfo.ident.mbox !is null, 993 "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); 994 } 995 do 996 { 997 checkops(ops); 998 999 return thisInfo.ident.mbox.get(duration, ops); 1000 } 1001 1002 @safe unittest 1003 { 1004 static assert(__traits(compiles, { 1005 receiveTimeout(msecs(0), (Variant x) {}); 1006 receiveTimeout(msecs(0), (int x) {}, (Variant x) {}); 1007 })); 1008 1009 static assert(!__traits(compiles, { 1010 receiveTimeout(msecs(0), (Variant x) {}, (int x) {}); 1011 })); 1012 1013 static assert(!__traits(compiles, { 1014 receiveTimeout(msecs(0), (int x) {}, (int x) {}); 1015 })); 1016 1017 static assert(__traits(compiles, { 1018 receiveTimeout(msecs(10), (int x) {}, (Variant x) {}); 1019 })); 1020 } 1021 1022 // MessageBox Limits 1023 1024 /** 1025 * These behaviors may be specified when a mailbox is full. 1026 */ 1027 enum OnCrowding 1028 { 1029 block, /// Wait until room is available. 1030 throwException, /// Throw a $(LREF MailboxFull) exception. 1031 ignore /// Abort the send and return. 1032 } 1033 1034 private 1035 { 1036 bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc 1037 { 1038 return true; 1039 } 1040 1041 bool onCrowdingThrow(Tid tid) @safe pure 1042 { 1043 throw new MailboxFull(tid); 1044 } 1045 1046 bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc 1047 { 1048 return false; 1049 } 1050 } 1051 1052 /** 1053 * Sets a maximum mailbox size. 1054 * 1055 * Sets a limit on the maximum number of user messages allowed in the mailbox. 1056 * If this limit is reached, the caller attempting to add a new message will 1057 * execute the behavior specified by doThis. If messages is zero, the mailbox 1058 * is unbounded. 1059 * 1060 * Params: 1061 * tid = The Tid of the thread for which this limit should be set. 1062 * messages = The maximum number of messages or zero if no limit. 
1063 * doThis = The behavior executed when a message is sent to a full 1064 * mailbox. 1065 */ 1066 void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure 1067 in (tid.mbox !is null) 1068 { 1069 final switch (doThis) 1070 { 1071 case OnCrowding.block: 1072 return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock); 1073 case OnCrowding.throwException: 1074 return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow); 1075 case OnCrowding.ignore: 1076 return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore); 1077 } 1078 } 1079 1080 /** 1081 * Sets a maximum mailbox size. 1082 * 1083 * Sets a limit on the maximum number of user messages allowed in the mailbox. 1084 * If this limit is reached, the caller attempting to add a new message will 1085 * execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded. 1086 * 1087 * Params: 1088 * tid = The Tid of the thread for which this limit should be set. 1089 * messages = The maximum number of messages or zero if no limit. 1090 * onCrowdingDoThis = The routine called when a message is sent to a full 1091 * mailbox. 1092 */ 1093 void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis) 1094 in (tid.mbox !is null) 1095 { 1096 tid.mbox.setMaxMsgs(messages, onCrowdingDoThis); 1097 } 1098 1099 private 1100 { 1101 __gshared Tid[string] tidByName; 1102 __gshared string[][Tid] namesByTid; 1103 } 1104 1105 private @property Mutex registryLock() @system 1106 { 1107 __gshared Mutex impl; 1108 initOnce!impl(new Mutex); 1109 return impl; 1110 } 1111 1112 private void unregisterMe(ref ThreadInfo me) 1113 { 1114 if (me.ident != Tid.init) 1115 { 1116 synchronized (registryLock) 1117 { 1118 if (auto allNames = me.ident in namesByTid) 1119 { 1120 foreach (name; *allNames) 1121 tidByName.remove(name); 1122 namesByTid.remove(me.ident); 1123 } 1124 } 1125 } 1126 } 1127 1128 /** 1129 * Associates name with tid. 1130 * 1131 * Associates name with tid in a process-local map. 
When the thread 1132 * represented by tid terminates, any names associated with it will be 1133 * automatically unregistered. 1134 * 1135 * Params: 1136 * name = The name to associate with tid. 1137 * tid = The tid register by name. 1138 * 1139 * Returns: 1140 * true if the name is available and tid is not known to represent a 1141 * defunct thread. 1142 */ 1143 bool register(string name, Tid tid) 1144 in (tid.mbox !is null) 1145 { 1146 synchronized (registryLock) 1147 { 1148 if (name in tidByName) 1149 return false; 1150 if (tid.mbox.isClosed) 1151 return false; 1152 namesByTid[tid] ~= name; 1153 tidByName[name] = tid; 1154 return true; 1155 } 1156 } 1157 1158 /** 1159 * Removes the registered name associated with a tid. 1160 * 1161 * Params: 1162 * name = The name to unregister. 1163 * 1164 * Returns: 1165 * true if the name is registered, false if not. 1166 */ 1167 bool unregister(string name) 1168 { 1169 import std.algorithm.mutation : remove, SwapStrategy; 1170 import std.algorithm.searching : countUntil; 1171 1172 synchronized (registryLock) 1173 { 1174 if (auto tid = name in tidByName) 1175 { 1176 auto allNames = *tid in namesByTid; 1177 auto pos = countUntil(*allNames, name); 1178 remove!(SwapStrategy.unstable)(*allNames, pos); 1179 tidByName.remove(name); 1180 return true; 1181 } 1182 return false; 1183 } 1184 } 1185 1186 /** 1187 * Gets the `Tid` associated with name. 1188 * 1189 * Params: 1190 * name = The name to locate within the registry. 1191 * 1192 * Returns: 1193 * The associated `Tid` or `Tid.init` if name is not registered. 1194 */ 1195 Tid locate(string name) 1196 { 1197 synchronized (registryLock) 1198 { 1199 if (auto tid = name in tidByName) 1200 return *tid; 1201 return Tid.init; 1202 } 1203 } 1204 1205 /** 1206 * Encapsulates all implementation-level data needed for scheduling. 1207 * 1208 * When defining a $(LREF Scheduler), an instance of this struct must be associated 1209 * with each logical thread. 
It contains all implementation-level information 1210 * needed by the internal API. 1211 */ 1212 struct ThreadInfo 1213 { 1214 Tid ident; 1215 bool[Tid] links; 1216 Tid owner; 1217 1218 /** 1219 * Gets a thread-local instance of `ThreadInfo`. 1220 * 1221 * Gets a thread-local instance of `ThreadInfo`, which should be used as the 1222 * default instance when info is requested for a thread not created by the 1223 * `Scheduler`. 1224 */ 1225 static @property ref thisInfo() nothrow 1226 { 1227 static ThreadInfo val; 1228 return val; 1229 } 1230 1231 /** 1232 * Cleans up this ThreadInfo. 1233 * 1234 * This must be called when a scheduled thread terminates. It tears down 1235 * the messaging system for the thread and notifies interested parties of 1236 * the thread's termination. 1237 */ 1238 void cleanup() 1239 { 1240 if (ident.mbox !is null) 1241 ident.mbox.close(); 1242 foreach (tid; links.keys) 1243 _send(MsgType.linkDead, tid, ident); 1244 if (owner != Tid.init) 1245 _send(MsgType.linkDead, owner, ident); 1246 unregisterMe(this); // clean up registry entries 1247 } 1248 1249 // https://issues.dlang.org/show_bug.cgi?id=20160 1250 @system unittest 1251 { 1252 register("main_thread", thisTid()); 1253 1254 ThreadInfo t; 1255 t.cleanup(); 1256 1257 assert(locate("main_thread") == thisTid()); 1258 } 1259 } 1260 1261 /** 1262 * A `Scheduler` controls how threading is performed by spawn. 1263 * 1264 * Implementing a `Scheduler` allows the concurrency mechanism used by this 1265 * module to be customized according to different needs. By default, a call 1266 * to spawn will create a new kernel thread that executes the supplied routine 1267 * and terminates when finished. But it is possible to create `Scheduler`s that 1268 * reuse threads, that multiplex `Fiber`s (coroutines) across a single thread, 1269 * or any number of other approaches. 
 * By making the choice of `Scheduler` a
 * user-level option, `std.concurrency` may be used for far more types of
 * application than if this behavior were predefined.
 *
 * Example:
 * ---
 * import std.concurrency;
 * import std.stdio;
 *
 * void main()
 * {
 *     scheduler = new FiberScheduler;
 *     scheduler.start(
 *     {
 *         writeln("the rest of main goes here");
 *     });
 * }
 * ---
 *
 * Some schedulers have a dispatching loop that must run if they are to work
 * properly, so for the sake of consistency, when using a scheduler, `start()`
 * must be called within `main()`.  This yields control to the scheduler and
 * will ensure that any spawned threads are executed in an expected manner.
 */
interface Scheduler
{
    /**
     * Spawns the supplied op and starts the `Scheduler`.
     *
     * This is intended to be called at the start of the program to yield all
     * scheduling to the active `Scheduler` instance.  This is necessary for
     * schedulers that explicitly dispatch threads rather than simply relying
     * on the operating system to do so, and so start should always be called
     * within `main()` to begin normal program execution.
     *
     * Params:
     *  op = A wrapper for whatever the main thread would have done in the
     *       absence of a custom scheduler.  It will be automatically executed
     *       via a call to spawn by the `Scheduler`.
     */
    void start(void delegate() op);

    /**
     * Assigns a logical thread to execute the supplied op.
     *
     * This routine is called by spawn.  It is expected to instantiate a new
     * logical thread and run the supplied operation.  This thread must call
     * `thisInfo.cleanup()` when the thread terminates if the scheduled thread
     * is not a kernel thread--all kernel threads will have their `ThreadInfo`
     * cleaned up automatically by a thread-local destructor.
     *
     * Params:
     *  op = The function to execute.  This may be the actual function passed
     *       by the user to spawn itself, or may be a wrapper function.
     */
    void spawn(void delegate() op);

    /**
     * Yields execution to another logical thread.
     *
     * This routine is called at various points within concurrency-aware APIs
     * to provide a scheduler a chance to yield execution when using some sort
     * of cooperative multithreading model.  If this is not appropriate, such
     * as when each logical thread is backed by a dedicated kernel thread,
     * this routine may be a no-op.
     */
    void yield() nothrow;

    /**
     * Returns an appropriate `ThreadInfo` instance.
     *
     * Returns an instance of `ThreadInfo` specific to the logical thread that
     * is calling this routine or, if the calling thread was not created by
     * this scheduler, returns `ThreadInfo.thisInfo` instead.
     */
    @property ref ThreadInfo thisInfo() nothrow;

    /**
     * Creates a `Condition` variable analog for signaling.
     *
     * Creates a new `Condition` variable analog which is used to check for and
     * to signal the addition of messages to a thread's message queue.  Like
     * yield, some schedulers may need to define custom behavior so that calls
     * to `Condition.wait()` yield to another thread when no new messages are
     * available instead of blocking.
     *
     * Params:
     *  m = The `Mutex` that will be associated with this condition.  It will be
     *      locked prior to any operation on the condition, and so in some
     *      cases a `Scheduler` may need to hold this reference and unlock the
     *      mutex before yielding execution to another logical thread.
     *
     * Returns:
     *  The newly created `Condition` (or `Condition` subclass) bound to m.
     */
    Condition newCondition(Mutex m) nothrow;
}

/**
 * An example `Scheduler` using kernel threads.
 *
 * This is an example `Scheduler` that mirrors the default scheduling behavior
 * of creating one kernel thread per call to spawn.  It is fully functional
 * and may be instantiated and used, but is not a necessary part of the
 * default functioning of this module.
 */
class ThreadScheduler : Scheduler
{
    /**
     * This simply runs op directly, since no real scheduling is needed by
     * this approach.
     */
    void start(void delegate() op)
    {
        op();
    }

    /**
     * Creates a new kernel thread and assigns it to run the supplied op.
     */
    void spawn(void delegate() op)
    {
        auto t = new Thread(op);
        t.start();
    }

    /**
     * This scheduler does no explicit multiplexing, so this is a no-op.
     */
    void yield() nothrow
    {
        // no explicit yield needed
    }

    /**
     * Returns `ThreadInfo.thisInfo`, since it is a thread-local instance of
     * `ThreadInfo`, which is the correct behavior for this scheduler.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        return ThreadInfo.thisInfo;
    }

    /**
     * Creates a new `Condition` variable.  No custom behavior is needed here.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new Condition(m);
    }
}

/**
 * An example `Scheduler` using $(MREF_ALTTEXT `Fiber`s, core, thread, fiber).
 *
 * This is an example scheduler that creates a new `Fiber` per call to spawn
 * and multiplexes the execution of all fibers within the main thread.
 */
class FiberScheduler : Scheduler
{
    /**
     * This creates a new `Fiber` for the supplied op and then starts the
     * dispatcher.
     */
    void start(void delegate() op)
    {
        create(op);
        dispatch();
    }

    /**
     * This creates a new `Fiber` for the supplied op and adds it to the
     * dispatch list.
     */
    void spawn(void delegate() op) nothrow
    {
        create(op);
        // Give up the caller's time slice (a no-op when the caller is not
        // itself running inside a fiber).
        yield();
    }

    /**
     * If the caller is a scheduled `Fiber`, this yields execution to another
     * scheduled `Fiber`.
     */
    void yield() nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any (non-Generator) fiber should yield here.
        if (Fiber.getThis())
            Fiber.yield();
    }

    /**
     * Returns an appropriate `ThreadInfo` instance.
     *
     * Returns a `ThreadInfo` instance specific to the calling `Fiber` if the
     * `Fiber` was created by this dispatcher, otherwise it returns
     * `ThreadInfo.thisInfo`.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        // The cast yields null when the current fiber is absent or is not an
        // InfoFiber.
        auto f = cast(InfoFiber) Fiber.getThis();

        if (f !is null)
            return f.info;
        return ThreadInfo.thisInfo;
    }

    /**
     * Returns a `Condition` analog that yields when wait or notify is called.
     *
     * Bug:
     * For the default implementation, `notifyAll` will behave like `notify`.
     *
     * Params:
     *   m = A `Mutex` to use for locking if the condition needs to be waited on
     *       or notified from multiple `Thread`s.
     *       If `null`, no `Mutex` will be used and it is assumed that the
     *       `Condition` is only waited on/notified from one `Thread`.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new FiberCondition(m);
    }

protected:
    /**
     * Creates a new `Fiber` which calls the given delegate.
     *
     * The delegate is wrapped so that `thisInfo.cleanup()` runs when it
     * returns or throws; the new fiber is appended to the dispatch list.
     *
     * Params:
     *   op = The delegate the fiber should call
     */
    void create(void delegate() op) nothrow
    {
        void wrap()
        {
            scope (exit)
            {
                thisInfo.cleanup();
            }
            op();
        }

        m_fibers ~= new InfoFiber(&wrap);
    }

    /**
     * `Fiber` which embeds a `ThreadInfo`
     */
    static class InfoFiber : Fiber
    {
        ThreadInfo info;

        this(void delegate() op) nothrow
        {
            super(op);
        }

        this(void delegate() op, size_t sz) nothrow
        {
            super(op, sz);
        }
    }

private:
    // Condition analog whose waits and notifications yield to other fibers
    // through the enclosing scheduler.
    class FiberCondition : Condition
    {
        this(Mutex m) nothrow
        {
            super(m);
            notified = false;
        }

        // Yields repeatedly until a notification has been recorded; the flag
        // is reset on the way out.
        override void wait() nothrow
        {
            scope (exit) notified = false;

            while (!notified)
                switchContext();
        }

        // Timed variant: keeps yielding until notified or until the remaining
        // period becomes negative. Returns whether a notification was seen.
        override bool wait(Duration period) nothrow
        {
            import core.time : MonoTime;

            scope (exit) notified = false;

            for (auto limit = MonoTime.currTime + period;
                 !notified && !period.isNegative;
                 period = limit - MonoTime.currTime)
            {
                this.outer.yield();
            }
            return notified;
        }

        // Records the notification and yields so that a waiter may run.
        override void notify() nothrow
        {
            notified = true;
            switchContext();
        }

        // Identical to notify() (see the Bug note on newCondition).
        override void notifyAll() nothrow
        {
            notified = true;
            switchContext();
        }

    private:
        // Releases the associated mutex (if any) around a yield and
        // re-acquires it afterwards.
        void switchContext() nothrow
        {
            if (mutex_nothrow) mutex_nothrow.unlock_nothrow();
            scope (exit)
                if (mutex_nothrow)
                    mutex_nothrow.lock_nothrow();
            this.outer.yield();
        }

        bool notified;
    }

    // Round-robin loop: resumes each fiber in turn until none remain.
    void dispatch()
    {
        import std.algorithm.mutation : remove;

        while (m_fibers.length > 0)
        {
            // With Rethrow.no the Throwable that ended the fiber (if any) is
            // returned rather than rethrown; OwnerTerminated is ignored, any
            // other Throwable is propagated to the caller of dispatch().
            auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
            if (t !is null && !(cast(OwnerTerminated) t))
            {
                throw t;
            }
            if (m_fibers[m_pos].state == Fiber.State.TERM)
            {
                // Drop the finished fiber; wrap the cursor if it now points
                // past the end of the shortened list.
                if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
                    m_pos = 0;
            }
            else if (m_pos++ >= m_fibers.length - 1)
            {
                // Advanced past the last fiber: start over from the first.
                m_pos = 0;
            }
        }
    }

    Fiber[] m_fibers; // fibers still to be dispatched
    size_t m_pos;     // index of the fiber to resume next
}

// Exercises FiberCondition wait/notify hand-over between two plain fibers.
@system unittest
{
    static void receive(Condition cond, ref size_t received)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                cond.wait();
                ++received;
            }
        }
    }

    static void send(Condition cond, ref size_t sent)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                ++sent;
                cond.notify();
            }
        }
    }

    auto fs = new FiberScheduler;
    auto mtx = new Mutex;
    auto cond = fs.newCondition(mtx);

    size_t received, sent;
    auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
    waiter.call();
    assert(received == 0);
    notifier.call();
    assert(sent == 1);
    assert(received == 0);
    waiter.call();
    assert(received == 1);
    waiter.call();
    assert(received == 1);
}

/**
 * Sets the `Scheduler` behavior within the program.
 *
 * This variable sets the `Scheduler` behavior within this program.  Typically,
 * when setting a `Scheduler`, `scheduler.start()` should be called in `main`.  This
 * routine will not return until program execution is complete.
 */
__gshared Scheduler scheduler;

// Generator

/**
 * If the caller is a `Fiber` and is not a $(LREF Generator), this function will call
 * `scheduler.yield()` or `Fiber.yield()`, as appropriate.
 */
void yield() nothrow
{
    auto fiber = Fiber.getThis();
    // Generators are skipped entirely: for them this call does nothing.
    if (!(cast(IsGenerator) fiber))
    {
        if (scheduler is null)
        {
            // No scheduler installed: yield directly, but only from within a fiber.
            if (fiber)
                return Fiber.yield();
        }
        else
            scheduler.yield();
    }
}

/// Used to determine whether a Generator is running.
private interface IsGenerator {}


/**
 * A Generator is a $(MREF_ALTTEXT Fiber, core, thread, fiber)
 * that periodically returns values of type `T` to the
 * caller via `yield`.  This is represented as an InputRange.
 */
class Generator(T) :
    Fiber, IsGenerator, InputRange!T
{
    /**
     * Initializes a generator object which is associated with a static
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn)
    {
        super(fn);
        call();
    }

    /**
     * Initializes a generator object which is associated with a static
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz)
    {
        super(fn, sz);
        call();
    }

    /**
     * Initializes a generator object which is associated with a static
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *  guardPageSize = size of the guard page to trap fiber's stack
     *                  overflows. Refer to $(REF Fiber, core,thread)'s
     *                  documentation for more details.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz, size_t guardPageSize)
    {
        super(fn, sz, guardPageSize);
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg)
    {
        super(dg);
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz)
    {
        super(dg, sz);
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *  guardPageSize = size of the guard page to trap fiber's stack
     *                  overflows. Refer to $(REF Fiber, core,thread)'s
     *                  documentation for more details.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz, size_t guardPageSize)
    {
        super(dg, sz, guardPageSize);
        call();
    }

    /**
     * Returns true if the generator is empty, i.e. no value has been
     * yielded or the fiber has terminated.
     */
    final bool empty() @property
    {
        if (m_value is null)
            return true;
        return state == State.TERM;
    }

    /**
     * Obtains the next value from the underlying function by resuming it.
     */
    final void popFront()
    {
        call();
    }

    /**
     * Returns the most recently generated value by shallow copy.
     */
    final T front() @property
    {
        return *m_value;
    }

    /**
     * Returns the most recently generated value without executing a
     * copy constructor. Will not compile for element types defining a
     * postblit, because `Generator` does not return by reference.
     */
    final T moveFront()
    {
        static if (hasElaborateCopyConstructor!T)
        {
            static assert(0,
                    "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
        }
        else
        {
            return front;
        }
    }

    /// Iterates the remaining values; a non-zero result from loopBody stops the loop.
    final int opApply(scope int delegate(T) loopBody)
    {
        while (!empty)
        {
            if (auto result = loopBody(front))
                return result;
            popFront();
        }
        return 0;
    }

    /// Ditto, additionally passing a zero-based counter to loopBody.
    final int opApply(scope int delegate(size_t, T) loopBody)
    {
        size_t index;
        while (!empty)
        {
            if (auto result = loopBody(index, front))
                return result;
            ++index;
            popFront();
        }
        return 0;
    }
private:
    // Address of the value most recently handed over by yield(T).
    T* m_value;
}

///
@system unittest
{
    auto tid = spawn({
        int i;
        while (i < 9)
            i = receiveOnly!int;

        ownerTid.send(i * 2);
    });

    auto r = new Generator!int({
        foreach (i; 1 .. 10)
            yield(i);
    });

    foreach (e; r)
        tid.send(e);

    assert(receiveOnly!int == 18);
}

/**
 * Yields a value of type T to the caller of the currently executing
 * generator.
 *
 * Params:
 *  value = The value to yield.
1907 */ 1908 void yield(T)(ref T value) 1909 { 1910 Generator!T cur = cast(Generator!T) Fiber.getThis(); 1911 if (cur !is null && cur.state == Fiber.State.EXEC) 1912 { 1913 cur.m_value = &value; 1914 return Fiber.yield(); 1915 } 1916 throw new Exception("yield(T) called with no active generator for the supplied type"); 1917 } 1918 1919 /// ditto 1920 void yield(T)(T value) 1921 { 1922 yield(value); 1923 } 1924 1925 @system unittest 1926 { 1927 import core.exception; 1928 import std.exception; 1929 1930 auto mainTid = thisTid; 1931 alias testdg = () { 1932 auto tid = spawn( 1933 (Tid mainTid) { 1934 int i; 1935 scope (failure) mainTid.send(false); 1936 try 1937 { 1938 for (i = 1; i < 10; i++) 1939 { 1940 if (receiveOnly!int() != i) 1941 { 1942 mainTid.send(false); 1943 break; 1944 } 1945 } 1946 } 1947 catch (OwnerTerminated e) 1948 { 1949 // i will advance 1 past the last value expected 1950 mainTid.send(i == 4); 1951 } 1952 }, mainTid); 1953 auto r = new Generator!int( 1954 { 1955 assertThrown!Exception(yield(2.0)); 1956 yield(); // ensure this is a no-op 1957 yield(1); 1958 yield(); // also once something has been yielded 1959 yield(2); 1960 yield(3); 1961 }); 1962 1963 foreach (e; r) 1964 { 1965 tid.send(e); 1966 } 1967 }; 1968 1969 scheduler = new ThreadScheduler; 1970 scheduler.spawn(testdg); 1971 assert(receiveOnly!bool()); 1972 1973 scheduler = new FiberScheduler; 1974 scheduler.start(testdg); 1975 assert(receiveOnly!bool()); 1976 scheduler = null; 1977 } 1978 /// 1979 @system unittest 1980 { 1981 import std.range; 1982 1983 InputRange!int myIota = iota(10).inputRangeObject; 1984 1985 myIota.popFront(); 1986 myIota.popFront(); 1987 assert(myIota.moveFront == 2); 1988 assert(myIota.front == 2); 1989 myIota.popFront(); 1990 assert(myIota.front == 3); 1991 1992 //can be assigned to std.range.interfaces.InputRange directly 1993 myIota = new Generator!int( 1994 { 1995 foreach (i; 0 .. 
10) yield(i); 1996 }); 1997 1998 myIota.popFront(); 1999 myIota.popFront(); 2000 assert(myIota.moveFront == 2); 2001 assert(myIota.front == 2); 2002 myIota.popFront(); 2003 assert(myIota.front == 3); 2004 2005 size_t[2] counter = [0, 0]; 2006 foreach (i, unused; myIota) counter[] += [1, i]; 2007 2008 assert(myIota.empty); 2009 assert(counter == [7, 21]); 2010 } 2011 2012 private 2013 { 2014 /* 2015 * A MessageBox is a message queue for one thread. Other threads may send 2016 * messages to this owner by calling put(), and the owner receives them by 2017 * calling get(). The put() call is therefore effectively shared and the 2018 * get() call is effectively local. setMaxMsgs may be used by any thread 2019 * to limit the size of the message queue. 2020 */ 2021 class MessageBox 2022 { 2023 this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */ 2024 { 2025 m_lock = new Mutex; 2026 m_closed = false; 2027 2028 if (scheduler is null) 2029 { 2030 m_putMsg = new Condition(m_lock); 2031 m_notFull = new Condition(m_lock); 2032 } 2033 else 2034 { 2035 m_putMsg = scheduler.newCondition(m_lock); 2036 m_notFull = scheduler.newCondition(m_lock); 2037 } 2038 } 2039 2040 /// 2041 final @property bool isClosed() @safe @nogc pure 2042 { 2043 synchronized (m_lock) 2044 { 2045 return m_closed; 2046 } 2047 } 2048 2049 /* 2050 * Sets a limit on the maximum number of user messages allowed in the 2051 * mailbox. If this limit is reached, the caller attempting to add 2052 * a new message will execute call. If num is zero, there is no limit 2053 * on the message queue. 2054 * 2055 * Params: 2056 * num = The maximum size of the queue or zero if the queue is 2057 * unbounded. 2058 * call = The routine to call when the queue is full. 
2059 */ 2060 final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure 2061 { 2062 synchronized (m_lock) 2063 { 2064 m_maxMsgs = num; 2065 m_onMaxMsgs = call; 2066 } 2067 } 2068 2069 /* 2070 * If maxMsgs is not set, the message is added to the queue and the 2071 * owner is notified. If the queue is full, the message will still be 2072 * accepted if it is a control message, otherwise onCrowdingDoThis is 2073 * called. If the routine returns true, this call will block until 2074 * the owner has made space available in the queue. If it returns 2075 * false, this call will abort. 2076 * 2077 * Params: 2078 * msg = The message to put in the queue. 2079 * 2080 * Throws: 2081 * An exception if the queue is full and onCrowdingDoThis throws. 2082 */ 2083 final void put(ref Message msg) 2084 { 2085 synchronized (m_lock) 2086 { 2087 // TODO: Generate an error here if m_closed is true, or maybe 2088 // put a message in the caller's queue? 2089 if (!m_closed) 2090 { 2091 while (true) 2092 { 2093 if (isPriorityMsg(msg)) 2094 { 2095 m_sharedPty.put(msg); 2096 m_putMsg.notify(); 2097 return; 2098 } 2099 if (!mboxFull() || isControlMsg(msg)) 2100 { 2101 m_sharedBox.put(msg); 2102 m_putMsg.notify(); 2103 return; 2104 } 2105 if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid)) 2106 { 2107 return; 2108 } 2109 m_putQueue++; 2110 m_notFull.wait(); 2111 m_putQueue--; 2112 } 2113 } 2114 } 2115 } 2116 2117 /* 2118 * Matches ops against each message in turn until a match is found. 2119 * 2120 * Params: 2121 * ops = The operations to match. Each may return a bool to indicate 2122 * whether a message with a matching type is truly a match. 2123 * 2124 * Returns: 2125 * true if a message was retrieved and false if not (such as if a 2126 * timeout occurred). 2127 * 2128 * Throws: 2129 * LinkTerminated if a linked thread terminated, or OwnerTerminated 2130 * if the owner thread terminates and no existing messages match the 2131 * supplied ops. 
2132 */ 2133 bool get(T...)(scope T vals) 2134 { 2135 import std.meta : AliasSeq; 2136 2137 static assert(T.length, "T must not be empty"); 2138 2139 static if (is(T[0] : Duration)) 2140 { 2141 alias Ops = AliasSeq!(T[1 .. $]); 2142 alias ops = vals[1 .. $]; 2143 enum timedWait = true; 2144 Duration period = vals[0]; 2145 } 2146 else 2147 { 2148 alias Ops = AliasSeq!(T); 2149 alias ops = vals[0 .. $]; 2150 enum timedWait = false; 2151 } 2152 2153 bool onStandardMsg(ref Message msg) 2154 { 2155 foreach (i, t; Ops) 2156 { 2157 alias Args = Parameters!(t); 2158 auto op = ops[i]; 2159 2160 if (msg.convertsTo!(Args)) 2161 { 2162 alias RT = ReturnType!(t); 2163 static if (is(RT == bool)) 2164 { 2165 return msg.map(op); 2166 } 2167 else 2168 { 2169 msg.map(op); 2170 static if (!is(immutable RT == immutable noreturn)) 2171 return true; 2172 } 2173 } 2174 } 2175 return false; 2176 } 2177 2178 bool onLinkDeadMsg(ref Message msg) 2179 { 2180 assert(msg.convertsTo!(Tid), 2181 "Message could be converted to Tid"); 2182 auto tid = msg.get!(Tid); 2183 2184 if (bool* pDepends = tid in thisInfo.links) 2185 { 2186 auto depends = *pDepends; 2187 thisInfo.links.remove(tid); 2188 // Give the owner relationship precedence. 
2189 if (depends && tid != thisInfo.owner) 2190 { 2191 auto e = new LinkTerminated(tid); 2192 auto m = Message(MsgType.standard, e); 2193 if (onStandardMsg(m)) 2194 return true; 2195 throw e; 2196 } 2197 } 2198 if (tid == thisInfo.owner) 2199 { 2200 thisInfo.owner = Tid.init; 2201 auto e = new OwnerTerminated(tid); 2202 auto m = Message(MsgType.standard, e); 2203 if (onStandardMsg(m)) 2204 return true; 2205 throw e; 2206 } 2207 return false; 2208 } 2209 2210 bool onControlMsg(ref Message msg) 2211 { 2212 switch (msg.type) 2213 { 2214 case MsgType.linkDead: 2215 return onLinkDeadMsg(msg); 2216 default: 2217 return false; 2218 } 2219 } 2220 2221 bool scan(ref ListT list) 2222 { 2223 for (auto range = list[]; !range.empty;) 2224 { 2225 // Only the message handler will throw, so if this occurs 2226 // we can be certain that the message was handled. 2227 scope (failure) 2228 list.removeAt(range); 2229 2230 if (isControlMsg(range.front)) 2231 { 2232 if (onControlMsg(range.front)) 2233 { 2234 // Although the linkDead message is a control message, 2235 // it can be handled by the user. Since the linkDead 2236 // message throws if not handled, if we get here then 2237 // it has been handled and we can return from receive. 2238 // This is a weird special case that will have to be 2239 // handled in a more general way if more are added. 
2240 if (!isLinkDeadMsg(range.front)) 2241 { 2242 list.removeAt(range); 2243 continue; 2244 } 2245 list.removeAt(range); 2246 return true; 2247 } 2248 range.popFront(); 2249 continue; 2250 } 2251 else 2252 { 2253 if (onStandardMsg(range.front)) 2254 { 2255 list.removeAt(range); 2256 return true; 2257 } 2258 range.popFront(); 2259 continue; 2260 } 2261 } 2262 return false; 2263 } 2264 2265 bool pty(ref ListT list) 2266 { 2267 if (!list.empty) 2268 { 2269 auto range = list[]; 2270 2271 if (onStandardMsg(range.front)) 2272 { 2273 list.removeAt(range); 2274 return true; 2275 } 2276 if (range.front.convertsTo!(Throwable)) 2277 throw range.front.get!(Throwable); 2278 else if (range.front.convertsTo!(shared(Throwable))) 2279 /* Note: a shared type can be caught without the shared qualifier 2280 * so throwing shared will be an error */ 2281 throw cast() range.front.get!(shared(Throwable)); 2282 else 2283 throw new PriorityMessageException(range.front.data); 2284 } 2285 return false; 2286 } 2287 2288 static if (timedWait) 2289 { 2290 import core.time : MonoTime; 2291 auto limit = MonoTime.currTime + period; 2292 } 2293 2294 while (true) 2295 { 2296 ListT arrived; 2297 2298 if (pty(m_localPty) || scan(m_localBox)) 2299 { 2300 return true; 2301 } 2302 yield(); 2303 synchronized (m_lock) 2304 { 2305 updateMsgCount(); 2306 while (m_sharedPty.empty && m_sharedBox.empty) 2307 { 2308 // NOTE: We're notifying all waiters here instead of just 2309 // a few because the onCrowding behavior may have 2310 // changed and we don't want to block sender threads 2311 // unnecessarily if the new behavior is not to block. 2312 // This will admittedly result in spurious wakeups 2313 // in other situations, but what can you do? 
2314 if (m_putQueue && !mboxFull()) 2315 m_notFull.notifyAll(); 2316 static if (timedWait) 2317 { 2318 if (period <= Duration.zero || !m_putMsg.wait(period)) 2319 return false; 2320 } 2321 else 2322 { 2323 m_putMsg.wait(); 2324 } 2325 } 2326 m_localPty.put(m_sharedPty); 2327 arrived.put(m_sharedBox); 2328 } 2329 if (m_localPty.empty) 2330 { 2331 scope (exit) m_localBox.put(arrived); 2332 if (scan(arrived)) 2333 { 2334 return true; 2335 } 2336 else 2337 { 2338 static if (timedWait) 2339 { 2340 period = limit - MonoTime.currTime; 2341 } 2342 continue; 2343 } 2344 } 2345 m_localBox.put(arrived); 2346 pty(m_localPty); 2347 return true; 2348 } 2349 } 2350 2351 /* 2352 * Called on thread termination. This routine processes any remaining 2353 * control messages, clears out message queues, and sets a flag to 2354 * reject any future messages. 2355 */ 2356 final void close() 2357 { 2358 static void onLinkDeadMsg(ref Message msg) 2359 { 2360 assert(msg.convertsTo!(Tid), 2361 "Message could be converted to Tid"); 2362 auto tid = msg.get!(Tid); 2363 2364 thisInfo.links.remove(tid); 2365 if (tid == thisInfo.owner) 2366 thisInfo.owner = Tid.init; 2367 } 2368 2369 static void sweep(ref ListT list) 2370 { 2371 for (auto range = list[]; !range.empty; range.popFront()) 2372 { 2373 if (range.front.type == MsgType.linkDead) 2374 onLinkDeadMsg(range.front); 2375 } 2376 } 2377 2378 ListT arrived; 2379 2380 sweep(m_localBox); 2381 synchronized (m_lock) 2382 { 2383 arrived.put(m_sharedBox); 2384 m_closed = true; 2385 } 2386 m_localBox.clear(); 2387 sweep(arrived); 2388 } 2389 2390 private: 2391 // Routines involving local data only, no lock needed. 
// Mailbox bookkeeping members (the header of the enclosing type is above this point).

        // Reports whether the mailbox is at or over its size limit.
        // A limit of 0 (m_maxMsgs) disables the check. Occupancy is the cached
        // local count (m_localMsgs) plus the current length of m_sharedBox.
        bool mboxFull() @safe @nogc pure nothrow
        {
            return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
        }

        // Refreshes the cached local message count from m_localBox.
        void updateMsgCount() @safe @nogc pure nothrow
        {
            m_localMsgs = m_localBox.length;
        }

        // True for every message type other than `standard` and `priority`.
        bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type != MsgType.standard && msg.type != MsgType.priority;
        }

        // True when the message type is `priority`.
        bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type == MsgType.priority;
        }

        // True when the message type is `linkDead`.
        bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type == MsgType.linkDead;
        }

        alias OnMaxFn = bool function(Tid);
        alias ListT = List!(Message);

        // NOTE(review): the "local"/"shared" and "Box"/"Pty" naming suggests
        // owner-only vs. cross-thread queues for standard vs. priority
        // messages; confirm against the methods earlier in this type.
        ListT m_localBox;
        ListT m_localPty;

        Mutex m_lock;
        Condition m_putMsg;
        Condition m_notFull;
        size_t m_putQueue;
        ListT m_sharedBox;
        ListT m_sharedPty;
        OnMaxFn m_onMaxMsgs;
        size_t m_localMsgs; // length of m_localBox as of the last updateMsgCount()
        size_t m_maxMsgs;   // size limit checked by mboxFull(); 0 disables it
        bool m_closed;
    }

    /*
     * Singly linked FIFO list with O(1) append, a maintained element count and
     * a forward range that supports removal of the current element.
     *
     * Nodes are recycled through a free list that is a static (per element
     * type) shared member and is guarded by a spin lock.
     */
    struct List(T)
    {
        // Forward range over the list. It stores the node *preceding* the
        // current element, which lets removeAt unlink the current element
        // without needing a back pointer.
        struct Range
        {
            import std.exception : enforce;

            @property bool empty() const
            {
                return !m_prev.next;
            }

            @property ref T front()
            {
                enforce(m_prev.next, "invalid list node");
                return m_prev.next.val;
            }

            @property void front(T val)
            {
                enforce(m_prev.next, "invalid list node");
                m_prev.next.val = val;
            }

            void popFront()
            {
                enforce(m_prev.next, "invalid list node");
                m_prev = m_prev.next;
            }

            private this(Node* p)
            {
                m_prev = p;
            }

            private Node* m_prev;
        }

        // Appends a single value, taking its node from the free list when one
        // is available.
        void put(T val)
        {
            put(newNode(val));
        }

        // Moves every element of `rhs` to the end of this list and leaves
        // `rhs` empty. Does nothing when `rhs` is empty.
        //
        // The chain is spliced in O(1): `rhs.m_last` and `rhs.m_count` are kept
        // exact by put/removeAt/clear, so the appended nodes do not have to be
        // walked to find the new tail or to update the count.
        //
        // `rhs` must not be this list itself.
        void put(ref List!(T) rhs)
        {
            if (rhs.empty)
                return;

            if (empty)
                m_first = rhs.m_first;
            else
                m_last.next = rhs.m_first;
            m_last = rhs.m_last;
            m_count += rhs.m_count;

            // Ownership of the nodes has moved; reset the donor.
            rhs.m_first = null;
            rhs.m_last = null;
            rhs.m_count = 0;
        }

        // Returns a range positioned on the first element.
        // The address of m_first is used as a pseudo node: `next` is the first
        // field of Node, so `pseudo.next` aliases m_first. Only `next` of this
        // pseudo node may ever be touched.
        Range opSlice()
        {
            return Range(cast(Node*)&m_first);
        }

        // Unlinks the element `r` is currently positioned on and recycles its
        // node. Afterwards `r` is positioned on the following element.
        void removeAt(Range r)
        {
            import std.exception : enforce;

            assert(m_count, "Can not remove from empty Range");
            Node* n = r.m_prev;
            enforce(n && n.next, "attempting to remove invalid list node");

            // Keep m_last exact: a single-element list becomes empty, and
            // removing the tail makes its predecessor the new tail.
            if (m_last is m_first)
                m_last = null;
            else if (m_last is n.next)
                m_last = n; // nocoverage
            Node* to_free = n.next;
            n.next = n.next.next;
            freeNode(to_free);
            m_count--;
        }

        @property size_t length()
        {
            return m_count;
        }

        // Drops all elements by resetting the links and the count. The nodes
        // are not returned to the free list.
        void clear()
        {
            m_first = m_last = null;
            m_count = 0;
        }

        @property bool empty()
        {
            return m_first is null;
        }

    private:
        // `next` must stay the first field; opSlice relies on that layout.
        struct Node
        {
            Node* next;
            T val;

            this(T v)
            {
                val = v;
            }
        }

        // Minimal spin lock guarding the node free list; yields the thread
        // while the lock is contended.
        static shared struct SpinLock
        {
            void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
            void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
            bool locked;
        }

        static shared SpinLock sm_lock;
        static shared Node* sm_head; // head of the free list of recycled nodes

        // Returns a node holding `v`: a recycled node re-initialized in place
        // when the free list is non-empty, otherwise a fresh allocation.
        Node* newNode(T v)
        {
            Node* n;
            {
                sm_lock.lock();
                scope (exit) sm_lock.unlock();

                if (sm_head)
                {
                    n = cast(Node*) sm_head;
                    sm_head = sm_head.next;
                }
            }
            if (n)
            {
                import core.lifetime : emplace;
                emplace!Node(n, v);
            }
            else
            {
                n = new Node(v);
            }
            return n;
        }

        // Pushes `n` onto the free list for later reuse.
        void freeNode(Node* n)
        {
            // destroy val to free any owned GC memory
            destroy(n.val);

            sm_lock.lock();
            scope (exit) sm_lock.unlock();

            auto sn = cast(shared(Node)*) n;
            sn.next = sm_head;
            sm_head = sn;
        }

        // Links `n` at the tail and counts it as one element.
        void put(Node* n)
        {
            m_count++;
            if (!empty)
            {
                m_last.next = n;
                m_last = n;
                return;
            }
            m_first = n;
            m_last = n;
        }

        Node* m_first;
        Node* m_last;
        size_t m_count;
    }
}

// Exercises multi-argument, tuple, Variant and string messages plus a priority
// reply, with an unbounded mailbox and with a mailbox limited to two messages,
// first with the current scheduler setting and then with a ThreadScheduler.
@system unittest
{
    import std.typecons : tuple, Tuple;

    static void testfn(Tid tid)
    {
        receive((float val) { assert(0); }, (int val, int val2) {
            assert(val == 42 && val2 == 86);
        });
        receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
        receive((Variant val) { });
        receive((string val) {
            if ("the quick brown fox" != val)
                return false;
            return true;
        }, (string val) { assert(false); });
        prioritySend(tid, "done");
    }

    static void runTest(Tid tid)
    {
        send(tid, 42, 86);
        send(tid, tuple(42, 86));
        send(tid, "hello", "there");
        send(tid, "the quick brown fox");
        receive((string val) { assert(val == "done"); });
    }

    static void simpleTest()
    {
        auto tid = spawn(&testfn, thisTid);
        runTest(tid);

        // Run the test again with a limited mailbox size.
        tid = spawn(&testfn, thisTid);
        setMaxMailboxSize(tid, 2, OnCrowding.block);
        runTest(tid);
    }

    simpleTest();

    scheduler = new ThreadScheduler;
    simpleTest();
    scheduler = null;
}

// Returns the mutex used by the initOnce overload that takes no mutex,
// creating it on first use. Creation is lock-free: a racing thread allocates
// a candidate and `cas` publishes exactly one; a thread that loses the race
// returns the published mutex instead of its own candidate.
private @property shared(Mutex) initOnceLock()
{
    static shared Mutex lock;
    if (auto mtx = atomicLoad!(MemoryOrder.acq)(lock))
        return mtx;
    auto mtx = new shared Mutex;
    if (cas(&lock, cast(shared) null, mtx))
        return mtx;
    return atomicLoad!(MemoryOrder.acq)(lock);
}

/**
 * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
 * thread-safe manner.
 *
 * The implementation guarantees that all threads simultaneously calling
 * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
 * fully initialized. All side-effects of $(D_PARAM init) are globally visible
 * afterwards.
 *
 * Params:
 *   var = The variable to initialize
 *   init = The lazy initializer value
 *
 * Returns:
 *   A reference to the initialized variable
 */
auto ref initOnce(alias var)(lazy typeof(var) init)
{
    // Forward to the mutex-taking overload using the module-wide lock.
    return initOnce!var(init, initOnceLock);
}

/// A typical use-case is to perform lazy but thread-safe initialization.
@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }
    }

    assert(MySingleton.instance !is null);
}

// Ten threads race on the same singleton; the constructor must run once.
@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }

    private:
        this() { val = ++cnt; }
        size_t val;
        __gshared size_t cnt;
    }

    foreach (_; 0 .. 10)
        spawn({ ownerTid.send(MySingleton.instance.val); });
    foreach (_; 0 .. 10)
        assert(receiveOnly!size_t == MySingleton.instance.val);
    assert(MySingleton.cnt == 1);
}

/**
 * Same as above, but takes a separate mutex instead of sharing one among
 * all initOnce instances.
 *
 * This should be used to avoid dead-locks when the $(D_PARAM init)
 * expression waits for the result of another thread that might also
 * call initOnce. Use with care.
 *
 * Params:
 *   var = The variable to initialize
 *   init = The lazy initializer value
 *   mutex = A mutex to prevent race conditions
 *
 * Returns:
 *   A reference to the initialized variable
 */
auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex)
{
    // check that var is global, can't take address of a TLS variable
    static assert(is(typeof({ __gshared p = &var; })),
        "var must be 'static shared' or '__gshared'.");
    import core.atomic : atomicLoad, MemoryOrder, atomicStore;

    // One flag per template instance, i.e. per `var`.
    static shared bool flag;
    // Double-checked locking: the acquire load pairs with the release store
    // below, so a thread that sees `true` also sees the initialized `var`.
    if (!atomicLoad!(MemoryOrder.acq)(flag))
    {
        synchronized (mutex)
        {
            // Re-check while holding the mutex; a raw load is used here.
            if (!atomicLoad!(MemoryOrder.raw)(flag))
            {
                var = init;
                // NOTE(review): the store is compiled out for noreturn,
                // presumably because the assignment above never completes
                // for that type -- confirm.
                static if (!is(immutable typeof(var) == immutable noreturn))
                    atomicStore!(MemoryOrder.rel)(flag, true);
            }
        }
    }
    return var;
}

/// ditto
auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
{
    return initOnce!var(init, cast(shared) mutex);
}

/// Use a separate mutex when init blocks on another thread that might also call initOnce.
@system unittest
{
    import core.sync.mutex : Mutex;

    static shared bool varA;
    static shared bool varB;
    static shared Mutex varBMutex;
    varBMutex = new shared Mutex;

    // The worker initializes varB under its own mutex. The owner is inside
    // initOnce!varA (holding the default lock) while it waits for the worker,
    // so sharing that lock here would dead-lock.
    spawn({
        initOnce!varB(true, varBMutex);
        ownerTid.send(true);
    });

    // The initializer of varA is the value reported by the spawned thread.
    initOnce!varA(receiveOnly!bool);
    assert(varA == true);
    assert(varB == true);
}

// initOnce accepts only variables with global (non-TLS) storage.
@system unittest
{
    static shared bool sharedVar;
    __gshared bool gsharedVar;
    static bool tlsVar;
    bool localVar;

    initOnce!sharedVar(true);
    initOnce!gsharedVar(true);
    static assert(!__traits(compiles, initOnce!tlsVar(true))); // TLS
    static assert(!__traits(compiles, initOnce!localVar(true))); // local variable
}

// test ability to send shared arrays
@system unittest
{
    static shared int[] cells = new shared(int)[1];

    // The worker writes through the received slice and then acknowledges.
    auto worker = spawn({
        auto received = receiveOnly!(shared(int)[]);
        received[0] = 5;
        ownerTid.send(true);
    });

    worker.send(cells);
    receiveOnly!(bool);

    // The write must be visible through the owner's slice as well.
    assert(cells[0] == 5);
}

// https://issues.dlang.org/show_bug.cgi?id=13930
@system unittest
{
    immutable table = ["0":0];
    thisTid.send(table);
    // The original test marks this line with "compile error", i.e. it is the
    // statement the linked issue is about.
    receiveOnly!(immutable int[string]);
}

// https://issues.dlang.org/show_bug.cgi?id=19345
@system unittest
{
    static struct Aggregate
    {
        const int a;
        const int[5] b;
    }

    static void sender(Tid mainTid)
    {
        const outgoing = Aggregate(42, [1, 2, 3, 4, 5]);
        mainTid.send(outgoing);
    }

    spawn(&sender, thisTid);

    immutable expected = Aggregate(42, [1, 2, 3, 4, 5]);
    auto received = receiveOnly!(const Aggregate)();
    assert(received == expected);
}

// Noreturn support
@system unittest
{
    static noreturn thrower(int) { throw new Exception(""); }

    // Never executed; these calls only have to compile.
    if (false) spawn(&thrower, 1);
    if (false) spawnLinked(&thrower, 1);

    if (false) receive(&thrower);
    if (false) receiveTimeout(Duration.init, &thrower);

    // Wrapped in __traits(compiles) to skip codegen which crashes dmd's backend
    static assert(__traits(compiles, receiveOnly!noreturn()));
    static assert(__traits(compiles, send(Tid.init, noreturn.init)));
    static assert(__traits(compiles, prioritySend(Tid.init, noreturn.init)));
    static assert(__traits(compiles, yield(noreturn.init)));

    static assert(__traits(compiles, {
        __gshared noreturn n;
        initOnce!n(noreturn.init);
    }));
}