The OpenD Programming Language

1 /**
2  * $(SCRIPT inhibitQuickIndex = 1;)
3  * $(DIVC quickindex,
5  * $(TR $(TH Category) $(TH Symbols))
6  * $(TR $(TD Tid) $(TD
7  *     $(MYREF locate)
8  *     $(MYREF ownerTid)
9  *     $(MYREF register)
10  *     $(MYREF spawn)
11  *     $(MYREF spawnLinked)
12  *     $(MYREF thisTid)
13  *     $(MYREF Tid)
14  *     $(MYREF TidMissingException)
15  *     $(MYREF unregister)
16  * ))
17  * $(TR $(TD Message passing) $(TD
18  *     $(MYREF prioritySend)
19  *     $(MYREF receive)
20  *     $(MYREF receiveOnly)
21  *     $(MYREF receiveTimeout)
22  *     $(MYREF send)
23  *     $(MYREF setMaxMailboxSize)
24  * ))
25  * $(TR $(TD Message-related types) $(TD
26  *     $(MYREF LinkTerminated)
27  *     $(MYREF MailboxFull)
28  *     $(MYREF MessageMismatch)
29  *     $(MYREF OnCrowding)
30  *     $(MYREF OwnerTerminated)
31  *     $(MYREF PriorityMessageException)
32  * ))
33  * $(TR $(TD Scheduler) $(TD
34  *     $(MYREF FiberScheduler)
35  *     $(MYREF Generator)
36  *     $(MYREF Scheduler)
37  *     $(MYREF scheduler)
38  *     $(MYREF ThreadInfo)
39  *     $(MYREF ThreadScheduler)
40  *     $(MYREF yield)
41  * ))
42  * $(TR $(TD Misc) $(TD
43  *     $(MYREF initOnce)
44  * ))
45  * ))
46  *
47  * This is a low-level messaging API upon which more structured or restrictive
48  * APIs may be built.  The general idea is that every messageable entity is
49  * represented by a common handle type called a `Tid`, which allows messages to
50  * be sent to logical threads that are executing in both the current process
51  * and in external processes using the same interface.  This is an important
52  * aspect of scalability because it allows the components of a program to be
53  * spread across available resources with few to no changes to the actual
54  * implementation.
55  *
56  * A logical thread is an execution context that has its own stack and which
57  * runs asynchronously to other logical threads.  These may be preemptively
58  * scheduled kernel threads, $(MREF_ALTTEXT fibers, core, thread, fiber)
59  * (cooperative user-space threads), or some other concept with similar behavior.
60  *
61  * The type of concurrency used when logical threads are created is determined
62  * by the $(LREF Scheduler) selected at initialization time.  The default behavior is
63  * currently to create a new kernel thread per call to spawn, but other
64  * schedulers are available that multiplex fibers across the main thread or
65  * use some combination of the two approaches.
66  *
67  * Copyright: Copyright Sean Kelly 2009 - 2014.
68  * License:   <a href="">Boost License 1.0</a>.
69  * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
70  * Source:    $(PHOBOSSRC std/concurrency.d)
71  */
72 /*          Copyright Sean Kelly 2009 - 2014.
73  * Distributed under the Boost Software License, Version 1.0.
74  *    (See accompanying file LICENSE_1_0.txt or copy at
75  *
76  */
77 module std.concurrency;
79 version(WebAssembly) {
80 	struct Tid {}
81 	auto thisTid() { return Tid.init; }
82 	auto ref initOnce(alias var)(lazy typeof(var) init) { return var = init; }
83 } else:
85 public import std.variant;
87 import core.atomic;
88 import core.sync.condition;
89 import core.sync.mutex;
90 import core.thread;
91 import std.range.primitives;
92 import std.range.interfaces : InputRange;
93 import std.traits;
95 ///
96 @system unittest
97 {
98     __gshared string received;
99     static void spawnedFunc(Tid ownerTid)
100     {
101         import std.conv : text;
102         // Receive a message from the owner thread.
103         receive((int i){
104             received = text("Received the number ", i);
106             // Send a message back to the owner thread
107             // indicating success.
108             send(ownerTid, true);
109         });
110     }
112     // Start spawnedFunc in a new thread.
113     auto childTid = spawn(&spawnedFunc, thisTid);
115     // Send the number 42 to this new thread.
116     send(childTid, 42);
118     // Receive the result code.
119     auto wasSuccessful = receiveOnly!(bool);
120     assert(wasSuccessful);
121     assert(received == "Received the number 42");
122 }
124 private
125 {
126     bool hasLocalAliasing(Types...)()
127     {
128         import std.typecons : Rebindable;
130         // Works around "statement is not reachable"
131         bool doesIt = false;
132         static foreach (T; Types)
133         {
134             static if (is(T == Tid))
135             { /* Allowed */ }
136             else static if (is(T : Rebindable!R, R))
137                 doesIt |= hasLocalAliasing!R;
138             else static if (is(T == struct))
139                 doesIt |= hasLocalAliasing!(typeof(T.tupleof));
140             else
141                 doesIt |= std.traits.hasUnsharedAliasing!(T);
142         }
143         return doesIt;
144     }
146     @safe unittest
147     {
148         static struct Container { Tid t; }
149         static assert(!hasLocalAliasing!(Tid, Container, int));
150     }
152     //
153     @safe unittest
154     {
155         import std.datetime.systime : SysTime;
156         static struct Container { SysTime time; }
157         static assert(!hasLocalAliasing!(SysTime, Container));
158     }
160     enum MsgType
161     {
162         standard,
163         priority,
164         linkDead,
165     }
167     struct Message
168     {
169         MsgType type;
170         Variant data;
172         this(T...)(MsgType t, T vals) if (T.length > 0)
173         {
174             static if (T.length == 1)
175             {
176                 type = t;
177                 data = vals[0];
178             }
179             else
180             {
181                 import std.typecons : Tuple;
183                 type = t;
184                 data = Tuple!(T)(vals);
185             }
186         }
188         @property auto convertsTo(T...)()
189         {
190             static if (T.length == 1)
191             {
192                 return is(T[0] == Variant) || data.convertsTo!(T);
193             }
194             else
195             {
196                 import std.typecons : Tuple;
197                 return data.convertsTo!(Tuple!(T));
198             }
199         }
201         @property auto get(T...)()
202         {
203             static if (T.length == 1)
204             {
205                 static if (is(T[0] == Variant))
206                     return data;
207                 else
208                     return data.get!(T);
209             }
210             else
211             {
212                 import std.typecons : Tuple;
213                 return data.get!(Tuple!(T));
214             }
215         }
217         auto map(Op)(Op op)
218         {
219             alias Args = Parameters!(Op);
221             static if (Args.length == 1)
222             {
223                 static if (is(Args[0] == Variant))
224                     return op(data);
225                 else
226                     return op(data.get!(Args));
227             }
228             else
229             {
230                 import std.typecons : Tuple;
231                 return op(data.get!(Tuple!(Args)).expand);
232             }
233         }
234     }
236     void checkops(T...)(T ops)
237     {
238         import std.format : format;
240         foreach (i, t1; T)
241         {
242             static assert(isFunctionPointer!t1 || isDelegate!t1,
243                     format!"T %d is not a function pointer or delegates"(i));
244             alias a1 = Parameters!(t1);
245             alias r1 = ReturnType!(t1);
247             static if (i < T.length - 1 && is(r1 == void))
248             {
249                 static assert(a1.length != 1 || !is(a1[0] == Variant),
250                               "function with arguments " ~ a1.stringof ~
251                               " occludes successive function");
253                 foreach (t2; T[i + 1 .. $])
254                 {
255                     alias a2 = Parameters!(t2);
257                     static assert(!is(a1 == a2),
258                         "function with arguments " ~ a1.stringof ~ " occludes successive function");
259                 }
260             }
261         }
262     }
264     @property ref ThreadInfo thisInfo() nothrow
265     {
266         import core.atomic : atomicLoad;
268         auto localScheduler = atomicLoad(scheduler);
269         if (localScheduler is null)
270             return ThreadInfo.thisInfo;
271         return localScheduler.thisInfo;
272     }
273 }
275 static ~this()
276 {
277     thisInfo.cleanup();
278 }
280 // Exceptions
282 /**
283  * Thrown on calls to $(LREF receiveOnly) if a message other than the type
284  * the receiving thread expected is sent.
285  */
286 class MessageMismatch : Exception
287 {
288     ///
289     this(string msg = "Unexpected message type") @safe pure nothrow @nogc
290     {
291         super(msg);
292     }
293 }
295 /**
296  * Thrown on calls to $(LREF receive) if the thread that spawned the receiving
297  * thread has terminated and no more messages exist.
298  */
299 class OwnerTerminated : Exception
300 {
301     ///
302     this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
303     {
304         super(msg);
305         tid = t;
306     }
308     Tid tid;
309 }
311 /**
312  * Thrown if a linked thread has terminated.
313  */
314 class LinkTerminated : Exception
315 {
316     ///
317     this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
318     {
319         super(msg);
320         tid = t;
321     }
323     Tid tid;
324 }
326 /**
327  * Thrown if a message was sent to a thread via
328  * $(REF prioritySend, std,concurrency) and the receiver does not have a handler
329  * for a message of this type.
330  */
331 class PriorityMessageException : Exception
332 {
333     ///
334     this(Variant vals)
335     {
336         super("Priority message");
337         message = vals;
338     }
340     /**
341      * The message that was sent.
342      */
343     Variant message;
344 }
346 /**
347  * Thrown on mailbox crowding if the mailbox is configured with
348  * `OnCrowding.throwException`.
349  */
350 class MailboxFull : Exception
351 {
352     ///
353     this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
354     {
355         super(msg);
356         tid = t;
357     }
359     Tid tid;
360 }
362 /**
363  * Thrown when a `Tid` is missing, e.g. when $(LREF ownerTid) doesn't
364  * find an owner thread.
365  */
366 class TidMissingException : Exception
367 {
368     import std.exception : basicExceptionCtors;
369     ///
370     mixin basicExceptionCtors;
371 }
374 // Thread ID
377 /**
378  * An opaque type used to represent a logical thread.
379  */
380 struct Tid
381 {
382 private:
383     this(MessageBox m) @safe pure nothrow @nogc
384     {
385         mbox = m;
386     }
388     MessageBox mbox;
390 public:
392     /**
393      * Generate a convenient string for identifying this `Tid`.  This is only
394      * useful to see if `Tid`'s that are currently executing are the same or
395      * different, e.g. for logging and debugging.  It is potentially possible
396      * that a `Tid` executed in the future will have the same `toString()` output
397      * as another `Tid` that has already terminated.
398      */
399     void toString(W)(ref W w) const
400     {
401         import std.format.write : formattedWrite;
402         auto p = () @trusted { return cast(void*) mbox; }();
403         formattedWrite(w, "Tid(%x)", p);
404     }
406 }
408 @safe unittest
409 {
410     import std.conv : text;
411     Tid tid;
412     assert(text(tid) == "Tid(0)");
413     auto tid2 = thisTid;
414     assert(text(tid2) != "Tid(0)");
415     auto tid3 = tid2;
416     assert(text(tid2) == text(tid3));
417 }
419 //
420 @system unittest
421 {
422     import std.format : format;
424     const(Tid) b = spawn(() {});
425     assert(format!"%s"(b)[0 .. 4] == "Tid(");
426 }
428 /**
429  * Returns: The `Tid` of the caller's thread.
430  */
431 @property Tid thisTid() @safe
432 {
433     // TODO: remove when concurrency is safe
434     static auto trus() @trusted
435     {
436         if (thisInfo.ident != Tid.init)
437             return thisInfo.ident;
438         thisInfo.ident = Tid(new MessageBox);
439         return thisInfo.ident;
440     }
442     return trus();
443 }
445 /**
446  * Return the `Tid` of the thread which spawned the caller's thread.
447  *
448  * Throws: A `TidMissingException` exception if
449  * there is no owner thread.
450  */
451 @property Tid ownerTid()
452 {
453     import std.exception : enforce;
455     enforce!TidMissingException(thisInfo.owner.mbox !is null, "Error: Thread has no owner thread.");
456     return thisInfo.owner;
457 }
459 @system unittest
460 {
461     import std.exception : assertThrown;
463     static void fun()
464     {
465         string res = receiveOnly!string();
466         assert(res == "Main calling");
467         ownerTid.send("Child responding");
468     }
470     assertThrown!TidMissingException(ownerTid);
471     auto child = spawn(&fun);
472     child.send("Main calling");
473     string res = receiveOnly!string();
474     assert(res == "Child responding");
475 }
477 // Thread Creation
479 private template isSpawnable(F, T...)
480 {
481     template isParamsImplicitlyConvertible(F1, F2, int i = 0)
482     {
483         alias param1 = Parameters!F1;
484         alias param2 = Parameters!F2;
485         static if (param1.length != param2.length)
486             enum isParamsImplicitlyConvertible = false;
487         else static if (param1.length == i)
488             enum isParamsImplicitlyConvertible = true;
489         else static if (is(param2[i] : param1[i]))
490             enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1,
491                     F2, i + 1);
492         else
493             enum isParamsImplicitlyConvertible = false;
494     }
496     enum isSpawnable = isCallable!F && is(ReturnType!F : void)
497             && isParamsImplicitlyConvertible!(F, void function(T))
498             && (isFunctionPointer!F || !hasUnsharedAliasing!F);
499 }
501 /**
502  * Starts `fn(args)` in a new logical thread.
503  *
504  * Executes the supplied function in a new logical thread represented by
505  * `Tid`.  The calling thread is designated as the owner of the new thread.
506  * When the owner thread terminates an `OwnerTerminated` message will be
507  * sent to the new thread, causing an `OwnerTerminated` exception to be
508  * thrown on `receive()`.
509  *
510  * Params:
511  *  fn   = The function to execute.
512  *  args = Arguments to the function.
513  *
514  * Returns:
515  *  A `Tid` representing the new logical thread.
516  *
517  * Notes:
518  *  `args` must not have unshared aliasing.  In other words, all arguments
519  *  to `fn` must either be `shared` or `immutable` or have no
520  *  pointer indirection.  This is necessary for enforcing isolation among
521  *  threads.
522  *
523  * Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning
524  * `fn` must be either `shared` or `immutable`. */
525 Tid spawn(F, T...)(F fn, T args)
526 if (isSpawnable!(F, T))
527 {
528     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
529     return _spawn(false, fn, args);
530 }
532 ///
533 @system unittest
534 {
535     static void f(string msg)
536     {
537         assert(msg == "Hello World");
538     }
540     auto tid = spawn(&f, "Hello World");
541 }
543 /// Fails: char[] has mutable aliasing.
544 @system unittest
545 {
546     string msg = "Hello, World!";
548     static void f1(string msg) {}
549     static assert(!__traits(compiles, spawn(&f1, msg.dup)));
550     static assert( __traits(compiles, spawn(&f1, msg.idup)));
552     static void f2(char[] msg) {}
553     static assert(!__traits(compiles, spawn(&f2, msg.dup)));
554     static assert(!__traits(compiles, spawn(&f2, msg.idup)));
555 }
557 /// New thread with anonymous function
558 @system unittest
559 {
560     spawn({
561         ownerTid.send("This is so great!");
562     });
563     assert(receiveOnly!string == "This is so great!");
564 }
566 @system unittest
567 {
568     import core.thread : thread_joinAll;
570     __gshared string receivedMessage;
571     static void f1(string msg)
572     {
573         receivedMessage = msg;
574     }
576     auto tid1 = spawn(&f1, "Hello World");
577     thread_joinAll;
578     assert(receivedMessage == "Hello World");
579 }
581 /**
582  * Starts `fn(args)` in a logical thread and will receive a `LinkTerminated`
583  * message when the operation terminates.
584  *
585  * Executes the supplied function in a new logical thread represented by
586  * `Tid`.  This new thread is linked to the calling thread so that if either
587  * it or the calling thread terminates a `LinkTerminated` message will be sent
588  * to the other, causing a `LinkTerminated` exception to be thrown on `receive()`.
589  * The owner relationship from `spawn()` is preserved as well, so if the link
590  * between threads is broken, owner termination will still result in an
591  * `OwnerTerminated` exception to be thrown on `receive()`.
592  *
593  * Params:
594  *  fn   = The function to execute.
595  *  args = Arguments to the function.
596  *
597  * Returns:
598  *  A Tid representing the new thread.
599  */
600 Tid spawnLinked(F, T...)(F fn, T args)
601 if (isSpawnable!(F, T))
602 {
603     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
604     return _spawn(true, fn, args);
605 }
607 /*
608  *
609  */
610 private Tid _spawn(F, T...)(bool linked, F fn, T args)
611 if (isSpawnable!(F, T))
612 {
613     // TODO: MessageList and &exec should be shared.
614     auto spawnTid = Tid(new MessageBox);
615     auto ownerTid = thisTid;
617     void exec()
618     {
619         thisInfo.ident = spawnTid;
620         thisInfo.owner = ownerTid;
621         fn(args);
622     }
624     // TODO: MessageList and &exec should be shared.
625     if (scheduler !is null)
626         scheduler.spawn(&exec);
627     else
628     {
629         auto t = new Thread(&exec);
630         t.start();
631     }
632     thisInfo.links[spawnTid] = linked;
633     return spawnTid;
634 }
636 @system unittest
637 {
638     void function() fn1;
639     void function(int) fn2;
640     static assert(__traits(compiles, spawn(fn1)));
641     static assert(__traits(compiles, spawn(fn2, 2)));
642     static assert(!__traits(compiles, spawn(fn1, 1)));
643     static assert(!__traits(compiles, spawn(fn2)));
645     void delegate(int) shared dg1;
646     shared(void delegate(int)) dg2;
647     shared(void delegate(long) shared) dg3;
648     shared(void delegate(real, int, long) shared) dg4;
649     void delegate(int) immutable dg5;
650     void delegate(int) dg6;
651     static assert(__traits(compiles, spawn(dg1, 1)));
652     static assert(__traits(compiles, spawn(dg2, 2)));
653     static assert(__traits(compiles, spawn(dg3, 3)));
654     static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));
655     static assert(__traits(compiles, spawn(dg5, 5)));
656     static assert(!__traits(compiles, spawn(dg6, 6)));
658     auto callable1  = new class{ void opCall(int) shared {} };
659     auto callable2  = cast(shared) new class{ void opCall(int) shared {} };
660     auto callable3  = new class{ void opCall(int) immutable {} };
661     auto callable4  = cast(immutable) new class{ void opCall(int) immutable {} };
662     auto callable5  = new class{ void opCall(int) {} };
663     auto callable6  = cast(shared) new class{ void opCall(int) immutable {} };
664     auto callable7  = cast(immutable) new class{ void opCall(int) shared {} };
665     auto callable8  = cast(shared) new class{ void opCall(int) const shared {} };
666     auto callable9  = cast(const shared) new class{ void opCall(int) shared {} };
667     auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} };
668     auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} };
669     static assert(!__traits(compiles, spawn(callable1,  1)));
670     static assert( __traits(compiles, spawn(callable2,  2)));
671     static assert(!__traits(compiles, spawn(callable3,  3)));
672     static assert( __traits(compiles, spawn(callable4,  4)));
673     static assert(!__traits(compiles, spawn(callable5,  5)));
674     static assert(!__traits(compiles, spawn(callable6,  6)));
675     static assert(!__traits(compiles, spawn(callable7,  7)));
676     static assert( __traits(compiles, spawn(callable8,  8)));
677     static assert(!__traits(compiles, spawn(callable9,  9)));
678     static assert( __traits(compiles, spawn(callable10, 10)));
679     static assert( __traits(compiles, spawn(callable11, 11)));
680 }
682 /**
683  * Places the values as a message at the back of tid's message queue.
684  *
685  * Sends the supplied value to the thread represented by tid.  As with
686  * $(REF spawn, std,concurrency), `T` must not have unshared aliasing.
687  */
688 void send(T...)(Tid tid, T vals)
689 in (tid.mbox !is null)
690 {
691     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
692     _send(tid, vals);
693 }
695 /**
696  * Places the values as a message on the front of tid's message queue.
697  *
698  * Send a message to `tid` but place it at the front of `tid`'s message
699  * queue instead of at the back.  This function is typically used for
700  * out-of-band communication, to signal exceptional conditions, etc.
701  */
702 void prioritySend(T...)(Tid tid, T vals)
703 in (tid.mbox !is null)
704 {
705     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
706     _send(MsgType.priority, tid, vals);
707 }
709 /*
710  * ditto
711  */
712 private void _send(T...)(Tid tid, T vals)
713 in (tid.mbox !is null)
714 {
715     _send(MsgType.standard, tid, vals);
716 }
718 /*
719  * Implementation of send.  This allows parameter checking to be different for
720  * both Tid.send() and .send().
721  */
722 private void _send(T...)(MsgType type, Tid tid, T vals)
723 in (tid.mbox !is null)
724 {
725     auto msg = Message(type, vals);
726     tid.mbox.put(msg);
727 }
729 /**
730  * Receives a message from another thread.
731  *
732  * Receive a message from another thread, or block if no messages of the
733  * specified types are available.  This function works by pattern matching
734  * a message against a set of delegates and executing the first match found.
735  *
736  * If a delegate that accepts a $(REF Variant, std,variant) is included as
737  * the last argument to `receive`, it will match any message that was not
738  * matched by an earlier delegate.  If more than one argument is sent,
739  * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
740  * sent.
741  *
742  * Params:
743  *     ops = Variadic list of function pointers and delegates. Entries
744  *           in this list must not occlude later entries.
745  *
746  * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
747  */
748 void receive(T...)( T ops )
749 in
750 {
751     assert(thisInfo.ident.mbox !is null,
752            "Cannot receive a message until a thread was spawned "
753            ~ "or thisTid was passed to a running thread.");
754 }
755 do
756 {
757     checkops( ops );
759     thisInfo.ident.mbox.get( ops );
760 }
762 ///
763 @system unittest
764 {
765     import std.variant : Variant;
767     auto process = ()
768     {
769         receive(
770             (int i) { ownerTid.send(1); },
771             (double f) { ownerTid.send(2); },
772             (Variant v) { ownerTid.send(3); }
773         );
774     };
776     {
777         auto tid = spawn(process);
778         send(tid, 42);
779         assert(receiveOnly!int == 1);
780     }
782     {
783         auto tid = spawn(process);
784         send(tid, 3.14);
785         assert(receiveOnly!int == 2);
786     }
788     {
789         auto tid = spawn(process);
790         send(tid, "something else");
791         assert(receiveOnly!int == 3);
792     }
793 }
795 @safe unittest
796 {
797     static assert( __traits( compiles,
798                       {
799                           receive( (Variant x) {} );
800                           receive( (int x) {}, (Variant x) {} );
801                       } ) );
803     static assert( !__traits( compiles,
804                        {
805                            receive( (Variant x) {}, (int x) {} );
806                        } ) );
808     static assert( !__traits( compiles,
809                        {
810                            receive( (int x) {}, (int x) {} );
811                        } ) );
812 }
814 // Make sure receive() works with free functions as well.
815 version (StdUnittest)
816 {
817     private void receiveFunction(int x) {}
818 }
819 @safe unittest
820 {
821     static assert( __traits( compiles,
822                       {
823                           receive( &receiveFunction );
824                           receive( &receiveFunction, (Variant x) {} );
825                       } ) );
826 }
829 private template receiveOnlyRet(T...)
830 {
831     static if ( T.length == 1 )
832     {
833         alias receiveOnlyRet = T[0];
834     }
835     else
836     {
837         import std.typecons : Tuple;
838         alias receiveOnlyRet = Tuple!(T);
839     }
840 }
842 /**
843  * Receives only messages with arguments of the specified types.
844  *
845  * Params:
846  *     T = Variadic list of types to be received.
847  *
848  * Returns: The received message.  If `T` has more than one entry,
849  *          the message will be packed into a $(REF Tuple, std,typecons).
850  *
851  * Throws: $(LREF MessageMismatch) if a message of types other than `T`
852  *         is received,
853  *         $(LREF OwnerTerminated) when the sending thread was terminated.
854  */
855 receiveOnlyRet!(T) receiveOnly(T...)()
856 in
857 {
858     assert(thisInfo.ident.mbox !is null,
859         "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
860 }
861 do
862 {
863     import std.format : format;
864     import std.meta : allSatisfy;
865     import std.typecons : Tuple;
867     Tuple!(T) ret;
869     thisInfo.ident.mbox.get((T val) {
870         static if (T.length)
871         {
872             static if (allSatisfy!(isAssignable, T))
873             {
874                 ret.field = val;
875             }
876             else
877             {
878                 import core.lifetime : emplace;
879                 emplace(&ret, val);
880             }
881         }
882     },
883     (LinkTerminated e) { throw e; },
884     (OwnerTerminated e) { throw e; },
885     (Variant val) {
886         static if (T.length > 1)
887             string exp = T.stringof;
888         else
889             string exp = T[0].stringof;
891         throw new MessageMismatch(
892             format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString()));
893     });
894     static if (T.length == 1)
895         return ret[0];
896     else
897         return ret;
898 }
900 ///
901 @system unittest
902 {
903     auto tid = spawn(
904     {
905         assert(receiveOnly!int == 42);
906     });
907     send(tid, 42);
908 }
910 ///
911 @system unittest
912 {
913     auto tid = spawn(
914     {
915         assert(receiveOnly!string == "text");
916     });
917     send(tid, "text");
918 }
920 ///
921 @system unittest
922 {
923     struct Record { string name; int age; }
925     auto tid = spawn(
926     {
927         auto msg = receiveOnly!(double, Record);
928         assert(msg[0] == 0.5);
929         assert(msg[1].name == "Alice");
930         assert(msg[1].age == 31);
931     });
933     send(tid, 0.5, Record("Alice", 31));
934 }
936 @system unittest
937 {
938     static void t1(Tid mainTid)
939     {
940         try
941         {
942             receiveOnly!string();
943             mainTid.send("");
944         }
945         catch (Throwable th)
946         {
947             mainTid.send(th.msg);
948         }
949     }
951     auto tid = spawn(&t1, thisTid);
952     tid.send(1);
953     string result = receiveOnly!string();
954     assert(result == "Unexpected message type: expected 'string', got 'int'");
955 }
957 //
958 @safe unittest
959 {
960     alias test = receiveOnly!(string, bool, bool);
961 }
963 /**
964  * Receives a message from another thread and gives up if no match
965  * arrives within a specified duration.
966  *
967  * Receive a message from another thread, or block until `duration` exceeds,
968  * if no messages of the specified types are available. This function works
969  * by pattern matching a message against a set of delegates and executing
970  * the first match found.
971  *
972  * If a delegate that accepts a $(REF Variant, std,variant) is included as
973  * the last argument, it will match any message that was not
974  * matched by an earlier delegate.  If more than one argument is sent,
975  * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
976  * sent.
977  *
978  * Params:
979  *     duration = Duration, how long to wait. If `duration` is negative,
980  *                won't wait at all.
981  *     ops = Variadic list of function pointers and delegates. Entries
982  *           in this list must not occlude later entries.
983  *
984  * Returns: `true` if it received a message and `false` if it timed out waiting
985  *          for one.
986  *
987  * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
988  */
989 bool receiveTimeout(T...)(Duration duration, T ops)
990 in
991 {
992     assert(thisInfo.ident.mbox !is null,
993         "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
994 }
995 do
996 {
997     checkops(ops);
999     return thisInfo.ident.mbox.get(duration, ops);
1000 }
1002 @safe unittest
1003 {
1004     static assert(__traits(compiles, {
1005         receiveTimeout(msecs(0), (Variant x) {});
1006         receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
1007     }));
1009     static assert(!__traits(compiles, {
1010         receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
1011     }));
1013     static assert(!__traits(compiles, {
1014         receiveTimeout(msecs(0), (int x) {}, (int x) {});
1015     }));
1017     static assert(__traits(compiles, {
1018         receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
1019     }));
1020 }
1022 // MessageBox Limits
1024 /**
1025  * These behaviors may be specified when a mailbox is full.
1026  */
1027 enum OnCrowding
1028 {
1029     block, /// Wait until room is available.
1030     throwException, /// Throw a $(LREF MailboxFull) exception.
1031     ignore /// Abort the send and return.
1032 }
1034 private
1035 {
1036     bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
1037     {
1038         return true;
1039     }
1041     bool onCrowdingThrow(Tid tid) @safe pure
1042     {
1043         throw new MailboxFull(tid);
1044     }
1046     bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
1047     {
1048         return false;
1049     }
1050 }
1052 /**
1053  * Sets a maximum mailbox size.
1054  *
1055  * Sets a limit on the maximum number of user messages allowed in the mailbox.
1056  * If this limit is reached, the caller attempting to add a new message will
1057  * execute the behavior specified by doThis.  If messages is zero, the mailbox
1058  * is unbounded.
1059  *
1060  * Params:
1061  *  tid      = The Tid of the thread for which this limit should be set.
1062  *  messages = The maximum number of messages or zero if no limit.
1063  *  doThis   = The behavior executed when a message is sent to a full
1064  *             mailbox.
1065  */
1066 void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
1067 in (tid.mbox !is null)
1068 {
1069     final switch (doThis)
1070     {
1071     case OnCrowding.block:
1072         return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock);
1073     case OnCrowding.throwException:
1074         return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow);
1075     case OnCrowding.ignore:
1076         return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore);
1077     }
1078 }
1080 /**
1081  * Sets a maximum mailbox size.
1082  *
1083  * Sets a limit on the maximum number of user messages allowed in the mailbox.
1084  * If this limit is reached, the caller attempting to add a new message will
1085  * execute onCrowdingDoThis.  If messages is zero, the mailbox is unbounded.
1086  *
1087  * Params:
1088  *  tid      = The Tid of the thread for which this limit should be set.
1089  *  messages = The maximum number of messages or zero if no limit.
1090  *  onCrowdingDoThis = The routine called when a message is sent to a full
1091  *                     mailbox.
1092  */
1093 void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
1094 in (tid.mbox !is null)
1095 {
1096     tid.mbox.setMaxMsgs(messages, onCrowdingDoThis);
1097 }
1099 private
1100 {
1101     __gshared Tid[string] tidByName;
1102     __gshared string[][Tid] namesByTid;
1103 }
1105 private @property Mutex registryLock() @system
1106 {
1107     __gshared Mutex impl;
1108     initOnce!impl(new Mutex);
1109     return impl;
1110 }
1112 private void unregisterMe(ref ThreadInfo me)
1113 {
1114     if (me.ident != Tid.init)
1115     {
1116         synchronized (registryLock)
1117         {
1118             if (auto allNames = me.ident in namesByTid)
1119             {
1120                 foreach (name; *allNames)
1121                     tidByName.remove(name);
1122                 namesByTid.remove(me.ident);
1123             }
1124         }
1125     }
1126 }
1128 /**
1129  * Associates name with tid.
1130  *
1131  * Associates name with tid in a process-local map.  When the thread
1132  * represented by tid terminates, any names associated with it will be
1133  * automatically unregistered.
1134  *
1135  * Params:
1136  *  name = The name to associate with tid.
1137  *  tid  = The tid register by name.
1138  *
1139  * Returns:
1140  *  true if the name is available and tid is not known to represent a
1141  *  defunct thread.
1142  */
1143 bool register(string name, Tid tid)
1144 in (tid.mbox !is null)
1145 {
1146     synchronized (registryLock)
1147     {
1148         if (name in tidByName)
1149             return false;
1150         if (tid.mbox.isClosed)
1151             return false;
1152         namesByTid[tid] ~= name;
1153         tidByName[name] = tid;
1154         return true;
1155     }
1156 }
1158 /**
1159  * Removes the registered name associated with a tid.
1160  *
1161  * Params:
1162  *  name = The name to unregister.
1163  *
1164  * Returns:
1165  *  true if the name is registered, false if not.
1166  */
1167 bool unregister(string name)
1168 {
1169     import std.algorithm.mutation : remove, SwapStrategy;
1170     import std.algorithm.searching : countUntil;
1172     synchronized (registryLock)
1173     {
1174         if (auto tid = name in tidByName)
1175         {
1176             auto allNames = *tid in namesByTid;
1177             auto pos = countUntil(*allNames, name);
1178             remove!(SwapStrategy.unstable)(*allNames, pos);
1179             tidByName.remove(name);
1180             return true;
1181         }
1182         return false;
1183     }
1184 }
1186 /**
1187  * Gets the `Tid` associated with name.
1188  *
1189  * Params:
1190  *  name = The name to locate within the registry.
1191  *
1192  * Returns:
1193  *  The associated `Tid` or `Tid.init` if name is not registered.
1194  */
1195 Tid locate(string name)
1196 {
1197     synchronized (registryLock)
1198     {
1199         if (auto tid = name in tidByName)
1200             return *tid;
1201         return Tid.init;
1202     }
1203 }
1205 /**
1206  * Encapsulates all implementation-level data needed for scheduling.
1207  *
1208  * When defining a $(LREF Scheduler), an instance of this struct must be associated
1209  * with each logical thread.  It contains all implementation-level information
1210  * needed by the internal API.
1211  */
1212 struct ThreadInfo
1213 {
1214     Tid ident;
1215     bool[Tid] links;
1216     Tid owner;
1218     /**
1219      * Gets a thread-local instance of `ThreadInfo`.
1220      *
1221      * Gets a thread-local instance of `ThreadInfo`, which should be used as the
1222      * default instance when info is requested for a thread not created by the
1223      * `Scheduler`.
1224      */
1225     static @property ref thisInfo() nothrow
1226     {
1227         static ThreadInfo val;
1228         return val;
1229     }
1231     /**
1232      * Cleans up this ThreadInfo.
1233      *
1234      * This must be called when a scheduled thread terminates.  It tears down
1235      * the messaging system for the thread and notifies interested parties of
1236      * the thread's termination.
1237      */
1238     void cleanup()
1239     {
1240         if (ident.mbox !is null)
1241             ident.mbox.close();
1242         foreach (tid; links.keys)
1243             _send(MsgType.linkDead, tid, ident);
1244         if (owner != Tid.init)
1245             _send(MsgType.linkDead, owner, ident);
1246         unregisterMe(this); // clean up registry entries
1247     }
1249     //
1250     @system unittest
1251     {
1252         register("main_thread", thisTid());
1254         ThreadInfo t;
1255         t.cleanup();
1257         assert(locate("main_thread") == thisTid());
1258     }
1259 }
1261 /**
1262  * A `Scheduler` controls how threading is performed by spawn.
1263  *
1264  * Implementing a `Scheduler` allows the concurrency mechanism used by this
1265  * module to be customized according to different needs.  By default, a call
1266  * to spawn will create a new kernel thread that executes the supplied routine
1267  * and terminates when finished.  But it is possible to create `Scheduler`s that
1268  * reuse threads, that multiplex `Fiber`s (coroutines) across a single thread,
1269  * or any number of other approaches.  By making the choice of `Scheduler` a
1270  * user-level option, `std.concurrency` may be used for far more types of
1271  * application than if this behavior were predefined.
1272  *
1273  * Example:
1274  * ---
1275  * import std.concurrency;
1276  * import std.stdio;
1277  *
1278  * void main()
1279  * {
1280  *     scheduler = new FiberScheduler;
1281  *     scheduler.start(
1282  *     {
1283  *         writeln("the rest of main goes here");
1284  *     });
1285  * }
1286  * ---
1287  *
1288  * Some schedulers have a dispatching loop that must run if they are to work
1289  * properly, so for the sake of consistency, when using a scheduler, `start()`
1290  * must be called within `main()`.  This yields control to the scheduler and
1291  * will ensure that any spawned threads are executed in an expected manner.
1292  */
1293 interface Scheduler
1294 {
1295     /**
1296      * Spawns the supplied op and starts the `Scheduler`.
1297      *
1298      * This is intended to be called at the start of the program to yield all
1299      * scheduling to the active `Scheduler` instance.  This is necessary for
1300      * schedulers that explicitly dispatch threads rather than simply relying
1301      * on the operating system to do so, and so start should always be called
1302      * within `main()` to begin normal program execution.
1303      *
1304      * Params:
1305      *  op = A wrapper for whatever the main thread would have done in the
1306      *       absence of a custom scheduler.  It will be automatically executed
1307      *       via a call to spawn by the `Scheduler`.
1308      */
1309     void start(void delegate() op);
1311     /**
1312      * Assigns a logical thread to execute the supplied op.
1313      *
1314      * This routine is called by spawn.  It is expected to instantiate a new
1315      * logical thread and run the supplied operation.  This thread must call
1316      * `thisInfo.cleanup()` when the thread terminates if the scheduled thread
1317      * is not a kernel thread--all kernel threads will have their `ThreadInfo`
1318      * cleaned up automatically by a thread-local destructor.
1319      *
1320      * Params:
1321      *  op = The function to execute.  This may be the actual function passed
1322      *       by the user to spawn itself, or may be a wrapper function.
1323      */
1324     void spawn(void delegate() op);
1326     /**
1327      * Yields execution to another logical thread.
1328      *
1329      * This routine is called at various points within concurrency-aware APIs
1330      * to provide a scheduler a chance to yield execution when using some sort
1331      * of cooperative multithreading model.  If this is not appropriate, such
1332      * as when each logical thread is backed by a dedicated kernel thread,
1333      * this routine may be a no-op.
1334      */
1335     void yield() nothrow;
1337     /**
1338      * Returns an appropriate `ThreadInfo` instance.
1339      *
1340      * Returns an instance of `ThreadInfo` specific to the logical thread that
1341      * is calling this routine or, if the calling thread was not create by
1342      * this scheduler, returns `ThreadInfo.thisInfo` instead.
1343      */
1344     @property ref ThreadInfo thisInfo() nothrow;
1346     /**
1347      * Creates a `Condition` variable analog for signaling.
1348      *
1349      * Creates a new `Condition` variable analog which is used to check for and
1350      * to signal the addition of messages to a thread's message queue.  Like
1351      * yield, some schedulers may need to define custom behavior so that calls
1352      * to `Condition.wait()` yield to another thread when no new messages are
1353      * available instead of blocking.
1354      *
1355      * Params:
1356      *  m = The `Mutex` that will be associated with this condition.  It will be
1357      *      locked prior to any operation on the condition, and so in some
1358      *      cases a `Scheduler` may need to hold this reference and unlock the
1359      *      mutex before yielding execution to another logical thread.
1360      */
1361     Condition newCondition(Mutex m) nothrow;
1362 }
1364 /**
1365  * An example `Scheduler` using kernel threads.
1366  *
1367  * This is an example `Scheduler` that mirrors the default scheduling behavior
1368  * of creating one kernel thread per call to spawn.  It is fully functional
1369  * and may be instantiated and used, but is not a necessary part of the
1370  * default functioning of this module.
1371  */
1372 class ThreadScheduler : Scheduler
1373 {
1374     /**
1375      * This simply runs op directly, since no real scheduling is needed by
1376      * this approach.
1377      */
1378     void start(void delegate() op)
1379     {
1380         op();
1381     }
1383     /**
1384      * Creates a new kernel thread and assigns it to run the supplied op.
1385      */
1386     void spawn(void delegate() op)
1387     {
1388         auto t = new Thread(op);
1389         t.start();
1390     }
1392     /**
1393      * This scheduler does no explicit multiplexing, so this is a no-op.
1394      */
1395     void yield() nothrow
1396     {
1397         // no explicit yield needed
1398     }
1400     /**
1401      * Returns `ThreadInfo.thisInfo`, since it is a thread-local instance of
1402      * `ThreadInfo`, which is the correct behavior for this scheduler.
1403      */
1404     @property ref ThreadInfo thisInfo() nothrow
1405     {
1406         return ThreadInfo.thisInfo;
1407     }
1409     /**
1410      * Creates a new `Condition` variable.  No custom behavior is needed here.
1411      */
1412     Condition newCondition(Mutex m) nothrow
1413     {
1414         return new Condition(m);
1415     }
1416 }
1418 /**
1419  * An example `Scheduler` using $(MREF_ALTTEXT `Fiber`s, core, thread, fiber).
1420  *
1421  * This is an example scheduler that creates a new `Fiber` per call to spawn
1422  * and multiplexes the execution of all fibers within the main thread.
1423  */
1424 class FiberScheduler : Scheduler
1425 {
1426     /**
1427      * This creates a new `Fiber` for the supplied op and then starts the
1428      * dispatcher.
1429      */
1430     void start(void delegate() op)
1431     {
1432         create(op);
1433         dispatch();
1434     }
1436     /**
1437      * This created a new `Fiber` for the supplied op and adds it to the
1438      * dispatch list.
1439      */
1440     void spawn(void delegate() op) nothrow
1441     {
1442         create(op);
1443         yield();
1444     }
1446     /**
1447      * If the caller is a scheduled `Fiber`, this yields execution to another
1448      * scheduled `Fiber`.
1449      */
1450     void yield() nothrow
1451     {
1452         // NOTE: It's possible that we should test whether the calling Fiber
1453         //       is an InfoFiber before yielding, but I think it's reasonable
1454         //       that any (non-Generator) fiber should yield here.
1455         if (Fiber.getThis())
1456             Fiber.yield();
1457     }
1459     /**
1460      * Returns an appropriate `ThreadInfo` instance.
1461      *
1462      * Returns a `ThreadInfo` instance specific to the calling `Fiber` if the
1463      * `Fiber` was created by this dispatcher, otherwise it returns
1464      * `ThreadInfo.thisInfo`.
1465      */
1466     @property ref ThreadInfo thisInfo() nothrow
1467     {
1468         auto f = cast(InfoFiber) Fiber.getThis();
1470         if (f !is null)
1471             return;
1472         return ThreadInfo.thisInfo;
1473     }
1475     /**
1476      * Returns a `Condition` analog that yields when wait or notify is called.
1477      *
1478      * Bug:
1479      * For the default implementation, `notifyAll` will behave like `notify`.
1480      *
1481      * Params:
1482      *   m = A `Mutex` to use for locking if the condition needs to be waited on
1483      *       or notified from multiple `Thread`s.
1484      *       If `null`, no `Mutex` will be used and it is assumed that the
1485      *       `Condition` is only waited on/notified from one `Thread`.
1486      */
1487     Condition newCondition(Mutex m) nothrow
1488     {
1489         return new FiberCondition(m);
1490     }
1492 protected:
1493     /**
1494      * Creates a new `Fiber` which calls the given delegate.
1495      *
1496      * Params:
1497      *   op = The delegate the fiber should call
1498      */
1499     void create(void delegate() op) nothrow
1500     {
1501         void wrap()
1502         {
1503             scope (exit)
1504             {
1505                 thisInfo.cleanup();
1506             }
1507             op();
1508         }
1510         m_fibers ~= new InfoFiber(&wrap);
1511     }
1513     /**
1514      * `Fiber` which embeds a `ThreadInfo`
1515      */
1516     static class InfoFiber : Fiber
1517     {
1518         ThreadInfo info;
1520         this(void delegate() op) nothrow
1521         {
1522             super(op);
1523         }
1525         this(void delegate() op, size_t sz) nothrow
1526         {
1527             super(op, sz);
1528         }
1529     }
1531 private:
1532     class FiberCondition : Condition
1533     {
1534         this(Mutex m) nothrow
1535         {
1536             super(m);
1537             notified = false;
1538         }
1540         override void wait() nothrow
1541         {
1542             scope (exit) notified = false;
1544             while (!notified)
1545                 switchContext();
1546         }
1548         override bool wait(Duration period) nothrow
1549         {
1550             import core.time : MonoTime;
1552             scope (exit) notified = false;
1554             for (auto limit = MonoTime.currTime + period;
1555                  !notified && !period.isNegative;
1556                  period = limit - MonoTime.currTime)
1557             {
1558                 this.outer.yield();
1559             }
1560             return notified;
1561         }
1563         override void notify() nothrow
1564         {
1565             notified = true;
1566             switchContext();
1567         }
1569         override void notifyAll() nothrow
1570         {
1571             notified = true;
1572             switchContext();
1573         }
1575     private:
1576         void switchContext() nothrow
1577         {
1578             if (mutex_nothrow) mutex_nothrow.unlock_nothrow();
1579             scope (exit)
1580                 if (mutex_nothrow)
1581                     mutex_nothrow.lock_nothrow();
1582             this.outer.yield();
1583         }
1585         bool notified;
1586     }
1588     void dispatch()
1589     {
1590         import std.algorithm.mutation : remove;
1592         while (m_fibers.length > 0)
1593         {
1594             auto t = m_fibers[m_pos].call(;
1595             if (t !is null && !(cast(OwnerTerminated) t))
1596             {
1597                 throw t;
1598             }
1599             if (m_fibers[m_pos].state == Fiber.State.TERM)
1600             {
1601                 if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
1602                     m_pos = 0;
1603             }
1604             else if (m_pos++ >= m_fibers.length - 1)
1605             {
1606                 m_pos = 0;
1607             }
1608         }
1609     }
1611     Fiber[] m_fibers;
1612     size_t m_pos;
1613 }
1615 @system unittest
1616 {
1617     static void receive(Condition cond, ref size_t received)
1618     {
1619         while (true)
1620         {
1621             synchronized (cond.mutex)
1622             {
1623                 cond.wait();
1624                 ++received;
1625             }
1626         }
1627     }
1629     static void send(Condition cond, ref size_t sent)
1630     {
1631         while (true)
1632         {
1633             synchronized (cond.mutex)
1634             {
1635                 ++sent;
1636                 cond.notify();
1637             }
1638         }
1639     }
1641     auto fs = new FiberScheduler;
1642     auto mtx = new Mutex;
1643     auto cond = fs.newCondition(mtx);
1645     size_t received, sent;
1646     auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
1648     assert(received == 0);
1650     assert(sent == 1);
1651     assert(received == 0);
1653     assert(received == 1);
1655     assert(received == 1);
1656 }
1658 /**
1659  * Sets the `Scheduler` behavior within the program.
1660  *
1661  * This variable sets the `Scheduler` behavior within this program.  Typically,
1662  * when setting a `Scheduler`, `scheduler.start()` should be called in `main`.  This
1663  * routine will not return until program execution is complete.
1664  */
1665 __gshared Scheduler scheduler;
1667 // Generator
1669 /**
1670  * If the caller is a `Fiber` and is not a $(LREF Generator), this function will call
1671  * `scheduler.yield()` or `Fiber.yield()`, as appropriate.
1672  */
1673 void yield() nothrow
1674 {
1675     auto fiber = Fiber.getThis();
1676     if (!(cast(IsGenerator) fiber))
1677     {
1678         if (scheduler is null)
1679         {
1680             if (fiber)
1681                 return Fiber.yield();
1682         }
1683         else
1684             scheduler.yield();
1685     }
1686 }
1688 /// Used to determine whether a Generator is running.
1689 private interface IsGenerator {}
1692 /**
1693  * A Generator is a $(MREF_ALTTEXT Fiber, core, thread, fiber)
1694  * that periodically returns values of type `T` to the
1695  * caller via `yield`.  This is represented as an InputRange.
1696  */
1697 class Generator(T) :
1698     Fiber, IsGenerator, InputRange!T
1699 {
1700     /**
1701      * Initializes a generator object which is associated with a static
1702      * D function.  The function will be called once to prepare the range
1703      * for iteration.
1704      *
1705      * Params:
1706      *  fn = The fiber function.
1707      *
1708      * In:
1709      *  fn must not be null.
1710      */
1711     this(void function() fn)
1712     {
1713         super(fn);
1714         call();
1715     }
1717     /**
1718      * Initializes a generator object which is associated with a static
1719      * D function.  The function will be called once to prepare the range
1720      * for iteration.
1721      *
1722      * Params:
1723      *  fn = The fiber function.
1724      *  sz = The stack size for this fiber.
1725      *
1726      * In:
1727      *  fn must not be null.
1728      */
1729     this(void function() fn, size_t sz)
1730     {
1731         super(fn, sz);
1732         call();
1733     }
1735     /**
1736      * Initializes a generator object which is associated with a static
1737      * D function.  The function will be called once to prepare the range
1738      * for iteration.
1739      *
1740      * Params:
1741      *  fn = The fiber function.
1742      *  sz = The stack size for this fiber.
1743      *  guardPageSize = size of the guard page to trap fiber's stack
1744      *                  overflows. Refer to $(REF Fiber, core,thread)'s
1745      *                  documentation for more details.
1746      *
1747      * In:
1748      *  fn must not be null.
1749      */
1750     this(void function() fn, size_t sz, size_t guardPageSize)
1751     {
1752         super(fn, sz, guardPageSize);
1753         call();
1754     }
1756     /**
1757      * Initializes a generator object which is associated with a dynamic
1758      * D function.  The function will be called once to prepare the range
1759      * for iteration.
1760      *
1761      * Params:
1762      *  dg = The fiber function.
1763      *
1764      * In:
1765      *  dg must not be null.
1766      */
1767     this(void delegate() dg)
1768     {
1769         super(dg);
1770         call();
1771     }
1773     /**
1774      * Initializes a generator object which is associated with a dynamic
1775      * D function.  The function will be called once to prepare the range
1776      * for iteration.
1777      *
1778      * Params:
1779      *  dg = The fiber function.
1780      *  sz = The stack size for this fiber.
1781      *
1782      * In:
1783      *  dg must not be null.
1784      */
1785     this(void delegate() dg, size_t sz)
1786     {
1787         super(dg, sz);
1788         call();
1789     }
1791     /**
1792      * Initializes a generator object which is associated with a dynamic
1793      * D function.  The function will be called once to prepare the range
1794      * for iteration.
1795      *
1796      * Params:
1797      *  dg = The fiber function.
1798      *  sz = The stack size for this fiber.
1799      *  guardPageSize = size of the guard page to trap fiber's stack
1800      *                  overflows. Refer to $(REF Fiber, core,thread)'s
1801      *                  documentation for more details.
1802      *
1803      * In:
1804      *  dg must not be null.
1805      */
1806     this(void delegate() dg, size_t sz, size_t guardPageSize)
1807     {
1808         super(dg, sz, guardPageSize);
1809         call();
1810     }
1812     /**
1813      * Returns true if the generator is empty.
1814      */
1815     final bool empty() @property
1816     {
1817         return m_value is null || state == State.TERM;
1818     }
1820     /**
1821      * Obtains the next value from the underlying function.
1822      */
1823     final void popFront()
1824     {
1825         call();
1826     }
1828     /**
1829      * Returns the most recently generated value by shallow copy.
1830      */
1831     final T front() @property
1832     {
1833         return *m_value;
1834     }
1836     /**
1837      * Returns the most recently generated value without executing a
1838      * copy contructor. Will not compile for element types defining a
1839      * postblit, because `Generator` does not return by reference.
1840      */
1841     final T moveFront()
1842     {
1843         static if (!hasElaborateCopyConstructor!T)
1844         {
1845             return front;
1846         }
1847         else
1848         {
1849             static assert(0,
1850                     "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
1851         }
1852     }
1854     final int opApply(scope int delegate(T) loopBody)
1855     {
1856         int broken;
1857         for (; !empty; popFront())
1858         {
1859             broken = loopBody(front);
1860             if (broken) break;
1861         }
1862         return broken;
1863     }
1865     final int opApply(scope int delegate(size_t, T) loopBody)
1866     {
1867         int broken;
1868         for (size_t i; !empty; ++i, popFront())
1869         {
1870             broken = loopBody(i, front);
1871             if (broken) break;
1872         }
1873         return broken;
1874     }
1875 private:
1876     T* m_value;
1877 }
1879 ///
1880 @system unittest
1881 {
1882     auto tid = spawn({
1883         int i;
1884         while (i < 9)
1885             i = receiveOnly!int;
1887         ownerTid.send(i * 2);
1888     });
1890     auto r = new Generator!int({
1891         foreach (i; 1 .. 10)
1892             yield(i);
1893     });
1895     foreach (e; r)
1896         tid.send(e);
1898     assert(receiveOnly!int == 18);
1899 }
1901 /**
1902  * Yields a value of type T to the caller of the currently executing
1903  * generator.
1904  *
1905  * Params:
1906  *  value = The value to yield.
1907  */
1908 void yield(T)(ref T value)
1909 {
1910     Generator!T cur = cast(Generator!T) Fiber.getThis();
1911     if (cur !is null && cur.state == Fiber.State.EXEC)
1912     {
1913         cur.m_value = &value;
1914         return Fiber.yield();
1915     }
1916     throw new Exception("yield(T) called with no active generator for the supplied type");
1917 }
1919 /// ditto
1920 void yield(T)(T value)
1921 {
1922     yield(value);
1923 }
1925 @system unittest
1926 {
1927     import core.exception;
1928     import std.exception;
1930     auto mainTid = thisTid;
1931     alias testdg = () {
1932         auto tid = spawn(
1933         (Tid mainTid) {
1934             int i;
1935             scope (failure) mainTid.send(false);
1936             try
1937             {
1938                 for (i = 1; i < 10; i++)
1939                 {
1940                     if (receiveOnly!int() != i)
1941                     {
1942                         mainTid.send(false);
1943                         break;
1944                     }
1945                 }
1946             }
1947             catch (OwnerTerminated e)
1948             {
1949                 // i will advance 1 past the last value expected
1950                 mainTid.send(i == 4);
1951             }
1952         }, mainTid);
1953         auto r = new Generator!int(
1954         {
1955             assertThrown!Exception(yield(2.0));
1956             yield(); // ensure this is a no-op
1957             yield(1);
1958             yield(); // also once something has been yielded
1959             yield(2);
1960             yield(3);
1961         });
1963         foreach (e; r)
1964         {
1965             tid.send(e);
1966         }
1967     };
1969     scheduler = new ThreadScheduler;
1970     scheduler.spawn(testdg);
1971     assert(receiveOnly!bool());
1973     scheduler = new FiberScheduler;
1974     scheduler.start(testdg);
1975     assert(receiveOnly!bool());
1976     scheduler = null;
1977 }
1978 ///
1979 @system unittest
1980 {
1981     import std.range;
1983     InputRange!int myIota = iota(10).inputRangeObject;
1985     myIota.popFront();
1986     myIota.popFront();
1987     assert(myIota.moveFront == 2);
1988     assert(myIota.front == 2);
1989     myIota.popFront();
1990     assert(myIota.front == 3);
1992     //can be assigned to std.range.interfaces.InputRange directly
1993     myIota = new Generator!int(
1994     {
1995         foreach (i; 0 .. 10) yield(i);
1996     });
1998     myIota.popFront();
1999     myIota.popFront();
2000     assert(myIota.moveFront == 2);
2001     assert(myIota.front == 2);
2002     myIota.popFront();
2003     assert(myIota.front == 3);
2005     size_t[2] counter = [0, 0];
2006     foreach (i, unused; myIota) counter[] += [1, i];
2008     assert(myIota.empty);
2009     assert(counter == [7, 21]);
2010 }
2012 private
2013 {
2014     /*
2015      * A MessageBox is a message queue for one thread.  Other threads may send
2016      * messages to this owner by calling put(), and the owner receives them by
2017      * calling get().  The put() call is therefore effectively shared and the
2018      * get() call is effectively local.  setMaxMsgs may be used by any thread
2019      * to limit the size of the message queue.
2020      */
2021     class MessageBox
2022     {
2023         this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
2024         {
2025             m_lock = new Mutex;
2026             m_closed = false;
2028             if (scheduler is null)
2029             {
2030                 m_putMsg = new Condition(m_lock);
2031                 m_notFull = new Condition(m_lock);
2032             }
2033             else
2034             {
2035                 m_putMsg = scheduler.newCondition(m_lock);
2036                 m_notFull = scheduler.newCondition(m_lock);
2037             }
2038         }
2040         ///
2041         final @property bool isClosed() @safe @nogc pure
2042         {
2043             synchronized (m_lock)
2044             {
2045                 return m_closed;
2046             }
2047         }
2049         /*
2050          * Sets a limit on the maximum number of user messages allowed in the
2051          * mailbox.  If this limit is reached, the caller attempting to add
2052          * a new message will execute call.  If num is zero, there is no limit
2053          * on the message queue.
2054          *
2055          * Params:
2056          *  num  = The maximum size of the queue or zero if the queue is
2057          *         unbounded.
2058          *  call = The routine to call when the queue is full.
2059          */
2060         final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
2061         {
2062             synchronized (m_lock)
2063             {
2064                 m_maxMsgs = num;
2065                 m_onMaxMsgs = call;
2066             }
2067         }
2069         /*
2070          * If maxMsgs is not set, the message is added to the queue and the
2071          * owner is notified.  If the queue is full, the message will still be
2072          * accepted if it is a control message, otherwise onCrowdingDoThis is
2073          * called.  If the routine returns true, this call will block until
2074          * the owner has made space available in the queue.  If it returns
2075          * false, this call will abort.
2076          *
2077          * Params:
2078          *  msg = The message to put in the queue.
2079          *
2080          * Throws:
2081          *  An exception if the queue is full and onCrowdingDoThis throws.
2082          */
2083         final void put(ref Message msg)
2084         {
2085             synchronized (m_lock)
2086             {
2087                 // TODO: Generate an error here if m_closed is true, or maybe
2088                 //       put a message in the caller's queue?
2089                 if (!m_closed)
2090                 {
2091                     while (true)
2092                     {
2093                         if (isPriorityMsg(msg))
2094                         {
2095                             m_sharedPty.put(msg);
2096                             m_putMsg.notify();
2097                             return;
2098                         }
2099                         if (!mboxFull() || isControlMsg(msg))
2100                         {
2101                             m_sharedBox.put(msg);
2102                             m_putMsg.notify();
2103                             return;
2104                         }
2105                         if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
2106                         {
2107                             return;
2108                         }
2109                         m_putQueue++;
2110                         m_notFull.wait();
2111                         m_putQueue--;
2112                     }
2113                 }
2114             }
2115         }
2117         /*
2118          * Matches ops against each message in turn until a match is found.
2119          *
2120          * Params:
2121          *  ops = The operations to match.  Each may return a bool to indicate
2122          *        whether a message with a matching type is truly a match.
2123          *
2124          * Returns:
2125          *  true if a message was retrieved and false if not (such as if a
2126          *  timeout occurred).
2127          *
2128          * Throws:
2129          *  LinkTerminated if a linked thread terminated, or OwnerTerminated
2130          * if the owner thread terminates and no existing messages match the
2131          * supplied ops.
2132          */
2133         bool get(T...)(scope T vals)
2134         {
2135             import std.meta : AliasSeq;
2137             static assert(T.length, "T must not be empty");
2139             static if (is(T[0] : Duration))
2140             {
2141                 alias Ops = AliasSeq!(T[1 .. $]);
2142                 alias ops = vals[1 .. $];
2143                 enum timedWait = true;
2144                 Duration period = vals[0];
2145             }
2146             else
2147             {
2148                 alias Ops = AliasSeq!(T);
2149                 alias ops = vals[0 .. $];
2150                 enum timedWait = false;
2151             }
2153             bool onStandardMsg(ref Message msg)
2154             {
2155                 foreach (i, t; Ops)
2156                 {
2157                     alias Args = Parameters!(t);
2158                     auto op = ops[i];
2160                     if (msg.convertsTo!(Args))
2161                     {
2162                         alias RT = ReturnType!(t);
2163                         static if (is(RT == bool))
2164                         {
2165                             return;
2166                         }
2167                         else
2168                         {
2169                   ;
2170                             static if (!is(immutable RT == immutable noreturn))
2171                                 return true;
2172                         }
2173                     }
2174                 }
2175                 return false;
2176             }
2178             bool onLinkDeadMsg(ref Message msg)
2179             {
2180                 assert(msg.convertsTo!(Tid),
2181                         "Message could be converted to Tid");
2182                 auto tid = msg.get!(Tid);
2184                 if (bool* pDepends = tid in thisInfo.links)
2185                 {
2186                     auto depends = *pDepends;
2187                     thisInfo.links.remove(tid);
2188                     // Give the owner relationship precedence.
2189                     if (depends && tid != thisInfo.owner)
2190                     {
2191                         auto e = new LinkTerminated(tid);
2192                         auto m = Message(MsgType.standard, e);
2193                         if (onStandardMsg(m))
2194                             return true;
2195                         throw e;
2196                     }
2197                 }
2198                 if (tid == thisInfo.owner)
2199                 {
2200                     thisInfo.owner = Tid.init;
2201                     auto e = new OwnerTerminated(tid);
2202                     auto m = Message(MsgType.standard, e);
2203                     if (onStandardMsg(m))
2204                         return true;
2205                     throw e;
2206                 }
2207                 return false;
2208             }
2210             bool onControlMsg(ref Message msg)
2211             {
2212                 switch (msg.type)
2213                 {
2214                 case MsgType.linkDead:
2215                     return onLinkDeadMsg(msg);
2216                 default:
2217                     return false;
2218                 }
2219             }
2221             bool scan(ref ListT list)
2222             {
2223                 for (auto range = list[]; !range.empty;)
2224                 {
2225                     // Only the message handler will throw, so if this occurs
2226                     // we can be certain that the message was handled.
2227                     scope (failure)
2228                         list.removeAt(range);
2230                     if (isControlMsg(range.front))
2231                     {
2232                         if (onControlMsg(range.front))
2233                         {
2234                             // Although the linkDead message is a control message,
2235                             // it can be handled by the user.  Since the linkDead
2236                             // message throws if not handled, if we get here then
2237                             // it has been handled and we can return from receive.
2238                             // This is a weird special case that will have to be
2239                             // handled in a more general way if more are added.
2240                             if (!isLinkDeadMsg(range.front))
2241                             {
2242                                 list.removeAt(range);
2243                                 continue;
2244                             }
2245                             list.removeAt(range);
2246                             return true;
2247                         }
2248                         range.popFront();
2249                         continue;
2250                     }
2251                     else
2252                     {
2253                         if (onStandardMsg(range.front))
2254                         {
2255                             list.removeAt(range);
2256                             return true;
2257                         }
2258                         range.popFront();
2259                         continue;
2260                     }
2261                 }
2262                 return false;
2263             }
2265             bool pty(ref ListT list)
2266             {
2267                 if (!list.empty)
2268                 {
2269                     auto range = list[];
2271                     if (onStandardMsg(range.front))
2272                     {
2273                         list.removeAt(range);
2274                         return true;
2275                     }
2276                     if (range.front.convertsTo!(Throwable))
2277                         throw range.front.get!(Throwable);
2278                     else if (range.front.convertsTo!(shared(Throwable)))
2279                         /* Note: a shared type can be caught without the shared qualifier
2280                          * so throwing shared will be an error */
2281                         throw cast() range.front.get!(shared(Throwable));
2282                     else
2283                         throw new PriorityMessageException(;
2284                 }
2285                 return false;
2286             }
2288             static if (timedWait)
2289             {
2290                 import core.time : MonoTime;
2291                 auto limit = MonoTime.currTime + period;
2292             }
2294             while (true)
2295             {
2296                 ListT arrived;
2298                 if (pty(m_localPty) || scan(m_localBox))
2299                 {
2300                     return true;
2301                 }
2302                 yield();
2303                 synchronized (m_lock)
2304                 {
2305                     updateMsgCount();
2306                     while (m_sharedPty.empty && m_sharedBox.empty)
2307                     {
2308                         // NOTE: We're notifying all waiters here instead of just
2309                         //       a few because the onCrowding behavior may have
2310                         //       changed and we don't want to block sender threads
2311                         //       unnecessarily if the new behavior is not to block.
2312                         //       This will admittedly result in spurious wakeups
2313                         //       in other situations, but what can you do?
2314                         if (m_putQueue && !mboxFull())
2315                             m_notFull.notifyAll();
2316                         static if (timedWait)
2317                         {
2318                             if (period <= || !m_putMsg.wait(period))
2319                                 return false;
2320                         }
2321                         else
2322                         {
2323                             m_putMsg.wait();
2324                         }
2325                     }
2326                     m_localPty.put(m_sharedPty);
2327                     arrived.put(m_sharedBox);
2328                 }
2329                 if (m_localPty.empty)
2330                 {
2331                     scope (exit) m_localBox.put(arrived);
2332                     if (scan(arrived))
2333                     {
2334                         return true;
2335                     }
2336                     else
2337                     {
2338                         static if (timedWait)
2339                         {
2340                             period = limit - MonoTime.currTime;
2341                         }
2342                         continue;
2343                     }
2344                 }
2345                 m_localBox.put(arrived);
2346                 pty(m_localPty);
2347                 return true;
2348             }
2349         }
2351         /*
2352          * Called on thread termination.  This routine processes any remaining
2353          * control messages, clears out message queues, and sets a flag to
2354          * reject any future messages.
2355          */
2356         final void close()
2357         {
2358             static void onLinkDeadMsg(ref Message msg)
2359             {
2360                 assert(msg.convertsTo!(Tid),
2361                         "Message could be converted to Tid");
2362                 auto tid = msg.get!(Tid);
2364                 thisInfo.links.remove(tid);
2365                 if (tid == thisInfo.owner)
2366                     thisInfo.owner = Tid.init;
2367             }
2369             static void sweep(ref ListT list)
2370             {
2371                 for (auto range = list[]; !range.empty; range.popFront())
2372                 {
2373                     if (range.front.type == MsgType.linkDead)
2374                         onLinkDeadMsg(range.front);
2375                 }
2376             }
2378             ListT arrived;
2380             sweep(m_localBox);
2381             synchronized (m_lock)
2382             {
2383                 arrived.put(m_sharedBox);
2384                 m_closed = true;
2385             }
2386             m_localBox.clear();
2387             sweep(arrived);
2388         }
2390     private:
2391         // Routines involving local data only, no lock needed.
2393         bool mboxFull() @safe @nogc pure nothrow
2394         {
2395             return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
2396         }
2398         void updateMsgCount() @safe @nogc pure nothrow
2399         {
2400             m_localMsgs = m_localBox.length;
2401         }
2403         bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
2404         {
2405             return msg.type != MsgType.standard && msg.type != MsgType.priority;
2406         }
2408         bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
2409         {
2410             return msg.type == MsgType.priority;
2411         }
2413         bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
2414         {
2415             return msg.type == MsgType.linkDead;
2416         }
2418         alias OnMaxFn = bool function(Tid);
2419         alias ListT = List!(Message);
2421         ListT m_localBox;
2422         ListT m_localPty;
2424         Mutex m_lock;
2425         Condition m_putMsg;
2426         Condition m_notFull;
2427         size_t m_putQueue;
2428         ListT m_sharedBox;
2429         ListT m_sharedPty;
2430         OnMaxFn m_onMaxMsgs;
2431         size_t m_localMsgs;
2432         size_t m_maxMsgs;
2433         bool m_closed;
2434     }
2436     /*
2437      *
2438      */
2439     struct List(T)
2440     {
2441         struct Range
2442         {
2443             import std.exception : enforce;
2445             @property bool empty() const
2446             {
2447                 return !;
2448             }
2450             @property ref T front()
2451             {
2452                 enforce(, "invalid list node");
2453                 return;
2454             }
2456             @property void front(T val)
2457             {
2458                 enforce(, "invalid list node");
2459        = val;
2460             }
2462             void popFront()
2463             {
2464                 enforce(, "invalid list node");
2465                 m_prev =;
2466             }
2468             private this(Node* p)
2469             {
2470                 m_prev = p;
2471             }
2473             private Node* m_prev;
2474         }
2476         void put(T val)
2477         {
2478             put(newNode(val));
2479         }
2481         void put(ref List!(T) rhs)
2482         {
2483             if (!rhs.empty)
2484             {
2485                 put(rhs.m_first);
2486                 while ( !is null)
2487                 {
2488                     m_last =;
2489                     m_count++;
2490                 }
2491                 rhs.m_first = null;
2492                 rhs.m_last = null;
2493                 rhs.m_count = 0;
2494             }
2495         }
2497         Range opSlice()
2498         {
2499             return Range(cast(Node*)&m_first);
2500         }
2502         void removeAt(Range r)
2503         {
2504             import std.exception : enforce;
2506             assert(m_count, "Can not remove from empty Range");
2507             Node* n = r.m_prev;
2508             enforce(n &&, "attempting to remove invalid list node");
2510             if (m_last is m_first)
2511                 m_last = null;
2512             else if (m_last is
2513                 m_last = n; // nocoverage
2514             Node* to_free =;
2515    =;
2516             freeNode(to_free);
2517             m_count--;
2518         }
2520         @property size_t length()
2521         {
2522             return m_count;
2523         }
2525         void clear()
2526         {
2527             m_first = m_last = null;
2528             m_count = 0;
2529         }
2531         @property bool empty()
2532         {
2533             return m_first is null;
2534         }
2536     private:
2537         struct Node
2538         {
2539             Node* next;
2540             T val;
2542             this(T v)
2543             {
2544                 val = v;
2545             }
2546         }
2548         static shared struct SpinLock
2549         {
2550             void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
2551             void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
2552             bool locked;
2553         }
2555         static shared SpinLock sm_lock;
2556         static shared Node* sm_head;
2558         Node* newNode(T v)
2559         {
2560             Node* n;
2561             {
2562                 sm_lock.lock();
2563                 scope (exit) sm_lock.unlock();
2565                 if (sm_head)
2566                 {
2567                     n = cast(Node*) sm_head;
2568                     sm_head =;
2569                 }
2570             }
2571             if (n)
2572             {
2573                 import core.lifetime : emplace;
2574                 emplace!Node(n, v);
2575             }
2576             else
2577             {
2578                 n = new Node(v);
2579             }
2580             return n;
2581         }
2583         void freeNode(Node* n)
2584         {
2585             // destroy val to free any owned GC memory
2586             destroy(n.val);
2588             sm_lock.lock();
2589             scope (exit) sm_lock.unlock();
2591             auto sn = cast(shared(Node)*) n;
2592    = sm_head;
2593             sm_head = sn;
2594         }
2596         void put(Node* n)
2597         {
2598             m_count++;
2599             if (!empty)
2600             {
2601        = n;
2602                 m_last = n;
2603                 return;
2604             }
2605             m_first = n;
2606             m_last = n;
2607         }
2609         Node* m_first;
2610         Node* m_last;
2611         size_t m_count;
2612     }
2613 }
2615 @system unittest
2616 {
2617     import std.typecons : tuple, Tuple;
2619     static void testfn(Tid tid)
2620     {
2621         receive((float val) { assert(0); }, (int val, int val2) {
2622             assert(val == 42 && val2 == 86);
2623         });
2624         receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
2625         receive((Variant val) {  });
2626         receive((string val) {
2627             if ("the quick brown fox" != val)
2628                 return false;
2629             return true;
2630         }, (string val) { assert(false); });
2631         prioritySend(tid, "done");
2632     }
2634     static void runTest(Tid tid)
2635     {
2636         send(tid, 42, 86);
2637         send(tid, tuple(42, 86));
2638         send(tid, "hello", "there");
2639         send(tid, "the quick brown fox");
2640         receive((string val) { assert(val == "done"); });
2641     }
2643     static void simpleTest()
2644     {
2645         auto tid = spawn(&testfn, thisTid);
2646         runTest(tid);
2648         // Run the test again with a limited mailbox size.
2649         tid = spawn(&testfn, thisTid);
2650         setMaxMailboxSize(tid, 2, OnCrowding.block);
2651         runTest(tid);
2652     }
2654     simpleTest();
2656     scheduler = new ThreadScheduler;
2657     simpleTest();
2658     scheduler = null;
2659 }
2661 private @property shared(Mutex) initOnceLock()
2662 {
2663     static shared Mutex lock;
2664     if (auto mtx = atomicLoad!(MemoryOrder.acq)(lock))
2665         return mtx;
2666     auto mtx = new shared Mutex;
2667     if (cas(&lock, cast(shared) null, mtx))
2668         return mtx;
2669     return atomicLoad!(MemoryOrder.acq)(lock);
2670 }
2672 /**
2673  * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
2674  * thread-safe manner.
2675  *
2676  * The implementation guarantees that all threads simultaneously calling
2677  * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
2678  * fully initialized. All side-effects of $(D_PARAM init) are globally visible
2679  * afterwards.
2680  *
2681  * Params:
2682  *   var = The variable to initialize
2683  *   init = The lazy initializer value
2684  *
2685  * Returns:
2686  *   A reference to the initialized variable
2687  */
2688 auto ref initOnce(alias var)(lazy typeof(var) init)
2689 {
2690     return initOnce!var(init, initOnceLock);
2691 }
2693 /// A typical use-case is to perform lazy but thread-safe initialization.
2694 @system unittest
2695 {
2696     static class MySingleton
2697     {
2698         static MySingleton instance()
2699         {
2700             __gshared MySingleton inst;
2701             return initOnce!inst(new MySingleton);
2702         }
2703     }
2705     assert(MySingleton.instance !is null);
2706 }
2708 @system unittest
2709 {
2710     static class MySingleton
2711     {
2712         static MySingleton instance()
2713         {
2714             __gshared MySingleton inst;
2715             return initOnce!inst(new MySingleton);
2716         }
2718     private:
2719         this() { val = ++cnt; }
2720         size_t val;
2721         __gshared size_t cnt;
2722     }
2724     foreach (_; 0 .. 10)
2725         spawn({ ownerTid.send(MySingleton.instance.val); });
2726     foreach (_; 0 .. 10)
2727         assert(receiveOnly!size_t == MySingleton.instance.val);
2728     assert(MySingleton.cnt == 1);
2729 }
2731 /**
2732  * Same as above, but takes a separate mutex instead of sharing one among
2733  * all initOnce instances.
2734  *
2735  * This should be used to avoid dead-locks when the $(D_PARAM init)
2736  * expression waits for the result of another thread that might also
2737  * call initOnce. Use with care.
2738  *
2739  * Params:
2740  *   var = The variable to initialize
2741  *   init = The lazy initializer value
2742  *   mutex = A mutex to prevent race conditions
2743  *
2744  * Returns:
2745  *   A reference to the initialized variable
2746  */
2747 auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex)
2748 {
2749     // check that var is global, can't take address of a TLS variable
2750     static assert(is(typeof({ __gshared p = &var; })),
2751         "var must be 'static shared' or '__gshared'.");
2752     import core.atomic : atomicLoad, MemoryOrder, atomicStore;
2754     static shared bool flag;
2755     if (!atomicLoad!(MemoryOrder.acq)(flag))
2756     {
2757         synchronized (mutex)
2758         {
2759             if (!atomicLoad!(MemoryOrder.raw)(flag))
2760             {
2761                 var = init;
2762                 static if (!is(immutable typeof(var) == immutable noreturn))
2763                     atomicStore!(MemoryOrder.rel)(flag, true);
2764             }
2765         }
2766     }
2767     return var;
2768 }
2770 /// ditto
2771 auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
2772 {
2773     return initOnce!var(init, cast(shared) mutex);
2774 }
2776 /// Use a separate mutex when init blocks on another thread that might also call initOnce.
2777 @system unittest
2778 {
2779     import core.sync.mutex : Mutex;
2781     static shared bool varA, varB;
2782     static shared Mutex m;
2783     m = new shared Mutex;
2785     spawn({
2786         // use a different mutex for varB to avoid a dead-lock
2787         initOnce!varB(true, m);
2788         ownerTid.send(true);
2789     });
2790     // init depends on the result of the spawned thread
2791     initOnce!varA(receiveOnly!bool);
2792     assert(varA == true);
2793     assert(varB == true);
2794 }
2796 @system unittest
2797 {
2798     static shared bool a;
2799     __gshared bool b;
2800     static bool c;
2801     bool d;
2802     initOnce!a(true);
2803     initOnce!b(true);
2804     static assert(!__traits(compiles, initOnce!c(true))); // TLS
2805     static assert(!__traits(compiles, initOnce!d(true))); // local variable
2806 }
2808 // test ability to send shared arrays
2809 @system unittest
2810 {
2811     static shared int[] x = new shared(int)[1];
2812     auto tid = spawn({
2813         auto arr = receiveOnly!(shared(int)[]);
2814         arr[0] = 5;
2815         ownerTid.send(true);
2816     });
2817     tid.send(x);
2818     receiveOnly!(bool);
2819     assert(x[0] == 5);
2820 }
2822 //
2823 @system unittest
2824 {
2825     immutable aa = ["0":0];
2826     thisTid.send(aa);
2827     receiveOnly!(immutable int[string]); // compile error
2828 }
2830 //
2831 @system unittest
2832 {
2833     static struct Aggregate { const int a; const int[5] b; }
2834     static void t1(Tid mainTid)
2835     {
2836         const sendMe = Aggregate(42, [1, 2, 3, 4, 5]);
2837         mainTid.send(sendMe);
2838     }
2840     spawn(&t1, thisTid);
2841     auto result1 = receiveOnly!(const Aggregate)();
2842     immutable expected = Aggregate(42, [1, 2, 3, 4, 5]);
2843     assert(result1 == expected);
2844 }
2846 // Noreturn support
2847 @system unittest
2848 {
2849     static noreturn foo(int) { throw new Exception(""); }
2851     if (false) spawn(&foo, 1);
2852     if (false) spawnLinked(&foo, 1);
2854     if (false) receive(&foo);
2855     if (false) receiveTimeout(Duration.init, &foo);
2857     // Wrapped in __traits(compiles) to skip codegen which crashes dmd's backend
2858     static assert(__traits(compiles, receiveOnly!noreturn()                 ));
2859     static assert(__traits(compiles, send(Tid.init, noreturn.init)          ));
2860     static assert(__traits(compiles, prioritySend(Tid.init, noreturn.init)  ));
2861     static assert(__traits(compiles, yield(noreturn.init)                   ));
2863     static assert(__traits(compiles, {
2864         __gshared noreturn n;
2865         initOnce!n(noreturn.init);
2866     }));
2867 }