/**
`std.parallelism` implements high-level primitives for SMP parallelism.
These include parallel foreach, parallel reduce, parallel eager map, pipelining
and future/promise parallelism. `std.parallelism` is recommended when the
same operation is to be executed in parallel on different data, or when a
function is to be executed in a background thread and its result returned to a
well-defined main thread. For communication between arbitrary threads, see
`std.concurrency`.

`std.parallelism` is based on the concept of a `Task`. A `Task` is an
object that represents the fundamental unit of work in this library and may be
executed in parallel with any other `Task`. Using `Task`
directly allows programming with a future/promise paradigm. All other
supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
represent an additional level of abstraction over `Task`. They
automatically create one or more `Task` objects, or closely related types
that are conceptually identical but not part of the public API.

After creation, a `Task` may be executed in a new thread, or submitted
to a `TaskPool` for execution. A `TaskPool` encapsulates a task queue
and its worker threads. Its purpose is to efficiently map a large
number of `Task`s onto a smaller number of threads. A task queue is a
FIFO queue of `Task` objects that have been submitted to the
`TaskPool` and are awaiting execution. A worker thread is a thread that
is associated with exactly one task queue. It executes the `Task` at the
front of its queue when the queue has work available, or sleeps when
no work is available. Each task queue is associated with zero or
more worker threads. If the result of a `Task` is needed before execution
by a worker thread has begun, the `Task` can be removed from the task queue
and executed immediately in the thread where the result is needed.

Warning: Unless marked as `@trusted` or `@safe`, artifacts in
this module allow implicit data sharing between threads and cannot
guarantee that client code is free from low level data races.

Source: $(PHOBOSSRC std/parallelism.d)
Author: David Simcha
Copyright: Copyright (c) 2009-2011, David Simcha.
License: $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
*/
module std.parallelism;

version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;

///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math.operations : isClose;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    alias getTerm = (int i)
    {
        immutable x = (i - 0.5) * delta;
        return delta / (1.0 + x * x);
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.isClose(3.14159, 1e-5));
}
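// An illustrative sketch, not part of the original module: the same global
// taskPool can also drive a parallel foreach loop. Each iteration writes to
// a distinct element of `squares`, so no synchronization is needed.
@system unittest
{
    import std.parallelism : taskPool;

    auto squares = new long[100];

    // Process the array in work units of 25 consecutive indices; the loop
    // body runs on the pool's worker threads as well as the main thread.
    foreach (i, ref elem; taskPool.parallel(squares, 25))
    {
        elem = cast(long) i * i;
    }

    assert(squares[3] == 9);
    assert(squares[99] == 9801);
}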
import core.atomic;
import core.memory;
import core.sync.condition;
import core.thread;

import std.functional;
import std.meta;
import std.range.primitives;
import std.traits;

/*
(For now public undocumented with reserved name.)

A lazily initialized global constant. The underlying value is a shared global
statically initialized to `outOfBandValue` which must not be a legit value of
the constant. Upon the first call the situation is detected and the global is
initialized by calling `initializer`. The initializer is assumed to be pure
(even if not marked as such), i.e. return the same value upon repeated calls.
For that reason, no special precautions are taken, so `initializer` may be
called more than once, leading to benign races on the cached value.

In the quiescent state the cost of the function is an atomic load from a global.

Params:
    T = The type of the pseudo-constant (may be qualified)
    outOfBandValue = A value that cannot be valid, it is used for initialization
    initializer = The function performing initialization; must be `nothrow`

Returns:
    The lazily initialized value
*/
@property pure
T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
    && is(typeof(initializer()) : T)
    && is(typeof(outOfBandValue) : T))
{
    static T impl() nothrow
    {
        // Thread-local cache
        static Unqual!T tls = outOfBandValue;
        auto local = tls;
        // Shortest path, no atomic operations
        if (local != outOfBandValue) return local;
        // Process-level cache
        static shared Unqual!T result = outOfBandValue;
        // Initialize both process-level cache and tls
        local = atomicLoad(result);
        if (local == outOfBandValue)
        {
            local = initializer();
            atomicStore(result, local);
        }
        tls = local;
        return local;
    }

    import std.traits : SetFunctionAttributes;
    alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
        functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
    auto purified = (() @trusted => cast(Fun) &impl)();
    return purified();
}

// Returns the size of a cache line.
alias cacheLineSize =
    __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);

private size_t cacheLineSizeImpl() @nogc nothrow @trusted
{
    size_t result = 0;
    import core.cpuid : datacache;
    foreach (ref const cachelevel; datacache)
    {
        if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max)
        {
            result = cachelevel.lineSize;
        }
    }
    return result;
}

@nogc @safe nothrow unittest
{
    assert(cacheLineSize == cacheLineSizeImpl);
}

/* Atomics code. These forward to core.atomic, but are written like this
   for two reasons:

   1. They used to actually contain ASM code and I don't want to have to
      change to directly calling core.atomic in a zillion different places.

   2. core.atomic has some misc. issues that make my use cases difficult
      without wrapping it. If I didn't wrap it, casts would be required
      basically everywhere.
*/
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    //core.atomic.cas(cast(shared) &stuff, stuff, newVal);
    atomicStore(*(cast(shared) &stuff), newVal);
}

private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return atomicLoad(*(cast(shared) &val));
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
}

/*--------------------- Generic helper functions, etc. -----------------------*/
private template MapType(R, functions...)
{
    static assert(functions.length);

    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}

private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}

private template noUnsharedAliasing(T)
{
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}

// This template tests whether a function may be executed in parallel from
// @safe code via Task.executeInNewThread(). There is an additional
// requirement for executing it via a TaskPool. (See isSafeReturn).
private template isSafeTask(F)
{
    enum bool isSafeTask =
        (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
        (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
        (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
        allSatisfy!(noUnsharedAliasing, Parameters!F);
}

@safe unittest
{
    alias F1 = void function() @safe;
    alias F2 = void function();
    alias F3 = void function(uint, string) @trusted;
    alias F4 = void function(uint, char[]);

    static assert( isSafeTask!F1);
    static assert(!isSafeTask!F2);
    static assert( isSafeTask!F3);
    static assert(!isSafeTask!F4);

    alias F5 = uint[] function(uint, string) pure @trusted;
    static assert( isSafeTask!F5);
}

// This function decides whether Tasks that meet all of the other requirements
// for being executed from @safe code can be executed on a TaskPool.
// When executing via TaskPool, it's theoretically possible
// to return a value that is also pointed to by a worker thread's thread local
// storage. When executing from executeInNewThread(), the thread that executed
// the Task is terminated by the time the return value is visible in the calling
// thread, so this is a non-issue. It's also a non-issue for pure functions
// since they can't read global state.
private template isSafeReturn(T)
{
    static if (!hasUnsharedAliasing!(T.ReturnType))
    {
        enum isSafeReturn = true;
    }
    else static if (T.isPure)
    {
        enum isSafeReturn = true;
    }
    else
    {
        enum isSafeReturn = false;
    }
}

private template randAssignable(R)
{
    enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
}

private enum TaskStatus : ubyte
{
    notStarted,
    inProgress,
    done
}

private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}

// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs);
            }

            return lhs;
        }
    }
}

private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
            }

            return lhs;
        }
    }
}

private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}

@safe unittest
{
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}

// This is the base "class" for all of the other tasks. Using C-style
// polymorphism to allow more direct control over memory allocation, etc.
private struct AbstractTask
{
    AbstractTask* prev;
    AbstractTask* next;

    // Pointer to a function that executes this task.
    void function(void*) runTask;

    Throwable exception;
    ubyte taskStatus = TaskStatus.notStarted;

    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) == TaskStatus.done)
        {
            if (exception)
            {
                throw exception;
            }

            return true;
        }

        return false;
    }

    void job()
    {
        runTask(&this);
    }
}

/**
`Task` represents the fundamental unit of work. A `Task` may be
executed in parallel with any other `Task`. Using this struct directly
allows future/promise parallelism. In this paradigm, a function (or delegate
or other callable) is executed in a thread other than the one it was called
from. The calling thread does not block while the function is being executed.
A call to `workForce`, `yieldForce`, or `spinForce` is used to
ensure that the `Task` has finished executing and to obtain the return
value, if any. These functions and `done` also act as full memory barriers,
meaning that any memory writes made in the thread that executed the `Task`
are guaranteed to be visible in the calling thread after one of these functions
returns.

The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
be used to create an instance of this struct. See `task` for usage examples.

Function results are returned from `yieldForce`, `spinForce` and
`workForce` by ref. If `fun` returns by ref, the reference will point
to the returned reference of `fun`. Otherwise it will point to a
field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics.
If you want to pass this struct around, you should do so by reference or
pointer.

Bugs: Changes to `ref` and `out` arguments are not propagated to the
call site, only to `args` in this struct.
*/
struct Task(alias fun, Args...)
{
    @(imported!"core.attribute".mutableRefInit) private AbstractTask base = {runTask : &impl};
    private alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(&(fun(myCastedTask._args)))))
        {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    Args _args;

    /**
    The arguments the function was called with. Changes to `out` and
    `ref` arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code. See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
                (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
        }
        else
        {
            // BUG: Should check this for delegates too, but std.traits
            // apparently doesn't allow this. isPure is irrelevant
            // for delegates, at least for now since shared delegates
            // don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this `Task`. This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
    // allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs);
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    busy spin until it's done, then return the return value. If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    `Task` to be available on a timescale shorter than that of an OS
    context switch.
    */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        this.pool.tryDeleteExecute(basePtr);

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    wait on a condition variable. If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
    */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this `Task` was not started yet, execute it in the current
    thread. If it is finished, return its result. If it is in progress,
    execute any other `Task` from the `TaskPool` instance that
    this `Task` was submitted to until this one
    is finished. If it threw an exception, rethrow that exception.
    If no other tasks are available or this `Task` was executed using
    `executeInNewThread`, wait on a condition variable.
    */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done) // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                return yieldForce;
            }
        }
    }

    /**
    Returns `true` if the `Task` is finished executing.

    Throws: Rethrows any exception thrown during the execution of the
    `Task`.
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this `Task`, execute it in the
    newly created thread, then terminate the thread. This can be used for
    future/promise parallelism.
    An explicit priority may be given
    to the `Task`. If one is provided, its value is forwarded to
    `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
    usage example.
    */
    void executeInNewThread() @trusted
    {
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}
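
// An illustrative sketch, not part of the original module: the force
// functions on a Task executed in a new thread. yieldForce blocks on a
// condition variable, which suits long-running work; spinForce busy-waits,
// which suits work expected to finish almost immediately.
@system unittest
{
    static int triple(int x) { return 3 * x; }

    auto t = task!triple(14);
    t.executeInNewThread();

    // Blocks until the result is ready, then returns it.
    assert(t.yieldForce == 42);

    // Once the Task is done, the force functions just return the cached
    // result, so they may be called again cheaply.
    assert(t.spinForce == 42);
    assert(t.done);
}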

// Calls `fpOrDelegate` with `args`. This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}

/**
Creates a `Task` on the GC heap that calls an alias. This may be executed
via `Task.executeInNewThread` or by submitting to a
$(REF TaskPool, std,parallelism). A globally accessible instance of
`TaskPool` is provided by $(REF taskPool, std,parallelism).

Returns: A pointer to the `Task`.

Example:
---
// Read two files into memory at the same time.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

---
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially. Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation: 176 milliseconds.
// Equivalent serial implementation: 280 milliseconds
void parallelSort(T)(T[] data)
{
    // Sort small subarrays serially.
    if (data.length < 100)
    {
        std.algorithm.sort(data);
        return;
    }

    // Partition the array.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}
---
*/
auto task(alias fun, Args...)(Args args)
{
    return new Task!(fun, Args)(args);
}

/**
Creates a `Task` on the GC heap that calls a function pointer, delegate, or
class/struct with overloaded opCall.

Example:
---
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task(&read!string, "foo.txt", size_t.max);
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

Notes: This function takes a non-scope delegate, meaning it can be
used with closures. If you can't allocate a closure due to objects
on the stack that have scoped destruction, see `scopedTask`, which
takes a scope delegate.
*/
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    return new Task!(run, F, Args)(delegateOrFp, args);
}

/**
Version of `task` usable from `@safe` code. Usage mechanics are
identical to the non-@safe case, but safety introduces some restrictions:

1. `fun` must be @safe or @trusted.

2. `F` must not have any unshared aliasing as defined by
   $(REF hasUnsharedAliasing, std,traits). This means it
   may not be an unshared delegate or a non-shared class or struct
   with overloaded `opCall`. This also precludes accepting template
   alias parameters.

3. `Args` must not have unshared aliasing.

4. `fun` must not return by reference.

5. The return type must not have unshared aliasing unless `fun` is
   `pure` or the `Task` is executed via `executeInNewThread` instead
   of using a `TaskPool`.
*/
@trusted auto task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    return new Task!(run, F, Args)(fun, args);
}

/**
These functions allow the creation of `Task` objects on the stack rather
than the GC heap. The lifetime of a `Task` created by `scopedTask`
cannot exceed the lifetime of the scope it was created in.

`scopedTask` might be preferred over `task`:

1. When a `Task` that calls a delegate is being created and a closure
   cannot be allocated due to objects on the stack that have scoped
   destruction. The delegate overload of `scopedTask` takes a `scope`
   delegate.

2. As a micro-optimization, to avoid the heap allocation associated with
   `task` or with the creation of a closure.

Usage is otherwise identical to `task`.

Notes: `Task` objects created using `scopedTask` will automatically
call `Task.yieldForce` in their destructor if necessary to ensure
the `Task` is complete before the stack frame they reside on is destroyed.
*/
auto scopedTask(alias fun, Args...)(Args args)
{
    auto ret = Task!(fun, Args)(args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(fun, args);
    ret.isScoped = true;
    return ret;
}
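
// An illustrative sketch, not part of the original module: a Task created
// with scopedTask lives on the stack, and its destructor calls yieldForce
// if needed, so it cannot escape the scope it was created in.
@system unittest
{
    import std.parallelism : taskPool;

    static int addFive(int x) { return x + 5; }

    auto t = scopedTask!addFive(37);
    taskPool.put(t);    // Submitted by reference; no heap allocation.
    assert(t.yieldForce == 42);
}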

/**
The total number of CPU cores available on the current machine, as reported by
the operating system.
*/
alias totalCPUs =
    __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);

uint totalCPUsImpl() @nogc nothrow @trusted
{
    version (Windows)
    {
        // BUGS: Only works on Windows 2000 and above.
        import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        return max(1, cast(uint) si.dwNumberOfProcessors);
    }
    else version (linux)
    {
        import core.stdc.stdlib : calloc;
        import core.stdc.string : memset;
        import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity;
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        int count = 0;

        /**
         * According to ruby's source code, CPU_ALLOC() doesn't work as expected.
         * see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242
         *
         * The hardcoded number also comes from ruby's source code.
         * see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4
         */
        for (int n = 64; n <= 16384; n *= 2)
        {
            size_t size = CPU_ALLOC_SIZE(count);
            if (size >= 0x400)
            {
                auto cpuset = cast(cpu_set_t*) calloc(1, size);
                if (cpuset is null) break;
                if (sched_getaffinity(0, size, cpuset) == 0)
                {
                    count = CPU_COUNT_S(size, cpuset);
                }
                CPU_FREE(cpuset);
            }
            else
            {
                cpu_set_t cpuset;
                if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0)
                {
                    count = CPU_COUNT(&cpuset);
                }
            }

            if (count > 0)
                return cast(uint) count;
        }

        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Darwin)
    {
        import core.sys.darwin.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.physicalcpu", &result, &len, null, 0);
        return result;
    }
    else version (DragonFlyBSD)
    {
        import core.sys.dragonflybsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (FreeBSD)
    {
        import core.sys.freebsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (NetBSD)
    {
        import core.sys.netbsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (Solaris)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (OpenBSD)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Hurd)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else
    {
        static assert(0, "Don't know how to get N CPUs on this OS.");
    }
}
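
// An illustrative check, not part of the original module: totalCPUs is
// lazily initialized on first use and cached, so repeated reads are cheap
// and agree with each other. Any working machine should report at least
// one core.
@nogc @safe nothrow unittest
{
    immutable n = totalCPUs;
    assert(n >= 1);
    assert(n == totalCPUs);
}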
/*
This class serves two purposes:

1. It distinguishes std.parallelism threads from other threads so that
   the std.parallelism daemon threads can be terminated.

2. It adds a reference to the pool that the thread is a member of,
   which is also necessary to allow the daemon threads to be properly
   terminated.
*/
private final class ParallelismThread : Thread
{
    this(void delegate() dg)
    {
        super(dg);
    }

    TaskPool pool;
}

// Kill daemon threads.
shared static ~this()
{
    foreach (ref thread; Thread)
    {
        auto pthread = cast(ParallelismThread) thread;
        if (pthread is null) continue;
        auto pool = pthread.pool;
        if (!pool.isDaemon) continue;
        pool.stop();
        pthread.join();
    }
}

/**
This class encapsulates a task queue and a set of worker threads. Its purpose
is to efficiently map a large number of `Task`s onto a smaller number of
threads. A task queue is a FIFO queue of `Task` objects that have been
submitted to the `TaskPool` and are awaiting execution. A worker thread is a
thread that executes the `Task` at the front of the queue when one is
available and sleeps when the queue is empty.

This class should usually be used via the global instantiation
available via the $(REF taskPool, std,parallelism) property.
Occasionally it is useful to explicitly instantiate a `TaskPool`:

1. When you want `TaskPool` instances with multiple priorities, for example
   a low priority pool and a high priority pool.

2. When the threads in the global task pool are waiting on a synchronization
   primitive (for example a mutex), and you want to parallelize the code that
   needs to run before these threads can be resumed.

Note: The worker threads in this pool will not stop until
`stop` or `finish` is called, even if the main thread
has finished already. This may lead to programs that
never end. If you do not want this behaviour, you can set `isDaemon`
to true.
*/
final class TaskPool
{
private:

    // A pool can either be a regular pool or a single-task pool. A
    // single-task pool is a dummy pool that's fired up for
    // Task.executeInNewThread().
    bool isSingleTask;

    ParallelismThread[] pool;
    Thread singleTaskThread;

    AbstractTask* head;
    AbstractTask* tail;
    PoolState status = PoolState.running;
    Condition workerCondition;
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex;  // For waiterCondition

    // The instanceStartIndex of the next instance that will be created.
    __gshared size_t nextInstanceIndex = 1;

    // The index of the current thread.
    static size_t threadIndex;

    // The index of the first thread in this instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;

    enum PoolState : ubyte
    {
        running,
        finishing,
        stopNow
    }

    void doJob(AbstractTask* job)
    {
        assert(job.taskStatus == TaskStatus.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        scope(exit)
        {
            if (!isSingleTask)
            {
                waiterLock();
                scope(exit) waiterUnlock();
                notifyWaiters();
            }
        }

        try
        {
            job.job();
        }
        catch (Throwable e)
        {
            job.exception = e;
        }

        atomicSetUbyte(job.taskStatus, TaskStatus.done);
    }

    // This function is used for dummy pools created by Task.executeInNewThread().
    void doSingleTask()
    {
        // No synchronization. Pool is guaranteed to only have one thread,
        // and the queue is submitted to before this thread is created.
        assert(head);
        auto t = head;
        t.next = t.prev = head = null;
        doJob(t);
    }

    // This function performs initialization for each thread that affects
    // thread local storage and therefore must be done from within the
    // worker thread. It then calls executeWorkLoop().
    void startWorkLoop()
    {
        // Initialize thread index.
        {
            queueLock();
            scope(exit) queueUnlock();
            threadIndex = nextThreadIndex;
            nextThreadIndex++;
        }

        executeWorkLoop();
    }

    // This is the main work loop that worker threads spend their time in
    // until they terminate. It's also entered by non-worker threads when
    // finish() is called with the blocking variable set to true.
    void executeWorkLoop()
    {
        while (atomicReadUbyte(status) != PoolState.stopNow)
        {
            AbstractTask* task = pop();
            if (task is null)
            {
                if (atomicReadUbyte(status) == PoolState.finishing)
                {
                    atomicSetUbyte(status, PoolState.stopNow);
                    return;
                }
            }
            else
            {
                doJob(task);
            }
        }
    }

    // Pop a task off the queue.
    AbstractTask* pop()
    {
        queueLock();
        scope(exit) queueUnlock();
        auto ret = popNoSync();
        while (ret is null && status == PoolState.running)
        {
            wait();
            ret = popNoSync();
        }
        return ret;
    }

    AbstractTask* popNoSync()
    out(returned)
    {
        /* If task.prev and task.next aren't null, then another thread
         * can try to delete this task from the pool after it's
         * already been deleted/popped.
         */
        if (returned !is null)
        {
            assert(returned.next is null);
            assert(returned.prev is null);
        }
    }
    do
    {
        if (isSingleTask) return null;

        AbstractTask* returned = head;
        if (head !is null)
        {
            head = head.next;
            returned.prev = null;
            returned.next = null;
            returned.taskStatus = TaskStatus.inProgress;
        }
        if (head !is null)
        {
            head.prev = null;
        }

        return returned;
    }

    // Push a task onto the queue.
    void abstractPut(AbstractTask* task)
    {
        queueLock();
        scope(exit) queueUnlock();
        abstractPutNoSync(task);
    }

    void abstractPutNoSync(AbstractTask* task)
    in
    {
        assert(task);
    }
    out
    {
        import std.conv : text;

        assert(tail.prev !is tail);
        assert(tail.next is null, text(tail.prev, '\t', tail.next));
        if (tail.prev !is null)
        {
            assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
        }
    }
    do
    {
        // Not using enforce() to save on function call overhead since this
        // is a performance critical function.
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        task.next = null;
        if (head is null)   //Queue is empty.
        {
            head = task;
            tail = task;
            tail.prev = null;
        }
        else
        {
            assert(tail);
            task.prev = tail;
            tail.next = task;
            tail = task;
        }
        notify();
    }

    void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
    {
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        if (head is null)
        {
            head = h;
            tail = t;
        }
        else
        {
            h.prev = tail;
            tail.next = h;
            tail = t;
        }

        notifyAll();
    }

    void tryDeleteExecute(AbstractTask* toExecute)
    {
        if (isSingleTask) return;

        if ( !deleteItem(toExecute) )
        {
            return;
        }

        try
        {
            toExecute.job();
        }
        catch (Exception e)
        {
            toExecute.exception = e;
        }

        atomicSetUbyte(toExecute.taskStatus, TaskStatus.done);
    }

    bool deleteItem(AbstractTask* item)
    {
        queueLock();
        scope(exit) queueUnlock();
        return deleteItemNoSync(item);
    }

    bool deleteItemNoSync(AbstractTask* item)
    {
        if (item.taskStatus != TaskStatus.notStarted)
        {
            return false;
        }
        item.taskStatus = TaskStatus.inProgress;

        if (item is head)
        {
            // Make sure head gets set properly.
            popNoSync();
            return true;
        }
        if (item is tail)
        {
            tail = tail.prev;
            if (tail !is null)
            {
                tail.next = null;
            }
            item.next = null;
            item.prev = null;
            return true;
        }
        if (item.next !is null)
        {
            assert(item.next.prev is item);  // Check queue consistency.
            item.next.prev = item.prev;
        }
        if (item.prev !is null)
        {
            assert(item.prev.next is item);  // Check queue consistency.
            item.prev.next = item.next;
        }
        item.next = null;
        item.prev = null;
        return true;
    }

    void queueLock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.lock();
    }

    void queueUnlock()
    {
        assert(queueMutex);
        if (!isSingleTask) queueMutex.unlock();
    }

    void waiterLock()
    {
        if (!isSingleTask) waiterMutex.lock();
    }

    void waiterUnlock()
    {
        if (!isSingleTask) waiterMutex.unlock();
    }

    void wait()
    {
        if (!isSingleTask) workerCondition.wait();
    }

    void notify()
    {
        if (!isSingleTask) workerCondition.notify();
    }

    void notifyAll()
    {
        if (!isSingleTask) workerCondition.notifyAll();
    }

    void waitUntilCompletion()
    {
        if (isSingleTask)
        {
            singleTaskThread.join();
        }
        else
        {
            waiterCondition.wait();
        }
    }

    void notifyWaiters()
    {
        if (!isSingleTask) waiterCondition.notifyAll();
    }

    // Private constructor for creating dummy pools that only have one thread,
    // only execute one Task, and then terminate. This is used for
    // Task.executeInNewThread().
    this(AbstractTask* task, int priority = int.max)
    {
        assert(task);

        // Dummy value, not used.
        instanceStartIndex = 0;

        this.isSingleTask = true;
        task.taskStatus = TaskStatus.inProgress;
        this.head = task;
        singleTaskThread = new Thread(&doSingleTask);
        singleTaskThread.start();

        // Disabled until writing code to support
        // running thread with specified priority
        // See https://issues.dlang.org/show_bug.cgi?id=8960

        /*if (priority != int.max)
        {
            singleTaskThread.priority = priority;
        }*/
    }

public:
    // This is used in parallel_algorithm but is too unstable to document
    // as public API.
    size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
    {
        import std.algorithm.comparison : max;

        if (this.size == 0)
        {
            return max(rangeLen, 1);
        }

        immutable size_t eightSize = 4 * (this.size + 1);
        auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
        return max(ret, 1);
    }

    /**
    Default constructor that initializes a `TaskPool` with
    `totalCPUs` - 1 worker threads. The minus 1 is included because the
    main thread will also be available to do work.

    Note: On single-core machines, the primitives provided by `TaskPool`
    operate transparently in single-threaded mode.
    */
    this() @trusted
    {
        this(totalCPUs - 1);
    }

    /**
    Allows for custom number of worker threads.
    */
    this(size_t nWorkers) @trusted
    {
        synchronized(typeid(TaskPool))
        {
            instanceStartIndex = nextInstanceIndex;

            // The first worker thread to be initialized will have this index,
            // and will increment it. The second worker to be initialized will
            // have this index plus 1.
            nextThreadIndex = instanceStartIndex;
            nextInstanceIndex += nWorkers;
        }

        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);

        pool = new ParallelismThread[nWorkers];
        foreach (ref poolThread; pool)
        {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.start();
        }
    }
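
    // An illustrative sketch, not part of the original module: a dedicated
    // pool with an explicitly chosen number of worker threads, independent
    // of the global taskPool. finish(true) blocks until all queued work is
    // done, with the current thread helping to drain the queue.
    @system unittest
    {
        static int addOne(int x) { return x + 1; }

        auto pool = new TaskPool(2);
        scope(exit) pool.finish(true);

        auto t = task!addOne(41);
        pool.put(t);
        assert(t.yieldForce == 42);
    }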

    /**
    Implements a parallel foreach loop over a range. This works by implicitly
    creating and submitting one `Task` to the `TaskPool` for each worker
    thread. A work unit is a set of consecutive elements of `range` to
    be processed by a worker thread between communication with any other
    thread. The number of elements processed per work unit is controlled by the
    `workUnitSize` parameter. Smaller work units provide better load
    balancing, but larger work units avoid the overhead of communicating
    with other threads frequently to fetch the next work unit. Large work
    units also avoid false sharing in cases where the range is being modified.
    The less time a single iteration of the loop takes, the larger
    `workUnitSize` should be. For very expensive loop bodies,
    `workUnitSize` should be 1. An overload that chooses a default work
    unit size is also available.

    Example:
    ---
    // Find the logarithm of every number from 1 to
    // 10_000_000 in parallel.
    auto logs = new double[10_000_000];

    // Parallel foreach works with or without an index
    // variable. It can iterate by ref if range.front
    // returns by ref.

    // Iterate over logs using work units of size 100.
    foreach (i, ref elem; taskPool.parallel(logs, 100))
    {
        elem = log(i + 1.0);
    }

    // Same thing, but use the default work unit size.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel foreach: 388 milliseconds
    // Regular foreach: 619 milliseconds
    foreach (i, ref elem; taskPool.parallel(logs))
    {
        elem = log(i + 1.0);
    }
    ---

    Notes:

    The memory usage of this implementation is guaranteed to be constant
    in `range.length`.

    Breaking from a parallel foreach loop via a break, labeled break,
    labeled continue, return or goto statement throws a
    `ParallelForeachError`.

    In the case of non-random access ranges, parallel foreach buffers lazily
    to an array of size `workUnitSize` before executing the parallel portion
    of the loop. The exception is that, if a parallel foreach is executed
    over a range returned by `asyncBuf` or `map`, the copying is elided
    and the buffers are simply swapped. In this case `workUnitSize` is
    ignored and the work unit size is set to the buffer size of `range`.

    A memory barrier is guaranteed to be executed on exit from the loop,
    so that results produced by all threads are visible in the calling thread.

    $(B Exception Handling):

    When at least one exception is thrown from inside a parallel foreach loop,
    the submission of additional `Task` objects is terminated as soon as
    possible, in a non-deterministic manner. All executing or
    enqueued work units are allowed to complete. Then, all exceptions that
    were thrown by any work unit are chained using `Throwable.next` and
    rethrown. The order of the exception chaining is non-deterministic.
    */
    ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
    {
        import std.exception : enforce;
        enforce(workUnitSize > 0, "workUnitSize must be > 0.");
        alias RetType = ParallelForeach!R;
        return RetType(this, range, workUnitSize);
    }


    /// Ditto
    ParallelForeach!R parallel(R)(R range)
    {
        static if (hasLength!R)
        {
            // Default work unit size is such that we would use 4x as many
            // slots as are in this thread pool.
            size_t workUnitSize = defaultWorkUnitSize(range.length);
            return parallel(range, workUnitSize);
        }
        else
        {
            // Just use a really, really dumb guess if the user is too lazy to
            // specify.
            return parallel(range, 512);
        }
    }

    ///
    template amap(functions...)
    {
        /**
        Eager parallel map. The eagerness of this function means it has less
        overhead than the lazily evaluated `TaskPool.map` and should be
        preferred where the memory requirements of eagerness are acceptable.
        `functions` are the functions to be evaluated, passed as template
        alias parameters in a style similar to
        $(REF map, std,algorithm,iteration).
        The first argument must be a random access range. For performance
        reasons, amap will assume the range elements have not yet been
        initialized. Elements will be overwritten without calling a destructor
        nor doing an assignment. As such, the range must not contain meaningful
        data$(DDOC_COMMENT not a section): either un-initialized objects, or
        objects in their `.init` state.

        ---
        auto numbers = iota(100_000_000.0);

        // Find the square roots of numbers.
        //
        // Timings on an Athlon 64 X2 dual core machine:
        //
        // Parallel eager map:                0.802 s
        // Equivalent serial implementation:  1.768 s
        auto squareRoots = taskPool.amap!sqrt(numbers);
        ---

        Immediately after the range argument, an optional work unit size argument
        may be provided. Work units as used by `amap` are identical to those
        defined for parallel foreach. If no work unit size is provided, the
        default work unit size is used.

        ---
        // Same thing, but make work unit size 100.
        auto squareRoots = taskPool.amap!sqrt(numbers, 100);
        ---

        An output range for returning the results may be provided as the last
        argument. If one is not provided, an array of the proper type will be
        allocated on the garbage collected heap. If one is provided, it must be a
        random access range with assignable elements, must have reference
        semantics with respect to assignment to its elements, and must have the
        same length as the input range. Writing to adjacent elements from
        different threads must be safe.

        ---
        // Same thing, but explicitly allocate an array
        // to return the results in. The element type
        // of the array may be either the exact type
        // returned by functions or an implicit conversion
        // target.
        auto squareRoots = new float[numbers.length];
        taskPool.amap!sqrt(numbers, squareRoots);

        // Multiple functions, explicit output range, and
        // explicit work unit size.
        auto results = new Tuple!(float, real)[numbers.length];
        taskPool.amap!(sqrt, log)(numbers, 100, results);
        ---

        Note:

        A memory barrier is guaranteed to be executed after all results are written
        but before returning so that results produced by all threads are visible
        in the calling thread.

        Tips:

        To perform the mapping operation in place, provide the same range for the
        input and output range.

        To parallelize the copying of a range with expensive to evaluate elements
        to an array, pass an identity function (a function that just returns
        whatever argument is provided to it) to `amap`.

        $(B Exception Handling):

        When at least one exception is thrown from inside the map functions,
        the submission of additional `Task` objects is terminated as soon as
        possible, in a non-deterministic manner. All currently executing or
        enqueued work units are allowed to complete. Then, all exceptions that
        were thrown from any work unit are chained using `Throwable.next` and
        rethrown. The order of the exception chaining is non-deterministic.
        */
        auto amap(Args...)(Args args)
        if (isRandomAccessRange!(Args[0]))
        {
            import core.internal.lifetime : emplaceRef;

            alias fun = adjoin!(staticMap!(unaryFun, functions));

            alias range = args[0];
            immutable len = range.length;

            static if (
                Args.length > 1 &&
                randAssignable!(Args[$ - 1]) &&
                is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
                )
            {
                import std.conv : text;
                import std.exception : enforce;

                alias buf = args[$ - 1];
                alias args2 = args[0..$ - 1];
                alias Args2 = Args[0..$ - 1];
                enforce(buf.length == len,
                    text("Can't use a user supplied buffer that's the wrong ",
                        "size. (Expected: ", len, " Got: ", buf.length));
            }
            else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
            {
                static assert(0, "Wrong buffer type.");
            }
            else
            {
                import std.array : uninitializedArray;

                auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
                alias args2 = args;
                alias Args2 = Args;
            }

            if (!len) return buf;

            static if (isIntegral!(Args2[$ - 1]))
            {
                static assert(args2.length == 2);
                auto workUnitSize = cast(size_t) args2[1];
            }
            else
            {
                static assert(args2.length == 1, Args);
                auto workUnitSize = defaultWorkUnitSize(range.length);
            }

            alias R = typeof(range);

            if (workUnitSize > len)
            {
                workUnitSize = len;
            }

            // Handle as a special case:
            if (size == 0)
            {
                size_t index = 0;
                foreach (elem; range)
                {
                    emplaceRef(buf[index++], fun(elem));
                }
                return buf;
            }

            // Effectively -1: chunkIndex + 1 == 0:
            shared size_t workUnitIndex = size_t.max;
            shared bool shouldContinue = true;

            void doIt()
            {
                import std.algorithm.comparison : min;

                scope(failure)
                {
                    // If an exception is thrown, all threads should bail.
                    atomicStore(shouldContinue, false);
                }

                while (atomicLoad(shouldContinue))
                {
                    immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
                    immutable start = workUnitSize * myUnitIndex;
                    if (start >= len)
                    {
                        atomicStore(shouldContinue, false);
                        break;
                    }

                    immutable end = min(len, start + workUnitSize);

                    static if (hasSlicing!R)
                    {
                        auto subrange = range[start .. end];
                        foreach (i; start .. end)
                        {
                            emplaceRef(buf[i], fun(subrange.front));
                            subrange.popFront();
                        }
                    }
                    else
                    {
                        foreach (i; start .. end)
                        {
                            emplaceRef(buf[i], fun(range[i]));
                        }
                    }
                }
            }

            submitAndExecute(this, &doIt);
            return buf;
        }
    }
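
    // An illustrative sketch, not part of the original module: amap eagerly
    // maps every element and returns the results as a new array, in contrast
    // to the lazy, double-buffered pipeline returned by map.
    @system unittest
    {
        auto doubled = taskPool.amap!"a * 2"([10, 20, 30]);
        assert(doubled == [20, 40, 60]);
    }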

    ///
    template map(functions...)
    {
        /**
        A semi-lazy parallel map that can be used for pipelining. The map
        functions are evaluated for the first `bufSize` elements and stored in a
        buffer and made available to `popFront`. Meanwhile, in the
        background a second buffer of the same size is filled. When the first
        buffer is exhausted, it is swapped with the second buffer and filled while
        the values from what was originally the second buffer are read. This
        implementation allows for elements to be written to the buffer without
        the need for atomic operations or synchronization for each write, and
        enables the mapping function to be evaluated efficiently in parallel.

        `map` has more overhead than the simpler procedure used by `amap`
        but avoids the need to keep all results in memory simultaneously and works
        with non-random access ranges.

        Params:

        source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
        to be mapped. If `source` is not random
        access it will be lazily buffered to an array of size `bufSize` before
        the map function is evaluated. (For an exception to this rule, see Notes.)

        bufSize = The size of the buffer to store the evaluated elements.

        workUnitSize = The number of elements to evaluate in a single
        `Task`. Must be less than or equal to `bufSize`, and
        should be a fraction of `bufSize` such that all worker threads can be
        used. If the default of size_t.max is used, workUnitSize will be set to
        the pool-wide default.

        Returns: An input range representing the results of the map. This range
        has a length iff `source` has a length.

        Notes:

        If a range returned by `map` or `asyncBuf` is used as an input to
        `map`, then as an optimization the copying from the output buffer
        of the first range to the input buffer of the second range is elided, even
        though the ranges returned by `map` and `asyncBuf` are non-random
        access ranges. This means that the `bufSize` parameter passed to the
        current call to `map` will be ignored and the size of the buffer
        will be the buffer size of `source`.

        Example:
        ---
        // Pipeline reading a file, converting each line
        // to a number, taking the logarithms of the numbers,
        // and performing the additions necessary to find
        // the sum of the logarithms.

        auto lineRange = File("numberList.txt").byLine();
        auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
        auto nums = taskPool.map!(to!double)(dupedLines);
        auto logs = taskPool.map!log10(nums);

        double sum = 0;
        foreach (elem; logs)
        {
            sum += elem;
        }
        ---

        $(B Exception Handling):

        Any exceptions thrown while iterating over `source`
        or computing the map function are re-thrown on a call to `popFront` or,
        if thrown during construction, are simply allowed to propagate to the
        caller. In the case of exceptions thrown while computing the map function,
        the exceptions are chained as in `TaskPool.amap`.
        */
        auto
        map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
        if (isInputRange!S)
        {
            import std.exception : enforce;

            enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
                    "Work unit size must be smaller than buffer size.");
            alias fun = adjoin!(staticMap!(unaryFun, functions));

            static final class Map
            {
                // This is a class because the task needs to be located on the
                // heap and in the non-random access case source needs to be on
                // the heap, too.

            private:
                enum bufferTrick = is(typeof(source.buf1)) &&
                                   is(typeof(source.bufPos)) &&
                                   is(typeof(source.doBufSwap()));

                alias E = MapType!(S, functions);
                E[] buf1, buf2;
                S source;
                TaskPool pool;
                Task!(run, E[] delegate(E[]), E[]) nextBufTask;
                size_t workUnitSize;
                size_t bufPos;
                bool lastTaskWaited;

                static if (isRandomAccessRange!S)
                {
                    alias FromType = S;

                    void popSource()
                    {
                        import std.algorithm.comparison : min;

                        static if (__traits(compiles, source[0 .. source.length]))
                        {
                            source = source[min(buf1.length, source.length)..source.length];
                        }
                        else static if (__traits(compiles, source[0..$]))
                        {
                            source = source[min(buf1.length, source.length)..$];
                        }
                        else
                        {
                            static assert(0, "S must have slicing for Map."
                                ~ " " ~ S.stringof ~ " doesn't.");
                        }
                    }
                }
                else static if (bufferTrick)
                {
                    // Make sure we don't have the buffer recycling overload of
                    // asyncBuf.
1996 static if ( 1997 is(typeof(source.source)) && 1998 isRoundRobin!(typeof(source.source)) 1999 ) 2000 { 2001 static assert(0, "Cannot execute a parallel map on " ~ 2002 "the buffer recycling overload of asyncBuf." 2003 ); 2004 } 2005 2006 alias FromType = typeof(source.buf1); 2007 FromType from; 2008 2009 // Just swap our input buffer with source's output buffer. 2010 // No need to copy element by element. 2011 FromType dumpToFrom() 2012 { 2013 import std.algorithm.mutation : swap; 2014 2015 assert(source.buf1.length <= from.length); 2016 from.length = source.buf1.length; 2017 swap(source.buf1, from); 2018 2019 // Just in case this source has been popped before 2020 // being sent to map: 2021 from = from[source.bufPos..$]; 2022 2023 static if (is(typeof(source._length))) 2024 { 2025 source._length -= (from.length - source.bufPos); 2026 } 2027 2028 source.doBufSwap(); 2029 2030 return from; 2031 } 2032 } 2033 else 2034 { 2035 alias FromType = ElementType!S[]; 2036 2037 // The temporary array that data is copied to before being 2038 // mapped. 2039 FromType from; 2040 2041 FromType dumpToFrom() 2042 { 2043 assert(from !is null); 2044 2045 size_t i; 2046 for (; !source.empty && i < from.length; source.popFront()) 2047 { 2048 from[i++] = source.front; 2049 } 2050 2051 from = from[0 .. i]; 2052 return from; 2053 } 2054 } 2055 2056 static if (hasLength!S) 2057 { 2058 size_t _length; 2059 2060 public @property size_t length() const @safe pure nothrow 2061 { 2062 return _length; 2063 } 2064 } 2065 2066 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool) 2067 { 2068 static if (bufferTrick) 2069 { 2070 bufSize = source.buf1.length; 2071 } 2072 2073 buf1.length = bufSize; 2074 buf2.length = bufSize; 2075 2076 static if (!isRandomAccessRange!S) 2077 { 2078 from.length = bufSize; 2079 } 2080 2081 this.workUnitSize = (workUnitSize == size_t.max) ? 2082 pool.defaultWorkUnitSize(bufSize) : workUnitSize; 2083 this.source = source; 2084 this.pool = pool; 2085 2086 static if (hasLength!S) 2087 { 2088 _length = source.length; 2089 } 2090 2091 buf1 = fillBuf(buf1); 2092 submitBuf2(); 2093 } 2094 2095 // The from parameter is a dummy and ignored in the random access 2096 // case. 2097 E[] fillBuf(E[] buf) 2098 { 2099 import std.algorithm.comparison : min; 2100 2101 static if (isRandomAccessRange!S) 2102 { 2103 import std.range : take; 2104 auto toMap = take(source, buf.length); 2105 scope(success) popSource(); 2106 } 2107 else 2108 { 2109 auto toMap = dumpToFrom(); 2110 } 2111 2112 buf = buf[0 .. min(buf.length, toMap.length)]; 2113 2114 // Handle as a special case: 2115 if (pool.size == 0) 2116 { 2117 size_t index = 0; 2118 foreach (elem; toMap) 2119 { 2120 buf[index++] = fun(elem); 2121 } 2122 return buf; 2123 } 2124 2125 pool.amap!functions(toMap, workUnitSize, buf); 2126 2127 return buf; 2128 } 2129 2130 void submitBuf2() 2131 in 2132 { 2133 assert(nextBufTask.prev is null); 2134 assert(nextBufTask.next is null); 2135 } 2136 do 2137 { 2138 // Hack to reuse the task object. 2139 2140 nextBufTask = typeof(nextBufTask).init; 2141 nextBufTask._args[0] = &fillBuf; 2142 nextBufTask._args[1] = buf2; 2143 pool.put(nextBufTask); 2144 } 2145 2146 void doBufSwap() 2147 { 2148 if (lastTaskWaited) 2149 { 2150 // Then the source is empty. Signal it here. 
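// Nulling the buffers gives buf1 zero length, which is what
// empty() tests for, and lets the GC reclaim the storage.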
2151 buf1 = null; 2152 buf2 = null; 2153 2154 static if (!isRandomAccessRange!S) 2155 { 2156 from = null; 2157 } 2158 2159 return; 2160 } 2161 2162 buf2 = buf1; 2163 buf1 = nextBufTask.yieldForce; 2164 bufPos = 0; 2165 2166 if (source.empty) 2167 { 2168 lastTaskWaited = true; 2169 } 2170 else 2171 { 2172 submitBuf2(); 2173 } 2174 } 2175 2176 public: 2177 @property auto front() 2178 { 2179 return buf1[bufPos]; 2180 } 2181 2182 void popFront() 2183 { 2184 static if (hasLength!S) 2185 { 2186 _length--; 2187 } 2188 2189 bufPos++; 2190 if (bufPos >= buf1.length) 2191 { 2192 doBufSwap(); 2193 } 2194 } 2195 2196 static if (isInfinite!S) 2197 { 2198 enum bool empty = false; 2199 } 2200 else 2201 { 2202 2203 bool empty() const @property 2204 { 2205 // popFront() sets this when source is empty 2206 return buf1.length == 0; 2207 } 2208 } 2209 } 2210 return new Map(source, bufSize, workUnitSize, this); 2211 } 2212 } 2213 2214 /** 2215 Given a `source` range that is expensive to iterate over, returns an 2216 $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that 2217 asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread, 2218 while making previously buffered elements from a second buffer, also of size 2219 `bufSize`, available via the range interface of the returned 2220 object. The returned range has a length iff `hasLength!S`. 2221 `asyncBuf` is useful, for example, when performing expensive operations 2222 on the elements of ranges that represent data on a disk or network. 2223 2224 Example: 2225 --- 2226 import std.conv, std.stdio; 2227 2228 void main() 2229 { 2230 // Fetch lines of a file in a background thread 2231 // while processing previously fetched lines, 2232 // dealing with byLine's buffer recycling by 2233 // eagerly duplicating every line. 2234 auto lines = File("foo.txt").byLine(); 2235 auto duped = std.algorithm.map!"a.idup"(lines); 2236 2237 // Fetch more lines in the background while we 2238 // process the lines already read into memory 2239 // into a matrix of doubles. 2240 double[][] matrix; 2241 auto asyncReader = taskPool.asyncBuf(duped); 2242 2243 foreach (line; asyncReader) 2244 { 2245 auto ls = line.split("\t"); 2246 matrix ~= to!(double[])(ls); 2247 } 2248 } 2249 --- 2250 2251 $(B Exception Handling): 2252 2253 Any exceptions thrown while iterating over `source` are re-thrown on a 2254 call to `popFront` or, if thrown during construction, simply 2255 allowed to propagate to the caller. 2256 */ 2257 auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S) 2258 { 2259 static final class AsyncBuf 2260 { 2261 // This is a class because the task and source both need to be on 2262 // the heap. 2263 2264 // The element type of S. 2265 alias E = ElementType!S; // Needs to be here b/c of forward ref bugs. 2266 2267 private: 2268 E[] buf1, buf2; 2269 S source; 2270 TaskPool pool; 2271 Task!(run, E[] delegate(E[]), E[]) nextBufTask; 2272 size_t bufPos; 2273 bool lastTaskWaited; 2274 2275 static if (hasLength!S) 2276 { 2277 size_t _length; 2278 2279 // Available if hasLength!S. 
2280 public @property size_t length() const @safe pure nothrow 2281 { 2282 return _length; 2283 } 2284 } 2285 2286 this(S source, size_t bufSize, TaskPool pool) 2287 { 2288 buf1.length = bufSize; 2289 buf2.length = bufSize; 2290 2291 this.source = source; 2292 this.pool = pool; 2293 2294 static if (hasLength!S) 2295 { 2296 _length = source.length; 2297 } 2298 2299 buf1 = fillBuf(buf1); 2300 submitBuf2(); 2301 } 2302 2303 E[] fillBuf(E[] buf) 2304 { 2305 assert(buf !is null); 2306 2307 size_t i; 2308 for (; !source.empty && i < buf.length; source.popFront()) 2309 { 2310 buf[i++] = source.front; 2311 } 2312 2313 buf = buf[0 .. i]; 2314 return buf; 2315 } 2316 2317 void submitBuf2() 2318 in 2319 { 2320 assert(nextBufTask.prev is null); 2321 assert(nextBufTask.next is null); 2322 } 2323 do 2324 { 2325 // Hack to reuse the task object. 2326 2327 nextBufTask = typeof(nextBufTask).init; 2328 nextBufTask._args[0] = &fillBuf; 2329 nextBufTask._args[1] = buf2; 2330 pool.put(nextBufTask); 2331 } 2332 2333 void doBufSwap() 2334 { 2335 if (lastTaskWaited) 2336 { 2337 // Then source is empty. Signal it here. 2338 buf1 = null; 2339 buf2 = null; 2340 return; 2341 } 2342 2343 buf2 = buf1; 2344 buf1 = nextBufTask.yieldForce; 2345 bufPos = 0; 2346 2347 if (source.empty) 2348 { 2349 lastTaskWaited = true; 2350 } 2351 else 2352 { 2353 submitBuf2(); 2354 } 2355 } 2356 2357 public: 2358 E front() @property 2359 { 2360 return buf1[bufPos]; 2361 } 2362 2363 void popFront() 2364 { 2365 static if (hasLength!S) 2366 { 2367 _length--; 2368 } 2369 2370 bufPos++; 2371 if (bufPos >= buf1.length) 2372 { 2373 doBufSwap(); 2374 } 2375 } 2376 2377 static if (isInfinite!S) 2378 { 2379 enum bool empty = false; 2380 } 2381 2382 else 2383 { 2384 /// 2385 bool empty() @property 2386 { 2387 // popFront() sets this when source is empty: 2388 return buf1.length == 0; 2389 } 2390 } 2391 } 2392 return new AsyncBuf(source, bufSize, this); 2393 } 2394 2395 /** 2396 Given a callable object `next` that writes to a user-provided buffer and 2397 a second callable object `empty` that determines whether more data is 2398 available to write via `next`, returns an input range that 2399 asynchronously calls `next` on a set of `nBuffers` buffers 2400 and makes the results available in the order they were obtained via the 2401 input range interface of the returned object. Similarly to the 2402 input range overload of `asyncBuf`, the first half of the buffers 2403 are made available via the range interface while the second half are 2404 filled and vice-versa. 2405 2406 Params: 2407 2408 next = A callable object that takes a single argument that must be an array 2409 with mutable elements. When called, `next` writes data to 2410 the array provided by the caller. 2411 2412 empty = A callable object that takes no arguments and returns a type 2413 implicitly convertible to `bool`. This is used to signify 2414 that no more data is available to be obtained by calling `next`. 2415 2416 initialBufSize = The initial size of each buffer. If `next` takes its 2417 array by reference, it may resize the buffers. 2418 2419 nBuffers = The number of buffers to cycle through when calling `next`. 2420 2421 Example: 2422 --- 2423 // Fetch lines of a file in a background 2424 // thread while processing previously fetched 2425 // lines, without duplicating any lines.
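// Because the underlying buffers are recycled, each line is only
// valid until its buffer is reused; .idup any line that must be
// kept beyond the current iteration.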
2426 auto file = File("foo.txt"); 2427 2428 void next(ref char[] buf) 2429 { 2430 file.readln(buf); 2431 } 2432 2433 // Fetch more lines in the background while we 2434 // process the lines already read into memory 2435 // into a matrix of doubles. 2436 double[][] matrix; 2437 auto asyncReader = taskPool.asyncBuf(&next, &file.eof); 2438 2439 foreach (line; asyncReader) 2440 { 2441 auto ls = line.split("\t"); 2442 matrix ~= to!(double[])(ls); 2443 } 2444 --- 2445 2446 $(B Exception Handling): 2447 2448 Any exceptions thrown by `next` or `empty` are re-thrown on a 2449 call to `popFront`. 2450 2451 Warning: 2452 2453 Using the range returned by this function in a parallel foreach loop 2454 will not work because buffers may be overwritten while the task that 2455 processes them is in queue. This is checked for at compile time 2456 and will result in a static assertion failure. 2457 */ 2458 auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100) 2459 if (is(typeof(C2.init()) : bool) && 2460 Parameters!C1.length == 1 && 2461 Parameters!C2.length == 0 && 2462 isArray!(Parameters!C1[0]) 2463 ) { 2464 auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers); 2465 return asyncBuf(roundRobin, nBuffers / 2); 2466 } 2467 2468 /// 2469 template reduce(functions...) 2470 { 2471 /** 2472 Parallel reduce on a random access range. Except as otherwise noted, 2473 usage is similar to $(REF _reduce, std,algorithm,iteration). There is 2474 also $(LREF fold) which does the same thing with a different parameter 2475 order. 2476 2477 This function works by splitting the range to be reduced into work 2478 units, which are slices to be reduced in parallel. Once the results 2479 from all work units are computed, a final serial reduction is performed 2480 on these results to compute the final answer. Therefore, care must be 2481 taken to choose the seed value appropriately. 2482 2483 Because the reduction is being performed in parallel, `functions` 2484 must be associative. For notational simplicity, let # be an 2485 infix operator representing `functions`. Then, (a # b) # c must equal 2486 a # (b # c). Floating point addition is not associative 2487 even though addition in exact arithmetic is. Summing floating 2488 point numbers using this function may give different results than summing 2489 serially. However, for many practical purposes floating point addition 2490 can be treated as associative. 2491 2492 Note that, since `functions` are assumed to be associative, 2493 additional optimizations are made to the serial portion of the reduction 2494 algorithm. These take advantage of the instruction level parallelism of 2495 modern CPUs, in addition to the thread-level parallelism that the rest 2496 of this module exploits. This can lead to better than linear speedups 2497 relative to $(REF _reduce, std,algorithm,iteration), especially for 2498 fine-grained benchmarks like dot products. 2499 2500 An explicit seed may be provided as the first argument. If 2501 provided, it is used as the seed for all work units and for the final 2502 reduction of results from all work units. Therefore, if it is not the 2503 identity value for the operation being performed, results may differ 2504 from those generated by $(REF _reduce, std,algorithm,iteration) and 2505 may vary depending on how many work units are used. The next argument must be 2506 the range to be reduced. 2507 --- 2508 // Find the sum of squares of a range in parallel, using 2509 // an explicit seed.
2510 // 2511 // Timings on an Athlon 64 X2 dual core machine: 2512 // 2513 // Parallel reduce: 72 milliseconds 2514 // Using std.algorithm.reduce instead: 181 milliseconds 2515 auto nums = iota(10_000_000.0f); 2516 auto sumSquares = taskPool.reduce!"a + b"( 2517 0.0, std.algorithm.map!"a * a"(nums) 2518 ); 2519 --- 2520 2521 If no explicit seed is provided, the first element of each work unit 2522 is used as a seed. For the final reduction, the result from the first 2523 work unit is used as the seed. 2524 --- 2525 // Find the sum of a range in parallel, using the first 2526 // element of each work unit as the seed. 2527 auto sum = taskPool.reduce!"a + b"(nums); 2528 --- 2529 2530 An explicit work unit size may be specified as the last argument. 2531 Specifying too small a work unit size will effectively serialize the 2532 reduction, as the final reduction of the result of each work unit will 2533 dominate computation time. If `TaskPool.size` for this instance 2534 is zero, this parameter is ignored and one work unit is used. 2535 --- 2536 // Use a work unit size of 100. 2537 auto sum2 = taskPool.reduce!"a + b"(nums, 100); 2538 2539 // Work unit size of 100 and explicit seed. 2540 auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100); 2541 --- 2542 2543 Parallel reduce supports multiple functions, like 2544 `std.algorithm.reduce`. 2545 --- 2546 // Find both the min and max of nums. 2547 auto minMax = taskPool.reduce!(min, max)(nums); 2548 assert(minMax[0] == reduce!min(nums)); 2549 assert(minMax[1] == reduce!max(nums)); 2550 --- 2551 2552 $(B Exception Handling): 2553 2554 After this function is finished executing, any exceptions thrown 2555 are chained together via `Throwable.next` and rethrown. The chaining 2556 order is non-deterministic. 2557 2558 See_Also: 2559 2560 $(LREF fold) is functionally equivalent to $(LREF _reduce) except the 2561 range parameter comes first and there is no need to use 2562 $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds. 
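As an illustrative sketch of the seed caveat above (the exact result depends on
    how many work units actually run), a non-identity seed is folded into every
    work unit and into the final reduction:
    ---
    // With a work unit size of 2, [1, 2, 3, 4] splits into two work
    // units, each seeded with 100, and the final reduction is seeded
    // with 100 again, so this may yield
    // 100 + (100 + 1 + 2) + (100 + 3 + 4) = 310, not the serial 110.
    auto biased = taskPool.reduce!"a + b"(100, [1, 2, 3, 4], 2);
    ---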
2563 */ 2564 auto reduce(Args...)(Args args) 2565 { 2566 import core.exception : OutOfMemoryError; 2567 import core.internal.lifetime : emplaceRef; 2568 import std.exception : enforce; 2569 2570 alias fun = reduceAdjoin!functions; 2571 alias finishFun = reduceFinish!functions; 2572 2573 static if (isIntegral!(Args[$ - 1])) 2574 { 2575 size_t workUnitSize = cast(size_t) args[$ - 1]; 2576 alias args2 = args[0..$ - 1]; 2577 alias Args2 = Args[0..$ - 1]; 2578 } 2579 else 2580 { 2581 alias args2 = args; 2582 alias Args2 = Args; 2583 } 2584 2585 auto makeStartValue(Type)(Type e) 2586 { 2587 static if (functions.length == 1) 2588 { 2589 return e; 2590 } 2591 else 2592 { 2593 typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void; 2594 foreach (i, T; seed.Types) 2595 { 2596 emplaceRef(seed.expand[i], e); 2597 } 2598 2599 return seed; 2600 } 2601 } 2602 2603 static if (args2.length == 2) 2604 { 2605 static assert(isInputRange!(Args2[1])); 2606 alias range = args2[1]; 2607 alias seed = args2[0]; 2608 enum explicitSeed = true; 2609 2610 static if (!is(typeof(workUnitSize))) 2611 { 2612 size_t workUnitSize = defaultWorkUnitSize(range.length); 2613 } 2614 } 2615 else 2616 { 2617 static assert(args2.length == 1); 2618 alias range = args2[0]; 2619 2620 static if (!is(typeof(workUnitSize))) 2621 { 2622 size_t workUnitSize = defaultWorkUnitSize(range.length); 2623 } 2624 2625 enforce(!range.empty, 2626 "Cannot reduce an empty range with first element as start value."); 2627 2628 auto seed = makeStartValue(range.front); 2629 enum explicitSeed = false; 2630 range.popFront(); 2631 } 2632 2633 alias E = typeof(seed); 2634 alias R = typeof(range); 2635 2636 E reduceOnRange(R range, size_t lowerBound, size_t upperBound) 2637 { 2638 // This is for exploiting instruction level parallelism by 2639 // using multiple accumulator variables within each thread, 2640 // since we're assuming functions are associative anyhow. 2641 2642 // This is so that loops can be unrolled automatically. 2643 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5); 2644 enum nILP = ilpTuple.length; 2645 immutable subSize = (upperBound - lowerBound) / nILP; 2646 2647 if (subSize <= 1) 2648 { 2649 // Handle as a special case. 2650 static if (explicitSeed) 2651 { 2652 E result = seed; 2653 } 2654 else 2655 { 2656 E result = makeStartValue(range[lowerBound]); 2657 lowerBound++; 2658 } 2659 2660 foreach (i; lowerBound .. upperBound) 2661 { 2662 result = fun(result, range[i]); 2663 } 2664 2665 return result; 2666 } 2667 2668 assert(subSize > 1); 2669 E[nILP] results; 2670 size_t[nILP] offsets; 2671 2672 foreach (i; ilpTuple) 2673 { 2674 offsets[i] = lowerBound + subSize * i; 2675 2676 static if (explicitSeed) 2677 { 2678 results[i] = seed; 2679 } 2680 else 2681 { 2682 results[i] = makeStartValue(range[offsets[i]]); 2683 offsets[i]++; 2684 } 2685 } 2686 2687 immutable nLoop = subSize - (!explicitSeed); 2688 foreach (i; 0 .. nLoop) 2689 { 2690 foreach (j; ilpTuple) 2691 { 2692 results[j] = fun(results[j], range[offsets[j]]); 2693 offsets[j]++; 2694 } 2695 } 2696 2697 // Finish the remainder. 2698 foreach (i; nILP * subSize + lowerBound .. 
upperBound) 2699 { 2700 results[$ - 1] = fun(results[$ - 1], range[i]); 2701 } 2702 2703 foreach (i; ilpTuple[1..$]) 2704 { 2705 results[0] = finishFun(results[0], results[i]); 2706 } 2707 2708 return results[0]; 2709 } 2710 2711 immutable len = range.length; 2712 if (len == 0) 2713 { 2714 return seed; 2715 } 2716 2717 if (this.size == 0) 2718 { 2719 return finishFun(seed, reduceOnRange(range, 0, len)); 2720 } 2721 2722 // Unlike the rest of the functions here, I can't use the Task object 2723 // recycling trick here because this has to work on non-commutative 2724 // operations. After all the tasks are done executing, fun() has to 2725 // be applied on the results of these to get a final result, but 2726 // it can't be evaluated out of order. 2727 2728 if (workUnitSize > len) 2729 { 2730 workUnitSize = len; 2731 } 2732 2733 immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1); 2734 assert(nWorkUnits * workUnitSize >= len); 2735 2736 alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t); 2737 RTask[] tasks; 2738 2739 // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753 2740 // Use a fixed stack buffer, falling back to malloc() when it's too small. 2741 enum maxStack = 2_048; 2742 byte[maxStack] buf = void; 2743 immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof; 2744 2745 import core.stdc.stdlib : malloc, free; 2746 if (nBytesNeeded <= maxStack) 2747 { 2748 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits]; 2749 } 2750 else 2751 { 2752 auto ptr = cast(RTask*) malloc(nBytesNeeded); 2753 if (!ptr) 2754 { 2755 throw new OutOfMemoryError( 2756 "Out of memory in std.parallelism." 2757 ); 2758 } 2759 2760 tasks = ptr[0 .. nWorkUnits]; 2761 } 2762 2763 scope(exit) 2764 { 2765 if (nBytesNeeded > maxStack) 2766 { 2767 free(tasks.ptr); 2768 } 2769 } 2770 2771 // Hack to take the address of a nested function w/o 2772 // making a closure. 2773 static auto scopedAddress(D)(scope D del) @system 2774 { 2775 auto tmp = del; 2776 return tmp; 2777 } 2778 2779 size_t curPos = 0; 2780 void useTask(ref RTask task) 2781 { 2782 import std.algorithm.comparison : min; 2783 import core.lifetime : emplace; 2784 2785 // Private constructor, so can't feed its arguments directly 2786 // to emplace 2787 emplace(&task, RTask 2788 ( 2789 scopedAddress(&reduceOnRange), 2790 range, 2791 curPos, // lower bound. 2792 cast() min(len, curPos + workUnitSize) // upper bound. 2793 )); 2794 2795 task.pool = this; 2796 2797 curPos += workUnitSize; 2798 } 2799 2800 foreach (ref task; tasks) 2801 { 2802 useTask(task); 2803 } 2804 2805 foreach (i; 1 .. tasks.length - 1) 2806 { 2807 tasks[i].next = tasks[i + 1].basePtr; 2808 tasks[i + 1].prev = tasks[i].basePtr; 2809 } 2810 2811 if (tasks.length > 1) 2812 { 2813 queueLock(); 2814 scope(exit) queueUnlock(); 2815 2816 abstractPutGroupNoSync( 2817 tasks[1].basePtr, 2818 tasks[$ - 1].basePtr 2819 ); 2820 } 2821 2822 if (tasks.length > 0) 2823 { 2824 try 2825 { 2826 tasks[0].job(); 2827 } 2828 catch (Throwable e) 2829 { 2830 tasks[0].exception = e; 2831 } 2832 tasks[0].taskStatus = TaskStatus.done; 2833 2834 // Try to execute each of these in the current thread 2835 foreach (ref task; tasks[1..$]) 2836 { 2837 tryDeleteExecute(task.basePtr); 2838 } 2839 } 2840 2841 // Now that we've tried to execute every task, they're all either 2842 // done or in progress. Force all of them.
2843 E result = seed; 2844 2845 Throwable firstException; 2846 2847 foreach (ref task; tasks) 2848 { 2849 try 2850 { 2851 task.yieldForce; 2852 } 2853 catch (Throwable e) 2854 { 2855 /* Chain e to front because order doesn't matter and because 2856 * e is not likely to be a chain itself (so fewer traversals) 2857 */ 2858 firstException = Throwable.chainTogether(e, firstException); 2859 continue; 2860 } 2861 2862 if (!firstException) result = finishFun(result, task.returnVal); 2863 } 2864 2865 if (firstException) throw firstException; 2866 2867 return result; 2868 } 2869 } 2870 2871 /// 2872 template fold(functions...) 2873 { 2874 /** Implements the homonym function (also known as `accumulate`, `compress`, 2875 `inject`, or `foldl`) present in various programming languages of 2876 functional flavor. 2877 2878 `fold` is functionally equivalent to $(LREF reduce) except the range 2879 parameter comes first and there is no need to use $(REF_ALTTEXT 2880 `tuple`,tuple,std,typecons) for multiple seeds. 2881 2882 There may be one or more callable entities (`functions` argument) to 2883 apply. 2884 2885 Params: 2886 args = Just the range to _fold over; or the range and one seed 2887 per function; or the range, one seed per function, and 2888 the work unit size 2889 2890 Returns: 2891 The accumulated result as a single value for single function and 2892 as a tuple of values for multiple functions 2893 2894 See_Also: 2895 Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce). 2896 2897 Example: 2898 --- 2899 static int adder(int a, int b) 2900 { 2901 return a + b; 2902 } 2903 static int multiplier(int a, int b) 2904 { 2905 return a * b; 2906 } 2907 2908 // Just the range 2909 auto x = taskPool.fold!adder([1, 2, 3, 4]); 2910 assert(x == 10); 2911 2912 // The range and the seeds (0 and 1 below; also note multiple 2913 // functions in this example) 2914 auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1); 2915 assert(y[0] == 10); 2916 assert(y[1] == 24); 2917 2918 // The range, the seed (0), and the work unit size (20) 2919 auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20); 2920 assert(z == 10); 2921 --- 2922 */ 2923 auto fold(Args...)(Args args) 2924 { 2925 static assert(isInputRange!(Args[0]), "First argument must be an InputRange"); 2926 2927 alias range = args[0]; 2928 2929 static if (Args.length == 1) 2930 { 2931 // Just the range 2932 return reduce!functions(range); 2933 } 2934 else static if (Args.length == 1 + functions.length || 2935 Args.length == 1 + functions.length + 1) 2936 { 2937 static if (functions.length == 1) 2938 { 2939 alias seeds = args[1]; 2940 } 2941 else 2942 { 2943 auto seeds() 2944 { 2945 import std.typecons : tuple; 2946 return tuple(args[1 .. 
functions.length+1]); 2947 } 2948 } 2949 2950 static if (Args.length == 1 + functions.length) 2951 { 2952 // The range and the seeds 2953 return reduce!functions(seeds, range); 2954 } 2955 else static if (Args.length == 1 + functions.length + 1) 2956 { 2957 // The range, the seeds, and the work unit size 2958 static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type"); 2959 return reduce!functions(seeds, range, args[$-1]); 2960 } 2961 } 2962 else 2963 { 2964 import std.conv : text; 2965 static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, " 2966 ~ functions.length.text ~ " optional seed(s), and an optional work unit size."); 2967 } 2968 } 2969 } 2970 2971 // This test is not included in the documentation because even though these 2972 // examples are for the inner fold() template, with their current location, 2973 // they would appear under the outer one. (We can't move this inside the 2974 // outer fold() template because then dmd runs out of memory possibly due to 2975 // recursive template instantiation, which is surprisingly not caught.) 2976 @system unittest 2977 { 2978 // Just the range 2979 auto x = taskPool.fold!"a + b"([1, 2, 3, 4]); 2980 assert(x == 10); 2981 2982 // The range and the seeds (0 and 1 below; also note multiple 2983 // functions in this example) 2984 auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1); 2985 assert(y[0] == 10); 2986 assert(y[1] == 24); 2987 2988 // The range, the seed (0), and the work unit size (20) 2989 auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20); 2990 assert(z == 10); 2991 } 2992 2993 /** 2994 Gets the index of the current thread relative to this `TaskPool`. Any 2995 thread not in this pool will receive an index of 0. The worker threads in 2996 this pool receive unique indices of 1 through `this.size`. 2997 2998 This function is useful for maintaining worker-local resources. 2999 3000 Example: 3001 --- 3002 // Execute a loop that computes the greatest common 3003 // divisor of every number from 0 through 999 with 3004 // 42 in parallel. Write the results out to 3005 // a set of files, one for each thread. This allows 3006 // results to be written out without any synchronization. 3007 3008 import std.conv, std.range, std.numeric, std.stdio; 3009 3010 void main() 3011 { 3012 auto fileHandles = new File[taskPool.size + 1]; 3013 scope(exit) { 3014 foreach (ref handle; fileHandles) 3015 { 3016 handle.close(); 3017 } 3018 } 3019 3020 foreach (i, ref handle; fileHandles) 3021 { 3022 handle = File("workerResults" ~ to!string(i) ~ ".txt", "w"); 3023 } 3024 3025 foreach (num; parallel(iota(1_000))) 3026 { 3027 auto outHandle = fileHandles[taskPool.workerIndex]; 3028 outHandle.writeln(num, '\t', gcd(num, 42)); 3029 } 3030 } 3031 --- 3032 */ 3033 size_t workerIndex() @property @safe const nothrow 3034 { 3035 immutable rawInd = threadIndex; 3036 return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ? 3037 (rawInd - instanceStartIndex + 1) : 0; 3038 } 3039 3040 /** 3041 Struct for creating worker-local storage. Worker-local storage is 3042 thread-local storage that exists only for worker threads in a given 3043 `TaskPool` plus a single thread outside the pool. It is allocated on the 3044 garbage collected heap in a way that avoids _false sharing, and doesn't 3045 necessarily have global scope within any thread. It can be accessed from 3046 any worker thread in the `TaskPool` that created it, and one thread 3047 outside this `TaskPool`.
All threads outside the pool that created a 3048 given instance of worker-local storage share a single slot. 3049 3050 Since the underlying data for this struct is heap-allocated, this struct 3051 has reference semantics when passed between functions. 3052 3053 The main use cases for `WorkerLocalStorage` are: 3054 3055 1. Performing parallel reductions with an imperative, as opposed to 3056 functional, programming style. In this case, it's useful to treat 3057 `WorkerLocalStorage` as local to each thread for only the parallel 3058 portion of an algorithm. 3059 3060 2. Recycling temporary buffers across iterations of a parallel foreach loop. 3061 3062 Example: 3063 --- 3064 // Calculate pi as in our synopsis example, but 3065 // use an imperative instead of a functional style. 3066 immutable n = 1_000_000_000; 3067 immutable delta = 1.0L / n; 3068 3069 auto sums = taskPool.workerLocalStorage(0.0L); 3070 foreach (i; parallel(iota(n))) 3071 { 3072 immutable x = ( i - 0.5L ) * delta; 3073 immutable toAdd = delta / ( 1.0 + x * x ); 3074 sums.get += toAdd; 3075 } 3076 3077 // Add up the results from each worker thread. 3078 real pi = 0; 3079 foreach (threadResult; sums.toRange) 3080 { 3081 pi += 4.0L * threadResult; 3082 } 3083 --- 3084 */ 3085 static struct WorkerLocalStorage(T) 3086 { 3087 private: 3088 TaskPool pool; 3089 size_t size; 3090 3091 size_t elemSize; 3092 bool* stillThreadLocal; 3093 3094 static size_t roundToLine(size_t num) pure nothrow 3095 { 3096 if (num % cacheLineSize == 0) 3097 { 3098 return num; 3099 } 3100 else 3101 { 3102 return ((num / cacheLineSize) + 1) * cacheLineSize; 3103 } 3104 } 3105 3106 void* data; 3107 3108 void initialize(TaskPool pool) 3109 { 3110 this.pool = pool; 3111 size = pool.size + 1; 3112 stillThreadLocal = new bool; 3113 *stillThreadLocal = true; 3114 3115 // Determines whether the GC should scan the array. 3116 auto blkInfo = (typeid(T).flags & 1) ? 3117 cast(GC.BlkAttr) 0 : 3118 GC.BlkAttr.NO_SCAN; 3119 3120 immutable nElem = pool.size + 1; 3121 elemSize = roundToLine(T.sizeof); 3122 3123 // The + 3 is to pad one full cache line worth of space on either side 3124 // of the data structure to make sure false sharing with completely 3125 // unrelated heap data is prevented, and to provide enough padding to 3126 // make sure that data is cache line-aligned. 3127 data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize; 3128 3129 // Cache line align data ptr. 3130 data = cast(void*) roundToLine(cast(size_t) data); 3131 3132 foreach (i; 0 .. nElem) 3133 { 3134 this.opIndex(i) = T.init; 3135 } 3136 } 3137 3138 ref opIndex(this Qualified)(size_t index) 3139 { 3140 import std.conv : text; 3141 assert(index < size, text(index, '\t', size)); 3142 return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index)); 3143 } 3144 3145 void opIndexAssign(T val, size_t index) 3146 { 3147 assert(index < size); 3148 *(cast(T*) (data + elemSize * index)) = val; 3149 } 3150 3151 public: 3152 /** 3153 Get the current thread's instance. Returns by ref. 3154 Note that calling `get` from any thread 3155 outside the `TaskPool` that created this instance will return the 3156 same reference, so an instance of worker-local storage should only be 3157 accessed from one thread outside the pool that created it. If this 3158 rule is violated, undefined behavior will result.
3159 3160 If assertions are enabled and `toRange` has been called, then this 3161 WorkerLocalStorage instance is no longer worker-local and an assertion 3162 failure will result when calling this method. This is not checked 3163 when assertions are disabled for performance reasons. 3164 */ 3165 ref get(this Qualified)() @property 3166 { 3167 assert(*stillThreadLocal, 3168 "Cannot call get() on this instance of WorkerLocalStorage " ~ 3169 "because it is no longer worker-local." 3170 ); 3171 return opIndex(pool.workerIndex); 3172 } 3173 3174 /** 3175 Assign a value to the current thread's instance. This function has 3176 the same caveats as its overload. 3177 */ 3178 void get(T val) @property 3179 { 3180 assert(*stillThreadLocal, 3181 "Cannot call get() on this instance of WorkerLocalStorage " ~ 3182 "because it is no longer worker-local." 3183 ); 3184 3185 opIndexAssign(val, pool.workerIndex); 3186 } 3187 3188 /** 3189 Returns a range view of the values for all threads, which can be used 3190 to further process the results of each thread after running the parallel 3191 part of your algorithm. Do not use this method in the parallel portion 3192 of your algorithm. 3193 3194 Calling this function sets a flag indicating that this struct is no 3195 longer worker-local, and attempting to use the `get` method again 3196 will result in an assertion failure if assertions are enabled. 3197 */ 3198 WorkerLocalStorageRange!T toRange() @property 3199 { 3200 if (*stillThreadLocal) 3201 { 3202 *stillThreadLocal = false; 3203 3204 // Make absolutely sure results are visible to all threads. 3205 // This is probably not necessary since some other 3206 // synchronization primitive will be used to signal that the 3207 // parallel part of the algorithm is done, but the 3208 // performance impact should be negligible, so it's better 3209 // to be safe. 3210 ubyte barrierDummy; 3211 atomicSetUbyte(barrierDummy, 1); 3212 } 3213 3214 return WorkerLocalStorageRange!T(this); 3215 } 3216 } 3217 3218 /** 3219 Range primitives for worker-local storage. The purpose of this is to 3220 access results produced by each worker thread from a single thread once you 3221 are no longer using the worker-local storage from multiple threads. 3222 Do not use this struct in the parallel portion of your algorithm. 3223 3224 The proper way to instantiate this object is to call 3225 `WorkerLocalStorage.toRange`. Once instantiated, this object behaves 3226 as a finite random-access range with assignable, lvalue elements and 3227 a length equal to the number of worker threads in the `TaskPool` that 3228 created it plus 1. 
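For illustration, a sketch (assuming the global `taskPool`) that tallies
    per-thread counts in the parallel phase and then combines them through
    this range:
    ---
    auto counts = taskPool.workerLocalStorage(0);
    foreach (i; parallel(iota(1_000)))
    {
        counts.get += 1;
    }

    auto perThread = counts.toRange;
    assert(perThread.length == taskPool.size + 1);

    int total = 0;
    foreach (c; perThread)
    {
        total += c;
    }
    assert(total == 1_000);
    ---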
3229 */ 3230 static struct WorkerLocalStorageRange(T) 3231 { 3232 private: 3233 WorkerLocalStorage!T workerLocalStorage; 3234 3235 size_t _length; 3236 size_t beginOffset; 3237 3238 this(WorkerLocalStorage!T wl) 3239 { 3240 this.workerLocalStorage = wl; 3241 _length = wl.size; 3242 } 3243 3244 public: 3245 ref front(this Qualified)() @property 3246 { 3247 return this[0]; 3248 } 3249 3250 ref back(this Qualified)() @property 3251 { 3252 return this[_length - 1]; 3253 } 3254 3255 void popFront() 3256 { 3257 if (_length > 0) 3258 { 3259 beginOffset++; 3260 _length--; 3261 } 3262 } 3263 3264 void popBack() 3265 { 3266 if (_length > 0) 3267 { 3268 _length--; 3269 } 3270 } 3271 3272 typeof(this) save() @property 3273 { 3274 return this; 3275 } 3276 3277 ref opIndex(this Qualified)(size_t index) 3278 { 3279 assert(index < _length); 3280 return workerLocalStorage[index + beginOffset]; 3281 } 3282 3283 void opIndexAssign(T val, size_t index) 3284 { 3285 assert(index < _length); 3286 workerLocalStorage[index + beginOffset] = val; 3287 } 3288 3289 typeof(this) opSlice(size_t lower, size_t upper) 3290 { 3291 assert(upper <= _length); 3292 auto newWl = this.workerLocalStorage; 3293 newWl.data += lower * newWl.elemSize; 3294 newWl.size = upper - lower; 3295 return typeof(this)(newWl); 3296 } 3297 3298 bool empty() const @property 3299 { 3300 return length == 0; 3301 } 3302 3303 size_t length() const @property 3304 { 3305 return _length; 3306 } 3307 } 3308 3309 /** 3310 Creates an instance of worker-local storage, initialized with a given 3311 value. The value is `lazy` so that you can, for example, easily 3312 create one instance of a class for each worker. For a usage example, 3313 see the `WorkerLocalStorage` struct. 3314 */ 3315 WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init) 3316 { 3317 WorkerLocalStorage!T ret; 3318 ret.initialize(this); 3319 foreach (i; 0 .. size + 1) 3320 { 3321 ret[i] = initialVal; 3322 } 3323 3324 // Memory barrier to make absolutely sure that what we wrote is 3325 // visible to worker threads. 3326 ubyte barrierDummy; 3327 atomicSetUbyte(barrierDummy, 0); 3328 3329 return ret; 3330 } 3331 3332 /** 3333 Signals to all worker threads to terminate as soon as they are finished 3334 with their current `Task`, or immediately if they are not executing a 3335 `Task`. `Task`s that were in queue will not be executed unless 3336 a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce` 3337 causes them to be executed. 3338 3339 Use only if you have waited on every `Task` and therefore know the 3340 queue is empty, or if you speculatively executed some tasks and no longer 3341 need the results. 3342 */ 3343 void stop() @trusted 3344 { 3345 queueLock(); 3346 scope(exit) queueUnlock(); 3347 atomicSetUbyte(status, PoolState.stopNow); 3348 notifyAll(); 3349 } 3350 3351 /** 3352 Signals worker threads to terminate when the queue becomes empty. 3353 3354 If the `blocking` argument is true, wait for all worker threads to terminate 3355 before returning. This option might be used in applications where 3356 task results are never consumed, e.g. when `TaskPool` is employed as a 3357 rudimentary scheduler for tasks which communicate by means other than 3358 return values. 3359 3360 Warning: Calling this function with $(D blocking = true) from a worker 3361 thread that is a member of the same `TaskPool` that 3362 `finish` is being called on will result in a deadlock.
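Example:
    ---
    // A sketch of the blocking shutdown pattern: submit fire-and-forget
    // tasks to a private pool, then drain the queue and join the workers.
    import std.stdio;

    auto pool = new TaskPool();

    foreach (i; 0 .. 10)
    {
        pool.put(task!writeln(i));
    }

    // Returns once every queued task has run and the workers have exited.
    pool.finish(true);
    ---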
3363 */ 3364 void finish(bool blocking = false) @trusted 3365 { 3366 { 3367 queueLock(); 3368 scope(exit) queueUnlock(); 3369 atomicCasUbyte(status, PoolState.running, PoolState.finishing); 3370 notifyAll(); 3371 } 3372 if (blocking) 3373 { 3374 // Use this thread as a worker until everything is finished. 3375 executeWorkLoop(); 3376 3377 foreach (t; pool) 3378 { 3379 // Maybe there should be something here to prevent a thread 3380 // from calling join() on itself if this function is called 3381 // from a worker thread in the same pool, but: 3382 // 3383 // 1. Using an if statement to skip join() would result in 3384 // finish() returning without all tasks being finished. 3385 // 3386 // 2. If an exception were thrown, it would bubble up to the 3387 // Task from which finish() was called and likely be 3388 // swallowed. 3389 t.join(); 3390 } 3391 } 3392 } 3393 3394 /// Returns the number of worker threads in the pool. 3395 @property size_t size() @safe const pure nothrow 3396 { 3397 return pool.length; 3398 } 3399 3400 /** 3401 Put a `Task` object on the back of the task queue. The `Task` 3402 object may be passed by pointer or reference. 3403 3404 Example: 3405 --- 3406 import std.file; 3407 3408 // Create a task. 3409 auto t = task!read("foo.txt"); 3410 3411 // Add it to the queue to be executed. 3412 taskPool.put(t); 3413 --- 3414 3415 Notes: 3416 3417 @trusted overloads of this function are called for `Task`s if 3418 $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s 3419 return type or the function the `Task` executes is `pure`. 3420 `Task` objects that meet all other requirements specified in the 3421 `@trusted` overloads of `task` and `scopedTask` may be created 3422 and executed from `@safe` code via `Task.executeInNewThread` but 3423 not via `TaskPool`. 3424 3425 While this function takes the address of variables that may 3426 be on the stack, some overloads are marked as @trusted. 3427 `Task` includes a destructor that waits for the task to complete 3428 before destroying the stack frame it is allocated on. Therefore, 3429 it is impossible for the stack frame to be destroyed before the task is 3430 complete and no longer referenced by a `TaskPool`. 3431 */ 3432 void put(alias fun, Args...)(ref Task!(fun, Args) task) 3433 if (!isSafeReturn!(typeof(task))) 3434 { 3435 task.pool = this; 3436 abstractPut(task.basePtr); 3437 } 3438 3439 /// Ditto 3440 void put(alias fun, Args...)(Task!(fun, Args)* task) 3441 if (!isSafeReturn!(typeof(*task))) 3442 { 3443 import std.exception : enforce; 3444 enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); 3445 put(*task); 3446 } 3447 3448 @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task) 3449 if (isSafeReturn!(typeof(task))) 3450 { 3451 task.pool = this; 3452 abstractPut(task.basePtr); 3453 } 3454 3455 @trusted void put(alias fun, Args...)(Task!(fun, Args)* task) 3456 if (isSafeReturn!(typeof(*task))) 3457 { 3458 import std.exception : enforce; 3459 enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); 3460 put(*task); 3461 } 3462 3463 /** 3464 These properties control whether the worker threads are daemon threads. 3465 A daemon thread is automatically terminated when all non-daemon threads 3466 have terminated. A non-daemon thread will prevent a program from 3467 terminating as long as it has not terminated. 3468 3469 If any `TaskPool` with non-daemon threads is active, either `stop` 3470 or `finish` must be called on it before the program can terminate. 
3471 3472 The worker threads in the `TaskPool` instance returned by the 3473 `taskPool` property are daemon by default. The worker threads of 3474 manually instantiated task pools are non-daemon by default. 3475 3476 Note: For a size zero pool, the getter arbitrarily returns true and the 3477 setter has no effect. 3478 */ 3479 bool isDaemon() @property @trusted 3480 { 3481 queueLock(); 3482 scope(exit) queueUnlock(); 3483 return (size == 0) ? true : pool[0].isDaemon; 3484 } 3485 3486 /// Ditto 3487 void isDaemon(bool newVal) @property @trusted 3488 { 3489 queueLock(); 3490 scope(exit) queueUnlock(); 3491 foreach (thread; pool) 3492 { 3493 thread.isDaemon = newVal; 3494 } 3495 } 3496 3497 /** 3498 These functions allow getting and setting the OS scheduling priority of 3499 the worker threads in this `TaskPool`. They forward to 3500 `core.thread.Thread.priority`, so a given priority value here means the 3501 same thing as an identical priority value in `core.thread`. 3502 3503 Note: For a size zero pool, the getter arbitrarily returns 3504 `core.thread.Thread.PRIORITY_MIN` and the setter has no effect. 3505 */ 3506 int priority() @property @trusted 3507 { 3508 return (size == 0) ? core.thread.Thread.PRIORITY_MIN : 3509 pool[0].priority; 3510 } 3511 3512 /// Ditto 3513 void priority(int newPriority) @property @trusted 3514 { 3515 if (size > 0) 3516 { 3517 foreach (t; pool) 3518 { 3519 t.priority = newPriority; 3520 } 3521 } 3522 } 3523 } 3524 3525 @system unittest 3526 { 3527 import std.algorithm.iteration : sum; 3528 import std.range : iota; 3529 import std.typecons : tuple; 3530 3531 enum N = 100; 3532 auto r = iota(1, N + 1); 3533 const expected = r.sum(); 3534 3535 // Just the range 3536 assert(taskPool.fold!"a + b"(r) == expected); 3537 3538 // Range and seeds 3539 assert(taskPool.fold!"a + b"(r, 0) == expected); 3540 assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected)); 3541 3542 // Range, seeds, and work unit size 3543 assert(taskPool.fold!"a + b"(r, 0, 42) == expected); 3544 assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected)); 3545 } 3546 3547 // https://issues.dlang.org/show_bug.cgi?id=16705 3548 @system unittest 3549 { 3550 struct MyIota 3551 { 3552 size_t front; 3553 void popFront()(){front++;} 3554 auto empty(){return front >= 25;} 3555 auto opIndex(size_t i){return front+i;} 3556 auto length(){return 25-front;} 3557 } 3558 3559 auto mySum = taskPool.reduce!"a + b"(MyIota()); 3560 } 3561 3562 /** 3563 Returns a lazily initialized global instantiation of `TaskPool`. 3564 This function can safely be called concurrently from multiple non-worker 3565 threads. The worker threads in this pool are daemon threads, meaning that it 3566 is not necessary to call `TaskPool.stop` or `TaskPool.finish` before 3567 terminating the main thread. 3568 */ 3569 @property TaskPool taskPool() @trusted 3570 { 3571 import std.concurrency : initOnce; 3572 __gshared TaskPool pool; 3573 return initOnce!pool({ 3574 auto p = new TaskPool(defaultPoolThreads); 3575 p.isDaemon = true; 3576 return p; 3577 }()); 3578 } 3579 3580 private shared uint _defaultPoolThreads = uint.max; 3581 3582 /** 3583 These properties get and set the number of worker threads in the `TaskPool` 3584 instance returned by `taskPool`. The default value is `totalCPUs` - 1. 3585 Calling the setter after the first call to `taskPool` does not change the 3586 number of worker threads in the instance returned by `taskPool`.
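Example:
    ---
    // A sketch: the setter only has an effect if it runs before
    // anything touches the global taskPool.
    defaultPoolThreads = 4;
    assert(taskPool.size == 4);
    ---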
3587 */ 3588 @property uint defaultPoolThreads() @trusted 3589 { 3590 const local = atomicLoad(_defaultPoolThreads); 3591 return local < uint.max ? local : totalCPUs - 1; 3592 } 3593 3594 /// Ditto 3595 @property void defaultPoolThreads(uint newVal) @trusted 3596 { 3597 atomicStore(_defaultPoolThreads, newVal); 3598 } 3599 3600 /** 3601 Convenience functions that forward to `taskPool.parallel`. The 3602 purpose of these is to make parallel foreach less verbose and more 3603 readable. 3604 3605 Example: 3606 --- 3607 // Find the logarithm of every number from 3608 // 1 to 1_000_000 in parallel, using the 3609 // default TaskPool instance. 3610 auto logs = new double[1_000_000]; 3611 3612 foreach (i, ref elem; parallel(logs)) 3613 { 3614 elem = log(i + 1.0); 3615 } 3616 --- 3617 3618 */ 3619 ParallelForeach!R parallel(R)(R range) 3620 { 3621 return taskPool.parallel(range); 3622 } 3623 3624 /// Ditto 3625 ParallelForeach!R parallel(R)(R range, size_t workUnitSize) 3626 { 3627 return taskPool.parallel(range, workUnitSize); 3628 } 3629 3630 // `each` should be usable with parallel 3631 // https://issues.dlang.org/show_bug.cgi?id=17019 3632 @system unittest 3633 { 3634 import std.algorithm.iteration : each, sum; 3635 import std.range : iota; 3636 3637 // check behavior with parallel 3638 auto arr = new int[10]; 3639 parallel(arr).each!((ref e) => e += 1); 3640 assert(arr.sum == 10); 3641 3642 auto arrIndex = new int[10]; 3643 parallel(arrIndex).each!((i, ref e) => e += i); 3644 assert(arrIndex.sum == 10.iota.sum); 3645 } 3646 3647 // https://issues.dlang.org/show_bug.cgi?id=22745 3648 @system unittest 3649 { 3650 auto pool = new TaskPool(0); 3651 int[] empty; 3652 foreach (i; pool.parallel(empty)) {} 3653 pool.finish(); 3654 } 3655 3656 // Thrown when a parallel foreach loop is broken from. 3657 class ParallelForeachError : Error 3658 { 3659 this() 3660 { 3661 super("Cannot break from a parallel foreach loop using break, return, " 3662 ~ "labeled break/continue or goto statements."); 3663 } 3664 } 3665 3666 /*------Structs that implement opApply for parallel foreach.------------------*/ 3667 private template randLen(R) 3668 { 3669 enum randLen = isRandomAccessRange!R && hasLength!R; 3670 } 3671 3672 private void submitAndExecute( 3673 TaskPool pool, 3674 scope void delegate() doIt 3675 ) 3676 { 3677 import core.exception : OutOfMemoryError; 3678 immutable nThreads = pool.size + 1; 3679 3680 alias PTask = typeof(scopedTask(doIt)); 3681 import core.stdc.stdlib : malloc, free; 3682 import core.stdc.string : memcpy; 3683 3684 // The logical thing to do would be to just use alloca() here, but that 3685 // causes problems on Windows for reasons that I don't understand 3686 // (tentatively a compiler bug) and definitely doesn't work on Posix due 3687 // to https://issues.dlang.org/show_bug.cgi?id=3753. 3688 // Therefore, allocate a fixed buffer and fall back to `malloc()` if 3689 // someone's using a ridiculous amount of threads. 3690 // Also, a byte array is used instead of a PTask array as the fixed buffer 3691 // to prevent d'tors from being called on uninitialized excess PTask 3692 // instances. 3693 enum nBuf = 64; 3694 byte[nBuf * PTask.sizeof] buf = void; 3695 PTask[] tasks; 3696 if (nThreads <= nBuf) 3697 { 3698 tasks = (cast(PTask*) buf.ptr)[0 .. nThreads]; 3699 } 3700 else 3701 { 3702 auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof); 3703 if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism."); 3704 tasks = ptr[0 ..
nThreads]; 3705 } 3706 3707 scope(exit) 3708 { 3709 if (nThreads > nBuf) 3710 { 3711 free(tasks.ptr); 3712 } 3713 } 3714 3715 foreach (ref t; tasks) 3716 { 3717 import core.stdc.string : memcpy; 3718 3719 // This silly looking code is necessary to prevent d'tors from being 3720 // called on uninitialized objects. 3721 auto temp = scopedTask(doIt); 3722 memcpy(&t, &temp, PTask.sizeof); 3723 3724 // This has to be done to t after copying, not temp before copying. 3725 // Otherwise, temp's destructor will sit here and wait for the 3726 // task to finish. 3727 t.pool = pool; 3728 } 3729 3730 foreach (i; 1 .. tasks.length - 1) 3731 { 3732 tasks[i].next = tasks[i + 1].basePtr; 3733 tasks[i + 1].prev = tasks[i].basePtr; 3734 } 3735 3736 if (tasks.length > 1) 3737 { 3738 pool.queueLock(); 3739 scope(exit) pool.queueUnlock(); 3740 3741 pool.abstractPutGroupNoSync( 3742 tasks[1].basePtr, 3743 tasks[$ - 1].basePtr 3744 ); 3745 } 3746 3747 if (tasks.length > 0) 3748 { 3749 try 3750 { 3751 tasks[0].job(); 3752 } 3753 catch (Throwable e) 3754 { 3755 tasks[0].exception = e; // nocoverage 3756 } 3757 tasks[0].taskStatus = TaskStatus.done; 3758 3759 // Try to execute each of these in the current thread 3760 foreach (ref task; tasks[1..$]) 3761 { 3762 pool.tryDeleteExecute(task.basePtr); 3763 } 3764 } 3765 3766 Throwable firstException; 3767 3768 foreach (i, ref task; tasks) 3769 { 3770 try 3771 { 3772 task.yieldForce; 3773 } 3774 catch (Throwable e) 3775 { 3776 /* Chain e to front because order doesn't matter and because 3777 * e is not likely to be a chain itself (so fewer traversals) 3778 */ 3779 firstException = Throwable.chainTogether(e, firstException); 3780 continue; 3781 } 3782 } 3783 3784 if (firstException) throw firstException; 3785 } 3786 3787 void foreachErr() 3788 { 3789 throw new ParallelForeachError(); 3790 } 3791 3792 int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg) 3793 { 3794 with(p) 3795 { 3796 int res = 0; 3797 size_t index = 0; 3798 3799 // The explicit ElementType!R in the foreach loops is necessary for 3800 // correct behavior when iterating over strings. 3801 static if (hasLvalueElements!R) 3802 { 3803 foreach (ref ElementType!R elem; range) 3804 { 3805 static if (Parameters!dg.length == 2) 3806 { 3807 res = dg(index, elem); 3808 } 3809 else 3810 { 3811 res = dg(elem); 3812 } 3813 if (res) break; 3814 index++; 3815 } 3816 } 3817 else 3818 { 3819 foreach (ElementType!R elem; range) 3820 { 3821 static if (Parameters!dg.length == 2) 3822 { 3823 res = dg(index, elem); 3824 } 3825 else 3826 { 3827 res = dg(elem); 3828 } 3829 if (res) break; 3830 index++; 3831 } 3832 } 3833 if (res) foreachErr; 3834 return res; 3835 } 3836 } 3837 3838 private enum string parallelApplyMixinRandomAccess = q{ 3839 // Handle empty thread pool as special case. 3840 if (pool.size == 0) 3841 { 3842 return doSizeZeroCase(this, dg); 3843 } 3844 3845 // Whether iteration is with or without an index variable. 3846 enum withIndex = Parameters!(typeof(dg)).length == 2; 3847 3848 shared size_t workUnitIndex = size_t.max; // Effectively -1: chunkIndex + 1 == 0 3849 immutable len = range.length; 3850 if (!len) return 0; 3851 3852 shared bool shouldContinue = true; 3853 3854 void doIt() 3855 { 3856 import std.algorithm.comparison : min; 3857 3858 scope(failure) 3859 { 3860 // If an exception is thrown, all threads should bail. 
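// scope(failure) runs as the exception propagates out of doIt, so
// the other workers see the cleared flag on their next loop check.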
3861 atomicStore(shouldContinue, false); 3862 } 3863 3864 while (atomicLoad(shouldContinue)) 3865 { 3866 immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1); 3867 immutable start = workUnitSize * myUnitIndex; 3868 if (start >= len) 3869 { 3870 atomicStore(shouldContinue, false); 3871 break; 3872 } 3873 3874 immutable end = min(len, start + workUnitSize); 3875 3876 foreach (i; start .. end) 3877 { 3878 static if (withIndex) 3879 { 3880 if (dg(i, range[i])) foreachErr(); 3881 } 3882 else 3883 { 3884 if (dg(range[i])) foreachErr(); 3885 } 3886 } 3887 } 3888 } 3889 3890 submitAndExecute(pool, &doIt); 3891 3892 return 0; 3893 }; 3894 3895 enum string parallelApplyMixinInputRange = q{ 3896 // Handle empty thread pool as special case. 3897 if (pool.size == 0) 3898 { 3899 return doSizeZeroCase(this, dg); 3900 } 3901 3902 // Whether iteration is with or without an index variable. 3903 enum withIndex = Parameters!(typeof(dg)).length == 2; 3904 3905 // This protects the range while copying it. 3906 auto rangeMutex = new Mutex(); 3907 3908 shared bool shouldContinue = true; 3909 3910 // The total number of elements that have been popped off range. 3911 // This is updated only while protected by rangeMutex; 3912 size_t nPopped = 0; 3913 3914 static if ( 3915 is(typeof(range.buf1)) && 3916 is(typeof(range.bufPos)) && 3917 is(typeof(range.doBufSwap())) 3918 ) 3919 { 3920 // Make sure we don't have the buffer recycling overload of 3921 // asyncBuf. 3922 static if ( 3923 is(typeof(range.source)) && 3924 isRoundRobin!(typeof(range.source)) 3925 ) 3926 { 3927 static assert(0, "Cannot execute a parallel foreach loop on " ~ 3928 "the buffer recycling overload of asyncBuf."); 3929 } 3930 3931 enum bool bufferTrick = true; 3932 } 3933 else 3934 { 3935 enum bool bufferTrick = false; 3936 } 3937 3938 void doIt() 3939 { 3940 scope(failure) 3941 { 3942 // If an exception is thrown, all threads should bail. 3943 atomicStore(shouldContinue, false); 3944 } 3945 3946 static if (hasLvalueElements!R) 3947 { 3948 alias Temp = ElementType!R*[]; 3949 Temp temp; 3950 3951 // Returns: The previous value of nPopped. 3952 size_t makeTemp() 3953 { 3954 import std.algorithm.internal : addressOf; 3955 import std.array : uninitializedArray; 3956 3957 if (temp is null) 3958 { 3959 temp = uninitializedArray!Temp(workUnitSize); 3960 } 3961 3962 rangeMutex.lock(); 3963 scope(exit) rangeMutex.unlock(); 3964 3965 size_t i = 0; 3966 for (; i < workUnitSize && !range.empty; range.popFront(), i++) 3967 { 3968 temp[i] = addressOf(range.front); 3969 } 3970 3971 temp = temp[0 .. i]; 3972 auto ret = nPopped; 3973 nPopped += temp.length; 3974 return ret; 3975 } 3976 3977 } 3978 else 3979 { 3980 3981 alias Temp = ElementType!R[]; 3982 Temp temp; 3983 3984 // Returns: The previous value of nPopped. 3985 static if (!bufferTrick) size_t makeTemp() 3986 { 3987 import std.array : uninitializedArray; 3988 3989 if (temp is null) 3990 { 3991 temp = uninitializedArray!Temp(workUnitSize); 3992 } 3993 3994 rangeMutex.lock(); 3995 scope(exit) rangeMutex.unlock(); 3996 3997 size_t i = 0; 3998 for (; i < workUnitSize && !range.empty; range.popFront(), i++) 3999 { 4000 temp[i] = range.front; 4001 } 4002 4003 temp = temp[0 .. i]; 4004 auto ret = nPopped; 4005 nPopped += temp.length; 4006 return ret; 4007 } 4008 4009 static if (bufferTrick) size_t makeTemp() 4010 { 4011 import std.algorithm.mutation : swap; 4012 rangeMutex.lock(); 4013 scope(exit) rangeMutex.unlock(); 4014 4015 // Elide copying by just swapping buffers. 
4016 temp.length = range.buf1.length; 4017 swap(range.buf1, temp); 4018 4019 // This is necessary in case popFront() has been called on 4020 // range before entering the parallel foreach loop. 4021 temp = temp[range.bufPos..$]; 4022 4023 static if (is(typeof(range._length))) 4024 { 4025 range._length -= (temp.length - range.bufPos); 4026 } 4027 4028 range.doBufSwap(); 4029 auto ret = nPopped; 4030 nPopped += temp.length; 4031 return ret; 4032 } 4033 } 4034 4035 while (atomicLoad(shouldContinue)) 4036 { 4037 auto overallIndex = makeTemp(); 4038 if (temp.empty) 4039 { 4040 atomicStore(shouldContinue, false); 4041 break; 4042 } 4043 4044 foreach (i; 0 .. temp.length) 4045 { 4046 scope(success) overallIndex++; 4047 4048 static if (hasLvalueElements!R) 4049 { 4050 static if (withIndex) 4051 { 4052 if (dg(overallIndex, *temp[i])) foreachErr(); 4053 } 4054 else 4055 { 4056 if (dg(*temp[i])) foreachErr(); 4057 } 4058 } 4059 else 4060 { 4061 static if (withIndex) 4062 { 4063 if (dg(overallIndex, temp[i])) foreachErr(); 4064 } 4065 else 4066 { 4067 if (dg(temp[i])) foreachErr(); 4068 } 4069 } 4070 } 4071 } 4072 } 4073 4074 submitAndExecute(pool, &doIt); 4075 4076 return 0; 4077 }; 4078 4079 4080 private struct ParallelForeach(R) 4081 { 4082 TaskPool pool; 4083 R range; 4084 size_t workUnitSize; 4085 alias E = ElementType!R; 4086 4087 static if (hasLvalueElements!R) 4088 { 4089 alias NoIndexDg = int delegate(ref E); 4090 alias IndexDg = int delegate(size_t, ref E); 4091 } 4092 else 4093 { 4094 alias NoIndexDg = int delegate(E); 4095 alias IndexDg = int delegate(size_t, E); 4096 } 4097 4098 int opApply(scope NoIndexDg dg) 4099 { 4100 static if (randLen!R) 4101 { 4102 mixin(parallelApplyMixinRandomAccess); 4103 } 4104 else 4105 { 4106 mixin(parallelApplyMixinInputRange); 4107 } 4108 } 4109 4110 int opApply(scope IndexDg dg) 4111 { 4112 static if (randLen!R) 4113 { 4114 mixin(parallelApplyMixinRandomAccess); 4115 } 4116 else 4117 { 4118 mixin(parallelApplyMixinInputRange); 4119 } 4120 } 4121 } 4122 4123 /* 4124 This struct buffers the output of a callable that outputs data into a 4125 user-supplied buffer into a set of buffers of some fixed size. It allows these 4126 buffers to be accessed with an input range interface. This is used internally 4127 in the buffer-recycling overload of TaskPool.asyncBuf, which creates an 4128 instance and forwards it to the input range overload of asyncBuf. 4129 */ 4130 private struct RoundRobinBuffer(C1, C2) 4131 { 4132 // No need for constraints because they're already checked for in asyncBuf. 
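// front() lazily fills the current buffer via nextDel, and popFront()
// advances round-robin, so a given buffer gets overwritten again after
// bufs.length calls to popFront().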

    alias Array = Parameters!(C1.init)[0];
    alias T = typeof(Array.init[0]);

    T[][] bufs;
    size_t index;
    C1 nextDel;
    C2 emptyDel;
    bool _empty;
    bool primed;

    this(
        C1 nextDel,
        C2 emptyDel,
        size_t initialBufSize,
        size_t nBuffers
    )
    {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;
        bufs.length = nBuffers;

        foreach (ref buf; bufs)
        {
            buf.length = initialBufSize;
        }
    }

    void prime()
    in
    {
        assert(!empty);
    }
    do
    {
        scope(success) primed = true;
        nextDel(bufs[index]);
    }

    T[] front() @property
    in
    {
        assert(!empty);
    }
    do
    {
        if (!primed) prime();
        return bufs[index];
    }

    void popFront()
    {
        if (empty || emptyDel())
        {
            _empty = true;
            return;
        }

        index = (index + 1) % bufs.length;
        primed = false;
    }

    bool empty() @property const @safe pure nothrow
    {
        return _empty;
    }
}

version (StdUnittest)
{
    // This was the only way I could get nested maps to work.
    private __gshared TaskPool poolInstance;
}

// These test basic functionality but don't stress test for threading bugs.
// These are the tests that should be run every time Phobos is compiled.
@system unittest
{
    import std.algorithm.comparison : equal, min, max;
    import std.algorithm.iteration : filter, map, reduce;
    import std.array : split;
    import std.conv : text;
    import std.exception : assertThrown;
    import std.math.operations : isClose;
    import std.math.algebraic : sqrt, abs;
    import std.math.exponential : log;
    import std.range : indexed, iota, join;
    import std.typecons : Tuple, tuple;
    import std.stdio;

    poolInstance = new TaskPool(2);
    scope(exit) poolInstance.stop();

    // The only way this can be verified is manually.
    debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);

    auto oldPriority = poolInstance.priority;
    poolInstance.priority = Thread.PRIORITY_MAX;
    assert(poolInstance.priority == Thread.PRIORITY_MAX);

    poolInstance.priority = Thread.PRIORITY_MIN;
    assert(poolInstance.priority == Thread.PRIORITY_MIN);

    poolInstance.priority = oldPriority;
    assert(poolInstance.priority == oldPriority);

    static void refFun(ref uint num)
    {
        num++;
    }

    uint x;

    // Test task().
    auto t = task!refFun(x);
    poolInstance.put(t);
    t.yieldForce;
    assert(t.args[0] == 1);

    auto t2 = task(&refFun, x);
    poolInstance.put(t2);
    t2.yieldForce;
    assert(t2.args[0] == 1);

    // Test scopedTask().
    auto st = scopedTask!refFun(x);
    poolInstance.put(st);
    st.yieldForce;
    assert(st.args[0] == 1);

    auto st2 = scopedTask(&refFun, x);
    poolInstance.put(st2);
    st2.yieldForce;
    assert(st2.args[0] == 1);

    // Test executeInNewThread().
    auto ct = scopedTask!refFun(x);
    ct.executeInNewThread(Thread.PRIORITY_MAX);
    ct.yieldForce;
    assert(ct.args[0] == 1);

    // Test ref return.
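    // makeRef returns its argument by ref, so the ++ below mutates the copy
    // of toInc stored in the task's argument tuple (t3.args[0]).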
    uint toInc = 0;
    static ref T makeRef(T)(ref T num)
    {
        return num;
    }

    auto t3 = task!makeRef(toInc);
    taskPool.put(t3);
    assert(t3.args[0] == 0);
    t3.spinForce++;
    assert(t3.args[0] == 1);

    static void testSafe() @safe
    {
        static int bump(int num)
        {
            return num + 1;
        }

        auto safePool = new TaskPool(0);
        auto t = task(&bump, 1);
        taskPool.put(t);
        assert(t.yieldForce == 2);

        auto st = scopedTask(&bump, 1);
        taskPool.put(st);
        assert(st.yieldForce == 2);
        safePool.stop();
    }

    auto arr = [1, 2, 3, 4, 5];
    auto nums = new uint[5];
    auto nums2 = new uint[5];

    foreach (i, ref elem; poolInstance.parallel(arr))
    {
        elem++;
        nums[i] = cast(uint) i + 2;
        nums2[i] = elem;
    }

    assert(nums == [2, 3, 4, 5, 6], text(nums));
    assert(nums2 == nums, text(nums2));
    assert(arr == nums, text(arr));

    // Test const/immutable arguments.
    static int add(int lhs, int rhs)
    {
        return lhs + rhs;
    }
    immutable addLhs = 1;
    immutable addRhs = 2;
    auto addTask = task(&add, addLhs, addRhs);
    auto addScopedTask = scopedTask(&add, addLhs, addRhs);
    poolInstance.put(addTask);
    poolInstance.put(addScopedTask);
    assert(addTask.yieldForce == 3);
    assert(addScopedTask.yieldForce == 3);

    // Test parallel foreach with a non-random-access range.
    auto range = filter!"a != 666"([0, 1, 2, 3, 4]);

    foreach (i, elem; poolInstance.parallel(range))
    {
        nums[i] = cast(uint) i;
    }

    assert(nums == [0, 1, 2, 3, 4]);

    auto logs = new double[1_000_000];
    foreach (i, ref elem; poolInstance.parallel(logs))
    {
        elem = log(i + 1.0);
    }

    foreach (i, elem; logs)
    {
        assert(isClose(elem, log(double(i + 1))));
    }

    assert(poolInstance.amap!"a * a"([1, 2, 3, 4, 5]) == [1, 4, 9, 16, 25]);
    assert(poolInstance.amap!"a * a"([1, 2, 3, 4, 5], new long[5]) == [1, 4, 9, 16, 25]);
    assert(poolInstance.amap!("a * a", "-a")([1, 2, 3]) ==
        [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    auto tupleBuf = new Tuple!(int, int)[3];
    poolInstance.amap!("a * a", "-a")([1, 2, 3], tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
    poolInstance.amap!("a * a", "-a")([1, 2, 3], 5, tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    // Test amap with a non-array buffer.
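    // amap can write to any random-access range with assignable elements,
    // not just arrays; std.range.indexed gives a permuted view of toIndex.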
    auto toIndex = new int[5];
    auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
    poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
    assert(equal(ind, [2, 4, 6, 8, 10]));
    assert(equal(toIndex, [8, 4, 10, 2, 6]));
    poolInstance.amap!"a / 2"(ind, ind);
    assert(equal(ind, [1, 2, 3, 4, 5]));
    assert(equal(toIndex, [4, 2, 5, 1, 3]));

    auto buf = new int[5];
    poolInstance.amap!"a * a"([1, 2, 3, 4, 5], buf);
    assert(buf == [1, 4, 9, 16, 25]);
    poolInstance.amap!"a * a"([1, 2, 3, 4, 5], 4, buf);
    assert(buf == [1, 4, 9, 16, 25]);

    assert(poolInstance.reduce!"a + b"([1]) == 1);
    assert(poolInstance.reduce!"a + b"([1, 2, 3, 4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1, 2, 3, 4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1, 2, 3, 4], 1) == 10);
    assert(poolInstance.reduce!(min, max)([1, 2, 3, 4]) == tuple(1, 4));
    assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1, 2, 3, 4]) ==
        tuple(10, 24));

    immutable serialAns = reduce!"a + b"(iota(1000));
    assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
    assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);

    // Test worker-local storage.
    auto wl = poolInstance.workerLocalStorage(0);
    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        wl.get = wl.get + i;
    }

    auto wlRange = wl.toRange;
    auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
    assert(parallelSum == 499500);
    assert(wlRange[0 .. 1][0] == wlRange[0]);
    assert(wlRange[1 .. 2][0] == wlRange[1]);

    // Test finish().
    {
        static void slowFun() { Thread.sleep(dur!"msecs"(1)); }

        auto pool1 = new TaskPool();
        auto tSlow = task!slowFun();
        pool1.put(tSlow);
        pool1.finish();
        tSlow.yieldForce;
        // Can't assert that pool1.status == PoolState.stopNow because status
        // doesn't change until after the "done" flag is set and the waiting
        // thread is woken up.

        auto pool2 = new TaskPool();
        auto tSlow2 = task!slowFun();
        pool2.put(tSlow2);
        pool2.finish(true); // blocking
        assert(tSlow2.done);

        // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero.
        auto pool3 = new TaskPool(0);
        auto tSlow3 = task!slowFun();
        pool3.put(tSlow3);
        pool3.finish(true); // blocking
        assert(tSlow3.done);

        // This is correct because no thread will terminate unless pool2.status
        // and pool3.status have already been set to stopNow.
        assert(pool2.status == TaskPool.PoolState.stopNow);
        assert(pool3.status == TaskPool.PoolState.stopNow);
    }

    // Test default pool stuff.
    assert(taskPool.size == totalCPUs - 1);

    nums = new uint[1000];
    foreach (i; parallel(iota(1000)))
    {
        nums[i] = cast(uint) i;
    }
    assert(equal(nums, iota(1000)));

    assert(equal(
        poolInstance.map!"a * a"(iota(3_000_001), 10_000),
        map!"a * a"(iota(3_000_001))
    ));

    // The filter is to kill random access and test the non-random access
    // branch.
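    // (filter!"a == a" passes every element through; it exists only to hide
    // the random-access interface so that map's input-range path runs.)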
    assert(equal(
        poolInstance.map!"a * a"(
            filter!"a == a"(iota(3_000_001)), 10_000, 1000),
        map!"a * a"(iota(3_000_001))
    ));

    assert(
        reduce!"a + b"(0UL,
            poolInstance.map!"a * a"(iota(300_001), 10_000)
        ) ==
        reduce!"a + b"(0UL,
            map!"a * a"(iota(300_001))
        )
    );

    assert(equal(
        iota(1_000_002),
        poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
    ));

    {
        import std.conv : to;
        import std.file : deleteme;

        string temp_file = deleteme ~ "-tempDelMe.txt";
        auto file = File(temp_file, "wb");
        scope(exit)
        {
            file.close();
            import std.file;
            remove(temp_file);
        }

        auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
        foreach (row; written)
        {
            file.writeln(join(to!(string[])(row), "\t"));
        }

        file = File(temp_file);

        void next(ref char[] buf)
        {
            file.readln(buf);
            import std.string : chomp;
            buf = chomp(buf);
        }

        double[][] read;
        auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

        foreach (line; asyncReader)
        {
            if (line.length == 0) continue;
            auto ls = line.split("\t");
            read ~= to!(double[])(ls);
        }

        assert(read == written);
        file.close();
    }

    // Test map/asyncBuf chaining.
    auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
    auto temp = poolInstance.map!sqrt(
        abuf, 100, 5
    );
    auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
    lmchain.popFront();

    int ii;
    foreach (elem; lmchain)
    {
        if (!isClose(elem, ii))
        {
            stderr.writeln(ii, '\t', elem);
        }
        ii++;
    }

    // Test the buffer trick in parallel foreach.
    abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
    abuf.popFront();
    auto bufTrickTest = new size_t[abuf.length];
    foreach (i, elem; parallel(abuf))
    {
        bufTrickTest[i] = i;
    }

    assert(equal(iota(1_000_000), bufTrickTest));

    auto myTask = task!abs(-1);
    taskPool.put(myTask);
    assert(myTask.spinForce == 1);

    // Test that worker-local storage from one pool receives an index of 0
    // when the index is queried with respect to another pool. The only way
    // to test this is non-deterministic.
    foreach (i; parallel(iota(1000), 1))
    {
        assert(poolInstance.workerIndex == 0);
    }

    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        assert(taskPool.workerIndex == 0);
    }

    // Test exception handling.
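    // An exception thrown in a worker is saved, chained to any others that
    // occur, and rethrown in the thread that initiated the parallel
    // operation, which is why assertThrown can observe it here.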
    static void parallelForeachThrow()
    {
        foreach (elem; parallel(iota(10)))
        {
            throw new Exception("");
        }
    }

    assertThrown!Exception(parallelForeachThrow());

    static int reduceException(int a, int b)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.reduce!reduceException(iota(3))
    );

    static int mapException(int a)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.amap!mapException(iota(3))
    );

    static void mapThrow()
    {
        auto m = poolInstance.map!mapException(iota(3));
        m.popFront();
    }

    assertThrown!Exception(mapThrow());

    struct ThrowingRange
    {
        @property int front()
        {
            return 1;
        }
        void popFront()
        {
            throw new Exception("");
        }
        enum bool empty = false;
    }

    assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
}

//version = parallelismStressTest;

// These are more like stress tests than real unit tests. They print out
// tons of stuff and should not be run every time make unittest is run.
version (parallelismStressTest)
{
    @system unittest
    {
        import std.stdio : stderr, writeln, readln;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;

        size_t attempt;
        for (; attempt < 10; attempt++)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            uint[] numbers = new uint[1_000];

            foreach (i; poolInstance.parallel(iota(0, numbers.length)))
            {
                numbers[i] = cast(uint) i;
            }

            // Make sure it works.
            foreach (i; 0 .. numbers.length)
            {
                assert(numbers[i] == i);
            }

            stderr.writeln("Done creating nums.");

            auto myNumbers = filter!"a % 7 > 0"(iota(0, 1000));
            foreach (num; poolInstance.parallel(myNumbers))
            {
                assert(num % 7 > 0 && num < 1000);
            }
            stderr.writeln("Done modulus test.");

            uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
            assert(squares.length == numbers.length);
            foreach (i, number; numbers)
            {
                assert(squares[i] == number * number);
            }
            stderr.writeln("Done squares.");

            auto sumFuture = task!(reduce!"a + b")(numbers);
            poolInstance.put(sumFuture);

            ulong sumSquares = 0;
            foreach (elem; numbers)
            {
                sumSquares += elem * elem;
            }

            uint mySum = sumFuture.spinForce();
            assert(mySum == 999 * 1000 / 2);

            auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
            assert(mySum == mySumParallel);
            stderr.writeln("Done sums.");

            auto myTask = task(
            {
                synchronized writeln("Our lives are parallel...Our lives are parallel.");
            });
            poolInstance.put(myTask);

            auto nestedOuter = "abcd";
            auto nestedInner = iota(0, 10, 2);

            foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
            {
                foreach (j, number; poolInstance.parallel(nestedInner, 1))
                {
                    synchronized writeln(i, ": ", letter, " ", j, ": ", number);
                }
            }

            poolInstance.stop();
        }

        assert(attempt == 10);
        writeln("Press enter to go to the next round of unittests.");
        readln();
    }

    // These unittests are intended more for actual testing than as examples.
    @system unittest
    {
        import std.stdio : stderr;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;
        import std.math.algebraic : sqrt;
        import std.math.operations : isClose;
        import std.math.traits : isNaN;
        import std.conv : text;

        foreach (attempt; 0 .. 10)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            // Test indexing.
            stderr.writeln("Creator Raw Index: ", poolInstance.threadIndex);
            assert(poolInstance.workerIndex() == 0);

            // Test worker-local storage.
            auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
            foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
            {
                workerLocalStorage.get++;
            }
            assert(reduce!"a + b"(workerLocalStorage.toRange) ==
                1_000_000 + poolInstance.size + 1);

            // Make sure work is reasonably balanced among threads. This test
            // is non-deterministic and is more of a sanity check than
            // something that has an absolute pass/fail.
            shared(uint)[void*] nJobsByThread;
            foreach (thread; poolInstance.pool)
            {
                nJobsByThread[cast(void*) thread] = 0;
            }
            nJobsByThread[cast(void*) Thread.getThis()] = 0;

            foreach (i; poolInstance.parallel(iota(0, 1_000_000), 100))
            {
                atomicOp!"+="(nJobsByThread[cast(void*) Thread.getThis()], 1);
            }

            stderr.writeln("\nCurrent thread is: ",
                cast(void*) Thread.getThis());
            stderr.writeln("Workload distribution: ");
            foreach (k, v; nJobsByThread)
            {
                stderr.writeln(k, '\t', v);
            }

            // Test whether amap can be nested.
            real[][] matrix = new real[][](1000, 1000);
            foreach (i; poolInstance.parallel(iota(0, matrix.length)))
            {
                foreach (j; poolInstance.parallel(iota(0, matrix[0].length)))
                {
                    matrix[i][j] = i * j;
                }
            }

            // Get around weird bugs having to do with sqrt being an intrinsic:
            static real mySqrt(real num)
            {
                return sqrt(num);
            }

            static real[] parallelSqrt(real[] nums)
            {
                return poolInstance.amap!mySqrt(nums);
            }

            real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);

            foreach (i, row; sqrtMatrix)
            {
                foreach (j, elem; row)
                {
                    real shouldBe = sqrt(cast(real) i * j);
                    assert(isClose(shouldBe, elem));
                    sqrtMatrix[i][j] = shouldBe;
                }
            }

            auto saySuccess = task(
            {
                stderr.writeln(
                    "Success doing matrix stuff that involves nested pool use.");
            });
            poolInstance.put(saySuccess);
            saySuccess.workForce();

            // A more thorough test of amap and reduce: find the sum of the
            // square roots of matrix.
            static real parallelSum(real[] input)
            {
                return poolInstance.reduce!"a + b"(input);
            }

            auto sumSqrt = poolInstance.reduce!"a + b"(
                poolInstance.amap!parallelSum(
                    sqrtMatrix
                )
            );

            assert(isClose(sumSqrt, 4.437e8, 1e-2));
            stderr.writeln("Done sum of square roots.");

            // Test whether tasks work with function pointers.
            /+ // This part is buggy and needs to be fixed...
            auto nanTask = task(&isNaN, 1.0L);
            poolInstance.put(nanTask);
            assert(nanTask.spinForce == false);

            if (poolInstance.size > 0)
            {
                // Test work waiting.
                static void uselessFun()
                {
                    foreach (i; 0 .. 1_000_000) {}
                }

                auto uselessTasks = new typeof(task(&uselessFun))[1000];
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask = task(&uselessFun);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    poolInstance.put(uselessTask);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask.workForce();
                }
            }
            +/

            // Test the case of non-random access + ref returns.
            int[] nums = [1, 2, 3, 4, 5];
            static struct RemoveRandom
            {
                int[] arr;

                ref int front()
                {
                    return arr.front;
                }
                void popFront()
                {
                    arr.popFront();
                }
                bool empty()
                {
                    return arr.empty;
                }
            }

            auto refRange = RemoveRandom(nums);
            foreach (ref elem; poolInstance.parallel(refRange))
            {
                elem++;
            }
            assert(nums == [2, 3, 4, 5, 6], text(nums));
            stderr.writeln("Nums: ", nums);

            poolInstance.stop();
        }
    }
}

@system unittest
{
    static struct __S_12733
    {
        invariant() { assert(checksum == 1_234_567_890); }
        this(ulong u) { n = u; }
        void opAssign(__S_12733 s) { this.n = s.n; }
        ulong n;
        ulong checksum = 1_234_567_890;
    }

    static auto __genPair_12733(ulong n) { return __S_12733(n); }
    immutable ulong[] data = [2UL ^^ 59 - 1, 2UL ^^ 59 - 1, 2UL ^^ 59 - 1,
        112_272_537_195_293UL];

    auto result = taskPool.amap!__genPair_12733(data);
}

@safe unittest
{
    import std.range : iota;

    // This test was in std.range, but caused cycles.
    assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
}

@safe unittest
{
    import std.algorithm.iteration : each;

    long[] arr;
    static assert(is(typeof({
        arr.parallel.each!"a++";
    })));
}

// https://issues.dlang.org/show_bug.cgi?id=17539
@system unittest
{
    import std.random : rndGen;
    // Ensure compilation.
    try foreach (rnd; rndGen.parallel) break;
    catch (ParallelForeachError e) {}
}
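
// A minimal additional usage sketch (not part of the original test suite):
// it exercises both ParallelForeach code paths with the default taskPool,
// namely the random-access overload on an array and the input-range overload
// on a filtered range, which hides random access.
@system unittest
{
    import std.algorithm.iteration : filter;

    int[] data = [1, 2, 3, 4];

    // Random-access path, with a work unit size of 2: indices are handed out
    // by atomically bumping a shared work unit counter.
    shared int sum = 0;
    foreach (x; taskPool.parallel(data, 2))
    {
        atomicOp!"+="(sum, x);
    }
    assert(atomicLoad(sum) == 10);

    // Input-range path: work units are copied out of the range under a mutex
    // (see parallelApplyMixinInputRange above).
    shared int count = 0;
    foreach (x; taskPool.parallel(data.filter!"a > 0", 1))
    {
        atomicOp!"+="(count, 1);
    }
    assert(atomicLoad(count) == 4);
}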