1 /** 2 `std.parallelism` implements high-level primitives for SMP parallelism. 3 These include parallel foreach, parallel reduce, parallel eager map, pipelining 4 and future/promise parallelism. `std.parallelism` is recommended when the 5 same operation is to be executed in parallel on different data, or when a 6 function is to be executed in a background thread and its result returned to a 7 well-defined main thread. For communication between arbitrary threads, see 8 `std.concurrency`. 9 10 `std.parallelism` is based on the concept of a `Task`. A `Task` is an 11 object that represents the fundamental unit of work in this library and may be 12 executed in parallel with any other `Task`. Using `Task` 13 directly allows programming with a future/promise paradigm. All other 14 supported parallelism paradigms (parallel foreach, map, reduce, pipelining) 15 represent an additional level of abstraction over `Task`. They 16 automatically create one or more `Task` objects, or closely related types 17 that are conceptually identical but not part of the public API. 18 19 After creation, a `Task` may be executed in a new thread, or submitted 20 to a `TaskPool` for execution. A `TaskPool` encapsulates a task queue 21 and its worker threads. Its purpose is to efficiently map a large 22 number of `Task`s onto a smaller number of threads. A task queue is a 23 FIFO queue of `Task` objects that have been submitted to the 24 `TaskPool` and are awaiting execution. A worker thread is a thread that 25 is associated with exactly one task queue. It executes the `Task` at the 26 front of its queue when the queue has work available, or sleeps when 27 no work is available. Each task queue is associated with zero or 28 more worker threads. If the result of a `Task` is needed before execution 29 by a worker thread has begun, the `Task` can be removed from the task queue 30 and executed immediately in the thread where the result is needed. 31 32 Warning: Unless marked as `@trusted` or `@safe`, artifacts in 33 this module allow implicit data sharing between threads and cannot 34 guarantee that client code is free from low level data races. 35 36 Source: $(PHOBOSSRC std/parallelism.d) 37 Author: David Simcha 38 Copyright: Copyright (c) 2009-2011, David Simcha. 39 License: $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0) 40 */ 41 module std.parallelism; 42 43 version (WebAssembly) {} else: 44 45 version (OSX) 46 version = Darwin; 47 else version (iOS) 48 version = Darwin; 49 else version (TVOS) 50 version = Darwin; 51 else version (WatchOS) 52 version = Darwin; 53 54 /// 55 @system unittest 56 { 57 import std.algorithm.iteration : map; 58 import std.math.operations : isClose; 59 import std.parallelism : taskPool; 60 import std.range : iota; 61 62 // Parallel reduce can be combined with 63 // std.algorithm.iteration.map to interesting effect. 64 // The following example (thanks to Russel Winder) 65 // calculates pi by quadrature using 66 // std.algorithm.map and TaskPool.reduce. 67 // getTerm is evaluated in parallel as needed by 68 // TaskPool.reduce. 69 // 70 // Timings on an Intel i5-3450 quad core machine 71 // for n = 1_000_000_000: 72 // 73 // TaskPool.reduce: 1.067 s 74 // std.algorithm.reduce: 4.011 s 75 76 enum n = 1_000_000; 77 enum delta = 1.0 / n; 78 79 alias getTerm = (int i) 80 { 81 immutable x = ( i - 0.5 ) * delta; 82 return delta / ( 1.0 + x * x ) ; 83 }; 84 85 immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm); 86 87 assert(pi.isClose(3.14159, 1e-5)); 88 } 89 90 import core.atomic; 91 import core.memory; 92 import core.sync.condition; 93 import core.thread; 94 95 import std.functional; 96 import std.meta; 97 import std.range.primitives; 98 import std.traits; 99 100 /* 101 (For now public undocumented with reserved name.) 102 103 A lazily initialized global constant. The underlying value is a shared global 104 statically initialized to `outOfBandValue` which must not be a legit value of 105 the constant. Upon the first call the situation is detected and the global is 106 initialized by calling `initializer`. The initializer is assumed to be pure 107 (even if not marked as such), i.e. return the same value upon repeated calls. 108 For that reason, no special precautions are taken so `initializer` may be called 109 more than one time leading to benign races on the cached value. 110 111 In the quiescent state the cost of the function is an atomic load from a global. 112 113 Params: 114 T = The type of the pseudo-constant (may be qualified) 115 outOfBandValue = A value that cannot be valid, it is used for initialization 116 initializer = The function performing initialization; must be `nothrow` 117 118 Returns: 119 The lazily initialized value 120 */ 121 @property pure 122 T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)() 123 if (is(Unqual!T : T) 124 && is(typeof(initializer()) : T) 125 && is(typeof(outOfBandValue) : T)) 126 { 127 static T impl() nothrow 128 { 129 // Thread-local cache 130 static Unqual!T tls = outOfBandValue; 131 auto local = tls; 132 // Shortest path, no atomic operations 133 if (local != outOfBandValue) return local; 134 // Process-level cache 135 static shared Unqual!T result = outOfBandValue; 136 // Initialize both process-level cache and tls 137 local = atomicLoad(result); 138 if (local == outOfBandValue) 139 { 140 local = initializer(); 141 atomicStore(result, local); 142 } 143 tls = local; 144 return local; 145 } 146 147 import std.traits : SetFunctionAttributes; 148 alias Fun = SetFunctionAttributes!(typeof(&impl), "D", 149 functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_); 150 auto purified = (() @trusted => cast(Fun) &impl)(); 151 return purified(); 152 } 153 154 // Returns the size of a cache line. 155 alias cacheLineSize = 156 __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl); 157 158 private size_t cacheLineSizeImpl() @nogc nothrow @trusted 159 { 160 size_t result = 0; 161 import core.cpuid : datacache; 162 foreach (ref const cachelevel; datacache) 163 { 164 if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max) 165 { 166 result = cachelevel.lineSize; 167 } 168 } 169 return result; 170 } 171 172 @nogc @safe nothrow unittest 173 { 174 assert(cacheLineSize == cacheLineSizeImpl); 175 } 176 177 /* Atomics code. These forward to core.atomic, but are written like this 178 for two reasons: 179 180 1. They used to actually contain ASM code and I don' want to have to change 181 to directly calling core.atomic in a zillion different places. 182 183 2. core.atomic has some misc. issues that make my use cases difficult 184 without wrapping it. If I didn't wrap it, casts would be required 185 basically everywhere. 186 */ 187 private void atomicSetUbyte(T)(ref T stuff, T newVal) 188 if (__traits(isIntegral, T) && is(T : ubyte)) 189 { 190 //core.atomic.cas(cast(shared) &stuff, stuff, newVal); 191 atomicStore(*(cast(shared) &stuff), newVal); 192 } 193 194 private ubyte atomicReadUbyte(T)(ref T val) 195 if (__traits(isIntegral, T) && is(T : ubyte)) 196 { 197 return atomicLoad(*(cast(shared) &val)); 198 } 199 200 // This gets rid of the need for a lot of annoying casts in other parts of the 201 // code, when enums are involved. 202 private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal) 203 if (__traits(isIntegral, T) && is(T : ubyte)) 204 { 205 return core.atomic.cas(cast(shared) &stuff, testVal, newVal); 206 } 207 208 /*--------------------- Generic helper functions, etc.------------------------*/ 209 private template MapType(R, functions...) 210 { 211 static assert(functions.length); 212 213 ElementType!R e = void; 214 alias MapType = 215 typeof(adjoin!(staticMap!(unaryFun, functions))(e)); 216 } 217 218 private template ReduceType(alias fun, R, E) 219 { 220 alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init)); 221 } 222 223 private template noUnsharedAliasing(T) 224 { 225 enum bool noUnsharedAliasing = !hasUnsharedAliasing!T; 226 } 227 228 // This template tests whether a function may be executed in parallel from 229 // @safe code via Task.executeInNewThread(). There is an additional 230 // requirement for executing it via a TaskPool. (See isSafeReturn). 231 private template isSafeTask(F) 232 { 233 enum bool isSafeTask = 234 (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 && 235 (functionAttributes!F & FunctionAttribute.ref_) == 0 && 236 (isFunctionPointer!F || !hasUnsharedAliasing!F) && 237 allSatisfy!(noUnsharedAliasing, Parameters!F); 238 } 239 240 @safe unittest 241 { 242 alias F1 = void function() @safe; 243 alias F2 = void function(); 244 alias F3 = void function(uint, string) @trusted; 245 alias F4 = void function(uint, char[]); 246 247 static assert( isSafeTask!F1); 248 static assert(!isSafeTask!F2); 249 static assert( isSafeTask!F3); 250 static assert(!isSafeTask!F4); 251 252 alias F5 = uint[] function(uint, string) pure @trusted; 253 static assert( isSafeTask!F5); 254 } 255 256 // This function decides whether Tasks that meet all of the other requirements 257 // for being executed from @safe code can be executed on a TaskPool. 258 // When executing via TaskPool, it's theoretically possible 259 // to return a value that is also pointed to by a worker thread's thread local 260 // storage. When executing from executeInNewThread(), the thread that executed 261 // the Task is terminated by the time the return value is visible in the calling 262 // thread, so this is a non-issue. It's also a non-issue for pure functions 263 // since they can't read global state. 264 private template isSafeReturn(T) 265 { 266 static if (!hasUnsharedAliasing!(T.ReturnType)) 267 { 268 enum isSafeReturn = true; 269 } 270 else static if (T.isPure) 271 { 272 enum isSafeReturn = true; 273 } 274 else 275 { 276 enum isSafeReturn = false; 277 } 278 } 279 280 private template randAssignable(R) 281 { 282 enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R; 283 } 284 285 private enum TaskStatus : ubyte 286 { 287 notStarted, 288 inProgress, 289 done 290 } 291 292 private template AliasReturn(alias fun, T...) 293 { 294 alias AliasReturn = typeof({ T args; return fun(args); }); 295 } 296 297 // Should be private, but std.algorithm.reduce is used in the zero-thread case 298 // and won't work w/ private. 299 template reduceAdjoin(functions...) 300 { 301 static if (functions.length == 1) 302 { 303 alias reduceAdjoin = binaryFun!(functions[0]); 304 } 305 else 306 { 307 T reduceAdjoin(T, U)(T lhs, U rhs) 308 { 309 alias funs = staticMap!(binaryFun, functions); 310 311 foreach (i, Unused; typeof(lhs.expand)) 312 { 313 lhs.expand[i] = funs[i](lhs.expand[i], rhs); 314 } 315 316 return lhs; 317 } 318 } 319 } 320 321 private template reduceFinish(functions...) 322 { 323 static if (functions.length == 1) 324 { 325 alias reduceFinish = binaryFun!(functions[0]); 326 } 327 else 328 { 329 T reduceFinish(T)(T lhs, T rhs) 330 { 331 alias funs = staticMap!(binaryFun, functions); 332 333 foreach (i, Unused; typeof(lhs.expand)) 334 { 335 lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]); 336 } 337 338 return lhs; 339 } 340 } 341 } 342 343 private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2) 344 { 345 enum isRoundRobin = true; 346 } 347 348 private template isRoundRobin(T) 349 { 350 enum isRoundRobin = false; 351 } 352 353 @safe unittest 354 { 355 static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate()))); 356 static assert(!isRoundRobin!(uint)); 357 } 358 359 // This is the base "class" for all of the other tasks. Using C-style 360 // polymorphism to allow more direct control over memory allocation, etc. 361 private struct AbstractTask 362 { 363 AbstractTask* prev; 364 AbstractTask* next; 365 366 // Pointer to a function that executes this task. 367 void function(void*) runTask; 368 369 Throwable exception; 370 ubyte taskStatus = TaskStatus.notStarted; 371 372 bool done() @property 373 { 374 if (atomicReadUbyte(taskStatus) == TaskStatus.done) 375 { 376 if (exception) 377 { 378 throw exception; 379 } 380 381 return true; 382 } 383 384 return false; 385 } 386 387 void job() 388 { 389 runTask(&this); 390 } 391 } 392 393 /** 394 `Task` represents the fundamental unit of work. A `Task` may be 395 executed in parallel with any other `Task`. Using this struct directly 396 allows future/promise parallelism. In this paradigm, a function (or delegate 397 or other callable) is executed in a thread other than the one it was called 398 from. The calling thread does not block while the function is being executed. 399 A call to `workForce`, `yieldForce`, or `spinForce` is used to 400 ensure that the `Task` has finished executing and to obtain the return 401 value, if any. These functions and `done` also act as full memory barriers, 402 meaning that any memory writes made in the thread that executed the `Task` 403 are guaranteed to be visible in the calling thread after one of these functions 404 returns. 405 406 The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can 407 be used to create an instance of this struct. See `task` for usage examples. 408 409 Function results are returned from `yieldForce`, `spinForce` and 410 `workForce` by ref. If `fun` returns by ref, the reference will point 411 to the returned reference of `fun`. Otherwise it will point to a 412 field in this struct. 413 414 Copying of this struct is disabled, since it would provide no useful semantics. 415 If you want to pass this struct around, you should do so by reference or 416 pointer. 417 418 Bugs: Changes to `ref` and `out` arguments are not propagated to the 419 call site, only to `args` in this struct. 420 */ 421 struct Task(alias fun, Args...) 422 { 423 @(imported!"core.attribute".mutableRefInit) private AbstractTask base = {runTask : &impl}; 424 private alias base this; 425 426 private @property AbstractTask* basePtr() 427 { 428 return &base; 429 } 430 431 private static void impl(void* myTask) 432 { 433 import std.algorithm.internal : addressOf; 434 435 Task* myCastedTask = cast(typeof(this)*) myTask; 436 static if (is(ReturnType == void)) 437 { 438 fun(myCastedTask._args); 439 } 440 else static if (is(typeof(&(fun(myCastedTask._args))))) 441 { 442 myCastedTask.returnVal = addressOf(fun(myCastedTask._args)); 443 } 444 else 445 { 446 myCastedTask.returnVal = fun(myCastedTask._args); 447 } 448 } 449 450 private TaskPool pool; 451 private bool isScoped; // True if created with scopedTask. 452 453 Args _args; 454 455 /** 456 The arguments the function was called with. Changes to `out` and 457 `ref` arguments will be visible here. 458 */ 459 static if (__traits(isSame, fun, run)) 460 { 461 alias args = _args[1..$]; 462 } 463 else 464 { 465 alias args = _args; 466 } 467 468 469 // The purpose of this code is to decide whether functions whose 470 // return values have unshared aliasing can be executed via 471 // TaskPool from @safe code. See isSafeReturn. 472 static if (__traits(isSame, fun, run)) 473 { 474 static if (isFunctionPointer!(_args[0])) 475 { 476 private enum bool isPure = 477 (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0; 478 } 479 else 480 { 481 // BUG: Should check this for delegates too, but std.traits 482 // apparently doesn't allow this. isPure is irrelevant 483 // for delegates, at least for now since shared delegates 484 // don't work. 485 private enum bool isPure = false; 486 } 487 488 } 489 else 490 { 491 // We already know that we can't execute aliases in @safe code, so 492 // just put a dummy value here. 493 private enum bool isPure = false; 494 } 495 496 497 /** 498 The return type of the function called by this `Task`. This can be 499 `void`. 500 */ 501 alias ReturnType = typeof(fun(_args)); 502 503 static if (!is(ReturnType == void)) 504 { 505 static if (is(typeof(&fun(_args)))) 506 { 507 // Ref return. 508 ReturnType* returnVal; 509 510 ref ReturnType fixRef(ReturnType* val) 511 { 512 return *val; 513 } 514 515 } 516 else 517 { 518 ReturnType returnVal; 519 520 ref ReturnType fixRef(ref ReturnType val) 521 { 522 return val; 523 } 524 } 525 } 526 527 private void enforcePool() 528 { 529 import std.exception : enforce; 530 enforce(this.pool !is null, "Job not submitted yet."); 531 } 532 533 static if (Args.length > 0) 534 { 535 private this(Args args) 536 { 537 _args = args; 538 } 539 } 540 541 // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588, 542 // allow immutable elements. 543 static if (allSatisfy!(isAssignable, Args)) 544 { 545 typeof(this) opAssign(typeof(this) rhs) 546 { 547 foreach (i, Type; typeof(this.tupleof)) 548 { 549 this.tupleof[i] = rhs.tupleof[i]; 550 } 551 return this; 552 } 553 } 554 else 555 { 556 @disable typeof(this) opAssign(typeof(this) rhs); 557 } 558 559 /** 560 If the `Task` isn't started yet, execute it in the current thread. 561 If it's done, return its return value, if any. If it's in progress, 562 busy spin until it's done, then return the return value. If it threw 563 an exception, rethrow that exception. 564 565 This function should be used when you expect the result of the 566 `Task` to be available on a timescale shorter than that of an OS 567 context switch. 568 */ 569 @property ref ReturnType spinForce() @trusted 570 { 571 enforcePool(); 572 573 this.pool.tryDeleteExecute(basePtr); 574 575 while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {} 576 577 if (exception) 578 { 579 throw exception; 580 } 581 582 static if (!is(ReturnType == void)) 583 { 584 return fixRef(this.returnVal); 585 } 586 } 587 588 /** 589 If the `Task` isn't started yet, execute it in the current thread. 590 If it's done, return its return value, if any. If it's in progress, 591 wait on a condition variable. If it threw an exception, rethrow that 592 exception. 593 594 This function should be used for expensive functions, as waiting on a 595 condition variable introduces latency, but avoids wasted CPU cycles. 596 */ 597 @property ref ReturnType yieldForce() @trusted 598 { 599 enforcePool(); 600 this.pool.tryDeleteExecute(basePtr); 601 602 if (done) 603 { 604 static if (is(ReturnType == void)) 605 { 606 return; 607 } 608 else 609 { 610 return fixRef(this.returnVal); 611 } 612 } 613 614 pool.waiterLock(); 615 scope(exit) pool.waiterUnlock(); 616 617 while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) 618 { 619 pool.waitUntilCompletion(); 620 } 621 622 if (exception) 623 { 624 throw exception; // nocoverage 625 } 626 627 static if (!is(ReturnType == void)) 628 { 629 return fixRef(this.returnVal); 630 } 631 } 632 633 /** 634 If this `Task` was not started yet, execute it in the current 635 thread. If it is finished, return its result. If it is in progress, 636 execute any other `Task` from the `TaskPool` instance that 637 this `Task` was submitted to until this one 638 is finished. If it threw an exception, rethrow that exception. 639 If no other tasks are available or this `Task` was executed using 640 `executeInNewThread`, wait on a condition variable. 641 */ 642 @property ref ReturnType workForce() @trusted 643 { 644 enforcePool(); 645 this.pool.tryDeleteExecute(basePtr); 646 647 while (true) 648 { 649 if (done) // done() implicitly checks for exceptions. 650 { 651 static if (is(ReturnType == void)) 652 { 653 return; 654 } 655 else 656 { 657 return fixRef(this.returnVal); 658 } 659 } 660 661 AbstractTask* job; 662 { 663 // Locking explicitly and calling popNoSync() because 664 // pop() waits on a condition variable if there are no Tasks 665 // in the queue. 666 667 pool.queueLock(); 668 scope(exit) pool.queueUnlock(); 669 job = pool.popNoSync(); 670 } 671 672 673 if (job !is null) 674 { 675 676 version (verboseUnittest) 677 { 678 stderr.writeln("Doing workForce work."); 679 } 680 681 pool.doJob(job); 682 683 if (done) 684 { 685 static if (is(ReturnType == void)) 686 { 687 return; 688 } 689 else 690 { 691 return fixRef(this.returnVal); 692 } 693 } 694 } 695 else 696 { 697 version (verboseUnittest) 698 { 699 stderr.writeln("Yield from workForce."); 700 } 701 702 return yieldForce; 703 } 704 } 705 } 706 707 /** 708 Returns `true` if the `Task` is finished executing. 709 710 Throws: Rethrows any exception thrown during the execution of the 711 `Task`. 712 */ 713 @property bool done() @trusted 714 { 715 // Explicitly forwarded for documentation purposes. 716 return base.done; 717 } 718 719 /** 720 Create a new thread for executing this `Task`, execute it in the 721 newly created thread, then terminate the thread. This can be used for 722 future/promise parallelism. An explicit priority may be given 723 to the `Task`. If one is provided, its value is forwarded to 724 `core.thread.Thread.priority`. See $(REF task, std,parallelism) for 725 usage example. 726 */ 727 void executeInNewThread() @trusted 728 { 729 pool = new TaskPool(basePtr); 730 } 731 732 /// Ditto 733 void executeInNewThread(int priority) @trusted 734 { 735 pool = new TaskPool(basePtr, priority); 736 } 737 738 @safe ~this() 739 { 740 if (isScoped && pool !is null && taskStatus != TaskStatus.done) 741 { 742 yieldForce; 743 } 744 } 745 746 // When this is uncommented, it somehow gets called on returning from 747 // scopedTask even though the struct shouldn't be getting copied. 748 //@disable this(this) {} 749 } 750 751 // Calls `fpOrDelegate` with `args`. This is an 752 // adapter that makes `Task` work with delegates, function pointers and 753 // functors instead of just aliases. 754 ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args) 755 { 756 return fpOrDelegate(args); 757 } 758 759 /** 760 Creates a `Task` on the GC heap that calls an alias. This may be executed 761 via `Task.executeInNewThread` or by submitting to a 762 $(REF TaskPool, std,parallelism). A globally accessible instance of 763 `TaskPool` is provided by $(REF taskPool, std,parallelism). 764 765 Returns: A pointer to the `Task`. 766 767 Example: 768 --- 769 // Read two files into memory at the same time. 770 import std.file; 771 772 void main() 773 { 774 // Create and execute a Task for reading 775 // foo.txt. 776 auto file1Task = task!read("foo.txt"); 777 file1Task.executeInNewThread(); 778 779 // Read bar.txt in parallel. 780 auto file2Data = read("bar.txt"); 781 782 // Get the results of reading foo.txt. 783 auto file1Data = file1Task.yieldForce; 784 } 785 --- 786 787 --- 788 // Sorts an array using a parallel quick sort algorithm. 789 // The first partition is done serially. Both recursion 790 // branches are then executed in parallel. 791 // 792 // Timings for sorting an array of 1,000,000 doubles on 793 // an Athlon 64 X2 dual core machine: 794 // 795 // This implementation: 176 milliseconds. 796 // Equivalent serial implementation: 280 milliseconds 797 void parallelSort(T)(T[] data) 798 { 799 // Sort small subarrays serially. 800 if (data.length < 100) 801 { 802 std.algorithm.sort(data); 803 return; 804 } 805 806 // Partition the array. 807 swap(data[$ / 2], data[$ - 1]); 808 auto pivot = data[$ - 1]; 809 bool lessThanPivot(T elem) { return elem < pivot; } 810 811 auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]); 812 swap(data[$ - greaterEqual.length - 1], data[$ - 1]); 813 814 auto less = data[0..$ - greaterEqual.length - 1]; 815 greaterEqual = data[$ - greaterEqual.length..$]; 816 817 // Execute both recursion branches in parallel. 818 auto recurseTask = task!parallelSort(greaterEqual); 819 taskPool.put(recurseTask); 820 parallelSort(less); 821 recurseTask.yieldForce; 822 } 823 --- 824 */ 825 auto task(alias fun, Args...)(Args args) 826 { 827 return new Task!(fun, Args)(args); 828 } 829 830 /** 831 Creates a `Task` on the GC heap that calls a function pointer, delegate, or 832 class/struct with overloaded opCall. 833 834 Example: 835 --- 836 // Read two files in at the same time again, 837 // but this time use a function pointer instead 838 // of an alias to represent std.file.read. 839 import std.file; 840 841 void main() 842 { 843 // Create and execute a Task for reading 844 // foo.txt. 845 auto file1Task = task(&read!string, "foo.txt", size_t.max); 846 file1Task.executeInNewThread(); 847 848 // Read bar.txt in parallel. 849 auto file2Data = read("bar.txt"); 850 851 // Get the results of reading foo.txt. 852 auto file1Data = file1Task.yieldForce; 853 } 854 --- 855 856 Notes: This function takes a non-scope delegate, meaning it can be 857 used with closures. If you can't allocate a closure due to objects 858 on the stack that have scoped destruction, see `scopedTask`, which 859 takes a scope delegate. 860 */ 861 auto task(F, Args...)(F delegateOrFp, Args args) 862 if (is(typeof(delegateOrFp(args))) && !isSafeTask!F) 863 { 864 return new Task!(run, F, Args)(delegateOrFp, args); 865 } 866 867 /** 868 Version of `task` usable from `@safe` code. Usage mechanics are 869 identical to the non-@safe case, but safety introduces some restrictions: 870 871 1. `fun` must be @safe or @trusted. 872 873 2. `F` must not have any unshared aliasing as defined by 874 $(REF hasUnsharedAliasing, std,traits). This means it 875 may not be an unshared delegate or a non-shared class or struct 876 with overloaded `opCall`. This also precludes accepting template 877 alias parameters. 878 879 3. `Args` must not have unshared aliasing. 880 881 4. `fun` must not return by reference. 882 883 5. The return type must not have unshared aliasing unless `fun` is 884 `pure` or the `Task` is executed via `executeInNewThread` instead 885 of using a `TaskPool`. 886 887 */ 888 @trusted auto task(F, Args...)(F fun, Args args) 889 if (is(typeof(fun(args))) && isSafeTask!F) 890 { 891 return new Task!(run, F, Args)(fun, args); 892 } 893 894 /** 895 These functions allow the creation of `Task` objects on the stack rather 896 than the GC heap. The lifetime of a `Task` created by `scopedTask` 897 cannot exceed the lifetime of the scope it was created in. 898 899 `scopedTask` might be preferred over `task`: 900 901 1. When a `Task` that calls a delegate is being created and a closure 902 cannot be allocated due to objects on the stack that have scoped 903 destruction. The delegate overload of `scopedTask` takes a `scope` 904 delegate. 905 906 2. As a micro-optimization, to avoid the heap allocation associated with 907 `task` or with the creation of a closure. 908 909 Usage is otherwise identical to `task`. 910 911 Notes: `Task` objects created using `scopedTask` will automatically 912 call `Task.yieldForce` in their destructor if necessary to ensure 913 the `Task` is complete before the stack frame they reside on is destroyed. 914 */ 915 auto scopedTask(alias fun, Args...)(Args args) 916 { 917 auto ret = Task!(fun, Args)(args); 918 ret.isScoped = true; 919 return ret; 920 } 921 922 /// Ditto 923 auto scopedTask(F, Args...)(scope F delegateOrFp, Args args) 924 if (is(typeof(delegateOrFp(args))) && !isSafeTask!F) 925 { 926 auto ret = Task!(run, F, Args)(delegateOrFp, args); 927 ret.isScoped = true; 928 return ret; 929 } 930 931 /// Ditto 932 @trusted auto scopedTask(F, Args...)(F fun, Args args) 933 if (is(typeof(fun(args))) && isSafeTask!F) 934 { 935 auto ret = Task!(run, F, Args)(fun, args); 936 ret.isScoped = true; 937 return ret; 938 } 939 940 /** 941 The total number of CPU cores available on the current machine, as reported by 942 the operating system. 943 */ 944 alias totalCPUs = 945 __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl); 946 947 uint totalCPUsImpl() @nogc nothrow @trusted 948 { 949 version (Windows) 950 { 951 // BUGS: Only works on Windows 2000 and above. 952 import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo; 953 import std.algorithm.comparison : max; 954 SYSTEM_INFO si; 955 GetSystemInfo(&si); 956 return max(1, cast(uint) si.dwNumberOfProcessors); 957 } 958 else version (linux) 959 { 960 import core.stdc.stdlib : calloc; 961 import core.stdc.string : memset; 962 import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity; 963 import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; 964 965 int count = 0; 966 967 /** 968 * According to ruby's source code, CPU_ALLOC() doesn't work as expected. 969 * see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242 970 * 971 * The hardcode number also comes from ruby's source code. 972 * see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4 973 */ 974 for (int n = 64; n <= 16384; n *= 2) 975 { 976 size_t size = CPU_ALLOC_SIZE(count); 977 if (size >= 0x400) 978 { 979 auto cpuset = cast(cpu_set_t*) calloc(1, size); 980 if (cpuset is null) break; 981 if (sched_getaffinity(0, size, cpuset) == 0) 982 { 983 count = CPU_COUNT_S(size, cpuset); 984 } 985 CPU_FREE(cpuset); 986 } 987 else 988 { 989 cpu_set_t cpuset; 990 if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0) 991 { 992 count = CPU_COUNT(&cpuset); 993 } 994 } 995 996 if (count > 0) 997 return cast(uint) count; 998 } 999 1000 return cast(uint) sysconf(_SC_NPROCESSORS_ONLN); 1001 } 1002 else version (Darwin) 1003 { 1004 import core.sys.darwin.sys.sysctl : sysctlbyname; 1005 uint result; 1006 size_t len = result.sizeof; 1007 sysctlbyname("hw.physicalcpu", &result, &len, null, 0); 1008 return result; 1009 } 1010 else version (DragonFlyBSD) 1011 { 1012 import core.sys.dragonflybsd.sys.sysctl : sysctlbyname; 1013 uint result; 1014 size_t len = result.sizeof; 1015 sysctlbyname("hw.ncpu", &result, &len, null, 0); 1016 return result; 1017 } 1018 else version (FreeBSD) 1019 { 1020 import core.sys.freebsd.sys.sysctl : sysctlbyname; 1021 uint result; 1022 size_t len = result.sizeof; 1023 sysctlbyname("hw.ncpu", &result, &len, null, 0); 1024 return result; 1025 } 1026 else version (NetBSD) 1027 { 1028 import core.sys.netbsd.sys.sysctl : sysctlbyname; 1029 uint result; 1030 size_t len = result.sizeof; 1031 sysctlbyname("hw.ncpu", &result, &len, null, 0); 1032 return result; 1033 } 1034 else version (Solaris) 1035 { 1036 import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; 1037 return cast(uint) sysconf(_SC_NPROCESSORS_ONLN); 1038 } 1039 else version (OpenBSD) 1040 { 1041 import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; 1042 return cast(uint) sysconf(_SC_NPROCESSORS_ONLN); 1043 } 1044 else version (Hurd) 1045 { 1046 import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; 1047 return cast(uint) sysconf(_SC_NPROCESSORS_ONLN); 1048 } 1049 else 1050 { 1051 static assert(0, "Don't know how to get N CPUs on this OS."); 1052 } 1053 } 1054 1055 /* 1056 This class serves two purposes: 1057 1058 1. It distinguishes std.parallelism threads from other threads so that 1059 the std.parallelism daemon threads can be terminated. 1060 1061 2. It adds a reference to the pool that the thread is a member of, 1062 which is also necessary to allow the daemon threads to be properly 1063 terminated. 1064 */ 1065 private final class ParallelismThread : Thread 1066 { 1067 this(void delegate() dg) 1068 { 1069 super(dg); 1070 } 1071 1072 TaskPool pool; 1073 } 1074 1075 // Kill daemon threads. 1076 shared static ~this() 1077 { 1078 foreach (ref thread; Thread) 1079 { 1080 auto pthread = cast(ParallelismThread) thread; 1081 if (pthread is null) continue; 1082 auto pool = pthread.pool; 1083 if (!pool.isDaemon) continue; 1084 pool.stop(); 1085 pthread.join(); 1086 } 1087 } 1088 1089 /** 1090 This class encapsulates a task queue and a set of worker threads. Its purpose 1091 is to efficiently map a large number of `Task`s onto a smaller number of 1092 threads. A task queue is a FIFO queue of `Task` objects that have been 1093 submitted to the `TaskPool` and are awaiting execution. A worker thread is a 1094 thread that executes the `Task` at the front of the queue when one is 1095 available and sleeps when the queue is empty. 1096 1097 This class should usually be used via the global instantiation 1098 available via the $(REF taskPool, std,parallelism) property. 1099 Occasionally it is useful to explicitly instantiate a `TaskPool`: 1100 1101 1. When you want `TaskPool` instances with multiple priorities, for example 1102 a low priority pool and a high priority pool. 1103 1104 2. When the threads in the global task pool are waiting on a synchronization 1105 primitive (for example a mutex), and you want to parallelize the code that 1106 needs to run before these threads can be resumed. 1107 1108 Note: The worker threads in this pool will not stop until 1109 `stop` or `finish` is called, even if the main thread 1110 has finished already. This may lead to programs that 1111 never end. If you do not want this behaviour, you can set `isDaemon` 1112 to true. 1113 */ 1114 final class TaskPool 1115 { 1116 private: 1117 1118 // A pool can either be a regular pool or a single-task pool. A 1119 // single-task pool is a dummy pool that's fired up for 1120 // Task.executeInNewThread(). 1121 bool isSingleTask; 1122 1123 ParallelismThread[] pool; 1124 Thread singleTaskThread; 1125 1126 AbstractTask* head; 1127 AbstractTask* tail; 1128 PoolState status = PoolState.running; 1129 Condition workerCondition; 1130 Condition waiterCondition; 1131 Mutex queueMutex; 1132 Mutex waiterMutex; // For waiterCondition 1133 1134 // The instanceStartIndex of the next instance that will be created. 1135 __gshared size_t nextInstanceIndex = 1; 1136 1137 // The index of the current thread. 1138 static size_t threadIndex; 1139 1140 // The index of the first thread in this instance. 1141 immutable size_t instanceStartIndex; 1142 1143 // The index that the next thread to be initialized in this pool will have. 1144 size_t nextThreadIndex; 1145 1146 enum PoolState : ubyte 1147 { 1148 running, 1149 finishing, 1150 stopNow 1151 } 1152 1153 void doJob(AbstractTask* job) 1154 { 1155 assert(job.taskStatus == TaskStatus.inProgress); 1156 assert(job.next is null); 1157 assert(job.prev is null); 1158 1159 try 1160 { 1161 job.job(); 1162 } 1163 catch (Throwable e) 1164 { 1165 job.exception = e; 1166 } 1167 1168 atomicSetUbyte(job.taskStatus, TaskStatus.done); 1169 1170 if (!isSingleTask) 1171 { 1172 waiterLock(); 1173 scope(exit) waiterUnlock(); 1174 notifyWaiters(); 1175 } 1176 } 1177 1178 // This function is used for dummy pools created by Task.executeInNewThread(). 1179 void doSingleTask() 1180 { 1181 // No synchronization. Pool is guaranteed to only have one thread, 1182 // and the queue is submitted to before this thread is created. 1183 assert(head); 1184 auto t = head; 1185 t.next = t.prev = head = null; 1186 doJob(t); 1187 } 1188 1189 // This function performs initialization for each thread that affects 1190 // thread local storage and therefore must be done from within the 1191 // worker thread. It then calls executeWorkLoop(). 1192 void startWorkLoop() 1193 { 1194 // Initialize thread index. 1195 { 1196 queueLock(); 1197 scope(exit) queueUnlock(); 1198 threadIndex = nextThreadIndex; 1199 nextThreadIndex++; 1200 } 1201 1202 executeWorkLoop(); 1203 } 1204 1205 // This is the main work loop that worker threads spend their time in 1206 // until they terminate. It's also entered by non-worker threads when 1207 // finish() is called with the blocking variable set to true. 1208 void executeWorkLoop() 1209 { 1210 while (atomicReadUbyte(status) != PoolState.stopNow) 1211 { 1212 AbstractTask* task = pop(); 1213 if (task is null) 1214 { 1215 if (atomicReadUbyte(status) == PoolState.finishing) 1216 { 1217 atomicSetUbyte(status, PoolState.stopNow); 1218 return; 1219 } 1220 } 1221 else 1222 { 1223 doJob(task); 1224 } 1225 } 1226 } 1227 1228 // Pop a task off the queue. 1229 AbstractTask* pop() 1230 { 1231 queueLock(); 1232 scope(exit) queueUnlock(); 1233 auto ret = popNoSync(); 1234 while (ret is null && status == PoolState.running) 1235 { 1236 wait(); 1237 ret = popNoSync(); 1238 } 1239 return ret; 1240 } 1241 1242 AbstractTask* popNoSync() 1243 out(returned) 1244 { 1245 /* If task.prev and task.next aren't null, then another thread 1246 * can try to delete this task from the pool after it's 1247 * alreadly been deleted/popped. 1248 */ 1249 if (returned !is null) 1250 { 1251 assert(returned.next is null); 1252 assert(returned.prev is null); 1253 } 1254 } 1255 do 1256 { 1257 if (isSingleTask) return null; 1258 1259 AbstractTask* returned = head; 1260 if (head !is null) 1261 { 1262 head = head.next; 1263 returned.prev = null; 1264 returned.next = null; 1265 returned.taskStatus = TaskStatus.inProgress; 1266 } 1267 if (head !is null) 1268 { 1269 head.prev = null; 1270 } 1271 1272 return returned; 1273 } 1274 1275 // Push a task onto the queue. 1276 void abstractPut(AbstractTask* task) 1277 { 1278 queueLock(); 1279 scope(exit) queueUnlock(); 1280 abstractPutNoSync(task); 1281 } 1282 1283 void abstractPutNoSync(AbstractTask* task) 1284 in 1285 { 1286 assert(task); 1287 } 1288 out 1289 { 1290 import std.conv : text; 1291 1292 assert(tail.prev !is tail); 1293 assert(tail.next is null, text(tail.prev, '\t', tail.next)); 1294 if (tail.prev !is null) 1295 { 1296 assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next)); 1297 } 1298 } 1299 do 1300 { 1301 // Not using enforce() to save on function call overhead since this 1302 // is a performance critical function. 1303 if (status != PoolState.running) 1304 { 1305 throw new Error( 1306 "Cannot submit a new task to a pool after calling " ~ 1307 "finish() or stop()." 1308 ); 1309 } 1310 1311 task.next = null; 1312 if (head is null) //Queue is empty. 1313 { 1314 head = task; 1315 tail = task; 1316 tail.prev = null; 1317 } 1318 else 1319 { 1320 assert(tail); 1321 task.prev = tail; 1322 tail.next = task; 1323 tail = task; 1324 } 1325 notify(); 1326 } 1327 1328 void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t) 1329 { 1330 if (status != PoolState.running) 1331 { 1332 throw new Error( 1333 "Cannot submit a new task to a pool after calling " ~ 1334 "finish() or stop()." 1335 ); 1336 } 1337 1338 if (head is null) 1339 { 1340 head = h; 1341 tail = t; 1342 } 1343 else 1344 { 1345 h.prev = tail; 1346 tail.next = h; 1347 tail = t; 1348 } 1349 1350 notifyAll(); 1351 } 1352 1353 void tryDeleteExecute(AbstractTask* toExecute) 1354 { 1355 if (isSingleTask) return; 1356 1357 if ( !deleteItem(toExecute) ) 1358 { 1359 return; 1360 } 1361 1362 try 1363 { 1364 toExecute.job(); 1365 } 1366 catch (Exception e) 1367 { 1368 toExecute.exception = e; 1369 } 1370 1371 atomicSetUbyte(toExecute.taskStatus, TaskStatus.done); 1372 } 1373 1374 bool deleteItem(AbstractTask* item) 1375 { 1376 queueLock(); 1377 scope(exit) queueUnlock(); 1378 return deleteItemNoSync(item); 1379 } 1380 1381 bool deleteItemNoSync(AbstractTask* item) 1382 { 1383 if (item.taskStatus != TaskStatus.notStarted) 1384 { 1385 return false; 1386 } 1387 item.taskStatus = TaskStatus.inProgress; 1388 1389 if (item is head) 1390 { 1391 // Make sure head gets set properly. 1392 popNoSync(); 1393 return true; 1394 } 1395 if (item is tail) 1396 { 1397 tail = tail.prev; 1398 if (tail !is null) 1399 { 1400 tail.next = null; 1401 } 1402 item.next = null; 1403 item.prev = null; 1404 return true; 1405 } 1406 if (item.next !is null) 1407 { 1408 assert(item.next.prev is item); // Check queue consistency. 1409 item.next.prev = item.prev; 1410 } 1411 if (item.prev !is null) 1412 { 1413 assert(item.prev.next is item); // Check queue consistency. 1414 item.prev.next = item.next; 1415 } 1416 item.next = null; 1417 item.prev = null; 1418 return true; 1419 } 1420 1421 void queueLock() 1422 { 1423 assert(queueMutex); 1424 if (!isSingleTask) queueMutex.lock(); 1425 } 1426 1427 void queueUnlock() 1428 { 1429 assert(queueMutex); 1430 if (!isSingleTask) queueMutex.unlock(); 1431 } 1432 1433 void waiterLock() 1434 { 1435 if (!isSingleTask) waiterMutex.lock(); 1436 } 1437 1438 void waiterUnlock() 1439 { 1440 if (!isSingleTask) waiterMutex.unlock(); 1441 } 1442 1443 void wait() 1444 { 1445 if (!isSingleTask) workerCondition.wait(); 1446 } 1447 1448 void notify() 1449 { 1450 if (!isSingleTask) workerCondition.notify(); 1451 } 1452 1453 void notifyAll() 1454 { 1455 if (!isSingleTask) workerCondition.notifyAll(); 1456 } 1457 1458 void waitUntilCompletion() 1459 { 1460 if (isSingleTask) 1461 { 1462 singleTaskThread.join(); 1463 } 1464 else 1465 { 1466 waiterCondition.wait(); 1467 } 1468 } 1469 1470 void notifyWaiters() 1471 { 1472 if (!isSingleTask) waiterCondition.notifyAll(); 1473 } 1474 1475 // Private constructor for creating dummy pools that only have one thread, 1476 // only execute one Task, and then terminate. This is used for 1477 // Task.executeInNewThread(). 1478 this(AbstractTask* task, int priority = int.max) 1479 { 1480 assert(task); 1481 1482 // Dummy value, not used. 1483 instanceStartIndex = 0; 1484 1485 this.isSingleTask = true; 1486 task.taskStatus = TaskStatus.inProgress; 1487 this.head = task; 1488 singleTaskThread = new Thread(&doSingleTask); 1489 singleTaskThread.start(); 1490 1491 // Disabled until writing code to support 1492 // running thread with specified priority 1493 // See https://issues.dlang.org/show_bug.cgi?id=8960 1494 1495 /*if (priority != int.max) 1496 { 1497 singleTaskThread.priority = priority; 1498 }*/ 1499 } 1500 1501 public: 1502 // This is used in parallel_algorithm but is too unstable to document 1503 // as public API. 1504 size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow 1505 { 1506 import std.algorithm.comparison : max; 1507 1508 if (this.size == 0) 1509 { 1510 return max(rangeLen, 1); 1511 } 1512 1513 immutable size_t eightSize = 4 * (this.size + 1); 1514 auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1); 1515 return max(ret, 1); 1516 } 1517 1518 /** 1519 Default constructor that initializes a `TaskPool` with 1520 `totalCPUs` - 1 worker threads. The minus 1 is included because the 1521 main thread will also be available to do work. 1522 1523 Note: On single-core machines, the primitives provided by `TaskPool` 1524 operate transparently in single-threaded mode. 1525 */ 1526 this() @trusted 1527 { 1528 this(totalCPUs - 1); 1529 } 1530 1531 /** 1532 Allows for custom number of worker threads. 1533 */ 1534 this(size_t nWorkers) @trusted 1535 { 1536 synchronized(typeid(TaskPool)) 1537 { 1538 instanceStartIndex = nextInstanceIndex; 1539 1540 // The first worker thread to be initialized will have this index, 1541 // and will increment it. The second worker to be initialized will 1542 // have this index plus 1. 1543 nextThreadIndex = instanceStartIndex; 1544 nextInstanceIndex += nWorkers; 1545 } 1546 1547 queueMutex = new Mutex(this); 1548 waiterMutex = new Mutex(); 1549 workerCondition = new Condition(queueMutex); 1550 waiterCondition = new Condition(waiterMutex); 1551 1552 pool = new ParallelismThread[nWorkers]; 1553 foreach (ref poolThread; pool) 1554 { 1555 poolThread = new ParallelismThread(&startWorkLoop); 1556 poolThread.pool = this; 1557 poolThread.start(); 1558 } 1559 } 1560 1561 /** 1562 Implements a parallel foreach loop over a range. This works by implicitly 1563 creating and submitting one `Task` to the `TaskPool` for each worker 1564 thread. A work unit is a set of consecutive elements of `range` to 1565 be processed by a worker thread between communication with any other 1566 thread. The number of elements processed per work unit is controlled by the 1567 `workUnitSize` parameter. Smaller work units provide better load 1568 balancing, but larger work units avoid the overhead of communicating 1569 with other threads frequently to fetch the next work unit. Large work 1570 units also avoid false sharing in cases where the range is being modified. 1571 The less time a single iteration of the loop takes, the larger 1572 `workUnitSize` should be. For very expensive loop bodies, 1573 `workUnitSize` should be 1. An overload that chooses a default work 1574 unit size is also available. 1575 1576 Example: 1577 --- 1578 // Find the logarithm of every number from 1 to 1579 // 10_000_000 in parallel. 1580 auto logs = new double[10_000_000]; 1581 1582 // Parallel foreach works with or without an index 1583 // variable. It can iterate by ref if range.front 1584 // returns by ref. 1585 1586 // Iterate over logs using work units of size 100. 1587 foreach (i, ref elem; taskPool.parallel(logs, 100)) 1588 { 1589 elem = log(i + 1.0); 1590 } 1591 1592 // Same thing, but use the default work unit size. 1593 // 1594 // Timings on an Athlon 64 X2 dual core machine: 1595 // 1596 // Parallel foreach: 388 milliseconds 1597 // Regular foreach: 619 milliseconds 1598 foreach (i, ref elem; taskPool.parallel(logs)) 1599 { 1600 elem = log(i + 1.0); 1601 } 1602 --- 1603 1604 Notes: 1605 1606 The memory usage of this implementation is guaranteed to be constant 1607 in `range.length`. 1608 1609 Breaking from a parallel foreach loop via a break, labeled break, 1610 labeled continue, return or goto statement throws a 1611 `ParallelForeachError`. 1612 1613 In the case of non-random access ranges, parallel foreach buffers lazily 1614 to an array of size `workUnitSize` before executing the parallel portion 1615 of the loop. The exception is that, if a parallel foreach is executed 1616 over a range returned by `asyncBuf` or `map`, the copying is elided 1617 and the buffers are simply swapped. In this case `workUnitSize` is 1618 ignored and the work unit size is set to the buffer size of `range`. 1619 1620 A memory barrier is guaranteed to be executed on exit from the loop, 1621 so that results produced by all threads are visible in the calling thread. 1622 1623 $(B Exception Handling): 1624 1625 When at least one exception is thrown from inside a parallel foreach loop, 1626 the submission of additional `Task` objects is terminated as soon as 1627 possible, in a non-deterministic manner. All executing or 1628 enqueued work units are allowed to complete. Then, all exceptions that 1629 were thrown by any work unit are chained using `Throwable.next` and 1630 rethrown. The order of the exception chaining is non-deterministic. 1631 */ 1632 ParallelForeach!R parallel(R)(R range, size_t workUnitSize) 1633 { 1634 import std.exception : enforce; 1635 enforce(workUnitSize > 0, "workUnitSize must be > 0."); 1636 alias RetType = ParallelForeach!R; 1637 return RetType(this, range, workUnitSize); 1638 } 1639 1640 1641 /// Ditto 1642 ParallelForeach!R parallel(R)(R range) 1643 { 1644 static if (hasLength!R) 1645 { 1646 // Default work unit size is such that we would use 4x as many 1647 // slots as are in this thread pool. 1648 size_t workUnitSize = defaultWorkUnitSize(range.length); 1649 return parallel(range, workUnitSize); 1650 } 1651 else 1652 { 1653 // Just use a really, really dumb guess if the user is too lazy to 1654 // specify. 1655 return parallel(range, 512); 1656 } 1657 } 1658 1659 /// 1660 template amap(functions...) 1661 { 1662 /** 1663 Eager parallel map. The eagerness of this function means it has less 1664 overhead than the lazily evaluated `TaskPool.map` and should be 1665 preferred where the memory requirements of eagerness are acceptable. 1666 `functions` are the functions to be evaluated, passed as template 1667 alias parameters in a style similar to 1668 $(REF map, std,algorithm,iteration). 1669 The first argument must be a random access range. For performance 1670 reasons, amap will assume the range elements have not yet been 1671 initialized. Elements will be overwritten without calling a destructor 1672 nor doing an assignment. As such, the range must not contain meaningful 1673 data$(DDOC_COMMENT not a section): either un-initialized objects, or 1674 objects in their `.init` state. 1675 1676 --- 1677 auto numbers = iota(100_000_000.0); 1678 1679 // Find the square roots of numbers. 1680 // 1681 // Timings on an Athlon 64 X2 dual core machine: 1682 // 1683 // Parallel eager map: 0.802 s 1684 // Equivalent serial implementation: 1.768 s 1685 auto squareRoots = taskPool.amap!sqrt(numbers); 1686 --- 1687 1688 Immediately after the range argument, an optional work unit size argument 1689 may be provided. Work units as used by `amap` are identical to those 1690 defined for parallel foreach. If no work unit size is provided, the 1691 default work unit size is used. 1692 1693 --- 1694 // Same thing, but make work unit size 100. 1695 auto squareRoots = taskPool.amap!sqrt(numbers, 100); 1696 --- 1697 1698 An output range for returning the results may be provided as the last 1699 argument. If one is not provided, an array of the proper type will be 1700 allocated on the garbage collected heap. If one is provided, it must be a 1701 random access range with assignable elements, must have reference 1702 semantics with respect to assignment to its elements, and must have the 1703 same length as the input range. Writing to adjacent elements from 1704 different threads must be safe. 1705 1706 --- 1707 // Same thing, but explicitly allocate an array 1708 // to return the results in. The element type 1709 // of the array may be either the exact type 1710 // returned by functions or an implicit conversion 1711 // target. 1712 auto squareRoots = new float[numbers.length]; 1713 taskPool.amap!sqrt(numbers, squareRoots); 1714 1715 // Multiple functions, explicit output range, and 1716 // explicit work unit size. 1717 auto results = new Tuple!(float, real)[numbers.length]; 1718 taskPool.amap!(sqrt, log)(numbers, 100, results); 1719 --- 1720 1721 Note: 1722 1723 A memory barrier is guaranteed to be executed after all results are written 1724 but before returning so that results produced by all threads are visible 1725 in the calling thread. 1726 1727 Tips: 1728 1729 To perform the mapping operation in place, provide the same range for the 1730 input and output range. 1731 1732 To parallelize the copying of a range with expensive to evaluate elements 1733 to an array, pass an identity function (a function that just returns 1734 whatever argument is provided to it) to `amap`. 1735 1736 $(B Exception Handling): 1737 1738 When at least one exception is thrown from inside the map functions, 1739 the submission of additional `Task` objects is terminated as soon as 1740 possible, in a non-deterministic manner. All currently executing or 1741 enqueued work units are allowed to complete. Then, all exceptions that 1742 were thrown from any work unit are chained using `Throwable.next` and 1743 rethrown. The order of the exception chaining is non-deterministic. 1744 */ 1745 auto amap(Args...)(Args args) 1746 if (isRandomAccessRange!(Args[0])) 1747 { 1748 import core.internal.lifetime : emplaceRef; 1749 1750 alias fun = adjoin!(staticMap!(unaryFun, functions)); 1751 1752 alias range = args[0]; 1753 immutable len = range.length; 1754 1755 static if ( 1756 Args.length > 1 && 1757 randAssignable!(Args[$ - 1]) && 1758 is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1])) 1759 ) 1760 { 1761 import std.conv : text; 1762 import std.exception : enforce; 1763 1764 alias buf = args[$ - 1]; 1765 alias args2 = args[0..$ - 1]; 1766 alias Args2 = Args[0..$ - 1]; 1767 enforce(buf.length == len, 1768 text("Can't use a user supplied buffer that's the wrong ", 1769 "size. (Expected :", len, " Got: ", buf.length)); 1770 } 1771 else static if (randAssignable!(Args[$ - 1]) && Args.length > 1) 1772 { 1773 static assert(0, "Wrong buffer type."); 1774 } 1775 else 1776 { 1777 import std.array : uninitializedArray; 1778 1779 auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len); 1780 alias args2 = args; 1781 alias Args2 = Args; 1782 } 1783 1784 if (!len) return buf; 1785 1786 static if (isIntegral!(Args2[$ - 1])) 1787 { 1788 static assert(args2.length == 2); 1789 auto workUnitSize = cast(size_t) args2[1]; 1790 } 1791 else 1792 { 1793 static assert(args2.length == 1, Args); 1794 auto workUnitSize = defaultWorkUnitSize(range.length); 1795 } 1796 1797 alias R = typeof(range); 1798 1799 if (workUnitSize > len) 1800 { 1801 workUnitSize = len; 1802 } 1803 1804 // Handle as a special case: 1805 if (size == 0) 1806 { 1807 size_t index = 0; 1808 foreach (elem; range) 1809 { 1810 emplaceRef(buf[index++], fun(elem)); 1811 } 1812 return buf; 1813 } 1814 1815 // Effectively -1: chunkIndex + 1 == 0: 1816 shared size_t workUnitIndex = size_t.max; 1817 shared bool shouldContinue = true; 1818 1819 void doIt() 1820 { 1821 import std.algorithm.comparison : min; 1822 1823 scope(failure) 1824 { 1825 // If an exception is thrown, all threads should bail. 1826 atomicStore(shouldContinue, false); 1827 } 1828 1829 while (atomicLoad(shouldContinue)) 1830 { 1831 immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1); 1832 immutable start = workUnitSize * myUnitIndex; 1833 if (start >= len) 1834 { 1835 atomicStore(shouldContinue, false); 1836 break; 1837 } 1838 1839 immutable end = min(len, start + workUnitSize); 1840 1841 static if (hasSlicing!R) 1842 { 1843 auto subrange = range[start .. end]; 1844 foreach (i; start .. end) 1845 { 1846 emplaceRef(buf[i], fun(subrange.front)); 1847 subrange.popFront(); 1848 } 1849 } 1850 else 1851 { 1852 foreach (i; start .. end) 1853 { 1854 emplaceRef(buf[i], fun(range[i])); 1855 } 1856 } 1857 } 1858 } 1859 1860 submitAndExecute(this, &doIt); 1861 return buf; 1862 } 1863 } 1864 1865 /// 1866 template map(functions...) 1867 { 1868 /** 1869 A semi-lazy parallel map that can be used for pipelining. The map 1870 functions are evaluated for the first `bufSize` elements and stored in a 1871 buffer and made available to `popFront`. Meanwhile, in the 1872 background a second buffer of the same size is filled. When the first 1873 buffer is exhausted, it is swapped with the second buffer and filled while 1874 the values from what was originally the second buffer are read. This 1875 implementation allows for elements to be written to the buffer without 1876 the need for atomic operations or synchronization for each write, and 1877 enables the mapping function to be evaluated efficiently in parallel. 1878 1879 `map` has more overhead than the simpler procedure used by `amap` 1880 but avoids the need to keep all results in memory simultaneously and works 1881 with non-random access ranges. 1882 1883 Params: 1884 1885 source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives) 1886 to be mapped. If `source` is not random 1887 access it will be lazily buffered to an array of size `bufSize` before 1888 the map function is evaluated. (For an exception to this rule, see Notes.) 1889 1890 bufSize = The size of the buffer to store the evaluated elements. 1891 1892 workUnitSize = The number of elements to evaluate in a single 1893 `Task`. Must be less than or equal to `bufSize`, and 1894 should be a fraction of `bufSize` such that all worker threads can be 1895 used. If the default of size_t.max is used, workUnitSize will be set to 1896 the pool-wide default. 1897 1898 Returns: An input range representing the results of the map. This range 1899 has a length iff `source` has a length. 1900 1901 Notes: 1902 1903 If a range returned by `map` or `asyncBuf` is used as an input to 1904 `map`, then as an optimization the copying from the output buffer 1905 of the first range to the input buffer of the second range is elided, even 1906 though the ranges returned by `map` and `asyncBuf` are non-random 1907 access ranges. This means that the `bufSize` parameter passed to the 1908 current call to `map` will be ignored and the size of the buffer 1909 will be the buffer size of `source`. 1910 1911 Example: 1912 --- 1913 // Pipeline reading a file, converting each line 1914 // to a number, taking the logarithms of the numbers, 1915 // and performing the additions necessary to find 1916 // the sum of the logarithms. 1917 1918 auto lineRange = File("numberList.txt").byLine(); 1919 auto dupedLines = std.algorithm.map!"a.idup"(lineRange); 1920 auto nums = taskPool.map!(to!double)(dupedLines); 1921 auto logs = taskPool.map!log10(nums); 1922 1923 double sum = 0; 1924 foreach (elem; logs) 1925 { 1926 sum += elem; 1927 } 1928 --- 1929 1930 $(B Exception Handling): 1931 1932 Any exceptions thrown while iterating over `source` 1933 or computing the map function are re-thrown on a call to `popFront` or, 1934 if thrown during construction, are simply allowed to propagate to the 1935 caller. In the case of exceptions thrown while computing the map function, 1936 the exceptions are chained as in `TaskPool.amap`. 1937 */ 1938 auto 1939 map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max) 1940 if (isInputRange!S) 1941 { 1942 import std.exception : enforce; 1943 1944 enforce(workUnitSize == size_t.max || workUnitSize <= bufSize, 1945 "Work unit size must be smaller than buffer size."); 1946 alias fun = adjoin!(staticMap!(unaryFun, functions)); 1947 1948 static final class Map 1949 { 1950 // This is a class because the task needs to be located on the 1951 // heap and in the non-random access case source needs to be on 1952 // the heap, too. 1953 1954 private: 1955 enum bufferTrick = is(typeof(source.buf1)) && 1956 is(typeof(source.bufPos)) && 1957 is(typeof(source.doBufSwap())); 1958 1959 alias E = MapType!(S, functions); 1960 E[] buf1, buf2; 1961 S source; 1962 TaskPool pool; 1963 Task!(run, E[] delegate(E[]), E[]) nextBufTask; 1964 size_t workUnitSize; 1965 size_t bufPos; 1966 bool lastTaskWaited; 1967 1968 static if (isRandomAccessRange!S) 1969 { 1970 alias FromType = S; 1971 1972 void popSource() 1973 { 1974 import std.algorithm.comparison : min; 1975 1976 static if (__traits(compiles, source[0 .. source.length])) 1977 { 1978 source = source[min(buf1.length, source.length)..source.length]; 1979 } 1980 else static if (__traits(compiles, source[0..$])) 1981 { 1982 source = source[min(buf1.length, source.length)..$]; 1983 } 1984 else 1985 { 1986 static assert(0, "S must have slicing for Map." 1987 ~ " " ~ S.stringof ~ " doesn't."); 1988 } 1989 } 1990 } 1991 else static if (bufferTrick) 1992 { 1993 // Make sure we don't have the buffer recycling overload of 1994 // asyncBuf. 1995 static if ( 1996 is(typeof(source.source)) && 1997 isRoundRobin!(typeof(source.source)) 1998 ) 1999 { 2000 static assert(0, "Cannot execute a parallel map on " ~ 2001 "the buffer recycling overload of asyncBuf." 2002 ); 2003 } 2004 2005 alias FromType = typeof(source.buf1); 2006 FromType from; 2007 2008 // Just swap our input buffer with source's output buffer. 2009 // No need to copy element by element. 2010 FromType dumpToFrom() 2011 { 2012 import std.algorithm.mutation : swap; 2013 2014 assert(source.buf1.length <= from.length); 2015 from.length = source.buf1.length; 2016 swap(source.buf1, from); 2017 2018 // Just in case this source has been popped before 2019 // being sent to map: 2020 from = from[source.bufPos..$]; 2021 2022 static if (is(typeof(source._length))) 2023 { 2024 source._length -= (from.length - source.bufPos); 2025 } 2026 2027 source.doBufSwap(); 2028 2029 return from; 2030 } 2031 } 2032 else 2033 { 2034 alias FromType = ElementType!S[]; 2035 2036 // The temporary array that data is copied to before being 2037 // mapped. 2038 FromType from; 2039 2040 FromType dumpToFrom() 2041 { 2042 assert(from !is null); 2043 2044 size_t i; 2045 for (; !source.empty && i < from.length; source.popFront()) 2046 { 2047 from[i++] = source.front; 2048 } 2049 2050 from = from[0 .. i]; 2051 return from; 2052 } 2053 } 2054 2055 static if (hasLength!S) 2056 { 2057 size_t _length; 2058 2059 public @property size_t length() const @safe pure nothrow 2060 { 2061 return _length; 2062 } 2063 } 2064 2065 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool) 2066 { 2067 static if (bufferTrick) 2068 { 2069 bufSize = source.buf1.length; 2070 } 2071 2072 buf1.length = bufSize; 2073 buf2.length = bufSize; 2074 2075 static if (!isRandomAccessRange!S) 2076 { 2077 from.length = bufSize; 2078 } 2079 2080 this.workUnitSize = (workUnitSize == size_t.max) ? 2081 pool.defaultWorkUnitSize(bufSize) : workUnitSize; 2082 this.source = source; 2083 this.pool = pool; 2084 2085 static if (hasLength!S) 2086 { 2087 _length = source.length; 2088 } 2089 2090 buf1 = fillBuf(buf1); 2091 submitBuf2(); 2092 } 2093 2094 // The from parameter is a dummy and ignored in the random access 2095 // case. 2096 E[] fillBuf(E[] buf) 2097 { 2098 import std.algorithm.comparison : min; 2099 2100 static if (isRandomAccessRange!S) 2101 { 2102 import std.range : take; 2103 auto toMap = take(source, buf.length); 2104 scope(success) popSource(); 2105 } 2106 else 2107 { 2108 auto toMap = dumpToFrom(); 2109 } 2110 2111 buf = buf[0 .. min(buf.length, toMap.length)]; 2112 2113 // Handle as a special case: 2114 if (pool.size == 0) 2115 { 2116 size_t index = 0; 2117 foreach (elem; toMap) 2118 { 2119 buf[index++] = fun(elem); 2120 } 2121 return buf; 2122 } 2123 2124 pool.amap!functions(toMap, workUnitSize, buf); 2125 2126 return buf; 2127 } 2128 2129 void submitBuf2() 2130 in 2131 { 2132 assert(nextBufTask.prev is null); 2133 assert(nextBufTask.next is null); 2134 } 2135 do 2136 { 2137 // Hack to reuse the task object. 2138 2139 nextBufTask = typeof(nextBufTask).init; 2140 nextBufTask._args[0] = &fillBuf; 2141 nextBufTask._args[1] = buf2; 2142 pool.put(nextBufTask); 2143 } 2144 2145 void doBufSwap() 2146 { 2147 if (lastTaskWaited) 2148 { 2149 // Then the source is empty. Signal it here. 2150 buf1 = null; 2151 buf2 = null; 2152 2153 static if (!isRandomAccessRange!S) 2154 { 2155 from = null; 2156 } 2157 2158 return; 2159 } 2160 2161 buf2 = buf1; 2162 buf1 = nextBufTask.yieldForce; 2163 bufPos = 0; 2164 2165 if (source.empty) 2166 { 2167 lastTaskWaited = true; 2168 } 2169 else 2170 { 2171 submitBuf2(); 2172 } 2173 } 2174 2175 public: 2176 @property auto front() 2177 { 2178 return buf1[bufPos]; 2179 } 2180 2181 void popFront() 2182 { 2183 static if (hasLength!S) 2184 { 2185 _length--; 2186 } 2187 2188 bufPos++; 2189 if (bufPos >= buf1.length) 2190 { 2191 doBufSwap(); 2192 } 2193 } 2194 2195 static if (isInfinite!S) 2196 { 2197 enum bool empty = false; 2198 } 2199 else 2200 { 2201 2202 bool empty() const @property 2203 { 2204 // popFront() sets this when source is empty 2205 return buf1.length == 0; 2206 } 2207 } 2208 } 2209 return new Map(source, bufSize, workUnitSize, this); 2210 } 2211 } 2212 2213 /** 2214 Given a `source` range that is expensive to iterate over, returns an 2215 $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that 2216 asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread, 2217 while making previously buffered elements from a second buffer, also of size 2218 `bufSize`, available via the range interface of the returned 2219 object. The returned range has a length iff `hasLength!S`. 2220 `asyncBuf` is useful, for example, when performing expensive operations 2221 on the elements of ranges that represent data on a disk or network. 2222 2223 Example: 2224 --- 2225 import std.conv, std.stdio; 2226 2227 void main() 2228 { 2229 // Fetch lines of a file in a background thread 2230 // while processing previously fetched lines, 2231 // dealing with byLine's buffer recycling by 2232 // eagerly duplicating every line. 2233 auto lines = File("foo.txt").byLine(); 2234 auto duped = std.algorithm.map!"a.idup"(lines); 2235 2236 // Fetch more lines in the background while we 2237 // process the lines already read into memory 2238 // into a matrix of doubles. 2239 double[][] matrix; 2240 auto asyncReader = taskPool.asyncBuf(duped); 2241 2242 foreach (line; asyncReader) 2243 { 2244 auto ls = line.split("\t"); 2245 matrix ~= to!(double[])(ls); 2246 } 2247 } 2248 --- 2249 2250 $(B Exception Handling): 2251 2252 Any exceptions thrown while iterating over `source` are re-thrown on a 2253 call to `popFront` or, if thrown during construction, simply 2254 allowed to propagate to the caller. 2255 */ 2256 auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S) 2257 { 2258 static final class AsyncBuf 2259 { 2260 // This is a class because the task and source both need to be on 2261 // the heap. 2262 2263 // The element type of S. 2264 alias E = ElementType!S; // Needs to be here b/c of forward ref bugs. 2265 2266 private: 2267 E[] buf1, buf2; 2268 S source; 2269 TaskPool pool; 2270 Task!(run, E[] delegate(E[]), E[]) nextBufTask; 2271 size_t bufPos; 2272 bool lastTaskWaited; 2273 2274 static if (hasLength!S) 2275 { 2276 size_t _length; 2277 2278 // Available if hasLength!S. 2279 public @property size_t length() const @safe pure nothrow 2280 { 2281 return _length; 2282 } 2283 } 2284 2285 this(S source, size_t bufSize, TaskPool pool) 2286 { 2287 buf1.length = bufSize; 2288 buf2.length = bufSize; 2289 2290 this.source = source; 2291 this.pool = pool; 2292 2293 static if (hasLength!S) 2294 { 2295 _length = source.length; 2296 } 2297 2298 buf1 = fillBuf(buf1); 2299 submitBuf2(); 2300 } 2301 2302 E[] fillBuf(E[] buf) 2303 { 2304 assert(buf !is null); 2305 2306 size_t i; 2307 for (; !source.empty && i < buf.length; source.popFront()) 2308 { 2309 buf[i++] = source.front; 2310 } 2311 2312 buf = buf[0 .. i]; 2313 return buf; 2314 } 2315 2316 void submitBuf2() 2317 in 2318 { 2319 assert(nextBufTask.prev is null); 2320 assert(nextBufTask.next is null); 2321 } 2322 do 2323 { 2324 // Hack to reuse the task object. 2325 2326 nextBufTask = typeof(nextBufTask).init; 2327 nextBufTask._args[0] = &fillBuf; 2328 nextBufTask._args[1] = buf2; 2329 pool.put(nextBufTask); 2330 } 2331 2332 void doBufSwap() 2333 { 2334 if (lastTaskWaited) 2335 { 2336 // Then source is empty. Signal it here. 2337 buf1 = null; 2338 buf2 = null; 2339 return; 2340 } 2341 2342 buf2 = buf1; 2343 buf1 = nextBufTask.yieldForce; 2344 bufPos = 0; 2345 2346 if (source.empty) 2347 { 2348 lastTaskWaited = true; 2349 } 2350 else 2351 { 2352 submitBuf2(); 2353 } 2354 } 2355 2356 public: 2357 E front() @property 2358 { 2359 return buf1[bufPos]; 2360 } 2361 2362 void popFront() 2363 { 2364 static if (hasLength!S) 2365 { 2366 _length--; 2367 } 2368 2369 bufPos++; 2370 if (bufPos >= buf1.length) 2371 { 2372 doBufSwap(); 2373 } 2374 } 2375 2376 static if (isInfinite!S) 2377 { 2378 enum bool empty = false; 2379 } 2380 2381 else 2382 { 2383 /// 2384 bool empty() @property 2385 { 2386 // popFront() sets this when source is empty: 2387 return buf1.length == 0; 2388 } 2389 } 2390 } 2391 return new AsyncBuf(source, bufSize, this); 2392 } 2393 2394 /** 2395 Given a callable object `next` that writes to a user-provided buffer and 2396 a second callable object `empty` that determines whether more data is 2397 available to write via `next`, returns an input range that 2398 asynchronously calls `next` with a set of size `nBuffers` of buffers 2399 and makes the results available in the order they were obtained via the 2400 input range interface of the returned object. Similarly to the 2401 input range overload of `asyncBuf`, the first half of the buffers 2402 are made available via the range interface while the second half are 2403 filled and vice-versa. 2404 2405 Params: 2406 2407 next = A callable object that takes a single argument that must be an array 2408 with mutable elements. When called, `next` writes data to 2409 the array provided by the caller. 2410 2411 empty = A callable object that takes no arguments and returns a type 2412 implicitly convertible to `bool`. This is used to signify 2413 that no more data is available to be obtained by calling `next`. 2414 2415 initialBufSize = The initial size of each buffer. If `next` takes its 2416 array by reference, it may resize the buffers. 2417 2418 nBuffers = The number of buffers to cycle through when calling `next`. 2419 2420 Example: 2421 --- 2422 // Fetch lines of a file in a background 2423 // thread while processing previously fetched 2424 // lines, without duplicating any lines. 2425 auto file = File("foo.txt"); 2426 2427 void next(ref char[] buf) 2428 { 2429 file.readln(buf); 2430 } 2431 2432 // Fetch more lines in the background while we 2433 // process the lines already read into memory 2434 // into a matrix of doubles. 2435 double[][] matrix; 2436 auto asyncReader = taskPool.asyncBuf(&next, &file.eof); 2437 2438 foreach (line; asyncReader) 2439 { 2440 auto ls = line.split("\t"); 2441 matrix ~= to!(double[])(ls); 2442 } 2443 --- 2444 2445 $(B Exception Handling): 2446 2447 Any exceptions thrown while iterating over `range` are re-thrown on a 2448 call to `popFront`. 2449 2450 Warning: 2451 2452 Using the range returned by this function in a parallel foreach loop 2453 will not work because buffers may be overwritten while the task that 2454 processes them is in queue. This is checked for at compile time 2455 and will result in a static assertion failure. 2456 */ 2457 auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100) 2458 if (is(typeof(C2.init()) : bool) && 2459 Parameters!C1.length == 1 && 2460 Parameters!C2.length == 0 && 2461 isArray!(Parameters!C1[0]) 2462 ) { 2463 auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers); 2464 return asyncBuf(roundRobin, nBuffers / 2); 2465 } 2466 2467 /// 2468 template reduce(functions...) 2469 { 2470 /** 2471 Parallel reduce on a random access range. Except as otherwise noted, 2472 usage is similar to $(REF _reduce, std,algorithm,iteration). There is 2473 also $(LREF fold) which does the same thing with a different parameter 2474 order. 2475 2476 This function works by splitting the range to be reduced into work 2477 units, which are slices to be reduced in parallel. Once the results 2478 from all work units are computed, a final serial reduction is performed 2479 on these results to compute the final answer. Therefore, care must be 2480 taken to choose the seed value appropriately. 2481 2482 Because the reduction is being performed in parallel, `functions` 2483 must be associative. For notational simplicity, let # be an 2484 infix operator representing `functions`. Then, (a # b) # c must equal 2485 a # (b # c). Floating point addition is not associative 2486 even though addition in exact arithmetic is. Summing floating 2487 point numbers using this function may give different results than summing 2488 serially. However, for many practical purposes floating point addition 2489 can be treated as associative. 2490 2491 Note that, since `functions` are assumed to be associative, 2492 additional optimizations are made to the serial portion of the reduction 2493 algorithm. These take advantage of the instruction level parallelism of 2494 modern CPUs, in addition to the thread-level parallelism that the rest 2495 of this module exploits. This can lead to better than linear speedups 2496 relative to $(REF _reduce, std,algorithm,iteration), especially for 2497 fine-grained benchmarks like dot products. 2498 2499 An explicit seed may be provided as the first argument. If 2500 provided, it is used as the seed for all work units and for the final 2501 reduction of results from all work units. Therefore, if it is not the 2502 identity value for the operation being performed, results may differ 2503 from those generated by $(REF _reduce, std,algorithm,iteration) or 2504 depending on how many work units are used. The next argument must be 2505 the range to be reduced. 2506 --- 2507 // Find the sum of squares of a range in parallel, using 2508 // an explicit seed. 2509 // 2510 // Timings on an Athlon 64 X2 dual core machine: 2511 // 2512 // Parallel reduce: 72 milliseconds 2513 // Using std.algorithm.reduce instead: 181 milliseconds 2514 auto nums = iota(10_000_000.0f); 2515 auto sumSquares = taskPool.reduce!"a + b"( 2516 0.0, std.algorithm.map!"a * a"(nums) 2517 ); 2518 --- 2519 2520 If no explicit seed is provided, the first element of each work unit 2521 is used as a seed. For the final reduction, the result from the first 2522 work unit is used as the seed. 2523 --- 2524 // Find the sum of a range in parallel, using the first 2525 // element of each work unit as the seed. 2526 auto sum = taskPool.reduce!"a + b"(nums); 2527 --- 2528 2529 An explicit work unit size may be specified as the last argument. 2530 Specifying too small a work unit size will effectively serialize the 2531 reduction, as the final reduction of the result of each work unit will 2532 dominate computation time. If `TaskPool.size` for this instance 2533 is zero, this parameter is ignored and one work unit is used. 2534 --- 2535 // Use a work unit size of 100. 2536 auto sum2 = taskPool.reduce!"a + b"(nums, 100); 2537 2538 // Work unit size of 100 and explicit seed. 2539 auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100); 2540 --- 2541 2542 Parallel reduce supports multiple functions, like 2543 `std.algorithm.reduce`. 2544 --- 2545 // Find both the min and max of nums. 2546 auto minMax = taskPool.reduce!(min, max)(nums); 2547 assert(minMax[0] == reduce!min(nums)); 2548 assert(minMax[1] == reduce!max(nums)); 2549 --- 2550 2551 $(B Exception Handling): 2552 2553 After this function is finished executing, any exceptions thrown 2554 are chained together via `Throwable.next` and rethrown. The chaining 2555 order is non-deterministic. 2556 2557 See_Also: 2558 2559 $(LREF fold) is functionally equivalent to $(LREF _reduce) except the 2560 range parameter comes first and there is no need to use 2561 $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds. 2562 */ 2563 auto reduce(Args...)(Args args) 2564 { 2565 import core.exception : OutOfMemoryError; 2566 import core.internal.lifetime : emplaceRef; 2567 import std.exception : enforce; 2568 2569 alias fun = reduceAdjoin!functions; 2570 alias finishFun = reduceFinish!functions; 2571 2572 static if (isIntegral!(Args[$ - 1])) 2573 { 2574 size_t workUnitSize = cast(size_t) args[$ - 1]; 2575 alias args2 = args[0..$ - 1]; 2576 alias Args2 = Args[0..$ - 1]; 2577 } 2578 else 2579 { 2580 alias args2 = args; 2581 alias Args2 = Args; 2582 } 2583 2584 auto makeStartValue(Type)(Type e) 2585 { 2586 static if (functions.length == 1) 2587 { 2588 return e; 2589 } 2590 else 2591 { 2592 typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void; 2593 foreach (i, T; seed.Types) 2594 { 2595 emplaceRef(seed.expand[i], e); 2596 } 2597 2598 return seed; 2599 } 2600 } 2601 2602 static if (args2.length == 2) 2603 { 2604 static assert(isInputRange!(Args2[1])); 2605 alias range = args2[1]; 2606 alias seed = args2[0]; 2607 enum explicitSeed = true; 2608 2609 static if (!is(typeof(workUnitSize))) 2610 { 2611 size_t workUnitSize = defaultWorkUnitSize(range.length); 2612 } 2613 } 2614 else 2615 { 2616 static assert(args2.length == 1); 2617 alias range = args2[0]; 2618 2619 static if (!is(typeof(workUnitSize))) 2620 { 2621 size_t workUnitSize = defaultWorkUnitSize(range.length); 2622 } 2623 2624 enforce(!range.empty, 2625 "Cannot reduce an empty range with first element as start value."); 2626 2627 auto seed = makeStartValue(range.front); 2628 enum explicitSeed = false; 2629 range.popFront(); 2630 } 2631 2632 alias E = typeof(seed); 2633 alias R = typeof(range); 2634 2635 E reduceOnRange(R range, size_t lowerBound, size_t upperBound) 2636 { 2637 // This is for exploiting instruction level parallelism by 2638 // using multiple accumulator variables within each thread, 2639 // since we're assuming functions are associative anyhow. 2640 2641 // This is so that loops can be unrolled automatically. 2642 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5); 2643 enum nILP = ilpTuple.length; 2644 immutable subSize = (upperBound - lowerBound) / nILP; 2645 2646 if (subSize <= 1) 2647 { 2648 // Handle as a special case. 2649 static if (explicitSeed) 2650 { 2651 E result = seed; 2652 } 2653 else 2654 { 2655 E result = makeStartValue(range[lowerBound]); 2656 lowerBound++; 2657 } 2658 2659 foreach (i; lowerBound .. upperBound) 2660 { 2661 result = fun(result, range[i]); 2662 } 2663 2664 return result; 2665 } 2666 2667 assert(subSize > 1); 2668 E[nILP] results; 2669 size_t[nILP] offsets; 2670 2671 foreach (i; ilpTuple) 2672 { 2673 offsets[i] = lowerBound + subSize * i; 2674 2675 static if (explicitSeed) 2676 { 2677 results[i] = seed; 2678 } 2679 else 2680 { 2681 results[i] = makeStartValue(range[offsets[i]]); 2682 offsets[i]++; 2683 } 2684 } 2685 2686 immutable nLoop = subSize - (!explicitSeed); 2687 foreach (i; 0 .. nLoop) 2688 { 2689 foreach (j; ilpTuple) 2690 { 2691 results[j] = fun(results[j], range[offsets[j]]); 2692 offsets[j]++; 2693 } 2694 } 2695 2696 // Finish the remainder. 2697 foreach (i; nILP * subSize + lowerBound .. upperBound) 2698 { 2699 results[$ - 1] = fun(results[$ - 1], range[i]); 2700 } 2701 2702 foreach (i; ilpTuple[1..$]) 2703 { 2704 results[0] = finishFun(results[0], results[i]); 2705 } 2706 2707 return results[0]; 2708 } 2709 2710 immutable len = range.length; 2711 if (len == 0) 2712 { 2713 return seed; 2714 } 2715 2716 if (this.size == 0) 2717 { 2718 return finishFun(seed, reduceOnRange(range, 0, len)); 2719 } 2720 2721 // Unlike the rest of the functions here, I can't use the Task object 2722 // recycling trick here because this has to work on non-commutative 2723 // operations. After all the tasks are done executing, fun() has to 2724 // be applied on the results of these to get a final result, but 2725 // it can't be evaluated out of order. 2726 2727 if (workUnitSize > len) 2728 { 2729 workUnitSize = len; 2730 } 2731 2732 immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1); 2733 assert(nWorkUnits * workUnitSize >= len); 2734 2735 alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t); 2736 RTask[] tasks; 2737 2738 // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753 2739 // Use a fixed buffer backed by malloc(). 2740 enum maxStack = 2_048; 2741 byte[maxStack] buf = void; 2742 immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof; 2743 2744 import core.stdc.stdlib : malloc, free; 2745 if (nBytesNeeded <= maxStack) 2746 { 2747 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits]; 2748 } 2749 else 2750 { 2751 auto ptr = cast(RTask*) malloc(nBytesNeeded); 2752 if (!ptr) 2753 { 2754 throw new OutOfMemoryError( 2755 "Out of memory in std.parallelism." 2756 ); 2757 } 2758 2759 tasks = ptr[0 .. nWorkUnits]; 2760 } 2761 2762 scope(exit) 2763 { 2764 if (nBytesNeeded > maxStack) 2765 { 2766 free(tasks.ptr); 2767 } 2768 } 2769 2770 // Hack to take the address of a nested function w/o 2771 // making a closure. 2772 static auto scopedAddress(D)(scope D del) @system 2773 { 2774 auto tmp = del; 2775 return tmp; 2776 } 2777 2778 size_t curPos = 0; 2779 void useTask(ref RTask task) 2780 { 2781 import std.algorithm.comparison : min; 2782 import core.lifetime : emplace; 2783 2784 // Private constructor, so can't feed it's arguments directly 2785 // to emplace 2786 emplace(&task, RTask 2787 ( 2788 scopedAddress(&reduceOnRange), 2789 range, 2790 curPos, // lower bound. 2791 cast() min(len, curPos + workUnitSize) // upper bound. 2792 )); 2793 2794 task.pool = this; 2795 2796 curPos += workUnitSize; 2797 } 2798 2799 foreach (ref task; tasks) 2800 { 2801 useTask(task); 2802 } 2803 2804 foreach (i; 1 .. tasks.length - 1) 2805 { 2806 tasks[i].next = tasks[i + 1].basePtr; 2807 tasks[i + 1].prev = tasks[i].basePtr; 2808 } 2809 2810 if (tasks.length > 1) 2811 { 2812 queueLock(); 2813 scope(exit) queueUnlock(); 2814 2815 abstractPutGroupNoSync( 2816 tasks[1].basePtr, 2817 tasks[$ - 1].basePtr 2818 ); 2819 } 2820 2821 if (tasks.length > 0) 2822 { 2823 try 2824 { 2825 tasks[0].job(); 2826 } 2827 catch (Throwable e) 2828 { 2829 tasks[0].exception = e; 2830 } 2831 tasks[0].taskStatus = TaskStatus.done; 2832 2833 // Try to execute each of these in the current thread 2834 foreach (ref task; tasks[1..$]) 2835 { 2836 tryDeleteExecute(task.basePtr); 2837 } 2838 } 2839 2840 // Now that we've tried to execute every task, they're all either 2841 // done or in progress. Force all of them. 2842 E result = seed; 2843 2844 Throwable firstException; 2845 2846 foreach (ref task; tasks) 2847 { 2848 try 2849 { 2850 task.yieldForce; 2851 } 2852 catch (Throwable e) 2853 { 2854 /* Chain e to front because order doesn't matter and because 2855 * e is not likely to be a chain itself (so fewer traversals) 2856 */ 2857 firstException = Throwable.chainTogether(e, firstException); 2858 continue; 2859 } 2860 2861 if (!firstException) result = finishFun(result, task.returnVal); 2862 } 2863 2864 if (firstException) throw firstException; 2865 2866 return result; 2867 } 2868 } 2869 2870 /// 2871 template fold(functions...) 2872 { 2873 /** Implements the homonym function (also known as `accumulate`, `compress`, 2874 `inject`, or `foldl`) present in various programming languages of 2875 functional flavor. 2876 2877 `fold` is functionally equivalent to $(LREF reduce) except the range 2878 parameter comes first and there is no need to use $(REF_ALTTEXT 2879 `tuple`,tuple,std,typecons) for multiple seeds. 2880 2881 There may be one or more callable entities (`functions` argument) to 2882 apply. 2883 2884 Params: 2885 args = Just the range to _fold over; or the range and one seed 2886 per function; or the range, one seed per function, and 2887 the work unit size 2888 2889 Returns: 2890 The accumulated result as a single value for single function and 2891 as a tuple of values for multiple functions 2892 2893 See_Also: 2894 Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce). 2895 2896 Example: 2897 --- 2898 static int adder(int a, int b) 2899 { 2900 return a + b; 2901 } 2902 static int multiplier(int a, int b) 2903 { 2904 return a * b; 2905 } 2906 2907 // Just the range 2908 auto x = taskPool.fold!adder([1, 2, 3, 4]); 2909 assert(x == 10); 2910 2911 // The range and the seeds (0 and 1 below; also note multiple 2912 // functions in this example) 2913 auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1); 2914 assert(y[0] == 10); 2915 assert(y[1] == 24); 2916 2917 // The range, the seed (0), and the work unit size (20) 2918 auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20); 2919 assert(z == 10); 2920 --- 2921 */ 2922 auto fold(Args...)(Args args) 2923 { 2924 static assert(isInputRange!(Args[0]), "First argument must be an InputRange"); 2925 2926 alias range = args[0]; 2927 2928 static if (Args.length == 1) 2929 { 2930 // Just the range 2931 return reduce!functions(range); 2932 } 2933 else static if (Args.length == 1 + functions.length || 2934 Args.length == 1 + functions.length + 1) 2935 { 2936 static if (functions.length == 1) 2937 { 2938 alias seeds = args[1]; 2939 } 2940 else 2941 { 2942 auto seeds() 2943 { 2944 import std.typecons : tuple; 2945 return tuple(args[1 .. functions.length+1]); 2946 } 2947 } 2948 2949 static if (Args.length == 1 + functions.length) 2950 { 2951 // The range and the seeds 2952 return reduce!functions(seeds, range); 2953 } 2954 else static if (Args.length == 1 + functions.length + 1) 2955 { 2956 // The range, the seeds, and the work unit size 2957 static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type"); 2958 return reduce!functions(seeds, range, args[$-1]); 2959 } 2960 } 2961 else 2962 { 2963 import std.conv : text; 2964 static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, " 2965 ~ functions.length.text ~ " optional seed(s), and an optional work unit size."); 2966 } 2967 } 2968 } 2969 2970 // This test is not included in the documentation because even though these 2971 // examples are for the inner fold() template, with their current location, 2972 // they would appear under the outer one. (We can't move this inside the 2973 // outer fold() template because then dmd runs out of memory possibly due to 2974 // recursive template instantiation, which is surprisingly not caught.) 2975 @system unittest 2976 { 2977 // Just the range 2978 auto x = taskPool.fold!"a + b"([1, 2, 3, 4]); 2979 assert(x == 10); 2980 2981 // The range and the seeds (0 and 1 below; also note multiple 2982 // functions in this example) 2983 auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1); 2984 assert(y[0] == 10); 2985 assert(y[1] == 24); 2986 2987 // The range, the seed (0), and the work unit size (20) 2988 auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20); 2989 assert(z == 10); 2990 } 2991 2992 /** 2993 Gets the index of the current thread relative to this `TaskPool`. Any 2994 thread not in this pool will receive an index of 0. The worker threads in 2995 this pool receive unique indices of 1 through `this.size`. 2996 2997 This function is useful for maintaining worker-local resources. 2998 2999 Example: 3000 --- 3001 // Execute a loop that computes the greatest common 3002 // divisor of every number from 0 through 999 with 3003 // 42 in parallel. Write the results out to 3004 // a set of files, one for each thread. This allows 3005 // results to be written out without any synchronization. 3006 3007 import std.conv, std.range, std.numeric, std.stdio; 3008 3009 void main() 3010 { 3011 auto filesHandles = new File[taskPool.size + 1]; 3012 scope(exit) { 3013 foreach (ref handle; fileHandles) 3014 { 3015 handle.close(); 3016 } 3017 } 3018 3019 foreach (i, ref handle; fileHandles) 3020 { 3021 handle = File("workerResults" ~ to!string(i) ~ ".txt"); 3022 } 3023 3024 foreach (num; parallel(iota(1_000))) 3025 { 3026 auto outHandle = fileHandles[taskPool.workerIndex]; 3027 outHandle.writeln(num, '\t', gcd(num, 42)); 3028 } 3029 } 3030 --- 3031 */ 3032 size_t workerIndex() @property @safe const nothrow 3033 { 3034 immutable rawInd = threadIndex; 3035 return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ? 3036 (rawInd - instanceStartIndex + 1) : 0; 3037 } 3038 3039 /** 3040 Struct for creating worker-local storage. Worker-local storage is 3041 thread-local storage that exists only for worker threads in a given 3042 `TaskPool` plus a single thread outside the pool. It is allocated on the 3043 garbage collected heap in a way that avoids _false sharing, and doesn't 3044 necessarily have global scope within any thread. It can be accessed from 3045 any worker thread in the `TaskPool` that created it, and one thread 3046 outside this `TaskPool`. All threads outside the pool that created a 3047 given instance of worker-local storage share a single slot. 3048 3049 Since the underlying data for this struct is heap-allocated, this struct 3050 has reference semantics when passed between functions. 3051 3052 The main uses cases for `WorkerLocalStorage` are: 3053 3054 1. Performing parallel reductions with an imperative, as opposed to 3055 functional, programming style. In this case, it's useful to treat 3056 `WorkerLocalStorage` as local to each thread for only the parallel 3057 portion of an algorithm. 3058 3059 2. Recycling temporary buffers across iterations of a parallel foreach loop. 3060 3061 Example: 3062 --- 3063 // Calculate pi as in our synopsis example, but 3064 // use an imperative instead of a functional style. 3065 immutable n = 1_000_000_000; 3066 immutable delta = 1.0L / n; 3067 3068 auto sums = taskPool.workerLocalStorage(0.0L); 3069 foreach (i; parallel(iota(n))) 3070 { 3071 immutable x = ( i - 0.5L ) * delta; 3072 immutable toAdd = delta / ( 1.0 + x * x ); 3073 sums.get += toAdd; 3074 } 3075 3076 // Add up the results from each worker thread. 3077 real pi = 0; 3078 foreach (threadResult; sums.toRange) 3079 { 3080 pi += 4.0L * threadResult; 3081 } 3082 --- 3083 */ 3084 static struct WorkerLocalStorage(T) 3085 { 3086 private: 3087 TaskPool pool; 3088 size_t size; 3089 3090 size_t elemSize; 3091 bool* stillThreadLocal; 3092 3093 static size_t roundToLine(size_t num) pure nothrow 3094 { 3095 if (num % cacheLineSize == 0) 3096 { 3097 return num; 3098 } 3099 else 3100 { 3101 return ((num / cacheLineSize) + 1) * cacheLineSize; 3102 } 3103 } 3104 3105 void* data; 3106 3107 void initialize(TaskPool pool) 3108 { 3109 this.pool = pool; 3110 size = pool.size + 1; 3111 stillThreadLocal = new bool; 3112 *stillThreadLocal = true; 3113 3114 // Determines whether the GC should scan the array. 3115 auto blkInfo = (typeid(T).flags & 1) ? 3116 cast(GC.BlkAttr) 0 : 3117 GC.BlkAttr.NO_SCAN; 3118 3119 immutable nElem = pool.size + 1; 3120 elemSize = roundToLine(T.sizeof); 3121 3122 // The + 3 is to pad one full cache line worth of space on either side 3123 // of the data structure to make sure false sharing with completely 3124 // unrelated heap data is prevented, and to provide enough padding to 3125 // make sure that data is cache line-aligned. 3126 data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize; 3127 3128 // Cache line align data ptr. 3129 data = cast(void*) roundToLine(cast(size_t) data); 3130 3131 foreach (i; 0 .. nElem) 3132 { 3133 this.opIndex(i) = T.init; 3134 } 3135 } 3136 3137 ref opIndex(this Qualified)(size_t index) 3138 { 3139 import std.conv : text; 3140 assert(index < size, text(index, '\t', uint.max)); 3141 return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index)); 3142 } 3143 3144 void opIndexAssign(T val, size_t index) 3145 { 3146 assert(index < size); 3147 *(cast(T*) (data + elemSize * index)) = val; 3148 } 3149 3150 public: 3151 /** 3152 Get the current thread's instance. Returns by ref. 3153 Note that calling `get` from any thread 3154 outside the `TaskPool` that created this instance will return the 3155 same reference, so an instance of worker-local storage should only be 3156 accessed from one thread outside the pool that created it. If this 3157 rule is violated, undefined behavior will result. 3158 3159 If assertions are enabled and `toRange` has been called, then this 3160 WorkerLocalStorage instance is no longer worker-local and an assertion 3161 failure will result when calling this method. This is not checked 3162 when assertions are disabled for performance reasons. 3163 */ 3164 ref get(this Qualified)() @property 3165 { 3166 assert(*stillThreadLocal, 3167 "Cannot call get() on this instance of WorkerLocalStorage " ~ 3168 "because it is no longer worker-local." 3169 ); 3170 return opIndex(pool.workerIndex); 3171 } 3172 3173 /** 3174 Assign a value to the current thread's instance. This function has 3175 the same caveats as its overload. 3176 */ 3177 void get(T val) @property 3178 { 3179 assert(*stillThreadLocal, 3180 "Cannot call get() on this instance of WorkerLocalStorage " ~ 3181 "because it is no longer worker-local." 3182 ); 3183 3184 opIndexAssign(val, pool.workerIndex); 3185 } 3186 3187 /** 3188 Returns a range view of the values for all threads, which can be used 3189 to further process the results of each thread after running the parallel 3190 part of your algorithm. Do not use this method in the parallel portion 3191 of your algorithm. 3192 3193 Calling this function sets a flag indicating that this struct is no 3194 longer worker-local, and attempting to use the `get` method again 3195 will result in an assertion failure if assertions are enabled. 3196 */ 3197 WorkerLocalStorageRange!T toRange() @property 3198 { 3199 if (*stillThreadLocal) 3200 { 3201 *stillThreadLocal = false; 3202 3203 // Make absolutely sure results are visible to all threads. 3204 // This is probably not necessary since some other 3205 // synchronization primitive will be used to signal that the 3206 // parallel part of the algorithm is done, but the 3207 // performance impact should be negligible, so it's better 3208 // to be safe. 3209 ubyte barrierDummy; 3210 atomicSetUbyte(barrierDummy, 1); 3211 } 3212 3213 return WorkerLocalStorageRange!T(this); 3214 } 3215 } 3216 3217 /** 3218 Range primitives for worker-local storage. The purpose of this is to 3219 access results produced by each worker thread from a single thread once you 3220 are no longer using the worker-local storage from multiple threads. 3221 Do not use this struct in the parallel portion of your algorithm. 3222 3223 The proper way to instantiate this object is to call 3224 `WorkerLocalStorage.toRange`. Once instantiated, this object behaves 3225 as a finite random-access range with assignable, lvalue elements and 3226 a length equal to the number of worker threads in the `TaskPool` that 3227 created it plus 1. 3228 */ 3229 static struct WorkerLocalStorageRange(T) 3230 { 3231 private: 3232 WorkerLocalStorage!T workerLocalStorage; 3233 3234 size_t _length; 3235 size_t beginOffset; 3236 3237 this(WorkerLocalStorage!T wl) 3238 { 3239 this.workerLocalStorage = wl; 3240 _length = wl.size; 3241 } 3242 3243 public: 3244 ref front(this Qualified)() @property 3245 { 3246 return this[0]; 3247 } 3248 3249 ref back(this Qualified)() @property 3250 { 3251 return this[_length - 1]; 3252 } 3253 3254 void popFront() 3255 { 3256 if (_length > 0) 3257 { 3258 beginOffset++; 3259 _length--; 3260 } 3261 } 3262 3263 void popBack() 3264 { 3265 if (_length > 0) 3266 { 3267 _length--; 3268 } 3269 } 3270 3271 typeof(this) save() @property 3272 { 3273 return this; 3274 } 3275 3276 ref opIndex(this Qualified)(size_t index) 3277 { 3278 assert(index < _length); 3279 return workerLocalStorage[index + beginOffset]; 3280 } 3281 3282 void opIndexAssign(T val, size_t index) 3283 { 3284 assert(index < _length); 3285 workerLocalStorage[index] = val; 3286 } 3287 3288 typeof(this) opSlice(size_t lower, size_t upper) 3289 { 3290 assert(upper <= _length); 3291 auto newWl = this.workerLocalStorage; 3292 newWl.data += lower * newWl.elemSize; 3293 newWl.size = upper - lower; 3294 return typeof(this)(newWl); 3295 } 3296 3297 bool empty() const @property 3298 { 3299 return length == 0; 3300 } 3301 3302 size_t length() const @property 3303 { 3304 return _length; 3305 } 3306 } 3307 3308 /** 3309 Creates an instance of worker-local storage, initialized with a given 3310 value. The value is `lazy` so that you can, for example, easily 3311 create one instance of a class for each worker. For usage example, 3312 see the `WorkerLocalStorage` struct. 3313 */ 3314 WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init) 3315 { 3316 WorkerLocalStorage!T ret; 3317 ret.initialize(this); 3318 foreach (i; 0 .. size + 1) 3319 { 3320 ret[i] = initialVal; 3321 } 3322 3323 // Memory barrier to make absolutely sure that what we wrote is 3324 // visible to worker threads. 3325 ubyte barrierDummy; 3326 atomicSetUbyte(barrierDummy, 0); 3327 3328 return ret; 3329 } 3330 3331 /** 3332 Signals to all worker threads to terminate as soon as they are finished 3333 with their current `Task`, or immediately if they are not executing a 3334 `Task`. `Task`s that were in queue will not be executed unless 3335 a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce` 3336 causes them to be executed. 3337 3338 Use only if you have waited on every `Task` and therefore know the 3339 queue is empty, or if you speculatively executed some tasks and no longer 3340 need the results. 3341 */ 3342 void stop() @trusted 3343 { 3344 queueLock(); 3345 scope(exit) queueUnlock(); 3346 atomicSetUbyte(status, PoolState.stopNow); 3347 notifyAll(); 3348 } 3349 3350 /** 3351 Signals worker threads to terminate when the queue becomes empty. 3352 3353 If blocking argument is true, wait for all worker threads to terminate 3354 before returning. This option might be used in applications where 3355 task results are never consumed-- e.g. when `TaskPool` is employed as a 3356 rudimentary scheduler for tasks which communicate by means other than 3357 return values. 3358 3359 Warning: Calling this function with $(D blocking = true) from a worker 3360 thread that is a member of the same `TaskPool` that 3361 `finish` is being called on will result in a deadlock. 3362 */ 3363 void finish(bool blocking = false) @trusted 3364 { 3365 { 3366 queueLock(); 3367 scope(exit) queueUnlock(); 3368 atomicCasUbyte(status, PoolState.running, PoolState.finishing); 3369 notifyAll(); 3370 } 3371 if (blocking) 3372 { 3373 // Use this thread as a worker until everything is finished. 3374 executeWorkLoop(); 3375 3376 foreach (t; pool) 3377 { 3378 // Maybe there should be something here to prevent a thread 3379 // from calling join() on itself if this function is called 3380 // from a worker thread in the same pool, but: 3381 // 3382 // 1. Using an if statement to skip join() would result in 3383 // finish() returning without all tasks being finished. 3384 // 3385 // 2. If an exception were thrown, it would bubble up to the 3386 // Task from which finish() was called and likely be 3387 // swallowed. 3388 t.join(); 3389 } 3390 } 3391 } 3392 3393 /// Returns the number of worker threads in the pool. 3394 @property size_t size() @safe const pure nothrow 3395 { 3396 return pool.length; 3397 } 3398 3399 /** 3400 Put a `Task` object on the back of the task queue. The `Task` 3401 object may be passed by pointer or reference. 3402 3403 Example: 3404 --- 3405 import std.file; 3406 3407 // Create a task. 3408 auto t = task!read("foo.txt"); 3409 3410 // Add it to the queue to be executed. 3411 taskPool.put(t); 3412 --- 3413 3414 Notes: 3415 3416 @trusted overloads of this function are called for `Task`s if 3417 $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s 3418 return type or the function the `Task` executes is `pure`. 3419 `Task` objects that meet all other requirements specified in the 3420 `@trusted` overloads of `task` and `scopedTask` may be created 3421 and executed from `@safe` code via `Task.executeInNewThread` but 3422 not via `TaskPool`. 3423 3424 While this function takes the address of variables that may 3425 be on the stack, some overloads are marked as @trusted. 3426 `Task` includes a destructor that waits for the task to complete 3427 before destroying the stack frame it is allocated on. Therefore, 3428 it is impossible for the stack frame to be destroyed before the task is 3429 complete and no longer referenced by a `TaskPool`. 3430 */ 3431 void put(alias fun, Args...)(ref Task!(fun, Args) task) 3432 if (!isSafeReturn!(typeof(task))) 3433 { 3434 task.pool = this; 3435 abstractPut(task.basePtr); 3436 } 3437 3438 /// Ditto 3439 void put(alias fun, Args...)(Task!(fun, Args)* task) 3440 if (!isSafeReturn!(typeof(*task))) 3441 { 3442 import std.exception : enforce; 3443 enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); 3444 put(*task); 3445 } 3446 3447 @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task) 3448 if (isSafeReturn!(typeof(task))) 3449 { 3450 task.pool = this; 3451 abstractPut(task.basePtr); 3452 } 3453 3454 @trusted void put(alias fun, Args...)(Task!(fun, Args)* task) 3455 if (isSafeReturn!(typeof(*task))) 3456 { 3457 import std.exception : enforce; 3458 enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); 3459 put(*task); 3460 } 3461 3462 /** 3463 These properties control whether the worker threads are daemon threads. 3464 A daemon thread is automatically terminated when all non-daemon threads 3465 have terminated. A non-daemon thread will prevent a program from 3466 terminating as long as it has not terminated. 3467 3468 If any `TaskPool` with non-daemon threads is active, either `stop` 3469 or `finish` must be called on it before the program can terminate. 3470 3471 The worker treads in the `TaskPool` instance returned by the 3472 `taskPool` property are daemon by default. The worker threads of 3473 manually instantiated task pools are non-daemon by default. 3474 3475 Note: For a size zero pool, the getter arbitrarily returns true and the 3476 setter has no effect. 3477 */ 3478 bool isDaemon() @property @trusted 3479 { 3480 queueLock(); 3481 scope(exit) queueUnlock(); 3482 return (size == 0) ? true : pool[0].isDaemon; 3483 } 3484 3485 /// Ditto 3486 void isDaemon(bool newVal) @property @trusted 3487 { 3488 queueLock(); 3489 scope(exit) queueUnlock(); 3490 foreach (thread; pool) 3491 { 3492 thread.isDaemon = newVal; 3493 } 3494 } 3495 3496 /** 3497 These functions allow getting and setting the OS scheduling priority of 3498 the worker threads in this `TaskPool`. They forward to 3499 `core.thread.Thread.priority`, so a given priority value here means the 3500 same thing as an identical priority value in `core.thread`. 3501 3502 Note: For a size zero pool, the getter arbitrarily returns 3503 `core.thread.Thread.PRIORITY_MIN` and the setter has no effect. 3504 */ 3505 int priority() @property @trusted 3506 { 3507 return (size == 0) ? core.thread.Thread.PRIORITY_MIN : 3508 pool[0].priority; 3509 } 3510 3511 /// Ditto 3512 void priority(int newPriority) @property @trusted 3513 { 3514 if (size > 0) 3515 { 3516 foreach (t; pool) 3517 { 3518 t.priority = newPriority; 3519 } 3520 } 3521 } 3522 } 3523 3524 @system unittest 3525 { 3526 import std.algorithm.iteration : sum; 3527 import std.range : iota; 3528 import std.typecons : tuple; 3529 3530 enum N = 100; 3531 auto r = iota(1, N + 1); 3532 const expected = r.sum(); 3533 3534 // Just the range 3535 assert(taskPool.fold!"a + b"(r) == expected); 3536 3537 // Range and seeds 3538 assert(taskPool.fold!"a + b"(r, 0) == expected); 3539 assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected)); 3540 3541 // Range, seeds, and work unit size 3542 assert(taskPool.fold!"a + b"(r, 0, 42) == expected); 3543 assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected)); 3544 } 3545 3546 // Issue 16705 3547 @system unittest 3548 { 3549 struct MyIota 3550 { 3551 size_t front; 3552 void popFront()(){front++;} 3553 auto empty(){return front >= 25;} 3554 auto opIndex(size_t i){return front+i;} 3555 auto length(){return 25-front;} 3556 } 3557 3558 auto mySum = taskPool.reduce!"a + b"(MyIota()); 3559 } 3560 3561 /** 3562 Returns a lazily initialized global instantiation of `TaskPool`. 3563 This function can safely be called concurrently from multiple non-worker 3564 threads. The worker threads in this pool are daemon threads, meaning that it 3565 is not necessary to call `TaskPool.stop` or `TaskPool.finish` before 3566 terminating the main thread. 3567 */ 3568 @property TaskPool taskPool() @trusted 3569 { 3570 import std.concurrency : initOnce; 3571 __gshared TaskPool pool; 3572 return initOnce!pool({ 3573 auto p = new TaskPool(defaultPoolThreads); 3574 p.isDaemon = true; 3575 return p; 3576 }()); 3577 } 3578 3579 private shared uint _defaultPoolThreads = uint.max; 3580 3581 /** 3582 These properties get and set the number of worker threads in the `TaskPool` 3583 instance returned by `taskPool`. The default value is `totalCPUs` - 1. 3584 Calling the setter after the first call to `taskPool` does not changes 3585 number of worker threads in the instance returned by `taskPool`. 3586 */ 3587 @property uint defaultPoolThreads() @trusted 3588 { 3589 const local = atomicLoad(_defaultPoolThreads); 3590 return local < uint.max ? local : totalCPUs - 1; 3591 } 3592 3593 /// Ditto 3594 @property void defaultPoolThreads(uint newVal) @trusted 3595 { 3596 atomicStore(_defaultPoolThreads, newVal); 3597 } 3598 3599 /** 3600 Convenience functions that forwards to `taskPool.parallel`. The 3601 purpose of these is to make parallel foreach less verbose and more 3602 readable. 3603 3604 Example: 3605 --- 3606 // Find the logarithm of every number from 3607 // 1 to 1_000_000 in parallel, using the 3608 // default TaskPool instance. 3609 auto logs = new double[1_000_000]; 3610 3611 foreach (i, ref elem; parallel(logs)) 3612 { 3613 elem = log(i + 1.0); 3614 } 3615 --- 3616 3617 */ 3618 ParallelForeach!R parallel(R)(R range) 3619 { 3620 return taskPool.parallel(range); 3621 } 3622 3623 /// Ditto 3624 ParallelForeach!R parallel(R)(R range, size_t workUnitSize) 3625 { 3626 return taskPool.parallel(range, workUnitSize); 3627 } 3628 3629 // `each` should be usable with parallel 3630 // https://issues.dlang.org/show_bug.cgi?id=17019 3631 @system unittest 3632 { 3633 import std.algorithm.iteration : each, sum; 3634 import std.range : iota; 3635 3636 // check behavior with parallel 3637 auto arr = new int[10]; 3638 parallel(arr).each!((ref e) => e += 1); 3639 assert(arr.sum == 10); 3640 3641 auto arrIndex = new int[10]; 3642 parallel(arrIndex).each!((i, ref e) => e += i); 3643 assert(arrIndex.sum == 10.iota.sum); 3644 } 3645 3646 // https://issues.dlang.org/show_bug.cgi?id=22745 3647 @system unittest 3648 { 3649 auto pool = new TaskPool(0); 3650 int[] empty; 3651 foreach (i; pool.parallel(empty)) {} 3652 pool.finish(); 3653 } 3654 3655 // Thrown when a parallel foreach loop is broken from. 3656 class ParallelForeachError : Error 3657 { 3658 this() 3659 { 3660 super("Cannot break from a parallel foreach loop using break, return, " 3661 ~ "labeled break/continue or goto statements."); 3662 } 3663 } 3664 3665 /*------Structs that implement opApply for parallel foreach.------------------*/ 3666 private template randLen(R) 3667 { 3668 enum randLen = isRandomAccessRange!R && hasLength!R; 3669 } 3670 3671 private void submitAndExecute( 3672 TaskPool pool, 3673 scope void delegate() doIt 3674 ) 3675 { 3676 import core.exception : OutOfMemoryError; 3677 immutable nThreads = pool.size + 1; 3678 3679 alias PTask = typeof(scopedTask(doIt)); 3680 import core.stdc.stdlib : malloc, free; 3681 import core.stdc.string : memcpy; 3682 3683 // The logical thing to do would be to just use alloca() here, but that 3684 // causes problems on Windows for reasons that I don't understand 3685 // (tentatively a compiler bug) and definitely doesn't work on Posix due 3686 // to https://issues.dlang.org/show_bug.cgi?id=3753. 3687 // Therefore, allocate a fixed buffer and fall back to `malloc()` if 3688 // someone's using a ridiculous amount of threads. 3689 // Also, the using a byte array instead of a PTask array as the fixed buffer 3690 // is to prevent d'tors from being called on uninitialized excess PTask 3691 // instances. 3692 enum nBuf = 64; 3693 byte[nBuf * PTask.sizeof] buf = void; 3694 PTask[] tasks; 3695 if (nThreads <= nBuf) 3696 { 3697 tasks = (cast(PTask*) buf.ptr)[0 .. nThreads]; 3698 } 3699 else 3700 { 3701 auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof); 3702 if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism."); 3703 tasks = ptr[0 .. nThreads]; 3704 } 3705 3706 scope(exit) 3707 { 3708 if (nThreads > nBuf) 3709 { 3710 free(tasks.ptr); 3711 } 3712 } 3713 3714 foreach (ref t; tasks) 3715 { 3716 import core.stdc.string : memcpy; 3717 3718 // This silly looking code is necessary to prevent d'tors from being 3719 // called on uninitialized objects. 3720 auto temp = scopedTask(doIt); 3721 memcpy(&t, &temp, PTask.sizeof); 3722 3723 // This has to be done to t after copying, not temp before copying. 3724 // Otherwise, temp's destructor will sit here and wait for the 3725 // task to finish. 3726 t.pool = pool; 3727 } 3728 3729 foreach (i; 1 .. tasks.length - 1) 3730 { 3731 tasks[i].next = tasks[i + 1].basePtr; 3732 tasks[i + 1].prev = tasks[i].basePtr; 3733 } 3734 3735 if (tasks.length > 1) 3736 { 3737 pool.queueLock(); 3738 scope(exit) pool.queueUnlock(); 3739 3740 pool.abstractPutGroupNoSync( 3741 tasks[1].basePtr, 3742 tasks[$ - 1].basePtr 3743 ); 3744 } 3745 3746 if (tasks.length > 0) 3747 { 3748 try 3749 { 3750 tasks[0].job(); 3751 } 3752 catch (Throwable e) 3753 { 3754 tasks[0].exception = e; // nocoverage 3755 } 3756 tasks[0].taskStatus = TaskStatus.done; 3757 3758 // Try to execute each of these in the current thread 3759 foreach (ref task; tasks[1..$]) 3760 { 3761 pool.tryDeleteExecute(task.basePtr); 3762 } 3763 } 3764 3765 Throwable firstException; 3766 3767 foreach (i, ref task; tasks) 3768 { 3769 try 3770 { 3771 task.yieldForce; 3772 } 3773 catch (Throwable e) 3774 { 3775 /* Chain e to front because order doesn't matter and because 3776 * e is not likely to be a chain itself (so fewer traversals) 3777 */ 3778 firstException = Throwable.chainTogether(e, firstException); 3779 continue; 3780 } 3781 } 3782 3783 if (firstException) throw firstException; 3784 } 3785 3786 void foreachErr() 3787 { 3788 throw new ParallelForeachError(); 3789 } 3790 3791 int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg) 3792 { 3793 with(p) 3794 { 3795 int res = 0; 3796 size_t index = 0; 3797 3798 // The explicit ElementType!R in the foreach loops is necessary for 3799 // correct behavior when iterating over strings. 3800 static if (hasLvalueElements!R) 3801 { 3802 foreach (ref ElementType!R elem; range) 3803 { 3804 static if (Parameters!dg.length == 2) 3805 { 3806 res = dg(index, elem); 3807 } 3808 else 3809 { 3810 res = dg(elem); 3811 } 3812 if (res) break; 3813 index++; 3814 } 3815 } 3816 else 3817 { 3818 foreach (ElementType!R elem; range) 3819 { 3820 static if (Parameters!dg.length == 2) 3821 { 3822 res = dg(index, elem); 3823 } 3824 else 3825 { 3826 res = dg(elem); 3827 } 3828 if (res) break; 3829 index++; 3830 } 3831 } 3832 if (res) foreachErr; 3833 return res; 3834 } 3835 } 3836 3837 private enum string parallelApplyMixinRandomAccess = q{ 3838 // Handle empty thread pool as special case. 3839 if (pool.size == 0) 3840 { 3841 return doSizeZeroCase(this, dg); 3842 } 3843 3844 // Whether iteration is with or without an index variable. 3845 enum withIndex = Parameters!(typeof(dg)).length == 2; 3846 3847 shared size_t workUnitIndex = size_t.max; // Effectively -1: chunkIndex + 1 == 0 3848 immutable len = range.length; 3849 if (!len) return 0; 3850 3851 shared bool shouldContinue = true; 3852 3853 void doIt() 3854 { 3855 import std.algorithm.comparison : min; 3856 3857 scope(failure) 3858 { 3859 // If an exception is thrown, all threads should bail. 3860 atomicStore(shouldContinue, false); 3861 } 3862 3863 while (atomicLoad(shouldContinue)) 3864 { 3865 immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1); 3866 immutable start = workUnitSize * myUnitIndex; 3867 if (start >= len) 3868 { 3869 atomicStore(shouldContinue, false); 3870 break; 3871 } 3872 3873 immutable end = min(len, start + workUnitSize); 3874 3875 foreach (i; start .. end) 3876 { 3877 static if (withIndex) 3878 { 3879 if (dg(i, range[i])) foreachErr(); 3880 } 3881 else 3882 { 3883 if (dg(range[i])) foreachErr(); 3884 } 3885 } 3886 } 3887 } 3888 3889 submitAndExecute(pool, &doIt); 3890 3891 return 0; 3892 }; 3893 3894 enum string parallelApplyMixinInputRange = q{ 3895 // Handle empty thread pool as special case. 3896 if (pool.size == 0) 3897 { 3898 return doSizeZeroCase(this, dg); 3899 } 3900 3901 // Whether iteration is with or without an index variable. 3902 enum withIndex = Parameters!(typeof(dg)).length == 2; 3903 3904 // This protects the range while copying it. 3905 auto rangeMutex = new Mutex(); 3906 3907 shared bool shouldContinue = true; 3908 3909 // The total number of elements that have been popped off range. 3910 // This is updated only while protected by rangeMutex; 3911 size_t nPopped = 0; 3912 3913 static if ( 3914 is(typeof(range.buf1)) && 3915 is(typeof(range.bufPos)) && 3916 is(typeof(range.doBufSwap())) 3917 ) 3918 { 3919 // Make sure we don't have the buffer recycling overload of 3920 // asyncBuf. 3921 static if ( 3922 is(typeof(range.source)) && 3923 isRoundRobin!(typeof(range.source)) 3924 ) 3925 { 3926 static assert(0, "Cannot execute a parallel foreach loop on " ~ 3927 "the buffer recycling overload of asyncBuf."); 3928 } 3929 3930 enum bool bufferTrick = true; 3931 } 3932 else 3933 { 3934 enum bool bufferTrick = false; 3935 } 3936 3937 void doIt() 3938 { 3939 scope(failure) 3940 { 3941 // If an exception is thrown, all threads should bail. 3942 atomicStore(shouldContinue, false); 3943 } 3944 3945 static if (hasLvalueElements!R) 3946 { 3947 alias Temp = ElementType!R*[]; 3948 Temp temp; 3949 3950 // Returns: The previous value of nPopped. 3951 size_t makeTemp() 3952 { 3953 import std.algorithm.internal : addressOf; 3954 import std.array : uninitializedArray; 3955 3956 if (temp is null) 3957 { 3958 temp = uninitializedArray!Temp(workUnitSize); 3959 } 3960 3961 rangeMutex.lock(); 3962 scope(exit) rangeMutex.unlock(); 3963 3964 size_t i = 0; 3965 for (; i < workUnitSize && !range.empty; range.popFront(), i++) 3966 { 3967 temp[i] = addressOf(range.front); 3968 } 3969 3970 temp = temp[0 .. i]; 3971 auto ret = nPopped; 3972 nPopped += temp.length; 3973 return ret; 3974 } 3975 3976 } 3977 else 3978 { 3979 3980 alias Temp = ElementType!R[]; 3981 Temp temp; 3982 3983 // Returns: The previous value of nPopped. 3984 static if (!bufferTrick) size_t makeTemp() 3985 { 3986 import std.array : uninitializedArray; 3987 3988 if (temp is null) 3989 { 3990 temp = uninitializedArray!Temp(workUnitSize); 3991 } 3992 3993 rangeMutex.lock(); 3994 scope(exit) rangeMutex.unlock(); 3995 3996 size_t i = 0; 3997 for (; i < workUnitSize && !range.empty; range.popFront(), i++) 3998 { 3999 temp[i] = range.front; 4000 } 4001 4002 temp = temp[0 .. i]; 4003 auto ret = nPopped; 4004 nPopped += temp.length; 4005 return ret; 4006 } 4007 4008 static if (bufferTrick) size_t makeTemp() 4009 { 4010 import std.algorithm.mutation : swap; 4011 rangeMutex.lock(); 4012 scope(exit) rangeMutex.unlock(); 4013 4014 // Elide copying by just swapping buffers. 4015 temp.length = range.buf1.length; 4016 swap(range.buf1, temp); 4017 4018 // This is necessary in case popFront() has been called on 4019 // range before entering the parallel foreach loop. 4020 temp = temp[range.bufPos..$]; 4021 4022 static if (is(typeof(range._length))) 4023 { 4024 range._length -= (temp.length - range.bufPos); 4025 } 4026 4027 range.doBufSwap(); 4028 auto ret = nPopped; 4029 nPopped += temp.length; 4030 return ret; 4031 } 4032 } 4033 4034 while (atomicLoad(shouldContinue)) 4035 { 4036 auto overallIndex = makeTemp(); 4037 if (temp.empty) 4038 { 4039 atomicStore(shouldContinue, false); 4040 break; 4041 } 4042 4043 foreach (i; 0 .. temp.length) 4044 { 4045 scope(success) overallIndex++; 4046 4047 static if (hasLvalueElements!R) 4048 { 4049 static if (withIndex) 4050 { 4051 if (dg(overallIndex, *temp[i])) foreachErr(); 4052 } 4053 else 4054 { 4055 if (dg(*temp[i])) foreachErr(); 4056 } 4057 } 4058 else 4059 { 4060 static if (withIndex) 4061 { 4062 if (dg(overallIndex, temp[i])) foreachErr(); 4063 } 4064 else 4065 { 4066 if (dg(temp[i])) foreachErr(); 4067 } 4068 } 4069 } 4070 } 4071 } 4072 4073 submitAndExecute(pool, &doIt); 4074 4075 return 0; 4076 }; 4077 4078 4079 private struct ParallelForeach(R) 4080 { 4081 TaskPool pool; 4082 R range; 4083 size_t workUnitSize; 4084 alias E = ElementType!R; 4085 4086 static if (hasLvalueElements!R) 4087 { 4088 alias NoIndexDg = int delegate(ref E); 4089 alias IndexDg = int delegate(size_t, ref E); 4090 } 4091 else 4092 { 4093 alias NoIndexDg = int delegate(E); 4094 alias IndexDg = int delegate(size_t, E); 4095 } 4096 4097 int opApply(scope NoIndexDg dg) 4098 { 4099 static if (randLen!R) 4100 { 4101 mixin(parallelApplyMixinRandomAccess); 4102 } 4103 else 4104 { 4105 mixin(parallelApplyMixinInputRange); 4106 } 4107 } 4108 4109 int opApply(scope IndexDg dg) 4110 { 4111 static if (randLen!R) 4112 { 4113 mixin(parallelApplyMixinRandomAccess); 4114 } 4115 else 4116 { 4117 mixin(parallelApplyMixinInputRange); 4118 } 4119 } 4120 } 4121 4122 /* 4123 This struct buffers the output of a callable that outputs data into a 4124 user-supplied buffer into a set of buffers of some fixed size. It allows these 4125 buffers to be accessed with an input range interface. This is used internally 4126 in the buffer-recycling overload of TaskPool.asyncBuf, which creates an 4127 instance and forwards it to the input range overload of asyncBuf. 4128 */ 4129 private struct RoundRobinBuffer(C1, C2) 4130 { 4131 // No need for constraints because they're already checked for in asyncBuf. 4132 4133 alias Array = Parameters!(C1.init)[0]; 4134 alias T = typeof(Array.init[0]); 4135 4136 T[][] bufs; 4137 size_t index; 4138 C1 nextDel; 4139 C2 emptyDel; 4140 bool _empty; 4141 bool primed; 4142 4143 this( 4144 C1 nextDel, 4145 C2 emptyDel, 4146 size_t initialBufSize, 4147 size_t nBuffers 4148 ) { 4149 this.nextDel = nextDel; 4150 this.emptyDel = emptyDel; 4151 bufs.length = nBuffers; 4152 4153 foreach (ref buf; bufs) 4154 { 4155 buf.length = initialBufSize; 4156 } 4157 } 4158 4159 void prime() 4160 in 4161 { 4162 assert(!empty); 4163 } 4164 do 4165 { 4166 scope(success) primed = true; 4167 nextDel(bufs[index]); 4168 } 4169 4170 4171 T[] front() @property 4172 in 4173 { 4174 assert(!empty); 4175 } 4176 do 4177 { 4178 if (!primed) prime(); 4179 return bufs[index]; 4180 } 4181 4182 void popFront() 4183 { 4184 if (empty || emptyDel()) 4185 { 4186 _empty = true; 4187 return; 4188 } 4189 4190 index = (index + 1) % bufs.length; 4191 primed = false; 4192 } 4193 4194 bool empty() @property const @safe pure nothrow 4195 { 4196 return _empty; 4197 } 4198 } 4199 4200 version (StdUnittest) 4201 { 4202 // This was the only way I could get nested maps to work. 4203 private __gshared TaskPool poolInstance; 4204 } 4205 4206 // These test basic functionality but don't stress test for threading bugs. 4207 // These are the tests that should be run every time Phobos is compiled. 4208 @system unittest 4209 { 4210 import std.algorithm.comparison : equal, min, max; 4211 import std.algorithm.iteration : filter, map, reduce; 4212 import std.array : split; 4213 import std.conv : text; 4214 import std.exception : assertThrown; 4215 import std.math.operations : isClose; 4216 import std.math.algebraic : sqrt, abs; 4217 import std.math.exponential : log; 4218 import std.range : indexed, iota, join; 4219 import std.typecons : Tuple, tuple; 4220 import std.stdio; 4221 4222 poolInstance = new TaskPool(2); 4223 scope(exit) poolInstance.stop(); 4224 4225 // The only way this can be verified is manually. 4226 debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs); 4227 4228 auto oldPriority = poolInstance.priority; 4229 poolInstance.priority = Thread.PRIORITY_MAX; 4230 assert(poolInstance.priority == Thread.PRIORITY_MAX); 4231 4232 poolInstance.priority = Thread.PRIORITY_MIN; 4233 assert(poolInstance.priority == Thread.PRIORITY_MIN); 4234 4235 poolInstance.priority = oldPriority; 4236 assert(poolInstance.priority == oldPriority); 4237 4238 static void refFun(ref uint num) 4239 { 4240 num++; 4241 } 4242 4243 uint x; 4244 4245 // Test task(). 4246 auto t = task!refFun(x); 4247 poolInstance.put(t); 4248 t.yieldForce; 4249 assert(t.args[0] == 1); 4250 4251 auto t2 = task(&refFun, x); 4252 poolInstance.put(t2); 4253 t2.yieldForce; 4254 assert(t2.args[0] == 1); 4255 4256 // Test scopedTask(). 4257 auto st = scopedTask!refFun(x); 4258 poolInstance.put(st); 4259 st.yieldForce; 4260 assert(st.args[0] == 1); 4261 4262 auto st2 = scopedTask(&refFun, x); 4263 poolInstance.put(st2); 4264 st2.yieldForce; 4265 assert(st2.args[0] == 1); 4266 4267 // Test executeInNewThread(). 4268 auto ct = scopedTask!refFun(x); 4269 ct.executeInNewThread(Thread.PRIORITY_MAX); 4270 ct.yieldForce; 4271 assert(ct.args[0] == 1); 4272 4273 // Test ref return. 4274 uint toInc = 0; 4275 static ref T makeRef(T)(ref T num) 4276 { 4277 return num; 4278 } 4279 4280 auto t3 = task!makeRef(toInc); 4281 taskPool.put(t3); 4282 assert(t3.args[0] == 0); 4283 t3.spinForce++; 4284 assert(t3.args[0] == 1); 4285 4286 static void testSafe() @safe { 4287 static int bump(int num) 4288 { 4289 return num + 1; 4290 } 4291 4292 auto safePool = new TaskPool(0); 4293 auto t = task(&bump, 1); 4294 taskPool.put(t); 4295 assert(t.yieldForce == 2); 4296 4297 auto st = scopedTask(&bump, 1); 4298 taskPool.put(st); 4299 assert(st.yieldForce == 2); 4300 safePool.stop(); 4301 } 4302 4303 auto arr = [1,2,3,4,5]; 4304 auto nums = new uint[5]; 4305 auto nums2 = new uint[5]; 4306 4307 foreach (i, ref elem; poolInstance.parallel(arr)) 4308 { 4309 elem++; 4310 nums[i] = cast(uint) i + 2; 4311 nums2[i] = elem; 4312 } 4313 4314 assert(nums == [2,3,4,5,6], text(nums)); 4315 assert(nums2 == nums, text(nums2)); 4316 assert(arr == nums, text(arr)); 4317 4318 // Test const/immutable arguments. 4319 static int add(int lhs, int rhs) 4320 { 4321 return lhs + rhs; 4322 } 4323 immutable addLhs = 1; 4324 immutable addRhs = 2; 4325 auto addTask = task(&add, addLhs, addRhs); 4326 auto addScopedTask = scopedTask(&add, addLhs, addRhs); 4327 poolInstance.put(addTask); 4328 poolInstance.put(addScopedTask); 4329 assert(addTask.yieldForce == 3); 4330 assert(addScopedTask.yieldForce == 3); 4331 4332 // Test parallel foreach with non-random access range. 4333 auto range = filter!"a != 666"([0, 1, 2, 3, 4]); 4334 4335 foreach (i, elem; poolInstance.parallel(range)) 4336 { 4337 nums[i] = cast(uint) i; 4338 } 4339 4340 assert(nums == [0,1,2,3,4]); 4341 4342 auto logs = new double[1_000_000]; 4343 foreach (i, ref elem; poolInstance.parallel(logs)) 4344 { 4345 elem = log(i + 1.0); 4346 } 4347 4348 foreach (i, elem; logs) 4349 { 4350 assert(isClose(elem, log(double(i + 1)))); 4351 } 4352 4353 assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]); 4354 assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]); 4355 assert(poolInstance.amap!("a * a", "-a")([1,2,3]) == 4356 [tuple(1, -1), tuple(4, -2), tuple(9, -3)]); 4357 4358 auto tupleBuf = new Tuple!(int, int)[3]; 4359 poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf); 4360 assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]); 4361 poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf); 4362 assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]); 4363 4364 // Test amap with a non-array buffer. 4365 auto toIndex = new int[5]; 4366 auto ind = indexed(toIndex, [3, 1, 4, 0, 2]); 4367 poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind); 4368 assert(equal(ind, [2, 4, 6, 8, 10])); 4369 assert(equal(toIndex, [8, 4, 10, 2, 6])); 4370 poolInstance.amap!"a / 2"(ind, ind); 4371 assert(equal(ind, [1, 2, 3, 4, 5])); 4372 assert(equal(toIndex, [4, 2, 5, 1, 3])); 4373 4374 auto buf = new int[5]; 4375 poolInstance.amap!"a * a"([1,2,3,4,5], buf); 4376 assert(buf == [1,4,9,16,25]); 4377 poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf); 4378 assert(buf == [1,4,9,16,25]); 4379 4380 assert(poolInstance.reduce!"a + b"([1]) == 1); 4381 assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10); 4382 assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10); 4383 assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10); 4384 assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4)); 4385 assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) == 4386 tuple(10, 24)); 4387 4388 immutable serialAns = reduce!"a + b"(iota(1000)); 4389 assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns); 4390 assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns); 4391 4392 // Test worker-local storage. 4393 auto wl = poolInstance.workerLocalStorage(0); 4394 foreach (i; poolInstance.parallel(iota(1000), 1)) 4395 { 4396 wl.get = wl.get + i; 4397 } 4398 4399 auto wlRange = wl.toRange; 4400 auto parallelSum = poolInstance.reduce!"a + b"(wlRange); 4401 assert(parallelSum == 499500); 4402 assert(wlRange[0 .. 1][0] == wlRange[0]); 4403 assert(wlRange[1 .. 2][0] == wlRange[1]); 4404 4405 // Test finish() 4406 { 4407 static void slowFun() { Thread.sleep(dur!"msecs"(1)); } 4408 4409 auto pool1 = new TaskPool(); 4410 auto tSlow = task!slowFun(); 4411 pool1.put(tSlow); 4412 pool1.finish(); 4413 tSlow.yieldForce; 4414 // Can't assert that pool1.status == PoolState.stopNow because status 4415 // doesn't change until after the "done" flag is set and the waiting 4416 // thread is woken up. 4417 4418 auto pool2 = new TaskPool(); 4419 auto tSlow2 = task!slowFun(); 4420 pool2.put(tSlow2); 4421 pool2.finish(true); // blocking 4422 assert(tSlow2.done); 4423 4424 // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero. 4425 auto pool3 = new TaskPool(0); 4426 auto tSlow3 = task!slowFun(); 4427 pool3.put(tSlow3); 4428 pool3.finish(true); // blocking 4429 assert(tSlow3.done); 4430 4431 // This is correct because no thread will terminate unless pool2.status 4432 // and pool3.status have already been set to stopNow. 4433 assert(pool2.status == TaskPool.PoolState.stopNow); 4434 assert(pool3.status == TaskPool.PoolState.stopNow); 4435 } 4436 4437 // Test default pool stuff. 4438 assert(taskPool.size == totalCPUs - 1); 4439 4440 nums = new uint[1000]; 4441 foreach (i; parallel(iota(1000))) 4442 { 4443 nums[i] = cast(uint) i; 4444 } 4445 assert(equal(nums, iota(1000))); 4446 4447 assert(equal( 4448 poolInstance.map!"a * a"(iota(3_000_001), 10_000), 4449 map!"a * a"(iota(3_000_001)) 4450 )); 4451 4452 // The filter is to kill random access and test the non-random access 4453 // branch. 4454 assert(equal( 4455 poolInstance.map!"a * a"( 4456 filter!"a == a"(iota(3_000_001) 4457 ), 10_000, 1000), 4458 map!"a * a"(iota(3_000_001)) 4459 )); 4460 4461 assert( 4462 reduce!"a + b"(0UL, 4463 poolInstance.map!"a * a"(iota(300_001), 10_000) 4464 ) == 4465 reduce!"a + b"(0UL, 4466 map!"a * a"(iota(300_001)) 4467 ) 4468 ); 4469 4470 assert(equal( 4471 iota(1_000_002), 4472 poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002))) 4473 )); 4474 4475 { 4476 import std.conv : to; 4477 import std.file : deleteme; 4478 4479 string temp_file = deleteme ~ "-tempDelMe.txt"; 4480 auto file = File(temp_file, "wb"); 4481 scope(exit) 4482 { 4483 file.close(); 4484 import std.file; 4485 remove(temp_file); 4486 } 4487 4488 auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]]; 4489 foreach (row; written) 4490 { 4491 file.writeln(join(to!(string[])(row), "\t")); 4492 } 4493 4494 file = File(temp_file); 4495 4496 void next(ref char[] buf) 4497 { 4498 file.readln(buf); 4499 import std.string : chomp; 4500 buf = chomp(buf); 4501 } 4502 4503 double[][] read; 4504 auto asyncReader = taskPool.asyncBuf(&next, &file.eof); 4505 4506 foreach (line; asyncReader) 4507 { 4508 if (line.length == 0) continue; 4509 auto ls = line.split("\t"); 4510 read ~= to!(double[])(ls); 4511 } 4512 4513 assert(read == written); 4514 file.close(); 4515 } 4516 4517 // Test Map/AsyncBuf chaining. 4518 4519 auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100); 4520 auto temp = poolInstance.map!sqrt( 4521 abuf, 100, 5 4522 ); 4523 auto lmchain = poolInstance.map!"a * a"(temp, 100, 5); 4524 lmchain.popFront(); 4525 4526 int ii; 4527 foreach ( elem; (lmchain)) 4528 { 4529 if (!isClose(elem, ii)) 4530 { 4531 stderr.writeln(ii, '\t', elem); 4532 } 4533 ii++; 4534 } 4535 4536 // Test buffer trick in parallel foreach. 4537 abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100); 4538 abuf.popFront(); 4539 auto bufTrickTest = new size_t[abuf.length]; 4540 foreach (i, elem; parallel(abuf)) 4541 { 4542 bufTrickTest[i] = i; 4543 } 4544 4545 assert(equal(iota(1_000_000), bufTrickTest)); 4546 4547 auto myTask = task!(abs)(-1); 4548 taskPool.put(myTask); 4549 assert(myTask.spinForce == 1); 4550 4551 // Test that worker local storage from one pool receives an index of 0 4552 // when the index is queried w.r.t. another pool. The only way to do this 4553 // is non-deterministically. 4554 foreach (i; parallel(iota(1000), 1)) 4555 { 4556 assert(poolInstance.workerIndex == 0); 4557 } 4558 4559 foreach (i; poolInstance.parallel(iota(1000), 1)) 4560 { 4561 assert(taskPool.workerIndex == 0); 4562 } 4563 4564 // Test exception handling. 4565 static void parallelForeachThrow() 4566 { 4567 foreach (elem; parallel(iota(10))) 4568 { 4569 throw new Exception(""); 4570 } 4571 } 4572 4573 assertThrown!Exception(parallelForeachThrow()); 4574 4575 static int reduceException(int a, int b) 4576 { 4577 throw new Exception(""); 4578 } 4579 4580 assertThrown!Exception( 4581 poolInstance.reduce!reduceException(iota(3)) 4582 ); 4583 4584 static int mapException(int a) 4585 { 4586 throw new Exception(""); 4587 } 4588 4589 assertThrown!Exception( 4590 poolInstance.amap!mapException(iota(3)) 4591 ); 4592 4593 static void mapThrow() 4594 { 4595 auto m = poolInstance.map!mapException(iota(3)); 4596 m.popFront(); 4597 } 4598 4599 assertThrown!Exception(mapThrow()); 4600 4601 struct ThrowingRange 4602 { 4603 @property int front() 4604 { 4605 return 1; 4606 } 4607 void popFront() 4608 { 4609 throw new Exception(""); 4610 } 4611 enum bool empty = false; 4612 } 4613 4614 assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init)); 4615 } 4616 4617 //version = parallelismStressTest; 4618 4619 // These are more like stress tests than real unit tests. They print out 4620 // tons of stuff and should not be run every time make unittest is run. 4621 version (parallelismStressTest) 4622 { 4623 @system unittest 4624 { 4625 import std.stdio : stderr, writeln, readln; 4626 import std.range : iota; 4627 import std.algorithm.iteration : filter, reduce; 4628 4629 size_t attempt; 4630 for (; attempt < 10; attempt++) 4631 foreach (poolSize; [0, 4]) 4632 { 4633 4634 poolInstance = new TaskPool(poolSize); 4635 4636 uint[] numbers = new uint[1_000]; 4637 4638 foreach (i; poolInstance.parallel( iota(0, numbers.length)) ) 4639 { 4640 numbers[i] = cast(uint) i; 4641 } 4642 4643 // Make sure it works. 4644 foreach (i; 0 .. numbers.length) 4645 { 4646 assert(numbers[i] == i); 4647 } 4648 4649 stderr.writeln("Done creating nums."); 4650 4651 4652 auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000)); 4653 foreach (num; poolInstance.parallel(myNumbers)) 4654 { 4655 assert(num % 7 > 0 && num < 1000); 4656 } 4657 stderr.writeln("Done modulus test."); 4658 4659 uint[] squares = poolInstance.amap!"a * a"(numbers, 100); 4660 assert(squares.length == numbers.length); 4661 foreach (i, number; numbers) 4662 { 4663 assert(squares[i] == number * number); 4664 } 4665 stderr.writeln("Done squares."); 4666 4667 auto sumFuture = task!( reduce!"a + b" )(numbers); 4668 poolInstance.put(sumFuture); 4669 4670 ulong sumSquares = 0; 4671 foreach (elem; numbers) 4672 { 4673 sumSquares += elem * elem; 4674 } 4675 4676 uint mySum = sumFuture.spinForce(); 4677 assert(mySum == 999 * 1000 / 2); 4678 4679 auto mySumParallel = poolInstance.reduce!"a + b"(numbers); 4680 assert(mySum == mySumParallel); 4681 stderr.writeln("Done sums."); 4682 4683 auto myTask = task( 4684 { 4685 synchronized writeln("Our lives are parallel...Our lives are parallel."); 4686 }); 4687 poolInstance.put(myTask); 4688 4689 auto nestedOuter = "abcd"; 4690 auto nestedInner = iota(0, 10, 2); 4691 4692 foreach (i, letter; poolInstance.parallel(nestedOuter, 1)) 4693 { 4694 foreach (j, number; poolInstance.parallel(nestedInner, 1)) 4695 { 4696 synchronized writeln(i, ": ", letter, " ", j, ": ", number); 4697 } 4698 } 4699 4700 poolInstance.stop(); 4701 } 4702 4703 assert(attempt == 10); 4704 writeln("Press enter to go to next round of unittests."); 4705 readln(); 4706 } 4707 4708 // These unittests are intended more for actual testing and not so much 4709 // as examples. 4710 @system unittest 4711 { 4712 import std.stdio : stderr; 4713 import std.range : iota; 4714 import std.algorithm.iteration : filter, reduce; 4715 import std.math.algebraic : sqrt; 4716 import std.math.operations : isClose; 4717 import std.math.traits : isNaN; 4718 import std.conv : text; 4719 4720 foreach (attempt; 0 .. 10) 4721 foreach (poolSize; [0, 4]) 4722 { 4723 poolInstance = new TaskPool(poolSize); 4724 4725 // Test indexing. 4726 stderr.writeln("Creator Raw Index: ", poolInstance.threadIndex); 4727 assert(poolInstance.workerIndex() == 0); 4728 4729 // Test worker-local storage. 4730 auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1); 4731 foreach (i; poolInstance.parallel(iota(0U, 1_000_000))) 4732 { 4733 workerLocalStorage.get++; 4734 } 4735 assert(reduce!"a + b"(workerLocalStorage.toRange) == 4736 1_000_000 + poolInstance.size + 1); 4737 4738 // Make sure work is reasonably balanced among threads. This test is 4739 // non-deterministic and is more of a sanity check than something that 4740 // has an absolute pass/fail. 4741 shared(uint)[void*] nJobsByThread; 4742 foreach (thread; poolInstance.pool) 4743 { 4744 nJobsByThread[cast(void*) thread] = 0; 4745 } 4746 nJobsByThread[ cast(void*) Thread.getThis()] = 0; 4747 4748 foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 )) 4749 { 4750 atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1); 4751 } 4752 4753 stderr.writeln("\nCurrent thread is: ", 4754 cast(void*) Thread.getThis()); 4755 stderr.writeln("Workload distribution: "); 4756 foreach (k, v; nJobsByThread) 4757 { 4758 stderr.writeln(k, '\t', v); 4759 } 4760 4761 // Test whether amap can be nested. 4762 real[][] matrix = new real[][](1000, 1000); 4763 foreach (i; poolInstance.parallel( iota(0, matrix.length) )) 4764 { 4765 foreach (j; poolInstance.parallel( iota(0, matrix[0].length) )) 4766 { 4767 matrix[i][j] = i * j; 4768 } 4769 } 4770 4771 // Get around weird bugs having to do w/ sqrt being an intrinsic: 4772 static real mySqrt(real num) 4773 { 4774 return sqrt(num); 4775 } 4776 4777 static real[] parallelSqrt(real[] nums) 4778 { 4779 return poolInstance.amap!mySqrt(nums); 4780 } 4781 4782 real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix); 4783 4784 foreach (i, row; sqrtMatrix) 4785 { 4786 foreach (j, elem; row) 4787 { 4788 real shouldBe = sqrt( cast(real) i * j); 4789 assert(isClose(shouldBe, elem)); 4790 sqrtMatrix[i][j] = shouldBe; 4791 } 4792 } 4793 4794 auto saySuccess = task( 4795 { 4796 stderr.writeln( 4797 "Success doing matrix stuff that involves nested pool use."); 4798 }); 4799 poolInstance.put(saySuccess); 4800 saySuccess.workForce(); 4801 4802 // A more thorough test of amap, reduce: Find the sum of the square roots of 4803 // matrix. 4804 4805 static real parallelSum(real[] input) 4806 { 4807 return poolInstance.reduce!"a + b"(input); 4808 } 4809 4810 auto sumSqrt = poolInstance.reduce!"a + b"( 4811 poolInstance.amap!parallelSum( 4812 sqrtMatrix 4813 ) 4814 ); 4815 4816 assert(isClose(sumSqrt, 4.437e8, 1e-2)); 4817 stderr.writeln("Done sum of square roots."); 4818 4819 // Test whether tasks work with function pointers. 4820 /+ // This part is buggy and needs to be fixed... 4821 auto nanTask = task(&isNaN, 1.0L); 4822 poolInstance.put(nanTask); 4823 assert(nanTask.spinForce == false); 4824 4825 if (poolInstance.size > 0) 4826 { 4827 // Test work waiting. 4828 static void uselessFun() 4829 { 4830 foreach (i; 0 .. 1_000_000) {} 4831 } 4832 4833 auto uselessTasks = new typeof(task(&uselessFun))[1000]; 4834 foreach (ref uselessTask; uselessTasks) 4835 { 4836 uselessTask = task(&uselessFun); 4837 } 4838 foreach (ref uselessTask; uselessTasks) 4839 { 4840 poolInstance.put(uselessTask); 4841 } 4842 foreach (ref uselessTask; uselessTasks) 4843 { 4844 uselessTask.workForce(); 4845 } 4846 } 4847 +/ 4848 4849 // Test the case of non-random access + ref returns. 4850 int[] nums = [1,2,3,4,5]; 4851 static struct RemoveRandom 4852 { 4853 int[] arr; 4854 4855 ref int front() 4856 { 4857 return arr.front; 4858 } 4859 void popFront() 4860 { 4861 arr.popFront(); 4862 } 4863 bool empty() 4864 { 4865 return arr.empty; 4866 } 4867 } 4868 4869 auto refRange = RemoveRandom(nums); 4870 foreach (ref elem; poolInstance.parallel(refRange)) 4871 { 4872 elem++; 4873 } 4874 assert(nums == [2,3,4,5,6], text(nums)); 4875 stderr.writeln("Nums: ", nums); 4876 4877 poolInstance.stop(); 4878 } 4879 } 4880 } 4881 4882 @system unittest 4883 { 4884 static struct __S_12733 4885 { 4886 invariant() { assert(checksum == 1_234_567_890); } 4887 this(ulong u){n = u;} 4888 void opAssign(__S_12733 s){this.n = s.n;} 4889 ulong n; 4890 ulong checksum = 1_234_567_890; 4891 } 4892 4893 static auto __genPair_12733(ulong n) { return __S_12733(n); } 4894 immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ]; 4895 4896 auto result = taskPool.amap!__genPair_12733(data); 4897 } 4898 4899 @safe unittest 4900 { 4901 import std.range : iota; 4902 4903 // this test was in std.range, but caused cycles. 4904 assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} })); 4905 } 4906 4907 @safe unittest 4908 { 4909 import std.algorithm.iteration : each; 4910 4911 long[] arr; 4912 static assert(is(typeof({ 4913 arr.parallel.each!"a++"; 4914 }))); 4915 } 4916 4917 // https://issues.dlang.org/show_bug.cgi?id=17539 4918 @system unittest 4919 { 4920 import std.random : rndGen; 4921 // ensure compilation 4922 try foreach (rnd; rndGen.parallel) break; 4923 catch (ParallelForeachError e) {} 4924 }