The OpenD Programming Language

1 module rt.sys.windows.oscondition;
2 
3 version (Windows):
4 
5 import core.sync.exception;
6 import core.sync.mutex;
7 import core.time;
8 
9 import core.exception : AssertError, staticError;
10 
11 import core.sync.semaphore;
12 import core.sys.windows.basetsd /+: HANDLE+/;
13 import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
14     DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection,
15     LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
16 import core.sys.windows.windef /+: BOOL, DWORD+/;
17 import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
18 
19 
/**
 * Windows implementation of a condition variable, built from two Win32
 * semaphores (`m_blockLock`, `m_blockQueue`) and one critical section
 * (`m_unblockLock`).
 *
 * The wait/notify protocol is "Algorithm 8c" (see the NOTE next to the field
 * declarations at the end of this struct). A notifier releases at most one
 * unit of `m_blockQueue`; each woken waiter then releases the next unit
 * itself while `m_numWaitersToUnblock` is still non-zero.
 *
 * `create` must be called before any other member and `destroy` releases the
 * OS objects again.
 */
struct OsCondition
{
    /**
     * Creates the OS objects and associates this condition with `m`.
     *
     * Params:
     *  m = mutex that `timedWait` unlocks while blocking and re-locks before
     *      it returns.
     *
     * Throws:
     *  A statically allocated `AssertError` if either semaphore cannot be
     *  created.
     */
    void create(Mutex m) nothrow @trusted @nogc
    {
        // The "gate": created with count 1 / maximum 1, i.e. initially open.
        m_blockLock = CreateSemaphoreA( null, 1, 1, null );
        if ( m_blockLock == m_blockLock.init )
            throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
        scope(failure) CloseHandle( m_blockLock );

        // The queue waiters block on: created with count 0, so every wait blocks
        // until a unit is released.
        m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
        if ( m_blockQueue == m_blockQueue.init )
            throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
        scope(failure) CloseHandle( m_blockQueue );

        InitializeCriticalSection( cast(RTL_CRITICAL_SECTION*) &m_unblockLock );
        m_assocMutex = m;
    }

    /**
     * Closes both semaphore handles and deletes the critical section.
     *
     * A failing `CloseHandle` is only reported through `assert`.
     */
    void destroy() @nogc
    {
        BOOL rc = CloseHandle( m_blockLock );
        assert( rc, "Unable to destroy condition" );
        rc = CloseHandle( m_blockQueue );
        assert( rc, "Unable to destroy condition" );
        DeleteCriticalSection( &m_unblockLock );
    }

    /**
     * Waits with an `INFINITE` timeout; the result of `timedWait` is discarded.
     */
    void wait()
    {
        timedWait( INFINITE );
    }

    /**
     * Waits for at most `val`.
     *
     * Durations longer than `uint.max - 1` milliseconds are served as a
     * sequence of waits of that length followed by one wait for the
     * remainder.
     *
     * Params:
     *  val = maximum time to wait; converted to whole milliseconds.
     *
     * Returns:
     *  `true` if a wait ended without timing out, `false` if the whole
     *  period elapsed.
     */
    bool wait( Duration val)
    {
        // NOTE(review): a negative `val` is not rejected here; the cast to uint
        // below would turn it into a very long timeout - confirm that callers
        // validate the duration.

        // Largest single timeout used; presumably chosen so a finite wait can
        // never equal the INFINITE sentinel - TODO confirm.
        auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

        while ( val > maxWaitMillis )
        {
            if ( timedWait( cast(uint)
                            maxWaitMillis.total!"msecs" ) )
                return true;
            val -= maxWaitMillis;
        }
        return timedWait( cast(uint) val.total!"msecs" );
    }

    /// Runs the notification protocol for a single waiter (`notify_( false )`).
    void notify()
    {
        notify_( false );
    }

    /// Runs the notification protocol for all currently blocked waiters (`notify_( true )`).
    void notifyAll()
    {
        notify_( true );
    }

    /// Returns the mutex passed to `create`.
    @property Mutex mutex()
    {
        return m_assocMutex;
    }

    /// ditto
    @property shared(Mutex) mutex() shared
    {
        import core.atomic : atomicLoad;
        return atomicLoad(m_assocMutex);
    }

    // undocumented function for internal use
    final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
    {
        return m_assocMutex;
    }

    // ditto
    final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc
    {
        import core.atomic : atomicLoad;
        return atomicLoad(m_assocMutex);
    }

private:
    /**
     * Core wait routine.
     *
     * Registers the caller as a blocked waiter, unlocks the associated mutex,
     * blocks on `m_blockQueue` for up to `timeout` milliseconds, updates the
     * waiter counters and finally re-locks the associated mutex.
     *
     * Params:
     *  timeout = value passed unchanged to `WaitForSingleObject` for the
     *            wait on `m_blockQueue`.
     *
     * Returns:
     *  `true` if the wait on `m_blockQueue` was satisfied, `false` if it
     *  returned `WAIT_TIMEOUT`.
     */
    bool timedWait( DWORD timeout )
    {
        int   numSignalsLeft;
        int   numWaitersGone;
        DWORD rc;

        // Pass through the gate: acquire it, register as blocked, release it
        // again. notify_ holds the gate while waiters are being unblocked.
        rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
        assert( rc == WAIT_OBJECT_0 );

        EnterCriticalSection( &m_unblockLock );
        m_numWaitersBlocked++;
        LeaveCriticalSection( &m_unblockLock );

        rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
        assert( rc );

        // Give up the user's mutex for the duration of the blocking wait; it is
        // re-acquired on the normal path at the end and on failure here.
        m_assocMutex.unlock();
        scope(failure) m_assocMutex.lock();

        rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, timeout );
        assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
        bool timedOut = (rc == WAIT_TIMEOUT);

