capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

byte-stream-test.c++ (26412B)


      1 // Copyright (c) 2019 Cloudflare, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #include "byte-stream.h"
     23 #include <kj/test.h>
     24 #include <capnp/rpc-twoparty.h>
     25 #include <stdlib.h>
     26 
     27 namespace capnp {
     28 namespace {
     29 
     30 kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
     31   if (expected.size() == 0) return kj::READY_NOW;
     32 
     33   auto buffer = kj::heapArray<char>(expected.size());
     34 
     35   auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
     36   return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) {
     37     if (amount == 0) {
     38       KJ_FAIL_ASSERT("expected data never sent", expected);
     39     }
     40 
     41     auto actual = buffer.slice(0, amount);
     42     if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
     43       KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
     44     }
     45 
     46     return expectRead(in, expected.slice(amount));
     47   }));
     48 }
     49 
     50 kj::String makeString(size_t size) {
     51   auto bytes = kj::heapArray<char>(size);
     52   for (char& c: bytes) {
     53     c = 'a' + rand() % 26;
     54   }
     55   bytes[bytes.size() - 1] = 0;
     56   return kj::String(kj::mv(bytes));
     57 };
     58 
     59 KJ_TEST("KJ -> ByteStream -> KJ without shortening") {
     60   kj::EventLoop eventLoop;
     61   kj::WaitScope waitScope(eventLoop);
     62 
     63   ByteStreamFactory factory1;
     64   ByteStreamFactory factory2;
     65 
     66   auto pipe = kj::newOneWayPipe();
     67 
     68   auto wrapped = factory1.capnpToKj(factory2.kjToCapnp(kj::mv(pipe.out)));
     69 
     70   {
     71     auto promise = wrapped->write("foo", 3);
     72     KJ_EXPECT(!promise.poll(waitScope));
     73     expectRead(*pipe.in, "foo").wait(waitScope);
     74     promise.wait(waitScope);
     75   }
     76 
     77   {
     78     // Write more than 1 << 16 bytes at once to exercise write splitting.
     79     auto str = makeString(1 << 17);
     80     auto promise = wrapped->write(str.begin(), str.size());
     81     KJ_EXPECT(!promise.poll(waitScope));
     82     expectRead(*pipe.in, str).wait(waitScope);
     83     promise.wait(waitScope);
     84   }
     85 
     86   {
     87     // Write more than 1 << 16 bytes via an array to exercise write splitting.
     88     auto str = makeString(1 << 18);
     89     auto pieces = kj::heapArrayBuilder<kj::ArrayPtr<const kj::byte>>(4);
     90 
     91     // Two 2^15 pieces will be combined.
     92     pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(str.begin()), 1 << 15));
     93     pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(str.begin() + (1 << 15)), 1 << 15));
     94 
     95     // One 2^16 piece will be written alone.
     96     pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(
     97         str.begin() + (1 << 16)), 1 << 16));
     98 
     99     // One 2^17 piece will be split.
    100     pieces.add(kj::arrayPtr(reinterpret_cast<kj::byte*>(
    101         str.begin() + (1 << 17)), str.size() - (1 << 17)));
    102 
    103     auto promise = wrapped->write(pieces);
    104     KJ_EXPECT(!promise.poll(waitScope));
    105     expectRead(*pipe.in, str).wait(waitScope);
    106     promise.wait(waitScope);
    107   }
    108 
    109   wrapped = nullptr;
    110   KJ_EXPECT(pipe.in->readAllText().wait(waitScope) == "");
    111 }
    112 
    113 class ExactPointerWriter: public kj::AsyncOutputStream {
    114 public:
    115   kj::ArrayPtr<const char> receivedBuffer;
    116 
    117   void fulfill() {
    118     KJ_ASSERT_NONNULL(fulfiller)->fulfill();
    119     fulfiller = nullptr;
    120     receivedBuffer = nullptr;
    121   }
    122 
    123   kj::Promise<void> write(const void* buffer, size_t size) override {
    124     KJ_ASSERT(fulfiller == nullptr);
    125     receivedBuffer = kj::arrayPtr(reinterpret_cast<const char*>(buffer), size);
    126     auto paf = kj::newPromiseAndFulfiller<void>();
    127     fulfiller = kj::mv(paf.fulfiller);
    128     return kj::mv(paf.promise);
    129   }
    130   kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
    131     KJ_UNIMPLEMENTED("not implemented for test");
    132   }
    133   kj::Promise<void> whenWriteDisconnected() override {
    134     return kj::NEVER_DONE;
    135   }
    136 
    137   void expectBuffer(kj::StringPtr expected) {
    138     KJ_EXPECT(receivedBuffer == expected.asArray(), receivedBuffer, expected);
    139   }
    140 
    141 private:
    142   kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller;
    143 };
    144 
    145 KJ_TEST("KJ -> ByteStream -> KJ with shortening") {
    146   kj::EventLoop eventLoop;
    147   kj::WaitScope waitScope(eventLoop);
    148 
    149   ByteStreamFactory factory;
    150 
    151   auto pipe = kj::newOneWayPipe();
    152 
    153   ExactPointerWriter exactPointerWriter;
    154   auto pumpPromise = pipe.in->pumpTo(exactPointerWriter);
    155 
    156   auto wrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(pipe.out)));
    157 
    158   {
    159     char buffer[4] = "foo";
    160     auto promise = wrapped->write(buffer, 3);
    161     KJ_EXPECT(!promise.poll(waitScope));
    162 
    163     // This first write won't have been path-shortened because we didn't know about the shorter
    164     // path yet when it started.
    165     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() != buffer);
    166     KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foo");
    167     exactPointerWriter.fulfill();
    168     promise.wait(waitScope);
    169   }
    170 
    171   {
    172     char buffer[4] = "foo";
    173     auto promise = wrapped->write(buffer, 3);
    174     KJ_EXPECT(!promise.poll(waitScope));
    175 
    176     // The second write was path-shortened so passes through the exact buffer!
    177     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer);
    178     KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
    179     exactPointerWriter.fulfill();
    180     promise.wait(waitScope);
    181   }
    182 
    183   wrapped = nullptr;
    184   KJ_EXPECT(pipe.in->readAllText().wait(waitScope) == "");
    185 }
    186 
    187 KJ_TEST("KJ -> ByteStream -> KJ -> ByteStream -> KJ with shortening") {
    188   kj::EventLoop eventLoop;
    189   kj::WaitScope waitScope(eventLoop);
    190 
    191   ByteStreamFactory factory;
    192 
    193   auto pipe = kj::newOneWayPipe();
    194 
    195   ExactPointerWriter exactPointerWriter;
    196   auto pumpPromise = pipe.in->pumpTo(exactPointerWriter);
    197 
    198   auto wrapped = factory.capnpToKj(factory.kjToCapnp(
    199                  factory.capnpToKj(factory.kjToCapnp(kj::mv(pipe.out)))));
    200 
    201   {
    202     char buffer[4] = "foo";
    203     auto promise = wrapped->write(buffer, 3);
    204     KJ_EXPECT(!promise.poll(waitScope));
    205 
    206     // This first write won't have been path-shortened because we didn't know about the shorter
    207     // path yet when it started.
    208     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() != buffer);
    209     KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foo");
    210     exactPointerWriter.fulfill();
    211     promise.wait(waitScope);
    212   }
    213 
    214   {
    215     char buffer[4] = "bar";
    216     auto promise = wrapped->write(buffer, 3);
    217     KJ_EXPECT(!promise.poll(waitScope));
    218 
    219     // The second write was path-shortened so passes through the exact buffer!
    220     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer);
    221     KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
    222     exactPointerWriter.fulfill();
    223     promise.wait(waitScope);
    224   }
    225 
    226   wrapped = nullptr;
    227   KJ_EXPECT(pumpPromise.wait(waitScope) == 6);
    228 }
    229 
    230 KJ_TEST("KJ -> ByteStream -> KJ pipe -> ByteStream -> KJ with shortening") {
    231   kj::EventLoop eventLoop;
    232   kj::WaitScope waitScope(eventLoop);
    233 
    234   ByteStreamFactory factory;
    235 
    236   auto backPipe = kj::newOneWayPipe();
    237   auto middlePipe = kj::newOneWayPipe();
    238 
    239   ExactPointerWriter exactPointerWriter;
    240   auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
    241 
    242   auto backWrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(backPipe.out)));
    243   auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped, 3);
    244 
    245   auto wrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe.out)));
    246 
    247   // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
    248   auto disconnectPromise = wrapped->whenWriteDisconnected();
    249   KJ_EXPECT(!disconnectPromise.poll(waitScope));
    250 
    251   char buffer[7] = "foobar";
    252   auto writePromise = wrapped->write(buffer, 6);
    253   KJ_EXPECT(!writePromise.poll(waitScope));
    254 
    255   // The first three bytes will tunnel all the way down to the destination.
    256   KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer);
    257   KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
    258   exactPointerWriter.fulfill();
    259 
    260   KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
    261 
    262   ExactPointerWriter exactPointerWriter2;
    263   midPumpPormise = middlePipe.in->pumpTo(exactPointerWriter2, 6);
    264   KJ_EXPECT(!writePromise.poll(waitScope));
    265 
    266   // The second half of the "foobar" write will have taken a slow path, because the write was
    267   // restarted in the middle of the stream re-resolving itself.
    268   KJ_EXPECT(kj::str(exactPointerWriter2.receivedBuffer) == "bar");
    269   exactPointerWriter2.fulfill();
    270 
    271   // Now that write is done.
    272   writePromise.wait(waitScope);
    273   KJ_EXPECT(!midPumpPormise.poll(waitScope));
    274 
    275   // If we write again, it'll hit the fast path.
    276   char buffer2[4] = "baz";
    277   writePromise = wrapped->write(buffer2, 3);
    278   KJ_EXPECT(!writePromise.poll(waitScope));
    279   KJ_EXPECT(exactPointerWriter2.receivedBuffer.begin() == buffer2);
    280   KJ_EXPECT(exactPointerWriter2.receivedBuffer.size() == 3);
    281   exactPointerWriter2.fulfill();
    282 
    283   KJ_EXPECT(midPumpPormise.wait(waitScope) == 6);
    284   writePromise.wait(waitScope);
    285 }
    286 
    287 KJ_TEST("KJ -> ByteStream RPC -> KJ pipe -> ByteStream RPC -> KJ with shortening") {
    288   // For this test, we're going to verify that if we have ByteStreams over RPC in both directions
    289   // and we pump a ByteStream to another ByteStream at one end of the connection, it gets shortened
    290   // all the way to the other end!
    291 
    292   kj::EventLoop eventLoop;
    293   kj::WaitScope waitScope(eventLoop);
    294 
    295   ByteStreamFactory clientFactory;
    296   ByteStreamFactory serverFactory;
    297 
    298   auto backPipe = kj::newOneWayPipe();
    299   auto middlePipe = kj::newOneWayPipe();
    300 
    301   ExactPointerWriter exactPointerWriter;
    302   auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
    303 
    304   auto rpcConnection = kj::newTwoWayPipe();
    305   capnp::TwoPartyClient client(*rpcConnection.ends[0],
    306       clientFactory.kjToCapnp(kj::mv(backPipe.out)),
    307       rpc::twoparty::Side::CLIENT);
    308   capnp::TwoPartyClient server(*rpcConnection.ends[1],
    309       serverFactory.kjToCapnp(kj::mv(middlePipe.out)),
    310       rpc::twoparty::Side::CLIENT);
    311 
    312   auto backWrapped = serverFactory.capnpToKj(server.bootstrap().castAs<ByteStream>());
    313   auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped, 3);
    314 
    315   auto wrapped = clientFactory.capnpToKj(client.bootstrap().castAs<ByteStream>());
    316 
    317   // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
    318   auto disconnectPromise = wrapped->whenWriteDisconnected();
    319   KJ_EXPECT(!disconnectPromise.poll(waitScope));
    320 
    321   char buffer[7] = "foobar";
    322   auto writePromise = wrapped->write(buffer, 6);
    323 
    324   // The server side did a 3-byte pump. Path-shortening magic kicks in, and the first three bytes
    325   // of the write on the client side go *directly* to the endpoint without a copy!
    326   KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer);
    327   KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
    328   exactPointerWriter.fulfill();
    329 
    330   KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
    331 
    332   ExactPointerWriter exactPointerWriter2;
    333   midPumpPormise = middlePipe.in->pumpTo(exactPointerWriter2, 6);
    334   midPumpPormise.poll(waitScope);
    335 
    336   // The second half of the "foobar" write will have taken a slow path, because the write was
    337   // restarted in the middle of the stream re-resolving itself.
    338   KJ_EXPECT(kj::str(exactPointerWriter2.receivedBuffer) == "bar");
    339   exactPointerWriter2.fulfill();
    340 
    341   // Now that write is done.
    342   writePromise.wait(waitScope);
    343   KJ_EXPECT(!midPumpPormise.poll(waitScope));
    344 
    345   // If we write again, it'll finish the server-side pump (but won't be a zero-copy write since
    346   // it has to go over RPC).
    347   char buffer2[4] = "baz";
    348   writePromise = wrapped->write(buffer2, 3);
    349   KJ_EXPECT(!midPumpPormise.poll(waitScope));
    350   KJ_EXPECT(kj::str(exactPointerWriter2.receivedBuffer) == "baz");
    351   exactPointerWriter2.fulfill();
    352 
    353   KJ_EXPECT(midPumpPormise.wait(waitScope) == 6);
    354   writePromise.wait(waitScope);
    355 }
    356 
    357 KJ_TEST("KJ -> ByteStream RPC -> KJ pipe -> ByteStream RPC -> KJ with concurrent shortening") {
    358   // This is similar to the previous test, but we start writing before the path-shortening has
    359   // settled. This should result in some writes optimistically bouncing back and forth before
    360   // the stream settles in.
    361 
    362   kj::EventLoop eventLoop;
    363   kj::WaitScope waitScope(eventLoop);
    364 
    365   ByteStreamFactory clientFactory;
    366   ByteStreamFactory serverFactory;
    367 
    368   auto backPipe = kj::newOneWayPipe();
    369   auto middlePipe = kj::newOneWayPipe();
    370 
    371   ExactPointerWriter exactPointerWriter;
    372   auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
    373 
    374   auto rpcConnection = kj::newTwoWayPipe();
    375   capnp::TwoPartyClient client(*rpcConnection.ends[0],
    376       clientFactory.kjToCapnp(kj::mv(backPipe.out)),
    377       rpc::twoparty::Side::CLIENT);
    378   capnp::TwoPartyClient server(*rpcConnection.ends[1],
    379       serverFactory.kjToCapnp(kj::mv(middlePipe.out)),
    380       rpc::twoparty::Side::CLIENT);
    381 
    382   auto backWrapped = serverFactory.capnpToKj(server.bootstrap().castAs<ByteStream>());
    383   auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped);
    384 
    385   auto wrapped = clientFactory.capnpToKj(client.bootstrap().castAs<ByteStream>());
    386 
    387   char buffer[7] = "foobar";
    388   auto writePromise = wrapped->write(buffer, 6);
    389 
    390   // The write went to RPC so it's not immediately received.
    391   KJ_EXPECT(exactPointerWriter.receivedBuffer == nullptr);
    392 
    393   // Write should be received after we turn the event loop.
    394   waitScope.poll();
    395   KJ_EXPECT(exactPointerWriter.receivedBuffer != nullptr);
    396 
    397   // Note that the promise that write() returned above has already resolved, because it hit RPC
    398   // and went into the streaming window.
    399   KJ_ASSERT(writePromise.poll(waitScope));
    400   writePromise.wait(waitScope);
    401 
    402   // Let's start a second write. Even though the first write technically isn't done yet, it's
    403   // legal for us to start a second one because the first write's returned promise optimistically
    404   // resolved for streaming window reasons. This ends up being a very tricky case for our code!
    405   char buffer2[7] = "bazqux";
    406   auto writePromise2 = wrapped->write(buffer2, 6);
    407 
    408   // Now check the first write was correct, and close it out.
    409   KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foobar");
    410   exactPointerWriter.fulfill();
    411 
    412   // Turn event loop again. Now the second write arrives.
    413   waitScope.poll();
    414   KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "bazqux");
    415   exactPointerWriter.fulfill();
    416   writePromise2.wait(waitScope);
    417 
    418   // If we do another write now, it should be zero-copy, because everything has settled.
    419   char buffer3[6] = "corge";
    420   auto writePromise3 = wrapped->write(buffer3, 5);
    421   KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer3);
    422   KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 5);
    423   KJ_EXPECT(!writePromise3.poll(waitScope));
    424   exactPointerWriter.fulfill();
    425   writePromise3.wait(waitScope);
    426 }
    427 
    428 KJ_TEST("KJ -> KJ pipe -> ByteStream RPC -> KJ pipe -> ByteStream RPC -> KJ with concurrent shortening") {
    429   // Same as previous test, except we add a KJ pipe at the beginning and pump it into the top of
    430   // the pipe, which invokes tryPumpFrom() on the KjToCapnpStreamAdapter.
    431 
    432   kj::EventLoop eventLoop;
    433   kj::WaitScope waitScope(eventLoop);
    434 
    435   ByteStreamFactory clientFactory;
    436   ByteStreamFactory serverFactory;
    437 
    438   auto backPipe = kj::newOneWayPipe();
    439   auto middlePipe = kj::newOneWayPipe();
    440   auto frontPipe = kj::newOneWayPipe();
    441 
    442   ExactPointerWriter exactPointerWriter;
    443   auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
    444 
    445   auto rpcConnection = kj::newTwoWayPipe();
    446   capnp::TwoPartyClient client(*rpcConnection.ends[0],
    447       clientFactory.kjToCapnp(kj::mv(backPipe.out)),
    448       rpc::twoparty::Side::CLIENT);
    449   capnp::TwoPartyClient server(*rpcConnection.ends[1],
    450       serverFactory.kjToCapnp(kj::mv(middlePipe.out)),
    451       rpc::twoparty::Side::CLIENT);
    452 
    453   auto backWrapped = serverFactory.capnpToKj(server.bootstrap().castAs<ByteStream>());
    454   auto midPumpPormise = middlePipe.in->pumpTo(*backWrapped);
    455 
    456   auto wrapped = clientFactory.capnpToKj(client.bootstrap().castAs<ByteStream>());
    457   auto frontPumpPromise = frontPipe.in->pumpTo(*wrapped);
    458 
    459   char buffer[7] = "foobar";
    460   auto writePromise = frontPipe.out->write(buffer, 6);
    461 
    462   // The write went to RPC so it's not immediately received.
    463   KJ_EXPECT(exactPointerWriter.receivedBuffer == nullptr);
    464 
    465   // Write should be received after we turn the event loop.
    466   waitScope.poll();
    467   KJ_EXPECT(exactPointerWriter.receivedBuffer != nullptr);
    468 
    469   // Note that the promise that write() returned above has already resolved, because it hit RPC
    470   // and went into the streaming window.
    471   KJ_ASSERT(writePromise.poll(waitScope));
    472   writePromise.wait(waitScope);
    473 
    474   // Let's start a second write. Even though the first write technically isn't done yet, it's
    475   // legal for us to start a second one because the first write's returned promise optimistically
    476   // resolved for streaming window reasons. This ends up being a very tricky case for our code!
    477   char buffer2[7] = "bazqux";
    478   auto writePromise2 = frontPipe.out->write(buffer2, 6);
    479 
    480   // Now check the first write was correct, and close it out.
    481   KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "foobar");
    482   exactPointerWriter.fulfill();
    483 
    484   // Turn event loop again. Now the second write arrives.
    485   waitScope.poll();
    486   KJ_EXPECT(kj::str(exactPointerWriter.receivedBuffer) == "bazqux");
    487   exactPointerWriter.fulfill();
    488   writePromise2.wait(waitScope);
    489 
    490   // If we do another write now, it should be zero-copy, because everything has settled.
    491   char buffer3[6] = "corge";
    492   auto writePromise3 = frontPipe.out->write(buffer3, 5);
    493   KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer3);
    494   KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 5);
    495   KJ_EXPECT(!writePromise3.poll(waitScope));
    496   exactPointerWriter.fulfill();
    497   writePromise3.wait(waitScope);
    498 }
    499 
    500 KJ_TEST("Two Substreams on one destination") {
    501   kj::EventLoop eventLoop;
    502   kj::WaitScope waitScope(eventLoop);
    503 
    504   ByteStreamFactory factory;
    505 
    506   auto backPipe = kj::newOneWayPipe();
    507   auto middlePipe1 = kj::newOneWayPipe();
    508   auto middlePipe2 = kj::newOneWayPipe();
    509 
    510   ExactPointerWriter exactPointerWriter;
    511   auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
    512 
    513   auto backWrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(backPipe.out)));
    514 
    515   auto wrapped1 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe1.out)));
    516   auto wrapped2 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe2.out)));
    517 
    518   // Declare these buffers out here so that they can't possibly end up with the same address.
    519   char buffer1[4] = "foo";
    520   char buffer2[4] = "bar";
    521 
    522   {
    523     auto wrapped = kj::mv(wrapped1);
    524 
    525     // First pump 3 bytes from the first stream.
    526     auto midPumpPormise = middlePipe1.in->pumpTo(*backWrapped, 3);
    527 
    528     // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
    529     auto disconnectPromise = wrapped->whenWriteDisconnected();
    530     KJ_EXPECT(!disconnectPromise.poll(waitScope));
    531 
    532     auto writePromise = wrapped->write(buffer1, 3);
    533     KJ_EXPECT(!writePromise.poll(waitScope));
    534 
    535     // The first write will tunnel all the way down to the destination.
    536     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer1);
    537     KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
    538     exactPointerWriter.fulfill();
    539 
    540     writePromise.wait(waitScope);
    541     KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
    542   }
    543 
    544   {
    545     auto wrapped = kj::mv(wrapped2);
    546 
    547     // Now pump another 3 bytes from the second stream.
    548     auto midPumpPormise = middlePipe2.in->pumpTo(*backWrapped, 3);
    549 
    550     // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
    551     auto disconnectPromise = wrapped->whenWriteDisconnected();
    552     KJ_EXPECT(!disconnectPromise.poll(waitScope));
    553 
    554     auto writePromise = wrapped->write(buffer2, 3);
    555     KJ_EXPECT(!writePromise.poll(waitScope));
    556 
    557     // The second write will also tunnel all the way down to the destination.
    558     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer2);
    559     KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
    560     exactPointerWriter.fulfill();
    561 
    562     writePromise.wait(waitScope);
    563     KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
    564   }
    565 }
    566 
    567 KJ_TEST("Two Substreams on one destination no limits (pump to EOF)") {
    568   kj::EventLoop eventLoop;
    569   kj::WaitScope waitScope(eventLoop);
    570 
    571   ByteStreamFactory factory;
    572 
    573   auto backPipe = kj::newOneWayPipe();
    574   auto middlePipe1 = kj::newOneWayPipe();
    575   auto middlePipe2 = kj::newOneWayPipe();
    576 
    577   ExactPointerWriter exactPointerWriter;
    578   auto backPumpPromise = backPipe.in->pumpTo(exactPointerWriter);
    579 
    580   auto backWrapped = factory.capnpToKj(factory.kjToCapnp(kj::mv(backPipe.out)));
    581 
    582   auto wrapped1 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe1.out)));
    583   auto wrapped2 = factory.capnpToKj(factory.kjToCapnp(kj::mv(middlePipe2.out)));
    584 
    585   // Declare these buffers out here so that they can't possibly end up with the same address.
    586   char buffer1[4] = "foo";
    587   char buffer2[4] = "bar";
    588 
    589   {
    590     auto wrapped = kj::mv(wrapped1);
    591 
    592     // First pump from the first stream until EOF.
    593     auto midPumpPormise = middlePipe1.in->pumpTo(*backWrapped);
    594 
    595     // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
    596     auto disconnectPromise = wrapped->whenWriteDisconnected();
    597     KJ_EXPECT(!disconnectPromise.poll(waitScope));
    598 
    599     auto writePromise = wrapped->write(buffer1, 3);
    600     KJ_EXPECT(!writePromise.poll(waitScope));
    601 
    602     // The first write will tunnel all the way down to the destination.
    603     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer1);
    604     KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
    605     exactPointerWriter.fulfill();
    606 
    607     writePromise.wait(waitScope);
    608     { auto drop = kj::mv(wrapped); }
    609     KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
    610   }
    611 
    612   {
    613     auto wrapped = kj::mv(wrapped2);
    614 
    615     // Now pump from the second stream until EOF.
    616     auto midPumpPormise = middlePipe2.in->pumpTo(*backWrapped);
    617 
    618     // Poll whenWriteDisconnected(), mainly as a way to let all the path-shortening settle.
    619     auto disconnectPromise = wrapped->whenWriteDisconnected();
    620     KJ_EXPECT(!disconnectPromise.poll(waitScope));
    621 
    622     auto writePromise = wrapped->write(buffer2, 3);
    623     KJ_EXPECT(!writePromise.poll(waitScope));
    624 
    625     // The second write will also tunnel all the way down to the destination.
    626     KJ_EXPECT(exactPointerWriter.receivedBuffer.begin() == buffer2);
    627     KJ_EXPECT(exactPointerWriter.receivedBuffer.size() == 3);
    628     exactPointerWriter.fulfill();
    629 
    630     writePromise.wait(waitScope);
    631     { auto drop = kj::mv(wrapped); }
    632     KJ_EXPECT(midPumpPormise.wait(waitScope) == 3);
    633   }
    634 }
    635 
    636 KJ_TEST("KJ -> ByteStream RPC -> KJ promise stream -> ByteStream -> KJ") {
    637   // Test what happens if we queue up several requests on a ByteStream and then it resolves to
    638   // a shorter path.
    639 
    640   kj::EventLoop eventLoop;
    641   kj::WaitScope waitScope(eventLoop);
    642 
    643   ByteStreamFactory factory;
    644   ExactPointerWriter exactPointerWriter;
    645 
    646   auto paf = kj::newPromiseAndFulfiller<kj::Own<kj::AsyncOutputStream>>();
    647   auto backCap = factory.kjToCapnp(kj::newPromisedStream(kj::mv(paf.promise)));
    648 
    649   auto rpcPipe = kj::newTwoWayPipe();
    650   capnp::TwoPartyClient client(*rpcPipe.ends[0]);
    651   capnp::TwoPartyClient server(*rpcPipe.ends[1], kj::mv(backCap), rpc::twoparty::Side::SERVER);
    652   auto front = factory.capnpToKj(client.bootstrap().castAs<ByteStream>());
    653 
    654   // These will all queue up in the RPC layer.
    655   front->write("foo", 3).wait(waitScope);
    656   front->write("bar", 3).wait(waitScope);
    657   front->write("baz", 3).wait(waitScope);
    658   front->write("qux", 3).wait(waitScope);
    659 
    660   // Make sure those writes manage to get all the way through the RPC system and queue up in the
    661   // LocalClient wrapping the CapnpToKjStreamAdapter at the other end.
    662   waitScope.poll();
    663 
    664   // Fulfill the promise.
    665   paf.fulfiller->fulfill(factory.capnpToKj(factory.kjToCapnp(kj::attachRef(exactPointerWriter))));
    666   waitScope.poll();
    667 
    668   // Now:
    669   // - "foo" should have made it all the way down to the final output stream.
    670   // - "bar", "baz", and "qux" are queued on the CapnpToKjStreamAdapter immediately wrapping the
    671   //   KJ promise stream.
    672   // - But that stream adapter has discovered that there's another capnp stream downstream and has
    673   //   resolved itself to the later stream.
    674   // - A new call at this time should NOT be allowed to hop the queue.
    675 
    676   exactPointerWriter.expectBuffer("foo");
    677 
    678   front->write("corge", 5).wait(waitScope);
    679   waitScope.poll();
    680 
    681   exactPointerWriter.fulfill();
    682 
    683   waitScope.poll();
    684   exactPointerWriter.expectBuffer("bar");
    685   exactPointerWriter.fulfill();
    686 
    687   waitScope.poll();
    688   exactPointerWriter.expectBuffer("baz");
    689   exactPointerWriter.fulfill();
    690 
    691   waitScope.poll();
    692   exactPointerWriter.expectBuffer("qux");
    693   exactPointerWriter.fulfill();
    694 
    695   waitScope.poll();
    696   exactPointerWriter.expectBuffer("corge");
    697   exactPointerWriter.fulfill();
    698 
    699   // There may still be some detach()ed promises holding on to some capabilities that transitively
    700   // hold a fake Own<AsyncOutputStream> pointing at exactPointerWriter, which is actually on the
    701   // stack. We created a fake Own pointing to a stack variable by using
    702   // kj::attachRef(exactPointerWriter), above; it does not actually own the object it points to.
    703   // We need to make sure those Owns are dropped before exactPoniterWriter is destroyed, otherwise
    704   // ASAN will flag some invalid reads (of exactPointerWriter's vtable, in particular).
    705   waitScope.cancelAllDetached();
    706 }
    707 
    708 // TODO:
    709 // - Parallel writes (requires streaming)
    710 // - Write to KJ -> capnp -> RPC -> capnp -> KJ loopback without shortening, verify we can write
    711 //   several things to buffer (requires streaming).
    712 // - Again, but with shortening which only occurs after some promise resolves.
    713 
    714 }  // namespace
    715 }  // namespace capnp