rpc-test.c++ (52040B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #define CAPNP_TESTING_CAPNP 1 23 24 #include "rpc.h" 25 #include "test-util.h" 26 #include "schema.h" 27 #include "serialize.h" 28 #include <kj/debug.h> 29 #include <kj/string-tree.h> 30 #include <kj/compat/gtest.h> 31 #include <capnp/rpc.capnp.h> 32 #include <map> 33 #include <queue> 34 35 // TODO(cleanup): Auto-generate stringification functions for union discriminants. 36 namespace capnp { 37 namespace rpc { 38 inline kj::String KJ_STRINGIFY(Message::Which which) { 39 return kj::str(static_cast<uint16_t>(which)); 40 } 41 } // namespace rpc 42 } // namespace capnp 43 44 namespace capnp { 45 namespace _ { // private 46 namespace { 47 48 class RpcDumper { 49 // Class which stringifies RPC messages for debugging purposes, including decoding params and 50 // results based on the call's interface and method IDs and extracting cap descriptors. 51 // 52 // TODO(cleanup): Clean this code up and move it to someplace reusable, so it can be used as 53 // a packet inspector / debugging tool for Cap'n Proto network traffic. 54 55 public: 56 void addSchema(InterfaceSchema schema) { 57 schemas[schema.getProto().getId()] = schema; 58 } 59 60 enum Sender { 61 CLIENT, 62 SERVER 63 }; 64 65 kj::String dump(rpc::Message::Reader message, Sender sender) { 66 const char* senderName = sender == CLIENT ? "client" : "server"; 67 68 switch (message.which()) { 69 case rpc::Message::CALL: { 70 auto call = message.getCall(); 71 auto iter = schemas.find(call.getInterfaceId()); 72 if (iter == schemas.end()) { 73 break; 74 } 75 InterfaceSchema schema = iter->second; 76 auto methods = schema.getMethods(); 77 if (call.getMethodId() >= methods.size()) { 78 break; 79 } 80 InterfaceSchema::Method method = methods[call.getMethodId()]; 81 82 auto schemaProto = schema.getProto(); 83 auto interfaceName = 84 schemaProto.getDisplayName().slice(schemaProto.getDisplayNamePrefixLength()); 85 86 auto methodProto = method.getProto(); 87 auto paramType = method.getParamType(); 88 auto resultType = method.getResultType(); 89 90 if (call.getSendResultsTo().isCaller()) { 91 returnTypes[std::make_pair(sender, call.getQuestionId())] = resultType; 92 } 93 94 auto payload = call.getParams(); 95 auto params = kj::str(payload.getContent().getAs<DynamicStruct>(paramType)); 96 97 auto sendResultsTo = call.getSendResultsTo(); 98 99 return kj::str(senderName, "(", call.getQuestionId(), "): call ", 100 call.getTarget(), " <- ", interfaceName, ".", 101 methodProto.getName(), params, 102 " caps:[", kj::strArray(payload.getCapTable(), ", "), "]", 103 sendResultsTo.isCaller() ? kj::str() 104 : kj::str(" sendResultsTo:", sendResultsTo)); 105 } 106 107 case rpc::Message::RETURN: { 108 auto ret = message.getReturn(); 109 110 auto iter = returnTypes.find( 111 std::make_pair(sender == CLIENT ? SERVER : CLIENT, ret.getAnswerId())); 112 if (iter == returnTypes.end()) { 113 break; 114 } 115 116 auto schema = iter->second; 117 returnTypes.erase(iter); 118 if (ret.which() != rpc::Return::RESULTS) { 119 // Oops, no results returned. We don't check this earlier because we want to make sure 120 // returnTypes.erase() gets a chance to happen. 121 break; 122 } 123 124 auto payload = ret.getResults(); 125 126 if (schema.getProto().isStruct()) { 127 auto results = kj::str(payload.getContent().getAs<DynamicStruct>(schema.asStruct())); 128 129 return kj::str(senderName, "(", ret.getAnswerId(), "): return ", results, 130 " caps:[", kj::strArray(payload.getCapTable(), ", "), "]"); 131 } else if (schema.getProto().isInterface()) { 132 payload.getContent().getAs<DynamicCapability>(schema.asInterface()); 133 return kj::str(senderName, "(", ret.getAnswerId(), "): return cap ", 134 kj::strArray(payload.getCapTable(), ", ")); 135 } else { 136 break; 137 } 138 } 139 140 case rpc::Message::BOOTSTRAP: { 141 auto restore = message.getBootstrap(); 142 143 returnTypes[std::make_pair(sender, restore.getQuestionId())] = InterfaceSchema(); 144 145 return kj::str(senderName, "(", restore.getQuestionId(), "): bootstrap ", 146 restore.getDeprecatedObjectId().getAs<test::TestSturdyRefObjectId>()); 147 } 148 149 default: 150 break; 151 } 152 153 return kj::str(senderName, ": ", message); 154 } 155 156 private: 157 std::map<uint64_t, InterfaceSchema> schemas; 158 std::map<std::pair<Sender, uint32_t>, Schema> returnTypes; 159 }; 160 161 // ======================================================================================= 162 163 class TestNetworkAdapter; 164 165 class TestNetwork { 166 public: 167 TestNetwork() { 168 dumper.addSchema(Schema::from<test::TestInterface>()); 169 dumper.addSchema(Schema::from<test::TestExtends>()); 170 dumper.addSchema(Schema::from<test::TestPipeline>()); 171 dumper.addSchema(Schema::from<test::TestCallOrder>()); 172 dumper.addSchema(Schema::from<test::TestTailCallee>()); 173 dumper.addSchema(Schema::from<test::TestTailCaller>()); 174 dumper.addSchema(Schema::from<test::TestMoreStuff>()); 175 } 176 ~TestNetwork() noexcept(false); 177 178 TestNetworkAdapter& add(kj::StringPtr name); 179 180 kj::Maybe<TestNetworkAdapter&> find(kj::StringPtr name) { 181 auto iter = map.find(name); 182 if (iter == map.end()) { 183 return nullptr; 184 } else { 185 return *iter->second; 186 } 187 } 188 189 RpcDumper dumper; 190 191 private: 192 std::map<kj::StringPtr, kj::Own<TestNetworkAdapter>> map; 193 }; 194 195 typedef VatNetwork< 196 test::TestSturdyRefHostId, test::TestProvisionId, test::TestRecipientId, 197 test::TestThirdPartyCapId, test::TestJoinResult> TestNetworkAdapterBase; 198 199 class TestNetworkAdapter final: public TestNetworkAdapterBase { 200 public: 201 TestNetworkAdapter(TestNetwork& network, kj::StringPtr self): network(network), self(self) {} 202 203 ~TestNetworkAdapter() { 204 kj::Exception exception = KJ_EXCEPTION(FAILED, "Network was destroyed."); 205 for (auto& entry: connections) { 206 entry.second->disconnect(kj::cp(exception)); 207 } 208 } 209 210 uint getSentCount() { return sent; } 211 uint getReceivedCount() { return received; } 212 213 void onSend(kj::Function<bool(MessageBuilder& message)> callback) { 214 // Invokes the given callback every time a message is sent. Callback can return false to cause 215 // send() to do nothing. 216 sendCallback = kj::mv(callback); 217 } 218 219 typedef TestNetworkAdapterBase::Connection Connection; 220 221 class ConnectionImpl final 222 : public Connection, public kj::Refcounted, public kj::TaskSet::ErrorHandler { 223 public: 224 ConnectionImpl(TestNetworkAdapter& network, RpcDumper::Sender sender) 225 : network(network), sender(sender), tasks(kj::heap<kj::TaskSet>(*this)) {} 226 227 void attach(ConnectionImpl& other) { 228 KJ_REQUIRE(partner == nullptr); 229 KJ_REQUIRE(other.partner == nullptr); 230 partner = other; 231 other.partner = *this; 232 } 233 234 void disconnect(kj::Exception&& exception) { 235 while (!fulfillers.empty()) { 236 fulfillers.front()->reject(kj::cp(exception)); 237 fulfillers.pop(); 238 } 239 240 networkException = kj::mv(exception); 241 242 tasks = nullptr; 243 } 244 245 class IncomingRpcMessageImpl final: public IncomingRpcMessage, public kj::Refcounted { 246 public: 247 IncomingRpcMessageImpl(kj::Array<word> data) 248 : data(kj::mv(data)), 249 message(this->data) {} 250 251 AnyPointer::Reader getBody() override { 252 return message.getRoot<AnyPointer>(); 253 } 254 255 size_t sizeInWords() override { 256 return data.size(); 257 } 258 259 kj::Array<word> data; 260 FlatArrayMessageReader message; 261 }; 262 263 class OutgoingRpcMessageImpl final: public OutgoingRpcMessage { 264 public: 265 OutgoingRpcMessageImpl(ConnectionImpl& connection, uint firstSegmentWordSize) 266 : connection(connection), 267 message(firstSegmentWordSize == 0 ? SUGGESTED_FIRST_SEGMENT_WORDS 268 : firstSegmentWordSize) {} 269 270 AnyPointer::Builder getBody() override { 271 return message.getRoot<AnyPointer>(); 272 } 273 274 void send() override { 275 if (!connection.network.sendCallback(message)) return; 276 277 if (connection.networkException != nullptr) { 278 return; 279 } 280 281 ++connection.network.sent; 282 283 // Uncomment to get a debug dump. 284 // kj::String msg = connection.network.network.dumper.dump( 285 // message.getRoot<rpc::Message>(), connection.sender); 286 // KJ_ DBG(msg); 287 288 auto incomingMessage = kj::heap<IncomingRpcMessageImpl>(messageToFlatArray(message)); 289 290 auto connectionPtr = &connection; 291 connection.tasks->add(kj::evalLater(kj::mvCapture(incomingMessage, 292 [connectionPtr](kj::Own<IncomingRpcMessageImpl>&& message) { 293 KJ_IF_MAYBE(p, connectionPtr->partner) { 294 if (p->fulfillers.empty()) { 295 p->messages.push(kj::mv(message)); 296 } else { 297 ++p->network.received; 298 p->fulfillers.front()->fulfill( 299 kj::Own<IncomingRpcMessage>(kj::mv(message))); 300 p->fulfillers.pop(); 301 } 302 } 303 }))); 304 } 305 306 size_t sizeInWords() override { 307 return message.sizeInWords(); 308 } 309 310 private: 311 ConnectionImpl& connection; 312 MallocMessageBuilder message; 313 }; 314 315 test::TestSturdyRefHostId::Reader getPeerVatId() override { 316 // Not actually implemented for the purpose of this test. 317 return test::TestSturdyRefHostId::Reader(); 318 } 319 320 kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) override { 321 return kj::heap<OutgoingRpcMessageImpl>(*this, firstSegmentWordSize); 322 } 323 kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() override { 324 KJ_IF_MAYBE(e, networkException) { 325 return kj::cp(*e); 326 } 327 328 if (messages.empty()) { 329 KJ_IF_MAYBE(f, fulfillOnEnd) { 330 f->get()->fulfill(); 331 return kj::Maybe<kj::Own<IncomingRpcMessage>>(nullptr); 332 } else { 333 auto paf = kj::newPromiseAndFulfiller<kj::Maybe<kj::Own<IncomingRpcMessage>>>(); 334 fulfillers.push(kj::mv(paf.fulfiller)); 335 return kj::mv(paf.promise); 336 } 337 } else { 338 ++network.received; 339 auto result = kj::mv(messages.front()); 340 messages.pop(); 341 return kj::Maybe<kj::Own<IncomingRpcMessage>>(kj::mv(result)); 342 } 343 } 344 kj::Promise<void> shutdown() override { 345 KJ_IF_MAYBE(p, partner) { 346 auto paf = kj::newPromiseAndFulfiller<void>(); 347 p->fulfillOnEnd = kj::mv(paf.fulfiller); 348 return kj::mv(paf.promise); 349 } else { 350 return kj::READY_NOW; 351 } 352 } 353 354 void taskFailed(kj::Exception&& exception) override { 355 ADD_FAILURE() << kj::str(exception).cStr(); 356 } 357 358 private: 359 TestNetworkAdapter& network; 360 RpcDumper::Sender sender KJ_UNUSED_MEMBER; 361 kj::Maybe<ConnectionImpl&> partner; 362 363 kj::Maybe<kj::Exception> networkException; 364 365 std::queue<kj::Own<kj::PromiseFulfiller<kj::Maybe<kj::Own<IncomingRpcMessage>>>>> fulfillers; 366 std::queue<kj::Own<IncomingRpcMessage>> messages; 367 kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfillOnEnd; 368 369 kj::Own<kj::TaskSet> tasks; 370 }; 371 372 kj::Maybe<kj::Own<Connection>> connect(test::TestSturdyRefHostId::Reader hostId) override { 373 if (hostId.getHost() == self) { 374 return nullptr; 375 } 376 377 TestNetworkAdapter& dst = KJ_REQUIRE_NONNULL(network.find(hostId.getHost())); 378 379 auto iter = connections.find(&dst); 380 if (iter == connections.end()) { 381 auto local = kj::refcounted<ConnectionImpl>(*this, RpcDumper::CLIENT); 382 auto remote = kj::refcounted<ConnectionImpl>(dst, RpcDumper::SERVER); 383 local->attach(*remote); 384 385 connections[&dst] = kj::addRef(*local); 386 dst.connections[this] = kj::addRef(*remote); 387 388 if (dst.fulfillerQueue.empty()) { 389 dst.connectionQueue.push(kj::mv(remote)); 390 } else { 391 dst.fulfillerQueue.front()->fulfill(kj::mv(remote)); 392 dst.fulfillerQueue.pop(); 393 } 394 395 return kj::Own<Connection>(kj::mv(local)); 396 } else { 397 return kj::Own<Connection>(kj::addRef(*iter->second)); 398 } 399 } 400 401 kj::Promise<kj::Own<Connection>> accept() override { 402 if (connectionQueue.empty()) { 403 auto paf = kj::newPromiseAndFulfiller<kj::Own<Connection>>(); 404 fulfillerQueue.push(kj::mv(paf.fulfiller)); 405 return kj::mv(paf.promise); 406 } else { 407 auto result = kj::mv(connectionQueue.front()); 408 connectionQueue.pop(); 409 return kj::mv(result); 410 } 411 } 412 413 private: 414 TestNetwork& network; 415 kj::StringPtr self; 416 uint sent = 0; 417 uint received = 0; 418 419 std::map<const TestNetworkAdapter*, kj::Own<ConnectionImpl>> connections; 420 std::queue<kj::Own<kj::PromiseFulfiller<kj::Own<Connection>>>> fulfillerQueue; 421 std::queue<kj::Own<Connection>> connectionQueue; 422 423 kj::Function<bool(MessageBuilder& message)> sendCallback = [](MessageBuilder&) { return true; }; 424 }; 425 426 TestNetwork::~TestNetwork() noexcept(false) {} 427 428 TestNetworkAdapter& TestNetwork::add(kj::StringPtr name) { 429 return *(map[name] = kj::heap<TestNetworkAdapter>(*this, name)); 430 } 431 432 // ======================================================================================= 433 434 class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRefObjectId> { 435 public: 436 int callCount = 0; 437 int handleCount = 0; 438 439 Capability::Client restore(test::TestSturdyRefObjectId::Reader objectId) override { 440 switch (objectId.getTag()) { 441 case test::TestSturdyRefObjectId::Tag::TEST_INTERFACE: 442 return kj::heap<TestInterfaceImpl>(callCount); 443 case test::TestSturdyRefObjectId::Tag::TEST_EXTENDS: 444 return Capability::Client(newBrokenCap("No TestExtends implemented.")); 445 case test::TestSturdyRefObjectId::Tag::TEST_PIPELINE: 446 return kj::heap<TestPipelineImpl>(callCount); 447 case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLEE: 448 return kj::heap<TestTailCalleeImpl>(callCount); 449 case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER: 450 return kj::heap<TestTailCallerImpl>(callCount); 451 case test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF: 452 return kj::heap<TestMoreStuffImpl>(callCount, handleCount); 453 } 454 KJ_UNREACHABLE; 455 } 456 }; 457 458 struct TestContext { 459 kj::EventLoop loop; 460 kj::WaitScope waitScope; 461 TestNetwork network; 462 TestRestorer restorer; 463 TestNetworkAdapter& clientNetwork; 464 TestNetworkAdapter& serverNetwork; 465 RpcSystem<test::TestSturdyRefHostId> rpcClient; 466 RpcSystem<test::TestSturdyRefHostId> rpcServer; 467 468 TestContext() 469 : waitScope(loop), 470 clientNetwork(network.add("client")), 471 serverNetwork(network.add("server")), 472 rpcClient(makeRpcClient(clientNetwork)), 473 rpcServer(makeRpcServer(serverNetwork, restorer)) {} 474 TestContext(Capability::Client bootstrap) 475 : waitScope(loop), 476 clientNetwork(network.add("client")), 477 serverNetwork(network.add("server")), 478 rpcClient(makeRpcClient(clientNetwork)), 479 rpcServer(makeRpcServer(serverNetwork, bootstrap)) {} 480 481 Capability::Client connect(test::TestSturdyRefObjectId::Tag tag) { 482 MallocMessageBuilder refMessage(128); 483 auto ref = refMessage.initRoot<test::TestSturdyRef>(); 484 auto hostId = ref.initHostId(); 485 hostId.setHost("server"); 486 ref.getObjectId().initAs<test::TestSturdyRefObjectId>().setTag(tag); 487 488 return rpcClient.restore(hostId, ref.getObjectId()); 489 } 490 }; 491 492 TEST(Rpc, Basic) { 493 TestContext context; 494 495 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_INTERFACE) 496 .castAs<test::TestInterface>(); 497 498 auto request1 = client.fooRequest(); 499 request1.setI(123); 500 request1.setJ(true); 501 auto promise1 = request1.send(); 502 503 // We used to call bar() after baz(), hence the numbering, but this masked the case where the 504 // RPC system actually disconnected on bar() (thus returning an exception, which we decided 505 // was expected). 506 bool barFailed = false; 507 auto request3 = client.barRequest(); 508 auto promise3 = request3.send().then( 509 [](Response<test::TestInterface::BarResults>&& response) { 510 ADD_FAILURE() << "Expected bar() call to fail."; 511 }, [&](kj::Exception&& e) { 512 barFailed = true; 513 }); 514 515 auto request2 = client.bazRequest(); 516 initTestMessage(request2.initS()); 517 auto promise2 = request2.send(); 518 519 EXPECT_EQ(0, context.restorer.callCount); 520 521 auto response1 = promise1.wait(context.waitScope); 522 523 EXPECT_EQ("foo", response1.getX()); 524 525 auto response2 = promise2.wait(context.waitScope); 526 527 promise3.wait(context.waitScope); 528 529 EXPECT_EQ(2, context.restorer.callCount); 530 EXPECT_TRUE(barFailed); 531 } 532 533 TEST(Rpc, Pipelining) { 534 TestContext context; 535 536 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_PIPELINE) 537 .castAs<test::TestPipeline>(); 538 539 int chainedCallCount = 0; 540 541 auto request = client.getCapRequest(); 542 request.setN(234); 543 request.setInCap(kj::heap<TestInterfaceImpl>(chainedCallCount)); 544 545 auto promise = request.send(); 546 547 auto pipelineRequest = promise.getOutBox().getCap().fooRequest(); 548 pipelineRequest.setI(321); 549 auto pipelinePromise = pipelineRequest.send(); 550 551 auto pipelineRequest2 = promise.getOutBox().getCap().castAs<test::TestExtends>().graultRequest(); 552 auto pipelinePromise2 = pipelineRequest2.send(); 553 554 promise = nullptr; // Just to be annoying, drop the original promise. 555 556 EXPECT_EQ(0, context.restorer.callCount); 557 EXPECT_EQ(0, chainedCallCount); 558 559 auto response = pipelinePromise.wait(context.waitScope); 560 EXPECT_EQ("bar", response.getX()); 561 562 auto response2 = pipelinePromise2.wait(context.waitScope); 563 checkTestMessage(response2); 564 565 EXPECT_EQ(3, context.restorer.callCount); 566 EXPECT_EQ(1, chainedCallCount); 567 } 568 569 KJ_TEST("RPC context.setPipeline") { 570 TestContext context; 571 572 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_PIPELINE) 573 .castAs<test::TestPipeline>(); 574 575 auto promise = client.getCapPipelineOnlyRequest().send(); 576 577 auto pipelineRequest = promise.getOutBox().getCap().fooRequest(); 578 pipelineRequest.setI(321); 579 auto pipelinePromise = pipelineRequest.send(); 580 581 auto pipelineRequest2 = promise.getOutBox().getCap().castAs<test::TestExtends>().graultRequest(); 582 auto pipelinePromise2 = pipelineRequest2.send(); 583 584 EXPECT_EQ(0, context.restorer.callCount); 585 586 auto response = pipelinePromise.wait(context.waitScope); 587 EXPECT_EQ("bar", response.getX()); 588 589 auto response2 = pipelinePromise2.wait(context.waitScope); 590 checkTestMessage(response2); 591 592 EXPECT_EQ(3, context.restorer.callCount); 593 594 // The original promise never completed. 595 KJ_EXPECT(!promise.poll(context.waitScope)); 596 } 597 598 TEST(Rpc, Release) { 599 TestContext context; 600 601 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 602 .castAs<test::TestMoreStuff>(); 603 604 auto handle1 = client.getHandleRequest().send().wait(context.waitScope).getHandle(); 605 auto promise = client.getHandleRequest().send(); 606 auto handle2 = promise.wait(context.waitScope).getHandle(); 607 608 EXPECT_EQ(2, context.restorer.handleCount); 609 610 handle1 = nullptr; 611 612 for (uint i = 0; i < 16; i++) kj::evalLater([]() {}).wait(context.waitScope); 613 EXPECT_EQ(1, context.restorer.handleCount); 614 615 handle2 = nullptr; 616 617 for (uint i = 0; i < 16; i++) kj::evalLater([]() {}).wait(context.waitScope); 618 EXPECT_EQ(1, context.restorer.handleCount); 619 620 promise = nullptr; 621 622 for (uint i = 0; i < 16; i++) kj::evalLater([]() {}).wait(context.waitScope); 623 EXPECT_EQ(0, context.restorer.handleCount); 624 } 625 626 TEST(Rpc, ReleaseOnCancel) { 627 // At one time, there was a bug where if a Return contained capabilities, but the client had 628 // canceled the request and already send a Finish (which presumably didn't reach the server before 629 // the Return), then we'd leak those caps. Test for that. 630 631 TestContext context; 632 633 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 634 .castAs<test::TestMoreStuff>(); 635 client.whenResolved().wait(context.waitScope); 636 637 { 638 auto promise = client.getHandleRequest().send(); 639 640 // If the server receives cancellation too early, it won't even return a capability in the 641 // results, it will just return "canceled". We want to emulate the case where the return message 642 // and the cancel (finish) message cross paths. It turns out that exactly two evalLater()s get 643 // us there. 644 // 645 // TODO(cleanup): This is fragile, but I'm not sure how else to write it without a ton 646 // of scaffolding. 647 kj::evalLater([]() {}).wait(context.waitScope); 648 kj::evalLater([]() {}).wait(context.waitScope); 649 } 650 651 for (uint i = 0; i < 16; i++) kj::evalLater([]() {}).wait(context.waitScope); 652 EXPECT_EQ(0, context.restorer.handleCount); 653 } 654 655 TEST(Rpc, TailCall) { 656 TestContext context; 657 658 auto caller = context.connect(test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER) 659 .castAs<test::TestTailCaller>(); 660 661 int calleeCallCount = 0; 662 663 test::TestTailCallee::Client callee(kj::heap<TestTailCalleeImpl>(calleeCallCount)); 664 665 auto request = caller.fooRequest(); 666 request.setI(456); 667 request.setCallee(callee); 668 669 auto promise = request.send(); 670 671 auto dependentCall0 = promise.getC().getCallSequenceRequest().send(); 672 673 auto response = promise.wait(context.waitScope); 674 EXPECT_EQ(456, response.getI()); 675 EXPECT_EQ("from TestTailCaller", response.getT()); 676 677 auto dependentCall1 = promise.getC().getCallSequenceRequest().send(); 678 679 EXPECT_EQ(0, dependentCall0.wait(context.waitScope).getN()); 680 EXPECT_EQ(1, dependentCall1.wait(context.waitScope).getN()); 681 682 // TODO(someday): We used to initiate dependentCall2 here before waiting on the first two calls, 683 // and the ordering was still "correct". But this was apparently by accident. Calling getC() on 684 // the final response returns a different capability from calling getC() on the promise. There 685 // are no guarantees on the ordering of calls on the response capability vs. the earlier 686 // promise. When ordering matters, applications should take the original promise capability and 687 // keep using that. In theory the RPC system could create continuity here, but it would be 688 // annoying: for each capability that had been fetched on the promise, it would need to 689 // traverse to the same capability in the final response and swap it out in-place for the 690 // pipelined cap returned earlier. Maybe we'll determine later that that's really needed but 691 // for now I'm not gonna do it. 692 auto dependentCall2 = response.getC().getCallSequenceRequest().send(); 693 694 EXPECT_EQ(2, dependentCall2.wait(context.waitScope).getN()); 695 696 EXPECT_EQ(1, calleeCallCount); 697 EXPECT_EQ(1, context.restorer.callCount); 698 } 699 700 class TestHangingTailCallee final: public test::TestTailCallee::Server { 701 public: 702 TestHangingTailCallee(int& callCount, int& cancelCount) 703 : callCount(callCount), cancelCount(cancelCount) {} 704 705 kj::Promise<void> foo(FooContext context) override { 706 context.allowCancellation(); 707 ++callCount; 708 return kj::Promise<void>(kj::NEVER_DONE) 709 .attach(kj::defer([&cancelCount = cancelCount]() { ++cancelCount; })); 710 } 711 712 private: 713 int& callCount; 714 int& cancelCount; 715 }; 716 717 class TestRacingTailCaller final: public test::TestTailCaller::Server { 718 public: 719 TestRacingTailCaller(kj::Promise<void> unblock): unblock(kj::mv(unblock)) {} 720 721 kj::Promise<void> foo(FooContext context) override { 722 return unblock.then([context]() mutable { 723 auto tailRequest = context.getParams().getCallee().fooRequest(); 724 return context.tailCall(kj::mv(tailRequest)); 725 }); 726 } 727 728 private: 729 kj::Promise<void> unblock; 730 }; 731 732 TEST(Rpc, TailCallCancel) { 733 TestContext context; 734 735 auto caller = context.connect(test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER) 736 .castAs<test::TestTailCaller>(); 737 738 int callCount = 0, cancelCount = 0; 739 740 test::TestTailCallee::Client callee(kj::heap<TestHangingTailCallee>(callCount, cancelCount)); 741 742 { 743 auto request = caller.fooRequest(); 744 request.setCallee(callee); 745 746 auto promise = request.send(); 747 748 KJ_ASSERT(callCount == 0); 749 KJ_ASSERT(cancelCount == 0); 750 751 KJ_ASSERT(!promise.poll(context.waitScope)); 752 753 KJ_ASSERT(callCount == 1); 754 KJ_ASSERT(cancelCount == 0); 755 } 756 757 kj::Promise<void>(kj::NEVER_DONE).poll(context.waitScope); 758 759 KJ_ASSERT(callCount == 1); 760 KJ_ASSERT(cancelCount == 1); 761 } 762 763 TEST(Rpc, TailCallCancelRace) { 764 auto paf = kj::newPromiseAndFulfiller<void>(); 765 TestContext context(kj::heap<TestRacingTailCaller>(kj::mv(paf.promise))); 766 767 MallocMessageBuilder serverHostIdBuilder; 768 auto serverHostId = serverHostIdBuilder.getRoot<test::TestSturdyRefHostId>(); 769 serverHostId.setHost("server"); 770 771 auto caller = context.rpcClient.bootstrap(serverHostId).castAs<test::TestTailCaller>(); 772 773 int callCount = 0, cancelCount = 0; 774 775 test::TestTailCallee::Client callee(kj::heap<TestHangingTailCallee>(callCount, cancelCount)); 776 777 { 778 auto request = caller.fooRequest(); 779 request.setCallee(callee); 780 781 auto promise = request.send(); 782 783 KJ_ASSERT(callCount == 0); 784 KJ_ASSERT(cancelCount == 0); 785 786 KJ_ASSERT(!promise.poll(context.waitScope)); 787 788 KJ_ASSERT(callCount == 0); 789 KJ_ASSERT(cancelCount == 0); 790 791 // Unblock the server and at the same time cancel the client. 792 paf.fulfiller->fulfill(); 793 } 794 795 kj::Promise<void>(kj::NEVER_DONE).poll(context.waitScope); 796 797 KJ_ASSERT(callCount == 1); 798 KJ_ASSERT(cancelCount == 1); 799 } 800 801 TEST(Rpc, Cancelation) { 802 // Tests allowCancellation(). 803 804 TestContext context; 805 806 auto paf = kj::newPromiseAndFulfiller<void>(); 807 bool destroyed = false; 808 auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr); 809 810 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 811 .castAs<test::TestMoreStuff>(); 812 813 kj::Promise<void> promise = nullptr; 814 815 bool returned = false; 816 { 817 auto request = client.expectCancelRequest(); 818 request.setCap(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller))); 819 promise = request.send().then( 820 [&](Response<test::TestMoreStuff::ExpectCancelResults>&& response) { 821 returned = true; 822 }).eagerlyEvaluate(nullptr); 823 } 824 kj::evalLater([]() {}).wait(context.waitScope); 825 kj::evalLater([]() {}).wait(context.waitScope); 826 kj::evalLater([]() {}).wait(context.waitScope); 827 kj::evalLater([]() {}).wait(context.waitScope); 828 kj::evalLater([]() {}).wait(context.waitScope); 829 kj::evalLater([]() {}).wait(context.waitScope); 830 831 // We can detect that the method was canceled because it will drop the cap. 832 EXPECT_FALSE(destroyed); 833 EXPECT_FALSE(returned); 834 835 promise = nullptr; // request cancellation 836 destructionPromise.wait(context.waitScope); 837 838 EXPECT_TRUE(destroyed); 839 EXPECT_FALSE(returned); 840 } 841 842 TEST(Rpc, PromiseResolve) { 843 TestContext context; 844 845 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 846 .castAs<test::TestMoreStuff>(); 847 848 int chainedCallCount = 0; 849 850 auto request = client.callFooRequest(); 851 auto request2 = client.callFooWhenResolvedRequest(); 852 853 auto paf = kj::newPromiseAndFulfiller<test::TestInterface::Client>(); 854 855 { 856 auto fork = paf.promise.fork(); 857 request.setCap(fork.addBranch()); 858 request2.setCap(fork.addBranch()); 859 } 860 861 auto promise = request.send(); 862 auto promise2 = request2.send(); 863 864 // Make sure getCap() has been called on the server side by sending another call and waiting 865 // for it. 866 EXPECT_EQ(2, client.getCallSequenceRequest().send().wait(context.waitScope).getN()); 867 EXPECT_EQ(3, context.restorer.callCount); 868 869 // OK, now fulfill the local promise. 870 paf.fulfiller->fulfill(kj::heap<TestInterfaceImpl>(chainedCallCount)); 871 872 // We should now be able to wait for getCap() to finish. 873 EXPECT_EQ("bar", promise.wait(context.waitScope).getS()); 874 EXPECT_EQ("bar", promise2.wait(context.waitScope).getS()); 875 876 EXPECT_EQ(3, context.restorer.callCount); 877 EXPECT_EQ(2, chainedCallCount); 878 } 879 880 TEST(Rpc, RetainAndRelease) { 881 TestContext context; 882 883 auto paf = kj::newPromiseAndFulfiller<void>(); 884 bool destroyed = false; 885 auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr); 886 887 { 888 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 889 .castAs<test::TestMoreStuff>(); 890 891 { 892 auto request = client.holdRequest(); 893 request.setCap(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller))); 894 request.send().wait(context.waitScope); 895 } 896 897 // Do some other call to add a round trip. 898 EXPECT_EQ(1, client.getCallSequenceRequest().send().wait(context.waitScope).getN()); 899 900 // Shouldn't be destroyed because it's being held by the server. 901 EXPECT_FALSE(destroyed); 902 903 // We can ask it to call the held capability. 904 EXPECT_EQ("bar", client.callHeldRequest().send().wait(context.waitScope).getS()); 905 906 { 907 // We can get the cap back from it. 908 auto capCopy = client.getHeldRequest().send().wait(context.waitScope).getCap(); 909 910 { 911 // And call it, without any network communications. 912 uint oldSentCount = context.clientNetwork.getSentCount(); 913 auto request = capCopy.fooRequest(); 914 request.setI(123); 915 request.setJ(true); 916 EXPECT_EQ("foo", request.send().wait(context.waitScope).getX()); 917 EXPECT_EQ(oldSentCount, context.clientNetwork.getSentCount()); 918 } 919 920 { 921 // We can send another copy of the same cap to another method, and it works. 922 auto request = client.callFooRequest(); 923 request.setCap(capCopy); 924 EXPECT_EQ("bar", request.send().wait(context.waitScope).getS()); 925 } 926 } 927 928 // Give some time to settle. 929 EXPECT_EQ(5, client.getCallSequenceRequest().send().wait(context.waitScope).getN()); 930 EXPECT_EQ(6, client.getCallSequenceRequest().send().wait(context.waitScope).getN()); 931 EXPECT_EQ(7, client.getCallSequenceRequest().send().wait(context.waitScope).getN()); 932 933 // Can't be destroyed, we haven't released it. 934 EXPECT_FALSE(destroyed); 935 } 936 937 // We released our client, which should cause the server to be released, which in turn will 938 // release the cap pointing back to us. 939 destructionPromise.wait(context.waitScope); 940 EXPECT_TRUE(destroyed); 941 } 942 943 TEST(Rpc, Cancel) { 944 TestContext context; 945 946 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 947 .castAs<test::TestMoreStuff>(); 948 949 auto paf = kj::newPromiseAndFulfiller<void>(); 950 bool destroyed = false; 951 auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr); 952 953 { 954 auto request = client.neverReturnRequest(); 955 request.setCap(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller))); 956 957 { 958 auto responsePromise = request.send(); 959 960 // Allow some time to settle. 961 EXPECT_EQ(1, client.getCallSequenceRequest().send().wait(context.waitScope).getN()); 962 EXPECT_EQ(2, client.getCallSequenceRequest().send().wait(context.waitScope).getN()); 963 964 // The cap shouldn't have been destroyed yet because the call never returned. 965 EXPECT_FALSE(destroyed); 966 } 967 } 968 969 // Now the cap should be released. 970 destructionPromise.wait(context.waitScope); 971 EXPECT_TRUE(destroyed); 972 } 973 974 TEST(Rpc, SendTwice) { 975 TestContext context; 976 977 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 978 .castAs<test::TestMoreStuff>(); 979 980 auto paf = kj::newPromiseAndFulfiller<void>(); 981 bool destroyed = false; 982 auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr); 983 984 auto cap = test::TestInterface::Client(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller))); 985 986 { 987 auto request = client.callFooRequest(); 988 request.setCap(cap); 989 990 EXPECT_EQ("bar", request.send().wait(context.waitScope).getS()); 991 } 992 993 // Allow some time for the server to release `cap`. 994 EXPECT_EQ(1, client.getCallSequenceRequest().send().wait(context.waitScope).getN()); 995 996 { 997 // More requests with the same cap. 998 auto request = client.callFooRequest(); 999 auto request2 = client.callFooRequest(); 1000 request.setCap(cap); 1001 request2.setCap(kj::mv(cap)); 1002 1003 auto promise = request.send(); 1004 auto promise2 = request2.send(); 1005 1006 EXPECT_EQ("bar", promise.wait(context.waitScope).getS()); 1007 EXPECT_EQ("bar", promise2.wait(context.waitScope).getS()); 1008 } 1009 1010 // Now the cap should be released. 1011 destructionPromise.wait(context.waitScope); 1012 EXPECT_TRUE(destroyed); 1013 } 1014 1015 RemotePromise<test::TestCallOrder::GetCallSequenceResults> getCallSequence( 1016 test::TestCallOrder::Client& client, uint expected) { 1017 auto req = client.getCallSequenceRequest(); 1018 req.setExpected(expected); 1019 return req.send(); 1020 } 1021 1022 TEST(Rpc, Embargo) { 1023 TestContext context; 1024 1025 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 1026 .castAs<test::TestMoreStuff>(); 1027 1028 auto cap = test::TestCallOrder::Client(kj::heap<TestCallOrderImpl>()); 1029 1030 auto earlyCall = client.getCallSequenceRequest().send(); 1031 1032 auto echoRequest = client.echoRequest(); 1033 echoRequest.setCap(cap); 1034 auto echo = echoRequest.send(); 1035 1036 auto pipeline = echo.getCap(); 1037 1038 auto call0 = getCallSequence(pipeline, 0); 1039 auto call1 = getCallSequence(pipeline, 1); 1040 1041 earlyCall.wait(context.waitScope); 1042 1043 auto call2 = getCallSequence(pipeline, 2); 1044 1045 auto resolved = echo.wait(context.waitScope).getCap(); 1046 1047 auto call3 = getCallSequence(pipeline, 3); 1048 auto call4 = getCallSequence(pipeline, 4); 1049 auto call5 = getCallSequence(pipeline, 5); 1050 1051 EXPECT_EQ(0, call0.wait(context.waitScope).getN()); 1052 EXPECT_EQ(1, call1.wait(context.waitScope).getN()); 1053 EXPECT_EQ(2, call2.wait(context.waitScope).getN()); 1054 EXPECT_EQ(3, call3.wait(context.waitScope).getN()); 1055 EXPECT_EQ(4, call4.wait(context.waitScope).getN()); 1056 EXPECT_EQ(5, call5.wait(context.waitScope).getN()); 1057 } 1058 1059 TEST(Rpc, EmbargoUnwrap) { 1060 // Test that embargos properly block unwraping a capability using CapabilityServerSet. 1061 1062 TestContext context; 1063 1064 capnp::CapabilityServerSet<test::TestCallOrder> capSet; 1065 1066 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 1067 .castAs<test::TestMoreStuff>(); 1068 1069 auto cap = capSet.add(kj::heap<TestCallOrderImpl>()); 1070 1071 auto earlyCall = client.getCallSequenceRequest().send(); 1072 1073 auto echoRequest = client.echoRequest(); 1074 echoRequest.setCap(cap); 1075 auto echo = echoRequest.send(); 1076 1077 auto pipeline = echo.getCap(); 1078 1079 auto unwrap = capSet.getLocalServer(pipeline) 1080 .then([](kj::Maybe<test::TestCallOrder::Server&> unwrapped) { 1081 return kj::downcast<TestCallOrderImpl>(KJ_ASSERT_NONNULL(unwrapped)).getCount(); 1082 }).eagerlyEvaluate(nullptr); 1083 1084 auto call0 = getCallSequence(pipeline, 0); 1085 auto call1 = getCallSequence(pipeline, 1); 1086 1087 earlyCall.wait(context.waitScope); 1088 1089 auto call2 = getCallSequence(pipeline, 2); 1090 1091 auto resolved = echo.wait(context.waitScope).getCap(); 1092 1093 auto call3 = getCallSequence(pipeline, 4); 1094 auto call4 = getCallSequence(pipeline, 4); 1095 auto call5 = getCallSequence(pipeline, 5); 1096 1097 EXPECT_EQ(0, call0.wait(context.waitScope).getN()); 1098 EXPECT_EQ(1, call1.wait(context.waitScope).getN()); 1099 EXPECT_EQ(2, call2.wait(context.waitScope).getN()); 1100 EXPECT_EQ(3, call3.wait(context.waitScope).getN()); 1101 EXPECT_EQ(4, call4.wait(context.waitScope).getN()); 1102 EXPECT_EQ(5, call5.wait(context.waitScope).getN()); 1103 1104 uint unwrappedAt = unwrap.wait(context.waitScope); 1105 KJ_EXPECT(unwrappedAt >= 3, unwrappedAt); 1106 } 1107 1108 template <typename T> 1109 void expectPromiseThrows(kj::Promise<T>&& promise, kj::WaitScope& waitScope) { 1110 EXPECT_TRUE(promise.then([](T&&) { return false; }, [](kj::Exception&&) { return true; }) 1111 .wait(waitScope)); 1112 } 1113 1114 template <> 1115 void expectPromiseThrows(kj::Promise<void>&& promise, kj::WaitScope& waitScope) { 1116 EXPECT_TRUE(promise.then([]() { return false; }, [](kj::Exception&&) { return true; }) 1117 .wait(waitScope)); 1118 } 1119 1120 TEST(Rpc, EmbargoError) { 1121 TestContext context; 1122 1123 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 1124 .castAs<test::TestMoreStuff>(); 1125 1126 auto paf = kj::newPromiseAndFulfiller<test::TestCallOrder::Client>(); 1127 1128 auto cap = test::TestCallOrder::Client(kj::mv(paf.promise)); 1129 1130 auto earlyCall = client.getCallSequenceRequest().send(); 1131 1132 auto echoRequest = client.echoRequest(); 1133 echoRequest.setCap(cap); 1134 auto echo = echoRequest.send(); 1135 1136 auto pipeline = echo.getCap(); 1137 1138 auto call0 = getCallSequence(pipeline, 0); 1139 auto call1 = getCallSequence(pipeline, 1); 1140 1141 earlyCall.wait(context.waitScope); 1142 1143 auto call2 = getCallSequence(pipeline, 2); 1144 1145 auto resolved = echo.wait(context.waitScope).getCap(); 1146 1147 auto call3 = getCallSequence(pipeline, 3); 1148 auto call4 = getCallSequence(pipeline, 4); 1149 auto call5 = getCallSequence(pipeline, 5); 1150 1151 paf.fulfiller->rejectIfThrows([]() { KJ_FAIL_ASSERT("foo") { break; } }); 1152 1153 expectPromiseThrows(kj::mv(call0), context.waitScope); 1154 expectPromiseThrows(kj::mv(call1), context.waitScope); 1155 expectPromiseThrows(kj::mv(call2), context.waitScope); 1156 expectPromiseThrows(kj::mv(call3), context.waitScope); 1157 expectPromiseThrows(kj::mv(call4), context.waitScope); 1158 expectPromiseThrows(kj::mv(call5), context.waitScope); 1159 1160 // Verify that we're still connected (there were no protocol errors). 1161 getCallSequence(client, 1).wait(context.waitScope); 1162 } 1163 1164 TEST(Rpc, EmbargoNull) { 1165 // Set up a situation where we pipeline on a capability that ends up coming back null. This 1166 // should NOT cause a Disembargo to be sent, but due to a bug in earlier versions of Cap'n Proto, 1167 // a Disembargo was indeed sent to the null capability, which caused the server to disconnect 1168 // due to protocol error. 1169 1170 TestContext context; 1171 1172 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 1173 .castAs<test::TestMoreStuff>(); 1174 1175 auto promise = client.getNullRequest().send(); 1176 1177 auto cap = promise.getNullCap(); 1178 1179 auto call0 = cap.getCallSequenceRequest().send(); 1180 1181 promise.wait(context.waitScope); 1182 1183 auto call1 = cap.getCallSequenceRequest().send(); 1184 1185 expectPromiseThrows(kj::mv(call0), context.waitScope); 1186 expectPromiseThrows(kj::mv(call1), context.waitScope); 1187 1188 // Verify that we're still connected (there were no protocol errors). 1189 getCallSequence(client, 0).wait(context.waitScope); 1190 } 1191 1192 TEST(Rpc, CallBrokenPromise) { 1193 // Tell the server to call back to a promise client, then resolve the promise to an error. 1194 1195 TestContext context; 1196 1197 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 1198 .castAs<test::TestMoreStuff>(); 1199 auto paf = kj::newPromiseAndFulfiller<test::TestInterface::Client>(); 1200 1201 { 1202 auto req = client.holdRequest(); 1203 req.setCap(kj::mv(paf.promise)); 1204 req.send().wait(context.waitScope); 1205 } 1206 1207 bool returned = false; 1208 auto req = client.callHeldRequest().send() 1209 .then([&](capnp::Response<test::TestMoreStuff::CallHeldResults>&&) { 1210 returned = true; 1211 }, [&](kj::Exception&& e) { 1212 returned = true; 1213 kj::throwRecoverableException(kj::mv(e)); 1214 }).eagerlyEvaluate(nullptr); 1215 1216 kj::evalLater([]() {}).wait(context.waitScope); 1217 kj::evalLater([]() {}).wait(context.waitScope); 1218 kj::evalLater([]() {}).wait(context.waitScope); 1219 kj::evalLater([]() {}).wait(context.waitScope); 1220 kj::evalLater([]() {}).wait(context.waitScope); 1221 kj::evalLater([]() {}).wait(context.waitScope); 1222 kj::evalLater([]() {}).wait(context.waitScope); 1223 kj::evalLater([]() {}).wait(context.waitScope); 1224 1225 EXPECT_FALSE(returned); 1226 1227 paf.fulfiller->rejectIfThrows([]() { KJ_FAIL_ASSERT("foo") { break; } }); 1228 1229 expectPromiseThrows(kj::mv(req), context.waitScope); 1230 EXPECT_TRUE(returned); 1231 1232 kj::evalLater([]() {}).wait(context.waitScope); 1233 kj::evalLater([]() {}).wait(context.waitScope); 1234 kj::evalLater([]() {}).wait(context.waitScope); 1235 kj::evalLater([]() {}).wait(context.waitScope); 1236 kj::evalLater([]() {}).wait(context.waitScope); 1237 kj::evalLater([]() {}).wait(context.waitScope); 1238 kj::evalLater([]() {}).wait(context.waitScope); 1239 kj::evalLater([]() {}).wait(context.waitScope); 1240 1241 // Verify that we're still connected (there were no protocol errors). 1242 getCallSequence(client, 1).wait(context.waitScope); 1243 } 1244 1245 TEST(Rpc, Abort) { 1246 // Verify that aborts are received. 1247 1248 TestContext context; 1249 1250 MallocMessageBuilder refMessage(128); 1251 auto hostId = refMessage.initRoot<test::TestSturdyRefHostId>(); 1252 hostId.setHost("server"); 1253 1254 auto conn = KJ_ASSERT_NONNULL(context.clientNetwork.connect(hostId)); 1255 1256 { 1257 // Send an invalid message (Return to non-existent question). 1258 auto msg = conn->newOutgoingMessage(128); 1259 auto body = msg->getBody().initAs<rpc::Message>().initReturn(); 1260 body.setAnswerId(1234); 1261 body.setCanceled(); 1262 msg->send(); 1263 } 1264 1265 auto reply = KJ_ASSERT_NONNULL(conn->receiveIncomingMessage().wait(context.waitScope)); 1266 EXPECT_EQ(rpc::Message::ABORT, reply->getBody().getAs<rpc::Message>().which()); 1267 1268 EXPECT_TRUE(conn->receiveIncomingMessage().wait(context.waitScope) == nullptr); 1269 } 1270 1271 KJ_TEST("loopback bootstrap()") { 1272 int callCount = 0; 1273 test::TestInterface::Client bootstrap = kj::heap<TestInterfaceImpl>(callCount); 1274 1275 MallocMessageBuilder hostIdBuilder; 1276 auto hostId = hostIdBuilder.getRoot<test::TestSturdyRefHostId>(); 1277 hostId.setHost("server"); 1278 1279 TestContext context(bootstrap); 1280 auto client = context.rpcServer.bootstrap(hostId).castAs<test::TestInterface>(); 1281 1282 auto request = client.fooRequest(); 1283 request.setI(123); 1284 request.setJ(true); 1285 auto response = request.send().wait(context.waitScope); 1286 1287 KJ_EXPECT(response.getX() == "foo"); 1288 KJ_EXPECT(callCount == 1); 1289 } 1290 1291 KJ_TEST("method throws exception") { 1292 TestContext context; 1293 1294 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 1295 .castAs<test::TestMoreStuff>(); 1296 1297 kj::Maybe<kj::Exception> maybeException; 1298 client.throwExceptionRequest().send().ignoreResult() 1299 .catch_([&](kj::Exception&& e) { 1300 maybeException = kj::mv(e); 1301 }).wait(context.waitScope); 1302 1303 auto exception = KJ_ASSERT_NONNULL(maybeException); 1304 KJ_EXPECT(exception.getDescription() == "remote exception: test exception"); 1305 KJ_EXPECT(exception.getRemoteTrace() == nullptr); 1306 } 1307 1308 KJ_TEST("method throws exception won't redundantly add remote exception prefix") { 1309 TestContext context; 1310 1311 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 1312 .castAs<test::TestMoreStuff>(); 1313 1314 kj::Maybe<kj::Exception> maybeException; 1315 client.throwRemoteExceptionRequest().send().ignoreResult() 1316 .catch_([&](kj::Exception&& e) { 1317 maybeException = kj::mv(e); 1318 }).wait(context.waitScope); 1319 1320 auto exception = KJ_ASSERT_NONNULL(maybeException); 1321 KJ_EXPECT(exception.getDescription() == "remote exception: test exception"); 1322 KJ_EXPECT(exception.getRemoteTrace() == nullptr); 1323 } 1324 1325 KJ_TEST("method throws exception with trace encoder") { 1326 TestContext context; 1327 1328 context.rpcServer.setTraceEncoder([](const kj::Exception& e) { 1329 return kj::str("trace for ", e.getDescription()); 1330 }); 1331 1332 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 1333 .castAs<test::TestMoreStuff>(); 1334 1335 kj::Maybe<kj::Exception> maybeException; 1336 client.throwExceptionRequest().send().ignoreResult() 1337 .catch_([&](kj::Exception&& e) { 1338 maybeException = kj::mv(e); 1339 }).wait(context.waitScope); 1340 1341 auto exception = KJ_ASSERT_NONNULL(maybeException); 1342 KJ_EXPECT(exception.getDescription() == "remote exception: test exception"); 1343 KJ_EXPECT(exception.getRemoteTrace() == "trace for test exception"); 1344 } 1345 1346 KJ_TEST("when OutgoingRpcMessage::send() throws, we don't leak exports") { 1347 // When OutgoingRpcMessage::send() throws an exception on a Call message, we need to clean up 1348 // anything that had been added to the export table as part of the call. At one point this 1349 // cleanup was missing, so exports would leak. 1350 1351 TestContext context; 1352 1353 uint32_t expectedExportNumber = 0; 1354 uint interceptCount = 0; 1355 bool shouldThrowFromSend = false; 1356 context.clientNetwork.onSend([&](MessageBuilder& builder) { 1357 auto message = builder.getRoot<rpc::Message>().asReader(); 1358 if (message.isCall()) { 1359 auto call = message.getCall(); 1360 if (call.getInterfaceId() == capnp::typeId<test::TestMoreStuff>() && 1361 call.getMethodId() == 0) { 1362 // callFoo() request, expect a capability in the param caps. Specifically we expect a 1363 // promise, because that's what we send below. 1364 auto capTable = call.getParams().getCapTable(); 1365 KJ_ASSERT(capTable.size() == 1); 1366 auto desc = capTable[0]; 1367 KJ_ASSERT(desc.isSenderPromise()); 1368 KJ_ASSERT(desc.getSenderPromise() == expectedExportNumber); 1369 1370 ++interceptCount; 1371 if (shouldThrowFromSend) { 1372 kj::throwRecoverableException(KJ_EXCEPTION(FAILED, "intercepted")); 1373 return false; // only matters when -fno-exceptions 1374 } 1375 } 1376 } 1377 return true; 1378 }); 1379 1380 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 1381 .castAs<test::TestMoreStuff>(); 1382 1383 { 1384 shouldThrowFromSend = true; 1385 auto req = client.callFooRequest(); 1386 req.setCap(kj::Promise<test::TestInterface::Client>(kj::NEVER_DONE)); 1387 req.send().then([](auto&&) { 1388 KJ_FAIL_ASSERT("should have thrown"); 1389 }, [](kj::Exception&& e) { 1390 KJ_EXPECT(e.getDescription() == "intercepted", e); 1391 }).wait(context.waitScope); 1392 } 1393 1394 KJ_EXPECT(interceptCount == 1); 1395 1396 // Sending again should use the same export number, because the export table entry should have 1397 // been released when send() threw. (At one point, this was a bug...) 1398 { 1399 shouldThrowFromSend = true; 1400 auto req = client.callFooRequest(); 1401 req.setCap(kj::Promise<test::TestInterface::Client>(kj::NEVER_DONE)); 1402 req.send().then([](auto&&) { 1403 KJ_FAIL_ASSERT("should have thrown"); 1404 }, [](kj::Exception&& e) { 1405 KJ_EXPECT(e.getDescription() == "intercepted", e); 1406 }).wait(context.waitScope); 1407 } 1408 1409 KJ_EXPECT(interceptCount == 2); 1410 1411 // Now lets start a call that doesn't throw. The export number should still be zero because 1412 // the previous exports were released. 1413 { 1414 shouldThrowFromSend = false; 1415 auto req = client.callFooRequest(); 1416 req.setCap(kj::Promise<test::TestInterface::Client>(kj::NEVER_DONE)); 1417 auto promise = req.send(); 1418 KJ_EXPECT(!promise.poll(context.waitScope)); 1419 1420 KJ_EXPECT(interceptCount == 3); 1421 } 1422 1423 // We canceled the previous call, BUT the exported capability is still present until the other 1424 // side drops it, which it won't because the call isn't marked cancelable and never completes. 1425 // Now, let's send another call. This time, we expect a new export number will actually be 1426 // allocated. 1427 { 1428 shouldThrowFromSend = false; 1429 expectedExportNumber = 1; 1430 auto req = client.callFooRequest(); 1431 auto paf = kj::newPromiseAndFulfiller<test::TestInterface::Client>(); 1432 req.setCap(kj::mv(paf.promise)); 1433 auto promise = req.send(); 1434 KJ_EXPECT(!promise.poll(context.waitScope)); 1435 1436 KJ_EXPECT(interceptCount == 4); 1437 1438 // Now let's actually let the RPC complete so we can verify the RPC system isn't broken or 1439 // anything. 1440 int callCount = 0; 1441 paf.fulfiller->fulfill(kj::heap<TestInterfaceImpl>(callCount)); 1442 auto resp = promise.wait(context.waitScope); 1443 KJ_EXPECT(resp.getS() == "bar"); 1444 KJ_EXPECT(callCount == 1); 1445 } 1446 1447 // Now if we do yet another call, it'll reuse export number 1. 1448 { 1449 shouldThrowFromSend = false; 1450 expectedExportNumber = 1; 1451 auto req = client.callFooRequest(); 1452 req.setCap(kj::Promise<test::TestInterface::Client>(kj::NEVER_DONE)); 1453 auto promise = req.send(); 1454 KJ_EXPECT(!promise.poll(context.waitScope)); 1455 1456 KJ_EXPECT(interceptCount == 5); 1457 } 1458 } 1459 1460 KJ_TEST("export the same promise twice") { 1461 TestContext context; 1462 1463 bool exportIsPromise; 1464 uint32_t expectedExportNumber; 1465 uint interceptCount = 0; 1466 context.clientNetwork.onSend([&](MessageBuilder& builder) { 1467 auto message = builder.getRoot<rpc::Message>().asReader(); 1468 if (message.isCall()) { 1469 auto call = message.getCall(); 1470 if (call.getInterfaceId() == capnp::typeId<test::TestMoreStuff>() && 1471 call.getMethodId() == 0) { 1472 // callFoo() request, expect a capability in the param caps. Specifically we expect a 1473 // promise, because that's what we send below. 1474 auto capTable = call.getParams().getCapTable(); 1475 KJ_ASSERT(capTable.size() == 1); 1476 auto desc = capTable[0]; 1477 if (exportIsPromise) { 1478 KJ_ASSERT(desc.isSenderPromise()); 1479 KJ_ASSERT(desc.getSenderPromise() == expectedExportNumber); 1480 } else { 1481 KJ_ASSERT(desc.isSenderHosted()); 1482 KJ_ASSERT(desc.getSenderHosted() == expectedExportNumber); 1483 } 1484 1485 ++interceptCount; 1486 } 1487 } 1488 return true; 1489 }); 1490 1491 auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF) 1492 .castAs<test::TestMoreStuff>(); 1493 1494 auto sendReq = [&](test::TestInterface::Client cap) { 1495 auto req = client.callFooRequest(); 1496 req.setCap(kj::mv(cap)); 1497 return req.send(); 1498 }; 1499 1500 auto expectNeverDone = [&](auto& promise) { 1501 if (promise.poll(context.waitScope)) { 1502 promise.wait(context.waitScope); // let it throw if it's going to 1503 KJ_FAIL_ASSERT("promise finished without throwing"); 1504 } 1505 }; 1506 1507 int callCount = 0; 1508 test::TestInterface::Client normalCap = kj::heap<TestInterfaceImpl>(callCount); 1509 test::TestInterface::Client promiseCap = kj::Promise<test::TestInterface::Client>(kj::NEVER_DONE); 1510 1511 // Send request with a promise capability in the params. 1512 exportIsPromise = true; 1513 expectedExportNumber = 0; 1514 auto promise1 = sendReq(promiseCap); 1515 expectNeverDone(promise1); 1516 KJ_EXPECT(interceptCount == 1); 1517 1518 // Send a second request with the same promise should use the same export table entry. 1519 auto promise2 = sendReq(promiseCap); 1520 expectNeverDone(promise2); 1521 KJ_EXPECT(interceptCount == 2); 1522 1523 // Sending a request with a different promise should use a different export table entry. 1524 expectedExportNumber = 1; 1525 auto promise3 = sendReq(kj::Promise<test::TestInterface::Client>(kj::NEVER_DONE)); 1526 expectNeverDone(promise3); 1527 KJ_EXPECT(interceptCount == 3); 1528 1529 // Now try sending a non-promise cap. We'll send all these requests at once before waiting on 1530 // any of them since these will acutally complete.k 1531 exportIsPromise = false; 1532 expectedExportNumber = 2; 1533 auto promise4 = sendReq(normalCap); 1534 auto promise5 = sendReq(normalCap); 1535 expectedExportNumber = 3; 1536 auto promise6 = sendReq(kj::heap<TestInterfaceImpl>(callCount)); 1537 KJ_EXPECT(interceptCount == 6); 1538 1539 KJ_EXPECT(promise4.wait(context.waitScope).getS() == "bar"); 1540 KJ_EXPECT(promise5.wait(context.waitScope).getS() == "bar"); 1541 KJ_EXPECT(promise6.wait(context.waitScope).getS() == "bar"); 1542 KJ_EXPECT(callCount == 3); 1543 } 1544 1545 } // namespace 1546 } // namespace _ (private) 1547 } // namespace capnp