1 /** 2 * The event module provides a primitive for lightweight signaling of other threads 3 * (emulating Windows events on Posix) 4 * 5 * Copyright: Copyright (c) 2019 D Language Foundation 6 * License: Distributed under the 7 * $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0). 8 * (See accompanying file LICENSE) 9 * Authors: Rainer Schuetze 10 * Source: $(DRUNTIMESRC core/sync/event.d) 11 */ 12 module core.sync.event; 13 14 version (Windows) 15 { 16 import core.sys.windows.basetsd /+: HANDLE +/; 17 import core.sys.windows.winerror /+: WAIT_TIMEOUT +/; 18 import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent, 19 WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/; 20 } 21 else version (Posix) 22 { 23 import core.sys.posix.pthread; 24 import core.sys.posix.sys.types; 25 import core.sys.posix.time; 26 } 27 else version (FreeStanding) 28 { 29 30 } 31 else 32 { 33 static assert(false, "Platform not supported"); 34 } 35 36 import core.time; 37 import core.internal.abort : abort; 38 39 /** 40 * represents an event. Clients of an event are suspended while waiting 41 * for the event to be "signaled". 42 * 43 * Implemented using `pthread_mutex` and `pthread_condition` on Posix and 44 * `CreateEvent` and `SetEvent` on Windows. 45 --- 46 import core.sync.event, core.thread, std.file; 47 48 struct ProcessFile 49 { 50 ThreadGroup group; 51 Event event; 52 void[] buffer; 53 54 void doProcess() 55 { 56 event.wait(); 57 // process buffer 58 } 59 60 void process(string filename) 61 { 62 event.initialize(true, false); 63 group = new ThreadGroup; 64 for (int i = 0; i < 10; ++i) 65 group.create(&doProcess); 66 67 buffer = std.file.read(filename); 68 event.setIfInitialized(); 69 group.joinAll(); 70 event.terminate(); 71 } 72 } 73 --- 74 */ 75 struct Event 76 { 77 nothrow @nogc: 78 /** 79 * Creates an event object. 80 * 81 * Params: 82 * manualReset = the state of the event is not reset automatically after resuming waiting clients 83 * initialState = initial state of the signal 84 */ 85 this(bool manualReset, bool initialState) 86 { 87 initialize(manualReset, initialState); 88 } 89 90 /** 91 * Initializes an event object. Does nothing if the event is already initialized. 92 * 93 * Params: 94 * manualReset = the state of the event is not reset automatically after resuming waiting clients 95 * initialState = initial state of the signal 96 */ 97 void initialize(bool manualReset, bool initialState) 98 { 99 version (Windows) 100 { 101 if (m_event) 102 return; 103 m_event = CreateEvent(null, manualReset, initialState, null); 104 m_event || abort("Error: CreateEvent failed."); 105 } 106 else version (Posix) 107 { 108 if (m_initalized) 109 return; 110 pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 || 111 abort("Error: pthread_mutex_init failed."); 112 static if ( is( typeof( pthread_condattr_setclock ) ) ) 113 { 114 pthread_condattr_t attr = void; 115 pthread_condattr_init(&attr) == 0 || 116 abort("Error: pthread_condattr_init failed."); 117 pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0 || 118 abort("Error: pthread_condattr_setclock failed."); 119 pthread_cond_init(&m_cond, &attr) == 0 || 120 abort("Error: pthread_cond_init failed."); 121 pthread_condattr_destroy(&attr) == 0 || 122 abort("Error: pthread_condattr_destroy failed."); 123 } 124 else 125 { 126 pthread_cond_init(&m_cond, null) == 0 || 127 abort("Error: pthread_cond_init failed."); 128 } 129 m_state = initialState; 130 m_manualReset = manualReset; 131 m_initalized = true; 132 } 133 } 134 135 // copying not allowed, can produce resource leaks 136 @disable this(this); 137 @disable void opAssign(Event); 138 139 ~this() 140 { 141 terminate(); 142 } 143 144 /** 145 * deinitialize event. Does nothing if the event is not initialized. There must not be 146 * threads currently waiting for the event to be signaled. 147 */ 148 void terminate() 149 { 150 version (Windows) 151 { 152 if (m_event) 153 CloseHandle(m_event); 154 m_event = null; 155 } 156 else version (Posix) 157 { 158 if (m_initalized) 159 { 160 pthread_mutex_destroy(&m_mutex) == 0 || 161 abort("Error: pthread_mutex_destroy failed."); 162 pthread_cond_destroy(&m_cond) == 0 || 163 abort("Error: pthread_cond_destroy failed."); 164 m_initalized = false; 165 } 166 } 167 } 168 169 void set() 170 { 171 setIfInitialized(); 172 } 173 174 /// Set the event to "signaled", so that waiting clients are resumed 175 void setIfInitialized() 176 { 177 version (Windows) 178 { 179 if (m_event) 180 SetEvent(m_event); 181 } 182 else version (Posix) 183 { 184 if (m_initalized) 185 { 186 pthread_mutex_lock(&m_mutex); 187 m_state = true; 188 pthread_cond_broadcast(&m_cond); 189 pthread_mutex_unlock(&m_mutex); 190 } 191 } 192 } 193 194 /// Reset the event manually 195 void reset() 196 { 197 version (Windows) 198 { 199 if (m_event) 200 ResetEvent(m_event); 201 } 202 else version (Posix) 203 { 204 if (m_initalized) 205 { 206 pthread_mutex_lock(&m_mutex); 207 m_state = false; 208 pthread_mutex_unlock(&m_mutex); 209 } 210 } 211 } 212 213 /** 214 * Wait for the event to be signaled without timeout. 215 * 216 * Returns: 217 * `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured 218 */ 219 bool wait() 220 { 221 version (Windows) 222 { 223 return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0; 224 } 225 else version (Posix) 226 { 227 return wait(Duration.max); 228 } 229 else version (FreeStanding) assert(0); 230 } 231 232 /** 233 * Wait for the event to be signaled with timeout. 234 * 235 * Params: 236 * tmout = the maximum time to wait 237 * Returns: 238 * `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or 239 * the event is uninitialized or another error occured 240 */ 241 bool wait(Duration tmout) 242 { 243 version (Windows) 244 { 245 if (!m_event) 246 return false; 247 248 auto maxWaitMillis = dur!("msecs")(uint.max - 1); 249 250 while (tmout > maxWaitMillis) 251 { 252 auto res = WaitForSingleObject(m_event, uint.max - 1); 253 if (res != WAIT_TIMEOUT) 254 return res == WAIT_OBJECT_0; 255 tmout -= maxWaitMillis; 256 } 257 auto ms = cast(uint)(tmout.total!"msecs"); 258 return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0; 259 } 260 else version (Posix) 261 { 262 if (!m_initalized) 263 return false; 264 265 pthread_mutex_lock(&m_mutex); 266 267 int result = 0; 268 if (!m_state) 269 { 270 if (tmout == Duration.max) 271 { 272 result = pthread_cond_wait(&m_cond, &m_mutex); 273 } 274 else 275 { 276 import core.sync.config; 277 278 timespec t = void; 279 mktspec(t, tmout); 280 281 result = pthread_cond_timedwait(&m_cond, &m_mutex, &t); 282 } 283 } 284 if (result == 0 && !m_manualReset) 285 m_state = false; 286 287 pthread_mutex_unlock(&m_mutex); 288 289 return result == 0; 290 } 291 else version (FreeStanding) assert(0); 292 } 293 294 private: 295 version (Windows) 296 { 297 HANDLE m_event; 298 } 299 else version (Posix) 300 { 301 pthread_mutex_t m_mutex; 302 pthread_cond_t m_cond; 303 bool m_initalized; 304 bool m_state; 305 bool m_manualReset; 306 } 307 } 308 309 // Test single-thread (non-shared) use. 310 @nogc nothrow unittest 311 { 312 // auto-reset, initial state false 313 Event ev1 = Event(false, false); 314 assert(!ev1.wait(1.dur!"msecs")); 315 ev1.setIfInitialized(); 316 assert(ev1.wait()); 317 assert(!ev1.wait(1.dur!"msecs")); 318 319 // manual-reset, initial state true 320 Event ev2 = Event(true, true); 321 assert(ev2.wait()); 322 assert(ev2.wait()); 323 ev2.reset(); 324 assert(!ev2.wait(1.dur!"msecs")); 325 } 326 327 unittest 328 { 329 import core.thread, core.atomic; 330 331 scope event = new Event(true, false); 332 int numThreads = 10; 333 shared int numRunning = 0; 334 335 void testFn() 336 { 337 event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner 338 numRunning.atomicOp!"+="(1); 339 } 340 341 auto group = new ThreadGroup; 342 343 for (int i = 0; i < numThreads; ++i) 344 group.create(&testFn); 345 346 auto start = MonoTime.currTime; 347 assert(numRunning == 0); 348 349 event.setIfInitialized(); 350 group.joinAll(); 351 352 assert(numRunning == numThreads); 353 354 assert(MonoTime.currTime - start < 5.dur!"seconds"); 355 }