The OpenD Programming Language

1 /**
2  * The event module provides a primitive for lightweight signaling of other threads
3  * (emulating Windows events on Posix)
4  *
5  * Copyright: Copyright (c) 2019 D Language Foundation
6  * License: Distributed under the
7  *    $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
8  *    (See accompanying file LICENSE)
9  * Authors: Rainer Schuetze
10  * Source:    $(DRUNTIMESRC core/sync/event.d)
11  */
12 module core.sync.event;
13 
14 version (Windows)
15 {
16     import core.sys.windows.basetsd /+: HANDLE +/;
17     import core.sys.windows.winerror /+: WAIT_TIMEOUT +/;
18     import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent,
19         WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/;
20 }
21 else version (Posix)
22 {
23     import core.sys.posix.pthread;
24     import core.sys.posix.sys.types;
25     import core.sys.posix.time;
26 }
27 else version (FreeStanding)
28 {
29 
30 }
31 else
32 {
33     static assert(false, "Platform not supported");
34 }
35 
36 import core.time;
37 import core.internal.abort : abort;
38 
/**
 * Represents an event. Clients of an event are suspended while waiting
 * for the event to be "signaled".
 *
 * Implemented using `pthread_mutex` and `pthread_cond` on Posix and
 * `CreateEvent` and `SetEvent` on Windows.
 *
 * An event is either manual-reset (it stays signaled until `reset` is called,
 * so every waiter is resumed) or auto-reset (a successful `wait` clears the
 * signaled state again, so each call to `setIfInitialized` resumes at most one
 * waiter).
---
import core.sync.event, core.thread, std.file;

struct ProcessFile
{
    ThreadGroup group;
    Event event;
    void[] buffer;

    void doProcess()
    {
        event.wait();
        // process buffer
    }

    void process(string filename)
    {
        event.initialize(true, false);
        group = new ThreadGroup;
        for (int i = 0; i < 10; ++i)
            group.create(&doProcess);

        buffer = std.file.read(filename);
        event.setIfInitialized();
        group.joinAll();
        event.terminate();
    }
}
---
 */
struct Event
{
nothrow @nogc:
    /**
     * Creates an event object.
     *
     * Params:
     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
     *  initialState = initial state of the signal
     */
    this(bool manualReset, bool initialState)
    {
        initialize(manualReset, initialState);
    }

    /**
     * Initializes an event object. Does nothing if the event is already initialized.
     *
     * Aborts the process if the underlying OS primitives cannot be created.
     *
     * Params:
     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
     *  initialState = initial state of the signal
     */
    void initialize(bool manualReset, bool initialState)
    {
        version (Windows)
        {
            if (m_event)
                return;
            m_event = CreateEvent(null, manualReset, initialState, null);
            m_event || abort("Error: CreateEvent failed.");
        }
        else version (Posix)
        {
            if (m_initalized)
                return;
            pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 ||
                abort("Error: pthread_mutex_init failed.");
            // Where the platform supports it, bind the condition variable to
            // the monotonic clock for timed waits.
            static if ( is( typeof( pthread_condattr_setclock ) ) )
            {
                pthread_condattr_t attr = void;
                pthread_condattr_init(&attr) == 0 ||
                    abort("Error: pthread_condattr_init failed.");
                pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0 ||
                    abort("Error: pthread_condattr_setclock failed.");
                pthread_cond_init(&m_cond, &attr) == 0 ||
                    abort("Error: pthread_cond_init failed.");
                pthread_condattr_destroy(&attr) == 0 ||
                    abort("Error: pthread_condattr_destroy failed.");
            }
            else
            {
                pthread_cond_init(&m_cond, null) == 0 ||
                    abort("Error: pthread_cond_init failed.");
            }
            m_state = initialState;
            m_manualReset = manualReset;
            m_initalized = true;
        }
    }

    // copying not allowed, can produce resource leaks
    @disable this(this);
    @disable void opAssign(Event);

    /// Releases the OS resources held by the event (see `terminate`).
    ~this()
    {
        terminate();
    }

