/**
 * The event module provides a primitive for lightweight signaling of other threads
 * (emulating Windows events on Posix)
 *
 * Copyright: Copyright (c) 2019 D Language Foundation
 * License: Distributed under the
 *    $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
 *    (See accompanying file LICENSE)
 * Authors: Rainer Schuetze
 * Source:    $(DRUNTIMESRC core/sync/event.d)
 */
module core.sync.event;

import core.time;

import rt.sys.config;

/**
 * An event: a signaling primitive on which client threads block until the
 * event becomes "signaled".
 *
 * Implemented using `pthread_mutex` and `pthread_condition` on Posix and
 * `CreateEvent` and `SetEvent` on Windows. All operations are forwarded to the
 * platform-specific `OsEvent` selected through `osEventImport`.
---
import core.sync.event, core.thread, std.file;

struct ProcessFile
{
    ThreadGroup group;
    Event event;
    void[] buffer;

    void doProcess()
    {
        event.wait();
        // process buffer
    }

    void process(string filename)
    {
        event.initialize(true, false);
        group = new ThreadGroup;
        for (int i = 0; i < 10; ++i)
            group.create(&doProcess);

        buffer = std.file.read(filename);
        event.setIfInitialized();
        group.joinAll();
        event.terminate();
    }
}
---
 */
struct Event
{
nothrow @nogc:
    /**
     * Constructs and initializes the event in one step.
     *
     * Params:
     *  manualReset  = when `true`, the event stays signaled after waiting clients
     *                 are resumed and has to be reset explicitly
     *  initialState = whether the event starts out signaled
     */
    this(bool manualReset, bool initialState)
    {
        // Same backend call that initialize() performs.
        osEvent.create(manualReset, initialState);
    }

    /**
     * Initializes the event. Does nothing if the event is already initialized.
     *
     * Params:
     *  manualReset  = when `true`, the event stays signaled after waiting clients
     *                 are resumed and has to be reset explicitly
     *  initialState = whether the event starts out signaled
     */
    void initialize(bool manualReset, bool initialState)
    {
        osEvent.create(manualReset, initialState);
    }

    // An Event owns an OS resource; copies could leak it, so copying is disabled.
    @disable this(this);
    @disable void opAssign(Event);

    /// Releases the underlying OS event (same backend call as `terminate`).
    ~this()
    {
        osEvent.destroy();
    }

    /**
     * Deinitializes the event. Does nothing if the event is not initialized.
     * No thread may be waiting on the event when this is called.
     */
    void terminate()
    {
        osEvent.destroy();
    }

    /// Equivalent to `setIfInitialized`: forwards to the same backend call.
    void set()
    {
        osEvent.setIfInitialized();
    }

    /// Puts the event into the "signaled" state so that waiting clients are resumed.
    void setIfInitialized()
    {
        osEvent.setIfInitialized();
    }

    /// Manually returns the event to the nonsignaled state.
    void reset()
    {
        osEvent.reset();
    }

    /**
     * Blocks until the event is signaled; no timeout is applied.
     *
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event is
     *  uninitialized or another error occurred
     */
    bool wait()
    {
        return osEvent.wait();
    }

    /**
     * Blocks until the event is signaled or the timeout elapses.
     *
     * Params:
     *  tmout = the maximum time to wait
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event stayed
     *  nonsignaled for the given time, the event is uninitialized, or another
     *  error occurred
     */
    bool wait(Duration tmout)
    {
        return osEvent.wait(tmout);
    }

private:
    // The module that provides OsEvent is named by osEventImport
    // (declared outside this file; presumably in rt.sys.config).
    mixin("import " ~ osEventImport ~ ";");
    OsEvent osEvent;
}

// Test single-thread (non-shared) use.
@nogc nothrow unittest
{
    // auto-reset, initial state false
    Event ev1 = Event(false, false);
    assert(!ev1.wait(1.dur!"msecs"));   // not signaled yet: the short wait times out
    ev1.setIfInitialized();
    assert(ev1.wait());
    assert(!ev1.wait(1.dur!"msecs"));   // auto-reset: nonsignaled again after a successful wait

    // manual-reset, initial state true
    Event ev2 = Event(true, true);
    assert(ev2.wait());
    assert(ev2.wait());                 // manual-reset: stays signaled across waits
    ev2.reset();
    assert(!ev2.wait(1.dur!"msecs"));
}

// Test multi-threaded use: one manual-reset event releases a group of waiting threads.
unittest
{
    import core.thread, core.atomic;

    scope event = new Event(true, false);
    int numThreads = 10;
    shared int numRunning = 0;

    void testFn()
    {
        event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner
        numRunning.atomicOp!"+="(1);
    }

    auto group = new ThreadGroup;

    foreach (i; 0 .. numThreads)
        group.create(&testFn);

    auto start = MonoTime.currTime;
    // Read the shared counter through core.atomic, matching the atomic
    // increments done by the worker threads.
    assert(atomicLoad(numRunning) == 0);

    event.setIfInitialized();
    group.joinAll();

    assert(atomicLoad(numRunning) == numThreads);

    // All threads must have been released by the signal, not by the 8 s timeout.
    assert(MonoTime.currTime - start < 5.dur!"seconds");
}