        EnterCriticalSection( &m_unblockLock );
        // NOTE(review): this guard stays armed after the explicit
        // LeaveCriticalSection further down, so a failing assert after that
        // point would leave the critical section a second time.
        scope(failure) LeaveCriticalSection( &m_unblockLock );

        if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
        {
            // A notification is in progress (signals are still pending).
            if ( timedOut )
            {
                // timeout (or canceled)
                if ( m_numWaitersBlocked != 0 )
                {
                    m_numWaitersBlocked--;
                    // do not unblock next waiter below (already unblocked)
                    numSignalsLeft = 0;
                }
                else
                {
                    // spurious wakeup pending!!
                    m_numWaitersGone = 1;
                }
            }
            if ( (--m_numWaitersToUnblock) == 0 )
            {
                // This thread consumed the last pending signal.
                if ( m_numWaitersBlocked != 0 )
                {
                    // open the gate
                    rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
                    assert( rc );
                    // do not open the gate below again
                    numSignalsLeft = 0;
                }
                else if ( (numWaitersGone = m_numWaitersGone) != 0 )
                {
                    // Remember how many queue units must be drained below and
                    // reset the shared counter.
                    m_numWaitersGone = 0;
                }
            }
        }
        else if ( (++m_numWaitersGone) == int.max / 2 )
        {
            // No signal was pending, so this waiter left on its own; once the
            // "gone" counter reaches int.max / 2 it is folded back into
            // m_numWaitersBlocked while holding the gate.

            // timeout/canceled or spurious event :-)
            rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
            // something is going on here - test of timeouts?
            m_numWaitersBlocked -= m_numWaitersGone;
            rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
            // ReleaseSemaphore reports success with a non-zero BOOL, so test it
            // for truth like every other ReleaseSemaphore call in this struct;
            // WAIT_OBJECT_0 is a wait-function result code and must not be
            // compared against here.
            assert( rc );
            m_numWaitersGone = 0;
        }

        LeaveCriticalSection( &m_unblockLock );

        if ( numSignalsLeft == 1 )
        {
            // This was the last signalled waiter: drain the queue units that
            // belong to waiters which already left, then reopen the gate.

            // better now than spurious later (same as ResetEvent)
            for ( ; numWaitersGone > 0; --numWaitersGone )
            {
                rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
            }
            // open the gate
            rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
            assert( rc );
        }
        else if ( numSignalsLeft != 0 )
        {
            // unblock next waiter
            rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
            assert( rc );
        }
        m_assocMutex.lock();
        return !timedOut;
    }


    /**
     * Core notification routine shared by `notify` and `notifyAll`.
     *
     * Params:
     *  all = `true` to mark every currently blocked waiter for unblocking,
     *        `false` to mark exactly one.
     *
     * Only the branch that closes the gate releases a unit of `m_blockQueue`;
     * when a notification is already in progress the counters are merely
     * increased and the already running chain of waiters picks them up.
     */
    void notify_( bool all )
    {
        DWORD rc;

        EnterCriticalSection( &m_unblockLock );
        // NOTE(review): every path below leaves the critical section
        // explicitly; a failing assert after such a leave would make this
        // guard leave it a second time.
        scope(failure) LeaveCriticalSection( &m_unblockLock );

        if ( m_numWaitersToUnblock != 0 )
        {
            // A notification is already in progress (the gate is closed).
            if ( m_numWaitersBlocked == 0 )
            {
                // Nobody left to wake: nothing to do.
                LeaveCriticalSection( &m_unblockLock );
                return;
            }
            if ( all )
            {
                m_numWaitersToUnblock += m_numWaitersBlocked;
                m_numWaitersBlocked = 0;
            }
            else
            {
                m_numWaitersToUnblock++;
                m_numWaitersBlocked--;
            }
            LeaveCriticalSection( &m_unblockLock );
        }
        else if ( m_numWaitersBlocked > m_numWaitersGone )
        {
            // There are waiters that have not left on their own: close the gate
            // so no new waiter can register while these are being unblocked.
            rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
            if ( 0 != m_numWaitersGone )
            {
                // Discount waiters that already left (timeout / spurious).
                m_numWaitersBlocked -= m_numWaitersGone;
                m_numWaitersGone = 0;
            }
            if ( all )
            {
                m_numWaitersToUnblock = m_numWaitersBlocked;
                m_numWaitersBlocked = 0;
            }
            else
            {
                m_numWaitersToUnblock = 1;
                m_numWaitersBlocked--;
            }
            LeaveCriticalSection( &m_unblockLock );
            // Wake the first waiter; it wakes the next one while signals remain.
            rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
            assert( rc );
        }
        else
        {
            // No waiter to notify.
            LeaveCriticalSection( &m_unblockLock );
        }
    }


    // NOTE: This implementation uses Algorithm 8c as described here:
    //       http://groups.google.com/group/comp.programming.threads/
    //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
    HANDLE              m_blockLock;    // auto-reset event (now semaphore)
    HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
    Mutex               m_assocMutex;   // external mutex/CS
    CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
    int                 m_numWaitersGone        = 0; // waiters that left without consuming a signal
    int                 m_numWaitersBlocked     = 0; // waiters registered and not yet selected for unblocking
    int                 m_numWaitersToUnblock   = 0; // signals issued by notify_ and not yet consumed
}