    /**
     * Deinitializes the event. Does nothing if the event is not initialized. There must not be
     * threads currently waiting for the event to be signaled.
     */
    void terminate()
    {
        version (Windows)
        {
            if (m_event)
                CloseHandle(m_event);
            m_event = null;
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_destroy(&m_mutex) == 0 ||
                    abort("Error: pthread_mutex_destroy failed.");
                pthread_cond_destroy(&m_cond) == 0 ||
                    abort("Error: pthread_cond_destroy failed.");
                m_initalized = false;
            }
        }
    }

    /// Same as `setIfInitialized`: sets the event to "signaled" if it is initialized.
    void set()
    {
        setIfInitialized();
    }

    /**
     * Set the event to "signaled", so that waiting clients are resumed.
     * Does nothing if the event is not initialized.
     */
    void setIfInitialized()
    {
        version (Windows)
        {
            if (m_event)
                SetEvent(m_event);
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_lock(&m_mutex);
                m_state = true;
                // Wake every waiter; each one re-checks m_state under the
                // mutex, so an auto-reset event still releases only one.
                pthread_cond_broadcast(&m_cond);
                pthread_mutex_unlock(&m_mutex);
            }
        }
    }

    /// Reset the event manually to the non-signaled state. Does nothing if the event is not initialized.
    void reset()
    {
        version (Windows)
        {
            if (m_event)
                ResetEvent(m_event);
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_lock(&m_mutex);
                m_state = false;
                pthread_mutex_unlock(&m_mutex);
            }
        }
    }

    /**
     * Wait for the event to be signaled without timeout.
     *
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event is uninitialized or another error occurred
     */
    bool wait()
    {
        version (Windows)
        {
            return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0;
        }
        else version (Posix)
        {
            return wait(Duration.max);
        }
        else version (FreeStanding) assert(0);
    }

    /**
     * Wait for the event to be signaled with timeout.
     *
     * Params:
     *  tmout = the maximum time to wait; `Duration.max` waits without timeout
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
     *  the event is uninitialized or another error occurred
     */
    bool wait(Duration tmout)
    {
        version (Windows)
        {
            if (!m_event)
                return false;

            // A negative duration would wrap around when converted to the
            // unsigned millisecond count below (-1 ms becomes INFINITE), so
            // treat it as "do not block".
            if (tmout < Duration.zero)
                tmout = Duration.zero;

            auto maxWaitMillis = dur!("msecs")(uint.max - 1);

            // WaitForSingleObject takes a 32-bit millisecond count, so longer
            // timeouts are consumed in chunks.
            while (tmout > maxWaitMillis)
            {
                auto res = WaitForSingleObject(m_event, uint.max - 1);
                if (res != WAIT_TIMEOUT)
                    return res == WAIT_OBJECT_0;
                tmout -= maxWaitMillis;
            }
            auto ms = cast(uint)(tmout.total!"msecs");
            return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0;
        }
        else version (Posix)
        {
            if (!m_initalized)
                return false;

            pthread_mutex_lock(&m_mutex);

            // The condition wait is repeated until the event is actually
            // signaled (or the wait fails / times out). This guards against
            // spurious wakeups and, for auto-reset events, against another
            // woken waiter having already consumed the signal.
            int result = 0;
            if (tmout == Duration.max)
            {
                while (!m_state && result == 0)
                    result = pthread_cond_wait(&m_cond, &m_mutex);
            }
            else if (!m_state)
            {
                import core.sync.config;

                // The deadline is computed once so that repeated waits do not
                // extend the overall timeout.
                timespec t = void;
                mktspec(t, tmout);

                do
                    result = pthread_cond_timedwait(&m_cond, &m_mutex, &t);
                while (!m_state && result == 0);
            }

            // Here result == 0 implies m_state is set; an auto-reset event
            // consumes the signal on behalf of this waiter.
            if (result == 0 && !m_manualReset)
                m_state = false;

            pthread_mutex_unlock(&m_mutex);

            return result == 0;
        }
        else version (FreeStanding) assert(0);
    }

private:
    version (Windows)
    {
        HANDLE m_event;             // null while the event is not initialized
    }
    else version (Posix)
    {
        pthread_mutex_t m_mutex;    // protects m_state
        pthread_cond_t m_cond;      // signaled whenever m_state becomes true
        bool m_initalized;          // true between initialize() and terminate()
        bool m_state;               // true while the event is signaled
        bool m_manualReset;         // false: a successful wait clears m_state
    }
}
308 
// Exercise an Event from a single thread (no sharing between threads).
@nogc nothrow unittest
{
    const brief = 1.dur!"msecs";

    // Auto-reset event that starts out non-signaled: a wait times out until
    // the event is set, and one successful wait consumes the signal again.
    Event autoEv = Event(false, false);
    bool signaled = autoEv.wait(brief);
    assert(!signaled);
    autoEv.setIfInitialized();
    signaled = autoEv.wait();
    assert(signaled);
    signaled = autoEv.wait(brief);
    assert(!signaled);

    // Manual-reset event that starts out signaled: it stays signaled across
    // waits until reset() is called explicitly.
    Event manualEv = Event(true, true);
    signaled = manualEv.wait();
    assert(signaled);
    signaled = manualEv.wait();
    assert(signaled);
    manualEv.reset();
    signaled = manualEv.wait(brief);
    assert(!signaled);
}
326 
// Several threads block on one manual-reset event; a single set releases them all.
unittest
{
    import core.atomic, core.thread;

    scope signal = new Event(true, false);
    int workerCount = 10;
    shared int finished = 0;

    // Each worker blocks on the event and then records that it got past it.
    void worker()
    {
        signal.wait(dur!"seconds"(8)); // timeout below limit for druntime test_runner
        atomicOp!"+="(finished, 1);
    }

    auto workers = new ThreadGroup;

    foreach (i; 0 .. workerCount)
        workers.create(&worker);

    auto began = MonoTime.currTime;
    // nobody may have passed the event before it is set
    assert(finished == 0);

    signal.setIfInitialized();
    workers.joinAll();

    // every worker was released ...
    assert(finished == workerCount);

    // ... and by the signal rather than by the 8 second timeout
    auto elapsed = MonoTime.currTime - began;
    assert(elapsed < dur!"seconds"(5));
}