rpc.c++ (131328B)
1 // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 
21 22 #include "rpc.h" 23 #include "message.h" 24 #include <kj/debug.h> 25 #include <kj/vector.h> 26 #include <kj/async.h> 27 #include <kj/one-of.h> 28 #include <kj/function.h> 29 #include <functional> // std::greater 30 #include <unordered_map> 31 #include <map> 32 #include <queue> 33 #include <capnp/rpc.capnp.h> 34 #include <kj/io.h> 35 #include <kj/map.h> 36 37 namespace capnp { 38 namespace _ { // private 39 40 namespace { 41 42 template <typename T> 43 inline constexpr uint messageSizeHint() { 44 return 1 + sizeInWords<rpc::Message>() + sizeInWords<T>(); 45 } 46 template <> 47 inline constexpr uint messageSizeHint<void>() { 48 return 1 + sizeInWords<rpc::Message>(); 49 } 50 51 constexpr const uint MESSAGE_TARGET_SIZE_HINT = sizeInWords<rpc::MessageTarget>() + 52 sizeInWords<rpc::PromisedAnswer>() + 16; // +16 for ops; hope that's enough 53 54 constexpr const uint CAP_DESCRIPTOR_SIZE_HINT = sizeInWords<rpc::CapDescriptor>() + 55 sizeInWords<rpc::PromisedAnswer>(); 56 57 constexpr const uint64_t MAX_SIZE_HINT = 1 << 20; 58 59 uint copySizeHint(MessageSize size) { 60 uint64_t sizeHint = size.wordCount + size.capCount * CAP_DESCRIPTOR_SIZE_HINT 61 // if capCount > 0, the cap descriptor list has a 1-word tag 62 + (size.capCount > 0); 63 return kj::min(MAX_SIZE_HINT, sizeHint); 64 } 65 66 uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint, uint additional) { 67 KJ_IF_MAYBE(s, sizeHint) { 68 return copySizeHint(*s) + additional; 69 } else { 70 return 0; 71 } 72 } 73 74 kj::Maybe<kj::Array<PipelineOp>> toPipelineOps(List<rpc::PromisedAnswer::Op>::Reader ops) { 75 auto result = kj::heapArrayBuilder<PipelineOp>(ops.size()); 76 for (auto opReader: ops) { 77 PipelineOp op; 78 switch (opReader.which()) { 79 case rpc::PromisedAnswer::Op::NOOP: 80 op.type = PipelineOp::NOOP; 81 break; 82 case rpc::PromisedAnswer::Op::GET_POINTER_FIELD: 83 op.type = PipelineOp::GET_POINTER_FIELD; 84 op.pointerIndex = opReader.getGetPointerField(); 85 break; 86 default: 87 
KJ_FAIL_REQUIRE("Unsupported pipeline op.", (uint)opReader.which()) { 88 return nullptr; 89 } 90 } 91 result.add(op); 92 } 93 return result.finish(); 94 } 95 96 Orphan<List<rpc::PromisedAnswer::Op>> fromPipelineOps( 97 Orphanage orphanage, kj::ArrayPtr<const PipelineOp> ops) { 98 auto result = orphanage.newOrphan<List<rpc::PromisedAnswer::Op>>(ops.size()); 99 auto builder = result.get(); 100 for (uint i: kj::indices(ops)) { 101 rpc::PromisedAnswer::Op::Builder opBuilder = builder[i]; 102 switch (ops[i].type) { 103 case PipelineOp::NOOP: 104 opBuilder.setNoop(); 105 break; 106 case PipelineOp::GET_POINTER_FIELD: 107 opBuilder.setGetPointerField(ops[i].pointerIndex); 108 break; 109 } 110 } 111 return result; 112 } 113 114 kj::Exception toException(const rpc::Exception::Reader& exception) { 115 auto reason = [&]() { 116 if (exception.getReason().startsWith("remote exception: ")) { 117 return kj::str(exception.getReason()); 118 } else { 119 return kj::str("remote exception: ", exception.getReason()); 120 } 121 }(); 122 123 kj::Exception result(static_cast<kj::Exception::Type>(exception.getType()), 124 "(remote)", 0, kj::mv(reason)); 125 if (exception.hasTrace()) { 126 result.setRemoteTrace(kj::str(exception.getTrace())); 127 } 128 return result; 129 } 130 131 void fromException(const kj::Exception& exception, rpc::Exception::Builder builder, 132 kj::Maybe<kj::Function<kj::String(const kj::Exception&)>&> traceEncoder) { 133 kj::StringPtr description = exception.getDescription(); 134 135 // Include context, if any. 
136 kj::Vector<kj::String> contextLines; 137 for (auto context = exception.getContext();;) { 138 KJ_IF_MAYBE(c, context) { 139 contextLines.add(kj::str("context: ", c->file, ": ", c->line, ": ", c->description)); 140 context = c->next; 141 } else { 142 break; 143 } 144 } 145 kj::String scratch; 146 if (contextLines.size() > 0) { 147 scratch = kj::str(description, '\n', kj::strArray(contextLines, "\n")); 148 description = scratch; 149 } 150 151 builder.setReason(description); 152 builder.setType(static_cast<rpc::Exception::Type>(exception.getType())); 153 154 KJ_IF_MAYBE(t, traceEncoder) { 155 builder.setTrace((*t)(exception)); 156 } 157 158 if (exception.getType() == kj::Exception::Type::FAILED && 159 !exception.getDescription().startsWith("remote exception:")) { 160 KJ_LOG(INFO, "returning failure over rpc", exception); 161 } 162 } 163 164 uint exceptionSizeHint(const kj::Exception& exception) { 165 return sizeInWords<rpc::Exception>() + exception.getDescription().size() / sizeof(word) + 1; 166 } 167 168 // ======================================================================================= 169 170 template <typename Id, typename T> 171 class ExportTable { 172 // Table mapping integers to T, where the integers are chosen locally. 173 174 public: 175 kj::Maybe<T&> find(Id id) { 176 if (id < slots.size() && slots[id] != nullptr) { 177 return slots[id]; 178 } else { 179 return nullptr; 180 } 181 } 182 183 T erase(Id id, T& entry) { 184 // Remove an entry from the table and return it. We return it so that the caller can be 185 // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense. 186 // `entry` is a reference to the entry being released -- we require this in order to prove 187 // that the caller has already done a find() to check that this entry exists. We can't check 188 // ourselves because the caller may have nullified the entry in the meantime. 
189 KJ_DREQUIRE(&entry == &slots[id]); 190 T toRelease = kj::mv(slots[id]); 191 slots[id] = T(); 192 freeIds.push(id); 193 return toRelease; 194 } 195 196 T& next(Id& id) { 197 if (freeIds.empty()) { 198 id = slots.size(); 199 return slots.add(); 200 } else { 201 id = freeIds.top(); 202 freeIds.pop(); 203 return slots[id]; 204 } 205 } 206 207 template <typename Func> 208 void forEach(Func&& func) { 209 for (Id i = 0; i < slots.size(); i++) { 210 if (slots[i] != nullptr) { 211 func(i, slots[i]); 212 } 213 } 214 } 215 216 private: 217 kj::Vector<T> slots; 218 std::priority_queue<Id, std::vector<Id>, std::greater<Id>> freeIds; 219 }; 220 221 template <typename Id, typename T> 222 class ImportTable { 223 // Table mapping integers to T, where the integers are chosen remotely. 224 225 public: 226 T& operator[](Id id) { 227 if (id < kj::size(low)) { 228 return low[id]; 229 } else { 230 return high[id]; 231 } 232 } 233 234 kj::Maybe<T&> find(Id id) { 235 if (id < kj::size(low)) { 236 return low[id]; 237 } else { 238 auto iter = high.find(id); 239 if (iter == high.end()) { 240 return nullptr; 241 } else { 242 return iter->second; 243 } 244 } 245 } 246 247 T erase(Id id) { 248 // Remove an entry from the table and return it. We return it so that the caller can be 249 // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense. 
250 if (id < kj::size(low)) { 251 T toRelease = kj::mv(low[id]); 252 low[id] = T(); 253 return toRelease; 254 } else { 255 T toRelease = kj::mv(high[id]); 256 high.erase(id); 257 return toRelease; 258 } 259 } 260 261 template <typename Func> 262 void forEach(Func&& func) { 263 for (Id i: kj::indices(low)) { 264 func(i, low[i]); 265 } 266 for (auto& entry: high) { 267 func(entry.first, entry.second); 268 } 269 } 270 271 private: 272 T low[16]; 273 std::unordered_map<Id, T> high; 274 }; 275 276 // ======================================================================================= 277 278 class RpcConnectionState final: public kj::TaskSet::ErrorHandler, public kj::Refcounted { 279 public: 280 struct DisconnectInfo { 281 kj::Promise<void> shutdownPromise; 282 // Task which is working on sending an abort message and cleanly ending the connection. 283 }; 284 285 RpcConnectionState(BootstrapFactoryBase& bootstrapFactory, 286 kj::Maybe<SturdyRefRestorerBase&> restorer, 287 kj::Own<VatNetworkBase::Connection>&& connectionParam, 288 kj::Own<kj::PromiseFulfiller<DisconnectInfo>>&& disconnectFulfiller, 289 size_t flowLimit, 290 kj::Maybe<kj::Function<kj::String(const kj::Exception&)>&> traceEncoder) 291 : bootstrapFactory(bootstrapFactory), 292 restorer(restorer), disconnectFulfiller(kj::mv(disconnectFulfiller)), flowLimit(flowLimit), 293 traceEncoder(traceEncoder), tasks(*this) { 294 connection.init<Connected>(kj::mv(connectionParam)); 295 tasks.add(messageLoop()); 296 } 297 298 kj::Own<ClientHook> restore(AnyPointer::Reader objectId) { 299 if (connection.is<Disconnected>()) { 300 return newBrokenCap(kj::cp(connection.get<Disconnected>())); 301 } 302 303 QuestionId questionId; 304 auto& question = questions.next(questionId); 305 306 question.isAwaitingReturn = true; 307 308 auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>(); 309 310 auto questionRef = kj::refcounted<QuestionRef>(*this, questionId, kj::mv(paf.fulfiller)); 311 question.selfRef = 
        *questionRef;

    paf.promise = paf.promise.attach(kj::addRef(*questionRef));

    {
      auto message = connection.get<Connected>()->newOutgoingMessage(
          objectId.targetSize().wordCount + messageSizeHint<rpc::Bootstrap>());

      auto builder = message->getBody().initAs<rpc::Message>().initBootstrap();
      builder.setQuestionId(questionId);
      builder.getDeprecatedObjectId().set(objectId);

      message->send();
    }

    auto pipeline = kj::refcounted<RpcPipeline>(*this, kj::mv(questionRef), kj::mv(paf.promise));

    // An empty ops array selects the root of the eventual response.
    return pipeline->getPipelinedCap(kj::Array<const PipelineOp>(nullptr));
  }

  void taskFailed(kj::Exception&& exception) override {
    // Any failure in a background task is treated as fatal to the connection.
    disconnect(kj::mv(exception));
  }

  void disconnect(kj::Exception&& exception) {
    // Tear down the connection: reject everything in the four tables, send a
    // best-effort Abort, then replace `connection` with the exception so all
    // future calls fail with it.
    //
    // After disconnect(), the RpcSystem could be destroyed, making `traceEncoder` a dangling
    // reference, so null it out before we return from here. We don't need it anymore once
    // disconnected anyway.
    KJ_DEFER(traceEncoder = nullptr);

    if (!connection.is<Connected>()) {
      // Already disconnected.
      return;
    }

    kj::Exception networkException(kj::Exception::Type::DISCONNECTED,
        exception.getFile(), exception.getLine(), kj::heapString(exception.getDescription()));

    // Don't throw away the stack trace.
    if (exception.getRemoteTrace() != nullptr) {
      networkException.setRemoteTrace(kj::str(exception.getRemoteTrace()));
    }
    for (void* addr: exception.getStackTrace()) {
      networkException.addTrace(addr);
    }
    // If your stack trace points here, it means that the exception became the reason that the
    // RPC connection was disconnected. The exception was then thrown by all in-flight calls and
    // all future calls on this connection.
    networkException.addTraceHere();

    KJ_IF_MAYBE(newException, kj::runCatchingExceptions([&]() {
      // Carefully pull all the objects out of the tables prior to releasing them because their
      // destructors could come back and mess with the tables.
      kj::Vector<kj::Own<PipelineHook>> pipelinesToRelease;
      kj::Vector<kj::Own<ClientHook>> clientsToRelease;
      kj::Vector<kj::Promise<kj::Own<RpcResponse>>> tailCallsToRelease;
      kj::Vector<kj::Promise<void>> resolveOpsToRelease;

      // All current questions complete with exceptions.
      questions.forEach([&](QuestionId id, Question& question) {
        KJ_IF_MAYBE(questionRef, question.selfRef) {
          // QuestionRef still present.
          questionRef->reject(kj::cp(networkException));
        }
      });

      answers.forEach([&](AnswerId id, Answer& answer) {
        KJ_IF_MAYBE(p, answer.pipeline) {
          pipelinesToRelease.add(kj::mv(*p));
        }

        KJ_IF_MAYBE(promise, answer.redirectedResults) {
          tailCallsToRelease.add(kj::mv(*promise));
        }

        KJ_IF_MAYBE(context, answer.callContext) {
          context->requestCancel();
        }
      });

      exports.forEach([&](ExportId id, Export& exp) {
        clientsToRelease.add(kj::mv(exp.clientHook));
        KJ_IF_MAYBE(op, exp.resolveOp) {
          resolveOpsToRelease.add(kj::mv(*op));
        }
        exp = Export();
      });

      imports.forEach([&](ImportId id, Import& import) {
        KJ_IF_MAYBE(f, import.promiseFulfiller) {
          f->get()->reject(kj::cp(networkException));
        }
      });

      embargoes.forEach([&](EmbargoId id, Embargo& embargo) {
        KJ_IF_MAYBE(f, embargo.fulfiller) {
          f->get()->reject(kj::cp(networkException));
        }
      });
    })) {
      // Some destructor must have thrown an exception. There is no appropriate place to report
      // these errors.
      KJ_LOG(ERROR, "Uncaught exception when destroying capabilities dropped by disconnect.",
             *newException);
    }

    // Send an abort message, but ignore failure.
    kj::runCatchingExceptions([&]() {
      auto message = connection.get<Connected>()->newOutgoingMessage(
          messageSizeHint<void>() + exceptionSizeHint(exception));
      // NOTE(review): two-argument fromException — presumably an overload declared
      // elsewhere in this file; the three-argument form is defined above. Verify.
      fromException(exception, message->getBody().getAs<rpc::Message>().initAbort());
      message->send();
    });

    // Indicate disconnect.
    auto shutdownPromise = connection.get<Connected>()->shutdown()
        .attach(kj::mv(connection.get<Connected>()))
        .then([]() -> kj::Promise<void> { return kj::READY_NOW; },
              [origException = kj::mv(exception)](kj::Exception&& e) -> kj::Promise<void> {
          // Don't report disconnects as an error.
          if (e.getType() == kj::Exception::Type::DISCONNECTED) {
            return kj::READY_NOW;
          }
          // If the error is just what was passed in to disconnect(), don't report it back out
          // since it shouldn't be anything the caller doesn't already know about.
          if (e.getType() == origException.getType() &&
              e.getDescription() == origException.getDescription()) {
            return kj::READY_NOW;
          }
          return kj::mv(e);
        });
    disconnectFulfiller->fulfill(DisconnectInfo { kj::mv(shutdownPromise) });
    connection.init<Disconnected>(kj::mv(networkException));
    canceler.cancel(networkException);
  }

  void setFlowLimit(size_t words) {
    // Adjust the incoming-call flow limit and wake the message loop if it was
    // blocked waiting for callWordsInFlight to drop below the old limit.
    flowLimit = words;
    maybeUnblockFlow();
  }

