/**
`std.parallelism` implements high-level primitives for SMP parallelism.
These include parallel foreach, parallel reduce, parallel eager map, pipelining
and future/promise parallelism.  `std.parallelism` is recommended when the
same operation is to be executed in parallel on different data, or when a
function is to be executed in a background thread and its result returned to a
well-defined main thread.  For communication between arbitrary threads, see
`std.concurrency`.

`std.parallelism` is based on the concept of a `Task`.  A `Task` is an
object that represents the fundamental unit of work in this library and may be
executed in parallel with any other `Task`.  Using `Task`
directly allows programming with a future/promise paradigm.  All other
supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
represent an additional level of abstraction over `Task`.  They
automatically create one or more `Task` objects, or closely related types
that are conceptually identical but not part of the public API.

After creation, a `Task` may be executed in a new thread, or submitted
to a `TaskPool` for execution.  A `TaskPool` encapsulates a task queue
and its worker threads.  Its purpose is to efficiently map a large
number of `Task`s onto a smaller number of threads.  A task queue is a
FIFO queue of `Task` objects that have been submitted to the
`TaskPool` and are awaiting execution.  A worker thread is a thread that
is associated with exactly one task queue.  It executes the `Task` at the
front of its queue when the queue has work available, or sleeps when
no work is available.  Each task queue is associated with zero or
more worker threads.  If the result of a `Task` is needed before execution
by a worker thread has begun, the `Task` can be removed from the task queue
and executed immediately in the thread where the result is needed.

Warning:  Unless marked as `@trusted` or `@safe`, artifacts in
          this module allow implicit data sharing between threads and cannot
          guarantee that client code is free from low level data races.

Source:    $(PHOBOSSRC std/parallelism.d)
Author:  David Simcha
Copyright:  Copyright (c) 2009-2011, David Simcha.
License:    $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
*/
module std.parallelism;

version (WebAssembly) {} else:

version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;

///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math.operations : isClose;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    alias getTerm = (int i)
    {
        immutable x = (i - 0.5) * delta;
        return delta / (1.0 + x * x);
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.isClose(3.14159, 1e-5));
}
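
// A minimal future/promise sketch (added for illustration): execute a
// function pointer in a background thread via task/executeInNewThread (both
// defined below) and collect the result with yieldForce.
@system unittest
{
    static int triple(int x) { return 3 * x; }

    auto t = task(&triple, 14);
    t.executeInNewThread();
    assert(t.yieldForce == 42);
}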

import core.atomic;
import core.memory;
import core.sync.condition;
import core.thread;

import std.functional;
import std.meta;
import std.range.primitives;
import std.traits;

/*
(For now public undocumented with reserved name.)

A lazily initialized global constant. The underlying value is a shared global
statically initialized to `outOfBandValue`, which must not be a legitimate
value of the constant. Upon the first call the situation is detected and the
global is initialized by calling `initializer`. The initializer is assumed to
be pure (even if not marked as such), i.e. to return the same value upon
repeated calls. For that reason, no special precautions are taken, so
`initializer` may be called more than once, leading to benign races on the
cached value.

In the quiescent state the cost of the function is an atomic load from a global.

Params:
    T = The type of the pseudo-constant (may be qualified)
    outOfBandValue = A value that cannot be valid, it is used for initialization
    initializer = The function performing initialization; must be `nothrow`

Returns:
    The lazily initialized value
*/
@property pure
T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
    && is(typeof(initializer()) : T)
    && is(typeof(outOfBandValue) : T))
{
    static T impl() nothrow
    {
        // Thread-local cache
        static Unqual!T tls = outOfBandValue;
        auto local = tls;
        // Shortest path, no atomic operations
        if (local != outOfBandValue) return local;
        // Process-level cache
        static shared Unqual!T result = outOfBandValue;
        // Initialize both process-level cache and tls
        local = atomicLoad(result);
        if (local == outOfBandValue)
        {
            local = initializer();
            atomicStore(result, local);
        }
        tls = local;
        return local;
    }

    import std.traits : SetFunctionAttributes;
    alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
        functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
    auto purified = (() @trusted => cast(Fun) &impl)();
    return purified();
}

// Returns the size of a cache line.
alias cacheLineSize =
    __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);

private size_t cacheLineSizeImpl() @nogc nothrow @trusted
{
    size_t result = 0;
    import core.cpuid : datacache;
    foreach (ref const cachelevel; datacache)
    {
        if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max)
        {
            result = cachelevel.lineSize;
        }
    }
    return result;
}

@nogc @safe nothrow unittest
{
    assert(cacheLineSize == cacheLineSizeImpl);
}
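
// Illustration (added sketch; `answerImpl` and `answer` are hypothetical and
// not part of this module's API): any nothrow initializer paired with a value
// the initializer can never return defines such a pseudo-constant.
private int answerImpl() @nogc nothrow @safe
{
    return 42;
}

private alias answer =
    __lazilyInitializedConstant!(immutable(int), int.min, answerImpl);

@nogc @safe nothrow unittest
{
    assert(answer == 42);
}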

/* Atomics code.  These forward to core.atomic, but are written like this
   for two reasons:

   1.  They used to actually contain ASM code and I don't want to have to change
       to directly calling core.atomic in a zillion different places.

   2.  core.atomic has some misc. issues that make my use cases difficult
       without wrapping it.  If I didn't wrap it, casts would be required
       basically everywhere.
*/
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    //core.atomic.cas(cast(shared) &stuff, stuff, newVal);
    atomicStore(*(cast(shared) &stuff), newVal);
}

private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return atomicLoad(*(cast(shared) &val));
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
}
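
// Minimal usage sketch (added): the wrappers accept any integral type that
// implicitly converts to ubyte; the casts keep template deduction at ubyte.
@system unittest
{
    ubyte flag = 0;
    assert(atomicReadUbyte(flag) == 0);
    assert(atomicCasUbyte(flag, cast(ubyte) 0, cast(ubyte) 1));
    atomicSetUbyte(flag, cast(ubyte) 2);
    assert(atomicReadUbyte(flag) == 2);
}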

/*--------------------- Generic helper functions, etc. -----------------------*/
private template MapType(R, functions...)
{
    static assert(functions.length);

    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}

private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}
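
// Sketch (added) of what these helpers compute for representative
// instantiations: one function maps to its return type, several map to a
// tuple, and ReduceType is the type of the accumulator.
@safe unittest
{
    import std.typecons : Tuple;

    static assert(is(MapType!(int[], "2 * a") == int));
    static assert(is(MapType!(int[], "2 * a", "a * a") == Tuple!(int, int)));
    static assert(is(ReduceType!("a + b", int[], double) == double));
}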

private template noUnsharedAliasing(T)
{
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}

// This template tests whether a function may be executed in parallel from
// @safe code via Task.executeInNewThread().  There is an additional
// requirement for executing it via a TaskPool.  (See isSafeReturn).
private template isSafeTask(F)
{
    enum bool isSafeTask =
        (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
        (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
        (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
        allSatisfy!(noUnsharedAliasing, Parameters!F);
}

@safe unittest
{
    alias F1 = void function() @safe;
    alias F2 = void function();
    alias F3 = void function(uint, string) @trusted;
    alias F4 = void function(uint, char[]);

    static assert( isSafeTask!F1);
    static assert(!isSafeTask!F2);
    static assert( isSafeTask!F3);
    static assert(!isSafeTask!F4);

    alias F5 = uint[] function(uint, string) pure @trusted;
    static assert( isSafeTask!F5);
}

// This template decides whether Tasks that meet all of the other requirements
// for being executed from @safe code can be executed on a TaskPool.
// When executing via TaskPool, it's theoretically possible
// to return a value that is also pointed to by a worker thread's thread local
// storage.  When executing from executeInNewThread(), the thread that executed
// the Task is terminated by the time the return value is visible in the calling
// thread, so this is a non-issue.  It's also a non-issue for pure functions
// since they can't read global state.
private template isSafeReturn(T)
{
    static if (!hasUnsharedAliasing!(T.ReturnType))
    {
        enum isSafeReturn = true;
    }
    else static if (T.isPure)
    {
        enum isSafeReturn = true;
    }
    else
    {
        enum isSafeReturn = false;
    }
}
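
// Sketch (added): a pure function returning GC-allocated data may still be
// returned from a TaskPool, while an impure one with the same signature may
// not.
@safe unittest
{
    alias PureTask = Task!(run, uint[] function(uint) pure @safe, uint);
    static assert( isSafeReturn!PureTask);

    alias ImpureTask = Task!(run, uint[] function(uint) @safe, uint);
    static assert(!isSafeReturn!ImpureTask);
}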

private template randAssignable(R)
{
    enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
}

private enum TaskStatus : ubyte
{
    notStarted,
    inProgress,
    done
}

private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}

// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs);
            }

            return lhs;
        }
    }
}
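
// Sketch (added): with several reduction functions the accumulator is a
// tuple, and each element is updated by its own function.
@safe unittest
{
    import std.typecons : tuple;

    alias f = reduceAdjoin!("a + b", "a * b");
    assert(f(tuple(0, 1), 5) == tuple(5, 5));
}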

private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
            }

            return lhs;
        }
    }
}
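
// Sketch (added): reduceFinish merges two partial tuple results elementwise,
// as when combining per-work-unit accumulators.
@safe unittest
{
    import std.typecons : tuple;

    alias f = reduceFinish!("a + b", "a * b");
    assert(f(tuple(2, 3), tuple(4, 5)) == tuple(6, 15));
}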

private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}

@safe unittest
{
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}

// This is the base "class" for all of the other tasks.  Using C-style
// polymorphism to allow more direct control over memory allocation, etc.
private struct AbstractTask
{
    AbstractTask* prev;
    AbstractTask* next;

    // Pointer to a function that executes this task.
    void function(void*) runTask;

    Throwable exception;
    ubyte taskStatus = TaskStatus.notStarted;

    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) == TaskStatus.done)
        {
            if (exception)
            {
                throw exception;
            }

            return true;
        }

        return false;
    }

    void job()
    {
        runTask(&this);
    }
}
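
// Sketch (added) of the C-style dispatch: job() forwards to whatever function
// pointer runTask holds, passing the task's own address.
@system unittest
{
    static void dummyRun(void* p)
    {
        // Mark the task done to show that the dispatch happened.
        (cast(AbstractTask*) p).taskStatus = TaskStatus.done;
    }

    AbstractTask t;
    t.runTask = &dummyRun;
    t.job();
    assert(t.taskStatus == TaskStatus.done);
}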

/**
`Task` represents the fundamental unit of work.  A `Task` may be
executed in parallel with any other `Task`.  Using this struct directly
allows future/promise parallelism.  In this paradigm, a function (or delegate
or other callable) is executed in a thread other than the one it was called
from.  The calling thread does not block while the function is being executed.
A call to `workForce`, `yieldForce`, or `spinForce` is used to
ensure that the `Task` has finished executing and to obtain the return
value, if any.  These functions and `done` also act as full memory barriers,
meaning that any memory writes made in the thread that executed the `Task`
are guaranteed to be visible in the calling thread after one of these functions
returns.

The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
be used to create an instance of this struct.  See `task` for usage examples.

Function results are returned from `yieldForce`, `spinForce` and
`workForce` by ref.  If `fun` returns by ref, the reference will point
to the returned reference of `fun`.  Otherwise it will point to a
field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics.
If you want to pass this struct around, you should do so by reference or
pointer.

Bugs:  Changes to `ref` and `out` arguments are not propagated to the
       call site, only to `args` in this struct.
*/
struct Task(alias fun, Args...)
{
    @(imported!"core.attribute".mutableRefInit) private AbstractTask base = {runTask : &impl};
    private alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(&(fun(myCastedTask._args)))))
        {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    Args _args;

    /**
    The arguments the function was called with.  Changes to `out` and
    `ref` arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }

    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code.  See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
                (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
        }
        else
        {
            // BUG:  Should check this for delegates too, but std.traits
            //       apparently doesn't allow this.  isPure is irrelevant
            //       for delegates, at least for now since shared delegates
            //       don't work.
            private enum bool isPure = false;
        }
    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }

    /**
    The return type of the function called by this `Task`.  This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }
        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
    // allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs);
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    busy spin until it's done, then return the return value.  If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    `Task` to be available on a timescale shorter than that of an OS
    context switch.
     */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        this.pool.tryDeleteExecute(basePtr);

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    wait on a condition variable.  If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
     */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this `Task` was not started yet, execute it in the current
    thread.  If it is finished, return its result.  If it is in progress,
    execute any other `Task` from the `TaskPool` instance that
    this `Task` was submitted to until this one
    is finished.  If it threw an exception, rethrow that exception.
    If no other tasks are available or this `Task` was executed using
    `executeInNewThread`, wait on a condition variable.
     */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done)    // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }

            if (job !is null)
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                return yieldForce;
            }
        }
    }

    /**
    Returns `true` if the `Task` is finished executing.

    Throws:  Rethrows any exception thrown during the execution of the
             `Task`.
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this `Task`, execute it in the
    newly created thread, then terminate the thread.  This can be used for
    future/promise parallelism.  An explicit priority may be given
    to the `Task`.  If one is provided, its value is forwarded to
    `core.thread.Thread.priority`.  See $(REF task, std,parallelism) for a
    usage example.
    */
    void executeInNewThread() @trusted
    {
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}

// Calls `fpOrDelegate` with `args`.  This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}
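
// Sketch (added): run simply forwards, which is what lets Task!(run, F, Args)
// wrap runtime callables such as function pointers.
@safe unittest
{
    static int twice(int x) { return 2 * x; }

    int arg = 21;
    assert(run(&twice, arg) == 42);
}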

/**
Creates a `Task` on the GC heap that calls an alias.  This may be executed
via `Task.executeInNewThread` or by submitting to a
$(REF TaskPool, std,parallelism).  A globally accessible instance of
`TaskPool` is provided by $(REF taskPool, std,parallelism).

Returns:  A pointer to the `Task`.

Example:
---
// Read two files into memory at the same time.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

---
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially.  Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation:               176 milliseconds.
// Equivalent serial implementation:  280 milliseconds
void parallelSort(T)(T[] data)
{
    // Sort small subarrays serially.
    if (data.length < 100)
    {
         std.algorithm.sort(data);
         return;
    }

    // Partition the array.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}
---
*/
auto task(alias fun, Args...)(Args args)
{
    return new Task!(fun, Args)(args);
}

/**
Creates a `Task` on the GC heap that calls a function pointer, delegate, or
class/struct with overloaded opCall.

Example:
---
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task(&read!string, "foo.txt", size_t.max);
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

Notes: This function takes a non-scope delegate, meaning it can be
       used with closures.  If you can't allocate a closure due to objects
       on the stack that have scoped destruction, see `scopedTask`, which
       takes a scope delegate.
 */
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    return new Task!(run, F, Args)(delegateOrFp, args);
}

/**
Version of `task` usable from `@safe` code.  Usage mechanics are
identical to the non-@safe case, but safety introduces some restrictions:

1.  `fun` must be @safe or @trusted.

2.  `F` must not have any unshared aliasing as defined by
    $(REF hasUnsharedAliasing, std,traits).  This means it
    may not be an unshared delegate or a non-shared class or struct
    with overloaded `opCall`.  This also precludes accepting template
    alias parameters.

3.  `Args` must not have unshared aliasing.

4.  `fun` must not return by reference.

5.  The return type must not have unshared aliasing unless `fun` is
    `pure` or the `Task` is executed via `executeInNewThread` instead
    of using a `TaskPool`.

*/
@trusted auto task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    return new Task!(run, F, Args)(fun, args);
}

/**
These functions allow the creation of `Task` objects on the stack rather
than the GC heap.  The lifetime of a `Task` created by `scopedTask`
cannot exceed the lifetime of the scope it was created in.

`scopedTask` might be preferred over `task`:

1.  When a `Task` that calls a delegate is being created and a closure
    cannot be allocated due to objects on the stack that have scoped
    destruction.  The delegate overload of `scopedTask` takes a `scope`
    delegate.

2.  As a micro-optimization, to avoid the heap allocation associated with
    `task` or with the creation of a closure.

Usage is otherwise identical to `task`.

Notes:  `Task` objects created using `scopedTask` will automatically
call `Task.yieldForce` in their destructor if necessary to ensure
the `Task` is complete before the stack frame they reside on is destroyed.
*/
auto scopedTask(alias fun, Args...)(Args args)
{
    auto ret = Task!(fun, Args)(args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(fun, args);
    ret.isScoped = true;
    return ret;
}
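
// Sketch (added): a stack-allocated Task submitted to the global taskPool
// (defined later in this module).  The scoped destructor would force
// completion if the result had not been fetched before scope exit.
@system unittest
{
    static int add(int a, int b) { return a + b; }

    auto t = scopedTask(&add, 40, 2);
    taskPool.put(t);
    assert(t.yieldForce == 42);
}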

/**
The total number of CPU cores available on the current machine, as reported by
the operating system.
*/
alias totalCPUs =
    __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);

uint totalCPUsImpl() @nogc nothrow @trusted
{
    version (Windows)
    {
        // BUGS:  Only works on Windows 2000 and above.
        import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        return max(1, cast(uint) si.dwNumberOfProcessors);
    }
    else version (linux)
    {
        import core.stdc.stdlib : calloc;
        import core.stdc.string : memset;
        import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity;
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        int count = 0;

        /**
         * According to ruby's source code, CPU_ALLOC() doesn't work as expected.
         *  see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242
         *
         *  The hardcoded number also comes from ruby's source code.
         *  see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4
         */
        for (int n = 64; n <= 16384; n *= 2)
        {
            size_t size = CPU_ALLOC_SIZE(n);
            if (size >= 0x400)
            {
                auto cpuset = cast(cpu_set_t*) calloc(1, size);
                if (cpuset is null) break;
                if (sched_getaffinity(0, size, cpuset) == 0)
                {
                    count = CPU_COUNT_S(size, cpuset);
                }
                CPU_FREE(cpuset);
            }
            else
            {
                cpu_set_t cpuset;
                if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0)
                {
                    count = CPU_COUNT(&cpuset);
                }
            }

            if (count > 0)
                return cast(uint) count;
        }

        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Darwin)
    {
        import core.sys.darwin.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.physicalcpu", &result, &len, null, 0);
        return result;
    }
    else version (DragonFlyBSD)
    {
        import core.sys.dragonflybsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (FreeBSD)
    {
        import core.sys.freebsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (NetBSD)
    {
        import core.sys.netbsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (Solaris)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (OpenBSD)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Hurd)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else
    {
        static assert(0, "Don't know how to get N CPUs on this OS.");
    }
}
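
// Sanity sketch (added): every supported platform is expected to report at
// least one core.
@nogc @safe nothrow unittest
{
    assert(totalCPUs >= 1);
}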

/*
This class serves two purposes:

1.  It distinguishes std.parallelism threads from other threads so that
    the std.parallelism daemon threads can be terminated.

2.  It adds a reference to the pool that the thread is a member of,
    which is also necessary to allow the daemon threads to be properly
    terminated.
*/
private final class ParallelismThread : Thread
{
    this(void delegate() dg)
    {
        super(dg);
    }

    TaskPool pool;
}

// Kill daemon threads.
shared static ~this()
{
    foreach (ref thread; Thread)
    {
        auto pthread = cast(ParallelismThread) thread;
        if (pthread is null) continue;
        auto pool = pthread.pool;
        if (!pool.isDaemon) continue;
        pool.stop();
        pthread.join();
    }
}

/**
This class encapsulates a task queue and a set of worker threads.  Its purpose
is to efficiently map a large number of `Task`s onto a smaller number of
threads.  A task queue is a FIFO queue of `Task` objects that have been
submitted to the `TaskPool` and are awaiting execution.  A worker thread is a
thread that executes the `Task` at the front of the queue when one is
available and sleeps when the queue is empty.

This class should usually be used via the global instantiation
available via the $(REF taskPool, std,parallelism) property.
Occasionally it is useful to explicitly instantiate a `TaskPool`:

1.  When you want `TaskPool` instances with multiple priorities, for example
    a low priority pool and a high priority pool.

2.  When the threads in the global task pool are waiting on a synchronization
    primitive (for example a mutex), and you want to parallelize the code that
    needs to run before these threads can be resumed.

Note: The worker threads in this pool will not stop until
      `stop` or `finish` is called, even if the main thread
      has finished already. This may lead to programs that
      never end. If you do not want this behaviour, you can set `isDaemon`
      to true.
 */
final class TaskPool
{
private:

    // A pool can either be a regular pool or a single-task pool.  A
    // single-task pool is a dummy pool that's fired up for
    // Task.executeInNewThread().
    bool isSingleTask;

    ParallelismThread[] pool;
    Thread singleTaskThread;

    AbstractTask* head;
    AbstractTask* tail;
    PoolState status = PoolState.running;
    Condition workerCondition;
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex;  // For waiterCondition

    // The instanceStartIndex of the next instance that will be created.
    __gshared size_t nextInstanceIndex = 1;

    // The index of the current thread.
    static size_t threadIndex;

    // The index of the first thread in this instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;

    enum PoolState : ubyte
    {
        running,
        finishing,
        stopNow
    }

    void doJob(AbstractTask* job)
    {
        assert(job.taskStatus == TaskStatus.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        scope(exit)
        {
            if (!isSingleTask)
            {
                waiterLock();
                scope(exit) waiterUnlock();
                notifyWaiters();
            }
        }

        try
        {
            job.job();
        }
        catch (Throwable e)
        {
            job.exception = e;
        }

        atomicSetUbyte(job.taskStatus, TaskStatus.done);
    }

    // This function is used for dummy pools created by Task.executeInNewThread().
    void doSingleTask()
    {
        // No synchronization.  Pool is guaranteed to only have one thread,
        // and the queue is submitted to before this thread is created.
        assert(head);
        auto t = head;
        t.next = t.prev = head = null;
        doJob(t);
    }

    // This function performs initialization for each thread that affects
    // thread local storage and therefore must be done from within the
    // worker thread.  It then calls executeWorkLoop().
    void startWorkLoop()
    {
        // Initialize thread index.
        {
            queueLock();
            scope(exit) queueUnlock();
            threadIndex = nextThreadIndex;
            nextThreadIndex++;
        }

        executeWorkLoop();
    }

    // This is the main work loop that worker threads spend their time in
    // until they terminate.  It's also entered by non-worker threads when
    // finish() is called with the blocking variable set to true.
    void executeWorkLoop()
    {
        while (atomicReadUbyte(status) != PoolState.stopNow)
        {
            AbstractTask* task = pop();
            if (task is null)
            {
                if (atomicReadUbyte(status) == PoolState.finishing)
                {
                    atomicSetUbyte(status, PoolState.stopNow);
                    return;
                }
            }
            else
            {
                doJob(task);
            }
        }
    }

    // Pop a task off the queue.
    AbstractTask* pop()
    {
        queueLock();
        scope(exit) queueUnlock();
        auto ret = popNoSync();
        while (ret is null && status == PoolState.running)
        {
            wait();
            ret = popNoSync();
        }
        return ret;
    }

    AbstractTask* popNoSync()
    out(returned)
    {
        /* If task.prev and task.next aren't null, then another thread
         * can try to delete this task from the pool after it's
         * already been deleted/popped.
         */
        if (returned !is null)
        {
            assert(returned.next is null);
            assert(returned.prev is null);
        }
    }
    do
    {
        if (isSingleTask) return null;

        AbstractTask* returned = head;
        if (head !is null)
        {
            head = head.next;
            returned.prev = null;
            returned.next = null;
            returned.taskStatus = TaskStatus.inProgress;
        }
        if (head !is null)
        {
            head.prev = null;
        }

        return returned;
    }

    // Push a task onto the queue.
    void abstractPut(AbstractTask* task)
    {
        queueLock();
        scope(exit) queueUnlock();
        abstractPutNoSync(task);
    }

    void abstractPutNoSync(AbstractTask* task)
    in
    {
        assert(task);
    }
    out
    {
        import std.conv : text;

        assert(tail.prev !is tail);
        assert(tail.next is null, text(tail.prev, '\t', tail.next));
        if (tail.prev !is null)
        {
            assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
        }
    }
    do
    {
        // Not using enforce() to save on function call overhead since this
        // is a performance critical function.
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        task.next = null;
        if (head is null)   //Queue is empty.
        {
            head = task;
            tail = task;
            tail.prev = null;
        }
        else
        {
            assert(tail);
            task.prev = tail;
            tail.next = task;
            tail = task;
        }
        notify();
    }

    void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
    {
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        if (head is null)
        {
            head = h;
            tail = t;
        }
        else
        {
            h.prev = tail;
            tail.next = h;
            tail = t;
        }

        notifyAll();
    }

    void tryDeleteExecute(AbstractTask* toExecute)
    {
        if (isSingleTask) return;

        if (!deleteItem(toExecute))
        {
            return;
        }

        try
        {
            toExecute.job();
        }
        catch (Exception e)
        {
            toExecute.exception = e;
        }

        atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
    }

    bool deleteItem(AbstractTask* item)
    {
        queueLock();
        scope(exit) queueUnlock();
        return deleteItemNoSync(item);
    }

    bool deleteItemNoSync(AbstractTask* item)
    {
        if (item.taskStatus != TaskStatus.notStarted)
        {
            return false;
        }
        item.taskStatus = TaskStatus.inProgress;

        if (item is head)
        {
            // Make sure head gets set properly.
            popNoSync();
            return true;
        }
        if (item is tail)
        {
            tail = tail.prev;
            if (tail !is null)
            {
                tail.next = null;
            }
            item.next = null;
            item.prev = null;
            return true;
        }
        if (item.next !is null)
        {
            assert(item.next.prev is item);  // Check queue consistency.
            item.next.prev = item.prev;
        }
        if (item.prev !is null)
        {
            assert(item.prev.next is item);  // Check queue consistency.
            item.prev.next = item.next;
        }
        item.next = null;
        item.prev = null;
        return true;
    }

    void queueLock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.lock();
    }

    void queueUnlock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.unlock();
    }

    void waiterLock()
    {
        if (!isSingleTask) waiterMutex.lock();
    }

    void waiterUnlock()
    {
        if (!isSingleTask) waiterMutex.unlock();
    }

    void wait()
    {
        if (!isSingleTask) workerCondition.wait();
    }

    void notify()
    {
        if (!isSingleTask) workerCondition.notify();
    }

    void notifyAll()
    {
        if (!isSingleTask) workerCondition.notifyAll();
    }

    void waitUntilCompletion()
    {
        if (isSingleTask)
        {
            singleTaskThread.join();
        }
        else
        {
            waiterCondition.wait();
        }
    }

    void notifyWaiters()
    {
        if (!isSingleTask) waiterCondition.notifyAll();
    }

    // Private constructor for creating dummy pools that only have one thread,
    // only execute one Task, and then terminate.  This is used for
    // Task.executeInNewThread().
    this(AbstractTask* task, int priority = int.max)
    {
        assert(task);

        // Dummy value, not used.
        instanceStartIndex = 0;

        this.isSingleTask = true;
        task.taskStatus = TaskStatus.inProgress;
        this.head = task;
        singleTaskThread = new Thread(&doSingleTask);
        singleTaskThread.start();

        // Disabled until writing code to support
        // running thread with specified priority
        // See https://issues.dlang.org/show_bug.cgi?id=8960

        /*if (priority != int.max)
        {
            singleTaskThread.priority = priority;
        }*/
    }

public:
    // This is used in parallel_algorithm but is too unstable to document
    // as public API.
    size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
    {
        import std.algorithm.comparison : max;

        if (this.size == 0)
        {
            return max(rangeLen, 1);
        }

        immutable size_t eightSize = 4 * (this.size + 1);
        auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
        return max(ret, 1);
    }
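
    // Worked sketch (added): with one worker thread, size == 1, the divisor
    // is 4 * (1 + 1) == 8, so a 100-element range yields ceil(100 / 8) == 13
    // work units and an empty range yields the minimum of 1.
    @system unittest
    {
        auto p = new TaskPool(1);
        scope(exit) p.finish(true);

        assert(p.defaultWorkUnitSize(100) == 13);
        assert(p.defaultWorkUnitSize(0) == 1);
    }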

    /**
    Default constructor that initializes a `TaskPool` with
    `totalCPUs` - 1 worker threads.  The minus 1 is included because the
    main thread will also be available to do work.

    Note:  On single-core machines, the primitives provided by `TaskPool`
           operate transparently in single-threaded mode.
     */
    this() @trusted
    {
        this(totalCPUs - 1);
    }

    /**
    Allows for a custom number of worker threads.
    */
    this(size_t nWorkers) @trusted
    {
        synchronized(typeid(TaskPool))
        {
            instanceStartIndex = nextInstanceIndex;

            // The first worker thread to be initialized will have this index,
            // and will increment it.  The second worker to be initialized will
            // have this index plus 1.
            nextThreadIndex = instanceStartIndex;
            nextInstanceIndex += nWorkers;
        }

        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);

        pool = new ParallelismThread[nWorkers];
        foreach (ref poolThread; pool)
        {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.start();
        }
    }

    /**
    Implements a parallel foreach loop over a range.  This works by implicitly
    creating and submitting one `Task` to the `TaskPool` for each worker
    thread.  A work unit is a set of consecutive elements of `range` to
    be processed by a worker thread between communication with any other
    thread.  The number of elements processed per work unit is controlled by the
    `workUnitSize` parameter.  Smaller work units provide better load
    balancing, but larger work units avoid the overhead of communicating
    with other threads frequently to fetch the next work unit.  Large work
    units also avoid false sharing in cases where the range is being modified.
    The less time a single iteration of the loop takes, the larger
    `workUnitSize` should be.  For very expensive loop bodies,
    `workUnitSize` should be 1.  An overload that chooses a default work
    unit size is also available.

    Example:
    ---
    // Find the logarithm of every number from 1 to
    // 10_000_000 in parallel.
    auto logs = new double[10_000_000];

    // Parallel foreach works with or without an index
    // variable.  It can iterate by ref if range.front
    // returns by ref.

    // Iterate over logs using work units of size 100.
    foreach (i, ref elem; taskPool.parallel(logs, 100))
    {
        elem = log(i + 1.0);
    }

    // Same thing, but use the default work unit size.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel foreach:  388 milliseconds
    // Regular foreach:   619 milliseconds
    foreach (i, ref elem; taskPool.parallel(logs))
    {
        elem = log(i + 1.0);
    }
    ---

    Notes:

    The memory usage of this implementation is guaranteed to be constant
    in `range.length`.

    Breaking from a parallel foreach loop via a break, labeled break,
    labeled continue, return or goto statement throws a
    `ParallelForeachError`.

    In the case of non-random access ranges, parallel foreach buffers lazily
    to an array of size `workUnitSize` before executing the parallel portion
    of the loop.  The exception is that, if a parallel foreach is executed
    over a range returned by `asyncBuf` or `map`, the copying is elided
    and the buffers are simply swapped.  In this case `workUnitSize` is
    ignored and the work unit size is set to the buffer size of `range`.

    A memory barrier is guaranteed to be executed on exit from the loop,
    so that results produced by all threads are visible in the calling thread.

    $(B Exception Handling):

    When at least one exception is thrown from inside a parallel foreach loop,
    the submission of additional `Task` objects is terminated as soon as
    possible, in a non-deterministic manner.  All executing or
    enqueued work units are allowed to complete.  Then, all exceptions that
    were thrown by any work unit are chained using `Throwable.next` and
    rethrown.  The order of the exception chaining is non-deterministic.
    */
    ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
    {
        import std.exception : enforce;
        enforce(workUnitSize > 0, "workUnitSize must be > 0.");
        alias RetType = ParallelForeach!R;
        return RetType(this, range, workUnitSize);
    }

    /// Ditto
    ParallelForeach!R parallel(R)(R range)
    {
        static if (hasLength!R)
        {
            // The default work unit size creates roughly 4x as many work
            // units as there are threads in this pool.
            size_t workUnitSize = defaultWorkUnitSize(range.length);
            return parallel(range, workUnitSize);
        }
        else
        {
            // Just use a really, really dumb guess if the user is too lazy to
            // specify.
            return parallel(range, 512);
        }
    }
1661 
1662     ///
1663     template amap(functions...)
1664     {
1665         /**
1666         Eager parallel map.  The eagerness of this function means it has less
1667         overhead than the lazily evaluated `TaskPool.map` and should be
1668         preferred where the memory requirements of eagerness are acceptable.
1669         `functions` are the functions to be evaluated, passed as template
1670         alias parameters in a style similar to
1671         $(REF map, std,algorithm,iteration).
1672         The first argument must be a random access range. For performance
1673         reasons, amap will assume the range elements have not yet been
1674         initialized. Elements will be overwritten without calling a destructor
1675         nor doing an assignment. As such, the range must not contain meaningful
1676         data$(DDOC_COMMENT not a section): either un-initialized objects, or
1677         objects in their `.init` state.
1678 
1679         ---
1680         auto numbers = iota(100_000_000.0);
1681 
1682         // Find the square roots of numbers.
1683         //
1684         // Timings on an Athlon 64 X2 dual core machine:
1685         //
1686         // Parallel eager map:                   0.802 s
1687         // Equivalent serial implementation:     1.768 s
1688         auto squareRoots = taskPool.amap!sqrt(numbers);
1689         ---
1690 
1691         Immediately after the range argument, an optional work unit size argument
1692         may be provided.  Work units as used by `amap` are identical to those
1693         defined for parallel foreach.  If no work unit size is provided, the
1694         default work unit size is used.
1695 
1696         ---
1697         // Same thing, but make work unit size 100.
1698         auto squareRoots = taskPool.amap!sqrt(numbers, 100);
1699         ---
1700 
1701         An output range for returning the results may be provided as the last
1702         argument.  If one is not provided, an array of the proper type will be
1703         allocated on the garbage collected heap.  If one is provided, it must be a
1704         random access range with assignable elements, must have reference
1705         semantics with respect to assignment to its elements, and must have the
1706         same length as the input range.  Writing to adjacent elements from
1707         different threads must be safe.
1708 
1709         ---
1710         // Same thing, but explicitly allocate an array
1711         // to return the results in.  The element type
1712         // of the array may be either the exact type
1713         // returned by functions or an implicit conversion
1714         // target.
1715         auto squareRoots = new float[numbers.length];
1716         taskPool.amap!sqrt(numbers, squareRoots);
1717 
1718         // Multiple functions, explicit output range, and
1719         // explicit work unit size.
1720         auto results = new Tuple!(float, real)[numbers.length];
1721         taskPool.amap!(sqrt, log)(numbers, 100, results);
1722         ---
1723 
1724         Note:
1725 
1726         A memory barrier is guaranteed to be executed after all results are written
1727         but before returning so that results produced by all threads are visible
1728         in the calling thread.
1729 
1730         Tips:
1731 
1732         To perform the mapping operation in place, provide the same range for the
1733         input and output range.
1734 
1735         To parallelize the copying of a range with expensive to evaluate elements
1736         to an array, pass an identity function (a function that just returns
1737         whatever argument is provided to it) to `amap`.
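
        For instance, the following sketch assumes the default `taskPool`;
        the squaring is just a stand-in for an expensive computation:
        ---
        import std.algorithm.iteration : map;
        import std.array : array;
        import std.range : iota;

        // Map in place by passing the same array as input and output.
        auto data = iota(1_000.0).array;
        taskPool.amap!"a * a"(data, data);

        // Parallelize copying a lazily evaluated range into an array by
        // mapping an identity function over it.
        auto lazyRange = iota(1_000).map!(a => a * a);
        auto copied = taskPool.amap!"a"(lazyRange);
        ---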
1738 
1739         $(B Exception Handling):
1740 
1741         When at least one exception is thrown from inside the map functions,
1742         the submission of additional `Task` objects is terminated as soon as
1743         possible, in a non-deterministic manner.  All currently executing or
1744         enqueued work units are allowed to complete.  Then, all exceptions that
1745         were thrown from any work unit are chained using `Throwable.next` and
1746         rethrown.  The order of the exception chaining is non-deterministic.
1747         */
1748         auto amap(Args...)(Args args)
1749         if (isRandomAccessRange!(Args[0]))
1750         {
1751             import core.internal.lifetime : emplaceRef;
1752 
1753             alias fun = adjoin!(staticMap!(unaryFun, functions));
1754 
1755             alias range = args[0];
1756             immutable len = range.length;
1757 
1758             static if (
1759                 Args.length > 1 &&
1760                 randAssignable!(Args[$ - 1]) &&
1761                 is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
1762                 )
1763             {
1764                 import std.conv : text;
1765                 import std.exception : enforce;
1766 
1767                 alias buf = args[$ - 1];
1768                 alias args2 = args[0..$ - 1];
1769                 alias Args2 = Args[0..$ - 1];
1770                 enforce(buf.length == len,
1771                         text("Can't use a user-supplied buffer that's the wrong ",
1772                              "size.  (Expected: ", len, ", got: ", buf.length, ")"));
1773             }
1774             else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
1775             {
1776                 static assert(0, "Wrong buffer type.");
1777             }
1778             else
1779             {
1780                 import std.array : uninitializedArray;
1781 
1782                 auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
1783                 alias args2 = args;
1784                 alias Args2 = Args;
1785             }
1786 
1787             if (!len) return buf;
1788 
1789             static if (isIntegral!(Args2[$ - 1]))
1790             {
1791                 static assert(args2.length == 2);
1792                 auto workUnitSize = cast(size_t) args2[1];
1793             }
1794             else
1795             {
1796                 static assert(args2.length == 1, Args);
1797                 auto workUnitSize = defaultWorkUnitSize(range.length);
1798             }
1799 
1800             alias R = typeof(range);
1801 
1802             if (workUnitSize > len)
1803             {
1804                 workUnitSize = len;
1805             }
1806 
1807             // Handle as a special case:
1808             if (size == 0)
1809             {
1810                 size_t index = 0;
1811                 foreach (elem; range)
1812                 {
1813                     emplaceRef(buf[index++], fun(elem));
1814                 }
1815                 return buf;
1816             }
1817 
1818             // Effectively -1:  workUnitIndex + 1 == 0:
1819             shared size_t workUnitIndex = size_t.max;
1820             shared bool shouldContinue = true;
1821 
1822             void doIt()
1823             {
1824                 import std.algorithm.comparison : min;
1825 
1826                 scope(failure)
1827                 {
1828                     // If an exception is thrown, all threads should bail.
1829                     atomicStore(shouldContinue, false);
1830                 }
1831 
1832                 while (atomicLoad(shouldContinue))
1833                 {
1834                     immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
1835                     immutable start = workUnitSize * myUnitIndex;
1836                     if (start >= len)
1837                     {
1838                         atomicStore(shouldContinue, false);
1839                         break;
1840                     }
1841 
1842                     immutable end = min(len, start + workUnitSize);
1843 
1844                     static if (hasSlicing!R)
1845                     {
1846                         auto subrange = range[start .. end];
1847                         foreach (i; start .. end)
1848                         {
1849                             emplaceRef(buf[i], fun(subrange.front));
1850                             subrange.popFront();
1851                         }
1852                     }
1853                     else
1854                     {
1855                         foreach (i; start .. end)
1856                         {
1857                             emplaceRef(buf[i], fun(range[i]));
1858                         }
1859                     }
1860                 }
1861             }
1862 
1863             submitAndExecute(this, &doIt);
1864             return buf;
1865         }
1866     }
1867 
1868     ///
1869     template map(functions...)
1870     {
1871         /**
1872         A semi-lazy parallel map that can be used for pipelining.  The map
1873         functions are evaluated for the first `bufSize` elements and stored in a
1874         buffer and made available to `popFront`.  Meanwhile, in the
1875         background a second buffer of the same size is filled.  When the first
1876         buffer is exhausted, it is swapped with the second buffer and filled while
1877         the values from what was originally the second buffer are read.  This
1878         implementation allows for elements to be written to the buffer without
1879         the need for atomic operations or synchronization for each write, and
1880         enables the mapping function to be evaluated efficiently in parallel.
1881 
1882         `map` has more overhead than the simpler procedure used by `amap`
1883         but avoids the need to keep all results in memory simultaneously and works
1884         with non-random access ranges.
1885 
1886         Params:
1887 
1888         source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
1889         to be mapped.  If `source` is not random
1890         access it will be lazily buffered to an array of size `bufSize` before
1891         the map function is evaluated.  (For an exception to this rule, see Notes.)
1892 
1893         bufSize = The size of the buffer to store the evaluated elements.
1894 
1895         workUnitSize = The number of elements to evaluate in a single
1896         `Task`.  Must be less than or equal to `bufSize`, and
1897         should be a fraction of `bufSize` such that all worker threads can be
1898         used.  If the default of size_t.max is used, workUnitSize will be set to
1899         the pool-wide default.
1900 
1901         Returns:  An input range representing the results of the map.  This range
1902                   has a length iff `source` has a length.
1903 
1904         Notes:
1905 
1906         If a range returned by `map` or `asyncBuf` is used as an input to
1907         `map`, then as an optimization the copying from the output buffer
1908         of the first range to the input buffer of the second range is elided, even
1909         though the ranges returned by `map` and `asyncBuf` are non-random
1910         access ranges.  This means that the `bufSize` parameter passed to the
1911         current call to `map` will be ignored and the size of the buffer
1912         will be the buffer size of `source`.
1913 
1914         Example:
1915         ---
1916         // Pipeline reading a file, converting each line
1917         // to a number, taking the logarithms of the numbers,
1918         // and performing the additions necessary to find
1919         // the sum of the logarithms.
1920 
1921         auto lineRange = File("numberList.txt").byLine();
1922         auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
1923         auto nums = taskPool.map!(to!double)(dupedLines);
1924         auto logs = taskPool.map!log10(nums);
1925 
1926         double sum = 0;
1927         foreach (elem; logs)
1928         {
1929             sum += elem;
1930         }
1931         ---
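
        The buffering parameters may also be given explicitly.  The values in
        this sketch are arbitrary: a 1_000-element buffer evaluated in work
        units of 100 elements lets up to 10 work units be in flight at a time.
        ---
        auto explicitNums = taskPool.map!(to!double)(dupedLines, 1_000, 100);

        // Per the Notes above, the buffer size passed to this second map
        // is ignored; the buffer size of explicitNums is used instead.
        auto explicitLogs = taskPool.map!log10(explicitNums, 1_000, 100);
        ---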
1932 
1933         $(B Exception Handling):
1934 
1935         Any exceptions thrown while iterating over `source`
1936         or computing the map function are re-thrown on a call to `popFront` or,
1937         if thrown during construction, are simply allowed to propagate to the
1938         caller.  In the case of exceptions thrown while computing the map function,
1939         the exceptions are chained as in `TaskPool.amap`.
1940         */
1941         auto
1942         map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
1943         if (isInputRange!S)
1944         {
1945             import std.exception : enforce;
1946 
1947             enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
1948                     "Work unit size must be smaller than buffer size.");
1949             alias fun = adjoin!(staticMap!(unaryFun, functions));
1950 
1951             static final class Map
1952             {
1953                 // This is a class because the task needs to be located on the
1954                 // heap and in the non-random access case source needs to be on
1955                 // the heap, too.
1956 
1957             private:
1958                 enum bufferTrick = is(typeof(source.buf1)) &&
1959                 is(typeof(source.bufPos)) &&
1960                 is(typeof(source.doBufSwap()));
1961 
1962                 alias E = MapType!(S, functions);
1963                 E[] buf1, buf2;
1964                 S source;
1965                 TaskPool pool;
1966                 Task!(run, E[] delegate(E[]), E[]) nextBufTask;
1967                 size_t workUnitSize;
1968                 size_t bufPos;
1969                 bool lastTaskWaited;
1970 
1971             static if (isRandomAccessRange!S)
1972             {
1973                 alias FromType = S;
1974 
1975                 void popSource()
1976                 {
1977                     import std.algorithm.comparison : min;
1978 
1979                     static if (__traits(compiles, source[0 .. source.length]))
1980                     {
1981                         source = source[min(buf1.length, source.length)..source.length];
1982                     }
1983                     else static if (__traits(compiles, source[0..$]))
1984                     {
1985                         source = source[min(buf1.length, source.length)..$];
1986                     }
1987                     else
1988                     {
1989                         static assert(0, "S must have slicing for Map."
1990                                       ~ "  " ~ S.stringof ~ " doesn't.");
1991                     }
1992                 }
1993             }
1994             else static if (bufferTrick)
1995             {
1996                 // Make sure we don't have the buffer recycling overload of
1997                 // asyncBuf.
1998                 static if (
1999                     is(typeof(source.source)) &&
2000                     isRoundRobin!(typeof(source.source))
2001                 )
2002                 {
2003                     static assert(0, "Cannot execute a parallel map on " ~
2004                                   "the buffer recycling overload of asyncBuf."
2005                                  );
2006                 }
2007 
2008                 alias FromType = typeof(source.buf1);
2009                 FromType from;
2010 
2011                 // Just swap our input buffer with source's output buffer.
2012                 // No need to copy element by element.
2013                 FromType dumpToFrom()
2014                 {
2015                     import std.algorithm.mutation : swap;
2016 
2017                     assert(source.buf1.length <= from.length);
2018                     from.length = source.buf1.length;
2019                     swap(source.buf1, from);
2020 
2021                     // Just in case this source has been popped before
2022                     // being sent to map:
2023                     from = from[source.bufPos..$];
2024 
2025                     static if (is(typeof(source._length)))
2026                     {
2027                         source._length -= (from.length - source.bufPos);
2028                     }
2029 
2030                     source.doBufSwap();
2031 
2032                     return from;
2033                 }
2034             }
2035             else
2036             {
2037                 alias FromType = ElementType!S[];
2038 
2039                 // The temporary array that data is copied to before being
2040                 // mapped.
2041                 FromType from;
2042 
2043                 FromType dumpToFrom()
2044                 {
2045                     assert(from !is null);
2046 
2047                     size_t i;
2048                     for (; !source.empty && i < from.length; source.popFront())
2049                     {
2050                         from[i++] = source.front;
2051                     }
2052 
2053                     from = from[0 .. i];
2054                     return from;
2055                 }
2056             }
2057 
2058             static if (hasLength!S)
2059             {
2060                 size_t _length;
2061 
2062                 public @property size_t length() const @safe pure nothrow
2063                 {
2064                     return _length;
2065                 }
2066             }
2067 
2068                 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
2069                 {
2070                     static if (bufferTrick)
2071                     {
2072                         bufSize = source.buf1.length;
2073                     }
2074 
2075                     buf1.length = bufSize;
2076                     buf2.length = bufSize;
2077 
2078                     static if (!isRandomAccessRange!S)
2079                     {
2080                         from.length = bufSize;
2081                     }
2082 
2083                     this.workUnitSize = (workUnitSize == size_t.max) ?
2084                             pool.defaultWorkUnitSize(bufSize) : workUnitSize;
2085                     this.source = source;
2086                     this.pool = pool;
2087 
2088                     static if (hasLength!S)
2089                     {
2090                         _length = source.length;
2091                     }
2092 
2093                     buf1 = fillBuf(buf1);
2094                     submitBuf2();
2095                 }
2096 
2097                 // In the random access case, source is sliced and mapped
2098                 // directly; otherwise it is first copied into `from`.
2099                 E[] fillBuf(E[] buf)
2100                 {
2101                     import std.algorithm.comparison : min;
2102 
2103                     static if (isRandomAccessRange!S)
2104                     {
2105                         import std.range : take;
2106                         auto toMap = take(source, buf.length);
2107                         scope(success) popSource();
2108                     }
2109                     else
2110                     {
2111                         auto toMap = dumpToFrom();
2112                     }
2113 
2114                     buf = buf[0 .. min(buf.length, toMap.length)];
2115 
2116                     // Handle as a special case:
2117                     if (pool.size == 0)
2118                     {
2119                         size_t index = 0;
2120                         foreach (elem; toMap)
2121                         {
2122                             buf[index++] = fun(elem);
2123                         }
2124                         return buf;
2125                     }
2126 
2127                     pool.amap!functions(toMap, workUnitSize, buf);
2128 
2129                     return buf;
2130                 }
2131 
2132                 void submitBuf2()
2133                 in
2134                 {
2135                     assert(nextBufTask.prev is null);
2136                     assert(nextBufTask.next is null);
2137                 }
2138                 do
2139                 {
2140                     // Hack to reuse the task object.
2141 
2142                     nextBufTask = typeof(nextBufTask).init;
2143                     nextBufTask._args[0] = &fillBuf;
2144                     nextBufTask._args[1] = buf2;
2145                     pool.put(nextBufTask);
2146                 }
2147 
2148                 void doBufSwap()
2149                 {
2150                     if (lastTaskWaited)
2151                     {
2152                         // Then the source is empty.  Signal it here.
2153                         buf1 = null;
2154                         buf2 = null;
2155 
2156                         static if (!isRandomAccessRange!S)
2157                         {
2158                             from = null;
2159                         }
2160 
2161                         return;
2162                     }
2163 
2164                     buf2 = buf1;
2165                     buf1 = nextBufTask.yieldForce;
2166                     bufPos = 0;
2167 
2168                     if (source.empty)
2169                     {
2170                         lastTaskWaited = true;
2171                     }
2172                     else
2173                     {
2174                         submitBuf2();
2175                     }
2176                 }
2177 
2178             public:
2179                 @property auto front()
2180                 {
2181                     return buf1[bufPos];
2182                 }
2183 
2184                 void popFront()
2185                 {
2186                     static if (hasLength!S)
2187                     {
2188                         _length--;
2189                     }
2190 
2191                     bufPos++;
2192                     if (bufPos >= buf1.length)
2193                     {
2194                         doBufSwap();
2195                     }
2196                 }
2197 
2198                 static if (isInfinite!S)
2199                 {
2200                     enum bool empty = false;
2201                 }
2202                 else
2203                 {
2204 
2205                     bool empty() const @property
2206                     {
2207                         // popFront() sets this when source is empty
2208                         return buf1.length == 0;
2209                     }
2210                 }
2211             }
2212             return new Map(source, bufSize, workUnitSize, this);
2213         }
2214     }
2215 
2216     /**
2217     Given a `source` range that is expensive to iterate over, returns an
2218     $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that
2219     asynchronously buffers the contents of `source` into a buffer of `bufSize`
2220     elements in a worker thread, while making previously buffered elements from
2221     a second buffer, also of size `bufSize`, available via the range interface
2222     of the returned object.  The returned range has a length iff `hasLength!S`.
2223     `asyncBuf` is useful, for example, when performing expensive operations
2224     on the elements of ranges that represent data on a disk or network.
2225 
2226     Example:
2227     ---
2228     import std.conv, std.stdio;
2229 
2230     void main()
2231     {
2232         // Fetch lines of a file in a background thread
2233         // while processing previously fetched lines,
2234         // dealing with byLine's buffer recycling by
2235         // eagerly duplicating every line.
2236         auto lines = File("foo.txt").byLine();
2237         auto duped = std.algorithm.map!"a.idup"(lines);
2238 
2239         // Fetch more lines in the background while we
2240         // process the lines already read into memory
2241         // into a matrix of doubles.
2242         double[][] matrix;
2243         auto asyncReader = taskPool.asyncBuf(duped);
2244 
2245         foreach (line; asyncReader)
2246         {
2247             auto ls = line.split("\t");
2248             matrix ~= to!(double[])(ls);
2249         }
2250     }
2251     ---
2252 
2253     $(B Exception Handling):
2254 
2255     Any exceptions thrown while iterating over `source` are re-thrown on a
2256     call to `popFront` or, if thrown during construction, simply
2257     allowed to propagate to the caller.
2258     */
2259     auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S)
2260     {
2261         static final class AsyncBuf
2262         {
2263             // This is a class because the task and source both need to be on
2264             // the heap.
2265 
2266             // The element type of S.
2267             alias E = ElementType!S;  // Needs to be here b/c of forward ref bugs.
2268 
2269         private:
2270             E[] buf1, buf2;
2271             S source;
2272             TaskPool pool;
2273             Task!(run, E[] delegate(E[]), E[]) nextBufTask;
2274             size_t bufPos;
2275             bool lastTaskWaited;
2276 
2277             static if (hasLength!S)
2278             {
2279                 size_t _length;
2280 
2281                 // Available if hasLength!S.
2282                 public @property size_t length() const @safe pure nothrow
2283                 {
2284                     return _length;
2285                 }
2286             }
2287 
2288             this(S source, size_t bufSize, TaskPool pool)
2289             {
2290                 buf1.length = bufSize;
2291                 buf2.length = bufSize;
2292 
2293                 this.source = source;
2294                 this.pool = pool;
2295 
2296                 static if (hasLength!S)
2297                 {
2298                     _length = source.length;
2299                 }
2300 
2301                 buf1 = fillBuf(buf1);
2302                 submitBuf2();
2303             }
2304 
2305             E[] fillBuf(E[] buf)
2306             {
2307                 assert(buf !is null);
2308 
2309                 size_t i;
2310                 for (; !source.empty && i < buf.length; source.popFront())
2311                 {
2312                     buf[i++] = source.front;
2313                 }
2314 
2315                 buf = buf[0 .. i];
2316                 return buf;
2317             }
2318 
2319             void submitBuf2()
2320             in
2321             {
2322                 assert(nextBufTask.prev is null);
2323                 assert(nextBufTask.next is null);
2324             }
2325             do
2326             {
2327                 // Hack to reuse the task object.
2328 
2329                 nextBufTask = typeof(nextBufTask).init;
2330                 nextBufTask._args[0] = &fillBuf;
2331                 nextBufTask._args[1] = buf2;
2332                 pool.put(nextBufTask);
2333             }
2334 
2335             void doBufSwap()
2336             {
2337                 if (lastTaskWaited)
2338                 {
2339                     // Then source is empty.  Signal it here.
2340                     buf1 = null;
2341                     buf2 = null;
2342                     return;
2343                 }
2344 
2345                 buf2 = buf1;
2346                 buf1 = nextBufTask.yieldForce;
2347                 bufPos = 0;
2348 
2349                 if (source.empty)
2350                 {
2351                     lastTaskWaited = true;
2352                 }
2353                 else
2354                 {
2355                     submitBuf2();
2356                 }
2357             }
2358 
2359         public:
2360             E front() @property
2361             {
2362                 return buf1[bufPos];
2363             }
2364 
2365             void popFront()
2366             {
2367                 static if (hasLength!S)
2368                 {
2369                     _length--;
2370                 }
2371 
2372                 bufPos++;
2373                 if (bufPos >= buf1.length)
2374                 {
2375                     doBufSwap();
2376                 }
2377             }
2378 
2379             static if (isInfinite!S)
2380             {
2381                 enum bool empty = false;
2382             }
2383 
2384             else
2385             {
2386                 ///
2387                 bool empty() @property
2388                 {
2389                     // popFront() sets this when source is empty:
2390                     return buf1.length == 0;
2391                 }
2392             }
2393         }
2394         return new AsyncBuf(source, bufSize, this);
2395     }
2396 
2397     /**
2398     Given a callable object `next` that writes to a user-provided buffer and
2399     a second callable object `empty` that determines whether more data is
2400     available to write via `next`, returns an input range that
2401     asynchronously calls `next` with a set of `nBuffers` buffers
2402     and makes the results available in the order they were obtained via the
2403     input range interface of the returned object.  As with the
2404     input range overload of `asyncBuf`, one half of the buffers
2405     is made available via the range interface while the other half is
2406     being filled, and vice versa.
2407 
2408     Params:
2409 
2410     next = A callable object that takes a single argument that must be an array
2411            with mutable elements.  When called, `next` writes data to
2412            the array provided by the caller.
2413 
2414     empty = A callable object that takes no arguments and returns a type
2415             implicitly convertible to `bool`.  This is used to signify
2416             that no more data is available to be obtained by calling `next`.
2417 
2418     initialBufSize = The initial size of each buffer.  If `next` takes its
2419                      array by reference, it may resize the buffers.
2420 
2421     nBuffers = The number of buffers to cycle through when calling `next`.
2422 
2423     Example:
2424     ---
2425     // Fetch lines of a file in a background
2426     // thread while processing previously fetched
2427     // lines, without duplicating any lines.
2428     auto file = File("foo.txt");
2429 
2430     void next(ref char[] buf)
2431     {
2432         file.readln(buf);
2433     }
2434 
2435     // Fetch more lines in the background while we
2436     // process the lines already read into memory
2437     // into a matrix of doubles.
2438     double[][] matrix;
2439     auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
2440 
2441     foreach (line; asyncReader)
2442     {
2443         auto ls = line.split("\t");
2444         matrix ~= to!(double[])(ls);
2445     }
2446     ---
2447 
2448     $(B Exception Handling):
2449 
2450     Any exceptions thrown while obtaining data via `next` are re-thrown on a
2451     call to `popFront`.
2452 
2453     Warning:
2454 
2455     Using the range returned by this function in a parallel foreach loop
2456     will not work because buffers may be overwritten while the task that
2457     processes them is in queue.  This is checked for at compile time
2458     and will result in a static assertion failure.
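
    For instance, with the `asyncReader` from the example above (a sketch;
    the serial loop compiles, the parallel one does not):
    ---
    // Fine:  serial iteration.  Each buffer is fully processed before
    // it is recycled.
    foreach (line; asyncReader) { /* ... */ }

    // Compile-time error:  a buffer could be overwritten while a Task
    // that processes it is still waiting in the queue.
    // foreach (line; parallel(asyncReader)) { /* ... */ }
    ---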
2459     */
2460     auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
2461     if (is(typeof(C2.init()) : bool) &&
2462         Parameters!C1.length == 1 &&
2463         Parameters!C2.length == 0 &&
2464         isArray!(Parameters!C1[0])
2465     ) {
2466         auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
2467         return asyncBuf(roundRobin, nBuffers / 2);
2468     }
2469 
2470     ///
2471     template reduce(functions...)
2472     {
2473         /**
2474         Parallel reduce on a random access range.  Except as otherwise noted,
2475         usage is similar to $(REF _reduce, std,algorithm,iteration).  There is
2476         also $(LREF fold) which does the same thing with a different parameter
2477         order.
2478 
2479         This function works by splitting the range to be reduced into work
2480         units, which are slices to be reduced in parallel.  Once the results
2481         from all work units are computed, a final serial reduction is performed
2482         on these results to compute the final answer. Therefore, care must be
2483         taken to choose the seed value appropriately.
2484 
2485         Because the reduction is being performed in parallel, `functions`
2486         must be associative.  For notational simplicity, let # be an
2487         infix operator representing `functions`.  Then, (a # b) # c must equal
2488         a # (b # c).  Floating point addition is not associative
2489         even though addition in exact arithmetic is.  Summing floating
2490         point numbers using this function may give different results than summing
2491         serially.  However, for many practical purposes floating point addition
2492         can be treated as associative.
2493 
2494         Note that, since `functions` are assumed to be associative,
2495         additional optimizations are made to the serial portion of the reduction
2496         algorithm. These take advantage of the instruction level parallelism of
2497         modern CPUs, in addition to the thread-level parallelism that the rest
2498         of this module exploits.  This can lead to better than linear speedups
2499         relative to $(REF _reduce, std,algorithm,iteration), especially for
2500         fine-grained benchmarks like dot products.
2501 
2502         An explicit seed may be provided as the first argument.  If
2503         provided, it is used as the seed for all work units and for the final
2504         reduction of results from all work units.  Therefore, if it is not the
2505         identity value for the operation being performed, results may differ
2506         from those generated by $(REF _reduce, std,algorithm,iteration) and
2507         may vary depending on how many work units are used.  The next argument must be
2508         the range to be reduced.
2509         ---
2510         // Find the sum of squares of a range in parallel, using
2511         // an explicit seed.
2512         //
2513         // Timings on an Athlon 64 X2 dual core machine:
2514         //
2515         // Parallel reduce:                     72 milliseconds
2516         // Using std.algorithm.reduce instead:  181 milliseconds
2517         auto nums = iota(10_000_000.0f);
2518         auto sumSquares = taskPool.reduce!"a + b"(
2519             0.0, std.algorithm.map!"a * a"(nums)
2520         );
2521         ---
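
        Because the seed participates once per work unit and once more in the
        final reduction, a non-identity seed makes the result depend on the
        work unit count.  A minimal sketch of this caveat:
        ---
        import std.range : iota;

        auto vals = iota(4);  // 0, 1, 2, 3

        // 0 is the identity for addition, so the result is 6 no
        // matter how the range is split into work units.
        auto sum = taskPool.reduce!"a + b"(0, vals);

        // 10 is not the identity, so each work unit adds 10 again and
        // the result varies with the number of work units used.
        auto biased = taskPool.reduce!"a + b"(10, vals);
        ---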
2522 
2523         If no explicit seed is provided, the first element of each work unit
2524         is used as a seed.  For the final reduction, the result from the first
2525         work unit is used as the seed.
2526         ---
2527         // Find the sum of a range in parallel, using the first
2528         // element of each work unit as the seed.
2529         auto sum = taskPool.reduce!"a + b"(nums);
2530         ---
2531 
2532         An explicit work unit size may be specified as the last argument.
2533         Specifying too small a work unit size will effectively serialize the
2534         reduction, as the final reduction of the result of each work unit will
2535         dominate computation time.  If `TaskPool.size` for this instance
2536         is zero, this parameter is ignored and one work unit is used.
2537         ---
2538         // Use a work unit size of 100.
2539         auto sum2 = taskPool.reduce!"a + b"(nums, 100);
2540 
2541         // Work unit size of 100 and explicit seed.
2542         auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
2543         ---
2544 
2545         Parallel reduce supports multiple functions, like
2546         `std.algorithm.reduce`.
2547         ---
2548         // Find both the min and max of nums.
2549         auto minMax = taskPool.reduce!(min, max)(nums);
2550         assert(minMax[0] == reduce!min(nums));
2551         assert(minMax[1] == reduce!max(nums));
2552         ---
2553 
2554         $(B Exception Handling):
2555 
2556         After this function is finished executing, any exceptions thrown
2557         are chained together via `Throwable.next` and rethrown.  The chaining
2558         order is non-deterministic.
2559 
2560         See_Also:
2561 
2562             $(LREF fold) is functionally equivalent to $(LREF _reduce) except the
2563             range parameter comes first and there is no need to use
2564             $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds.
2565          */
2566         auto reduce(Args...)(Args args)
2567         {
2568             import core.exception : OutOfMemoryError;
2569             import core.internal.lifetime : emplaceRef;
2570             import std.exception : enforce;
2571 
2572             alias fun = reduceAdjoin!functions;
2573             alias finishFun = reduceFinish!functions;
2574 
2575             static if (isIntegral!(Args[$ - 1]))
2576             {
2577                 size_t workUnitSize = cast(size_t) args[$ - 1];
2578                 alias args2 = args[0..$ - 1];
2579                 alias Args2 = Args[0..$ - 1];
2580             }
2581             else
2582             {
2583                 alias args2 = args;
2584                 alias Args2 = Args;
2585             }
2586 
2587             auto makeStartValue(Type)(Type e)
2588             {
2589                 static if (functions.length == 1)
2590                 {
2591                     return e;
2592                 }
2593                 else
2594                 {
2595                     typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
2596                     foreach (i, T; seed.Types)
2597                     {
2598                         emplaceRef(seed.expand[i], e);
2599                     }
2600 
2601                     return seed;
2602                 }
2603             }
2604 
2605             static if (args2.length == 2)
2606             {
2607                 static assert(isInputRange!(Args2[1]));
2608                 alias range = args2[1];
2609                 alias seed = args2[0];
2610                 enum explicitSeed = true;
2611 
2612                 static if (!is(typeof(workUnitSize)))
2613                 {
2614                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2615                 }
2616             }
2617             else
2618             {
2619                 static assert(args2.length == 1);
2620                 alias range = args2[0];
2621 
2622                 static if (!is(typeof(workUnitSize)))
2623                 {
2624                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2625                 }
2626 
2627                 enforce(!range.empty,
2628                     "Cannot reduce an empty range with first element as start value.");
2629 
2630                 auto seed = makeStartValue(range.front);
2631                 enum explicitSeed = false;
2632                 range.popFront();
2633             }
2634 
2635             alias E = typeof(seed);
2636             alias R = typeof(range);
2637 
2638             E reduceOnRange(R range, size_t lowerBound, size_t upperBound)
2639             {
2640                 // This is for exploiting instruction level parallelism by
2641                 // using multiple accumulator variables within each thread,
2642                 // since we're assuming functions are associative anyhow.
2643 
2644                 // This is so that loops can be unrolled automatically.
2645                 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5);
2646                 enum nILP = ilpTuple.length;
2647                 immutable subSize = (upperBound - lowerBound) / nILP;
2648 
2649                 if (subSize <= 1)
2650                 {
2651                     // Handle as a special case.
2652                     static if (explicitSeed)
2653                     {
2654                         E result = seed;
2655                     }
2656                     else
2657                     {
2658                         E result = makeStartValue(range[lowerBound]);
2659                         lowerBound++;
2660                     }
2661 
2662                     foreach (i; lowerBound .. upperBound)
2663                     {
2664                         result = fun(result, range[i]);
2665                     }
2666 
2667                     return result;
2668                 }
2669 
2670                 assert(subSize > 1);
2671                 E[nILP] results;
2672                 size_t[nILP] offsets;
2673 
2674                 foreach (i; ilpTuple)
2675                 {
2676                     offsets[i] = lowerBound + subSize * i;
2677 
2678                     static if (explicitSeed)
2679                     {
2680                         results[i] = seed;
2681                     }
2682                     else
2683                     {
2684                         results[i] = makeStartValue(range[offsets[i]]);
2685                         offsets[i]++;
2686                     }
2687                 }
2688 
2689                 immutable nLoop = subSize - (!explicitSeed);
2690                 foreach (i; 0 .. nLoop)
2691                 {
2692                     foreach (j; ilpTuple)
2693                     {
2694                         results[j] = fun(results[j], range[offsets[j]]);
2695                         offsets[j]++;
2696                     }
2697                 }
2698 
2699                 // Finish the remainder.
2700                 foreach (i; nILP * subSize + lowerBound .. upperBound)
2701                 {
2702                     results[$ - 1] = fun(results[$ - 1], range[i]);
2703                 }
2704 
2705                 foreach (i; ilpTuple[1..$])
2706                 {
2707                     results[0] = finishFun(results[0], results[i]);
2708                 }
2709 
2710                 return results[0];
2711             }
2712 
2713             immutable len = range.length;
2714             if (len == 0)
2715             {
2716                 return seed;
2717             }
2718 
2719             if (this.size == 0)
2720             {
2721                 return finishFun(seed, reduceOnRange(range, 0, len));
2722             }
2723 
2724             // Unlike the rest of the functions here, I can't use the Task object
2725             // recycling trick here because this has to work on non-commutative
2726             // operations.  After all the tasks are done executing, fun() has to
2727             // be applied on the results of these to get a final result, but
2728             // it can't be evaluated out of order.
2729 
2730             if (workUnitSize > len)
2731             {
2732                 workUnitSize = len;
2733             }
2734 
2735             immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
2736             assert(nWorkUnits * workUnitSize >= len);
2737 
2738             alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
2739             RTask[] tasks;
2740 
2741             // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753
2742             // Use a fixed-size stack buffer, falling back to malloc() if more space is needed.
2743             enum maxStack = 2_048;
2744             byte[maxStack] buf = void;
2745             immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
2746 
2747             import core.stdc.stdlib : malloc, free;
2748             if (nBytesNeeded <= maxStack)
2749             {
2750                 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
2751             }
2752             else
2753             {
2754                 auto ptr = cast(RTask*) malloc(nBytesNeeded);
2755                 if (!ptr)
2756                 {
2757                     throw new OutOfMemoryError(
2758                         "Out of memory in std.parallelism."
2759                     );
2760                 }
2761 
2762                 tasks = ptr[0 .. nWorkUnits];
2763             }
2764 
2765             scope(exit)
2766             {
2767                 if (nBytesNeeded > maxStack)
2768                 {
2769                     free(tasks.ptr);
2770                 }
2771             }
2772 
2773             // Hack to take the address of a nested function w/o
2774             // making a closure.
2775             static auto scopedAddress(D)(scope D del) @system
2776             {
2777                 auto tmp = del;
2778                 return tmp;
2779             }
2780 
2781             size_t curPos = 0;
2782             void useTask(ref RTask task)
2783             {
2784                 import std.algorithm.comparison : min;
2785                 import core.lifetime : emplace;
2786 
2787                 // Private constructor, so we can't feed its arguments
2788                 // directly to emplace.
2789                 emplace(&task, RTask
2790                 (
2791                     scopedAddress(&reduceOnRange),
2792                     range,
2793                     curPos, // lower bound.
2794                     cast() min(len, curPos + workUnitSize)  // upper bound.
2795                 ));
2796 
2797                 task.pool = this;
2798 
2799                 curPos += workUnitSize;
2800             }
2801 
2802             foreach (ref task; tasks)
2803             {
2804                 useTask(task);
2805             }
2806 
2807             foreach (i; 1 .. tasks.length - 1)
2808             {
2809                 tasks[i].next = tasks[i + 1].basePtr;
2810                 tasks[i + 1].prev = tasks[i].basePtr;
2811             }
2812 
2813             if (tasks.length > 1)
2814             {
2815                 queueLock();
2816                 scope(exit) queueUnlock();
2817 
2818                 abstractPutGroupNoSync(
2819                     tasks[1].basePtr,
2820                     tasks[$ - 1].basePtr
2821                 );
2822             }
2823 
2824             if (tasks.length > 0)
2825             {
2826                 try
2827                 {
2828                     tasks[0].job();
2829                 }
2830                 catch (Throwable e)
2831                 {
2832                     tasks[0].exception = e;
2833                 }
2834                 tasks[0].taskStatus = TaskStatus.done;
2835 
2836                 // Try to execute each of these in the current thread
2837                 foreach (ref task; tasks[1..$])
2838                 {
2839                     tryDeleteExecute(task.basePtr);
2840                 }
2841             }
2842 
2843             // Now that we've tried to execute every task, they're all either
2844             // done or in progress.  Force all of them.
2845             E result = seed;
2846 
2847             Throwable firstException;
2848 
2849             foreach (ref task; tasks)
2850             {
2851                 try
2852                 {
2853                     task.yieldForce;
2854                 }
2855                 catch (Throwable e)
2856                 {
2857                     /* Chain e to front because order doesn't matter and because
2858                      * e is not likely to be a chain itself (so fewer traversals)
2859                      */
2860                     firstException = Throwable.chainTogether(e, firstException);
2861                     continue;
2862                 }
2863 
2864                 if (!firstException) result = finishFun(result, task.returnVal);
2865             }
2866 
2867             if (firstException) throw firstException;
2868 
2869             return result;
2870         }
2871     }
2872 
2873     ///
2874     template fold(functions...)
2875     {
2876         /** Implements the homonym function (also known as `accumulate`, `compress`,
2877             `inject`, or `foldl`) present in various programming languages of
2878             functional flavor.
2879 
2880             `fold` is functionally equivalent to $(LREF reduce) except the range
2881             parameter comes first and there is no need to use $(REF_ALTTEXT
2882             `tuple`,tuple,std,typecons) for multiple seeds.
2883 
2884             There may be one or more callable entities (`functions` argument) to
2885             apply.
2886 
2887             Params:
2888                 args = Just the range to _fold over; or the range and one seed
2889                        per function; or the range, one seed per function, and
2890                        the work unit size
2891 
2892             Returns:
2893                 The accumulated result as a single value for single function and
2894                 as a tuple of values for multiple functions
2895 
2896             See_Also:
2897             Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce).
2898 
2899             Example:
2900             ---
2901             static int adder(int a, int b)
2902             {
2903                 return a + b;
2904             }
2905             static int multiplier(int a, int b)
2906             {
2907                 return a * b;
2908             }
2909 
2910             // Just the range
2911             auto x = taskPool.fold!adder([1, 2, 3, 4]);
2912             assert(x == 10);
2913 
2914             // The range and the seeds (0 and 1 below; also note multiple
2915             // functions in this example)
2916             auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1);
2917             assert(y[0] == 10);
2918             assert(y[1] == 24);
2919 
2920             // The range, the seed (0), and the work unit size (20)
2921             auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20);
2922             assert(z == 10);
2923             ---
2924         */
2925         auto fold(Args...)(Args args)
2926         {
2927             static assert(isInputRange!(Args[0]), "First argument must be an InputRange");
2928 
2929             alias range = args[0];
2930 
2931             static if (Args.length == 1)
2932             {
2933                 // Just the range
2934                 return reduce!functions(range);
2935             }
2936             else static if (Args.length == 1 + functions.length ||
2937                             Args.length == 1 + functions.length + 1)
2938             {
2939                 static if (functions.length == 1)
2940                 {
2941                     alias seeds = args[1];
2942                 }
2943                 else
2944                 {
2945                     auto seeds()
2946                     {
2947                         import std.typecons : tuple;
2948                         return tuple(args[1 .. functions.length+1]);
2949                     }
2950                 }
2951 
2952                 static if (Args.length == 1 + functions.length)
2953                 {
2954                     // The range and the seeds
2955                     return reduce!functions(seeds, range);
2956                 }
2957                 else static if (Args.length == 1 + functions.length + 1)
2958                 {
2959                     // The range, the seeds, and the work unit size
2960                     static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type");
2961                     return reduce!functions(seeds, range, args[$-1]);
2962                 }
2963             }
2964             else
2965             {
2966                 import std.conv : text;
2967                 static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, "
2968                               ~ functions.length.text ~ " optional seed(s), and an optional work unit size.");
2969             }
2970         }
2971     }
2972 
2973     // This test is not included in the documentation because even though these
2974     // examples are for the inner fold() template, with their current location,
2975     // they would appear under the outer one. (We can't move this inside the
2976     // outer fold() template because then dmd runs out of memory possibly due to
2977     // recursive template instantiation, which is surprisingly not caught.)
2978     @system unittest
2979     {
2980         // Just the range
2981         auto x = taskPool.fold!"a + b"([1, 2, 3, 4]);
2982         assert(x == 10);
2983 
2984         // The range and the seeds (0 and 1 below; also note multiple
2985         // functions in this example)
2986         auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1);
2987         assert(y[0] == 10);
2988         assert(y[1] == 24);
2989 
2990         // The range, the seed (0), and the work unit size (20)
2991         auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20);
2992         assert(z == 10);
2993     }
2994 
2995     /**
2996     Gets the index of the current thread relative to this `TaskPool`.  Any
2997     thread not in this pool will receive an index of 0.  The worker threads in
2998     this pool receive unique indices of 1 through `this.size`.
2999 
3000     This function is useful for maintaining worker-local resources.
3001 
3002     Example:
3003     ---
3004     // Execute a loop that computes the greatest common
3005     // divisor of every number from 0 through 999 with
3006     // 42 in parallel.  Write the results out to
3007     // a set of files, one for each thread.  This allows
3008     // results to be written out without any synchronization.
3009 
3010     import std.conv, std.range, std.numeric, std.stdio;
3011 
3012     void main()
3013     {
3014         auto fileHandles = new File[taskPool.size + 1];
3015         scope(exit) {
3016             foreach (ref handle; fileHandles)
3017             {
3018                 handle.close();
3019             }
3020         }
3021 
3022         foreach (i, ref handle; fileHandles)
3023         {
3024         handle = File("workerResults" ~ to!string(i) ~ ".txt", "wb");
3025         }
3026 
3027         foreach (num; parallel(iota(1_000)))
3028         {
3029             auto outHandle = fileHandles[taskPool.workerIndex];
3030             outHandle.writeln(num, '\t', gcd(num, 42));
3031         }
3032     }
3033     ---
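
    The indexing contract can also be checked directly (a minimal sketch):
    ---
    // The calling thread is not one of this pool's workers, so it
    // receives the shared index 0.
    assert(taskPool.workerIndex == 0);
    ---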
3034     */
3035     size_t workerIndex() @property @safe const nothrow
3036     {
3037         immutable rawInd = threadIndex;
3038         return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
3039                 (rawInd - instanceStartIndex + 1) : 0;
3040     }
3041 
3042     /**
3043     Struct for creating worker-local storage.  Worker-local storage is
3044     thread-local storage that exists only for worker threads in a given
3045     `TaskPool` plus a single thread outside the pool.  It is allocated on the
3046     garbage collected heap in a way that avoids _false sharing, and doesn't
3047     necessarily have global scope within any thread.  It can be accessed from
3048     any worker thread in the `TaskPool` that created it, and one thread
3049     outside this `TaskPool`.  All threads outside the pool that created a
3050     given instance of worker-local storage share a single slot.
3051 
3052     Since the underlying data for this struct is heap-allocated, this struct
3053     has reference semantics when passed between functions.
3054 
3055     The main use cases for `WorkerLocalStorage` are:
3056 
3057     1.  Performing parallel reductions with an imperative, as opposed to
3058         functional, programming style.  In this case, it's useful to treat
3059         `WorkerLocalStorage` as local to each thread for only the parallel
3060         portion of an algorithm.
3061 
3062     2.  Recycling temporary buffers across iterations of a parallel foreach loop.
3063 
3064     Example:
3065     ---
3066     // Calculate pi as in our synopsis example, but
3067     // use an imperative instead of a functional style.
3068     immutable n = 1_000_000_000;
3069     immutable delta = 1.0L / n;
3070 
3071     auto sums = taskPool.workerLocalStorage(0.0L);
3072     foreach (i; parallel(iota(n)))
3073     {
3074         immutable x = ( i - 0.5L ) * delta;
3075         immutable toAdd = delta / ( 1.0 + x * x );
3076         sums.get += toAdd;
3077     }
3078 
3079     // Add up the results from each worker thread.
3080     real pi = 0;
3081     foreach (threadResult; sums.toRange)
3082     {
3083         pi += 4.0L * threadResult;
3084     }
3085     ---
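
    Use case 2, recycling a temporary buffer across iterations, might look
    like the following sketch; the buffer size is arbitrary:
    ---
    import std.range : iota;

    // Give each worker thread, plus one thread outside the pool, its
    // own scratch buffer, allocated once rather than per iteration.
    auto scratch = taskPool.workerLocalStorage(new double[4_096]);

    foreach (i; parallel(iota(1_000)))
    {
        auto buf = scratch.get;  // this thread's buffer
        // ... use buf as temporary storage for this iteration ...
    }
    ---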
3086      */
3087     static struct WorkerLocalStorage(T)
3088     {
3089     private:
3090         TaskPool pool;
3091         size_t size;
3092 
3093         size_t elemSize;
3094         bool* stillThreadLocal;
3095 
3096         static size_t roundToLine(size_t num) pure nothrow
3097         {
3098             if (num % cacheLineSize == 0)
3099             {
3100                 return num;
3101             }
3102             else
3103             {
3104                 return ((num / cacheLineSize) + 1) * cacheLineSize;
3105             }
3106         }
3107 
3108         void* data;
3109 
3110         void initialize(TaskPool pool)
3111         {
3112             this.pool = pool;
3113             size = pool.size + 1;
3114             stillThreadLocal = new bool;
3115             *stillThreadLocal = true;
3116 
3117             // Determines whether the GC should scan the array.
3118             auto blkInfo = (typeid(T).flags & 1) ?
3119                            cast(GC.BlkAttr) 0 :
3120                            GC.BlkAttr.NO_SCAN;
3121 
3122             immutable nElem = pool.size + 1;
3123             elemSize = roundToLine(T.sizeof);
3124 
3125             // The + 3 is to pad one full cache line worth of space on either side
3126             // of the data structure to make sure false sharing with completely
3127             // unrelated heap data is prevented, and to provide enough padding to
3128             // make sure that data is cache line-aligned.
3129             data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
3130 
3131             // Cache line align data ptr.
3132             data = cast(void*) roundToLine(cast(size_t) data);
3133 
3134             foreach (i; 0 .. nElem)
3135             {
3136                 this.opIndex(i) = T.init;
3137             }
3138         }
3139 
3140         ref opIndex(this Qualified)(size_t index)
3141         {
3142             import std.conv : text;
3143             assert(index < size, text("Index ", index, " is out of bounds for size ", size));
3144             return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
3145         }
3146 
3147         void opIndexAssign(T val, size_t index)
3148         {
3149             assert(index < size);
3150             *(cast(T*) (data + elemSize * index)) = val;
3151         }
3152 
3153     public:
3154         /**
3155         Get the current thread's instance.  Returns by ref.
3156         Note that calling `get` from any thread
3157         outside the `TaskPool` that created this instance will return the
3158         same reference, so an instance of worker-local storage should only be
3159         accessed from one thread outside the pool that created it.  If this
3160         rule is violated, undefined behavior will result.
3161 
3162         If assertions are enabled and `toRange` has been called, then this
3163         WorkerLocalStorage instance is no longer worker-local and an assertion
3164         failure will result when calling this method.  For performance
3165         reasons, this is not checked when assertions are disabled.
3166          */
3167         ref get(this Qualified)() @property
3168         {
3169             assert(*stillThreadLocal,
3170                 "Cannot call get() on this instance of WorkerLocalStorage " ~
3171                 "because it is no longer worker-local."
3172             );
3173             return opIndex(pool.workerIndex);
3174         }
3175 
3176         /**
3177         Assign a value to the current thread's instance.  This function has
3178         the same caveats as its overload.
3179         */
3180         void get(T val) @property
3181         {
3182             assert(*stillThreadLocal,
3183                 "Cannot call get() on this instance of WorkerLocalStorage " ~
3184                 "because it is no longer worker-local."
3185             );
3186 
3187             opIndexAssign(val, pool.workerIndex);
3188         }
3189 
3190         /**
3191         Returns a range view of the values for all threads, which can be used
3192         to further process the results of each thread after running the parallel
3193         part of your algorithm.  Do not use this method in the parallel portion
3194         of your algorithm.
3195 
3196         Calling this function sets a flag indicating that this struct is no
3197         longer worker-local, and attempting to use the `get` method again
3198         will result in an assertion failure if assertions are enabled.
3199          */
3200         WorkerLocalStorageRange!T toRange() @property
3201         {
3202             if (*stillThreadLocal)
3203             {
3204                 *stillThreadLocal = false;
3205 
3206                 // Make absolutely sure results are visible to all threads.
3207                 // This is probably not necessary since some other
3208                 // synchronization primitive will be used to signal that the
3209                 // parallel part of the algorithm is done, but the
3210                 // performance impact should be negligible, so it's better
3211                 // to be safe.
3212                 ubyte barrierDummy;
3213                 atomicSetUbyte(barrierDummy, 1);
3214             }
3215 
3216             return WorkerLocalStorageRange!T(this);
3217         }
3218     }
3219 
3220     /**
3221     Range primitives for worker-local storage.  The purpose of this is to
3222     access results produced by each worker thread from a single thread once you
3223     are no longer using the worker-local storage from multiple threads.
3224     Do not use this struct in the parallel portion of your algorithm.
3225 
3226     The proper way to instantiate this object is to call
3227     `WorkerLocalStorage.toRange`.  Once instantiated, this object behaves
3228     as a finite random-access range with assignable, lvalue elements and
3229     a length equal to the number of worker threads in the `TaskPool` that
3230     created it plus 1.
3231      */
3232     static struct WorkerLocalStorageRange(T)
3233     {
3234     private:
3235         WorkerLocalStorage!T workerLocalStorage;
3236 
3237         size_t _length;
3238         size_t beginOffset;
3239 
3240         this(WorkerLocalStorage!T wl)
3241         {
3242             this.workerLocalStorage = wl;
3243             _length = wl.size;
3244         }
3245 
3246     public:
3247         ref front(this Qualified)() @property
3248         {
3249             return this[0];
3250         }
3251 
3252         ref back(this Qualified)() @property
3253         {
3254             return this[_length - 1];
3255         }
3256 
3257         void popFront()
3258         {
3259             if (_length > 0)
3260             {
3261                 beginOffset++;
3262                 _length--;
3263             }
3264         }
3265 
3266         void popBack()
3267         {
3268             if (_length > 0)
3269             {
3270                 _length--;
3271             }
3272         }
3273 
3274         typeof(this) save() @property
3275         {
3276             return this;
3277         }
3278 
3279         ref opIndex(this Qualified)(size_t index)
3280         {
3281             assert(index < _length);
3282             return workerLocalStorage[index + beginOffset];
3283         }
3284 
3285         void opIndexAssign(T val, size_t index)
3286         {
3287             assert(index < _length);
3288             workerLocalStorage[index + beginOffset] = val;
3289         }
3290 
3291         typeof(this) opSlice(size_t lower, size_t upper)
3292         {
3293             assert(upper <= _length);
3294             auto newWl = this.workerLocalStorage;
3295         newWl.data += (lower + beginOffset) * newWl.elemSize;
3296             newWl.size = upper - lower;
3297             return typeof(this)(newWl);
3298         }
3299 
3300         bool empty() const @property
3301         {
3302             return length == 0;
3303         }
3304 
3305         size_t length() const @property
3306         {
3307             return _length;
3308         }
3309     }
3310 
3311     /**
3312     Creates an instance of worker-local storage, initialized with a given
3313     value.  The value is `lazy` so that you can, for example, easily
3314     create one instance of a class for each worker.  For usage example,
3315     see the `WorkerLocalStorage` struct.
3316      */
3317     WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
3318     {
3319         WorkerLocalStorage!T ret;
3320         ret.initialize(this);
3321         foreach (i; 0 .. size + 1)
3322         {
3323             ret[i] = initialVal;
3324         }
3325 
3326         // Memory barrier to make absolutely sure that what we wrote is
3327         // visible to worker threads.
3328         ubyte barrierDummy;
3329         atomicSetUbyte(barrierDummy, 0);
3330 
3331         return ret;
3332     }
3333 
3334     /**
3335     Signals to all worker threads to terminate as soon as they are finished
3336     with their current `Task`, or immediately if they are not executing a
3337     `Task`.  `Task`s that were in queue will not be executed unless
3338     a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce`
3339     causes them to be executed.
3340 
3341     Use only if you have waited on every `Task` and therefore know the
3342     queue is empty, or if you speculatively executed some tasks and no longer
3343     need the results.
3344      */
3345     void stop() @trusted
3346     {
3347         queueLock();
3348         scope(exit) queueUnlock();
3349         atomicSetUbyte(status, PoolState.stopNow);
3350         notifyAll();
3351     }
3352 
3353     /**
3354     Signals worker threads to terminate when the queue becomes empty.
3355 
3356     If the `blocking` argument is true, wait for all worker threads to terminate
3357     before returning.  This option might be used in applications where
3358     task results are never consumed, e.g. when `TaskPool` is employed as a
3359     rudimentary scheduler for tasks which communicate by means other than
3360     return values.
3361 
3362     Warning:  Calling this function with $(D blocking = true) from a worker
3363               thread that is a member of the same `TaskPool` that
3364               `finish` is being called on will result in a deadlock.
3365      */
3366     void finish(bool blocking = false) @trusted
3367     {
3368         {
3369             queueLock();
3370             scope(exit) queueUnlock();
3371             atomicCasUbyte(status, PoolState.running, PoolState.finishing);
3372             notifyAll();
3373         }
3374         if (blocking)
3375         {
3376             // Use this thread as a worker until everything is finished.
3377             executeWorkLoop();
3378 
3379             foreach (t; pool)
3380             {
3381                 // Maybe there should be something here to prevent a thread
3382                 // from calling join() on itself if this function is called
3383                 // from a worker thread in the same pool, but:
3384                 //
3385                 // 1.  Using an if statement to skip join() would result in
3386                 //     finish() returning without all tasks being finished.
3387                 //
3388                 // 2.  If an exception were thrown, it would bubble up to the
3389                 //     Task from which finish() was called and likely be
3390                 //     swallowed.
3391                 t.join();
3392             }
3393         }
3394     }
3395 
3396     /// Returns the number of worker threads in the pool.
3397     @property size_t size() @safe const pure nothrow
3398     {
3399         return pool.length;
3400     }
3401 
3402     /**
3403     Put a `Task` object on the back of the task queue.  The `Task`
3404     object may be passed by pointer or reference.
3405 
3406     Example:
3407     ---
3408     import std.file;
3409 
3410     // Create a task.
3411     auto t = task!read("foo.txt");
3412 
3413     // Add it to the queue to be executed.
3414     taskPool.put(t);
3415     ---
3416 
3417     Notes:
3418 
3419     @trusted overloads of this function are called for `Task`s if
3420     $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s
3421     return type or the function the `Task` executes is `pure`.
3422     `Task` objects that meet all other requirements specified in the
3423     `@trusted` overloads of `task` and `scopedTask` may be created
3424     and executed from `@safe` code via `Task.executeInNewThread` but
3425     not via `TaskPool`.
3426 
3427     While this function takes the address of variables that may
3428     be on the stack, some overloads are marked as @trusted.
3429     `Task` includes a destructor that waits for the task to complete
3430     before destroying the stack frame it is allocated on.  Therefore,
3431     it is impossible for the stack frame to be destroyed before the task is
3432     complete and no longer referenced by a `TaskPool`.
3433     */
3434     void put(alias fun, Args...)(ref Task!(fun, Args) task)
3435     if (!isSafeReturn!(typeof(task)))
3436     {
3437         task.pool = this;
3438         abstractPut(task.basePtr);
3439     }
3440 
3441     /// Ditto
3442     void put(alias fun, Args...)(Task!(fun, Args)* task)
3443     if (!isSafeReturn!(typeof(*task)))
3444     {
3445         import std.exception : enforce;
3446         enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
3447         put(*task);
3448     }
3449 
3450     @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
3451     if (isSafeReturn!(typeof(task)))
3452     {
3453         task.pool = this;
3454         abstractPut(task.basePtr);
3455     }
3456 
3457     @trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
3458     if (isSafeReturn!(typeof(*task)))
3459     {
3460         import std.exception : enforce;
3461         enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
3462         put(*task);
3463     }
3464 
3465     /**
3466     These properties control whether the worker threads are daemon threads.
3467     A daemon thread is automatically terminated when all non-daemon threads
3468     have terminated.  A non-daemon thread will prevent a program from
3469     terminating as long as it has not terminated.
3470 
3471     If any `TaskPool` with non-daemon threads is active, either `stop`
3472     or `finish` must be called on it before the program can terminate.
3473 
3474     The worker threads in the `TaskPool` instance returned by the
3475     `taskPool` property are daemon by default.  The worker threads of
3476     manually instantiated task pools are non-daemon by default.
3477 
3478     Note:  For a size zero pool, the getter arbitrarily returns true and the
3479            setter has no effect.
3480     */
3481     bool isDaemon() @property @trusted
3482     {
3483         queueLock();
3484         scope(exit) queueUnlock();
3485         return (size == 0) ? true : pool[0].isDaemon;
3486     }
3487 
3488     /// Ditto
3489     void isDaemon(bool newVal) @property @trusted
3490     {
3491         queueLock();
3492         scope(exit) queueUnlock();
3493         foreach (thread; pool)
3494         {
3495             thread.isDaemon = newVal;
3496         }
3497     }
3498 
3499     /**
3500     These functions allow getting and setting the OS scheduling priority of
3501     the worker threads in this `TaskPool`.  They forward to
3502     `core.thread.Thread.priority`, so a given priority value here means the
3503     same thing as an identical priority value in `core.thread`.
3504 
3505     Note:  For a size zero pool, the getter arbitrarily returns
3506            `core.thread.Thread.PRIORITY_MIN` and the setter has no effect.
3507     */
3508     int priority() @property @trusted
3509     {
3510         return (size == 0) ? core.thread.Thread.PRIORITY_MIN
3511                            : pool[0].priority;
3512     }
3513 
3514     /// Ditto
3515     void priority(int newPriority) @property @trusted
3516     {
3517         if (size > 0)
3518         {
3519             foreach (t; pool)
3520             {
3521                 t.priority = newPriority;
3522             }
3523         }
3524     }
3525 }
3526 
3527 @system unittest
3528 {
3529     import std.algorithm.iteration : sum;
3530     import std.range : iota;
3531     import std.typecons : tuple;
3532 
3533     enum N = 100;
3534     auto r = iota(1, N + 1);
3535     const expected = r.sum();
3536 
3537     // Just the range
3538     assert(taskPool.fold!"a + b"(r) == expected);
3539 
3540     // Range and seeds
3541     assert(taskPool.fold!"a + b"(r, 0) == expected);
3542     assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected));
3543 
3544     // Range, seeds, and work unit size
3545     assert(taskPool.fold!"a + b"(r, 0, 42) == expected);
3546     assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected));
3547 }
3548 
3549 // Issue 16705
3550 @system unittest
3551 {
3552     struct MyIota
3553     {
3554         size_t front;
3555         void popFront()(){front++;}
3556         auto empty(){return front >= 25;}
3557         auto opIndex(size_t i){return front+i;}
3558         auto length(){return 25-front;}
3559     }
3560 
3561     auto mySum = taskPool.reduce!"a + b"(MyIota());
3562 }
3563 
3564 /**
3565 Returns a lazily initialized global instantiation of `TaskPool`.
3566 This function can safely be called concurrently from multiple non-worker
3567 threads.  The worker threads in this pool are daemon threads, meaning that it
3568 is not necessary to call `TaskPool.stop` or `TaskPool.finish` before
3569 terminating the main thread.
3570 */
3571 @property TaskPool taskPool() @trusted
3572 {
3573     import std.concurrency : initOnce;
3574     __gshared TaskPool pool;
3575     return initOnce!pool({
3576         auto p = new TaskPool(defaultPoolThreads);
3577         p.isDaemon = true;
3578         return p;
3579     }());
3580 }
3581 
3582 private shared uint _defaultPoolThreads = uint.max;
3583 
3584 /**
3585 These properties get and set the number of worker threads in the `TaskPool`
3586 instance returned by `taskPool`.  The default value is `totalCPUs` - 1.
3587 Calling the setter after the first call to `taskPool` does not change the
3588 number of worker threads in the instance returned by `taskPool`.
3589 */
3590 @property uint defaultPoolThreads() @trusted
3591 {
3592     const local = atomicLoad(_defaultPoolThreads);
3593     return local < uint.max ? local : totalCPUs - 1;
3594 }
3595 
3596 /// Ditto
3597 @property void defaultPoolThreads(uint newVal) @trusted
3598 {
3599     atomicStore(_defaultPoolThreads, newVal);
3600 }
3601 
3602 /**
3603 Convenience functions that forward to `taskPool.parallel`.  Their
3604 purpose is to make parallel foreach less verbose and more
3605 readable.
3606 
3607 Example:
3608 ---
3609 // Find the logarithm of every number from
3610 // 1 to 1_000_000 in parallel, using the
3611 // default TaskPool instance.
3612 auto logs = new double[1_000_000];
3613 
3614 foreach (i, ref elem; parallel(logs))
3615 {
3616     elem = log(i + 1.0);
3617 }
3618 ---
3619 
3620 */
3621 ParallelForeach!R parallel(R)(R range)
3622 {
3623     return taskPool.parallel(range);
3624 }
3625 
3626 /// Ditto
3627 ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
3628 {
3629     return taskPool.parallel(range, workUnitSize);
3630 }
3631 
3632 //  `each` should be usable with parallel
3633 // https://issues.dlang.org/show_bug.cgi?id=17019
3634 @system unittest
3635 {
3636     import std.algorithm.iteration : each, sum;
3637     import std.range : iota;
3638 
3639     // check behavior with parallel
3640     auto arr = new int[10];
3641     parallel(arr).each!((ref e) => e += 1);
3642     assert(arr.sum == 10);
3643 
3644     auto arrIndex = new int[10];
3645     parallel(arrIndex).each!((i, ref e) => e += i);
3646     assert(arrIndex.sum == 10.iota.sum);
3647 }
3648 
3649 // https://issues.dlang.org/show_bug.cgi?id=22745
3650 @system unittest
3651 {
3652     auto pool = new TaskPool(0);
3653     int[] empty;
3654     foreach (i; pool.parallel(empty)) {}
3655     pool.finish();
3656 }
3657 
3658 // Thrown when a parallel foreach loop is broken from.
3659 class ParallelForeachError : Error
3660 {
3661     this()
3662     {
3663         super("Cannot break from a parallel foreach loop using break, return, "
3664               ~ "labeled break/continue or goto statements.");
3665     }
3666 }
3667 
3668 /*------Structs that implement opApply for parallel foreach.------------------*/
3669 private template randLen(R)
3670 {
3671     enum randLen = isRandomAccessRange!R && hasLength!R;
3672 }
3673 
3674 private void submitAndExecute(
3675     TaskPool pool,
3676     scope void delegate() doIt
3677 )
3678 {
3679     import core.exception : OutOfMemoryError;
3680     immutable nThreads = pool.size + 1;
3681 
3682     alias PTask = typeof(scopedTask(doIt));
3683     import core.stdc.stdlib : malloc, free;
3684     import core.stdc.string : memcpy;
3685 
3686     // The logical thing to do would be to just use alloca() here, but that
3687     // causes problems on Windows for reasons that I don't understand
3688     // (tentatively a compiler bug) and definitely doesn't work on Posix due
3689     // to https://issues.dlang.org/show_bug.cgi?id=3753.
3690     // Therefore, allocate a fixed buffer and fall back to `malloc()` if
3691     // someone's using a ridiculous number of threads.
3692     // Also, a byte array is used instead of a PTask array as the fixed
3693     // buffer to prevent d'tors from being called on uninitialized excess
3694     // PTask instances.
3695     enum nBuf = 64;
3696     byte[nBuf * PTask.sizeof] buf = void;
3697     PTask[] tasks;
3698     if (nThreads <= nBuf)
3699     {
3700         tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
3701     }
3702     else
3703     {
3704         auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
3705         if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
3706         tasks = ptr[0 .. nThreads];
3707     }
3708 
3709     scope(exit)
3710     {
3711         if (nThreads > nBuf)
3712         {
3713             free(tasks.ptr);
3714         }
3715     }
3716 
3717     foreach (ref t; tasks)
3718     {
3721         // This silly looking code is necessary to prevent d'tors from being
3722         // called on uninitialized objects.
3723         auto temp = scopedTask(doIt);
3724         memcpy(&t, &temp, PTask.sizeof);
3725 
3726         // This has to be done to t after copying, not temp before copying.
3727         // Otherwise, temp's destructor will sit here and wait for the
3728         // task to finish.
3729         t.pool = pool;
3730     }
3731 
3732     foreach (i; 1 .. tasks.length - 1)
3733     {
3734         tasks[i].next = tasks[i + 1].basePtr;
3735         tasks[i + 1].prev = tasks[i].basePtr;
3736     }
3737 
3738     if (tasks.length > 1)
3739     {
3740         pool.queueLock();
3741         scope(exit) pool.queueUnlock();
3742 
3743         pool.abstractPutGroupNoSync(
3744             tasks[1].basePtr,
3745             tasks[$ - 1].basePtr
3746         );
3747     }
3748 
3749     if (tasks.length > 0)
3750     {
3751         try
3752         {
3753             tasks[0].job();
3754         }
3755         catch (Throwable e)
3756         {
3757             tasks[0].exception = e; // nocoverage
3758         }
3759         tasks[0].taskStatus = TaskStatus.done;
3760 
3761         // Try to execute each of these in the current thread
3762         foreach (ref task; tasks[1..$])
3763         {
3764             pool.tryDeleteExecute(task.basePtr);
3765         }
3766     }
3767 
3768     Throwable firstException;
3769 
3770     foreach (i, ref task; tasks)
3771     {
3772         try
3773         {
3774             task.yieldForce;
3775         }
3776         catch (Throwable e)
3777         {
3778             /* Chain e to front because order doesn't matter and because
3779              * e is not likely to be a chain itself (so fewer traversals)
3780              */
3781             firstException = Throwable.chainTogether(e, firstException);
3782             continue;
3783         }
3784     }
3785 
3786     if (firstException) throw firstException;
3787 }
3788 
3789 void foreachErr()
3790 {
3791     throw new ParallelForeachError();
3792 }
3793 
3794 int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
3795 {
3796     with(p)
3797     {
3798         int res = 0;
3799         size_t index = 0;
3800 
3801         // The explicit ElementType!R in the foreach loops is necessary for
3802         // correct behavior when iterating over strings.
3803         static if (hasLvalueElements!R)
3804         {
3805             foreach (ref ElementType!R elem; range)
3806             {
3807                 static if (Parameters!dg.length == 2)
3808                 {
3809                     res = dg(index, elem);
3810                 }
3811                 else
3812                 {
3813                     res = dg(elem);
3814                 }
3815                 if (res) break;
3816                 index++;
3817             }
3818         }
3819         else
3820         {
3821             foreach (ElementType!R elem; range)
3822             {
3823                 static if (Parameters!dg.length == 2)
3824                 {
3825                     res = dg(index, elem);
3826                 }
3827                 else
3828                 {
3829                     res = dg(elem);
3830                 }
3831                 if (res) break;
3832                 index++;
3833             }
3834         }
3835         if (res) foreachErr();
3836         return res;
3837     }
3838 }
3839 
3840 private enum string parallelApplyMixinRandomAccess = q{
3841     // Handle an empty thread pool as a special case.
3842     if (pool.size == 0)
3843     {
3844         return doSizeZeroCase(this, dg);
3845     }
3846 
3847     // Whether iteration is with or without an index variable.
3848     enum withIndex = Parameters!(typeof(dg)).length == 2;
3849 
3850     shared size_t workUnitIndex = size_t.max;  // Effectively -1:  workUnitIndex + 1 == 0
3851     immutable len = range.length;
3852     if (!len) return 0;
3853 
3854     shared bool shouldContinue = true;
3855 
3856     void doIt()
3857     {
3858         import std.algorithm.comparison : min;
3859 
3860         scope(failure)
3861         {
3862             // If an exception is thrown, all threads should bail.
3863             atomicStore(shouldContinue, false);
3864         }
3865 
3866         while (atomicLoad(shouldContinue))
3867         {
3868             immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
3869             immutable start = workUnitSize * myUnitIndex;
3870             if (start >= len)
3871             {
3872                 atomicStore(shouldContinue, false);
3873                 break;
3874             }
3875 
3876             immutable end = min(len, start + workUnitSize);
3877 
3878             foreach (i; start .. end)
3879             {
3880                 static if (withIndex)
3881                 {
3882                     if (dg(i, range[i])) foreachErr();
3883                 }
3884                 else
3885                 {
3886                     if (dg(range[i])) foreachErr();
3887                 }
3888             }
3889         }
3890     }
3891 
3892     submitAndExecute(pool, &doIt);
3893 
3894     return 0;
3895 };
3896 
3897 private enum string parallelApplyMixinInputRange = q{
3898     // Handle an empty thread pool as a special case.
3899     if (pool.size == 0)
3900     {
3901         return doSizeZeroCase(this, dg);
3902     }
3903 
3904     // Whether iteration is with or without an index variable.
3905     enum withIndex = Parameters!(typeof(dg)).length == 2;
3906 
3907     // This protects the range while copying it.
3908     auto rangeMutex = new Mutex();
3909 
3910     shared bool shouldContinue = true;
3911 
3912     // The total number of elements that have been popped off range.
3913     // This is updated only while protected by rangeMutex.
3914     size_t nPopped = 0;
3915 
3916     static if (
3917         is(typeof(range.buf1)) &&
3918         is(typeof(range.bufPos)) &&
3919         is(typeof(range.doBufSwap()))
3920     )
3921     {
3922         // Make sure we don't have the buffer recycling overload of
3923         // asyncBuf.
3924         static if (
3925             is(typeof(range.source)) &&
3926             isRoundRobin!(typeof(range.source))
3927         )
3928         {
3929             static assert(0, "Cannot execute a parallel foreach loop on " ~
3930             "the buffer recycling overload of asyncBuf.");
3931         }
3932 
3933         enum bool bufferTrick = true;
3934     }
3935     else
3936     {
3937         enum bool bufferTrick = false;
3938     }
3939 
3940     void doIt()
3941     {
3942         scope(failure)
3943         {
3944             // If an exception is thrown, all threads should bail.
3945             atomicStore(shouldContinue, false);
3946         }
3947 
3948         static if (hasLvalueElements!R)
3949         {
3950             alias Temp = ElementType!R*[];
3951             Temp temp;
3952 
3953             // Returns:  The previous value of nPopped.
3954             size_t makeTemp()
3955             {
3956                 import std.algorithm.internal : addressOf;
3957                 import std.array : uninitializedArray;
3958 
3959                 if (temp is null)
3960                 {
3961                     temp = uninitializedArray!Temp(workUnitSize);
3962                 }
3963 
3964                 rangeMutex.lock();
3965                 scope(exit) rangeMutex.unlock();
3966 
3967                 size_t i = 0;
3968                 for (; i < workUnitSize && !range.empty; range.popFront(), i++)
3969                 {
3970                     temp[i] = addressOf(range.front);
3971                 }
3972 
3973                 temp = temp[0 .. i];
3974                 auto ret = nPopped;
3975                 nPopped += temp.length;
3976                 return ret;
3977             }
3978 
3979         }
3980         else
3981         {
3982 
3983             alias Temp = ElementType!R[];
3984             Temp temp;
3985 
3986             // Returns:  The previous value of nPopped.
3987             static if (!bufferTrick) size_t makeTemp()
3988             {
3989                 import std.array : uninitializedArray;
3990 
3991                 if (temp is null)
3992                 {
3993                     temp = uninitializedArray!Temp(workUnitSize);
3994                 }
3995 
3996                 rangeMutex.lock();
3997                 scope(exit) rangeMutex.unlock();
3998 
3999                 size_t i = 0;
4000                 for (; i < workUnitSize && !range.empty; range.popFront(), i++)
4001                 {
4002                     temp[i] = range.front;
4003                 }
4004 
4005                 temp = temp[0 .. i];
4006                 auto ret = nPopped;
4007                 nPopped += temp.length;
4008                 return ret;
4009             }
4010 
4011             static if (bufferTrick) size_t makeTemp()
4012             {
4013                 import std.algorithm.mutation : swap;
4014                 rangeMutex.lock();
4015                 scope(exit) rangeMutex.unlock();
4016 
4017                 // Elide copying by just swapping buffers.
4018                 temp.length = range.buf1.length;
4019                 swap(range.buf1, temp);
4020 
4021                 // This is necessary in case popFront() has been called on
4022                 // range before entering the parallel foreach loop.
4023                 temp = temp[range.bufPos..$];
4024 
4025                 static if (is(typeof(range._length)))
4026                 {
4027                     range._length -= (temp.length - range.bufPos);
4028                 }
4029 
4030                 range.doBufSwap();
4031                 auto ret = nPopped;
4032                 nPopped += temp.length;
4033                 return ret;
4034             }
4035         }
4036 
4037         while (atomicLoad(shouldContinue))
4038         {
4039             auto overallIndex = makeTemp();
4040             if (temp.empty)
4041             {
4042                 atomicStore(shouldContinue, false);
4043                 break;
4044             }
4045 
4046             foreach (i; 0 .. temp.length)
4047             {
4048                 scope(success) overallIndex++;
4049 
4050                 static if (hasLvalueElements!R)
4051                 {
4052                     static if (withIndex)
4053                     {
4054                         if (dg(overallIndex, *temp[i])) foreachErr();
4055                     }
4056                     else
4057                     {
4058                         if (dg(*temp[i])) foreachErr();
4059                     }
4060                 }
4061                 else
4062                 {
4063                     static if (withIndex)
4064                     {
4065                         if (dg(overallIndex, temp[i])) foreachErr();
4066                     }
4067                     else
4068                     {
4069                         if (dg(temp[i])) foreachErr();
4070                     }
4071                 }
4072             }
4073         }
4074     }
4075 
4076     submitAndExecute(pool, &doIt);
4077 
4078     return 0;
4079 };
4080 
4081 
4082 private struct ParallelForeach(R)
4083 {
4084     TaskPool pool;
4085     R range;
4086     size_t workUnitSize;
4087     alias E = ElementType!R;
4088 
4089     static if (hasLvalueElements!R)
4090     {
4091         alias NoIndexDg = int delegate(ref E);
4092         alias IndexDg = int delegate(size_t, ref E);
4093     }
4094     else
4095     {
4096         alias NoIndexDg = int delegate(E);
4097         alias IndexDg = int delegate(size_t, E);
4098     }
4099 
4100     int opApply(scope NoIndexDg dg)
4101     {
4102         static if (randLen!R)
4103         {
4104             mixin(parallelApplyMixinRandomAccess);
4105         }
4106         else
4107         {
4108             mixin(parallelApplyMixinInputRange);
4109         }
4110     }
4111 
4112     int opApply(scope IndexDg dg)
4113     {
4114         static if (randLen!R)
4115         {
4116             mixin(parallelApplyMixinRandomAccess);
4117         }
4118         else
4119         {
4120             mixin(parallelApplyMixinInputRange);
4121         }
4122     }
4123 }
4124 
4125 /*
4126 This struct buffers the output of a callable that writes data into a
4127 user-supplied buffer, using a set of buffers of some fixed size.  It allows
4128 these buffers to be accessed with an input range interface.  This is used internally
4129 in the buffer-recycling overload of TaskPool.asyncBuf, which creates an
4130 instance and forwards it to the input range overload of asyncBuf.
4131 */
4132 private struct RoundRobinBuffer(C1, C2)
4133 {
4134     // No need for constraints because they're already checked for in asyncBuf.
4135 
4136     alias Array = Parameters!(C1.init)[0];
4137     alias T = typeof(Array.init[0]);
4138 
4139     T[][] bufs;
4140     size_t index;
4141     C1 nextDel;
4142     C2 emptyDel;
4143     bool _empty;
4144     bool primed;
4145 
4146     this(
4147         C1 nextDel,
4148         C2 emptyDel,
4149         size_t initialBufSize,
4150         size_t nBuffers
4151     ) {
4152         this.nextDel = nextDel;
4153         this.emptyDel = emptyDel;
4154         bufs.length = nBuffers;
4155 
4156         foreach (ref buf; bufs)
4157         {
4158             buf.length = initialBufSize;
4159         }
4160     }
4161 
4162     void prime()
4163     in
4164     {
4165         assert(!empty);
4166     }
4167     do
4168     {
4169         scope(success) primed = true;
4170         nextDel(bufs[index]);
4171     }
4172 
4173 
4174     T[] front() @property
4175     in
4176     {
4177         assert(!empty);
4178     }
4179     do
4180     {
4181         if (!primed) prime();
4182         return bufs[index];
4183     }
4184 
4185     void popFront()
4186     {
4187         if (empty || emptyDel())
4188         {
4189             _empty = true;
4190             return;
4191         }
4192 
4193         index = (index + 1) % bufs.length;
4194         primed = false;
4195     }
4196 
4197     bool empty() @property const @safe pure nothrow
4198     {
4199         return _empty;
4200     }
4201 }
4202 
4203 version (StdUnittest)
4204 {
4205     // This was the only way I could get nested maps to work.
4206     private __gshared TaskPool poolInstance;
4207 }
4208 
4209 // These test basic functionality but don't stress test for threading bugs.
4210 // These are the tests that should be run every time Phobos is compiled.
4211 @system unittest
4212 {
4213     import std.algorithm.comparison : equal, min, max;
4214     import std.algorithm.iteration : filter, map, reduce;
4215     import std.array : split;
4216     import std.conv : text;
4217     import std.exception : assertThrown;
4218     import std.math.operations : isClose;
4219     import std.math.algebraic : sqrt, abs;
4220     import std.math.exponential : log;
4221     import std.range : indexed, iota, join;
4222     import std.typecons : Tuple, tuple;
4223     import std.stdio;
4224 
4225     poolInstance = new TaskPool(2);
4226     scope(exit) poolInstance.stop();
4227 
4228     // The only way this can be verified is manually.
4229     debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);
4230 
4231     auto oldPriority = poolInstance.priority;
4232     poolInstance.priority = Thread.PRIORITY_MAX;
4233     assert(poolInstance.priority == Thread.PRIORITY_MAX);
4234 
4235     poolInstance.priority = Thread.PRIORITY_MIN;
4236     assert(poolInstance.priority == Thread.PRIORITY_MIN);
4237 
4238     poolInstance.priority = oldPriority;
4239     assert(poolInstance.priority == oldPriority);
4240 
4241     static void refFun(ref uint num)
4242     {
4243         num++;
4244     }
4245 
4246     uint x;
4247 
4248     // Test task().
4249     auto t = task!refFun(x);
4250     poolInstance.put(t);
4251     t.yieldForce;
4252     assert(t.args[0] == 1);
4253 
4254     auto t2 = task(&refFun, x);
4255     poolInstance.put(t2);
4256     t2.yieldForce;
4257     assert(t2.args[0] == 1);
4258 
4259     // Test scopedTask().
4260     auto st = scopedTask!refFun(x);
4261     poolInstance.put(st);
4262     st.yieldForce;
4263     assert(st.args[0] == 1);
4264 
4265     auto st2 = scopedTask(&refFun, x);
4266     poolInstance.put(st2);
4267     st2.yieldForce;
4268     assert(st2.args[0] == 1);
4269 
4270     // Test executeInNewThread().
4271     auto ct = scopedTask!refFun(x);
4272     ct.executeInNewThread(Thread.PRIORITY_MAX);
4273     ct.yieldForce;
4274     assert(ct.args[0] == 1);
4275 
4276     // Test ref return.
4277     uint toInc = 0;
4278     static ref T makeRef(T)(ref T num)
4279     {
4280         return num;
4281     }
4282 
4283     auto t3 = task!makeRef(toInc);
4284     taskPool.put(t3);
4285     assert(t3.args[0] == 0);
4286     t3.spinForce++;
4287     assert(t3.args[0] == 1);
4288 
4289     static void testSafe() @safe {
4290         static int bump(int num)
4291         {
4292             return num + 1;
4293         }
4294 
4295         auto safePool = new TaskPool(0);
4296         auto t = task(&bump, 1);
4297         taskPool.put(t);
4298         assert(t.yieldForce == 2);
4299 
4300         auto st = scopedTask(&bump, 1);
4301         taskPool.put(st);
4302         assert(st.yieldForce == 2);
4303         safePool.stop();
4304     }
4305 
4306     auto arr = [1,2,3,4,5];
4307     auto nums = new uint[5];
4308     auto nums2 = new uint[5];
4309 
4310     foreach (i, ref elem; poolInstance.parallel(arr))
4311     {
4312         elem++;
4313         nums[i] = cast(uint) i + 2;
4314         nums2[i] = elem;
4315     }
4316 
4317     assert(nums == [2,3,4,5,6], text(nums));
4318     assert(nums2 == nums, text(nums2));
4319     assert(arr == nums, text(arr));
4320 
4321     // Test const/immutable arguments.
4322     static int add(int lhs, int rhs)
4323     {
4324         return lhs + rhs;
4325     }
4326     immutable addLhs = 1;
4327     immutable addRhs = 2;
4328     auto addTask = task(&add, addLhs, addRhs);
4329     auto addScopedTask = scopedTask(&add, addLhs, addRhs);
4330     poolInstance.put(addTask);
4331     poolInstance.put(addScopedTask);
4332     assert(addTask.yieldForce == 3);
4333     assert(addScopedTask.yieldForce == 3);
4334 
4335     // Test parallel foreach with non-random access range.
4336     auto range = filter!"a != 666"([0, 1, 2, 3, 4]);
4337 
4338     foreach (i, elem; poolInstance.parallel(range))
4339     {
4340         nums[i] = cast(uint) i;
4341     }
4342 
4343     assert(nums == [0,1,2,3,4]);
4344 
4345     auto logs = new double[1_000_000];
4346     foreach (i, ref elem; poolInstance.parallel(logs))
4347     {
4348         elem = log(i + 1.0);
4349     }
4350 
4351     foreach (i, elem; logs)
4352     {
4353         assert(isClose(elem, log(double(i + 1))));
4354     }
4355 
4356     assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
4357     assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
4358     assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
4359            [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4360 
4361     auto tupleBuf = new Tuple!(int, int)[3];
4362     poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
4363     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4364     poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
4365     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4366 
4367     // Test amap with a non-array buffer.
4368     auto toIndex = new int[5];
4369     auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
4370     poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
4371     assert(equal(ind, [2, 4, 6, 8, 10]));
4372     assert(equal(toIndex, [8, 4, 10, 2, 6]));
4373     poolInstance.amap!"a / 2"(ind, ind);
4374     assert(equal(ind, [1, 2, 3, 4, 5]));
4375     assert(equal(toIndex, [4, 2, 5, 1, 3]));
4376 
4377     auto buf = new int[5];
4378     poolInstance.amap!"a * a"([1,2,3,4,5], buf);
4379     assert(buf == [1,4,9,16,25]);
4380     poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
4381     assert(buf == [1,4,9,16,25]);
4382 
4383     assert(poolInstance.reduce!"a + b"([1]) == 1);
4384     assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
4385     assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
4386     assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
4387     assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
4388     assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
4389            tuple(10, 24));
4390 
4391     immutable serialAns = reduce!"a + b"(iota(1000));
4392     assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
4393     assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);
4394 
4395     // Test worker-local storage.
4396     auto wl = poolInstance.workerLocalStorage(0);
4397     foreach (i; poolInstance.parallel(iota(1000), 1))
4398     {
4399         wl.get = wl.get + i;
4400     }
4401 
4402     auto wlRange = wl.toRange;
4403     auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
4404     assert(parallelSum == 499500);
4405     assert(wlRange[0 .. 1][0] == wlRange[0]);
4406     assert(wlRange[1 .. 2][0] == wlRange[1]);
4407 
4408     // Test finish()
4409     {
4410         static void slowFun() { Thread.sleep(dur!"msecs"(1)); }
4411 
4412         auto pool1 = new TaskPool();
4413         auto tSlow = task!slowFun();
4414         pool1.put(tSlow);
4415         pool1.finish();
4416         tSlow.yieldForce;
4417         // Can't assert that pool1.status == PoolState.stopNow because status
4418         // doesn't change until after the "done" flag is set and the waiting
4419         // thread is woken up.
4420 
4421         auto pool2 = new TaskPool();
4422         auto tSlow2 = task!slowFun();
4423         pool2.put(tSlow2);
4424         pool2.finish(true); // blocking
4425         assert(tSlow2.done);
4426 
4427         // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero.
4428         auto pool3 = new TaskPool(0);
4429         auto tSlow3 = task!slowFun();
4430         pool3.put(tSlow3);
4431         pool3.finish(true); // blocking
4432         assert(tSlow3.done);
4433 
4434         // This is correct because no thread will terminate unless pool2.status
4435         // and pool3.status have already been set to stopNow.
4436         assert(pool2.status == TaskPool.PoolState.stopNow);
4437         assert(pool3.status == TaskPool.PoolState.stopNow);
4438     }
4439 
4440     // Test default pool stuff.
4441     assert(taskPool.size == totalCPUs - 1);
4442 
4443     nums = new uint[1000];
4444     foreach (i; parallel(iota(1000)))
4445     {
4446         nums[i] = cast(uint) i;
4447     }
4448     assert(equal(nums, iota(1000)));
4449 
4450     assert(equal(
4451                poolInstance.map!"a * a"(iota(3_000_001), 10_000),
4452                map!"a * a"(iota(3_000_001))
4453            ));
4454 
4455     // The filter is to kill random access and test the non-random access
4456     // branch.
4457     assert(equal(
4458                poolInstance.map!"a * a"(
4459                    filter!"a == a"(iota(3_000_001)
4460                                   ), 10_000, 1000),
4461                map!"a * a"(iota(3_000_001))
4462            ));
4463 
4464     assert(
4465         reduce!"a + b"(0UL,
4466                        poolInstance.map!"a * a"(iota(300_001), 10_000)
4467                       ) ==
4468         reduce!"a + b"(0UL,
4469                        map!"a * a"(iota(300_001))
4470                       )
4471     );
4472 
4473     assert(equal(
4474                iota(1_000_002),
4475                poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
4476            ));
4477 
4478     {
4479         import std.conv : to;
4480         import std.file : deleteme;
4481 
4482         string temp_file = deleteme ~ "-tempDelMe.txt";
4483         auto file = File(temp_file, "wb");
4484         scope(exit)
4485         {
4486             file.close();
4487             import std.file;
4488             remove(temp_file);
4489         }
4490 
4491         auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
4492         foreach (row; written)
4493         {
4494             file.writeln(join(to!(string[])(row), "\t"));
4495         }
4496 
4497         file = File(temp_file);
4498 
4499         void next(ref char[] buf)
4500         {
4501             file.readln(buf);
4502             import std.string : chomp;
4503             buf = chomp(buf);
4504         }
4505 
4506         double[][] read;
4507         auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
4508 
4509         foreach (line; asyncReader)
4510         {
4511             if (line.length == 0) continue;
4512             auto ls = line.split("\t");
4513             read ~= to!(double[])(ls);
4514         }
4515 
4516         assert(read == written);
4517         file.close();
4518     }
4519 
4520     // Test Map/AsyncBuf chaining.
4521 
4522     auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
4523     auto temp = poolInstance.map!sqrt(
4524                     abuf, 100, 5
4525                 );
4526     auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
4527     lmchain.popFront();
4528 
4529     int ii;
4530     foreach (elem; lmchain)
4531     {
4532         if (!isClose(elem, ii))
4533         {
4534             stderr.writeln(ii, '\t', elem);
4535         }
4536         ii++;
4537     }
4538 
4539     // Test buffer trick in parallel foreach.
4540     abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
4541     abuf.popFront();
4542     auto bufTrickTest = new size_t[abuf.length];
4543     foreach (i, elem; parallel(abuf))
4544     {
4545         bufTrickTest[i] = i;
4546     }
4547 
4548     assert(equal(iota(1_000_000), bufTrickTest));
4549 
4550     auto myTask = task!(abs)(-1);
4551     taskPool.put(myTask);
4552     assert(myTask.spinForce == 1);
4553 
4554     // Test that worker local storage from one pool receives an index of 0
4555     // when the index is queried w.r.t. another pool.  The only way to do this
4556     // is non-deterministically.
4557     foreach (i; parallel(iota(1000), 1))
4558     {
4559         assert(poolInstance.workerIndex == 0);
4560     }
4561 
4562     foreach (i; poolInstance.parallel(iota(1000), 1))
4563     {
4564         assert(taskPool.workerIndex == 0);
4565     }
4566 
4567     // Test exception handling.
4568     static void parallelForeachThrow()
4569     {
4570         foreach (elem; parallel(iota(10)))
4571         {
4572             throw new Exception("");
4573         }
4574     }
4575 
4576     assertThrown!Exception(parallelForeachThrow());
4577 
4578     static int reduceException(int a, int b)
4579     {
4580         throw new Exception("");
4581     }
4582 
4583     assertThrown!Exception(
4584         poolInstance.reduce!reduceException(iota(3))
4585     );
4586 
4587     static int mapException(int a)
4588     {
4589         throw new Exception("");
4590     }
4591 
4592     assertThrown!Exception(
4593         poolInstance.amap!mapException(iota(3))
4594     );
4595 
4596     static void mapThrow()
4597     {
4598         auto m = poolInstance.map!mapException(iota(3));
4599         m.popFront();
4600     }
4601 
4602     assertThrown!Exception(mapThrow());
4603 
4604     struct ThrowingRange
4605     {
4606         @property int front()
4607         {
4608             return 1;
4609         }
4610         void popFront()
4611         {
4612             throw new Exception("");
4613         }
4614         enum bool empty = false;
4615     }
4616 
4617     assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
4618 }
4619 
4620 //version = parallelismStressTest;
4621 
4622 // These are more like stress tests than real unit tests.  They print out
4623 // tons of stuff and should not be run every time make unittest is run.
4624 version (parallelismStressTest)
4625 {
4626     @system unittest
4627     {
4628         import std.stdio : stderr, writeln, readln;
4629         import std.range : iota;
4630         import std.algorithm.iteration : filter, reduce;
4631 
4632         size_t attempt;
4633         for (; attempt < 10; attempt++)
4634             foreach (poolSize; [0, 4])
4635         {
4636 
4637             poolInstance = new TaskPool(poolSize);
4638 
4639             uint[] numbers = new uint[1_000];
4640 
4641             foreach (i; poolInstance.parallel( iota(0, numbers.length)) )
4642             {
4643                 numbers[i] = cast(uint) i;
4644             }
4645 
4646             // Make sure it works.
4647             foreach (i; 0 .. numbers.length)
4648             {
4649                 assert(numbers[i] == i);
4650             }
4651 
4652             stderr.writeln("Done creating nums.");
4653 
4654 
4655             auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
4656             foreach (num; poolInstance.parallel(myNumbers))
4657             {
4658                 assert(num % 7 > 0 && num < 1000);
4659             }
4660             stderr.writeln("Done modulus test.");
4661 
4662             uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
4663             assert(squares.length == numbers.length);
4664             foreach (i, number; numbers)
4665             {
4666                 assert(squares[i] == number * number);
4667             }
4668             stderr.writeln("Done squares.");
4669 
4670             auto sumFuture = task!( reduce!"a + b" )(numbers);
4671             poolInstance.put(sumFuture);
4672 
4673             ulong sumSquares = 0;
4674             foreach (elem; numbers)
4675             {
4676                 sumSquares += elem * elem;
4677             }
4678 
4679             uint mySum = sumFuture.spinForce();
4680             assert(mySum == 999 * 1000 / 2);
4681 
4682             auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
4683             assert(mySum == mySumParallel);
4684             stderr.writeln("Done sums.");
4685 
4686             auto myTask = task(
4687             {
4688                 synchronized writeln("Our lives are parallel...Our lives are parallel.");
4689             });
4690             poolInstance.put(myTask);
4691 
4692             auto nestedOuter = "abcd";
4693             auto nestedInner =  iota(0, 10, 2);
4694 
4695             foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
4696             {
4697                 foreach (j, number; poolInstance.parallel(nestedInner, 1))
4698                 {
4699                     synchronized writeln(i, ": ", letter, "  ", j, ": ", number);
4700                 }
4701             }
4702 
4703             poolInstance.stop();
4704         }
4705 
4706         assert(attempt == 10);
4707         writeln("Press enter to go to next round of unittests.");
4708         readln();
4709     }
4710 
4711     // These unittests are intended more for actual testing and not so much
4712     // as examples.
4713     @system unittest
4714     {
4715         import std.stdio : stderr;
4716         import std.range : iota;
4717         import std.algorithm.iteration : filter, reduce;
4718         import std.math.algebraic : sqrt;
4719         import std.math.operations : isClose;
4720         import std.math.traits : isNaN;
4721         import std.conv : text;
4722 
4723         foreach (attempt; 0 .. 10)
4724         foreach (poolSize; [0, 4])
4725         {
4726             poolInstance = new TaskPool(poolSize);
4727 
4728             // Test indexing.
4729             stderr.writeln("Creator Raw Index:  ", poolInstance.threadIndex);
4730             assert(poolInstance.workerIndex() == 0);
4731 
4732             // Test worker-local storage.
4733             auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
4734             foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
4735             {
4736                 workerLocalStorage.get++;
4737             }
4738             assert(reduce!"a + b"(workerLocalStorage.toRange) ==
4739             1_000_000 + poolInstance.size + 1);
4740 
4741             // Make sure work is reasonably balanced among threads.  This test is
4742             // non-deterministic and is more of a sanity check than something that
4743             // has an absolute pass/fail.
4744             shared(uint)[void*] nJobsByThread;
4745             foreach (thread; poolInstance.pool)
4746             {
4747                 nJobsByThread[cast(void*) thread] = 0;
4748             }
4749             nJobsByThread[ cast(void*) Thread.getThis()] = 0;
4750 
4751             foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 ))
4752             {
4753                 atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1);
4754             }
4755 
4756             stderr.writeln("\nCurrent thread is:  ",
4757             cast(void*) Thread.getThis());
4758             stderr.writeln("Workload distribution:  ");
4759             foreach (k, v; nJobsByThread)
4760             {
4761                 stderr.writeln(k, '\t', v);
4762             }
4763 
4764             // Test whether amap can be nested.
4765             real[][] matrix = new real[][](1000, 1000);
4766             foreach (i; poolInstance.parallel( iota(0, matrix.length) ))
4767             {
4768                 foreach (j; poolInstance.parallel( iota(0, matrix[0].length) ))
4769                 {
4770                     matrix[i][j] = i * j;
4771                 }
4772             }
4773 
4774             // Get around weird bugs having to do w/ sqrt being an intrinsic:
4775             static real mySqrt(real num)
4776             {
4777                 return sqrt(num);
4778             }
4779 
4780             static real[] parallelSqrt(real[] nums)
4781             {
4782                 return poolInstance.amap!mySqrt(nums);
4783             }
4784 
4785             real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);
4786 
4787             foreach (i, row; sqrtMatrix)
4788             {
4789                 foreach (j, elem; row)
4790                 {
4791                     real shouldBe = sqrt( cast(real) i * j);
4792                     assert(isClose(shouldBe, elem));
4793                     sqrtMatrix[i][j] = shouldBe;
4794                 }
4795             }
4796 
4797             auto saySuccess = task(
4798             {
4799                 stderr.writeln(
4800                     "Success doing matrix stuff that involves nested pool use.");
4801             });
4802             poolInstance.put(saySuccess);
4803             saySuccess.workForce();
4804 
4805             // A more thorough test of amap, reduce:  Find the sum of the square roots of
4806             // matrix.
4807 
4808             static real parallelSum(real[] input)
4809             {
4810                 return poolInstance.reduce!"a + b"(input);
4811             }
4812 
4813             auto sumSqrt = poolInstance.reduce!"a + b"(
4814                                poolInstance.amap!parallelSum(
4815                                    sqrtMatrix
4816                                )
4817                            );
4818 
4819             assert(isClose(sumSqrt, 4.437e8, 1e-2));
4820             stderr.writeln("Done sum of square roots.");
4821 
4822             // Test whether tasks work with function pointers.
4823             /+ // This part is buggy and needs to be fixed...
4824             auto nanTask = task(&isNaN, 1.0L);
4825             poolInstance.put(nanTask);
4826             assert(nanTask.spinForce == false);
4827 
4828             if (poolInstance.size > 0)
4829             {
4830                 // Test work waiting.
4831                 static void uselessFun()
4832                 {
4833                     foreach (i; 0 .. 1_000_000) {}
4834                 }
4835 
4836                 auto uselessTasks = new typeof(task(&uselessFun))[1000];
4837                 foreach (ref uselessTask; uselessTasks)
4838                 {
4839                     uselessTask = task(&uselessFun);
4840                 }
4841                 foreach (ref uselessTask; uselessTasks)
4842                 {
4843                     poolInstance.put(uselessTask);
4844                 }
4845                 foreach (ref uselessTask; uselessTasks)
4846                 {
4847                     uselessTask.workForce();
4848                 }
4849             }
4850              +/
4851 
4852             // Test the case of non-random access + ref returns.
4853             int[] nums = [1,2,3,4,5];
4854             static struct RemoveRandom
4855             {
4856                 int[] arr;
4857 
4858                 ref int front()
4859                 {
4860                     return arr.front;
4861                 }
4862                 void popFront()
4863                 {
4864                     arr.popFront();
4865                 }
4866                 bool empty()
4867                 {
4868                     return arr.empty;
4869                 }
4870             }
4871 
4872             auto refRange = RemoveRandom(nums);
4873             foreach (ref elem; poolInstance.parallel(refRange))
4874             {
4875                 elem++;
4876             }
4877             assert(nums == [2,3,4,5,6], text(nums));
4878             stderr.writeln("Nums:  ", nums);
4879 
4880             poolInstance.stop();
4881         }
4882     }
4883 }
4884 
4885 @system unittest
4886 {
4887     static struct __S_12733
4888     {
4889         invariant() { assert(checksum == 1_234_567_890); }
4890         this(ulong u){n = u;}
4891         void opAssign(__S_12733 s){this.n = s.n;}
4892         ulong n;
4893         ulong checksum = 1_234_567_890;
4894     }
4895 
4896     static auto __genPair_12733(ulong n) { return __S_12733(n); }
4897     immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ];
4898 
4899     auto result = taskPool.amap!__genPair_12733(data);
4900 }
4901 
4902 @safe unittest
4903 {
4904     import std.range : iota;
4905 
4906     // this test was in std.range, but caused cycles.
4907     assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
4908 }
4909 
4910 @safe unittest
4911 {
4912     import std.algorithm.iteration : each;
4913 
4914     long[] arr;
4915     static assert(is(typeof({
4916         arr.parallel.each!"a++";
4917     })));
4918 }
4919 
4920 // https://issues.dlang.org/show_bug.cgi?id=17539
4921 @system unittest
4922 {
4923     import std.random : rndGen;
4924     // ensure compilation
4925     try foreach (rnd; rndGen.parallel) break;
4926     catch (ParallelForeachError e) {}
4927 }