1 /** 2 * The semaphore module provides a general use semaphore for synchronization. 3 * 4 * Copyright: Copyright Sean Kelly 2005 - 2009. 5 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 6 * Authors: Sean Kelly 7 * Source: $(DRUNTIMESRC core/sync/_semaphore.d) 8 */ 9 10 /* Copyright Sean Kelly 2005 - 2009. 11 * Distributed under the Boost Software License, Version 1.0. 12 * (See accompanying file LICENSE or copy at 13 * http://www.boost.org/LICENSE_1_0.txt) 14 */ 15 module core.sync.semaphore; 16 17 18 public import core.sync.exception; 19 public import core.time; 20 21 version (OSX) 22 version = Darwin; 23 else version (iOS) 24 version = Darwin; 25 else version (TVOS) 26 version = Darwin; 27 else version (WatchOS) 28 version = Darwin; 29 30 version (Windows) 31 { 32 import core.sys.windows.basetsd /+: HANDLE+/; 33 import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, INFINITE, 34 ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/; 35 import core.sys.windows.windef /+: BOOL, DWORD+/; 36 import core.sys.windows.winerror /+: WAIT_TIMEOUT+/; 37 } 38 else version (Darwin) 39 { 40 import core.sync.config; 41 import core.stdc.errno; 42 import core.sys.posix.time; 43 import core.sys.darwin.mach.semaphore; 44 } 45 else version (Posix) 46 { 47 import core.sync.config; 48 import core.stdc.errno; 49 import core.sys.posix.pthread; 50 import core.sys.posix.semaphore; 51 } 52 else version (FreeStanding) 53 { 54 } 55 else 56 { 57 static assert(false, "Platform not supported"); 58 } 59 60 61 //////////////////////////////////////////////////////////////////////////////// 62 // Semaphore 63 // 64 // void wait(); 65 // void notify(); 66 // bool tryWait(); 67 //////////////////////////////////////////////////////////////////////////////// 68 69 70 /** 71 * This class represents a general counting semaphore as concieved by Edsger 72 * Dijkstra. As per Mesa type monitors however, "signal" has been replaced 73 * with "notify" to indicate that control is not transferred to the waiter when 74 * a notification is sent. 75 */ 76 class Semaphore 77 { 78 //////////////////////////////////////////////////////////////////////////// 79 // Initialization 80 //////////////////////////////////////////////////////////////////////////// 81 82 83 /** 84 * Initializes a semaphore object with the specified initial count. 85 * 86 * Params: 87 * count = The initial count for the semaphore. 88 * 89 * Throws: 90 * SyncError on error. 91 */ 92 this( uint count = 0 ) 93 { 94 version (Windows) 95 { 96 m_hndl = CreateSemaphoreA( null, count, int.max, null ); 97 if ( m_hndl == m_hndl.init ) 98 throw new SyncError( "Unable to create semaphore" ); 99 } 100 else version (Darwin) 101 { 102 auto rc = semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count ); 103 if ( rc ) 104 throw new SyncError( "Unable to create semaphore" ); 105 } 106 else version (Posix) 107 { 108 int rc = sem_init( &m_hndl, 0, count ); 109 if ( rc ) 110 throw new SyncError( "Unable to create semaphore" ); 111 } 112 } 113 114 115 ~this() 116 { 117 version (Windows) 118 { 119 BOOL rc = CloseHandle( m_hndl ); 120 assert( rc, "Unable to destroy semaphore" ); 121 } 122 else version (Darwin) 123 { 124 auto rc = semaphore_destroy( mach_task_self(), m_hndl ); 125 assert( !rc, "Unable to destroy semaphore" ); 126 } 127 else version (Posix) 128 { 129 int rc = sem_destroy( &m_hndl ); 130 assert( !rc, "Unable to destroy semaphore" ); 131 } 132 } 133 134 135 //////////////////////////////////////////////////////////////////////////// 136 // General Actions 137 //////////////////////////////////////////////////////////////////////////// 138 139 140 /** 141 * Wait until the current count is above zero, then atomically decrement 142 * the count by one and return. 143 * 144 * Throws: 145 * SyncError on error. 146 */ 147 void wait() 148 { 149 version (Windows) 150 { 151 DWORD rc = WaitForSingleObject( m_hndl, INFINITE ); 152 if ( rc != WAIT_OBJECT_0 ) 153 throw new SyncError( "Unable to wait for semaphore" ); 154 } 155 else version (Darwin) 156 { 157 while ( true ) 158 { 159 auto rc = semaphore_wait( m_hndl ); 160 if ( !rc ) 161 return; 162 if ( rc == KERN_ABORTED && errno == EINTR ) 163 continue; 164 throw new SyncError( "Unable to wait for semaphore" ); 165 } 166 } 167 else version (Posix) 168 { 169 while ( true ) 170 { 171 if ( !sem_wait( &m_hndl ) ) 172 return; 173 if ( errno != EINTR ) 174 throw new SyncError( "Unable to wait for semaphore" ); 175 } 176 } 177 } 178 179 180 /** 181 * Suspends the calling thread until the current count moves above zero or 182 * until the supplied time period has elapsed. If the count moves above 183 * zero in this interval, then atomically decrement the count by one and 184 * return true. Otherwise, return false. 185 * 186 * Params: 187 * period = The time to wait. 188 * 189 * In: 190 * period must be non-negative. 191 * 192 * Throws: 193 * SyncError on error. 194 * 195 * Returns: 196 * true if notified before the timeout and false if not. 197 */ 198 bool wait( Duration period ) 199 in 200 { 201 assert( !period.isNegative ); 202 } 203 do 204 { 205 version (Windows) 206 { 207 auto maxWaitMillis = dur!("msecs")( uint.max - 1 ); 208 209 while ( period > maxWaitMillis ) 210 { 211 auto rc = WaitForSingleObject( m_hndl, cast(uint) 212 maxWaitMillis.total!"msecs" ); 213 switch ( rc ) 214 { 215 case WAIT_OBJECT_0: 216 return true; 217 case WAIT_TIMEOUT: 218 period -= maxWaitMillis; 219 continue; 220 default: 221 throw new SyncError( "Unable to wait for semaphore" ); 222 } 223 } 224 switch ( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) ) 225 { 226 case WAIT_OBJECT_0: 227 return true; 228 case WAIT_TIMEOUT: 229 return false; 230 default: 231 throw new SyncError( "Unable to wait for semaphore" ); 232 } 233 } 234 else version (Darwin) 235 { 236 mach_timespec_t t = void; 237 (cast(byte*) &t)[0 .. t.sizeof] = 0; 238 239 if ( period.total!"seconds" > t.tv_sec.max ) 240 { 241 t.tv_sec = t.tv_sec.max; 242 t.tv_nsec = cast(typeof(t.tv_nsec)) period.split!("seconds", "nsecs")().nsecs; 243 } 244 else 245 period.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec); 246 while ( true ) 247 { 248 auto rc = semaphore_timedwait( m_hndl, t ); 249 if ( !rc ) 250 return true; 251 if ( rc == KERN_OPERATION_TIMED_OUT ) 252 return false; 253 if ( rc != KERN_ABORTED || errno != EINTR ) 254 throw new SyncError( "Unable to wait for semaphore" ); 255 } 256 } 257 else version (Posix) 258 { 259 import core.sys.posix.time : clock_gettime, CLOCK_REALTIME; 260 261 timespec t = void; 262 clock_gettime( CLOCK_REALTIME, &t ); 263 mvtspec( t, period ); 264 265 while ( true ) 266 { 267 if ( !sem_timedwait( &m_hndl, &t ) ) 268 return true; 269 if ( errno == ETIMEDOUT ) 270 return false; 271 if ( errno != EINTR ) 272 throw new SyncError( "Unable to wait for semaphore" ); 273 } 274 } 275 else version (FreeStanding) assert(0); 276 } 277 278 279 /** 280 * Atomically increment the current count by one. This will notify one 281 * waiter, if there are any in the queue. 282 * 283 * Throws: 284 * SyncError on error. 285 */ 286 void notify() 287 { 288 version (Windows) 289 { 290 if ( !ReleaseSemaphore( m_hndl, 1, null ) ) 291 throw new SyncError( "Unable to notify semaphore" ); 292 } 293 else version (Darwin) 294 { 295 auto rc = semaphore_signal( m_hndl ); 296 if ( rc ) 297 throw new SyncError( "Unable to notify semaphore" ); 298 } 299 else version (Posix) 300 { 301 int rc = sem_post( &m_hndl ); 302 if ( rc ) 303 throw new SyncError( "Unable to notify semaphore" ); 304 } 305 } 306 307 308 /** 309 * If the current count is equal to zero, return. Otherwise, atomically 310 * decrement the count by one and return true. 311 * 312 * Throws: 313 * SyncError on error. 314 * 315 * Returns: 316 * true if the count was above zero and false if not. 317 */ 318 bool tryWait() 319 { 320 version (Windows) 321 { 322 switch ( WaitForSingleObject( m_hndl, 0 ) ) 323 { 324 case WAIT_OBJECT_0: 325 return true; 326 case WAIT_TIMEOUT: 327 return false; 328 default: 329 throw new SyncError( "Unable to wait for semaphore" ); 330 } 331 } 332 else version (Darwin) 333 { 334 return wait( dur!"hnsecs"(0) ); 335 } 336 else version (Posix) 337 { 338 while ( true ) 339 { 340 if ( !sem_trywait( &m_hndl ) ) 341 return true; 342 if ( errno == EAGAIN ) 343 return false; 344 if ( errno != EINTR ) 345 throw new SyncError( "Unable to wait for semaphore" ); 346 } 347 } 348 else version (FreeStanding) assert(0); 349 } 350 351 352 protected: 353 354 /// Aliases the operating-system-specific semaphore type. 355 version (Windows) alias Handle = HANDLE; 356 /// ditto 357 else version (Darwin) alias Handle = semaphore_t; 358 /// ditto 359 else version (Posix) alias Handle = sem_t; 360 else version (FreeStanding) alias Handle = void*; 361 362 /// Handle to the system-specific semaphore. 363 Handle m_hndl; 364 } 365 366 367 //////////////////////////////////////////////////////////////////////////////// 368 // Unit Tests 369 //////////////////////////////////////////////////////////////////////////////// 370 371 unittest 372 { 373 import core.thread, core.atomic; 374 375 void testWait() 376 { 377 auto semaphore = new Semaphore; 378 shared bool stopConsumption = false; 379 immutable numToProduce = 20; 380 immutable numConsumers = 10; 381 shared size_t numConsumed; 382 shared size_t numComplete; 383 384 void consumer() 385 { 386 while (true) 387 { 388 semaphore.wait(); 389 390 if (atomicLoad(stopConsumption)) 391 break; 392 atomicOp!"+="(numConsumed, 1); 393 } 394 atomicOp!"+="(numComplete, 1); 395 } 396 397 void producer() 398 { 399 assert(!semaphore.tryWait()); 400 401 foreach (_; 0 .. numToProduce) 402 semaphore.notify(); 403 404 // wait until all items are consumed 405 while (atomicLoad(numConsumed) != numToProduce) 406 Thread.yield(); 407 408 // mark consumption as finished 409 atomicStore(stopConsumption, true); 410 411 // wake all consumers 412 foreach (_; 0 .. numConsumers) 413 semaphore.notify(); 414 415 // wait until all consumers completed 416 while (atomicLoad(numComplete) != numConsumers) 417 Thread.yield(); 418 419 assert(!semaphore.tryWait()); 420 semaphore.notify(); 421 assert(semaphore.tryWait()); 422 assert(!semaphore.tryWait()); 423 } 424 425 auto group = new ThreadGroup; 426 427 for ( int i = 0; i < numConsumers; ++i ) 428 group.create(&consumer); 429 group.create(&producer); 430 group.joinAll(); 431 } 432 433 434 void testWaitTimeout() 435 { 436 auto sem = new Semaphore; 437 shared bool semReady; 438 bool alertedOne, alertedTwo; 439 440 void waiter() 441 { 442 while (!atomicLoad(semReady)) 443 Thread.yield(); 444 alertedOne = sem.wait(dur!"msecs"(1)); 445 alertedTwo = sem.wait(dur!"msecs"(1)); 446 assert(alertedOne && !alertedTwo); 447 } 448 449 auto thread = new Thread(&waiter); 450 thread.start(); 451 452 sem.notify(); 453 atomicStore(semReady, true); 454 thread.join(); 455 assert(alertedOne && !alertedTwo); 456 } 457 458 testWait(); 459 testWaitTimeout(); 460 }