private:
  class RpcClient;
  class ImportClient;
  class PromiseClient;
  class QuestionRef;
  class RpcPipeline;
  class RpcCallContext;
  class RpcResponse;

  // =======================================================================================
  // The Four Tables entry types
  //
  // We have to define these before we can define the class's fields.

  typedef uint32_t QuestionId;
  typedef QuestionId AnswerId;
  typedef uint32_t ExportId;
  typedef ExportId ImportId;
  // See equivalent definitions in rpc.capnp.
  //
  // We always use the type that refers to the local table of the same name. So e.g.
  // although
  // QuestionId and AnswerId are the same type, we use QuestionId when referring to an entry in
  // the local question table (which corresponds to the peer's answer table) and use AnswerId
  // to refer to an entry in our answer table (which corresponds to the peer's question table).
  // Since all messages in the RPC protocol are defined from the sender's point of view, this
  // means that any time we read an ID from a received message, its type should invert.
  // TODO(cleanup): Perhaps we could enforce that in a type-safe way? Hmm...

  struct Question {
    // Entry in the question table: a call we sent, awaiting the peer's Return.

    kj::Array<ExportId> paramExports;
    // List of exports that were sent in the request. If the response has `releaseParamCaps` these
    // will need to be released.

    kj::Maybe<QuestionRef&> selfRef;
    // The local QuestionRef, set to nullptr when it is destroyed, which is also when `Finish` is
    // sent.

    bool isAwaitingReturn = false;
    // True from when `Call` is sent until `Return` is received.

    bool isTailCall = false;
    // Is this a tail call? If so, we don't expect to receive results in the `Return`.

    bool skipFinish = false;
    // If true, don't send a Finish message.

    inline bool operator==(decltype(nullptr)) const {
      // A question slot is "vacant" once no Return is pending and the QuestionRef is gone.
      return !isAwaitingReturn && selfRef == nullptr;
    }
    inline bool operator!=(decltype(nullptr)) const { return !operator==(nullptr); }
  };

  struct Answer {
    // Entry in the answer table: a call we received and are servicing.

    Answer() = default;
    Answer(const Answer&) = delete;
    Answer(Answer&&) = default;
    Answer& operator=(Answer&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

    bool active = false;
    // True from the point when the Call message is received to the point when both the `Finish`
    // message has been received and the `Return` has been sent.

    kj::Maybe<kj::Own<PipelineHook>> pipeline;
    // Send pipelined calls here. Becomes null as soon as a `Finish` is received.

    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> redirectedResults;
    // For locally-redirected calls (Call.sendResultsTo.yourself), this is a promise for the call
    // result, to be picked up by a subsequent `Return`.

    kj::Maybe<RpcCallContext&> callContext;
    // The call context, if it's still active. Becomes null when the `Return` message is sent.
    // This object, if non-null, is owned by `asyncOp`.

    kj::Array<ExportId> resultExports;
    // List of exports that were sent in the results. If the finish has `releaseResultCaps` these
    // will need to be released.
  };

  struct Export {
    // Entry in the export table: a local capability exposed to the peer.

    uint refcount = 0;
    // When this reaches 0, drop `clientHook` and free this export.

    kj::Own<ClientHook> clientHook;

    kj::Maybe<kj::Promise<void>> resolveOp = nullptr;
    // If this export is a promise (not a settled capability), the `resolveOp` represents the
    // ongoing operation to wait for that promise to resolve and then send a `Resolve` message.

    inline bool operator==(decltype(nullptr)) const { return refcount == 0; }
    inline bool operator!=(decltype(nullptr)) const { return refcount != 0; }
  };

  struct Import {
    // Entry in the import table: a remote capability the peer exposed to us.

    Import() = default;
    Import(const Import&) = delete;
    Import(Import&&) = default;
    Import& operator=(Import&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

    kj::Maybe<ImportClient&> importClient;
    // Becomes null when the import is destroyed.

    kj::Maybe<RpcClient&> appClient;
    // Either a copy of importClient, or, in the case of promises, the wrapping PromiseClient.
    // Becomes null when it is discarded *or* when the import is destroyed (e.g. the promise is
    // resolved and the import is no longer needed).

    kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Own<ClientHook>>>> promiseFulfiller;
    // If non-null, the import is a promise.
  };

  typedef uint32_t EmbargoId;

  struct Embargo {
    // For handling the `Disembargo` message when looping back to self.

    kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller;
    // Fulfill this when the Disembargo arrives.

    inline bool operator==(decltype(nullptr)) const { return fulfiller == nullptr; }
    inline bool operator!=(decltype(nullptr)) const { return fulfiller != nullptr; }
  };

  // =======================================================================================
  // OK, now we can define RpcConnectionState's member data.

  BootstrapFactoryBase& bootstrapFactory;
  kj::Maybe<SturdyRefRestorerBase&> restorer;

  typedef kj::Own<VatNetworkBase::Connection> Connected;
  typedef kj::Exception Disconnected;
  kj::OneOf<Connected, Disconnected> connection;
  // Once the connection has failed, we drop it and replace it with an exception, which will be
  // thrown from all further calls.

  kj::Canceler canceler;
  // Will be canceled if and when `connection` is changed from `Connected` to `Disconnected`.
  // TODO(cleanup): `Connected` should be a struct that contains the connection and the Canceler,
  // but that's more refactoring than I want to do right now.

  kj::Own<kj::PromiseFulfiller<DisconnectInfo>> disconnectFulfiller;

  ExportTable<ExportId, Export> exports;
  ExportTable<QuestionId, Question> questions;
  ImportTable<AnswerId, Answer> answers;
  ImportTable<ImportId, Import> imports;
  // The Four Tables!
  // The order of the tables is important for correct destruction.

  std::unordered_map<ClientHook*, ExportId> exportsByCap;
  // Maps already-exported ClientHook objects to their ID in the export table.

  ExportTable<EmbargoId, Embargo> embargoes;
  // There are only four tables. This definitely isn't a fifth table. I don't know what you're
  // talking about.
  size_t flowLimit;
  size_t callWordsInFlight = 0;
  // Total message-body words of incoming calls currently being serviced,
  // compared against flowLimit to throttle the message loop.

  kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> flowWaiter;
  // If non-null, we're currently blocking incoming messages waiting for callWordsInFlight to drop
  // below flowLimit. Fulfill this to un-block.

  kj::Maybe<kj::Function<kj::String(const kj::Exception&)>&> traceEncoder;

  kj::TaskSet tasks;

  // =====================================================================================
  // ClientHook implementations

  class RpcClient: public ClientHook, public kj::Refcounted {
    // Base class for all ClientHooks that point (directly or indirectly) at a
    // capability reachable over this connection.

  public:
    RpcClient(RpcConnectionState& connectionState)
        : connectionState(kj::addRef(connectionState)) {}

    virtual kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor,
                                                kj::Vector<int>& fds) = 0;
    // Writes a CapDescriptor referencing this client. The CapDescriptor must be sent as part of
    // the very next message sent on the connection, as it may become invalid if other things
    // happen.
    //
    // If writing the descriptor adds a new export to the export table, or increments the refcount
    // on an existing one, then the ID is returned and the caller is responsible for removing it
    // later.

    virtual kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) = 0;
    // Writes the appropriate call target for calls to this capability and returns null.
    //
    // - OR -
    //
    // If calls have been redirected to some other local ClientHook, returns that hook instead.
    // This can happen if the capability represents a promise that has been resolved.

    virtual kj::Own<ClientHook> getInnermostClient() = 0;
    // If this client just wraps some other client -- even if it is only *temporarily* wrapping
    // that other client -- return a reference to the other client, transitively. Otherwise,
    // return a new reference to *this.

    virtual void adoptFlowController(kj::Own<RpcFlowController> flowController) {
      // Called when a PromiseClient resolves to another RpcClient. If streaming calls were
      // outstanding on the old client, we'd like to keep using the same FlowController on the new
      // client, so as to keep the flow steady.

      if (this->flowController == nullptr) {
        // We don't have any existing flowController so we can adopt this one, yay!
        this->flowController = kj::mv(flowController);
      } else {
        // Apparently, there is an existing flowController. This is an unusual scenario: Apparently
        // we had two stream capabilities, we were streaming to both of them, and they later
        // resolved to the same capability. This probably never happens because streaming use cases
        // normally call for there to be only one client. But, it's certainly possible, and we need
        // to handle it. We'll do the conservative thing and just make sure that all the calls
        // finish. This may mean we'll over-buffer temporarily; oh well.
        connectionState->tasks.add(flowController->waitAllAcked().attach(kj::mv(flowController)));
      }
    }

    // implements ClientHook -----------------------------------------

    Request<AnyPointer, AnyPointer> newCall(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
      return newCallNoIntercept(interfaceId, methodId, sizeHint);
    }

    Request<AnyPointer, AnyPointer> newCallNoIntercept(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) {
      // Build an outgoing Call request addressed at this capability; if the
      // connection is already dead, return a request that fails on send.
      if (!connectionState->connection.is<Connected>()) {
        return newBrokenRequest(kj::cp(connectionState->connection.get<Disconnected>()), sizeHint);
      }

      auto request = kj::heap<RpcRequest>(
          *connectionState, *connectionState->connection.get<Connected>(),
          sizeHint, kj::addRef(*this));
      auto callBuilder = request->getCall();

      callBuilder.setInterfaceId(interfaceId);
      callBuilder.setMethodId(methodId);

      auto root = request->getRoot();
      return Request<AnyPointer, AnyPointer>(root, kj::mv(request));
    }

    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
                                kj::Own<CallContextHook>&& context) override {
      return callNoIntercept(interfaceId, methodId, kj::mv(context));
    }

    VoidPromiseAndPipeline callNoIntercept(uint64_t interfaceId, uint16_t methodId,
                                           kj::Own<CallContextHook>&& context) {
      // Implement call() by copying params and results messages.

      auto params = context->getParams();
      auto request = newCallNoIntercept(interfaceId, methodId, params.targetSize());

      request.set(params);
      context->releaseParams();

      // We can and should propagate cancellation.
      context->allowCancellation();

      return context->directTailCall(RequestHook::from(kj::mv(request)));
    }

    kj::Own<ClientHook> addRef() override {
      return kj::addRef(*this);
    }
    const void* getBrand() override {
      // The brand is the connection identity: two RpcClients share a brand
      // iff they belong to the same RpcConnectionState.
      return connectionState.get();
    }

    kj::Own<RpcConnectionState> connectionState;

    kj::Maybe<kj::Own<RpcFlowController>> flowController;
    // Becomes non-null the first time a streaming call is made on this capability.
  };

  class ImportClient final: public RpcClient {
    // A ClientHook that wraps an entry in the import table.

  public:
    ImportClient(RpcConnectionState& connectionState, ImportId importId,
                 kj::Maybe<kj::AutoCloseFd> fd)
        : RpcClient(connectionState), importId(importId), fd(kj::mv(fd)) {}

    ~ImportClient() noexcept(false) {
      unwindDetector.catchExceptionsIfUnwinding([&]() {
        // Remove self from the import table, if the table is still pointing at us.
        KJ_IF_MAYBE(import, connectionState->imports.find(importId)) {
          KJ_IF_MAYBE(i, import->importClient) {
            if (i == this) {
              connectionState->imports.erase(importId);
            }
          }
        }

        // Send a message releasing our remote references.
        if (remoteRefcount > 0 && connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
              messageSizeHint<rpc::Release>());
          rpc::Release::Builder builder = message->getBody().initAs<rpc::Message>().initRelease();
          builder.setId(importId);
          builder.setReferenceCount(remoteRefcount);
          message->send();
        }
      });
    }

    void setFdIfMissing(kj::Maybe<kj::AutoCloseFd> newFd) {
      // Attach a file descriptor only if none was attached previously.
      if (fd == nullptr) {
        fd = kj::mv(newFd);
      }
    }

    void addRemoteRef() {
      // Add a new RemoteRef and return a new ref to this client representing it.
      ++remoteRefcount;
    }

    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor,
                                        kj::Vector<int>& fds) override {
      // An import is described back to the peer as receiverHosted; no new
      // export is created, so return null.
      descriptor.setReceiverHosted(importId);
      return nullptr;
    }

    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
      target.setImportedCap(importId);
      return nullptr;
    }

    kj::Own<ClientHook> getInnermostClient() override {
      return kj::addRef(*this);
    }

    // implements ClientHook -----------------------------------------

    kj::Maybe<ClientHook&> getResolved() override {
      // An import is a settled capability; it never resolves further.
      return nullptr;
    }

    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
      return nullptr;
    }

    kj::Maybe<int> getFd() override {
      return fd.map([](auto& f) { return f.get(); });
    }

  private:
    ImportId importId;
    kj::Maybe<kj::AutoCloseFd> fd;

    uint remoteRefcount = 0;
    // Number of times we've received this import from the peer.

    kj::UnwindDetector unwindDetector;
  };

  class PipelineClient final: public RpcClient {
    // A ClientHook representing a pipelined promise. Always wrapped in PromiseClient.

  public:
    PipelineClient(RpcConnectionState& connectionState,
                   kj::Own<QuestionRef>&& questionRef,
                   kj::Array<PipelineOp>&& ops)
        : RpcClient(connectionState), questionRef(kj::mv(questionRef)), ops(kj::mv(ops)) {}

    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor,
                                        kj::Vector<int>& fds) override {
      // Described as receiverAnswer: the peer should read the capability out
      // of its own answer for our outstanding question.
      auto promisedAnswer = descriptor.initReceiverAnswer();
      promisedAnswer.setQuestionId(questionRef->getId());
      promisedAnswer.adoptTransform(fromPipelineOps(
          Orphanage::getForMessageContaining(descriptor), ops));
      return nullptr;
    }

    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
      auto builder = target.initPromisedAnswer();
      builder.setQuestionId(questionRef->getId());
      builder.adoptTransform(fromPipelineOps(Orphanage::getForMessageContaining(builder), ops));
      return nullptr;
    }

    kj::Own<ClientHook> getInnermostClient() override {
      return kj::addRef(*this);
    }

    // implements ClientHook -----------------------------------------

    kj::Maybe<ClientHook&> getResolved() override {
      return nullptr;
    }

    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
      return nullptr;
    }

    kj::Maybe<int> getFd() override {
      return nullptr;
    }

  private:
    kj::Own<QuestionRef> questionRef;
    kj::Array<PipelineOp> ops;
  };

  class PromiseClient final: public RpcClient {
    // A ClientHook that initially wraps one client (in practice, an ImportClient or a
    // PipelineClient) and then, later on, redirects to some other client.

  public:
    PromiseClient(RpcConnectionState& connectionState,
                  kj::Own<RpcClient> initial,
                  kj::Promise<kj::Own<ClientHook>> eventual,
                  kj::Maybe<ImportId> importId)
        : RpcClient(connectionState),
          cap(kj::mv(initial)),
          importId(importId),
          fork(eventual.then(
              [this](kj::Own<ClientHook>&& resolution) {
                return resolve(kj::mv(resolution));
              }, [this](kj::Exception&& exception) {
                return resolve(newBrokenCap(kj::mv(exception)));
              }).catch_([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::cp(e));
                return newBrokenCap(kj::mv(e));
              }).fork()) {}
    // Create a client that starts out forwarding all calls to `initial` but, once `eventual`
    // resolves, will forward there instead.

    ~PromiseClient() noexcept(false) {
      KJ_IF_MAYBE(id, importId) {
        // This object is representing an import promise. That means the import table may still
        // contain a pointer back to it. Remove that pointer. Note that we have to verify that
        // the import still exists and the pointer still points back to this object because this
        // object may actually outlive the import.
        KJ_IF_MAYBE(import, connectionState->imports.find(*id)) {
          KJ_IF_MAYBE(c, import->appClient) {
            if (c == this) {
              import->appClient = nullptr;
            }
          }
        }
      }
    }

    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor,
                                        kj::Vector<int>& fds) override {
      // Delegate to whatever we currently point at; record that the peer has observed this
      // promise being used, since that may force a disembargo at resolve() time.
      receivedCall = true;
      return connectionState->writeDescriptor(*cap, descriptor, fds);
    }

    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
      receivedCall = true;
      return connectionState->writeTarget(*cap, target);
    }

    kj::Own<ClientHook> getInnermostClient() override {
      receivedCall = true;
      return connectionState->getInnermostClient(*cap);
    }

    void adoptFlowController(kj::Own<RpcFlowController> flowController) override {
      if (cap->getBrand() == connectionState.get()) {
        // Pass the flow controller on to our inner cap.
        kj::downcast<RpcClient>(*cap).adoptFlowController(kj::mv(flowController));
      } else {
        // We resolved to a capability that isn't another RPC capability. We should simply make
        // sure that all the calls complete.
        connectionState->tasks.add(flowController->waitAllAcked().attach(kj::mv(flowController)));
      }
    }

    // implements ClientHook -----------------------------------------

    Request<AnyPointer, AnyPointer> newCall(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
      receivedCall = true;

      // IMPORTANT: We must call our superclass's version of newCall(), NOT cap->newCall(), because
      // the Request object we create needs to check at send() time whether the promise has
      // resolved and, if so, redirect to the new target.
      return RpcClient::newCall(interfaceId, methodId, sizeHint);
    }

    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
                                kj::Own<CallContextHook>&& context) override {
      receivedCall = true;
      return cap->call(interfaceId, methodId, kj::mv(context));
    }

    kj::Maybe<ClientHook&> getResolved() override {
      if (isResolved()) {
        return *cap;
      } else {
        return nullptr;
      }
    }

    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
      return fork.addBranch();
    }

    kj::Maybe<int> getFd() override {
      if (isResolved()) {
        return cap->getFd();
      } else {
        // In theory, before resolution, the ImportClient for the promise could have an FD
        // attached, if the promise itself was presented with an attached FD. However, we can't
        // really return that one here because it may be closed when we get the Resolve message
        // later. In theory we could have the PromiseClient itself take ownership of an FD that
        // arrived attached to a promise cap, but the use case for that is questionable. I'm
        // keeping it simple for now.
        return nullptr;
      }
    }

  private:
    kj::Own<ClientHook> cap;
    // The capability we currently forward to: `initial` before resolution, the replacement after.

    kj::Maybe<ImportId> importId;
    kj::ForkedPromise<kj::Own<ClientHook>> fork;

    bool receivedCall = false;
    // True if any call (or descriptor/target write) has been routed through this promise. Used
    // by resolve() to decide whether a Disembargo round-trip is required.

    enum {
      UNRESOLVED,
      // Not resolved at all yet.

      REMOTE,
      // Remote promise resolved to a remote settled capability (or null/error).

      REFLECTED,
      // Remote promise resolved to one of our own exports.

      MERGED,
      // Remote promise resolved to another remote promise which itself wasn't resolved yet, so we
      // merged them. In this case, `cap` is guaranteed to point to another PromiseClient.

      BROKEN
      // Resolved to null or error.
    } resolutionType = UNRESOLVED;

    inline bool isResolved() {
      return resolutionType != UNRESOLVED;
    }

    kj::Promise<kj::Own<ClientHook>> resolve(kj::Own<ClientHook> replacement) {
      // Called (once) when the remote promise resolves. Classifies the resolution, arranges a
      // Disembargo if calls might otherwise be reordered, and returns the capability that this
      // client should forward to from now on.
      KJ_DASSERT(!isResolved());

      const void* replacementBrand = replacement->getBrand();
      bool isSameConnection = replacementBrand == connectionState.get();
      if (isSameConnection) {
        // We resolved to some other RPC capability hosted by the same peer.
        KJ_IF_MAYBE(promise, replacement->whenMoreResolved()) {
          // We resolved to another remote promise. If *that* promise eventually resolves back
          // to us, we'll need a disembargo. Possibilities:
          // 1. The other promise hasn't resolved at all yet. In that case we can simply set its
          //    `receivedCall` flag and let it handle the disembargo later.
          // 2. The other promise has received a Resolve message and decided to initiate a
          //    disembargo which it is still waiting for. In that case we will certainly also need
          //    a disembargo for the same reason that the other promise did. And, we can't simply
          //    wait for their disembargo; we need to start a new one of our own.
          // 3. The other promise has resolved already (with or without a disembargo). In this
          //    case we should treat it as if we resolved directly to the other promise's result,
          //    possibly requiring a disembargo under the same conditions.

          // We know the other object is a PromiseClient because it's the only ClientHook
          // type in the RPC implementation which returns non-null for `whenMoreResolved()`.
          PromiseClient* other = &kj::downcast<PromiseClient>(*replacement);
          while (other->resolutionType == MERGED) {
            // There's no need to resolve to a thing that's just going to resolve to another thing.
            replacement = other->cap->addRef();
            other = &kj::downcast<PromiseClient>(*replacement);

            // Note that replacementBrand is unchanged since we'd only merge with other
            // PromiseClients on the same connection.
            KJ_DASSERT(replacement->getBrand() == replacementBrand);
          }

          if (other->isResolved()) {
            // The other capability resolved already. If it determined that it resolved as
            // reflected, then we determine the same.
            resolutionType = other->resolutionType;
          } else {
            // The other capability hasn't resolved yet, so we can safely merge with it and do a
            // single combined disembargo if needed later.
            other->receivedCall = other->receivedCall || receivedCall;
            resolutionType = MERGED;
          }
        } else {
          resolutionType = REMOTE;
        }
      } else {
        if (replacementBrand == &ClientHook::NULL_CAPABILITY_BRAND ||
            replacementBrand == &ClientHook::BROKEN_CAPABILITY_BRAND) {
          // We don't consider null or broken capabilities as "reflected" because they may have
          // been communicated to us literally as a null pointer or an exception on the wire,
          // rather than as a reference to one of our exports, in which case a disembargo won't
          // work. But also, call ordering is completely irrelevant with these so there's no need
          // to disembargo anyway.
          resolutionType = BROKEN;
        } else {
          resolutionType = REFLECTED;
        }
      }

      // Every branch above ends by setting resolutionType to something other than UNRESOLVED.
      KJ_DASSERT(isResolved());

      // If the original capability was used for streaming calls, it will have a
      // `flowController` that might still be shepherding those calls. We'll need to make sure that
      // it doesn't get thrown away. Note that we know that *cap is an RpcClient because resolve()
      // is only called once and our constructor required that the initial capability is an
      // RpcClient.
      KJ_IF_MAYBE(f, kj::downcast<RpcClient>(*cap).flowController) {
        if (isSameConnection) {
          // The new target is on the same connection. It would make a lot of sense to keep using
          // the same flow controller if possible.
          kj::downcast<RpcClient>(*replacement).adoptFlowController(kj::mv(*f));
        } else {
          // The new target is something else. The best we can do is wait for the controller to
          // drain. New calls will be flow-controlled in a new way without knowing about the old
          // controller.
          connectionState->tasks.add(f->get()->waitAllAcked().attach(kj::mv(*f)));
        }
      }

      if (resolutionType == REFLECTED && receivedCall &&
          connectionState->connection.is<Connected>()) {
        // The new capability is hosted locally, not on the remote machine. And, we had made calls
        // to the promise. We need to make sure those calls echo back to us before we allow new
        // calls to go directly to the local capability, so we need to set a local embargo and send
        // a `Disembargo` to echo through the peer.

        auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
            messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);

        auto disembargo = message->getBody().initAs<rpc::Message>().initDisembargo();

        {
          auto redirect = connectionState->writeTarget(*cap, disembargo.initTarget());
          KJ_ASSERT(redirect == nullptr,
                    "Original promise target should always be from this RPC connection.");
        }

        EmbargoId embargoId;
        Embargo& embargo = connectionState->embargoes.next(embargoId);

        disembargo.getContext().setSenderLoopback(embargoId);

        auto paf = kj::newPromiseAndFulfiller<void>();
        embargo.fulfiller = kj::mv(paf.fulfiller);

        // Make a promise which resolves to `replacement` as soon as the `Disembargo` comes back.
        auto embargoPromise = paf.promise.then([replacement = kj::mv(replacement)]() mutable {
          return kj::mv(replacement);
        });

        // We need to queue up calls in the meantime, so we'll resolve ourselves to a local promise
        // client instead.
        replacement = newLocalPromiseClient(kj::mv(embargoPromise));

        // Send the `Disembargo`.
        message->send();
      }

      cap = replacement->addRef();

      return kj::mv(replacement);
    }
  };

  kj::Maybe<ExportId> writeDescriptor(ClientHook& cap, rpc::CapDescriptor::Builder descriptor,
                                      kj::Vector<int>& fds) {
    // Write a descriptor for the given capability. If the capability is remote (hosted by the
    // peer), the RpcClient writes itself and no export is allocated. Otherwise, an entry in the
    // export table is created (or its refcount bumped) and its ID returned. Any attached FD is
    // appended to `fds` for transmission alongside the message.

    // Find the innermost wrapped capability.
    ClientHook* inner = &cap;
    for (;;) {
      KJ_IF_MAYBE(resolved, inner->getResolved()) {
        inner = resolved;
      } else {
        break;
      }
    }

    KJ_IF_MAYBE(fd, inner->getFd()) {
      descriptor.setAttachedFd(fds.size());
      fds.add(kj::mv(*fd));
    }

    if (inner->getBrand() == this) {
      return kj::downcast<RpcClient>(*inner).writeDescriptor(descriptor, fds);
    } else {
      auto iter = exportsByCap.find(inner);
      if (iter != exportsByCap.end()) {
        // We've already seen and exported this capability before. Just up the refcount.
        auto& exp = KJ_ASSERT_NONNULL(exports.find(iter->second));
        ++exp.refcount;
        if (exp.resolveOp == nullptr) {
          descriptor.setSenderHosted(iter->second);
        } else {
          descriptor.setSenderPromise(iter->second);
        }
        return iter->second;
      } else {
        // This is the first time we've seen this capability.
        ExportId exportId;
        auto& exp = exports.next(exportId);
        exportsByCap[inner] = exportId;
        exp.refcount = 1;
        exp.clientHook = inner->addRef();

        KJ_IF_MAYBE(wrapped, inner->whenMoreResolved()) {
          // This is a promise. Arrange for the `Resolve` message to be sent later.
          exp.resolveOp = resolveExportedPromise(exportId, kj::mv(*wrapped));
          descriptor.setSenderPromise(exportId);
        } else {
          descriptor.setSenderHosted(exportId);
        }

        return exportId;
      }
    }
  }

  kj::Array<ExportId> writeDescriptors(kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> capTable,
                                       rpc::Payload::Builder payload, kj::Vector<int>& fds) {
    // Write descriptors for a whole cap table into an outgoing payload, returning the IDs of
    // all exports allocated (needed later to release them when the question completes).
    if (capTable.size() == 0) {
      // Calling initCapTable(0) will still allocate a 1-word tag, which we'd like to avoid...
      return nullptr;
    }

    auto capTableBuilder = payload.initCapTable(capTable.size());
    kj::Vector<ExportId> exports(capTable.size());
    for (uint i: kj::indices(capTable)) {
      KJ_IF_MAYBE(cap, capTable[i]) {
        KJ_IF_MAYBE(exportId, writeDescriptor(**cap, capTableBuilder[i], fds)) {
          exports.add(*exportId);
        }
      } else {
        capTableBuilder[i].setNone();
      }
    }
    return exports.releaseAsArray();
  }

  kj::Maybe<kj::Own<ClientHook>> writeTarget(ClientHook& cap, rpc::MessageTarget::Builder target) {
    // If calls to the given capability should pass over this connection, fill in `target`
    // appropriately for such a call and return nullptr. Otherwise, return a `ClientHook` to which
    // the call should be forwarded; the caller should then delegate the call to that `ClientHook`.
    //
    // The main case where this ends up returning non-null is if `cap` is a promise that has
    // recently resolved. The application might have started building a request before the promise
    // resolved, and so the request may have been built on the assumption that it would be sent over
    // this network connection, but then the promise resolved to point somewhere else before the
    // request was sent. Now the request has to be redirected to the new target instead.

    if (cap.getBrand() == this) {
      return kj::downcast<RpcClient>(cap).writeTarget(target);
    } else {
      return cap.addRef();
    }
  }

  kj::Own<ClientHook> getInnermostClient(ClientHook& client) {
    // Unwrap resolved layers, then ask an RpcClient for its own innermost hook (or just take a
    // ref if the capability is not from this connection).
    ClientHook* ptr = &client;
    for (;;) {
      KJ_IF_MAYBE(inner, ptr->getResolved()) {
        ptr = inner;
      } else {
        break;
      }
    }

    if (ptr->getBrand() == this) {
      return kj::downcast<RpcClient>(*ptr).getInnermostClient();
    } else {
      return ptr->addRef();
    }
  }

  kj::Promise<void> resolveExportedPromise(
      ExportId exportId, kj::Promise<kj::Own<ClientHook>>&& promise) {
    // Implements exporting of a promise. The promise has been exported under the given ID, and is
    // to eventually resolve to the ClientHook produced by `promise`. This method waits for that
    // resolve to happen and then sends the appropriate `Resolve` message to the peer.

    return promise.then(
        [this,exportId](kj::Own<ClientHook>&& resolution) -> kj::Promise<void> {
      // Successful resolution.

      KJ_ASSERT(connection.is<Connected>(),
                "Resolving export should have been canceled on disconnect.") {
        return kj::READY_NOW;
      }

      // Get the innermost ClientHook backing the resolved client. This includes traversing
      // PromiseClients that haven't resolved yet to their underlying ImportClient or
      // PipelineClient, so that we get a remote promise that might resolve later. This is
      // important to make sure that if the peer sends a `Disembargo` back to us, it bounces back
      // correctly instead of going to the result of some future resolution. See the documentation
      // for `Disembargo` in `rpc.capnp`.
      resolution = getInnermostClient(*resolution);

      // Update the export table to point at this object instead. We know that our entry in the
      // export table is still live because when it is destroyed the asynchronous resolution task
      // (i.e. this code) is canceled.
      auto& exp = KJ_ASSERT_NONNULL(exports.find(exportId));
      exportsByCap.erase(exp.clientHook);
      exp.clientHook = kj::mv(resolution);

      if (exp.clientHook->getBrand() != this) {
        // We're resolving to a local capability. If we're resolving to a promise, we might be
        // able to reuse our export table entry and avoid sending a message.

        KJ_IF_MAYBE(promise, exp.clientHook->whenMoreResolved()) {
          // We're replacing a promise with another local promise. In this case, we might actually
          // be able to just reuse the existing export table entry to represent the new promise --
          // unless it already has an entry. Let's check.

          auto insertResult = exportsByCap.insert(std::make_pair(exp.clientHook.get(), exportId));

          if (insertResult.second) {
            // The new promise was not already in the table, therefore the existing export table
            // entry has now been repurposed to represent it. There is no need to send a resolve
            // message at all. We do, however, have to start resolving the next promise.
            return resolveExportedPromise(exportId, kj::mv(*promise));
          }
        }
      }

      // OK, we have to send a `Resolve` message.
      auto message = connection.get<Connected>()->newOutgoingMessage(
          messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::CapDescriptor>() + 16);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
      kj::Vector<int> fds;
      writeDescriptor(*exp.clientHook, resolve.initCap(), fds);
      message->setFds(fds.releaseAsArray());
      message->send();

      return kj::READY_NOW;
    }, [this,exportId](kj::Exception&& exception) {
      // Send error resolution: the promise failed, so tell the peer the export is now broken.
      auto message = connection.get<Connected>()->newOutgoingMessage(
          messageSizeHint<rpc::Resolve>() + exceptionSizeHint(exception) + 8);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
      fromException(exception, resolve.initException());
      message->send();
    }).eagerlyEvaluate([this](kj::Exception&& exception) {
      // Put the exception on the TaskSet which will cause the connection to be terminated.
      tasks.add(kj::mv(exception));
    });
  }

  void fromException(const kj::Exception& exception, rpc::Exception::Builder builder) {
    // Serialize a KJ exception into the wire format, applying this connection's trace encoder.
    _::fromException(exception, builder, traceEncoder);
  }

  // =====================================================================================
  // Interpreting CapDescriptor

  kj::Own<ClientHook> import(ImportId importId, bool isPromise, kj::Maybe<kj::AutoCloseFd> fd) {
    // Receive a new import.

    auto& import = imports[importId];
    kj::Own<ImportClient> importClient;

    // Create the ImportClient, or if one already exists, use it.
    KJ_IF_MAYBE(c, import.importClient) {
      importClient = kj::addRef(*c);

      // If the same import is introduced multiple times, and it is missing an FD the first time,
      // but it has one on a later attempt, we want to attach the later one.
      // This could happen because the first introduction was part of a message that had too many
      // other FDs and went over the per-message limit. Perhaps the protocol design is such that
      // this other message doesn't really care if the FDs are transferred or not, but the later
      // message really does care; it would be bad if the previous message blocked later messages
      // from delivering the FD just because it happened to reference the same capability.
      importClient->setFdIfMissing(kj::mv(fd));
    } else {
      importClient = kj::refcounted<ImportClient>(*this, importId, kj::mv(fd));
      import.importClient = *importClient;
    }

    // We just received a copy of this import ID, so the remote refcount has gone up.
    importClient->addRemoteRef();

    if (isPromise) {
      // We need to construct a PromiseClient around this import, if we haven't already.
      KJ_IF_MAYBE(c, import.appClient) {
        // Use the existing one.
        return kj::addRef(*c);
      } else {
        // Create a promise for this import's resolution.
        auto paf = kj::newPromiseAndFulfiller<kj::Own<ClientHook>>();
        import.promiseFulfiller = kj::mv(paf.fulfiller);

        // Make sure the import is not destroyed while this promise exists.
        paf.promise = paf.promise.attach(kj::addRef(*importClient));

        // Create a PromiseClient around it and return it.
        auto result = kj::refcounted<PromiseClient>(
            *this, kj::mv(importClient), kj::mv(paf.promise), importId);
        import.appClient = *result;
        return kj::mv(result);
      }
    } else {
      // Settled capability: hand out the ImportClient directly.
      import.appClient = *importClient;
      return kj::mv(importClient);
    }
  }

  class TribbleRaceBlocker: public ClientHook, public kj::Refcounted {
    // Hack to work around a problem that arises during the Tribble 4-way Race Condition as
    // described in rpc.capnp in the documentation for the `Disembargo` message.
    //
    // Consider a remote promise that is resolved by a `Resolve` message. PromiseClient::resolve()
    // is eventually called and given the `ClientHook` for the resolution. Imagine that the
    // `ClientHook` it receives turns out to be an `ImportClient`. There are two ways this could
    // have happened:
    //
    // 1. The `Resolve` message contained a `CapDescriptor` of type `senderHosted`, naming an entry
    //    in the sender's export table, and the `ImportClient` refers to the corresponding slot on
    //    the receiver's import table. In this case, no embargo is needed, because messages to the
    //    resolved location traverse the same path as messages to the promise would have.
    //
    // 2. The `Resolve` message contained a `CapDescriptor` of type `receiverHosted`, naming an
    //    entry in the receiver's export table. That entry just happened to contain an
    //    `ImportClient` referring back to the sender. This specifically happens when the entry
    //    in question had previously itself referred to a promise, and that promise has since
    //    resolved to a remote capability, at which point the export table entry was replaced by
    //    the appropriate `ImportClient` representing that. Presumably, the peer *did not yet know*
    //    about this resolution, which is why it sent a `receiverHosted` pointing to something that
    //    reflects back to the sender, rather than sending `senderHosted` in the first place.
    //
    //    In this case, an embargo *is* required, because peer may still be reflecting messages
    //    sent to this promise back to us. In fact, the peer *must* continue reflecting messages,
    //    even when it eventually learns that the eventual destination is one of its own
    //    capabilities, due to the Tribble 4-way Race Condition rule.
    //
    //    Since this case requires an embargo, somehow PromiseClient::resolve() must be able to
    //    distinguish it from the case (1). One solution would be for us to pass some extra flag
    //    all the way from where the `Resolve` messages is received to `PromiseClient::resolve()`.
    //    That solution is reasonably easy in the `Resolve` case, but gets notably more difficult
    //    in the case of `Return`s, which also resolve promises and are subject to all the same
    //    problems. In the case of a `Return`, some non-RPC-specific code is involved in the
    //    resolution, making it harder to pass along a flag.
    //
    //    Instead, we use this hack: When we read an entry in the export table and discover that
    //    it actually contains an `ImportClient` or a `PipelineClient` reflecting back over our
    //    own connection, then we wrap it in a `TribbleRaceBlocker`. This wrapper prevents
    //    `PromiseClient` from recognizing the capability as being remote, so it instead treats it
    //    as local. That causes it to set up an embargo as desired.
    //
    // TODO(perf): This actually blocks further promise resolution in the case where the
    //   ImportClient or PipelineClient itself ends up being yet another promise that resolves
    //   back over the connection again. What we probably really need to do here is, instead of
    //   placing `ImportClient` or `PipelineClient` on the export table, place a special type there
    //   that both knows what to do with future incoming messages to that export ID, but also knows
    //   what to do when that export is the subject of a `Resolve`.

  public:
    TribbleRaceBlocker(kj::Own<ClientHook> inner): inner(kj::mv(inner)) {}

    // All call-related methods simply delegate to the wrapped hook; the wrapper's only job is
    // to hide the inner hook's brand and resolution state.

    Request<AnyPointer, AnyPointer> newCall(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
      return inner->newCall(interfaceId, methodId, sizeHint);
    }
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
                                kj::Own<CallContextHook>&& context) override {
      return inner->call(interfaceId, methodId, kj::mv(context));
    }
    kj::Maybe<ClientHook&> getResolved() override {
      // We always wrap either PipelineClient or ImportClient, both of which return null for this
      // anyway.
      return nullptr;
    }
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
      // We always wrap either PipelineClient or ImportClient, both of which return null for this
      // anyway.
      return nullptr;
    }
    kj::Own<ClientHook> addRef() override {
      return kj::addRef(*this);
    }
    const void* getBrand() override {
      // Deliberately NOT the connection's brand — this is what makes PromiseClient treat the
      // capability as local and set up an embargo.
      return nullptr;
    }
    kj::Maybe<int> getFd() override {
      return inner->getFd();
    }

  private:
    kj::Own<ClientHook> inner;
  };

  kj::Maybe<kj::Own<ClientHook>> receiveCap(rpc::CapDescriptor::Reader descriptor,
                                            kj::ArrayPtr<kj::AutoCloseFd> fds) {
    // Translate one incoming CapDescriptor into a live ClientHook, claiming any attached FD
    // from `fds`. Returns null for a `none` descriptor; invalid IDs produce broken caps rather
    // than errors so that a bad peer can't crash us.
    uint fdIndex = descriptor.getAttachedFd();
    kj::Maybe<kj::AutoCloseFd> fd;
    if (fdIndex < fds.size() && fds[fdIndex] != nullptr) {
      fd = kj::mv(fds[fdIndex]);
    }

    switch (descriptor.which()) {
      case rpc::CapDescriptor::NONE:
        return nullptr;

      case rpc::CapDescriptor::SENDER_HOSTED:
        return import(descriptor.getSenderHosted(), false, kj::mv(fd));
      case rpc::CapDescriptor::SENDER_PROMISE:
        return import(descriptor.getSenderPromise(), true, kj::mv(fd));

      case rpc::CapDescriptor::RECEIVER_HOSTED:
        KJ_IF_MAYBE(exp, exports.find(descriptor.getReceiverHosted())) {
          auto result = exp->clientHook->addRef();
          if (result->getBrand() == this) {
            // The export already points back over this connection; wrap it so PromiseClient
            // treats it as local and embargoes properly (see TribbleRaceBlocker above).
            result = kj::refcounted<TribbleRaceBlocker>(kj::mv(result));
          }
          return kj::mv(result);
        } else {
          return newBrokenCap("invalid 'receiverHosted' export ID");
        }

      case rpc::CapDescriptor::RECEIVER_ANSWER: {
        auto promisedAnswer = descriptor.getReceiverAnswer();

        KJ_IF_MAYBE(answer, answers.find(promisedAnswer.getQuestionId())) {
          if (answer->active) {
            KJ_IF_MAYBE(pipeline, answer->pipeline) {
              KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
                auto result = pipeline->get()->getPipelinedCap(*ops);
                if (result->getBrand() == this) {
                  result = kj::refcounted<TribbleRaceBlocker>(kj::mv(result));
                }
                return kj::mv(result);
              } else {
                return newBrokenCap("unrecognized pipeline ops");
              }
            }
          }
        }

        return newBrokenCap("invalid 'receiverAnswer'");
      }

      case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
        // We don't support third-party caps, so use the vine instead.
        return import(descriptor.getThirdPartyHosted().getVineId(), false, kj::mv(fd));

      default:
        KJ_FAIL_REQUIRE("unknown CapDescriptor type") { break; }
        return newBrokenCap("unknown CapDescriptor type");
    }
  }

  kj::Array<kj::Maybe<kj::Own<ClientHook>>> receiveCaps(List<rpc::CapDescriptor>::Reader capTable,
                                                        kj::ArrayPtr<kj::AutoCloseFd> fds) {
    // Translate an entire incoming cap table; one Maybe<Own<ClientHook>> per descriptor.
    auto result = kj::heapArrayBuilder<kj::Maybe<kj::Own<ClientHook>>>(capTable.size());
    for (auto cap: capTable) {
      result.add(receiveCap(cap, fds));
    }
    return result.finish();
  }

  // =====================================================================================
  // RequestHook/PipelineHook/ResponseHook implementations

  class QuestionRef: public kj::Refcounted {
    // A reference to an entry on the question table. Used to detect when the `Finish` message
    // can be sent.

  public:
    inline QuestionRef(
        RpcConnectionState& connectionState, QuestionId id,
        kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller)
        : connectionState(kj::addRef(connectionState)), id(id), fulfiller(kj::mv(fulfiller)) {}

    ~QuestionRef() noexcept {
      // Contrary to KJ style, we declare this destructor `noexcept` because if anything in here
      // throws (without being caught) we're probably in pretty bad shape and going to be crashing
      // later anyway. Better to abort now.

      auto& question = KJ_ASSERT_NONNULL(
          connectionState->questions.find(id), "Question ID no longer on table?");

      // Send the "Finish" message (if the connection is not already broken).
      if (connectionState->connection.is<Connected>() && !question.skipFinish) {
        KJ_IF_MAYBE(e, kj::runCatchingExceptions([&]() {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
              messageSizeHint<rpc::Finish>());
          auto builder = message->getBody().getAs<rpc::Message>().initFinish();
          builder.setQuestionId(id);
          // If we're still awaiting a return, then this request is being canceled, and we're going
          // to ignore any capabilities in the return message, so set releaseResultCaps true. If we
          // already received the return, then we've already built local proxies for the caps and
          // will send Release messages when those are destroyed.
          builder.setReleaseResultCaps(question.isAwaitingReturn);
          message->send();
        })) {
          connectionState->disconnect(kj::mv(*e));
        }
      }

      // Check if the question has returned and, if so, remove it from the table.
      // Remove question ID from the table. Must do this *after* sending `Finish` to ensure that
      // the ID is not re-allocated before the `Finish` message can be sent.
      if (question.isAwaitingReturn) {
        // Still waiting for return, so just remove the QuestionRef pointer from the table.
        question.selfRef = nullptr;
      } else {
        // Call has already returned, so we can now remove it from the table.
        connectionState->questions.erase(id, question);
      }
    }

    inline QuestionId getId() const { return id; }

    void fulfill(kj::Own<RpcResponse>&& response) {
      fulfiller->fulfill(kj::mv(response));
    }

    void fulfill(kj::Promise<kj::Own<RpcResponse>>&& promise) {
      fulfiller->fulfill(kj::mv(promise));
    }

    void reject(kj::Exception&& exception) {
      fulfiller->reject(kj::mv(exception));
    }

  private:
    kj::Own<RpcConnectionState> connectionState;
    QuestionId id;
    kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller;
  };

  class RpcRequest final: public RequestHook {
  public:
    RpcRequest(RpcConnectionState& connectionState, VatNetworkBase::Connection& connection,
               kj::Maybe<MessageSize> sizeHint, kj::Own<RpcClient>&& target)
        : connectionState(kj::addRef(connectionState)),
          target(kj::mv(target)),
          message(connection.newOutgoingMessage(
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Call>() +
                  sizeInWords<rpc::Payload>() + MESSAGE_TARGET_SIZE_HINT))),
          callBuilder(message->getBody().getAs<rpc::Message>().initCall()),
          paramsBuilder(capTable.imbue(callBuilder.getParams().getContent())) {}
    // The outgoing Call message is allocated up front; the caller fills in params via getRoot()
    // before invoking one of the send variants.

    inline AnyPointer::Builder getRoot() {
      return paramsBuilder;
    }
    inline rpc::Call::Builder getCall() {
      return callBuilder;
    }

    RemotePromise<AnyPointer> send() override {
      if (!connectionState->connection.is<Connected>()) {
        // Connection is broken.
        const kj::Exception& e = connectionState->connection.get<Disconnected>();
        return RemotePromise<AnyPointer>(
            kj::Promise<Response<AnyPointer>>(kj::cp(e)),
            AnyPointer::Pipeline(newBrokenPipeline(kj::cp(e))));
      }

      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // We'll have to make a new request and do a copy. Ick.

        auto replacement = redirect->get()->newCall(
            callBuilder.getInterfaceId(), callBuilder.getMethodId(), paramsBuilder.targetSize());
        replacement.set(paramsBuilder);
        return replacement.send();
      } else {
        auto sendResult = sendInternal(false);

        auto forkedPromise = sendResult.promise.fork();

        // The pipeline must get notified of resolution before the app does to maintain ordering.
        auto pipeline = kj::refcounted<RpcPipeline>(
            *connectionState, kj::mv(sendResult.questionRef), forkedPromise.addBranch());

        auto appPromise = forkedPromise.addBranch().then(
            [=](kj::Own<RpcResponse>&& response) {
              auto reader = response->getResults();
              return Response<AnyPointer>(reader, kj::mv(response));
            });

        return RemotePromise<AnyPointer>(
            kj::mv(appPromise),
            AnyPointer::Pipeline(kj::mv(pipeline)));
      }
    }

    kj::Promise<void> sendStreaming() override {
      // Like send(), but for streaming calls: no pipeline is constructed and the result is
      // just a completion promise.
      if (!connectionState->connection.is<Connected>()) {
        // Connection is broken.
        return kj::cp(connectionState->connection.get<Disconnected>());
      }

      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // We'll have to make a new request and do a copy. Ick.

        auto replacement = redirect->get()->newCall(
            callBuilder.getInterfaceId(), callBuilder.getMethodId(), paramsBuilder.targetSize());
        replacement.set(paramsBuilder);
        return RequestHook::from(kj::mv(replacement))->sendStreaming();
      } else {
        return sendStreamingInternal(false);
      }
    }

    struct TailInfo {
      QuestionId questionId;
      kj::Promise<void> promise;
      kj::Own<PipelineHook> pipeline;
    };

    kj::Maybe<TailInfo> tailSend() {
      // Send the request as a tail call.
      //
      // Returns null if for some reason a tail call is not possible and the caller should fall
      // back to using send() and copying the response.

      SendInternalResult sendResult;

      if (!connectionState->connection.is<Connected>()) {
        // Disconnected; fall back to a regular send() which will fail appropriately.
        return nullptr;
      }

      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // Fall back to regular send().
        return nullptr;
      } else {
        sendResult = sendInternal(true);
      }

      auto promise = sendResult.promise.then([](kj::Own<RpcResponse>&& response) {
        // Response should be null if `Return` handling code is correct.
        KJ_ASSERT(!response) { break; }
      });

      QuestionId questionId = sendResult.questionRef->getId();

      auto pipeline = kj::refcounted<RpcPipeline>(*connectionState, kj::mv(sendResult.questionRef));

      return TailInfo { questionId, kj::mv(promise), kj::mv(pipeline) };
    }

    const void* getBrand() override {
      return connectionState.get();
    }

  private:
    kj::Own<RpcConnectionState> connectionState;

    kj::Own<RpcClient> target;
    kj::Own<OutgoingRpcMessage> message;
    BuilderCapabilityTable capTable;
    rpc::Call::Builder callBuilder;
    AnyPointer::Builder paramsBuilder;

    struct SendInternalResult {
      kj::Own<QuestionRef> questionRef;
      kj::Promise<kj::Own<RpcResponse>> promise = nullptr;
    };

    struct SetupSendResult: public SendInternalResult {
      QuestionId questionId;
      Question& question;

      SetupSendResult(SendInternalResult&& super, QuestionId questionId, Question& question)
          : SendInternalResult(kj::mv(super)), questionId(questionId), question(question) {}
      // TODO(cleanup): This constructor is implicit in C++17.
    };

    SetupSendResult setupSend(bool isTailCall) {
      // Build the cap table.
      kj::Vector<int> fds;
      auto exports = connectionState->writeDescriptors(
          capTable.getTable(), callBuilder.getParams(), fds);
      message->setFds(fds.releaseAsArray());

      // Init the question table. Do this after writing descriptors to avoid interference.
      QuestionId questionId;
      auto& question = connectionState->questions.next(questionId);
      question.isAwaitingReturn = true;
      question.paramExports = kj::mv(exports);
      question.isTailCall = isTailCall;

      // Make the QuestionRef and result promise.
1755 SendInternalResult result; 1756 auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>(); 1757 result.questionRef = kj::refcounted<QuestionRef>( 1758 *connectionState, questionId, kj::mv(paf.fulfiller)); 1759 question.selfRef = *result.questionRef; 1760 result.promise = paf.promise.attach(kj::addRef(*result.questionRef)); 1761 1762 return { kj::mv(result), questionId, question }; 1763 } 1764 1765 SendInternalResult sendInternal(bool isTailCall) { 1766 auto result = setupSend(isTailCall); 1767 1768 // Finish and send. 1769 callBuilder.setQuestionId(result.questionId); 1770 if (isTailCall) { 1771 callBuilder.getSendResultsTo().setYourself(); 1772 } 1773 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { 1774 KJ_CONTEXT("sending RPC call", 1775 callBuilder.getInterfaceId(), callBuilder.getMethodId()); 1776 message->send(); 1777 })) { 1778 // We can't safely throw the exception from here since we've already modified the question 1779 // table state. We'll have to reject the promise instead. 1780 result.question.isAwaitingReturn = false; 1781 result.question.skipFinish = true; 1782 connectionState->releaseExports(result.question.paramExports); 1783 result.questionRef->reject(kj::mv(*exception)); 1784 } 1785 1786 // Send and return. 1787 return kj::mv(result); 1788 } 1789 1790 kj::Promise<void> sendStreamingInternal(bool isTailCall) { 1791 auto setup = setupSend(isTailCall); 1792 1793 // Finish and send. 
1794 callBuilder.setQuestionId(setup.questionId); 1795 if (isTailCall) { 1796 callBuilder.getSendResultsTo().setYourself(); 1797 } 1798 kj::Promise<void> flowPromise = nullptr; 1799 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { 1800 KJ_CONTEXT("sending RPC call", 1801 callBuilder.getInterfaceId(), callBuilder.getMethodId()); 1802 RpcFlowController* flow; 1803 KJ_IF_MAYBE(f, target->flowController) { 1804 flow = *f; 1805 } else { 1806 flow = target->flowController.emplace( 1807 connectionState->connection.get<Connected>()->newStream()); 1808 } 1809 flowPromise = flow->send(kj::mv(message), setup.promise.ignoreResult()); 1810 })) { 1811 // We can't safely throw the exception from here since we've already modified the question 1812 // table state. We'll have to reject the promise instead. 1813 setup.question.isAwaitingReturn = false; 1814 setup.question.skipFinish = true; 1815 setup.questionRef->reject(kj::cp(*exception)); 1816 return kj::mv(*exception); 1817 } 1818 1819 return kj::mv(flowPromise); 1820 } 1821 }; 1822 1823 class RpcPipeline final: public PipelineHook, public kj::Refcounted { 1824 public: 1825 RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef, 1826 kj::Promise<kj::Own<RpcResponse>>&& redirectLaterParam) 1827 : connectionState(kj::addRef(connectionState)), 1828 redirectLater(redirectLaterParam.fork()), 1829 resolveSelfPromise(KJ_ASSERT_NONNULL(redirectLater).addBranch().then( 1830 [this](kj::Own<RpcResponse>&& response) { 1831 resolve(kj::mv(response)); 1832 }, [this](kj::Exception&& exception) { 1833 resolve(kj::mv(exception)); 1834 }).eagerlyEvaluate([&](kj::Exception&& e) { 1835 // Make any exceptions thrown from resolve() go to the connection's TaskSet which 1836 // will cause the connection to be terminated. 1837 connectionState.tasks.add(kj::mv(e)); 1838 })) { 1839 // Construct a new RpcPipeline. 
1840 1841 state.init<Waiting>(kj::mv(questionRef)); 1842 } 1843 1844 RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef) 1845 : connectionState(kj::addRef(connectionState)), 1846 resolveSelfPromise(nullptr) { 1847 // Construct a new RpcPipeline that is never expected to resolve. 1848 1849 state.init<Waiting>(kj::mv(questionRef)); 1850 } 1851 1852 // implements PipelineHook --------------------------------------- 1853 1854 kj::Own<PipelineHook> addRef() override { 1855 return kj::addRef(*this); 1856 } 1857 1858 kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override { 1859 auto copy = kj::heapArrayBuilder<PipelineOp>(ops.size()); 1860 for (auto& op: ops) { 1861 copy.add(op); 1862 } 1863 return getPipelinedCap(copy.finish()); 1864 } 1865 1866 kj::Own<ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) override { 1867 return clientMap.findOrCreate(ops.asPtr(), [&]() { 1868 if (state.is<Waiting>()) { 1869 // Wrap a PipelineClient in a PromiseClient. 1870 auto pipelineClient = kj::refcounted<PipelineClient>( 1871 *connectionState, kj::addRef(*state.get<Waiting>()), kj::heapArray(ops.asPtr())); 1872 1873 KJ_IF_MAYBE(r, redirectLater) { 1874 auto resolutionPromise = r->addBranch().then( 1875 [ops = kj::heapArray(ops.asPtr())](kj::Own<RpcResponse>&& response) { 1876 return response->getResults().getPipelinedCap(kj::mv(ops)); 1877 }); 1878 1879 return kj::HashMap<kj::Array<PipelineOp>, kj::Own<ClientHook>>::Entry { 1880 kj::mv(ops), 1881 kj::refcounted<PromiseClient>( 1882 *connectionState, kj::mv(pipelineClient), kj::mv(resolutionPromise), nullptr) 1883 }; 1884 } else { 1885 // Oh, this pipeline will never get redirected, so just return the PipelineClient. 
1886 return kj::HashMap<kj::Array<PipelineOp>, kj::Own<ClientHook>>::Entry { 1887 kj::mv(ops), kj::mv(pipelineClient) 1888 }; 1889 } 1890 } else if (state.is<Resolved>()) { 1891 auto pipelineClient = state.get<Resolved>()->getResults().getPipelinedCap(ops); 1892 return kj::HashMap<kj::Array<PipelineOp>, kj::Own<ClientHook>>::Entry { 1893 kj::mv(ops), kj::mv(pipelineClient) 1894 }; 1895 } else { 1896 return kj::HashMap<kj::Array<PipelineOp>, kj::Own<ClientHook>>::Entry { 1897 kj::mv(ops), newBrokenCap(kj::cp(state.get<Broken>())) 1898 }; 1899 } 1900 })->addRef(); 1901 } 1902 1903 private: 1904 kj::Own<RpcConnectionState> connectionState; 1905 kj::Maybe<kj::ForkedPromise<kj::Own<RpcResponse>>> redirectLater; 1906 1907 typedef kj::Own<QuestionRef> Waiting; 1908 typedef kj::Own<RpcResponse> Resolved; 1909 typedef kj::Exception Broken; 1910 kj::OneOf<Waiting, Resolved, Broken> state; 1911 1912 kj::HashMap<kj::Array<PipelineOp>, kj::Own<ClientHook>> clientMap; 1913 // See QueuedPipeline::clientMap in capability.c++ for a discussion of why we must memoize 1914 // the results of getPipelinedCap(). RpcPipeline has a similar problem when a capability we 1915 // return is later subject to an embargo. It's important that the embargo is correctly applied 1916 // across all calls to the same capability. 1917 1918 // Keep this last, because the continuation uses *this, so it should be destroyed first to 1919 // ensure the continuation is not still running. 
1920 kj::Promise<void> resolveSelfPromise; 1921 1922 void resolve(kj::Own<RpcResponse>&& response) { 1923 KJ_ASSERT(state.is<Waiting>(), "Already resolved?"); 1924 state.init<Resolved>(kj::mv(response)); 1925 } 1926 1927 void resolve(const kj::Exception&& exception) { 1928 KJ_ASSERT(state.is<Waiting>(), "Already resolved?"); 1929 state.init<Broken>(kj::mv(exception)); 1930 } 1931 }; 1932 1933 class RpcResponse: public ResponseHook { 1934 public: 1935 virtual AnyPointer::Reader getResults() = 0; 1936 virtual kj::Own<RpcResponse> addRef() = 0; 1937 }; 1938 1939 class RpcResponseImpl final: public RpcResponse, public kj::Refcounted { 1940 public: 1941 RpcResponseImpl(RpcConnectionState& connectionState, 1942 kj::Own<QuestionRef>&& questionRef, 1943 kj::Own<IncomingRpcMessage>&& message, 1944 kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTableArray, 1945 AnyPointer::Reader results) 1946 : connectionState(kj::addRef(connectionState)), 1947 message(kj::mv(message)), 1948 capTable(kj::mv(capTableArray)), 1949 reader(capTable.imbue(results)), 1950 questionRef(kj::mv(questionRef)) {} 1951 1952 AnyPointer::Reader getResults() override { 1953 return reader; 1954 } 1955 1956 kj::Own<RpcResponse> addRef() override { 1957 return kj::addRef(*this); 1958 } 1959 1960 private: 1961 kj::Own<RpcConnectionState> connectionState; 1962 kj::Own<IncomingRpcMessage> message; 1963 ReaderCapabilityTable capTable; 1964 AnyPointer::Reader reader; 1965 kj::Own<QuestionRef> questionRef; 1966 }; 1967 1968 // ===================================================================================== 1969 // CallContextHook implementation 1970 1971 class RpcServerResponse { 1972 public: 1973 virtual AnyPointer::Builder getResultsBuilder() = 0; 1974 }; 1975 1976 class RpcServerResponseImpl final: public RpcServerResponse { 1977 public: 1978 RpcServerResponseImpl(RpcConnectionState& connectionState, 1979 kj::Own<OutgoingRpcMessage>&& message, 1980 rpc::Payload::Builder payload) 1981 : 
          connectionState(connectionState),
          message(kj::mv(message)),
          payload(payload) {}

    AnyPointer::Builder getResultsBuilder() override {
      // Expose the payload content, with capabilities routed through our cap table.
      return capTable.imbue(payload.getContent());
    }

    kj::Maybe<kj::Array<ExportId>> send() {
      // Send the response and return the export list.  Returns nullptr if there were no caps.
      // (Could return a non-null empty array if there were caps but none of them were exports.)

      // Build the cap table.
      auto capTable = this->capTable.getTable();
      kj::Vector<int> fds;
      auto exports = connectionState.writeDescriptors(capTable, payload, fds);
      message->setFds(fds.releaseAsArray());

      // Capabilities that we are returning are subject to embargos.  See `Disembargo` in
      // rpc.capnp.  As explained there, in order to deal with the Tribble 4-way race condition,
      // we need to make sure that if we're returning any remote promises, that we ignore any
      // subsequent resolution of those promises for the purpose of pipelined requests on this
      // answer.  Luckily, we can modify the cap table in-place.
      for (auto& slot: capTable) {
        KJ_IF_MAYBE(cap, slot) {
          slot = connectionState.getInnermostClient(**cap);
        }
      }

      message->send();
      if (capTable.size() == 0) {
        return nullptr;
      } else {
        return kj::mv(exports);
      }
    }

  private:
    RpcConnectionState& connectionState;
    kj::Own<OutgoingRpcMessage> message;
    BuilderCapabilityTable capTable;
    rpc::Payload::Builder payload;
  };

  class LocallyRedirectedRpcResponse final
      : public RpcResponse, public RpcServerResponse, public kj::Refcounted{
    // Response object used when the results will be consumed locally (e.g. results of a tail
    // call redirected back to us), so they are built in a local message rather than in an
    // outgoing RPC message.

  public:
    LocallyRedirectedRpcResponse(kj::Maybe<MessageSize> sizeHint)
        : message(sizeHint.map([](MessageSize size) { return size.wordCount; })
                      .orDefault(SUGGESTED_FIRST_SEGMENT_WORDS)) {}

    AnyPointer::Builder getResultsBuilder() override {
      return message.getRoot<AnyPointer>();
    }

    AnyPointer::Reader getResults() override {
      return message.getRoot<AnyPointer>();
    }

    kj::Own<RpcResponse> addRef() override {
      return kj::addRef(*this);
    }

  private:
    MallocMessageBuilder message;
  };

  class RpcCallContext final: public CallContextHook, public kj::Refcounted {
  public:
    RpcCallContext(RpcConnectionState& connectionState, AnswerId answerId,
                   kj::Own<IncomingRpcMessage>&& request,
                   kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTableArray,
                   const AnyPointer::Reader& params,
                   bool redirectResults, kj::Own<kj::PromiseFulfiller<void>>&& cancelFulfiller,
                   uint64_t interfaceId, uint16_t methodId)
        : connectionState(kj::addRef(connectionState)),
          answerId(answerId),
          interfaceId(interfaceId),
          methodId(methodId),
          requestSize(request->sizeInWords()),
          request(kj::mv(request)),
          paramsCapTable(kj::mv(capTableArray)),
          params(paramsCapTable.imbue(params)),
          returnMessage(nullptr),
          redirectResults(redirectResults),
cancelFulfiller(kj::mv(cancelFulfiller)) { 2067 connectionState.callWordsInFlight += requestSize; 2068 } 2069 2070 ~RpcCallContext() noexcept(false) { 2071 if (isFirstResponder()) { 2072 // We haven't sent a return yet, so we must have been canceled. Send a cancellation return. 2073 unwindDetector.catchExceptionsIfUnwinding([&]() { 2074 // Don't send anything if the connection is broken. 2075 bool shouldFreePipeline = true; 2076 if (connectionState->connection.is<Connected>()) { 2077 auto message = connectionState->connection.get<Connected>()->newOutgoingMessage( 2078 messageSizeHint<rpc::Return>() + sizeInWords<rpc::Payload>()); 2079 auto builder = message->getBody().initAs<rpc::Message>().initReturn(); 2080 2081 builder.setAnswerId(answerId); 2082 builder.setReleaseParamCaps(false); 2083 2084 if (redirectResults) { 2085 // The reason we haven't sent a return is because the results were sent somewhere 2086 // else. 2087 builder.setResultsSentElsewhere(); 2088 2089 // The pipeline could still be valid and in-use in this case. 2090 shouldFreePipeline = false; 2091 } else { 2092 builder.setCanceled(); 2093 } 2094 2095 message->send(); 2096 } 2097 2098 cleanupAnswerTable(nullptr, shouldFreePipeline); 2099 }); 2100 } 2101 } 2102 2103 kj::Own<RpcResponse> consumeRedirectedResponse() { 2104 KJ_ASSERT(redirectResults); 2105 2106 if (response == nullptr) getResults(MessageSize{0, 0}); // force initialization of response 2107 2108 // Note that the context needs to keep its own reference to the response so that it doesn't 2109 // get GC'd until the PipelineHook drops its reference to the context. 2110 return kj::downcast<LocallyRedirectedRpcResponse>(*KJ_ASSERT_NONNULL(response)).addRef(); 2111 } 2112 2113 void sendReturn() { 2114 KJ_ASSERT(!redirectResults); 2115 2116 // Avoid sending results if canceled so that we don't have to figure out whether or not 2117 // `releaseResultCaps` was set in the already-received `Finish`. 
2118 if (!(cancellationFlags & CANCEL_REQUESTED) && isFirstResponder()) { 2119 KJ_ASSERT(connectionState->connection.is<Connected>(), 2120 "Cancellation should have been requested on disconnect.") { 2121 return; 2122 } 2123 2124 if (response == nullptr) getResults(MessageSize{0, 0}); // force initialization of response 2125 2126 returnMessage.setAnswerId(answerId); 2127 returnMessage.setReleaseParamCaps(false); 2128 2129 kj::Maybe<kj::Array<ExportId>> exports; 2130 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { 2131 // Debug info incase send() fails due to overside message. 2132 KJ_CONTEXT("returning from RPC call", interfaceId, methodId); 2133 exports = kj::downcast<RpcServerResponseImpl>(*KJ_ASSERT_NONNULL(response)).send(); 2134 })) { 2135 responseSent = false; 2136 sendErrorReturn(kj::mv(*exception)); 2137 return; 2138 } 2139 2140 KJ_IF_MAYBE(e, exports) { 2141 // Caps were returned, so we can't free the pipeline yet. 2142 cleanupAnswerTable(kj::mv(*e), false); 2143 } else { 2144 // No caps in the results, therefore the pipeline is irrelevant. 2145 cleanupAnswerTable(nullptr, true); 2146 } 2147 } 2148 } 2149 void sendErrorReturn(kj::Exception&& exception) { 2150 KJ_ASSERT(!redirectResults); 2151 if (isFirstResponder()) { 2152 if (connectionState->connection.is<Connected>()) { 2153 auto message = connectionState->connection.get<Connected>()->newOutgoingMessage( 2154 messageSizeHint<rpc::Return>() + exceptionSizeHint(exception)); 2155 auto builder = message->getBody().initAs<rpc::Message>().initReturn(); 2156 2157 builder.setAnswerId(answerId); 2158 builder.setReleaseParamCaps(false); 2159 connectionState->fromException(exception, builder.initException()); 2160 2161 message->send(); 2162 } 2163 2164 // Do not allow releasing the pipeline because we want pipelined calls to propagate the 2165 // exception rather than fail with a "no such field" exception. 
2166 cleanupAnswerTable(nullptr, false); 2167 } 2168 } 2169 void sendRedirectReturn() { 2170 KJ_ASSERT(redirectResults); 2171 2172 if (isFirstResponder()) { 2173 auto message = connectionState->connection.get<Connected>()->newOutgoingMessage( 2174 messageSizeHint<rpc::Return>()); 2175 auto builder = message->getBody().initAs<rpc::Message>().initReturn(); 2176 2177 builder.setAnswerId(answerId); 2178 builder.setReleaseParamCaps(false); 2179 builder.setResultsSentElsewhere(); 2180 2181 message->send(); 2182 2183 cleanupAnswerTable(nullptr, false); 2184 } 2185 } 2186 2187 void requestCancel() { 2188 // Hints that the caller wishes to cancel this call. At the next time when cancellation is 2189 // deemed safe, the RpcCallContext shall send a canceled Return -- or if it never becomes 2190 // safe, the RpcCallContext will send a normal return when the call completes. Either way 2191 // the RpcCallContext is now responsible for cleaning up the entry in the answer table, since 2192 // a Finish message was already received. 2193 2194 bool previouslyAllowedButNotRequested = cancellationFlags == CANCEL_ALLOWED; 2195 cancellationFlags |= CANCEL_REQUESTED; 2196 2197 if (previouslyAllowedButNotRequested) { 2198 // We just set CANCEL_REQUESTED, and CANCEL_ALLOWED was already set previously. Initiate 2199 // the cancellation. 
        cancelFulfiller->fulfill();
      }
    }

    // implements CallContextHook ------------------------------------

    AnyPointer::Reader getParams() override {
      KJ_REQUIRE(request != nullptr, "Can't call getParams() after releaseParams().");
      return params;
    }
    void releaseParams() override {
      // Drop the incoming request message (and thus the params) to free memory early.
      request = nullptr;
    }
    AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
      KJ_IF_MAYBE(r, response) {
        return r->get()->getResultsBuilder();
      } else {
        kj::Own<RpcServerResponse> response;

        if (redirectResults || !connectionState->connection.is<Connected>()) {
          // Results are consumed locally (tail call), or we couldn't send them anyway; build
          // them in a local message.
          response = kj::refcounted<LocallyRedirectedRpcResponse>(sizeHint);
        } else {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Return>() +
                               sizeInWords<rpc::Payload>()));
          returnMessage = message->getBody().initAs<rpc::Message>().initReturn();
          response = kj::heap<RpcServerResponseImpl>(
              *connectionState, kj::mv(message), returnMessage.getResults());
        }

        auto results = response->getResultsBuilder();
        this->response = kj::mv(response);
        return results;
      }
    }
    void setPipeline(kj::Own<PipelineHook>&& pipeline) override {
      KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
        f->get()->fulfill(AnyPointer::Pipeline(kj::mv(pipeline)));
      }
    }
    kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
      auto result = directTailCall(kj::mv(request));
      KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
        f->get()->fulfill(AnyPointer::Pipeline(kj::mv(result.pipeline)));
      }
      return kj::mv(result.promise);
    }
    ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
      KJ_REQUIRE(response == nullptr,
                 "Can't call tailCall() after initializing the results struct.");

      if (request->getBrand() == connectionState.get() && !redirectResults) {
        // The tail call is headed towards the peer that called us in the first place, so we can
        // optimize out the return trip.

        KJ_IF_MAYBE(tailInfo, kj::downcast<RpcRequest>(*request).tailSend()) {
          if (isFirstResponder()) {
            if (connectionState->connection.is<Connected>()) {
              auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
                  messageSizeHint<rpc::Return>());
              auto builder = message->getBody().initAs<rpc::Message>().initReturn();

              builder.setAnswerId(answerId);
              builder.setReleaseParamCaps(false);
              builder.setTakeFromOtherQuestion(tailInfo->questionId);

              message->send();
            }

            // There are no caps in our return message, but of course the tail results could have
            // caps, so we must continue to honor pipeline calls (and just bounce them back).
            cleanupAnswerTable(nullptr, false);
          }
          return { kj::mv(tailInfo->promise), kj::mv(tailInfo->pipeline) };
        }
      }

      // Just forwarding to another local call.
      auto promise = request->send();

      // Wait for response.
      auto voidPromise = promise.then([this](Response<AnyPointer>&& tailResponse) {
        // Copy the response.
        // TODO(perf): It would be nice if we could somehow make the response get built in-place
        //   but requires some refactoring.
        getResults(tailResponse.targetSize()).set(tailResponse);
      });

      return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
    }
    kj::Promise<AnyPointer::Pipeline> onTailCall() override {
      auto paf = kj::newPromiseAndFulfiller<AnyPointer::Pipeline>();
      tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
      return kj::mv(paf.promise);
    }
    void allowCancellation() override {
      bool previouslyRequestedButNotAllowed = cancellationFlags == CANCEL_REQUESTED;
      cancellationFlags |= CANCEL_ALLOWED;

      if (previouslyRequestedButNotAllowed) {
        // We just set CANCEL_ALLOWED, and CANCEL_REQUESTED was already set previously.  Initiate
        // the cancellation.
        cancelFulfiller->fulfill();
      }
    }
    kj::Own<CallContextHook> addRef() override {
      return kj::addRef(*this);
    }

  private:
    kj::Own<RpcConnectionState> connectionState;
    AnswerId answerId;

    uint64_t interfaceId;
    uint16_t methodId;
    // For debugging.

    // Request ---------------------------------------------

    size_t requestSize;  // for flow limit purposes
    kj::Maybe<kj::Own<IncomingRpcMessage>> request;
    ReaderCapabilityTable paramsCapTable;
    AnyPointer::Reader params;

    // Response --------------------------------------------

    kj::Maybe<kj::Own<RpcServerResponse>> response;
    rpc::Return::Builder returnMessage;
    bool redirectResults = false;
    bool responseSent = false;
    kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;

    // Cancellation state ----------------------------------

    enum CancellationFlags {
      CANCEL_REQUESTED = 1,
      CANCEL_ALLOWED = 2
    };

    uint8_t cancellationFlags = 0;
    // When both flags are set, the cancellation process will begin.

    kj::Own<kj::PromiseFulfiller<void>> cancelFulfiller;
    // Fulfilled when cancellation has been both requested and permitted.  The fulfilled promise
    // is exclusive-joined with the outermost promise waiting on the call return, so fulfilling
    // it cancels that promise.

    kj::UnwindDetector unwindDetector;

    // -----------------------------------------------------

    bool isFirstResponder() {
      // Returns true exactly once; the first caller becomes responsible for sending the Return.
      if (responseSent) {
        return false;
      } else {
        responseSent = true;
        return true;
      }
    }

    void cleanupAnswerTable(kj::Array<ExportId> resultExports, bool shouldFreePipeline) {
      // We need to remove the `callContext` pointer -- which points back to us -- from the
      // answer table.  Or we might even be responsible for removing the entire answer table
      // entry.

      if (cancellationFlags & CANCEL_REQUESTED) {
        // Already received `Finish` so it's our job to erase the table entry.  We shouldn't have
        // sent results if canceled, so we shouldn't have an export list to deal with.
        KJ_ASSERT(resultExports.size() == 0);
        connectionState->answers.erase(answerId);
      } else {
        // We just have to null out callContext and set the exports.
        auto& answer = connectionState->answers[answerId];
        answer.callContext = nullptr;
        answer.resultExports = kj::mv(resultExports);

        if (shouldFreePipeline) {
          // We can free the pipeline early, because we know all pipeline calls are invalid (e.g.
          // because there are no caps in the result to receive pipeline requests).
          KJ_ASSERT(resultExports.size() == 0);
          answer.pipeline = nullptr;
        }
      }

      // Also, this is the right time to stop counting the call against the flow limit.
      connectionState->callWordsInFlight -= requestSize;
      connectionState->maybeUnblockFlow();
    }
  };

  // =====================================================================================
  // Message handling

  void maybeUnblockFlow() {
    // If messageLoop() previously blocked on the flow limit, resume it now that the
    // words-in-flight count may have dropped back below the limit.
    if (callWordsInFlight < flowLimit) {
      KJ_IF_MAYBE(w, flowWaiter) {
        w->get()->fulfill();
        flowWaiter = nullptr;
      }
    }
  }

  kj::Promise<void> messageLoop() {
    // Receive and dispatch incoming messages until the connection closes or breaks.
    if (!connection.is<Connected>()) {
      return kj::READY_NOW;
    }

    if (callWordsInFlight > flowLimit) {
      // Too many incoming call bytes are being processed; pause reading until
      // maybeUnblockFlow() fulfills the waiter.
      auto paf = kj::newPromiseAndFulfiller<void>();
      flowWaiter = kj::mv(paf.fulfiller);
      return paf.promise.then([this]() {
        return messageLoop();
      });
    }

    return canceler.wrap(connection.get<Connected>()->receiveIncomingMessage()).then(
        [this](kj::Maybe<kj::Own<IncomingRpcMessage>>&& message) {
      KJ_IF_MAYBE(m, message) {
        handleMessage(kj::mv(*m));
        return true;
      } else {
        disconnect(KJ_EXCEPTION(DISCONNECTED, "Peer disconnected."));
        return false;
      }
    }).then([this](bool keepGoing) {
      // No exceptions; continue loop.
      //
      // (We do this in a separate continuation to handle the case where exceptions are
      // disabled.)
      //
      // TODO(perf): We add an evalLater() here so that anything we needed to do in reaction to
      //   the previous message has a chance to complete before the next message is handled.  In
      //   particular, without this, I observed an ordering problem: I saw a case where a
      //   `Return` message was followed by a `Resolve` message, but the `PromiseClient`
      //   associated with the `Resolve` had its `resolve()` method invoked _before_ any
      //   `PromiseClient`s associated with pipelined capabilities resolved by the `Return`.
      //   This could lead to an incorrectly-ordered interaction between `PromiseClient`s when
      //   they resolve to each other.  This is probably really a bug in the way `Return`s are
      //   handled -- apparently, resolution of `PromiseClient`s based on returned capabilities
      //   does not occur in a depth-first way, when it should.  If we could fix that then we can
      //   probably remove this `evalLater()`.  However, the `evalLater()` is not that bad and
      //   solves the problem...
      if (keepGoing) tasks.add(kj::evalLater([this]() { return messageLoop(); }));
    });
  }

  void handleMessage(kj::Own<IncomingRpcMessage> message) {
    // Dispatch a single incoming message by type.  Unrecognized message types are echoed back
    // to the peer inside an `unimplemented` message.
    auto reader = message->getBody().getAs<rpc::Message>();

    switch (reader.which()) {
      case rpc::Message::UNIMPLEMENTED:
        handleUnimplemented(reader.getUnimplemented());
        break;

      case rpc::Message::ABORT:
        handleAbort(reader.getAbort());
        break;

      case rpc::Message::BOOTSTRAP:
        handleBootstrap(kj::mv(message), reader.getBootstrap());
        break;

      case rpc::Message::CALL:
        handleCall(kj::mv(message), reader.getCall());
        break;

      case rpc::Message::RETURN:
        handleReturn(kj::mv(message), reader.getReturn());
        break;

      case rpc::Message::FINISH:
        handleFinish(reader.getFinish());
        break;

      case rpc::Message::RESOLVE:
        handleResolve(kj::mv(message), reader.getResolve());
        break;

      case rpc::Message::RELEASE:
        handleRelease(reader.getRelease());
        break;

      case rpc::Message::DISEMBARGO:
        handleDisembargo(reader.getDisembargo());
        break;

      default: {
        if (connection.is<Connected>()) {
          auto message = connection.get<Connected>()->newOutgoingMessage(
              firstSegmentSize(reader.totalSize(), messageSizeHint<void>()));
          message->getBody().initAs<rpc::Message>().setUnimplemented(reader);
          message->send();
        }
        break;
      }
    }
  }

  void handleUnimplemented(const rpc::Message::Reader& message) {
    // The peer echoed back a message of ours that it did not understand.  For a `Resolve`, we
    // release the export referenced by the embedded cap descriptor, since the peer evidently
    // won't.  All other message types handled here are mandatory for the peer to implement.
    switch (message.which()) {
      case rpc::Message::RESOLVE: {
        auto resolve = message.getResolve();
        switch (resolve.which()) {
          case rpc::Resolve::CAP: {
            auto cap = resolve.getCap();
            switch (cap.which()) {
              case rpc::CapDescriptor::NONE:
                // Nothing to do (but this ought never to happen).
                break;
              case rpc::CapDescriptor::SENDER_HOSTED:
                releaseExport(cap.getSenderHosted(), 1);
                break;
              case rpc::CapDescriptor::SENDER_PROMISE:
                releaseExport(cap.getSenderPromise(), 1);
                break;
              case rpc::CapDescriptor::RECEIVER_ANSWER:
              case rpc::CapDescriptor::RECEIVER_HOSTED:
                // Nothing to do.
                break;
              case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
                releaseExport(cap.getThirdPartyHosted().getVineId(), 1);
                break;
            }
            break;
          }
          case rpc::Resolve::EXCEPTION:
            // Nothing to do.
            break;
        }
        break;
      }

      default:
        KJ_FAIL_ASSERT("Peer did not implement required RPC message type.", (uint)message.which());
        break;
    }
  }

  void handleAbort(const rpc::Exception::Reader& exception) {
    // The peer is aborting the connection; surface the peer's reason as an exception.
    kj::throwRecoverableException(toException(exception));
  }

  // ---------------------------------------------------------------------------
  // Level 0

  class SingleCapPipeline: public PipelineHook, public kj::Refcounted {
    // A trivial pipeline whose "results" consist of a single capability at the root pointer.
    // Used to answer pipelined requests on a `Bootstrap` answer.

  public:
    SingleCapPipeline(kj::Own<ClientHook>&& cap)
        : cap(kj::mv(cap)) {}

    kj::Own<PipelineHook> addRef() override {
      return kj::addRef(*this);
    }

    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
      if (ops.size() == 0) {
        return cap->addRef();
      } else {
        return newBrokenCap("Invalid pipeline transform.");
      }
    }

  private:
    kj::Own<ClientHook> cap;
  };

  void handleBootstrap(kj::Own<IncomingRpcMessage>&& message,
                       const rpc::Bootstrap::Reader& bootstrap) {
    // Answer a `Bootstrap` request by returning this vat's bootstrap capability.
    AnswerId answerId = bootstrap.getQuestionId();

    if (!connection.is<Connected>()) {
      // Disconnected; ignore.
      return;
    }

    VatNetworkBase::Connection& conn = *connection.get<Connected>();
    auto response = conn.newOutgoingMessage(
        messageSizeHint<rpc::Return>() + sizeInWords<rpc::CapDescriptor>() + 32);

    rpc::Return::Builder ret = response->getBody().getAs<rpc::Message>().initReturn();
    ret.setAnswerId(answerId);

    kj::Own<ClientHook> capHook;
    kj::Array<ExportId> resultExports;
    KJ_DEFER(releaseExports(resultExports));  // in case something goes wrong

    // Call the restorer and initialize the answer.
    KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
      Capability::Client cap = nullptr;

      if (bootstrap.hasDeprecatedObjectId()) {
        KJ_IF_MAYBE(r, restorer) {
          cap = r->baseRestore(bootstrap.getDeprecatedObjectId());
        } else {
          KJ_FAIL_REQUIRE("This vat only supports a bootstrap interface, not the old "
                          "Cap'n-Proto-0.4-style named exports.") { return; }
        }
      } else {
        cap = bootstrapFactory.baseCreateFor(conn.baseGetPeerVatId());
      }

      BuilderCapabilityTable capTable;
      auto payload = ret.initResults();
      capTable.imbue(payload.getContent()).setAs<Capability>(kj::mv(cap));

      auto capTableArray = capTable.getTable();
      KJ_DASSERT(capTableArray.size() == 1);
      kj::Vector<int> fds;
      resultExports = writeDescriptors(capTableArray, payload, fds);
      response->setFds(fds.releaseAsArray());
      capHook = KJ_ASSERT_NONNULL(capTableArray[0])->addRef();
    })) {
      fromException(*exception, ret.initException());
      capHook = newBrokenCap(kj::mv(*exception));
    }

    message = nullptr;

    // Add the answer to the answer table for pipelining and send the response.
    auto& answer = answers[answerId];
    KJ_REQUIRE(!answer.active, "questionId is already in use", answerId) {
      return;
    }

    answer.resultExports = kj::mv(resultExports);
    answer.active = true;
    answer.pipeline = kj::Own<PipelineHook>(kj::refcounted<SingleCapPipeline>(kj::mv(capHook)));

    response->send();
  }

  void handleCall(kj::Own<IncomingRpcMessage>&& message, const rpc::Call::Reader& call) {
    // Dispatch an incoming `Call` message to the targeted capability.
    kj::Own<ClientHook> capability;

    KJ_IF_MAYBE(t, getMessageTarget(call.getTarget())) {
      capability = kj::mv(*t);
    } else {
      // Exception already reported.
      return;
    }

    bool redirectResults;
    switch (call.getSendResultsTo().which()) {
      case rpc::Call::SendResultsTo::CALLER:
        redirectResults = false;
        break;
      case rpc::Call::SendResultsTo::YOURSELF:
        redirectResults = true;
        break;
      default:
        KJ_FAIL_REQUIRE("Unsupported `Call.sendResultsTo`.") { return; }
    }

    auto payload = call.getParams();
    auto capTableArray = receiveCaps(payload.getCapTable(), message->getAttachedFds());
    auto cancelPaf = kj::newPromiseAndFulfiller<void>();

    AnswerId answerId = call.getQuestionId();

    auto context = kj::refcounted<RpcCallContext>(
        *this, answerId, kj::mv(message), kj::mv(capTableArray), payload.getContent(),
        redirectResults, kj::mv(cancelPaf.fulfiller),
        call.getInterfaceId(), call.getMethodId());

    // No more using `call` after this point, as it now belongs to the context.
    {
      auto& answer = answers[answerId];

      KJ_REQUIRE(!answer.active, "questionId is already in use") {
        return;
      }

      answer.active = true;
      answer.callContext = *context;
    }

    auto promiseAndPipeline = startCall(
        call.getInterfaceId(), call.getMethodId(), kj::mv(capability), context->addRef());

    // Things may have changed -- in particular if startCall() immediately called
    // context->directTailCall().

    {
      auto& answer = answers[answerId];

      answer.pipeline = kj::mv(promiseAndPipeline.pipeline);

      if (redirectResults) {
        // sendResultsTo.yourself: the peer will pick the results up later via
        // `Return.takeFromOtherQuestion`, so stash them in `redirectedResults`.
        auto resultsPromise = promiseAndPipeline.promise.then(
            kj::mvCapture(context, [](kj::Own<RpcCallContext>&& context) {
              return context->consumeRedirectedResponse();
            }));

        // If the call that later picks up `redirectedResults` decides to discard it, we need to
        // make sure our call is not itself canceled unless it has called allowCancellation().
        // So we fork the promise and join one branch with the cancellation promise, in order to
        // hold on to it.
        auto forked = resultsPromise.fork();
        answer.redirectedResults = forked.addBranch();

        cancelPaf.promise
            .exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){}))
            .detach([](kj::Exception&&) {});
      } else {
        // Hack: Both the success and error continuations need to use the context. We could
        // refcount, but both will be destroyed at the same time anyway.
        RpcCallContext* contextPtr = context;

        promiseAndPipeline.promise.then(
            [contextPtr]() {
              contextPtr->sendReturn();
            }, [contextPtr](kj::Exception&& exception) {
              contextPtr->sendErrorReturn(kj::mv(exception));
            }).catch_([&](kj::Exception&& exception) {
              // Handle exceptions that occur in sendReturn()/sendErrorReturn().
              taskFailed(kj::mv(exception));
            }).attach(kj::mv(context))
            .exclusiveJoin(kj::mv(cancelPaf.promise))
            .detach([](kj::Exception&&) {});
      }
    }
  }

  ClientHook::VoidPromiseAndPipeline startCall(
      uint64_t interfaceId, uint64_t methodId,
      kj::Own<ClientHook>&& capability, kj::Own<CallContextHook>&& context) {
    // Delegates directly to the target capability's ClientHook::call().
    return capability->call(interfaceId, methodId, kj::mv(context));
  }

  kj::Maybe<kj::Own<ClientHook>> getMessageTarget(const rpc::MessageTarget::Reader& target) {
    // Resolve a MessageTarget to a live capability: either a current entry in the export
    // table, or a pipelined capability derived from an entry in the answer table. Returns
    // null (after raising a recoverable exception) when the target is invalid.
    switch (target.which()) {
      case rpc::MessageTarget::IMPORTED_CAP: {
        KJ_IF_MAYBE(exp, exports.find(target.getImportedCap())) {
          return exp->clientHook->addRef();
        } else {
          KJ_FAIL_REQUIRE("Message target is not a current export ID.") {
            return nullptr;
          }
        }
        break;
      }

      case rpc::MessageTarget::PROMISED_ANSWER: {
        auto promisedAnswer = target.getPromisedAnswer();
        kj::Own<PipelineHook> pipeline;

        auto& base = answers[promisedAnswer.getQuestionId()];
        KJ_REQUIRE(base.active, "PromisedAnswer.questionId is not a current question.") {
          return nullptr;
        }
        KJ_IF_MAYBE(p, base.pipeline) {
          pipeline = p->get()->addRef();
        } else {
          pipeline = newBrokenPipeline(KJ_EXCEPTION(FAILED,
              "Pipeline call on a request that returned no capabilities or was already closed."));
        }

        KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
          return pipeline->getPipelinedCap(*ops);
        } else {
          // Exception already thrown.
          return nullptr;
        }
      }

      default:
        KJ_FAIL_REQUIRE("Unknown message target type.", target) {
          return nullptr;
        }
    }

    KJ_UNREACHABLE;
  }

  void handleReturn(kj::Own<IncomingRpcMessage>&& message, const rpc::Return::Reader& ret) {
    // Process the peer's answer to one of our questions, fulfilling or rejecting the
    // corresponding QuestionRef (if it still exists) according to the Return's variant.
    // (Continues in the following lines.)
    //
    // Transitive destructors can end up manipulating the question table and invalidating our
    // pointer into it, so make sure these destructors run later.
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> promiseToRelease;

    KJ_IF_MAYBE(question, questions.find(ret.getAnswerId())) {
      KJ_REQUIRE(question->isAwaitingReturn, "Duplicate Return.") { return; }
      question->isAwaitingReturn = false;

      if (ret.getReleaseParamCaps()) {
        exportsToRelease = kj::mv(question->paramExports);
      } else {
        question->paramExports = nullptr;
      }

      KJ_IF_MAYBE(questionRef, question->selfRef) {
        // The question is still alive on our side; deliver the result to it.
        switch (ret.which()) {
          case rpc::Return::RESULTS: {
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `results`.") {
              return;
            }

            auto payload = ret.getResults();
            auto capTableArray = receiveCaps(payload.getCapTable(), message->getAttachedFds());
            questionRef->fulfill(kj::refcounted<RpcResponseImpl>(
                *this, kj::addRef(*questionRef), kj::mv(message),
                kj::mv(capTableArray), payload.getContent()));
            break;
          }

          case rpc::Return::EXCEPTION:
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `exception`.") {
              return;
            }

            questionRef->reject(toException(ret.getException()));
            break;

          case rpc::Return::CANCELED:
            // `canceled` is only valid in response to our own `Finish`, which we haven't
            // sent while the QuestionRef is still alive.
            KJ_FAIL_REQUIRE("Return message falsely claims call was canceled.") { return; }
            break;

          case rpc::Return::RESULTS_SENT_ELSEWHERE:
            KJ_REQUIRE(question->isTailCall,
                "`Return` had `resultsSentElsewhere` but this was not a tail call.") {
              return;
            }

            // Tail calls are fulfilled with a null pointer.
            questionRef->fulfill(kj::Own<RpcResponse>());
            break;

          case rpc::Return::TAKE_FROM_OTHER_QUESTION:
            // The results are actually those of a call the peer made to us with
            // `sendResultsTo.yourself`; transfer them from that answer's redirected results.
            KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
              KJ_IF_MAYBE(response, answer->redirectedResults) {
                questionRef->fulfill(kj::mv(*response));
                answer->redirectedResults = nullptr;

                KJ_IF_MAYBE(context, answer->callContext) {
                  // Send the `Return` message for the call of which we're taking ownership, so
                  // that the peer knows it can now tear down the call state.
                  context->sendRedirectReturn();

                  // There are three conditions, all of which must be true, before a call is
                  // canceled:
                  // 1. The RPC opts in by calling context->allowCancellation().
                  // 2. We request cancellation with context->requestCancel().
                  // 3. The final response promise -- which we passed to questionRef->fulfill()
                  //    above -- must be dropped.
                  //
                  // We would like #3 to imply #2. So... we can just make #2 be true.
                  context->requestCancel();
                }
              } else {
                KJ_FAIL_REQUIRE("`Return.takeFromOtherQuestion` referenced a call that did not "
                                "use `sendResultsTo.yourself`.") { return; }
              }
            } else {
              KJ_FAIL_REQUIRE("`Return.takeFromOtherQuestion` had invalid answer ID.") { return; }
            }

            break;

          default:
            KJ_FAIL_REQUIRE("Unknown 'Return' type.") { return; }
        }
      } else {
        // This is a response to a question that we canceled earlier.

        if (ret.isTakeFromOtherQuestion()) {
          // This turned out to be a tail call back to us! We now take ownership of the tail call.
          // Since the caller canceled, we need to cancel out the tail call, if it still exists.

          KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
            // Indeed, it does still exist.

            // Throw away the result promise.
            promiseToRelease = kj::mv(answer->redirectedResults);

            KJ_IF_MAYBE(context, answer->callContext) {
              // Send the `Return` message for the call of which we're taking ownership, so
              // that the peer knows it can now tear down the call state.
              context->sendRedirectReturn();

              // Since the caller has been canceled, make sure the callee that we're tailing to
              // gets canceled.
              context->requestCancel();
            }
          }
        }

        // Looks like this question was canceled earlier, so `Finish` was already sent, with
        // `releaseResultCaps` set true so that we don't have to release them here. We can go
        // ahead and delete it from the table.
        questions.erase(ret.getAnswerId(), *question);
      }

    } else {
      KJ_FAIL_REQUIRE("Invalid question ID in Return message.") { return; }
    }
  }

  void handleFinish(const rpc::Finish::Reader& finish) {
    // The peer is done with one of its questions: release result caps if requested, drop
    // the pipeline, and either cancel a still-running call or erase the answer entry.
    //
    // Delay release of these things until return so that transitive destructors don't accidentally
    // modify the answer table and invalidate our pointer into it.
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
    Answer answerToRelease;
    kj::Maybe<kj::Own<PipelineHook>> pipelineToRelease;

    KJ_IF_MAYBE(answer, answers.find(finish.getQuestionId())) {
      KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }

      if (finish.getReleaseResultCaps()) {
        exportsToRelease = kj::mv(answer->resultExports);
      } else {
        answer->resultExports = nullptr;
      }

      pipelineToRelease = kj::mv(answer->pipeline);

      // If the call isn't actually done yet, cancel it. Otherwise, we can go ahead and erase the
      // question from the table.
      KJ_IF_MAYBE(context, answer->callContext) {
        context->requestCancel();
      } else {
        answerToRelease = answers.erase(finish.getQuestionId());
      }
    } else {
      KJ_FAIL_REQUIRE("'Finish' for invalid question ID.") { return; }
    }
  }

  // ---------------------------------------------------------------------------
  // Level 1

  void handleResolve(kj::Own<IncomingRpcMessage>&& message, const rpc::Resolve::Reader& resolve) {
    // A promise capability we imported has resolved. Fulfill (or reject) the local
    // promise tracking it in the import table. (Continues in the following lines.)
    kj::Own<ClientHook> replacement;
    kj::Maybe<kj::Exception> exception;

    // Extract the replacement capability.
    switch (resolve.which()) {
      case rpc::Resolve::CAP:
        KJ_IF_MAYBE(cap, receiveCap(resolve.getCap(), message->getAttachedFds())) {
          replacement = kj::mv(*cap);
        } else {
          KJ_FAIL_REQUIRE("'Resolve' contained 'CapDescriptor.none'.") { return; }
        }
        break;

      case rpc::Resolve::EXCEPTION:
        // We can't set `replacement` to a new broken cap here because this will confuse
        // PromiseClient::Resolve() into thinking that the remote promise resolved to a local
        // capability and therefore a Disembargo is needed. We must actually reject the promise.
        exception = toException(resolve.getException());
        break;

      default:
        KJ_FAIL_REQUIRE("Unknown 'Resolve' type.") { return; }
    }

    // If the import is on the table, fulfill it.
    KJ_IF_MAYBE(import, imports.find(resolve.getPromiseId())) {
      KJ_IF_MAYBE(fulfiller, import->promiseFulfiller) {
        // OK, this is in fact an unfulfilled promise!
        KJ_IF_MAYBE(e, exception) {
          fulfiller->get()->reject(kj::mv(*e));
        } else {
          fulfiller->get()->fulfill(kj::mv(replacement));
        }
      } else if (import->importClient != nullptr) {
        // It appears this is a valid entry on the import table, but was not expected to be a
        // promise.
        KJ_FAIL_REQUIRE("Got 'Resolve' for a non-promise import.") { break; }
      }
    }
  }

  void handleRelease(const rpc::Release::Reader& release) {
    // The peer has dropped some of its references to one of our exports.
    releaseExport(release.getId(), release.getReferenceCount());
  }

  void releaseExport(ExportId id, uint refcount) {
    // Decrement an export's remote refcount by `refcount`, removing the entry from both
    // the export table and the by-capability index once it reaches zero.
    KJ_IF_MAYBE(exp, exports.find(id)) {
      KJ_REQUIRE(refcount <= exp->refcount, "Tried to drop export's refcount below zero.") {
        return;
      }

      exp->refcount -= refcount;
      if (exp->refcount == 0) {
        exportsByCap.erase(exp->clientHook);
        exports.erase(id, *exp);
      }
    } else {
      KJ_FAIL_REQUIRE("Tried to release invalid export ID.") {
        return;
      }
    }
  }

  void releaseExports(kj::ArrayPtr<ExportId> exports) {
    // Release exactly one reference on each export in the list.
    for (auto exportId: exports) {
      releaseExport(exportId, 1);
    }
  }

  void handleDisembargo(const rpc::Disembargo::Reader& disembargo) {
    // Implements the two-hop Disembargo loopback handshake; see the documentation of
    // Disembargo in rpc.capnp for the protocol and its rationale.
    auto context = disembargo.getContext();
    switch (context.which()) {
      case rpc::Disembargo::Context::SENDER_LOOPBACK: {
        kj::Own<ClientHook> target;

        KJ_IF_MAYBE(t, getMessageTarget(disembargo.getTarget())) {
          target = kj::mv(*t);
        } else {
          // Exception already reported.
          return;
        }

        // Follow the chain of resolutions to the most-resolved capability.
        for (;;) {
          KJ_IF_MAYBE(r, target->getResolved()) {
            target = r->addRef();
          } else {
            break;
          }
        }

        // A senderLoopback disembargo must ultimately target a capability pointing back at
        // this connection (getBrand() identifies the owning RpcConnectionState).
        KJ_REQUIRE(target->getBrand() == this,
            "'Disembargo' of type 'senderLoopback' sent to an object that does not point "
            "back to the sender.") {
          return;
        }

        EmbargoId embargoId = context.getSenderLoopback();

        // We need to insert an evalLast() here to make sure that any pending calls towards this
        // cap have had time to find their way through the event loop.
        tasks.add(canceler.wrap(kj::evalLast(kj::mvCapture(
            target, [this,embargoId](kj::Own<ClientHook>&& target) {
          if (!connection.is<Connected>()) {
            return;
          }

          RpcClient& downcasted = kj::downcast<RpcClient>(*target);

          auto message = connection.get<Connected>()->newOutgoingMessage(
              messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
          auto builder = message->getBody().initAs<rpc::Message>().initDisembargo();

          {
            auto redirect = downcasted.writeTarget(builder.initTarget());

            // Disembargoes should only be sent to capabilities that were previously the subject of
            // a `Resolve` message. But `writeTarget` only ever returns non-null when called on
            // a PromiseClient. The code which sends `Resolve` and `Return` should have replaced
            // any promise with a direct node in order to solve the Tribble 4-way race condition.
            // See the documentation of Disembargo in rpc.capnp for more.
            KJ_REQUIRE(redirect == nullptr,
                "'Disembargo' of type 'senderLoopback' sent to an object that does not "
                "appear to have been the subject of a previous 'Resolve' message.") {
              return;
            }
          }

          // Echo the disembargo back to the peer as receiverLoopback with the same ID.
          builder.getContext().setReceiverLoopback(embargoId);

          message->send();
        }))));

        break;
      }

      case rpc::Disembargo::Context::RECEIVER_LOOPBACK: {
        // The peer echoed back a disembargo we sent; release the corresponding embargo.
        KJ_IF_MAYBE(embargo, embargoes.find(context.getReceiverLoopback())) {
          KJ_ASSERT_NONNULL(embargo->fulfiller)->fulfill();
          embargoes.erase(context.getReceiverLoopback(), *embargo);
        } else {
          KJ_FAIL_REQUIRE("Invalid embargo ID in 'Disembargo.context.receiverLoopback'.") {
            return;
          }
        }
        break;
      }

      default:
        KJ_FAIL_REQUIRE("Unimplemented Disembargo type.", disembargo) { return; }
    }
  }

  // ---------------------------------------------------------------------------
  // Level 2
};

}  // namespace

