The OpenD Programming Language

1 /**
2  * The semaphore module provides a general use semaphore for synchronization.
3  *
4  * Copyright: Copyright Sean Kelly 2005 - 2009.
5  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
6  * Authors:   Sean Kelly
7  * Source:    $(DRUNTIMESRC core/sync/_semaphore.d)
8  */
9 
10 /*          Copyright Sean Kelly 2005 - 2009.
11  * Distributed under the Boost Software License, Version 1.0.
12  *    (See accompanying file LICENSE or copy at
13  *          http://www.boost.org/LICENSE_1_0.txt)
14  */
15 module core.sync.semaphore;
16 
17 
18 public import core.sync.exception;
19 public import core.time;
20 
21 version (OSX)
22     version = Darwin;
23 else version (iOS)
24     version = Darwin;
25 else version (TVOS)
26     version = Darwin;
27 else version (WatchOS)
28     version = Darwin;
29 
30 version (Windows)
31 {
32     import core.sys.windows.basetsd /+: HANDLE+/;
33     import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, INFINITE,
34         ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
35     import core.sys.windows.windef /+: BOOL, DWORD+/;
36     import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
37 }
38 else version (Darwin)
39 {
40     import core.sync.config;
41     import core.stdc.errno;
42     import core.sys.posix.time;
43     import core.sys.darwin.mach.semaphore;
44 }
45 else version (Posix)
46 {
47     import core.sync.config;
48     import core.stdc.errno;
49     import core.sys.posix.pthread;
50     import core.sys.posix.semaphore;
51 }
52 else version (FreeStanding)
53 {
54 }
55 else
56 {
57     static assert(false, "Platform not supported");
58 }
59 
60 
61 ////////////////////////////////////////////////////////////////////////////////
62 // Semaphore
63 //
64 // void wait();
65 // void notify();
66 // bool tryWait();
67 ////////////////////////////////////////////////////////////////////////////////
68 
69 
/**
 * This class represents a general counting semaphore as conceived by Edsger
 * Dijkstra.  As per Mesa type monitors however, "signal" has been replaced
 * with "notify" to indicate that control is not transferred to the waiter when
 * a notification is sent.
 *
 * The implementation is selected at compile time: a Win32 semaphore handle on
 * Windows, a Mach semaphore on Darwin and an unnamed sem_t on other Posix
 * systems.  On FreeStanding targets no operating system primitive backs the
 * class: the constructor, destructor, wait() and notify() compile to empty
 * bodies, while wait(Duration) and tryWait() fail with assert(0).
 */
