The OpenD Programming Language

TaskPool

This class encapsulates a task queue and a set of worker threads. Its purpose is to efficiently map a large number of Tasks onto a smaller number of threads. A task queue is a FIFO queue of Task objects that have been submitted to the TaskPool and are awaiting execution. A worker thread is a thread that executes the Task at the front of the queue when one is available and sleeps when the queue is empty.

This class is usually used through the global instance returned by the std.parallelism.taskPool property. Occasionally it is useful to instantiate a TaskPool explicitly:

1. When you want TaskPool instances with multiple priorities, for example a low priority pool and a high priority pool.

2. When the threads in the global task pool are waiting on a synchronization primitive (for example a mutex), and you want to parallelize the code that needs to run before these threads can be resumed.

Note: The worker threads in this pool will not stop until stop or finish is called, even if the main thread has finished already. This may lead to programs that never end. If you do not want this behaviour, you can set isDaemon to true.

Constructors

this
this()

Default constructor that initializes a TaskPool with totalCPUs - 1 worker threads. The minus 1 is included because the main thread will also be available to do work.

this
this(size_t nWorkers)

Constructs a TaskPool with nWorkers worker threads instead of the default totalCPUs - 1.
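As a minimal sketch of the two constructors, the snippet below builds one pool of each kind and reads back its size. The helper name poolSizes is hypothetical, and calling finish(true) at scope exit is a choice made here so that the non-daemon workers do not keep the program alive.

```d
import std.parallelism : TaskPool, totalCPUs;

// Hypothetical helper: returns the sizes of a default-constructed pool
// and of a pool created with nWorkers threads.
size_t[2] poolSizes(size_t nWorkers)
{
    auto defaultPool = new TaskPool();          // totalCPUs - 1 workers
    scope (exit) defaultPool.finish(true);

    auto customPool = new TaskPool(nWorkers);   // exactly nWorkers workers
    scope (exit) customPool.finish(true);

    return [defaultPool.size, customPool.size];
}

void main()
{
    auto sizes = poolSizes(3);
    assert(sizes[0] == totalCPUs - 1);
    assert(sizes[1] == 3);
}
```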

Members

Functions

asyncBuf
auto asyncBuf(S source, size_t bufSize)

Given a source range that is expensive to iterate over, returns an input range that asynchronously buffers the contents of source into a buffer of bufSize elements in a worker thread, while making previously buffered elements from a second buffer, also of size bufSize, available via the range interface of the returned object. The returned range has a length iff hasLength!S. asyncBuf is useful, for example, when performing expensive operations on the elements of ranges that represent data on a disk or network.
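The range overload can be sketched as follows. The function slowSquare is a hypothetical stand-in for an expensive per-element operation such as a disk or network read; the buffer size of 16 is arbitrary.

```d
import std.algorithm.iteration : map;
import std.parallelism : taskPool;
import std.range : iota;

// Hypothetical stand-in for an expensive per-element computation.
int slowSquare(int x) { return x * x; }

// Sums the squares of 1 .. 100 while a worker thread fills the next buffer.
int sumBuffered()
{
    auto source = iota(1, 101).map!slowSquare;
    // Buffer 16 elements at a time in a worker thread.
    auto buffered = taskPool.asyncBuf(source, 16);

    int total = 0;
    foreach (v; buffered)
        total += v;
    return total;
}

void main()
{
    assert(sumBuffered() == 338_350);
}
```

The consuming loop only sees an ordinary input range; the buffering happens behind that interface.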

asyncBuf
auto asyncBuf(C1 next, C2 empty, size_t initialBufSize, size_t nBuffers)

Given a callable object next that writes to a user-provided buffer and a second callable object empty that determines whether more data is available to write via next, returns an input range that asynchronously calls next with a set of size nBuffers of buffers and makes the results available in the order they were obtained via the input range interface of the returned object. Similarly to the input range overload of asyncBuf, the first half of the buffers are made available via the range interface while the second half are filled and vice-versa.
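A hedged sketch of the callable overload, using in-memory strings instead of a file so that it is self-contained. The names collectChunks, pending, next and done are hypothetical; the sketch assumes, as in upstream Phobos, that next takes its buffer by ref and may resize it, and that each returned buffer is only valid until it is recycled, which is why every chunk is copied with idup.

```d
import std.parallelism : taskPool;

// Hypothetical helper: feeds three strings through asyncBuf and collects them.
string[] collectChunks()
{
    auto pending = ["alpha", "beta", "gamma"];
    size_t pos = 0;

    // Writes the next item into the buffer supplied by asyncBuf.
    void next(ref char[] buf)
    {
        auto s = pending[pos++];
        buf.length = s.length;
        buf[] = s[];
    }

    // Reports whether the source is exhausted.
    bool done() { return pos >= pending.length; }

    string[] result;
    // Initial buffer size 0, four buffers in rotation.
    foreach (chunk; taskPool.asyncBuf(&next, &done, 0, 4))
        result ~= chunk.idup;   // copy: the buffer will be reused
    return result;
}

void main()
{
    assert(collectChunks() == ["alpha", "beta", "gamma"]);
}
```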

finish
void finish(bool blocking)

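Signals worker threads to terminate when the queue becomes empty. If blocking is true, finish waits for all worker threads to terminate before returning. Calling finish with blocking set to true from a worker thread of the same TaskPool results in a deadlock.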
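The following sketch uses a pool as a simple fire-and-forget scheduler and drains it with finish. It assumes the upstream Phobos behaviour that finish(true) returns only after the queue is empty and the workers have terminated; the counter completed and the functions bump and runAndDrain are hypothetical names.

```d
import core.atomic : atomicLoad, atomicOp, atomicStore;
import std.parallelism : TaskPool, task;

shared int completed;   // incremented once by every task

void bump() { atomicOp!"+="(completed, 1); }

// Submits nTasks tasks, waits for the pool to drain, returns the count.
int runAndDrain(int nTasks)
{
    atomicStore(completed, 0);
    auto pool = new TaskPool(2);

    foreach (_; 0 .. nTasks)
        pool.put(task!bump());

    // Blocks until all queued tasks have run and the workers have exited.
    pool.finish(true);
    return atomicLoad(completed);
}

void main()
{
    assert(runAndDrain(50) == 50);
}
```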

isDaemon
void isDaemon(bool newVal)

Sets whether the worker threads are daemon threads. A daemon thread is automatically terminated when all non-daemon threads have terminated. A non-daemon thread prevents a program from terminating until it has itself terminated.

isDaemon
bool isDaemon()

Returns true if the worker threads are daemon threads, false otherwise. Daemon and non-daemon semantics are as described for the setter overload.
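A short sketch of the setter and getter used together on an explicitly created pool. The helper name makeDaemon is hypothetical.

```d
import std.parallelism : TaskPool;

// Hypothetical helper: marks the pool's workers as daemon threads
// and reads the flag back.
bool makeDaemon(TaskPool pool)
{
    pool.isDaemon = true;   // setter overload
    return pool.isDaemon;   // getter overload
}

void main()
{
    auto pool = new TaskPool(2);
    scope (exit) pool.finish(true);

    assert(makeDaemon(pool));
}
```

With the flag set, the workers no longer prevent the program from exiting once the main thread returns.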

parallel
ParallelForeach!R parallel(R range, size_t workUnitSize)
ParallelForeach!R parallel(R range)

Implements a parallel foreach loop over a range. This works by implicitly creating and submitting one Task to the TaskPool for each worker thread. A work unit is a set of consecutive elements of range to be processed by a worker thread between communication with any other thread. The number of elements processed per work unit is controlled by the workUnitSize parameter. Smaller work units provide better load balancing, but larger work units avoid the overhead of communicating with other threads frequently to fetch the next work unit. Large work units also avoid false sharing in cases where the range is being modified. The less time a single iteration of the loop takes, the larger workUnitSize should be. For very expensive loop bodies, workUnitSize should be 1. An overload that chooses a default work unit size is also available.
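A minimal sketch of a parallel foreach with an explicit work unit size. The loop body is cheap, so a work unit of 100 elements is used rather than 1; the function name parallelSquares is hypothetical.

```d
import std.parallelism : taskPool;

// Fills an array with the squares of its indices, 100 elements per work unit.
double[] parallelSquares(size_t n)
{
    auto squares = new double[n];
    foreach (i, ref s; taskPool.parallel(squares, 100))
        s = cast(double) i * i;
    return squares;
}

void main()
{
    auto sq = parallelSquares(10_000);
    assert(sq[3] == 9);
    assert(sq[9_999] == 99_980_001);
}
```

Each iteration writes only to its own element, so no synchronization is needed inside the loop body.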

priority
int priority()
void priority(int newPriority)

These functions allow getting and setting the OS scheduling priority of the worker threads in this TaskPool. They forward to core.thread.Thread.priority, so a given priority value here means the same thing as an identical priority value in core.thread.

put
void put(Task!(fun, Args) task)
void put(Task!(fun, Args)* task)

Put a Task object on the back of the task queue. The Task object may be passed by pointer or reference.
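The two overloads can be sketched as follows, assuming the std.parallelism helpers task (which returns a pointer to a heap-allocated Task) and scopedTask (which returns a Task by value). The functions add and sumViaBothOverloads are hypothetical.

```d
import std.parallelism : scopedTask, task, taskPool;

int add(int a, int b) { return a + b; }

// Submits one task by pointer and one by reference, then combines the results.
int sumViaBothOverloads()
{
    auto heapTask = task!add(1, 2);         // pointer to a heap-allocated Task
    taskPool.put(heapTask);                 // pointer overload

    auto stackTask = scopedTask!add(3, 4);  // Task stored on the stack
    taskPool.put(stackTask);                // reference overload

    // yieldForce waits for the task to finish and returns its result.
    return heapTask.yieldForce + stackTask.yieldForce;
}

void main()
{
    assert(sumViaBothOverloads() == 10);
}
```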

stop
void stop()

Signals to all worker threads to terminate as soon as they are finished with their current Task, or immediately if they are not executing a Task. Tasks that were in queue will not be executed unless a call to Task.workForce, Task.yieldForce or Task.spinForce causes them to be executed.

workerIndex
size_t workerIndex()

Gets the index of the current thread relative to this TaskPool. Any thread not in this pool will receive an index of 0. The worker threads in this pool receive unique indices of 1 through this.size.
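A small sketch that records workerIndex from inside a parallel loop and checks that every recorded value lies in the range 0 through size. The helper name indicesInRange is hypothetical; which thread handles which element is not deterministic, so only the range is checked.

```d
import std.parallelism : TaskPool;

// Returns true if every index seen in a parallel loop lies in 0 .. pool.size.
bool indicesInRange()
{
    auto pool = new TaskPool(3);
    scope (exit) pool.finish(true);

    // The calling thread is not a member of the pool.
    if (pool.workerIndex != 0)
        return false;

    auto seen = new size_t[100];
    foreach (i, ref s; pool.parallel(seen, 1))
        s = pool.workerIndex;   // 0 for the caller, 1 .. size for workers

    foreach (s; seen)
        if (s > pool.size)
            return false;
    return true;
}

void main()
{
    assert(indicesInRange());
}
```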

workerLocalStorage
WorkerLocalStorage!T workerLocalStorage(T initialVal)

Creates an instance of worker-local storage, initialized with a given value. The value is lazy so that you can, for example, easily create one instance of a class for each worker. For a usage example, see the WorkerLocalStorage struct.

Properties

size
size_t size [@property getter]

Returns the number of worker threads in the pool.

Structs

WorkerLocalStorage
struct WorkerLocalStorage(T)

Struct for creating worker-local storage. Worker-local storage is thread-local storage that exists only for worker threads in a given TaskPool plus a single thread outside the pool. It is allocated on the garbage collected heap in a way that avoids false sharing, and doesn't necessarily have global scope within any thread. It can be accessed from any worker thread in the TaskPool that created it, and one thread outside this TaskPool. All threads outside the pool that created a given instance of worker-local storage share a single slot.

WorkerLocalStorageRange
struct WorkerLocalStorageRange(T)

Range primitives for worker-local storage. The purpose of this is to access results produced by each worker thread from a single thread once you are no longer using the worker-local storage from multiple threads. Do not use this struct in the parallel portion of your algorithm.
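A hedged sketch of the usual pattern: accumulate into worker-local storage inside a parallel loop, then combine the per-worker results from a single thread. It assumes the get and toRange members of WorkerLocalStorage as found in upstream Phobos, which are not listed on this page; the function name parallelSum is hypothetical.

```d
import std.parallelism : taskPool;
import std.range : iota;

// Sums 0 .. n - 1 using one accumulator per worker thread.
long parallelSum(int n)
{
    // One long slot per worker, each starting at 0.
    auto partial = taskPool.workerLocalStorage(0L);

    foreach (i; taskPool.parallel(iota(n), 1_000))
        partial.get += i;       // touches only the current thread's slot

    // Sequential part: combine the slots once the parallel loop is done.
    long total = 0;
    foreach (p; partial.toRange)
        total += p;
    return total;
}

void main()
{
    assert(parallelSum(10_001) == 50_005_000);
}
```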

Templates

amap
template amap(functions...)
fold
template fold(functions...)
map
template map(functions...)
reduce
template reduce(functions...)
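This page lists the templates without descriptions. As a sketch based on upstream Phobos, amap eagerly maps a function over a random-access range in parallel and returns an array, and reduce combines the elements of a range in parallel with an associative function. The names square and sumOfSquares are hypothetical.

```d
import std.parallelism : taskPool;
import std.range : iota;

int square(int x) { return x * x; }

// Computes 1^2 + 2^2 + ... + n^2 with a parallel map followed by a parallel reduce.
int sumOfSquares(int n)
{
    // Eager parallel map: the result is an int[].
    auto squares = taskPool.amap!square(iota(1, n + 1));

    // Parallel reduction with an explicit seed of 0.
    return taskPool.reduce!"a + b"(0, squares);
}

void main()
{
    assert(sumOfSquares(10) == 385);
}
```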
