module rt.sys.windows.oscondition;

version (Windows):

import core.sync.exception;
import core.sync.mutex;
import core.time;

import core.exception : AssertError, staticError;

import core.sync.semaphore;
import core.sys.windows.basetsd /+: HANDLE+/;
import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
    DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection,
    LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
import core.sys.windows.windef /+: BOOL, DWORD+/;
import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;


/**
 * Windows implementation of a condition variable, built from two semaphores
 * and a critical section, and tied to an externally supplied Mutex.
 *
 * Lifecycle: call create() once before any other member, and destroy() once
 * when the condition is no longer used. The struct does not manage this
 * automatically.
 */
struct OsCondition
{
    /**
     * Creates the OS objects backing this condition and associates it with m.
     *
     * Params:
     *  m = The mutex that wait() releases while blocked and re-acquires
     *      before returning.
     *
     * Throws:
     *  AssertError (obtained via staticError) if either semaphore cannot be
     *  created.
     */
    void create(Mutex m) nothrow @trusted @nogc
    {
        // Gate semaphore: initial count 1, maximum count 1.
        m_blockLock = CreateSemaphoreA( null, 1, 1, null );
        if ( m_blockLock == m_blockLock.init )
            throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
        scope(failure) CloseHandle( m_blockLock );

        // Queue semaphore: initial count 0, maximum count int.max.
        m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
        if ( m_blockQueue == m_blockQueue.init )
            throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
        scope(failure) CloseHandle( m_blockQueue );

        InitializeCriticalSection( cast(RTL_CRITICAL_SECTION*) &m_unblockLock );
        m_assocMutex = m;
    }

    /**
     * Closes both semaphore handles and deletes the critical section.
     *
     * A failing CloseHandle is only reported through assert.
     */
    void destroy() @nogc
    {
        BOOL rc = CloseHandle( m_blockLock );
        assert( rc, "Unable to destroy condition" );
        rc = CloseHandle( m_blockQueue );
        assert( rc, "Unable to destroy condition" );
        DeleteCriticalSection( &m_unblockLock );
    }

    /**
     * Waits with an INFINITE timeout.
     *
     * The associated mutex is unlocked for the duration of the wait and
     * locked again before this returns (see timedWait).
     */
    void wait()
    {
        timedWait( INFINITE );
    }

    /**
     * Waits until notified or until val has elapsed.
     *
     * Params:
     *  val = The time to wait. It is not validated here: its millisecond
     *        count is converted with cast(uint), so callers should pass a
     *        non-negative duration.
     *
     * Returns:
     *  true if the wait ended without timing out, false if it timed out.
     */
    bool wait( Duration val)
    {
        // The OS timeout is a 32-bit millisecond count, so longer waits are
        // issued in chunks. uint.max - 1 keeps each chunk distinct from the
        // value used for INFINITE (0xFFFFFFFF in the Win32 headers).
        auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

        while ( val > maxWaitMillis )
        {
            if ( timedWait( cast(uint)
                            maxWaitMillis.total!"msecs" ) )
                return true;
            val -= maxWaitMillis;
        }
        return timedWait( cast(uint) val.total!"msecs" );
    }

    /**
     * Wakes at most one waiter (forwards to notify_ with all == false).
     */
    void notify()
    {
        notify_( false );
    }

    /**
     * Wakes all currently blocked waiters (forwards to notify_ with
     * all == true).
     */
    void notifyAll()
    {
        notify_( true );
    }

    /**
     * Gets the mutex associated with this condition.
     *
     * Returns:
     *  The mutex passed to create().
     */
    @property Mutex mutex()
    {
        return m_assocMutex;
    }

    /// ditto
    @property shared(Mutex) mutex() shared
    {
        import core.atomic : atomicLoad;
        return atomicLoad(m_assocMutex);
    }

    // undocumented function for internal use
    final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
    {
        return m_assocMutex;
    }

    // ditto
    final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc
    {
        import core.atomic : atomicLoad;
        return atomicLoad(m_assocMutex);
    }

private:
    /**
     * Core wait routine shared by both wait() overloads.
     *
     * Registers the caller as a blocked waiter, unlocks the associated mutex,
     * blocks on the queue semaphore for up to timeout milliseconds, performs
     * the waiter bookkeeping, and finally locks the associated mutex again.
     *
     * Params:
     *  timeout = Timeout in milliseconds passed to WaitForSingleObject on the
     *            queue semaphore; INFINITE waits without limit.
     *
     * Returns:
     *  true if the queue wait was satisfied, false if it timed out.
     */
    bool timedWait( DWORD timeout )
    {
        int   numSignalsLeft;
        int   numWaitersGone;
        DWORD rc;

        // Pass through the gate before registering as a waiter.
        rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
        assert( rc == WAIT_OBJECT_0 );

        EnterCriticalSection( &m_unblockLock );
        m_numWaitersBlocked++;
        LeaveCriticalSection( &m_unblockLock );

        rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
        assert( rc );

        // The associated mutex is released only for the duration of the
        // blocking wait; on failure it is re-acquired before unwinding.
        m_assocMutex.unlock();
        scope(failure) m_assocMutex.lock();

        rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, timeout );
        assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
        bool timedOut = (rc == WAIT_TIMEOUT);