class Semaphore
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a semaphore object with the specified initial count.
     *
     * Params:
     *  count = The initial count for the semaphore.
     *
     * Throws:
     *  SyncError on error.
     */
    this( uint count = 0 )
    {
        version (Windows)
        {
            // Anonymous semaphore with default security attributes; the
            // maximum count is the largest value the API accepts.
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            if ( m_hndl == m_hndl.init )
                throw new SyncError( "Unable to create semaphore" );
        }
        else version (Darwin)
        {
            auto rc = semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count );
            if ( rc )
                throw new SyncError( "Unable to create semaphore" );
        }
        else version (Posix)
        {
            // Second argument 0: the semaphore is not shared between processes.
            int rc = sem_init( &m_hndl, 0, count );
            if ( rc )
                throw new SyncError( "Unable to create semaphore" );
        }
    }


    /**
     * Releases the underlying operating system semaphore.  A failure to do so
     * is only reported through an assertion.
     */
    ~this()
    {
        version (Windows)
        {
            BOOL rc = CloseHandle( m_hndl );
            assert( rc, "Unable to destroy semaphore" );
        }
        else version (Darwin)
        {
            auto rc = semaphore_destroy( mach_task_self(), m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
        else version (Posix)
        {
            int rc = sem_destroy( &m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until the current count is above zero, then atomically decrement
     * the count by one and return.
     *
     * Throws:
     *  SyncError on error.
     */
    void wait()
    {
        version (Windows)
        {
            DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
            if ( rc != WAIT_OBJECT_0 )
                throw new SyncError( "Unable to wait for semaphore" );
        }
        else version (Darwin)
        {
            while ( true )
            {
                auto rc = semaphore_wait( m_hndl );
                if ( !rc )
                    return;
                // NOTE(review): the retry relies on errno being EINTR after a
                // KERN_ABORTED result; confirm that the Mach call actually
                // sets errno on this path.
                if ( rc == KERN_ABORTED && errno == EINTR )
                    continue;
                throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (Posix)
        {
            // Retry for as long as the wait is merely interrupted by a signal.
            while ( true )
            {
                if ( !sem_wait( &m_hndl ) )
                    return;
                if ( errno != EINTR )
                    throw new SyncError( "Unable to wait for semaphore" );
            }
        }
    }


    /**
     * Suspends the calling thread until the current count moves above zero or
     * until the supplied time period has elapsed.  If the count moves above
     * zero in this interval, then atomically decrement the count by one and
     * return true.  Otherwise, return false.
     *
     * Params:
     *  period = The time to wait.
     *
     * In:
     *  period must be non-negative.
     *
     * Throws:
     *  SyncError on error.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     */
    bool wait( Duration period )
    in
    {
        assert( !period.isNegative );
    }
    do
    {
        version (Windows)
        {
            // uint.max is reserved (INFINITE), so the longest single wait is
            // one millisecond short of it; longer periods are consumed in
            // slices of that size.
            auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

            while ( period > maxWaitMillis )
            {
                auto rc = WaitForSingleObject( m_hndl, cast(uint)
                                                       maxWaitMillis.total!"msecs" );
                switch ( rc )
                {
                case WAIT_OBJECT_0:
                    return true;
                case WAIT_TIMEOUT:
                    period -= maxWaitMillis;
                    continue;
                default:
                    throw new SyncError( "Unable to wait for semaphore" );
                }
            }
            // total!"msecs" yields whole milliseconds, so any sub-millisecond
            // remainder of the period is dropped here.
            switch ( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (Darwin)
        {
            // Relative timeout; zero the whole struct before filling it in.
            mach_timespec_t t = void;
            (cast(byte*) &t)[0 .. t.sizeof] = 0;

            if ( period.total!"seconds" > t.tv_sec.max )
            {
                // Clamp periods whose seconds do not fit into tv_sec.
                t.tv_sec  = t.tv_sec.max;
                t.tv_nsec = cast(typeof(t.tv_nsec)) period.split!("seconds", "nsecs")().nsecs;
            }
            else
                period.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec);
            while ( true )
            {
                auto rc = semaphore_timedwait( m_hndl, t );
                if ( !rc )
                    return true;
                if ( rc == KERN_OPERATION_TIMED_OUT )
                    return false;
                // NOTE(review): same errno dependency as in wait(); confirm
                // that errno is meaningful after a KERN_ABORTED result.
                if ( rc != KERN_ABORTED || errno != EINTR )
                    throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (Posix)
        {
            import core.sys.posix.time : clock_gettime, CLOCK_REALTIME;

            // sem_timedwait expects an absolute deadline, so start from the
            // current time.  t is deliberately left uninitialized, which means
            // a failed clock read must not be allowed to feed garbage into the
            // wait below.
            timespec t = void;
            if ( clock_gettime( CLOCK_REALTIME, &t ) )
                throw new SyncError( "Unable to wait for semaphore" );
            mvtspec( t, period );

            // Because the deadline is absolute, retrying after EINTR does not
            // extend the total waiting time.
            while ( true )
            {
                if ( !sem_timedwait( &m_hndl, &t ) )
                    return true;
                if ( errno == ETIMEDOUT )
                    return false;
                if ( errno != EINTR )
                    throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (FreeStanding) assert(0);
    }


    /**
     * Atomically increment the current count by one.  This will notify one
     * waiter, if there are any in the queue.
     *
     * Throws:
     *  SyncError on error.
     */
    void notify()
    {
        version (Windows)
        {
            if ( !ReleaseSemaphore( m_hndl, 1, null ) )
                throw new SyncError( "Unable to notify semaphore" );
        }
        else version (Darwin)
        {
            auto rc = semaphore_signal( m_hndl );
            if ( rc )
                throw new SyncError( "Unable to notify semaphore" );
        }
        else version (Posix)
        {
            int rc = sem_post( &m_hndl );
            if ( rc )
                throw new SyncError( "Unable to notify semaphore" );
        }
    }


    /**
     * If the current count is equal to zero, return false.  Otherwise,
     * atomically decrement the count by one and return true.  This call does
     * not wait for the count to change.
     *
     * Throws:
     *  SyncError on error.
     *
     * Returns:
     *  true if the count was above zero and false if not.
     */
    bool tryWait()
    {
        version (Windows)
        {
            // A zero timeout turns the wait into a poll.
            switch ( WaitForSingleObject( m_hndl, 0 ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (Darwin)
        {
            // Implemented as a timed wait with a zero period.
            return wait( dur!"hnsecs"(0) );
        }
        else version (Posix)
        {
            while ( true )
            {
                if ( !sem_trywait( &m_hndl ) )
                    return true;
                if ( errno == EAGAIN )
                    return false;
                if ( errno != EINTR )
                    throw new SyncError( "Unable to wait for semaphore" );
            }
        }
        else version (FreeStanding) assert(0);
    }


protected:

    /// Aliases the operating-system-specific semaphore type.
    version (Windows)        alias Handle = HANDLE;
    /// ditto
    else version (Darwin)    alias Handle = semaphore_t;
    /// ditto
    else version (Posix)     alias Handle = sem_t;
    else version (FreeStanding) alias Handle = void*;

    /// Handle to the system-specific semaphore.
    Handle m_hndl;
}
365 
366 
367 ////////////////////////////////////////////////////////////////////////////////
368 // Unit Tests
369 ////////////////////////////////////////////////////////////////////////////////
370 
unittest
{
    import core.atomic, core.thread;

    // Several consumer threads drain a fixed number of notifications posted
    // by one producer thread; afterwards every consumer is woken once more so
    // that it can observe the stop flag and terminate.
    void testWait()
    {
        enum itemCount   = 20;
        enum workerCount = 10;

        auto sem = new Semaphore;
        shared bool   shuttingDown = false;
        shared size_t consumed;
        shared size_t finished;

        void consume()
        {
            for (;;)
            {
                sem.wait();

                if (atomicLoad(shuttingDown))
                {
                    // Woken for shutdown: report completion and leave.
                    atomicOp!"+="(finished, 1);
                    return;
                }
                atomicOp!"+="(consumed, 1);
            }
        }

        void produce()
        {
            // Nothing has been posted yet, so a poll must fail.
            assert(!sem.tryWait());

            foreach (_; 0 .. itemCount)
                sem.notify();

            // Spin until every posted item has been taken.
            while (atomicLoad(consumed) != itemCount)
                Thread.yield();

            // Announce shutdown before waking the consumers.
            atomicStore(shuttingDown, true);

            // One wake-up per consumer.
            foreach (_; 0 .. workerCount)
                sem.notify();

            // Spin until every consumer has reported completion.
            while (atomicLoad(finished) != workerCount)
                Thread.yield();

            // The count must be back at zero; a single notify then allows
            // exactly one successful poll.
            assert(!sem.tryWait());
            sem.notify();
            assert(sem.tryWait());
            assert(!sem.tryWait());
        }

        auto threads = new ThreadGroup;

        foreach (_; 0 .. workerCount)
            threads.create(&consume);
        threads.create(&produce);
        threads.joinAll();
    }


    // A single notification satisfies the first timed wait; the second timed
    // wait has nothing to consume and must time out.
    void testWaitTimeout()
    {
        auto sem = new Semaphore;
        shared bool posted;
        bool firstWait, secondWait;

        auto waiter = new Thread({
            // Hold off until the main thread has posted its notification.
            while (!atomicLoad(posted))
                Thread.yield();
            firstWait  = sem.wait(dur!"msecs"(1));
            secondWait = sem.wait(dur!"msecs"(1));
            assert(firstWait && !secondWait);
        });
        waiter.start();

        sem.notify();
        atomicStore(posted, true);
        waiter.join();
        assert(firstWait && !secondWait);
    }

    testWait();
    testWaitTimeout();
}