async-coroutine-test.c++ (17153B)
1 // Copyright (c) 2020 Cloudflare, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #include <kj/async.h> 23 #include <kj/array.h> 24 #include <kj/compat/http.h> 25 #include <kj/debug.h> 26 #include <kj/test.h> 27 28 namespace kj { 29 namespace { 30 31 #ifdef KJ_HAS_COROUTINE 32 33 template <typename T> 34 Promise<kj::Decay<T>> identity(T&& value) { 35 co_return kj::fwd<T>(value); 36 } 37 // Work around a bonkers MSVC ICE with a separate overload. 38 Promise<const char*> identity(const char* value) { 39 co_return value; 40 } 41 42 KJ_TEST("Identity coroutine") { 43 EventLoop loop; 44 WaitScope waitScope(loop); 45 46 KJ_EXPECT(identity(123).wait(waitScope) == 123); 47 KJ_EXPECT(*identity(kj::heap(456)).wait(waitScope) == 456); 48 49 { 50 auto p = identity("we can cancel the coroutine"); 51 } 52 } 53 54 template <typename T> 55 Promise<T> simpleCoroutine(kj::Promise<T> result, kj::Promise<bool> dontThrow = true) { 56 KJ_ASSERT(co_await dontThrow); 57 co_return co_await result; 58 } 59 60 KJ_TEST("Simple coroutine test") { 61 EventLoop loop; 62 WaitScope waitScope(loop); 63 64 simpleCoroutine(kj::Promise<void>(kj::READY_NOW)).wait(waitScope); 65 66 KJ_EXPECT(simpleCoroutine(kj::Promise<int>(123)).wait(waitScope) == 123); 67 } 68 69 struct Counter { 70 size_t& count; 71 Counter(size_t& count): count(count) {} 72 ~Counter() { ++count; } 73 KJ_DISALLOW_COPY(Counter); 74 }; 75 76 kj::Promise<void> countDtorsAroundAwait(size_t& count, kj::Promise<void> promise) { 77 Counter counter1(count); 78 co_await promise; 79 Counter counter2(count); 80 co_return; 81 }; 82 83 KJ_TEST("co_awaiting an immediate promise does not suspend if the event loop is empty and running") { 84 // The coroutine PromiseNode implementation contains an optimization which allows us to avoid 85 // suspending the coroutine and instead immediately call PromiseNode::get() and proceed with 86 // execution. 87 88 EventLoop loop; 89 WaitScope waitScope(loop); 90 91 // The immediate-execution optimization is only enabled when the event loop is running, so use an 92 // eagerly-evaluated evalLater() to perform the test from within the event loop. (If we didn't 93 // eagerly-evaluate the promise, the result would be extracted after the loop finished.) 94 kj::evalLater([&]() { 95 size_t count = 0; 96 97 auto promise = kj::Promise<void>(kj::READY_NOW); 98 auto coroPromise = countDtorsAroundAwait(count, kj::READY_NOW); 99 100 // `coro` has not been destroyed yet, but it completed and unwound its frame. 101 KJ_EXPECT(count == 2); 102 }).eagerlyEvaluate(nullptr).wait(waitScope); 103 104 kj::evalLater([&]() { 105 // If there are no background tasks in the queue, coroutines execute through an evalLater() 106 // without suspending. 107 108 size_t count = 0; 109 bool evalLaterRan = false; 110 111 auto promise = kj::evalLater([&]() { evalLaterRan = true; }); 112 auto coroPromise = countDtorsAroundAwait(count, kj::mv(promise)); 113 114 KJ_EXPECT(evalLaterRan == true); 115 KJ_EXPECT(count == 2); 116 }).eagerlyEvaluate(nullptr).wait(waitScope); 117 } 118 119 KJ_TEST("co_awaiting an immediate promise suspends if the event loop is not running") { 120 // We only want to enable the immediate-execution optimization if the event loop is running, or 121 // else a whole bunch of RPC tests break, because some .then()s get evaluated on promise 122 // construction, before any .wait() call. 123 124 EventLoop loop; 125 WaitScope waitScope(loop); 126 127 size_t count = 0; 128 129 auto promise = kj::Promise<void>(kj::READY_NOW); 130 auto coroPromise = countDtorsAroundAwait(count, kj::READY_NOW); 131 132 // In the previous test, this exact same code executed immediately because the event loop was 133 // running. 134 KJ_EXPECT(count == 0); 135 } 136 137 KJ_TEST("co_awaiting immediate promises suspends if the event loop is not empty") { 138 // We want to make sure that we can still return to the event loop when we need to. 139 140 EventLoop loop; 141 WaitScope waitScope(loop); 142 143 // The immediate-execution optimization is only enabled when the event loop is running, so use an 144 // eagerly-evaluated evalLater() to perform the test from within the event loop. (If we didn't 145 // eagerly-evaluate the promise, the result would be extracted after the loop finished.) 146 kj::evalLater([&]() { 147 size_t count = 0; 148 149 // We need to enqueue an Event on the event loop to inhibit the immediate-execution 150 // optimization. Creating and then immediately fulfilling an EagerPromiseNode is a convenient 151 // way to do so. 152 auto paf = newPromiseAndFulfiller<void>(); 153 paf.promise = paf.promise.eagerlyEvaluate(nullptr); 154 paf.fulfiller->fulfill(); 155 156 auto promise = kj::Promise<void>(kj::READY_NOW); 157 auto coroPromise = countDtorsAroundAwait(count, kj::READY_NOW); 158 159 // We didn't immediately extract the READY_NOW. 160 KJ_EXPECT(count == 0); 161 }).eagerlyEvaluate(nullptr).wait(waitScope); 162 163 kj::evalLater([&]() { 164 size_t count = 0; 165 bool evalLaterRan = false; 166 167 // We need to enqueue an Event on the event loop to inhibit the immediate-execution 168 // optimization. Creating and then immediately fulfilling an EagerPromiseNode is a convenient 169 // way to do so. 170 auto paf = newPromiseAndFulfiller<void>(); 171 paf.promise = paf.promise.eagerlyEvaluate(nullptr); 172 paf.fulfiller->fulfill(); 173 174 auto promise = kj::evalLater([&]() { evalLaterRan = true; }); 175 auto coroPromise = countDtorsAroundAwait(count, kj::mv(promise)); 176 177 // We didn't continue through the evalLater() promise, because the background promise's 178 // continuation was next in the event loop's queue. 179 KJ_EXPECT(evalLaterRan == false); 180 // No Counter destructor has run. 181 KJ_EXPECT(count == 0, count); 182 }).eagerlyEvaluate(nullptr).wait(waitScope); 183 } 184 185 KJ_TEST("Exceptions propagate through layered coroutines") { 186 EventLoop loop; 187 WaitScope waitScope(loop); 188 189 auto throwy = simpleCoroutine(kj::Promise<int>(kj::NEVER_DONE), false); 190 191 KJ_EXPECT_THROW_RECOVERABLE(FAILED, simpleCoroutine(kj::mv(throwy)).wait(waitScope)); 192 } 193 194 KJ_TEST("Exceptions before the first co_await don't escape, but reject the promise") { 195 EventLoop loop; 196 WaitScope waitScope(loop); 197 198 auto throwEarly = []() -> Promise<void> { 199 KJ_FAIL_ASSERT("test exception"); 200 #ifdef __GNUC__ 201 // Yes, this `co_return` is unreachable. But without it, this function is no longer a coroutine. 202 #pragma GCC diagnostic push 203 #pragma GCC diagnostic ignored "-Wunreachable-code" 204 #endif // __GNUC__ 205 co_return; 206 #ifdef __GNUC__ 207 #pragma GCC diagnostic pop 208 #endif // __GNUC__ 209 }; 210 211 auto throwy = throwEarly(); 212 213 KJ_EXPECT_THROW_RECOVERABLE(FAILED, throwy.wait(waitScope)); 214 } 215 216 KJ_TEST("Coroutines can catch exceptions from co_await") { 217 EventLoop loop; 218 WaitScope waitScope(loop); 219 220 kj::String description; 221 222 auto tryCatch = [&](kj::Promise<void> promise) -> kj::Promise<kj::String> { 223 try { 224 co_await promise; 225 } catch (const kj::Exception& exception) { 226 co_return kj::str(exception.getDescription()); 227 } 228 KJ_FAIL_EXPECT("should have thrown"); 229 KJ_UNREACHABLE; 230 }; 231 232 { 233 // Immediately ready case. 234 auto promise = kj::Promise<void>(KJ_EXCEPTION(FAILED, "catch me")); 235 KJ_EXPECT(tryCatch(kj::mv(promise)).wait(waitScope) == "catch me"); 236 } 237 238 { 239 // Ready later case. 240 auto promise = kj::evalLater([]() -> kj::Promise<void> { 241 return KJ_EXCEPTION(FAILED, "catch me"); 242 }); 243 KJ_EXPECT(tryCatch(kj::mv(promise)).wait(waitScope) == "catch me"); 244 } 245 } 246 247 KJ_TEST("Coroutines can be canceled while suspended") { 248 EventLoop loop; 249 WaitScope waitScope(loop); 250 251 size_t count = 0; 252 253 auto coro = [&](kj::Promise<int> promise) -> kj::Promise<void> { 254 Counter counter1(count); 255 co_await kj::evalLater([](){}); 256 Counter counter2(count); 257 co_await promise; 258 }; 259 260 { 261 auto neverDone = kj::Promise<int>(kj::NEVER_DONE); 262 neverDone = neverDone.attach(kj::heap<Counter>(count)); 263 auto promise = coro(kj::mv(neverDone)); 264 KJ_EXPECT(!promise.poll(waitScope)); 265 } 266 267 // Stack variables on both sides of a co_await, plus coroutine arguments are destroyed. 268 KJ_EXPECT(count == 3); 269 } 270 271 kj::Promise<void> deferredThrowCoroutine(kj::Promise<void> awaitMe) { 272 KJ_DEFER(kj::throwFatalException(KJ_EXCEPTION(FAILED, "thrown during unwind"))); 273 co_await awaitMe; 274 co_return; 275 }; 276 277 KJ_TEST("Exceptions during suspended coroutine frame-unwind propagate via destructor") { 278 EventLoop loop; 279 WaitScope waitScope(loop); 280 281 auto exception = KJ_ASSERT_NONNULL(kj::runCatchingExceptions([&]() { 282 deferredThrowCoroutine(kj::NEVER_DONE); 283 })); 284 285 KJ_EXPECT(exception.getDescription() == "thrown during unwind"); 286 }; 287 288 KJ_TEST("Exceptions during suspended coroutine frame-unwind do not cause a memory leak") { 289 EventLoop loop; 290 WaitScope waitScope(loop); 291 292 // We can't easily test for memory leaks without hooking operator new and delete. However, we can 293 // arrange for the test to crash on failure, by having the coroutine suspend at a promise that we 294 // later fulfill, thus arming the Coroutine's Event. If we fail to destroy the coroutine in this 295 // state, EventLoop will throw on destruction because it can still see the Event in its list. 296 297 auto exception = KJ_ASSERT_NONNULL(kj::runCatchingExceptions([&]() { 298 auto paf = kj::newPromiseAndFulfiller<void>(); 299 300 auto coroPromise = deferredThrowCoroutine(kj::mv(paf.promise)); 301 302 // Arm the Coroutine's Event. 303 paf.fulfiller->fulfill(); 304 305 // If destroying `coroPromise` does not run ~Event(), then ~EventLoop() will crash later. 306 })); 307 308 KJ_EXPECT(exception.getDescription() == "thrown during unwind"); 309 }; 310 311 KJ_TEST("Exceptions during completed coroutine frame-unwind propagate via returned Promise") { 312 EventLoop loop; 313 WaitScope waitScope(loop); 314 315 { 316 // First, prove that exceptions don't escape the destructor of a completed coroutine. 317 auto promise = deferredThrowCoroutine(kj::READY_NOW); 318 KJ_EXPECT(promise.poll(waitScope)); 319 } 320 321 { 322 // Next, prove that they show up via the returned Promise. 323 auto promise = deferredThrowCoroutine(kj::READY_NOW); 324 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("thrown during unwind", promise.wait(waitScope)); 325 } 326 } 327 328 KJ_TEST("Coroutine destruction exceptions are ignored if there is another exception in flight") { 329 EventLoop loop; 330 WaitScope waitScope(loop); 331 332 auto exception = KJ_ASSERT_NONNULL(kj::runCatchingExceptions([&]() { 333 auto promise = deferredThrowCoroutine(kj::NEVER_DONE); 334 kj::throwFatalException(KJ_EXCEPTION(FAILED, "thrown before destroying throwy promise")); 335 })); 336 337 KJ_EXPECT(exception.getDescription() == "thrown before destroying throwy promise"); 338 } 339 340 KJ_TEST("co_await only sees coroutine destruction exceptions if promise was not rejected") { 341 EventLoop loop; 342 WaitScope waitScope(loop); 343 344 // throwyDtorPromise is an immediate void promise that will throw when it's destroyed, which 345 // we expect to be able to catch from a coroutine which co_awaits it. 346 auto throwyDtorPromise = kj::Promise<void>(kj::READY_NOW) 347 .attach(kj::defer([]() { 348 kj::throwFatalException(KJ_EXCEPTION(FAILED, "thrown during unwind")); 349 })); 350 351 // rejectedThrowyDtorPromise is a rejected promise. When co_awaited in a coroutine, 352 // Awaiter::await_resume() will throw that exception for us to catch, but before we can catch it, 353 // the temporary promise will be destroyed. The exception it throws during unwind will be ignored, 354 // and the caller of the coroutine will see only the "thrown during execution" exception. 355 auto rejectedThrowyDtorPromise = kj::evalNow([&]() -> kj::Promise<void> { 356 kj::throwFatalException(KJ_EXCEPTION(FAILED, "thrown during execution")); 357 }).attach(kj::defer([]() { 358 kj::throwFatalException(KJ_EXCEPTION(FAILED, "thrown during unwind")); 359 })); 360 361 auto awaitPromise = [](kj::Promise<void> promise) -> kj::Promise<void> { 362 co_await promise; 363 }; 364 365 KJ_EXPECT_THROW_MESSAGE("thrown during unwind", 366 awaitPromise(kj::mv(throwyDtorPromise)).wait(waitScope)); 367 368 KJ_EXPECT_THROW_MESSAGE("thrown during execution", 369 awaitPromise(kj::mv(rejectedThrowyDtorPromise)).wait(waitScope)); 370 } 371 372 uint countLines(StringPtr s) { 373 uint lines = 0; 374 for (char c: s) { 375 lines += c == '\n'; 376 } 377 return lines; 378 } 379 380 #if (!_MSC_VER || defined(__clang__)) && !__aarch64__ 381 // TODO(msvc): This test relies on GetFunctorStartAddress, which is not supported on MSVC currently, 382 // so skip the test. 383 // TODO(someday): Test is flakey on arm64, depending on how it's compiled. I haven't had a chance to 384 // investigate much, but noticed that it failed in a debug build, but passed in a local opt build. 385 KJ_TEST("Can trace through coroutines") { 386 // This verifies that async traces, generated either from promises or from events, can see through 387 // coroutines. 388 // 389 // This test may be a bit brittle because it depends on specific trace counts. 390 391 // Enable stack traces, even in release mode. 392 class EnableFullStackTrace: public ExceptionCallback { 393 public: 394 StackTraceMode stackTraceMode() override { return StackTraceMode::FULL; } 395 }; 396 EnableFullStackTrace exceptionCallback; 397 398 EventLoop loop; 399 WaitScope waitScope(loop); 400 401 auto paf = newPromiseAndFulfiller<void>(); 402 403 // Get an async trace when the promise is fulfilled. We eagerlyEvaluate() to make sure the 404 // continuation executes while the event loop is running. 405 paf.promise = paf.promise.then([]() { 406 auto trace = getAsyncTrace(); 407 // We expect one entry for waitImpl(), one for the coroutine, and one for this continuation. 408 // When building in debug mode with CMake, I observed this count can be 2. The missing frame is 409 // probably this continuation. Let's just expect a range. 410 auto count = countLines(trace); 411 KJ_EXPECT(0 < count && count <= 3); 412 }).eagerlyEvaluate(nullptr); 413 414 auto coroPromise = [&]() -> kj::Promise<void> { 415 co_await paf.promise; 416 }(); 417 418 { 419 auto trace = coroPromise.trace(); 420 // One for the Coroutine PromiseNode, one for paf.promise. 421 KJ_EXPECT(countLines(trace) >= 2); 422 } 423 424 paf.fulfiller->fulfill(); 425 426 coroPromise.wait(waitScope); 427 } 428 #endif // !_MSC_VER || defined(__clang__) 429 430 Promise<void> sendData(Promise<Own<NetworkAddress>> addressPromise) { 431 auto address = co_await addressPromise; 432 auto client = co_await address->connect(); 433 co_await client->write("foo", 3); 434 } 435 436 Promise<String> receiveDataCoroutine(Own<ConnectionReceiver> listener) { 437 auto server = co_await listener->accept(); 438 char buffer[4]; 439 auto n = co_await server->read(buffer, 3, 4); 440 KJ_EXPECT(3u == n); 441 co_return heapString(buffer, n); 442 } 443 444 KJ_TEST("Simple network test with coroutine") { 445 auto io = setupAsyncIo(); 446 auto& network = io.provider->getNetwork(); 447 448 Own<NetworkAddress> serverAddress = network.parseAddress("*", 0).wait(io.waitScope); 449 Own<ConnectionReceiver> listener = serverAddress->listen(); 450 451 sendData(network.parseAddress("localhost", listener->getPort())) 452 .detach([](Exception&& exception) { 453 KJ_FAIL_EXPECT(exception); 454 }); 455 456 String result = receiveDataCoroutine(kj::mv(listener)).wait(io.waitScope); 457 458 KJ_EXPECT("foo" == result); 459 } 460 461 Promise<Own<AsyncIoStream>> httpClientConnect(AsyncIoContext& io) { 462 auto addr = co_await io.provider->getNetwork().parseAddress("capnproto.org", 80); 463 co_return co_await addr->connect(); 464 } 465 466 Promise<void> httpClient(Own<AsyncIoStream> connection) { 467 // Borrowed and rewritten from compat/http-test.c++. 468 469 HttpHeaderTable table; 470 auto client = newHttpClient(table, *connection); 471 472 HttpHeaders headers(table); 473 headers.set(HttpHeaderId::HOST, "capnproto.org"); 474 475 auto response = co_await client->request(HttpMethod::GET, "/", headers).response; 476 KJ_EXPECT(response.statusCode / 100 == 3); 477 auto location = KJ_ASSERT_NONNULL(response.headers->get(HttpHeaderId::LOCATION)); 478 KJ_EXPECT(location == "https://capnproto.org/"); 479 480 auto body = co_await response.body->readAllText(); 481 } 482 483 KJ_TEST("HttpClient to capnproto.org with a coroutine") { 484 auto io = setupAsyncIo(); 485 486 auto promise = httpClientConnect(io).then([](Own<AsyncIoStream> connection) { 487 return httpClient(kj::mv(connection)); 488 }, [](Exception&&) { 489 KJ_LOG(WARNING, "skipping test because couldn't connect to capnproto.org"); 490 }); 491 492 promise.wait(io.waitScope); 493 } 494 495 #endif // KJ_HAS_COROUTINE 496 497 } // namespace 498 } // namespace kj