capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

rpc-test.c++ (52040B)


      1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #define CAPNP_TESTING_CAPNP 1
     23 
     24 #include "rpc.h"
     25 #include "test-util.h"
     26 #include "schema.h"
     27 #include "serialize.h"
     28 #include <kj/debug.h>
     29 #include <kj/string-tree.h>
     30 #include <kj/compat/gtest.h>
     31 #include <capnp/rpc.capnp.h>
     32 #include <map>
     33 #include <queue>
     34 
     35 // TODO(cleanup): Auto-generate stringification functions for union discriminants.
     36 namespace capnp {
     37 namespace rpc {
     38 inline kj::String KJ_STRINGIFY(Message::Which which) {
     39   return kj::str(static_cast<uint16_t>(which));
     40 }
     41 }  // namespace rpc
     42 }  // namespace capnp
     43 
     44 namespace capnp {
     45 namespace _ {  // private
     46 namespace {
     47 
     48 class RpcDumper {
     49   // Class which stringifies RPC messages for debugging purposes, including decoding params and
     50   // results based on the call's interface and method IDs and extracting cap descriptors.
     51   //
     52   // TODO(cleanup):  Clean this code up and move it to someplace reusable, so it can be used as
     53   //   a packet inspector / debugging tool for Cap'n Proto network traffic.
     54 
     55 public:
     56   void addSchema(InterfaceSchema schema) {
     57     schemas[schema.getProto().getId()] = schema;
     58   }
     59 
     60   enum Sender {
     61     CLIENT,
     62     SERVER
     63   };
     64 
     65   kj::String dump(rpc::Message::Reader message, Sender sender) {
     66     const char* senderName = sender == CLIENT ? "client" : "server";
     67 
     68     switch (message.which()) {
     69       case rpc::Message::CALL: {
     70         auto call = message.getCall();
     71         auto iter = schemas.find(call.getInterfaceId());
     72         if (iter == schemas.end()) {
     73           break;
     74         }
     75         InterfaceSchema schema = iter->second;
     76         auto methods = schema.getMethods();
     77         if (call.getMethodId() >= methods.size()) {
     78           break;
     79         }
     80         InterfaceSchema::Method method = methods[call.getMethodId()];
     81 
     82         auto schemaProto = schema.getProto();
     83         auto interfaceName =
     84             schemaProto.getDisplayName().slice(schemaProto.getDisplayNamePrefixLength());
     85 
     86         auto methodProto = method.getProto();
     87         auto paramType = method.getParamType();
     88         auto resultType = method.getResultType();
     89 
     90         if (call.getSendResultsTo().isCaller()) {
     91           returnTypes[std::make_pair(sender, call.getQuestionId())] = resultType;
     92         }
     93 
     94         auto payload = call.getParams();
     95         auto params = kj::str(payload.getContent().getAs<DynamicStruct>(paramType));
     96 
     97         auto sendResultsTo = call.getSendResultsTo();
     98 
     99         return kj::str(senderName, "(", call.getQuestionId(), "): call ",
    100                        call.getTarget(), " <- ", interfaceName, ".",
    101                        methodProto.getName(), params,
    102                        " caps:[", kj::strArray(payload.getCapTable(), ", "), "]",
    103                        sendResultsTo.isCaller() ? kj::str()
    104                                                 : kj::str(" sendResultsTo:", sendResultsTo));
    105       }
    106 
    107       case rpc::Message::RETURN: {
    108         auto ret = message.getReturn();
    109 
    110         auto iter = returnTypes.find(
    111             std::make_pair(sender == CLIENT ? SERVER : CLIENT, ret.getAnswerId()));
    112         if (iter == returnTypes.end()) {
    113           break;
    114         }
    115 
    116         auto schema = iter->second;
    117         returnTypes.erase(iter);
    118         if (ret.which() != rpc::Return::RESULTS) {
    119           // Oops, no results returned.  We don't check this earlier because we want to make sure
    120           // returnTypes.erase() gets a chance to happen.
    121           break;
    122         }
    123 
    124         auto payload = ret.getResults();
    125 
    126         if (schema.getProto().isStruct()) {
    127           auto results = kj::str(payload.getContent().getAs<DynamicStruct>(schema.asStruct()));
    128 
    129           return kj::str(senderName, "(", ret.getAnswerId(), "): return ", results,
    130                          " caps:[", kj::strArray(payload.getCapTable(), ", "), "]");
    131         } else if (schema.getProto().isInterface()) {
    132           payload.getContent().getAs<DynamicCapability>(schema.asInterface());
    133           return kj::str(senderName, "(", ret.getAnswerId(), "): return cap ",
    134                          kj::strArray(payload.getCapTable(), ", "));
    135         } else {
    136           break;
    137         }
    138       }
    139 
    140       case rpc::Message::BOOTSTRAP: {
    141         auto restore = message.getBootstrap();
    142 
    143         returnTypes[std::make_pair(sender, restore.getQuestionId())] = InterfaceSchema();
    144 
    145         return kj::str(senderName, "(", restore.getQuestionId(), "): bootstrap ",
    146                        restore.getDeprecatedObjectId().getAs<test::TestSturdyRefObjectId>());
    147       }
    148 
    149       default:
    150         break;
    151     }
    152 
    153     return kj::str(senderName, ": ", message);
    154   }
    155 
    156 private:
    157   std::map<uint64_t, InterfaceSchema> schemas;
    158   std::map<std::pair<Sender, uint32_t>, Schema> returnTypes;
    159 };
    160 
    161 // =======================================================================================
    162 
    163 class TestNetworkAdapter;
    164 
    165 class TestNetwork {
    166 public:
    167   TestNetwork() {
    168     dumper.addSchema(Schema::from<test::TestInterface>());
    169     dumper.addSchema(Schema::from<test::TestExtends>());
    170     dumper.addSchema(Schema::from<test::TestPipeline>());
    171     dumper.addSchema(Schema::from<test::TestCallOrder>());
    172     dumper.addSchema(Schema::from<test::TestTailCallee>());
    173     dumper.addSchema(Schema::from<test::TestTailCaller>());
    174     dumper.addSchema(Schema::from<test::TestMoreStuff>());
    175   }
    176   ~TestNetwork() noexcept(false);
    177 
    178   TestNetworkAdapter& add(kj::StringPtr name);
    179 
    180   kj::Maybe<TestNetworkAdapter&> find(kj::StringPtr name) {
    181     auto iter = map.find(name);
    182     if (iter == map.end()) {
    183       return nullptr;
    184     } else {
    185       return *iter->second;
    186     }
    187   }
    188 
    189   RpcDumper dumper;
    190 
    191 private:
    192   std::map<kj::StringPtr, kj::Own<TestNetworkAdapter>> map;
    193 };
    194 
    195 typedef VatNetwork<
    196     test::TestSturdyRefHostId, test::TestProvisionId, test::TestRecipientId,
    197     test::TestThirdPartyCapId, test::TestJoinResult> TestNetworkAdapterBase;
    198 
    199 class TestNetworkAdapter final: public TestNetworkAdapterBase {
    200 public:
    201   TestNetworkAdapter(TestNetwork& network, kj::StringPtr self): network(network), self(self) {}
    202 
    203   ~TestNetworkAdapter() {
    204     kj::Exception exception = KJ_EXCEPTION(FAILED, "Network was destroyed.");
    205     for (auto& entry: connections) {
    206       entry.second->disconnect(kj::cp(exception));
    207     }
    208   }
    209 
    210   uint getSentCount() { return sent; }
    211   uint getReceivedCount() { return received; }
    212 
    213   void onSend(kj::Function<bool(MessageBuilder& message)> callback) {
    214     // Invokes the given callback every time a message is sent. Callback can return false to cause
    215     // send() to do nothing.
    216     sendCallback = kj::mv(callback);
    217   }
    218 
    219   typedef TestNetworkAdapterBase::Connection Connection;
    220 
    221   class ConnectionImpl final
    222       : public Connection, public kj::Refcounted, public kj::TaskSet::ErrorHandler {
    223   public:
    224     ConnectionImpl(TestNetworkAdapter& network, RpcDumper::Sender sender)
    225         : network(network), sender(sender), tasks(kj::heap<kj::TaskSet>(*this)) {}
    226 
    227     void attach(ConnectionImpl& other) {
    228       KJ_REQUIRE(partner == nullptr);
    229       KJ_REQUIRE(other.partner == nullptr);
    230       partner = other;
    231       other.partner = *this;
    232     }
    233 
    234     void disconnect(kj::Exception&& exception) {
    235       while (!fulfillers.empty()) {
    236         fulfillers.front()->reject(kj::cp(exception));
    237         fulfillers.pop();
    238       }
    239 
    240       networkException = kj::mv(exception);
    241 
    242       tasks = nullptr;
    243     }
    244 
    245     class IncomingRpcMessageImpl final: public IncomingRpcMessage, public kj::Refcounted {
    246     public:
    247       IncomingRpcMessageImpl(kj::Array<word> data)
    248           : data(kj::mv(data)),
    249             message(this->data) {}
    250 
    251       AnyPointer::Reader getBody() override {
    252         return message.getRoot<AnyPointer>();
    253       }
    254 
    255       size_t sizeInWords() override {
    256         return data.size();
    257       }
    258 
    259       kj::Array<word> data;
    260       FlatArrayMessageReader message;
    261     };
    262 
    263     class OutgoingRpcMessageImpl final: public OutgoingRpcMessage {
    264     public:
    265       OutgoingRpcMessageImpl(ConnectionImpl& connection, uint firstSegmentWordSize)
    266           : connection(connection),
    267             message(firstSegmentWordSize == 0 ? SUGGESTED_FIRST_SEGMENT_WORDS
    268                                               : firstSegmentWordSize) {}
    269 
    270       AnyPointer::Builder getBody() override {
    271         return message.getRoot<AnyPointer>();
    272       }
    273 
    274       void send() override {
    275         if (!connection.network.sendCallback(message)) return;
    276 
    277         if (connection.networkException != nullptr) {
    278           return;
    279         }
    280 
    281         ++connection.network.sent;
    282 
    283         // Uncomment to get a debug dump.
    284 //        kj::String msg = connection.network.network.dumper.dump(
    285 //            message.getRoot<rpc::Message>(), connection.sender);
    286 //        KJ_ DBG(msg);
    287 
    288         auto incomingMessage = kj::heap<IncomingRpcMessageImpl>(messageToFlatArray(message));
    289 
    290         auto connectionPtr = &connection;
    291         connection.tasks->add(kj::evalLater(kj::mvCapture(incomingMessage,
    292             [connectionPtr](kj::Own<IncomingRpcMessageImpl>&& message) {
    293           KJ_IF_MAYBE(p, connectionPtr->partner) {
    294             if (p->fulfillers.empty()) {
    295               p->messages.push(kj::mv(message));
    296             } else {
    297               ++p->network.received;
    298               p->fulfillers.front()->fulfill(
    299                   kj::Own<IncomingRpcMessage>(kj::mv(message)));
    300               p->fulfillers.pop();
    301             }
    302           }
    303         })));
    304       }
    305 
    306       size_t sizeInWords() override {
    307         return message.sizeInWords();
    308       }
    309 
    310     private:
    311       ConnectionImpl& connection;
    312       MallocMessageBuilder message;
    313     };
    314 
    315     test::TestSturdyRefHostId::Reader getPeerVatId() override {
    316       // Not actually implemented for the purpose of this test.
    317       return test::TestSturdyRefHostId::Reader();
    318     }
    319 
    320     kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) override {
    321       return kj::heap<OutgoingRpcMessageImpl>(*this, firstSegmentWordSize);
    322     }
    323     kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() override {
    324       KJ_IF_MAYBE(e, networkException) {
    325         return kj::cp(*e);
    326       }
    327 
    328       if (messages.empty()) {
    329         KJ_IF_MAYBE(f, fulfillOnEnd) {
    330           f->get()->fulfill();
    331           return kj::Maybe<kj::Own<IncomingRpcMessage>>(nullptr);
    332         } else {
    333           auto paf = kj::newPromiseAndFulfiller<kj::Maybe<kj::Own<IncomingRpcMessage>>>();
    334           fulfillers.push(kj::mv(paf.fulfiller));
    335           return kj::mv(paf.promise);
    336         }
    337       } else {
    338         ++network.received;
    339         auto result = kj::mv(messages.front());
    340         messages.pop();
    341         return kj::Maybe<kj::Own<IncomingRpcMessage>>(kj::mv(result));
    342       }
    343     }
    344     kj::Promise<void> shutdown() override {
    345       KJ_IF_MAYBE(p, partner) {
    346         auto paf = kj::newPromiseAndFulfiller<void>();
    347         p->fulfillOnEnd = kj::mv(paf.fulfiller);
    348         return kj::mv(paf.promise);
    349       } else {
    350         return kj::READY_NOW;
    351       }
    352     }
    353 
    354     void taskFailed(kj::Exception&& exception) override {
    355       ADD_FAILURE() << kj::str(exception).cStr();
    356     }
    357 
    358   private:
    359     TestNetworkAdapter& network;
    360     RpcDumper::Sender sender KJ_UNUSED_MEMBER;
    361     kj::Maybe<ConnectionImpl&> partner;
    362 
    363     kj::Maybe<kj::Exception> networkException;
    364 
    365     std::queue<kj::Own<kj::PromiseFulfiller<kj::Maybe<kj::Own<IncomingRpcMessage>>>>> fulfillers;
    366     std::queue<kj::Own<IncomingRpcMessage>> messages;
    367     kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfillOnEnd;
    368 
    369     kj::Own<kj::TaskSet> tasks;
    370   };
    371 
    372   kj::Maybe<kj::Own<Connection>> connect(test::TestSturdyRefHostId::Reader hostId) override {
    373     if (hostId.getHost() == self) {
    374       return nullptr;
    375     }
    376 
    377     TestNetworkAdapter& dst = KJ_REQUIRE_NONNULL(network.find(hostId.getHost()));
    378 
    379     auto iter = connections.find(&dst);
    380     if (iter == connections.end()) {
    381       auto local = kj::refcounted<ConnectionImpl>(*this, RpcDumper::CLIENT);
    382       auto remote = kj::refcounted<ConnectionImpl>(dst, RpcDumper::SERVER);
    383       local->attach(*remote);
    384 
    385       connections[&dst] = kj::addRef(*local);
    386       dst.connections[this] = kj::addRef(*remote);
    387 
    388       if (dst.fulfillerQueue.empty()) {
    389         dst.connectionQueue.push(kj::mv(remote));
    390       } else {
    391         dst.fulfillerQueue.front()->fulfill(kj::mv(remote));
    392         dst.fulfillerQueue.pop();
    393       }
    394 
    395       return kj::Own<Connection>(kj::mv(local));
    396     } else {
    397       return kj::Own<Connection>(kj::addRef(*iter->second));
    398     }
    399   }
    400 
    401   kj::Promise<kj::Own<Connection>> accept() override {
    402     if (connectionQueue.empty()) {
    403       auto paf = kj::newPromiseAndFulfiller<kj::Own<Connection>>();
    404       fulfillerQueue.push(kj::mv(paf.fulfiller));
    405       return kj::mv(paf.promise);
    406     } else {
    407       auto result = kj::mv(connectionQueue.front());
    408       connectionQueue.pop();
    409       return kj::mv(result);
    410     }
    411   }
    412 
    413 private:
    414   TestNetwork& network;
    415   kj::StringPtr self;
    416   uint sent = 0;
    417   uint received = 0;
    418 
    419   std::map<const TestNetworkAdapter*, kj::Own<ConnectionImpl>> connections;
    420   std::queue<kj::Own<kj::PromiseFulfiller<kj::Own<Connection>>>> fulfillerQueue;
    421   std::queue<kj::Own<Connection>> connectionQueue;
    422 
    423   kj::Function<bool(MessageBuilder& message)> sendCallback = [](MessageBuilder&) { return true; };
    424 };
    425 
    426 TestNetwork::~TestNetwork() noexcept(false) {}
    427 
    428 TestNetworkAdapter& TestNetwork::add(kj::StringPtr name) {
    429   return *(map[name] = kj::heap<TestNetworkAdapter>(*this, name));
    430 }
    431 
    432 // =======================================================================================
    433 
    434 class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRefObjectId> {
    435 public:
    436   int callCount = 0;
    437   int handleCount = 0;
    438 
    439   Capability::Client restore(test::TestSturdyRefObjectId::Reader objectId) override {
    440     switch (objectId.getTag()) {
    441       case test::TestSturdyRefObjectId::Tag::TEST_INTERFACE:
    442         return kj::heap<TestInterfaceImpl>(callCount);
    443       case test::TestSturdyRefObjectId::Tag::TEST_EXTENDS:
    444         return Capability::Client(newBrokenCap("No TestExtends implemented."));
    445       case test::TestSturdyRefObjectId::Tag::TEST_PIPELINE:
    446         return kj::heap<TestPipelineImpl>(callCount);
    447       case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLEE:
    448         return kj::heap<TestTailCalleeImpl>(callCount);
    449       case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER:
    450         return kj::heap<TestTailCallerImpl>(callCount);
    451       case test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF:
    452         return kj::heap<TestMoreStuffImpl>(callCount, handleCount);
    453     }
    454     KJ_UNREACHABLE;
    455   }
    456 };
    457 
    458 struct TestContext {
    459   kj::EventLoop loop;
    460   kj::WaitScope waitScope;
    461   TestNetwork network;
    462   TestRestorer restorer;
    463   TestNetworkAdapter& clientNetwork;
    464   TestNetworkAdapter& serverNetwork;
    465   RpcSystem<test::TestSturdyRefHostId> rpcClient;
    466   RpcSystem<test::TestSturdyRefHostId> rpcServer;
    467 
    468   TestContext()
    469       : waitScope(loop),
    470         clientNetwork(network.add("client")),
    471         serverNetwork(network.add("server")),
    472         rpcClient(makeRpcClient(clientNetwork)),
    473         rpcServer(makeRpcServer(serverNetwork, restorer)) {}
    474   TestContext(Capability::Client bootstrap)
    475       : waitScope(loop),
    476         clientNetwork(network.add("client")),
    477         serverNetwork(network.add("server")),
    478         rpcClient(makeRpcClient(clientNetwork)),
    479         rpcServer(makeRpcServer(serverNetwork, bootstrap)) {}
    480 
    481   Capability::Client connect(test::TestSturdyRefObjectId::Tag tag) {
    482     MallocMessageBuilder refMessage(128);
    483     auto ref = refMessage.initRoot<test::TestSturdyRef>();
    484     auto hostId = ref.initHostId();
    485     hostId.setHost("server");
    486     ref.getObjectId().initAs<test::TestSturdyRefObjectId>().setTag(tag);
    487 
    488     return rpcClient.restore(hostId, ref.getObjectId());
    489   }
    490 };
    491 
    492 TEST(Rpc, Basic) {
    493   TestContext context;
    494 
    495   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_INTERFACE)
    496       .castAs<test::TestInterface>();
    497 
    498   auto request1 = client.fooRequest();
    499   request1.setI(123);
    500   request1.setJ(true);
    501   auto promise1 = request1.send();
    502 
    503   // We used to call bar() after baz(), hence the numbering, but this masked the case where the
    504   // RPC system actually disconnected on bar() (thus returning an exception, which we decided
    505   // was expected).
    506   bool barFailed = false;
    507   auto request3 = client.barRequest();
    508   auto promise3 = request3.send().then(
    509       [](Response<test::TestInterface::BarResults>&& response) {
    510         ADD_FAILURE() << "Expected bar() call to fail.";
    511       }, [&](kj::Exception&& e) {
    512         barFailed = true;
    513       });
    514 
    515   auto request2 = client.bazRequest();
    516   initTestMessage(request2.initS());
    517   auto promise2 = request2.send();
    518 
    519   EXPECT_EQ(0, context.restorer.callCount);
    520 
    521   auto response1 = promise1.wait(context.waitScope);
    522 
    523   EXPECT_EQ("foo", response1.getX());
    524 
    525   auto response2 = promise2.wait(context.waitScope);
    526 
    527   promise3.wait(context.waitScope);
    528 
    529   EXPECT_EQ(2, context.restorer.callCount);
    530   EXPECT_TRUE(barFailed);
    531 }
    532 
    533 TEST(Rpc, Pipelining) {
    534   TestContext context;
    535 
    536   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_PIPELINE)
    537       .castAs<test::TestPipeline>();
    538 
    539   int chainedCallCount = 0;
    540 
    541   auto request = client.getCapRequest();
    542   request.setN(234);
    543   request.setInCap(kj::heap<TestInterfaceImpl>(chainedCallCount));
    544 
    545   auto promise = request.send();
    546 
    547   auto pipelineRequest = promise.getOutBox().getCap().fooRequest();
    548   pipelineRequest.setI(321);
    549   auto pipelinePromise = pipelineRequest.send();
    550 
    551   auto pipelineRequest2 = promise.getOutBox().getCap().castAs<test::TestExtends>().graultRequest();
    552   auto pipelinePromise2 = pipelineRequest2.send();
    553 
    554   promise = nullptr;  // Just to be annoying, drop the original promise.
    555 
    556   EXPECT_EQ(0, context.restorer.callCount);
    557   EXPECT_EQ(0, chainedCallCount);
    558 
    559   auto response = pipelinePromise.wait(context.waitScope);
    560   EXPECT_EQ("bar", response.getX());
    561 
    562   auto response2 = pipelinePromise2.wait(context.waitScope);
    563   checkTestMessage(response2);
    564 
    565   EXPECT_EQ(3, context.restorer.callCount);
    566   EXPECT_EQ(1, chainedCallCount);
    567 }
    568 
    569 KJ_TEST("RPC context.setPipeline") {
    570   TestContext context;
    571 
    572   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_PIPELINE)
    573       .castAs<test::TestPipeline>();
    574 
    575   auto promise = client.getCapPipelineOnlyRequest().send();
    576 
    577   auto pipelineRequest = promise.getOutBox().getCap().fooRequest();
    578   pipelineRequest.setI(321);
    579   auto pipelinePromise = pipelineRequest.send();
    580 
    581   auto pipelineRequest2 = promise.getOutBox().getCap().castAs<test::TestExtends>().graultRequest();
    582   auto pipelinePromise2 = pipelineRequest2.send();
    583 
    584   EXPECT_EQ(0, context.restorer.callCount);
    585 
    586   auto response = pipelinePromise.wait(context.waitScope);
    587   EXPECT_EQ("bar", response.getX());
    588 
    589   auto response2 = pipelinePromise2.wait(context.waitScope);
    590   checkTestMessage(response2);
    591 
    592   EXPECT_EQ(3, context.restorer.callCount);
    593 
    594   // The original promise never completed.
    595   KJ_EXPECT(!promise.poll(context.waitScope));
    596 }
    597 
    598 TEST(Rpc, Release) {
    599   TestContext context;
    600 
    601   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
    602       .castAs<test::TestMoreStuff>();
    603 
    604   auto handle1 = client.getHandleRequest().send().wait(context.waitScope).getHandle();
    605   auto promise = client.getHandleRequest().send();
    606   auto handle2 = promise.wait(context.waitScope).getHandle();
    607 
    608   EXPECT_EQ(2, context.restorer.handleCount);
    609 
    610   handle1 = nullptr;
    611 
    612   for (uint i = 0; i < 16; i++) kj::evalLater([]() {}).wait(context.waitScope);
    613   EXPECT_EQ(1, context.restorer.handleCount);
    614 
    615   handle2 = nullptr;
    616 
    617   for (uint i = 0; i < 16; i++) kj::evalLater([]() {}).wait(context.waitScope);
    618   EXPECT_EQ(1, context.restorer.handleCount);
    619 
    620   promise = nullptr;
    621 
    622   for (uint i = 0; i < 16; i++) kj::evalLater([]() {}).wait(context.waitScope);
    623   EXPECT_EQ(0, context.restorer.handleCount);
    624 }
    625 
    626 TEST(Rpc, ReleaseOnCancel) {
    627   // At one time, there was a bug where if a Return contained capabilities, but the client had
    628   // canceled the request and already send a Finish (which presumably didn't reach the server before
    629   // the Return), then we'd leak those caps. Test for that.
    630 
    631   TestContext context;
    632 
    633   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
    634       .castAs<test::TestMoreStuff>();
    635   client.whenResolved().wait(context.waitScope);
    636 
    637   {
    638     auto promise = client.getHandleRequest().send();
    639 
    640     // If the server receives cancellation too early, it won't even return a capability in the
    641     // results, it will just return "canceled". We want to emulate the case where the return message
    642     // and the cancel (finish) message cross paths. It turns out that exactly two evalLater()s get
    643     // us there.
    644     //
    645     // TODO(cleanup): This is fragile, but I'm not sure how else to write it without a ton
    646     //   of scaffolding.
    647     kj::evalLater([]() {}).wait(context.waitScope);
    648     kj::evalLater([]() {}).wait(context.waitScope);
    649   }
    650 
    651   for (uint i = 0; i < 16; i++) kj::evalLater([]() {}).wait(context.waitScope);
    652   EXPECT_EQ(0, context.restorer.handleCount);
    653 }
    654 
    655 TEST(Rpc, TailCall) {
    656   TestContext context;
    657 
    658   auto caller = context.connect(test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER)
    659       .castAs<test::TestTailCaller>();
    660 
    661   int calleeCallCount = 0;
    662 
    663   test::TestTailCallee::Client callee(kj::heap<TestTailCalleeImpl>(calleeCallCount));
    664 
    665   auto request = caller.fooRequest();
    666   request.setI(456);
    667   request.setCallee(callee);
    668 
    669   auto promise = request.send();
    670 
    671   auto dependentCall0 = promise.getC().getCallSequenceRequest().send();
    672 
    673   auto response = promise.wait(context.waitScope);
    674   EXPECT_EQ(456, response.getI());
    675   EXPECT_EQ("from TestTailCaller", response.getT());
    676 
    677   auto dependentCall1 = promise.getC().getCallSequenceRequest().send();
    678 
    679   EXPECT_EQ(0, dependentCall0.wait(context.waitScope).getN());
    680   EXPECT_EQ(1, dependentCall1.wait(context.waitScope).getN());
    681 
    682   // TODO(someday): We used to initiate dependentCall2 here before waiting on the first two calls,
    683   //   and the ordering was still "correct". But this was apparently by accident. Calling getC() on
    684   //   the final response returns a different capability from calling getC() on the promise. There
    685   //   are no guarantees on the ordering of calls on the response capability vs. the earlier
    686   //   promise. When ordering matters, applications should take the original promise capability and
    687   //   keep using that. In theory the RPC system could create continuity here, but it would be
    688   //   annoying: for each capability that had been fetched on the promise, it would need to
    689   //   traverse to the same capability in the final response and swap it out in-place for the
    690   //   pipelined cap returned earlier. Maybe we'll determine later that that's really needed but
    691   //   for now I'm not gonna do it.
    692   auto dependentCall2 = response.getC().getCallSequenceRequest().send();
    693 
    694   EXPECT_EQ(2, dependentCall2.wait(context.waitScope).getN());
    695 
    696   EXPECT_EQ(1, calleeCallCount);
    697   EXPECT_EQ(1, context.restorer.callCount);
    698 }
    699 
    700 class TestHangingTailCallee final: public test::TestTailCallee::Server {
    701 public:
    702   TestHangingTailCallee(int& callCount, int& cancelCount)
    703       : callCount(callCount), cancelCount(cancelCount) {}
    704 
    705   kj::Promise<void> foo(FooContext context) override {
    706     context.allowCancellation();
    707     ++callCount;
    708     return kj::Promise<void>(kj::NEVER_DONE)
    709         .attach(kj::defer([&cancelCount = cancelCount]() { ++cancelCount; }));
    710   }
    711 
    712 private:
    713   int& callCount;
    714   int& cancelCount;
    715 };
    716 
    717 class TestRacingTailCaller final: public test::TestTailCaller::Server {
    718 public:
    719   TestRacingTailCaller(kj::Promise<void> unblock): unblock(kj::mv(unblock)) {}
    720 
    721   kj::Promise<void> foo(FooContext context) override {
    722     return unblock.then([context]() mutable {
    723       auto tailRequest = context.getParams().getCallee().fooRequest();
    724       return context.tailCall(kj::mv(tailRequest));
    725     });
    726   }
    727 
    728 private:
    729   kj::Promise<void> unblock;
    730 };
    731 
    732 TEST(Rpc, TailCallCancel) {
    733   TestContext context;
    734 
    735   auto caller = context.connect(test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER)
    736       .castAs<test::TestTailCaller>();
    737 
    738   int callCount = 0, cancelCount = 0;
    739 
    740   test::TestTailCallee::Client callee(kj::heap<TestHangingTailCallee>(callCount, cancelCount));
    741 
    742   {
    743     auto request = caller.fooRequest();
    744     request.setCallee(callee);
    745 
    746     auto promise = request.send();
    747 
    748     KJ_ASSERT(callCount == 0);
    749     KJ_ASSERT(cancelCount == 0);
    750 
    751     KJ_ASSERT(!promise.poll(context.waitScope));
    752 
    753     KJ_ASSERT(callCount == 1);
    754     KJ_ASSERT(cancelCount == 0);
    755   }
    756 
    757   kj::Promise<void>(kj::NEVER_DONE).poll(context.waitScope);
    758 
    759   KJ_ASSERT(callCount == 1);
    760   KJ_ASSERT(cancelCount == 1);
    761 }
    762 
    763 TEST(Rpc, TailCallCancelRace) {
    764   auto paf = kj::newPromiseAndFulfiller<void>();
    765   TestContext context(kj::heap<TestRacingTailCaller>(kj::mv(paf.promise)));
    766 
    767   MallocMessageBuilder serverHostIdBuilder;
    768   auto serverHostId = serverHostIdBuilder.getRoot<test::TestSturdyRefHostId>();
    769   serverHostId.setHost("server");
    770 
    771   auto caller = context.rpcClient.bootstrap(serverHostId).castAs<test::TestTailCaller>();
    772 
    773   int callCount = 0, cancelCount = 0;
    774 
    775   test::TestTailCallee::Client callee(kj::heap<TestHangingTailCallee>(callCount, cancelCount));
    776 
    777   {
    778     auto request = caller.fooRequest();
    779     request.setCallee(callee);
    780 
    781     auto promise = request.send();
    782 
    783     KJ_ASSERT(callCount == 0);
    784     KJ_ASSERT(cancelCount == 0);
    785 
    786     KJ_ASSERT(!promise.poll(context.waitScope));
    787 
    788     KJ_ASSERT(callCount == 0);
    789     KJ_ASSERT(cancelCount == 0);
    790 
    791     // Unblock the server and at the same time cancel the client.
    792     paf.fulfiller->fulfill();
    793   }
    794 
    795   kj::Promise<void>(kj::NEVER_DONE).poll(context.waitScope);
    796 
    797   KJ_ASSERT(callCount == 1);
    798   KJ_ASSERT(cancelCount == 1);
    799 }
    800 
    801 TEST(Rpc, Cancelation) {
    802   // Tests allowCancellation().
    803 
    804   TestContext context;
    805 
    806   auto paf = kj::newPromiseAndFulfiller<void>();
    807   bool destroyed = false;
    808   auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr);
    809 
    810   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
    811       .castAs<test::TestMoreStuff>();
    812 
    813   kj::Promise<void> promise = nullptr;
    814 
    815   bool returned = false;
    816   {
    817     auto request = client.expectCancelRequest();
    818     request.setCap(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller)));
    819     promise = request.send().then(
    820         [&](Response<test::TestMoreStuff::ExpectCancelResults>&& response) {
    821       returned = true;
    822     }).eagerlyEvaluate(nullptr);
    823   }
    824   kj::evalLater([]() {}).wait(context.waitScope);
    825   kj::evalLater([]() {}).wait(context.waitScope);
    826   kj::evalLater([]() {}).wait(context.waitScope);
    827   kj::evalLater([]() {}).wait(context.waitScope);
    828   kj::evalLater([]() {}).wait(context.waitScope);
    829   kj::evalLater([]() {}).wait(context.waitScope);
    830 
    831   // We can detect that the method was canceled because it will drop the cap.
    832   EXPECT_FALSE(destroyed);
    833   EXPECT_FALSE(returned);
    834 
    835   promise = nullptr;  // request cancellation
    836   destructionPromise.wait(context.waitScope);
    837 
    838   EXPECT_TRUE(destroyed);
    839   EXPECT_FALSE(returned);
    840 }
    841 
    842 TEST(Rpc, PromiseResolve) {
    843   TestContext context;
    844 
    845   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
    846       .castAs<test::TestMoreStuff>();
    847 
    848   int chainedCallCount = 0;
    849 
    850   auto request = client.callFooRequest();
    851   auto request2 = client.callFooWhenResolvedRequest();
    852 
    853   auto paf = kj::newPromiseAndFulfiller<test::TestInterface::Client>();
    854 
    855   {
    856     auto fork = paf.promise.fork();
    857     request.setCap(fork.addBranch());
    858     request2.setCap(fork.addBranch());
    859   }
    860 
    861   auto promise = request.send();
    862   auto promise2 = request2.send();
    863 
    864   // Make sure getCap() has been called on the server side by sending another call and waiting
    865   // for it.
    866   EXPECT_EQ(2, client.getCallSequenceRequest().send().wait(context.waitScope).getN());
    867   EXPECT_EQ(3, context.restorer.callCount);
    868 
    869   // OK, now fulfill the local promise.
    870   paf.fulfiller->fulfill(kj::heap<TestInterfaceImpl>(chainedCallCount));
    871 
    872   // We should now be able to wait for getCap() to finish.
    873   EXPECT_EQ("bar", promise.wait(context.waitScope).getS());
    874   EXPECT_EQ("bar", promise2.wait(context.waitScope).getS());
    875 
    876   EXPECT_EQ(3, context.restorer.callCount);
    877   EXPECT_EQ(2, chainedCallCount);
    878 }
    879 
    880 TEST(Rpc, RetainAndRelease) {
    881   TestContext context;
    882 
    883   auto paf = kj::newPromiseAndFulfiller<void>();
    884   bool destroyed = false;
    885   auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr);
    886 
    887   {
    888     auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
    889         .castAs<test::TestMoreStuff>();
    890 
    891     {
    892       auto request = client.holdRequest();
    893       request.setCap(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller)));
    894       request.send().wait(context.waitScope);
    895     }
    896 
    897     // Do some other call to add a round trip.
    898     EXPECT_EQ(1, client.getCallSequenceRequest().send().wait(context.waitScope).getN());
    899 
    900     // Shouldn't be destroyed because it's being held by the server.
    901     EXPECT_FALSE(destroyed);
    902 
    903     // We can ask it to call the held capability.
    904     EXPECT_EQ("bar", client.callHeldRequest().send().wait(context.waitScope).getS());
    905 
    906     {
    907       // We can get the cap back from it.
    908       auto capCopy = client.getHeldRequest().send().wait(context.waitScope).getCap();
    909 
    910       {
    911         // And call it, without any network communications.
    912         uint oldSentCount = context.clientNetwork.getSentCount();
    913         auto request = capCopy.fooRequest();
    914         request.setI(123);
    915         request.setJ(true);
    916         EXPECT_EQ("foo", request.send().wait(context.waitScope).getX());
    917         EXPECT_EQ(oldSentCount, context.clientNetwork.getSentCount());
    918       }
    919 
    920       {
    921         // We can send another copy of the same cap to another method, and it works.
    922         auto request = client.callFooRequest();
    923         request.setCap(capCopy);
    924         EXPECT_EQ("bar", request.send().wait(context.waitScope).getS());
    925       }
    926     }
    927 
    928     // Give some time to settle.
    929     EXPECT_EQ(5, client.getCallSequenceRequest().send().wait(context.waitScope).getN());
    930     EXPECT_EQ(6, client.getCallSequenceRequest().send().wait(context.waitScope).getN());
    931     EXPECT_EQ(7, client.getCallSequenceRequest().send().wait(context.waitScope).getN());
    932 
    933     // Can't be destroyed, we haven't released it.
    934     EXPECT_FALSE(destroyed);
    935   }
    936 
    937   // We released our client, which should cause the server to be released, which in turn will
    938   // release the cap pointing back to us.
    939   destructionPromise.wait(context.waitScope);
    940   EXPECT_TRUE(destroyed);
    941 }
    942 
    943 TEST(Rpc, Cancel) {
    944   TestContext context;
    945 
    946   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
    947       .castAs<test::TestMoreStuff>();
    948 
    949   auto paf = kj::newPromiseAndFulfiller<void>();
    950   bool destroyed = false;
    951   auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr);
    952 
    953   {
    954     auto request = client.neverReturnRequest();
    955     request.setCap(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller)));
    956 
    957     {
    958       auto responsePromise = request.send();
    959 
    960       // Allow some time to settle.
    961       EXPECT_EQ(1, client.getCallSequenceRequest().send().wait(context.waitScope).getN());
    962       EXPECT_EQ(2, client.getCallSequenceRequest().send().wait(context.waitScope).getN());
    963 
    964       // The cap shouldn't have been destroyed yet because the call never returned.
    965       EXPECT_FALSE(destroyed);
    966     }
    967   }
    968 
    969   // Now the cap should be released.
    970   destructionPromise.wait(context.waitScope);
    971   EXPECT_TRUE(destroyed);
    972 }
    973 
    974 TEST(Rpc, SendTwice) {
    975   TestContext context;
    976 
    977   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
    978       .castAs<test::TestMoreStuff>();
    979 
    980   auto paf = kj::newPromiseAndFulfiller<void>();
    981   bool destroyed = false;
    982   auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr);
    983 
    984   auto cap = test::TestInterface::Client(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller)));
    985 
    986   {
    987     auto request = client.callFooRequest();
    988     request.setCap(cap);
    989 
    990     EXPECT_EQ("bar", request.send().wait(context.waitScope).getS());
    991   }
    992 
    993   // Allow some time for the server to release `cap`.
    994   EXPECT_EQ(1, client.getCallSequenceRequest().send().wait(context.waitScope).getN());
    995 
    996   {
    997     // More requests with the same cap.
    998     auto request = client.callFooRequest();
    999     auto request2 = client.callFooRequest();
   1000     request.setCap(cap);
   1001     request2.setCap(kj::mv(cap));
   1002 
   1003     auto promise = request.send();
   1004     auto promise2 = request2.send();
   1005 
   1006     EXPECT_EQ("bar", promise.wait(context.waitScope).getS());
   1007     EXPECT_EQ("bar", promise2.wait(context.waitScope).getS());
   1008   }
   1009 
   1010   // Now the cap should be released.
   1011   destructionPromise.wait(context.waitScope);
   1012   EXPECT_TRUE(destroyed);
   1013 }
   1014 
   1015 RemotePromise<test::TestCallOrder::GetCallSequenceResults> getCallSequence(
   1016     test::TestCallOrder::Client& client, uint expected) {
   1017   auto req = client.getCallSequenceRequest();
   1018   req.setExpected(expected);
   1019   return req.send();
   1020 }
   1021 
   1022 TEST(Rpc, Embargo) {
   1023   TestContext context;
   1024 
   1025   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
   1026       .castAs<test::TestMoreStuff>();
   1027 
   1028   auto cap = test::TestCallOrder::Client(kj::heap<TestCallOrderImpl>());
   1029 
   1030   auto earlyCall = client.getCallSequenceRequest().send();
   1031 
   1032   auto echoRequest = client.echoRequest();
   1033   echoRequest.setCap(cap);
   1034   auto echo = echoRequest.send();
   1035 
   1036   auto pipeline = echo.getCap();
   1037 
   1038   auto call0 = getCallSequence(pipeline, 0);
   1039   auto call1 = getCallSequence(pipeline, 1);
   1040 
   1041   earlyCall.wait(context.waitScope);
   1042 
   1043   auto call2 = getCallSequence(pipeline, 2);
   1044 
   1045   auto resolved = echo.wait(context.waitScope).getCap();
   1046 
   1047   auto call3 = getCallSequence(pipeline, 3);
   1048   auto call4 = getCallSequence(pipeline, 4);
   1049   auto call5 = getCallSequence(pipeline, 5);
   1050 
   1051   EXPECT_EQ(0, call0.wait(context.waitScope).getN());
   1052   EXPECT_EQ(1, call1.wait(context.waitScope).getN());
   1053   EXPECT_EQ(2, call2.wait(context.waitScope).getN());
   1054   EXPECT_EQ(3, call3.wait(context.waitScope).getN());
   1055   EXPECT_EQ(4, call4.wait(context.waitScope).getN());
   1056   EXPECT_EQ(5, call5.wait(context.waitScope).getN());
   1057 }
   1058 
   1059 TEST(Rpc, EmbargoUnwrap) {
   1060   // Test that embargos properly block unwraping a capability using CapabilityServerSet.
   1061 
   1062   TestContext context;
   1063 
   1064   capnp::CapabilityServerSet<test::TestCallOrder> capSet;
   1065 
   1066   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
   1067       .castAs<test::TestMoreStuff>();
   1068 
   1069   auto cap = capSet.add(kj::heap<TestCallOrderImpl>());
   1070 
   1071   auto earlyCall = client.getCallSequenceRequest().send();
   1072 
   1073   auto echoRequest = client.echoRequest();
   1074   echoRequest.setCap(cap);
   1075   auto echo = echoRequest.send();
   1076 
   1077   auto pipeline = echo.getCap();
   1078 
   1079   auto unwrap = capSet.getLocalServer(pipeline)
   1080       .then([](kj::Maybe<test::TestCallOrder::Server&> unwrapped) {
   1081     return kj::downcast<TestCallOrderImpl>(KJ_ASSERT_NONNULL(unwrapped)).getCount();
   1082   }).eagerlyEvaluate(nullptr);
   1083 
   1084   auto call0 = getCallSequence(pipeline, 0);
   1085   auto call1 = getCallSequence(pipeline, 1);
   1086 
   1087   earlyCall.wait(context.waitScope);
   1088 
   1089   auto call2 = getCallSequence(pipeline, 2);
   1090 
   1091   auto resolved = echo.wait(context.waitScope).getCap();
   1092 
   1093   auto call3 = getCallSequence(pipeline, 4);
   1094   auto call4 = getCallSequence(pipeline, 4);
   1095   auto call5 = getCallSequence(pipeline, 5);
   1096 
   1097   EXPECT_EQ(0, call0.wait(context.waitScope).getN());
   1098   EXPECT_EQ(1, call1.wait(context.waitScope).getN());
   1099   EXPECT_EQ(2, call2.wait(context.waitScope).getN());
   1100   EXPECT_EQ(3, call3.wait(context.waitScope).getN());
   1101   EXPECT_EQ(4, call4.wait(context.waitScope).getN());
   1102   EXPECT_EQ(5, call5.wait(context.waitScope).getN());
   1103 
   1104   uint unwrappedAt = unwrap.wait(context.waitScope);
   1105   KJ_EXPECT(unwrappedAt >= 3, unwrappedAt);
   1106 }
   1107 
   1108 template <typename T>
   1109 void expectPromiseThrows(kj::Promise<T>&& promise, kj::WaitScope& waitScope) {
   1110   EXPECT_TRUE(promise.then([](T&&) { return false; }, [](kj::Exception&&) { return true; })
   1111       .wait(waitScope));
   1112 }
   1113 
   1114 template <>
   1115 void expectPromiseThrows(kj::Promise<void>&& promise, kj::WaitScope& waitScope) {
   1116   EXPECT_TRUE(promise.then([]() { return false; }, [](kj::Exception&&) { return true; })
   1117       .wait(waitScope));
   1118 }
   1119 
   1120 TEST(Rpc, EmbargoError) {
   1121   TestContext context;
   1122 
   1123   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
   1124       .castAs<test::TestMoreStuff>();
   1125 
   1126   auto paf = kj::newPromiseAndFulfiller<test::TestCallOrder::Client>();
   1127 
   1128   auto cap = test::TestCallOrder::Client(kj::mv(paf.promise));
   1129 
   1130   auto earlyCall = client.getCallSequenceRequest().send();
   1131 
   1132   auto echoRequest = client.echoRequest();
   1133   echoRequest.setCap(cap);
   1134   auto echo = echoRequest.send();
   1135 
   1136   auto pipeline = echo.getCap();
   1137 
   1138   auto call0 = getCallSequence(pipeline, 0);
   1139   auto call1 = getCallSequence(pipeline, 1);
   1140 
   1141   earlyCall.wait(context.waitScope);
   1142 
   1143   auto call2 = getCallSequence(pipeline, 2);
   1144 
   1145   auto resolved = echo.wait(context.waitScope).getCap();
   1146 
   1147   auto call3 = getCallSequence(pipeline, 3);
   1148   auto call4 = getCallSequence(pipeline, 4);
   1149   auto call5 = getCallSequence(pipeline, 5);
   1150 
   1151   paf.fulfiller->rejectIfThrows([]() { KJ_FAIL_ASSERT("foo") { break; } });
   1152 
   1153   expectPromiseThrows(kj::mv(call0), context.waitScope);
   1154   expectPromiseThrows(kj::mv(call1), context.waitScope);
   1155   expectPromiseThrows(kj::mv(call2), context.waitScope);
   1156   expectPromiseThrows(kj::mv(call3), context.waitScope);
   1157   expectPromiseThrows(kj::mv(call4), context.waitScope);
   1158   expectPromiseThrows(kj::mv(call5), context.waitScope);
   1159 
   1160   // Verify that we're still connected (there were no protocol errors).
   1161   getCallSequence(client, 1).wait(context.waitScope);
   1162 }
   1163 
   1164 TEST(Rpc, EmbargoNull) {
   1165   // Set up a situation where we pipeline on a capability that ends up coming back null. This
   1166   // should NOT cause a Disembargo to be sent, but due to a bug in earlier versions of Cap'n Proto,
   1167   // a Disembargo was indeed sent to the null capability, which caused the server to disconnect
   1168   // due to protocol error.
   1169 
   1170   TestContext context;
   1171 
   1172   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
   1173       .castAs<test::TestMoreStuff>();
   1174 
   1175   auto promise = client.getNullRequest().send();
   1176 
   1177   auto cap = promise.getNullCap();
   1178 
   1179   auto call0 = cap.getCallSequenceRequest().send();
   1180 
   1181   promise.wait(context.waitScope);
   1182 
   1183   auto call1 = cap.getCallSequenceRequest().send();
   1184 
   1185   expectPromiseThrows(kj::mv(call0), context.waitScope);
   1186   expectPromiseThrows(kj::mv(call1), context.waitScope);
   1187 
   1188   // Verify that we're still connected (there were no protocol errors).
   1189   getCallSequence(client, 0).wait(context.waitScope);
   1190 }
   1191 
   1192 TEST(Rpc, CallBrokenPromise) {
   1193   // Tell the server to call back to a promise client, then resolve the promise to an error.
   1194 
   1195   TestContext context;
   1196 
   1197   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
   1198       .castAs<test::TestMoreStuff>();
   1199   auto paf = kj::newPromiseAndFulfiller<test::TestInterface::Client>();
   1200 
   1201   {
   1202     auto req = client.holdRequest();
   1203     req.setCap(kj::mv(paf.promise));
   1204     req.send().wait(context.waitScope);
   1205   }
   1206 
   1207   bool returned = false;
   1208   auto req = client.callHeldRequest().send()
   1209       .then([&](capnp::Response<test::TestMoreStuff::CallHeldResults>&&) {
   1210     returned = true;
   1211   }, [&](kj::Exception&& e) {
   1212     returned = true;
   1213     kj::throwRecoverableException(kj::mv(e));
   1214   }).eagerlyEvaluate(nullptr);
   1215 
   1216   kj::evalLater([]() {}).wait(context.waitScope);
   1217   kj::evalLater([]() {}).wait(context.waitScope);
   1218   kj::evalLater([]() {}).wait(context.waitScope);
   1219   kj::evalLater([]() {}).wait(context.waitScope);
   1220   kj::evalLater([]() {}).wait(context.waitScope);
   1221   kj::evalLater([]() {}).wait(context.waitScope);
   1222   kj::evalLater([]() {}).wait(context.waitScope);
   1223   kj::evalLater([]() {}).wait(context.waitScope);
   1224 
   1225   EXPECT_FALSE(returned);
   1226 
   1227   paf.fulfiller->rejectIfThrows([]() { KJ_FAIL_ASSERT("foo") { break; } });
   1228 
   1229   expectPromiseThrows(kj::mv(req), context.waitScope);
   1230   EXPECT_TRUE(returned);
   1231 
   1232   kj::evalLater([]() {}).wait(context.waitScope);
   1233   kj::evalLater([]() {}).wait(context.waitScope);
   1234   kj::evalLater([]() {}).wait(context.waitScope);
   1235   kj::evalLater([]() {}).wait(context.waitScope);
   1236   kj::evalLater([]() {}).wait(context.waitScope);
   1237   kj::evalLater([]() {}).wait(context.waitScope);
   1238   kj::evalLater([]() {}).wait(context.waitScope);
   1239   kj::evalLater([]() {}).wait(context.waitScope);
   1240 
   1241   // Verify that we're still connected (there were no protocol errors).
   1242   getCallSequence(client, 1).wait(context.waitScope);
   1243 }
   1244 
   1245 TEST(Rpc, Abort) {
   1246   // Verify that aborts are received.
   1247 
   1248   TestContext context;
   1249 
   1250   MallocMessageBuilder refMessage(128);
   1251   auto hostId = refMessage.initRoot<test::TestSturdyRefHostId>();
   1252   hostId.setHost("server");
   1253 
   1254   auto conn = KJ_ASSERT_NONNULL(context.clientNetwork.connect(hostId));
   1255 
   1256   {
   1257     // Send an invalid message (Return to non-existent question).
   1258     auto msg = conn->newOutgoingMessage(128);
   1259     auto body = msg->getBody().initAs<rpc::Message>().initReturn();
   1260     body.setAnswerId(1234);
   1261     body.setCanceled();
   1262     msg->send();
   1263   }
   1264 
   1265   auto reply = KJ_ASSERT_NONNULL(conn->receiveIncomingMessage().wait(context.waitScope));
   1266   EXPECT_EQ(rpc::Message::ABORT, reply->getBody().getAs<rpc::Message>().which());
   1267 
   1268   EXPECT_TRUE(conn->receiveIncomingMessage().wait(context.waitScope) == nullptr);
   1269 }
   1270 
   1271 KJ_TEST("loopback bootstrap()") {
   1272   int callCount = 0;
   1273   test::TestInterface::Client bootstrap = kj::heap<TestInterfaceImpl>(callCount);
   1274 
   1275   MallocMessageBuilder hostIdBuilder;
   1276   auto hostId = hostIdBuilder.getRoot<test::TestSturdyRefHostId>();
   1277   hostId.setHost("server");
   1278 
   1279   TestContext context(bootstrap);
   1280   auto client = context.rpcServer.bootstrap(hostId).castAs<test::TestInterface>();
   1281 
   1282   auto request = client.fooRequest();
   1283   request.setI(123);
   1284   request.setJ(true);
   1285   auto response = request.send().wait(context.waitScope);
   1286 
   1287   KJ_EXPECT(response.getX() == "foo");
   1288   KJ_EXPECT(callCount == 1);
   1289 }
   1290 
   1291 KJ_TEST("method throws exception") {
   1292   TestContext context;
   1293 
   1294   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
   1295       .castAs<test::TestMoreStuff>();
   1296 
   1297   kj::Maybe<kj::Exception> maybeException;
   1298   client.throwExceptionRequest().send().ignoreResult()
   1299       .catch_([&](kj::Exception&& e) {
   1300     maybeException = kj::mv(e);
   1301   }).wait(context.waitScope);
   1302 
   1303   auto exception = KJ_ASSERT_NONNULL(maybeException);
   1304   KJ_EXPECT(exception.getDescription() == "remote exception: test exception");
   1305   KJ_EXPECT(exception.getRemoteTrace() == nullptr);
   1306 }
   1307 
   1308 KJ_TEST("method throws exception won't redundantly add remote exception prefix") {
   1309   TestContext context;
   1310 
   1311   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
   1312       .castAs<test::TestMoreStuff>();
   1313 
   1314   kj::Maybe<kj::Exception> maybeException;
   1315   client.throwRemoteExceptionRequest().send().ignoreResult()
   1316       .catch_([&](kj::Exception&& e) {
   1317     maybeException = kj::mv(e);
   1318   }).wait(context.waitScope);
   1319 
   1320   auto exception = KJ_ASSERT_NONNULL(maybeException);
   1321   KJ_EXPECT(exception.getDescription() == "remote exception: test exception");
   1322   KJ_EXPECT(exception.getRemoteTrace() == nullptr);
   1323 }
   1324 
   1325 KJ_TEST("method throws exception with trace encoder") {
   1326   TestContext context;
   1327 
   1328   context.rpcServer.setTraceEncoder([](const kj::Exception& e) {
   1329     return kj::str("trace for ", e.getDescription());
   1330   });
   1331 
   1332   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
   1333       .castAs<test::TestMoreStuff>();
   1334 
   1335   kj::Maybe<kj::Exception> maybeException;
   1336   client.throwExceptionRequest().send().ignoreResult()
   1337       .catch_([&](kj::Exception&& e) {
   1338     maybeException = kj::mv(e);
   1339   }).wait(context.waitScope);
   1340 
   1341   auto exception = KJ_ASSERT_NONNULL(maybeException);
   1342   KJ_EXPECT(exception.getDescription() == "remote exception: test exception");
   1343   KJ_EXPECT(exception.getRemoteTrace() == "trace for test exception");
   1344 }
   1345 
   1346 KJ_TEST("when OutgoingRpcMessage::send() throws, we don't leak exports") {
   1347   // When OutgoingRpcMessage::send() throws an exception on a Call message, we need to clean up
   1348   // anything that had been added to the export table as part of the call. At one point this
   1349   // cleanup was missing, so exports would leak.
   1350 
   1351   TestContext context;
   1352 
   1353   uint32_t expectedExportNumber = 0;
   1354   uint interceptCount = 0;
   1355   bool shouldThrowFromSend = false;
   1356   context.clientNetwork.onSend([&](MessageBuilder& builder) {
   1357     auto message = builder.getRoot<rpc::Message>().asReader();
   1358     if (message.isCall()) {
   1359       auto call = message.getCall();
   1360       if (call.getInterfaceId() == capnp::typeId<test::TestMoreStuff>() &&
   1361           call.getMethodId() == 0) {
   1362         // callFoo() request, expect a capability in the param caps. Specifically we expect a
   1363         // promise, because that's what we send below.
   1364         auto capTable = call.getParams().getCapTable();
   1365         KJ_ASSERT(capTable.size() == 1);
   1366         auto desc = capTable[0];
   1367         KJ_ASSERT(desc.isSenderPromise());
   1368         KJ_ASSERT(desc.getSenderPromise() == expectedExportNumber);
   1369 
   1370         ++interceptCount;
   1371         if (shouldThrowFromSend) {
   1372           kj::throwRecoverableException(KJ_EXCEPTION(FAILED, "intercepted"));
   1373           return false;  // only matters when -fno-exceptions
   1374         }
   1375       }
   1376     }
   1377     return true;
   1378   });
   1379 
   1380   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
   1381       .castAs<test::TestMoreStuff>();
   1382 
   1383   {
   1384     shouldThrowFromSend = true;
   1385     auto req = client.callFooRequest();
   1386     req.setCap(kj::Promise<test::TestInterface::Client>(kj::NEVER_DONE));
   1387     req.send().then([](auto&&) {
   1388       KJ_FAIL_ASSERT("should have thrown");
   1389     }, [](kj::Exception&& e) {
   1390       KJ_EXPECT(e.getDescription() == "intercepted", e);
   1391     }).wait(context.waitScope);
   1392   }
   1393 
   1394   KJ_EXPECT(interceptCount == 1);
   1395 
   1396   // Sending again should use the same export number, because the export table entry should have
   1397   // been released when send() threw. (At one point, this was a bug...)
   1398   {
   1399     shouldThrowFromSend = true;
   1400     auto req = client.callFooRequest();
   1401     req.setCap(kj::Promise<test::TestInterface::Client>(kj::NEVER_DONE));
   1402     req.send().then([](auto&&) {
   1403       KJ_FAIL_ASSERT("should have thrown");
   1404     }, [](kj::Exception&& e) {
   1405       KJ_EXPECT(e.getDescription() == "intercepted", e);
   1406     }).wait(context.waitScope);
   1407   }
   1408 
   1409   KJ_EXPECT(interceptCount == 2);
   1410 
   1411   // Now lets start a call that doesn't throw. The export number should still be zero because
   1412   // the previous exports were released.
   1413   {
   1414     shouldThrowFromSend = false;
   1415     auto req = client.callFooRequest();
   1416     req.setCap(kj::Promise<test::TestInterface::Client>(kj::NEVER_DONE));
   1417     auto promise = req.send();
   1418     KJ_EXPECT(!promise.poll(context.waitScope));
   1419 
   1420     KJ_EXPECT(interceptCount == 3);
   1421   }
   1422 
   1423   // We canceled the previous call, BUT the exported capability is still present until the other
   1424   // side drops it, which it won't because the call isn't marked cancelable and never completes.
   1425   // Now, let's send another call. This time, we expect a new export number will actually be
   1426   // allocated.
   1427   {
   1428     shouldThrowFromSend = false;
   1429     expectedExportNumber = 1;
   1430     auto req = client.callFooRequest();
   1431     auto paf = kj::newPromiseAndFulfiller<test::TestInterface::Client>();
   1432     req.setCap(kj::mv(paf.promise));
   1433     auto promise = req.send();
   1434     KJ_EXPECT(!promise.poll(context.waitScope));
   1435 
   1436     KJ_EXPECT(interceptCount == 4);
   1437 
   1438     // Now let's actually let the RPC complete so we can verify the RPC system isn't broken or
   1439     // anything.
   1440     int callCount = 0;
   1441     paf.fulfiller->fulfill(kj::heap<TestInterfaceImpl>(callCount));
   1442     auto resp = promise.wait(context.waitScope);
   1443     KJ_EXPECT(resp.getS() == "bar");
   1444     KJ_EXPECT(callCount == 1);
   1445   }
   1446 
   1447   // Now if we do yet another call, it'll reuse export number 1.
   1448   {
   1449     shouldThrowFromSend = false;
   1450     expectedExportNumber = 1;
   1451     auto req = client.callFooRequest();
   1452     req.setCap(kj::Promise<test::TestInterface::Client>(kj::NEVER_DONE));
   1453     auto promise = req.send();
   1454     KJ_EXPECT(!promise.poll(context.waitScope));
   1455 
   1456     KJ_EXPECT(interceptCount == 5);
   1457   }
   1458 }
   1459 
   1460 KJ_TEST("export the same promise twice") {
   1461   TestContext context;
   1462 
   1463   bool exportIsPromise;
   1464   uint32_t expectedExportNumber;
   1465   uint interceptCount = 0;
   1466   context.clientNetwork.onSend([&](MessageBuilder& builder) {
   1467     auto message = builder.getRoot<rpc::Message>().asReader();
   1468     if (message.isCall()) {
   1469       auto call = message.getCall();
   1470       if (call.getInterfaceId() == capnp::typeId<test::TestMoreStuff>() &&
   1471           call.getMethodId() == 0) {
   1472         // callFoo() request, expect a capability in the param caps. Specifically we expect a
   1473         // promise, because that's what we send below.
   1474         auto capTable = call.getParams().getCapTable();
   1475         KJ_ASSERT(capTable.size() == 1);
   1476         auto desc = capTable[0];
   1477         if (exportIsPromise) {
   1478           KJ_ASSERT(desc.isSenderPromise());
   1479           KJ_ASSERT(desc.getSenderPromise() == expectedExportNumber);
   1480         } else {
   1481           KJ_ASSERT(desc.isSenderHosted());
   1482           KJ_ASSERT(desc.getSenderHosted() == expectedExportNumber);
   1483         }
   1484 
   1485         ++interceptCount;
   1486       }
   1487     }
   1488     return true;
   1489   });
   1490 
   1491   auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
   1492       .castAs<test::TestMoreStuff>();
   1493 
   1494   auto sendReq = [&](test::TestInterface::Client cap) {
   1495     auto req = client.callFooRequest();
   1496     req.setCap(kj::mv(cap));
   1497     return req.send();
   1498   };
   1499 
   1500   auto expectNeverDone = [&](auto& promise) {
   1501     if (promise.poll(context.waitScope)) {
   1502       promise.wait(context.waitScope);  // let it throw if it's going to
   1503       KJ_FAIL_ASSERT("promise finished without throwing");
   1504     }
   1505   };
   1506 
   1507   int callCount = 0;
   1508   test::TestInterface::Client normalCap = kj::heap<TestInterfaceImpl>(callCount);
   1509   test::TestInterface::Client promiseCap = kj::Promise<test::TestInterface::Client>(kj::NEVER_DONE);
   1510 
   1511   // Send request with a promise capability in the params.
   1512   exportIsPromise = true;
   1513   expectedExportNumber = 0;
   1514   auto promise1 = sendReq(promiseCap);
   1515   expectNeverDone(promise1);
   1516   KJ_EXPECT(interceptCount == 1);
   1517 
   1518   // Send a second request with the same promise should use the same export table entry.
   1519   auto promise2 = sendReq(promiseCap);
   1520   expectNeverDone(promise2);
   1521   KJ_EXPECT(interceptCount == 2);
   1522 
   1523   // Sending a request with a different promise should use a different export table entry.
   1524   expectedExportNumber = 1;
   1525   auto promise3 = sendReq(kj::Promise<test::TestInterface::Client>(kj::NEVER_DONE));
   1526   expectNeverDone(promise3);
   1527   KJ_EXPECT(interceptCount == 3);
   1528 
   1529   // Now try sending a non-promise cap. We'll send all these requests at once before waiting on
   1530   // any of them since these will acutally complete.k
   1531   exportIsPromise = false;
   1532   expectedExportNumber = 2;
   1533   auto promise4 = sendReq(normalCap);
   1534   auto promise5 = sendReq(normalCap);
   1535   expectedExportNumber = 3;
   1536   auto promise6 = sendReq(kj::heap<TestInterfaceImpl>(callCount));
   1537   KJ_EXPECT(interceptCount == 6);
   1538 
   1539   KJ_EXPECT(promise4.wait(context.waitScope).getS() == "bar");
   1540   KJ_EXPECT(promise5.wait(context.waitScope).getS() == "bar");
   1541   KJ_EXPECT(promise6.wait(context.waitScope).getS() == "bar");
   1542   KJ_EXPECT(callCount == 3);
   1543 }
   1544 
   1545 }  // namespace
   1546 }  // namespace _ (private)
   1547 }  // namespace capnp