async-win32.c++ (9618B)
1 // Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #if _WIN32 23 24 // Request Vista-level APIs. 25 #include "win32-api-version.h" 26 27 #include "async-win32.h" 28 #include "debug.h" 29 #include <atomic> 30 #include "time.h" 31 #include "refcount.h" 32 #include <ntsecapi.h> // NTSTATUS 33 #include <ntstatus.h> // STATUS_SUCCESS 34 35 #undef ERROR // dammit windows.h 36 37 namespace kj { 38 39 Win32IocpEventPort::Win32IocpEventPort() 40 : clock(systemPreciseMonotonicClock()), 41 iocp(newIocpHandle()), thread(openCurrentThread()), timerImpl(clock.now()) {} 42 43 Win32IocpEventPort::~Win32IocpEventPort() noexcept(false) {} 44 45 class Win32IocpEventPort::IoPromiseAdapter final: public OVERLAPPED { 46 public: 47 IoPromiseAdapter(PromiseFulfiller<IoResult>& fulfiller, Win32IocpEventPort& port, 48 uint64_t offset, IoPromiseAdapter** selfPtr) 49 : fulfiller(fulfiller), port(port) { 50 *selfPtr = this; 51 52 memset(implicitCast<OVERLAPPED*>(this), 0, sizeof(OVERLAPPED)); 53 this->Offset = offset & 0x00000000FFFFFFFFull; 54 this->OffsetHigh = offset >> 32; 55 } 56 57 ~IoPromiseAdapter() { 58 if (handle != INVALID_HANDLE_VALUE) { 59 // Need to cancel the I/O. 60 // 61 // Note: Even if HasOverlappedIoCompleted(this) is true, CancelIoEx() still seems needed to 62 // force the completion event. 63 if (!CancelIoEx(handle, this)) { 64 DWORD error = GetLastError(); 65 66 // ERROR_NOT_FOUND probably means the operation already completed and is enqueued on the 67 // IOCP. 68 // 69 // ERROR_INVALID_HANDLE probably means that, amid a mass of destructors, the HANDLE was 70 // closed before all of the I/O promises were destroyed. We tolerate this so long as the 71 // I/O promises are also destroyed before returning to the event loop, hence the I/O 72 // tasks won't actually continue on a dead handle. 73 // 74 // TODO(cleanup): ERROR_INVALID_HANDLE really shouldn't be allowed. Unfortunately, the 75 // refcounted nature of capabilities and the RPC system seems to mean that objects 76 // are unwound in the wrong order in several of Cap'n Proto's tests. So we live with this 77 // for now. Note that even if a new handle is opened with the same numeric value, it 78 // should be hardless to call CancelIoEx() on it because it couldn't possibly be using 79 // the same OVERLAPPED structure. 80 if (error != ERROR_NOT_FOUND && error != ERROR_INVALID_HANDLE) { 81 KJ_FAIL_WIN32("CancelIoEx()", error, handle); 82 } 83 } 84 85 // We have to wait for the IOCP to poop out the event, so that we can safely destroy the 86 // OVERLAPPED. 87 while (handle != INVALID_HANDLE_VALUE) { 88 port.waitIocp(INFINITE); 89 } 90 } 91 } 92 93 void start(HANDLE handle) { 94 KJ_ASSERT(this->handle == INVALID_HANDLE_VALUE); 95 this->handle = handle; 96 } 97 98 void done(IoResult result) { 99 KJ_ASSERT(handle != INVALID_HANDLE_VALUE); 100 handle = INVALID_HANDLE_VALUE; 101 fulfiller.fulfill(kj::mv(result)); 102 } 103 104 private: 105 PromiseFulfiller<IoResult>& fulfiller; 106 Win32IocpEventPort& port; 107 108 HANDLE handle = INVALID_HANDLE_VALUE; 109 // If an I/O operation is currently enqueued, the handle on which it is enqueued. 110 }; 111 112 class Win32IocpEventPort::IoOperationImpl final: public Win32EventPort::IoOperation { 113 public: 114 explicit IoOperationImpl(Win32IocpEventPort& port, HANDLE handle, uint64_t offset) 115 : handle(handle), 116 promise(newAdaptedPromise<IoResult, IoPromiseAdapter>(port, offset, &promiseAdapter)) {} 117 118 LPOVERLAPPED getOverlapped() override { 119 KJ_REQUIRE(promiseAdapter != nullptr, "already called onComplete()"); 120 return promiseAdapter; 121 } 122 123 Promise<IoResult> onComplete() override { 124 KJ_REQUIRE(promiseAdapter != nullptr, "can only call onComplete() once"); 125 promiseAdapter->start(handle); 126 promiseAdapter = nullptr; 127 return kj::mv(promise); 128 } 129 130 private: 131 HANDLE handle; 132 IoPromiseAdapter* promiseAdapter; 133 Promise<IoResult> promise; 134 }; 135 136 class Win32IocpEventPort::IoObserverImpl final: public Win32EventPort::IoObserver { 137 public: 138 IoObserverImpl(Win32IocpEventPort& port, HANDLE handle) 139 : port(port), handle(handle) { 140 KJ_WIN32(CreateIoCompletionPort(handle, port.iocp, 0, 1), handle, port.iocp.get()); 141 } 142 143 Own<IoOperation> newOperation(uint64_t offset) { 144 return heap<IoOperationImpl>(port, handle, offset); 145 } 146 147 private: 148 Win32IocpEventPort& port; 149 HANDLE handle; 150 }; 151 152 Own<Win32EventPort::IoObserver> Win32IocpEventPort::observeIo(HANDLE handle) { 153 return heap<IoObserverImpl>(*this, handle); 154 } 155 156 Own<Win32EventPort::SignalObserver> Win32IocpEventPort::observeSignalState(HANDLE handle) { 157 return waitThreads.observeSignalState(handle); 158 } 159 160 bool Win32IocpEventPort::wait() { 161 waitIocp(timerImpl.timeoutToNextEvent(clock.now(), MILLISECONDS, INFINITE - 1) 162 .map([](uint64_t t) -> DWORD { return t; }) 163 .orDefault(INFINITE)); 164 165 timerImpl.advanceTo(clock.now()); 166 167 return receivedWake(); 168 } 169 170 bool Win32IocpEventPort::poll() { 171 waitIocp(0); 172 173 return receivedWake(); 174 } 175 176 void Win32IocpEventPort::wake() const { 177 if (!sentWake.load(std::memory_order_acquire)) { 178 sentWake.store(true, std::memory_order_release); 179 KJ_WIN32(PostQueuedCompletionStatus(iocp, 0, 0, nullptr)); 180 } 181 } 182 183 void Win32IocpEventPort::waitIocp(DWORD timeoutMs) { 184 if (isAllowApc) { 185 ULONG countReceived = 0; 186 OVERLAPPED_ENTRY entry; 187 memset(&entry, 0, sizeof(entry)); 188 189 if (GetQueuedCompletionStatusEx(iocp, &entry, 1, &countReceived, timeoutMs, TRUE)) { 190 KJ_ASSERT(countReceived == 1); 191 192 if (entry.lpOverlapped == nullptr) { 193 // wake() called in another thread, or APC queued. 194 } else { 195 DWORD error = ERROR_SUCCESS; 196 if (entry.lpOverlapped->Internal != STATUS_SUCCESS) { 197 error = LsaNtStatusToWinError(entry.lpOverlapped->Internal); 198 } 199 static_cast<IoPromiseAdapter*>(entry.lpOverlapped) 200 ->done(IoResult { error, entry.dwNumberOfBytesTransferred }); 201 } 202 } else { 203 // Call failed. 204 DWORD error = GetLastError(); 205 if (error == WAIT_TIMEOUT || error == WAIT_IO_COMPLETION) { 206 // WAIT_TIMEOUT = timed out (dunno why this isn't ERROR_TIMEOUT??) 207 // WAIT_IO_COMPLETION = APC queued 208 // Either way, nothing to do. 209 return; 210 } else { 211 KJ_FAIL_WIN32("GetQueuedCompletionStatusEx()", error, error, entry.lpOverlapped); 212 } 213 } 214 } else { 215 DWORD bytesTransferred; 216 ULONG_PTR completionKey; 217 LPOVERLAPPED overlapped = nullptr; 218 219 BOOL success = GetQueuedCompletionStatus( 220 iocp, &bytesTransferred, &completionKey, &overlapped, timeoutMs); 221 222 if (overlapped == nullptr) { 223 if (success) { 224 // wake() called in another thread. 225 } else { 226 DWORD error = GetLastError(); 227 if (error == WAIT_TIMEOUT) { 228 // Great, nothing to do. (Why this is WAIT_TIMEOUT and not ERROR_TIMEOUT I'm not sure.) 229 } else { 230 KJ_FAIL_WIN32("GetQueuedCompletionStatus()", error, error, overlapped); 231 } 232 } 233 } else { 234 DWORD error = success ? ERROR_SUCCESS : GetLastError(); 235 static_cast<IoPromiseAdapter*>(overlapped)->done(IoResult { error, bytesTransferred }); 236 } 237 } 238 } 239 240 bool Win32IocpEventPort::receivedWake() { 241 if (sentWake.load(std::memory_order_acquire)) { 242 sentWake.store(false, std::memory_order_release); 243 return true; 244 } else { 245 return false; 246 } 247 } 248 249 AutoCloseHandle Win32IocpEventPort::newIocpHandle() { 250 HANDLE h; 251 KJ_WIN32(h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1)); 252 return AutoCloseHandle(h); 253 } 254 255 AutoCloseHandle Win32IocpEventPort::openCurrentThread() { 256 HANDLE process = GetCurrentProcess(); 257 HANDLE result; 258 KJ_WIN32(DuplicateHandle(process, GetCurrentThread(), process, &result, 259 0, FALSE, DUPLICATE_SAME_ACCESS)); 260 return AutoCloseHandle(result); 261 } 262 263 // ======================================================================================= 264 265 Win32WaitObjectThreadPool::Win32WaitObjectThreadPool(uint mainThreadCount) {} 266 267 Own<Win32EventPort::SignalObserver> Win32WaitObjectThreadPool::observeSignalState(HANDLE handle) { 268 KJ_UNIMPLEMENTED("wait for win32 handles"); 269 } 270 271 uint Win32WaitObjectThreadPool::prepareMainThreadWait(HANDLE* handles[]) { 272 KJ_UNIMPLEMENTED("wait for win32 handles"); 273 } 274 275 bool Win32WaitObjectThreadPool::finishedMainThreadWait(DWORD returnCode) { 276 KJ_UNIMPLEMENTED("wait for win32 handles"); 277 } 278 279 } // namespace kj 280 281 #endif // _WIN32