/**
`std.parallelism` implements high-level primitives for SMP parallelism.
These include parallel foreach, parallel reduce, parallel eager map, pipelining
and future/promise parallelism.  `std.parallelism` is recommended when the
same operation is to be executed in parallel on different data, or when a
function is to be executed in a background thread and its result returned to a
well-defined main thread.  For communication between arbitrary threads, see
`std.concurrency`.

`std.parallelism` is based on the concept of a `Task`.  A `Task` is an
object that represents the fundamental unit of work in this library and may be
executed in parallel with any other `Task`.  Using `Task`
directly allows programming with a future/promise paradigm.  All other
supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
represent an additional level of abstraction over `Task`.  They
automatically create one or more `Task` objects, or closely related types
that are conceptually identical but not part of the public API.

After creation, a `Task` may be executed in a new thread, or submitted
to a `TaskPool` for execution.  A `TaskPool` encapsulates a task queue
and its worker threads.  Its purpose is to efficiently map a large
number of `Task`s onto a smaller number of threads.  A task queue is a
FIFO queue of `Task` objects that have been submitted to the
`TaskPool` and are awaiting execution.  A worker thread is a thread that
is associated with exactly one task queue.  It executes the `Task` at the
front of its queue when the queue has work available, or sleeps when
no work is available.  Each task queue is associated with zero or
more worker threads.  If the result of a `Task` is needed before execution
by a worker thread has begun, the `Task` can be removed from the task queue
and executed immediately in the thread where the result is needed.

Warning:  Unless marked as `@trusted` or `@safe`, artifacts in
          this module allow implicit data sharing between threads and cannot
          guarantee that client code is free from low level data races.

Source:    $(PHOBOSSRC std/parallelism.d)
Author:  David Simcha
Copyright:  Copyright (c) 2009-2011, David Simcha.
License:    $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
*/
module std.parallelism;

version (WebAssembly) {} else:

version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;

///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math.operations : isClose;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    alias getTerm = (int i)
    {
        immutable x = (i - 0.5) * delta;
        return delta / (1.0 + x * x);
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.isClose(3.14159, 1e-5));
}

import core.atomic;
import core.memory;
import core.sync.condition;
import core.thread;

import std.functional;
import std.meta;
import std.range.primitives;
import std.traits;

/*
(For now public undocumented with reserved name.)

A lazily initialized global constant. The underlying value is a shared global
statically initialized to `outOfBandValue`, which must not be a legitimate
value of the constant. Upon the first call the situation is detected and the
global is initialized by calling `initializer`. The initializer is assumed to
be pure (even if not marked as such), i.e. to return the same value upon
repeated calls. For that reason, no special precautions are taken, so
`initializer` may be called more than once, leading to benign races on the
cached value.

In the quiescent state the cost of the function is an atomic load from a global.

Params:
    T = The type of the pseudo-constant (may be qualified)
    outOfBandValue = A value that cannot be valid; it is used for initialization
    initializer = The function performing initialization; must be `nothrow`

Returns:
    The lazily initialized value
*/
@property pure
T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
    && is(typeof(initializer()) : T)
    && is(typeof(outOfBandValue) : T))
{
    static T impl() nothrow
    {
        // Thread-local cache
        static Unqual!T tls = outOfBandValue;
        auto local = tls;
        // Shortest path, no atomic operations
        if (local != outOfBandValue) return local;
        // Process-level cache
        static shared Unqual!T result = outOfBandValue;
        // Initialize both process-level cache and tls
        local = atomicLoad(result);
        if (local == outOfBandValue)
        {
            local = initializer();
            atomicStore(result, local);
        }
        tls = local;
        return local;
    }

    import std.traits : SetFunctionAttributes;
    alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
        functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
    auto purified = (() @trusted => cast(Fun) &impl)();
    return purified();
}

// Returns the size of a cache line.
alias cacheLineSize =
    __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);

private size_t cacheLineSizeImpl() @nogc nothrow @trusted
{
    size_t result = 0;
    import core.cpuid : datacache;
    foreach (ref const cachelevel; datacache)
    {
        if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max)
        {
            result = cachelevel.lineSize;
        }
    }
    return result;
}

@nogc @safe nothrow unittest
{
    assert(cacheLineSize == cacheLineSizeImpl);
}

/* Atomics code.  These forward to core.atomic, but are written like this
   for two reasons:

   1.  They used to actually contain ASM code and I don't want to have to change
       to directly calling core.atomic in a zillion different places.

   2.  core.atomic has some misc. issues that make my use cases difficult
       without wrapping it.  If I didn't wrap it, casts would be required
       basically everywhere.
*/
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    //core.atomic.cas(cast(shared) &stuff, stuff, newVal);
    atomicStore(*(cast(shared) &stuff), newVal);
}

private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return atomicLoad(*(cast(shared) &val));
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
}

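// A minimal sketch (not from the original source) exercising the three
// wrappers above; the casts to `shared` happen inside the helpers.
@system unittest
{
    ubyte flag = 0;
    atomicSetUbyte(flag, cast(ubyte) 1);                          // atomic store
    assert(atomicReadUbyte(flag) == 1);                           // atomic load
    assert(atomicCasUbyte(flag, cast(ubyte) 1, cast(ubyte) 2));   // CAS succeeds
    assert(!atomicCasUbyte(flag, cast(ubyte) 1, cast(ubyte) 3));  // stale test value
    assert(atomicReadUbyte(flag) == 2);
}
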
/*--------------------- Generic helper functions, etc.------------------------*/
private template MapType(R, functions...)
{
    static assert(functions.length);

    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}

private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}

private template noUnsharedAliasing(T)
{
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}

// This template tests whether a function may be executed in parallel from
// @safe code via Task.executeInNewThread().  There is an additional
// requirement for executing it via a TaskPool.  (See isSafeReturn).
private template isSafeTask(F)
{
    enum bool isSafeTask =
        (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
        (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
        (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
        allSatisfy!(noUnsharedAliasing, Parameters!F);
}

@safe unittest
{
    alias F1 = void function() @safe;
    alias F2 = void function();
    alias F3 = void function(uint, string) @trusted;
    alias F4 = void function(uint, char[]);

    static assert( isSafeTask!F1);
    static assert(!isSafeTask!F2);
    static assert( isSafeTask!F3);
    static assert(!isSafeTask!F4);

    alias F5 = uint[] function(uint, string) pure @trusted;
    static assert( isSafeTask!F5);
}

// This template decides whether Tasks that meet all of the other requirements
// for being executed from @safe code can be executed on a TaskPool.
// When executing via TaskPool, it's theoretically possible
// to return a value that is also pointed to by a worker thread's thread local
// storage.  When executing from executeInNewThread(), the thread that executed
// the Task is terminated by the time the return value is visible in the calling
// thread, so this is a non-issue.  It's also a non-issue for pure functions
// since they can't read global state.
private template isSafeReturn(T)
{
    static if (!hasUnsharedAliasing!(T.ReturnType))
    {
        enum isSafeReturn = true;
    }
    else static if (T.isPure)
    {
        enum isSafeReturn = true;
    }
    else
    {
        enum isSafeReturn = false;
    }
}

private template randAssignable(R)
{
    enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
}

private enum TaskStatus : ubyte
{
    notStarted,
    inProgress,
    done
}

private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}

// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs);
            }

            return lhs;
        }
    }
}
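
// A minimal illustrative sketch (not from the original source): with two
// reduction functions, the accumulator is a tuple and each field is advanced
// by its own function for every new element.
@safe unittest
{
    import std.typecons : tuple;

    alias step = reduceAdjoin!("a + b", "a * b");
    auto acc = tuple(0, 1);   // (sum seed, product seed)
    acc = step(acc, 3);
    acc = step(acc, 4);
    assert(acc == tuple(7, 12));   // (0 + 3 + 4, 1 * 3 * 4)
}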

private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
            }

            return lhs;
        }
    }
}

private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}

@safe unittest
{
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}

// This is the base "class" for all of the other tasks.  Using C-style
// polymorphism to allow more direct control over memory allocation, etc.
private struct AbstractTask
{
    AbstractTask* prev;
    AbstractTask* next;

    // Pointer to a function that executes this task.
    void function(void*) runTask;

    Throwable exception;
    ubyte taskStatus = TaskStatus.notStarted;

    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) == TaskStatus.done)
        {
            if (exception)
            {
                throw exception;
            }

            return true;
        }

        return false;
    }

    void job()
    {
        runTask(&this);
    }
}

/**
`Task` represents the fundamental unit of work.  A `Task` may be
executed in parallel with any other `Task`.  Using this struct directly
allows future/promise parallelism.  In this paradigm, a function (or delegate
or other callable) is executed in a thread other than the one it was called
from.  The calling thread does not block while the function is being executed.
A call to `workForce`, `yieldForce`, or `spinForce` is used to
ensure that the `Task` has finished executing and to obtain the return
value, if any.  These functions and `done` also act as full memory barriers,
meaning that any memory writes made in the thread that executed the `Task`
are guaranteed to be visible in the calling thread after one of these functions
returns.

The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
be used to create an instance of this struct.  See `task` for usage examples.

Function results are returned from `yieldForce`, `spinForce` and
`workForce` by ref.  If `fun` returns by ref, the reference will point
to the returned reference of `fun`.  Otherwise it will point to a
field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics.
If you want to pass this struct around, you should do so by reference or
pointer.

Bugs:  Changes to `ref` and `out` arguments are not propagated to the
       call site, only to `args` in this struct.
*/
struct Task(alias fun, Args...)
{
    @(imported!"core.attribute".mutableRefInit) private AbstractTask base = {runTask : &impl};
    private alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(&(fun(myCastedTask._args)))))
        {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    Args _args;

    /**
    The arguments the function was called with.  Changes to `out` and
    `ref` arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code.  See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
            (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
        }
        else
        {
            // BUG:  Should check this for delegates too, but std.traits
            //       apparently doesn't allow this.  isPure is irrelevant
            //       for delegates, at least for now since shared delegates
            //       don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this `Task`.  This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
    // allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs);
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    busy spin until it's done, then return the return value.  If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    `Task` to be available on a timescale shorter than that of an OS
    context switch.
     */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        this.pool.tryDeleteExecute(basePtr);

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    wait on a condition variable.  If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
     */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this `Task` was not started yet, execute it in the current
    thread.  If it is finished, return its result.  If it is in progress,
    execute any other `Task` from the `TaskPool` instance that
    this `Task` was submitted to until this one
    is finished.  If it threw an exception, rethrow that exception.
    If no other tasks are available or this `Task` was executed using
    `executeInNewThread`, wait on a condition variable.
     */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done)    // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                return yieldForce;
            }
        }
    }

    /**
    Returns `true` if the `Task` is finished executing.

    Throws:  Rethrows any exception thrown during the execution of the
             `Task`.
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this `Task`, execute it in the
    newly created thread, then terminate the thread.  This can be used for
    future/promise parallelism.  An explicit priority may be given
    to the `Task`.  If one is provided, its value is forwarded to
    `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
    a usage example.
    */
    void executeInNewThread() @trusted
    {
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}

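// A minimal future/promise sketch (not from the original source) using the
// public API described above.
@system unittest
{
    static int square(int x) { return x * x; }

    auto t = task!square(6);
    t.executeInNewThread();

    // Block until the result is ready, then read it.
    assert(t.yieldForce == 36);
}
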
// Calls `fpOrDelegate` with `args`.  This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}

/**
Creates a `Task` on the GC heap that calls an alias.  This may be executed
via `Task.executeInNewThread` or by submitting to a
$(REF TaskPool, std,parallelism).  A globally accessible instance of
`TaskPool` is provided by $(REF taskPool, std,parallelism).

Returns:  A pointer to the `Task`.

Example:
---
// Read two files into memory at the same time.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

---
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially.  Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation:               176 milliseconds.
// Equivalent serial implementation:  280 milliseconds
void parallelSort(T)(T[] data)
{
    import std.algorithm;

    // Sort small subarrays serially.
    if (data.length < 100)
    {
         std.algorithm.sort(data);
         return;
    }

    // Partition the array.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}
---
*/
auto task(alias fun, Args...)(Args args)
{
    return new Task!(fun, Args)(args);
}

/**
Creates a `Task` on the GC heap that calls a function pointer, delegate, or
class/struct with overloaded opCall.

Example:
---
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task(&read!string, "foo.txt", size_t.max);
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

Notes: This function takes a non-scope delegate, meaning it can be
       used with closures.  If you can't allocate a closure due to objects
       on the stack that have scoped destruction, see `scopedTask`, which
       takes a scope delegate.
 */
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    return new Task!(run, F, Args)(delegateOrFp, args);
}

/**
Version of `task` usable from `@safe` code.  Usage mechanics are
identical to the non-@safe case, but safety introduces some restrictions:

1.  `fun` must be @safe or @trusted.

2.  `F` must not have any unshared aliasing as defined by
    $(REF hasUnsharedAliasing, std,traits).  This means it
    may not be an unshared delegate or a non-shared class or struct
    with overloaded `opCall`.  This also precludes accepting template
    alias parameters.

3.  `Args` must not have unshared aliasing.

4.  `fun` must not return by reference.

5.  The return type must not have unshared aliasing unless `fun` is
    `pure` or the `Task` is executed via `executeInNewThread` instead
    of using a `TaskPool`.

*/
@trusted auto task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    return new Task!(run, F, Args)(fun, args);
}

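// A minimal sketch (not from the original source) of the @safe overload: a
// @safe function pointer with value-type arguments satisfies the restrictions
// listed above.
@safe unittest
{
    static int add(int a, int b) @safe { return a + b; }

    auto t = task(&add, 40, 2);
    t.executeInNewThread();
    assert(t.yieldForce == 42);
}
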
/**
These functions allow the creation of `Task` objects on the stack rather
than the GC heap.  The lifetime of a `Task` created by `scopedTask`
cannot exceed the lifetime of the scope it was created in.

`scopedTask` might be preferred over `task`:

1.  When a `Task` that calls a delegate is being created and a closure
    cannot be allocated due to objects on the stack that have scoped
    destruction.  The delegate overload of `scopedTask` takes a `scope`
    delegate.

2.  As a micro-optimization, to avoid the heap allocation associated with
    `task` or with the creation of a closure.

Usage is otherwise identical to `task`.

Notes:  `Task` objects created using `scopedTask` will automatically
call `Task.yieldForce` in their destructor if necessary to ensure
the `Task` is complete before the stack frame they reside on is destroyed.
*/
auto scopedTask(alias fun, Args...)(Args args)
{
    auto ret = Task!(fun, Args)(args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(fun, args);
    ret.isScoped = true;
    return ret;
}

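// A minimal sketch (not from the original source): a stack-allocated Task
// whose destructor, as noted above, waits for completion if needed.
@system unittest
{
    static int triple(int x) { return 3 * x; }

    auto t = scopedTask!triple(7);
    t.executeInNewThread();
    assert(t.yieldForce == 21);
    // Had yieldForce been omitted, ~this would call it before `t` leaves scope.
}
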
/**
The total number of CPU cores available on the current machine, as reported by
the operating system.
*/
alias totalCPUs =
    __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);

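// A tiny sanity-check sketch (not from the original source): the OS should
// always report at least one core.
@safe unittest
{
    assert(totalCPUs >= 1);
}
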
uint totalCPUsImpl() @nogc nothrow @trusted
{
    version (Windows)
    {
        // BUGS:  Only works on Windows 2000 and above.
        import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        return max(1, cast(uint) si.dwNumberOfProcessors);
    }
    else version (linux)
    {
        import core.stdc.stdlib : calloc;
        import core.stdc.string : memset;
        import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity;
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        int count = 0;

        /**
         * According to ruby's source code, CPU_ALLOC() doesn't work as expected.
         *  see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242
         *
         *  The hardcoded number also comes from ruby's source code.
         *  see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4
         */
        for (int n = 64; n <= 16384; n *= 2)
        {
            size_t size = CPU_ALLOC_SIZE(count);
            if (size >= 0x400)
            {
                auto cpuset = cast(cpu_set_t*) calloc(1, size);
                if (cpuset is null) break;
                if (sched_getaffinity(0, size, cpuset) == 0)
                {
                    count = CPU_COUNT_S(size, cpuset);
                }
                CPU_FREE(cpuset);
            }
            else
            {
                cpu_set_t cpuset;
                if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0)
                {
                    count = CPU_COUNT(&cpuset);
                }
            }

            if (count > 0)
                return cast(uint) count;
        }

        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Darwin)
    {
        import core.sys.darwin.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.physicalcpu", &result, &len, null, 0);
        return result;
    }
    else version (DragonFlyBSD)
    {
        import core.sys.dragonflybsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (FreeBSD)
    {
        import core.sys.freebsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (NetBSD)
    {
        import core.sys.netbsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (Solaris)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (OpenBSD)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Hurd)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else
    {
        static assert(0, "Don't know how to get N CPUs on this OS.");
    }
}

/*
This class serves two purposes:

1.  It distinguishes std.parallelism threads from other threads so that
    the std.parallelism daemon threads can be terminated.

2.  It adds a reference to the pool that the thread is a member of,
    which is also necessary to allow the daemon threads to be properly
    terminated.
*/
private final class ParallelismThread : Thread
{
    this(void delegate() dg)
    {
        super(dg);
    }

    TaskPool pool;
}

// Kill daemon threads.
shared static ~this()
{
    foreach (ref thread; Thread)
    {
        auto pthread = cast(ParallelismThread) thread;
        if (pthread is null) continue;
        auto pool = pthread.pool;
        if (!pool.isDaemon) continue;
        pool.stop();
        pthread.join();
    }
}

/**
This class encapsulates a task queue and a set of worker threads.  Its purpose
is to efficiently map a large number of `Task`s onto a smaller number of
threads.  A task queue is a FIFO queue of `Task` objects that have been
submitted to the `TaskPool` and are awaiting execution.  A worker thread is a
thread that executes the `Task` at the front of the queue when one is
available and sleeps when the queue is empty.

This class should usually be used via the global instantiation
available via the $(REF taskPool, std,parallelism) property.
Occasionally it is useful to explicitly instantiate a `TaskPool`:

1.  When you want `TaskPool` instances with multiple priorities, for example
    a low priority pool and a high priority pool.

2.  When the threads in the global task pool are waiting on a synchronization
    primitive (for example a mutex), and you want to parallelize the code that
    needs to run before these threads can be resumed.

Note: The worker threads in this pool will not stop until
      `stop` or `finish` is called, even if the main thread
      has finished already. This may lead to programs that
      never end. If you do not want this behaviour, you can set `isDaemon`
      to true.
 */
final class TaskPool
{
private:

    // A pool can either be a regular pool or a single-task pool.  A
    // single-task pool is a dummy pool that's fired up for
    // Task.executeInNewThread().
    bool isSingleTask;

    ParallelismThread[] pool;
    Thread singleTaskThread;

    AbstractTask* head;
    AbstractTask* tail;
    PoolState status = PoolState.running;
    Condition workerCondition;
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex;  // For waiterCondition

    // The instanceStartIndex of the next instance that will be created.
    __gshared size_t nextInstanceIndex = 1;

    // The index of the current thread.
    static size_t threadIndex;

    // The index of the first thread in this instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;

    enum PoolState : ubyte
    {
        running,
        finishing,
        stopNow
    }

    void doJob(AbstractTask* job)
    {
        assert(job.taskStatus == TaskStatus.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        try
        {
            job.job();
        }
        catch (Throwable e)
        {
            job.exception = e;
        }

        atomicSetUbyte(job.taskStatus, TaskStatus.done);

        if (!isSingleTask)
        {
            waiterLock();
            scope(exit) waiterUnlock();
            notifyWaiters();
        }
    }

    // This function is used for dummy pools created by Task.executeInNewThread().
    void doSingleTask()
    {
        // No synchronization.  Pool is guaranteed to only have one thread,
        // and the queue is submitted to before this thread is created.
        assert(head);
        auto t = head;
        t.next = t.prev = head = null;
        doJob(t);
    }

    // This function performs initialization for each thread that affects
    // thread local storage and therefore must be done from within the
    // worker thread.  It then calls executeWorkLoop().
    void startWorkLoop()
    {
        // Initialize thread index.
        {
            queueLock();
            scope(exit) queueUnlock();
            threadIndex = nextThreadIndex;
            nextThreadIndex++;
        }

        executeWorkLoop();
    }

    // This is the main work loop that worker threads spend their time in
    // until they terminate.  It's also entered by non-worker threads when
    // finish() is called with the blocking variable set to true.
    void executeWorkLoop()
    {
        while (atomicReadUbyte(status) != PoolState.stopNow)
        {
            AbstractTask* task = pop();
            if (task is null)
            {
                if (atomicReadUbyte(status) == PoolState.finishing)
                {
                    atomicSetUbyte(status, PoolState.stopNow);
                    return;
                }
            }
            else
            {
                doJob(task);
            }
        }
    }

    // Pop a task off the queue.
    AbstractTask* pop()
    {
        queueLock();
        scope(exit) queueUnlock();
        auto ret = popNoSync();
        while (ret is null && status == PoolState.running)
        {
            wait();
            ret = popNoSync();
        }
        return ret;
    }

    AbstractTask* popNoSync()
    out(returned)
    {
        /* If task.prev and task.next aren't null, then another thread
         * can try to delete this task from the pool after it's
         * already been deleted/popped.
         */
        if (returned !is null)
        {
            assert(returned.next is null);
            assert(returned.prev is null);
        }
    }
    do
    {
        if (isSingleTask) return null;

        AbstractTask* returned = head;
        if (head !is null)
        {
            head = head.next;
            returned.prev = null;
            returned.next = null;
            returned.taskStatus = TaskStatus.inProgress;
        }
        if (head !is null)
        {
            head.prev = null;
        }

        return returned;
    }

    // Push a task onto the queue.
    void abstractPut(AbstractTask* task)
    {
        queueLock();
        scope(exit) queueUnlock();
        abstractPutNoSync(task);
    }

    void abstractPutNoSync(AbstractTask* task)
    in
    {
        assert(task);
    }
    out
    {
        import std.conv : text;

        assert(tail.prev !is tail);
        assert(tail.next is null, text(tail.prev, '\t', tail.next));
        if (tail.prev !is null)
        {
            assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
        }
    }
    do
    {
        // Not using enforce() to save on function call overhead since this
        // is a performance critical function.
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        task.next = null;
        if (head is null)   //Queue is empty.
        {
            head = task;
            tail = task;
            tail.prev = null;
        }
        else
        {
            assert(tail);
            task.prev = tail;
            tail.next = task;
            tail = task;
        }
        notify();
    }

    void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
    {
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        if (head is null)
        {
            head = h;
            tail = t;
        }
        else
        {
            h.prev = tail;
            tail.next = h;
            tail = t;
        }

        notifyAll();
    }

    void tryDeleteExecute(AbstractTask* toExecute)
    {
        if (isSingleTask) return;

        if (!deleteItem(toExecute))
        {
            return;
        }

        try
        {
            toExecute.job();
        }
        catch (Exception e)
        {
            toExecute.exception = e;
        }

        atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
    }

    bool deleteItem(AbstractTask* item)
    {
        queueLock();
        scope(exit) queueUnlock();
        return deleteItemNoSync(item);
    }

    bool deleteItemNoSync(AbstractTask* item)
    {
        if (item.taskStatus != TaskStatus.notStarted)
        {
            return false;
        }
        item.taskStatus = TaskStatus.inProgress;

        if (item is head)
        {
            // Make sure head gets set properly.
            popNoSync();
            return true;
        }
        if (item is tail)
        {
            tail = tail.prev;
            if (tail !is null)
            {
                tail.next = null;
            }
            item.next = null;
            item.prev = null;
            return true;
        }
        if (item.next !is null)
        {
            assert(item.next.prev is item);  // Check queue consistency.
            item.next.prev = item.prev;
        }
        if (item.prev !is null)
        {
            assert(item.prev.next is item);  // Check queue consistency.
            item.prev.next = item.next;
        }
        item.next = null;
        item.prev = null;
        return true;
    }

    void queueLock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.lock();
    }

    void queueUnlock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.unlock();
    }

    void waiterLock()
    {
        if (!isSingleTask) waiterMutex.lock();
    }

    void waiterUnlock()
    {
        if (!isSingleTask) waiterMutex.unlock();
    }

    void wait()
    {
        if (!isSingleTask) workerCondition.wait();
    }

    void notify()
    {
        if (!isSingleTask) workerCondition.notify();
    }

    void notifyAll()
    {
        if (!isSingleTask) workerCondition.notifyAll();
    }

    void waitUntilCompletion()
    {
        if (isSingleTask)
        {
            singleTaskThread.join();
        }
        else
        {
            waiterCondition.wait();
        }
    }

    void notifyWaiters()
    {
        if (!isSingleTask) waiterCondition.notifyAll();
    }

    // Private constructor for creating dummy pools that only have one thread,
    // only execute one Task, and then terminate.  This is used for
    // Task.executeInNewThread().
    this(AbstractTask* task, int priority = int.max)
    {
        assert(task);

        // Dummy value, not used.
        instanceStartIndex = 0;

        this.isSingleTask = true;
        task.taskStatus = TaskStatus.inProgress;
        this.head = task;
        singleTaskThread = new Thread(&doSingleTask);
        singleTaskThread.start();

        // Disabled until writing code to support
        // running thread with specified priority
        // See https://issues.dlang.org/show_bug.cgi?id=8960

        /*if (priority != int.max)
        {
            singleTaskThread.priority = priority;
        }*/
    }

public:
    // This is used in parallel_algorithm but is too unstable to document
    // as public API.
    size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
    {
        import std.algorithm.comparison : max;

        if (this.size == 0)
        {
            return max(rangeLen, 1);
        }

        immutable size_t eightSize = 4 * (this.size + 1);
        auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
        return max(ret, 1);
    }

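    // A small sketch (not from the original source) of the heuristic above:
    // roughly four work units per thread (workers plus the main thread),
    // rounded up, and never zero.  Assumes `TaskPool.size` and
    // `TaskPool.finish`, declared later in this module.
    @safe unittest
    {
        auto pool = new TaskPool(1);
        scope(exit) pool.finish(true);

        assert(pool.defaultWorkUnitSize(0) == 1);    // clamped to at least 1
        assert(pool.defaultWorkUnitSize(80) == 10);  // 80 / (4 * (1 + 1))
        assert(pool.defaultWorkUnitSize(81) == 11);  // remainder rounds up
    }
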
    /**
    Default constructor that initializes a `TaskPool` with
    `totalCPUs` - 1 worker threads.  The minus 1 is included because the
    main thread will also be available to do work.

    Note:  On single-core machines, the primitives provided by `TaskPool`
           operate transparently in single-threaded mode.
     */
    this() @trusted
    {
        this(totalCPUs - 1);
    }

    /**
    Allows for a custom number of worker threads.
    */
    this(size_t nWorkers) @trusted
    {
        synchronized(typeid(TaskPool))
        {
            instanceStartIndex = nextInstanceIndex;

            // The first worker thread to be initialized will have this index,
            // and will increment it.  The second worker to be initialized will
            // have this index plus 1.
            nextThreadIndex = instanceStartIndex;
            nextInstanceIndex += nWorkers;
        }

        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);

        pool = new ParallelismThread[nWorkers];
        foreach (ref poolThread; pool)
        {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.start();
        }
    }

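    // A minimal sketch (not from the original source): an explicitly sized
    // private pool used for one parallel loop, then shut down.  Assumes
    // `TaskPool.finish`, declared later in this module.
    @system unittest
    {
        import core.atomic : atomicOp;
        import std.range : iota;

        auto pool = new TaskPool(2);
        scope(exit) pool.finish(true);  // block until remaining work is done

        shared int sum = 0;
        foreach (i; pool.parallel(iota(101)))
        {
            atomicOp!"+="(sum, i);
        }
        assert(sum == 5050);
    }
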
    /**
    Implements a parallel foreach loop over a range.  This works by implicitly
    creating and submitting one `Task` to the `TaskPool` for each worker
    thread.  A work unit is a set of consecutive elements of `range` to
    be processed by a worker thread between communication with any other
    thread.  The number of elements processed per work unit is controlled by the
    `workUnitSize` parameter.  Smaller work units provide better load
    balancing, but larger work units avoid the overhead of communicating
    with other threads frequently to fetch the next work unit.  Large work
    units also avoid false sharing in cases where the range is being modified.
    The less time a single iteration of the loop takes, the larger
    `workUnitSize` should be.  For very expensive loop bodies,
    `workUnitSize` should be 1.  An overload that chooses a default work
    unit size is also available.

    Example:
    ---
    // Find the logarithm of every number from 1 to
    // 10_000_000 in parallel.
    import std.math : log;

    auto logs = new double[10_000_000];

    // Parallel foreach works with or without an index
    // variable.  It can iterate by ref if range.front
    // returns by ref.

    // Iterate over logs using work units of size 100.
    foreach (i, ref elem; taskPool.parallel(logs, 100))
    {
        elem = log(i + 1.0);
    }

    // Same thing, but use the default work unit size.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel foreach:  388 milliseconds
    // Regular foreach:   619 milliseconds
    foreach (i, ref elem; taskPool.parallel(logs))
    {
        elem = log(i + 1.0);
    }
    ---

    Notes:

    The memory usage of this implementation is guaranteed to be constant
    in `range.length`.

    Breaking from a parallel foreach loop via a break, labeled break,
    labeled continue, return or goto statement throws a
    `ParallelForeachError`.

    In the case of non-random access ranges, parallel foreach buffers lazily
    to an array of size `workUnitSize` before executing the parallel portion
    of the loop.  The exception is that, if a parallel foreach is executed
    over a range returned by `asyncBuf` or `map`, the copying is elided
    and the buffers are simply swapped.  In this case `workUnitSize` is
    ignored and the work unit size is set to the buffer size of `range`.

    A memory barrier is guaranteed to be executed on exit from the loop,
    so that results produced by all threads are visible in the calling thread.

    $(B Exception Handling):

    When at least one exception is thrown from inside a parallel foreach loop,
    the submission of additional `Task` objects is terminated as soon as
    possible, in a non-deterministic manner.  All executing or
    enqueued work units are allowed to complete.  Then, all exceptions that
    were thrown by any work unit are chained using `Throwable.next` and
    rethrown.  The order of the exception chaining is non-deterministic.
    */
    ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
    {
        import std.exception : enforce;
        enforce(workUnitSize > 0, "workUnitSize must be > 0.");
        alias RetType = ParallelForeach!R;
        return RetType(this, range, workUnitSize);
    }


    /// Ditto
    ParallelForeach!R parallel(R)(R range)
    {
        static if (hasLength!R)
        {
            // Default work unit size is such that we would create about
            // 4x as many work units as there are threads in this pool.
            size_t workUnitSize = defaultWorkUnitSize(range.length);
            return parallel(range, workUnitSize);
        }
        else
        {
            // Just use a really, really dumb guess if the user is too lazy to
            // specify.
            return parallel(range, 512);
        }
    }

1659     ///
1660     template amap(functions...)
1661     {
1662         /**
1663         Eager parallel map.  The eagerness of this function means it has less
1664         overhead than the lazily evaluated `TaskPool.map` and should be
1665         preferred where the memory requirements of eagerness are acceptable.
1666         `functions` are the functions to be evaluated, passed as template
1667         alias parameters in a style similar to
1668         $(REF map, std,algorithm,iteration).
1669         The first argument must be a random access range. For performance
1670         reasons, amap will assume the range elements have not yet been
1671         initialized. Elements will be overwritten without calling a destructor
1672         nor doing an assignment. As such, the range must not contain meaningful
1673         data$(DDOC_COMMENT not a section): either un-initialized objects, or
1674         objects in their `.init` state.
1675 
1676         ---
1677         auto numbers = iota(100_000_000.0);
1678 
1679         // Find the square roots of numbers.
1680         //
1681         // Timings on an Athlon 64 X2 dual core machine:
1682         //
1683         // Parallel eager map:                   0.802 s
1684         // Equivalent serial implementation:     1.768 s
1685         auto squareRoots = taskPool.amap!sqrt(numbers);
1686         ---
1687 
1688         Immediately after the range argument, an optional work unit size argument
1689         may be provided.  Work units as used by `amap` are identical to those
1690         defined for parallel foreach.  If no work unit size is provided, the
1691         default work unit size is used.
1692 
1693         ---
1694         // Same thing, but make work unit size 100.
1695         auto squareRoots = taskPool.amap!sqrt(numbers, 100);
1696         ---
1697 
1698         An output range for returning the results may be provided as the last
1699         argument.  If one is not provided, an array of the proper type will be
1700         allocated on the garbage collected heap.  If one is provided, it must be a
1701         random access range with assignable elements, must have reference
1702         semantics with respect to assignment to its elements, and must have the
1703         same length as the input range.  Writing to adjacent elements from
1704         different threads must be safe.
1705 
1706         ---
1707         // Same thing, but explicitly allocate an array
1708         // to return the results in.  The element type
1709         // of the array may be either the exact type
1710         // returned by functions or an implicit conversion
1711         // target.
1712         auto squareRoots = new float[numbers.length];
1713         taskPool.amap!sqrt(numbers, squareRoots);
1714 
1715         // Multiple functions, explicit output range, and
1716         // explicit work unit size.
1717         auto results = new Tuple!(float, real)[numbers.length];
1718         taskPool.amap!(sqrt, log)(numbers, 100, results);
1719         ---
1720 
1721         Note:
1722 
1723         A memory barrier is guaranteed to be executed after all results are written
1724         but before returning so that results produced by all threads are visible
1725         in the calling thread.
1726 
1727         Tips:
1728 
1729         To perform the mapping operation in place, provide the same range for the
1730         input and output range.
1731 
1732         To parallelize the copying of a range with expensive to evaluate elements
1733         to an array, pass an identity function (a function that just returns
1734         whatever argument is provided to it) to `amap`.
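
        Both tips as a sketch, using a plain `double[]` for illustration:
        ---
        import std.array : array;
        import std.range : iota;

        // In-place mapping:  pass the same array as the input range
        // and the output buffer.
        auto data = iota(1_000.0).array;
        taskPool.amap!"a * a"(data, data);

        // Parallel copying:  map the identity function over the range.
        auto copied = taskPool.amap!"a"(data);
        ---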
1735 
1736         $(B Exception Handling):
1737 
1738         When at least one exception is thrown from inside the map functions,
1739         the submission of additional `Task` objects is terminated as soon as
1740         possible, in a non-deterministic manner.  All currently executing or
1741         enqueued work units are allowed to complete.  Then, all exceptions that
1742         were thrown from any work unit are chained using `Throwable.next` and
1743         rethrown.  The order of the exception chaining is non-deterministic.
1744         */
1745         auto amap(Args...)(Args args)
1746         if (isRandomAccessRange!(Args[0]))
1747         {
1748             import core.internal.lifetime : emplaceRef;
1749 
1750             alias fun = adjoin!(staticMap!(unaryFun, functions));
1751 
1752             alias range = args[0];
1753             immutable len = range.length;
1754 
1755             static if (
1756                 Args.length > 1 &&
1757                 randAssignable!(Args[$ - 1]) &&
1758                 is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
1759                 )
1760             {
1761                 import std.conv : text;
1762                 import std.exception : enforce;
1763 
1764                 alias buf = args[$ - 1];
1765                 alias args2 = args[0..$ - 1];
1766                 alias Args2 = Args[0..$ - 1];
                enforce(buf.length == len,
                        text("Can't use a user-supplied buffer of the wrong ",
                             "size.  (Expected: ", len, ", got: ", buf.length, ")"));
1770             }
1771             else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
1772             {
1773                 static assert(0, "Wrong buffer type.");
1774             }
1775             else
1776             {
1777                 import std.array : uninitializedArray;
1778 
1779                 auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
1780                 alias args2 = args;
1781                 alias Args2 = Args;
1782             }
1783 
1784             if (!len) return buf;
1785 
1786             static if (isIntegral!(Args2[$ - 1]))
1787             {
1788                 static assert(args2.length == 2);
1789                 auto workUnitSize = cast(size_t) args2[1];
1790             }
1791             else
1792             {
1793                 static assert(args2.length == 1, Args);
1794                 auto workUnitSize = defaultWorkUnitSize(range.length);
1795             }
1796 
1797             alias R = typeof(range);
1798 
1799             if (workUnitSize > len)
1800             {
1801                 workUnitSize = len;
1802             }
1803 
            // Special case:  if the pool has no worker threads, evaluate
            // serially in this thread.
1805             if (size == 0)
1806             {
1807                 size_t index = 0;
1808                 foreach (elem; range)
1809                 {
1810                     emplaceRef(buf[index++], fun(elem));
1811                 }
1812                 return buf;
1813             }
1814 
            // Effectively -1:  workUnitIndex + 1 == 0 on the first increment:
1816             shared size_t workUnitIndex = size_t.max;
1817             shared bool shouldContinue = true;
1818 
1819             void doIt()
1820             {
1821                 import std.algorithm.comparison : min;
1822 
1823                 scope(failure)
1824                 {
1825                     // If an exception is thrown, all threads should bail.
1826                     atomicStore(shouldContinue, false);
1827                 }
1828 
1829                 while (atomicLoad(shouldContinue))
1830                 {
1831                     immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
1832                     immutable start = workUnitSize * myUnitIndex;
1833                     if (start >= len)
1834                     {
1835                         atomicStore(shouldContinue, false);
1836                         break;
1837                     }
1838 
1839                     immutable end = min(len, start + workUnitSize);
1840 
1841                     static if (hasSlicing!R)
1842                     {
1843                         auto subrange = range[start .. end];
1844                         foreach (i; start .. end)
1845                         {
1846                             emplaceRef(buf[i], fun(subrange.front));
1847                             subrange.popFront();
1848                         }
1849                     }
1850                     else
1851                     {
1852                         foreach (i; start .. end)
1853                         {
1854                             emplaceRef(buf[i], fun(range[i]));
1855                         }
1856                     }
1857                 }
1858             }
1859 
1860             submitAndExecute(this, &doIt);
1861             return buf;
1862         }
1863     }
1864 
1865     ///
1866     template map(functions...)
1867     {
1868         /**
1869         A semi-lazy parallel map that can be used for pipelining.  The map
1870         functions are evaluated for the first `bufSize` elements and stored in a
1871         buffer and made available to `popFront`.  Meanwhile, in the
1872         background a second buffer of the same size is filled.  When the first
1873         buffer is exhausted, it is swapped with the second buffer and filled while
1874         the values from what was originally the second buffer are read.  This
1875         implementation allows for elements to be written to the buffer without
1876         the need for atomic operations or synchronization for each write, and
1877         enables the mapping function to be evaluated efficiently in parallel.
1878 
1879         `map` has more overhead than the simpler procedure used by `amap`
1880         but avoids the need to keep all results in memory simultaneously and works
1881         with non-random access ranges.
1882 
1883         Params:
1884 
1885         source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
1886         to be mapped.  If `source` is not random
1887         access it will be lazily buffered to an array of size `bufSize` before
1888         the map function is evaluated.  (For an exception to this rule, see Notes.)
1889 
1890         bufSize = The size of the buffer to store the evaluated elements.
1891 
1892         workUnitSize = The number of elements to evaluate in a single
1893         `Task`.  Must be less than or equal to `bufSize`, and
1894         should be a fraction of `bufSize` such that all worker threads can be
1895         used.  If the default of size_t.max is used, workUnitSize will be set to
1896         the pool-wide default.
1897 
1898         Returns:  An input range representing the results of the map.  This range
1899                   has a length iff `source` has a length.
1900 
1901         Notes:
1902 
1903         If a range returned by `map` or `asyncBuf` is used as an input to
1904         `map`, then as an optimization the copying from the output buffer
1905         of the first range to the input buffer of the second range is elided, even
1906         though the ranges returned by `map` and `asyncBuf` are non-random
1907         access ranges.  This means that the `bufSize` parameter passed to the
1908         current call to `map` will be ignored and the size of the buffer
1909         will be the buffer size of `source`.
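
        A sketch of this elision (the buffer sizes are illustrative):
        ---
        // nums comes from asyncBuf with a buffer size of 100, so the
        // bufSize argument of 500 below is ignored and map uses a
        // buffer size of 100 instead.
        auto nums = taskPool.asyncBuf(iota(10_000.0), 100);
        auto roots = taskPool.map!sqrt(nums, 500);
        ---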
1910 
1911         Example:
1912         ---
1913         // Pipeline reading a file, converting each line
1914         // to a number, taking the logarithms of the numbers,
1915         // and performing the additions necessary to find
1916         // the sum of the logarithms.
1917 
1918         auto lineRange = File("numberList.txt").byLine();
1919         auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
1920         auto nums = taskPool.map!(to!double)(dupedLines);
1921         auto logs = taskPool.map!log10(nums);
1922 
1923         double sum = 0;
1924         foreach (elem; logs)
1925         {
1926             sum += elem;
1927         }
1928         ---
1929 
1930         $(B Exception Handling):
1931 
1932         Any exceptions thrown while iterating over `source`
1933         or computing the map function are re-thrown on a call to `popFront` or,
1934         if thrown during construction, are simply allowed to propagate to the
1935         caller.  In the case of exceptions thrown while computing the map function,
1936         the exceptions are chained as in `TaskPool.amap`.
1937         */
1938         auto
1939         map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
1940         if (isInputRange!S)
1941         {
1942             import std.exception : enforce;
1943 
            enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
                    "Work unit size must not exceed buffer size.");
1946             alias fun = adjoin!(staticMap!(unaryFun, functions));
1947 
1948             static final class Map
1949             {
1950                 // This is a class because the task needs to be located on the
1951                 // heap and in the non-random access case source needs to be on
1952                 // the heap, too.
1953 
1954             private:
1955                 enum bufferTrick = is(typeof(source.buf1)) &&
1956                 is(typeof(source.bufPos)) &&
1957                 is(typeof(source.doBufSwap()));
1958 
1959                 alias E = MapType!(S, functions);
1960                 E[] buf1, buf2;
1961                 S source;
1962                 TaskPool pool;
1963                 Task!(run, E[] delegate(E[]), E[]) nextBufTask;
1964                 size_t workUnitSize;
1965                 size_t bufPos;
1966                 bool lastTaskWaited;
1967 
1968             static if (isRandomAccessRange!S)
1969             {
1970                 alias FromType = S;
1971 
1972                 void popSource()
1973                 {
1974                     import std.algorithm.comparison : min;
1975 
1976                     static if (__traits(compiles, source[0 .. source.length]))
1977                     {
1978                         source = source[min(buf1.length, source.length)..source.length];
1979                     }
1980                     else static if (__traits(compiles, source[0..$]))
1981                     {
1982                         source = source[min(buf1.length, source.length)..$];
1983                     }
1984                     else
1985                     {
1986                         static assert(0, "S must have slicing for Map."
1987                                       ~ "  " ~ S.stringof ~ " doesn't.");
1988                     }
1989                 }
1990             }
1991             else static if (bufferTrick)
1992             {
1993                 // Make sure we don't have the buffer recycling overload of
1994                 // asyncBuf.
1995                 static if (
1996                     is(typeof(source.source)) &&
1997                     isRoundRobin!(typeof(source.source))
1998                 )
1999                 {
2000                     static assert(0, "Cannot execute a parallel map on " ~
2001                                   "the buffer recycling overload of asyncBuf."
2002                                  );
2003                 }
2004 
2005                 alias FromType = typeof(source.buf1);
2006                 FromType from;
2007 
2008                 // Just swap our input buffer with source's output buffer.
2009                 // No need to copy element by element.
2010                 FromType dumpToFrom()
2011                 {
2012                     import std.algorithm.mutation : swap;
2013 
2014                     assert(source.buf1.length <= from.length);
2015                     from.length = source.buf1.length;
2016                     swap(source.buf1, from);
2017 
2018                     // Just in case this source has been popped before
2019                     // being sent to map:
2020                     from = from[source.bufPos..$];
2021 
2022                     static if (is(typeof(source._length)))
2023                     {
2024                         source._length -= (from.length - source.bufPos);
2025                     }
2026 
2027                     source.doBufSwap();
2028 
2029                     return from;
2030                 }
2031             }
2032             else
2033             {
2034                 alias FromType = ElementType!S[];
2035 
2036                 // The temporary array that data is copied to before being
2037                 // mapped.
2038                 FromType from;
2039 
2040                 FromType dumpToFrom()
2041                 {
2042                     assert(from !is null);
2043 
2044                     size_t i;
2045                     for (; !source.empty && i < from.length; source.popFront())
2046                     {
2047                         from[i++] = source.front;
2048                     }
2049 
2050                     from = from[0 .. i];
2051                     return from;
2052                 }
2053             }
2054 
2055             static if (hasLength!S)
2056             {
2057                 size_t _length;
2058 
2059                 public @property size_t length() const @safe pure nothrow
2060                 {
2061                     return _length;
2062                 }
2063             }
2064 
2065                 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
2066                 {
2067                     static if (bufferTrick)
2068                     {
2069                         bufSize = source.buf1.length;
2070                     }
2071 
2072                     buf1.length = bufSize;
2073                     buf2.length = bufSize;
2074 
2075                     static if (!isRandomAccessRange!S)
2076                     {
2077                         from.length = bufSize;
2078                     }
2079 
2080                     this.workUnitSize = (workUnitSize == size_t.max) ?
2081                             pool.defaultWorkUnitSize(bufSize) : workUnitSize;
2082                     this.source = source;
2083                     this.pool = pool;
2084 
2085                     static if (hasLength!S)
2086                     {
2087                         _length = source.length;
2088                     }
2089 
2090                     buf1 = fillBuf(buf1);
2091                     submitBuf2();
2092                 }
2093 
                // Fills buf with mapped elements.  In the random access case
                // the source is consumed directly; otherwise the elements
                // are first copied into `from` by dumpToFrom().
2096                 E[] fillBuf(E[] buf)
2097                 {
2098                     import std.algorithm.comparison : min;
2099 
2100                     static if (isRandomAccessRange!S)
2101                     {
2102                         import std.range : take;
2103                         auto toMap = take(source, buf.length);
2104                         scope(success) popSource();
2105                     }
2106                     else
2107                     {
2108                         auto toMap = dumpToFrom();
2109                     }
2110 
2111                     buf = buf[0 .. min(buf.length, toMap.length)];
2112 
                    // Special case:  if the pool has no worker threads,
                    // map serially in this thread.
2114                     if (pool.size == 0)
2115                     {
2116                         size_t index = 0;
2117                         foreach (elem; toMap)
2118                         {
2119                             buf[index++] = fun(elem);
2120                         }
2121                         return buf;
2122                     }
2123 
2124                     pool.amap!functions(toMap, workUnitSize, buf);
2125 
2126                     return buf;
2127                 }
2128 
2129                 void submitBuf2()
2130                 in
2131                 {
2132                     assert(nextBufTask.prev is null);
2133                     assert(nextBufTask.next is null);
2134                 }
2135                 do
2136                 {
2137                     // Hack to reuse the task object.
2138 
2139                     nextBufTask = typeof(nextBufTask).init;
2140                     nextBufTask._args[0] = &fillBuf;
2141                     nextBufTask._args[1] = buf2;
2142                     pool.put(nextBufTask);
2143                 }
2144 
2145                 void doBufSwap()
2146                 {
2147                     if (lastTaskWaited)
2148                     {
2149                         // Then the source is empty.  Signal it here.
2150                         buf1 = null;
2151                         buf2 = null;
2152 
2153                         static if (!isRandomAccessRange!S)
2154                         {
2155                             from = null;
2156                         }
2157 
2158                         return;
2159                     }
2160 
2161                     buf2 = buf1;
2162                     buf1 = nextBufTask.yieldForce;
2163                     bufPos = 0;
2164 
2165                     if (source.empty)
2166                     {
2167                         lastTaskWaited = true;
2168                     }
2169                     else
2170                     {
2171                         submitBuf2();
2172                     }
2173                 }
2174 
2175             public:
2176                 @property auto front()
2177                 {
2178                     return buf1[bufPos];
2179                 }
2180 
2181                 void popFront()
2182                 {
2183                     static if (hasLength!S)
2184                     {
2185                         _length--;
2186                     }
2187 
2188                     bufPos++;
2189                     if (bufPos >= buf1.length)
2190                     {
2191                         doBufSwap();
2192                     }
2193                 }
2194 
2195                 static if (isInfinite!S)
2196                 {
2197                     enum bool empty = false;
2198                 }
2199                 else
2200                 {
2201 
2202                     bool empty() const @property
2203                     {
2204                         // popFront() sets this when source is empty
2205                         return buf1.length == 0;
2206                     }
2207                 }
2208             }
2209             return new Map(source, bufSize, workUnitSize, this);
2210         }
2211     }
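
    // A minimal sketch exercising map over a non-random-access source.
    @system unittest
    {
        import std.algorithm.comparison : equal;
        import std.algorithm.iteration : filter;
        import std.range : iota;

        // filter yields a non-random-access range, forcing the internal
        // copy-to-buffer path; bufSize = 16 and workUnitSize = 4.
        auto evens = iota(100).filter!(a => a % 2 == 0);
        auto doubled = taskPool.map!(a => 2 * a)(evens, 16, 4);
        assert(equal(doubled, iota(0, 200, 4)));
    }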
2212 
2213     /**
    Given a `source` range that is expensive to iterate over, returns an
    $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that
    asynchronously buffers the contents of `source` into a buffer of
    `bufSize` elements in a worker thread, while making previously buffered
    elements from a second buffer, also of size `bufSize`, available via the
    range interface of the returned object.  The returned range has a length
    iff `hasLength!S`.
2220     `asyncBuf` is useful, for example, when performing expensive operations
2221     on the elements of ranges that represent data on a disk or network.
2222 
2223     Example:
2224     ---
    import std.algorithm, std.array, std.conv, std.stdio;
2226 
2227     void main()
2228     {
2229         // Fetch lines of a file in a background thread
2230         // while processing previously fetched lines,
2231         // dealing with byLine's buffer recycling by
2232         // eagerly duplicating every line.
2233         auto lines = File("foo.txt").byLine();
2234         auto duped = std.algorithm.map!"a.idup"(lines);
2235 
2236         // Fetch more lines in the background while we
2237         // process the lines already read into memory
2238         // into a matrix of doubles.
2239         double[][] matrix;
2240         auto asyncReader = taskPool.asyncBuf(duped);
2241 
2242         foreach (line; asyncReader)
2243         {
2244             auto ls = line.split("\t");
2245             matrix ~= to!(double[])(ls);
2246         }
2247     }
2248     ---
2249 
2250     $(B Exception Handling):
2251 
2252     Any exceptions thrown while iterating over `source` are re-thrown on a
2253     call to `popFront` or, if thrown during construction, simply
2254     allowed to propagate to the caller.
2255     */
2256     auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S)
2257     {
2258         static final class AsyncBuf
2259         {
2260             // This is a class because the task and source both need to be on
2261             // the heap.
2262 
2263             // The element type of S.
2264             alias E = ElementType!S;  // Needs to be here b/c of forward ref bugs.
2265 
2266         private:
2267             E[] buf1, buf2;
2268             S source;
2269             TaskPool pool;
2270             Task!(run, E[] delegate(E[]), E[]) nextBufTask;
2271             size_t bufPos;
2272             bool lastTaskWaited;
2273 
2274             static if (hasLength!S)
2275             {
2276                 size_t _length;
2277 
2278                 // Available if hasLength!S.
2279                 public @property size_t length() const @safe pure nothrow
2280                 {
2281                     return _length;
2282                 }
2283             }
2284 
2285             this(S source, size_t bufSize, TaskPool pool)
2286             {
2287                 buf1.length = bufSize;
2288                 buf2.length = bufSize;
2289 
2290                 this.source = source;
2291                 this.pool = pool;
2292 
2293                 static if (hasLength!S)
2294                 {
2295                     _length = source.length;
2296                 }
2297 
2298                 buf1 = fillBuf(buf1);
2299                 submitBuf2();
2300             }
2301 
2302             E[] fillBuf(E[] buf)
2303             {
2304                 assert(buf !is null);
2305 
2306                 size_t i;
2307                 for (; !source.empty && i < buf.length; source.popFront())
2308                 {
2309                     buf[i++] = source.front;
2310                 }
2311 
2312                 buf = buf[0 .. i];
2313                 return buf;
2314             }
2315 
2316             void submitBuf2()
2317             in
2318             {
2319                 assert(nextBufTask.prev is null);
2320                 assert(nextBufTask.next is null);
2321             }
2322             do
2323             {
2324                 // Hack to reuse the task object.
2325 
2326                 nextBufTask = typeof(nextBufTask).init;
2327                 nextBufTask._args[0] = &fillBuf;
2328                 nextBufTask._args[1] = buf2;
2329                 pool.put(nextBufTask);
2330             }
2331 
2332             void doBufSwap()
2333             {
2334                 if (lastTaskWaited)
2335                 {
2336                     // Then source is empty.  Signal it here.
2337                     buf1 = null;
2338                     buf2 = null;
2339                     return;
2340                 }
2341 
2342                 buf2 = buf1;
2343                 buf1 = nextBufTask.yieldForce;
2344                 bufPos = 0;
2345 
2346                 if (source.empty)
2347                 {
2348                     lastTaskWaited = true;
2349                 }
2350                 else
2351                 {
2352                     submitBuf2();
2353                 }
2354             }
2355 
2356         public:
2357             E front() @property
2358             {
2359                 return buf1[bufPos];
2360             }
2361 
2362             void popFront()
2363             {
2364                 static if (hasLength!S)
2365                 {
2366                     _length--;
2367                 }
2368 
2369                 bufPos++;
2370                 if (bufPos >= buf1.length)
2371                 {
2372                     doBufSwap();
2373                 }
2374             }
2375 
2376             static if (isInfinite!S)
2377             {
2378                 enum bool empty = false;
2379             }
2380 
2381             else
2382             {
2383                 ///
2384                 bool empty() @property
2385                 {
2386                     // popFront() sets this when source is empty:
2387                     return buf1.length == 0;
2388                 }
2389             }
2390         }
2391         return new AsyncBuf(source, bufSize, this);
2392     }
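
    // A minimal sketch exercising asyncBuf with an in-memory range.
    @system unittest
    {
        import std.algorithm.comparison : equal;
        import std.range : iota;

        // Buffer 0 .. 999 in a worker thread, 64 elements at a time.
        auto buffered = taskPool.asyncBuf(iota(1_000), 64);
        assert(buffered.length == 1_000);  // iota has a length, so this does.
        assert(equal(buffered, iota(1_000)));
    }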
2393 
2394     /**
2395     Given a callable object `next` that writes to a user-provided buffer and
2396     a second callable object `empty` that determines whether more data is
2397     available to write via `next`, returns an input range that
2398     asynchronously calls `next` with a set of size `nBuffers` of buffers
2399     and makes the results available in the order they were obtained via the
2400     input range interface of the returned object.  Similarly to the
2401     input range overload of `asyncBuf`, the first half of the buffers
2402     are made available via the range interface while the second half are
2403     filled and vice-versa.
2404 
2405     Params:
2406 
2407     next = A callable object that takes a single argument that must be an array
2408            with mutable elements.  When called, `next` writes data to
2409            the array provided by the caller.
2410 
2411     empty = A callable object that takes no arguments and returns a type
2412             implicitly convertible to `bool`.  This is used to signify
2413             that no more data is available to be obtained by calling `next`.
2414 
2415     initialBufSize = The initial size of each buffer.  If `next` takes its
2416                      array by reference, it may resize the buffers.
2417 
2418     nBuffers = The number of buffers to cycle through when calling `next`.
2419 
2420     Example:
2421     ---
2422     // Fetch lines of a file in a background
2423     // thread while processing previously fetched
2424     // lines, without duplicating any lines.
2425     auto file = File("foo.txt");
2426 
2427     void next(ref char[] buf)
2428     {
2429         file.readln(buf);
2430     }
2431 
2432     // Fetch more lines in the background while we
2433     // process the lines already read into memory
2434     // into a matrix of doubles.
2435     double[][] matrix;
2436     auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
2437 
2438     foreach (line; asyncReader)
2439     {
2440         auto ls = line.split("\t");
2441         matrix ~= to!(double[])(ls);
2442     }
2443     ---
2444 
2445     $(B Exception Handling):
2446 
    Any exceptions thrown while calling `next` or `empty` are re-thrown on a
    call to `popFront`.
2449 
2450     Warning:
2451 
2452     Using the range returned by this function in a parallel foreach loop
2453     will not work because buffers may be overwritten while the task that
2454     processes them is in queue.  This is checked for at compile time
2455     and will result in a static assertion failure.
2456     */
2457     auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
2458     if (is(typeof(C2.init()) : bool) &&
2459         Parameters!C1.length == 1 &&
2460         Parameters!C2.length == 0 &&
2461         isArray!(Parameters!C1[0])
2462     ) {
2463         auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers);
2464         return asyncBuf(roundRobin, nBuffers / 2);
2465     }
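
    // A minimal sketch of the callable-based overload, mirroring the File
    // example above with an in-memory source.
    @system unittest
    {
        import std.algorithm.comparison : min;
        import std.array : array;
        import std.range : iota;

        auto data = iota(0, 50).array;
        size_t pos;

        // Writes the next chunk into the caller's buffer, shrinking the
        // buffer to the number of elements actually written.
        void next(ref int[] buf)
        {
            immutable n = min(buf.length, data.length - pos);
            buf = buf[0 .. n];
            buf[] = data[pos .. pos + n];
            pos += n;
        }

        bool empty() { return pos >= data.length; }

        int[] result;
        foreach (chunk; taskPool.asyncBuf(&next, &empty, 4, 10))
        {
            result ~= chunk;
        }
        assert(result == data);
    }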
2466 
2467     ///
2468     template reduce(functions...)
2469     {
2470         /**
2471         Parallel reduce on a random access range.  Except as otherwise noted,
2472         usage is similar to $(REF _reduce, std,algorithm,iteration).  There is
2473         also $(LREF fold) which does the same thing with a different parameter
2474         order.
2475 
2476         This function works by splitting the range to be reduced into work
2477         units, which are slices to be reduced in parallel.  Once the results
2478         from all work units are computed, a final serial reduction is performed
2479         on these results to compute the final answer. Therefore, care must be
2480         taken to choose the seed value appropriately.
2481 
2482         Because the reduction is being performed in parallel, `functions`
2483         must be associative.  For notational simplicity, let # be an
2484         infix operator representing `functions`.  Then, (a # b) # c must equal
2485         a # (b # c).  Floating point addition is not associative
2486         even though addition in exact arithmetic is.  Summing floating
2487         point numbers using this function may give different results than summing
2488         serially.  However, for many practical purposes floating point addition
2489         can be treated as associative.
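
        For example (a sketch):
        ---
        // Addition is associative, so a parallel sum is fine:
        auto total = taskPool.reduce!"a + b"(iota(1_000));

        // Subtraction is not:  (a - b) - c != a - (b - c), so reducing
        // with "a - b" would give non-deterministic results.
        ---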
2490 
2491         Note that, since `functions` are assumed to be associative,
2492         additional optimizations are made to the serial portion of the reduction
2493         algorithm. These take advantage of the instruction level parallelism of
2494         modern CPUs, in addition to the thread-level parallelism that the rest
2495         of this module exploits.  This can lead to better than linear speedups
2496         relative to $(REF _reduce, std,algorithm,iteration), especially for
2497         fine-grained benchmarks like dot products.
2498 
        An explicit seed may be provided as the first argument.  If
        provided, it is used as the seed for all work units and for the final
        reduction of results from all work units.  Therefore, if it is not the
        identity value for the operation being performed, results may differ
        from those generated by $(REF _reduce, std,algorithm,iteration) and
        may vary depending on how many work units are used.  The next argument
        must be the range to be reduced.
2506         ---
2507         // Find the sum of squares of a range in parallel, using
2508         // an explicit seed.
2509         //
2510         // Timings on an Athlon 64 X2 dual core machine:
2511         //
2512         // Parallel reduce:                     72 milliseconds
2513         // Using std.algorithm.reduce instead:  181 milliseconds
2514         auto nums = iota(10_000_000.0f);
2515         auto sumSquares = taskPool.reduce!"a + b"(
2516             0.0, std.algorithm.map!"a * a"(nums)
2517         );
2518         ---
2519 
2520         If no explicit seed is provided, the first element of each work unit
2521         is used as a seed.  For the final reduction, the result from the first
2522         work unit is used as the seed.
2523         ---
2524         // Find the sum of a range in parallel, using the first
2525         // element of each work unit as the seed.
2526         auto sum = taskPool.reduce!"a + b"(nums);
2527         ---
2528 
2529         An explicit work unit size may be specified as the last argument.
2530         Specifying too small a work unit size will effectively serialize the
2531         reduction, as the final reduction of the result of each work unit will
2532         dominate computation time.  If `TaskPool.size` for this instance
2533         is zero, this parameter is ignored and one work unit is used.
2534         ---
2535         // Use a work unit size of 100.
2536         auto sum2 = taskPool.reduce!"a + b"(nums, 100);
2537 
2538         // Work unit size of 100 and explicit seed.
2539         auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
2540         ---
2541 
2542         Parallel reduce supports multiple functions, like
2543         `std.algorithm.reduce`.
2544         ---
2545         // Find both the min and max of nums.
2546         auto minMax = taskPool.reduce!(min, max)(nums);
2547         assert(minMax[0] == reduce!min(nums));
2548         assert(minMax[1] == reduce!max(nums));
2549         ---
2550 
2551         $(B Exception Handling):
2552 
2553         After this function is finished executing, any exceptions thrown
2554         are chained together via `Throwable.next` and rethrown.  The chaining
2555         order is non-deterministic.
2556 
2557         See_Also:
2558 
2559             $(LREF fold) is functionally equivalent to $(LREF _reduce) except the
2560             range parameter comes first and there is no need to use
2561             $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds.
2562          */
2563         auto reduce(Args...)(Args args)
2564         {
2565             import core.exception : OutOfMemoryError;
2566             import core.internal.lifetime : emplaceRef;
2567             import std.exception : enforce;
2568 
2569             alias fun = reduceAdjoin!functions;
2570             alias finishFun = reduceFinish!functions;
2571 
2572             static if (isIntegral!(Args[$ - 1]))
2573             {
2574                 size_t workUnitSize = cast(size_t) args[$ - 1];
2575                 alias args2 = args[0..$ - 1];
2576                 alias Args2 = Args[0..$ - 1];
2577             }
2578             else
2579             {
2580                 alias args2 = args;
2581                 alias Args2 = Args;
2582             }
2583 
2584             auto makeStartValue(Type)(Type e)
2585             {
2586                 static if (functions.length == 1)
2587                 {
2588                     return e;
2589                 }
2590                 else
2591                 {
2592                     typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
2593                     foreach (i, T; seed.Types)
2594                     {
2595                         emplaceRef(seed.expand[i], e);
2596                     }
2597 
2598                     return seed;
2599                 }
2600             }
2601 
2602             static if (args2.length == 2)
2603             {
2604                 static assert(isInputRange!(Args2[1]));
2605                 alias range = args2[1];
2606                 alias seed = args2[0];
2607                 enum explicitSeed = true;
2608 
2609                 static if (!is(typeof(workUnitSize)))
2610                 {
2611                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2612                 }
2613             }
2614             else
2615             {
2616                 static assert(args2.length == 1);
2617                 alias range = args2[0];
2618 
2619                 static if (!is(typeof(workUnitSize)))
2620                 {
2621                     size_t workUnitSize = defaultWorkUnitSize(range.length);
2622                 }
2623 
2624                 enforce(!range.empty,
2625                     "Cannot reduce an empty range with first element as start value.");
2626 
2627                 auto seed = makeStartValue(range.front);
2628                 enum explicitSeed = false;
2629                 range.popFront();
2630             }
2631 
2632             alias E = typeof(seed);
2633             alias R = typeof(range);
2634 
2635             E reduceOnRange(R range, size_t lowerBound, size_t upperBound)
2636             {
2637                 // This is for exploiting instruction level parallelism by
2638                 // using multiple accumulator variables within each thread,
2639                 // since we're assuming functions are associative anyhow.
2640 
2641                 // This is so that loops can be unrolled automatically.
2642                 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5);
2643                 enum nILP = ilpTuple.length;
2644                 immutable subSize = (upperBound - lowerBound) / nILP;
2645 
2646                 if (subSize <= 1)
2647                 {
2648                     // Handle as a special case.
2649                     static if (explicitSeed)
2650                     {
2651                         E result = seed;
2652                     }
2653                     else
2654                     {
2655                         E result = makeStartValue(range[lowerBound]);
2656                         lowerBound++;
2657                     }
2658 
2659                     foreach (i; lowerBound .. upperBound)
2660                     {
2661                         result = fun(result, range[i]);
2662                     }
2663 
2664                     return result;
2665                 }
2666 
2667                 assert(subSize > 1);
2668                 E[nILP] results;
2669                 size_t[nILP] offsets;
2670 
2671                 foreach (i; ilpTuple)
2672                 {
2673                     offsets[i] = lowerBound + subSize * i;
2674 
2675                     static if (explicitSeed)
2676                     {
2677                         results[i] = seed;
2678                     }
2679                     else
2680                     {
2681                         results[i] = makeStartValue(range[offsets[i]]);
2682                         offsets[i]++;
2683                     }
2684                 }
2685 
2686                 immutable nLoop = subSize - (!explicitSeed);
2687                 foreach (i; 0 .. nLoop)
2688                 {
2689                     foreach (j; ilpTuple)
2690                     {
2691                         results[j] = fun(results[j], range[offsets[j]]);
2692                         offsets[j]++;
2693                     }
2694                 }
2695 
2696                 // Finish the remainder.
2697                 foreach (i; nILP * subSize + lowerBound .. upperBound)
2698                 {
2699                     results[$ - 1] = fun(results[$ - 1], range[i]);
2700                 }
2701 
2702                 foreach (i; ilpTuple[1..$])
2703                 {
2704                     results[0] = finishFun(results[0], results[i]);
2705                 }
2706 
2707                 return results[0];
2708             }
2709 
2710             immutable len = range.length;
2711             if (len == 0)
2712             {
2713                 return seed;
2714             }
2715 
2716             if (this.size == 0)
2717             {
2718                 return finishFun(seed, reduceOnRange(range, 0, len));
2719             }
2720 
2721             // Unlike the rest of the functions here, I can't use the Task object
2722             // recycling trick here because this has to work on non-commutative
2723             // operations.  After all the tasks are done executing, fun() has to
2724             // be applied on the results of these to get a final result, but
2725             // it can't be evaluated out of order.
2726 
2727             if (workUnitSize > len)
2728             {
2729                 workUnitSize = len;
2730             }
2731 
2732             immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
2733             assert(nWorkUnits * workUnitSize >= len);
2734 
2735             alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
2736             RTask[] tasks;
2737 
            // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753
            // Use a fixed-size stack buffer, falling back to malloc() when
            // more space is needed.
2740             enum maxStack = 2_048;
2741             byte[maxStack] buf = void;
2742             immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
2743 
2744             import core.stdc.stdlib : malloc, free;
2745             if (nBytesNeeded <= maxStack)
2746             {
2747                 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
2748             }
2749             else
2750             {
2751                 auto ptr = cast(RTask*) malloc(nBytesNeeded);
2752                 if (!ptr)
2753                 {
2754                     throw new OutOfMemoryError(
2755                         "Out of memory in std.parallelism."
2756                     );
2757                 }
2758 
2759                 tasks = ptr[0 .. nWorkUnits];
2760             }
2761 
2762             scope(exit)
2763             {
2764                 if (nBytesNeeded > maxStack)
2765                 {
2766                     free(tasks.ptr);
2767                 }
2768             }
2769 
2770             // Hack to take the address of a nested function w/o
2771             // making a closure.
2772             static auto scopedAddress(D)(scope D del) @system
2773             {
2774                 auto tmp = del;
2775                 return tmp;
2776             }
2777 
2778             size_t curPos = 0;
2779             void useTask(ref RTask task)
2780             {
2781                 import std.algorithm.comparison : min;
2782                 import core.lifetime : emplace;
2783 
                // Private constructor, so we can't feed its arguments
                // directly to emplace.
2786                 emplace(&task, RTask
2787                 (
2788                     scopedAddress(&reduceOnRange),
2789                     range,
2790                     curPos, // lower bound.
2791                     cast() min(len, curPos + workUnitSize)  // upper bound.
2792                 ));
2793 
2794                 task.pool = this;
2795 
2796                 curPos += workUnitSize;
2797             }
2798 
2799             foreach (ref task; tasks)
2800             {
2801                 useTask(task);
2802             }
2803 
2804             foreach (i; 1 .. tasks.length - 1)
2805             {
2806                 tasks[i].next = tasks[i + 1].basePtr;
2807                 tasks[i + 1].prev = tasks[i].basePtr;
2808             }
2809 
2810             if (tasks.length > 1)
2811             {
2812                 queueLock();
2813                 scope(exit) queueUnlock();
2814 
2815                 abstractPutGroupNoSync(
2816                     tasks[1].basePtr,
2817                     tasks[$ - 1].basePtr
2818                 );
2819             }
2820 
2821             if (tasks.length > 0)
2822             {
2823                 try
2824                 {
2825                     tasks[0].job();
2826                 }
2827                 catch (Throwable e)
2828                 {
2829                     tasks[0].exception = e;
2830                 }
2831                 tasks[0].taskStatus = TaskStatus.done;
2832 
2833                 // Try to execute each of these in the current thread
2834                 foreach (ref task; tasks[1..$])
2835                 {
2836                     tryDeleteExecute(task.basePtr);
2837                 }
2838             }
2839 
2840             // Now that we've tried to execute every task, they're all either
2841             // done or in progress.  Force all of them.
2842             E result = seed;
2843 
2844             Throwable firstException;
2845 
2846             foreach (ref task; tasks)
2847             {
2848                 try
2849                 {
2850                     task.yieldForce;
2851                 }
2852                 catch (Throwable e)
2853                 {
2854                     /* Chain e to front because order doesn't matter and because
2855                      * e is not likely to be a chain itself (so fewer traversals)
2856                      */
2857                     firstException = Throwable.chainTogether(e, firstException);
2858                     continue;
2859                 }
2860 
2861                 if (!firstException) result = finishFun(result, task.returnVal);
2862             }
2863 
2864             if (firstException) throw firstException;
2865 
2866             return result;
2867         }
2868     }
2869 
2870     ///
2871     template fold(functions...)
2872     {
2873         /** Implements the homonym function (also known as `accumulate`, `compress`,
2874             `inject`, or `foldl`) present in various programming languages of
2875             functional flavor.
2876 
2877             `fold` is functionally equivalent to $(LREF reduce) except the range
2878             parameter comes first and there is no need to use $(REF_ALTTEXT
2879             `tuple`,tuple,std,typecons) for multiple seeds.
2880 
2881             There may be one or more callable entities (`functions` argument) to
2882             apply.
2883 
2884             Params:
2885                 args = Just the range to _fold over; or the range and one seed
2886                        per function; or the range, one seed per function, and
2887                        the work unit size
2888 
2889             Returns:
2890                 The accumulated result as a single value for single function and
2891                 as a tuple of values for multiple functions
2892 
2893             See_Also:
2894             Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce).
2895 
2896             Example:
2897             ---
2898             static int adder(int a, int b)
2899             {
2900                 return a + b;
2901             }
2902             static int multiplier(int a, int b)
2903             {
2904                 return a * b;
2905             }
2906 
2907             // Just the range
2908             auto x = taskPool.fold!adder([1, 2, 3, 4]);
2909             assert(x == 10);
2910 
2911             // The range and the seeds (0 and 1 below; also note multiple
2912             // functions in this example)
2913             auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1);
2914             assert(y[0] == 10);
2915             assert(y[1] == 24);
2916 
2917             // The range, the seed (0), and the work unit size (20)
2918             auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20);
2919             assert(z == 10);
2920             ---
2921         */
2922         auto fold(Args...)(Args args)
2923         {
2924             static assert(isInputRange!(Args[0]), "First argument must be an InputRange");
2925 
2926             alias range = args[0];
2927 
2928             static if (Args.length == 1)
2929             {
2930                 // Just the range
2931                 return reduce!functions(range);
2932             }
2933             else static if (Args.length == 1 + functions.length ||
2934                             Args.length == 1 + functions.length + 1)
2935             {
2936                 static if (functions.length == 1)
2937                 {
2938                     alias seeds = args[1];
2939                 }
2940                 else
2941                 {
2942                     auto seeds()
2943                     {
2944                         import std.typecons : tuple;
2945                         return tuple(args[1 .. functions.length+1]);
2946                     }
2947                 }
2948 
2949                 static if (Args.length == 1 + functions.length)
2950                 {
2951                     // The range and the seeds
2952                     return reduce!functions(seeds, range);
2953                 }
2954                 else static if (Args.length == 1 + functions.length + 1)
2955                 {
2956                     // The range, the seeds, and the work unit size
2957                     static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type");
2958                     return reduce!functions(seeds, range, args[$-1]);
2959                 }
2960             }
2961             else
2962             {
2963                 import std.conv : text;
2964                 static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, "
2965                               ~ functions.length.text ~ " optional seed(s), and an optional work unit size.");
2966             }
2967         }
2968     }
2969 
2970     // This test is not included in the documentation because even though these
2971     // examples are for the inner fold() template, with their current location,
2972     // they would appear under the outer one. (We can't move this inside the
2973     // outer fold() template because then dmd runs out of memory possibly due to
2974     // recursive template instantiation, which is surprisingly not caught.)
2975     @system unittest
2976     {
2977         // Just the range
2978         auto x = taskPool.fold!"a + b"([1, 2, 3, 4]);
2979         assert(x == 10);
2980 
2981         // The range and the seeds (0 and 1 below; also note multiple
2982         // functions in this example)
2983         auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1);
2984         assert(y[0] == 10);
2985         assert(y[1] == 24);
2986 
2987         // The range, the seed (0), and the work unit size (20)
2988         auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20);
2989         assert(z == 10);
2990     }
2991 
2992     /**
2993     Gets the index of the current thread relative to this `TaskPool`.  Any
2994     thread not in this pool will receive an index of 0.  The worker threads in
2995     this pool receive unique indices of 1 through `this.size`.
2996 
2997     This function is useful for maintaining worker-local resources.
2998 
2999     Example:
3000     ---
3001     // Execute a loop that computes the greatest common
3002     // divisor of every number from 0 through 999 with
3003     // 42 in parallel.  Write the results out to
3004     // a set of files, one for each thread.  This allows
3005     // results to be written out without any synchronization.
3006 
3007     import std.conv, std.range, std.numeric, std.stdio;
3008 
3009     void main()
3010     {
        auto fileHandles = new File[taskPool.size + 1];
3012         scope(exit) {
3013             foreach (ref handle; fileHandles)
3014             {
3015                 handle.close();
3016             }
3017         }
3018 
3019         foreach (i, ref handle; fileHandles)
3020         {
            handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
3022         }
3023 
3024         foreach (num; parallel(iota(1_000)))
3025         {
3026             auto outHandle = fileHandles[taskPool.workerIndex];
3027             outHandle.writeln(num, '\t', gcd(num, 42));
3028         }
3029     }
3030     ---
3031     */
3032     size_t workerIndex() @property @safe const nothrow
3033     {
3034         immutable rawInd = threadIndex;
3035         return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
3036                 (rawInd - instanceStartIndex + 1) : 0;
3037     }
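
    // A minimal sketch:  workerIndex is 0 in this (non-pool) thread; inside
    // the loop it is at most size, since work units may run on the pool's
    // workers (indices 1 .. size) or on this thread (index 0).
    @system unittest
    {
        import std.range : iota;

        assert(taskPool.workerIndex == 0);

        foreach (i; taskPool.parallel(iota(1_000)))
        {
            assert(taskPool.workerIndex <= taskPool.size);
        }
    }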
3038 
3039     /**
3040     Struct for creating worker-local storage.  Worker-local storage is
3041     thread-local storage that exists only for worker threads in a given
3042     `TaskPool` plus a single thread outside the pool.  It is allocated on the
3043     garbage collected heap in a way that avoids _false sharing, and doesn't
3044     necessarily have global scope within any thread.  It can be accessed from
3045     any worker thread in the `TaskPool` that created it, and one thread
3046     outside this `TaskPool`.  All threads outside the pool that created a
3047     given instance of worker-local storage share a single slot.
3048 
3049     Since the underlying data for this struct is heap-allocated, this struct
3050     has reference semantics when passed between functions.
3051 
    The main use cases for `WorkerLocalStorage` are:
3053 
3054     1.  Performing parallel reductions with an imperative, as opposed to
3055         functional, programming style.  In this case, it's useful to treat
3056         `WorkerLocalStorage` as local to each thread for only the parallel
3057         portion of an algorithm.
3058 
3059     2.  Recycling temporary buffers across iterations of a parallel foreach loop.
3060 
3061     Example:
3062     ---
3063     // Calculate pi as in our synopsis example, but
3064     // use an imperative instead of a functional style.
3065     immutable n = 1_000_000_000;
3066     immutable delta = 1.0L / n;
3067 
3068     auto sums = taskPool.workerLocalStorage(0.0L);
3069     foreach (i; parallel(iota(n)))
3070     {
3071         immutable x = ( i - 0.5L ) * delta;
3072         immutable toAdd = delta / ( 1.0 + x * x );
3073         sums.get += toAdd;
3074     }
3075 
3076     // Add up the results from each worker thread.
3077     real pi = 0;
3078     foreach (threadResult; sums.toRange)
3079     {
3080         pi += 4.0L * threadResult;
3081     }
3082     ---
3083      */
3084     static struct WorkerLocalStorage(T)
3085     {
3086     private:
3087         TaskPool pool;
3088         size_t size;
3089 
3090         size_t elemSize;
3091         bool* stillThreadLocal;
3092 
3093         static size_t roundToLine(size_t num) pure nothrow
3094         {
3095             if (num % cacheLineSize == 0)
3096             {
3097                 return num;
3098             }
3099             else
3100             {
3101                 return ((num / cacheLineSize) + 1) * cacheLineSize;
3102             }
3103         }
3104 
3105         void* data;
3106 
3107         void initialize(TaskPool pool)
3108         {
3109             this.pool = pool;
3110             size = pool.size + 1;
3111             stillThreadLocal = new bool;
3112             *stillThreadLocal = true;
3113 
3114             // Determines whether the GC should scan the array.
3115             auto blkInfo = (typeid(T).flags & 1) ?
3116                            cast(GC.BlkAttr) 0 :
3117                            GC.BlkAttr.NO_SCAN;
3118 
3119             immutable nElem = pool.size + 1;
3120             elemSize = roundToLine(T.sizeof);
3121 
3122             // The + 3 is to pad one full cache line worth of space on either side
3123             // of the data structure to make sure false sharing with completely
3124             // unrelated heap data is prevented, and to provide enough padding to
3125             // make sure that data is cache line-aligned.
3126             data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
3127 
3128             // Cache line align data ptr.
3129             data = cast(void*) roundToLine(cast(size_t) data);
3130 
3131             foreach (i; 0 .. nElem)
3132             {
3133                 this.opIndex(i) = T.init;
3134             }
3135         }
3136 
3137         ref opIndex(this Qualified)(size_t index)
3138         {
3139             import std.conv : text;
            assert(index < size,
                   text("index ", index, " out of bounds for size ", size));
3141             return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index));
3142         }
3143 
3144         void opIndexAssign(T val, size_t index)
3145         {
3146             assert(index < size);
3147             *(cast(T*) (data + elemSize * index)) = val;
3148         }
3149 
3150     public:
3151         /**
3152         Get the current thread's instance.  Returns by ref.
3153         Note that calling `get` from any thread
3154         outside the `TaskPool` that created this instance will return the
3155         same reference, so an instance of worker-local storage should only be
3156         accessed from one thread outside the pool that created it.  If this
3157         rule is violated, undefined behavior will result.
3158 
3159         If `toRange` has been called, this `WorkerLocalStorage` instance is
3160         no longer worker-local, and calling this method will result in an
3161         assertion failure.  For performance reasons, this check is skipped
3162         when assertions are disabled.
3163          */
3164         ref get(this Qualified)() @property
3165         {
3166             assert(*stillThreadLocal,
3167                 "Cannot call get() on this instance of WorkerLocalStorage " ~
3168                 "because it is no longer worker-local."
3169             );
3170             return opIndex(pool.workerIndex);
3171         }
3172 
3173         /**
3174         Assign a value to the current thread's instance.  This function has
3175         the same caveats as the getter overload of `get`.
3176         */
3177         void get(T val) @property
3178         {
3179             assert(*stillThreadLocal,
3180                 "Cannot call get() on this instance of WorkerLocalStorage " ~
3181                 "because it is no longer worker-local."
3182             );
3183 
3184             opIndexAssign(val, pool.workerIndex);
3185         }
3186 
3187         /**
3188         Returns a range view of the values for all threads, which can be used
3189         to further process the results of each thread after running the parallel
3190         part of your algorithm.  Do not use this method in the parallel portion
3191         of your algorithm.
3192 
3193         Calling this function sets a flag indicating that this struct is no
3194         longer worker-local, and attempting to use the `get` method again
3195         will result in an assertion failure if assertions are enabled.
3196          */
3197         WorkerLocalStorageRange!T toRange() @property
3198         {
3199             if (*stillThreadLocal)
3200             {
3201                 *stillThreadLocal = false;
3202 
3203                 // Make absolutely sure results are visible to all threads.
3204                 // This is probably not necessary since some other
3205                 // synchronization primitive will be used to signal that the
3206                 // parallel part of the algorithm is done, but the
3207                 // performance impact should be negligible, so it's better
3208                 // to be safe.
3209                 ubyte barrierDummy;
3210                 atomicSetUbyte(barrierDummy, 1);
3211             }
3212 
3213             return WorkerLocalStorageRange!T(this);
3214         }
3215     }
3216 
3217     /**
3218     Range primitives for worker-local storage.  The purpose of this is to
3219     access results produced by each worker thread from a single thread once you
3220     are no longer using the worker-local storage from multiple threads.
3221     Do not use this struct in the parallel portion of your algorithm.
3222 
3223     The proper way to instantiate this object is to call
3224     `WorkerLocalStorage.toRange`.  Once instantiated, this object behaves
3225     as a finite random-access range with assignable, lvalue elements and a
3226     length equal to the number of worker threads in the `TaskPool` that created
3227     it plus 1 (the extra slot is the one shared by threads outside the pool).
3228      */
3229     static struct WorkerLocalStorageRange(T)
3230     {
3231     private:
3232         WorkerLocalStorage!T workerLocalStorage;
3233 
3234         size_t _length;
3235         size_t beginOffset;
3236 
3237         this(WorkerLocalStorage!T wl)
3238         {
3239             this.workerLocalStorage = wl;
3240             _length = wl.size;
3241         }
3242 
3243     public:
3244         ref front(this Qualified)() @property
3245         {
3246             return this[0];
3247         }
3248 
3249         ref back(this Qualified)() @property
3250         {
3251             return this[_length - 1];
3252         }
3253 
3254         void popFront()
3255         {
3256             if (_length > 0)
3257             {
3258                 beginOffset++;
3259                 _length--;
3260             }
3261         }
3262 
3263         void popBack()
3264         {
3265             if (_length > 0)
3266             {
3267                 _length--;
3268             }
3269         }
3270 
3271         typeof(this) save() @property
3272         {
3273             return this;
3274         }
3275 
3276         ref opIndex(this Qualified)(size_t index)
3277         {
3278             assert(index < _length);
3279             return workerLocalStorage[index + beginOffset];
3280         }
3281 
3282         void opIndexAssign(T val, size_t index)
3283         {
3284             assert(index < _length);
3285             workerLocalStorage[index + beginOffset] = val;
3286         }
3287 
3288         typeof(this) opSlice(size_t lower, size_t upper)
3289         {
3290             assert(upper <= _length);
3291             auto newWl = this.workerLocalStorage;
3292             newWl.data += lower * newWl.elemSize;
3293             newWl.size = upper - lower;
3294             return typeof(this)(newWl);
3295         }
3296 
3297         bool empty() const @property
3298         {
3299             return length == 0;
3300         }
3301 
3302         size_t length() const @property
3303         {
3304             return _length;
3305         }
3306     }
3307 
3308     /**
3309     Creates an instance of worker-local storage, initialized with a given
3310     value.  The value is `lazy` so that you can, for example, easily
3311     create one instance of a class for each worker.  For usage example,
3312     see the `WorkerLocalStorage` struct.
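    Below is a short sketch of why the laziness matters:  because the
    initializer is re-evaluated once per slot, each worker thread here
    receives its own independently seeded random number generator.
    ---
    import std.random : Random, unpredictableSeed;

    // `unpredictableSeed` is evaluated separately for every slot.
    auto rngs = taskPool.workerLocalStorage(Random(unpredictableSeed));
    ---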
3313      */
3314     WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
3315     {
3316         WorkerLocalStorage!T ret;
3317         ret.initialize(this);
3318         foreach (i; 0 .. size + 1)
3319         {
3320             ret[i] = initialVal;
3321         }
3322 
3323         // Memory barrier to make absolutely sure that what we wrote is
3324         // visible to worker threads.
3325         ubyte barrierDummy;
3326         atomicSetUbyte(barrierDummy, 0);
3327 
3328         return ret;
3329     }
3330 
3331     /**
3332     Signals all worker threads to terminate as soon as they are finished
3333     with their current `Task`, or immediately if they are not executing a
3334     `Task`.  `Task`s that were in the queue will not be executed unless
3335     a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce`
3336     causes them to be executed.
3337 
3338     Use only if you have waited on every `Task` and therefore know the
3339     queue is empty, or if you speculatively executed some tasks and no longer
3340     need the results.
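    Example (a sketch; `someFun` stands in for a function of your own):
    ---
    auto pool = new TaskPool();
    auto t = task!someFun();
    pool.put(t);
    t.yieldForce;  // After this, the queue is known to be empty.
    pool.stop();
    ---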
3341      */
3342     void stop() @trusted
3343     {
3344         queueLock();
3345         scope(exit) queueUnlock();
3346         atomicSetUbyte(status, PoolState.stopNow);
3347         notifyAll();
3348     }
3349 
3350     /**
3351     Signals worker threads to terminate when the queue becomes empty.
3352 
3353     If the `blocking` argument is true, this function waits for all worker
3354     threads to terminate before returning.  This option might be used in
3355     applications where task results are never consumed, e.g. when `TaskPool`
3356     is employed as a rudimentary scheduler for tasks that communicate by
3357     means other than return values.
3358 
3359     Warning:  Calling this function with `blocking = true` from a worker
3360               thread that is a member of the same `TaskPool` that
3361               `finish` is being called on will result in a deadlock.
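    Example (a sketch; `someFun` stands in for a function of your own):
    ---
    auto pool = new TaskPool();
    pool.put(task!someFun());
    pool.finish(true);  // Block until all worker threads have terminated.
    ---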
3362      */
3363     void finish(bool blocking = false) @trusted
3364     {
3365         {
3366             queueLock();
3367             scope(exit) queueUnlock();
3368             atomicCasUbyte(status, PoolState.running, PoolState.finishing);
3369             notifyAll();
3370         }
3371         if (blocking)
3372         {
3373             // Use this thread as a worker until everything is finished.
3374             executeWorkLoop();
3375 
3376             foreach (t; pool)
3377             {
3378                 // Maybe there should be something here to prevent a thread
3379                 // from calling join() on itself if this function is called
3380                 // from a worker thread in the same pool, but:
3381                 //
3382                 // 1.  Using an if statement to skip join() would result in
3383                 //     finish() returning without all tasks being finished.
3384                 //
3385                 // 2.  If an exception were thrown, it would bubble up to the
3386                 //     Task from which finish() was called and likely be
3387                 //     swallowed.
3388                 t.join();
3389             }
3390         }
3391     }
3392 
3393     /// Returns the number of worker threads in the pool.
3394     @property size_t size() @safe const pure nothrow
3395     {
3396         return pool.length;
3397     }
3398 
3399     /**
3400     Put a `Task` object on the back of the task queue.  The `Task`
3401     object may be passed by pointer or reference.
3402 
3403     Example:
3404     ---
3405     import std.file;
3406 
3407     // Create a task.
3408     auto t = task!read("foo.txt");
3409 
3410     // Add it to the queue to be executed.
3411     taskPool.put(t);
3412     ---
3413 
3414     Notes:
3415 
3416     @trusted overloads of this function are called for `Task`s if
3417     $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s
3418     return type or the function the `Task` executes is `pure`.
3419     `Task` objects that meet all other requirements specified in the
3420     `@trusted` overloads of `task` and `scopedTask` may be created
3421     and executed from `@safe` code via `Task.executeInNewThread` but
3422     not via `TaskPool`.
3423 
3424     While this function takes the address of variables that may
3425     be on the stack, some overloads are marked as @trusted.
3426     `Task` includes a destructor that waits for the task to complete
3427     before destroying the stack frame it is allocated on.  Therefore,
3428     it is impossible for the stack frame to be destroyed before the task is
3429     complete and no longer referenced by a `TaskPool`.
3430     */
3431     void put(alias fun, Args...)(ref Task!(fun, Args) task)
3432     if (!isSafeReturn!(typeof(task)))
3433     {
3434         task.pool = this;
3435         abstractPut(task.basePtr);
3436     }
3437 
3438     /// Ditto
3439     void put(alias fun, Args...)(Task!(fun, Args)* task)
3440     if (!isSafeReturn!(typeof(*task)))
3441     {
3442         import std.exception : enforce;
3443         enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
3444         put(*task);
3445     }
3446 
3447     @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
3448     if (isSafeReturn!(typeof(task)))
3449     {
3450         task.pool = this;
3451         abstractPut(task.basePtr);
3452     }
3453 
3454     @trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
3455     if (isSafeReturn!(typeof(*task)))
3456     {
3457         import std.exception : enforce;
3458         enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
3459         put(*task);
3460     }
3461 
3462     /**
3463     These properties control whether the worker threads are daemon threads.
3464     A daemon thread is automatically terminated when all non-daemon threads
3465     have terminated.  A non-daemon thread prevents a program from terminating
3466     until the thread itself has terminated.
3467 
3468     If any `TaskPool` with non-daemon threads is active, either `stop`
3469     or `finish` must be called on it before the program can terminate.
3470 
3471     The worker threads in the `TaskPool` instance returned by the
3472     `taskPool` property are daemon by default.  The worker threads of
3473     manually instantiated task pools are non-daemon by default.
3474 
3475     Note:  For a size zero pool, the getter arbitrarily returns true and the
3476            setter has no effect.
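    Example:
    ---
    auto pool = new TaskPool(2);

    // The program may now terminate without calling stop() or
    // finish() on this pool.
    pool.isDaemon = true;
    ---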
3477     */
3478     bool isDaemon() @property @trusted
3479     {
3480         queueLock();
3481         scope(exit) queueUnlock();
3482         return (size == 0) ? true : pool[0].isDaemon;
3483     }
3484 
3485     /// Ditto
3486     void isDaemon(bool newVal) @property @trusted
3487     {
3488         queueLock();
3489         scope(exit) queueUnlock();
3490         foreach (thread; pool)
3491         {
3492             thread.isDaemon = newVal;
3493         }
3494     }
3495 
3496     /**
3497     These functions allow getting and setting the OS scheduling priority of
3498     the worker threads in this `TaskPool`.  They forward to
3499     `core.thread.Thread.priority`, so a given priority value here means the
3500     same thing as an identical priority value in `core.thread`.
3501 
3502     Note:  For a size zero pool, the getter arbitrarily returns
3503            `core.thread.Thread.PRIORITY_MIN` and the setter has no effect.
3504     */
3505     int priority() @property @trusted
3506     {
3507         return (size == 0) ? core.thread.Thread.PRIORITY_MIN
3508                              : pool[0].priority;
3509     }
3510 
3511     /// Ditto
3512     void priority(int newPriority) @property @trusted
3513     {
3514         if (size > 0)
3515         {
3516             foreach (t; pool)
3517             {
3518                 t.priority = newPriority;
3519             }
3520         }
3521     }
3522 }
3523 
3524 @system unittest
3525 {
3526     import std.algorithm.iteration : sum;
3527     import std.range : iota;
3528     import std.typecons : tuple;
3529 
3530     enum N = 100;
3531     auto r = iota(1, N + 1);
3532     const expected = r.sum();
3533 
3534     // Just the range
3535     assert(taskPool.fold!"a + b"(r) == expected);
3536 
3537     // Range and seeds
3538     assert(taskPool.fold!"a + b"(r, 0) == expected);
3539     assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected));
3540 
3541     // Range, seeds, and work unit size
3542     assert(taskPool.fold!"a + b"(r, 0, 42) == expected);
3543     assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected));
3544 }
3545 
3546 // https://issues.dlang.org/show_bug.cgi?id=16705
3547 @system unittest
3548 {
3549     struct MyIota
3550     {
3551         size_t front;
3552         void popFront()() { front++; }
3553         auto empty() { return front >= 25; }
3554         auto opIndex(size_t i) { return front + i; }
3555         auto length() { return 25 - front; }
3556     }
3557 
3558     auto mySum = taskPool.reduce!"a + b"(MyIota());
3559 }
3560 
3561 /**
3562 Returns a lazily initialized global instance of `TaskPool`.
3563 This function can safely be called concurrently from multiple non-worker
3564 threads.  The worker threads in this pool are daemon threads, meaning that it
3565 is not necessary to call `TaskPool.stop` or `TaskPool.finish` before
3566 terminating the main thread.
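A small usage sketch:
---
import std.parallelism, std.range;

// The pool is created lazily on the first use of `taskPool`.
auto squares = taskPool.amap!"a * a"(iota(10));
assert(squares == [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]);
---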
3567 */
3568 @property TaskPool taskPool() @trusted
3569 {
3570     import std.concurrency : initOnce;
3571     __gshared TaskPool pool;
3572     return initOnce!pool({
3573         auto p = new TaskPool(defaultPoolThreads);
3574         p.isDaemon = true;
3575         return p;
3576     }());
3577 }
3578 
3579 private shared uint _defaultPoolThreads = uint.max;
3580 
3581 /**
3582 These properties get and set the number of worker threads in the `TaskPool`
3583 instance returned by `taskPool`.  The default value is `totalCPUs` - 1.
3584 Calling the setter after the first call to `taskPool` does not change the
3585 number of worker threads in the instance returned by `taskPool`.
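A sketch of the ordering constraint, assuming `taskPool` has not yet been
used anywhere in the program:
---
import std.parallelism;

defaultPoolThreads = 2;  // Takes effect only because taskPool is unused so far.
auto pool = taskPool;    // Created with 2 worker threads.
assert(pool.size == 2);
---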
3586 */
3587 @property uint defaultPoolThreads() @trusted
3588 {
3589     const local = atomicLoad(_defaultPoolThreads);
3590     return local < uint.max ? local : totalCPUs - 1;
3591 }
3592 
3593 /// Ditto
3594 @property void defaultPoolThreads(uint newVal) @trusted
3595 {
3596     atomicStore(_defaultPoolThreads, newVal);
3597 }
3598 
3599 /**
3600 Convenience functions that forward to `taskPool.parallel`.  The
3601 purpose of these is to make parallel foreach less verbose and more
3602 readable.
3603 
3604 Example:
3605 ---
3606 // Find the logarithm of every number from
3607 // 1 to 1_000_000 in parallel, using the
3608 // default TaskPool instance.
3609 auto logs = new double[1_000_000];
3610 
3611 foreach (i, ref elem; parallel(logs))
3612 {
3613     elem = log(i + 1.0);
3614 }
3615 ---
3616 
3617 */
3618 ParallelForeach!R parallel(R)(R range)
3619 {
3620     return taskPool.parallel(range);
3621 }
3622 
3623 /// Ditto
3624 ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
3625 {
3626     return taskPool.parallel(range, workUnitSize);
3627 }
3628 
3629 //  `each` should be usable with parallel
3630 // https://issues.dlang.org/show_bug.cgi?id=17019
3631 @system unittest
3632 {
3633     import std.algorithm.iteration : each, sum;
3634     import std.range : iota;
3635 
3636     // check behavior with parallel
3637     auto arr = new int[10];
3638     parallel(arr).each!((ref e) => e += 1);
3639     assert(arr.sum == 10);
3640 
3641     auto arrIndex = new int[10];
3642     parallel(arrIndex).each!((i, ref e) => e += i);
3643     assert(arrIndex.sum == 10.iota.sum);
3644 }
3645 
3646 // https://issues.dlang.org/show_bug.cgi?id=22745
3647 @system unittest
3648 {
3649     auto pool = new TaskPool(0);
3650     int[] empty;
3651     foreach (i; pool.parallel(empty)) {}
3652     pool.finish();
3653 }
3654 
3655 // Thrown when a parallel foreach loop is broken from.
3656 class ParallelForeachError : Error
3657 {
3658     this()
3659     {
3660         super("Cannot break from a parallel foreach loop using break, return, "
3661               ~ "labeled break/continue or goto statements.");
3662     }
3663 }
3664 
3665 /*------Structs that implement opApply for parallel foreach.------------------*/
3666 private template randLen(R)
3667 {
3668     enum randLen = isRandomAccessRange!R && hasLength!R;
3669 }
3670 
3671 private void submitAndExecute(
3672     TaskPool pool,
3673     scope void delegate() doIt
3674 )
3675 {
3676     import core.exception : OutOfMemoryError;
3677     immutable nThreads = pool.size + 1;
3678 
3679     alias PTask = typeof(scopedTask(doIt));
3680     import core.stdc.stdlib : malloc, free;
3681     import core.stdc.string : memcpy;
3682 
3683     // The logical thing to do would be to just use alloca() here, but that
3684     // causes problems on Windows for reasons that I don't understand
3685     // (tentatively a compiler bug) and definitely doesn't work on Posix due
3686     // to https://issues.dlang.org/show_bug.cgi?id=3753.
3687     // Therefore, allocate a fixed buffer and fall back to `malloc()` if
3688     // someone's using a ridiculous number of threads.
3689     // Also, a byte array is used instead of a PTask array for the fixed
3690     // buffer to prevent destructors from being called on uninitialized
3691     // excess PTask instances.
3692     enum nBuf = 64;
3693     byte[nBuf * PTask.sizeof] buf = void;
3694     PTask[] tasks;
3695     if (nThreads <= nBuf)
3696     {
3697         tasks = (cast(PTask*) buf.ptr)[0 .. nThreads];
3698     }
3699     else
3700     {
3701         auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof);
3702         if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism.");
3703         tasks = ptr[0 .. nThreads];
3704     }
3705 
3706     scope(exit)
3707     {
3708         if (nThreads > nBuf)
3709         {
3710             free(tasks.ptr);
3711         }
3712     }
3713 
3714     foreach (ref t; tasks)
3715     {
3718         // This silly looking code is necessary to prevent d'tors from being
3719         // called on uninitialized objects.
3720         auto temp = scopedTask(doIt);
3721         memcpy(&t, &temp, PTask.sizeof);
3722 
3723         // This has to be done to t after copying, not temp before copying.
3724         // Otherwise, temp's destructor will sit here and wait for the
3725         // task to finish.
3726         t.pool = pool;
3727     }
3728 
3729     foreach (i; 1 .. tasks.length - 1)
3730     {
3731         tasks[i].next = tasks[i + 1].basePtr;
3732         tasks[i + 1].prev = tasks[i].basePtr;
3733     }
3734 
3735     if (tasks.length > 1)
3736     {
3737         pool.queueLock();
3738         scope(exit) pool.queueUnlock();
3739 
3740         pool.abstractPutGroupNoSync(
3741             tasks[1].basePtr,
3742             tasks[$ - 1].basePtr
3743         );
3744     }
3745 
3746     if (tasks.length > 0)
3747     {
3748         try
3749         {
3750             tasks[0].job();
3751         }
3752         catch (Throwable e)
3753         {
3754             tasks[0].exception = e; // nocoverage
3755         }
3756         tasks[0].taskStatus = TaskStatus.done;
3757 
3758         // Try to execute each of these in the current thread
3759         foreach (ref task; tasks[1..$])
3760         {
3761             pool.tryDeleteExecute(task.basePtr);
3762         }
3763     }
3764 
3765     Throwable firstException;
3766 
3767     foreach (i, ref task; tasks)
3768     {
3769         try
3770         {
3771             task.yieldForce;
3772         }
3773         catch (Throwable e)
3774         {
3775             /* Chain e to front because order doesn't matter and because
3776              * e is not likely to be a chain itself (so fewer traversals)
3777              */
3778             firstException = Throwable.chainTogether(e, firstException);
3779             continue;
3780         }
3781     }
3782 
3783     if (firstException) throw firstException;
3784 }
3785 
3786 void foreachErr()
3787 {
3788     throw new ParallelForeachError();
3789 }
3790 
3791 int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg)
3792 {
3793     with(p)
3794     {
3795         int res = 0;
3796         size_t index = 0;
3797 
3798         // The explicit ElementType!R in the foreach loops is necessary for
3799         // correct behavior when iterating over strings.
3800         static if (hasLvalueElements!R)
3801         {
3802             foreach (ref ElementType!R elem; range)
3803             {
3804                 static if (Parameters!dg.length == 2)
3805                 {
3806                     res = dg(index, elem);
3807                 }
3808                 else
3809                 {
3810                     res = dg(elem);
3811                 }
3812                 if (res) break;
3813                 index++;
3814             }
3815         }
3816         else
3817         {
3818             foreach (ElementType!R elem; range)
3819             {
3820                 static if (Parameters!dg.length == 2)
3821                 {
3822                     res = dg(index, elem);
3823                 }
3824                 else
3825                 {
3826                     res = dg(elem);
3827                 }
3828                 if (res) break;
3829                 index++;
3830             }
3831         }
3832         if (res) foreachErr();
3833         return res;
3834     }
3835 }
3836 
3837 private enum string parallelApplyMixinRandomAccess = q{
3838     // Handle empty thread pool as special case.
3839     if (pool.size == 0)
3840     {
3841         return doSizeZeroCase(this, dg);
3842     }
3843 
3844     // Whether iteration is with or without an index variable.
3845     enum withIndex = Parameters!(typeof(dg)).length == 2;
3846 
3847     shared size_t workUnitIndex = size_t.max;  // Effectively -1:  workUnitIndex + 1 == 0
3848     immutable len = range.length;
3849     if (!len) return 0;
3850 
3851     shared bool shouldContinue = true;
3852 
3853     void doIt()
3854     {
3855         import std.algorithm.comparison : min;
3856 
3857         scope(failure)
3858         {
3859             // If an exception is thrown, all threads should bail.
3860             atomicStore(shouldContinue, false);
3861         }
3862 
3863         while (atomicLoad(shouldContinue))
3864         {
3865             immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
3866             immutable start = workUnitSize * myUnitIndex;
3867             if (start >= len)
3868             {
3869                 atomicStore(shouldContinue, false);
3870                 break;
3871             }
3872 
3873             immutable end = min(len, start + workUnitSize);
3874 
3875             foreach (i; start .. end)
3876             {
3877                 static if (withIndex)
3878                 {
3879                     if (dg(i, range[i])) foreachErr();
3880                 }
3881                 else
3882                 {
3883                     if (dg(range[i])) foreachErr();
3884                 }
3885             }
3886         }
3887     }
3888 
3889     submitAndExecute(pool, &doIt);
3890 
3891     return 0;
3892 };
3893 
3894 private enum string parallelApplyMixinInputRange = q{
3895     // Handle empty thread pool as special case.
3896     if (pool.size == 0)
3897     {
3898         return doSizeZeroCase(this, dg);
3899     }
3900 
3901     // Whether iteration is with or without an index variable.
3902     enum withIndex = Parameters!(typeof(dg)).length == 2;
3903 
3904     // This protects the range while copying it.
3905     auto rangeMutex = new Mutex();
3906 
3907     shared bool shouldContinue = true;
3908 
3909     // The total number of elements that have been popped off range.
3910     // This is updated only while protected by rangeMutex;
3911     size_t nPopped = 0;
3912 
3913     static if (
3914         is(typeof(range.buf1)) &&
3915         is(typeof(range.bufPos)) &&
3916         is(typeof(range.doBufSwap()))
3917     )
3918     {
3919         // Make sure we don't have the buffer recycling overload of
3920         // asyncBuf.
3921         static if (
3922             is(typeof(range.source)) &&
3923             isRoundRobin!(typeof(range.source))
3924         )
3925         {
3926             static assert(0, "Cannot execute a parallel foreach loop on " ~
3927             "the buffer recycling overload of asyncBuf.");
3928         }
3929 
3930         enum bool bufferTrick = true;
3931     }
3932     else
3933     {
3934         enum bool bufferTrick = false;
3935     }
3936 
3937     void doIt()
3938     {
3939         scope(failure)
3940         {
3941             // If an exception is thrown, all threads should bail.
3942             atomicStore(shouldContinue, false);
3943         }
3944 
3945         static if (hasLvalueElements!R)
3946         {
3947             alias Temp = ElementType!R*[];
3948             Temp temp;
3949 
3950             // Returns:  The previous value of nPopped.
3951             size_t makeTemp()
3952             {
3953                 import std.algorithm.internal : addressOf;
3954                 import std.array : uninitializedArray;
3955 
3956                 if (temp is null)
3957                 {
3958                     temp = uninitializedArray!Temp(workUnitSize);
3959                 }
3960 
3961                 rangeMutex.lock();
3962                 scope(exit) rangeMutex.unlock();
3963 
3964                 size_t i = 0;
3965                 for (; i < workUnitSize && !range.empty; range.popFront(), i++)
3966                 {
3967                     temp[i] = addressOf(range.front);
3968                 }
3969 
3970                 temp = temp[0 .. i];
3971                 auto ret = nPopped;
3972                 nPopped += temp.length;
3973                 return ret;
3974             }
3975 
3976         }
3977         else
3978         {
3979 
3980             alias Temp = ElementType!R[];
3981             Temp temp;
3982 
3983             // Returns:  The previous value of nPopped.
3984             static if (!bufferTrick) size_t makeTemp()
3985             {
3986                 import std.array : uninitializedArray;
3987 
3988                 if (temp is null)
3989                 {
3990                     temp = uninitializedArray!Temp(workUnitSize);
3991                 }
3992 
3993                 rangeMutex.lock();
3994                 scope(exit) rangeMutex.unlock();
3995 
3996                 size_t i = 0;
3997                 for (; i < workUnitSize && !range.empty; range.popFront(), i++)
3998                 {
3999                     temp[i] = range.front;
4000                 }
4001 
4002                 temp = temp[0 .. i];
4003                 auto ret = nPopped;
4004                 nPopped += temp.length;
4005                 return ret;
4006             }
4007 
4008             static if (bufferTrick) size_t makeTemp()
4009             {
4010                 import std.algorithm.mutation : swap;
4011                 rangeMutex.lock();
4012                 scope(exit) rangeMutex.unlock();
4013 
4014                 // Elide copying by just swapping buffers.
4015                 temp.length = range.buf1.length;
4016                 swap(range.buf1, temp);
4017 
4018                 // This is necessary in case popFront() has been called on
4019                 // range before entering the parallel foreach loop.
4020                 temp = temp[range.bufPos..$];
4021 
4022                 static if (is(typeof(range._length)))
4023                 {
4024                     range._length -= (temp.length - range.bufPos);
4025                 }
4026 
4027                 range.doBufSwap();
4028                 auto ret = nPopped;
4029                 nPopped += temp.length;
4030                 return ret;
4031             }
4032         }
4033 
4034         while (atomicLoad(shouldContinue))
4035         {
4036             auto overallIndex = makeTemp();
4037             if (temp.empty)
4038             {
4039                 atomicStore(shouldContinue, false);
4040                 break;
4041             }
4042 
4043             foreach (i; 0 .. temp.length)
4044             {
4045                 scope(success) overallIndex++;
4046 
4047                 static if (hasLvalueElements!R)
4048                 {
4049                     static if (withIndex)
4050                     {
4051                         if (dg(overallIndex, *temp[i])) foreachErr();
4052                     }
4053                     else
4054                     {
4055                         if (dg(*temp[i])) foreachErr();
4056                     }
4057                 }
4058                 else
4059                 {
4060                     static if (withIndex)
4061                     {
4062                         if (dg(overallIndex, temp[i])) foreachErr();
4063                     }
4064                     else
4065                     {
4066                         if (dg(temp[i])) foreachErr();
4067                     }
4068                 }
4069             }
4070         }
4071     }
4072 
4073     submitAndExecute(pool, &doIt);
4074 
4075     return 0;
4076 };
4077 
4078 
4079 private struct ParallelForeach(R)
4080 {
4081     TaskPool pool;
4082     R range;
4083     size_t workUnitSize;
4084     alias E = ElementType!R;
4085 
4086     static if (hasLvalueElements!R)
4087     {
4088         alias NoIndexDg = int delegate(ref E);
4089         alias IndexDg = int delegate(size_t, ref E);
4090     }
4091     else
4092     {
4093         alias NoIndexDg = int delegate(E);
4094         alias IndexDg = int delegate(size_t, E);
4095     }
4096 
4097     int opApply(scope NoIndexDg dg)
4098     {
4099         static if (randLen!R)
4100         {
4101             mixin(parallelApplyMixinRandomAccess);
4102         }
4103         else
4104         {
4105             mixin(parallelApplyMixinInputRange);
4106         }
4107     }
4108 
4109     int opApply(scope IndexDg dg)
4110     {
4111         static if (randLen!R)
4112         {
4113             mixin(parallelApplyMixinRandomAccess);
4114         }
4115         else
4116         {
4117             mixin(parallelApplyMixinInputRange);
4118         }
4119     }
4120 }
4121 
4122 /*
4123 This struct buffers the output of a callable that outputs data into a
4124 user-supplied buffer into a set of buffers of some fixed size.  It allows these
4125 buffers to be accessed with an input range interface.  This is used internally
4126 in the buffer-recycling overload of TaskPool.asyncBuf, which creates an
4127 instance and forwards it to the input range overload of asyncBuf.
4128 */
4129 private struct RoundRobinBuffer(C1, C2)
4130 {
4131     // No need for constraints because they're already checked for in asyncBuf.
4132 
4133     alias Array = Parameters!(C1.init)[0];
4134     alias T = typeof(Array.init[0]);
4135 
4136     T[][] bufs;
4137     size_t index;
4138     C1 nextDel;
4139     C2 emptyDel;
4140     bool _empty;
4141     bool primed;
4142 
4143     this(
4144         C1 nextDel,
4145         C2 emptyDel,
4146         size_t initialBufSize,
4147         size_t nBuffers
4148     ) {
4149         this.nextDel = nextDel;
4150         this.emptyDel = emptyDel;
4151         bufs.length = nBuffers;
4152 
4153         foreach (ref buf; bufs)
4154         {
4155             buf.length = initialBufSize;
4156         }
4157     }
4158 
4159     void prime()
4160     in
4161     {
4162         assert(!empty);
4163     }
4164     do
4165     {
4166         scope(success) primed = true;
4167         nextDel(bufs[index]);
4168     }
4169 
4170 
4171     T[] front() @property
4172     in
4173     {
4174         assert(!empty);
4175     }
4176     do
4177     {
4178         if (!primed) prime();
4179         return bufs[index];
4180     }
4181 
4182     void popFront()
4183     {
4184         if (empty || emptyDel())
4185         {
4186             _empty = true;
4187             return;
4188         }
4189 
4190         index = (index + 1) % bufs.length;
4191         primed = false;
4192     }
4193 
4194     bool empty() @property const @safe pure nothrow
4195     {
4196         return _empty;
4197     }
4198 }
4199 
4200 version (StdUnittest)
4201 {
4202     // This was the only way I could get nested maps to work.
4203     private __gshared TaskPool poolInstance;
4204 }
4205 
4206 // These test basic functionality but don't stress test for threading bugs.
4207 // These are the tests that should be run every time Phobos is compiled.
4208 @system unittest
4209 {
4210     import std.algorithm.comparison : equal, min, max;
4211     import std.algorithm.iteration : filter, map, reduce;
4212     import std.array : split;
4213     import std.conv : text;
4214     import std.exception : assertThrown;
4215     import std.math.operations : isClose;
4216     import std.math.algebraic : sqrt, abs;
4217     import std.math.exponential : log;
4218     import std.range : indexed, iota, join;
4219     import std.typecons : Tuple, tuple;
4220     import std.stdio;
4221 
4222     poolInstance = new TaskPool(2);
4223     scope(exit) poolInstance.stop();
4224 
4225     // The only way this can be verified is manually.
4226     debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);
4227 
4228     auto oldPriority = poolInstance.priority;
4229     poolInstance.priority = Thread.PRIORITY_MAX;
4230     assert(poolInstance.priority == Thread.PRIORITY_MAX);
4231 
4232     poolInstance.priority = Thread.PRIORITY_MIN;
4233     assert(poolInstance.priority == Thread.PRIORITY_MIN);
4234 
4235     poolInstance.priority = oldPriority;
4236     assert(poolInstance.priority == oldPriority);
4237 
4238     static void refFun(ref uint num)
4239     {
4240         num++;
4241     }
4242 
4243     uint x;
4244 
4245     // Test task().
4246     auto t = task!refFun(x);
4247     poolInstance.put(t);
4248     t.yieldForce;
4249     assert(t.args[0] == 1);
4250 
4251     auto t2 = task(&refFun, x);
4252     poolInstance.put(t2);
4253     t2.yieldForce;
4254     assert(t2.args[0] == 1);
4255 
4256     // Test scopedTask().
4257     auto st = scopedTask!refFun(x);
4258     poolInstance.put(st);
4259     st.yieldForce;
4260     assert(st.args[0] == 1);
4261 
4262     auto st2 = scopedTask(&refFun, x);
4263     poolInstance.put(st2);
4264     st2.yieldForce;
4265     assert(st2.args[0] == 1);
4266 
4267     // Test executeInNewThread().
4268     auto ct = scopedTask!refFun(x);
4269     ct.executeInNewThread(Thread.PRIORITY_MAX);
4270     ct.yieldForce;
4271     assert(ct.args[0] == 1);
4272 
4273     // Test ref return.
4274     uint toInc = 0;
4275     static ref T makeRef(T)(ref T num)
4276     {
4277         return num;
4278     }
4279 
4280     auto t3 = task!makeRef(toInc);
4281     taskPool.put(t3);
4282     assert(t3.args[0] == 0);
4283     t3.spinForce++;
4284     assert(t3.args[0] == 1);
4285 
4286     static void testSafe() @safe {
4287         static int bump(int num)
4288         {
4289             return num + 1;
4290         }
4291 
4292         auto safePool = new TaskPool(0);
4293         auto t = task(&bump, 1);
4294         taskPool.put(t);
4295         assert(t.yieldForce == 2);
4296 
4297         auto st = scopedTask(&bump, 1);
4298         taskPool.put(st);
4299         assert(st.yieldForce == 2);
4300         safePool.stop();
4301     }
4302 
4303     auto arr = [1,2,3,4,5];
4304     auto nums = new uint[5];
4305     auto nums2 = new uint[5];
4306 
4307     foreach (i, ref elem; poolInstance.parallel(arr))
4308     {
4309         elem++;
4310         nums[i] = cast(uint) i + 2;
4311         nums2[i] = elem;
4312     }
4313 
4314     assert(nums == [2,3,4,5,6], text(nums));
4315     assert(nums2 == nums, text(nums2));
4316     assert(arr == nums, text(arr));
4317 
4318     // Test const/immutable arguments.
4319     static int add(int lhs, int rhs)
4320     {
4321         return lhs + rhs;
4322     }
4323     immutable addLhs = 1;
4324     immutable addRhs = 2;
4325     auto addTask = task(&add, addLhs, addRhs);
4326     auto addScopedTask = scopedTask(&add, addLhs, addRhs);
4327     poolInstance.put(addTask);
4328     poolInstance.put(addScopedTask);
4329     assert(addTask.yieldForce == 3);
4330     assert(addScopedTask.yieldForce == 3);
4331 
4332     // Test parallel foreach with non-random access range.
4333     auto range = filter!"a != 666"([0, 1, 2, 3, 4]);
4334 
4335     foreach (i, elem; poolInstance.parallel(range))
4336     {
4337         nums[i] = cast(uint) i;
4338     }
4339 
4340     assert(nums == [0,1,2,3,4]);
4341 
4342     auto logs = new double[1_000_000];
4343     foreach (i, ref elem; poolInstance.parallel(logs))
4344     {
4345         elem = log(i + 1.0);
4346     }
4347 
4348     foreach (i, elem; logs)
4349     {
4350         assert(isClose(elem, log(double(i + 1))));
4351     }
4352 
4353     assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
4354     assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
4355     assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
4356            [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4357 
4358     auto tupleBuf = new Tuple!(int, int)[3];
4359     poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
4360     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4361     poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
4362     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
4363 
4364     // Test amap with a non-array buffer.
4365     auto toIndex = new int[5];
4366     auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
4367     poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
4368     assert(equal(ind, [2, 4, 6, 8, 10]));
4369     assert(equal(toIndex, [8, 4, 10, 2, 6]));
4370     poolInstance.amap!"a / 2"(ind, ind);
4371     assert(equal(ind, [1, 2, 3, 4, 5]));
4372     assert(equal(toIndex, [4, 2, 5, 1, 3]));
4373 
4374     auto buf = new int[5];
4375     poolInstance.amap!"a * a"([1,2,3,4,5], buf);
4376     assert(buf == [1,4,9,16,25]);
4377     poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
4378     assert(buf == [1,4,9,16,25]);
4379 
4380     assert(poolInstance.reduce!"a + b"([1]) == 1);
4381     assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
4382     assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
4383     assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
4384     assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
4385     assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
4386            tuple(10, 24));
4387 
4388     immutable serialAns = reduce!"a + b"(iota(1000));
4389     assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
4390     assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);
4391 
4392     // Test worker-local storage.
4393     auto wl = poolInstance.workerLocalStorage(0);
4394     foreach (i; poolInstance.parallel(iota(1000), 1))
4395     {
4396         wl.get = wl.get + i;
4397     }
4398 
4399     auto wlRange = wl.toRange;
4400     auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
4401     assert(parallelSum == 499500);
4402     assert(wlRange[0 .. 1][0] == wlRange[0]);
4403     assert(wlRange[1 .. 2][0] == wlRange[1]);
4404 
4405     // Test finish()
4406     {
4407         static void slowFun() { Thread.sleep(dur!"msecs"(1)); }
4408 
4409         auto pool1 = new TaskPool();
4410         auto tSlow = task!slowFun();
4411         pool1.put(tSlow);
4412         pool1.finish();
4413         tSlow.yieldForce;
4414         // Can't assert that pool1.status == PoolState.stopNow because status
4415         // doesn't change until after the "done" flag is set and the waiting
4416         // thread is woken up.
4417 
4418         auto pool2 = new TaskPool();
4419         auto tSlow2 = task!slowFun();
4420         pool2.put(tSlow2);
4421         pool2.finish(true); // blocking
4422         assert(tSlow2.done);
4423 
4424         // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero.
4425         auto pool3 = new TaskPool(0);
4426         auto tSlow3 = task!slowFun();
4427         pool3.put(tSlow3);
4428         pool3.finish(true); // blocking
4429         assert(tSlow3.done);
4430 
4431         // This is correct because no thread will terminate unless pool2.status
4432         // and pool3.status have already been set to stopNow.
4433         assert(pool2.status == TaskPool.PoolState.stopNow);
4434         assert(pool3.status == TaskPool.PoolState.stopNow);
4435     }
4436 
4437     // Test default pool stuff.
4438     assert(taskPool.size == totalCPUs - 1);
4439 
4440     nums = new uint[1000];
4441     foreach (i; parallel(iota(1000)))
4442     {
4443         nums[i] = cast(uint) i;
4444     }
4445     assert(equal(nums, iota(1000)));
4446 
4447     assert(equal(
4448                poolInstance.map!"a * a"(iota(3_000_001), 10_000),
4449                map!"a * a"(iota(3_000_001))
4450            ));
4451 
4452     // The filter is to kill random access and test the non-random access
4453     // branch.
4454     assert(equal(
4455                poolInstance.map!"a * a"(
4456                    filter!"a == a"(iota(3_000_001)
4457                                   ), 10_000, 1000),
4458                map!"a * a"(iota(3_000_001))
4459            ));
4460 
4461     assert(
4462         reduce!"a + b"(0UL,
4463                        poolInstance.map!"a * a"(iota(300_001), 10_000)
4464                       ) ==
4465         reduce!"a + b"(0UL,
4466                        map!"a * a"(iota(300_001))
4467                       )
4468     );
4469 
4470     assert(equal(
4471                iota(1_000_002),
4472                poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
4473            ));
4474 
4475     {
4476         import std.conv : to;
4477         import std.file : deleteme;
4478 
4479         string temp_file = deleteme ~ "-tempDelMe.txt";
4480         auto file = File(temp_file, "wb");
4481         scope(exit)
4482         {
4483             file.close();
4484             import std.file;
4485             remove(temp_file);
4486         }
4487 
4488         auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
4489         foreach (row; written)
4490         {
4491             file.writeln(join(to!(string[])(row), "\t"));
4492         }
4493 
4494         file = File(temp_file);
4495 
4496         void next(ref char[] buf)
4497         {
4498             file.readln(buf);
4499             import std.string : chomp;
4500             buf = chomp(buf);
4501         }
4502 
4503         double[][] read;
4504         auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
4505 
4506         foreach (line; asyncReader)
4507         {
4508             if (line.length == 0) continue;
4509             auto ls = line.split("\t");
4510             read ~= to!(double[])(ls);
4511         }
4512 
4513         assert(read == written);
4514         file.close();
4515     }
4516 
4517     // Test Map/AsyncBuf chaining.
4518 
4519     auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
4520     auto temp = poolInstance.map!sqrt(
4521                     abuf, 100, 5
4522                 );
4523     auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
4524     lmchain.popFront();
4525 
4526     int ii;
4527     foreach (elem; lmchain)
4528     {
4529         if (!isClose(elem, ii))
4530         {
4531             stderr.writeln(ii, '\t', elem);
4532         }
4533         ii++;
4534     }
4535 
4536     // Test buffer trick in parallel foreach.
4537     abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
4538     abuf.popFront();
4539     auto bufTrickTest = new size_t[abuf.length];
4540     foreach (i, elem; parallel(abuf))
4541     {
4542         bufTrickTest[i] = i;
4543     }
4544 
4545     assert(equal(iota(1_000_000), bufTrickTest));
4546 
4547     auto myTask = task!(abs)(-1);
4548     taskPool.put(myTask);
4549     assert(myTask.spinForce == 1);
4550 
4551     // Test that a worker thread of one pool receives a workerIndex of 0
4552     // when the index is queried w.r.t. another pool.  The only way to test
4553     // this is non-deterministically.
4554     foreach (i; parallel(iota(1000), 1))
4555     {
4556         assert(poolInstance.workerIndex == 0);
4557     }
4558 
4559     foreach (i; poolInstance.parallel(iota(1000), 1))
4560     {
4561         assert(taskPool.workerIndex == 0);
4562     }
4563 
4564     // Test exception handling.
4565     static void parallelForeachThrow()
4566     {
4567         foreach (elem; parallel(iota(10)))
4568         {
4569             throw new Exception("");
4570         }
4571     }
4572 
4573     assertThrown!Exception(parallelForeachThrow());
4574 
4575     static int reduceException(int a, int b)
4576     {
4577         throw new Exception("");
4578     }
4579 
4580     assertThrown!Exception(
4581         poolInstance.reduce!reduceException(iota(3))
4582     );
4583 
4584     static int mapException(int a)
4585     {
4586         throw new Exception("");
4587     }
4588 
4589     assertThrown!Exception(
4590         poolInstance.amap!mapException(iota(3))
4591     );
4592 
4593     static void mapThrow()
4594     {
4595         auto m = poolInstance.map!mapException(iota(3));
4596         m.popFront();
4597     }
4598 
4599     assertThrown!Exception(mapThrow());
4600 
4601     struct ThrowingRange
4602     {
4603         @property int front()
4604         {
4605             return 1;
4606         }
4607         void popFront()
4608         {
4609             throw new Exception("");
4610         }
4611         enum bool empty = false;
4612     }
4613 
4614     assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
4615 }
4616 
4617 //version = parallelismStressTest;
4618 
4619 // These are more like stress tests than real unit tests.  They print out
4620 // tons of stuff and should not be run every time make unittest is run.
4621 version (parallelismStressTest)
4622 {
4623     @system unittest
4624     {
4625         import std.stdio : stderr, writeln, readln;
4626         import std.range : iota;
4627         import std.algorithm.iteration : filter, reduce;
4628 
4629         size_t attempt;
4630         for (; attempt < 10; attempt++)
4631             foreach (poolSize; [0, 4])
4632         {
4633 
4634             poolInstance = new TaskPool(poolSize);
4635 
4636             uint[] numbers = new uint[1_000];
4637 
4638             foreach (i; poolInstance.parallel( iota(0, numbers.length)) )
4639             {
4640                 numbers[i] = cast(uint) i;
4641             }
4642 
4643             // Make sure it works.
4644             foreach (i; 0 .. numbers.length)
4645             {
4646                 assert(numbers[i] == i);
4647             }
4648 
4649             stderr.writeln("Done creating nums.");
4650 
4651 
4652             auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
4653             foreach (num; poolInstance.parallel(myNumbers))
4654             {
4655                 assert(num % 7 > 0 && num < 1000);
4656             }
4657             stderr.writeln("Done modulus test.");
4658 
4659             uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
4660             assert(squares.length == numbers.length);
4661             foreach (i, number; numbers)
4662             {
4663                 assert(squares[i] == number * number);
4664             }
4665             stderr.writeln("Done squares.");
4666 
4667             auto sumFuture = task!( reduce!"a + b" )(numbers);
4668             poolInstance.put(sumFuture);
4669 
4670             ulong sumSquares = 0;
4671             foreach (elem; numbers)
4672             {
4673                 sumSquares += elem * elem;
4674             }
4675 
4676             uint mySum = sumFuture.spinForce();
4677             assert(mySum == 999 * 1000 / 2);
4678 
4679             auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
4680             assert(mySum == mySumParallel);
4681             stderr.writeln("Done sums.");
4682 
4683             auto myTask = task(
4684             {
4685                 synchronized writeln("Our lives are parallel...Our lives are parallel.");
4686             });
4687             poolInstance.put(myTask);
4688 
4689             auto nestedOuter = "abcd";
4690             auto nestedInner =  iota(0, 10, 2);
4691 
4692             foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
4693             {
4694                 foreach (j, number; poolInstance.parallel(nestedInner, 1))
4695                 {
4696                     synchronized writeln(i, ": ", letter, "  ", j, ": ", number);
4697                 }
4698             }
4699 
4700             poolInstance.stop();
4701         }
4702 
4703         assert(attempt == 10);
4704         writeln("Press enter to go to next round of unittests.");
4705         readln();
4706     }
4707 
4708     // These unittests are intended more for actual testing and not so much
4709     // as examples.
4710     @system unittest
4711     {
4712         import std.stdio : stderr;
4713         import std.range : iota;
4714         import std.algorithm.iteration : filter, reduce;
4715         import std.math.algebraic : sqrt;
4716         import std.math.operations : isClose;
4717         import std.math.traits : isNaN;
4718         import std.conv : text;
4719 
4720         foreach (attempt; 0 .. 10)
4721         foreach (poolSize; [0, 4])
4722         {
4723             poolInstance = new TaskPool(poolSize);
4724 
4725             // Test indexing.
4726             stderr.writeln("Creator Raw Index:  ", poolInstance.threadIndex);
4727             assert(poolInstance.workerIndex() == 0);
4728 
4729             // Test worker-local storage.
4730             auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
4731             foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
4732             {
4733                 workerLocalStorage.get++;
4734             }
4735             assert(reduce!"a + b"(workerLocalStorage.toRange) ==
4736             1_000_000 + poolInstance.size + 1);
4737 
4738             // Make sure work is reasonably balanced among threads.  This test is
4739             // non-deterministic and is more of a sanity check than something that
4740             // has an absolute pass/fail.
4741             shared(uint)[void*] nJobsByThread;
4742             foreach (thread; poolInstance.pool)
4743             {
4744                 nJobsByThread[cast(void*) thread] = 0;
4745             }
4746             nJobsByThread[ cast(void*) Thread.getThis()] = 0;
4747 
4748             foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 ))
4749             {
4750                 atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1);
4751             }
4752 
4753             stderr.writeln("\nCurrent thread is:  ",
4754             cast(void*) Thread.getThis());
4755             stderr.writeln("Workload distribution:  ");
4756             foreach (k, v; nJobsByThread)
4757             {
4758                 stderr.writeln(k, '\t', v);
4759             }
4760 
4761             // Test whether amap can be nested.
4762             real[][] matrix = new real[][](1000, 1000);
4763             foreach (i; poolInstance.parallel( iota(0, matrix.length) ))
4764             {
4765                 foreach (j; poolInstance.parallel( iota(0, matrix[0].length) ))
4766                 {
4767                     matrix[i][j] = i * j;
4768                 }
4769             }
4770 
4771             // Get around weird bugs having to do w/ sqrt being an intrinsic:
4772             static real mySqrt(real num)
4773             {
4774                 return sqrt(num);
4775             }
4776 
4777             static real[] parallelSqrt(real[] nums)
4778             {
4779                 return poolInstance.amap!mySqrt(nums);
4780             }
4781 
4782             real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);
4783 
4784             foreach (i, row; sqrtMatrix)
4785             {
4786                 foreach (j, elem; row)
4787                 {
4788                     real shouldBe = sqrt( cast(real) i * j);
4789                     assert(isClose(shouldBe, elem));
4790                     sqrtMatrix[i][j] = shouldBe;
4791                 }
4792             }
4793 
4794             auto saySuccess = task(
4795             {
4796                 stderr.writeln(
4797                     "Success doing matrix stuff that involves nested pool use.");
4798             });
4799             poolInstance.put(saySuccess);
4800             saySuccess.workForce();
4801 
4802             // A more thorough test of amap, reduce:  Find the sum of the square roots of
4803             // matrix.
4804 
4805             static real parallelSum(real[] input)
4806             {
4807                 return poolInstance.reduce!"a + b"(input);
4808             }
4809 
4810             auto sumSqrt = poolInstance.reduce!"a + b"(
4811                                poolInstance.amap!parallelSum(
4812                                    sqrtMatrix
4813                                )
4814                            );
4815 
4816             assert(isClose(sumSqrt, 4.437e8, 1e-2));
4817             stderr.writeln("Done sum of square roots.");
4818 
4819             // Test whether tasks work with function pointers.
4820             /+ // This part is buggy and needs to be fixed...
4821             auto nanTask = task(&isNaN, 1.0L);
4822             poolInstance.put(nanTask);
4823             assert(nanTask.spinForce == false);
4824 
4825             if (poolInstance.size > 0)
4826             {
4827                 // Test work waiting.
4828                 static void uselessFun()
4829                 {
4830                     foreach (i; 0 .. 1_000_000) {}
4831                 }
4832 
4833                 auto uselessTasks = new typeof(task(&uselessFun))[1000];
4834                 foreach (ref uselessTask; uselessTasks)
4835                 {
4836                     uselessTask = task(&uselessFun);
4837                 }
4838                 foreach (ref uselessTask; uselessTasks)
4839                 {
4840                     poolInstance.put(uselessTask);
4841                 }
4842                 foreach (ref uselessTask; uselessTasks)
4843                 {
4844                     uselessTask.workForce();
4845                 }
4846             }
4847              +/
4848 
4849             // Test the case of non-random access + ref returns.
4850             int[] nums = [1,2,3,4,5];
4851             static struct RemoveRandom
4852             {
4853                 int[] arr;
4854 
4855                 ref int front()
4856                 {
4857                     return arr.front;
4858                 }
4859                 void popFront()
4860                 {
4861                     arr.popFront();
4862                 }
4863                 bool empty()
4864                 {
4865                     return arr.empty;
4866                 }
4867             }
4868 
4869             auto refRange = RemoveRandom(nums);
4870             foreach (ref elem; poolInstance.parallel(refRange))
4871             {
4872                 elem++;
4873             }
4874             assert(nums == [2,3,4,5,6], text(nums));
4875             stderr.writeln("Nums:  ", nums);
4876 
4877             poolInstance.stop();
4878         }
4879     }
4880 }
4881 
4882 @system unittest
4883 {
4884     static struct __S_12733
4885     {
4886         invariant() { assert(checksum == 1_234_567_890); }
4887         this(ulong u){n = u;}
4888         void opAssign(__S_12733 s){this.n = s.n;}
4889         ulong n;
4890         ulong checksum = 1_234_567_890;
4891     }
4892 
4893     static auto __genPair_12733(ulong n) { return __S_12733(n); }
4894     immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ];
4895 
4896     auto result = taskPool.amap!__genPair_12733(data);
4897 }
4898 
4899 @safe unittest
4900 {
4901     import std.range : iota;
4902 
4903     // this test was in std.range, but caused cycles.
4904     assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
4905 }
4906 
4907 @safe unittest
4908 {
4909     import std.algorithm.iteration : each;
4910 
4911     long[] arr;
4912     static assert(is(typeof({
4913         arr.parallel.each!"a++";
4914     })));
4915 }
4916 
4917 // https://issues.dlang.org/show_bug.cgi?id=17539
4918 @system unittest
4919 {
4920     import std.random : rndGen;
4921     // ensure compilation
4922     try foreach (rnd; rndGen.parallel) break;
4923     catch (ParallelForeachError e) {}
4924 }