capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

rpc-twoparty-test.c++ (31437B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #define CAPNP_TESTING_CAPNP 1
     23 
     24 #ifndef _GNU_SOURCE
     25 #define _GNU_SOURCE
     26 #endif
     27 
     28 // Includes needed only for SOL_SOCKET and SO_SNDBUF.
     29 #if _WIN32
     30 #include <kj/win32-api-version.h>
     31 #endif
     32 
     33 #include "rpc-twoparty.h"
     34 #include "test-util.h"
     35 #include <capnp/rpc.capnp.h>
     36 #include <kj/debug.h>
     37 #include <kj/thread.h>
     38 #include <kj/compat/gtest.h>
     39 #include <kj/miniposix.h>
     40 
     41 #if _WIN32
     42 #include <winsock2.h>
     43 #include <mswsock.h>
     44 #include <kj/windows-sanity.h>
     45 #else
     46 #include <sys/socket.h>
     47 #endif
     48 
     49 // TODO(cleanup): Auto-generate stringification functions for union discriminants.
     50 namespace capnp {
     51 namespace rpc {
     52 inline kj::String KJ_STRINGIFY(Message::Which which) {
     53   return kj::str(static_cast<uint16_t>(which));
     54 }
     55 }  // namespace rpc
     56 }  // namespace capnp
     57 
     58 namespace capnp {
     59 namespace _ {
     60 namespace {
     61 
     62 class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRefObjectId> {
     63 public:
     64   TestRestorer(int& callCount, int& handleCount)
     65       : callCount(callCount), handleCount(handleCount) {}
     66 
     67   Capability::Client restore(test::TestSturdyRefObjectId::Reader objectId) override {
     68     switch (objectId.getTag()) {
     69       case test::TestSturdyRefObjectId::Tag::TEST_INTERFACE:
     70         return kj::heap<TestInterfaceImpl>(callCount);
     71       case test::TestSturdyRefObjectId::Tag::TEST_EXTENDS:
     72         return Capability::Client(newBrokenCap("No TestExtends implemented."));
     73       case test::TestSturdyRefObjectId::Tag::TEST_PIPELINE:
     74         return kj::heap<TestPipelineImpl>(callCount);
     75       case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLEE:
     76         return kj::heap<TestTailCalleeImpl>(callCount);
     77       case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER:
     78         return kj::heap<TestTailCallerImpl>(callCount);
     79       case test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF:
     80         return kj::heap<TestMoreStuffImpl>(callCount, handleCount);
     81     }
     82     KJ_UNREACHABLE;
     83   }
     84 
     85 private:
     86   int& callCount;
     87   int& handleCount;
     88 };
     89 
     90 class TestMonotonicClock final: public kj::MonotonicClock {
     91 public:
     92   kj::TimePoint now() const override {
     93     return time;
     94   }
     95 
     96   void reset() { time = kj::systemCoarseMonotonicClock().now(); }
     97   void increment(kj::Duration d) { time += d; }
     98 private:
     99   kj::TimePoint time = kj::systemCoarseMonotonicClock().now();
    100 };
    101 
    102 kj::AsyncIoProvider::PipeThread runServer(kj::AsyncIoProvider& ioProvider,
    103                                           int& callCount, int& handleCount) {
    104   return ioProvider.newPipeThread(
    105       [&callCount, &handleCount](
    106        kj::AsyncIoProvider& ioProvider, kj::AsyncIoStream& stream, kj::WaitScope& waitScope) {
    107     TwoPartyVatNetwork network(stream, rpc::twoparty::Side::SERVER);
    108     TestRestorer restorer(callCount, handleCount);
    109     auto server = makeRpcServer(network, restorer);
    110     network.onDisconnect().wait(waitScope);
    111   });
    112 }
    113 
    114 Capability::Client getPersistentCap(RpcSystem<rpc::twoparty::VatId>& client,
    115                                     rpc::twoparty::Side side,
    116                                     test::TestSturdyRefObjectId::Tag tag) {
    117   // Create the VatId.
    118   MallocMessageBuilder hostIdMessage(8);
    119   auto hostId = hostIdMessage.initRoot<rpc::twoparty::VatId>();
    120   hostId.setSide(side);
    121 
    122   // Create the SturdyRefObjectId.
    123   MallocMessageBuilder objectIdMessage(8);
    124   objectIdMessage.initRoot<test::TestSturdyRefObjectId>().setTag(tag);
    125 
    126   // Connect to the remote capability.
    127   return client.restore(hostId, objectIdMessage.getRoot<AnyPointer>());
    128 }
    129 
    130 TEST(TwoPartyNetwork, Basic) {
    131   auto ioContext = kj::setupAsyncIo();
    132   TestMonotonicClock clock;
    133   int callCount = 0;
    134   int handleCount = 0;
    135 
    136   auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
    137   TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT, capnp::ReaderOptions(), clock);
    138   auto rpcClient = makeRpcClient(network);
    139 
    140   KJ_EXPECT(network.getCurrentQueueCount() == 0);
    141   KJ_EXPECT(network.getCurrentQueueSize() == 0);
    142   KJ_EXPECT(network.getOutgoingMessageWaitTime() == 0 * kj::SECONDS);
    143 
    144   // Request the particular capability from the server.
    145   auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER,
    146       test::TestSturdyRefObjectId::Tag::TEST_INTERFACE).castAs<test::TestInterface>();
    147   clock.increment(1 * kj::SECONDS);
    148 
    149   KJ_EXPECT(network.getCurrentQueueCount() == 1);
    150   KJ_EXPECT(network.getCurrentQueueSize() > 0);
    151   KJ_EXPECT(network.getOutgoingMessageWaitTime() == 1 * kj::SECONDS);
    152   size_t oldSize = network.getCurrentQueueSize();
    153 
    154   // Use the capability.
    155   auto request1 = client.fooRequest();
    156   request1.setI(123);
    157   request1.setJ(true);
    158   auto promise1 = request1.send();
    159 
    160   KJ_EXPECT(network.getCurrentQueueCount() == 2);
    161   KJ_EXPECT(network.getCurrentQueueSize() > oldSize);
    162   KJ_EXPECT(network.getOutgoingMessageWaitTime() == 1 * kj::SECONDS);
    163   oldSize = network.getCurrentQueueSize();
    164 
    165   auto request2 = client.bazRequest();
    166   initTestMessage(request2.initS());
    167   auto promise2 = request2.send();
    168 
    169   KJ_EXPECT(network.getCurrentQueueCount() == 3);
    170   KJ_EXPECT(network.getCurrentQueueSize() > oldSize);
    171   oldSize = network.getCurrentQueueSize();
    172 
    173   clock.increment(1 * kj::SECONDS);
    174 
    175   bool barFailed = false;
    176   auto request3 = client.barRequest();
    177   auto promise3 = request3.send().then(
    178       [](Response<test::TestInterface::BarResults>&& response) {
    179         ADD_FAILURE() << "Expected bar() call to fail.";
    180       }, [&](kj::Exception&& e) {
    181         barFailed = true;
    182       });
    183 
    184   EXPECT_EQ(0, callCount);
    185 
    186   KJ_EXPECT(network.getCurrentQueueCount() == 4);
    187   KJ_EXPECT(network.getCurrentQueueSize() > oldSize);
    188   // Oldest message is now 2 seconds old
    189   KJ_EXPECT(network.getOutgoingMessageWaitTime() == 2 * kj::SECONDS);
    190   oldSize = network.getCurrentQueueSize();
    191 
    192   auto response1 = promise1.wait(ioContext.waitScope);
    193 
    194   EXPECT_EQ("foo", response1.getX());
    195 
    196   auto response2 = promise2.wait(ioContext.waitScope);
    197 
    198   promise3.wait(ioContext.waitScope);
    199 
    200   EXPECT_EQ(2, callCount);
    201   EXPECT_TRUE(barFailed);
    202 
    203   // There's still a `Finish` message queued.
    204   KJ_EXPECT(network.getCurrentQueueCount() > 0);
    205   KJ_EXPECT(network.getCurrentQueueSize() > 0);
    206   // Oldest message was sent, next oldest should be 0 seconds old since we haven't incremented
    207   // the clock yet.
    208   KJ_EXPECT(network.getOutgoingMessageWaitTime() == 0 * kj::SECONDS);
    209 
    210   // Let any I/O finish.
    211   kj::Promise<void>(kj::NEVER_DONE).poll(ioContext.waitScope);
    212 
    213   // Now nothing is queued.
    214   KJ_EXPECT(network.getCurrentQueueCount() == 0);
    215   KJ_EXPECT(network.getCurrentQueueSize() == 0);
    216 }
    217 
    218 TEST(TwoPartyNetwork, Pipelining) {
    219   auto ioContext = kj::setupAsyncIo();
    220   int callCount = 0;
    221   int handleCount = 0;
    222   int reverseCallCount = 0;  // Calls back from server to client.
    223 
    224   auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
    225   TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
    226   auto rpcClient = makeRpcClient(network);
    227 
    228   bool disconnected = false;
    229   kj::Promise<void> disconnectPromise = network.onDisconnect().then([&]() { disconnected = true; });
    230 
    231   {
    232     // Request the particular capability from the server.
    233     auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER,
    234         test::TestSturdyRefObjectId::Tag::TEST_PIPELINE).castAs<test::TestPipeline>();
    235 
    236     {
    237       // Use the capability.
    238       auto request = client.getCapRequest();
    239       request.setN(234);
    240       request.setInCap(kj::heap<TestInterfaceImpl>(reverseCallCount));
    241 
    242       auto promise = request.send();
    243 
    244       auto pipelineRequest = promise.getOutBox().getCap().fooRequest();
    245       pipelineRequest.setI(321);
    246       auto pipelinePromise = pipelineRequest.send();
    247 
    248       auto pipelineRequest2 = promise.getOutBox().getCap()
    249           .castAs<test::TestExtends>().graultRequest();
    250       auto pipelinePromise2 = pipelineRequest2.send();
    251 
    252       promise = nullptr;  // Just to be annoying, drop the original promise.
    253 
    254       EXPECT_EQ(0, callCount);
    255       EXPECT_EQ(0, reverseCallCount);
    256 
    257       auto response = pipelinePromise.wait(ioContext.waitScope);
    258       EXPECT_EQ("bar", response.getX());
    259 
    260       auto response2 = pipelinePromise2.wait(ioContext.waitScope);
    261       checkTestMessage(response2);
    262 
    263       EXPECT_EQ(3, callCount);
    264       EXPECT_EQ(1, reverseCallCount);
    265     }
    266 
    267     EXPECT_FALSE(disconnected);
    268 
    269     // What if we disconnect?
    270     // TODO(cleanup): This is kind of cheating, we are shutting down the underlying socket to
    271     //   simulate a disconnect, but it's weird to pull the rug out from under our VatNetwork like
    272     //   this and it causes a bit of a race between write failures and read failures. This part of
    273     //   the test should maybe be restructured.
    274     serverThread.pipe->shutdownWrite();
    275 
    276     // The other side should also disconnect.
    277     disconnectPromise.wait(ioContext.waitScope);
    278 
    279     {
    280       // Use the now-broken capability.
    281       auto request = client.getCapRequest();
    282       request.setN(234);
    283       request.setInCap(kj::heap<TestInterfaceImpl>(reverseCallCount));
    284 
    285       auto promise = request.send();
    286 
    287       auto pipelineRequest = promise.getOutBox().getCap().fooRequest();
    288       pipelineRequest.setI(321);
    289       auto pipelinePromise = pipelineRequest.send();
    290 
    291       auto pipelineRequest2 = promise.getOutBox().getCap()
    292           .castAs<test::TestExtends>().graultRequest();
    293       auto pipelinePromise2 = pipelineRequest2.send();
    294 
    295       pipelinePromise.then([](auto) {
    296         KJ_FAIL_EXPECT("should have thrown");
    297       }, [](kj::Exception&& e) {
    298         KJ_EXPECT(e.getType() == kj::Exception::Type::DISCONNECTED);
    299         // I wish we could test stack traces somehow... oh well.
    300       }).wait(ioContext.waitScope);
    301 
    302       pipelinePromise2.then([](auto) {
    303         KJ_FAIL_EXPECT("should have thrown");
    304       }, [](kj::Exception&& e) {
    305         KJ_EXPECT(e.getType() == kj::Exception::Type::DISCONNECTED);
    306         // I wish we could test stack traces somehow... oh well.
    307       }).wait(ioContext.waitScope);
    308 
    309       EXPECT_EQ(3, callCount);
    310       EXPECT_EQ(1, reverseCallCount);
    311     }
    312   }
    313 }
    314 
    315 TEST(TwoPartyNetwork, Release) {
    316   auto ioContext = kj::setupAsyncIo();
    317   int callCount = 0;
    318   int handleCount = 0;
    319 
    320   auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
    321   TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
    322   auto rpcClient = makeRpcClient(network);
    323 
    324   // Request the particular capability from the server.
    325   auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER,
    326       test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF).castAs<test::TestMoreStuff>();
    327 
    328   auto handle1 = client.getHandleRequest().send().wait(ioContext.waitScope).getHandle();
    329   auto promise = client.getHandleRequest().send();
    330   auto handle2 = promise.wait(ioContext.waitScope).getHandle();
    331 
    332   EXPECT_EQ(2, handleCount);
    333 
    334   handle1 = nullptr;
    335 
    336   // There once was a bug where the last outgoing message (and any capabilities attached) would
    337   // not get cleaned up (until a new message was sent). This appeared to be a bug in Release,
    338   // because if a client received a message and then released a capability from it but then did
    339   // not make any further calls, then the capability would not be released because the message
    340   // introducing it remained the last server -> client message (because a "Release" message has
    341   // no reply). Here we are explicitly trying to catch this bug. This proves tricky, because when
    342   // we drop a reference on the client side, there's no particular way to wait for the release
    343   // message to reach the server except to make a subsequent call and wait for the return -- but
    344   // that would mask the bug. So, we wait spin waiting for handleCount to change.
    345 
    346   uint maxSpins = 1000;
    347 
    348   while (handleCount > 1) {
    349     ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope);
    350     KJ_ASSERT(--maxSpins > 0);
    351   }
    352   EXPECT_EQ(1, handleCount);
    353 
    354   handle2 = nullptr;
    355 
    356   ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope);
    357   EXPECT_EQ(1, handleCount);
    358 
    359   promise = nullptr;
    360 
    361   while (handleCount > 0) {
    362     ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope);
    363     KJ_ASSERT(--maxSpins > 0);
    364   }
    365   EXPECT_EQ(0, handleCount);
    366 }
    367 
    368 TEST(TwoPartyNetwork, Abort) {
    369   // Verify that aborts are received.
    370 
    371   auto ioContext = kj::setupAsyncIo();
    372   int callCount = 0;
    373   int handleCount = 0;
    374 
    375   auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
    376   TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
    377 
    378   MallocMessageBuilder refMessage(128);
    379   auto hostId = refMessage.initRoot<rpc::twoparty::VatId>();
    380   hostId.setSide(rpc::twoparty::Side::SERVER);
    381 
    382   auto conn = KJ_ASSERT_NONNULL(network.connect(hostId));
    383 
    384   {
    385     // Send an invalid message (Return to non-existent question).
    386     auto msg = conn->newOutgoingMessage(128);
    387     auto body = msg->getBody().initAs<rpc::Message>().initReturn();
    388     body.setAnswerId(1234);
    389     body.setCanceled();
    390     msg->send();
    391   }
    392 
    393   auto reply = KJ_ASSERT_NONNULL(conn->receiveIncomingMessage().wait(ioContext.waitScope));
    394   EXPECT_EQ(rpc::Message::ABORT, reply->getBody().getAs<rpc::Message>().which());
    395 
    396   EXPECT_TRUE(conn->receiveIncomingMessage().wait(ioContext.waitScope) == nullptr);
    397 }
    398 
    399 TEST(TwoPartyNetwork, ConvenienceClasses) {
    400   auto ioContext = kj::setupAsyncIo();
    401 
    402   int callCount = 0;
    403   TwoPartyServer server(kj::heap<TestInterfaceImpl>(callCount));
    404 
    405   auto address = ioContext.provider->getNetwork()
    406       .parseAddress("127.0.0.1").wait(ioContext.waitScope);
    407 
    408   auto listener = address->listen();
    409   auto listenPromise = server.listen(*listener);
    410 
    411   address = ioContext.provider->getNetwork()
    412       .parseAddress("127.0.0.1", listener->getPort()).wait(ioContext.waitScope);
    413 
    414   auto connection = address->connect().wait(ioContext.waitScope);
    415   TwoPartyClient client(*connection);
    416   auto cap = client.bootstrap().castAs<test::TestInterface>();
    417 
    418   auto request = cap.fooRequest();
    419   request.setI(123);
    420   request.setJ(true);
    421   EXPECT_EQ(0, callCount);
    422   auto response = request.send().wait(ioContext.waitScope);
    423   EXPECT_EQ("foo", response.getX());
    424   EXPECT_EQ(1, callCount);
    425 }
    426 
    427 TEST(TwoPartyNetwork, HugeMessage) {
    428   auto ioContext = kj::setupAsyncIo();
    429   int callCount = 0;
    430   int handleCount = 0;
    431 
    432   auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
    433   TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
    434   auto rpcClient = makeRpcClient(network);
    435 
    436   auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER,
    437       test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF).castAs<test::TestMoreStuff>();
    438 
    439   // Oversized request fails.
    440   {
    441     auto req = client.methodWithDefaultsRequest();
    442     req.initA(100000000);  // 100 MB
    443 
    444     KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("larger than our single-message size limit",
    445         req.send().ignoreResult().wait(ioContext.waitScope));
    446   }
    447 
    448   // Oversized response fails.
    449   KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("larger than our single-message size limit",
    450       client.getEnormousStringRequest().send().ignoreResult().wait(ioContext.waitScope));
    451 
    452   // Connection is still up.
    453   {
    454     auto req = client.getCallSequenceRequest();
    455     req.setExpected(0);
    456     KJ_EXPECT(req.send().wait(ioContext.waitScope).getN() == 0);
    457   }
    458 }
    459 
    460 class TestAuthenticatedBootstrapImpl final
    461     : public test::TestAuthenticatedBootstrap<rpc::twoparty::VatId>::Server {
    462 public:
    463   TestAuthenticatedBootstrapImpl(rpc::twoparty::VatId::Reader clientId) {
    464     this->clientId.setRoot(clientId);
    465   }
    466 
    467 protected:
    468   kj::Promise<void> getCallerId(GetCallerIdContext context) override {
    469     context.getResults().setCaller(clientId.getRoot<rpc::twoparty::VatId>());
    470     return kj::READY_NOW;
    471   }
    472 
    473 private:
    474   MallocMessageBuilder clientId;
    475 };
    476 
    477 class TestBootstrapFactory: public BootstrapFactory<rpc::twoparty::VatId> {
    478 public:
    479   Capability::Client createFor(rpc::twoparty::VatId::Reader clientId) {
    480     called = true;
    481     EXPECT_EQ(rpc::twoparty::Side::CLIENT, clientId.getSide());
    482     return kj::heap<TestAuthenticatedBootstrapImpl>(clientId);
    483   }
    484 
    485   bool called = false;
    486 };
    487 
    488 kj::AsyncIoProvider::PipeThread runAuthenticatingServer(
    489     kj::AsyncIoProvider& ioProvider, BootstrapFactory<rpc::twoparty::VatId>& bootstrapFactory) {
    490   return ioProvider.newPipeThread([&bootstrapFactory](
    491       kj::AsyncIoProvider& ioProvider, kj::AsyncIoStream& stream, kj::WaitScope& waitScope) {
    492     TwoPartyVatNetwork network(stream, rpc::twoparty::Side::SERVER);
    493     auto server = makeRpcServer(network, bootstrapFactory);
    494     network.onDisconnect().wait(waitScope);
    495   });
    496 }
    497 
    498 TEST(TwoPartyNetwork, BootstrapFactory) {
    499   auto ioContext = kj::setupAsyncIo();
    500   TestBootstrapFactory bootstrapFactory;
    501   auto serverThread = runAuthenticatingServer(*ioContext.provider, bootstrapFactory);
    502   TwoPartyClient client(*serverThread.pipe);
    503   auto resp = client.bootstrap().castAs<test::TestAuthenticatedBootstrap<rpc::twoparty::VatId>>()
    504       .getCallerIdRequest().send().wait(ioContext.waitScope);
    505   EXPECT_EQ(rpc::twoparty::Side::CLIENT, resp.getCaller().getSide());
    506   EXPECT_TRUE(bootstrapFactory.called);
    507 }
    508 
    509 // =======================================================================================
    510 
    511 #if !_WIN32 && !__CYGWIN__  // Windows and Cygwin don't support SCM_RIGHTS.
    512 KJ_TEST("send FD over RPC") {
    513   auto io = kj::setupAsyncIo();
    514 
    515   int callCount = 0;
    516   int handleCount = 0;
    517   TwoPartyServer server(kj::heap<TestMoreStuffImpl>(callCount, handleCount));
    518   auto pipe = io.provider->newCapabilityPipe();
    519   server.accept(kj::mv(pipe.ends[0]), 2);
    520   TwoPartyClient client(*pipe.ends[1], 2);
    521 
    522   auto cap = client.bootstrap().castAs<test::TestMoreStuff>();
    523 
    524   int pipeFds[2];
    525   KJ_SYSCALL(kj::miniposix::pipe(pipeFds));
    526   kj::AutoCloseFd in1(pipeFds[0]);
    527   kj::AutoCloseFd out1(pipeFds[1]);
    528   KJ_SYSCALL(kj::miniposix::pipe(pipeFds));
    529   kj::AutoCloseFd in2(pipeFds[0]);
    530   kj::AutoCloseFd out2(pipeFds[1]);
    531 
    532   capnp::RemotePromise<test::TestMoreStuff::WriteToFdResults> promise = nullptr;
    533   {
    534     auto req = cap.writeToFdRequest();
    535 
    536     // Order reversal intentional, just trying to mix things up.
    537     req.setFdCap1(kj::heap<TestFdCap>(kj::mv(out2)));
    538     req.setFdCap2(kj::heap<TestFdCap>(kj::mv(out1)));
    539 
    540     promise = req.send();
    541   }
    542 
    543   int in3 = KJ_ASSERT_NONNULL(promise.getFdCap3().getFd().wait(io.waitScope));
    544   KJ_EXPECT(io.lowLevelProvider->wrapInputFd(kj::mv(in3))->readAllText().wait(io.waitScope)
    545             == "baz");
    546 
    547   {
    548     auto promise2 = kj::mv(promise);  // make sure the PipelineHook also goes out of scope
    549     auto response = promise2.wait(io.waitScope);
    550     KJ_EXPECT(response.getSecondFdPresent());
    551   }
    552 
    553   KJ_EXPECT(io.lowLevelProvider->wrapInputFd(kj::mv(in1))->readAllText().wait(io.waitScope)
    554             == "bar");
    555   KJ_EXPECT(io.lowLevelProvider->wrapInputFd(kj::mv(in2))->readAllText().wait(io.waitScope)
    556             == "foo");
    557 }
    558 
    559 KJ_TEST("FD per message limit") {
    560   auto io = kj::setupAsyncIo();
    561 
    562   int callCount = 0;
    563   int handleCount = 0;
    564   TwoPartyServer server(kj::heap<TestMoreStuffImpl>(callCount, handleCount));
    565   auto pipe = io.provider->newCapabilityPipe();
    566   server.accept(kj::mv(pipe.ends[0]), 1);
    567   TwoPartyClient client(*pipe.ends[1], 1);
    568 
    569   auto cap = client.bootstrap().castAs<test::TestMoreStuff>();
    570 
    571   int pipeFds[2];
    572   KJ_SYSCALL(kj::miniposix::pipe(pipeFds));
    573   kj::AutoCloseFd in1(pipeFds[0]);
    574   kj::AutoCloseFd out1(pipeFds[1]);
    575   KJ_SYSCALL(kj::miniposix::pipe(pipeFds));
    576   kj::AutoCloseFd in2(pipeFds[0]);
    577   kj::AutoCloseFd out2(pipeFds[1]);
    578 
    579   capnp::RemotePromise<test::TestMoreStuff::WriteToFdResults> promise = nullptr;
    580   {
    581     auto req = cap.writeToFdRequest();
    582 
    583     // Order reversal intentional, just trying to mix things up.
    584     req.setFdCap1(kj::heap<TestFdCap>(kj::mv(out2)));
    585     req.setFdCap2(kj::heap<TestFdCap>(kj::mv(out1)));
    586 
    587     promise = req.send();
    588   }
    589 
    590   int in3 = KJ_ASSERT_NONNULL(promise.getFdCap3().getFd().wait(io.waitScope));
    591   KJ_EXPECT(io.lowLevelProvider->wrapInputFd(kj::mv(in3))->readAllText().wait(io.waitScope)
    592             == "baz");
    593 
    594   {
    595     auto promise2 = kj::mv(promise);  // make sure the PipelineHook also goes out of scope
    596     auto response = promise2.wait(io.waitScope);
    597     KJ_EXPECT(!response.getSecondFdPresent());
    598   }
    599 
    600   KJ_EXPECT(io.lowLevelProvider->wrapInputFd(kj::mv(in1))->readAllText().wait(io.waitScope)
    601             == "");
    602   KJ_EXPECT(io.lowLevelProvider->wrapInputFd(kj::mv(in2))->readAllText().wait(io.waitScope)
    603             == "foo");
    604 }
    605 #endif  // !_WIN32 && !__CYGWIN__
    606 
    607 // =======================================================================================
    608 
    609 class MockSndbufStream final: public kj::AsyncIoStream {
    610 public:
    611   MockSndbufStream(kj::Own<AsyncIoStream> inner, size_t& window, size_t& written)
    612       : inner(kj::mv(inner)), window(window), written(written) {}
    613 
    614   kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
    615     return inner->read(buffer, minBytes, maxBytes);
    616   }
    617   kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    618     return inner->tryRead(buffer, minBytes, maxBytes);
    619   }
    620   kj::Maybe<uint64_t> tryGetLength() override {
    621     return inner->tryGetLength();
    622   }
    623   kj::Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
    624     return inner->pumpTo(output, amount);
    625   }
    626   kj::Promise<void> write(const void* buffer, size_t size) override {
    627     written += size;
    628     return inner->write(buffer, size);
    629   }
    630   kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
    631     for (auto& piece: pieces) written += piece.size();
    632     return inner->write(pieces);
    633   }
    634   kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
    635       kj::AsyncInputStream& input, uint64_t amount) override {
    636     return inner->tryPumpFrom(input, amount);
    637   }
    638   kj::Promise<void> whenWriteDisconnected() override { return inner->whenWriteDisconnected(); }
    639   void shutdownWrite() override { return inner->shutdownWrite(); }
    640   void abortRead() override { return inner->abortRead(); }
    641 
    642   void getsockopt(int level, int option, void* value, uint* length) override {
    643     if (level == SOL_SOCKET && option == SO_SNDBUF) {
    644       KJ_ASSERT(*length == sizeof(int));
    645       *reinterpret_cast<int*>(value) = window;
    646     } else {
    647       KJ_UNIMPLEMENTED("not implemented for test", level, option);
    648     }
    649   }
    650 
    651 private:
    652   kj::Own<AsyncIoStream> inner;
    653   size_t& window;
    654   size_t& written;
    655 };
    656 
    657 KJ_TEST("Streaming over RPC") {
    658   kj::EventLoop loop;
    659   kj::WaitScope waitScope(loop);
    660 
    661   auto pipe = kj::newTwoWayPipe();
    662 
    663   size_t window = 1024;
    664   size_t clientWritten = 0;
    665   size_t serverWritten = 0;
    666 
    667   pipe.ends[0] = kj::heap<MockSndbufStream>(kj::mv(pipe.ends[0]), window, clientWritten);
    668   pipe.ends[1] = kj::heap<MockSndbufStream>(kj::mv(pipe.ends[1]), window, serverWritten);
    669 
    670   auto ownServer = kj::heap<TestStreamingImpl>();
    671   auto& server = *ownServer;
    672   test::TestStreaming::Client serverCap(kj::mv(ownServer));
    673 
    674   TwoPartyClient tpClient(*pipe.ends[0]);
    675   TwoPartyClient tpServer(*pipe.ends[1], serverCap, rpc::twoparty::Side::SERVER);
    676 
    677   auto cap = tpClient.bootstrap().castAs<test::TestStreaming>();
    678 
    679   // Send stream requests until we can't anymore.
    680   kj::Promise<void> promise = kj::READY_NOW;
    681   uint count = 0;
    682   while (promise.poll(waitScope)) {
    683     promise.wait(waitScope);
    684 
    685     auto req = cap.doStreamIRequest();
    686     req.setI(++count);
    687     promise = req.send();
    688   }
    689 
    690   // We should have sent... several.
    691   KJ_EXPECT(count > 5);
    692 
    693   // Now, cause calls to finish server-side one-at-a-time and check that this causes the client
    694   // side to be willing to send more.
    695   uint countReceived = 0;
    696   for (uint i = 0; i < 50; i++) {
    697     KJ_EXPECT(server.iSum == ++countReceived);
    698     server.iSum = 0;
    699     KJ_ASSERT_NONNULL(server.fulfiller)->fulfill();
    700 
    701     KJ_ASSERT(promise.poll(waitScope));
    702     promise.wait(waitScope);
    703 
    704     auto req = cap.doStreamIRequest();
    705     req.setI(++count);
    706     promise = req.send();
    707     if (promise.poll(waitScope)) {
    708       // We'll see a couple of instances where completing one request frees up space to make two
    709       // more. This is because the first few requests we made are a little bit larger than the
    710       // rest due to being pipelined on the bootstrap. Once the bootstrap resolves, the request
    711       // size gets smaller.
    712       promise.wait(waitScope);
    713       req = cap.doStreamIRequest();
    714       req.setI(++count);
    715       promise = req.send();
    716 
    717       // We definitely shouldn't have freed up stream space for more than two additional requests!
    718       KJ_ASSERT(!promise.poll(waitScope));
    719     }
    720   }
    721 }
    722 
    723 KJ_TEST("Streaming over RPC then unwrap with CapabilitySet") {
          // Checks that CapabilityServerSet::getLocalServer(), applied to a capability that loops back
          // over an RPC connection to a locally-hosted server, does not complete until the streaming
          // calls previously sent through that capability have been completed by the server.

    724   kj::EventLoop loop;
    725   kj::WaitScope waitScope(loop);
    726 
    727   auto pipe = kj::newTwoWayPipe();
    728 
    729   CapabilityServerSet<test::TestStreaming> capSet;
    730 
    731   auto ownServer = kj::heap<TestStreamingImpl>();
    732   auto& server = *ownServer;  // keep a reference; ownership moves into capSet on the next line
    733   auto serverCap = capSet.add(kj::mv(ownServer));
    734 
    735   auto paf = kj::newPromiseAndFulfiller<test::TestStreaming::Client>();
    736 
          // tpClient is handed serverCap; tpServer is handed a promise that is only fulfilled further
          // down (with tpServer.bootstrap()), so clientCap starts out as an unresolved promise capability
          // and later resolves to a path leading back to `server`.
    737   TwoPartyClient tpClient(*pipe.ends[0], serverCap);
    738   TwoPartyClient tpServer(*pipe.ends[1], kj::mv(paf.promise), rpc::twoparty::Side::SERVER);
    739 
    740   auto clientCap = tpClient.bootstrap().castAs<test::TestStreaming>();
    741 
    742   // Send stream requests until we can't anymore.
          // `promise` is seeded as already-ready so that the first loop iteration sends a request; the
          // loop stops at the first send() whose promise is not ready after polling.
    743   kj::Promise<void> promise = kj::READY_NOW;
    744   uint count = 0;
    745   while (promise.poll(waitScope)) {
    746     promise.wait(waitScope);
    747 
    748     auto req = clientCap.doStreamIRequest();
    749     req.setI(++count);
    750     promise = req.send();
    751   }
    752 
    753   // We should have sent... several.
    754   KJ_EXPECT(count > 10);
    755 
    756   // Now try to unwrap.
    757   auto unwrapPromise = capSet.getLocalServer(clientCap);
    758 
    759   // It won't work yet, obviously, because we haven't resolved the promise.
    760   KJ_EXPECT(!unwrapPromise.poll(waitScope));
    761 
    762   // So do that.
    763   paf.fulfiller->fulfill(tpServer.bootstrap().castAs<test::TestStreaming>());
    764   clientCap.whenResolved().wait(waitScope);
    765 
    766   // But the unwrap still doesn't resolve because streaming requests are queued up.
    767   KJ_EXPECT(!unwrapPromise.poll(waitScope));
    768 
    769   // OK, let's resolve a streaming request.
          // server.fulfiller is optional-valued; KJ_ASSERT_NONNULL fails the test if it is not set.
    770   KJ_ASSERT_NONNULL(server.fulfiller)->fulfill();
    771 
    772   // All of our call promises have now completed from the client's perspective.
    773   promise.wait(waitScope);
    774 
    775   // But we still can't unwrap, because calls are queued server-side.
    776   KJ_EXPECT(!unwrapPromise.poll(waitScope));
    777 
    778   // Let's even make one more call now. But this is actually a local call since the promise
    779   // resolved.
    780   {
    781     auto req = clientCap.doStreamIRequest();
    782     req.setI(++count);
    783     promise = req.send();
    784   }
    785 
    786   // Because it's a local call, it doesn't resolve early. The window is no longer in effect.
          // Completing several of the earlier calls server-side must not complete this newest call.
    787   KJ_EXPECT(!promise.poll(waitScope));
    788   KJ_ASSERT_NONNULL(server.fulfiller)->fulfill();
    789   KJ_EXPECT(!promise.poll(waitScope));
    790   KJ_ASSERT_NONNULL(server.fulfiller)->fulfill();
    791   KJ_EXPECT(!promise.poll(waitScope));
    792   KJ_ASSERT_NONNULL(server.fulfiller)->fulfill();
    793   KJ_EXPECT(!promise.poll(waitScope));
    794   KJ_ASSERT_NONNULL(server.fulfiller)->fulfill();
    795   KJ_EXPECT(!promise.poll(waitScope));
    796 
    797   // Our unwrap promise is also still not resolved.
    798   KJ_EXPECT(!unwrapPromise.poll(waitScope));
    799 
    800   // Close out stream calls until it does resolve!
    801   while (!unwrapPromise.poll(waitScope)) {
    802     KJ_ASSERT_NONNULL(server.fulfiller)->fulfill();
    803   }
    804 
    805   // Now we can unwrap!
          // The unwrapped result must be non-null and must be the very object that was added to capSet.
    806   KJ_EXPECT(&KJ_ASSERT_NONNULL(unwrapPromise.wait(waitScope)) == &server);
    807 
    808   // But our last stream call still isn't done.
    809   KJ_EXPECT(!promise.poll(waitScope));
    810 
    811   // Finish it.
    812   KJ_ASSERT_NONNULL(server.fulfiller)->fulfill();
    813   promise.wait(waitScope);
    814 }
    815 
    816 KJ_TEST("promise cap resolves between starting request and sending it") {
          // Checks call ordering when a request object is created while its target capability is still
          // an unresolved promise, but is only sent after that promise has resolved: it must still be
          // delivered before a request that is both created and sent after the resolution.

    817   kj::EventLoop loop;
    818   kj::WaitScope waitScope(loop);
    819   auto pipe = kj::newTwoWayPipe();
    820 
    821   // Client exports TestCallOrderImpl as its bootstrap.
    822   TwoPartyClient client(*pipe.ends[0], kj::heap<TestCallOrderImpl>(), rpc::twoparty::Side::CLIENT);
    823 
    824   // Server exports a promise, which will later resolve to loop back to the capability the client
    825   // exported.
    826   auto paf = kj::newPromiseAndFulfiller<Capability::Client>();
    827   TwoPartyClient server(*pipe.ends[1], kj::mv(paf.promise), rpc::twoparty::Side::SERVER);
    828 
    829   // Create a request but don't send it yet.
          // At this point `cap` still refers to the server's not-yet-fulfilled promise.
    830   auto cap = client.bootstrap().castAs<test::TestCallOrder>();
    831   auto req1 = cap.getCallSequenceRequest();
    832 
    833   // Fulfill the promise now so that the server's bootstrap loops back to the client's bootstrap.
          // Waiting on whenResolved() ensures the client side has observed the resolution before any
          // request is sent.
    834   paf.fulfiller->fulfill(server.bootstrap());
    835   cap.whenResolved().wait(waitScope);
    836 
    837   // Send the request we created earlier, and also create and send a second request.
    838   auto promise1 = req1.send();
    839   auto promise2 = cap.getCallSequenceRequest().send();
    840 
    841   // They should arrive in order of send()s.
          // The returned N is expected to be 0 for the first call to arrive and 1 for the second. The
          // value is also passed as an extra KJ_EXPECT argument so it is reported on failure.
    842   auto n1 = promise1.wait(waitScope).getN();
    843   KJ_EXPECT(n1 == 0, n1);
    844   auto n2 = promise2.wait(waitScope).getN();
    845   KJ_EXPECT(n2 == 1, n2);
    846 }
    847 
    848 KJ_TEST("write error propagates to read error") {
          // Checks that once the client's writes start failing, a call made afterwards fails promptly
          // with the write error rather than waiting indefinitely for a response that will never come.

    849   kj::EventLoop loop;
    850   kj::WaitScope waitScope(loop);
    851   auto frontPipe = kj::newTwoWayPipe();
    852   auto backPipe = kj::newTwoWayPipe();
    853 
    854   TwoPartyClient client(*frontPipe.ends[0]);
    855 
          // The counter is handed to the server implementation by reference, so it must start from a
          // defined value; leaving it uninitialized would make any read or increment of it undefined
          // behavior.
    856   int callCount = 0;
    857   TwoPartyClient server(*backPipe.ends[1], kj::heap<TestInterfaceImpl>(callCount),
    858                         rpc::twoparty::Side::SERVER);
    859 
          // Client and server sit on two separate pipes joined by one pump task per direction, which
          // lets the test sever the client->server direction on its own.
    860   auto pumpUpTask = frontPipe.ends[1]->pumpTo(*backPipe.ends[0]);
    861   auto pumpDownTask = backPipe.ends[0]->pumpTo(*frontPipe.ends[1]);
    862 
    863   auto cap = client.bootstrap().castAs<test::TestInterface>();
    864 
    865   // Make sure the connections work.
    866   {
    867     auto req = cap.fooRequest();
    868     req.setI(123);
    869     req.setJ(true);
    870     auto resp = req.send().wait(waitScope);
    871     EXPECT_EQ("foo", resp.getX());
    872   }
    873 
    874   // Disconnect upstream task in such a way that future writes on the client will fail, but the
    875   // server doesn't notice the disconnect and so won't react.
    876   pumpUpTask = nullptr;
    877   frontPipe.ends[1]->abortRead();  // causes write() on ends[0] to fail in the future
    878 
    879   {
    880     auto req = cap.fooRequest();
    881     req.setI(123);
    882     req.setJ(true);
            // Success is a test failure; the error path must carry the abortRead() write error.
    883     auto promise = req.send().then([](auto) {
    884       KJ_FAIL_EXPECT("expected exception");
    885     }, [](kj::Exception&& e) {
    886       KJ_ASSERT(e.getDescription() == "abortRead() has been called");
    887     });
    888 
            // Assert readiness via poll() before wait(), so that a call which never completes shows up
            // as an assertion failure instead of blocking in wait().
    889     KJ_ASSERT(promise.poll(waitScope));
    890     promise.wait(waitScope);
    891   }
    892 }
    893 
    894 }  // namespace
    895 }  // namespace _
    896 }  // namespace capnp