class
RpcSystemBase::Impl final: private BootstrapFactoryBase, private kj::TaskSet::ErrorHandler {
  // Shared implementation behind the RpcSystemBase facade: maintains one
  // RpcConnectionState per live connection and runs the network accept loop.
public:
  Impl(VatNetworkBase& network, kj::Maybe<Capability::Client> bootstrapInterface)
      : network(network), bootstrapInterface(kj::mv(bootstrapInterface)),
        bootstrapFactory(*this), tasks(*this) {
    acceptLoopPromise = acceptLoop().eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
  }
  Impl(VatNetworkBase& network, BootstrapFactoryBase& bootstrapFactory)
      : network(network), bootstrapFactory(bootstrapFactory), tasks(*this) {
    acceptLoopPromise = acceptLoop().eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
  }
  Impl(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
      : network(network), bootstrapFactory(*this), restorer(restorer), tasks(*this) {
    acceptLoopPromise = acceptLoop().eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
  }

  ~Impl() noexcept(false) {
    // Disconnect all connections with a DISCONNECTED exception before tearing down the map.
    unwindDetector.catchExceptionsIfUnwinding([&]() {
      // std::unordered_map doesn't like it when elements' destructors throw, so carefully
      // disassemble it.
      if (!connections.empty()) {
        kj::Vector<kj::Own<RpcConnectionState>> deleteMe(connections.size());
        kj::Exception shutdownException = KJ_EXCEPTION(DISCONNECTED, "RpcSystem was destroyed.");
        for (auto& entry: connections) {
          entry.second->disconnect(kj::cp(shutdownException));
          deleteMe.add(kj::mv(entry.second));
        }
      }
    });
  }

  Capability::Client bootstrap(AnyStruct::Reader vatId) {
    // For now we delegate to restore() since it's equivalent, but eventually we'll remove restore()
    // and implement bootstrap() directly.
    return restore(vatId, AnyPointer::Reader());
  }

  Capability::Client restore(AnyStruct::Reader vatId, AnyPointer::Reader objectId) {
    // Connect to the vat identified by `vatId` (or recognize it as ourselves) and obtain
    // its bootstrap capability or the object named by `objectId`.
    KJ_IF_MAYBE(connection, network.baseConnect(vatId)) {
      auto& state = getConnectionState(kj::mv(*connection));
      return Capability::Client(state.restore(objectId));
    } else if (objectId.isNull()) {
      // Turns out `vatId` refers to ourselves, so we can also pass it as the client ID for
      // baseCreateFor().
      return bootstrapFactory.baseCreateFor(vatId);
    } else KJ_IF_MAYBE(r, restorer) {
      return r->baseRestore(objectId);
    } else {
      return Capability::Client(newBrokenCap(
          "This vat only supports a bootstrap interface, not the old Cap'n-Proto-0.4-style "
          "named exports."));
    }
  }

  void setFlowLimit(size_t words) {
    // Record the new limit for future connections and apply it to all current ones.
    flowLimit = words;

    for (auto& conn: connections) {
      conn.second->setFlowLimit(words);
    }
  }

  void setTraceEncoder(kj::Function<kj::String(const kj::Exception&)> func) {
    traceEncoder = kj::mv(func);
  }

  kj::Promise<void> run() { return kj::mv(acceptLoopPromise); }

private:
  VatNetworkBase& network;
  kj::Maybe<Capability::Client> bootstrapInterface;
  BootstrapFactoryBase& bootstrapFactory;
  kj::Maybe<SturdyRefRestorerBase&> restorer;
  size_t flowLimit = kj::maxValue;  // words; passed to each RpcConnectionState
  kj::Maybe<kj::Function<kj::String(const kj::Exception&)>> traceEncoder;
  kj::Promise<void> acceptLoopPromise = nullptr;
  kj::TaskSet tasks;

  typedef std::unordered_map<VatNetworkBase::Connection*, kj::Own<RpcConnectionState>>
      ConnectionMap;
  ConnectionMap connections;

  kj::UnwindDetector unwindDetector;

  RpcConnectionState& getConnectionState(kj::Own<VatNetworkBase::Connection>&& connection) {
    // Look up -- or lazily create -- the RpcConnectionState wrapping this connection.
    // A newly-created state registers a disconnect continuation that removes it from the
    // map and runs its shutdown promise.
    auto iter = connections.find(connection);
    if (iter == connections.end()) {
      VatNetworkBase::Connection* connectionPtr = connection;
      auto onDisconnect = kj::newPromiseAndFulfiller<RpcConnectionState::DisconnectInfo>();
      tasks.add(onDisconnect.promise
          .then([this,connectionPtr](RpcConnectionState::DisconnectInfo info) {
        connections.erase(connectionPtr);
        tasks.add(kj::mv(info.shutdownPromise));
      }));
      auto newState = kj::refcounted<RpcConnectionState>(
          bootstrapFactory, restorer, kj::mv(connection),
          kj::mv(onDisconnect.fulfiller), flowLimit, traceEncoder);
      RpcConnectionState& result = *newState;
      connections.insert(std::make_pair(connectionPtr, kj::mv(newState)));
      return result;
    } else {
      return *iter->second;
    }
  }

  kj::Promise<void> acceptLoop() {
    // Accept incoming connections forever, creating connection state for each one.
    return network.baseAccept().then(
        [this](kj::Own<VatNetworkBase::Connection>&& connection) {
      getConnectionState(kj::mv(connection));
      return acceptLoop();
    });
  }

  Capability::Client baseCreateFor(AnyStruct::Reader clientId) override {
    // Implements BootstrapFactory::baseCreateFor() in terms of `bootstrapInterface` or `restorer`,
    // for use when we were given one of those instead of an actual `bootstrapFactory`.

    KJ_IF_MAYBE(cap, bootstrapInterface) {
      return *cap;
    } else KJ_IF_MAYBE(r, restorer) {
      return r->baseRestore(AnyPointer::Reader());
    } else {
      return KJ_EXCEPTION(FAILED, "This vat does not expose any public/bootstrap interfaces.");
    }
  }

  void taskFailed(kj::Exception&& exception) override {
    // Background tasks (accept loop cleanup, shutdown promises) have nowhere to report
    // errors, so just log them.
    KJ_LOG(ERROR, exception);
  }
};

// =======================================================================================
// RpcSystemBase public interface -- all methods simply forward to Impl.

RpcSystemBase::RpcSystemBase(VatNetworkBase& network,
                             kj::Maybe<Capability::Client> bootstrapInterface)
    : impl(kj::heap<Impl>(network, kj::mv(bootstrapInterface))) {}
RpcSystemBase::RpcSystemBase(VatNetworkBase& network,
                             BootstrapFactoryBase& bootstrapFactory)
    : impl(kj::heap<Impl>(network, bootstrapFactory)) {}
RpcSystemBase::RpcSystemBase(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
    : impl(kj::heap<Impl>(network, restorer)) {}
RpcSystemBase::RpcSystemBase(RpcSystemBase&& other) noexcept = default;
RpcSystemBase::~RpcSystemBase() noexcept(false) {}

Capability::Client RpcSystemBase::baseBootstrap(AnyStruct::Reader vatId) {
  return impl->bootstrap(vatId);
}

Capability::Client RpcSystemBase::baseRestore(
    AnyStruct::Reader hostId, AnyPointer::Reader objectId) {
  return impl->restore(hostId, objectId);
}

void RpcSystemBase::baseSetFlowLimit(size_t words) {
  return impl->setFlowLimit(words);
}

void RpcSystemBase::setTraceEncoder(kj::Function<kj::String(const kj::Exception&)> func) {
  impl->setTraceEncoder(kj::mv(func));
}

kj::Promise<void> RpcSystemBase::run() {
  return impl->run();
}

}  // namespace _ (private)

// =======================================================================================

namespace {

class WindowFlowController final: public RpcFlowController, private kj::TaskSet::ErrorHandler {
  // Flow controller that limits the number of bytes in flight to the window reported by a
  // WindowGetter (padded by the largest message seen; see isReady()).
public:
  WindowFlowController(RpcFlowController::WindowGetter&
      windowGetter)
      : windowGetter(windowGetter), tasks(*this) {
    state.init<Running>();
  }

  kj::Promise<void> send(kj::Own<OutgoingRpcMessage> message, kj::Promise<void> ack) override {
    // Send the message immediately (ordering requires it), account its size against the
    // window, and return a promise that resolves once the window has room for more sends.
    auto size = message->sizeInWords() * sizeof(capnp::word);
    maxMessageSize = kj::max(size, maxMessageSize);

    // We are REQUIRED to send the message NOW to maintain correct ordering.
    message->send();

    inFlight += size;
    tasks.add(ack.then([this, size]() {
      // The ack arrived: shrink the in-flight count and unblock waiters if room opened up.
      inFlight -= size;
      KJ_SWITCH_ONEOF(state) {
        KJ_CASE_ONEOF(blockedSends, Running) {
          if (isReady()) {
            // Release all fulfillers.
            for (auto& fulfiller: blockedSends) {
              fulfiller->fulfill();
            }
            blockedSends.clear();
          }

          KJ_IF_MAYBE(f, emptyFulfiller) {
            if (inFlight == 0) {
              f->get()->fulfill(tasks.onEmpty());
            }
          }
        }
        KJ_CASE_ONEOF(exception, kj::Exception) {
          // A previous call failed, but this one -- which was already in-flight at the time --
          // ended up succeeding. That may indicate that the server side is not properly
          // handling streaming error propagation. Nothing much we can do about it here though.
        }
      }
    }));

    KJ_SWITCH_ONEOF(state) {
      KJ_CASE_ONEOF(blockedSends, Running) {
        if (isReady()) {
          return kj::READY_NOW;
        } else {
          // Window is full: block the caller until an ack frees up space.
          auto paf = kj::newPromiseAndFulfiller<void>();
          blockedSends.add(kj::mv(paf.fulfiller));
          return kj::mv(paf.promise);
        }
      }
      KJ_CASE_ONEOF(exception, kj::Exception) {
        // A previous ack failed; fail all subsequent sends with the same exception.
        return kj::cp(exception);
      }
    }
    KJ_UNREACHABLE;
  }

  kj::Promise<void> waitAllAcked() override {
    // Resolves once every sent message has been acked (i.e. no blocked sends remain and
    // the task set drains).
    KJ_IF_MAYBE(q, state.tryGet<Running>()) {
      if (!q->empty()) {
        auto paf = kj::newPromiseAndFulfiller<kj::Promise<void>>();
        emptyFulfiller = kj::mv(paf.fulfiller);
        return kj::mv(paf.promise);
      }
    }
    return tasks.onEmpty();
  }

private:
  RpcFlowController::WindowGetter& windowGetter;
  size_t inFlight = 0;        // bytes sent but not yet acked
  size_t maxMessageSize = 0;  // largest message seen so far; pads the window in isReady()

  typedef kj::Vector<kj::Own<kj::PromiseFulfiller<void>>> Running;
  kj::OneOf<Running, kj::Exception> state;
  // Running: fulfillers for sends currently blocked on the window.
  // Exception: an ack failed; all further sends fail with this.

  kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Promise<void>>>> emptyFulfiller;

  kj::TaskSet tasks;

  void taskFailed(kj::Exception&& exception) override {
    // An ack promise rejected: fail all currently-blocked sends and latch the exception
    // so future sends fail fast.
    KJ_SWITCH_ONEOF(state) {
      KJ_CASE_ONEOF(blockedSends, Running) {
        // Fail out all pending sends.
        for (auto& fulfiller: blockedSends) {
          fulfiller->reject(kj::cp(exception));
        }
        // Fail out all future sends.
        state = kj::mv(exception);
      }
      KJ_CASE_ONEOF(exception, kj::Exception) {
        // ignore redundant exception
      }
    }
  }

  bool isReady() {
    // We extend the window by maxMessageSize to avoid a pathological situation when a message
    // is larger than the window size. Otherwise, after sending that message, we would end up
    // not sending any others until the ack was received, wasting a round trip's worth of
    // bandwidth.
    return inFlight <= maxMessageSize  // avoid getWindow() call if unnecessary
        || inFlight < windowGetter.getWindow() + maxMessageSize;
  }
};

class FixedWindowFlowController final
    : public RpcFlowController, public RpcFlowController::WindowGetter {
  // Flow controller with a constant window size: adapts WindowFlowController by serving as
  // its own WindowGetter.
public:
  FixedWindowFlowController(size_t windowSize): windowSize(windowSize), inner(*this) {}

  kj::Promise<void> send(kj::Own<OutgoingRpcMessage> message, kj::Promise<void> ack) override {
    return inner.send(kj::mv(message), kj::mv(ack));
  }

  kj::Promise<void> waitAllAcked() override {
    return inner.waitAllAcked();
  }

  size_t getWindow() override { return windowSize; }

private:
  size_t windowSize;
  WindowFlowController inner;
};

}  // namespace

kj::Own<RpcFlowController> RpcFlowController::newFixedWindowController(size_t windowSize) {
  return kj::heap<FixedWindowFlowController>(windowSize);
}
kj::Own<RpcFlowController> RpcFlowController::newVariableWindowController(WindowGetter& getter) {
  return kj::heap<WindowFlowController>(getter);
}

}  // namespace capnp