        EnterCriticalSection( &m_unblockLock );
        scope(failure) LeaveCriticalSection( &m_unblockLock );

        if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
        {
            if ( timedOut )
            {
                // timeout (or canceled)
                if ( m_numWaitersBlocked != 0 )
                {
                    m_numWaitersBlocked--;
                    // do not unblock next waiter below (already unblocked)
                    numSignalsLeft = 0;
                }
                else
                {
                    // spurious wakeup pending!!
                    m_numWaitersGone = 1;
                }
            }
            if ( (--m_numWaitersToUnblock) == 0 )
            {
                if ( m_numWaitersBlocked != 0 )
                {
                    // open the gate
                    rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
                    assert( rc );
                    // do not open the gate below again
                    numSignalsLeft = 0;
                }
                else if ( (numWaitersGone = m_numWaitersGone) != 0 )
                {
                    m_numWaitersGone = 0;
                }
            }
        }
        else if ( (++m_numWaitersGone) == int.max / 2 )
        {
            // timeout/canceled or spurious event :-)
            rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
            // something is going on here - test of timeouts?
            m_numWaitersBlocked -= m_numWaitersGone;
            rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
            // ReleaseSemaphore reports success with a nonzero BOOL, so it must
            // be checked for truthiness like the other release calls here,
            // not compared against WAIT_OBJECT_0 (which is zero).
            assert( rc );
            m_numWaitersGone = 0;
        }

        LeaveCriticalSection( &m_unblockLock );

        if ( numSignalsLeft == 1 )
        {
            // better now than spurious later (same as ResetEvent)
            for ( ; numWaitersGone > 0; --numWaitersGone )
            {
                rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
            }
            // open the gate
            rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
            assert( rc );
        }
        else if ( numSignalsLeft != 0 )
        {
            // unblock next waiter
            rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
            assert( rc );
        }
        m_assocMutex.lock();
        return !timedOut;
    }


    /**
     * Shared implementation of notify() and notifyAll().
     *
     * Under m_unblockLock, moves waiters from the "blocked" count to the
     * "to unblock" count. When no unblocking is already in progress, it
     * first takes the gate semaphore and then releases the queue semaphore
     * once to start waking waiters.
     *
     * Params:
     *  all = true to unblock every currently blocked waiter, false to
     *        unblock one.
     */
    void notify_( bool all )
    {
        DWORD rc;

        EnterCriticalSection( &m_unblockLock );
        scope(failure) LeaveCriticalSection( &m_unblockLock );

        if ( m_numWaitersToUnblock != 0 )
        {
            // An unblock is already in progress: only adjust the counters.
            if ( m_numWaitersBlocked == 0 )
            {
                LeaveCriticalSection( &m_unblockLock );
                return;
            }
            if ( all )
            {
                m_numWaitersToUnblock += m_numWaitersBlocked;
                m_numWaitersBlocked = 0;
            }
            else
            {
                m_numWaitersToUnblock++;
                m_numWaitersBlocked--;
            }
            LeaveCriticalSection( &m_unblockLock );
        }
        else if ( m_numWaitersBlocked > m_numWaitersGone )
        {
            // Close the gate so no new waiter registers during the unblock.
            rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
            if ( 0 != m_numWaitersGone )
            {
                m_numWaitersBlocked -= m_numWaitersGone;
                m_numWaitersGone = 0;
            }
            if ( all )
            {
                m_numWaitersToUnblock = m_numWaitersBlocked;
                m_numWaitersBlocked = 0;
            }
            else
            {
                m_numWaitersToUnblock = 1;
                m_numWaitersBlocked--;
            }
            LeaveCriticalSection( &m_unblockLock );
            rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
            assert( rc );
        }
        else
        {
            // Nobody to wake.
            LeaveCriticalSection( &m_unblockLock );
        }
    }


    // NOTE: This implementation uses Algorithm 8c as described here:
    //       http://groups.google.com/group/comp.programming.threads/
    //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
    HANDLE              m_blockLock;    // auto-reset event (now semaphore)
    HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
    Mutex               m_assocMutex;   // external mutex/CS
    CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
    int                 m_numWaitersGone        = 0;
    int                 m_numWaitersBlocked     = 0;
    int                 m_numWaitersToUnblock   = 0;
}