capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

async-win32.c++ (9618B)


      1 // Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #if _WIN32
     23 
     24 // Request Vista-level APIs.
     25 #include "win32-api-version.h"
     26 
     27 #include "async-win32.h"
     28 #include "debug.h"
     29 #include <atomic>
     30 #include "time.h"
     31 #include "refcount.h"
     32 #include <ntsecapi.h>  // NTSTATUS
     33 #include <ntstatus.h>  // STATUS_SUCCESS
     34 
     35 #undef ERROR  // dammit windows.h
     36 
     37 namespace kj {
     38 
     39 Win32IocpEventPort::Win32IocpEventPort()
     40     : clock(systemPreciseMonotonicClock()),
     41       iocp(newIocpHandle()), thread(openCurrentThread()), timerImpl(clock.now()) {}
     42 
     43 Win32IocpEventPort::~Win32IocpEventPort() noexcept(false) {}
     44 
     45 class Win32IocpEventPort::IoPromiseAdapter final: public OVERLAPPED {
     46 public:
     47   IoPromiseAdapter(PromiseFulfiller<IoResult>& fulfiller, Win32IocpEventPort& port,
     48                    uint64_t offset, IoPromiseAdapter** selfPtr)
     49       : fulfiller(fulfiller), port(port) {
     50     *selfPtr = this;
     51 
     52     memset(implicitCast<OVERLAPPED*>(this), 0, sizeof(OVERLAPPED));
     53     this->Offset = offset & 0x00000000FFFFFFFFull;
     54     this->OffsetHigh = offset >> 32;
     55   }
     56 
     57   ~IoPromiseAdapter() {
     58     if (handle != INVALID_HANDLE_VALUE) {
     59       // Need to cancel the I/O.
     60       //
     61       // Note: Even if HasOverlappedIoCompleted(this) is true, CancelIoEx() still seems needed to
     62       //   force the completion event.
     63       if (!CancelIoEx(handle, this)) {
     64         DWORD error = GetLastError();
     65 
     66         // ERROR_NOT_FOUND probably means the operation already completed and is enqueued on the
     67         // IOCP.
     68         //
     69         // ERROR_INVALID_HANDLE probably means that, amid a mass of destructors, the HANDLE was
     70         // closed before all of the I/O promises were destroyed. We tolerate this so long as the
     71         // I/O promises are also destroyed before returning to the event loop, hence the I/O
     72         // tasks won't actually continue on a dead handle.
     73         //
     74         // TODO(cleanup): ERROR_INVALID_HANDLE really shouldn't be allowed. Unfortunately, the
     75         //   refcounted nature of capabilities and the RPC system seems to mean that objects
     76         //   are unwound in the wrong order in several of Cap'n Proto's tests. So we live with this
     77         //   for now. Note that even if a new handle is opened with the same numeric value, it
     78         //   should be hardless to call CancelIoEx() on it because it couldn't possibly be using
     79         //   the same OVERLAPPED structure.
     80         if (error != ERROR_NOT_FOUND && error != ERROR_INVALID_HANDLE) {
     81           KJ_FAIL_WIN32("CancelIoEx()", error, handle);
     82         }
     83       }
     84 
     85       // We have to wait for the IOCP to poop out the event, so that we can safely destroy the
     86       // OVERLAPPED.
     87       while (handle != INVALID_HANDLE_VALUE) {
     88         port.waitIocp(INFINITE);
     89       }
     90     }
     91   }
     92 
     93   void start(HANDLE handle) {
     94     KJ_ASSERT(this->handle == INVALID_HANDLE_VALUE);
     95     this->handle = handle;
     96   }
     97 
     98   void done(IoResult result) {
     99     KJ_ASSERT(handle != INVALID_HANDLE_VALUE);
    100     handle = INVALID_HANDLE_VALUE;
    101     fulfiller.fulfill(kj::mv(result));
    102   }
    103 
    104 private:
    105   PromiseFulfiller<IoResult>& fulfiller;
    106   Win32IocpEventPort& port;
    107 
    108   HANDLE handle = INVALID_HANDLE_VALUE;
    109   // If an I/O operation is currently enqueued, the handle on which it is enqueued.
    110 };
    111 
    112 class Win32IocpEventPort::IoOperationImpl final: public Win32EventPort::IoOperation {
    113 public:
    114   explicit IoOperationImpl(Win32IocpEventPort& port, HANDLE handle, uint64_t offset)
    115       : handle(handle),
    116         promise(newAdaptedPromise<IoResult, IoPromiseAdapter>(port, offset, &promiseAdapter)) {}
    117 
    118   LPOVERLAPPED getOverlapped() override {
    119     KJ_REQUIRE(promiseAdapter != nullptr, "already called onComplete()");
    120     return promiseAdapter;
    121   }
    122 
    123   Promise<IoResult> onComplete() override {
    124     KJ_REQUIRE(promiseAdapter != nullptr, "can only call onComplete() once");
    125     promiseAdapter->start(handle);
    126     promiseAdapter = nullptr;
    127     return kj::mv(promise);
    128   }
    129 
    130 private:
    131   HANDLE handle;
    132   IoPromiseAdapter* promiseAdapter;
    133   Promise<IoResult> promise;
    134 };
    135 
    136 class Win32IocpEventPort::IoObserverImpl final: public Win32EventPort::IoObserver {
    137 public:
    138   IoObserverImpl(Win32IocpEventPort& port, HANDLE handle)
    139       : port(port), handle(handle) {
    140     KJ_WIN32(CreateIoCompletionPort(handle, port.iocp, 0, 1), handle, port.iocp.get());
    141   }
    142 
    143   Own<IoOperation> newOperation(uint64_t offset) {
    144     return heap<IoOperationImpl>(port, handle, offset);
    145   }
    146 
    147 private:
    148   Win32IocpEventPort& port;
    149   HANDLE handle;
    150 };
    151 
    152 Own<Win32EventPort::IoObserver> Win32IocpEventPort::observeIo(HANDLE handle) {
    153   return heap<IoObserverImpl>(*this, handle);
    154 }
    155 
    156 Own<Win32EventPort::SignalObserver> Win32IocpEventPort::observeSignalState(HANDLE handle) {
    157   return waitThreads.observeSignalState(handle);
    158 }
    159 
    160 bool Win32IocpEventPort::wait() {
    161   waitIocp(timerImpl.timeoutToNextEvent(clock.now(), MILLISECONDS, INFINITE - 1)
    162       .map([](uint64_t t) -> DWORD { return t; })
    163       .orDefault(INFINITE));
    164 
    165   timerImpl.advanceTo(clock.now());
    166 
    167   return receivedWake();
    168 }
    169 
    170 bool Win32IocpEventPort::poll() {
    171   waitIocp(0);
    172 
    173   return receivedWake();
    174 }
    175 
    176 void Win32IocpEventPort::wake() const {
    177   if (!sentWake.load(std::memory_order_acquire)) {
    178     sentWake.store(true, std::memory_order_release);
    179     KJ_WIN32(PostQueuedCompletionStatus(iocp, 0, 0, nullptr));
    180   }
    181 }
    182 
    183 void Win32IocpEventPort::waitIocp(DWORD timeoutMs) {
    184   if (isAllowApc) {
    185     ULONG countReceived = 0;
    186     OVERLAPPED_ENTRY entry;
    187     memset(&entry, 0, sizeof(entry));
    188 
    189     if (GetQueuedCompletionStatusEx(iocp, &entry, 1, &countReceived, timeoutMs, TRUE)) {
    190       KJ_ASSERT(countReceived == 1);
    191 
    192       if (entry.lpOverlapped == nullptr) {
    193         // wake() called in another thread, or APC queued.
    194       } else {
    195         DWORD error = ERROR_SUCCESS;
    196         if (entry.lpOverlapped->Internal != STATUS_SUCCESS) {
    197           error = LsaNtStatusToWinError(entry.lpOverlapped->Internal);
    198         }
    199         static_cast<IoPromiseAdapter*>(entry.lpOverlapped)
    200             ->done(IoResult { error, entry.dwNumberOfBytesTransferred });
    201       }
    202     } else {
    203       // Call failed.
    204       DWORD error = GetLastError();
    205       if (error == WAIT_TIMEOUT || error == WAIT_IO_COMPLETION) {
    206         // WAIT_TIMEOUT = timed out (dunno why this isn't ERROR_TIMEOUT??)
    207         // WAIT_IO_COMPLETION = APC queued
    208         // Either way, nothing to do.
    209         return;
    210       } else {
    211         KJ_FAIL_WIN32("GetQueuedCompletionStatusEx()", error, error, entry.lpOverlapped);
    212       }
    213     }
    214   } else {
    215     DWORD bytesTransferred;
    216     ULONG_PTR completionKey;
    217     LPOVERLAPPED overlapped = nullptr;
    218 
    219     BOOL success = GetQueuedCompletionStatus(
    220         iocp, &bytesTransferred, &completionKey, &overlapped, timeoutMs);
    221 
    222     if (overlapped == nullptr) {
    223       if (success) {
    224         // wake() called in another thread.
    225       } else {
    226         DWORD error = GetLastError();
    227         if (error == WAIT_TIMEOUT) {
    228           // Great, nothing to do. (Why this is WAIT_TIMEOUT and not ERROR_TIMEOUT I'm not sure.)
    229         } else {
    230           KJ_FAIL_WIN32("GetQueuedCompletionStatus()", error, error, overlapped);
    231         }
    232       }
    233     } else {
    234       DWORD error = success ? ERROR_SUCCESS : GetLastError();
    235       static_cast<IoPromiseAdapter*>(overlapped)->done(IoResult { error, bytesTransferred });
    236     }
    237   }
    238 }
    239 
    240 bool Win32IocpEventPort::receivedWake() {
    241   if (sentWake.load(std::memory_order_acquire)) {
    242     sentWake.store(false, std::memory_order_release);
    243     return true;
    244   } else {
    245     return false;
    246   }
    247 }
    248 
    249 AutoCloseHandle Win32IocpEventPort::newIocpHandle() {
    250   HANDLE h;
    251   KJ_WIN32(h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1));
    252   return AutoCloseHandle(h);
    253 }
    254 
    255 AutoCloseHandle Win32IocpEventPort::openCurrentThread() {
    256   HANDLE process = GetCurrentProcess();
    257   HANDLE result;
    258   KJ_WIN32(DuplicateHandle(process, GetCurrentThread(), process, &result,
    259                            0, FALSE, DUPLICATE_SAME_ACCESS));
    260   return AutoCloseHandle(result);
    261 }
    262 
    263 // =======================================================================================
    264 
    265 Win32WaitObjectThreadPool::Win32WaitObjectThreadPool(uint mainThreadCount) {}
    266 
    267 Own<Win32EventPort::SignalObserver> Win32WaitObjectThreadPool::observeSignalState(HANDLE handle) {
    268   KJ_UNIMPLEMENTED("wait for win32 handles");
    269 }
    270 
    271 uint Win32WaitObjectThreadPool::prepareMainThreadWait(HANDLE* handles[]) {
    272   KJ_UNIMPLEMENTED("wait for win32 handles");
    273 }
    274 
    275 bool Win32WaitObjectThreadPool::finishedMainThreadWait(DWORD returnCode) {
    276   KJ_UNIMPLEMENTED("wait for win32 handles");
    277 }
    278 
    279 } // namespace kj
    280 
    281 #endif  // _WIN32