The OpenD Programming Language

1 /**
2  * The semaphore module provides a general use semaphore for synchronization.
3  *
4  * Copyright: Copyright Sean Kelly 2005 - 2009.
5  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
6  * Authors:   Sean Kelly
7  * Source:    $(DRUNTIMESRC core/sync/_semaphore.d)
8  */
9 
10 /*          Copyright Sean Kelly 2005 - 2009.
11  * Distributed under the Boost Software License, Version 1.0.
12  *    (See accompanying file LICENSE or copy at
13  *          http://www.boost.org/LICENSE_1_0.txt)
14  */
15 module core.sync.semaphore;
16 
17 
18 public import core.sync.exception;
19 public import core.time;
20 
21 import rt.sys.config;
22 
23 mixin("import " ~ osSemaphoreImport ~ ";");
24 
25 ////////////////////////////////////////////////////////////////////////////////
26 // Semaphore
27 //
28 // void wait();
29 // void notify();
30 // bool tryWait();
31 ////////////////////////////////////////////////////////////////////////////////
32 
33 
34 /**
35  * This class represents a general counting semaphore as concieved by Edsger
36  * Dijkstra.  As per Mesa type monitors however, "signal" has been replaced
37  * with "notify" to indicate that control is not transferred to the waiter when
38  * a notification is sent.
39  */
40 class Semaphore
41 {
42     ////////////////////////////////////////////////////////////////////////////
43     // Initialization
44     ////////////////////////////////////////////////////////////////////////////
45 
46 
47     /**
48      * Initializes a semaphore object with the specified initial count.
49      *
50      * Params:
51      *  count = The initial count for the semaphore.
52      *
53      * Throws:
54      *  SyncError on error.
55      */
56     this( uint count = 0 )
57     {
58         osSemaphore.create(count);
59     }
60 
61 
62     ~this()
63     {
64         osSemaphore.destroy();
65     }
66 
67 
68     ////////////////////////////////////////////////////////////////////////////
69     // General Actions
70     ////////////////////////////////////////////////////////////////////////////
71 
72 
73     /**
74      * Wait until the current count is above zero, then atomically decrement
75      * the count by one and return.
76      *
77      * Throws:
78      *  SyncError on error.
79      */
80     void wait()
81     {
82         osSemaphore.wait();
83     }
84 
85 
86     /**
87      * Suspends the calling thread until the current count moves above zero or
88      * until the supplied time period has elapsed.  If the count moves above
89      * zero in this interval, then atomically decrement the count by one and
90      * return true.  Otherwise, return false.
91      *
92      * Params:
93      *  period = The time to wait.
94      *
95      * In:
96      *  period must be non-negative.
97      *
98      * Throws:
99      *  SyncError on error.
100      *
101      * Returns:
102      *  true if notified before the timeout and false if not.
103      */
104     bool wait( Duration period )
105     in
106     {
107         assert( !period.isNegative );
108     }
109     do
110     {
111         return osSemaphore.wait(period);
112     }
113 
114 
115     /**
116      * Atomically increment the current count by one.  This will notify one
117      * waiter, if there are any in the queue.
118      *
119      * Throws:
120      *  SyncError on error.
121      */
122     void notify()
123     {
124         osSemaphore.notify();
125     }
126 
127 
128     /**
129      * If the current count is equal to zero, return.  Otherwise, atomically
130      * decrement the count by one and return true.
131      *
132      * Throws:
133      *  SyncError on error.
134      *
135      * Returns:
136      *  true if the count was above zero and false if not.
137      */
138     bool tryWait()
139     {
140         return osSemaphore.tryWait();
141     }
142 
143 
144 protected:
145 
146     OsSemaphore osSemaphore;
147 }
148 
149 
150 ////////////////////////////////////////////////////////////////////////////////
151 // Unit Tests
152 ////////////////////////////////////////////////////////////////////////////////
153 
154 unittest
155 {
156     import core.thread, core.atomic;
157 
158     void testWait()
159     {
160         auto semaphore = new Semaphore;
161         shared bool stopConsumption = false;
162         immutable numToProduce = 20;
163         immutable numConsumers = 10;
164         shared size_t numConsumed;
165         shared size_t numComplete;
166 
167         void consumer()
168         {
169             while (true)
170             {
171                 semaphore.wait();
172 
173                 if (atomicLoad(stopConsumption))
174                     break;
175                 atomicOp!"+="(numConsumed, 1);
176             }
177             atomicOp!"+="(numComplete, 1);
178         }
179 
180         void producer()
181         {
182             assert(!semaphore.tryWait());
183 
184             foreach (_; 0 .. numToProduce)
185                 semaphore.notify();
186 
187             // wait until all items are consumed
188             while (atomicLoad(numConsumed) != numToProduce)
189                 Thread.yield();
190 
191             // mark consumption as finished
192             atomicStore(stopConsumption, true);
193 
194             // wake all consumers
195             foreach (_; 0 .. numConsumers)
196                 semaphore.notify();
197 
198             // wait until all consumers completed
199             while (atomicLoad(numComplete) != numConsumers)
200                 Thread.yield();
201 
202             assert(!semaphore.tryWait());
203             semaphore.notify();
204             assert(semaphore.tryWait());
205             assert(!semaphore.tryWait());
206         }
207 
208         auto group = new ThreadGroup;
209 
210         for ( int i = 0; i < numConsumers; ++i )
211             group.create(&consumer);
212         group.create(&producer);
213         group.joinAll();
214     }
215 
216 
217     void testWaitTimeout()
218     {
219         auto sem = new Semaphore;
220         shared bool semReady;
221         bool alertedOne, alertedTwo;
222 
223         void waiter()
224         {
225             while (!atomicLoad(semReady))
226                 Thread.yield();
227             alertedOne = sem.wait(dur!"msecs"(1));
228             alertedTwo = sem.wait(dur!"msecs"(1));
229             assert(alertedOne && !alertedTwo);
230         }
231 
232         auto thread = new Thread(&waiter);
233         thread.start();
234 
235         sem.notify();
236         atomicStore(semReady, true);
237         thread.join();
238         assert(alertedOne && !alertedTwo);
239     }
240 
241     testWait();
242     testWaitTimeout();
243 }