rpc-twoparty-test.c++ (31437B)
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#define CAPNP_TESTING_CAPNP 1

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

// Includes needed just for SOL_SOCKET and SO_SNDBUF.
#if _WIN32
#include <kj/win32-api-version.h>
#endif

#include "rpc-twoparty.h"
#include "test-util.h"
#include <capnp/rpc.capnp.h>
#include <kj/debug.h>
#include <kj/thread.h>
#include <kj/compat/gtest.h>
#include <kj/miniposix.h>

#if _WIN32
#include <winsock2.h>
#include <mswsock.h>
#include <kj/windows-sanity.h>
#else
#include <sys/socket.h>
#endif

// TODO(cleanup): Auto-generate stringification functions for union discriminants.
50 namespace capnp { 51 namespace rpc { 52 inline kj::String KJ_STRINGIFY(Message::Which which) { 53 return kj::str(static_cast<uint16_t>(which)); 54 } 55 } // namespace rpc 56 } // namespace capnp 57 58 namespace capnp { 59 namespace _ { 60 namespace { 61 62 class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRefObjectId> { 63 public: 64 TestRestorer(int& callCount, int& handleCount) 65 : callCount(callCount), handleCount(handleCount) {} 66 67 Capability::Client restore(test::TestSturdyRefObjectId::Reader objectId) override { 68 switch (objectId.getTag()) { 69 case test::TestSturdyRefObjectId::Tag::TEST_INTERFACE: 70 return kj::heap<TestInterfaceImpl>(callCount); 71 case test::TestSturdyRefObjectId::Tag::TEST_EXTENDS: 72 return Capability::Client(newBrokenCap("No TestExtends implemented.")); 73 case test::TestSturdyRefObjectId::Tag::TEST_PIPELINE: 74 return kj::heap<TestPipelineImpl>(callCount); 75 case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLEE: 76 return kj::heap<TestTailCalleeImpl>(callCount); 77 case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER: 78 return kj::heap<TestTailCallerImpl>(callCount); 79 case test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF: 80 return kj::heap<TestMoreStuffImpl>(callCount, handleCount); 81 } 82 KJ_UNREACHABLE; 83 } 84 85 private: 86 int& callCount; 87 int& handleCount; 88 }; 89 90 class TestMonotonicClock final: public kj::MonotonicClock { 91 public: 92 kj::TimePoint now() const override { 93 return time; 94 } 95 96 void reset() { time = kj::systemCoarseMonotonicClock().now(); } 97 void increment(kj::Duration d) { time += d; } 98 private: 99 kj::TimePoint time = kj::systemCoarseMonotonicClock().now(); 100 }; 101 102 kj::AsyncIoProvider::PipeThread runServer(kj::AsyncIoProvider& ioProvider, 103 int& callCount, int& handleCount) { 104 return ioProvider.newPipeThread( 105 [&callCount, &handleCount]( 106 kj::AsyncIoProvider& ioProvider, kj::AsyncIoStream& stream, kj::WaitScope& waitScope) { 107 
TwoPartyVatNetwork network(stream, rpc::twoparty::Side::SERVER); 108 TestRestorer restorer(callCount, handleCount); 109 auto server = makeRpcServer(network, restorer); 110 network.onDisconnect().wait(waitScope); 111 }); 112 } 113 114 Capability::Client getPersistentCap(RpcSystem<rpc::twoparty::VatId>& client, 115 rpc::twoparty::Side side, 116 test::TestSturdyRefObjectId::Tag tag) { 117 // Create the VatId. 118 MallocMessageBuilder hostIdMessage(8); 119 auto hostId = hostIdMessage.initRoot<rpc::twoparty::VatId>(); 120 hostId.setSide(side); 121 122 // Create the SturdyRefObjectId. 123 MallocMessageBuilder objectIdMessage(8); 124 objectIdMessage.initRoot<test::TestSturdyRefObjectId>().setTag(tag); 125 126 // Connect to the remote capability. 127 return client.restore(hostId, objectIdMessage.getRoot<AnyPointer>()); 128 } 129 130 TEST(TwoPartyNetwork, Basic) { 131 auto ioContext = kj::setupAsyncIo(); 132 TestMonotonicClock clock; 133 int callCount = 0; 134 int handleCount = 0; 135 136 auto serverThread = runServer(*ioContext.provider, callCount, handleCount); 137 TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT, capnp::ReaderOptions(), clock); 138 auto rpcClient = makeRpcClient(network); 139 140 KJ_EXPECT(network.getCurrentQueueCount() == 0); 141 KJ_EXPECT(network.getCurrentQueueSize() == 0); 142 KJ_EXPECT(network.getOutgoingMessageWaitTime() == 0 * kj::SECONDS); 143 144 // Request the particular capability from the server. 145 auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER, 146 test::TestSturdyRefObjectId::Tag::TEST_INTERFACE).castAs<test::TestInterface>(); 147 clock.increment(1 * kj::SECONDS); 148 149 KJ_EXPECT(network.getCurrentQueueCount() == 1); 150 KJ_EXPECT(network.getCurrentQueueSize() > 0); 151 KJ_EXPECT(network.getOutgoingMessageWaitTime() == 1 * kj::SECONDS); 152 size_t oldSize = network.getCurrentQueueSize(); 153 154 // Use the capability. 
155 auto request1 = client.fooRequest(); 156 request1.setI(123); 157 request1.setJ(true); 158 auto promise1 = request1.send(); 159 160 KJ_EXPECT(network.getCurrentQueueCount() == 2); 161 KJ_EXPECT(network.getCurrentQueueSize() > oldSize); 162 KJ_EXPECT(network.getOutgoingMessageWaitTime() == 1 * kj::SECONDS); 163 oldSize = network.getCurrentQueueSize(); 164 165 auto request2 = client.bazRequest(); 166 initTestMessage(request2.initS()); 167 auto promise2 = request2.send(); 168 169 KJ_EXPECT(network.getCurrentQueueCount() == 3); 170 KJ_EXPECT(network.getCurrentQueueSize() > oldSize); 171 oldSize = network.getCurrentQueueSize(); 172 173 clock.increment(1 * kj::SECONDS); 174 175 bool barFailed = false; 176 auto request3 = client.barRequest(); 177 auto promise3 = request3.send().then( 178 [](Response<test::TestInterface::BarResults>&& response) { 179 ADD_FAILURE() << "Expected bar() call to fail."; 180 }, [&](kj::Exception&& e) { 181 barFailed = true; 182 }); 183 184 EXPECT_EQ(0, callCount); 185 186 KJ_EXPECT(network.getCurrentQueueCount() == 4); 187 KJ_EXPECT(network.getCurrentQueueSize() > oldSize); 188 // Oldest message is now 2 seconds old 189 KJ_EXPECT(network.getOutgoingMessageWaitTime() == 2 * kj::SECONDS); 190 oldSize = network.getCurrentQueueSize(); 191 192 auto response1 = promise1.wait(ioContext.waitScope); 193 194 EXPECT_EQ("foo", response1.getX()); 195 196 auto response2 = promise2.wait(ioContext.waitScope); 197 198 promise3.wait(ioContext.waitScope); 199 200 EXPECT_EQ(2, callCount); 201 EXPECT_TRUE(barFailed); 202 203 // There's still a `Finish` message queued. 204 KJ_EXPECT(network.getCurrentQueueCount() > 0); 205 KJ_EXPECT(network.getCurrentQueueSize() > 0); 206 // Oldest message was sent, next oldest should be 0 seconds old since we haven't incremented 207 // the clock yet. 208 KJ_EXPECT(network.getOutgoingMessageWaitTime() == 0 * kj::SECONDS); 209 210 // Let any I/O finish. 
211 kj::Promise<void>(kj::NEVER_DONE).poll(ioContext.waitScope); 212 213 // Now nothing is queued. 214 KJ_EXPECT(network.getCurrentQueueCount() == 0); 215 KJ_EXPECT(network.getCurrentQueueSize() == 0); 216 } 217 218 TEST(TwoPartyNetwork, Pipelining) { 219 auto ioContext = kj::setupAsyncIo(); 220 int callCount = 0; 221 int handleCount = 0; 222 int reverseCallCount = 0; // Calls back from server to client. 223 224 auto serverThread = runServer(*ioContext.provider, callCount, handleCount); 225 TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT); 226 auto rpcClient = makeRpcClient(network); 227 228 bool disconnected = false; 229 kj::Promise<void> disconnectPromise = network.onDisconnect().then([&]() { disconnected = true; }); 230 231 { 232 // Request the particular capability from the server. 233 auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER, 234 test::TestSturdyRefObjectId::Tag::TEST_PIPELINE).castAs<test::TestPipeline>(); 235 236 { 237 // Use the capability. 238 auto request = client.getCapRequest(); 239 request.setN(234); 240 request.setInCap(kj::heap<TestInterfaceImpl>(reverseCallCount)); 241 242 auto promise = request.send(); 243 244 auto pipelineRequest = promise.getOutBox().getCap().fooRequest(); 245 pipelineRequest.setI(321); 246 auto pipelinePromise = pipelineRequest.send(); 247 248 auto pipelineRequest2 = promise.getOutBox().getCap() 249 .castAs<test::TestExtends>().graultRequest(); 250 auto pipelinePromise2 = pipelineRequest2.send(); 251 252 promise = nullptr; // Just to be annoying, drop the original promise. 
253 254 EXPECT_EQ(0, callCount); 255 EXPECT_EQ(0, reverseCallCount); 256 257 auto response = pipelinePromise.wait(ioContext.waitScope); 258 EXPECT_EQ("bar", response.getX()); 259 260 auto response2 = pipelinePromise2.wait(ioContext.waitScope); 261 checkTestMessage(response2); 262 263 EXPECT_EQ(3, callCount); 264 EXPECT_EQ(1, reverseCallCount); 265 } 266 267 EXPECT_FALSE(disconnected); 268 269 // What if we disconnect? 270 // TODO(cleanup): This is kind of cheating, we are shutting down the underlying socket to 271 // simulate a disconnect, but it's weird to pull the rug out from under our VatNetwork like 272 // this and it causes a bit of a race between write failures and read failures. This part of 273 // the test should maybe be restructured. 274 serverThread.pipe->shutdownWrite(); 275 276 // The other side should also disconnect. 277 disconnectPromise.wait(ioContext.waitScope); 278 279 { 280 // Use the now-broken capability. 281 auto request = client.getCapRequest(); 282 request.setN(234); 283 request.setInCap(kj::heap<TestInterfaceImpl>(reverseCallCount)); 284 285 auto promise = request.send(); 286 287 auto pipelineRequest = promise.getOutBox().getCap().fooRequest(); 288 pipelineRequest.setI(321); 289 auto pipelinePromise = pipelineRequest.send(); 290 291 auto pipelineRequest2 = promise.getOutBox().getCap() 292 .castAs<test::TestExtends>().graultRequest(); 293 auto pipelinePromise2 = pipelineRequest2.send(); 294 295 pipelinePromise.then([](auto) { 296 KJ_FAIL_EXPECT("should have thrown"); 297 }, [](kj::Exception&& e) { 298 KJ_EXPECT(e.getType() == kj::Exception::Type::DISCONNECTED); 299 // I wish we could test stack traces somehow... oh well. 300 }).wait(ioContext.waitScope); 301 302 pipelinePromise2.then([](auto) { 303 KJ_FAIL_EXPECT("should have thrown"); 304 }, [](kj::Exception&& e) { 305 KJ_EXPECT(e.getType() == kj::Exception::Type::DISCONNECTED); 306 // I wish we could test stack traces somehow... oh well. 
307 }).wait(ioContext.waitScope); 308 309 EXPECT_EQ(3, callCount); 310 EXPECT_EQ(1, reverseCallCount); 311 } 312 } 313 } 314 315 TEST(TwoPartyNetwork, Release) { 316 auto ioContext = kj::setupAsyncIo(); 317 int callCount = 0; 318 int handleCount = 0; 319 320 auto serverThread = runServer(*ioContext.provider, callCount, handleCount); 321 TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT); 322 auto rpcClient = makeRpcClient(network); 323 324 // Request the particular capability from the server. 325 auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER, 326 test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF).castAs<test::TestMoreStuff>(); 327 328 auto handle1 = client.getHandleRequest().send().wait(ioContext.waitScope).getHandle(); 329 auto promise = client.getHandleRequest().send(); 330 auto handle2 = promise.wait(ioContext.waitScope).getHandle(); 331 332 EXPECT_EQ(2, handleCount); 333 334 handle1 = nullptr; 335 336 // There once was a bug where the last outgoing message (and any capabilities attached) would 337 // not get cleaned up (until a new message was sent). This appeared to be a bug in Release, 338 // because if a client received a message and then released a capability from it but then did 339 // not make any further calls, then the capability would not be released because the message 340 // introducing it remained the last server -> client message (because a "Release" message has 341 // no reply). Here we are explicitly trying to catch this bug. This proves tricky, because when 342 // we drop a reference on the client side, there's no particular way to wait for the release 343 // message to reach the server except to make a subsequent call and wait for the return -- but 344 // that would mask the bug. So, we wait spin waiting for handleCount to change. 
345 346 uint maxSpins = 1000; 347 348 while (handleCount > 1) { 349 ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope); 350 KJ_ASSERT(--maxSpins > 0); 351 } 352 EXPECT_EQ(1, handleCount); 353 354 handle2 = nullptr; 355 356 ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope); 357 EXPECT_EQ(1, handleCount); 358 359 promise = nullptr; 360 361 while (handleCount > 0) { 362 ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope); 363 KJ_ASSERT(--maxSpins > 0); 364 } 365 EXPECT_EQ(0, handleCount); 366 } 367 368 TEST(TwoPartyNetwork, Abort) { 369 // Verify that aborts are received. 370 371 auto ioContext = kj::setupAsyncIo(); 372 int callCount = 0; 373 int handleCount = 0; 374 375 auto serverThread = runServer(*ioContext.provider, callCount, handleCount); 376 TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT); 377 378 MallocMessageBuilder refMessage(128); 379 auto hostId = refMessage.initRoot<rpc::twoparty::VatId>(); 380 hostId.setSide(rpc::twoparty::Side::SERVER); 381 382 auto conn = KJ_ASSERT_NONNULL(network.connect(hostId)); 383 384 { 385 // Send an invalid message (Return to non-existent question). 
386 auto msg = conn->newOutgoingMessage(128); 387 auto body = msg->getBody().initAs<rpc::Message>().initReturn(); 388 body.setAnswerId(1234); 389 body.setCanceled(); 390 msg->send(); 391 } 392 393 auto reply = KJ_ASSERT_NONNULL(conn->receiveIncomingMessage().wait(ioContext.waitScope)); 394 EXPECT_EQ(rpc::Message::ABORT, reply->getBody().getAs<rpc::Message>().which()); 395 396 EXPECT_TRUE(conn->receiveIncomingMessage().wait(ioContext.waitScope) == nullptr); 397 } 398 399 TEST(TwoPartyNetwork, ConvenienceClasses) { 400 auto ioContext = kj::setupAsyncIo(); 401 402 int callCount = 0; 403 TwoPartyServer server(kj::heap<TestInterfaceImpl>(callCount)); 404 405 auto address = ioContext.provider->getNetwork() 406 .parseAddress("127.0.0.1").wait(ioContext.waitScope); 407 408 auto listener = address->listen(); 409 auto listenPromise = server.listen(*listener); 410 411 address = ioContext.provider->getNetwork() 412 .parseAddress("127.0.0.1", listener->getPort()).wait(ioContext.waitScope); 413 414 auto connection = address->connect().wait(ioContext.waitScope); 415 TwoPartyClient client(*connection); 416 auto cap = client.bootstrap().castAs<test::TestInterface>(); 417 418 auto request = cap.fooRequest(); 419 request.setI(123); 420 request.setJ(true); 421 EXPECT_EQ(0, callCount); 422 auto response = request.send().wait(ioContext.waitScope); 423 EXPECT_EQ("foo", response.getX()); 424 EXPECT_EQ(1, callCount); 425 } 426 427 TEST(TwoPartyNetwork, HugeMessage) { 428 auto ioContext = kj::setupAsyncIo(); 429 int callCount = 0; 430 int handleCount = 0; 431 432 auto serverThread = runServer(*ioContext.provider, callCount, handleCount); 433 TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT); 434 auto rpcClient = makeRpcClient(network); 435 436 auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER, 437 test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF).castAs<test::TestMoreStuff>(); 438 439 // Oversized request fails. 
440 { 441 auto req = client.methodWithDefaultsRequest(); 442 req.initA(100000000); // 100 MB 443 444 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("larger than our single-message size limit", 445 req.send().ignoreResult().wait(ioContext.waitScope)); 446 } 447 448 // Oversized response fails. 449 KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("larger than our single-message size limit", 450 client.getEnormousStringRequest().send().ignoreResult().wait(ioContext.waitScope)); 451 452 // Connection is still up. 453 { 454 auto req = client.getCallSequenceRequest(); 455 req.setExpected(0); 456 KJ_EXPECT(req.send().wait(ioContext.waitScope).getN() == 0); 457 } 458 } 459 460 class TestAuthenticatedBootstrapImpl final 461 : public test::TestAuthenticatedBootstrap<rpc::twoparty::VatId>::Server { 462 public: 463 TestAuthenticatedBootstrapImpl(rpc::twoparty::VatId::Reader clientId) { 464 this->clientId.setRoot(clientId); 465 } 466 467 protected: 468 kj::Promise<void> getCallerId(GetCallerIdContext context) override { 469 context.getResults().setCaller(clientId.getRoot<rpc::twoparty::VatId>()); 470 return kj::READY_NOW; 471 } 472 473 private: 474 MallocMessageBuilder clientId; 475 }; 476 477 class TestBootstrapFactory: public BootstrapFactory<rpc::twoparty::VatId> { 478 public: 479 Capability::Client createFor(rpc::twoparty::VatId::Reader clientId) { 480 called = true; 481 EXPECT_EQ(rpc::twoparty::Side::CLIENT, clientId.getSide()); 482 return kj::heap<TestAuthenticatedBootstrapImpl>(clientId); 483 } 484 485 bool called = false; 486 }; 487 488 kj::AsyncIoProvider::PipeThread runAuthenticatingServer( 489 kj::AsyncIoProvider& ioProvider, BootstrapFactory<rpc::twoparty::VatId>& bootstrapFactory) { 490 return ioProvider.newPipeThread([&bootstrapFactory]( 491 kj::AsyncIoProvider& ioProvider, kj::AsyncIoStream& stream, kj::WaitScope& waitScope) { 492 TwoPartyVatNetwork network(stream, rpc::twoparty::Side::SERVER); 493 auto server = makeRpcServer(network, bootstrapFactory); 494 
network.onDisconnect().wait(waitScope); 495 }); 496 } 497 498 TEST(TwoPartyNetwork, BootstrapFactory) { 499 auto ioContext = kj::setupAsyncIo(); 500 TestBootstrapFactory bootstrapFactory; 501 auto serverThread = runAuthenticatingServer(*ioContext.provider, bootstrapFactory); 502 TwoPartyClient client(*serverThread.pipe); 503 auto resp = client.bootstrap().castAs<test::TestAuthenticatedBootstrap<rpc::twoparty::VatId>>() 504 .getCallerIdRequest().send().wait(ioContext.waitScope); 505 EXPECT_EQ(rpc::twoparty::Side::CLIENT, resp.getCaller().getSide()); 506 EXPECT_TRUE(bootstrapFactory.called); 507 } 508 509 // ======================================================================================= 510 511 #if !_WIN32 && !__CYGWIN__ // Windows and Cygwin don't support SCM_RIGHTS. 512 KJ_TEST("send FD over RPC") { 513 auto io = kj::setupAsyncIo(); 514 515 int callCount = 0; 516 int handleCount = 0; 517 TwoPartyServer server(kj::heap<TestMoreStuffImpl>(callCount, handleCount)); 518 auto pipe = io.provider->newCapabilityPipe(); 519 server.accept(kj::mv(pipe.ends[0]), 2); 520 TwoPartyClient client(*pipe.ends[1], 2); 521 522 auto cap = client.bootstrap().castAs<test::TestMoreStuff>(); 523 524 int pipeFds[2]; 525 KJ_SYSCALL(kj::miniposix::pipe(pipeFds)); 526 kj::AutoCloseFd in1(pipeFds[0]); 527 kj::AutoCloseFd out1(pipeFds[1]); 528 KJ_SYSCALL(kj::miniposix::pipe(pipeFds)); 529 kj::AutoCloseFd in2(pipeFds[0]); 530 kj::AutoCloseFd out2(pipeFds[1]); 531 532 capnp::RemotePromise<test::TestMoreStuff::WriteToFdResults> promise = nullptr; 533 { 534 auto req = cap.writeToFdRequest(); 535 536 // Order reversal intentional, just trying to mix things up. 
537 req.setFdCap1(kj::heap<TestFdCap>(kj::mv(out2))); 538 req.setFdCap2(kj::heap<TestFdCap>(kj::mv(out1))); 539 540 promise = req.send(); 541 } 542 543 int in3 = KJ_ASSERT_NONNULL(promise.getFdCap3().getFd().wait(io.waitScope)); 544 KJ_EXPECT(io.lowLevelProvider->wrapInputFd(kj::mv(in3))->readAllText().wait(io.waitScope) 545 == "baz"); 546 547 { 548 auto promise2 = kj::mv(promise); // make sure the PipelineHook also goes out of scope 549 auto response = promise2.wait(io.waitScope); 550 KJ_EXPECT(response.getSecondFdPresent()); 551 } 552 553 KJ_EXPECT(io.lowLevelProvider->wrapInputFd(kj::mv(in1))->readAllText().wait(io.waitScope) 554 == "bar"); 555 KJ_EXPECT(io.lowLevelProvider->wrapInputFd(kj::mv(in2))->readAllText().wait(io.waitScope) 556 == "foo"); 557 } 558 559 KJ_TEST("FD per message limit") { 560 auto io = kj::setupAsyncIo(); 561 562 int callCount = 0; 563 int handleCount = 0; 564 TwoPartyServer server(kj::heap<TestMoreStuffImpl>(callCount, handleCount)); 565 auto pipe = io.provider->newCapabilityPipe(); 566 server.accept(kj::mv(pipe.ends[0]), 1); 567 TwoPartyClient client(*pipe.ends[1], 1); 568 569 auto cap = client.bootstrap().castAs<test::TestMoreStuff>(); 570 571 int pipeFds[2]; 572 KJ_SYSCALL(kj::miniposix::pipe(pipeFds)); 573 kj::AutoCloseFd in1(pipeFds[0]); 574 kj::AutoCloseFd out1(pipeFds[1]); 575 KJ_SYSCALL(kj::miniposix::pipe(pipeFds)); 576 kj::AutoCloseFd in2(pipeFds[0]); 577 kj::AutoCloseFd out2(pipeFds[1]); 578 579 capnp::RemotePromise<test::TestMoreStuff::WriteToFdResults> promise = nullptr; 580 { 581 auto req = cap.writeToFdRequest(); 582 583 // Order reversal intentional, just trying to mix things up. 
584 req.setFdCap1(kj::heap<TestFdCap>(kj::mv(out2))); 585 req.setFdCap2(kj::heap<TestFdCap>(kj::mv(out1))); 586 587 promise = req.send(); 588 } 589 590 int in3 = KJ_ASSERT_NONNULL(promise.getFdCap3().getFd().wait(io.waitScope)); 591 KJ_EXPECT(io.lowLevelProvider->wrapInputFd(kj::mv(in3))->readAllText().wait(io.waitScope) 592 == "baz"); 593 594 { 595 auto promise2 = kj::mv(promise); // make sure the PipelineHook also goes out of scope 596 auto response = promise2.wait(io.waitScope); 597 KJ_EXPECT(!response.getSecondFdPresent()); 598 } 599 600 KJ_EXPECT(io.lowLevelProvider->wrapInputFd(kj::mv(in1))->readAllText().wait(io.waitScope) 601 == ""); 602 KJ_EXPECT(io.lowLevelProvider->wrapInputFd(kj::mv(in2))->readAllText().wait(io.waitScope) 603 == "foo"); 604 } 605 #endif // !_WIN32 && !__CYGWIN__ 606 607 // ======================================================================================= 608 609 class MockSndbufStream final: public kj::AsyncIoStream { 610 public: 611 MockSndbufStream(kj::Own<AsyncIoStream> inner, size_t& window, size_t& written) 612 : inner(kj::mv(inner)), window(window), written(written) {} 613 614 kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override { 615 return inner->read(buffer, minBytes, maxBytes); 616 } 617 kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { 618 return inner->tryRead(buffer, minBytes, maxBytes); 619 } 620 kj::Maybe<uint64_t> tryGetLength() override { 621 return inner->tryGetLength(); 622 } 623 kj::Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override { 624 return inner->pumpTo(output, amount); 625 } 626 kj::Promise<void> write(const void* buffer, size_t size) override { 627 written += size; 628 return inner->write(buffer, size); 629 } 630 kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override { 631 for (auto& piece: pieces) written += piece.size(); 632 return inner->write(pieces); 633 } 634 
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom( 635 kj::AsyncInputStream& input, uint64_t amount) override { 636 return inner->tryPumpFrom(input, amount); 637 } 638 kj::Promise<void> whenWriteDisconnected() override { return inner->whenWriteDisconnected(); } 639 void shutdownWrite() override { return inner->shutdownWrite(); } 640 void abortRead() override { return inner->abortRead(); } 641 642 void getsockopt(int level, int option, void* value, uint* length) override { 643 if (level == SOL_SOCKET && option == SO_SNDBUF) { 644 KJ_ASSERT(*length == sizeof(int)); 645 *reinterpret_cast<int*>(value) = window; 646 } else { 647 KJ_UNIMPLEMENTED("not implemented for test", level, option); 648 } 649 } 650 651 private: 652 kj::Own<AsyncIoStream> inner; 653 size_t& window; 654 size_t& written; 655 }; 656 657 KJ_TEST("Streaming over RPC") { 658 kj::EventLoop loop; 659 kj::WaitScope waitScope(loop); 660 661 auto pipe = kj::newTwoWayPipe(); 662 663 size_t window = 1024; 664 size_t clientWritten = 0; 665 size_t serverWritten = 0; 666 667 pipe.ends[0] = kj::heap<MockSndbufStream>(kj::mv(pipe.ends[0]), window, clientWritten); 668 pipe.ends[1] = kj::heap<MockSndbufStream>(kj::mv(pipe.ends[1]), window, serverWritten); 669 670 auto ownServer = kj::heap<TestStreamingImpl>(); 671 auto& server = *ownServer; 672 test::TestStreaming::Client serverCap(kj::mv(ownServer)); 673 674 TwoPartyClient tpClient(*pipe.ends[0]); 675 TwoPartyClient tpServer(*pipe.ends[1], serverCap, rpc::twoparty::Side::SERVER); 676 677 auto cap = tpClient.bootstrap().castAs<test::TestStreaming>(); 678 679 // Send stream requests until we can't anymore. 680 kj::Promise<void> promise = kj::READY_NOW; 681 uint count = 0; 682 while (promise.poll(waitScope)) { 683 promise.wait(waitScope); 684 685 auto req = cap.doStreamIRequest(); 686 req.setI(++count); 687 promise = req.send(); 688 } 689 690 // We should have sent... several. 
691 KJ_EXPECT(count > 5); 692 693 // Now, cause calls to finish server-side one-at-a-time and check that this causes the client 694 // side to be willing to send more. 695 uint countReceived = 0; 696 for (uint i = 0; i < 50; i++) { 697 KJ_EXPECT(server.iSum == ++countReceived); 698 server.iSum = 0; 699 KJ_ASSERT_NONNULL(server.fulfiller)->fulfill(); 700 701 KJ_ASSERT(promise.poll(waitScope)); 702 promise.wait(waitScope); 703 704 auto req = cap.doStreamIRequest(); 705 req.setI(++count); 706 promise = req.send(); 707 if (promise.poll(waitScope)) { 708 // We'll see a couple of instances where completing one request frees up space to make two 709 // more. This is because the first few requests we made are a little bit larger than the 710 // rest due to being pipelined on the bootstrap. Once the bootstrap resolves, the request 711 // size gets smaller. 712 promise.wait(waitScope); 713 req = cap.doStreamIRequest(); 714 req.setI(++count); 715 promise = req.send(); 716 717 // We definitely shouldn't have freed up stream space for more than two additional requests! 718 KJ_ASSERT(!promise.poll(waitScope)); 719 } 720 } 721 } 722 723 KJ_TEST("Streaming over RPC then unwrap with CapabilitySet") { 724 kj::EventLoop loop; 725 kj::WaitScope waitScope(loop); 726 727 auto pipe = kj::newTwoWayPipe(); 728 729 CapabilityServerSet<test::TestStreaming> capSet; 730 731 auto ownServer = kj::heap<TestStreamingImpl>(); 732 auto& server = *ownServer; 733 auto serverCap = capSet.add(kj::mv(ownServer)); 734 735 auto paf = kj::newPromiseAndFulfiller<test::TestStreaming::Client>(); 736 737 TwoPartyClient tpClient(*pipe.ends[0], serverCap); 738 TwoPartyClient tpServer(*pipe.ends[1], kj::mv(paf.promise), rpc::twoparty::Side::SERVER); 739 740 auto clientCap = tpClient.bootstrap().castAs<test::TestStreaming>(); 741 742 // Send stream requests until we can't anymore. 
743 kj::Promise<void> promise = kj::READY_NOW; 744 uint count = 0; 745 while (promise.poll(waitScope)) { 746 promise.wait(waitScope); 747 748 auto req = clientCap.doStreamIRequest(); 749 req.setI(++count); 750 promise = req.send(); 751 } 752 753 // We should have sent... several. 754 KJ_EXPECT(count > 10); 755 756 // Now try to unwrap. 757 auto unwrapPromise = capSet.getLocalServer(clientCap); 758 759 // It won't work yet, obviously, because we haven't resolved the promise. 760 KJ_EXPECT(!unwrapPromise.poll(waitScope)); 761 762 // So do that. 763 paf.fulfiller->fulfill(tpServer.bootstrap().castAs<test::TestStreaming>()); 764 clientCap.whenResolved().wait(waitScope); 765 766 // But the unwrap still doesn't resolve because streaming requests are queued up. 767 KJ_EXPECT(!unwrapPromise.poll(waitScope)); 768 769 // OK, let's resolve a streaming request. 770 KJ_ASSERT_NONNULL(server.fulfiller)->fulfill(); 771 772 // All of our call promises have now completed from the client's perspective. 773 promise.wait(waitScope); 774 775 // But we still can't unwrap, because calls are queued server-side. 776 KJ_EXPECT(!unwrapPromise.poll(waitScope)); 777 778 // Let's even make one more call now. But this is actually a local call since the promise 779 // resolved. 780 { 781 auto req = clientCap.doStreamIRequest(); 782 req.setI(++count); 783 promise = req.send(); 784 } 785 786 // Because it's a local call, it doesn't resolve early. The window is no longer in effect. 787 KJ_EXPECT(!promise.poll(waitScope)); 788 KJ_ASSERT_NONNULL(server.fulfiller)->fulfill(); 789 KJ_EXPECT(!promise.poll(waitScope)); 790 KJ_ASSERT_NONNULL(server.fulfiller)->fulfill(); 791 KJ_EXPECT(!promise.poll(waitScope)); 792 KJ_ASSERT_NONNULL(server.fulfiller)->fulfill(); 793 KJ_EXPECT(!promise.poll(waitScope)); 794 KJ_ASSERT_NONNULL(server.fulfiller)->fulfill(); 795 KJ_EXPECT(!promise.poll(waitScope)); 796 797 // Our unwrap promise is also still not resolved. 
798 KJ_EXPECT(!unwrapPromise.poll(waitScope)); 799 800 // Close out stream calls until it does resolve! 801 while (!unwrapPromise.poll(waitScope)) { 802 KJ_ASSERT_NONNULL(server.fulfiller)->fulfill(); 803 } 804 805 // Now we can unwrap! 806 KJ_EXPECT(&KJ_ASSERT_NONNULL(unwrapPromise.wait(waitScope)) == &server); 807 808 // But our last stream call still isn't done. 809 KJ_EXPECT(!promise.poll(waitScope)); 810 811 // Finish it. 812 KJ_ASSERT_NONNULL(server.fulfiller)->fulfill(); 813 promise.wait(waitScope); 814 } 815 816 KJ_TEST("promise cap resolves between starting request and sending it") { 817 kj::EventLoop loop; 818 kj::WaitScope waitScope(loop); 819 auto pipe = kj::newTwoWayPipe(); 820 821 // Client exports TestCallOrderImpl as its bootstrap. 822 TwoPartyClient client(*pipe.ends[0], kj::heap<TestCallOrderImpl>(), rpc::twoparty::Side::CLIENT); 823 824 // Server exports a promise, which will later resolve to loop back to the capability the client 825 // exported. 826 auto paf = kj::newPromiseAndFulfiller<Capability::Client>(); 827 TwoPartyClient server(*pipe.ends[1], kj::mv(paf.promise), rpc::twoparty::Side::SERVER); 828 829 // Create a request but don't send it yet. 830 auto cap = client.bootstrap().castAs<test::TestCallOrder>(); 831 auto req1 = cap.getCallSequenceRequest(); 832 833 // Fulfill the promise now so that the server's bootstrap loops back to the client's bootstrap. 834 paf.fulfiller->fulfill(server.bootstrap()); 835 cap.whenResolved().wait(waitScope); 836 837 // Send the request we created earlier, and also create and send a second request. 838 auto promise1 = req1.send(); 839 auto promise2 = cap.getCallSequenceRequest().send(); 840 841 // They should arrive in order of send()s. 
842 auto n1 = promise1.wait(waitScope).getN(); 843 KJ_EXPECT(n1 == 0, n1); 844 auto n2 = promise2.wait(waitScope).getN(); 845 KJ_EXPECT(n2 == 1, n2); 846 } 847 848 KJ_TEST("write error propagates to read error") { 849 kj::EventLoop loop; 850 kj::WaitScope waitScope(loop); 851 auto frontPipe = kj::newTwoWayPipe(); 852 auto backPipe = kj::newTwoWayPipe(); 853 854 TwoPartyClient client(*frontPipe.ends[0]); 855 856 int callCount; 857 TwoPartyClient server(*backPipe.ends[1], kj::heap<TestInterfaceImpl>(callCount), 858 rpc::twoparty::Side::SERVER); 859 860 auto pumpUpTask = frontPipe.ends[1]->pumpTo(*backPipe.ends[0]); 861 auto pumpDownTask = backPipe.ends[0]->pumpTo(*frontPipe.ends[1]); 862 863 auto cap = client.bootstrap().castAs<test::TestInterface>(); 864 865 // Make sure the connections work. 866 { 867 auto req = cap.fooRequest(); 868 req.setI(123); 869 req.setJ(true); 870 auto resp = req.send().wait(waitScope); 871 EXPECT_EQ("foo", resp.getX()); 872 } 873 874 // Disconnect upstream task in such a way that future writes on the client will fail, but the 875 // server doesn't notice the disconnect and so won't react. 876 pumpUpTask = nullptr; 877 frontPipe.ends[1]->abortRead(); // causes write() on ends[0] to fail in the future 878 879 { 880 auto req = cap.fooRequest(); 881 req.setI(123); 882 req.setJ(true); 883 auto promise = req.send().then([](auto) { 884 KJ_FAIL_EXPECT("expected exception"); 885 }, [](kj::Exception&& e) { 886 KJ_ASSERT(e.getDescription() == "abortRead() has been called"); 887 }); 888 889 KJ_ASSERT(promise.poll(waitScope)); 890 promise.wait(waitScope); 891 } 892 } 893 894 } // namespace 895 } // namespace _ 896 } // namespace capnp