byte-stream-test.c++ (26412B)
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21 22 #include "byte-stream.h" 23 #include <kj/test.h> 24 #include <capnp/rpc-twoparty.h> 25 #include <stdlib.h> 26 27 namespace capnp { 28 namespace { 29 30 kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) { 31 if (expected.size() == 0) return kj::READY_NOW; 32 33 auto buffer = kj::heapArray<char>(expected.size()); 34 35 auto promise = in.tryRead(buffer.begin(), 1, buffer.size()); 36 return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) { 37 if (amount == 0) { 38 KJ_FAIL_ASSERT("expected data never sent", expected); 39 } 40 41 auto actual = buffer.slice(0, amount); 42 if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) { 43 KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual); 44 } 45 46 return expectRead(in, expected.slice(amount)); 47 })); 48 } 49 50 kj::String makeString(size_t size) { 51 auto bytes = kj::heapArray<char>(size); 52 for (char& c: bytes) { 53 c = 'a' + rand() % 26; 54 } 55 bytes[bytes.size() - 1] = 0; 56 return kj::String(kj::mv(bytes)); 57 }; 58 59 KJ_TEST("KJ -> ByteStream -> KJ without shortening") { 60 kj::EventLoop eventLoop; 61 kj::WaitScope waitScope(eventLoop); 62 63 ByteStreamFactory factory1; 64 ByteStreamFactory factory2; 65 66 auto pipe = kj::newOneWayPipe(); 67 68 auto wrapped = factory1.capnpToKj(factory2.kjToCapnp(kj::mv(pipe.out))); 69 70 { 71 auto promise = wrapped->write("foo", 3); 72 KJ_EXPECT(!promise.poll(waitScope)); 73 expectRead(*pipe.in, "foo").wait(waitScope); 74 promise.wait(waitScope); 75 } 76 77 { 78 // Write more than 1 << 16 bytes at once to exercise write splitting. 79 auto str = makeString(1 << 17); 80 auto promise = wrapped->write(str.begin(), str.size()); 81 KJ_EXPECT(!promise.poll(waitScope)); 82 expectRead(*pipe.in, str).wait(waitScope); 83 promise.wait(waitScope); 84 } 85 86 { 87 // Write more than 1 << 16 bytes via an array to exercise write splitting. 
88 auto str = makeString(1 << 18); 89 auto pieces = kj::heapArrayBuilder<kj::ArrayPtr<const kj::byte>>(4); 90 91 // Two 2^15 pieces will be combined. 92 pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(str.begin()), 1 << 15)); 93 pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(str.begin() + (1 << 15)), 1 << 15)); 94 95 // One 2^16 piece will be written alone. 96 pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>( 97 str.begin() + (1 << 16)), 1 << 16)); 98 99 // One 2^17 piece will be split. 100 pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>( 101 str.begin() + (1 << 17)), str.size() - (1 << 17))); 102 103 auto promise = wrapped->write(pieces); 104 KJ_EXPECT(!promise.poll(waitScope)); 105 expectRead(*pipe.in, str).wait(waitScope); 106 promise.wait(waitScope); 107 } 108 109 wrapped = nullptr; 110 KJ_EXPECT(pipe.in->readAllText().wait(waitScope) == ""); 111 } 112 113 class ExactPointerWriter: public kj::AsyncOutputStream { 114 public: 115 kj::ArrayPtr<const char> receivedBuffer; 116 117 void fulfill() { 118 KJ_ASSERT_NONNULL(fulfiller)->fulfill(); 119 fulfiller = nullptr; 120 receivedBuffer = nullptr; 121 } 122 123 kj::Promise<void> write(const void* buffer, size_t size) override { 124 KJ_ASSERT(fulfiller == nullptr); 125 receivedBuffer = kj::arrayPtr(reinterpret_cast<const char*>(buffer), size); 126 auto paf = kj::newPromiseAndFulfiller<void>(); 127 fulfiller = kj::mv(paf.fulfiller); 128 return kj::mv(paf.promise); 129 } 130 kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override { 131 KJ_UNIMPLEMENTED("not implemented for test"); 132 } 133 kj::Promise<void> whenWriteDisconnected() override { 134 return kj::NEVER_DONE; 135 } 136 137 void expectBuffer(kj::StringPtr expected) { 138 KJ_EXPECT(receivedBuffer == expected.asArray(), receivedBuffer, expected); 139 } 140 141 private: 142 kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller; 143 }; 144 145 KJ_TEST("KJ -> ByteStream -> KJ with shortening") { 146 kj::EventLoop 
eventLoop; 147 kj::WaitScope waitScope(eventLoop); 148 149 ByteStreamFactory factory; 150 151 auto pipe = kj::newOneWayPipe(); 152 153 ExactPointerWriter exactPointerWriter; 154 auto pumpPromise = pipe.in->pumpTo(exactPointerWriter); 155 156 auto wrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(pipe.out))); 157 158 { 159 char buffer[4] = "foo"; 160 auto promise = wrapped->write(buffer, 3); 161 KJ_EXPECT(!promise.poll(waitScope)); 162 163 // This first write won't have been path-shortened because we didn't know about the shorter 164 // path yet when it started. 165 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() != buffer); 166 KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foo"); 167 exactPointerWriter.fulfill(); 168 promise.wait(waitScope); 169 } 170 171 { 172 char buffer[4] = "foo"; 173 auto promise = wrapped->write(buffer, 3); 174 KJ_EXPECT(!promise.poll(waitScope)); 175 176 // The second write was path-shortened so passes through the exact buffer! 177 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer); 178 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3); 179 exactPointerWriter.fulfill(); 180 promise.wait(waitScope); 181 } 182 183 wrapped = nullptr; 184 KJ_EXPECT(pipe.in->readAllText().wait(waitScope) == ""); 185 } 186 187 KJ_TEST("KJ -> ByteStream -> KJ -> ByteStream -> KJ with shortening") { 188 kj::EventLoop eventLoop; 189 kj::WaitScope waitScope(eventLoop); 190 191 ByteStreamFactory factory; 192 193 auto pipe = kj::newOneWayPipe(); 194 195 ExactPointerWriter exactPointerWriter; 196 auto pumpPromise = pipe.in->pumpTo(exactPointerWriter); 197 198 auto wrapped = factory.capnpToKj(factory.kjToCapnp( 199 factory.capnpToKj(factory.kjToCapnp(kj::mv(pipe.out))))); 200 201 { 202 char buffer[4] = "foo"; 203 auto promise = wrapped->write(buffer, 3); 204 KJ_EXPECT(!promise.poll(waitScope)); 205 206 // This first write won't have been path-shortened because we didn't know about the shorter 207 // path yet when it started. 
208 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() != buffer); 209 KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foo"); 210 exactPointerWriter.fulfill(); 211 promise.wait(waitScope); 212 } 213 214 { 215 char buffer[4] = "bar"; 216 auto promise = wrapped->write(buffer, 3); 217 KJ_EXPECT(!promise.poll(waitScope)); 218 219 // The second write was path-shortened so passes through the exact buffer! 220 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer); 221 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3); 222 exactPointerWriter.fulfill(); 223 promise.wait(waitScope); 224 } 225 226 wrapped = nullptr; 227 KJ_EXPECT(pumpPromise.wait(waitScope) == 6); 228 } 229 230 KJ_TEST("KJ -> ByteStream -> KJ pipe -> ByteStream -> KJ with shortening") { 231 kj::EventLoop eventLoop; 232 kj::WaitScope waitScope(eventLoop); 233 234 ByteStreamFactory factory; 235 236 auto backPipe = kj::newOneWayPipe(); 237 auto middlePipe = kj::newOneWayPipe(); 238 239 ExactPointerWriter exactPointerWriter; 240 auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter); 241 242 auto backWrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(backPipe.out))); 243 auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped, 3); 244 245 auto wrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe.out))); 246 247 // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle. 248 auto disconnectPromise = wrapped->whenWriteDisconnected(); 249 KJ_EXPECT(!disconnectPromise.poll(waitScope)); 250 251 char buffer[7] = "foobar"; 252 auto writePromise = wrapped->write(buffer, 6); 253 KJ_EXPECT(!writePromise.poll(waitScope)); 254 255 // The first three bytes will tunnel all the way down to the destination. 
256 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer); 257 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3); 258 exactPointerWriter.fulfill(); 259 260 KJ_EXPECT(midPumpPormise.wait(waitScope) == 3); 261 262 ExactPointerWriter exactPointerWriter2; 263 midPumpPormise = middlePipe.in->pumpTo(exactPointerWriter2, 6); 264 KJ_EXPECT(!writePromise.poll(waitScope)); 265 266 // The second half of the "foobar" write will have taken a slow path, because the write was 267 // restarted in the middle of the stream re-resolving itself. 268 KJ_EXPECT(kj::str(exactPointerWriter2.receivedBuffer) == "bar"); 269 exactPointerWriter2.fulfill(); 270 271 // Now that write is done. 272 writePromise.wait(waitScope); 273 KJ_EXPECT(!midPumpPormise.poll(waitScope)); 274 275 // If we write again, it'll hit the fast path. 276 char buffer2[4] = "baz"; 277 writePromise = wrapped->write(buffer2, 3); 278 KJ_EXPECT(!writePromise.poll(waitScope)); 279 KJ_EXPECT(exactPointerWriter2.receivedBuffer.begin() == buffer2); 280 KJ_EXPECT(exactPointerWriter2.receivedBuffer.size() == 3); 281 exactPointerWriter2.fulfill(); 282 283 KJ_EXPECT(midPumpPormise.wait(waitScope) == 6); 284 writePromise.wait(waitScope); 285 } 286 287 KJ_TEST("KJ -> ByteStream RPC -> KJ pipe -> ByteStream RPC -> KJ with shortening") { 288 // For this test, we're going to verify that if we have ByteStreams over RPC in both directions 289 // and we pump a ByteStream to another ByteStream at one end of the connection, it gets shortened 290 // all the way to the other end! 
291 292 kj::EventLoop eventLoop; 293 kj::WaitScope waitScope(eventLoop); 294 295 ByteStreamFactory clientFactory; 296 ByteStreamFactory serverFactory; 297 298 auto backPipe = kj::newOneWayPipe(); 299 auto middlePipe = kj::newOneWayPipe(); 300 301 ExactPointerWriter exactPointerWriter; 302 auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter); 303 304 auto rpcConnection = kj::newTwoWayPipe(); 305 capnp::TwoPartyClient client(*rpcConnection.ends[0], 306 clientFactory.kjToCapnp(kj::mv(backPipe.out)), 307 rpc::twoparty::Side::CLIENT); 308 capnp::TwoPartyClient server(*rpcConnection.ends[1], 309 serverFactory.kjToCapnp(kj::mv(middlePipe.out)), 310 rpc::twoparty::Side::CLIENT); 311 312 auto backWrapped = serverFactory.capnpToKj(server.bootstrap().castAs<ByteStream>()); 313 auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped, 3); 314 315 auto wrapped = clientFactory.capnpToKj(client.bootstrap().castAs<ByteStream>()); 316 317 // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle. 318 auto disconnectPromise = wrapped->whenWriteDisconnected(); 319 KJ_EXPECT(!disconnectPromise.poll(waitScope)); 320 321 char buffer[7] = "foobar"; 322 auto writePromise = wrapped->write(buffer, 6); 323 324 // The server side did a 3-byte pump. Path-shortening magic kicks in, and the first three bytes 325 // of the write on the client side go *directly* to the endpoint without a copy! 326 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer); 327 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3); 328 exactPointerWriter.fulfill(); 329 330 KJ_EXPECT(midPumpPormise.wait(waitScope) == 3); 331 332 ExactPointerWriter exactPointerWriter2; 333 midPumpPormise = middlePipe.in->pumpTo(exactPointerWriter2, 6); 334 midPumpPormise.poll(waitScope); 335 336 // The second half of the "foobar" write will have taken a slow path, because the write was 337 // restarted in the middle of the stream re-resolving itself. 
338 KJ_EXPECT(kj::str(exactPointerWriter2.receivedBuffer) == "bar"); 339 exactPointerWriter2.fulfill(); 340 341 // Now that write is done. 342 writePromise.wait(waitScope); 343 KJ_EXPECT(!midPumpPormise.poll(waitScope)); 344 345 // If we write again, it'll finish the server-side pump (but won't be a zero-copy write since 346 // it has to go over RPC). 347 char buffer2[4] = "baz"; 348 writePromise = wrapped->write(buffer2, 3); 349 KJ_EXPECT(!midPumpPormise.poll(waitScope)); 350 KJ_EXPECT(kj::str(exactPointerWriter2.receivedBuffer) == "baz"); 351 exactPointerWriter2.fulfill(); 352 353 KJ_EXPECT(midPumpPormise.wait(waitScope) == 6); 354 writePromise.wait(waitScope); 355 } 356 357 KJ_TEST("KJ -> ByteStream RPC -> KJ pipe -> ByteStream RPC -> KJ with concurrent shortening") { 358 // This is similar to the previous test, but we start writing before the path-shortening has 359 // settled. This should result in some writes optimistically bouncing back and forth before 360 // the stream settles in. 
361 362 kj::EventLoop eventLoop; 363 kj::WaitScope waitScope(eventLoop); 364 365 ByteStreamFactory clientFactory; 366 ByteStreamFactory serverFactory; 367 368 auto backPipe = kj::newOneWayPipe(); 369 auto middlePipe = kj::newOneWayPipe(); 370 371 ExactPointerWriter exactPointerWriter; 372 auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter); 373 374 auto rpcConnection = kj::newTwoWayPipe(); 375 capnp::TwoPartyClient client(*rpcConnection.ends[0], 376 clientFactory.kjToCapnp(kj::mv(backPipe.out)), 377 rpc::twoparty::Side::CLIENT); 378 capnp::TwoPartyClient server(*rpcConnection.ends[1], 379 serverFactory.kjToCapnp(kj::mv(middlePipe.out)), 380 rpc::twoparty::Side::CLIENT); 381 382 auto backWrapped = serverFactory.capnpToKj(server.bootstrap().castAs<ByteStream>()); 383 auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped); 384 385 auto wrapped = clientFactory.capnpToKj(client.bootstrap().castAs<ByteStream>()); 386 387 char buffer[7] = "foobar"; 388 auto writePromise = wrapped->write(buffer, 6); 389 390 // The write went to RPC so it's not immediately received. 391 KJ_EXPECT(exactPointerWriter.receivedBuffer == nullptr); 392 393 // Write should be received after we turn the event loop. 394 waitScope.poll(); 395 KJ_EXPECT(exactPointerWriter.receivedBuffer != nullptr); 396 397 // Note that the promise that write() returned above has already resolved, because it hit RPC 398 // and went into the streaming window. 399 KJ_ASSERT(writePromise.poll(waitScope)); 400 writePromise.wait(waitScope); 401 402 // Let's start a second write. Even though the first write technically isn't done yet, it's 403 // legal for us to start a second one because the first write's returned promise optimistically 404 // resolved for streaming window reasons. This ends up being a very tricky case for our code! 405 char buffer2[7] = "bazqux"; 406 auto writePromise2 = wrapped->write(buffer2, 6); 407 408 // Now check the first write was correct, and close it out. 
409 KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foobar"); 410 exactPointerWriter.fulfill(); 411 412 // Turn event loop again. Now the second write arrives. 413 waitScope.poll(); 414 KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "bazqux"); 415 exactPointerWriter.fulfill(); 416 writePromise2.wait(waitScope); 417 418 // If we do another write now, it should be zero-copy, because everything has settled. 419 char buffer3[6] = "corge"; 420 auto writePromise3 = wrapped->write(buffer3, 5); 421 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer3); 422 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 5); 423 KJ_EXPECT(!writePromise3.poll(waitScope)); 424 exactPointerWriter.fulfill(); 425 writePromise3.wait(waitScope); 426 } 427 428 KJ_TEST("KJ -> KJ pipe -> ByteStream RPC -> KJ pipe -> ByteStream RPC -> KJ with concurrent shortening") { 429 // Same as previous test, except we add a KJ pipe at the beginning and pump it into the top of 430 // the pipe, which invokes tryPumpFrom() on the KjToCapnpStreamAdapter. 
431 432 kj::EventLoop eventLoop; 433 kj::WaitScope waitScope(eventLoop); 434 435 ByteStreamFactory clientFactory; 436 ByteStreamFactory serverFactory; 437 438 auto backPipe = kj::newOneWayPipe(); 439 auto middlePipe = kj::newOneWayPipe(); 440 auto frontPipe = kj::newOneWayPipe(); 441 442 ExactPointerWriter exactPointerWriter; 443 auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter); 444 445 auto rpcConnection = kj::newTwoWayPipe(); 446 capnp::TwoPartyClient client(*rpcConnection.ends[0], 447 clientFactory.kjToCapnp(kj::mv(backPipe.out)), 448 rpc::twoparty::Side::CLIENT); 449 capnp::TwoPartyClient server(*rpcConnection.ends[1], 450 serverFactory.kjToCapnp(kj::mv(middlePipe.out)), 451 rpc::twoparty::Side::CLIENT); 452 453 auto backWrapped = serverFactory.capnpToKj(server.bootstrap().castAs<ByteStream>()); 454 auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped); 455 456 auto wrapped = clientFactory.capnpToKj(client.bootstrap().castAs<ByteStream>()); 457 auto frontPumpPromise = frontPipe.in->pumpTo(*wrapped); 458 459 char buffer[7] = "foobar"; 460 auto writePromise = frontPipe.out->write(buffer, 6); 461 462 // The write went to RPC so it's not immediately received. 463 KJ_EXPECT(exactPointerWriter.receivedBuffer == nullptr); 464 465 // Write should be received after we turn the event loop. 466 waitScope.poll(); 467 KJ_EXPECT(exactPointerWriter.receivedBuffer != nullptr); 468 469 // Note that the promise that write() returned above has already resolved, because it hit RPC 470 // and went into the streaming window. 471 KJ_ASSERT(writePromise.poll(waitScope)); 472 writePromise.wait(waitScope); 473 474 // Let's start a second write. Even though the first write technically isn't done yet, it's 475 // legal for us to start a second one because the first write's returned promise optimistically 476 // resolved for streaming window reasons. This ends up being a very tricky case for our code! 
477 char buffer2[7] = "bazqux"; 478 auto writePromise2 = frontPipe.out->write(buffer2, 6); 479 480 // Now check the first write was correct, and close it out. 481 KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foobar"); 482 exactPointerWriter.fulfill(); 483 484 // Turn event loop again. Now the second write arrives. 485 waitScope.poll(); 486 KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "bazqux"); 487 exactPointerWriter.fulfill(); 488 writePromise2.wait(waitScope); 489 490 // If we do another write now, it should be zero-copy, because everything has settled. 491 char buffer3[6] = "corge"; 492 auto writePromise3 = frontPipe.out->write(buffer3, 5); 493 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer3); 494 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 5); 495 KJ_EXPECT(!writePromise3.poll(waitScope)); 496 exactPointerWriter.fulfill(); 497 writePromise3.wait(waitScope); 498 } 499 500 KJ_TEST("Two Substreams on one destination") { 501 kj::EventLoop eventLoop; 502 kj::WaitScope waitScope(eventLoop); 503 504 ByteStreamFactory factory; 505 506 auto backPipe = kj::newOneWayPipe(); 507 auto middlePipe1 = kj::newOneWayPipe(); 508 auto middlePipe2 = kj::newOneWayPipe(); 509 510 ExactPointerWriter exactPointerWriter; 511 auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter); 512 513 auto backWrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(backPipe.out))); 514 515 auto wrapped1 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe1.out))); 516 auto wrapped2 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe2.out))); 517 518 // Declare these buffers out here so that they can't possibly end up with the same address. 519 char buffer1[4] = "foo"; 520 char buffer2[4] = "bar"; 521 522 { 523 auto wrapped = kj::mv(wrapped1); 524 525 // First pump 3 bytes from the first stream. 
526 auto midPumpPormise = middlePipe1.in->pumpTo(*backWrapped, 3); 527 528 // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle. 529 auto disconnectPromise = wrapped->whenWriteDisconnected(); 530 KJ_EXPECT(!disconnectPromise.poll(waitScope)); 531 532 auto writePromise = wrapped->write(buffer1, 3); 533 KJ_EXPECT(!writePromise.poll(waitScope)); 534 535 // The first write will tunnel all the way down to the destination. 536 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer1); 537 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3); 538 exactPointerWriter.fulfill(); 539 540 writePromise.wait(waitScope); 541 KJ_EXPECT(midPumpPormise.wait(waitScope) == 3); 542 } 543 544 { 545 auto wrapped = kj::mv(wrapped2); 546 547 // Now pump another 3 bytes from the second stream. 548 auto midPumpPormise = middlePipe2.in->pumpTo(*backWrapped, 3); 549 550 // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle. 551 auto disconnectPromise = wrapped->whenWriteDisconnected(); 552 KJ_EXPECT(!disconnectPromise.poll(waitScope)); 553 554 auto writePromise = wrapped->write(buffer2, 3); 555 KJ_EXPECT(!writePromise.poll(waitScope)); 556 557 // The second write will also tunnel all the way down to the destination. 
558 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer2); 559 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3); 560 exactPointerWriter.fulfill(); 561 562 writePromise.wait(waitScope); 563 KJ_EXPECT(midPumpPormise.wait(waitScope) == 3); 564 } 565 } 566 567 KJ_TEST("Two Substreams on one destination no limits (pump to EOF)") { 568 kj::EventLoop eventLoop; 569 kj::WaitScope waitScope(eventLoop); 570 571 ByteStreamFactory factory; 572 573 auto backPipe = kj::newOneWayPipe(); 574 auto middlePipe1 = kj::newOneWayPipe(); 575 auto middlePipe2 = kj::newOneWayPipe(); 576 577 ExactPointerWriter exactPointerWriter; 578 auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter); 579 580 auto backWrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(backPipe.out))); 581 582 auto wrapped1 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe1.out))); 583 auto wrapped2 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe2.out))); 584 585 // Declare these buffers out here so that they can't possibly end up with the same address. 586 char buffer1[4] = "foo"; 587 char buffer2[4] = "bar"; 588 589 { 590 auto wrapped = kj::mv(wrapped1); 591 592 // First pump from the first stream until EOF. 593 auto midPumpPormise = middlePipe1.in->pumpTo(*backWrapped); 594 595 // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle. 596 auto disconnectPromise = wrapped->whenWriteDisconnected(); 597 KJ_EXPECT(!disconnectPromise.poll(waitScope)); 598 599 auto writePromise = wrapped->write(buffer1, 3); 600 KJ_EXPECT(!writePromise.poll(waitScope)); 601 602 // The first write will tunnel all the way down to the destination. 
603 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer1); 604 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3); 605 exactPointerWriter.fulfill(); 606 607 writePromise.wait(waitScope); 608 { auto drop = kj::mv(wrapped); } 609 KJ_EXPECT(midPumpPormise.wait(waitScope) == 3); 610 } 611 612 { 613 auto wrapped = kj::mv(wrapped2); 614 615 // Now pump from the second stream until EOF. 616 auto midPumpPormise = middlePipe2.in->pumpTo(*backWrapped); 617 618 // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle. 619 auto disconnectPromise = wrapped->whenWriteDisconnected(); 620 KJ_EXPECT(!disconnectPromise.poll(waitScope)); 621 622 auto writePromise = wrapped->write(buffer2, 3); 623 KJ_EXPECT(!writePromise.poll(waitScope)); 624 625 // The second write will also tunnel all the way down to the destination. 626 KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer2); 627 KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3); 628 exactPointerWriter.fulfill(); 629 630 writePromise.wait(waitScope); 631 { auto drop = kj::mv(wrapped); } 632 KJ_EXPECT(midPumpPormise.wait(waitScope) == 3); 633 } 634 } 635 636 KJ_TEST("KJ -> ByteStream RPC -> KJ promise stream -> ByteStream -> KJ") { 637 // Test what happens if we queue up several requests on a ByteStream and then it resolves to 638 // a shorter path. 
639 640 kj::EventLoop eventLoop; 641 kj::WaitScope waitScope(eventLoop); 642 643 ByteStreamFactory factory; 644 ExactPointerWriter exactPointerWriter; 645 646 auto paf = kj::newPromiseAndFulfiller<kj::Own<kj::AsyncOutputStream>>(); 647 auto backCap = factory.kjToCapnp(kj::newPromisedStream(kj::mv(paf.promise))); 648 649 auto rpcPipe = kj::newTwoWayPipe(); 650 capnp::TwoPartyClient client(*rpcPipe.ends[0]); 651 capnp::TwoPartyClient server(*rpcPipe.ends[1], kj::mv(backCap), rpc::twoparty::Side::SERVER); 652 auto front = factory.capnpToKj(client.bootstrap().castAs<ByteStream>()); 653 654 // These will all queue up in the RPC layer. 655 front->write("foo", 3).wait(waitScope); 656 front->write("bar", 3).wait(waitScope); 657 front->write("baz", 3).wait(waitScope); 658 front->write("qux", 3).wait(waitScope); 659 660 // Make sure those writes manage to get all the way through the RPC system and queue up in the 661 // LocalClient wrapping the CapnpToKjStreamAdapter at the other end. 662 waitScope.poll(); 663 664 // Fulfill the promise. 665 paf.fulfiller->fulfill(factory.capnpToKj(factory.kjToCapnp(kj::attachRef(exactPointerWriter)))); 666 waitScope.poll(); 667 668 // Now: 669 // - "foo" should have made it all the way down to the final output stream. 670 // - "bar", "baz", and "qux" are queued on the CapnpToKjStreamAdapter immediately wrapping the 671 // KJ promise stream. 672 // - But that stream adapter has discovered that there's another capnp stream downstream and has 673 // resolved itself to the later stream. 674 // - A new call at this time should NOT be allowed to hop the queue. 
675 676 exactPointerWriter.expectBuffer("foo"); 677 678 front->write("corge", 5).wait(waitScope); 679 waitScope.poll(); 680 681 exactPointerWriter.fulfill(); 682 683 waitScope.poll(); 684 exactPointerWriter.expectBuffer("bar"); 685 exactPointerWriter.fulfill(); 686 687 waitScope.poll(); 688 exactPointerWriter.expectBuffer("baz"); 689 exactPointerWriter.fulfill(); 690 691 waitScope.poll(); 692 exactPointerWriter.expectBuffer("qux"); 693 exactPointerWriter.fulfill(); 694 695 waitScope.poll(); 696 exactPointerWriter.expectBuffer("corge"); 697 exactPointerWriter.fulfill(); 698 699 // There may still be some detach()ed promises holding on to some capabilities that transitively 700 // hold a fake Own<AsyncOutputStream> pointing at exactPointerWriter, which is actually on the 701 // stack. We created a fake Own pointing to a stack variable by using 702 // kj::attachRef(exactPointerWriter), above; it does not actually own the object it points to. 703 // We need to make sure those Owns are dropped before exactPoniterWriter is destroyed, otherwise 704 // ASAN will flag some invalid reads (of exactPointerWriter's vtable, in particular). 705 waitScope.cancelAllDetached(); 706 } 707 708 // TODO: 709 // - Parallel writes (requires streaming) 710 // - Write to KJ -> capnp -> RPC -> capnp -> KJ loopback without shortening, verify we can write 711 // several things to buffer (requires streaming). 712 // - Again, but with shortening which only occurs after some promise resolve. 713 714 } // namespace 715 } // namespace capnp