capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

async-coroutine-test.c++ (17153B)


      1 // Copyright (c) 2020 Cloudflare, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #include <kj/async.h>
     23 #include <kj/array.h>
     24 #include <kj/compat/http.h>
     25 #include <kj/debug.h>
     26 #include <kj/test.h>
     27 
     28 namespace kj {
     29 namespace {
     30 
     31 #ifdef KJ_HAS_COROUTINE
     32 
     33 template <typename T>
     34 Promise<kj::Decay<T>> identity(T&& value) {
     35   co_return kj::fwd<T>(value);
     36 }
     37 // Work around a bonkers MSVC ICE with a separate overload.
     38 Promise<const char*> identity(const char* value) {
     39   co_return value;
     40 }
     41 
     42 KJ_TEST("Identity coroutine") {
     43   EventLoop loop;
     44   WaitScope waitScope(loop);
     45 
     46   KJ_EXPECT(identity(123).wait(waitScope) == 123);
     47   KJ_EXPECT(*identity(kj::heap(456)).wait(waitScope) == 456);
     48 
     49   {
     50     auto p = identity("we can cancel the coroutine");
     51   }
     52 }
     53 
     54 template <typename T>
     55 Promise<T> simpleCoroutine(kj::Promise<T> result, kj::Promise<bool> dontThrow = true) {
     56   KJ_ASSERT(co_await dontThrow);
     57   co_return co_await result;
     58 }
     59 
     60 KJ_TEST("Simple coroutine test") {
     61   EventLoop loop;
     62   WaitScope waitScope(loop);
     63 
     64   simpleCoroutine(kj::Promise<void>(kj::READY_NOW)).wait(waitScope);
     65 
     66   KJ_EXPECT(simpleCoroutine(kj::Promise<int>(123)).wait(waitScope) == 123);
     67 }
     68 
     69 struct Counter {
     70   size_t& count;
     71   Counter(size_t& count): count(count) {}
     72   ~Counter() { ++count; }
     73   KJ_DISALLOW_COPY(Counter);
     74 };
     75 
     76 kj::Promise<void> countDtorsAroundAwait(size_t& count, kj::Promise<void> promise) {
     77   Counter counter1(count);
     78   co_await promise;
     79   Counter counter2(count);
     80   co_return;
     81 };
     82 
     83 KJ_TEST("co_awaiting an immediate promise does not suspend if the event loop is empty and running") {
     84   // The coroutine PromiseNode implementation contains an optimization which allows us to avoid
     85   // suspending the coroutine and instead immediately call PromiseNode::get() and proceed with
     86   // execution.
     87 
     88   EventLoop loop;
     89   WaitScope waitScope(loop);
     90 
     91   // The immediate-execution optimization is only enabled when the event loop is running, so use an
     92   // eagerly-evaluated evalLater() to perform the test from within the event loop. (If we didn't
     93   // eagerly-evaluate the promise, the result would be extracted after the loop finished.)
     94   kj::evalLater([&]() {
     95     size_t count = 0;
     96 
     97     auto promise = kj::Promise<void>(kj::READY_NOW);
     98     auto coroPromise = countDtorsAroundAwait(count, kj::READY_NOW);
     99 
    100     // `coro` has not been destroyed yet, but it completed and unwound its frame.
    101     KJ_EXPECT(count == 2);
    102   }).eagerlyEvaluate(nullptr).wait(waitScope);
    103 
    104   kj::evalLater([&]() {
    105     // If there are no background tasks in the queue, coroutines execute through an evalLater()
    106     // without suspending.
    107 
    108     size_t count = 0;
    109     bool evalLaterRan = false;
    110 
    111     auto promise = kj::evalLater([&]() { evalLaterRan = true; });
    112     auto coroPromise = countDtorsAroundAwait(count, kj::mv(promise));
    113 
    114     KJ_EXPECT(evalLaterRan == true);
    115     KJ_EXPECT(count == 2);
    116   }).eagerlyEvaluate(nullptr).wait(waitScope);
    117 }
    118 
    119 KJ_TEST("co_awaiting an immediate promise suspends if the event loop is not running") {
    120   // We only want to enable the immediate-execution optimization if the event loop is running, or
    121   // else a whole bunch of RPC tests break, because some .then()s get evaluated on promise
    122   // construction, before any .wait() call.
    123 
    124   EventLoop loop;
    125   WaitScope waitScope(loop);
    126 
    127   size_t count = 0;
    128 
    129   auto promise = kj::Promise<void>(kj::READY_NOW);
    130   auto coroPromise = countDtorsAroundAwait(count, kj::READY_NOW);
    131 
    132   // In the previous test, this exact same code executed immediately because the event loop was
    133   // running.
    134   KJ_EXPECT(count == 0);
    135 }
    136 
    137 KJ_TEST("co_awaiting immediate promises suspends if the event loop is not empty") {
    138   // We want to make sure that we can still return to the event loop when we need to.
    139 
    140   EventLoop loop;
    141   WaitScope waitScope(loop);
    142 
    143   // The immediate-execution optimization is only enabled when the event loop is running, so use an
    144   // eagerly-evaluated evalLater() to perform the test from within the event loop. (If we didn't
    145   // eagerly-evaluate the promise, the result would be extracted after the loop finished.)
    146   kj::evalLater([&]() {
    147     size_t count = 0;
    148 
    149     // We need to enqueue an Event on the event loop to inhibit the immediate-execution
    150     // optimization. Creating and then immediately fulfilling an EagerPromiseNode is a convenient
    151     // way to do so.
    152     auto paf = newPromiseAndFulfiller<void>();
    153     paf.promise = paf.promise.eagerlyEvaluate(nullptr);
    154     paf.fulfiller->fulfill();
    155 
    156     auto promise = kj::Promise<void>(kj::READY_NOW);
    157     auto coroPromise = countDtorsAroundAwait(count, kj::READY_NOW);
    158 
    159     // We didn't immediately extract the READY_NOW.
    160     KJ_EXPECT(count == 0);
    161   }).eagerlyEvaluate(nullptr).wait(waitScope);
    162 
    163   kj::evalLater([&]() {
    164     size_t count = 0;
    165     bool evalLaterRan = false;
    166 
    167     // We need to enqueue an Event on the event loop to inhibit the immediate-execution
    168     // optimization. Creating and then immediately fulfilling an EagerPromiseNode is a convenient
    169     // way to do so.
    170     auto paf = newPromiseAndFulfiller<void>();
    171     paf.promise = paf.promise.eagerlyEvaluate(nullptr);
    172     paf.fulfiller->fulfill();
    173 
    174     auto promise = kj::evalLater([&]() { evalLaterRan = true; });
    175     auto coroPromise = countDtorsAroundAwait(count, kj::mv(promise));
    176 
    177     // We didn't continue through the evalLater() promise, because the background promise's
    178     // continuation was next in the event loop's queue.
    179     KJ_EXPECT(evalLaterRan == false);
    180     // No Counter destructor has run.
    181     KJ_EXPECT(count == 0, count);
    182   }).eagerlyEvaluate(nullptr).wait(waitScope);
    183 }
    184 
    185 KJ_TEST("Exceptions propagate through layered coroutines") {
    186   EventLoop loop;
    187   WaitScope waitScope(loop);
    188 
    189   auto throwy = simpleCoroutine(kj::Promise<int>(kj::NEVER_DONE), false);
    190 
    191   KJ_EXPECT_THROW_RECOVERABLE(FAILED, simpleCoroutine(kj::mv(throwy)).wait(waitScope));
    192 }
    193 
    194 KJ_TEST("Exceptions before the first co_await don't escape, but reject the promise") {
    195   EventLoop loop;
    196   WaitScope waitScope(loop);
    197 
    198   auto throwEarly = []() -> Promise<void> {
    199     KJ_FAIL_ASSERT("test exception");
    200 #ifdef __GNUC__
    201 // Yes, this `co_return` is unreachable. But without it, this function is no longer a coroutine.
    202 #pragma GCC diagnostic push
    203 #pragma GCC diagnostic ignored "-Wunreachable-code"
    204 #endif  // __GNUC__
    205     co_return;
    206 #ifdef __GNUC__
    207 #pragma GCC diagnostic pop
    208 #endif  // __GNUC__
    209   };
    210 
    211   auto throwy = throwEarly();
    212 
    213   KJ_EXPECT_THROW_RECOVERABLE(FAILED, throwy.wait(waitScope));
    214 }
    215 
    216 KJ_TEST("Coroutines can catch exceptions from co_await") {
    217   EventLoop loop;
    218   WaitScope waitScope(loop);
    219 
    220   kj::String description;
    221 
    222   auto tryCatch = [&](kj::Promise<void> promise) -> kj::Promise<kj::String> {
    223     try {
    224       co_await promise;
    225     } catch (const kj::Exception& exception) {
    226       co_return kj::str(exception.getDescription());
    227     }
    228     KJ_FAIL_EXPECT("should have thrown");
    229     KJ_UNREACHABLE;
    230   };
    231 
    232   {
    233     // Immediately ready case.
    234     auto promise = kj::Promise<void>(KJ_EXCEPTION(FAILED, "catch me"));
    235     KJ_EXPECT(tryCatch(kj::mv(promise)).wait(waitScope) == "catch me");
    236   }
    237 
    238   {
    239     // Ready later case.
    240     auto promise = kj::evalLater([]() -> kj::Promise<void> {
    241       return KJ_EXCEPTION(FAILED, "catch me");
    242     });
    243     KJ_EXPECT(tryCatch(kj::mv(promise)).wait(waitScope) == "catch me");
    244   }
    245 }
    246 
    247 KJ_TEST("Coroutines can be canceled while suspended") {
    248   EventLoop loop;
    249   WaitScope waitScope(loop);
    250 
    251   size_t count = 0;
    252 
    253   auto coro = [&](kj::Promise<int> promise) -> kj::Promise<void> {
    254     Counter counter1(count);
    255     co_await kj::evalLater([](){});
    256     Counter counter2(count);
    257     co_await promise;
    258   };
    259 
    260   {
    261     auto neverDone = kj::Promise<int>(kj::NEVER_DONE);
    262     neverDone = neverDone.attach(kj::heap<Counter>(count));
    263     auto promise = coro(kj::mv(neverDone));
    264     KJ_EXPECT(!promise.poll(waitScope));
    265   }
    266 
    267   // Stack variables on both sides of a co_await, plus coroutine arguments are destroyed.
    268   KJ_EXPECT(count == 3);
    269 }
    270 
    271 kj::Promise<void> deferredThrowCoroutine(kj::Promise<void> awaitMe) {
    272   KJ_DEFER(kj::throwFatalException(KJ_EXCEPTION(FAILED, "thrown during unwind")));
    273   co_await awaitMe;
    274   co_return;
    275 };
    276 
    277 KJ_TEST("Exceptions during suspended coroutine frame-unwind propagate via destructor") {
    278   EventLoop loop;
    279   WaitScope waitScope(loop);
    280 
    281   auto exception = KJ_ASSERT_NONNULL(kj::runCatchingExceptions([&]() {
    282     deferredThrowCoroutine(kj::NEVER_DONE);
    283   }));
    284 
    285   KJ_EXPECT(exception.getDescription() == "thrown during unwind");
    286 };
    287 
    288 KJ_TEST("Exceptions during suspended coroutine frame-unwind do not cause a memory leak") {
    289   EventLoop loop;
    290   WaitScope waitScope(loop);
    291 
    292   // We can't easily test for memory leaks without hooking operator new and delete. However, we can
    293   // arrange for the test to crash on failure, by having the coroutine suspend at a promise that we
    294   // later fulfill, thus arming the Coroutine's Event. If we fail to destroy the coroutine in this
    295   // state, EventLoop will throw on destruction because it can still see the Event in its list.
    296 
    297   auto exception = KJ_ASSERT_NONNULL(kj::runCatchingExceptions([&]() {
    298     auto paf = kj::newPromiseAndFulfiller<void>();
    299 
    300     auto coroPromise = deferredThrowCoroutine(kj::mv(paf.promise));
    301 
    302     // Arm the Coroutine's Event.
    303     paf.fulfiller->fulfill();
    304 
    305     // If destroying `coroPromise` does not run ~Event(), then ~EventLoop() will crash later.
    306   }));
    307 
    308   KJ_EXPECT(exception.getDescription() == "thrown during unwind");
    309 };
    310 
    311 KJ_TEST("Exceptions during completed coroutine frame-unwind propagate via returned Promise") {
    312   EventLoop loop;
    313   WaitScope waitScope(loop);
    314 
    315   {
    316     // First, prove that exceptions don't escape the destructor of a completed coroutine.
    317     auto promise = deferredThrowCoroutine(kj::READY_NOW);
    318     KJ_EXPECT(promise.poll(waitScope));
    319   }
    320 
    321   {
    322     // Next, prove that they show up via the returned Promise.
    323     auto promise = deferredThrowCoroutine(kj::READY_NOW);
    324     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("thrown during unwind", promise.wait(waitScope));
    325   }
    326 }
    327 
    328 KJ_TEST("Coroutine destruction exceptions are ignored if there is another exception in flight") {
    329   EventLoop loop;
    330   WaitScope waitScope(loop);
    331 
    332   auto exception = KJ_ASSERT_NONNULL(kj::runCatchingExceptions([&]() {
    333     auto promise = deferredThrowCoroutine(kj::NEVER_DONE);
    334     kj::throwFatalException(KJ_EXCEPTION(FAILED, "thrown before destroying throwy promise"));
    335   }));
    336 
    337   KJ_EXPECT(exception.getDescription() == "thrown before destroying throwy promise");
    338 }
    339 
    340 KJ_TEST("co_await only sees coroutine destruction exceptions if promise was not rejected") {
    341   EventLoop loop;
    342   WaitScope waitScope(loop);
    343 
    344   // throwyDtorPromise is an immediate void promise that will throw when it's destroyed, which
    345   // we expect to be able to catch from a coroutine which co_awaits it.
    346   auto throwyDtorPromise = kj::Promise<void>(kj::READY_NOW)
    347       .attach(kj::defer([]() {
    348     kj::throwFatalException(KJ_EXCEPTION(FAILED, "thrown during unwind"));
    349   }));
    350 
    351   // rejectedThrowyDtorPromise is a rejected promise. When co_awaited in a coroutine,
    352   // Awaiter::await_resume() will throw that exception for us to catch, but before we can catch it,
    353   // the temporary promise will be destroyed. The exception it throws during unwind will be ignored,
    354   // and the caller of the coroutine will see only the "thrown during execution" exception.
    355   auto rejectedThrowyDtorPromise = kj::evalNow([&]() -> kj::Promise<void> {
    356     kj::throwFatalException(KJ_EXCEPTION(FAILED, "thrown during execution"));
    357   }).attach(kj::defer([]() {
    358     kj::throwFatalException(KJ_EXCEPTION(FAILED, "thrown during unwind"));
    359   }));
    360 
    361   auto awaitPromise = [](kj::Promise<void> promise) -> kj::Promise<void> {
    362     co_await promise;
    363   };
    364 
    365   KJ_EXPECT_THROW_MESSAGE("thrown during unwind",
    366       awaitPromise(kj::mv(throwyDtorPromise)).wait(waitScope));
    367 
    368   KJ_EXPECT_THROW_MESSAGE("thrown during execution",
    369       awaitPromise(kj::mv(rejectedThrowyDtorPromise)).wait(waitScope));
    370 }
    371 
    372 uint countLines(StringPtr s) {
    373   uint lines = 0;
    374   for (char c: s) {
    375     lines += c == '\n';
    376   }
    377   return lines;
    378 }
    379 
    380 #if (!_MSC_VER || defined(__clang__)) && !__aarch64__
    381 // TODO(msvc): This test relies on GetFunctorStartAddress, which is not supported on MSVC currently,
    382 //   so skip the test.
    383 // TODO(someday): Test is flakey on arm64, depending on how it's compiled. I haven't had a chance to
    384 //   investigate much, but noticed that it failed in a debug build, but passed in a local opt build.
    385 KJ_TEST("Can trace through coroutines") {
    386   // This verifies that async traces, generated either from promises or from events, can see through
    387   // coroutines.
    388   //
    389   // This test may be a bit brittle because it depends on specific trace counts.
    390 
    391   // Enable stack traces, even in release mode.
    392   class EnableFullStackTrace: public ExceptionCallback {
    393   public:
    394     StackTraceMode stackTraceMode() override { return StackTraceMode::FULL; }
    395   };
    396   EnableFullStackTrace exceptionCallback;
    397 
    398   EventLoop loop;
    399   WaitScope waitScope(loop);
    400 
    401   auto paf = newPromiseAndFulfiller<void>();
    402 
    403   // Get an async trace when the promise is fulfilled. We eagerlyEvaluate() to make sure the
    404   // continuation executes while the event loop is running.
    405   paf.promise = paf.promise.then([]() {
    406     auto trace = getAsyncTrace();
    407     // We expect one entry for waitImpl(), one for the coroutine, and one for this continuation.
    408     // When building in debug mode with CMake, I observed this count can be 2. The missing frame is
    409     // probably this continuation. Let's just expect a range.
    410     auto count = countLines(trace);
    411     KJ_EXPECT(0 < count && count <= 3);
    412   }).eagerlyEvaluate(nullptr);
    413 
    414   auto coroPromise = [&]() -> kj::Promise<void> {
    415     co_await paf.promise;
    416   }();
    417 
    418   {
    419     auto trace = coroPromise.trace();
    420     // One for the Coroutine PromiseNode, one for paf.promise.
    421     KJ_EXPECT(countLines(trace) >= 2);
    422   }
    423 
    424   paf.fulfiller->fulfill();
    425 
    426   coroPromise.wait(waitScope);
    427 }
    428 #endif  // !_MSC_VER || defined(__clang__)
    429 
    430 Promise<void> sendData(Promise<Own<NetworkAddress>> addressPromise) {
    431   auto address = co_await addressPromise;
    432   auto client = co_await address->connect();
    433   co_await client->write("foo", 3);
    434 }
    435 
    436 Promise<String> receiveDataCoroutine(Own<ConnectionReceiver> listener) {
    437   auto server = co_await listener->accept();
    438   char buffer[4];
    439   auto n = co_await server->read(buffer, 3, 4);
    440   KJ_EXPECT(3u == n);
    441   co_return heapString(buffer, n);
    442 }
    443 
    444 KJ_TEST("Simple network test with coroutine") {
    445   auto io = setupAsyncIo();
    446   auto& network = io.provider->getNetwork();
    447 
    448   Own<NetworkAddress> serverAddress = network.parseAddress("*", 0).wait(io.waitScope);
    449   Own<ConnectionReceiver> listener = serverAddress->listen();
    450 
    451   sendData(network.parseAddress("localhost", listener->getPort()))
    452       .detach([](Exception&& exception) {
    453     KJ_FAIL_EXPECT(exception);
    454   });
    455 
    456   String result = receiveDataCoroutine(kj::mv(listener)).wait(io.waitScope);
    457 
    458   KJ_EXPECT("foo" == result);
    459 }
    460 
    461 Promise<Own<AsyncIoStream>> httpClientConnect(AsyncIoContext& io) {
    462   auto addr = co_await io.provider->getNetwork().parseAddress("capnproto.org", 80);
    463   co_return co_await addr->connect();
    464 }
    465 
    466 Promise<void> httpClient(Own<AsyncIoStream> connection) {
    467   // Borrowed and rewritten from compat/http-test.c++.
    468 
    469   HttpHeaderTable table;
    470   auto client = newHttpClient(table, *connection);
    471 
    472   HttpHeaders headers(table);
    473   headers.set(HttpHeaderId::HOST, "capnproto.org");
    474 
    475   auto response = co_await client->request(HttpMethod::GET, "/", headers).response;
    476   KJ_EXPECT(response.statusCode / 100 == 3);
    477   auto location = KJ_ASSERT_NONNULL(response.headers->get(HttpHeaderId::LOCATION));
    478   KJ_EXPECT(location == "https://capnproto.org/");
    479 
    480   auto body = co_await response.body->readAllText();
    481 }
    482 
    483 KJ_TEST("HttpClient to capnproto.org with a coroutine") {
    484   auto io = setupAsyncIo();
    485 
    486   auto promise = httpClientConnect(io).then([](Own<AsyncIoStream> connection) {
    487     return httpClient(kj::mv(connection));
    488   }, [](Exception&&) {
    489     KJ_LOG(WARNING, "skipping test because couldn't connect to capnproto.org");
    490   });
    491 
    492   promise.wait(io.waitScope);
    493 }
    494 
    495 #endif  // KJ_HAS_COROUTINE
    496 
    497 }  // namespace
    498 }  // namespace kj