/**
 * The semaphore module provides a general use semaphore for synchronization.
 *
 * Copyright: Copyright Sean Kelly 2005 - 2009.
 * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
 * Authors:   Sean Kelly
 * Source:    $(DRUNTIMESRC core/sync/_semaphore.d)
 */

/*          Copyright Sean Kelly 2005 - 2009.
 * Distributed under the Boost Software License, Version 1.0.
 *    (See accompanying file LICENSE or copy at
 *          http://www.boost.org/LICENSE_1_0.txt)
 */
module core.sync.semaphore;


public import core.sync.exception;
public import core.time;

import rt.sys.config;

// The module that supplies OsSemaphore is named by the osSemaphoreImport
// string, so the import statement is assembled and mixed in here.
mixin("import " ~ osSemaphoreImport ~ ";");

////////////////////////////////////////////////////////////////////////////////
// Semaphore
//
// void wait();
// void notify();
// bool tryWait();
////////////////////////////////////////////////////////////////////////////////


/**
 * A general counting semaphore, as conceived by Edsger Dijkstra.
 *
 * Following Mesa type monitors, the operation traditionally called "signal"
 * is named "notify" here, to make clear that sending a notification does not
 * transfer control to the waiter.
 *
 * Every operation is forwarded to the `osSemaphore` member.
 */
class Semaphore
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Creates the underlying semaphore with the given starting count.
     *
     * Params:
     *  count = The initial count for the semaphore (defaults to zero).
     *
     * Throws:
     *  SyncError on error.
     */
    this( uint count = 0 )
    {
        osSemaphore.create( count );
    }


    /// Destroys the underlying semaphore.
    ~this()
    {
        osSemaphore.destroy();
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Blocks until the current count is above zero, then atomically
     * decrements the count by one and returns.
     *
     * Throws:
     *  SyncError on error.
     */
    void wait()
    {
        osSemaphore.wait();
    }


    /**
     * Suspends the calling thread until either the current count rises above
     * zero or the supplied time period elapses.  When the count rises above
     * zero within that interval it is atomically decremented by one and true
     * is returned; otherwise false is returned.
     *
     * Params:
     *  period = The time to wait.
     *
     * In:
     *  period must be non-negative.
     *
     * Throws:
     *  SyncError on error.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     */
    bool wait( Duration period )
    in
    {
        // A negative timeout is a caller error.
        assert( !period.isNegative );
    }
    do
    {
        return osSemaphore.wait( period );
    }


    /**
     * Atomically increments the current count by one.  One waiter is
     * notified, if there are any in the queue.
     *
     * Throws:
     *  SyncError on error.
     */
    void notify()
    {
        osSemaphore.notify();
    }


    /**
     * Non-blocking acquire.  If the current count is zero, returns false.
     * Otherwise atomically decrements the count by one and returns true.
     *
     * Throws:
     *  SyncError on error.
     *
     * Returns:
     *  true if the count was above zero and false if not.
     */
    bool tryWait()
    {
        return osSemaphore.tryWait();
    }


    /// Semaphore implementation that all of the operations above delegate to.
    protected OsSemaphore osSemaphore;
}


////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////

unittest
{
    import core.atomic, core.thread;

    // Several consumers drain a fixed number of notifications issued by a
    // single producer, after which the producer releases them all.
    void testWait()
    {
        auto sem = new Semaphore;
        shared bool shutdown = false;
        immutable itemCount = 20;
        immutable consumerCount = 10;
        shared size_t itemsTaken;
        shared size_t consumersDone;

        void consume()
        {
            // Each wake-up either counts as one consumed item or, once the
            // shutdown flag is set, ends the loop.
            sem.wait();
            while (!atomicLoad(shutdown))
            {
                atomicOp!"+="(itemsTaken, 1);
                sem.wait();
            }
            atomicOp!"+="(consumersDone, 1);
        }

        void produce()
        {
            // Nothing has been posted yet, so a non-blocking acquire fails.
            assert(!sem.tryWait());

            foreach (_; 0 .. itemCount)
            {
                sem.notify();
            }

            // Spin until every item has been consumed.
            while (atomicLoad(itemsTaken) != itemCount)
            {
                Thread.yield();
            }

            // Flag the end of consumption before waking anybody up.
            atomicStore(shutdown, true);

            // One notification per consumer so that each of them wakes up.
            foreach (_; 0 .. consumerCount)
            {
                sem.notify();
            }

            // Spin until every consumer has left its loop.
            while (atomicLoad(consumersDone) != consumerCount)
            {
                Thread.yield();
            }

            // The count is back at zero; a single notify permits exactly one
            // successful tryWait.
            assert(!sem.tryWait());
            sem.notify();
            assert(sem.tryWait());
            assert(!sem.tryWait());
        }

        auto workers = new ThreadGroup;

        foreach (_; 0 .. consumerCount)
        {
            workers.create(&consume);
        }
        workers.create(&produce);
        workers.joinAll();
    }


    // A timed wait succeeds when a notification is pending and times out
    // when none is.
    void testWaitTimeout()
    {
        auto sem = new Semaphore;
        shared bool posted;
        bool gotFirst, gotSecond;

        void timedWaiter()
        {
            // Hold off until the main thread has posted its notification.
            while (!atomicLoad(posted))
            {
                Thread.yield();
            }
            gotFirst = sem.wait(dur!"msecs"(1));
            gotSecond = sem.wait(dur!"msecs"(1));
            assert(gotFirst && !gotSecond);
        }

        auto waiterThread = new Thread(&timedWaiter);
        waiterThread.start();

        // Post first, then raise the flag, so the notification is already
        // pending when the waiter starts its first timed wait.
        sem.notify();
        atomicStore(posted, true);
        waiterThread.join();
        assert(gotFirst && !gotSecond);
    }

    testWait();
    testWaitTimeout();
}