The OpenD Programming Language

1 /**
2  * The event module provides a primitive for lightweight signaling of other threads
3  * (emulating Windows events on Posix)
4  *
5  * Copyright: Copyright (c) 2019 D Language Foundation
6  * License: Distributed under the
7  *    $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
8  *    (See accompanying file LICENSE)
9  * Authors: Rainer Schuetze
10  * Source:    $(DRUNTIMESRC core/sync/event.d)
11  */
12 module core.sync.event;
13 
14 import core.time;
15 
16 import rt.sys.config;
17 
18 /**
19  * represents an event. Clients of an event are suspended while waiting
20  * for the event to be "signaled".
21  *
22  * Implemented using `pthread_mutex` and `pthread_condition` on Posix and
23  * `CreateEvent` and `SetEvent` on Windows.
24 ---
25 import core.sync.event, core.thread, std.file;
26 
27 struct ProcessFile
28 {
29     ThreadGroup group;
30     Event event;
31     void[] buffer;
32 
33     void doProcess()
34     {
35         event.wait();
36         // process buffer
37     }
38 
39     void process(string filename)
40     {
41         event.initialize(true, false);
42         group = new ThreadGroup;
43         for (int i = 0; i < 10; ++i)
44             group.create(&doProcess);
45 
46         buffer = std.file.read(filename);
47         event.setIfInitialized();
48         group.joinAll();
49         event.terminate();
50     }
51 }
52 ---
53  */
54 struct Event
55 {
56 nothrow @nogc:
57     /**
58      * Creates an event object.
59      *
60      * Params:
61      *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
62      *  initialState = initial state of the signal
63      */
64     this(bool manualReset, bool initialState)
65     {
66         initialize(manualReset, initialState);
67     }
68 
69     /**
70      * Initializes an event object. Does nothing if the event is already initialized.
71      *
72      * Params:
73      *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
74      *  initialState = initial state of the signal
75      */
76     void initialize(bool manualReset, bool initialState)
77     {
78         osEvent.create(manualReset, initialState);
79     }
80 
81     // copying not allowed, can produce resource leaks
82     @disable this(this);
83     @disable void opAssign(Event);
84 
85     ~this()
86     {
87         terminate();
88     }
89 
90     /**
91      * deinitialize event. Does nothing if the event is not initialized. There must not be
92      * threads currently waiting for the event to be signaled.
93     */
94     void terminate()
95     {
96         osEvent.destroy();
97     }
98 
99     void set()
100     {
101         setIfInitialized();
102     }
103 
104     /// Set the event to "signaled", so that waiting clients are resumed
105     void setIfInitialized()
106     {
107         osEvent.setIfInitialized();
108     }
109 
110     /// Reset the event manually
111     void reset()
112     {
113         osEvent.reset();
114     }
115 
116     /**
117      * Wait for the event to be signaled without timeout.
118      *
119      * Returns:
120      *  `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured
121      */
122     bool wait()
123     {
124         return osEvent.wait();
125     }
126 
127     /**
128      * Wait for the event to be signaled with timeout.
129      *
130      * Params:
131      *  tmout = the maximum time to wait
132      * Returns:
133      *  `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
134      *  the event is uninitialized or another error occured
135      */
136     bool wait(Duration tmout)
137     {
138         return osEvent.wait(tmout);
139     }
140 
141 private:
142     mixin("import " ~ osEventImport ~ ";");
143     OsEvent osEvent;
144 }
145 
146 // Test single-thread (non-shared) use.
147 @nogc nothrow unittest
148 {
149     // auto-reset, initial state false
150     Event ev1 = Event(false, false);
151     assert(!ev1.wait(1.dur!"msecs"));
152     ev1.setIfInitialized();
153     assert(ev1.wait());
154     assert(!ev1.wait(1.dur!"msecs"));
155 
156     // manual-reset, initial state true
157     Event ev2 = Event(true, true);
158     assert(ev2.wait());
159     assert(ev2.wait());
160     ev2.reset();
161     assert(!ev2.wait(1.dur!"msecs"));
162 }
163 
164 unittest
165 {
166     import core.thread, core.atomic;
167 
168     scope event      = new Event(true, false);
169     int  numThreads = 10;
170     shared int numRunning = 0;
171 
172     void testFn()
173     {
174         event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner
175         numRunning.atomicOp!"+="(1);
176     }
177 
178     auto group = new ThreadGroup;
179 
180     for (int i = 0; i < numThreads; ++i)
181         group.create(&testFn);
182 
183     auto start = MonoTime.currTime;
184     assert(numRunning == 0);
185 
186     event.setIfInitialized();
187     group.joinAll();
188 
189     assert(numRunning == numThreads);
190 
191     assert(MonoTime.currTime - start < 5.dur!"seconds");
192 }