/**
`std.parallelism` implements high-level primitives for SMP parallelism.
These include parallel foreach, parallel reduce, parallel eager map, pipelining
and future/promise parallelism. `std.parallelism` is recommended when the
same operation is to be executed in parallel on different data, or when a
function is to be executed in a background thread and its result returned to a
well-defined main thread. For communication between arbitrary threads, see
`std.concurrency`.

`std.parallelism` is based on the concept of a `Task`. A `Task` is an
object that represents the fundamental unit of work in this library and may be
executed in parallel with any other `Task`. Using `Task`
directly allows programming with a future/promise paradigm. All other
supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
represent an additional level of abstraction over `Task`. They
automatically create one or more `Task` objects, or closely related types
that are conceptually identical but not part of the public API.

After creation, a `Task` may be executed in a new thread, or submitted
to a `TaskPool` for execution. A `TaskPool` encapsulates a task queue
and its worker threads. Its purpose is to efficiently map a large
number of `Task`s onto a smaller number of threads. A task queue is a
FIFO queue of `Task` objects that have been submitted to the
`TaskPool` and are awaiting execution. A worker thread is a thread that
is associated with exactly one task queue. It executes the `Task` at the
front of its queue when the queue has work available, or sleeps when
no work is available. Each task queue is associated with zero or
more worker threads. If the result of a `Task` is needed before execution
by a worker thread has begun, the `Task` can be removed from the task queue
and executed immediately in the thread where the result is needed.

Warning: Unless marked as `@trusted` or `@safe`, artifacts in
this module allow implicit data sharing between threads and cannot
guarantee that client code is free from low level data races.

Source: $(PHOBOSSRC std/parallelism.d)
Author: David Simcha
Copyright: Copyright (c) 2009-2011, David Simcha.
License: $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
*/
module std.parallelism;

version (WebAssembly) {} else:

version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;
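
// Illustrative sketch (not from the original examples): the simplest use of
// this module is a parallel foreach over a range via the global taskPool.
// Each worker thread processes a chunk of the array; the loop body must be
// independent for each element.
@system unittest
{
    import std.parallelism : taskPool;

    auto squares = new long[1_000];
    foreach (i, ref elem; taskPool.parallel(squares))
    {
        elem = cast(long)(i * i);
    }
    assert(squares[10] == 100);
}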

///
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math.operations : isClose;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    enum n = 1_000_000;
    enum delta = 1.0 / n;

    alias getTerm = (int i)
    {
        immutable x = (i - 0.5) * delta;
        return delta / (1.0 + x * x);
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.isClose(3.14159, 1e-5));
}

import core.atomic;
import core.memory;
import core.sync.condition;
import core.thread;

import std.functional;
import std.meta;
import std.range.primitives;
import std.traits;

/*
(For now public undocumented with reserved name.)

A lazily initialized global constant. The underlying value is a shared global
statically initialized to `outOfBandValue` which must not be a legit value of
the constant. Upon the first call the situation is detected and the global is
initialized by calling `initializer`. The initializer is assumed to be pure
(even if not marked as such), i.e. return the same value upon repeated calls.
For that reason, no special precautions are taken, so `initializer` may be
called more than once, leading to benign races on the cached value.

In the quiescent state the cost of the function is an atomic load from a global.

Params:
    T = The type of the pseudo-constant (may be qualified)
    outOfBandValue = A value that cannot be valid, it is used for initialization
    initializer = The function performing initialization; must be `nothrow`

Returns:
    The lazily initialized value
*/
@property pure
T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
    && is(typeof(initializer()) : T)
    && is(typeof(outOfBandValue) : T))
{
    static T impl() nothrow
    {
        // Thread-local cache
        static Unqual!T tls = outOfBandValue;
        auto local = tls;
        // Shortest path, no atomic operations
        if (local != outOfBandValue) return local;
        // Process-level cache
        static shared Unqual!T result = outOfBandValue;
        // Initialize both process-level cache and tls
        local = atomicLoad(result);
        if (local == outOfBandValue)
        {
            local = initializer();
            atomicStore(result, local);
        }
        tls = local;
        return local;
    }

    import std.traits : SetFunctionAttributes;
    alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
        functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
    auto purified = (() @trusted => cast(Fun) &impl)();
    return purified();
}

// Returns the size of a cache line.
alias cacheLineSize =
    __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);

private size_t cacheLineSizeImpl() @nogc nothrow @trusted
{
    size_t result = 0;
    import core.cpuid : datacache;
    foreach (ref const cachelevel; datacache)
    {
        if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max)
        {
            result = cachelevel.lineSize;
        }
    }
    return result;
}

@nogc @safe nothrow unittest
{
    assert(cacheLineSize == cacheLineSizeImpl);
}
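
// Illustrative sketch (assumed usage, not from the original sources): a
// typical consumer of cacheLineSize rounds a payload size up to a whole
// number of cache lines, e.g. to pad per-thread counters against false
// sharing. The 64-byte fallback is an assumption for CPUs that report no
// line size.
@nogc @safe nothrow unittest
{
    immutable line = cacheLineSize ? cacheLineSize : 64;
    immutable paddedSize = ((int.sizeof + line - 1) / line) * line;
    assert(paddedSize % line == 0 && paddedSize >= int.sizeof);
}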

/* Atomics code. These forward to core.atomic, but are written like this
for two reasons:

1. They used to actually contain ASM code and I don't want to have to change
   to directly calling core.atomic in a zillion different places.

2. core.atomic has some misc. issues that make my use cases difficult
   without wrapping it. If I didn't wrap it, casts would be required
   basically everywhere.
*/
private void atomicSetUbyte(T)(ref T stuff, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    //core.atomic.cas(cast(shared) &stuff, stuff, newVal);
    atomicStore(*(cast(shared) &stuff), newVal);
}

private ubyte atomicReadUbyte(T)(ref T val)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return atomicLoad(*(cast(shared) &val));
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
if (__traits(isIntegral, T) && is(T : ubyte))
{
    return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
}

/*--------------------- Generic helper functions, etc.------------------------*/
private template MapType(R, functions...)
{
    static assert(functions.length);

    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}

private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}

private template noUnsharedAliasing(T)
{
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}

// This template tests whether a function may be executed in parallel from
// @safe code via Task.executeInNewThread(). There is an additional
// requirement for executing it via a TaskPool. (See isSafeReturn).
private template isSafeTask(F)
{
    enum bool isSafeTask =
        (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
        (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
        (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
        allSatisfy!(noUnsharedAliasing, Parameters!F);
}

@safe unittest
{
    alias F1 = void function() @safe;
    alias F2 = void function();
    alias F3 = void function(uint, string) @trusted;
    alias F4 = void function(uint, char[]);

    static assert( isSafeTask!F1);
    static assert(!isSafeTask!F2);
    static assert( isSafeTask!F3);
    static assert(!isSafeTask!F4);

    alias F5 = uint[] function(uint, string) pure @trusted;
    static assert( isSafeTask!F5);
}

// This function decides whether Tasks that meet all of the other requirements
// for being executed from @safe code can be executed on a TaskPool.
// When executing via TaskPool, it's theoretically possible
// to return a value that is also pointed to by a worker thread's thread local
// storage. When executing from executeInNewThread(), the thread that executed
// the Task is terminated by the time the return value is visible in the calling
// thread, so this is a non-issue. It's also a non-issue for pure functions
// since they can't read global state.
private template isSafeReturn(T)
{
    static if (!hasUnsharedAliasing!(T.ReturnType))
    {
        enum isSafeReturn = true;
    }
    else static if (T.isPure)
    {
        enum isSafeReturn = true;
    }
    else
    {
        enum isSafeReturn = false;
    }
}

private template randAssignable(R)
{
    enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
}

private enum TaskStatus : ubyte
{
    notStarted,
    inProgress,
    done
}

private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}

// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs);
            }

            return lhs;
        }
    }
}

private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
            }

            return lhs;
        }
    }
}
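
// Illustrative sketch (assumed semantics, not from the original sources):
// with more than one function, reduceAdjoin folds a new element into every
// field of a running Tuple of partial results at once, applying the matching
// binary function to each field.
@safe unittest
{
    import std.typecons : tuple;

    alias step = reduceAdjoin!("a + b", "a * b");
    auto acc = step(tuple(1, 2), 3); // sum field: 1 + 3; product field: 2 * 3
    assert(acc[0] == 4 && acc[1] == 6);
}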

private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}

@safe unittest
{
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}

// This is the base "class" for all of the other tasks. Using C-style
// polymorphism to allow more direct control over memory allocation, etc.
private struct AbstractTask
{
    AbstractTask* prev;
    AbstractTask* next;

    // Pointer to a function that executes this task.
    void function(void*) runTask;

    Throwable exception;
    ubyte taskStatus = TaskStatus.notStarted;

    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) == TaskStatus.done)
        {
            if (exception)
            {
                throw exception;
            }

            return true;
        }

        return false;
    }

    void job()
    {
        runTask(&this);
    }
}

/**
`Task` represents the fundamental unit of work. A `Task` may be
executed in parallel with any other `Task`. Using this struct directly
allows future/promise parallelism. In this paradigm, a function (or delegate
or other callable) is executed in a thread other than the one it was called
from. The calling thread does not block while the function is being executed.
A call to `workForce`, `yieldForce`, or `spinForce` is used to
ensure that the `Task` has finished executing and to obtain the return
value, if any. These functions and `done` also act as full memory barriers,
meaning that any memory writes made in the thread that executed the `Task`
are guaranteed to be visible in the calling thread after one of these functions
returns.

The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
be used to create an instance of this struct. See `task` for usage examples.

Function results are returned from `yieldForce`, `spinForce` and
`workForce` by ref. If `fun` returns by ref, the reference will point
to the returned reference of `fun`. Otherwise it will point to a
field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics.
If you want to pass this struct around, you should do so by reference or
pointer.

Bugs: Changes to `ref` and `out` arguments are not propagated to the
call site, only to `args` in this struct.
*/
struct Task(alias fun, Args...)
{
    @(imported!"core.attribute".mutableRefInit) private AbstractTask base = {runTask : &impl};
    private alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(&(fun(myCastedTask._args)))))
        {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    Args _args;

    /**
    The arguments the function was called with. Changes to `out` and
    `ref` arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code. See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
                (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
        }
        else
        {
            // BUG: Should check this for delegates too, but std.traits
            // apparently doesn't allow this. isPure is irrelevant
            // for delegates, at least for now since shared delegates
            // don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this `Task`. This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
    // allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs);
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    busy spin until it's done, then return the return value. If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    `Task` to be available on a timescale shorter than that of an OS
    context switch.
    */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        this.pool.tryDeleteExecute(basePtr);

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any. If it's in progress,
    wait on a condition variable. If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
    */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this `Task` was not started yet, execute it in the current
    thread. If it is finished, return its result. If it is in progress,
    execute any other `Task` from the `TaskPool` instance that
    this `Task` was submitted to until this one
    is finished. If it threw an exception, rethrow that exception.
    If no other tasks are available or this `Task` was executed using
    `executeInNewThread`, wait on a condition variable.
    */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done) // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                return yieldForce;
            }
        }
    }

    /**
    Returns `true` if the `Task` is finished executing.

    Throws: Rethrows any exception thrown during the execution of the
    `Task`.
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this `Task`, execute it in the
    newly created thread, then terminate the thread. This can be used for
    future/promise parallelism. An explicit priority may be given
    to the `Task`. If one is provided, its value is forwarded to
    `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
    usage example.
    */
    void executeInNewThread() @trusted
    {
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}

// Calls `fpOrDelegate` with `args`. This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}

/**
Creates a `Task` on the GC heap that calls an alias. This may be executed
via `Task.executeInNewThread` or by submitting to a
$(REF TaskPool, std,parallelism). A globally accessible instance of
`TaskPool` is provided by $(REF taskPool, std,parallelism).

Returns: A pointer to the `Task`.

Example:
---
// Read two files into memory at the same time.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

---
// Sorts an array using a parallel quick sort algorithm.
// The first partition is done serially. Both recursion
// branches are then executed in parallel.
//
// Timings for sorting an array of 1,000,000 doubles on
// an Athlon 64 X2 dual core machine:
//
// This implementation:              176 milliseconds.
// Equivalent serial implementation: 280 milliseconds
void parallelSort(T)(T[] data)
{
    // Sort small subarrays serially.
    if (data.length < 100)
    {
        std.algorithm.sort(data);
        return;
    }

    // Partition the array.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Execute both recursion branches in parallel.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}
---
*/
auto task(alias fun, Args...)(Args args)
{
    return new Task!(fun, Args)(args);
}
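
// Illustrative sketch (assumed usage, not from the original examples):
// create a Task from an alias, submit it to the global pool, and fetch
// the result with yieldForce.
@system unittest
{
    static int addOne(int x) { return x + 1; }

    auto t = task!addOne(41);
    taskPool.put(t);
    assert(t.yieldForce == 42);
}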

/**
Creates a `Task` on the GC heap that calls a function pointer, delegate, or
class/struct with overloaded opCall.

Example:
---
// Read two files in at the same time again,
// but this time use a function pointer instead
// of an alias to represent std.file.read.
import std.file;

void main()
{
    // Create and execute a Task for reading
    // foo.txt.
    auto file1Task = task(&read!string, "foo.txt", size_t.max);
    file1Task.executeInNewThread();

    // Read bar.txt in parallel.
    auto file2Data = read("bar.txt");

    // Get the results of reading foo.txt.
    auto file1Data = file1Task.yieldForce;
}
---

Notes: This function takes a non-scope delegate, meaning it can be
used with closures. If you can't allocate a closure due to objects
on the stack that have scoped destruction, see `scopedTask`, which
takes a scope delegate.
*/
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    return new Task!(run, F, Args)(delegateOrFp, args);
}

/**
Version of `task` usable from `@safe` code. Usage mechanics are
identical to the non-@safe case, but safety introduces some restrictions:

1. `fun` must be @safe or @trusted.

2. `F` must not have any unshared aliasing as defined by
   $(REF hasUnsharedAliasing, std,traits). This means it
   may not be an unshared delegate or a non-shared class or struct
   with overloaded `opCall`. This also precludes accepting template
   alias parameters.

3. `Args` must not have unshared aliasing.

4. `fun` must not return by reference.

5. The return type must not have unshared aliasing unless `fun` is
   `pure` or the `Task` is executed via `executeInNewThread` instead
   of using a `TaskPool`.

*/
@trusted auto task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    return new Task!(run, F, Args)(fun, args);
}

/**
These functions allow the creation of `Task` objects on the stack rather
than the GC heap. The lifetime of a `Task` created by `scopedTask`
cannot exceed the lifetime of the scope it was created in.

`scopedTask` might be preferred over `task`:

1. When a `Task` that calls a delegate is being created and a closure
   cannot be allocated due to objects on the stack that have scoped
   destruction. The delegate overload of `scopedTask` takes a `scope`
   delegate.

2. As a micro-optimization, to avoid the heap allocation associated with
   `task` or with the creation of a closure.

Usage is otherwise identical to `task`.

Notes: `Task` objects created using `scopedTask` will automatically
call `Task.yieldForce` in their destructor if necessary to ensure
the `Task` is complete before the stack frame they reside on is destroyed.
*/
auto scopedTask(alias fun, Args...)(Args args)
{
    auto ret = Task!(fun, Args)(args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(delegateOrFp, args);
    ret.isScoped = true;
    return ret;
}

/// Ditto
@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F)
{
    auto ret = Task!(run, F, Args)(fun, args);
    ret.isScoped = true;
    return ret;
}
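
// Illustrative sketch (assumed usage, not from the original examples): a
// Task on the stack; its destructor would force completion before the
// enclosing scope exits, though here the result is fetched explicitly.
@system unittest
{
    static int triple(int x) { return 3 * x; }

    auto t = scopedTask!triple(14);
    taskPool.put(t);
    assert(t.yieldForce == 42);
}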

/**
The total number of CPU cores available on the current machine, as reported by
the operating system.
*/
alias totalCPUs =
    __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);

uint totalCPUsImpl() @nogc nothrow @trusted
{
    version (Windows)
    {
        // BUGS: Only works on Windows 2000 and above.
        import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
        import std.algorithm.comparison : max;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        return max(1, cast(uint) si.dwNumberOfProcessors);
    }
    else version (linux)
    {
        import core.stdc.stdlib : calloc;
        import core.stdc.string : memset;
        import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity;
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        int count = 0;

        /**
         * According to ruby's source code, CPU_ALLOC() doesn't work as expected.
         * see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242
         *
         * The hardcoded number also comes from ruby's source code.
         * see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4
         */
        for (int n = 64; n <= 16384; n *= 2)
        {
            size_t size = CPU_ALLOC_SIZE(count);
            if (size >= 0x400)
            {
                auto cpuset = cast(cpu_set_t*) calloc(1, size);
                if (cpuset is null) break;
                if (sched_getaffinity(0, size, cpuset) == 0)
                {
                    count = CPU_COUNT_S(size, cpuset);
                }
                CPU_FREE(cpuset);
            }
            else
            {
                cpu_set_t cpuset;
                if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0)
                {
                    count = CPU_COUNT(&cpuset);
                }
            }

            if (count > 0)
                return cast(uint) count;
        }

        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Darwin)
    {
        import core.sys.darwin.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.physicalcpu", &result, &len, null, 0);
        return result;
    }
    else version (DragonFlyBSD)
    {
        import core.sys.dragonflybsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (FreeBSD)
    {
        import core.sys.freebsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (NetBSD)
    {
        import core.sys.netbsd.sys.sysctl : sysctlbyname;
        uint result;
        size_t len = result.sizeof;
        sysctlbyname("hw.ncpu", &result, &len, null, 0);
        return result;
    }
    else version (Solaris)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (OpenBSD)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version (Hurd)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else
    {
        static assert(0, "Don't know how to get N CPUs on this OS.");
    }
}

/*
This class serves two purposes:

1. It distinguishes std.parallelism threads from other threads so that
   the std.parallelism daemon threads can be terminated.

2. It adds a reference to the pool that the thread is a member of,
   which is also necessary to allow the daemon threads to be properly
   terminated.
*/
private final class ParallelismThread : Thread
{
    this(void delegate() dg)
    {
        super(dg);
    }

    TaskPool pool;
}
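
// Illustrative sketch (assumed usage, not from the original examples):
// totalCPUs is the usual input for sizing an explicitly constructed pool,
// leaving one core for the submitting thread.
@system unittest
{
    auto pool = new TaskPool(totalCPUs <= 1 ? 0 : totalCPUs - 1);
    scope(exit) pool.finish();
    assert(pool.size == (totalCPUs <= 1 ? 0 : totalCPUs - 1));
}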

// Kill daemon threads.
shared static ~this()
{
    foreach (ref thread; Thread)
    {
        auto pthread = cast(ParallelismThread) thread;
        if (pthread is null) continue;
        auto pool = pthread.pool;
        if (!pool.isDaemon) continue;
        pool.stop();
        pthread.join();
    }
}

/**
This class encapsulates a task queue and a set of worker threads. Its purpose
is to efficiently map a large number of `Task`s onto a smaller number of
threads. A task queue is a FIFO queue of `Task` objects that have been
submitted to the `TaskPool` and are awaiting execution. A worker thread is a
thread that executes the `Task` at the front of the queue when one is
available and sleeps when the queue is empty.

This class should usually be used via the global instantiation
available via the $(REF taskPool, std,parallelism) property.
Occasionally it is useful to explicitly instantiate a `TaskPool`:

1. When you want `TaskPool` instances with multiple priorities, for example
   a low priority pool and a high priority pool.

2. When the threads in the global task pool are waiting on a synchronization
   primitive (for example a mutex), and you want to parallelize the code that
   needs to run before these threads can be resumed.

Note: The worker threads in this pool will not stop until
`stop` or `finish` is called, even if the main thread
has finished already. This may lead to programs that
never end. If you do not want this behaviour, you can set `isDaemon`
to true.
*/
final class TaskPool
{
private:

    // A pool can either be a regular pool or a single-task pool. A
    // single-task pool is a dummy pool that's fired up for
    // Task.executeInNewThread().
    bool isSingleTask;

    ParallelismThread[] pool;
    Thread singleTaskThread;

    AbstractTask* head;
    AbstractTask* tail;
    PoolState status = PoolState.running;
    Condition workerCondition;
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex;  // For waiterCondition

    // The instanceStartIndex of the next instance that will be created.
    __gshared size_t nextInstanceIndex = 1;

    // The index of the current thread.
    static size_t threadIndex;

    // The index of the first thread in this instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;

    enum PoolState : ubyte
    {
        running,
        finishing,
        stopNow
    }

    void doJob(AbstractTask* job)
    {
        assert(job.taskStatus == TaskStatus.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        scope(exit)
        {
            if (!isSingleTask)
            {
                waiterLock();
                scope(exit) waiterUnlock();
                notifyWaiters();
            }
        }

        try
        {
            job.job();
        }
        catch (Throwable e)
        {
            job.exception = e;
        }

        atomicSetUbyte(job.taskStatus, TaskStatus.done);
    }

    // This function is used for dummy pools created by Task.executeInNewThread().
    void doSingleTask()
    {
        // No synchronization. Pool is guaranteed to only have one thread,
        // and the queue is submitted to before this thread is created.
        assert(head);
        auto t = head;
        t.next = t.prev = head = null;
        doJob(t);
    }

    // This function performs initialization for each thread that affects
    // thread local storage and therefore must be done from within the
    // worker thread. It then calls executeWorkLoop().
    void startWorkLoop()
    {
        // Initialize thread index.
        {
            queueLock();
            scope(exit) queueUnlock();
            threadIndex = nextThreadIndex;
            nextThreadIndex++;
        }

        executeWorkLoop();
    }

    // This is the main work loop that worker threads spend their time in
    // until they terminate. It's also entered by non-worker threads when
    // finish() is called with the blocking variable set to true.
    void executeWorkLoop()
    {
        while (atomicReadUbyte(status) != PoolState.stopNow)
        {
            AbstractTask* task = pop();
            if (task is null)
            {
                if (atomicReadUbyte(status) == PoolState.finishing)
                {
                    atomicSetUbyte(status, PoolState.stopNow);
                    return;
                }
            }
            else
            {
                doJob(task);
            }
        }
    }

    // Pop a task off the queue.
    AbstractTask* pop()
    {
        queueLock();
        scope(exit) queueUnlock();
        auto ret = popNoSync();
        while (ret is null && status == PoolState.running)
        {
            wait();
            ret = popNoSync();
        }
        return ret;
    }

    AbstractTask* popNoSync()
    out(returned)
    {
        /* If task.prev and task.next aren't null, then another thread
         * can try to delete this task from the pool after it's
         * already been deleted/popped.
         */
        if (returned !is null)
        {
            assert(returned.next is null);
            assert(returned.prev is null);
        }
    }
    do
    {
        if (isSingleTask) return null;

        AbstractTask* returned = head;
        if (head !is null)
        {
            head = head.next;
            returned.prev = null;
            returned.next = null;
            returned.taskStatus = TaskStatus.inProgress;
        }
        if (head !is null)
        {
            head.prev = null;
        }

        return returned;
    }

    // Push a task onto the queue.
    void abstractPut(AbstractTask* task)
    {
        queueLock();
        scope(exit) queueUnlock();
        abstractPutNoSync(task);
    }

    void abstractPutNoSync(AbstractTask* task)
    in
    {
        assert(task);
    }
    out
    {
        import std.conv : text;

        assert(tail.prev !is tail);
        assert(tail.next is null, text(tail.prev, '\t', tail.next));
        if (tail.prev !is null)
        {
            assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
        }
    }
    do
    {
        // Not using enforce() to save on function call overhead since this
        // is a performance critical function.
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        task.next = null;
        if (head is null)   //Queue is empty.
        {
            head = task;
            tail = task;
            tail.prev = null;
        }
        else
        {
            assert(tail);
            task.prev = tail;
            tail.next = task;
            tail = task;
        }
        notify();
    }

    void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
    {
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
1338 ); 1339 } 1340 1341 if (head is null) 1342 { 1343 head = h; 1344 tail = t; 1345 } 1346 else 1347 { 1348 h.prev = tail; 1349 tail.next = h; 1350 tail = t; 1351 } 1352 1353 notifyAll(); 1354 } 1355 1356 void tryDeleteExecute(AbstractTask* toExecute) 1357 { 1358 if (isSingleTask) return; 1359 1360 if ( !deleteItem(toExecute) ) 1361 { 1362 return; 1363 } 1364 1365 try 1366 { 1367 toExecute.job(); 1368 } 1369 catch (Exception e) 1370 { 1371 toExecute.exception = e; 1372 } 1373 1374 atomicSetUbyte(toExecute.taskStatus, TaskStatus.done); 1375 } 1376 1377 bool deleteItem(AbstractTask* item) 1378 { 1379 queueLock(); 1380 scope(exit) queueUnlock(); 1381 return deleteItemNoSync(item); 1382 } 1383 1384 bool deleteItemNoSync(AbstractTask* item) 1385 { 1386 if (item.taskStatus != TaskStatus.notStarted) 1387 { 1388 return false; 1389 } 1390 item.taskStatus = TaskStatus.inProgress; 1391 1392 if (item is head) 1393 { 1394 // Make sure head gets set properly. 1395 popNoSync(); 1396 return true; 1397 } 1398 if (item is tail) 1399 { 1400 tail = tail.prev; 1401 if (tail !is null) 1402 { 1403 tail.next = null; 1404 } 1405 item.next = null; 1406 item.prev = null; 1407 return true; 1408 } 1409 if (item.next !is null) 1410 { 1411 assert(item.next.prev is item); // Check queue consistency. 1412 item.next.prev = item.prev; 1413 } 1414 if (item.prev !is null) 1415 { 1416 assert(item.prev.next is item); // Check queue consistency. 1417 item.prev.next = item.next; 1418 } 1419 item.next = null; 1420 item.prev = null; 1421 return true; 1422 } 1423 1424 void queueLock() 1425 { 1426 assert(queueMutex); 1427 if (!isSingleTask) queueMutex.lock(); 1428 } 1429 1430 void queueUnlock() 1431 { 1432 assert(queueMutex); 1433 if (!isSingleTask) queueMutex.unlock(); 1434 } 1435 1436 void waiterLock() 1437 { 1438 if (!isSingleTask) waiterMutex.lock(); 1439 } 1440 1441 void waiterUnlock() 1442 { 1443 if (!isSingleTask) waiterMutex.unlock(); 1444 } 1445 1446 void wait() 1447 { 1448 if (!isSingleTask) workerCondition.wait(); 1449 } 1450 1451 void notify() 1452 { 1453 if (!isSingleTask) workerCondition.notify(); 1454 } 1455 1456 void notifyAll() 1457 { 1458 if (!isSingleTask) workerCondition.notifyAll(); 1459 } 1460 1461 void waitUntilCompletion() 1462 { 1463 if (isSingleTask) 1464 { 1465 singleTaskThread.join(); 1466 } 1467 else 1468 { 1469 waiterCondition.wait(); 1470 } 1471 } 1472 1473 void notifyWaiters() 1474 { 1475 if (!isSingleTask) waiterCondition.notifyAll(); 1476 } 1477 1478 // Private constructor for creating dummy pools that only have one thread, 1479 // only execute one Task, and then terminate. This is used for 1480 // Task.executeInNewThread(). 1481 this(AbstractTask* task, int priority = int.max) 1482 { 1483 assert(task); 1484 1485 // Dummy value, not used. 1486 instanceStartIndex = 0; 1487 1488 this.isSingleTask = true; 1489 task.taskStatus = TaskStatus.inProgress; 1490 this.head = task; 1491 singleTaskThread = new Thread(&doSingleTask); 1492 singleTaskThread.start(); 1493 1494 // Disabled until writing code to support 1495 // running thread with specified priority 1496 // See https://issues.dlang.org/show_bug.cgi?id=8960 1497 1498 /*if (priority != int.max) 1499 { 1500 singleTaskThread.priority = priority; 1501 }*/ 1502 } 1503 1504 public: 1505 // This is used in parallel_algorithm but is too unstable to document 1506 // as public API. 
    size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow
    {
        import std.algorithm.comparison : max;

        if (this.size == 0)
        {
            return max(rangeLen, 1);
        }

        immutable size_t eightSize = 4 * (this.size + 1);
        auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1);
        return max(ret, 1);
    }

    /**
    Default constructor that initializes a `TaskPool` with
    `totalCPUs` - 1 worker threads. The minus 1 is included because the
    main thread will also be available to do work.

    Note: On single-core machines, the primitives provided by `TaskPool`
    operate transparently in single-threaded mode.
    */
    this() @trusted
    {
        this(totalCPUs - 1);
    }

    /**
    Allows for custom number of worker threads.
    */
    this(size_t nWorkers) @trusted
    {
        synchronized(typeid(TaskPool))
        {
            instanceStartIndex = nextInstanceIndex;

            // The first worker thread to be initialized will have this index,
            // and will increment it. The second worker to be initialized will
            // have this index plus 1.
            nextThreadIndex = instanceStartIndex;
            nextInstanceIndex += nWorkers;
        }

        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);

        pool = new ParallelismThread[nWorkers];
        foreach (ref poolThread; pool)
        {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.start();
        }
    }
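
    // Illustrative sketch (assumed usage, not from the original examples):
    // an explicit pool with a custom worker count, independent of the
    // global taskPool, e.g. to isolate blocking work.
    @system unittest
    {
        auto pool = new TaskPool(2);
        scope(exit) pool.finish();

        // The parallelism primitives are methods of the pool instance.
        assert(pool.reduce!"a + b"([1, 2, 3, 4]) == 10);
    }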

    /**
    Implements a parallel foreach loop over a range. This works by implicitly
    creating and submitting one `Task` to the `TaskPool` for each worker
    thread. A work unit is a set of consecutive elements of `range` to
    be processed by a worker thread between communication with any other
    thread. The number of elements processed per work unit is controlled by the
    `workUnitSize` parameter. Smaller work units provide better load
    balancing, but larger work units avoid the overhead of communicating
    with other threads frequently to fetch the next work unit. Large work
    units also avoid false sharing in cases where the range is being modified.
    The less time a single iteration of the loop takes, the larger
    `workUnitSize` should be. For very expensive loop bodies,
    `workUnitSize` should be 1. An overload that chooses a default work
    unit size is also available.

    Example:
    ---
    // Find the logarithm of every number from 1 to
    // 10_000_000 in parallel.
    auto logs = new double[10_000_000];

    // Parallel foreach works with or without an index
    // variable. It can iterate by ref if range.front
    // returns by ref.

    // Iterate over logs using work units of size 100.
    foreach (i, ref elem; taskPool.parallel(logs, 100))
    {
        elem = log(i + 1.0);
    }

    // Same thing, but use the default work unit size.
    //
    // Timings on an Athlon 64 X2 dual core machine:
    //
    // Parallel foreach: 388 milliseconds
    // Regular foreach:  619 milliseconds
    foreach (i, ref elem; taskPool.parallel(logs))
    {
        elem = log(i + 1.0);
    }
    ---

    Notes:

    The memory usage of this implementation is guaranteed to be constant
    in `range.length`.

    Breaking from a parallel foreach loop via a break, labeled break,
    labeled continue, return or goto statement throws a
    `ParallelForeachError`.

    In the case of non-random access ranges, parallel foreach buffers lazily
    to an array of size `workUnitSize` before executing the parallel portion
    of the loop. The exception is that, if a parallel foreach is executed
    over a range returned by `asyncBuf` or `map`, the copying is elided
    and the buffers are simply swapped. In this case `workUnitSize` is
    ignored and the work unit size is set to the buffer size of `range`.

    A memory barrier is guaranteed to be executed on exit from the loop,
    so that results produced by all threads are visible in the calling thread.

    $(B Exception Handling):

    When at least one exception is thrown from inside a parallel foreach loop,
    the submission of additional `Task` objects is terminated as soon as
    possible, in a non-deterministic manner. All executing or
    enqueued work units are allowed to complete. Then, all exceptions that
    were thrown by any work unit are chained using `Throwable.next` and
    rethrown. The order of the exception chaining is non-deterministic.
    */
    ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
    {
        import std.exception : enforce;
        enforce(workUnitSize > 0, "workUnitSize must be > 0.");
        alias RetType = ParallelForeach!R;
        return RetType(this, range, workUnitSize);
    }


    /// Ditto
    ParallelForeach!R parallel(R)(R range)
    {
        static if (hasLength!R)
        {
            // Default work unit size is such that we would use 4x as many
            // slots as are in this thread pool.
            size_t workUnitSize = defaultWorkUnitSize(range.length);
            return parallel(range, workUnitSize);
        }
        else
        {
            // Just use a really, really dumb guess if the user is too lazy to
            // specify.
            return parallel(range, 512);
        }
    }
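
    // Illustrative sketch (assumed usage, not from the original examples):
    // an explicit work unit size for a cheap loop body; larger units
    // amortize the cost of fetching work from the queue.
    @system unittest
    {
        auto buf = new double[1_000];
        foreach (i, ref elem; taskPool.parallel(buf, 250))
        {
            elem = i * 2.0;
        }
        assert(buf[500] == 1000.0);
    }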

    ///
    template amap(functions...)
    {
        /**
        Eager parallel map. The eagerness of this function means it has less
        overhead than the lazily evaluated `TaskPool.map` and should be
        preferred where the memory requirements of eagerness are acceptable.
        `functions` are the functions to be evaluated, passed as template
        alias parameters in a style similar to
        $(REF map, std,algorithm,iteration).
        The first argument must be a random access range. For performance
        reasons, amap will assume the range elements have not yet been
        initialized. Elements will be overwritten without calling a destructor
        nor doing an assignment. As such, the range must not contain meaningful
        data$(DDOC_COMMENT not a section): either un-initialized objects, or
        objects in their `.init` state.

        ---
        auto numbers = iota(100_000_000.0);

        // Find the square roots of numbers.
        //
        // Timings on an Athlon 64 X2 dual core machine:
        //
        // Parallel eager map:               0.802 s
        // Equivalent serial implementation: 1.768 s
        auto squareRoots = taskPool.amap!sqrt(numbers);
        ---

        Immediately after the range argument, an optional work unit size argument
        may be provided. Work units as used by `amap` are identical to those
        defined for parallel foreach. If no work unit size is provided, the
        default work unit size is used.

        ---
        // Same thing, but make work unit size 100.
        auto squareRoots = taskPool.amap!sqrt(numbers, 100);
        ---

        An output range for returning the results may be provided as the last
        argument. If one is not provided, an array of the proper type will be
        allocated on the garbage collected heap. If one is provided, it must be a
        random access range with assignable elements, must have reference
        semantics with respect to assignment to its elements, and must have the
        same length as the input range. Writing to adjacent elements from
        different threads must be safe.

        ---
        // Same thing, but explicitly allocate an array
        // to return the results in. The element type
        // of the array may be either the exact type
        // returned by functions or an implicit conversion
        // target.
        auto squareRoots = new float[numbers.length];
        taskPool.amap!sqrt(numbers, squareRoots);

        // Multiple functions, explicit output range, and
        // explicit work unit size.
        auto results = new Tuple!(float, real)[numbers.length];
        taskPool.amap!(sqrt, log)(numbers, 100, results);
        ---

        Note:

        A memory barrier is guaranteed to be executed after all results are written
        but before returning so that results produced by all threads are visible
        in the calling thread.

        Tips:

        To perform the mapping operation in place, provide the same range for the
        input and output range.

        To parallelize the copying of a range with expensive to evaluate elements
        to an array, pass an identity function (a function that just returns
        whatever argument is provided to it) to `amap`.

        $(B Exception Handling):

        When at least one exception is thrown from inside the map functions,
        the submission of additional `Task` objects is terminated as soon as
        possible, in a non-deterministic manner. All currently executing or
        enqueued work units are allowed to complete. Then, all exceptions that
        were thrown from any work unit are chained using `Throwable.next` and
        rethrown. The order of the exception chaining is non-deterministic.
        */
        auto amap(Args...)(Args args)
        if (isRandomAccessRange!(Args[0]))
        {
            import core.internal.lifetime : emplaceRef;

            alias fun = adjoin!(staticMap!(unaryFun, functions));

            alias range = args[0];
            immutable len = range.length;

            static if (
                Args.length > 1 &&
                randAssignable!(Args[$ - 1]) &&
                is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))
                )
            {
                import std.conv : text;
                import std.exception : enforce;

                alias buf = args[$ - 1];
                alias args2 = args[0..$ - 1];
                alias Args2 = Args[0..$ - 1];
                enforce(buf.length == len,
                    text("Can't use a user supplied buffer that's the wrong ",
                         "size. (Expected :", len, " Got: ", buf.length));
            }
            else static if (randAssignable!(Args[$ - 1]) && Args.length > 1)
            {
                static assert(0, "Wrong buffer type.");
            }
            else
            {
                import std.array : uninitializedArray;

                auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len);
                alias args2 = args;
                alias Args2 = Args;
            }

            if (!len) return buf;

            static if (isIntegral!(Args2[$ - 1]))
            {
                static assert(args2.length == 2);
                auto workUnitSize = cast(size_t) args2[1];
            }
            else
            {
                static assert(args2.length == 1, Args);
                auto workUnitSize = defaultWorkUnitSize(range.length);
            }

            alias R = typeof(range);

            if (workUnitSize > len)
            {
                workUnitSize = len;
            }

            // Handle as a special case:
            if (size == 0)
            {
                size_t index = 0;
                foreach (elem; range)
                {
                    emplaceRef(buf[index++], fun(elem));
                }
                return buf;
            }

            // Effectively -1: chunkIndex + 1 == 0:
            shared size_t workUnitIndex = size_t.max;
            shared bool shouldContinue = true;

            void doIt()
            {
                import std.algorithm.comparison : min;

                scope(failure)
                {
                    // If an exception is thrown, all threads should bail.
                    atomicStore(shouldContinue, false);
                }

                while (atomicLoad(shouldContinue))
                {
                    immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
                    immutable start = workUnitSize * myUnitIndex;
                    if (start >= len)
                    {
                        atomicStore(shouldContinue, false);
                        break;
                    }

                    immutable end = min(len, start + workUnitSize);

                    static if (hasSlicing!R)
                    {
                        auto subrange = range[start .. end];
                        foreach (i; start .. end)
                        {
                            emplaceRef(buf[i], fun(subrange.front));
                            subrange.popFront();
                        }
                    }
                    else
                    {
                        foreach (i; start .. end)
                        {
                            emplaceRef(buf[i], fun(range[i]));
                        }
                    }
                }
            }

            submitAndExecute(this, &doIt);
            return buf;
        }
    }
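
    // Illustrative sketch (assumed usage, not from the original examples):
    // eager parallel map into a freshly allocated array.
    @system unittest
    {
        import std.math : sqrt;
        import std.range : iota;

        auto roots = taskPool.amap!sqrt(iota(1_000.0));
        assert(roots.length == 1_000);
        assert(roots[100] == sqrt(100.0));
    }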

    ///
    template map(functions...)
    {
        /**
        A semi-lazy parallel map that can be used for pipelining. The map
        functions are evaluated for the first `bufSize` elements and stored in a
        buffer and made available to `popFront`. Meanwhile, in the
        background a second buffer of the same size is filled. When the first
        buffer is exhausted, it is swapped with the second buffer and filled while
        the values from what was originally the second buffer are read. This
        implementation allows for elements to be written to the buffer without
        the need for atomic operations or synchronization for each write, and
        enables the mapping function to be evaluated efficiently in parallel.

        `map` has more overhead than the simpler procedure used by `amap`
        but avoids the need to keep all results in memory simultaneously and works
        with non-random access ranges.

        Params:

        source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
        to be mapped. If `source` is not random
        access it will be lazily buffered to an array of size `bufSize` before
        the map function is evaluated. (For an exception to this rule, see Notes.)

        bufSize = The size of the buffer to store the evaluated elements.

        workUnitSize = The number of elements to evaluate in a single
        `Task`. Must be less than or equal to `bufSize`, and
        should be a fraction of `bufSize` such that all worker threads can be
        used. If the default of size_t.max is used, workUnitSize will be set to
        the pool-wide default.

        Returns: An input range representing the results of the map. This range
        has a length iff `source` has a length.

        Notes:

        If a range returned by `map` or `asyncBuf` is used as an input to
        `map`, then as an optimization the copying from the output buffer
        of the first range to the input buffer of the second range is elided, even
        though the ranges returned by `map` and `asyncBuf` are non-random
        access ranges. This means that the `bufSize` parameter passed to the
        current call to `map` will be ignored and the size of the buffer
        will be the buffer size of `source`.

        Example:
        ---
        // Pipeline reading a file, converting each line
        // to a number, taking the logarithms of the numbers,
        // and performing the additions necessary to find
        // the sum of the logarithms.

        auto lineRange = File("numberList.txt").byLine();
        auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
        auto nums = taskPool.map!(to!double)(dupedLines);
        auto logs = taskPool.map!log10(nums);

        double sum = 0;
        foreach (elem; logs)
        {
            sum += elem;
        }
        ---

        $(B Exception Handling):

        Any exceptions thrown while iterating over `source`
        or computing the map function are re-thrown on a call to `popFront` or,
        if thrown during construction, are simply allowed to propagate to the
        caller. In the case of exceptions thrown while computing the map function,
        the exceptions are chained as in `TaskPool.amap`.
        */
        auto
        map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
        if (isInputRange!S)
        {
            import std.exception : enforce;

            enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
                "Work unit size must be smaller than buffer size.");
            alias fun = adjoin!(staticMap!(unaryFun, functions));

            static final class Map
            {
                // This is a class because the task needs to be located on the
                // heap and in the non-random access case source needs to be on
                // the heap, too.

            private:
                enum bufferTrick = is(typeof(source.buf1)) &&
                    is(typeof(source.bufPos)) &&
                    is(typeof(source.doBufSwap()));

                alias E = MapType!(S, functions);
                E[] buf1, buf2;
                S source;
                TaskPool pool;
                Task!(run, E[] delegate(E[]), E[]) nextBufTask;
                size_t workUnitSize;
                size_t bufPos;
                bool lastTaskWaited;

                static if (isRandomAccessRange!S)
                {
                    alias FromType = S;

                    void popSource()
                    {
                        import std.algorithm.comparison : min;

                        static if (__traits(compiles, source[0 .. source.length]))
                        {
                            source = source[min(buf1.length, source.length)..source.length];
                        }
                        else static if (__traits(compiles, source[0..$]))
                        {
                            source = source[min(buf1.length, source.length)..$];
                        }
                        else
                        {
                            static assert(0, "S must have slicing for Map."
                                ~ " " ~ S.stringof ~ " doesn't.");
                        }
                    }
                }
                else static if (bufferTrick)
                {
                    // Make sure we don't have the buffer recycling overload of
                    // asyncBuf.
1998 static if ( 1999 is(typeof(source.source)) && 2000 isRoundRobin!(typeof(source.source)) 2001 ) 2002 { 2003 static assert(0, "Cannot execute a parallel map on " ~ 2004 "the buffer recycling overload of asyncBuf." 2005 ); 2006 } 2007 2008 alias FromType = typeof(source.buf1); 2009 FromType from; 2010 2011 // Just swap our input buffer with source's output buffer. 2012 // No need to copy element by element. 2013 FromType dumpToFrom() 2014 { 2015 import std.algorithm.mutation : swap; 2016 2017 assert(source.buf1.length <= from.length); 2018 from.length = source.buf1.length; 2019 swap(source.buf1, from); 2020 2021 // Just in case this source has been popped before 2022 // being sent to map: 2023 from = from[source.bufPos..$]; 2024 2025 static if (is(typeof(source._length))) 2026 { 2027 source._length -= (from.length - source.bufPos); 2028 } 2029 2030 source.doBufSwap(); 2031 2032 return from; 2033 } 2034 } 2035 else 2036 { 2037 alias FromType = ElementType!S[]; 2038 2039 // The temporary array that data is copied to before being 2040 // mapped. 2041 FromType from; 2042 2043 FromType dumpToFrom() 2044 { 2045 assert(from !is null); 2046 2047 size_t i; 2048 for (; !source.empty && i < from.length; source.popFront()) 2049 { 2050 from[i++] = source.front; 2051 } 2052 2053 from = from[0 .. i]; 2054 return from; 2055 } 2056 } 2057 2058 static if (hasLength!S) 2059 { 2060 size_t _length; 2061 2062 public @property size_t length() const @safe pure nothrow 2063 { 2064 return _length; 2065 } 2066 } 2067 2068 this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool) 2069 { 2070 static if (bufferTrick) 2071 { 2072 bufSize = source.buf1.length; 2073 } 2074 2075 buf1.length = bufSize; 2076 buf2.length = bufSize; 2077 2078 static if (!isRandomAccessRange!S) 2079 { 2080 from.length = bufSize; 2081 } 2082 2083 this.workUnitSize = (workUnitSize == size_t.max) ? 2084 pool.defaultWorkUnitSize(bufSize) : workUnitSize; 2085 this.source = source; 2086 this.pool = pool; 2087 2088 static if (hasLength!S) 2089 { 2090 _length = source.length; 2091 } 2092 2093 buf1 = fillBuf(buf1); 2094 submitBuf2(); 2095 } 2096 2097 // The from parameter is a dummy and ignored in the random access 2098 // case. 2099 E[] fillBuf(E[] buf) 2100 { 2101 import std.algorithm.comparison : min; 2102 2103 static if (isRandomAccessRange!S) 2104 { 2105 import std.range : take; 2106 auto toMap = take(source, buf.length); 2107 scope(success) popSource(); 2108 } 2109 else 2110 { 2111 auto toMap = dumpToFrom(); 2112 } 2113 2114 buf = buf[0 .. min(buf.length, toMap.length)]; 2115 2116 // Handle as a special case: 2117 if (pool.size == 0) 2118 { 2119 size_t index = 0; 2120 foreach (elem; toMap) 2121 { 2122 buf[index++] = fun(elem); 2123 } 2124 return buf; 2125 } 2126 2127 pool.amap!functions(toMap, workUnitSize, buf); 2128 2129 return buf; 2130 } 2131 2132 void submitBuf2() 2133 in 2134 { 2135 assert(nextBufTask.prev is null); 2136 assert(nextBufTask.next is null); 2137 } 2138 do 2139 { 2140 // Hack to reuse the task object. 2141 2142 nextBufTask = typeof(nextBufTask).init; 2143 nextBufTask._args[0] = &fillBuf; 2144 nextBufTask._args[1] = buf2; 2145 pool.put(nextBufTask); 2146 } 2147 2148 void doBufSwap() 2149 { 2150 if (lastTaskWaited) 2151 { 2152 // Then the source is empty. Signal it here. 
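// Setting buf1 to null gives it length 0, so the empty() property below will report true.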
2153 buf1 = null; 2154 buf2 = null; 2155 2156 static if (!isRandomAccessRange!S) 2157 { 2158 from = null; 2159 } 2160 2161 return; 2162 } 2163 2164 buf2 = buf1; 2165 buf1 = nextBufTask.yieldForce; 2166 bufPos = 0; 2167 2168 if (source.empty) 2169 { 2170 lastTaskWaited = true; 2171 } 2172 else 2173 { 2174 submitBuf2(); 2175 } 2176 } 2177 2178 public: 2179 @property auto front() 2180 { 2181 return buf1[bufPos]; 2182 } 2183 2184 void popFront() 2185 { 2186 static if (hasLength!S) 2187 { 2188 _length--; 2189 } 2190 2191 bufPos++; 2192 if (bufPos >= buf1.length) 2193 { 2194 doBufSwap(); 2195 } 2196 } 2197 2198 static if (isInfinite!S) 2199 { 2200 enum bool empty = false; 2201 } 2202 else 2203 { 2204 2205 bool empty() const @property 2206 { 2207 // popFront() sets this when source is empty 2208 return buf1.length == 0; 2209 } 2210 } 2211 } 2212 return new Map(source, bufSize, workUnitSize, this); 2213 } 2214 } 2215 2216 /** 2217 Given a `source` range that is expensive to iterate over, returns an 2218 $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that 2219 asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread, 2220 while making previously buffered elements from a second buffer, also of size 2221 `bufSize`, available via the range interface of the returned 2222 object. The returned range has a length iff `hasLength!S`. 2223 `asyncBuf` is useful, for example, when performing expensive operations 2224 on the elements of ranges that represent data on a disk or network. 2225 2226 Example: 2227 --- 2228 import std.conv, std.stdio; 2229 2230 void main() 2231 { 2232 // Fetch lines of a file in a background thread 2233 // while processing previously fetched lines, 2234 // dealing with byLine's buffer recycling by 2235 // eagerly duplicating every line. 2236 auto lines = File("foo.txt").byLine(); 2237 auto duped = std.algorithm.map!"a.idup"(lines); 2238 2239 // Fetch more lines in the background while we 2240 // process the lines already read into memory 2241 // into a matrix of doubles. 2242 double[][] matrix; 2243 auto asyncReader = taskPool.asyncBuf(duped); 2244 2245 foreach (line; asyncReader) 2246 { 2247 auto ls = line.split("\t"); 2248 matrix ~= to!(double[])(ls); 2249 } 2250 } 2251 --- 2252 2253 $(B Exception Handling): 2254 2255 Any exceptions thrown while iterating over `source` are re-thrown on a 2256 call to `popFront` or, if thrown during construction, simply 2257 allowed to propagate to the caller. 2258 */ 2259 auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S) 2260 { 2261 static final class AsyncBuf 2262 { 2263 // This is a class because the task and source both need to be on 2264 // the heap. 2265 2266 // The element type of S. 2267 alias E = ElementType!S; // Needs to be here b/c of forward ref bugs. 2268 2269 private: 2270 E[] buf1, buf2; 2271 S source; 2272 TaskPool pool; 2273 Task!(run, E[] delegate(E[]), E[]) nextBufTask; 2274 size_t bufPos; 2275 bool lastTaskWaited; 2276 2277 static if (hasLength!S) 2278 { 2279 size_t _length; 2280 2281 // Available if hasLength!S. 
2282 public @property size_t length() const @safe pure nothrow 2283 { 2284 return _length; 2285 } 2286 } 2287 2288 this(S source, size_t bufSize, TaskPool pool) 2289 { 2290 buf1.length = bufSize; 2291 buf2.length = bufSize; 2292 2293 this.source = source; 2294 this.pool = pool; 2295 2296 static if (hasLength!S) 2297 { 2298 _length = source.length; 2299 } 2300 2301 buf1 = fillBuf(buf1); 2302 submitBuf2(); 2303 } 2304 2305 E[] fillBuf(E[] buf) 2306 { 2307 assert(buf !is null); 2308 2309 size_t i; 2310 for (; !source.empty && i < buf.length; source.popFront()) 2311 { 2312 buf[i++] = source.front; 2313 } 2314 2315 buf = buf[0 .. i]; 2316 return buf; 2317 } 2318 2319 void submitBuf2() 2320 in 2321 { 2322 assert(nextBufTask.prev is null); 2323 assert(nextBufTask.next is null); 2324 } 2325 do 2326 { 2327 // Hack to reuse the task object. 2328 2329 nextBufTask = typeof(nextBufTask).init; 2330 nextBufTask._args[0] = &fillBuf; 2331 nextBufTask._args[1] = buf2; 2332 pool.put(nextBufTask); 2333 } 2334 2335 void doBufSwap() 2336 { 2337 if (lastTaskWaited) 2338 { 2339 // Then source is empty. Signal it here. 2340 buf1 = null; 2341 buf2 = null; 2342 return; 2343 } 2344 2345 buf2 = buf1; 2346 buf1 = nextBufTask.yieldForce; 2347 bufPos = 0; 2348 2349 if (source.empty) 2350 { 2351 lastTaskWaited = true; 2352 } 2353 else 2354 { 2355 submitBuf2(); 2356 } 2357 } 2358 2359 public: 2360 E front() @property 2361 { 2362 return buf1[bufPos]; 2363 } 2364 2365 void popFront() 2366 { 2367 static if (hasLength!S) 2368 { 2369 _length--; 2370 } 2371 2372 bufPos++; 2373 if (bufPos >= buf1.length) 2374 { 2375 doBufSwap(); 2376 } 2377 } 2378 2379 static if (isInfinite!S) 2380 { 2381 enum bool empty = false; 2382 } 2383 2384 else 2385 { 2386 /// 2387 bool empty() @property 2388 { 2389 // popFront() sets this when source is empty: 2390 return buf1.length == 0; 2391 } 2392 } 2393 } 2394 return new AsyncBuf(source, bufSize, this); 2395 } 2396 2397 /** 2398 Given a callable object `next` that writes to a user-provided buffer and 2399 a second callable object `empty` that determines whether more data is 2400 available to write via `next`, returns an input range that 2401 asynchronously calls `next` with a set of `nBuffers` buffers 2402 and makes the results available in the order they were obtained via the 2403 input range interface of the returned object. Similarly to the 2404 input range overload of `asyncBuf`, the first half of the buffers 2405 are made available via the range interface while the second half are 2406 filled and vice-versa. 2407 2408 Params: 2409 2410 next = A callable object that takes a single argument that must be an array 2411 with mutable elements. When called, `next` writes data to 2412 the array provided by the caller. 2413 2414 empty = A callable object that takes no arguments and returns a type 2415 implicitly convertible to `bool`. This is used to signify 2416 that no more data is available to be obtained by calling `next`. 2417 2418 initialBufSize = The initial size of each buffer. If `next` takes its 2419 array by reference, it may resize the buffers. 2420 2421 nBuffers = The number of buffers to cycle through when calling `next`. 2422 2423 Example: 2424 --- 2425 // Fetch lines of a file in a background 2426 // thread while processing previously fetched 2427 // lines, without duplicating any lines.
2428 auto file = File("foo.txt"); 2429 2430 void next(ref char[] buf) 2431 { 2432 file.readln(buf); 2433 } 2434 2435 // Fetch more lines in the background while we 2436 // process the lines already read into memory 2437 // into a matrix of doubles. 2438 double[][] matrix; 2439 auto asyncReader = taskPool.asyncBuf(&next, &file.eof); 2440 2441 foreach (line; asyncReader) 2442 { 2443 auto ls = line.split("\t"); 2444 matrix ~= to!(double[])(ls); 2445 } 2446 --- 2447 2448 $(B Exception Handling): 2449 2450 Any exceptions thrown while iterating over the returned range are re-thrown on a 2451 call to `popFront`. 2452 2453 Warning: 2454 2455 Using the range returned by this function in a parallel foreach loop 2456 will not work because buffers may be overwritten while the task that 2457 processes them is in queue. This is checked for at compile time 2458 and will result in a static assertion failure. 2459 */ 2460 auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100) 2461 if (is(typeof(C2.init()) : bool) && 2462 Parameters!C1.length == 1 && 2463 Parameters!C2.length == 0 && 2464 isArray!(Parameters!C1[0]) 2465 ) { 2466 auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers); 2467 return asyncBuf(roundRobin, nBuffers / 2); 2468 } 2469 2470 /// 2471 template reduce(functions...) 2472 { 2473 /** 2474 Parallel reduce on a random access range. Except as otherwise noted, 2475 usage is similar to $(REF _reduce, std,algorithm,iteration). There is 2476 also $(LREF fold) which does the same thing with a different parameter 2477 order. 2478 2479 This function works by splitting the range to be reduced into work 2480 units, which are slices to be reduced in parallel. Once the results 2481 from all work units are computed, a final serial reduction is performed 2482 on these results to compute the final answer. Therefore, care must be 2483 taken to choose the seed value appropriately. 2484 2485 Because the reduction is being performed in parallel, `functions` 2486 must be associative. For notational simplicity, let # be an 2487 infix operator representing `functions`. Then, (a # b) # c must equal 2488 a # (b # c). Floating point addition is not associative 2489 even though addition in exact arithmetic is. Summing floating 2490 point numbers using this function may give different results than summing 2491 serially. However, for many practical purposes floating point addition 2492 can be treated as associative. 2493 2494 Note that, since `functions` are assumed to be associative, 2495 additional optimizations are made to the serial portion of the reduction 2496 algorithm. These take advantage of the instruction level parallelism of 2497 modern CPUs, in addition to the thread-level parallelism that the rest 2498 of this module exploits. This can lead to better than linear speedups 2499 relative to $(REF _reduce, std,algorithm,iteration), especially for 2500 fine-grained benchmarks like dot products. 2501 2502 An explicit seed may be provided as the first argument. If 2503 provided, it is used as the seed for all work units and for the final 2504 reduction of results from all work units. Therefore, if it is not the 2505 identity value for the operation being performed, results may differ 2506 from those generated by $(REF _reduce, std,algorithm,iteration) and may vary 2507 depending on how many work units are used. The next argument must be 2508 the range to be reduced. 2509 --- 2510 // Find the sum of squares of a range in parallel, using 2511 // an explicit seed.
2512 // 2513 // Timings on an Athlon 64 X2 dual core machine: 2514 // 2515 // Parallel reduce: 72 milliseconds 2516 // Using std.algorithm.reduce instead: 181 milliseconds 2517 auto nums = iota(10_000_000.0f); 2518 auto sumSquares = taskPool.reduce!"a + b"( 2519 0.0, std.algorithm.map!"a * a"(nums) 2520 ); 2521 --- 2522 2523 If no explicit seed is provided, the first element of each work unit 2524 is used as a seed. For the final reduction, the result from the first 2525 work unit is used as the seed. 2526 --- 2527 // Find the sum of a range in parallel, using the first 2528 // element of each work unit as the seed. 2529 auto sum = taskPool.reduce!"a + b"(nums); 2530 --- 2531 2532 An explicit work unit size may be specified as the last argument. 2533 Specifying too small a work unit size will effectively serialize the 2534 reduction, as the final reduction of the result of each work unit will 2535 dominate computation time. If `TaskPool.size` for this instance 2536 is zero, this parameter is ignored and one work unit is used. 2537 --- 2538 // Use a work unit size of 100. 2539 auto sum2 = taskPool.reduce!"a + b"(nums, 100); 2540 2541 // Work unit size of 100 and explicit seed. 2542 auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100); 2543 --- 2544 2545 Parallel reduce supports multiple functions, like 2546 `std.algorithm.reduce`. 2547 --- 2548 // Find both the min and max of nums. 2549 auto minMax = taskPool.reduce!(min, max)(nums); 2550 assert(minMax[0] == reduce!min(nums)); 2551 assert(minMax[1] == reduce!max(nums)); 2552 --- 2553 2554 $(B Exception Handling): 2555 2556 After this function is finished executing, any exceptions thrown 2557 are chained together via `Throwable.next` and rethrown. The chaining 2558 order is non-deterministic. 2559 2560 See_Also: 2561 2562 $(LREF fold) is functionally equivalent to $(LREF _reduce) except the 2563 range parameter comes first and there is no need to use 2564 $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds. 
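For example (an illustrative sketch): seeding with the identity value of the operation, `0` for addition, keeps the result independent of how many work units are used. --- auto total = taskPool.reduce!"a + b"(0, [1, 2, 3, 4]); assert(total == 10); // same answer for any work unit count ---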
2565 */ 2566 auto reduce(Args...)(Args args) 2567 { 2568 import core.exception : OutOfMemoryError; 2569 import core.internal.lifetime : emplaceRef; 2570 import std.exception : enforce; 2571 2572 alias fun = reduceAdjoin!functions; 2573 alias finishFun = reduceFinish!functions; 2574 2575 static if (isIntegral!(Args[$ - 1])) 2576 { 2577 size_t workUnitSize = cast(size_t) args[$ - 1]; 2578 alias args2 = args[0..$ - 1]; 2579 alias Args2 = Args[0..$ - 1]; 2580 } 2581 else 2582 { 2583 alias args2 = args; 2584 alias Args2 = Args; 2585 } 2586 2587 auto makeStartValue(Type)(Type e) 2588 { 2589 static if (functions.length == 1) 2590 { 2591 return e; 2592 } 2593 else 2594 { 2595 typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void; 2596 foreach (i, T; seed.Types) 2597 { 2598 emplaceRef(seed.expand[i], e); 2599 } 2600 2601 return seed; 2602 } 2603 } 2604 2605 static if (args2.length == 2) 2606 { 2607 static assert(isInputRange!(Args2[1])); 2608 alias range = args2[1]; 2609 alias seed = args2[0]; 2610 enum explicitSeed = true; 2611 2612 static if (!is(typeof(workUnitSize))) 2613 { 2614 size_t workUnitSize = defaultWorkUnitSize(range.length); 2615 } 2616 } 2617 else 2618 { 2619 static assert(args2.length == 1); 2620 alias range = args2[0]; 2621 2622 static if (!is(typeof(workUnitSize))) 2623 { 2624 size_t workUnitSize = defaultWorkUnitSize(range.length); 2625 } 2626 2627 enforce(!range.empty, 2628 "Cannot reduce an empty range with first element as start value."); 2629 2630 auto seed = makeStartValue(range.front); 2631 enum explicitSeed = false; 2632 range.popFront(); 2633 } 2634 2635 alias E = typeof(seed); 2636 alias R = typeof(range); 2637 2638 E reduceOnRange(R range, size_t lowerBound, size_t upperBound) 2639 { 2640 // This is for exploiting instruction level parallelism by 2641 // using multiple accumulator variables within each thread, 2642 // since we're assuming functions are associative anyhow. 2643 2644 // This is so that loops can be unrolled automatically. 2645 enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5); 2646 enum nILP = ilpTuple.length; 2647 immutable subSize = (upperBound - lowerBound) / nILP; 2648 2649 if (subSize <= 1) 2650 { 2651 // Handle as a special case. 2652 static if (explicitSeed) 2653 { 2654 E result = seed; 2655 } 2656 else 2657 { 2658 E result = makeStartValue(range[lowerBound]); 2659 lowerBound++; 2660 } 2661 2662 foreach (i; lowerBound .. upperBound) 2663 { 2664 result = fun(result, range[i]); 2665 } 2666 2667 return result; 2668 } 2669 2670 assert(subSize > 1); 2671 E[nILP] results; 2672 size_t[nILP] offsets; 2673 2674 foreach (i; ilpTuple) 2675 { 2676 offsets[i] = lowerBound + subSize * i; 2677 2678 static if (explicitSeed) 2679 { 2680 results[i] = seed; 2681 } 2682 else 2683 { 2684 results[i] = makeStartValue(range[offsets[i]]); 2685 offsets[i]++; 2686 } 2687 } 2688 2689 immutable nLoop = subSize - (!explicitSeed); 2690 foreach (i; 0 .. nLoop) 2691 { 2692 foreach (j; ilpTuple) 2693 { 2694 results[j] = fun(results[j], range[offsets[j]]); 2695 offsets[j]++; 2696 } 2697 } 2698 2699 // Finish the remainder. 2700 foreach (i; nILP * subSize + lowerBound .. 
upperBound) 2701 { 2702 results[$ - 1] = fun(results[$ - 1], range[i]); 2703 } 2704 2705 foreach (i; ilpTuple[1..$]) 2706 { 2707 results[0] = finishFun(results[0], results[i]); 2708 } 2709 2710 return results[0]; 2711 } 2712 2713 immutable len = range.length; 2714 if (len == 0) 2715 { 2716 return seed; 2717 } 2718 2719 if (this.size == 0) 2720 { 2721 return finishFun(seed, reduceOnRange(range, 0, len)); 2722 } 2723 2724 // Unlike the rest of the functions here, I can't use the Task object 2725 // recycling trick here because this has to work on non-commutative 2726 // operations. After all the tasks are done executing, fun() has to 2727 // be applied on the results of these to get a final result, but 2728 // it can't be evaluated out of order. 2729 2730 if (workUnitSize > len) 2731 { 2732 workUnitSize = len; 2733 } 2734 2735 immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1); 2736 assert(nWorkUnits * workUnitSize >= len); 2737 2738 alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t); 2739 RTask[] tasks; 2740 2741 // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753 2742 // Use a fixed buffer backed by malloc(). 2743 enum maxStack = 2_048; 2744 byte[maxStack] buf = void; 2745 immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof; 2746 2747 import core.stdc.stdlib : malloc, free; 2748 if (nBytesNeeded <= maxStack) 2749 { 2750 tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits]; 2751 } 2752 else 2753 { 2754 auto ptr = cast(RTask*) malloc(nBytesNeeded); 2755 if (!ptr) 2756 { 2757 throw new OutOfMemoryError( 2758 "Out of memory in std.parallelism." 2759 ); 2760 } 2761 2762 tasks = ptr[0 .. nWorkUnits]; 2763 } 2764 2765 scope(exit) 2766 { 2767 if (nBytesNeeded > maxStack) 2768 { 2769 free(tasks.ptr); 2770 } 2771 } 2772 2773 // Hack to take the address of a nested function w/o 2774 // making a closure. 2775 static auto scopedAddress(D)(scope D del) @system 2776 { 2777 auto tmp = del; 2778 return tmp; 2779 } 2780 2781 size_t curPos = 0; 2782 void useTask(ref RTask task) 2783 { 2784 import std.algorithm.comparison : min; 2785 import core.lifetime : emplace; 2786 2787 // Private constructor, so can't feed its arguments directly 2788 // to emplace 2789 emplace(&task, RTask 2790 ( 2791 scopedAddress(&reduceOnRange), 2792 range, 2793 curPos, // lower bound. 2794 cast() min(len, curPos + workUnitSize) // upper bound. 2795 )); 2796 2797 task.pool = this; 2798 2799 curPos += workUnitSize; 2800 } 2801 2802 foreach (ref task; tasks) 2803 { 2804 useTask(task); 2805 } 2806 2807 foreach (i; 1 .. tasks.length - 1) 2808 { 2809 tasks[i].next = tasks[i + 1].basePtr; 2810 tasks[i + 1].prev = tasks[i].basePtr; 2811 } 2812 2813 if (tasks.length > 1) 2814 { 2815 queueLock(); 2816 scope(exit) queueUnlock(); 2817 2818 abstractPutGroupNoSync( 2819 tasks[1].basePtr, 2820 tasks[$ - 1].basePtr 2821 ); 2822 } 2823 2824 if (tasks.length > 0) 2825 { 2826 try 2827 { 2828 tasks[0].job(); 2829 } 2830 catch (Throwable e) 2831 { 2832 tasks[0].exception = e; 2833 } 2834 tasks[0].taskStatus = TaskStatus.done; 2835 2836 // Try to execute each of these in the current thread 2837 foreach (ref task; tasks[1..$]) 2838 { 2839 tryDeleteExecute(task.basePtr); 2840 } 2841 } 2842 2843 // Now that we've tried to execute every task, they're all either 2844 // done or in progress. Force all of them.
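// yieldForce below blocks until each task is done and rethrows any exception it threw; the catch clause collects and chains the exceptions instead of aborting at the first failure.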
2845 E result = seed; 2846 2847 Throwable firstException; 2848 2849 foreach (ref task; tasks) 2850 { 2851 try 2852 { 2853 task.yieldForce; 2854 } 2855 catch (Throwable e) 2856 { 2857 /* Chain e to front because order doesn't matter and because 2858 * e is not likely to be a chain itself (so fewer traversals) 2859 */ 2860 firstException = Throwable.chainTogether(e, firstException); 2861 continue; 2862 } 2863 2864 if (!firstException) result = finishFun(result, task.returnVal); 2865 } 2866 2867 if (firstException) throw firstException; 2868 2869 return result; 2870 } 2871 } 2872 2873 /// 2874 template fold(functions...) 2875 { 2876 /** Implements the homonym function (also known as `accumulate`, `compress`, 2877 `inject`, or `foldl`) present in various programming languages of 2878 functional flavor. 2879 2880 `fold` is functionally equivalent to $(LREF reduce) except the range 2881 parameter comes first and there is no need to use $(REF_ALTTEXT 2882 `tuple`,tuple,std,typecons) for multiple seeds. 2883 2884 There may be one or more callable entities (`functions` argument) to 2885 apply. 2886 2887 Params: 2888 args = Just the range to _fold over; or the range and one seed 2889 per function; or the range, one seed per function, and 2890 the work unit size 2891 2892 Returns: 2893 The accumulated result as a single value for single function and 2894 as a tuple of values for multiple functions 2895 2896 See_Also: 2897 Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce). 2898 2899 Example: 2900 --- 2901 static int adder(int a, int b) 2902 { 2903 return a + b; 2904 } 2905 static int multiplier(int a, int b) 2906 { 2907 return a * b; 2908 } 2909 2910 // Just the range 2911 auto x = taskPool.fold!adder([1, 2, 3, 4]); 2912 assert(x == 10); 2913 2914 // The range and the seeds (0 and 1 below; also note multiple 2915 // functions in this example) 2916 auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1); 2917 assert(y[0] == 10); 2918 assert(y[1] == 24); 2919 2920 // The range, the seed (0), and the work unit size (20) 2921 auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20); 2922 assert(z == 10); 2923 --- 2924 */ 2925 auto fold(Args...)(Args args) 2926 { 2927 static assert(isInputRange!(Args[0]), "First argument must be an InputRange"); 2928 2929 alias range = args[0]; 2930 2931 static if (Args.length == 1) 2932 { 2933 // Just the range 2934 return reduce!functions(range); 2935 } 2936 else static if (Args.length == 1 + functions.length || 2937 Args.length == 1 + functions.length + 1) 2938 { 2939 static if (functions.length == 1) 2940 { 2941 alias seeds = args[1]; 2942 } 2943 else 2944 { 2945 auto seeds() 2946 { 2947 import std.typecons : tuple; 2948 return tuple(args[1 .. 
functions.length+1]); 2949 } 2950 } 2951 2952 static if (Args.length == 1 + functions.length) 2953 { 2954 // The range and the seeds 2955 return reduce!functions(seeds, range); 2956 } 2957 else static if (Args.length == 1 + functions.length + 1) 2958 { 2959 // The range, the seeds, and the work unit size 2960 static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type"); 2961 return reduce!functions(seeds, range, args[$-1]); 2962 } 2963 } 2964 else 2965 { 2966 import std.conv : text; 2967 static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, " 2968 ~ functions.length.text ~ " optional seed(s), and an optional work unit size."); 2969 } 2970 } 2971 } 2972 2973 // This test is not included in the documentation because even though these 2974 // examples are for the inner fold() template, with their current location, 2975 // they would appear under the outer one. (We can't move this inside the 2976 // outer fold() template because then dmd runs out of memory possibly due to 2977 // recursive template instantiation, which is surprisingly not caught.) 2978 @system unittest 2979 { 2980 // Just the range 2981 auto x = taskPool.fold!"a + b"([1, 2, 3, 4]); 2982 assert(x == 10); 2983 2984 // The range and the seeds (0 and 1 below; also note multiple 2985 // functions in this example) 2986 auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1); 2987 assert(y[0] == 10); 2988 assert(y[1] == 24); 2989 2990 // The range, the seed (0), and the work unit size (20) 2991 auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20); 2992 assert(z == 10); 2993 } 2994 2995 /** 2996 Gets the index of the current thread relative to this `TaskPool`. Any 2997 thread not in this pool will receive an index of 0. The worker threads in 2998 this pool receive unique indices of 1 through `this.size`. 2999 3000 This function is useful for maintaining worker-local resources. 3001 3002 Example: 3003 --- 3004 // Execute a loop that computes the greatest common 3005 // divisor of every number from 0 through 999 with 3006 // 42 in parallel. Write the results out to 3007 // a set of files, one for each thread. This allows 3008 // results to be written out without any synchronization. 3009 3010 import std.conv, std.range, std.numeric, std.stdio; 3011 3012 void main() 3013 { 3014 auto fileHandles = new File[taskPool.size + 1]; 3015 scope(exit) { 3016 foreach (ref handle; fileHandles) 3017 { 3018 handle.close(); 3019 } 3020 } 3021 3022 foreach (i, ref handle; fileHandles) 3023 { 3024 handle = File("workerResults" ~ to!string(i) ~ ".txt", "w"); 3025 } 3026 3027 foreach (num; parallel(iota(1_000))) 3028 { 3029 auto outHandle = fileHandles[taskPool.workerIndex]; 3030 outHandle.writeln(num, '\t', gcd(num, 42)); 3031 } 3032 } 3033 --- 3034 */ 3035 size_t workerIndex() @property @safe const nothrow 3036 { 3037 immutable rawInd = threadIndex; 3038 return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ? 3039 (rawInd - instanceStartIndex + 1) : 0; 3040 } 3041 3042 /** 3043 Struct for creating worker-local storage. Worker-local storage is 3044 thread-local storage that exists only for worker threads in a given 3045 `TaskPool` plus a single thread outside the pool. It is allocated on the 3046 garbage collected heap in a way that avoids _false sharing, and doesn't 3047 necessarily have global scope within any thread. It can be accessed from 3048 any worker thread in the `TaskPool` that created it, and one thread 3049 outside this `TaskPool`.
All threads outside the pool that created a 3050 given instance of worker-local storage share a single slot. 3051 3052 Since the underlying data for this struct is heap-allocated, this struct 3053 has reference semantics when passed between functions. 3054 3055 The main use cases for `WorkerLocalStorage` are: 3056 3057 1. Performing parallel reductions with an imperative, as opposed to 3058 functional, programming style. In this case, it's useful to treat 3059 `WorkerLocalStorage` as local to each thread for only the parallel 3060 portion of an algorithm. 3061 3062 2. Recycling temporary buffers across iterations of a parallel foreach loop. 3063 3064 Example: 3065 --- 3066 // Calculate pi as in our synopsis example, but 3067 // use an imperative instead of a functional style. 3068 immutable n = 1_000_000_000; 3069 immutable delta = 1.0L / n; 3070 3071 auto sums = taskPool.workerLocalStorage(0.0L); 3072 foreach (i; parallel(iota(n))) 3073 { 3074 immutable x = ( i - 0.5L ) * delta; 3075 immutable toAdd = delta / ( 1.0 + x * x ); 3076 sums.get += toAdd; 3077 } 3078 3079 // Add up the results from each worker thread. 3080 real pi = 0; 3081 foreach (threadResult; sums.toRange) 3082 { 3083 pi += 4.0L * threadResult; 3084 } 3085 --- 3086 */ 3087 static struct WorkerLocalStorage(T) 3088 { 3089 private: 3090 TaskPool pool; 3091 size_t size; 3092 3093 size_t elemSize; 3094 bool* stillThreadLocal; 3095 3096 static size_t roundToLine(size_t num) pure nothrow 3097 { 3098 if (num % cacheLineSize == 0) 3099 { 3100 return num; 3101 } 3102 else 3103 { 3104 return ((num / cacheLineSize) + 1) * cacheLineSize; 3105 } 3106 } 3107 3108 void* data; 3109 3110 void initialize(TaskPool pool) 3111 { 3112 this.pool = pool; 3113 size = pool.size + 1; 3114 stillThreadLocal = new bool; 3115 *stillThreadLocal = true; 3116 3117 // Determines whether the GC should scan the array. 3118 auto blkInfo = (typeid(T).flags & 1) ? 3119 cast(GC.BlkAttr) 0 : 3120 GC.BlkAttr.NO_SCAN; 3121 3122 immutable nElem = pool.size + 1; 3123 elemSize = roundToLine(T.sizeof); 3124 3125 // The + 3 is to pad one full cache line worth of space on either side 3126 // of the data structure to make sure false sharing with completely 3127 // unrelated heap data is prevented, and to provide enough padding to 3128 // make sure that data is cache line-aligned. 3129 data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize; 3130 3131 // Cache line align data ptr. 3132 data = cast(void*) roundToLine(cast(size_t) data); 3133 3134 foreach (i; 0 .. nElem) 3135 { 3136 this.opIndex(i) = T.init; 3137 } 3138 } 3139 3140 ref opIndex(this Qualified)(size_t index) 3141 { 3142 import std.conv : text; 3143 assert(index < size, text(index, '\t', size)); 3144 return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index)); 3145 } 3146 3147 void opIndexAssign(T val, size_t index) 3148 { 3149 assert(index < size); 3150 *(cast(T*) (data + elemSize * index)) = val; 3151 } 3152 3153 public: 3154 /** 3155 Get the current thread's instance. Returns by ref. 3156 Note that calling `get` from any thread 3157 outside the `TaskPool` that created this instance will return the 3158 same reference, so an instance of worker-local storage should only be 3159 accessed from one thread outside the pool that created it. If this 3160 rule is violated, undefined behavior will result.
3161 3162 If assertions are enabled and `toRange` has been called, then this 3163 WorkerLocalStorage instance is no longer worker-local and an assertion 3164 failure will result when calling this method. This is not checked 3165 when assertions are disabled for performance reasons. 3166 */ 3167 ref get(this Qualified)() @property 3168 { 3169 assert(*stillThreadLocal, 3170 "Cannot call get() on this instance of WorkerLocalStorage " ~ 3171 "because it is no longer worker-local." 3172 ); 3173 return opIndex(pool.workerIndex); 3174 } 3175 3176 /** 3177 Assign a value to the current thread's instance. This function has 3178 the same caveats as its overload. 3179 */ 3180 void get(T val) @property 3181 { 3182 assert(*stillThreadLocal, 3183 "Cannot call get() on this instance of WorkerLocalStorage " ~ 3184 "because it is no longer worker-local." 3185 ); 3186 3187 opIndexAssign(val, pool.workerIndex); 3188 } 3189 3190 /** 3191 Returns a range view of the values for all threads, which can be used 3192 to further process the results of each thread after running the parallel 3193 part of your algorithm. Do not use this method in the parallel portion 3194 of your algorithm. 3195 3196 Calling this function sets a flag indicating that this struct is no 3197 longer worker-local, and attempting to use the `get` method again 3198 will result in an assertion failure if assertions are enabled. 3199 */ 3200 WorkerLocalStorageRange!T toRange() @property 3201 { 3202 if (*stillThreadLocal) 3203 { 3204 *stillThreadLocal = false; 3205 3206 // Make absolutely sure results are visible to all threads. 3207 // This is probably not necessary since some other 3208 // synchronization primitive will be used to signal that the 3209 // parallel part of the algorithm is done, but the 3210 // performance impact should be negligible, so it's better 3211 // to be safe. 3212 ubyte barrierDummy; 3213 atomicSetUbyte(barrierDummy, 1); 3214 } 3215 3216 return WorkerLocalStorageRange!T(this); 3217 } 3218 } 3219 3220 /** 3221 Range primitives for worker-local storage. The purpose of this is to 3222 access results produced by each worker thread from a single thread once you 3223 are no longer using the worker-local storage from multiple threads. 3224 Do not use this struct in the parallel portion of your algorithm. 3225 3226 The proper way to instantiate this object is to call 3227 `WorkerLocalStorage.toRange`. Once instantiated, this object behaves 3228 as a finite random-access range with assignable, lvalue elements and 3229 a length equal to the number of worker threads in the `TaskPool` that 3230 created it plus 1. 
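For example (an illustrative sketch), combining per-thread partial results once the parallel phase of an algorithm is over: --- auto sums = taskPool.workerLocalStorage(0.0); // ... parallel loop updating sums.get ... double total = 0; foreach (partial; sums.toRange) { total += partial; } ---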
3231 */ 3232 static struct WorkerLocalStorageRange(T) 3233 { 3234 private: 3235 WorkerLocalStorage!T workerLocalStorage; 3236 3237 size_t _length; 3238 size_t beginOffset; 3239 3240 this(WorkerLocalStorage!T wl) 3241 { 3242 this.workerLocalStorage = wl; 3243 _length = wl.size; 3244 } 3245 3246 public: 3247 ref front(this Qualified)() @property 3248 { 3249 return this[0]; 3250 } 3251 3252 ref back(this Qualified)() @property 3253 { 3254 return this[_length - 1]; 3255 } 3256 3257 void popFront() 3258 { 3259 if (_length > 0) 3260 { 3261 beginOffset++; 3262 _length--; 3263 } 3264 } 3265 3266 void popBack() 3267 { 3268 if (_length > 0) 3269 { 3270 _length--; 3271 } 3272 } 3273 3274 typeof(this) save() @property 3275 { 3276 return this; 3277 } 3278 3279 ref opIndex(this Qualified)(size_t index) 3280 { 3281 assert(index < _length); 3282 return workerLocalStorage[index + beginOffset]; 3283 } 3284 3285 void opIndexAssign(T val, size_t index) 3286 { 3287 assert(index < _length); 3288 workerLocalStorage[index + beginOffset] = val; 3289 } 3290 3291 typeof(this) opSlice(size_t lower, size_t upper) 3292 { 3293 assert(upper <= _length); 3294 auto newWl = this.workerLocalStorage; 3295 newWl.data += lower * newWl.elemSize; 3296 newWl.size = upper - lower; 3297 return typeof(this)(newWl); 3298 } 3299 3300 bool empty() const @property 3301 { 3302 return length == 0; 3303 } 3304 3305 size_t length() const @property 3306 { 3307 return _length; 3308 } 3309 } 3310 3311 /** 3312 Creates an instance of worker-local storage, initialized with a given 3313 value. The value is `lazy` so that you can, for example, easily 3314 create one instance of a class for each worker. For a usage example, 3315 see the `WorkerLocalStorage` struct. 3316 */ 3317 WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init) 3318 { 3319 WorkerLocalStorage!T ret; 3320 ret.initialize(this); 3321 foreach (i; 0 .. size + 1) 3322 { 3323 ret[i] = initialVal; 3324 } 3325 3326 // Memory barrier to make absolutely sure that what we wrote is 3327 // visible to worker threads. 3328 ubyte barrierDummy; 3329 atomicSetUbyte(barrierDummy, 0); 3330 3331 return ret; 3332 } 3333 3334 /** 3335 Signals to all worker threads to terminate as soon as they are finished 3336 with their current `Task`, or immediately if they are not executing a 3337 `Task`. `Task`s that were in queue will not be executed unless 3338 a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce` 3339 causes them to be executed. 3340 3341 Use only if you have waited on every `Task` and therefore know the 3342 queue is empty, or if you speculatively executed some tasks and no longer 3343 need the results. 3344 */ 3345 void stop() @trusted 3346 { 3347 queueLock(); 3348 scope(exit) queueUnlock(); 3349 atomicSetUbyte(status, PoolState.stopNow); 3350 notifyAll(); 3351 } 3352 3353 /** 3354 Signals worker threads to terminate when the queue becomes empty. 3355 3356 If the `blocking` argument is true, wait for all worker threads to terminate 3357 before returning. This option might be used in applications where 3358 task results are never consumed, e.g. when `TaskPool` is employed as a 3359 rudimentary scheduler for tasks which communicate by means other than 3360 return values. 3361 3362 Warning: Calling this function with $(D blocking = true) from a worker 3363 thread that is a member of the same `TaskPool` that 3364 `finish` is being called on will result in a deadlock.
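For example (an illustrative sketch), shutting down a manually created pool from the main thread after all of its work has been submitted: --- auto pool = new TaskPool(); // ... put tasks on the queue ... pool.finish(true); // blocks until every worker thread has terminated ---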
3365 */ 3366 void finish(bool blocking = false) @trusted 3367 { 3368 { 3369 queueLock(); 3370 scope(exit) queueUnlock(); 3371 atomicCasUbyte(status, PoolState.running, PoolState.finishing); 3372 notifyAll(); 3373 } 3374 if (blocking) 3375 { 3376 // Use this thread as a worker until everything is finished. 3377 executeWorkLoop(); 3378 3379 foreach (t; pool) 3380 { 3381 // Maybe there should be something here to prevent a thread 3382 // from calling join() on itself if this function is called 3383 // from a worker thread in the same pool, but: 3384 // 3385 // 1. Using an if statement to skip join() would result in 3386 // finish() returning without all tasks being finished. 3387 // 3388 // 2. If an exception were thrown, it would bubble up to the 3389 // Task from which finish() was called and likely be 3390 // swallowed. 3391 t.join(); 3392 } 3393 } 3394 } 3395 3396 /// Returns the number of worker threads in the pool. 3397 @property size_t size() @safe const pure nothrow 3398 { 3399 return pool.length; 3400 } 3401 3402 /** 3403 Put a `Task` object on the back of the task queue. The `Task` 3404 object may be passed by pointer or reference. 3405 3406 Example: 3407 --- 3408 import std.file; 3409 3410 // Create a task. 3411 auto t = task!read("foo.txt"); 3412 3413 // Add it to the queue to be executed. 3414 taskPool.put(t); 3415 --- 3416 3417 Notes: 3418 3419 @trusted overloads of this function are called for `Task`s if 3420 $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s 3421 return type or the function the `Task` executes is `pure`. 3422 `Task` objects that meet all other requirements specified in the 3423 `@trusted` overloads of `task` and `scopedTask` may be created 3424 and executed from `@safe` code via `Task.executeInNewThread` but 3425 not via `TaskPool`. 3426 3427 While this function takes the address of variables that may 3428 be on the stack, some overloads are marked as @trusted. 3429 `Task` includes a destructor that waits for the task to complete 3430 before destroying the stack frame it is allocated on. Therefore, 3431 it is impossible for the stack frame to be destroyed before the task is 3432 complete and no longer referenced by a `TaskPool`. 3433 */ 3434 void put(alias fun, Args...)(ref Task!(fun, Args) task) 3435 if (!isSafeReturn!(typeof(task))) 3436 { 3437 task.pool = this; 3438 abstractPut(task.basePtr); 3439 } 3440 3441 /// Ditto 3442 void put(alias fun, Args...)(Task!(fun, Args)* task) 3443 if (!isSafeReturn!(typeof(*task))) 3444 { 3445 import std.exception : enforce; 3446 enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); 3447 put(*task); 3448 } 3449 3450 @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task) 3451 if (isSafeReturn!(typeof(task))) 3452 { 3453 task.pool = this; 3454 abstractPut(task.basePtr); 3455 } 3456 3457 @trusted void put(alias fun, Args...)(Task!(fun, Args)* task) 3458 if (isSafeReturn!(typeof(*task))) 3459 { 3460 import std.exception : enforce; 3461 enforce(task !is null, "Cannot put a null Task on a TaskPool queue."); 3462 put(*task); 3463 } 3464 3465 /** 3466 These properties control whether the worker threads are daemon threads. 3467 A daemon thread is automatically terminated when all non-daemon threads 3468 have terminated. A non-daemon thread will prevent a program from 3469 terminating as long as it is still running. 3470 3471 If any `TaskPool` with non-daemon threads is active, either `stop` 3472 or `finish` must be called on it before the program can terminate.
3473 3474 The worker threads in the `TaskPool` instance returned by the 3475 `taskPool` property are daemon by default. The worker threads of 3476 manually instantiated task pools are non-daemon by default. 3477 3478 Note: For a size zero pool, the getter arbitrarily returns true and the 3479 setter has no effect. 3480 */ 3481 bool isDaemon() @property @trusted 3482 { 3483 queueLock(); 3484 scope(exit) queueUnlock(); 3485 return (size == 0) ? true : pool[0].isDaemon; 3486 } 3487 3488 /// Ditto 3489 void isDaemon(bool newVal) @property @trusted 3490 { 3491 queueLock(); 3492 scope(exit) queueUnlock(); 3493 foreach (thread; pool) 3494 { 3495 thread.isDaemon = newVal; 3496 } 3497 } 3498 3499 /** 3500 These functions allow getting and setting the OS scheduling priority of 3501 the worker threads in this `TaskPool`. They forward to 3502 `core.thread.Thread.priority`, so a given priority value here means the 3503 same thing as an identical priority value in `core.thread`. 3504 3505 Note: For a size zero pool, the getter arbitrarily returns 3506 `core.thread.Thread.PRIORITY_MIN` and the setter has no effect. 3507 */ 3508 int priority() @property @trusted 3509 { 3510 return (size == 0) ? core.thread.Thread.PRIORITY_MIN : 3511 pool[0].priority; 3512 } 3513 3514 /// Ditto 3515 void priority(int newPriority) @property @trusted 3516 { 3517 if (size > 0) 3518 { 3519 foreach (t; pool) 3520 { 3521 t.priority = newPriority; 3522 } 3523 } 3524 } 3525 } 3526 3527 @system unittest 3528 { 3529 import std.algorithm.iteration : sum; 3530 import std.range : iota; 3531 import std.typecons : tuple; 3532 3533 enum N = 100; 3534 auto r = iota(1, N + 1); 3535 const expected = r.sum(); 3536 3537 // Just the range 3538 assert(taskPool.fold!"a + b"(r) == expected); 3539 3540 // Range and seeds 3541 assert(taskPool.fold!"a + b"(r, 0) == expected); 3542 assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected)); 3543 3544 // Range, seeds, and work unit size 3545 assert(taskPool.fold!"a + b"(r, 0, 42) == expected); 3546 assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected)); 3547 } 3548 3549 // Issue 16705 3550 @system unittest 3551 { 3552 struct MyIota 3553 { 3554 size_t front; 3555 void popFront()(){front++;} 3556 auto empty(){return front >= 25;} 3557 auto opIndex(size_t i){return front+i;} 3558 auto length(){return 25-front;} 3559 } 3560 3561 auto mySum = taskPool.reduce!"a + b"(MyIota()); 3562 } 3563 3564 /** 3565 Returns a lazily initialized global instantiation of `TaskPool`. 3566 This function can safely be called concurrently from multiple non-worker 3567 threads. The worker threads in this pool are daemon threads, meaning that it 3568 is not necessary to call `TaskPool.stop` or `TaskPool.finish` before 3569 terminating the main thread. 3570 */ 3571 @property TaskPool taskPool() @trusted 3572 { 3573 import std.concurrency : initOnce; 3574 __gshared TaskPool pool; 3575 return initOnce!pool({ 3576 auto p = new TaskPool(defaultPoolThreads); 3577 p.isDaemon = true; 3578 return p; 3579 }()); 3580 } 3581 3582 private shared uint _defaultPoolThreads = uint.max; 3583 3584 /** 3585 These properties get and set the number of worker threads in the `TaskPool` 3586 instance returned by `taskPool`. The default value is `totalCPUs` - 1. 3587 Calling the setter after the first call to `taskPool` does not change the number of worker threads in the instance returned by `taskPool`.
3589 */ 3590 @property uint defaultPoolThreads() @trusted 3591 { 3592 const local = atomicLoad(_defaultPoolThreads); 3593 return local < uint.max ? local : totalCPUs - 1; 3594 } 3595 3596 /// Ditto 3597 @property void defaultPoolThreads(uint newVal) @trusted 3598 { 3599 atomicStore(_defaultPoolThreads, newVal); 3600 } 3601 3602 /** 3603 Convenience functions that forward to `taskPool.parallel`. The 3604 purpose of these is to make parallel foreach less verbose and more 3605 readable. 3606 3607 Example: 3608 --- 3609 // Find the logarithm of every number from 3610 // 1 to 1_000_000 in parallel, using the 3611 // default TaskPool instance. 3612 auto logs = new double[1_000_000]; 3613 3614 foreach (i, ref elem; parallel(logs)) 3615 { 3616 elem = log(i + 1.0); 3617 } 3618 --- 3619 3620 */ 3621 ParallelForeach!R parallel(R)(R range) 3622 { 3623 return taskPool.parallel(range); 3624 } 3625 3626 /// Ditto 3627 ParallelForeach!R parallel(R)(R range, size_t workUnitSize) 3628 { 3629 return taskPool.parallel(range, workUnitSize); 3630 } 3631 3632 // `each` should be usable with parallel 3633 // https://issues.dlang.org/show_bug.cgi?id=17019 3634 @system unittest 3635 { 3636 import std.algorithm.iteration : each, sum; 3637 import std.range : iota; 3638 3639 // check behavior with parallel 3640 auto arr = new int[10]; 3641 parallel(arr).each!((ref e) => e += 1); 3642 assert(arr.sum == 10); 3643 3644 auto arrIndex = new int[10]; 3645 parallel(arrIndex).each!((i, ref e) => e += i); 3646 assert(arrIndex.sum == 10.iota.sum); 3647 } 3648 3649 // https://issues.dlang.org/show_bug.cgi?id=22745 3650 @system unittest 3651 { 3652 auto pool = new TaskPool(0); 3653 int[] empty; 3654 foreach (i; pool.parallel(empty)) {} 3655 pool.finish(); 3656 } 3657 3658 // Thrown when a parallel foreach loop is broken from. 3659 class ParallelForeachError : Error 3660 { 3661 this() 3662 { 3663 super("Cannot break from a parallel foreach loop using break, return, " 3664 ~ "labeled break/continue or goto statements."); 3665 } 3666 } 3667 3668 /*------Structs that implement opApply for parallel foreach.------------------*/ 3669 private template randLen(R) 3670 { 3671 enum randLen = isRandomAccessRange!R && hasLength!R; 3672 } 3673 3674 private void submitAndExecute( 3675 TaskPool pool, 3676 scope void delegate() doIt 3677 ) 3678 { 3679 import core.exception : OutOfMemoryError; 3680 immutable nThreads = pool.size + 1; 3681 3682 alias PTask = typeof(scopedTask(doIt)); 3683 import core.stdc.stdlib : malloc, free; 3684 import core.stdc.string : memcpy; 3685 3686 // The logical thing to do would be to just use alloca() here, but that 3687 // causes problems on Windows for reasons that I don't understand 3688 // (tentatively a compiler bug) and definitely doesn't work on Posix due 3689 // to https://issues.dlang.org/show_bug.cgi?id=3753. 3690 // Therefore, allocate a fixed buffer and fall back to `malloc()` if 3691 // someone's using a ridiculous number of threads. 3692 // Also, a byte array is used instead of a PTask array as the fixed buffer 3693 // to prevent d'tors from being called on uninitialized excess PTask 3694 // instances. 3695 enum nBuf = 64; 3696 byte[nBuf * PTask.sizeof] buf = void; 3697 PTask[] tasks; 3698 if (nThreads <= nBuf) 3699 { 3700 tasks = (cast(PTask*) buf.ptr)[0 .. nThreads]; 3701 } 3702 else 3703 { 3704 auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof); 3705 if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism."); 3706 tasks = ptr[0 ..
nThreads]; 3707 } 3708 3709 scope(exit) 3710 { 3711 if (nThreads > nBuf) 3712 { 3713 free(tasks.ptr); 3714 } 3715 } 3716 3717 foreach (ref t; tasks) 3718 { 3719 import core.stdc.string : memcpy; 3720 3721 // This silly looking code is necessary to prevent d'tors from being 3722 // called on uninitialized objects. 3723 auto temp = scopedTask(doIt); 3724 memcpy(&t, &temp, PTask.sizeof); 3725 3726 // This has to be done to t after copying, not temp before copying. 3727 // Otherwise, temp's destructor will sit here and wait for the 3728 // task to finish. 3729 t.pool = pool; 3730 } 3731 3732 foreach (i; 1 .. tasks.length - 1) 3733 { 3734 tasks[i].next = tasks[i + 1].basePtr; 3735 tasks[i + 1].prev = tasks[i].basePtr; 3736 } 3737 3738 if (tasks.length > 1) 3739 { 3740 pool.queueLock(); 3741 scope(exit) pool.queueUnlock(); 3742 3743 pool.abstractPutGroupNoSync( 3744 tasks[1].basePtr, 3745 tasks[$ - 1].basePtr 3746 ); 3747 } 3748 3749 if (tasks.length > 0) 3750 { 3751 try 3752 { 3753 tasks[0].job(); 3754 } 3755 catch (Throwable e) 3756 { 3757 tasks[0].exception = e; // nocoverage 3758 } 3759 tasks[0].taskStatus = TaskStatus.done; 3760 3761 // Try to execute each of these in the current thread 3762 foreach (ref task; tasks[1..$]) 3763 { 3764 pool.tryDeleteExecute(task.basePtr); 3765 } 3766 } 3767 3768 Throwable firstException; 3769 3770 foreach (i, ref task; tasks) 3771 { 3772 try 3773 { 3774 task.yieldForce; 3775 } 3776 catch (Throwable e) 3777 { 3778 /* Chain e to front because order doesn't matter and because 3779 * e is not likely to be a chain itself (so fewer traversals) 3780 */ 3781 firstException = Throwable.chainTogether(e, firstException); 3782 continue; 3783 } 3784 } 3785 3786 if (firstException) throw firstException; 3787 } 3788 3789 void foreachErr() 3790 { 3791 throw new ParallelForeachError(); 3792 } 3793 3794 int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg) 3795 { 3796 with(p) 3797 { 3798 int res = 0; 3799 size_t index = 0; 3800 3801 // The explicit ElementType!R in the foreach loops is necessary for 3802 // correct behavior when iterating over strings. 3803 static if (hasLvalueElements!R) 3804 { 3805 foreach (ref ElementType!R elem; range) 3806 { 3807 static if (Parameters!dg.length == 2) 3808 { 3809 res = dg(index, elem); 3810 } 3811 else 3812 { 3813 res = dg(elem); 3814 } 3815 if (res) break; 3816 index++; 3817 } 3818 } 3819 else 3820 { 3821 foreach (ElementType!R elem; range) 3822 { 3823 static if (Parameters!dg.length == 2) 3824 { 3825 res = dg(index, elem); 3826 } 3827 else 3828 { 3829 res = dg(elem); 3830 } 3831 if (res) break; 3832 index++; 3833 } 3834 } 3835 if (res) foreachErr; 3836 return res; 3837 } 3838 } 3839 3840 private enum string parallelApplyMixinRandomAccess = q{ 3841 // Handle empty thread pool as special case. 3842 if (pool.size == 0) 3843 { 3844 return doSizeZeroCase(this, dg); 3845 } 3846 3847 // Whether iteration is with or without an index variable. 3848 enum withIndex = Parameters!(typeof(dg)).length == 2; 3849 3850 shared size_t workUnitIndex = size_t.max; // Effectively -1: workUnitIndex + 1 == 0 3851 immutable len = range.length; 3852 if (!len) return 0; 3853 3854 shared bool shouldContinue = true; 3855 3856 void doIt() 3857 { 3858 import std.algorithm.comparison : min; 3859 3860 scope(failure) 3861 { 3862 // If an exception is thrown, all threads should bail.
3863 atomicStore(shouldContinue, false); 3864 } 3865 3866 while (atomicLoad(shouldContinue)) 3867 { 3868 immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1); 3869 immutable start = workUnitSize * myUnitIndex; 3870 if (start >= len) 3871 { 3872 atomicStore(shouldContinue, false); 3873 break; 3874 } 3875 3876 immutable end = min(len, start + workUnitSize); 3877 3878 foreach (i; start .. end) 3879 { 3880 static if (withIndex) 3881 { 3882 if (dg(i, range[i])) foreachErr(); 3883 } 3884 else 3885 { 3886 if (dg(range[i])) foreachErr(); 3887 } 3888 } 3889 } 3890 } 3891 3892 submitAndExecute(pool, &doIt); 3893 3894 return 0; 3895 }; 3896 3897 enum string parallelApplyMixinInputRange = q{ 3898 // Handle empty thread pool as special case. 3899 if (pool.size == 0) 3900 { 3901 return doSizeZeroCase(this, dg); 3902 } 3903 3904 // Whether iteration is with or without an index variable. 3905 enum withIndex = Parameters!(typeof(dg)).length == 2; 3906 3907 // This protects the range while copying it. 3908 auto rangeMutex = new Mutex(); 3909 3910 shared bool shouldContinue = true; 3911 3912 // The total number of elements that have been popped off range. 3913 // This is updated only while protected by rangeMutex; 3914 size_t nPopped = 0; 3915 3916 static if ( 3917 is(typeof(range.buf1)) && 3918 is(typeof(range.bufPos)) && 3919 is(typeof(range.doBufSwap())) 3920 ) 3921 { 3922 // Make sure we don't have the buffer recycling overload of 3923 // asyncBuf. 3924 static if ( 3925 is(typeof(range.source)) && 3926 isRoundRobin!(typeof(range.source)) 3927 ) 3928 { 3929 static assert(0, "Cannot execute a parallel foreach loop on " ~ 3930 "the buffer recycling overload of asyncBuf."); 3931 } 3932 3933 enum bool bufferTrick = true; 3934 } 3935 else 3936 { 3937 enum bool bufferTrick = false; 3938 } 3939 3940 void doIt() 3941 { 3942 scope(failure) 3943 { 3944 // If an exception is thrown, all threads should bail. 3945 atomicStore(shouldContinue, false); 3946 } 3947 3948 static if (hasLvalueElements!R) 3949 { 3950 alias Temp = ElementType!R*[]; 3951 Temp temp; 3952 3953 // Returns: The previous value of nPopped. 3954 size_t makeTemp() 3955 { 3956 import std.algorithm.internal : addressOf; 3957 import std.array : uninitializedArray; 3958 3959 if (temp is null) 3960 { 3961 temp = uninitializedArray!Temp(workUnitSize); 3962 } 3963 3964 rangeMutex.lock(); 3965 scope(exit) rangeMutex.unlock(); 3966 3967 size_t i = 0; 3968 for (; i < workUnitSize && !range.empty; range.popFront(), i++) 3969 { 3970 temp[i] = addressOf(range.front); 3971 } 3972 3973 temp = temp[0 .. i]; 3974 auto ret = nPopped; 3975 nPopped += temp.length; 3976 return ret; 3977 } 3978 3979 } 3980 else 3981 { 3982 3983 alias Temp = ElementType!R[]; 3984 Temp temp; 3985 3986 // Returns: The previous value of nPopped. 3987 static if (!bufferTrick) size_t makeTemp() 3988 { 3989 import std.array : uninitializedArray; 3990 3991 if (temp is null) 3992 { 3993 temp = uninitializedArray!Temp(workUnitSize); 3994 } 3995 3996 rangeMutex.lock(); 3997 scope(exit) rangeMutex.unlock(); 3998 3999 size_t i = 0; 4000 for (; i < workUnitSize && !range.empty; range.popFront(), i++) 4001 { 4002 temp[i] = range.front; 4003 } 4004 4005 temp = temp[0 .. i]; 4006 auto ret = nPopped; 4007 nPopped += temp.length; 4008 return ret; 4009 } 4010 4011 static if (bufferTrick) size_t makeTemp() 4012 { 4013 import std.algorithm.mutation : swap; 4014 rangeMutex.lock(); 4015 scope(exit) rangeMutex.unlock(); 4016 4017 // Elide copying by just swapping buffers. 
4018 temp.length = range.buf1.length; 4019 swap(range.buf1, temp); 4020 4021 // This is necessary in case popFront() has been called on 4022 // range before entering the parallel foreach loop. 4023 temp = temp[range.bufPos..$]; 4024 4025 static if (is(typeof(range._length))) 4026 { 4027 range._length -= (temp.length - range.bufPos); 4028 } 4029 4030 range.doBufSwap(); 4031 auto ret = nPopped; 4032 nPopped += temp.length; 4033 return ret; 4034 } 4035 } 4036 4037 while (atomicLoad(shouldContinue)) 4038 { 4039 auto overallIndex = makeTemp(); 4040 if (temp.empty) 4041 { 4042 atomicStore(shouldContinue, false); 4043 break; 4044 } 4045 4046 foreach (i; 0 .. temp.length) 4047 { 4048 scope(success) overallIndex++; 4049 4050 static if (hasLvalueElements!R) 4051 { 4052 static if (withIndex) 4053 { 4054 if (dg(overallIndex, *temp[i])) foreachErr(); 4055 } 4056 else 4057 { 4058 if (dg(*temp[i])) foreachErr(); 4059 } 4060 } 4061 else 4062 { 4063 static if (withIndex) 4064 { 4065 if (dg(overallIndex, temp[i])) foreachErr(); 4066 } 4067 else 4068 { 4069 if (dg(temp[i])) foreachErr(); 4070 } 4071 } 4072 } 4073 } 4074 } 4075 4076 submitAndExecute(pool, &doIt); 4077 4078 return 0; 4079 }; 4080 4081 4082 private struct ParallelForeach(R) 4083 { 4084 TaskPool pool; 4085 R range; 4086 size_t workUnitSize; 4087 alias E = ElementType!R; 4088 4089 static if (hasLvalueElements!R) 4090 { 4091 alias NoIndexDg = int delegate(ref E); 4092 alias IndexDg = int delegate(size_t, ref E); 4093 } 4094 else 4095 { 4096 alias NoIndexDg = int delegate(E); 4097 alias IndexDg = int delegate(size_t, E); 4098 } 4099 4100 int opApply(scope NoIndexDg dg) 4101 { 4102 static if (randLen!R) 4103 { 4104 mixin(parallelApplyMixinRandomAccess); 4105 } 4106 else 4107 { 4108 mixin(parallelApplyMixinInputRange); 4109 } 4110 } 4111 4112 int opApply(scope IndexDg dg) 4113 { 4114 static if (randLen!R) 4115 { 4116 mixin(parallelApplyMixinRandomAccess); 4117 } 4118 else 4119 { 4120 mixin(parallelApplyMixinInputRange); 4121 } 4122 } 4123 } 4124 4125 /* 4126 This struct collects the output of a callable that writes its data to a 4127 user-supplied buffer, cycling through a set of buffers of some fixed size. It allows these 4128 buffers to be accessed with an input range interface. This is used internally 4129 in the buffer-recycling overload of TaskPool.asyncBuf, which creates an 4130 instance and forwards it to the input range overload of asyncBuf. 4131 */ 4132 private struct RoundRobinBuffer(C1, C2) 4133 { 4134 // No need for constraints because they're already checked for in asyncBuf.
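// front() lazily fills the current buffer via prime(); popFront() advances round-robin, so a buffer's contents remain valid only until the cycle reaches that buffer again.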
/*
This struct buffers the output of a callable that writes its data into a
user-supplied buffer, cycling through a set of fixed-size buffers in round
robin and exposing them through an input range interface. It is used
internally by the buffer-recycling overload of TaskPool.asyncBuf, which
creates an instance and forwards it to the input range overload of asyncBuf.
*/
private struct RoundRobinBuffer(C1, C2)
{
    // No need for constraints because they're already checked for in asyncBuf.

    alias Array = Parameters!(C1.init)[0];
    alias T = typeof(Array.init[0]);

    T[][] bufs;
    size_t index;
    C1 nextDel;
    C2 emptyDel;
    bool _empty;
    bool primed;

    this(
        C1 nextDel,
        C2 emptyDel,
        size_t initialBufSize,
        size_t nBuffers
    ) {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;
        bufs.length = nBuffers;

        foreach (ref buf; bufs)
        {
            buf.length = initialBufSize;
        }
    }

    void prime()
    in
    {
        assert(!empty);
    }
    do
    {
        scope(success) primed = true;
        nextDel(bufs[index]);
    }

    T[] front() @property
    in
    {
        assert(!empty);
    }
    do
    {
        if (!primed) prime();
        return bufs[index];
    }

    void popFront()
    {
        if (empty || emptyDel())
        {
            _empty = true;
            return;
        }

        index = (index + 1) % bufs.length;
        primed = false;
    }

    bool empty() @property const @safe pure nothrow
    {
        return _empty;
    }
}
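/*
A minimal added sketch of the pattern RoundRobinBuffer exists to support,
using the public buffer-recycling overload of asyncBuf. The delegate names and
the counter below are illustrative only: `next` fills a caller-owned buffer
(and must size it, since buffers are recycled), `empty` signals the end of the
stream, and the returned range hands out the recycled buffers one at a time.
*/
@system unittest
{
    int counter;

    // Fills a caller-owned buffer with the next chunk of data.
    void next(ref int[] buf)
    {
        buf.length = 4;
        foreach (ref e; buf) e = counter++;
    }

    bool empty()
    {
        return counter >= 8;
    }

    int[] all;
    foreach (chunk; taskPool.asyncBuf(&next, &empty))
    {
        // Copy the chunk out; its underlying buffer is eventually recycled.
        all ~= chunk;
    }
    assert(all == [0, 1, 2, 3, 4, 5, 6, 7]);
}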
version (StdUnittest)
{
    // This was the only way I could get nested maps to work.
    private __gshared TaskPool poolInstance;
}

// These test basic functionality but don't stress test for threading bugs.
// These are the tests that should be run every time Phobos is compiled.
@system unittest
{
    import std.algorithm.comparison : equal, min, max;
    import std.algorithm.iteration : filter, map, reduce;
    import std.array : split;
    import std.conv : text;
    import std.exception : assertThrown;
    import std.math.operations : isClose;
    import std.math.algebraic : sqrt, abs;
    import std.math.exponential : log;
    import std.range : indexed, iota, join;
    import std.typecons : Tuple, tuple;
    import std.stdio;

    poolInstance = new TaskPool(2);
    scope(exit) poolInstance.stop();

    // The only way this can be verified is manually.
    debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs);

    auto oldPriority = poolInstance.priority;
    poolInstance.priority = Thread.PRIORITY_MAX;
    assert(poolInstance.priority == Thread.PRIORITY_MAX);

    poolInstance.priority = Thread.PRIORITY_MIN;
    assert(poolInstance.priority == Thread.PRIORITY_MIN);

    poolInstance.priority = oldPriority;
    assert(poolInstance.priority == oldPriority);

    static void refFun(ref uint num)
    {
        num++;
    }

    uint x;

    // Test task().
    auto t = task!refFun(x);
    poolInstance.put(t);
    t.yieldForce;
    assert(t.args[0] == 1);

    auto t2 = task(&refFun, x);
    poolInstance.put(t2);
    t2.yieldForce;
    assert(t2.args[0] == 1);

    // Test scopedTask().
    auto st = scopedTask!refFun(x);
    poolInstance.put(st);
    st.yieldForce;
    assert(st.args[0] == 1);

    auto st2 = scopedTask(&refFun, x);
    poolInstance.put(st2);
    st2.yieldForce;
    assert(st2.args[0] == 1);

    // Test executeInNewThread().
    auto ct = scopedTask!refFun(x);
    ct.executeInNewThread(Thread.PRIORITY_MAX);
    ct.yieldForce;
    assert(ct.args[0] == 1);

    // Test ref return.
    uint toInc = 0;
    static ref T makeRef(T)(ref T num)
    {
        return num;
    }

    auto t3 = task!makeRef(toInc);
    taskPool.put(t3);
    assert(t3.args[0] == 0);
    t3.spinForce++;
    assert(t3.args[0] == 1);

    static void testSafe() @safe {
        static int bump(int num)
        {
            return num + 1;
        }

        auto safePool = new TaskPool(0);
        auto t = task(&bump, 1);
        taskPool.put(t);
        assert(t.yieldForce == 2);

        auto st = scopedTask(&bump, 1);
        taskPool.put(st);
        assert(st.yieldForce == 2);
        safePool.stop();
    }

    auto arr = [1,2,3,4,5];
    auto nums = new uint[5];
    auto nums2 = new uint[5];

    foreach (i, ref elem; poolInstance.parallel(arr))
    {
        elem++;
        nums[i] = cast(uint) i + 2;
        nums2[i] = elem;
    }

    assert(nums == [2,3,4,5,6], text(nums));
    assert(nums2 == nums, text(nums2));
    assert(arr == nums, text(arr));

    // Test const/immutable arguments.
    static int add(int lhs, int rhs)
    {
        return lhs + rhs;
    }
    immutable addLhs = 1;
    immutable addRhs = 2;
    auto addTask = task(&add, addLhs, addRhs);
    auto addScopedTask = scopedTask(&add, addLhs, addRhs);
    poolInstance.put(addTask);
    poolInstance.put(addScopedTask);
    assert(addTask.yieldForce == 3);
    assert(addScopedTask.yieldForce == 3);

    // Test parallel foreach with a non-random-access range.
    auto range = filter!"a != 666"([0, 1, 2, 3, 4]);

    foreach (i, elem; poolInstance.parallel(range))
    {
        nums[i] = cast(uint) i;
    }

    assert(nums == [0,1,2,3,4]);

    auto logs = new double[1_000_000];
    foreach (i, ref elem; poolInstance.parallel(logs))
    {
        elem = log(i + 1.0);
    }

    foreach (i, elem; logs)
    {
        assert(isClose(elem, log(double(i + 1))));
    }

    assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]);
    assert(poolInstance.amap!("a * a", "-a")([1,2,3]) ==
        [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    auto tupleBuf = new Tuple!(int, int)[3];
    poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
    poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf);
    assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);

    // Test amap with a non-array buffer.
    auto toIndex = new int[5];
    auto ind = indexed(toIndex, [3, 1, 4, 0, 2]);
    poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind);
    assert(equal(ind, [2, 4, 6, 8, 10]));
    assert(equal(toIndex, [8, 4, 10, 2, 6]));
    poolInstance.amap!"a / 2"(ind, ind);
    assert(equal(ind, [1, 2, 3, 4, 5]));
    assert(equal(toIndex, [4, 2, 5, 1, 3]));

    auto buf = new int[5];
    poolInstance.amap!"a * a"([1,2,3,4,5], buf);
    assert(buf == [1,4,9,16,25]);
    poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf);
    assert(buf == [1,4,9,16,25]);

    assert(poolInstance.reduce!"a + b"([1]) == 1);
    assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10);
    assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10);
    assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
    assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) ==
        tuple(10, 24));

    immutable serialAns = reduce!"a + b"(iota(1000));
    assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns);
    assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns);

    // Test worker-local storage.
    auto wl = poolInstance.workerLocalStorage(0);
    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        wl.get = wl.get + i;
    }

    auto wlRange = wl.toRange;
    auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
    assert(parallelSum == 499500);
    assert(wlRange[0 .. 1][0] == wlRange[0]);
    assert(wlRange[1 .. 2][0] == wlRange[1]);

    // Test finish().
    {
        static void slowFun() { Thread.sleep(dur!"msecs"(1)); }

        auto pool1 = new TaskPool();
        auto tSlow = task!slowFun();
        pool1.put(tSlow);
        pool1.finish();
        tSlow.yieldForce;
        // Can't assert that pool1.status == PoolState.stopNow because status
        // doesn't change until after the "done" flag is set and the waiting
        // thread is woken up.

        auto pool2 = new TaskPool();
        auto tSlow2 = task!slowFun();
        pool2.put(tSlow2);
        pool2.finish(true); // blocking
        assert(tSlow2.done);

        // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero.
        auto pool3 = new TaskPool(0);
        auto tSlow3 = task!slowFun();
        pool3.put(tSlow3);
        pool3.finish(true); // blocking
        assert(tSlow3.done);

        // This is correct because no thread will terminate unless pool2.status
        // and pool3.status have already been set to stopNow.
        assert(pool2.status == TaskPool.PoolState.stopNow);
        assert(pool3.status == TaskPool.PoolState.stopNow);
    }

    // Test the default pool.
    assert(taskPool.size == totalCPUs - 1);

    nums = new uint[1000];
    foreach (i; parallel(iota(1000)))
    {
        nums[i] = cast(uint) i;
    }
    assert(equal(nums, iota(1000)));

    assert(equal(
        poolInstance.map!"a * a"(iota(3_000_001), 10_000),
        map!"a * a"(iota(3_000_001))
    ));

    // The filter is to kill random access and test the non-random access
    // branch.
    assert(equal(
        poolInstance.map!"a * a"(
            filter!"a == a"(iota(3_000_001)), 10_000, 1000),
        map!"a * a"(iota(3_000_001))
    ));

    assert(
        reduce!"a + b"(0UL,
            poolInstance.map!"a * a"(iota(300_001), 10_000)
        ) ==
        reduce!"a + b"(0UL,
            map!"a * a"(iota(300_001))
        )
    );

    assert(equal(
        iota(1_000_002),
        poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
    ));

    {
        import std.conv : to;
        import std.file : deleteme;

        string temp_file = deleteme ~ "-tempDelMe.txt";
        auto file = File(temp_file, "wb");
        scope(exit)
        {
            file.close();
            import std.file;
            remove(temp_file);
        }

        auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
        foreach (row; written)
        {
            file.writeln(join(to!(string[])(row), "\t"));
        }

        file = File(temp_file);

        void next(ref char[] buf)
        {
            file.readln(buf);
            import std.string : chomp;
            buf = chomp(buf);
        }

        double[][] read;
        auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

        foreach (line; asyncReader)
        {
            if (line.length == 0) continue;
            auto ls = line.split("\t");
            read ~= to!(double[])(ls);
        }

        assert(read == written);
        file.close();
    }

    // Test Map/AsyncBuf chaining.

    auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100);
    auto temp = poolInstance.map!sqrt(abuf, 100, 5);
    auto lmchain = poolInstance.map!"a * a"(temp, 100, 5);
    lmchain.popFront();

    int ii;
    foreach (elem; lmchain)
    {
        if (!isClose(elem, ii))
        {
            stderr.writeln(ii, '\t', elem);
        }
        ii++;
    }

    // Test the buffer trick in parallel foreach.
    abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100);
    abuf.popFront();
    auto bufTrickTest = new size_t[abuf.length];
    foreach (i, elem; parallel(abuf))
    {
        bufTrickTest[i] = i;
    }

    assert(equal(iota(1_000_000), bufTrickTest));

    auto myTask = task!(abs)(-1);
    taskPool.put(myTask);
    assert(myTask.spinForce == 1);

    // Test that a thread belonging to one pool gets a workerIndex of 0 when
    // the index is queried with respect to a different pool. This can only
    // be checked non-deterministically.
    foreach (i; parallel(iota(1000), 1))
    {
        assert(poolInstance.workerIndex == 0);
    }

    foreach (i; poolInstance.parallel(iota(1000), 1))
    {
        assert(taskPool.workerIndex == 0);
    }

    // Test exception handling.
    static void parallelForeachThrow()
    {
        foreach (elem; parallel(iota(10)))
        {
            throw new Exception("");
        }
    }

    assertThrown!Exception(parallelForeachThrow());

    static int reduceException(int a, int b)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.reduce!reduceException(iota(3))
    );

    static int mapException(int a)
    {
        throw new Exception("");
    }

    assertThrown!Exception(
        poolInstance.amap!mapException(iota(3))
    );

    static void mapThrow()
    {
        auto m = poolInstance.map!mapException(iota(3));
        m.popFront();
    }

    assertThrown!Exception(mapThrow());
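    // When more than one worker throws, the extra exceptions are chained to
    // the first via Throwable.next. A minimal added sketch of walking that
    // chain (the empty messages here are illustrative only):
    try
    {
        foreach (elem; poolInstance.parallel(iota(10)))
        {
            throw new Exception("");
        }
    }
    catch (Exception e)
    {
        // At least one exception propagates; any others hang off e.next.
        for (Throwable t = e; t !is null; t = t.next) assert(t.msg == "");
    }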
    struct ThrowingRange
    {
        @property int front()
        {
            return 1;
        }
        void popFront()
        {
            throw new Exception("");
        }
        enum bool empty = false;
    }

    assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init));
}

//version = parallelismStressTest;

// These are more like stress tests than real unit tests. They print out
// tons of stuff and should not be run every time `make unittest` is run.
version (parallelismStressTest)
{
    @system unittest
    {
        import std.stdio : stderr, writeln, readln;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;

        size_t attempt;
        for (; attempt < 10; attempt++)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            uint[] numbers = new uint[1_000];

            foreach (i; poolInstance.parallel(iota(0, numbers.length)))
            {
                numbers[i] = cast(uint) i;
            }

            // Make sure it works.
            foreach (i; 0 .. numbers.length)
            {
                assert(numbers[i] == i);
            }

            stderr.writeln("Done creating nums.");

            auto myNumbers = filter!"a % 7 > 0"(iota(0, 1000));
            foreach (num; poolInstance.parallel(myNumbers))
            {
                assert(num % 7 > 0 && num < 1000);
            }
            stderr.writeln("Done modulus test.");

            uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
            assert(squares.length == numbers.length);
            foreach (i, number; numbers)
            {
                assert(squares[i] == number * number);
            }
            stderr.writeln("Done squares.");

            auto sumFuture = task!(reduce!"a + b")(numbers);
            poolInstance.put(sumFuture);

            ulong sumSquares = 0;
            foreach (elem; numbers)
            {
                sumSquares += elem * elem;
            }

            uint mySum = sumFuture.spinForce();
            assert(mySum == 999 * 1000 / 2);

            auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
            assert(mySum == mySumParallel);
            stderr.writeln("Done sums.");

            auto myTask = task(
            {
                synchronized writeln("Our lives are parallel...Our lives are parallel.");
            });
            poolInstance.put(myTask);

            auto nestedOuter = "abcd";
            auto nestedInner = iota(0, 10, 2);

            foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
            {
                foreach (j, number; poolInstance.parallel(nestedInner, 1))
                {
                    synchronized writeln(i, ": ", letter, " ", j, ": ", number);
                }
            }

            poolInstance.stop();
        }

        assert(attempt == 10);
        writeln("Press enter to go to next round of unittests.");
        readln();
    }

    // These unittests are intended more for actual testing than as examples.
    @system unittest
    {
        import std.stdio : stderr;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;
        import std.math.algebraic : sqrt;
        import std.math.operations : isClose;
        import std.math.traits : isNaN;
        import std.conv : text;

        foreach (attempt; 0 .. 10)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            // Test indexing.
            stderr.writeln("Creator Raw Index: ", poolInstance.threadIndex);
            assert(poolInstance.workerIndex() == 0);

            // Test worker-local storage.
            auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
            foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
            {
                workerLocalStorage.get++;
            }
            assert(reduce!"a + b"(workerLocalStorage.toRange) ==
                1_000_000 + poolInstance.size + 1);

            // Make sure work is reasonably balanced among threads. This test
            // is non-deterministic and is more of a sanity check than
            // something that has an absolute pass/fail.
            shared(uint)[void*] nJobsByThread;
            foreach (thread; poolInstance.pool)
            {
                nJobsByThread[cast(void*) thread] = 0;
            }
            nJobsByThread[cast(void*) Thread.getThis()] = 0;

            foreach (i; poolInstance.parallel(iota(0, 1_000_000), 100))
            {
                atomicOp!"+="(nJobsByThread[cast(void*) Thread.getThis()], 1);
            }

            stderr.writeln("\nCurrent thread is: ",
                cast(void*) Thread.getThis());
            stderr.writeln("Workload distribution: ");
            foreach (k, v; nJobsByThread)
            {
                stderr.writeln(k, '\t', v);
            }

            // Test whether amap can be nested.
            real[][] matrix = new real[][](1000, 1000);
            foreach (i; poolInstance.parallel(iota(0, matrix.length)))
            {
                foreach (j; poolInstance.parallel(iota(0, matrix[0].length)))
                {
                    matrix[i][j] = i * j;
                }
            }

            // Get around weird bugs having to do with sqrt being an intrinsic:
            static real mySqrt(real num)
            {
                return sqrt(num);
            }

            static real[] parallelSqrt(real[] nums)
            {
                return poolInstance.amap!mySqrt(nums);
            }

            real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);

            foreach (i, row; sqrtMatrix)
            {
                foreach (j, elem; row)
                {
                    real shouldBe = sqrt(cast(real) i * j);
                    assert(isClose(shouldBe, elem));
                    sqrtMatrix[i][j] = shouldBe;
                }
            }

            auto saySuccess = task(
            {
                stderr.writeln(
                    "Success doing matrix stuff that involves nested pool use.");
            });
            poolInstance.put(saySuccess);
            saySuccess.workForce();

            // A more thorough test of amap and reduce: find the sum of the
            // square roots of matrix.

            static real parallelSum(real[] input)
            {
                return poolInstance.reduce!"a + b"(input);
            }

            auto sumSqrt = poolInstance.reduce!"a + b"(
                poolInstance.amap!parallelSum(
                    sqrtMatrix
                )
            );

            assert(isClose(sumSqrt, 4.437e8, 1e-2));
            stderr.writeln("Done sum of square roots.");

            // Test whether tasks work with function pointers.
            /+ // This part is buggy and needs to be fixed...
            auto nanTask = task(&isNaN, 1.0L);
            poolInstance.put(nanTask);
            assert(nanTask.spinForce == false);

            if (poolInstance.size > 0)
            {
                // Test work waiting.
                static void uselessFun()
                {
                    foreach (i; 0 .. 1_000_000) {}
                }

                auto uselessTasks = new typeof(task(&uselessFun))[1000];
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask = task(&uselessFun);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    poolInstance.put(uselessTask);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask.workForce();
                }
            }
            +/

            // Test the case of non-random access + ref returns.
            int[] nums = [1,2,3,4,5];
            static struct RemoveRandom
            {
                int[] arr;

                ref int front()
                {
                    return arr.front;
                }
                void popFront()
                {
                    arr.popFront();
                }
                bool empty()
                {
                    return arr.empty;
                }
            }

            auto refRange = RemoveRandom(nums);
            foreach (ref elem; poolInstance.parallel(refRange))
            {
                elem++;
            }
            assert(nums == [2,3,4,5,6], text(nums));
            stderr.writeln("Nums: ", nums);

            poolInstance.stop();
        }
    }
}

@system unittest
{
    static struct __S_12733
    {
        invariant() { assert(checksum == 1_234_567_890); }
        this(ulong u){n = u;}
        void opAssign(__S_12733 s){this.n = s.n;}
        ulong n;
        ulong checksum = 1_234_567_890;
    }

    static auto __genPair_12733(ulong n) { return __S_12733(n); }
    immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ];

    auto result = taskPool.amap!__genPair_12733(data);
}

@safe unittest
{
    import std.range : iota;

    // This test was in std.range, but caused import cycles.
    assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} }));
}

@safe unittest
{
    import std.algorithm.iteration : each;

    long[] arr;
    static assert(is(typeof({
        arr.parallel.each!"a++";
    })));
}

// https://issues.dlang.org/show_bug.cgi?id=17539
@system unittest
{
    import std.random : rndGen;
    // Ensure this compiles.
    try foreach (rnd; rndGen.parallel) break;
    catch (ParallelForeachError e) {}
}